speare/lib.rs
1use flume::{Receiver, Sender};
2use futures_core::Stream;
3use std::any::Any;
4use std::{
5 cmp,
6 collections::HashMap,
7 future::Future,
8 sync::{Arc, RwLock},
9 time::Duration,
10};
11use tokio::{
12 task::{self, JoinSet},
13 time,
14};
15
16mod exit;
17mod node;
18mod pubsub;
19mod req_res;
20mod streams;
21mod watch;
22
23pub use exit::*;
24pub use node::*;
25pub use pubsub::PubSubError;
26pub use req_res::*;
27pub use streams::{SourceSet, Sources};
28
29use crate::pubsub::PubSub;
30use crate::watch::{NoWatch, OnErrTerminate, WatchFn};
31
32/// A thin abstraction over tokio tasks and flume channels, allowing for easy message passing
33/// with a supervision tree to handle failures.
34///
35/// ## Example
36/// ```
37/// use speare::{Ctx, Actor};
38/// use derive_more::From;
39///
40/// struct Counter {
41/// count: u32,
42/// }
43///
44/// struct CounterProps {
45/// initial_count: u32,
46/// max_count: u32,
47/// }
48///
49/// #[derive(From)]
50/// enum CounterMsg {
51/// Inc(u32),
52/// }
53///
54/// enum CounterErr {
55/// MaxCountExceeded,
56/// }
57///
58/// impl Actor for Counter {
59/// type Props = CounterProps;
60/// type Msg = CounterMsg;
61/// type Err = CounterErr;
62///
63/// async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
64/// Ok(Counter {
65/// count: ctx.props().initial_count,
66/// })
67/// }
68///
69/// async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
70/// match msg {
71/// CounterMsg::Inc(x) => {
72/// self.count += x;
73///
74/// if self.count > ctx.props().max_count {
75/// return Err(CounterErr::MaxCountExceeded);
76/// }
77/// }
78/// }
79///
80/// Ok(())
81/// }
82/// }
83/// ```
84#[allow(unused_variables)]
85pub trait Actor: Sized + Send + 'static {
86 type Props: Send + 'static;
87 type Msg: Send + 'static;
88 type Err: Send + Sync + 'static;
89
90 /// Constructs the actor. Called on initial spawn and on every restart.
91 ///
92 /// # Example
93 /// ```ignore
94 /// async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
95 /// Ok(MyActor { count: ctx.props().initial })
96 /// }
97 /// ```
98 fn init(ctx: &mut Ctx<Self>) -> impl Future<Output = Result<Self, Self::Err>> + Send;
99
100 /// Cleanup hook called when the actor stops, restarts, or fails to init.
101 /// `this` is `None` if init failed.
102 ///
103 /// # Example
104 /// ```ignore
105 /// async fn exit(this: Option<Self>, reason: ExitReason<Self>, ctx: &mut Ctx<Self>) {
106 /// if let ExitReason::Err(e) = reason {
107 /// eprintln!("actor failed: {e:?}");
108 /// }
109 /// }
110 /// ```
111 fn exit(
112 this: Option<Self>,
113 reason: ExitReason<Self>,
114 ctx: &mut Ctx<Self>,
115 ) -> impl Future<Output = ()> + Send {
116 async {}
117 }
118
119 /// Sets up message sources (streams, intervals) after init.
120 ///
121 /// Sources added earlier in the [`SourceSet`] chain have higher polling priority.
122 /// If an earlier source is consistently ready, later sources may be starved.
123 ///
124 /// # Example
125 /// ```ignore
126 /// async fn sources(&self, ctx: &Ctx<Self>) -> Result<impl Sources<Self>, Self::Err> {
127 /// Ok(SourceSet::new()
128 /// .interval(time::interval(Duration::from_millis(100)), || Msg::Tick)
129 /// .stream(my_stream))
130 /// }
131 /// ```
132 fn sources(
133 &self,
134 ctx: &Ctx<Self>,
135 ) -> impl Future<Output = Result<impl Sources<Self>, Self::Err>> + Send {
136 async { Ok(SourceSet::new()) }
137 }
138
139 /// Called everytime your [`Actor`] receives a message.
140 ///
141 /// # Example
142 /// ```ignore
143 /// async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
144 /// match msg {
145 /// Msg::Inc(n) => self.count += n,
146 /// }
147 ///
148 /// Ok(())
149 /// }
150 /// ```
151 fn handle(
152 &mut self,
153 msg: Self::Msg,
154 ctx: &mut Ctx<Self>,
155 ) -> impl Future<Output = Result<(), Self::Err>> + Send {
156 async { Ok(()) }
157 }
158}
159
160/// A handle to send messages to or stop an [`Actor`].
161pub struct Handle<Msg> {
162 msg_tx: Sender<Msg>,
163 proc_msg_tx: Sender<ProcMsg>,
164}
165
166impl<Msg> Clone for Handle<Msg> {
167 fn clone(&self) -> Self {
168 Self {
169 msg_tx: self.msg_tx.clone(),
170 proc_msg_tx: self.proc_msg_tx.clone(),
171 }
172 }
173}
174
175impl<Msg> Handle<Msg> {
176 /// Stops the [`Actor`] associated with this handle. Does not wait for the actor to finish.
177 ///
178 /// # Example
179 /// ```ignore
180 /// handle.stop();
181 /// ```
182 pub fn stop(&self) {
183 let (tx, _) = flume::unbounded();
184 let _ = self
185 .proc_msg_tx
186 .send(ProcMsg::FromHandle(ProcAction::Stop(tx)));
187 }
188
189 /// Restarts the [`Actor`] by re-running [`Actor::init`] and [`Actor::sources`]. Does not wait for the actor to finish.
190 ///
191 /// # Example
192 /// ```ignore
193 /// handle.restart();
194 /// ```
195 pub fn restart(&self) {
196 let _ = self
197 .proc_msg_tx
198 .send(ProcMsg::FromHandle(ProcAction::Restart));
199 }
200
201 /// Returns `true` if the [`Actor`] is still running.
202 ///
203 /// # Example
204 /// ```ignore
205 /// if handle.is_alive() {
206 /// handle.send(Msg::Ping);
207 /// }
208 /// ```
209 pub fn is_alive(&self) -> bool {
210 !self.msg_tx.is_disconnected()
211 }
212
213 /// Sends a message to the [`Actor`], returning `true` if the message was delivered
214 /// or `false` if the actor is no longer running.
215 /// Takes advantage of `From<_>` implementations on the message type.
216 ///
217 /// # Example
218 /// ```ignore
219 /// // Given `#[derive(From)] enum Msg { Inc(u32) }`:
220 /// handle.send(Msg::Inc(1));
221 /// handle.send(1u32); // works via From<u32>
222 /// ```
223 pub fn send<M: Into<Msg>>(&self, msg: M) -> bool {
224 self.msg_tx.send(msg.into()).is_ok()
225 }
226
227 /// Sends a message to the [`Actor`] after the given duration, failing silently if it is no longer running.
228 ///
229 /// # Example
230 /// ```ignore
231 /// handle.send_in(Msg::Timeout, Duration::from_secs(5));
232 /// ```
233 pub fn send_in<M>(&self, msg: M, duration: Duration)
234 where
235 Msg: 'static + Send,
236 M: 'static + Send + Into<Msg>,
237 {
238 let msg_tx = self.msg_tx.clone();
239
240 task::spawn(async move {
241 time::sleep(duration).await;
242 let _ = msg_tx.send(msg.into());
243 });
244 }
245
246 /// Sends a request and awaits a response. Requires `Msg: From<Request<Req, Res>>`.
247 ///
248 /// # Example
249 /// ```ignore
250 /// #[derive(From)]
251 /// enum Msg {
252 /// GetCount(Request<(), u32>),
253 /// }
254 ///
255 /// // sender side:
256 /// let count: u32 = handle.req(()).await?;
257 ///
258 /// // receiver side, inside handle():
259 /// Msg::GetCount(req) => req.reply(self.count),
260 /// ```
261 pub async fn req<Req, Res>(&self, req: Req) -> Result<Res, ReqErr>
262 where
263 Msg: From<Request<Req, Res>>,
264 {
265 let (req, res) = req_res(req);
266 self.send(req);
267 res.recv().await
268 }
269
270 /// Like [`Handle::req`], but uses a wrapper function to convert the [`Request`] into the message type.
271 /// Useful when the message variant can't implement `From<Request<Req, Res>>`.
272 ///
273 /// # Example
274 /// ```ignore
275 /// enum Msg {
276 /// GetCount(Request<(), u32>),
277 /// }
278 ///
279 /// let count: u32 = handle.reqw(Msg::GetCount, ()).await?;
280 /// ```
281 pub async fn reqw<F, Req, Res>(&self, to_req: F, req: Req) -> Result<Res, ReqErr>
282 where
283 F: Fn(Request<Req, Res>) -> Msg,
284 {
285 let (req, res) = req_res(req);
286 let msg = to_req(req);
287 self.send(msg);
288 res.recv().await
289 }
290
291 /// Like [`Handle::req`], but fails with [`ReqErr::Timeout`] if no response within the given [`Duration`].
292 ///
293 /// # Example
294 /// ```ignore
295 /// let count: u32 = handle.req_timeout((), Duration::from_secs(1)).await?;
296 /// ```
297 pub async fn req_timeout<Req, Res>(&self, req: Req, timeout: Duration) -> Result<Res, ReqErr>
298 where
299 Msg: From<Request<Req, Res>>,
300 {
301 let (req, res) = req_res(req);
302 self.send(req);
303 res.recv_timeout(timeout).await
304 }
305
306 /// Like [`Handle::reqw`], but fails with [`ReqErr::Timeout`] if no response within the given [`Duration`].
307 ///
308 /// # Example
309 /// ```ignore
310 /// let count: u32 = handle.reqw_timeout(Msg::GetCount, (), Duration::from_secs(1)).await?;
311 /// ```
312 pub async fn reqw_timeout<F, Req, Res>(
313 &self,
314 to_req: F,
315 req: Req,
316 timeout: Duration,
317 ) -> Result<Res, ReqErr>
318 where
319 F: Fn(Request<Req, Res>) -> Msg,
320 {
321 let (req, res) = req_res(req);
322 let msg = to_req(req);
323 self.send(msg);
324 res.recv_timeout(timeout).await
325 }
326}
327
328/// The context surrounding the current `Actor`.
329///
330/// Provides a collection of methods that allow you to:
331/// - spawn other actors as children of the current actor
332/// - access the `Handle<_>` for the currrent actor
333/// - access this actor's props
334/// - clear this actor's mailbox
335pub struct Ctx<P>
336where
337 P: Actor,
338{
339 id: u64,
340 props: P::Props,
341 handle: Handle<P::Msg>,
342 msg_rx: Receiver<P::Msg>,
343 parent_proc_msg_tx: Option<Sender<ProcMsg>>,
344 proc_msg_rx: Receiver<ProcMsg>,
345 children_proc_msg_tx: HashMap<u64, Sender<ProcMsg>>,
346 supervision: Supervision,
347 total_children: u64,
348 tasks: JoinSet<Result<P::Msg, P::Err>>,
349 restarts: u64,
350 registry_key: Option<String>,
351 registry: Arc<RwLock<HashMap<String, Box<dyn Any + Send + Sync>>>>,
352 pubsub: Arc<RwLock<PubSub>>,
353 subscription_ids: Vec<(String, u64)>,
354}
355
356impl<P> Ctx<P>
357where
358 P: Actor,
359{
360 /// Returns a reference to this [`Actor`]'s props. Props are set once at spawn time
361 /// and remain immutable for the lifetime of the actor, including across restarts.
362 ///
363 /// # Example
364 /// ```ignore
365 /// async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
366 /// Ok(MyActor { count: ctx.props().initial_count })
367 /// }
368 /// ```
369 pub fn props(&self) -> &P::Props {
370 &self.props
371 }
372
373 /// Returns a [`Handle`] to the current [`Actor`], allowing it to send messages to itself
374 /// or pass its handle to child actors.
375 ///
376 /// # Example
377 /// ```ignore
378 /// // schedule a message to self
379 /// ctx.this().send_in(Msg::Tick, Duration::from_secs(1));
380 /// ```
381 pub fn this(&self) -> &Handle<P::Msg> {
382 &self.handle
383 }
384
385 /// Drains all pending messages from this [`Actor`]'s mailbox. Useful during
386 /// restarts to discard stale messages via [`Actor::init`].
387 ///
388 /// # Example
389 /// ```ignore
390 /// async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
391 /// ctx.clear_mailbox();
392 /// Ok(MyActor::default())
393 /// }
394 /// ```
395 pub fn clear_mailbox(&self) {
396 self.msg_rx.drain();
397 }
398
399 /// Creates a [`SpawnBuilder`] for spawning a child [`Actor`]. The actor type is passed
400 /// as a generic parameter and its props as the argument. The child is supervised
401 /// by the current actor and will be stopped when the parent stops.
402 ///
403 /// # Example
404 /// ```ignore
405 /// let handle = ctx.actor::<Worker>(WorkerProps { id: 1 })
406 /// .supervision(Supervision::Restart {
407 /// max: Limit::Amount(3),
408 /// backoff: Backoff::None,
409 /// })
410 /// .spawn();
411 /// ```
412 pub fn actor<'a, Child>(&'a mut self, props: Child::Props) -> SpawnBuilder<'a, P, Child>
413 where
414 Child: Actor,
415 {
416 SpawnBuilder::new(self, props)
417 }
418
419 /// Restarts all child actors immediately, bypassing their supervision strategy.
420 /// Each child will re-run its [`Actor::init`] with the same props.
421 ///
422 /// This is fire-and-forget: it does not wait for children to finish restarting.
423 pub fn restart_children(&self) {
424 for child in self.children_proc_msg_tx.values() {
425 let _ = child.send(ProcMsg::FromParent(ProcAction::Restart));
426 }
427 }
428
429 /// Stops all child actors and waits for each to fully terminate before returning.
430 pub async fn stop_children(&mut self) {
431 let mut acks = Vec::with_capacity(self.total_children as usize);
432 for child in self.children_proc_msg_tx.values() {
433 let (ack_tx, ack_rx) = flume::unbounded();
434 let _ = child.send(ProcMsg::FromParent(ProcAction::Stop(ack_tx)));
435 acks.push(ack_rx);
436 }
437
438 for ack in acks {
439 let _ = ack.recv_async().await;
440 }
441
442 self.total_children = 0;
443 self.children_proc_msg_tx.clear();
444 }
445
446 /// Spawns a background async task. On completion, its `Ok` value is delivered
447 /// as a message to this [`Actor`]; its `Err` triggers the supervision strategy
448 /// that this actor's parent has set for it.
449 ///
450 /// Tasks are aborted when the actor stops, but **survive restarts**. If the
451 /// actor is restarted (via supervision or [`Ctx::restart_children`]), in-flight
452 /// tasks from the previous incarnation will continue running and their results
453 /// will still be delivered to the restarted actor's `handle()`.
454 ///
455 /// # Example
456 /// ```ignore
457 /// ctx.task(async {
458 /// let data = reqwest::get("https://example.com").await?.text().await?;
459 /// Ok(Msg::Fetched(data))
460 /// });
461 /// ```
462 pub fn task<F>(&mut self, f: F)
463 where
464 F: Future<Output = Result<P::Msg, P::Err>> + Send + 'static,
465 {
466 self.tasks.spawn(f);
467 }
468
469 /// Looks up a registered [`Actor`]'s [`Handle`] by its type. The actor must have been
470 /// spawned with [`SpawnBuilder::spawn_registered`].
471 ///
472 /// # Example
473 /// ```ignore
474 /// let logger = ctx.get_handle_for::<Logger>()?;
475 /// logger.send(LogMsg::Info("hello".into()));
476 /// ```
477 pub fn get_handle_for<A: Actor>(&self) -> Result<Handle<A::Msg>, RegistryError> {
478 let key = std::any::type_name::<A>();
479 let reg = self.registry.read().map_err(|_| RegistryError::PoisonErr)?;
480 reg.get(key)
481 .and_then(|h| h.downcast_ref::<Handle<A::Msg>>())
482 .cloned()
483 .ok_or_else(|| RegistryError::NotFound(key.to_string()))
484 }
485
486 /// Looks up a registered [`Actor`]'s [`Handle`] by name. The actor must have been
487 /// spawned with [`SpawnBuilder::spawn_named`].
488 ///
489 /// # Example
490 /// ```ignore
491 /// let worker = ctx.get_handle::<WorkerMsg>("worker-1")?;
492 /// worker.send(WorkerMsg::Start);
493 /// ```
494 pub fn get_handle<Msg: Send + 'static>(
495 &self,
496 name: &str,
497 ) -> Result<Handle<Msg>, RegistryError> {
498 let reg = self.registry.read().map_err(|_| RegistryError::PoisonErr)?;
499 reg.get(name)
500 .and_then(|h| h.downcast_ref::<Handle<Msg>>())
501 .cloned()
502 .ok_or_else(|| RegistryError::NotFound(name.to_string()))
503 }
504
505 /// Sends a message to a registered [`Actor`] looked up by type.
506 ///
507 /// # Example
508 /// ```ignore
509 /// // Assuming MetricsCollector was spawned with spawn_registered():
510 /// // ctx.actor::<MetricsCollector>(props).spawn_registered()?;
511 ///
512 /// // Any actor in the system can then send to it by type:
513 /// ctx.send::<MetricsCollector>(MetricsMsg::RecordLatency(42))?;
514 /// ```
515 pub fn send<A: Actor>(&self, msg: impl Into<A::Msg>) -> Result<(), RegistryError> {
516 let key = std::any::type_name::<A>();
517 let reg = self.registry.read().map_err(|_| RegistryError::PoisonErr)?;
518 match reg
519 .get(key)
520 .and_then(|h| h.downcast_ref::<Handle<A::Msg>>())
521 {
522 Some(handle) => {
523 handle.send(msg);
524 Ok(())
525 }
526 None => Err(RegistryError::NotFound(key.to_string())),
527 }
528 }
529
530 /// Sends a message to a registered [`Actor`] looked up by name.
531 ///
532 /// # Example
533 /// ```ignore
534 /// // Assuming a Worker was spawned with spawn_named():
535 /// // ctx.actor::<Worker>(props).spawn_named("worker-1")?;
536 ///
537 /// // Any actor in the system can then send to it by name:
538 /// ctx.send_to("worker-1", WorkerMsg::Start)?;
539 /// ```
540 pub fn send_to<Msg: Send + 'static>(
541 &self,
542 name: &str,
543 msg: impl Into<Msg>,
544 ) -> Result<(), RegistryError> {
545 let reg = self.registry.read().map_err(|_| RegistryError::PoisonErr)?;
546 match reg.get(name).and_then(|h| h.downcast_ref::<Handle<Msg>>()) {
547 Some(handle) => {
548 handle.send(msg);
549 Ok(())
550 }
551 None => Err(RegistryError::NotFound(name.to_string())),
552 }
553 }
554}
555
556#[allow(clippy::enum_variant_names)]
557#[derive(Debug)]
558enum ProcMsg {
559 /// Sent from child once it terminates
560 ChildTerminated {
561 child_id: u64,
562 },
563 FromParent(ProcAction),
564 FromHandle(ProcAction),
565}
566
567#[derive(Debug)]
568enum ProcAction {
569 Restart,
570 Stop(Sender<()>),
571}
572
573fn spawn<Child, W>(mut ctx: Ctx<Child>, delay: Option<Duration>, watch: W)
574where
575 Child: Actor,
576 W: OnErrTerminate<Child::Err>,
577{
578 tokio::spawn(async move {
579 if let Some(d) = delay.filter(|d| !d.is_zero()) {
580 time::sleep(d).await;
581 }
582
583 // restart is Some whenever we should restart
584 let mut restart = Restart::No;
585 let mut exit_reason = None;
586 let mut actor_created = None;
587 let mut stop_ack_tx = None;
588
589 match Child::init(&mut ctx).await {
590 Err(e) => {
591 exit_reason = Some(ExitReason::Err(e));
592 restart = Restart::from_supervision(ctx.supervision, ctx.restarts);
593 }
594
595 Ok(mut actor) => match actor.sources(&ctx).await {
596 Err(e) => {
597 exit_reason = Some(ExitReason::Err(e));
598 restart = Restart::from_supervision(ctx.supervision, ctx.restarts);
599 actor_created = Some(actor);
600 }
601
602 Ok(mut sources) => {
603 macro_rules! on_err {
604 ($e:expr) => {
605 if let Supervision::Resume = ctx.supervision {
606 continue;
607 }
608
609 restart = Restart::from_supervision(ctx.supervision, ctx.restarts);
610 exit_reason = Some(ExitReason::Err($e));
611 actor_created = Some(actor);
612 break;
613 };
614 }
615
616 loop {
617 tokio::select! {
618 biased;
619
620 proc_msg = ctx.proc_msg_rx.recv_async() => {
621 match proc_msg {
622 Err(_) => break,
623
624 Ok(ProcMsg::FromHandle(ProcAction::Stop(tx)) ) => {
625 exit_reason = Some(ExitReason::Handle);
626 stop_ack_tx = Some(tx);
627 break
628 },
629
630 Ok(ProcMsg::FromParent(ProcAction::Stop(tx))) => {
631 exit_reason = exit_reason.or(Some(ExitReason::Parent));
632 stop_ack_tx = Some(tx);
633 break
634 },
635
636 Ok(ProcMsg::FromParent(ProcAction::Restart)) => {
637 exit_reason = exit_reason.or(Some(ExitReason::Parent));
638 restart = Restart::In(Duration::ZERO);
639 break;
640 }
641
642
643 Ok(ProcMsg::FromHandle(ProcAction::Restart)) => {
644 exit_reason = exit_reason.or(Some(ExitReason::Handle));
645 restart = Restart::In(Duration::ZERO);
646 break;
647 }
648
649 Ok(ProcMsg::ChildTerminated { child_id, }) => {
650 if ctx.children_proc_msg_tx.remove(&child_id).is_some() {
651 ctx.total_children -= 1;
652 }
653 }
654 }
655 }
656
657 recvd = ctx.msg_rx.recv_async() => {
658 match recvd {
659 Err(_) => break,
660
661 Ok(msg) => {
662 if let Err(e) = actor.handle(msg, &mut ctx).await {
663 on_err!(e);
664 };
665 }
666 }
667 }
668
669 Some(Ok(msg)) = ctx.tasks.join_next() => {
670 match msg {
671 Err(e) => {
672 on_err!(e);
673 }
674
675 Ok(msg) => {
676 if let Err(e) = actor.handle(msg, &mut ctx).await {
677 on_err!(e);
678 };
679 }
680 }
681
682 }
683
684 Some(msg) = std::future::poll_fn(|cx| Pin::new(&mut sources).poll_next(cx)) => {
685 if let Err(e) = actor.handle(msg, &mut ctx).await {
686 on_err!(e);
687 };
688 }
689 }
690 }
691 }
692 },
693 }
694
695 ctx.stop_children().await;
696 let exit_reason = exit_reason.unwrap_or(ExitReason::Handle);
697
698 if let ExitReason::Err(_) = &exit_reason {
699 ctx.restarts += 1;
700 }
701
702 if let (Restart::No, ExitReason::Err(ref e)) = (&restart, &exit_reason) {
703 watch.on_err_terminate(e);
704 }
705
706 Child::exit(actor_created, exit_reason, &mut ctx).await;
707
708 // Clean up pub/sub subscriptions (runs on both stop and restart)
709 if !ctx.subscription_ids.is_empty() {
710 if let Ok(mut bus) = ctx.pubsub.write() {
711 for (topic, sub_id) in ctx.subscription_ids.drain(..) {
712 if let Some(entry) = bus.topics.get_mut(&topic) {
713 entry.subscribers.retain(|s| s.id != sub_id);
714 if entry.subscribers.is_empty() {
715 bus.topics.remove(&topic);
716 }
717 }
718 }
719 }
720 }
721
722 let _ = stop_ack_tx.map(|tx| tx.send(()));
723
724 if let Restart::In(duration) = restart {
725 spawn::<Child, W>(ctx, Some(duration), watch)
726 } else if let Some(parent_tx) = ctx.parent_proc_msg_tx {
727 if let Some(key) = ctx.registry_key.take() {
728 if let Ok(mut reg) = ctx.registry.write() {
729 reg.remove(&key);
730 }
731 }
732
733 let _ = parent_tx.send(ProcMsg::ChildTerminated { child_id: ctx.id });
734 }
735 });
736}
737
738/// Defines how a parent reacts when a child actor fails.
739///
740/// # Example
741/// ```ignore
742/// let supervision = Supervision::Restart {
743/// max: Limit::Amount(5),
744/// backoff: Backoff::Static(Duration::from_secs(1)),
745/// };
746/// ```
747#[derive(Debug, Clone, Copy)]
748pub enum Supervision {
749 /// Actor terminates on error.
750 Stop,
751 /// Actor continues processing the next message after an error.
752 Resume,
753 /// Actor is restarted on error, up to `max` times with optional `backoff`.
754 Restart { max: Limit, backoff: Backoff },
755}
756
/// How long to wait before each restart attempt.
///
/// # Example
/// ```ignore
/// let backoff = Backoff::Incremental {
///     min: Duration::from_millis(100),
///     max: Duration::from_secs(5),
///     step: Duration::from_millis(500),
/// };
/// ```
#[derive(Clone, Copy, Debug)]
pub enum Backoff {
    /// No delay: restart right away.
    None,
    /// The same fixed delay before every restart.
    Static(Duration),
    /// A delay that grows by `step` per restart, clamped to the `min..=max` range.
    Incremental {
        min: Duration,
        max: Duration,
        step: Duration,
    },
}
780
/// Upper bound on how many times a supervised actor may be restarted.
///
/// # Example
/// ```ignore
/// let limit = Limit::Amount(3);
/// ```
#[derive(Clone, Copy, Debug)]
pub enum Limit {
    /// Restarts are unlimited.
    None,
    /// Restart at most this many times.
    Amount(u64),
}

