
philiprehberger_task_queue/lib.rs

//! In-process thread-based task queue with priority and concurrency control.
//!
//! This crate provides a simple task queue that runs closures on a pool of worker
//! threads. Tasks can be submitted with different priorities, and higher-priority
//! tasks are executed first.
//!
//! # Example
//!
//! ```
//! use philiprehberger_task_queue::{TaskQueue, Priority};
//!
//! let queue = TaskQueue::new(2);
//!
//! let handle = queue.submit(|| 1 + 1);
//! assert_eq!(handle.join().unwrap(), 2);
//!
//! let handle = queue.submit_with_priority(Priority::High, || "done");
//! assert_eq!(handle.join().unwrap(), "done");
//!
//! queue.shutdown();
//! ```

use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// Task execution priority.
///
/// Higher-priority tasks are dequeued before lower-priority ones.
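///
/// # Example
///
/// Priorities are totally ordered, which the scheduler relies on:
///
/// ```
/// use philiprehberger_task_queue::Priority;
///
/// assert!(Priority::High > Priority::Normal);
/// assert!(Priority::Normal > Priority::Low);
/// ```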
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Priority {
    /// Lowest execution priority.
    Low,
    /// Default execution priority.
    Normal,
    /// Highest execution priority.
    High,
}

impl Priority {
    fn as_u8(self) -> u8 {
        match self {
            Priority::Low => 0,
            Priority::Normal => 1,
            Priority::High => 2,
        }
    }
}

impl Ord for Priority {
    fn cmp(&self, other: &Self) -> Ordering {
        self.as_u8().cmp(&other.as_u8())
    }
}

impl PartialOrd for Priority {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Error returned when a task fails to produce a result.
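///
/// # Example
///
/// A panicking task surfaces as [`TaskError::Panicked`] when joined:
///
/// ```
/// use philiprehberger_task_queue::{TaskQueue, TaskError};
///
/// let queue = TaskQueue::new(1);
/// let handle = queue.submit(|| panic!("boom"));
/// assert!(matches!(handle.join(), Err(TaskError::Panicked)));
/// queue.shutdown();
/// ```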
#[derive(Debug)]
pub enum TaskError {
    /// The task panicked during execution.
    Panicked,
    /// The task was cancelled because the queue shut down before it could run.
    Cancelled,
    /// The task was rejected because the queue is at capacity.
    QueueFull,
}

impl std::fmt::Display for TaskError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TaskError::Panicked => write!(f, "task panicked"),
            TaskError::Cancelled => write!(f, "task cancelled"),
            TaskError::QueueFull => write!(f, "task rejected: queue is full"),
        }
    }
}

impl std::error::Error for TaskError {}

/// Snapshot of task queue statistics for observability.
///
/// Obtained via [`TaskQueue::stats`].
#[derive(Debug, Clone)]
pub struct TaskQueueStats {
    /// Total number of tasks submitted to the queue.
    pub total_submitted: u64,
    /// Number of tasks that completed successfully.
    pub completed: u64,
    /// Number of tasks that failed (panicked).
    pub failed: u64,
    /// Number of tasks currently being executed by workers.
    pub in_flight: u64,
    /// Sum of end-to-end latencies (enqueue -> completion) in nanoseconds
    /// for every task that has finished (successfully or with a panic).
    pub total_latency_nanos: u128,
    /// Number of completed tasks that have contributed to `total_latency_nanos`.
    pub completed_latency_samples: u64,
}

impl TaskQueueStats {
    /// Return the average end-to-end task latency (enqueue -> completion).
    ///
    /// Returns `None` when no tasks have completed yet.
    ///
    /// # Example
    ///
    /// ```
    /// use philiprehberger_task_queue::TaskQueue;
    ///
    /// let queue = TaskQueue::new(1);
    /// queue.submit(|| 1 + 1).join().unwrap();
    /// let stats = queue.stats();
    /// assert!(stats.average_latency().is_some());
    /// queue.shutdown();
    /// ```
    pub fn average_latency(&self) -> Option<Duration> {
        if self.completed_latency_samples == 0 {
            return None;
        }
        let avg_nanos = self.total_latency_nanos / self.completed_latency_samples as u128;
        // Cap to u64::MAX nanos (~584 years) — safe for all practical cases.
        let capped = avg_nanos.min(u64::MAX as u128) as u64;
        Some(Duration::from_nanos(capped))
    }
}

/// Shared atomic counters used by the task queue for stats tracking.
struct StatsCounters {
    total_submitted: AtomicU64,
    completed: AtomicU64,
    failed: AtomicU64,
    in_flight: AtomicU64,
    /// Cumulative enqueue -> completion latency across all finished tasks.
    /// Stored behind a mutex because Rust has no stable `AtomicU128`.
    latency: Mutex<LatencyAccumulator>,
}

#[derive(Default)]
struct LatencyAccumulator {
    total_nanos: u128,
    samples: u64,
}

impl StatsCounters {
    fn new() -> Self {
        Self {
            total_submitted: AtomicU64::new(0),
            completed: AtomicU64::new(0),
            failed: AtomicU64::new(0),
            in_flight: AtomicU64::new(0),
            latency: Mutex::new(LatencyAccumulator::default()),
        }
    }

    fn record_latency(&self, elapsed: Duration) {
        if let Ok(mut acc) = self.latency.lock() {
            acc.total_nanos = acc.total_nanos.saturating_add(elapsed.as_nanos());
            acc.samples = acc.samples.saturating_add(1);
        }
    }

    fn snapshot_latency(&self) -> (u128, u64) {
        match self.latency.lock() {
            Ok(acc) => (acc.total_nanos, acc.samples),
            Err(_) => (0, 0),
        }
    }
}

type CompletionCallback = dyn Fn(bool, Duration) + Send + Sync;

/// A handle to a submitted task, used to retrieve the result.
///
/// # Example
///
/// ```
/// use philiprehberger_task_queue::TaskQueue;
///
/// let queue = TaskQueue::new(1);
/// let handle = queue.submit(|| 42);
/// assert_eq!(handle.join().unwrap(), 42);
/// queue.shutdown();
/// ```
pub struct TaskHandle<T> {
    inner: Arc<TaskResultSlot<T>>,
}

struct TaskResultSlot<T> {
    mutex: Mutex<Option<Result<T, TaskError>>>,
    condvar: Condvar,
}

impl<T> TaskResultSlot<T> {
    fn set(&self, value: Result<T, TaskError>) {
        let mut guard = self.mutex.lock().unwrap();
        *guard = Some(value);
        self.condvar.notify_one();
    }
}

impl<T> TaskHandle<T> {
    /// Block until the task completes and return its result.
    ///
    /// Returns `Ok(value)` if the task completed successfully, or a [`TaskError`]
    /// if the task panicked or was cancelled.
    pub fn join(self) -> Result<T, TaskError> {
        let mut guard = self.inner.mutex.lock().unwrap();
        while guard.is_none() {
            guard = self.inner.condvar.wait(guard).unwrap();
        }
        guard.take().unwrap()
    }

    /// Check whether the task has completed without blocking.
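    ///
    /// # Example
    ///
    /// Poll for completion without blocking:
    ///
    /// ```
    /// use philiprehberger_task_queue::TaskQueue;
    ///
    /// let queue = TaskQueue::new(1);
    /// let handle = queue.submit(|| 7);
    /// while !handle.is_done() {
    ///     std::thread::yield_now();
    /// }
    /// assert_eq!(handle.join().unwrap(), 7);
    /// queue.shutdown();
    /// ```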
    pub fn is_done(&self) -> bool {
        self.inner.mutex.lock().unwrap().is_some()
    }
}

/// Guard that sets `TaskError::Cancelled` on the result slot when dropped,
/// unless the task has already completed. This ensures that `TaskHandle::join`
/// never blocks forever if the task is dropped without running.
///
/// The slot is held in an `Option` so that a task which does run can take it
/// back via `disarm`; forgetting the guard with `mem::forget` instead would
/// leak its `Arc` and keep the result slot alive forever.
struct CancelGuard<T> {
    slot: Option<Arc<TaskResultSlot<T>>>,
}

impl<T> CancelGuard<T> {
    /// Defuse the guard and recover the slot; `Drop` then does nothing.
    fn disarm(mut self) -> Arc<TaskResultSlot<T>> {
        self.slot.take().expect("CancelGuard disarmed twice")
    }
}

impl<T> Drop for CancelGuard<T> {
    fn drop(&mut self) {
        if let Some(slot) = self.slot.take() {
            let mut guard = slot.mutex.lock().unwrap();
            if guard.is_none() {
                *guard = Some(Err(TaskError::Cancelled));
                slot.condvar.notify_one();
            }
        }
    }
}

/// Returned by a task closure: signals the result slot after the worker
/// has finished post-task bookkeeping (stats, callback).
type TaskCompletion = Box<dyn FnOnce() + Send>;
type BoxedTask = Box<dyn FnOnce() -> TaskCompletion + Send>;

struct QueueEntry {
    priority: Priority,
    sequence: u64,
    task: BoxedTask,
    enqueued_at: Instant,
}

impl Eq for QueueEntry {}

impl PartialEq for QueueEntry {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority && self.sequence == other.sequence
    }
}

impl Ord for QueueEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.sequence.cmp(&self.sequence))
    }
}

impl PartialOrd for QueueEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

struct SharedState {
    queue: BinaryHeap<QueueEntry>,
    shutdown: bool,
    draining: bool,
    next_sequence: u64,
    max_queued: Option<usize>,
    paused: bool,
}

/// A thread-based task queue with configurable concurrency and priority scheduling.
///
/// Workers continuously pull the highest-priority task from the queue and execute it.
/// When the queue is shut down, running tasks are allowed to complete but pending
/// tasks are dropped (their handles will receive `TaskError::Cancelled`).
///
/// # Example
///
/// ```
/// use philiprehberger_task_queue::{TaskQueue, Priority};
///
/// let queue = TaskQueue::new(2);
///
/// let h1 = queue.submit(|| 10);
/// let h2 = queue.submit_with_priority(Priority::High, || 20);
///
/// assert_eq!(h1.join().unwrap(), 10);
/// assert_eq!(h2.join().unwrap(), 20);
///
/// queue.shutdown();
/// ```
pub struct TaskQueue {
    shared: Arc<(Mutex<SharedState>, Condvar)>,
    workers: Option<Vec<thread::JoinHandle<()>>>,
    stats: Arc<StatsCounters>,
    callback: Arc<Mutex<Option<Arc<CompletionCallback>>>>,
}

impl TaskQueue {
    /// Create a new task queue with the given number of worker threads.
    ///
    /// # Panics
    ///
    /// Panics if `concurrency` is zero.
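    ///
    /// # Example
    ///
    /// ```
    /// use philiprehberger_task_queue::TaskQueue;
    ///
    /// let queue = TaskQueue::new(4); // four worker threads
    /// let handle = queue.submit(|| 6 * 7);
    /// assert_eq!(handle.join().unwrap(), 42);
    /// queue.shutdown();
    /// ```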
    pub fn new(concurrency: usize) -> Self {
        assert!(concurrency > 0, "concurrency must be at least 1");

        let shared = Arc::new((
            Mutex::new(SharedState {
                queue: BinaryHeap::new(),
                shutdown: false,
                draining: false,
                next_sequence: 0,
                max_queued: None,
                paused: false,
            }),
            Condvar::new(),
        ));

        let stats = Arc::new(StatsCounters::new());
        let callback: Arc<Mutex<Option<Arc<CompletionCallback>>>> = Arc::new(Mutex::new(None));

        let mut workers = Vec::with_capacity(concurrency);
        for _ in 0..concurrency {
            let shared = Arc::clone(&shared);
            let stats = Arc::clone(&stats);
            let callback = Arc::clone(&callback);
            let handle = thread::spawn(move || {
                worker_loop(&shared, &stats, &callback);
            });
            workers.push(handle);
        }

        TaskQueue {
            shared,
            workers: Some(workers),
            stats,
            callback,
        }
    }

    /// Create a new task queue with a maximum pending task limit.
    ///
    /// When the queue contains `max_queued` or more pending tasks, new
    /// submissions are rejected and the returned handle will yield
    /// `TaskError::QueueFull` on join.
    ///
    /// # Panics
    ///
    /// Panics if `concurrency` is zero.
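    ///
    /// # Example
    ///
    /// Pausing the queue keeps submissions pending, which makes the
    /// rejection observable:
    ///
    /// ```
    /// use philiprehberger_task_queue::{TaskQueue, TaskError};
    ///
    /// let queue = TaskQueue::with_capacity(1, 1);
    /// queue.pause();
    /// let h1 = queue.submit(|| 1); // queued
    /// let h2 = queue.submit(|| 2); // rejected: queue already holds one task
    /// queue.resume();
    /// assert!(matches!(h2.join(), Err(TaskError::QueueFull)));
    /// assert_eq!(h1.join().unwrap(), 1);
    /// queue.shutdown();
    /// ```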
    pub fn with_capacity(concurrency: usize, max_queued: usize) -> Self {
        let queue = Self::new(concurrency);
        {
            let (ref mutex, _) = *queue.shared;
            mutex.lock().unwrap().max_queued = Some(max_queued);
        }
        queue
    }

    /// Temporarily stop workers from processing tasks.
    ///
    /// Tasks can still be submitted while paused, but workers will not
    /// dequeue them until [`resume`](TaskQueue::resume) is called.
    /// Draining overrides the pause — [`drain`](TaskQueue::drain) will
    /// complete all pending tasks even if the queue is paused.
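    ///
    /// # Example
    ///
    /// Submissions made while paused stay queued until `resume`:
    ///
    /// ```
    /// use philiprehberger_task_queue::TaskQueue;
    ///
    /// let queue = TaskQueue::new(1);
    /// queue.pause();
    /// let handle = queue.submit(|| 5);
    /// assert_eq!(queue.pending_count(), 1); // queued, not yet running
    /// queue.resume();
    /// assert_eq!(handle.join().unwrap(), 5);
    /// queue.shutdown();
    /// ```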
    pub fn pause(&self) {
        let (ref mutex, _) = *self.shared;
        mutex.lock().unwrap().paused = true;
    }

    /// Resume processing after a call to [`pause`](TaskQueue::pause).
    pub fn resume(&self) {
        let (ref mutex, ref condvar) = *self.shared;
        mutex.lock().unwrap().paused = false;
        condvar.notify_all();
    }

    /// Check whether task processing is currently paused.
    pub fn is_paused(&self) -> bool {
        let (ref mutex, _) = *self.shared;
        mutex.lock().unwrap().paused
    }

    /// Return the number of tasks waiting in the queue.
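    ///
    /// # Example
    ///
    /// Pausing makes the count deterministic:
    ///
    /// ```
    /// use philiprehberger_task_queue::TaskQueue;
    ///
    /// let queue = TaskQueue::new(1);
    /// queue.pause();
    /// queue.submit(|| ());
    /// queue.submit(|| ());
    /// assert_eq!(queue.pending_count(), 2);
    /// queue.drain();
    /// ```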
    pub fn pending_count(&self) -> usize {
        let (ref mutex, _) = *self.shared;
        mutex.lock().unwrap().queue.len()
    }

    /// Submit a task with `Normal` priority.
    ///
    /// Returns a [`TaskHandle`] that can be used to retrieve the result.
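    ///
    /// # Example
    ///
    /// ```
    /// use philiprehberger_task_queue::TaskQueue;
    ///
    /// let queue = TaskQueue::new(1);
    /// let handle = queue.submit(|| "hello".len());
    /// assert_eq!(handle.join().unwrap(), 5);
    /// queue.shutdown();
    /// ```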
    pub fn submit<F, T>(&self, task: F) -> TaskHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        self.submit_with_priority(Priority::Normal, task)
    }

    /// Submit a task with the given priority.
    ///
    /// Higher-priority tasks are executed before lower-priority ones when
    /// multiple tasks are waiting in the queue.
    ///
    /// Returns a [`TaskHandle`] that can be used to retrieve the result.
    ///
    /// If the queue is draining or shut down, the returned handle will
    /// immediately yield `TaskError::Cancelled`.
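    ///
    /// # Example
    ///
    /// ```
    /// use philiprehberger_task_queue::{TaskQueue, Priority};
    ///
    /// let queue = TaskQueue::new(1);
    /// let handle = queue.submit_with_priority(Priority::High, || "urgent");
    /// assert_eq!(handle.join().unwrap(), "urgent");
    /// queue.shutdown();
    /// ```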
    pub fn submit_with_priority<F, T>(&self, priority: Priority, task: F) -> TaskHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        let slot = Arc::new(TaskResultSlot {
            mutex: Mutex::new(None),
            condvar: Condvar::new(),
        });

        // Reject submissions if draining, shut down, or queue is full.
        {
            let (ref mutex, _) = *self.shared;
            let state = mutex.lock().unwrap();
            if state.draining || state.shutdown {
                slot.set(Err(TaskError::Cancelled));
                return TaskHandle { inner: slot };
            }
            if let Some(max) = state.max_queued {
                if state.queue.len() >= max {
                    slot.set(Err(TaskError::QueueFull));
                    return TaskHandle { inner: slot };
                }
            }
        }

        let cancel_guard = CancelGuard {
            slot: Some(Arc::clone(&slot)),
        };

        let boxed: BoxedTask = Box::new(move || {
            // The cancel guard is moved into the closure. If the closure runs,
            // we disarm the guard and set the result explicitly; if the closure
            // is dropped without running, the guard's Drop fires and sets
            // Cancelled.
            let outcome = panic::catch_unwind(AssertUnwindSafe(task));
            let success = outcome.is_ok();
            TASK_SUCCESS.with(|s| s.set(success));
            let value = match outcome {
                Ok(v) => Ok(v),
                Err(_) => Err(TaskError::Panicked),
            };
            // Disarm rather than `mem::forget` the guard: forgetting it would
            // leak its `Arc` and the result slot along with it.
            let slot = cancel_guard.disarm();
            // Return a completion callback that the worker calls AFTER stats
            // and the on_complete callback, so join() doesn't return prematurely.
            Box::new(move || slot.set(value))
        });

        self.stats
            .total_submitted
            .fetch_add(1, AtomicOrdering::Relaxed);

        let (ref mutex, ref condvar) = *self.shared;
        let mut state = mutex.lock().unwrap();
        let sequence = state.next_sequence;
        state.next_sequence += 1;
        state.queue.push(QueueEntry {
            priority,
            sequence,
            task: boxed,
            enqueued_at: Instant::now(),
        });
        condvar.notify_one();

        TaskHandle { inner: slot }
    }

    /// Return a snapshot of task queue statistics.
    ///
    /// The counters are updated atomically as tasks are submitted, completed,
    /// and failed, so successive calls may return different values.
    ///
    /// # Example
    ///
    /// ```
    /// use philiprehberger_task_queue::TaskQueue;
    ///
    /// let queue = TaskQueue::new(1);
    /// let handle = queue.submit(|| 1 + 1);
    /// handle.join().unwrap();
    ///
    /// let stats = queue.stats();
    /// assert_eq!(stats.total_submitted, 1);
    /// assert_eq!(stats.completed, 1);
    /// queue.shutdown();
    /// ```
    pub fn stats(&self) -> TaskQueueStats {
        let (total_latency_nanos, completed_latency_samples) = self.stats.snapshot_latency();
        TaskQueueStats {
            total_submitted: self.stats.total_submitted.load(AtomicOrdering::Relaxed),
            completed: self.stats.completed.load(AtomicOrdering::Relaxed),
            failed: self.stats.failed.load(AtomicOrdering::Relaxed),
            in_flight: self.stats.in_flight.load(AtomicOrdering::Relaxed),
            total_latency_nanos,
            completed_latency_samples,
        }
    }

    /// Stop accepting new tasks and wait for all queued and in-flight tasks to
    /// complete.
    ///
    /// Unlike [`shutdown`](TaskQueue::shutdown), `drain` does **not** drop
    /// pending tasks — every task that was already submitted will run to
    /// completion. New submissions made after `drain` is called will be
    /// immediately cancelled.
    ///
    /// This method blocks until the queue is empty and all workers are idle,
    /// then shuts down the worker threads.
    ///
    /// # Example
    ///
    /// ```
    /// use philiprehberger_task_queue::TaskQueue;
    /// use std::sync::Arc;
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// let queue = TaskQueue::new(2);
    /// let counter = Arc::new(AtomicUsize::new(0));
    ///
    /// for _ in 0..5 {
    ///     let c = counter.clone();
    ///     queue.submit(move || { c.fetch_add(1, Ordering::SeqCst); });
    /// }
    ///
    /// queue.drain();
    /// assert_eq!(counter.load(Ordering::SeqCst), 5);
    /// ```
    pub fn drain(mut self) {
        self.do_drain();
    }

    fn do_drain(&mut self) {
        let (ref mutex, ref condvar) = *self.shared;
        {
            let mut state = mutex.lock().unwrap();
            state.draining = true;
            // Do NOT clear the queue — let workers process everything.
            // Wake workers in case the queue was paused.
            condvar.notify_all();
        }

        // Wait until the queue is empty and no tasks are in-flight.
        {
            let mut state = mutex.lock().unwrap();
            while !state.queue.is_empty() || self.stats.in_flight.load(AtomicOrdering::SeqCst) > 0 {
                state = condvar.wait(state).unwrap();
            }
        }

        // Now perform a normal shutdown (workers will exit because queue is
        // empty and shutdown flag is set).
        self.do_shutdown();
    }

    /// Register a callback that fires after each task completes.
    ///
    /// The callback receives two arguments:
    /// - `success` — `true` if the task completed without panicking, `false` otherwise.
    /// - `duration` — wall-clock time the task took to execute.
    ///
    /// Only one callback may be active at a time; calling this again replaces
    /// the previous callback.
    ///
    /// # Example
    ///
    /// ```
    /// use philiprehberger_task_queue::TaskQueue;
    /// use std::sync::Arc;
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// let queue = TaskQueue::new(1);
    /// let count = Arc::new(AtomicUsize::new(0));
    /// let c = count.clone();
    /// queue.on_complete(move |_success, _dur| {
    ///     c.fetch_add(1, Ordering::SeqCst);
    /// });
    ///
    /// queue.submit(|| 42).join().unwrap();
    /// assert_eq!(count.load(Ordering::SeqCst), 1);
    /// queue.shutdown();
    /// ```
    pub fn on_complete<F>(&self, callback: F)
    where
        F: Fn(bool, Duration) + Send + Sync + 'static,
    {
        let mut guard = self.callback.lock().unwrap();
        *guard = Some(Arc::new(callback));
    }

    /// Shut down the task queue.
    ///
    /// Signals all workers to stop, waits for currently running tasks to finish,
    /// and drops any pending tasks. Pending task handles will receive
    /// `TaskError::Cancelled` when joined.
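    ///
    /// # Example
    ///
    /// ```
    /// use philiprehberger_task_queue::TaskQueue;
    ///
    /// let queue = TaskQueue::new(2);
    /// let handle = queue.submit(|| 2 + 2);
    /// assert_eq!(handle.join().unwrap(), 4);
    /// // Blocks until in-flight tasks finish; queued tasks are cancelled.
    /// queue.shutdown();
    /// ```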
    pub fn shutdown(mut self) {
        self.do_shutdown();
    }

    fn do_shutdown(&mut self) {
        let (ref mutex, ref condvar) = *self.shared;

        {
            let mut state = mutex.lock().unwrap();
            state.shutdown = true;
            condvar.notify_all();
            // Drain the queue — dropping each entry drops its closure, which
            // drops the CancelGuard, which sets TaskError::Cancelled on the slot.
            state.queue.clear();
        }

        if let Some(workers) = self.workers.take() {
            for w in workers {
                let _ = w.join();
            }
        }
    }
}

impl Drop for TaskQueue {
    fn drop(&mut self) {
        let (ref mutex, ref condvar) = *self.shared;
        {
            let mut state = mutex.lock().unwrap();
            if !state.shutdown {
                state.shutdown = true;
                if !state.draining {
                    state.queue.clear();
                }
                condvar.notify_all();
            }
        }
        if let Some(workers) = self.workers.take() {
            for w in workers {
                let _ = w.join();
            }
        }
    }
}

thread_local! {
    /// Used by the task closure to communicate success/failure to the worker loop.
    static TASK_SUCCESS: std::cell::Cell<bool> = const { std::cell::Cell::new(true) };
}

fn worker_loop(
    shared: &(Mutex<SharedState>, Condvar),
    stats: &StatsCounters,
    callback: &Mutex<Option<Arc<CompletionCallback>>>,
) {
    let (ref mutex, ref condvar) = *shared;
    loop {
        let task = {
            let mut state = mutex.lock().unwrap();
            loop {
                if !state.paused || state.draining {
                    if let Some(entry) = state.queue.pop() {
                        break Some((entry.task, entry.enqueued_at));
                    }
                }
                if state.shutdown || (state.draining && state.queue.is_empty()) {
                    break None;
                }
                state = condvar.wait(state).unwrap();
            }
        };
        match task {
            Some((task, enqueued_at)) => {
                stats.in_flight.fetch_add(1, AtomicOrdering::SeqCst);
                let start = Instant::now();
                let completion = task();
                let elapsed = start.elapsed();
                let total_latency = enqueued_at.elapsed();
                stats.record_latency(total_latency);
                stats.in_flight.fetch_sub(1, AtomicOrdering::SeqCst);

                // The task closure uses catch_unwind internally and communicates
                // success/failure via a thread-local, since the boxed closure
                // returns normally even when the user's task panicked.
                let success = TASK_SUCCESS.with(|s| s.get());
                if success {
                    stats.completed.fetch_add(1, AtomicOrdering::Relaxed);
                } else {
                    stats.failed.fetch_add(1, AtomicOrdering::Relaxed);
                }

                // Fire the on_complete callback if registered.
                if let Ok(guard) = callback.lock() {
                    if let Some(ref cb) = *guard {
                        cb(success, elapsed);
                    }
                }

                // Now set the result and notify the TaskHandle — this ensures
                // stats and callback have both completed before join() returns.
                completion();

                // Take the lock before notifying so drain() cannot miss a
                // wakeup between checking its predicate and parking on the
                // condvar.
                let _guard = mutex.lock().unwrap();
                condvar.notify_all();
            }
            None => return,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::mpsc;
    use std::sync::Barrier;
    use std::time::Duration;

    #[test]
    fn submit_and_join() {
        let queue = TaskQueue::new(1);
        let handle = queue.submit(|| 42);
        assert_eq!(handle.join().unwrap(), 42);
        queue.shutdown();
    }

    #[test]
    fn submit_multiple_tasks_all_complete() {
        let queue = TaskQueue::new(2);
        let handles: Vec<_> = (0..10).map(|i| queue.submit(move || i * 2)).collect();
        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        for (i, r) in results.iter().enumerate() {
            assert_eq!(*r, i * 2);
        }
        queue.shutdown();
    }

    #[test]
    fn priority_ordering() {
        let queue = TaskQueue::new(1);
        let barrier = Arc::new(Barrier::new(2));
        let order = Arc::new(Mutex::new(Vec::new()));

        // Block the single worker
        let b = barrier.clone();
        queue.submit(move || {
            b.wait();
        });

        // Give the worker time to pick up the blocking task
        thread::sleep(Duration::from_millis(50));

        // Now submit tasks with different priorities — they'll queue up
        let o = order.clone();
        let h_low = queue.submit_with_priority(Priority::Low, move || {
            o.lock().unwrap().push("low");
        });

        let o = order.clone();
        let h_high = queue.submit_with_priority(Priority::High, move || {
            o.lock().unwrap().push("high");
        });

        let o = order.clone();
        let h_normal = queue.submit_with_priority(Priority::Normal, move || {
            o.lock().unwrap().push("normal");
        });

        // Unblock the worker
        barrier.wait();

        // Wait for all tasks
        h_low.join().unwrap();
        h_high.join().unwrap();
        h_normal.join().unwrap();

        let final_order = order.lock().unwrap();
        assert_eq!(*final_order, vec!["high", "normal", "low"]);

        queue.shutdown();
    }

    #[test]
    fn is_done_returns_false_then_true() {
        let queue = TaskQueue::new(1);
        let barrier = Arc::new(Barrier::new(2));

        let b = barrier.clone();
        let handle = queue.submit(move || {
            b.wait();
            99
        });

        // Task is blocked, so not done yet
        assert!(!handle.is_done());

        // Unblock the task
        barrier.wait();

        // Wait for completion
        let result = handle.join().unwrap();
        assert_eq!(result, 99);

        queue.shutdown();
    }

    #[test]
    fn shutdown_completes_running_tasks() {
        let queue = TaskQueue::new(1);
        let (tx, rx) = mpsc::channel();

        queue.submit(move || {
            thread::sleep(Duration::from_millis(50));
            tx.send(true).unwrap();
        });

        // Give the worker time to start the task
        thread::sleep(Duration::from_millis(10));

        // Shutdown should wait for the running task
        queue.shutdown();

        // The task should have completed
        assert!(rx.recv_timeout(Duration::from_millis(100)).unwrap());
    }

    #[test]
    fn panicking_task_returns_panicked_error() {
        let queue = TaskQueue::new(1);
        let handle = queue.submit(|| {
            panic!("intentional panic");
        });
        match handle.join() {
            Err(TaskError::Panicked) => {}
            other => panic!("expected TaskError::Panicked, got {:?}", other.err()),
        }

        // Queue should still work after a panic
        let handle = queue.submit(|| 123);
        assert_eq!(handle.join().unwrap(), 123);

        queue.shutdown();
    }

    #[test]
    fn concurrency_limit_is_respected() {
        let concurrency = 3;
        let queue = TaskQueue::new(concurrency);
        let running = Arc::new(AtomicUsize::new(0));
        let max_running = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::new();
        for _ in 0..concurrency * 2 {
            let r = running.clone();
            let m = max_running.clone();
            handles.push(queue.submit(move || {
                let current = r.fetch_add(1, Ordering::SeqCst) + 1;
                // Update max using compare-and-swap loop
                loop {
                    let prev_max = m.load(Ordering::SeqCst);
                    if current <= prev_max {
                        break;
                    }
                    if m.compare_exchange(prev_max, current, Ordering::SeqCst, Ordering::SeqCst)
                        .is_ok()
                    {
                        break;
                    }
                }
                thread::sleep(Duration::from_millis(50));
                r.fetch_sub(1, Ordering::SeqCst);
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        let observed_max = max_running.load(Ordering::SeqCst);
        assert!(
            observed_max <= concurrency,
            "max concurrent tasks ({observed_max}) exceeded concurrency limit ({concurrency})"
        );

        queue.shutdown();
    }

    #[test]
    fn stats_tracks_submitted_and_completed() {
        let queue = TaskQueue::new(2);

        let handles: Vec<_> = (0..5).map(|i| queue.submit(move || i)).collect();
        for h in handles {
            h.join().unwrap();
        }

        let s = queue.stats();
        assert_eq!(s.total_submitted, 5);
        assert_eq!(s.completed, 5);
        assert_eq!(s.failed, 0);
        assert_eq!(s.in_flight, 0);

        queue.shutdown();
    }

    #[test]
    fn stats_tracks_failures() {
        let queue = TaskQueue::new(1);

        let h1 = queue.submit(|| panic!("boom"));
        let _ = h1.join(); // Err(Panicked)

        let h2 = queue.submit(|| 42);
        h2.join().unwrap();

        let s = queue.stats();
        assert_eq!(s.total_submitted, 2);
        assert_eq!(s.completed, 1);
        assert_eq!(s.failed, 1);

        queue.shutdown();
    }

    #[test]
    fn drain_completes_all_pending_tasks() {
        let queue = TaskQueue::new(1);
        let counter = Arc::new(AtomicUsize::new(0));

        for _ in 0..10 {
            let c = counter.clone();
            queue.submit(move || {
                c.fetch_add(1, Ordering::SeqCst);
            });
        }

        queue.drain();
        assert_eq!(counter.load(Ordering::SeqCst), 10);
    }

    #[test]
    fn drain_runs_tasks_queued_while_worker_blocked() {
        let queue = TaskQueue::new(1);
        let barrier = Arc::new(Barrier::new(2));

        // Block the worker so the next submission stays queued
        let b = barrier.clone();
        queue.submit(move || {
            b.wait();
        });

        // Give the worker time to pick up the task
        thread::sleep(Duration::from_millis(50));

        // Submit a task that should be queued
        let counter = Arc::new(AtomicUsize::new(0));
        let c = counter.clone();
        queue.submit(move || {
            c.fetch_add(1, Ordering::SeqCst);
        });

        // drain() consumes self, so a rejected post-drain submission cannot be
        // observed here; instead, verify that drain runs every task that was
        // queued before it was called.
        barrier.wait();
        queue.drain();
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn on_complete_callback_fires_on_success() {
        let queue = TaskQueue::new(1);
        let call_count = Arc::new(AtomicUsize::new(0));
        let success_count = Arc::new(AtomicUsize::new(0));

        let cc = call_count.clone();
        let sc = success_count.clone();
        queue.on_complete(move |success, dur| {
            cc.fetch_add(1, Ordering::SeqCst);
            if success {
                sc.fetch_add(1, Ordering::SeqCst);
            }
            assert!(dur.as_nanos() > 0);
        });

        let h = queue.submit(|| 42);
        h.join().unwrap();

        assert_eq!(call_count.load(Ordering::SeqCst), 1);
        assert_eq!(success_count.load(Ordering::SeqCst), 1);

        queue.shutdown();
    }

    #[test]
    fn on_complete_callback_fires_on_failure() {
        let queue = TaskQueue::new(1);
        let failure_count = Arc::new(AtomicUsize::new(0));

        let fc = failure_count.clone();
        queue.on_complete(move |success, _dur| {
            if !success {
                fc.fetch_add(1, Ordering::SeqCst);
            }
        });

        let h = queue.submit(|| panic!("intentional"));
        let _ = h.join();

        assert_eq!(failure_count.load(Ordering::SeqCst), 1);

        queue.shutdown();
    }

    #[test]
    fn on_complete_callback_reports_duration() {
        let queue = TaskQueue::new(1);
        let observed_duration = Arc::new(Mutex::new(Duration::ZERO));

        let od = observed_duration.clone();
        queue.on_complete(move |_success, dur| {
            *od.lock().unwrap() = dur;
        });

        let h = queue.submit(|| {
            thread::sleep(Duration::from_millis(50));
        });
        h.join().unwrap();

        let dur = *observed_duration.lock().unwrap();
        assert!(dur >= Duration::from_millis(40), "duration was {dur:?}");

        queue.shutdown();
    }

    #[test]
    fn replacing_callback() {
        let queue = TaskQueue::new(1);
        let first_count = Arc::new(AtomicUsize::new(0));
        let second_count = Arc::new(AtomicUsize::new(0));

        let fc = first_count.clone();
        queue.on_complete(move |_, _| {
            fc.fetch_add(1, Ordering::SeqCst);
        });

        queue.submit(|| {}).join().unwrap();

        let sc = second_count.clone();
        queue.on_complete(move |_, _| {
            sc.fetch_add(1, Ordering::SeqCst);
        });

        queue.submit(|| {}).join().unwrap();

        assert_eq!(first_count.load(Ordering::SeqCst), 1);
        assert_eq!(second_count.load(Ordering::SeqCst), 1);

        queue.shutdown();
    }

    #[test]
    fn test_with_capacity_rejects_when_full() {
        let queue = TaskQueue::with_capacity(1, 2);
        // Pause so tasks don't get consumed
        queue.pause();

        let h1 = queue.submit(|| 1);
        let h2 = queue.submit(|| 2);
        let h3 = queue.submit(|| 3); // should be rejected

        // h3 should immediately return QueueFull
        queue.resume();
        assert!(matches!(h3.join(), Err(TaskError::QueueFull)));

        // h1 and h2 should succeed
        assert!(h1.join().is_ok());
        assert!(h2.join().is_ok());
        queue.shutdown();
    }

    #[test]
    fn test_with_capacity_allows_within_limit() {
        let queue = TaskQueue::with_capacity(2, 10);
        let handles: Vec<_> = (0..10).map(|i| queue.submit(move || i)).collect();
        for (i, h) in handles.into_iter().enumerate() {
            assert_eq!(h.join().unwrap(), i);
        }
        queue.shutdown();
    }

    #[test]
    fn test_pause_and_resume() {
        let queue = TaskQueue::new(2);
        queue.pause();

        let counter = Arc::new(AtomicUsize::new(0));
        let c = counter.clone();
        queue.submit(move || {
            c.fetch_add(1, Ordering::SeqCst);
        });

        // Give workers time to potentially process (they shouldn't)
        thread::sleep(Duration::from_millis(50));
        assert_eq!(counter.load(Ordering::SeqCst), 0);

        queue.resume();
        // Give time to process
        thread::sleep(Duration::from_millis(100));
        assert_eq!(counter.load(Ordering::SeqCst), 1);

        queue.shutdown();
    }

    #[test]
    fn test_is_paused() {
        let queue = TaskQueue::new(1);
        assert!(!queue.is_paused());
        queue.pause();
        assert!(queue.is_paused());
        queue.resume();
        assert!(!queue.is_paused());
        queue.shutdown();
    }

    #[test]
    fn test_drain_overrides_pause() {
        let queue = TaskQueue::new(2);
        queue.pause();

        let counter = Arc::new(AtomicUsize::new(0));
        for _ in 0..5 {
            let c = counter.clone();
            queue.submit(move || {
                c.fetch_add(1, Ordering::SeqCst);
            });
        }

        // Drain should complete all tasks even though paused
        queue.drain();
        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }

    #[test]
    fn test_pending_count() {
        let queue = TaskQueue::new(1);
        queue.pause();

        assert_eq!(queue.pending_count(), 0);
        queue.submit(|| 1);
        queue.submit(|| 2);
        assert_eq!(queue.pending_count(), 2);

        queue.resume();
        thread::sleep(Duration::from_millis(100));
        assert_eq!(queue.pending_count(), 0);

        queue.shutdown();
    }

    #[test]
    fn test_queue_full_error_display() {
        assert_eq!(
            format!("{}", TaskError::QueueFull),
            "task rejected: queue is full"
        );
    }
}
1191}