xan_actor/actor.rs
1use crate::channel;
2use crate::{
3 ActorError, ActorSystem, ActorSystemCmd, Blocking, CHANNEL_SIZE, ErrorHandling, LifeCycle,
4 Message, TypedMailbox,
5};
6use std::sync::Arc;
7
8/// User-implemented unit of work in the actor system.
9///
10/// You provide:
11/// - associated types `Message`, `Result`, `Error`,
12/// - `address()` so the system can place the actor in its address map,
13/// - `handle()` to process each inbound message.
14///
15/// The system provides the rest: a typed mailbox, a tokio task host
16/// (per [`Blocking`]), the lifecycle state machine ([`LifeCycle`]), and
17/// the optional hooks ([`pre_start`], [`pre_restart`], [`post_stop`],
18/// [`post_restart`]) that fire at the relevant transitions.
19///
20/// Register an actor with `actor.register(&mut system, ...)` (see
21/// [`register`]); never implement [`run_actor`] manually — its body is
22/// the receive/lifecycle loop the system depends on.
23///
24/// `'static + Send + Sync` because the actor's state moves into a tokio
25/// task that may outlive the caller.
26///
27/// [`Blocking`]: crate::Blocking
28/// [`LifeCycle`]: crate::LifeCycle
29/// [`pre_start`]: Self::pre_start
30/// [`pre_restart`]: Self::pre_restart
31/// [`post_stop`]: Self::post_stop
32/// [`post_restart`]: Self::post_restart
33/// [`register`]: Self::register
34/// [`run_actor`]: Self::run_actor
35#[async_trait::async_trait]
36pub trait Actor
37where
38 Self: Sized + Send + Sync + 'static,
39{
    /// Inbound message type. `Debug` for log output. With `multi-node` on,
    /// must also implement `xancode::Codec` for cross-node serialization
    /// (enforced via [`MaybeCodec`] on the send methods).
    ///
    /// The receive loop hands each message to [`handle`] as an
    /// `Arc<Self::Message>`.
    ///
    /// [`MaybeCodec`]: crate::MaybeCodec
    /// [`handle`]: Self::handle
    type Message: std::fmt::Debug + Sized + Send + Sync + 'static;

    /// Reply type returned from `handle`. Visible to callers of
    /// `send_and_recv::<Self>`.
    ///
    /// When a message carries a reply channel, the `Ok` value returned by
    /// `handle` is sent through it; otherwise the value is discarded.
    type Result: std::fmt::Debug + Sized + Send + 'static;

    /// Error type returned from `handle` on failure. Combined with the
    /// per-actor [`ErrorHandling`] policy to decide whether to resume,
    /// restart, or stop the actor.
    ///
    /// The receive loop logs the error with its `Debug` representation
    /// before applying the policy.
    ///
    /// [`ErrorHandling`]: crate::ErrorHandling
    type Error: std::fmt::Debug + std::fmt::Display + Send;
57
    /// Returns this actor's address. Must be stable for the actor's
    /// lifetime — the system uses it as the key in its address map.
    ///
    /// Read on every (re)registration to fill in the `address` of the
    /// `Register` command, and by `local_name` for log lines and
    /// lifecycle reports.
    #[cfg(not(feature = "multi-node"))]
    fn address(&self) -> &str;
    /// Returns this actor's fully qualified address (multi-node).
    /// `node` must equal the registering `ActorSystem`'s `node_name`,
    /// otherwise `register` fails with [`AddressNotOwned`].
    ///
    /// Cloned into the `Register` command on every (re)registration; its
    /// `name` field is what `local_name` returns.
    ///
    /// [`AddressNotOwned`]: crate::ActorError::AddressNotOwned
    #[cfg(feature = "multi-node")]
    fn address(&self) -> &crate::inter_node::Address;
69
70 /// Internal helper: the local name part of this actor's address.
71 /// Single-node: equals `address()`. Multi-node: `address().name`.
72 #[doc(hidden)]
73 fn local_name(&self) -> &str {
74 #[cfg(not(feature = "multi-node"))]
75 {
76 self.address()
77 }
78 #[cfg(feature = "multi-node")]
79 {
80 &self.address().name
81 }
82 }
83
    /// Process one inbound message. Called once per message in the actor's
    /// receive loop. The `Arc<Self::Message>` is shared with potential
    /// broadcast recipients — clone the inner payload if you need to
    /// mutate it.
    ///
    /// On `Ok(value)`, the value is sent back through the message's reply
    /// channel when it has one; a failed reply send is ignored.
    ///
    /// Return `Err(...)` to trigger this actor's [`ErrorHandling`]
    /// policy: `Resume` drops the message and continues, `Restart` tears
    /// down the mailbox and re-enters via `pre_restart` / `post_restart`,
    /// `Stop` runs `post_stop` and terminates. In all three cases no reply
    /// value is sent for the failed message.
    ///
    /// [`ErrorHandling`]: crate::ErrorHandling
    async fn handle(&mut self, msg: Arc<Self::Message>) -> Result<Self::Result, Self::Error>;
96
    /// Runs at the beginning of every incarnation — the first start and
    /// each restart — after the system has accepted the registration
    /// ([`LifeCycle::Starting`] / `Restarting`) and before the actor is
    /// reported as `Receiving`. Use for setup that needs `&mut self`
    /// (e.g. opening a connection, registering with an external service).
    ///
    /// Because this hook runs again after a restart, setup performed here
    /// should tolerate being repeated (or be undone in `pre_restart`).
    /// The readiness signal that `register` waits on is sent only after
    /// this hook returns.
    ///
    /// [`LifeCycle::Starting`]: crate::LifeCycle::Starting
    async fn pre_start(&mut self) {}
104
    /// Runs when a restart has been triggered — by the `Restart`
    /// error-handling policy or by a restart signal — after the actor has
    /// been reported as `Stopping` and before it is reported as
    /// `Restarting`. `post_stop` is **not** called on the restart path.
    /// Use to release / re-prepare state that needs to be fresh in the
    /// next incarnation.
    async fn pre_restart(&mut self) {}
109
    /// Runs after the actor has been reported as [`LifeCycle::Stopping`]
    /// on the terminating path — either due to `Stop` error handling or a
    /// `kill` from `unregister`. It is not called when the actor restarts
    /// (that path runs `pre_restart` instead). Last chance to clean up
    /// before the task exits; once it returns the actor is reported as
    /// `Terminated`.
    ///
    /// [`LifeCycle::Stopping`]: crate::LifeCycle::Stopping
    async fn post_stop(&mut self) {}
116
    /// Runs at the top of every restart iteration (after the first run),
    /// before the actor re-registers with a fresh mailbox and before
    /// `pre_start`'s normal startup work. Symmetric to
    /// `pre_restart`; use for "what to do once the restart actually
    /// begins" rather than "what to do before tearing down".
    async fn post_restart(&mut self) {}
122
123 /// Internal receive/lifecycle loop.
124 ///
125 /// Owns the mailbox, drains messages, applies `ErrorHandling`,
126 /// fires the lifecycle hooks, and reports state transitions back to
127 /// `actor_system_loop` via `ActorSystemCmd`. Do **not** implement
128 /// this method — use [`register`] to wire the actor up.
129 ///
130 /// `channel_size` is the mailbox capacity under `bounded-channel`
131 /// and is ignored under `unbounded-channel`.
132 ///
133 /// [`register`]: Self::register
134 async fn run_actor(
135 &mut self,
136 actor_system_tx: channel::Sender<ActorSystemCmd>,
137 error_handling: ErrorHandling,
138 ready_tx: channel::Sender<Result<(), ActorError>>,
139 channel_size: Option<usize>,
140 ) -> Result<(), ActorError> {
141 let mut is_restarted = false;
142 let size = channel_size.unwrap_or(CHANNEL_SIZE);
143 loop {
144 if is_restarted {
145 self.post_restart().await;
146 }
147 let (tx, mut rx) = channel::channel::<Message<Self>>(size);
148 let (kill_tx, mut kill_rx) = channel::channel::<()>(size);
149 let (restart_tx, mut restart_rx) = channel::channel::<()>(size);
150 let mailbox = Arc::new(TypedMailbox::<Self>::new(tx.clone()));
151
152 let mut count = 0;
153 let result_rx = loop {
154 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
155 if let Err(e) = channel::send(
156 &actor_system_tx,
157 ActorSystemCmd::Register {
158 actor_type: std::any::type_name::<Self>().to_string(),
159 #[cfg(not(feature = "multi-node"))]
160 address: self.address().to_string(),
161 #[cfg(feature = "multi-node")]
162 address: self.address().clone(),
163 mailbox: mailbox.clone(),
164 restart_tx: restart_tx.clone(),
165 kill_tx: kill_tx.clone(),
166 life_cycle: if is_restarted {
167 LifeCycle::Restarting
168 } else {
169 LifeCycle::Starting
170 },
171 result_tx,
172 is_restarted,
173 },
174 )
175 .await
176 {
177 count += 1;
178 error!(
179 "Failed to register actor {}...({}): {:?}",
180 self.local_name(),
181 count,
182 e
183 );
184 if count > 10 {
185 let _ = channel::send(&ready_tx, Err(ActorError::UnhealthyActorSystem))
186 .await;
187 return Err(ActorError::UnhealthyActorSystem);
188 }
189 }
190 break result_rx;
191 };
192 match result_rx.await {
193 Ok(Err(e)) => {
194 let _ = channel::send(&ready_tx, Err(e)).await;
195 // Now, this case is only when the address already exists.
196 return Err(ActorError::AddressAlreadyExist(self.local_name().to_string()));
197 }
198 Err(e) => {
199 let _ = channel::send(&ready_tx, Err(ActorError::from(e))).await;
200 return Err(ActorError::UnhealthyActorSystem);
201 }
202 _ => {}
203 }
204 self.pre_start().await;
205 is_restarted = true;
206 let _ = channel::send(
207 &actor_system_tx,
208 ActorSystemCmd::SetLifeCycle {
209 address: self.local_name().to_string(),
210 life_cycle: LifeCycle::Receiving,
211 },
212 )
213 .await;
214 let _ = channel::send(&ready_tx, Ok(())).await;
215 if let Some(_) = loop {
216 tokio::select! {
217 Some(mut msg) = rx.recv() => {
218 let result_tx = msg.result_tx();
219 let msg_de = msg.inner();
220 match self.handle(msg_de).await {
221 Ok(result) => {
222 if let Some(result_tx) = result_tx {
223 let _ = result_tx.send(result);
224 }
225 }
226 Err(e) => {
227 match error_handling {
228 ErrorHandling::Resume => {
229 debug!("Handler's result has error: {:?} ...Resume this actor", e);
230 continue;
231 }
232 ErrorHandling::Restart => {
233 debug!("Handler's result has error: {:?} ...Restart this actor", e);
234 break None;
235 }
236 ErrorHandling::Stop => {
237 error!("Handler's result has error: {:?} ...Stop this actor", e);
238 break Some(());
239 }
240 }
241 }
242 }
243 }
244 Some(_) = kill_rx.recv() => {
245 info!("Kill actor: address={}", self.local_name());
246 break Some(());
247 }
248 Some(_) = restart_rx.recv() => {
249 info!("Restart actor: address={}", self.local_name());
250 break None;
251 }
252 };
253 } {
254 let _ = channel::send(
255 &actor_system_tx,
256 ActorSystemCmd::SetLifeCycle {
257 address: self.local_name().to_string(),
258 life_cycle: LifeCycle::Stopping,
259 },
260 )
261 .await;
262 self.post_stop().await;
263 let _ = channel::send(
264 &actor_system_tx,
265 ActorSystemCmd::SetLifeCycle {
266 address: self.local_name().to_string(),
267 life_cycle: LifeCycle::Terminated,
268 },
269 )
270 .await;
271 break Ok(());
272 }
273 let _ = channel::send(
274 &actor_system_tx,
275 ActorSystemCmd::SetLifeCycle {
276 address: self.local_name().to_string(),
277 life_cycle: LifeCycle::Stopping,
278 },
279 )
280 .await;
281 self.pre_restart().await;
282 let _ = channel::send(
283 &actor_system_tx,
284 ActorSystemCmd::SetLifeCycle {
285 address: self.local_name().to_string(),
286 life_cycle: LifeCycle::Restarting,
287 },
288 )
289 .await;
290 }
291 }
292
293 /// Register the actor with `actor_system` and spawn its receive loop.
294 ///
295 /// Consumes `self` (the actor's state moves into the spawned task).
296 /// Awaits until the system confirms registration (or rejects it with
297 /// `AddressAlreadyExist` / `AddressNotOwned`), so on `Ok(())` the
298 /// actor is wired up — though it may still be in [`LifeCycle::Starting`]
299 /// when this returns.
300 ///
301 /// - `error_handling`: per-actor [`ErrorHandling`] policy applied
302 /// on every `handle` error.
303 /// - `blocking`: chooses between `tokio::spawn` (`NonBlocking`) and
304 /// `spawn_blocking + block_on` (`Blocking`). Pick `Blocking` if the
305 /// handler does long synchronous work that would otherwise starve
306 /// the async runtime.
307 /// - `channel_size`: override the mailbox capacity for this actor.
308 /// `None` uses the default (4096). Ignored under
309 /// `unbounded-channel`.
310 ///
311 /// [`ErrorHandling`]: crate::ErrorHandling
312 /// [`LifeCycle::Starting`]: crate::LifeCycle::Starting
313 async fn register(
314 mut self,
315 actor_system: &mut ActorSystem,
316 error_handling: ErrorHandling,
317 blocking: Blocking,
318 channel_size: Option<usize>,
319 ) -> Result<(), ActorError> {
320 let size = channel_size.unwrap_or(CHANNEL_SIZE);
321 let (tx, mut rx) = channel::channel(size);
322 let actor_system_tx = actor_system.handler_tx();
323 let _ = if blocking == Blocking::Blocking {
324 tokio::task::spawn_blocking(move || {
325 let result = tokio::runtime::Handle::current().block_on(self.run_actor(
326 actor_system_tx,
327 error_handling,
328 tx,
329 channel_size,
330 ));
331 if let Err(e) = result {
332 error!("Actor {} run failed: {:?}", self.local_name(), e);
333 }
334 })
335 } else {
336 tokio::spawn(async move {
337 let result = self
338 .run_actor(actor_system_tx, error_handling, tx, channel_size)
339 .await;
340 if let Err(e) = result {
341 error!("Actor {} run failed: {:?}", self.local_name(), e);
342 }
343 })
344 };
345 if let Some(result) = rx.recv().await {
346 result
347 } else {
348 Err(ActorError::ChannelRecv)
349 }
350 }
351}