zero_pool/
task_future.rs

1use std::sync::atomic::Ordering;
2use std::sync::{Arc, Condvar, Mutex};
3use std::time::Duration;
4
5use crate::padded_type::PaddedAtomicUsize;
6
7/// Inner state shared between all clones of a TaskFuture
8struct TaskFutureInner {
9    remaining: PaddedAtomicUsize,
10    lock: Mutex<()>,
11    cvar: Condvar,
12}
13
14/// A future that tracks completion of submitted tasks
15///
16/// `TaskFuture` provides both blocking and non-blocking ways to wait for
17/// task completion, with efficient condition variable notification.
18/// Tasks can be checked for completion, waited on indefinitely, or
19/// waited on with a timeout.
20///
21/// `TaskFuture` is cheaply cloneable and can be shared across threads.
22/// You can drop the future immediately after submission - tasks will
23/// still complete as the task batch holds its own reference.
24#[derive(Clone)]
25pub struct TaskFuture(Arc<TaskFutureInner>);
26
27impl TaskFuture {
28    // create a new work future for the given number of tasks
29    pub(crate) fn new(task_count: usize) -> Self {
30        TaskFuture(Arc::new(TaskFutureInner {
31            remaining: PaddedAtomicUsize::new(task_count),
32            lock: Mutex::new(()),
33            cvar: Condvar::new(),
34        }))
35    }
36
37    /// Check if all tasks are complete without blocking
38    ///
39    /// Returns `true` if all tasks have finished execution.
40    /// This is a non-blocking operation using atomic loads.
41    pub fn is_complete(&self) -> bool {
42        self.0.remaining.load(Ordering::Acquire) == 0
43    }
44
45    /// Wait for all tasks to complete
46    ///
47    /// This method blocks the current thread until all tasks finish.
48    /// It uses efficient condition variable notification to minimize
49    /// CPU usage while waiting.
50    pub fn wait(self) {
51        if self.is_complete() {
52            return;
53        }
54
55        let mut guard = self.0.lock.lock().unwrap();
56
57        while !self.is_complete() {
58            guard = self.0.cvar.wait(guard).unwrap();
59        }
60    }
61
62    /// Wait for all tasks to complete with a timeout
63    ///
64    /// Returns `true` if all tasks completed within the timeout,
65    /// `false` if the timeout was reached first.
66    pub fn wait_timeout(self, timeout: Duration) -> bool {
67        if self.is_complete() {
68            return true;
69        }
70
71        let mut guard = self.0.lock.lock().unwrap();
72
73        while !self.is_complete() {
74            let (new_guard, timeout_result) = self.0.cvar.wait_timeout(guard, timeout).unwrap();
75            guard = new_guard;
76            if timeout_result.timed_out() {
77                return self.is_complete();
78            }
79        }
80        true
81    }
82
83    // completes multiple tasks, decrements counter and notifies if all done
84    pub(crate) fn complete_many(&self, count: usize) -> bool {
85        let remaining_count = self.0.remaining.fetch_sub(count, Ordering::Release);
86
87        if remaining_count != count {
88            return false;
89        }
90
91        let _guard = self.0.lock.lock().unwrap();
92        self.0.cvar.notify_all();
93        true
94    }
95}