1use parking_lot::Mutex;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::sync::Arc;
4use std::time::{Duration, SystemTime};
5use tokio::sync::{mpsc, oneshot};
6use tokio::time::Instant;
7
8pub use async_trait;
9pub use axactor_macros::actor;
10
11pub type SharedHandle = Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>;
12
13pub use rt::{Actor, System};
14pub mod cluster;
15pub mod obs;
16pub mod ref_api;
17pub mod registry;
18pub mod rt;
19pub mod supervisor;
20pub use ref_api::{
21 Addr, AnyAskError, AnyMonitorError, AnyRef, AnyTellError, LocalContractRef, LocalMonitorError,
22 LocalRef, RefContract, RemoteRef, RemoteRefError, RemoteSafe, SimdJsonCodec, WireCodec,
23 WireCodecError,
24};
25pub use registry::RegistryError;
26
/// Global counter behind [`Pid::next`]. It starts at 1, leaving 0 for
/// [`Pid::INVALID`].
static PID_SEQ: AtomicU64 = AtomicU64::new(1);

/// Numeric identifier wrapper around a `u64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Pid(pub u64);

impl Pid {
    /// The zero pid, used as a placeholder (for example by `Context::new`).
    pub const INVALID: Self = Pid(0);

    /// Takes the next value from the global sequence and wraps it in a `Pid`.
    pub fn next() -> Self {
        let raw = PID_SEQ.fetch_add(1, Ordering::Relaxed);
        Pid(raw)
    }
}

impl std::fmt::Display for Pid {
    /// Writes the wrapped number in decimal.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Pid(raw) = *self;
        write!(f, "{}", raw)
    }
}
45
/// Identifier of a monitor, as carried in `ControlMsg::Down`.
pub type MonitorId = u64;
47
/// Exit reason value carried by `ControlMsg::ExitSignal` and
/// `ControlMsg::Down`, and accepted by `Context::stop_with_exit`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExitReason {
    Normal,
    Shutdown,
    Panic,
    Killed,
    LeaseLost,
    NodeDown,
    NoSuchActor,
    RestartIntensityExceeded,
    ProtocolViolation,
    /// Free-form reason text for anything the fixed variants do not cover.
    Other(String),
}
61
/// Tag attached to `ControlMsg::Stop` that records who or what issued the
/// stop. `MailboxTx::stop` and `MailboxTx::stop_wait` send `User`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopSignalReason {
    User,
    Supervisor,
    Shutdown,
    LinkPropagation,
    LeaseLost,
}
70
71#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
72pub enum SnapshotError {
73 #[error("snapshot not supported")]
74 Unsupported,
75 #[error("snapshot failed: {0}")]
76 Failed(String),
77}
78
79#[derive(Debug)]
80pub enum ControlMsg {
81 Stop {
82 reason: StopSignalReason,
83 },
84 ExitSignal {
85 from: Pid,
86 reason: ExitReason,
87 },
88 Down {
89 monitor_id: MonitorId,
90 pid: Pid,
91 reason: ExitReason,
92 },
93 LeaseLost {
94 registry_id: String,
95 shard_id: usize,
96 epoch: u64,
97 },
98 GetSnapshot {
99 reply: oneshot::Sender<Result<Option<Vec<u8>>, SnapshotError>>,
100 },
101 Ping {
102 reply: oneshot::Sender<()>,
103 },
104}
105
/// Gives a message value a static, human-readable kind label.
pub trait MessageKind {
    /// Returns the label for this message.
    fn kind(&self) -> &'static str;
}

