xactor 0.7.11

Xactor is a rust actors framework based on async-std
Documentation
use crate::addr::ActorEvent;
use crate::broker::{Subscribe, Unsubscribe};
use crate::runtime::{sleep, spawn};
use crate::{ActorId, Addr, Broker, Error, Handler, Message, Result, Service, StreamHandler};
use futures::channel::{mpsc, oneshot};
use futures::future::{AbortHandle, Abortable, Shared};
use futures::{Stream, StreamExt};
use once_cell::sync::OnceCell;
use slab::Slab;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use std::time::Duration;

///An actor execution context.
pub struct Context<A> {
    actor_id: ActorId,
    tx: Weak<mpsc::UnboundedSender<ActorEvent<A>>>,
    pub(crate) rx_exit: Option<Shared<oneshot::Receiver<()>>>,
    pub(crate) streams: Slab<AbortHandle>,
    pub(crate) intervals: Slab<AbortHandle>,
}

impl<A> Context<A> {
    pub(crate) fn new(
        rx_exit: Option<Shared<oneshot::Receiver<()>>>,
    ) -> (
        Self,
        mpsc::UnboundedReceiver<ActorEvent<A>>,
        Arc<mpsc::UnboundedSender<ActorEvent<A>>>,
    ) {
        static ACTOR_ID: OnceCell<AtomicU64> = OnceCell::new();

        // Get an actor id
        let actor_id = ACTOR_ID
            .get_or_init(Default::default)
            .fetch_add(1, Ordering::Relaxed);

        let (tx, rx) = mpsc::unbounded::<ActorEvent<A>>();
        let tx = Arc::new(tx);
        let weak_tx = Arc::downgrade(&tx);
        (
            Self {
                actor_id,
                tx: weak_tx,
                rx_exit,
                streams: Default::default(),
                intervals: Default::default(),
            },
            rx,
            tx,
        )
    }

    /// Returns the address of the actor.
    pub fn address(&self) -> Addr<A> {
        Addr {
            actor_id: self.actor_id,
            // This getting unwrap panics
            tx: self.tx.upgrade().unwrap(),
            rx_exit: self.rx_exit.clone(),
        }
    }

    /// Returns the id of the actor.
    pub fn actor_id(&self) -> ActorId {
        self.actor_id
    }

    /// Stop the actor.
    pub fn stop(&self, err: Option<Error>) {
        if let Some(tx) = self.tx.upgrade() {
            mpsc::UnboundedSender::clone(&*tx)
                .start_send(ActorEvent::Stop(err))
                .ok();
        }
    }

    pub fn abort_intervals(&mut self) {
        for handle in self.intervals.drain() {
            handle.abort()
        }
    }

    pub fn abort_streams(&mut self) {
        for handle in self.streams.drain() {
            handle.abort();
        }
    }

