rtlola_interpreter/
evaluator.rs

1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::ops::Not;
4use std::rc::Rc;
5use std::time::Duration;
6
7use bit_set::BitSet;
8use itertools::Itertools;
9use num::traits::Inv;
10use rtlola_frontend::mir::{
11    ActivationCondition as Activation, InputReference, OutputKind, OutputReference, PacingLocality,
12    PacingType, RtLolaMir, Stream, StreamReference, Task, TimeDrivenStream, Trigger,
13    TriggerReference, WindowReference,
14};
15use uom::si::rational64::Time as UOM_Time;
16use uom::si::time::nanosecond;
17
18use crate::api::monitor::{Change, Instance};
19use crate::closuregen::{CompiledExpr, Expr};
20use crate::monitor::{Parameters, Tracer};
21use crate::schedule::{DynamicSchedule, EvaluationTask};
22use crate::storage::{
23    GlobalStore, InstanceAggregationTrait, InstanceStore, Value, WindowParameterization,
24    WindowParameterizationKind,
25};
26use crate::Time;
27
28/// Describes the activation condition of a stream. If the activation condition is a
/// conjunction, the evaluator uses a bitset representation.
29#[derive(Debug)]
30pub(crate) enum ActivationConditionOp {
    /// Chosen by `EvaluatorData::new` for globally or locally periodic pacing.
31    TimeDriven,
    /// Chosen by `EvaluatorData::new` for constant pacing.
32    True,
    /// Conjunction stored as a bitset.
    // NOTE(review): presumably one bit per input stream index -- confirm against
    // `ActivationConditionOp::new`, which is not part of this excerpt.
33    Conjunction(BitSet),
    /// Carries the MIR activation condition itself.
    // NOTE(review): presumably the fallback for conditions that are not plain conjunctions --
    // confirm against `ActivationConditionOp::new`.
34    General(Activation),
35}
36
/// Owns the complete evaluation state of a specification.
///
/// `into_evaluator` moves this struct to the heap and hands out `'static` references to its
/// fields through [`Evaluator`].
37pub(crate) struct EvaluatorData {
38    // Evaluation order of output streams (result of `ir.get_event_driven_layers()`)
39    layers: Vec<Vec<Task>>,
40    // One entry per output stream, in the order of `ir.outputs`; derived from the eval pacing
41    stream_activation_conditions: Vec<ActivationConditionOp>,
    // Same layout, derived from the spawn pacing
42    spawn_activation_conditions: Vec<ActivationConditionOp>,
    // Same layout, derived from the close pacing
43    close_activation_conditions: Vec<ActivationConditionOp>,
    // Sliding and discrete windows grouped by the stream that calls them
44    stream_windows: HashMap<StreamReference, Vec<WindowReference>>,
    // Instance aggregations grouped by their target stream
45    stream_instance_aggregations: HashMap<StreamReference, Vec<WindowReference>>,
    // Storage created by `GlobalStore::new(&ir)`
46    global_store: GlobalStore,
    // Per-cycle flags; created empty with capacity for all inputs
47    fresh_inputs: BitSet,
    // Per-cycle flags; each created empty with capacity for all outputs
48    fresh_outputs: BitSet,
49    spawned_outputs: BitSet,
50    closed_outputs: BitSet,
51    fresh_triggers: BitSet,
    // Indexed by output index; `Some` only for outputs listed in `ir.triggers`
52    triggers: Vec<Option<Trigger>>,
    // Indexed by output index; `Some` only for outputs listed in `ir.time_driven`
53    time_driven_streams: Vec<Option<TimeDrivenStream>>,
    // Output indices of all streams that have a close condition
54    closing_streams: Vec<OutputReference>,
    // The specification this state was built from
55    ir: RtLolaMir,
    // Schedule handle passed to `new`; stored unchanged
56    dyn_schedule: Rc<RefCell<DynamicSchedule>>,
57}
58
/// Evaluation front end created by `EvaluatorData::into_evaluator`.
///
/// All `'static` references point into the heap-allocated `EvaluatorData` that
/// `into_evaluator` leaks; `raw_data` keeps the pointer to that allocation so that the `Drop`
/// implementation can free it again. The compiled expressions are owned by the evaluator.
59#[allow(missing_debug_implementations)]
60pub(crate) struct Evaluator {
61    // Evaluation order of output streams
62    layers: &'static [Vec<Task>],
63    // Indexed by stream reference.
64    stream_activation_conditions: &'static [ActivationConditionOp],
65    // Indexed by output reference
66    spawn_activation_conditions: &'static [ActivationConditionOp],
67    // Indexed by output reference
68    close_activation_conditions: &'static [ActivationConditionOp],
69    // Accessed by stream index
70    // If Value::None is returned by an expression the filter was false
71    compiled_stream_exprs: Vec<CompiledExpr>,
72    // Accessed by stream index
73    // If Value::None is returned, the spawn condition was false.
74    // If a stream has no spawn target, then an empty tuple is returned if the condition is true
75    compiled_spawn_exprs: Vec<CompiledExpr>,
76    // Accessed by stream index
    // Streams without a close condition get an expression that returns Value::None
77    compiled_close_exprs: Vec<CompiledExpr>,
    // Sliding and discrete windows grouped by calling stream
78    stream_windows: &'static HashMap<StreamReference, Vec<WindowReference>>,
    // Instance aggregations grouped by target stream
79    stream_instance_aggregations: &'static HashMap<StreamReference, Vec<WindowReference>>,
80    global_store: &'static mut GlobalStore,
    // Inputs that received a value in the current cycle (set in `accept_input`)
81    fresh_inputs: &'static mut BitSet,
82    fresh_outputs: &'static mut BitSet,
    // Outputs for which `eval_spawn` created or activated an instance
83    spawned_outputs: &'static mut BitSet,
84    closed_outputs: &'static mut BitSet,
85    fresh_triggers: &'static mut BitSet,
86    // Indexed by output reference
87    triggers: &'static [Option<Trigger>],
88    // Indexed by output reference
89    time_driven_streams: &'static [Option<TimeDrivenStream>],
90
    // Output references of all streams with a close condition
91    closing_streams: &'static [OutputReference],
92    ir: &'static RtLolaMir,
93    dyn_schedule: &'static RefCell<DynamicSchedule>,
    // Pointer to the leaked `EvaluatorData`; turned back into a `Box` when the evaluator is dropped
94    raw_data: *mut EvaluatorData,
95}
96
/// Borrowed view of the evaluator state that is handed to a compiled expression
/// (see `expr.execute(&ctx)` in `eval_spawn`).
97pub(crate) struct EvaluationContext<'e> {
    // Timestamp of the evaluation
98    ts: Time,
99    global_store: &'e GlobalStore,
100    fresh_inputs: &'e BitSet,
101    fresh_outputs: &'e BitSet,
    // Parameters of the stream instance being evaluated; empty for the spawn expression
102    pub(crate) parameter: &'e [Value],
    // NOTE(review): presumably the parameters bound while evaluating an instance aggregation --
    // confirm where this field is set, it is not visible in this excerpt.
