rtlola_interpreter/api/
monitor.rs

1//! The [Monitor] is the single threaded version of the API.
2//! Consequently deadlines of timed streams are only evaluated with a new event.
3//! Hence this API is more suitable for offline monitoring or embedded scenarios.
4//!
5//! The [Monitor] is parameterized over its input and output method.
6//! The preferred method to create an API is using the [ConfigBuilder](crate::ConfigBuilder) and the [monitor](crate::ConfigBuilder::monitor) method.
7//!
8//! # Input Method
9//! An input method has to implement the [EventFactory](crate::input::EventFactory) trait. Out of the box two different methods are provided:
10//! * [ArrayFactory](crate::input::ArrayFactory): Provides a basic input method for anything that already is an [Event] or that can be transformed into one using `TryInto<[Value]>`.
11//! * [MappedFactory](crate::input::MappedFactory): Is a more elaborate input method. It allows to provide a custom data structure to the monitor as an event, as long as it implements the [InputMap](crate::input::InputMap) trait.
//!     If implemented, this trait provides functionality to generate a new value for any input stream from the data structure.
13//!
14//! # Output Method
15//! The [Monitor] can provide output with a varying level of detail captured by the [VerdictRepresentation] trait. The different output formats are:
16//! * [Incremental]: For each processed event a condensed list of monitor state changes is provided.
//! * [Total]: For each event a complete snapshot of the current monitor state is returned.
//! * [TotalIncremental](crate::monitor::TotalIncremental): For each processed event a complete list of monitor state changes is provided.
19//! * [TriggerMessages]: For each event a list of violated triggers with their description is produced.
20use std::cell::RefCell;
21use std::fmt::{Debug, Display, Formatter};
22use std::marker::PhantomData;
23use std::rc::Rc;
24use std::time::Duration;
25
26use itertools::Itertools;
27use rtlola_frontend::mir::{InputReference, OutputReference, RtLolaMir, TriggerReference, Type};
28#[cfg(feature = "serde")]
29use serde::Serialize;
30
31use crate::config::{Config, ExecutionMode};
32use crate::configuration::time::{OutputTimeRepresentation, RelativeFloat, TimeRepresentation};
33use crate::evaluator::{Evaluator, EvaluatorData};
34use crate::input::{EventFactory, EventFactoryError};
35use crate::schedule::schedule_manager::ScheduleManager;
36use crate::schedule::DynamicSchedule;
37use crate::storage::Value;
38use crate::{CondSerialize, Time};
39
/// An event to be handled by the interpreter.
///
/// It is represented as a plain vector of [Value]s.
pub type Event = Vec<Value>;
42
/**
    Provides the functionality to generate a snapshot of the streams values.

    A representation is built from a [RawVerdict] and may additionally carry the [Tracer] named by `Tracing`.
*/
pub trait VerdictRepresentation: Clone + Debug + Send + CondSerialize + 'static {
    /// This subtype captures the tracing capabilities of the verdict representation.
    type Tracing: Tracer;

    /// Creates a snapshot of the streams values from the raw verdict `data`.
    fn create(data: RawVerdict) -> Self;

    /// Creates a snapshot of the streams values including tracing data.
    ///
    /// The default implementation discards `_tracing` and forwards to [create](Self::create).
    fn create_with_trace(data: RawVerdict, _tracing: Self::Tracing) -> Self {
        Self::create(data)
    }

    /// Returns whether the verdict is empty. I.e. it doesn't contain any information.
    fn is_empty(&self) -> bool;
}
61
/**
Provides the functionality to collect additional tracing data during evaluation.
The 'start' methods are guaranteed to be called before the 'end' method, while either both or none of them are called.

Every hook has an empty default body, so an implementor only needs to override the hooks it is interested in.
 */
