1use std::fmt::{self, Debug};
12use std::hash::{Hash, Hasher};
13use std::sync::{
14 Arc,
15 atomic::{AtomicU64, Ordering},
16};
17
18use futures_util::FutureExt;
19use tokio::time::Duration;
20
21use crate::actor::{Actor, ActorId};
22use crate::channel::mpsc;
23use crate::envelope::{Envelope, FromEnvelope, IntoEnvelope};
24use crate::error::SendError;
25use crate::message::Message;
26use crate::utils::ShortName;
27#[cfg(feature = "ipc")]
28use crate::{actor::RemoteAddressable, message::BinaryMessage};
29
30mod local;
31use local::LocalAddress;
32
33#[cfg(feature = "ipc")]
34mod remote;
35#[cfg(feature = "ipc")]
36use remote::RemoteAddress;
37#[cfg(feature = "ipc")]
38#[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
39pub use remote::RemoteProxy;
40
41mod permit;
42pub use permit::{OwnedSendPermit, SendPermit};
43
44mod sender;
45pub use sender::{
46 DoSendResult, DoSendResultFuture, EmptyFuture, SendResult, SendResultFuture, Sender, SenderInfo,
47};
48
49mod recipient;
50pub use recipient::Recipient;
51
52mod mailbox;
53pub use mailbox::Mailbox;
54
/// Process-wide counter from which [`next_actor_id`] draws its values.
static INDEX_GENERATOR: AtomicU64 = AtomicU64::new(0);

/// Returns the current value of the process-wide counter and advances it by one.
///
/// The counter starts at `0`, so the first call returns `0`, the next `1`, and so on.
#[inline]
pub(crate) fn next_actor_id() -> u64 {
    /// Amount the counter advances per call.
    const STEP: u64 = 1;

    // `fetch_add` yields the value held *before* the increment. The
    // read-modify-write is atomic even with `Relaxed`, so concurrent callers
    // still observe distinct values.
    INDEX_GENERATOR.fetch_add(STEP, Ordering::Relaxed)
}
61
#[cfg(feature = "ipc")]
/// A [`Recipient`] of [`BinaryMessage`]s.
///
/// This alias only exists when the `ipc` feature is enabled.
#[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
pub type RemoteMailbox = Recipient<BinaryMessage>;
69
/// Private storage behind [`Address`]: the concrete kind of address being wrapped.
enum Inner<A>
where
    A: Actor,
{
    /// Address backed by a [`LocalAddress`].
    Local(LocalAddress<A>),
    /// Address backed by a [`RemoteAddress`]; only compiled with the `ipc` feature.
    #[cfg(feature = "ipc")]
    Remote(RemoteAddress),
}
78
79impl<A> Clone for Inner<A>
80where
81 A: Actor,
82{
83 fn clone(&self) -> Self {
84 match self {
85 Self::Local(address) => Self::Local(address.clone()),
86 #[cfg(feature = "ipc")]
87 Self::Remote(address) => Self::Remote(address.clone()),
88 }
89 }
90}
91
#[repr(transparent)]
/// Handle used to send messages to an actor of type `A`.
///
/// Wraps either a local address or, when the `ipc` feature is enabled, a remote
/// address. Equality, hashing and the `Debug` output are all derived from
/// [`Address::index`].
pub struct Address<A>(Inner<A>)
where
    A: Actor;
