// sochdb_grpc/blocking_pool.rs
// SPDX-License-Identifier: AGPL-3.0-or-later
// SochDB - LLM-Optimized Embedded Database
// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! # Async Boundary Hardening
//!
//! Provides bounded blocking pools to prevent Tokio executor starvation when
//! the sync-first storage layer interacts with the async gRPC server.
//!
//! ## Problem Statement
//!
//! The codebase commits to sync-first storage (std::fs + std::thread + parking_lot).
//! When gRPC (Tokio/tonic) is exposed, we have a **two-scheduler system**:
//! - Tokio workers: optimized for short non-blocking tasks
//! - Storage: blocking I/O with page faults, fsync stalls, cache misses
//!
//! If blocking storage work runs on Tokio workers → scheduler inversion:
//! - Stalled workers reduce effective concurrency
//! - Tail latency spikes
//! - Potential deadlocks if background tasks depend on the same executor
//!
//! ## Solution: Bounded Blocking Pools
//!
//! Separate pools for different workload patterns:
//! - **Request Pool**: Point reads, small writes (low latency priority)
//! - **Compaction Pool**: Sequential scans, merges (throughput priority)
//! - **Checkpoint Pool**: Snapshot creation, WAL archiving (background)
//!
//! ## Queueing Theory
//!
//! Model as M/M/c: bounded pools prevent `c` from being "stolen" by long tasks.
//! Pool sizing: `blocking_threads ≈ 2×cores` for mixed I/O+CPU, capped to
//! prevent memory blowups (each thread has stack + allocator footprint).

use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

use crossbeam_channel::{bounded, Receiver, Sender, TrySendError};
use parking_lot::{Condvar, Mutex};

/// Pool type for workload isolation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PoolType {
    /// Request path: point reads, small writes
    Request,
    /// Compaction/GC: sequential scans, merges
    Compaction,
    /// Checkpoint/backup: snapshot, WAL archiving
    Checkpoint,
}

impl PoolType {
    /// Recommended worker-thread count for this workload type.
    ///
    /// Sizing heuristic (see module docs): ~2x cores for the mixed I/O+CPU
    /// request path (clamped to [4, 64]), roughly one-per-core for
    /// compaction (clamped to [2, 16]), and a small background allotment
    /// (at least 2) for checkpoints.
    pub fn default_size(&self) -> usize {
        // Stdlib replacement for the `num_cpus` crate (Rust 1.59+).
        // Falls back to 1 if the parallelism level cannot be determined.
        let cores = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
        match self {
            PoolType::Request => (cores * 2).clamp(4, 64),
            PoolType::Compaction => cores.clamp(2, 16),
            PoolType::Checkpoint => 2.max(cores / 4),
        }
    }

    /// Bounded queue depth for this pool before submitters see backpressure.
    pub fn default_queue_depth(&self) -> usize {
        match self {
            PoolType::Request => 1024,
            PoolType::Compaction => 64,
            PoolType::Checkpoint => 16,
        }
    }
}

90/// Configuration for a blocking pool
91#[derive(Debug, Clone)]
92pub struct BlockingPoolConfig {
93    /// Pool type
94    pub pool_type: PoolType,
95    /// Number of worker threads
96    pub num_threads: usize,
97    /// Maximum queue depth before backpressure
98    pub queue_depth: usize,
99    /// Stack size per thread (default 2MB)
100    pub stack_size: usize,
101    /// Thread name prefix
102    pub name_prefix: String,
103}
104
105impl BlockingPoolConfig {
106    /// Create config for request pool
107    pub fn request() -> Self {
108        Self {
109            pool_type: PoolType::Request,
110            num_threads: PoolType::Request.default_size(),
111            queue_depth: PoolType::Request.default_queue_depth(),
112            stack_size: 2 * 1024 * 1024,
113            name_prefix: "sochdb-req".to_string(),
114        }
115    }
116
117    /// Create config for compaction pool
118    pub fn compaction() -> Self {
119        Self {
120            pool_type: PoolType::Compaction,
121            num_threads: PoolType::Compaction.default_size(),
122            queue_depth: PoolType::Compaction.default_queue_depth(),
123            stack_size: 4 * 1024 * 1024, // Larger for compaction
124            name_prefix: "sochdb-compact".to_string(),
125        }
126    }
127
128    /// Create config for checkpoint pool
129    pub fn checkpoint() -> Self {
130        Self {
131            pool_type: PoolType::Checkpoint,
132            num_threads: PoolType::Checkpoint.default_size(),
133            queue_depth: PoolType::Checkpoint.default_queue_depth(),
134            stack_size: 2 * 1024 * 1024,
135            name_prefix: "sochdb-ckpt".to_string(),
136        }
137    }
138}
139
/// A boxed, sendable closure executed once on a pool worker thread.
type BlockingTask = Box<dyn FnOnce() + Send + 'static>;

