Skip to main content

speare/
lib.rs

1use flume::{Receiver, Sender};
2use futures_core::Stream;
3use std::any::Any;
4use std::{
5    cmp,
6    collections::HashMap,
7    future::Future,
8    sync::{Arc, RwLock},
9    time::Duration,
10};
11use tokio::{
12    task::{self, JoinSet},
13    time,
14};
15
16mod exit;
17mod node;
18mod pubsub;
19mod req_res;
20mod streams;
21mod watch;
22
23pub use exit::*;
24pub use node::*;
25pub use pubsub::PubSubError;
26pub use req_res::*;
27pub use streams::{SourceSet, Sources};
28
29use crate::pubsub::PubSub;
30use crate::watch::{NoWatch, OnErrTerminate, WatchFn};
31
32/// A thin abstraction over tokio tasks and flume channels, allowing for easy message passing
33/// with a supervision tree to handle failures.
34///
35/// ## Example
36/// ```
37/// use speare::{Ctx, Actor};
38/// use derive_more::From;
39///
40/// struct Counter {
41///     count: u32,
42/// }
43///
44/// struct CounterProps {
45///     initial_count: u32,
46///     max_count: u32,
47/// }
48///
49/// #[derive(From)]
50/// enum CounterMsg {
51///     Inc(u32),
52/// }
53///
54/// enum CounterErr {
55///     MaxCountExceeded,
56/// }
57///
58/// impl Actor for Counter {
59///     type Props = CounterProps;
60///     type Msg = CounterMsg;
61///     type Err = CounterErr;
62///
63///     async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
64///         Ok(Counter {
65///             count: ctx.props().initial_count,
66///         })
67///     }
68///
69///     async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
70///         match msg {
71///             CounterMsg::Inc(x) => {
72///                 self.count += x;
73///
74///                 if self.count > ctx.props().max_count {
75///                     return Err(CounterErr::MaxCountExceeded);
76///                 }
77///             }
78///         }
79///
80///         Ok(())
81///     }
82/// }
83/// ```
84#[allow(unused_variables)]
85pub trait Actor: Sized + Send + 'static {
86    type Props: Send + 'static;
87    type Msg: Send + 'static;
88    type Err: Send + Sync + 'static;
89
90    /// Constructs the actor. Called on initial spawn and on every restart.
91    ///
92    /// # Example
93    /// ```ignore
94    /// async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
95    ///     Ok(MyActor { count: ctx.props().initial })
96    /// }
97    /// ```
98    fn init(ctx: &mut Ctx<Self>) -> impl Future<Output = Result<Self, Self::Err>> + Send;
99
100    /// Cleanup hook called when the actor stops, restarts, or fails to init.
101    /// `this` is `None` if init failed.
102    ///
103    /// # Example
104    /// ```ignore
105    /// async fn exit(this: Option<Self>, reason: ExitReason<Self>, ctx: &mut Ctx<Self>) {
106    ///     if let ExitReason::Err(e) = reason {
107    ///         eprintln!("actor failed: {e:?}");
108    ///     }
109    /// }
110    /// ```
111    fn exit(
112        this: Option<Self>,
113        reason: ExitReason<Self>,
114        ctx: &mut Ctx<Self>,
115    ) -> impl Future<Output = ()> + Send {
116        async {}
117    }
118
119    /// Sets up message sources (streams, intervals) after init.
120    ///
121    /// Sources added earlier in the [`SourceSet`] chain have higher polling priority.
122    /// If an earlier source is consistently ready, later sources may be starved.
123    ///
124    /// # Example
125    /// ```ignore
126    /// async fn sources(&self, ctx: &Ctx<Self>) -> Result<impl Sources<Self>, Self::Err> {
127    ///     Ok(SourceSet::new()
128    ///         .interval(time::interval(Duration::from_millis(100)), || Msg::Tick)
129    ///         .stream(my_stream))
130    /// }
131    /// ```
132    fn sources(
133        &self,
134        ctx: &Ctx<Self>,
135    ) -> impl Future<Output = Result<impl Sources<Self>, Self::Err>> + Send {
136        async { Ok(SourceSet::new()) }
137    }
138
139    /// Called everytime your [`Actor`] receives a message.
140    ///
141    /// # Example
142    /// ```ignore
143    /// async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
144    ///     match msg {
145    ///         Msg::Inc(n) => self.count += n,
146    ///     }
147    ///
148    ///     Ok(())
149    /// }
150    /// ```
151    fn handle(
152        &mut self,
153        msg: Self::Msg,
154        ctx: &mut Ctx<Self>,
155    ) -> impl Future<Output = Result<(), Self::Err>> + Send {
156        async { Ok(()) }
157    }
158}
159
160/// A handle to send messages to or stop an [`Actor`].
161pub struct Handle<Msg> {
162    msg_tx: Sender<Msg>,
163    proc_msg_tx: Sender<ProcMsg>,
164}
165
166impl<Msg> Clone for Handle<Msg> {
167    fn clone(&self) -> Self {
168        Self {
169            msg_tx: self.msg_tx.clone(),
170            proc_msg_tx: self.proc_msg_tx.clone(),
171        }
172    }
173}
174
175impl<Msg> Handle<Msg> {
176    /// Stops the [`Actor`] associated with this handle. Does not wait for the actor to finish.
177    ///
178    /// # Example
179    /// ```ignore
180    /// handle.stop();
181    /// ```
182    pub fn stop(&self) {
183        let (tx, _) = flume::unbounded();
184        let _ = self
185            .proc_msg_tx
186            .send(ProcMsg::FromHandle(ProcAction::Stop(tx)));
187    }
188
189    /// Restarts the [`Actor`] by re-running [`Actor::init`] and [`Actor::sources`]. Does not wait for the actor to finish.
190    ///
191    /// # Example
192    /// ```ignore
193    /// handle.restart();
194    /// ```
195    pub fn restart(&self) {
196        let _ = self
197            .proc_msg_tx
198            .send(ProcMsg::FromHandle(ProcAction::Restart));
199    }
200
201    /// Returns `true` if the [`Actor`] is still running.
202    ///
203    /// # Example
204    /// ```ignore
205    /// if handle.is_alive() {
206    ///     handle.send(Msg::Ping);
207    /// }
208    /// ```
209    pub fn is_alive(&self) -> bool {
210        !self.msg_tx.is_disconnected()
211    }
212
213    /// Sends a message to the [`Actor`], returning `true` if the message was delivered
214    /// or `false` if the actor is no longer running.
215    /// Takes advantage of `From<_>` implementations on the message type.
216    ///
217    /// # Example
218    /// ```ignore
219    /// // Given `#[derive(From)] enum Msg { Inc(u32) }`:
220    /// handle.send(Msg::Inc(1));
221    /// handle.send(1u32); // works via From<u32>
222    /// ```
223    pub fn send<M: Into<Msg>>(&self, msg: M) -> bool {
224        self.msg_tx.send(msg.into()).is_ok()
225    }
226
227    /// Sends a message to the [`Actor`] after the given duration, failing silently if it is no longer running.
228    ///
229    /// # Example
230    /// ```ignore
231    /// handle.send_in(Msg::Timeout, Duration::from_secs(5));
232    /// ```
233    pub fn send_in<M>(&self, msg: M, duration: Duration)
234    where
235        Msg: 'static + Send,
236        M: 'static + Send + Into<Msg>,
237    {
238        let msg_tx = self.msg_tx.clone();
239
240        task::spawn(async move {
241            time::sleep(duration).await;
242            let _ = msg_tx.send(msg.into());
243        });
244    }
245
246    /// Sends a request and awaits a response. Requires `Msg: From<Request<Req, Res>>`.
247    ///
248    /// # Example
249    /// ```ignore
250    /// #[derive(From)]
251    /// enum Msg {
252    ///     GetCount(Request<(), u32>),
253    /// }
254    ///
255    /// // sender side:
256    /// let count: u32 = handle.req(()).await?;
257    ///
258    /// // receiver side, inside handle():
259    /// Msg::GetCount(req) => req.reply(self.count),
260    /// ```
261    pub async fn req<Req, Res>(&self, req: Req) -> Result<Res, ReqErr>
262    where
263        Msg: From<Request<Req, Res>>,
264    {
265        let (req, res) = req_res(req);
266        self.send(req);
267        res.recv().await
268    }
269
270    /// Like [`Handle::req`], but uses a wrapper function to convert the [`Request`] into the message type.
271    /// Useful when the message variant can't implement `From<Request<Req, Res>>`.
272    ///
273    /// # Example
274    /// ```ignore
275    /// enum Msg {
276    ///     GetCount(Request<(), u32>),
277    /// }
278    ///
279    /// let count: u32 = handle.reqw(Msg::GetCount, ()).await?;
280    /// ```
281    pub async fn reqw<F, Req, Res>(&self, to_req: F, req: Req) -> Result<Res, ReqErr>
282    where
283        F: Fn(Request<Req, Res>) -> Msg,
284    {
285        let (req, res) = req_res(req);
286        let msg = to_req(req);
287        self.send(msg);
288        res.recv().await
289    }
290
291    /// Like [`Handle::req`], but fails with [`ReqErr::Timeout`] if no response within the given [`Duration`].
292    ///
293    /// # Example
294    /// ```ignore
295    /// let count: u32 = handle.req_timeout((), Duration::from_secs(1)).await?;
296    /// ```
297    pub async fn req_timeout<Req, Res>(&self, req: Req, timeout: Duration) -> Result<Res, ReqErr>
298    where
299        Msg: From<Request<Req, Res>>,
300    {
301        let (req, res) = req_res(req);
302        self.send(req);
303        res.recv_timeout(timeout).await
304    }
305
306    /// Like [`Handle::reqw`], but fails with [`ReqErr::Timeout`] if no response within the given [`Duration`].
307    ///
308    /// # Example
309    /// ```ignore
310    /// let count: u32 = handle.reqw_timeout(Msg::GetCount, (), Duration::from_secs(1)).await?;
311    /// ```
312    pub async fn reqw_timeout<F, Req, Res>(
313        &self,
314        to_req: F,
315        req: Req,
316        timeout: Duration,
317    ) -> Result<Res, ReqErr>
318    where
319        F: Fn(Request<Req, Res>) -> Msg,
320    {
321        let (req, res) = req_res(req);
322        let msg = to_req(req);
323        self.send(msg);
324        res.recv_timeout(timeout).await
325    }
326}
327
328/// The context surrounding the current `Actor`.
329///
330/// Provides a collection of methods that allow you to:
331/// - spawn other actors as children of the current actor
332/// - access the `Handle<_>` for the currrent actor
333/// - access this actor's props
334/// - clear this actor's mailbox
335pub struct Ctx<P>
336where
337    P: Actor,
338{
339    id: u64,
340    props: P::Props,
341    handle: Handle<P::Msg>,
342    msg_rx: Receiver<P::Msg>,
343    parent_proc_msg_tx: Option<Sender<ProcMsg>>,
344    proc_msg_rx: Receiver<ProcMsg>,
345    children_proc_msg_tx: HashMap<u64, Sender<ProcMsg>>,
346    supervision: Supervision,
347    total_children: u64,
348    tasks: JoinSet<Result<P::Msg, P::Err>>,
349    restarts: u64,
350    registry_key: Option<String>,
351    registry: Arc<RwLock<HashMap<String, Box<dyn Any + Send + Sync>>>>,
352    pubsub: Arc<RwLock<PubSub>>,
353    subscription_ids: Vec<(String, u64)>,
354}
355
356impl<P> Ctx<P>
357where
358    P: Actor,
359{
360    /// Returns a reference to this [`Actor`]'s props. Props are set once at spawn time
361    /// and remain immutable for the lifetime of the actor, including across restarts.
362    ///
363    /// # Example
364    /// ```ignore
365    /// async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
366    ///     Ok(MyActor { count: ctx.props().initial_count })
367    /// }
368    /// ```
369    pub fn props(&self) -> &P::Props {
370        &self.props
371    }
372
373    /// Returns a [`Handle`] to the current [`Actor`], allowing it to send messages to itself
374    /// or pass its handle to child actors.
375    ///
376    /// # Example
377    /// ```ignore
378    /// // schedule a message to self
379    /// ctx.this().send_in(Msg::Tick, Duration::from_secs(1));
380    /// ```
381    pub fn this(&self) -> &Handle<P::Msg> {
382        &self.handle
383    }
384
385    /// Drains all pending messages from this [`Actor`]'s mailbox. Useful during
386    /// restarts to discard stale messages via [`Actor::init`].
387    ///
388    /// # Example
389    /// ```ignore
390    /// async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
391    ///     ctx.clear_mailbox();
392    ///     Ok(MyActor::default())
393    /// }
394    /// ```
395    pub fn clear_mailbox(&self) {
396        self.msg_rx.drain();
397    }
398
399    /// Creates a [`SpawnBuilder`] for spawning a child [`Actor`]. The actor type is passed
400    /// as a generic parameter and its props as the argument. The child is supervised
401    /// by the current actor and will be stopped when the parent stops.
402    ///
403    /// # Example
404    /// ```ignore
405    /// let handle = ctx.actor::<Worker>(WorkerProps { id: 1 })
406    ///     .supervision(Supervision::Restart {
407    ///         max: Limit::Amount(3),
408    ///         backoff: Backoff::None,
409    ///     })
410    ///     .spawn();
411    /// ```
412    pub fn actor<'a, Child>(&'a mut self, props: Child::Props) -> SpawnBuilder<'a, P, Child>
413    where
414        Child: Actor,
415    {
416        SpawnBuilder::new(self, props)
417    }
418
419    /// Restarts all child actors immediately, bypassing their supervision strategy.
420    /// Each child will re-run its [`Actor::init`] with the same props.
421    ///
422    /// This is fire-and-forget: it does not wait for children to finish restarting.
423    pub fn restart_children(&self) {
424        for child in self.children_proc_msg_tx.values() {
425            let _ = child.send(ProcMsg::FromParent(ProcAction::Restart));
426        }
427    }
428
429    /// Stops all child actors and waits for each to fully terminate before returning.
430    pub async fn stop_children(&mut self) {
431        let mut acks = Vec::with_capacity(self.total_children as usize);
432        for child in self.children_proc_msg_tx.values() {
433            let (ack_tx, ack_rx) = flume::unbounded();
434            let _ = child.send(ProcMsg::FromParent(ProcAction::Stop(ack_tx)));
435            acks.push(ack_rx);
436        }
437
438        for ack in acks {
439            let _ = ack.recv_async().await;
440        }
441
442        self.total_children = 0;
443        self.children_proc_msg_tx.clear();
444    }
445
446    /// Spawns a background async task. On completion, its `Ok` value is delivered
447    /// as a message to this [`Actor`]; its `Err` triggers the supervision strategy
448    /// that this actor's parent has set for it.
449    ///
450    /// Tasks are aborted when the actor stops, but **survive restarts**. If the
451    /// actor is restarted (via supervision or [`Ctx::restart_children`]), in-flight
452    /// tasks from the previous incarnation will continue running and their results
453    /// will still be delivered to the restarted actor's `handle()`.
454    ///
455    /// # Example
456    /// ```ignore
457    /// ctx.task(async {
458    ///     let data = reqwest::get("https://example.com").await?.text().await?;
459    ///     Ok(Msg::Fetched(data))
460    /// });
461    /// ```
462    pub fn task<F>(&mut self, f: F)
463    where
464        F: Future<Output = Result<P::Msg, P::Err>> + Send + 'static,
465    {
466        self.tasks.spawn(f);
467    }
468
469    /// Looks up a registered [`Actor`]'s [`Handle`] by its type. The actor must have been
470    /// spawned with [`SpawnBuilder::spawn_registered`].
471    ///
472    /// # Example
473    /// ```ignore
474    /// let logger = ctx.get_handle_for::<Logger>()?;
475    /// logger.send(LogMsg::Info("hello".into()));
476    /// ```
477    pub fn get_handle_for<A: Actor>(&self) -> Result<Handle<A::Msg>, RegistryError> {
478        let key = std::any::type_name::<A>();
479        let reg = self.registry.read().map_err(|_| RegistryError::PoisonErr)?;
480        reg.get(key)
481            .and_then(|h| h.downcast_ref::<Handle<A::Msg>>())
482            .cloned()
483            .ok_or_else(|| RegistryError::NotFound(key.to_string()))
484    }
485
486    /// Looks up a registered [`Actor`]'s [`Handle`] by name. The actor must have been
487    /// spawned with [`SpawnBuilder::spawn_named`].
488    ///
489    /// # Example
490    /// ```ignore
491    /// let worker = ctx.get_handle::<WorkerMsg>("worker-1")?;
492    /// worker.send(WorkerMsg::Start);
493    /// ```
494    pub fn get_handle<Msg: Send + 'static>(
495        &self,
496        name: &str,
497    ) -> Result<Handle<Msg>, RegistryError> {
498        let reg = self.registry.read().map_err(|_| RegistryError::PoisonErr)?;
499        reg.get(name)
500            .and_then(|h| h.downcast_ref::<Handle<Msg>>())
501            .cloned()
502            .ok_or_else(|| RegistryError::NotFound(name.to_string()))
503    }
504
505    /// Sends a message to a registered [`Actor`] looked up by type.
506    ///
507    /// # Example
508    /// ```ignore
509    /// // Assuming MetricsCollector was spawned with spawn_registered():
510    /// // ctx.actor::<MetricsCollector>(props).spawn_registered()?;
511    ///
512    /// // Any actor in the system can then send to it by type:
513    /// ctx.send::<MetricsCollector>(MetricsMsg::RecordLatency(42))?;
514    /// ```
515    pub fn send<A: Actor>(&self, msg: impl Into<A::Msg>) -> Result<(), RegistryError> {
516        let key = std::any::type_name::<A>();
517        let reg = self.registry.read().map_err(|_| RegistryError::PoisonErr)?;
518        match reg
519            .get(key)
520            .and_then(|h| h.downcast_ref::<Handle<A::Msg>>())
521        {
522            Some(handle) => {
523                handle.send(msg);
524                Ok(())
525            }
526            None => Err(RegistryError::NotFound(key.to_string())),
527        }
528    }
529
530    /// Sends a message to a registered [`Actor`] looked up by name.
531    ///
532    /// # Example
533    /// ```ignore
534    /// // Assuming a Worker was spawned with spawn_named():
535    /// // ctx.actor::<Worker>(props).spawn_named("worker-1")?;
536    ///
537    /// // Any actor in the system can then send to it by name:
538    /// ctx.send_to("worker-1", WorkerMsg::Start)?;
539    /// ```
540    pub fn send_to<Msg: Send + 'static>(
541        &self,
542        name: &str,
543        msg: impl Into<Msg>,
544    ) -> Result<(), RegistryError> {
545        let reg = self.registry.read().map_err(|_| RegistryError::PoisonErr)?;
546        match reg.get(name).and_then(|h| h.downcast_ref::<Handle<Msg>>()) {
547            Some(handle) => {
548                handle.send(msg);
549                Ok(())
550            }
551            None => Err(RegistryError::NotFound(name.to_string())),
552        }
553    }
554}
555
556#[allow(clippy::enum_variant_names)]
557#[derive(Debug)]
558enum ProcMsg {
559    /// Sent from child once it terminates
560    ChildTerminated {
561        child_id: u64,
562    },
563    FromParent(ProcAction),
564    FromHandle(ProcAction),
565}
566
567#[derive(Debug)]
568enum ProcAction {
569    Restart,
570    Stop(Sender<()>),
571}
572
573fn spawn<Child, W>(mut ctx: Ctx<Child>, delay: Option<Duration>, watch: W)
574where
575    Child: Actor,
576    W: OnErrTerminate<Child::Err>,
577{
578    tokio::spawn(async move {
579        if let Some(d) = delay.filter(|d| !d.is_zero()) {
580            time::sleep(d).await;
581        }
582
583        // restart is Some whenever we should restart
584        let mut restart = Restart::No;
585        let mut exit_reason = None;
586        let mut actor_created = None;
587        let mut stop_ack_tx = None;
588
589        match Child::init(&mut ctx).await {
590            Err(e) => {
591                exit_reason = Some(ExitReason::Err(e));
592                restart = Restart::from_supervision(ctx.supervision, ctx.restarts);
593            }
594
595            Ok(mut actor) => match actor.sources(&ctx).await {
596                Err(e) => {
597                    exit_reason = Some(ExitReason::Err(e));
598                    restart = Restart::from_supervision(ctx.supervision, ctx.restarts);
599                    actor_created = Some(actor);
600                }
601
602                Ok(mut sources) => {
603                    macro_rules! on_err {
604                        ($e:expr) => {
605                            if let Supervision::Resume = ctx.supervision {
606                                continue;
607                            }
608
609                            restart = Restart::from_supervision(ctx.supervision, ctx.restarts);
610                            exit_reason = Some(ExitReason::Err($e));
611                            actor_created = Some(actor);
612                            break;
613                        };
614                    }
615
616                    loop {
617                        tokio::select! {
618                            biased;
619
620                            proc_msg = ctx.proc_msg_rx.recv_async() => {
621                                match proc_msg {
622                                    Err(_) => break,
623
624                                    Ok(ProcMsg::FromHandle(ProcAction::Stop(tx)) ) => {
625                                        exit_reason = Some(ExitReason::Handle);
626                                        stop_ack_tx = Some(tx);
627                                        break
628                                    },
629
630                                    Ok(ProcMsg::FromParent(ProcAction::Stop(tx))) => {
631                                        exit_reason = exit_reason.or(Some(ExitReason::Parent));
632                                        stop_ack_tx = Some(tx);
633                                        break
634                                    },
635
636                                    Ok(ProcMsg::FromParent(ProcAction::Restart)) => {
637                                        exit_reason = exit_reason.or(Some(ExitReason::Parent));
638                                        restart = Restart::In(Duration::ZERO);
639                                        break;
640                                    }
641
642
643                                    Ok(ProcMsg::FromHandle(ProcAction::Restart)) => {
644                                        exit_reason = exit_reason.or(Some(ExitReason::Handle));
645                                        restart = Restart::In(Duration::ZERO);
646                                        break;
647                                    }
648
649                                    Ok(ProcMsg::ChildTerminated { child_id, }) => {
650                                        if ctx.children_proc_msg_tx.remove(&child_id).is_some() {
651                                            ctx.total_children -= 1;
652                                        }
653                                    }
654                                }
655                            }
656
657                            recvd = ctx.msg_rx.recv_async() => {
658                                match recvd {
659                                    Err(_) => break,
660
661                                    Ok(msg) => {
662                                        if let Err(e) = actor.handle(msg, &mut ctx).await {
663                                            on_err!(e);
664                                        };
665                                    }
666                                }
667                            }
668
669                            Some(Ok(msg)) = ctx.tasks.join_next() => {
670                                match msg {
671                                    Err(e) => {
672                                        on_err!(e);
673                                    }
674
675                                    Ok(msg) => {
676                                        if let Err(e) = actor.handle(msg, &mut ctx).await {
677                                            on_err!(e);
678                                        };
679                                    }
680                                }
681
682                            }
683
684                            Some(msg) = std::future::poll_fn(|cx| Pin::new(&mut sources).poll_next(cx)) => {
685                                if let Err(e) = actor.handle(msg, &mut ctx).await {
686                                    on_err!(e);
687                                };
688                            }
689                        }
690                    }
691                }
692            },
693        }
694
695        ctx.stop_children().await;
696        let exit_reason = exit_reason.unwrap_or(ExitReason::Handle);
697
698        if let ExitReason::Err(_) = &exit_reason {
699            ctx.restarts += 1;
700        }
701
702        if let (Restart::No, ExitReason::Err(ref e)) = (&restart, &exit_reason) {
703            watch.on_err_terminate(e);
704        }
705
706        Child::exit(actor_created, exit_reason, &mut ctx).await;
707
708        // Clean up pub/sub subscriptions (runs on both stop and restart)
709        if !ctx.subscription_ids.is_empty() {
710            if let Ok(mut bus) = ctx.pubsub.write() {
711                for (topic, sub_id) in ctx.subscription_ids.drain(..) {
712                    if let Some(entry) = bus.topics.get_mut(&topic) {
713                        entry.subscribers.retain(|s| s.id != sub_id);
714                        if entry.subscribers.is_empty() {
715                            bus.topics.remove(&topic);
716                        }
717                    }
718                }
719            }
720        }
721
722        let _ = stop_ack_tx.map(|tx| tx.send(()));
723
724        if let Restart::In(duration) = restart {
725            spawn::<Child, W>(ctx, Some(duration), watch)
726        } else if let Some(parent_tx) = ctx.parent_proc_msg_tx {
727            if let Some(key) = ctx.registry_key.take() {
728                if let Ok(mut reg) = ctx.registry.write() {
729                    reg.remove(&key);
730                }
731            }
732
733            let _ = parent_tx.send(ProcMsg::ChildTerminated { child_id: ctx.id });
734        }
735    });
736}
737
/// Defines how a parent reacts when a child actor fails.
///
/// # Example
/// ```ignore
/// let supervision = Supervision::Restart {
///     max: Limit::Amount(5),
///     backoff: Backoff::Static(Duration::from_secs(1)),
/// };
/// ```
#[derive(Debug, Clone, Copy)]
pub enum Supervision {
    /// The failing actor terminates.
    Stop,
    /// Errors raised while handling messages or task results are ignored, and the
    /// actor moves on to the next message. Errors from `init` or `sources` still
    /// terminate the actor.
    Resume,
    /// The actor is torn down and re-initialized, bounded by `max` and delayed by `backoff`.
    Restart { max: Limit, backoff: Backoff },
}

