rtlola_frontend/
mir.rs

1//! This module covers the Mid-Level Intermediate Representation (MIR) of an RTLola specification.
2//!
//! The [RtLolaMir] is specifically designed to allow convenient navigation and access to data.  Hence, it is perfect for working *with* the specification
//! rather than working *on* it.
5//!
6//! # Most Notable Structs and Enums
7//! * [RtLolaMir] is the root data structure representing the specification.
8//! * [OutputStream] represents a single output stream.  The data structure is enriched with information regarding streams accessing it or accessed by it and much more.  For input streams confer [InputStream].
9//! * [StreamReference] used for referencing streams within the Mir.
10//! * [Spawn] and [Close] contain all information regarding the parametrization, spawning and closing behavior of streams.
11//! * [Eval] contains the information regarding the evaluation condition and the expression of the stream.
//! * [Expression] represents an expression.  It contains its [ExpressionKind] and its type.  The former contains all information specific to a certain kind of expression such as sub-expressions of operators.
13//!
14//! # See Also
15//! * [rtlola_frontend](crate) for an overview regarding different representations.
16//! * [rtlola_frontend::parse](crate::parse) to obtain an [RtLolaMir] for a specification in form of a string or path to a specification file.
//! * [rtlola_hir::hir::RtLolaHir] for a data structure designed for working _on_ it.
18//! * [RtLolaAst](rtlola_parser::RtLolaAst), which is the most basic and down-to-syntax data structure available for RTLola.
19
20mod dependency_graph;
21mod print;
22mod schedule;
23
24use std::collections::HashMap;
25use std::convert::TryInto;
26use std::time::Duration;
27
28use num::traits::Inv;
29pub use print::RtLolaMirPrinter;
30use rtlola_hir::hir::ConcreteValueType;
31pub use rtlola_hir::hir::{
32    InputReference, Layer, MemBoundMode, MemorizationBound, Origin, OutputKind, OutputReference,
33    RtLolaHir, StreamLayers, StreamReference, WindowReference,
34};
35pub use rtlola_parser::ast::Tag;
36#[cfg(feature = "spanned")]
37use rtlola_reporting::Span;
38use rust_decimal::Decimal;
39use serde::{Deserialize, Serialize};
40use uom::si::rational64::{Frequency as UOM_Frequency, Time as UOM_Time};
41use uom::si::time::nanosecond;
42
43pub use self::dependency_graph::DependencyGraph;
44pub use crate::mir::schedule::{Deadline, Schedule, Task};
45
/// Crate-internal shorthand for [RtLolaMir].
pub(crate) type Mir = RtLolaMir;
47
48/// A trait for any kind of stream.
49pub trait Stream {
50    /// Reports the evaluation layer of the spawn condition of the stream.
51    fn spawn_layer(&self) -> Layer;
52    /// Reports the evaluation layer of the stream.
53    fn eval_layer(&self) -> Layer;
54    /// Reports the name of the stream.
55    fn name(&self) -> &str;
56    /// Returns the type of the stream.
57    fn ty(&self) -> &Type;
58    /// Indicates whether or not the stream is an input stream.
59    fn is_input(&self) -> bool;
60    /// Indicates whether or not the stream has parameters.
61    fn is_parameterized(&self) -> bool;
62    /// Indicates whether or not the stream spawned / dynamically created.
63    fn is_spawned(&self) -> bool;
64    /// Indicates whether or not the stream is closed.
65    fn is_closed(&self) -> bool;
66    /// Indicated whether or not the stream is filtered.
67    fn is_eval_filtered(&self) -> bool;
68    /// Indicates how many values of the stream's [Type] need to be memorized.
69    fn values_to_memorize(&self) -> MemorizationBound;
70    /// Produces a stream references referring to the stream.
71    fn as_stream_ref(&self) -> StreamReference;
72    /// Returns the collection of streams that access the stream non-transitively.
73    fn accessed_by(&self) -> &Accesses;
74    /// Returns the collection of sliding windows that access the stream non-transitively.
75    /// This includes both sliding and discrete windows.
76    fn aggregated_by(&self) -> &[(StreamReference, Origin, WindowReference)];
77    /// Returns the collection of sliding windows that are accessed by the stream non-transitively.
78    /// This includes both sliding and discrete windows.
79    fn aggregates(&self) -> &[(StreamReference, Origin, WindowReference)];
80    /// Returns the tags annotated to this stream.
81    fn tags(&self) -> &HashMap<String, Option<String>>;
82    #[cfg(feature = "spanned")]
83    /// Returns the spans of all tags annotated to this stream.
84    fn tags_span(&self) -> &HashMap<String, Span>;
85}
86
87/// This struct constitutes the Mid-Level Intermediate Representation (MIR) of an RTLola specification.
88///
89/// The [RtLolaMir] is specifically designed to allow convenient navigation and access to data.  Hence, it is perfect for working _with_ the specification
90/// rather than work _on_ it.  
91///
92/// # Most Notable Structs and Enums
93/// * [Stream] is a trait offering several convenient access methods for everything constituting a stream.
94/// * [OutputStream] represents a single output stream.  The data structure is enriched with information regarding streams accessing it or accessed by it and much more.  For input streams confer [InputStream].
95/// * [StreamReference] used for referencing streams within the Mir.
96/// * [Spawn] and [Close] contain all information regarding the parametrization, spawning and closing behavior of streams.
97/// * [Eval] contains the information regarding the evaluation condition and the expression of the stream. The [Expression] represents an computational evaluation.  It contains its [ExpressionKind] and its type.  The latter contains all information specific to a certain kind of expression such as sub-expressions of operators.
98///
99/// # See Also
100/// * [rtlola_frontend](crate) for an overview regarding different representations.
101/// * [rtlola_frontend::parse](crate::parse) to obtain an [RtLolaMir] for a specification in form of a string or path to a specification file.
102/// * [rtlola_hir::hir::RtLolaHir] for a data structs designed for working _on_it.
103/// * [RtLolaAst](rtlola_parser::RtLolaAst), which is the most basic and down-to-syntax data structure available for RTLola.
104#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
105pub struct RtLolaMir {
106    /// Contains all input streams.
107    pub inputs: Vec<InputStream>,
108    /// Contains all output streams including all triggers.  They only contain the information relevant for every single kind of output stream.  Refer to [RtLolaMir::time_driven], [RtLolaMir::event_driven],
109    /// and [RtLolaMir::triggers] for more information.
110    pub outputs: Vec<OutputStream>,
111    /// References and pacing information of all time-driven streams.
112    pub time_driven: Vec<TimeDrivenStream>,
113    /// References and pacing information of all event-driven streams.
114    pub event_driven: Vec<EventDrivenStream>,
115    /// A collection of all discrete windows.
116    pub discrete_windows: Vec<DiscreteWindow>,
117    /// A collection of all sliding windows.
118    pub sliding_windows: Vec<SlidingWindow>,
119    /// A collection of all instance aggregations.
120    pub instance_aggregations: Vec<InstanceAggregation>,
121    /// The references of all outputs that represent triggers
122    pub triggers: Vec<Trigger>,
123    /// The global tags of the specification
124    pub global_tags: Tags,
125    #[cfg(feature = "spanned")]
126    /// The span's of the global tags
127    pub global_tags_span: HashMap<String, Span>,
128}
129
130/// Represents an RTLola value type.  This does not including pacing information, for this refer to [TimeDrivenStream] and [EventDrivenStream].
131#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
132pub enum Type {
133    /// A boolean type
134    Bool,
135    /// An integer type of fixed bit-width
136    Int(IntTy),
137    /// An unsigned integer type of fixed bit-width
138    UInt(UIntTy),
139    /// A floating point type of fixed bit-width
140    Float(FloatTy),
141    /// A signed fixed point type of fixed bit-width
142    Fixed(FixedTy),
143    /// An unsigned fixed point type of fixed bit-width
144    UFixed(FixedTy),
145    /// A unicode string
146    String,
147    /// A sequence of 8-bit bytes
148    Bytes,
149    /// An n-ary tuples where n is the length of the contained vector
150    Tuple(Vec<Type>),
151    /// An optional value type, e.g., resulting from accessing a past value of a stream
152    Option(Box<Type>),
153    /// A type describing a function
154    Function {
155        /// The types of the arguments to the function, monomorphized
156        args: Vec<Type>,
157        /// The monomorphized return type of the function
158        ret: Box<Type>,
159    },
160}
161
162/// Represents an RTLola pacing type.
163#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
164pub enum PacingType {
165    /// Represents a periodic pacing with a fixed global frequency
166    GlobalPeriodic(UOM_Frequency),
167    /// Represents a periodic pacing with a fixed local frequency
168    LocalPeriodic(UOM_Frequency),
169    /// Represents an event based pacing defined by an [ActivationCondition]
170    Event(ActivationCondition),
171    /// The pacing is constant, meaning that the value is always present.
172    Constant,
173}
174
175#[allow(missing_docs)]
176#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
177pub enum IntTy {
178    /// Represents an 8-bit integer.
179    Int8,
180    /// Represents a 16-bit integer.
181    Int16,
182    /// Represents a 32-bit integer.
183    Int32,
184    /// Represents a 64-bit integer.
185    Int64,
186    /// Represents a 128-bit integer.
187    Int128,
188    /// Represents a 256-bit integer.
189    Int256,
190}
191
192#[allow(missing_docs)]
193#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
194pub enum UIntTy {
195    /// Represents an 8-bit unsigned integer.
196    UInt8,
197    /// Represents a 16-bit unsigned integer.
198    UInt16,
199    /// Represents a 32-bit unsigned integer.
200    UInt32,
201    /// Represents a 64-bit unsigned integer.
202    UInt64,
203    /// Represents a 128-bit unsigned integer.
204    UInt128,
205    /// Represents a 256-bit unsigned integer.
206    UInt256,
207}
208
209#[allow(missing_docs)]
210#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
211pub enum FloatTy {
212    /// Represents a 32-bit floating point number.
213    Float32,
214    /// Represents a 64-bit floating point number.
215    Float64,
216}
217
218#[allow(missing_docs)]
219#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
220pub enum FixedTy {
221    /// Represents a 64-bit fixed point number with 32 integer bits and 32 fractional bits
222    Fixed64_32,
223    /// Represents a 32-bit fixed point number with 16 integer bits and 16 fractional bits
224    Fixed32_16,
225    /// Represents a 16-bit fixed point number with 16 integer bits and 8 fractional bits
226    Fixed16_8,
227}
228
229impl From<ConcreteValueType> for Type {
230    fn from(ty: ConcreteValueType) -> Type {
231        match ty {
232            ConcreteValueType::Integer8 => Type::Int(IntTy::Int8),
233            ConcreteValueType::Integer16 => Type::Int(IntTy::Int16),
234            ConcreteValueType::Integer32 => Type::Int(IntTy::Int32),
235            ConcreteValueType::Integer64 => Type::Int(IntTy::Int64),
236            ConcreteValueType::UInteger8 => Type::UInt(UIntTy::UInt8),
237            ConcreteValueType::UInteger16 => Type::UInt(UIntTy::UInt16),
238            ConcreteValueType::UInteger32 => Type::UInt(UIntTy::UInt32),
239            ConcreteValueType::UInteger64 => Type::UInt(UIntTy::UInt64),
240            ConcreteValueType::Float32 => Type::Float(FloatTy::Float32),
241            ConcreteValueType::Float64 => Type::Float(FloatTy::Float64),
242            ConcreteValueType::Tuple(t) => Type::Tuple(t.into_iter().map(Type::from).collect()),
243            ConcreteValueType::TString => Type::String,
244            ConcreteValueType::Byte => Type::Bytes,
245            ConcreteValueType::Option(o) => Type::Option(Box::new(Type::from(*o))),
246            _ => unreachable!("cannot lower `ValueTy` {}", ty),
247        }
248    }
249}
250
/// A list that pairs a stream reference with the accesses involving that stream,
/// each access given by its [Origin] and its [StreamAccessKind].
type Accesses = Vec<(StreamReference, Vec<(Origin, StreamAccessKind)>)>;
252
253/// Contains all information inherent to an input stream.
254#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
255pub struct InputStream {
256    /// The name of the stream
257    pub name: String,
258    /// The value type of the stream.  Note that its pacing is always pre-determined.
259    pub ty: Type,
260    /// The collection of streams that access the current stream non-transitively
261    pub accessed_by: Accesses,
262    /// The collection of sliding windows that access this stream non-transitively.  This includes both sliding and discrete windows.
263    pub aggregated_by: Vec<(StreamReference, Origin, WindowReference)>,
264    /// The collection of windows that is accessed by this stream.  This includes both sliding and discrete windows.
265    pub aggregates: Vec<(StreamReference, Origin, WindowReference)>,
266    /// Provides the evaluation of layer of this stream.
267    pub layer: StreamLayers,
268    /// Provides the number of values of this stream's type that need to be memorized.  Refer to [Type::size] to get a type's byte-size.
269    pub memory_bound: MemorizationBound,
270    /// The reference referring to this stream
271    pub reference: StreamReference,
272    /// The tags annotated to this stream.
273    pub tags: Tags,
274    #[cfg(feature = "spanned")]
275    /// The span of the tags annotated to the input stream
276    pub tags_span: HashMap<String, Span>,
277    #[cfg(feature = "spanned")]
278    /// The span of the input stream definition
279    pub span: Span,
280}
281
282/// Contains all information relevant to every kind of output stream.
283///
284/// Refer to [TimeDrivenStream], [EventDrivenStream], and [Trigger], as well as their respective fields in the Mir for additional information.
285#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
286pub struct OutputStream {
287    /// The name of the stream.
288    pub name: String,
289    /// The kind of the output (regular output or trigger)
290    pub kind: OutputKind,
291    /// The value type of the stream.
292    pub ty: Type,
293    /// Information on the spawn behavior of the stream
294    pub spawn: Spawn,
295    /// Information on the evaluation behavior of the stream
296    pub eval: Eval,
297    /// The condition under which the stream is supposed to be closed
298    pub close: Close,
299    /// The collection of streams this stream accesses non-transitively.  Includes this stream's spawn, evaluation condition, and close expressions.
300    pub accesses: Accesses,
301    /// The collection of streams that access the current stream non-transitively
302    pub accessed_by: Accesses,
303    /// The collection of windows that access this stream non-transitively.  This includes both sliding and discrete windows.
304    pub aggregated_by: Vec<(StreamReference, Origin, WindowReference)>,
305    /// The collection of windows that is accessed by this stream.  This includes both sliding and discrete windows.
306    pub aggregates: Vec<(StreamReference, Origin, WindowReference)>,
307    /// Provides the number of values of this stream's type that need to be memorized.  Refer to [Type::size] to get a type's byte-size.
308    pub memory_bound: MemorizationBound,
309    /// Provides the evaluation of layer of this stream.
310    pub layer: StreamLayers,
311    /// The reference referring to this stream
312    pub reference: StreamReference,
313    /// The parameters of a parameterized output stream; The vector is empty in non-parametrized streams
314    pub params: Vec<Parameter>,
315    /// The tags annotated to this stream.
316    pub tags: Tags,
317    #[cfg(feature = "spanned")]
318    /// The span of the tags annotated to the output stream
319    pub tags_span: HashMap<String, Span>,
320    #[cfg(feature = "spanned")]
321    /// The span of the output stream definition
322    pub span: Span,
323}
324
325/// A trigger (represented by the output stream `output_reference`)
326#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Copy)]
327pub struct Trigger {
328    /// The reference of the output stream representing this trigger
329    pub output_reference: StreamReference,
330    /// The reference of this trigger
331    pub trigger_reference: TriggerReference,
332}
333
334impl OutputStream {
335    fn is_trigger(&self) -> bool {
336        matches!(self.kind, OutputKind::Trigger(_))
337    }
338}
339
/// The tags annotated to a stream or to the specification.
// NOTE(review): presumably maps a tag's name to its optional value — confirm against the parser.
type Tags = HashMap<String, Option<String>>;

