1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4
5use thirdparty::tokio::sync::Mutex;
6
7type SharedMap<K, V> = Arc<Mutex<HashMap<K, V>>>;
8type Senders = SharedMap<Uuid, Vec<MessageSender>>;
9
10pub struct Outputs {
12 senders: Senders,
13 clock: Arc<uhlc::HLC>,
14
15 source: NodeLayout,
16}
17
18impl Outputs {
19 pub fn new(senders: Senders, clock: Arc<uhlc::HLC>, source: NodeLayout) -> Self {
21 Self {
22 senders,
23 clock,
24 source,
25 }
26 }
27
28 async fn compute(
29 &mut self,
30 output: impl Into<String>,
31 ) -> Result<(Vec<MessageSender>, OutputLayout)> {
32 let label: String = output.into();
33 let layout = self.source.output(&label);
34
35 let senders = self
36 .senders
37 .lock()
38 .await
39 .remove(&layout.uuid)
40 .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
41
42 Ok((senders, layout))
43 }
44
45 pub async fn raw(&mut self, output: impl Into<String>) -> Result<RawOutput> {
48 let (senders, layout) = self.compute(output).await?;
49
50 tracing::debug!(
51 "Creating new raw output '{}' (uuid: {}) for node '{}' (uuid: {})",
52 layout.label,
53 layout.uuid,
54 self.source.label,
55 self.source.uuid
56 );
57
58 Ok(RawOutput::new(
59 senders,
60 self.clock.clone(),
61 self.source.clone(),
62 layout,
63 ))
64 }
65
66 pub async fn with<T: ArrowMessage>(&mut self, output: impl Into<String>) -> Result<Output<T>> {
69 let (senders, layout) = self.compute(output).await?;
70
71 tracing::debug!(
72 "Creating new raw output '{}' (uuid: {}) for node '{}' (uuid: {})",
73 layout.label,
74 layout.uuid,
75 self.source.label,
76 self.source.uuid
77 );
78
79 Ok(Output::new(
80 senders,
81 self.clock.clone(),
82 self.source.clone(),
83 layout,
84 ))
85 }
86}