Skip to main content

acktor/
address.rs

//! Traits and type definitions for actor addresses.
//!
//! In the actor model, an [`Address`] is a handle to an actor. It is the only way to interact
//! with an actor, since the runtime takes ownership of the actor itself once it is spawned.
//!
//! This module defines the [`Address`] type for an actor. It also provides the [`Sender`] trait
//! and the [`Recipient`] type, which are alternative ways to organize the addresses of actors.
//!
10
11use std::fmt::{self, Debug};
12use std::hash::{Hash, Hasher};
13use std::sync::{
14    Arc,
15    atomic::{AtomicU64, Ordering},
16};
17
18use futures_util::FutureExt;
19use tokio::time::Duration;
20
21use crate::actor::{Actor, ActorId};
22use crate::channel::mpsc;
23use crate::envelope::{Envelope, FromEnvelope, IntoEnvelope};
24use crate::error::SendError;
25use crate::message::Message;
26use crate::utils::ShortName;
27#[cfg(feature = "ipc")]
28use crate::{actor::RemoteAddressable, message::BinaryMessage};
29
30mod local;
31use local::LocalAddress;
32
33#[cfg(feature = "ipc")]
34mod remote;
35#[cfg(feature = "ipc")]
36use remote::RemoteAddress;
37#[cfg(feature = "ipc")]
38#[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
39pub use remote::RemoteProxy;
40
41mod permit;
42pub use permit::{OwnedSendPermit, SendPermit};
43
44mod sender;
45pub use sender::{
46    DoSendResult, DoSendResultFuture, EmptyFuture, SendResult, SendResultFuture, Sender, SenderInfo,
47};
48
49mod recipient;
50pub use recipient::Recipient;
51
52mod mailbox;
53pub use mailbox::Mailbox;
54
/// Global counter backing [`next_actor_id`]; its initial value is zero.
static INDEX_GENERATOR: AtomicU64 = AtomicU64::new(0);

