// File: sochdb_storage/actor.rs
// SPDX-License-Identifier: AGPL-3.0-or-later
// SochDB - LLM-Optimized Embedded Database
// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Actor-Based Connection Manager
//!
//! From mm.md Task 7.2: Unified Connection Model
//!
//! ## Problem
//!
//! Current: one global `Arc<Mutex<Database>>` shared by all threads.
//! Issues: lock contention, complex lifetime management, no affinity.
//!
//! ## Solution
//!
//! Actor model with single-owner database connections:
//!
//! ```text
//! ┌─────────────┐     ┌─────────────┐
//! │   Client    │────>│   Actor 1   │──┐
//! └─────────────┘     └─────────────┘  │
//!                                      │  ┌──────────────────┐
//! ┌─────────────┐     ┌─────────────┐  ├─>│    Database      │
//! │   Client    │────>│   Actor 2   │──┤  │  (owned by pool) │
//! └─────────────┘     └─────────────┘  │  └──────────────────┘
//!                                      │
//! ┌─────────────┐     ┌─────────────┐  │
//! │   Client    │────>│   Actor 3   │──┘
//! └─────────────┘     └─────────────┘
//!
//! Each Actor:
//! - Owns its connection (no sharing)
//! - Processes messages sequentially (no locks)
//! - Is intended to have CPU affinity for cache locality
//!   (design goal; this module does not currently pin threads to cores)
//! ```
//!
//! ## Benefits
//!
//! - Zero lock contention within an actor
//! - Predictable latency (no lock wait)
//! - Cache-friendly (single-threaded access pattern)
//! - Natural backpressure through the bounded message queue

use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

use crossbeam_channel::{Receiver, Sender, bounded};
use parking_lot::Mutex;

/// Identifier given to each actor when it is spawned.
pub type ActorId = u64;

/// Identifier attached to each message so it can be tracked.
pub type MessageId = u64;

72/// Actor message envelope
73pub struct Message<T> {
74    pub id: MessageId,
75    pub payload: T,
76    pub created_at: Instant,
77}
78
79impl<T> Message<T> {
80    pub fn new(id: MessageId, payload: T) -> Self {
81        Self {
82            id,
83            payload,
84            created_at: Instant::now(),
85        }
86    }
87
88    /// Age of the message
89    pub fn age(&self) -> Duration {
90        self.created_at.elapsed()
91    }
92}
93
94/// Response wrapper
95pub struct Response<R> {
96    pub message_id: MessageId,
97    pub result: Result<R, ActorError>,
98    pub processing_time: Duration,
99}
100
/// Errors reported by actors, actor handles and pools.
#[derive(Debug)]
pub enum ActorError {
    /// The actor's mailbox had no free slot for the message.
    MailboxFull,
    /// The actor is not running.
    ActorStopped,
    /// A handler (or routing) failure, carrying a human-readable description.
    HandlerError(String),
    /// No reply arrived within the allotted time.
    Timeout,
}

impl std::fmt::Display for ActorError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Fixed messages share one write path; only `HandlerError` needs formatting.
        let text = match self {
            Self::MailboxFull => "Actor mailbox is full",
            Self::ActorStopped => "Actor has stopped",
            Self::Timeout => "Request timed out",
            Self::HandlerError(detail) => return write!(f, "Handler error: {}", detail),
        };
        f.write_str(text)
    }
}

impl std::error::Error for ActorError {}

127/// Handler trait for processing messages
128pub trait Handler<M, R>: Send + Sync {
129    fn handle(&mut self, message: M) -> Result<R, ActorError>;
130}
131
/// Counters describing an actor's message-processing history.
#[derive(Debug, Clone, Default)]
pub struct ActorStats {
    /// Number of messages the handler has processed.
    pub messages_processed: u64,
    /// Messages waiting in the mailbox.
    /// NOTE(review): nothing in this file updates this field; it stays 0.
    pub messages_pending: usize,
    /// Sum of handler processing times, in microseconds.
    pub total_processing_time_us: u64,
    /// Longest single handler processing time, in microseconds.
    pub max_processing_time_us: u64,
    /// Mean time messages waited in the mailbox before being handled, in microseconds.
    pub avg_wait_time_us: u64,
}

