Skip to main content

xan_actor/
actor.rs

1use crate::channel;
2use crate::{
3    ActorError, ActorSystem, ActorSystemCmd, Blocking, CHANNEL_SIZE, ErrorHandling, LifeCycle,
4    Message, TypedMailbox,
5};
6use std::sync::Arc;
7
8/// User-implemented unit of work in the actor system.
9///
10/// You provide:
11/// - associated types `Message`, `Result`, `Error`,
12/// - `address()` so the system can place the actor in its address map,
13/// - `handle()` to process each inbound message.
14///
15/// The system provides the rest: a typed mailbox, a tokio task host
16/// (per [`Blocking`]), the lifecycle state machine ([`LifeCycle`]), and
17/// the optional hooks ([`pre_start`], [`pre_restart`], [`post_stop`],
18/// [`post_restart`]) that fire at the relevant transitions.
19///
20/// Register an actor with `actor.register(&mut system, ...)` (see
21/// [`register`]); never implement [`run_actor`] manually — its body is
22/// the receive/lifecycle loop the system depends on.
23///
24/// `'static + Send + Sync` because the actor's state moves into a tokio
25/// task that may outlive the caller.
26///
27/// [`Blocking`]: crate::Blocking
28/// [`LifeCycle`]: crate::LifeCycle
29/// [`pre_start`]: Self::pre_start
30/// [`pre_restart`]: Self::pre_restart
31/// [`post_stop`]: Self::post_stop
32/// [`post_restart`]: Self::post_restart
33/// [`register`]: Self::register
34/// [`run_actor`]: Self::run_actor
35#[async_trait::async_trait]
36pub trait Actor
37where
38    Self: Sized + Send + Sync + 'static,
39{
40    /// Inbound message type. `Debug` for log output. With `multi-node` on,
41    /// must also implement `xancode::Codec` for cross-node serialization
42    /// (enforced via [`MaybeCodec`] on the send methods).
43    ///
44    /// [`MaybeCodec`]: crate::MaybeCodec
45    type Message: std::fmt::Debug + Sized + Send + Sync + 'static;
46
47    /// Reply type returned from `handle`. Visible to callers of
48    /// `send_and_recv::<Self>`.
49    type Result: std::fmt::Debug + Sized + Send + 'static;
50
51    /// Error type returned from `handle` on failure. Combined with the
52    /// per-actor [`ErrorHandling`] policy to decide whether to resume,
53    /// restart, or stop the actor.
54    ///
55    /// [`ErrorHandling`]: crate::ErrorHandling
56    type Error: std::fmt::Debug + std::fmt::Display + Send;
57
58    /// Returns this actor's address. Must be stable for the actor's
59    /// lifetime — the system uses it as the key in its address map.
60    #[cfg(not(feature = "multi-node"))]
61    fn address(&self) -> &str;
62    /// Returns this actor's fully qualified address (multi-node).
63    /// `node` must equal the registering `ActorSystem`'s `node_name`,
64    /// otherwise `register` fails with [`AddressNotOwned`].
65    ///
66    /// [`AddressNotOwned`]: crate::ActorError::AddressNotOwned
67    #[cfg(feature = "multi-node")]
68    fn address(&self) -> &crate::inter_node::Address;
69
70    /// Internal helper: the local name part of this actor's address.
71    /// Single-node: equals `address()`. Multi-node: `address().name`.
72    #[doc(hidden)]
73    fn local_name(&self) -> &str {
74        #[cfg(not(feature = "multi-node"))]
75        {
76            self.address()
77        }
78        #[cfg(feature = "multi-node")]
79        {
80            &self.address().name
81        }
82    }
83
84    /// Process one inbound message. Called once per message in the actor's
85    /// receive loop. The `Arc<Self::Message>` is shared with potential
86    /// broadcast recipients — clone the inner payload if you need to
87    /// mutate it.
88    ///
89    /// Return `Err(...)` to trigger this actor's [`ErrorHandling`]
90    /// policy: `Resume` drops the message and continues, `Restart` tears
91    /// down the mailbox and re-enters via `pre_restart` / `post_restart`,
92    /// `Stop` runs `post_stop` and terminates.
93    ///
94    /// [`ErrorHandling`]: crate::ErrorHandling
95    async fn handle(&mut self, msg: Arc<Self::Message>) -> Result<Self::Result, Self::Error>;
96
97    /// Runs once after the actor enters [`LifeCycle::Starting`] but
98    /// before transitioning to `Receiving`. Use for one-time setup that
99    /// needs `&mut self` (e.g. opening a connection, registering with an
100    /// external service).
101    ///
102    /// [`LifeCycle::Starting`]: crate::LifeCycle::Starting
103    async fn pre_start(&mut self) {}
104
105    /// Runs after `post_stop` and before the actor re-enters
106    /// `Starting`/`Restarting` for a restart. Use to release / re-prepare
107    /// state that needs to be fresh in the next incarnation.
108    async fn pre_restart(&mut self) {}
109
110    /// Runs as the actor enters [`LifeCycle::Stopping`] — either due to
111    /// `Stop` error handling, a `kill` from `unregister`, or an explicit
112    /// `restart`. Last chance to clean up before the task exits.
113    ///
114    /// [`LifeCycle::Stopping`]: crate::LifeCycle::Stopping
115    async fn post_stop(&mut self) {}
116
117    /// Runs at the top of every restart iteration (after the first run),
118    /// before `pre_start`'s normal startup work. Symmetric to
119    /// `pre_restart`; use for "what to do once the restart actually
120    /// begins" rather than "what to do before tearing down".
121    async fn post_restart(&mut self) {}
122
123    /// Internal receive/lifecycle loop.
124    ///
125    /// Owns the mailbox, drains messages, applies `ErrorHandling`,
126    /// fires the lifecycle hooks, and reports state transitions back to
127    /// `actor_system_loop` via `ActorSystemCmd`. Do **not** implement
128    /// this method — use [`register`] to wire the actor up.
129    ///
130    /// `channel_size` is the mailbox capacity under `bounded-channel`
131    /// and is ignored under `unbounded-channel`.
132    ///
133    /// [`register`]: Self::register
134    async fn run_actor(
135        &mut self,
136        actor_system_tx: channel::Sender<ActorSystemCmd>,
137        error_handling: ErrorHandling,
138        ready_tx: channel::Sender<Result<(), ActorError>>,
139        channel_size: Option<usize>,
140    ) -> Result<(), ActorError> {
141        let mut is_restarted = false;
142        let size = channel_size.unwrap_or(CHANNEL_SIZE);
143        loop {
144            if is_restarted {
145                self.post_restart().await;
146            }
147            let (tx, mut rx) = channel::channel::<Message<Self>>(size);
148            let (kill_tx, mut kill_rx) = channel::channel::<()>(size);
149            let (restart_tx, mut restart_rx) = channel::channel::<()>(size);
150            let mailbox = Arc::new(TypedMailbox::<Self>::new(tx.clone()));
151
152            let mut count = 0;
153            let result_rx = loop {
154                let (result_tx, result_rx) = tokio::sync::oneshot::channel();
155                if let Err(e) = channel::send(
156                    &actor_system_tx,
157                    ActorSystemCmd::Register {
158                        actor_type: std::any::type_name::<Self>().to_string(),
159                        #[cfg(not(feature = "multi-node"))]
160                        address: self.address().to_string(),
161                        #[cfg(feature = "multi-node")]
162                        address: self.address().clone(),
163                        mailbox: mailbox.clone(),
164                        restart_tx: restart_tx.clone(),
165                        kill_tx: kill_tx.clone(),
166                        life_cycle: if is_restarted {
167                            LifeCycle::Restarting
168                        } else {
169                            LifeCycle::Starting
170                        },
171                        result_tx,
172                        is_restarted,
173                    },
174                )
175                .await
176                {
177                    count += 1;
178                    error!(
179                        "Failed to register actor {}...({}): {:?}",
180                        self.local_name(),
181                        count,
182                        e
183                    );
184                    if count > 10 {
185                        let _ = channel::send(&ready_tx, Err(ActorError::UnhealthyActorSystem))
186                            .await;
187                        return Err(ActorError::UnhealthyActorSystem);
188                    }
189                }
190                break result_rx;
191            };
192            match result_rx.await {
193                Ok(Err(e)) => {
194                    let _ = channel::send(&ready_tx, Err(e)).await;
195                    // Now, this case is only when the address already exists.
196                    return Err(ActorError::AddressAlreadyExist(self.local_name().to_string()));
197                }
198                Err(e) => {
199                    let _ = channel::send(&ready_tx, Err(ActorError::from(e))).await;
200                    return Err(ActorError::UnhealthyActorSystem);
201                }
202                _ => {}
203            }
204            self.pre_start().await;
205            is_restarted = true;
206            let _ = channel::send(
207                &actor_system_tx,
208                ActorSystemCmd::SetLifeCycle {
209                    address: self.local_name().to_string(),
210                    life_cycle: LifeCycle::Receiving,
211                },
212            )
213            .await;
214            let _ = channel::send(&ready_tx, Ok(())).await;
215            if let Some(_) = loop {
216                tokio::select! {
217                    Some(mut msg) = rx.recv() => {
218                        let result_tx = msg.result_tx();
219                        let msg_de = msg.inner();
220                        match self.handle(msg_de).await {
221                           Ok(result) => {
222                                if let Some(result_tx) = result_tx {
223                                    let _ = result_tx.send(result);
224                                }
225                            }
226                           Err(e) => {
227                                match error_handling {
228                                    ErrorHandling::Resume => {
229                                        debug!("Handler's result has error: {:?} ...Resume this actor", e);
230                                        continue;
231                                    }
232                                    ErrorHandling::Restart => {
233                                        debug!("Handler's result has error: {:?} ...Restart this actor", e);
234                                        break None;
235                                    }
236                                    ErrorHandling::Stop => {
237                                        error!("Handler's result has error: {:?} ...Stop this actor", e);
238                                        break Some(());
239                                    }
240                                }
241                           }
242                       }
243                    }
244                    Some(_) = kill_rx.recv() => {
245                        info!("Kill actor: address={}", self.local_name());
246                        break Some(());
247                    }
248                    Some(_) = restart_rx.recv() => {
249                        info!("Restart actor: address={}", self.local_name());
250                        break None;
251                    }
252                };
253            } {
254                let _ = channel::send(
255                    &actor_system_tx,
256                    ActorSystemCmd::SetLifeCycle {
257                        address: self.local_name().to_string(),
258                        life_cycle: LifeCycle::Stopping,
259                    },
260                )
261                .await;
262                self.post_stop().await;
263                let _ = channel::send(
264                    &actor_system_tx,
265                    ActorSystemCmd::SetLifeCycle {
266                        address: self.local_name().to_string(),
267                        life_cycle: LifeCycle::Terminated,
268                    },
269                )
270                .await;
271                break Ok(());
272            }
273            let _ = channel::send(
274                &actor_system_tx,
275                ActorSystemCmd::SetLifeCycle {
276                    address: self.local_name().to_string(),
277                    life_cycle: LifeCycle::Stopping,
278                },
279            )
280            .await;
281            self.pre_restart().await;
282            let _ = channel::send(
283                &actor_system_tx,
284                ActorSystemCmd::SetLifeCycle {
285                    address: self.local_name().to_string(),
286                    life_cycle: LifeCycle::Restarting,
287                },
288            )
289            .await;
290        }
291    }
292
293    /// Register the actor with `actor_system` and spawn its receive loop.
294    ///
295    /// Consumes `self` (the actor's state moves into the spawned task).
296    /// Awaits until the system confirms registration (or rejects it with
297    /// `AddressAlreadyExist` / `AddressNotOwned`), so on `Ok(())` the
298    /// actor is wired up — though it may still be in [`LifeCycle::Starting`]
299    /// when this returns.
300    ///
301    /// - `error_handling`: per-actor [`ErrorHandling`] policy applied
302    ///   on every `handle` error.
303    /// - `blocking`: chooses between `tokio::spawn` (`NonBlocking`) and
304    ///   `spawn_blocking + block_on` (`Blocking`). Pick `Blocking` if the
305    ///   handler does long synchronous work that would otherwise starve
306    ///   the async runtime.
307    /// - `channel_size`: override the mailbox capacity for this actor.
308    ///   `None` uses the default (4096). Ignored under
309    ///   `unbounded-channel`.
310    ///
311    /// [`ErrorHandling`]: crate::ErrorHandling
312    /// [`LifeCycle::Starting`]: crate::LifeCycle::Starting
313    async fn register(
314        mut self,
315        actor_system: &mut ActorSystem,
316        error_handling: ErrorHandling,
317        blocking: Blocking,
318        channel_size: Option<usize>,
319    ) -> Result<(), ActorError> {
320        let size = channel_size.unwrap_or(CHANNEL_SIZE);
321        let (tx, mut rx) = channel::channel(size);
322        let actor_system_tx = actor_system.handler_tx();
323        let _ = if blocking == Blocking::Blocking {
324            tokio::task::spawn_blocking(move || {
325                let result = tokio::runtime::Handle::current().block_on(self.run_actor(
326                    actor_system_tx,
327                    error_handling,
328                    tx,
329                    channel_size,
330                ));
331                if let Err(e) = result {
332                    error!("Actor {} run failed: {:?}", self.local_name(), e);
333                }
334            })
335        } else {
336            tokio::spawn(async move {
337                let result = self
338                    .run_actor(actor_system_tx, error_handling, tx, channel_size)
339                    .await;
340                if let Err(e) = result {
341                    error!("Actor {} run failed: {:?}", self.local_name(), e);
342                }
343            })
344        };
345        if let Some(result) = rx.recv().await {
346            result
347        } else {
348            Err(ActorError::ChannelRecv)
349        }
350    }
351}