sochdb_storage/
actor.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Actor-Based Connection Manager
16//!
17//! From mm.md Task 7.2: Unified Connection Model
18//!
19//! ## Problem
20//!
21//! Current: One global Arc<Mutex<Database>> shared by all threads.
22//! Issue: Lock contention, complex lifetime management, no affinity.
23//!
24//! ## Solution
25//!
26//! Actor model with single-owner database connections:
27//!
28//! ```text
29//! ┌─────────────┐     ┌─────────────┐
30//! │   Client    │────>│   Actor 1   │──┐
31//! └─────────────┘     └─────────────┘  │
32//!                                      │  ┌──────────────────┐
33//! ┌─────────────┐     ┌─────────────┐  ├─>│    Database      │
34//! │   Client    │────>│   Actor 2   │──┤  │  (owned by pool) │
35//! └─────────────┘     └─────────────┘  │  └──────────────────┘
36//!                                      │
37//! ┌─────────────┐     ┌─────────────┐  │
38//! │   Client    │────>│   Actor 3   │──┘
39//! └─────────────┘     └─────────────┘
40//!
41//! Each Actor:
42//! - Owns its connection (no sharing)
43//! - Processes messages sequentially (no locks)
44//! - Has CPU affinity for cache locality
45//! ```
46//!
47//! ## Benefits
48//!
49//! - Zero lock contention within actor
50//! - Predictable latency (no lock wait)
51//! - Cache-friendly (single-threaded access pattern)
52//! - Natural backpressure through message queue
53
54use std::collections::VecDeque;
55use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
56use std::sync::Arc;
57use std::thread::{self, JoinHandle};
58use std::time::{Duration, Instant};
59
60use crossbeam_channel::{bounded, Receiver, Sender};
61use parking_lot::Mutex;
62
63/// Actor ID
64pub type ActorId = u64;
65
66/// Message ID for tracking
67pub type MessageId = u64;
68
69/// Actor message envelope
70pub struct Message<T> {
71    pub id: MessageId,
72    pub payload: T,
73    pub created_at: Instant,
74}
75
76impl<T> Message<T> {
77    pub fn new(id: MessageId, payload: T) -> Self {
78        Self {
79            id,
80            payload,
81            created_at: Instant::now(),
82        }
83    }
84
85    /// Age of the message
86    pub fn age(&self) -> Duration {
87        self.created_at.elapsed()
88    }
89}
90
91/// Response wrapper
92pub struct Response<R> {
93    pub message_id: MessageId,
94    pub result: Result<R, ActorError>,
95    pub processing_time: Duration,
96}
97
98/// Actor error types
99#[derive(Debug)]
100pub enum ActorError {
101    /// Mailbox is full
102    MailboxFull,
103    /// Actor is stopped
104    ActorStopped,
105    /// Handler error
106    HandlerError(String),
107    /// Timeout waiting for response
108    Timeout,
109}
110
111impl std::fmt::Display for ActorError {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        match self {
114            ActorError::MailboxFull => write!(f, "Actor mailbox is full"),
115            ActorError::ActorStopped => write!(f, "Actor has stopped"),
116            ActorError::HandlerError(e) => write!(f, "Handler error: {}", e),
117            ActorError::Timeout => write!(f, "Request timed out"),
118        }
119    }
120}
121
122impl std::error::Error for ActorError {}
123
124/// Handler trait for processing messages
125pub trait Handler<M, R>: Send + Sync {
126    fn handle(&mut self, message: M) -> Result<R, ActorError>;
127}
128
129/// Actor statistics
130#[derive(Debug, Clone, Default)]
131pub struct ActorStats {
132    pub messages_processed: u64,
133    pub messages_pending: usize,
134    pub total_processing_time_us: u64,
135    pub max_processing_time_us: u64,
136    pub avg_wait_time_us: u64,
137}
138
139/// Internal actor state
140struct ActorInner<M, R, H: Handler<M, R>> {
141    #[allow(dead_code)]
142    id: ActorId,
143    handler: H,
144    inbox: Receiver<Message<M>>,
145    running: Arc<AtomicBool>,
146    stats: ActorStats,
147    _phantom: std::marker::PhantomData<R>,
148}
149
150impl<M: Send + 'static, R: Send + 'static, H: Handler<M, R> + 'static> ActorInner<M, R, H> {
151    fn run(mut self, response_tx: Sender<Response<R>>) {
152        while self.running.load(Ordering::Acquire) {
153            match self.inbox.recv_timeout(Duration::from_millis(100)) {
154                Ok(msg) => {
155                    let wait_time = msg.age();
156                    let start = Instant::now();
157
158                    let result = self.handler.handle(msg.payload);
159
160                    let processing_time = start.elapsed();
161
162                    // Update stats
163                    self.stats.messages_processed += 1;
164                    let proc_us = processing_time.as_micros() as u64;
165                    self.stats.total_processing_time_us += proc_us;
166                    if proc_us > self.stats.max_processing_time_us {
167                        self.stats.max_processing_time_us = proc_us;
168                    }
169                    let wait_us = wait_time.as_micros() as u64;
170                    let n = self.stats.messages_processed;
171                    self.stats.avg_wait_time_us =
172                        (self.stats.avg_wait_time_us * (n - 1) + wait_us) / n;
173
174                    let _ = response_tx.send(Response {
175                        message_id: msg.id,
176                        result,
177                        processing_time,
178                    });
179                }
180                Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
181                    // No message, check if still running
182                    continue;
183                }
184                Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
185                    // Sender dropped, stop actor
186                    break;
187                }
188            }
189        }
190    }
191}
192
193/// Actor reference for sending messages
194pub struct ActorRef<M, R> {
195    id: ActorId,
196    inbox: Sender<Message<M>>,
197    responses: Receiver<Response<R>>,
198    next_message_id: AtomicU64,
199    running: Arc<AtomicBool>,
200}
201
202impl<M: Send + 'static, R: Send + 'static> ActorRef<M, R> {
203    /// Send a message and wait for response
204    pub fn ask(&self, message: M) -> Result<R, ActorError> {
205        self.ask_timeout(message, Duration::from_secs(30))
206    }
207
208    /// Send a message with timeout
209    pub fn ask_timeout(&self, message: M, timeout: Duration) -> Result<R, ActorError> {
210        if !self.running.load(Ordering::Acquire) {
211            return Err(ActorError::ActorStopped);
212        }
213
214        let id = self.next_message_id.fetch_add(1, Ordering::SeqCst);
215        let msg = Message::new(id, message);
216
217        self.inbox
218            .send(msg)
219            .map_err(|_| ActorError::ActorStopped)?;
220
221        // Wait for response
222        match self.responses.recv_timeout(timeout) {
223            Ok(resp) => resp.result,
224            Err(_) => Err(ActorError::Timeout),
225        }
226    }
227
228    /// Send a message without waiting (fire-and-forget)
229    pub fn tell(&self, message: M) -> Result<(), ActorError> {
230        if !self.running.load(Ordering::Acquire) {
231            return Err(ActorError::ActorStopped);
232        }
233
234        let id = self.next_message_id.fetch_add(1, Ordering::SeqCst);
235        let msg = Message::new(id, message);
236
237        self.inbox
238            .try_send(msg)
239            .map_err(|e| match e {
240                crossbeam_channel::TrySendError::Full(_) => ActorError::MailboxFull,
241                crossbeam_channel::TrySendError::Disconnected(_) => ActorError::ActorStopped,
242            })
243    }
244
245    /// Check if actor is still running
246    pub fn is_running(&self) -> bool {
247        self.running.load(Ordering::Acquire)
248    }
249
250    /// Get actor ID
251    pub fn id(&self) -> ActorId {
252        self.id
253    }
254
255    /// Stop the actor
256    pub fn stop(&self) {
257        self.running.store(false, Ordering::Release);
258    }
259}
260
261/// Actor spawner
262pub struct Actor;
263
264impl Actor {
265    /// Spawn a new actor with the given handler
266    pub fn spawn<M, R, H>(id: ActorId, handler: H, mailbox_size: usize) -> (ActorRef<M, R>, JoinHandle<()>)
267    where
268        M: Send + 'static,
269        R: Send + 'static,
270        H: Handler<M, R> + 'static,
271    {
272        let (inbox_tx, inbox_rx) = bounded(mailbox_size);
273        let (resp_tx, resp_rx) = bounded(mailbox_size);
274        let running = Arc::new(AtomicBool::new(true));
275
276        let inner = ActorInner {
277            id,
278            handler,
279            inbox: inbox_rx,
280            running: Arc::clone(&running),
281            stats: ActorStats::default(),
282            _phantom: std::marker::PhantomData,
283        };
284
285        let handle = thread::spawn(move || {
286            inner.run(resp_tx);
287        });
288
289        let actor_ref = ActorRef {
290            id,
291            inbox: inbox_tx,
292            responses: resp_rx,
293            next_message_id: AtomicU64::new(1),
294            running,
295        };
296
297        (actor_ref, handle)
298    }
299}
300
301/// Connection pool using actor model
302pub struct ActorPool<M: Send + Clone + 'static, R: Send + 'static> {
303    actors: Vec<ActorRef<M, R>>,
304    handles: Mutex<Vec<JoinHandle<()>>>,
305    next_actor: AtomicUsize,
306    #[allow(dead_code)]
307    next_actor_id: AtomicU64,
308}
309
310impl<M: Send + Clone + 'static, R: Send + 'static> ActorPool<M, R> {
311    /// Create a new pool with the given factory
312    pub fn new<F, H>(size: usize, factory: F, mailbox_size: usize) -> Self
313    where
314        F: Fn() -> H,
315        H: Handler<M, R> + 'static,
316    {
317        let mut actors = Vec::with_capacity(size);
318        let mut handles = Vec::with_capacity(size);
319        let next_id = AtomicU64::new(1);
320
321        for _ in 0..size {
322            let id = next_id.fetch_add(1, Ordering::SeqCst);
323            let handler = factory();
324            let (actor_ref, handle) = Actor::spawn(id, handler, mailbox_size);
325            actors.push(actor_ref);
326            handles.push(handle);
327        }
328
329        Self {
330            actors,
331            handles: Mutex::new(handles),
332            next_actor: AtomicUsize::new(0),
333            next_actor_id: next_id,
334        }
335    }
336
337    /// Send a message using round-robin selection
338    pub fn ask(&self, message: M) -> Result<R, ActorError> {
339        let idx = self.next_actor.fetch_add(1, Ordering::Relaxed) % self.actors.len();
340        self.actors[idx].ask(message)
341    }
342
343    /// Send to a specific actor
344    pub fn ask_actor(&self, actor_idx: usize, message: M) -> Result<R, ActorError> {
345        if actor_idx >= self.actors.len() {
346            return Err(ActorError::HandlerError("Invalid actor index".to_string()));
347        }
348        self.actors[actor_idx].ask(message)
349    }
350
351    /// Broadcast to all actors
352    pub fn broadcast(&self, message: M) -> Vec<Result<R, ActorError>> {
353        self.actors.iter().map(|a| a.ask(message.clone())).collect()
354    }
355
356    /// Get number of actors
357    pub fn size(&self) -> usize {
358        self.actors.len()
359    }
360
361    /// Stop all actors
362    pub fn shutdown(&self) {
363        for actor in &self.actors {
364            actor.stop();
365        }
366
367        // Wait for all actors to finish
368        let mut handles = self.handles.lock();
369        for handle in handles.drain(..) {
370            let _ = handle.join();
371        }
372    }
373}
374
375impl<M: Send + Clone + 'static, R: Send + 'static> Drop for ActorPool<M, R> {
376    fn drop(&mut self) {
377        self.shutdown();
378    }
379}
380
381/// Work-stealing actor pool for better load balancing
382#[allow(dead_code)]
383pub struct WorkStealingPool<M: Send + 'static, R: Send + 'static> {
384    actors: Vec<Arc<ActorRef<M, R>>>,
385    queues: Vec<Arc<Mutex<VecDeque<Message<M>>>>>,
386    handles: Mutex<Vec<JoinHandle<()>>>,
387    running: Arc<AtomicBool>,
388}
389
390/// Affinity hint for actor assignment
391#[derive(Debug, Clone)]
392pub enum AffinityHint {
393    /// Route based on key hash (for locality)
394    KeyBased(u64),
395    /// Use least loaded actor
396    LeastLoaded,
397    /// Round-robin
398    RoundRobin,
399    /// Specific actor
400    Specific(ActorId),
401}
402
403/// Request router for intelligent actor selection
404pub struct RequestRouter<M: Send + Clone + 'static, R: Send + 'static> {
405    pool: Arc<ActorPool<M, R>>,
406    key_to_actor: Mutex<std::collections::HashMap<u64, usize>>,
407}
408
409impl<M: Send + Clone + 'static, R: Send + 'static> RequestRouter<M, R> {
410    pub fn new(pool: Arc<ActorPool<M, R>>) -> Self {
411        Self {
412            pool,
413            key_to_actor: Mutex::new(std::collections::HashMap::new()),
414        }
415    }
416
417    /// Route a request with affinity hint
418    pub fn route(&self, message: M, hint: AffinityHint) -> Result<R, ActorError> {
419        let actor_idx = match hint {
420            AffinityHint::KeyBased(key) => {
421                let mut mapping = self.key_to_actor.lock();
422                *mapping.entry(key).or_insert_with(|| {
423                    (key as usize) % self.pool.size()
424                })
425            }
426            AffinityHint::LeastLoaded => {
427                // Simple round-robin as proxy for least loaded
428                0 // In production, track queue depths
429            }
430            AffinityHint::RoundRobin => {
431                // Let the pool handle round-robin
432                return self.pool.ask(message);
433            }
434            AffinityHint::Specific(id) => {
435                (id as usize) % self.pool.size()
436            }
437        };
438
439        self.pool.ask_actor(actor_idx, message)
440    }
441}
442
443#[cfg(test)]
444mod tests {
445    use super::*;
446
447    struct EchoHandler;
448
449    impl Handler<String, String> for EchoHandler {
450        fn handle(&mut self, message: String) -> Result<String, ActorError> {
451            Ok(format!("Echo: {}", message))
452        }
453    }
454
455    struct CounterHandler {
456        count: u64,
457    }
458
459    impl Handler<(), u64> for CounterHandler {
460        fn handle(&mut self, _: ()) -> Result<u64, ActorError> {
461            self.count += 1;
462            Ok(self.count)
463        }
464    }
465
466    #[test]
467    fn test_actor_spawn() {
468        let (actor, handle) = Actor::spawn(1, EchoHandler, 10);
469
470        let result = actor.ask("Hello".to_string()).unwrap();
471        assert_eq!(result, "Echo: Hello");
472
473        actor.stop();
474        handle.join().unwrap();
475    }
476
477    #[test]
478    fn test_actor_pool() {
479        let pool = ActorPool::new(4, || EchoHandler, 100);
480
481        // Send multiple messages
482        let results: Vec<_> = (0..10)
483            .map(|i| pool.ask(format!("Message {}", i)))
484            .collect();
485
486        for (i, result) in results.into_iter().enumerate() {
487            assert_eq!(result.unwrap(), format!("Echo: Message {}", i));
488        }
489
490        pool.shutdown();
491    }
492
493    #[test]
494    fn test_counter_handler() {
495        let (actor, handle) = Actor::spawn(1, CounterHandler { count: 0 }, 10);
496
497        assert_eq!(actor.ask(()).unwrap(), 1);
498        assert_eq!(actor.ask(()).unwrap(), 2);
499        assert_eq!(actor.ask(()).unwrap(), 3);
500
501        actor.stop();
502        handle.join().unwrap();
503    }
504
505    #[test]
506    fn test_broadcast() {
507        let pool = ActorPool::new(4, || CounterHandler { count: 0 }, 100);
508
509        let results = pool.broadcast(());
510        assert_eq!(results.len(), 4);
511        for result in results {
512            assert_eq!(result.unwrap(), 1);
513        }
514
515        pool.shutdown();
516    }
517
518    #[test]
519    fn test_request_router() {
520        let pool = Arc::new(ActorPool::new(4, || EchoHandler, 100));
521        let router = RequestRouter::new(Arc::clone(&pool));
522
523        // Key-based routing should be consistent
524        let result1 = router.route("Test1".to_string(), AffinityHint::KeyBased(42)).unwrap();
525        let result2 = router.route("Test2".to_string(), AffinityHint::KeyBased(42)).unwrap();
526
527        assert!(result1.starts_with("Echo:"));
528        assert!(result2.starts_with("Echo:"));
529
530        pool.shutdown();
531    }
532}