coerce 0.3.1-prerelease3

Async actor runtime and distributed systems framework
use crate::remote::system::RemoteActorSystem;

use std::future::Future;
use std::io::Error;

use std::pin::Pin;
use std::task::{Context, Poll};

use crate::remote::net::codec::NetworkCodec;
use futures::StreamExt;
use tokio_util::codec::FramedRead;

pub mod client;
pub mod codec;
pub mod message;
pub mod proto;
pub mod server;

pub trait StreamMessage: 'static + Send + Sync + Sized {
    fn read_from_bytes(data: Vec<u8>) -> Option<Self>;

    fn write_to_bytes(&self) -> Option<Vec<u8>>;
}

#[async_trait]
pub trait StreamReceiver {
    type Message: StreamMessage;

    async fn on_recv(&mut self, msg: Self::Message, ctx: &mut RemoteActorSystem);

    async fn on_close(&mut self, ctx: &mut RemoteActorSystem);
}

pub struct StreamReceiverFuture<S: tokio::io::AsyncRead> {
    stream: FramedRead<S, NetworkCodec>,
    stop_rx: tokio::sync::oneshot::Receiver<bool>,
}

impl<S: tokio::io::AsyncRead> StreamReceiverFuture<S> {
    pub fn new(
        stream: FramedRead<S, NetworkCodec>,
        stop_rx: tokio::sync::oneshot::Receiver<bool>,
    ) -> StreamReceiverFuture<S> {
        StreamReceiverFuture { stream, stop_rx }
    }
}

impl<S: tokio::io::AsyncRead> tokio_stream::Stream for StreamReceiverFuture<S>
where
    S: Unpin,
{
    type Item = Option<Vec<u8>>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Option<Vec<u8>>>> {
        if let Poll::Ready(Ok(true)) = Pin::new(&mut self.stop_rx).poll(cx) {
            return Poll::Ready(None);
        }

        let result: Option<Result<Vec<u8>, Error>> =
            futures::ready!(Pin::new(&mut self.stream).poll_next(cx));

        Poll::Ready(match result {
            Some(Ok(message)) => Some(Some(message)),
            Some(Err(e)) => {
                error!(target: "RemoteStream", "{:?}", e);
                Some(None)
            }
            None => None,
        })
    }
}

pub async fn receive_loop<R: StreamReceiver, S: tokio::io::AsyncRead + Unpin>(
    mut system: RemoteActorSystem,
    read: FramedRead<S, NetworkCodec>,
    stop_rx: tokio::sync::oneshot::Receiver<bool>,
    mut receiver: R,
) where
    R: Send,
{
    let mut fut = StreamReceiverFuture::new(read, stop_rx);
    while let Some(res) = fut.next().await {
        match res {
            Some(res) => match R::Message::read_from_bytes(res) {
                Some(msg) => receiver.on_recv(msg, &mut system).await,
                None => warn!(target: "RemoteReceive", "error decoding msg"),
            },
            None => {
                error!(target: "RemoteReceive", "error receiving msg");
                break;
            }
        }
    }

    trace!(target: "RemoteReceive", "closed");
    receiver.on_close(&mut system).await;
}