qubit_executor/task/task_slot.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::sync::Arc;
11
12use qubit_function::Callable;
13
14use super::{
15 TaskResult,
16 running_task_slot::RunningTaskSlot,
17 task_runner::TaskRunner,
18 task_state::TaskState,
19};
20
21/// Runner-side slot for one task submission.
22///
23/// This low-level endpoint is exposed so custom executor services built on top
24/// of `qubit-executor` can wire their own scheduling while still returning the
25/// standard [`crate::TaskHandle`]. Executor implementations should call
26/// [`Self::accept`] only after submission succeeds; this arms lifecycle hook
27/// reporting for later start and finish events. Normal callers should use
28/// [`crate::TaskHandle`] and executor/service submission methods instead.
29///
30/// Dropping an accepted slot reports [`crate::TaskExecutionError::Dropped`]
31/// because it means the runner endpoint was abandoned without making an
32/// explicit terminal decision. Executor services that intentionally discard
33/// accepted work before it starts, such as during
34/// [`crate::ExecutorService::stop`], should call [`Self::cancel_unstarted`] so
35/// callers observe [`crate::TaskExecutionError::Cancelled`] instead.
36pub struct TaskSlot<R, E> {
37 /// Shared state updated by this completion endpoint.
38 pub(crate) state: Option<Arc<TaskState<R, E>>>,
39}
40
41impl<R, E> TaskSlot<R, E> {
42 /// Returns the shared state owned by this slot.
43 ///
44 /// # Returns
45 ///
46 /// A reference to the task state. The state is always present until a
47 /// consuming runner-side API transfers it to another endpoint.
48 #[inline]
49 fn state(&self) -> &TaskState<R, E> {
50 self.state
51 .as_deref()
52 .expect("task slot state should be present")
53 }
54
55 /// Marks this runner endpoint as accepted and arms lifecycle hook reporting.
56 ///
57 /// Calling this method emits `on_accepted` before any later `on_started` or
58 /// `on_finished` event for the same task. Executor implementations must call
59 /// it only after submission has succeeded. Dropping a slot before acceptance
60 /// still releases result waiters with `Dropped`, but does not emit lifecycle
61 /// hook events for a task that was rejected before acceptance.
62 #[inline]
63 pub fn accept(&self) {
64 let _accepted_now = self.state().accept();
65 }
66
67 /// Cancels this accepted runner endpoint before it starts running.
68 ///
69 /// This method is the runner-side service-provider API for an executor or
70 /// executor service that intentionally removes queued, scheduled, or other
71 /// unstarted accepted work. It publishes
72 /// [`crate::TaskExecutionError::Cancelled`] when this slot wins the
73 /// pending-task terminal-state race. The slot is consumed to make the
74 /// explicit cancellation decision the final runner-side action.
75 ///
76 /// If the slot has already been accepted, successful cancellation emits the
77 /// finished lifecycle hook with [`crate::TaskStatus::Cancelled`]. If it has
78 /// not been accepted, cancellation still releases result waiters but does
79 /// not emit lifecycle hook events.
80 ///
81 /// # Returns
82 ///
83 /// `true` if this call moved the task from pending to cancelled, or `false`
84 /// if another path had already started or completed the task.
85 #[inline]
86 pub fn cancel_unstarted(mut self) -> bool {
87 self.state
88 .take()
89 .is_some_and(|state| state.try_cancel_pending())
90 }
91
92 /// Attempts to move this slot from pending into running state.
93 ///
94 /// This method consumes the pending slot. On success, it returns a
95 /// [`RunningTaskSlot`] that must be completed or dropped. On failure, the
96 /// original pending slot is returned so the caller can inspect or drop it;
97 /// the user callable must not be executed in that case.
98 ///
99 /// # Returns
100 ///
101 /// `Ok(RunningTaskSlot)` if this call won the pending-to-running race, or
102 /// `Err(TaskSlot)` if the task had already been cancelled or completed.
103 pub fn try_start(mut self) -> Result<RunningTaskSlot<R, E>, Self> {
104 if !self.start() {
105 return Err(self);
106 }
107 let state = self
108 .state
109 .take()
110 .expect("started task slot state should be present");
111 Ok(RunningTaskSlot::new(state))
112 }
113
114 /// Marks the task as started if it was not cancelled first.
115 ///
116 /// # Returns
117 ///
118 /// `true` if the runner should execute the task, or `false` if the task was
119 /// already completed through cancellation.
120 pub(crate) fn start(&self) -> bool {
121 self.state().try_start(self.state().is_accepted())
122 }
123
124 /// Starts the task and completes it with a lazily produced result.
125 ///
126 /// The supplied closure is executed only if this completion endpoint wins
127 /// the start race. If the handle was cancelled first, the closure is not
128 /// called and the existing cancellation result is preserved.
129 ///
130 /// # Parameters
131 ///
132 /// * `task` - Closure that runs the accepted task and returns its final
133 /// result.
134 ///
135 /// # Returns
136 ///
137 /// `true` if the closure was executed and its result was published, or
138 /// `false` if the task had already been completed by cancellation.
139 #[inline]
140 pub(crate) fn start_and_complete<F>(self, task: F) -> bool
141 where
142 F: FnOnce() -> TaskResult<R, E>,
143 {
144 let Ok(running) = self.try_start() else {
145 return false;
146 };
147 running.complete(task())
148 }
149
150 /// Starts this slot and runs a callable to completion.
151 ///
152 /// # Parameters
153 ///
154 /// * `task` - Callable to run if the task has not been cancelled.
155 ///
156 /// # Returns
157 ///
158 /// `true` if the callable ran and published a result, or `false` if the
159 /// task had already been cancelled.
160 #[inline]
161 pub fn run<C>(self, task: C) -> bool
162 where
163 C: Callable<R, E>,
164 {
165 self.start_and_complete(|| TaskRunner::new(task).call())
166 }
167}
168
169impl<R, E> Drop for TaskSlot<R, E> {
170 /// Publishes a dropped-result error when the runner endpoint is abandoned.
171 #[inline]
172 fn drop(&mut self) {
173 if let Some(state) = &self.state {
174 let _ignored = state.try_drop_unfinished(state.is_accepted());
175 }
176 }
177}