/// Pool metrics for observability.
///
/// All counters use relaxed atomics: readers see eventually-consistent
/// values, which is sufficient for monitoring dashboards.
#[derive(Debug, Default)]
pub struct PoolMetrics {
    /// Total tasks submitted
    pub tasks_submitted: AtomicU64,
    /// Tasks completed successfully
    pub tasks_completed: AtomicU64,
    /// Tasks rejected due to queue full
    pub tasks_rejected: AtomicU64,
    /// Current queue depth
    pub queue_depth: AtomicUsize,
    /// Active workers
    pub active_workers: AtomicUsize,
    /// Total execution time (microseconds)
    pub total_exec_time_us: AtomicU64,
    /// Maximum execution time seen (microseconds)
    pub max_exec_time_us: AtomicU64,
}

impl PoolMetrics {
    /// Fold one finished task into the completion and latency counters.
    pub fn record_execution(&self, duration: Duration) {
        self.tasks_completed.fetch_add(1, Ordering::Relaxed);
        let micros = duration.as_micros() as u64;
        self.total_exec_time_us.fetch_add(micros, Ordering::Relaxed);
        // fetch_max maintains the running maximum without a lock.
        self.max_exec_time_us.fetch_max(micros, Ordering::Relaxed);
    }

    /// Mean task execution time in microseconds (0 when nothing completed).
    pub fn avg_exec_time_us(&self) -> u64 {
        let done = self.tasks_completed.load(Ordering::Relaxed);
        self.total_exec_time_us
            .load(Ordering::Relaxed)
            .checked_div(done)
            .unwrap_or(0)
    }
}

182/// A bounded blocking thread pool
183pub struct BlockingPool {
184    config: BlockingPoolConfig,
185    sender: Sender<BlockingTask>,
186    metrics: Arc<PoolMetrics>,
187    shutdown: Arc<(Mutex<bool>, Condvar)>,
188}
189
190impl BlockingPool {
191    /// Create a new blocking pool
192    pub fn new(config: BlockingPoolConfig) -> Self {
193        let (sender, receiver) = bounded(config.queue_depth);
194        let metrics = Arc::new(PoolMetrics::default());
195        let shutdown = Arc::new((Mutex::new(false), Condvar::new()));
196
197        // Spawn worker threads
198        for i in 0..config.num_threads {
199            let receiver = receiver.clone();
200            let metrics = metrics.clone();
201            let shutdown = shutdown.clone();
202            let name = format!("{}-{}", config.name_prefix, i);
203
204            thread::Builder::new()
205                .name(name)
206                .stack_size(config.stack_size)
207                .spawn(move || {
208                    Self::worker_loop(receiver, metrics, shutdown);
209                })
210                .expect("Failed to spawn blocking pool worker");
211        }
212
213        Self {
214            config,
215            sender,
216            metrics,
217            shutdown,
218        }
219    }
220
221    /// Worker thread main loop
222    fn worker_loop(
223        receiver: Receiver<BlockingTask>,
224        metrics: Arc<PoolMetrics>,
225        shutdown: Arc<(Mutex<bool>, Condvar)>,
226    ) {
227        loop {
228            // Check shutdown
229            {
230                let (lock, _) = &*shutdown;
231                if *lock.lock() {
232                    break;
233                }
234            }
235
236            match receiver.recv_timeout(Duration::from_millis(100)) {
237                Ok(task) => {
238                    metrics.active_workers.fetch_add(1, Ordering::Relaxed);
239                    let start = Instant::now();
240
241                    // Execute the task
242                    task();
243
244                    let elapsed = start.elapsed();
245                    metrics.record_execution(elapsed);
246                    metrics.active_workers.fetch_sub(1, Ordering::Relaxed);
247                    metrics.queue_depth.fetch_sub(1, Ordering::Relaxed);
248                }
249                Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
250                    // Continue checking for shutdown
251                }
252                Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
253                    break;
254                }
255            }
256        }
257    }
258
259    /// Submit a task to the pool
260    ///
261    /// Returns Err if the queue is full (backpressure)
262    pub fn try_submit<F>(&self, task: F) -> Result<(), BlockingPoolError>
263    where
264        F: FnOnce() + Send + 'static,
265    {
266        self.metrics.tasks_submitted.fetch_add(1, Ordering::Relaxed);
267
268        match self.sender.try_send(Box::new(task)) {
269            Ok(()) => {
270                self.metrics.queue_depth.fetch_add(1, Ordering::Relaxed);
271                Ok(())
272            }
273            Err(TrySendError::Full(_)) => {
274                self.metrics.tasks_rejected.fetch_add(1, Ordering::Relaxed);
275                Err(BlockingPoolError::QueueFull)
276            }
277            Err(TrySendError::Disconnected(_)) => {
278                Err(BlockingPoolError::PoolShutdown)
279            }
280        }
281    }
282
283    /// Submit a task and block until queue has space
284    pub fn submit<F>(&self, task: F) -> Result<(), BlockingPoolError>
285    where
286        F: FnOnce() + Send + 'static,
287    {
288        self.metrics.tasks_submitted.fetch_add(1, Ordering::Relaxed);
289        self.metrics.queue_depth.fetch_add(1, Ordering::Relaxed);
290
291        self.sender
292            .send(Box::new(task))
293            .map_err(|_| BlockingPoolError::PoolShutdown)
294    }
295
296    /// Get pool metrics
297    pub fn metrics(&self) -> &PoolMetrics {
298        &self.metrics
299    }
300
301    /// Get pool type
302    pub fn pool_type(&self) -> PoolType {
303        self.config.pool_type
304    }
305
306    /// Shutdown the pool gracefully
307    pub fn shutdown(&self) {
308        let (lock, cvar) = &*self.shutdown;
309        *lock.lock() = true;
310        cvar.notify_all();
311    }
312}
313
314impl Drop for BlockingPool {
315    fn drop(&mut self) {
316        self.shutdown();
317    }
318}
319
/// Blocking pool error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockingPoolError {
    /// Queue is full, backpressure required
    QueueFull,
    /// Pool has been shut down
    PoolShutdown,
}

