rtlola_frontend/mir/
dependency_graph.rs

1use std::borrow::Cow;
2use std::collections::{HashMap, HashSet};
3use std::fmt::Display;
4use std::io::BufWriter;
5
6use dot::{LabelText, Style};
7use itertools::Itertools;
8use serde::{Serialize, Serializer};
9use serde_json::{json, to_string_pretty};
10
11use super::{
12    ActivationCondition, Mir, Origin, PacingType, StreamAccessKind, StreamReference,
13    TriggerReference, WindowReference,
14};
15
16/// Represents the dependency graph of the specification
17#[derive(Debug, Clone)]
18pub struct DependencyGraph<'a> {
19    nodes: Vec<Node>,
20    edges: Vec<Edge>,
21    infos: HashMap<Node, NodeInformation<'a>>,
22}
23
24impl<'a> DependencyGraph<'a> {
25    pub(super) fn new(mir: &'a Mir) -> Self {
26        let stream_nodes = mir
27            .inputs
28            .iter()
29            .map(|i| i.reference)
30            .chain(
31                mir.outputs
32                    .iter()
33                    .filter(|o| !o.is_trigger())
34                    .map(|o| o.reference),
35            )
36            .map(Node::Stream);
37
38        let window_nodes = mir
39            .sliding_windows
40            .iter()
41            .map(|w| Node::Window(w.reference));
42
43        let trigger_nodes = mir
44            .triggers
45            .iter()
46            .map(|trigger| Node::Trigger(trigger.trigger_reference));
47
48        let nodes: Vec<_> = stream_nodes
49            .chain(window_nodes)
50            .chain(trigger_nodes)
51            .collect();
52
53        let edges = edges(mir);
54
55        let infos = nodes
56            .iter()
57            .map(|node| (*node, node_infos(mir, *node)))
58            .collect();
59
60        Self {
61            nodes,
62            edges,
63            infos,
64        }
65    }
66
67    /// Returns the dependency graph in the graphviz dot-format
68    pub fn dot(&self) -> String {
69        let res = Vec::new();
70        let mut res_writer = BufWriter::new(res);
71        dot::render(self, &mut res_writer).unwrap();
72        String::from_utf8(res_writer.into_inner().unwrap()).unwrap()
73    }
74
75    /// Returns the dependency graph in a json-format
76    pub fn json(&self) -> String {
77        let infos = self
78            .infos
79            .iter()
80            .map(|(key, value)| (key.to_string(), value))
81            .collect::<HashMap<_, _>>();
82
83        let json_value = json!({
84            "edges": self.edges,
85            "nodes": infos
86        });
87
88        to_string_pretty(&json_value).unwrap()
89    }
90}
91
92#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
93enum Node {
94    Stream(StreamReference),
95    Trigger(TriggerReference),
96    Window(WindowReference),
97}
98
99impl From<StreamReference> for Node {
100    fn from(s: StreamReference) -> Self {
101        Node::Stream(s)
102    }
103}
104
105impl From<WindowReference> for Node {
106    fn from(w: WindowReference) -> Self {
107        Node::Window(w)
108    }
109}
110
111impl Display for Node {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        match self {
114            Node::Stream(StreamReference::In(i)) => write!(f, "In_{i}"),
115            Node::Stream(StreamReference::Out(i)) => write!(f, "Out_{i}"),
116            Node::Window(WindowReference::Sliding(i)) => write!(f, "SW_{i}"),
117            Node::Window(WindowReference::Discrete(i)) => write!(f, "DW_{i}"),
118            Node::Window(WindowReference::Instance(i)) => write!(f, "IA_{i}"),
119            Node::Trigger(i) => write!(f, "T_{i}"),
120        }
121    }
122}
123
124impl Serialize for Node {
125    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
126    where
127        S: Serializer,
128    {
129        serializer.serialize_str(self.to_string().as_str())
130    }
131}
132
133#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
134struct Edge {
135    from: Node,
136    with: EdgeType,
137    to: Node,
138}
139
140#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
141#[serde(tag = "type")]
142enum EdgeType {
143    Access {
144        kind: StreamAccessKind,
145        origin: Origin,
146    },
147    Spawn,
148    Eval,
149}
150
151impl Display for EdgeType {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        let s = match self {
154            EdgeType::Access {
155                kind: StreamAccessKind::Sync,
156                ..
157            } => "Sync".into(),
158            EdgeType::Access {
159                kind: StreamAccessKind::Hold,
160                ..
161            } => "Hold".into(),
162            EdgeType::Access {
163                kind: StreamAccessKind::Offset(o),
164                ..
165            } => format!("Offset({o})"),
166            EdgeType::Spawn => "Spawn".into(),
167            EdgeType::Eval => "Eval".into(),
168            EdgeType::Access {
169                kind: StreamAccessKind::InstanceAggregation(_),
170                ..
171            } => "Instances".into(),
172            // no label on window access edges
173            EdgeType::Access {
174                kind: StreamAccessKind::DiscreteWindow(_),
175                ..
176            }
177            | EdgeType::Access {
178                kind: StreamAccessKind::SlidingWindow(_),
179                ..
180            } => "".into(),
181            EdgeType::Access {
182                kind: StreamAccessKind::Get,
183                ..
184            } => "Get".into(),
185            EdgeType::Access {
186                kind: StreamAccessKind::Fresh,
187                ..
188            } => "Fresh".into(),
189        };
190
191        write!(f, "{s}")
192    }
193}
194
195#[derive(Serialize, Debug, Clone)]
196#[serde(untagged)]
197enum NodeInformation<'a> {
198    Input {
199        reference: StreamReference,
200        stream_name: &'a str,
201        memory_bound: u32,
202        value_ty: String,
203    },
204
205    Output {
206        reference: StreamReference,
207        is_trigger: bool,
208        stream_name: &'a str,
209        eval_layer: usize,
210        memory_bound: u32,
211        pacing_ty: String,
212        spawn_ty: String,
213        value_ty: String,
214    },
215
216    Window {
217        reference: WindowReference,
218        operation: String,
219        duration: String,
220        pacing_ty: String,
221        memory_bound: u32,
222    },
223}
224
225fn node_infos(mir: &Mir, node: Node) -> NodeInformation {
226    match node {
227        Node::Stream(sref) => stream_infos(mir, sref),
228        Node::Window(wref) => window_infos(mir, wref),
229        Node::Trigger(sref) => stream_infos(mir, mir.triggers[sref].output_reference),
230    }
231}
232
233fn stream_infos(mir: &Mir, sref: StreamReference) -> NodeInformation {
234    let stream = mir.stream(sref);
235
236    let stream_name = stream.name();
237    let eval_layer: usize = stream.eval_layer().into();
238    let memory_bound = stream.values_to_memorize().unwrap();
239    let value_ty = stream.ty();
240    let value_str = value_ty.to_string();
241
242    match sref {
243        StreamReference::In(_) => NodeInformation::Input {
244            reference: sref,
245            stream_name,
246            memory_bound,
247            value_ty: value_str,
248        },
249        StreamReference::Out(_) => {
250            let output = mir.output(sref);
251            let pacing_str = mir.display(&output.eval.eval_pacing).to_string();
252            let spawn_str = mir.display(&output.spawn.pacing).to_string();
253
254            NodeInformation::Output {
255                reference: sref,
256                is_trigger: output.is_trigger(),
257                stream_name,
258                eval_layer,
259                memory_bound,
260                pacing_ty: pacing_str,
261                spawn_ty: spawn_str,
262                value_ty: value_str,
263            }
264        }
265    }
266}
267
268fn window_infos(mir: &Mir, wref: WindowReference) -> NodeInformation {
269    let window = mir.window(wref);
270    let operation_str = window.op().to_string();
271    let duration_str = match wref {
272        WindowReference::Sliding(_) => {
273            let duration = mir.sliding_window(wref).duration;
274            format!("{}s", duration.as_secs_f64())
275        }
276        WindowReference::Discrete(_) => {
277            let duration = mir.discrete_window(wref).duration;
278            format!("{duration} values")
279        }
280
281        WindowReference::Instance(_) => {
282            let selection = &mir.instance_aggregation(wref).selection;
283            format!("{} instances", mir.display(selection))
284        }
285    };
286    let caller = mir.output(window.caller());
287
288    let origin = caller
289        .accesses
290        .iter()
291        .flat_map(|(_, accesses)| accesses)
292        .find(|(_, kind)| {
293            *kind == StreamAccessKind::SlidingWindow(wref)
294                || *kind == StreamAccessKind::DiscreteWindow(wref)
295        })
296        .expect("access has to exist")
297        .0;
298
299    let pacing = match origin {
300        Origin::Spawn => &caller.spawn.pacing,
301        Origin::Filter(_) | Origin::Eval(_) => &caller.eval.eval_pacing,
302        Origin::Close => &caller.close.pacing,
303    };
304
305    let pacing_str = mir.display(pacing).to_string();
306    let memory_bound = window.memory_bound().unwrap();
307
308    NodeInformation::Window {
309        reference: wref,
310        operation: operation_str,
311        duration: duration_str,
312        pacing_ty: pacing_str,
313        memory_bound,
314    }
315}
316
317fn edges(mir: &Mir) -> Vec<Edge> {
318    let input_accesses = mir
319        .inputs
320        .iter()
321        .map(|input| (input.reference, &input.accessed_by));
322    let output_accesses = mir
323        .outputs
324        .iter()
325        .map(|output| (output.reference, &output.accessed_by));
326    let all_accesses = input_accesses.chain(output_accesses);
327    let out_to_trig: &HashMap<_, _> = &(mir
328        .triggers
329        .iter()
330        .map(|t| (t.output_reference, t.trigger_reference))
331        .collect());
332
333    let access_edges = all_accesses.flat_map(|(source_ref, accesses)| {
334        let source = out_to_trig
335            .get(&source_ref)
336            .map(|t| Node::Trigger(*t))
337            .unwrap_or_else(|| Node::Stream(source_ref));
338        accesses.iter().flat_map(move |(target_ref, access_kinds)| {
339            let target = out_to_trig
340                .get(target_ref)
341                .map(|t| Node::Trigger(*t))
342                .unwrap_or_else(|| Node::Stream(*target_ref));
343            access_kinds
344                .iter()
345                .flat_map(move |&(origin, kind)| match kind {
346                    StreamAccessKind::SlidingWindow(w) | StreamAccessKind::DiscreteWindow(w) => {
347                        let with = EdgeType::Access { origin, kind };
348                        vec![
349                            Edge {
350                                from: target,
351                                with: with.clone(),
352                                to: Node::Window(w),
353                            },
354                            Edge {
355                                from: Node::Window(w),
356                                with,
357                                to: source,
358                            },
359                        ]
360                    }
361                    StreamAccessKind::Fresh
362                    | StreamAccessKind::Get
363                    | StreamAccessKind::Hold
364                    | StreamAccessKind::Offset(_)
365                    | StreamAccessKind::InstanceAggregation(_)
366                    | StreamAccessKind::Sync => {
367                        vec![Edge {
368                            from: target,
369                            with: EdgeType::Access { origin, kind },
370                            to: source,
371                        }]
372                    }
373                })
374        })
375    });
376
377    let spawn_edges = mir.outputs.iter().flat_map(|output| {
378        let source = out_to_trig
379            .get(&output.reference)
380            .map(|t| Node::Trigger(*t))
381            .unwrap_or_else(|| Node::Stream(output.reference));
382        match &output.spawn.pacing {
383            PacingType::Event(ac) => flatten_ac(ac)
384                .into_iter()
385                .map(|input| Edge {
386                    from: source,
387                    with: EdgeType::Spawn,
388                    to: Node::Stream(input),
389                })
390                .collect(),
391            PacingType::LocalPeriodic(_) | PacingType::GlobalPeriodic(_) | PacingType::Constant => {
392                vec![]
393            }
394        }
395    });
396
397    let ac_edges = mir.outputs.iter().flat_map(|output| {
398        let source = out_to_trig
399            .get(&output.reference)
400            .map(|t| Node::Trigger(*t))
401            .unwrap_or_else(|| Node::Stream(output.reference));
402        match &output.eval.eval_pacing {
403            PacingType::Event(ac) => flatten_ac(ac)
404                .into_iter()
405                .map(|input| Edge {
406                    from: source,
407                    with: EdgeType::Eval,
408                    to: Node::Stream(input),
409                })
410                .collect(),
411            PacingType::LocalPeriodic(_) | PacingType::GlobalPeriodic(_) | PacingType::Constant => {
412                vec![]
413            }
414        }
415    });
416
417    access_edges.chain(spawn_edges).chain(ac_edges).collect()
418}
419
420fn inner_flatten_ac(ac: &ActivationCondition) -> Vec<StreamReference> {
421    match ac {
422        ActivationCondition::Disjunction(xs) | ActivationCondition::Conjunction(xs) => {
423            xs.iter().flat_map(flatten_ac).collect()
424        }
425        ActivationCondition::Stream(s) => vec![*s],
426        ActivationCondition::True => vec![],
427    }
428}
429
430fn flatten_ac(ac: &ActivationCondition) -> Vec<StreamReference> {
431    let mut vec = inner_flatten_ac(ac);
432    vec.sort();
433    vec.dedup();
434    vec
435}
436
437impl<'a> dot::Labeller<'a, Node, Edge> for DependencyGraph<'a> {
438    fn graph_id(&'a self) -> dot::Id<'a> {
439        dot::Id::new("dependency_graph").unwrap()
440    }
441
442    fn node_id(&'a self, n: &Node) -> dot::Id<'a> {
443        let id = n.to_string();
444        dot::Id::new(id).unwrap()
445    }
446
447    fn node_label<'b>(&'b self, n: &Node) -> LabelText<'b> {
448        let infos = self.infos.get(n).unwrap();
449
450        let label_text = match infos {
451            NodeInformation::Input {
452                stream_name,
453                memory_bound,
454                value_ty,
455                reference: _,
456            } => {
457                format!("{stream_name}: {value_ty}<br/>Memory Bound: {memory_bound}")
458            }
459            NodeInformation::Output {
460                stream_name,
461                is_trigger: _,
462                eval_layer,
463                memory_bound,
464                pacing_ty,
465                spawn_ty,
466                value_ty,
467                reference: _,
468            } => {
469                format!(
470                    "{stream_name}: {value_ty}<br/>\
471Pacing: {pacing_ty}<br/>\
472Spawn: {spawn_ty}<br/>\
473Memory Bound: {memory_bound}<br/>\
474Layer {eval_layer}"
475                )
476            }
477            NodeInformation::Window {
478                reference,
479                operation,
480                duration,
481                pacing_ty: _,
482                memory_bound: _,
483            } => format!(
484                "Window {reference}<br/>Window Operation: {operation}<br/>Duration: {duration}"
485            ),
486        };
487
488        LabelText::HtmlStr(label_text.into())
489    }
490
491    fn edge_label<'b>(&'b self, edge: &Edge) -> LabelText<'b> {
492        LabelText::LabelStr(edge.with.to_string().into())
493    }
494
495    fn edge_style(&self, edge: &Edge) -> Style {
496        match &edge.with {
497            EdgeType::Access { kind, origin: _ } => match kind {
498                StreamAccessKind::Get | StreamAccessKind::Fresh | StreamAccessKind::Hold => {
499                    Style::Dashed
500                }
501                StreamAccessKind::Sync
502                | StreamAccessKind::InstanceAggregation(_)
503                | StreamAccessKind::Offset(_)
504                | StreamAccessKind::DiscreteWindow(_)
505                | StreamAccessKind::SlidingWindow(_) => Style::None,
506            },
507            EdgeType::Spawn | EdgeType::Eval => Style::Dotted,
508        }
509    }
510
511    fn node_shape(&self, node: &Node) -> Option<LabelText<'_>> {
512        let shape_str = match node {
513            Node::Stream(StreamReference::In(_)) => "box",
514            Node::Stream(StreamReference::Out(_)) => "ellipse",
515            Node::Trigger(_) => "octagon",
516            Node::Window(_) => "note",
517        };
518
519        Some(LabelText::LabelStr(shape_str.into()))
520    }
521
522    fn edge_end_arrow(&'a self, _e: &Edge) -> dot::Arrow {
523        dot::Arrow::none()
524    }
525
526    fn edge_start_arrow(&'a self, _e: &Edge) -> dot::Arrow {
527        dot::Arrow::normal()
528    }
529}
530
531impl<'a> dot::GraphWalk<'a, Node, Edge> for DependencyGraph<'a> {
532    fn nodes(&'a self) -> dot::Nodes<'a, Node> {
533        Cow::Borrowed(&self.nodes)
534    }
535
536    fn edges(&'a self) -> dot::Edges<'a, Edge> {
537        // all the sync and offset edges
538        let ac_accesses = self
539            .edges
540            .iter()
541            .filter(|edge| {
542                matches!(
543                    edge.with,
544                    EdgeType::Access {
545                        kind: StreamAccessKind::Sync,
546                        ..
547                    } | EdgeType::Access {
548                        kind: StreamAccessKind::Offset(_),
549                        ..
550                    }
551                )
552            })
553            .map(|edge| (&edge.from, &edge.to))
554            .collect::<HashSet<_>>();
555
556        let edges = self
557            .edges
558            .iter()
559            // remove edges that have the same access kind but different origins, because
560            // the origin is not displayed in the dot-representation
561            .unique_by(|edge| {
562                (
563                    edge.from,
564                    edge.to,
565                    match edge.with {
566                        EdgeType::Access { kind, origin: _ } => Some(kind),
567                        EdgeType::Spawn | EdgeType::Eval => None,
568                    },
569                )
570            })
571            // in the dot format, we only want to render eval edges, if the edge it not already covered by sync or offset edges
572            .filter(|edge| match edge.with {
573                EdgeType::Access { .. } | EdgeType::Spawn => true,
574                EdgeType::Eval => !ac_accesses.contains(&(&edge.from, &edge.to)),
575            })
576            .cloned()
577            .collect();
578        Cow::Owned(edges)
579    }
580
581    fn source(&self, e: &Edge) -> Node {
582        // because we add the arrows the wrong way round (see edge style)
583        e.to
584    }
585
586    fn target(&self, e: &Edge) -> Node {
587        // because we add the arrows the wrong way round (see edge style)
588        e.from
589    }
590}
591
#[cfg(test)]
mod tests {
    use rtlola_parser::ParserConfig;

    use super::*;
    use crate::parse;

    /// Builds a [`Node`] from the short notation used in the test cases,
    /// e.g. `In(0)`, `Out(1)`, `T(0)`, `SW(0)` or `DW(0)`.
    macro_rules! build_node {
        ( In($i:expr) ) => {
            Node::Stream(StreamReference::In($i))
        };
        ( Out($i:expr) ) => {
            Node::Stream(StreamReference::Out($i))
        };
        ( T($i:expr) ) => {
            Node::Trigger($i)
        };
        ( SW($i:expr) ) => {
            Node::Window(WindowReference::Sliding($i))
        };
        ( DW($i:expr) ) => {
            Node::Window(WindowReference::Discrete($i))
        };
    }

    /// Builds an [`EdgeType`] from the edge kind, the optional window index and the optional
    /// origin (with its optional index) of a test case.
    macro_rules! build_edge_kind {
        ( Spawn ) => {
            EdgeType::Spawn
        };
        ( Eval ) => {
            EdgeType::Eval
        };
        ( SW, $i:expr, $origin:ident $(, $origin_i:expr )? ) => {
            EdgeType::Access{origin: Origin::$origin$(($origin_i))?, kind: StreamAccessKind::SlidingWindow(WindowReference::Sliding($i))}
        };
        // Parallel to the `SW` arm: the origin may carry an index, e.g. `Eval(0)`.
        ( DW, $i:expr, $origin:ident $(, $origin_i:expr )? ) => {
            EdgeType::Access{origin: Origin::$origin$(($origin_i))?, kind: StreamAccessKind::DiscreteWindow(WindowReference::Discrete($i))}
        };
        ( $sak:ident, $origin:ident $(, $origin_i:expr )? ) => {
            EdgeType::Access{origin: Origin::$origin$(($origin_i))?, kind: StreamAccessKind::$sak}
        };
    }

    // https://stackoverflow.com/a/34324856
    macro_rules! count {
        () => (0usize);
        ( $x:tt $($xs:tt)* ) => (1usize + count!($($xs)*));
    }

    /// Generates a test that parses `$spec` and checks that the dependency graph consists of
    /// exactly the listed edges. Every edge is written as
    /// `From(i)[:Origin[(j)]] => To(k) : Kind[(w)],`.
    macro_rules! test_dependency_graph {
        ( $name:ident, $spec:literal, $( $edge_from_ty:ident($edge_from_i:expr)$(:$origin:ident$(($origin_i:expr))?)? => $edge_to_ty:ident($edge_to_i:expr) : $with:ident $(($p:expr))? , )+ ) => {

            #[test]
            fn $name() {
                let config = ParserConfig::for_string($spec.into());
                let mir = parse(&config).expect("should parse");
                let dep_graph = mir.dependency_graph();
                let edges = &dep_graph.edges;
                $(
                    let from_node = build_node!($edge_from_ty($edge_from_i));
                    let to_node = build_node!($edge_to_ty($edge_to_i));
                    let with = build_edge_kind!($with $(,$p)? $(,$origin $(,$origin_i)?)?);
                    let expected_edge = Edge {
                        from: from_node, to: to_node, with
                    };
                    assert!(edges.iter().any(|edge| *edge == expected_edge), "specification did not contain expected edge {:#?}", expected_edge);
                )+
                assert_eq!(edges.len(), count!($($with)+), "dependency graph had unwanted additional edges");
            }
        };
    }

    test_dependency_graph!(simple,
        "input a : UInt64
        input b : UInt64
        output c := a + b",
        Out(0):Eval(0) => In(0) : Sync,
        Out(0):Eval(0) => In(1) : Sync,
        Out(0) => In(0) : Eval,
        Out(0) => In(1) : Eval,
    );

    test_dependency_graph!(trigger,
        "input a : UInt64
        trigger a > 5",
        T(0):Filter(0) => In(0) : Sync,
        T(0) => In(0) : Eval,
    );

    test_dependency_graph!(more_complex,
        "input a : UInt64
        input b : UInt64
        output c := a + b.hold().defaults(to:0)
        output d@1Hz := a.aggregate(over:5s, using:count)
        trigger d < 5",
        Out(0):Eval(0) => In(0) : Sync,
        Out(0):Eval(0) => In(1) : Hold,
        Out(1):Eval(0) => SW(0) : SW(0),
        SW(0):Eval(0) => In(0) : SW(0),
        T(0):Filter(0) => Out(1) : Sync,
        Out(0) => In(0) : Eval,
    );

    test_dependency_graph!(ac,
        "input a : UInt64
        input b : UInt64
        output c @(a||b) := 0
        output d @(a&&b) := a
        ",
        Out(1):Eval(0) => In(0) : Sync,
        Out(0) => In(0) : Eval,
        Out(0) => In(1) : Eval,
        Out(1) => In(0) : Eval,
        Out(1) => In(1) : Eval,
    );

    test_dependency_graph!(spawn,
        "input a : UInt64
        input b : UInt64
        output c(x)
            spawn with a
            eval with b when x == a
        ",
        Out(0) => In(0) : Spawn,
        Out(0) => In(0) : Eval,
        Out(0) => In(1) : Eval,
        Out(0):Filter(0) => In(0) : Sync,
        Out(0):Eval(0) => In(1) : Sync,
        Out(0):Spawn => In(0) : Sync,
    );

    test_dependency_graph!(multiple_evals,
        "input a : UInt64
        input b : UInt64
        output c
            eval @(a&&b) when a == 0 with 0
            eval @(a&&b) when b == 0 with 1
            eval @(a&&b) when a + b == 1 with a  
        ",
        Out(0) => In(0) : Eval,
        Out(0) => In(1) : Eval,
        Out(0):Filter(0) => In(0) : Sync,
        Out(0):Filter(1) => In(1) : Sync,
        Out(0):Filter(2) => In(0) : Sync,
        Out(0):Filter(2) => In(1) : Sync,
        Out(0):Eval(2) => In(0) : Sync,
    );
}