/// A type alias for references to triggers.
pub type TriggerReference = usize;
344
345/// Information on the spawn behavior of a stream
346#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
347pub struct Spawn {
348    /// The expression needs to be evaluated whenever the stream with this Spawn template is supposed to be spawned.  The result of the evaluation constitutes the respective parameters.
349    pub expression: Option<Expression>,
350    /// The timing of when a new instance _could_ be created assuming the spawn condition evaluates to true.
351    pub pacing: PacingType,
352    /// The spawn condition.  If the condition evaluates to false, the stream will not be spawned.
353    pub condition: Option<Expression>,
354    #[cfg(feature = "spanned")]
355    /// The span of the spawn clause
356    pub span: Span,
357}
358
359impl Default for Spawn {
360    fn default() -> Self {
361        Spawn {
362            expression: None,
363            pacing: PacingType::Constant,
364            condition: None,
365            #[cfg(feature = "spanned")]
366            span: Span::Unknown,
367        }
368    }
369}
370
371/// Information on the close behavior of a stream
372#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
373pub struct Close {
374    /// The `condition` expression needs to be evaluated whenever the stream with this Close template is supposed to be closed.  The result of the evaluation constitutes whether the stream is closed.
375    pub condition: Option<Expression>,
376    /// The timing of the close condition.
377    pub pacing: PacingType,
378    /// Indicates whether the close condition contains a reference to the stream it belongs to.
379    pub has_self_reference: bool,
380    #[cfg(feature = "spanned")]
381    /// The span of the close clause
382    pub span: Span,
383}
384
385impl Default for Close {
386    fn default() -> Self {
387        Close {
388            condition: None,
389            pacing: PacingType::Constant,
390            has_self_reference: false,
391            #[cfg(feature = "spanned")]
392            span: Span::Unknown,
393        }
394    }
395}
396
397/// Information on the evaluation behavior of a stream
398#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
399pub struct Eval {
400    /// The eval clauses of the stream.
401    pub clauses: Vec<EvalClause>,
402    /// The eval pacing of the stream, combining the condition and expr pacings of all eval clauses
403    pub eval_pacing: PacingType,
404}
405
406/// Information on an eval clause of a stream
407#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
408pub struct EvalClause {
409    /// The expression of this stream needs to be evaluated whenever this condition evaluates to `True`.
410    pub condition: Option<Expression>,
411    /// The evaluation expression of this stream, defining the returned and accessed value.
412    pub expression: Expression,
413    /// The eval pacing of the stream, combining the condition and expr pacings of the clause.
414    pub pacing: PacingType,
415    #[cfg(feature = "spanned")]
416    /// The span of the eval clause
417    pub span: Span,
418}
419
420/// Information of a parameter of a parametrized output stream
421#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
422pub struct Parameter {
423    /// The name of the parameter.
424    pub name: String,
425    /// The type of the parameter.
426    pub ty: Type,
427    /// The index of the parameter.
428    pub idx: usize,
429    #[cfg(feature = "spanned")]
430    /// The span of the parameter
431    pub span: Span,
432}
433
434/// Wrapper for output streams providing additional information specific to time-driven streams.
435#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
436pub struct TimeDrivenStream {
437    /// A reference to the stream that is specified.
438    pub reference: StreamReference,
439    /// The evaluation frequency of the stream.
440    pub frequency: UOM_Frequency,
441    /// Whether the given frequency is relative to a dynamic spawn
442    pub locality: PacingLocality,
443}
444
445#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
446/// Describes if the pacing is interpreted relatively to a dynamic spawn
447pub enum PacingLocality {
448    /// The pacing is relative to a global clock
449    Global,
450    /// The pacing is relative to the spawn
451    Local,
452}
453
454impl TimeDrivenStream {
455    /// Returns the evaluation period, i.e., the multiplicative inverse of [TimeDrivenStream::frequency].
456    pub fn period(&self) -> UOM_Time {
457        UOM_Time::new::<uom::si::time::second>(
458            self.frequency.get::<uom::si::frequency::hertz>().inv(),
459        )
460    }
461
462    /// Returns the evaluation frequency.
463    pub fn frequency(&self) -> UOM_Frequency {
464        self.frequency
465    }
466
467    /// Returns the evaluation period, i.e., the multiplicative inverse of [TimeDrivenStream::frequency], as [Duration].
468    pub fn period_in_duration(&self) -> Duration {
469        Duration::from_nanos(
470            self.period()
471                .get::<nanosecond>()
472                .to_integer()
473                .try_into()
474                .expect("Period [ns] too large for u64!"),
475        )
476    }
477}
478
479/// Wrapper for output streams providing additional information specific to event-based streams.
480#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
481pub struct EventDrivenStream {
482    /// A reference to the stream that is specified
483    pub reference: StreamReference,
484    /// The activation condition of an event-based stream
485    pub ac: ActivationCondition,
486}
487
488/// Representation of the activation condition of event-based entities such as streams or spawn conditions
489#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
490pub enum ActivationCondition {
491    /// Activate when all entries of the [Vec] are true.
492    Conjunction(Vec<Self>),
493    /// Activate when at least one entry of the [Vec] is true.
494    Disjunction(Vec<Self>),
495    /// Activate when the referenced stream is evaluated.
496    Stream(StreamReference),
497    /// Activate
498    True,
499}
500
501/// Represents an expression
502#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
503pub struct Expression {
504    /// The kind and all kind-specific information of the expression
505    pub kind: ExpressionKind,
506    /// The type of the expression
507    pub ty: Type,
508    #[cfg(feature = "spanned")]
509    /// The span of the expression
510    pub span: Span,
511}
512
513/// This enum contains all possible kinds of expressions and their relevant information.
514#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
515pub enum ExpressionKind {
516    /// Load a constant value
517    LoadConstant(Constant),
518    /// Apply an arithmetic or logic operation.  The function is monomorphized.
519    ///
520    /// *Note:* Arguments never need to be coerced.
521    /// Unary: 1st argument -> operand
522    /// Binary: 1st argument -> lhs, 2nd argument -> rhs
523    /// n-ary: kth argument -> kth operand
524    ArithLog(ArithLogOp, Vec<Expression>),
525    /// Access another stream
526    StreamAccess {
527        /// The target stream to be accessed
528        target: StreamReference,
529        /// The parameters of the specific stream instance that is accessed.  
530        ///
531        /// If the stream behind `target` is not parametrized, this collection is empty.
532        parameters: Vec<Expression>,
533        /// The kind of access
534        access_kind: StreamAccessKind,
535    },
536    /// Access to the parameter of a stream represented by a stream reference,
537    /// referencing the target stream and the index of the parameter that should be accessed.
538    ParameterAccess(StreamReference, usize),
539    /// Access to the lambda parameter in the filtered instance aggregation
540    LambdaParameterAccess {
541        /// Reference to the instance aggregation using the lambda function
542        wref: WindowReference,
543        /// Reference to the parameter
544        pref: usize,
545    },
546    /// A conditional (if-then-else) expression
547    Ite {
548        /// The condition under which either `consequence` or `alternative` is selected.
549        condition: Box<Expression>,
550        /// The consequence should be evaluated and returned if the condition evaluates to true.
551        consequence: Box<Expression>,
552        /// The alternative should be evaluated and returned if the condition evaluates to false.
553        alternative: Box<Expression>,
554    },
555    /// A tuple expression
556    Tuple(Vec<Expression>),
557    /// Represents a tuple projections, i.e., it accesses a specific tuple element.  
558    // The expression produces a tuple and the `usize` is the index of the accessed element.  This value is constant.
559    TupleAccess(Box<Expression>, usize),
560    /// Represents a function call.  The function is monomorphized.
561    ///
562    /// *Note:* Arguments never need to be coerced.
563    /// Unary: 1st argument -> operand
564    /// Binary: 1st argument -> lhs, 2nd argument -> rhs
565    /// n-ary: kth argument -> kth operand
566    Function(String, Vec<Expression>),
567    /// Converting a value to a different type
568    ///
569    /// The result type is indicated in the expression with the `Convert` kind.  
570    Convert {
571        /// The expression that produces a value.  The type of the expression indicates the source of the conversion.
572        expr: Box<Expression>,
573    },
574    /// Transforms an optional value into a definitive one
575    Default {
576        /// The expression that results in an optional value.
577        expr: Box<Expression>,
578        /// An infallible expression providing the default value if `expr` fails to produce a value.
579        default: Box<Expression>,
580    },
581}
582
583/// Represents a constant value of a certain kind.
584///
585/// *Note* the type of the constant might be more general than the type of the constant.  For example, `Constant::UInt(3u64)` represents an RTLola UInt8 constant.
586#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
587pub enum Constant {
588    #[allow(missing_docs)]
589    Str(String),
590    #[allow(missing_docs)]
591    Bool(bool),
592    #[allow(missing_docs)]
593    UInt(u64),
594    #[allow(missing_docs)]
595    Int(i64),
596    #[allow(missing_docs)]
597    Float(f64),
598    #[allow(missing_docs)]
599    Decimal(Decimal),
600}
601
602/// Arithmetical and logical operations
603#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
604pub enum ArithLogOp {
605    /// Logic negation (!)
606    Not,
607    /// Arithmetic negation (-)
608    Neg,
609    /// Arithmetic addition (+)
610    Add,
611    /// Arithmetic subtraction (-)
612    Sub,
613    /// Arithmetic multiplication (*)
614    Mul,
615    /// Arithmetic division (/)
616    Div,
617    /// Arithmetic modulation (%)
618    Rem,
619    /// Arithmetic exponentiation (**)
620    Pow,
621    /// Logic conjunction/multiplication (&&)
622    And,
623    /// Logic disjunction/addition (||)
624    Or,
625    /// Bit-wise xor (^)
626    BitXor,
627    /// Bit-wise conjunction/multiplication (&)
628    BitAnd,
629    /// Bit-wise disjunction/addition (|)
630    BitOr,
631    /// Bit-wise negation / One's complement (~)
632    BitNot,
633    /// Bit-wise left-shift (<<)
634    Shl,
635    /// Bit-wise right-shift (>>)
636    Shr,
637    /// Semantic Equality (==)
638    Eq,
639    /// Less-than comparison (<)
640    Lt,
641    /// Less-than-or-equal comparison (<=)
642    Le,
643    /// Semantic Inequality (!=)
644    Ne,
645    /// Greater-than-or-equal comparison (>=)
646    Ge,
647    /// Greater-than comparison (>)
648    Gt,
649}
650
651/// Represents an instance of a discrete window
652#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
653pub struct DiscreteWindow {
654    /// The stream whose values will be aggregated
655    pub target: StreamReference,
656    /// The stream in which expression this window occurs
657    pub caller: StreamReference,
658    /// The duration over which the window aggregates
659    pub duration: usize,
660    /// Indicates whether or not the first aggregated value will be produced immediately or whether the window waits until `duration` number of values have been observed.
661    pub wait: bool,
662    /// The aggregation operation
663    pub op: WindowOperation,
664    /// A reference to this discrete window
665    pub reference: WindowReference,
666    /// The type of value the window produces
667    pub ty: Type,
668    /// The origin of the discrete window expression
669    pub origin: Origin,
670    /// The pacing of the discrete window expression
671    pub pacing: PacingType,
672}
673
674/// Represents an instance of a sliding window
675#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
676pub struct SlidingWindow {
677    /// The stream whose values will be aggregated
678    pub target: StreamReference,
679    /// The stream in which expression this window occurs
680    pub caller: StreamReference,
681    /// The duration over which the window aggregates
682    pub duration: Duration,
683    /// The number of buckets that are needed for the window
684    pub num_buckets: MemorizationBound,
685    /// The time per bucket of the window
686    pub bucket_size: Duration,
687    /// Indicates whether or not the first aggregated value will be produced immediately or whether the window waits until `duration` has passed at least once
688    pub wait: bool,
689    /// The aggregation operation
690    pub op: WindowOperation,
691    /// A reference to this sliding window
692    pub reference: WindowReference,
693    /// The type of value the window produces
694    pub ty: Type,
695    /// The origin of the sliding window expression
696    pub origin: Origin,
697    /// The pacing of the sliding window expression
698    pub pacing: PacingType,
699}
700
701/// Represents an instance of an instance aggregation
702#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
703pub struct InstanceAggregation {
704    /// The stream whose values will be aggregated
705    pub target: StreamReference,
706    /// The stream calling and evaluating this window
707    pub caller: StreamReference,
708    /// A filter over the instances
709    pub selection: InstanceSelection,
710    /// The operation to be performed over the instances
711    pub aggr: InstanceOperation,
712    /// The reference of this window.
713    pub reference: WindowReference,
714    /// The type of value the window produces
715    pub ty: Type,
716    /// The origin of the instance window expression
717    pub origin: Origin,
718    /// The pacing of the instance aggregation expression
719    pub pacing: PacingType,
720}
721
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
/// Enum to indicate which instances are part of an instance aggregation.
///
/// The two `Filtered*` variants additionally carry a lambda-like filter consisting of `parameters` and a condition `cond`.
pub enum InstanceSelection {
    /// Only instances that are updated in this evaluation cycle are part of the aggregation
    Fresh,
    /// All instances are part of the aggregation
    All,
    /// Only instances that are updated in this evaluation cycle and satisfy the condition are part of the aggregation
    FilteredFresh {
        /// The parameters of the lambda expression
        parameters: Vec<Parameter>,
        /// The condition that needs to be satisfied
        cond: Box<Expression>,
    },
    /// All instances that satisfy the condition are part of the aggregation
    FilteredAll {
        /// The parameters of the lambda expression
        parameters: Vec<Parameter>,
        /// The condition that needs to be satisfied
        cond: Box<Expression>,
    },
}
744
745impl InstanceSelection {
746    /// Accesses the condition to a filtered instance aggregation. Returns None if the instance aggregation is not filtered
747    pub fn condition(&self) -> Option<&Expression> {
748        match self {
749            InstanceSelection::Fresh | InstanceSelection::All => None,
750            InstanceSelection::FilteredFresh {
751                parameters: _,
752                cond,
753            }
754            | InstanceSelection::FilteredAll {
755                parameters: _,
756                cond,
757            } => Some(cond),
758        }
759    }
760
761    /// Accesses the parameters to a filtered instance aggregation. Returns None if the instance aggregation is not filtered
762    pub fn parameters(&self) -> Option<&Vec<Parameter>> {
763        match self {
764            InstanceSelection::Fresh | InstanceSelection::All => None,
765            InstanceSelection::FilteredFresh {
766                parameters,
767                cond: _,
768            }
769            | InstanceSelection::FilteredAll {
770                parameters,
771                cond: _,
772            } => Some(parameters),
773        }
774    }
775}
776
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, Serialize, Deserialize)]
/// A subset of the window operations that are suitable to be performed over a set of instances.
///
/// Every variant has a counterpart of the same name in [WindowOperation]; the conversion is provided via `From`.
pub enum InstanceOperation {
    /// Aggregation function to count the number of instances of the accessed stream
    Count,
    /// Aggregation function to return the minimum
    Min,
    /// Aggregation function to return the maximum
    Max,
    /// Aggregation function to return the addition
    Sum,
    /// Aggregation function to return the product
    Product,
    /// Aggregation function to return the average
    Average,
    /// Aggregation function to return the conjunction, i.e., the instances aggregation returns true iff ALL current values of the instances of the accessed stream are assigned to true
    Conjunction,
    /// Aggregation function to return the disjunction, i.e., the instances aggregation returns true iff ANY current values of the instances of the accessed stream are assigned to true
    Disjunction,
    /// Aggregation function to return the variance of all values, assumes equal probability.
    Variance,
    /// Aggregation function to return the covariance of all values in a tuple stream, assumes equal probability.
    Covariance,
    /// Aggregation function to return the standard deviation of all values, assumes equal probability.
    StandardDeviation,
    /// Aggregation function to return the Nth-Percentile
    NthPercentile(u8),
}
805
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
/// The different aggregation functions a window can apply
pub enum WindowOperation {
    /// Aggregation function to count the number of updated values on the accessed stream
    Count,
    /// Aggregation function to return the minimum
    Min,
    /// Aggregation function to return the maximum
    Max,
    /// Aggregation function to return the addition
    Sum,
    /// Aggregation function to return the product
    Product,
    /// Aggregation function to return the average
    Average,
    /// Aggregation function to return the integral
    Integral,
    /// Aggregation function to return the conjunction, i.e., the sliding window returns true iff ALL values on the accessed stream inside a window are assigned to true
    Conjunction,
    /// Aggregation function to return the disjunction, i.e., the sliding window returns true iff AT LEAST ONE value on the accessed stream inside a window is assigned to true
    Disjunction,
    /// Aggregation function to return the last value, a time bounded hold
    Last,
    /// Aggregation function to return the variance of all values, assumes equal probability.
    Variance,
    /// Aggregation function to return the covariance of all values in a tuple stream, assumes equal probability.
    Covariance,
    /// Aggregation function to return the standard deviation of all values, assumes equal probability.
    StandardDeviation,
    /// Aggregation function to return the Nth-Percentile
    NthPercentile(u8),
}
838
839impl From<InstanceOperation> for WindowOperation {
840    fn from(value: InstanceOperation) -> Self {
841        match value {
842            InstanceOperation::Count => WindowOperation::Count,
843            InstanceOperation::Min => WindowOperation::Min,
844            InstanceOperation::Max => WindowOperation::Max,
845            InstanceOperation::Sum => WindowOperation::Sum,
846            InstanceOperation::Product => WindowOperation::Product,
847            InstanceOperation::Average => WindowOperation::Average,
848            InstanceOperation::Conjunction => WindowOperation::Conjunction,
849            InstanceOperation::Disjunction => WindowOperation::Disjunction,
850            InstanceOperation::Variance => WindowOperation::Variance,
851            InstanceOperation::Covariance => WindowOperation::Covariance,
852            InstanceOperation::StandardDeviation => WindowOperation::StandardDeviation,
853            InstanceOperation::NthPercentile(x) => WindowOperation::NthPercentile(x),
854        }
855    }
856}
857
/// A trait for any kind of window.
///
/// It provides uniform read access to the properties shared by [SlidingWindow], [DiscreteWindow], and [InstanceAggregation].
pub trait Window {
    /// Returns a reference to the stream that will be aggregated by that window.
    fn target(&self) -> StreamReference;