impl std::fmt::Display for BlockingPoolError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let msg = match self {
            BlockingPoolError::QueueFull => "Blocking pool queue is full",
            BlockingPoolError::PoolShutdown => "Blocking pool has been shut down",
        };
        f.write_str(msg)
    }
}

impl std::error::Error for BlockingPoolError {}

340/// Manager for multiple blocking pools with workload isolation
341pub struct BlockingPoolManager {
342    request_pool: BlockingPool,
343    compaction_pool: BlockingPool,
344    checkpoint_pool: BlockingPool,
345}
346
347impl BlockingPoolManager {
348    /// Create with default configurations
349    pub fn new() -> Self {
350        Self {
351            request_pool: BlockingPool::new(BlockingPoolConfig::request()),
352            compaction_pool: BlockingPool::new(BlockingPoolConfig::compaction()),
353            checkpoint_pool: BlockingPool::new(BlockingPoolConfig::checkpoint()),
354        }
355    }
356
357    /// Create with custom configurations
358    pub fn with_configs(
359        request_config: BlockingPoolConfig,
360        compaction_config: BlockingPoolConfig,
361        checkpoint_config: BlockingPoolConfig,
362    ) -> Self {
363        Self {
364            request_pool: BlockingPool::new(request_config),
365            compaction_pool: BlockingPool::new(compaction_config),
366            checkpoint_pool: BlockingPool::new(checkpoint_config),
367        }
368    }
369
370    /// Get pool by type
371    pub fn pool(&self, pool_type: PoolType) -> &BlockingPool {
372        match pool_type {
373            PoolType::Request => &self.request_pool,
374            PoolType::Compaction => &self.compaction_pool,
375            PoolType::Checkpoint => &self.checkpoint_pool,
376        }
377    }
378
379    /// Submit to request pool
380    pub fn submit_request<F>(&self, task: F) -> Result<(), BlockingPoolError>
381    where
382        F: FnOnce() + Send + 'static,
383    {
384        self.request_pool.try_submit(task)
385    }
386
387    /// Submit to compaction pool
388    pub fn submit_compaction<F>(&self, task: F) -> Result<(), BlockingPoolError>
389    where
390        F: FnOnce() + Send + 'static,
391    {
392        self.compaction_pool.submit(task)
393    }
394
395    /// Submit to checkpoint pool
396    pub fn submit_checkpoint<F>(&self, task: F) -> Result<(), BlockingPoolError>
397    where
398        F: FnOnce() + Send + 'static,
399    {
400        self.checkpoint_pool.submit(task)
401    }
402
403    /// Get all pool metrics
404    pub fn all_metrics(&self) -> Vec<(PoolType, &PoolMetrics)> {
405        vec![
406            (PoolType::Request, self.request_pool.metrics()),
407            (PoolType::Compaction, self.compaction_pool.metrics()),
408            (PoolType::Checkpoint, self.checkpoint_pool.metrics()),
409        ]
410    }
411
412    /// Shutdown all pools
413    pub fn shutdown(&self) {
414        self.request_pool.shutdown();
415        self.compaction_pool.shutdown();
416        self.checkpoint_pool.shutdown();
417    }
418}
419
420impl Default for BlockingPoolManager {
421    fn default() -> Self {
422        Self::new()
423    }
424}
425
/// Async wrapper for blocking pool operations.
///
/// Bridges the async gRPC layer with the sync storage layer by spawning
/// blocking work on dedicated pools and returning futures.
#[cfg(feature = "async")]
pub mod async_bridge {
    use super::*;
    use tokio::sync::oneshot;

