// starlang_runtime/mailbox.rs

//! Process mailbox for message delivery.
//!
//! Each process has a mailbox that receives messages from other processes.
//! The mailbox uses an unbounded MPSC channel for message delivery.
5
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;
9
/// A message that can be delivered to a process mailbox.
///
/// Messages are stored as raw bytes to support heterogeneous message types.
/// Two envelopes compare equal when their byte payloads are equal, and the
/// default envelope carries an empty payload.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Envelope {
    /// The raw message bytes.
    pub data: Vec<u8>,
}

impl Envelope {
    /// Creates a new envelope that takes ownership of `data`.
    ///
    /// The bytes are stored as-is; nothing is copied or validated.
    #[must_use]
    pub fn new(data: Vec<u8>) -> Self {
        Self { data }
    }
}
25
26/// The receiving end of a process mailbox.
27///
28/// This is held by the process and used to receive messages.
29pub struct Mailbox {
30    rx: mpsc::UnboundedReceiver<Envelope>,
31}
32
33impl Mailbox {
34    /// Creates a new mailbox, returning the mailbox and its sender.
35    pub fn new() -> (Self, MailboxSender) {
36        let (tx, rx) = mpsc::unbounded_channel();
37        (Self { rx }, MailboxSender { tx })
38    }
39
40    /// Receives the next message, blocking until one is available.
41    ///
42    /// Returns `None` if all senders have been dropped.
43    pub async fn recv(&mut self) -> Option<Envelope> {
44        self.rx.recv().await
45    }
46
47    /// Receives the next message with a timeout.
48    ///
49    /// Returns `Ok(Some(envelope))` if a message was received,
50    /// `Ok(None)` if all senders were dropped,
51    /// or `Err(())` if the timeout elapsed.
52    pub async fn recv_timeout(&mut self, duration: Duration) -> Result<Option<Envelope>, ()> {
53        match timeout(duration, self.rx.recv()).await {
54            Ok(msg) => Ok(msg),
55            Err(_) => Err(()),
56        }
57    }
58
59    /// Tries to receive a message without blocking.
60    ///
61    /// Returns `Ok(envelope)` if a message was available,
62    /// `Err(TryRecvError::Empty)` if no message was available,
63    /// or `Err(TryRecvError::Disconnected)` if all senders were dropped.
64    pub fn try_recv(&mut self) -> Result<Envelope, mpsc::error::TryRecvError> {
65        self.rx.try_recv()
66    }
67
68    /// Closes the mailbox, preventing any further messages from being sent.
69    pub fn close(&mut self) {
70        self.rx.close()
71    }
72}
73
74/// The sending end of a process mailbox.
75///
76/// This can be cloned and shared between processes to send messages
77/// to the mailbox owner.
78#[derive(Clone)]
79pub struct MailboxSender {
80    tx: mpsc::UnboundedSender<Envelope>,
81}
82
83impl MailboxSender {
84    /// Sends a message to the mailbox.
85    ///
86    /// Returns `Ok(())` if the message was sent successfully,
87    /// or `Err(envelope)` if the mailbox was closed.
88    pub fn send(&self, envelope: Envelope) -> Result<(), Envelope> {
89        self.tx.send(envelope).map_err(|e| e.0)
90    }
91
92    /// Returns `true` if the mailbox is closed.
93    pub fn is_closed(&self) -> bool {
94        self.tx.is_closed()
95    }
96}
97
#[cfg(test)]
mod tests {
    use super::*;

    /// Messages from a single sender are delivered in the order they were sent.
    #[tokio::test]
    async fn test_mailbox_send_recv() {
        let (mut mailbox, sender) = Mailbox::new();

        sender.send(Envelope::new(vec![1, 2, 3])).unwrap();
        sender.send(Envelope::new(vec![4, 5, 6])).unwrap();

        let msg1 = mailbox.recv().await.unwrap();
        assert_eq!(msg1.data, vec![1, 2, 3]);

        let msg2 = mailbox.recv().await.unwrap();
        assert_eq!(msg2.data, vec![4, 5, 6]);
    }