    /// Returns a reference to the stream in whose expression this window occurs.
    fn caller(&self) -> StreamReference;

    /// Returns the aggregation operation the window uses.
    fn op(&self) -> WindowOperation;

    /// Returns the type of value the window produces.
    fn ty(&self) -> &Type;

    /// Returns the memorization bound of the window.
    fn memory_bound(&self) -> MemorizationBound;
}
875
876////////// Implementations //////////
877impl Stream for OutputStream {
878    fn spawn_layer(&self) -> Layer {
879        self.layer.spawn_layer()
880    }
881
882    fn eval_layer(&self) -> Layer {
883        self.layer.evaluation_layer()
884    }
885
886    fn name(&self) -> &str {
887        &self.name
888    }
889
890    fn ty(&self) -> &Type {
891        &self.ty
892    }
893
894    fn is_input(&self) -> bool {
895        false
896    }
897
898    fn is_parameterized(&self) -> bool {
899        self.spawn.expression.is_some()
900    }
901
902    fn is_spawned(&self) -> bool {
903        self.spawn.expression.is_some()
904            || self.spawn.condition.is_some()
905            || self.spawn.pacing != PacingType::Constant
906    }
907
908    fn is_closed(&self) -> bool {
909        self.close.condition.is_some()
910    }
911
912    fn is_eval_filtered(&self) -> bool {
913        self.eval
914            .clauses
915            .iter()
916            .any(|eval| eval.condition.is_some())
917    }
918
919    fn values_to_memorize(&self) -> MemorizationBound {
920        self.memory_bound
921    }
922
923    fn as_stream_ref(&self) -> StreamReference {
924        self.reference
925    }
926
927    fn accessed_by(&self) -> &Accesses {
928        &self.accessed_by
929    }
930
931    fn aggregated_by(&self) -> &[(StreamReference, Origin, WindowReference)] {
932        &self.aggregated_by
933    }
934
935    fn aggregates(&self) -> &[(StreamReference, Origin, WindowReference)] {
936        &self.aggregates
937    }
938
939    fn tags(&self) -> &HashMap<String, Option<String>> {
940        &self.tags
941    }
942
943    #[cfg(feature = "spanned")]
944    fn tags_span(&self) -> &HashMap<String, Span> {
945        &self.tags_span
946    }
947}
948
impl Stream for InputStream {
    /// The spawn layer recorded in the stream's layer information.
    fn spawn_layer(&self) -> Layer {
        self.layer.spawn_layer()
    }

