Skip to main content

cqlite_core/platform/
threading.rs

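//! Threading utilities: semaphore-bounded execution of blocking CPU and I/O
//! tasks, plus helpers for creating shared async locks.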
2
3use std::sync::Arc;
4use tokio::sync::{Mutex, RwLock, Semaphore};
5
6/// Threading provider
7#[derive(Debug)]
8pub struct ThreadingProvider {
9    /// Thread pool for CPU-intensive tasks
10    cpu_pool: Arc<Semaphore>,
11
12    /// Thread pool for I/O tasks
13    io_pool: Arc<Semaphore>,
14}
15
16impl Default for ThreadingProvider {
17    fn default() -> Self {
18        Self::new()
19    }
20}
21
22impl ThreadingProvider {
23    /// Create a new threading provider
24    pub fn new() -> Self {
25        let cpu_count = num_cpus::get();
26
27        Self {
28            cpu_pool: Arc::new(Semaphore::new(cpu_count)),
29            io_pool: Arc::new(Semaphore::new(cpu_count * 2)),
30        }
31    }
32
33    /// Execute CPU-intensive task
34    pub async fn execute_cpu_task<F, T>(&self, task: F) -> crate::Result<T>
35    where
36        F: FnOnce() -> T + Send + 'static,
37        T: Send + 'static,
38    {
39        let _permit = self
40            .cpu_pool
41            .acquire()
42            .await
43            .map_err(|e| crate::Error::storage(e.to_string()))?;
44
45        let result = tokio::task::spawn_blocking(task)
46            .await
47            .map_err(|e| crate::Error::storage(e.to_string()))?;
48
49        Ok(result)
50    }
51
52    /// Execute I/O task
53    pub async fn execute_io_task<F, T>(&self, task: F) -> crate::Result<T>
54    where
55        F: FnOnce() -> T + Send + 'static,
56        T: Send + 'static,
57    {
58        let _permit = self
59            .io_pool
60            .acquire()
61            .await
62            .map_err(|e| crate::Error::storage(e.to_string()))?;
63
64        let result = tokio::task::spawn_blocking(task)
65            .await
66            .map_err(|e| crate::Error::storage(e.to_string()))?;
67
68        Ok(result)
69    }
70
71    /// Create a new mutex
72    pub fn create_mutex<T>(&self, value: T) -> Arc<Mutex<T>> {
73        Arc::new(Mutex::new(value))
74    }
75
76    /// Create a new read-write lock
77    pub fn create_rwlock<T>(&self, value: T) -> Arc<RwLock<T>> {
78        Arc::new(RwLock::new(value))
79    }
80
81    /// Get CPU count
82    pub fn cpu_count(&self) -> usize {
83        num_cpus::get()
84    }
85}