Skip to main content

rill_core_actor/
lib.rs

1//! # Rill Core Actor — actor model infrastructure
2//!
3//! A lightweight, domain-agnostic actor model for lock-free message passing.
4//! Actors are single-threaded: handler is created and drained on the same thread.
5//!
6//! ## Key types
7//!
8//! | Type | Role |
9//! |------|------|
10//! | [`Actor<M>`] | Handler + mailbox — drained in-place (no separate thread) |
11//! | [`ActorRef<M>`] | Thread-safe handle to send messages to an actor |
12//! | [`ActorSystem`] | Named actor registry, dead letters, `spawn()`, `spawn_detached()` |
13//!
14//! ## Architecture
15//!
16//! ```text
17//! // Handler drained inline (Graph, Rack):
18//! system.spawn(name, handler) → Actor<M> → actor.drain() on caller's thread
19//!
20//! // Handler created & drained inside a new thread (Servo, factory modules):
21//! system.spawn_detached(name, make_handler, ms) → ActorRef<M>
22//!   └── thread::spawn: handler = make_handler() → actor.drain() loop
23//! ```
24
25use std::any::Any;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::{Arc, Mutex};
28use std::time::Duration;
29
30use rill_core::queues::MpscQueue;
31
32// ============================================================================
33// Mailbox — private, never visible outside this crate
34// ============================================================================
35
36pub(crate) struct Mailbox<M: Send + 'static> {
37    pub(crate) queue: MpscQueue<M>,
38    alive: AtomicBool,
39}
40
41impl<M: Send + 'static> Mailbox<M> {
42    pub(crate) fn new(capacity: usize) -> Self {
43        Self {
44            queue: MpscQueue::with_capacity(capacity),
45            alive: AtomicBool::new(true),
46        }
47    }
48
49    pub fn pop(&self) -> Option<M> {
50        self.queue.pop()
51    }
52
53    pub fn actor_ref(self: &Arc<Self>) -> ActorRef<M> {
54        ActorRef {
55            inner: self.clone(),
56        }
57    }
58}
59
60// ============================================================================
61// Actor — single‑threaded (handler: !Send, drained inline)
62// ============================================================================
63
64pub struct Actor<M: Send + 'static> {
65    mailbox: Arc<Mailbox<M>>,
66    handler: Box<dyn FnMut(M) + 'static>,
67}
68
69impl<M: Send + 'static> Actor<M> {
70    pub fn drain(&mut self) {
71        while let Some(msg) = self.mailbox.pop() {
72            (self.handler)(msg);
73        }
74    }
75
76    pub fn actor_ref(&self) -> ActorRef<M> {
77        self.mailbox.actor_ref()
78    }
79}
80
81// ============================================================================
82// ActorRef
83// ============================================================================
84
85pub struct ActorRef<M: Send + 'static> {
86    inner: Arc<Mailbox<M>>,
87}
88
89impl<M: Send + 'static> Clone for ActorRef<M> {
90    fn clone(&self) -> Self {
91        Self {
92            inner: self.inner.clone(),
93        }
94    }
95}
96
97impl<M: Send + 'static> ActorRef<M> {
98    pub fn send(&self, msg: M) {
99        if self.inner.alive.load(Ordering::Acquire) {
100            let _ = self.inner.queue.push(msg);
101        }
102    }
103}
104
105// ============================================================================
106// ActorSystem
107// ============================================================================
108
109pub struct ActorSystem {
110    actors: Mutex<Vec<(String, Box<dyn Any + Send>)>>,
111    dead: Arc<MpscQueue<Box<dyn Any + Send>>>,
112}
113
114impl ActorSystem {
115    pub fn new() -> Self {
116        Self {
117            actors: Mutex::new(Vec::new()),
118            dead: Arc::new(MpscQueue::new()),
119        }
120    }
121
122    /// Spawn an actor drained inline by the caller (Graph, Rack).
123    /// Handler does not need `Send` — it lives and dies on the caller's thread.
124    pub fn spawn<M: Send + 'static>(
125        &self,
126        name: &str,
127        handler: impl FnMut(M) + 'static,
128    ) -> Actor<M> {
129        let actor = Actor {
130            mailbox: Arc::new(Mailbox::new(64)),
131            handler: Box::new(handler),
132        };
133        self.actors
134            .lock()
135            .unwrap()
136            .push((name.to_string(), Box::new(actor.actor_ref())));
137        actor
138    }
139
140    /// Spawn a detached actor — handler is created inside a new OS thread
141    /// and drained in a loop. Returns the [`ActorRef`] immediately.
142    ///
143    /// `make_handler` is called inside the spawned thread, so the returned
144    /// handler closure does not need `Send`.
145    pub fn spawn_detached<M: Send + 'static>(
146        &self,
147        name: &str,
148        make_handler: impl FnOnce() -> Box<dyn FnMut(M) + 'static> + Send + 'static,
149        interval_ms: u64,
150    ) -> ActorRef<M> {
151        let mailbox = Arc::new(Mailbox::new(64));
152        let actor_ref = mailbox.actor_ref();
153        self.actors
154            .lock()
155            .unwrap()
156            .push((name.to_string(), Box::new(actor_ref.clone())));
157        std::thread::spawn(move || {
158            let handler = make_handler();
159            let mut actor = Actor { mailbox, handler };
160            loop {
161                actor.drain();
162                std::thread::sleep(Duration::from_millis(interval_ms));
163            }
164        });
165        actor_ref
166    }
167
168    /// Spawn a detached actor on a tokio task — handler must be `Send`.
169    /// Useful when many actors are needed (e.g. 24 Servos), avoiding OS thread overhead.
170    #[cfg(feature = "tokio")]
171    pub fn spawn_detached_tokio<M: Send + 'static>(
172        &self,
173        name: &str,
174        make_handler: impl FnOnce() -> Box<dyn FnMut(M) + Send + 'static> + Send + 'static,
175        interval_ms: u64,
176    ) -> ActorRef<M> {
177        let mailbox = Arc::new(Mailbox::new(64));
178        let actor_ref = mailbox.actor_ref();
179        self.actors
180            .lock()
181            .unwrap()
182            .push((name.to_string(), Box::new(actor_ref.clone())));
183        tokio::spawn(async move {
184            let mut handler = make_handler();
185            let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
186            loop {
187                interval.tick().await;
188                while let Some(msg) = mailbox.pop() {
189                    handler(msg);
190                }
191            }
192        });
193        actor_ref
194    }
195
196    pub fn route<M: Send + 'static>(&self, name: &str, msg: M) {
197        if let Ok(actors) = self.actors.lock() {
198            for (n, actor_ref) in actors.iter() {
199                if n == name {
200                    if let Some(ar) = actor_ref.downcast_ref::<ActorRef<M>>() {
201                        ar.send(msg);
202                        return;
203                    }
204                }
205            }
206        }
207        let _ = self.dead.push(Box::new(msg));
208    }
209
210    pub fn broadcast<M: Send + Clone + 'static>(&self, msg: M) {
211        if let Ok(actors) = self.actors.lock() {
212            for (_, actor_ref) in actors.iter() {
213                if let Some(ar) = actor_ref.downcast_ref::<ActorRef<M>>() {
214                    ar.send(msg.clone());
215                }
216            }
217        }
218    }
219
220    pub fn drain_dead(&self) -> Vec<Box<dyn Any + Send>> {
221        let mut msgs = Vec::new();
222        while let Some(msg) = self.dead.pop() {
223            msgs.push(msg);
224        }
225        msgs
226    }
227
228    pub fn actor_count(&self) -> usize {
229        self.actors.lock().map(|a| a.len()).unwrap_or(0)
230    }
231}
232
233impl Default for ActorSystem {
234    fn default() -> Self {
235        Self::new()
236    }
237}
238
239// ============================================================================
240// Tests
241// ============================================================================
242
243#[cfg(test)]
244mod tests {
245    use super::*;
246
247    #[test]
248    fn test_spawn_and_drain() {
249        let system = ActorSystem::new();
250        let received = Arc::new(Mutex::new(Vec::new()));
251        let recv = received.clone();
252        let mut actor = system.spawn("test", move |msg: String| {
253            recv.lock().unwrap().push(msg);
254        });
255        assert_eq!(system.actor_count(), 1);
256
257        let ref_a = actor.actor_ref();
258        ref_a.send("hello".into());
259        ref_a.send("world".into());
260
261        actor.drain();
262        assert_eq!(received.lock().unwrap().len(), 2);
263    }
264
265    #[test]
266    fn test_multiple_refs_share_mailbox() {
267        let system = ActorSystem::new();
268        let log = Arc::new(Mutex::new(Vec::new()));
269        let l = log.clone();
270        let mut actor = system.spawn("multi", move |msg: String| {
271            l.lock().unwrap().push(msg);
272        });
273
274        let ref_a = actor.actor_ref();
275        let ref_b = actor.actor_ref();
276        ref_a.send("alpha".into());
277        ref_b.send("beta".into());
278
279        actor.drain();
280        let drained = log.lock().unwrap();
281        assert_eq!(drained.len(), 2);
282    }
283
284    #[test]
285    fn test_queue_overflow_drops() {
286        let system = ActorSystem::new();
287        let sum = Arc::new(Mutex::new(0));
288        let s = sum.clone();
289        let mut actor = system.spawn("drop", move |msg: i32| {
290            *s.lock().unwrap() += msg;
291        });
292
293        let ref_a = actor.actor_ref();
294        for i in 0..200 {
295            ref_a.send(i);
296        }
297        actor.drain();
298        let total = *sum.lock().unwrap();
299        assert!(total > 0);
300        assert!(total < (0..200).sum::<i32>());
301    }
302
303    #[test]
304    fn test_route() {
305        let system = ActorSystem::new();
306        let log = Arc::new(Mutex::new(Vec::new()));
307        let l = log.clone();
308        let mut actor = system.spawn("echo", move |msg: String| {
309            l.lock().unwrap().push(msg);
310        });
311
312        system.route("echo", "routed".to_string());
313        actor.drain();
314        assert_eq!(*log.lock().unwrap(), vec!["routed".to_string()]);
315    }
316
317    #[test]
318    fn test_route_unknown_goes_to_dead() {
319        let system = ActorSystem::new();
320        system.route("unknown", "lost".to_string());
321        let dead: Vec<String> = system
322            .drain_dead()
323            .into_iter()
324            .filter_map(|b| b.downcast::<String>().ok().map(|b| *b))
325            .collect();
326        assert_eq!(dead, vec!["lost".to_string()]);
327    }
328
329    #[test]
330    fn test_broadcast() {
331        let system = ActorSystem::new();
332        let log_a = Arc::new(Mutex::new(Vec::new()));
333        let log_b = Arc::new(Mutex::new(Vec::new()));
334        let la = log_a.clone();
335        let lb = log_b.clone();
336        let mut actor_a = system.spawn("a", move |msg: String| {
337            la.lock().unwrap().push(msg);
338        });
339        let mut actor_b = system.spawn("b", move |msg: String| {
340            lb.lock().unwrap().push(msg);
341        });
342
343        system.broadcast("all".to_string());
344        actor_a.drain();
345        actor_b.drain();
346        assert_eq!(*log_a.lock().unwrap(), vec!["all".to_string()]);
347        assert_eq!(*log_b.lock().unwrap(), vec!["all".to_string()]);
348    }
349
350    #[test]
351    fn test_spawn_detached() {
352        let system = ActorSystem::new();
353        let received = Arc::new(Mutex::new(Vec::new()));
354        let recv = received.clone();
355        let actor_ref = system.spawn_detached(
356            "detached",
357            move || Box::new(move |msg: String| recv.lock().unwrap().push(msg)),
358            1,
359        );
360        actor_ref.send("hello".into());
361        actor_ref.send("world".into());
362        std::thread::sleep(Duration::from_millis(50));
363        assert_eq!(received.lock().unwrap().len(), 2);
364    }
365}