use futures::{FutureExt, Stream, ready};
use serde::{Deserialize, Serialize};
use std::{
error::Error,
fmt,
marker::PhantomData,
mem,
pin::Pin,
task::{Context, Poll},
};
use tokio_util::sync::ReusableBoxFuture;
use super::{
super::{
DEFAULT_MAX_ITEM_SIZE, RemoteSendError,
base::{self, PortDeserializer, PortSerializer},
},
Ref,
};
use crate::{RemoteSend, chmux, codec};
/// Error observed when reading the value of a remote watch channel.
///
/// The watch channel used by the receiver in this file carries
/// `Result<T, RecvError>`, so an error of this type is delivered in place of
/// a regular value.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum RecvError {
    /// Wraps a [`base::RecvError`]; displayed with the prefix `receive error`.
    RemoteReceive(base::RecvError),
    /// Wraps a [`chmux::ConnectError`]; displayed with the prefix `connect error`.
    RemoteConnect(chmux::ConnectError),
    /// Wraps a [`chmux::ListenerError`]; displayed with the prefix `listen error`.
    ///
    /// Stored in the channel when accepting the incoming port request fails
    /// while a receiver is being deserialized.
    RemoteListen(chmux::ListenerError),
}
/// Formats the error as a short category prefix followed by the wrapped
/// error's own `Display` output.
impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::RemoteListen(err) => f.write_fmt(format_args!("listen error: {err}")),
            Self::RemoteConnect(err) => f.write_fmt(format_args!("connect error: {err}")),
            Self::RemoteReceive(err) => f.write_fmt(format_args!("receive error: {err}")),
        }
    }
}
// Marks `RecvError` as a standard error type; all trait methods keep their
// default implementations.
impl Error for RecvError {}
impl RecvError {
    /// Reports whether this error is final.
    ///
    /// Connect and listen errors are always reported as final. For a receive
    /// error the answer is delegated to the wrapped [`base::RecvError`].
    // NOTE(review): "final" presumably means that no further values can be
    // received after this error — confirm against `base::RecvError::is_final`.
    pub fn is_final(&self) -> bool {
        match self {
            Self::RemoteConnect(_) => true,
            Self::RemoteListen(_) => true,
            Self::RemoteReceive(inner) => inner.is_final(),
        }
    }
}
/// Error returned by [`Receiver::changed`] when waiting for a change
/// notification fails.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ChangedError {
    /// The only failure reported: the underlying watch channel returned an
    /// error from its own `changed` call. Displayed as `closed`.
    Closed,
}
impl ChangedError {
    /// Returns `true` for every value of this type.
    ///
    /// Kept only for callers of the deprecated API; `Closed` is the sole
    /// variant, so the answer never varies.
    #[deprecated = "a remoc::rch::watch::ChangedError is always due to closure"]
    pub fn is_closed(&self) -> bool {
        // Exhaustive match so that adding a variant forces this to be revisited.
        match self {
            Self::Closed => true,
        }
    }
}
/// Formats the error as the fixed text `closed`.
impl fmt::Display for ChangedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Irrefutable while `Closed` is the only variant; a new variant turns
        // this line into a compile error instead of silently printing "closed".
        let Self::Closed = self;
        f.write_str("closed")
    }
}
// Marks `ChangedError` as a standard error type; all trait methods keep
// their default implementations.
impl Error for ChangedError {}
/// Receiving half of a watch channel that can be sent to a remote endpoint.
///
/// `Codec` selects the codec type used when the receiver is transported and
/// `MAX_ITEM_SIZE` is the item size limit (in bytes) reported by
/// [`Receiver::max_item_size`] and passed to the transport tasks.
#[derive(Clone)]
pub struct Receiver<T, Codec = codec::Default, const MAX_ITEM_SIZE: usize = DEFAULT_MAX_ITEM_SIZE> {
    // Local tokio watch channel holding either the latest value or the
    // error that replaced it.
    rx: tokio::sync::watch::Receiver<Result<T, RecvError>>,
    // Sender used by the serialization task to report `RemoteSendError`s
    // (for example a failed connect).
    // NOTE(review): the consumer of these errors lives outside this file —
    // presumably the sending half; confirm.
    remote_send_err_tx: tokio::sync::mpsc::UnboundedSender<RemoteSendError>,
    // Item size limit supplied at construction; set to the value announced by
    // the remote endpoint when this receiver was created by deserialization.
    remote_max_item_size: Option<usize>,
    // Ties the `Codec` type parameter to the struct without storing a value.
    _codec: PhantomData<Codec>,
}
/// Debug output consists of the type name only; no fields are printed, so no
/// `Debug` bound on `T` or `Codec` is required.
impl<T, Codec, const MAX_ITEM_SIZE: usize> fmt::Debug for Receiver<T, Codec, MAX_ITEM_SIZE> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Receiver");
        builder.finish()
    }
}
/// Serialized form of a [`Receiver`] as it travels to the remote endpoint.
///
/// The field order is part of the derived serde representation and must not
/// be changed casually.
#[derive(Serialize, Deserialize)]
pub(crate) struct TransportedReceiver<T, Codec> {
    // Port number obtained from `PortSerializer::connect` on the sending side
    // and handed to `PortDeserializer::accept` on the receiving side.
    port: u32,
    // Snapshot of the watch channel contents taken at serialization time; it
    // becomes the initial value of the channel created on deserialization.
    data: Result<T, RecvError>,
    // Carries the codec type parameter; holds no data.
    codec: PhantomData<Codec>,
    // Item size limit of the serializing side. `#[serde(default)]` allows the
    // field to be absent from the input, in which case it deserializes as 0.
    // NOTE(review): presumably for compatibility with peers that predate this
    // field — confirm.
    #[serde(default)]
    max_item_size: u64,
}
impl<T, Codec, const MAX_ITEM_SIZE: usize> Receiver<T, Codec, MAX_ITEM_SIZE> {
    /// Assembles a receiver from its parts.
    ///
    /// * `rx` - local watch channel holding the current value or error.
    /// * `remote_send_err_tx` - channel on which remote send errors are reported.
    /// * `remote_max_item_size` - item size limit of the remote endpoint, if known.
    pub(crate) fn new(
        rx: tokio::sync::watch::Receiver<Result<T, RecvError>>,
        remote_send_err_tx: tokio::sync::mpsc::UnboundedSender<RemoteSendError>,
        remote_max_item_size: Option<usize>,
    ) -> Self {
        Receiver { rx, remote_send_err_tx, remote_max_item_size, _codec: PhantomData }
    }