99
100impl<A> Debug for Address<A>
101where
102 A: Actor,
103{
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 f.debug_tuple(&ShortName::of::<Self>().to_string())
106 .field(&self.index())
107 .finish()
108 }
109}
110
111impl<A> Clone for Address<A>
112where
113 A: Actor,
114{
115 fn clone(&self) -> Self {
116 Self(self.0.clone())
117 }
118}
119
120impl<A> PartialEq for Address<A>
121where
122 A: Actor,
123{
124 fn eq(&self, other: &Self) -> bool {
125 self.index().eq(&other.index())
126 }
127}
128
129impl<A> Eq for Address<A> where A: Actor {}
130
131impl<A> Hash for Address<A>
132where
133 A: Actor,
134{
135 fn hash<H>(&self, state: &mut H)
136 where
137 H: Hasher,
138 {
139 self.index().hash(state);
140 }
141}
142
143impl<A> Address<A>
144where
145 A: Actor,
146{
147 pub fn new(tx: mpsc::Sender<Envelope<A>>) -> Self {
149 Self(Inner::Local(LocalAddress::new(tx)))
150 }
151
152 #[cfg(feature = "ipc")]
157 #[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
158 pub fn new_remote(index: u64, proxy: Arc<dyn RemoteProxy + Send + Sync>) -> Self
159 where
160 A: RemoteAddressable,
161 {
162 Self(Inner::Remote(RemoteAddress::new(
163 index,
164 A::codec_table(),
165 proxy,
166 )))
167 }
168
169 pub const fn index(&self) -> ActorId {
171 match &self.0 {
172 Inner::Local(address) => address.index(),
173 #[cfg(feature = "ipc")]
174 Inner::Remote(address) => address.index(),
175 }
176 }
177
178 pub async fn closed(&self) {
180 match &self.0 {
181 Inner::Local(address) => address.closed().await,
182 #[cfg(feature = "ipc")]
183 Inner::Remote(address) => address.closed().await,
184 }
185 }
186
187 pub fn is_closed(&self) -> bool {
189 match &self.0 {
190 Inner::Local(address) => address.is_closed(),
191 #[cfg(feature = "ipc")]
192 Inner::Remote(address) => address.is_closed(),
193 }
194 }
195
196 pub fn capacity(&self) -> usize {
198 match &self.0 {
199 Inner::Local(address) => address.capacity(),
200 #[cfg(feature = "ipc")]
201 Inner::Remote(address) => address.capacity(),
202 }
203 }
204
205 pub async fn send<M, EP>(&self, msg: M) -> SendResult<M>
209 where
210 M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
211 {
212 match &self.0 {
213 Inner::Local(address) => address.send(msg).await,
214 #[cfg(feature = "ipc")]
215 Inner::Remote(address) => address.send(msg).await,
216 }
217 }
218
219 pub async fn do_send<M, EP>(&self, msg: M) -> DoSendResult<M>
221 where
222 M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
223 {
224 match &self.0 {
225 Inner::Local(address) => address.do_send(msg).await,
226 #[cfg(feature = "ipc")]
227 Inner::Remote(address) => address.do_send(msg).await,
228 }
229 }
230
231 pub fn try_send<M, EP>(&self, msg: M) -> SendResult<M>
235 where
236 M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
237 {
238 match &self.0 {
239 Inner::Local(address) => address.try_send(msg),
240 #[cfg(feature = "ipc")]
241 Inner::Remote(address) => address.try_send(msg),
242 }
243 }
244
245 pub fn try_do_send<M, EP>(&self, msg: M) -> DoSendResult<M>
247 where
248 M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
249 {
250 match &self.0 {
251 Inner::Local(address) => address.try_do_send(msg),
252 #[cfg(feature = "ipc")]
253 Inner::Remote(address) => address.try_do_send(msg),
254 }
255 }
256
257 pub async fn send_timeout<M, EP>(&self, msg: M, timeout: Duration) -> SendResult<M>
261 where
262 M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
263 {
264 match &self.0 {
265 Inner::Local(address) => address.send_timeout(msg, timeout).await,
266 #[cfg(feature = "ipc")]
267 Inner::Remote(address) => address.send_timeout(msg, timeout).await,
268 }
269 }
270
271 pub async fn do_send_timeout<M, EP>(&self, msg: M, timeout: Duration) -> DoSendResult<M>
274 where
275 M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
276 {
277 match &self.0 {
278 Inner::Local(address) => address.do_send_timeout(msg, timeout).await,
279 #[cfg(feature = "ipc")]
280 Inner::Remote(address) => address.do_send_timeout(msg, timeout).await,
281 }
282 }
283
284 pub fn blocking_send<M, EP>(&self, msg: M) -> SendResult<M>
293 where
294 M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
295 {
296 match &self.0 {
297 Inner::Local(address) => address.blocking_send(msg),
298 #[cfg(feature = "ipc")]
299 Inner::Remote(address) => address.blocking_send(msg),
300 }
301 }
302
303 pub fn blocking_do_send<M, EP>(&self, msg: M) -> DoSendResult<M>
312 where
313 M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
314 {
315 match &self.0 {
316 Inner::Local(address) => address.blocking_do_send(msg),
317 #[cfg(feature = "ipc")]
318 Inner::Remote(address) => address.blocking_do_send(msg),
319 }
320 }
321
322 pub async fn reserve(&self) -> Result<SendPermit<'_, A>, SendError<()>> {
326 match &self.0 {
327 Inner::Local(address) => address.reserve().await,
328 #[cfg(feature = "ipc")]
329 Inner::Remote(_) => Err(SendError::other(
330 "remote address does not support reserve",
331 (),
332 )),
333 }
334 }
335
336 pub fn try_reserve(&self) -> Result<SendPermit<'_, A>, SendError<()>> {
340 match &self.0 {
341 Inner::Local(address) => address.try_reserve(),
342 #[cfg(feature = "ipc")]
343 Inner::Remote(_) => Err(SendError::other(
344 "remote address does not support reserve",
345 (),
346 )),
347 }
348 }
349
350 pub async fn reserve_owned(&self) -> Result<OwnedSendPermit<A>, SendError<()>> {
354 match &self.0 {
355 Inner::Local(address) => address.reserve_owned().await,
356 #[cfg(feature = "ipc")]
357 Inner::Remote(_) => Err(SendError::other(
358 "remote address does not support reserve",
359 (),
360 )),
361 }
362 }
363
364 pub fn try_reserve_owned(&self) -> Result<OwnedSendPermit<A>, SendError<()>> {
368 match &self.0 {
369 Inner::Local(address) => address.try_reserve_owned(),
370 #[cfg(feature = "ipc")]
371 Inner::Remote(_) => Err(SendError::other(
372 "remote address does not support reserve",
373 (),
374 )),
375 }
376 }
377
378 #[cfg(feature = "ipc")]
381 #[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
382 fn remote_addressable(&self) -> Option<RemoteMailbox> {
383 match &self.0 {
384 Inner::Local(_) => A::remote_mailbox(self.clone()),
385 #[cfg(feature = "ipc")]
386 Inner::Remote(_) => None,
387 }
388 }
389}
390
391impl<A> SenderInfo for Address<A>
392where
393 A: Actor,
394{
395 fn index(&self) -> ActorId {
396 self.index()
397 }
398
399 fn closed(&self) -> EmptyFuture<'_> {
400 self.closed().boxed()
401 }
402
403 fn is_closed(&self) -> bool {
404 self.is_closed()
405 }
406
407 fn capacity(&self) -> usize {
408 self.capacity()
409 }
410
411 #[cfg(feature = "ipc")]
412 fn remote_mailbox(&self) -> Option<RemoteMailbox> {
413 self.remote_addressable()
414 }
415}
416
417impl<A, M, EP> Sender<M, EP> for Address<A>
418where
419 A: Actor,
420 M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
421 EP: 'static,
422{
423 fn send(&self, msg: M) -> SendResultFuture<'_, M> {
424 self.send(msg).boxed()
425 }
426
427 fn do_send(&self, msg: M) -> DoSendResultFuture<'_, M> {
428 self.do_send(msg).boxed()
429 }
430
431 fn try_send(&self, msg: M) -> SendResult<M> {
432 self.try_send(msg)
433 }
434
435 fn try_do_send(&self, msg: M) -> DoSendResult<M> {
436 self.try_do_send(msg)
437 }
438
439 fn send_timeout(&self, msg: M, timeout: Duration) -> SendResultFuture<'_, M> {
440 self.send_timeout(msg, timeout).boxed()
441 }
442
443 fn do_send_timeout(&self, msg: M, timeout: Duration) -> DoSendResultFuture<'_, M> {
444 self.do_send_timeout(msg, timeout).boxed()
445 }
446
447 fn blocking_send(&self, msg: M) -> SendResult<M> {
448 self.blocking_send(msg)
449 }
450
451 fn blocking_do_send(&self, msg: M) -> DoSendResult<M> {
452 self.blocking_do_send(msg)
453 }
454}
455
456impl<A> From<LocalAddress<A>> for Address<A>
457where
458 A: Actor,
459{
460 fn from(addr: LocalAddress<A>) -> Self {
461 Self(Inner::Local(addr))
462 }
463}
464
#[cfg(feature = "ipc")]
impl<A> From<RemoteAddress> for Address<A>
where
    A: Actor,
{
    /// Wraps a remote address in the public [`Address`] handle.
    fn from(remote: RemoteAddress) -> Self {
        let inner = Inner::Remote(remote);
        Self(inner)
    }
}
474
475impl<A, M, EP> From<Address<A>> for Recipient<M, EP>
476where
477 A: Actor,
478 M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
479 EP: 'static,
480{
481 fn from(addr: Address<A>) -> Self {
482 Self::new(Arc::new(addr))
483 }
484}
485
#[cfg(test)]
mod tests {
    use anyhow::{Context as _, Result};
    use pretty_assertions::{assert_eq, assert_ne};
    use tokio::time::{self, Duration};