/// Delay strategy between restart attempts.
///
/// # Example
/// ```ignore
/// let backoff = Backoff::Incremental {
///     min: Duration::from_millis(100),
///     max: Duration::from_secs(5),
///     step: Duration::from_millis(500),
/// };
/// ```
#[derive(Debug, Clone, Copy)]
pub enum Backoff {
    /// No delay: restart right away.
    None,
    /// The same fixed delay before every restart.
    Static(Duration),
    /// A delay of `step` times the restart attempt number, kept within `[min, max]`.
    Incremental {
        min: Duration,
        max: Duration,
        step: Duration,
    },
}

/// Maximum number of restarts allowed.
///
/// # Example
/// ```ignore
/// let limit = Limit::Amount(3);
/// ```
#[derive(Debug, Clone, Copy)]
pub enum Limit {
    /// Restarts are unbounded.
    None,
    // NOTE(review): `Restart::from_supervision` treats a failure as terminal once
    // `restarts + 1` reaches this value, i.e. at most n-1 restarts; confirm intended.
    /// Restart at most this many times.
    Amount(u64),
}

/// **Note**: `0` maps to [`Limit::None`] (unlimited), not zero restarts.
/// If you want zero restarts (i.e., never restart), use [`Supervision::Stop`] instead.
impl From<u64> for Limit {
    fn from(value: u64) -> Self {
        if value == 0 {
            Limit::None
        } else {
            Limit::Amount(value)
        }
    }
}