142/// Internal actor state
143struct ActorInner<M, R, H: Handler<M, R>> {
144    #[allow(dead_code)]
145    id: ActorId,
146    handler: H,
147    inbox: Receiver<Message<M>>,
148    running: Arc<AtomicBool>,
149    stats: ActorStats,
150    _phantom: std::marker::PhantomData<R>,
151}
152
153impl<M: Send + 'static, R: Send + 'static, H: Handler<M, R> + 'static> ActorInner<M, R, H> {
154    fn run(mut self, response_tx: Sender<Response<R>>) {
155        while self.running.load(Ordering::Acquire) {
156            match self.inbox.recv_timeout(Duration::from_millis(100)) {
157                Ok(msg) => {
158                    let wait_time = msg.age();
159                    let start = Instant::now();
160
161                    let result = self.handler.handle(msg.payload);
162
163                    let processing_time = start.elapsed();
164
165                    // Update stats
166                    self.stats.messages_processed += 1;
167                    let proc_us = processing_time.as_micros() as u64;
168                    self.stats.total_processing_time_us += proc_us;
169                    if proc_us > self.stats.max_processing_time_us {
170                        self.stats.max_processing_time_us = proc_us;
171                    }
172                    let wait_us = wait_time.as_micros() as u64;
173                    let n = self.stats.messages_processed;
174                    self.stats.avg_wait_time_us =
175                        (self.stats.avg_wait_time_us * (n - 1) + wait_us) / n;
176
177                    let _ = response_tx.send(Response {
178                        message_id: msg.id,
179                        result,
180                        processing_time,
181                    });
182                }
183                Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
184                    // No message, check if still running
185                    continue;
186                }
187                Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
188                    // Sender dropped, stop actor
189                    break;
190                }
191            }
192        }
193    }
194}
195
196/// Actor reference for sending messages
197pub struct ActorRef<M, R> {
198    id: ActorId,
199    inbox: Sender<Message<M>>,
200    responses: Receiver<Response<R>>,
201    next_message_id: AtomicU64,
202    running: Arc<AtomicBool>,
203}
204
205impl<M: Send + 'static, R: Send + 'static> ActorRef<M, R> {
206    /// Send a message and wait for response
207    pub fn ask(&self, message: M) -> Result<R, ActorError> {
208        self.ask_timeout(message, Duration::from_secs(30))
209    }
210
211    /// Send a message with timeout
212    pub fn ask_timeout(&self, message: M, timeout: Duration) -> Result<R, ActorError> {
213        if !self.running.load(Ordering::Acquire) {
214            return Err(ActorError::ActorStopped);
215        }
216
217        let id = self.next_message_id.fetch_add(1, Ordering::SeqCst);
218        let msg = Message::new(id, message);
219
220        self.inbox.send(msg).map_err(|_| ActorError::ActorStopped)?;
221
222        // Wait for response
223        match self.responses.recv_timeout(timeout) {
224            Ok(resp) => resp.result,
225            Err(_) => Err(ActorError::Timeout),
226        }
227    }
228
229    /// Send a message without waiting (fire-and-forget)
230    pub fn tell(&self, message: M) -> Result<(), ActorError> {
231        if !self.running.load(Ordering::Acquire) {
232            return Err(ActorError::ActorStopped);
233        }
234
235        let id = self.next_message_id.fetch_add(1, Ordering::SeqCst);
236        let msg = Message::new(id, message);
237
238        self.inbox.try_send(msg).map_err(|e| match e {
239            crossbeam_channel::TrySendError::Full(_) => ActorError::MailboxFull,
240            crossbeam_channel::TrySendError::Disconnected(_) => ActorError::ActorStopped,
241        })
242    }
243
244    /// Check if actor is still running
245    pub fn is_running(&self) -> bool {
246        self.running.load(Ordering::Acquire)
247    }
248
249    /// Get actor ID
250    pub fn id(&self) -> ActorId {
251        self.id
252    }
253
254    /// Stop the actor
255    pub fn stop(&self) {
256        self.running.store(false, Ordering::Release);
257    }
258}
259
260/// Actor spawner
261pub struct Actor;
262
263impl Actor {
264    /// Spawn a new actor with the given handler
265    pub fn spawn<M, R, H>(
266        id: ActorId,
267        handler: H,
268        mailbox_size: usize,
269    ) -> (ActorRef<M, R>, JoinHandle<()>)
270    where
271        M: Send + 'static,
272        R: Send + 'static,
273        H: Handler<M, R> + 'static,
274    {
275        let (inbox_tx, inbox_rx) = bounded(mailbox_size);
276        let (resp_tx, resp_rx) = bounded(mailbox_size);
277        let running = Arc::new(AtomicBool::new(true));
278
279        let inner = ActorInner {
280            id,
281            handler,
282            inbox: inbox_rx,
283            running: Arc::clone(&running),
284            stats: ActorStats::default(),
285            _phantom: std::marker::PhantomData,
286        };
287
288        let handle = thread::spawn(move || {
289            inner.run(resp_tx);
290        });
291
292        let actor_ref = ActorRef {
293            id,
294            inbox: inbox_tx,
295            responses: resp_rx,
296            next_message_id: AtomicU64::new(1),
297            running,
298        };
299
300        (actor_ref, handle)
301    }
302}
303
304/// Connection pool using actor model
305pub struct ActorPool<M: Send + Clone + 'static, R: Send + 'static> {
306    actors: Vec<ActorRef<M, R>>,
307    handles: Mutex<Vec<JoinHandle<()>>>,
308    next_actor: AtomicUsize,
309    #[allow(dead_code)]
310    next_actor_id: AtomicU64,
311}
312
313impl<M: Send + Clone + 'static, R: Send + 'static> ActorPool<M, R> {
314    /// Create a new pool with the given factory
315    pub fn new<F, H>(size: usize, factory: F, mailbox_size: usize) -> Self
316    where
317        F: Fn() -> H,
318        H: Handler<M, R> + 'static,
319    {
320        let mut actors = Vec::with_capacity(size);
321        let mut handles = Vec::with_capacity(size);
322        let next_id = AtomicU64::new(1);
323
324        for _ in 0..size {
325            let id = next_id.fetch_add(1, Ordering::SeqCst);
326            let handler = factory();
327            let (actor_ref, handle) = Actor::spawn(id, handler, mailbox_size);
328            actors.push(actor_ref);
329            handles.push(handle);
330        }
331
332        Self {
333            actors,
334            handles: Mutex::new(handles),
335            next_actor: AtomicUsize::new(0),
336            next_actor_id: next_id,
337        }
338    }
339
340    /// Send a message using round-robin selection
341    pub fn ask(&self, message: M) -> Result<R, ActorError> {
342        let idx = self.next_actor.fetch_add(1, Ordering::Relaxed) % self.actors.len();
343        self.actors[idx].ask(message)
344    }
345
346    /// Send to a specific actor
347    pub fn ask_actor(&self, actor_idx: usize, message: M) -> Result<R, ActorError> {
348        if actor_idx >= self.actors.len() {
349            return Err(ActorError::HandlerError("Invalid actor index".to_string()));
350        }
351        self.actors[actor_idx].ask(message)
352    }
353
354    /// Broadcast to all actors
355    pub fn broadcast(&self, message: M) -> Vec<Result<R, ActorError>> {
356        self.actors.iter().map(|a| a.ask(message.clone())).collect()
357    }
358
359    /// Get number of actors
360    pub fn size(&self) -> usize {
361        self.actors.len()
362    }
363
364    /// Stop all actors
365    pub fn shutdown(&self) {
366        for actor in &self.actors {
367            actor.stop();
368        }
369
370        // Wait for all actors to finish
371        let mut handles = self.handles.lock();
372        for handle in handles.drain(..) {
373            let _ = handle.join();
374        }
375    }
376}
377
378impl<M: Send + Clone + 'static, R: Send + 'static> Drop for ActorPool<M, R> {
379    fn drop(&mut self) {
380        self.shutdown();
381    }
382}
383
384/// Work-stealing actor pool for better load balancing
385#[allow(dead_code)]
386pub struct WorkStealingPool<M: Send + 'static, R: Send + 'static> {
387    actors: Vec<Arc<ActorRef<M, R>>>,
388    queues: Vec<Arc<Mutex<VecDeque<Message<M>>>>>,
389    handles: Mutex<Vec<JoinHandle<()>>>,
390    running: Arc<AtomicBool>,
391}
392
393/// Affinity hint for actor assignment
394#[derive(Debug, Clone)]
395pub enum AffinityHint {
396    /// Route based on key hash (for locality)
397    KeyBased(u64),
398    /// Use least loaded actor
399    LeastLoaded,
400    /// Round-robin
401    RoundRobin,
402    /// Specific actor
403    Specific(ActorId),
404}
405
406/// Request router for intelligent actor selection
407pub struct RequestRouter<M: Send + Clone + 'static, R: Send + 'static> {
408    pool: Arc<ActorPool<M, R>>,
409    key_to_actor: Mutex<std::collections::HashMap<u64, usize>>,
410}
411
412impl<M: Send + Clone + 'static, R: Send + 'static> RequestRouter<M, R> {
413    pub fn new(pool: Arc<ActorPool<M, R>>) -> Self {
414        Self {
415            pool,
416            key_to_actor: Mutex::new(std::collections::HashMap::new()),
417        }
418    }
419
420    /// Route a request with affinity hint
421    pub fn route(&self, message: M, hint: AffinityHint) -> Result<R, ActorError> {
422        let actor_idx = match hint {
423            AffinityHint::KeyBased(key) => {
424                let mut mapping = self.key_to_actor.lock();
425                *mapping
426                    .entry(key)
427                    .or_insert_with(|| (key as usize) % self.pool.size())
428            }
429            AffinityHint::LeastLoaded => {
430                // Simple round-robin as proxy for least loaded
431                0 // In production, track queue depths
432            }
433            AffinityHint::RoundRobin => {
434                // Let the pool handle round-robin
435                return self.pool.ask(message);
436            }
437            AffinityHint::Specific(id) => (id as usize) % self.pool.size(),
438        };
439
440        self.pool.ask_actor(actor_idx, message)
441    }
442}
443
#[cfg(test)]
mod tests {
    use super::*;