/// The unit type labels itself with its own literal spelling.
impl MessageKind for () {
    fn kind(&self) -> &'static str {
        const UNIT_KIND: &str = "()";
        UNIT_KIND
    }
}
115
116pub trait RefMetrics: Send + Sync {
117 fn mailbox_len(&self) -> usize;
118 fn control_mailbox_len(&self) -> usize {
119 0
120 }
121 fn inflight_count(&self) -> usize;
122 fn idle_for(&self) -> Duration;
123 fn silence_for(&self) -> Duration;
124 fn incarnation(&self) -> u64;
125 fn is_closed(&self) -> bool;
126 fn is_terminated(&self) -> bool;
127 fn pid(&self) -> Pid {
128 Pid::INVALID
129 }
130
131 fn stop(&self) -> Result<(), TellError>;
132}
133
134#[derive(Debug, thiserror::Error, Clone, Copy, PartialEq, Eq)]
135pub enum AskError {
136 #[error("Mailbox full")]
137 Full,
138 #[error("Actor closed")]
139 Closed,
140 #[error("Request timed out")]
141 Timeout,
142 #[error("Response canceled")]
143 Canceled,
144}
145
146#[derive(Debug, thiserror::Error, Clone, Copy, PartialEq, Eq)]
147pub enum TellError {
148 #[error("Mailbox full")]
149 Full,
150 #[error("Actor closed")]
151 Closed,
152}
153
/// Marker trait for values that can be stored as `Arc<dyn SpawnGuard>`
/// (see `SpawnConfig::extra_guards`).
pub trait SpawnGuard: Send + Sync + 'static {}