103    pub(crate) lambda_parameter: Option<&'e [Value]>,
104}
105
106impl EvaluatorData {
107    pub(crate) fn new(ir: RtLolaMir, dyn_schedule: Rc<RefCell<DynamicSchedule>>) -> Self {
108        // Layers of event based output streams
109        let layers: Vec<Vec<Task>> = ir.get_event_driven_layers();
110        let closing_streams = ir
111            .outputs
112            .iter()
113            .filter(|s| s.close.condition.is_some())
114            .map(|s| s.reference.out_ix())
115            .collect();
116        let stream_acs = ir
117            .outputs
118            .iter()
119            .map(|o| match &o.eval.eval_pacing {
120                PacingType::GlobalPeriodic(_) | PacingType::LocalPeriodic(_) => {
121                    ActivationConditionOp::TimeDriven
122                }
123                PacingType::Event(ac) => ActivationConditionOp::new(ac, ir.inputs.len()),
124                PacingType::Constant => ActivationConditionOp::True,
125            })
126            .collect();
127        let spawn_acs = ir
128            .outputs
129            .iter()
130            .map(|o| match &o.spawn.pacing {
131                PacingType::GlobalPeriodic(_) | PacingType::LocalPeriodic(_) => {
132                    ActivationConditionOp::TimeDriven
133                }
134                PacingType::Event(ac) => ActivationConditionOp::new(ac, ir.inputs.len()),
135                PacingType::Constant => ActivationConditionOp::True,
136            })
137            .collect();
138        let close_acs = ir
139            .outputs
140            .iter()
141            .map(|o| match &o.close.pacing {
142                PacingType::GlobalPeriodic(_) | PacingType::LocalPeriodic(_) => {
143                    ActivationConditionOp::TimeDriven
144                }
145                PacingType::Event(ac) => ActivationConditionOp::new(ac, ir.inputs.len()),
146                PacingType::Constant => ActivationConditionOp::True,
147            })
148            .collect();
149
150        let global_store = GlobalStore::new(&ir);
151        let fresh_inputs = BitSet::with_capacity(ir.inputs.len());
152        let fresh_outputs = BitSet::with_capacity(ir.outputs.len());
153        let spawned_outputs = BitSet::with_capacity(ir.outputs.len());
154        let closed_outputs = BitSet::with_capacity(ir.outputs.len());
155        let fresh_triggers = BitSet::with_capacity(ir.outputs.len());
156        let mut triggers = vec![None; ir.outputs.len()];
157
158        let stream_windows = ir
159            .sliding_windows
160            .iter()
161            .map(|w| (w.caller, w.reference))
162            .chain(ir.discrete_windows.iter().map(|w| (w.caller, w.reference)))
163            .into_group_map();
164
165        let stream_instance_aggregations = ir
166            .instance_aggregations
167            .iter()
168            .map(|ia| (ia.target, ia.reference))
169            .into_group_map();
170
171        for t in &ir.triggers {
172            triggers[t.output_reference.out_ix()] = Some(*t);
173        }
174        let mut time_driven_streams = vec![None; ir.outputs.len()];
175        for t in &ir.time_driven {
176            time_driven_streams[t.reference.out_ix()] = Some(*t);
177        }
178        EvaluatorData {
179            layers,
180            stream_activation_conditions: stream_acs,
181            spawn_activation_conditions: spawn_acs,
182            close_activation_conditions: close_acs,
183            stream_windows,
184            stream_instance_aggregations,
185            global_store,
186            fresh_inputs,
187            fresh_outputs,
188            spawned_outputs,
189            closed_outputs,
190            fresh_triggers,
191            triggers,
192            time_driven_streams,
193            closing_streams,
194            ir,
195            dyn_schedule,
196        }
197    }
198
199    pub(crate) fn into_evaluator(self) -> Evaluator {
200        let mut on_heap = Box::new(self);
201        // Store pointer to data so we can delete it in implementation of Drop trait.
202        // This is necessary since we leak the evaluator data.
203        let heap_ptr: *mut EvaluatorData = &mut *on_heap;
204        let leaked_data: &'static mut EvaluatorData = Box::leak(on_heap);
205
206        //Compile expressions
207        let compiled_stream_exprs = leaked_data
208            .ir
209            .outputs
210            .iter()
211            .map(|o| {
212                let clauses = o
213                    .eval
214                    .clauses
215                    .iter()
216                    .map(|clause| {
217                        let exp = match &clause.condition {
218                        None => clause.expression.clone().compile(),
219                        Some(filter_exp) => CompiledExpr::create_filter(
220                            filter_exp.clone().compile(),
221                            clause.expression.clone().compile(),
222                        ),
223                    };
224                    if clause.pacing != o.eval.eval_pacing {
225                        if let PacingType::Event(ac) = &clause.pacing {
226                        CompiledExpr::create_activation(exp, ActivationConditionOp::new(ac, leaked_data.ir.inputs.len()))
227                        } else {
228                            unreachable!("different pacing types of multiple eval clauses are only supported for event-driven streams. This is ensured by the frontend.")
229                        }
230                    } else {
231                        exp
232                    }
233                })
234                    .collect::<Vec<_>>();
235                if clauses.len() == 1 {
236                    clauses.into_iter().next().expect("has exactly one element")
237                } else {
238                    CompiledExpr::create_clauses(clauses)
239                }
240            })
241            .collect();
242
243        let compiled_spawn_exprs = leaked_data
244            .ir
245            .outputs
246            .iter()
247            .map(
248                |o| match (o.spawn.expression.as_ref(), o.spawn.condition.as_ref()) {
249                    (None, None) => CompiledExpr::new(|_| Value::Tuple(vec![].into_boxed_slice())),
250                    (Some(target), None) => target.clone().compile(),
251                    (None, Some(condition)) => CompiledExpr::create_filter(
252                        condition.clone().compile(),
253                        CompiledExpr::new(|_| Value::Tuple(vec![].into_boxed_slice())),
254                    ),
255                    (Some(target), Some(condition)) => CompiledExpr::create_filter(
256                        condition.clone().compile(),
257                        target.clone().compile(),
258                    ),
259                },
260            )
261            .collect();
262
263        let compiled_close_exprs = leaked_data
264            .ir
265            .outputs
266            .iter()
267            .map(|o| {
268                o.close
269                    .condition
270                    .as_ref()
271                    .map_or(CompiledExpr::new(|_| Value::None), |e| e.clone().compile())
272            })
273            .collect();
274
275        Evaluator {
276            layers: &leaked_data.layers,
277            stream_activation_conditions: &leaked_data.stream_activation_conditions,
278            spawn_activation_conditions: &leaked_data.spawn_activation_conditions,
279            close_activation_conditions: &leaked_data.close_activation_conditions,
280            compiled_stream_exprs,
281            compiled_spawn_exprs,
282            compiled_close_exprs,
283            stream_windows: &leaked_data.stream_windows,
284            stream_instance_aggregations: &leaked_data.stream_instance_aggregations,
285            global_store: &mut leaked_data.global_store,
286            fresh_inputs: &mut leaked_data.fresh_inputs,
287            fresh_outputs: &mut leaked_data.fresh_outputs,
288            spawned_outputs: &mut leaked_data.spawned_outputs,
289            closed_outputs: &mut leaked_data.closed_outputs,
290            fresh_triggers: &mut leaked_data.fresh_triggers,
291            triggers: &leaked_data.triggers,
292            time_driven_streams: &leaked_data.time_driven_streams,
293            closing_streams: &leaked_data.closing_streams,
294            ir: &leaked_data.ir,
295            dyn_schedule: &leaked_data.dyn_schedule,
296            raw_data: heap_ptr,
297        }
298    }
299}
300
301impl Drop for Evaluator {
302    #[allow(unsafe_code)]
303    fn drop(&mut self) {
304        drop(unsafe { Box::from_raw(self.raw_data) });
305    }
306}
307
308impl Evaluator {
309    /// Processes one event. Values of `event` are expected in the order of the input streams;
    /// entries equal to `Value::None` are skipped by `accept_inputs`.
310    /// `ts` should be relative to the starting time of the monitor.
311    pub(crate) fn eval_event(&mut self, event: &[Value], ts: Time, tracer: &mut impl Tracer) {
        // Order matters: start the cycle, store the inputs, then evaluate the event-driven streams.
312        self.new_cycle(ts);
313        self.accept_inputs(event, ts);
314        self.eval_event_driven(ts, tracer);
315    }
316
317    /// NOT for external use because the values are volatile
318    pub(crate) fn peek_fresh_outputs(&self) -> Vec<(OutputReference, Vec<Change>)> {
319        self.ir
320            .outputs
321            .iter()
322            .filter_map(|o| {
323                let stream = o.reference;
324                let out_ix = o.reference.out_ix();
325                let changes = if o.is_parameterized() {
326                    let instances = self.global_store.get_out_instance_collection(out_ix);
327                    instances
328                        .spawned()
329                        .map(|p| Change::Spawn(p.clone()))
330                        .chain(instances.fresh().map(|p| {
331                            Change::Value(
332                                Some(p.clone()),
333                                self.peek_value(stream, p, 0).expect("Marked as fresh"),
334                            )
335                        }))
336                        .chain(instances.closed().map(|p| Change::Close(p.clone())))
337                        .collect()
338                } else if o.is_spawned() {
339                    let mut res = Vec::new();
340                    if self.spawned_outputs.contains(out_ix) {
341                        res.push(Change::Spawn(vec![]));
342                    }
343                    if self.fresh_outputs.contains(out_ix) {
344                        res.push(Change::Value(
345                            Some(vec![]),
346                            self.peek_value(stream, &[], 0).expect("Marked as fresh"),
347                        ));
348                    }
349                    if self.closed_outputs.contains(out_ix) {
350                        res.push(Change::Close(vec![]));
351                    }
352                    res
353                } else if self.fresh_outputs.contains(out_ix) {
354                    vec![Change::Value(
355                        None,
356                        self.peek_value(stream, &[], 0).expect("Marked as fresh"),
357                    )]
358                } else {
359                    vec![]
360                };
361                changes
362                    .is_empty()
363                    .not()
364                    .then(|| (o.reference.out_ix(), changes))
365            })
366            .collect()
367    }
368
369    /// NOT for external use because the values are volatile
370    pub(crate) fn peek_violated_triggers_messages(
371        &self,
372    ) -> Vec<(OutputReference, Parameters, String)> {
373        self.peek_fresh_outputs()
374            .into_iter()
375            .filter(|(o_ref, _)| matches!(self.ir.outputs[*o_ref].kind, OutputKind::Trigger(_)))
376            .flat_map(|(o_ref, changes)| {
377                changes.into_iter().filter_map(move |change| match change {
378                    Change::Value(parameters, Value::Str(msg)) => {
379                        Some((o_ref, parameters, msg.into()))
380                    }
381                    Change::Value(_, _) => {
382                        unreachable!("trigger values are strings; checked by the frontend")
383                    }
384                    _ => None,
385                })
386            })
387            .collect()
388    }
389
390    /// NOT for external use because the values are volatile
391    pub(crate) fn peek_violated_triggers(&self) -> Vec<TriggerReference> {
392        self.fresh_triggers.iter().collect()
393    }
394
395    /// NOT for external use because the values are volatile
396    pub(crate) fn peek_fresh_input(&self) -> Vec<(InputReference, Value)> {
397        self.fresh_inputs
398            .iter()
399            .map(|i| {
400                (
401                    i,
402                    self.peek_value(StreamReference::In(i), &[], 0)
403                        .expect("Marked as fresh"),
404                )
405            })
406            .collect()
407    }
408
409    /// NOT for external use because the values are volatile
410    pub(crate) fn peek_inputs(&self) -> Vec<Option<Value>> {
411        self.ir
412            .inputs
413            .iter()
414            .map(|elem| self.peek_value(elem.reference, &[], 0))
415            .collect()
416    }
417
418    /// NOT for external use because the values are volatile
419    pub(crate) fn peek_outputs(&self) -> Vec<Vec<Instance>> {
420        self.ir
421            .outputs
422            .iter()
423            .map(|elem| {
424                if elem.is_parameterized() {
425                    let ix = elem.reference.out_ix();
426                    let values: Vec<Instance> = self
427                        .global_store
428                        .get_out_instance_collection(ix)
429                        .all_parameter()
430                        .map(|para| {
431                            (
432                                Some(para.clone()),
433                                self.peek_value(elem.reference, para.as_ref(), 0),
434                            )
435                        })
436                        .collect();
437                    values
438                } else if elem.is_spawned() {
439                    vec![(Some(vec![]), self.peek_value(elem.reference, &[], 0))]
440                } else {
441                    vec![(None, self.peek_value(elem.reference, &[], 0))]
442                }
443            })
444            .collect()
445    }
446
447    fn accept_inputs(&mut self, event: &[Value], ts: Time) {
448        for (ix, v) in event.iter().enumerate() {
449            match v {
450                Value::None => {}
451                v => self.accept_input(ix, v.clone(), ts),
452            }
453        }
454    }
455
456    fn accept_input(&mut self, input: InputReference, v: Value, ts: Time) {
457        self.global_store
458            .get_in_instance_mut(input)
459            .push_value(v.clone());
460        self.fresh_inputs.insert(input);
461        let extended = &self.ir.inputs[input];
462        for (_sr, _origin, win) in extended.aggregated_by.iter().filter(|(_, _, w)| {
463            matches!(
464                w,
465                WindowReference::Sliding(_) | WindowReference::Discrete(_)
466            )
467        }) {
468            self.extend_window(&[], *win, v.clone(), ts);
469        }
470    }
471
472    fn eval_event_driven(&mut self, ts: Time, tracer: &mut impl Tracer) {
473        self.prepare_evaluation(ts);
474        for layer in self.layers {
475            self.eval_event_driven_layer(layer, ts, tracer);
476        }
477        for close in self.closing_streams {
478            let ac = &self.close_activation_conditions[*close];
479            if ac.is_eventdriven() && ac.eval(self.fresh_inputs) {
480                self.eval_close_instances(*close, ts, tracer);
481            }
482        }
483    }
484
485    fn eval_event_driven_layer(&mut self, tasks: &[Task], ts: Time, tracer: &mut impl Tracer) {
486        for task in tasks {
487            match task {
488                Task::Evaluate(idx) => self.eval_event_driven_output(*idx, ts, tracer),
489                Task::Spawn(idx) => self.eval_event_driven_spawn(*idx, ts, tracer),
490                Task::Close(_) => unreachable!("closes are not included in evaluation layer"),
491            }
492        }
493    }
494
495    fn eval_spawn(&mut self, output: OutputReference, ts: Time) {
496        let stream = self.ir.output(StreamReference::Out(output));
497        debug_assert!(
498            stream.is_spawned(),
499            "tried to spawn stream that should not be spawned"
500        );
501
502        let expr = self.compiled_spawn_exprs[output].clone();
503        let parameter = vec![];
504        let ctx = self.as_EvaluationContext(&parameter, ts);
505        let res = expr.execute(&ctx);
506
507        let parameter_values = match res {
508            Value::None => return, // spawn condition evaluated to false
509            Value::Tuple(paras) => paras.to_vec(),
510            x => vec![x],
511        };
512
513        if stream.is_parameterized() {
514            debug_assert!(!parameter_values.is_empty());
515            let instances = self.global_store.get_out_instance_collection_mut(output);
516            if instances.contains(parameter_values.as_slice()) {
517                // instance already exists -> nothing to do
518                return;
519            }
520            instances.create_instance(parameter_values.as_slice());
521        } else {
522            debug_assert!(parameter_values.is_empty());
523            let inst = self.global_store.get_out_instance_mut(output);
524            if inst.is_active() {
525                // instance already exists -> nothing to do
526                return;
527            }
528            inst.activate();
529        }
530
531        self.spawn_windows(output, parameter_values.as_slice(), ts);
532
533        // Schedule instance evaluation if stream is periodic
534        if let Some(tds) = self.time_driven_streams[output] {
535            let mut schedule = (*self.dyn_schedule).borrow_mut();
536
537            // Schedule eval if it has local pacing
538            if tds.locality == PacingLocality::Local {
539                schedule.schedule_evaluation(
540                    output,
541                    parameter_values.as_slice(),
542                    ts,
543                    tds.period_in_duration(),
544                );
545            }
546
547            // Schedule close if it has local pacing
548            if let PacingType::LocalPeriodic(f) = stream.close.pacing {
549                let period = Duration::from_nanos(
550                    UOM_Time::new::<uom::si::time::second>(
551                        f.get::<uom::si::frequency::hertz>().inv(),
552                    )
553                    .get::<nanosecond>()
554                    .to_integer()
555                    .try_into()
556                    .expect("Period [ns] too large for u64!"),
557                );
558                schedule.schedule_close(output, parameter_values.as_slice(), ts, period);
559            }
560        }
561
562        self.spawned_outputs.insert(output);
563    }
564
565    fn spawn_windows(&mut self, stream: OutputReference, parameter_values: &[Value], ts: Time) {
        // Sets up the windows affected by spawning the instance `parameter_values` of `stream`
        // at time `ts`. Two groups are handled:
        //   1. windows the stream calls itself (looked up in `stream_windows`),
        //   2. sliding/discrete windows listed in the stream's `aggregated_by`, i.e. windows
        //      whose target is this stream.
        // What happens per window depends on its parameterization kind and on whether it runs
        // on a global clock (`global == true`).
566        let stream = &self.ir.outputs[stream];
        // Copy the window list so that no borrow of the lookup table is held across the loop.
567        let own_windows: Vec<WindowReference> = self
568            .stream_windows
569            .get(&stream.reference)
570            .map(|windows| windows.to_vec())
571            .unwrap_or_default();
572
573        // Activate the windows this stream is the caller of
574        for win_ref in own_windows {
575            let WindowParameterization { kind, global } = self.window_parameterization(win_ref);
576            let target = self.ir.window(win_ref).target();
577            // Self is the caller of the window
578            match (kind, global) {
579                (WindowParameterizationKind::None | WindowParameterizationKind::Caller, true) => {
580                    // Only a single window with a global clock exists. nothing to do...
581                }
582                (WindowParameterizationKind::None, false) => {
583                    // Caller is spawned but not parameterized
584                    // activate single window now
585                    let (inst, fresh) = match target {
586                        StreamReference::In(ix) => (
587                            self.global_store.get_in_instance(ix),
588                            self.fresh_inputs.contains(ix),
589                        ),
590                        StreamReference::Out(ix) => (
591                            self.global_store.get_out_instance(ix),
592                            self.fresh_outputs.contains(ix),
593                        ),
594                    };
                    // Read the newest target value only if the target is marked fresh in this cycle.
595                    let target_value = fresh.then(|| inst.get_value(0).unwrap());
596                    let window = self.global_store.get_window_mut(win_ref);
597
                    // An already active window is left untouched.
598                    if !window.is_active() {
599                        window.activate(ts);
600                        if let Some(val) = target_value {
601                            window.accept_value(val, ts);
602                        }
603                    }
604                }
605                (WindowParameterizationKind::Target, false) => {
606                    // Caller is spawned and Target is parameterized; window evaluates at local clock
607                    // activate all windows registered now
608                    let fresh_values = self
609                        .global_store
610                        .get_out_instance_collection(target.out_ix())
611                        .fresh_values();
612                    self.global_store
613                        .get_window_collection_mut(win_ref)
614                        .activate_all(fresh_values, ts, ts);
615                }
616                (WindowParameterizationKind::Caller, false) => {
617                    // Caller is parameterized and windows are evaluated at local clock
618                    // Create and activate window instance of the spawned instance
619                    let (inst, fresh) = match target {
620                        StreamReference::In(ix) => (
621                            self.global_store.get_in_instance(ix),
622                            self.fresh_inputs.contains(ix),
623                        ),
624                        StreamReference::Out(ix) => (
625                            self.global_store.get_out_instance(ix),
626                            self.fresh_outputs.contains(ix),
627                        ),
628                    };
                    // Same freshness rule as in the unparameterized case above.
629                    let target_value = fresh.then(|| inst.get_value(0).unwrap());
630                    let windows = self.global_store.get_window_collection_mut(win_ref);
631                    let window = windows.get_or_create(parameter_values, ts);
632                    // Check if target of window produced a value in this iteration and add the value
633                    if !window.is_active() {
634                        window.activate(ts);
635                        if let Some(val) = target_value {
636                            window.accept_value(val, ts);
637                        }
638                    }
639                }
640                (WindowParameterizationKind::Both, false) => {
641                    // target and caller are parameterized and windows are evaluated at local clock
642                    // create and activate a new set of sliding windows over all available target instances
643                    let fresh_values = self
644                        .global_store
645                        .get_out_instance_collection(target.out_ix())
646                        .fresh_values();
647                    let windows = self
648                        .global_store
649                        .get_two_layer_window_collection_mut(win_ref);
650                    windows.spawn_caller_instance(fresh_values, parameter_values, ts, ts);
651                }
652                (WindowParameterizationKind::Both | WindowParameterizationKind::Target, true) => {
653                    // target and caller are parameterized and windows are evaluated at global clock.
654                    // nothing to do?
655                }
656            }
657        }
658
659        // Create the windows that aggregate over this stream
660        for (_, _origin, win_ref) in stream.aggregated_by.iter().filter(|(_, _, w)| {
661            matches!(
662                w,
663                WindowReference::Sliding(_) | WindowReference::Discrete(_)
664            )
665        }) {
666            let WindowParameterization { kind, global } = self.window_parameterization(*win_ref);
667            // Self is target of the window
668            match (kind, global) {
                // Windows that are not parameterized over the target need no per-instance setup.
669                (WindowParameterizationKind::Caller | WindowParameterizationKind::None, _) => {}
670                (WindowParameterizationKind::Both | WindowParameterizationKind::Target, true) => {
                    // NOTE(review): `Both` is served from the plain window collection here, while
                    // the local-clock `Both` case below uses the two-layer collection -- confirm
                    // that this is intended.
671                    let windows = self.global_store.get_window_collection_mut(*win_ref);
672                    let window = windows.get_or_create(parameter_values, ts);
673                    // Window is not activated by caller so we assume it to have existed since the beginning.
674                    window.activate(Time::default());
675                }
676                (WindowParameterizationKind::Target, false) => {
677                    let windows = self.global_store.get_window_collection_mut(*win_ref);
678                    windows.create_window(parameter_values, ts);
679                    // Window is activated by caller at spawn
680                }
681                (WindowParameterizationKind::Both, false) => {
682                    let windows = self
683                        .global_store
684                        .get_two_layer_window_collection_mut(*win_ref);
685                    windows.spawn_target_instance(parameter_values);
686                    // Windows are activated by caller at spawn
687                }
688            }
689        }
690    }
691
692    fn eval_close(&mut self, output: OutputReference, parameter: &[Value], ts: Time) {
693        let stream = self.ir.output(StreamReference::Out(output));
694
695        let expr = self.compiled_close_exprs[output].clone();
696        let ctx = self.as_EvaluationContext(parameter, ts);
697        let res = expr.execute(&ctx);
698        if !res.as_bool() {
699            return;
700        }
701
702        let own_windows: Vec<WindowReference> = self
703            .stream_windows
704            .get(&stream.reference)
705            .map(|windows| windows.to_vec())
706            .unwrap_or_default();
707        if stream.is_parameterized() {
708            // mark instance for closing
709            self.global_store
710                .get_out_instance_collection_mut(output)
711                .mark_for_deletion(parameter);
712
713            for win_ref in own_windows {
714                // Self is caller of the window
715                match self.window_parameterization(win_ref).kind {
716                    WindowParameterizationKind::None | WindowParameterizationKind::Target => {
717                        // Do nothing! closing is handled by target
718                    }
719                    WindowParameterizationKind::Caller => {
720                        self.global_store
721                            .get_window_collection_mut(win_ref)
722                            .delete_window(parameter);
723                    }
724                    WindowParameterizationKind::Both => {
725                        self.global_store
726                            .get_two_layer_window_collection_mut(win_ref)
727                            .close_caller_instance(parameter);
728                    }
729                }
730            }
731
732            // close all windows referencing this instance
733            for (_, _origin, win_ref) in stream.aggregated_by.iter().filter(|(_, _, w)| {
734                matches!(
735                    w,
736                    WindowReference::Sliding(_) | WindowReference::Discrete(_)
737                )
738            }) {
739                // Self is target of the window
740                match self.window_parameterization(*win_ref).kind {
741                    WindowParameterizationKind::None | WindowParameterizationKind::Caller => {
742                        unreachable!()
743                    }
744                    WindowParameterizationKind::Target => {
745                        self.global_store
746                            .get_window_collection_mut(*win_ref)
747                            .schedule_deletion(parameter, ts);
748                    }
749                    WindowParameterizationKind::Both => {
750                        self.global_store
751                            .get_two_layer_window_collection_mut(*win_ref)
752                            .close_target_instance(parameter, ts);
753                    }
754                }
755            }
756        } else {
757            for win_ref in own_windows {
758                // Self is caller of the window
759                match self.window_parameterization(win_ref).kind {
760                    WindowParameterizationKind::None => {
761                        self.global_store.get_window_mut(win_ref).deactivate();
762                    }
763                    WindowParameterizationKind::Target => {
764                        self.global_store
765                            .get_window_collection_mut(win_ref)
766                            .deactivate_all();
767                    }
768                    WindowParameterizationKind::Caller | WindowParameterizationKind::Both => {
769                        unreachable!("Parameters are empty")
770                    }
771                }
772            }
773        }
774        self.closed_outputs.insert(output);
775
776        // Remove instance evaluation from schedule if stream is periodic
777        if let Some(tds) = self.time_driven_streams[output] {
778            let mut schedule = (*self.dyn_schedule).borrow_mut();
779            schedule.remove_evaluation(output, parameter, tds.period_in_duration());
780
781            // Remove close from schedule if it depends on current instance
782            if let PacingType::LocalPeriodic(f) = stream.close.pacing {
783                let period = Duration::from_nanos(
784                    UOM_Time::new::<uom::si::time::second>(
785                        f.get::<uom::si::frequency::hertz>().inv(),
786                    )
787                    .get::<nanosecond>()
788                    .to_integer()
789                    .try_into()
790                    .expect("Period [ns] too large for u64!"),
791                );
792                schedule.remove_close(output, parameter, period);
793            }
794        }
795    }
796
797    /// Closes all streams marked for deletion
798    fn close_streams(&mut self) {
799        for o in self.closed_outputs.iter() {
800            if self.ir.output(StreamReference::Out(o)).is_parameterized() {
801                let vals = self
802                    .global_store
803                    .get_out_instance_collection_mut(o)
804                    .delete_instances();
805                if let Some(wrefs) = self
806                    .stream_instance_aggregations
807                    .get(&StreamReference::Out(o))
808                {
809                    wrefs.iter().for_each(|aggr| {
810                        let inst = self.global_store.get_instance_aggregation_mut(*aggr);
811                        vals.iter().for_each(|v| {
812                            inst.remove_value(v.clone());
813                        })
814                    })
815                }
816            } else {
817                self.global_store.get_out_instance_mut(o).deactivate();
818            }
819        }
820    }
821
822    fn eval_event_driven_spawn(
823        &mut self,
824        output: OutputReference,
825        ts: Time,
826        tracer: &mut impl Tracer,
827    ) {
828        if self.spawn_activation_conditions[output].eval(self.fresh_inputs) {
829            tracer.spawn_start(output);
830            self.eval_spawn(output, ts);
831            tracer.spawn_end(output);
832        }
833    }
834
835    fn eval_stream_instances(
836        &mut self,
837        output: OutputReference,
838        ts: Time,
839        tracer: &mut impl Tracer,
840    ) {
841        if self
842            .ir
843            .output(StreamReference::Out(output))
844            .is_parameterized()
845        {
846            let parameter: Vec<Vec<Value>> = self
847                .global_store
848                .get_out_instance_collection(output)
849                .all_parameter()
850                .cloned()
851                .collect();
852            for instance in parameter {
853                tracer.instance_eval_start(output, instance.as_slice());
854                self.eval_stream_instance(output, instance.as_slice(), ts);
855                tracer.instance_eval_end(output, instance.as_slice());
856            }
857        } else if self.global_store.get_out_instance(output).is_active() {
858            tracer.instance_eval_start(output, &[]);
859            self.eval_stream_instance(output, &[], ts);
860            tracer.instance_eval_end(output, &[]);
861        }
862    }
863
864    fn eval_close_instances(
865        &mut self,
866        output: OutputReference,
867        ts: Time,
868        tracer: &mut impl Tracer,
869    ) {
870        if self
871            .ir
872            .output(StreamReference::Out(output))
873            .is_parameterized()
874        {
875            let parameter: Vec<Vec<Value>> = self
876                .global_store
877                .get_out_instance_collection(output)
878                .all_parameter()
879                .cloned()
880                .collect();
881            for instance in parameter {
882                tracer.close_start(output, instance.as_slice());
883                self.eval_close(output, instance.as_slice(), ts);
884                tracer.close_end(output, instance.as_slice());
885            }
886        } else if self.global_store.get_out_instance(output).is_active() {
887            tracer.close_start(output, &[]);
888            self.eval_close(output, &[], ts);
889            tracer.close_end(output, &[]);
890        }
891    }
892
893    fn eval_event_driven_output(
894        &mut self,
895        output: OutputReference,
896        ts: Time,
897        tracer: &mut impl Tracer,
898    ) {
899        if self.stream_activation_conditions[output].eval(self.fresh_inputs) {
900            self.eval_stream_instances(output, ts, tracer)
901        }
902    }
903
904    /// Time is expected to be relative to the start of the monitor
905    pub(crate) fn eval_time_driven_tasks(
906        &mut self,
907        tasks: Vec<EvaluationTask>,
908        ts: Time,
909        tracer: &mut impl Tracer,
910    ) {
911        if tasks.is_empty() {
912            return;
913        }
914        self.new_cycle(ts);
915        self.prepare_evaluation(ts);
916        for task in tasks {
917            match task {
918                EvaluationTask::Evaluate(idx, parameter) => {
919                    tracer.instance_eval_start(idx, parameter.as_slice());
920                    self.eval_stream_instance(idx, parameter.as_slice(), ts);
921                    tracer.instance_eval_end(idx, parameter.as_slice());
922                }
923                EvaluationTask::EvaluateInstances(idx) => {
924                    self.eval_stream_instances(idx, ts, tracer);
925                }
926                EvaluationTask::Spawn(idx) => {
927                    tracer.spawn_start(idx);
928                    self.eval_spawn(idx, ts);
929                    tracer.spawn_end(idx);
930                }
931                EvaluationTask::Close(idx, parameter) => {
932                    tracer.close_start(idx, parameter.as_slice());
933                    self.eval_close(idx, parameter.as_slice(), ts);
934                    tracer.close_end(idx, parameter.as_slice());
935                }
936                EvaluationTask::CloseInstances(idx) => {
937                    self.eval_close_instances(idx, ts, tracer);
938                }
939            }
940        }
941    }
942
943    fn prepare_evaluation(&mut self, ts: Time) {
944        // We need to copy the references first because updating needs exclusive access to `self`.
945        let windows = &self.ir.sliding_windows;
946        for win in windows {
947            let WindowParameterization { kind, .. } = self.window_parameterization(win.reference);
948            match kind {
949                WindowParameterizationKind::None => {
950                    let window = self.global_store.get_window_mut(win.reference);
951                    if window.is_active() {
952                        window.update(ts);
953                    }
954                }
955                WindowParameterizationKind::Caller | WindowParameterizationKind::Target => {
956                    self.global_store
957                        .get_window_collection_mut(win.reference)
958                        .update_all(ts);
959                }
960                WindowParameterizationKind::Both => {
961                    self.global_store
962                        .get_two_layer_window_collection_mut(win.reference)
963                        .update_all(ts);
964                }
965            }
966        }
967    }
968
969    fn eval_stream_instance(&mut self, output: OutputReference, parameter: &[Value], ts: Time) {
970        let ix = output;
971
972        let expr = self.compiled_stream_exprs[ix].clone();
973        let ctx = self.as_EvaluationContext(parameter, ts);
974        let res = expr.execute(&ctx);
975
976        // Filter evaluated to false
977        if let Value::None = res {
978            return;
979        }
980
981        let is_parameterized = self.ir.outputs[ix].is_parameterized();
982        // Register value in global store.
983        let instance = if is_parameterized {
984            self.global_store
985                .get_out_instance_collection_mut(output)
986                .instance_mut(parameter)
987                .expect("tried to eval non existing instance")
988        } else {
989            self.global_store.get_out_instance_mut(output)
990        };
991        let old_value = instance.get_value(0);
992        instance.push_value(res.clone());
993        self.fresh_outputs.insert(ix);
994
995        if let Some(trigger) = self.is_trigger(output) {
996            self.fresh_triggers.insert(trigger.trigger_reference);
997        }
998
999        // Update Instance aggregations
1000        if let Some(aggrs) = self
1001            .stream_instance_aggregations
1002            .get(&StreamReference::Out(output))
1003        {
1004            aggrs.iter().for_each(|w| {
1005                let aggr = self.global_store.get_instance_aggregation_mut(*w);
1006                if let Some(old) = old_value.clone() {
1007                    aggr.remove_value(old);
1008                }
1009                aggr.accept_value(res.clone());
1010            });
1011        }
1012
1013        // Check linked windows and inform them.
1014        let extended = &self.ir.outputs[ix];
1015        for (_sr, _origin, win) in extended.aggregated_by.iter().filter(|(_, _, w)| {
1016            matches!(
1017                w,
1018                WindowReference::Sliding(_) | WindowReference::Discrete(_)
1019            )
1020        }) {
1021            self.extend_window(parameter, *win, res.clone(), ts);
1022        }
1023    }
1024
1025    fn window_parameterization(&self, win: WindowReference) -> WindowParameterization {
1026        self.global_store.window_parameterization(win)
1027    }
1028
1029    fn extend_window(
1030        &mut self,
1031        own_parameter: &[Value],
1032        win: WindowReference,
1033        value: Value,
1034        ts: Time,
1035    ) {
1036        match self.window_parameterization(win).kind {
1037            WindowParameterizationKind::None => self
1038                .global_store
1039                .get_window_mut(win)
1040                .accept_value(value, ts),
1041            WindowParameterizationKind::Caller => self
1042                .global_store
1043                .get_window_collection_mut(win)
1044                .accept_value_all(value, ts),
1045            WindowParameterizationKind::Target => self
1046                .global_store
1047                .get_window_collection_mut(win)
1048                .window_mut(own_parameter)
1049                .expect("tried to extend non existing window")
1050                .accept_value(value, ts),
1051            WindowParameterizationKind::Both => self
1052                .global_store
1053                .get_two_layer_window_collection_mut(win)
1054                .accept_value(own_parameter, value, ts),
1055        }
1056    }
1057
1058    /// Marks a new evaluation cycle
1059    fn new_cycle(&mut self, ts: Time) {
1060        self.close_streams();
1061        self.fresh_inputs.clear();
1062        self.fresh_outputs.clear();
1063        self.fresh_triggers.clear();
1064
1065        self.spawned_outputs.clear();
1066        self.closed_outputs.clear();
1067        self.global_store.new_cycle(ts);
1068    }
1069
1070    fn is_trigger(&self, ix: OutputReference) -> Option<&Trigger> {
1071        self.triggers[ix].as_ref()
1072    }
1073
1074    fn peek_value(&self, sr: StreamReference, args: &[Value], offset: i16) -> Option<Value> {
1075        match sr {
1076            StreamReference::In(ix) => {
1077                assert!(args.is_empty());
1078                self.global_store.get_in_instance(ix).get_value(offset)
1079            }
1080            StreamReference::Out(ix) => {
1081                if self.ir.stream(sr).is_parameterized() {
1082                    assert!(!args.is_empty());
1083                    self.global_store
1084                        .get_out_instance_collection(ix)
1085                        .instance(args)
1086                        .and_then(|i| i.get_value(offset))
1087                } else {
1088                    self.global_store.get_out_instance(ix).get_value(offset)
1089                }
1090            }
1091        }
1092    }
1093
1094    #[allow(non_snake_case)]
1095    fn as_EvaluationContext<'a>(
1096        &'a mut self,
1097        parameter: &'a [Value],
1098        ts: Time,
1099    ) -> EvaluationContext<'a> {
1100        EvaluationContext {
1101            ts,
1102            global_store: self.global_store,
1103            fresh_inputs: self.fresh_inputs,
1104            fresh_outputs: self.fresh_outputs,
1105            parameter,
1106            lambda_parameter: None,
1107        }
1108    }
1109}
1110
1111impl EvaluationContext<'_> {
1112    pub(crate) fn lookup_latest(&self, stream_ref: StreamReference, parameter: &[Value]) -> Value {
1113        match stream_ref {
1114            StreamReference::In(ix) => self
1115                .global_store
1116                .get_in_instance(ix)
1117                .get_value(0)
1118                .unwrap_or(Value::None),
1119            StreamReference::Out(ix) => {
1120                if parameter.is_empty() {
1121                    self.global_store
1122                        .get_out_instance(ix)
1123                        .get_value(0)
1124                        .unwrap_or(Value::None)
1125                } else {
1126                    self.global_store
1127                        .get_out_instance_collection(ix)
1128                        .instance(parameter)
1129                        .and_then(|i| i.get_value(0))
1130                        .unwrap_or(Value::None)
1131                }
1132            }
1133        }
1134    }
1135
1136    pub(crate) fn lookup_latest_check(
1137        &self,
1138        stream_ref: StreamReference,
1139        parameter: &[Value],
1140    ) -> Value {
1141        let inst = match stream_ref {
1142            StreamReference::In(ix) => {
1143                debug_assert!(self.fresh_inputs.contains(ix), "ix={}", ix);
1144                self.global_store.get_in_instance(ix)
1145            }
1146            StreamReference::Out(ix) => {
1147                debug_assert!(self.fresh_outputs.contains(ix), "ix={}", ix);
1148                if parameter.is_empty() {
1149                    self.global_store.get_out_instance(ix)
1150                } else {
1151                    self.global_store
1152                        .get_out_instance_collection(ix)
1153                        .instance(parameter)
1154                        .expect("tried to sync access non existing instance")
1155                }
1156            }
1157        };
1158        inst.get_value(0).unwrap_or(Value::None)
1159    }
1160
1161    fn get_instance_and_fresh(
1162        &self,
1163        stream_ref: StreamReference,
1164        parameter: &[Value],
1165    ) -> (Option<&InstanceStore>, bool) {
1166        match stream_ref {
1167            StreamReference::In(ix) => (
1168                Some(self.global_store.get_in_instance(ix)),
1169                self.fresh_inputs.contains(ix),
1170            ),
1171            StreamReference::Out(ix) => {
1172                if parameter.is_empty() {
1173                    (
1174                        Some(self.global_store.get_out_instance(ix)),
1175                        self.fresh_outputs.contains(ix),
1176                    )
1177                } else {
1178                    let collection = self.global_store.get_out_instance_collection(ix);
1179                    (
1180                        collection.instance(parameter),
1181                        collection.is_fresh(parameter),
1182                    )
1183                }
1184            }
1185        }
1186    }
1187
1188    pub(crate) fn lookup_fresh(&self, stream_ref: StreamReference, parameter: &[Value]) -> Value {
1189        let (_, fresh) = self.get_instance_and_fresh(stream_ref, parameter);
1190        Value::Bool(fresh)
1191    }
1192
1193    pub(crate) fn lookup_with_offset(
1194        &self,
1195        stream_ref: StreamReference,
1196        parameter: &[Value],
1197        offset: i16,
1198    ) -> Value {
1199        let (inst, fresh) = self.get_instance_and_fresh(stream_ref, parameter);
1200        let inst = inst.expect("target stream instance to exist for sync access");
1201        if fresh {
1202            inst.get_value(offset).unwrap_or(Value::None)
1203        } else {
1204            inst.get_value(offset + 1).unwrap_or(Value::None)
1205        }
1206    }
1207
1208    pub(crate) fn lookup_current(&self, stream_ref: StreamReference, parameter: &[Value]) -> Value {
1209        let (inst, fresh) = self.get_instance_and_fresh(stream_ref, parameter);
1210        if fresh {
1211            inst.expect("fresh instance to exist")
1212                .get_value(0)
1213                .expect("fresh stream to have a value.")
1214        } else {
1215            Value::None
1216        }
1217    }
1218
1219    /// Immediately evaluates the instance aggregation given all instances.
1220    pub(crate) fn lookup_instance_aggr(&self, window_reference: WindowReference) -> Value {
1221        if let WindowReference::Instance(idx) = window_reference {
1222            let aggr = &self.global_store.instance_aggregations[idx];
1223            let target = &self.global_store.p_outputs
1224                [self.global_store.stream_index_map[aggr.target.out_ix()]];
1225            aggr.get_value_with_ctx(target, self)
1226        } else {
1227            unreachable!("Called update_instance_aggregation for non instance");
1228        }
1229    }
1230
1231    pub(crate) fn lookup_window(
1232        &self,
1233        window_ref: WindowReference,
1234        target_parameter: &[Value],
1235    ) -> Value {
1236        let parameterization = self.global_store.window_parameterization(window_ref).kind;
1237        match parameterization {
1238            WindowParameterizationKind::None => {
1239                self.global_store.get_window(window_ref).get_value(self.ts)
1240            }
1241            WindowParameterizationKind::Caller => self
1242                .global_store
1243                .get_window_collection(window_ref)
1244                .window(self.parameter)
1245                .expect("Own window to exist")
1246                .get_value(self.ts),
1247            WindowParameterizationKind::Target => {
1248                let window_collection = self.global_store.get_window_collection(window_ref);
1249                let window = window_collection.window(target_parameter);
1250                if let Some(w) = window {
1251                    w.get_value(self.ts)
1252                } else {
1253                    window_collection.default_value(self.ts)
1254                }
1255            }
1256            WindowParameterizationKind::Both => {
1257                let collection = self
1258                    .global_store
1259                    .get_two_layer_window_collection(window_ref);
1260                let window = collection.window(target_parameter, self.parameter);
1261                if let Some(w) = window {
1262                    w.get_value(self.ts)
1263                } else {
1264                    collection.default_value(self.ts)
1265                }
1266            }
1267        }
1268    }
1269
1270    pub(crate) fn is_active(&self, ac: &ActivationConditionOp) -> bool {
1271        ac.eval(self.fresh_inputs)
1272    }
1273
1274    pub(crate) fn with_new_instance<'a>(&'a self, inst: &'a Vec<Value>) -> EvaluationContext<'a> {
1275        let EvaluationContext {
1276            ts,
1277            global_store,
1278            fresh_inputs,
1279            fresh_outputs,
1280            parameter,
1281            lambda_parameter: _,
1282        } = self;
1283        EvaluationContext {
1284            ts: *ts,
1285            global_store,
1286            fresh_inputs,
1287            fresh_outputs,
1288            parameter,
1289            lambda_parameter: Some(inst),
1290        }
1291    }
1292}
1293
1294impl ActivationConditionOp {
1295    fn new(ac: &Activation, n_inputs: usize) -> Self {
1296        use ActivationConditionOp::*;
1297        if let Activation::True = ac {
1298            // special case for constant output streams
1299            return True;
1300        }
1301        if let Activation::Conjunction(vec) = ac {
1302            assert!(!vec.is_empty());
1303            let ixs: Vec<usize> = vec
1304                .iter()
1305                .flat_map(|ac| {
1306                    if let Activation::Stream(var) = ac {
1307                        Some(var.in_ix())
1308                    } else {
1309                        None
1310                    }
1311                })
1312                .collect();
1313            if vec.len() == ixs.len() {
1314                // fast path for conjunctive activation conditions
1315                let mut bs = BitSet::with_capacity(n_inputs);
1316                for ix in ixs {
1317                    bs.insert(ix);
1318                }
1319                return Conjunction(bs);
1320            }
1321        }
1322        General(ac.clone())
1323    }
1324
1325    pub(crate) fn eval(&self, inputs: &BitSet) -> bool {
1326        use ActivationConditionOp::*;
1327        match self {
1328            True => true,
1329            Conjunction(bs) => bs.is_subset(inputs),
1330            General(ac) => Self::eval_(ac, inputs),
1331            TimeDriven => unreachable!(),
1332        }
1333    }
1334
1335    fn eval_(ac: &Activation, inputs: &BitSet) -> bool {
1336        use Activation::*;
1337        match ac {
1338            Stream(var) => inputs.contains(var.in_ix()),
1339            Conjunction(vec) => vec.iter().all(|ac| Self::eval_(ac, inputs)),
1340            Disjunction(vec) => vec.iter().any(|ac| Self::eval_(ac, inputs)),
1341            True => unreachable!(),
1342        }
1343    }
1344
1345    fn is_eventdriven(&self) -> bool {
1346        !matches!(self, ActivationConditionOp::TimeDriven)
1347    }
1348}
1349
1350#[cfg(test)]
1351mod tests {
1352
1353    use std::time::Duration;
1354
1355    use ordered_float::{Float, NotNan};
1356    use rtlola_frontend::ParserConfig;
1357
1358    use super::*;
1359    use crate::monitor::NoTracer;
1360    use crate::schedule::dynamic_schedule::*;
1361    use crate::storage::Value::*;
1362
1363    fn setup(spec: &str) -> (RtLolaMir, EvaluatorData, Duration) {
1364        let cfg = ParserConfig::for_string(spec.to_string());
1365        let handler = rtlola_frontend::Handler::from(&cfg);
1366        let ir = rtlola_frontend::parse(&cfg).unwrap_or_else(|e| {
1367            handler.emit_error(&e);
1368            panic!();
1369        });
1370        let dyn_schedule = Rc::new(RefCell::new(DynamicSchedule::new()));
1371        let now = Duration::ZERO;
1372        let eval = EvaluatorData::new(ir.clone(), dyn_schedule);
1373        (ir, eval, now)
1374    }
1375
1376    fn setup_time(spec: &str) -> (RtLolaMir, EvaluatorData, Time) {
1377        let (ir, eval, _) = setup(spec);
1378        (ir, eval, Time::default())
1379    }
1380
1381    macro_rules! assert_float_eq {
1382        ($left:expr, $right:expr) => {
1383            if let Float(left) = $left {
1384                if let Float(right) = $right {
1385                    assert!(
1386                        (left - right).abs() < f64::epsilon(),
1387                        "Assertion failed: Difference between {} and {} is greater than {}",
1388                        left,
1389                        right,
1390                        f64::epsilon()
1391                    );
1392                } else {
1393                    panic!("{:?} is not a float.", $right)
1394                }
1395            } else {
1396                panic!("{:?} is not a float.", $left)
1397            }
1398        };
1399    }
1400
1401    macro_rules! eval_stream_instances {
1402        ($eval:expr, $start:expr, $ix:expr) => {
1403            $eval.eval_event_driven_output($ix.out_ix(), $start, &mut NoTracer::default());
1404        };
1405    }
1406
1407    macro_rules! eval_stream_instances_timed {
1408        ($eval:expr, $time:expr, $ix:expr) => {
1409            $eval.prepare_evaluation($time);
1410            $eval.eval_event_driven_output($ix.out_ix(), $time, &mut NoTracer::default());
1411        };
1412    }
1413
1414    macro_rules! eval_stream {
1415        ($eval:expr, $start:expr, $ix:expr, $parameter:expr) => {
1416            $eval.eval_stream_instance($ix, $parameter.as_slice(), $start);
1417        };
1418    }
1419
1420    macro_rules! spawn_stream {
1421        ($eval:expr, $start:expr, $ix:expr) => {
1422            $eval.eval_event_driven_spawn($ix.out_ix(), $start, &mut NoTracer::default());
1423        };
1424    }
1425
1426    macro_rules! spawn_stream_timed {
1427        ($eval:expr, $time:expr, $ix:expr) => {
1428            $eval.eval_event_driven_spawn($ix.out_ix(), $time, &mut NoTracer::default());
1429        };
1430    }
1431
1432    macro_rules! eval_close {
1433        ($eval:expr, $start:expr, $ix:expr, $parameter:expr) => {
1434            $eval.eval_close($ix.out_ix(), $parameter.as_slice(), $start);
1435            $eval.close_streams();
1436        };
1437    }
1438
1439    macro_rules! eval_close_timed {
1440        ($eval:expr, $time:expr, $ix:expr, $parameter:expr) => {
1441            $eval.eval_close($ix.out_ix(), $parameter.as_slice(), $time);
1442            $eval.close_streams();
1443        };
1444    }
1445
1446    macro_rules! stream_has_instance {
1447        ($eval:expr, $ix:expr, $parameter:expr) => {
1448            if $parameter.is_empty() {
1449                $eval
1450                    .global_store
1451                    .get_out_instance($ix.out_ix())
1452                    .is_active()
1453            } else {
1454                $eval
1455                    .global_store
1456                    .get_out_instance_collection($ix.out_ix())
1457                    .contains($parameter.as_slice())
1458            }
1459        };
1460    }
1461
1462    macro_rules! eval_stream_timed {
1463        ($eval:expr, $ix:expr, $parameter:expr, $time:expr) => {
1464            $eval.prepare_evaluation($time);
1465            $eval.eval_stream_instance($ix, $parameter.as_slice(), $time);
1466        };
1467    }
1468
1469    macro_rules! accept_input {
1470        ($eval:expr, $start:expr, $str_ref:expr, $v:expr) => {
1471            $eval.accept_input($str_ref.in_ix(), $v.clone(), $start);
1472        };
1473    }
1474
1475    macro_rules! accept_input_timed {
1476        ($eval:expr, $str_ref:expr, $v:expr, $time:expr) => {
1477            $eval.accept_input($str_ref.in_ix(), $v.clone(), $time);
1478        };
1479    }
1480
1481    macro_rules! peek_assert_eq {
1482        ($eval:expr, $start:expr, $ix:expr, $parameter:expr, $value:expr) => {
1483            eval_stream!($eval, $start, $ix, $parameter);
1484            assert_eq!(
1485                $eval
1486                    .peek_value(StreamReference::Out($ix), $parameter.as_slice(), 0)
1487                    .unwrap(),
1488                $value
1489            );
1490        };
1491    }
1492
1493    #[test]
1494    fn test_const_output_literals() {
1495        let (_, eval, start) = setup(
1496            r#"
1497        input i_0: UInt8
1498
1499        output o_0: Bool @i_0 := true
1500        output o_1: UInt8 @i_0 := 3
1501        output o_2: Int8 @i_0 := -5
1502        output o_3: Float32 @i_0 := -123.456
1503        output o_4: String @i_0 := "foobar"
1504        "#,
1505        );
1506        let mut eval = eval.into_evaluator();
1507        let sr = StreamReference::In(0);
1508        let v = Unsigned(3);
1509        accept_input!(eval, start, sr, v);
1510        peek_assert_eq!(eval, start, 0, vec![], Bool(true));
1511        peek_assert_eq!(eval, start, 1, vec![], Unsigned(3));
1512        peek_assert_eq!(eval, start, 2, vec![], Signed(-5));
1513        peek_assert_eq!(eval, start, 3, vec![], Value::try_from(-123.456).unwrap());
1514        peek_assert_eq!(eval, start, 4, vec![], Str("foobar".into()));
1515    }
1516
1517    #[test]
1518    fn test_const_output_arithlog() {
1519        let (_, eval, start) = setup(
1520            r#"
1521        input i_0: Int8
1522
1523        output o_0:   Bool @i_0 := !false
1524        output o_1:   Bool @i_0 := !true
1525        output o_2:  UInt8 @i_0 := 8 + 3
1526        output o_3:  UInt8 @i_0 := 8 - 3
1527        output o_4:  UInt8 @i_0 := 8 * 3
1528        output o_5:  UInt8 @i_0 := 8 / 3
1529        output o_6:  UInt8 @i_0 := 8 % 3
1530        output o_7:  UInt8 @i_0 := 8 ** 3
1531        output o_8:   Bool @i_0 := false || false
1532        output o_9:   Bool @i_0 := false || true
1533        output o_10:  Bool @i_0 := true  || false
1534        output o_11:  Bool @i_0 := true  || true
1535        output o_12:  Bool @i_0 := false && false
1536        output o_13:  Bool @i_0 := false && true
1537        output o_14:  Bool @i_0 := true  && false
1538        output o_15:  Bool @i_0 := true  && true
1539        output o_16:  Bool @i_0 := 0 < 1
1540        output o_17:  Bool @i_0 := 0 < 0
1541        output o_18:  Bool @i_0 := 1 < 0
1542        output o_19:  Bool @i_0 := 0 <= 1
1543        output o_20:  Bool @i_0 := 0 <= 0
1544        output o_21:  Bool @i_0 := 1 <= 0
1545        output o_22:  Bool @i_0 := 0 >= 1
1546        output o_23:  Bool @i_0 := 0 >= 0
1547        output o_24:  Bool @i_0 := 1 >= 0
1548        output o_25:  Bool @i_0 := 0 > 1
1549        output o_26:  Bool @i_0 := 0 > 0
1550        output o_27:  Bool @i_0 := 1 > 0
1551        output o_28:  Bool @i_0 := 0 == 0
1552        output o_29:  Bool @i_0 := 0 == 1
1553        output o_30:  Bool @i_0 := 0 != 0
1554        output o_31:  Bool @i_0 := 0 != 1
1555        "#,
1556        );
1557        let mut eval = eval.into_evaluator();
1558        let sr = StreamReference::In(0);
1559        let v = Unsigned(3);
1560        accept_input!(eval, start, sr, v);
1561        peek_assert_eq!(eval, start, 0, vec![], Bool(!false));
1562        peek_assert_eq!(eval, start, 1, vec![], Bool(!true));
1563        peek_assert_eq!(eval, start, 2, vec![], Unsigned(8 + 3));
1564        peek_assert_eq!(eval, start, 3, vec![], Unsigned(8 - 3));
1565        peek_assert_eq!(eval, start, 4, vec![], Unsigned(8 * 3));
1566        peek_assert_eq!(eval, start, 5, vec![], Unsigned(8 / 3));
1567        peek_assert_eq!(eval, start, 6, vec![], Unsigned(8 % 3));
1568        peek_assert_eq!(eval, start, 7, vec![], Unsigned(8 * 8 * 8));
1569        peek_assert_eq!(eval, start, 8, vec![], Bool(false || false));
1570        peek_assert_eq!(eval, start, 9, vec![], Bool(false || true));
1571        peek_assert_eq!(eval, start, 10, vec![], Bool(true || false));
1572        peek_assert_eq!(eval, start, 11, vec![], Bool(true || true));
1573        peek_assert_eq!(eval, start, 12, vec![], Bool(false && false));
1574        peek_assert_eq!(eval, start, 13, vec![], Bool(false && true));
1575        peek_assert_eq!(eval, start, 14, vec![], Bool(true && false));
1576        peek_assert_eq!(eval, start, 15, vec![], Bool(true && true));
1577        peek_assert_eq!(eval, start, 16, vec![], Bool(0 < 1));
1578        peek_assert_eq!(eval, start, 17, vec![], Bool(0 < 0));
1579        peek_assert_eq!(eval, start, 18, vec![], Bool(1 < 0));
1580        peek_assert_eq!(eval, start, 19, vec![], Bool(0 <= 1));
1581        peek_assert_eq!(eval, start, 20, vec![], Bool(0 <= 0));
1582        peek_assert_eq!(eval, start, 21, vec![], Bool(1 <= 0));
1583        peek_assert_eq!(eval, start, 22, vec![], Bool(0 >= 1));
1584        peek_assert_eq!(eval, start, 23, vec![], Bool(0 >= 0));
1585        peek_assert_eq!(eval, start, 24, vec![], Bool(1 >= 0));
1586        peek_assert_eq!(eval, start, 25, vec![], Bool(0 > 1));
1587        peek_assert_eq!(eval, start, 26, vec![], Bool(0 > 0));
1588        peek_assert_eq!(eval, start, 27, vec![], Bool(1 > 0));
1589        peek_assert_eq!(eval, start, 28, vec![], Bool(0 == 0));
1590        peek_assert_eq!(eval, start, 29, vec![], Bool(0 == 1));
1591        peek_assert_eq!(eval, start, 30, vec![], Bool(0 != 0));
1592        peek_assert_eq!(eval, start, 31, vec![], Bool(0 != 1));
1593    }
1594
1595    #[test]
1596    fn test_input_only() {
1597        let (_, eval, start) = setup("input a: UInt8");
1598        let mut eval = eval.into_evaluator();
1599        let sr = StreamReference::In(0);
1600        let v = Unsigned(3);
1601        accept_input!(eval, start, sr, v);
1602        assert_eq!(eval.peek_value(sr, &Vec::new(), 0).unwrap(), v)
1603    }
1604
1605    #[test]
1606    fn test_sync_lookup() {
1607        let (_, eval, start) = setup("input a: UInt8 output b: UInt8 := a output c: UInt8 := b");
1608        let mut eval = eval.into_evaluator();
1609        let out_ref_0 = StreamReference::Out(0);
1610        let out_ref_1 = StreamReference::Out(1);
1611        let in_ref = StreamReference::In(0);
1612        let v = Unsigned(9);
1613        accept_input!(eval, start, in_ref, v);
1614        eval_stream!(eval, start, 0, vec![]);
1615        eval_stream!(eval, start, 1, vec![]);
1616        assert_eq!(eval.peek_value(out_ref_0, &Vec::new(), 0).unwrap(), v);
1617        assert_eq!(eval.peek_value(out_ref_1, &Vec::new(), 0).unwrap(), v)
1618    }
1619
1620    #[test]
1621    fn test_oob_lookup() {
1622        let (_, eval, start) =
1623            setup("input a: UInt8\noutput b := a.offset(by: -1).defaults(to: 3)\noutput x: UInt8 @5Hz := b.hold().defaults(to: 3)");
1624        let mut eval = eval.into_evaluator();
1625        let out_ref = StreamReference::Out(1);
1626        let in_ref = StreamReference::In(0);
1627        let v1 = Unsigned(1);
1628        accept_input!(eval, start, in_ref, v1);
1629        eval_stream!(eval, start, 0, vec![]);
1630        eval_stream!(eval, start, 1, vec![]);
1631        assert_eq!(
1632            eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
1633            Unsigned(3)
1634        );
1635    }
1636
1637    #[test]
1638    fn test_output_lookup() {
1639        let (_, eval, start) = setup(
1640            "input a: UInt8\n\
1641                    output mirror: UInt8 := a\n\
1642                    output mirror_offset := mirror.offset(by: -1).defaults(to: 5)\n\
1643                    output c: UInt8 @5Hz := mirror.hold().defaults(to: 8)\n\
1644                    output d: UInt8 @5Hz := mirror_offset.hold().defaults(to: 3)",
1645        );
1646        let mut eval = eval.into_evaluator();
1647        let out_ref = StreamReference::Out(2);
1648        let in_ref = StreamReference::In(0);
1649        let v1 = Unsigned(1);
1650        let v2 = Unsigned(2);
1651        accept_input!(eval, start, in_ref, v1);
1652        eval_stream!(eval, start, 0, vec![]);
1653        eval_stream!(eval, start, 1, vec![]);
1654        accept_input!(eval, start, in_ref, v2);
1655        eval_stream!(eval, start, 0, vec![]);
1656        eval_stream!(eval, start, 1, vec![]);
1657        eval_stream!(eval, start, 2, vec![]);
1658        assert_eq!(
1659            eval.peek_value(StreamReference::Out(0), &Vec::new(), 0)
1660                .unwrap(),
1661            v2
1662        );
1663        assert_eq!(
1664            eval.peek_value(StreamReference::Out(1), &Vec::new(), 0)
1665                .unwrap(),
1666            v1
1667        );
1668        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), v2);
1669    }
1670
1671    #[test]
1672    fn test_get_fresh_lookup() {
1673        let (_, eval, start) = setup(
1674            "input a: UInt8\n\
1675                    input b: UInt8\n\
1676                    output mirror: UInt8 := a\n\
1677                    output mirror_parameter (p)\n
1678                        spawn with a\n\
1679                        eval with a
1680                    output g1: UInt8 @a := mirror.get().defaults(to: 8)\n\
1681                    output g2: UInt8 @a := mirror_parameter(a).get().defaults(to: 8)\n\
1682                    output f1: Bool @a := mirror.is_fresh()\n\
1683                    output f2: Bool @a := mirror_parameter(a).is_fresh()",
1684        );
1685        let mut eval = eval.into_evaluator();
1686        let a = StreamReference::In(0);
1687        let mirror_p = StreamReference::Out(1);
1688        let g1 = StreamReference::Out(2);
1689        let g2 = StreamReference::Out(3);
1690        let f1 = StreamReference::Out(4);
1691        let f2 = StreamReference::Out(5);
1692
1693        let v1 = Unsigned(1);
1694        let t = Bool(true);
1695
1696        accept_input!(eval, start, a, v1);
1697        eval_stream!(eval, start, 0, vec![]);
1698        spawn_stream!(eval, start, mirror_p);
1699        eval_stream_instances!(eval, start, mirror_p);
1700
1701        eval_stream!(eval, start, 2, vec![]);
1702        eval_stream!(eval, start, 3, vec![]);
1703        eval_stream!(eval, start, 4, vec![]);
1704        eval_stream!(eval, start, 5, vec![]);
1705
1706        assert_eq!(eval.peek_value(g1, &Vec::new(), 0).unwrap(), v1);
1707        assert_eq!(eval.peek_value(g2, &Vec::new(), 0).unwrap(), v1);
1708
1709        assert_eq!(eval.peek_value(f1, &Vec::new(), 0).unwrap(), t);
1710        assert_eq!(eval.peek_value(f2, &Vec::new(), 0).unwrap(), t);
1711    }
1712
1713    #[test]
1714    fn test_get_fresh_lookup_fail() {
1715        let (_, eval, start) = setup(
1716            "input a: UInt8\n\
1717                    input b: UInt8\n\
1718                    output mirror: UInt8 := a\n\
1719                    output mirror_parameter (p)\n
1720                        spawn with a\n\
1721                        eval with a
1722                    output g1: UInt8 @b := mirror.get().defaults(to: 8)\n\
1723                    output g2: UInt8 @b := mirror_parameter(b).get().defaults(to: 8)\n\
1724                    output f1: Bool @b := mirror.is_fresh()\n\
1725                    output f2: Bool @b := mirror_parameter(b).is_fresh()",
1726        );
1727        let mut eval = eval.into_evaluator();
1728        let b = StreamReference::In(1);
1729        let g1 = StreamReference::Out(2);
1730        let g2 = StreamReference::Out(3);
1731        let f1 = StreamReference::Out(4);
1732        let f2 = StreamReference::Out(5);
1733
1734        let v2 = Unsigned(2);
1735        let d = Unsigned(8);
1736        let f = Bool(false);
1737
1738        accept_input!(eval, start, b, v2);
1739
1740        eval_stream!(eval, start, 2, vec![]);
1741        eval_stream!(eval, start, 3, vec![]);
1742        eval_stream!(eval, start, 4, vec![]);
1743        eval_stream!(eval, start, 5, vec![]);
1744
1745        assert_eq!(eval.peek_value(g1, &Vec::new(), 0).unwrap(), d);
1746        assert_eq!(eval.peek_value(g2, &Vec::new(), 0).unwrap(), d);
1747
1748        assert_eq!(eval.peek_value(f1, &Vec::new(), 0).unwrap(), f);
1749        assert_eq!(eval.peek_value(f2, &Vec::new(), 0).unwrap(), f);
1750    }
1751
1752    #[test]
1753    fn test_conversion_if() {
1754        let (_, eval, start) =
1755            setup("input a: UInt8\noutput b: UInt16 := widen<UInt16>(if true then a else a[-1].defaults(to: 0))");
1756        let mut eval = eval.into_evaluator();
1757        let out_ref = StreamReference::Out(0);
1758        let in_ref = StreamReference::In(0);
1759        let v1 = Unsigned(1);
1760        accept_input!(eval, start, in_ref, v1);
1761        eval_stream!(eval, start, 0, vec![]);
1762        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), v1);
1763    }
1764
1765    #[test]
1766    #[ignore] // See issue #32 in LolaParser.
1767    fn test_conversion_lookup() {
1768        let (_, eval, start) = setup("input a: UInt8\noutput b: UInt32 := a + 100000");
1769        let mut eval = eval.into_evaluator();
1770        let out_ref = StreamReference::Out(0);
1771        let in_ref = StreamReference::In(0);
1772        let expected = Unsigned(7 + 100000);
1773        let v1 = Unsigned(7);
1774        accept_input!(eval, start, in_ref, v1);
1775        eval_stream!(eval, start, 0, vec![]);
1776        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1777    }
1778
1779    #[test]
1780    fn test_bin_op() {
1781        let (_, eval, start) =
1782            setup("input a: UInt16\n input b: UInt16\noutput c: UInt16 := a + b");
1783        let mut eval = eval.into_evaluator();
1784        let out_ref = StreamReference::Out(0);
1785        let a = StreamReference::In(0);
1786        let b = StreamReference::In(1);
1787        let v1 = Unsigned(1);
1788        let v2 = Unsigned(2);
1789        let expected = Unsigned(1 + 2);
1790        accept_input!(eval, start, a, v1);
1791        accept_input!(eval, start, b, v2);
1792        eval_stream!(eval, start, 0, vec![]);
1793        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1794    }
1795
1796    #[test]
1797    fn test_bin_op_float() {
1798        let (_, eval, start) =
1799            setup("input a: Float64\n input b: Float64\noutput c: Float64 := a + b");
1800        let mut eval = eval.into_evaluator();
1801        let out_ref = StreamReference::Out(0);
1802        let a = StreamReference::In(0);
1803        let b = StreamReference::In(1);
1804        let v1 = Float(NotNan::new(3.5f64).unwrap());
1805        let v2 = Float(NotNan::new(39.347568f64).unwrap());
1806        let expected = Float(NotNan::new(3.5f64 + 39.347568f64).unwrap());
1807        accept_input!(eval, start, a, v1);
1808        accept_input!(eval, start, b, v2);
1809        eval_stream!(eval, start, 0, vec![]);
1810        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1811    }
1812
1813    #[test]
1814    fn test_bin_tuple() {
1815        let (_, eval, start) = setup(
1816            "input a: Int32\n input b: Bool\noutput c := (a, b) output d := c.0 output e := c.1",
1817        );
1818        let mut eval = eval.into_evaluator();
1819        let out_ref = StreamReference::Out(0);
1820        let out_ref0 = StreamReference::Out(1);
1821        let out_ref1 = StreamReference::Out(2);
1822        let a = StreamReference::In(0);
1823        let b = StreamReference::In(1);
1824        let v1 = Signed(1);
1825        let v2 = Bool(true);
1826        let expected = Tuple(Box::new([v1.clone(), v2.clone()]));
1827        accept_input!(eval, start, a, v1);
1828        accept_input!(eval, start, b, v2);
1829        eval_stream!(eval, start, 0, vec![]);
1830        eval_stream!(eval, start, 1, vec![]);
1831        eval_stream!(eval, start, 2, vec![]);
1832        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1833        assert_eq!(eval.peek_value(out_ref0, &Vec::new(), 0).unwrap(), v1);
1834        assert_eq!(eval.peek_value(out_ref1, &Vec::new(), 0).unwrap(), v2);
1835    }
1836
1837    #[test]
1838    fn test_regular_lookup() {
1839        let (_, eval, start) =
1840            setup("input a: UInt8 output b := a.offset(by: -1).defaults(to: 5) output x: UInt8 @5Hz := b.hold().defaults(to: 3)");
1841        let mut eval = eval.into_evaluator();
1842        let out_ref = StreamReference::Out(1);
1843        let in_ref = StreamReference::In(0);
1844        let v1 = Unsigned(1);
1845        let v2 = Unsigned(2);
1846        let v3 = Unsigned(3);
1847        accept_input!(eval, start, in_ref, v1);
1848        accept_input!(eval, start, in_ref, v2);
1849        accept_input!(eval, start, in_ref, v3);
1850        eval_stream!(eval, start, 0, vec![]);
1851        eval_stream!(eval, start, 1, vec![]);
1852        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), v2)
1853    }
1854
1855    #[ignore] // triggers no longer store values
1856    #[test]
1857    fn test_trigger() {
1858        let (_, eval, start) =
1859            setup("input a: UInt8 output b := a.offset(by: -1) output x: UInt8 @5Hz := b.hold().defaults(to: 3)\n trigger x > 4");
1860        let mut eval = eval.into_evaluator();
1861        let out_ref = StreamReference::Out(1);
1862        let trig_ref = StreamReference::Out(2);
1863        let in_ref = StreamReference::In(0);
1864        let v1 = Unsigned(8);
1865        eval_stream!(eval, start, 0, vec![]);
1866        eval_stream!(eval, start, 1, vec![]);
1867        eval_stream!(eval, start, 2, vec![]);
1868        assert_eq!(
1869            eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
1870            Unsigned(3)
1871        );
1872        assert_eq!(
1873            eval.peek_value(trig_ref, &Vec::new(), 0).unwrap(),
1874            Bool(false)
1875        );
1876        accept_input!(eval, start, in_ref, v1);
1877        eval_stream!(eval, start, 0, vec![]);
1878        eval_stream!(eval, start, 1, vec![]);
1879        eval_stream!(eval, start, 2, vec![]);
1880        assert_eq!(
1881            eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
1882            Unsigned(3)
1883        );
1884        assert_eq!(
1885            eval.peek_value(trig_ref, &Vec::new(), 0).unwrap(),
1886            Bool(false)
1887        );
1888        accept_input!(eval, start, in_ref, Unsigned(17));
1889        eval_stream!(eval, start, 0, vec![]);
1890        eval_stream!(eval, start, 1, vec![]);
1891        eval_stream!(eval, start, 2, vec![]);
1892        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), v1);
1893        assert_eq!(
1894            eval.peek_value(trig_ref, &Vec::new(), 0).unwrap(),
1895            Bool(true)
1896        );
1897    }
1898
1899    #[test]
1900    fn test_sum_window() {
1901        let (_, eval, mut time) = setup_time(
1902            "input a: Int16\noutput b: Int16 @0.25Hz := a.aggregate(over: 40s, using: sum)",
1903        );
1904        let mut eval = eval.into_evaluator();
1905        time += Duration::from_secs(45);
1906        let out_ref = StreamReference::Out(0);
1907        let in_ref = StreamReference::In(0);
1908        let n = 25;
1909        for v in 1..=n {
1910            accept_input_timed!(eval, in_ref, Signed(v), time);
1911            time += Duration::from_secs(1);
1912        }
1913        time += Duration::from_secs(1);
1914        // 71 secs have passed. All values should be within the window.
1915        eval_stream_timed!(eval, 0, vec![], time);
1916        let expected = Signed((n * n + n) / 2);
1917        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1918    }
1919
1920    #[test]
1921    fn test_count_window() {
1922        let (_, eval, mut time) = setup_time(
1923            "input a: UInt16\noutput b: UInt16 @0.25Hz := a.aggregate(over: 40s, using: count)",
1924        );
1925        let mut eval = eval.into_evaluator();
1926        time += Duration::from_secs(45);
1927        let out_ref = StreamReference::Out(0);
1928        let in_ref = StreamReference::In(0);
1929        let n = 25;
1930        for v in 1..=n {
1931            accept_input_timed!(eval, in_ref, Unsigned(v), time);
1932            time += Duration::from_secs(1);
1933        }
1934        time += Duration::from_secs(1);
1935        // 71 secs have passed. All values should be within the window.
1936        eval_stream_timed!(eval, 0, vec![], time);
1937        let expected = Unsigned(n);
1938        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1939    }
1940
1941    #[test]
1942    fn test_average_window() {
1943        let (_, eval, mut time) = setup_time(
1944            "input a: Float32\noutput b @0.25Hz := a.aggregate(over: 40s, using: average).defaults(to: -3.0)",
1945        );
1946        let mut eval = eval.into_evaluator();
1947        time += Duration::from_secs(45);
1948        let out_ref = StreamReference::Out(0);
1949        let in_ref = StreamReference::In(0);
1950
1951        // No time has passed. No values should be within the window. We should see the default value.
1952        eval_stream_timed!(eval, 0, vec![], time);
1953        let expected = Value::try_from(-3.0).unwrap();
1954        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1955
1956        let n = 25;
1957        for v in 1..=n {
1958            accept_input_timed!(eval, in_ref, Value::try_from(v as f64).unwrap(), time);
1959            time += Duration::from_secs(1);
1960        }
1961        time += Duration::from_secs(1);
1962
1963        // 71 secs have passed. All values should be within the window.
1964        eval_stream_timed!(eval, 0, vec![], time);
1965        let n = n as f64;
1966        let expected = Value::try_from(((n * n + n) / 2.0) / 25.0).unwrap();
1967        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1968    }
1969
    /// Walks a 3s `sum` window (evaluated at 2Hz) through six seconds of simulated time.
    ///
    /// Inputs 0, 1, 2 and 3 are accepted at +0s, +1.1s, +3.04s and +5.11s relative to the
    /// initial `time`; the output is evaluated every 500ms and compared against the sum of
    /// the inputs the test expects to still be covered by the window at that point.
    #[test]
    fn test_window_correct_bucketing() {
        let (_, eval, mut time) =
            setup_time("input a: Float32\noutput b @2Hz := a.aggregate(over: 3s, using: sum)");
        let mut eval = eval.into_evaluator();
        let out_ref = StreamReference::Out(0);
        let in_ref = StreamReference::In(0);

        // +0s: first input, value 0.
        accept_input_timed!(eval, in_ref, Value::try_from(0 as f64).unwrap(), time);

        // +0.5s: only the 0 has been seen.
        time += Duration::from_millis(500);
        eval_stream_timed!(eval, 0, vec![], time);
        let expected = Value::try_from(0.0).unwrap();
        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);

        // +1.0s
        time += Duration::from_millis(500);
        eval_stream_timed!(eval, 0, vec![], time);
        let expected = Value::try_from(0.0).unwrap();
        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);

        // The clock has reached +1s.

        // +1.1s: second input, value 1.
        time += Duration::from_millis(100);
        accept_input_timed!(eval, in_ref, Value::try_from(1 as f64).unwrap(), time);

        // +1.5s: 0 + 1
        time += Duration::from_millis(400);
        eval_stream_timed!(eval, 0, vec![], time);
        let expected = Value::try_from(1.0).unwrap();
        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);

        // +2.0s
        time += Duration::from_millis(500);
        eval_stream_timed!(eval, 0, vec![], time);
        let expected = Value::try_from(1.0).unwrap();
        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);

        // The clock has reached +2s.

        // +2.5s
        time += Duration::from_millis(500);
        eval_stream_timed!(eval, 0, vec![], time);
        let expected = Value::try_from(1.0).unwrap();
        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);

        // +3.0s
        time += Duration::from_millis(500);
        eval_stream_timed!(eval, 0, vec![], time);
        let expected = Value::try_from(1.0).unwrap();
        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);

        // The clock has reached +3s.

        // +3.04s: third input, value 2.
        time += Duration::from_millis(40);
        accept_input_timed!(eval, in_ref, Value::try_from(2 as f64).unwrap(), time);

        // +3.5s: the input from +0s is more than 3s old; 1 + 2 remain.
        time += Duration::from_millis(460);
        eval_stream_timed!(eval, 0, vec![], time);
        let expected = Value::try_from(3.0).unwrap();
        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);

        // +4.0s: the input from +1.1s is 2.9s old and still counted.
        time += Duration::from_millis(500);
        eval_stream_timed!(eval, 0, vec![], time);
        let expected = Value::try_from(3.0).unwrap();
        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);

        // The clock has reached +4s.

        // +4.5s: the input from +1.1s is now 3.4s old; only the 2 remains.
        time += Duration::from_millis(500);
        eval_stream_timed!(eval, 0, vec![], time);
        let expected = Value::try_from(2.0).unwrap();
        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);

        // +5.0s
        time += Duration::from_millis(500);
        eval_stream_timed!(eval, 0, vec![], time);
        let expected = Value::try_from(2.0).unwrap();
        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);

        // The clock has reached +5s.

        // +5.11s: fourth input, value 3.
        time += Duration::from_millis(110);
        accept_input_timed!(eval, in_ref, Value::try_from(3 as f64).unwrap(), time);

        // +5.5s: 2 + 3
        time += Duration::from_millis(390);
        eval_stream_timed!(eval, 0, vec![], time);
        let expected = Value::try_from(5.0).unwrap();
        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);

        // +6.0s: the input from +3.04s is 2.96s old and still counted.
        time += Duration::from_millis(500);
        eval_stream_timed!(eval, 0, vec![], time);
        let expected = Value::try_from(5.0).unwrap();
        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
    }
