rtlola_interpreter/api/
queued.rs

1//! The [QueuedMonitor] is the multi-threaded version of the API.
2//! Deadlines are evaluated immediately and the resulting verdicts are returned through a queue retrieved using the [output_queue](QueuedMonitor::output_queue) method.
3//! This API should be used in an online monitoring setting.
4//!
5//! The [QueuedMonitor] is parameterized over its input and output method.
6//! The preferred method to create an API is using the [ConfigBuilder](crate::ConfigBuilder) and the [queued_monitor](crate::ConfigBuilder::queued_monitor) method.
7//!
8//! # Input Method
9//! An input method has to implement the [EventFactory] trait. Out of the box two different methods are provided:
10//! * [EventInput](crate::input::ArrayFactory): Provides a basic input method for anything that already is an [Event](crate::monitor::Event) or that can be transformed into one using `TryInto<[Value]>`.
11//! * [RecordInput](crate::input::MappedFactory): Is a more elaborate input method. It allows to provide a custom data structure to the monitor as an input, as long as it implements the [Record](crate::input::InputMap) trait.
12//!     If implemented this traits provides functionality to generate a new value for any input stream from the data structure.
13//!
14//! # Output Method
15//! The [QueuedMonitor] can provide output with a varying level of detail captured by the [VerdictRepresentation](crate::monitor::VerdictRepresentation) trait. The different output formats are:
16//! * [Incremental]: For each processed event a condensed list of monitor state changes is provided.
17//! * [Total](crate::monitor::Total): For each event a complete snapshot of the current monitor state is returned
18//! * [TotalIncremental](crate::monitor::TotalIncremental): For each processed event a complete list of monitor state changes is provided
19//! * [TriggerMessages](crate::monitor::TriggerMessages): For each event a list of violated triggers with their description is produced.
20
21use std::any::Any;
22use std::cell::RefCell;
23use std::collections::HashMap;
24use std::error::Error;
25use std::fmt::{Debug, Display, Formatter};
26use std::ops::Not;
27use std::rc::Rc;
28use std::thread;
29use std::thread::JoinHandle;
30use std::time::{Duration, SystemTime};
31
32use crossbeam_channel::{bounded, unbounded, Sender};
33pub use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, TryRecvError};
34use rtlola_frontend::mir::{InputReference, OutputReference, RtLolaMir, Type};
35#[cfg(feature = "serde")]
36use serde::Serialize;
37
38use crate::config::{Config, ExecutionMode, OfflineMode, OnlineMode};
39use crate::configuration::time::{OutputTimeRepresentation, RelativeFloat, TimeRepresentation};
40use crate::evaluator::{Evaluator, EvaluatorData};
41use crate::input::EventFactory;
42use crate::monitor::{Incremental, RawVerdict, Tracer, VerdictRepresentation, Verdicts};
43use crate::schedule::schedule_manager::ScheduleManager;
44use crate::schedule::DynamicSchedule;
45use crate::time::RealTime;
46use crate::Monitor;
47
48/// Represents the kind of the verdict. I.e. whether the evaluation was triggered by an event, or by a deadline.
49#[cfg_attr(feature = "serde", derive(Serialize))]
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub enum VerdictKind {
52    /// The verdict resulted from a deadline evaluation.
53    Timed,
54    /// The verdict resulted from the evaluation of an event.
55    Event,
56}
57
58/// Represents the length of a queue used for communication.
59/// Bounding its length can be useful in resource constraint environments.
60#[derive(Debug, Clone, Copy)]
61pub enum QueueLength {
62    /// There is no bound on the queue.
63    Unbounded,
64    /// The queue is bounded to keep at most this many elements.
65    Bounded(usize),
66}
67
68impl QueueLength {
69    fn to_queue<T>(self) -> (Sender<T>, Receiver<T>) {
70        match self {
71            QueueLength::Unbounded => unbounded(),
72            QueueLength::Bounded(cap) => bounded(cap),
73        }
74    }
75}
76
77/// Represents an error emitted by the API.
78#[derive(Debug)]
79pub enum QueueError {
80    /// A problem with the event source occurred further described by the inner error.
81    SourceError(Box<dyn Error + Send>),
82    /// A problem with the worker thread occurred.
83    ThreadPanic(String),
84    /// An event could not be sent.
85    ThreadSendError(Box<dyn Any + Send>),
86    /// Multiple start commands were send to the api.
87    MultipleStart,
88    /// An event was received before the monitor was started.
89    EventBeforeStart,
90}
91
92impl Display for QueueError {
93    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
94        match self {
95            QueueError::SourceError(e) => write!(f, "Event Source error: {}", e),
96            QueueError::ThreadPanic(reason) => write!(f, "Worker thread hung up: {}", reason),
97            QueueError::ThreadSendError(msg) => write!(f, "Failed to send message: {:?}", msg),
98            QueueError::MultipleStart => write!(f, "Multiple start commands sent"),
99            QueueError::EventBeforeStart => {
100                write!(f, "Received an event before a start was called")
101            }
102        }
103    }
104}
105
106impl Error for QueueError {
107    fn source(&self) -> Option<&(dyn Error + 'static)> {
108        match self {
109            QueueError::SourceError(e) => Some(e.as_ref()),
110            QueueError::ThreadPanic(_) => None,
111            QueueError::ThreadSendError(_) => None,
112            QueueError::MultipleStart => None,
113            QueueError::EventBeforeStart => None,
114        }
115    }
116}
117
118/// The verdict of the queued monitor. It is either triggered by a deadline or an event described by the `kind` field.
119/// The time when the verdict occurred is given by `ts`. `verdict` finally describes the changes to input and output streams
120/// as defined by the [VerdictRepresentation].
121#[cfg_attr(feature = "serde", derive(Serialize))]
122#[derive(Debug, Clone)]
123pub struct QueuedVerdict<Verdict: VerdictRepresentation, VerdictTime: OutputTimeRepresentation> {
124    /// The kind of the verdict. I.e. what triggered the evaluation it resulted from.
125    pub kind: VerdictKind,
126    /// The time when the verdict occurred.
127    pub ts: VerdictTime::InnerTime,
128    /// The changes of input and output streams as defined by the [VerdictRepresentation]
129    pub verdict: Verdict,
130}
131
132/**
133The QueuedMonitor is a threaded version of the Api allowing deadlines to be evaluated immediately.
134
135The [QueuedMonitor] accepts new events and computes streams.
136It can compute streams based on new events through [accept_event](QueuedMonitor::accept_event) once the [start](QueuedMonitor::start) function was invoked.
137Timed streams are evaluated automatically at their deadline. The resulting verdicts of events and deadlines are returned through a [Receiver] which can be obtained through [output_queue](QueuedMonitor::output_queue).
138Note that the [start](QueuedMonitor::start) function *has* to be invoked before any event can be evaluated.
139Finally, a calling [end](QueuedMonitor::end) will block until all events have been evaluated.
140
141The generic argument `Source` implements the [EventFactory] trait describing the input source of the API.
142The generic argument `SourceTime` implements the [TimeRepresentation] trait defining the input time format.
143The generic argument `Verdict` implements the [VerdictRepresentation] trait describing the output format of the API that is by default [Incremental].
144The generic argument `VerdictTime` implements the [TimeRepresentation] trait defining the output time format. It defaults to [RelativeFloat]
145 */
146#[allow(missing_debug_implementations)]
147pub struct QueuedMonitor<Source, Mode, Verdict = Incremental, VerdictTime = RelativeFloat>
148where
149    Source: EventFactory,
150    Mode: ExecutionMode,
151    Verdict: VerdictRepresentation,
152    VerdictTime: OutputTimeRepresentation + 'static,
153{
154    ir: RtLolaMir,
155    worker: Option<JoinHandle<Result<(), QueueError>>>,
156
157    input: Sender<WorkItem<Source, Mode::SourceTime>>,
158    output: Receiver<QueuedVerdict<Verdict, VerdictTime>>,
159}
160
161impl<Source, Mode, Verdict, VerdictTime> QueuedMonitor<Source, Mode, Verdict, VerdictTime>
162where
163    Source: EventFactory + 'static,
164    Mode: ExecutionMode,
165    Verdict: VerdictRepresentation,
166    VerdictTime: OutputTimeRepresentation,
167{
168    fn runner<W: Worker<Source, Mode, Verdict, VerdictTime>>(
169        config: Config<Mode, VerdictTime>,
170        input_names: HashMap<String, InputReference>,
171        setup_data: Source::CreationData,
172        input: Receiver<WorkItem<Source, Mode::SourceTime>>,
173        output: Sender<QueuedVerdict<Verdict, VerdictTime>>,
174    ) -> Result<(), QueueError> {
175        let mut worker = W::setup(config, input_names, setup_data, input.clone(), output)?;
176        worker.wait_for_start(&input)?;
177        drop(input);
178        worker.init()?;
179        worker.process()?;
180        Ok(())
181    }
182
183    fn worker_alive(&mut self) -> Result<(), QueueError> {
184        if self.worker.is_some() {
185            if self.worker.as_ref().unwrap().is_finished() {
186                let worker = self.worker.take().unwrap();
187                worker
188                    .join()
189                    .map_err(|e| QueueError::ThreadPanic(format!("{:?}", e)))?
190            } else {
191                Ok(())
192            }
193        } else {
194            Err(QueueError::ThreadPanic("Worker thread died.".to_string()))
195        }
196    }
197
198    /// Starts the evaluation process. This method has to be called before any event is accepted.
199    pub fn start(&mut self) -> Result<(), QueueError> {
200        self.worker_alive()?;
201        self.input
202            .send(WorkItem::Start)
203            .map_err(|msg| QueueError::ThreadSendError(Box::new(msg.0)))
204    }
205
206    /// This method returns the queue through which the verdicts can be received.
207    pub fn output_queue(&self) -> Receiver<QueuedVerdict<Verdict, VerdictTime>> {
208        self.output.clone()
209    }
210
211    /**
212    Schedules a new event for evaluation. The verdict can be received through the Queue return by the [QueuedMonitor::output_queue].
213    */
214    pub fn accept_event(
215        &mut self,
216        ev: Source::Record,
217        ts: <Mode::SourceTime as TimeRepresentation>::InnerTime,
218    ) -> Result<(), QueueError> {
219        self.worker_alive()?;
220        self.input
221            .send(WorkItem::Event(ev, ts))
222            .map_err(|msg| QueueError::ThreadSendError(Box::new(msg.0)))
223    }
224
225    /// Ends the evaluation process and blocks until all events are processed.
226    pub fn end(self) -> Result<(), QueueError> {
227        let QueuedMonitor { worker, input, .. } = self;
228        // Drop the sender of the input queue
229        drop(input);
230        // wait for worker to finish processing all events left in input queue
231        if let Some(worker) = worker {
232            worker
233                .join()
234                .map_err(|e| QueueError::ThreadPanic(format!("{:?}", e)))?
235        } else {
236            Ok(())
237        }
238    }
239
240    /// Returns the underlying representation of the specification as an [RtLolaMir]
241    pub fn ir(&self) -> &RtLolaMir {
242        &self.ir
243    }
244
245    /**
246    Get the name of an input stream based on its [InputReference].
247
248    The reference is valid for the lifetime of the monitor.
249    */
250    pub fn name_for_input(&self, id: InputReference) -> &str {
251        self.ir.inputs[id].name.as_str()
252    }
253
254    /**
255    Get the name of an output stream based on its [OutputReference].
256
257    The reference is valid for the lifetime of the monitor.
258    */
259    pub fn name_for_output(&self, id: OutputReference) -> &str {
260        self.ir.outputs[id].name.as_str()
261    }
262
263    /**
264    Get the [OutputReference] of a trigger based on its index.
265    */
266    pub fn trigger_stream_index(&self, id: usize) -> usize {
267        self.ir.triggers[id].output_reference.out_ix()
268    }
269
270    /**
271    Get the number of input streams.
272    */
273    pub fn number_of_input_streams(&self) -> usize {
274        self.ir.inputs.len()
275    }
276
277    /**
278    Get the number of output streams (this includes one output stream for each trigger).
279    */
280    pub fn number_of_output_streams(&self) -> usize {
281        self.ir.outputs.len()
282    }
283
284    /**
285    Get the number of triggers.
286    */
287    pub fn number_of_triggers(&self) -> usize {
288        self.ir.triggers.len()
289    }
290
291    /**
292    Get the type of an input stream based on its [InputReference].
293
294    The reference is valid for the lifetime of the monitor.
295    */
296    pub fn type_of_input(&self, id: InputReference) -> &Type {
297        &self.ir.inputs[id].ty
298    }
299
300    /**
301    Get the type of an output stream based on its [OutputReference].
302
303    The reference is valid for the lifetime of the monitor.
304    */
305    pub fn type_of_output(&self, id: OutputReference) -> &Type {
306        &self.ir.outputs[id].ty
307    }
308
309    /**
310    Get the extend rate of an output stream based on its [OutputReference].
311
312    The reference is valid for the lifetime of the monitor.
313    */
314    pub fn extend_rate_of_output(&self, id: OutputReference) -> Option<Duration> {
315        self.ir
316            .time_driven
317            .iter()
318            .find(|time_driven_stream| time_driven_stream.reference.out_ix() == id)
319            .map(|time_driven_stream| time_driven_stream.period_in_duration())
320    }
321}
322
323impl<Source, SourceTime, Verdict, VerdictTime>
324    QueuedMonitor<Source, OfflineMode<SourceTime>, Verdict, VerdictTime>
325where
326    Source: EventFactory + 'static,
327    SourceTime: TimeRepresentation,
328    Verdict: VerdictRepresentation,
329    VerdictTime: OutputTimeRepresentation,
330{
331    /// setup the api, while providing bounds for the queues.
332    pub fn bounded_setup(
333        config: Config<OfflineMode<SourceTime>, VerdictTime>,
334        setup_data: Source::CreationData,
335        input_queue_bound: QueueLength,
336        output_queue_bound: QueueLength,
337    ) -> QueuedMonitor<Source, OfflineMode<SourceTime>, Verdict, VerdictTime> {
338        let config_clone = config.clone();
339
340        let input_map = config
341            .ir
342            .inputs
343            .iter()
344            .map(|i| (i.name.clone(), i.reference.in_ix()))
345            .collect();
346
347        let (input_send, input_rcv) = input_queue_bound.to_queue();
348        let (output_send, output_rcv) = output_queue_bound.to_queue();
349
350        let worker = thread::spawn(move || {
351            Self::runner::<OfflineWorker<Source, SourceTime, Verdict, VerdictTime>>(
352                config_clone,
353                input_map,
354                setup_data,
355                input_rcv,
356                output_send,
357            )
358        });
359
360        QueuedMonitor {
361            ir: config.ir,
362            worker: Some(worker),
363
364            input: input_send,
365            output: output_rcv,
366        }
367    }
368
369    /// setup the api with unbounded queues
370    pub fn setup(
371        config: Config<OfflineMode<SourceTime>, VerdictTime>,
372        setup_data: Source::CreationData,
373    ) -> QueuedMonitor<Source, OfflineMode<SourceTime>, Verdict, VerdictTime> {
374        Self::bounded_setup(
375            config,
376            setup_data,
377            QueueLength::Unbounded,
378            QueueLength::Unbounded,
379        )
380    }
381}
382
383impl<Source, Verdict, VerdictTime> QueuedMonitor<Source, OnlineMode, Verdict, VerdictTime>
384where
385    Source: EventFactory + 'static,
386    Verdict: VerdictRepresentation,
387    VerdictTime: OutputTimeRepresentation,
388{
389    /// setup the api, while providing bounds for the queues.
390    pub fn bounded_setup(
391        config: Config<OnlineMode, VerdictTime>,
392        setup_data: Source::CreationData,
393        input_queue_bound: QueueLength,
394        output_queue_bound: QueueLength,
395    ) -> QueuedMonitor<Source, OnlineMode, Verdict, VerdictTime> {
396        let config_clone = config.clone();
397
398        let input_map = config
399            .ir
400            .inputs
401            .iter()
402            .map(|i| (i.name.clone(), i.reference.in_ix()))
403            .collect();
404
405        let (input_send, input_rcv) = input_queue_bound.to_queue();
406        let (output_send, output_rcv) = output_queue_bound.to_queue();
407
408        let worker = thread::spawn(move || {
409            Self::runner::<OnlineWorker<Source, Verdict, VerdictTime>>(
410                config_clone,
411                input_map,
412                setup_data,
413                input_rcv,
414                output_send,
415            )
416        });
417
418        QueuedMonitor {
419            ir: config.ir,
420            worker: Some(worker),
421
422            input: input_send,
423            output: output_rcv,
424        }
425    }
426
427    /// setup the api with unbounded queues
428    pub fn setup(
429        config: Config<OnlineMode, VerdictTime>,
430        setup_data: Source::CreationData,
431    ) -> QueuedMonitor<Source, OnlineMode, Verdict, VerdictTime> {
432        Self::bounded_setup(
433            config,
434            setup_data,
435            QueueLength::Unbounded,
436            QueueLength::Unbounded,
437        )
438    }
439}
440
441enum WorkItem<Source: EventFactory, SourceTime: TimeRepresentation> {
442    Start,
443    Event(Source::Record, SourceTime::InnerTime),
444}
445
446trait Worker<Source, Mode, Verdict, VerdictTime>: Sized
447where
448    Source: EventFactory,
449    Mode: ExecutionMode,
450    Verdict: VerdictRepresentation,
451    VerdictTime: OutputTimeRepresentation + 'static,
452{
453    fn setup(
454        config: Config<Mode, VerdictTime>,
455        input_names: HashMap<String, InputReference>,
456        setup_data: Source::CreationData,
457        input: Receiver<WorkItem<Source, Mode::SourceTime>>,
458        output: Sender<QueuedVerdict<Verdict, VerdictTime>>,
459    ) -> Result<Self, QueueError>;
460
461    fn wait_for_start(
462        &mut self,
463        input: &Receiver<WorkItem<Source, Mode::SourceTime>>,
464    ) -> Result<(), QueueError> {
465        // Wait for Start command
466        match input.recv() {
467            Ok(WorkItem::Start) => Ok(()),
468            Ok(WorkItem::Event(_, _)) => Err(QueueError::EventBeforeStart),
469            Err(_) => Ok(()),
470        }
471    }
472
473    fn init(&mut self) -> Result<(), QueueError>;
474
475    fn process(&mut self) -> Result<(), QueueError>;
476
477    fn try_send(
478        output: &Sender<QueuedVerdict<Verdict, VerdictTime>>,
479        verdict: Option<QueuedVerdict<Verdict, VerdictTime>>,
480    ) -> Result<(), QueueError> {
481        if let Some(verdict) = verdict {
482            output
483                .send(verdict)
484                .map_err(|e| QueueError::ThreadSendError(Box::new(e.0)))
485        } else {
486            Ok(())
487        }
488    }
489}
490
491struct OnlineWorker<Source, Verdict, VerdictTime>
492where
493    Source: EventFactory,
494    Verdict: VerdictRepresentation,
495    VerdictTime: OutputTimeRepresentation + 'static,
496{
497    source: Source,
498    source_time: RealTime,
499    output_time: Option<VerdictTime>,
500    start_time: Option<SystemTime>,
501
502    schedule_manager: ScheduleManager,
503    evaluator: Evaluator,
504    input: Receiver<WorkItem<Source, RealTime>>,
505    output: Sender<QueuedVerdict<Verdict, VerdictTime>>,
506}
507
508impl<
509        Source: EventFactory,
510        Verdict: VerdictRepresentation,
511        VerdictTime: OutputTimeRepresentation,
512    > Worker<Source, OnlineMode, Verdict, VerdictTime>
513    for OnlineWorker<Source, Verdict, VerdictTime>
514{
515    fn setup(
516        config: Config<OnlineMode, VerdictTime>,
517        input_names: HashMap<String, InputReference>,
518        setup_data: Source::CreationData,
519        input: Receiver<WorkItem<Source, RealTime>>,
520        output: Sender<QueuedVerdict<Verdict, VerdictTime>>,
521    ) -> Result<Self, QueueError> {
522        // setup monitor
523        let source_time = config.mode.time_representation().clone();
524        let source = Source::new(input_names, setup_data)
525            .map_err(|e| QueueError::SourceError(Box::new(e)))?;
526
527        // Setup evaluator
528        let dyn_schedule = Rc::new(RefCell::new(DynamicSchedule::new()));
529        let eval_data = EvaluatorData::new(config.ir.clone(), dyn_schedule.clone());
530        let schedule_manager = ScheduleManager::setup(config.ir.clone(), dyn_schedule)
531            .expect("Error computing schedule for time-driven streams");
532        let evaluator = eval_data.into_evaluator();
533
534        Ok(OnlineWorker {
535            source,
536            source_time,
537            output_time: None,
538            start_time: config.start_time,
539            schedule_manager,
540            evaluator,
541            input,
542            output,
543        })
544    }
545
546    fn init(&mut self) -> Result<(), QueueError> {
547        let st = self.source_time.init_start_time(self.start_time);
548        let mut ot = VerdictTime::default();
549        ot.set_start_time(st);
550        self.output_time.replace(ot);
551        Ok(())
552    }
553
554    fn process(&mut self) -> Result<(), QueueError> {
555        let output_time = self
556            .output_time
557            .as_mut()
558            .expect("Init to be executed before process");
559        loop {
560            let next_deadline = self.schedule_manager.get_next_due();
561            let item = if let Some(due) = next_deadline {
562                // Deadlines always in the future, if not, i.e. the max evaluates to 0 we should output a warning as the monitor is falling behind its schedule...
563                let now = self.source_time.convert_from(());
564                let wait_time = if due <= now {
565                    // eprintln!("Monitor is falling behind schedule by: {}s", (now - due).as_secs_f64());
566                    Duration::ZERO
567                } else {
568                    due - now
569                };
570                self.input.recv_timeout(wait_time)
571            } else {
572                self.input
573                    .recv()
574                    .map_err(|_| RecvTimeoutError::Disconnected)
575            };
576            let verdict = match item {
577                Ok(WorkItem::Event(e, ts)) => {
578                    // Received Event before deadline
579                    let mut tracer = Verdict::Tracing::default();
580                    tracer.parse_start();
581                    let e = self
582                        .source
583                        .get_event(e)
584                        .map_err(|e| QueueError::SourceError(Box::new(e)))?;
585                    tracer.parse_end();
586                    let ts = self.source_time.convert_from(ts);
587
588                    tracer.eval_start();
589                    self.evaluator.eval_event(&e, ts, &mut tracer);
590                    tracer.eval_end();
591
592                    let verdict =
593                        Verdict::create_with_trace(RawVerdict::from(&self.evaluator), tracer);
594                    verdict.is_empty().not().then_some(QueuedVerdict {
595                        kind: VerdictKind::Event,
596                        ts: output_time.convert_into(ts),
597                        verdict,
598                    })
599                }
600                Err(RecvTimeoutError::Timeout) => {
601                    // Deadline occurred before event
602                    let mut tracer = Verdict::Tracing::default();
603                    tracer.eval_start();
604                    let due = next_deadline.expect("timeout to only happen for a deadline.");
605                    //println!("Deadline at: {:?} evaluated at: {:?}", self.current_time, self.source_time.convert_from(()));
606
607                    let deadline = self.schedule_manager.get_next_deadline(due);
608                    self.evaluator
609                        .eval_time_driven_tasks(deadline, due, &mut tracer);
610                    tracer.eval_end();
611
612                    let verdict =
613                        Verdict::create_with_trace(RawVerdict::from(&self.evaluator), tracer);
614                    verdict.is_empty().not().then_some(QueuedVerdict {
615                        kind: VerdictKind::Timed,
616                        ts: output_time.convert_into(due),
617                        verdict,
618                    })
619                }
620                Err(RecvTimeoutError::Disconnected) => {
621                    // Channel closed, we are done here
622                    return Ok(());
623                }
624                Ok(WorkItem::Start) => {
625                    // Received second start command -> abort
626                    return Err(QueueError::MultipleStart);
627                }
628            };
629
630            Self::try_send(&self.output, verdict)?;
631        }
632    }
633}
634
635struct OfflineWorker<Source, SourceTime, Verdict, VerdictTime>
636where
637    Source: EventFactory,
638    SourceTime: TimeRepresentation,
639    Verdict: VerdictRepresentation,
640    VerdictTime: OutputTimeRepresentation + 'static,
641{
642    config: Config<OfflineMode<SourceTime>, VerdictTime>,
643    setup_data: Source::CreationData,
644
645    monitor: Option<Monitor<Source, OfflineMode<SourceTime>, Verdict, VerdictTime>>,
646    input: Receiver<WorkItem<Source, SourceTime>>,
647    output: Sender<QueuedVerdict<Verdict, VerdictTime>>,
648}
649
650impl<
651        Source: EventFactory,
652        SourceTime: TimeRepresentation,
653        Verdict: VerdictRepresentation,
654        VerdictTime: OutputTimeRepresentation,
655    > Worker<Source, OfflineMode<SourceTime>, Verdict, VerdictTime>
656    for OfflineWorker<Source, SourceTime, Verdict, VerdictTime>
657{
658    fn setup(
659        config: Config<OfflineMode<SourceTime>, VerdictTime>,
660        _input_names: HashMap<String, InputReference>,
661        setup_data: Source::CreationData,
662        input: Receiver<WorkItem<Source, SourceTime>>,
663        output: Sender<QueuedVerdict<Verdict, VerdictTime>>,
664    ) -> Result<Self, QueueError> {
665        Ok(OfflineWorker {
666            config,
667            setup_data,
668            monitor: None,
669            input,
670            output,
671        })
672    }
673
674    fn init(&mut self) -> Result<(), QueueError> {
675        // Setup evaluator
676        let monitor: Monitor<Source, OfflineMode<SourceTime>, Verdict, VerdictTime> =
677            Monitor::setup(self.config.clone(), self.setup_data.clone())
678                .map_err(|e| QueueError::SourceError(Box::new(e)))?;
679        self.monitor.replace(monitor);
680        Ok(())
681    }
682
683    fn process(&mut self) -> Result<(), QueueError> {
684        let monitor = self
685            .monitor
686            .as_mut()
687            .expect("Init to be called before process");
688        let mut last_event = None;
689        let mut done = false;
690        while !done {
691            match self.input.recv() {
692                Ok(WorkItem::Event(e, ts)) => {
693                    // Received Event
694                    last_event.replace(ts.clone());
695                    let Verdicts { timed, event, ts } = monitor
696                        .accept_event(e, ts)
697                        .map_err(|e| QueueError::SourceError(Box::new(e)))?;
698
699                    for (ts, v) in timed {
700                        let verdict = QueuedVerdict {
701                            kind: VerdictKind::Timed,
702                            ts,
703                            verdict: v,
704                        };
705                        Self::try_send(&self.output, Some(verdict))?;
706                    }
707
708                    if !event.is_empty() {
709                        let verdict = QueuedVerdict {
710                            kind: VerdictKind::Event,
711                            ts: ts.clone(),
712                            verdict: event,
713                        };
714                        Self::try_send(&self.output, Some(verdict))?;
715                    }
716                }
717                Err(_) => {
718                    // Channel closed, we are done here
719                    done = true;
720                    if let Some(last_event) = last_event.as_ref() {
721                        let timed = monitor.accept_time(last_event.clone());
722                        for (ts, v) in timed {
723                            let verdict = QueuedVerdict {
724                                kind: VerdictKind::Timed,
725                                ts,
726                                verdict: v,
727                            };
728                            Self::try_send(&self.output, Some(verdict))?;
729                        }
730                    } else {
731                        return Ok(());
732                    }
733                }
734                Ok(WorkItem::Start) => {
735                    // Received second start command -> abort
736                    return Err(QueueError::MultipleStart);
737                }
738            }
739        }
740        Ok(())
741    }
742}
743
744#[cfg(test)]
745#[cfg(not(feature = "serde"))]
746mod tests {
747    use std::convert::Infallible;
748    use std::thread::sleep;
749    use std::time::{Duration, Instant};
750
751    use crate::api::monitor::Change;
752    use crate::config::OfflineMode;
753    use crate::input::ArrayFactory;
754    use crate::monitor::{Incremental, Total, TotalIncremental, VerdictRepresentation};
755    use crate::queued::{QueuedVerdict, VerdictKind};
756    use crate::time::RelativeFloat;
757    use crate::{ConfigBuilder, QueuedMonitor, Value};
758
759    fn setup<const N: usize, V: VerdictRepresentation>(
760        spec: &str,
761    ) -> (
762        Instant,
763        QueuedMonitor<
764            ArrayFactory<N, Infallible, [Value; N]>,
765            OfflineMode<RelativeFloat>,
766            V,
767            RelativeFloat,
768        >,
769    ) {
770        // Init Monitor API
771        let monitor = ConfigBuilder::new()
772            .spec_str(spec)
773            .offline::<RelativeFloat>()
774            .with_array_events::<N, Infallible, [Value; N]>()
775            .with_verdict::<V>()
776            .queued_monitor();
777        (Instant::now(), monitor)
778    }
779
780    fn sort_total(res: Total) -> Total {
781        let Total {
782            inputs,
783            mut outputs,
784        } = res;
785        outputs.iter_mut().for_each(|s| s.sort());
786        Total { inputs, outputs }
787    }
788
789    fn sort_incremental(mut res: Incremental) -> Incremental {
790        res.iter_mut().for_each(|(_, changes)| changes.sort());
791        res
792    }
793
794    #[test]
795    fn test_const_output_literals() {
796        let (start, mut monitor) = setup::<1, Total>(
797            r#"
798        input i_0: UInt8
799
800        output o_0: Bool @i_0 := true
801        output o_1: UInt8 @i_0 := 3
802        output o_2: Int8 @i_0 := -5
803        output o_3: Float32 @i_0 := -123.456
804        output o_4: String @i_0 := "foobar"
805        "#,
806        );
807        let queue = monitor.output_queue();
808        monitor.start().expect("Failed to start monitor");
809        let v = Value::Unsigned(3);
810        let timeout = Duration::from_millis(500);
811
812        monitor
813            .accept_event([v.clone()], start.elapsed())
814            .expect("Failed to accept event");
815        let res = queue.recv_timeout(timeout).unwrap();
816
817        assert!(res.kind == VerdictKind::Event);
818        let res = res.verdict;
819        assert_eq!(res.inputs[0], Some(v));
820        assert_eq!(res.outputs[0][0], (None, Some(Value::Bool(true))));
821        assert_eq!(res.outputs[1][0], (None, Some(Value::Unsigned(3))));
822        assert_eq!(res.outputs[2][0], (None, Some(Value::Signed(-5))));
823        assert_eq!(
824            res.outputs[3][0],
825            (None, Some(Value::try_from(-123.456).unwrap()))
826        );
827        assert_eq!(res.outputs[4][0], (None, Some(Value::Str("foobar".into()))));
828    }
829
830    #[test]
831    fn test_count_window() {
832        let (_, mut monitor) = setup::<1, Incremental>(
833            "input a: UInt16\noutput b: UInt16 @0.25Hz := a.aggregate(over: 40s, using: count)",
834        );
835
836        let timeout = Duration::from_millis(500);
837        let output = monitor.output_queue();
838        monitor.start().expect("Failed to start monitor");
839        let n = 25;
840        let mut time = Duration::from_secs(45);
841        monitor
842            .accept_event([Value::Unsigned(1)], time)
843            .expect("Failed to accept event");
844
845        let res: Vec<_> = (0..11)
846            .map(|_| output.recv_timeout(timeout).unwrap())
847            .collect();
848        assert!(output.is_empty());
849
850        assert!(res.iter().all(|v| v.kind == VerdictKind::Timed));
851        assert!(res.iter().all(|QueuedVerdict { ts, verdict, .. }| {
852            ts.as_secs() % 4 == 0
853                && verdict[0].0 == 0
854                && verdict[0].1[0] == Change::Value(None, Value::Unsigned(0))
855        }));
856        for v in 2..=n {
857            time += Duration::from_secs(1);
858            monitor
859                .accept_event([Value::Unsigned(v)], time)
860                .expect("Failed to accept event");
861            if (v - 1) % 4 == 0 {
862                let res = output.recv_timeout(timeout).unwrap();
863                assert_eq!(res.kind, VerdictKind::Timed);
864                assert_eq!(
865                    res.verdict[0].1[0],
866                    Change::Value(None, Value::Unsigned(v - 1))
867                );
868            } else {
869                assert!(output.is_empty());
870            }
871        }
872    }
873
874    #[test]
875    fn test_spawn_eventbased() {
876        let (_, mut monitor) = setup::<2, Total>(
877            "input a: Int32\n\
878                  input b: Int32\n\
879                  output c(x: Int32) spawn with a eval with x + a\n\
880                  output d := b",
881        );
882
883        let timeout = Duration::from_millis(500);
884        let output = monitor.output_queue();
885        monitor.start().expect("Failed to start monitor");
886        monitor
887            .accept_event([Value::Signed(15), Value::None], Duration::from_secs(1))
888            .expect("Failed to accept event");
889        let res = output.recv_timeout(timeout).unwrap();
890
891        let expected = Total {
892            inputs: vec![Some(Value::Signed(15)), None],
893            outputs: vec![
894                vec![(Some(vec![Value::Signed(15)]), Some(Value::Signed(30)))],
895                vec![(None, None)],
896            ],
897        };
898        assert_eq!(res.kind, VerdictKind::Event);
899        assert_eq!(sort_total(res.verdict), sort_total(expected));
900
901        monitor
902            .accept_event(
903                [Value::Signed(20), Value::Signed(7)],
904                Duration::from_secs(2),
905            )
906            .expect("Failed to accept event");
907        let res = output.recv_timeout(timeout).unwrap();
908
909        let expected = Total {
910            inputs: vec![Some(Value::Signed(20)), Some(Value::Signed(7))],
911            outputs: vec![
912                vec![
913                    (Some(vec![Value::Signed(15)]), Some(Value::Signed(35))),
914                    (Some(vec![Value::Signed(20)]), Some(Value::Signed(40))),
915                ],
916                vec![(None, Some(Value::Signed(7)))],
917            ],
918        };
919        assert_eq!(res.kind, VerdictKind::Event);
920        assert_eq!(sort_total(res.verdict), sort_total(expected));
921
922        monitor
923            .accept_event([Value::None, Value::Signed(42)], Duration::from_secs(3))
924            .expect("Failed to accept event");
925        let res = output.recv_timeout(timeout).unwrap();
926
927        let expected = Total {
928            inputs: vec![Some(Value::Signed(20)), Some(Value::Signed(42))],
929            outputs: vec![
930                vec![
931                    (Some(vec![Value::Signed(15)]), Some(Value::Signed(35))),
932                    (Some(vec![Value::Signed(20)]), Some(Value::Signed(40))),
933                ],
934                vec![(None, Some(Value::Signed(42)))],
935            ],
936        };
937        assert_eq!(res.kind, VerdictKind::Event);
938        assert_eq!(sort_total(res.verdict), sort_total(expected));
939    }
940
941    #[test]
942    fn test_eval_close() {
943        let (_, mut monitor) = setup::<1, Incremental>(
944            "input a: Int32\n\
945                  output c(x: Int32)\n\
946                    spawn with a \n\
947                    close @a when true\n\
948                    eval with x + a",
949        );
950
951        let timeout = Duration::from_millis(500);
952        let output = monitor.output_queue();
953        monitor.start().expect("Failed to start monitor");
954        monitor
955            .accept_event([Value::Signed(15)], Duration::from_secs(1))
956            .expect("Failed to accept event");
957        let res = output.recv_timeout(timeout).unwrap();
958
959        let mut expected = vec![
960            Change::Spawn(vec![Value::Signed(15)]),
961            Change::Value(Some(vec![Value::Signed(15)]), Value::Signed(30)),
962            Change::Close(vec![Value::Signed(15)]),
963        ];
964        expected.sort();
965        assert_eq!(res.kind, VerdictKind::Event);
966        assert_eq!(res.verdict[0].0, 0);
967
968        assert_eq!(sort_incremental(res.verdict)[0].1, expected);
969    }
970
971    #[test]
972    fn test_online_time() {
973        let spec = "\
974            input i: UInt64\n\
975            output o @10Hz := true\
976        ";
977        let mut monitor = ConfigBuilder::new()
978            .spec_str(spec)
979            .online()
980            .with_array_events::<1, Infallible, [Value; 1]>()
981            .with_verdict::<TotalIncremental>()
982            .queued_monitor();
983        let output = monitor.output_queue();
984        monitor.start().expect("Failed to start monitor");
985        sleep(Duration::from_millis(1090));
986        monitor.end().unwrap();
987        assert_eq!(output.len(), 10);
988    }
989}