    /// Create a stream handler for the actor.
    ///
    /// # Examples
    /// ```rust
    /// use xactor::*;
    /// use futures::stream;
    /// use std::time::Duration;
    ///
    /// #[message(result = "i32")]
    /// struct GetSum;
    ///
    /// #[derive(Default)]
    /// struct MyActor(i32);
    ///
    /// #[async_trait::async_trait]
    /// impl StreamHandler<i32> for MyActor {
    ///     async fn handle(&mut self, _ctx: &mut Context<Self>, msg: i32) {
    ///         self.0 += msg;
    ///     }
    ///
    ///     async fn started(&mut self, _ctx: &mut Context<Self>) {
    ///         println!("stream started");
    ///     }
    ///
    ///     async fn finished(&mut self, _ctx: &mut Context<Self>) {
    ///         println!("stream finished");
    ///     }
    /// }
    ///
    /// #[async_trait::async_trait]
    /// impl Handler<GetSum> for MyActor {
    ///     async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: GetSum) -> i32 {
    ///         self.0
    ///     }
    /// }
    ///
    /// #[async_trait::async_trait]
    /// impl Actor for MyActor {
    ///     async fn started(&mut self, ctx: &mut Context<Self>) -> Result<()> {
    ///         let values = (0..100).collect::<Vec<_>>();
    ///         ctx.add_stream(stream::iter(values));
    ///         Ok(())
    ///     }
    /// }
    ///
    /// #[xactor::main]
    /// async fn main() -> Result<()> {
    ///     let mut addr = MyActor::start_default().await?;
    ///     sleep(Duration::from_secs(1)).await; // Wait for the stream to complete
    ///     let res = addr.call(GetSum).await?;
    ///     assert_eq!(res, (0..100).sum::<i32>());
    ///     Ok(())
    /// }
    /// ```
    /// ```
    pub fn add_stream<S>(&mut self, mut stream: S)
    where
        S: Stream + Unpin + Send + 'static,
        S::Item: 'static + Send,
        A: StreamHandler<S::Item>,
    {
        let tx = self.tx.clone();
        let entry = self.streams.vacant_entry();
        let id = entry.key();
        let (handle, registration) = futures::future::AbortHandle::new_pair();
        entry.insert(handle);

        let fut = {
            async move {
                if let Some(tx) = tx.upgrade() {
                    mpsc::UnboundedSender::clone(&*tx)
                        .start_send(ActorEvent::Exec(Box::new(move |actor, ctx| {
                            Box::pin(async move {
                                StreamHandler::started(actor, ctx).await;
                            })
                        })))
                        .ok();
                } else {
                    return;
                }

                while let Some(msg) = stream.next().await {
                    if let Some(tx) = tx.upgrade() {
                        let res = mpsc::UnboundedSender::clone(&*tx).start_send(ActorEvent::Exec(
                            Box::new(move |actor, ctx| {
                                Box::pin(async move {
                                    StreamHandler::handle(actor, ctx, msg).await;
                                })
                            }),
                        ));
                        if res.is_err() {
                            return;
                        }
                    } else {
                        return;
                    }
                }

                if let Some(tx) = tx.upgrade() {
                    mpsc::UnboundedSender::clone(&*tx)
                        .start_send(ActorEvent::Exec(Box::new(move |actor, ctx| {
                            Box::pin(async move {
                                StreamHandler::finished(actor, ctx).await;
                            })
                        })))
                        .ok();
                }

                if let Some(tx) = tx.upgrade() {
                    mpsc::UnboundedSender::clone(&*tx)
                        .start_send(ActorEvent::RemoveStream(id))
                        .ok();
                }
            }
        };
        spawn(Abortable::new(fut, registration));
    }

    /// Sends the message `msg` to self after a specified period of time.
    ///
    /// We use `Sender` instead of `Addr` so that the interval doesn't keep reference to address and prevent the actor from being dropped and stopped

    pub fn send_later<T>(&mut self, msg: T, after: Duration)
    where
        A: Handler<T>,
        T: Message<Result = ()>,
    {
        let sender = self.address().sender();
        let entry = self.intervals.vacant_entry();
        let (handle, registration) = futures::future::AbortHandle::new_pair();
        entry.insert(handle);

        spawn(Abortable::new(
            async move {
                sleep(after).await;
                sender.send(msg).ok();
            },
            registration,
        ));
    }

    /// Sends the message  to self, at a specified fixed interval.
    /// The message is created each time using a closure `f`.
    pub fn send_interval_with<T, F>(&mut self, f: F, dur: Duration)
    where
        A: Handler<T>,
        F: Fn() -> T + Sync + Send + 'static,
        T: Message<Result = ()>,
    {
        let sender = self.address().sender();

        let entry = self.intervals.vacant_entry();
        let (handle, registration) = futures::future::AbortHandle::new_pair();
        entry.insert(handle);

        spawn(Abortable::new(
            async move {
                loop {
                    sleep(dur).await;
                    if sender.send(f()).is_err() {
                        break;
                    }
                }
            },
            registration,
        ));
    }

    /// Sends the message `msg` to self, at a specified fixed interval.
    pub fn send_interval<T>(&mut self, msg: T, dur: Duration)
    where
        A: Handler<T>,
        T: Message<Result = ()> + Clone + Sync,
    {
        self.send_interval_with(move || msg.clone(), dur);
    }

    /// Subscribes to a message of a specified type.
    pub async fn subscribe<T: Message<Result = ()>>(&self) -> Result<()>
    where
        A: Handler<T>,
    {
        let broker = Broker::<T>::from_registry().await?;
        let sender = self.address().sender();
        broker
            .send(Subscribe {
                id: self.actor_id,
                sender,
            })
            .ok();
        Ok(())
    }

    /// Unsubscribe to a message of a specified type.
    pub async fn unsubscribe<T: Message<Result = ()>>(&self) -> Result<()> {
        let broker = Broker::<T>::from_registry().await?;
        broker.send(Unsubscribe { id: self.actor_id })
    }
}