    /// The evaluation layer recorded in the stream's layer information.
    fn eval_layer(&self) -> Layer {
        self.layer.evaluation_layer()
    }

    /// The name of the input stream.
    fn name(&self) -> &str {
        &self.name
    }

    /// The value type of the input stream.
    fn ty(&self) -> &Type {
        &self.ty
    }

    /// Always `true` for input streams.
    fn is_input(&self) -> bool {
        true
    }

    /// Always `false`: input streams are never parameterized.
    fn is_parameterized(&self) -> bool {
        false
    }

    /// Always `false`: input streams are never spawned.
    fn is_spawned(&self) -> bool {
        false
    }

    /// Always `false`: input streams are never closed.
    fn is_closed(&self) -> bool {
        false
    }

    /// Always `false`: input streams have no filtered evaluation.
    fn is_eval_filtered(&self) -> bool {
        false
    }

    /// The memorization bound of the stream.
    fn values_to_memorize(&self) -> MemorizationBound {
        self.memory_bound
    }

    /// The reference under which this stream is stored.
    fn as_stream_ref(&self) -> StreamReference {
        self.reference
    }

    /// The accesses recorded in the stream's `accessed_by` field.
    fn accessed_by(&self) -> &Accesses {
        &self.accessed_by
    }

    /// The entries recorded in the stream's `aggregated_by` field.
    fn aggregated_by(&self) -> &[(StreamReference, Origin, WindowReference)] {
        &self.aggregated_by
    }

