qubit_executor/task/task_completion.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::sync::Arc;
11
12use super::TaskExecutionError;
13use super::TaskResult;
14use super::task_handle_inner::TaskHandleInner;
15use super::task_handle_state::TaskHandleState;
16
17/// Completion endpoint owned by a task runner.
18///
19/// This low-level endpoint is exposed so custom executor services built on top
20/// of `qubit-executor` can wire their own scheduling and cancellation hooks
21/// while still returning the standard [`crate::TaskHandle`]. Normal callers
22/// should use [`crate::TaskHandle`] and executor/service submission methods
23/// instead.
24pub struct TaskCompletion<R, E> {
25 /// Shared state updated by this completion endpoint.
26 pub(crate) inner: Arc<TaskHandleInner<R, E>>,
27}
28
29impl<R, E> Clone for TaskCompletion<R, E> {
30 /// Clones the completion endpoint for mutually exclusive finish paths.
31 ///
32 /// # Returns
33 ///
34 /// A completion endpoint sharing the same task state.
35 #[inline]
36 fn clone(&self) -> Self {
37 Self {
38 inner: Arc::clone(&self.inner),
39 }
40 }
41}
42
43impl<R, E> TaskCompletion<R, E> {
44 /// Marks the task as started if it was not cancelled first.
45 ///
46 /// # Returns
47 ///
48 /// `true` if the runner should execute the task, or `false` if the task was
49 /// already completed through cancellation.
50 pub fn start(&self) -> bool {
51 self.inner.state.write(|state| {
52 if state.completed {
53 false
54 } else {
55 state.started = true;
56 true
57 }
58 })
59 }
60
61 /// Completes the task with its final result.
62 ///
63 /// If another path has already completed the task, this result is ignored.
64 ///
65 /// # Parameters
66 ///
67 /// * `result` - Final task result to publish if the task is not already
68 /// completed.
69 #[inline]
70 pub fn complete(&self, result: TaskResult<R, E>) {
71 self.finish(result, |_| true);
72 }
73
74 /// Starts the task and completes it with a lazily produced result.
75 ///
76 /// The supplied closure is executed only if this completion endpoint wins
77 /// the start race. If the handle was cancelled first, the closure is not
78 /// called and the existing cancellation result is preserved.
79 ///
80 /// # Parameters
81 ///
82 /// * `task` - Closure that runs the accepted task and returns its final
83 /// result.
84 ///
85 /// # Returns
86 ///
87 /// `true` if the closure was executed and its result was published, or
88 /// `false` if the task had already been completed by cancellation.
89 #[inline]
90 pub fn start_and_complete<F>(&self, task: F) -> bool
91 where
92 F: FnOnce() -> TaskResult<R, E>,
93 {
94 if !self.start() {
95 return false;
96 }
97 self.complete(task());
98 true
99 }
100
101 /// Cancels the task if it has not started yet.
102 ///
103 /// # Returns
104 ///
105 /// `true` if this call published a cancellation result, or `false` if the
106 /// task was already started or completed.
107 #[inline]
108 pub fn cancel(&self) -> bool {
109 self.finish(Err(TaskExecutionError::Cancelled), |state| !state.started)
110 }
111
112 /// Publishes a terminal result when the supplied predicate allows it.
113 ///
114 /// # Parameters
115 ///
116 /// * `result` - Terminal result to store.
117 /// * `can_finish` - Predicate evaluated under the monitor lock to decide
118 /// whether this path may publish the result.
119 ///
120 /// # Returns
121 ///
122 /// `true` if the result was published and waiters were notified, or
123 /// `false` if another completion path already won or `can_finish`
124 /// rejected the transition.
125 fn finish<F>(&self, result: TaskResult<R, E>, can_finish: F) -> bool
126 where
127 F: FnOnce(&TaskHandleState<R, E>) -> bool,
128 {
129 let (published, waker) = self.inner.state.write(|state| {
130 if state.completed || !can_finish(state) {
131 return (false, None);
132 }
133 state.result = Some(result);
134 state.completed = true;
135 self.inner.done.store(true);
136 (true, state.waker.take())
137 });
138 if !published {
139 return false;
140 }
141 self.inner.notify_completion();
142 if let Some(waker) = waker {
143 waker.wake();
144 }
145 true
146 }
147}