rtlola_frontend/mir.rs
1//! This module covers the Mid-Level Intermediate Representation (MIR) of an RTLola specification.
2//!
3//! The [RtLolaMir] is specifically designed to allow convenient navigation and access to data. Hence, it is perfect for working *with* the specification
4//! rather than work *on* it.
5//!
6//! # Most Notable Structs and Enums
7//! * [RtLolaMir] is the root data structure representing the specification.
8//! * [OutputStream] represents a single output stream. The data structure is enriched with information regarding streams accessing it or accessed by it and much more. For input streams confer [InputStream].
9//! * [StreamReference] used for referencing streams within the Mir.
10//! * [Spawn] and [Close] contain all information regarding the parametrization, spawning and closing behavior of streams.
11//! * [Eval] contains the information regarding the evaluation condition and the expression of the stream.
12//! * [Expression] represents an expression. It contains its [ExpressionKind] and its type. The latter contains all information specific to a certain kind of expression such as sub-expressions of operators.
13//!
14//! # See Also
15//! * [rtlola_frontend](crate) for an overview regarding different representations.
16//! * [rtlola_frontend::parse](crate::parse) to obtain an [RtLolaMir] for a specification in form of a string or path to a specification file.
17//! * [rtlola_hir::hir::RtLolaHir] for a data structs designed for working _on_it.
18//! * [RtLolaAst](rtlola_parser::RtLolaAst), which is the most basic and down-to-syntax data structure available for RTLola.
19
20mod dependency_graph;
21mod print;
22mod schedule;
23
24use std::collections::HashMap;
25use std::convert::TryInto;
26use std::time::Duration;
27
28use num::traits::Inv;
29pub use print::RtLolaMirPrinter;
30use rtlola_hir::hir::ConcreteValueType;
31pub use rtlola_hir::hir::{
32 InputReference, Layer, MemBoundMode, MemorizationBound, Origin, OutputKind, OutputReference,
33 RtLolaHir, StreamLayers, StreamReference, WindowReference,
34};
35pub use rtlola_parser::ast::Tag;
36#[cfg(feature = "spanned")]
37use rtlola_reporting::Span;
38use rust_decimal::Decimal;
39use serde::{Deserialize, Serialize};
40use uom::si::rational64::{Frequency as UOM_Frequency, Time as UOM_Time};
41use uom::si::time::nanosecond;
42
43pub use self::dependency_graph::DependencyGraph;
44pub use crate::mir::schedule::{Deadline, Schedule, Task};
45
46pub(crate) type Mir = RtLolaMir;
47
48/// A trait for any kind of stream.
49pub trait Stream {
50 /// Reports the evaluation layer of the spawn condition of the stream.
51 fn spawn_layer(&self) -> Layer;
52 /// Reports the evaluation layer of the stream.
53 fn eval_layer(&self) -> Layer;
54 /// Reports the name of the stream.
55 fn name(&self) -> &str;
56 /// Returns the type of the stream.
57 fn ty(&self) -> &Type;
58 /// Indicates whether or not the stream is an input stream.
59 fn is_input(&self) -> bool;
60 /// Indicates whether or not the stream has parameters.
61 fn is_parameterized(&self) -> bool;
62 /// Indicates whether or not the stream spawned / dynamically created.
63 fn is_spawned(&self) -> bool;
64 /// Indicates whether or not the stream is closed.
65 fn is_closed(&self) -> bool;
66 /// Indicated whether or not the stream is filtered.
67 fn is_eval_filtered(&self) -> bool;
68 /// Indicates how many values of the stream's [Type] need to be memorized.
69 fn values_to_memorize(&self) -> MemorizationBound;
70 /// Produces a stream references referring to the stream.
71 fn as_stream_ref(&self) -> StreamReference;
72 /// Returns the collection of streams that access the stream non-transitively.
73 fn accessed_by(&self) -> &Accesses;
74 /// Returns the collection of sliding windows that access the stream non-transitively.
75 /// This includes both sliding and discrete windows.
76 fn aggregated_by(&self) -> &[(StreamReference, Origin, WindowReference)];
77 /// Returns the collection of sliding windows that are accessed by the stream non-transitively.
78 /// This includes both sliding and discrete windows.
79 fn aggregates(&self) -> &[(StreamReference, Origin, WindowReference)];
80 /// Returns the tags annotated to this stream.
81 fn tags(&self) -> &HashMap<String, Option<String>>;
82 #[cfg(feature = "spanned")]
83 /// Returns the spans of all tags annotated to this stream.
84 fn tags_span(&self) -> &HashMap<String, Span>;
85}
86
87/// This struct constitutes the Mid-Level Intermediate Representation (MIR) of an RTLola specification.
88///
89/// The [RtLolaMir] is specifically designed to allow convenient navigation and access to data. Hence, it is perfect for working _with_ the specification
90/// rather than work _on_ it.
91///
92/// # Most Notable Structs and Enums
93/// * [Stream] is a trait offering several convenient access methods for everything constituting a stream.
94/// * [OutputStream] represents a single output stream. The data structure is enriched with information regarding streams accessing it or accessed by it and much more. For input streams confer [InputStream].
95/// * [StreamReference] used for referencing streams within the Mir.
96/// * [Spawn] and [Close] contain all information regarding the parametrization, spawning and closing behavior of streams.
97/// * [Eval] contains the information regarding the evaluation condition and the expression of the stream. The [Expression] represents an computational evaluation. It contains its [ExpressionKind] and its type. The latter contains all information specific to a certain kind of expression such as sub-expressions of operators.
98///
99/// # See Also
100/// * [rtlola_frontend](crate) for an overview regarding different representations.
101/// * [rtlola_frontend::parse](crate::parse) to obtain an [RtLolaMir] for a specification in form of a string or path to a specification file.
102/// * [rtlola_hir::hir::RtLolaHir] for a data structs designed for working _on_it.
103/// * [RtLolaAst](rtlola_parser::RtLolaAst), which is the most basic and down-to-syntax data structure available for RTLola.
104#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
105pub struct RtLolaMir {
106 /// Contains all input streams.
107 pub inputs: Vec<InputStream>,
108 /// Contains all output streams including all triggers. They only contain the information relevant for every single kind of output stream. Refer to [RtLolaMir::time_driven], [RtLolaMir::event_driven],
109 /// and [RtLolaMir::triggers] for more information.
110 pub outputs: Vec<OutputStream>,
111 /// References and pacing information of all time-driven streams.
112 pub time_driven: Vec<TimeDrivenStream>,
113 /// References and pacing information of all event-driven streams.
114 pub event_driven: Vec<EventDrivenStream>,
115 /// A collection of all discrete windows.
116 pub discrete_windows: Vec<DiscreteWindow>,
117 /// A collection of all sliding windows.
118 pub sliding_windows: Vec<SlidingWindow>,
119 /// A collection of all instance aggregations.
120 pub instance_aggregations: Vec<InstanceAggregation>,
121 /// The references of all outputs that represent triggers
122 pub triggers: Vec<Trigger>,
123 /// The global tags of the specification
124 pub global_tags: Tags,
125 #[cfg(feature = "spanned")]
126 /// The span's of the global tags
127 pub global_tags_span: HashMap<String, Span>,
128}
129
130/// Represents an RTLola value type. This does not including pacing information, for this refer to [TimeDrivenStream] and [EventDrivenStream].
131#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
132pub enum Type {
133 /// A boolean type
134 Bool,
135 /// An integer type of fixed bit-width
136 Int(IntTy),
137 /// An unsigned integer type of fixed bit-width
138 UInt(UIntTy),
139 /// A floating point type of fixed bit-width
140 Float(FloatTy),
141 /// A signed fixed point type of fixed bit-width
142 Fixed(FixedTy),
143 /// An unsigned fixed point type of fixed bit-width
144 UFixed(FixedTy),
145 /// A unicode string
146 String,
147 /// A sequence of 8-bit bytes
148 Bytes,
149 /// An n-ary tuples where n is the length of the contained vector
150 Tuple(Vec<Type>),
151 /// An optional value type, e.g., resulting from accessing a past value of a stream
152 Option(Box<Type>),
153 /// A type describing a function
154 Function {
155 /// The types of the arguments to the function, monomorphized
156 args: Vec<Type>,
157 /// The monomorphized return type of the function
158 ret: Box<Type>,
159 },
160}
161
162/// Represents an RTLola pacing type.
163#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
164pub enum PacingType {
165 /// Represents a periodic pacing with a fixed global frequency
166 GlobalPeriodic(UOM_Frequency),
167 /// Represents a periodic pacing with a fixed local frequency
168 LocalPeriodic(UOM_Frequency),
169 /// Represents an event based pacing defined by an [ActivationCondition]
170 Event(ActivationCondition),
171 /// The pacing is constant, meaning that the value is always present.
172 Constant,
173}
174
175#[allow(missing_docs)]
176#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
177pub enum IntTy {
178 /// Represents an 8-bit integer.
179 Int8,
180 /// Represents a 16-bit integer.
181 Int16,
182 /// Represents a 32-bit integer.
183 Int32,
184 /// Represents a 64-bit integer.
185 Int64,
186 /// Represents a 128-bit integer.
187 Int128,
188 /// Represents a 256-bit integer.
189 Int256,
190}
191
192#[allow(missing_docs)]
193#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
194pub enum UIntTy {
195 /// Represents an 8-bit unsigned integer.
196 UInt8,
197 /// Represents a 16-bit unsigned integer.
198 UInt16,
199 /// Represents a 32-bit unsigned integer.
200 UInt32,
201 /// Represents a 64-bit unsigned integer.
202 UInt64,
203 /// Represents a 128-bit unsigned integer.
204 UInt128,
205 /// Represents a 256-bit unsigned integer.
206 UInt256,
207}
208
209#[allow(missing_docs)]
210#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
211pub enum FloatTy {
212 /// Represents a 32-bit floating point number.
213 Float32,
214 /// Represents a 64-bit floating point number.
215 Float64,
216}
217
218#[allow(missing_docs)]
219#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
220pub enum FixedTy {
221 /// Represents a 64-bit fixed point number with 32 integer bits and 32 fractional bits
222 Fixed64_32,
223 /// Represents a 32-bit fixed point number with 16 integer bits and 16 fractional bits
224 Fixed32_16,
225 /// Represents a 16-bit fixed point number with 16 integer bits and 8 fractional bits
226 Fixed16_8,
227}
228
229impl From<ConcreteValueType> for Type {
230 fn from(ty: ConcreteValueType) -> Type {
231 match ty {
232 ConcreteValueType::Integer8 => Type::Int(IntTy::Int8),
233 ConcreteValueType::Integer16 => Type::Int(IntTy::Int16),
234 ConcreteValueType::Integer32 => Type::Int(IntTy::Int32),
235 ConcreteValueType::Integer64 => Type::Int(IntTy::Int64),
236 ConcreteValueType::UInteger8 => Type::UInt(UIntTy::UInt8),
237 ConcreteValueType::UInteger16 => Type::UInt(UIntTy::UInt16),
238 ConcreteValueType::UInteger32 => Type::UInt(UIntTy::UInt32),
239 ConcreteValueType::UInteger64 => Type::UInt(UIntTy::UInt64),
240 ConcreteValueType::Float32 => Type::Float(FloatTy::Float32),
241 ConcreteValueType::Float64 => Type::Float(FloatTy::Float64),
242 ConcreteValueType::Tuple(t) => Type::Tuple(t.into_iter().map(Type::from).collect()),
243 ConcreteValueType::TString => Type::String,
244 ConcreteValueType::Byte => Type::Bytes,
245 ConcreteValueType::Option(o) => Type::Option(Box::new(Type::from(*o))),
246 _ => unreachable!("cannot lower `ValueTy` {}", ty),
247 }
248 }
249}
250
251type Accesses = Vec<(StreamReference, Vec<(Origin, StreamAccessKind)>)>;
252
253/// Contains all information inherent to an input stream.
254#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
255pub struct InputStream {
256 /// The name of the stream
257 pub name: String,
258 /// The value type of the stream. Note that its pacing is always pre-determined.
259 pub ty: Type,
260 /// The collection of streams that access the current stream non-transitively
261 pub accessed_by: Accesses,
262 /// The collection of sliding windows that access this stream non-transitively. This includes both sliding and discrete windows.
263 pub aggregated_by: Vec<(StreamReference, Origin, WindowReference)>,
264 /// The collection of windows that is accessed by this stream. This includes both sliding and discrete windows.
265 pub aggregates: Vec<(StreamReference, Origin, WindowReference)>,
266 /// Provides the evaluation of layer of this stream.
267 pub layer: StreamLayers,
268 /// Provides the number of values of this stream's type that need to be memorized. Refer to [Type::size] to get a type's byte-size.
269 pub memory_bound: MemorizationBound,
270 /// The reference referring to this stream
271 pub reference: StreamReference,
272 /// The tags annotated to this stream.
273 pub tags: Tags,
274 #[cfg(feature = "spanned")]
275 /// The span of the tags annotated to the input stream
276 pub tags_span: HashMap<String, Span>,
277 #[cfg(feature = "spanned")]
278 /// The span of the input stream definition
279 pub span: Span,
280}
281
282/// Contains all information relevant to every kind of output stream.
283///
284/// Refer to [TimeDrivenStream], [EventDrivenStream], and [Trigger], as well as their respective fields in the Mir for additional information.
285#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
286pub struct OutputStream {
287 /// The name of the stream.
288 pub name: String,
289 /// The kind of the output (regular output or trigger)
290 pub kind: OutputKind,
291 /// The value type of the stream.
292 pub ty: Type,
293 /// Information on the spawn behavior of the stream
294 pub spawn: Spawn,
295 /// Information on the evaluation behavior of the stream
296 pub eval: Eval,
297 /// The condition under which the stream is supposed to be closed
298 pub close: Close,
299 /// The collection of streams this stream accesses non-transitively. Includes this stream's spawn, evaluation condition, and close expressions.
300 pub accesses: Accesses,
301 /// The collection of streams that access the current stream non-transitively
302 pub accessed_by: Accesses,
303 /// The collection of windows that access this stream non-transitively. This includes both sliding and discrete windows.
304 pub aggregated_by: Vec<(StreamReference, Origin, WindowReference)>,
305 /// The collection of windows that is accessed by this stream. This includes both sliding and discrete windows.
306 pub aggregates: Vec<(StreamReference, Origin, WindowReference)>,
307 /// Provides the number of values of this stream's type that need to be memorized. Refer to [Type::size] to get a type's byte-size.
308 pub memory_bound: MemorizationBound,
309 /// Provides the evaluation of layer of this stream.
310 pub layer: StreamLayers,
311 /// The reference referring to this stream
312 pub reference: StreamReference,
313 /// The parameters of a parameterized output stream; The vector is empty in non-parametrized streams
314 pub params: Vec<Parameter>,
315 /// The tags annotated to this stream.
316 pub tags: Tags,
317 #[cfg(feature = "spanned")]
318 /// The span of the tags annotated to the output stream
319 pub tags_span: HashMap<String, Span>,
320 #[cfg(feature = "spanned")]
321 /// The span of the output stream definition
322 pub span: Span,
323}
324
325/// A trigger (represented by the output stream `output_reference`)
326#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Copy)]
327pub struct Trigger {
328 /// The reference of the output stream representing this trigger
329 pub output_reference: StreamReference,
330 /// The reference of this trigger
331 pub trigger_reference: TriggerReference,
332}
333
334impl OutputStream {
335 fn is_trigger(&self) -> bool {
336 matches!(self.kind, OutputKind::Trigger(_))
337 }
338}
339
340type Tags = HashMap<String, Option<String>>;
341
342/// A type alias for references to triggers.
343pub type TriggerReference = usize;
344
345/// Information on the spawn behavior of a stream
346#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
347pub struct Spawn {
348 /// The expression needs to be evaluated whenever the stream with this Spawn template is supposed to be spawned. The result of the evaluation constitutes the respective parameters.
349 pub expression: Option<Expression>,
350 /// The timing of when a new instance _could_ be created assuming the spawn condition evaluates to true.
351 pub pacing: PacingType,
352 /// The spawn condition. If the condition evaluates to false, the stream will not be spawned.
353 pub condition: Option<Expression>,
354 #[cfg(feature = "spanned")]
355 /// The span of the spawn clause
356 pub span: Span,
357}
358
359impl Default for Spawn {
360 fn default() -> Self {
361 Spawn {
362 expression: None,
363 pacing: PacingType::Constant,
364 condition: None,
365 #[cfg(feature = "spanned")]
366 span: Span::Unknown,
367 }
368 }
369}
370
371/// Information on the close behavior of a stream
372#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
373pub struct Close {
374 /// The `condition` expression needs to be evaluated whenever the stream with this Close template is supposed to be closed. The result of the evaluation constitutes whether the stream is closed.
375 pub condition: Option<Expression>,
376 /// The timing of the close condition.
377 pub pacing: PacingType,
378 /// Indicates whether the close condition contains a reference to the stream it belongs to.
379 pub has_self_reference: bool,
380 #[cfg(feature = "spanned")]
381 /// The span of the close clause
382 pub span: Span,
383}
384
385impl Default for Close {
386 fn default() -> Self {
387 Close {
388 condition: None,
389 pacing: PacingType::Constant,
390 has_self_reference: false,
391 #[cfg(feature = "spanned")]
392 span: Span::Unknown,
393 }
394 }
395}
396
397/// Information on the evaluation behavior of a stream
398#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
399pub struct Eval {
400 /// The eval clauses of the stream.
401 pub clauses: Vec<EvalClause>,
402 /// The eval pacing of the stream, combining the condition and expr pacings of all eval clauses
403 pub eval_pacing: PacingType,
404}
405
406/// Information on an eval clause of a stream
407#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
408pub struct EvalClause {
409 /// The expression of this stream needs to be evaluated whenever this condition evaluates to `True`.
410 pub condition: Option<Expression>,
411 /// The evaluation expression of this stream, defining the returned and accessed value.
412 pub expression: Expression,
413 /// The eval pacing of the stream, combining the condition and expr pacings of the clause.
414 pub pacing: PacingType,
415 #[cfg(feature = "spanned")]
416 /// The span of the eval clause
417 pub span: Span,
418}
419
420/// Information of a parameter of a parametrized output stream
421#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
422pub struct Parameter {
423 /// The name of the parameter.
424 pub name: String,
425 /// The type of the parameter.
426 pub ty: Type,
427 /// The index of the parameter.
428 pub idx: usize,
429 #[cfg(feature = "spanned")]
430 /// The span of the parameter
431 pub span: Span,
432}
433
434/// Wrapper for output streams providing additional information specific to time-driven streams.
435#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
436pub struct TimeDrivenStream {
437 /// A reference to the stream that is specified.
438 pub reference: StreamReference,
439 /// The evaluation frequency of the stream.
440 pub frequency: UOM_Frequency,
441 /// Whether the given frequency is relative to a dynamic spawn
442 pub locality: PacingLocality,
443}
444
445#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
446/// Describes if the pacing is interpreted relatively to a dynamic spawn
447pub enum PacingLocality {
448 /// The pacing is relative to a global clock
449 Global,
450 /// The pacing is relative to the spawn
451 Local,
452}
453
454impl TimeDrivenStream {
455 /// Returns the evaluation period, i.e., the multiplicative inverse of [TimeDrivenStream::frequency].
456 pub fn period(&self) -> UOM_Time {
457 UOM_Time::new::<uom::si::time::second>(
458 self.frequency.get::<uom::si::frequency::hertz>().inv(),
459 )
460 }
461
462 /// Returns the evaluation frequency.
463 pub fn frequency(&self) -> UOM_Frequency {
464 self.frequency
465 }
466
467 /// Returns the evaluation period, i.e., the multiplicative inverse of [TimeDrivenStream::frequency], as [Duration].
468 pub fn period_in_duration(&self) -> Duration {
469 Duration::from_nanos(
470 self.period()
471 .get::<nanosecond>()
472 .to_integer()
473 .try_into()
474 .expect("Period [ns] too large for u64!"),
475 )
476 }
477}
478
479/// Wrapper for output streams providing additional information specific to event-based streams.
480#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
481pub struct EventDrivenStream {
482 /// A reference to the stream that is specified
483 pub reference: StreamReference,
484 /// The activation condition of an event-based stream
485 pub ac: ActivationCondition,
486}
487
488/// Representation of the activation condition of event-based entities such as streams or spawn conditions
489#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
490pub enum ActivationCondition {
491 /// Activate when all entries of the [Vec] are true.
492 Conjunction(Vec<Self>),
493 /// Activate when at least one entry of the [Vec] is true.
494 Disjunction(Vec<Self>),
495 /// Activate when the referenced stream is evaluated.
496 Stream(StreamReference),
497 /// Activate
498 True,
499}
500
501/// Represents an expression
502#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
503pub struct Expression {
504 /// The kind and all kind-specific information of the expression
505 pub kind: ExpressionKind,
506 /// The type of the expression
507 pub ty: Type,
508 #[cfg(feature = "spanned")]
509 /// The span of the expression
510 pub span: Span,
511}
512
513/// This enum contains all possible kinds of expressions and their relevant information.
514#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
515pub enum ExpressionKind {
516 /// Load a constant value
517 LoadConstant(Constant),
518 /// Apply an arithmetic or logic operation. The function is monomorphized.
519 ///
520 /// *Note:* Arguments never need to be coerced.
521 /// Unary: 1st argument -> operand
522 /// Binary: 1st argument -> lhs, 2nd argument -> rhs
523 /// n-ary: kth argument -> kth operand
524 ArithLog(ArithLogOp, Vec<Expression>),
525 /// Access another stream
526 StreamAccess {
527 /// The target stream to be accessed
528 target: StreamReference,
529 /// The parameters of the specific stream instance that is accessed.
530 ///
531 /// If the stream behind `target` is not parametrized, this collection is empty.
532 parameters: Vec<Expression>,
533 /// The kind of access
534 access_kind: StreamAccessKind,
535 },
536 /// Access to the parameter of a stream represented by a stream reference,
537 /// referencing the target stream and the index of the parameter that should be accessed.
538 ParameterAccess(StreamReference, usize),
539 /// Access to the lambda parameter in the filtered instance aggregation
540 LambdaParameterAccess {
541 /// Reference to the instance aggregation using the lambda function
542 wref: WindowReference,
543 /// Reference to the parameter
544 pref: usize,
545 },
546 /// A conditional (if-then-else) expression
547 Ite {
548 /// The condition under which either `consequence` or `alternative` is selected.
549 condition: Box<Expression>,
550 /// The consequence should be evaluated and returned if the condition evaluates to true.
551 consequence: Box<Expression>,
552 /// The alternative should be evaluated and returned if the condition evaluates to false.
553 alternative: Box<Expression>,
554 },
555 /// A tuple expression
556 Tuple(Vec<Expression>),
557 /// Represents a tuple projections, i.e., it accesses a specific tuple element.
558 // The expression produces a tuple and the `usize` is the index of the accessed element. This value is constant.
559 TupleAccess(Box<Expression>, usize),
560 /// Represents a function call. The function is monomorphized.
561 ///
562 /// *Note:* Arguments never need to be coerced.
563 /// Unary: 1st argument -> operand
564 /// Binary: 1st argument -> lhs, 2nd argument -> rhs
565 /// n-ary: kth argument -> kth operand
566 Function(String, Vec<Expression>),
567 /// Converting a value to a different type
568 ///
569 /// The result type is indicated in the expression with the `Convert` kind.
570 Convert {
571 /// The expression that produces a value. The type of the expression indicates the source of the conversion.
572 expr: Box<Expression>,
573 },
574 /// Transforms an optional value into a definitive one
575 Default {
576 /// The expression that results in an optional value.
577 expr: Box<Expression>,
578 /// An infallible expression providing the default value if `expr` fails to produce a value.
579 default: Box<Expression>,
580 },
581}
582
583/// Represents a constant value of a certain kind.
584///
585/// *Note* the type of the constant might be more general than the type of the constant. For example, `Constant::UInt(3u64)` represents an RTLola UInt8 constant.
586#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
587pub enum Constant {
588 #[allow(missing_docs)]
589 Str(String),
590 #[allow(missing_docs)]
591 Bool(bool),
592 #[allow(missing_docs)]
593 UInt(u64),
594 #[allow(missing_docs)]
595 Int(i64),
596 #[allow(missing_docs)]
597 Float(f64),
598 #[allow(missing_docs)]
599 Decimal(Decimal),
600}
601
602/// Arithmetical and logical operations
603#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
604pub enum ArithLogOp {
605 /// Logic negation (!)
606 Not,
607 /// Arithmetic negation (-)
608 Neg,
609 /// Arithmetic addition (+)
610 Add,
611 /// Arithmetic subtraction (-)
612 Sub,
613 /// Arithmetic multiplication (*)
614 Mul,
615 /// Arithmetic division (/)
616 Div,
617 /// Arithmetic modulation (%)
618 Rem,
619 /// Arithmetic exponentiation (**)
620 Pow,
621 /// Logic conjunction/multiplication (&&)
622 And,
623 /// Logic disjunction/addition (||)
624 Or,
625 /// Bit-wise xor (^)
626 BitXor,
627 /// Bit-wise conjunction/multiplication (&)
628 BitAnd,
629 /// Bit-wise disjunction/addition (|)
630 BitOr,
631 /// Bit-wise negation / One's complement (~)
632 BitNot,
633 /// Bit-wise left-shift (<<)
634 Shl,
635 /// Bit-wise right-shift (>>)
636 Shr,
637 /// Semantic Equality (==)
638 Eq,
639 /// Less-than comparison (<)
640 Lt,
641 /// Less-than-or-equal comparison (<=)
642 Le,
643 /// Semantic Inequality (!=)
644 Ne,
645 /// Greater-than-or-equal comparison (>=)
646 Ge,
647 /// Greater-than comparison (>)
648 Gt,
649}
650
651/// Represents an instance of a discrete window
652#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
653pub struct DiscreteWindow {
654 /// The stream whose values will be aggregated
655 pub target: StreamReference,
656 /// The stream in which expression this window occurs
657 pub caller: StreamReference,
658 /// The duration over which the window aggregates
659 pub duration: usize,
660 /// Indicates whether or not the first aggregated value will be produced immediately or whether the window waits until `duration` number of values have been observed.
661 pub wait: bool,
662 /// The aggregation operation
663 pub op: WindowOperation,
664 /// A reference to this discrete window
665 pub reference: WindowReference,
666 /// The type of value the window produces
667 pub ty: Type,
668 /// The origin of the discrete window expression
669 pub origin: Origin,
670 /// The pacing of the discrete window expression
671 pub pacing: PacingType,
672}
673
674/// Represents an instance of a sliding window
675#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
676pub struct SlidingWindow {
677 /// The stream whose values will be aggregated
678 pub target: StreamReference,
679 /// The stream in which expression this window occurs
680 pub caller: StreamReference,
681 /// The duration over which the window aggregates
682 pub duration: Duration,
683 /// The number of buckets that are needed for the window
684 pub num_buckets: MemorizationBound,
685 /// The time per bucket of the window
686 pub bucket_size: Duration,
687 /// Indicates whether or not the first aggregated value will be produced immediately or whether the window waits until `duration` has passed at least once
688 pub wait: bool,
689 /// The aggregation operation
690 pub op: WindowOperation,
691 /// A reference to this sliding window
692 pub reference: WindowReference,
693 /// The type of value the window produces
694 pub ty: Type,
695 /// The origin of the sliding window expression
696 pub origin: Origin,
697 /// The pacing of the sliding window expression
698 pub pacing: PacingType,
699}
700
701/// Represents an instance of an instance aggregation
702#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
703pub struct InstanceAggregation {
704 /// The stream whose values will be aggregated
705 pub target: StreamReference,
706 /// The stream calling and evaluating this window
707 pub caller: StreamReference,
708 /// A filter over the instances
709 pub selection: InstanceSelection,
710 /// The operation to be performed over the instances
711 pub aggr: InstanceOperation,
712 /// The reference of this window.
713 pub reference: WindowReference,
714 /// The type of value the window produces
715 pub ty: Type,
716 /// The origin of the instance window expression
717 pub origin: Origin,
718 /// The pacing of the instance aggregation expression
719 pub pacing: PacingType,
720}
721
722#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
723/// Enum to indicate which instances are part of the aggregation
724pub enum InstanceSelection {
725 /// Only instances that are updated in this evaluation cycle are part of the aggregation
726 Fresh,
727 /// All instances are part of the aggregation
728 All,
729 /// Only instances that are updated in this evaluation cycle and satisfy the condition are part of the aggregation
730 FilteredFresh {
731 /// The parameters of the lambda expression
732 parameters: Vec<Parameter>,
733 /// The condition that needs to be satisfied
734 cond: Box<Expression>,
735 },
736 /// All instances that satisfy the condition are part of the aggregation
737 FilteredAll {
738 /// The parameters of the lambda expression
739 parameters: Vec<Parameter>,
740 /// The condition that needs to be satisfied
741 cond: Box<Expression>,
742 },
743}
744
745impl InstanceSelection {
746 /// Accesses the condition to a filtered instance aggregation. Returns None if the instance aggregation is not filtered
747 pub fn condition(&self) -> Option<&Expression> {
748 match self {
749 InstanceSelection::Fresh | InstanceSelection::All => None,
750 InstanceSelection::FilteredFresh {
751 parameters: _,
752 cond,
753 }
754 | InstanceSelection::FilteredAll {
755 parameters: _,
756 cond,
757 } => Some(cond),
758 }
759 }
760
761 /// Accesses the parameters to a filtered instance aggregation. Returns None if the instance aggregation is not filtered
762 pub fn parameters(&self) -> Option<&Vec<Parameter>> {
763 match self {
764 InstanceSelection::Fresh | InstanceSelection::All => None,
765 InstanceSelection::FilteredFresh {
766 parameters,
767 cond: _,
768 }
769 | InstanceSelection::FilteredAll {
770 parameters,
771 cond: _,
772 } => Some(parameters),
773 }
774 }
775}
776
777#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, Serialize, Deserialize)]
778/// A subset of the window operations that are suitable to be performed over a set of instances.
779pub enum InstanceOperation {
780 /// Aggregation function to count the number of instances of the accessed stream
781 Count,
782 /// Aggregation function to return the minimum
783 Min,
784 /// Aggregation function to return the minimum
785 Max,
786 /// Aggregation function to return the addition
787 Sum,
788 /// Aggregation function to return the product
789 Product,
790 /// Aggregation function to return the average
791 Average,
792 /// Aggregation function to return the conjunction, i.e., the instances aggregation returns true iff ALL current values of the instances of the accessed stream are assigned to true
793 Conjunction,
794 /// Aggregation function to return the disjunction, i.e., the instances aggregation returns true iff ANY current values of the instances of the accessed stream are assigned to true
795 Disjunction,
796 /// Aggregation function to return the variance of all values, assumes equal probability.
797 Variance,
798 /// Aggregation function to return the covariance of all values in a tuple stream, assumes equal probability.
799 Covariance,
800 /// Aggregation function to return the standard deviation of all values, assumes equal probability.
801 StandardDeviation,
802 /// Aggregation function to return the Nth-Percentile
803 NthPercentile(u8),
804}
805
806#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
807/// The Ast representation of the different aggregation functions
808pub enum WindowOperation {
809 /// Aggregation function to count the number of updated values on the accessed stream
810 Count,
811 /// Aggregation function to return the minimum
812 Min,
813 /// Aggregation function to return the minimum
814 Max,
815 /// Aggregation function to return the addition
816 Sum,
817 /// Aggregation function to return the product
818 Product,
819 /// Aggregation function to return the average
820 Average,
821 /// Aggregation function to return the integral
822 Integral,
823 /// Aggregation function to return the conjunction, i.e., the sliding window returns true iff ALL values on the accessed stream inside a window are assigned to true
824 Conjunction,
825 /// Aggregation function to return the disjunction, i.e., the sliding window returns true iff AT LEAst ONE value on the accessed stream inside a window is assigned to true
826 Disjunction,
827 /// Aggregation function to return the last value, a time bounded hold
828 Last,
829 /// Aggregation function to return the variance of all values, assumes equal probability.
830 Variance,
831 /// Aggregation function to return the covariance of all values in a tuple stream, assumes equal probability.
832 Covariance,
833 /// Aggregation function to return the standard deviation of all values, assumes equal probability.
834 StandardDeviation,
835 /// Aggregation function to return the Nth-Percentile
836 NthPercentile(u8),
837}
838
839impl From<InstanceOperation> for WindowOperation {
840 fn from(value: InstanceOperation) -> Self {
841 match value {
842 InstanceOperation::Count => WindowOperation::Count,
843 InstanceOperation::Min => WindowOperation::Min,
844 InstanceOperation::Max => WindowOperation::Max,
845 InstanceOperation::Sum => WindowOperation::Sum,
846 InstanceOperation::Product => WindowOperation::Product,
847 InstanceOperation::Average => WindowOperation::Average,
848 InstanceOperation::Conjunction => WindowOperation::Conjunction,
849 InstanceOperation::Disjunction => WindowOperation::Disjunction,
850 InstanceOperation::Variance => WindowOperation::Variance,
851 InstanceOperation::Covariance => WindowOperation::Covariance,
852 InstanceOperation::StandardDeviation => WindowOperation::StandardDeviation,
853 InstanceOperation::NthPercentile(x) => WindowOperation::NthPercentile(x),
854 }
855 }
856}
857
858/// A trait for any kind of window
859pub trait Window {
860 /// Returns a reference to the stream that will be aggregated by that window.
861 fn target(&self) -> StreamReference;
862
863 /// Returns a reference to the stream in which expression this window occurs.
864 fn caller(&self) -> StreamReference;
865
866 /// Returns the aggregation operation the window uses.
867 fn op(&self) -> WindowOperation;
868
869 /// Returns the type of value the window produces.
870 fn ty(&self) -> &Type;
871
872 /// Returns the memorization bound of the window.
873 fn memory_bound(&self) -> MemorizationBound;
874}
875
876////////// Implementations //////////
877impl Stream for OutputStream {
878 fn spawn_layer(&self) -> Layer {
879 self.layer.spawn_layer()
880 }
881
882 fn eval_layer(&self) -> Layer {
883 self.layer.evaluation_layer()
884 }
885
886 fn name(&self) -> &str {
887 &self.name
888 }
889
890 fn ty(&self) -> &Type {
891 &self.ty
892 }
893
894 fn is_input(&self) -> bool {
895 false
896 }
897
898 fn is_parameterized(&self) -> bool {
899 self.spawn.expression.is_some()
900 }
901
902 fn is_spawned(&self) -> bool {
903 self.spawn.expression.is_some()
904 || self.spawn.condition.is_some()
905 || self.spawn.pacing != PacingType::Constant
906 }
907
908 fn is_closed(&self) -> bool {
909 self.close.condition.is_some()
910 }
911
912 fn is_eval_filtered(&self) -> bool {
913 self.eval
914 .clauses
915 .iter()
916 .any(|eval| eval.condition.is_some())
917 }
918
919 fn values_to_memorize(&self) -> MemorizationBound {
920 self.memory_bound
921 }
922
923 fn as_stream_ref(&self) -> StreamReference {
924 self.reference
925 }
926
927 fn accessed_by(&self) -> &Accesses {
928 &self.accessed_by
929 }
930
931 fn aggregated_by(&self) -> &[(StreamReference, Origin, WindowReference)] {
932 &self.aggregated_by
933 }
934
935 fn aggregates(&self) -> &[(StreamReference, Origin, WindowReference)] {
936 &self.aggregates
937 }
938
939 fn tags(&self) -> &HashMap<String, Option<String>> {
940 &self.tags
941 }
942
943 #[cfg(feature = "spanned")]
944 fn tags_span(&self) -> &HashMap<String, Span> {
945 &self.tags_span
946 }
947}
948
949impl Stream for InputStream {
950 fn spawn_layer(&self) -> Layer {
951 self.layer.spawn_layer()
952 }
953
954 fn eval_layer(&self) -> Layer {
955 self.layer.evaluation_layer()
956 }
957
958 fn name(&self) -> &str {
959 &self.name
960 }
961
962 fn ty(&self) -> &Type {
963 &self.ty
964 }
965
966 fn is_input(&self) -> bool {
967 true
968 }
969
970 fn is_parameterized(&self) -> bool {
971 false
972 }
973
974 fn is_spawned(&self) -> bool {
975 false
976 }
977
978 fn is_closed(&self) -> bool {
979 false
980 }
981
982 fn is_eval_filtered(&self) -> bool {
983 false
984 }
985
986 fn values_to_memorize(&self) -> MemorizationBound {
987 self.memory_bound
988 }
989
990 fn as_stream_ref(&self) -> StreamReference {
991 self.reference
992 }
993
994 fn accessed_by(&self) -> &Accesses {
995 &self.accessed_by
996 }
997
998 fn aggregated_by(&self) -> &[(StreamReference, Origin, WindowReference)] {
999 &self.aggregated_by
1000 }
1001
1002 fn aggregates(&self) -> &[(StreamReference, Origin, WindowReference)] {
1003 &self.aggregates
1004 }
1005
1006 fn tags(&self) -> &HashMap<String, Option<String>> {
1007 &self.tags
1008 }
1009
1010 #[cfg(feature = "spanned")]
1011 fn tags_span(&self) -> &HashMap<String, Span> {
1012 &self.tags_span
1013 }
1014}
1015
1016impl Window for SlidingWindow {
1017 fn target(&self) -> StreamReference {
1018 self.target
1019 }
1020
1021 fn caller(&self) -> StreamReference {
1022 self.caller
1023 }
1024
1025 fn op(&self) -> WindowOperation {
1026 self.op
1027 }
1028
1029 fn ty(&self) -> &Type {
1030 &self.ty
1031 }
1032
1033 fn memory_bound(&self) -> MemorizationBound {
1034 self.num_buckets
1035 }
1036}
1037
1038impl Window for DiscreteWindow {
1039 fn target(&self) -> StreamReference {
1040 self.target
1041 }
1042
1043 fn caller(&self) -> StreamReference {
1044 self.caller
1045 }
1046
1047 fn op(&self) -> WindowOperation {
1048 self.op
1049 }
1050
1051 fn ty(&self) -> &Type {
1052 &self.ty
1053 }
1054
1055 fn memory_bound(&self) -> MemorizationBound {
1056 MemorizationBound::Bounded(self.duration as u32)
1057 }
1058}
1059
1060impl Window for InstanceAggregation {
1061 fn target(&self) -> StreamReference {
1062 self.target
1063 }
1064
1065 fn caller(&self) -> StreamReference {
1066 self.caller
1067 }
1068
1069 fn op(&self) -> WindowOperation {
1070 self.aggr.into()
1071 }
1072
1073 fn ty(&self) -> &Type {
1074 &self.ty
1075 }
1076
1077 fn memory_bound(&self) -> MemorizationBound {
1078 MemorizationBound::Bounded(1)
1079 }
1080}
1081
1082impl RtLolaMir {
1083 /// Returns a collection containing a reference to each input stream in the specification.
1084 pub fn input_refs(&self) -> impl Iterator<Item = InputReference> {
1085 0..self.inputs.len()
1086 }
1087
1088 /// Returns a collection containing a reference to each output stream in the specification.
1089 pub fn output_refs(&self) -> impl Iterator<Item = OutputReference> {
1090 0..self.outputs.len()
1091 }
1092
1093 /// Provides mutable access to an input stream.
1094 ///
1095 /// # Panic
1096 /// Panics if `reference` is a [StreamReference::Out].
1097 pub fn input_mut(&mut self, reference: StreamReference) -> &mut InputStream {
1098 match reference {
1099 StreamReference::In(ix) => &mut self.inputs[ix],
1100 StreamReference::Out(_) => {
1101 unreachable!("Called `LolaIR::get_in` with a `StreamReference::OutRef`.")
1102 }
1103 }
1104 }
1105
1106 /// Provides immutable access to an input stream.
1107 ///
1108 /// # Panic
1109 /// Panics if `reference` is a [StreamReference::Out].
1110 pub fn input(&self, reference: StreamReference) -> &InputStream {
1111 match reference {
1112 StreamReference::In(ix) => &self.inputs[ix],
1113 StreamReference::Out(_) => {
1114 unreachable!("Called `LolaIR::get_in` with a `StreamReference::OutRef`.")
1115 }
1116 }
1117 }
1118
1119 /// Provides mutable access to an output stream.
1120 ///
1121 /// # Panic
1122 /// Panics if `reference` is a [StreamReference::In].
1123 pub fn output_mut(&mut self, reference: StreamReference) -> &mut OutputStream {
1124 match reference {
1125 StreamReference::In(_) => {
1126 unreachable!("Called `LolaIR::get_out` with a `StreamReference::InRef`.")
1127 }
1128 StreamReference::Out(ix) => &mut self.outputs[ix],
1129 }
1130 }
1131
1132 /// Provides immutable access to an output stream.
1133 ///
1134 /// # Panic
1135 /// Panics if `reference` is a [StreamReference::In].
1136 pub fn output(&self, reference: StreamReference) -> &OutputStream {
1137 match reference {
1138 StreamReference::In(_) => {
1139 unreachable!("Called `LolaIR::get_out` with a `StreamReference::InRef`.")
1140 }
1141 StreamReference::Out(ix) => &self.outputs[ix],
1142 }
1143 }
1144
1145 /// Provides immutable access to a stream.
1146 pub fn stream(&self, reference: StreamReference) -> &dyn Stream {
1147 match reference {
1148 StreamReference::In(ix) => &self.inputs[ix],
1149 StreamReference::Out(ix) => &self.outputs[ix],
1150 }
1151 }
1152
1153 /// Produces an iterator over all stream references.
1154 pub fn all_streams(&self) -> impl Iterator<Item = StreamReference> {
1155 self.input_refs()
1156 .map(StreamReference::In)
1157 .chain(self.output_refs().map(StreamReference::Out))
1158 }
1159
1160 /// Provides a collection of all output streams representing a trigger.
1161 pub fn all_triggers(&self) -> Vec<&OutputStream> {
1162 self.triggers
1163 .iter()
1164 .map(|t| self.output(t.output_reference))
1165 .collect()
1166 }
1167
1168 /// Provides a collection of all event-driven output streams.
1169 pub fn all_event_driven(&self) -> Vec<&OutputStream> {
1170 self.event_driven
1171 .iter()
1172 .map(|t| self.output(t.reference))
1173 .collect()
1174 }
1175
1176 /// Return true if the specification contains any time-driven features.
1177 /// This includes time-driven streams and time-driven spawn conditions.
1178 pub fn has_time_driven_features(&self) -> bool {
1179 !self.time_driven.is_empty()
1180 || self.outputs.iter().any(|o| {
1181 matches!(
1182 o.spawn.pacing,
1183 PacingType::GlobalPeriodic(_) | PacingType::LocalPeriodic(_)
1184 )
1185 })
1186 }
1187
1188 /// Provides a collection of all time-driven output streams.
1189 pub fn all_time_driven(&self) -> Vec<&OutputStream> {
1190 self.time_driven
1191 .iter()
1192 .map(|t| self.output(t.reference))
1193 .collect()
1194 }
1195
1196 /// Provides the activation contion of a event-driven stream and none if the stream is time-driven
1197 pub fn get_ac(&self, sref: StreamReference) -> Option<&ActivationCondition> {
1198 self.event_driven
1199 .iter()
1200 .find(|e| e.reference == sref)
1201 .map(|e| &e.ac)
1202 }
1203
1204 /// Provides immutable access to a discrete window.
1205 ///
1206 /// # Panic
1207 /// Panics if `window` is not a [WindowReference::Discrete].
1208 pub fn discrete_window(&self, window: WindowReference) -> &DiscreteWindow {
1209 match window {
1210 WindowReference::Discrete(x) => &self.discrete_windows[x],
1211 WindowReference::Sliding(_) | WindowReference::Instance(_) => {
1212 panic!("wrong type of window reference passed to getter")
1213 }
1214 }
1215 }
1216
1217 /// Provides immutable access to a instance aggregation.
1218 ///
1219 /// # Panic
1220 /// Panics if `window` is not a [WindowReference::Instance].
1221 pub fn instance_aggregation(&self, window: WindowReference) -> &InstanceAggregation {
1222 match window {
1223 WindowReference::Instance(x) => &self.instance_aggregations[x],
1224 WindowReference::Sliding(_) | WindowReference::Discrete(_) => {
1225 panic!("wrong type of window reference passed to getter")
1226 }
1227 }
1228 }
1229
1230 /// Provides immutable access to a sliding window.
1231 ///
1232 /// # Panic
1233 /// Panics if `window` is not a [WindowReference::Sliding].
1234 pub fn sliding_window(&self, window: WindowReference) -> &SlidingWindow {
1235 match window {
1236 WindowReference::Sliding(x) => &self.sliding_windows[x],
1237 WindowReference::Discrete(_) | WindowReference::Instance(_) => {
1238 panic!("wrong type of window reference passed to getter")
1239 }
1240 }
1241 }
1242
1243 /// Provides immutable access to a window.
1244 pub fn window(&self, window: WindowReference) -> &dyn Window {
1245 match window {
1246 WindowReference::Sliding(x) => &self.sliding_windows[x],
1247 WindowReference::Discrete(x) => &self.discrete_windows[x],
1248 WindowReference::Instance(x) => &self.instance_aggregations[x],
1249 }
1250 }
1251
1252 /// Provides a representation for the evaluation layers of all event-driven output streams. Each element of the outer `Vec` represents a layer, each element of the inner `Vec` an output stream in the layer.
1253 pub fn get_event_driven_layers(&self) -> Vec<Vec<Task>> {
1254 let mut event_driven_spawns = self
1255 .outputs
1256 .iter()
1257 .filter(|o| matches!(o.spawn.pacing, PacingType::Event(_)))
1258 .peekable();
1259
1260 // Peekable is fine because the filter above does not have side effects
1261 if self.event_driven.is_empty() && event_driven_spawns.peek().is_none() {
1262 return vec![];
1263 }
1264
1265 // Zip eval layer with stream reference.
1266 let streams_with_layers = self.event_driven.iter().map(|s| s.reference).map(|r| {
1267 (
1268 self.output(r).eval_layer().into(),
1269 Task::Evaluate(r.out_ix()),
1270 )
1271 });
1272
1273 let spawns_with_layers = event_driven_spawns
1274 .map(|o| (o.spawn_layer().inner(), Task::Spawn(o.reference.out_ix())));
1275
1276 let tasks_with_layers: Vec<(usize, Task)> =
1277 streams_with_layers.chain(spawns_with_layers).collect();
1278
1279 // Streams are annotated with an evaluation layer. The layer is not minimal, so there might be
1280 // layers without entries and more layers than streams.
1281 // Minimization works as follows:
1282 // a) Find the greatest layer
1283 // b) For each potential layer...
1284 // c) Find streams that would be in it.
1285 // d) If there is none, skip this layer
1286 // e) If there are some, add them as layer.
1287
1288 // a) Find the greatest layer. Maximum must exist because vec cannot be empty.
1289 let max_layer = tasks_with_layers
1290 .iter()
1291 .max_by_key(|(layer, _)| layer)
1292 .unwrap()
1293 .0;
1294
1295 let mut layers = Vec::new();
1296 // b) For each potential layer
1297 for i in 0..=max_layer {
1298 // c) Find streams that would be in it.
1299 let in_layer_i: Vec<Task> = tasks_with_layers
1300 .iter()
1301 .filter_map(|(l, r)| if *l == i { Some(*r) } else { None })
1302 .collect();
1303 if in_layer_i.is_empty() {
1304 // d) If there is none, skip this layer
1305 continue;
1306 } else {
1307 // e) If there are some, add them as layer.
1308 layers.push(in_layer_i);
1309 }
1310 }
1311 layers
1312 }
1313
1314 /// Attempts to compute a schedule for all time-driven streams.
1315 ///
1316 /// # Fail
1317 /// Fails if the resulting schedule would require at least 10^7 deadlines.
1318 pub fn compute_schedule(&self) -> Result<Schedule, String> {
1319 Schedule::from(self)
1320 }
1321
1322 /// Creates a new [RtLolaMirPrinter] for the Mir type `T`. It implements the [Display](std::fmt::Display) Trait for type `T`.
1323 pub fn display<'a, T>(&'a self, target: &'a T) -> RtLolaMirPrinter<'a, T> {
1324 RtLolaMirPrinter::new(self, target)
1325 }
1326
1327 /// Represents the specification as a dependency graph
1328 pub fn dependency_graph(&self) -> DependencyGraph<'_> {
1329 DependencyGraph::new(self)
1330 }
1331
1332 /// Returns the input stream with the given name if it exists.
1333 pub fn get_input_by_name(&self, name: &str) -> Option<&InputStream> {
1334 self.inputs.iter().find(|input| input.name == name)
1335 }
1336
1337 /// Returns the output stream with the given name if it exists.
1338 pub fn get_output_by_name(&self, name: &str) -> Option<&OutputStream> {
1339 self.outputs.iter().find(|output| output.name == name)
1340 }
1341
1342 /// Returns the stream with the given name if it exists.
1343 pub fn get_stream_by_name(&self, name: &str) -> Option<&dyn Stream> {
1344 self.get_input_by_name(name)
1345 .map(|input| {
1346 // clippy likes it this way
1347 let input: &dyn Stream = input;
1348 input
1349 })
1350 .or_else(|| {
1351 self.get_output_by_name(name).map(|output| {
1352 let output: &dyn Stream = output;
1353 output
1354 })
1355 })
1356 }
1357}
1358
1359impl Type {
1360 /// Indicates how many bytes a type requires to be stored in memory.
1361 ///
1362 /// Recursive types yield the sum of their sub-type sizes, unsized types panic, and functions do not have a size, so they produce `None`.
1363 /// # Panics
1364 /// Panics if the type is an instance of [Type::Option], [Type::String], or [Type::Bytes] because their size is undetermined.
1365 pub fn size(&self) -> Option<ValSize> {
1366 match self {
1367 Type::Bool => Some(ValSize(1)),
1368 Type::Int(IntTy::Int8) => Some(ValSize(1)),
1369 Type::Int(IntTy::Int16) => Some(ValSize(2)),
1370 Type::Int(IntTy::Int32) => Some(ValSize(4)),
1371 Type::Int(IntTy::Int64) => Some(ValSize(8)),
1372 Type::Int(IntTy::Int128) => Some(ValSize(16)),
1373 Type::Int(IntTy::Int256) => Some(ValSize(32)),
1374 Type::UInt(UIntTy::UInt8) => Some(ValSize(1)),
1375 Type::UInt(UIntTy::UInt16) => Some(ValSize(2)),
1376 Type::UInt(UIntTy::UInt32) => Some(ValSize(4)),
1377 Type::UInt(UIntTy::UInt64) => Some(ValSize(8)),
1378 Type::UInt(UIntTy::UInt128) => Some(ValSize(16)),
1379 Type::UInt(UIntTy::UInt256) => Some(ValSize(32)),
1380 Type::Float(FloatTy::Float32) => Some(ValSize(4)),
1381 Type::Float(FloatTy::Float64) => Some(ValSize(8)),
1382 Type::Fixed(FixedTy::Fixed64_32) | Type::UFixed(FixedTy::Fixed64_32) => {
1383 Some(ValSize(64))
1384 }
1385 Type::Fixed(FixedTy::Fixed32_16) | Type::UFixed(FixedTy::Fixed32_16) => {
1386 Some(ValSize(32))
1387 }
1388 Type::Fixed(FixedTy::Fixed16_8) | Type::UFixed(FixedTy::Fixed16_8) => Some(ValSize(16)),
1389 Type::Option(_) => unimplemented!("Size of option not determined, yet."),
1390 Type::Tuple(t) => {
1391 let size = t.iter().map(|t| Type::size(t).unwrap().0).sum();
1392 Some(ValSize(size))
1393 }
1394 Type::String | Type::Bytes => unimplemented!("Size of Strings not determined, yet."),
1395 Type::Function { .. } => None,
1396 }
1397 }
1398}
1399
1400/// The size of a specific value in bytes.
1401#[derive(Debug, Clone, Copy)]
1402pub struct ValSize(pub u32); // Needs to be reasonably large for compound types.
1403
1404impl From<u8> for ValSize {
1405 fn from(val: u8) -> ValSize {
1406 ValSize(u32::from(val))
1407 }
1408}
1409
1410impl std::ops::Add for ValSize {
1411 type Output = ValSize;
1412
1413 fn add(self, rhs: ValSize) -> ValSize {
1414 ValSize(self.0 + rhs.0)
1415 }
1416}
1417
1418/// Representation of the different stream accesses
1419#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, Hash)]
1420pub enum StreamAccessKind {
1421 /// Represents the synchronous access
1422 Sync,
1423 /// Represents the access to a (discrete window)[DiscreteWindow]
1424 ///
1425 /// The argument contains the reference to the (discrete window)[DiscreteWindow] whose value is used in the [Expression].
1426 DiscreteWindow(WindowReference),
1427 /// Represents the access to a (sliding window)[SlidingWindow]
1428 ///
1429 /// The argument contains the reference to the (sliding window)[SlidingWindow] whose value is used in the [Expression].
1430 SlidingWindow(WindowReference),
1431 /// Represents the access to a (instance aggregation)[InstanceAggregation]
1432 ///
1433 /// The argument contains the reference to the (instance aggregation)[InstanceAggregation] whose value is used in the [Expression].
1434 InstanceAggregation(WindowReference),
1435 /// Representation of sample and hold accesses
1436 Hold,
1437 /// Representation of offset accesses
1438 ///
1439 /// The argument contains the [Offset] of the stream access.
1440 Offset(Offset),
1441 /// Represents the optional `get` access.
1442 Get,
1443 /// Represents the update check of a stream, if the target received a new value at this timestamp.
1444 Fresh,
1445}
1446
1447/// Offset used in the lookup expression
1448#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, Hash)]
1449pub enum Offset {
1450 /// A strictly positive discrete offset, e.g., `4`, or `42`
1451 Future(u32),
1452 /// A non-negative discrete offset, e.g., `0`, `-4`, or `-42`
1453 Past(u32),
1454}
1455
1456impl PartialOrd for Offset {
1457 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1458 Some(self.cmp(other))
1459 }
1460}
1461
1462impl Ord for Offset {
1463 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1464 use std::cmp::Ordering;
1465
1466 use Offset::*;
1467 match (self, other) {
1468 (Past(_), Future(_)) => Ordering::Less,
1469 (Future(_), Past(_)) => Ordering::Greater,
1470 (Future(a), Future(b)) => a.cmp(b),
1471 (Past(a), Past(b)) => b.cmp(a),
1472 }
1473 }
1474}