zero_pool/
task_future.rs

1use std::sync::atomic::Ordering;
2use std::sync::{Arc, Condvar, Mutex};
3use std::time::Duration;
4
5use crate::padded_type::PaddedAtomicUsize;
6
7/// A future that tracks completion of submitted tasks
8///
9/// `TaskFuture` provides both blocking and non-blocking ways to wait for
10/// task completion, with efficient condition variable notification.
11/// Tasks can be checked for completion, waited on indefinitely, or
12/// waited on with a timeout.
13#[derive(Clone)]
14pub struct TaskFuture {
15    remaining: Arc<PaddedAtomicUsize>,
16    state: Arc<(Mutex<()>, Condvar)>,
17}
18
19impl TaskFuture {
20    // create a new work future for the given number of tasks
21    pub(crate) fn new(task_count: usize) -> Self {
22        TaskFuture {
23            remaining: Arc::new(PaddedAtomicUsize::new(task_count)),
24            state: Arc::new((Mutex::new(()), Condvar::new())),
25        }
26    }
27
28    /// Check if all tasks are complete without blocking
29    ///
30    /// Returns `true` if all tasks have finished execution.
31    /// This is a non-blocking operation using atomic loads.
32    pub fn is_complete(&self) -> bool {
33        self.remaining.load(Ordering::Acquire) == 0
34    }
35
36    /// Wait for all tasks to complete
37    ///
38    /// This method blocks the current thread until all tasks finish.
39    /// It uses efficient condition variable notification to minimize
40    /// CPU usage while waiting.
41    pub fn wait(self) {
42        if self.is_complete() {
43            return;
44        }
45
46        let (lock, cvar) = &*self.state;
47        let mut guard = lock.lock().unwrap();
48
49        while !self.is_complete() {
50            guard = cvar.wait(guard).unwrap();
51        }
52    }
53
54    /// Wait for all tasks to complete with a timeout
55    ///
56    /// Returns `true` if all tasks completed within the timeout,
57    /// `false` if the timeout was reached first.
58    pub fn wait_timeout(self, timeout: Duration) -> bool {
59        if self.is_complete() {
60            return true;
61        }
62
63        let (lock, cvar) = &*self.state;
64        let mut guard = lock.lock().unwrap();
65
66        while !self.is_complete() {
67            let (new_guard, timeout_result) = cvar.wait_timeout(guard, timeout).unwrap();
68            guard = new_guard;
69            if timeout_result.timed_out() {
70                return self.is_complete();
71            }
72        }
73        true
74    }
75    
76    // completes multiple tasks, decrements counter and notifies if all done
77    pub(crate) fn complete_many(&self, count: usize) {
78        let remaining_count = self.remaining.fetch_sub(count, Ordering::Release);
79
80        // if this completed the last tasks, notify waiters
81        if remaining_count == count {
82            let (lock, cvar) = &*self.state;
83            let _guard = lock.lock().unwrap();
84            cvar.notify_all();
85        }
86    }
87}