flarrow_flows/
flows.rs

1use std::{collections::HashMap, sync::Arc};
2
3use tokio::sync::Mutex;
4use uhlc::HLC;
5use uuid::Uuid;
6
7use crate::prelude::*;
8
/// A `HashMap` that can be shared between several owners: the map sits behind an
/// `Arc` and is guarded by the async `tokio::sync::Mutex` imported above.
type SharedMap<K, V> = Arc<Mutex<HashMap<K, V>>>;
10
/// Represents the collection of flows in the system (all the MPSC channels).
///
/// Every field is a `SharedMap`, i.e. an `Arc<Mutex<HashMap<..>>>`, so cloning a
/// `Flows` produces another handle onto the same maps, not a copy of them.
/// The maps are filled once, from a `FlowsBuilder`, in `Flows::new`.
#[derive(Debug, Clone)]
pub struct Flows {
    /// Receiving half of each connected input, keyed by the uuid of the input.
    pub inputs_receivers: SharedMap<Uuid, MessageReceiver>,
    /// Sending halves attached to each output, keyed by the uuid of the output.
    /// There is one sender per input connected to that output.
    pub outputs_senders: SharedMap<Uuid, Vec<MessageSender>>,

    /// Sender used by a query, keyed by the uuid of the query.
    /// The other side is in `queryables_receivers`.
    pub queries_senders: SharedMap<Uuid, MessageSender>,
    /// Receiver used by a query, keyed by the uuid of the query.
    /// The other side is in `queryables_senders`.
    pub queries_receivers: SharedMap<Uuid, MessageReceiver>,

    /// Senders used by a queryable, keyed first by the uuid of the queryable and
    /// then by the uuid of the connected query.
    /// The receiver part is in `queries_receivers`.
    pub queryables_senders: SharedMap<Uuid, HashMap<Uuid, MessageSender>>,
    /// Receiver used by a queryable, keyed by the uuid of the queryable.
    /// The sender part is in `queries_senders`.
    pub queryables_receivers: SharedMap<Uuid, MessageReceiver>,
}
23
/// This is the builder struct available for the user closure when creating flows.
///
/// It holds the same six maps as `Flows`, but as plain `HashMap`s: they are only
/// wrapped for sharing once `Flows::new` has run the user closure.
pub struct FlowsBuilder {
    /// Layout the connections are validated against: uuids are looked up in its
    /// `inputs`, `outputs`, `queries` and `queryables` collections, and its
    /// `label` method provides the names used in log and error messages.
    pub layout: Arc<DataflowLayout>,

    /// Receiving half of each connected input, keyed by the uuid of the input.
    pub inputs_receivers: HashMap<Uuid, MessageReceiver>,
    /// Sending halves attached to each output, keyed by the uuid of the output
    /// (one sender per connected input).
    pub outputs_senders: HashMap<Uuid, Vec<MessageSender>>,

    /// Sender used by a query, keyed by the uuid of the query; its receiver is
    /// stored in `queryables_receivers`.
    pub queries_senders: HashMap<Uuid, MessageSender>,
    /// Receiver used by a query, keyed by the uuid of the query; its sender is
    /// stored in `queryables_senders`.
    pub queries_receivers: HashMap<Uuid, MessageReceiver>,