    // `is_remote()` is not an inherent method of `Address` in this file; presumably it is
    // provided by `SenderInfo` — TODO confirm. Only `ipc`-gated assertions call it.
    #[cfg(feature = "ipc")]
    use super::SenderInfo;
    use crate::test_utils::{Ping, hash_of, make_address};

    /// Exercises identity, lifecycle and every send variant of a local address.
    #[tokio::test]
    async fn test_address() -> Result<()> {
        // Two independently created addresses differ in both equality and index.
        // The second tuple element is discarded; only identity is checked here.
        let (a1, _) = make_address(4);
        let (a2, _) = make_address(4);
        assert_ne!(a1, a2);
        assert_ne!(a1.index(), a2.index());
        #[cfg(feature = "ipc")]
        assert!(!a1.is_remote());

        // A clone is equal to, shares the index of, and hashes like the original.
        let (a1, m1) = make_address(4);
        let clone = a1.clone();
        assert_eq!(a1, clone);
        assert_eq!(a1.index(), clone.index());
        assert_eq!(hash_of(&a1), hash_of(&clone));

        // Capacity matches the value passed to `make_address`.
        assert_eq!(a1.capacity(), 4);
        // The address is open while `m1` is alive and closed once it is dropped.
        assert!(!a1.is_closed());
        drop(m1);
        assert!(a1.is_closed());
        time::timeout(Duration::from_millis(500), a1.closed())
            .await
            .context("closed() should resolve after mailbox drop")?;

        // Each of the eight send variants is used once; `m1.len()` must end up at 8.
        let (a1, m1) = make_address(8);
        a1.send(Ping(1)).await?;
        a1.do_send(Ping(2)).await?;
        a1.try_send(Ping(3))?;
        a1.try_do_send(Ping(4))?;
        a1.send_timeout(Ping(5), Duration::from_millis(100)).await?;
        a1.do_send_timeout(Ping(6), Duration::from_millis(100))
            .await?;
        // The blocking variants are run on a blocking thread rather than in the async task.
        tokio::task::spawn_blocking(move || -> Result<()> {
            a1.blocking_send(Ping(7))?;
            a1.blocking_do_send(Ping(8))?;
            Ok(())
        })
        .await??;
        assert_eq!(m1.len(), 8);

        Ok(())
    }

