mod address_impl;
pub use address_impl::Address;
mod permit;
pub use permit::{OwnedSendPermit, SendPermit};
mod sender;
pub use sender::{
ClosedResultFuture, DoSendResult, DoSendResultFuture, SendResult, SendResultFuture, Sender,
SenderId,
};
mod recipient;
pub use recipient::Recipient;
mod mailbox;
pub use mailbox::Mailbox;
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::time::Duration;
use pretty_assertions::assert_eq;
use super::*;
use crate::actor::Actor;
use crate::channel::mpsc;
use crate::context::Context;
use crate::envelope::Envelope;
use crate::errors::SendError;
use crate::message::{Handler, Message};
#[derive(Debug)]
struct Dummy;
impl Actor for Dummy {
type Context = Context<Self>;
type Error = anyhow::Error;
}
#[derive(Debug)]
struct Ping(u32);
impl Message for Ping {
type Result = ();
}
impl Handler<Ping> for Dummy {
type Result = ();
async fn handle(&mut self, _msg: Ping, _ctx: &mut Self::Context) -> Self::Result {}
}
fn make_address(capacity: usize) -> (Address<Dummy>, Mailbox<Dummy>) {
let (tx, rx) = mpsc::channel::<Envelope<Dummy>>(capacity);
(Address::new(tx), Mailbox::new(rx))
}
#[tokio::test]
async fn test_address() {
let (a1, _) = make_address(4);
let clone = a1.clone();
assert_eq!(a1, clone);
assert_eq!(a1.index(), clone.index());
#[allow(clippy::mutable_key_type)]
let mut map = HashSet::new();
map.insert(a1);
map.insert(clone);
assert_eq!(
map.len(),
1,
"clones should have the same hash and be equal"
);
let (a1, m1) = make_address(4);
let (a2, _) = make_address(4);
assert_ne!(a1, a2);
assert_ne!(a1.index(), a2.index());
assert_eq!(a1.capacity(), 4);
assert!(!a1.is_closed());
let closed = a1.closed();
drop(m1);
assert!(a1.is_closed());
tokio::time::timeout(Duration::from_secs(1), closed)
.await
.expect("closed() did not resolve after mailbox drop");
let (a1, mut m1) = make_address(1);
a1.do_send(Ping(42)).await.expect("do_send should succeed");
assert_eq!(m1.len(), 1);
m1.recv().await.expect("mailbox should receive an envelope");
a1.try_do_send(Ping(1)).expect("first message fits");
match a1.try_do_send(Ping(2)) {
Err(SendError::Full(_)) => {}
other => panic!("expected Full, got {other:?}"),
}
match a1.do_send_timeout(Ping(3), Duration::from_millis(10)).await {
Err(SendError::Timeout(_)) => {}
other => panic!("expected Timeout, got {other:?}"),
}
let (a1, m1) = make_address(4);
drop(m1);
match a1.try_do_send(Ping(1)) {
Err(SendError::Closed(_)) => {}
other => panic!("expected Closed, got {other:?}"),
}
}
#[tokio::test]
async fn test_mailbox() {
let (_tx, rx) = mpsc::channel::<Envelope<Dummy>>(4);
let mut m1 = Mailbox::<Dummy>::new(rx);
assert!(m1.is_empty());
assert_eq!(m1.len(), 0);
assert_eq!(m1.capacity(), 4);
assert_eq!(m1.max_capacity(), 4);
assert!(m1.try_recv().is_err());
assert!(!m1.is_closed());
let (a1, m1) = make_address(4);
a1.try_do_send(Ping(1)).unwrap();
a1.try_do_send(Ping(2)).unwrap();
assert_eq!(m1.len(), 2);
assert!(!m1.is_empty());
let (a1, mut m1) = make_address(4);
assert!(!a1.is_closed());
m1.close();
assert!(a1.is_closed());
match a1.try_do_send(Ping(1)) {
Err(SendError::Closed(_)) => {}
other => panic!("expected Closed, got {other:?}"),
}
}
#[tokio::test]
async fn test_recipient() {
let (recipient, mut rx) = Recipient::<Ping>::create(4);
recipient.do_send(Ping(7)).await.expect("do_send");
let msg = rx.recv().await.expect("recv");
assert_eq!(msg.0, 7);
let clone = recipient.clone();
assert_eq!(recipient, clone);
assert_eq!(recipient.index(), clone.index());
let (recipient, rx) = Recipient::<Ping>::create(2);
let _ = recipient.try_send(Ping(1)).expect("first fits").await;
recipient.try_do_send(Ping(2)).expect("second fits");
match recipient.try_do_send(Ping(3)) {
Err(SendError::Full(_)) => {}
other => panic!("expected Full, got {other:?}"),
}
drop(rx);
match recipient.try_do_send(Ping(3)) {
Err(SendError::Closed(_)) => {}
other => panic!("expected Closed, got {other:?}"),
}
let (recipient, _rx) = Recipient::<Ping>::create(2);
let _ = recipient
.send_timeout(Ping(1), Duration::from_millis(10))
.await
.expect("first fits")
.await;
recipient
.do_send_timeout(Ping(2), Duration::from_millis(10))
.await
.expect("second fits");
match recipient
.do_send_timeout(Ping(3), Duration::from_millis(10))
.await
{
Err(SendError::Timeout(_)) => {}
other => panic!("expected Timeout, got {other:?}"),
}
let (a1, _) = make_address(4);
let index = a1.index();
let recipient: Recipient<Ping> = a1.into();
assert_eq!(recipient.index(), index);
}
#[tokio::test]
async fn test_permits() {
let (a1, mut m1) = make_address(2);
let permit = a1.reserve().await.expect("reserve");
permit.do_send(Ping(1));
assert_eq!(m1.len(), 1);
m1.recv().await.expect("envelope delivered");
let (a1, _m1) = make_address(2);
let p1 = a1.reserve().await.expect("first reserve");
let p2 = a1.reserve().await.expect("second reserve");
match a1.try_reserve() {
Err(SendError::Full(())) => {}
other => panic!("expected Full, got {other:?}"),
}
drop(p1);
a1.try_reserve().expect("slot freed after permit drop");
p2.send(Ping(2));
let (a1, m1) = make_address(4);
drop(m1);
match a1.try_reserve() {
Err(SendError::Closed(())) => {}
other => panic!("expected Closed, got {other:?}"),
}
let (a1, mut m1) = make_address(2);
let permit = a1.reserve_owned().await.expect("reserve_owned");
permit.do_send(Ping(2));
assert_eq!(m1.len(), 1);
m1.recv().await.expect("envelope delivered");
let (a1, m1) = make_address(1);
let owned = a1.try_reserve_owned().expect("first reserve");
match a1.try_reserve_owned() {
Err(SendError::Full(())) => {}
other => panic!("expected Full, got {other:?}"),
}
owned.send(Ping(3));
drop(m1);
match a1.try_reserve_owned() {
Err(SendError::Closed(())) => {}
other => panic!("expected Closed, got {other:?}"),
}
}
}