1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::ops::Not;
4use std::rc::Rc;
5use std::time::Duration;
6
7use bit_set::BitSet;
8use itertools::Itertools;
9use num::traits::Inv;
10use rtlola_frontend::mir::{
11 ActivationCondition as Activation, InputReference, OutputKind, OutputReference, PacingLocality,
12 PacingType, RtLolaMir, Stream, StreamReference, Task, TimeDrivenStream, Trigger,
13 TriggerReference, WindowReference,
14};
15use uom::si::rational64::Time as UOM_Time;
16use uom::si::time::nanosecond;
17
18use crate::api::monitor::{Change, Instance};
19use crate::closuregen::{CompiledExpr, Expr};
20use crate::monitor::{Parameters, Tracer};
21use crate::schedule::{DynamicSchedule, EvaluationTask};
22use crate::storage::{
23 GlobalStore, InstanceAggregationTrait, InstanceStore, Value, WindowParameterization,
24 WindowParameterizationKind,
25};
26use crate::Time;
27
28#[derive(Debug)]
30pub(crate) enum ActivationConditionOp {
31 TimeDriven,
32 True,
33 Conjunction(BitSet),
34 General(Activation),
35}
36
37pub(crate) struct EvaluatorData {
38 layers: Vec<Vec<Task>>,
40 stream_activation_conditions: Vec<ActivationConditionOp>,
42 spawn_activation_conditions: Vec<ActivationConditionOp>,
43 close_activation_conditions: Vec<ActivationConditionOp>,
44 stream_windows: HashMap<StreamReference, Vec<WindowReference>>,
45 stream_instance_aggregations: HashMap<StreamReference, Vec<WindowReference>>,
46 global_store: GlobalStore,
47 fresh_inputs: BitSet,
48 fresh_outputs: BitSet,
49 spawned_outputs: BitSet,
50 closed_outputs: BitSet,
51 fresh_triggers: BitSet,
52 triggers: Vec<Option<Trigger>>,
53 time_driven_streams: Vec<Option<TimeDrivenStream>>,
54 closing_streams: Vec<OutputReference>,
55 ir: RtLolaMir,
56 dyn_schedule: Rc<RefCell<DynamicSchedule>>,
57}
58
59#[allow(missing_debug_implementations)]
60pub(crate) struct Evaluator {
61 layers: &'static [Vec<Task>],
63 stream_activation_conditions: &'static [ActivationConditionOp],
65 spawn_activation_conditions: &'static [ActivationConditionOp],
67 close_activation_conditions: &'static [ActivationConditionOp],
69 compiled_stream_exprs: Vec<CompiledExpr>,
72 compiled_spawn_exprs: Vec<CompiledExpr>,
76 compiled_close_exprs: Vec<CompiledExpr>,
78 stream_windows: &'static HashMap<StreamReference, Vec<WindowReference>>,
79 stream_instance_aggregations: &'static HashMap<StreamReference, Vec<WindowReference>>,
80 global_store: &'static mut GlobalStore,
81 fresh_inputs: &'static mut BitSet,
82 fresh_outputs: &'static mut BitSet,
83 spawned_outputs: &'static mut BitSet,
84 closed_outputs: &'static mut BitSet,
85 fresh_triggers: &'static mut BitSet,
86 triggers: &'static [Option<Trigger>],
88 time_driven_streams: &'static [Option<TimeDrivenStream>],
90
91 closing_streams: &'static [OutputReference],
92 ir: &'static RtLolaMir,
93 dyn_schedule: &'static RefCell<DynamicSchedule>,
94 raw_data: *mut EvaluatorData,
95}
96
97pub(crate) struct EvaluationContext<'e> {
98 ts: Time,
99 global_store: &'e GlobalStore,
100 fresh_inputs: &'e BitSet,
101 fresh_outputs: &'e BitSet,
102 pub(crate) parameter: &'e [Value],
103 pub(crate) lambda_parameter: Option<&'e [Value]>,
104}
105
106impl EvaluatorData {
107 pub(crate) fn new(ir: RtLolaMir, dyn_schedule: Rc<RefCell<DynamicSchedule>>) -> Self {
108 let layers: Vec<Vec<Task>> = ir.get_event_driven_layers();
110 let closing_streams = ir
111 .outputs
112 .iter()
113 .filter(|s| s.close.condition.is_some())
114 .map(|s| s.reference.out_ix())
115 .collect();
116 let stream_acs = ir
117 .outputs
118 .iter()
119 .map(|o| match &o.eval.eval_pacing {
120 PacingType::GlobalPeriodic(_) | PacingType::LocalPeriodic(_) => {
121 ActivationConditionOp::TimeDriven
122 }
123 PacingType::Event(ac) => ActivationConditionOp::new(ac, ir.inputs.len()),
124 PacingType::Constant => ActivationConditionOp::True,
125 })
126 .collect();
127 let spawn_acs = ir
128 .outputs
129 .iter()
130 .map(|o| match &o.spawn.pacing {
131 PacingType::GlobalPeriodic(_) | PacingType::LocalPeriodic(_) => {
132 ActivationConditionOp::TimeDriven
133 }
134 PacingType::Event(ac) => ActivationConditionOp::new(ac, ir.inputs.len()),
135 PacingType::Constant => ActivationConditionOp::True,
136 })
137 .collect();
138 let close_acs = ir
139 .outputs
140 .iter()
141 .map(|o| match &o.close.pacing {
142 PacingType::GlobalPeriodic(_) | PacingType::LocalPeriodic(_) => {
143 ActivationConditionOp::TimeDriven
144 }
145 PacingType::Event(ac) => ActivationConditionOp::new(ac, ir.inputs.len()),
146 PacingType::Constant => ActivationConditionOp::True,
147 })
148 .collect();
149
150 let global_store = GlobalStore::new(&ir);
151 let fresh_inputs = BitSet::with_capacity(ir.inputs.len());
152 let fresh_outputs = BitSet::with_capacity(ir.outputs.len());
153 let spawned_outputs = BitSet::with_capacity(ir.outputs.len());
154 let closed_outputs = BitSet::with_capacity(ir.outputs.len());
155 let fresh_triggers = BitSet::with_capacity(ir.outputs.len());
156 let mut triggers = vec![None; ir.outputs.len()];
157
158 let stream_windows = ir
159 .sliding_windows
160 .iter()
161 .map(|w| (w.caller, w.reference))
162 .chain(ir.discrete_windows.iter().map(|w| (w.caller, w.reference)))
163 .into_group_map();
164
165 let stream_instance_aggregations = ir
166 .instance_aggregations
167 .iter()
168 .map(|ia| (ia.target, ia.reference))
169 .into_group_map();
170
171 for t in &ir.triggers {
172 triggers[t.output_reference.out_ix()] = Some(*t);
173 }
174 let mut time_driven_streams = vec![None; ir.outputs.len()];
175 for t in &ir.time_driven {
176 time_driven_streams[t.reference.out_ix()] = Some(*t);
177 }
178 EvaluatorData {
179 layers,
180 stream_activation_conditions: stream_acs,
181 spawn_activation_conditions: spawn_acs,
182 close_activation_conditions: close_acs,
183 stream_windows,
184 stream_instance_aggregations,
185 global_store,
186 fresh_inputs,
187 fresh_outputs,
188 spawned_outputs,
189 closed_outputs,
190 fresh_triggers,
191 triggers,
192 time_driven_streams,
193 closing_streams,
194 ir,
195 dyn_schedule,
196 }
197 }
198
199 pub(crate) fn into_evaluator(self) -> Evaluator {
200 let mut on_heap = Box::new(self);
201 let heap_ptr: *mut EvaluatorData = &mut *on_heap;
204 let leaked_data: &'static mut EvaluatorData = Box::leak(on_heap);
205
206 let compiled_stream_exprs = leaked_data
208 .ir
209 .outputs
210 .iter()
211 .map(|o| {
212 let clauses = o
213 .eval
214 .clauses
215 .iter()
216 .map(|clause| {
217 let exp = match &clause.condition {
218 None => clause.expression.clone().compile(),
219 Some(filter_exp) => CompiledExpr::create_filter(
220 filter_exp.clone().compile(),
221 clause.expression.clone().compile(),
222 ),
223 };
224 if clause.pacing != o.eval.eval_pacing {
225 if let PacingType::Event(ac) = &clause.pacing {
226 CompiledExpr::create_activation(exp, ActivationConditionOp::new(ac, leaked_data.ir.inputs.len()))
227 } else {
228 unreachable!("different pacing types of multiple eval clauses are only supported for event-driven streams. This is ensured by the frontend.")
229 }
230 } else {
231 exp
232 }
233 })
234 .collect::<Vec<_>>();
235 if clauses.len() == 1 {
236 clauses.into_iter().next().expect("has exactly one element")
237 } else {
238 CompiledExpr::create_clauses(clauses)
239 }
240 })
241 .collect();
242
243 let compiled_spawn_exprs = leaked_data
244 .ir
245 .outputs
246 .iter()
247 .map(
248 |o| match (o.spawn.expression.as_ref(), o.spawn.condition.as_ref()) {
249 (None, None) => CompiledExpr::new(|_| Value::Tuple(vec![].into_boxed_slice())),
250 (Some(target), None) => target.clone().compile(),
251 (None, Some(condition)) => CompiledExpr::create_filter(
252 condition.clone().compile(),
253 CompiledExpr::new(|_| Value::Tuple(vec![].into_boxed_slice())),
254 ),
255 (Some(target), Some(condition)) => CompiledExpr::create_filter(
256 condition.clone().compile(),
257 target.clone().compile(),
258 ),
259 },
260 )
261 .collect();
262
263 let compiled_close_exprs = leaked_data
264 .ir
265 .outputs
266 .iter()
267 .map(|o| {
268 o.close
269 .condition
270 .as_ref()
271 .map_or(CompiledExpr::new(|_| Value::None), |e| e.clone().compile())
272 })
273 .collect();
274
275 Evaluator {
276 layers: &leaked_data.layers,
277 stream_activation_conditions: &leaked_data.stream_activation_conditions,
278 spawn_activation_conditions: &leaked_data.spawn_activation_conditions,
279 close_activation_conditions: &leaked_data.close_activation_conditions,
280 compiled_stream_exprs,
281 compiled_spawn_exprs,
282 compiled_close_exprs,
283 stream_windows: &leaked_data.stream_windows,
284 stream_instance_aggregations: &leaked_data.stream_instance_aggregations,
285 global_store: &mut leaked_data.global_store,
286 fresh_inputs: &mut leaked_data.fresh_inputs,
287 fresh_outputs: &mut leaked_data.fresh_outputs,
288 spawned_outputs: &mut leaked_data.spawned_outputs,
289 closed_outputs: &mut leaked_data.closed_outputs,
290 fresh_triggers: &mut leaked_data.fresh_triggers,
291 triggers: &leaked_data.triggers,
292 time_driven_streams: &leaked_data.time_driven_streams,
293 closing_streams: &leaked_data.closing_streams,
294 ir: &leaked_data.ir,
295 dyn_schedule: &leaked_data.dyn_schedule,
296 raw_data: heap_ptr,
297 }
298 }
299}
300
301impl Drop for Evaluator {
302 #[allow(unsafe_code)]
303 fn drop(&mut self) {
304 drop(unsafe { Box::from_raw(self.raw_data) });
305 }
306}
307
308impl Evaluator {
309 pub(crate) fn eval_event(&mut self, event: &[Value], ts: Time, tracer: &mut impl Tracer) {
312 self.new_cycle(ts);
313 self.accept_inputs(event, ts);
314 self.eval_event_driven(ts, tracer);
315 }
316
317 pub(crate) fn peek_fresh_outputs(&self) -> Vec<(OutputReference, Vec<Change>)> {
319 self.ir
320 .outputs
321 .iter()
322 .filter_map(|o| {
323 let stream = o.reference;
324 let out_ix = o.reference.out_ix();
325 let changes = if o.is_parameterized() {
326 let instances = self.global_store.get_out_instance_collection(out_ix);
327 instances
328 .spawned()
329 .map(|p| Change::Spawn(p.clone()))
330 .chain(instances.fresh().map(|p| {
331 Change::Value(
332 Some(p.clone()),
333 self.peek_value(stream, p, 0).expect("Marked as fresh"),
334 )
335 }))
336 .chain(instances.closed().map(|p| Change::Close(p.clone())))
337 .collect()
338 } else if o.is_spawned() {
339 let mut res = Vec::new();
340 if self.spawned_outputs.contains(out_ix) {
341 res.push(Change::Spawn(vec![]));
342 }
343 if self.fresh_outputs.contains(out_ix) {
344 res.push(Change::Value(
345 Some(vec![]),
346 self.peek_value(stream, &[], 0).expect("Marked as fresh"),
347 ));
348 }
349 if self.closed_outputs.contains(out_ix) {
350 res.push(Change::Close(vec![]));
351 }
352 res
353 } else if self.fresh_outputs.contains(out_ix) {
354 vec![Change::Value(
355 None,
356 self.peek_value(stream, &[], 0).expect("Marked as fresh"),
357 )]
358 } else {
359 vec![]
360 };
361 changes
362 .is_empty()
363 .not()
364 .then(|| (o.reference.out_ix(), changes))
365 })
366 .collect()
367 }
368
369 pub(crate) fn peek_violated_triggers_messages(
371 &self,
372 ) -> Vec<(OutputReference, Parameters, String)> {
373 self.peek_fresh_outputs()
374 .into_iter()
375 .filter(|(o_ref, _)| matches!(self.ir.outputs[*o_ref].kind, OutputKind::Trigger(_)))
376 .flat_map(|(o_ref, changes)| {
377 changes.into_iter().filter_map(move |change| match change {
378 Change::Value(parameters, Value::Str(msg)) => {
379 Some((o_ref, parameters, msg.into()))
380 }
381 Change::Value(_, _) => {
382 unreachable!("trigger values are strings; checked by the frontend")
383 }
384 _ => None,
385 })
386 })
387 .collect()
388 }
389
390 pub(crate) fn peek_violated_triggers(&self) -> Vec<TriggerReference> {
392 self.fresh_triggers.iter().collect()
393 }
394
395 pub(crate) fn peek_fresh_input(&self) -> Vec<(InputReference, Value)> {
397 self.fresh_inputs
398 .iter()
399 .map(|i| {
400 (
401 i,
402 self.peek_value(StreamReference::In(i), &[], 0)
403 .expect("Marked as fresh"),
404 )
405 })
406 .collect()
407 }
408
409 pub(crate) fn peek_inputs(&self) -> Vec<Option<Value>> {
411 self.ir
412 .inputs
413 .iter()
414 .map(|elem| self.peek_value(elem.reference, &[], 0))
415 .collect()
416 }
417
418 pub(crate) fn peek_outputs(&self) -> Vec<Vec<Instance>> {
420 self.ir
421 .outputs
422 .iter()
423 .map(|elem| {
424 if elem.is_parameterized() {
425 let ix = elem.reference.out_ix();
426 let values: Vec<Instance> = self
427 .global_store
428 .get_out_instance_collection(ix)
429 .all_parameter()
430 .map(|para| {
431 (
432 Some(para.clone()),
433 self.peek_value(elem.reference, para.as_ref(), 0),
434 )
435 })
436 .collect();
437 values
438 } else if elem.is_spawned() {
439 vec![(Some(vec![]), self.peek_value(elem.reference, &[], 0))]
440 } else {
441 vec![(None, self.peek_value(elem.reference, &[], 0))]
442 }
443 })
444 .collect()
445 }
446
447 fn accept_inputs(&mut self, event: &[Value], ts: Time) {
448 for (ix, v) in event.iter().enumerate() {
449 match v {
450 Value::None => {}
451 v => self.accept_input(ix, v.clone(), ts),
452 }
453 }
454 }
455
456 fn accept_input(&mut self, input: InputReference, v: Value, ts: Time) {
457 self.global_store
458 .get_in_instance_mut(input)
459 .push_value(v.clone());
460 self.fresh_inputs.insert(input);
461 let extended = &self.ir.inputs[input];
462 for (_sr, _origin, win) in extended.aggregated_by.iter().filter(|(_, _, w)| {
463 matches!(
464 w,
465 WindowReference::Sliding(_) | WindowReference::Discrete(_)
466 )
467 }) {
468 self.extend_window(&[], *win, v.clone(), ts);
469 }
470 }
471
472 fn eval_event_driven(&mut self, ts: Time, tracer: &mut impl Tracer) {
473 self.prepare_evaluation(ts);
474 for layer in self.layers {
475 self.eval_event_driven_layer(layer, ts, tracer);
476 }
477 for close in self.closing_streams {
478 let ac = &self.close_activation_conditions[*close];
479 if ac.is_eventdriven() && ac.eval(self.fresh_inputs) {
480 self.eval_close_instances(*close, ts, tracer);
481 }
482 }
483 }
484
485 fn eval_event_driven_layer(&mut self, tasks: &[Task], ts: Time, tracer: &mut impl Tracer) {
486 for task in tasks {
487 match task {
488 Task::Evaluate(idx) => self.eval_event_driven_output(*idx, ts, tracer),
489 Task::Spawn(idx) => self.eval_event_driven_spawn(*idx, ts, tracer),
490 Task::Close(_) => unreachable!("closes are not included in evaluation layer"),
491 }
492 }
493 }
494
495 fn eval_spawn(&mut self, output: OutputReference, ts: Time) {
496 let stream = self.ir.output(StreamReference::Out(output));
497 debug_assert!(
498 stream.is_spawned(),
499 "tried to spawn stream that should not be spawned"
500 );
501
502 let expr = self.compiled_spawn_exprs[output].clone();
503 let parameter = vec![];
504 let ctx = self.as_EvaluationContext(¶meter, ts);
505 let res = expr.execute(&ctx);
506
507 let parameter_values = match res {
508 Value::None => return, Value::Tuple(paras) => paras.to_vec(),
510 x => vec![x],
511 };
512
513 if stream.is_parameterized() {
514 debug_assert!(!parameter_values.is_empty());
515 let instances = self.global_store.get_out_instance_collection_mut(output);
516 if instances.contains(parameter_values.as_slice()) {
517 return;
519 }
520 instances.create_instance(parameter_values.as_slice());
521 } else {
522 debug_assert!(parameter_values.is_empty());
523 let inst = self.global_store.get_out_instance_mut(output);
524 if inst.is_active() {
525 return;
527 }
528 inst.activate();
529 }
530
531 self.spawn_windows(output, parameter_values.as_slice(), ts);
532
533 if let Some(tds) = self.time_driven_streams[output] {
535 let mut schedule = (*self.dyn_schedule).borrow_mut();
536
537 if tds.locality == PacingLocality::Local {
539 schedule.schedule_evaluation(
540 output,
541 parameter_values.as_slice(),
542 ts,
543 tds.period_in_duration(),
544 );
545 }
546
547 if let PacingType::LocalPeriodic(f) = stream.close.pacing {
549 let period = Duration::from_nanos(
550 UOM_Time::new::<uom::si::time::second>(
551 f.get::<uom::si::frequency::hertz>().inv(),
552 )
553 .get::<nanosecond>()
554 .to_integer()
555 .try_into()
556 .expect("Period [ns] too large for u64!"),
557 );
558 schedule.schedule_close(output, parameter_values.as_slice(), ts, period);
559 }
560 }
561
562 self.spawned_outputs.insert(output);
563 }
564
565 fn spawn_windows(&mut self, stream: OutputReference, parameter_values: &[Value], ts: Time) {
566 let stream = &self.ir.outputs[stream];
567 let own_windows: Vec<WindowReference> = self
568 .stream_windows
569 .get(&stream.reference)
570 .map(|windows| windows.to_vec())
571 .unwrap_or_default();
572
573 for win_ref in own_windows {
575 let WindowParameterization { kind, global } = self.window_parameterization(win_ref);
576 let target = self.ir.window(win_ref).target();
577 match (kind, global) {
579 (WindowParameterizationKind::None | WindowParameterizationKind::Caller, true) => {
580 }
582 (WindowParameterizationKind::None, false) => {
583 let (inst, fresh) = match target {
586 StreamReference::In(ix) => (
587 self.global_store.get_in_instance(ix),
588 self.fresh_inputs.contains(ix),
589 ),
590 StreamReference::Out(ix) => (
591 self.global_store.get_out_instance(ix),
592 self.fresh_outputs.contains(ix),
593 ),
594 };
595 let target_value = fresh.then(|| inst.get_value(0).unwrap());
596 let window = self.global_store.get_window_mut(win_ref);
597
598 if !window.is_active() {
599 window.activate(ts);
600 if let Some(val) = target_value {
601 window.accept_value(val, ts);
602 }
603 }
604 }
605 (WindowParameterizationKind::Target, false) => {
606 let fresh_values = self
609 .global_store
610 .get_out_instance_collection(target.out_ix())
611 .fresh_values();
612 self.global_store
613 .get_window_collection_mut(win_ref)
614 .activate_all(fresh_values, ts, ts);
615 }
616 (WindowParameterizationKind::Caller, false) => {
617 let (inst, fresh) = match target {
620 StreamReference::In(ix) => (
621 self.global_store.get_in_instance(ix),
622 self.fresh_inputs.contains(ix),
623 ),
624 StreamReference::Out(ix) => (
625 self.global_store.get_out_instance(ix),
626 self.fresh_outputs.contains(ix),
627 ),
628 };
629 let target_value = fresh.then(|| inst.get_value(0).unwrap());
630 let windows = self.global_store.get_window_collection_mut(win_ref);
631 let window = windows.get_or_create(parameter_values, ts);
632 if !window.is_active() {
634 window.activate(ts);
635 if let Some(val) = target_value {
636 window.accept_value(val, ts);
637 }
638 }
639 }
640 (WindowParameterizationKind::Both, false) => {
641 let fresh_values = self
644 .global_store
645 .get_out_instance_collection(target.out_ix())
646 .fresh_values();
647 let windows = self
648 .global_store
649 .get_two_layer_window_collection_mut(win_ref);
650 windows.spawn_caller_instance(fresh_values, parameter_values, ts, ts);
651 }
652 (WindowParameterizationKind::Both | WindowParameterizationKind::Target, true) => {
653 }
656 }
657 }
658
659 for (_, _origin, win_ref) in stream.aggregated_by.iter().filter(|(_, _, w)| {
661 matches!(
662 w,
663 WindowReference::Sliding(_) | WindowReference::Discrete(_)
664 )
665 }) {
666 let WindowParameterization { kind, global } = self.window_parameterization(*win_ref);
667 match (kind, global) {
669 (WindowParameterizationKind::Caller | WindowParameterizationKind::None, _) => {}
670 (WindowParameterizationKind::Both | WindowParameterizationKind::Target, true) => {
671 let windows = self.global_store.get_window_collection_mut(*win_ref);
672 let window = windows.get_or_create(parameter_values, ts);
673 window.activate(Time::default());
675 }
676 (WindowParameterizationKind::Target, false) => {
677 let windows = self.global_store.get_window_collection_mut(*win_ref);
678 windows.create_window(parameter_values, ts);
679 }
681 (WindowParameterizationKind::Both, false) => {
682 let windows = self
683 .global_store
684 .get_two_layer_window_collection_mut(*win_ref);
685 windows.spawn_target_instance(parameter_values);
686 }
688 }
689 }
690 }
691
692 fn eval_close(&mut self, output: OutputReference, parameter: &[Value], ts: Time) {
693 let stream = self.ir.output(StreamReference::Out(output));
694
695 let expr = self.compiled_close_exprs[output].clone();
696 let ctx = self.as_EvaluationContext(parameter, ts);
697 let res = expr.execute(&ctx);
698 if !res.as_bool() {
699 return;
700 }
701
702 let own_windows: Vec<WindowReference> = self
703 .stream_windows
704 .get(&stream.reference)
705 .map(|windows| windows.to_vec())
706 .unwrap_or_default();
707 if stream.is_parameterized() {
708 self.global_store
710 .get_out_instance_collection_mut(output)
711 .mark_for_deletion(parameter);
712
713 for win_ref in own_windows {
714 match self.window_parameterization(win_ref).kind {
716 WindowParameterizationKind::None | WindowParameterizationKind::Target => {
717 }
719 WindowParameterizationKind::Caller => {
720 self.global_store
721 .get_window_collection_mut(win_ref)
722 .delete_window(parameter);
723 }
724 WindowParameterizationKind::Both => {
725 self.global_store
726 .get_two_layer_window_collection_mut(win_ref)
727 .close_caller_instance(parameter);
728 }
729 }
730 }
731
732 for (_, _origin, win_ref) in stream.aggregated_by.iter().filter(|(_, _, w)| {
734 matches!(
735 w,
736 WindowReference::Sliding(_) | WindowReference::Discrete(_)
737 )
738 }) {
739 match self.window_parameterization(*win_ref).kind {
741 WindowParameterizationKind::None | WindowParameterizationKind::Caller => {
742 unreachable!()
743 }
744 WindowParameterizationKind::Target => {
745 self.global_store
746 .get_window_collection_mut(*win_ref)
747 .schedule_deletion(parameter, ts);
748 }
749 WindowParameterizationKind::Both => {
750 self.global_store
751 .get_two_layer_window_collection_mut(*win_ref)
752 .close_target_instance(parameter, ts);
753 }
754 }
755 }
756 } else {
757 for win_ref in own_windows {
758 match self.window_parameterization(win_ref).kind {
760 WindowParameterizationKind::None => {
761 self.global_store.get_window_mut(win_ref).deactivate();
762 }
763 WindowParameterizationKind::Target => {
764 self.global_store
765 .get_window_collection_mut(win_ref)
766 .deactivate_all();
767 }
768 WindowParameterizationKind::Caller | WindowParameterizationKind::Both => {
769 unreachable!("Parameters are empty")
770 }
771 }
772 }
773 }
774 self.closed_outputs.insert(output);
775
776 if let Some(tds) = self.time_driven_streams[output] {
778 let mut schedule = (*self.dyn_schedule).borrow_mut();
779 schedule.remove_evaluation(output, parameter, tds.period_in_duration());
780
781 if let PacingType::LocalPeriodic(f) = stream.close.pacing {
783 let period = Duration::from_nanos(
784 UOM_Time::new::<uom::si::time::second>(
785 f.get::<uom::si::frequency::hertz>().inv(),
786 )
787 .get::<nanosecond>()
788 .to_integer()
789 .try_into()
790 .expect("Period [ns] too large for u64!"),
791 );
792 schedule.remove_close(output, parameter, period);
793 }
794 }
795 }
796
797 fn close_streams(&mut self) {
799 for o in self.closed_outputs.iter() {
800 if self.ir.output(StreamReference::Out(o)).is_parameterized() {
801 let vals = self
802 .global_store
803 .get_out_instance_collection_mut(o)
804 .delete_instances();
805 if let Some(wrefs) = self
806 .stream_instance_aggregations
807 .get(&StreamReference::Out(o))
808 {
809 wrefs.iter().for_each(|aggr| {
810 let inst = self.global_store.get_instance_aggregation_mut(*aggr);
811 vals.iter().for_each(|v| {
812 inst.remove_value(v.clone());
813 })
814 })
815 }
816 } else {
817 self.global_store.get_out_instance_mut(o).deactivate();
818 }
819 }
820 }
821
822 fn eval_event_driven_spawn(
823 &mut self,
824 output: OutputReference,
825 ts: Time,
826 tracer: &mut impl Tracer,
827 ) {
828 if self.spawn_activation_conditions[output].eval(self.fresh_inputs) {
829 tracer.spawn_start(output);
830 self.eval_spawn(output, ts);
831 tracer.spawn_end(output);
832 }
833 }
834
835 fn eval_stream_instances(
836 &mut self,
837 output: OutputReference,
838 ts: Time,
839 tracer: &mut impl Tracer,
840 ) {
841 if self
842 .ir
843 .output(StreamReference::Out(output))
844 .is_parameterized()
845 {
846 let parameter: Vec<Vec<Value>> = self
847 .global_store
848 .get_out_instance_collection(output)
849 .all_parameter()
850 .cloned()
851 .collect();
852 for instance in parameter {
853 tracer.instance_eval_start(output, instance.as_slice());
854 self.eval_stream_instance(output, instance.as_slice(), ts);
855 tracer.instance_eval_end(output, instance.as_slice());
856 }
857 } else if self.global_store.get_out_instance(output).is_active() {
858 tracer.instance_eval_start(output, &[]);
859 self.eval_stream_instance(output, &[], ts);
860 tracer.instance_eval_end(output, &[]);
861 }
862 }
863
864 fn eval_close_instances(
865 &mut self,
866 output: OutputReference,
867 ts: Time,
868 tracer: &mut impl Tracer,
869 ) {
870 if self
871 .ir
872 .output(StreamReference::Out(output))
873 .is_parameterized()
874 {
875 let parameter: Vec<Vec<Value>> = self
876 .global_store
877 .get_out_instance_collection(output)
878 .all_parameter()
879 .cloned()
880 .collect();
881 for instance in parameter {
882 tracer.close_start(output, instance.as_slice());
883 self.eval_close(output, instance.as_slice(), ts);
884 tracer.close_end(output, instance.as_slice());
885 }
886 } else if self.global_store.get_out_instance(output).is_active() {
887 tracer.close_start(output, &[]);
888 self.eval_close(output, &[], ts);
889 tracer.close_end(output, &[]);
890 }
891 }
892
893 fn eval_event_driven_output(
894 &mut self,
895 output: OutputReference,
896 ts: Time,
897 tracer: &mut impl Tracer,
898 ) {
899 if self.stream_activation_conditions[output].eval(self.fresh_inputs) {
900 self.eval_stream_instances(output, ts, tracer)
901 }
902 }
903
904 pub(crate) fn eval_time_driven_tasks(
906 &mut self,
907 tasks: Vec<EvaluationTask>,
908 ts: Time,
909 tracer: &mut impl Tracer,
910 ) {
911 if tasks.is_empty() {
912 return;
913 }
914 self.new_cycle(ts);
915 self.prepare_evaluation(ts);
916 for task in tasks {
917 match task {
918 EvaluationTask::Evaluate(idx, parameter) => {
919 tracer.instance_eval_start(idx, parameter.as_slice());
920 self.eval_stream_instance(idx, parameter.as_slice(), ts);
921 tracer.instance_eval_end(idx, parameter.as_slice());
922 }
923 EvaluationTask::EvaluateInstances(idx) => {
924 self.eval_stream_instances(idx, ts, tracer);
925 }
926 EvaluationTask::Spawn(idx) => {
927 tracer.spawn_start(idx);
928 self.eval_spawn(idx, ts);
929 tracer.spawn_end(idx);
930 }
931 EvaluationTask::Close(idx, parameter) => {
932 tracer.close_start(idx, parameter.as_slice());
933 self.eval_close(idx, parameter.as_slice(), ts);
934 tracer.close_end(idx, parameter.as_slice());
935 }
936 EvaluationTask::CloseInstances(idx) => {
937 self.eval_close_instances(idx, ts, tracer);
938 }
939 }
940 }
941 }
942
943 fn prepare_evaluation(&mut self, ts: Time) {
944 let windows = &self.ir.sliding_windows;
946 for win in windows {
947 let WindowParameterization { kind, .. } = self.window_parameterization(win.reference);
948 match kind {
949 WindowParameterizationKind::None => {
950 let window = self.global_store.get_window_mut(win.reference);
951 if window.is_active() {
952 window.update(ts);
953 }
954 }
955 WindowParameterizationKind::Caller | WindowParameterizationKind::Target => {
956 self.global_store
957 .get_window_collection_mut(win.reference)
958 .update_all(ts);
959 }
960 WindowParameterizationKind::Both => {
961 self.global_store
962 .get_two_layer_window_collection_mut(win.reference)
963 .update_all(ts);
964 }
965 }
966 }
967 }
968
969 fn eval_stream_instance(&mut self, output: OutputReference, parameter: &[Value], ts: Time) {
970 let ix = output;
971
972 let expr = self.compiled_stream_exprs[ix].clone();
973 let ctx = self.as_EvaluationContext(parameter, ts);
974 let res = expr.execute(&ctx);
975
976 if let Value::None = res {
978 return;
979 }
980
981 let is_parameterized = self.ir.outputs[ix].is_parameterized();
982 let instance = if is_parameterized {
984 self.global_store
985 .get_out_instance_collection_mut(output)
986 .instance_mut(parameter)
987 .expect("tried to eval non existing instance")
988 } else {
989 self.global_store.get_out_instance_mut(output)
990 };
991 let old_value = instance.get_value(0);
992 instance.push_value(res.clone());
993 self.fresh_outputs.insert(ix);
994
995 if let Some(trigger) = self.is_trigger(output) {
996 self.fresh_triggers.insert(trigger.trigger_reference);
997 }
998
999 if let Some(aggrs) = self
1001 .stream_instance_aggregations
1002 .get(&StreamReference::Out(output))
1003 {
1004 aggrs.iter().for_each(|w| {
1005 let aggr = self.global_store.get_instance_aggregation_mut(*w);
1006 if let Some(old) = old_value.clone() {
1007 aggr.remove_value(old);
1008 }
1009 aggr.accept_value(res.clone());
1010 });
1011 }
1012
1013 let extended = &self.ir.outputs[ix];
1015 for (_sr, _origin, win) in extended.aggregated_by.iter().filter(|(_, _, w)| {
1016 matches!(
1017 w,
1018 WindowReference::Sliding(_) | WindowReference::Discrete(_)
1019 )
1020 }) {
1021 self.extend_window(parameter, *win, res.clone(), ts);
1022 }
1023 }
1024
1025 fn window_parameterization(&self, win: WindowReference) -> WindowParameterization {
1026 self.global_store.window_parameterization(win)
1027 }
1028
1029 fn extend_window(
1030 &mut self,
1031 own_parameter: &[Value],
1032 win: WindowReference,
1033 value: Value,
1034 ts: Time,
1035 ) {
1036 match self.window_parameterization(win).kind {
1037 WindowParameterizationKind::None => self
1038 .global_store
1039 .get_window_mut(win)
1040 .accept_value(value, ts),
1041 WindowParameterizationKind::Caller => self
1042 .global_store
1043 .get_window_collection_mut(win)
1044 .accept_value_all(value, ts),
1045 WindowParameterizationKind::Target => self
1046 .global_store
1047 .get_window_collection_mut(win)
1048 .window_mut(own_parameter)
1049 .expect("tried to extend non existing window")
1050 .accept_value(value, ts),
1051 WindowParameterizationKind::Both => self
1052 .global_store
1053 .get_two_layer_window_collection_mut(win)
1054 .accept_value(own_parameter, value, ts),
1055 }
1056 }
1057
1058 fn new_cycle(&mut self, ts: Time) {
1060 self.close_streams();
1061 self.fresh_inputs.clear();
1062 self.fresh_outputs.clear();
1063 self.fresh_triggers.clear();
1064
1065 self.spawned_outputs.clear();
1066 self.closed_outputs.clear();
1067 self.global_store.new_cycle(ts);
1068 }
1069
1070 fn is_trigger(&self, ix: OutputReference) -> Option<&Trigger> {
1071 self.triggers[ix].as_ref()
1072 }
1073
1074 fn peek_value(&self, sr: StreamReference, args: &[Value], offset: i16) -> Option<Value> {
1075 match sr {
1076 StreamReference::In(ix) => {
1077 assert!(args.is_empty());
1078 self.global_store.get_in_instance(ix).get_value(offset)
1079 }
1080 StreamReference::Out(ix) => {
1081 if self.ir.stream(sr).is_parameterized() {
1082 assert!(!args.is_empty());
1083 self.global_store
1084 .get_out_instance_collection(ix)
1085 .instance(args)
1086 .and_then(|i| i.get_value(offset))
1087 } else {
1088 self.global_store.get_out_instance(ix).get_value(offset)
1089 }
1090 }
1091 }
1092 }
1093
1094 #[allow(non_snake_case)]
1095 fn as_EvaluationContext<'a>(
1096 &'a mut self,
1097 parameter: &'a [Value],
1098 ts: Time,
1099 ) -> EvaluationContext<'a> {
1100 EvaluationContext {
1101 ts,
1102 global_store: self.global_store,
1103 fresh_inputs: self.fresh_inputs,
1104 fresh_outputs: self.fresh_outputs,
1105 parameter,
1106 lambda_parameter: None,
1107 }
1108 }
1109}
1110
1111impl EvaluationContext<'_> {
1112 pub(crate) fn lookup_latest(&self, stream_ref: StreamReference, parameter: &[Value]) -> Value {
1113 match stream_ref {
1114 StreamReference::In(ix) => self
1115 .global_store
1116 .get_in_instance(ix)
1117 .get_value(0)
1118 .unwrap_or(Value::None),
1119 StreamReference::Out(ix) => {
1120 if parameter.is_empty() {
1121 self.global_store
1122 .get_out_instance(ix)
1123 .get_value(0)
1124 .unwrap_or(Value::None)
1125 } else {
1126 self.global_store
1127 .get_out_instance_collection(ix)
1128 .instance(parameter)
1129 .and_then(|i| i.get_value(0))
1130 .unwrap_or(Value::None)
1131 }
1132 }
1133 }
1134 }
1135
1136 pub(crate) fn lookup_latest_check(
1137 &self,
1138 stream_ref: StreamReference,
1139 parameter: &[Value],
1140 ) -> Value {
1141 let inst = match stream_ref {
1142 StreamReference::In(ix) => {
1143 debug_assert!(self.fresh_inputs.contains(ix), "ix={}", ix);
1144 self.global_store.get_in_instance(ix)
1145 }
1146 StreamReference::Out(ix) => {
1147 debug_assert!(self.fresh_outputs.contains(ix), "ix={}", ix);
1148 if parameter.is_empty() {
1149 self.global_store.get_out_instance(ix)
1150 } else {
1151 self.global_store
1152 .get_out_instance_collection(ix)
1153 .instance(parameter)
1154 .expect("tried to sync access non existing instance")
1155 }
1156 }
1157 };
1158 inst.get_value(0).unwrap_or(Value::None)
1159 }
1160
1161 fn get_instance_and_fresh(
1162 &self,
1163 stream_ref: StreamReference,
1164 parameter: &[Value],
1165 ) -> (Option<&InstanceStore>, bool) {
1166 match stream_ref {
1167 StreamReference::In(ix) => (
1168 Some(self.global_store.get_in_instance(ix)),
1169 self.fresh_inputs.contains(ix),
1170 ),
1171 StreamReference::Out(ix) => {
1172 if parameter.is_empty() {
1173 (
1174 Some(self.global_store.get_out_instance(ix)),
1175 self.fresh_outputs.contains(ix),
1176 )
1177 } else {
1178 let collection = self.global_store.get_out_instance_collection(ix);
1179 (
1180 collection.instance(parameter),
1181 collection.is_fresh(parameter),
1182 )
1183 }
1184 }
1185 }
1186 }
1187
1188 pub(crate) fn lookup_fresh(&self, stream_ref: StreamReference, parameter: &[Value]) -> Value {
1189 let (_, fresh) = self.get_instance_and_fresh(stream_ref, parameter);
1190 Value::Bool(fresh)
1191 }
1192
1193 pub(crate) fn lookup_with_offset(
1194 &self,
1195 stream_ref: StreamReference,
1196 parameter: &[Value],
1197 offset: i16,
1198 ) -> Value {
1199 let (inst, fresh) = self.get_instance_and_fresh(stream_ref, parameter);
1200 let inst = inst.expect("target stream instance to exist for sync access");
1201 if fresh {
1202 inst.get_value(offset).unwrap_or(Value::None)
1203 } else {
1204 inst.get_value(offset + 1).unwrap_or(Value::None)
1205 }
1206 }
1207
1208 pub(crate) fn lookup_current(&self, stream_ref: StreamReference, parameter: &[Value]) -> Value {
1209 let (inst, fresh) = self.get_instance_and_fresh(stream_ref, parameter);
1210 if fresh {
1211 inst.expect("fresh instance to exist")
1212 .get_value(0)
1213 .expect("fresh stream to have a value.")
1214 } else {
1215 Value::None
1216 }
1217 }
1218
1219 pub(crate) fn lookup_instance_aggr(&self, window_reference: WindowReference) -> Value {
1221 if let WindowReference::Instance(idx) = window_reference {
1222 let aggr = &self.global_store.instance_aggregations[idx];
1223 let target = &self.global_store.p_outputs
1224 [self.global_store.stream_index_map[aggr.target.out_ix()]];
1225 aggr.get_value_with_ctx(target, self)
1226 } else {
1227 unreachable!("Called update_instance_aggregation for non instance");
1228 }
1229 }
1230
1231 pub(crate) fn lookup_window(
1232 &self,
1233 window_ref: WindowReference,
1234 target_parameter: &[Value],
1235 ) -> Value {
1236 let parameterization = self.global_store.window_parameterization(window_ref).kind;
1237 match parameterization {
1238 WindowParameterizationKind::None => {
1239 self.global_store.get_window(window_ref).get_value(self.ts)
1240 }
1241 WindowParameterizationKind::Caller => self
1242 .global_store
1243 .get_window_collection(window_ref)
1244 .window(self.parameter)
1245 .expect("Own window to exist")
1246 .get_value(self.ts),
1247 WindowParameterizationKind::Target => {
1248 let window_collection = self.global_store.get_window_collection(window_ref);
1249 let window = window_collection.window(target_parameter);
1250 if let Some(w) = window {
1251 w.get_value(self.ts)
1252 } else {
1253 window_collection.default_value(self.ts)
1254 }
1255 }
1256 WindowParameterizationKind::Both => {
1257 let collection = self
1258 .global_store
1259 .get_two_layer_window_collection(window_ref);
1260 let window = collection.window(target_parameter, self.parameter);
1261 if let Some(w) = window {
1262 w.get_value(self.ts)
1263 } else {
1264 collection.default_value(self.ts)
1265 }
1266 }
1267 }
1268 }
1269
1270 pub(crate) fn is_active(&self, ac: &ActivationConditionOp) -> bool {
1271 ac.eval(self.fresh_inputs)
1272 }
1273
1274 pub(crate) fn with_new_instance<'a>(&'a self, inst: &'a Vec<Value>) -> EvaluationContext<'a> {
1275 let EvaluationContext {
1276 ts,
1277 global_store,
1278 fresh_inputs,
1279 fresh_outputs,
1280 parameter,
1281 lambda_parameter: _,
1282 } = self;
1283 EvaluationContext {
1284 ts: *ts,
1285 global_store,
1286 fresh_inputs,
1287 fresh_outputs,
1288 parameter,
1289 lambda_parameter: Some(inst),
1290 }
1291 }
1292}
1293
1294impl ActivationConditionOp {
1295 fn new(ac: &Activation, n_inputs: usize) -> Self {
1296 use ActivationConditionOp::*;
1297 if let Activation::True = ac {
1298 return True;
1300 }
1301 if let Activation::Conjunction(vec) = ac {
1302 assert!(!vec.is_empty());
1303 let ixs: Vec<usize> = vec
1304 .iter()
1305 .flat_map(|ac| {
1306 if let Activation::Stream(var) = ac {
1307 Some(var.in_ix())
1308 } else {
1309 None
1310 }
1311 })
1312 .collect();
1313 if vec.len() == ixs.len() {
1314 let mut bs = BitSet::with_capacity(n_inputs);
1316 for ix in ixs {
1317 bs.insert(ix);
1318 }
1319 return Conjunction(bs);
1320 }
1321 }
1322 General(ac.clone())
1323 }
1324
1325 pub(crate) fn eval(&self, inputs: &BitSet) -> bool {
1326 use ActivationConditionOp::*;
1327 match self {
1328 True => true,
1329 Conjunction(bs) => bs.is_subset(inputs),
1330 General(ac) => Self::eval_(ac, inputs),
1331 TimeDriven => unreachable!(),
1332 }
1333 }
1334
1335 fn eval_(ac: &Activation, inputs: &BitSet) -> bool {
1336 use Activation::*;
1337 match ac {
1338 Stream(var) => inputs.contains(var.in_ix()),
1339 Conjunction(vec) => vec.iter().all(|ac| Self::eval_(ac, inputs)),
1340 Disjunction(vec) => vec.iter().any(|ac| Self::eval_(ac, inputs)),
1341 True => unreachable!(),
1342 }
1343 }
1344
1345 fn is_eventdriven(&self) -> bool {
1346 !matches!(self, ActivationConditionOp::TimeDriven)
1347 }
1348}
1349
1350#[cfg(test)]
1351mod tests {
1352
1353 use std::time::Duration;
1354
1355 use ordered_float::{Float, NotNan};
1356 use rtlola_frontend::ParserConfig;
1357
1358 use super::*;
1359 use crate::monitor::NoTracer;
1360 use crate::schedule::dynamic_schedule::*;
1361 use crate::storage::Value::*;
1362
1363 fn setup(spec: &str) -> (RtLolaMir, EvaluatorData, Duration) {
1364 let cfg = ParserConfig::for_string(spec.to_string());
1365 let handler = rtlola_frontend::Handler::from(&cfg);
1366 let ir = rtlola_frontend::parse(&cfg).unwrap_or_else(|e| {
1367 handler.emit_error(&e);
1368 panic!();
1369 });
1370 let dyn_schedule = Rc::new(RefCell::new(DynamicSchedule::new()));
1371 let now = Duration::ZERO;
1372 let eval = EvaluatorData::new(ir.clone(), dyn_schedule);
1373 (ir, eval, now)
1374 }
1375
1376 fn setup_time(spec: &str) -> (RtLolaMir, EvaluatorData, Time) {
1377 let (ir, eval, _) = setup(spec);
1378 (ir, eval, Time::default())
1379 }
1380
1381 macro_rules! assert_float_eq {
1382 ($left:expr, $right:expr) => {
1383 if let Float(left) = $left {
1384 if let Float(right) = $right {
1385 assert!(
1386 (left - right).abs() < f64::epsilon(),
1387 "Assertion failed: Difference between {} and {} is greater than {}",
1388 left,
1389 right,
1390 f64::epsilon()
1391 );
1392 } else {
1393 panic!("{:?} is not a float.", $right)
1394 }
1395 } else {
1396 panic!("{:?} is not a float.", $left)
1397 }
1398 };
1399 }
1400
1401 macro_rules! eval_stream_instances {
1402 ($eval:expr, $start:expr, $ix:expr) => {
1403 $eval.eval_event_driven_output($ix.out_ix(), $start, &mut NoTracer::default());
1404 };
1405 }
1406
1407 macro_rules! eval_stream_instances_timed {
1408 ($eval:expr, $time:expr, $ix:expr) => {
1409 $eval.prepare_evaluation($time);
1410 $eval.eval_event_driven_output($ix.out_ix(), $time, &mut NoTracer::default());
1411 };
1412 }
1413
1414 macro_rules! eval_stream {
1415 ($eval:expr, $start:expr, $ix:expr, $parameter:expr) => {
1416 $eval.eval_stream_instance($ix, $parameter.as_slice(), $start);
1417 };
1418 }
1419
1420 macro_rules! spawn_stream {
1421 ($eval:expr, $start:expr, $ix:expr) => {
1422 $eval.eval_event_driven_spawn($ix.out_ix(), $start, &mut NoTracer::default());
1423 };
1424 }
1425
1426 macro_rules! spawn_stream_timed {
1427 ($eval:expr, $time:expr, $ix:expr) => {
1428 $eval.eval_event_driven_spawn($ix.out_ix(), $time, &mut NoTracer::default());
1429 };
1430 }
1431
1432 macro_rules! eval_close {
1433 ($eval:expr, $start:expr, $ix:expr, $parameter:expr) => {
1434 $eval.eval_close($ix.out_ix(), $parameter.as_slice(), $start);
1435 $eval.close_streams();
1436 };
1437 }
1438
1439 macro_rules! eval_close_timed {
1440 ($eval:expr, $time:expr, $ix:expr, $parameter:expr) => {
1441 $eval.eval_close($ix.out_ix(), $parameter.as_slice(), $time);
1442 $eval.close_streams();
1443 };
1444 }
1445
1446 macro_rules! stream_has_instance {
1447 ($eval:expr, $ix:expr, $parameter:expr) => {
1448 if $parameter.is_empty() {
1449 $eval
1450 .global_store
1451 .get_out_instance($ix.out_ix())
1452 .is_active()
1453 } else {
1454 $eval
1455 .global_store
1456 .get_out_instance_collection($ix.out_ix())
1457 .contains($parameter.as_slice())
1458 }
1459 };
1460 }
1461
1462 macro_rules! eval_stream_timed {
1463 ($eval:expr, $ix:expr, $parameter:expr, $time:expr) => {
1464 $eval.prepare_evaluation($time);
1465 $eval.eval_stream_instance($ix, $parameter.as_slice(), $time);
1466 };
1467 }
1468
1469 macro_rules! accept_input {
1470 ($eval:expr, $start:expr, $str_ref:expr, $v:expr) => {
1471 $eval.accept_input($str_ref.in_ix(), $v.clone(), $start);
1472 };
1473 }
1474
1475 macro_rules! accept_input_timed {
1476 ($eval:expr, $str_ref:expr, $v:expr, $time:expr) => {
1477 $eval.accept_input($str_ref.in_ix(), $v.clone(), $time);
1478 };
1479 }
1480
1481 macro_rules! peek_assert_eq {
1482 ($eval:expr, $start:expr, $ix:expr, $parameter:expr, $value:expr) => {
1483 eval_stream!($eval, $start, $ix, $parameter);
1484 assert_eq!(
1485 $eval
1486 .peek_value(StreamReference::Out($ix), $parameter.as_slice(), 0)
1487 .unwrap(),
1488 $value
1489 );
1490 };
1491 }
1492
1493 #[test]
1494 fn test_const_output_literals() {
1495 let (_, eval, start) = setup(
1496 r#"
1497 input i_0: UInt8
1498
1499 output o_0: Bool @i_0 := true
1500 output o_1: UInt8 @i_0 := 3
1501 output o_2: Int8 @i_0 := -5
1502 output o_3: Float32 @i_0 := -123.456
1503 output o_4: String @i_0 := "foobar"
1504 "#,
1505 );
1506 let mut eval = eval.into_evaluator();
1507 let sr = StreamReference::In(0);
1508 let v = Unsigned(3);
1509 accept_input!(eval, start, sr, v);
1510 peek_assert_eq!(eval, start, 0, vec![], Bool(true));
1511 peek_assert_eq!(eval, start, 1, vec![], Unsigned(3));
1512 peek_assert_eq!(eval, start, 2, vec![], Signed(-5));
1513 peek_assert_eq!(eval, start, 3, vec![], Value::try_from(-123.456).unwrap());
1514 peek_assert_eq!(eval, start, 4, vec![], Str("foobar".into()));
1515 }
1516
1517 #[test]
1518 fn test_const_output_arithlog() {
1519 let (_, eval, start) = setup(
1520 r#"
1521 input i_0: Int8
1522
1523 output o_0: Bool @i_0 := !false
1524 output o_1: Bool @i_0 := !true
1525 output o_2: UInt8 @i_0 := 8 + 3
1526 output o_3: UInt8 @i_0 := 8 - 3
1527 output o_4: UInt8 @i_0 := 8 * 3
1528 output o_5: UInt8 @i_0 := 8 / 3
1529 output o_6: UInt8 @i_0 := 8 % 3
1530 output o_7: UInt8 @i_0 := 8 ** 3
1531 output o_8: Bool @i_0 := false || false
1532 output o_9: Bool @i_0 := false || true
1533 output o_10: Bool @i_0 := true || false
1534 output o_11: Bool @i_0 := true || true
1535 output o_12: Bool @i_0 := false && false
1536 output o_13: Bool @i_0 := false && true
1537 output o_14: Bool @i_0 := true && false
1538 output o_15: Bool @i_0 := true && true
1539 output o_16: Bool @i_0 := 0 < 1
1540 output o_17: Bool @i_0 := 0 < 0
1541 output o_18: Bool @i_0 := 1 < 0
1542 output o_19: Bool @i_0 := 0 <= 1
1543 output o_20: Bool @i_0 := 0 <= 0
1544 output o_21: Bool @i_0 := 1 <= 0
1545 output o_22: Bool @i_0 := 0 >= 1
1546 output o_23: Bool @i_0 := 0 >= 0
1547 output o_24: Bool @i_0 := 1 >= 0
1548 output o_25: Bool @i_0 := 0 > 1
1549 output o_26: Bool @i_0 := 0 > 0
1550 output o_27: Bool @i_0 := 1 > 0
1551 output o_28: Bool @i_0 := 0 == 0
1552 output o_29: Bool @i_0 := 0 == 1
1553 output o_30: Bool @i_0 := 0 != 0
1554 output o_31: Bool @i_0 := 0 != 1
1555 "#,
1556 );
1557 let mut eval = eval.into_evaluator();
1558 let sr = StreamReference::In(0);
1559 let v = Unsigned(3);
1560 accept_input!(eval, start, sr, v);
1561 peek_assert_eq!(eval, start, 0, vec![], Bool(!false));
1562 peek_assert_eq!(eval, start, 1, vec![], Bool(!true));
1563 peek_assert_eq!(eval, start, 2, vec![], Unsigned(8 + 3));
1564 peek_assert_eq!(eval, start, 3, vec![], Unsigned(8 - 3));
1565 peek_assert_eq!(eval, start, 4, vec![], Unsigned(8 * 3));
1566 peek_assert_eq!(eval, start, 5, vec![], Unsigned(8 / 3));
1567 peek_assert_eq!(eval, start, 6, vec![], Unsigned(8 % 3));
1568 peek_assert_eq!(eval, start, 7, vec![], Unsigned(8 * 8 * 8));
1569 peek_assert_eq!(eval, start, 8, vec![], Bool(false || false));
1570 peek_assert_eq!(eval, start, 9, vec![], Bool(false || true));
1571 peek_assert_eq!(eval, start, 10, vec![], Bool(true || false));
1572 peek_assert_eq!(eval, start, 11, vec![], Bool(true || true));
1573 peek_assert_eq!(eval, start, 12, vec![], Bool(false && false));
1574 peek_assert_eq!(eval, start, 13, vec![], Bool(false && true));
1575 peek_assert_eq!(eval, start, 14, vec![], Bool(true && false));
1576 peek_assert_eq!(eval, start, 15, vec![], Bool(true && true));
1577 peek_assert_eq!(eval, start, 16, vec![], Bool(0 < 1));
1578 peek_assert_eq!(eval, start, 17, vec![], Bool(0 < 0));
1579 peek_assert_eq!(eval, start, 18, vec![], Bool(1 < 0));
1580 peek_assert_eq!(eval, start, 19, vec![], Bool(0 <= 1));
1581 peek_assert_eq!(eval, start, 20, vec![], Bool(0 <= 0));
1582 peek_assert_eq!(eval, start, 21, vec![], Bool(1 <= 0));
1583 peek_assert_eq!(eval, start, 22, vec![], Bool(0 >= 1));
1584 peek_assert_eq!(eval, start, 23, vec![], Bool(0 >= 0));
1585 peek_assert_eq!(eval, start, 24, vec![], Bool(1 >= 0));
1586 peek_assert_eq!(eval, start, 25, vec![], Bool(0 > 1));
1587 peek_assert_eq!(eval, start, 26, vec![], Bool(0 > 0));
1588 peek_assert_eq!(eval, start, 27, vec![], Bool(1 > 0));
1589 peek_assert_eq!(eval, start, 28, vec![], Bool(0 == 0));
1590 peek_assert_eq!(eval, start, 29, vec![], Bool(0 == 1));
1591 peek_assert_eq!(eval, start, 30, vec![], Bool(0 != 0));
1592 peek_assert_eq!(eval, start, 31, vec![], Bool(0 != 1));
1593 }
1594
1595 #[test]
1596 fn test_input_only() {
1597 let (_, eval, start) = setup("input a: UInt8");
1598 let mut eval = eval.into_evaluator();
1599 let sr = StreamReference::In(0);
1600 let v = Unsigned(3);
1601 accept_input!(eval, start, sr, v);
1602 assert_eq!(eval.peek_value(sr, &Vec::new(), 0).unwrap(), v)
1603 }
1604
1605 #[test]
1606 fn test_sync_lookup() {
1607 let (_, eval, start) = setup("input a: UInt8 output b: UInt8 := a output c: UInt8 := b");
1608 let mut eval = eval.into_evaluator();
1609 let out_ref_0 = StreamReference::Out(0);
1610 let out_ref_1 = StreamReference::Out(1);
1611 let in_ref = StreamReference::In(0);
1612 let v = Unsigned(9);
1613 accept_input!(eval, start, in_ref, v);
1614 eval_stream!(eval, start, 0, vec![]);
1615 eval_stream!(eval, start, 1, vec![]);
1616 assert_eq!(eval.peek_value(out_ref_0, &Vec::new(), 0).unwrap(), v);
1617 assert_eq!(eval.peek_value(out_ref_1, &Vec::new(), 0).unwrap(), v)
1618 }
1619
1620 #[test]
1621 fn test_oob_lookup() {
1622 let (_, eval, start) =
1623 setup("input a: UInt8\noutput b := a.offset(by: -1).defaults(to: 3)\noutput x: UInt8 @5Hz := b.hold().defaults(to: 3)");
1624 let mut eval = eval.into_evaluator();
1625 let out_ref = StreamReference::Out(1);
1626 let in_ref = StreamReference::In(0);
1627 let v1 = Unsigned(1);
1628 accept_input!(eval, start, in_ref, v1);
1629 eval_stream!(eval, start, 0, vec![]);
1630 eval_stream!(eval, start, 1, vec![]);
1631 assert_eq!(
1632 eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
1633 Unsigned(3)
1634 );
1635 }
1636
1637 #[test]
1638 fn test_output_lookup() {
1639 let (_, eval, start) = setup(
1640 "input a: UInt8\n\
1641 output mirror: UInt8 := a\n\
1642 output mirror_offset := mirror.offset(by: -1).defaults(to: 5)\n\
1643 output c: UInt8 @5Hz := mirror.hold().defaults(to: 8)\n\
1644 output d: UInt8 @5Hz := mirror_offset.hold().defaults(to: 3)",
1645 );
1646 let mut eval = eval.into_evaluator();
1647 let out_ref = StreamReference::Out(2);
1648 let in_ref = StreamReference::In(0);
1649 let v1 = Unsigned(1);
1650 let v2 = Unsigned(2);
1651 accept_input!(eval, start, in_ref, v1);
1652 eval_stream!(eval, start, 0, vec![]);
1653 eval_stream!(eval, start, 1, vec![]);
1654 accept_input!(eval, start, in_ref, v2);
1655 eval_stream!(eval, start, 0, vec![]);
1656 eval_stream!(eval, start, 1, vec![]);
1657 eval_stream!(eval, start, 2, vec![]);
1658 assert_eq!(
1659 eval.peek_value(StreamReference::Out(0), &Vec::new(), 0)
1660 .unwrap(),
1661 v2
1662 );
1663 assert_eq!(
1664 eval.peek_value(StreamReference::Out(1), &Vec::new(), 0)
1665 .unwrap(),
1666 v1
1667 );
1668 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), v2);
1669 }
1670
1671 #[test]
1672 fn test_get_fresh_lookup() {
1673 let (_, eval, start) = setup(
1674 "input a: UInt8\n\
1675 input b: UInt8\n\
1676 output mirror: UInt8 := a\n\
1677 output mirror_parameter (p)\n
1678 spawn with a\n\
1679 eval with a
1680 output g1: UInt8 @a := mirror.get().defaults(to: 8)\n\
1681 output g2: UInt8 @a := mirror_parameter(a).get().defaults(to: 8)\n\
1682 output f1: Bool @a := mirror.is_fresh()\n\
1683 output f2: Bool @a := mirror_parameter(a).is_fresh()",
1684 );
1685 let mut eval = eval.into_evaluator();
1686 let a = StreamReference::In(0);
1687 let mirror_p = StreamReference::Out(1);
1688 let g1 = StreamReference::Out(2);
1689 let g2 = StreamReference::Out(3);
1690 let f1 = StreamReference::Out(4);
1691 let f2 = StreamReference::Out(5);
1692
1693 let v1 = Unsigned(1);
1694 let t = Bool(true);
1695
1696 accept_input!(eval, start, a, v1);
1697 eval_stream!(eval, start, 0, vec![]);
1698 spawn_stream!(eval, start, mirror_p);
1699 eval_stream_instances!(eval, start, mirror_p);
1700
1701 eval_stream!(eval, start, 2, vec![]);
1702 eval_stream!(eval, start, 3, vec![]);
1703 eval_stream!(eval, start, 4, vec![]);
1704 eval_stream!(eval, start, 5, vec![]);
1705
1706 assert_eq!(eval.peek_value(g1, &Vec::new(), 0).unwrap(), v1);
1707 assert_eq!(eval.peek_value(g2, &Vec::new(), 0).unwrap(), v1);
1708
1709 assert_eq!(eval.peek_value(f1, &Vec::new(), 0).unwrap(), t);
1710 assert_eq!(eval.peek_value(f2, &Vec::new(), 0).unwrap(), t);
1711 }
1712
1713 #[test]
1714 fn test_get_fresh_lookup_fail() {
1715 let (_, eval, start) = setup(
1716 "input a: UInt8\n\
1717 input b: UInt8\n\
1718 output mirror: UInt8 := a\n\
1719 output mirror_parameter (p)\n
1720 spawn with a\n\
1721 eval with a
1722 output g1: UInt8 @b := mirror.get().defaults(to: 8)\n\
1723 output g2: UInt8 @b := mirror_parameter(b).get().defaults(to: 8)\n\
1724 output f1: Bool @b := mirror.is_fresh()\n\
1725 output f2: Bool @b := mirror_parameter(b).is_fresh()",
1726 );
1727 let mut eval = eval.into_evaluator();
1728 let b = StreamReference::In(1);
1729 let g1 = StreamReference::Out(2);
1730 let g2 = StreamReference::Out(3);
1731 let f1 = StreamReference::Out(4);
1732 let f2 = StreamReference::Out(5);
1733
1734 let v2 = Unsigned(2);
1735 let d = Unsigned(8);
1736 let f = Bool(false);
1737
1738 accept_input!(eval, start, b, v2);
1739
1740 eval_stream!(eval, start, 2, vec![]);
1741 eval_stream!(eval, start, 3, vec![]);
1742 eval_stream!(eval, start, 4, vec![]);
1743 eval_stream!(eval, start, 5, vec![]);
1744
1745 assert_eq!(eval.peek_value(g1, &Vec::new(), 0).unwrap(), d);
1746 assert_eq!(eval.peek_value(g2, &Vec::new(), 0).unwrap(), d);
1747
1748 assert_eq!(eval.peek_value(f1, &Vec::new(), 0).unwrap(), f);
1749 assert_eq!(eval.peek_value(f2, &Vec::new(), 0).unwrap(), f);
1750 }
1751
1752 #[test]
1753 fn test_conversion_if() {
1754 let (_, eval, start) =
1755 setup("input a: UInt8\noutput b: UInt16 := widen<UInt16>(if true then a else a[-1].defaults(to: 0))");
1756 let mut eval = eval.into_evaluator();
1757 let out_ref = StreamReference::Out(0);
1758 let in_ref = StreamReference::In(0);
1759 let v1 = Unsigned(1);
1760 accept_input!(eval, start, in_ref, v1);
1761 eval_stream!(eval, start, 0, vec![]);
1762 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), v1);
1763 }
1764
1765 #[test]
1766 #[ignore] fn test_conversion_lookup() {
1768 let (_, eval, start) = setup("input a: UInt8\noutput b: UInt32 := a + 100000");
1769 let mut eval = eval.into_evaluator();
1770 let out_ref = StreamReference::Out(0);
1771 let in_ref = StreamReference::In(0);
1772 let expected = Unsigned(7 + 100000);
1773 let v1 = Unsigned(7);
1774 accept_input!(eval, start, in_ref, v1);
1775 eval_stream!(eval, start, 0, vec![]);
1776 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1777 }
1778
1779 #[test]
1780 fn test_bin_op() {
1781 let (_, eval, start) =
1782 setup("input a: UInt16\n input b: UInt16\noutput c: UInt16 := a + b");
1783 let mut eval = eval.into_evaluator();
1784 let out_ref = StreamReference::Out(0);
1785 let a = StreamReference::In(0);
1786 let b = StreamReference::In(1);
1787 let v1 = Unsigned(1);
1788 let v2 = Unsigned(2);
1789 let expected = Unsigned(1 + 2);
1790 accept_input!(eval, start, a, v1);
1791 accept_input!(eval, start, b, v2);
1792 eval_stream!(eval, start, 0, vec![]);
1793 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1794 }
1795
1796 #[test]
1797 fn test_bin_op_float() {
1798 let (_, eval, start) =
1799 setup("input a: Float64\n input b: Float64\noutput c: Float64 := a + b");
1800 let mut eval = eval.into_evaluator();
1801 let out_ref = StreamReference::Out(0);
1802 let a = StreamReference::In(0);
1803 let b = StreamReference::In(1);
1804 let v1 = Float(NotNan::new(3.5f64).unwrap());
1805 let v2 = Float(NotNan::new(39.347568f64).unwrap());
1806 let expected = Float(NotNan::new(3.5f64 + 39.347568f64).unwrap());
1807 accept_input!(eval, start, a, v1);
1808 accept_input!(eval, start, b, v2);
1809 eval_stream!(eval, start, 0, vec![]);
1810 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1811 }
1812
1813 #[test]
1814 fn test_bin_tuple() {
1815 let (_, eval, start) = setup(
1816 "input a: Int32\n input b: Bool\noutput c := (a, b) output d := c.0 output e := c.1",
1817 );
1818 let mut eval = eval.into_evaluator();
1819 let out_ref = StreamReference::Out(0);
1820 let out_ref0 = StreamReference::Out(1);
1821 let out_ref1 = StreamReference::Out(2);
1822 let a = StreamReference::In(0);
1823 let b = StreamReference::In(1);
1824 let v1 = Signed(1);
1825 let v2 = Bool(true);
1826 let expected = Tuple(Box::new([v1.clone(), v2.clone()]));
1827 accept_input!(eval, start, a, v1);
1828 accept_input!(eval, start, b, v2);
1829 eval_stream!(eval, start, 0, vec![]);
1830 eval_stream!(eval, start, 1, vec![]);
1831 eval_stream!(eval, start, 2, vec![]);
1832 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1833 assert_eq!(eval.peek_value(out_ref0, &Vec::new(), 0).unwrap(), v1);
1834 assert_eq!(eval.peek_value(out_ref1, &Vec::new(), 0).unwrap(), v2);
1835 }
1836
1837 #[test]
1838 fn test_regular_lookup() {
1839 let (_, eval, start) =
1840 setup("input a: UInt8 output b := a.offset(by: -1).defaults(to: 5) output x: UInt8 @5Hz := b.hold().defaults(to: 3)");
1841 let mut eval = eval.into_evaluator();
1842 let out_ref = StreamReference::Out(1);
1843 let in_ref = StreamReference::In(0);
1844 let v1 = Unsigned(1);
1845 let v2 = Unsigned(2);
1846 let v3 = Unsigned(3);
1847 accept_input!(eval, start, in_ref, v1);
1848 accept_input!(eval, start, in_ref, v2);
1849 accept_input!(eval, start, in_ref, v3);
1850 eval_stream!(eval, start, 0, vec![]);
1851 eval_stream!(eval, start, 1, vec![]);
1852 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), v2)
1853 }
1854
1855 #[ignore] #[test]
1857 fn test_trigger() {
1858 let (_, eval, start) =
1859 setup("input a: UInt8 output b := a.offset(by: -1) output x: UInt8 @5Hz := b.hold().defaults(to: 3)\n trigger x > 4");
1860 let mut eval = eval.into_evaluator();
1861 let out_ref = StreamReference::Out(1);
1862 let trig_ref = StreamReference::Out(2);
1863 let in_ref = StreamReference::In(0);
1864 let v1 = Unsigned(8);
1865 eval_stream!(eval, start, 0, vec![]);
1866 eval_stream!(eval, start, 1, vec![]);
1867 eval_stream!(eval, start, 2, vec![]);
1868 assert_eq!(
1869 eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
1870 Unsigned(3)
1871 );
1872 assert_eq!(
1873 eval.peek_value(trig_ref, &Vec::new(), 0).unwrap(),
1874 Bool(false)
1875 );
1876 accept_input!(eval, start, in_ref, v1);
1877 eval_stream!(eval, start, 0, vec![]);
1878 eval_stream!(eval, start, 1, vec![]);
1879 eval_stream!(eval, start, 2, vec![]);
1880 assert_eq!(
1881 eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
1882 Unsigned(3)
1883 );
1884 assert_eq!(
1885 eval.peek_value(trig_ref, &Vec::new(), 0).unwrap(),
1886 Bool(false)
1887 );
1888 accept_input!(eval, start, in_ref, Unsigned(17));
1889 eval_stream!(eval, start, 0, vec![]);
1890 eval_stream!(eval, start, 1, vec![]);
1891 eval_stream!(eval, start, 2, vec![]);
1892 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), v1);
1893 assert_eq!(
1894 eval.peek_value(trig_ref, &Vec::new(), 0).unwrap(),
1895 Bool(true)
1896 );
1897 }
1898
1899 #[test]
1900 fn test_sum_window() {
1901 let (_, eval, mut time) = setup_time(
1902 "input a: Int16\noutput b: Int16 @0.25Hz := a.aggregate(over: 40s, using: sum)",
1903 );
1904 let mut eval = eval.into_evaluator();
1905 time += Duration::from_secs(45);
1906 let out_ref = StreamReference::Out(0);
1907 let in_ref = StreamReference::In(0);
1908 let n = 25;
1909 for v in 1..=n {
1910 accept_input_timed!(eval, in_ref, Signed(v), time);
1911 time += Duration::from_secs(1);
1912 }
1913 time += Duration::from_secs(1);
1914 eval_stream_timed!(eval, 0, vec![], time);
1916 let expected = Signed((n * n + n) / 2);
1917 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1918 }
1919
1920 #[test]
1921 fn test_count_window() {
1922 let (_, eval, mut time) = setup_time(
1923 "input a: UInt16\noutput b: UInt16 @0.25Hz := a.aggregate(over: 40s, using: count)",
1924 );
1925 let mut eval = eval.into_evaluator();
1926 time += Duration::from_secs(45);
1927 let out_ref = StreamReference::Out(0);
1928 let in_ref = StreamReference::In(0);
1929 let n = 25;
1930 for v in 1..=n {
1931 accept_input_timed!(eval, in_ref, Unsigned(v), time);
1932 time += Duration::from_secs(1);
1933 }
1934 time += Duration::from_secs(1);
1935 eval_stream_timed!(eval, 0, vec![], time);
1937 let expected = Unsigned(n);
1938 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1939 }
1940
1941 #[test]
1942 fn test_average_window() {
1943 let (_, eval, mut time) = setup_time(
1944 "input a: Float32\noutput b @0.25Hz := a.aggregate(over: 40s, using: average).defaults(to: -3.0)",
1945 );
1946 let mut eval = eval.into_evaluator();
1947 time += Duration::from_secs(45);
1948 let out_ref = StreamReference::Out(0);
1949 let in_ref = StreamReference::In(0);
1950
1951 eval_stream_timed!(eval, 0, vec![], time);
1953 let expected = Value::try_from(-3.0).unwrap();
1954 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1955
1956 let n = 25;
1957 for v in 1..=n {
1958 accept_input_timed!(eval, in_ref, Value::try_from(v as f64).unwrap(), time);
1959 time += Duration::from_secs(1);
1960 }
1961 time += Duration::from_secs(1);
1962
1963 eval_stream_timed!(eval, 0, vec![], time);
1965 let n = n as f64;
1966 let expected = Value::try_from(((n * n + n) / 2.0) / 25.0).unwrap();
1967 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1968 }
1969
1970 #[test]
1971 fn test_window_correct_bucketing() {
1972 let (_, eval, mut time) =
1973 setup_time("input a: Float32\noutput b @2Hz := a.aggregate(over: 3s, using: sum)");
1974 let mut eval = eval.into_evaluator();
1975 let out_ref = StreamReference::Out(0);
1976 let in_ref = StreamReference::In(0);
1977
1978 accept_input_timed!(eval, in_ref, Value::try_from(0 as f64).unwrap(), time);
1979
1980 time += Duration::from_millis(500);
1981 eval_stream_timed!(eval, 0, vec![], time);
1982 let expected = Value::try_from(0.0).unwrap();
1983 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1984
1985 time += Duration::from_millis(500);
1986 eval_stream_timed!(eval, 0, vec![], time);
1987 let expected = Value::try_from(0.0).unwrap();
1988 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1989
1990 time += Duration::from_millis(100);
1993 accept_input_timed!(eval, in_ref, Value::try_from(1 as f64).unwrap(), time);
1994
1995 time += Duration::from_millis(400);
1996 eval_stream_timed!(eval, 0, vec![], time);
1997 let expected = Value::try_from(1.0).unwrap();
1998 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
1999
2000 time += Duration::from_millis(500);
2001 eval_stream_timed!(eval, 0, vec![], time);
2002 let expected = Value::try_from(1.0).unwrap();
2003 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2004
2005 time += Duration::from_millis(500);
2008 eval_stream_timed!(eval, 0, vec![], time);
2009 let expected = Value::try_from(1.0).unwrap();
2010 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2011
2012 time += Duration::from_millis(500);
2013 eval_stream_timed!(eval, 0, vec![], time);
2014 let expected = Value::try_from(1.0).unwrap();
2015 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2016
2017 time += Duration::from_millis(40);
2020 accept_input_timed!(eval, in_ref, Value::try_from(2 as f64).unwrap(), time);
2021
2022 time += Duration::from_millis(460);
2023 eval_stream_timed!(eval, 0, vec![], time);
2024 let expected = Value::try_from(3.0).unwrap();
2025 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2026
2027 time += Duration::from_millis(500);
2028 eval_stream_timed!(eval, 0, vec![], time);
2029 let expected = Value::try_from(3.0).unwrap();
2030 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2031
2032 time += Duration::from_millis(500);
2035 eval_stream_timed!(eval, 0, vec![], time);
2036 let expected = Value::try_from(2.0).unwrap();
2037 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2038
2039 time += Duration::from_millis(500);
2040 eval_stream_timed!(eval, 0, vec![], time);
2041 let expected = Value::try_from(2.0).unwrap();
2042 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2043
2044 time += Duration::from_millis(110);
2047 accept_input_timed!(eval, in_ref, Value::try_from(3 as f64).unwrap(), time);
2048
2049 time += Duration::from_millis(390);
2050 eval_stream_timed!(eval, 0, vec![], time);
2051 let expected = Value::try_from(5.0).unwrap();
2052 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2053
2054 time += Duration::from_millis(500);
2055 eval_stream_timed!(eval, 0, vec![], time);
2056 let expected = Value::try_from(5.0).unwrap();
2057 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2058 }
2059
2060 #[test]
2061 fn test_integral_window() {
2062 let (_, eval, mut time) = setup_time(
2063 "input a: Float64\noutput b: Float64 @0.25Hz := a.aggregate(over_exactly: 40s, using: integral).defaults(to: -3.0)",
2064 );
2065 let mut eval = eval.into_evaluator();
2066 time += Duration::from_secs(45);
2067 let out_ref = StreamReference::Out(0);
2068 let in_ref = StreamReference::In(0);
2069
2070 fn mv(f: f64) -> Value {
2071 Float(NotNan::new(f).unwrap())
2072 }
2073
2074 accept_input_timed!(eval, in_ref, mv(1f64), time);
2075 time += Duration::from_secs(2);
2076 accept_input_timed!(eval, in_ref, mv(5f64), time);
2077 time += Duration::from_secs(5);
2079 accept_input_timed!(eval, in_ref, mv(25f64), time);
2080 time += Duration::from_secs(1);
2082 accept_input_timed!(eval, in_ref, mv(0f64), time);
2083 time += Duration::from_secs(10);
2085 accept_input_timed!(eval, in_ref, mv(-40f64), time);
2086 eval_stream_timed!(eval, 0, vec![], time);
2090
2091 let expected = Float(NotNan::new(-106.5).unwrap());
2092 assert_eq!(
2093 eval.peek_value(out_ref, vec![].as_slice(), 0).unwrap(),
2094 expected
2095 );
2096 }
2097
2098 #[test]
2099 fn test_integral_window2() {
2100 fn mv(f: f64) -> Value {
2101 Float(NotNan::new(f).unwrap())
2102 }
2103
2104 let (_, eval, mut time) =
2105 setup_time("input a : Int64\noutput b@1Hz := a.aggregate(over: 5s, using: integral)");
2106 let mut eval = eval.into_evaluator();
2107 let out_ref = StreamReference::Out(0);
2108 let in_ref = StreamReference::In(0);
2109
2110 accept_input_timed!(eval, in_ref, mv(0f64), time);
2111
2112 time += Duration::from_secs(1);
2113 accept_input_timed!(eval, in_ref, mv(8f64), time);
2114 eval_stream_timed!(eval, 0, vec![], time);
2115 let expected = Float(NotNan::new(4.0).unwrap());
2116 assert_eq!(
2117 eval.peek_value(out_ref, vec![].as_slice(), 0).unwrap(),
2118 expected
2119 );
2120 }
2121
2122 #[test]
2123 fn test_window_type_count() {
2124 let (_, eval, start) =
2125 setup("input a: Int32\noutput b @ 10Hz := a.aggregate(over: 0.1s, using: count)");
2126 let mut eval = eval.into_evaluator();
2127 let out_ref = StreamReference::Out(0);
2128 let _a = StreamReference::In(0);
2129 let expected = Unsigned(0);
2130 eval_stream!(eval, start, 0, vec![]);
2131 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2132 }
2133
2134 #[test]
2135 fn test_sum_window_discrete() {
2136 let (_, eval, mut time) = setup_time(
2137 "input a: Int16\noutput b: Int16 := a.aggregate(over_discrete: 6, using: sum)",
2138 );
2139 let mut eval = eval.into_evaluator();
2140 time += Duration::from_secs(45);
2141 let out_ref = StreamReference::Out(0);
2142 let in_ref = StreamReference::In(0);
2143 let n = 25;
2144 for v in 1..=n {
2145 accept_input_timed!(eval, in_ref, Signed(v), time);
2146 time += Duration::from_secs(1);
2147 }
2148 time += Duration::from_secs(1);
2149 eval_stream_timed!(eval, 0, vec![], time);
2151 let expected = Signed(135);
2152 assert_eq!(eval.peek_value(in_ref, &Vec::new(), 0).unwrap(), Signed(25));
2154 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2155 }
2156
2157 #[test]
2159 fn test_last_window_float() {
2160 let (_, eval, mut time) = setup_time(
2161 "input a: Float32\noutput b: Float32 @1Hz:= a.aggregate(over: 5s, using: last).defaults(to:0.0)",
2162 );
2163 let mut eval = eval.into_evaluator();
2164 time += Duration::from_secs(45);
2165 let out_ref = StreamReference::Out(0);
2166 let in_ref = StreamReference::In(0);
2167 let n = 25;
2168 for v in 1..=n {
2169 accept_input_timed!(eval, in_ref, Float(NotNan::new(v as f64).unwrap()), time);
2170 time += Duration::from_secs(1);
2171 }
2172 time += Duration::from_secs(1);
2173 eval_stream_timed!(eval, 0, vec![], time);
2175 let expected = Float(NotNan::new(25.0).unwrap());
2176 assert_eq!(
2177 eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2178 Float(NotNan::new(25.0).unwrap())
2179 );
2180 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2181 }
2182
2183 #[test]
2184 fn test_last_window_signed() {
2185 let (_, eval, mut time) =
2186 setup_time("input a: Int32\noutput b: Int32 @1Hz:= a.aggregate(over: 20s, using: last).defaults(to:0)");
2187 let mut eval = eval.into_evaluator();
2188 time += Duration::from_secs(45);
2189 let out_ref = StreamReference::Out(0);
2190 let in_ref = StreamReference::In(0);
2191 let n = 25;
2192 for v in 1..=n {
2193 accept_input_timed!(eval, in_ref, Signed(v), time);
2194 time += Duration::from_secs(1);
2195 }
2196 time += Duration::from_secs(1);
2197 eval_stream_timed!(eval, 0, vec![], time);
2199 let expected = Signed(25);
2200 assert_eq!(eval.peek_value(in_ref, &Vec::new(), 0).unwrap(), Signed(25));
2201 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2202 }
2203
2204 #[test]
2205 fn test_last_window_unsigned() {
2206 let (_, eval, mut time) =
2207 setup_time("input a: UInt32\noutput b: UInt32 @1Hz:= a.aggregate(over: 20s, using: last).defaults(to:0)");
2208 let mut eval = eval.into_evaluator();
2209 time += Duration::from_secs(45);
2210 let out_ref = StreamReference::Out(0);
2211 let in_ref = StreamReference::In(0);
2212 let n = 25;
2213 for v in 1..=n {
2214 accept_input_timed!(eval, in_ref, Unsigned(v), time);
2215 time += Duration::from_secs(1);
2216 }
2217 time += Duration::from_secs(1);
2218 eval_stream_timed!(eval, 0, vec![], time);
2220 let expected = Unsigned(25);
2221 assert_eq!(
2222 eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2223 Unsigned(25)
2224 );
2225 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2226 }
2227
2228 #[test]
2229 fn test_percentile_float() {
2230 for (pctl, exp) in &[
2231 ("pctl25", Value::try_from(13.0)),
2232 ("pctl75", Value::try_from(18.0)),
2233 ("pctl10", Value::try_from(11.5)),
2234 ("pctl5", Value::try_from(11.0)),
2235 ("pctl90", Value::try_from(19.5)),
2236 ("med", Value::try_from(15.5)),
2237 ] {
2238 let (_, eval, mut time) = setup_time(&format!(
2239 "input a: Float32\noutput b: Float32 @1Hz:= a.aggregate(over: 10s, using: {}).defaults(to:0.0)",
2240 pctl
2241 ));
2242 let mut eval = eval.into_evaluator();
2243 time += Duration::from_secs(45);
2244 let out_ref = StreamReference::Out(0);
2245 let in_ref = StreamReference::In(0);
2246 let n = 20;
2247 for v in 1..=n {
2248 time += Duration::from_secs(1);
2249 accept_input_timed!(eval, in_ref, Float(NotNan::new(v as f64).unwrap()), time);
2250 }
2251 eval_stream_timed!(eval, 0, vec![], time);
2253 assert_eq!(
2254 eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2255 Float(NotNan::new(20.0).unwrap())
2256 );
2257 assert_eq!(
2258 eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2259 exp.as_ref().unwrap().clone()
2260 );
2261 }
2262 }
2263
2264 #[test]
2265 fn test_percentile_float_unordered_input() {
2266 for (pctl, exp) in &[
2267 ("pctl25", Value::try_from(13.0)),
2268 ("pctl75", Value::try_from(18.0)),
2269 ("pctl10", Value::try_from(11.5)),
2270 ("pctl5", Value::try_from(11.0)),
2271 ("pctl90", Value::try_from(19.5)),
2272 ("med", Value::try_from(15.5)),
2273 ] {
2274 let (_, eval, mut time) = setup_time(&format!(
2275 "input a: Float32\noutput b: Float32 @1Hz:= a.aggregate(over: 10s, using: {}).defaults(to:0.0)",
2276 pctl
2277 ));
2278 let mut eval = eval.into_evaluator();
2279 time += Duration::from_secs(45);
2280 let out_ref = StreamReference::Out(0);
2281 let in_ref = StreamReference::In(0);
2282 let n = 20;
2283 let input_val = [
2284 1, 9, 8, 5, 4, 3, 7, 2, 10, 6, 20, 11, 19, 12, 18, 13, 17, 14, 16, 15,
2285 ];
2286 for v in 0..n {
2287 time += Duration::from_secs(1);
2288 accept_input_timed!(
2289 eval,
2290 in_ref,
2291 Float(NotNan::new(input_val[v] as f64).unwrap()),
2292 time
2293 );
2294 }
2295 eval_stream_timed!(eval, 0, vec![], time);
2297 assert_eq!(
2298 eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2299 Float(NotNan::new(15.0).unwrap())
2300 );
2301 assert_eq!(
2302 eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2303 exp.as_ref().unwrap().clone()
2304 );
2305 }
2306 }
2307
2308 #[test]
2309 fn test_percentile_signed() {
2310 for (pctl, exp) in &[
2311 ("pctl25", Signed(13)),
2312 ("pctl75", Signed(18)),
2313 ("pctl10", Signed(11)),
2314 ("pctl5", Signed(11)),
2315 ("pctl90", Signed(19)),
2316 ("med", Signed(15)),
2317 ] {
2318 let (_, eval, mut time) = setup_time(&format!(
2319 "input a: Int32\noutput b: Int32 @1Hz:= a.aggregate(over: 10s, using: {}).defaults(to:0)",
2320 pctl
2321 ));
2322 let mut eval = eval.into_evaluator();
2323 time += Duration::from_secs(45);
2324 let out_ref = StreamReference::Out(0);
2325 let in_ref = StreamReference::In(0);
2326 let n = 20;
2327 for v in 1..=n {
2328 time += Duration::from_secs(1);
2329 accept_input_timed!(eval, in_ref, Signed(v), time);
2330 }
2331 eval_stream_timed!(eval, 0, vec![], time);
2333 assert_eq!(
2334 eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2335 exp.clone()
2336 );
2337 }
2338 }
2339
2340 #[test]
2341 fn test_percentile_unsigned() {
2342 for (pctl, exp) in &[
2343 ("pctl25", Unsigned(13)),
2344 ("pctl75", Unsigned(18)),
2345 ("pctl10", Unsigned(11)),
2346 ("pctl5", Unsigned(11)),
2347 ("pctl90", Unsigned(19)),
2348 ("med", Unsigned(15)),
2349 ] {
2350 let (_, eval, mut time) = setup_time(&format!(
2351 "input a: UInt32\noutput b: UInt32 @1Hz:= a.aggregate(over: 10s, using: {}).defaults(to:0)",
2352 pctl
2353 ));
2354 let mut eval = eval.into_evaluator();
2355 time += Duration::from_secs(45);
2356 let out_ref = StreamReference::Out(0);
2357 let in_ref = StreamReference::In(0);
2358 let n = 20;
2359 for v in 1..=n {
2360 time += Duration::from_secs(1);
2361 accept_input_timed!(eval, in_ref, Unsigned(v), time);
2362 }
2363 eval_stream_timed!(eval, 0, vec![], time);
2365 assert_eq!(
2366 eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2367 exp.clone()
2368 );
2369 }
2370 }
2371
2372 #[test]
2373 fn test_percentile_discrete_float_unordered_input() {
2374 for (pctl, exp) in &[
2375 ("pctl25", Value::try_from(13.0)),
2376 ("pctl75", Value::try_from(18.0)),
2377 ("pctl10", Value::try_from(11.5)),
2378 ("pctl5", Value::try_from(11.0)),
2379 ("pctl90", Value::try_from(19.5)),
2380 ("med", Value::try_from(15.5)),
2381 ] {
2382 let (_, eval, mut time) = setup_time(&format!(
2383 "input a: Float32\noutput b: Float32 := a.aggregate(over_discrete: 10, using: {}).defaults(to:0.0)",
2384 pctl
2385 ));
2386 let mut eval = eval.into_evaluator();
2387 time += Duration::from_secs(45);
2388 let out_ref = StreamReference::Out(0);
2389 let in_ref = StreamReference::In(0);
2390 let n = 20;
2391 let input_val = [
2392 1, 9, 8, 5, 4, 3, 7, 2, 10, 6, 20, 11, 19, 12, 18, 13, 17, 14, 16, 15,
2393 ];
2394 for v in 0..n {
2395 time += Duration::from_secs(1);
2396 accept_input_timed!(
2397 eval,
2398 in_ref,
2399 Float(NotNan::new(input_val[v] as f64).unwrap()),
2400 time
2401 );
2402 }
2403 eval_stream_timed!(eval, 0, vec![], time);
2405 assert_eq!(
2406 eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2407 Float(NotNan::new(15.0).unwrap())
2408 );
2409 assert_eq!(
2410 eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2411 exp.as_ref().unwrap().clone()
2412 );
2413 }
2414 }
2415
2416 #[test]
2417 fn test_var_equal_input() {
2418 let (_, eval, mut time) =
2419 setup_time("input a: Float32\noutput b: Float32 @1Hz:= a.aggregate(over: 5s, using: var).defaults(to:0.0)");
2420 let mut eval = eval.into_evaluator();
2421 time += Duration::from_secs(45);
2422 let out_ref = StreamReference::Out(0);
2423 let in_ref = StreamReference::In(0);
2424 let n = 25;
2425 for _ in 1..=n {
2426 accept_input_timed!(eval, in_ref, Float(NotNan::new(10_f64).unwrap()), time);
2427 time += Duration::from_secs(1);
2428 }
2429 time += Duration::from_secs(1);
2430 eval_stream_timed!(eval, 0, vec![], time);
2432 let expected = Float(NotNan::new(0.0).unwrap());
2433 assert_eq!(
2434 eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2435 Float(NotNan::new(10.0).unwrap())
2436 );
2437 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2438 }
2439
2440 #[test]
2441 fn test_var_window() {
2442 for (duration, exp) in &[
2443 ("2", Value::try_from(0.25)),
2444 ("3", Value::try_from(2.0 / 3.0)),
2445 ("4", Value::try_from(1.25)),
2446 ("5", Value::try_from(2.0)),
2447 ("6", Value::try_from(17.5 / 6.0)),
2448 ("7", Value::try_from(4.0)),
2449 ("8", Value::try_from(5.25)),
2450 ("9", Value::try_from(60.0 / 9.0)),
2451 ("10", Value::try_from(8.25)),
2452 ("11", Value::try_from(10.0)),
2453 ] {
2454 let (_, eval, mut time) = setup_time(&format!(
2455 "input a: Float32\noutput b: Float32 @1Hz:= a.aggregate(over: {}s, using: var).defaults(to:0.0)",
2456 duration
2457 ));
2458 let mut eval = eval.into_evaluator();
2459 time += Duration::from_secs(45);
2460 let out_ref = StreamReference::Out(0);
2461 let in_ref = StreamReference::In(0);
2462 let n = 20;
2463 for v in 1..=n {
2464 time += Duration::from_secs(1);
2465 accept_input_timed!(eval, in_ref, Float(NotNan::new(v as f64).unwrap()), time);
2466 }
2467 eval_stream_timed!(eval, 0, vec![], time);
2469 assert_eq!(
2470 eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2471 Float(NotNan::new(20.0).unwrap())
2472 );
2473 assert_float_eq!(
2474 eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2475 exp.as_ref().unwrap().clone()
2476 );
2477 }
2478 }
2479
2480 #[test]
2481 fn test_sd_window() {
2482 for (duration, exp) in &[
2483 ("2", Value::try_from(0.25f64.sqrt())),
2484 ("3", Value::try_from((2.0 / 3.0f64).sqrt())),
2485 ("4", Value::try_from(1.25f64.sqrt())),
2486 ("5", Value::try_from(2.0f64.sqrt())),
2487 ("6", Value::try_from((17.5 / 6.0f64).sqrt())),
2488 ("7", Value::try_from(4.0f64.sqrt())),
2489 ("8", Value::try_from(5.25f64.sqrt())),
2490 ("9", Value::try_from((60.0 / 9.0f64).sqrt())),
2491 ("10", Value::try_from(8.25f64.sqrt())),
2492 ("11", Value::try_from(10.0f64.sqrt())),
2493 ] {
2494 let (_, eval, mut time) = setup_time(&format!(
2495 "input a: Float32\noutput b: Float32 @1Hz:= a.aggregate(over: {}s, using: sd).defaults(to:0.0)",
2496 duration
2497 ));
2498 let mut eval = eval.into_evaluator();
2499 time += Duration::from_secs(45);
2500 let out_ref = StreamReference::Out(0);
2501 let in_ref = StreamReference::In(0);
2502 let n = 20;
2503 for v in 1..=n {
2504 time += Duration::from_secs(1);
2505 accept_input_timed!(eval, in_ref, Float(NotNan::new(v as f64).unwrap()), time);
2506 }
2507 eval_stream_timed!(eval, 0, vec![], time);
2509 assert_float_eq!(
2510 eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2511 Float(NotNan::new(20.0).unwrap())
2512 );
2513 assert_float_eq!(
2514 eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2515 exp.as_ref().unwrap().clone()
2516 );
2517 }
2518 }
2519
2520 #[test]
2521 fn test_cov() {
2522 let (_, eval, mut time) =
2523 setup_time("input in: Float32\n input in2: Float32\noutput t@in&in2:= (in,in2)\n output out: Float32 @1Hz := t.aggregate(over: 6s, using: cov).defaults(to: 1337.0)");
2524 let mut eval = eval.into_evaluator();
2525 time += Duration::from_secs(45);
2526 let out_ref = StreamReference::Out(1);
2527 let in_ref = StreamReference::In(0);
2528 let in_ref_2 = StreamReference::In(1);
2529 let n = 20;
2530 for v in 1..=n {
2531 time += Duration::from_secs(1);
2532 accept_input_timed!(eval, in_ref, Value::try_from(v as f64).unwrap(), time);
2533 accept_input_timed!(eval, in_ref_2, Value::try_from(v as f64).unwrap(), time);
2534 eval_stream_timed!(eval, 0, vec![], time);
2535 }
2536 eval_stream_timed!(eval, 1, vec![], time);
2538 let expected = Float(NotNan::new(17.5 / 6.0).unwrap());
2539 assert_float_eq!(
2540 eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2541 Float(NotNan::new(20.0).unwrap())
2542 );
2543 assert_float_eq!(
2544 eval.peek_value(in_ref_2, &Vec::new(), 0).unwrap(),
2545 Float(NotNan::new(20.0).unwrap())
2546 );
2547 assert_float_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2548 }
2549
2550 #[test]
2551 fn test_cov_2() {
2552 let (_, eval, mut time) =
2553 setup_time("input in: Float32\n input in2: Float32\noutput t@in&in2:= (in,in2)\n output out: Float32 @1Hz := t.aggregate(over: 5s, using: cov).defaults(to: 1337.0)");
2554 let mut eval = eval.into_evaluator();
2555 time += Duration::from_secs(45);
2556 let out_ref = StreamReference::Out(1);
2557 let in_ref = StreamReference::In(0);
2558 let in_ref_2 = StreamReference::In(1);
2559 let n = 20;
2560 for v in 1..=n {
2561 accept_input_timed!(eval, in_ref, Value::try_from(v as f64).unwrap(), time);
2562 accept_input_timed!(eval, in_ref_2, Value::try_from(16.0).unwrap(), time);
2563 eval_stream_timed!(eval, 0, vec![], time);
2564 time += Duration::from_secs(1);
2565 }
2566 time += Duration::from_secs(1);
2567 eval_stream_timed!(eval, 1, vec![], time);
2569 let expected = Float(NotNan::new(0.0).unwrap());
2570 assert_eq!(
2571 eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2572 Float(NotNan::new(20.0).unwrap())
2573 );
2574 assert_eq!(
2575 eval.peek_value(in_ref_2, &Vec::new(), 0).unwrap(),
2576 Float(NotNan::new(16.0).unwrap())
2577 );
2578 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2579 }
2580
2581 #[test]
2582 fn test_var_discrete() {
2583 for (duration, exp) in &[
2584 ("2", Value::try_from(0.25)),
2585 ("3", Value::try_from(2.0 / 3.0)),
2586 ("4", Value::try_from(1.25)),
2587 ("5", Value::try_from(2.0)),
2588 ("6", Value::try_from(17.5 / 6.0)),
2589 ("7", Value::try_from(4.0)),
2590 ("8", Value::try_from(5.25)),
2591 ("9", Value::try_from(60.0 / 9.0)),
2592 ("10", Value::try_from(8.25)),
2593 ("11", Value::try_from(10.0)),
2594 ] {
2595 let (_, eval, mut time) = setup_time(&format!(
2596 "input a: Float32\noutput b: Float32 := a.aggregate(over_discrete: {}, using: var).defaults(to:0.0)",
2597 duration
2598 ));
2599 let mut eval = eval.into_evaluator();
2600 time += Duration::from_secs(45);
2601 let out_ref = StreamReference::Out(0);
2602 let in_ref = StreamReference::In(0);
2603 let n = 20;
2604 for v in 1..=n {
2605 accept_input_timed!(eval, in_ref, Float(NotNan::new(v as f64).unwrap()), time);
2606 time += Duration::from_secs(1);
2607 }
2608 time += Duration::from_secs(1);
2609 eval_stream_timed!(eval, 0, vec![], time);
2611 assert_eq!(
2612 eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2613 Float(NotNan::new(20.0).unwrap())
2614 );
2615 assert_float_eq!(
2616 eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2617 exp.as_ref().unwrap().clone()
2618 );
2619 }
2620 }
2621
2622 #[test]
2623 fn test_sd_discrete() {
2624 for (duration, exp) in &[
2625 ("2", Value::try_from(0.25f64.sqrt())),
2626 ("3", Value::try_from((2.0 / 3.0f64).sqrt())),
2627 ("4", Value::try_from(1.25f64.sqrt())),
2628 ("5", Value::try_from(2.0f64.sqrt())),
2629 ("6", Value::try_from((17.5 / 6.0f64).sqrt())),
2630 ("7", Value::try_from(4.0f64.sqrt())),
2631 ("8", Value::try_from(5.25f64.sqrt())),
2632 ("9", Value::try_from((60.0 / 9.0f64).sqrt())),
2633 ("10", Value::try_from(8.25f64.sqrt())),
2634 ("11", Value::try_from(10.0f64.sqrt())),
2635 ] {
2636 let (_, eval, mut time) = setup_time(&format!(
2637 "input a: Float32\noutput b: Float32 := a.aggregate(over_discrete: {}, using: sd).defaults(to:0.0)",
2638 duration
2639 ));
2640 let mut eval = eval.into_evaluator();
2641 time += Duration::from_secs(45);
2642 let out_ref = StreamReference::Out(0);
2643 let in_ref = StreamReference::In(0);
2644 let n = 20;
2645 for v in 1..=n {
2646 accept_input_timed!(eval, in_ref, Float(NotNan::new(v as f64).unwrap()), time);
2647 time += Duration::from_secs(1);
2648 }
2649 time += Duration::from_secs(1);
2650 eval_stream_timed!(eval, 0, vec![], time);
2652 assert_eq!(
2653 eval.peek_value(in_ref, &Vec::new(), 0).unwrap(),
2654 Float(NotNan::new(20.0).unwrap())
2655 );
2656 assert_eq!(
2657 eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2658 exp.as_ref().unwrap().clone()
2659 );
2660 }
2661 }
2662
2663 #[test]
2664 fn test_cases_window_discrete_float() {
2665 for (aggr, exp, default) in &[
2666 ("sum", Value::try_from(115.0), false),
2667 ("min", Value::try_from(21.0), true),
2668 ("max", Value::try_from(25.0), true),
2669 ("avg", Value::try_from(23.0), true),
2670 ("integral", Value::try_from(92.0), false),
2671 ("last", Value::try_from(25.0), true),
2672 ("med", Value::try_from(23.0), true),
2673 ("pctl20", Value::try_from(21.5), true),
2674 ] {
2675 let mut spec =
2676 String::from("input a: Float32\noutput b := a.aggregate(over_discrete: 5, using: ");
2677 spec += aggr;
2678 spec += ")";
2679 if *default {
2680 spec += ".defaults(to:1337.0)"
2681 }
2682 let (_, eval, mut time) = setup_time(&spec);
2683 let mut eval = eval.into_evaluator();
2684 time += Duration::from_secs(45);
2685 let out_ref = StreamReference::Out(0);
2686 let in_ref = StreamReference::In(0);
2687 let n = 25;
2688 for v in 1..=n {
2689 accept_input_timed!(eval, in_ref, Value::try_from(v as f64).unwrap(), time);
2690 time += Duration::from_secs(1);
2691 }
2692 time += Duration::from_secs(1);
2693 eval_stream_timed!(eval, 0, vec![], time);
2695 let expected = exp.as_ref().unwrap().clone();
2696 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2697 }
2698 }
2699
2700 #[test]
2701 fn test_cases_window_discrete_signed() {
2702 for (aggr, exp, default) in &[
2703 ("sum", Signed(115), false),
2704 ("count", Unsigned(5), false),
2705 ("min", Signed(21), true),
2706 ("max", Signed(25), true),
2707 ("avg", Signed(23), true),
2708 ("integral", Value::try_from(92.0).unwrap(), false),
2709 ("last", Signed(25), true),
2710 ("med", Signed(23), true),
2711 ("pctl20", Signed(21), true),
2712 ] {
2713 let mut spec =
2714 String::from("input a: Int16\noutput b := a.aggregate(over_discrete: 5, using: ");
2715 spec += aggr;
2716 spec += ")";
2717 if *default {
2718 spec += ".defaults(to:1337)"
2719 }
2720 let (_, eval, mut time) = setup_time(&spec);
2721 let mut eval = eval.into_evaluator();
2722 time += Duration::from_secs(45);
2723 let out_ref = StreamReference::Out(0);
2724 let in_ref = StreamReference::In(0);
2725 let n = 25;
2726 for v in 1..=n {
2727 accept_input_timed!(eval, in_ref, Signed(v), time);
2728 time += Duration::from_secs(1);
2729 }
2730 time += Duration::from_secs(1);
2731 eval_stream_timed!(eval, 0, vec![], time);
2733 let expected = exp.clone();
2734 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2735 }
2736 }
2737
2738 #[test]
2739 fn test_cases_window_discrete_unsigned() {
2740 for (aggr, exp, default) in &[
2741 ("sum", Unsigned(115), false),
2742 ("count", Unsigned(5), false),
2743 ("min", Unsigned(21), true),
2744 ("max", Unsigned(25), true),
2745 ("avg", Unsigned(23), true),
2746 ("integral", Value::try_from(92.0).unwrap(), false),
2747 ("last", Unsigned(25), true),
2748 ("med", Unsigned(23), true),
2749 ("pctl20", Unsigned(21), true),
2750 ] {
2751 let mut spec =
2752 String::from("input a: UInt16\noutput b := a.aggregate(over_discrete: 5, using: ");
2753 spec += aggr;
2754 spec += ")";
2755 if *default {
2756 spec += ".defaults(to:1337)"
2757 }
2758 let (_, eval, mut time) = setup_time(&spec);
2759 let mut eval = eval.into_evaluator();
2760 time += Duration::from_secs(45);
2761 let out_ref = StreamReference::Out(0);
2762 let in_ref = StreamReference::In(0);
2763 let n = 25;
2764 for v in 1..=n {
2765 accept_input_timed!(eval, in_ref, Unsigned(v), time);
2766 time += Duration::from_secs(1);
2767 }
2768 time += Duration::from_secs(1);
2769 eval_stream_timed!(eval, 0, vec![], time);
2771 let expected = exp.clone();
2772 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0).unwrap(), expected);
2773 }
2774 }
2775
2776 #[test]
2777 fn test_filter() {
2778 let (_, eval, start) = setup(
2779 "input a: Int32\n\
2780 output b eval when a == 42 with a + 8",
2781 );
2782 let mut eval = eval.into_evaluator();
2783 let out_ref = StreamReference::Out(0);
2784 let in_ref = StreamReference::In(0);
2785 accept_input!(eval, start, in_ref, Signed(15));
2786 eval_stream!(eval, start, out_ref.out_ix(), vec![]);
2787 assert_eq!(eval.peek_value(out_ref, &Vec::new(), 0), Option::None);
2788
2789 accept_input!(eval, start, in_ref, Signed(42));
2790 eval_stream!(eval, start, out_ref.out_ix(), vec![]);
2791 assert_eq!(
2792 eval.peek_value(out_ref, &Vec::new(), 0).unwrap(),
2793 Signed(50)
2794 );
2795 }
2796
2797 #[test]
2798 fn test_spawn_eventbased() {
2799 let (_, eval, start) = setup(
2800 "input a: Int32\n\
2801 output b(x: Int32) spawn with a eval with x + a",
2802 );
2803 let mut eval = eval.into_evaluator();
2804 let out_ref = StreamReference::Out(0);
2805 let in_ref = StreamReference::In(0);
2806 accept_input!(eval, start, in_ref, Signed(15));
2807 spawn_stream!(eval, start, out_ref);
2808
2809 assert!(stream_has_instance!(eval, out_ref, vec![Signed(15)]));
2810
2811 eval_stream_instances!(eval, start, out_ref);
2812 assert_eq!(
2813 eval.peek_value(out_ref, &[Signed(15)], 0).unwrap(),
2814 Signed(30)
2815 );
2816 }
2817
2818 #[test]
2819 fn test_spawn_timedriven() {
2820 let (_, eval, mut time) = setup_time(
2821 "input a: Int32\n\
2822 output b(x: Int32) spawn with a eval @1Hz with x + a.hold(or: 42)",
2823 );
2824 let mut eval = eval.into_evaluator();
2825 let out_ref = StreamReference::Out(0);
2826 let in_ref = StreamReference::In(0);
2827
2828 time += Duration::from_secs(5);
2829 accept_input_timed!(eval, in_ref, Signed(15), time);
2830 spawn_stream_timed!(eval, time, out_ref);
2831 assert!(stream_has_instance!(eval, out_ref, vec![Signed(15)]));
2832
2833 time += Duration::from_secs(1);
2834 let mut schedule = eval.dyn_schedule.borrow_mut();
2835 let next_due = schedule.get_next_deadline_due().unwrap();
2836 let next_deadline = schedule.get_next_deadline(time).unwrap();
2837 assert_eq!(next_due, Duration::from_secs(6));
2838 assert_eq!(
2839 next_deadline,
2840 DynamicDeadline {
2841 due: Duration::from_secs(6),
2842 tasks: vec![EvaluationTask::Evaluate(out_ref.out_ix(), vec![Signed(15)])]
2843 }
2844 );
2845
2846 eval_stream_timed!(eval, out_ref.out_ix(), vec![Signed(15)], time);
2847 assert_eq!(
2848 eval.peek_value(out_ref, &[Signed(15)], 0).unwrap(),
2849 Signed(30)
2850 );
2851 }
2852
2853 #[test]
2854 fn test_spawn_eventbased_unit() {
2855 let (_, eval, start) = setup(
2856 "input a: Int32\n\
2857 output b spawn when a == 42 eval with a",
2858 );
2859 let mut eval = eval.into_evaluator();
2860 let out_ref = StreamReference::Out(0);
2861 let in_ref = StreamReference::In(0);
2862 accept_input!(eval, start, in_ref, Signed(15));
2863 spawn_stream!(eval, start, out_ref);
2864
2865 assert!(!stream_has_instance!(eval, out_ref, Vec::<Value>::new()));
2866
2867 accept_input!(eval, start, in_ref, Signed(42));
2868 spawn_stream!(eval, start, out_ref);
2869
2870 assert!(stream_has_instance!(eval, out_ref, Vec::<Value>::new()));
2871
2872 eval_stream_instances!(eval, start, out_ref);
2873 assert_eq!(
2874 eval.peek_value(out_ref, &Vec::<Value>::new(), 0).unwrap(),
2875 Signed(42)
2876 );
2877 }
2878
2879 #[test]
2880 fn test_spawn_timedriven_unit() {
2881 let (_, eval, mut time) = setup_time(
2882 "input a: Int32\n\
2883 output b spawn when a == 42 eval @1Hz with a.hold(or: 42)",
2884 );
2885 let mut eval = eval.into_evaluator();
2886 let out_ref = StreamReference::Out(0);
2887 let in_ref = StreamReference::In(0);
2888
2889 time += Duration::from_secs(5);
2890 accept_input_timed!(eval, in_ref, Signed(15), time);
2891 spawn_stream_timed!(eval, time, out_ref);
2892 assert!(!stream_has_instance!(eval, out_ref, Vec::<Value>::new()));
2893
2894 time += Duration::from_secs(5);
2895 accept_input_timed!(eval, in_ref, Signed(42), time);
2896 spawn_stream_timed!(eval, time, out_ref);
2897 assert!(stream_has_instance!(eval, out_ref, Vec::<Value>::new()));
2898
2899 time += Duration::from_secs(1);
2900 let mut schedule = eval.dyn_schedule.borrow_mut();
2901 let next_due = schedule.get_next_deadline_due().unwrap();
2902 let next_deadline = schedule.get_next_deadline(time).unwrap();
2903 assert_eq!(next_due, Duration::from_secs(11));
2904 assert_eq!(
2905 next_deadline,
2906 DynamicDeadline {
2907 due: Duration::from_secs(11),
2908 tasks: vec![EvaluationTask::Evaluate(out_ref.out_ix(), vec![])]
2909 }
2910 );
2911
2912 eval_stream_timed!(eval, out_ref.out_ix(), vec![], time);
2913 assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Signed(42));
2914 }
2915
2916 #[test]
2918 fn test_spawn_window_unit() {
2919 let (_, eval, mut time) = setup_time(
2920 "input a: Int32\n\
2921 output b spawn when a == 42 eval with a\n\
2922 output c @1Hz := b.aggregate(over: 1s, using: sum)",
2923 );
2924 let mut eval = eval.into_evaluator();
2925 let b_ref = StreamReference::Out(0);
2926 let c_ref = StreamReference::Out(1);
2927 let in_ref = StreamReference::In(0);
2928
2929 time += Duration::from_secs(1);
2931 accept_input_timed!(eval, in_ref, Signed(15), time);
2932 spawn_stream_timed!(eval, time, b_ref);
2933 assert!(!stream_has_instance!(eval, b_ref, Vec::<Value>::new()));
2934 eval_stream_timed!(eval, c_ref.out_ix(), vec![], time);
2935 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Signed(0));
2936
2937 time += Duration::from_millis(200);
2939 accept_input_timed!(eval, in_ref, Signed(42), time);
2940 spawn_stream_timed!(eval, time, b_ref);
2941 assert!(stream_has_instance!(eval, b_ref, Vec::<Value>::new()));
2942 eval_stream_instances_timed!(eval, time, b_ref);
2943 assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(42));
2944
2945 time += Duration::from_millis(200);
2947 accept_input_timed!(eval, in_ref, Signed(18), time);
2948 eval_stream_instances_timed!(eval, time, b_ref);
2949 assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(18));
2950
2951 time += Duration::from_millis(200);
2953 accept_input_timed!(eval, in_ref, Signed(17), time);
2954 eval_stream_instances_timed!(eval, time, b_ref);
2955 assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(17));
2956
2957 time += Duration::from_millis(400);
2959 accept_input_timed!(eval, in_ref, Signed(3), time);
2960 eval_stream_instances_timed!(eval, time, b_ref);
2961 assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(3));
2962 eval_stream_timed!(eval, c_ref.out_ix(), vec![], time);
2963 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Signed(80));
2964 }
2965
2966 #[test]
2967 fn test_both_parameterized_window() {
2968 let (_, eval, mut time) = setup_time(
2969 "input a: Int32\n\
2970 output b(p) spawn with a eval with p+a\n\
2971 output c(p) spawn with a eval @1Hz with b(p).aggregate(over: 2s, using: sum)",
2972 );
2973 let mut eval = eval.into_evaluator();
2974 let b_ref = StreamReference::Out(0);
2975 let c_ref = StreamReference::Out(1);
2976 let in_ref = StreamReference::In(0);
2977
2978 time += Duration::from_secs(1);
2979 accept_input_timed!(eval, in_ref, Signed(15), time);
2980 spawn_stream_timed!(eval, time, b_ref);
2981 spawn_stream_timed!(eval, time, c_ref);
2982 assert!(stream_has_instance!(eval, b_ref, vec![Signed(15)]));
2983 assert!(stream_has_instance!(eval, c_ref, vec![Signed(15)]));
2984 eval_stream_timed!(eval, b_ref.out_ix(), vec![Signed(15)], time);
2985 eval_stream_timed!(eval, c_ref.out_ix(), vec![Signed(15)], time);
2986 assert_eq!(
2987 eval.peek_value(b_ref, &[Signed(15)], 0).unwrap(),
2988 Signed(30)
2989 );
2990 assert_eq!(
2991 eval.peek_value(c_ref, &[Signed(15)], 0).unwrap(),
2992 Signed(30)
2993 );
2994
2995 time += Duration::from_secs(1);
2996 accept_input_timed!(eval, in_ref, Signed(5), time);
2997 spawn_stream_timed!(eval, time, b_ref);
2998 spawn_stream_timed!(eval, time, c_ref);
2999 assert!(stream_has_instance!(eval, b_ref, vec![Signed(5)]));
3000 assert!(stream_has_instance!(eval, c_ref, vec![Signed(5)]));
3001 eval_stream_timed!(eval, b_ref.out_ix(), vec![Signed(5)], time);
3002 eval_stream_timed!(eval, c_ref.out_ix(), vec![Signed(5)], time);
3003 eval_stream_timed!(eval, b_ref.out_ix(), vec![Signed(15)], time);
3004 eval_stream_timed!(eval, c_ref.out_ix(), vec![Signed(15)], time);
3005 assert_eq!(eval.peek_value(b_ref, &[Signed(5)], 0).unwrap(), Signed(10));
3006 assert_eq!(eval.peek_value(c_ref, &[Signed(5)], 0).unwrap(), Signed(10));
3007 assert_eq!(
3008 eval.peek_value(b_ref, &[Signed(15)], 0).unwrap(),
3009 Signed(20)
3010 );
3011 assert_eq!(
3012 eval.peek_value(c_ref, &[Signed(15)], 0).unwrap(),
3013 Signed(50)
3014 );
3015
3016 time += Duration::from_secs(1);
3017 accept_input_timed!(eval, in_ref, Signed(5), time);
3018 eval_stream_timed!(eval, b_ref.out_ix(), vec![Signed(5)], time);
3019 eval_stream_timed!(eval, c_ref.out_ix(), vec![Signed(5)], time);
3020 eval_stream_timed!(eval, b_ref.out_ix(), vec![Signed(15)], time);
3021 eval_stream_timed!(eval, c_ref.out_ix(), vec![Signed(15)], time);
3022 assert_eq!(eval.peek_value(b_ref, &[Signed(5)], 0).unwrap(), Signed(10));
3023 assert_eq!(eval.peek_value(c_ref, &[Signed(5)], 0).unwrap(), Signed(20));
3024 assert_eq!(
3025 eval.peek_value(b_ref, &[Signed(15)], 0).unwrap(),
3026 Signed(20)
3027 );
3028 assert_eq!(
3029 eval.peek_value(c_ref, &[Signed(15)], 0).unwrap(),
3030 Signed(40)
3031 );
3032 }
3033
3034 #[test]
3035 fn test_spawn_window_unit3() {
3036 let (_, eval, mut time) = setup_time(
3037 "input a: Int32\n\
3038 output b(p) spawn with a eval with p+a\n\
3039 output c spawn when a == 5 eval @1Hz with b(15).aggregate(over: 2s, using: sum)",
3040 );
3041 let mut eval = eval.into_evaluator();
3042 let b_ref = StreamReference::Out(0);
3043 let c_ref = StreamReference::Out(1);
3044 let in_ref = StreamReference::In(0);
3045 let empty: Vec<Value> = vec![];
3046
3047 time += Duration::from_secs(1);
3048 accept_input_timed!(eval, in_ref, Signed(15), time);
3049 spawn_stream_timed!(eval, time, b_ref);
3050 spawn_stream_timed!(eval, time, c_ref);
3051 assert!(stream_has_instance!(eval, b_ref, vec![Signed(15)]));
3052 assert!(!stream_has_instance!(eval, c_ref, empty));
3053 eval_stream_timed!(eval, b_ref.out_ix(), vec![Signed(15)], time);
3054 assert_eq!(
3055 eval.peek_value(b_ref, &[Signed(15)], 0).unwrap(),
3056 Signed(30)
3057 );
3058
3059 time += Duration::from_secs(1);
3060 eval.new_cycle(time);
3061 accept_input_timed!(eval, in_ref, Signed(5), time);
3062 spawn_stream_timed!(eval, time, b_ref);
3063 spawn_stream_timed!(eval, time, c_ref);
3064 assert!(stream_has_instance!(eval, b_ref, vec![Signed(5)]));
3065 assert!(stream_has_instance!(eval, c_ref, empty));
3066 eval_stream_timed!(eval, b_ref.out_ix(), vec![Signed(5)], time);
3067 eval_stream_timed!(eval, b_ref.out_ix(), vec![Signed(15)], time);
3068 eval_stream_timed!(eval, c_ref.out_ix(), vec![], time);
3069 assert_eq!(eval.peek_value(b_ref, &[Signed(5)], 0).unwrap(), Signed(10));
3070 assert_eq!(
3071 eval.peek_value(b_ref, &[Signed(15)], 0).unwrap(),
3072 Signed(20)
3073 );
3074 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Signed(20));
3075
3076 time += Duration::from_secs(1);
3077 eval.new_cycle(time);
3078 accept_input_timed!(eval, in_ref, Signed(5), time);
3079 eval_stream_timed!(eval, b_ref.out_ix(), vec![Signed(5)], time);
3080 eval_stream_timed!(eval, b_ref.out_ix(), vec![Signed(15)], time);
3081 eval_stream_timed!(eval, c_ref.out_ix(), vec![], time);
3082 assert_eq!(eval.peek_value(b_ref, &[Signed(5)], 0).unwrap(), Signed(10));
3083 assert_eq!(
3084 eval.peek_value(b_ref, &[Signed(15)], 0).unwrap(),
3085 Signed(20)
3086 );
3087 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Signed(40));
3088 }
3089
3090 #[test]
3092 fn test_spawn_window_unit2() {
3093 let (_, eval, mut time) = setup_time(
3094 "input a: Int32\n\
3095 output b spawn when a == 42 eval with a\n\
3096 output c @1Hz := b.aggregate(over: 1s, using: sum)",
3097 );
3098 let mut eval = eval.into_evaluator();
3099 let b_ref = StreamReference::Out(0);
3100 let c_ref = StreamReference::Out(1);
3101 let in_ref = StreamReference::In(0);
3102
3103 time += Duration::from_millis(200);
3105 accept_input_timed!(eval, in_ref, Signed(42), time);
3106 spawn_stream_timed!(eval, time, b_ref);
3107 assert!(stream_has_instance!(eval, b_ref, Vec::<Value>::new()));
3108 eval_stream_instances_timed!(eval, time, b_ref);
3109 assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(42));
3110
3111 time += Duration::from_millis(200);
3113 accept_input_timed!(eval, in_ref, Signed(18), time);
3114 eval_stream_instances_timed!(eval, time, b_ref);
3115 assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(18));
3116
3117 time += Duration::from_millis(200);
3119 accept_input_timed!(eval, in_ref, Signed(17), time);
3120 eval_stream_instances_timed!(eval, time, b_ref);
3121 assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(17));
3122
3123 time += Duration::from_millis(400);
3125 accept_input_timed!(eval, in_ref, Signed(3), time);
3126 eval_stream_instances_timed!(eval, time, b_ref);
3127 assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(3));
3128 eval_stream_timed!(eval, c_ref.out_ix(), vec![], time);
3129 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Signed(80));
3130 }
3131
3132 #[test]
3135 fn test_spawn_window_parameterized() {
3136 let (_, eval, mut time) = setup_time(
3137 "input a: Int32\n\
3138 output b(p: Bool) spawn with a == 42 eval when !p || a == 42 with a\n\
3139 output c @1Hz := b(false).aggregate(over: 1s, using: sum)\n\
3140 output d @1Hz := b(true).aggregate(over: 1s, using: sum)",
3141 );
3142 let mut eval = eval.into_evaluator();
3143 let b_ref = StreamReference::Out(0);
3144 let c_ref = StreamReference::Out(1);
3145 let d_ref = StreamReference::Out(2);
3146 let in_ref = StreamReference::In(0);
3147
3148 time += Duration::from_millis(500);
3150 accept_input_timed!(eval, in_ref, Signed(15), time);
3151 spawn_stream_timed!(eval, time, b_ref);
3152 assert!(stream_has_instance!(eval, b_ref, vec![Bool(false)]));
3153 assert!(!stream_has_instance!(eval, b_ref, vec![Bool(true)]));
3154 eval_stream_instances_timed!(eval, time, b_ref);
3155 assert_eq!(
3156 eval.peek_value(b_ref, &[Bool(false)], 0).unwrap(),
3157 Signed(15)
3158 );
3159
3160 time += Duration::from_millis(500);
3164 accept_input_timed!(eval, in_ref, Signed(42), time);
3165 spawn_stream_timed!(eval, time, b_ref);
3166 assert!(stream_has_instance!(eval, b_ref, vec![Bool(true)]));
3167 eval_stream_instances_timed!(eval, time, b_ref);
3168 assert_eq!(
3169 eval.peek_value(b_ref, &[Bool(false)], 0).unwrap(),
3170 Signed(42)
3171 );
3172 assert_eq!(
3173 eval.peek_value(b_ref, &[Bool(true)], 0).unwrap(),
3174 Signed(42)
3175 );
3176 eval_stream_timed!(eval, c_ref.out_ix(), vec![], time);
3177 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Signed(57));
3178 eval_stream_timed!(eval, d_ref.out_ix(), vec![], time);
3179 assert_eq!(eval.peek_value(d_ref, &[], 0).unwrap(), Signed(42));
3180
3181 time += Duration::from_secs(1);
3185 accept_input_timed!(eval, in_ref, Signed(42), time);
3186 eval_stream_instances_timed!(eval, time, b_ref);
3187 assert_eq!(
3188 eval.peek_value(b_ref, &[Bool(false)], 0).unwrap(),
3189 Signed(42)
3190 );
3191 assert_eq!(
3192 eval.peek_value(b_ref, &[Bool(true)], 0).unwrap(),
3193 Signed(42)
3194 );
3195 eval_stream_timed!(eval, c_ref.out_ix(), vec![], time);
3196 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Signed(42));
3197 eval_stream_timed!(eval, d_ref.out_ix(), vec![], time);
3198 assert_eq!(eval.peek_value(d_ref, &[], 0).unwrap(), Signed(42));
3199 }
3200
3201 #[test]
3202 fn test_close_parameterized() {
3203 let (_, eval, start) = setup(
3204 "input a: Int32\n\
3205 input b: Bool\n\
3206 output c(x: Int32) spawn with a close when b && (x % 2 == 0) eval with x + a",
3207 );
3208 let mut eval = eval.into_evaluator();
3209 let out_ref = StreamReference::Out(0);
3210 let a_ref = StreamReference::In(0);
3211 let b_ref = StreamReference::In(1);
3212 accept_input!(eval, start, a_ref, Signed(15));
3213 spawn_stream!(eval, start, out_ref);
3214
3215 assert!(stream_has_instance!(eval, out_ref, vec![Signed(15)]));
3216
3217 eval_stream_instances!(eval, start, out_ref);
3218 assert_eq!(
3219 eval.peek_value(out_ref, &[Signed(15)], 0).unwrap(),
3220 Signed(30)
3221 );
3222
3223 accept_input!(eval, start, b_ref, Bool(false));
3224 accept_input!(eval, start, a_ref, Signed(8));
3225 spawn_stream!(eval, start, out_ref);
3226
3227 assert!(stream_has_instance!(eval, out_ref, vec![Signed(15)]));
3228 assert!(stream_has_instance!(eval, out_ref, vec![Signed(8)]));
3229
3230 eval_stream_instances!(eval, start, out_ref);
3231 assert_eq!(
3232 eval.peek_value(out_ref, &[Signed(15)], 0).unwrap(),
3233 Signed(23)
3234 );
3235 assert_eq!(
3236 eval.peek_value(out_ref, &[Signed(8)], 0).unwrap(),
3237 Signed(16)
3238 );
3239
3240 eval_close!(eval, start, out_ref, vec![Signed(15)]);
3242 eval_close!(eval, start, out_ref, vec![Signed(8)]);
3243 assert!(stream_has_instance!(eval, out_ref, vec![Signed(15)]));
3244 assert!(stream_has_instance!(eval, out_ref, vec![Signed(8)]));
3245
3246 accept_input!(eval, start, b_ref, Bool(true));
3247
3248 eval_close!(eval, start, out_ref, vec![Signed(15)]);
3249 eval_close!(eval, start, out_ref, vec![Signed(8)]);
3250 assert!(stream_has_instance!(eval, out_ref, vec![Signed(15)]));
3251 assert!(!stream_has_instance!(eval, out_ref, vec![Signed(8)]));
3252 assert!(eval.peek_value(out_ref, &[Signed(8)], 0).is_none());
3253 }
3254
3255 #[test]
3256 fn test_close_unit() {
3257 let (_, eval, start) = setup(
3258 "input a: Int32\n\
3259 input b: Bool\n\
3260 output c spawn when a = 42 close when b eval with a",
3261 );
3262 let mut eval = eval.into_evaluator();
3263 let out_ref = StreamReference::Out(0);
3264 let a_ref = StreamReference::In(0);
3265 let b_ref = StreamReference::In(1);
3266 accept_input!(eval, start, a_ref, Signed(42));
3267 spawn_stream!(eval, start, out_ref);
3268
3269 assert!(stream_has_instance!(eval, out_ref, Vec::<Value>::new()));
3270
3271 eval_stream_instances!(eval, start, out_ref);
3272 assert_eq!(
3273 eval.peek_value(out_ref, &Vec::<Value>::new(), 0).unwrap(),
3274 Signed(42)
3275 );
3276
3277 accept_input!(eval, start, b_ref, Bool(false));
3278 accept_input!(eval, start, a_ref, Signed(8));
3279
3280 eval_stream_instances!(eval, start, out_ref);
3281 assert_eq!(
3282 eval.peek_value(out_ref, &Vec::<Value>::new(), 0).unwrap(),
3283 Signed(8)
3284 );
3285
3286 eval_close!(eval, start, out_ref, Vec::<Value>::new());
3288 assert!(stream_has_instance!(eval, out_ref, Vec::<Value>::new()));
3289
3290 accept_input!(eval, start, b_ref, Bool(true));
3291
3292 eval_close!(eval, start, out_ref, Vec::<Value>::new());
3293 assert!(!stream_has_instance!(eval, out_ref, Vec::<Value>::new()));
3294 assert!(eval.peek_value(out_ref, &Vec::<Value>::new(), 0).is_none());
3295 }
3296
3297 #[test]
3298 fn test_close_selfref_unit() {
3299 let (_, eval, start) = setup(
3300 "input a: Int32\n\
3301 output c spawn when a = 42 close when c = 1337 eval with a",
3302 );
3303 let mut eval = eval.into_evaluator();
3304 let out_ref = StreamReference::Out(0);
3305 let a_ref = StreamReference::In(0);
3306 accept_input!(eval, start, a_ref, Signed(42));
3307 spawn_stream!(eval, start, out_ref);
3308
3309 assert!(stream_has_instance!(eval, out_ref, Vec::<Value>::new()));
3310
3311 eval_stream_instances!(eval, start, out_ref);
3312 assert_eq!(
3313 eval.peek_value(out_ref, &Vec::<Value>::new(), 0).unwrap(),
3314 Signed(42)
3315 );
3316
3317 accept_input!(eval, start, a_ref, Signed(1337));
3318
3319 eval_stream_instances!(eval, start, out_ref);
3320 assert_eq!(
3321 eval.peek_value(out_ref, &Vec::<Value>::new(), 0).unwrap(),
3322 Signed(1337)
3323 );
3324
3325 eval_close!(eval, start, out_ref, Vec::<Value>::new());
3326 assert!(!stream_has_instance!(eval, out_ref, Vec::<Value>::new()));
3327 assert!(eval.peek_value(out_ref, &Vec::<Value>::new(), 0).is_none());
3328 }
3329
3330 #[test]
3331 fn test_close_selfref_parameter() {
3332 let (_, eval, start) = setup(
3333 "input a: Int32\n\
3334 output c(p: Int32) spawn with a close when c(p) = 1337 eval with p+a",
3335 );
3336 let mut eval = eval.into_evaluator();
3337 let out_ref = StreamReference::Out(0);
3338 let a_ref = StreamReference::In(0);
3339 accept_input!(eval, start, a_ref, Signed(15));
3340 spawn_stream!(eval, start, out_ref);
3341
3342 assert!(stream_has_instance!(eval, out_ref, vec![Signed(15)]));
3343
3344 eval_stream_instances!(eval, start, out_ref);
3345 assert_eq!(
3346 eval.peek_value(out_ref, &[Signed(15)], 0).unwrap(),
3347 Signed(30)
3348 );
3349
3350 accept_input!(eval, start, a_ref, Signed(1322));
3351 spawn_stream!(eval, start, out_ref);
3352 assert!(stream_has_instance!(eval, out_ref, vec![Signed(1322)]));
3353
3354 eval_stream_instances!(eval, start, out_ref);
3355 assert_eq!(
3356 eval.peek_value(out_ref, &[Signed(15)], 0).unwrap(),
3357 Signed(1337)
3358 );
3359 assert_eq!(
3360 eval.peek_value(out_ref, &[Signed(1322)], 0).unwrap(),
3361 Signed(2644)
3362 );
3363
3364 eval_close!(eval, start, out_ref, vec![Signed(15)]);
3365 eval_close!(eval, start, out_ref, vec![Signed(1322)]);
3366 assert!(!stream_has_instance!(eval, out_ref, vec![Signed(15)]));
3367 assert!(stream_has_instance!(eval, out_ref, vec![Signed(1322)]));
3368 assert!(eval.peek_value(out_ref, &[Signed(15)], 0).is_none());
3369 }
3370
3371 #[test]
3374 fn test_close_window_parameterized() {
3375 let (_, eval, mut time) = setup_time(
3376 "input a: Int32\n\
3377 output b(p: Bool) spawn with a == 42 close when b(p) == 1337 eval when !p || a == 42 with a\n\
3378 output c @1Hz := b(false).aggregate(over: 1s, using: sum)\n\
3379 output d @1Hz := b(true).aggregate(over: 1s, using: sum)",
3380 );
3381 let mut eval = eval.into_evaluator();
3382 let b_ref = StreamReference::Out(0);
3383 let c_ref = StreamReference::Out(1);
3384 let d_ref = StreamReference::Out(2);
3385 let in_ref = StreamReference::In(0);
3386
3387 time += Duration::from_millis(500);
3389 eval.new_cycle(time);
3390 accept_input_timed!(eval, in_ref, Signed(15), time);
3391 spawn_stream_timed!(eval, time, b_ref);
3392 assert!(stream_has_instance!(eval, b_ref, vec![Bool(false)]));
3393 assert!(!stream_has_instance!(eval, b_ref, vec![Bool(true)]));
3394 eval_stream_instances_timed!(eval, time, b_ref);
3395 assert_eq!(
3396 eval.peek_value(b_ref, &[Bool(false)], 0).unwrap(),
3397 Signed(15)
3398 );
3399
3400 time += Duration::from_millis(500);
3404 eval.new_cycle(time);
3405 accept_input_timed!(eval, in_ref, Signed(42), time);
3406 spawn_stream_timed!(eval, time, b_ref);
3407 assert!(stream_has_instance!(eval, b_ref, vec![Bool(true)]));
3408 eval_stream_instances_timed!(eval, time, b_ref);
3409 assert_eq!(
3410 eval.peek_value(b_ref, &[Bool(false)], 0).unwrap(),
3411 Signed(42)
3412 );
3413 assert_eq!(
3414 eval.peek_value(b_ref, &[Bool(true)], 0).unwrap(),
3415 Signed(42)
3416 );
3417 eval_stream_timed!(eval, c_ref.out_ix(), vec![], time);
3418 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Signed(57));
3419 eval_stream_timed!(eval, d_ref.out_ix(), vec![], time);
3420 assert_eq!(eval.peek_value(d_ref, &[], 0).unwrap(), Signed(42));
3421
3422 time += Duration::from_millis(500);
3425 eval.new_cycle(time);
3426 accept_input_timed!(eval, in_ref, Signed(1337), time);
3427 eval.prepare_evaluation(time);
3428 eval_stream_instances_timed!(eval, time, b_ref);
3429 assert_eq!(
3430 eval.peek_value(b_ref, &[Bool(false)], 0).unwrap(),
3431 Signed(1337)
3432 );
3433 assert_eq!(
3434 eval.peek_value(b_ref, &[Bool(true)], 0).unwrap(),
3435 Signed(42)
3436 );
3437 eval_close_timed!(eval, time, b_ref, &vec![Bool(false)]);
3438 eval_close_timed!(eval, time, b_ref, &vec![Bool(true)]);
3439 assert!(!stream_has_instance!(eval, b_ref, vec![Bool(false)]));
3440 assert!(stream_has_instance!(eval, b_ref, vec![Bool(true)]));
3441
3442 time += Duration::from_millis(500);
3444 eval.new_cycle(time);
3445 eval.prepare_evaluation(time);
3446 eval_stream_timed!(eval, c_ref.out_ix(), vec![], time);
3447 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Signed(1337));
3448 eval_stream_timed!(eval, d_ref.out_ix(), vec![], time);
3449 assert_eq!(eval.peek_value(d_ref, &[], 0).unwrap(), Signed(0));
3450
3451 time += Duration::from_millis(550);
3453 eval.new_cycle(time);
3454 eval.prepare_evaluation(time);
3455 let window = eval.ir.sliding_windows[0].reference;
3456 assert!(eval
3457 .global_store
3458 .get_window_collection_mut(window)
3459 .window(&[Bool(false)])
3460 .is_none())
3461 }
3462
3463 #[test]
3464 fn test_close_window_unit() {
3465 let (_, eval, mut time) = setup_time(
3466 "input a: Int32\n\
3467 output b spawn when a == 42 close when a = 1337 eval with a\n\
3468 output c @1Hz := b.aggregate(over: 1s, using: sum)",
3469 );
3470 let mut eval = eval.into_evaluator();
3471 let b_ref = StreamReference::Out(0);
3472 let c_ref = StreamReference::Out(1);
3473 let in_ref = StreamReference::In(0);
3474
3475 time += Duration::from_millis(200);
3477 accept_input_timed!(eval, in_ref, Signed(42), time);
3478 spawn_stream_timed!(eval, time, b_ref);
3479 assert!(stream_has_instance!(eval, b_ref, Vec::<Value>::new()));
3480 eval_stream_instances_timed!(eval, time, b_ref);
3481 assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(42));
3482
3483 time += Duration::from_millis(200);
3485 accept_input_timed!(eval, in_ref, Signed(18), time);
3486 eval_stream_instances_timed!(eval, time, b_ref);
3487 assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(18));
3488
3489 time += Duration::from_millis(200);
3491 accept_input_timed!(eval, in_ref, Signed(17), time);
3492 eval_stream_instances_timed!(eval, time, b_ref);
3493 assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(17));
3494
3495 time += Duration::from_millis(400);
3497 accept_input_timed!(eval, in_ref, Signed(3), time);
3498 eval_stream_instances_timed!(eval, time, b_ref);
3499 assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(3));
3500 eval_stream_timed!(eval, c_ref.out_ix(), vec![], time);
3501 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Signed(80));
3502
3503 time += Duration::from_millis(500);
3505 accept_input_timed!(eval, in_ref, Signed(1337), time);
3506 eval_stream_instances_timed!(eval, time, b_ref);
3507 assert_eq!(eval.peek_value(b_ref, &[], 0).unwrap(), Signed(1337));
3508 eval_close_timed!(eval, time, b_ref, &vec![]);
3509 assert!(!stream_has_instance!(eval, b_ref, Vec::<Value>::new()));
3510
3511 time += Duration::from_millis(500);
3513 eval.prepare_evaluation(time);
3514 eval_stream_timed!(eval, c_ref.out_ix(), vec![], time);
3515 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Signed(1337));
3516 }
3517
3518 #[test]
3519 fn test_optional_tuple_access() {
3520 let (_, eval, start) = setup(
3521 "input a: (UInt, (Bool, Float))\n\
3522 output b := a.offset(by: -1).1.0.defaults(to: false)",
3523 );
3524 let mut eval = eval.into_evaluator();
3525 let out_ref = StreamReference::Out(0);
3526 let a_ref = StreamReference::In(0);
3527 accept_input!(
3528 eval,
3529 start,
3530 a_ref,
3531 Tuple(Box::new([
3532 Unsigned(42),
3533 Tuple(Box::new([Bool(true), Float(NotNan::new(1.5).unwrap())]))
3534 ]))
3535 );
3536
3537 eval_stream_instances!(eval, start, out_ref);
3538 assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Bool(false));
3539
3540 accept_input!(
3541 eval,
3542 start,
3543 a_ref,
3544 Tuple(Box::new([
3545 Unsigned(13),
3546 Tuple(Box::new([Bool(false), Float(NotNan::new(42.0).unwrap())]))
3547 ]))
3548 );
3549
3550 eval_stream_instances!(eval, start, out_ref);
3551 assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Bool(true));
3552 }
3553
3554 #[test]
3555 fn test_instance_aggr_all_all() {
3556 let (_, eval, start) = setup(
3557 "input a: Int32\n\
3558 output b(x: Int32) \
3559 spawn with a \
3560 eval @a with x % 2 = 0 \
3561 close when a == 42 \
3562 output c @a := b.aggregate(over_instances: all, using: forall)",
3563 );
3564 let mut eval = eval.into_evaluator();
3565 let b_ref = StreamReference::Out(0);
3566 let c_ref = StreamReference::Out(1);
3567 let in_ref = StreamReference::In(0);
3568 accept_input!(eval, start, in_ref, Signed(16));
3569 spawn_stream!(eval, start, b_ref);
3570 eval_stream_instances!(eval, start, b_ref);
3571 eval_stream_instances!(eval, start, c_ref);
3572
3573 assert!(stream_has_instance!(eval, b_ref, vec![Signed(16)]));
3574
3575 assert_eq!(
3576 eval.peek_value(b_ref, &[Signed(16)], 0).unwrap(),
3577 Bool(true)
3578 );
3579 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(true));
3580
3581 let mut time = start + Duration::from_secs(1);
3582
3583 accept_input!(eval, time, in_ref, Signed(7));
3584 spawn_stream!(eval, time, b_ref);
3585 eval_stream_instances!(eval, time, b_ref);
3586 eval_stream_instances!(eval, time, c_ref);
3587
3588 assert!(stream_has_instance!(eval, b_ref, vec![Signed(7)]));
3589
3590 assert_eq!(
3591 eval.peek_value(b_ref, &[Signed(16)], 0).unwrap(),
3592 Bool(true)
3593 );
3594 assert_eq!(
3595 eval.peek_value(b_ref, &[Signed(7)], 0).unwrap(),
3596 Bool(false)
3597 );
3598 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(false));
3599
3600 time += Duration::from_secs(1);
3601
3602 accept_input!(eval, time, in_ref, Signed(42));
3603 spawn_stream!(eval, time, b_ref);
3604 eval_stream_instances!(eval, time, b_ref);
3605 eval_stream_instances!(eval, time, c_ref);
3606
3607 assert!(stream_has_instance!(eval, b_ref, vec![Signed(42)]));
3608
3609 assert_eq!(
3610 eval.peek_value(b_ref, &[Signed(16)], 0).unwrap(),
3611 Bool(true)
3612 );
3613 assert_eq!(
3614 eval.peek_value(b_ref, &[Signed(7)], 0).unwrap(),
3615 Bool(false)
3616 );
3617 assert_eq!(
3618 eval.peek_value(b_ref, &[Signed(42)], 0).unwrap(),
3619 Bool(true)
3620 );
3621 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(false));
3622
3623 eval_close!(eval, time, b_ref, vec![Signed(16)]);
3624 eval_close!(eval, time, b_ref, vec![Signed(7)]);
3625 eval_close!(eval, time, b_ref, vec![Signed(42)]);
3626
3627 time += Duration::from_secs(1);
3628
3629 accept_input!(eval, time, in_ref, Signed(16));
3630 spawn_stream!(eval, time, b_ref);
3631 eval_stream_instances!(eval, time, b_ref);
3632 eval_stream_instances!(eval, time, c_ref);
3633
3634 assert!(stream_has_instance!(eval, b_ref, vec![Signed(16)]));
3635
3636 assert_eq!(
3637 eval.peek_value(b_ref, &[Signed(16)], 0).unwrap(),
3638 Bool(true)
3639 );
3640 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(true));
3641 }
3642
3643 #[test]
3644 fn test_instance_aggr_all_any() {
3645 let (_, eval, start) = setup(
3646 "input a: Int32\n\
3647 output b(x: Int32) \
3648 spawn with a \
3649 eval @a with x % 2 = 0 \
3650 close when a == 42 \
3651 output c @a := b.aggregate(over_instances: all, using: exists)",
3652 );
3653 let mut eval = eval.into_evaluator();
3654 let b_ref = StreamReference::Out(0);
3655 let c_ref = StreamReference::Out(1);
3656 let in_ref = StreamReference::In(0);
3657 accept_input!(eval, start, in_ref, Signed(17));
3658 spawn_stream!(eval, start, b_ref);
3659 eval_stream_instances!(eval, start, b_ref);
3660 eval_stream_instances!(eval, start, c_ref);
3661
3662 assert!(stream_has_instance!(eval, b_ref, vec![Signed(17)]));
3663
3664 assert_eq!(
3665 eval.peek_value(b_ref, &[Signed(17)], 0).unwrap(),
3666 Bool(false)
3667 );
3668 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(false));
3669
3670 let mut time = start + Duration::from_secs(1);
3671
3672 accept_input!(eval, time, in_ref, Signed(6));
3673 spawn_stream!(eval, time, b_ref);
3674 eval_stream_instances!(eval, time, b_ref);
3675 eval_stream_instances!(eval, time, c_ref);
3676
3677 assert!(stream_has_instance!(eval, b_ref, vec![Signed(6)]));
3678
3679 assert_eq!(
3680 eval.peek_value(b_ref, &[Signed(17)], 0).unwrap(),
3681 Bool(false)
3682 );
3683 assert_eq!(eval.peek_value(b_ref, &[Signed(6)], 0).unwrap(), Bool(true));
3684 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(true));
3685
3686 time += Duration::from_secs(1);
3687
3688 accept_input!(eval, time, in_ref, Signed(42));
3689 spawn_stream!(eval, time, b_ref);
3690 eval_stream_instances!(eval, time, b_ref);
3691 eval_stream_instances!(eval, time, c_ref);
3692
3693 assert!(stream_has_instance!(eval, b_ref, vec![Signed(42)]));
3694
3695 assert_eq!(
3696 eval.peek_value(b_ref, &[Signed(17)], 0).unwrap(),
3697 Bool(false)
3698 );
3699 assert_eq!(eval.peek_value(b_ref, &[Signed(6)], 0).unwrap(), Bool(true));
3700 assert_eq!(
3701 eval.peek_value(b_ref, &[Signed(42)], 0).unwrap(),
3702 Bool(true)
3703 );
3704 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(true));
3705
3706 eval_close!(eval, time, b_ref, vec![Signed(17)]);
3707 eval_close!(eval, time, b_ref, vec![Signed(6)]);
3708 eval_close!(eval, time, b_ref, vec![Signed(42)]);
3709
3710 time += Duration::from_secs(1);
3711
3712 accept_input!(eval, time, in_ref, Signed(17));
3713 spawn_stream!(eval, time, b_ref);
3714 eval_stream_instances!(eval, time, b_ref);
3715 eval_stream_instances!(eval, time, c_ref);
3716
3717 assert!(stream_has_instance!(eval, b_ref, vec![Signed(17)]));
3718
3719 assert_eq!(
3720 eval.peek_value(b_ref, &[Signed(17)], 0).unwrap(),
3721 Bool(false)
3722 );
3723 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(false));
3724 }
3725
3726 #[test]
3727 fn test_instance_aggr_fresh_all() {
3728 let (_, eval, start) = setup(
3729 "input a: Int32\n\
3730 input i: Int32\n\
3731 output b(x: Int32)\n\
3732 spawn with a\n\
3733 eval when (x + i) % 2 = 0 with x > 10\n\
3734 close when a == 42\n\
3735 output c := b.aggregate(over_instances: fresh, using: forall)",
3736 );
3737 let mut eval = eval.into_evaluator();
3738 let b_ref = StreamReference::Out(0);
3739 let c_ref = StreamReference::Out(1);
3740 let a_ref = StreamReference::In(0);
3741 let i_ref = StreamReference::In(1);
3742 accept_input!(eval, start, a_ref, Signed(16));
3743 accept_input!(eval, start, i_ref, Signed(0));
3744 spawn_stream!(eval, start, b_ref);
3745 eval_stream_instances!(eval, start, b_ref);
3746 eval_stream_instances!(eval, start, c_ref);
3747
3748 assert!(stream_has_instance!(eval, b_ref, vec![Signed(16)]));
3749
3750 assert_eq!(
3751 eval.peek_value(b_ref, &[Signed(16)], 0).unwrap(),
3752 Bool(true)
3753 );
3754 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(true));
3755
3756 let mut time = start + Duration::from_secs(1);
3757 eval.new_cycle(time);
3758
3759 accept_input!(eval, time, a_ref, Signed(6));
3760 accept_input!(eval, time, i_ref, Signed(0));
3761 spawn_stream!(eval, time, b_ref);
3762 eval_stream_instances!(eval, time, b_ref);
3763 eval_stream_instances!(eval, time, c_ref);
3764
3765 assert!(stream_has_instance!(eval, b_ref, vec![Signed(6)]));
3766
3767 assert_eq!(
3768 eval.peek_value(b_ref, &[Signed(16)], 0).unwrap(),
3769 Bool(true)
3770 );
3771 assert_eq!(
3772 eval.peek_value(b_ref, &[Signed(6)], 0).unwrap(),
3773 Bool(false)
3774 );
3775 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(false));
3776
3777 time += Duration::from_secs(1);
3778 eval.new_cycle(time);
3779
3780 accept_input!(eval, time, a_ref, Signed(11));
3781 accept_input!(eval, time, i_ref, Signed(1));
3782 spawn_stream!(eval, time, b_ref);
3783 eval_stream_instances!(eval, time, b_ref);
3784 eval_stream_instances!(eval, time, c_ref);
3785
3786 assert!(stream_has_instance!(eval, b_ref, vec![Signed(11)]));
3787
3788 assert_eq!(
3789 eval.peek_value(b_ref, &[Signed(16)], 0).unwrap(),
3790 Bool(true)
3791 );
3792 assert_eq!(
3793 eval.peek_value(b_ref, &[Signed(6)], 0).unwrap(),
3794 Bool(false)
3795 );
3796 assert_eq!(
3797 eval.peek_value(b_ref, &[Signed(11)], 0).unwrap(),
3798 Bool(true)
3799 );
3800 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(true));
3801
3802 time += Duration::from_secs(1);
3803 eval.new_cycle(time);
3804
3805 accept_input!(eval, time, a_ref, Signed(42));
3806 accept_input!(eval, time, i_ref, Signed(1));
3807 spawn_stream!(eval, time, b_ref);
3808 eval_stream_instances!(eval, time, b_ref);
3809 eval_stream_instances!(eval, time, c_ref);
3810
3811 assert!(stream_has_instance!(eval, b_ref, vec![Signed(42)]));
3812
3813 assert_eq!(
3814 eval.peek_value(b_ref, &[Signed(16)], 0).unwrap(),
3815 Bool(true)
3816 );
3817 assert_eq!(
3818 eval.peek_value(b_ref, &[Signed(6)], 0).unwrap(),
3819 Bool(false)
3820 );
3821 assert_eq!(
3822 eval.peek_value(b_ref, &[Signed(11)], 0).unwrap(),
3823 Bool(true)
3824 );
3825 assert!(eval.peek_value(b_ref, &[Signed(42)], 0).is_none());
3826 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(true));
3827
3828 eval_close!(eval, time, b_ref, vec![Signed(16)]);
3829 eval_close!(eval, time, b_ref, vec![Signed(6)]);
3830 eval_close!(eval, time, b_ref, vec![Signed(11)]);
3831 eval_close!(eval, time, b_ref, vec![Signed(42)]);
3832
3833 time += Duration::from_secs(1);
3834 eval.new_cycle(time);
3835
3836 accept_input!(eval, time, a_ref, Signed(4));
3837 accept_input!(eval, time, i_ref, Signed(0));
3838 spawn_stream!(eval, time, b_ref);
3839 eval_stream_instances!(eval, time, b_ref);
3840 eval_stream_instances!(eval, time, c_ref);
3841
3842 assert!(stream_has_instance!(eval, b_ref, vec![Signed(4)]));
3843
3844 assert_eq!(
3845 eval.peek_value(b_ref, &[Signed(4)], 0).unwrap(),
3846 Bool(false)
3847 );
3848 assert_eq!(eval.peek_value(c_ref, &[], 0).unwrap(), Bool(false));
3849 }
3850
3851 #[test]
3852 fn test_multiple_eval_clauses() {
3853 let (_, eval, start) = setup(
3854 "input a : UInt64\n\
3855 output b
3856 eval @a when a < 10 with a + 1
3857 eval @a when a < 20 with a + 2
3858 eval @a with a + 3",
3859 );
3860 let mut eval = eval.into_evaluator();
3861 let out_ref = StreamReference::Out(0);
3862 let a_ref = StreamReference::In(0);
3863
3864 accept_input!(eval, start, a_ref, Unsigned(0));
3865 eval_stream_instances!(eval, start, out_ref);
3866 assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(1));
3867 accept_input!(eval, start, a_ref, Unsigned(12));
3868 eval_stream_instances!(eval, start, out_ref);
3869 assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(14));
3870 accept_input!(eval, start, a_ref, Unsigned(20));
3871 eval_stream_instances!(eval, start, out_ref);
3872 assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(23));
3873 }
3874
3875 #[test]
3876 fn test_multiple_eval_clauses_no_filter() {
3877 let (_, eval, start) = setup(
3878 "input a : UInt64\n\
3879 output b
3880 eval @a with a + 1
3881 eval @a when a < 20 with a + 2",
3882 );
3883 let mut eval = eval.into_evaluator();
3884 let out_ref = StreamReference::Out(0);
3885 let a_ref = StreamReference::In(0);
3886
3887 accept_input!(eval, start, a_ref, Unsigned(0));
3888 eval_stream_instances!(eval, start, out_ref);
3889 assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(1));
3890 accept_input!(eval, start, a_ref, Unsigned(12));
3891 eval_stream_instances!(eval, start, out_ref);
3892 assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(13));
3893 }
3894
3895 #[test]
3896 fn test_multiple_eval_clauses_different_ac() {
3897 let (_, eval, start) = setup(
3898 "input a : UInt64\ninput b : UInt64\n\
3899 output c : UInt64
3900 eval @a&&b with 1
3901 eval @a with 2",
3902 );
3903 let mut eval = eval.into_evaluator();
3904 let out_ref = StreamReference::Out(0);
3905 let a_ref = StreamReference::In(0);
3906 let b_ref = StreamReference::In(1);
3907
3908 accept_input!(eval, start, a_ref, Unsigned(0));
3909 eval_stream_instances!(eval, start, out_ref);
3910 assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(2));
3911 accept_input!(eval, start, a_ref, Unsigned(1));
3912 accept_input!(eval, start, b_ref, Unsigned(1));
3913 eval_stream_instances!(eval, start, out_ref);
3914 assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(1));
3915 }
3916
3917 #[test]
3918 fn test_multiple_eval_clauses_ac_and_filter() {
3919 let (_, eval, start) = setup(
3920 "input a : UInt64\ninput b : UInt64\n\
3921 output c : UInt64
3922 eval @a&&b when b < 10 with 1
3923 eval @a&&b with 3
3924 eval @a when a < 10 with 2
3925 eval @a with 4",
3926 );
3927 let mut eval = eval.into_evaluator();
3928 let out_ref = StreamReference::Out(0);
3929 let a_ref = StreamReference::In(0);
3930 let b_ref = StreamReference::In(1);
3931
3932 accept_input!(eval, start, a_ref, Unsigned(11));
3933 eval_stream_instances!(eval, start, out_ref);
3934 assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(4));
3935
3936 accept_input!(eval, start, a_ref, Unsigned(0));
3937 eval_stream_instances!(eval, start, out_ref);
3938 assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(2));
3939
3940 accept_input!(eval, start, a_ref, Unsigned(1));
3941 accept_input!(eval, start, b_ref, Unsigned(11));
3942 eval_stream_instances!(eval, start, out_ref);
3943 assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(3));
3944
3945 accept_input!(eval, start, a_ref, Unsigned(1));
3946 accept_input!(eval, start, b_ref, Unsigned(1));
3947 eval_stream_instances!(eval, start, out_ref);
3948 assert_eq!(eval.peek_value(out_ref, &[], 0).unwrap(), Unsigned(1));
3949 }
3950
3951 #[test]
3953 fn test_parameterized_window_bug() {
3954 let (_, eval, mut time) = setup(
3955 "input a : Int64
3956 output b(p1)
3957 spawn with a
3958 eval when a == p1 with true
3959 output c(p1)
3960 spawn with a
3961 eval @1Hz with b(p1).aggregate(over: 10s, using: count)
3962 ",
3963 );
3964 let mut eval = eval.into_evaluator();
3965 let b_ref = StreamReference::Out(0);
3966 let c_ref = StreamReference::Out(1);
3967 let mut tracer = NoTracer::default();
3968
3969 time += Duration::from_millis(1000);
3970 eval.eval_event(&[Signed(1)], time, &mut tracer);
3971 assert!(stream_has_instance!(eval, b_ref, vec![Signed(1)]));
3972 assert!(stream_has_instance!(eval, c_ref, vec![Signed(1)]));
3973 assert_eq!(eval.peek_value(b_ref, &[Signed(1)], 0).unwrap(), Bool(true));
3974
3975 time += Duration::from_millis(1000);
3976 eval.eval_time_driven_tasks(
3977 vec![EvaluationTask::Evaluate(c_ref.out_ix(), vec![Signed(1)])],
3978 time,
3979 &mut tracer,
3980 );
3981 eval.eval_event(&[Signed(1)], time, &mut tracer);
3982 assert_eq!(eval.peek_value(b_ref, &[Signed(1)], 0).unwrap(), Bool(true));
3983 assert_eq!(
3984 eval.peek_value(c_ref, &[Signed(1)], 0).unwrap(),
3985 Unsigned(1)
3986 );
3987
3988 time += Duration::from_millis(1000);
3989 eval.eval_time_driven_tasks(
3990 vec![EvaluationTask::Evaluate(c_ref.out_ix(), vec![Signed(1)])],
3991 time,
3992 &mut tracer,
3993 );
3994 assert_eq!(
3995 eval.peek_value(c_ref, &[Signed(1)], 0).unwrap(),
3996 Unsigned(2)
3997 );
3998
3999 }
4008
4009 #[test]
4011 fn test_parameterized_window_spawn_bug() {
4012 let (_, eval, mut time) = setup(
4013 "input a : Int
4014 output b spawn @a eval @0.5s with a.aggregate(over: 5s, using: count)",
4015 );
4016 let mut eval = eval.into_evaluator();
4017 let b_ref = StreamReference::Out(0);
4018 let mut tracer = NoTracer::default();
4019
4020 time += Duration::from_millis(1000);
4023 eval.eval_event(&[Signed(1)], time, &mut tracer);
4024 assert!(stream_has_instance!(eval, b_ref, Vec::<Value>::new()));
4025
4026 time += Duration::from_millis(500);
4028 eval.eval_time_driven_tasks(
4029 vec![EvaluationTask::Evaluate(b_ref.out_ix(), vec![Signed(1)])],
4030 time,
4031 &mut tracer,
4032 );
4033 assert_eq!(
4034 eval.peek_value(b_ref, &[Signed(1)], 0).unwrap(),
4035 Unsigned(1)
4036 );
4037 }
4038
4039 #[test]
4041 fn test_parameterized_window_global_freq() {
4042 let (_, eval, mut time) = setup(
4043 "input page_id: UInt
4044 output page_id_visits(pid)
4045 spawn with page_id
4046 eval when pid == page_id
4047
4048 output visits_per_day(pid)
4049 spawn with page_id
4050 eval @Global(1h) with page_id_visits(pid).aggregate(over: 1h, using: count)",
4051 );
4052 let mut eval = eval.into_evaluator();
4053 let visits = StreamReference::Out(0);
4054 let avg = StreamReference::Out(1);
4055 let mut tracer = NoTracer::default();
4056
4057 eval.eval_event(&[Signed(1)], time, &mut tracer);
4058 assert!(stream_has_instance!(eval, visits, vec![Signed(1)]));
4059 assert!(stream_has_instance!(eval, avg, vec![Signed(1)]));
4060
4061 time += Duration::from_millis(1000);
4062 eval.eval_event(&[Signed(2)], time, &mut tracer);
4063 assert!(stream_has_instance!(eval, visits, vec![Signed(2)]));
4064 assert!(stream_has_instance!(eval, avg, vec![Signed(2)]));
4065
4066 time += Duration::from_millis(1000);
4067 eval.eval_event(&[Signed(3)], time, &mut tracer);
4068 assert!(stream_has_instance!(eval, visits, vec![Signed(3)]));
4069 assert!(stream_has_instance!(eval, avg, vec![Signed(3)]));
4070
4071 time += Duration::from_millis(1000);
4072 eval.eval_event(&[Signed(5)], time, &mut tracer);
4073 assert!(stream_has_instance!(eval, visits, vec![Signed(5)]));
4074 assert!(stream_has_instance!(eval, avg, vec![Signed(5)]));
4075
4076 time += Duration::from_millis(1000);
4077 eval.eval_event(&[Signed(3)], time, &mut tracer);
4078 assert!(stream_has_instance!(eval, visits, vec![Signed(3)]));
4079 assert!(stream_has_instance!(eval, avg, vec![Signed(3)]));
4080
4081 time += Duration::from_millis(1000);
4082 eval.eval_event(&[Signed(1)], time, &mut tracer);
4083 assert!(stream_has_instance!(eval, visits, vec![Signed(1)]));
4084 assert!(stream_has_instance!(eval, avg, vec![Signed(1)]));
4085
4086 time = Duration::from_secs(3600);
4087 eval.eval_time_driven_tasks(
4088 vec![
4089 EvaluationTask::Evaluate(avg.out_ix(), vec![Signed(1)]),
4090 EvaluationTask::Evaluate(avg.out_ix(), vec![Signed(2)]),
4091 EvaluationTask::Evaluate(avg.out_ix(), vec![Signed(3)]),
4092 EvaluationTask::Evaluate(avg.out_ix(), vec![Signed(5)]),
4093 ],
4094 time,
4095 &mut tracer,
4096 );
4097 assert_eq!(eval.peek_value(avg, &[Signed(1)], 0).unwrap(), Unsigned(1));
4098 assert_eq!(eval.peek_value(avg, &[Signed(2)], 0).unwrap(), Unsigned(1));
4099 assert_eq!(eval.peek_value(avg, &[Signed(3)], 0).unwrap(), Unsigned(2));
4100 assert_eq!(eval.peek_value(avg, &[Signed(5)], 0).unwrap(), Unsigned(1));
4101 }
4102
4103 #[test]
4104 fn filtered_instance_aggregation() {
4105 let (_, eval, mut time) = setup(
4106 "input a: Int64
4107 input b : Int64
4108 output c(p) spawn with a
4109 eval @a with c(p).offset(by: -1).defaults(to: 0) + 1
4110 output d eval @a&&b with c.aggregate(over_instances: all(where: (p) => p > b), using: count)",
4111 );
4112
4113 let mut eval = eval.into_evaluator();
4114 let mut tracer = NoTracer::default();
4115
4116 let d_ref = StreamReference::Out(1);
4117
4118 eval.eval_event(&[Signed(1), Signed(2)], time, &mut tracer);
4119 assert_eq!(eval.peek_value(d_ref, &[], 0).unwrap(), Unsigned(0));
4120 time += Duration::from_secs(1);
4121 eval.eval_event(&[Signed(3), Signed(2)], time, &mut tracer);
4122 assert_eq!(eval.peek_value(d_ref, &[], 0).unwrap(), Unsigned(1));
4123 time += Duration::from_secs(1);
4124 eval.eval_event(&[Signed(5), Signed(0)], time, &mut tracer);
4125 assert_eq!(eval.peek_value(d_ref, &[], 0).unwrap(), Unsigned(3));
4126 }
4127
4128 #[test]
4129 fn instance_aggregation_grouping() {
4130 let (_, eval, mut time) = setup(
4131 "input a: Int64
4132 input b : Int64
4133 output c(p1,p2)
4134 spawn with (a,b)
4135 eval when a == p1 && b == p2 with p1 + p2
4136 output d(p)
4137 spawn with a
4138 eval @true with c.aggregate(over_instances: all(where: (p1,p2) => p1==p), using: sum)",
4139 );
4140
4141 let mut eval = eval.into_evaluator();
4142 let mut tracer = NoTracer::default();
4143
4144 let d_ref = StreamReference::Out(1);
4145
4146 eval.eval_event(&[Signed(1), Signed(2)], time, &mut tracer);
4147 time += Duration::from_secs(1);
4148 eval.eval_event(&[Signed(2), Signed(3)], time, &mut tracer);
4149 time += Duration::from_secs(1);
4150 eval.eval_event(&[Signed(1), Signed(5)], time, &mut tracer);
4151 time += Duration::from_secs(1);
4152 eval.eval_event(&[Signed(4), Signed(2)], time, &mut tracer);
4153 time += Duration::from_secs(1);
4154 eval.eval_event(&[Signed(4), Signed(1)], time, &mut tracer);
4155 time += Duration::from_secs(1);
4156 eval.eval_event(&[Signed(2), Signed(5)], time, &mut tracer);
4157 time += Duration::from_secs(1);
4158 eval.eval_event(&[Signed(2), Signed(4)], time, &mut tracer);
4159 assert_eq!(eval.peek_value(d_ref, &[Signed(1)], 0).unwrap(), Signed(9));
4160 assert_eq!(eval.peek_value(d_ref, &[Signed(2)], 0).unwrap(), Signed(18));
4161 assert_eq!(eval.peek_value(d_ref, &[Signed(4)], 0).unwrap(), Signed(11));
4162 }
4163}