actionqueue_executor_local/handler.rs
1//! Executor handler trait and related types for attempt-level execution.
2//!
3//! This module defines the [`ExecutorHandler`] trait that implementations must
4//! fulfill to execute attempts. The handler receives an [`ExecutorContext`]
5//! containing execution identity, payload, metadata, and optional workflow
6//! extensions (submission channel and children snapshot).
7
8pub mod cancellation;
9
10use actionqueue_core::budget::BudgetConsumption;
11use actionqueue_core::ids::{AttemptId, RunId};
12use actionqueue_core::task::safety::SafetyLevel;
13pub use cancellation::{CancellationContext, CancellationToken};
14
15use crate::children::ChildrenSnapshot;
16
17/// Execution context provided to the handler for each attempt.
18///
19/// This structure contains all information needed to execute an attempt
20/// including the run and attempt identifiers, the payload to execute,
21/// and constraints snapshot for timeout and retry behavior.
22///
23/// # Cancellation contract
24///
25/// - If `metadata.timeout_secs` is `Some`, timeout enforcement may request
26/// cancellation while execution is in progress.
27/// - Long-running handlers must poll [`CancellationToken::is_cancelled()`]
28/// at bounded intervals and exit promptly once cancellation is observed.
29/// - Returning success after cancellation has been requested does not override
30/// timeout truth; timeout classification remains authoritative.
31#[derive(Debug, Clone)]
32pub struct HandlerInput {
33 /// The unique identifier for the run instance.
34 pub run_id: RunId,
35 /// The unique identifier for this specific attempt within the run.
36 pub attempt_id: AttemptId,
37 /// The opaque payload bytes to execute.
38 pub payload: Vec<u8>,
39 /// The attempt-level execution metadata (timeout, retry policy, etc.).
40 pub metadata: AttemptMetadata,
41 /// The cancellation context for this execution. Handlers can check this
42 /// to determine if execution should be terminated early.
43 pub cancellation_context: CancellationContext,
44}
45
46/// Attempt-level execution metadata derived from task constraints.
47#[derive(Debug, Clone)]
48pub struct AttemptMetadata {
49 /// Maximum number of attempts allowed for this run.
50 pub max_attempts: u32,
51 /// Current attempt number (1-indexed).
52 pub attempt_number: u32,
53 /// Execution timeout in seconds. If `None`, no timeout.
54 pub timeout_secs: Option<u64>,
55 /// Safety level classification of the task.
56 pub safety_level: SafetyLevel,
57}
58
59/// Outcome of an attempt execution.
60///
61/// The handler return type uses typed variants to distinguish between
62/// successful completion, retryable failures (transient errors that may
63/// succeed on retry), and terminal failures (permanent errors that should
64/// not be retried). Handlers may also return `Suspended` to signal that
65/// execution was voluntarily preempted (e.g. in response to budget exhaustion).
66///
67/// All variants carry a `consumption` list: zero or more [`BudgetConsumption`]
68/// records reporting the resources consumed during this attempt. The dispatch
69/// loop durably records these after each attempt completes.
70#[derive(Debug, Clone, PartialEq, Eq)]
71pub enum HandlerOutput {
72 /// Execution completed successfully.
73 Success {
74 /// Optional output data produced by the execution.
75 output: Option<Vec<u8>>,
76 /// Resource consumption incurred during this attempt.
77 consumption: Vec<BudgetConsumption>,
78 },
79 /// Execution failed but may succeed on retry (transient failure).
80 RetryableFailure {
81 /// Error message describing the failure cause.
82 error: String,
83 /// Resource consumption incurred before the failure.
84 consumption: Vec<BudgetConsumption>,
85 },
86 /// Execution failed permanently and should not be retried.
87 TerminalFailure {
88 /// Error message describing the failure cause.
89 error: String,
90 /// Resource consumption incurred before the failure.
91 consumption: Vec<BudgetConsumption>,
92 },
93 /// Execution was voluntarily suspended (e.g. budget exhaustion signalled via
94 /// the cancellation token). The run transitions to `Suspended` and does not
95 /// count this attempt toward the `max_attempts` retry cap.
96 Suspended {
97 /// Optional partial-state bytes the handler may persist for use on resume.
98 output: Option<Vec<u8>>,
99 /// Resource consumption incurred before suspension.
100 consumption: Vec<BudgetConsumption>,
101 },
102}
103
104impl HandlerOutput {
105 /// Creates a success output with no output bytes and no consumption.
106 pub fn success() -> Self {
107 Self::Success { output: None, consumption: vec![] }
108 }
109
110 /// Creates a success output with output bytes and no consumption.
111 pub fn success_with_output(output: Vec<u8>) -> Self {
112 Self::Success { output: Some(output), consumption: vec![] }
113 }
114
115 /// Creates a success output with output bytes and consumption records.
116 pub fn success_with_consumption(
117 output: Option<Vec<u8>>,
118 consumption: Vec<BudgetConsumption>,
119 ) -> Self {
120 Self::Success { output, consumption }
121 }
122
123 /// Creates a retryable failure with no consumption.
124 pub fn retryable_failure(error: impl Into<String>) -> Self {
125 Self::RetryableFailure { error: error.into(), consumption: vec![] }
126 }
127
128 /// Creates a terminal failure with no consumption.
129 pub fn terminal_failure(error: impl Into<String>) -> Self {
130 Self::TerminalFailure { error: error.into(), consumption: vec![] }
131 }
132
133 /// Creates a suspended outcome with no partial state and no consumption.
134 pub fn suspended() -> Self {
135 Self::Suspended { output: None, consumption: vec![] }
136 }
137
138 /// Creates a suspended outcome with partial-state bytes and consumption.
139 pub fn suspended_with_output(output: Vec<u8>, consumption: Vec<BudgetConsumption>) -> Self {
140 Self::Suspended { output: Some(output), consumption }
141 }
142
143 /// Returns the resource consumption reported by this handler output.
144 pub fn consumption(&self) -> &[BudgetConsumption] {
145 match self {
146 Self::Success { consumption, .. }
147 | Self::RetryableFailure { consumption, .. }
148 | Self::TerminalFailure { consumption, .. }
149 | Self::Suspended { consumption, .. } => consumption,
150 }
151 }
152}
153
154/// Abstract port through which a handler can propose new child tasks.
155///
156/// The concrete implementation (backed by a tokio mpsc channel) lives in
157/// `actionqueue-workflow` and is injected by the dispatch loop. Using a trait
158/// object keeps `actionqueue-executor-local` free of tokio dependencies.
159///
160/// Submissions are fire-and-forget: if the channel is closed (e.g., the
161/// dispatch loop shut down), the submission is silently dropped. Handlers
162/// have no error path for submission failures.
163pub trait TaskSubmissionPort: Send + Sync {
164 /// Proposes a new task for creation.
165 ///
166 /// The dispatch loop validates and WAL-appends the task on the next tick.
167 /// The submitted `task_spec` should have `parent_task_id` set to associate
168 /// it with the Coordinator's task.
169 ///
170 /// `dependencies` is the list of `TaskId`s that must reach terminal success
171 /// before this task's runs are promoted to Ready.
172 fn submit(
173 &self,
174 task_spec: actionqueue_core::task::task_spec::TaskSpec,
175 dependencies: Vec<actionqueue_core::ids::TaskId>,
176 );
177}
178
179/// Full execution context provided to a handler for each attempt.
180///
181/// Extends [`HandlerInput`] with optional workflow extensions:
182/// - [`submission`](Self::submission): channel for proposing child tasks
183/// - [`children`](Self::children): snapshot of child task states
184///
185/// # Cancellation contract
186///
187/// See [`HandlerInput`] for the timeout and cancellation contract.
188pub struct ExecutorContext {
189 /// Core execution input: identifiers, payload, metadata, cancellation.
190 pub input: HandlerInput,
191 /// Optional channel for proposing new child tasks during execution.
192 ///
193 /// Present when the workflow feature is active. Handlers set
194 /// `parent_task_id` on submitted `TaskSpec`s to establish hierarchy.
195 pub submission: Option<std::sync::Arc<dyn TaskSubmissionPort>>,
196 /// Optional snapshot of child task states, taken at dispatch time.
197 ///
198 /// Present when the dispatched task has children in the hierarchy tracker.
199 /// Coordinator handlers use this to check progress and decide what to submit.
200 pub children: Option<ChildrenSnapshot>,
201}
202
203impl std::fmt::Debug for ExecutorContext {
204 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 f.debug_struct("ExecutorContext")
206 .field("input", &self.input)
207 .field("submission", &self.submission.as_ref().map(|_| "<TaskSubmissionPort>"))
208 .field("children", &self.children)
209 .finish()
210 }
211}
212
213/// Trait for executor implementations to fulfill for attempt execution.
214///
215/// The handler is invoked for each attempt with full context including
216/// `RunId` and `AttemptId` for traceability. Implementations must return
217/// a typed [`HandlerOutput`] that indicates whether the attempt succeeded,
218/// should be retried, or should be marked as a terminal failure.
219///
220/// # Invariants
221///
222/// - Handlers must not mutate attempt-counting or run-derivation accounting.
223/// - Handlers must use the `RunId` and `AttemptId` from [`HandlerInput`] for
224/// all logging and reporting.
225/// - Long-running work must cooperate with timeout enforcement by polling
226/// [`CancellationToken::is_cancelled()`] at a bounded cadence.
227pub trait ExecutorHandler: Send + Sync {
228 /// Executes the attempt with the provided context and returns the outcome.
229 ///
230 /// # Arguments
231 ///
232 /// * `ctx` - The full execution context, including run/attempt identifiers,
233 /// payload, metadata, and optional workflow extensions.
234 ///
235 /// # Returns
236 ///
237 /// A [`HandlerOutput`] that indicates the attempt outcome:
238 /// - [`HandlerOutput::Success`] if execution completed successfully
239 /// - [`HandlerOutput::RetryableFailure`] if execution failed but may succeed on retry
240 /// - [`HandlerOutput::TerminalFailure`] if execution failed permanently
241 /// - [`HandlerOutput::Suspended`] if execution was voluntarily preempted
242 fn execute(&self, ctx: ExecutorContext) -> HandlerOutput;
243}