1use std::{collections::HashMap, sync::Arc};
2
3use tokio::sync::Mutex;
4use uhlc::HLC;
5use uuid::Uuid;
6
7use crate::prelude::*;
8
9type SharedMap<K, V> = Arc<Mutex<HashMap<K, V>>>;
10
11#[derive(Debug, Clone)]
13pub struct Flows {
14 pub inputs_receivers: SharedMap<Uuid, MessageReceiver>,
15 pub outputs_senders: SharedMap<Uuid, Vec<MessageSender>>,
16
17 pub queries_senders: SharedMap<Uuid, MessageSender>, pub queries_receivers: SharedMap<Uuid, MessageReceiver>, pub queryables_senders: SharedMap<Uuid, HashMap<Uuid, MessageSender>>, pub queryables_receivers: SharedMap<Uuid, MessageReceiver>, }
23
24pub struct FlowsBuilder {
26 pub layout: Arc<DataflowLayout>,
27
28 pub inputs_receivers: HashMap<Uuid, MessageReceiver>,
29 pub outputs_senders: HashMap<Uuid, Vec<MessageSender>>,
30
31 pub queries_senders: HashMap<Uuid, MessageSender>,
32 pub queries_receivers: HashMap<Uuid, MessageReceiver>,
33
34 pub queryables_senders: HashMap<Uuid, HashMap<Uuid, MessageSender>>,
35 pub queryables_receivers: HashMap<Uuid, MessageReceiver>,
36}
37
38impl Flows {
39 pub async fn new(
41 layout: Arc<DataflowLayout>,
42 flows: impl AsyncFnOnce(&mut FlowsBuilder) -> Result<()>,
43 ) -> Result<Self> {
44 let mut builder = FlowsBuilder::new(layout);
45
46 flows(&mut builder).await?;
47
48 Ok(Flows {
49 outputs_senders: Arc::new(Mutex::new(builder.outputs_senders)),
50 inputs_receivers: Arc::new(Mutex::new(builder.inputs_receivers)),
51 queries_senders: Arc::new(Mutex::new(builder.queries_senders)),
52 queries_receivers: Arc::new(Mutex::new(builder.queries_receivers)),
53 queryables_senders: Arc::new(Mutex::new(builder.queryables_senders)),
54 queryables_receivers: Arc::new(Mutex::new(builder.queryables_receivers)),
55 })
56 }
57
58 pub fn node_io(
60 &self,
61 clock: Arc<HLC>,
62 source: NodeLayout,
63 ) -> (Inputs, Outputs, Queries, Queryables) {
64 let inputs = Inputs::new(self.inputs_receivers.clone(), source.clone());
65 let outputs = Outputs::new(self.outputs_senders.clone(), clock.clone(), source.clone());
66 let queries = Queries::new(
67 self.queries_senders.clone(),
68 self.queries_receivers.clone(),
69 clock.clone(),
70 source.clone(),
71 );
72 let queryables = Queryables::new(
73 self.queryables_senders.clone(),
74 self.queryables_receivers.clone(),
75 clock.clone(),
76 source.clone(),
77 );
78
79 (inputs, outputs, queries, queryables)
80 }
81}
82
83impl FlowsBuilder {
84 pub fn new(layout: Arc<DataflowLayout>) -> Self {
85 Self {
86 layout,
87 inputs_receivers: HashMap::new(),
88 outputs_senders: HashMap::new(),
89 queries_senders: HashMap::new(),
90 queries_receivers: HashMap::new(),
91 queryables_senders: HashMap::new(),
92 queryables_receivers: HashMap::new(),
93 }
94 }
95
96 fn connect_input_output(&mut self, input: Uuid, output: Uuid, capacity: usize) -> Result<()> {
98 if !self.layout.inputs.contains(&input) {
99 eyre::bail!(
100 "Input '{}' (uuid: {}) not found in the dataflow layout created",
101 self.layout.label(input),
102 input
103 );
104 }
105
106 if !self.layout.outputs.contains(&output) {
107 eyre::bail!(
108 "Output '{}' (uuid: {}) not found in the dataflow layout created",
109 self.layout.label(output),
110 output
111 );
112 }
113
114 if self.inputs_receivers.contains_key(&input) {
115 eyre::bail!(
116 "Input '{}' (uuid: {}) already mapped",
117 self.layout.label(input),
118 input
119 );
120 }
121
122 let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
123
124 self.outputs_senders.entry(output).or_default().push(sender);
125 self.inputs_receivers.insert(input, receiver);
126
127 tracing::debug!(
128 "Connecting input '{}' (uuid: {}) to output '{}' (uuid: {})",
129 self.layout.label(input),
130 input,
131 self.layout.label(output),
132 output
133 );
134
135 Ok(())
136 }
137
138 fn connect_query_queryable(
140 &mut self,
141 query: Uuid,
142 queryable: Uuid,
143 capacity: usize,
144 ) -> Result<()> {
145 if !self.layout.queryables.contains(&queryable) || !self.layout.queries.contains(&query) {
146 eyre::bail!(
147 "Queryable '{}' (uuid: {}) not found in the dataflow layout created",
148 self.layout.label(queryable),
149 queryable
150 );
151 }
152
153 if !self.layout.queries.contains(&query) {
154 eyre::bail!(
155 "Query '{}' (uuid: {}) not found in the dataflow layout created",
156 self.layout.label(query),
157 query
158 );
159 }
160
161 if self.queries_senders.contains_key(&query) || self.queries_receivers.contains_key(&query)
162 {
163 eyre::bail!(
164 "Query '{}' (uuid: {}) is already connected",
165 self.layout.label(query),
166 query
167 );
168 }
169
170 if let Some(senders) = self.queryables_senders.get(&queryable) {
171 if senders.contains_key(&query) {
172 eyre::bail!(
173 "Queryable '{}' (uuid: {}) already mapped",
174 self.layout.label(queryable),
175 queryable
176 );
177 }
178 }
179
180 if let std::collections::hash_map::Entry::Vacant(e) =
181 self.queryables_receivers.entry(queryable)
182 {
183 let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
184
185 e.insert(receiver);
186 self.queries_senders.insert(query, sender);
187 } else {
188 let other_query = *match self.queryables_senders.get(&queryable) {
189 Some(senders) => senders.keys().next(),
190 None => None,
191 }
192 .ok_or_eyre(format!(
193 "Queryable '{}' (uuid: {}) is not well connected. This is a big error please report it",
194 self.layout.label(queryable),
195 queryable,
196 ))?;
197
198 let sender = self.queries_senders.get(&other_query).ok_or_eyre(format!(
199 "Query '{}' (uuid: {}) is not well connected. This is a big error please report it",
200 self.layout.label(other_query),
201 other_query,
202 ))?.clone();
203
204 self.queries_senders.insert(query, sender);
205 }
206
207 let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
208 self.queries_receivers.insert(query, receiver);
209 self.queryables_senders
210 .entry(queryable)
211 .or_default()
212 .insert(query, sender);
213
214 tracing::debug!(
215 "Connecting query '{}' (uuid: {}) to queryable '{}' (uuid: {})",
216 self.layout.label(query),
217 query,
218 self.layout.label(queryable),
219 queryable
220 );
221
222 Ok(())
223 }
224
225 pub fn connect(&mut self, a: IOLayout, b: IOLayout, capacity: Option<usize>) -> Result<()> {
229 match (a, b) {
230 (IOLayout::Input(input), IOLayout::Output(output)) => {
231 self.connect_input_output(input.uuid, output.uuid, capacity.unwrap_or(128))
232 }
233 (IOLayout::Query(query), IOLayout::Queryable(queryable)) => {
234 self.connect_query_queryable(query.uuid, queryable.uuid, capacity.unwrap_or(128))
235 }
236 (IOLayout::Output(output), IOLayout::Input(input)) => {
237 self.connect_input_output(input.uuid, output.uuid, capacity.unwrap_or(128))
238 }
239 (IOLayout::Queryable(queryable), IOLayout::Query(query)) => {
240 self.connect_query_queryable(query.uuid, queryable.uuid, capacity.unwrap_or(128))
241 }
242 _ => Err(eyre::eyre!(
243 "Invalid connection! types `a` and `b` must verify: a != b and (a, b) in {{Input, Output}} or {{Query, Queryable}}"
244 )),
245 }
246 }
247}