flarrow_api/io/
inputs.rs

1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4
5use thirdparty::tokio::sync::Mutex;
6
7type SharedMap<K, V> = Arc<Mutex<HashMap<K, V>>>;
8type Receivers = SharedMap<Uuid, MessageReceiver>;
9
10/// Inputs let you manage input connections during a node *implementation*
11#[derive(Debug)]
12pub struct Inputs {
13    receivers: Receivers,
14
15    source: NodeLayout,
16}
17
18impl Inputs {
19    /// Creates a new Inputs instance.
20    pub fn new(receivers: Receivers, source: NodeLayout) -> Self {
21        tracing::debug!(
22            "Creating Inputs entry for node '{}' (uuid: {})",
23            source.label,
24            source.uuid
25        );
26
27        Self { receivers, source }
28    }
29
30    async fn compute(
31        &mut self,
32        input: impl Into<String>,
33    ) -> Result<(MessageReceiver, InputLayout)> {
34        let label: String = input.into();
35        let layout = self.source.input(&label);
36
37        let receiver = self
38            .receivers
39            .lock()
40            .await
41            .remove(&layout.uuid)
42            .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
43
44        Ok((receiver, layout))
45    }
46
47    /// Creates a new raw Input, this raw input has no type information so you have
48    /// to manually transform it
49    pub async fn raw(&mut self, input: impl Into<String>) -> Result<RawInput> {
50        let (receiver, layout) = self.compute(input).await?;
51
52        tracing::debug!(
53            "Creating new raw input '{}' (uuid: {}) for node '{}' (uuid: {})",
54            layout.label,
55            layout.uuid,
56            self.source.label,
57            self.source.uuid
58        );
59
60        Ok(RawInput::new(receiver, self.source.clone(), layout))
61    }
62
63    /// Creates a new Input, this input has type information so it can be directly transformed
64    pub async fn with<T: ArrowMessage>(&mut self, input: impl Into<String>) -> Result<Input<T>> {
65        let (receiver, layout) = self.compute(input).await?;
66
67        tracing::debug!(
68            "Creating new input '{}' (uuid: {}) for node '{}' (uuid: {})",
69            layout.label,
70            layout.uuid,
71            self.source.label,
72            self.source.uuid
73        );
74
75        Ok(Input::new(receiver, self.source.clone(), layout))
76    }
77}