    /// `try_recv` reports `Empty` (not `Disconnected`) while a sender is alive.
    #[tokio::test]
    async fn test_mailbox_try_recv() {
        let (mut mailbox, sender) = Mailbox::new();

        // No message yet
        assert!(matches!(
            mailbox.try_recv(),
            Err(mpsc::error::TryRecvError::Empty)
        ));

        sender.send(Envelope::new(vec![1, 2, 3])).unwrap();

        // Now there's a message
        let msg = mailbox.try_recv().unwrap();
        assert_eq!(msg.data, vec![1, 2, 3]);

        // No more messages
        assert!(matches!(
            mailbox.try_recv(),
            Err(mpsc::error::TryRecvError::Empty)
        ));
    }

    /// With a live sender and no message, `recv_timeout` reports the timeout.
    #[tokio::test]
    async fn test_mailbox_timeout() {
        // `_sender` is kept alive so the channel does not report disconnection.
        let (mut mailbox, _sender) = Mailbox::new();

        // Should timeout quickly
        let result = mailbox.recv_timeout(Duration::from_millis(10)).await;
        assert!(matches!(result, Err(())));
    }

    /// A message that is already queued is returned without hitting the timeout.
    #[tokio::test]
    async fn test_mailbox_recv_timeout_delivers_message() {
        let (mut mailbox, sender) = Mailbox::new();

        sender.send(Envelope::new(vec![7, 8, 9])).unwrap();

        let msg = mailbox
            .recv_timeout(Duration::from_secs(1))
            .await
            .expect("a queued message must arrive before the timeout")
            .expect("the channel still has a live sender");
        assert_eq!(msg.data, vec![7, 8, 9]);
    }

    /// Once every sender is gone, `recv_timeout` yields `Ok(None)` rather than timing out.
    #[tokio::test]
    async fn test_mailbox_recv_timeout_senders_dropped() {
        let (mut mailbox, sender) = Mailbox::new();
        drop(sender);

        let result = mailbox.recv_timeout(Duration::from_secs(1)).await;
        assert!(matches!(result, Ok(None)));
    }

    /// Closing keeps queued messages receivable but rejects new sends.
    #[tokio::test]
    async fn test_mailbox_close() {
        let (mut mailbox, sender) = Mailbox::new();

        // Send a message
        sender.send(Envelope::new(vec![1, 2, 3])).unwrap();

        // Close the mailbox
        mailbox.close();

        // Can still receive pending messages
        let msg = mailbox.recv().await.unwrap();
        assert_eq!(msg.data, vec![1, 2, 3]);

        // Once drained, a closed mailbox reports the end of the stream
        assert!(mailbox.recv().await.is_none());

        // Further sends should fail and hand the envelope back
        let rejected = sender.send(Envelope::new(vec![4, 5, 6])).unwrap_err();
        assert_eq!(rejected.data, vec![4, 5, 6]);
    }

    /// Dropping the mailbox makes sends fail and marks the sender as closed.
    #[tokio::test]
    async fn test_send_to_dropped_mailbox() {
        let (mailbox, sender) = Mailbox::new();
        drop(mailbox);

        assert!(sender.is_closed());
        let rejected = sender.send(Envelope::new(vec![1])).unwrap_err();
        assert_eq!(rejected.data, vec![1]);
    }

    /// `is_closed` flips to `true` once the mailbox has been closed.
    #[tokio::test]
    async fn test_sender_is_closed() {
        let (mut mailbox, sender) = Mailbox::new();

        assert!(!sender.is_closed());

        mailbox.close();

        assert!(sender.is_closed());
    }

    /// Cloned senders all deliver into the same mailbox.
    #[tokio::test]
    async fn test_multiple_senders() {
        let (mut mailbox, sender1) = Mailbox::new();
        let sender2 = sender1.clone();

        sender1.send(Envelope::new(vec![1])).unwrap();
        sender2.send(Envelope::new(vec![2])).unwrap();

        let msg1 = mailbox.recv().await.unwrap();
        let msg2 = mailbox.recv().await.unwrap();

        // This test checks delivery only, so it deliberately does not depend
        // on the relative order of messages coming from different senders.
        let mut received = vec![msg1.data, msg2.data];
        received.sort();
        assert_eq!(received, vec![vec![1], vec![2]]);
    }
}