mod address_impl;
pub use address_impl::Address;
mod permit;
pub use permit::{OwnedSendPermit, SendPermit};
mod sender;
pub use sender::{
ClosedResultFuture, DoSendResult, DoSendResultFuture, SendResult, SendResultFuture, Sender,
SenderId,
};
mod recipient;
pub use recipient::Recipient;
mod mailbox;
pub use mailbox::Mailbox;
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use anyhow::{Context as _, Result};
use pretty_assertions::assert_eq;
use tokio::time::{Duration, timeout};
use super::*;
use crate::channel::mpsc;
use crate::envelope::Envelope;
use crate::errors::SendError;
use crate::test_utils::{Dummy, Ping};
fn make_address(capacity: usize) -> (Address<Dummy>, Mailbox<Dummy>) {
let (tx, rx) = mpsc::channel::<Envelope<Dummy>>(capacity);
(Address::new(tx), Mailbox::new(rx))
}
#[tokio::test]
async fn test_address() -> Result<()> {
let (a1, _m1) = make_address(4);
let clone = a1.clone();
assert_eq!(a1, clone);
assert_eq!(a1.index(), clone.index());
#[allow(clippy::mutable_key_type)]
let mut map = HashSet::new();
map.insert(a1);
map.insert(clone);
assert_eq!(map.len(), 1, "clones should have the same hash and equal");
let (a1, m1) = make_address(4);
let (a2, _) = make_address(4);
assert_ne!(a1, a2);
assert_ne!(a1.index(), a2.index());
assert_eq!(a1.capacity(), 4);
assert!(!a1.is_closed());
let closed = a1.closed();
drop(m1);
assert!(a1.is_closed());
timeout(Duration::from_millis(500), closed)
.await
.context("closed() should resolve after mailbox drop")?;
let (a1, m1) = make_address(1);
a1.do_send(Ping(1)).await?;
assert_eq!(m1.len(), 1);
let (a1, m1) = make_address(1);
a1.send(Ping(2)).await?;
assert_eq!(m1.len(), 1);
let (a1, m1) = make_address(1);
a1.try_do_send(Ping(3))?;
assert_eq!(m1.len(), 1);
let result = a1.try_do_send(Ping(4));
assert!(
matches!(result, Err(SendError::Full(_))),
"expected Full, got {result:?}"
);
drop(m1);
let result = a1.try_do_send(Ping(5));
assert!(
matches!(result, Err(SendError::Closed(_))),
"expected Closed, got {result:?}"
);
let (a1, m1) = make_address(1);
a1.try_send(Ping(6))?;
assert_eq!(m1.len(), 1);
let result = a1.try_send(Ping(7));
assert!(
matches!(result, Err(SendError::Full(_))),
"expected Full, got {result:?}"
);
drop(m1);
let result = a1.try_send(Ping(8));
assert!(
matches!(result, Err(SendError::Closed(_))),
"expected Closed, got {result:?}"
);
let (a1, m1) = make_address(1);
a1.do_send_timeout(Ping(9), Duration::from_millis(10))
.await?;
assert_eq!(m1.len(), 1);
let result = a1
.do_send_timeout(Ping(10), Duration::from_millis(10))
.await;
assert!(
matches!(result, Err(SendError::Timeout(_))),
"expected Timeout, got {result:?}"
);
drop(m1);
let result = a1
.do_send_timeout(Ping(11), Duration::from_millis(10))
.await;
assert!(
matches!(result, Err(SendError::Closed(_))),
"expected Closed, got {result:?}"
);
let (a1, m1) = make_address(1);
a1.send_timeout(Ping(12), Duration::from_millis(10)).await?;
assert_eq!(m1.len(), 1);
let result = a1.send_timeout(Ping(13), Duration::from_millis(10)).await;
assert!(
matches!(result, Err(SendError::Timeout(_))),
"expected Timeout, got {result:?}"
);
drop(m1);
let result = a1.send_timeout(Ping(14), Duration::from_millis(10)).await;
assert!(
matches!(result, Err(SendError::Closed(_))),
"expected Closed, got {result:?}"
);
let (a1, m1) = make_address(1);
tokio::task::spawn_blocking(move || -> Result<()> {
a1.blocking_do_send(Ping(15))?;
assert_eq!(m1.len(), 1);
Ok(())
})
.await??;
let (a1, m1) = make_address(1);
tokio::task::spawn_blocking(move || -> Result<()> {
a1.blocking_send(Ping(16))?;
assert_eq!(m1.len(), 1);
Ok(())
})
.await??;
Ok(())
}
#[tokio::test]
async fn test_mailbox() -> Result<()> {
let (a1, mut m1) = make_address(4);
assert!(m1.is_empty());
assert_eq!(m1.len(), 0);
assert_eq!(m1.capacity(), 4);
assert_eq!(m1.max_capacity(), 4);
assert!(m1.try_recv().is_err());
assert!(!m1.is_closed());
a1.try_do_send(Ping(1))?;
a1.try_do_send(Ping(2))?;
assert_eq!(m1.len(), 2);
assert!(!m1.is_empty());
assert!(!a1.is_closed());
m1.close();
assert!(a1.is_closed());
let result = a1.try_do_send(Ping(3));
assert!(
matches!(result, Err(SendError::Closed(_))),
"expected Closed, got {result:?}"
);
Ok(())
}
#[tokio::test]
async fn test_recipient() -> Result<()> {
let (recipient, mut rx) = Recipient::<Ping>::create(4);
recipient.do_send(Ping(1)).await?;
let msg = rx.recv().await?;
assert_eq!(msg.0, 1);
let clone = recipient.clone();
assert_eq!(recipient, clone);
assert_eq!(recipient.index(), clone.index());
assert!(!recipient.is_closed());
assert_eq!(recipient.capacity(), 4);
drop(rx);
assert!(recipient.is_closed());
timeout(Duration::from_millis(500), recipient.closed())
.await
.context("closed() should resolve after receiver drop")?;
let (recipient, rx) = Recipient::<Ping>::create(8);
recipient.send(Ping(2)).await?;
recipient.do_send(Ping(3)).await?;
recipient.try_send(Ping(4))?;
recipient.try_do_send(Ping(5))?;
recipient
.send_timeout(Ping(6), Duration::from_millis(10))
.await?;
recipient
.do_send_timeout(Ping(7), Duration::from_millis(10))
.await?;
tokio::task::spawn_blocking(move || -> Result<()> {
recipient.blocking_send(Ping(8))?;
recipient.blocking_do_send(Ping(9))?;
Ok(())
})
.await??;
assert_eq!(rx.len(), 8);
let (a1, m1) = make_address(8);
let index = a1.index();
let recipient: Recipient<Ping> = a1.into();
assert_eq!(recipient.index(), index);
let clone = recipient.clone();
assert_eq!(recipient, clone);
assert_eq!(recipient.index(), clone.index());
recipient.send(Ping(10)).await?;
recipient.do_send(Ping(11)).await?;
recipient.try_send(Ping(12))?;
recipient.try_do_send(Ping(13))?;
recipient
.send_timeout(Ping(14), Duration::from_millis(10))
.await?;
recipient
.do_send_timeout(Ping(15), Duration::from_millis(10))
.await?;
tokio::task::spawn_blocking(move || -> Result<()> {
recipient.blocking_send(Ping(16))?;
recipient.blocking_do_send(Ping(17))?;
Ok(())
})
.await??;
assert_eq!(m1.len(), 8);
assert!(!clone.is_closed());
drop(m1);
assert!(clone.is_closed());
timeout(Duration::from_millis(100), clone.closed())
.await
.context("closed() should resolve after mailbox drop")?;
Ok(())
}
#[tokio::test]
async fn test_permits() -> Result<()> {
let (a1, m1) = make_address(2);
let permit = a1.reserve().await?;
permit.do_send(Ping(1));
let permit = a1.try_reserve()?;
permit.send(Ping(2));
assert_eq!(m1.len(), 2);
let (a1, m1) = make_address(2);
let p1 = a1.reserve().await?;
let _p2 = a1.reserve().await?;
let result = a1.try_reserve();
assert!(
matches!(result, Err(SendError::Full(_))),
"expected Full, got {result:?}"
);
drop(p1);
let _ = a1.try_reserve()?;
drop(m1);
let result = a1.try_reserve();
assert!(
matches!(result, Err(SendError::Closed(_))),
"expected Closed, got {result:?}"
);
let (a1, m1) = make_address(2);
let permit = a1.reserve_owned().await?;
permit.do_send(Ping(2));
let permit = a1.try_reserve_owned()?;
permit.send(Ping(3));
assert_eq!(m1.len(), 2);
let (a1, m1) = make_address(2);
let p1 = a1.reserve_owned().await?;
let _p2 = a1.reserve_owned().await?;
let result = a1.try_reserve_owned();
assert!(
matches!(result, Err(SendError::Full(_))),
"expected Full, got {result:?}"
);
drop(p1);
let _ = a1.try_reserve_owned()?;
drop(m1);
let result = a1.try_reserve_owned();
assert!(
matches!(result, Err(SendError::Closed(_))),
"expected Closed, got {result:?}"
);
Ok(())
}
#[test]
fn test_sender() {
let sender_id = u64::MAX;
assert_eq!(sender_id.index(), u64::MAX);
#[cfg(feature = "ipc")]
assert_eq!(sender_id.is_remote(), true);
}
#[test]
fn test_debug_fmt() {
let (address, mailbox) = make_address(4);
assert_eq!(
format!("{address:?}"),
format!("Address<Dummy>({})", address.index())
);
assert_eq!(format!("{mailbox:?}"), "Mailbox<Dummy>");
let (recipient, _rx) = Recipient::<Ping>::create(4);
assert_eq!(
format!("{recipient:?}"),
format!("Recipient<Ping>({})", recipient.index())
);
}
}