use crate::ProcessMessage;
/// Sending side of the two `ProcessMessage` channels.
///
/// Holds one `tokio::sync::mpsc::Sender` per channel; messages are pushed
/// through the `send_to_working` / `send_to_delivery` methods.
pub struct Emitter {
/// Sender feeding the "working" channel.
working: tokio::sync::mpsc::Sender<ProcessMessage>,
/// Sender feeding the "delivery" channel.
delivery: tokio::sync::mpsc::Sender<ProcessMessage>,
}
impl Emitter {
    /// Pushes `message` onto the delivery channel, waiting for capacity if
    /// the channel is currently full.
    ///
    /// # Errors
    ///
    /// Returns an [`std::io::Error`] of kind
    /// [`std::io::ErrorKind::ConnectionAborted`] when the underlying send
    /// fails. The original send error (and the message it carries) is
    /// discarded.
    #[tracing::instrument(skip(self))]
    pub(crate) async fn send_to_delivery(&self, message: ProcessMessage) -> std::io::Result<()> {
        self.delivery
            .send(message)
            .await
            .map_err(|_send_failed| std::io::Error::from(std::io::ErrorKind::ConnectionAborted))
    }

    /// Pushes `message` onto the working channel, waiting for capacity if
    /// the channel is currently full.
    ///
    /// # Errors
    ///
    /// Returns an [`std::io::Error`] of kind
    /// [`std::io::ErrorKind::ConnectionAborted`] when the underlying send
    /// fails. The original send error (and the message it carries) is
    /// discarded.
    #[tracing::instrument(skip(self))]
    pub(crate) async fn send_to_working(&self, message: ProcessMessage) -> std::io::Result<()> {
        self.working
            .send(message)
            .await
            .map_err(|_send_failed| std::io::Error::from(std::io::ErrorKind::ConnectionAborted))
    }
}
/// Receiving side of one `ProcessMessage` channel.
///
/// Wraps a `tokio::sync::mpsc::Receiver` so it can be consumed as a stream
/// via `as_stream`.
pub struct Receiver {
/// The wrapped tokio receiver that messages are read from.
inner: tokio::sync::mpsc::Receiver<ProcessMessage>,
}
impl Receiver {
    /// Exposes the receiver as a [`tokio_stream::Stream`] of messages.
    ///
    /// The stream mutably borrows `self` for its whole lifetime. It yields
    /// every message obtained from the inner receiver, in order, and
    /// terminates as soon as `recv` reports `None`.
    pub fn as_stream(&mut self) -> impl tokio_stream::Stream<Item = ProcessMessage> + '_ {
        async_stream::stream! {
            loop {
                match self.inner.recv().await {
                    Some(next) => {
                        yield next;
                    }
                    // `None` means no further messages will arrive: end the stream.
                    None => break,
                }
            }
        }
    }
}
/// Creates the working and delivery channels and returns their endpoints.
///
/// The result is `(emitter, working_receiver, delivery_receiver)`: a shared
/// [`Emitter`] holding both senders, followed by the [`Receiver`] for the
/// working channel and the [`Receiver`] for the delivery channel.
///
/// `working_channel_size` and `delivery_channel_size` are passed straight to
/// `tokio::sync::mpsc::channel` as the respective buffer capacities.
///
/// # Panics
///
/// Per tokio's documentation, `mpsc::channel` panics when given a capacity
/// of `0`, so both sizes must be non-zero.
#[must_use]
pub fn init(
    working_channel_size: usize,
    delivery_channel_size: usize,
) -> (std::sync::Arc<Emitter>, Receiver, Receiver) {
    let (working, working_inbox) = tokio::sync::mpsc::channel(working_channel_size);
    let (delivery, delivery_inbox) = tokio::sync::mpsc::channel(delivery_channel_size);

    let emitter = std::sync::Arc::new(Emitter { working, delivery });
    let working_receiver = Receiver {
        inner: working_inbox,
    };
    let delivery_receiver = Receiver {
        inner: delivery_inbox,
    };

    (emitter, working_receiver, delivery_receiver)
}