2059
2060    #[test]
2061    fn test_integral_window() {
2062        let (_, eval, mut time) = setup_time(
2063            "input a: Float64\noutput b: Float64 @0.25Hz := a.aggregate(over_exactly: 40s, using: integral).defaults(to: -3.0)",
2064        );
2065        let mut eval = eval.into_evaluator();
2066        time += Duration::from_secs(45);
2067        let out_ref = StreamReference::Out(0);
2068        let in_ref = StreamReference::In(0);
2069
2070        fn mv(f: f64) -> Value {
2071            Float(NotNan::new(f).unwrap())
2072        }
2073
2074        accept_input_timed!(eval, in_ref, mv(1f64), time);
2075        time += Duration::from_secs(2);
2076        accept_input_timed!(eval, in_ref, mv(5f64), time);
2077        // Value so far: (1+5) / 2 * 2 = 6
2078        time += Duration::from_secs(5);
2079        accept_input_timed!(eval, in_ref, mv(25f64), time);
2080        // Value so far: 6 + (5+25) / 2 * 5 = 6 + 75 = 81
2081        time += Duration::from_secs(1);
2082        accept_input_timed!(eval, in_ref, mv(0f64), time);
2083        // Value so far: 81 + (25+0) / 2 * 1 = 81 + 12.5 = 93.5
2084        time += Duration::from_secs(10);
2085        accept_input_timed!(eval, in_ref, mv(-40f64), time);
2086        // Value so far: 93.5 + (0+(-40)) / 2 * 10 = 93.5 - 200 = -106.5
2087        // Time passed: 2 + 5 + 1 + 10 = 18.
2088
2089        eval_stream_timed!(eval, 0, vec![], time);
2090
2091        let expected = Float(NotNan::new(-106.5).unwrap());
2092        assert_eq!(
2093            eval.peek_value(out_ref, vec![].as_slice(), 0).unwrap(),
2094            expected
2095        );
2096    }
2097
2098    #[test]
2099    fn test_integral_window2() {
2100        fn mv(f: f64) -> Value {
2101            Float(NotNan::new(f).unwrap())
2102        }
2103
2104        let (_, eval, mut time) =
2105            setup_time("input a : Int64\noutput b@1Hz := a.aggregate(over: 5s, using: integral)");
2106        let mut eval = eval.into_evaluator();
2107        let out_ref = StreamReference::Out(0);
2108        let in_ref = StreamReference::In(0);
2109
2110        accept_input_timed!(eval, in_ref, mv(0f64), time);
2111
2112        time += Duration::from_secs(1);
2113        accept_input_timed!(eval, in_ref, mv(8f64), time);
2114        eval_stream_timed!(eval, 0, vec![], time);
2115        let expected = Float(NotNan::new(4.0).unwrap());
2116        assert_eq!(
2117            eval.peek_value(out_ref, vec![].as_slice(), 0).unwrap(),
2118            expected
2119        );
2120    }
2121
2122    #[test]
2123    fn test_window_type_count() {
2124        let (_, eval, start) =
2125            setup("input a: Int32\noutput b @ 10Hz := a.aggregate(over: 0.1s, using: count)");
2126        let mut eval = eval.into_evaluator();
2127        let out_ref = StreamReference::Out(0);
2128        let _a = StreamReference::In(0);
2129        let expected = Unsigned(0);
2130        eval_stream!(eval, start, 0, vec![]);
2131        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2132    }
2133
2134    #[test]
2135    fn test_sum_window_discrete() {
2136        let (_, eval, mut time) = setup_time(
2137            "input a: Int16\noutput b: Int16 := a.aggregate(over_discrete: 6, using: sum)",
2138        );
2139        let mut eval = eval.into_evaluator();
2140        time += Duration::from_secs(45);
2141        let out_ref = StreamReference::Out(0);
2142        let in_ref = StreamReference::In(0);
2143        let n = 25;
2144        for v in 1..=n {
2145            accept_input_timed!(eval, in_ref, Signed(v), time);
2146            time += Duration::from_secs(1);
2147        }
2148        time += Duration::from_secs(1);
2149        // 71 secs have passed. All values should be within the window.
2150        eval_stream_timed!(eval, 0, vec![], time);
2151        let expected = Signed(135);
2152        //assert_eq!(eval.peek_value(in_ref, &Vec::new(), -1).unwrap(), Signed(24));
2153        assert_eq!(eval.peek_value(in_ref, &Vec::new(), 0).unwrap(), Signed(25));
2154        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2155    }
2156
2157    // New windows tests
2158    #[test]
2159    fn test_last_window_float() {
2160        let (_, eval, mut time) = setup_time(
2161            "input a: Float32\noutput b: Float32 @1Hz:= a.aggregate(over: 5s, using: last).defaults(to:0.0)",
2162        );
2163        let mut eval = eval.into_evaluator();
2164        time += Duration::from_secs(45);
2165        let out_ref = StreamReference::Out(0);
2166        let in_ref = StreamReference::In(0);
2167        let n = 25;
2168        for v in 1..=n {
2169            accept_input_timed!(eval, in_ref, Float(NotNan::new(v as f64).unwrap()), time);
2170            time += Duration::from_secs(1);
2171        }
2172        time += Duration::from_secs(1);
2173        // 71 secs have passed. All values should be within the window.
2174        eval_stream_timed!(eval, 0, vec![], time);
2175        let expected = Float(NotNan::new(25.0).unwrap());
2176        assert_eq!(
2177            eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2178            Float(NotNan::new(25.0).unwrap())
2179        );
2180        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2181    }
2182
2183    #[test]
2184    fn test_last_window_signed() {
2185        let (_, eval, mut time) =
2186            setup_time("input a: Int32\noutput b: Int32 @1Hz:= a.aggregate(over: 20s, using: last).defaults(to:0)");
2187        let mut eval = eval.into_evaluator();
2188        time += Duration::from_secs(45);
2189        let out_ref = StreamReference::Out(0);
2190        let in_ref = StreamReference::In(0);
2191        let n = 25;
2192        for v in 1..=n {
2193            accept_input_timed!(eval, in_ref, Signed(v), time);
2194            time += Duration::from_secs(1);
2195        }
2196        time += Duration::from_secs(1);
2197        // 71 secs have passed. All values should be within the window.
2198        eval_stream_timed!(eval, 0, vec![], time);
2199        let expected = Signed(25);
2200        assert_eq!(eval.peek_value(in_ref, &Vec::new(), 0).unwrap(), Signed(25));
2201        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2202    }
2203
2204    #[test]
2205    fn test_last_window_unsigned() {
2206        let (_, eval, mut time) =
2207            setup_time("input a: UInt32\noutput b: UInt32 @1Hz:= a.aggregate(over: 20s, using: last).defaults(to:0)");
2208        let mut eval = eval.into_evaluator();
2209        time += Duration::from_secs(45);
2210        let out_ref = StreamReference::Out(0);
2211        let in_ref = StreamReference::In(0);
2212        let n = 25;
2213        for v in 1..=n {
2214            accept_input_timed!(eval, in_ref, Unsigned(v), time);
2215            time += Duration::from_secs(1);
2216        }
2217        time += Duration::from_secs(1);
2218        // 71 secs have passed. All values should be within the window.
2219        eval_stream_timed!(eval, 0, vec![], time);
2220        let expected = Unsigned(25);
2221        assert_eq!(
2222            eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2223            Unsigned(25)
2224        );
2225        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2226    }
2227
2228    #[test]
2229    fn test_percentile_float() {
2230        for (pctl, exp) in &[
2231            ("pctl25", Value::try_from(13.0)),
2232            ("pctl75", Value::try_from(18.0)),
2233            ("pctl10", Value::try_from(11.5)),
2234            ("pctl5", Value::try_from(11.0)),
2235            ("pctl90", Value::try_from(19.5)),
2236            ("med", Value::try_from(15.5)),
2237        ] {
2238            let (_, eval, mut time) = setup_time(&format!(
2239                "input a: Float32\noutput b: Float32 @1Hz:= a.aggregate(over: 10s, using: {}).defaults(to:0.0)",
2240                pctl
2241            ));
2242            let mut eval = eval.into_evaluator();
2243            time += Duration::from_secs(45);
2244            let out_ref = StreamReference::Out(0);
2245            let in_ref = StreamReference::In(0);
2246            let n = 20;
2247            for v in 1..=n {
2248                time += Duration::from_secs(1);
2249                accept_input_timed!(eval, in_ref, Float(NotNan::new(v as f64).unwrap()), time);
2250            }
2251            // 66 secs have passed. All values should be within the window.
2252            eval_stream_timed!(eval, 0, vec![], time);
2253            assert_eq!(
2254                eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2255                Float(NotNan::new(20.0).unwrap())
2256            );
2257            assert_eq!(
2258                eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2259                exp.as_ref().unwrap().clone()
2260            );
2261        }
2262    }
2263
2264    #[test]
2265    fn test_percentile_float_unordered_input() {
2266        for (pctl, exp) in &[
2267            ("pctl25", Value::try_from(13.0)),
2268            ("pctl75", Value::try_from(18.0)),
2269            ("pctl10", Value::try_from(11.5)),
2270            ("pctl5", Value::try_from(11.0)),
2271            ("pctl90", Value::try_from(19.5)),
2272            ("med", Value::try_from(15.5)),
2273        ] {
2274            let (_, eval, mut time) = setup_time(&format!(
2275                "input a: Float32\noutput b: Float32 @1Hz:= a.aggregate(over: 10s, using: {}).defaults(to:0.0)",
2276                pctl
2277            ));
2278            let mut eval = eval.into_evaluator();
2279            time += Duration::from_secs(45);
2280            let out_ref = StreamReference::Out(0);
2281            let in_ref = StreamReference::In(0);
2282            let n = 20;
2283            let input_val = [
2284                1, 9, 8, 5, 4, 3, 7, 2, 10, 6, 20, 11, 19, 12, 18, 13, 17, 14, 16, 15,
2285            ];
2286            for v in 0..n {
2287                time += Duration::from_secs(1);
2288                accept_input_timed!(
2289                    eval,
2290                    in_ref,
2291                    Float(NotNan::new(input_val[v] as f64).unwrap()),
2292                    time
2293                );
2294            }
2295            // 66 secs have passed. All values should be within the window.
2296            eval_stream_timed!(eval, 0, vec![], time);
2297            assert_eq!(
2298                eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2299                Float(NotNan::new(15.0).unwrap())
2300            );
2301            assert_eq!(
2302                eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2303                exp.as_ref().unwrap().clone()
2304            );
2305        }
2306    }
2307
2308    #[test]
2309    fn test_percentile_signed() {
2310        for (pctl, exp) in &[
2311            ("pctl25", Signed(13)),
2312            ("pctl75", Signed(18)),
2313            ("pctl10", Signed(11)),
2314            ("pctl5", Signed(11)),
2315            ("pctl90", Signed(19)),
2316            ("med", Signed(15)),
2317        ] {
2318            let (_, eval, mut time) = setup_time(&format!(
2319                "input a: Int32\noutput b: Int32 @1Hz:= a.aggregate(over: 10s, using: {}).defaults(to:0)",
2320                pctl
2321            ));
2322            let mut eval = eval.into_evaluator();
2323            time += Duration::from_secs(45);
2324            let out_ref = StreamReference::Out(0);
2325            let in_ref = StreamReference::In(0);
2326            let n = 20;
2327            for v in 1..=n {
2328                time += Duration::from_secs(1);
2329                accept_input_timed!(eval, in_ref, Signed(v), time);
2330            }
2331            // 66 secs have passed. All values should be within the window.
2332            eval_stream_timed!(eval, 0, vec![], time);
2333            assert_eq!(
2334                eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2335                exp.clone()
2336            );
2337        }
2338    }
2339
2340    #[test]
2341    fn test_percentile_unsigned() {
2342        for (pctl, exp) in &[
2343            ("pctl25", Unsigned(13)),
2344            ("pctl75", Unsigned(18)),
2345            ("pctl10", Unsigned(11)),
2346            ("pctl5", Unsigned(11)),
2347            ("pctl90", Unsigned(19)),
2348            ("med", Unsigned(15)),
2349        ] {
2350            let (_, eval, mut time) = setup_time(&format!(
2351                "input a: UInt32\noutput b: UInt32 @1Hz:= a.aggregate(over: 10s, using: {}).defaults(to:0)",
2352                pctl
2353            ));
2354            let mut eval = eval.into_evaluator();
2355            time += Duration::from_secs(45);
2356            let out_ref = StreamReference::Out(0);
2357            let in_ref = StreamReference::In(0);
2358            let n = 20;
2359            for v in 1..=n {
2360                time += Duration::from_secs(1);
2361                accept_input_timed!(eval, in_ref, Unsigned(v), time);
2362            }
2363            // 66 secs have passed. All values should be within the window.
2364            eval_stream_timed!(eval, 0, vec![], time);
2365            assert_eq!(
2366                eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2367                exp.clone()
2368            );
2369        }
2370    }
2371
2372    #[test]
2373    fn test_percentile_discrete_float_unordered_input() {
2374        for (pctl, exp) in &[
2375            ("pctl25", Value::try_from(13.0)),
2376            ("pctl75", Value::try_from(18.0)),
2377            ("pctl10", Value::try_from(11.5)),
2378            ("pctl5", Value::try_from(11.0)),
2379            ("pctl90", Value::try_from(19.5)),
2380            ("med", Value::try_from(15.5)),
2381        ] {
2382            let (_, eval, mut time) = setup_time(&format!(
2383                "input a: Float32\noutput b: Float32 := a.aggregate(over_discrete: 10, using: {}).defaults(to:0.0)",
2384                pctl
2385            ));
2386            let mut eval = eval.into_evaluator();
2387            time += Duration::from_secs(45);
2388            let out_ref = StreamReference::Out(0);
2389            let in_ref = StreamReference::In(0);
2390            let n = 20;
2391            let input_val = [
2392                1, 9, 8, 5, 4, 3, 7, 2, 10, 6, 20, 11, 19, 12, 18, 13, 17, 14, 16, 15,
2393            ];
2394            for v in 0..n {
2395                time += Duration::from_secs(1);
2396                accept_input_timed!(
2397                    eval,
2398                    in_ref,
2399                    Float(NotNan::new(input_val[v] as f64).unwrap()),
2400                    time
2401                );
2402            }
2403            // 66 secs have passed. All values should be within the window.
2404            eval_stream_timed!(eval, 0, vec![], time);
2405            assert_eq!(
2406                eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2407                Float(NotNan::new(15.0).unwrap())
2408            );
2409            assert_eq!(
2410                eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2411                exp.as_ref().unwrap().clone()
2412            );
2413        }
2414    }
2415
2416    #[test]
2417    fn test_var_equal_input() {
2418        let (_, eval, mut time) =
2419            setup_time("input a: Float32\noutput b: Float32 @1Hz:= a.aggregate(over: 5s, using: var).defaults(to:0.0)");
2420        let mut eval = eval.into_evaluator();
2421        time += Duration::from_secs(45);
2422        let out_ref = StreamReference::Out(0);
2423        let in_ref = StreamReference::In(0);
2424        let n = 25;
2425        for _ in 1..=n {
2426            accept_input_timed!(eval, in_ref, Float(NotNan::new(10_f64).unwrap()), time);
2427            time += Duration::from_secs(1);
2428        }
2429        time += Duration::from_secs(1);
2430        // 71 secs have passed. All values should be within the window.
2431        eval_stream_timed!(eval, 0, vec![], time);
2432        let expected = Float(NotNan::new(0.0).unwrap());
2433        assert_eq!(
2434            eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2435            Float(NotNan::new(10.0).unwrap())
2436        );
2437        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2438    }
2439
2440    #[test]
2441    fn test_var_window() {
2442        for (duration, exp) in &[
2443            ("2", Value::try_from(0.25)),
2444            ("3", Value::try_from(2.0 / 3.0)),
2445            ("4", Value::try_from(1.25)),
2446            ("5", Value::try_from(2.0)),
2447            ("6", Value::try_from(17.5 / 6.0)),
2448            ("7", Value::try_from(4.0)),
2449            ("8", Value::try_from(5.25)),
2450            ("9", Value::try_from(60.0 / 9.0)),
2451            ("10", Value::try_from(8.25)),
2452            ("11", Value::try_from(10.0)),
2453        ] {
2454            let (_, eval, mut time) = setup_time(&format!(
2455                "input a: Float32\noutput b: Float32 @1Hz:= a.aggregate(over: {}s, using: var).defaults(to:0.0)",
2456                duration
2457            ));
2458            let mut eval = eval.into_evaluator();
2459            time += Duration::from_secs(45);
2460            let out_ref = StreamReference::Out(0);
2461            let in_ref = StreamReference::In(0);
2462            let n = 20;
2463            for v in 1..=n {
2464                time += Duration::from_secs(1);
2465                accept_input_timed!(eval, in_ref, Float(NotNan::new(v as f64).unwrap()), time);
2466            }
2467            // 66 secs have passed. All values should be within the window.
2468            eval_stream_timed!(eval, 0, vec![], time);
2469            assert_eq!(
2470                eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2471                Float(NotNan::new(20.0).unwrap())
2472            );
2473            assert_float_eq!(
2474                eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2475                exp.as_ref().unwrap().clone()
2476            );
2477        }
2478    }
2479
2480    #[test]
2481    fn test_sd_window() {
2482        for (duration, exp) in &[
2483            ("2", Value::try_from(0.25f64.sqrt())),
2484            ("3", Value::try_from((2.0 / 3.0f64).sqrt())),
2485            ("4", Value::try_from(1.25f64.sqrt())),
2486            ("5", Value::try_from(2.0f64.sqrt())),
2487            ("6", Value::try_from((17.5 / 6.0f64).sqrt())),
2488            ("7", Value::try_from(4.0f64.sqrt())),
2489            ("8", Value::try_from(5.25f64.sqrt())),
2490            ("9", Value::try_from((60.0 / 9.0f64).sqrt())),
2491            ("10", Value::try_from(8.25f64.sqrt())),
2492            ("11", Value::try_from(10.0f64.sqrt())),
2493        ] {
2494            let (_, eval, mut time) = setup_time(&format!(
2495                "input a: Float32\noutput b: Float32 @1Hz:= a.aggregate(over: {}s, using: sd).defaults(to:0.0)",
2496                duration
2497            ));
2498            let mut eval = eval.into_evaluator();
2499            time += Duration::from_secs(45);
2500            let out_ref = StreamReference::Out(0);
2501            let in_ref = StreamReference::In(0);
2502            let n = 20;
2503            for v in 1..=n {
2504                time += Duration::from_secs(1);
2505                accept_input_timed!(eval, in_ref, Float(NotNan::new(v as f64).unwrap()), time);
2506            }
2507            // 66 secs have passed. All values should be within the window.
2508            eval_stream_timed!(eval, 0, vec![], time);
2509            assert_float_eq!(
2510                eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2511                Float(NotNan::new(20.0).unwrap())
2512            );
2513            assert_float_eq!(
2514                eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2515                exp.as_ref().unwrap().clone()
2516            );
2517        }
2518    }
2519
2520    #[test]
2521    fn test_cov() {
2522        let (_, eval, mut time) =
2523            setup_time("input in: Float32\n input in2: Float32\noutput t@in&in2:= (in,in2)\n output out: Float32 @1Hz := t.aggregate(over: 6s, using: cov).defaults(to: 1337.0)");
2524        let mut eval = eval.into_evaluator();
2525        time += Duration::from_secs(45);
2526        let out_ref = StreamReference::Out(1);
2527        let in_ref = StreamReference::In(0);
2528        let in_ref_2 = StreamReference::In(1);
2529        let n = 20;
2530        for v in 1..=n {
2531            time += Duration::from_secs(1);
2532            accept_input_timed!(eval, in_ref, Value::try_from(v as f64).unwrap(), time);
2533            accept_input_timed!(eval, in_ref_2, Value::try_from(v as f64).unwrap(), time);
2534            eval_stream_timed!(eval, 0, vec![], time);
2535        }
2536        // 66 secs have passed. All values should be within the window.
2537        eval_stream_timed!(eval, 1, vec![], time);
2538        let expected = Float(NotNan::new(17.5 / 6.0).unwrap());
2539        assert_float_eq!(
2540            eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2541            Float(NotNan::new(20.0).unwrap())
2542        );
2543        assert_float_eq!(
2544            eval.peek_value(in_ref_2, &Vec::new(), 0).unwrap(),
2545            Float(NotNan::new(20.0).unwrap())
2546        );
2547        assert_float_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2548    }
2549
2550    #[test]
2551    fn test_cov_2() {
2552        let (_, eval, mut time) =
2553            setup_time("input in: Float32\n input in2: Float32\noutput t@in&in2:= (in,in2)\n output out: Float32 @1Hz := t.aggregate(over: 5s, using: cov).defaults(to: 1337.0)");
2554        let mut eval = eval.into_evaluator();
2555        time += Duration::from_secs(45);
2556        let out_ref = StreamReference::Out(1);
2557        let in_ref = StreamReference::In(0);
2558        let in_ref_2 = StreamReference::In(1);
2559        let n = 20;
2560        for v in 1..=n {
2561            accept_input_timed!(eval, in_ref, Value::try_from(v as f64).unwrap(), time);
2562            accept_input_timed!(eval, in_ref_2, Value::try_from(16.0).unwrap(), time);
2563            eval_stream_timed!(eval, 0, vec![], time);
2564            time += Duration::from_secs(1);
2565        }
2566        time += Duration::from_secs(1);
2567        // 66 secs have passed. All values should be within the window.
2568        eval_stream_timed!(eval, 1, vec![], time);
2569        let expected = Float(NotNan::new(0.0).unwrap());
2570        assert_eq!(
2571            eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2572            Float(NotNan::new(20.0).unwrap())
2573        );
2574        assert_eq!(
2575            eval.peek_value(in_ref_2, &Vec::new(), 0).unwrap(),
2576            Float(NotNan::new(16.0).unwrap())
2577        );
2578        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2579    }
2580
2581    #[test]
2582    fn test_var_discrete() {
2583        for (duration, exp) in &[
2584            ("2", Value::try_from(0.25)),
2585            ("3", Value::try_from(2.0 / 3.0)),
2586            ("4", Value::try_from(1.25)),
2587            ("5", Value::try_from(2.0)),
2588            ("6", Value::try_from(17.5 / 6.0)),
2589            ("7", Value::try_from(4.0)),
2590            ("8", Value::try_from(5.25)),
2591            ("9", Value::try_from(60.0 / 9.0)),
2592            ("10", Value::try_from(8.25)),
2593            ("11", Value::try_from(10.0)),
2594        ] {
2595            let (_, eval, mut time) = setup_time(&format!(
2596                "input a: Float32\noutput b: Float32 := a.aggregate(over_discrete: {}, using: var).defaults(to:0.0)",
2597                duration
2598            ));
2599            let mut eval = eval.into_evaluator();
2600            time += Duration::from_secs(45);
2601            let out_ref = StreamReference::Out(0);
2602            let in_ref = StreamReference::In(0);
2603            let n = 20;
2604            for v in 1..=n {
2605                accept_input_timed!(eval, in_ref, Float(NotNan::new(v as f64).unwrap()), time);
2606                time += Duration::from_secs(1);
2607            }
2608            time += Duration::from_secs(1);
2609            // 66 secs have passed. All values should be within the window.
2610            eval_stream_timed!(eval, 0, vec![], time);
2611            assert_eq!(
2612                eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2613                Float(NotNan::new(20.0).unwrap())
2614            );
2615            assert_float_eq!(
2616                eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2617                exp.as_ref().unwrap().clone()
2618            );
2619        }
2620    }
2621
2622    #[test]
2623    fn test_sd_discrete() {
2624        for (duration, exp) in &[
2625            ("2", Value::try_from(0.25f64.sqrt())),
2626            ("3", Value::try_from((2.0 / 3.0f64).sqrt())),
2627            ("4", Value::try_from(1.25f64.sqrt())),
2628            ("5", Value::try_from(2.0f64.sqrt())),
2629            ("6", Value::try_from((17.5 / 6.0f64).sqrt())),
2630            ("7", Value::try_from(4.0f64.sqrt())),
2631            ("8", Value::try_from(5.25f64.sqrt())),
2632            ("9", Value::try_from((60.0 / 9.0f64).sqrt())),
2633            ("10", Value::try_from(8.25f64.sqrt())),
2634            ("11", Value::try_from(10.0f64.sqrt())),
2635        ] {
2636            let (_, eval, mut time) = setup_time(&format!(
2637                "input a: Float32\noutput b: Float32 := a.aggregate(over_discrete: {}, using: sd).defaults(to:0.0)",
2638                duration
2639            ));
2640            let mut eval = eval.into_evaluator();
2641            time += Duration::from_secs(45);
2642            let out_ref = StreamReference::Out(0);
2643            let in_ref = StreamReference::In(0);
2644            let n = 20;
2645            for v in 1..=n {
2646                accept_input_timed!(eval, in_ref, Float(NotNan::new(v as f64).unwrap()), time);
2647                time += Duration::from_secs(1);
2648            }
2649            time += Duration::from_secs(1);
2650            // 66 secs have passed. All values should be within the window.
2651            eval_stream_timed!(eval, 0, vec![], time);
2652            assert_eq!(
2653                eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2654                Float(NotNan::new(20.0).unwrap())
2655            );
2656            assert_eq!(
2657                eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2658                exp.as_ref().unwrap().clone()
2659            );
2660        }
2661    }
2662
2663    #[test]
2664    fn test_cases_window_discrete_float() {
2665        for (aggr, exp, default) in &[
2666            ("sum", Value::try_from(115.0), false),
2667            ("min", Value::try_from(21.0), true),
2668            ("max", Value::try_from(25.0), true),
2669            ("avg", Value::try_from(23.0), true),
2670            ("integral", Value::try_from(92.0), false),
2671            ("last", Value::try_from(25.0), true),
2672            ("med", Value::try_from(23.0), true),
2673            ("pctl20", Value::try_from(21.5), true),
2674        ] {
2675            let mut spec =
2676                String::from("input a: Float32\noutput b := a.aggregate(over_discrete: 5, using: ");
2677            spec += aggr;
2678            spec += ")";
2679            if *default {
2680                spec += ".defaults(to:1337.0)"
2681            }
2682            let (_, eval, mut time) = setup_time(&spec);
2683            let mut eval = eval.into_evaluator();
2684            time += Duration::from_secs(45);
2685            let out_ref = StreamReference::Out(0);
2686            let in_ref = StreamReference::In(0);
2687            let n = 25;
2688            for v in 1..=n {
2689                accept_input_timed!(eval, in_ref, Value::try_from(v as f64).unwrap(), time);
2690                time += Duration::from_secs(1);
2691            }
2692            time += Duration::from_secs(1);
2693            // 71 secs have passed. All values should be within the window.
2694            eval_stream_timed!(eval, 0, vec![], time);
2695            let expected = exp.as_ref().unwrap().clone();
2696            assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2697        }
2698    }
2699
2700    #[test]
2701    fn test_cases_window_discrete_signed() {
2702        for (aggr, exp, default) in &[
2703            ("sum", Signed(115), false),
2704            ("count", Unsigned(5), false),
2705            ("min", Signed(21), true),
2706            ("max", Signed(25), true),
2707            ("avg", Signed(23), true),
2708            ("integral", Value::try_from(92.0).unwrap(), false),
2709            ("last", Signed(25), true),
2710            ("med", Signed(23), true),
2711            ("pctl20", Signed(21), true),
2712        ] {
2713            let mut spec =
2714                String::from("input a: Int16\noutput b := a.aggregate(over_discrete: 5, using: ");
2715            spec += aggr;
2716            spec += ")";
2717            if *default {
2718                spec += ".defaults(to:1337)"
2719            }
2720            let (_, eval, mut time) = setup_time(&spec);
2721            let mut eval = eval.into_evaluator();
2722            time += Duration::from_secs(45);
2723            let out_ref = StreamReference::Out(0);
2724            let in_ref = StreamReference::In(0);
2725            let n = 25;
2726            for v in 1..=n {
2727                accept_input_timed!(eval, in_ref, Signed(v), time);
2728                time += Duration::from_secs(1);
2729            }
2730            time += Duration::from_secs(1);
2731            // 71 secs have passed. All values should be within the window.
2732            eval_stream_timed!(eval, 0, vec![], time);
2733            let expected = exp.clone();
2734            assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2735        }
2736    }
2737
2738    #[test]
2739    fn test_cases_window_discrete_unsigned() {
2740        for (aggr, exp, default) in &[
2741            ("sum", Unsigned(115), false),
2742            ("count", Unsigned(5), false),
2743            ("min", Unsigned(21), true),
2744            ("max", Unsigned(25), true),
2745            ("avg", Unsigned(23), true),
2746            ("integral", Value::try_from(92.0).unwrap(), false),
2747            ("last", Unsigned(25), true),
2748            ("med", Unsigned(23), true),
2749            ("pctl20", Unsigned(21), true),
2750        ] {
2751            let mut spec =
2752                String::from("input a: UInt16\noutput b := a.aggregate(over_discrete: 5, using: ");
2753            spec += aggr;
2754            spec += ")";
2755            if *default {
2756                spec += ".defaults(to:1337)"
2757            }
2758            let (_, eval, mut time) = setup_time(&spec);
2759            let mut eval = eval.into_evaluator();
2760            time += Duration::from_secs(45);
2761            let out_ref = StreamReference::Out(0);
2762            let in_ref = StreamReference::In(0);
2763            let n = 25;
2764            for v in 1..=n {
2765                accept_input_timed!(eval, in_ref, Unsigned(v), time);
2766                time += Duration::from_secs(1);
2767            }
2768            time += Duration::from_secs(1);
2769            // 71 secs have passed. All values should be within the window.
2770            eval_stream_timed!(eval, 0, vec![], time);
2771            let expected = exp.clone();
2772            assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2773        }
2774    }
2775
2776    #[test]
2777    fn test_filter() {
2778        let (_, eval, start) = setup(
2779            "input a: Int32\n\
2780                   output b eval when a == 42 with a + 8",
2781        );
2782        let mut eval = eval.into_evaluator();
2783        let out_ref = StreamReference::Out(0);
2784        let in_ref = StreamReference::In(0);
2785        accept_input!(eval, start, in_ref, Signed(15));
2786        eval_stream!(eval, start, out_ref.out_ix(), vec![]);
2787        assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0), Option::None);
2788
2789        accept_input!(eval, start, in_ref, Signed(42));
2790        eval_stream!(eval, start, out_ref.out_ix(), vec![]);
2791        assert_eq!(
2792            eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2793            Signed(50)
2794        );
2795    }
2796
2797    #[test]
2798    fn test_spawn_eventbased() {
2799        let (_, eval, start) = setup(
2800            "input a: Int32\n\
2801                  output b(x: Int32) spawn with a eval with x + a",
2802        );
2803        let mut eval = eval.into_evaluator();
2804        let out_ref = StreamReference::Out(0);
2805        let in_ref = StreamReference::In(0);
2806        accept_input!(eval, start, in_ref, Signed(15));
2807        spawn_stream!(eval, start, out_ref);
2808
2809        assert!(stream_has_instance!(eval, out_ref, vec![Signed(15)]));
2810
2811        eval_stream_instances!(eval, start, out_ref);
2812        assert_eq!(
2813            eval.peek_value(out_ref, &[Signed(15)], 0).unwrap(),
2814            Signed(30)
2815        );
2816    }
2817
2818    #[test]
2819    fn test_spawn_timedriven() {
2820        let (_, eval, mut time) = setup_time(
2821            "input a: Int32\n\
2822                  output b(x: Int32) spawn with a eval @1Hz with x + a.hold(or: 42)",
2823        );
2824        let mut eval = eval.into_evaluator();
2825        let out_ref = StreamReference::Out(0);
2826        let in_ref = StreamReference::In(0);
2827
2828        time += Duration::from_secs(5);
2829        accept_input_timed!(eval, in_ref, Signed(15), time);
2830        spawn_stream_timed!(eval, time, out_ref);
2831        assert!(stream_has_instance!(eval, out_ref, vec![Signed(15)]));
2832
2833        time += Duration::from_secs(1);
2834        let mut schedule = eval.dyn_schedule.borrow_mut();
2835        let next_due = schedule.get_next_deadline_due().unwrap();
2836        let next_deadline = schedule.get_next_deadline(time).unwrap();
2837        assert_eq!(next_due, Duration::from_secs(6));
2838        assert_eq!(
2839            next_deadline,
2840            DynamicDeadline {
2841                due: Duration::from_secs(6),
2842                tasks: vec![EvaluationTask::Evaluate(out_ref.out_ix(), vec![Signed(15)])]
2843            }
2844        );
2845
2846        eval_stream_timed!(eval, out_ref.out_ix(), vec![Signed(15)], time);
2847        assert_eq!(
2848            eval.peek_value(out_ref, &[Signed(15)], 0).unwrap(),
2849            Signed(30)
2850        );
2851    }
2852
2853    #[test]
2854    fn test_spawn_eventbased_unit() {
2855        let (_, eval, start) = setup(
2856            "input a: Int32\n\
2857                  output b spawn when a == 42 eval with a",
2858        );
2859        let mut eval = eval.into_evaluator();
2860        let out_ref = StreamReference::Out(0);
2861        let in_ref = StreamReference::In(0);
2862        accept_input!(eval, start, in_ref, Signed(15));
2863        spawn_stream!(eval, start, out_ref);
2864
2865        assert!(!stream_has_instance!(eval, out_ref, Vec::<Value>::new()));
2866
2867        accept_input!(eval, start, in_ref, Signed(42));
2868        spawn_stream!(eval, start, out_ref);
2869
2870        assert!(stream_has_instance!(eval, out_ref, Vec::<Value>::new()));
2871
2872        eval_stream_instances!(eval, start, out_ref);
2873        assert_eq!(
2874            eval.peek_value(out_ref, &Vec::<Value>::new(), 0).unwrap(),
2875            Signed(42)
2876        );
2877    }
2878
2879    #[test]
2880    fn test_spawn_timedriven_unit() {
2881        let (_, eval, mut time) = setup_time(
2882            "input a: Int32\n\
2883                  output b spawn when a == 42 eval @1Hz with a.hold(or: 42)",
2884        );
2885        let mut eval = eval.into_evaluator();
2886        let out_ref = StreamReference::Out(0);
2887        let in_ref = StreamReference::In(0);
2888
2889        time += Duration::from_secs(5);
2890        accept_input_timed!(eval, in_ref, Signed(15), time);
2891        spawn_stream_timed!(eval, time, out_ref);
2892        assert!(!stream_has_instance!(eval, out_ref, Vec::<Value>::new()));
2893
2894        time += Duration::from_secs(5);
2895        accept_input_timed!(eval, in_ref, Signed(42), time);
2896        spawn_stream_timed!(eval, time, out_ref);
2897        assert!(stream_has_instance!(eval, out_ref, Vec::<Value>::new()));
2898
2899        time += Duration::from_secs(1);
2900        let mut schedule = eval.dyn_schedule.borrow_mut();
2901        let next_due = schedule.get_next_deadline_due().unwrap();
2902        let next_deadline = schedule.get_next_deadline(time).unwrap();
2903        assert_eq!(next_due, Duration::from_secs(11));
2904        assert_eq!(
2905            next_deadline,
2906            DynamicDeadline {
2907                due: Duration::from_secs(11),
2908                tasks: vec![EvaluationTask::Evaluate(out_ref.out_ix(), vec![])]
2909            }
2910        );
2911
2912        eval_stream_timed!(eval, out_ref.out_ix(), vec![], time);
2913        assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Signed(42));
2914    }
2915
2916    // Window access a unit parameterized stream before it is spawned
2917    #[test]
2918    fn test_spawn_window_unit() {
2919        let (_, eval, mut time) = setup_time(
2920            "input a: Int32\n\
2921                  output b spawn when a == 42 eval with a\n\
2922                  output c @1Hz := b.aggregate(over: 1s, using: sum)",
2923        );
2924        let mut eval = eval.into_evaluator();
2925        let b_ref = StreamReference::Out(0);
2926        let c_ref = StreamReference::Out(1);
2927        let in_ref = StreamReference::In(0);
2928
2929        //Stream is not spawned but c produced initial value
2930        time += Duration::from_secs(1);
2931        accept_input_timed!(eval, in_ref, Signed(15), time);
2932        spawn_stream_timed!(eval, time, b_ref);
2933        assert!(!stream_has_instance!(eval, b_ref, Vec::<Value>::new()));
2934        eval_stream_timed!(eval, c_ref.out_ix(), vec![], time);
2935        assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Signed(0));
2936
2937        //Stream is spawned
2938        time += Duration::from_millis(200);
2939        accept_input_timed!(eval, in_ref, Signed(42), time);
2940        spawn_stream_timed!(eval, time, b_ref);
2941        assert!(stream_has_instance!(eval, b_ref, Vec::<Value>::new()));
2942        eval_stream_instances_timed!(eval, time, b_ref);
2943        assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(42));
2944
2945        //Stream gets new value
2946        time += Duration::from_millis(200);
2947        accept_input_timed!(eval, in_ref, Signed(18), time);
2948        eval_stream_instances_timed!(eval, time, b_ref);
2949        assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(18));
2950
2951        //Stream gets new value
2952        time += Duration::from_millis(200);
2953        accept_input_timed!(eval, in_ref, Signed(17), time);
2954        eval_stream_instances_timed!(eval, time, b_ref);
2955        assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(17));
2956
2957        //Stream gets new value. Window is evaluated.
2958        time += Duration::from_millis(400);
2959        accept_input_timed!(eval, in_ref, Signed(3), time);
2960        eval_stream_instances_timed!(eval, time, b_ref);
2961        assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(3));
2962        eval_stream_timed!(eval, c_ref.out_ix(), vec![], time);
2963        assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Signed(80));
2964    }
2965
/// Spawns two parameterized streams from the same input and checks that the 1Hz stream `c(p)`
/// sums the values of the matching `b(p)` instance over a 2s sliding window.
#[test]
fn test_both_parameterized_window() {
    let spec = "input a: Int32\n\
              output b(p) spawn with a eval with p+a\n\
              output c(p) spawn with a eval @1Hz with b(p).aggregate(over: 2s, using: sum)";
    let (_, data, mut now) = setup_time(spec);
    let mut ev = data.into_evaluator();
    let input_a = StreamReference::In(0);
    let stream_b = StreamReference::Out(0);
    let stream_c = StreamReference::Out(1);
    // Instance keys used for the value look-ups below.
    let (p5, p15) = ([Signed(5)], [Signed(15)]);

    // First step: a = 15 spawns b(15) and c(15); both instances are evaluated once.
    now += Duration::from_secs(1);
    accept_input_timed!(ev, input_a, Signed(15), now);
    spawn_stream_timed!(ev, now, stream_b);
    spawn_stream_timed!(ev, now, stream_c);
    assert!(stream_has_instance!(ev, stream_b, vec![Signed(15)]));
    assert!(stream_has_instance!(ev, stream_c, vec![Signed(15)]));
    eval_stream_timed!(ev, stream_b.out_ix(), vec![Signed(15)], now);
    eval_stream_timed!(ev, stream_c.out_ix(), vec![Signed(15)], now);
    let b15 = ev.peek_value(stream_b, &p15, 0).unwrap();
    assert_eq!(b15, Signed(30));
    let c15 = ev.peek_value(stream_c, &p15, 0).unwrap();
    assert_eq!(c15, Signed(30));

    // Second step: a = 5 spawns b(5) and c(5); all four instances are evaluated.
    now += Duration::from_secs(1);
    accept_input_timed!(ev, input_a, Signed(5), now);
    spawn_stream_timed!(ev, now, stream_b);
    spawn_stream_timed!(ev, now, stream_c);
    assert!(stream_has_instance!(ev, stream_b, vec![Signed(5)]));
    assert!(stream_has_instance!(ev, stream_c, vec![Signed(5)]));
    eval_stream_timed!(ev, stream_b.out_ix(), vec![Signed(5)], now);
    eval_stream_timed!(ev, stream_c.out_ix(), vec![Signed(5)], now);
    eval_stream_timed!(ev, stream_b.out_ix(), vec![Signed(15)], now);
    eval_stream_timed!(ev, stream_c.out_ix(), vec![Signed(15)], now);
    let b5 = ev.peek_value(stream_b, &p5, 0).unwrap();
    assert_eq!(b5, Signed(10));
    let c5 = ev.peek_value(stream_c, &p5, 0).unwrap();
    assert_eq!(c5, Signed(10));
    let b15 = ev.peek_value(stream_b, &p15, 0).unwrap();
    assert_eq!(b15, Signed(20));
    let c15 = ev.peek_value(stream_c, &p15, 0).unwrap();
    assert_eq!(c15, Signed(50));

    // Third step: a = 5 again, no new spawn; all four instances are evaluated.
    now += Duration::from_secs(1);
    accept_input_timed!(ev, input_a, Signed(5), now);
    eval_stream_timed!(ev, stream_b.out_ix(), vec![Signed(5)], now);
    eval_stream_timed!(ev, stream_c.out_ix(), vec![Signed(5)], now);
    eval_stream_timed!(ev, stream_b.out_ix(), vec![Signed(15)], now);
    eval_stream_timed!(ev, stream_c.out_ix(), vec![Signed(15)], now);
    let b5 = ev.peek_value(stream_b, &p5, 0).unwrap();
    assert_eq!(b5, Signed(10));
    let c5 = ev.peek_value(stream_c, &p5, 0).unwrap();
    assert_eq!(c5, Signed(20));
    let b15 = ev.peek_value(stream_b, &p15, 0).unwrap();
    assert_eq!(b15, Signed(20));
    let c15 = ev.peek_value(stream_c, &p15, 0).unwrap();
    assert_eq!(c15, Signed(40));
}
3033
/// A unit stream `c` that is spawned late (when `a == 5`) aggregates the fixed instance `b(15)`
/// over a 2s sliding window at 1Hz.
#[test]
fn test_spawn_window_unit3() {
    let spec = "input a: Int32\n\
              output b(p) spawn with a eval with p+a\n\
              output c spawn when a == 5 eval @1Hz with b(15).aggregate(over: 2s, using: sum)";
    let (_, data, mut now) = setup_time(spec);
    let mut ev = data.into_evaluator();
    let input_a = StreamReference::In(0);
    let stream_b = StreamReference::Out(0);
    let stream_c = StreamReference::Out(1);
    let no_params: Vec<Value> = vec![];

    // a = 15: b(15) is spawned, the spawn condition of c does not hold yet.
    now += Duration::from_secs(1);
    accept_input_timed!(ev, input_a, Signed(15), now);
    spawn_stream_timed!(ev, now, stream_b);
    spawn_stream_timed!(ev, now, stream_c);
    assert!(stream_has_instance!(ev, stream_b, vec![Signed(15)]));
    assert!(!stream_has_instance!(ev, stream_c, no_params));
    eval_stream_timed!(ev, stream_b.out_ix(), vec![Signed(15)], now);
    let b15 = ev.peek_value(stream_b, &[Signed(15)], 0).unwrap();
    assert_eq!(b15, Signed(30));

    // a = 5: b(5) and c are spawned; c is evaluated for the first time.
    now += Duration::from_secs(1);
    ev.new_cycle(now);
    accept_input_timed!(ev, input_a, Signed(5), now);
    spawn_stream_timed!(ev, now, stream_b);
    spawn_stream_timed!(ev, now, stream_c);
    assert!(stream_has_instance!(ev, stream_b, vec![Signed(5)]));
    assert!(stream_has_instance!(ev, stream_c, no_params));
    eval_stream_timed!(ev, stream_b.out_ix(), vec![Signed(5)], now);
    eval_stream_timed!(ev, stream_b.out_ix(), vec![Signed(15)], now);
    eval_stream_timed!(ev, stream_c.out_ix(), vec![], now);
    let b5 = ev.peek_value(stream_b, &[Signed(5)], 0).unwrap();
    assert_eq!(b5, Signed(10));
    let b15 = ev.peek_value(stream_b, &[Signed(15)], 0).unwrap();
    assert_eq!(b15, Signed(20));
    let c = ev.peek_value(stream_c, &[], 0).unwrap();
    assert_eq!(c, Signed(20));

    // a = 5 again: no spawn, all existing instances are evaluated.
    now += Duration::from_secs(1);
    ev.new_cycle(now);
    accept_input_timed!(ev, input_a, Signed(5), now);
    eval_stream_timed!(ev, stream_b.out_ix(), vec![Signed(5)], now);
    eval_stream_timed!(ev, stream_b.out_ix(), vec![Signed(15)], now);
    eval_stream_timed!(ev, stream_c.out_ix(), vec![], now);
    let b5 = ev.peek_value(stream_b, &[Signed(5)], 0).unwrap();
    assert_eq!(b5, Signed(10));
    let b15 = ev.peek_value(stream_b, &[Signed(15)], 0).unwrap();
    assert_eq!(b15, Signed(20));
    let c = ev.peek_value(stream_c, &[], 0).unwrap();
    assert_eq!(c, Signed(40));
}
3089
// A sliding window accesses a unit stream that only exists after its spawn condition held.
#[test]
fn test_spawn_window_unit2() {
    let spec = "input a: Int32\n\
              output b spawn when a == 42 eval with a\n\
              output c @1Hz := b.aggregate(over: 1s, using: sum)";
    let (_, data, mut now) = setup_time(spec);
    let mut ev = data.into_evaluator();
    let input_a = StreamReference::In(0);
    let stream_b = StreamReference::Out(0);
    let stream_c = StreamReference::Out(1);

    // a = 42 satisfies the spawn condition of b.
    now += Duration::from_millis(200);
    accept_input_timed!(ev, input_a, Signed(42), now);
    spawn_stream_timed!(ev, now, stream_b);
    assert!(stream_has_instance!(ev, stream_b, Vec::<Value>::new()));
    eval_stream_instances_timed!(ev, now, stream_b);
    assert_eq!(ev.peek_value(stream_b, &[], 0).unwrap(), Signed(42));

    // b receives three further values: (time advance in ms, input value).
    for (advance_ms, input) in [(200, 18), (200, 17), (400, 3)] {
        now += Duration::from_millis(advance_ms);
        accept_input_timed!(ev, input_a, Signed(input), now);
        eval_stream_instances_timed!(ev, now, stream_b);
        assert_eq!(ev.peek_value(stream_b, &[], 0).unwrap(), Signed(input));
    }

    // The window is evaluated at the time of the last input.
    eval_stream_timed!(ev, stream_c.out_ix(), vec![], now);
    assert_eq!(ev.peek_value(stream_c, &[], 0).unwrap(), Signed(80));
}
3131
// Instance p = true  -> only receives the inputs equal to 42.
// Instance p = false -> receives all inputs.
#[test]
fn test_spawn_window_parameterized() {
    let spec = "input a: Int32\n\
              output b(p: Bool) spawn with a == 42 eval when !p || a == 42 with a\n\
              output c @1Hz := b(false).aggregate(over: 1s, using: sum)\n\
              output d @1Hz := b(true).aggregate(over: 1s, using: sum)";
    let (_, data, mut now) = setup_time(spec);
    let mut ev = data.into_evaluator();
    let input_a = StreamReference::In(0);
    let stream_b = StreamReference::Out(0);
    let stream_c = StreamReference::Out(1);
    let stream_d = StreamReference::Out(2);
    // Instance keys used for the value look-ups below.
    let (on, off) = ([Bool(true)], [Bool(false)]);

    // a = 15: only b(false) is spawned.
    now += Duration::from_millis(500);
    accept_input_timed!(ev, input_a, Signed(15), now);
    spawn_stream_timed!(ev, now, stream_b);
    assert!(stream_has_instance!(ev, stream_b, vec![Bool(false)]));
    assert!(!stream_has_instance!(ev, stream_b, vec![Bool(true)]));
    eval_stream_instances_timed!(ev, now, stream_b);
    assert_eq!(ev.peek_value(stream_b, &off, 0).unwrap(), Signed(15));

    // a = 42: b(false) gets a new value, b(true) is spawned, and the timed streams
    // are evaluated.
    now += Duration::from_millis(500);
    accept_input_timed!(ev, input_a, Signed(42), now);
    spawn_stream_timed!(ev, now, stream_b);
    assert!(stream_has_instance!(ev, stream_b, vec![Bool(true)]));
    eval_stream_instances_timed!(ev, now, stream_b);
    assert_eq!(ev.peek_value(stream_b, &off, 0).unwrap(), Signed(42));
    assert_eq!(ev.peek_value(stream_b, &on, 0).unwrap(), Signed(42));
    eval_stream_timed!(ev, stream_c.out_ix(), vec![], now);
    assert_eq!(ev.peek_value(stream_c, &[], 0).unwrap(), Signed(57));
    eval_stream_timed!(ev, stream_d.out_ix(), vec![], now);
    assert_eq!(ev.peek_value(stream_d, &[], 0).unwrap(), Signed(42));

    // a = 42 one second later: both instances get a new value and the timed streams
    // are evaluated again.
    now += Duration::from_secs(1);
    accept_input_timed!(ev, input_a, Signed(42), now);
    eval_stream_instances_timed!(ev, now, stream_b);
    assert_eq!(ev.peek_value(stream_b, &off, 0).unwrap(), Signed(42));
    assert_eq!(ev.peek_value(stream_b, &on, 0).unwrap(), Signed(42));
    eval_stream_timed!(ev, stream_c.out_ix(), vec![], now);
    assert_eq!(ev.peek_value(stream_c, &[], 0).unwrap(), Signed(42));
    eval_stream_timed!(ev, stream_d.out_ix(), vec![], now);
    assert_eq!(ev.peek_value(stream_d, &[], 0).unwrap(), Signed(42));
}
3200
/// The close condition of a parameterized stream depends on the parameter: once `b` is true
/// only the instance with an even parameter is closed.
#[test]
fn test_close_parameterized() {
    let spec = "input a: Int32\n\
              input b: Bool\n\
              output c(x: Int32) spawn with a close when b && (x % 2 == 0) eval with x + a";
    let (_, data, start) = setup(spec);
    let mut ev = data.into_evaluator();
    let input_a = StreamReference::In(0);
    let input_b = StreamReference::In(1);
    let stream_c = StreamReference::Out(0);
    // Instance keys used for the value look-ups below.
    let (odd, even) = ([Signed(15)], [Signed(8)]);

    // a = 15 spawns c(15).
    accept_input!(ev, start, input_a, Signed(15));
    spawn_stream!(ev, start, stream_c);
    assert!(stream_has_instance!(ev, stream_c, vec![Signed(15)]));

    eval_stream_instances!(ev, start, stream_c);
    assert_eq!(ev.peek_value(stream_c, &odd, 0).unwrap(), Signed(30));

    // b = false, a = 8 spawns c(8) next to c(15).
    accept_input!(ev, start, input_b, Bool(false));
    accept_input!(ev, start, input_a, Signed(8));
    spawn_stream!(ev, start, stream_c);
    assert!(stream_has_instance!(ev, stream_c, vec![Signed(15)]));
    assert!(stream_has_instance!(ev, stream_c, vec![Signed(8)]));

    eval_stream_instances!(ev, start, stream_c);
    assert_eq!(ev.peek_value(stream_c, &odd, 0).unwrap(), Signed(23));
    assert_eq!(ev.peek_value(stream_c, &even, 0).unwrap(), Signed(16));

    // With b = false the close condition is false for both instances.
    eval_close!(ev, start, stream_c, vec![Signed(15)]);
    eval_close!(ev, start, stream_c, vec![Signed(8)]);
    assert!(stream_has_instance!(ev, stream_c, vec![Signed(15)]));
    assert!(stream_has_instance!(ev, stream_c, vec![Signed(8)]));

    // With b = true only the even instance c(8) is closed.
    accept_input!(ev, start, input_b, Bool(true));
    eval_close!(ev, start, stream_c, vec![Signed(15)]);
    eval_close!(ev, start, stream_c, vec![Signed(8)]);
    assert!(stream_has_instance!(ev, stream_c, vec![Signed(15)]));
    assert!(!stream_has_instance!(ev, stream_c, vec![Signed(8)]));
    let closed_value = ev.peek_value(stream_c, &even, 0);
    assert!(closed_value.is_none());
}
3254
/// A unit stream is spawned when `a = 42` and closed as soon as the input `b` is true.
#[test]
fn test_close_unit() {
    let spec = "input a: Int32\n\
              input b: Bool\n\
              output c spawn when a = 42 close when b eval with a";
    let (_, data, start) = setup(spec);
    let mut ev = data.into_evaluator();
    let input_a = StreamReference::In(0);
    let input_b = StreamReference::In(1);
    let stream_c = StreamReference::Out(0);

    // a = 42 spawns the stream.
    accept_input!(ev, start, input_a, Signed(42));
    spawn_stream!(ev, start, stream_c);
    assert!(stream_has_instance!(ev, stream_c, Vec::<Value>::new()));

    eval_stream_instances!(ev, start, stream_c);
    let first = ev.peek_value(stream_c, &Vec::<Value>::new(), 0).unwrap();
    assert_eq!(first, Signed(42));

    // New inputs: b = false, a = 8.
    accept_input!(ev, start, input_b, Bool(false));
    accept_input!(ev, start, input_a, Signed(8));

    eval_stream_instances!(ev, start, stream_c);
    let second = ev.peek_value(stream_c, &Vec::<Value>::new(), 0).unwrap();
    assert_eq!(second, Signed(8));

    // The close condition is false, so the instance survives.
    eval_close!(ev, start, stream_c, Vec::<Value>::new());
    assert!(stream_has_instance!(ev, stream_c, Vec::<Value>::new()));

    // b = true closes the instance; its value is no longer available.
    accept_input!(ev, start, input_b, Bool(true));
    eval_close!(ev, start, stream_c, Vec::<Value>::new());
    assert!(!stream_has_instance!(ev, stream_c, Vec::<Value>::new()));
    let after_close = ev.peek_value(stream_c, &Vec::<Value>::new(), 0);
    assert!(after_close.is_none());
}
3296
/// The close condition of a unit stream refers to the stream's own value (`c = 1337`).
#[test]
fn test_close_selfref_unit() {
    let spec = "input a: Int32\n\
              output c spawn when a = 42 close when c = 1337 eval with a";
    let (_, data, start) = setup(spec);
    let mut ev = data.into_evaluator();
    let input_a = StreamReference::In(0);
    let stream_c = StreamReference::Out(0);

    // a = 42 spawns the stream.
    accept_input!(ev, start, input_a, Signed(42));
    spawn_stream!(ev, start, stream_c);
    assert!(stream_has_instance!(ev, stream_c, Vec::<Value>::new()));

    eval_stream_instances!(ev, start, stream_c);
    let first = ev.peek_value(stream_c, &Vec::<Value>::new(), 0).unwrap();
    assert_eq!(first, Signed(42));

    // a = 1337 makes the stream produce the value its close condition waits for.
    accept_input!(ev, start, input_a, Signed(1337));
    eval_stream_instances!(ev, start, stream_c);
    let second = ev.peek_value(stream_c, &Vec::<Value>::new(), 0).unwrap();
    assert_eq!(second, Signed(1337));

    // Evaluating the close condition removes the instance and its value.
    eval_close!(ev, start, stream_c, Vec::<Value>::new());
    assert!(!stream_has_instance!(ev, stream_c, Vec::<Value>::new()));
    let after_close = ev.peek_value(stream_c, &Vec::<Value>::new(), 0);
    assert!(after_close.is_none());
}
3329
/// The close condition of a parameterized stream refers to the value of the same instance
/// (`c(p) = 1337`); only the instance that reaches 1337 is closed.
#[test]
fn test_close_selfref_parameter() {
    let spec = "input a: Int32\n\
              output c(p: Int32) spawn with a close when c(p) = 1337 eval with p+a";
    let (_, data, start) = setup(spec);
    let mut ev = data.into_evaluator();
    let input_a = StreamReference::In(0);
    let stream_c = StreamReference::Out(0);
    // Instance keys used for the value look-ups below.
    let (small, large) = ([Signed(15)], [Signed(1322)]);

    // a = 15 spawns c(15).
    accept_input!(ev, start, input_a, Signed(15));
    spawn_stream!(ev, start, stream_c);
    assert!(stream_has_instance!(ev, stream_c, vec![Signed(15)]));

    eval_stream_instances!(ev, start, stream_c);
    assert_eq!(ev.peek_value(stream_c, &small, 0).unwrap(), Signed(30));

    // a = 1322 spawns c(1322) and lifts c(15) to 15 + 1322 = 1337.
    accept_input!(ev, start, input_a, Signed(1322));
    spawn_stream!(ev, start, stream_c);
    assert!(stream_has_instance!(ev, stream_c, vec![Signed(1322)]));

    eval_stream_instances!(ev, start, stream_c);
    assert_eq!(ev.peek_value(stream_c, &small, 0).unwrap(), Signed(1337));
    assert_eq!(ev.peek_value(stream_c, &large, 0).unwrap(), Signed(2644));

    // Only c(15) fulfils its close condition.
    eval_close!(ev, start, stream_c, vec![Signed(15)]);
    eval_close!(ev, start, stream_c, vec![Signed(1322)]);
    assert!(!stream_has_instance!(ev, stream_c, vec![Signed(15)]));
    assert!(stream_has_instance!(ev, stream_c, vec![Signed(1322)]));
    let closed_value = ev.peek_value(stream_c, &small, 0);
    assert!(closed_value.is_none());
}
3370
// Instance p = true  -> only receives the inputs equal to 42.
// Instance p = false -> receives all inputs.
#[test]
fn test_close_window_parameterized() {
    let spec = "input a: Int32\n\
              output b(p: Bool) spawn with a == 42  close when b(p) == 1337 eval when !p || a == 42 with a\n\
              output c @1Hz := b(false).aggregate(over: 1s, using: sum)\n\
              output d @1Hz := b(true).aggregate(over: 1s, using: sum)";
    let (_, data, mut now) = setup_time(spec);
    let mut ev = data.into_evaluator();
    let input_a = StreamReference::In(0);
    let stream_b = StreamReference::Out(0);
    let stream_c = StreamReference::Out(1);
    let stream_d = StreamReference::Out(2);
    // Instance keys used for the value look-ups below.
    let (on, off) = ([Bool(true)], [Bool(false)]);

    // a = 15: instance b(false) is spawned.
    now += Duration::from_millis(500);
    ev.new_cycle(now);
    accept_input_timed!(ev, input_a, Signed(15), now);
    spawn_stream_timed!(ev, now, stream_b);
    assert!(stream_has_instance!(ev, stream_b, vec![Bool(false)]));
    assert!(!stream_has_instance!(ev, stream_b, vec![Bool(true)]));
    eval_stream_instances_timed!(ev, now, stream_b);
    assert_eq!(ev.peek_value(stream_b, &off, 0).unwrap(), Signed(15));

    // a = 42: instance b(false) gets a new value, instance b(true) is spawned,
    // and the timed streams are evaluated.
    now += Duration::from_millis(500);
    ev.new_cycle(now);
    accept_input_timed!(ev, input_a, Signed(42), now);
    spawn_stream_timed!(ev, now, stream_b);
    assert!(stream_has_instance!(ev, stream_b, vec![Bool(true)]));
    eval_stream_instances_timed!(ev, now, stream_b);
    assert_eq!(ev.peek_value(stream_b, &off, 0).unwrap(), Signed(42));
    assert_eq!(ev.peek_value(stream_b, &on, 0).unwrap(), Signed(42));
    eval_stream_timed!(ev, stream_c.out_ix(), vec![], now);
    assert_eq!(ev.peek_value(stream_c, &[], 0).unwrap(), Signed(57));
    eval_stream_timed!(ev, stream_d.out_ix(), vec![], now);
    assert_eq!(ev.peek_value(stream_d, &[], 0).unwrap(), Signed(42));

    // a = 1337: instance b(false) gets a new value and is closed afterwards;
    // instance b(true) keeps its old value and stays alive.
    now += Duration::from_millis(500);
    ev.new_cycle(now);
    accept_input_timed!(ev, input_a, Signed(1337), now);
    ev.prepare_evaluation(now);
    eval_stream_instances_timed!(ev, now, stream_b);
    assert_eq!(ev.peek_value(stream_b, &off, 0).unwrap(), Signed(1337));
    assert_eq!(ev.peek_value(stream_b, &on, 0).unwrap(), Signed(42));
    eval_close_timed!(ev, now, stream_b, &vec![Bool(false)]);
    eval_close_timed!(ev, now, stream_b, &vec![Bool(true)]);
    assert!(!stream_has_instance!(ev, stream_b, vec![Bool(false)]));
    assert!(stream_has_instance!(ev, stream_b, vec![Bool(true)]));

    // The timed streams are evaluated once more.
    now += Duration::from_millis(500);
    ev.new_cycle(now);
    ev.prepare_evaluation(now);
    eval_stream_timed!(ev, stream_c.out_ix(), vec![], now);
    assert_eq!(ev.peek_value(stream_c, &[], 0).unwrap(), Signed(1337));
    eval_stream_timed!(ev, stream_d.out_ix(), vec![], now);
    assert_eq!(ev.peek_value(stream_d, &[], 0).unwrap(), Signed(0));

    // Finally the window instance belonging to b(false) must be gone.
    now += Duration::from_millis(550);
    ev.new_cycle(now);
    ev.prepare_evaluation(now);
    let window_ref = ev.ir.sliding_windows[0].reference;
    let window_closed = ev
        .global_store
        .get_window_collection_mut(window_ref)
        .window(&[Bool(false)])
        .is_none();
    assert!(window_closed);
}
3462
/// A sliding window over a unit stream is still evaluated after that stream was closed.
#[test]
fn test_close_window_unit() {
    let spec = "input a: Int32\n\
              output b spawn when a == 42 close when a = 1337 eval with a\n\
              output c @1Hz := b.aggregate(over: 1s, using: sum)";
    let (_, data, mut now) = setup_time(spec);
    let mut ev = data.into_evaluator();
    let input_a = StreamReference::In(0);
    let stream_b = StreamReference::Out(0);
    let stream_c = StreamReference::Out(1);

    // a = 42 satisfies the spawn condition of b.
    now += Duration::from_millis(200);
    accept_input_timed!(ev, input_a, Signed(42), now);
    spawn_stream_timed!(ev, now, stream_b);
    assert!(stream_has_instance!(ev, stream_b, Vec::<Value>::new()));
    eval_stream_instances_timed!(ev, now, stream_b);
    assert_eq!(ev.peek_value(stream_b, &[], 0).unwrap(), Signed(42));

    // b receives three further values: (time advance in ms, input value).
    for (advance_ms, input) in [(200, 18), (200, 17), (400, 3)] {
        now += Duration::from_millis(advance_ms);
        accept_input_timed!(ev, input_a, Signed(input), now);
        eval_stream_instances_timed!(ev, now, stream_b);
        assert_eq!(ev.peek_value(stream_b, &[], 0).unwrap(), Signed(input));
    }

    // The window is evaluated at the time of the last input.
    eval_stream_timed!(ev, stream_c.out_ix(), vec![], now);
    assert_eq!(ev.peek_value(stream_c, &[], 0).unwrap(), Signed(80));

    // a = 1337: b gets its last value and is closed.
    now += Duration::from_millis(500);
    accept_input_timed!(ev, input_a, Signed(1337), now);
    eval_stream_instances_timed!(ev, now, stream_b);
    assert_eq!(ev.peek_value(stream_b, &[], 0).unwrap(), Signed(1337));
    eval_close_timed!(ev, now, stream_b, &vec![]);
    assert!(!stream_has_instance!(ev, stream_b, Vec::<Value>::new()));

    // The timed stream is evaluated again after the close.
    now += Duration::from_millis(500);
    ev.prepare_evaluation(now);
    eval_stream_timed!(ev, stream_c.out_ix(), vec![], now);
    assert_eq!(ev.peek_value(stream_c, &[], 0).unwrap(), Signed(1337));
}
3517
/// Accesses a nested tuple component of an offset (and therefore optional) stream value and
/// falls back to the default while no previous value exists.
#[test]
fn test_optional_tuple_access() {
    let spec = "input a: (UInt, (Bool, Float))\n\
         output b := a.offset(by: -1).1.0.defaults(to: false)";
    let (_, data, start) = setup(spec);
    let mut ev = data.into_evaluator();
    let input_a = StreamReference::In(0);
    let stream_b = StreamReference::Out(0);

    // First event: there is no previous value of `a`, so the default is used.
    let first_inner = Tuple(Box::new([Bool(true), Float(NotNan::new(1.5).unwrap())]));
    let first_event = Tuple(Box::new([Unsigned(42), first_inner]));
    accept_input!(ev, start, input_a, first_event);

    eval_stream_instances!(ev, start, stream_b);
    let first_result = ev.peek_value(stream_b, &[], 0).unwrap();
    assert_eq!(first_result, Bool(false));

    // Second event: the offset now yields the boolean of the first event.
    let second_inner = Tuple(Box::new([Bool(false), Float(NotNan::new(42.0).unwrap())]));
    let second_event = Tuple(Box::new([Unsigned(13), second_inner]));
    accept_input!(ev, start, input_a, second_event);

    eval_stream_instances!(ev, start, stream_b);
    let second_result = ev.peek_value(stream_b, &[], 0).unwrap();
    assert_eq!(second_result, Bool(true));
}
3553
/// Aggregates over all instances of `b` with `forall`: `c` is true only while every existing
/// instance has an even parameter.
#[test]
fn test_instance_aggr_all_all() {
    let spec = "input a: Int32\n\
              output b(x: Int32) \
                spawn with a \
                eval @a with x % 2 = 0 \
                close when a == 42 \
              output c @a := b.aggregate(over_instances: all, using: forall)";
    let (_, data, start) = setup(spec);
    let mut ev = data.into_evaluator();
    let input_a = StreamReference::In(0);
    let stream_b = StreamReference::Out(0);
    let stream_c = StreamReference::Out(1);
    // Instance keys used for the value look-ups below.
    let (k16, k7, k42) = ([Signed(16)], [Signed(7)], [Signed(42)]);

    // a = 16: the only instance is even.
    accept_input!(ev, start, input_a, Signed(16));
    spawn_stream!(ev, start, stream_b);
    eval_stream_instances!(ev, start, stream_b);
    eval_stream_instances!(ev, start, stream_c);

    assert!(stream_has_instance!(ev, stream_b, vec![Signed(16)]));

    assert_eq!(ev.peek_value(stream_b, &k16, 0).unwrap(), Bool(true));
    assert_eq!(ev.peek_value(stream_c, &[], 0).unwrap(), Bool(true));

    let mut later = start + Duration::from_secs(1);

    // a = 7: an odd instance joins, so the aggregation becomes false.
    accept_input!(ev, later, input_a, Signed(7));
    spawn_stream!(ev, later, stream_b);
    eval_stream_instances!(ev, later, stream_b);
    eval_stream_instances!(ev, later, stream_c);

    assert!(stream_has_instance!(ev, stream_b, vec![Signed(7)]));

    assert_eq!(ev.peek_value(stream_b, &k16, 0).unwrap(), Bool(true));
    assert_eq!(ev.peek_value(stream_b, &k7, 0).unwrap(), Bool(false));
    assert_eq!(ev.peek_value(stream_c, &[], 0).unwrap(), Bool(false));

    later += Duration::from_secs(1);

    // a = 42: a third instance is spawned; the odd one keeps the aggregation false.
    accept_input!(ev, later, input_a, Signed(42));
    spawn_stream!(ev, later, stream_b);
    eval_stream_instances!(ev, later, stream_b);
    eval_stream_instances!(ev, later, stream_c);

    assert!(stream_has_instance!(ev, stream_b, vec![Signed(42)]));

    assert_eq!(ev.peek_value(stream_b, &k16, 0).unwrap(), Bool(true));
    assert_eq!(ev.peek_value(stream_b, &k7, 0).unwrap(), Bool(false));
    assert_eq!(ev.peek_value(stream_b, &k42, 0).unwrap(), Bool(true));
    assert_eq!(ev.peek_value(stream_c, &[], 0).unwrap(), Bool(false));

    // The close condition is evaluated for every instance.
    eval_close!(ev, later, stream_b, vec![Signed(16)]);
    eval_close!(ev, later, stream_b, vec![Signed(7)]);
    eval_close!(ev, later, stream_b, vec![Signed(42)]);

    later += Duration::from_secs(1);

    // a = 16 again: the aggregation is true once more.
    accept_input!(ev, later, input_a, Signed(16));
    spawn_stream!(ev, later, stream_b);
    eval_stream_instances!(ev, later, stream_b);
    eval_stream_instances!(ev, later, stream_c);

    assert!(stream_has_instance!(ev, stream_b, vec![Signed(16)]));

    assert_eq!(ev.peek_value(stream_b, &k16, 0).unwrap(), Bool(true));
    assert_eq!(ev.peek_value(stream_c, &[], 0).unwrap(), Bool(true));
}
3642
/// Aggregates over all instances of `b` with `exists`: `c` is true as soon as one existing
/// instance has an even parameter.
#[test]
fn test_instance_aggr_all_any() {
    let spec = "input a: Int32\n\
              output b(x: Int32) \
                spawn with a \
                eval @a with x % 2 = 0 \
                close when a == 42 \
              output c @a := b.aggregate(over_instances: all, using: exists)";
    let (_, data, start) = setup(spec);
    let mut ev = data.into_evaluator();
    let input_a = StreamReference::In(0);
    let stream_b = StreamReference::Out(0);
    let stream_c = StreamReference::Out(1);
    // Instance keys used for the value look-ups below.
    let (k17, k6, k42) = ([Signed(17)], [Signed(6)], [Signed(42)]);

    // a = 17: the only instance is odd.
    accept_input!(ev, start, input_a, Signed(17));
    spawn_stream!(ev, start, stream_b);
    eval_stream_instances!(ev, start, stream_b);
    eval_stream_instances!(ev, start, stream_c);

    assert!(stream_has_instance!(ev, stream_b, vec![Signed(17)]));

    assert_eq!(ev.peek_value(stream_b, &k17, 0).unwrap(), Bool(false));
    assert_eq!(ev.peek_value(stream_c, &[], 0).unwrap(), Bool(false));

    let mut later = start + Duration::from_secs(1);

    // a = 6: an even instance joins, so the aggregation becomes true.
    accept_input!(ev, later, input_a, Signed(6));
    spawn_stream!(ev, later, stream_b);
    eval_stream_instances!(ev, later, stream_b);
    eval_stream_instances!(ev, later, stream_c);

    assert!(stream_has_instance!(ev, stream_b, vec![Signed(6)]));

    assert_eq!(ev.peek_value(stream_b, &k17, 0).unwrap(), Bool(false));
    assert_eq!(ev.peek_value(stream_b, &k6, 0).unwrap(), Bool(true));
    assert_eq!(ev.peek_value(stream_c, &[], 0).unwrap(), Bool(true));

    later += Duration::from_secs(1);

    // a = 42: a third instance is spawned; the aggregation stays true.
    accept_input!(ev, later, input_a, Signed(42));
    spawn_stream!(ev, later, stream_b);
    eval_stream_instances!(ev, later, stream_b);
    eval_stream_instances!(ev, later, stream_c);

    assert!(stream_has_instance!(ev, stream_b, vec![Signed(42)]));

    assert_eq!(ev.peek_value(stream_b, &k17, 0).unwrap(), Bool(false));
    assert_eq!(ev.peek_value(stream_b, &k6, 0).unwrap(), Bool(true));
    assert_eq!(ev.peek_value(stream_b, &k42, 0).unwrap(), Bool(true));
    assert_eq!(ev.peek_value(stream_c, &[], 0).unwrap(), Bool(true));

    // The close condition is evaluated for every instance.
    eval_close!(ev, later, stream_b, vec![Signed(17)]);
    eval_close!(ev, later, stream_b, vec![Signed(6)]);
    eval_close!(ev, later, stream_b, vec![Signed(42)]);

    later += Duration::from_secs(1);

    // a = 17 again: the aggregation is false once more.
    accept_input!(ev, later, input_a, Signed(17));
    spawn_stream!(ev, later, stream_b);
    eval_stream_instances!(ev, later, stream_b);
    eval_stream_instances!(ev, later, stream_c);

    assert!(stream_has_instance!(ev, stream_b, vec![Signed(17)]));

    assert_eq!(ev.peek_value(stream_b, &k17, 0).unwrap(), Bool(false));
    assert_eq!(ev.peek_value(stream_c, &[], 0).unwrap(), Bool(false));
}
3725
3726    #[test]
3727    fn test_instance_aggr_fresh_all() {
3728        let (_, eval, start) = setup(
3729            "input a: Int32\n\
3730                   input i: Int32\n\
3731                  output b(x: Int32)\n\
3732                    spawn with a\n\
3733                    eval when (x + i) % 2 = 0 with x > 10\n\
3734                    close when a == 42\n\
3735                  output c := b.aggregate(over_instances: fresh, using: forall)",
3736        );
3737        let mut eval = eval.into_evaluator();
3738        let b_ref = StreamReference::Out(0);
3739        let c_ref = StreamReference::Out(1);
3740        let a_ref = StreamReference::In(0);
3741        let i_ref = StreamReference::In(1);
3742        accept_input!(eval, start, a_ref, Signed(16));
3743        accept_input!(eval, start, i_ref, Signed(0));
3744        spawn_stream!(eval, start, b_ref);
3745        eval_stream_instances!(eval, start, b_ref);
3746        eval_stream_instances!(eval, start, c_ref);
3747
3748        assert!(stream_has_instance!(eval, b_ref, vec![Signed(16)]));
3749
3750        assert_eq!(
3751            eval.peek_value(b_ref, &[Signed(16)], 0).unwrap(),
3752            Bool(true)
3753        );
3754        assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(true));
3755
3756        let mut time = start + Duration::from_secs(1);
3757        eval.new_cycle(time);
3758
3759        accept_input!(eval, time, a_ref, Signed(6));
3760        accept_input!(eval, time, i_ref, Signed(0));
3761        spawn_stream!(eval, time, b_ref);
3762        eval_stream_instances!(eval, time, b_ref);
3763        eval_stream_instances!(eval, time, c_ref);
3764
3765        assert!(stream_has_instance!(eval, b_ref, vec![Signed(6)]));
3766
3767        assert_eq!(
3768            eval.peek_value(b_ref, &[Signed(16)], 0).unwrap(),
3769            Bool(true)
3770        );
3771        assert_eq!(
3772            eval.peek_value(b_ref, &[Signed(6)], 0).unwrap(),
3773            Bool(false)
3774        );
3775        assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(false));
3776
3777        time += Duration::from_secs(1);
3778        eval.new_cycle(time);
3779
3780        accept_input!(eval, time, a_ref, Signed(11));
3781        accept_input!(eval, time, i_ref, Signed(1));
3782        spawn_stream!(eval, time, b_ref);
3783        eval_stream_instances!(eval, time, b_ref);
3784        eval_stream_instances!(eval, time, c_ref);
3785
3786        assert!(stream_has_instance!(eval, b_ref, vec![Signed(11)]));
3787
3788        assert_eq!(
3789            eval.peek_value(b_ref, &[Signed(16)], 0).unwrap(),
3790            Bool(true)
3791        );
3792        assert_eq!(
3793            eval.peek_value(b_ref, &[Signed(6)], 0).unwrap(),
3794            Bool(false)
3795        );
3796        assert_eq!(
3797            eval.peek_value(b_ref, &[Signed(11)], 0).unwrap(),
3798            Bool(true)
3799        );
3800        assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(true));
3801
3802        time += Duration::from_secs(1);
3803        eval.new_cycle(time);
3804
3805        accept_input!(eval, time, a_ref, Signed(42));
3806        accept_input!(eval, time, i_ref, Signed(1));
3807        spawn_stream!(eval, time, b_ref);
3808        eval_stream_instances!(eval, time, b_ref);
3809        eval_stream_instances!(eval, time, c_ref);
3810
3811        assert!(stream_has_instance!(eval, b_ref, vec![Signed(42)]));
3812
3813        assert_eq!(
3814            eval.peek_value(b_ref, &[Signed(16)], 0).unwrap(),
3815            Bool(true)
3816        );
3817        assert_eq!(
3818            eval.peek_value(b_ref, &[Signed(6)], 0).unwrap(),
3819            Bool(false)
3820        );
3821        assert_eq!(
3822            eval.peek_value(b_ref, &[Signed(11)], 0).unwrap(),
3823            Bool(true)
3824        );
3825        assert!(eval.peek_value(b_ref, &[Signed(42)], 0).is_none());
3826        assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(true));
3827
3828        eval_close!(eval, time, b_ref, vec![Signed(16)]);
3829        eval_close!(eval, time, b_ref, vec![Signed(6)]);
3830        eval_close!(eval, time, b_ref, vec![Signed(11)]);
3831        eval_close!(eval, time, b_ref, vec![Signed(42)]);
3832
3833        time += Duration::from_secs(1);
3834        eval.new_cycle(time);
3835
3836        accept_input!(eval, time, a_ref, Signed(4));
3837        accept_input!(eval, time, i_ref, Signed(0));
3838        spawn_stream!(eval, time, b_ref);
3839        eval_stream_instances!(eval, time, b_ref);
3840        eval_stream_instances!(eval, time, c_ref);
3841
3842        assert!(stream_has_instance!(eval, b_ref, vec![Signed(4)]));
3843
3844        assert_eq!(
3845            eval.peek_value(b_ref, &[Signed(4)], 0).unwrap(),
3846            Bool(false)
3847        );
3848        assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(false));
3849    }
3850
3851    #[test]
3852    fn test_multiple_eval_clauses() {
3853        let (_, eval, start) = setup(
3854            "input a : UInt64\n\
3855            output b
3856                eval @a when a < 10 with a + 1
3857                eval @a when a < 20 with a + 2
3858                eval @a with a + 3",
3859        );
3860        let mut eval = eval.into_evaluator();
3861        let out_ref = StreamReference::Out(0);
3862        let a_ref = StreamReference::In(0);
3863
3864        accept_input!(eval, start, a_ref, Unsigned(0));
3865        eval_stream_instances!(eval, start, out_ref);
3866        assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(1));
3867        accept_input!(eval, start, a_ref, Unsigned(12));
3868        eval_stream_instances!(eval, start, out_ref);
3869        assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(14));
3870        accept_input!(eval, start, a_ref, Unsigned(20));
3871        eval_stream_instances!(eval, start, out_ref);
3872        assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(23));
3873    }
3874
3875    #[test]
3876    fn test_multiple_eval_clauses_no_filter() {
3877        let (_, eval, start) = setup(
3878            "input a : UInt64\n\
3879            output b
3880                eval @a with a + 1
3881                eval @a when a < 20 with a + 2",
3882        );
3883        let mut eval = eval.into_evaluator();
3884        let out_ref = StreamReference::Out(0);
3885        let a_ref = StreamReference::In(0);
3886
3887        accept_input!(eval, start, a_ref, Unsigned(0));
3888        eval_stream_instances!(eval, start, out_ref);
3889        assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(1));
3890        accept_input!(eval, start, a_ref, Unsigned(12));
3891        eval_stream_instances!(eval, start, out_ref);
3892        assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(13));
3893    }
3894
3895    #[test]
3896    fn test_multiple_eval_clauses_different_ac() {
3897        let (_, eval, start) = setup(
3898            "input a : UInt64\ninput b : UInt64\n\
3899            output c : UInt64
3900                eval @a&&b with 1
3901                eval @a with 2",
3902        );
3903        let mut eval = eval.into_evaluator();
3904        let out_ref = StreamReference::Out(0);
3905        let a_ref = StreamReference::In(0);
3906        let b_ref = StreamReference::In(1);
3907
3908        accept_input!(eval, start, a_ref, Unsigned(0));
3909        eval_stream_instances!(eval, start, out_ref);
3910        assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(2));
3911        accept_input!(eval, start, a_ref, Unsigned(1));
3912        accept_input!(eval, start, b_ref, Unsigned(1));
3913        eval_stream_instances!(eval, start, out_ref);
3914        assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(1));
3915    }
3916
3917    #[test]
3918    fn test_multiple_eval_clauses_ac_and_filter() {
3919        let (_, eval, start) = setup(
3920            "input a : UInt64\ninput b : UInt64\n\
3921            output c : UInt64
3922                eval @a&&b when b < 10 with 1
3923                eval @a&&b with 3
3924                eval @a when a < 10 with 2
3925                eval @a with 4",
3926        );
3927        let mut eval = eval.into_evaluator();
3928        let out_ref = StreamReference::Out(0);
3929        let a_ref = StreamReference::In(0);
3930        let b_ref = StreamReference::In(1);
3931
3932        accept_input!(eval, start, a_ref, Unsigned(11));
3933        eval_stream_instances!(eval, start, out_ref);
3934        assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(4));
3935
3936        accept_input!(eval, start, a_ref, Unsigned(0));
3937        eval_stream_instances!(eval, start, out_ref);
3938        assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(2));
3939
3940        accept_input!(eval, start, a_ref, Unsigned(1));
3941        accept_input!(eval, start, b_ref, Unsigned(11));
3942        eval_stream_instances!(eval, start, out_ref);
3943        assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(3));
3944
3945        accept_input!(eval, start, a_ref, Unsigned(1));
3946        accept_input!(eval, start, b_ref, Unsigned(1));
3947        eval_stream_instances!(eval, start, out_ref);
3948        assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(1));
3949    }
3950
3951    // Corresponds to issue #18
3952    #[test]
3953    fn test_parameterized_window_bug() {
3954        let (_, eval, mut time) = setup(
3955            "input a : Int64
3956                output b(p1)
3957                    spawn with a
3958                    eval when a == p1 with true
3959                output c(p1)
3960                    spawn with a
3961                    eval @1Hz with b(p1).aggregate(over: 10s, using: count)
3962                ",
3963        );
3964        let mut eval = eval.into_evaluator();
3965        let b_ref = StreamReference::Out(0);
3966        let c_ref = StreamReference::Out(1);
3967        let mut tracer = NoTracer::default();
3968
3969        time += Duration::from_millis(1000);
3970        eval.eval_event(&[Signed(1)], time, &mut tracer);
3971        assert!(stream_has_instance!(eval, b_ref, vec![Signed(1)]));
3972        assert!(stream_has_instance!(eval, c_ref, vec![Signed(1)]));
3973        assert_eq!(eval.peek_value(b_ref, &[Signed(1)], 0).unwrap(), Bool(true));
3974
3975        time += Duration::from_millis(1000);
3976        eval.eval_time_driven_tasks(
3977            vec![EvaluationTask::Evaluate(c_ref.out_ix(), vec![Signed(1)])],
3978            time,
3979            &mut tracer,
3980        );
3981        eval.eval_event(&[Signed(1)], time, &mut tracer);
3982        assert_eq!(eval.peek_value(b_ref, &[Signed(1)], 0).unwrap(), Bool(true));
3983        assert_eq!(
3984            eval.peek_value(c_ref, &[Signed(1)], 0).unwrap(),
3985            Unsigned(1)
3986        );
3987
3988        time += Duration::from_millis(1000);
3989        eval.eval_time_driven_tasks(
3990            vec![EvaluationTask::Evaluate(c_ref.out_ix(), vec![Signed(1)])],
3991            time,
3992            &mut tracer,
3993        );
3994        assert_eq!(
3995            eval.peek_value(c_ref, &[Signed(1)], 0).unwrap(),
3996            Unsigned(2)
3997        );
3998
3999        // This test is correct and shows the inteded behavior. Yet it is confusing as inputs coincide with periods.
4000        /*
4001         The following is the order of events that take place:
4002         1s: a = 1 -> b spawns -> c spawns -> the windows is created -> b = true -> window = 1
4003         2s: c = 1 -> a = 1 -> b = true -> window = 2
4004         3s: c = 2
4005        !!! Remember that in case that an input coincides with a deadline, the deadline is evaluated first, hence the confusing order of events at time 2s
4006         */
4007    }
4008
4009    // Corresponds to issue #19
4010    #[test]
4011    fn test_parameterized_window_spawn_bug() {
4012        let (_, eval, mut time) = setup(
4013            "input a : Int
4014                       output b spawn @a eval @0.5s with a.aggregate(over: 5s, using: count)",
4015        );
4016        let mut eval = eval.into_evaluator();
4017        let b_ref = StreamReference::Out(0);
4018        let mut tracer = NoTracer::default();
4019
4020        // A receives a value and b is spawned
4021        // The windows over a is spawned as well and should already contain the current value of a
4022        time += Duration::from_millis(1000);
4023        eval.eval_event(&[Signed(1)], time, &mut tracer);
4024        assert!(stream_has_instance!(eval, b_ref, Vec::<Value>::new()));
4025
4026        // b evaluates and the window contains one value. (The value produces by 'a' at time 1)
4027        time += Duration::from_millis(500);
4028        eval.eval_time_driven_tasks(
4029            vec![EvaluationTask::Evaluate(b_ref.out_ix(), vec![Signed(1)])],
4030            time,
4031            &mut tracer,
4032        );
4033        assert_eq!(
4034            eval.peek_value(b_ref, &[Signed(1)], 0).unwrap(),
4035            Unsigned(1)
4036        );
4037    }
4038
4039    // Corresponds to issue #20
4040    #[test]
4041    fn test_parameterized_window_global_freq() {
4042        let (_, eval, mut time) = setup(
4043            "input page_id: UInt
4044                    output page_id_visits(pid)
4045                      spawn with page_id
4046                      eval when pid == page_id
4047
4048                    output visits_per_day(pid)
4049                      spawn with page_id
4050                      eval @Global(1h) with page_id_visits(pid).aggregate(over: 1h, using: count)",
4051        );
4052        let mut eval = eval.into_evaluator();
4053        let visits = StreamReference::Out(0);
4054        let avg = StreamReference::Out(1);
4055        let mut tracer = NoTracer::default();
4056
4057        eval.eval_event(&[Signed(1)], time, &mut tracer);
4058        assert!(stream_has_instance!(eval, visits, vec![Signed(1)]));
4059        assert!(stream_has_instance!(eval, avg, vec![Signed(1)]));
4060
4061        time += Duration::from_millis(1000);
4062        eval.eval_event(&[Signed(2)], time, &mut tracer);
4063        assert!(stream_has_instance!(eval, visits, vec![Signed(2)]));
4064        assert!(stream_has_instance!(eval, avg, vec![Signed(2)]));
4065
4066        time += Duration::from_millis(1000);
4067        eval.eval_event(&[Signed(3)], time, &mut tracer);
4068        assert!(stream_has_instance!(eval, visits, vec![Signed(3)]));
4069        assert!(stream_has_instance!(eval, avg, vec![Signed(3)]));
4070
4071        time += Duration::from_millis(1000);
4072        eval.eval_event(&[Signed(5)], time, &mut tracer);
4073        assert!(stream_has_instance!(eval, visits, vec![Signed(5)]));
4074        assert!(stream_has_instance!(eval, avg, vec![Signed(5)]));
4075
4076        time += Duration::from_millis(1000);
4077        eval.eval_event(&[Signed(3)], time, &mut tracer);
4078        assert!(stream_has_instance!(eval, visits, vec![Signed(3)]));
4079        assert!(stream_has_instance!(eval, avg, vec![Signed(3)]));
4080
4081        time += Duration::from_millis(1000);
4082        eval.eval_event(&[Signed(1)], time, &mut tracer);
4083        assert!(stream_has_instance!(eval, visits, vec![Signed(1)]));
4084        assert!(stream_has_instance!(eval, avg, vec![Signed(1)]));
4085
4086        time = Duration::from_secs(3600);
4087        eval.eval_time_driven_tasks(
4088            vec![
4089                EvaluationTask::Evaluate(avg.out_ix(), vec![Signed(1)]),
4090                EvaluationTask::Evaluate(avg.out_ix(), vec![Signed(2)]),
4091                EvaluationTask::Evaluate(avg.out_ix(), vec![Signed(3)]),
4092                EvaluationTask::Evaluate(avg.out_ix(), vec![Signed(5)]),
4093            ],
4094            time,
4095            &mut tracer,
4096        );
4097        assert_eq!(eval.peek_value(avg, &[Signed(1)], 0).unwrap(), Unsigned(1));
4098        assert_eq!(eval.peek_value(avg, &[Signed(2)], 0).unwrap(), Unsigned(1));
4099        assert_eq!(eval.peek_value(avg, &[Signed(3)], 0).unwrap(), Unsigned(2));
4100        assert_eq!(eval.peek_value(avg, &[Signed(5)], 0).unwrap(), Unsigned(1));
4101    }
4102
4103    #[test]
4104    fn filtered_instance_aggregation() {
4105        let (_, eval, mut time) = setup(
4106            "input a:  Int64
4107        input b : Int64
4108        output c(p) spawn with a
4109        eval @a with c(p).offset(by: -1).defaults(to: 0) + 1
4110        output d eval @a&&b with c.aggregate(over_instances: all(where: (p) => p > b), using: count)",
4111        );
4112
4113        let mut eval = eval.into_evaluator();
4114        let mut tracer = NoTracer::default();
4115
4116        let d_ref = StreamReference::Out(1);
4117
4118        eval.eval_event(&[Signed(1), Signed(2)], time, &mut tracer);
4119        assert_eq!(eval.peek_value(d_ref, &[], 0).unwrap(), Unsigned(0));
4120        time += Duration::from_secs(1);
4121        eval.eval_event(&[Signed(3), Signed(2)], time, &mut tracer);
4122        assert_eq!(eval.peek_value(d_ref, &[], 0).unwrap(), Unsigned(1));
4123        time += Duration::from_secs(1);
4124        eval.eval_event(&[Signed(5), Signed(0)], time, &mut tracer);
4125        assert_eq!(eval.peek_value(d_ref, &[], 0).unwrap(), Unsigned(3));
4126    }
4127
4128    #[test]
4129    fn instance_aggregation_grouping() {
4130        let (_, eval, mut time) = setup(
4131            "input a:  Int64
4132        input b : Int64
4133        output c(p1,p2)
4134            spawn with (a,b)
4135            eval when a == p1 && b == p2 with p1 + p2
4136        output d(p)
4137            spawn with a
4138            eval @true with c.aggregate(over_instances: all(where: (p1,p2) => p1==p), using: sum)",
4139        );
4140
4141        let mut eval = eval.into_evaluator();
4142        let mut tracer = NoTracer::default();
4143
4144        let d_ref = StreamReference::Out(1);
4145
4146        eval.eval_event(&[Signed(1), Signed(2)], time, &mut tracer);
4147        time += Duration::from_secs(1);
4148        eval.eval_event(&[Signed(2), Signed(3)], time, &mut tracer);
4149        time += Duration::from_secs(1);
4150        eval.eval_event(&[Signed(1), Signed(5)], time, &mut tracer);
4151        time += Duration::from_secs(1);
4152        eval.eval_event(&[Signed(4), Signed(2)], time, &mut tracer);
4153        time += Duration::from_secs(1);
4154        eval.eval_event(&[Signed(4), Signed(1)], time, &mut tracer);
4155        time += Duration::from_secs(1);
4156        eval.eval_event(&[Signed(2), Signed(5)], time, &mut tracer);
4157        time += Duration::from_secs(1);
4158        eval.eval_event(&[Signed(2), Signed(4)], time, &mut tracer);
4159        assert_eq!(eval.peek_value(d_ref, &[Signed(1)], 0).unwrap(), Signed(9));
4160        assert_eq!(eval.peek_value(d_ref, &[Signed(2)], 0).unwrap(), Signed(18));
4161        assert_eq!(eval.peek_value(d_ref, &[Signed(4)], 0).unwrap(), Signed(11));
4162    }
4163}