iridis_layout/
layout.rs

1//! This module defines the complete layout of a `dataflow` application.
2
3use std::{
4    collections::{HashMap, HashSet},
5    fmt,
6    sync::Arc,
7};
8
9use crate::prelude::{thirdparty::tokio::sync::Mutex, *};
10
11/// Represents the data layout of the application.
12#[derive(Debug, Clone)]
13pub struct DataLayout {
14    pub inputs: HashSet<Uuid>,
15    pub outputs: HashSet<Uuid>,
16    pub queries: HashSet<Uuid>,
17    pub queryables: HashSet<Uuid>,
18}
19
20/// Represents the debug layout of the application: the labels
21/// and the nodes/primitives relationship.
22#[derive(Debug, Clone)]
23pub struct DebugLayout {
24    pub labels: HashMap<Uuid, String>,
25    pub nodes: HashMap<Uuid, HashSet<Uuid>>,
26}
27
28impl DebugLayout {
29    /// Gets the label of a primitive or a Node by its UUID.
30    pub fn label(&self, uuid: impl AsRef<Uuid>) -> String {
31        self.labels.get(uuid.as_ref()).cloned().unwrap_or_default()
32    }
33}
34
35/// Represents the a `Dataflow` application! This is the main
36/// struct that contains all the data and debug layouts.
37#[derive(Clone)]
38pub struct DataflowLayout {
39    pub data: DataLayout,
40    pub debug: DebugLayout,
41    pub flows: FlowLayout,
42}
43
44/// Represents a shared `Data` only layout. It is used to construct
45/// a `DataflowLayout` easily.
46#[derive(Debug, Clone)]
47pub struct SharedDataLayout {
48    pub data: Arc<Mutex<DataLayout>>,
49    pub debug: Arc<Mutex<DebugLayout>>,
50}
51
52impl DataflowLayout {
53    /// Creates a new empty `SharedDataLayout` that must be `finished` to create
54    /// the final `DataflowLayout`.
55    pub fn empty() -> SharedDataLayout {
56        SharedDataLayout {
57            data: Arc::new(Mutex::new(DataLayout {
58                inputs: HashSet::new(),
59                outputs: HashSet::new(),
60                queryables: HashSet::new(),
61                queries: HashSet::new(),
62            })),
63            debug: Arc::new(Mutex::new(DebugLayout {
64                labels: HashMap::new(),
65                nodes: HashMap::new(),
66            })),
67        }
68    }
69
70    /// Gets the label of a primitive or a Node by its UUID.
71    pub fn label(&self, uuid: impl AsRef<Uuid>) -> String {
72        self.debug.label(uuid)
73    }
74}
75
76impl SharedDataLayout {
77    /// Creates a new `Node` with the given label. Provide an `async` closure
78    /// to add primitives to the node (such as Inputs, Queries etc...)
79    pub async fn node<T>(
80        &self,
81        label: impl Into<String>,
82        layout_builder: impl AsyncFnOnce(&mut NodeLayout) -> T,
83    ) -> (NodeID, T) {
84        let label = label.into();
85        let id = NodeID::new(&label);
86        let mut layout = NodeLayout::new(&id);
87
88        let result = layout_builder(&mut layout).await;
89
90        let mut debug = self.debug.lock().await;
91
92        debug.nodes.insert(
93            id.uuid,
94            layout
95                .data
96                .inputs
97                .union(&layout.data.outputs)
98                .chain(layout.data.queries.union(&layout.data.queryables))
99                .cloned()
100                .collect(),
101        );
102
103        let mut data = self.data.lock().await;
104
105        data.inputs.extend(layout.data.inputs);
106        data.outputs.extend(layout.data.outputs);
107        data.queries.extend(layout.data.queries);
108        data.queryables.extend(layout.data.queryables);
109
110        debug.labels.extend(layout.debug.labels);
111        debug.labels.insert(id.uuid, label.clone());
112
113        tracing::debug!("Node '{}' (uuid: {}) created", label, id.uuid);
114
115        (id, result)
116    }
117
118    /// Creates a `DataflowLayout` from the current `SharedDataLayout` and the given
119    /// `flows` function. The `flows` function is an `async` closure that takes a
120    /// `FlowLayout` that can be used to connect the primitives together.
121    pub async fn finish(
122        self,
123        flows: impl AsyncFnOnce(&mut FlowLayout) -> Result<()>,
124    ) -> Result<Arc<DataflowLayout>> {
125        let mut layout = FlowLayout {
126            connections: HashSet::new(),
127        };
128
129        flows(&mut layout).await.wrap_err("Failed to build flows")?;
130
131        let data = self.data.lock().await.clone();
132        let debug = self.debug.lock().await.clone();
133
134        for (a, b) in &layout.connections {
135            match (data.outputs.contains(a), data.inputs.contains(b)) {
136                (true, true) => {}
137                (false, false) => match (data.queries.contains(a), data.queryables.contains(b)) {
138                    (true, true) => {}
139                    (false, false) => match (data.queryables.contains(a), data.queries.contains(b))
140                    {
141                        (true, true) => {}
142                        _ => {
143                            eyre::bail!(
144                                "Invalid connection between '{}' and '{}'",
145                                debug.label(a),
146                                debug.label(b)
147                            );
148                        }
149                    },
150                    _ => {
151                        eyre::bail!(
152                            "Invalid connection between '{}' and '{}'",
153                            debug.label(a),
154                            debug.label(b)
155                        );
156                    }
157                },
158                _ => {
159                    eyre::bail!(
160                        "Invalid connection between '{}' and '{}'",
161                        debug.label(a),
162                        debug.label(b)
163                    );
164                }
165            }
166        }
167
168        Ok(Arc::new(DataflowLayout {
169            data,
170            debug,
171            flows: layout,
172        }))
173    }
174}
175
176impl fmt::Debug for DataflowLayout {
177    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
178        #[derive(Debug)]
179        struct Layout {
180            #[allow(dead_code)]
181            id: (String, Uuid),
182
183            inputs: HashSet<(String, Uuid)>,
184            outputs: HashSet<(String, Uuid)>,
185            queryables: HashSet<(String, Uuid)>,
186            queries: HashSet<(String, Uuid)>,
187        }
188
189        let mut nodes = Vec::new();
190
191        for (&node, io) in &self.debug.nodes {
192            let mut layout = Layout {
193                id: (self.label(node), node),
194
195                inputs: HashSet::new(),
196                outputs: HashSet::new(),
197                queryables: HashSet::new(),
198                queries: HashSet::new(),
199            };
200
201            for &io in io {
202                if self.data.inputs.contains(&io) {
203                    layout.inputs.insert((self.label(io), io));
204                }
205                if self.data.outputs.contains(&io) {
206                    layout.outputs.insert((self.label(io), io));
207                }
208                if self.data.queryables.contains(&io) {
209                    layout.queryables.insert((self.label(io), io));
210                }
211                if self.data.queries.contains(&io) {
212                    layout.queries.insert((self.label(io), io));
213                }
214            }
215
216            nodes.push(layout);
217        }
218
219        writeln!(f, "{:#?}", nodes)
220    }
221}