/// Hands out the next value of the global index counter.
///
/// Every call atomically adds one to [`INDEX_GENERATOR`] and returns the value the counter held
/// *before* the increment.
#[inline]
pub(crate) fn next_actor_id() -> u64 {
    // `Relaxed` is enough here: the atomic read-modify-write alone guarantees that no two callers
    // observe the same value.
    AtomicU64::fetch_add(&INDEX_GENERATOR, 1, Ordering::Relaxed)
}
61
/// A remote mailbox which the runtime can use to deliver binary messages to a remote
/// addressable actor.
///
/// It is an alias for the address of the actor, expressed as a `Recipient<BinaryMessage>`.
///
/// This alias is only compiled when the `ipc` feature is enabled.
#[cfg(feature = "ipc")]
#[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
pub type RemoteMailbox = Recipient<BinaryMessage>;
69
70enum Inner<A>
71where
72    A: Actor,
73{
74    Local(LocalAddress<A>),
75    #[cfg(feature = "ipc")]
76    Remote(RemoteAddress),
77}
78
79impl<A> Clone for Inner<A>
80where
81    A: Actor,
82{
83    fn clone(&self) -> Self {
84        match self {
85            Self::Local(address) => Self::Local(address.clone()),
86            #[cfg(feature = "ipc")]
87            Self::Remote(address) => Self::Remote(address.clone()),
88        }
89    }
90}
91
92/// The address of an actor.
93///
94/// It is used to send messages to an actor.
95#[repr(transparent)]
96pub struct Address<A>(Inner<A>)
97where
98    A: Actor;
99
100impl<A> Debug for Address<A>
101where
102    A: Actor,
103{
104    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105        f.debug_tuple(&ShortName::of::<Self>().to_string())
106            .field(&self.index())
107            .finish()
108    }
109}
110
111impl<A> Clone for Address<A>
112where
113    A: Actor,
114{
115    fn clone(&self) -> Self {
116        Self(self.0.clone())
117    }
118}
119
120impl<A> PartialEq for Address<A>
121where
122    A: Actor,
123{
124    fn eq(&self, other: &Self) -> bool {
125        self.index().eq(&other.index())
126    }
127}
128
129impl<A> Eq for Address<A> where A: Actor {}
130
131impl<A> Hash for Address<A>
132where
133    A: Actor,
134{
135    fn hash<H>(&self, state: &mut H)
136    where
137        H: Hasher,
138    {
139        self.index().hash(state);
140    }
141}
142
143impl<A> Address<A>
144where
145    A: Actor,
146{
147    /// Constructs a new local [`Address`] from a [`mpsc::Sender`].
148    pub fn new(tx: mpsc::Sender<Envelope<A>>) -> Self {
149        Self(Inner::Local(LocalAddress::new(tx)))
150    }
151
152    /// Constructs a new remote [`Address`] from a [`RemoteProxy`].
153    ///
154    /// The `index` parameter is used to identify the actor in another process, usually it is the
155    /// index part of its [`ActorId`] in its own process.
156    #[cfg(feature = "ipc")]
157    #[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
158    pub fn new_remote(index: u64, proxy: Arc<dyn RemoteProxy + Send + Sync>) -> Self
159    where
160        A: RemoteAddressable,
161    {
162        Self(Inner::Remote(RemoteAddress::new(
163            index,
164            A::codec_table(),
165            proxy,
166        )))
167    }
168
169    /// Returns the index of the address.
170    pub const fn index(&self) -> ActorId {
171        match &self.0 {
172            Inner::Local(address) => address.index(),
173            #[cfg(feature = "ipc")]
174            Inner::Remote(address) => address.index(),
175        }
176    }
177
178    /// Completes when the mailbox of the actor has been closed.
179    pub async fn closed(&self) {
180        match &self.0 {
181            Inner::Local(address) => address.closed().await,
182            #[cfg(feature = "ipc")]
183            Inner::Remote(address) => address.closed().await,
184        }
185    }
186
187    /// Checks if the mailbox of the actor is closed.
188    pub fn is_closed(&self) -> bool {
189        match &self.0 {
190            Inner::Local(address) => address.is_closed(),
191            #[cfg(feature = "ipc")]
192            Inner::Remote(address) => address.is_closed(),
193        }
194    }
195
196    /// Returns the current capacity of the mailbox of the actor.
197    pub fn capacity(&self) -> usize {
198        match &self.0 {
199            Inner::Local(address) => address.capacity(),
200            #[cfg(feature = "ipc")]
201            Inner::Remote(address) => address.capacity(),
202        }
203    }
204
205    /// Sends a message, waiting until there is capacity, and returns a
206    /// [`Receiver`][crate::channel::oneshot::Receiver] which can be used to receive the message
207    /// response.
208    pub async fn send<M, EP>(&self, msg: M) -> SendResult<M>
209    where
210        M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
211    {
212        match &self.0 {
213            Inner::Local(address) => address.send(msg).await,
214            #[cfg(feature = "ipc")]
215            Inner::Remote(address) => address.send(msg).await,
216        }
217    }
218
219    /// Sends a message, waiting until there is capacity, without expecting a response.
220    pub async fn do_send<M, EP>(&self, msg: M) -> DoSendResult<M>
221    where
222        M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
223    {
224        match &self.0 {
225            Inner::Local(address) => address.do_send(msg).await,
226            #[cfg(feature = "ipc")]
227            Inner::Remote(address) => address.do_send(msg).await,
228        }
229    }
230
231    /// Attempts to immediately send a message and returns a
232    /// [`Receiver`][crate::channel::oneshot::Receiver] which can be used to receive the message
233    /// response.
234    pub fn try_send<M, EP>(&self, msg: M) -> SendResult<M>
235    where
236        M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
237    {
238        match &self.0 {
239            Inner::Local(address) => address.try_send(msg),
240            #[cfg(feature = "ipc")]
241            Inner::Remote(address) => address.try_send(msg),
242        }
243    }
244
245    /// Attempts to immediately send a message without expecting a response.
246    pub fn try_do_send<M, EP>(&self, msg: M) -> DoSendResult<M>
247    where
248        M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
249    {
250        match &self.0 {
251            Inner::Local(address) => address.try_do_send(msg),
252            #[cfg(feature = "ipc")]
253            Inner::Remote(address) => address.try_do_send(msg),
254        }
255    }
256
257    /// Sends a message, waiting until there is capacity, but only for a limited time, and returns
258    /// a [`Receiver`][crate::channel::oneshot::Receiver] which can be used to receive the message
259    /// response.
260    pub async fn send_timeout<M, EP>(&self, msg: M, timeout: Duration) -> SendResult<M>
261    where
262        M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
263    {
264        match &self.0 {
265            Inner::Local(address) => address.send_timeout(msg, timeout).await,
266            #[cfg(feature = "ipc")]
267            Inner::Remote(address) => address.send_timeout(msg, timeout).await,
268        }
269    }
270
271    /// Sends a message, waiting until there is capacity, but only for a limited time, without
272    /// expecting a response.
273    pub async fn do_send_timeout<M, EP>(&self, msg: M, timeout: Duration) -> DoSendResult<M>
274    where
275        M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
276    {
277        match &self.0 {
278            Inner::Local(address) => address.do_send_timeout(msg, timeout).await,
279            #[cfg(feature = "ipc")]
280            Inner::Remote(address) => address.do_send_timeout(msg, timeout).await,
281        }
282    }
283
284    /// Blocking send to call outside of asynchronous contexts.
285    ///
286    /// This method is intended for use cases where you are sending from synchronous code to
287    /// asynchronous code.
288    ///
289    /// # Panics
290    ///
291    /// This function panics if called within an asynchronous execution context.
292    pub fn blocking_send<M, EP>(&self, msg: M) -> SendResult<M>
293    where
294        M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
295    {
296        match &self.0 {
297            Inner::Local(address) => address.blocking_send(msg),
298            #[cfg(feature = "ipc")]
299            Inner::Remote(address) => address.blocking_send(msg),
300        }
301    }
302
303    /// Blocking do_send to call outside of asynchronous contexts.
304    ///
305    /// This method is intended for use cases where you are sending from synchronous code to
306    /// asynchronous code.
307    ///
308    /// # Panics
309    ///
310    /// This function panics if called within an asynchronous execution context.
311    pub fn blocking_do_send<M, EP>(&self, msg: M) -> DoSendResult<M>
312    where
313        M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
314    {
315        match &self.0 {
316            Inner::Local(address) => address.blocking_do_send(msg),
317            #[cfg(feature = "ipc")]
318            Inner::Remote(address) => address.blocking_do_send(msg),
319        }
320    }
321
322    /// Reserves channel capacity to send one message.
323    ///
324    /// This method borrows the internal [`mpsc::Sender`] and returns a [`SendPermit`].
325    pub async fn reserve(&self) -> Result<SendPermit<'_, A>, SendError<()>> {
326        match &self.0 {
327            Inner::Local(address) => address.reserve().await,
328            #[cfg(feature = "ipc")]
329            Inner::Remote(_) => Err(SendError::other(
330                "remote address does not support reserve",
331                (),
332            )),
333        }
334    }
335
336    /// Attempts to reserve channel capacity to send one message.
337    ///
338    /// This method borrows the internal [`mpsc::Sender`] and returns a [`SendPermit`].
339    pub fn try_reserve(&self) -> Result<SendPermit<'_, A>, SendError<()>> {
340        match &self.0 {
341            Inner::Local(address) => address.try_reserve(),
342            #[cfg(feature = "ipc")]
343            Inner::Remote(_) => Err(SendError::other(
344                "remote address does not support reserve",
345                (),
346            )),
347        }
348    }
349
350    /// Reserves channel capacity to send one message.
351    ///
352    /// This method clones the internal [`mpsc::Sender`] and returns a [`OwnedSendPermit`].
353    pub async fn reserve_owned(&self) -> Result<OwnedSendPermit<A>, SendError<()>> {
354        match &self.0 {
355            Inner::Local(address) => address.reserve_owned().await,
356            #[cfg(feature = "ipc")]
357            Inner::Remote(_) => Err(SendError::other(
358                "remote address does not support reserve",
359                (),
360            )),
361        }
362    }
363
364    /// Attempts to reserve channel capacity to send one message.
365    ///
366    /// This method clones the internal [`mpsc::Sender`] and returns a [`OwnedSendPermit`].
367    pub fn try_reserve_owned(&self) -> Result<OwnedSendPermit<A>, SendError<()>> {
368        match &self.0 {
369            Inner::Local(address) => address.try_reserve_owned(),
370            #[cfg(feature = "ipc")]
371            Inner::Remote(_) => Err(SendError::other(
372                "remote address does not support reserve",
373                (),
374            )),
375        }
376    }
377
378    /// Returns a [`RemoteMailbox`], if the actor is a remote addressable actor, and this address
379    /// is a local address.
380    #[cfg(feature = "ipc")]
381    #[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
382    fn remote_addressable(&self) -> Option<RemoteMailbox> {
383        match &self.0 {
384            Inner::Local(_) => A::remote_mailbox(self.clone()),
385            #[cfg(feature = "ipc")]
386            Inner::Remote(_) => None,
387        }
388    }
389}
390
391impl<A> SenderInfo for Address<A>
392where
393    A: Actor,
394{
395    fn index(&self) -> ActorId {
396        self.index()
397    }
398
399    fn closed(&self) -> EmptyFuture<'_> {
400        self.closed().boxed()
401    }
402
403    fn is_closed(&self) -> bool {
404        self.is_closed()
405    }
406
407    fn capacity(&self) -> usize {
408        self.capacity()
409    }
410
411    #[cfg(feature = "ipc")]
412    fn remote_mailbox(&self) -> Option<RemoteMailbox> {
413        self.remote_addressable()
414    }
415}
416
417impl<A, M, EP> Sender<M, EP> for Address<A>
418where
419    A: Actor,
420    M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
421    EP: 'static,
422{
423    fn send(&self, msg: M) -> SendResultFuture<'_, M> {
424        self.send(msg).boxed()
425    }
426
427    fn do_send(&self, msg: M) -> DoSendResultFuture<'_, M> {
428        self.do_send(msg).boxed()
429    }
430
431    fn try_send(&self, msg: M) -> SendResult<M> {
432        self.try_send(msg)
433    }
434
435    fn try_do_send(&self, msg: M) -> DoSendResult<M> {
436        self.try_do_send(msg)
437    }
438
439    fn send_timeout(&self, msg: M, timeout: Duration) -> SendResultFuture<'_, M> {
440        self.send_timeout(msg, timeout).boxed()
441    }
442
443    fn do_send_timeout(&self, msg: M, timeout: Duration) -> DoSendResultFuture<'_, M> {
444        self.do_send_timeout(msg, timeout).boxed()
445    }
446
447    fn blocking_send(&self, msg: M) -> SendResult<M> {
448        self.blocking_send(msg)
449    }
450
451    fn blocking_do_send(&self, msg: M) -> DoSendResult<M> {
452        self.blocking_do_send(msg)
453    }
454}
455
456impl<A> From<LocalAddress<A>> for Address<A>
457where
458    A: Actor,
459{
460    fn from(addr: LocalAddress<A>) -> Self {
461        Self(Inner::Local(addr))
462    }
463}
464
#[cfg(feature = "ipc")]
impl<A: Actor> From<RemoteAddress> for Address<A> {
    /// Wraps a `RemoteAddress` into an [`Address`].
    fn from(addr: RemoteAddress) -> Self {
        let inner = Inner::Remote(addr);
        Self(inner)
    }
}
474
475impl<A, M, EP> From<Address<A>> for Recipient<M, EP>
476where
477    A: Actor,
478    M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
479    EP: 'static,
480{
481    fn from(addr: Address<A>) -> Self {
482        Self::new(Arc::new(addr))
483    }
484}
485
// Unit tests for `Address`, covering local addresses, remote addresses (only with the `ipc`
// feature) and the `Debug` output.
#[cfg(test)]
mod tests {
    use anyhow::{Context as _, Result};
    use pretty_assertions::{assert_eq, assert_ne};
    use tokio::time::{self, Duration};

