1use std::future::Future;
49use std::pin::Pin;
50use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
51use std::sync::Arc;
52use std::thread;
53use std::time::{Duration, Instant};
54
55use crossbeam_channel::{bounded, Receiver, Sender, TrySendError};
56use parking_lot::{Condvar, Mutex};
57
/// Selects one of the workload-specific blocking pools.
///
/// The variant determines the default worker count and queue capacity (see
/// `default_size` and `default_queue_depth`) and is the key used by
/// `BlockingPoolManager::pool` to look a pool up.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PoolType {
    /// The pool that `BlockingPoolManager::submit_request` feeds.
    Request,
    /// The pool that `BlockingPoolManager::submit_compaction` feeds.
    Compaction,
    /// The pool that `BlockingPoolManager::submit_checkpoint` feeds.
    Checkpoint,
}
68
69impl PoolType {
70 pub fn default_size(&self) -> usize {
72 let cores = num_cpus::get();
73 match self {
74 PoolType::Request => (cores * 2).clamp(4, 64),
75 PoolType::Compaction => cores.clamp(2, 16),
76 PoolType::Checkpoint => 2.max(cores / 4),
77 }
78 }
79
80 pub fn default_queue_depth(&self) -> usize {
82 match self {
83 PoolType::Request => 1024,
84 PoolType::Compaction => 64,
85 PoolType::Checkpoint => 16,
86 }
87 }
88}
89
90#[derive(Debug, Clone)]
92pub struct BlockingPoolConfig {
93 pub pool_type: PoolType,
95 pub num_threads: usize,
97 pub queue_depth: usize,
99 pub stack_size: usize,
101 pub name_prefix: String,
103}
104
105impl BlockingPoolConfig {
106 pub fn request() -> Self {
108 Self {
109 pool_type: PoolType::Request,
110 num_threads: PoolType::Request.default_size(),
111 queue_depth: PoolType::Request.default_queue_depth(),
112 stack_size: 2 * 1024 * 1024,
113 name_prefix: "sochdb-req".to_string(),
114 }
115 }
116
117 pub fn compaction() -> Self {
119 Self {
120 pool_type: PoolType::Compaction,
121 num_threads: PoolType::Compaction.default_size(),
122 queue_depth: PoolType::Compaction.default_queue_depth(),
123 stack_size: 4 * 1024 * 1024, name_prefix: "sochdb-compact".to_string(),
125 }
126 }
127
128 pub fn checkpoint() -> Self {
130 Self {
131 pool_type: PoolType::Checkpoint,
132 num_threads: PoolType::Checkpoint.default_size(),
133 queue_depth: PoolType::Checkpoint.default_queue_depth(),
134 stack_size: 2 * 1024 * 1024,
135 name_prefix: "sochdb-ckpt".to_string(),
136 }
137 }
138}
139
/// A type-erased unit of work: a boxed one-shot closure that takes no
/// arguments, returns nothing, and may be moved to another thread.
type BlockingTask = Box<dyn FnOnce() + Send + 'static>;
142
/// Lock-free counters describing one pool's activity.
///
/// All fields are atomics, so the struct can be shared between the submitting
/// side and the worker threads and read at any time; individual reads are not
/// synchronised with each other.
#[derive(Debug, Default)]
pub struct PoolMetrics {
    /// Number of submission attempts, including ones that were later rejected.
    pub tasks_submitted: AtomicU64,
    /// Number of executions recorded through `record_execution`.
    pub tasks_completed: AtomicU64,
    /// Number of submissions turned away because the queue was full.
    pub tasks_rejected: AtomicU64,
    /// Gauge of tasks handed to the pool's queue; raised by the submit paths
    /// and lowered by the worker threads.
    pub queue_depth: AtomicUsize,
    /// Number of workers currently executing a task.
    pub active_workers: AtomicUsize,
    /// Sum of all recorded execution times, in microseconds.
    pub total_exec_time_us: AtomicU64,
    /// Largest single recorded execution time, in microseconds.
    pub max_exec_time_us: AtomicU64,
}
161
162impl PoolMetrics {
163 pub fn record_execution(&self, duration: Duration) {
165 self.tasks_completed.fetch_add(1, Ordering::Relaxed);
166 let us = duration.as_micros() as u64;
167 self.total_exec_time_us.fetch_add(us, Ordering::Relaxed);
168 let _ = self.max_exec_time_us.fetch_max(us, Ordering::Relaxed);
170 }
171
172 pub fn avg_exec_time_us(&self) -> u64 {
174 let completed = self.tasks_completed.load(Ordering::Relaxed);
175 if completed == 0 {
176 return 0;
177 }
178 self.total_exec_time_us.load(Ordering::Relaxed) / completed
179 }
180}
181
182pub struct BlockingPool {
184 config: BlockingPoolConfig,
185 sender: Sender<BlockingTask>,
186 metrics: Arc<PoolMetrics>,
187 shutdown: Arc<(Mutex<bool>, Condvar)>,
188}
189
190impl BlockingPool {
191 pub fn new(config: BlockingPoolConfig) -> Self {
193 let (sender, receiver) = bounded(config.queue_depth);
194 let metrics = Arc::new(PoolMetrics::default());
195 let shutdown = Arc::new((Mutex::new(false), Condvar::new()));
196
197 for i in 0..config.num_threads {
199 let receiver = receiver.clone();
200 let metrics = metrics.clone();
201 let shutdown = shutdown.clone();
202 let name = format!("{}-{}", config.name_prefix, i);
203
204 thread::Builder::new()
205 .name(name)
206 .stack_size(config.stack_size)
207 .spawn(move || {
208 Self::worker_loop(receiver, metrics, shutdown);
209 })
210 .expect("Failed to spawn blocking pool worker");
211 }
212
213 Self {
214 config,
215 sender,
216 metrics,
217 shutdown,
218 }
219 }
220
221 fn worker_loop(
223 receiver: Receiver<BlockingTask>,
224 metrics: Arc<PoolMetrics>,
225 shutdown: Arc<(Mutex<bool>, Condvar)>,
226 ) {
227 loop {
228 {
230 let (lock, _) = &*shutdown;
231 if *lock.lock() {
232 break;
233 }
234 }
235
236 match receiver.recv_timeout(Duration::from_millis(100)) {
237 Ok(task) => {
238 metrics.active_workers.fetch_add(1, Ordering::Relaxed);
239 let start = Instant::now();
240
241 task();
243
244 let elapsed = start.elapsed();
245 metrics.record_execution(elapsed);
246 metrics.active_workers.fetch_sub(1, Ordering::Relaxed);
247 metrics.queue_depth.fetch_sub(1, Ordering::Relaxed);
248 }
249 Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
250 }
252 Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
253 break;
254 }
255 }
256 }
257 }
258
259 pub fn try_submit<F>(&self, task: F) -> Result<(), BlockingPoolError>
263 where
264 F: FnOnce() + Send + 'static,
265 {
266 self.metrics.tasks_submitted.fetch_add(1, Ordering::Relaxed);
267
268 match self.sender.try_send(Box::new(task)) {
269 Ok(()) => {
270 self.metrics.queue_depth.fetch_add(1, Ordering::Relaxed);
271 Ok(())
272 }
273 Err(TrySendError::Full(_)) => {
274 self.metrics.tasks_rejected.fetch_add(1, Ordering::Relaxed);
275 Err(BlockingPoolError::QueueFull)
276 }
277 Err(TrySendError::Disconnected(_)) => {
278 Err(BlockingPoolError::PoolShutdown)
279 }
280 }
281 }
282
283 pub fn submit<F>(&self, task: F) -> Result<(), BlockingPoolError>
285 where
286 F: FnOnce() + Send + 'static,
287 {
288 self.metrics.tasks_submitted.fetch_add(1, Ordering::Relaxed);
289 self.metrics.queue_depth.fetch_add(1, Ordering::Relaxed);
290
291 self.sender
292 .send(Box::new(task))
293 .map_err(|_| BlockingPoolError::PoolShutdown)
294 }
295
296 pub fn metrics(&self) -> &PoolMetrics {
298 &self.metrics
299 }
300
301 pub fn pool_type(&self) -> PoolType {
303 self.config.pool_type
304 }
305
306 pub fn shutdown(&self) {
308 let (lock, cvar) = &*self.shutdown;
309 *lock.lock() = true;
310 cvar.notify_all();
311 }
312}
313
314impl Drop for BlockingPool {
315 fn drop(&mut self) {
316 self.shutdown();
317 }
318}
319
/// Reasons a task submission can fail.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockingPoolError {
    /// The bounded task queue had no free slot for a non-blocking submission.
    QueueFull,
    /// The pool can no longer accept or finish work (its task channel is
    /// disconnected, or a result channel was dropped before a value arrived).
    PoolShutdown,
}
328
329impl std::fmt::Display for BlockingPoolError {
330 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
331 match self {
332 BlockingPoolError::QueueFull => write!(f, "Blocking pool queue is full"),
333 BlockingPoolError::PoolShutdown => write!(f, "Blocking pool has been shut down"),
334 }
335 }
336}
337
338impl std::error::Error for BlockingPoolError {}
339
340pub struct BlockingPoolManager {
342 request_pool: BlockingPool,
343 compaction_pool: BlockingPool,
344 checkpoint_pool: BlockingPool,
345}
346
347impl BlockingPoolManager {
348 pub fn new() -> Self {
350 Self {
351 request_pool: BlockingPool::new(BlockingPoolConfig::request()),
352 compaction_pool: BlockingPool::new(BlockingPoolConfig::compaction()),
353 checkpoint_pool: BlockingPool::new(BlockingPoolConfig::checkpoint()),
354 }
355 }
356
357 pub fn with_configs(
359 request_config: BlockingPoolConfig,
360 compaction_config: BlockingPoolConfig,
361 checkpoint_config: BlockingPoolConfig,
362 ) -> Self {
363 Self {
364 request_pool: BlockingPool::new(request_config),
365 compaction_pool: BlockingPool::new(compaction_config),
366 checkpoint_pool: BlockingPool::new(checkpoint_config),
367 }
368 }
369
370 pub fn pool(&self, pool_type: PoolType) -> &BlockingPool {
372 match pool_type {
373 PoolType::Request => &self.request_pool,
374 PoolType::Compaction => &self.compaction_pool,
375 PoolType::Checkpoint => &self.checkpoint_pool,
376 }
377 }
378
379 pub fn submit_request<F>(&self, task: F) -> Result<(), BlockingPoolError>
381 where
382 F: FnOnce() + Send + 'static,
383 {
384 self.request_pool.try_submit(task)
385 }
386
387 pub fn submit_compaction<F>(&self, task: F) -> Result<(), BlockingPoolError>
389 where
390 F: FnOnce() + Send + 'static,
391 {
392 self.compaction_pool.submit(task)
393 }
394
395 pub fn submit_checkpoint<F>(&self, task: F) -> Result<(), BlockingPoolError>
397 where
398 F: FnOnce() + Send + 'static,
399 {
400 self.checkpoint_pool.submit(task)
401 }
402
403 pub fn all_metrics(&self) -> Vec<(PoolType, &PoolMetrics)> {
405 vec![
406 (PoolType::Request, self.request_pool.metrics()),
407 (PoolType::Compaction, self.compaction_pool.metrics()),
408 (PoolType::Checkpoint, self.checkpoint_pool.metrics()),
409 ]
410 }
411
412 pub fn shutdown(&self) {
414 self.request_pool.shutdown();
415 self.compaction_pool.shutdown();
416 self.checkpoint_pool.shutdown();
417 }
418}
419
420impl Default for BlockingPoolManager {
421 fn default() -> Self {
422 Self::new()
423 }
424}
425
/// Adapters for awaiting pool work from async code (enabled by the `async`
/// feature).
#[cfg(feature = "async")]
pub mod async_bridge {
    use super::*;
    use tokio::sync::oneshot;

