zero_pool/task_future.rs
1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::{Arc, Condvar, Mutex};
3use std::time::Duration;
4
5use crate::padded_type::PaddedType;
6
7/// Inner state shared between all clones of a TaskFuture
8struct TaskFutureInner {
9 remaining: PaddedType<AtomicUsize>,
10 lock: Mutex<()>,
11 cvar: Condvar,
12}
13
14/// A future that tracks completion of submitted tasks
15///
16/// `TaskFuture` provides both blocking and non-blocking ways to wait for
17/// task completion. Tasks can be checked for completion, waited on
18/// indefinitely, or waited on with a timeout.
19///
20/// `TaskFuture` is cheaply cloneable and can be shared across threads.
21/// You can drop the future immediately after submission - tasks will
22/// still complete as the task batch holds its own reference.
23#[derive(Clone)]
24pub struct TaskFuture(Arc<TaskFutureInner>);
25
26impl TaskFuture {
27 // create a new work future for the given number of tasks
28 pub(crate) fn new(task_count: usize) -> Self {
29 TaskFuture(Arc::new(TaskFutureInner {
30 remaining: PaddedType::new(AtomicUsize::new(task_count)),
31 lock: Mutex::new(()),
32 cvar: Condvar::new(),
33 }))
34 }
35
36 /// Check if all tasks are complete without blocking
37 ///
38 /// Returns `true` if all tasks have finished execution.
39 /// This is a non-blocking operation using atomic loads.
40 pub fn is_complete(&self) -> bool {
41 self.0.remaining.load(Ordering::Acquire) == 0
42 }
43
44 /// Wait for all tasks to complete
45 ///
46 /// First checks completion with an atomic load; if incomplete, it locks the mutex.
47 pub fn wait(self) {
48 if self.is_complete() {
49 return;
50 }
51
52 let guard = self.0.lock.lock().unwrap();
53
54 let _guard = self
55 .0
56 .cvar
57 .wait_while(guard, |_| !self.is_complete())
58 .unwrap();
59 }
60
61 /// Wait for all tasks to complete with a timeout
62 ///
63 /// First checks completion with an atomic load; if incomplete, it locks the mutex.
64 /// Returns `true` if all tasks completed within the timeout,
65 /// `false` if the timeout was reached first.
66 pub fn wait_timeout(self, timeout: Duration) -> bool {
67 if self.is_complete() {
68 return true;
69 }
70
71 let guard = self.0.lock.lock().unwrap();
72
73 !self
74 .0
75 .cvar
76 .wait_timeout_while(guard, timeout, |_| !self.is_complete())
77 .unwrap()
78 .1
79 .timed_out()
80 }
81
82 // completes multiple tasks, decrements counter and notifies if all done
83 pub(crate) fn complete_many(&self, count: usize) -> bool {
84 let remaining_count = self.0.remaining.fetch_sub(count, Ordering::Release);
85
86 if remaining_count != count {
87 return false;
88 }
89
90 let _guard = self.0.lock.lock().unwrap();
91 self.0.cvar.notify_all();
92 true
93 }
94}