rtlola_hir/hir/
selector.rs

//! This module provides a variety of selectors for extracting different kinds of output streams from the High-Level Intermediate Representation (HIR) of an RTLola specification.
//!
//! The entry point is the [select](crate::RtLolaHir::select) method of the HIR, which requires at least the Typed State.
//!
//! # Most Notable Structs and Enums
//! * [StreamSelector] is the data structure representing the combination of one or multiple selectors.
//! * [FilterSelector] represents the variants of filters a stream has.
//! * [PacingSelector] allows selecting streams based on their pacing type.
//! * [CloseSelector] represents the variants of close conditions a stream has.
//! * [ParameterSelector] allows selecting different parameter properties of a stream.
//!
//! # See Also
//! * [RtLolaHir](crate::RtLolaHir) the High-Level Intermediate Representation (HIR) of an RTLola specification.
14
15use crate::hir::{ConcretePacingType, Hir, Output, SRef, TypedTrait};
16use crate::modes::types::HirType;
17use crate::modes::HirMode;
18
19impl<M: HirMode + TypedTrait> Hir<M> {
20    /// Creates a [StreamSelector] to query the HIR for different classes of output streams.
21    pub fn select(&self) -> StreamSelector<M, All> {
22        StreamSelector::all(self)
23    }
24}
25
26/// Represents a selectable property of an output stream.
27pub trait Selectable: Copy {
28    /// Returns true if the stream represented by `sref` is accepted (selected) by the selector.
29    fn select<M: HirMode + TypedTrait>(&self, hir: &Hir<M>, sref: SRef) -> bool;
30}
31
/// An enum used to select different filter behaviours of a stream.
///
/// The selector is a plain value type: it can be copied, compared for equality and hashed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FilterSelector {
    /// Any stream matches this selector.
    Any,
    /// Only streams *with* a filter condition match this selector.
    Filtered,
    /// Only streams *without* a filter condition match this selector.
    Unfiltered,
}
42
43impl Selectable for FilterSelector {
44    fn select<M: HirMode + TypedTrait>(&self, hir: &Hir<M>, sref: SRef) -> bool {
45        assert!(sref.is_output());
46        let output = hir.output(sref).unwrap();
47        match self {
48            FilterSelector::Any => true,
49            FilterSelector::Filtered => output.eval().iter().any(|eval| eval.condition.is_some()),
50            FilterSelector::Unfiltered => output.eval().iter().all(|eval| eval.condition.is_none()),
51        }
52    }
53}
54
/// An enum used to select different closing behaviours of a stream.
///
/// The selector is a plain value type: it can be copied, compared for equality and hashed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CloseSelector {
    /// Any stream matches this selector.
    Any,
    /// Only streams *with* a close condition match this selector.
    Closed,
    /// Only streams with an event-based close condition match this selector.
    /// Note that the selection logic also accepts close conditions whose pacing is constant.
    EventBased,
    /// Only streams with a periodic close condition that is dynamically scheduled match this selector.
    /// I.e. if the close expression accesses a periodic stream that is spawned / dynamically created.
    DynamicPeriodic,
    /// Only streams with a periodic close condition that is statically scheduled match this selector.
    StaticPeriodic,
    /// Only streams with a periodic close condition match this selector.
    AnyPeriodic,
    /// Only streams *without* a close condition match this selector.
    NotClosed,
}
74
75impl Selectable for CloseSelector {
76    fn select<M: HirMode + TypedTrait>(&self, hir: &Hir<M>, sref: SRef) -> bool {
77        assert!(sref.is_output());
78        let output = hir.output(sref).unwrap();
79        let close_ty: Option<HirType> = output.close_cond().map(|cond| hir.expr_type(cond));
80        match self {
81            CloseSelector::Any => true,
82            CloseSelector::Closed => close_ty.is_some(),
83            CloseSelector::EventBased => close_ty
84                .map(|t| t.eval_pacing.is_event_based() || t.eval_pacing.is_constant())
85                .unwrap_or(false),
86            CloseSelector::DynamicPeriodic => close_ty
87                .map(|t| t.eval_pacing.is_periodic() && !t.spawn_pacing.is_constant())
88                .unwrap_or(false),
89            CloseSelector::StaticPeriodic => close_ty
90                .map(|t| t.eval_pacing.is_periodic() && t.spawn_pacing.is_constant())
91                .unwrap_or(false),
92            CloseSelector::AnyPeriodic => close_ty
93                .map(|t| t.eval_pacing.is_periodic())
94                .unwrap_or(false),
95            CloseSelector::NotClosed => output.close().is_none(),
96        }
97    }
98}
99
/// An enum used to select different pacing behaviours.
///
/// The selector is a plain value type: it can be copied, compared for equality and hashed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PacingSelector {
    /// Any subject matches this selector.
    Any,
    /// Only event-based subjects match this selector.
    EventBased,
    /// Only periodic subjects match this selector.
    Periodic,
}
110
111impl PacingSelector {
112    fn select_eval<M: HirMode + TypedTrait>(&self, hir: &Hir<M>, sref: SRef) -> bool {
113        assert!(sref.is_output());
114        let ty: ConcretePacingType = hir.stream_type(sref).eval_pacing;
115        match self {
116            PacingSelector::Any => true,
117            PacingSelector::EventBased => ty.is_event_based(),
118            PacingSelector::Periodic => ty.is_periodic(),
119        }
120    }
121
122    fn select_spawn<M: HirMode + TypedTrait>(&self, hir: &Hir<M>, sref: SRef) -> bool {
123        assert!(sref.is_output());
124        let ty: ConcretePacingType = hir.stream_type(sref).spawn_pacing;
125        match self {
126            PacingSelector::Any => true,
127            PacingSelector::EventBased => ty.is_event_based(),
128            PacingSelector::Periodic => ty.is_periodic(),
129        }
130    }
131}
132
/// An enum used to select different parameter configurations of a stream.
///
/// The selector is a plain value type: it can be copied, compared for equality and hashed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ParameterSelector {
    /// Any stream matches this selector.
    Any,
    /// Only streams *with* parameters match this selector.
    Parameterized,
    /// Only streams *without* parameters match this selector.
    NotParameterized,
}
143
144impl Selectable for ParameterSelector {
145    fn select<M: HirMode + TypedTrait>(&self, hir: &Hir<M>, sref: SRef) -> bool {
146        assert!(sref.is_output());
147        let paras = &hir.output(sref).unwrap().params;
148        match self {
149            ParameterSelector::Any => true,
150            ParameterSelector::Parameterized => !paras.is_empty(),
151            ParameterSelector::NotParameterized => paras.is_empty(),
152        }
153    }
154}
155
156#[derive(Debug, Clone, Copy)]
157/// A selector struct to capture all streams of the hir.
158pub struct All {}
159impl Selectable for All {
160    fn select<M: HirMode + TypedTrait>(&self, _hir: &Hir<M>, _sref: SRef) -> bool {
161        true
162    }
163}
164
165#[derive(Debug, Clone, Copy)]
166/// A selector struct to capture all statically created streams of the hir.
167pub struct Static {}
168impl Selectable for Static {
169    fn select<M: HirMode + TypedTrait>(&self, hir: &Hir<M>, sref: SRef) -> bool {
170        assert!(sref.is_output());
171        hir.stream_type(sref).spawn_pacing.is_constant()
172    }
173}
174
175#[derive(Debug, Clone, Copy)]
176/// A selector struct to capture all dynamically created streams of the hir.
177pub struct Dynamic {
178    /// Determines the pacing behaviour of the selected condition.
179    spawn: PacingSelector,
180    /// Determines the closing behaviour of the selected streams.
181    close: CloseSelector,
182    /// Determines the parameter configuration of the selected streams.
183    parameter: ParameterSelector,
184}
185impl Selectable for Dynamic {
186    fn select<M: HirMode + TypedTrait>(&self, hir: &Hir<M>, sref: SRef) -> bool {
187        assert!(sref.is_output());
188        !hir.stream_type(sref).spawn_pacing.is_constant()
189            && self.spawn.select_spawn(hir, sref)
190            && self.close.select(hir, sref)
191            && self.parameter.select(hir, sref)
192    }
193}
194
195#[derive(Debug, Clone, Copy)]
196/// A StreamSelector combines different selectors to extract specific stream classes from the Hir.
197/// It captures a subset of output streams, where each stream matches all selectors of the StreamSelector.
198pub struct StreamSelector<'a, M: HirMode + TypedTrait, S: Selectable> {
199    /// A reference to the Hir
200    hir: &'a Hir<M>,
201    /// The underlying selector state representing a certain class of streams.
202    ///
203    /// # See also:
204    /// * [All]
205    /// * [Static]
206    /// * [Dynamic]
207    state: S,
208    /// Determines the filter behaviour of the selected streams.
209    filter: FilterSelector,
210    /// Determines the evaluation pacing of the selected streams.
211    eval: PacingSelector,
212}
213
214impl<'a, M: HirMode + TypedTrait, S: Selectable> StreamSelector<'a, M, S> {
215    /// Selects streams *with* a filter condition.
216    pub fn filtered(mut self) -> Self {
217        self.filter = FilterSelector::Filtered;
218        self
219    }
220
221    /// Selects streams *without* a filter condition.
222    pub fn unfiltered(mut self) -> Self {
223        self.filter = FilterSelector::Unfiltered;
224        self
225    }
226
227    /// Selects streams with the filter behaviour specified by the FilterSelector.
228    pub fn filter(mut self, selector: FilterSelector) -> Self {
229        self.filter = selector;
230        self
231    }
232
233    /// Selects streams with periodic evaluation pacing.
234    pub fn periodic_eval(mut self) -> Self {
235        self.eval = PacingSelector::Periodic;
236        self
237    }
238
239    /// Selects streams with event-based evaluation pacing.
240    pub fn event_based_eval(mut self) -> Self {
241        self.eval = PacingSelector::EventBased;
242        self
243    }
244
245    /// Selects streams with an evaluation pacing matching the [PacingSelector].
246    pub fn eval(mut self, selector: PacingSelector) -> Self {
247        self.eval = selector;
248        self
249    }
250
251    fn select(&self, sref: SRef) -> bool {
252        assert!(sref.is_output());
253        self.filter.select(self.hir, sref)
254            && self.eval.select_eval(self.hir, sref)
255            && self.state.select(self.hir, sref)
256    }
257
258    /// Construct the represented subset of output streams matching the given selections.
259    pub fn build(self) -> impl Iterator<Item = &'a Output> {
260        self.hir.outputs().filter(move |o| self.select(o.sr))
261    }
262}
263
264impl<'a, M: HirMode + TypedTrait> StreamSelector<'a, M, All> {
265    /// Creates a new StreamSelector matching all output streams.
266    pub fn all(hir: &'a Hir<M>) -> Self {
267        StreamSelector {
268            hir,
269            state: All {},
270            filter: FilterSelector::Any,
271            eval: PacingSelector::Any,
272        }
273    }
274
275    /// Selects statically created streams.
276    /// I.e. only streams without a spawn condition.
277    pub fn static_streams(self) -> StreamSelector<'a, M, Static> {
278        StreamSelector {
279            hir: self.hir,
280            state: Static {},
281            filter: self.filter,
282            eval: self.eval,
283        }
284    }
285
286    /// Selects dynamically created streams.
287    /// I.e. only streams with a spawn condition.
288    pub fn dynamic_streams(self) -> StreamSelector<'a, M, Dynamic> {
289        StreamSelector {
290            hir: self.hir,
291            state: Dynamic {
292                spawn: PacingSelector::Any,
293                close: CloseSelector::Any,
294                parameter: ParameterSelector::Any,
295            },
296            filter: self.filter,
297            eval: self.eval,
298        }
299    }
300}
301
302impl<M: HirMode + TypedTrait> StreamSelector<'_, M, Dynamic> {
303    /// Selects streams with a periodic spawn pacing.
304    pub fn periodic_spawn(mut self) -> Self {
305        self.state.spawn = PacingSelector::Periodic;
306        self
307    }
308
309    /// Selects streams with a event-based spawn pacing.
310    pub fn event_based_spawn(mut self) -> Self {
311        self.state.spawn = PacingSelector::EventBased;
312        self
313    }
314
315    /// Selects streams with a spawn pacing matching the [PacingSelector].
316    pub fn spawn(mut self, selector: PacingSelector) -> Self {
317        self.state.spawn = selector;
318        self
319    }
320
321    /// Selects streams with a closing behaviour matching the [CloseSelector]
322    pub fn close(mut self, selector: CloseSelector) -> Self {
323        self.state.close = selector;
324        self
325    }
326
327    /// Selects streams with parameters
328    pub fn parameterized(mut self) -> Self {
329        self.state.parameter = ParameterSelector::Parameterized;
330        self
331    }
332
333    /// Selects streams without parameters
334    pub fn not_parameterized(mut self) -> Self {
335        self.state.parameter = ParameterSelector::NotParameterized;
336        self
337    }
338
339    /// Selects streams with a parameter configuration matching the [ParameterSelector]
340    pub fn parameters(mut self, selector: ParameterSelector) -> Self {
341        self.state.parameter = selector;
342        self
343    }
344}
345
346#[cfg(test)]
347mod tests {
348    use itertools::Itertools;
349    use rtlola_parser::ParserConfig;
350
351    use super::*;
352    use crate::config::FrontendConfig;
353    use crate::CompleteMode;
354
355    macro_rules! assert_streams {
356        ($streams:expr, $expected:expr) => {
357            let names: Vec<String> = $streams.map(|o| o.name()).sorted().collect();
358            let expect: Vec<&str> = $expected.into_iter().sorted().collect();
359            assert_eq!(names, expect);
360        };
361    }
362
363    fn get_hir() -> Hir<CompleteMode> {
364        let spec = "input i: Int8\n\
365        output a eval @1Hz with true\n\
366        output b spawn when i = 5 eval @1Hz when i.hold(or: 0) = 5 with 42\n\
367        output c (p) spawn @1Hz with i.hold(or: 0) eval with i + p\n\
368        output d spawn when i = 8 eval with i close when a\n\
369        output e spawn when i = 3 eval @1Hz when i.hold(or: 1) % 2 = 0 with true close when e \n\
370        output f (p: Int8) spawn @1Hz with i.aggregate(over:1s, using: sum) eval with i close when i = 8\n\
371        output g eval when i = 5 with i + 5";
372
373        let parser_config = ParserConfig::for_string(spec.to_string());
374        let frontend_config = FrontendConfig::from(&parser_config);
375        let ast = parser_config.parse().unwrap_or_else(|e| panic!("{:?}", e));
376        let hir = crate::fully_analyzed(ast, &frontend_config).expect("Invalid Spec");
377        hir
378    }
379
380    #[test]
381    fn test_all() {
382        let hir = get_hir();
383
384        assert_streams!(
385            hir.select().build(),
386            vec!["a", "b", "c", "d", "e", "f", "g"]
387        );
388
389        assert_streams!(hir.select().periodic_eval().build(), vec!["a", "b", "e"]);
390        assert_streams!(
391            hir.select().event_based_eval().build(),
392            vec!["c", "d", "f", "g"]
393        );
394        assert_streams!(
395            hir.select().eval(PacingSelector::Periodic).build(),
396            vec!["a", "b", "e"]
397        );
398        assert_streams!(
399            hir.select().eval(PacingSelector::EventBased).build(),
400            vec!["c", "d", "f", "g"]
401        );
402
403        assert_streams!(hir.select().filtered().build(), vec!["b", "e", "g"]);
404        assert_streams!(hir.select().unfiltered().build(), vec!["a", "c", "d", "f"]);
405        assert_streams!(
406            hir.select().filter(FilterSelector::Filtered).build(),
407            vec!["b", "e", "g"]
408        );
409        assert_streams!(
410            hir.select().filter(FilterSelector::Unfiltered).build(),
411            vec!["a", "c", "d", "f"]
412        );
413    }
414
415    #[test]
416    fn test_dynamic() {
417        let hir = get_hir();
418
419        assert_streams!(
420            hir.select().dynamic_streams().build(),
421            vec!["b", "c", "d", "e", "f"]
422        );
423        assert_streams!(
424            hir.select().dynamic_streams().periodic_eval().build(),
425            vec!["b", "e"]
426        );
427        assert_streams!(
428            hir.select().dynamic_streams().event_based_eval().build(),
429            vec!["c", "d", "f"]
430        );
431        assert_streams!(
432            hir.select()
433                .dynamic_streams()
434                .eval(PacingSelector::Periodic)
435                .build(),
436            vec!["b", "e"]
437        );
438        assert_streams!(
439            hir.select()
440                .dynamic_streams()
441                .eval(PacingSelector::EventBased)
442                .build(),
443            vec!["c", "d", "f"]
444        );
445
446        assert_streams!(
447            hir.select().dynamic_streams().filtered().build(),
448            vec!["b", "e"]
449        );
450        assert_streams!(
451            hir.select().dynamic_streams().unfiltered().build(),
452            vec!["c", "d", "f"]
453        );
454        assert_streams!(
455            hir.select()
456                .dynamic_streams()
457                .filter(FilterSelector::Filtered)
458                .build(),
459            vec!["b", "e"]
460        );
461        assert_streams!(
462            hir.select()
463                .dynamic_streams()
464                .filter(FilterSelector::Unfiltered)
465                .build(),
466            vec!["c", "d", "f"]
467        );
468
469        assert_streams!(
470            hir.select().dynamic_streams().periodic_spawn().build(),
471            vec!["c", "f"]
472        );
473        assert_streams!(
474            hir.select().dynamic_streams().event_based_spawn().build(),
475            vec!["b", "d", "e"]
476        );
477        assert_streams!(
478            hir.select()
479                .dynamic_streams()
480                .spawn(PacingSelector::Periodic)
481                .build(),
482            vec!["c", "f"]
483        );
484        assert_streams!(
485            hir.select()
486                .dynamic_streams()
487                .spawn(PacingSelector::EventBased)
488                .build(),
489            vec!["b", "d", "e"]
490        );
491
492        assert_streams!(
493            hir.select().dynamic_streams().parameterized().build(),
494            vec!["c", "f"]
495        );
496        assert_streams!(
497            hir.select().dynamic_streams().not_parameterized().build(),
498            vec!["b", "d", "e"]
499        );
500        assert_streams!(
501            hir.select()
502                .dynamic_streams()
503                .parameters(ParameterSelector::Parameterized)
504                .build(),
505            vec!["c", "f"]
506        );
507        assert_streams!(
508            hir.select()
509                .dynamic_streams()
510                .parameters(ParameterSelector::NotParameterized)
511                .build(),
512            vec!["b", "d", "e"]
513        );
514    }
515
516    #[test]
517    fn test_close() {
518        let hir = get_hir();
519
520        assert_streams!(
521            hir.select()
522                .dynamic_streams()
523                .close(CloseSelector::Closed)
524                .build(),
525            vec!["d", "e", "f"]
526        );
527
528        assert_streams!(
529            hir.select()
530                .dynamic_streams()
531                .close(CloseSelector::AnyPeriodic)
532                .build(),
533            vec!["d", "e"]
534        );
535
536        assert_streams!(
537            hir.select()
538                .dynamic_streams()
539                .close(CloseSelector::DynamicPeriodic)
540                .build(),
541            vec!["e"]
542        );
543
544        assert_streams!(
545            hir.select()
546                .dynamic_streams()
547                .close(CloseSelector::StaticPeriodic)
548                .build(),
549            vec!["d"]
550        );
551
552        assert_streams!(
553            hir.select()
554                .dynamic_streams()
555                .close(CloseSelector::EventBased)
556                .build(),
557            vec!["f"]
558        );
559
560        assert_streams!(
561            hir.select()
562                .dynamic_streams()
563                .close(CloseSelector::NotClosed)
564                .build(),
565            vec!["b", "c"]
566        );
567    }
568
569    #[test]
570    fn test_static() {
571        let hir = get_hir();
572
573        assert_streams!(hir.select().static_streams().build(), vec!["a", "g"]);
574        assert_streams!(
575            hir.select().static_streams().periodic_eval().build(),
576            vec!["a"]
577        );
578        assert_streams!(
579            hir.select().static_streams().event_based_eval().build(),
580            vec!["g"]
581        );
582        assert_streams!(
583            hir.select()
584                .static_streams()
585                .eval(PacingSelector::Periodic)
586                .build(),
587            vec!["a"]
588        );
589        assert_streams!(
590            hir.select()
591                .static_streams()
592                .eval(PacingSelector::EventBased)
593                .build(),
594            vec!["g"]
595        );
596
597        assert_streams!(hir.select().static_streams().filtered().build(), vec!["g"]);
598        assert_streams!(
599            hir.select().static_streams().unfiltered().build(),
600            vec!["a"]
601        );
602        assert_streams!(
603            hir.select()
604                .static_streams()
605                .filter(FilterSelector::Filtered)
606                .build(),
607            vec!["g"]
608        );
609        assert_streams!(
610            hir.select()
611                .static_streams()
612                .filter(FilterSelector::Unfiltered)
613                .build(),
614            vec!["a"]
615        );
616    }
617}