1use std::collections::VecDeque;
58use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
59use std::sync::Arc;
60use std::thread::{self, JoinHandle};
61use std::time::{Duration, Instant};
62
63use crossbeam_channel::{bounded, Receiver, Sender};
64use parking_lot::Mutex;
65
66pub type ActorId = u64;
68
69pub type MessageId = u64;
71
72pub struct Message<T> {
74 pub id: MessageId,
75 pub payload: T,
76 pub created_at: Instant,
77}
78
79impl<T> Message<T> {
80 pub fn new(id: MessageId, payload: T) -> Self {
81 Self {
82 id,
83 payload,
84 created_at: Instant::now(),
85 }
86 }
87
88 pub fn age(&self) -> Duration {
90 self.created_at.elapsed()
91 }
92}
93
94pub struct Response<R> {
96 pub message_id: MessageId,
97 pub result: Result<R, ActorError>,
98 pub processing_time: Duration,
99}
100
101#[derive(Debug)]
103pub enum ActorError {
104 MailboxFull,
106 ActorStopped,
108 HandlerError(String),
110 Timeout,
112}
113
114impl std::fmt::Display for ActorError {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 match self {
117 ActorError::MailboxFull => write!(f, "Actor mailbox is full"),
118 ActorError::ActorStopped => write!(f, "Actor has stopped"),
119 ActorError::HandlerError(e) => write!(f, "Handler error: {}", e),
120 ActorError::Timeout => write!(f, "Request timed out"),
121 }
122 }
123}
124
125impl std::error::Error for ActorError {}
126
127pub trait Handler<M, R>: Send + Sync {
129 fn handle(&mut self, message: M) -> Result<R, ActorError>;
130}
131
132#[derive(Debug, Clone, Default)]
134pub struct ActorStats {
135 pub messages_processed: u64,
136 pub messages_pending: usize,
137 pub total_processing_time_us: u64,
138 pub max_processing_time_us: u64,
139 pub avg_wait_time_us: u64,
140}
141
142struct ActorInner<M, R, H: Handler<M, R>> {
144 #[allow(dead_code)]
145 id: ActorId,
146 handler: H,
147 inbox: Receiver<Message<M>>,
148 running: Arc<AtomicBool>,
149 stats: ActorStats,
150 _phantom: std::marker::PhantomData<R>,
151}
152
153impl<M: Send + 'static, R: Send + 'static, H: Handler<M, R> + 'static> ActorInner<M, R, H> {
154 fn run(mut self, response_tx: Sender<Response<R>>) {
155 while self.running.load(Ordering::Acquire) {
156 match self.inbox.recv_timeout(Duration::from_millis(100)) {
157 Ok(msg) => {
158 let wait_time = msg.age();
159 let start = Instant::now();
160
161 let result = self.handler.handle(msg.payload);
162
163 let processing_time = start.elapsed();
164
165 self.stats.messages_processed += 1;
167 let proc_us = processing_time.as_micros() as u64;
168 self.stats.total_processing_time_us += proc_us;
169 if proc_us > self.stats.max_processing_time_us {
170 self.stats.max_processing_time_us = proc_us;
171 }
172 let wait_us = wait_time.as_micros() as u64;
173 let n = self.stats.messages_processed;
174 self.stats.avg_wait_time_us =
175 (self.stats.avg_wait_time_us * (n - 1) + wait_us) / n;
176
177 let _ = response_tx.send(Response {
178 message_id: msg.id,
179 result,
180 processing_time,
181 });
182 }
183 Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
184 continue;
186 }
187 Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
188 break;
190 }
191 }
192 }
193 }
194}
195
196pub struct ActorRef<M, R> {
198 id: ActorId,
199 inbox: Sender<Message<M>>,
200 responses: Receiver<Response<R>>,
201 next_message_id: AtomicU64,
202 running: Arc<AtomicBool>,
203}
204
205impl<M: Send + 'static, R: Send + 'static> ActorRef<M, R> {
206 pub fn ask(&self, message: M) -> Result<R, ActorError> {
208 self.ask_timeout(message, Duration::from_secs(30))
209 }
210
211 pub fn ask_timeout(&self, message: M, timeout: Duration) -> Result<R, ActorError> {
213 if !self.running.load(Ordering::Acquire) {
214 return Err(ActorError::ActorStopped);
215 }
216
217 let id = self.next_message_id.fetch_add(1, Ordering::SeqCst);
218 let msg = Message::new(id, message);
219
220 self.inbox
221 .send(msg)
222 .map_err(|_| ActorError::ActorStopped)?;
223
224 match self.responses.recv_timeout(timeout) {
226 Ok(resp) => resp.result,
227 Err(_) => Err(ActorError::Timeout),
228 }
229 }
230
231 pub fn tell(&self, message: M) -> Result<(), ActorError> {
233 if !self.running.load(Ordering::Acquire) {
234 return Err(ActorError::ActorStopped);
235 }
236
237 let id = self.next_message_id.fetch_add(1, Ordering::SeqCst);
238 let msg = Message::new(id, message);
239
240 self.inbox
241 .try_send(msg)
242 .map_err(|e| match e {
243 crossbeam_channel::TrySendError::Full(_) => ActorError::MailboxFull,
244 crossbeam_channel::TrySendError::Disconnected(_) => ActorError::ActorStopped,
245 })
246 }
247
248 pub fn is_running(&self) -> bool {
250 self.running.load(Ordering::Acquire)
251 }
252
253 pub fn id(&self) -> ActorId {
255 self.id
256 }
257
258 pub fn stop(&self) {
260 self.running.store(false, Ordering::Release);
261 }
262}
263
264pub struct Actor;
266
267impl Actor {
268 pub fn spawn<M, R, H>(id: ActorId, handler: H, mailbox_size: usize) -> (ActorRef<M, R>, JoinHandle<()>)
270 where
271 M: Send + 'static,
272 R: Send + 'static,
273 H: Handler<M, R> + 'static,
274 {
275 let (inbox_tx, inbox_rx) = bounded(mailbox_size);
276 let (resp_tx, resp_rx) = bounded(mailbox_size);
277 let running = Arc::new(AtomicBool::new(true));
278
279 let inner = ActorInner {
280 id,
281 handler,
282 inbox: inbox_rx,
283 running: Arc::clone(&running),
284 stats: ActorStats::default(),
285 _phantom: std::marker::PhantomData,
286 };
287
288 let handle = thread::spawn(move || {
289 inner.run(resp_tx);
290 });
291
292 let actor_ref = ActorRef {
293 id,
294 inbox: inbox_tx,
295 responses: resp_rx,
296 next_message_id: AtomicU64::new(1),
297 running,
298 };
299
300 (actor_ref, handle)
301 }
302}
303
304pub struct ActorPool<M: Send + Clone + 'static, R: Send + 'static> {
306 actors: Vec<ActorRef<M, R>>,
307 handles: Mutex<Vec<JoinHandle<()>>>,
308 next_actor: AtomicUsize,
309 #[allow(dead_code)]
310 next_actor_id: AtomicU64,
311}
312
313impl<M: Send + Clone + 'static, R: Send + 'static> ActorPool<M, R> {
314 pub fn new<F, H>(size: usize, factory: F, mailbox_size: usize) -> Self
316 where
317 F: Fn() -> H,
318 H: Handler<M, R> + 'static,
319 {
320 let mut actors = Vec::with_capacity(size);
321 let mut handles = Vec::with_capacity(size);
322 let next_id = AtomicU64::new(1);
323
324 for _ in 0..size {
325 let id = next_id.fetch_add(1, Ordering::SeqCst);
326 let handler = factory();
327 let (actor_ref, handle) = Actor::spawn(id, handler, mailbox_size);
328 actors.push(actor_ref);
329 handles.push(handle);
330 }
331
332 Self {
333 actors,
334 handles: Mutex::new(handles),
335 next_actor: AtomicUsize::new(0),
336 next_actor_id: next_id,
337 }
338 }
339
340 pub fn ask(&self, message: M) -> Result<R, ActorError> {
342 let idx = self.next_actor.fetch_add(1, Ordering::Relaxed) % self.actors.len();
343 self.actors[idx].ask(message)
344 }
345
346 pub fn ask_actor(&self, actor_idx: usize, message: M) -> Result<R, ActorError> {
348 if actor_idx >= self.actors.len() {
349 return Err(ActorError::HandlerError("Invalid actor index".to_string()));
350 }
351 self.actors[actor_idx].ask(message)
352 }
353
354 pub fn broadcast(&self, message: M) -> Vec<Result<R, ActorError>> {
356 self.actors.iter().map(|a| a.ask(message.clone())).collect()
357 }
358
359 pub fn size(&self) -> usize {
361 self.actors.len()
362 }
363
364 pub fn shutdown(&self) {
366 for actor in &self.actors {
367 actor.stop();
368 }
369
370 let mut handles = self.handles.lock();
372 for handle in handles.drain(..) {
373 let _ = handle.join();
374 }
375 }
376}
377
378impl<M: Send + Clone + 'static, R: Send + 'static> Drop for ActorPool<M, R> {
379 fn drop(&mut self) {
380 self.shutdown();
381 }
382}
383
384#[allow(dead_code)]
386pub struct WorkStealingPool<M: Send + 'static, R: Send + 'static> {
387 actors: Vec<Arc<ActorRef<M, R>>>,
388 queues: Vec<Arc<Mutex<VecDeque<Message<M>>>>>,
389 handles: Mutex<Vec<JoinHandle<()>>>,
390 running: Arc<AtomicBool>,
391}
392
393#[derive(Debug, Clone)]
395pub enum AffinityHint {
396 KeyBased(u64),
398 LeastLoaded,
400 RoundRobin,
402 Specific(ActorId),
404}
405
406pub struct RequestRouter<M: Send + Clone + 'static, R: Send + 'static> {
408 pool: Arc<ActorPool<M, R>>,
409 key_to_actor: Mutex<std::collections::HashMap<u64, usize>>,
410}
411
412impl<M: Send + Clone + 'static, R: Send + 'static> RequestRouter<M, R> {
413 pub fn new(pool: Arc<ActorPool<M, R>>) -> Self {
414 Self {
415 pool,
416 key_to_actor: Mutex::new(std::collections::HashMap::new()),
417 }
418 }
419
420 pub fn route(&self, message: M, hint: AffinityHint) -> Result<R, ActorError> {
422 let actor_idx = match hint {
423 AffinityHint::KeyBased(key) => {
424 let mut mapping = self.key_to_actor.lock();
425 *mapping.entry(key).or_insert_with(|| {
426 (key as usize) % self.pool.size()
427 })
428 }
429 AffinityHint::LeastLoaded => {
430 0 }
433 AffinityHint::RoundRobin => {
434 return self.pool.ask(message);
436 }
437 AffinityHint::Specific(id) => {
438 (id as usize) % self.pool.size()
439 }
440 };
441
442 self.pool.ask_actor(actor_idx, message)
443 }
444}
445
446#[cfg(test)]
447mod tests {
448 use super::*;
449
450 struct EchoHandler;
451
452 impl Handler<String, String> for EchoHandler {
453 fn handle(&mut self, message: String) -> Result<String, ActorError> {
454 Ok(format!("Echo: {}", message))
455 }
456 }
457
458 struct CounterHandler {
459 count: u64,
460 }
461
462 impl Handler<(), u64> for CounterHandler {
463 fn handle(&mut self, _: ()) -> Result<u64, ActorError> {
464 self.count += 1;
465 Ok(self.count)
466 }
467 }
468
469 #[test]
470 fn test_actor_spawn() {
471 let (actor, handle) = Actor::spawn(1, EchoHandler, 10);
472
473 let result = actor.ask("Hello".to_string()).unwrap();
474 assert_eq!(result, "Echo: Hello");
475
476 actor.stop();
477 handle.join().unwrap();
478 }
479
480 #[test]
481 fn test_actor_pool() {
482 let pool = ActorPool::new(4, || EchoHandler, 100);
483
484 let results: Vec<_> = (0..10)
486 .map(|i| pool.ask(format!("Message {}", i)))
487 .collect();
488
489 for (i, result) in results.into_iter().enumerate() {
490 assert_eq!(result.unwrap(), format!("Echo: Message {}", i));
491 }
492
493 pool.shutdown();
494 }
495
496 #[test]
497 fn test_counter_handler() {
498 let (actor, handle) = Actor::spawn(1, CounterHandler { count: 0 }, 10);
499
500 assert_eq!(actor.ask(()).unwrap(), 1);
501 assert_eq!(actor.ask(()).unwrap(), 2);
502 assert_eq!(actor.ask(()).unwrap(), 3);
503
504 actor.stop();
505 handle.join().unwrap();
506 }
507
508 #[test]
509 fn test_broadcast() {
510 let pool = ActorPool::new(4, || CounterHandler { count: 0 }, 100);
511
512 let results = pool.broadcast(());
513 assert_eq!(results.len(), 4);
514 for result in results {
515 assert_eq!(result.unwrap(), 1);
516 }
517
518 pool.shutdown();
519 }
520
521 #[test]
522 fn test_request_router() {
523 let pool = Arc::new(ActorPool::new(4, || EchoHandler, 100));
524 let router = RequestRouter::new(Arc::clone(&pool));
525
526 let result1 = router.route("Test1".to_string(), AffinityHint::KeyBased(42)).unwrap();
528 let result2 = router.route("Test2".to_string(), AffinityHint::KeyBased(42)).unwrap();
529
530 assert!(result1.starts_with("Echo:"));
531 assert!(result2.starts_with("Echo:"));
532
533 pool.shutdown();
534 }
535}