// oxirs_core/concurrent/thread_per_core.rs
//! Thread-per-core architecture for optimal CPU utilization
//!
//! This module implements a thread-per-core work-stealing scheduler specifically
//! optimized for RDF triple processing operations. It provides:
//!
//! - **CPU Affinity**: Each worker thread is pinned to a specific CPU core
//! - **Work Stealing**: Idle threads steal work from busy threads
//! - **NUMA Awareness**: Memory allocation considers NUMA topology
//! - **Zero Allocation**: Lock-free work queues with bounded capacity
//!
//! # Architecture
//!
//! ```text
//! ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐
//! │ Core 0  │   │ Core 1  │   │ Core 2  │   │ Core 3  │
//! │ Worker  │   │ Worker  │   │ Worker  │   │ Worker  │
//! └────┬────┘   └────┬────┘   └────┬────┘   └────┬────┘
//!      │             │             │             │
//!      └─────────────┴─────────────┴─────────────┘
//!                    Work Stealing
//! ```
//!
//! # Example
//!
//! ```rust,ignore
//! use oxirs_core::concurrent::thread_per_core::{ThreadPerCore, Task};
//!
//! # fn example() -> Result<(), oxirs_core::OxirsError> {
//! // Create thread-per-core executor
//! let executor = ThreadPerCore::new()?;
//!
//! // Submit work for parallel execution
//! let task = Task::new(|| {
//!     // Process RDF triples
//!     42
//! });
//!
//! let result = executor.submit(task)?;
//! # Ok(())
//! # }
//! ```

use crate::OxirsError;
use crossbeam_deque::{Injector, Stealer, Worker};
use scirs2_core::metrics::{Counter, Timer};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Convenience alias: results produced by this module carry [`OxirsError`].
pub type Result<T> = std::result::Result<T, OxirsError>;

/// Thread-per-core executor.
///
/// Owns one worker thread per configured core plus a shared global injector
/// queue. Tasks submitted via [`ThreadPerCore::submit`] land in the global
/// queue and are picked up by idle workers; when work stealing is enabled,
/// workers also steal from each other's local queues.
pub struct ThreadPerCore {
    /// Worker threads (one per core when affinity is enabled)
    workers: Vec<CoreWorker>,
    /// Global work queue (injector) shared by all workers
    global_queue: Arc<Injector<Task>>,
    /// Set to `false` to signal every worker loop to exit
    running: Arc<AtomicBool>,
    /// Configuration the executor was built with
    config: ThreadPerCoreConfig,
    /// Count of tasks accepted by `submit`/`submit_batch`
    submitted_counter: Counter,
    // NOTE(review): the three metrics below are created but never updated;
    // completion and steal counts are tracked per-worker in `WorkerStats`.
    #[allow(dead_code)]
    completed_counter: Counter,
    #[allow(dead_code)]
    stolen_counter: Counter,
    #[allow(dead_code)]
    execution_timer: Timer,
}

/// Configuration for thread-per-core executor
#[derive(Debug, Clone)]
pub struct ThreadPerCoreConfig {
    /// Number of worker threads (defaults to num_cpus)
    pub num_workers: usize,
    /// Enable CPU affinity (pin threads to cores; Linux only — a no-op on
    /// other platforms)
    pub enable_affinity: bool,
    /// Work queue capacity per worker.
    /// NOTE(review): not currently consulted — local queues are unbounded
    /// `crossbeam_deque::Worker` FIFOs.
    pub queue_capacity: usize,
    /// Enable work stealing (idle workers steal from other workers' queues)
    pub enable_work_stealing: bool,
    /// Steal batch size.
    /// NOTE(review): not currently consulted — workers steal one task at a
    /// time via `Stealer::steal`.
    pub steal_batch_size: usize,
}

89impl Default for ThreadPerCoreConfig {
90    fn default() -> Self {
91        Self {
92            num_workers: num_cpus::get(),
93            enable_affinity: true,
94            queue_capacity: 1024,
95            enable_work_stealing: true,
96            steal_batch_size: 16,
97        }
98    }
99}
100
/// A task to be executed on a core
pub struct Task {
    /// Task function; consumed exactly once by `execute`
    func: Box<dyn FnOnce() + Send + 'static>,
    /// Task ID for tracking (assigned from a process-wide counter)
    id: usize,
}

