1use std::{
4 collections::{HashMap, HashSet},
5 fmt,
6 sync::Arc,
7};
8
9use crate::prelude::{thirdparty::tokio::sync::Mutex, *};
10
11#[derive(Debug, Clone)]
13pub struct DataLayout {
14 pub inputs: HashSet<Uuid>,
15 pub outputs: HashSet<Uuid>,
16 pub queries: HashSet<Uuid>,
17 pub queryables: HashSet<Uuid>,
18}
19
20#[derive(Debug, Clone)]
23pub struct DebugLayout {
24 pub labels: HashMap<Uuid, String>,
25 pub nodes: HashMap<Uuid, HashSet<Uuid>>,
26}
27
28impl DebugLayout {
29 pub fn label(&self, uuid: impl AsRef<Uuid>) -> String {
31 self.labels.get(uuid.as_ref()).cloned().unwrap_or_default()
32 }
33}
34
35#[derive(Clone)]
38pub struct DataflowLayout {
39 pub data: DataLayout,
40 pub debug: DebugLayout,
41 pub flows: FlowLayout,
42}
43
44#[derive(Debug, Clone)]
47pub struct SharedDataLayout {
48 pub data: Arc<Mutex<DataLayout>>,
49 pub debug: Arc<Mutex<DebugLayout>>,
50}
51
52impl DataflowLayout {
53 pub fn empty() -> SharedDataLayout {
56 SharedDataLayout {
57 data: Arc::new(Mutex::new(DataLayout {
58 inputs: HashSet::new(),
59 outputs: HashSet::new(),
60 queryables: HashSet::new(),
61 queries: HashSet::new(),
62 })),
63 debug: Arc::new(Mutex::new(DebugLayout {
64 labels: HashMap::new(),
65 nodes: HashMap::new(),
66 })),
67 }
68 }
69
70 pub fn label(&self, uuid: impl AsRef<Uuid>) -> String {
72 self.debug.label(uuid)
73 }
74}
75
76impl SharedDataLayout {
77 pub async fn node<T>(
80 &self,
81 label: impl Into<String>,
82 layout_builder: impl AsyncFnOnce(&mut NodeLayout) -> T,
83 ) -> (NodeID, T) {
84 let label = label.into();
85 let id = NodeID::new(&label);
86 let mut layout = NodeLayout::new(&id);
87
88 let result = layout_builder(&mut layout).await;
89
90 let mut debug = self.debug.lock().await;
91
92 debug.nodes.insert(
93 id.uuid,
94 layout
95 .data
96 .inputs
97 .union(&layout.data.outputs)
98 .chain(layout.data.queries.union(&layout.data.queryables))
99 .cloned()
100 .collect(),
101 );
102
103 let mut data = self.data.lock().await;
104
105 data.inputs.extend(layout.data.inputs);
106 data.outputs.extend(layout.data.outputs);
107 data.queries.extend(layout.data.queries);
108 data.queryables.extend(layout.data.queryables);
109
110 debug.labels.extend(layout.debug.labels);
111 debug.labels.insert(id.uuid, label.clone());
112
113 tracing::debug!("Node '{}' (uuid: {}) created", label, id.uuid);
114
115 (id, result)
116 }
117
118 pub async fn finish(
122 self,
123 flows: impl AsyncFnOnce(&mut FlowLayout) -> Result<()>,
124 ) -> Result<Arc<DataflowLayout>> {
125 let mut layout = FlowLayout {
126 connections: HashSet::new(),
127 };
128
129 flows(&mut layout).await.wrap_err("Failed to build flows")?;
130
131 let data = self.data.lock().await.clone();
132 let debug = self.debug.lock().await.clone();
133
134 for (a, b) in &layout.connections {
135 match (data.outputs.contains(a), data.inputs.contains(b)) {
136 (true, true) => {}
137 (false, false) => match (data.queries.contains(a), data.queryables.contains(b)) {
138 (true, true) => {}
139 (false, false) => match (data.queryables.contains(a), data.queries.contains(b))
140 {
141 (true, true) => {}
142 _ => {
143 eyre::bail!(
144 "Invalid connection between '{}' and '{}'",
145 debug.label(a),
146 debug.label(b)
147 );
148 }
149 },
150 _ => {
151 eyre::bail!(
152 "Invalid connection between '{}' and '{}'",
153 debug.label(a),
154 debug.label(b)
155 );
156 }
157 },
158 _ => {
159 eyre::bail!(
160 "Invalid connection between '{}' and '{}'",
161 debug.label(a),
162 debug.label(b)
163 );
164 }
165 }
166 }
167
168 Ok(Arc::new(DataflowLayout {
169 data,
170 debug,
171 flows: layout,
172 }))
173 }
174}
175
176impl fmt::Debug for DataflowLayout {
177 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
178 #[derive(Debug)]
179 struct Layout {
180 #[allow(dead_code)]
181 id: (String, Uuid),
182
183 inputs: HashSet<(String, Uuid)>,
184 outputs: HashSet<(String, Uuid)>,
185 queryables: HashSet<(String, Uuid)>,
186 queries: HashSet<(String, Uuid)>,
187 }
188
189 let mut nodes = Vec::new();
190
191 for (&node, io) in &self.debug.nodes {
192 let mut layout = Layout {
193 id: (self.label(node), node),
194
195 inputs: HashSet::new(),
196 outputs: HashSet::new(),
197 queryables: HashSet::new(),
198 queries: HashSet::new(),
199 };
200
201 for &io in io {
202 if self.data.inputs.contains(&io) {
203 layout.inputs.insert((self.label(io), io));
204 }
205 if self.data.outputs.contains(&io) {
206 layout.outputs.insert((self.label(io), io));
207 }
208 if self.data.queryables.contains(&io) {
209 layout.queryables.insert((self.label(io), io));
210 }
211 if self.data.queries.contains(&io) {
212 layout.queries.insert((self.label(io), io));
213 }
214 }
215
216 nodes.push(layout);
217 }
218
219 writeln!(f, "{:#?}", nodes)
220 }
221}