    // NOTE(review): presumably imported so that `is_remote` is callable on `Address` in the
    // `ipc` assertions below; confirm against the `SenderInfo` trait definition.
    #[cfg(feature = "ipc")]
    use super::SenderInfo;
    use crate::test_utils::{Ping, hash_of, make_address};

    /// Exercises a local address: index uniqueness, clone/eq/hash agreement, mailbox state
    /// queries, and every send variant.
    #[tokio::test]
    async fn test_address() -> Result<()> {
        // index is unique
        let (a1, _) = make_address(4);
        let (a2, _) = make_address(4);
        assert_ne!(a1, a2);
        assert_ne!(a1.index(), a2.index());
        #[cfg(feature = "ipc")]
        assert!(!a1.is_remote());

        // clone + eq + hash
        let (a1, m1) = make_address(4);
        let clone = a1.clone();
        assert_eq!(a1, clone);
        assert_eq!(a1.index(), clone.index());
        assert_eq!(hash_of(&a1), hash_of(&clone));

        // capacity + is_closed + closed
        assert_eq!(a1.capacity(), 4);
        assert!(!a1.is_closed());
        // Dropping the mailbox is what flips the address to the closed state.
        drop(m1);
        assert!(a1.is_closed());
        time::timeout(Duration::from_millis(500), a1.closed())
            .await
            .context("closed() should resolve after mailbox drop")?;

        // send functions
        // Eight sends go into a mailbox created with capacity 8; nothing is received from `m1`
        // in between, so all eight messages are expected to be queued at the end.
        let (a1, m1) = make_address(8);
        a1.send(Ping(1)).await?;
        a1.do_send(Ping(2)).await?;
        a1.try_send(Ping(3))?;
        a1.try_do_send(Ping(4))?;
        a1.send_timeout(Ping(5), Duration::from_millis(100)).await?;
        a1.do_send_timeout(Ping(6), Duration::from_millis(100))
            .await?;
        // The blocking variants are documented to panic inside an asynchronous context, so they
        // run on a blocking task.
        tokio::task::spawn_blocking(move || -> Result<()> {
            a1.blocking_send(Ping(7))?;
            a1.blocking_do_send(Ping(8))?;
            Ok(())
        })
        .await??;
        assert_eq!(m1.len(), 8);

        Ok(())
    }

