iridis_node/primitives/
inputs.rs1use std::{collections::HashMap, sync::Arc};
4
5use crate::prelude::{thirdparty::tokio::sync::Mutex, *};
6
7type SharedMap<K, V> = Arc<Mutex<HashMap<K, V>>>;
8type Receivers = SharedMap<Uuid, MessageReceiver>;
9
10#[derive(Debug)]
12pub struct Inputs {
13 receivers: Receivers,
14
15 source: NodeID,
16}
17
18impl Inputs {
19 pub fn new(receivers: Receivers, source: NodeID) -> Self {
21 tracing::debug!(
22 "Creating Inputs entry for node '{}' (uuid: {})",
23 source.label,
24 source.uuid
25 );
26
27 Self { receivers, source }
28 }
29
30 async fn compute(&mut self, input: impl Into<String>) -> Result<(MessageReceiver, InputID)> {
31 let label: String = input.into();
32 let layout = self.source.input(&label);
33
34 let receiver = self
35 .receivers
36 .lock()
37 .await
38 .remove(&layout.uuid)
39 .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
40
41 Ok((receiver, layout))
42 }
43
44 pub async fn raw(&mut self, input: impl Into<String>) -> Result<RawInput> {
47 let (receiver, layout) = self.compute(input).await?;
48
49 tracing::debug!(
50 "Creating new raw input '{}' (uuid: {}) for node '{}' (uuid: {})",
51 layout.label,
52 layout.uuid,
53 self.source.label,
54 self.source.uuid
55 );
56
57 Ok(RawInput::new(receiver, self.source.clone(), layout))
58 }
59
60 pub async fn with<T: ArrowMessage>(&mut self, input: impl Into<String>) -> Result<Input<T>> {
62 let (receiver, layout) = self.compute(input).await?;
63
64 tracing::debug!(
65 "Creating new input '{}' (uuid: {}) for node '{}' (uuid: {})",
66 layout.label,
67 layout.uuid,
68 self.source.label,
69 self.source.uuid
70 );
71
72 Ok(Input::new(receiver, self.source.clone(), layout))
73 }
74}