Skip to main content

ractor/actor/
actor_cell.rs

1// Copyright (c) Sean Lawlor
2//
3// This source code is licensed under both the MIT license found in the
4// LICENSE-MIT file in the root directory of this source tree.
5
//! [ActorCell] is a reference counted actor which can be passed around as needed
7//!
8//! This module contains all the functionality around the [ActorCell], including
9//! the internal properties, ports, states, etc. [ActorCell] is the basic primitive
10//! for references to a given actor and its communication channels
11
12use std::any::TypeId;
13use std::sync::Arc;
14
15#[cfg(feature = "async-std")]
16use futures::FutureExt;
17
18use super::actor_properties::MuxedMessage;
19use super::messages::Signal;
20use super::messages::StopMessage;
21use super::SupervisionEvent;
22use crate::actor::actor_properties::ActorProperties;
23use crate::concurrency::JoinHandle;
24use crate::concurrency::MpscUnboundedReceiver as InputPortReceiver;
25use crate::concurrency::OneshotReceiver;
26use crate::errors::MessagingErr;
27#[cfg(feature = "cluster")]
28use crate::message::SerializedMessage;
29use crate::Actor;
30use crate::ActorId;
31use crate::ActorName;
32use crate::ActorRef;
33use crate::Message;
34use crate::RactorErr;
35use crate::SpawnErr;
36
/// The lifecycle phase an actor is currently in.
///
/// Variants are declared in ascending lifecycle order with explicit `u8`
/// discriminants, and the derived ordering follows that declaration order,
/// so statuses can be compared directly (e.g. `status < ActorStatus::Stopping`).
#[repr(u8)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ActorStatus {
    /// The actor has been created, but has not been started yet
    Unstarted = 0,
    /// The actor is starting
    Starting = 1,
    /// The actor is executing (or waiting on messages)
    Running = 2,
    /// The actor is upgrading
    Upgrading = 3,
    /// The actor is draining
    Draining = 4,
    /// The actor is stopping
    Stopping = 5,
    /// The actor is dead
    Stopped = 6,
}
56
57/// Actor states where operations can continue to interact with an agent
58pub const ACTIVE_STATES: [ActorStatus; 3] = [
59    ActorStatus::Starting,
60    ActorStatus::Running,
61    ActorStatus::Upgrading,
62];
63
64/// The collection of ports an actor needs to listen to
65pub(crate) struct ActorPortSet {
66    /// The inner signal port
67    pub(crate) signal_rx: OneshotReceiver<Signal>,
68    /// The inner stop port
69    pub(crate) stop_rx: OneshotReceiver<StopMessage>,
70    /// The inner supervisor port
71    pub(crate) supervisor_rx: InputPortReceiver<SupervisionEvent>,
72    /// The inner message port
73    pub(crate) message_rx: InputPortReceiver<MuxedMessage>,
74}
75
76impl Drop for ActorPortSet {
77    fn drop(&mut self) {
78        // Close all the message ports and flush all the message queue backlogs.
79        // See: https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/index.html#clean-shutdown
80        self.signal_rx.close();
81        self.stop_rx.close();
82        self.supervisor_rx.close();
83        self.message_rx.close();
84
85        while self.signal_rx.try_recv().is_ok() {}
86        while self.stop_rx.try_recv().is_ok() {}
87        while self.supervisor_rx.try_recv().is_ok() {}
88        while self.message_rx.try_recv().is_ok() {}
89    }
90}
91
92/// Messages that come in off an actor's port, with associated priority
93pub(crate) enum ActorPortMessage {
94    /// A signal message
95    Signal(Signal),
96    /// A stop message
97    Stop(StopMessage),
98    /// A supervision message
99    Supervision(SupervisionEvent),
100    /// A regular message
101    Message(MuxedMessage),
102}
103
104impl ActorPortSet {
105    /// Run a future beside the signal port, so that
106    /// the signal port can terminate the async work
107    ///
108    /// * `future` - The future to execute
109    ///
110    /// Returns [Ok(`TState`)] when the future completes without
111    /// signal interruption, [Err(Signal)] in the event the
112    /// signal interrupts the async work.
113    pub(crate) async fn run_with_signal<TState>(
114        &mut self,
115        future: impl std::future::Future<Output = TState>,
116    ) -> Result<TState, Signal>
117    where
118        TState: crate::State,
119    {
120        #[cfg(feature = "async-std")]
121        {
122            crate::concurrency::select! {
123                // supervision or message processing work
124                // can be interrupted by the signal port receiving
125                // a kill signal
126                signal = (&mut self.signal_rx).fuse() => {
127                    Err(signal.unwrap_or(Signal::Kill))
128                }
129                new_state = future.fuse() => {
130                    Ok(new_state)
131                }
132            }
133        }
134        #[cfg(not(feature = "async-std"))]
135        {
136            crate::concurrency::select! {
137                // supervision or message processing work
138                // can be interrupted by the signal port receiving
139                // a kill signal
140                signal = &mut self.signal_rx => {
141                    Err(signal.unwrap_or(Signal::Kill))
142                }
143                new_state = future => {
144                    Ok(new_state)
145                }
146            }
147        }
148    }
149
150    /// List to the input ports in priority. The priority of listening for messages is
151    /// 1. Signal port
152    /// 2. Stop port
153    /// 3. Supervision message port
154    /// 4. General message port
155    ///
156    /// Returns [Ok(ActorPortMessage)] on a successful message reception, [MessagingErr]
157    /// in the event any of the channels is closed.
158    pub(crate) async fn listen_in_priority(
159        &mut self,
160    ) -> Result<ActorPortMessage, MessagingErr<()>> {
161        #[cfg(feature = "async-std")]
162        {
163            crate::concurrency::select! {
164                signal = (&mut self.signal_rx).fuse() => {
165                    signal.map(ActorPortMessage::Signal).map_err(|_| MessagingErr::ChannelClosed)
166                }
167                stop = (&mut self.stop_rx).fuse() => {
168                    stop.map(ActorPortMessage::Stop).map_err(|_| MessagingErr::ChannelClosed)
169                }
170                supervision = self.supervisor_rx.recv().fuse() => {
171                    supervision.map(ActorPortMessage::Supervision).ok_or(MessagingErr::ChannelClosed)
172                }
173                message = self.message_rx.recv().fuse() => {
174                    message.map(ActorPortMessage::Message).ok_or(MessagingErr::ChannelClosed)
175                }
176            }
177        }
178        #[cfg(not(feature = "async-std"))]
179        {
180            crate::concurrency::select! {
181                signal = &mut self.signal_rx => {
182                    signal.map(ActorPortMessage::Signal).map_err(|_| MessagingErr::ChannelClosed)
183                }
184                stop = &mut self.stop_rx => {
185                    stop.map(ActorPortMessage::Stop).map_err(|_| MessagingErr::ChannelClosed)
186                }
187                supervision = self.supervisor_rx.recv() => {
188                    supervision.map(ActorPortMessage::Supervision).ok_or(MessagingErr::ChannelClosed)
189                }
190                message = self.message_rx.recv() => {
191                    message.map(ActorPortMessage::Message).ok_or(MessagingErr::ChannelClosed)
192                }
193            }
194        }
195    }
196}
197
198/// An [ActorCell] is a reference to an [Actor]'s communication channels
199/// and provides external access to send messages, stop, kill, and generally
200/// interactor with the underlying [Actor] process.
201///
202/// The input ports contained in the cell will return an error should the
203/// underlying actor have terminated and no longer exist.
204#[derive(Clone)]
205pub struct ActorCell {
206    pub(crate) inner: Arc<ActorProperties>,
207}
208
209impl std::fmt::Debug for ActorCell {
210    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211        f.debug_struct("Actor")
212            .field("name", &self.get_name())
213            .field("id", &self.get_id())
214            .finish()
215    }
216}
217
218impl PartialEq for ActorCell {
219    fn eq(&self, other: &Self) -> bool {
220        other.get_id() == self.get_id()
221    }
222}
223
224impl Eq for ActorCell {}
225
226impl std::hash::Hash for ActorCell {
227    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
228        self.get_id().hash(state)
229    }
230}
231
232impl ActorCell {
233    /// Construct a new [ActorCell] pointing to an [super::Actor] and return the message reception channels as a [ActorPortSet]
234    ///
235    /// * `name` - Optional name for the actor
236    ///
237    /// Returns a tuple [(ActorCell, ActorPortSet)] to bootstrap the [crate::Actor]
238    pub(crate) fn new<TActor>(name: Option<ActorName>) -> Result<(Self, ActorPortSet), SpawnErr>
239    where
240        TActor: Actor,
241    {
242        let (props, rx1, rx2, rx3, rx4) = ActorProperties::new::<TActor>(name.clone());
243        let cell = Self {
244            inner: Arc::new(props),
245        };
246
247        #[cfg(feature = "cluster")]
248        {
249            // registry to the PID registry
250            crate::registry::pid_registry::register_pid(cell.get_id(), cell.clone())?;
251        }
252
253        if let Some(r_name) = name {
254            crate::registry::register(r_name, cell.clone())?;
255        }
256
257        Ok((
258            cell,
259            ActorPortSet {
260                signal_rx: rx1,
261                stop_rx: rx2,
262                supervisor_rx: rx3,
263                message_rx: rx4,
264            },
265        ))
266    }
267
268    /// Create a new remote actor, to be called from the `ractor_cluster` crate
269    #[cfg(feature = "cluster")]
270    pub(crate) fn new_remote<TActor>(
271        name: Option<ActorName>,
272        id: ActorId,
273    ) -> Result<(Self, ActorPortSet), SpawnErr>
274    where
275        TActor: Actor,
276    {
277        if id.is_local() {
278            return Err(SpawnErr::StartupFailed(From::from("Cannot create a new remote actor handler without the actor id being marked as a remote actor!")));
279        }
280
281        let (props, rx1, rx2, rx3, rx4) = ActorProperties::new_remote::<TActor>(name, id);
282        let cell = Self {
283            inner: Arc::new(props),
284        };
285        // NOTE: remote actors don't appear in the name registry
286        // if let Some(r_name) = name {
287        //     crate::registry::register(r_name, cell.clone())?;
288        // }
289        Ok((
290            cell,
291            ActorPortSet {
292                signal_rx: rx1,
293                stop_rx: rx2,
294                supervisor_rx: rx3,
295                message_rx: rx4,
296            },
297        ))
298    }
299
300    /// Retrieve the [super::Actor]'s unique identifier [ActorId]
301    pub fn get_id(&self) -> ActorId {
302        self.inner.id
303    }
304
305    /// Retrieve the [super::Actor]'s name
306    pub fn get_name(&self) -> Option<ActorName> {
307        self.inner.name.clone()
308    }
309
310    /// Retrieve the current status of an [super::Actor]
311    ///
312    /// Returns the [super::Actor]'s current [ActorStatus]
313    pub fn get_status(&self) -> ActorStatus {
314        self.inner.get_status()
315    }
316
317    /// Identifies if this actor supports remote (dist) communication
318    ///
319    /// Returns [true] if the actor's messaging protocols support remote calls, [false] otherwise
320    #[cfg(feature = "cluster")]
321    pub fn supports_remoting(&self) -> bool {
322        self.inner.supports_remoting
323    }
324
325    /// Set the status of the [super::Actor]. If the status is set to
326    /// [ActorStatus::Stopping] or [ActorStatus::Stopped] the actor
327    /// will also be unenrolled from both the named registry ([crate::registry])
328    /// and the PG groups ([crate::pg]) if it's enrolled in any
329    ///
330    /// * `status` - The [ActorStatus] to set
331    pub(crate) fn set_status(&self, status: ActorStatus) {
332        // The actor is shut down — only run cleanup once, on the first transition
333        // to Stopping. This avoids redundant full-DashMap iterations on the
334        // Stopping → Stopped transition.
335        if (status == ActorStatus::Stopped || status == ActorStatus::Stopping)
336            && self.get_status() < ActorStatus::Stopping
337        {
338            #[cfg(feature = "cluster")]
339            {
340                // stop monitoring for updates
341                crate::registry::pid_registry::demonitor(self.get_id());
342                // unregistry from the PID registry
343                crate::registry::pid_registry::unregister_pid(self.get_id());
344            }
345            // If it's enrolled in the registry, remove it
346            if let Some(name) = self.get_name() {
347                crate::registry::unregister(name);
348            }
349            // Leave all + stop monitoring pg groups (if any)
350            crate::pg::demonitor_all(self.get_id());
351            crate::pg::leave_all(self.get_id());
352        }
353
354        // Fix for #254. We should only notify the stop listener AFTER post_stop
355        // has executed, which is when the state gets set to `Stopped`.
356        if status == ActorStatus::Stopped {
357            // notify whoever might be waiting on the stop signal
358            self.inner.notify_stop_listener();
359        }
360
361        self.inner.set_status(status)
362    }
363
364    /// Terminate this [super::Actor] and all it's children
365    pub(crate) fn terminate(&self) {
366        // we don't need to notify of exit if we're already stopping or stopped
367        if self.get_status() as u8 <= ActorStatus::Upgrading as u8 {
368            // kill myself immediately. Ignores failures, as a failure means either
369            // 1. we're already dead or
370            // 2. the channel is full of "signals"
371            self.kill();
372        }
373
374        // notify children they should die. They will unlink themselves from the supervisor
375        self.inner.tree.terminate_all_children();
376    }
377
378    /// Link this [super::Actor] to the provided supervisor
379    ///
380    /// * `supervisor` - The supervisor [super::Actor] of this actor
381    pub fn link(&self, supervisor: ActorCell) {
382        supervisor.inner.tree.insert_child(self.clone());
383        self.inner.tree.set_supervisor(supervisor);
384    }
385
386    /// Unlink this [super::Actor] from the supervisor if it's
387    /// currently linked (if self's supervisor is `supervisor`)
388    ///
389    /// * `supervisor` - The supervisor to unlink this [super::Actor] from
390    pub fn unlink(&self, supervisor: ActorCell) {
391        if self.inner.tree.is_child_of(supervisor.get_id()) {
392            supervisor.inner.tree.remove_child(self.get_id());
393            self.inner.tree.clear_supervisor();
394        }
395    }
396
397    /// Clear the supervisor field
398    pub(crate) fn clear_supervisor(&self) {
399        self.inner.tree.clear_supervisor();
400    }
401
402    /// Monitor the provided [super::Actor] for supervision events. An actor in `ractor` can
403    /// only have a single supervisor, denoted by the `link` function, however they
404    /// may have multiple `monitors`. Monitor's receive copies of the [SupervisionEvent]s,
405    /// with non-cloneable information removed.
406    ///
407    /// * `who`: The actor to monitor
408    #[cfg(feature = "monitors")]
409    pub fn monitor(&self, who: ActorCell) {
410        who.inner.tree.set_monitor(self.clone());
411    }
412
413    /// Stop monitoring the provided [super::Actor] for supervision events.
414    ///
415    /// * `who`: The actor to stop monitoring
416    #[cfg(feature = "monitors")]
417    pub fn unmonitor(&self, who: ActorCell) {
418        who.inner.tree.remove_monitor(self.get_id());
419    }
420
421    /// Kill this [super::Actor] forcefully (terminates async work)
422    pub fn kill(&self) {
423        let _ = self.inner.send_signal(Signal::Kill);
424    }
425
426    /// Kill this [super::Actor] forcefully (terminates async work)
427    /// and wait for the actor shutdown to complete
428    ///
429    /// * `timeout` - An optional timeout duration to wait for shutdown to occur
430    ///
431    /// Returns [Ok(())] upon the actor being stopped/shutdown. [Err(RactorErr::Messaging(_))] if the channel is closed
432    /// or dropped (which may indicate some other process is trying to shutdown this actor) or [Err(RactorErr::Timeout)]
433    /// if timeout was hit before the actor was successfully shut down (when set)
434    pub async fn kill_and_wait(
435        &self,
436        timeout: Option<crate::concurrency::Duration>,
437    ) -> Result<(), RactorErr<()>> {
438        if let Some(to) = timeout {
439            match crate::concurrency::timeout(to, self.inner.send_signal_and_wait(Signal::Kill))
440                .await
441            {
442                Err(_) => Err(RactorErr::Timeout),
443                Ok(Err(e)) => Err(e.into()),
444                Ok(_) => Ok(()),
445            }
446        } else {
447            Ok(self.inner.send_signal_and_wait(Signal::Kill).await?)
448        }
449    }
450
451    /// Stop this [super::Actor] gracefully (stopping message processing)
452    ///
453    /// * `reason` - An optional string reason why the stop is occurring
454    pub fn stop(&self, reason: Option<String>) {
455        // ignore failures, since that means the actor is dead already
456        let _ = self.inner.send_stop(reason);
457    }
458
459    /// Stop the [super::Actor] gracefully (stopping messaging processing)
460    /// and wait for the actor shutdown to complete
461    ///
462    /// * `reason` - An optional string reason why the stop is occurring
463    /// * `timeout` - An optional timeout duration to wait for shutdown to occur
464    ///
465    /// Returns [Ok(())] upon the actor being stopped/shutdown. [Err(RactorErr::Messaging(_))] if the channel is closed
466    /// or dropped (which may indicate some other process is trying to shutdown this actor) or [Err(RactorErr::Timeout)]
467    /// if timeout was hit before the actor was successfully shut down (when set)
468    pub async fn stop_and_wait(
469        &self,
470        reason: Option<String>,
471        timeout: Option<crate::concurrency::Duration>,
472    ) -> Result<(), RactorErr<StopMessage>> {
473        if let Some(to) = timeout {
474            match crate::concurrency::timeout(to, self.inner.send_stop_and_wait(reason)).await {
475                Err(_) => Err(RactorErr::Timeout),
476                Ok(Err(e)) => Err(e.into()),
477                Ok(_) => Ok(()),
478            }
479        } else {
480            Ok(self.inner.send_stop_and_wait(reason).await?)
481        }
482    }
483
484    /// Wait for the actor to exit, optionally within a timeout
485    ///
486    /// * `timeout`: If supplied, the amount of time to wait before
487    ///   returning an error and cancelling the wait future.
488    ///
489    /// IMPORTANT: If the timeout is hit, the actor is still running.
490    /// You should wait again for its exit.
491    pub async fn wait(
492        &self,
493        timeout: Option<crate::concurrency::Duration>,
494    ) -> Result<(), crate::concurrency::Timeout> {
495        if let Some(to) = timeout {
496            crate::concurrency::timeout(to, self.inner.wait()).await
497        } else {
498            self.inner.wait().await;
499            Ok(())
500        }
501    }
502
503    /// Send a supervisor event to the supervisory port
504    ///
505    /// * `message` - The [SupervisionEvent] to send to the supervisory port
506    ///
507    /// Returns [Ok(())] on successful message send, [Err(MessagingErr)] otherwise
508    pub(crate) fn send_supervisor_evt(
509        &self,
510        message: SupervisionEvent,
511    ) -> Result<(), MessagingErr<SupervisionEvent>> {
512        self.inner.send_supervisor_evt(message)
513    }
514
515    /// Send a strongly-typed message, constructing the boxed message on the fly
516    ///
517    /// Note: The type requirement of `TActor` assures that `TMsg` is the supported
518    /// message type for `TActor` such that we can't send boxed messages of an unsupported
519    /// type to the specified actor.
520    ///
521    /// * `message` - The message to send
522    ///
523    /// Returns [Ok(())] on successful message send, [Err(MessagingErr)] otherwise
524    pub fn send_message<TMessage>(&self, message: TMessage) -> Result<(), MessagingErr<TMessage>>
525    where
526        TMessage: Message,
527    {
528        self.inner.send_message::<TMessage>(message)
529    }
530
531    /// Drain the actor's message queue and when finished processing, terminate the actor.
532    ///
533    /// Any messages received after the drain marker but prior to shutdown will be rejected
534    pub fn drain(&self) -> Result<(), MessagingErr<()>> {
535        self.inner.drain()
536    }
537
538    /// Drain the actor's message queue and when finished processing, terminate the actor,
539    /// notifying on this handler that the actor has drained and exited (stopped).
540    ///
541    /// * `timeout`: The optional amount of time to wait for the drain to complete.
542    ///
543    /// Any messages received after the drain marker but prior to shutdown will be rejected
544    pub async fn drain_and_wait(
545        &self,
546        timeout: Option<crate::concurrency::Duration>,
547    ) -> Result<(), RactorErr<()>> {
548        if let Some(to) = timeout {
549            match crate::concurrency::timeout(to, self.inner.drain_and_wait()).await {
550                Err(_) => Err(RactorErr::Timeout),
551                Ok(Err(e)) => Err(e.into()),
552                Ok(_) => Ok(()),
553            }
554        } else {
555            Ok(self.inner.drain_and_wait().await?)
556        }
557    }
558
559    /// Send a serialized binary message to the actor.
560    ///
561    /// * `message` - The message to send
562    ///
563    /// Returns [Ok(())] on successful message send, [Err(MessagingErr)] otherwise
564    #[cfg(feature = "cluster")]
565    pub fn send_serialized(
566        &self,
567        message: SerializedMessage,
568    ) -> Result<(), Box<MessagingErr<SerializedMessage>>> {
569        self.inner.send_serialized(message)
570    }
571
572    /// Notify the supervisor and all monitors that a supervision event occurred.
573    /// Monitors receive a reduced copy of the supervision event which won't contain
574    /// the [crate::actor::BoxedState] and collapses the [crate::ActorProcessingErr]
575    /// exception to a [String]
576    ///
577    /// * `evt` - The event to send to this [super::Actor]'s supervisors
578    pub fn notify_supervisor(&self, evt: SupervisionEvent) {
579        self.inner.tree.notify_supervisor(evt)
580    }
581
582    /// Stop any children of this actor, not waiting for their exit, and threading
583    /// the optional reason to all children
584    ///
585    /// * `reason`: The stop reason to send to all the children
586    ///
587    /// This swallows and communication errors because if you can't send a message
588    /// to the child, it's dropped the message channel, and is dead/stopped already.
589    pub fn stop_children(&self, reason: Option<String>) {
590        self.inner.tree.stop_all_children(reason);
591    }
592
593    /// Tries to retrieve this actor's supervisor.
594    ///
595    /// Returns [None] if this actor has no supervisor at the given instance or
596    /// [Some(ActorCell)] supervisor if one is configured.
597    pub fn try_get_supervisor(&self) -> Option<ActorCell> {
598        self.inner.tree.try_get_supervisor()
599    }
600
601    /// Stop any children of this actor, and wait for their collective exit, optionally
602    /// threading the optional reason to all children
603    ///
604    /// * `reason`: The stop reason to send to all the children
605    /// * `timeout`: An optional timeout which is the maximum time to wait for the actor stop
606    ///   operation to complete
607    ///
608    /// This swallows and communication errors because if you can't send a message
609    /// to the child, it's dropped the message channel, and is dead/stopped already.
610    pub async fn stop_children_and_wait(
611        &self,
612        reason: Option<String>,
613        timeout: Option<crate::concurrency::Duration>,
614    ) {
615        self.inner
616            .tree
617            .stop_all_children_and_wait(reason, timeout)
618            .await
619    }
620
621    /// Drain any children of this actor, not waiting for their exit
622    ///
623    /// This swallows and communication errors because if you can't send a message
624    /// to the child, it's dropped the message channel, and is dead/stopped already.
625    pub fn drain_children(&self) {
626        self.inner.tree.drain_all_children();
627    }
628
629    /// Drain any children of this actor, and wait for their collective exit
630    ///
631    /// * `timeout`: An optional timeout which is the maximum time to wait for the actor stop
632    ///   operation to complete
633    pub async fn drain_children_and_wait(&self, timeout: Option<crate::concurrency::Duration>) {
634        self.inner.tree.drain_all_children_and_wait(timeout).await
635    }
636
637    /// Retrieve the supervised children of this actor (if any)
638    ///
639    /// Returns a [Vec] of [ActorCell]s which are the children that are
640    /// presently linked to this actor.
641    pub fn get_children(&self) -> Vec<ActorCell> {
642        self.inner.tree.get_children()
643    }
644
645    /// Retrieve the [TypeId] of this [ActorCell] which can be helpful
646    /// for quick type-checking.
647    ///
648    /// HOWEVER: Note this is an unstable identifier, and changes between
649    /// Rust releases and may not be stable over a network call.
650    pub fn get_type_id(&self) -> TypeId {
651        self.inner.type_id
652    }
653
654    /// Runtime check the message type of this actor, which only works for
655    /// local actors, as remote actors send serializable messages, and can't
656    /// have their message type runtime checked.
657    ///
658    /// Returns [None] if the actor is a remote actor, and we cannot perform a
659    /// runtime message type check. Otherwise [Some(true)] for the correct message
660    /// type or [Some(false)] for an incorrect type will returned.
661    pub fn is_message_type_of<TMessage: Message>(&self) -> Option<bool> {
662        if self.get_id().is_local() {
663            Some(self.get_type_id() == std::any::TypeId::of::<TMessage>())
664        } else {
665            None
666        }
667    }
668
669    /// Spawn an actor of the given type as a child of this actor, automatically starting the actor.
670    /// This [ActorCell] becomes the supervisor of the child actor.
671    ///
672    /// * `name`: A name to give the actor. Useful for global referencing or debug printing
673    /// * `handler` The implementation of Self
674    /// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
675    ///   initial state creation
676    ///
677    /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
678    /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
679    /// the actor failed to start
680    pub async fn spawn_linked<T: Actor>(
681        &self,
682        name: Option<String>,
683        handler: T,
684        startup_args: T::Arguments,
685    ) -> Result<(ActorRef<T::Msg>, JoinHandle<()>), SpawnErr> {
686        crate::actor::ActorRuntime::spawn_linked(name, handler, startup_args, self.clone()).await
687    }
688
689    // ================== Test Utilities ================== //
690
691    #[cfg(test)]
692    pub(crate) fn get_num_children(&self) -> usize {
693        self.inner.tree.get_num_children()
694    }
695
696    #[cfg(test)]
697    pub(crate) fn get_num_parents(&self) -> usize {
698        self.inner.tree.get_num_parents()
699    }
700}