1use std::collections::VecDeque;
55use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
56use std::sync::Arc;
57use std::thread::{self, JoinHandle};
58use std::time::{Duration, Instant};
59
60use crossbeam_channel::{bounded, Receiver, Sender};
61use parking_lot::Mutex;
62
63pub type ActorId = u64;
65
66pub type MessageId = u64;
68
69pub struct Message<T> {
71 pub id: MessageId,
72 pub payload: T,
73 pub created_at: Instant,
74}
75
76impl<T> Message<T> {
77 pub fn new(id: MessageId, payload: T) -> Self {
78 Self {
79 id,
80 payload,
81 created_at: Instant::now(),
82 }
83 }
84
85 pub fn age(&self) -> Duration {
87 self.created_at.elapsed()
88 }
89}
90
91pub struct Response<R> {
93 pub message_id: MessageId,
94 pub result: Result<R, ActorError>,
95 pub processing_time: Duration,
96}
97
98#[derive(Debug)]
100pub enum ActorError {
101 MailboxFull,
103 ActorStopped,
105 HandlerError(String),
107 Timeout,
109}
110
111impl std::fmt::Display for ActorError {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 match self {
114 ActorError::MailboxFull => write!(f, "Actor mailbox is full"),
115 ActorError::ActorStopped => write!(f, "Actor has stopped"),
116 ActorError::HandlerError(e) => write!(f, "Handler error: {}", e),
117 ActorError::Timeout => write!(f, "Request timed out"),
118 }
119 }
120}
121
122impl std::error::Error for ActorError {}
123
124pub trait Handler<M, R>: Send + Sync {
126 fn handle(&mut self, message: M) -> Result<R, ActorError>;
127}
128
129#[derive(Debug, Clone, Default)]
131pub struct ActorStats {
132 pub messages_processed: u64,
133 pub messages_pending: usize,
134 pub total_processing_time_us: u64,
135 pub max_processing_time_us: u64,
136 pub avg_wait_time_us: u64,
137}
138
139struct ActorInner<M, R, H: Handler<M, R>> {
141 #[allow(dead_code)]
142 id: ActorId,
143 handler: H,
144 inbox: Receiver<Message<M>>,
145 running: Arc<AtomicBool>,
146 stats: ActorStats,
147 _phantom: std::marker::PhantomData<R>,
148}
149
150impl<M: Send + 'static, R: Send + 'static, H: Handler<M, R> + 'static> ActorInner<M, R, H> {
151 fn run(mut self, response_tx: Sender<Response<R>>) {
152 while self.running.load(Ordering::Acquire) {
153 match self.inbox.recv_timeout(Duration::from_millis(100)) {
154 Ok(msg) => {
155 let wait_time = msg.age();
156 let start = Instant::now();
157
158 let result = self.handler.handle(msg.payload);
159
160 let processing_time = start.elapsed();
161
162 self.stats.messages_processed += 1;
164 let proc_us = processing_time.as_micros() as u64;
165 self.stats.total_processing_time_us += proc_us;
166 if proc_us > self.stats.max_processing_time_us {
167 self.stats.max_processing_time_us = proc_us;
168 }
169 let wait_us = wait_time.as_micros() as u64;
170 let n = self.stats.messages_processed;
171 self.stats.avg_wait_time_us =
172 (self.stats.avg_wait_time_us * (n - 1) + wait_us) / n;
173
174 let _ = response_tx.send(Response {
175 message_id: msg.id,
176 result,
177 processing_time,
178 });
179 }
180 Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
181 continue;
183 }
184 Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
185 break;
187 }
188 }
189 }
190 }
191}
192
193pub struct ActorRef<M, R> {
195 id: ActorId,
196 inbox: Sender<Message<M>>,
197 responses: Receiver<Response<R>>,
198 next_message_id: AtomicU64,
199 running: Arc<AtomicBool>,
200}
201
202impl<M: Send + 'static, R: Send + 'static> ActorRef<M, R> {
203 pub fn ask(&self, message: M) -> Result<R, ActorError> {
205 self.ask_timeout(message, Duration::from_secs(30))
206 }
207
208 pub fn ask_timeout(&self, message: M, timeout: Duration) -> Result<R, ActorError> {
210 if !self.running.load(Ordering::Acquire) {
211 return Err(ActorError::ActorStopped);
212 }
213
214 let id = self.next_message_id.fetch_add(1, Ordering::SeqCst);
215 let msg = Message::new(id, message);
216
217 self.inbox
218 .send(msg)
219 .map_err(|_| ActorError::ActorStopped)?;
220
221 match self.responses.recv_timeout(timeout) {
223 Ok(resp) => resp.result,
224 Err(_) => Err(ActorError::Timeout),
225 }
226 }
227
228 pub fn tell(&self, message: M) -> Result<(), ActorError> {
230 if !self.running.load(Ordering::Acquire) {
231 return Err(ActorError::ActorStopped);
232 }
233
234 let id = self.next_message_id.fetch_add(1, Ordering::SeqCst);
235 let msg = Message::new(id, message);
236
237 self.inbox
238 .try_send(msg)
239 .map_err(|e| match e {
240 crossbeam_channel::TrySendError::Full(_) => ActorError::MailboxFull,
241 crossbeam_channel::TrySendError::Disconnected(_) => ActorError::ActorStopped,
242 })
243 }
244
245 pub fn is_running(&self) -> bool {
247 self.running.load(Ordering::Acquire)
248 }
249
250 pub fn id(&self) -> ActorId {
252 self.id
253 }
254
255 pub fn stop(&self) {
257 self.running.store(false, Ordering::Release);
258 }
259}
260
261pub struct Actor;
263
264impl Actor {
265 pub fn spawn<M, R, H>(id: ActorId, handler: H, mailbox_size: usize) -> (ActorRef<M, R>, JoinHandle<()>)
267 where
268 M: Send + 'static,
269 R: Send + 'static,
270 H: Handler<M, R> + 'static,
271 {
272 let (inbox_tx, inbox_rx) = bounded(mailbox_size);
273 let (resp_tx, resp_rx) = bounded(mailbox_size);
274 let running = Arc::new(AtomicBool::new(true));
275
276 let inner = ActorInner {
277 id,
278 handler,
279 inbox: inbox_rx,
280 running: Arc::clone(&running),
281 stats: ActorStats::default(),
282 _phantom: std::marker::PhantomData,
283 };
284
285 let handle = thread::spawn(move || {
286 inner.run(resp_tx);
287 });
288
289 let actor_ref = ActorRef {
290 id,
291 inbox: inbox_tx,
292 responses: resp_rx,
293 next_message_id: AtomicU64::new(1),
294 running,
295 };
296
297 (actor_ref, handle)
298 }
299}
300
301pub struct ActorPool<M: Send + Clone + 'static, R: Send + 'static> {
303 actors: Vec<ActorRef<M, R>>,
304 handles: Mutex<Vec<JoinHandle<()>>>,
305 next_actor: AtomicUsize,
306 #[allow(dead_code)]
307 next_actor_id: AtomicU64,
308}
309
310impl<M: Send + Clone + 'static, R: Send + 'static> ActorPool<M, R> {
311 pub fn new<F, H>(size: usize, factory: F, mailbox_size: usize) -> Self
313 where
314 F: Fn() -> H,
315 H: Handler<M, R> + 'static,
316 {
317 let mut actors = Vec::with_capacity(size);
318 let mut handles = Vec::with_capacity(size);
319 let next_id = AtomicU64::new(1);
320
321 for _ in 0..size {
322 let id = next_id.fetch_add(1, Ordering::SeqCst);
323 let handler = factory();
324 let (actor_ref, handle) = Actor::spawn(id, handler, mailbox_size);
325 actors.push(actor_ref);
326 handles.push(handle);
327 }
328
329 Self {
330 actors,
331 handles: Mutex::new(handles),
332 next_actor: AtomicUsize::new(0),
333 next_actor_id: next_id,
334 }
335 }
336
337 pub fn ask(&self, message: M) -> Result<R, ActorError> {
339 let idx = self.next_actor.fetch_add(1, Ordering::Relaxed) % self.actors.len();
340 self.actors[idx].ask(message)
341 }
342
343 pub fn ask_actor(&self, actor_idx: usize, message: M) -> Result<R, ActorError> {
345 if actor_idx >= self.actors.len() {
346 return Err(ActorError::HandlerError("Invalid actor index".to_string()));
347 }
348 self.actors[actor_idx].ask(message)
349 }
350
351 pub fn broadcast(&self, message: M) -> Vec<Result<R, ActorError>> {
353 self.actors.iter().map(|a| a.ask(message.clone())).collect()
354 }
355
356 pub fn size(&self) -> usize {
358 self.actors.len()
359 }
360
361 pub fn shutdown(&self) {
363 for actor in &self.actors {
364 actor.stop();
365 }
366
367 let mut handles = self.handles.lock();
369 for handle in handles.drain(..) {
370 let _ = handle.join();
371 }
372 }
373}
374
375impl<M: Send + Clone + 'static, R: Send + 'static> Drop for ActorPool<M, R> {
376 fn drop(&mut self) {
377 self.shutdown();
378 }
379}
380
381#[allow(dead_code)]
383pub struct WorkStealingPool<M: Send + 'static, R: Send + 'static> {
384 actors: Vec<Arc<ActorRef<M, R>>>,
385 queues: Vec<Arc<Mutex<VecDeque<Message<M>>>>>,
386 handles: Mutex<Vec<JoinHandle<()>>>,
387 running: Arc<AtomicBool>,
388}
389
390#[derive(Debug, Clone)]
392pub enum AffinityHint {
393 KeyBased(u64),
395 LeastLoaded,
397 RoundRobin,
399 Specific(ActorId),
401}
402
403pub struct RequestRouter<M: Send + Clone + 'static, R: Send + 'static> {
405 pool: Arc<ActorPool<M, R>>,
406 key_to_actor: Mutex<std::collections::HashMap<u64, usize>>,
407}
408
409impl<M: Send + Clone + 'static, R: Send + 'static> RequestRouter<M, R> {
410 pub fn new(pool: Arc<ActorPool<M, R>>) -> Self {
411 Self {
412 pool,
413 key_to_actor: Mutex::new(std::collections::HashMap::new()),
414 }
415 }
416
417 pub fn route(&self, message: M, hint: AffinityHint) -> Result<R, ActorError> {
419 let actor_idx = match hint {
420 AffinityHint::KeyBased(key) => {
421 let mut mapping = self.key_to_actor.lock();
422 *mapping.entry(key).or_insert_with(|| {
423 (key as usize) % self.pool.size()
424 })
425 }
426 AffinityHint::LeastLoaded => {
427 0 }
430 AffinityHint::RoundRobin => {
431 return self.pool.ask(message);
433 }
434 AffinityHint::Specific(id) => {
435 (id as usize) % self.pool.size()
436 }
437 };
438
439 self.pool.ask_actor(actor_idx, message)
440 }
441}
442
443#[cfg(test)]
444mod tests {
445 use super::*;
446
447 struct EchoHandler;
448
449 impl Handler<String, String> for EchoHandler {
450 fn handle(&mut self, message: String) -> Result<String, ActorError> {
451 Ok(format!("Echo: {}", message))
452 }
453 }
454
455 struct CounterHandler {
456 count: u64,
457 }
458
459 impl Handler<(), u64> for CounterHandler {
460 fn handle(&mut self, _: ()) -> Result<u64, ActorError> {
461 self.count += 1;
462 Ok(self.count)
463 }
464 }
465
466 #[test]
467 fn test_actor_spawn() {
468 let (actor, handle) = Actor::spawn(1, EchoHandler, 10);
469
470 let result = actor.ask("Hello".to_string()).unwrap();
471 assert_eq!(result, "Echo: Hello");
472
473 actor.stop();
474 handle.join().unwrap();
475 }
476
477 #[test]
478 fn test_actor_pool() {
479 let pool = ActorPool::new(4, || EchoHandler, 100);
480
481 let results: Vec<_> = (0..10)
483 .map(|i| pool.ask(format!("Message {}", i)))
484 .collect();
485
486 for (i, result) in results.into_iter().enumerate() {
487 assert_eq!(result.unwrap(), format!("Echo: Message {}", i));
488 }
489
490 pool.shutdown();
491 }
492
493 #[test]
494 fn test_counter_handler() {
495 let (actor, handle) = Actor::spawn(1, CounterHandler { count: 0 }, 10);
496
497 assert_eq!(actor.ask(()).unwrap(), 1);
498 assert_eq!(actor.ask(()).unwrap(), 2);
499 assert_eq!(actor.ask(()).unwrap(), 3);
500
501 actor.stop();
502 handle.join().unwrap();
503 }
504
505 #[test]
506 fn test_broadcast() {
507 let pool = ActorPool::new(4, || CounterHandler { count: 0 }, 100);
508
509 let results = pool.broadcast(());
510 assert_eq!(results.len(), 4);
511 for result in results {
512 assert_eq!(result.unwrap(), 1);
513 }
514
515 pool.shutdown();
516 }
517
518 #[test]
519 fn test_request_router() {
520 let pool = Arc::new(ActorPool::new(4, || EchoHandler, 100));
521 let router = RequestRouter::new(Arc::clone(&pool));
522
523 let result1 = router.route("Test1".to_string(), AffinityHint::KeyBased(42)).unwrap();
525 let result2 = router.route("Test2".to_string(), AffinityHint::KeyBased(42)).unwrap();
526
527 assert!(result1.starts_with("Echo:"));
528 assert!(result2.starts_with("Echo:"));
529
530 pool.shutdown();
531 }
532}