use std::collections::{HashMap, HashSet, VecDeque};
use std::time::Duration;
use either::Either;
use rtlola_frontend::mir::{
InputReference, MemorizationBound, OutputReference, OutputStream, RtLolaMir, Stream, StreamReference, Type,
WindowOperation, WindowReference,
};
use super::Value;
use crate::storage::SlidingWindow;
use crate::Time;
/// Holds every instance of one parameterized stream, keyed by the parameter values
/// identifying the instance, plus per-cycle bookkeeping sets.
///
/// The three sets `fresh`, `spawned` and `closed` are emptied together by `new_cycle`.
pub(crate) struct InstanceCollection {
/// The existing instances, looked up by their parameter values.
instances: HashMap<Vec<Value>, InstanceStore>,
/// Parameter values recorded by `instance_mut` since the last `new_cycle`.
fresh: HashSet<Vec<Value>>,
/// Parameter values of instances created by `create_instance` since the last `new_cycle`.
spawned: HashSet<Vec<Value>>,
/// Parameter values passed to `mark_for_deletion` since the last `new_cycle`;
/// `delete_instances` removes exactly these instances.
closed: HashSet<Vec<Value>>,
/// Value type handed to `InstanceStore::new` for every newly created instance.
value_type: Type,
/// Memorization bound handed to `InstanceStore::new` for every newly created instance.
bound: MemorizationBound,
}
impl InstanceCollection {
    /// Creates an empty collection.
    ///
    /// `ty` and `bound` are stored and later passed to `InstanceStore::new` for every
    /// instance created through [`create_instance`](Self::create_instance).
    pub(crate) fn new(ty: &Type, bound: MemorizationBound) -> Self {
        InstanceCollection {
            instances: HashMap::new(),
            fresh: HashSet::new(),
            spawned: HashSet::new(),
            closed: HashSet::new(),
            value_type: ty.clone(),
            bound,
        }
    }

    /// Returns the instance identified by `parameter`, or `None` if it does not exist.
    pub(crate) fn instance(&self, parameter: &[Value]) -> Option<&InstanceStore> {
        self.instances.get(parameter)
    }

    /// Returns mutable access to the instance identified by `parameter` and marks it
    /// as fresh for the current cycle.
    ///
    /// Returns `None` if no such instance exists. In that case nothing is marked as
    /// fresh, so the fresh set never names an instance that is not in the collection
    /// at the time of the call.
    pub(crate) fn instance_mut(&mut self, parameter: &[Value]) -> Option<&mut InstanceStore> {
        let store = self.instances.get_mut(parameter)?;
        // Only reached for an existing instance; `fresh` and `instances` are disjoint
        // fields, so both borrows can coexist.
        self.fresh.insert(parameter.to_vec());
        Some(store)
    }

    /// Creates a new, active instance for `parameter` and records it as spawned.
    ///
    /// Returns a reference to the *newly created* instance, or `None` if an instance
    /// with these parameter values already exists (in which case nothing changes).
    pub(crate) fn create_instance(&mut self, parameter: &[Value]) -> Option<&InstanceStore> {
        if self.instances.contains_key(parameter) {
            return None;
        }
        // Build the owned key once and share it between the two containers.
        let key = parameter.to_vec();
        self.spawned.insert(key.clone());
        self.instances
            .insert(key, InstanceStore::new(&self.value_type, self.bound, true));
        self.instances.get(parameter)
    }

    /// Records the instance identified by `parameter` as closed.
    ///
    /// The instance stays accessible until [`delete_instances`](Self::delete_instances)
    /// is called. In debug builds this asserts that the instance exists.
    pub(crate) fn mark_for_deletion(&mut self, parameter: &[Value]) {
        debug_assert!(self.instances.contains_key(parameter));
        self.closed.insert(parameter.to_vec());
    }

    /// Removes every instance that was marked via
    /// [`mark_for_deletion`](Self::mark_for_deletion).
    ///
    /// The closed set itself is left untouched; it is emptied by
    /// [`new_cycle`](Self::new_cycle).
    pub(crate) fn delete_instances(&mut self) {
        for key in &self.closed {
            self.instances.remove(key);
        }
    }

