Skip to main content

axactor/
lib.rs

1use parking_lot::Mutex;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::sync::Arc;
4use std::time::{Duration, SystemTime};
5use tokio::sync::{mpsc, oneshot};
6use tokio::time::Instant;
7
8pub use async_trait;
9pub use axactor_macros::actor;
10
/// Shared, lockable slot holding a task's `JoinHandle`.
///
/// The inner `Option` lets one holder `take()` the handle (as `SpawnHandle::join`
/// does), leaving `None` behind for every other clone of the `Arc`.
pub type SharedHandle = Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>;
12
13pub use rt::{Actor, System};
14pub mod cluster;
15pub mod obs;
16pub mod ref_api;
17pub mod registry;
18pub mod rt;
19pub mod supervisor;
20pub use ref_api::{
21    Addr, AnyAskError, AnyMonitorError, AnyRef, AnyTellError, LocalContractRef, LocalMonitorError,
22    LocalRef, RefContract, RemoteRef, RemoteRefError, RemoteSafe, SimdJsonCodec, WireCodec,
23    WireCodecError,
24};
25pub use registry::RegistryError;
26
/// Process-wide counter backing `Pid::next`. It starts at 1, so the value 0
/// (used by `Pid::INVALID`) is not produced by normal allocation.
static PID_SEQ: AtomicU64 = AtomicU64::new(1);
28
/// Numeric process identifier; a newtype over `u64`.
///
/// `Pid(0)` is reserved as `Pid::INVALID`; fresh values come from `Pid::next`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Pid(pub u64);
31
32impl Pid {
33    pub const INVALID: Self = Self(0);
34
35    pub fn next() -> Self {
36        Self(PID_SEQ.fetch_add(1, Ordering::Relaxed))
37    }
38}
39
40impl std::fmt::Display for Pid {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        write!(f, "{}", self.0)
43    }
44}
45
/// Identifier of a monitor registration; carried in `ControlMsg::Down`.
pub type MonitorId = u64;
47
/// Why an actor exited, as carried by `ControlMsg::ExitSignal` and
/// `ControlMsg::Down`, and settable through `Context::stop_with_exit`.
///
/// NOTE(review): the precise meaning of each variant is decided by the runtime
/// modules, which are not visible here — confirm before relying on them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExitReason {
    Normal,
    Shutdown,
    Panic,
    Killed,
    LeaseLost,
    NodeDown,
    NoSuchActor,
    RestartIntensityExceeded,
    ProtocolViolation,
    /// Free-form reason text for cases not covered by the variants above.
    Other(String),
}
61
/// Origin of a `ControlMsg::Stop` request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopSignalReason {
    /// Sent by `MailboxTx::stop` / `MailboxTx::stop_wait`.
    User,
    Supervisor,
    Shutdown,
    LinkPropagation,
    LeaseLost,
}
70
/// Error half of the reply sent for `ControlMsg::GetSnapshot`.
#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
pub enum SnapshotError {
    /// Snapshots are not supported.
    #[error("snapshot not supported")]
    Unsupported,
    /// Taking the snapshot failed; the string is the failure description.
    #[error("snapshot failed: {0}")]
    Failed(String),
}
78
/// Messages carried on the control channel (`mpsc::Sender<ControlMsg>` inside
/// `MailboxTx`), which is separate from the user-message channel.
#[derive(Debug)]
pub enum ControlMsg {
    /// Stop request; `reason` records where it came from.
    Stop {
        reason: StopSignalReason,
    },
    /// Exit notification originating from actor `from`.
    /// NOTE(review): presumably delivered to linked actors — confirm in the runtime.
    ExitSignal {
        from: Pid,
        reason: ExitReason,
    },
    /// Notification that `pid` went down, tagged with a `monitor_id`.
    /// NOTE(review): presumably delivered to the monitoring actor — confirm.
    Down {
        monitor_id: MonitorId,
        pid: Pid,
        reason: ExitReason,
    },
    /// Lease-loss notification identified by registry, shard and epoch.
    /// NOTE(review): semantics are defined by the registry/cluster modules — confirm there.
    LeaseLost {
        registry_id: String,
        shard_id: usize,
        epoch: u64,
    },
    /// Snapshot request; the outcome is sent back through `reply`.
    GetSnapshot {
        reply: oneshot::Sender<Result<Option<Vec<u8>>, SnapshotError>>,
    },
    /// Probe that is answered by sending `()` through `reply`.
    Ping {
        reply: oneshot::Sender<()>,
    },
}
105
/// Gives a message value a static string label.
///
/// NOTE(review): presumably used for metrics/tracing labels — confirm against callers.
pub trait MessageKind {
    /// Returns the label for this message.
    fn kind(&self) -> &'static str;
}
109
110impl MessageKind for () {
111    fn kind(&self) -> &'static str {
112        "()"
113    }
114}
115
/// Object-safe view of an actor reference's counters and state flags, plus the
/// ability to request a stop.
///
/// The method names mirror the accessors found on `MailboxTx`.
pub trait RefMetrics: Send + Sync {
    /// Current mailbox length.
    fn mailbox_len(&self) -> usize;
    /// Current control-mailbox length; the default implementation reports 0.
    fn control_mailbox_len(&self) -> usize {
        0
    }
    /// Number of messages currently being processed.
    fn inflight_count(&self) -> usize;
    /// Time since processing last finished.
    fn idle_for(&self) -> Duration;
    /// Time since mailbox activity was last observed.
    fn silence_for(&self) -> Duration;
    /// Current incarnation number.
    fn incarnation(&self) -> u64;
    /// Whether the mailbox is flagged closed.
    fn is_closed(&self) -> bool;
    /// Whether the actor is flagged terminated.
    fn is_terminated(&self) -> bool;
    /// Pid of the referenced actor; the default implementation returns `Pid::INVALID`.
    fn pid(&self) -> Pid {
        Pid::INVALID
    }

