use std::sync::Arc;
use crate::{
communication::{RecvEndpoint, TryRecvError},
dataflow::{Data, Message},
};
use super::{
errors::{ReadError, TryReadError},
StreamId,
};
pub struct ReadStream<D: Data> {
id: StreamId,
name: String,
is_closed: bool,
recv_endpoint: Option<RecvEndpoint<Arc<Message<D>>>>,
}
impl<D: Data> ReadStream<D> {
pub(crate) fn new(
id: StreamId,
name: &str,
recv_endpoint: Option<RecvEndpoint<Arc<Message<D>>>>,
) -> Self {
Self {
id,
name: name.to_string(),
is_closed: false,
recv_endpoint,
}
}
pub fn is_closed(&self) -> bool {
self.is_closed
}
pub fn try_read(&mut self) -> Result<Message<D>, TryReadError> {
if self.is_closed {
return Err(TryReadError::Closed);
}
let result = self
.recv_endpoint
.as_mut()
.map_or(Err(TryReadError::Disconnected), |rx| {
rx.try_read()
.map(|msg| Message::clone(&msg))
.map_err(TryReadError::from)
});
if result
.as_ref()
.map(Message::is_top_watermark)
.unwrap_or(false)
{
self.is_closed = true;
self.recv_endpoint = None;
}
result
}
pub fn read(&mut self) -> Result<Message<D>, ReadError> {
if self.is_closed {
return Err(ReadError::Closed);
}
let result = self
.recv_endpoint
.as_mut()
.map_or(Err(ReadError::Disconnected), |rx| loop {
match rx.try_read() {
Ok(msg) => {
break Ok(Message::clone(&msg));
}
Err(TryRecvError::Empty) => (),
Err(TryRecvError::Disconnected) => {
break Err(ReadError::Disconnected);
}
Err(TryRecvError::BincodeError(_)) => {
break Err(ReadError::SerializationError);
}
}
});
if result
.as_ref()
.map(Message::is_top_watermark)
.unwrap_or(false)
{
self.is_closed = true;
self.recv_endpoint = None;
}
result
}
pub(crate) async fn async_read(&mut self) -> Result<Arc<Message<D>>, ReadError> {
if self.is_closed {
return Err(ReadError::Closed);
}
match self.recv_endpoint.as_mut() {
Some(endpoint) => match endpoint.read().await {
Ok(msg) => Ok(msg),
_ => Err(ReadError::Disconnected),
},
None => Err(ReadError::Disconnected),
}
}
pub fn id(&self) -> StreamId {
self.id
}
pub fn name(&self) -> String {
self.name.clone()
}
}
unsafe impl<T: Data> Send for ReadStream<T> {}
unsafe impl<T: Data> Sync for ReadStream<T> {}