tokactor 2.1.0

An actor model framework wrapped around tokio
Documentation
use std::{future::Future, io, marker::PhantomData, pin::Pin, task::Poll};

use tokio::{
    net::{self, tcp::OwnedReadHalf, TcpStream, ToSocketAddrs},
    sync::{mpsc, oneshot},
};

use crate::{
    executor::{Executor, RawExecutor},
    Actor, Ask, AsyncAsk, Ctx, Message, TcpRequest,
};

use super::{
    create_reader_actor, create_writer_actor, Component, ComponentFuture, DataFrameReceiver,
    IoRead, Reader, Writer,
};

/// Future that accepts one incoming TCP connection on a borrowed listener.
///
/// It holds a shared borrow of the tokio listener and an exclusive borrow of
/// the router actor's raw executor for the lifetime `'a`. The remaining type
/// parameters are carried only as markers:
///
/// * `ConnAct` - the actor type produced for each accepted connection.
/// * `Reader`  - the frame type read from the connection's read half.
/// * `O`       - the data-frame receiver whose `Frame` is `Reader`.
pub struct TcpAcceptFut<'a, RouterAct, ConnAct, Reader, O>
where
    RouterAct: Actor,
    ConnAct: Actor,
    Reader: IoRead<OwnedReadHalf> + Send,
    O: DataFrameReceiver<Frame = Reader>,
{
    /// Listener that is polled for new connections.
    listener: &'a net::TcpListener,
    /// Executor of the router actor that is asked to build a connection actor.
    executor: &'a mut RawExecutor<RouterAct>,
    /// Type markers only; no values of these types are stored.
    _actor: PhantomData<ConnAct>,
    _reader: PhantomData<Reader>,
    _payload: PhantomData<O>,
}

// Explicit `Unpin` impl: the future stores only references and `PhantomData`
// markers, and `poll` relies on `Pin::get_mut`, which requires `Self: Unpin`.
impl<
        'a,
        O: DataFrameReceiver<Frame = Reader>,
        RouterAct: Actor + Ask<TcpRequest, Result = ConnAct>,
        ConnAct: Actor,
        Reader: IoRead<OwnedReadHalf> + Default + Send + 'static,
    > Unpin for TcpAcceptFut<'a, RouterAct, ConnAct, Reader, O>
{
}

impl<'a, O, RouterAct, ConnAct, Reader> Future for TcpAcceptFut<'a, RouterAct, ConnAct, Reader, O>
where
    RouterAct: Actor + Ask<TcpRequest, Result = ConnAct>,
    ConnAct: Actor,
    Reader: IoRead<OwnedReadHalf> + Default + Send + 'static,
    O: DataFrameReceiver<Frame = Reader>,
{
    /// On success: the reader handle for the new connection together with the
    /// connection actor returned by the router. On failure: the accept error.
    type Output = io::Result<(crate::io::Reader<Reader, O::Request>, ConnAct)>;

    /// Polls the listener for a connection.
    ///
    /// While no connection is ready this returns `Poll::Pending`. An accept
    /// error is forwarded unchanged. For an accepted stream, the reader/writer
    /// pair is built with `tcp_actors` inside the router executor's context,
    /// and the router is then asked with a `TcpRequest` carrying the writer and
    /// the peer address; its answer is the connection actor.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let (stream, address) = match self.listener.poll_accept(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
            Poll::Ready(Ok(accepted)) => accepted,
        };

        // `Self: Unpin`, so the pinned reference can be unwrapped to reach the
        // mutably borrowed executor.
        let this = self.get_mut();
        let (read, write) = this
            .executor
            .with_ctx(move |ctx| tcp_actors::<RouterAct, Reader, O>(ctx, stream));
        let actor = this.executor.ask(TcpRequest(write, address));

        Poll::Ready(Ok((read, actor)))
    }
}

/// TCP listener component that owns a bound tokio listener and the raw
/// executor of its parent actor `P`.
///
/// `A`, `R` and `O` are carried only as type markers: the per-connection actor
/// type, the frame type read from a connection, and the data-frame receiver
/// whose `Frame` is `R`.
pub struct TcpListener<P, A, R, O>
where
    P: Actor,
    A: Actor,
    R: IoRead<OwnedReadHalf>,
    O: DataFrameReceiver<Frame = R>,
{
    /// Executor running the parent actor.
    executor: RawExecutor<P>,
    /// Bound socket on which connections are accepted.
    listener: net::TcpListener,
    /// Type markers only; no values of these types are stored.
    _actor: PhantomData<A>,
    _reader: PhantomData<R>,
    _payload: PhantomData<O>,
}

