iridis_api/io/
outputs.rs

1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4
5use thirdparty::tokio::sync::Mutex;
6
7type SharedMap<K, V> = Arc<Mutex<HashMap<K, V>>>;
8type Senders = SharedMap<Uuid, Vec<MessageSender>>;
9
10/// Outputs let you manage output connections during a node *implementation*
11pub struct Outputs {
12    senders: Senders,
13    clock: Arc<uhlc::HLC>,
14
15    source: NodeLayout,
16}
17
18impl Outputs {
19    /// Creates a new instance of `Outputs`
20    pub fn new(senders: Senders, clock: Arc<uhlc::HLC>, source: NodeLayout) -> Self {
21        Self {
22            senders,
23            clock,
24            source,
25        }
26    }
27
28    async fn compute(
29        &mut self,
30        output: impl Into<String>,
31    ) -> Result<(Vec<MessageSender>, OutputLayout)> {
32        let label: String = output.into();
33        let layout = self.source.output(&label);
34
35        let senders = self
36            .senders
37            .lock()
38            .await
39            .remove(&layout.uuid)
40            .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
41
42        Ok((senders, layout))
43    }
44
45    /// Creates a new raw Output, this raw output has no type information so you have
46    /// to manually transform it
47    pub async fn raw(&mut self, output: impl Into<String>) -> Result<RawOutput> {
48        let (senders, layout) = self.compute(output).await?;
49
50        tracing::debug!(
51            "Creating new raw output '{}' (uuid: {}) for node '{}' (uuid: {})",
52            layout.label,
53            layout.uuid,
54            self.source.label,
55            self.source.uuid
56        );
57
58        Ok(RawOutput::new(
59            senders,
60            self.clock.clone(),
61            self.source.clone(),
62            layout,
63        ))
64    }
65
66    /// Creates a new typed Output, this output has type information so you don't have
67    /// to manually transform it
68    pub async fn with<T: ArrowMessage>(&mut self, output: impl Into<String>) -> Result<Output<T>> {
69        let (senders, layout) = self.compute(output).await?;
70
71        tracing::debug!(
72            "Creating new raw output '{}' (uuid: {}) for node '{}' (uuid: {})",
73            layout.label,
74            layout.uuid,
75            self.source.label,
76            self.source.uuid
77        );
78
79        Ok(Output::new(
80            senders,
81            self.clock.clone(),
82            self.source.clone(),
83            layout,
84        ))
85    }
86}