Skip to main content

speare/
lib.rs

1use flume::{Receiver, Sender};
2use futures_core::Stream;
3use std::any::Any;
4use std::{
5    cmp,
6    collections::HashMap,
7    future::Future,
8    sync::{Arc, RwLock},
9    time::Duration,
10};
11use tokio::{
12    task::{self, JoinSet},
13    time,
14};
15
16pub mod mini;
17
18mod exit;
19mod node;
20mod pubsub;
21mod req_res;
22mod streams;
23mod watch;
24
25pub use exit::*;
26pub use node::*;
27pub use pubsub::PubSubError;
28pub use req_res::*;
29pub use streams::{SourceSet, Sources};
30
31use crate::pubsub::PubSub;
32use crate::watch::{NoWatch, OnErrTerminate, WatchFn};
33
/// A thin abstraction over tokio tasks and flume channels, allowing for easy message passing
/// with a supervision tree to handle failures.
///
/// Implementors must provide the three associated types and [`Actor::init`];
/// [`Actor::exit`], [`Actor::sources`] and [`Actor::handle`] have default implementations.
///
/// ## Example
/// ```
/// use speare::{Ctx, Actor};
/// use derive_more::From;
///
/// struct Counter {
///     count: u32,
/// }
///
/// struct CounterProps {
///     initial_count: u32,
///     max_count: u32,
/// }
///
/// #[derive(From)]
/// enum CounterMsg {
///     Inc(u32),
/// }
///
/// enum CounterErr {
///     MaxCountExceeded,
/// }
///
/// impl Actor for Counter {
///     type Props = CounterProps;
///     type Msg = CounterMsg;
///     type Err = CounterErr;
///
///     async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
///         Ok(Counter {
///             count: ctx.props().initial_count,
///         })
///     }
///
///     async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
///         match msg {
///             CounterMsg::Inc(x) => {
///                 self.count += x;
///
///                 if self.count > ctx.props().max_count {
///                     return Err(CounterErr::MaxCountExceeded);
///                 }
///             }
///         }
///
///         Ok(())
///     }
/// }
/// ```
// The default method bodies below deliberately ignore their parameters.
#[allow(unused_variables)]
pub trait Actor: Sized + Send + 'static {
    /// Configuration handed to the actor when it is spawned; readable through [`Ctx::props`].
    type Props: Send + 'static;
    /// Type of the messages delivered to [`Actor::handle`].
    type Msg: Send + 'static;
    /// Error type returned by [`Actor::init`], [`Actor::sources`] and [`Actor::handle`].
    type Err: Send + Sync + 'static;

    /// Constructs the actor. Called on initial spawn and on every restart.
    ///
    /// # Example
    /// ```ignore
    /// async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
    ///     Ok(MyActor { count: ctx.props().initial })
    /// }
    /// ```
    fn init(ctx: &mut Ctx<Self>) -> impl Future<Output = Result<Self, Self::Err>> + Send;

    /// Cleanup hook called when the actor stops, restarts, or fails to init.
    /// `this` is `None` if init failed.
    ///
    /// The default implementation does nothing.
    ///
    /// # Example
    /// ```ignore
    /// async fn exit(this: Option<Self>, reason: ExitReason<Self>, ctx: &mut Ctx<Self>) {
    ///     if let ExitReason::Err(e) = reason {
    ///         eprintln!("actor failed: {e:?}");
    ///     }
    /// }
    /// ```
    fn exit(
        this: Option<Self>,
        reason: ExitReason<Self>,
        ctx: &mut Ctx<Self>,
    ) -> impl Future<Output = ()> + Send {
        async {}
    }

    /// Sets up message sources (streams, intervals) after init.
    ///
    /// Sources added earlier in the [`SourceSet`] chain have higher polling priority.
    /// If an earlier source is consistently ready, later sources may be starved.
    ///
    /// The default implementation returns an empty [`SourceSet`].
    ///
    /// # Example
    /// ```ignore
    /// async fn sources(&self, ctx: &Ctx<Self>) -> Result<impl Sources<Self>, Self::Err> {
    ///     Ok(SourceSet::new()
    ///         .interval(time::interval(Duration::from_millis(100)), || Msg::Tick)
    ///         .stream(my_stream))
    /// }
    /// ```
    fn sources(
        &self,
        ctx: &Ctx<Self>,
    ) -> impl Future<Output = Result<impl Sources<Self>, Self::Err>> + Send {
        async { Ok(SourceSet::new()) }
    }

    /// Called every time your [`Actor`] receives a message.
    ///
    /// Returning `Err` is handled according to the [`Supervision`] strategy this actor
    /// was spawned with. The default implementation discards the message and returns `Ok(())`.
    ///
    /// # Example
    /// ```ignore
    /// async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
    ///     match msg {
    ///         Msg::Inc(n) => self.count += n,
    ///     }
    ///
    ///     Ok(())
    /// }
    /// ```
    fn handle(
        &mut self,
        msg: Self::Msg,
        ctx: &mut Ctx<Self>,
    ) -> impl Future<Output = Result<(), Self::Err>> + Send {
        async { Ok(()) }
    }
}
161
/// A handle to send messages to or stop an [`Actor`].
///
/// Cloning a handle yields another handle backed by the same two channels.
pub struct Handle<Msg> {
    /// Sending half of the actor's mailbox; carries user messages of type `Msg`.
    msg_tx: Sender<Msg>,
    /// Sending half of the actor's control channel; carries stop / restart requests.
    proc_msg_tx: Sender<ProcMsg>,
}
167
168impl<Msg> Clone for Handle<Msg> {
169    fn clone(&self) -> Self {
170        Self {
171            msg_tx: self.msg_tx.clone(),
172            proc_msg_tx: self.proc_msg_tx.clone(),
173        }
174    }
175}
176
177impl<Msg> Handle<Msg> {
178    /// Stops the [`Actor`] associated with this handle. Does not wait for the actor to finish.
179    ///
180    /// # Example
181    /// ```ignore
182    /// handle.stop();
183    /// ```
184    pub fn stop(&self) {
185        let (tx, _) = flume::unbounded();
186        let _ = self
187            .proc_msg_tx
188            .send(ProcMsg::FromHandle(ProcAction::Stop(tx)));
189    }
190
191    /// Restarts the [`Actor`] by re-running [`Actor::init`] and [`Actor::sources`]. Does not wait for the actor to finish.
192    ///
193    /// # Example
194    /// ```ignore
195    /// handle.restart();
196    /// ```
197    pub fn restart(&self) {
198        let _ = self
199            .proc_msg_tx
200            .send(ProcMsg::FromHandle(ProcAction::Restart));
201    }
202
203    /// Returns `true` if the [`Actor`] is still running.
204    ///
205    /// # Example
206    /// ```ignore
207    /// if handle.is_alive() {
208    ///     handle.send(Msg::Ping);
209    /// }
210    /// ```
211    pub fn is_alive(&self) -> bool {
212        !self.msg_tx.is_disconnected()
213    }
214
215    /// Sends a message to the [`Actor`], returning `true` if the message was delivered
216    /// or `false` if the actor is no longer running.
217    /// Takes advantage of `From<_>` implementations on the message type.
218    ///
219    /// # Example
220    /// ```ignore
221    /// // Given `#[derive(From)] enum Msg { Inc(u32) }`:
222    /// handle.send(Msg::Inc(1));
223    /// handle.send(1u32); // works via From<u32>
224    /// ```
225    pub fn send<M: Into<Msg>>(&self, msg: M) -> bool {
226        self.msg_tx.send(msg.into()).is_ok()
227    }
228
229    /// Sends a message to the [`Actor`] after the given duration, failing silently if it is no longer running.
230    ///
231    /// # Example
232    /// ```ignore
233    /// handle.send_in(Msg::Timeout, Duration::from_secs(5));
234    /// ```
235    pub fn send_in<M>(&self, msg: M, duration: Duration)
236    where
237        Msg: 'static + Send,
238        M: 'static + Send + Into<Msg>,
239    {
240        let msg_tx = self.msg_tx.clone();
241
242        task::spawn(async move {
243            time::sleep(duration).await;
244            let _ = msg_tx.send(msg.into());
245        });
246    }
247
248    /// Sends a request and awaits a response. Requires `Msg: From<Request<Req, Res>>`.
249    ///
250    /// # Example
251    /// ```ignore
252    /// #[derive(From)]
253    /// enum Msg {
254    ///     GetCount(Request<(), u32>),
255    /// }
256    ///
257    /// // sender side:
258    /// let count: u32 = handle.req(()).await?;
259    ///
260    /// // receiver side, inside handle():
261    /// Msg::GetCount(req) => req.reply(self.count),
262    /// ```
263    pub async fn req<Req, Res>(&self, req: Req) -> Result<Res, ReqErr>
264    where
265        Msg: From<Request<Req, Res>>,
266    {
267        let (req, res) = req_res(req);
268        self.send(req);
269        res.recv().await
270    }
271
272    /// Like [`Handle::req`], but uses a wrapper function to convert the [`Request`] into the message type.
273    /// Useful when the message variant can't implement `From<Request<Req, Res>>`.
274    ///
275    /// # Example
276    /// ```ignore
277    /// enum Msg {
278    ///     GetCount(Request<(), u32>),
279    /// }
280    ///
281    /// let count: u32 = handle.reqw(Msg::GetCount, ()).await?;
282    /// ```
283    pub async fn reqw<F, Req, Res>(&self, to_req: F, req: Req) -> Result<Res, ReqErr>
284    where
285        F: Fn(Request<Req, Res>) -> Msg,
286    {
287        let (req, res) = req_res(req);
288        let msg = to_req(req);
289        self.send(msg);
290        res.recv().await
291    }
292
293    /// Like [`Handle::req`], but fails with [`ReqErr::Timeout`] if no response within the given [`Duration`].
294    ///
295    /// # Example
296    /// ```ignore
297    /// let count: u32 = handle.req_timeout((), Duration::from_secs(1)).await?;
298    /// ```
299    pub async fn req_timeout<Req, Res>(&self, req: Req, timeout: Duration) -> Result<Res, ReqErr>
300    where
301        Msg: From<Request<Req, Res>>,
302    {
303        let (req, res) = req_res(req);
304        self.send(req);
305        res.recv_timeout(timeout).await
306    }
307
308    /// Like [`Handle::reqw`], but fails with [`ReqErr::Timeout`] if no response within the given [`Duration`].
309    ///
310    /// # Example
311    /// ```ignore
312    /// let count: u32 = handle.reqw_timeout(Msg::GetCount, (), Duration::from_secs(1)).await?;
313    /// ```
314    pub async fn reqw_timeout<F, Req, Res>(
315        &self,
316        to_req: F,
317        req: Req,
318        timeout: Duration,
319    ) -> Result<Res, ReqErr>
320    where
321        F: Fn(Request<Req, Res>) -> Msg,
322    {
323        let (req, res) = req_res(req);
324        let msg = to_req(req);
325        self.send(msg);
326        res.recv_timeout(timeout).await
327    }
328}
329
/// The context surrounding the current `Actor`.
///
/// Provides a collection of methods that allow you to:
/// - spawn other actors as children of the current actor
/// - access the `Handle<_>` for the current actor
/// - access this actor's props
/// - clear this actor's mailbox
pub struct Ctx<P>
where
    P: Actor,
{
    /// Identifier assigned by the parent at spawn time; reported back to the parent
    /// in `ProcMsg::ChildTerminated`.
    id: u64,
    /// Props given at spawn time; kept across restarts.
    props: P::Props,
    /// Handle pointing at this very actor.
    handle: Handle<P::Msg>,
    /// Receiving half of this actor's mailbox.
    msg_rx: Receiver<P::Msg>,
    /// Control channel of the parent, used to announce this actor's termination.
    /// NOTE(review): `None` is presumably used for a root actor created outside this
    /// file — confirm against the `node` module.
    parent_proc_msg_tx: Option<Sender<ProcMsg>>,
    /// Receiving half of this actor's control channel (stop / restart / child-terminated).
    proc_msg_rx: Receiver<ProcMsg>,
    /// Control senders of the children spawned by this actor, keyed by child id.
    children_proc_msg_tx: HashMap<u64, Sender<ProcMsg>>,
    /// Strategy applied to this actor when it returns an error.
    supervision: Supervision,
    /// Number of children currently tracked; also used to derive new child ids.
    total_children: u64,
    /// Background tasks started through [`Ctx::task`].
    tasks: JoinSet<Result<P::Msg, P::Err>>,
    /// How many times this actor has exited because of an error.
    restarts: u64,
    /// Name under which this actor is registered, if any; removed from the registry
    /// when the actor terminates for good.
    registry_key: Option<String>,
    /// Shared name -> type-erased [`Handle`] registry.
    registry: Arc<RwLock<HashMap<String, Box<dyn Any + Send + Sync>>>>,
    /// Shared pub/sub bus.
    pubsub: Arc<RwLock<PubSub>>,
    /// `(topic, subscriber id)` pairs owned by this actor; unsubscribed when it exits.
    subscription_ids: Vec<(String, u64)>,
}
357
358impl<P> Ctx<P>
359where
360    P: Actor,
361{
362    /// Returns a reference to this [`Actor`]'s props. Props are set once at spawn time
363    /// and remain immutable for the lifetime of the actor, including across restarts.
364    ///
365    /// # Example
366    /// ```ignore
367    /// async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
368    ///     Ok(MyActor { count: ctx.props().initial_count })
369    /// }
370    /// ```
371    pub fn props(&self) -> &P::Props {
372        &self.props
373    }
374
375    /// Returns a [`Handle`] to the current [`Actor`], allowing it to send messages to itself
376    /// or pass its handle to child actors.
377    ///
378    /// # Example
379    /// ```ignore
380    /// // schedule a message to self
381    /// ctx.this().send_in(Msg::Tick, Duration::from_secs(1));
382    /// ```
383    pub fn this(&self) -> &Handle<P::Msg> {
384        &self.handle
385    }
386
387    /// Drains all pending messages from this [`Actor`]'s mailbox. Useful during
388    /// restarts to discard stale messages via [`Actor::init`].
389    ///
390    /// # Example
391    /// ```ignore
392    /// async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
393    ///     ctx.clear_mailbox();
394    ///     Ok(MyActor::default())
395    /// }
396    /// ```
397    pub fn clear_mailbox(&self) {
398        self.msg_rx.drain();
399    }
400
401    /// Creates a [`SpawnBuilder`] for spawning a child [`Actor`]. The actor type is passed
402    /// as a generic parameter and its props as the argument. The child is supervised
403    /// by the current actor and will be stopped when the parent stops.
404    ///
405    /// # Example
406    /// ```ignore
407    /// let handle = ctx.actor::<Worker>(WorkerProps { id: 1 })
408    ///     .supervision(Supervision::Restart {
409    ///         max: Limit::Amount(3),
410    ///         backoff: Backoff::None,
411    ///     })
412    ///     .spawn();
413    /// ```
414    pub fn actor<'a, Child>(&'a mut self, props: Child::Props) -> SpawnBuilder<'a, P, Child>
415    where
416        Child: Actor,
417    {
418        SpawnBuilder::new(self, props)
419    }
420
421    /// Restarts all child actors immediately, bypassing their supervision strategy.
422    /// Each child will re-run its [`Actor::init`] with the same props.
423    ///
424    /// This is fire-and-forget: it does not wait for children to finish restarting.
425    pub fn restart_children(&self) {
426        for child in self.children_proc_msg_tx.values() {
427            let _ = child.send(ProcMsg::FromParent(ProcAction::Restart));
428        }
429    }
430
431    /// Stops all child actors and waits for each to fully terminate before returning.
432    pub async fn stop_children(&mut self) {
433        let mut acks = Vec::with_capacity(self.total_children as usize);
434        for child in self.children_proc_msg_tx.values() {
435            let (ack_tx, ack_rx) = flume::unbounded();
436            let _ = child.send(ProcMsg::FromParent(ProcAction::Stop(ack_tx)));
437            acks.push(ack_rx);
438        }
439
440        for ack in acks {
441            let _ = ack.recv_async().await;
442        }
443
444        self.total_children = 0;
445        self.children_proc_msg_tx.clear();
446    }
447
448    /// Spawns a background async task. On completion, its `Ok` value is delivered
449    /// as a message to this [`Actor`]; its `Err` triggers the supervision strategy
450    /// that this actor's parent has set for it.
451    ///
452    /// Tasks are aborted when the actor stops, but **survive restarts**. If the
453    /// actor is restarted (via supervision or [`Ctx::restart_children`]), in-flight
454    /// tasks from the previous incarnation will continue running and their results
455    /// will still be delivered to the restarted actor's `handle()`.
456    ///
457    /// # Example
458    /// ```ignore
459    /// ctx.task(async {
460    ///     let data = reqwest::get("https://example.com").await?.text().await?;
461    ///     Ok(Msg::Fetched(data))
462    /// });
463    /// ```
464    pub fn task<F>(&mut self, f: F)
465    where
466        F: Future<Output = Result<P::Msg, P::Err>> + Send + 'static,
467    {
468        self.tasks.spawn(f);
469    }
470
471    /// Looks up a registered [`Actor`]'s [`Handle`] by its type. The actor must have been
472    /// spawned with [`SpawnBuilder::spawn_registered`].
473    ///
474    /// # Example
475    /// ```ignore
476    /// let logger = ctx.get_handle_for::<Logger>()?;
477    /// logger.send(LogMsg::Info("hello".into()));
478    /// ```
479    pub fn get_handle_for<A: Actor>(&self) -> Result<Handle<A::Msg>, RegistryError> {
480        let key = std::any::type_name::<A>();
481        let reg = self.registry.read().map_err(|_| RegistryError::PoisonErr)?;
482        reg.get(key)
483            .and_then(|h| h.downcast_ref::<Handle<A::Msg>>())
484            .cloned()
485            .ok_or_else(|| RegistryError::NotFound(key.to_string()))
486    }
487
488    /// Looks up a registered [`Actor`]'s [`Handle`] by name. The actor must have been
489    /// spawned with [`SpawnBuilder::spawn_named`].
490    ///
491    /// # Example
492    /// ```ignore
493    /// let worker = ctx.get_handle::<WorkerMsg>("worker-1")?;
494    /// worker.send(WorkerMsg::Start);
495    /// ```
496    pub fn get_handle<Msg: Send + 'static>(
497        &self,
498        name: &str,
499    ) -> Result<Handle<Msg>, RegistryError> {
500        let reg = self.registry.read().map_err(|_| RegistryError::PoisonErr)?;
501        reg.get(name)
502            .and_then(|h| h.downcast_ref::<Handle<Msg>>())
503            .cloned()
504            .ok_or_else(|| RegistryError::NotFound(name.to_string()))
505    }
506
507    /// Sends a message to a registered [`Actor`] looked up by type.
508    ///
509    /// # Example
510    /// ```ignore
511    /// // Assuming MetricsCollector was spawned with spawn_registered():
512    /// // ctx.actor::<MetricsCollector>(props).spawn_registered()?;
513    ///
514    /// // Any actor in the system can then send to it by type:
515    /// ctx.send::<MetricsCollector>(MetricsMsg::RecordLatency(42))?;
516    /// ```
517    pub fn send<A: Actor>(&self, msg: impl Into<A::Msg>) -> Result<(), RegistryError> {
518        let key = std::any::type_name::<A>();
519        let reg = self.registry.read().map_err(|_| RegistryError::PoisonErr)?;
520        match reg
521            .get(key)
522            .and_then(|h| h.downcast_ref::<Handle<A::Msg>>())
523        {
524            Some(handle) => {
525                handle.send(msg);
526                Ok(())
527            }
528            None => Err(RegistryError::NotFound(key.to_string())),
529        }
530    }
531
532    /// Sends a message to a registered [`Actor`] looked up by name.
533    ///
534    /// # Example
535    /// ```ignore
536    /// // Assuming a Worker was spawned with spawn_named():
537    /// // ctx.actor::<Worker>(props).spawn_named("worker-1")?;
538    ///
539    /// // Any actor in the system can then send to it by name:
540    /// ctx.send_to("worker-1", WorkerMsg::Start)?;
541    /// ```
542    pub fn send_to<Msg: Send + 'static>(
543        &self,
544        name: &str,
545        msg: impl Into<Msg>,
546    ) -> Result<(), RegistryError> {
547        let reg = self.registry.read().map_err(|_| RegistryError::PoisonErr)?;
548        match reg.get(name).and_then(|h| h.downcast_ref::<Handle<Msg>>()) {
549            Some(handle) => {
550                handle.send(msg);
551                Ok(())
552            }
553            None => Err(RegistryError::NotFound(name.to_string())),
554        }
555    }
556}
557
/// Control messages exchanged on an actor's control channel, separate from the
/// user-level mailbox.
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum ProcMsg {
    /// Sent from child once it terminates
    ChildTerminated {
        /// The `Ctx::id` of the child that terminated.
        child_id: u64,
    },
    /// Action requested by the actor's parent (e.g. via `Ctx::stop_children`).
    FromParent(ProcAction),
    /// Action requested through a [`Handle`] (e.g. `Handle::stop`).
    FromHandle(ProcAction),
}
568
/// What a parent or a handle asks an actor to do.
#[derive(Debug)]
enum ProcAction {
    /// Tear the current incarnation down and start a new one.
    Restart,
    /// Stop for good. A `()` is sent on the carried sender once the actor has run
    /// its exit hook, so the requester can wait for the shutdown.
    Stop(Sender<()>),
}
574
575fn spawn<Child, W>(mut ctx: Ctx<Child>, delay: Option<Duration>, watch: W)
576where
577    Child: Actor,
578    W: OnErrTerminate<Child::Err>,
579{
580    tokio::spawn(async move {
581        if let Some(d) = delay.filter(|d| !d.is_zero()) {
582            time::sleep(d).await;
583        }
584
585        // restart is Some whenever we should restart
586        let mut restart = Restart::No;
587        let mut exit_reason = None;
588        let mut actor_created = None;
589        let mut stop_ack_tx = None;
590
591        match Child::init(&mut ctx).await {
592            Err(e) => {
593                exit_reason = Some(ExitReason::Err(e));
594                restart = Restart::from_supervision(ctx.supervision, ctx.restarts);
595            }
596
597            Ok(mut actor) => match actor.sources(&ctx).await {
598                Err(e) => {
599                    exit_reason = Some(ExitReason::Err(e));
600                    restart = Restart::from_supervision(ctx.supervision, ctx.restarts);
601                    actor_created = Some(actor);
602                }
603
604                Ok(mut sources) => {
605                    macro_rules! on_err {
606                        ($e:expr) => {
607                            if let Supervision::Resume = ctx.supervision {
608                                continue;
609                            }
610
611                            restart = Restart::from_supervision(ctx.supervision, ctx.restarts);
612                            exit_reason = Some(ExitReason::Err($e));
613                            actor_created = Some(actor);
614                            break;
615                        };
616                    }
617
618                    loop {
619                        tokio::select! {
620                            biased;
621
622                            proc_msg = ctx.proc_msg_rx.recv_async() => {
623                                match proc_msg {
624                                    Err(_) => break,
625
626                                    Ok(ProcMsg::FromHandle(ProcAction::Stop(tx)) ) => {
627                                        exit_reason = Some(ExitReason::Handle);
628                                        stop_ack_tx = Some(tx);
629                                        break
630                                    },
631
632                                    Ok(ProcMsg::FromParent(ProcAction::Stop(tx))) => {
633                                        exit_reason = exit_reason.or(Some(ExitReason::Parent));
634                                        stop_ack_tx = Some(tx);
635                                        break
636                                    },
637
638                                    Ok(ProcMsg::FromParent(ProcAction::Restart)) => {
639                                        exit_reason = exit_reason.or(Some(ExitReason::Parent));
640                                        restart = Restart::In(Duration::ZERO);
641                                        break;
642                                    }
643
644
645                                    Ok(ProcMsg::FromHandle(ProcAction::Restart)) => {
646                                        exit_reason = exit_reason.or(Some(ExitReason::Handle));
647                                        restart = Restart::In(Duration::ZERO);
648                                        break;
649                                    }
650
651                                    Ok(ProcMsg::ChildTerminated { child_id, }) => {
652                                        if ctx.children_proc_msg_tx.remove(&child_id).is_some() {
653                                            ctx.total_children -= 1;
654                                        }
655                                    }
656                                }
657                            }
658
659                            recvd = ctx.msg_rx.recv_async() => {
660                                match recvd {
661                                    Err(_) => break,
662
663                                    Ok(msg) => {
664                                        if let Err(e) = actor.handle(msg, &mut ctx).await {
665                                            on_err!(e);
666                                        };
667                                    }
668                                }
669                            }
670
671                            Some(Ok(msg)) = ctx.tasks.join_next() => {
672                                match msg {
673                                    Err(e) => {
674                                        on_err!(e);
675                                    }
676
677                                    Ok(msg) => {
678                                        if let Err(e) = actor.handle(msg, &mut ctx).await {
679                                            on_err!(e);
680                                        };
681                                    }
682                                }
683
684                            }
685
686                            Some(msg) = std::future::poll_fn(|cx| Pin::new(&mut sources).poll_next(cx)) => {
687                                if let Err(e) = actor.handle(msg, &mut ctx).await {
688                                    on_err!(e);
689                                };
690                            }
691                        }
692                    }
693                }
694            },
695        }
696
697        ctx.stop_children().await;
698        let exit_reason = exit_reason.unwrap_or(ExitReason::Handle);
699
700        if let ExitReason::Err(_) = &exit_reason {
701            ctx.restarts += 1;
702        }
703
704        if let (Restart::No, ExitReason::Err(ref e)) = (&restart, &exit_reason) {
705            watch.on_err_terminate(e);
706        }
707
708        Child::exit(actor_created, exit_reason, &mut ctx).await;
709
710        // Clean up pub/sub subscriptions (runs on both stop and restart)
711        if !ctx.subscription_ids.is_empty() {
712            if let Ok(mut bus) = ctx.pubsub.write() {
713                for (topic, sub_id) in ctx.subscription_ids.drain(..) {
714                    if let Some(entry) = bus.topics.get_mut(&topic) {
715                        entry.subscribers.retain(|s| s.id != sub_id);
716                        if entry.subscribers.is_empty() {
717                            bus.topics.remove(&topic);
718                        }
719                    }
720                }
721            }
722        }
723
724        let _ = stop_ack_tx.map(|tx| tx.send(()));
725
726        if let Restart::In(duration) = restart {
727            spawn::<Child, W>(ctx, Some(duration), watch)
728        } else if let Some(parent_tx) = ctx.parent_proc_msg_tx {
729            if let Some(key) = ctx.registry_key.take() {
730                if let Ok(mut reg) = ctx.registry.write() {
731                    reg.remove(&key);
732                }
733            }
734
735            let _ = parent_tx.send(ProcMsg::ChildTerminated { child_id: ctx.id });
736        }
737    });
738}
739
/// Defines how a parent reacts when a child actor fails.
///
/// A strategy is chosen per child via [`SpawnBuilder::supervision`].
///
/// # Example
/// ```ignore
/// let supervision = Supervision::Restart {
///     max: Limit::Amount(5),
///     backoff: Backoff::Static(Duration::from_secs(1)),
/// };
/// ```
#[derive(Debug, Clone, Copy)]
pub enum Supervision {
    /// Actor terminates on error.
    Stop,
    /// Actor continues processing the next message after an error.
    ///
    /// This only applies to errors raised while handling messages or from background
    /// tasks; a failing [`Actor::init`] or [`Actor::sources`] still terminates the actor.
    Resume,
    /// Actor is restarted on error, up to `max` times with optional `backoff`.
    Restart { max: Limit, backoff: Backoff },
}
758
/// Delay strategy between restart attempts.
///
/// # Example
/// ```ignore
/// let backoff = Backoff::Incremental {
///     min: Duration::from_millis(100),
///     max: Duration::from_secs(5),
///     step: Duration::from_millis(500),
/// };
/// ```
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Backoff {
    /// Restart immediately with no delay.
    None,
    /// Wait a fixed duration between restarts.
    Static(Duration),
    /// Linearly increase delay from `min` to `max` by `step` per restart.
    ///
    /// The delay before the n-th error-triggered restart is `step * n`, raised to at
    /// least `min` and capped at `max`.
    Incremental {
        /// Lower bound of the delay.
        min: Duration,
        /// Upper bound of the delay.
        max: Duration,
        /// Amount the delay grows with each restart.
        step: Duration,
    },
}
782
/// Upper bound on how often an actor may be restarted.
///
/// # Example
/// ```ignore
/// let limit = Limit::Amount(3);
/// ```
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Limit {
    /// Restarts are not limited.
    None,
    /// Restart at most this many times.
    Amount(u64),
}

