tactix/
context.rs

1use std::{marker::PhantomData, sync::Arc};
2
3use tokio::sync::{broadcast, mpsc, Mutex};
4
5use crate::{
6    addr::Addr,
7    envelope::Envelope,
8    traits::{Actor, EnvelopeApi},
9};
10
11/// Methods concerned with the context of the given Actor
12pub struct Context<A: Actor>
13where
14    A: Actor<Context = Context<A>>,
15{
16    _p: PhantomData<A>,
17    stop_tx: broadcast::Sender<()>,
18}
19
20impl<A> Clone for Context<A>
21where
22    A: Actor<Context = Context<A>>,
23{
24    fn clone(&self) -> Self {
25        Context {
26            stop_tx: self.stop_tx.clone(),
27            _p: PhantomData,
28        }
29    }
30}
31
32impl<A> Context<A>
33where
34    A: Actor<Context = Context<A>>,
35{
36    /// Construct a new Context
37    pub fn new() -> Self {
38        let (stop_tx, _) = broadcast::channel(1);
39        Self {
40            _p: PhantomData,
41            stop_tx,
42        }
43    }
44
45    /// Stop the actor and any running actor processes
46    pub fn stop(&self) {
47        let _ = self.stop_tx.send(());
48    }
49
50    /// Setup a Mailbox for this Actor. Pull messages of the Mailbox and process them as the come.
51    /// In a separate thread run the started function.
52    pub fn run(self, act: A) -> Addr<A>
53    where
54        A: Actor<Context = Self>,
55    {
56        let (tx, mut rx) = mpsc::channel::<Envelope<A>>(100);
57        let addr = Addr::new(tx);
58        let act_ref = Arc::new(Mutex::new(act));
59
60        // Listen for events sent to the Actor and handle them.
61        // If a stop signal is received then stop listening.
62        tokio::spawn({
63            let a = act_ref.clone();
64            let mut stop_rx = self.stop_tx.subscribe();
65            let c = self.clone();
66            async move {
67                loop {
68                    tokio::select! {
69                        Some(mut msg) = rx.recv() => {
70                            msg.handle(a.clone(), c.clone()).await;
71                        },
72                        Ok(_) = stop_rx.recv() => {
73                            break;
74                        }
75                    }
76                }
77                a.lock().await.stopped().await;
78            }
79        });
80
81        // Run the started() method on the actor. If the stop signal is received then stop.
82        tokio::spawn({
83            let a = act_ref.clone();
84            let mut stop_rx = self.stop_tx.subscribe();
85
86            async move {
87                let mutex = a.lock().await;
88                let fut = mutex.started();
89                tokio::select! {
90                    _ = fut => {},
91                    Ok(_) = stop_rx.recv() => {}
92                }
93            }
94        });
95        addr
96    }
97}