    /// Replies with the incoming string prefixed by "Echo: ".
    struct EchoHandler;

    impl Handler<String, String> for EchoHandler {
        fn handle(&mut self, message: String) -> Result<String, ActorError> {
            Ok(format!("Echo: {}", message))
        }
    }

    /// Replies with the number of messages it has handled so far.
    struct CounterHandler {
        count: u64,
    }

    impl Handler<(), u64> for CounterHandler {
        fn handle(&mut self, _: ()) -> Result<u64, ActorError> {
            self.count += 1;
            Ok(self.count)
        }
    }

    #[test]
    fn test_actor_spawn() {
        let (actor, handle) = Actor::spawn(1, EchoHandler, 10);

        let result = actor.ask("Hello".to_string()).unwrap();
        assert_eq!(result, "Echo: Hello");

        actor.stop();
        handle.join().unwrap();
    }

    #[test]
    fn test_actor_pool() {
        let pool = ActorPool::new(4, || EchoHandler, 100);

        let results: Vec<_> = (0..10)
            .map(|i| pool.ask(format!("Message {}", i)))
            .collect();

        for (i, result) in results.into_iter().enumerate() {
            assert_eq!(result.unwrap(), format!("Echo: Message {}", i));
        }

        pool.shutdown();
    }

    #[test]
    fn test_counter_handler() {
        let (actor, handle) = Actor::spawn(1, CounterHandler { count: 0 }, 10);

        assert_eq!(actor.ask(()).unwrap(), 1);
        assert_eq!(actor.ask(()).unwrap(), 2);
        assert_eq!(actor.ask(()).unwrap(), 3);

        actor.stop();
        handle.join().unwrap();
    }

