1use std::{collections::HashMap, sync::Arc};
2
3use tokio::sync::{
4 Mutex,
5 broadcast::{Receiver, Sender},
6};
7
8use crate::prelude::*;
9
/// The finished set of dataflow channels, produced by [`Flows::new`].
///
/// Both maps are wrapped in `Arc<Mutex<..>>` (the tokio `Mutex`), so handles
/// to them can be cloned and the maps locked from async code.
pub struct Flows {
    /// Broadcast sender for each connected output, keyed by output ID.
    pub senders: Arc<Mutex<HashMap<OutputID, Sender<DataflowMessage>>>>,
    /// Broadcast receiver for each connected input, keyed by input ID.
    pub receivers: Arc<Mutex<HashMap<InputID, Receiver<DataflowMessage>>>>,
}
14
15impl Flows {
16 pub async fn new(
17 layout: Arc<DataflowLayout>,
18 flows: impl AsyncFn(&mut Connector) -> Result<()>,
19 ) -> Result<Self> {
20 let mut connectors = Connector::new(layout)?;
21
22 flows(&mut connectors).await?;
23
24 Ok(Flows {
25 senders: Arc::new(Mutex::new(connectors.senders)),
26 receivers: Arc::new(Mutex::new(connectors.receivers)),
27 })
28 }
29}
30
/// Accumulates broadcast channels while connections between inputs and
/// outputs are being declared.
///
/// IDs passed to [`Connector::connect`] are checked against `layout`.
pub struct Connector {
    /// Layout whose `inputs` and `outputs` are used to validate connection requests.
    layout: Arc<DataflowLayout>,

    /// One broadcast sender per output that has been connected at least once.
    senders: HashMap<OutputID, Sender<DataflowMessage>>,
    /// One broadcast receiver per connected input.
    receivers: HashMap<InputID, Receiver<DataflowMessage>>,
}
37
38impl Connector {
39 pub fn new(layout: Arc<DataflowLayout>) -> Result<Self> {
40 Ok(Self {
41 layout,
42 senders: HashMap::new(),
43 receivers: HashMap::new(),
44 })
45 }
46
47 pub fn connect(&mut self, input: InputID, output: OutputID) -> eyre::Result<&mut Self> {
48 if !self.layout.inputs.contains(&input) {
49 eyre::bail!("Input ID {} not found", input.0);
50 }
51
52 if !self.layout.outputs.contains(&output) {
53 eyre::bail!("Output ID {} not found", output.0);
54 }
55
56 if self.receivers.contains_key(&input) {
57 eyre::bail!("Input ID {} already mapped", input.0);
58 }
59
60 let receiver = match self.senders.get(&output) {
61 Some(sender) => sender.subscribe(),
62 None => {
63 let (sender, receiver) = tokio::sync::broadcast::channel(1024);
64 self.senders.insert(output, sender);
65 receiver
66 }
67 };
68
69 self.receivers.insert(input, receiver);
70
71 Ok(self)
72 }
73}