    /// The entries recorded in the stream's `aggregates` field.
    fn aggregates(&self) -> &[(StreamReference, Origin, WindowReference)] {
        &self.aggregates
    }

    /// The tags annotated to the stream, each with an optional value.
    fn tags(&self) -> &HashMap<String, Option<String>> {
        &self.tags
    }

    /// The spans stored for the tags of the stream.
    #[cfg(feature = "spanned")]
    fn tags_span(&self) -> &HashMap<String, Span> {
        &self.tags_span
    }
}
1015
impl Window for SlidingWindow {
    /// The stream whose values the sliding window aggregates.
    fn target(&self) -> StreamReference {
        self.target
    }

    /// The stream calling the sliding window.
    fn caller(&self) -> StreamReference {
        self.caller
    }

    /// The aggregation operation of the sliding window.
    fn op(&self) -> WindowOperation {
        self.op
    }

    /// The type of value the sliding window produces.
    fn ty(&self) -> &Type {
        &self.ty
    }

    /// The memorization bound of a sliding window is its number of buckets.
    fn memory_bound(&self) -> MemorizationBound {
        self.num_buckets
    }
}
1037
impl Window for DiscreteWindow {
    /// The stream whose values the discrete window aggregates.
    fn target(&self) -> StreamReference {
        self.target
    }

