// tiny-actor 0.0.1
//
// A minimal actor framework for Rust.
// Documentation
use crate::*;
use concurrent_queue::PushError;
use event_listener::EventListener;
use futures::{Future, FutureExt};
use std::{
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

//------------------------------------------------------------------------------------------------
//  Address
//------------------------------------------------------------------------------------------------

/// A handle to a channel through which messages of type `T` can be sent.
///
/// Besides sending, an `Address` can close the channel, halt inboxes and query
/// counters of the channel. It also implements `Future`, resolving once the
/// channel reports zero inboxes.
pub struct Address<T> {
    /// The channel shared by every address (and inbox) linked to it.
    channel: Arc<Channel<T>>,
    /// Listener used by the `Future` implementation while waiting for the
    /// inboxes to exit; created lazily on poll, `None` otherwise.
    exit_listener: Option<EventListener>,
}

impl<T> Address<T> {
    pub(crate) fn from_channel(channel: Arc<Channel<T>>) -> Self {
        Self {
            channel,
            exit_listener: None,
        }
    }

    /// Try to send a message into the channel.
    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        Ok(self.channel.push_msg(msg)?)
    }

    /// Send a message into the channel.
    pub fn send(&self, msg: T) -> Snd<'_, T> {
        Snd::new(&self.channel, msg)
    }

    /// Send a message into the channel while blocking the scheduler.
    pub fn send_blocking(&self, mut msg: T) -> Result<(), Closed<T>> {
        loop {
            msg = match self.channel.push_msg(msg) {
                Ok(()) => {
                    return Ok(());
                }
                Err(PushError::Closed(msg)) => {
                    return Err(Closed(msg));
                }
                Err(PushError::Full(msg)) => msg,
            };

            self.channel.send_listener().wait();
        }
    }

    /// Close the channel.
    pub fn close(&self) -> bool {
        self.channel.close()
    }

    /// Halt all inboxes linked to the channel.
    pub fn halt_all(&self) {
        self.channel.halt_n(u32::MAX)
    }

    /// Halt n inboxes linked to the channel.
    pub fn halt_some(&self, n: u32) {
        self.channel.halt_n(n)
    }

    /// Get the amount of inboxes linked to the channel.
    pub fn inbox_count(&self) -> usize {
        self.channel.inbox_count()
    }

    /// Get the amount of messages linked to the channel.
    pub fn message_count(&self) -> usize {
        self.channel.message_count()
    }

    /// Get the amount of addresses linked to the channel.
    pub fn address_count(&self) -> usize {
        self.channel.address_count()
    }

    /// Whether the channel has been closed.
    pub fn is_closed(&self) -> bool {
        self.channel.is_closed()
    }

    /// Whether all inboxes linked to this channel have exited.
    pub fn has_exited(&self) -> bool {
        self.inbox_count() == 0
    }
}

// `Address` is unconditionally `Unpin`: its `Future::poll` implementation
// mutates fields through `Pin<&mut Self>`, which requires `Self: Unpin`.
impl<T> Unpin for Address<T> {}

/// Awaiting an `Address` resolves once the channel reports zero inboxes.
impl<T> Future for Address<T> {
    type Output = ();

    /// Returns `Poll::Ready(())` as soon as the inbox count is zero; otherwise
    /// registers (or re-polls) an exit listener and returns `Poll::Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Address` is `Unpin`, so plain mutable access is fine.
        let this = self.get_mut();

        loop {
            if this.channel.inbox_count() == 0 {
                // Nothing left to wait for; release the listener if one exists.
                this.exit_listener = None;
                return Poll::Ready(());
            }

            match this.exit_listener.as_mut() {
                // Register first and then loop around to re-check the count:
                // an exit that happens between the check above and the
                // registration would otherwise never wake this future.
                None => this.exit_listener = Some(this.channel.exit_listener()),
                // The count was non-zero *after* registration, so waiting on
                // the listener cannot miss the exit notification.
                Some(listener) => match listener.poll_unpin(cx) {
                    // Notified: drop the spent listener and re-check.
                    Poll::Ready(()) => this.exit_listener = None,
                    Poll::Pending => return Poll::Pending,
                },
            }
        }
    }
}