109impl Task {
110    /// Create a new task
111    pub fn new<F>(f: F) -> Self
112    where
113        F: FnOnce() + Send + 'static,
114    {
115        static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
116        Self {
117            func: Box::new(f),
118            id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
119        }
120    }
121
122    /// Execute the task
123    fn execute(self) {
124        (self.func)();
125    }
126
127    /// Get task ID
128    pub fn id(&self) -> usize {
129        self.id
130    }
131}
132
/// Worker thread bound to a specific core
struct CoreWorker {
    /// Worker ID (corresponds to CPU core)
    #[allow(dead_code)]
    id: usize,
    /// Thread handle (taken and joined during shutdown)
    handle: Option<JoinHandle<()>>,
    /// Local work queue.
    /// NOTE(review): the real queue is moved into the worker thread at
    /// startup; afterwards this field holds an inert placeholder queue.
    local_queue: Worker<Task>,
    /// Stealer for this worker's original queue (used by other workers)
    #[allow(dead_code)]
    stealer: Stealer<Task>,
    /// Per-worker statistics, shared with the worker thread
    stats: Arc<WorkerStats>,
}

/// Per-worker statistics (updated with relaxed atomics, so values are
/// approximate while workers are running)
#[derive(Default)]
struct WorkerStats {
    /// Tasks executed by this worker (from any source queue)
    executed: AtomicUsize,
    /// Tasks stolen from this worker.
    /// NOTE(review): never incremented in the current implementation.
    #[allow(dead_code)]
    stolen_from: AtomicUsize,
    /// Tasks this worker stole from other workers
    stolen_by: AtomicUsize,
    /// Accumulated idle (sleep) time in microseconds
    idle_time_us: AtomicUsize,
}