    /// Execute a blocking operation on the specified pool, returning a
    /// future that resolves with the closure's result.
    ///
    /// Note: submission happens eagerly, when this function is CALLED, not
    /// when the returned future is first polled — backpressure (QueueFull)
    /// is therefore decided at call time.
    pub fn spawn_blocking<F, R>(
        pool: &BlockingPool,
        f: F,
    ) -> impl Future<Output = Result<R, BlockingPoolError>>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();

        // Run the closure on the pool and hand its output back through the
        // oneshot; the receiver erroring means the task was never run.
        let submitted = pool.try_submit(move || {
            let _ = tx.send(f());
        });

        async move {
            // Surface a call-time rejection (QueueFull / PoolShutdown) first.
            submitted?;
            rx.await.map_err(|_| BlockingPoolError::PoolShutdown)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;

    /// Poll until `cond` holds or `timeout` elapses. Replaces fixed sleeps,
    /// which were flaky on slow CI machines.
    fn wait_for(timeout: Duration, cond: impl Fn() -> bool) -> bool {
        let deadline = Instant::now() + timeout;
        while Instant::now() < deadline {
            if cond() {
                return true;
            }
            thread::sleep(Duration::from_millis(5));
        }
        cond()
    }

    #[test]
    fn test_pool_basic_execution() {
        let pool = BlockingPool::new(BlockingPoolConfig::request());
        let counter = Arc::new(AtomicUsize::new(0));

        for _ in 0..10 {
            let counter = counter.clone();
            pool.submit(move || {
                counter.fetch_add(1, Ordering::SeqCst);
            })
            .unwrap();
        }

        // Poll for completion instead of assuming 100ms is enough.
        assert!(wait_for(Duration::from_secs(5), || {
            counter.load(Ordering::SeqCst) == 10
        }));
    }

    #[test]
    fn test_pool_backpressure() {
        let config = BlockingPoolConfig {
            pool_type: PoolType::Request,
            num_threads: 1,
            queue_depth: 2,
            stack_size: 2 * 1024 * 1024,
            name_prefix: "test".to_string(),
        };
        let pool = BlockingPool::new(config);

        // Occupy the single worker with a long-running task...
        pool.try_submit(|| thread::sleep(Duration::from_secs(1)))
            .unwrap();
        // ...and give the worker time to dequeue it. (Previously the test
        // queued two sleepers and asserted the third submit failed, but the
        // worker could dequeue one first, freeing a slot — a race.)
        thread::sleep(Duration::from_millis(50));

        // Now fill the queue to capacity while the worker is busy.
        for _ in 0..2 {
            pool.try_submit(|| {}).unwrap();
        }

        // Worker busy + queue full: the next submit must be rejected.
        let result = pool.try_submit(|| {});
        assert!(matches!(result, Err(BlockingPoolError::QueueFull)));
    }

    #[test]
    fn test_pool_metrics() {
        let pool = BlockingPool::new(BlockingPoolConfig::request());

        pool.submit(|| {
            thread::sleep(Duration::from_millis(10));
        })
        .unwrap();

        // Poll until the completion is recorded rather than sleeping blindly.
        assert!(wait_for(Duration::from_secs(5), || {
            pool.metrics().tasks_completed.load(Ordering::Relaxed) >= 1
        }));
        assert!(pool.metrics().total_exec_time_us.load(Ordering::Relaxed) > 0);
    }

    #[test]
    fn test_pool_manager() {
        let manager = BlockingPoolManager::new();

        manager.submit_request(|| {}).unwrap();
        manager.submit_compaction(|| {}).unwrap();
        manager.submit_checkpoint(|| {}).unwrap();

        for (pool_type, metrics) in manager.all_metrics() {
            // Using `pool_type` in the failure message also silences the
            // unused-variable warning the original loop produced.
            assert!(
                metrics.tasks_submitted.load(Ordering::Relaxed) >= 1,
                "no tasks recorded for {:?} pool",
                pool_type
            );
        }
    }
}