use std::{fmt::Debug, sync::Arc};
use futures::FutureExt;
use tokio::{sync::mpsc, task::unconstrained};
use crate::{
communication::{CommunicationError, InterProcessMessage, Serializable, TryRecvError},
dataflow::stream::StreamId,
};
/// Sending half of a communication channel carrying messages of type `D`.
///
/// Cloning an endpoint clones the underlying channel sender.
#[derive(Clone)]
pub enum SendEndpoint<D>
where
    D: Clone + Send + Debug,
{
    /// Delivers `D` values directly through an unbounded tokio channel.
    InterThread(mpsc::UnboundedSender<D>),
    /// Delivers [`InterProcessMessage`]s through an unbounded tokio channel;
    /// the [`StreamId`] is attached to every message sent on this endpoint.
    InterProcess(StreamId, mpsc::UnboundedSender<InterProcessMessage>),
}
impl<D> SendEndpoint<Arc<D>>
where
    D: 'static + Serializable + Send + Sync + Debug,
{
    /// Pushes `msg` onto the channel backing this endpoint.
    ///
    /// For [`SendEndpoint::InterThread`] the `Arc` is forwarded unchanged. For
    /// [`SendEndpoint::InterProcess`] it is first wrapped, together with the
    /// endpoint's stream id, by `InterProcessMessage::new_deserialized`.
    ///
    /// # Errors
    ///
    /// Returns the [`CommunicationError`] converted from the channel's send
    /// error when the underlying sender rejects the message.
    pub fn send(&mut self, msg: Arc<D>) -> Result<(), CommunicationError> {
        match self {
            Self::InterThread(tx) => tx.send(msg)?,
            Self::InterProcess(stream_id, tx) => {
                let envelope = InterProcessMessage::new_deserialized(msg, *stream_id);
                tx.send(envelope)?;
            }
        }
        Ok(())
    }
}
/// Receiving half of a communication channel carrying messages of type `D`.
pub enum RecvEndpoint<D>
where
    D: Clone + Send + Debug,
{
    /// Receives `D` values from an unbounded tokio channel.
    InterThread(mpsc::UnboundedReceiver<D>),
}
impl<D> RecvEndpoint<D>
where
    D: Clone + Send + Debug,
{
    /// Waits asynchronously for the next message on this endpoint.
    ///
    /// # Errors
    ///
    /// Returns [`CommunicationError::Disconnected`] when the channel yields
    /// `None`, i.e. no further message will arrive.
    pub async fn read(&mut self) -> Result<D, CommunicationError> {
        let next = match self {
            Self::InterThread(rx) => rx.recv().await,
        };
        next.ok_or(CommunicationError::Disconnected)
    }

    /// Attempts to take a message without waiting.
    ///
    /// The receive future is polled exactly once. It is wrapped in
    /// `unconstrained` so the poll opts out of tokio's cooperative
    /// scheduling.
    ///
    /// # Errors
    ///
    /// * [`TryRecvError::Empty`] if the single poll did not complete.
    /// * [`TryRecvError::Disconnected`] if the poll completed with `None`.
    pub fn try_read(&mut self) -> Result<D, TryRecvError> {
        let polled = match self {
            Self::InterThread(rx) => unconstrained(rx.recv()).now_or_never(),
        };
        match polled {
            None => Err(TryRecvError::Empty),
            Some(None) => Err(TryRecvError::Disconnected),
            Some(Some(msg)) => Ok(msg),
        }
    }
}