zero_pool/task_future.rs
1use std::sync::atomic::Ordering;
2use std::sync::{Arc, Condvar, Mutex};
3use std::time::Duration;
4
5use crate::padded_type::PaddedAtomicUsize;
6
7/// A future that tracks completion of submitted tasks
8///
9/// `TaskFuture` provides both blocking and non-blocking ways to wait for
10/// task completion, with efficient condition variable notification.
11/// Tasks can be checked for completion, waited on indefinitely, or
12/// waited on with a timeout.
13#[derive(Clone)]
14pub struct TaskFuture {
15 remaining: Arc<PaddedAtomicUsize>,
16 state: Arc<(Mutex<()>, Condvar)>,
17}
18
19impl TaskFuture {
20 // create a new work future for the given number of tasks
21 pub(crate) fn new(task_count: usize) -> Self {
22 TaskFuture {
23 remaining: Arc::new(PaddedAtomicUsize::new(task_count)),
24 state: Arc::new((Mutex::new(()), Condvar::new())),
25 }
26 }
27
28 /// Check if all tasks are complete without blocking
29 ///
30 /// Returns `true` if all tasks have finished execution.
31 /// This is a non-blocking operation using atomic loads.
32 pub fn is_complete(&self) -> bool {
33 self.remaining.load(Ordering::Acquire) == 0
34 }
35
36 /// Wait for all tasks to complete
37 ///
38 /// This method blocks the current thread until all tasks finish.
39 /// It uses efficient condition variable notification to minimize
40 /// CPU usage while waiting.
41 pub fn wait(self) {
42 if self.is_complete() {
43 return;
44 }
45
46 let (lock, cvar) = &*self.state;
47 let mut guard = lock.lock().unwrap();
48
49 while !self.is_complete() {
50 guard = cvar.wait(guard).unwrap();
51 }
52 }
53
54 /// Wait for all tasks to complete with a timeout
55 ///
56 /// Returns `true` if all tasks completed within the timeout,
57 /// `false` if the timeout was reached first.
58 pub fn wait_timeout(self, timeout: Duration) -> bool {
59 if self.is_complete() {
60 return true;
61 }
62
63 let (lock, cvar) = &*self.state;
64 let mut guard = lock.lock().unwrap();
65
66 while !self.is_complete() {
67 let (new_guard, timeout_result) = cvar.wait_timeout(guard, timeout).unwrap();
68 guard = new_guard;
69 if timeout_result.timed_out() {
70 return self.is_complete();
71 }
72 }
73 true
74 }
75
76 /// Get the approximate number of remaining incomplete tasks
77 ///
78 /// This provides a way to monitor progress of task batches.
79 /// The count is approximate due to relaxed atomic ordering.
80 /// Not for precise synchronization.
81 pub fn remaining_count(&self) -> usize {
82 self.remaining.load(Ordering::Relaxed)
83 }
84
85 // completes multiple tasks, decrements counter and notifies if all done
86 #[inline]
87 pub(crate) fn complete_many(&self, count: usize) {
88 let remaining_count = self.remaining.fetch_sub(count, Ordering::Release);
89
90 // if this completed the last tasks, notify waiters
91 if remaining_count == count {
92 let (lock, cvar) = &*self.state;
93 let _guard = lock.lock().unwrap();
94 cvar.notify_all();
95 }
96 }
97}