atomic_actor/
context.rs

1use std::sync::{Arc, Weak};
2
3use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
4
5use crate::{task::Task, Actor, Addr};
6
7/// The running context.
8///
9/// This is currently very simple, can only be used to get a new [`Addr`].
10pub struct Context<A: Actor> {
11    /// Only none during running.
12    actor: Option<A>,
13    sender: Weak<UnboundedSender<Box<dyn Task<A>>>>,
14    receiver: UnboundedReceiver<Box<dyn Task<A>>>,
15}
16
17impl<A: Actor> Context<A> {
18    pub(crate) fn start(actor: A) -> Addr<A> {
19        let (sender, receiver) = unbounded_channel();
20        let sender = Arc::new(sender);
21        let context = Context {
22            actor: Some(actor),
23            sender: Arc::downgrade(&sender),
24            receiver,
25        };
26        tokio::spawn(context.run());
27        Addr::new(sender)
28    }
29
30    async fn run(mut self) {
31        let mut actor = self.actor.take().unwrap();
32        while let Some(task) = self.receiver.recv().await {
33            actor = task.run(actor, &mut self).await;
34        }
35    }
36
37    /// Gets an [`Addr`] of the actor if it is still alive.
38    pub fn addr(&self) -> Option<Addr<A>> {
39        Weak::upgrade(&self.sender).map(Addr::new)
40    }
41}