heph/actor_ref/mod.rs
1//! Module containing actor references.
2//!
3//! An actor reference is a generic reference to an actor that can run on the
4//! same thread, another thread on the same node or even running remotely.
5//!
6//! # Sending messages
7//!
8//! The primary function of actor references is sending messages. This can be
9//! done by using the [`send`] or [`try_send`] methods. These methods don't
10//! block, even the remote actor reference, but the methods don't provided a lot
11//! of guarantees. It doesn't even guarantee the order in which the messages
12//! arrive. What [`send`] does is asynchronously add the message to the queue of
13//! messages for the actor.
14//!
15//! In case of thread-local actor reference this can be done directly. But for
16//! thread-safe actor references the message must first be send across thread
17//! bounds before being added to the actor's message queue. Remote actor
18//! references even need to send this message across a network, a lot can go
19//! wrong here.
20//!
21//! If guarantees are needed that a message is received or processed the
22//! receiving actor should send back an acknowledgment that the message is
23//! received and/or processed correctly.
24//!
25//! [`send`]: ActorRef::send
26//! [`try_send`]: ActorRef::try_send
27//!
28//! This example shows a simple actor that prints all the messages it receives.
29//!
30//! ```
31//! #![feature(never_type)]
32//!
33//! use heph::actor;
34//! use heph::supervisor::NoSupervisor;
35//! use heph_rt::spawn::ActorOptions;
36//! use heph_rt::{self as rt, Runtime, ThreadLocal};
37//!
38//! fn main() -> Result<(), rt::Error> {
39//! let mut runtime = Runtime::new()?;
40//! runtime.run_on_workers(|mut runtime_ref| -> Result<(), !> {
41//! // Spawn the actor.
42//! let new_actor = actor as fn(_) -> _;
43//! let actor_ref = runtime_ref.spawn_local(NoSupervisor, new_actor, (), ActorOptions::default());
44//!
45//! // Now we can use the actor reference to send the actor a message.
46//! actor_ref.try_send("Hello world".to_owned()).unwrap();
47//!
48//! Ok(())
49//! })?;
50//! runtime.start()
51//! }
52//!
53//! /// Our actor.
54//! async fn actor(mut ctx: actor::Context<String, ThreadLocal>) {
55//! if let Ok(msg) = ctx.receive_next().await {
56//! println!("got message: {}", msg);
57//! }
58//! }
59//! ```
60//!
61//! # Sharing actor references
62//!
63//! All actor references can be cloned, which is the easiest way to share them.
64//!
65//! The example below shows how an actor reference is cloned to send a message
66//! to the same actor.
67//!
68//! ```
69//! #![feature(never_type)]
70//!
71//! use heph::actor;
72//! use heph::supervisor::NoSupervisor;
73//! use heph_rt::spawn::ActorOptions;
74//! use heph_rt::{self as rt, Runtime, ThreadLocal};
75//!
76//! fn main() -> Result<(), rt::Error> {
77//! let mut runtime = Runtime::new()?;
78//! runtime.run_on_workers(|mut runtime_ref| -> Result<(), !> {
79//! let new_actor = actor as fn(_) -> _;
80//! let actor_ref = runtime_ref.spawn_local(NoSupervisor, new_actor, (), ActorOptions::default());
81//!
82//! // To create another actor reference we can simply clone the
83//! // first one.
84//! let second_actor_ref = actor_ref.clone();
85//!
86//! // Now we can use both references to send a message.
87//! actor_ref.try_send("Hello world".to_owned()).unwrap();
88//! second_actor_ref.try_send("Bye world".to_owned()).unwrap();
89//!
90//! Ok(())
91//! })?;
92//! runtime.start()
93//! }
94//!
95//! /// Our actor.
96//! async fn actor(mut ctx: actor::Context<String, ThreadLocal>) {
97//! if let Ok(msg) = ctx.receive_next().await {
98//! println!("First message: {}", msg);
99//! }
100//!
101//! if let Ok(msg) = ctx.receive_next().await {
102//! println!("Second message: {}", msg);
103//! }
104//! }
105//! ```
106
107use std::any::TypeId;
108use std::convert::TryFrom;
109use std::error::Error;
110use std::fmt;
111use std::future::Future;
112use std::iter::FromIterator;
113use std::pin::Pin;
114use std::sync::atomic::{AtomicUsize, Ordering};
115use std::sync::Arc;
116use std::task::{self, Poll};
117
118use heph_inbox::{self as inbox, Sender};
119
120pub mod rpc;
121#[doc(no_inline)]
122pub use rpc::{Rpc, RpcError, RpcMessage, RpcResponse};
123
124/// Actor reference.
125///
126/// An actor reference reference can be used to send messages to an actor, for
127/// more details see the [module] documentation.
128///
129/// [module]: crate::actor_ref
130pub struct ActorRef<M> {
131 kind: ActorRefKind<M>,
132}
133
134enum ActorRefKind<M> {
135 /// Reference to an actor running on the same node.
136 Local(Sender<M>),
137 /// Reference that attempts to map the message to a different type first.
138 Mapped(Arc<dyn MappedActorRef<M>>),
139}
140
141// We know that the `Local` variant is `Send` and `Sync`. Since the `Mapped`
142// variant is a boxed version of `Local` so is that variant. This makes the
143// entire `ActorRefKind` `Send` and `Sync`, as long as `M` is `Send` (as we
144// could be sending the message across thread bounds).
145#[allow(clippy::non_send_fields_in_send_ty)]
146unsafe impl<M: Send> Send for ActorRefKind<M> {}
147unsafe impl<M: Send> Sync for ActorRefKind<M> {}
148
149// `ActorRefKind` is safe to move around independent of `M` as it's already heap
150// allocated.
151impl<M> Unpin for ActorRefKind<M> {}
152
153impl<M> ActorRef<M> {
154 /// Create a new `ActorRef` for an actor using `sender`.
155 #[doc(hidden)] // Not part of the stable API.
156 pub const fn local(sender: Sender<M>) -> ActorRef<M> {
157 ActorRef {
158 kind: ActorRefKind::Local(sender),
159 }
160 }
161
162 /// Send a message to the actor.
163 ///
164 /// See [Sending messages] and [`ActorRef::try_send`] for more details.
165 ///
166 /// [Sending messages]: index.html#sending-messages
167 ///
168 /// # Notes
169 ///
170 /// Mapped actor references, see [`ActorRef::map`] and
171 /// [`ActorRef::try_map`], require an allocation and might be expensive. If
172 /// possible try [`ActorRef::try_send`] first, which does not require an
173 /// allocation. Regular (i.e. non-mapped) actor references do not require an
174 /// allocation.
175 pub fn send<'r, Msg>(&'r self, msg: Msg) -> SendValue<'r, M>
176 where
177 Msg: Into<M>,
178 {
179 use ActorRefKind::*;
180 let msg = msg.into();
181 SendValue {
182 kind: match &self.kind {
183 Local(sender) => SendValueKind::Local(sender.send(msg)),
184 Mapped(actor_ref) => SendValueKind::Mapped(actor_ref.mapped_send(msg)),
185 },
186 }
187 }
188
189 /// Attempt to send a message to the actor.
190 ///
191 /// Some types of actor references can detect errors in sending a message,
192 /// however not all actor references can. This means that even if this
193 /// methods returns `Ok` it does **not** mean that the message is guaranteed
194 /// to be delivered to or handled by the actor.
195 ///
196 /// See [Sending messages] for more details.
197 ///
198 /// [Sending messages]: index.html#sending-messages
199 pub fn try_send<Msg>(&self, msg: Msg) -> Result<(), SendError>
200 where
201 Msg: Into<M>,
202 {
203 use ActorRefKind::*;
204 #[cfg(any(test, feature = "test"))]
205 if crate::test::should_lose_msg() {
206 log::debug!("dropping message on purpose");
207 return Ok(());
208 }
209
210 let msg = msg.into();
211 match &self.kind {
212 Local(sender) => sender.try_send(msg).map_err(|_| SendError),
213 Mapped(actor_ref) => actor_ref.try_mapped_send(msg),
214 }
215 }
216
217 /// Make a Remote Procedure Call (RPC).
218 ///
219 /// This will send the `request` to the actor and returns a [`Rpc`]
220 /// [`Future`] that will return a response (of type `Res`), or an error if
221 /// the receiving actor didn't respond.
222 ///
223 /// See the [`rpc`] module for more details.
224 ///
225 /// [`Future`]: std::future::Future
226 pub fn rpc<'r, Req, Res>(&'r self, request: Req) -> Rpc<'r, M, Res>
227 where
228 M: From<RpcMessage<Req, Res>>,
229 {
230 Rpc::new(self, request)
231 }
232
233 /// Change the message type of the actor reference.
234 ///
235 /// Before sending the message this will first change the message into a
236 /// different type. This is useful when you need to send to different types
237 /// of actors (using different message types) from a central location.
238 ///
239 /// # Notes
240 ///
241 /// This conversion is **not** cheap, it requires an allocation so use with
242 /// caution when it comes to performance sensitive code.
243 ///
244 /// Prefer to clone an existing mapped `ActorRef` over creating a new one as
245 /// that can reuse the allocation mentioned above.
246 pub fn map<Msg>(self) -> ActorRef<Msg>
247 where
248 M: From<Msg> + 'static,
249 Msg: 'static,
250 {
251 // There is a blanket implementation for `TryFrom` for `T: From` so we
252 // can use the `TryFrom` knowning that it will never return an error.
253 self.try_map()
254 }
255
256 /// Much like [`map`], but uses the [`TryFrom`] trait.
257 ///
258 /// This creates a new actor reference that attempts to map from one message
259 /// type to another before sending. This is useful when you need to send to
260 /// different types of actors from a central location.
261 ///
262 /// [`map`]: ActorRef::map
263 ///
264 /// # Notes
265 ///
266 /// Errors converting from one message type to another are turned into
267 /// [`SendError`]s.
268 ///
269 /// This conversion is **not** cheap, it requires an allocation so use with
270 /// caution when it comes to performance sensitive code.
271 ///
272 /// Prefer to clone an existing mapped `ActorRef` over creating a new one as
273 /// that can reuse the allocation mentioned above.
274 pub fn try_map<Msg>(self) -> ActorRef<Msg>
275 where
276 M: TryFrom<Msg> + 'static,
277 Msg: 'static,
278 {
279 if TypeId::of::<ActorRef<M>>() == TypeId::of::<ActorRef<Msg>>() {
280 // Safety: If `M` == `Msg`, then the following `transmute` is a
281 // no-op and thus safe.
282 unsafe { std::mem::transmute(self) }
283 } else {
284 ActorRef {
285 kind: ActorRefKind::Mapped(Arc::new(self)),
286 }
287 }
288 }
289
290 /// Change the message type of the actor reference.
291 ///
292 /// Before sending the message this will first change the message into a
293 /// different type using the `map`ping function `F`. This is useful when you
294 /// need to send to different types of actors (using different message
295 /// types) from a central location.
296 ///
297 /// # Notes
298 ///
299 /// This conversion is **not** cheap, it requires an allocation so use with
300 /// caution when it comes to performance sensitive code.
301 ///
302 /// Prefer to clone an existing mapped `ActorRef` over creating a new one as
303 /// that can reuse the allocation mentioned above.
304 pub fn map_fn<Msg, F>(self, map: F) -> ActorRef<Msg>
305 where
306 F: Fn(Msg) -> M + 'static,
307 M: 'static,
308 {
309 self.try_map_fn::<Msg, _, !>(move |msg| Ok(map(msg)))
310 }
311
312 /// Change the message type of the actor reference.
313 ///
314 /// Before sending the message this will first change the message into a
315 /// different type using the `map`ping function `F`. This is useful when you
316 /// need to send to different types of actors (using different message
317 /// types) from a central location.
318 ///
319 /// # Notes
320 ///
321 /// This conversion is **not** cheap, it requires an allocation so use with
322 /// caution when it comes to performance sensitive code.
323 ///
324 /// Prefer to clone an existing mapped `ActorRef` over creating a new one as
325 /// that can reuse the allocation mentioned above.
326 pub fn try_map_fn<Msg, F, E>(self, map: F) -> ActorRef<Msg>
327 where
328 F: Fn(Msg) -> Result<M, E> + 'static,
329 M: 'static,
330 {
331 let mapped_ref = MappedActorRefFn {
332 actor_ref: self,
333 map,
334 };
335 ActorRef {
336 kind: ActorRefKind::Mapped(Arc::new(mapped_ref)),
337 }
338 }
339
340 /// Returns a [`Future`] that waits until the actor finishes running. Acts
341 /// similar to a [`JoinHandle`] of a thread.
342 ///
343 /// [disconnected]: ActorRef::is_connected
344 /// [`JoinHandle`]: std::thread::JoinHandle
345 pub fn join<'r>(&'r self) -> Join<'r, M> {
346 use ActorRefKind::*;
347 Join {
348 kind: match &self.kind {
349 Local(sender) => JoinKind::Local(sender.join()),
350 Mapped(actor_ref) => JoinKind::Mapped(actor_ref.mapped_join()),
351 },
352 }
353 }
354
355 /// Returns `true` if the actor to which this reference sends to is still
356 /// connected.
357 ///
358 /// # Notes
359 ///
360 /// Even if this returns `true` it doesn't mean [`ActorRef::try_send`] will
361 /// succeeded (even if the inbox isn't full). There is always a race
362 /// condition between using this method and doing something based on it.
363 ///
364 /// This does provide one useful feature: once this returns `false` it will
365 /// never return `true` again. This makes it useful in the use case where
366 /// creating a message is expansive, which is wasted if the actor is no
367 /// longer running. Thus this should only be used as optimisation to not do
368 /// work.
369 pub fn is_connected(&self) -> bool {
370 use ActorRefKind::*;
371 match &self.kind {
372 Local(sender) => sender.is_connected(),
373 Mapped(actor_ref) => actor_ref.is_connected(),
374 }
375 }
376
377 /// Returns true if `self` and `other` send messages to the same actor.
378 pub fn sends_to<Msg>(&self, other: &ActorRef<Msg>) -> bool {
379 self.id() == other.id()
380 }
381
382 fn id(&self) -> inbox::Id {
383 use ActorRefKind::*;
384 match &self.kind {
385 Local(sender) => sender.id(),
386 Mapped(actor_ref) => actor_ref.id(),
387 }
388 }
389}
390
391impl<M> Clone for ActorRef<M> {
392 fn clone(&self) -> ActorRef<M> {
393 use ActorRefKind::*;
394 ActorRef {
395 kind: match &self.kind {
396 Local(sender) => Local(sender.clone()),
397 Mapped(actor_ref) => Mapped(actor_ref.clone()),
398 },
399 }
400 }
401}
402
403impl<M> fmt::Debug for ActorRef<M> {
404 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
405 f.write_str("ActorRef")
406 }
407}
408
409/// Trait to erase the original message type of the actor reference.
410///
411/// # Notes
412///
413/// For correctness this may only be implemented on [`ActorRef`].
414trait MappedActorRef<M> {
415 /// Same as [`ActorRef::try_send`] but converts the message first.
416 fn try_mapped_send(&self, msg: M) -> Result<(), SendError>;
417
418 fn mapped_send<'r>(&'r self, msg: M) -> MappedSendValue<'r>;
419
420 fn mapped_join<'r>(&'r self) -> MappedJoin<'r>;
421
422 fn is_connected(&self) -> bool;
423
424 fn id(&self) -> inbox::Id;
425}
426
427impl<M, Msg> MappedActorRef<Msg> for ActorRef<M>
428where
429 M: TryFrom<Msg>,
430{
431 fn try_mapped_send(&self, msg: Msg) -> Result<(), SendError> {
432 M::try_from(msg)
433 .map_err(|_| SendError)
434 .and_then(|msg| self.try_send(msg))
435 }
436
437 fn mapped_send<'r>(&'r self, msg: Msg) -> MappedSendValue<'r> {
438 match M::try_from(msg) {
439 Ok(msg) => match &self.kind {
440 ActorRefKind::Local(sender) => match sender.try_send(msg) {
441 Ok(()) => MappedSendValue::Send,
442 Err(heph_inbox::SendError::Full(msg)) => {
443 MappedSendValue::Sending(Box::pin(self.send(msg)))
444 }
445 Err(heph_inbox::SendError::Disconnected(_)) => MappedSendValue::SendErr,
446 },
447 ActorRefKind::Mapped(sender) => sender.mapped_send(msg),
448 },
449 Err(..) => MappedSendValue::SendErr,
450 }
451 }
452
453 fn mapped_join<'r>(&'r self) -> MappedJoin<'r> {
454 match &self.kind {
455 ActorRefKind::Local(sender) => match sender.is_connected() {
456 false => MappedJoin::Disconnected,
457 true => MappedJoin::Join(Box::pin(self.join())),
458 },
459 ActorRefKind::Mapped(sender) => sender.mapped_join(),
460 }
461 }
462
463 fn is_connected(&self) -> bool {
464 self.is_connected()
465 }
466
467 fn id(&self) -> inbox::Id {
468 self.id()
469 }
470}
471
472/// Wrapper around an [`ActorRef`] to change the message type.
473struct MappedActorRefFn<M, F> {
474 actor_ref: ActorRef<M>,
475 map: F,
476}
477
478impl<M, Msg, F, E> MappedActorRef<Msg> for MappedActorRefFn<M, F>
479where
480 F: Fn(Msg) -> Result<M, E>,
481{
482 fn try_mapped_send(&self, msg: Msg) -> Result<(), SendError> {
483 match (self.map)(msg) {
484 Ok(msg) => self.actor_ref.try_send(msg),
485 Err(..) => Err(SendError),
486 }
487 }
488
489 fn mapped_send<'r>(&'r self, msg: Msg) -> MappedSendValue<'r> {
490 match (self.map)(msg) {
491 Ok(msg) => match &self.actor_ref.kind {
492 ActorRefKind::Local(sender) => match sender.try_send(msg) {
493 Ok(()) => MappedSendValue::Send,
494 Err(heph_inbox::SendError::Full(msg)) => {
495 MappedSendValue::Sending(Box::pin(self.actor_ref.send(msg)))
496 }
497 Err(heph_inbox::SendError::Disconnected(_)) => MappedSendValue::SendErr,
498 },
499 ActorRefKind::Mapped(sender) => sender.mapped_send(msg),
500 },
501 Err(..) => MappedSendValue::SendErr,
502 }
503 }
504
505 fn mapped_join<'r>(&'r self) -> MappedJoin<'r> {
506 match &self.actor_ref.kind {
507 ActorRefKind::Local(sender) => match sender.is_connected() {
508 false => MappedJoin::Disconnected,
509 true => MappedJoin::Join(Box::pin(self.actor_ref.join())),
510 },
511 ActorRefKind::Mapped(sender) => sender.mapped_join(),
512 }
513 }
514
515 fn is_connected(&self) -> bool {
516 self.actor_ref.is_connected()
517 }
518
519 fn id(&self) -> inbox::Id {
520 self.actor_ref.id()
521 }
522}
523
524/// Future used in `MappedActorRef::mapped_send`
525enum MappedSendValue<'r> {
526 /// Already send.
527 Send,
528 /// Error mapping or sending the message.
529 SendErr,
530 /// Sending in progress.
531 /// NOTE: we need a `Box` to erase the mapped message type.
532 Sending(Pin<Box<dyn Future<Output = Result<(), SendError>> + 'r>>),
533}
534
535impl<'r> Future for MappedSendValue<'r> {
536 type Output = Result<(), SendError>;
537
538 #[track_caller]
539 fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
540 use MappedSendValue::*;
541 match self.get_mut() {
542 Send => Poll::Ready(Ok(())),
543 SendErr => Poll::Ready(Err(SendError)),
544 Sending(send_value) => send_value.as_mut().poll(ctx),
545 }
546 }
547}
548
549/// Future used in `MappedActorRef::mapped_join`
550enum MappedJoin<'r> {
551 Disconnected,
552 /// NOTE: we need a `Box` to erase the mapped message type.
553 Join(Pin<Box<dyn Future<Output = ()> + 'r>>),
554}
555
556impl<'r> Future for MappedJoin<'r> {
557 type Output = ();
558
559 #[track_caller]
560 fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
561 use MappedJoin::*;
562 match self.get_mut() {
563 Disconnected => Poll::Ready(()),
564 Join(join) => join.as_mut().poll(ctx),
565 }
566 }
567}
568
569/// [`Future`] behind [`ActorRef::send`].
570#[must_use = "futures do nothing unless you `.await` or poll them"]
571pub struct SendValue<'r, M> {
572 kind: SendValueKind<'r, M>,
573}
574
575enum SendValueKind<'r, M> {
576 Local(inbox::SendValue<'r, M>),
577 Mapped(MappedSendValue<'r>),
578}
579
580// We know that the `Local` variant is `Send` and `Sync`. Since the `Mapped`
581// variant is a boxed version of `Local` so is that variant. This makes the
582// entire `SendValueKind` `Send` and `Sync`, as long as `M` is `Send` (as we
583// could be sending the message across thread bounds).
584#[allow(clippy::non_send_fields_in_send_ty)]
585unsafe impl<'r, M: Send> Send for SendValueKind<'r, M> {}
586unsafe impl<'r, M: Send> Sync for SendValueKind<'r, M> {}
587
588impl<'r, M> Future for SendValue<'r, M> {
589 type Output = Result<(), SendError>;
590
591 #[track_caller]
592 fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
593 use SendValueKind::*;
594 #[cfg(any(test, feature = "test"))]
595 if crate::test::should_lose_msg() {
596 log::debug!("dropping message on purpose");
597 return Poll::Ready(Ok(()));
598 }
599
600 // Safety: we're not moving the future to this is safe.
601 let this = unsafe { self.get_unchecked_mut() };
602 match &mut this.kind {
603 // Safety: we're not moving `send_value` so this is safe.
604 Local(fut) => unsafe { Pin::new_unchecked(fut) }
605 .poll(ctx)
606 .map_err(|_| SendError),
607 // Safety: we're not moving `fut` so this is safe.
608 Mapped(fut) => unsafe { Pin::new_unchecked(fut) }.poll(ctx),
609 }
610 }
611}
612
613impl<'r, M> fmt::Debug for SendValue<'r, M> {
614 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
615 f.write_str("SendValue")
616 }
617}
618
619/// Error returned when sending a message fails.
620///
621/// The reason why the sending of the message failed is unspecified.
622#[derive(Copy, Clone, Debug, Eq, PartialEq)]
623pub struct SendError;
624
625impl fmt::Display for SendError {
626 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
627 f.write_str("unable to send message")
628 }
629}
630
631impl Error for SendError {}
632
633/// [`Future`] behind [`ActorRef::join`].
634#[must_use = "futures do nothing unless you `.await` or poll them"]
635pub struct Join<'r, M> {
636 kind: JoinKind<'r, M>,
637}
638
639enum JoinKind<'r, M> {
640 Local(inbox::Join<'r, M>),
641 Mapped(MappedJoin<'r>),
642}
643
644// We know that the `Local` variant is `Send` and `Sync`. Since the `Mapped`
645// variant is a boxed version of `Local` so is that variant. This makes the
646// entire `JoinKind` `Send` and `Sync`, as long as `M` is `Send` (as we could be
647// sending the message across thread bounds).
648#[allow(clippy::non_send_fields_in_send_ty)]
649unsafe impl<'r, M: Send> Send for JoinKind<'r, M> {}
650unsafe impl<'r, M: Send> Sync for JoinKind<'r, M> {}
651
652impl<'r, M> Future for Join<'r, M> {
653 type Output = ();
654
655 #[track_caller]
656 fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
657 use JoinKind::*;
658
659 // Safety: we're not moving the future to this is safe.
660 let this = unsafe { self.get_unchecked_mut() };
661 match &mut this.kind {
662 // Safety: we're not moving `fut` so this is safe.
663 Local(fut) => unsafe { Pin::new_unchecked(fut) }.poll(ctx),
664 // Safety: we're not moving `fut` so this is safe.
665 Mapped(fut) => unsafe { Pin::new_unchecked(fut) }.poll(ctx),
666 }
667 }
668}
669
670impl<'r, M> fmt::Debug for Join<'r, M> {
671 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
672 f.write_str("Join")
673 }
674}
675
676/// A group of [`ActorRef`]s used to send a message to multiple actors.
677///
678/// # Notes
679///
680/// Unlike [`ActorRef`] this is **not** cheap to clone as it's requires a clone
681/// of an internal vector.
682pub struct ActorGroup<M> {
683 actor_refs: Vec<ActorRef<M>>,
684 /// Index of the actor reference to send the [single delivery] message to.
685 /// Using relaxed ordering on this field is fine because we make no
686 /// guarantees about to which actor the message will be delivered. E.g. we
687 /// could always send to the first actor in the group and still fulfill the
688 /// contract.
689 ///
690 /// [single delivery]: Delivery::ToOne
691 send_next: AtomicUsize,
692}
693
694/// The kind of delivery to use in [`ActorGroup::try_send`].
695#[derive(Copy, Clone, Debug)]
696pub enum Delivery {
697 /// Delivery a copy of the message to all actors in the group.
698 ToAll,
699 /// Delivery the message to one of the actors.
700 ToOne,
701}
702
703impl<M> ActorGroup<M> {
704 /// Creates an empty `ActorGroup`.
705 pub const fn empty() -> ActorGroup<M> {
706 ActorGroup {
707 actor_refs: Vec::new(),
708 send_next: AtomicUsize::new(0),
709 }
710 }
711
712 /// Create a new `ActorGroup`.
713 pub fn new<I>(actor_refs: I) -> ActorGroup<M>
714 where
715 I: IntoIterator<Item = ActorRef<M>>,
716 {
717 ActorGroup {
718 actor_refs: actor_refs.into_iter().collect(),
719 send_next: AtomicUsize::new(0),
720 }
721 }
722
723 /// Returns the number of actor references in the group.
724 pub fn len(&self) -> usize {
725 self.actor_refs.len()
726 }
727
728 /// Returns `true` if the group is empty.
729 pub fn is_empty(&self) -> bool {
730 self.actor_refs.is_empty()
731 }
732
733 /// Add an `ActorRef` to the group.
734 pub fn add(&mut self, actor_ref: ActorRef<M>) {
735 self.actor_refs.push(actor_ref)
736 }
737
738 /// Add an `ActorRef` to the group, iff it's not already in the group.
739 pub fn add_unique(&mut self, actor_ref: ActorRef<M>) {
740 let id = actor_ref.id();
741 for actor_ref in &self.actor_refs {
742 if actor_ref.id() == id {
743 return;
744 }
745 }
746 self.actor_refs.push(actor_ref)
747 }
748
749 /// Remove all actor references which point to the same actor as
750 /// `actor_ref`.
751 pub fn remove(&mut self, actor_ref: &ActorRef<M>) {
752 let id = actor_ref.id();
753 self.actor_refs.retain(|a| a.id() != id);
754 }
755
756 /// Remove all actor references that have been disconnected.
757 pub fn remove_disconnected(&mut self) {
758 self.actor_refs.retain(ActorRef::is_connected);
759 }
760
761 /// Make the group of actor references unique.
762 ///
763 /// Removes all duplicate actor references.
764 pub fn make_unique(&mut self) {
765 let mut i = 0;
766 while let Some(id) = self.actor_refs.get(i).map(ActorRef::id) {
767 let mut j = i + 1;
768 while let Some(other_id) = self.actor_refs.get(j).map(ActorRef::id) {
769 if id == other_id {
770 // NOTE: don't update `j` as it's replaced with another
771 // actor reference we haven't checked yet.
772 drop(self.actor_refs.swap_remove(j));
773 } else {
774 // Move the next actor reference.
775 j += 1;
776 }
777 }
778 i += 1;
779 }
780 }
781
782 /// Attempts to send a message to all the actors in the group.
783 ///
784 /// This can either send the message to a single actor, by using
785 /// [`Delivery::ToOne`], or to all actors in the group by using
786 /// [`Delivery::ToAll`].
787 ///
788 /// When deliverying to all actors this will first `clone` the message and
789 /// then [`try_send`]ing it to each actor in the group. Note that this means
790 /// it will `clone` before calling [`Into::into`] on the message. If the
791 /// call to [`Into::into`] is expansive, or `M` is cheaper to clone than
792 /// `Msg` it might be worthwhile to call `msg.into()` before calling this
793 /// method.
794 ///
795 /// This only returns an error if the group is empty, otherwise this will
796 /// always return `Ok(())`.
797 ///
798 /// See [Sending messages] for more details.
799 ///
800 /// [`try_send`]: ActorRef::try_send
801 /// [Sending messages]: index.html#sending-messages
802 pub fn try_send<Msg>(&self, msg: Msg, delivery: Delivery) -> Result<(), SendError>
803 where
804 Msg: Into<M> + Clone,
805 {
806 if self.actor_refs.is_empty() {
807 return Err(SendError);
808 }
809
810 match delivery {
811 Delivery::ToAll => {
812 for actor_ref in &self.actor_refs {
813 let _ = actor_ref.try_send(msg.clone());
814 }
815 Ok(())
816 }
817 Delivery::ToOne => {
818 // Safety: this needs to sync itself.
819 // NOTE: this wraps around on overflow.
820 let idx = self.send_next.fetch_add(1, Ordering::AcqRel) % self.actor_refs.len();
821 let actor_ref = &self.actor_refs[idx];
822 // TODO: try to send it to another actor on send failure?
823 actor_ref.try_send(msg)
824 }
825 }
826 }
827
828 /// Wait for all actors in this group to finish running.
829 ///
830 /// This works the same way as [`ActorRef::join`], but waits on a group of
831 /// actors.
832 pub fn join_all<'r>(&'r self) -> JoinAll<'r, M> {
833 JoinAll {
834 actor_refs: &self.actor_refs,
835 join: None,
836 }
837 }
838}
839
840impl<M> From<ActorRef<M>> for ActorGroup<M> {
841 fn from(actor_ref: ActorRef<M>) -> ActorGroup<M> {
842 ActorGroup {
843 actor_refs: vec![actor_ref],
844 send_next: AtomicUsize::new(0),
845 }
846 }
847}
848
849impl<M> FromIterator<ActorRef<M>> for ActorGroup<M> {
850 fn from_iter<I>(iter: I) -> Self
851 where
852 I: IntoIterator<Item = ActorRef<M>>,
853 {
854 ActorGroup::new(iter)
855 }
856}
857
858impl<M> Extend<ActorRef<M>> for ActorGroup<M> {
859 fn extend<I>(&mut self, iter: I)
860 where
861 I: IntoIterator<Item = ActorRef<M>>,
862 {
863 self.actor_refs.extend(iter);
864 }
865}
866
867impl<M> Clone for ActorGroup<M> {
868 fn clone(&self) -> ActorGroup<M> {
869 ActorGroup {
870 actor_refs: self.actor_refs.clone(),
871 send_next: AtomicUsize::new(0),
872 }
873 }
874
875 fn clone_from(&mut self, source: &Self) {
876 self.actor_refs.clone_from(&source.actor_refs);
877 *self.send_next.get_mut() = 0
878 }
879}
880
881impl<M> fmt::Debug for ActorGroup<M> {
882 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
883 self.actor_refs.fmt(f)
884 }
885}
886
887/// [`Future`] behind [`ActorGroup::join_all`].
888#[must_use = "futures do nothing unless you `.await` or poll them"]
889pub struct JoinAll<'r, M> {
890 actor_refs: &'r [ActorRef<M>],
891 join: Option<Join<'r, M>>,
892}
893
894impl<'r, M> Future for JoinAll<'r, M> {
895 type Output = ();
896
897 fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
898 loop {
899 // Check the `Join` we're currently waiting on, if any.
900 if let Some(join) = self.join.as_mut() {
901 match Pin::new(join).poll(ctx) {
902 Poll::Ready(()) => {
903 drop(self.join.take());
904 // Continue below.
905 }
906 Poll::Pending => return Poll::Pending,
907 }
908 }
909
910 // Remove all already disconnected actor references.
911 let mut remove = 0;
912 for actor_ref in self.actor_refs {
913 if actor_ref.is_connected() {
914 break;
915 }
916 remove += 1;
917 }
918 self.actor_refs = &self.actor_refs[remove..];
919
920 if self.actor_refs.is_empty() {
921 // No more actors we have to wait for, so we're done.
922 return Poll::Ready(());
923 }
924
925 // Wait on the next actor.
926 self.join = Some(self.actor_refs[0].join());
927 }
928 }
929}
930
931impl<'r, M> fmt::Debug for JoinAll<'r, M> {
932 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
933 f.debug_struct("JoinAll")
934 .field("left", &self.actor_refs.len())
935 .finish()
936 }
937}