    /// Returns the parameter values of all existing instances as owned vectors.
    pub(crate) fn all_instances(&self) -> Vec<Vec<Value>> {
        self.instances.keys().cloned().collect()
    }

    /// Iterates over the parameter values marked as fresh in the current cycle.
    pub(crate) fn fresh(&self) -> impl Iterator<Item = &Vec<Value>> {
        self.fresh.iter()
    }

    /// Returns `true` if `parameter` was marked as fresh in the current cycle.
    pub(crate) fn is_fresh(&self, parameter: &[Value]) -> bool {
        self.fresh.contains(parameter)
    }

    /// Iterates over the parameter values of instances spawned in the current cycle.
    pub(crate) fn spawned(&self) -> impl Iterator<Item = &Vec<Value>> {
        self.spawned.iter()
    }

    /// Iterates over the parameter values of instances marked for deletion in the
    /// current cycle.
    pub(crate) fn closed(&self) -> impl Iterator<Item = &Vec<Value>> {
        self.closed.iter()
    }

    /// Starts a new cycle by emptying the fresh, spawned and closed sets.
    /// The instances themselves are not modified.
    pub(crate) fn new_cycle(&mut self) {
        self.fresh.clear();
        self.spawned.clear();
        self.closed.clear();
    }

    /// Returns `true` if an instance with the given parameter values exists.
    pub(crate) fn contains(&self, parameter: &[Value]) -> bool {
        self.instances.contains_key(parameter)
    }
}
/// Holds one `SlidingWindow` per parameter-value vector, together with the
/// settings used to construct each window on demand.
pub(crate) struct SlidingWindowCollection {
/// The existing windows, looked up by parameter values.
windows: HashMap<Vec<Value>, SlidingWindow>,
/// Window length: `Left` is passed to `SlidingWindow::from_sliding`,
/// `Right` is passed to `SlidingWindow::from_discrete`.
duration: Either<Duration, usize>,
/// Forwarded unchanged to the `SlidingWindow` constructor.
wait: bool,
/// Window operation forwarded to the `SlidingWindow` constructor.
op: WindowOperation,
/// Type forwarded (by reference) to the `SlidingWindow` constructor.
ty: Type,
}
impl SlidingWindowCollection {
    /// Builds an empty collection whose windows are created with
    /// `SlidingWindow::from_sliding` using the duration `dur`.
    pub(crate) fn new_for_sliding(dur: Duration, wait: bool, op: WindowOperation, ty: &Type) -> Self {
        let duration = Either::Left(dur);
        SlidingWindowCollection {
            windows: HashMap::new(),
            duration,
            wait,
            op,
            ty: ty.clone(),
        }
    }

    /// Builds an empty collection whose windows are created with
    /// `SlidingWindow::from_discrete` using the size `dur`.
    pub(crate) fn new_for_discrete(dur: usize, wait: bool, op: WindowOperation, ty: &Type) -> Self {
        let duration = Either::Right(dur);
        SlidingWindowCollection {
            windows: HashMap::new(),
            duration,
            wait,
            op,
            ty: ty.clone(),
        }
    }

    /// Adds a window for `parameters`, constructed with `start_time`.
    ///
    /// Yields the freshly inserted window, or `None` (changing nothing) when a
    /// window for these parameters is already present.
    pub(crate) fn create_window(&mut self, parameters: &[Value], start_time: Time) -> Option<&mut SlidingWindow> {
        if self.windows.contains_key(parameters) {
            return None;
        }
        let created = match self.duration {
            Either::Left(span) => {
                SlidingWindow::from_sliding(span, self.wait, self.op, start_time, &self.ty, false)
            },
            Either::Right(count) => {
                SlidingWindow::from_discrete(count, self.wait, self.op, start_time, &self.ty, false)
            },
        };
        // The key is known to be vacant here, so `or_insert` always stores `created`.
        Some(self.windows.entry(parameters.to_vec()).or_insert(created))
    }

