1use std::cell::RefCell;
21use std::fmt::{Debug, Display, Formatter};
22use std::marker::PhantomData;
23use std::rc::Rc;
24use std::time::Duration;
25
26use itertools::Itertools;
27use rtlola_frontend::mir::{InputReference, OutputReference, RtLolaMir, TriggerReference, Type};
28#[cfg(feature = "serde")]
29use serde::Serialize;
30
31use crate::config::{Config, ExecutionMode};
32use crate::configuration::time::{OutputTimeRepresentation, RelativeFloat, TimeRepresentation};
33use crate::evaluator::{Evaluator, EvaluatorData};
34use crate::input::{EventFactory, EventFactoryError};
35use crate::schedule::schedule_manager::ScheduleManager;
36use crate::schedule::DynamicSchedule;
37use crate::storage::Value;
38use crate::{CondSerialize, Time};
39
40pub type Event = Vec<Value>;
42
43pub trait VerdictRepresentation: Clone + Debug + Send + CondSerialize + 'static {
47 type Tracing: Tracer;
49
50 fn create(data: RawVerdict) -> Self;
52
53 fn create_with_trace(data: RawVerdict, _tracing: Self::Tracing) -> Self {
55 Self::create(data)
56 }
57
58 fn is_empty(&self) -> bool;
60}
61
62pub trait Tracer: Default + Clone + Debug + Send + 'static {
67 fn parse_start(&mut self) {}
69 fn parse_end(&mut self) {}
71
72 fn eval_start(&mut self) {}
74 fn eval_end(&mut self) {}
76
77 fn spawn_start(&mut self, _output: OutputReference) {}
79 fn spawn_end(&mut self, _output: OutputReference) {}
81
82 fn instance_eval_start(&mut self, _output: OutputReference, _instance: &[Value]) {}
84 fn instance_eval_end(&mut self, _output: OutputReference, _instance: &[Value]) {}
86
87 fn close_start(&mut self, _output: OutputReference, _instance: &[Value]) {}
89 fn close_end(&mut self, _output: OutputReference, _instance: &[Value]) {}
91}
92
93#[cfg_attr(feature = "serde", derive(Serialize))]
95#[derive(Debug, Clone, Copy, Default)]
96pub struct NoTracer {}
97impl Tracer for NoTracer {}
98
99#[cfg_attr(feature = "serde", derive(Serialize))]
101#[derive(Debug, Clone)]
102pub struct TracingVerdict<T: Tracer, V: VerdictRepresentation> {
103 #[cfg_attr(feature = "serde", serde(skip))]
105 pub tracer: T,
106 pub verdict: V,
108}
109
110impl<T: Tracer, V: VerdictRepresentation<Tracing = NoTracer>> VerdictRepresentation
111 for TracingVerdict<T, V>
112{
113 type Tracing = T;
114
115 fn create(data: RawVerdict) -> Self {
116 Self {
117 tracer: T::default(),
118 verdict: V::create(data),
119 }
120 }
121
122 fn create_with_trace(data: RawVerdict, tracing: Self::Tracing) -> Self {
123 Self {
124 tracer: tracing,
125 verdict: V::create(data),
126 }
127 }
128
129 fn is_empty(&self) -> bool {
130 V::is_empty(&self.verdict)
131 }
132}
133
134pub type Parameters = Option<Vec<Value>>;
138
139pub type Instance = (Parameters, Option<Value>);
141
142#[cfg_attr(feature = "serde", derive(Serialize))]
144#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
145pub enum Change {
146 Spawn(Vec<Value>),
148 Value(Parameters, Value),
150 Close(Vec<Value>),
152}
153
154impl Display for Change {
155 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
156 match self {
157 Change::Spawn(para) => write!(f, "Spawn<{}>", para.iter().join(", ")),
158 Change::Close(para) => write!(f, "Close<{}>", para.iter().join(", ")),
159 Change::Value(para, value) => match para {
160 Some(para) => write!(f, "Instance<{}> = {}", para.iter().join(", "), value),
161 None => write!(f, "Value = {}", value),
162 },
163 }
164 }
165}
166
167pub type Incremental = Vec<(OutputReference, Vec<Change>)>;
171
172impl VerdictRepresentation for Incremental {
173 type Tracing = NoTracer;
174
175 fn create(data: RawVerdict) -> Self {
176 data.eval.peek_fresh_outputs()
177 }
178
179 fn is_empty(&self) -> bool {
180 Vec::is_empty(self)
181 }
182}
183
184#[cfg_attr(feature = "serde", derive(Serialize))]
193#[derive(Debug, Clone)]
194pub struct TotalIncremental {
195 pub inputs: Vec<(InputReference, Value)>,
197 pub outputs: Vec<(OutputReference, Vec<Change>)>,
199 pub trigger: Vec<TriggerReference>,
201}
202
203impl VerdictRepresentation for TotalIncremental {
204 type Tracing = NoTracer;
205
206 fn create(data: RawVerdict) -> Self {
207 let inputs = data.eval.peek_fresh_input();
208 let outputs = data.eval.peek_fresh_outputs();
209 let trigger = data.eval.peek_violated_triggers();
210 Self {
211 inputs,
212 outputs,
213 trigger,
214 }
215 }
216
217 fn is_empty(&self) -> bool {
218 self.inputs.is_empty() && self.outputs.is_empty() && self.trigger.is_empty()
219 }
220}
221
222#[cfg_attr(feature = "serde", derive(Serialize))]
226#[derive(Debug, Clone, Eq, PartialEq)]
227pub struct Total {
228 pub inputs: Vec<Option<Value>>,
230
231 pub outputs: Vec<Vec<Instance>>,
234}
235
236impl VerdictRepresentation for Total {
237 type Tracing = NoTracer;
238
239 fn create(data: RawVerdict) -> Self {
240 Total {
241 inputs: data.eval.peek_inputs(),
242 outputs: data.eval.peek_outputs(),
243 }
244 }
245
246 fn is_empty(&self) -> bool {
247 false
248 }
249}
250
251pub type TriggerMessages = Vec<(OutputReference, Parameters, String)>;
255
256impl VerdictRepresentation for TriggerMessages {
257 type Tracing = NoTracer;
258
259 fn create(data: RawVerdict) -> Self
260 where
261 Self: Sized,
262 {
263 data.eval.peek_violated_triggers_messages()
264 }
265
266 fn is_empty(&self) -> bool {
267 Vec::is_empty(self)
268 }
269}
270
271#[cfg_attr(feature = "serde", derive(Serialize))]
278#[derive(Debug, Clone)]
279pub struct Verdicts<V: VerdictRepresentation, VerdictTime: OutputTimeRepresentation> {
280 pub timed: Vec<(VerdictTime::InnerTime, V)>,
282 pub event: V,
284 pub ts: VerdictTime::InnerTime,
286}
287
288#[allow(missing_debug_implementations)]
300pub struct Monitor<Source, Mode, Verdict = Incremental, VerdictTime = RelativeFloat>
301where
302 Source: EventFactory,
303 Mode: ExecutionMode,
304 Verdict: VerdictRepresentation,
305 VerdictTime: OutputTimeRepresentation + 'static,
306{
307 ir: RtLolaMir,
308 eval: Evaluator,
309
310 schedule_manager: ScheduleManager,
311
312 source: Source,
313
314 source_time: Mode::SourceTime,
315 output_time: VerdictTime,
316
317 phantom: PhantomData<Verdict>,
318}
319
320impl<Source, Mode, Verdict, VerdictTime> Monitor<Source, Mode, Verdict, VerdictTime>
322where
323 Source: EventFactory,
324 Mode: ExecutionMode,
325 Verdict: VerdictRepresentation,
326 VerdictTime: OutputTimeRepresentation,
327{
328 pub fn setup(
330 config: Config<Mode, VerdictTime>,
331 setup_data: Source::CreationData,
332 ) -> Result<Monitor<Source, Mode, Verdict, VerdictTime>, EventFactoryError> {
333 let dyn_schedule = Rc::new(RefCell::new(DynamicSchedule::new()));
334 let mut source_time = config.mode.time_representation().clone();
335 let mut output_time = VerdictTime::default();
336
337 let st = source_time.init_start_time(config.start_time);
338 output_time.set_start_time(st);
339
340 let input_map = config
341 .ir
342 .inputs
343 .iter()
344 .map(|i| (i.name.clone(), i.reference.in_ix()))
345 .collect();
346
347 let eval_data = EvaluatorData::new(config.ir.clone(), dyn_schedule.clone());
348
349 let time_manager = ScheduleManager::setup(config.ir.clone(), dyn_schedule)
350 .expect("Error computing schedule for time-driven streams");
351
352 Ok(Monitor {
353 ir: config.ir,
354 eval: eval_data.into_evaluator(),
355
356 schedule_manager: time_manager,
357
358 source: Source::new(input_map, setup_data)?,
359
360 source_time,
361 output_time,
362
363 phantom: PhantomData,
364 })
365 }
366
367 fn eval_deadlines(&mut self, ts: Time, only_before: bool) -> Vec<(Time, Verdict)> {
368 let mut timed: Vec<(Time, Verdict)> = vec![];
369 while self.schedule_manager.get_next_due().is_some() {
370 let mut tracer = Verdict::Tracing::default();
371 tracer.eval_start();
372 let due = self.schedule_manager.get_next_due().unwrap();
373 if due > ts || (only_before && due == ts) {
374 break;
375 }
376 let deadline = self.schedule_manager.get_next_deadline(ts);
377
378 self.eval.eval_time_driven_tasks(deadline, due, &mut tracer);
379 tracer.eval_end();
380 timed.push((
381 due,
382 Verdict::create_with_trace(RawVerdict::from(&self.eval), tracer),
383 ))
384 }
385 timed
386 }
387}
388
389#[allow(missing_debug_implementations)]
391#[derive(Copy, Clone)]
392pub struct RawVerdict<'a> {
393 eval: &'a Evaluator,
394}
395
396impl<'a> From<&'a Evaluator> for RawVerdict<'a> {
397 fn from(eval: &'a Evaluator) -> Self {
398 RawVerdict { eval }
399 }
400}
401
402impl<Source, Mode, Verdict, VerdictTime> Monitor<Source, Mode, Verdict, VerdictTime>
404where
405 Source: EventFactory,
406 Mode: ExecutionMode,
407 Verdict: VerdictRepresentation,
408 VerdictTime: OutputTimeRepresentation,
409{
410 pub fn accept_event(
416 &mut self,
417 ev: Source::Record,
418 ts: <Mode::SourceTime as TimeRepresentation>::InnerTime,
419 ) -> Result<Verdicts<Verdict, VerdictTime>, EventFactoryError> {
420 let mut tracer = Verdict::Tracing::default();
421
422 tracer.parse_start();
423 let ev = self.source.get_event(ev)?;
424 tracer.parse_end();
425 let ts = self.source_time.convert_from(ts);
426
427 let timed = if self.ir.has_time_driven_features() {
429 self.eval_deadlines(ts, true)
430 } else {
431 vec![]
432 };
433
434 tracer.eval_start();
436 self.eval.eval_event(ev.as_slice(), ts, &mut tracer);
437 tracer.eval_end();
438 let event_change = Verdict::create_with_trace(RawVerdict::from(&self.eval), tracer);
439
440 let timed = timed
441 .into_iter()
442 .map(|(t, v)| (self.output_time.convert_into(t), v))
443 .collect();
444
445 Ok(Verdicts::<Verdict, VerdictTime> {
446 timed,
447 event: event_change,
448 ts: self.output_time.convert_into(ts),
449 })
450 }
451
452 pub fn accept_time(
456 &mut self,
457 ts: <Mode::SourceTime as TimeRepresentation>::InnerTime,
458 ) -> Vec<(VerdictTime::InnerTime, Verdict)> {
459 let ts = self.source_time.convert_from(ts);
460
461 let timed = if self.ir.has_time_driven_features() {
462 self.eval_deadlines(ts, false)
463 } else {
464 vec![]
465 };
466
467 timed
468 .into_iter()
469 .map(|(t, v)| (self.output_time.convert_into(t), v))
470 .collect()
471 }
472
473 pub fn ir(&self) -> &RtLolaMir {
475 &self.ir
476 }
477
478 pub fn name_for_input(&self, id: InputReference) -> &str {
484 self.ir.inputs[id].name.as_str()
485 }
486
487 pub fn name_for_output(&self, id: OutputReference) -> &str {
493 self.ir.outputs[id].name.as_str()
494 }
495
496 pub fn trigger_stream_index(&self, id: usize) -> usize {
500 self.ir.triggers[id].output_reference.out_ix()
501 }
502
503 pub fn number_of_input_streams(&self) -> usize {
507 self.ir.inputs.len()
508 }
509
510 pub fn number_of_output_streams(&self) -> usize {
514 self.ir.outputs.len()
515 }
516
517 pub fn number_of_triggers(&self) -> usize {
521 self.ir.triggers.len()
522 }
523
524 pub fn type_of_input(&self, id: InputReference) -> &Type {
530 &self.ir.inputs[id].ty
531 }
532
533 pub fn type_of_output(&self, id: OutputReference) -> &Type {
539 &self.ir.outputs[id].ty
540 }
541
542 pub fn extend_rate_of_output(&self, id: OutputReference) -> Option<Duration> {
548 self.ir
549 .time_driven
550 .iter()
551 .find(|time_driven_stream| time_driven_stream.reference.out_ix() == id)
552 .map(|time_driven_stream| time_driven_stream.period_in_duration())
553 }
554
555 pub fn with_verdict_representation<T: VerdictRepresentation>(
557 self,
558 ) -> Monitor<Source, Mode, T, VerdictTime> {
559 let Monitor {
560 ir,
561 eval,
562 schedule_manager: time_manager,
563 source_time,
564 source,
565 output_time,
566 phantom: _,
567 } = self;
568 Monitor {
569 ir,
570 eval,
571 schedule_manager: time_manager,
572 source_time,
573 source,
574 output_time,
575 phantom: PhantomData,
576 }
577 }
578}
579
580#[cfg(test)]
581#[cfg(not(feature = "serde"))]
582mod tests {
583 use std::convert::Infallible;
584 use std::time::{Duration, Instant};
585
586 use crate::api::monitor::Change;
587 use crate::config::OfflineMode;
588 use crate::input::ArrayFactory;
589 use crate::monitor::{Incremental, Monitor, Total, Value, VerdictRepresentation};
590 use crate::time::RelativeFloat;
591 use crate::ConfigBuilder;
592
593 fn setup<const N: usize, V: VerdictRepresentation>(
594 spec: &str,
595 ) -> (
596 Instant,
597 Monitor<
598 ArrayFactory<N, Infallible, [Value; N]>,
599 OfflineMode<RelativeFloat>,
600 V,
601 RelativeFloat,
602 >,
603 ) {
604 let monitor = ConfigBuilder::new()
606 .spec_str(spec)
607 .offline::<RelativeFloat>()
608 .with_array_events::<N, Infallible, [Value; N]>()
609 .with_verdict::<V>()
610 .monitor()
611 .expect("Failed to create monitor");
612 (Instant::now(), monitor)
613 }
614
615 fn sort_total(res: Total) -> Total {
616 let Total {
617 inputs,
618 mut outputs,
619 } = res;
620 outputs.iter_mut().for_each(|s| s.sort());
621 Total { inputs, outputs }
622 }
623
624 fn sort_incremental(mut res: Incremental) -> Incremental {
625 res.iter_mut().for_each(|(_, changes)| changes.sort());
626 res
627 }
628
629 #[test]
630 fn test_const_output_literals() {
631 let (start, mut monitor) = setup::<1, Total>(
632 r#"
633 input i_0: UInt8
634
635 output o_0: Bool @i_0 := true
636 output o_1: UInt8 @i_0 := 3
637 output o_2: Int8 @i_0 := -5
638 output o_3: Float32 @i_0 := -123.456
639 output o_4: String @i_0 := "foobar"
640 "#,
641 );
642 let v = Value::Unsigned(3);
643 let res = monitor
644 .accept_event([v.clone()], start.elapsed())
645 .expect("Failed to accept value");
646 assert!(res.timed.is_empty());
647 let res = res.event;
648 assert_eq!(res.inputs[0], Some(v));
649 assert_eq!(res.outputs[0][0], (None, Some(Value::Bool(true))));
650 assert_eq!(res.outputs[1][0], (None, Some(Value::Unsigned(3))));
651 assert_eq!(res.outputs[2][0], (None, Some(Value::Signed(-5))));
652 assert_eq!(
653 res.outputs[3][0],
654 (None, Some(Value::try_from(-123.456).unwrap()))
655 );
656 assert_eq!(res.outputs[4][0], (None, Some(Value::Str("foobar".into()))));
657 }
658
659 #[test]
660 fn test_count_window() {
661 let (_, mut monitor) = setup::<1, Incremental>(
662 "input a: UInt16\noutput b: UInt16 @0.25Hz := a.aggregate(over: 40s, using: count)",
663 );
664
665 let n = 25;
666 let mut time = Duration::from_secs(45);
667 let res = monitor
668 .accept_event([Value::Unsigned(1)], time)
669 .expect("Failed to accept value");
670 assert!(res.event.is_empty());
671 assert_eq!(res.timed.len(), 11);
672 assert!(res.timed.iter().all(|(time, change)| {
673 time.as_secs() % 4 == 0
674 && change[0].0 == 0
675 && change[0].1[0] == Change::Value(None, Value::Unsigned(0))
676 }));
677 for v in 2..=n {
678 time += Duration::from_secs(1);
679 let res = monitor
680 .accept_event([Value::Unsigned(v)], time)
681 .expect("Failed to accept value");
682
683 assert_eq!(res.event.len(), 0);
684 if (v - 1) % 4 == 0 {
685 assert_eq!(res.timed.len(), 1);
686 assert_eq!(
687 res.timed[0].1[0].1[0],
688 Change::Value(None, Value::Unsigned(v - 1))
689 );
690 } else {
691 assert_eq!(res.timed.len(), 0);
692 }
693 }
694 }
695
696 #[test]
697 fn test_spawn_eventbased() {
698 let (_, mut monitor) = setup::<2, Total>(
699 "input a: Int32\n\
700 input b: Int32\n\
701 output c(x: Int32) spawn with a eval with x + a\n\
702 output d := b",
703 );
704
705 let res = monitor
706 .accept_event([Value::Signed(15), Value::None], Duration::from_secs(1))
707 .expect("Failed to accept value");
708 let expected = Total {
709 inputs: vec![Some(Value::Signed(15)), None],
710 outputs: vec![
711 vec![(Some(vec![Value::Signed(15)]), Some(Value::Signed(30)))],
712 vec![(None, None)],
713 ],
714 };
715 assert_eq!(res.event, expected);
716 assert_eq!(res.timed.len(), 0);
717
718 let res = monitor
719 .accept_event(
720 [Value::Signed(20), Value::Signed(7)],
721 Duration::from_secs(2),
722 )
723 .expect("Failed to accept value");
724 let expected = Total {
725 inputs: vec![Some(Value::Signed(20)), Some(Value::Signed(7))],
726 outputs: vec![
727 vec![
728 (Some(vec![Value::Signed(15)]), Some(Value::Signed(35))),
729 (Some(vec![Value::Signed(20)]), Some(Value::Signed(40))),
730 ],
731 vec![(None, Some(Value::Signed(7)))],
732 ],
733 };
734 assert_eq!(sort_total(res.event), sort_total(expected));
735 assert_eq!(res.timed.len(), 0);
736
737 let res = monitor
738 .accept_event([Value::None, Value::Signed(42)], Duration::from_secs(3))
739 .expect("Failed to accept value");
740 let expected = Total {
741 inputs: vec![Some(Value::Signed(20)), Some(Value::Signed(42))],
742 outputs: vec![
743 vec![
744 (Some(vec![Value::Signed(15)]), Some(Value::Signed(35))),
745 (Some(vec![Value::Signed(20)]), Some(Value::Signed(40))),
746 ],
747 vec![(None, Some(Value::Signed(42)))],
748 ],
749 };
750 assert_eq!(sort_total(res.event), sort_total(expected));
751 assert_eq!(res.timed.len(), 0);
752 }
753
754 #[test]
755 fn test_eval_close() {
756 let (_, mut monitor) = setup::<1, Incremental>(
757 "input a: Int32\n\
758 output c(x: Int32)\n\
759 spawn with a \n\
760 close @a when true\n\
761 eval with x + a",
762 );
763
764 let res = monitor
765 .accept_event([Value::Signed(15)], Duration::from_secs(1))
766 .expect("Failed to accept value");
767 let mut expected = vec![
768 Change::Spawn(vec![Value::Signed(15)]),
769 Change::Value(Some(vec![Value::Signed(15)]), Value::Signed(30)),
770 Change::Close(vec![Value::Signed(15)]),
771 ];
772 expected.sort();
773 assert!(res.timed.is_empty());
774 assert_eq!(res.event[0].0, 0);
775
776 assert_eq!(sort_incremental(res.event)[0].1, expected);
777 }
778}