pub trait Tracer: Default + Clone + Debug + Send + 'static {
    /// This method is invoked at the start of event parsing.
    fn parse_start(&mut self) {}
    /// This method is invoked at the end of event parsing.
    fn parse_end(&mut self) {}

    /// This method is invoked at the start of the evaluation cycle.
    fn eval_start(&mut self) {}
    /// This method is invoked at the end of the evaluation cycle.
    fn eval_end(&mut self) {}

    /// This method is invoked at the start of the spawn evaluation of stream `output`.
    fn spawn_start(&mut self, _output: OutputReference) {}
    /// This method is invoked at the end of the spawn evaluation of stream `output`.
    fn spawn_end(&mut self, _output: OutputReference) {}

    /// This method is invoked at the start of the evaluation of the given instance of stream `output`.
    fn instance_eval_start(&mut self, _output: OutputReference, _instance: &[Value]) {}
    /// This method is invoked at the end of the evaluation of the given instance of stream `output`.
    fn instance_eval_end(&mut self, _output: OutputReference, _instance: &[Value]) {}

    /// This method is invoked at the start of the close evaluation of the given instance of stream `output`.
    fn close_start(&mut self, _output: OutputReference, _instance: &[Value]) {}
    /// This method is invoked at the end of the close evaluation of the given instance of stream `output`.
    fn close_end(&mut self, _output: OutputReference, _instance: &[Value]) {}
}
92
/// This tracer provides no tracing data at all and serves as a default value.
///
/// It keeps the empty default hooks of [Tracer] and therefore records nothing.
#[cfg_attr(feature = "serde", derive(Serialize))]
#[derive(Debug, Clone, Copy, Default)]
pub struct NoTracer {}
impl Tracer for NoTracer {}
98
99/// A generic VerdictRepresentation suitable to use with any tracer.
100#[cfg_attr(feature = "serde", derive(Serialize))]
101#[derive(Debug, Clone)]
102pub struct TracingVerdict<T: Tracer, V: VerdictRepresentation> {
103    /// The contained tracing information.
104    #[cfg_attr(feature = "serde", serde(skip))]
105    pub tracer: T,
106    /// The verdict given in the chosen VerdictRepresentation
107    pub verdict: V,
108}
109
110impl<T: Tracer, V: VerdictRepresentation<Tracing = NoTracer>> VerdictRepresentation
111    for TracingVerdict<T, V>
112{
113    type Tracing = T;
114
115    fn create(data: RawVerdict) -> Self {
116        Self {
117            tracer: T::default(),
118            verdict: V::create(data),
119        }
120    }
121
122    fn create_with_trace(data: RawVerdict, tracing: Self::Tracing) -> Self {
123        Self {
124            tracer: tracing,
125            verdict: V::create(data),
126        }
127    }
128
129    fn is_empty(&self) -> bool {
130        V::is_empty(&self.verdict)
131    }
132}
133
/// A type representing the parameters of a stream.
/// If a stream is not dynamically created it defaults to `None`.
/// If a stream is dynamically created but does not have parameters it defaults to `Some(vec![])`.
pub type Parameters = Option<Vec<Value>>;