    /// Requests that the actor stop.
    fn stop(&self) -> Result<(), TellError>;
}
133
/// Failure modes of a request/response ("ask") exchange.
#[derive(Debug, thiserror::Error, Clone, Copy, PartialEq, Eq)]
pub enum AskError {
    /// The request could not be enqueued because the mailbox was full or overloaded.
    #[error("Mailbox full")]
    Full,
    /// The mailbox channel is closed.
    #[error("Actor closed")]
    Closed,
    /// The deadline passed before a reply arrived (see `MailboxTx::ask_wait_timeout`).
    #[error("Request timed out")]
    Timeout,
    /// The reply sender was dropped without sending a value.
    #[error("Response canceled")]
    Canceled,
}
145
/// Failure modes of a fire-and-forget ("tell") send.
#[derive(Debug, thiserror::Error, Clone, Copy, PartialEq, Eq)]
pub enum TellError {
    /// The channel had no free capacity, or the overload policy rejected the send.
    #[error("Mailbox full")]
    Full,
    /// The receiving side of the channel is gone.
    #[error("Actor closed")]
    Closed,
}
153
/// Marker trait for values stored in `SpawnConfig::extra_guards`.
///
/// NOTE(review): presumably these are kept alive alongside the spawned actor so
/// their `Drop` runs when it goes away — confirm in the runtime.
pub trait SpawnGuard: Send + Sync + 'static {}
// Blanket impl: every `Send + Sync + 'static` type qualifies as a guard.
impl<T: Send + Sync + 'static> SpawnGuard for T {}
156
/// Reasons a spawn request can be refused.
#[derive(Debug, thiserror::Error, Clone)]
pub enum SpawnError {
    /// An actor with this name already exists; the payload is the name.
    #[error("Actor name already exists: {0}")]
    NameTaken(String),
    /// A restart policy was requested through an entry point that cannot restart.
    #[error("spawn(actor, ..) cannot restart; use spawn_with(...) for restart policies")]
    NotRestartable,
    /// The system is shutting down.
    #[error("system is shutting down")]
    ShuttingDown,
}
166
/// Why an actor stopped, as reported in `ActorLifecycleEventKind::Stopped`.
///
/// NOTE(review): which runtime path produces each variant is not visible in
/// this file — confirm in the runtime module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ActorStopReason {
    Normal,
    Panic,
    FactoryPanic,
    LifecycleHookPanic,
    ContextStop,
    MailboxClosed,
    ShutdownImmediate,
    ShutdownGraceful,
    ShutdownDeadlineExceeded,
}
179
/// The kind of transition recorded by an `ActorLifecycleEvent`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ActorLifecycleEventKind {
    /// The actor started.
    Started,
    /// The actor was restarted; carries an incarnation number.
    Restarted {
        incarnation: u64,
    },
    /// The actor stopped; carries the reason and an incarnation number.
    Stopped {
        reason: ActorStopReason,
        incarnation: u64,
    },
}
191
/// A timestamped lifecycle transition of a named actor.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ActorLifecycleEvent {
    /// Actor the event refers to (presumably its `SpawnConfig::name` — confirm).
    pub actor: String,
    /// Wall-clock time attached to the event.
    pub at: SystemTime,
    /// What happened.
    pub kind: ActorLifecycleEventKind,
}
198
199impl From<TellError> for AskError {
200    fn from(e: TellError) -> Self {
201        match e {
202            TellError::Full => AskError::Full,
203            TellError::Closed => AskError::Closed,
204        }
205    }
206}
207
#[cfg(feature = "axum")]
impl axum::response::IntoResponse for AskError {
    /// Maps the error to an HTTP status and uses its `Display` text as the body:
    /// `Full`/`Closed` -> 503, `Timeout` -> 504, `Canceled` -> 500.
    fn into_response(self) -> axum::response::Response {
        use axum::http::StatusCode;

        let code = match self {
            AskError::Full | AskError::Closed => StatusCode::SERVICE_UNAVAILABLE,
            AskError::Timeout => StatusCode::GATEWAY_TIMEOUT,
            AskError::Canceled => StatusCode::INTERNAL_SERVER_ERROR,
        };
        let body = self.to_string();
        (code, body).into_response()
    }
}
220
#[cfg(feature = "axum")]
impl axum::response::IntoResponse for TellError {
    /// Both variants map to HTTP 503, with the error's `Display` text as the body.
    fn into_response(self) -> axum::response::Response {
        use axum::http::StatusCode;

        // Exhaustive on purpose: a future variant must pick its own status.
        let code = match self {
            TellError::Full | TellError::Closed => StatusCode::SERVICE_UNAVAILABLE,
        };
        let body = self.to_string();
        (code, body).into_response()
    }
}
231
/// Watermark-based overload settings consulted by `MailboxTx` before a user send.
#[derive(Clone)]
pub struct OverloadConfig {
    /// Queue length at or above which sends start being rejected.
    /// `usize::MAX` disables rejection entirely.
    pub high: usize,
    /// Queue length at or below which a previously overloaded mailbox accepts
    /// sends again; values above `high` are treated as `high`.
    pub low: usize,
    /// What to do when overloaded; only `OverloadPolicy::Reject` uses the watermarks.
    pub policy: OverloadPolicy,
}
238
239impl Default for OverloadConfig {
240    fn default() -> Self {
241        Self {
242            high: usize::MAX,
243            low: usize::MAX,
244            policy: OverloadPolicy::Reject,
245        }
246    }
247}
248
/// How user sends behave with respect to the overload watermarks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverloadPolicy {
    /// Apply the high/low watermark check and fail rejected sends with `TellError::Full`.
    Reject,
    /// Skip the watermark check; sends are limited only by the channel itself.
    Block,
}
254
/// Settings for spawning an actor. Build with `SpawnConfig::new` and refine
/// with the chained builder methods.
#[derive(Clone)]
pub struct SpawnConfig {
    /// Actor name.
    pub name: String,
    /// Capacity value for the user mailbox (presumably the channel bound — confirm).
    pub capacity: usize,
    /// Capacity value for the control mailbox.
    pub control_capacity: usize,
    /// Restart behaviour.
    pub restart_policy: RestartPolicy,
    /// What happens to queued messages across a restart.
    pub restart_mailbox_policy: RestartMailboxPolicy,
    /// Immediate or graceful shutdown.
    pub shutdown_mode: ShutdownMode,
    /// Overload watermarks and policy for user sends.
    pub overload: OverloadConfig,
    /// Optional serialized snapshot supplied at spawn time.
    pub initial_snapshot: Option<Vec<u8>>,
    /// Additional guard objects attached to the spawn.
    pub extra_guards: Vec<Arc<dyn SpawnGuard>>,
}
267
268impl SpawnConfig {
269    pub fn new(name: impl Into<String>, capacity: usize) -> Self {
270        Self {
271            name: name.into(),
272            capacity,
273            control_capacity: capacity.clamp(8, 1024),
274            restart_policy: RestartPolicy::None,
275            restart_mailbox_policy: RestartMailboxPolicy::Keep,
276            shutdown_mode: ShutdownMode::Immediate,
277            overload: OverloadConfig::default(),
278            initial_snapshot: None,
279            extra_guards: Vec::new(),
280        }
281    }
282
283    /// Convenience constructor for stateful/distributed actors.
284    /// Sets restart mailbox behavior to `DrainAndDrop`.
285    pub fn new_stateful(name: impl Into<String>, capacity: usize) -> Self {
286        Self::new(name, capacity).restart_mailbox_drain_and_drop()
287    }
288
289    pub fn push_guard(&mut self, guard: Arc<dyn SpawnGuard>) {
290        self.extra_guards.push(guard);
291    }
292
293    pub fn restart_on_panic(
294        mut self,
295        max_restarts: usize,
296        min_backoff: Duration,
297        max_backoff: Duration,
298    ) -> Self {
299        self.restart_policy = RestartPolicy::OnPanic {
300            max_restarts,
301            min_backoff,
302            max_backoff,
303        };
304        self
305    }
306
307    pub fn shutdown_graceful(mut self, deadline: Duration) -> Self {
308        self.shutdown_mode = ShutdownMode::Graceful { deadline };
309        self
310    }
311
312    pub fn restart_mailbox_policy(mut self, policy: RestartMailboxPolicy) -> Self {
313        self.restart_mailbox_policy = policy;
314        self
315    }
316
317    pub fn restart_mailbox_drain_and_drop(mut self) -> Self {
318        self.restart_mailbox_policy = RestartMailboxPolicy::DrainAndDrop;
319        self
320    }
321
322    pub fn restart_mailbox_keep(mut self) -> Self {
323        self.restart_mailbox_policy = RestartMailboxPolicy::Keep;
324        self
325    }
326
327    pub fn control_capacity(mut self, capacity: usize) -> Self {
328        self.control_capacity = capacity.max(1);
329        self
330    }
331
332    pub fn overload(mut self, overload: OverloadConfig) -> Self {
333        self.overload = overload;
334        self
335    }
336
337    pub fn initial_snapshot(mut self, snapshot: Vec<u8>) -> Self {
338        self.initial_snapshot = Some(snapshot);
339        self
340    }
341}
342
/// Whether, and within which limits, an actor is restarted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestartPolicy {
    /// Never restart.
    None,
    /// Restart after a panic.
    /// NOTE(review): how `max_restarts` is counted and how the backoff grows
    /// between `min_backoff` and `max_backoff` is decided by the runtime — confirm there.
    OnPanic {
        max_restarts: usize,
        min_backoff: Duration,
        max_backoff: Duration,
    },
}
352
/// What happens to messages still queued when an actor restarts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestartMailboxPolicy {
    /// Keep queued messages (the default chosen by `SpawnConfig::new`).
    Keep,
    /// Drain the queue and drop its contents (chosen by `SpawnConfig::new_stateful`).
    DrainAndDrop,
}
358
/// How an actor is shut down.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShutdownMode {
    /// Stop right away (the default chosen by `SpawnConfig::new`).
    Immediate,
    /// Graceful shutdown bounded by `deadline`.
    Graceful { deadline: Duration },
}
364
/// Per-actor state that handlers can read and modify: stop requests, the
/// trap-exit flag and the actor's pid.
pub struct Context {
    // Set by `stop()` / `stop_with_exit()`.
    stop_requested: bool,
    // Toggled through `trap_exit()`.
    trap_exit: bool,
    // `Pid::INVALID` until `set_pid` is called.
    pid: Pid,
    // Exit reason supplied via `stop_with_exit`; consumed by `take_stop_exit_reason`.
    stop_exit_reason: Option<ExitReason>,
}
371
372impl Default for Context {
373    fn default() -> Self {
374        Self::new()
375    }
376}
377
378impl Context {
379    pub fn new() -> Self {
380        Self {
381            stop_requested: false,
382            trap_exit: false,
383            pid: Pid::INVALID,
384            stop_exit_reason: None,
385        }
386    }
387
388    pub fn stop(&mut self) {
389        self.stop_requested = true;
390    }
391
392    pub fn stop_with_exit(&mut self, reason: ExitReason) {
393        self.stop_requested = true;
394        self.stop_exit_reason = Some(reason);
395    }
396
397    pub fn is_stop_requested(&self) -> bool {
398        self.stop_requested
399    }
400
401    pub fn trap_exit(&mut self, enabled: bool) {
402        self.trap_exit = enabled;
403    }
404
405    pub fn is_trap_exit(&self) -> bool {
406        self.trap_exit
407    }
408
409    pub fn pid(&self) -> Pid {
410        self.pid
411    }
412
413    pub(crate) fn set_pid(&mut self, pid: Pid) {
414        self.pid = pid;
415    }
416
417    pub(crate) fn take_stop_exit_reason(&mut self) -> Option<ExitReason> {
418        self.stop_exit_reason.take()
419    }
420}
421
/// Lock-free counters and flags describing one actor's mailbox.
///
/// All fields are atomics (plus one fixed time base), so the meter can be
/// shared behind an `Arc` and updated through `&self`.
pub struct MailboxMeter {
    // Queued user messages.
    user_len: AtomicUsize,
    // Queued control messages.
    ctl_len: AtomicUsize,
    // Messages currently being processed.
    inflight: AtomicUsize,
    // Reference instant; the two `*_ms` fields are milliseconds since this point.
    base: Instant,
    // Time of the last send/receive/startup event.
    last_seen_ms: AtomicU64,
    // Time processing last finished (or startup).
    last_done_ms: AtomicU64,
    // Starts at 1 and is bumped by `inc_incarnation`.
    incarnation: AtomicU64,
    closed: std::sync::atomic::AtomicBool,
    terminated: std::sync::atomic::AtomicBool,
    // Hysteresis state used by `MailboxTx::should_reject_overload`.
    overloaded: std::sync::atomic::AtomicBool,
}
434
435impl Default for MailboxMeter {
436    fn default() -> Self {
437        Self::new()
438    }
439}
440
441impl MailboxMeter {
442    pub fn new() -> Self {
443        let base = Instant::now();
444        Self {
445            user_len: AtomicUsize::new(0),
446            ctl_len: AtomicUsize::new(0),
447            inflight: AtomicUsize::new(0),
448            base,
449            last_seen_ms: AtomicU64::new(0),
450            last_done_ms: AtomicU64::new(0),
451            incarnation: AtomicU64::new(1),
452            closed: std::sync::atomic::AtomicBool::new(false),
453            terminated: std::sync::atomic::AtomicBool::new(false),
454            overloaded: std::sync::atomic::AtomicBool::new(false),
455        }
456    }
457
458    #[inline]
459    pub fn on_startup(&self) {
460        let now = self.now_ms();
461        self.last_seen_ms.store(now, Ordering::Relaxed);
462        self.last_done_ms.store(now, Ordering::Relaxed);
463        self.overloaded.store(false, Ordering::Relaxed);
464    }
465
466    #[inline]
467    fn now_ms(&self) -> u64 {
468        self.base.elapsed().as_millis() as u64
469    }
470
471    #[inline]
472    pub fn on_send_ok_user(&self) {
473        self.user_len.fetch_add(1, Ordering::Relaxed);
474        self.last_seen_ms.store(self.now_ms(), Ordering::Relaxed);
475    }
476
477    #[inline]
478    pub fn on_send_ok_ctl(&self) {
479        self.ctl_len.fetch_add(1, Ordering::Relaxed);
480        self.last_seen_ms.store(self.now_ms(), Ordering::Relaxed);
481    }
482
483    #[inline]
484    pub fn on_recv_user(&self) {
485        #[cfg(debug_assertions)]
486        {
487            let prev = self.user_len.fetch_sub(1, Ordering::Relaxed);
488            debug_assert!(prev > 0, "MailboxMeter user_len underflow detected");
489        }
490        #[cfg(not(debug_assertions))]
491        {
492            self.user_len.fetch_sub(1, Ordering::Relaxed);
493        }
494        self.last_seen_ms.store(self.now_ms(), Ordering::Relaxed);
495    }
496
497    #[inline]
498    pub fn on_recv_ctl(&self) {
499        #[cfg(debug_assertions)]
500        {
501            let prev = self.ctl_len.fetch_sub(1, Ordering::Relaxed);
502            debug_assert!(prev > 0, "MailboxMeter ctl_len underflow detected");
503        }
504        #[cfg(not(debug_assertions))]
505        {
506            self.ctl_len.fetch_sub(1, Ordering::Relaxed);
507        }
508        self.last_seen_ms.store(self.now_ms(), Ordering::Relaxed);
509    }
510
511    #[inline]
512    pub fn on_drop_queued_user(&self) {
513        #[cfg(debug_assertions)]
514        {
515            let prev = self.user_len.fetch_sub(1, Ordering::Relaxed);
516            debug_assert!(
517                prev > 0,
518                "MailboxMeter user_len underflow detected while dropping queued messages"
519            );
520        }
521        #[cfg(not(debug_assertions))]
522        {
523            self.user_len.fetch_sub(1, Ordering::Relaxed);
524        }
525    }
526
527    #[inline]
528    pub fn on_drop_queued_ctl(&self) {
529        #[cfg(debug_assertions)]
530        {
531            let prev = self.ctl_len.fetch_sub(1, Ordering::Relaxed);
532            debug_assert!(
533                prev > 0,
534                "MailboxMeter ctl_len underflow detected while dropping queued control"
535            );
536        }
537        #[cfg(not(debug_assertions))]
538        {
539            self.ctl_len.fetch_sub(1, Ordering::Relaxed);
540        }
541    }
542
543    #[inline]
544    pub fn on_process_start(&self) {
545        self.inflight.fetch_add(1, Ordering::Relaxed);
546    }
547
548    #[inline]
549    pub fn on_process_end(&self) {
550        #[cfg(debug_assertions)]
551        {
552            let prev = self.inflight.fetch_sub(1, Ordering::Relaxed);
553            debug_assert!(prev > 0, "inflight underflow detected");
554        }
555        #[cfg(not(debug_assertions))]
556        {
557            self.inflight.fetch_sub(1, Ordering::Relaxed);
558        }
559        self.last_done_ms.store(self.now_ms(), Ordering::Relaxed);
560    }
561
562    #[inline]
563    pub fn user_len(&self) -> usize {
564        self.user_len.load(Ordering::Relaxed)
565    }
566
567    #[inline]
568    pub fn ctl_len(&self) -> usize {
569        self.ctl_len.load(Ordering::Relaxed)
570    }
571
572    #[inline]
573    pub fn len(&self) -> usize {
574        self.user_len() + self.ctl_len()
575    }
576
577    #[inline]
578    pub fn is_empty(&self) -> bool {
579        self.len() == 0
580    }
581
582    #[inline]
583    pub fn inflight_count(&self) -> usize {
584        self.inflight.load(Ordering::Relaxed)
585    }
586
587    #[inline]
588    pub fn idle_for(&self) -> Duration {
589        let now = self.now_ms();
590        let last = self.last_done_ms.load(Ordering::Relaxed);
591        Duration::from_millis(now.saturating_sub(last))
592    }
593
594    #[inline]
595    pub fn silence_for(&self) -> Duration {
596        let now = self.now_ms();
597        let last = self.last_seen_ms.load(Ordering::Relaxed);
598        Duration::from_millis(now.saturating_sub(last))
599    }
600
601    #[inline]
602    pub fn incarnation(&self) -> u64 {
603        self.incarnation.load(Ordering::Relaxed)
604    }
605
606    #[inline]
607    pub fn inc_incarnation(&self) {
608        self.incarnation.fetch_add(1, Ordering::Relaxed);
609    }
610
611    #[inline]
612    pub fn is_closed(&self) -> bool {
613        self.closed.load(Ordering::Relaxed)
614    }
615
616    #[inline]
617    pub fn set_closed(&self, closed: bool) {
618        self.closed.store(closed, Ordering::Relaxed);
619    }
620
621    #[inline]
622    pub fn is_terminated(&self) -> bool {
623        self.terminated.load(Ordering::Relaxed)
624    }
625
626    #[inline]
627    pub fn set_terminated(&self, terminated: bool) {
628        self.terminated.store(terminated, Ordering::Relaxed);
629    }
630
631    #[inline]
632    pub fn is_overloaded(&self) -> bool {
633        self.overloaded.load(Ordering::Relaxed)
634    }
635
636    #[inline]
637    pub fn set_overloaded(&self, overloaded: bool) {
638        self.overloaded.store(overloaded, Ordering::Relaxed);
639    }
640}
641
/// Scope guard that counts one in-flight message on a `MailboxMeter`:
/// construction calls `on_process_start`, dropping calls `on_process_end`.
pub struct ProcessGuard<'a> {
    // Meter whose in-flight counter this guard holds open.
    meter: &'a MailboxMeter,
}
645
646impl<'a> ProcessGuard<'a> {
647    pub fn new(meter: &'a MailboxMeter) -> Self {
648        meter.on_process_start();
649        Self { meter }
650    }
651}
652
653impl Drop for ProcessGuard<'_> {
654    fn drop(&mut self) {
655        self.meter.on_process_end();
656    }
657}
658
/// Sending half of an actor mailbox: a user-message channel, a separate
/// control channel, and the shared meter that tracks both.
pub struct MailboxTx<M> {
    // Channel for user messages of type `M`.
    user_tx: mpsc::Sender<M>,
    // Channel for `ControlMsg` values.
    ctl_tx: mpsc::Sender<ControlMsg>,
    // Counters shared between clones of this sender.
    meter: Arc<MailboxMeter>,
    pid: Pid,
    // Watermarks applied to user sends only.
    overload: OverloadConfig,
}
666
667impl<M> Clone for MailboxTx<M> {
668    fn clone(&self) -> Self {
669        Self {
670            user_tx: self.user_tx.clone(),
671            ctl_tx: self.ctl_tx.clone(),
672            meter: self.meter.clone(),
673            pid: self.pid,
674            overload: self.overload.clone(),
675        }
676    }
677}
678
679impl<M> MailboxTx<M> {
680    pub fn new(
681        user_tx: mpsc::Sender<M>,
682        ctl_tx: mpsc::Sender<ControlMsg>,
683        meter: Arc<MailboxMeter>,
684        pid: Pid,
685        overload: OverloadConfig,
686    ) -> Self {
687        Self {
688            user_tx,
689            ctl_tx,
690            meter,
691            pid,
692            overload,
693        }
694    }
695
696    pub fn pid(&self) -> Pid {
697        self.pid
698    }
699
700    fn should_reject_overload(&self, len: usize) -> bool {
701        if !matches!(self.overload.policy, OverloadPolicy::Reject) {
702            return false;
703        }
704
705        if self.overload.high == usize::MAX {
706            return false;
707        }
708
709        let low = self.overload.low.min(self.overload.high);
710        if self.meter.is_overloaded() {
711            if low != usize::MAX && len <= low {
712                self.meter.set_overloaded(false);
713            } else {
714                return true;
715            }
716        }
717
718        if len >= self.overload.high {
719            self.meter.set_overloaded(true);
720            return true;
721        }
722
723        false
724    }
725
726    pub async fn send_wait(&self, msg: M) -> Result<(), TellError> {
727        if self.should_reject_overload(self.meter.user_len()) {
728            return Err(TellError::Full);
729        }
730        let permit = self
731            .user_tx
732            .reserve()
733            .await
734            .map_err(|_| TellError::Closed)?;
735        self.meter.on_send_ok_user();
736        permit.send(msg);
737        Ok(())
738    }
739
740    pub fn try_send(&self, msg: M) -> Result<(), TellError> {
741        if self.should_reject_overload(self.meter.user_len()) {
742            return Err(TellError::Full);
743        }
744        match self.user_tx.try_send(msg) {
745            Ok(_) => {
746                self.meter.on_send_ok_user();
747                Ok(())
748            }
749            Err(mpsc::error::TrySendError::Full(_)) => Err(TellError::Full),
750            Err(mpsc::error::TrySendError::Closed(_)) => Err(TellError::Closed),
751        }
752    }
753
754    pub fn try_send_control(&self, msg: ControlMsg) -> Result<(), TellError> {
755        match self.ctl_tx.try_send(msg) {
756            Ok(_) => {
757                self.meter.on_send_ok_ctl();
758                Ok(())
759            }
760            Err(mpsc::error::TrySendError::Full(_)) => Err(TellError::Full),
761            Err(mpsc::error::TrySendError::Closed(_)) => Err(TellError::Closed),
762        }
763    }
764
765    pub async fn send_control_wait(&self, msg: ControlMsg) -> Result<(), TellError> {
766        let permit = self.ctl_tx.reserve().await.map_err(|_| TellError::Closed)?;
767        self.meter.on_send_ok_ctl();
768        permit.send(msg);
769        Ok(())
770    }
771
772    pub fn stop(&self) -> Result<(), TellError> {
773        self.try_send_control(ControlMsg::Stop {
774            reason: StopSignalReason::User,
775        })
776    }
777
778    pub async fn stop_wait(&self) -> Result<(), TellError> {
779        self.send_control_wait(ControlMsg::Stop {
780            reason: StopSignalReason::User,
781        })
782        .await
783    }
784
785    pub fn len(&self) -> usize {
786        self.meter.user_len()
787    }
788
789    pub fn is_empty(&self) -> bool {
790        self.len() == 0
791    }
792
793    pub fn control_len(&self) -> usize {
794        self.meter.ctl_len()
795    }
796
797    pub fn inflight_count(&self) -> usize {
798        self.meter.inflight_count()
799    }
800
801    pub fn idle_for(&self) -> Duration {
802        self.meter.idle_for()
803    }
804
805    pub fn silence_for(&self) -> Duration {
806        self.meter.silence_for()
807    }
808
809    pub fn incarnation(&self) -> u64 {
810        self.meter.incarnation()
811    }
812
813    pub fn is_closed(&self) -> bool {
814        self.meter.is_closed()
815    }
816
817    pub fn is_terminated(&self) -> bool {
818        self.meter.is_terminated()
819    }
820
821    pub async fn ask_wait<F, R>(&self, factory: F) -> Result<R, AskError>
822    where
823        F: FnOnce(tokio::sync::oneshot::Sender<R>) -> M,
824    {
825        let (tx, rx) = tokio::sync::oneshot::channel();
826        let msg = factory(tx);
827        self.send_wait(msg).await?;
828        rx.await.map_err(|_| AskError::Canceled)
829    }
830
831    pub async fn ask_wait_timeout<F, R>(&self, factory: F, timeout: Duration) -> Result<R, AskError>
832    where
833        F: FnOnce(tokio::sync::oneshot::Sender<R>) -> M,
834    {
835        tokio::time::timeout(timeout, self.ask_wait(factory))
836            .await
837            .map_err(|_| AskError::Timeout)?
838    }
839}
840
/// Result of spawning an actor: a reference to it, the (shared) join handle of
/// its task, and its pid.
pub struct SpawnHandle<R> {
    // Reference type used to talk to the actor; cloned out by `actor()`.
    actor_ref: R,
    // Shared slot holding the task's join handle; emptied by the first `join()`.
    handle: SharedHandle,
    pid: Pid,
}
846
847impl<R> SpawnHandle<R> {
848    pub fn new(actor_ref: R, handle: tokio::task::JoinHandle<()>, pid: Pid) -> Self {
849        Self {
850            actor_ref,
851            handle: Arc::new(Mutex::new(Some(handle))),
852            pid,
853        }
854    }
855
856    pub fn new_shared(actor_ref: R, handle: SharedHandle, pid: Pid) -> Self {
857        Self {
858            actor_ref,
859            handle,
860            pid,
861        }
862    }
863
864    pub fn actor(&self) -> R
865    where
866        R: Clone,
867    {
868        self.actor_ref.clone()
869    }
870
871    pub fn pid(&self) -> Pid {
872        self.pid
873    }
874
875    pub fn shared_handle(&self) -> SharedHandle {
876        self.handle.clone()
877    }
878
879    pub async fn join(&self) -> Result<(), tokio::task::JoinError> {
880        let handle = self.handle.lock().take();
881        if let Some(h) = handle {
882            h.await
883        } else {
884            Ok(())
885        }
886    }
887}