Skip to main content

actionqueue_executor_local/
handler.rs

1//! Executor handler trait and related types for attempt-level execution.
2//!
3//! This module defines the [`ExecutorHandler`] trait that implementations must
4//! fulfill to execute attempts. The handler receives an [`ExecutorContext`]
5//! containing execution identity, payload, metadata, and optional workflow
6//! extensions (submission channel and children snapshot).
7
8pub mod cancellation;
9
10use actionqueue_core::budget::BudgetConsumption;
11use actionqueue_core::ids::{AttemptId, RunId};
12use actionqueue_core::task::safety::SafetyLevel;
13pub use cancellation::{CancellationContext, CancellationToken};
14
15use crate::children::ChildrenSnapshot;
16
17/// Execution context provided to the handler for each attempt.
18///
19/// This structure contains all information needed to execute an attempt
20/// including the run and attempt identifiers, the payload to execute,
21/// and constraints snapshot for timeout and retry behavior.
22///
23/// # Cancellation contract
24///
25/// - If `metadata.timeout_secs` is `Some`, timeout enforcement may request
26///   cancellation while execution is in progress.
27/// - Long-running handlers must poll [`CancellationToken::is_cancelled()`]
28///   at bounded intervals and exit promptly once cancellation is observed.
29/// - Returning success after cancellation has been requested does not override
30///   timeout truth; timeout classification remains authoritative.
31#[derive(Debug, Clone)]
32pub struct HandlerInput {
33    /// The unique identifier for the run instance.
34    pub run_id: RunId,
35    /// The unique identifier for this specific attempt within the run.
36    pub attempt_id: AttemptId,
37    /// The opaque payload bytes to execute.
38    pub payload: Vec<u8>,
39    /// The attempt-level execution metadata (timeout, retry policy, etc.).
40    pub metadata: AttemptMetadata,
41    /// The cancellation context for this execution. Handlers can check this
42    /// to determine if execution should be terminated early.
43    pub cancellation_context: CancellationContext,
44}
45
46/// Attempt-level execution metadata derived from task constraints.
47#[derive(Debug, Clone)]
48pub struct AttemptMetadata {
49    /// Maximum number of attempts allowed for this run.
50    pub max_attempts: u32,
51    /// Current attempt number (1-indexed).
52    pub attempt_number: u32,
53    /// Execution timeout in seconds. If `None`, no timeout.
54    pub timeout_secs: Option<u64>,
55    /// Safety level classification of the task.
56    pub safety_level: SafetyLevel,
57}
58
59/// Outcome of an attempt execution.
60///
61/// The handler return type uses typed variants to distinguish between
62/// successful completion, retryable failures (transient errors that may
63/// succeed on retry), and terminal failures (permanent errors that should
64/// not be retried). Handlers may also return `Suspended` to signal that
65/// execution was voluntarily preempted (e.g. in response to budget exhaustion).
66///
67/// All variants carry a `consumption` list: zero or more [`BudgetConsumption`]
68/// records reporting the resources consumed during this attempt. The dispatch
69/// loop durably records these after each attempt completes.
70#[derive(Debug, Clone, PartialEq, Eq)]
71pub enum HandlerOutput {
72    /// Execution completed successfully.
73    Success {
74        /// Optional output data produced by the execution.
75        output: Option<Vec<u8>>,
76        /// Resource consumption incurred during this attempt.
77        consumption: Vec<BudgetConsumption>,
78    },
79    /// Execution failed but may succeed on retry (transient failure).
80    RetryableFailure {
81        /// Error message describing the failure cause.
82        error: String,
83        /// Resource consumption incurred before the failure.
84        consumption: Vec<BudgetConsumption>,
85    },
86    /// Execution failed permanently and should not be retried.
87    TerminalFailure {
88        /// Error message describing the failure cause.
89        error: String,
90        /// Resource consumption incurred before the failure.
91        consumption: Vec<BudgetConsumption>,
92    },
93    /// Execution was voluntarily suspended (e.g. budget exhaustion signalled via
94    /// the cancellation token). The run transitions to `Suspended` and does not
95    /// count this attempt toward the `max_attempts` retry cap.
96    Suspended {
97        /// Optional partial-state bytes the handler may persist for use on resume.
98        output: Option<Vec<u8>>,
99        /// Resource consumption incurred before suspension.
100        consumption: Vec<BudgetConsumption>,
101    },
102}
103
104impl HandlerOutput {
105    /// Creates a success output with no output bytes and no consumption.
106    pub fn success() -> Self {
107        Self::Success { output: None, consumption: vec![] }
108    }
109
110    /// Creates a success output with output bytes and no consumption.
111    pub fn success_with_output(output: Vec<u8>) -> Self {
112        Self::Success { output: Some(output), consumption: vec![] }
113    }
114
115    /// Creates a retryable failure with no consumption.
116    pub fn retryable_failure(error: impl Into<String>) -> Self {
117        Self::RetryableFailure { error: error.into(), consumption: vec![] }
118    }
119
120    /// Creates a terminal failure with no consumption.
121    pub fn terminal_failure(error: impl Into<String>) -> Self {
122        Self::TerminalFailure { error: error.into(), consumption: vec![] }
123    }
124
125    /// Creates a suspended outcome with no partial state and no consumption.
126    pub fn suspended() -> Self {
127        Self::Suspended { output: None, consumption: vec![] }
128    }
129
130    /// Creates a suspended outcome with partial-state bytes and consumption.
131    pub fn suspended_with_output(output: Vec<u8>, consumption: Vec<BudgetConsumption>) -> Self {
132        Self::Suspended { output: Some(output), consumption }
133    }
134
135    /// Returns the resource consumption reported by this handler output.
136    pub fn consumption(&self) -> &[BudgetConsumption] {
137        match self {
138            Self::Success { consumption, .. }
139            | Self::RetryableFailure { consumption, .. }
140            | Self::TerminalFailure { consumption, .. }
141            | Self::Suspended { consumption, .. } => consumption,
142        }
143    }
144}
145
146/// Abstract port through which a handler can propose new child tasks.
147///
148/// The concrete implementation (backed by a tokio mpsc channel) lives in
149/// `actionqueue-workflow` and is injected by the dispatch loop. Using a trait
150/// object keeps `actionqueue-executor-local` free of tokio dependencies.
151///
152/// Submissions are fire-and-forget: if the channel is closed (e.g., the
153/// dispatch loop shut down), the submission is silently dropped. Handlers
154/// have no error path for submission failures.
155pub trait TaskSubmissionPort: Send + Sync {
156    /// Proposes a new task for creation.
157    ///
158    /// The dispatch loop validates and WAL-appends the task on the next tick.
159    /// The submitted `task_spec` should have `parent_task_id` set to associate
160    /// it with the Coordinator's task.
161    ///
162    /// `dependencies` is the list of `TaskId`s that must reach terminal success
163    /// before this task's runs are promoted to Ready.
164    fn submit(
165        &self,
166        task_spec: actionqueue_core::task::task_spec::TaskSpec,
167        dependencies: Vec<actionqueue_core::ids::TaskId>,
168    );
169}
170
171/// Full execution context provided to a handler for each attempt.
172///
173/// Extends [`HandlerInput`] with optional workflow extensions:
174/// - [`submission`](Self::submission): channel for proposing child tasks
175/// - [`children`](Self::children): snapshot of child task states
176///
177/// # Cancellation contract
178///
179/// See [`HandlerInput`] for the timeout and cancellation contract.
180pub struct ExecutorContext {
181    /// Core execution input: identifiers, payload, metadata, cancellation.
182    pub input: HandlerInput,
183    /// Optional channel for proposing new child tasks during execution.
184    ///
185    /// Present when the workflow feature is active. Handlers set
186    /// `parent_task_id` on submitted `TaskSpec`s to establish hierarchy.
187    pub submission: Option<std::sync::Arc<dyn TaskSubmissionPort>>,
188    /// Optional snapshot of child task states, taken at dispatch time.
189    ///
190    /// Present when the dispatched task has children in the hierarchy tracker.
191    /// Coordinator handlers use this to check progress and decide what to submit.
192    pub children: Option<ChildrenSnapshot>,
193}
194
195impl std::fmt::Debug for ExecutorContext {
196    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197        f.debug_struct("ExecutorContext")
198            .field("input", &self.input)
199            .field("submission", &self.submission.as_ref().map(|_| "<TaskSubmissionPort>"))
200            .field("children", &self.children)
201            .finish()
202    }
203}
204
205/// Trait for executor implementations to fulfill for attempt execution.
206///
207/// The handler is invoked for each attempt with full context including
208/// `RunId` and `AttemptId` for traceability. Implementations must return
209/// a typed [`HandlerOutput`] that indicates whether the attempt succeeded,
210/// should be retried, or should be marked as a terminal failure.
211///
212/// # Invariants
213///
214/// - Handlers must not mutate attempt-counting or run-derivation accounting.
215/// - Handlers must use the `RunId` and `AttemptId` from [`HandlerInput`] for
216///   all logging and reporting.
217/// - Long-running work must cooperate with timeout enforcement by polling
218///   [`CancellationToken::is_cancelled()`] at a bounded cadence.
219pub trait ExecutorHandler: Send + Sync {
220    /// Executes the attempt with the provided context and returns the outcome.
221    ///
222    /// # Arguments
223    ///
224    /// * `ctx` - The full execution context, including run/attempt identifiers,
225    ///   payload, metadata, and optional workflow extensions.
226    ///
227    /// # Returns
228    ///
229    /// A [`HandlerOutput`] that indicates the attempt outcome:
230    /// - [`HandlerOutput::Success`] if execution completed successfully
231    /// - [`HandlerOutput::RetryableFailure`] if execution failed but may succeed on retry
232    /// - [`HandlerOutput::TerminalFailure`] if execution failed permanently
233    /// - [`HandlerOutput::Suspended`] if execution was voluntarily preempted
234    fn execute(&self, ctx: ExecutorContext) -> HandlerOutput;
235}