/// **Note**: `0` maps to [`Limit::None`] (unlimited), not zero restarts.
/// If you want zero restarts (i.e., never restart), use [`Supervision::Stop`] instead.
impl From<u64> for Limit {
    fn from(value: u64) -> Self {
        if value == 0 {
            Limit::None
        } else {
            Limit::Amount(value)
        }
    }
}

/// `Limit::Amount(n)` equals exactly `n`; `Limit::None` equals no number at all.
impl PartialEq<u64> for Limit {
    fn eq(&self, other: &u64) -> bool {
        matches!(self, Limit::Amount(n) if n == other)
    }
}
814
/// Errors produced by the actor registry used by `spawn_named` / `spawn_registered`
/// and the `Ctx` lookup helpers.
#[derive(Clone, Debug)]
pub enum RegistryError {
    /// An actor is already registered under this name.
    NameTaken(String),
    /// Nothing is registered under this name with the requested handle type.
    NotFound(String),
    /// The registry lock was poisoned.
    PoisonErr,
}

impl std::fmt::Display for RegistryError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NameTaken(name) => write!(f, "registry name already taken: {name}"),
            Self::NotFound(name) => write!(f, "no actor registered under: {name}"),
            Self::PoisonErr => f.write_str("registry lock poisoned"),
        }
    }
}