/// `Limit::None` equals no number; `Limit::Amount(n)` equals exactly `n`.
impl PartialEq<u64> for Limit {
    fn eq(&self, other: &u64) -> bool {
        matches!(self, Limit::Amount(n) if n == other)
    }
}
814
/// Errors produced by the actor registry (lookup and registration of handles).
#[derive(Debug, Clone)]
pub enum RegistryError {
    /// Registration failed because the key is already in use.
    NameTaken(String),
    /// No handle with the requested message type is registered under this key.
    NotFound(String),
    /// The registry's lock was poisoned.
    PoisonErr,
}

impl std::fmt::Display for RegistryError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NameTaken(name) => write!(f, "registry name already taken: {name}"),
            Self::NotFound(name) => write!(f, "no actor registered under: {name}"),
            Self::PoisonErr => f.write_str("registry lock poisoned"),
        }
    }
}

impl std::error::Error for RegistryError {}
833
834/// Builder for configuring and spawning a child [`Actor`]. Created via [`Ctx::actor`].
835pub struct SpawnBuilder<'a, Parent, Child, W = NoWatch>
836where
837    Parent: Actor,
838    Child: Actor,
839{
840    ctx: &'a mut Ctx<Parent>,
841    props: Child::Props,
842    supervision: Supervision,
843    /// Only kicks in if child is stopped or reaches maximum number of restarts.
844    watch: W,
845    registry_key: Option<String>,
846}
847
848impl<'a, Parent, Child> SpawnBuilder<'a, Parent, Child, NoWatch>
849where
850    Parent: Actor,
851    Child: Actor,
852{
853    fn new(ctx: &'a mut Ctx<Parent>, props: Child::Props) -> Self {
854        Self {
855            ctx,
856            props,
857            supervision: Supervision::Restart {
858                max: Limit::None,
859                backoff: Backoff::None,
860            },
861            watch: NoWatch,
862            registry_key: None,
863        }
864    }
865}
866
867impl<'a, Parent, Child, W> SpawnBuilder<'a, Parent, Child, W>
868where
869    Parent: Actor,
870    Child: Actor,
871    W: OnErrTerminate<Child::Err>,
872{
873    /// Sets the [`Supervision`] strategy the parent will use for this child.
874    /// Defaults to [`Supervision::Restart`] with unlimited restarts and no backoff.
875    ///
876    /// # Example
877    /// ```ignore
878    /// ctx.actor::<Worker>(props)
879    ///     .supervision(Supervision::Restart {
880    ///         max: Limit::Amount(3),
881    ///         backoff: Backoff::Static(Duration::from_secs(1)),
882    ///     })
883    ///     .spawn();
884    /// ```
885    pub fn supervision(mut self, supervision: Supervision) -> Self {
886        self.supervision = supervision;
887        self
888    }
889
890    /// Registers a callback that fires when the child terminates due to an error.
891    /// This happens when the supervision strategy is [`Supervision::Stop`], or when
892    /// [`Supervision::Restart`] has exhausted all allowed restarts. The callback maps
893    /// the child's error into a message for the parent.
894    ///
895    /// # Example
896    /// ```ignore
897    /// ctx.actor::<Worker>(props)
898    ///     .supervision(Supervision::Restart {
899    ///         max: Limit::Amount(3),
900    ///         backoff: Backoff::None,
901    ///     })
902    ///     .watch(|err| ParentMsg::WorkerDied(format!("{err:?}")))
903    ///     .spawn();
904    /// ```
905    pub fn watch<F>(self, f: F) -> SpawnBuilder<'a, Parent, Child, WatchFn<F, Parent::Msg>>
906    where
907        F: Fn(&Child::Err) -> Parent::Msg + Send + 'static,
908    {
909        let parent_msg_tx = self.ctx.handle.msg_tx.clone();
910        SpawnBuilder {
911            ctx: self.ctx,
912            props: self.props,
913            supervision: self.supervision,
914            watch: WatchFn { f, parent_msg_tx },
915            registry_key: self.registry_key,
916        }
917    }
918
919    /// Spawns the child [`Actor`] and returns a [`Handle`] to it.
920    pub fn spawn(self) -> Handle<Child::Msg> {
921        let (msg_tx, msg_rx) = flume::unbounded(); // child
922        let (proc_msg_tx, proc_msg_rx) = flume::unbounded(); // child
923
924        let handle = Handle {
925            msg_tx,
926            proc_msg_tx,
927        };
928
929        self.ctx.total_children += 1;
930        let id = self.ctx.total_children;
931
932        let ctx: Ctx<Child> = Ctx {
933            id,
934            props: self.props,
935            handle: handle.clone(),
936            msg_rx,
937            parent_proc_msg_tx: Some(self.ctx.handle.proc_msg_tx.clone()),
938            proc_msg_rx,
939            children_proc_msg_tx: HashMap::new(),
940            total_children: 0,
941            supervision: self.supervision,
942            restarts: 0,
943            tasks: JoinSet::new(),
944            registry_key: self.registry_key,
945            registry: self.ctx.registry.clone(),
946            pubsub: self.ctx.pubsub.clone(),
947            subscription_ids: Vec::new(),
948        };
949
950        spawn::<Child, W>(ctx, None, self.watch);
951
952        self.ctx
953            .children_proc_msg_tx
954            .insert(self.ctx.total_children, handle.proc_msg_tx.clone());
955
956        handle
957    }
958
959    /// Spawns the child and registers it in the global registry under its type name.
960    /// Other actors can then look it up via [`Ctx::get_handle_for`] or [`Ctx::send`].
961    /// Returns [`RegistryError::NameTaken`] if already registered.
962    pub fn spawn_registered(self) -> Result<Handle<Child::Msg>, RegistryError> {
963        let key = std::any::type_name::<Child>();
964        self.spawn_named(key)
965    }
966
967    /// Spawns the child and registers it in the global registry under the given name.
968    /// Other actors can then look it up via [`Ctx::get_handle`] or [`Ctx::send_to`].
969    /// Returns [`RegistryError::NameTaken`] if the name is already taken.
970    ///
971    /// # Example
972    /// ```ignore
973    /// let h = ctx.actor::<Worker>(props).spawn_named("worker-1")?;
974    /// ```
975    pub fn spawn_named(
976        mut self,
977        name: impl Into<String>,
978    ) -> Result<Handle<Child::Msg>, RegistryError> {
979        let name = name.into();
980        let registry = self.ctx.registry.clone();
981        let mut reg = registry.write().map_err(|_| RegistryError::PoisonErr)?;
982
983        if reg.contains_key(&name) {
984            return Err(RegistryError::NameTaken(name.clone()));
985        }
986
987        self.registry_key = Some(name.clone());
988        let handle = self.spawn();
989        reg.insert(name, Box::new(handle.clone()));
990
991        Ok(handle)
992    }
993}
994
995#[derive(Debug)]
996enum Restart {
997    No,
998    In(Duration),
999}
1000
1001impl Restart {
1002    fn from_supervision(supervision: Supervision, current_restarts: u64) -> Self {
1003        match supervision {
1004            Supervision::Stop => Restart::No,
1005            Supervision::Resume => Restart::No,
1006            Supervision::Restart { max, .. } if max == current_restarts + 1 => Restart::No,
1007            Supervision::Restart { backoff, .. } => {
1008                let wait = match backoff {
1009                    Backoff::None => Duration::ZERO,
1010                    Backoff::Static(duration) => duration,
1011                    Backoff::Incremental { min, max, step } => {
1012                        let wait = step.mul_f64((current_restarts + 1) as f64);
1013                        let wait = cmp::min(max, wait);
1014                        cmp::max(min, wait)
1015                    }
1016                };
1017
1018                Restart::In(wait)
1019            }
1020        }
1021    }
1022}