    /// Exercises identity, every send variant and the unsupported `reserve*`
    /// family of a remote address built on `DummyProxy`.
    #[cfg(feature = "ipc")]
    #[tokio::test]
    async fn test_remote_address() -> Result<()> {
        use std::num::NonZeroU64;

        use crate::actor::ActorId;
        use crate::error::SendError;
        use crate::test_utils::{Dummy, DummyProxy};

        let proxy = DummyProxy::new();
        let address = super::Address::<Dummy>::new_remote(7, proxy.clone());

        // The index combines the `7` given to `new_remote` with `42`; presumably `42` is
        // an identifier supplied by `DummyProxy` — TODO confirm.
        assert_eq!(
            address.index(),
            ActorId::new_remote(7, NonZeroU64::new(42).unwrap())
        );
        assert!(address.is_remote());

        // A clone is equal to, shares the index of, and hashes like the original.
        let clone = address.clone();
        assert_eq!(address, clone);
        assert_eq!(address.index(), clone.index());
        assert_eq!(hash_of(&address), hash_of(&clone));

        // With `DummyProxy` the capacity is `usize::MAX`, the address is not closed, and
        // `closed()` is still expected to resolve within the timeout.
        assert_eq!(address.capacity(), usize::MAX);
        assert!(!address.is_closed());
        time::timeout(Duration::from_millis(500), address.closed()).await?;

        // For the `send*` variants the `Ok` value is itself awaited (or `blocking_recv`'d);
        // the `do_send*` variants are only checked for success.
        address.send(Ping(1)).await?.await?;
        address.do_send(Ping(2)).await?;
        address.try_send(Ping(3))?.await?;
        address.try_do_send(Ping(4))?;
        address
            .send_timeout(Ping(5), Duration::from_millis(100))
            .await?
            .await?;
        address
            .do_send_timeout(Ping(6), Duration::from_millis(100))
            .await?;
        // The closure hands `address` back so it can be used after the blocking section.
        let address = tokio::task::spawn_blocking(move || -> Result<_> {
            address.blocking_send(Ping(7))?.blocking_recv()?;
            address.blocking_do_send(Ping(8))?;
            Ok(address)
        })
        .await??;

        // Every `reserve*` variant must fail with `SendError::Other` on a remote address.
        assert!(matches!(address.reserve().await, Err(SendError::Other(..))));
        assert!(matches!(address.try_reserve(), Err(SendError::Other(..))));
        assert!(matches!(
            address.reserve_owned().await,
            Err(SendError::Other(..))
        ));
        assert!(matches!(
            address.try_reserve_owned(),
            Err(SendError::Other(..))
        ));

        // A remote address yields no remote mailbox.
        assert!(address.remote_addressable().is_none());

        Ok(())
    }

    /// Checks that `Debug` renders as `Address<Dummy>(<index>)`.
    #[test]
    fn test_debug_fmt() {
        let (address, _) = make_address(4);
        assert_eq!(
            format!("{:?}", address),
            format!("Address<Dummy>({})", address.index())
        );
    }
}