    /// Senders used by a queryable, keyed by the uuid of the queryable and then
    /// by the uuid of the connected query.
    pub queryables_senders: HashMap<Uuid, HashMap<Uuid, MessageSender>>,
    /// Receiver used by a queryable, keyed by the uuid of the queryable.
    pub queryables_receivers: HashMap<Uuid, MessageReceiver>,
}
37
38impl Flows {
39    /// Creates a new Flows instance with a Building async closure
40    pub async fn new(
41        layout: Arc<DataflowLayout>,
42        flows: impl AsyncFnOnce(&mut FlowsBuilder) -> Result<()>,
43    ) -> Result<Self> {
44        let mut builder = FlowsBuilder::new(layout);
45
46        flows(&mut builder).await?;
47
48        Ok(Flows {
49            outputs_senders: Arc::new(Mutex::new(builder.outputs_senders)),
50            inputs_receivers: Arc::new(Mutex::new(builder.inputs_receivers)),
51            queries_senders: Arc::new(Mutex::new(builder.queries_senders)),
52            queries_receivers: Arc::new(Mutex::new(builder.queries_receivers)),
53            queryables_senders: Arc::new(Mutex::new(builder.queryables_senders)),
54            queryables_receivers: Arc::new(Mutex::new(builder.queryables_receivers)),
55        })
56    }
57
58    /// This is intended to be used only by the Runtime when loading nodes
59    pub fn node_io(
60        &self,
61        clock: Arc<HLC>,
62        source: NodeLayout,
63    ) -> (Inputs, Outputs, Queries, Queryables) {
64        let inputs = Inputs::new(self.inputs_receivers.clone(), source.clone());
65        let outputs = Outputs::new(self.outputs_senders.clone(), clock.clone(), source.clone());
66        let queries = Queries::new(
67            self.queries_senders.clone(),
68            self.queries_receivers.clone(),
69            clock.clone(),
70            source.clone(),
71        );
72        let queryables = Queryables::new(
73            self.queryables_senders.clone(),
74            self.queryables_receivers.clone(),
75            clock.clone(),
76            source.clone(),
77        );
78
79        (inputs, outputs, queries, queryables)
80    }
81}
82
83impl FlowsBuilder {
84    pub fn new(layout: Arc<DataflowLayout>) -> Self {
85        Self {
86            layout,
87            inputs_receivers: HashMap::new(),
88            outputs_senders: HashMap::new(),
89            queries_senders: HashMap::new(),
90            queries_receivers: HashMap::new(),
91            queryables_senders: HashMap::new(),
92            queryables_receivers: HashMap::new(),
93        }
94    }
95
96    /// Connect input to output (Output --> Input)
97    fn connect_input_output(&mut self, input: Uuid, output: Uuid, capacity: usize) -> Result<()> {
98        if !self.layout.inputs.contains(&input) {
99            eyre::bail!(
100                "Input '{}' (uuid: {}) not found in the dataflow layout created",
101                self.layout.label(input),
102                input
103            );
104        }
105
106        if !self.layout.outputs.contains(&output) {
107            eyre::bail!(
108                "Output '{}' (uuid: {}) not found in the dataflow layout created",
109                self.layout.label(output),
110                output
111            );
112        }
113
114        if self.inputs_receivers.contains_key(&input) {
115            eyre::bail!(
116                "Input '{}' (uuid: {}) already mapped",
117                self.layout.label(input),
118                input
119            );
120        }
121
122        let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
123
124        self.outputs_senders.entry(output).or_default().push(sender);
125        self.inputs_receivers.insert(input, receiver);
126
127        tracing::debug!(
128            "Connecting input '{}' (uuid: {}) to output '{}' (uuid: {})",
129            self.layout.label(input),
130            input,
131            self.layout.label(output),
132            output
133        );
134
135        Ok(())
136    }
137
138    /// Connect query to queryable (Queryable --> Query)
139    fn connect_query_queryable(
140        &mut self,
141        query: Uuid,
142        queryable: Uuid,
143        capacity: usize,
144    ) -> Result<()> {
145        if !self.layout.queryables.contains(&queryable) || !self.layout.queries.contains(&query) {
146            eyre::bail!(
147                "Queryable '{}' (uuid: {}) not found in the dataflow layout created",
148                self.layout.label(queryable),
149                queryable
150            );
151        }
152
153        if !self.layout.queries.contains(&query) {
154            eyre::bail!(
155                "Query '{}' (uuid: {}) not found in the dataflow layout created",
156                self.layout.label(query),
157                query
158            );
159        }
160
161        if self.queries_senders.contains_key(&query) || self.queries_receivers.contains_key(&query)
162        {
163            eyre::bail!(
164                "Query '{}' (uuid: {}) is already connected",
165                self.layout.label(query),
166                query
167            );
168        }
169
170        if let Some(senders) = self.queryables_senders.get(&queryable) {
171            if senders.contains_key(&query) {
172                eyre::bail!(
173                    "Queryable '{}' (uuid: {}) already mapped",
174                    self.layout.label(queryable),
175                    queryable
176                );
177            }
178        }
179
180        if let std::collections::hash_map::Entry::Vacant(e) =
181            self.queryables_receivers.entry(queryable)
182        {
183            let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
184
185            e.insert(receiver);
186            self.queries_senders.insert(query, sender);
187        } else {
188            let other_query = *match self.queryables_senders.get(&queryable) {
189                Some(senders) => senders.keys().next(),
190                None => None,
191            }
192            .ok_or_eyre(format!(
193                "Queryable '{}' (uuid: {}) is not well connected. This is a big error please report it",
194                self.layout.label(queryable),
195                queryable,
196            ))?;
197
198            let sender = self.queries_senders.get(&other_query).ok_or_eyre(format!(
199                "Query '{}' (uuid: {}) is not well connected. This is a big error please report it",
200                self.layout.label(other_query),
201                other_query,
202            ))?.clone();
203
204            self.queries_senders.insert(query, sender);
205        }
206
207        let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
208        self.queries_receivers.insert(query, receiver);
209        self.queryables_senders
210            .entry(queryable)
211            .or_default()
212            .insert(query, sender);
213
214        tracing::debug!(
215            "Connecting query '{}' (uuid: {}) to queryable '{}' (uuid: {})",
216            self.layout.label(query),
217            query,
218            self.layout.label(queryable),
219            queryable
220        );
221
222        Ok(())
223    }
224
225    /// Connect two IO (a -> b or b -> a).
226    /// Acceptable format (a, b) in {Input, Output}
227    /// or (a, b) in {Query, Queryable} with a != b
228    pub fn connect(&mut self, a: IOLayout, b: IOLayout, capacity: Option<usize>) -> Result<()> {
229        match (a, b) {
230            (IOLayout::Input(input), IOLayout::Output(output)) => {
231                self.connect_input_output(input.uuid, output.uuid, capacity.unwrap_or(128))
232            }
233            (IOLayout::Query(query), IOLayout::Queryable(queryable)) => {
234                self.connect_query_queryable(query.uuid, queryable.uuid, capacity.unwrap_or(128))
235            }
236            (IOLayout::Output(output), IOLayout::Input(input)) => {
237                self.connect_input_output(input.uuid, output.uuid, capacity.unwrap_or(128))
238            }
239            (IOLayout::Queryable(queryable), IOLayout::Query(query)) => {
240                self.connect_query_queryable(query.uuid, queryable.uuid, capacity.unwrap_or(128))
241            }
242            _ => Err(eyre::eyre!(
243                "Invalid connection! types `a` and `b` must verify: a != b and (a, b) in {{Input, Output}} or {{Query, Queryable}}"
244            )),
245        }
246    }
247}