    /// Yields the window for `parameters`, creating it with `start_time` first
    /// if none is present yet.
    pub(crate) fn get_or_create(&mut self, parameters: &[Value], start_time: Time) -> &mut SlidingWindow {
        if !self.windows.contains_key(parameters) {
            return self.create_window(parameters, start_time).unwrap();
        }
        self.window_mut(parameters).unwrap()
    }

    /// Shared access to the window for `parameter`, if present.
    pub(crate) fn window(&self, parameter: &[Value]) -> Option<&SlidingWindow> {
        self.windows.get(parameter)
    }

    /// Exclusive access to the window for `parameter`, if present.
    pub(crate) fn window_mut(&mut self, parameter: &[Value]) -> Option<&mut SlidingWindow> {
        self.windows.get_mut(parameter)
    }

    /// Drops the window for `parameter`. Debug builds assert beforehand that
    /// such a window is present.
    pub(crate) fn delete_window(&mut self, parameter: &[Value]) {
        debug_assert!(self.windows.contains_key(parameter));
        let _ = self.windows.remove(parameter);
    }

    /// Calls `update(ts)` on every window that reports itself as active.
    pub(crate) fn update_all(&mut self, ts: Time) {
        for window in self.windows.values_mut() {
            if window.is_active() {
                window.update(ts);
            }
        }
    }
}
/// Central storage for all stream values and windows of a specification.
///
/// Outputs and windows are each split into a non-parameterized (`np_*`) and a
/// parameterized (`p_*`) vector; the accompanying `*_index_map` translates a
/// reference index into a position inside one of the two vectors.
pub(crate) struct GlobalStore {
/// One store per input stream, indexed directly by `InputReference`.
inputs: Vec<InstanceStore>,
/// Maps an output reference index to its position in either `np_outputs` or
/// `p_outputs`; which of the two applies is not recorded in the map itself.
stream_index_map: Vec<usize>,
/// Stores of the outputs for which `is_parameterized()` is false.
np_outputs: Vec<InstanceStore>,
/// Instance collections of the outputs for which `is_parameterized()` is true.
p_outputs: Vec<InstanceCollection>,
/// Maps a sliding-window reference index to its position in either
/// `np_windows` or `p_windows`.
window_index_map: Vec<usize>,
/// Sliding windows whose target is an input or a non-parameterized output.
np_windows: Vec<SlidingWindow>,
/// Window collections for all remaining sliding windows.
p_windows: Vec<SlidingWindowCollection>,
/// Maps a discrete-window reference index to its position in either
/// `np_discrete_windows` or `p_discrete_windows`.
discrete_window_index_map: Vec<usize>,
/// Discrete windows whose target is an input or a non-parameterized output.
np_discrete_windows: Vec<SlidingWindow>,
/// Window collections for all remaining discrete windows.
p_discrete_windows: Vec<SlidingWindowCollection>,
}
impl GlobalStore {
pub(crate) fn new(ir: &RtLolaMir, ts: Time) -> GlobalStore {
let mut stream_index_map: Vec<Option<usize>> = vec![None; ir.outputs.len()];
let (ps, nps): (Vec<&OutputStream>, Vec<&OutputStream>) = ir.outputs.iter().partition(|o| o.is_parameterized());
let nps_refs: Vec<StreamReference> = nps.iter().map(|o| o.reference).collect();
for (np_ix, o) in nps.iter().enumerate() {
stream_index_map[o.reference.out_ix()] = Some(np_ix);
}
for (p_ix, o) in ps.iter().enumerate() {
stream_index_map[o.reference.out_ix()] = Some(p_ix);
}
debug_assert!(stream_index_map.iter().all(Option::is_some));
let stream_index_map = stream_index_map.into_iter().flatten().collect();
let mut window_index_map: Vec<Option<usize>> = vec![None; ir.sliding_windows.len()];
let (np_windows, p_windows): (
Vec<&rtlola_frontend::mir::SlidingWindow>,
Vec<&rtlola_frontend::mir::SlidingWindow>,
) = ir
.sliding_windows
.iter()
.partition(|w| w.target.is_input() || nps_refs.contains(&w.target));
for (ix, w) in np_windows.iter().enumerate() {
window_index_map[w.reference.idx()] = Some(ix);
}
for (ix, w) in p_windows.iter().enumerate() {
window_index_map[w.reference.idx()] = Some(ix);
}
debug_assert!(window_index_map.iter().all(Option::is_some));
let window_index_map = window_index_map.into_iter().flatten().collect();
let mut discrete_window_index_map: Vec<Option<usize>> = vec![None; ir.discrete_windows.len()];
let (np_discrete_windows, p_discrete_windows): (
Vec<&rtlola_frontend::mir::DiscreteWindow>,
Vec<&rtlola_frontend::mir::DiscreteWindow>,
) = ir
.discrete_windows
.iter()
.partition(|w| w.target.is_input() || nps_refs.contains(&w.target));
for (ix, w) in np_discrete_windows.iter().enumerate() {
discrete_window_index_map[w.reference.idx()] = Some(ix);
}
for (ix, w) in p_discrete_windows.iter().enumerate() {
discrete_window_index_map[w.reference.idx()] = Some(ix);
}
debug_assert!(discrete_window_index_map.iter().all(Option::is_some));
let discrete_window_index_map = discrete_window_index_map.into_iter().flatten().collect();
let np_outputs = nps
.iter()
.map(|o| InstanceStore::new(&o.ty, o.memory_bound, !o.is_spawned()))
.collect();
let p_outputs = ps
.iter()
.map(|o| InstanceCollection::new(&o.ty, o.memory_bound))
.collect();
let inputs = ir
.inputs
.iter()
.map(|i| InstanceStore::new(&i.ty, i.memory_bound, true))
.collect();
let np_windows = np_windows
.iter()
.map(|w| {
SlidingWindow::from_sliding(w.duration, w.wait, w.op, ts, &w.ty, !ir.stream(w.target).is_spawned())
})
.collect();
let p_windows = p_windows
.iter()
.map(|w| SlidingWindowCollection::new_for_sliding(w.duration, w.wait, w.op, &w.ty))
.collect();
let np_discrete_windows = np_discrete_windows
.iter()
.map(|w| {
SlidingWindow::from_discrete(w.duration, w.wait, w.op, ts, &w.ty, !ir.stream(w.target).is_spawned())
})
.collect();
let p_discrete_windows = p_discrete_windows
.iter()
.map(|w| SlidingWindowCollection::new_for_discrete(w.duration, w.wait, w.op, &w.ty))
.collect();
GlobalStore {
inputs,
stream_index_map,
np_outputs,
p_outputs,
window_index_map,
np_windows,
p_windows,
discrete_window_index_map,
np_discrete_windows,
p_discrete_windows,
}
}
pub(crate) fn get_in_instance(&self, inst: InputReference) -> &InstanceStore {
let ix = inst;
&self.inputs[ix]
}
pub(crate) fn get_in_instance_mut(&mut self, inst: InputReference) -> &mut InstanceStore {
let ix = inst;
&mut self.inputs[ix]
}
pub(crate) fn get_out_instance(&self, inst: OutputReference) -> &InstanceStore {
let ix = inst;
&self.np_outputs[self.stream_index_map[ix]]
}
pub(crate) fn get_out_instance_collection(&self, inst: OutputReference) -> &InstanceCollection {
let ix = inst;
&self.p_outputs[self.stream_index_map[ix]]
}
pub(crate) fn get_out_instance_mut(&mut self, inst: OutputReference) -> &mut InstanceStore {
let ix = inst;
&mut self.np_outputs[self.stream_index_map[ix]]
}
pub(crate) fn get_out_instance_collection_mut(&mut self, inst: OutputReference) -> &mut InstanceCollection {
let ix = inst;
&mut self.p_outputs[self.stream_index_map[ix]]
}
pub(crate) fn get_window(&self, window: WindowReference) -> &SlidingWindow {
match window {
WindowReference::Sliding(x) => &self.np_windows[self.window_index_map[x]],
WindowReference::Discrete(x) => &self.np_discrete_windows[self.discrete_window_index_map[x]],
}
}
pub(crate) fn get_window_mut(&mut self, window: WindowReference) -> &mut SlidingWindow {
match window {
WindowReference::Sliding(x) => &mut self.np_windows[self.window_index_map[x]],
WindowReference::Discrete(x) => &mut self.np_discrete_windows[self.discrete_window_index_map[x]],
}
}
pub(crate) fn get_window_collection_mut(&mut self, window: WindowReference) -> &mut SlidingWindowCollection {
match window {
WindowReference::Sliding(x) => &mut self.p_windows[self.window_index_map[x]],
WindowReference::Discrete(x) => &mut self.p_discrete_windows[self.discrete_window_index_map[x]],
}
}
pub(crate) fn new_cycle(&mut self) {
self.p_outputs.iter_mut().for_each(|is| is.new_cycle())
}
}
/// Value history of a single stream instance.
#[derive(Clone, Debug)]
pub(crate) struct InstanceStore {
/// Stored values, newest first: `push_value` inserts at the front and, for
/// bounded stores, evicts from the back.
buffer: VecDeque<Value>,
/// Memorization bound consulted by `push_value` to limit the buffer length.
bound: MemorizationBound,
/// Whether the instance is active; `get_value` returns `None` and
/// `push_value` panics while this is `false`.
active: bool,
}
/// Initial buffer capacity used by `InstanceStore::new` for `MemorizationBound::Unbounded`.
const SIZE: usize = 256;
impl InstanceStore {
    /// Creates an empty store.
    ///
    /// The buffer is pre-allocated with the bound's limit for
    /// `MemorizationBound::Bounded` and with `SIZE` slots for
    /// `MemorizationBound::Unbounded`. `_type` is currently unused.
    pub(crate) fn new(_type: &Type, bound: MemorizationBound, active: bool) -> InstanceStore {
        let capacity = match bound {
            MemorizationBound::Bounded(limit) => limit as usize,
            MemorizationBound::Unbounded => SIZE,
        };
        InstanceStore {
            buffer: VecDeque::with_capacity(capacity),
            bound,
            active,
        }
    }

