flarrow_runtime/
flows.rs

1use std::{collections::HashMap, sync::Arc};
2
3use tokio::sync::{
4    Mutex,
5    broadcast::{Receiver, Sender},
6};
7
8use crate::prelude::*;
9
10pub struct Flows {
11    pub senders: Arc<Mutex<HashMap<OutputID, Sender<DataflowMessage>>>>,
12    pub receivers: Arc<Mutex<HashMap<InputID, Receiver<DataflowMessage>>>>,
13}
14
15impl Flows {
16    pub async fn new(
17        layout: Arc<DataflowLayout>,
18        flows: impl AsyncFn(&mut Connector) -> Result<()>,
19    ) -> Result<Self> {
20        let mut connectors = Connector::new(layout)?;
21
22        flows(&mut connectors).await?;
23
24        Ok(Flows {
25            senders: Arc::new(Mutex::new(connectors.senders)),
26            receivers: Arc::new(Mutex::new(connectors.receivers)),
27        })
28    }
29}
30
31pub struct Connector {
32    layout: Arc<DataflowLayout>,
33
34    senders: HashMap<OutputID, Sender<DataflowMessage>>,
35    receivers: HashMap<InputID, Receiver<DataflowMessage>>,
36}
37
38impl Connector {
39    pub fn new(layout: Arc<DataflowLayout>) -> Result<Self> {
40        Ok(Self {
41            layout,
42            senders: HashMap::new(),
43            receivers: HashMap::new(),
44        })
45    }
46
47    pub fn connect(&mut self, input: InputID, output: OutputID) -> eyre::Result<&mut Self> {
48        if !self.layout.inputs.contains(&input) {
49            eyre::bail!("Input ID {} not found", input.0);
50        }
51
52        if !self.layout.outputs.contains(&output) {
53            eyre::bail!("Output ID {} not found", output.0);
54        }
55
56        if self.receivers.contains_key(&input) {
57            eyre::bail!("Input ID {} already mapped", input.0);
58        }
59
60        let receiver = match self.senders.get(&output) {
61            Some(sender) => sender.subscribe(),
62            None => {
63                let (sender, receiver) = tokio::sync::broadcast::channel(1024);
64                self.senders.insert(output, sender);
65                receiver
66            }
67        };
68
69        self.receivers.insert(input, receiver);
70
71        Ok(self)
72    }
73}