/// A stream instance.
/// The first element represents the parameter values of the instance, the second element the value of the instance.
/// The value is `None` if the instance does not have a value.
pub type Instance = (Parameters, Option<Value>);
141
142/// An enum representing a change in the monitor.
143#[cfg_attr(feature = "serde", derive(Serialize))]
144#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
145pub enum Change {
146    /// Indicates that a new instance of a stream was created with the given values as parameters.
147    Spawn(Vec<Value>),
148    /// Indicates that an instance got a new value. The instance is identified through the given [Parameters].
149    Value(Parameters, Value),
150    /// Indicates that an instance was closed. The given values are the parameters of the closed instance.
151    Close(Vec<Value>),
152}
153
154impl Display for Change {
155    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
156        match self {
157            Change::Spawn(para) => write!(f, "Spawn<{}>", para.iter().join(", ")),
158            Change::Close(para) => write!(f, "Close<{}>", para.iter().join(", ")),
159            Change::Value(para, value) => match para {
160                Some(para) => write!(f, "Instance<{}> = {}", para.iter().join(", "), value),
161                None => write!(f, "Value = {}", value),
162            },
163        }
164    }
165}
166
167/**
168    Represents the changes of the monitor state. Each element represents a set of [Change]s of a specific output stream.
169*/
170pub type Incremental = Vec<(OutputReference, Vec<Change>)>;
171
172impl VerdictRepresentation for Incremental {
173    type Tracing = NoTracer;
174
175    fn create(data: RawVerdict) -> Self {
176        data.eval.peek_fresh_outputs()
177    }
178
179    fn is_empty(&self) -> bool {
180        Vec::is_empty(self)
181    }
182}
183
184/**
185Represents the changes of the monitor state divided into inputs, outputs and trigger.
186Changes of output streams are represented by a set of [Change]s.
187A change of an input is represented by its new [Value].
188A change of a trigger is represented by its [TriggerReference].
189
190Note: Only streams that actually changed are included in the collections.
191 */
192#[cfg_attr(feature = "serde", derive(Serialize))]
193#[derive(Debug, Clone)]
194pub struct TotalIncremental {
195    /// The set of changed inputs.
196    pub inputs: Vec<(InputReference, Value)>,
197    /// The set of changed outputs.
198    pub outputs: Vec<(OutputReference, Vec<Change>)>,
199    /// The set of changed triggers. I.e. all triggers that were activated.
200    pub trigger: Vec<TriggerReference>,
201}
202
203impl VerdictRepresentation for TotalIncremental {
204    type Tracing = NoTracer;
205
206    fn create(data: RawVerdict) -> Self {
207        let inputs = data.eval.peek_fresh_input();
208        let outputs = data.eval.peek_fresh_outputs();
209        let trigger = data.eval.peek_violated_triggers();
210        Self {
211            inputs,
212            outputs,
213            trigger,
214        }
215    }
216
217    fn is_empty(&self) -> bool {
218        self.inputs.is_empty() && self.outputs.is_empty() && self.trigger.is_empty()
219    }
220}
221
222/**
223    Represents a snapshot of the monitor state containing the current value of each output and input stream.
224*/
225#[cfg_attr(feature = "serde", derive(Serialize))]
226#[derive(Debug, Clone, Eq, PartialEq)]
227pub struct Total {
228    /// The ith value in this vector is the current value of the ith input stream.
229    pub inputs: Vec<Option<Value>>,
230
231    /// The ith value in this vector is the vector of instances of the ith output stream.
232    /// If the stream has no instance yet, this vector is empty. If a stream is not parameterized, the vector will always be of size 1.
233    pub outputs: Vec<Vec<Instance>>,
234}
235
236impl VerdictRepresentation for Total {
237    type Tracing = NoTracer;
238
239    fn create(data: RawVerdict) -> Self {
240        Total {
241            inputs: data.eval.peek_inputs(),
242            outputs: data.eval.peek_outputs(),
243        }
244    }
245
246    fn is_empty(&self) -> bool {
247        false
248    }
249}
250
251/**
252    Represents the index and the formatted message of all violated triggers.
253*/
254pub type TriggerMessages = Vec<(OutputReference, Parameters, String)>;
255
256impl VerdictRepresentation for TriggerMessages {
257    type Tracing = NoTracer;
258
259    fn create(data: RawVerdict) -> Self
260    where
261        Self: Sized,
262    {
263        data.eval.peek_violated_triggers_messages()
264    }
265
266    fn is_empty(&self) -> bool {
267        Vec::is_empty(self)
268    }
269}
270
/**
    The [Verdicts] struct represents the verdict of the API.

    It contains the output of the periodic streams with the `timed` field and the output of the event-based streams with `event`.
    The field `timed` is a vector, containing all updates of periodic streams since the last event.
    All times are given in the inner time format of the chosen output time representation `VerdictTime`.
*/
#[cfg_attr(feature = "serde", derive(Serialize))]
#[derive(Debug, Clone)]
pub struct Verdicts<V: VerdictRepresentation, VerdictTime: OutputTimeRepresentation> {
    /// All verdicts caused by timed streams given at each deadline that occurred.
    pub timed: Vec<(VerdictTime::InnerTime, V)>,
    /// The verdict that resulted from evaluating the event.
    pub event: V,
    /// The time of the event.
    pub ts: VerdictTime::InnerTime,
}
287
/**
The Monitor is the central object exposed by the API.

The [Monitor] accepts new events and computes streams.
It can compute event-based streams based on new events through `accept_event`.
It can also simply advance periodic streams up to a given timestamp through `accept_time`.
The generic argument `Source` implements the [EventFactory] trait describing the input source of the API.
The generic argument `Mode` implements the [ExecutionMode] trait; its associated `SourceTime` type defines the input time format.
The generic argument `Verdict` implements the [VerdictRepresentation] trait describing the output format of the API that is by default [Incremental].
The generic argument `VerdictTime` implements the [OutputTimeRepresentation] trait defining the output time format. It defaults to [RelativeFloat].
 */
