ractor/actor/actor_cell.rs
1// Copyright (c) Sean Lawlor
2//
3// This source code is licensed under both the MIT license found in the
4// LICENSE-MIT file in the root directory of this source tree.
5
//! [ActorCell] is a reference counted actor which can be passed around as needed
7//!
8//! This module contains all the functionality around the [ActorCell], including
9//! the internal properties, ports, states, etc. [ActorCell] is the basic primitive
10//! for references to a given actor and its communication channels
11
12use std::any::TypeId;
13use std::sync::Arc;
14
15#[cfg(feature = "async-std")]
16use futures::FutureExt;
17
18use super::actor_properties::MuxedMessage;
19use super::messages::Signal;
20use super::messages::StopMessage;
21use super::SupervisionEvent;
22use crate::actor::actor_properties::ActorProperties;
23use crate::concurrency::JoinHandle;
24use crate::concurrency::MpscUnboundedReceiver as InputPortReceiver;
25use crate::concurrency::OneshotReceiver;
26use crate::errors::MessagingErr;
27#[cfg(feature = "cluster")]
28use crate::message::SerializedMessage;
29use crate::Actor;
30use crate::ActorId;
31use crate::ActorName;
32use crate::ActorRef;
33use crate::Message;
34use crate::RactorErr;
35use crate::SpawnErr;
36
/// [ActorStatus] represents the status of an actor's lifecycle.
///
/// The variants are declared in lifecycle order and carry explicit `u8`
/// discriminants, so the derived ordering (`<`, `<=`, ...) follows the
/// lifecycle: an earlier stage compares as less than a later one.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ActorStatus {
    /// Created, but not yet started
    Unstarted = 0,
    /// Starting
    Starting = 1,
    /// Executing (or waiting on messages)
    Running = 2,
    /// Upgrading
    Upgrading = 3,
    /// Draining
    Draining = 4,
    /// Stopping
    Stopping = 5,
    /// Dead
    Stopped = 6,
}
56
/// Actor states where operations can continue to interact with an actor:
/// [ActorStatus::Starting], [ActorStatus::Running] and [ActorStatus::Upgrading].
pub const ACTIVE_STATES: [ActorStatus; 3] = [
    ActorStatus::Starting,
    ActorStatus::Running,
    ActorStatus::Upgrading,
];
63
/// The collection of ports an actor needs to listen to.
///
/// This is the receiving half of an actor's channels: one receiver per kind
/// of inbound traffic (signals, stop requests, supervision events and
/// regular messages).
pub(crate) struct ActorPortSet {
    /// The inner signal port (one-shot receiver of a [Signal])
    pub(crate) signal_rx: OneshotReceiver<Signal>,
    /// The inner stop port (one-shot receiver of a [StopMessage])
    pub(crate) stop_rx: OneshotReceiver<StopMessage>,
    /// The inner supervisor port (unbounded receiver of [SupervisionEvent]s)
    pub(crate) supervisor_rx: InputPortReceiver<SupervisionEvent>,
    /// The inner message port (unbounded receiver of [MuxedMessage]s)
    pub(crate) message_rx: InputPortReceiver<MuxedMessage>,
}
75
76impl Drop for ActorPortSet {
77 fn drop(&mut self) {
78 // Close all the message ports and flush all the message queue backlogs.
79 // See: https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/index.html#clean-shutdown
80 self.signal_rx.close();
81 self.stop_rx.close();
82 self.supervisor_rx.close();
83 self.message_rx.close();
84
85 while self.signal_rx.try_recv().is_ok() {}
86 while self.stop_rx.try_recv().is_ok() {}
87 while self.supervisor_rx.try_recv().is_ok() {}
88 while self.message_rx.try_recv().is_ok() {}
89 }
90}
91
/// Messages that come in off an actor's port, with associated priority.
///
/// Each variant wraps the payload received from the corresponding port of
/// an [ActorPortSet].
pub(crate) enum ActorPortMessage {
    /// A signal message
    Signal(Signal),
    /// A stop message
    Stop(StopMessage),
    /// A supervision message
    Supervision(SupervisionEvent),
    /// A regular message
    Message(MuxedMessage),
}
103
104impl ActorPortSet {
105 /// Run a future beside the signal port, so that
106 /// the signal port can terminate the async work
107 ///
108 /// * `future` - The future to execute
109 ///
110 /// Returns [Ok(`TState`)] when the future completes without
111 /// signal interruption, [Err(Signal)] in the event the
112 /// signal interrupts the async work.
113 pub(crate) async fn run_with_signal<TState>(
114 &mut self,
115 future: impl std::future::Future<Output = TState>,
116 ) -> Result<TState, Signal>
117 where
118 TState: crate::State,
119 {
120 #[cfg(feature = "async-std")]
121 {
122 crate::concurrency::select! {
123 // supervision or message processing work
124 // can be interrupted by the signal port receiving
125 // a kill signal
126 signal = (&mut self.signal_rx).fuse() => {
127 Err(signal.unwrap_or(Signal::Kill))
128 }
129 new_state = future.fuse() => {
130 Ok(new_state)
131 }
132 }
133 }
134 #[cfg(not(feature = "async-std"))]
135 {
136 crate::concurrency::select! {
137 // supervision or message processing work
138 // can be interrupted by the signal port receiving
139 // a kill signal
140 signal = &mut self.signal_rx => {
141 Err(signal.unwrap_or(Signal::Kill))
142 }
143 new_state = future => {
144 Ok(new_state)
145 }
146 }
147 }
148 }
149
150 /// List to the input ports in priority. The priority of listening for messages is
151 /// 1. Signal port
152 /// 2. Stop port
153 /// 3. Supervision message port
154 /// 4. General message port
155 ///
156 /// Returns [Ok(ActorPortMessage)] on a successful message reception, [MessagingErr]
157 /// in the event any of the channels is closed.
158 pub(crate) async fn listen_in_priority(
159 &mut self,
160 ) -> Result<ActorPortMessage, MessagingErr<()>> {
161 #[cfg(feature = "async-std")]
162 {
163 crate::concurrency::select! {
164 signal = (&mut self.signal_rx).fuse() => {
165 signal.map(ActorPortMessage::Signal).map_err(|_| MessagingErr::ChannelClosed)
166 }
167 stop = (&mut self.stop_rx).fuse() => {
168 stop.map(ActorPortMessage::Stop).map_err(|_| MessagingErr::ChannelClosed)
169 }
170 supervision = self.supervisor_rx.recv().fuse() => {
171 supervision.map(ActorPortMessage::Supervision).ok_or(MessagingErr::ChannelClosed)
172 }
173 message = self.message_rx.recv().fuse() => {
174 message.map(ActorPortMessage::Message).ok_or(MessagingErr::ChannelClosed)
175 }
176 }
177 }
178 #[cfg(not(feature = "async-std"))]
179 {
180 crate::concurrency::select! {
181 signal = &mut self.signal_rx => {
182 signal.map(ActorPortMessage::Signal).map_err(|_| MessagingErr::ChannelClosed)
183 }
184 stop = &mut self.stop_rx => {
185 stop.map(ActorPortMessage::Stop).map_err(|_| MessagingErr::ChannelClosed)
186 }
187 supervision = self.supervisor_rx.recv() => {
188 supervision.map(ActorPortMessage::Supervision).ok_or(MessagingErr::ChannelClosed)
189 }
190 message = self.message_rx.recv() => {
191 message.map(ActorPortMessage::Message).ok_or(MessagingErr::ChannelClosed)
192 }
193 }
194 }
195 }
196}
197
/// An [ActorCell] is a reference to an [Actor]'s communication channels
/// and provides external access to send messages, stop, kill, and generally
/// interact with the underlying [Actor] process.
///
/// The input ports contained in the cell will return an error should the
/// underlying actor have terminated and no longer exist.
///
/// Cloning an [ActorCell] clones the inner [Arc], so every clone shares the
/// same [ActorProperties].
#[derive(Clone)]
pub struct ActorCell {
    /// The shared, reference-counted properties of the actor
    pub(crate) inner: Arc<ActorProperties>,
}
208
209impl std::fmt::Debug for ActorCell {
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 f.debug_struct("Actor")
212 .field("name", &self.get_name())
213 .field("id", &self.get_id())
214 .finish()
215 }
216}
217
218impl PartialEq for ActorCell {
219 fn eq(&self, other: &Self) -> bool {
220 other.get_id() == self.get_id()
221 }
222}
223
// Marker impl: the id-based equality of `PartialEq` is declared to be a full
// equivalence relation.
impl Eq for ActorCell {}
225
226impl std::hash::Hash for ActorCell {
227 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
228 self.get_id().hash(state)
229 }
230}
231
232impl ActorCell {
233 /// Construct a new [ActorCell] pointing to an [super::Actor] and return the message reception channels as a [ActorPortSet]
234 ///
235 /// * `name` - Optional name for the actor
236 ///
237 /// Returns a tuple [(ActorCell, ActorPortSet)] to bootstrap the [crate::Actor]
238 pub(crate) fn new<TActor>(name: Option<ActorName>) -> Result<(Self, ActorPortSet), SpawnErr>
239 where
240 TActor: Actor,
241 {
242 let (props, rx1, rx2, rx3, rx4) = ActorProperties::new::<TActor>(name.clone());
243 let cell = Self {
244 inner: Arc::new(props),
245 };
246
247 #[cfg(feature = "cluster")]
248 {
249 // registry to the PID registry
250 crate::registry::pid_registry::register_pid(cell.get_id(), cell.clone())?;
251 }
252
253 if let Some(r_name) = name {
254 crate::registry::register(r_name, cell.clone())?;
255 }
256
257 Ok((
258 cell,
259 ActorPortSet {
260 signal_rx: rx1,
261 stop_rx: rx2,
262 supervisor_rx: rx3,
263 message_rx: rx4,
264 },
265 ))
266 }
267
268 /// Create a new remote actor, to be called from the `ractor_cluster` crate
269 #[cfg(feature = "cluster")]
270 pub(crate) fn new_remote<TActor>(
271 name: Option<ActorName>,
272 id: ActorId,
273 ) -> Result<(Self, ActorPortSet), SpawnErr>
274 where
275 TActor: Actor,
276 {
277 if id.is_local() {
278 return Err(SpawnErr::StartupFailed(From::from("Cannot create a new remote actor handler without the actor id being marked as a remote actor!")));
279 }
280
281 let (props, rx1, rx2, rx3, rx4) = ActorProperties::new_remote::<TActor>(name, id);
282 let cell = Self {
283 inner: Arc::new(props),
284 };
285 // NOTE: remote actors don't appear in the name registry
286 // if let Some(r_name) = name {
287 // crate::registry::register(r_name, cell.clone())?;
288 // }
289 Ok((
290 cell,
291 ActorPortSet {
292 signal_rx: rx1,
293 stop_rx: rx2,
294 supervisor_rx: rx3,
295 message_rx: rx4,
296 },
297 ))
298 }
299
300 /// Retrieve the [super::Actor]'s unique identifier [ActorId]
301 pub fn get_id(&self) -> ActorId {
302 self.inner.id
303 }
304
305 /// Retrieve the [super::Actor]'s name
306 pub fn get_name(&self) -> Option<ActorName> {
307 self.inner.name.clone()
308 }
309
310 /// Retrieve the current status of an [super::Actor]
311 ///
312 /// Returns the [super::Actor]'s current [ActorStatus]
313 pub fn get_status(&self) -> ActorStatus {
314 self.inner.get_status()
315 }
316
317 /// Identifies if this actor supports remote (dist) communication
318 ///
319 /// Returns [true] if the actor's messaging protocols support remote calls, [false] otherwise
320 #[cfg(feature = "cluster")]
321 pub fn supports_remoting(&self) -> bool {
322 self.inner.supports_remoting
323 }
324
325 /// Set the status of the [super::Actor]. If the status is set to
326 /// [ActorStatus::Stopping] or [ActorStatus::Stopped] the actor
327 /// will also be unenrolled from both the named registry ([crate::registry])
328 /// and the PG groups ([crate::pg]) if it's enrolled in any
329 ///
330 /// * `status` - The [ActorStatus] to set
331 pub(crate) fn set_status(&self, status: ActorStatus) {
332 // The actor is shut down — only run cleanup once, on the first transition
333 // to Stopping. This avoids redundant full-DashMap iterations on the
334 // Stopping → Stopped transition.
335 if (status == ActorStatus::Stopped || status == ActorStatus::Stopping)
336 && self.get_status() < ActorStatus::Stopping
337 {
338 #[cfg(feature = "cluster")]
339 {
340 // stop monitoring for updates
341 crate::registry::pid_registry::demonitor(self.get_id());
342 // unregistry from the PID registry
343 crate::registry::pid_registry::unregister_pid(self.get_id());
344 }
345 // If it's enrolled in the registry, remove it
346 if let Some(name) = self.get_name() {
347 crate::registry::unregister(name);
348 }
349 // Leave all + stop monitoring pg groups (if any)
350 crate::pg::demonitor_all(self.get_id());
351 crate::pg::leave_all(self.get_id());
352 }
353
354 // Fix for #254. We should only notify the stop listener AFTER post_stop
355 // has executed, which is when the state gets set to `Stopped`.
356 if status == ActorStatus::Stopped {
357 // notify whoever might be waiting on the stop signal
358 self.inner.notify_stop_listener();
359 }
360
361 self.inner.set_status(status)
362 }
363
364 /// Terminate this [super::Actor] and all it's children
365 pub(crate) fn terminate(&self) {
366 // we don't need to notify of exit if we're already stopping or stopped
367 if self.get_status() as u8 <= ActorStatus::Upgrading as u8 {
368 // kill myself immediately. Ignores failures, as a failure means either
369 // 1. we're already dead or
370 // 2. the channel is full of "signals"
371 self.kill();
372 }
373
374 // notify children they should die. They will unlink themselves from the supervisor
375 self.inner.tree.terminate_all_children();
376 }
377
378 /// Link this [super::Actor] to the provided supervisor
379 ///
380 /// * `supervisor` - The supervisor [super::Actor] of this actor
381 pub fn link(&self, supervisor: ActorCell) {
382 supervisor.inner.tree.insert_child(self.clone());
383 self.inner.tree.set_supervisor(supervisor);
384 }
385
386 /// Unlink this [super::Actor] from the supervisor if it's
387 /// currently linked (if self's supervisor is `supervisor`)
388 ///
389 /// * `supervisor` - The supervisor to unlink this [super::Actor] from
390 pub fn unlink(&self, supervisor: ActorCell) {
391 if self.inner.tree.is_child_of(supervisor.get_id()) {
392 supervisor.inner.tree.remove_child(self.get_id());
393 self.inner.tree.clear_supervisor();
394 }
395 }
396
397 /// Clear the supervisor field
398 pub(crate) fn clear_supervisor(&self) {
399 self.inner.tree.clear_supervisor();
400 }
401
402 /// Monitor the provided [super::Actor] for supervision events. An actor in `ractor` can
403 /// only have a single supervisor, denoted by the `link` function, however they
404 /// may have multiple `monitors`. Monitor's receive copies of the [SupervisionEvent]s,
405 /// with non-cloneable information removed.
406 ///
407 /// * `who`: The actor to monitor
408 #[cfg(feature = "monitors")]
409 pub fn monitor(&self, who: ActorCell) {
410 who.inner.tree.set_monitor(self.clone());
411 }
412
413 /// Stop monitoring the provided [super::Actor] for supervision events.
414 ///
415 /// * `who`: The actor to stop monitoring
416 #[cfg(feature = "monitors")]
417 pub fn unmonitor(&self, who: ActorCell) {
418 who.inner.tree.remove_monitor(self.get_id());
419 }
420
421 /// Kill this [super::Actor] forcefully (terminates async work)
422 pub fn kill(&self) {
423 let _ = self.inner.send_signal(Signal::Kill);
424 }
425
426 /// Kill this [super::Actor] forcefully (terminates async work)
427 /// and wait for the actor shutdown to complete
428 ///
429 /// * `timeout` - An optional timeout duration to wait for shutdown to occur
430 ///
431 /// Returns [Ok(())] upon the actor being stopped/shutdown. [Err(RactorErr::Messaging(_))] if the channel is closed
432 /// or dropped (which may indicate some other process is trying to shutdown this actor) or [Err(RactorErr::Timeout)]
433 /// if timeout was hit before the actor was successfully shut down (when set)
434 pub async fn kill_and_wait(
435 &self,
436 timeout: Option<crate::concurrency::Duration>,
437 ) -> Result<(), RactorErr<()>> {
438 if let Some(to) = timeout {
439 match crate::concurrency::timeout(to, self.inner.send_signal_and_wait(Signal::Kill))
440 .await
441 {
442 Err(_) => Err(RactorErr::Timeout),
443 Ok(Err(e)) => Err(e.into()),
444 Ok(_) => Ok(()),
445 }
446 } else {
447 Ok(self.inner.send_signal_and_wait(Signal::Kill).await?)
448 }
449 }
450
451 /// Stop this [super::Actor] gracefully (stopping message processing)
452 ///
453 /// * `reason` - An optional string reason why the stop is occurring
454 pub fn stop(&self, reason: Option<String>) {
455 // ignore failures, since that means the actor is dead already
456 let _ = self.inner.send_stop(reason);
457 }
458
459 /// Stop the [super::Actor] gracefully (stopping messaging processing)
460 /// and wait for the actor shutdown to complete
461 ///
462 /// * `reason` - An optional string reason why the stop is occurring
463 /// * `timeout` - An optional timeout duration to wait for shutdown to occur
464 ///
465 /// Returns [Ok(())] upon the actor being stopped/shutdown. [Err(RactorErr::Messaging(_))] if the channel is closed
466 /// or dropped (which may indicate some other process is trying to shutdown this actor) or [Err(RactorErr::Timeout)]
467 /// if timeout was hit before the actor was successfully shut down (when set)
468 pub async fn stop_and_wait(
469 &self,
470 reason: Option<String>,
471 timeout: Option<crate::concurrency::Duration>,
472 ) -> Result<(), RactorErr<StopMessage>> {
473 if let Some(to) = timeout {
474 match crate::concurrency::timeout(to, self.inner.send_stop_and_wait(reason)).await {
475 Err(_) => Err(RactorErr::Timeout),
476 Ok(Err(e)) => Err(e.into()),
477 Ok(_) => Ok(()),
478 }
479 } else {
480 Ok(self.inner.send_stop_and_wait(reason).await?)
481 }
482 }
483
484 /// Wait for the actor to exit, optionally within a timeout
485 ///
486 /// * `timeout`: If supplied, the amount of time to wait before
487 /// returning an error and cancelling the wait future.
488 ///
489 /// IMPORTANT: If the timeout is hit, the actor is still running.
490 /// You should wait again for its exit.
491 pub async fn wait(
492 &self,
493 timeout: Option<crate::concurrency::Duration>,
494 ) -> Result<(), crate::concurrency::Timeout> {
495 if let Some(to) = timeout {
496 crate::concurrency::timeout(to, self.inner.wait()).await
497 } else {
498 self.inner.wait().await;
499 Ok(())
500 }
501 }
502
503 /// Send a supervisor event to the supervisory port
504 ///
505 /// * `message` - The [SupervisionEvent] to send to the supervisory port
506 ///
507 /// Returns [Ok(())] on successful message send, [Err(MessagingErr)] otherwise
508 pub(crate) fn send_supervisor_evt(
509 &self,
510 message: SupervisionEvent,
511 ) -> Result<(), MessagingErr<SupervisionEvent>> {
512 self.inner.send_supervisor_evt(message)
513 }
514
515 /// Send a strongly-typed message, constructing the boxed message on the fly
516 ///
517 /// Note: The type requirement of `TActor` assures that `TMsg` is the supported
518 /// message type for `TActor` such that we can't send boxed messages of an unsupported
519 /// type to the specified actor.
520 ///
521 /// * `message` - The message to send
522 ///
523 /// Returns [Ok(())] on successful message send, [Err(MessagingErr)] otherwise
524 pub fn send_message<TMessage>(&self, message: TMessage) -> Result<(), MessagingErr<TMessage>>
525 where
526 TMessage: Message,
527 {
528 self.inner.send_message::<TMessage>(message)
529 }
530
531 /// Drain the actor's message queue and when finished processing, terminate the actor.
532 ///
533 /// Any messages received after the drain marker but prior to shutdown will be rejected
534 pub fn drain(&self) -> Result<(), MessagingErr<()>> {
535 self.inner.drain()
536 }
537
538 /// Drain the actor's message queue and when finished processing, terminate the actor,
539 /// notifying on this handler that the actor has drained and exited (stopped).
540 ///
541 /// * `timeout`: The optional amount of time to wait for the drain to complete.
542 ///
543 /// Any messages received after the drain marker but prior to shutdown will be rejected
544 pub async fn drain_and_wait(
545 &self,
546 timeout: Option<crate::concurrency::Duration>,
547 ) -> Result<(), RactorErr<()>> {
548 if let Some(to) = timeout {
549 match crate::concurrency::timeout(to, self.inner.drain_and_wait()).await {
550 Err(_) => Err(RactorErr::Timeout),
551 Ok(Err(e)) => Err(e.into()),
552 Ok(_) => Ok(()),
553 }
554 } else {
555 Ok(self.inner.drain_and_wait().await?)
556 }
557 }
558
559 /// Send a serialized binary message to the actor.
560 ///
561 /// * `message` - The message to send
562 ///
563 /// Returns [Ok(())] on successful message send, [Err(MessagingErr)] otherwise
564 #[cfg(feature = "cluster")]
565 pub fn send_serialized(
566 &self,
567 message: SerializedMessage,
568 ) -> Result<(), Box<MessagingErr<SerializedMessage>>> {
569 self.inner.send_serialized(message)
570 }
571
572 /// Notify the supervisor and all monitors that a supervision event occurred.
573 /// Monitors receive a reduced copy of the supervision event which won't contain
574 /// the [crate::actor::BoxedState] and collapses the [crate::ActorProcessingErr]
575 /// exception to a [String]
576 ///
577 /// * `evt` - The event to send to this [super::Actor]'s supervisors
578 pub fn notify_supervisor(&self, evt: SupervisionEvent) {
579 self.inner.tree.notify_supervisor(evt)
580 }
581
582 /// Stop any children of this actor, not waiting for their exit, and threading
583 /// the optional reason to all children
584 ///
585 /// * `reason`: The stop reason to send to all the children
586 ///
587 /// This swallows and communication errors because if you can't send a message
588 /// to the child, it's dropped the message channel, and is dead/stopped already.
589 pub fn stop_children(&self, reason: Option<String>) {
590 self.inner.tree.stop_all_children(reason);
591 }
592
593 /// Tries to retrieve this actor's supervisor.
594 ///
595 /// Returns [None] if this actor has no supervisor at the given instance or
596 /// [Some(ActorCell)] supervisor if one is configured.
597 pub fn try_get_supervisor(&self) -> Option<ActorCell> {
598 self.inner.tree.try_get_supervisor()
599 }
600
601 /// Stop any children of this actor, and wait for their collective exit, optionally
602 /// threading the optional reason to all children
603 ///
604 /// * `reason`: The stop reason to send to all the children
605 /// * `timeout`: An optional timeout which is the maximum time to wait for the actor stop
606 /// operation to complete
607 ///
608 /// This swallows and communication errors because if you can't send a message
609 /// to the child, it's dropped the message channel, and is dead/stopped already.
610 pub async fn stop_children_and_wait(
611 &self,
612 reason: Option<String>,
613 timeout: Option<crate::concurrency::Duration>,
614 ) {
615 self.inner
616 .tree
617 .stop_all_children_and_wait(reason, timeout)
618 .await
619 }
620
621 /// Drain any children of this actor, not waiting for their exit
622 ///
623 /// This swallows and communication errors because if you can't send a message
624 /// to the child, it's dropped the message channel, and is dead/stopped already.
625 pub fn drain_children(&self) {
626 self.inner.tree.drain_all_children();
627 }
628
629 /// Drain any children of this actor, and wait for their collective exit
630 ///
631 /// * `timeout`: An optional timeout which is the maximum time to wait for the actor stop
632 /// operation to complete
633 pub async fn drain_children_and_wait(&self, timeout: Option<crate::concurrency::Duration>) {
634 self.inner.tree.drain_all_children_and_wait(timeout).await
635 }
636
637 /// Retrieve the supervised children of this actor (if any)
638 ///
639 /// Returns a [Vec] of [ActorCell]s which are the children that are
640 /// presently linked to this actor.
641 pub fn get_children(&self) -> Vec<ActorCell> {
642 self.inner.tree.get_children()
643 }
644
645 /// Retrieve the [TypeId] of this [ActorCell] which can be helpful
646 /// for quick type-checking.
647 ///
648 /// HOWEVER: Note this is an unstable identifier, and changes between
649 /// Rust releases and may not be stable over a network call.
650 pub fn get_type_id(&self) -> TypeId {
651 self.inner.type_id
652 }
653
654 /// Runtime check the message type of this actor, which only works for
655 /// local actors, as remote actors send serializable messages, and can't
656 /// have their message type runtime checked.
657 ///
658 /// Returns [None] if the actor is a remote actor, and we cannot perform a
659 /// runtime message type check. Otherwise [Some(true)] for the correct message
660 /// type or [Some(false)] for an incorrect type will returned.
661 pub fn is_message_type_of<TMessage: Message>(&self) -> Option<bool> {
662 if self.get_id().is_local() {
663 Some(self.get_type_id() == std::any::TypeId::of::<TMessage>())
664 } else {
665 None
666 }
667 }
668
669 /// Spawn an actor of the given type as a child of this actor, automatically starting the actor.
670 /// This [ActorCell] becomes the supervisor of the child actor.
671 ///
672 /// * `name`: A name to give the actor. Useful for global referencing or debug printing
673 /// * `handler` The implementation of Self
674 /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
675 /// initial state creation
676 ///
677 /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
678 /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
679 /// the actor failed to start
680 pub async fn spawn_linked<T: Actor>(
681 &self,
682 name: Option<String>,
683 handler: T,
684 startup_args: T::Arguments,
685 ) -> Result<(ActorRef<T::Msg>, JoinHandle<()>), SpawnErr> {
686 crate::actor::ActorRuntime::spawn_linked(name, handler, startup_args, self.clone()).await
687 }
688
689 // ================== Test Utilities ================== //
690
691 #[cfg(test)]
692 pub(crate) fn get_num_children(&self) -> usize {
693 self.inner.tree.get_num_children()
694 }
695
696 #[cfg(test)]
697 pub(crate) fn get_num_parents(&self) -> usize {
698 self.inner.tree.get_num_parents()
699 }
700}