use std::{collections::HashMap, sync::Arc};
use crate::prelude::*;
use thirdparty::tokio::sync::Mutex;
type SharedMap<K, V> = Arc<Mutex<HashMap<K, V>>>;
type Receivers = SharedMap<Uuid, MessageReceiver>;
#[derive(Debug)]
pub struct Inputs {
receivers: Receivers,
source: NodeLayout,
}
impl Inputs {
pub fn new(receivers: Receivers, source: NodeLayout) -> Self {
tracing::debug!(
"Creating Inputs entry for node '{}' (uuid: {})",
source.label,
source.uuid
);
Self { receivers, source }
}
async fn compute(
&mut self,
input: impl Into<String>,
) -> Result<(MessageReceiver, InputLayout)> {
let label: String = input.into();
let layout = self.source.input(&label);
let receiver = self
.receivers
.lock()
.await
.remove(&layout.uuid)
.ok_or_eyre(report_io_not_found(&self.source, &layout))?;
Ok((receiver, layout))
}
pub async fn raw(&mut self, input: impl Into<String>) -> Result<RawInput> {
let (receiver, layout) = self.compute(input).await?;
tracing::debug!(
"Creating new raw input '{}' (uuid: {}) for node '{}' (uuid: {})",
layout.label,
layout.uuid,
self.source.label,
self.source.uuid
);
Ok(RawInput::new(receiver, self.source.clone(), layout))
}
pub async fn with<T: ArrowMessage>(&mut self, input: impl Into<String>) -> Result<Input<T>> {
let (receiver, layout) = self.compute(input).await?;
tracing::debug!(
"Creating new input '{}' (uuid: {}) for node '{}' (uuid: {})",
layout.label,
layout.uuid,
self.source.label,
self.source.uuid
);
Ok(Input::new(receiver, self.source.clone(), layout))
}
}