163impl ThreadPerCore {
164    /// Create a new thread-per-core executor with default configuration
165    pub fn new() -> Result<Self> {
166        Self::with_config(ThreadPerCoreConfig::default())
167    }
168
169    /// Create a thread-per-core executor with custom configuration
170    pub fn with_config(config: ThreadPerCoreConfig) -> Result<Self> {
171        tracing::info!(
172            "Initializing thread-per-core executor with {} workers",
173            config.num_workers
174        );
175
176        let global_queue = Arc::new(Injector::new());
177        let running = Arc::new(AtomicBool::new(true));
178
179        // Create workers with local queues and stealers
180        let mut workers = Vec::with_capacity(config.num_workers);
181        let mut stealers = Vec::new();
182        let mut worker_stats = Vec::new();
183
184        // First, create all local queues and collect stealers
185        for worker_id in 0..config.num_workers {
186            let local_queue = Worker::new_fifo();
187            let stealer = local_queue.stealer();
188            stealers.push(stealer.clone());
189
190            let stats = Arc::new(WorkerStats::default());
191            worker_stats.push(stats.clone());
192
193            let worker = CoreWorker {
194                id: worker_id,
195                handle: None,
196                local_queue,
197                stealer,
198                stats,
199            };
200
201            workers.push(worker);
202        }
203
204        // Start worker threads (move local queues into threads)
205        let stealers_arc = Arc::new(stealers);
206
207        for (worker_id, worker) in workers.iter_mut().enumerate() {
208            // Move the local queue into the thread
209            let local_queue = std::mem::replace(&mut worker.local_queue, Worker::new_fifo());
210            let global_queue = global_queue.clone();
211            let running = running.clone();
212            let stealers = stealers_arc.clone();
213            let stats = worker_stats[worker_id].clone();
214            let enable_affinity = config.enable_affinity;
215            let enable_work_stealing = config.enable_work_stealing;
216
217            let handle = thread::Builder::new()
218                .name(format!("rdf-worker-{}", worker_id))
219                .spawn(move || {
220                    Self::worker_loop(
221                        worker_id,
222                        local_queue,
223                        global_queue,
224                        stealers,
225                        running,
226                        stats,
227                        enable_affinity,
228                        enable_work_stealing,
229                    )
230                })
231                .map_err(|e| {
232                    OxirsError::ConcurrencyError(format!("Failed to spawn worker: {}", e))
233                })?;
234
235            worker.handle = Some(handle);
236        }
237
238        Ok(Self {
239            workers,
240            global_queue,
241            running,
242            config,
243            submitted_counter: Counter::new("threadpool.submitted".to_string()),
244            completed_counter: Counter::new("threadpool.completed".to_string()),
245            stolen_counter: Counter::new("threadpool.stolen".to_string()),
246            execution_timer: Timer::new("threadpool.execution".to_string()),
247        })
248    }
249
250    /// Submit a task for execution
251    pub fn submit(&self, task: Task) -> Result<()> {
252        if !self.running.load(Ordering::Relaxed) {
253            return Err(OxirsError::ConcurrencyError(
254                "Thread pool is shutting down".to_string(),
255            ));
256        }
257
258        // Push to global queue
259        self.global_queue.push(task);
260        self.submitted_counter.add(1);
261
262        Ok(())
263    }
264
265    /// Submit multiple tasks in batch
266    pub fn submit_batch(&self, tasks: Vec<Task>) -> Result<()> {
267        if !self.running.load(Ordering::Relaxed) {
268            return Err(OxirsError::ConcurrencyError(
269                "Thread pool is shutting down".to_string(),
270            ));
271        }
272
273        for task in tasks {
274            self.global_queue.push(task);
275        }
276
277        self.submitted_counter.add(1);
278
279        Ok(())
280    }
281
282    /// Get executor statistics
283    pub fn stats(&self) -> ThreadPerCoreStats {
284        let total_executed: usize = self
285            .workers
286            .iter()
287            .map(|w| w.stats.executed.load(Ordering::Relaxed))
288            .sum();
289
290        let total_stolen: usize = self
291            .workers
292            .iter()
293            .map(|w| w.stats.stolen_by.load(Ordering::Relaxed))
294            .sum();
295
296        let total_idle_us: usize = self
297            .workers
298            .iter()
299            .map(|w| w.stats.idle_time_us.load(Ordering::Relaxed))
300            .sum();
301
302        ThreadPerCoreStats {
303            num_workers: self.config.num_workers,
304            submitted: self.submitted_counter.get(),
305            completed: total_executed as u64,
306            stolen: total_stolen as u64,
307            avg_idle_time_us: total_idle_us as f64 / self.config.num_workers as f64,
308        }
309    }
310
311    /// Worker thread main loop
312    #[allow(clippy::too_many_arguments)]
313    fn worker_loop(
314        worker_id: usize,
315        local_queue: Worker<Task>,
316        global_queue: Arc<Injector<Task>>,
317        stealers: Arc<Vec<Stealer<Task>>>,
318        running: Arc<AtomicBool>,
319        stats: Arc<WorkerStats>,
320        enable_affinity: bool,
321        enable_work_stealing: bool,
322    ) {
323        // Set CPU affinity if enabled
324        if enable_affinity {
325            if let Err(e) = Self::set_cpu_affinity(worker_id) {
326                tracing::warn!("Failed to set CPU affinity for worker {}: {}", worker_id, e);
327            } else {
328                tracing::debug!("Worker {} pinned to core {}", worker_id, worker_id);
329            }
330        }
331
332        while running.load(Ordering::Relaxed) {
333            // Try to get task from local queue
334            if let Some(task) = local_queue.pop() {
335                task.execute();
336                stats.executed.fetch_add(1, Ordering::Relaxed);
337                continue;
338            }
339
340            // Try to steal from global queue
341            match global_queue.steal() {
342                crossbeam_deque::Steal::Success(task) => {
343                    task.execute();
344                    stats.executed.fetch_add(1, Ordering::Relaxed);
345                    continue;
346                }
347                crossbeam_deque::Steal::Empty => {}
348                crossbeam_deque::Steal::Retry => continue,
349            }
350
351            // Try to steal from other workers
352            if enable_work_stealing {
353                let mut found = false;
354                for (i, stealer) in stealers.iter().enumerate() {
355                    if i == worker_id {
356                        continue; // Skip self
357                    }
358
359                    match stealer.steal() {
360                        crossbeam_deque::Steal::Success(task) => {
361                            task.execute();
362                            stats.executed.fetch_add(1, Ordering::Relaxed);
363                            stats.stolen_by.fetch_add(1, Ordering::Relaxed);
364                            found = true;
365                            break;
366                        }
367                        crossbeam_deque::Steal::Empty => {}
368                        crossbeam_deque::Steal::Retry => continue,
369                    }
370                }
371
372                if found {
373                    continue;
374                }
375            }
376
377            // No work found, sleep briefly
378            let idle_start = std::time::Instant::now();
379            thread::sleep(Duration::from_micros(10));
380            let idle_us = idle_start.elapsed().as_micros() as usize;
381            stats.idle_time_us.fetch_add(idle_us, Ordering::Relaxed);
382        }
383
384        tracing::info!("Worker {} shutting down", worker_id);
385    }
386
387    /// Set CPU affinity for current thread
388    #[cfg(target_os = "linux")]
389    fn set_cpu_affinity(core_id: usize) -> Result<()> {
390        use std::mem;
391
392        // Linux-specific CPU affinity setting
393        unsafe {
394            let mut cpu_set: libc::cpu_set_t = mem::zeroed();
395            libc::CPU_SET(core_id, &mut cpu_set);
396
397            if libc::sched_setaffinity(0, mem::size_of::<libc::cpu_set_t>(), &cpu_set) != 0 {
398                return Err(OxirsError::ConcurrencyError(format!(
399                    "Failed to set CPU affinity: {}",
400                    std::io::Error::last_os_error()
401                )));
402            }
403        }
404
405        Ok(())
406    }
407
408    /// Set CPU affinity for current thread (no-op on non-Linux)
409    #[cfg(not(target_os = "linux"))]
410    fn set_cpu_affinity(_core_id: usize) -> Result<()> {
411        // CPU affinity not supported on this platform
412        Ok(())
413    }
414
415    /// Shutdown the thread pool gracefully
416    pub fn shutdown(self) -> Result<()> {
417        tracing::info!("Shutting down thread-per-core executor");
418
419        // Signal workers to stop
420        self.running.store(false, Ordering::Relaxed);
421
422        // Wait for all workers to finish
423        for mut worker in self.workers {
424            if let Some(handle) = worker.handle.take() {
425                handle.join().map_err(|_| {
426                    OxirsError::ConcurrencyError("Worker thread panicked".to_string())
427                })?;
428            }
429        }
430
431        tracing::info!("Thread-per-core executor shut down successfully");
432        Ok(())
433    }
434}
435
impl Default for ThreadPerCore {
    /// Builds an executor with [`ThreadPerCoreConfig::default`].
    ///
    /// # Panics
    /// Panics if a worker thread cannot be spawned; use
    /// [`ThreadPerCore::new`] to handle the error instead.
    fn default() -> Self {
        Self::new().expect("Failed to create ThreadPerCore executor")
    }
}