    /// Exercises a remote address built on a `DummyProxy`: index, clone/eq/hash agreement,
    /// mailbox state queries, every send variant, and the unsupported `reserve*` methods.
    #[cfg(feature = "ipc")]
    #[tokio::test]
    async fn test_remote_address() -> Result<()> {
        use std::num::NonZeroU64;

        use crate::actor::ActorId;
        use crate::error::SendError;
        use crate::test_utils::{Dummy, DummyProxy};

        let proxy = DummyProxy::new();
        let address = super::Address::<Dummy>::new_remote(7, proxy.clone());

        // index reflects the remote (local_index, proxy_index) pair
        // NOTE(review): the expected proxy index 42 presumably comes from `DummyProxy`; confirm.
        assert_eq!(
            address.index(),
            ActorId::new_remote(7, NonZeroU64::new(42).unwrap())
        );
        assert!(address.is_remote());

        // clone + eq + hash
        let clone = address.clone();
        assert_eq!(address, clone);
        assert_eq!(address.index(), clone.index());
        assert_eq!(hash_of(&address), hash_of(&clone));

        // capacity + is_closed + closed
        assert_eq!(address.capacity(), usize::MAX);
        assert!(!address.is_closed());
        // NOTE(review): `closed()` is awaited right after `is_closed()` returned false, so the
        // test relies on `DummyProxy` resolving `closed()` within the timeout; confirm.
        time::timeout(Duration::from_millis(500), address.closed()).await?;

        // send functions
        address.send(Ping(1)).await?.await?;
        address.do_send(Ping(2)).await?;
        address.try_send(Ping(3))?.await?;
        address.try_do_send(Ping(4))?;
        address
            .send_timeout(Ping(5), Duration::from_millis(100))
            .await?
            .await?;
        address
            .do_send_timeout(Ping(6), Duration::from_millis(100))
            .await?;
        // The address is moved into the blocking task and handed back so the checks below can
        // keep using it.
        let address = tokio::task::spawn_blocking(move || -> Result<_> {
            address.blocking_send(Ping(7))?.blocking_recv()?;
            address.blocking_do_send(Ping(8))?;
            Ok(address)
        })
        .await??;

        // reserve* are not supported on remote addresses.
        assert!(matches!(address.reserve().await, Err(SendError::Other(..))));
        assert!(matches!(address.try_reserve(), Err(SendError::Other(..))));
        assert!(matches!(
            address.reserve_owned().await,
            Err(SendError::Other(..))
        ));
        assert!(matches!(
            address.try_reserve_owned(),
            Err(SendError::Other(..))
        ));

        // remote addresses do not expose a `RemoteMailbox`.
        assert!(address.remote_addressable().is_none());

        Ok(())
    }

    /// Checks that the `Debug` output is the short type name followed by the index in
    /// parentheses.
    #[test]
    fn test_debug_fmt() {
        let (address, _) = make_address(4);
        assert_eq!(
            format!("{:?}", address),
            format!("Address<Dummy>({})", address.index())
        );
    }
}