heph/actor_ref/
mod.rs

1//! Module containing actor references.
2//!
3//! An actor reference is a generic reference to an actor that can run on the
4//! same thread, another thread on the same node or even running remotely.
5//!
6//! # Sending messages
7//!
//! The primary function of actor references is sending messages. This can be
//! done by using the [`send`] or [`try_send`] methods. These methods don't
//! block, not even for remote actor references, but they don't provide a lot
//! of guarantees. They don't even guarantee the order in which the messages
//! arrive. What [`send`] does is asynchronously add the message to the queue of
//! messages for the actor.
14//!
//! In case of a thread-local actor reference this can be done directly. But
//! for thread-safe actor references the message must first be sent across
//! thread boundaries before being added to the actor's message queue. Remote
//! actor references even need to send this message across a network, where a
//! lot can go wrong.
20//!
21//! If guarantees are needed that a message is received or processed the
22//! receiving actor should send back an acknowledgment that the message is
23//! received and/or processed correctly.
24//!
25//! [`send`]: ActorRef::send
26//! [`try_send`]: ActorRef::try_send
27//!
28//! This example shows a simple actor that prints all the messages it receives.
29//!
30//! ```
31//! #![feature(never_type)]
32//!
33//! use heph::actor;
34//! use heph::supervisor::NoSupervisor;
35//! use heph_rt::spawn::ActorOptions;
36//! use heph_rt::{self as rt, Runtime, ThreadLocal};
37//!
38//! fn main() -> Result<(), rt::Error> {
39//!     let mut runtime = Runtime::new()?;
40//!     runtime.run_on_workers(|mut runtime_ref| -> Result<(), !> {
41//!         // Spawn the actor.
42//!         let new_actor = actor as fn(_) -> _;
43//!         let actor_ref = runtime_ref.spawn_local(NoSupervisor, new_actor, (), ActorOptions::default());
44//!
45//!         // Now we can use the actor reference to send the actor a message.
46//!         actor_ref.try_send("Hello world".to_owned()).unwrap();
47//!
48//!         Ok(())
49//!     })?;
50//!     runtime.start()
51//! }
52//!
53//! /// Our actor.
54//! async fn actor(mut ctx: actor::Context<String, ThreadLocal>) {
55//!     if let Ok(msg) = ctx.receive_next().await {
56//!         println!("got message: {}", msg);
57//!     }
58//! }
59//! ```
60//!
61//! # Sharing actor references
62//!
63//! All actor references can be cloned, which is the easiest way to share them.
64//!
65//! The example below shows how an actor reference is cloned to send a message
66//! to the same actor.
67//!
68//! ```
69//! #![feature(never_type)]
70//!
71//! use heph::actor;
72//! use heph::supervisor::NoSupervisor;
73//! use heph_rt::spawn::ActorOptions;
74//! use heph_rt::{self as rt, Runtime, ThreadLocal};
75//!
76//! fn main() -> Result<(), rt::Error> {
77//!     let mut runtime = Runtime::new()?;
78//!     runtime.run_on_workers(|mut runtime_ref| -> Result<(), !> {
79//!         let new_actor = actor as fn(_) -> _;
80//!         let actor_ref = runtime_ref.spawn_local(NoSupervisor, new_actor, (), ActorOptions::default());
81//!
82//!         // To create another actor reference we can simply clone the
83//!         // first one.
84//!         let second_actor_ref = actor_ref.clone();
85//!
86//!         // Now we can use both references to send a message.
87//!         actor_ref.try_send("Hello world".to_owned()).unwrap();
88//!         second_actor_ref.try_send("Bye world".to_owned()).unwrap();
89//!
90//!         Ok(())
91//!     })?;
92//!     runtime.start()
93//! }
94//!
95//! /// Our actor.
96//! async fn actor(mut ctx: actor::Context<String, ThreadLocal>) {
97//!     if let Ok(msg) = ctx.receive_next().await {
98//!         println!("First message: {}", msg);
99//!     }
100//!
101//!     if let Ok(msg) = ctx.receive_next().await {
102//!         println!("Second message: {}", msg);
103//!     }
104//! }
105//! ```
106
107use std::any::TypeId;
108use std::convert::TryFrom;
109use std::error::Error;
110use std::fmt;
111use std::future::Future;
112use std::iter::FromIterator;
113use std::pin::Pin;
114use std::sync::atomic::{AtomicUsize, Ordering};
115use std::sync::Arc;
116use std::task::{self, Poll};
117
118use heph_inbox::{self as inbox, Sender};
119
120pub mod rpc;
121#[doc(no_inline)]
122pub use rpc::{Rpc, RpcError, RpcMessage, RpcResponse};
123
124/// Actor reference.
125///
126/// An actor reference reference can be used to send messages to an actor, for
127/// more details see the [module] documentation.
128///
129/// [module]: crate::actor_ref
130pub struct ActorRef<M> {
131    kind: ActorRefKind<M>,
132}
133
134enum ActorRefKind<M> {
135    /// Reference to an actor running on the same node.
136    Local(Sender<M>),
137    /// Reference that attempts to map the message to a different type first.
138    Mapped(Arc<dyn MappedActorRef<M>>),
139}
140
141// We know that the `Local` variant is `Send` and `Sync`. Since the `Mapped`
142// variant is a boxed version of `Local` so is that variant. This makes the
143// entire `ActorRefKind` `Send` and `Sync`, as long as `M` is `Send` (as we
144// could be sending the message across thread bounds).
145#[allow(clippy::non_send_fields_in_send_ty)]
146unsafe impl<M: Send> Send for ActorRefKind<M> {}
147unsafe impl<M: Send> Sync for ActorRefKind<M> {}
148
149// `ActorRefKind` is safe to move around independent of `M` as it's already heap
150// allocated.
151impl<M> Unpin for ActorRefKind<M> {}
152
153impl<M> ActorRef<M> {
154    /// Create a new `ActorRef` for an actor using `sender`.
155    #[doc(hidden)] // Not part of the stable API.
156    pub const fn local(sender: Sender<M>) -> ActorRef<M> {
157        ActorRef {
158            kind: ActorRefKind::Local(sender),
159        }
160    }
161
162    /// Send a message to the actor.
163    ///
164    /// See [Sending messages] and [`ActorRef::try_send`] for more details.
165    ///
166    /// [Sending messages]: index.html#sending-messages
167    ///
168    /// # Notes
169    ///
170    /// Mapped actor references, see [`ActorRef::map`] and
171    /// [`ActorRef::try_map`], require an allocation and might be expensive. If
172    /// possible try [`ActorRef::try_send`] first, which does not require an
173    /// allocation. Regular (i.e. non-mapped) actor references do not require an
174    /// allocation.
175    pub fn send<'r, Msg>(&'r self, msg: Msg) -> SendValue<'r, M>
176    where
177        Msg: Into<M>,
178    {
179        use ActorRefKind::*;
180        let msg = msg.into();
181        SendValue {
182            kind: match &self.kind {
183                Local(sender) => SendValueKind::Local(sender.send(msg)),
184                Mapped(actor_ref) => SendValueKind::Mapped(actor_ref.mapped_send(msg)),
185            },
186        }
187    }
188
189    /// Attempt to send a message to the actor.
190    ///
191    /// Some types of actor references can detect errors in sending a message,
192    /// however not all actor references can. This means that even if this
193    /// methods returns `Ok` it does **not** mean that the message is guaranteed
194    /// to be delivered to or handled by the actor.
195    ///
196    /// See [Sending messages] for more details.
197    ///
198    /// [Sending messages]: index.html#sending-messages
199    pub fn try_send<Msg>(&self, msg: Msg) -> Result<(), SendError>
200    where
201        Msg: Into<M>,
202    {
203        use ActorRefKind::*;
204        #[cfg(any(test, feature = "test"))]
205        if crate::test::should_lose_msg() {
206            log::debug!("dropping message on purpose");
207            return Ok(());
208        }
209
210        let msg = msg.into();
211        match &self.kind {
212            Local(sender) => sender.try_send(msg).map_err(|_| SendError),
213            Mapped(actor_ref) => actor_ref.try_mapped_send(msg),
214        }
215    }
216
217    /// Make a Remote Procedure Call (RPC).
218    ///
219    /// This will send the `request` to the actor and returns a [`Rpc`]
220    /// [`Future`] that will return a response (of type `Res`), or an error if
221    /// the receiving actor didn't respond.
222    ///
223    /// See the [`rpc`] module for more details.
224    ///
225    /// [`Future`]: std::future::Future
226    pub fn rpc<'r, Req, Res>(&'r self, request: Req) -> Rpc<'r, M, Res>
227    where
228        M: From<RpcMessage<Req, Res>>,
229    {
230        Rpc::new(self, request)
231    }
232
233    /// Change the message type of the actor reference.
234    ///
235    /// Before sending the message this will first change the message into a
236    /// different type. This is useful when you need to send to different types
237    /// of actors (using different message types) from a central location.
238    ///
239    /// # Notes
240    ///
241    /// This conversion is **not** cheap, it requires an allocation so use with
242    /// caution when it comes to performance sensitive code.
243    ///
244    /// Prefer to clone an existing mapped `ActorRef` over creating a new one as
245    /// that can reuse the allocation mentioned above.
246    pub fn map<Msg>(self) -> ActorRef<Msg>
247    where
248        M: From<Msg> + 'static,
249        Msg: 'static,
250    {
251        // There is a blanket implementation for `TryFrom` for `T: From` so we
252        // can use the `TryFrom` knowning that it will never return an error.
253        self.try_map()
254    }
255
256    /// Much like [`map`], but uses the [`TryFrom`] trait.
257    ///
258    /// This creates a new actor reference that attempts to map from one message
259    /// type to another before sending. This is useful when you need to send to
260    /// different types of actors from a central location.
261    ///
262    /// [`map`]: ActorRef::map
263    ///
264    /// # Notes
265    ///
266    /// Errors converting from one message type to another are turned into
267    /// [`SendError`]s.
268    ///
269    /// This conversion is **not** cheap, it requires an allocation so use with
270    /// caution when it comes to performance sensitive code.
271    ///
272    /// Prefer to clone an existing mapped `ActorRef` over creating a new one as
273    /// that can reuse the allocation mentioned above.
274    pub fn try_map<Msg>(self) -> ActorRef<Msg>
275    where
276        M: TryFrom<Msg> + 'static,
277        Msg: 'static,
278    {
279        if TypeId::of::<ActorRef<M>>() == TypeId::of::<ActorRef<Msg>>() {
280            // Safety: If `M` == `Msg`, then the following `transmute` is a
281            // no-op and thus safe.
282            unsafe { std::mem::transmute(self) }
283        } else {
284            ActorRef {
285                kind: ActorRefKind::Mapped(Arc::new(self)),
286            }
287        }
288    }
289
290    /// Change the message type of the actor reference.
291    ///
292    /// Before sending the message this will first change the message into a
293    /// different type using the `map`ping function `F`. This is useful when you
294    /// need to send to different types of actors (using different message
295    /// types) from a central location.
296    ///
297    /// # Notes
298    ///
299    /// This conversion is **not** cheap, it requires an allocation so use with
300    /// caution when it comes to performance sensitive code.
301    ///
302    /// Prefer to clone an existing mapped `ActorRef` over creating a new one as
303    /// that can reuse the allocation mentioned above.
304    pub fn map_fn<Msg, F>(self, map: F) -> ActorRef<Msg>
305    where
306        F: Fn(Msg) -> M + 'static,
307        M: 'static,
308    {
309        self.try_map_fn::<Msg, _, !>(move |msg| Ok(map(msg)))
310    }
311
312    /// Change the message type of the actor reference.
313    ///
314    /// Before sending the message this will first change the message into a
315    /// different type using the `map`ping function `F`. This is useful when you
316    /// need to send to different types of actors (using different message
317    /// types) from a central location.
318    ///
319    /// # Notes
320    ///
321    /// This conversion is **not** cheap, it requires an allocation so use with
322    /// caution when it comes to performance sensitive code.
323    ///
324    /// Prefer to clone an existing mapped `ActorRef` over creating a new one as
325    /// that can reuse the allocation mentioned above.
326    pub fn try_map_fn<Msg, F, E>(self, map: F) -> ActorRef<Msg>
327    where
328        F: Fn(Msg) -> Result<M, E> + 'static,
329        M: 'static,
330    {
331        let mapped_ref = MappedActorRefFn {
332            actor_ref: self,
333            map,
334        };
335        ActorRef {
336            kind: ActorRefKind::Mapped(Arc::new(mapped_ref)),
337        }
338    }
339
340    /// Returns a [`Future`] that waits until the actor finishes running. Acts
341    /// similar to a [`JoinHandle`] of a thread.
342    ///
343    /// [disconnected]: ActorRef::is_connected
344    /// [`JoinHandle`]: std::thread::JoinHandle
345    pub fn join<'r>(&'r self) -> Join<'r, M> {
346        use ActorRefKind::*;
347        Join {
348            kind: match &self.kind {
349                Local(sender) => JoinKind::Local(sender.join()),
350                Mapped(actor_ref) => JoinKind::Mapped(actor_ref.mapped_join()),
351            },
352        }
353    }
354
355    /// Returns `true` if the actor to which this reference sends to is still
356    /// connected.
357    ///
358    /// # Notes
359    ///
360    /// Even if this returns `true` it doesn't mean [`ActorRef::try_send`] will
361    /// succeeded (even if the inbox isn't full). There is always a race
362    /// condition between using this method and doing something based on it.
363    ///
364    /// This does provide one useful feature: once this returns `false` it will
365    /// never return `true` again. This makes it useful in the use case where
366    /// creating a message is expansive, which is wasted if the actor is no
367    /// longer running. Thus this should only be used as optimisation to not do
368    /// work.
369    pub fn is_connected(&self) -> bool {
370        use ActorRefKind::*;
371        match &self.kind {
372            Local(sender) => sender.is_connected(),
373            Mapped(actor_ref) => actor_ref.is_connected(),
374        }
375    }
376
377    /// Returns true if `self` and `other` send messages to the same actor.
378    pub fn sends_to<Msg>(&self, other: &ActorRef<Msg>) -> bool {
379        self.id() == other.id()
380    }
381
382    fn id(&self) -> inbox::Id {
383        use ActorRefKind::*;
384        match &self.kind {
385            Local(sender) => sender.id(),
386            Mapped(actor_ref) => actor_ref.id(),
387        }
388    }
389}
390
391impl<M> Clone for ActorRef<M> {
392    fn clone(&self) -> ActorRef<M> {
393        use ActorRefKind::*;
394        ActorRef {
395            kind: match &self.kind {
396                Local(sender) => Local(sender.clone()),
397                Mapped(actor_ref) => Mapped(actor_ref.clone()),
398            },
399        }
400    }
401}
402
403impl<M> fmt::Debug for ActorRef<M> {
404    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
405        f.write_str("ActorRef")
406    }
407}
408
409/// Trait to erase the original message type of the actor reference.
410///
411/// # Notes
412///
413/// For correctness this may only be implemented on [`ActorRef`].
414trait MappedActorRef<M> {
415    /// Same as [`ActorRef::try_send`] but converts the message first.
416    fn try_mapped_send(&self, msg: M) -> Result<(), SendError>;
417
418    fn mapped_send<'r>(&'r self, msg: M) -> MappedSendValue<'r>;
419
420    fn mapped_join<'r>(&'r self) -> MappedJoin<'r>;
421
422    fn is_connected(&self) -> bool;
423
424    fn id(&self) -> inbox::Id;
425}
426
427impl<M, Msg> MappedActorRef<Msg> for ActorRef<M>
428where
429    M: TryFrom<Msg>,
430{
431    fn try_mapped_send(&self, msg: Msg) -> Result<(), SendError> {
432        M::try_from(msg)
433            .map_err(|_| SendError)
434            .and_then(|msg| self.try_send(msg))
435    }
436
437    fn mapped_send<'r>(&'r self, msg: Msg) -> MappedSendValue<'r> {
438        match M::try_from(msg) {
439            Ok(msg) => match &self.kind {
440                ActorRefKind::Local(sender) => match sender.try_send(msg) {
441                    Ok(()) => MappedSendValue::Send,
442                    Err(heph_inbox::SendError::Full(msg)) => {
443                        MappedSendValue::Sending(Box::pin(self.send(msg)))
444                    }
445                    Err(heph_inbox::SendError::Disconnected(_)) => MappedSendValue::SendErr,
446                },
447                ActorRefKind::Mapped(sender) => sender.mapped_send(msg),
448            },
449            Err(..) => MappedSendValue::SendErr,
450        }
451    }
452
453    fn mapped_join<'r>(&'r self) -> MappedJoin<'r> {
454        match &self.kind {
455            ActorRefKind::Local(sender) => match sender.is_connected() {
456                false => MappedJoin::Disconnected,
457                true => MappedJoin::Join(Box::pin(self.join())),
458            },
459            ActorRefKind::Mapped(sender) => sender.mapped_join(),
460        }
461    }
462
463    fn is_connected(&self) -> bool {
464        self.is_connected()
465    }
466
467    fn id(&self) -> inbox::Id {
468        self.id()
469    }
470}
471
472/// Wrapper around an [`ActorRef`] to change the message type.
473struct MappedActorRefFn<M, F> {
474    actor_ref: ActorRef<M>,
475    map: F,
476}
477
478impl<M, Msg, F, E> MappedActorRef<Msg> for MappedActorRefFn<M, F>
479where
480    F: Fn(Msg) -> Result<M, E>,
481{
482    fn try_mapped_send(&self, msg: Msg) -> Result<(), SendError> {
483        match (self.map)(msg) {
484            Ok(msg) => self.actor_ref.try_send(msg),
485            Err(..) => Err(SendError),
486        }
487    }
488
489    fn mapped_send<'r>(&'r self, msg: Msg) -> MappedSendValue<'r> {
490        match (self.map)(msg) {
491            Ok(msg) => match &self.actor_ref.kind {
492                ActorRefKind::Local(sender) => match sender.try_send(msg) {
493                    Ok(()) => MappedSendValue::Send,
494                    Err(heph_inbox::SendError::Full(msg)) => {
495                        MappedSendValue::Sending(Box::pin(self.actor_ref.send(msg)))
496                    }
497                    Err(heph_inbox::SendError::Disconnected(_)) => MappedSendValue::SendErr,
498                },
499                ActorRefKind::Mapped(sender) => sender.mapped_send(msg),
500            },
501            Err(..) => MappedSendValue::SendErr,
502        }
503    }
504
505    fn mapped_join<'r>(&'r self) -> MappedJoin<'r> {
506        match &self.actor_ref.kind {
507            ActorRefKind::Local(sender) => match sender.is_connected() {
508                false => MappedJoin::Disconnected,
509                true => MappedJoin::Join(Box::pin(self.actor_ref.join())),
510            },
511            ActorRefKind::Mapped(sender) => sender.mapped_join(),
512        }
513    }
514
515    fn is_connected(&self) -> bool {
516        self.actor_ref.is_connected()
517    }
518
519    fn id(&self) -> inbox::Id {
520        self.actor_ref.id()
521    }
522}
523
524/// Future used in `MappedActorRef::mapped_send`
525enum MappedSendValue<'r> {
526    /// Already send.
527    Send,
528    /// Error mapping or sending the message.
529    SendErr,
530    /// Sending in progress.
531    /// NOTE: we need a `Box` to erase the mapped message type.
532    Sending(Pin<Box<dyn Future<Output = Result<(), SendError>> + 'r>>),
533}
534
535impl<'r> Future for MappedSendValue<'r> {
536    type Output = Result<(), SendError>;
537
538    #[track_caller]
539    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
540        use MappedSendValue::*;
541        match self.get_mut() {
542            Send => Poll::Ready(Ok(())),
543            SendErr => Poll::Ready(Err(SendError)),
544            Sending(send_value) => send_value.as_mut().poll(ctx),
545        }
546    }
547}
548
/// Future used in `MappedActorRef::mapped_join`.
enum MappedJoin<'r> {
    /// The actor was already disconnected, there is nothing to wait for.
    Disconnected,
    /// Waiting for the actor to finish.
    /// NOTE: we need a `Box` to erase the mapped message type.
    Join(Pin<Box<dyn Future<Output = ()> + 'r>>),
}

