1use std::any::Any;
22use std::cell::RefCell;
23use std::collections::HashMap;
24use std::error::Error;
25use std::fmt::{Debug, Display, Formatter};
26use std::ops::Not;
27use std::rc::Rc;
28use std::thread;
29use std::thread::JoinHandle;
30use std::time::{Duration, SystemTime};
31
32use crossbeam_channel::{bounded, unbounded, Sender};
33pub use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, TryRecvError};
34use rtlola_frontend::mir::{InputReference, OutputReference, RtLolaMir, Type};
35#[cfg(feature = "serde")]
36use serde::Serialize;
37
38use crate::config::{Config, ExecutionMode, OfflineMode, OnlineMode};
39use crate::configuration::time::{OutputTimeRepresentation, RelativeFloat, TimeRepresentation};
40use crate::evaluator::{Evaluator, EvaluatorData};
41use crate::input::EventFactory;
42use crate::monitor::{Incremental, RawVerdict, Tracer, VerdictRepresentation, Verdicts};
43use crate::schedule::schedule_manager::ScheduleManager;
44use crate::schedule::DynamicSchedule;
45use crate::time::RealTime;
46use crate::Monitor;
47
48#[cfg_attr(feature = "serde", derive(Serialize))]
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub enum VerdictKind {
52 Timed,
54 Event,
56}
57
58#[derive(Debug, Clone, Copy)]
61pub enum QueueLength {
62 Unbounded,
64 Bounded(usize),
66}
67
68impl QueueLength {
69 fn to_queue<T>(self) -> (Sender<T>, Receiver<T>) {
70 match self {
71 QueueLength::Unbounded => unbounded(),
72 QueueLength::Bounded(cap) => bounded(cap),
73 }
74 }
75}
76
77#[derive(Debug)]
79pub enum QueueError {
80 SourceError(Box<dyn Error + Send>),
82 ThreadPanic(String),
84 ThreadSendError(Box<dyn Any + Send>),
86 MultipleStart,
88 EventBeforeStart,
90}
91
92impl Display for QueueError {
93 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
94 match self {
95 QueueError::SourceError(e) => write!(f, "Event Source error: {}", e),
96 QueueError::ThreadPanic(reason) => write!(f, "Worker thread hung up: {}", reason),
97 QueueError::ThreadSendError(msg) => write!(f, "Failed to send message: {:?}", msg),
98 QueueError::MultipleStart => write!(f, "Multiple start commands sent"),
99 QueueError::EventBeforeStart => {
100 write!(f, "Received an event before a start was called")
101 }
102 }
103 }
104}
105
106impl Error for QueueError {
107 fn source(&self) -> Option<&(dyn Error + 'static)> {
108 match self {
109 QueueError::SourceError(e) => Some(e.as_ref()),
110 QueueError::ThreadPanic(_) => None,
111 QueueError::ThreadSendError(_) => None,
112 QueueError::MultipleStart => None,
113 QueueError::EventBeforeStart => None,
114 }
115 }
116}
117
118#[cfg_attr(feature = "serde", derive(Serialize))]
122#[derive(Debug, Clone)]
123pub struct QueuedVerdict<Verdict: VerdictRepresentation, VerdictTime: OutputTimeRepresentation> {
124 pub kind: VerdictKind,
126 pub ts: VerdictTime::InnerTime,
128 pub verdict: Verdict,
130}
131
132#[allow(missing_debug_implementations)]
147pub struct QueuedMonitor<Source, Mode, Verdict = Incremental, VerdictTime = RelativeFloat>
148where
149 Source: EventFactory,
150 Mode: ExecutionMode,
151 Verdict: VerdictRepresentation,
152 VerdictTime: OutputTimeRepresentation + 'static,
153{
154 ir: RtLolaMir,
155 worker: Option<JoinHandle<Result<(), QueueError>>>,
156
157 input: Sender<WorkItem<Source, Mode::SourceTime>>,
158 output: Receiver<QueuedVerdict<Verdict, VerdictTime>>,
159}
160
161impl<Source, Mode, Verdict, VerdictTime> QueuedMonitor<Source, Mode, Verdict, VerdictTime>
162where
163 Source: EventFactory + 'static,
164 Mode: ExecutionMode,
165 Verdict: VerdictRepresentation,
166 VerdictTime: OutputTimeRepresentation,
167{
168 fn runner<W: Worker<Source, Mode, Verdict, VerdictTime>>(
169 config: Config<Mode, VerdictTime>,
170 input_names: HashMap<String, InputReference>,
171 setup_data: Source::CreationData,
172 input: Receiver<WorkItem<Source, Mode::SourceTime>>,
173 output: Sender<QueuedVerdict<Verdict, VerdictTime>>,
174 ) -> Result<(), QueueError> {
175 let mut worker = W::setup(config, input_names, setup_data, input.clone(), output)?;
176 worker.wait_for_start(&input)?;
177 drop(input);
178 worker.init()?;
179 worker.process()?;
180 Ok(())
181 }
182
183 fn worker_alive(&mut self) -> Result<(), QueueError> {
184 if self.worker.is_some() {
185 if self.worker.as_ref().unwrap().is_finished() {
186 let worker = self.worker.take().unwrap();
187 worker
188 .join()
189 .map_err(|e| QueueError::ThreadPanic(format!("{:?}", e)))?
190 } else {
191 Ok(())
192 }
193 } else {
194 Err(QueueError::ThreadPanic("Worker thread died.".to_string()))
195 }
196 }
197
198 pub fn start(&mut self) -> Result<(), QueueError> {
200 self.worker_alive()?;
201 self.input
202 .send(WorkItem::Start)
203 .map_err(|msg| QueueError::ThreadSendError(Box::new(msg.0)))
204 }
205
206 pub fn output_queue(&self) -> Receiver<QueuedVerdict<Verdict, VerdictTime>> {
208 self.output.clone()
209 }
210
211 pub fn accept_event(
215 &mut self,
216 ev: Source::Record,
217 ts: <Mode::SourceTime as TimeRepresentation>::InnerTime,
218 ) -> Result<(), QueueError> {
219 self.worker_alive()?;
220 self.input
221 .send(WorkItem::Event(ev, ts))
222 .map_err(|msg| QueueError::ThreadSendError(Box::new(msg.0)))
223 }
224
225 pub fn end(self) -> Result<(), QueueError> {
227 let QueuedMonitor { worker, input, .. } = self;
228 drop(input);
230 if let Some(worker) = worker {
232 worker
233 .join()
234 .map_err(|e| QueueError::ThreadPanic(format!("{:?}", e)))?
235 } else {
236 Ok(())
237 }
238 }
239
240 pub fn ir(&self) -> &RtLolaMir {
242 &self.ir
243 }
244
245 pub fn name_for_input(&self, id: InputReference) -> &str {
251 self.ir.inputs[id].name.as_str()
252 }
253
254 pub fn name_for_output(&self, id: OutputReference) -> &str {
260 self.ir.outputs[id].name.as_str()
261 }
262
263 pub fn trigger_stream_index(&self, id: usize) -> usize {
267 self.ir.triggers[id].output_reference.out_ix()
268 }
269
270 pub fn number_of_input_streams(&self) -> usize {
274 self.ir.inputs.len()
275 }
276
277 pub fn number_of_output_streams(&self) -> usize {
281 self.ir.outputs.len()
282 }
283
284 pub fn number_of_triggers(&self) -> usize {
288 self.ir.triggers.len()
289 }
290
291 pub fn type_of_input(&self, id: InputReference) -> &Type {
297 &self.ir.inputs[id].ty
298 }
299
300 pub fn type_of_output(&self, id: OutputReference) -> &Type {
306 &self.ir.outputs[id].ty
307 }
308
309 pub fn extend_rate_of_output(&self, id: OutputReference) -> Option<Duration> {
315 self.ir
316 .time_driven
317 .iter()
318 .find(|time_driven_stream| time_driven_stream.reference.out_ix() == id)
319 .map(|time_driven_stream| time_driven_stream.period_in_duration())
320 }
321}
322
323impl<Source, SourceTime, Verdict, VerdictTime>
324 QueuedMonitor<Source, OfflineMode<SourceTime>, Verdict, VerdictTime>
325where
326 Source: EventFactory + 'static,
327 SourceTime: TimeRepresentation,
328 Verdict: VerdictRepresentation,
329 VerdictTime: OutputTimeRepresentation,
330{
331 pub fn bounded_setup(
333 config: Config<OfflineMode<SourceTime>, VerdictTime>,
334 setup_data: Source::CreationData,
335 input_queue_bound: QueueLength,
336 output_queue_bound: QueueLength,
337 ) -> QueuedMonitor<Source, OfflineMode<SourceTime>, Verdict, VerdictTime> {
338 let config_clone = config.clone();
339
340 let input_map = config
341 .ir
342 .inputs
343 .iter()
344 .map(|i| (i.name.clone(), i.reference.in_ix()))
345 .collect();
346
347 let (input_send, input_rcv) = input_queue_bound.to_queue();
348 let (output_send, output_rcv) = output_queue_bound.to_queue();
349
350 let worker = thread::spawn(move || {
351 Self::runner::<OfflineWorker<Source, SourceTime, Verdict, VerdictTime>>(
352 config_clone,
353 input_map,
354 setup_data,
355 input_rcv,
356 output_send,
357 )
358 });
359
360 QueuedMonitor {
361 ir: config.ir,
362 worker: Some(worker),
363
364 input: input_send,
365 output: output_rcv,
366 }
367 }
368
369 pub fn setup(
371 config: Config<OfflineMode<SourceTime>, VerdictTime>,
372 setup_data: Source::CreationData,
373 ) -> QueuedMonitor<Source, OfflineMode<SourceTime>, Verdict, VerdictTime> {
374 Self::bounded_setup(
375 config,
376 setup_data,
377 QueueLength::Unbounded,
378 QueueLength::Unbounded,
379 )
380 }
381}
382
383impl<Source, Verdict, VerdictTime> QueuedMonitor<Source, OnlineMode, Verdict, VerdictTime>
384where
385 Source: EventFactory + 'static,
386 Verdict: VerdictRepresentation,
387 VerdictTime: OutputTimeRepresentation,
388{
389 pub fn bounded_setup(
391 config: Config<OnlineMode, VerdictTime>,
392 setup_data: Source::CreationData,
393 input_queue_bound: QueueLength,
394 output_queue_bound: QueueLength,
395 ) -> QueuedMonitor<Source, OnlineMode, Verdict, VerdictTime> {
396 let config_clone = config.clone();
397
398 let input_map = config
399 .ir
400 .inputs
401 .iter()
402 .map(|i| (i.name.clone(), i.reference.in_ix()))
403 .collect();
404
405 let (input_send, input_rcv) = input_queue_bound.to_queue();
406 let (output_send, output_rcv) = output_queue_bound.to_queue();
407
408 let worker = thread::spawn(move || {
409 Self::runner::<OnlineWorker<Source, Verdict, VerdictTime>>(
410 config_clone,
411 input_map,
412 setup_data,
413 input_rcv,
414 output_send,
415 )
416 });
417
418 QueuedMonitor {
419 ir: config.ir,
420 worker: Some(worker),
421
422 input: input_send,
423 output: output_rcv,
424 }
425 }
426
427 pub fn setup(
429 config: Config<OnlineMode, VerdictTime>,
430 setup_data: Source::CreationData,
431 ) -> QueuedMonitor<Source, OnlineMode, Verdict, VerdictTime> {
432 Self::bounded_setup(
433 config,
434 setup_data,
435 QueueLength::Unbounded,
436 QueueLength::Unbounded,
437 )
438 }
439}
440
441enum WorkItem<Source: EventFactory, SourceTime: TimeRepresentation> {
442 Start,
443 Event(Source::Record, SourceTime::InnerTime),
444}
445
446trait Worker<Source, Mode, Verdict, VerdictTime>: Sized
447where
448 Source: EventFactory,
449 Mode: ExecutionMode,
450 Verdict: VerdictRepresentation,
451 VerdictTime: OutputTimeRepresentation + 'static,
452{
453 fn setup(
454 config: Config<Mode, VerdictTime>,
455 input_names: HashMap<String, InputReference>,
456 setup_data: Source::CreationData,
457 input: Receiver<WorkItem<Source, Mode::SourceTime>>,
458 output: Sender<QueuedVerdict<Verdict, VerdictTime>>,
459 ) -> Result<Self, QueueError>;
460
461 fn wait_for_start(
462 &mut self,
463 input: &Receiver<WorkItem<Source, Mode::SourceTime>>,
464 ) -> Result<(), QueueError> {
465 match input.recv() {
467 Ok(WorkItem::Start) => Ok(()),
468 Ok(WorkItem::Event(_, _)) => Err(QueueError::EventBeforeStart),
469 Err(_) => Ok(()),
470 }
471 }
472
473 fn init(&mut self) -> Result<(), QueueError>;
474
475 fn process(&mut self) -> Result<(), QueueError>;
476
477 fn try_send(
478 output: &Sender<QueuedVerdict<Verdict, VerdictTime>>,
479 verdict: Option<QueuedVerdict<Verdict, VerdictTime>>,
480 ) -> Result<(), QueueError> {
481 if let Some(verdict) = verdict {
482 output
483 .send(verdict)
484 .map_err(|e| QueueError::ThreadSendError(Box::new(e.0)))
485 } else {
486 Ok(())
487 }
488 }
489}
490
491struct OnlineWorker<Source, Verdict, VerdictTime>
492where
493 Source: EventFactory,
494 Verdict: VerdictRepresentation,
495 VerdictTime: OutputTimeRepresentation + 'static,
496{
497 source: Source,
498 source_time: RealTime,
499 output_time: Option<VerdictTime>,
500 start_time: Option<SystemTime>,
501
502 schedule_manager: ScheduleManager,
503 evaluator: Evaluator,
504 input: Receiver<WorkItem<Source, RealTime>>,
505 output: Sender<QueuedVerdict<Verdict, VerdictTime>>,
506}
507
508impl<
509 Source: EventFactory,
510 Verdict: VerdictRepresentation,
511 VerdictTime: OutputTimeRepresentation,
512 > Worker<Source, OnlineMode, Verdict, VerdictTime>
513 for OnlineWorker<Source, Verdict, VerdictTime>
514{
515 fn setup(
516 config: Config<OnlineMode, VerdictTime>,
517 input_names: HashMap<String, InputReference>,
518 setup_data: Source::CreationData,
519 input: Receiver<WorkItem<Source, RealTime>>,
520 output: Sender<QueuedVerdict<Verdict, VerdictTime>>,
521 ) -> Result<Self, QueueError> {
522 let source_time = config.mode.time_representation().clone();
524 let source = Source::new(input_names, setup_data)
525 .map_err(|e| QueueError::SourceError(Box::new(e)))?;
526
527 let dyn_schedule = Rc::new(RefCell::new(DynamicSchedule::new()));
529 let eval_data = EvaluatorData::new(config.ir.clone(), dyn_schedule.clone());
530 let schedule_manager = ScheduleManager::setup(config.ir.clone(), dyn_schedule)
531 .expect("Error computing schedule for time-driven streams");
532 let evaluator = eval_data.into_evaluator();
533
534 Ok(OnlineWorker {
535 source,
536 source_time,
537 output_time: None,
538 start_time: config.start_time,
539 schedule_manager,
540 evaluator,
541 input,
542 output,
543 })
544 }
545
546 fn init(&mut self) -> Result<(), QueueError> {
547 let st = self.source_time.init_start_time(self.start_time);
548 let mut ot = VerdictTime::default();
549 ot.set_start_time(st);
550 self.output_time.replace(ot);
551 Ok(())
552 }
553
554 fn process(&mut self) -> Result<(), QueueError> {
555 let output_time = self
556 .output_time
557 .as_mut()
558 .expect("Init to be executed before process");
559 loop {
560 let next_deadline = self.schedule_manager.get_next_due();
561 let item = if let Some(due) = next_deadline {
562 let now = self.source_time.convert_from(());
564 let wait_time = if due <= now {
565 Duration::ZERO
567 } else {
568 due - now
569 };
570 self.input.recv_timeout(wait_time)
571 } else {
572 self.input
573 .recv()
574 .map_err(|_| RecvTimeoutError::Disconnected)
575 };
576 let verdict = match item {
577 Ok(WorkItem::Event(e, ts)) => {
578 let mut tracer = Verdict::Tracing::default();
580 tracer.parse_start();
581 let e = self
582 .source
583 .get_event(e)
584 .map_err(|e| QueueError::SourceError(Box::new(e)))?;
585 tracer.parse_end();
586 let ts = self.source_time.convert_from(ts);
587
588 tracer.eval_start();
589 self.evaluator.eval_event(&e, ts, &mut tracer);
590 tracer.eval_end();
591
592 let verdict =
593 Verdict::create_with_trace(RawVerdict::from(&self.evaluator), tracer);
594 verdict.is_empty().not().then_some(QueuedVerdict {
595 kind: VerdictKind::Event,
596 ts: output_time.convert_into(ts),
597 verdict,
598 })
599 }
600 Err(RecvTimeoutError::Timeout) => {
601 let mut tracer = Verdict::Tracing::default();
603 tracer.eval_start();
604 let due = next_deadline.expect("timeout to only happen for a deadline.");
605 let deadline = self.schedule_manager.get_next_deadline(due);
608 self.evaluator
609 .eval_time_driven_tasks(deadline, due, &mut tracer);
610 tracer.eval_end();
611
612 let verdict =
613 Verdict::create_with_trace(RawVerdict::from(&self.evaluator), tracer);
614 verdict.is_empty().not().then_some(QueuedVerdict {
615 kind: VerdictKind::Timed,
616 ts: output_time.convert_into(due),
617 verdict,
618 })
619 }
620 Err(RecvTimeoutError::Disconnected) => {
621 return Ok(());
623 }
624 Ok(WorkItem::Start) => {
625 return Err(QueueError::MultipleStart);
627 }
628 };
629
630 Self::try_send(&self.output, verdict)?;
631 }
632 }
633}
634
635struct OfflineWorker<Source, SourceTime, Verdict, VerdictTime>
636where
637 Source: EventFactory,
638 SourceTime: TimeRepresentation,
639 Verdict: VerdictRepresentation,
640 VerdictTime: OutputTimeRepresentation + 'static,
641{
642 config: Config<OfflineMode<SourceTime>, VerdictTime>,
643 setup_data: Source::CreationData,
644
645 monitor: Option<Monitor<Source, OfflineMode<SourceTime>, Verdict, VerdictTime>>,
646 input: Receiver<WorkItem<Source, SourceTime>>,
647 output: Sender<QueuedVerdict<Verdict, VerdictTime>>,
648}
649
650impl<
651 Source: EventFactory,
652 SourceTime: TimeRepresentation,
653 Verdict: VerdictRepresentation,
654 VerdictTime: OutputTimeRepresentation,
655 > Worker<Source, OfflineMode<SourceTime>, Verdict, VerdictTime>
656 for OfflineWorker<Source, SourceTime, Verdict, VerdictTime>
657{
658 fn setup(
659 config: Config<OfflineMode<SourceTime>, VerdictTime>,
660 _input_names: HashMap<String, InputReference>,
661 setup_data: Source::CreationData,
662 input: Receiver<WorkItem<Source, SourceTime>>,
663 output: Sender<QueuedVerdict<Verdict, VerdictTime>>,
664 ) -> Result<Self, QueueError> {
665 Ok(OfflineWorker {
666 config,
667 setup_data,
668 monitor: None,
669 input,
670 output,
671 })
672 }
673
674 fn init(&mut self) -> Result<(), QueueError> {
675 let monitor: Monitor<Source, OfflineMode<SourceTime>, Verdict, VerdictTime> =
677 Monitor::setup(self.config.clone(), self.setup_data.clone())
678 .map_err(|e| QueueError::SourceError(Box::new(e)))?;
679 self.monitor.replace(monitor);
680 Ok(())
681 }
682
683 fn process(&mut self) -> Result<(), QueueError> {
684 let monitor = self
685 .monitor
686 .as_mut()
687 .expect("Init to be called before process");
688 let mut last_event = None;
689 let mut done = false;
690 while !done {
691 match self.input.recv() {
692 Ok(WorkItem::Event(e, ts)) => {
693 last_event.replace(ts.clone());
695 let Verdicts { timed, event, ts } = monitor
696 .accept_event(e, ts)
697 .map_err(|e| QueueError::SourceError(Box::new(e)))?;
698
699 for (ts, v) in timed {
700 let verdict = QueuedVerdict {
701 kind: VerdictKind::Timed,
702 ts,
703 verdict: v,
704 };
705 Self::try_send(&self.output, Some(verdict))?;
706 }
707
708 if !event.is_empty() {
709 let verdict = QueuedVerdict {
710 kind: VerdictKind::Event,
711 ts: ts.clone(),
712 verdict: event,
713 };
714 Self::try_send(&self.output, Some(verdict))?;
715 }
716 }
717 Err(_) => {
718 done = true;
720 if let Some(last_event) = last_event.as_ref() {
721 let timed = monitor.accept_time(last_event.clone());
722 for (ts, v) in timed {
723 let verdict = QueuedVerdict {
724 kind: VerdictKind::Timed,
725 ts,
726 verdict: v,
727 };
728 Self::try_send(&self.output, Some(verdict))?;
729 }
730 } else {
731 return Ok(());
732 }
733 }
734 Ok(WorkItem::Start) => {
735 return Err(QueueError::MultipleStart);
737 }
738 }
739 }
740 Ok(())
741 }
742}
743
744#[cfg(test)]
745#[cfg(not(feature = "serde"))]
746mod tests {
747 use std::convert::Infallible;
748 use std::thread::sleep;
749 use std::time::{Duration, Instant};
750
751 use crate::api::monitor::Change;
752 use crate::config::OfflineMode;
753 use crate::input::ArrayFactory;
754 use crate::monitor::{Incremental, Total, TotalIncremental, VerdictRepresentation};
755 use crate::queued::{QueuedVerdict, VerdictKind};
756 use crate::time::RelativeFloat;
757 use crate::{ConfigBuilder, QueuedMonitor, Value};
758
759 fn setup<const N: usize, V: VerdictRepresentation>(
760 spec: &str,
761 ) -> (
762 Instant,
763 QueuedMonitor<
764 ArrayFactory<N, Infallible, [Value; N]>,
765 OfflineMode<RelativeFloat>,
766 V,
767 RelativeFloat,
768 >,
769 ) {
770 let monitor = ConfigBuilder::new()
772 .spec_str(spec)
773 .offline::<RelativeFloat>()
774 .with_array_events::<N, Infallible, [Value; N]>()
775 .with_verdict::<V>()
776 .queued_monitor();
777 (Instant::now(), monitor)
778 }
779
780 fn sort_total(res: Total) -> Total {
781 let Total {
782 inputs,
783 mut outputs,
784 } = res;
785 outputs.iter_mut().for_each(|s| s.sort());
786 Total { inputs, outputs }
787 }
788
789 fn sort_incremental(mut res: Incremental) -> Incremental {
790 res.iter_mut().for_each(|(_, changes)| changes.sort());
791 res
792 }
793
794 #[test]
795 fn test_const_output_literals() {
796 let (start, mut monitor) = setup::<1, Total>(
797 r#"
798 input i_0: UInt8
799
800 output o_0: Bool @i_0 := true
801 output o_1: UInt8 @i_0 := 3
802 output o_2: Int8 @i_0 := -5
803 output o_3: Float32 @i_0 := -123.456
804 output o_4: String @i_0 := "foobar"
805 "#,
806 );
807 let queue = monitor.output_queue();
808 monitor.start().expect("Failed to start monitor");
809 let v = Value::Unsigned(3);
810 let timeout = Duration::from_millis(500);
811
812 monitor
813 .accept_event([v.clone()], start.elapsed())
814 .expect("Failed to accept event");
815 let res = queue.recv_timeout(timeout).unwrap();
816
817 assert!(res.kind == VerdictKind::Event);
818 let res = res.verdict;
819 assert_eq!(res.inputs[0], Some(v));
820 assert_eq!(res.outputs[0][0], (None, Some(Value::Bool(true))));
821 assert_eq!(res.outputs[1][0], (None, Some(Value::Unsigned(3))));
822 assert_eq!(res.outputs[2][0], (None, Some(Value::Signed(-5))));
823 assert_eq!(
824 res.outputs[3][0],
825 (None, Some(Value::try_from(-123.456).unwrap()))
826 );
827 assert_eq!(res.outputs[4][0], (None, Some(Value::Str("foobar".into()))));
828 }
829
830 #[test]
831 fn test_count_window() {
832 let (_, mut monitor) = setup::<1, Incremental>(
833 "input a: UInt16\noutput b: UInt16 @0.25Hz := a.aggregate(over: 40s, using: count)",
834 );
835
836 let timeout = Duration::from_millis(500);
837 let output = monitor.output_queue();
838 monitor.start().expect("Failed to start monitor");
839 let n = 25;
840 let mut time = Duration::from_secs(45);
841 monitor
842 .accept_event([Value::Unsigned(1)], time)
843 .expect("Failed to accept event");
844
845 let res: Vec<_> = (0..11)
846 .map(|_| output.recv_timeout(timeout).unwrap())
847 .collect();
848 assert!(output.is_empty());
849
850 assert!(res.iter().all(|v| v.kind == VerdictKind::Timed));
851 assert!(res.iter().all(|QueuedVerdict { ts, verdict, .. }| {
852 ts.as_secs() % 4 == 0
853 && verdict[0].0 == 0
854 && verdict[0].1[0] == Change::Value(None, Value::Unsigned(0))
855 }));
856 for v in 2..=n {
857 time += Duration::from_secs(1);
858 monitor
859 .accept_event([Value::Unsigned(v)], time)
860 .expect("Failed to accept event");
861 if (v - 1) % 4 == 0 {
862 let res = output.recv_timeout(timeout).unwrap();
863 assert_eq!(res.kind, VerdictKind::Timed);
864 assert_eq!(
865 res.verdict[0].1[0],
866 Change::Value(None, Value::Unsigned(v - 1))
867 );
868 } else {
869 assert!(output.is_empty());
870 }
871 }
872 }
873
874 #[test]
875 fn test_spawn_eventbased() {
876 let (_, mut monitor) = setup::<2, Total>(
877 "input a: Int32\n\
878 input b: Int32\n\
879 output c(x: Int32) spawn with a eval with x + a\n\
880 output d := b",
881 );
882
883 let timeout = Duration::from_millis(500);
884 let output = monitor.output_queue();
885 monitor.start().expect("Failed to start monitor");
886 monitor
887 .accept_event([Value::Signed(15), Value::None], Duration::from_secs(1))
888 .expect("Failed to accept event");
889 let res = output.recv_timeout(timeout).unwrap();
890
891 let expected = Total {
892 inputs: vec![Some(Value::Signed(15)), None],
893 outputs: vec![
894 vec![(Some(vec![Value::Signed(15)]), Some(Value::Signed(30)))],
895 vec![(None, None)],
896 ],
897 };
898 assert_eq!(res.kind, VerdictKind::Event);
899 assert_eq!(sort_total(res.verdict), sort_total(expected));
900
901 monitor
902 .accept_event(
903 [Value::Signed(20), Value::Signed(7)],
904 Duration::from_secs(2),
905 )
906 .expect("Failed to accept event");
907 let res = output.recv_timeout(timeout).unwrap();
908
909 let expected = Total {
910 inputs: vec![Some(Value::Signed(20)), Some(Value::Signed(7))],
911 outputs: vec![
912 vec![
913 (Some(vec![Value::Signed(15)]), Some(Value::Signed(35))),
914 (Some(vec![Value::Signed(20)]), Some(Value::Signed(40))),
915 ],
916 vec![(None, Some(Value::Signed(7)))],
917 ],
918 };
919 assert_eq!(res.kind, VerdictKind::Event);
920 assert_eq!(sort_total(res.verdict), sort_total(expected));
921
922 monitor
923 .accept_event([Value::None, Value::Signed(42)], Duration::from_secs(3))
924 .expect("Failed to accept event");
925 let res = output.recv_timeout(timeout).unwrap();
926
927 let expected = Total {
928 inputs: vec![Some(Value::Signed(20)), Some(Value::Signed(42))],
929 outputs: vec![
930 vec![
931 (Some(vec![Value::Signed(15)]), Some(Value::Signed(35))),
932 (Some(vec![Value::Signed(20)]), Some(Value::Signed(40))),
933 ],
934 vec![(None, Some(Value::Signed(42)))],
935 ],
936 };
937 assert_eq!(res.kind, VerdictKind::Event);
938 assert_eq!(sort_total(res.verdict), sort_total(expected));
939 }
940
941 #[test]
942 fn test_eval_close() {
943 let (_, mut monitor) = setup::<1, Incremental>(
944 "input a: Int32\n\
945 output c(x: Int32)\n\
946 spawn with a \n\
947 close @a when true\n\
948 eval with x + a",
949 );
950
951 let timeout = Duration::from_millis(500);
952 let output = monitor.output_queue();
953 monitor.start().expect("Failed to start monitor");
954 monitor
955 .accept_event([Value::Signed(15)], Duration::from_secs(1))
956 .expect("Failed to accept event");
957 let res = output.recv_timeout(timeout).unwrap();
958
959 let mut expected = vec![
960 Change::Spawn(vec![Value::Signed(15)]),
961 Change::Value(Some(vec![Value::Signed(15)]), Value::Signed(30)),
962 Change::Close(vec![Value::Signed(15)]),
963 ];
964 expected.sort();
965 assert_eq!(res.kind, VerdictKind::Event);
966 assert_eq!(res.verdict[0].0, 0);
967
968 assert_eq!(sort_incremental(res.verdict)[0].1, expected);
969 }
970
971 #[test]
972 fn test_online_time() {
973 let spec = "\
974 input i: UInt64\n\
975 output o @10Hz := true\
976 ";
977 let mut monitor = ConfigBuilder::new()
978 .spec_str(spec)
979 .online()
980 .with_array_events::<1, Infallible, [Value; 1]>()
981 .with_verdict::<TotalIncremental>()
982 .queued_monitor();
983 let output = monitor.output_queue();
984 monitor.start().expect("Failed to start monitor");
985 sleep(Duration::from_millis(1090));
986 monitor.end().unwrap();
987 assert_eq!(output.len(), 10);
988 }
989}