1use std::any::Any;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::{Arc, Mutex};
28use std::time::Duration;
29
30use rill_core::queues::MpscQueue;
31
32pub(crate) struct Mailbox<M: Send + 'static> {
37 pub(crate) queue: MpscQueue<M>,
38 alive: AtomicBool,
39}
40
41impl<M: Send + 'static> Mailbox<M> {
42 pub(crate) fn new(capacity: usize) -> Self {
43 Self {
44 queue: MpscQueue::with_capacity(capacity),
45 alive: AtomicBool::new(true),
46 }
47 }
48
49 pub fn pop(&self) -> Option<M> {
50 self.queue.pop()
51 }
52
53 pub fn actor_ref(self: &Arc<Self>) -> ActorRef<M> {
54 ActorRef {
55 inner: self.clone(),
56 }
57 }
58}
59
60pub struct Actor<M: Send + 'static> {
65 mailbox: Arc<Mailbox<M>>,
66 handler: Box<dyn FnMut(M) + 'static>,
67}
68
69impl<M: Send + 'static> Actor<M> {
70 pub fn drain(&mut self) {
71 while let Some(msg) = self.mailbox.pop() {
72 (self.handler)(msg);
73 }
74 }
75
76 pub fn actor_ref(&self) -> ActorRef<M> {
77 self.mailbox.actor_ref()
78 }
79}
80
81pub struct ActorRef<M: Send + 'static> {
86 inner: Arc<Mailbox<M>>,
87}
88
89impl<M: Send + 'static> Clone for ActorRef<M> {
90 fn clone(&self) -> Self {
91 Self {
92 inner: self.inner.clone(),
93 }
94 }
95}
96
97impl<M: Send + 'static> ActorRef<M> {
98 pub fn send(&self, msg: M) {
99 if self.inner.alive.load(Ordering::Acquire) {
100 let _ = self.inner.queue.push(msg);
101 }
102 }
103}
104
105pub struct ActorSystem {
110 actors: Mutex<Vec<(String, Box<dyn Any + Send>)>>,
111 dead: Arc<MpscQueue<Box<dyn Any + Send>>>,
112}
113
114impl ActorSystem {
115 pub fn new() -> Self {
116 Self {
117 actors: Mutex::new(Vec::new()),
118 dead: Arc::new(MpscQueue::new()),
119 }
120 }
121
122 pub fn spawn<M: Send + 'static>(
125 &self,
126 name: &str,
127 handler: impl FnMut(M) + 'static,
128 ) -> Actor<M> {
129 let actor = Actor {
130 mailbox: Arc::new(Mailbox::new(64)),
131 handler: Box::new(handler),
132 };
133 self.actors
134 .lock()
135 .unwrap()
136 .push((name.to_string(), Box::new(actor.actor_ref())));
137 actor
138 }
139
140 pub fn spawn_detached<M: Send + 'static>(
146 &self,
147 name: &str,
148 make_handler: impl FnOnce() -> Box<dyn FnMut(M) + 'static> + Send + 'static,
149 interval_ms: u64,
150 ) -> ActorRef<M> {
151 let mailbox = Arc::new(Mailbox::new(64));
152 let actor_ref = mailbox.actor_ref();
153 self.actors
154 .lock()
155 .unwrap()
156 .push((name.to_string(), Box::new(actor_ref.clone())));
157 std::thread::spawn(move || {
158 let handler = make_handler();
159 let mut actor = Actor { mailbox, handler };
160 loop {
161 actor.drain();
162 std::thread::sleep(Duration::from_millis(interval_ms));
163 }
164 });
165 actor_ref
166 }
167
168 #[cfg(feature = "tokio")]
171 pub fn spawn_detached_tokio<M: Send + 'static>(
172 &self,
173 name: &str,
174 make_handler: impl FnOnce() -> Box<dyn FnMut(M) + Send + 'static> + Send + 'static,
175 interval_ms: u64,
176 ) -> ActorRef<M> {
177 let mailbox = Arc::new(Mailbox::new(64));
178 let actor_ref = mailbox.actor_ref();
179 self.actors
180 .lock()
181 .unwrap()
182 .push((name.to_string(), Box::new(actor_ref.clone())));
183 tokio::spawn(async move {
184 let mut handler = make_handler();
185 let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
186 loop {
187 interval.tick().await;
188 while let Some(msg) = mailbox.pop() {
189 handler(msg);
190 }
191 }
192 });
193 actor_ref
194 }
195
196 pub fn route<M: Send + 'static>(&self, name: &str, msg: M) {
197 if let Ok(actors) = self.actors.lock() {
198 for (n, actor_ref) in actors.iter() {
199 if n == name {
200 if let Some(ar) = actor_ref.downcast_ref::<ActorRef<M>>() {
201 ar.send(msg);
202 return;
203 }
204 }
205 }
206 }
207 let _ = self.dead.push(Box::new(msg));
208 }
209
210 pub fn broadcast<M: Send + Clone + 'static>(&self, msg: M) {
211 if let Ok(actors) = self.actors.lock() {
212 for (_, actor_ref) in actors.iter() {
213 if let Some(ar) = actor_ref.downcast_ref::<ActorRef<M>>() {
214 ar.send(msg.clone());
215 }
216 }
217 }
218 }
219
220 pub fn drain_dead(&self) -> Vec<Box<dyn Any + Send>> {
221 let mut msgs = Vec::new();
222 while let Some(msg) = self.dead.pop() {
223 msgs.push(msg);
224 }
225 msgs
226 }
227
228 pub fn actor_count(&self) -> usize {
229 self.actors.lock().map(|a| a.len()).unwrap_or(0)
230 }
231}
232
233impl Default for ActorSystem {
234 fn default() -> Self {
235 Self::new()
236 }
237}
238
#[cfg(test)]
mod tests {
    use super::*;