#[allow(missing_debug_implementations)]
pub struct Monitor<Source, Mode, Verdict = Incremental, VerdictTime = RelativeFloat>
where
    Source: EventFactory,
    Mode: ExecutionMode,
    Verdict: VerdictRepresentation,
    VerdictTime: OutputTimeRepresentation + 'static,
{
    // The intermediate representation of the monitored specification.
    ir: RtLolaMir,
    // The evaluator that computes the stream values.
    eval: Evaluator,

    // Provides the due times and deadlines of the time-driven streams.
    schedule_manager: ScheduleManager,

    // Translates the records handed to the monitor into events.
    source: Source,

    // Converts incoming timestamps into the internal time format.
    source_time: Mode::SourceTime,
    // Converts internal timestamps into the output time format.
    output_time: VerdictTime,

    // Ties the verdict representation to the monitor without storing a value of it.
    phantom: PhantomData<Verdict>,
}
319
320/// Crate-public interface
321impl<Source, Mode, Verdict, VerdictTime> Monitor<Source, Mode, Verdict, VerdictTime>
322where
323    Source: EventFactory,
324    Mode: ExecutionMode,
325    Verdict: VerdictRepresentation,
326    VerdictTime: OutputTimeRepresentation,
327{
328    ///setup
329    pub fn setup(
330        config: Config<Mode, VerdictTime>,
331        setup_data: Source::CreationData,
332    ) -> Result<Monitor<Source, Mode, Verdict, VerdictTime>, EventFactoryError> {
333        let dyn_schedule = Rc::new(RefCell::new(DynamicSchedule::new()));
334        let mut source_time = config.mode.time_representation().clone();
335        let mut output_time = VerdictTime::default();
336
337        let st = source_time.init_start_time(config.start_time);
338        output_time.set_start_time(st);
339
340        let input_map = config
341            .ir
342            .inputs
343            .iter()
344            .map(|i| (i.name.clone(), i.reference.in_ix()))
345            .collect();
346
347        let eval_data = EvaluatorData::new(config.ir.clone(), dyn_schedule.clone());
348
349        let time_manager = ScheduleManager::setup(config.ir.clone(), dyn_schedule)
350            .expect("Error computing schedule for time-driven streams");
351
352        Ok(Monitor {
353            ir: config.ir,
354            eval: eval_data.into_evaluator(),
355
356            schedule_manager: time_manager,
357
358            source: Source::new(input_map, setup_data)?,
359
360            source_time,
361            output_time,
362
363            phantom: PhantomData,
364        })
365    }
366
367    fn eval_deadlines(&mut self, ts: Time, only_before: bool) -> Vec<(Time, Verdict)> {
368        let mut timed: Vec<(Time, Verdict)> = vec![];
369        while self.schedule_manager.get_next_due().is_some() {
370            let mut tracer = Verdict::Tracing::default();
371            tracer.eval_start();
372            let due = self.schedule_manager.get_next_due().unwrap();
373            if due > ts || (only_before && due == ts) {
374                break;
375            }
376            let deadline = self.schedule_manager.get_next_deadline(ts);
377
378            self.eval.eval_time_driven_tasks(deadline, due, &mut tracer);
379            tracer.eval_end();
380            timed.push((
381                due,
382                Verdict::create_with_trace(RawVerdict::from(&self.eval), tracer),
383            ))
384        }
385        timed
386    }
387}
388
389/// A raw verdict that is transformed into the respective representation
390#[allow(missing_debug_implementations)]
391#[derive(Copy, Clone)]
392pub struct RawVerdict<'a> {
393    eval: &'a Evaluator,
394}
395
396impl<'a> From<&'a Evaluator> for RawVerdict<'a> {
397    fn from(eval: &'a Evaluator) -> Self {
398        RawVerdict { eval }
399    }
400}
401
402/// Public interface
403impl<Source, Mode, Verdict, VerdictTime> Monitor<Source, Mode, Verdict, VerdictTime>
404where
405    Source: EventFactory,
406    Mode: ExecutionMode,
407    Verdict: VerdictRepresentation,
408    VerdictTime: OutputTimeRepresentation,
409{
410    /**
411    Computes all periodic streams up through the new timestamp and then handles the input event.
412
413    The new event is therefore not seen by periodic streams up through a new timestamp.
414    */
415    pub fn accept_event(
416        &mut self,
417        ev: Source::Record,
418        ts: <Mode::SourceTime as TimeRepresentation>::InnerTime,
419    ) -> Result<Verdicts<Verdict, VerdictTime>, EventFactoryError> {
420        let mut tracer = Verdict::Tracing::default();
421
422        tracer.parse_start();
423        let ev = self.source.get_event(ev)?;
424        tracer.parse_end();
425        let ts = self.source_time.convert_from(ts);
426
427        // Evaluate timed streams with due < ts
428        let timed = if self.ir.has_time_driven_features() {
429            self.eval_deadlines(ts, true)
430        } else {
431            vec![]
432        };
433
434        // Evaluate
435        tracer.eval_start();
436        self.eval.eval_event(ev.as_slice(), ts, &mut tracer);
437        tracer.eval_end();
438        let event_change = Verdict::create_with_trace(RawVerdict::from(&self.eval), tracer);
439
440        let timed = timed
441            .into_iter()
442            .map(|(t, v)| (self.output_time.convert_into(t), v))
443            .collect();
444
445        Ok(Verdicts::<Verdict, VerdictTime> {
446            timed,
447            event: event_change,
448            ts: self.output_time.convert_into(ts),
449        })
450    }
451
452    /**
453    Computes all periodic streams up through and including the timestamp.
454    */
455    pub fn accept_time(
456        &mut self,
457        ts: <Mode::SourceTime as TimeRepresentation>::InnerTime,
458    ) -> Vec<(VerdictTime::InnerTime, Verdict)> {
459        let ts = self.source_time.convert_from(ts);
460
461        let timed = if self.ir.has_time_driven_features() {
462            self.eval_deadlines(ts, false)
463        } else {
464            vec![]
465        };
466
467        timed
468            .into_iter()
469            .map(|(t, v)| (self.output_time.convert_into(t), v))
470            .collect()
471    }
472
473    /// Returns the underlying representation of the specification as an [RtLolaMir]
474    pub fn ir(&self) -> &RtLolaMir {
475        &self.ir
476    }
477
478    /**
479    Get the name of an input stream based on its [InputReference].
480
481    The reference is valid for the lifetime of the monitor.
482    */
483    pub fn name_for_input(&self, id: InputReference) -> &str {
484        self.ir.inputs[id].name.as_str()
485    }
486
487    /**
488    Get the name of an output stream based on its [OutputReference].
489
490    The reference is valid for the lifetime of the monitor.
491    */
492    pub fn name_for_output(&self, id: OutputReference) -> &str {
493        self.ir.outputs[id].name.as_str()
494    }
495
496    /**
497    Get the [OutputReference] of a trigger based on its index.
498    */
499    pub fn trigger_stream_index(&self, id: usize) -> usize {
500        self.ir.triggers[id].output_reference.out_ix()
501    }
502
503    /**
504    Get the number of input streams.
505    */
506    pub fn number_of_input_streams(&self) -> usize {
507        self.ir.inputs.len()
508    }
509
510    /**
511    Get the number of output streams (this includes one output stream for each trigger).
512    */
513    pub fn number_of_output_streams(&self) -> usize {
514        self.ir.outputs.len()
515    }
516
517    /**
518    Get the number of triggers.
519    */
520    pub fn number_of_triggers(&self) -> usize {
521        self.ir.triggers.len()
522    }
523
524    /**
525    Get the type of an input stream based on its [InputReference].
526
527    The reference is valid for the lifetime of the monitor.
528    */
529    pub fn type_of_input(&self, id: InputReference) -> &Type {
530        &self.ir.inputs[id].ty
531    }
532
533    /**
534    Get the type of an output stream based on its [OutputReference].
535
536    The reference is valid for the lifetime of the monitor.
537    */
538    pub fn type_of_output(&self, id: OutputReference) -> &Type {
539        &self.ir.outputs[id].ty
540    }
541
542    /**
543    Get the extend rate of an output stream based on its [OutputReference].
544
545    The reference is valid for the lifetime of the monitor.
546    */
547    pub fn extend_rate_of_output(&self, id: OutputReference) -> Option<Duration> {
548        self.ir
549            .time_driven
550            .iter()
551            .find(|time_driven_stream| time_driven_stream.reference.out_ix() == id)
552            .map(|time_driven_stream| time_driven_stream.period_in_duration())
553    }
554
555    /// Switch [VerdictRepresentation]s of the [Monitor].
556    pub fn with_verdict_representation<T: VerdictRepresentation>(
557        self,
558    ) -> Monitor<Source, Mode, T, VerdictTime> {
559        let Monitor {
560            ir,
561            eval,
562            schedule_manager: time_manager,
563            source_time,
564            source,
565            output_time,
566            phantom: _,
567        } = self;
568        Monitor {
569            ir,
570            eval,
571            schedule_manager: time_manager,
572            source_time,
573            source,
574            output_time,
575            phantom: PhantomData,
576        }
577    }
578}
579
// These tests are only built when the `serde` feature is disabled.
#[cfg(test)]
#[cfg(not(feature = "serde"))]
mod tests {
    use std::convert::Infallible;
    use std::time::{Duration, Instant};

