hannibal 0.16.3

A small actor library
Documentation
use futures::{
    Stream as _,
    stream::{PollFn, poll_fn},
};

use std::{
    future::Future,
    pin::{Pin, pin},
    sync::{Arc, Weak},
    task,
};

use crate::{
    error::{ActorError, Result},
    event_loop::Payload,
};

trait TxFn<A>: Send + Sync {
    fn send(&self, msg: Payload<A>) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
}

impl<F, A> TxFn<A> for F
where
    F: Fn(Payload<A>) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>,
    F: Send + Sync,
{
    fn send(&self, msg: Payload<A>) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        self(msg)
    }
}

trait TryTxFn<A>: Send + Sync {
    fn try_send(&self, msg: Payload<A>) -> Result<()>;
}

impl<F, A> TryTxFn<A> for F
where
    F: Fn(Payload<A>) -> Result<()>,
    F: Send + Sync,
{
    fn try_send(&self, msg: Payload<A>) -> Result<()> {
        self(msg)
    }
}

trait ForceTxFn<A>: Send + Sync {
    fn send(&self, msg: Payload<A>) -> Result<()>;
}

impl<F, A> ForceTxFn<A> for F
where
    F: Fn(Payload<A>) -> Result<()>,
    F: Send + Sync,
{
    fn send(&self, msg: Payload<A>) -> Result<()> {
        self(msg)
    }
}

type PayloadStream<A> =
    PollFn<Box<dyn FnMut(&mut task::Context<'_>) -> task::Poll<Option<Payload<A>>> + Send>>;

pub(crate) struct Tx<A> {
    tx: Arc<dyn TxFn<A>>,
    try_tx: Arc<dyn TryTxFn<A>>,
    force_tx: Arc<dyn ForceTxFn<A>>,
}

impl<A> Clone for Tx<A> {
    fn clone(&self) -> Self {
        Self {
            tx: Arc::clone(&self.tx),
            try_tx: Arc::clone(&self.try_tx),
            force_tx: Arc::clone(&self.force_tx),
        }
    }
}

impl<A> Tx<A> {
    fn new(
        tx: Arc<dyn TxFn<A>>,
        try_tx: Arc<dyn TryTxFn<A>>,
        force_tx: Arc<dyn ForceTxFn<A>>,
    ) -> Self {
        Self {
            tx,
            try_tx,
            force_tx,
        }
    }

    pub fn downgrade(&self) -> WeakTx<A> {
        WeakTx {
            tx: Arc::downgrade(&self.tx),
            try_tx: Arc::downgrade(&self.try_tx),
            force_tx: Arc::downgrade(&self.force_tx),
        }
    }

    pub fn send(&self, msg: Payload<A>) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        self.tx.send(msg)
    }

    pub fn try_send(&self, msg: Payload<A>) -> Result<()> {
        self.try_tx.try_send(msg)
    }

    pub fn force_send(&self, msg: Payload<A>) -> Result<()> {
        self.force_tx.send(msg)
    }
}

pub(crate) struct WeakTx<A> {
    tx: Weak<dyn TxFn<A>>,
    try_tx: Weak<dyn TryTxFn<A>>,
    force_tx: Weak<dyn ForceTxFn<A>>,
}

impl<A> WeakTx<A> {
    pub fn upgrade(&self) -> Option<Tx<A>> {
        Some(Tx {
            tx: self.tx.upgrade()?,
            try_tx: self.try_tx.upgrade()?,
            force_tx: self.force_tx.upgrade()?,
        })
    }
}

impl<A> Clone for WeakTx<A> {
    fn clone(&self) -> Self {
        Self {
            tx: Weak::clone(&self.tx),
            try_tx: Weak::clone(&self.try_tx),
            force_tx: Weak::clone(&self.force_tx),
        }
    }
}

pub(crate) struct Rx<A> {
    stream: PayloadStream<A>,
}

impl<A> Rx<A> {
    pub(crate) fn new(stream: PayloadStream<A>) -> Self {
        Self { stream }
    }
}

impl<A> futures::Stream for Rx<A> {
    type Item = Payload<A>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let pinned = pin!(&mut self.stream);
        pinned.poll_next(cx)
    }
}

pub(crate) struct Channel<A> {
    tx: Tx<A>,
    rx: Rx<A>,
}

impl<A> Channel<A> {
    const fn new(tx: Tx<A>, rx: Rx<A>) -> Self {
        Channel { tx, rx }
    }
}

impl<A> Channel<A>
where
    for<'a> A: 'a,
{
    pub fn bounded(buffer: usize) -> Self {
        let (tx, mut rx) = futures::channel::mpsc::channel::<Payload<A>>(buffer);
        let tx2 = tx.clone();
        let tx3 = tx.clone();

        let send = Arc::new(
            move |event: Payload<A>| -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
                let tx = tx2.clone();
                Box::pin(async move {
                    let mut tx = tx.clone();
                    futures::SinkExt::send(&mut tx, event).await?;
                    Ok(())
                })
            },
        );

        let try_send = Arc::new(move |event: Payload<A>| -> Result<()> {
            let mut tx = tx3.clone();
            tx.try_send(event)
                .map_err(|_| crate::error::ActorError::ChannelClosed)?;
            Ok(())
        });

        let force_send = Arc::new(move |event: Payload<A>| -> Result<()> {
            let mut tx = tx.clone();
            tx.start_send(event)?;
            Ok(())
        });

        let recv: PayloadStream<A> = poll_fn(Box::new(move |ctx| {
            let pinned = pin!(&mut rx);
            pinned.poll_next(ctx)
        }));

        let tx = Tx::new(send, try_send, force_send);
        let rx = Rx::new(recv);

        Self::new(tx, rx)
    }

    pub fn unbounded() -> Self {
        let (tx, mut rx) = futures::channel::mpsc::unbounded::<Payload<A>>();
        let tx2 = tx.clone();
        let tx3 = tx.clone();

        let send = Arc::new(
            move |event: Payload<A>| -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
                let tx = tx2.clone();
                Box::pin(async move {
                    let mut tx = tx.clone();
                    futures::SinkExt::send(&mut tx, event).await?;
                    Ok(())
                })
            },
        );

        let try_send = Arc::new(move |event: Payload<A>| -> Result<()> {
            let try_tx = tx3.clone();
            try_tx
                .unbounded_send(event)
                .map_err(|_| ActorError::ChannelClosed)?;
            Ok(())
        });

        let force_send = Arc::new(move |event: Payload<A>| -> Result<()> {
            let mut tx_clone = tx.clone();
            tx_clone.start_send(event)?;
            Ok(())
        });

        let recv: PayloadStream<A> = poll_fn(Box::new(move |ctx| {
            let pinned = pin!(&mut rx);
            pinned.poll_next(ctx)
        }));

        let tx = Tx::new(send, try_send, force_send);
        let rx = Rx::new(recv);

        Self::new(tx, rx)
    }

    pub fn break_up(self) -> (Tx<A>, Rx<A>) {
        (self.tx, self.rx)
    }
}