speare/lib.rs
1use flume::{Receiver, Sender};
2use futures_core::Stream;
3use std::any::Any;
4use std::{
5 cmp,
6 collections::HashMap,
7 future::Future,
8 sync::{Arc, RwLock},
9 time::Duration,
10};
11use tokio::{
12 task::{self, JoinSet},
13 time,
14};
15
16pub mod mini;
17
18mod exit;
19mod node;
20mod pubsub;
21mod req_res;
22mod streams;
23mod watch;
24
25pub use exit::*;
26pub use node::*;
27pub use pubsub::PubSubError;
28pub use req_res::*;
29pub use streams::{SourceSet, Sources};
30
31use crate::pubsub::PubSub;
32use crate::watch::{NoWatch, OnErrTerminate, WatchFn};
33
34/// A thin abstraction over tokio tasks and flume channels, allowing for easy message passing
35/// with a supervision tree to handle failures.
36///
37/// ## Example
38/// ```
39/// use speare::{Ctx, Actor};
40/// use derive_more::From;
41///
42/// struct Counter {
43/// count: u32,
44/// }
45///
46/// struct CounterProps {
47/// initial_count: u32,
48/// max_count: u32,
49/// }
50///
51/// #[derive(From)]
52/// enum CounterMsg {
53/// Inc(u32),
54/// }
55///
56/// enum CounterErr {
57/// MaxCountExceeded,
58/// }
59///
60/// impl Actor for Counter {
61/// type Props = CounterProps;
62/// type Msg = CounterMsg;
63/// type Err = CounterErr;
64///
65/// async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
66/// Ok(Counter {
67/// count: ctx.props().initial_count,
68/// })
69/// }
70///
71/// async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
72/// match msg {
73/// CounterMsg::Inc(x) => {
74/// self.count += x;
75///
76/// if self.count > ctx.props().max_count {
77/// return Err(CounterErr::MaxCountExceeded);
78/// }
79/// }
80/// }
81///
82/// Ok(())
83/// }
84/// }
85/// ```
86#[allow(unused_variables)]
87pub trait Actor: Sized + Send + 'static {
88 type Props: Send + 'static;
89 type Msg: Send + 'static;
90 type Err: Send + Sync + 'static;
91
92 /// Constructs the actor. Called on initial spawn and on every restart.
93 ///
94 /// # Example
95 /// ```ignore
96 /// async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
97 /// Ok(MyActor { count: ctx.props().initial })
98 /// }
99 /// ```
100 fn init(ctx: &mut Ctx<Self>) -> impl Future<Output = Result<Self, Self::Err>> + Send;
101
102 /// Cleanup hook called when the actor stops, restarts, or fails to init.
103 /// `this` is `None` if init failed.
104 ///
105 /// # Example
106 /// ```ignore
107 /// async fn exit(this: Option<Self>, reason: ExitReason<Self>, ctx: &mut Ctx<Self>) {
108 /// if let ExitReason::Err(e) = reason {
109 /// eprintln!("actor failed: {e:?}");
110 /// }
111 /// }
112 /// ```
113 fn exit(
114 this: Option<Self>,
115 reason: ExitReason<Self>,
116 ctx: &mut Ctx<Self>,
117 ) -> impl Future<Output = ()> + Send {
118 async {}
119 }
120
121 /// Sets up message sources (streams, intervals) after init.
122 ///
123 /// Sources added earlier in the [`SourceSet`] chain have higher polling priority.
124 /// If an earlier source is consistently ready, later sources may be starved.
125 ///
126 /// # Example
127 /// ```ignore
128 /// async fn sources(&self, ctx: &Ctx<Self>) -> Result<impl Sources<Self>, Self::Err> {
129 /// Ok(SourceSet::new()
130 /// .interval(time::interval(Duration::from_millis(100)), || Msg::Tick)
131 /// .stream(my_stream))
132 /// }
133 /// ```
134 fn sources(
135 &self,
136 ctx: &Ctx<Self>,
137 ) -> impl Future<Output = Result<impl Sources<Self>, Self::Err>> + Send {
138 async { Ok(SourceSet::new()) }
139 }
140
141 /// Called everytime your [`Actor`] receives a message.
142 ///
143 /// # Example
144 /// ```ignore
145 /// async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
146 /// match msg {
147 /// Msg::Inc(n) => self.count += n,
148 /// }
149 ///
150 /// Ok(())
151 /// }
152 /// ```
153 fn handle(
154 &mut self,
155 msg: Self::Msg,
156 ctx: &mut Ctx<Self>,
157 ) -> impl Future<Output = Result<(), Self::Err>> + Send {
158 async { Ok(()) }
159 }
160}
161
162/// A handle to send messages to or stop an [`Actor`].
163pub struct Handle<Msg> {
164 msg_tx: Sender<Msg>,
165 proc_msg_tx: Sender<ProcMsg>,
166}
167
168impl<Msg> Clone for Handle<Msg> {
169 fn clone(&self) -> Self {
170 Self {
171 msg_tx: self.msg_tx.clone(),
172 proc_msg_tx: self.proc_msg_tx.clone(),
173 }
174 }
175}
176
177impl<Msg> Handle<Msg> {
178 /// Stops the [`Actor`] associated with this handle. Does not wait for the actor to finish.
179 ///
180 /// # Example
181 /// ```ignore
182 /// handle.stop();
183 /// ```
184 pub fn stop(&self) {
185 let (tx, _) = flume::unbounded();
186 let _ = self
187 .proc_msg_tx
188 .send(ProcMsg::FromHandle(ProcAction::Stop(tx)));
189 }
190
191 /// Restarts the [`Actor`] by re-running [`Actor::init`] and [`Actor::sources`]. Does not wait for the actor to finish.
192 ///
193 /// # Example
194 /// ```ignore
195 /// handle.restart();
196 /// ```
197 pub fn restart(&self) {
198 let _ = self
199 .proc_msg_tx
200 .send(ProcMsg::FromHandle(ProcAction::Restart));
201 }
202
203 /// Returns `true` if the [`Actor`] is still running.
204 ///
205 /// # Example
206 /// ```ignore
207 /// if handle.is_alive() {
208 /// handle.send(Msg::Ping);
209 /// }
210 /// ```
211 pub fn is_alive(&self) -> bool {
212 !self.msg_tx.is_disconnected()
213 }
214
215 /// Sends a message to the [`Actor`], returning `true` if the message was delivered
216 /// or `false` if the actor is no longer running.
217 /// Takes advantage of `From<_>` implementations on the message type.
218 ///
219 /// # Example
220 /// ```ignore
221 /// // Given `#[derive(From)] enum Msg { Inc(u32) }`:
222 /// handle.send(Msg::Inc(1));
223 /// handle.send(1u32); // works via From<u32>
224 /// ```
225 pub fn send<M: Into<Msg>>(&self, msg: M) -> bool {
226 self.msg_tx.send(msg.into()).is_ok()
227 }
228
229 /// Sends a message to the [`Actor`] after the given duration, failing silently if it is no longer running.
230 ///
231 /// # Example
232 /// ```ignore
233 /// handle.send_in(Msg::Timeout, Duration::from_secs(5));
234 /// ```
235 pub fn send_in<M>(&self, msg: M, duration: Duration)
236 where
237 Msg: 'static + Send,
238 M: 'static + Send + Into<Msg>,
239 {
240 let msg_tx = self.msg_tx.clone();
241
242 task::spawn(async move {
243 time::sleep(duration).await;
244 let _ = msg_tx.send(msg.into());
245 });
246 }
247
248 /// Sends a request and awaits a response. Requires `Msg: From<Request<Req, Res>>`.
249 ///
250 /// # Example
251 /// ```ignore
252 /// #[derive(From)]
253 /// enum Msg {
254 /// GetCount(Request<(), u32>),
255 /// }
256 ///
257 /// // sender side:
258 /// let count: u32 = handle.req(()).await?;
259 ///
260 /// // receiver side, inside handle():
261 /// Msg::GetCount(req) => req.reply(self.count),
262 /// ```
263 pub async fn req<Req, Res>(&self, req: Req) -> Result<Res, ReqErr>
264 where
265 Msg: From<Request<Req, Res>>,
266 {
267 let (req, res) = req_res(req);
268 self.send(req);
269 res.recv().await
270 }
271
272 /// Like [`Handle::req`], but uses a wrapper function to convert the [`Request`] into the message type.
273 /// Useful when the message variant can't implement `From<Request<Req, Res>>`.
274 ///
275 /// # Example
276 /// ```ignore
277 /// enum Msg {
278 /// GetCount(Request<(), u32>),
279 /// }
280 ///
281 /// let count: u32 = handle.reqw(Msg::GetCount, ()).await?;
282 /// ```
283 pub async fn reqw<F, Req, Res>(&self, to_req: F, req: Req) -> Result<Res, ReqErr>
284 where
285 F: Fn(Request<Req, Res>) -> Msg,
286 {
287 let (req, res) = req_res(req);
288 let msg = to_req(req);
289 self.send(msg);
290 res.recv().await
291 }
292
293 /// Like [`Handle::req`], but fails with [`ReqErr::Timeout`] if no response within the given [`Duration`].
294 ///
295 /// # Example
296 /// ```ignore
297 /// let count: u32 = handle.req_timeout((), Duration::from_secs(1)).await?;
298 /// ```
299 pub async fn req_timeout<Req, Res>(&self, req: Req, timeout: Duration) -> Result<Res, ReqErr>
300 where
301 Msg: From<Request<Req, Res>>,
302 {
303 let (req, res) = req_res(req);
304 self.send(req);
305 res.recv_timeout(timeout).await
306 }
307
308 /// Like [`Handle::reqw`], but fails with [`ReqErr::Timeout`] if no response within the given [`Duration`].
309 ///
310 /// # Example
311 /// ```ignore
312 /// let count: u32 = handle.reqw_timeout(Msg::GetCount, (), Duration::from_secs(1)).await?;
313 /// ```
314 pub async fn reqw_timeout<F, Req, Res>(
315 &self,
316 to_req: F,
317 req: Req,
318 timeout: Duration,
319 ) -> Result<Res, ReqErr>
320 where
321 F: Fn(Request<Req, Res>) -> Msg,
322 {
323 let (req, res) = req_res(req);
324 let msg = to_req(req);
325 self.send(msg);
326 res.recv_timeout(timeout).await
327 }
328}
329
330/// The context surrounding the current `Actor`.
331///
332/// Provides a collection of methods that allow you to:
333/// - spawn other actors as children of the current actor
334/// - access the `Handle<_>` for the currrent actor
335/// - access this actor's props
336/// - clear this actor's mailbox
337pub struct Ctx<P>
338where
339 P: Actor,
340{
341 id: u64,
342 props: P::Props,
343 handle: Handle<P::Msg>,
344 msg_rx: Receiver<P::Msg>,
345 parent_proc_msg_tx: Option<Sender<ProcMsg>>,
346 proc_msg_rx: Receiver<ProcMsg>,
347 children_proc_msg_tx: HashMap<u64, Sender<ProcMsg>>,
348 supervision: Supervision,
349 total_children: u64,
350 tasks: JoinSet<Result<P::Msg, P::Err>>,
351 restarts: u64,
352 registry_key: Option<String>,
353 registry: Arc<RwLock<HashMap<String, Box<dyn Any + Send + Sync>>>>,
354 pubsub: Arc<RwLock<PubSub>>,
355 subscription_ids: Vec<(String, u64)>,
356}
357
358impl<P> Ctx<P>
359where
360 P: Actor,
361{
362 /// Returns a reference to this [`Actor`]'s props. Props are set once at spawn time
363 /// and remain immutable for the lifetime of the actor, including across restarts.
364 ///
365 /// # Example
366 /// ```ignore
367 /// async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
368 /// Ok(MyActor { count: ctx.props().initial_count })
369 /// }
370 /// ```
371 pub fn props(&self) -> &P::Props {
372 &self.props
373 }
374
375 /// Returns a [`Handle`] to the current [`Actor`], allowing it to send messages to itself
376 /// or pass its handle to child actors.
377 ///
378 /// # Example
379 /// ```ignore
380 /// // schedule a message to self
381 /// ctx.this().send_in(Msg::Tick, Duration::from_secs(1));
382 /// ```
383 pub fn this(&self) -> &Handle<P::Msg> {
384 &self.handle
385 }
386
387 /// Drains all pending messages from this [`Actor`]'s mailbox. Useful during
388 /// restarts to discard stale messages via [`Actor::init`].
389 ///
390 /// # Example
391 /// ```ignore
392 /// async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
393 /// ctx.clear_mailbox();
394 /// Ok(MyActor::default())
395 /// }
396 /// ```
397 pub fn clear_mailbox(&self) {
398 self.msg_rx.drain();
399 }
400
401 /// Creates a [`SpawnBuilder`] for spawning a child [`Actor`]. The actor type is passed
402 /// as a generic parameter and its props as the argument. The child is supervised
403 /// by the current actor and will be stopped when the parent stops.
404 ///
405 /// # Example
406 /// ```ignore
407 /// let handle = ctx.actor::<Worker>(WorkerProps { id: 1 })
408 /// .supervision(Supervision::Restart {
409 /// max: Limit::Amount(3),
410 /// backoff: Backoff::None,
411 /// })
412 /// .spawn();
413 /// ```
414 pub fn actor<'a, Child>(&'a mut self, props: Child::Props) -> SpawnBuilder<'a, P, Child>
415 where
416 Child: Actor,
417 {
418 SpawnBuilder::new(self, props)
419 }
420
421 /// Restarts all child actors immediately, bypassing their supervision strategy.
422 /// Each child will re-run its [`Actor::init`] with the same props.
423 ///
424 /// This is fire-and-forget: it does not wait for children to finish restarting.
425 pub fn restart_children(&self) {
426 for child in self.children_proc_msg_tx.values() {
427 let _ = child.send(ProcMsg::FromParent(ProcAction::Restart));
428 }
429 }
430
431 /// Stops all child actors and waits for each to fully terminate before returning.
432 pub async fn stop_children(&mut self) {
433 let mut acks = Vec::with_capacity(self.total_children as usize);
434 for child in self.children_proc_msg_tx.values() {
435 let (ack_tx, ack_rx) = flume::unbounded();
436 let _ = child.send(ProcMsg::FromParent(ProcAction::Stop(ack_tx)));
437 acks.push(ack_rx);
438 }
439
440 for ack in acks {
441 let _ = ack.recv_async().await;
442 }
443
444 self.total_children = 0;
445 self.children_proc_msg_tx.clear();
446 }
447
448 /// Spawns a background async task. On completion, its `Ok` value is delivered
449 /// as a message to this [`Actor`]; its `Err` triggers the supervision strategy
450 /// that this actor's parent has set for it.
451 ///
452 /// Tasks are aborted when the actor stops, but **survive restarts**. If the
453 /// actor is restarted (via supervision or [`Ctx::restart_children`]), in-flight
454 /// tasks from the previous incarnation will continue running and their results
455 /// will still be delivered to the restarted actor's `handle()`.
456 ///
457 /// # Example
458 /// ```ignore
459 /// ctx.task(async {
460 /// let data = reqwest::get("https://example.com").await?.text().await?;
461 /// Ok(Msg::Fetched(data))
462 /// });
463 /// ```
464 pub fn task<F>(&mut self, f: F)
465 where
466 F: Future<Output = Result<P::Msg, P::Err>> + Send + 'static,
467 {
468 self.tasks.spawn(f);
469 }
470
471 /// Looks up a registered [`Actor`]'s [`Handle`] by its type. The actor must have been
472 /// spawned with [`SpawnBuilder::spawn_registered`].
473 ///
474 /// # Example
475 /// ```ignore
476 /// let logger = ctx.get_handle_for::<Logger>()?;
477 /// logger.send(LogMsg::Info("hello".into()));
478 /// ```
479 pub fn get_handle_for<A: Actor>(&self) -> Result<Handle<A::Msg>, RegistryError> {
480 let key = std::any::type_name::<A>();
481 let reg = self.registry.read().map_err(|_| RegistryError::PoisonErr)?;
482 reg.get(key)
483 .and_then(|h| h.downcast_ref::<Handle<A::Msg>>())
484 .cloned()
485 .ok_or_else(|| RegistryError::NotFound(key.to_string()))
486 }
487
488 /// Looks up a registered [`Actor`]'s [`Handle`] by name. The actor must have been
489 /// spawned with [`SpawnBuilder::spawn_named`].
490 ///
491 /// # Example
492 /// ```ignore
493 /// let worker = ctx.get_handle::<WorkerMsg>("worker-1")?;
494 /// worker.send(WorkerMsg::Start);
495 /// ```
496 pub fn get_handle<Msg: Send + 'static>(
497 &self,
498 name: &str,
499 ) -> Result<Handle<Msg>, RegistryError> {
500 let reg = self.registry.read().map_err(|_| RegistryError::PoisonErr)?;
501 reg.get(name)
502 .and_then(|h| h.downcast_ref::<Handle<Msg>>())
503 .cloned()
504 .ok_or_else(|| RegistryError::NotFound(name.to_string()))
505 }
506
507 /// Sends a message to a registered [`Actor`] looked up by type.
508 ///
509 /// # Example
510 /// ```ignore
511 /// // Assuming MetricsCollector was spawned with spawn_registered():
512 /// // ctx.actor::<MetricsCollector>(props).spawn_registered()?;
513 ///
514 /// // Any actor in the system can then send to it by type:
515 /// ctx.send::<MetricsCollector>(MetricsMsg::RecordLatency(42))?;
516 /// ```
517 pub fn send<A: Actor>(&self, msg: impl Into<A::Msg>) -> Result<(), RegistryError> {
518 let key = std::any::type_name::<A>();
519 let reg = self.registry.read().map_err(|_| RegistryError::PoisonErr)?;
520 match reg
521 .get(key)
522 .and_then(|h| h.downcast_ref::<Handle<A::Msg>>())
523 {
524 Some(handle) => {
525 handle.send(msg);
526 Ok(())
527 }
528 None => Err(RegistryError::NotFound(key.to_string())),
529 }
530 }
531
532 /// Sends a message to a registered [`Actor`] looked up by name.
533 ///
534 /// # Example
535 /// ```ignore
536 /// // Assuming a Worker was spawned with spawn_named():
537 /// // ctx.actor::<Worker>(props).spawn_named("worker-1")?;
538 ///
539 /// // Any actor in the system can then send to it by name:
540 /// ctx.send_to("worker-1", WorkerMsg::Start)?;
541 /// ```
542 pub fn send_to<Msg: Send + 'static>(
543 &self,
544 name: &str,
545 msg: impl Into<Msg>,
546 ) -> Result<(), RegistryError> {
547 let reg = self.registry.read().map_err(|_| RegistryError::PoisonErr)?;
548 match reg.get(name).and_then(|h| h.downcast_ref::<Handle<Msg>>()) {
549 Some(handle) => {
550 handle.send(msg);
551 Ok(())
552 }
553 None => Err(RegistryError::NotFound(name.to_string())),
554 }
555 }
556}
557
558#[allow(clippy::enum_variant_names)]
559#[derive(Debug)]
560enum ProcMsg {
561 /// Sent from child once it terminates
562 ChildTerminated {
563 child_id: u64,
564 },
565 FromParent(ProcAction),
566 FromHandle(ProcAction),
567}
568
569#[derive(Debug)]
570enum ProcAction {
571 Restart,
572 Stop(Sender<()>),
573}
574
575fn spawn<Child, W>(mut ctx: Ctx<Child>, delay: Option<Duration>, watch: W)
576where
577 Child: Actor,
578 W: OnErrTerminate<Child::Err>,
579{
580 tokio::spawn(async move {
581 if let Some(d) = delay.filter(|d| !d.is_zero()) {
582 time::sleep(d).await;
583 }
584
585 // restart is Some whenever we should restart
586 let mut restart = Restart::No;
587 let mut exit_reason = None;
588 let mut actor_created = None;
589 let mut stop_ack_tx = None;
590
591 match Child::init(&mut ctx).await {
592 Err(e) => {
593 exit_reason = Some(ExitReason::Err(e));
594 restart = Restart::from_supervision(ctx.supervision, ctx.restarts);
595 }
596
597 Ok(mut actor) => match actor.sources(&ctx).await {
598 Err(e) => {
599 exit_reason = Some(ExitReason::Err(e));
600 restart = Restart::from_supervision(ctx.supervision, ctx.restarts);
601 actor_created = Some(actor);
602 }
603
604 Ok(mut sources) => {
605 macro_rules! on_err {
606 ($e:expr) => {
607 if let Supervision::Resume = ctx.supervision {
608 continue;
609 }
610
611 restart = Restart::from_supervision(ctx.supervision, ctx.restarts);
612 exit_reason = Some(ExitReason::Err($e));
613 actor_created = Some(actor);
614 break;
615 };
616 }
617
618 loop {
619 tokio::select! {
620 biased;
621
622 proc_msg = ctx.proc_msg_rx.recv_async() => {
623 match proc_msg {
624 Err(_) => break,
625
626 Ok(ProcMsg::FromHandle(ProcAction::Stop(tx)) ) => {
627 exit_reason = Some(ExitReason::Handle);
628 stop_ack_tx = Some(tx);
629 break
630 },
631
632 Ok(ProcMsg::FromParent(ProcAction::Stop(tx))) => {
633 exit_reason = exit_reason.or(Some(ExitReason::Parent));
634 stop_ack_tx = Some(tx);
635 break
636 },
637
638 Ok(ProcMsg::FromParent(ProcAction::Restart)) => {
639 exit_reason = exit_reason.or(Some(ExitReason::Parent));
640 restart = Restart::In(Duration::ZERO);
641 break;
642 }
643
644
645 Ok(ProcMsg::FromHandle(ProcAction::Restart)) => {
646 exit_reason = exit_reason.or(Some(ExitReason::Handle));
647 restart = Restart::In(Duration::ZERO);
648 break;
649 }
650
651 Ok(ProcMsg::ChildTerminated { child_id, }) => {
652 if ctx.children_proc_msg_tx.remove(&child_id).is_some() {
653 ctx.total_children -= 1;
654 }
655 }
656 }
657 }
658
659 recvd = ctx.msg_rx.recv_async() => {
660 match recvd {
661 Err(_) => break,
662
663 Ok(msg) => {
664 if let Err(e) = actor.handle(msg, &mut ctx).await {
665 on_err!(e);
666 };
667 }
668 }
669 }
670
671 Some(Ok(msg)) = ctx.tasks.join_next() => {
672 match msg {
673 Err(e) => {
674 on_err!(e);
675 }
676
677 Ok(msg) => {
678 if let Err(e) = actor.handle(msg, &mut ctx).await {
679 on_err!(e);
680 };
681 }
682 }
683
684 }
685
686 Some(msg) = std::future::poll_fn(|cx| Pin::new(&mut sources).poll_next(cx)) => {
687 if let Err(e) = actor.handle(msg, &mut ctx).await {
688 on_err!(e);
689 };
690 }
691 }
692 }
693 }
694 },
695 }
696
697 ctx.stop_children().await;
698 let exit_reason = exit_reason.unwrap_or(ExitReason::Handle);
699
700 if let ExitReason::Err(_) = &exit_reason {
701 ctx.restarts += 1;
702 }
703
704 if let (Restart::No, ExitReason::Err(ref e)) = (&restart, &exit_reason) {
705 watch.on_err_terminate(e);
706 }
707
708 Child::exit(actor_created, exit_reason, &mut ctx).await;
709
710 // Clean up pub/sub subscriptions (runs on both stop and restart)
711 if !ctx.subscription_ids.is_empty() {
712 if let Ok(mut bus) = ctx.pubsub.write() {
713 for (topic, sub_id) in ctx.subscription_ids.drain(..) {
714 if let Some(entry) = bus.topics.get_mut(&topic) {
715 entry.subscribers.retain(|s| s.id != sub_id);
716 if entry.subscribers.is_empty() {
717 bus.topics.remove(&topic);
718 }
719 }
720 }
721 }
722 }
723
724 let _ = stop_ack_tx.map(|tx| tx.send(()));
725
726 if let Restart::In(duration) = restart {
727 spawn::<Child, W>(ctx, Some(duration), watch)
728 } else if let Some(parent_tx) = ctx.parent_proc_msg_tx {
729 if let Some(key) = ctx.registry_key.take() {
730 if let Ok(mut reg) = ctx.registry.write() {
731 reg.remove(&key);
732 }
733 }
734
735 let _ = parent_tx.send(ProcMsg::ChildTerminated { child_id: ctx.id });
736 }
737 });
738}
739
740/// Defines how a parent reacts when a child actor fails.
741///
742/// # Example
743/// ```ignore
744/// let supervision = Supervision::Restart {
745/// max: Limit::Amount(5),
746/// backoff: Backoff::Static(Duration::from_secs(1)),
747/// };
748/// ```
749#[derive(Debug, Clone, Copy)]
750pub enum Supervision {
751 /// Actor terminates on error.
752 Stop,
753 /// Actor continues processing the next message after an error.
754 Resume,
755 /// Actor is restarted on error, up to `max` times with optional `backoff`.
756 Restart { max: Limit, backoff: Backoff },
757}
758
759/// Delay strategy between restart attempts.
760///
761/// # Example
762/// ```ignore
763/// let backoff = Backoff::Incremental {
764/// min: Duration::from_millis(100),
765/// max: Duration::from_secs(5),
766/// step: Duration::from_millis(500),
767/// };
768/// ```
769#[derive(Debug, Clone, Copy, Eq, PartialEq)]
770pub enum Backoff {
771 /// Restart immediately with no delay.
772 None,
773 /// Wait a fixed duration between restarts.
774 Static(Duration),
775 /// Linearly increase delay from `min` to `max` by `step` per restart.
776 Incremental {
777 min: Duration,
778 max: Duration,
779 step: Duration,
780 },
781}
782
783/// Maximum number of restarts allowed.
784///
785/// # Example
786/// ```ignore
787/// let limit = Limit::Amount(3);
788/// ```
789#[derive(Debug, Clone, Copy, Eq, PartialEq)]
790pub enum Limit {
791 /// No limit on restarts.
792 None,
793 /// Restart at most this many times.
794 Amount(u64),
795}
796
797/// **Note**: `0` maps to [`Limit::None`] (unlimited), not zero restarts.
798/// If you want zero restarts (i.e., never restart), use [`Supervision::Stop`] instead.
799impl From<u64> for Limit {
800 fn from(value: u64) -> Self {
801 match value {
802 0 => Limit::None,
803 v => Limit::Amount(v),
804 }
805 }
806}
807
808impl PartialEq<u64> for Limit {
809 fn eq(&self, other: &u64) -> bool {
810 match self {
811 Limit::None => false,
812 Limit::Amount(n) => n == other,
813 }
814 }
815}
816
817#[derive(Debug, Clone)]
818pub enum RegistryError {
819 NameTaken(String),
820 NotFound(String),
821 PoisonErr,
822}
823
824impl std::fmt::Display for RegistryError {
825 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
826 match self {
827 RegistryError::NameTaken(name) => write!(f, "registry name already taken: {name}"),
828 RegistryError::NotFound(name) => write!(f, "no actor registered under: {name}"),
829 RegistryError::PoisonErr => write!(f, "registry lock poisoned"),
830 }
831 }
832}
833
834impl std::error::Error for RegistryError {}
835
836/// Builder for configuring and spawning a child [`Actor`]. Created via [`Ctx::actor`].
837pub struct SpawnBuilder<'a, Parent, Child, W = NoWatch>
838where
839 Parent: Actor,
840 Child: Actor,
841{
842 ctx: &'a mut Ctx<Parent>,
843 props: Child::Props,
844 supervision: Supervision,
845 /// Only kicks in if child is stopped or reaches maximum number of restarts.
846 watch: W,
847 registry_key: Option<String>,
848}
849
850impl<'a, Parent, Child> SpawnBuilder<'a, Parent, Child, NoWatch>
851where
852 Parent: Actor,
853 Child: Actor,
854{
855 fn new(ctx: &'a mut Ctx<Parent>, props: Child::Props) -> Self {
856 Self {
857 ctx,
858 props,
859 supervision: Supervision::Restart {
860 max: Limit::None,
861 backoff: Backoff::None,
862 },
863 watch: NoWatch,
864 registry_key: None,
865 }
866 }
867}
868
869impl<'a, Parent, Child, W> SpawnBuilder<'a, Parent, Child, W>
870where
871 Parent: Actor,
872 Child: Actor,
873 W: OnErrTerminate<Child::Err>,
874{
875 /// Sets the [`Supervision`] strategy the parent will use for this child.
876 /// Defaults to [`Supervision::Restart`] with unlimited restarts and no backoff.
877 ///
878 /// # Example
879 /// ```ignore
880 /// ctx.actor::<Worker>(props)
881 /// .supervision(Supervision::Restart {
882 /// max: Limit::Amount(3),
883 /// backoff: Backoff::Static(Duration::from_secs(1)),
884 /// })
885 /// .spawn();
886 /// ```
887 pub fn supervision(mut self, supervision: Supervision) -> Self {
888 self.supervision = supervision;
889 self
890 }
891
892 /// Registers a callback that fires when the child terminates due to an error.
893 /// This happens when the supervision strategy is [`Supervision::Stop`], or when
894 /// [`Supervision::Restart`] has exhausted all allowed restarts. The callback maps
895 /// the child's error into a message for the parent.
896 ///
897 /// # Example
898 /// ```ignore
899 /// ctx.actor::<Worker>(props)
900 /// .supervision(Supervision::Restart {
901 /// max: Limit::Amount(3),
902 /// backoff: Backoff::None,
903 /// })
904 /// .watch(|err| ParentMsg::WorkerDied(format!("{err:?}")))
905 /// .spawn();
906 /// ```
907 pub fn watch<F>(self, f: F) -> SpawnBuilder<'a, Parent, Child, WatchFn<F, Parent::Msg>>
908 where
909 F: Fn(&Child::Err) -> Parent::Msg + Send + 'static,
910 {
911 let parent_msg_tx = self.ctx.handle.msg_tx.clone();
912 SpawnBuilder {
913 ctx: self.ctx,
914 props: self.props,
915 supervision: self.supervision,
916 watch: WatchFn { f, parent_msg_tx },
917 registry_key: self.registry_key,
918 }
919 }
920
921 /// Spawns the child [`Actor`] and returns a [`Handle`] to it.
922 pub fn spawn(self) -> Handle<Child::Msg> {
923 let (msg_tx, msg_rx) = flume::unbounded(); // child
924 let (proc_msg_tx, proc_msg_rx) = flume::unbounded(); // child
925
926 let handle = Handle {
927 msg_tx,
928 proc_msg_tx,
929 };
930
931 self.ctx.total_children += 1;
932 let id = self.ctx.total_children;
933
934 let ctx: Ctx<Child> = Ctx {
935 id,
936 props: self.props,
937 handle: handle.clone(),
938 msg_rx,
939 parent_proc_msg_tx: Some(self.ctx.handle.proc_msg_tx.clone()),
940 proc_msg_rx,
941 children_proc_msg_tx: HashMap::new(),
942 total_children: 0,
943 supervision: self.supervision,
944 restarts: 0,
945 tasks: JoinSet::new(),
946 registry_key: self.registry_key,
947 registry: self.ctx.registry.clone(),
948 pubsub: self.ctx.pubsub.clone(),
949 subscription_ids: Vec::new(),
950 };
951
952 spawn::<Child, W>(ctx, None, self.watch);
953
954 self.ctx
955 .children_proc_msg_tx
956 .insert(self.ctx.total_children, handle.proc_msg_tx.clone());
957
958 handle
959 }
960
961 /// Spawns the child and registers it in the global registry under its type name.
962 /// Other actors can then look it up via [`Ctx::get_handle_for`] or [`Ctx::send`].
963 /// Returns [`RegistryError::NameTaken`] if already registered.
964 pub fn spawn_registered(self) -> Result<Handle<Child::Msg>, RegistryError> {
965 let key = std::any::type_name::<Child>();
966 self.spawn_named(key)
967 }
968
969 /// Spawns the child and registers it in the global registry under the given name.
970 /// Other actors can then look it up via [`Ctx::get_handle`] or [`Ctx::send_to`].
971 /// Returns [`RegistryError::NameTaken`] if the name is already taken.
972 ///
973 /// # Example
974 /// ```ignore
975 /// let h = ctx.actor::<Worker>(props).spawn_named("worker-1")?;
976 /// ```
977 pub fn spawn_named(
978 mut self,
979 name: impl Into<String>,
980 ) -> Result<Handle<Child::Msg>, RegistryError> {
981 let name = name.into();
982 let registry = self.ctx.registry.clone();
983 let mut reg = registry.write().map_err(|_| RegistryError::PoisonErr)?;
984
985 if reg.contains_key(&name) {
986 return Err(RegistryError::NameTaken(name.clone()));
987 }
988
989 self.registry_key = Some(name.clone());
990 let handle = self.spawn();
991 reg.insert(name, Box::new(handle.clone()));
992
993 Ok(handle)
994 }
995}
996
997#[derive(Debug)]
998enum Restart {
999 No,
1000 In(Duration),
1001}
1002
1003impl Restart {
1004 fn from_supervision(supervision: Supervision, current_restarts: u64) -> Self {
1005 match supervision {
1006 Supervision::Stop => Restart::No,
1007 Supervision::Resume => Restart::No,
1008 Supervision::Restart { max, .. } if max == current_restarts + 1 => Restart::No,
1009 Supervision::Restart { backoff, .. } => {
1010 let wait = match backoff {
1011 Backoff::None => Duration::ZERO,
1012 Backoff::Static(duration) => duration,
1013 Backoff::Incremental { min, max, step } => {
1014 let wait = step.mul_f64((current_restarts + 1) as f64);
1015 let wait = cmp::min(max, wait);
1016 cmp::max(min, wait)
1017 }
1018 };
1019
1020 Restart::In(wait)
1021 }
1022 }
1023 }
1024}