atomr_core/actor/
inbox.rs1use std::sync::Weak;
5use std::time::Duration;
6
7use tokio::sync::mpsc;
8
9use super::actor_ref::{ActorRef, AskError};
10use super::address::Address;
11use super::path::ActorPath;
12use super::traits::MessageEnvelope;
13
14pub struct Inbox<M: Send + 'static> {
16 rx: mpsc::UnboundedReceiver<MessageEnvelope<M>>,
17 actor_ref: ActorRef<M>,
18}
19
20impl<M: Send + 'static> Inbox<M> {
21 pub fn new(name: &str) -> Self {
22 let (user_tx, rx) = mpsc::unbounded_channel::<MessageEnvelope<M>>();
23 let (sys_tx, _sys_rx) = mpsc::unbounded_channel();
24 let path = ActorPath::root(Address::local("Inbox")).child(name);
25 let actor_ref = ActorRef::new(path, user_tx, sys_tx, Weak::new());
26 Self { rx, actor_ref }
27 }
28
29 pub fn actor_ref(&self) -> &ActorRef<M> {
30 &self.actor_ref
31 }
32
33 pub async fn receive(&mut self, timeout: Duration) -> Result<M, AskError> {
34 match tokio::time::timeout(timeout, self.rx.recv()).await {
35 Ok(Some(env)) => Ok(env.message),
36 Ok(None) => Err(AskError::TargetDropped),
37 Err(_) => Err(AskError::Timeout),
38 }
39 }
40}
41
#[cfg(test)]
mod tests {
    use super::*;

    /// A value sent with `tell` through the inbox's own ref comes back out
    /// of `receive`.
    #[tokio::test]
    async fn inbox_receives_tell() {
        let mut probe = Inbox::<u32>::new("t");
        probe.actor_ref().tell(7);

        let wait = Duration::from_millis(100);
        let received = probe.receive(wait).await;
        assert_eq!(received.unwrap(), 7);
    }
}