impl<'r> Future for MappedJoin<'r> {
    type Output = ();

    /// Completes immediately if already disconnected, otherwise polls the
    /// boxed join future.
    #[track_caller]
    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // Both variants are `Unpin` (the future is boxed), so the pin can be
        // removed safely.
        match Pin::into_inner(self) {
            MappedJoin::Disconnected => Poll::Ready(()),
            MappedJoin::Join(waiting) => waiting.as_mut().poll(ctx),
        }
    }
}
568
569/// [`Future`] behind [`ActorRef::send`].
570#[must_use = "futures do nothing unless you `.await` or poll them"]
571pub struct SendValue<'r, M> {
572    kind: SendValueKind<'r, M>,
573}
574
575enum SendValueKind<'r, M> {
576    Local(inbox::SendValue<'r, M>),
577    Mapped(MappedSendValue<'r>),
578}
579
580// We know that the `Local` variant is `Send` and `Sync`. Since the `Mapped`
581// variant is a boxed version of `Local` so is that variant. This makes the
582// entire `SendValueKind` `Send` and `Sync`, as long as `M` is `Send` (as we
583// could be sending the message across thread bounds).
584#[allow(clippy::non_send_fields_in_send_ty)]
585unsafe impl<'r, M: Send> Send for SendValueKind<'r, M> {}
586unsafe impl<'r, M: Send> Sync for SendValueKind<'r, M> {}
587
588impl<'r, M> Future for SendValue<'r, M> {
589    type Output = Result<(), SendError>;
590
591    #[track_caller]
592    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
593        use SendValueKind::*;
594        #[cfg(any(test, feature = "test"))]
595        if crate::test::should_lose_msg() {
596            log::debug!("dropping message on purpose");
597            return Poll::Ready(Ok(()));
598        }
599
600        // Safety: we're not moving the future to this is safe.
601        let this = unsafe { self.get_unchecked_mut() };
602        match &mut this.kind {
603            // Safety: we're not moving `send_value` so this is safe.
604            Local(fut) => unsafe { Pin::new_unchecked(fut) }
605                .poll(ctx)
606                .map_err(|_| SendError),
607            // Safety: we're not moving `fut` so this is safe.
608            Mapped(fut) => unsafe { Pin::new_unchecked(fut) }.poll(ctx),
609        }
610    }
611}
612
613impl<'r, M> fmt::Debug for SendValue<'r, M> {
614    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
615        f.write_str("SendValue")
616    }
617}
618
/// Error returned when sending a message fails.
///
/// The reason why the sending of the message failed is unspecified.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SendError;

