starlang_runtime/
mailbox.rs1use std::time::Duration;
7use tokio::sync::mpsc;
8use tokio::time::timeout;
9
10#[derive(Debug, Clone)]
14pub struct Envelope {
15 pub data: Vec<u8>,
17}
18
19impl Envelope {
20 pub fn new(data: Vec<u8>) -> Self {
22 Self { data }
23 }
24}
25
26pub struct Mailbox {
30 rx: mpsc::UnboundedReceiver<Envelope>,
31}
32
33impl Mailbox {
34 pub fn new() -> (Self, MailboxSender) {
36 let (tx, rx) = mpsc::unbounded_channel();
37 (Self { rx }, MailboxSender { tx })
38 }
39
40 pub async fn recv(&mut self) -> Option<Envelope> {
44 self.rx.recv().await
45 }
46
47 pub async fn recv_timeout(&mut self, duration: Duration) -> Result<Option<Envelope>, ()> {
53 match timeout(duration, self.rx.recv()).await {
54 Ok(msg) => Ok(msg),
55 Err(_) => Err(()),
56 }
57 }
58
59 pub fn try_recv(&mut self) -> Result<Envelope, mpsc::error::TryRecvError> {
65 self.rx.try_recv()
66 }
67
68 pub fn close(&mut self) {
70 self.rx.close()
71 }
72}
73
74#[derive(Clone)]
79pub struct MailboxSender {
80 tx: mpsc::UnboundedSender<Envelope>,
81}
82
83impl MailboxSender {
84 pub fn send(&self, envelope: Envelope) -> Result<(), Envelope> {
89 self.tx.send(envelope).map_err(|e| e.0)
90 }
91
92 pub fn is_closed(&self) -> bool {
94 self.tx.is_closed()
95 }
96}
97
98#[cfg(test)]
99mod tests {
100 use super::*;
101
102 #[tokio::test]
103 async fn test_mailbox_send_recv() {
104 let (mut mailbox, sender) = Mailbox::new();
105
106 sender.send(Envelope::new(vec![1, 2, 3])).unwrap();
107 sender.send(Envelope::new(vec![4, 5, 6])).unwrap();
108
109 let msg1 = mailbox.recv().await.unwrap();
110 assert_eq!(msg1.data, vec![1, 2, 3]);
111
112 let msg2 = mailbox.recv().await.unwrap();
113 assert_eq!(msg2.data, vec![4, 5, 6]);
114 }
115
116 #[tokio::test]
117 async fn test_mailbox_try_recv() {
118 let (mut mailbox, sender) = Mailbox::new();
119
120 assert!(mailbox.try_recv().is_err());
122
123 sender.send(Envelope::new(vec![1, 2, 3])).unwrap();
124
125 let msg = mailbox.try_recv().unwrap();
127 assert_eq!(msg.data, vec![1, 2, 3]);
128
129 assert!(mailbox.try_recv().is_err());
131 }
132
133 #[tokio::test]
134 async fn test_mailbox_timeout() {
135 let (mut mailbox, _sender) = Mailbox::new();
136
137 let result = mailbox.recv_timeout(Duration::from_millis(10)).await;
139 assert!(result.is_err());
140 }
141
142 #[tokio::test]
143 async fn test_mailbox_close() {
144 let (mut mailbox, sender) = Mailbox::new();
145
146 sender.send(Envelope::new(vec![1, 2, 3])).unwrap();
148
149 mailbox.close();
151
152 let msg = mailbox.recv().await.unwrap();
154 assert_eq!(msg.data, vec![1, 2, 3]);
155
156 assert!(sender.send(Envelope::new(vec![4, 5, 6])).is_err());
158 }
159
160 #[tokio::test]
161 async fn test_sender_is_closed() {
162 let (mut mailbox, sender) = Mailbox::new();
163
164 assert!(!sender.is_closed());
165
166 mailbox.close();
167
168 assert!(sender.is_closed());
169 }
170
171 #[tokio::test]
172 async fn test_multiple_senders() {
173 let (mut mailbox, sender1) = Mailbox::new();
174 let sender2 = sender1.clone();
175
176 sender1.send(Envelope::new(vec![1])).unwrap();
177 sender2.send(Envelope::new(vec![2])).unwrap();
178
179 let msg1 = mailbox.recv().await.unwrap();
180 let msg2 = mailbox.recv().await.unwrap();
181
182 assert!(msg1.data == vec![1] || msg1.data == vec![2]);
184 assert!(msg2.data == vec![1] || msg2.data == vec![2]);
185 assert_ne!(msg1.data, msg2.data);
186 }
187}