1use std::{marker::PhantomData, sync::Arc};
2
3use tokio::sync::{broadcast, mpsc, Mutex};
4
5use crate::{
6 addr::Addr,
7 envelope::Envelope,
8 traits::{Actor, EnvelopeApi},
9};
10
11pub struct Context<A: Actor>
13where
14 A: Actor<Context = Context<A>>,
15{
16 _p: PhantomData<A>,
17 stop_tx: broadcast::Sender<()>,
18}
19
20impl<A> Clone for Context<A>
21where
22 A: Actor<Context = Context<A>>,
23{
24 fn clone(&self) -> Self {
25 Context {
26 stop_tx: self.stop_tx.clone(),
27 _p: PhantomData,
28 }
29 }
30}
31
32impl<A> Context<A>
33where
34 A: Actor<Context = Context<A>>,
35{
36 pub fn new() -> Self {
38 let (stop_tx, _) = broadcast::channel(1);
39 Self {
40 _p: PhantomData,
41 stop_tx,
42 }
43 }
44
45 pub fn stop(&self) {
47 let _ = self.stop_tx.send(());
48 }
49
50 pub fn run(self, act: A) -> Addr<A>
53 where
54 A: Actor<Context = Self>,
55 {
56 let (tx, mut rx) = mpsc::channel::<Envelope<A>>(100);
57 let addr = Addr::new(tx);
58 let act_ref = Arc::new(Mutex::new(act));
59
60 tokio::spawn({
63 let a = act_ref.clone();
64 let mut stop_rx = self.stop_tx.subscribe();
65 let c = self.clone();
66 async move {
67 loop {
68 tokio::select! {
69 Some(mut msg) = rx.recv() => {
70 msg.handle(a.clone(), c.clone()).await;
71 },
72 Ok(_) = stop_rx.recv() => {
73 break;
74 }
75 }
76 }
77 a.lock().await.stopped().await;
78 }
79 });
80
81 tokio::spawn({
83 let a = act_ref.clone();
84 let mut stop_rx = self.stop_tx.subscribe();
85
86 async move {
87 let mutex = a.lock().await;
88 let fut = mutex.started();
89 tokio::select! {
90 _ = fut => {},
91 Ok(_) = stop_rx.recv() => {}
92 }
93 }
94 });
95 addr
96 }
97}