use std::{collections::HashMap, sync::Arc};
use crate::prelude::*;
use thirdparty::tokio::sync::Mutex;
/// A `HashMap` shared through an `Arc` and guarded by the tokio async `Mutex`.
type SharedMap<K, V> = Arc<Mutex<HashMap<K, V>>>;
/// Shared table of message senders, keyed by the UUID of the output layout they belong to.
type Senders = SharedMap<Uuid, Vec<MessageSender>>;
/// Factory for the outputs of a single node.
///
/// It looks an output up by label on the node layout and takes the matching
/// senders out of the shared sender table.
pub struct Outputs {
    /// Shared sender table from which each output's senders are removed on demand.
    senders: Senders,
    /// Hybrid logical clock; a clone of this `Arc` is given to every output created.
    clock: Arc<uhlc::HLC>,
    /// Layout of the node whose outputs are being created.
    source: NodeLayout,
}
impl Outputs {
    /// Creates an `Outputs` factory from the shared sender table, the clock and the
    /// layout of the node the outputs belong to.
    pub fn new(senders: Senders, clock: Arc<uhlc::HLC>, source: NodeLayout) -> Self {
        Self {
            senders,
            clock,
            source,
        }
    }

    /// Resolves `output` to its layout on the source node and takes its senders.
    ///
    /// The entry is *removed* from the shared table, so the senders of a given output
    /// UUID can be handed out only once; a later lookup of the same UUID finds nothing.
    ///
    /// # Errors
    ///
    /// Returns the report produced by `report_io_not_found` when the shared table has no
    /// entry for the layout's UUID.
    async fn compute(
        &mut self,
        output: impl Into<String>,
    ) -> Result<(Vec<MessageSender>, OutputLayout)> {
        let label: String = output.into();
        let layout = self.source.output(&label);

        // Do the removal in its own statement so the mutex guard is dropped right away,
        // instead of staying alive while the error report is built and propagated.
        let claimed = self.senders.lock().await.remove(&layout.uuid);
        let senders = claimed.ok_or_eyre(report_io_not_found(&self.source, &layout))?;

        Ok((senders, layout))
    }

    /// Creates a `RawOutput` for the output labelled `output`.
    ///
    /// # Errors
    ///
    /// Fails when no senders are registered for that output (see `compute`).
    pub async fn raw(&mut self, output: impl Into<String>) -> Result<RawOutput> {
        let (senders, layout) = self.compute(output).await?;

        tracing::debug!(
            "Creating new raw output '{}' (uuid: {}) for node '{}' (uuid: {})",
            layout.label,
            layout.uuid,
            self.source.label,
            self.source.uuid
        );

        Ok(RawOutput::new(
            senders,
            self.clock.clone(),
            self.source.clone(),
            layout,
        ))
    }

    /// Creates an `Output<T>` for the output labelled `output`, where `T` is the
    /// `ArrowMessage` type carried by that output.
    ///
    /// # Errors
    ///
    /// Fails when no senders are registered for that output (see `compute`).
    pub async fn with<T: ArrowMessage>(&mut self, output: impl Into<String>) -> Result<Output<T>> {
        let (senders, layout) = self.compute(output).await?;

        // This path builds a typed `Output<T>`, so the trace must not call it "raw".
        tracing::debug!(
            "Creating new output '{}' (uuid: {}) for node '{}' (uuid: {})",
            layout.label,
            layout.uuid,
            self.source.label,
            self.source.uuid
        );

        Ok(Output::new(
            senders,
            self.clock.clone(),
            self.source.clone(),
            layout,
        ))
    }
}