jlizard_simple_threadpool/threadpool.rs

use crate::common::Job;
use crate::worker::Worker;
use std::error::Error;

#[cfg(feature = "log")]
use log::debug;

use std::fmt::{Display, Formatter};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use std::time::Duration;

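/// A thread pool that dispatches boxed jobs to a fixed set of workers over an
/// mpsc channel, tracks queued/in-flight jobs against a `max_jobs` limit, and
/// exposes a shared kill flag for cooperative early shutdown.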
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<Sender<Job>>,
    num_threads: u8,
    kill_signal: Arc<AtomicBool>,
    max_jobs: usize,
    job_count: Arc<AtomicUsize>,
}

impl ThreadPool {
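    /// Creates a pool with `pool_size` worker threads.
    ///
    /// A size of 0 falls back to `Default` (sized from the available
    /// parallelism); a size of 1 creates a single-threaded pool that runs
    /// jobs inline on the calling thread, with no workers or channel.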
    pub fn new(pool_size: u8) -> Self {
        if pool_size == 0 {
            Self::default()
        } else if pool_size == 1 {
            Self {
                workers: Vec::new(),
                sender: None,
                num_threads: pool_size,
                kill_signal: Arc::new(AtomicBool::new(false)),
                max_jobs: 50_000,
                job_count: Arc::new(AtomicUsize::new(0)),
            }
        } else {
            let (sender, receiver) = mpsc::channel::<Job>();

            let mut workers = Vec::with_capacity(pool_size as usize);

            let receiver = Arc::new(Mutex::new(receiver));
            let kill_signal = Arc::new(AtomicBool::new(false));
            let job_count = Arc::new(AtomicUsize::new(0));

            for id in 1..=pool_size {
                workers.push(Worker::new(
                    id,
                    Arc::clone(&receiver),
                    Arc::clone(&kill_signal),
                    Arc::clone(&job_count),
                ));
            }

            Self {
                workers,
                sender: Some(sender),
                num_threads: pool_size,
                kill_signal,
                max_jobs: 50_000,
                job_count,
            }
        }
    }

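    /// Builder-style setter for the maximum number of queued/in-flight jobs
    /// that `execute` will accept before applying backpressure.
    ///
    /// # Panics
    /// Panics if `limit` is 0.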
    pub fn max_jobs(mut self, limit: usize) -> Self {
        if limit == 0 {
            panic!("max_jobs must be > 0");
        }
        self.max_jobs = limit;
        self
    }

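    /// Submits a job to the pool.
    ///
    /// In single-threaded mode the closure runs immediately on the calling
    /// thread. Otherwise the call waits with capped exponential backoff while
    /// `job_count` is at `max_jobs`, then sends the job down the channel,
    /// returning an error if the channel is closed.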
    pub fn execute<F>(&self, f: F) -> Result<(), Box<dyn Error>>
    where
        F: FnOnce() + Send + 'static,
    {
        if self.is_single_threaded() {
            f();
            Ok(())
        } else {
            let mut backoff_ms = 1u64;
            const MAX_BACKOFF_MS: u64 = 50;

            loop {
                let current = self.job_count.load(Ordering::Relaxed);

                if current < self.max_jobs {
                    // The load/fetch_add pair is not atomic as a whole, so the
                    // limit is best-effort: concurrent submitters may briefly
                    // overshoot `max_jobs` by a few jobs.
                    self.job_count.fetch_add(1, Ordering::Relaxed);

                    match self.sender.as_ref().unwrap().send(Box::new(f)) {
                        Ok(_) => return Ok(()),
                        Err(e) => {
                            self.job_count.fetch_sub(1, Ordering::Relaxed);
                            return Err(e.into());
                        }
                    }
                } else {
                    // Queue is full: back off exponentially (capped) and retry.
                    thread::sleep(Duration::from_millis(backoff_ms));
                    backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
                }
            }
        }
    }

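    /// Returns `true` for pools created with a size of 1, which run jobs
    /// inline instead of dispatching them to worker threads.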
    pub fn is_single_threaded(&self) -> bool {
        self.sender.is_none() && self.workers.is_empty()
    }

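    /// Sets the shared kill flag so workers and cooperative jobs can stop
    /// early; jobs already running are not interrupted.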
    pub fn signal_stop(&self) {
        self.kill_signal.store(true, Ordering::Relaxed);
    }

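    /// Returns a clone of the shared kill flag so long-running jobs can poll
    /// it and bail out once a stop has been signalled.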
    pub fn get_kill_signal(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.kill_signal)
    }
}

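// Dropping the pool closes the job channel (by dropping the sender) and then
// joins every worker, so queued work finishes before `drop` returns.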
impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        #[cfg(feature = "log")]
        {
            debug!("Waiting for workers to finish");
        }

        for worker in &mut self.workers {
            #[cfg(feature = "log")]
            {
                debug!("Shutting down worker {}", worker.id);
            }
            worker.thread.take().unwrap().join().unwrap();
        }

        #[cfg(feature = "log")]
        {
            debug!("All workers stopped");
        }
    }
}

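// `ThreadPool::new(0)` delegates here: the default pool is sized from the
// available parallelism, clamped to `u8::MAX` worker threads.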
impl Default for ThreadPool {
    fn default() -> Self {
        let max_threads = thread::available_parallelism()
            .map(|e| e.get())
            .expect("Unable to find any threads to run with. Possible system-side restrictions or limitations");

        ThreadPool::new(u8::try_from(max_threads).unwrap_or(u8::MAX))
    }
}

impl Display for ThreadPool {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if self.is_single_threaded() {
            write!(
                f,
                "Concurrency Disabled: running all jobs sequentially in the main thread. A user override forced this through the VEX2PDF_MAX_JOBS environment variable or the --max-jobs CLI argument"
            )
        } else {
            write!(
                f,
                "Concurrency Enabled: running with {} worker threads",
                self.num_threads
            )
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    #[test]
    fn test_threadpool_creation_modes() {
        let pool_default = ThreadPool::new(0);
        assert!(pool_default.num_threads > 0);
        assert!(!pool_default.is_single_threaded());

        let pool_single = ThreadPool::new(1);
        assert_eq!(pool_single.num_threads, 1);
        assert!(pool_single.is_single_threaded());
        assert!(pool_single.workers.is_empty());
        assert!(pool_single.sender.is_none());

        let pool_multi = ThreadPool::new(4);
        assert_eq!(pool_multi.num_threads, 4);
        assert!(!pool_multi.is_single_threaded());
        assert_eq!(pool_multi.workers.len(), 4);
        assert!(pool_multi.sender.is_some());
    }

    #[test]
    fn test_single_threaded_execution() {
        let pool = ThreadPool::new(1);
        let counter = Arc::new(Mutex::new(0));
        let counter_clone = Arc::clone(&counter);

        pool.execute(move || {
            let mut num = counter_clone.lock().unwrap();
            *num += 1;
        })
        .expect("Failed to execute job");

        let value = *counter.lock().unwrap();
        assert_eq!(value, 1);
    }

    #[test]
    fn test_multi_threaded_execution() {
        let pool = ThreadPool::new(2);
        let results = Arc::new(Mutex::new(Vec::new()));

        for i in 0..5 {
            let results_clone = Arc::clone(&results);
            pool.execute(move || {
                std::thread::sleep(Duration::from_millis(10));
                results_clone.lock().unwrap().push(i);
            })
            .expect("Failed to execute job");
        }

        drop(pool);

        let final_results = results.lock().unwrap();
        assert_eq!(final_results.len(), 5);
        for i in 0..5 {
            assert!(final_results.contains(&i));
        }
    }

    #[test]
    fn test_get_num_threads() {
        let pool1 = ThreadPool::new(1);
        assert_eq!(pool1.num_threads, 1);

        let pool4 = ThreadPool::new(4);
        assert_eq!(pool4.num_threads, 4);

        let pool_default = ThreadPool::default();
        assert!(pool_default.num_threads > 0);
    }

    #[test]
    fn test_is_single_threaded() {
        let pool_single = ThreadPool::new(1);
        assert!(pool_single.is_single_threaded());

        let pool_multi = ThreadPool::new(2);
        assert!(!pool_multi.is_single_threaded());

        let pool_default = ThreadPool::default();
        assert!(!pool_default.is_single_threaded());
    }

    #[test]
    fn test_pool_graceful_shutdown() {
        let pool = ThreadPool::new(3);
        let completed = Arc::new(Mutex::new(0));

        for _ in 0..10 {
            let completed_clone = Arc::clone(&completed);
            pool.execute(move || {
                std::thread::sleep(Duration::from_millis(20));
                *completed_clone.lock().unwrap() += 1;
            })
            .expect("Failed to execute job");
        }

        drop(pool);

        assert_eq!(*completed.lock().unwrap(), 10);
    }

    #[test]
    fn test_signal_stop_method() {
        let pool = ThreadPool::new(4);
        let completed = Arc::new(Mutex::new(0));

        for _ in 0..5 {
            let completed_clone = Arc::clone(&completed);
            pool.execute(move || {
                std::thread::sleep(Duration::from_millis(10));
                *completed_clone.lock().unwrap() += 1;
            })
            .expect("Failed to execute job");
        }

        pool.signal_stop();

        drop(pool);

        let count = *completed.lock().unwrap();
        assert!(count >= 1 && count <= 5);
    }

    #[test]
    fn test_get_kill_signal() {
        let pool = ThreadPool::new(2);
        let kill_signal = pool.get_kill_signal();

        assert!(!kill_signal.load(std::sync::atomic::Ordering::Relaxed));

        kill_signal.store(true, std::sync::atomic::Ordering::Relaxed);

        drop(pool);
    }

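    // Simulates a "stop once one job finds what it needs" workload: job 2
    // signals the pool to stop, and later jobs observe the kill flag and bail
    // out, so fewer than all 20 jobs complete.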
    #[test]
    fn test_job_signals_stop_to_other_workers() {
        use std::sync::atomic::Ordering;

        let pool = Arc::new(ThreadPool::new(4));
        let completed = Arc::new(Mutex::new(Vec::new()));
        let collision_found = Arc::new(AtomicBool::new(false));

        for i in 0..20 {
            let pool_clone = Arc::clone(&pool);
            let completed_clone = Arc::clone(&completed);
            let collision_found_clone = Arc::clone(&collision_found);
            let kill_signal = pool.get_kill_signal();

            pool.execute(move || {
                if kill_signal.load(Ordering::Relaxed) {
                    return;
                }

                std::thread::sleep(Duration::from_millis(10));

                if i == 2 {
                    collision_found_clone.store(true, Ordering::Relaxed);
                    pool_clone.signal_stop();
                    completed_clone.lock().unwrap().push(i);
                } else if !collision_found_clone.load(Ordering::Relaxed) {
                    completed_clone.lock().unwrap().push(i);
                }
            })
            .expect("Failed to execute job");
        }

        std::thread::sleep(Duration::from_millis(150));

        drop(pool);

        assert!(collision_found.load(Ordering::Relaxed));

        let completed_jobs = completed.lock().unwrap();
        assert!(completed_jobs.len() < 20);
        assert!(completed_jobs.contains(&2));
    }

    #[test]
    fn test_workers_complete_current_job_before_stopping() {
        use std::sync::atomic::Ordering;

        let pool = ThreadPool::new(2);
        let job_started = Arc::new(AtomicBool::new(false));
        let job_completed = Arc::new(AtomicBool::new(false));

        let job_started_clone = Arc::clone(&job_started);
        let job_completed_clone = Arc::clone(&job_completed);

        pool.execute(move || {
            job_started_clone.store(true, Ordering::Relaxed);
            std::thread::sleep(Duration::from_millis(100));
            job_completed_clone.store(true, Ordering::Relaxed);
        })
        .expect("Failed to execute job");

        std::thread::sleep(Duration::from_millis(50));
        assert!(job_started.load(Ordering::Relaxed));

        pool.signal_stop();

        drop(pool);

        assert!(job_completed.load(Ordering::Relaxed));
    }

    #[test]
    fn test_no_new_jobs_after_signal_stop() {
        use std::sync::atomic::Ordering;

        let pool = ThreadPool::new(3);
        let executed = Arc::new(AtomicBool::new(false));
        let executed_clone = Arc::clone(&executed);

        pool.signal_stop();

        pool.execute(move || {
            executed_clone.store(true, Ordering::Relaxed);
        })
        .expect("Failed to execute job");

        std::thread::sleep(Duration::from_millis(200));

        drop(pool);

        // Workers should not pick up jobs submitted after the stop signal.
        assert!(!executed.load(Ordering::Relaxed));
    }

    #[test]
    fn test_kill_signal_in_single_threaded_mode() {
        let pool = ThreadPool::new(1);
        assert!(pool.is_single_threaded());

        let kill_signal = pool.get_kill_signal();
        assert!(!kill_signal.load(std::sync::atomic::Ordering::Relaxed));

        pool.signal_stop();
        assert!(kill_signal.load(std::sync::atomic::Ordering::Relaxed));

        drop(pool);
    }

    #[test]
    fn test_builder_pattern_api() {
        let pool = ThreadPool::new(4).max_jobs(1000);
        assert_eq!(pool.num_threads, 4);
        assert_eq!(pool.max_jobs, 1000);
        assert!(!pool.is_single_threaded());

        let pool_default = ThreadPool::new(2);
        assert_eq!(pool_default.max_jobs, 50_000);
    }

    #[test]
    fn test_backward_compatibility() {
        let pool = ThreadPool::new(3);
        let counter = Arc::new(Mutex::new(0));
        let counter_clone = Arc::clone(&counter);

        pool.execute(move || {
            *counter_clone.lock().unwrap() += 1;
        })
        .expect("Failed to execute");

        drop(pool);
        assert_eq!(*counter.lock().unwrap(), 1);
    }

    #[test]
    fn test_single_threaded_ignores_max_jobs() {
        let pool = ThreadPool::new(1).max_jobs(5);
        assert!(pool.is_single_threaded());

        let counter = Arc::new(Mutex::new(0));

        for _ in 0..100 {
            let counter_clone = Arc::clone(&counter);
            pool.execute(move || {
                *counter_clone.lock().unwrap() += 1;
            })
            .expect("Failed to execute");
        }

        assert_eq!(*counter.lock().unwrap(), 100);
    }

    #[test]
    fn test_job_count_increments_and_decrements() {
        use std::sync::atomic::Ordering;

        let pool = ThreadPool::new(2).max_jobs(100);
        let completed = Arc::new(Mutex::new(0));

        for _ in 0..10 {
            let completed_clone = Arc::clone(&completed);
            pool.execute(move || {
                std::thread::sleep(Duration::from_millis(50));
                *completed_clone.lock().unwrap() += 1;
            })
            .expect("Failed to execute");
        }

        std::thread::sleep(Duration::from_millis(20));
        let count_while_running = pool.job_count.load(Ordering::Relaxed);
        assert!(count_while_running > 0);

        drop(pool);

        assert_eq!(*completed.lock().unwrap(), 10);
    }

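    // Fills the queue to `max_jobs` with slow jobs, then shows that a sixth
    // submission (from another thread) blocks in `execute` until a slot frees
    // and still runs to completion.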
    #[test]
    fn test_backpressure_blocks_when_queue_full() {
        use std::sync::atomic::Ordering;

        let pool = Arc::new(ThreadPool::new(2).max_jobs(5));
        let executed = Arc::new(Mutex::new(Vec::new()));

        for i in 0..5 {
            let executed_clone = Arc::clone(&executed);
            pool.execute(move || {
                std::thread::sleep(Duration::from_millis(100));
                executed_clone.lock().unwrap().push(i);
            })
            .expect("Failed to execute");
        }

        std::thread::sleep(Duration::from_millis(20));

        let count = pool.job_count.load(Ordering::Relaxed);
        assert_eq!(count, 5);

        let pool_clone = Arc::clone(&pool);
        let executed_clone = Arc::clone(&executed);

        std::thread::spawn(move || {
            pool_clone
                .execute(move || {
                    executed_clone.lock().unwrap().push(99);
                })
                .expect("Failed to execute");
        });

        std::thread::sleep(Duration::from_millis(300));
        drop(pool);

        let final_executed = executed.lock().unwrap();
        assert_eq!(final_executed.len(), 6);
        assert!(final_executed.contains(&99));
    }

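    // Intended to drive the backoff path in `execute` when the queue is full.
    // Note that `ThreadPool::new(1)` is single-threaded, so the jobs here run
    // inline on the submitting thread rather than queueing.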
    #[test]
    fn test_exponential_backoff_timing() {
        let pool = Arc::new(ThreadPool::new(1).max_jobs(2));

        for _ in 0..2 {
            pool.execute(|| {
                std::thread::sleep(Duration::from_millis(200));
            })
            .expect("Failed to execute");
        }

        std::thread::sleep(Duration::from_millis(20));

        let pool_clone = Arc::clone(&pool);

        std::thread::spawn(move || {
            pool_clone.execute(|| {}).expect("Failed to execute");
        });

        std::thread::sleep(Duration::from_millis(500));
        drop(pool);
    }

    #[test]
    fn test_queue_recovers_after_full() {
        let pool = ThreadPool::new(2).max_jobs(10);
        let completed = Arc::new(Mutex::new(0));

        for _ in 0..10 {
            let completed_clone = Arc::clone(&completed);
            pool.execute(move || {
                std::thread::sleep(Duration::from_millis(50));
                *completed_clone.lock().unwrap() += 1;
            })
            .expect("Failed to execute");
        }

        std::thread::sleep(Duration::from_millis(150));

        for _ in 0..5 {
            let completed_clone = Arc::clone(&completed);
            pool.execute(move || {
                *completed_clone.lock().unwrap() += 1;
            })
            .expect("Failed to execute");
        }

        drop(pool);

        assert_eq!(*completed.lock().unwrap(), 15);
    }

    #[test]
    fn test_concurrent_submissions_near_limit() {
        let pool = Arc::new(ThreadPool::new(4).max_jobs(20));

        let mut handles = vec![];
        for _ in 0..5 {
            let pool_clone = Arc::clone(&pool);

            let handle = std::thread::spawn(move || {
                for _ in 0..10 {
                    pool_clone
                        .execute(move || {
                            std::thread::sleep(Duration::from_millis(10));
                        })
                        .expect("Failed to execute");
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        drop(pool);
    }

    #[test]
    #[should_panic(expected = "max_jobs must be > 0")]
    fn test_max_jobs_zero_panics() {
        let _pool = ThreadPool::new(4).max_jobs(0);
    }
}