    /// Borrows the value currently held by the watch channel.
    ///
    /// Returns a guard dereferencing to the value, or a clone of the stored
    /// [`RecvError`] if the channel currently holds an error.
    pub fn borrow(&self) -> Result<Ref<'_, T>, RecvError> {
        let guard = self.rx.borrow();
        if let Err(err) = &*guard {
            return Err(err.clone());
        }
        Ok(Ref(guard))
    }

    /// Like [`borrow`](Self::borrow), but goes through the underlying
    /// channel's `borrow_and_update`, which also marks the value as seen.
    pub fn borrow_and_update(&mut self) -> Result<Ref<'_, T>, RecvError> {
        let guard = self.rx.borrow_and_update();
        if let Err(err) = &*guard {
            return Err(err.clone());
        }
        Ok(Ref(guard))
    }

    /// Waits until the underlying watch channel signals a change.
    ///
    /// # Errors
    ///
    /// Any error from the underlying channel is reported as
    /// [`ChangedError::Closed`].
    pub async fn changed(&mut self) -> Result<(), ChangedError> {
        match self.rx.changed().await {
            Ok(()) => Ok(()),
            Err(_) => Err(ChangedError::Closed),
        }
    }

    /// Returns the item size limit of this receiver, i.e. the
    /// `MAX_ITEM_SIZE` const parameter (in bytes).
    pub fn max_item_size(&self) -> usize {
        MAX_ITEM_SIZE
    }

    /// Converts this receiver into one with a different `MAX_ITEM_SIZE`.
    ///
    /// The limit is a const parameter, so changing it produces a new type.
    pub fn set_max_item_size<const NEW_MAX_ITEM_SIZE: usize>(mut self) -> Receiver<T, Codec, NEW_MAX_ITEM_SIZE> {
        // `Receiver` implements `Drop`, so its fields cannot be moved out of
        // `self`. The watch receiver is swapped against a throw-away channel
        // instead, and the remaining fields are cloned or copied.
        let (_, placeholder) =
            tokio::sync::watch::channel(Err(RecvError::RemoteConnect(chmux::ConnectError::ChMux)));
        let rx = mem::replace(&mut self.rx, placeholder);
        let remote_send_err_tx = self.remote_send_err_tx.clone();
        let remote_max_item_size = self.remote_max_item_size;

        Receiver { rx, remote_send_err_tx, remote_max_item_size, _codec: PhantomData }
    }

    /// Returns the remote item size limit supplied at construction.
    ///
    /// Receivers created by deserialization carry the limit announced by the
    /// endpoint that serialized them.
    pub fn remote_max_item_size(&self) -> Option<usize> {
        self.remote_max_item_size
    }
}
// The destructor performs no work. Its existence still matters: a type that
// implements `Drop` cannot have its fields moved out by value, which is why
// `set_max_item_size` swaps and clones fields instead of destructuring.
// NOTE(review): the original motivation for keeping this impl is not visible
// in this file — confirm before removing it.
impl<T, Codec, const MAX_ITEM_SIZE: usize> Drop for Receiver<T, Codec, MAX_ITEM_SIZE> {
    fn drop(&mut self) {
        // No cleanup is performed here.
    }
}
/// Serializes the receiver as a [`TransportedReceiver`].
///
/// A port is requested through `PortSerializer::connect`. The callback given
/// to it awaits the connection and then runs `super::send_impl` with a clone
/// of the local watch receiver. The serialized form contains the port, a
/// snapshot of the current channel contents and this side's item size limit.
impl<T, Codec, const MAX_ITEM_SIZE: usize> Serialize for Receiver<T, Codec, MAX_ITEM_SIZE>
where
    T: RemoteSend + Sync + Clone,
    Codec: codec::Codec,
{
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // Handles owned by the callback's future.
        let watch_rx = self.rx.clone();
        let send_err_tx = self.remote_send_err_tx.clone();

        let port = PortSerializer::connect(|connect| {
            async move {
                let (raw_tx, raw_rx) = match connect.await {
                    Ok(pair) => pair,
                    Err(err) => {
                        // Best effort: nobody may be listening for the error anymore.
                        let _ = send_err_tx.send(RemoteSendError::Connect(err));
                        return;
                    }
                };

                // NOTE(review): presumably forwards watch updates to the remote
                // endpoint — `send_impl` is defined outside this file section.
                super::send_impl::<T, Codec>(watch_rx, raw_tx, raw_rx, send_err_tx, MAX_ITEM_SIZE).await;
            }
            .boxed()
        })?;

        // Snapshot of the current value (or error) sent along with the port.
        let data = self.rx.borrow().clone();
        // Saturate in the (theoretical) case that `usize` is wider than `u64`.
        let max_item_size = u64::try_from(MAX_ITEM_SIZE).unwrap_or(u64::MAX);

        TransportedReceiver::<T, Codec> { port, data, codec: PhantomData, max_item_size }
            .serialize(serializer)
    }
}
/// Rebuilds a receiver from its [`TransportedReceiver`] form.
///
/// A fresh local watch channel is seeded with the transported snapshot. The
/// callback given to `PortDeserializer::accept` accepts the port request and
/// then runs `super::recv_impl` with the sending half of that channel. If
/// accepting fails, the channel is set to [`RecvError::RemoteListen`].
impl<'de, T, Codec, const MAX_ITEM_SIZE: usize> Deserialize<'de> for Receiver<T, Codec, MAX_ITEM_SIZE>
where
    T: RemoteSend + Sync,
    Codec: codec::Codec,
{
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let transported = TransportedReceiver::<T, Codec>::deserialize(deserializer)?;
        let TransportedReceiver { port, data, max_item_size, .. } = transported;

        // Saturate if the remote limit does not fit into this platform's `usize`.
        // NOTE(review): a missing field deserializes as 0 and is reported below
        // as `Some(0)` — confirm whether `None` would be the intended value.
        let max_item_size = match usize::try_from(max_item_size) {
            Ok(size) => size,
            Err(_) => usize::MAX,
        };
        if MAX_ITEM_SIZE < max_item_size {
            tracing::debug!(
                "Watch receiver maximum item size is {MAX_ITEM_SIZE} bytes, \
                 but remote endpoint expects at least {max_item_size} bytes"
            );
        }

        let (watch_tx, watch_rx) = tokio::sync::watch::channel(data);
        let (remote_send_err_tx, remote_send_err_rx) = tokio::sync::mpsc::unbounded_channel();

        PortDeserializer::accept(port, |local_port, request| {
            async move {
                let (raw_tx, raw_rx) = match request.accept_from(local_port).await {
                    Ok(pair) => pair,
                    Err(err) => {
                        // Surface the failure to local readers; ignore the case
                        // that every receiver is already gone.
                        let _ = watch_tx.send(Err(RecvError::RemoteListen(err)));
                        return;
                    }
                };

                super::recv_impl::<T, Codec>(watch_tx, raw_tx, raw_rx, remote_send_err_rx, None, MAX_ITEM_SIZE)
                    .await;
            }
            .boxed()
        })?;

        Ok(Self::new(watch_rx, remote_send_err_tx, Some(max_item_size)))
    }
}
/// Adapter that exposes a [`Receiver`] as a [`Stream`] of its values.
pub struct ReceiverStream<T, Codec = codec::Default, const MAX_ITEM_SIZE: usize = DEFAULT_MAX_ITEM_SIZE> {
    // Future for the next change notification. It owns the receiver while
    // pending and hands it back together with the result, so the receiver can
    // be moved into the next future without a new heap allocation.
    inner: ReusableBoxFuture<'static, (Result<(), ChangedError>, Receiver<T, Codec, MAX_ITEM_SIZE>)>,
}
/// Debug output consists of the type name only; the boxed future is opaque.
impl<T, Codec, const MAX_ITEM_SIZE: usize> fmt::Debug for ReceiverStream<T, Codec, MAX_ITEM_SIZE> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("ReceiverStream");
        builder.finish()
    }
}
impl<T, Codec, const MAX_ITEM_SIZE: usize> ReceiverStream<T, Codec, MAX_ITEM_SIZE>
where
    T: RemoteSend + Sync,
    Codec: Send + 'static,
{
    /// Wraps `rx` in a stream.
    ///
    /// The initial future resolves immediately with `Ok(())`, so the first
    /// poll of the stream yields the value currently held by the receiver
    /// without waiting for a change.
    pub fn new(rx: Receiver<T, Codec, MAX_ITEM_SIZE>) -> Self {
        let already_changed = async move { (Ok(()), rx) };
        Self { inner: ReusableBoxFuture::new(already_changed) }
    }

    /// Waits for the next change on `rx` and returns the outcome together
    /// with the receiver, so that ownership can be passed on to the next
    /// future.
    async fn make_future(
        mut rx: Receiver<T, Codec, MAX_ITEM_SIZE>,
    ) -> (Result<(), ChangedError>, Receiver<T, Codec, MAX_ITEM_SIZE>) {
        let outcome = rx.changed().await;
        (outcome, rx)
    }
}
/// Yields a clone of the receiver's value (or its error) each time a change
/// is signalled, and ends once waiting for a change fails.
impl<T, Codec, const MAX_ITEM_SIZE: usize> Stream for ReceiverStream<T, Codec, MAX_ITEM_SIZE>
where
    T: Clone + RemoteSend + Sync,
    Codec: Send + 'static,
{
    type Item = Result<T, RecvError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let (changed, mut rx) = ready!(self.inner.poll(cx));

        // `Some(..)` carries the latest value or error; `None` ends the stream.
        let next_item = match changed {
            Ok(()) => Some(rx.borrow_and_update().map(|value| value.clone())),
            Err(_) => None,
        };

        // Re-arm in both cases: the completed future must not be polled again,
        // and the receiver has to be handed to a new owner.
        self.inner.set(Self::make_future(rx));
        Poll::Ready(next_item)
    }
}
// Declares the stream `Unpin` for every `T` and `Codec`. Its only field is a
// `ReusableBoxFuture`, which keeps the future behind a heap allocation.
impl<T, Codec, const MAX_ITEM_SIZE: usize> Unpin for ReceiverStream<T, Codec, MAX_ITEM_SIZE> {}
impl<T, Codec, const MAX_ITEM_SIZE: usize> From<Receiver<T, Codec, MAX_ITEM_SIZE>>
for ReceiverStream<T, Codec, MAX_ITEM_SIZE>
where
T: RemoteSend + Sync,
Codec: Send + 'static,
{
fn from(recv: Receiver<T, Codec, MAX_ITEM_SIZE>) -> Self {
Self::new(recv)
}
}