    #[test]
    fn test_broadcast() {
        let pool = ActorPool::new(4, || CounterHandler { count: 0 }, 100);

        let results = pool.broadcast(());
        assert_eq!(results.len(), 4);
        for result in results {
            assert_eq!(result.unwrap(), 1);
        }

        pool.shutdown();
    }

    #[test]
    fn test_request_router() {
        let pool = Arc::new(ActorPool::new(4, || EchoHandler, 100));
        let router = RequestRouter::new(Arc::clone(&pool));

        let result1 = router
            .route("Test1".to_string(), AffinityHint::KeyBased(42))
            .unwrap();
        let result2 = router
            .route("Test2".to_string(), AffinityHint::KeyBased(42))
            .unwrap();

        // Each request must get its own reply, not a neighbour's.
        assert_eq!(result1, "Echo: Test1");
        assert_eq!(result2, "Echo: Test2");

        pool.shutdown();
    }

    #[test]
    fn test_key_based_routing_is_sticky() {
        // Each actor counts only its own messages. Two requests with the same
        // key therefore yield 1 then 2 only if they reached the same actor.
        let pool: Arc<ActorPool<(), u64>> =
            Arc::new(ActorPool::new(4, || CounterHandler { count: 0 }, 100));
        let router = RequestRouter::new(Arc::clone(&pool));

        assert_eq!(router.route((), AffinityHint::KeyBased(42)).unwrap(), 1);
        assert_eq!(router.route((), AffinityHint::KeyBased(42)).unwrap(), 2);
        // Key 43 maps to a different actor, which starts its own count.
        assert_eq!(router.route((), AffinityHint::KeyBased(43)).unwrap(), 1);

        pool.shutdown();
    }
}