actionqueue_executor_local/handler.rs
1//! Executor handler trait and related types for attempt-level execution.
2//!
3//! This module defines the [`ExecutorHandler`] trait that implementations must
4//! fulfill to execute attempts. The handler receives an [`ExecutorContext`]
5//! containing execution identity, payload, metadata, and optional workflow
6//! extensions (submission channel and children snapshot).
7
8pub mod cancellation;
9
10use actionqueue_core::budget::BudgetConsumption;
11use actionqueue_core::ids::{AttemptId, RunId};
12use actionqueue_core::task::safety::SafetyLevel;
13pub use cancellation::{CancellationContext, CancellationToken};
14
15use crate::children::ChildrenSnapshot;
16
17/// Execution context provided to the handler for each attempt.
18///
19/// This structure contains all information needed to execute an attempt
20/// including the run and attempt identifiers, the payload to execute,
21/// and constraints snapshot for timeout and retry behavior.
22///
23/// # Cancellation contract
24///
25/// - If `metadata.timeout_secs` is `Some`, timeout enforcement may request
26/// cancellation while execution is in progress.
27/// - Long-running handlers must poll [`CancellationToken::is_cancelled()`]
28/// at bounded intervals and exit promptly once cancellation is observed.
29/// - Returning success after cancellation has been requested does not override
30/// timeout truth; timeout classification remains authoritative.
31#[derive(Debug, Clone)]
32pub struct HandlerInput {
33 /// The unique identifier for the run instance.
34 pub run_id: RunId,
35 /// The unique identifier for this specific attempt within the run.
36 pub attempt_id: AttemptId,
37 /// The opaque payload bytes to execute.
38 pub payload: Vec<u8>,
39 /// The attempt-level execution metadata (timeout, retry policy, etc.).
40 pub metadata: AttemptMetadata,
41 /// The cancellation context for this execution. Handlers can check this
42 /// to determine if execution should be terminated early.
43 pub cancellation_context: CancellationContext,
44}
45
46/// Attempt-level execution metadata derived from task constraints.
47#[derive(Debug, Clone)]
48pub struct AttemptMetadata {
49 /// Maximum number of attempts allowed for this run.
50 pub max_attempts: u32,
51 /// Current attempt number (1-indexed).
52 pub attempt_number: u32,
53 /// Execution timeout in seconds. If `None`, no timeout.
54 pub timeout_secs: Option<u64>,
55 /// Safety level classification of the task.
56 pub safety_level: SafetyLevel,
57}
58
59/// Outcome of an attempt execution.
60///
61/// The handler return type uses typed variants to distinguish between
62/// successful completion, retryable failures (transient errors that may
63/// succeed on retry), and terminal failures (permanent errors that should
64/// not be retried). Handlers may also return `Suspended` to signal that
65/// execution was voluntarily preempted (e.g. in response to budget exhaustion).
66///
67/// All variants carry a `consumption` list: zero or more [`BudgetConsumption`]
68/// records reporting the resources consumed during this attempt. The dispatch
69/// loop durably records these after each attempt completes.
70#[derive(Debug, Clone, PartialEq, Eq)]
71pub enum HandlerOutput {
72 /// Execution completed successfully.
73 Success {
74 /// Optional output data produced by the execution.
75 output: Option<Vec<u8>>,
76 /// Resource consumption incurred during this attempt.
77 consumption: Vec<BudgetConsumption>,
78 },
79 /// Execution failed but may succeed on retry (transient failure).
80 RetryableFailure {
81 /// Error message describing the failure cause.
82 error: String,
83 /// Resource consumption incurred before the failure.
84 consumption: Vec<BudgetConsumption>,
85 },
86 /// Execution failed permanently and should not be retried.
87 TerminalFailure {
88 /// Error message describing the failure cause.
89 error: String,
90 /// Resource consumption incurred before the failure.
91 consumption: Vec<BudgetConsumption>,
92 },
93 /// Execution was voluntarily suspended (e.g. budget exhaustion signalled via
94 /// the cancellation token). The run transitions to `Suspended` and does not
95 /// count this attempt toward the `max_attempts` retry cap.
96 Suspended {
97 /// Optional partial-state bytes the handler may persist for use on resume.
98 output: Option<Vec<u8>>,
99 /// Resource consumption incurred before suspension.
100 consumption: Vec<BudgetConsumption>,
101 },
102}
103
104impl HandlerOutput {
105 /// Creates a success output with no output bytes and no consumption.
106 pub fn success() -> Self {
107 Self::Success { output: None, consumption: vec![] }
108 }
109
110 /// Creates a success output with output bytes and no consumption.
111 pub fn success_with_output(output: Vec<u8>) -> Self {
112 Self::Success { output: Some(output), consumption: vec![] }
113 }
114
115 /// Creates a retryable failure with no consumption.
116 pub fn retryable_failure(error: impl Into<String>) -> Self {
117 Self::RetryableFailure { error: error.into(), consumption: vec![] }
118 }
119
120 /// Creates a terminal failure with no consumption.
121 pub fn terminal_failure(error: impl Into<String>) -> Self {
122 Self::TerminalFailure { error: error.into(), consumption: vec![] }
123 }
124
125 /// Creates a suspended outcome with no partial state and no consumption.
126 pub fn suspended() -> Self {
127 Self::Suspended { output: None, consumption: vec![] }
128 }
129
130 /// Creates a suspended outcome with partial-state bytes and consumption.
131 pub fn suspended_with_output(output: Vec<u8>, consumption: Vec<BudgetConsumption>) -> Self {
132 Self::Suspended { output: Some(output), consumption }
133 }
134
135 /// Returns the resource consumption reported by this handler output.
136 pub fn consumption(&self) -> &[BudgetConsumption] {
137 match self {
138 Self::Success { consumption, .. }
139 | Self::RetryableFailure { consumption, .. }
140 | Self::TerminalFailure { consumption, .. }
141 | Self::Suspended { consumption, .. } => consumption,
142 }
143 }
144}
145
146/// Abstract port through which a handler can propose new child tasks.
147///
148/// The concrete implementation (backed by a tokio mpsc channel) lives in
149/// `actionqueue-workflow` and is injected by the dispatch loop. Using a trait
150/// object keeps `actionqueue-executor-local` free of tokio dependencies.
151///
152/// Submissions are fire-and-forget: if the channel is closed (e.g., the
153/// dispatch loop shut down), the submission is silently dropped. Handlers
154/// have no error path for submission failures.
155pub trait TaskSubmissionPort: Send + Sync {
156 /// Proposes a new task for creation.
157 ///
158 /// The dispatch loop validates and WAL-appends the task on the next tick.
159 /// The submitted `task_spec` should have `parent_task_id` set to associate
160 /// it with the Coordinator's task.
161 ///
162 /// `dependencies` is the list of `TaskId`s that must reach terminal success
163 /// before this task's runs are promoted to Ready.
164 fn submit(
165 &self,
166 task_spec: actionqueue_core::task::task_spec::TaskSpec,
167 dependencies: Vec<actionqueue_core::ids::TaskId>,
168 );
169}
170
171/// Full execution context provided to a handler for each attempt.
172///
173/// Extends [`HandlerInput`] with optional workflow extensions:
174/// - [`submission`](Self::submission): channel for proposing child tasks
175/// - [`children`](Self::children): snapshot of child task states
176///
177/// # Cancellation contract
178///
179/// See [`HandlerInput`] for the timeout and cancellation contract.
180pub struct ExecutorContext {
181 /// Core execution input: identifiers, payload, metadata, cancellation.
182 pub input: HandlerInput,
183 /// Optional channel for proposing new child tasks during execution.
184 ///
185 /// Present when the workflow feature is active. Handlers set
186 /// `parent_task_id` on submitted `TaskSpec`s to establish hierarchy.
187 pub submission: Option<std::sync::Arc<dyn TaskSubmissionPort>>,
188 /// Optional snapshot of child task states, taken at dispatch time.
189 ///
190 /// Present when the dispatched task has children in the hierarchy tracker.
191 /// Coordinator handlers use this to check progress and decide what to submit.
192 pub children: Option<ChildrenSnapshot>,
193}
194
195impl std::fmt::Debug for ExecutorContext {
196 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197 f.debug_struct("ExecutorContext")
198 .field("input", &self.input)
199 .field("submission", &self.submission.as_ref().map(|_| "<TaskSubmissionPort>"))
200 .field("children", &self.children)
201 .finish()
202 }
203}
204
205/// Trait for executor implementations to fulfill for attempt execution.
206///
207/// The handler is invoked for each attempt with full context including
208/// `RunId` and `AttemptId` for traceability. Implementations must return
209/// a typed [`HandlerOutput`] that indicates whether the attempt succeeded,
210/// should be retried, or should be marked as a terminal failure.
211///
212/// # Invariants
213///
214/// - Handlers must not mutate attempt-counting or run-derivation accounting.
215/// - Handlers must use the `RunId` and `AttemptId` from [`HandlerInput`] for
216/// all logging and reporting.
217/// - Long-running work must cooperate with timeout enforcement by polling
218/// [`CancellationToken::is_cancelled()`] at a bounded cadence.
219pub trait ExecutorHandler: Send + Sync {
220 /// Executes the attempt with the provided context and returns the outcome.
221 ///
222 /// # Arguments
223 ///
224 /// * `ctx` - The full execution context, including run/attempt identifiers,
225 /// payload, metadata, and optional workflow extensions.
226 ///
227 /// # Returns
228 ///
229 /// A [`HandlerOutput`] that indicates the attempt outcome:
230 /// - [`HandlerOutput::Success`] if execution completed successfully
231 /// - [`HandlerOutput::RetryableFailure`] if execution failed but may succeed on retry
232 /// - [`HandlerOutput::TerminalFailure`] if execution failed permanently
233 /// - [`HandlerOutput::Suspended`] if execution was voluntarily preempted
234 fn execute(&self, ctx: ExecutorContext) -> HandlerOutput;
235}