    use crate::api::monitor::Change;
    use crate::config::OfflineMode;
    use crate::input::ArrayFactory;
    use crate::monitor::{Incremental, Monitor, Total, Value, VerdictRepresentation};
    use crate::time::RelativeFloat;
    use crate::ConfigBuilder;

    /// Builds an offline monitor for `spec` that consumes arrays of `N` values and reports verdicts as `V`.
    /// The returned [Instant] is taken right after the monitor was created.
    fn setup<const N: usize, V: VerdictRepresentation>(
        spec: &str,
    ) -> (
        Instant,
        Monitor<
            ArrayFactory<N, Infallible, [Value; N]>,
            OfflineMode<RelativeFloat>,
            V,
            RelativeFloat,
        >,
    ) {
        // Init Monitor API
        let monitor = ConfigBuilder::new()
            .spec_str(spec)
            .offline::<RelativeFloat>()
            .with_array_events::<N, Infallible, [Value; N]>()
            .with_verdict::<V>()
            .monitor()
            .expect("Failed to create monitor");
        (Instant::now(), monitor)
    }

    /// Sorts the instances of every output stream so that two [Total]s can be compared independent of instance order.
    fn sort_total(res: Total) -> Total {
        let Total {
            inputs,
            mut outputs,
        } = res;
        outputs.iter_mut().for_each(|s| s.sort());
        Total { inputs, outputs }
    }

