1use std::borrow::Cow;
2use std::collections::{HashMap, HashSet};
3use std::fmt::Display;
4use std::io::BufWriter;
5
6use dot::{LabelText, Style};
7use itertools::Itertools;
8use serde::{Serialize, Serializer};
9use serde_json::{json, to_string_pretty};
10
11use super::{
12 ActivationCondition, Mir, Origin, PacingType, StreamAccessKind, StreamReference,
13 TriggerReference, WindowReference,
14};
15
/// A graph over the streams, triggers and windows of a [`Mir`] and the
/// edges between them. It can be exported as DOT or as JSON.
#[derive(Debug, Clone)]
pub struct DependencyGraph<'a> {
    /// All nodes of the graph; this is the list handed to the DOT renderer.
    nodes: Vec<Node>,
    /// All edges of the graph: accesses plus spawn and eval activation edges.
    edges: Vec<Edge>,
    /// Descriptive details per node, used for DOT labels and the JSON export.
    infos: HashMap<Node, NodeInformation<'a>>,
}
23
24impl<'a> DependencyGraph<'a> {
25 pub(super) fn new(mir: &'a Mir) -> Self {
26 let stream_nodes = mir
27 .inputs
28 .iter()
29 .map(|i| i.reference)
30 .chain(
31 mir.outputs
32 .iter()
33 .filter(|o| !o.is_trigger())
34 .map(|o| o.reference),
35 )
36 .map(Node::Stream);
37
38 let window_nodes = mir
39 .sliding_windows
40 .iter()
41 .map(|w| Node::Window(w.reference));
42
43 let trigger_nodes = mir
44 .triggers
45 .iter()
46 .map(|trigger| Node::Trigger(trigger.trigger_reference));
47
48 let nodes: Vec<_> = stream_nodes
49 .chain(window_nodes)
50 .chain(trigger_nodes)
51 .collect();
52
53 let edges = edges(mir);
54
55 let infos = nodes
56 .iter()
57 .map(|node| (*node, node_infos(mir, *node)))
58 .collect();
59
60 Self {
61 nodes,
62 edges,
63 infos,
64 }
65 }
66
67 pub fn dot(&self) -> String {
69 let res = Vec::new();
70 let mut res_writer = BufWriter::new(res);
71 dot::render(self, &mut res_writer).unwrap();
72 String::from_utf8(res_writer.into_inner().unwrap()).unwrap()
73 }
74
75 pub fn json(&self) -> String {
77 let infos = self
78 .infos
79 .iter()
80 .map(|(key, value)| (key.to_string(), value))
81 .collect::<HashMap<_, _>>();
82
83 let json_value = json!({
84 "edges": self.edges,
85 "nodes": infos
86 });
87
88 to_string_pretty(&json_value).unwrap()
89 }
90}
91
/// A node of the dependency graph.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Node {
    /// An input or output stream.
    Stream(StreamReference),
    /// A trigger.
    Trigger(TriggerReference),
    /// A window (sliding, discrete or instance, depending on the reference).
    Window(WindowReference),
}
98
99impl From<StreamReference> for Node {
100 fn from(s: StreamReference) -> Self {
101 Node::Stream(s)
102 }
103}
104
105impl From<WindowReference> for Node {
106 fn from(w: WindowReference) -> Self {
107 Node::Window(w)
108 }
109}
110
111impl Display for Node {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 match self {
114 Node::Stream(StreamReference::In(i)) => write!(f, "In_{i}"),
115 Node::Stream(StreamReference::Out(i)) => write!(f, "Out_{i}"),
116 Node::Window(WindowReference::Sliding(i)) => write!(f, "SW_{i}"),
117 Node::Window(WindowReference::Discrete(i)) => write!(f, "DW_{i}"),
118 Node::Window(WindowReference::Instance(i)) => write!(f, "IA_{i}"),
119 Node::Trigger(i) => write!(f, "T_{i}"),
120 }
121 }
122}
123
124impl Serialize for Node {
125 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
126 where
127 S: Serializer,
128 {
129 serializer.serialize_str(self.to_string().as_str())
130 }
131}
132
/// A directed edge of the dependency graph.
///
/// For access edges, `from` is the accessing node and `to` the accessed one;
/// for spawn/eval edges, `from` is the output (or trigger) and `to` a stream
/// of its activation condition.
#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
struct Edge {
    /// The node the edge starts at.
    from: Node,
    /// The kind of dependency this edge stands for.
    with: EdgeType,
    /// The node the edge ends at.
    to: Node,
}
139
/// The kind of dependency an [`Edge`] represents.
///
/// Serialized as an internally tagged enum: the variant name is stored under
/// the `"type"` key.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
#[serde(tag = "type")]
enum EdgeType {
    /// A stream access.
    Access {
        /// How the stream is accessed (sync, hold, offset, window, ...).
        kind: StreamAccessKind,
        /// The part of the accessing stream in which the access occurs.
        origin: Origin,
    },
    /// Edge to a stream appearing in the event-based spawn pacing.
    Spawn,
    /// Edge to a stream appearing in the event-based eval pacing.
    Eval,
}
150
151impl Display for EdgeType {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 let s = match self {
154 EdgeType::Access {
155 kind: StreamAccessKind::Sync,
156 ..
157 } => "Sync".into(),
158 EdgeType::Access {
159 kind: StreamAccessKind::Hold,
160 ..
161 } => "Hold".into(),
162 EdgeType::Access {
163 kind: StreamAccessKind::Offset(o),
164 ..
165 } => format!("Offset({o})"),
166 EdgeType::Spawn => "Spawn".into(),
167 EdgeType::Eval => "Eval".into(),
168 EdgeType::Access {
169 kind: StreamAccessKind::InstanceAggregation(_),
170 ..
171 } => "Instances".into(),
172 EdgeType::Access {
174 kind: StreamAccessKind::DiscreteWindow(_),
175 ..
176 }
177 | EdgeType::Access {
178 kind: StreamAccessKind::SlidingWindow(_),
179 ..
180 } => "".into(),
181 EdgeType::Access {
182 kind: StreamAccessKind::Get,
183 ..
184 } => "Get".into(),
185 EdgeType::Access {
186 kind: StreamAccessKind::Fresh,
187 ..
188 } => "Fresh".into(),
189 };
190
191 write!(f, "{s}")
192 }
193}
194
/// Descriptive details about a node, used for DOT labels and the JSON export.
///
/// Serialized untagged: only the fields of the variant are emitted, without a
/// variant name.
#[derive(Serialize, Debug, Clone)]
#[serde(untagged)]
enum NodeInformation<'a> {
    /// Details of an input stream.
    Input {
        /// Reference of the stream.
        reference: StreamReference,
        /// Name of the stream.
        stream_name: &'a str,
        /// The stream's `values_to_memorize` bound.
        memory_bound: u32,
        /// The stream's value type, rendered as text.
        value_ty: String,
    },

    /// Details of an output stream (also used for triggers).
    Output {
        /// Reference of the stream.
        reference: StreamReference,
        /// Whether the output stream is a trigger.
        is_trigger: bool,
        /// Name of the stream.
        stream_name: &'a str,
        /// The stream's evaluation layer.
        eval_layer: usize,
        /// The stream's `values_to_memorize` bound.
        memory_bound: u32,
        /// The eval pacing, rendered as text.
        pacing_ty: String,
        /// The spawn pacing, rendered as text.
        spawn_ty: String,
        /// The stream's value type, rendered as text.
        value_ty: String,
    },

    /// Details of a window.
    Window {
        /// Reference of the window.
        reference: WindowReference,
        /// The window operation, rendered as text.
        operation: String,
        /// Human-readable extent of the window (seconds, values or instances).
        duration: String,
        /// Pacing of the caller's clause in which the window is accessed.
        pacing_ty: String,
        /// The window's memory bound.
        memory_bound: u32,
    },
}
224
225fn node_infos(mir: &Mir, node: Node) -> NodeInformation {
226 match node {
227 Node::Stream(sref) => stream_infos(mir, sref),
228 Node::Window(wref) => window_infos(mir, wref),
229 Node::Trigger(sref) => stream_infos(mir, mir.triggers[sref].output_reference),
230 }
231}
232
233fn stream_infos(mir: &Mir, sref: StreamReference) -> NodeInformation {
234 let stream = mir.stream(sref);
235
236 let stream_name = stream.name();
237 let eval_layer: usize = stream.eval_layer().into();
238 let memory_bound = stream.values_to_memorize().unwrap();
239 let value_ty = stream.ty();
240 let value_str = value_ty.to_string();
241
242 match sref {
243 StreamReference::In(_) => NodeInformation::Input {
244 reference: sref,
245 stream_name,
246 memory_bound,
247 value_ty: value_str,
248 },
249 StreamReference::Out(_) => {
250 let output = mir.output(sref);
251 let pacing_str = mir.display(&output.eval.eval_pacing).to_string();
252 let spawn_str = mir.display(&output.spawn.pacing).to_string();
253
254 NodeInformation::Output {
255 reference: sref,
256 is_trigger: output.is_trigger(),
257 stream_name,
258 eval_layer,
259 memory_bound,
260 pacing_ty: pacing_str,
261 spawn_ty: spawn_str,
262 value_ty: value_str,
263 }
264 }
265 }
266}
267
268fn window_infos(mir: &Mir, wref: WindowReference) -> NodeInformation {
269 let window = mir.window(wref);
270 let operation_str = window.op().to_string();
271 let duration_str = match wref {
272 WindowReference::Sliding(_) => {
273 let duration = mir.sliding_window(wref).duration;
274 format!("{}s", duration.as_secs_f64())
275 }
276 WindowReference::Discrete(_) => {
277 let duration = mir.discrete_window(wref).duration;
278 format!("{duration} values")
279 }
280
281 WindowReference::Instance(_) => {
282 let selection = &mir.instance_aggregation(wref).selection;
283 format!("{} instances", mir.display(selection))
284 }
285 };
286 let caller = mir.output(window.caller());
287
288 let origin = caller
289 .accesses
290 .iter()
291 .flat_map(|(_, accesses)| accesses)
292 .find(|(_, kind)| {
293 *kind == StreamAccessKind::SlidingWindow(wref)
294 || *kind == StreamAccessKind::DiscreteWindow(wref)
295 })
296 .expect("access has to exist")
297 .0;
298
299 let pacing = match origin {
300 Origin::Spawn => &caller.spawn.pacing,
301 Origin::Filter(_) | Origin::Eval(_) => &caller.eval.eval_pacing,
302 Origin::Close => &caller.close.pacing,
303 };
304
305 let pacing_str = mir.display(pacing).to_string();
306 let memory_bound = window.memory_bound().unwrap();
307
308 NodeInformation::Window {
309 reference: wref,
310 operation: operation_str,
311 duration: duration_str,
312 pacing_ty: pacing_str,
313 memory_bound,
314 }
315}
316
317fn edges(mir: &Mir) -> Vec<Edge> {
318 let input_accesses = mir
319 .inputs
320 .iter()
321 .map(|input| (input.reference, &input.accessed_by));
322 let output_accesses = mir
323 .outputs
324 .iter()
325 .map(|output| (output.reference, &output.accessed_by));
326 let all_accesses = input_accesses.chain(output_accesses);
327 let out_to_trig: &HashMap<_, _> = &(mir
328 .triggers
329 .iter()
330 .map(|t| (t.output_reference, t.trigger_reference))
331 .collect());
332
333 let access_edges = all_accesses.flat_map(|(source_ref, accesses)| {
334 let source = out_to_trig
335 .get(&source_ref)
336 .map(|t| Node::Trigger(*t))
337 .unwrap_or_else(|| Node::Stream(source_ref));
338 accesses.iter().flat_map(move |(target_ref, access_kinds)| {
339 let target = out_to_trig
340 .get(target_ref)
341 .map(|t| Node::Trigger(*t))
342 .unwrap_or_else(|| Node::Stream(*target_ref));
343 access_kinds
344 .iter()
345 .flat_map(move |&(origin, kind)| match kind {
346 StreamAccessKind::SlidingWindow(w) | StreamAccessKind::DiscreteWindow(w) => {
347 let with = EdgeType::Access { origin, kind };
348 vec![
349 Edge {
350 from: target,
351 with: with.clone(),
352 to: Node::Window(w),
353 },
354 Edge {
355 from: Node::Window(w),
356 with,
357 to: source,
358 },
359 ]
360 }
361 StreamAccessKind::Fresh
362 | StreamAccessKind::Get
363 | StreamAccessKind::Hold
364 | StreamAccessKind::Offset(_)
365 | StreamAccessKind::InstanceAggregation(_)
366 | StreamAccessKind::Sync => {
367 vec![Edge {
368 from: target,
369 with: EdgeType::Access { origin, kind },
370 to: source,
371 }]
372 }
373 })
374 })
375 });
376
377 let spawn_edges = mir.outputs.iter().flat_map(|output| {
378 let source = out_to_trig
379 .get(&output.reference)
380 .map(|t| Node::Trigger(*t))
381 .unwrap_or_else(|| Node::Stream(output.reference));
382 match &output.spawn.pacing {
383 PacingType::Event(ac) => flatten_ac(ac)
384 .into_iter()
385 .map(|input| Edge {
386 from: source,
387 with: EdgeType::Spawn,
388 to: Node::Stream(input),
389 })
390 .collect(),
391 PacingType::LocalPeriodic(_) | PacingType::GlobalPeriodic(_) | PacingType::Constant => {
392 vec![]
393 }
394 }
395 });
396
397 let ac_edges = mir.outputs.iter().flat_map(|output| {
398 let source = out_to_trig
399 .get(&output.reference)
400 .map(|t| Node::Trigger(*t))
401 .unwrap_or_else(|| Node::Stream(output.reference));
402 match &output.eval.eval_pacing {
403 PacingType::Event(ac) => flatten_ac(ac)
404 .into_iter()
405 .map(|input| Edge {
406 from: source,
407 with: EdgeType::Eval,
408 to: Node::Stream(input),
409 })
410 .collect(),
411 PacingType::LocalPeriodic(_) | PacingType::GlobalPeriodic(_) | PacingType::Constant => {
412 vec![]
413 }
414 }
415 });
416
417 access_edges.chain(spawn_edges).chain(ac_edges).collect()
418}
419
420fn inner_flatten_ac(ac: &ActivationCondition) -> Vec<StreamReference> {
421 match ac {
422 ActivationCondition::Disjunction(xs) | ActivationCondition::Conjunction(xs) => {
423 xs.iter().flat_map(flatten_ac).collect()
424 }
425 ActivationCondition::Stream(s) => vec![*s],
426 ActivationCondition::True => vec![],
427 }
428}
429
430fn flatten_ac(ac: &ActivationCondition) -> Vec<StreamReference> {
431 let mut vec = inner_flatten_ac(ac);
432 vec.sort();
433 vec.dedup();
434 vec
435}
436
437impl<'a> dot::Labeller<'a, Node, Edge> for DependencyGraph<'a> {
438 fn graph_id(&'a self) -> dot::Id<'a> {
439 dot::Id::new("dependency_graph").unwrap()
440 }
441
442 fn node_id(&'a self, n: &Node) -> dot::Id<'a> {
443 let id = n.to_string();
444 dot::Id::new(id).unwrap()
445 }
446
447 fn node_label<'b>(&'b self, n: &Node) -> LabelText<'b> {
448 let infos = self.infos.get(n).unwrap();
449
450 let label_text = match infos {
451 NodeInformation::Input {
452 stream_name,
453 memory_bound,
454 value_ty,
455 reference: _,
456 } => {
457 format!("{stream_name}: {value_ty}<br/>Memory Bound: {memory_bound}")
458 }
459 NodeInformation::Output {
460 stream_name,
461 is_trigger: _,
462 eval_layer,
463 memory_bound,
464 pacing_ty,
465 spawn_ty,
466 value_ty,
467 reference: _,
468 } => {
469 format!(
470 "{stream_name}: {value_ty}<br/>\
471Pacing: {pacing_ty}<br/>\
472Spawn: {spawn_ty}<br/>\
473Memory Bound: {memory_bound}<br/>\
474Layer {eval_layer}"
475 )
476 }
477 NodeInformation::Window {
478 reference,
479 operation,
480 duration,
481 pacing_ty: _,
482 memory_bound: _,
483 } => format!(
484 "Window {reference}<br/>Window Operation: {operation}<br/>Duration: {duration}"
485 ),
486 };
487
488 LabelText::HtmlStr(label_text.into())
489 }
490
491 fn edge_label<'b>(&'b self, edge: &Edge) -> LabelText<'b> {
492 LabelText::LabelStr(edge.with.to_string().into())
493 }
494
495 fn edge_style(&self, edge: &Edge) -> Style {
496 match &edge.with {
497 EdgeType::Access { kind, origin: _ } => match kind {
498 StreamAccessKind::Get | StreamAccessKind::Fresh | StreamAccessKind::Hold => {
499 Style::Dashed
500 }
501 StreamAccessKind::Sync
502 | StreamAccessKind::InstanceAggregation(_)
503 | StreamAccessKind::Offset(_)
504 | StreamAccessKind::DiscreteWindow(_)
505 | StreamAccessKind::SlidingWindow(_) => Style::None,
506 },
507 EdgeType::Spawn | EdgeType::Eval => Style::Dotted,
508 }
509 }
510
511 fn node_shape(&self, node: &Node) -> Option<LabelText<'_>> {
512 let shape_str = match node {
513 Node::Stream(StreamReference::In(_)) => "box",
514 Node::Stream(StreamReference::Out(_)) => "ellipse",
515 Node::Trigger(_) => "octagon",
516 Node::Window(_) => "note",
517 };
518
519 Some(LabelText::LabelStr(shape_str.into()))
520 }
521
522 fn edge_end_arrow(&'a self, _e: &Edge) -> dot::Arrow {
523 dot::Arrow::none()
524 }
525
526 fn edge_start_arrow(&'a self, _e: &Edge) -> dot::Arrow {
527 dot::Arrow::normal()
528 }
529}
530
531impl<'a> dot::GraphWalk<'a, Node, Edge> for DependencyGraph<'a> {
532 fn nodes(&'a self) -> dot::Nodes<'a, Node> {
533 Cow::Borrowed(&self.nodes)
534 }
535
536 fn edges(&'a self) -> dot::Edges<'a, Edge> {
537 let ac_accesses = self
539 .edges
540 .iter()
541 .filter(|edge| {
542 matches!(
543 edge.with,
544 EdgeType::Access {
545 kind: StreamAccessKind::Sync,
546 ..
547 } | EdgeType::Access {
548 kind: StreamAccessKind::Offset(_),
549 ..
550 }
551 )
552 })
553 .map(|edge| (&edge.from, &edge.to))
554 .collect::<HashSet<_>>();
555
556 let edges = self
557 .edges
558 .iter()
559 .unique_by(|edge| {
562 (
563 edge.from,
564 edge.to,
565 match edge.with {
566 EdgeType::Access { kind, origin: _ } => Some(kind),
567 EdgeType::Spawn | EdgeType::Eval => None,
568 },
569 )
570 })
571 .filter(|edge| match edge.with {
573 EdgeType::Access { .. } | EdgeType::Spawn => true,
574 EdgeType::Eval => !ac_accesses.contains(&(&edge.from, &edge.to)),
575 })
576 .cloned()
577 .collect();
578 Cow::Owned(edges)
579 }
580
581 fn source(&self, e: &Edge) -> Node {
582 e.to
584 }
585
586 fn target(&self, e: &Edge) -> Node {
587 e.from
589 }
590}
591
#[cfg(test)]
mod tests {
    use rtlola_parser::ParserConfig;

    use super::*;
    use crate::parse;

    // Builds a `Node` from the short notation used in the test cases,
    // e.g. `In(0)`, `Out(1)`, `T(0)`, `SW(0)` or `DW(0)`.
    macro_rules! build_node {
        ( In($i:expr) ) => {
            Node::Stream(StreamReference::In($i))
        };
        ( Out($i:expr) ) => {
            Node::Stream(StreamReference::Out($i))
        };
        ( T($i:expr) ) => {
            Node::Trigger($i)
        };
        ( SW($i:expr) ) => {
            Node::Window(WindowReference::Sliding($i))
        };
        ( DW($i:expr) ) => {
            Node::Window(WindowReference::Discrete($i))
        };
    }

    // Builds an `EdgeType` from the edge kind, an optional window index and an
    // optional origin (with optional origin index).
    macro_rules! build_edge_kind {
        ( Spawn ) => {
            EdgeType::Spawn
        };
        ( Eval ) => {
            EdgeType::Eval
        };
        ( SW, $i:expr, $origin:ident $(, $origin_i:expr )? ) => {
            EdgeType::Access{origin: Origin::$origin$(($origin_i))?, kind: StreamAccessKind::SlidingWindow(WindowReference::Sliding($i))}
        };
        // Mirrors the `SW` arm, including the optional origin index.
        ( DW, $i:expr, $origin:ident $(, $origin_i:expr )? ) => {
            EdgeType::Access{origin: Origin::$origin$(($origin_i))?, kind: StreamAccessKind::DiscreteWindow(WindowReference::Discrete($i))}
        };
        ( $sak:ident, $origin:ident $(, $origin_i:expr )? ) => {
            EdgeType::Access{origin: Origin::$origin$(($origin_i))?, kind: StreamAccessKind::$sak}
        };
    }

    // Counts the number of token trees it is given.
    macro_rules! count {
        () => (0usize);
        ( $x:tt $($xs:tt)* ) => (1usize + count!($($xs)*));
    }

    // Generates a test named `$name` that parses `$spec`, builds its dependency
    // graph and checks that the graph contains exactly the listed edges.
    // Each edge is written as `From(i)[:Origin[(j)]] => To(k) : Kind[(p)],`.
    macro_rules! test_dependency_graph {
        ( $name:ident, $spec:literal, $( $edge_from_ty:ident($edge_from_i:expr)$(:$origin:ident$(($origin_i:expr))?)? => $edge_to_ty:ident($edge_to_i:expr) : $with:ident $(($p:expr))? , )+ ) => {

            #[test]
            fn $name() {
                let config = ParserConfig::for_string($spec.into());
                let mir = parse(&config).expect("should parse");
                let dep_graph = mir.dependency_graph();
                let edges = &dep_graph.edges;
                $(
                    let from_node = build_node!($edge_from_ty($edge_from_i));
                    let to_node = build_node!($edge_to_ty($edge_to_i));
                    let with = build_edge_kind!($with $(,$p)? $(,$origin $(,$origin_i)?)?);
                    let expected_edge = Edge {
                        from: from_node, to: to_node, with
                    };
                    assert!(edges.iter().any(|edge| *edge == expected_edge), "specification did not contain expected edge {:#?}", expected_edge);
                )+
                // `assert_eq!` reports both counts when the check fails.
                assert_eq!(edges.len(), count!($($with)+), "dependency graph had unwanted additional edges");
            }
        };
    }

    test_dependency_graph!(simple,
        "input a : UInt64
        input b : UInt64
        output c := a + b",
        Out(0):Eval(0) => In(0) : Sync,
        Out(0):Eval(0) => In(1) : Sync,
        Out(0) => In(0) : Eval,
        Out(0) => In(1) : Eval,
    );

    test_dependency_graph!(trigger,
        "input a : UInt64
        trigger a > 5",
        T(0):Filter(0) => In(0) : Sync,
        T(0) => In(0) : Eval,
    );

    test_dependency_graph!(more_complex,
        "input a : UInt64
        input b : UInt64
        output c := a + b.hold().defaults(to:0)
        output d@1Hz := a.aggregate(over:5s, using:count)
        trigger d < 5",
        Out(0):Eval(0) => In(0) : Sync,
        Out(0):Eval(0) => In(1) : Hold,
        Out(1):Eval(0) => SW(0) : SW(0),
        SW(0):Eval(0) => In(0) : SW(0),
        T(0):Filter(0) => Out(1) : Sync,
        Out(0) => In(0) : Eval,
    );

    test_dependency_graph!(ac,
        "input a : UInt64
        input b : UInt64
        output c @(a||b) := 0
        output d @(a&&b) := a
        ",
        Out(1):Eval(0) => In(0) : Sync,
        Out(0) => In(0) : Eval,
        Out(0) => In(1) : Eval,
        Out(1) => In(0) : Eval,
        Out(1) => In(1) : Eval,
    );

    test_dependency_graph!(spawn,
        "input a : UInt64
        input b : UInt64
        output c(x)
            spawn with a
            eval with b when x == a
        ",
        Out(0) => In(0) : Spawn,
        Out(0) => In(0) : Eval,
        Out(0) => In(1) : Eval,
        Out(0):Filter(0) => In(0) : Sync,
        Out(0):Eval(0) => In(1) : Sync,
        Out(0):Spawn => In(0) : Sync,
    );

    test_dependency_graph!(multiple_evals,
        "input a : UInt64
        input b : UInt64
        output c
            eval @(a&&b) when a == 0 with 0
            eval @(a&&b) when b == 0 with 1
            eval @(a&&b) when a + b == 1 with a
        ",
        Out(0) => In(0) : Eval,
        Out(0) => In(1) : Eval,
        Out(0):Filter(0) => In(0) : Sync,
        Out(0):Filter(1) => In(1) : Sync,
        Out(0):Filter(2) => In(0) : Sync,
        Out(0):Filter(2) => In(1) : Sync,
        Out(0):Eval(2) => In(0) : Sync,
    );
}