agner_actors/
context.rs

1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3
4use futures::Future;
5
6use crate::actor_id::ActorID;
7use crate::actor_runner::call_msg::CallMsg;
8use crate::actor_runner::pipe::{PipeRx, PipeTx};
9use crate::exit::Exit;
10use crate::imports::Never;
11use crate::system::{System, SystemWeakRef};
12
13/// Actor's API to itself
14#[derive(Debug)]
15pub struct Context<M> {
16    actor_id: ActorID,
17    system: SystemWeakRef,
18    messages: PipeRx<M>,
19    signals: PipeRx<Signal>,
20    calls: PipeTx<CallMsg<M>>,
21    data: HashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
22}
23
24/// Either a Message or a [`Signal`](crate::context::Signal) received by an actor.
25///
26/// Note: only actors that ["trap exits"](crate::context::Context::trap_exit) can handle signals.
27#[derive(Debug)]
28pub enum Event<M> {
29    Message(M),
30    Signal(Signal),
31}
32
33/// A signal received by an actor.
34///
35/// Note: only actors that ["trap exits"](crate::context::Context::trap_exit) can handle signals.
36#[derive(Debug)]
37pub enum Signal {
38    Exit(ActorID, Exit),
39}
40
41impl<M> Context<M> {
42    /// Get current actor's [`ActorID`]
43    pub fn actor_id(&self) -> ActorID {
44        self.actor_id
45    }
46
47    /// Get the [`System`] this actor is running in.
48    pub fn system(&self) -> System {
49        self.system.rc_upgrade().expect("System gone")
50    }
51
52    /// Receive next event (message or signal)
53    pub async fn next_event(&mut self) -> Event<M>
54    where
55        M: Unpin,
56    {
57        tokio::select! {
58            biased;
59
60            signal = self.signals.recv() =>
61                Event::Signal(signal),
62            message = self.messages.recv() =>
63                Event::Message(message),
64        }
65    }
66
67    /// Receive next message.
68    pub async fn next_message(&mut self) -> M
69    where
70        M: Unpin,
71    {
72        self.messages.recv().await
73    }
74
75    /// Receive next signal.
76    pub async fn next_signal(&mut self) -> Signal {
77        self.signals.recv().await
78    }
79
80    /// Exit with the provided reason
81    pub async fn exit(&mut self, exit_reason: Exit) -> Never {
82        self.backend_call(CallMsg::Exit(exit_reason)).await;
83        std::future::pending().await
84    }
85
86    /// Link this actor to another actor.
87    pub async fn link(&mut self, to: ActorID) {
88        self.backend_call(CallMsg::Link(to)).await;
89    }
90
91    /// Unlink this actor from another actor.
92    pub async fn unlink(&mut self, from: ActorID) {
93        self.backend_call(CallMsg::Unlink(from)).await;
94    }
95
96    /// Set whether this actor upon receiving a [`Signal`](crate::context::Signal) will be able to
97    /// handle it (`trap_exit = true`) or crash (`trap_exit = false`).
98    pub async fn trap_exit(&mut self, trap_exit: bool) {
99        self.backend_call(CallMsg::TrapExit(trap_exit)).await;
100    }
101
102    pub async fn spawn_job<F>(&mut self, fut: F)
103    where
104        F: Future + Send + Sync + 'static,
105    {
106        self.backend_call(CallMsg::SpawnJob(Box::pin(async move {
107            let _ = fut.await;
108            None
109        })))
110        .await
111    }
112
113    /// Process the provided future "in background" and upon its completion send the output to the
114    /// message-inbox.
115    pub async fn future_to_inbox<F>(&mut self, fut: F)
116    where
117        F: Future + Send + Sync + 'static,
118        F::Output: Into<M>,
119    {
120        self.backend_call(CallMsg::SpawnJob(Box::pin(async move {
121            let message = fut.await.into();
122            Some(message)
123        })))
124        .await;
125    }
126}
127
128/// "data-bag" related methods
129impl<M> Context<M> {
130    pub fn put<D>(&mut self, data: D) -> Option<D>
131    where
132        D: Any + Send + Sync + 'static,
133    {
134        let type_id = data.type_id();
135        let boxed = Box::new(data);
136        let prev_opt = self.data.insert(type_id, boxed);
137
138        prev_opt
139            .map(|any| any.downcast().expect("The value does not match the type-id."))
140            .map(|b| *b)
141    }
142    pub fn take<D>(&mut self) -> Option<D>
143    where
144        D: Any + Send + Sync + 'static,
145    {
146        let type_id = TypeId::of::<D>();
147        let boxed_opt = self.data.remove(&type_id);
148
149        boxed_opt
150            .map(|any| any.downcast().expect("The value does not match the type-id."))
151            .map(|b| *b)
152    }
153    pub fn get<D>(&self) -> Option<&D>
154    where
155        D: Any + Send + Sync + 'static,
156    {
157        let type_id = TypeId::of::<D>();
158        let boxed_opt = self.data.get(&type_id);
159
160        boxed_opt.map(|any| any.downcast_ref().expect("The value does not match the type-id."))
161    }
162    pub fn get_mut<D>(&mut self) -> Option<&mut D>
163    where
164        D: Any + Send + Sync + 'static,
165    {
166        let type_id = TypeId::of::<D>();
167        let boxed_opt = self.data.get_mut(&type_id);
168
169        boxed_opt.map(|any| any.downcast_mut().expect("The value does not match the type-id."))
170    }
171    pub fn with_data(
172        mut self,
173        data: HashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
174    ) -> Self {
175        self.data = data;
176        self
177    }
178}
179
180impl<M> Context<M> {
181    /// Create a new instance of [`Context`]
182    pub(crate) fn new(
183        actor_id: ActorID,
184        system: SystemWeakRef,
185        inbox: PipeRx<M>,
186        signals: PipeRx<Signal>,
187        calls: PipeTx<CallMsg<M>>,
188    ) -> Self {
189        let calls = calls.blocking();
190        Self { actor_id, system, messages: inbox, signals, calls, data: Default::default() }
191    }
192}
193
194impl<M> Context<M> {
195    async fn backend_call(&mut self, call: CallMsg<M>) {
196        self.calls.send(call).await.expect("It's a blocking Tx. Should not reject.")
197    }
198}