impl<P: Actor, A: Actor, R: IoRead<OwnedReadHalf>, O: DataFrameReceiver<Frame = R>>
    TcpListener<P, A, R, O>
{
    /// Binds a TCP listener to `address` and starts an executor for `parent`.
    ///
    /// The parent actor is wrapped in an [`Executor`] with a fresh [`Ctx`],
    /// converted into its raw form and started before the component is
    /// returned.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from `tokio::net::TcpListener::bind` when the
    /// address cannot be bound. In that case no executor is created.
    pub async fn new(
        address: impl ToSocketAddrs,
        parent: P,
    ) -> io::Result<TcpListener<P, A, R, O>> {
        // Bind first so that a bind failure returns before any executor exists.
        let listener = net::TcpListener::bind(address).await?;
        let mut executor = Executor::new(parent, Ctx::<P>::new()).into_raw();
        executor.raw_start();
        Ok(Self {
            executor,
            listener,
            _actor: PhantomData,
            _reader: PhantomData,
            _payload: PhantomData,
        })
    }
}

/// Associated types describing what accepting on a [`TcpListener`] yields.
impl<P, A, R, O> ComponentFuture for TcpListener<P, A, R, O>
where
    P: Actor + Ask<TcpRequest, Result = A>,
    A: Actor + AsyncAsk<O::Request>,
    R: IoRead<OwnedReadHalf> + Default + Message + std::fmt::Debug + Send + Sync + 'static,
    O: DataFrameReceiver<Frame = R>,
{
    /// Actor created for each accepted connection.
    type Actor = A;
    /// Data-frame receiver used for the connection.
    type Payload = O;
    /// Reader handle for an accepted connection.
    type Reader = crate::io::Reader<R, O::Request>;
    /// Accepting can fail with an I/O error.
    type Error = io::Error;
    /// Future returned by `accept`, borrowing the listener for `'a`.
    type Future<'a> = TcpAcceptFut<'a, P, A, R, O>;
}

impl<P, A, R, O> Component for TcpListener<P, A, R, O>
where
    P: Actor + Ask<TcpRequest, Result = A>,
    A: Actor + AsyncAsk<O::Request>,
    R: IoRead<OwnedReadHalf> + Default + Message + std::fmt::Debug + Send + Sync + 'static,
    O: DataFrameReceiver<Frame = R>,
{
    /// Boxed future that resolves once the parent executor has shut down.
    type Shutdown = Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>;

    /// Creates a future that accepts the next connection.
    ///
    /// The future borrows the listener immutably and the executor mutably for
    /// as long as it lives.
    // The explicit `'a` ties `&mut self` to the `Future<'a>` associated type in
    // the signature; clippy would otherwise suggest eliding it.
    #[allow(clippy::needless_lifetimes)]
    fn accept<'a>(&'a mut self) -> Self::Future<'a> {
        let listener = &self.listener;
        let executor = &mut self.executor;
        TcpAcceptFut {
            listener,
            executor,
            _actor: PhantomData,
            _reader: PhantomData,
            _payload: PhantomData,
        }
    }

    /// Consumes the component and returns a future that shuts the parent
    /// executor down via `raw_shutdown`.
    fn shutdown(self) -> Self::Shutdown {
        let shutdown = async move { self.executor.raw_shutdown().await };
        Box::pin(shutdown)
    }
}

/// Splits `stream` and spawns one reader task and one writer task for it.
///
/// Two bounded channels (capacity 10 each) carry requests to the tasks: one
/// for frames to read, one for byte buffers to write. Each request is paired
/// with a oneshot sender for its result. Both tasks receive their own
/// subscription to `ctx.notifier`. A further oneshot channel links the two
/// tasks: its sender goes to the reader task and its receiver to the writer
/// task (presumably so the reader can signal the writer to stop - confirm in
/// `create_reader_actor` / `create_writer_actor`).
///
/// Returns the `Reader` and `Writer` handles wrapping the sending halves of
/// the two request channels.
fn tcp_actors<A, R, Payload>(
    ctx: &Ctx<A>,
    stream: TcpStream,
) -> (Reader<R, Payload::Request>, Writer)
where
    A: Actor,
    R: IoRead<OwnedReadHalf> + Default + Send + 'static,
    Payload: DataFrameReceiver<Frame = R>,
{
    /// Buffer size shared by the read-request and write-request channels.
    const CHANNEL_CAPACITY: usize = 10;

    let (read_half, write_half) = stream.into_split();

    let (read_req_tx, read_req_rx) =
        mpsc::channel::<(R, oneshot::Sender<io::Result<Payload::Request>>)>(CHANNEL_CAPACITY);
    let (write_req_tx, write_req_rx) =
        mpsc::channel::<(Vec<u8>, oneshot::Sender<io::Result<()>>)>(CHANNEL_CAPACITY);
    let (stop_tx, stop_rx) = oneshot::channel();

    // Reader task: owns the read half and the sending side of the stop signal.
    let reader_notifications = ctx.notifier.subscribe();
    tokio::spawn(create_reader_actor::<OwnedReadHalf, R, Payload>(
        read_half,
        read_req_rx,
        reader_notifications,
        stop_tx,
    ));

    // Writer task: owns the write half and the receiving side of the stop signal.
    let writer_notifications = ctx.notifier.subscribe();
    tokio::spawn(create_writer_actor(
        write_half,
        write_req_rx,
        writer_notifications,
        stop_rx,
    ));

    (
        Reader::<R, Payload::Request>::new(read_req_tx),
        Writer::new(write_req_tx),
    )
}