    /// Sorts the changes of every output stream so that two [Incremental]s can be compared independent of change order.
    fn sort_incremental(mut res: Incremental) -> Incremental {
        res.iter_mut().for_each(|(_, changes)| changes.sort());
        res
    }

    /// Outputs defined by literals must show up with exactly these literal values after a single event.
    #[test]
    fn test_const_output_literals() {
        let (start, mut monitor) = setup::<1, Total>(
            r#"
        input i_0: UInt8

        output o_0: Bool @i_0 := true
        output o_1: UInt8 @i_0 := 3
        output o_2: Int8 @i_0 := -5
        output o_3: Float32 @i_0 := -123.456
        output o_4: String @i_0 := "foobar"
        "#,
        );
        let v = Value::Unsigned(3);
        let res = monitor
            .accept_event([v.clone()], start.elapsed())
            .expect("Failed to accept value");
        // The specification has no periodic streams, hence no timed verdicts are expected.
        assert!(res.timed.is_empty());
        let res = res.event;
        assert_eq!(res.inputs[0], Some(v));
        assert_eq!(res.outputs[0][0], (None, Some(Value::Bool(true))));
        assert_eq!(res.outputs[1][0], (None, Some(Value::Unsigned(3))));
        assert_eq!(res.outputs[2][0], (None, Some(Value::Signed(-5))));
        assert_eq!(
            res.outputs[3][0],
            (None, Some(Value::try_from(-123.456).unwrap()))
        );
        assert_eq!(res.outputs[4][0], (None, Some(Value::Str("foobar".into()))));
    }

