zero_pool/
task_future.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::{Arc, Condvar, Mutex};
3use std::time::Duration;
4
5use crate::padded_type::PaddedType;
6
7/// Inner state shared between all clones of a TaskFuture
8struct TaskFutureInner {
9    remaining: PaddedType<AtomicUsize>,
10    lock: Mutex<()>,
11    cvar: Condvar,
12}
13
14/// A future that tracks completion of submitted tasks
15///
16/// `TaskFuture` provides both blocking and non-blocking ways to wait for
17/// task completion, with efficient condition variable notification.
18/// Tasks can be checked for completion, waited on indefinitely, or
19/// waited on with a timeout.
20///
21/// `TaskFuture` is cheaply cloneable and can be shared across threads.
22/// You can drop the future immediately after submission - tasks will
23/// still complete as the task batch holds its own reference.
24#[derive(Clone)]
25pub struct TaskFuture(Arc<TaskFutureInner>);
26
27impl TaskFuture {
28    // create a new work future for the given number of tasks
29    pub(crate) fn new(task_count: usize) -> Self {
30        TaskFuture(Arc::new(TaskFutureInner {
31            remaining: PaddedType::new(AtomicUsize::new(task_count)),
32            lock: Mutex::new(()),
33            cvar: Condvar::new(),
34        }))
35    }
36
37    /// Check if all tasks are complete without blocking
38    ///
39    /// Returns `true` if all tasks have finished execution.
40    /// This is a non-blocking operation using atomic loads.
41    pub fn is_complete(&self) -> bool {
42        self.0.remaining.load(Ordering::Acquire) == 0
43    }
44
45    /// Wait for all tasks to complete
46    ///
47    /// This method blocks the current thread until all tasks finish.
48    /// It uses efficient condition variable notification to minimize
49    /// CPU usage while waiting.
50    pub fn wait(self) {
51        if self.is_complete() {
52            return;
53        }
54
55        let mut guard = self.0.lock.lock().unwrap();
56
57        while !self.is_complete() {
58            guard = self.0.cvar.wait(guard).unwrap();
59        }
60    }
61
62    /// Wait for all tasks to complete with a timeout
63    ///
64    /// Returns `true` if all tasks completed within the timeout,
65    /// `false` if the timeout was reached first.
66    pub fn wait_timeout(self, timeout: Duration) -> bool {
67        if self.is_complete() {
68            return true;
69        }
70
71        let guard = self.0.lock.lock().unwrap();
72
73        !self
74            .0
75            .cvar
76            .wait_timeout_while(guard, timeout, |_| !self.is_complete())
77            .unwrap()
78            .1
79            .timed_out()
80    }
81
82    // completes multiple tasks, decrements counter and notifies if all done
83    pub(crate) fn complete_many(&self, count: usize) -> bool {
84        let remaining_count = self.0.remaining.fetch_sub(count, Ordering::Release);
85
86        if remaining_count != count {
87            return false;
88        }
89
90        let _guard = self.0.lock.lock().unwrap();
91        self.0.cvar.notify_all();
92        true
93    }
94}