    /// Messages sent through a ref are handled once the actor is drained.
    #[test]
    fn test_spawn_and_drain() {
        let system = ActorSystem::new();
        let received = Arc::new(Mutex::new(Vec::new()));
        let recv = received.clone();
        let mut actor = system.spawn("test", move |msg: String| {
            recv.lock().unwrap().push(msg);
        });
        assert_eq!(system.actor_count(), 1);

        let ref_a = actor.actor_ref();
        ref_a.send("hello".into());
        ref_a.send("world".into());

        actor.drain();
        assert_eq!(received.lock().unwrap().len(), 2);
    }

    /// Two refs obtained from the same actor feed the same mailbox.
    #[test]
    fn test_multiple_refs_share_mailbox() {
        let system = ActorSystem::new();
        let log = Arc::new(Mutex::new(Vec::new()));
        let l = log.clone();
        let mut actor = system.spawn("multi", move |msg: String| {
            l.lock().unwrap().push(msg);
        });

        let ref_a = actor.actor_ref();
        let ref_b = actor.actor_ref();
        ref_a.send("alpha".into());
        ref_b.send("beta".into());

        actor.drain();
        let drained = log.lock().unwrap();
        assert_eq!(drained.len(), 2);
    }

    /// Sending far more messages than fit in the mailbox loses some of them
    /// without panicking.
    #[test]
    fn test_queue_overflow_drops() {
        let system = ActorSystem::new();
        let sum = Arc::new(Mutex::new(0));
        let s = sum.clone();
        let mut actor = system.spawn("drop", move |msg: i32| {
            *s.lock().unwrap() += msg;
        });

        let ref_a = actor.actor_ref();
        for i in 0..200 {
            ref_a.send(i);
        }
        actor.drain();
        let total = *sum.lock().unwrap();
        // Something arrived, but not everything.
        assert!(total > 0);
        assert!(total < (0..200).sum::<i32>());
    }

    /// `route` delivers to the actor registered under the given name.
    #[test]
    fn test_route() {
        let system = ActorSystem::new();
        let log = Arc::new(Mutex::new(Vec::new()));
        let l = log.clone();
        let mut actor = system.spawn("echo", move |msg: String| {
            l.lock().unwrap().push(msg);
        });

        system.route("echo", "routed".to_string());
        actor.drain();
        assert_eq!(*log.lock().unwrap(), vec!["routed".to_string()]);
    }

    /// A message routed to an unregistered name ends up in the dead-letter queue.
    #[test]
    fn test_route_unknown_goes_to_dead() {
        let system = ActorSystem::new();
        system.route("unknown", "lost".to_string());
        let dead: Vec<String> = system
            .drain_dead()
            .into_iter()
            .filter_map(|b| b.downcast::<String>().ok().map(|b| *b))
            .collect();
        assert_eq!(dead, vec!["lost".to_string()]);
    }

    /// `broadcast` reaches every actor that accepts the message type.
    #[test]
    fn test_broadcast() {
        let system = ActorSystem::new();
        let log_a = Arc::new(Mutex::new(Vec::new()));
        let log_b = Arc::new(Mutex::new(Vec::new()));
        let la = log_a.clone();
        let lb = log_b.clone();
        let mut actor_a = system.spawn("a", move |msg: String| {
            la.lock().unwrap().push(msg);
        });
        let mut actor_b = system.spawn("b", move |msg: String| {
            lb.lock().unwrap().push(msg);
        });

        system.broadcast("all".to_string());
        actor_a.drain();
        actor_b.drain();
        assert_eq!(*log_a.lock().unwrap(), vec!["all".to_string()]);
        assert_eq!(*log_b.lock().unwrap(), vec!["all".to_string()]);
    }

    /// A detached actor handles messages on its own thread.
    #[test]
    fn test_spawn_detached() {
        let system = ActorSystem::new();
        let received = Arc::new(Mutex::new(Vec::new()));
        let recv = received.clone();
        let actor_ref = system.spawn_detached(
            "detached",
            move || Box::new(move |msg: String| recv.lock().unwrap().push(msg)),
            1,
        );
        actor_ref.send("hello".into());
        actor_ref.send("world".into());

        // Poll against a generous deadline instead of sleeping for a fixed
        // time: the test finishes as soon as both messages have arrived and
        // does not fail merely because the worker thread was scheduled late.
        let handled = || received.lock().unwrap().len();
        let deadline = std::time::Instant::now() + Duration::from_secs(2);
        while handled() < 2 && std::time::Instant::now() < deadline {
            std::thread::sleep(Duration::from_millis(1));
        }
        assert_eq!(handled(), 2);
    }
}