Skip to main content

atomr_core/actor/
inbox.rs

1//! Inbox — a receive-from-the-outside handle into a synthetic actor.
2
3use std::sync::Weak;
4use std::time::Duration;
5
6use tokio::sync::mpsc;
7
8use super::actor_ref::{ActorRef, AskError};
9use super::address::Address;
10use super::path::ActorPath;
11use super::traits::MessageEnvelope;
12
13/// A one-shot-style receive handle useful in tests and top-level `main` code.
14pub struct Inbox<M: Send + 'static> {
15    rx: mpsc::UnboundedReceiver<MessageEnvelope<M>>,
16    actor_ref: ActorRef<M>,
17}
18
19impl<M: Send + 'static> Inbox<M> {
20    pub fn new(name: &str) -> Self {
21        let (user_tx, rx) = mpsc::unbounded_channel::<MessageEnvelope<M>>();
22        let (sys_tx, _sys_rx) = mpsc::unbounded_channel();
23        let path = ActorPath::root(Address::local("Inbox")).child(name);
24        let actor_ref = ActorRef::new(path, user_tx, sys_tx, Weak::new());
25        Self { rx, actor_ref }
26    }
27
28    pub fn actor_ref(&self) -> &ActorRef<M> {
29        &self.actor_ref
30    }
31
32    pub async fn receive(&mut self, timeout: Duration) -> Result<M, AskError> {
33        match tokio::time::timeout(timeout, self.rx.recv()).await {
34            Ok(Some(env)) => Ok(env.message),
35            Ok(None) => Err(AskError::TargetDropped),
36            Err(_) => Err(AskError::Timeout),
37        }
38    }
39}
40
41#[cfg(test)]
42mod tests {
43    use super::*;
44
45    #[tokio::test]
46    async fn inbox_receives_tell() {
47        let mut inbox = Inbox::<u32>::new("t");
48        inbox.actor_ref().tell(7);
49        let m = inbox.receive(Duration::from_millis(100)).await.unwrap();
50        assert_eq!(m, 7);
51    }
52}