impl std::error::Error for RegistryError {}
833
834/// Builder for configuring and spawning a child [`Actor`]. Created via [`Ctx::actor`].
835pub struct SpawnBuilder<'a, Parent, Child, W = NoWatch>
836where
837 Parent: Actor,
838 Child: Actor,
839{
840 ctx: &'a mut Ctx<Parent>,
841 props: Child::Props,
842 supervision: Supervision,
843 /// Only kicks in if child is stopped or reaches maximum number of restarts.
844 watch: W,
845 registry_key: Option<String>,
846}
847
848impl<'a, Parent, Child> SpawnBuilder<'a, Parent, Child, NoWatch>
849where
850 Parent: Actor,
851 Child: Actor,
852{
853 fn new(ctx: &'a mut Ctx<Parent>, props: Child::Props) -> Self {
854 Self {
855 ctx,
856 props,
857 supervision: Supervision::Restart {
858 max: Limit::None,
859 backoff: Backoff::None,
860 },
861 watch: NoWatch,
862 registry_key: None,
863 }
864 }
865}
866
867impl<'a, Parent, Child, W> SpawnBuilder<'a, Parent, Child, W>
868where
869 Parent: Actor,
870 Child: Actor,
871 W: OnErrTerminate<Child::Err>,
872{
873 /// Sets the [`Supervision`] strategy the parent will use for this child.
874 /// Defaults to [`Supervision::Restart`] with unlimited restarts and no backoff.
875 ///
876 /// # Example
877 /// ```ignore
878 /// ctx.actor::<Worker>(props)
879 /// .supervision(Supervision::Restart {
880 /// max: Limit::Amount(3),
881 /// backoff: Backoff::Static(Duration::from_secs(1)),
882 /// })
883 /// .spawn();
884 /// ```
885 pub fn supervision(mut self, supervision: Supervision) -> Self {
886 self.supervision = supervision;
887 self
888 }
889
890 /// Registers a callback that fires when the child terminates due to an error.
891 /// This happens when the supervision strategy is [`Supervision::Stop`], or when
892 /// [`Supervision::Restart`] has exhausted all allowed restarts. The callback maps
893 /// the child's error into a message for the parent.
894 ///
895 /// # Example
896 /// ```ignore
897 /// ctx.actor::<Worker>(props)
898 /// .supervision(Supervision::Restart {
899 /// max: Limit::Amount(3),
900 /// backoff: Backoff::None,
901 /// })
902 /// .watch(|err| ParentMsg::WorkerDied(format!("{err:?}")))
903 /// .spawn();
904 /// ```
905 pub fn watch<F>(self, f: F) -> SpawnBuilder<'a, Parent, Child, WatchFn<F, Parent::Msg>>
906 where
907 F: Fn(&Child::Err) -> Parent::Msg + Send + 'static,
908 {
909 let parent_msg_tx = self.ctx.handle.msg_tx.clone();
910 SpawnBuilder {
911 ctx: self.ctx,
912 props: self.props,
913 supervision: self.supervision,
914 watch: WatchFn { f, parent_msg_tx },
915 registry_key: self.registry_key,
916 }
917 }
918
919 /// Spawns the child [`Actor`] and returns a [`Handle`] to it.
920 pub fn spawn(self) -> Handle<Child::Msg> {
921 let (msg_tx, msg_rx) = flume::unbounded(); // child
922 let (proc_msg_tx, proc_msg_rx) = flume::unbounded(); // child
923
924 let handle = Handle {
925 msg_tx,
926 proc_msg_tx,
927 };
928
929 self.ctx.total_children += 1;
930 let id = self.ctx.total_children;
931
932 let ctx: Ctx<Child> = Ctx {
933 id,
934 props: self.props,
935 handle: handle.clone(),
936 msg_rx,
937 parent_proc_msg_tx: Some(self.ctx.handle.proc_msg_tx.clone()),
938 proc_msg_rx,
939 children_proc_msg_tx: HashMap::new(),
940 total_children: 0,
941 supervision: self.supervision,
942 restarts: 0,
943 tasks: JoinSet::new(),
944 registry_key: self.registry_key,
945 registry: self.ctx.registry.clone(),
946 pubsub: self.ctx.pubsub.clone(),
947 subscription_ids: Vec::new(),
948 };
949
950 spawn::<Child, W>(ctx, None, self.watch);
951
952 self.ctx
953 .children_proc_msg_tx
954 .insert(self.ctx.total_children, handle.proc_msg_tx.clone());
955
956 handle
957 }
958
959 /// Spawns the child and registers it in the global registry under its type name.
960 /// Other actors can then look it up via [`Ctx::get_handle_for`] or [`Ctx::send`].
961 /// Returns [`RegistryError::NameTaken`] if already registered.
962 pub fn spawn_registered(self) -> Result<Handle<Child::Msg>, RegistryError> {
963 let key = std::any::type_name::<Child>();
964 self.spawn_named(key)
965 }
966
967 /// Spawns the child and registers it in the global registry under the given name.
968 /// Other actors can then look it up via [`Ctx::get_handle`] or [`Ctx::send_to`].
969 /// Returns [`RegistryError::NameTaken`] if the name is already taken.
970 ///
971 /// # Example
972 /// ```ignore
973 /// let h = ctx.actor::<Worker>(props).spawn_named("worker-1")?;
974 /// ```
975 pub fn spawn_named(
976 mut self,
977 name: impl Into<String>,
978 ) -> Result<Handle<Child::Msg>, RegistryError> {
979 let name = name.into();
980 let registry = self.ctx.registry.clone();
981 let mut reg = registry.write().map_err(|_| RegistryError::PoisonErr)?;
982
983 if reg.contains_key(&name) {
984 return Err(RegistryError::NameTaken(name.clone()));
985 }
986
987 self.registry_key = Some(name.clone());
988 let handle = self.spawn();
989 reg.insert(name, Box::new(handle.clone()));
990
991 Ok(handle)
992 }
993}
994
995#[derive(Debug)]
996enum Restart {
997 No,
998 In(Duration),
999}
1000
1001impl Restart {
1002 fn from_supervision(supervision: Supervision, current_restarts: u64) -> Self {
1003 match supervision {
1004 Supervision::Stop => Restart::No,
1005 Supervision::Resume => Restart::No,
1006 Supervision::Restart { max, .. } if max == current_restarts + 1 => Restart::No,
1007 Supervision::Restart { backoff, .. } => {
1008 let wait = match backoff {
1009 Backoff::None => Duration::ZERO,
1010 Backoff::Static(duration) => duration,
1011 Backoff::Incremental { min, max, step } => {
1012 let wait = step.mul_f64((current_restarts + 1) as f64);
1013 let wait = cmp::min(max, wait);
1014 cmp::max(min, wait)
1015 }
1016 };
1017
1018 Restart::In(wait)
1019 }
1020 }
1021 }
1022}