/// Statistics for thread-per-core executor.
///
/// Snapshot aggregated from per-worker counters; approximate while the
/// executor is running.
#[derive(Debug, Clone)]
pub struct ThreadPerCoreStats {
    /// Number of worker threads
    pub num_workers: usize,
    /// Total tasks submitted
    pub submitted: u64,
    /// Total tasks completed
    pub completed: u64,
    /// Total tasks stolen between workers
    pub stolen: u64,
    /// Average idle time per worker (microseconds)
    pub avg_idle_time_us: f64,
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    /// Executor with a custom worker count can be built and torn down.
    #[test]
    fn test_thread_per_core_creation() -> Result<()> {
        let config = ThreadPerCoreConfig {
            num_workers: 4,
            ..Default::default()
        };

        ThreadPerCore::with_config(config)?.shutdown()
    }

    /// A single submitted task runs exactly once.
    #[test]
    fn test_task_submission() -> Result<()> {
        let executor = ThreadPerCore::new()?;

        let hits = Arc::new(AtomicUsize::new(0));
        let hits_in_task = Arc::clone(&hits);

        executor.submit(Task::new(move || {
            hits_in_task.fetch_add(1, Ordering::Relaxed);
        }))?;

        // Give a worker time to pick the task up
        thread::sleep(Duration::from_millis(100));

        assert_eq!(hits.load(Ordering::Relaxed), 1);

        executor.shutdown()
    }

    /// Every task in a batch submission is executed.
    #[test]
    fn test_batch_submission() -> Result<()> {
        let executor = ThreadPerCore::new()?;

        let done = Arc::new(AtomicUsize::new(0));

        let mut batch = Vec::with_capacity(100);
        for _ in 0..100 {
            let done = Arc::clone(&done);
            batch.push(Task::new(move || {
                done.fetch_add(1, Ordering::Relaxed);
            }));
        }

        executor.submit_batch(batch)?;

        // Give workers time to drain the batch
        thread::sleep(Duration::from_millis(500));

        assert_eq!(done.load(Ordering::Relaxed), 100);

        executor.shutdown()
    }

    /// Submitted/completed counters reflect submitted work.
    #[test]
    fn test_stats() -> Result<()> {
        let executor = ThreadPerCore::new()?;

        for _ in 0..10 {
            executor.submit(Task::new(|| {
                thread::sleep(Duration::from_millis(1));
            }))?;
        }

        // Give workers time to execute
        thread::sleep(Duration::from_millis(100));

        let snapshot = executor.stats();
        assert_eq!(snapshot.submitted, 10);
        assert!(snapshot.completed <= 10);

        executor.shutdown()
    }
}