1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4
5use thirdparty::tokio::sync::Mutex;
6
7type SharedMap<K, V> = Arc<Mutex<HashMap<K, V>>>;
8type Receivers = SharedMap<Uuid, MessageReceiver>;
9
10#[derive(Debug)]
12pub struct Inputs {
13 receivers: Receivers,
14
15 source: NodeLayout,
16}
17
18impl Inputs {
19 pub fn new(receivers: Receivers, source: NodeLayout) -> Self {
21 tracing::debug!(
22 "Creating Inputs entry for node '{}' (uuid: {})",
23 source.label,
24 source.uuid
25 );
26
27 Self { receivers, source }
28 }
29
30 async fn compute(
31 &mut self,
32 input: impl Into<String>,
33 ) -> Result<(MessageReceiver, InputLayout)> {
34 let label: String = input.into();
35 let layout = self.source.input(&label);
36
37 let receiver = self
38 .receivers
39 .lock()
40 .await
41 .remove(&layout.uuid)
42 .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
43
44 Ok((receiver, layout))
45 }
46
47 pub async fn raw(&mut self, input: impl Into<String>) -> Result<RawInput> {
50 let (receiver, layout) = self.compute(input).await?;
51
52 tracing::debug!(
53 "Creating new raw input '{}' (uuid: {}) for node '{}' (uuid: {})",
54 layout.label,
55 layout.uuid,
56 self.source.label,
57 self.source.uuid
58 );
59
60 Ok(RawInput::new(receiver, self.source.clone(), layout))
61 }
62
63 pub async fn with<T: ArrowMessage>(&mut self, input: impl Into<String>) -> Result<Input<T>> {
65 let (receiver, layout) = self.compute(input).await?;
66
67 tracing::debug!(
68 "Creating new input '{}' (uuid: {}) for node '{}' (uuid: {})",
69 layout.label,
70 layout.uuid,
71 self.source.label,
72 self.source.uuid
73 );
74
75 Ok(Input::new(receiver, self.source.clone(), layout))
76 }
77}