rtlola_streamir/ir/
lowering.rs

1mod expressions;
2pub(crate) mod livetime_equivalences;
3
4use livetime_equivalences::LivetimeEquivalences;
5use std::{collections::HashMap, iter, time::Duration};
6use thiserror::Error;
7use uom::num_traits::Inv;
8use uom::si::{
9    rational64::{Frequency as UOM_Frequency, Time},
10    time::nanosecond,
11};
12
13use expressions::convert_stream_expression;
14use itertools::Itertools;
15use rtlola_frontend::mir::{self, Expression, ExpressionKind, PacingType, Stream};
16
17use super::schedule::StaticSchedule;
18use super::windows::{InstanceSelection, Window, WindowKind, WindowOperation};
19use super::{
20    expressions::Expr,
21    memory::{Memory, Parameter, StreamBuffer, StreamMemory},
22    Guard, IfStmt, Stmt, StreamIr, StreamReference, Type, WindowReference,
23};
24use super::{LocalFreq, LocalFreqRef, Offset, Origin};
25use super::{OutputReference, StreamAccessKind};
26
27impl TryFrom<rtlola_frontend::RtLolaMir> for StreamIr {
28    type Error = LoweringError;
29
30    fn try_from(value: rtlola_frontend::RtLolaMir) -> Result<Self, Self::Error> {
31        let schedule = value
32            .compute_schedule()
33            .map_err(LoweringError::ComputeSchedule)?;
34        let rtlola_frontend::RtLolaMir {
35            inputs,
36            outputs,
37            time_driven: _,
38            event_driven: _,
39            discrete_windows,
40            sliding_windows,
41            instance_aggregations,
42            triggers,
43            global_tags: _,
44        } = value;
45
46        let mut cur_unparameterized = 0;
47        let mut cur_parameterized = 0;
48        let sr2sr: HashMap<_, _> = inputs
49            .iter()
50            .map(|i| (i.reference, StreamReference::In(i.reference.in_ix())))
51            .chain(outputs.iter().map(|o| {
52                (
53                    o.reference,
54                    StreamReference::Out(if o.is_parameterized() {
55                        let i = cur_parameterized;
56                        cur_parameterized += 1;
57                        OutputReference::Parameterized(i)
58                    } else {
59                        let i = cur_unparameterized;
60                        cur_unparameterized += 1;
61                        OutputReference::Unparameterized(i)
62                    }),
63                )
64            }))
65            .collect();
66        let (accesses, accessed_by) = inputs
67            .iter()
68            .map(|i| (i.as_stream_ref(), (vec![], &i.accessed_by)))
69            .chain(
70                outputs
71                    .iter()
72                    .map(|o| (o.as_stream_ref(), (o.accesses.clone(), &o.accessed_by))),
73            )
74            .map(|(sr, (accesses, accessed_by))| {
75                (
76                    (
77                        sr2sr[&sr],
78                        accesses
79                            .iter()
80                            .map(|(sr, a)| {
81                                (
82                                    sr2sr[sr],
83                                    a.iter()
84                                        .map(|(o, a)| {
85                                            (Origin::from(*o), StreamAccessKind::from(*a))
86                                        })
87                                        .collect(),
88                                )
89                            })
90                            .collect::<Vec<(StreamReference, Vec<(Origin, StreamAccessKind)>)>>(),
91                    ),
92                    (
93                        sr2sr[&sr],
94                        accessed_by
95                            .iter()
96                            .map(|(sr, a)| {
97                                (
98                                    sr2sr[sr],
99                                    a.iter().map(|(o, a)| ((*o).into(), (*a).into())).collect(),
100                                )
101                            })
102                            .collect::<Vec<(StreamReference, _)>>(),
103                    ),
104                )
105            })
106            .collect();
107        let mut lref2lfreq: HashMap<LocalFreqRef, LocalFreq> = HashMap::new();
108
109        let livetime_equivalences = LivetimeEquivalences::new(&outputs, &sr2sr);
110        let static_schedule = StaticSchedule::new(schedule, &sr2sr);
111
112        let (sr2memory_inputs, input_stmts): (HashMap<_, _>, Vec<_>) = inputs
113            .into_iter()
114            .map(|i| {
115                let sr = sr2sr[&i.reference];
116                let (mem, stmts) = StreamIr::lower_input(i, &sr2sr);
117                ((sr, mem), stmts)
118            })
119            .unzip();
120        let layer_0 = Stmt::parallel(input_stmts);
121        let (sr2memory_outputs, stmts): (HashMap<StreamReference, Memory>, Vec<_>) = outputs
122            .into_iter()
123            .map(|o| {
124                let sr = sr2sr[&o.reference];
125                let (mem, stmts) = StreamIr::lower_output(o, &sr2sr, &mut lref2lfreq)?;
126                Ok(((sr, mem), stmts))
127            })
128            .collect::<Result<Vec<_>, _>>()?
129            .into_iter()
130            .unzip();
131
132        let stmts = stmts
133            .into_iter()
134            .flatten()
135            .sorted_by_key(|(layer, _stmt)| *layer);
136        let layers = Stmt::seq(
137            iter::once(layer_0).chain(stmts.chunk_by(|(layer, _stmt)| *layer).into_iter().map(
138                |(_layer, stmts)| Stmt::parallel(stmts.into_iter().map(|(_layer, stmt)| stmt)),
139            )),
140        );
141
142        let sr2memory = sr2memory_inputs
143            .into_iter()
144            .chain(sr2memory_outputs)
145            .collect();
146
147        let wref2window = sliding_windows
148            .into_iter()
149            .map(|swin| {
150                Ok((
151                    swin.reference.into(),
152                    StreamIr::lower_sliding_window(swin, &sr2sr, &mut lref2lfreq)?,
153                ))
154            })
155            .collect::<Vec<_>>()
156            .into_iter()
157            .chain(
158                discrete_windows
159                    .into_iter()
160                    .map(|dwin| {
161                        Ok((
162                            dwin.reference.into(),
163                            StreamIr::lower_discrete_window(dwin, &sr2sr, &mut lref2lfreq)?,
164                        ))
165                    })
166                    .collect::<Vec<_>>(),
167            )
168            .chain(
169                instance_aggregations
170                    .into_iter()
171                    .map(|iwin| {
172                        Ok((
173                            iwin.reference.into(),
174                            StreamIr::lower_instance_aggregation(iwin, &sr2sr, &mut lref2lfreq)?,
175                        ))
176                    })
177                    .collect::<Vec<_>>(),
178            )
179            .collect::<Result<_, _>>()?;
180
181        let triggers = triggers
182            .into_iter()
183            .map(|t| (sr2sr[&t.output_reference].out_idx(), t.trigger_reference))
184            .collect();
185
186        Ok(StreamIr {
187            stmt: layers,
188            sr2memory,
189            wref2window,
190            lref2lfreq,
191            livetime_equivalences,
192            static_schedule,
193            triggers,
194            accesses,
195            accessed_by,
196        })
197    }
198}
199
200impl From<mir::MemorizationBound> for StreamBuffer {
201    fn from(value: mir::MemorizationBound) -> Self {
202        match value {
203            mir::MemorizationBound::Unbounded => StreamBuffer::UnBounded,
204            mir::MemorizationBound::Bounded(b) => StreamBuffer::Bounded(b as usize),
205        }
206    }
207}
208
209#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
210pub enum MyLayer {
211    Layer(mir::Layer),
212    Close,
213}
214
215impl From<mir::Layer> for MyLayer {
216    fn from(value: mir::Layer) -> Self {
217        MyLayer::Layer(value)
218    }
219}
220
221impl From<mir::Parameter> for Parameter {
222    fn from(value: mir::Parameter) -> Self {
223        let mir::Parameter { name, ty, idx: _ } = value;
224        Parameter {
225            name,
226            ty: ty.into(),
227        }
228    }
229}
230
231impl StreamIr {
232    fn lower_input(
233        input: mir::InputStream,
234        sr2sr: &HashMap<mir::StreamReference, StreamReference>,
235    ) -> (Memory, Stmt) {
236        let sr = sr2sr[&input.reference];
237        let memory = Memory {
238            buffer: StreamMemory::Static(input.memory_bound.into()),
239            ty: input.ty.into(),
240            name: input.name,
241        };
242        let stmt = Stmt::seq([Stmt::Shift(sr), Stmt::Input(sr.in_idx())]).filter(Guard::Stream(sr));
243        (memory, stmt)
244    }
245
246    fn lower_output(
247        output: mir::OutputStream,
248        sr2sr: &HashMap<mir::StreamReference, StreamReference>,
249        lref2lfreq: &mut HashMap<LocalFreqRef, LocalFreq>,
250    ) -> Result<(Memory, [(MyLayer, Stmt); 4]), LoweringError> {
251        let has_spawn = output.is_spawned();
252        let has_close = output.is_closed();
253        let is_dynamic = has_spawn || has_close;
254
255        let mir::OutputStream {
256            name,
257            kind: _,
258            ty,
259            spawn,
260            eval,
261            close,
262            accesses: _,
263            accessed_by: _,
264            aggregated_by: _,
265            aggregates,
266            memory_bound,
267            layer,
268            reference,
269            params,
270            tags: _,
271        } = output;
272        let sr = sr2sr[&reference];
273        let parameter = params.into_iter().map(|p| p.into()).collect::<Vec<_>>();
274
275        let spawned_and_closed_windows = aggregates
276            .iter()
277            .filter_map(|(_target, origin, wref)| match origin {
278                mir::Origin::Spawn => None,
279                mir::Origin::Filter(_) | mir::Origin::Eval(_) | mir::Origin::Close => {
280                    Some((*wref).into())
281                }
282            })
283            .collect::<Vec<_>>();
284        let (shift, eval, mut eval_freq) = StreamIr::lower_eval(sr, eval, sr2sr, lref2lfreq)?;
285        let shift = shift.iterate(sr, &parameter, is_dynamic);
286        let eval = eval.iterate(sr, &parameter, is_dynamic);
287        let (close, close_freq) = StreamIr::lower_close(
288            sr,
289            close,
290            sr2sr,
291            lref2lfreq,
292            eval_freq.clone(),
293            spawned_and_closed_windows.clone(),
294        )?;
295        let close = if has_close {
296            close.iterate(sr, &parameter, is_dynamic)
297        } else {
298            Stmt::Skip
299        };
300        eval_freq.extend(close_freq);
301        let spawn = if has_spawn {
302            StreamIr::lower_spawn(
303                sr,
304                spawn,
305                sr2sr,
306                lref2lfreq,
307                eval_freq,
308                spawned_and_closed_windows,
309            )?
310        } else {
311            Stmt::Skip
312        };
313
314        let buffer = match (parameter.is_empty(), is_dynamic) {
315            (true, false) => StreamMemory::Static(memory_bound.into()),
316            (true, true) => StreamMemory::Dynamic {
317                buffer: memory_bound.into(),
318                has_spawn,
319                has_close,
320            },
321            (false, true) => StreamMemory::Instances {
322                buffer: memory_bound.into(),
323                parameter,
324            },
325            (false, false) => unreachable!("parameterized always has spawn"),
326        };
327
328        let mem = Memory {
329            buffer,
330            ty: ty.into(),
331            name,
332        };
333        Ok((
334            mem,
335            [
336                (layer.spawn_layer().into(), spawn),
337                (layer.shift_layer().into(), shift),
338                (layer.evaluation_layer().into(), eval),
339                (MyLayer::Close, close),
340            ],
341        ))
342    }
343
344    fn lower_spawn(
345        sr: StreamReference,
346        spawn: mir::Spawn,
347        sr2sr: &HashMap<mir::StreamReference, StreamReference>,
348        lref2lfreq: &mut HashMap<LocalFreqRef, LocalFreq>,
349        local_freqs: Vec<LocalFreqRef>,
350        windows: Vec<WindowReference>,
351    ) -> Result<Stmt, LoweringError> {
352        let mir::Spawn {
353            expression,
354            pacing,
355            condition,
356        } = spawn;
357        let (guard, lref) = Self::lower_guard(pacing, condition, sr, sr2sr, lref2lfreq)?;
358        if lref.is_some() {
359            Err(LoweringError::LocalFreq)
360        } else {
361            Ok(Stmt::Spawn {
362                sr: sr.out_idx(),
363                with: expression
364                    .map(|Expression { ty, kind }| match kind {
365                        ExpressionKind::Tuple(inner) => inner
366                            .into_iter()
367                            .map(|expr| convert_stream_expression(expr, None, sr2sr))
368                            .collect(),
369                        other => Ok(vec![convert_stream_expression(
370                            Expression { ty, kind: other },
371                            None,
372                            sr2sr,
373                        )?]),
374                    })
375                    .transpose()?,
376                local_frequencies: local_freqs,
377                windows,
378            }
379            .filter(guard))
380        }
381    }
382
383    fn lower_eval(
384        sr: StreamReference,
385        eval: mir::Eval,
386        sr2sr: &HashMap<mir::StreamReference, StreamReference>,
387        lref2lfreq: &mut HashMap<LocalFreqRef, LocalFreq>,
388    ) -> Result<(Stmt, Stmt, Vec<LocalFreqRef>), LoweringError> {
389        let mir::Eval {
390            clauses,
391            eval_pacing: _,
392        } = eval;
393        let shift = |_expr: Expression, _idx: usize| Ok(Stmt::Shift(sr));
394        let eval = |expr: Expression, idx: usize| {
395            Ok::<Stmt, LoweringError>(Stmt::Eval {
396                sr: sr.out_idx(),
397                with: convert_stream_expression(expr, None, sr2sr)?,
398                idx,
399            })
400        };
401        let mut local_freqs = Vec::new();
402        let clauses = clauses.into_iter().enumerate().collect::<Vec<_>>();
403        let mut construct = |f: Box<dyn Fn(Expression, usize) -> Result<Stmt, LoweringError>>| {
404            let mut clauses = clauses.clone();
405            let last = clauses
406                .pop()
407                .map(
408                    |(
409                        idx,
410                        mir::EvalClause {
411                            condition,
412                            expression,
413                            pacing,
414                        },
415                    )| {
416                        let (guard, lref) =
417                            Self::lower_guard(pacing, condition, sr, sr2sr, lref2lfreq)?;
418                        local_freqs.extend(lref);
419                        Ok(f(expression, idx)?.filter(guard))
420                    },
421                )
422                .unwrap();
423            clauses.into_iter().rfold(
424                last,
425                |alt,
426                 (
427                    idx,
428                    mir::EvalClause {
429                        condition,
430                        expression,
431                        pacing,
432                    },
433                )| {
434                    let (guard, lref) =
435                        Self::lower_guard(pacing, condition, sr, sr2sr, lref2lfreq)?;
436                    local_freqs.extend(lref);
437                    Ok(f(expression, idx)?.filter_else(guard, alt?))
438                },
439            )
440        };
441        Ok((
442            construct(Box::new(shift))?,
443            construct(Box::new(eval))?,
444            local_freqs,
445        ))
446    }
447
448    fn lower_close(
449        sr: StreamReference,
450        close: mir::Close,
451        sr2sr: &HashMap<mir::StreamReference, StreamReference>,
452        lref2lfreq: &mut HashMap<LocalFreqRef, LocalFreq>,
453        mut eval_local_freqs: Vec<LocalFreqRef>,
454        windows: Vec<WindowReference>,
455    ) -> Result<(Stmt, Option<LocalFreqRef>), LoweringError> {
456        let mir::Close {
457            condition, pacing, ..
458        } = close;
459        let (condition, lfreq) = condition
460            .map(|g| Self::lower_guard(pacing, Some(g), sr, sr2sr, lref2lfreq))
461            .unwrap_or_else(|| Ok((Guard::Constant(false), None)))?;
462        eval_local_freqs.extend(lfreq);
463        Ok((
464            Stmt::Close {
465                sr: sr.out_idx(),
466                local_frequencies: eval_local_freqs,
467                windows,
468            }
469            .filter(condition),
470            lfreq,
471        ))
472    }
473
474    fn lower_guard(
475        pacing: PacingType,
476        condition: Option<Expression>,
477        source: StreamReference,
478        sr2sr: &HashMap<mir::StreamReference, StreamReference>,
479        lref1lfreq: &mut HashMap<LocalFreqRef, LocalFreq>,
480    ) -> Result<(Guard, Option<LocalFreqRef>), LoweringError> {
481        let (pacing, lref) = Guard::from_pt(pacing, source, sr2sr, lref1lfreq);
482        if let Some(condition) = condition {
483            Ok((
484                pacing.and(convert_stream_expression(condition, None, sr2sr)?.into()),
485                lref,
486            ))
487        } else {
488            Ok((pacing, lref))
489        }
490    }
491
492    fn lower_sliding_window(
493        sliding_window: mir::SlidingWindow,
494        sr2sr: &HashMap<mir::StreamReference, StreamReference>,
495        lref2lfreq: &mut HashMap<LocalFreqRef, LocalFreq>,
496    ) -> Result<Window, LoweringError> {
497        let mir::SlidingWindow {
498            target,
499            caller,
500            duration,
501            num_buckets,
502            bucket_size,
503            wait,
504            op,
505            reference,
506            ty,
507            origin,
508            pacing,
509        } = sliding_window;
510        Ok(Window {
511            wref: reference.into(),
512            op: op.into(),
513            target: sr2sr[&target],
514            caller: sr2sr[&caller],
515            ty: ty.into(),
516            kind: WindowKind::Sliding {
517                duration,
518                bucket_count: num_buckets.unwrap() as usize,
519                wait,
520                bucket_duration: bucket_size,
521            },
522            origin_pacing: Guard::from_pt(pacing, sr2sr[&caller], sr2sr, lref2lfreq).0,
523            origin: origin.into(),
524        })
525    }
526
527    fn lower_discrete_window(
528        discrete_window: mir::DiscreteWindow,
529        sr2sr: &HashMap<mir::StreamReference, StreamReference>,
530        lref2lfreq: &mut HashMap<LocalFreqRef, LocalFreq>,
531    ) -> Result<Window, LoweringError> {
532        let mir::DiscreteWindow {
533            target,
534            caller,
535            duration,
536            wait,
537            op,
538            reference,
539            ty,
540            origin,
541            pacing,
542        } = discrete_window;
543        Ok(Window {
544            wref: reference.into(),
545            op: op.into(),
546            target: sr2sr[&target],
547            caller: sr2sr[&caller],
548            ty: ty.into(),
549            kind: WindowKind::Discrete {
550                num_values: duration,
551                wait,
552            },
553            origin_pacing: Guard::from_pt(pacing, sr2sr[&caller], sr2sr, lref2lfreq).0,
554            origin: origin.into(),
555        })
556    }
557
558    fn lower_instance_aggregation(
559        instance_aggregation: mir::InstanceAggregation,
560        sr2sr: &HashMap<mir::StreamReference, StreamReference>,
561        lref2lfreq: &mut HashMap<LocalFreqRef, LocalFreq>,
562    ) -> Result<Window, LoweringError> {
563        let mir::InstanceAggregation {
564            target,
565            caller,
566            selection,
567            aggr,
568            reference,
569            ty,
570            origin,
571            pacing,
572        } = instance_aggregation;
573        Ok(Window {
574            wref: reference.into(),
575            op: aggr.into(),
576            target: sr2sr[&target],
577            caller: sr2sr[&caller],
578            ty: ty.into(),
579            kind: WindowKind::Instances {
580                selection: InstanceSelection::from_mir_selection(selection, sr2sr)?,
581            },
582            origin_pacing: Guard::from_pt(pacing, sr2sr[&caller], sr2sr, lref2lfreq).0,
583            origin: origin.into(),
584        })
585    }
586}
587
588impl Stmt {
589    fn filter(self, guard: Guard) -> Self {
590        Stmt::If(IfStmt {
591            guard,
592            cons: Box::new(self),
593            alt: Box::new(Stmt::Skip),
594        })
595    }
596
597    fn filter_else(self, guard: Guard, alt: Stmt) -> Self {
598        Stmt::If(IfStmt {
599            guard,
600            cons: Box::new(self),
601            alt: Box::new(alt),
602        })
603    }
604
605    pub(crate) fn seq(iter: impl IntoIterator<Item = Stmt>) -> Self {
606        let stmts = iter.into_iter().collect::<Vec<_>>();
607        match stmts.len() {
608            0 => Stmt::Skip,
609            1 => stmts.into_iter().next().unwrap(),
610            2.. => Stmt::Seq(stmts),
611        }
612    }
613
614    pub(crate) fn parallel(iter: impl IntoIterator<Item = Stmt>) -> Self {
615        let stmts = iter.into_iter().collect::<Vec<_>>();
616        match stmts.len() {
617            0 => Stmt::Skip,
618            1 => stmts.into_iter().next().unwrap(),
619            2.. => Stmt::Parallel(stmts),
620        }
621    }
622
623    fn iterate(self, sr: StreamReference, parameter: &[Parameter], is_dynamic: bool) -> Self {
624        match (is_dynamic, parameter.is_empty()) {
625            (false, true) => self,
626            (true, true) => self.filter(Guard::Alive(sr)),
627            (true, false) => Stmt::Iterate {
628                sr: vec![sr.out_idx()],
629                stmt: Box::new(self),
630            },
631            (false, false) => unreachable!(),
632        }
633    }
634}
635
636impl Guard {
637    fn and(self, rhs: Self) -> Self {
638        Guard::And {
639            lhs: Box::new(self),
640            rhs: Box::new(rhs),
641        }
642    }
643}
644
645impl From<Expr> for Guard {
646    fn from(value: Expr) -> Self {
647        Guard::Dynamic(value)
648    }
649}
650
651impl Guard {
652    fn from_pt(
653        pt: PacingType,
654        sr: StreamReference,
655        sr2sr: &HashMap<mir::StreamReference, StreamReference>,
656        lref2lfreq: &mut HashMap<LocalFreqRef, LocalFreq>,
657    ) -> (Self, Option<LocalFreqRef>) {
658        match pt {
659            PacingType::GlobalPeriodic(freq) => {
660                (Guard::GlobalFreq(frequency_to_duration(freq)), None)
661            }
662            PacingType::LocalPeriodic(freq) => {
663                let new_ref = lref2lfreq.len();
664                let freq = LocalFreq {
665                    dur: frequency_to_duration(freq),
666                    sr: sr.out_idx(),
667                    reference: new_ref,
668                };
669                if let Some(r) = lref2lfreq
670                    .iter()
671                    .find_map(|(k, v)| (*v == freq).then_some(*k))
672                {
673                    (Guard::LocalFreq(r), Some(r))
674                } else {
675                    lref2lfreq.insert(new_ref, freq);
676                    (Guard::LocalFreq(new_ref), Some(new_ref))
677                }
678            }
679            PacingType::Event(activation_condition) => {
680                (Guard::from_ac(activation_condition, sr2sr), None)
681            }
682            PacingType::Constant => (Guard::Constant(true), None),
683        }
684    }
685}
686
687fn frequency_to_duration(frequency: UOM_Frequency) -> Duration {
688    let period =
689        Time::new::<uom::si::time::second>(frequency.get::<uom::si::frequency::hertz>().inv());
690    Duration::from_nanos(
691        period
692            .get::<nanosecond>()
693            .to_integer()
694            .try_into()
695            .expect("Period [ns] too large for u64!"),
696    )
697}
698
699impl Guard {
700    fn from_ac(
701        value: mir::ActivationCondition,
702        sr2sr: &HashMap<mir::StreamReference, StreamReference>,
703    ) -> Self {
704        match value {
705            mir::ActivationCondition::Conjunction(activation_conditions) => activation_conditions
706                .into_iter()
707                .map(|g| Guard::from_ac(g, sr2sr))
708                .reduce(|lhs, rhs| Guard::And {
709                    lhs: Box::new(lhs),
710                    rhs: Box::new(rhs),
711                })
712                .unwrap(),
713            mir::ActivationCondition::Disjunction(activation_conditions) => activation_conditions
714                .into_iter()
715                .map(|g| Guard::from_ac(g, sr2sr))
716                .reduce(|lhs, rhs| Guard::Or {
717                    lhs: Box::new(lhs),
718                    rhs: Box::new(rhs),
719                })
720                .unwrap(),
721            mir::ActivationCondition::Stream(stream_reference) => {
722                Guard::Stream(sr2sr[&stream_reference])
723            }
724            mir::ActivationCondition::True => Guard::Constant(true),
725        }
726    }
727}
728
729impl From<mir::WindowReference> for WindowReference {
730    fn from(value: mir::WindowReference) -> Self {
731        match value {
732            mir::WindowReference::Sliding(i) => WindowReference::Sliding(i),
733            mir::WindowReference::Discrete(i) => WindowReference::Discrete(i),
734            mir::WindowReference::Instance(i) => WindowReference::Instance(i),
735        }
736    }
737}
738
739impl From<mir::Type> for Type {
740    fn from(value: mir::Type) -> Self {
741        match value {
742            mir::Type::Bool => Type::Bool,
743            mir::Type::Int(mir::IntTy::Int8) => Type::Int(8),
744            mir::Type::Int(mir::IntTy::Int16) => Type::Int(16),
745            mir::Type::Int(mir::IntTy::Int32) => Type::Int(32),
746            mir::Type::Int(mir::IntTy::Int64) => Type::Int(64),
747            mir::Type::Int(mir::IntTy::Int128) => Type::Int(128),
748            mir::Type::Int(mir::IntTy::Int256) => Type::Int(256),
749            mir::Type::UInt(mir::UIntTy::UInt8) => Type::UInt(8),
750            mir::Type::UInt(mir::UIntTy::UInt16) => Type::UInt(16),
751            mir::Type::UInt(mir::UIntTy::UInt32) => Type::UInt(32),
752            mir::Type::UInt(mir::UIntTy::UInt64) => Type::UInt(64),
753            mir::Type::UInt(mir::UIntTy::UInt128) => Type::UInt(128),
754            mir::Type::UInt(mir::UIntTy::UInt256) => Type::UInt(256),
755            mir::Type::Float(mir::FloatTy::Float32) => Type::Float32,
756            mir::Type::Float(mir::FloatTy::Float64) => Type::Float64,
757            mir::Type::String => Type::String,
758            mir::Type::Option(inner_ty) => Type::Option(Box::new(Type::from(*inner_ty))),
759            mir::Type::Tuple(inner_tys) => {
760                Type::Tuple(inner_tys.into_iter().map(Type::from).collect())
761            }
762            mir::Type::Fixed(mir::FixedTy::Fixed16_8) => Type::Fixed(16),
763            mir::Type::Fixed(mir::FixedTy::Fixed32_16) => Type::Fixed(32),
764            mir::Type::Fixed(mir::FixedTy::Fixed64_32) => Type::Fixed(64),
765            mir::Type::UFixed(mir::FixedTy::Fixed16_8) => Type::UFixed(16),
766            mir::Type::UFixed(mir::FixedTy::Fixed32_16) => Type::UFixed(32),
767            mir::Type::UFixed(mir::FixedTy::Fixed64_32) => Type::UFixed(64),
768            mir::Type::Bytes => Type::Bytes,
769            mir::Type::Function { .. } => unimplemented!(),
770        }
771    }
772}
773
774impl From<mir::WindowOperation> for WindowOperation {
775    fn from(value: mir::WindowOperation) -> Self {
776        match value {
777            mir::WindowOperation::Count => WindowOperation::Count,
778            mir::WindowOperation::Min => WindowOperation::Min,
779            mir::WindowOperation::Max => WindowOperation::Max,
780            mir::WindowOperation::Sum => WindowOperation::Sum,
781            mir::WindowOperation::Product => WindowOperation::Product,
782            mir::WindowOperation::Average => WindowOperation::Average,
783            mir::WindowOperation::Integral => WindowOperation::Integral,
784            mir::WindowOperation::Conjunction => WindowOperation::Conjunction,
785            mir::WindowOperation::Disjunction => WindowOperation::Disjunction,
786            mir::WindowOperation::Last => WindowOperation::Last,
787            mir::WindowOperation::Variance => WindowOperation::Variance,
788            mir::WindowOperation::Covariance => WindowOperation::Covariance,
789            mir::WindowOperation::StandardDeviation => WindowOperation::StandardDeviation,
790            mir::WindowOperation::NthPercentile(n) => WindowOperation::NthPercentile(n),
791        }
792    }
793}
794
795impl From<mir::InstanceOperation> for WindowOperation {
796    fn from(value: mir::InstanceOperation) -> Self {
797        match value {
798            mir::InstanceOperation::Count => WindowOperation::Count,
799            mir::InstanceOperation::Min => WindowOperation::Min,
800            mir::InstanceOperation::Max => WindowOperation::Max,
801            mir::InstanceOperation::Sum => WindowOperation::Sum,
802            mir::InstanceOperation::Product => WindowOperation::Product,
803            mir::InstanceOperation::Average => WindowOperation::Average,
804            mir::InstanceOperation::Conjunction => WindowOperation::Conjunction,
805            mir::InstanceOperation::Disjunction => WindowOperation::Disjunction,
806            mir::InstanceOperation::Variance => WindowOperation::Variance,
807            mir::InstanceOperation::Covariance => WindowOperation::Covariance,
808            mir::InstanceOperation::StandardDeviation => WindowOperation::StandardDeviation,
809            mir::InstanceOperation::NthPercentile(n) => WindowOperation::NthPercentile(n),
810        }
811    }
812}
813
814impl From<mir::StreamAccessKind> for StreamAccessKind {
815    fn from(value: mir::StreamAccessKind) -> Self {
816        match value {
817            mir::StreamAccessKind::Sync => Self::Sync,
818            mir::StreamAccessKind::DiscreteWindow(window_reference) => {
819                Self::DiscreteWindow(window_reference.into())
820            }
821            mir::StreamAccessKind::SlidingWindow(window_reference) => {
822                Self::SlidingWindow(window_reference.into())
823            }
824            mir::StreamAccessKind::InstanceAggregation(window_reference) => {
825                Self::InstanceAggregation(window_reference.into())
826            }
827            mir::StreamAccessKind::Hold => Self::Hold,
828            mir::StreamAccessKind::Offset(offset) => Self::Offset(offset.into()),
829            mir::StreamAccessKind::Get => Self::Get,
830            mir::StreamAccessKind::Fresh => Self::Fresh,
831        }
832    }
833}
834
835impl From<mir::Offset> for Offset {
836    fn from(value: mir::Offset) -> Self {
837        match value {
838            mir::Offset::Future(o) => Self::Future(o),
839            mir::Offset::Past(o) => Self::Past(o),
840        }
841    }
842}
843
844impl InstanceSelection {
845    fn from_mir_selection(
846        value: mir::InstanceSelection,
847        sr2sr: &HashMap<mir::StreamReference, StreamReference>,
848    ) -> Result<Self, LoweringError> {
849        match value {
850            mir::InstanceSelection::Fresh => Ok(InstanceSelection::Fresh),
851            mir::InstanceSelection::All => Ok(InstanceSelection::All),
852            mir::InstanceSelection::FilteredFresh { parameters, cond } => {
853                Ok(InstanceSelection::FilteredFresh {
854                    parameters: parameters.into_iter().map(|p| p.into()).collect(),
855                    cond: convert_stream_expression(*cond, None, sr2sr)?,
856                })
857            }
858            mir::InstanceSelection::FilteredAll { parameters, cond } => {
859                Ok(InstanceSelection::FilteredAll {
860                    parameters: parameters.into_iter().map(|p| p.into()).collect(),
861                    cond: convert_stream_expression(*cond, None, sr2sr)?,
862                })
863            }
864        }
865    }
866}
867
868#[derive(Debug, Clone, Error)]
869/// An error that can happen during the lowering of the RtLolaMIR into the StreamIR
870pub enum LoweringError {
871    #[error("specification contains optional expression that is not immediately followed by a default value")]
872    /// The specification contains an optional expression that is not immediately followed by a default value
873    DefaultRequired,
874    #[error("specification contains a future access")]
875    /// The specification contains a future access
876    FutureAccess,
877    #[error("specification contains the unsupported function {0}")]
878    /// The specification contains an unsupported function
879    UnsupportedFunction(String),
880    #[error("Local frequency in an invalid position")]
881    /// The specification contains a local frequency in an invalid position
882    LocalFreq,
883    #[error("Error computing static schedule: {0}")]
884    /// An error happened when computing the static schedule from the RtLolaMir
885    ComputeSchedule(String),
886}