Skip to main content

qubit_executor/task/
task_completion.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9use std::sync::Arc;
10
11use super::TaskExecutionError;
12use super::TaskResult;
13use super::task_handle_inner::TaskHandleInner;
14use super::task_handle_state::TaskHandleState;
15
16/// Completion endpoint owned by a task runner.
17///
18/// This low-level endpoint is exposed so custom executor services built on top
19/// of `qubit-executor` can wire their own scheduling and cancellation hooks
20/// while still returning the standard [`crate::TaskHandle`]. Normal callers
21/// should use [`crate::TaskHandle`] and executor/service submission methods
22/// instead.
23pub struct TaskCompletion<R, E> {
24    /// Shared state updated by this completion endpoint.
25    pub(crate) inner: Arc<TaskHandleInner<R, E>>,
26}
27
28impl<R, E> Clone for TaskCompletion<R, E> {
29    /// Clones the completion endpoint for mutually exclusive finish paths.
30    ///
31    /// # Returns
32    ///
33    /// A completion endpoint sharing the same task state.
34    #[inline]
35    fn clone(&self) -> Self {
36        Self {
37            inner: Arc::clone(&self.inner),
38        }
39    }
40}
41
42impl<R, E> TaskCompletion<R, E> {
43    /// Marks the task as started if it was not cancelled first.
44    ///
45    /// # Returns
46    ///
47    /// `true` if the runner should execute the task, or `false` if the task was
48    /// already completed through cancellation.
49    pub fn start(&self) -> bool {
50        self.inner.state.write(|state| {
51            if state.completed {
52                false
53            } else {
54                state.started = true;
55                true
56            }
57        })
58    }
59
60    /// Completes the task with its final result.
61    ///
62    /// If another path has already completed the task, this result is ignored.
63    ///
64    /// # Parameters
65    ///
66    /// * `result` - Final task result to publish if the task is not already
67    ///   completed.
68    #[inline]
69    pub fn complete(&self, result: TaskResult<R, E>) {
70        self.finish(result, |_| true);
71    }
72
73    /// Starts the task and completes it with a lazily produced result.
74    ///
75    /// The supplied closure is executed only if this completion endpoint wins
76    /// the start race. If the handle was cancelled first, the closure is not
77    /// called and the existing cancellation result is preserved.
78    ///
79    /// # Parameters
80    ///
81    /// * `task` - Closure that runs the accepted task and returns its final
82    ///   result.
83    ///
84    /// # Returns
85    ///
86    /// `true` if the closure was executed and its result was published, or
87    /// `false` if the task had already been completed by cancellation.
88    #[inline]
89    pub fn start_and_complete<F>(&self, task: F) -> bool
90    where
91        F: FnOnce() -> TaskResult<R, E>,
92    {
93        if !self.start() {
94            return false;
95        }
96        self.complete(task());
97        true
98    }
99
100    /// Cancels the task if it has not started yet.
101    ///
102    /// # Returns
103    ///
104    /// `true` if this call published a cancellation result, or `false` if the
105    /// task was already started or completed.
106    #[inline]
107    pub fn cancel(&self) -> bool {
108        self.finish(Err(TaskExecutionError::Cancelled), |state| !state.started)
109    }
110
111    /// Publishes a terminal result when the supplied predicate allows it.
112    ///
113    /// # Parameters
114    ///
115    /// * `result` - Terminal result to store.
116    /// * `can_finish` - Predicate evaluated under the monitor lock to decide
117    ///   whether this path may publish the result.
118    ///
119    /// # Returns
120    ///
121    /// `true` if the result was published and waiters were notified, or
122    /// `false` if another completion path already won or `can_finish`
123    /// rejected the transition.
124    fn finish<F>(&self, result: TaskResult<R, E>, can_finish: F) -> bool
125    where
126        F: FnOnce(&TaskHandleState<R, E>) -> bool,
127    {
128        let (published, waker) = self.inner.state.write(|state| {
129            if state.completed || !can_finish(state) {
130                return (false, None);
131            }
132            state.result = Some(result);
133            state.completed = true;
134            self.inner.done.store(true);
135            (true, state.waker.take())
136        });
137        if !published {
138            return false;
139        }
140        self.inner.notify_completion();
141        if let Some(waker) = waker {
142            waker.wake();
143        }
144        true
145    }
146}