impl<A> Clone for Address<A> {
    fn clone(&self) -> Self {
        self.channel.add_address();
        Self {
            channel: self.channel.clone(),
            exit_listener: None,
        }
    }
}

impl<T> Drop for Address<T> {
    /// Tell the channel that this address no longer exists.
    fn drop(&mut self) {
        let channel = &self.channel;
        channel.remove_address();
    }
}

//------------------------------------------------------------------------------------------------
//  Snd
//------------------------------------------------------------------------------------------------

/// Future that sends a single message into a channel, created by
/// `Address::send`.
///
/// Resolves to `Ok(())` once the message has been pushed, or to
/// `Err(Closed(msg))` if the channel is closed.
pub struct Snd<'a, T> {
    /// The channel the message is pushed into.
    channel: &'a Channel<T>,
    /// The message still to be sent; taken out during `poll` and put back
    /// when the poll returns `Pending`.
    msg: Option<T>,
    /// Listener awaited while the channel is full; created lazily on poll.
    listener: Option<EventListener>,
}

impl<'a, T> Snd<'a, T> {
    pub(crate) fn new(channel: &'a Channel<T>, msg: T) -> Self {
        Snd {
            channel,
            msg: Some(msg),
            listener: None,
        }
    }
}

// `Snd` is unconditionally `Unpin`: its `Future::poll` implementation mutates
// fields through `Pin<&mut Self>`, which requires `Self: Unpin`.
impl<'a, T> Unpin for Snd<'a, T> {}

impl<'a, T> Future for Snd<'a, T> {
    type Output = Result<(), Closed<T>>;

    /// Try to push the message, waiting on a send listener while the channel
    /// is full.
    ///
    /// Resolves to `Ok(())` once the message has been pushed and to
    /// `Err(Closed(msg))` if the channel is closed.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Snd` is `Unpin`, so plain mutable access is fine.
        let this = self.get_mut();

        // The message is only absent after the future has completed.
        let mut msg = this
            .msg
            .take()
            .expect("`Snd` polled after completion");

        loop {
            msg = match this.channel.push_msg(msg) {
                Ok(()) => {
                    // Done; do not keep a listener registered any longer.
                    this.listener = None;
                    return Poll::Ready(Ok(()));
                }
                Err(PushError::Closed(msg)) => {
                    this.listener = None;
                    return Poll::Ready(Err(Closed(msg)));
                }
                Err(PushError::Full(msg)) => msg,
            };

            match this.listener.as_mut() {
                // Register first and then loop around to retry the push: a
                // slot freed between the failed push and the registration
                // would otherwise never wake this future.
                None => this.listener = Some(this.channel.send_listener()),
                // The push failed *after* registration, so waiting on the
                // listener cannot miss the next notification.
                Some(listener) => match listener.poll_unpin(cx) {
                    // Notified: drop the spent listener and retry the push.
                    Poll::Ready(()) => this.listener = None,
                    Poll::Pending => {
                        // Keep the message for the next poll.
                        this.msg = Some(msg);
                        return Poll::Pending;
                    }
                },
            }
        }
    }
}

//------------------------------------------------------------------------------------------------
//  Errors
//------------------------------------------------------------------------------------------------

/// An error returned when attempting to send a message into a channel
/// without waiting.
///
/// Both variants hand the rejected message back to the caller.
#[derive(Debug, Clone)]
pub enum TrySendError<T> {
    /// The channel is closed; contains the message that was not sent.
    Closed(T),
    /// The channel is full; contains the message that was not sent.
    Full(T),
}

impl<T> From<PushError<T>> for TrySendError<T> {
    fn from(e: PushError<T>) -> Self {
        match e {
            PushError::Full(msg) => Self::Full(msg),
            PushError::Closed(msg) => Self::Closed(msg),
        }
    }
}

/// The channel has been closed.
///
/// Carries the message that could not be sent. The field is public so that
/// callers can recover the message from the error.
#[derive(Debug, Clone)]
pub struct Closed<T>(pub T);