    /// Returns a clone of the value `|offset|` steps in the past, where `0` is the
    /// most recently pushed value.
    ///
    /// Returns `None` if the store is inactive or no value is stored at that offset.
    ///
    /// # Panics
    /// Panics if `offset` is positive.
    pub(crate) fn get_value(&self, offset: i16) -> Option<Value> {
        assert!(offset <= 0);
        if !self.active {
            return None;
        }
        // The newest value sits at index 0, so the magnitude of the offset is the index.
        // `unsigned_abs` also handles `i16::MIN` without overflow.
        self.buffer.get(usize::from(offset.unsigned_abs())).cloned()
    }

    /// Stores `v` as the newest value, evicting the oldest value first if a bounded
    /// store is already full.
    ///
    /// # Panics
    /// Panics if the store is inactive.
    pub(crate) fn push_value(&mut self, v: Value) {
        assert!(self.active);
        if let MemorizationBound::Bounded(limit) = self.bound {
            // `>=` rather than `==`: with a limit of 0 the length is 1 after the first
            // push and would never equal the limit again, letting the buffer grow
            // without bound. For limits >= 1 both comparisons behave identically.
            if self.buffer.len() >= limit as usize {
                self.buffer.pop_back();
            }
        }
        self.buffer.push_front(v);
    }

    /// Marks the store as active. Stored values are not touched.
    pub(crate) fn activate(&mut self) {
        self.active = true;
    }

    /// Returns whether the store is currently active.
    pub(crate) fn is_active(&self) -> bool {
        self.active
    }

    /// Marks the store as inactive and discards all stored values.
    pub(crate) fn deactivate(&mut self) {
        self.active = false;
        self.buffer.clear()
    }
}