/// **Note**: `0` converts to [`Limit::None`] (unlimited), not to "zero restarts".
/// To never restart at all, use [`Supervision::Stop`] instead.
impl From<u64> for Limit {
    fn from(value: u64) -> Self {
        if value == 0 {
            Limit::None
        } else {
            Limit::Amount(value)
        }
    }
}

/// A limit equals a number only when it is a [`Limit::Amount`] holding exactly that
/// number; [`Limit::None`] never equals any number.
impl PartialEq<u64> for Limit {
    fn eq(&self, other: &u64) -> bool {
        matches!(self, Limit::Amount(amount) if amount == other)
    }
}
816
/// Errors produced by the actor registry.
#[derive(Debug, Clone)]
pub enum RegistryError {
    /// The carried name is already registered.
    NameTaken(String),
    /// Nothing usable is registered under the carried name.
    NotFound(String),
    /// The registry lock was poisoned.
    PoisonErr,
}

impl std::fmt::Display for RegistryError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::PoisonErr => f.write_str("registry lock poisoned"),
            Self::NameTaken(name) => write!(f, "registry name already taken: {name}"),
            Self::NotFound(name) => write!(f, "no actor registered under: {name}"),
        }
    }
}

impl std::error::Error for RegistryError {}
835
/// Builder for configuring and spawning a child [`Actor`]. Created via [`Ctx::actor`].
///
/// `W` is the watcher type; it is `NoWatch` until [`SpawnBuilder::watch`] is called.
pub struct SpawnBuilder<'a, Parent, Child, W = NoWatch>
where
    Parent: Actor,
    Child: Actor,
{
    /// Context of the parent the child will be attached to.
    ctx: &'a mut Ctx<Parent>,
    /// Props the child will be spawned with.
    props: Child::Props,
    /// Strategy applied when the child fails.
    supervision: Supervision,
    /// Only kicks in if child is stopped or reaches maximum number of restarts.
    watch: W,
    /// Registry name to associate with the child, if it is spawned as a named actor.
    registry_key: Option<String>,
}
849
850impl<'a, Parent, Child> SpawnBuilder<'a, Parent, Child, NoWatch>
851where
852    Parent: Actor,
853    Child: Actor,
854{
855    fn new(ctx: &'a mut Ctx<Parent>, props: Child::Props) -> Self {
856        Self {
857            ctx,
858            props,
859            supervision: Supervision::Restart {
860                max: Limit::None,
861                backoff: Backoff::None,
862            },
863            watch: NoWatch,
864            registry_key: None,
865        }
866    }
867}
868
869impl<'a, Parent, Child, W> SpawnBuilder<'a, Parent, Child, W>
870where
871    Parent: Actor,
872    Child: Actor,
873    W: OnErrTerminate<Child::Err>,
874{
875    /// Sets the [`Supervision`] strategy the parent will use for this child.
876    /// Defaults to [`Supervision::Restart`] with unlimited restarts and no backoff.
877    ///
878    /// # Example
879    /// ```ignore
880    /// ctx.actor::<Worker>(props)
881    ///     .supervision(Supervision::Restart {
882    ///         max: Limit::Amount(3),
883    ///         backoff: Backoff::Static(Duration::from_secs(1)),
884    ///     })
885    ///     .spawn();
886    /// ```
887    pub fn supervision(mut self, supervision: Supervision) -> Self {
888        self.supervision = supervision;
889        self
890    }
891
892    /// Registers a callback that fires when the child terminates due to an error.
893    /// This happens when the supervision strategy is [`Supervision::Stop`], or when
894    /// [`Supervision::Restart`] has exhausted all allowed restarts. The callback maps
895    /// the child's error into a message for the parent.
896    ///
897    /// # Example
898    /// ```ignore
899    /// ctx.actor::<Worker>(props)
900    ///     .supervision(Supervision::Restart {
901    ///         max: Limit::Amount(3),
902    ///         backoff: Backoff::None,
903    ///     })
904    ///     .watch(|err| ParentMsg::WorkerDied(format!("{err:?}")))
905    ///     .spawn();
906    /// ```
907    pub fn watch<F>(self, f: F) -> SpawnBuilder<'a, Parent, Child, WatchFn<F, Parent::Msg>>
908    where
909        F: Fn(&Child::Err) -> Parent::Msg + Send + 'static,
910    {
911        let parent_msg_tx = self.ctx.handle.msg_tx.clone();
912        SpawnBuilder {
913            ctx: self.ctx,
914            props: self.props,
915            supervision: self.supervision,
916            watch: WatchFn { f, parent_msg_tx },
917            registry_key: self.registry_key,
918        }
919    }
920
921    /// Spawns the child [`Actor`] and returns a [`Handle`] to it.
922    pub fn spawn(self) -> Handle<Child::Msg> {
923        let (msg_tx, msg_rx) = flume::unbounded(); // child
924        let (proc_msg_tx, proc_msg_rx) = flume::unbounded(); // child
925
926        let handle = Handle {
927            msg_tx,
928            proc_msg_tx,
929        };
930
931        self.ctx.total_children += 1;
932        let id = self.ctx.total_children;
933
934        let ctx: Ctx<Child> = Ctx {
935            id,
936            props: self.props,
937            handle: handle.clone(),
938            msg_rx,
939            parent_proc_msg_tx: Some(self.ctx.handle.proc_msg_tx.clone()),
940            proc_msg_rx,
941            children_proc_msg_tx: HashMap::new(),
942            total_children: 0,
943            supervision: self.supervision,
944            restarts: 0,
945            tasks: JoinSet::new(),
946            registry_key: self.registry_key,
947            registry: self.ctx.registry.clone(),
948            pubsub: self.ctx.pubsub.clone(),
949            subscription_ids: Vec::new(),
950        };
951
952        spawn::<Child, W>(ctx, None, self.watch);
953
954        self.ctx
955            .children_proc_msg_tx
956            .insert(self.ctx.total_children, handle.proc_msg_tx.clone());
957
958        handle
959    }
960
961    /// Spawns the child and registers it in the global registry under its type name.
962    /// Other actors can then look it up via [`Ctx::get_handle_for`] or [`Ctx::send`].
963    /// Returns [`RegistryError::NameTaken`] if already registered.
964    pub fn spawn_registered(self) -> Result<Handle<Child::Msg>, RegistryError> {
965        let key = std::any::type_name::<Child>();
966        self.spawn_named(key)
967    }
968
969    /// Spawns the child and registers it in the global registry under the given name.
970    /// Other actors can then look it up via [`Ctx::get_handle`] or [`Ctx::send_to`].
971    /// Returns [`RegistryError::NameTaken`] if the name is already taken.
972    ///
973    /// # Example
974    /// ```ignore
975    /// let h = ctx.actor::<Worker>(props).spawn_named("worker-1")?;
976    /// ```
977    pub fn spawn_named(
978        mut self,
979        name: impl Into<String>,
980    ) -> Result<Handle<Child::Msg>, RegistryError> {
981        let name = name.into();
982        let registry = self.ctx.registry.clone();
983        let mut reg = registry.write().map_err(|_| RegistryError::PoisonErr)?;
984
985        if reg.contains_key(&name) {
986            return Err(RegistryError::NameTaken(name.clone()));
987        }
988
989        self.registry_key = Some(name.clone());
990        let handle = self.spawn();
991        reg.insert(name, Box::new(handle.clone()));
992
993        Ok(handle)
994    }
995}
996
997#[derive(Debug)]
998enum Restart {
999    No,
1000    In(Duration),
1001}
1002
1003impl Restart {
1004    fn from_supervision(supervision: Supervision, current_restarts: u64) -> Self {
1005        match supervision {
1006            Supervision::Stop => Restart::No,
1007            Supervision::Resume => Restart::No,
1008            Supervision::Restart { max, .. } if max == current_restarts + 1 => Restart::No,
1009            Supervision::Restart { backoff, .. } => {
1010                let wait = match backoff {
1011                    Backoff::None => Duration::ZERO,
1012                    Backoff::Static(duration) => duration,
1013                    Backoff::Incremental { min, max, step } => {
1014                        let wait = step.mul_f64((current_restarts + 1) as f64);
1015                        let wait = cmp::min(max, wait);
1016                        cmp::max(min, wait)
1017                    }
1018                };
1019
1020                Restart::In(wait)
1021            }
1022        }
1023    }
1024}