    /// Runs `f` on `pool` and returns a future that resolves to its result.
    ///
    /// The task is submitted with `try_submit` immediately, when this function
    /// is called, not when the returned future is first polled.
    ///
    /// The future resolves to:
    /// * `Ok(value)` once `f` has run and returned `value`;
    /// * the submission error if `try_submit` rejected the task;
    /// * `Err(BlockingPoolError::PoolShutdown)` if the result channel was
    ///   closed without a value being sent.
    pub fn spawn_blocking<F, R>(
        pool: &BlockingPool,
        f: F,
    ) -> impl Future<Output = Result<R, BlockingPoolError>>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (reply_tx, reply_rx) = oneshot::channel();

        let submitted = pool.try_submit(move || {
            // The awaiting side may already be gone; a failed send is ignored.
            let _ = reply_tx.send(f());
        });

        async move {
            match submitted {
                Ok(()) => reply_rx
                    .await
                    .map_err(|_| BlockingPoolError::PoolShutdown),
                Err(rejection) => Err(rejection),
            }
        }
    }
}
457
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::mpsc;

    /// Upper bound on how long a test waits for background work. Generous so
    /// loaded CI machines do not produce spurious failures.
    const WAIT_LIMIT: Duration = Duration::from_secs(5);

    /// Polls `condition` until it holds or `WAIT_LIMIT` elapses; returns
    /// whether it held. Replaces fixed sleeps, which are both slower and flaky.
    fn wait_until(mut condition: impl FnMut() -> bool) -> bool {
        let deadline = Instant::now() + WAIT_LIMIT;
        while !condition() {
            if Instant::now() >= deadline {
                return false;
            }
            thread::sleep(Duration::from_millis(1));
        }
        true
    }

    #[test]
    fn test_pool_basic_execution() {
        let pool = BlockingPool::new(BlockingPoolConfig::request());
        let counter = Arc::new(AtomicUsize::new(0));

        for _ in 0..10 {
            let counter = counter.clone();
            pool.submit(move || {
                counter.fetch_add(1, Ordering::SeqCst);
            })
            .unwrap();
        }

        assert!(
            wait_until(|| counter.load(Ordering::SeqCst) == 10),
            "only {} of 10 tasks ran",
            counter.load(Ordering::SeqCst)
        );
    }

    #[test]
    fn test_pool_backpressure() {
        let config = BlockingPoolConfig {
            pool_type: PoolType::Request,
            num_threads: 1,
            queue_depth: 2,
            stack_size: 2 * 1024 * 1024,
            name_prefix: "test".to_string(),
        };
        let pool = BlockingPool::new(config);

        let (started_tx, started_rx) = mpsc::channel::<()>();
        let (release_tx, release_rx) = mpsc::channel::<()>();

        // Pin the only worker inside a task we control. Without this the
        // worker may or may not have dequeued a task by the time the queue is
        // probed, which makes the "queue is full" assertion a race.
        pool.try_submit(move || {
            let _ = started_tx.send(());
            let _ = release_rx.recv();
        })
        .unwrap();
        started_rx
            .recv_timeout(WAIT_LIMIT)
            .expect("worker never picked up the blocking task");

        // The worker is busy, so these two occupy both queue slots.
        for _ in 0..2 {
            pool.try_submit(|| {}).unwrap();
        }

        // No slot left: the next non-blocking submission must be rejected.
        let result = pool.try_submit(|| {});
        assert!(matches!(result, Err(BlockingPoolError::QueueFull)));
        assert_eq!(pool.metrics().tasks_rejected.load(Ordering::Relaxed), 1);

        // Let the worker go so the thread does not linger.
        let _ = release_tx.send(());
    }

    #[test]
    fn test_pool_metrics() {
        let pool = BlockingPool::new(BlockingPoolConfig::request());

        pool.submit(|| {
            thread::sleep(Duration::from_millis(10));
        })
        .unwrap();

        // The two counters are updated separately, so wait for both.
        let metrics = pool.metrics();
        assert!(wait_until(|| {
            metrics.tasks_completed.load(Ordering::Relaxed) >= 1
                && metrics.total_exec_time_us.load(Ordering::Relaxed) > 0
        }));
    }

    #[test]
    fn test_pool_manager() {
        let manager = BlockingPoolManager::new();

        manager.submit_request(|| {}).unwrap();
        manager.submit_compaction(|| {}).unwrap();
        manager.submit_checkpoint(|| {}).unwrap();

        // `tasks_submitted` is bumped synchronously by the submit call, so no
        // waiting is required before checking it.
        for (pool_type, metrics) in manager.all_metrics() {
            assert!(
                metrics.tasks_submitted.load(Ordering::Relaxed) >= 1,
                "{:?} pool recorded no submissions",
                pool_type
            );
        }
    }
}