    /// A periodic stream counting the events of a sliding window is only reported through timed verdicts.
    #[test]
    fn test_count_window() {
        let (_, mut monitor) = setup::<1, Incremental>(
            "input a: UInt16\noutput b: UInt16 @0.25Hz := a.aggregate(over: 40s, using: count)",
        );

        let n = 25;
        let mut time = Duration::from_secs(45);
        let res = monitor
            .accept_event([Value::Unsigned(1)], time)
            .expect("Failed to accept value");
        // The event itself changes no output; all deadlines before the first event report a count of 0.
        assert!(res.event.is_empty());
        assert_eq!(res.timed.len(), 11);
        assert!(res.timed.iter().all(|(time, change)| {
            time.as_secs() % 4 == 0
                && change[0].0 == 0
                && change[0].1[0] == Change::Value(None, Value::Unsigned(0))
        }));
        // From now on one event arrives per second.
        for v in 2..=n {
            time += Duration::from_secs(1);
            let res = monitor
                .accept_event([Value::Unsigned(v)], time)
                .expect("Failed to accept value");

            assert_eq!(res.event.len(), 0);
            if (v - 1) % 4 == 0 {
                // A deadline passed since the last event; it counts all events seen before the current one.
                assert_eq!(res.timed.len(), 1);
                assert_eq!(
                    res.timed[0].1[0].1[0],
                    Change::Value(None, Value::Unsigned(v - 1))
                );
            } else {
                assert_eq!(res.timed.len(), 0);
            }
        }
    }

    /// Every value of `a` spawns an instance of `c`, and all existing instances are updated whenever `a` gets a value.
    #[test]
    fn test_spawn_eventbased() {
        let (_, mut monitor) = setup::<2, Total>(
            "input a: Int32\n\
                  input b: Int32\n\
                  output c(x: Int32) spawn with a eval with x + a\n\
                  output d := b",
        );

        // Only `a` receives a value: the instance c(15) is created, `d` has no value yet.
        let res = monitor
            .accept_event([Value::Signed(15), Value::None], Duration::from_secs(1))
            .expect("Failed to accept value");
        let expected = Total {
            inputs: vec![Some(Value::Signed(15)), None],
            outputs: vec![
                vec![(Some(vec![Value::Signed(15)]), Some(Value::Signed(30)))],
                vec![(None, None)],
            ],
        };
        assert_eq!(res.event, expected);
        assert_eq!(res.timed.len(), 0);

        // Both inputs receive a value: c(20) is created and c(15) is re-evaluated with the new `a`.
        let res = monitor
            .accept_event(
                [Value::Signed(20), Value::Signed(7)],
                Duration::from_secs(2),
            )
            .expect("Failed to accept value");
        let expected = Total {
            inputs: vec![Some(Value::Signed(20)), Some(Value::Signed(7))],
            outputs: vec![
                vec![
                    (Some(vec![Value::Signed(15)]), Some(Value::Signed(35))),
                    (Some(vec![Value::Signed(20)]), Some(Value::Signed(40))),
                ],
                vec![(None, Some(Value::Signed(7)))],
            ],
        };
        assert_eq!(sort_total(res.event), sort_total(expected));
        assert_eq!(res.timed.len(), 0);

        // Only `b` receives a value: the instances of `c` keep their values, `d` follows `b`.
        let res = monitor
            .accept_event([Value::None, Value::Signed(42)], Duration::from_secs(3))
            .expect("Failed to accept value");
        let expected = Total {
            inputs: vec![Some(Value::Signed(20)), Some(Value::Signed(42))],
            outputs: vec![
                vec![
                    (Some(vec![Value::Signed(15)]), Some(Value::Signed(35))),
                    (Some(vec![Value::Signed(20)]), Some(Value::Signed(40))),
                ],
                vec![(None, Some(Value::Signed(42)))],
            ],
        };
        assert_eq!(sort_total(res.event), sort_total(expected));
        assert_eq!(res.timed.len(), 0);
    }

    /// An instance that is spawned, evaluated and closed by the same event reports all three changes.
    #[test]
    fn test_eval_close() {
        let (_, mut monitor) = setup::<1, Incremental>(
            "input a: Int32\n\
                  output c(x: Int32)\n\
                    spawn with a \n\
                    close @a when true\n\
                    eval with x + a",
        );

        let res = monitor
            .accept_event([Value::Signed(15)], Duration::from_secs(1))
            .expect("Failed to accept value");
        let mut expected = vec![
            Change::Spawn(vec![Value::Signed(15)]),
            Change::Value(Some(vec![Value::Signed(15)]), Value::Signed(30)),
            Change::Close(vec![Value::Signed(15)]),
        ];
        // The order of the reported changes is not fixed, so both sides are sorted before comparing.
        expected.sort();
        assert!(res.timed.is_empty());
        assert_eq!(res.event[0].0, 0);

        assert_eq!(sort_incremental(res.event)[0].1, expected);
    }
}