1use crate::OxirsError;
44use crossbeam_deque::{Injector, Stealer, Worker};
45use scirs2_core::metrics::{Counter, Timer};
46use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
47use std::sync::Arc;
48use std::thread::{self, JoinHandle};
49use std::time::Duration;
50
/// Crate-local result alias: all fallible APIs in this module return [`OxirsError`].
pub type Result<T> = std::result::Result<T, OxirsError>;
53
/// Thread-per-core task executor: one long-lived OS worker thread per
/// configured core, each draining a local deque with optional work
/// stealing from peers and a shared global injector.
pub struct ThreadPerCore {
    /// One entry per spawned worker (join handle, queue handles, stats).
    workers: Vec<CoreWorker>,
    /// Shared MPMC injector; `submit`/`submit_batch` push here and idle
    /// workers pull from it in `worker_loop`.
    global_queue: Arc<Injector<Task>>,
    /// Cleared by `shutdown` to make every worker loop exit.
    running: Arc<AtomicBool>,
    /// Configuration captured at construction time.
    config: ThreadPerCoreConfig,
    /// Counts tasks accepted via `submit`/`submit_batch`.
    submitted_counter: Counter,
    // The three metrics below are created but not yet updated anywhere
    // in this file; completion/steal totals come from WorkerStats instead.
    #[allow(dead_code)]
    completed_counter: Counter,
    #[allow(dead_code)]
    stolen_counter: Counter,
    #[allow(dead_code)]
    execution_timer: Timer,
}
73
/// Tunables for [`ThreadPerCore`].
#[derive(Debug, Clone)]
pub struct ThreadPerCoreConfig {
    /// Number of worker threads to spawn.
    pub num_workers: usize,
    /// Pin each worker to the core matching its index (Linux only;
    /// silently ignored elsewhere).
    pub enable_affinity: bool,
    /// Intended per-worker queue capacity.
    /// NOTE(review): not currently consulted anywhere in this file.
    pub queue_capacity: usize,
    /// Allow idle workers to steal from peers' local queues.
    pub enable_work_stealing: bool,
    /// Intended number of tasks to steal per attempt.
    /// NOTE(review): stealing currently takes one task at a time; this
    /// knob is not consulted in this file.
    pub steal_batch_size: usize,
}
88
impl Default for ThreadPerCoreConfig {
    /// Defaults: one worker per logical CPU, affinity and work stealing
    /// enabled, queue capacity 1024, steal batch size 16.
    fn default() -> Self {
        Self {
            num_workers: num_cpus::get(),
            enable_affinity: true,
            queue_capacity: 1024,
            enable_work_stealing: true,
            steal_batch_size: 16,
        }
    }
}
100
/// A unit of work: a boxed `FnOnce` closure plus a process-unique id.
pub struct Task {
    /// The closure to run; consumed by `execute`.
    func: Box<dyn FnOnce() + Send + 'static>,
    /// Monotonically assigned identifier (see `Task::new`).
    id: usize,
}
108
109impl Task {
110 pub fn new<F>(f: F) -> Self
112 where
113 F: FnOnce() + Send + 'static,
114 {
115 static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
116 Self {
117 func: Box::new(f),
118 id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
119 }
120 }
121
122 fn execute(self) {
124 (self.func)();
125 }
126
127 pub fn id(&self) -> usize {
129 self.id
130 }
131}
132
/// Book-keeping for one spawned worker thread.
struct CoreWorker {
    /// Worker index (also the core it may be pinned to).
    #[allow(dead_code)]
    id: usize,
    /// Join handle for the spawned thread; `None` until the thread is
    /// spawned and after `shutdown` takes it for joining.
    handle: Option<JoinHandle<()>>,
    /// Local deque handle. NOTE(review): after spawning, the real queue
    /// is moved into the thread and this field holds a fresh, unused
    /// replacement (see `with_config`).
    local_queue: Worker<Task>,
    /// Stealer for this worker's *original* local queue; peers steal
    /// through the clones collected in `with_config`.
    #[allow(dead_code)]
    stealer: Stealer<Task>,
    /// Shared per-worker counters updated from the worker loop.
    stats: Arc<WorkerStats>,
}
148
/// Per-worker counters, updated with relaxed atomics from `worker_loop`
/// and aggregated by `ThreadPerCore::stats`.
#[derive(Default)]
struct WorkerStats {
    /// Tasks this worker has executed, from any source.
    executed: AtomicUsize,
    /// Declared but never updated anywhere in this file.
    #[allow(dead_code)]
    stolen_from: AtomicUsize,
    /// Successful steals this worker performed against peers.
    stolen_by: AtomicUsize,
    /// Cumulative time spent sleeping with no work, in microseconds.
    idle_time_us: AtomicUsize,
}
162
163impl ThreadPerCore {
    /// Build an executor using [`ThreadPerCoreConfig::default`]
    /// (one worker per logical CPU).
    ///
    /// # Errors
    /// Propagates any thread-spawn failure from [`Self::with_config`].
    pub fn new() -> Result<Self> {
        Self::with_config(ThreadPerCoreConfig::default())
    }
168
    /// Build an executor with an explicit configuration, spawning
    /// `config.num_workers` worker threads up front. Each thread runs
    /// `worker_loop` until `shutdown` clears the shared `running` flag.
    ///
    /// # Errors
    /// Returns `OxirsError::ConcurrencyError` if an OS thread cannot be
    /// spawned.
    pub fn with_config(config: ThreadPerCoreConfig) -> Result<Self> {
        tracing::info!(
            "Initializing thread-per-core executor with {} workers",
            config.num_workers
        );

        let global_queue = Arc::new(Injector::new());
        let running = Arc::new(AtomicBool::new(true));

        // Pass 1: create every worker's local deque first and collect all
        // stealer handles, so each thread can later steal from every peer.
        let mut workers = Vec::with_capacity(config.num_workers);
        let mut stealers = Vec::new();
        let mut worker_stats = Vec::new();

        for worker_id in 0..config.num_workers {
            let local_queue = Worker::new_fifo();
            let stealer = local_queue.stealer();
            stealers.push(stealer.clone());

            let stats = Arc::new(WorkerStats::default());
            worker_stats.push(stats.clone());

            let worker = CoreWorker {
                id: worker_id,
                handle: None,
                local_queue,
                stealer,
                stats,
            };

            workers.push(worker);
        }

        let stealers_arc = Arc::new(stealers);

        // Pass 2: spawn the threads. The real local queue is moved into
        // its thread via mem::replace; the CoreWorker keeps a fresh dummy
        // queue in its place. The previously-cloned stealers still point
        // at the *original* queue, so peer stealing works as intended.
        for (worker_id, worker) in workers.iter_mut().enumerate() {
            let local_queue = std::mem::replace(&mut worker.local_queue, Worker::new_fifo());
            let global_queue = global_queue.clone();
            let running = running.clone();
            let stealers = stealers_arc.clone();
            let stats = worker_stats[worker_id].clone();
            let enable_affinity = config.enable_affinity;
            let enable_work_stealing = config.enable_work_stealing;

            let handle = thread::Builder::new()
                .name(format!("rdf-worker-{}", worker_id))
                .spawn(move || {
                    Self::worker_loop(
                        worker_id,
                        local_queue,
                        global_queue,
                        stealers,
                        running,
                        stats,
                        enable_affinity,
                        enable_work_stealing,
                    )
                })
                .map_err(|e| {
                    OxirsError::ConcurrencyError(format!("Failed to spawn worker: {}", e))
                })?;

            worker.handle = Some(handle);
        }

        Ok(Self {
            workers,
            global_queue,
            running,
            config,
            submitted_counter: Counter::new("threadpool.submitted".to_string()),
            completed_counter: Counter::new("threadpool.completed".to_string()),
            stolen_counter: Counter::new("threadpool.stolen".to_string()),
            execution_timer: Timer::new("threadpool.execution".to_string()),
        })
    }
249
250 pub fn submit(&self, task: Task) -> Result<()> {
252 if !self.running.load(Ordering::Relaxed) {
253 return Err(OxirsError::ConcurrencyError(
254 "Thread pool is shutting down".to_string(),
255 ));
256 }
257
258 self.global_queue.push(task);
260 self.submitted_counter.add(1);
261
262 Ok(())
263 }
264
265 pub fn submit_batch(&self, tasks: Vec<Task>) -> Result<()> {
267 if !self.running.load(Ordering::Relaxed) {
268 return Err(OxirsError::ConcurrencyError(
269 "Thread pool is shutting down".to_string(),
270 ));
271 }
272
273 for task in tasks {
274 self.global_queue.push(task);
275 }
276
277 self.submitted_counter.add(1);
278
279 Ok(())
280 }
281
282 pub fn stats(&self) -> ThreadPerCoreStats {
284 let total_executed: usize = self
285 .workers
286 .iter()
287 .map(|w| w.stats.executed.load(Ordering::Relaxed))
288 .sum();
289
290 let total_stolen: usize = self
291 .workers
292 .iter()
293 .map(|w| w.stats.stolen_by.load(Ordering::Relaxed))
294 .sum();
295
296 let total_idle_us: usize = self
297 .workers
298 .iter()
299 .map(|w| w.stats.idle_time_us.load(Ordering::Relaxed))
300 .sum();
301
302 ThreadPerCoreStats {
303 num_workers: self.config.num_workers,
304 submitted: self.submitted_counter.get(),
305 completed: total_executed as u64,
306 stolen: total_stolen as u64,
307 avg_idle_time_us: total_idle_us as f64 / self.config.num_workers as f64,
308 }
309 }
310
    /// Run loop executed by each worker thread.
    ///
    /// Scheduling order per iteration:
    ///   1. pop from this thread's own local queue,
    ///   2. steal from the shared global injector,
    ///   3. (if enabled) try to steal one task from each peer in turn,
    ///   4. otherwise sleep ~10µs and record the idle time.
    ///
    /// Exits when `running` is cleared by `shutdown`.
    #[allow(clippy::too_many_arguments)]
    fn worker_loop(
        worker_id: usize,
        local_queue: Worker<Task>,
        global_queue: Arc<Injector<Task>>,
        stealers: Arc<Vec<Stealer<Task>>>,
        running: Arc<AtomicBool>,
        stats: Arc<WorkerStats>,
        enable_affinity: bool,
        enable_work_stealing: bool,
    ) {
        // Best effort: pin this thread to the core matching its id.
        if enable_affinity {
            if let Err(e) = Self::set_cpu_affinity(worker_id) {
                tracing::warn!("Failed to set CPU affinity for worker {}: {}", worker_id, e);
            } else {
                tracing::debug!("Worker {} pinned to core {}", worker_id, worker_id);
            }
        }

        while running.load(Ordering::Relaxed) {
            // 1. Local queue first: cheapest, no cross-thread contention.
            if let Some(task) = local_queue.pop() {
                task.execute();
                stats.executed.fetch_add(1, Ordering::Relaxed);
                continue;
            }

            // 2. Shared global injector.
            match global_queue.steal() {
                crossbeam_deque::Steal::Success(task) => {
                    task.execute();
                    stats.executed.fetch_add(1, Ordering::Relaxed);
                    continue;
                }
                crossbeam_deque::Steal::Empty => {}
                // Lost a race with another consumer: restart the iteration.
                crossbeam_deque::Steal::Retry => continue,
            }

            // 3. Work stealing: one attempt against each peer, stopping
            // at the first success.
            if enable_work_stealing {
                let mut found = false;
                for (i, stealer) in stealers.iter().enumerate() {
                    // Never steal from our own queue.
                    if i == worker_id {
                        continue; }

                    match stealer.steal() {
                        crossbeam_deque::Steal::Success(task) => {
                            task.execute();
                            stats.executed.fetch_add(1, Ordering::Relaxed);
                            stats.stolen_by.fetch_add(1, Ordering::Relaxed);
                            found = true;
                            break;
                        }
                        crossbeam_deque::Steal::Empty => {}
                        // NOTE(review): this `continue` advances to the
                        // *next* peer rather than retrying the same one —
                        // looks like a deliberate best-effort choice, but
                        // confirm it is intended.
                        crossbeam_deque::Steal::Retry => continue,
                    }
                }

                if found {
                    continue;
                }
            }

            // 4. Nothing anywhere: back off briefly and track idle time.
            let idle_start = std::time::Instant::now();
            thread::sleep(Duration::from_micros(10));
            let idle_us = idle_start.elapsed().as_micros() as usize;
            stats.idle_time_us.fetch_add(idle_us, Ordering::Relaxed);
        }

        tracing::info!("Worker {} shutting down", worker_id);
    }
386
    /// Pin the *calling* thread to `core_id` (Linux only).
    ///
    /// # Errors
    /// Returns `OxirsError::ConcurrencyError` wrapping the OS error when
    /// `sched_setaffinity` fails (e.g. `core_id` is not a valid CPU).
    #[cfg(target_os = "linux")]
    fn set_cpu_affinity(core_id: usize) -> Result<()> {
        use std::mem;

        // SAFETY: `cpu_set_t` is a plain bitmask, so the all-zero value
        // from `mem::zeroed` is a valid empty set (equivalent to
        // CPU_ZERO). Passing pid 0 to sched_setaffinity targets the
        // calling thread.
        unsafe {
            let mut cpu_set: libc::cpu_set_t = mem::zeroed();
            libc::CPU_SET(core_id, &mut cpu_set);

            if libc::sched_setaffinity(0, mem::size_of::<libc::cpu_set_t>(), &cpu_set) != 0 {
                return Err(OxirsError::ConcurrencyError(format!(
                    "Failed to set CPU affinity: {}",
                    std::io::Error::last_os_error()
                )));
            }
        }

        Ok(())
    }
407
    /// No-op on non-Linux targets: CPU affinity is silently skipped.
    #[cfg(not(target_os = "linux"))]
    fn set_cpu_affinity(_core_id: usize) -> Result<()> {
        Ok(())
    }
414
415 pub fn shutdown(self) -> Result<()> {
417 tracing::info!("Shutting down thread-per-core executor");
418
419 self.running.store(false, Ordering::Relaxed);
421
422 for mut worker in self.workers {
424 if let Some(handle) = worker.handle.take() {
425 handle.join().map_err(|_| {
426 OxirsError::ConcurrencyError("Worker thread panicked".to_string())
427 })?;
428 }
429 }
430
431 tracing::info!("Thread-per-core executor shut down successfully");
432 Ok(())
433 }
434}
435
impl Default for ThreadPerCore {
    /// Equivalent to [`ThreadPerCore::new`].
    ///
    /// # Panics
    /// Panics if a worker thread cannot be spawned, since `Default`
    /// cannot return an error.
    fn default() -> Self {
        Self::new().expect("Failed to create ThreadPerCore executor")
    }
}
441
/// Point-in-time snapshot of executor activity, produced by
/// [`ThreadPerCore::stats`].
#[derive(Debug, Clone)]
pub struct ThreadPerCoreStats {
    /// Number of worker threads in the pool.
    pub num_workers: usize,
    /// Tasks accepted via `submit`/`submit_batch`.
    pub submitted: u64,
    /// Tasks fully executed across all workers.
    pub completed: u64,
    /// Tasks taken from a peer's queue via work stealing.
    pub stolen: u64,
    /// Mean cumulative idle time per worker, in microseconds.
    pub avg_idle_time_us: f64,
}
456
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;
    use std::time::Instant;

    /// Poll `cond` until it holds or `timeout` elapses; returns whether
    /// it ever held. Bounded polling replaces the previous fixed
    /// `thread::sleep` waits, which raced the workers and made the
    /// exact-count assertions flaky under load.
    fn wait_until(timeout: Duration, cond: impl Fn() -> bool) -> bool {
        let deadline = Instant::now() + timeout;
        while Instant::now() < deadline {
            if cond() {
                return true;
            }
            thread::sleep(Duration::from_millis(1));
        }
        cond()
    }

    #[test]
    fn test_thread_per_core_creation() -> Result<()> {
        let config = ThreadPerCoreConfig {
            num_workers: 4,
            ..Default::default()
        };

        let executor = ThreadPerCore::with_config(config)?;
        executor.shutdown()?;

        Ok(())
    }

    #[test]
    fn test_task_submission() -> Result<()> {
        let executor = ThreadPerCore::new()?;

        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = counter.clone();

        let task = Task::new(move || {
            counter_clone.fetch_add(1, Ordering::Relaxed);
        });

        executor.submit(task)?;

        // Wait (bounded) for a worker to run the task instead of hoping
        // a fixed sleep was long enough.
        assert!(
            wait_until(Duration::from_secs(5), || counter.load(Ordering::Relaxed) == 1),
            "task was not executed within the timeout"
        );

        executor.shutdown()?;
        Ok(())
    }

    #[test]
    fn test_batch_submission() -> Result<()> {
        let executor = ThreadPerCore::new()?;

        let counter = Arc::new(AtomicUsize::new(0));

        let tasks: Vec<_> = (0..100)
            .map(|_| {
                let counter = counter.clone();
                Task::new(move || {
                    counter.fetch_add(1, Ordering::Relaxed);
                })
            })
            .collect();

        executor.submit_batch(tasks)?;

        assert!(
            wait_until(Duration::from_secs(5), || counter.load(Ordering::Relaxed) == 100),
            "batch was not fully executed within the timeout"
        );

        executor.shutdown()?;
        Ok(())
    }

    #[test]
    fn test_stats() -> Result<()> {
        let executor = ThreadPerCore::new()?;

        for _ in 0..10 {
            let task = Task::new(|| {
                thread::sleep(Duration::from_millis(1));
            });
            executor.submit(task)?;
        }

        // Previously asserted `completed <= 10` right after a sleep,
        // which is vacuously true; wait for full completion instead.
        assert!(
            wait_until(Duration::from_secs(5), || executor.stats().completed == 10),
            "not all tasks completed within the timeout"
        );

        let stats = executor.stats();
        assert_eq!(stats.submitted, 10);
        assert_eq!(stats.completed, 10);

        executor.shutdown()?;
        Ok(())
    }
}