zero_pool/
task_future.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::{Arc, Condvar, Mutex};
3use std::time::Duration;
4
5use crate::padded_type::PaddedType;
6
7/// Inner state shared between all clones of a TaskFuture
8struct TaskFutureInner {
9    remaining: PaddedType<AtomicUsize>,
10    lock: Mutex<()>,
11    cvar: Condvar,
12}
13
14/// A future that tracks completion of submitted tasks
15///
16/// `TaskFuture` provides both blocking and non-blocking ways to wait for
17/// task completion. Tasks can be checked for completion, waited on
18/// indefinitely, or waited on with a timeout.
19///
20/// `TaskFuture` is cheaply cloneable and can be shared across threads.
21/// You can drop the future immediately after submission - tasks will
22/// still complete as the task batch holds its own reference.
23#[derive(Clone)]
24pub struct TaskFuture(Arc<TaskFutureInner>);
25
26impl TaskFuture {
27    // create a new work future for the given number of tasks
28    pub(crate) fn new(task_count: usize) -> Self {
29        TaskFuture(Arc::new(TaskFutureInner {
30            remaining: PaddedType::new(AtomicUsize::new(task_count)),
31            lock: Mutex::new(()),
32            cvar: Condvar::new(),
33        }))
34    }
35
36    /// Check if all tasks are complete without blocking
37    ///
38    /// Returns `true` if all tasks have finished execution.
39    /// This is a non-blocking operation using atomic loads.
40    pub fn is_complete(&self) -> bool {
41        self.0.remaining.load(Ordering::Acquire) == 0
42    }
43
44    /// Wait for all tasks to complete
45    ///
46    /// First checks completion with an atomic load; if incomplete, it locks the mutex.
47    pub fn wait(self) {
48        if self.is_complete() {
49            return;
50        }
51
52        let guard = self.0.lock.lock().unwrap();
53
54        let _guard = self
55            .0
56            .cvar
57            .wait_while(guard, |_| !self.is_complete())
58            .unwrap();
59    }
60
61    /// Wait for all tasks to complete with a timeout
62    ///
63    /// First checks completion with an atomic load; if incomplete, it locks the mutex.
64    /// Returns `true` if all tasks completed within the timeout,
65    /// `false` if the timeout was reached first.
66    pub fn wait_timeout(self, timeout: Duration) -> bool {
67        if self.is_complete() {
68            return true;
69        }
70
71        let guard = self.0.lock.lock().unwrap();
72
73        !self
74            .0
75            .cvar
76            .wait_timeout_while(guard, timeout, |_| !self.is_complete())
77            .unwrap()
78            .1
79            .timed_out()
80    }
81
82    // completes multiple tasks, decrements counter and notifies if all done
83    pub(crate) fn complete_many(&self, count: usize) -> bool {
84        let remaining_count = self.0.remaining.fetch_sub(count, Ordering::Release);
85
86        if remaining_count != count {
87            return false;
88        }
89
90        let _guard = self.0.lock.lock().unwrap();
91        self.0.cvar.notify_all();
92        true
93    }
94}