use {
super::{
super::{Datum, StreamId},
builder::ProducerConfig,
worker::{Handle, WorkerLoop},
},
crate::{discovery::Discovery, network::LocalNode, primitives::ShortFmtExt},
dashmap::{DashMap, Entry},
derive_more::{Deref, From, Into},
std::sync::Arc,
};
/// Registry of the stream sinks that are currently active, keyed by
/// [`StreamId`].
///
/// Visible only inside `crate::streams`.
pub(in crate::streams) struct Sinks {
/// The local node; within this file it is used to obtain the network id
/// attached as a label to the producers gauge.
pub local: LocalNode,
/// Discovery handle; within this file it is used to add a newly created
/// stream to the local entry.
pub discovery: Discovery,
/// Concurrent map from stream id to the handle of its sink, wrapped in an
/// `Arc`.
pub active: Arc<DashMap<StreamId, SinkHandle>>,
}
impl Sinks {
    /// Builds a registry with no active sinks for the given local node and
    /// discovery handle.
    pub(in crate::streams) fn new(
        local: LocalNode,
        discovery: Discovery,
    ) -> Self {
        // Start with an empty map; sinks are registered later via `create`.
        let active = Arc::new(DashMap::new());
        Self {
            local,
            discovery,
            active,
        }
    }
}
impl Sinks {
    /// Registers a sink for the stream named by `config.stream_id`.
    ///
    /// Returns `Ok(handle)` when no sink existed for that stream id and a new
    /// worker was spawned, or `Err(existing)` carrying the handle that is
    /// already registered under the same id.
    ///
    /// On successful creation the `mosaik.streams.producers.active` gauge is
    /// incremented and the stream id is added to the local discovery entry.
    pub fn create<D: Datum>(
        &self,
        config: ProducerConfig,
    ) -> Result<SinkHandle, SinkHandle> {
        let stream_id = config.stream_id;

        // Guard clause: an already-registered stream hands back its handle.
        let slot = match self.active.entry(stream_id) {
            Entry::Occupied(existing) => return Err(existing.get().clone()),
            Entry::Vacant(slot) => slot,
        };

        // The vacant entry is still held while the worker is spawned, so the
        // check-and-insert for this stream id happens as one step.
        let handle: SinkHandle = WorkerLoop::<D>::spawn(self, config).into();
        slot.insert(handle.clone());

        let network = self.local.network_id().short().to_string();
        let labels = [("network", network)];
        metrics::gauge!("mosaik.streams.producers.active", &labels)
            .increment(1.0);

        self
            .discovery
            .update_local_entry(move |me| me.add_streams(stream_id));

        Ok(handle)
    }

    /// Looks up the sink registered for `stream_id`, returning a clone of its
    /// handle, or `None` when no such sink is active.
    pub fn open(&self, stream_id: StreamId) -> Option<SinkHandle> {
        let found = self.active.get(&stream_id)?;
        Some(found.value().clone())
    }
}
impl core::fmt::Debug for Sinks {
    /// Formats the registry as `Sinks(<n> streams)`, where `<n>` is the
    /// number of entries currently in the active map.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let stream_count = self.active.len();
        write!(f, "Sinks({} streams)", stream_count)
    }
}
/// Reference-counted wrapper around a worker [`Handle`].
///
/// Cloning copies the inner `Arc`, so all clones refer to the same `Handle`.
/// The derives also provide `Deref` to the inner value and conversions to and
/// from it.
#[derive(Clone, Deref, From, Into)]
pub(in crate::streams) struct SinkHandle(Arc<Handle>);
impl From<Handle> for SinkHandle {
    /// Moves an owned [`Handle`] into a fresh `Arc` and wraps it.
    fn from(handle: Handle) -> Self {
        let shared = Arc::new(handle);
        Self(shared)
    }
}