impl fmt::Display for SendError {
    /// Writes a fixed, human readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const DESCRIPTION: &str = "unable to send message";
        f.write_str(DESCRIPTION)
    }
}

// No underlying cause is exposed, so the default methods suffice.
impl Error for SendError {}
632
633/// [`Future`] behind [`ActorRef::join`].
634#[must_use = "futures do nothing unless you `.await` or poll them"]
635pub struct Join<'r, M> {
636    kind: JoinKind<'r, M>,
637}
638
639enum JoinKind<'r, M> {
640    Local(inbox::Join<'r, M>),
641    Mapped(MappedJoin<'r>),
642}
643
644// We know that the `Local` variant is `Send` and `Sync`. Since the `Mapped`
645// variant is a boxed version of `Local` so is that variant. This makes the
646// entire `JoinKind` `Send` and `Sync`, as long as `M` is `Send` (as we could be
647// sending the message across thread bounds).
648#[allow(clippy::non_send_fields_in_send_ty)]
649unsafe impl<'r, M: Send> Send for JoinKind<'r, M> {}
650unsafe impl<'r, M: Send> Sync for JoinKind<'r, M> {}
651
652impl<'r, M> Future for Join<'r, M> {
653    type Output = ();
654
655    #[track_caller]
656    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
657        use JoinKind::*;
658
659        // Safety: we're not moving the future to this is safe.
660        let this = unsafe { self.get_unchecked_mut() };
661        match &mut this.kind {
662            // Safety: we're not moving `fut` so this is safe.
663            Local(fut) => unsafe { Pin::new_unchecked(fut) }.poll(ctx),
664            // Safety: we're not moving `fut` so this is safe.
665            Mapped(fut) => unsafe { Pin::new_unchecked(fut) }.poll(ctx),
666        }
667    }
668}
669
670impl<'r, M> fmt::Debug for Join<'r, M> {
671    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
672        f.write_str("Join")
673    }
674}
675
676/// A group of [`ActorRef`]s used to send a message to multiple actors.
677///
678/// # Notes
679///
680/// Unlike [`ActorRef`] this is **not** cheap to clone as it's requires a clone
681/// of an internal vector.
682pub struct ActorGroup<M> {
683    actor_refs: Vec<ActorRef<M>>,
684    /// Index of the actor reference to send the [single delivery] message to.
685    /// Using relaxed ordering on this field is fine because we make no
686    /// guarantees about to which actor the message will be delivered. E.g. we
687    /// could always send to the first actor in the group and still fulfill the
688    /// contract.
689    ///
690    /// [single delivery]: Delivery::ToOne
691    send_next: AtomicUsize,
692}
693
/// The kind of delivery to use in [`ActorGroup::try_send`].
#[derive(Clone, Copy, Debug)]
pub enum Delivery {
    /// Deliver a copy of the message to all actors in the group.
    ToAll,
    /// Deliver the message to one of the actors in the group.
    ToOne,
}
702
703impl<M> ActorGroup<M> {
704    /// Creates an empty `ActorGroup`.
705    pub const fn empty() -> ActorGroup<M> {
706        ActorGroup {
707            actor_refs: Vec::new(),
708            send_next: AtomicUsize::new(0),
709        }
710    }
711
712    /// Create a new `ActorGroup`.
713    pub fn new<I>(actor_refs: I) -> ActorGroup<M>
714    where
715        I: IntoIterator<Item = ActorRef<M>>,
716    {
717        ActorGroup {
718            actor_refs: actor_refs.into_iter().collect(),
719            send_next: AtomicUsize::new(0),
720        }
721    }
722
723    /// Returns the number of actor references in the group.
724    pub fn len(&self) -> usize {
725        self.actor_refs.len()
726    }
727
728    /// Returns `true` if the group is empty.
729    pub fn is_empty(&self) -> bool {
730        self.actor_refs.is_empty()
731    }
732
733    /// Add an `ActorRef` to the group.
734    pub fn add(&mut self, actor_ref: ActorRef<M>) {
735        self.actor_refs.push(actor_ref)
736    }
737
738    /// Add an `ActorRef` to the group, iff it's not already in the group.
739    pub fn add_unique(&mut self, actor_ref: ActorRef<M>) {
740        let id = actor_ref.id();
741        for actor_ref in &self.actor_refs {
742            if actor_ref.id() == id {
743                return;
744            }
745        }
746        self.actor_refs.push(actor_ref)
747    }
748
749    /// Remove all actor references which point to the same actor as
750    /// `actor_ref`.
751    pub fn remove(&mut self, actor_ref: &ActorRef<M>) {
752        let id = actor_ref.id();
753        self.actor_refs.retain(|a| a.id() != id);
754    }
755
756    /// Remove all actor references that have been disconnected.
757    pub fn remove_disconnected(&mut self) {
758        self.actor_refs.retain(ActorRef::is_connected);
759    }
760
761    /// Make the group of actor references unique.
762    ///
763    /// Removes all duplicate actor references.
764    pub fn make_unique(&mut self) {
765        let mut i = 0;
766        while let Some(id) = self.actor_refs.get(i).map(ActorRef::id) {
767            let mut j = i + 1;
768            while let Some(other_id) = self.actor_refs.get(j).map(ActorRef::id) {
769                if id == other_id {
770                    // NOTE: don't update `j` as it's replaced with another
771                    // actor reference we haven't checked yet.
772                    drop(self.actor_refs.swap_remove(j));
773                } else {
774                    // Move the next actor reference.
775                    j += 1;
776                }
777            }
778            i += 1;
779        }
780    }
781
782    /// Attempts to send a message to all the actors in the group.
783    ///
784    /// This can either send the message to a single actor, by using
785    /// [`Delivery::ToOne`], or to all actors in the group by using
786    /// [`Delivery::ToAll`].
787    ///
788    /// When deliverying to all actors this will first `clone` the message and
789    /// then [`try_send`]ing it to each actor in the group. Note that this means
790    /// it will `clone` before calling [`Into::into`] on the message. If the
791    /// call to [`Into::into`] is expansive, or `M` is cheaper to clone than
792    /// `Msg` it might be worthwhile to call `msg.into()` before calling this
793    /// method.
794    ///
795    /// This only returns an error if the group is empty, otherwise this will
796    /// always return `Ok(())`.
797    ///
798    /// See [Sending messages] for more details.
799    ///
800    /// [`try_send`]: ActorRef::try_send
801    /// [Sending messages]: index.html#sending-messages
802    pub fn try_send<Msg>(&self, msg: Msg, delivery: Delivery) -> Result<(), SendError>
803    where
804        Msg: Into<M> + Clone,
805    {
806        if self.actor_refs.is_empty() {
807            return Err(SendError);
808        }
809
810        match delivery {
811            Delivery::ToAll => {
812                for actor_ref in &self.actor_refs {
813                    let _ = actor_ref.try_send(msg.clone());
814                }
815                Ok(())
816            }
817            Delivery::ToOne => {
818                // Safety: this needs to sync itself.
819                // NOTE: this wraps around on overflow.
820                let idx = self.send_next.fetch_add(1, Ordering::AcqRel) % self.actor_refs.len();
821                let actor_ref = &self.actor_refs[idx];
822                // TODO: try to send it to another actor on send failure?
823                actor_ref.try_send(msg)
824            }
825        }
826    }
827
828    /// Wait for all actors in this group to finish running.
829    ///
830    /// This works the same way as [`ActorRef::join`], but waits on a group of
831    /// actors.
832    pub fn join_all<'r>(&'r self) -> JoinAll<'r, M> {
833        JoinAll {
834            actor_refs: &self.actor_refs,
835            join: None,
836        }
837    }
838}
839
840impl<M> From<ActorRef<M>> for ActorGroup<M> {
841    fn from(actor_ref: ActorRef<M>) -> ActorGroup<M> {
842        ActorGroup {
843            actor_refs: vec![actor_ref],
844            send_next: AtomicUsize::new(0),
845        }
846    }
847}
848
849impl<M> FromIterator<ActorRef<M>> for ActorGroup<M> {
850    fn from_iter<I>(iter: I) -> Self
851    where
852        I: IntoIterator<Item = ActorRef<M>>,
853    {
854        ActorGroup::new(iter)
855    }
856}
857
858impl<M> Extend<ActorRef<M>> for ActorGroup<M> {
859    fn extend<I>(&mut self, iter: I)
860    where
861        I: IntoIterator<Item = ActorRef<M>>,
862    {
863        self.actor_refs.extend(iter);
864    }
865}
866
867impl<M> Clone for ActorGroup<M> {
868    fn clone(&self) -> ActorGroup<M> {
869        ActorGroup {
870            actor_refs: self.actor_refs.clone(),
871            send_next: AtomicUsize::new(0),
872        }
873    }
874
875    fn clone_from(&mut self, source: &Self) {
876        self.actor_refs.clone_from(&source.actor_refs);
877        *self.send_next.get_mut() = 0
878    }
879}
880
881impl<M> fmt::Debug for ActorGroup<M> {
882    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
883        self.actor_refs.fmt(f)
884    }
885}
886
887/// [`Future`] behind [`ActorGroup::join_all`].
888#[must_use = "futures do nothing unless you `.await` or poll them"]
889pub struct JoinAll<'r, M> {
890    actor_refs: &'r [ActorRef<M>],
891    join: Option<Join<'r, M>>,
892}
893
894impl<'r, M> Future for JoinAll<'r, M> {
895    type Output = ();
896
897    fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
898        loop {
899            // Check the `Join` we're currently waiting on, if any.
900            if let Some(join) = self.join.as_mut() {
901                match Pin::new(join).poll(ctx) {
902                    Poll::Ready(()) => {
903                        drop(self.join.take());
904                        // Continue below.
905                    }
906                    Poll::Pending => return Poll::Pending,
907                }
908            }
909
910            // Remove all already disconnected actor references.
911            let mut remove = 0;
912            for actor_ref in self.actor_refs {
913                if actor_ref.is_connected() {
914                    break;
915                }
916                remove += 1;
917            }
918            self.actor_refs = &self.actor_refs[remove..];
919
920            if self.actor_refs.is_empty() {
921                // No more actors we have to wait for, so we're done.
922                return Poll::Ready(());
923            }
924
925            // Wait on the next actor.
926            self.join = Some(self.actor_refs[0].join());
927        }
928    }
929}
930
931impl<'r, M> fmt::Debug for JoinAll<'r, M> {
932    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
933        f.debug_struct("JoinAll")
934            .field("left", &self.actor_refs.len())
935            .finish()
936    }
937}