/// Every `Send + Sync + 'static` type is a `SpawnGuard`.
impl<T> SpawnGuard for T where T: Send + Sync + 'static {}
156
157#[derive(Debug, thiserror::Error, Clone)]
158pub enum SpawnError {
159 #[error("Actor name already exists: {0}")]
160 NameTaken(String),
161 #[error("spawn(actor, ..) cannot restart; use spawn_with(...) for restart policies")]
162 NotRestartable,
163 #[error("system is shutting down")]
164 ShuttingDown,
165}
166
/// Stop cause recorded in `ActorLifecycleEventKind::Stopped`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ActorStopReason {
    Normal,
    Panic,
    FactoryPanic,
    LifecycleHookPanic,
    ContextStop,
    MailboxClosed,
    ShutdownImmediate,
    ShutdownGraceful,
    ShutdownDeadlineExceeded,
}
179
180#[derive(Debug, Clone, PartialEq, Eq)]
181pub enum ActorLifecycleEventKind {
182 Started,
183 Restarted {
184 incarnation: u64,
185 },
186 Stopped {
187 reason: ActorStopReason,
188 incarnation: u64,
189 },
190}
191
192#[derive(Debug, Clone, PartialEq, Eq)]
193pub struct ActorLifecycleEvent {
194 pub actor: String,
195 pub at: SystemTime,
196 pub kind: ActorLifecycleEventKind,
197}
198
199impl From<TellError> for AskError {
200 fn from(e: TellError) -> Self {
201 match e {
202 TellError::Full => AskError::Full,
203 TellError::Closed => AskError::Closed,
204 }
205 }
206}
207
/// HTTP mapping for ask failures, compiled only with the `axum` feature.
#[cfg(feature = "axum")]
impl axum::response::IntoResponse for AskError {
    /// Builds a response from a status code chosen per variant and the
    /// error's display text as the body.
    fn into_response(self) -> axum::response::Response {
        use axum::http::StatusCode;

        let status = match self {
            AskError::Full | AskError::Closed => StatusCode::SERVICE_UNAVAILABLE,
            AskError::Timeout => StatusCode::GATEWAY_TIMEOUT,
            AskError::Canceled => StatusCode::INTERNAL_SERVER_ERROR,
        };
        let body = self.to_string();
        (status, body).into_response()
    }
}
220
/// HTTP mapping for tell failures, compiled only with the `axum` feature.
#[cfg(feature = "axum")]
impl axum::response::IntoResponse for TellError {
    /// Both variants map to 503; the body is the error's display text.
    fn into_response(self) -> axum::response::Response {
        use axum::http::StatusCode;

        let status = match self {
            TellError::Full | TellError::Closed => StatusCode::SERVICE_UNAVAILABLE,
        };
        let body = self.to_string();
        (status, body).into_response()
    }
}
231
232#[derive(Clone)]
233pub struct OverloadConfig {
234 pub high: usize,
235 pub low: usize,
236 pub policy: OverloadPolicy,
237}
238
239impl Default for OverloadConfig {
240 fn default() -> Self {
241 Self {
242 high: usize::MAX,
243 low: usize::MAX,
244 policy: OverloadPolicy::Reject,
245 }
246 }
247}
248
/// Policy selector for `OverloadConfig`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverloadPolicy {
    /// The watermark check in `MailboxTx` is applied only under this policy.
    Reject,
    /// The watermark check in `MailboxTx` is skipped under this policy.
    Block,
}
254
255#[derive(Clone)]
256pub struct SpawnConfig {
257 pub name: String,
258 pub capacity: usize,
259 pub control_capacity: usize,
260 pub restart_policy: RestartPolicy,
261 pub restart_mailbox_policy: RestartMailboxPolicy,
262 pub shutdown_mode: ShutdownMode,
263 pub overload: OverloadConfig,
264 pub initial_snapshot: Option<Vec<u8>>,
265 pub extra_guards: Vec<Arc<dyn SpawnGuard>>,
266}
267
268impl SpawnConfig {
269 pub fn new(name: impl Into<String>, capacity: usize) -> Self {
270 Self {
271 name: name.into(),
272 capacity,
273 control_capacity: capacity.clamp(8, 1024),
274 restart_policy: RestartPolicy::None,
275 restart_mailbox_policy: RestartMailboxPolicy::Keep,
276 shutdown_mode: ShutdownMode::Immediate,
277 overload: OverloadConfig::default(),
278 initial_snapshot: None,
279 extra_guards: Vec::new(),
280 }
281 }
282
283 pub fn new_stateful(name: impl Into<String>, capacity: usize) -> Self {
286 Self::new(name, capacity).restart_mailbox_drain_and_drop()
287 }
288
289 pub fn push_guard(&mut self, guard: Arc<dyn SpawnGuard>) {
290 self.extra_guards.push(guard);
291 }
292
293 pub fn restart_on_panic(
294 mut self,
295 max_restarts: usize,
296 min_backoff: Duration,
297 max_backoff: Duration,
298 ) -> Self {
299 self.restart_policy = RestartPolicy::OnPanic {
300 max_restarts,
301 min_backoff,
302 max_backoff,
303 };
304 self
305 }
306
307 pub fn shutdown_graceful(mut self, deadline: Duration) -> Self {
308 self.shutdown_mode = ShutdownMode::Graceful { deadline };
309 self
310 }
311
312 pub fn restart_mailbox_policy(mut self, policy: RestartMailboxPolicy) -> Self {
313 self.restart_mailbox_policy = policy;
314 self
315 }
316
317 pub fn restart_mailbox_drain_and_drop(mut self) -> Self {
318 self.restart_mailbox_policy = RestartMailboxPolicy::DrainAndDrop;
319 self
320 }
321
322 pub fn restart_mailbox_keep(mut self) -> Self {
323 self.restart_mailbox_policy = RestartMailboxPolicy::Keep;
324 self
325 }
326
327 pub fn control_capacity(mut self, capacity: usize) -> Self {
328 self.control_capacity = capacity.max(1);
329 self
330 }
331
332 pub fn overload(mut self, overload: OverloadConfig) -> Self {
333 self.overload = overload;
334 self
335 }
336
337 pub fn initial_snapshot(mut self, snapshot: Vec<u8>) -> Self {
338 self.initial_snapshot = Some(snapshot);
339 self
340 }
341}
342
/// Restart behaviour selectable in `SpawnConfig`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestartPolicy {
    /// Default chosen by `SpawnConfig::new`.
    None,
    /// Set by `SpawnConfig::restart_on_panic`; carries the restart limit and
    /// the two backoff bounds.
    OnPanic {
        max_restarts: usize,
        min_backoff: Duration,
        max_backoff: Duration,
    },
}
352
/// Mailbox policy on restart, selectable in `SpawnConfig`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestartMailboxPolicy {
    /// Default chosen by `SpawnConfig::new`.
    Keep,
    /// Chosen by `SpawnConfig::new_stateful`.
    DrainAndDrop,
}
358
/// Shutdown behaviour selectable in `SpawnConfig`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShutdownMode {
    /// Default chosen by `SpawnConfig::new`.
    Immediate,
    /// Set by `SpawnConfig::shutdown_graceful`; carries the deadline.
    Graceful { deadline: Duration },
}
364
365pub struct Context {
366 stop_requested: bool,
367 trap_exit: bool,
368 pid: Pid,
369 stop_exit_reason: Option<ExitReason>,
370}
371
372impl Default for Context {
373 fn default() -> Self {
374 Self::new()
375 }
376}
377
378impl Context {
379 pub fn new() -> Self {
380 Self {
381 stop_requested: false,
382 trap_exit: false,
383 pid: Pid::INVALID,
384 stop_exit_reason: None,
385 }
386 }
387
388 pub fn stop(&mut self) {
389 self.stop_requested = true;
390 }
391
392 pub fn stop_with_exit(&mut self, reason: ExitReason) {
393 self.stop_requested = true;
394 self.stop_exit_reason = Some(reason);
395 }
396
397 pub fn is_stop_requested(&self) -> bool {
398 self.stop_requested
399 }
400
401 pub fn trap_exit(&mut self, enabled: bool) {
402 self.trap_exit = enabled;
403 }
404
405 pub fn is_trap_exit(&self) -> bool {
406 self.trap_exit
407 }
408
409 pub fn pid(&self) -> Pid {
410 self.pid
411 }
412
413 pub(crate) fn set_pid(&mut self, pid: Pid) {
414 self.pid = pid;
415 }
416
417 pub(crate) fn take_stop_exit_reason(&mut self) -> Option<ExitReason> {
418 self.stop_exit_reason.take()
419 }
420}
421
422pub struct MailboxMeter {
423 user_len: AtomicUsize,
424 ctl_len: AtomicUsize,
425 inflight: AtomicUsize,
426 base: Instant,
427 last_seen_ms: AtomicU64,
428 last_done_ms: AtomicU64,
429 incarnation: AtomicU64,
430 closed: std::sync::atomic::AtomicBool,
431 terminated: std::sync::atomic::AtomicBool,
432 overloaded: std::sync::atomic::AtomicBool,
433}
434
435impl Default for MailboxMeter {
436 fn default() -> Self {
437 Self::new()
438 }
439}
440
441impl MailboxMeter {
442 pub fn new() -> Self {
443 let base = Instant::now();
444 Self {
445 user_len: AtomicUsize::new(0),
446 ctl_len: AtomicUsize::new(0),
447 inflight: AtomicUsize::new(0),
448 base,
449 last_seen_ms: AtomicU64::new(0),
450 last_done_ms: AtomicU64::new(0),
451 incarnation: AtomicU64::new(1),
452 closed: std::sync::atomic::AtomicBool::new(false),
453 terminated: std::sync::atomic::AtomicBool::new(false),
454 overloaded: std::sync::atomic::AtomicBool::new(false),
455 }
456 }
457
458 #[inline]
459 pub fn on_startup(&self) {
460 let now = self.now_ms();
461 self.last_seen_ms.store(now, Ordering::Relaxed);
462 self.last_done_ms.store(now, Ordering::Relaxed);
463 self.overloaded.store(false, Ordering::Relaxed);
464 }
465
466 #[inline]
467 fn now_ms(&self) -> u64 {
468 self.base.elapsed().as_millis() as u64
469 }
470
471 #[inline]
472 pub fn on_send_ok_user(&self) {
473 self.user_len.fetch_add(1, Ordering::Relaxed);
474 self.last_seen_ms.store(self.now_ms(), Ordering::Relaxed);
475 }
476
477 #[inline]
478 pub fn on_send_ok_ctl(&self) {
479 self.ctl_len.fetch_add(1, Ordering::Relaxed);
480 self.last_seen_ms.store(self.now_ms(), Ordering::Relaxed);
481 }
482
483 #[inline]
484 pub fn on_recv_user(&self) {
485 #[cfg(debug_assertions)]
486 {
487 let prev = self.user_len.fetch_sub(1, Ordering::Relaxed);
488 debug_assert!(prev > 0, "MailboxMeter user_len underflow detected");
489 }
490 #[cfg(not(debug_assertions))]
491 {
492 self.user_len.fetch_sub(1, Ordering::Relaxed);
493 }
494 self.last_seen_ms.store(self.now_ms(), Ordering::Relaxed);
495 }
496
497 #[inline]
498 pub fn on_recv_ctl(&self) {
499 #[cfg(debug_assertions)]
500 {
501 let prev = self.ctl_len.fetch_sub(1, Ordering::Relaxed);
502 debug_assert!(prev > 0, "MailboxMeter ctl_len underflow detected");
503 }
504 #[cfg(not(debug_assertions))]
505 {
506 self.ctl_len.fetch_sub(1, Ordering::Relaxed);
507 }
508 self.last_seen_ms.store(self.now_ms(), Ordering::Relaxed);
509 }
510
511 #[inline]
512 pub fn on_drop_queued_user(&self) {
513 #[cfg(debug_assertions)]
514 {
515 let prev = self.user_len.fetch_sub(1, Ordering::Relaxed);
516 debug_assert!(
517 prev > 0,
518 "MailboxMeter user_len underflow detected while dropping queued messages"
519 );
520 }
521 #[cfg(not(debug_assertions))]
522 {
523 self.user_len.fetch_sub(1, Ordering::Relaxed);
524 }
525 }
526
527 #[inline]
528 pub fn on_drop_queued_ctl(&self) {
529 #[cfg(debug_assertions)]
530 {
531 let prev = self.ctl_len.fetch_sub(1, Ordering::Relaxed);
532 debug_assert!(
533 prev > 0,
534 "MailboxMeter ctl_len underflow detected while dropping queued control"
535 );
536 }
537 #[cfg(not(debug_assertions))]
538 {
539 self.ctl_len.fetch_sub(1, Ordering::Relaxed);
540 }
541 }
542
543 #[inline]
544 pub fn on_process_start(&self) {
545 self.inflight.fetch_add(1, Ordering::Relaxed);
546 }
547
548 #[inline]
549 pub fn on_process_end(&self) {
550 #[cfg(debug_assertions)]
551 {
552 let prev = self.inflight.fetch_sub(1, Ordering::Relaxed);
553 debug_assert!(prev > 0, "inflight underflow detected");
554 }
555 #[cfg(not(debug_assertions))]
556 {
557 self.inflight.fetch_sub(1, Ordering::Relaxed);
558 }
559 self.last_done_ms.store(self.now_ms(), Ordering::Relaxed);
560 }
561
562 #[inline]
563 pub fn user_len(&self) -> usize {
564 self.user_len.load(Ordering::Relaxed)
565 }
566
567 #[inline]
568 pub fn ctl_len(&self) -> usize {
569 self.ctl_len.load(Ordering::Relaxed)
570 }
571
572 #[inline]
573 pub fn len(&self) -> usize {
574 self.user_len() + self.ctl_len()
575 }
576
577 #[inline]
578 pub fn is_empty(&self) -> bool {
579 self.len() == 0
580 }
581
582 #[inline]
583 pub fn inflight_count(&self) -> usize {
584 self.inflight.load(Ordering::Relaxed)
585 }
586
587 #[inline]
588 pub fn idle_for(&self) -> Duration {
589 let now = self.now_ms();
590 let last = self.last_done_ms.load(Ordering::Relaxed);
591 Duration::from_millis(now.saturating_sub(last))
592 }
593
594 #[inline]
595 pub fn silence_for(&self) -> Duration {
596 let now = self.now_ms();
597 let last = self.last_seen_ms.load(Ordering::Relaxed);
598 Duration::from_millis(now.saturating_sub(last))
599 }
600
601 #[inline]
602 pub fn incarnation(&self) -> u64 {
603 self.incarnation.load(Ordering::Relaxed)
604 }
605
606 #[inline]
607 pub fn inc_incarnation(&self) {
608 self.incarnation.fetch_add(1, Ordering::Relaxed);
609 }
610
611 #[inline]
612 pub fn is_closed(&self) -> bool {
613 self.closed.load(Ordering::Relaxed)
614 }
615
616 #[inline]
617 pub fn set_closed(&self, closed: bool) {
618 self.closed.store(closed, Ordering::Relaxed);
619 }
620
621 #[inline]
622 pub fn is_terminated(&self) -> bool {
623 self.terminated.load(Ordering::Relaxed)
624 }
625
626 #[inline]
627 pub fn set_terminated(&self, terminated: bool) {
628 self.terminated.store(terminated, Ordering::Relaxed);
629 }
630
631 #[inline]
632 pub fn is_overloaded(&self) -> bool {
633 self.overloaded.load(Ordering::Relaxed)
634 }
635
636 #[inline]
637 pub fn set_overloaded(&self, overloaded: bool) {
638 self.overloaded.store(overloaded, Ordering::Relaxed);
639 }
640}
641
642pub struct ProcessGuard<'a> {
643 meter: &'a MailboxMeter,
644}
645
646impl<'a> ProcessGuard<'a> {
647 pub fn new(meter: &'a MailboxMeter) -> Self {
648 meter.on_process_start();
649 Self { meter }
650 }
651}
652
653impl Drop for ProcessGuard<'_> {
654 fn drop(&mut self) {
655 self.meter.on_process_end();
656 }
657}
658
659pub struct MailboxTx<M> {
660 user_tx: mpsc::Sender<M>,
661 ctl_tx: mpsc::Sender<ControlMsg>,
662 meter: Arc<MailboxMeter>,
663 pid: Pid,
664 overload: OverloadConfig,
665}
666
667impl<M> Clone for MailboxTx<M> {
668 fn clone(&self) -> Self {
669 Self {
670 user_tx: self.user_tx.clone(),
671 ctl_tx: self.ctl_tx.clone(),
672 meter: self.meter.clone(),
673 pid: self.pid,
674 overload: self.overload.clone(),
675 }
676 }
677}
678
679impl<M> MailboxTx<M> {
680 pub fn new(
681 user_tx: mpsc::Sender<M>,
682 ctl_tx: mpsc::Sender<ControlMsg>,
683 meter: Arc<MailboxMeter>,
684 pid: Pid,
685 overload: OverloadConfig,
686 ) -> Self {
687 Self {
688 user_tx,
689 ctl_tx,
690 meter,
691 pid,
692 overload,
693 }
694 }
695
696 pub fn pid(&self) -> Pid {
697 self.pid
698 }
699
700 fn should_reject_overload(&self, len: usize) -> bool {
701 if !matches!(self.overload.policy, OverloadPolicy::Reject) {
702 return false;
703 }
704
705 if self.overload.high == usize::MAX {
706 return false;
707 }
708
709 let low = self.overload.low.min(self.overload.high);
710 if self.meter.is_overloaded() {
711 if low != usize::MAX && len <= low {
712 self.meter.set_overloaded(false);
713 } else {
714 return true;
715 }
716 }
717
718 if len >= self.overload.high {
719 self.meter.set_overloaded(true);
720 return true;
721 }
722
723 false
724 }
725
726 pub async fn send_wait(&self, msg: M) -> Result<(), TellError> {
727 if self.should_reject_overload(self.meter.user_len()) {
728 return Err(TellError::Full);
729 }
730 let permit = self
731 .user_tx
732 .reserve()
733 .await
734 .map_err(|_| TellError::Closed)?;
735 self.meter.on_send_ok_user();
736 permit.send(msg);
737 Ok(())
738 }
739
740 pub fn try_send(&self, msg: M) -> Result<(), TellError> {
741 if self.should_reject_overload(self.meter.user_len()) {
742 return Err(TellError::Full);
743 }
744 match self.user_tx.try_send(msg) {
745 Ok(_) => {
746 self.meter.on_send_ok_user();
747 Ok(())
748 }
749 Err(mpsc::error::TrySendError::Full(_)) => Err(TellError::Full),
750 Err(mpsc::error::TrySendError::Closed(_)) => Err(TellError::Closed),
751 }
752 }
753
754 pub fn try_send_control(&self, msg: ControlMsg) -> Result<(), TellError> {
755 match self.ctl_tx.try_send(msg) {
756 Ok(_) => {
757 self.meter.on_send_ok_ctl();
758 Ok(())
759 }
760 Err(mpsc::error::TrySendError::Full(_)) => Err(TellError::Full),
761 Err(mpsc::error::TrySendError::Closed(_)) => Err(TellError::Closed),
762 }
763 }
764
765 pub async fn send_control_wait(&self, msg: ControlMsg) -> Result<(), TellError> {
766 let permit = self.ctl_tx.reserve().await.map_err(|_| TellError::Closed)?;
767 self.meter.on_send_ok_ctl();
768 permit.send(msg);
769 Ok(())
770 }
771
772 pub fn stop(&self) -> Result<(), TellError> {
773 self.try_send_control(ControlMsg::Stop {
774 reason: StopSignalReason::User,
775 })
776 }
777
778 pub async fn stop_wait(&self) -> Result<(), TellError> {
779 self.send_control_wait(ControlMsg::Stop {
780 reason: StopSignalReason::User,
781 })
782 .await
783 }
784
785 pub fn len(&self) -> usize {
786 self.meter.user_len()
787 }
788
789 pub fn is_empty(&self) -> bool {
790 self.len() == 0
791 }
792
793 pub fn control_len(&self) -> usize {
794 self.meter.ctl_len()
795 }
796
797 pub fn inflight_count(&self) -> usize {
798 self.meter.inflight_count()
799 }
800
801 pub fn idle_for(&self) -> Duration {
802 self.meter.idle_for()
803 }
804
805 pub fn silence_for(&self) -> Duration {
806 self.meter.silence_for()
807 }
808
809 pub fn incarnation(&self) -> u64 {
810 self.meter.incarnation()
811 }
812
813 pub fn is_closed(&self) -> bool {
814 self.meter.is_closed()
815 }
816
817 pub fn is_terminated(&self) -> bool {
818 self.meter.is_terminated()
819 }
820
821 pub async fn ask_wait<F, R>(&self, factory: F) -> Result<R, AskError>
822 where
823 F: FnOnce(tokio::sync::oneshot::Sender<R>) -> M,
824 {
825 let (tx, rx) = tokio::sync::oneshot::channel();
826 let msg = factory(tx);
827 self.send_wait(msg).await?;
828 rx.await.map_err(|_| AskError::Canceled)
829 }
830
831 pub async fn ask_wait_timeout<F, R>(&self, factory: F, timeout: Duration) -> Result<R, AskError>
832 where
833 F: FnOnce(tokio::sync::oneshot::Sender<R>) -> M,
834 {
835 tokio::time::timeout(timeout, self.ask_wait(factory))
836 .await
837 .map_err(|_| AskError::Timeout)?
838 }
839}
840
841pub struct SpawnHandle<R> {
842 actor_ref: R,
843 handle: SharedHandle,
844 pid: Pid,
845}
846
847impl<R> SpawnHandle<R> {
848 pub fn new(actor_ref: R, handle: tokio::task::JoinHandle<()>, pid: Pid) -> Self {
849 Self {
850 actor_ref,
851 handle: Arc::new(Mutex::new(Some(handle))),
852 pid,
853 }
854 }
855
856 pub fn new_shared(actor_ref: R, handle: SharedHandle, pid: Pid) -> Self {
857 Self {
858 actor_ref,
859 handle,
860 pid,
861 }
862 }
863
864 pub fn actor(&self) -> R
865 where
866 R: Clone,
867 {
868 self.actor_ref.clone()
869 }
870
871 pub fn pid(&self) -> Pid {
872 self.pid
873 }
874
875 pub fn shared_handle(&self) -> SharedHandle {
876 self.handle.clone()
877 }
878
879 pub async fn join(&self) -> Result<(), tokio::task::JoinError> {
880 let handle = self.handle.lock().take();
881 if let Some(h) = handle {
882 h.await
883 } else {
884 Ok(())
885 }
886 }
887}