    /// The stream calling the discrete window.
    fn caller(&self) -> StreamReference {
        self.caller
    }

    /// The aggregation operation of the discrete window.
    fn op(&self) -> WindowOperation {
        self.op
    }

    /// The type of value the discrete window produces.
    fn ty(&self) -> &Type {
        &self.ty
    }

    /// The memorization bound of a discrete window is bounded by its duration.
    fn memory_bound(&self) -> MemorizationBound {
        // NOTE(review): `as u32` truncates silently if `duration` does not fit into a `u32`.
        // The field's type is declared outside this section -- confirm whether a checked
        // conversion is warranted here.
        MemorizationBound::Bounded(self.duration as u32)
    }
}
1059
1060impl Window for InstanceAggregation {
1061    fn target(&self) -> StreamReference {
1062        self.target
1063    }
1064
1065    fn caller(&self) -> StreamReference {
1066        self.caller
1067    }
1068
1069    fn op(&self) -> WindowOperation {
1070        self.aggr.into()
1071    }
1072
1073    fn ty(&self) -> &Type {
1074        &self.ty
1075    }
1076
1077    fn memory_bound(&self) -> MemorizationBound {
1078        MemorizationBound::Bounded(1)
1079    }
1080}
1081
1082impl RtLolaMir {
1083    /// Returns a collection containing a reference to each input stream in the specification.
1084    pub fn input_refs(&self) -> impl Iterator<Item = InputReference> {
1085        0..self.inputs.len()
1086    }
1087
1088    /// Returns a collection containing a reference to each output stream in the specification.
1089    pub fn output_refs(&self) -> impl Iterator<Item = OutputReference> {
1090        0..self.outputs.len()
1091    }
1092
1093    /// Provides mutable access to an input stream.
1094    ///
1095    /// # Panic
1096    /// Panics if `reference` is a [StreamReference::Out].
1097    pub fn input_mut(&mut self, reference: StreamReference) -> &mut InputStream {
1098        match reference {
1099            StreamReference::In(ix) => &mut self.inputs[ix],
1100            StreamReference::Out(_) => {
1101                unreachable!("Called `LolaIR::get_in` with a `StreamReference::OutRef`.")
1102            }
1103        }
1104    }
1105
1106    /// Provides immutable access to an input stream.
1107    ///
1108    /// # Panic
1109    /// Panics if `reference` is a [StreamReference::Out].
1110    pub fn input(&self, reference: StreamReference) -> &InputStream {
1111        match reference {
1112            StreamReference::In(ix) => &self.inputs[ix],
1113            StreamReference::Out(_) => {
1114                unreachable!("Called `LolaIR::get_in` with a `StreamReference::OutRef`.")
1115            }
1116        }
1117    }
1118
1119    /// Provides mutable access to an output stream.
1120    ///
1121    /// # Panic
1122    /// Panics if `reference` is a [StreamReference::In].
1123    pub fn output_mut(&mut self, reference: StreamReference) -> &mut OutputStream {
1124        match reference {
1125            StreamReference::In(_) => {
1126                unreachable!("Called `LolaIR::get_out` with a `StreamReference::InRef`.")
1127            }
1128            StreamReference::Out(ix) => &mut self.outputs[ix],
1129        }
1130    }
1131
1132    /// Provides immutable access to an output stream.
1133    ///
1134    /// # Panic
1135    /// Panics if `reference` is a [StreamReference::In].
1136    pub fn output(&self, reference: StreamReference) -> &OutputStream {
1137        match reference {
1138            StreamReference::In(_) => {
1139                unreachable!("Called `LolaIR::get_out` with a `StreamReference::InRef`.")
1140            }
1141            StreamReference::Out(ix) => &self.outputs[ix],
1142        }
1143    }
1144
1145    /// Provides immutable access to a stream.
1146    pub fn stream(&self, reference: StreamReference) -> &dyn Stream {
1147        match reference {
1148            StreamReference::In(ix) => &self.inputs[ix],
1149            StreamReference::Out(ix) => &self.outputs[ix],
1150        }
1151    }
1152
1153    /// Produces an iterator over all stream references.
1154    pub fn all_streams(&self) -> impl Iterator<Item = StreamReference> {
1155        self.input_refs()
1156            .map(StreamReference::In)
1157            .chain(self.output_refs().map(StreamReference::Out))
1158    }
1159
1160    /// Provides a collection of all output streams representing a trigger.
1161    pub fn all_triggers(&self) -> Vec<&OutputStream> {
1162        self.triggers
1163            .iter()
1164            .map(|t| self.output(t.output_reference))
1165            .collect()
1166    }
1167
1168    /// Provides a collection of all event-driven output streams.
1169    pub fn all_event_driven(&self) -> Vec<&OutputStream> {
1170        self.event_driven
1171            .iter()
1172            .map(|t| self.output(t.reference))
1173            .collect()
1174    }
1175
1176    /// Return true if the specification contains any time-driven features.
1177    /// This includes time-driven streams and time-driven spawn conditions.
1178    pub fn has_time_driven_features(&self) -> bool {
1179        !self.time_driven.is_empty()
1180            || self.outputs.iter().any(|o| {
1181                matches!(
1182                    o.spawn.pacing,
1183                    PacingType::GlobalPeriodic(_) | PacingType::LocalPeriodic(_)
1184                )
1185            })
1186    }
1187
1188    /// Provides a collection of all time-driven output streams.
1189    pub fn all_time_driven(&self) -> Vec<&OutputStream> {
1190        self.time_driven
1191            .iter()
1192            .map(|t| self.output(t.reference))
1193            .collect()
1194    }
1195
1196    /// Provides the activation contion of a event-driven stream and none if the stream is time-driven
1197    pub fn get_ac(&self, sref: StreamReference) -> Option<&ActivationCondition> {
1198        self.event_driven
1199            .iter()
1200            .find(|e| e.reference == sref)
1201            .map(|e| &e.ac)
1202    }
1203
1204    /// Provides immutable access to a discrete window.
1205    ///
1206    /// # Panic
1207    /// Panics if `window` is not a [WindowReference::Discrete].
1208    pub fn discrete_window(&self, window: WindowReference) -> &DiscreteWindow {
1209        match window {
1210            WindowReference::Discrete(x) => &self.discrete_windows[x],
1211            WindowReference::Sliding(_) | WindowReference::Instance(_) => {
1212                panic!("wrong type of window reference passed to getter")
1213            }
1214        }
1215    }
1216
1217    /// Provides immutable access to a instance aggregation.
1218    ///
1219    /// # Panic
1220    /// Panics if `window` is not a [WindowReference::Instance].
1221    pub fn instance_aggregation(&self, window: WindowReference) -> &InstanceAggregation {
1222        match window {
1223            WindowReference::Instance(x) => &self.instance_aggregations[x],
1224            WindowReference::Sliding(_) | WindowReference::Discrete(_) => {
1225                panic!("wrong type of window reference passed to getter")
1226            }
1227        }
1228    }
1229
1230    /// Provides immutable access to a sliding window.
1231    ///
1232    /// # Panic
1233    /// Panics if `window` is not a [WindowReference::Sliding].
1234    pub fn sliding_window(&self, window: WindowReference) -> &SlidingWindow {
1235        match window {
1236            WindowReference::Sliding(x) => &self.sliding_windows[x],
1237            WindowReference::Discrete(_) | WindowReference::Instance(_) => {
1238                panic!("wrong type of window reference passed to getter")
1239            }
1240        }
1241    }
1242
1243    /// Provides immutable access to a window.
1244    pub fn window(&self, window: WindowReference) -> &dyn Window {
1245        match window {
1246            WindowReference::Sliding(x) => &self.sliding_windows[x],
1247            WindowReference::Discrete(x) => &self.discrete_windows[x],
1248            WindowReference::Instance(x) => &self.instance_aggregations[x],
1249        }
1250    }
1251
1252    /// Provides a representation for the evaluation layers of all event-driven output streams.  Each element of the outer `Vec` represents a layer, each element of the inner `Vec` an output stream in the layer.
1253    pub fn get_event_driven_layers(&self) -> Vec<Vec<Task>> {
1254        let mut event_driven_spawns = self
1255            .outputs
1256            .iter()
1257            .filter(|o| matches!(o.spawn.pacing, PacingType::Event(_)))
1258            .peekable();
1259
1260        // Peekable is fine because the filter above does not have side effects
1261        if self.event_driven.is_empty() && event_driven_spawns.peek().is_none() {
1262            return vec![];
1263        }
1264
1265        // Zip eval layer with stream reference.
1266        let streams_with_layers = self.event_driven.iter().map(|s| s.reference).map(|r| {
1267            (
1268                self.output(r).eval_layer().into(),
1269                Task::Evaluate(r.out_ix()),
1270            )
1271        });
1272
1273        let spawns_with_layers = event_driven_spawns
1274            .map(|o| (o.spawn_layer().inner(), Task::Spawn(o.reference.out_ix())));
1275
1276        let tasks_with_layers: Vec<(usize, Task)> =
1277            streams_with_layers.chain(spawns_with_layers).collect();
1278
1279        // Streams are annotated with an evaluation layer. The layer is not minimal, so there might be
1280        // layers without entries and more layers than streams.
1281        // Minimization works as follows:
1282        // a) Find the greatest layer
1283        // b) For each potential layer...
1284        // c) Find streams that would be in it.
1285        // d) If there is none, skip this layer
1286        // e) If there are some, add them as layer.
1287
1288        // a) Find the greatest layer. Maximum must exist because vec cannot be empty.
1289        let max_layer = tasks_with_layers
1290            .iter()
1291            .max_by_key(|(layer, _)| layer)
1292            .unwrap()
1293            .0;
1294
1295        let mut layers = Vec::new();
1296        // b) For each potential layer
1297        for i in 0..=max_layer {
1298            // c) Find streams that would be in it.
1299            let in_layer_i: Vec<Task> = tasks_with_layers
1300                .iter()
1301                .filter_map(|(l, r)| if *l == i { Some(*r) } else { None })
1302                .collect();
1303            if in_layer_i.is_empty() {
1304                // d) If there is none, skip this layer
1305                continue;
1306            } else {
1307                // e) If there are some, add them as layer.
1308                layers.push(in_layer_i);
1309            }
1310        }
1311        layers
1312    }
1313
1314    /// Attempts to compute a schedule for all time-driven streams.
1315    ///
1316    /// # Fail
1317    /// Fails if the resulting schedule would require at least 10^7 deadlines.
1318    pub fn compute_schedule(&self) -> Result<Schedule, String> {
1319        Schedule::from(self)
1320    }
1321
1322    /// Creates a new [RtLolaMirPrinter] for the Mir type `T`. It implements the [Display](std::fmt::Display) Trait for type `T`.
1323    pub fn display<'a, T>(&'a self, target: &'a T) -> RtLolaMirPrinter<'a, T> {
1324        RtLolaMirPrinter::new(self, target)
1325    }
1326
1327    /// Represents the specification as a dependency graph
1328    pub fn dependency_graph(&self) -> DependencyGraph<'_> {
1329        DependencyGraph::new(self)
1330    }
1331
1332    /// Returns the input stream with the given name if it exists.
1333    pub fn get_input_by_name(&self, name: &str) -> Option<&InputStream> {
1334        self.inputs.iter().find(|input| input.name == name)
1335    }
1336
1337    /// Returns the output stream with the given name if it exists.
1338    pub fn get_output_by_name(&self, name: &str) -> Option<&OutputStream> {
1339        self.outputs.iter().find(|output| output.name == name)
1340    }
1341
1342    /// Returns the stream with the given name if it exists.
1343    pub fn get_stream_by_name(&self, name: &str) -> Option<&dyn Stream> {
1344        self.get_input_by_name(name)
1345            .map(|input| {
1346                // clippy likes it this way
1347                let input: &dyn Stream = input;
1348                input
1349            })
1350            .or_else(|| {
1351                self.get_output_by_name(name).map(|output| {
1352                    let output: &dyn Stream = output;
1353                    output
1354                })
1355            })
1356    }
1357}
1358
1359impl Type {
1360    /// Indicates how many bytes a type requires to be stored in memory.
1361    ///
1362    /// Recursive types yield the sum of their sub-type sizes, unsized types panic, and functions do not have a size, so they produce `None`.
1363    /// # Panics
1364    /// Panics if the type is an instance of [Type::Option], [Type::String], or [Type::Bytes] because their size is undetermined.
1365    pub fn size(&self) -> Option<ValSize> {
1366        match self {
1367            Type::Bool => Some(ValSize(1)),
1368            Type::Int(IntTy::Int8) => Some(ValSize(1)),
1369            Type::Int(IntTy::Int16) => Some(ValSize(2)),
1370            Type::Int(IntTy::Int32) => Some(ValSize(4)),
1371            Type::Int(IntTy::Int64) => Some(ValSize(8)),
1372            Type::Int(IntTy::Int128) => Some(ValSize(16)),
1373            Type::Int(IntTy::Int256) => Some(ValSize(32)),
1374            Type::UInt(UIntTy::UInt8) => Some(ValSize(1)),
1375            Type::UInt(UIntTy::UInt16) => Some(ValSize(2)),
1376            Type::UInt(UIntTy::UInt32) => Some(ValSize(4)),
1377            Type::UInt(UIntTy::UInt64) => Some(ValSize(8)),
1378            Type::UInt(UIntTy::UInt128) => Some(ValSize(16)),
1379            Type::UInt(UIntTy::UInt256) => Some(ValSize(32)),
1380            Type::Float(FloatTy::Float32) => Some(ValSize(4)),
1381            Type::Float(FloatTy::Float64) => Some(ValSize(8)),
1382            Type::Fixed(FixedTy::Fixed64_32) | Type::UFixed(FixedTy::Fixed64_32) => {
1383                Some(ValSize(64))
1384            }
1385            Type::Fixed(FixedTy::Fixed32_16) | Type::UFixed(FixedTy::Fixed32_16) => {
1386                Some(ValSize(32))
1387            }
1388            Type::Fixed(FixedTy::Fixed16_8) | Type::UFixed(FixedTy::Fixed16_8) => Some(ValSize(16)),
1389            Type::Option(_) => unimplemented!("Size of option not determined, yet."),
1390            Type::Tuple(t) => {
1391                let size = t.iter().map(|t| Type::size(t).unwrap().0).sum();
1392                Some(ValSize(size))
1393            }
1394            Type::String | Type::Bytes => unimplemented!("Size of Strings not determined, yet."),
1395            Type::Function { .. } => None,
1396        }
1397    }
1398}
1399
/// The size of a specific value in bytes.
///
/// The wrapped `u32` is public; the type supports conversion from `u8` and addition of two sizes.
#[derive(Debug, Clone, Copy)]
pub struct ValSize(pub u32); // Needs to be reasonably large for compound types.

impl From<u8> for ValSize {
    /// Widens the given byte count into a [ValSize].
    fn from(val: u8) -> Self {
        Self(val.into())
    }
}

impl std::ops::Add for ValSize {
    type Output = Self;

    /// Adds the byte counts of both operands.
    fn add(self, rhs: Self) -> Self::Output {
        let Self(lhs_bytes) = self;
        let Self(rhs_bytes) = rhs;
        Self(lhs_bytes + rhs_bytes)
    }
}
1417
/// Representation of the different stream accesses
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, Hash)]
pub enum StreamAccessKind {
    /// Represents the synchronous access
    Sync,
    /// Represents the access to a [discrete window](DiscreteWindow)
    ///
    /// The argument contains the reference to the [discrete window](DiscreteWindow) whose value is used in the [Expression].
    DiscreteWindow(WindowReference),
    /// Represents the access to a [sliding window](SlidingWindow)
    ///
    /// The argument contains the reference to the [sliding window](SlidingWindow) whose value is used in the [Expression].
    SlidingWindow(WindowReference),
    /// Represents the access to an [instance aggregation](InstanceAggregation)
    ///
    /// The argument contains the reference to the [instance aggregation](InstanceAggregation) whose value is used in the [Expression].
    InstanceAggregation(WindowReference),
    /// Representation of sample and hold accesses
    Hold,
    /// Representation of offset accesses
    ///
    /// The argument contains the [Offset] of the stream access.
    Offset(Offset),
    /// Represents the optional `get` access.
    Get,
    /// Represents the update check of a stream, if the target received a new value at this timestamp.
    Fresh,
}
1446
/// Offset used in the lookup expression
///
/// Offsets are totally ordered: every past offset is smaller than every future offset, and a past
/// offset reaching further back is smaller than one reaching less far back.
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, Hash)]
pub enum Offset {
    /// A strictly positive discrete offset, e.g., `4`, or `42`
    Future(u32),
    /// A non-positive discrete offset, e.g., `0`, `-4`, or `-42`.  The magnitude of the offset is stored.
    Past(u32),
}
1455
1456impl PartialOrd for Offset {
1457    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1458        Some(self.cmp(other))
1459    }
1460}
1461
1462impl Ord for Offset {
1463    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1464        use std::cmp::Ordering;
1465
1466        use Offset::*;
1467        match (self, other) {
1468            (Past(_), Future(_)) => Ordering::Less,
1469            (Future(_), Past(_)) => Ordering::Greater,
1470            (Future(a), Future(b)) => a.cmp(b),
1471            (Past(a), Past(b)) => b.cmp(a),
1472        }
1473    }
1474}