Skip to main content

taquba_workflow/
runner.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::time::Duration;
4
5use tokio_util::sync::CancellationToken;
6
7/// A single step within a workflow run, handed to [`StepRunner::run_step`].
8///
9/// Mirrors [`taquba::JobRecord`]: the `payload` is opaque application bytes
10/// and `headers` carries user metadata you set at submission (reserved
11/// `workflow.*` keys are filtered out before the runner sees them).
12#[derive(Debug, Clone)]
13pub struct Step {
14    /// Caller-visible run identifier (the value passed to or generated by
15    /// [`crate::RunSpec`]).
16    pub run_id: String,
17    /// Zero-based step number. Step 0 is always the first step of a run, with
18    /// the original submission input as its `payload`.
19    pub step_number: u32,
20    /// Application-defined bytes. For step 0 this is the submission `input`;
21    /// for later steps it is the bytes returned by the previous step's
22    /// [`StepOutcome::Continue`] / [`StepOutcome::ContinueAfter`].
23    pub payload: Vec<u8>,
24    /// Submitter-supplied metadata, threaded through every step of the run.
25    /// Reserved `workflow.*` headers are stripped before the runner sees them.
26    pub headers: HashMap<String, String>,
27    /// The Taquba job ID for this step, useful for tracing and lease renewal.
28    pub job_id: String,
29    /// How many times Taquba has attempted to deliver this step. `1` on the
30    /// first attempt; `>1` after a lease expiry / nack retry.
31    pub attempts: u32,
32    /// Cooperative cancellation signal for the run. The runtime cancels
33    /// this token when [`crate::WorkflowRuntime::cancel`] is called while
34    /// this step is in flight, so a long-running runner (e.g. an LLM call,
35    /// a slow HTTP request) can short-circuit instead of running to
36    /// completion. Typical use:
37    ///
38    /// ```ignore
39    /// tokio::select! {
40    ///     out = do_slow_work(step) => out,
41    ///     _ = step.cancel_token.cancelled() => {
42    ///         Ok(StepOutcome::Cancel { reason: "cooperative".into() })
43    ///     }
44    /// }
45    /// ```
46    ///
47    /// Runners that ignore the token remain correct: the runtime still
48    /// discards the outcome of a cancelled step and fires the terminal
49    /// hook with [`crate::TerminalStatus::Cancelled`]. Watching the token
50    /// only reduces cancellation latency for slow steps; it doesn't
51    /// change semantics.
52    ///
53    /// The token is run-scoped, not step-scoped: once cancelled, every
54    /// subsequent step of the run (including any retry of this one)
55    /// observes `is_cancelled() == true` immediately.
56    pub cancel_token: CancellationToken,
57}
58
/// The runner's instruction to the runtime once a step has finished.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum StepOutcome {
    /// The run has more work to do. The next step is enqueued immediately,
    /// with `payload` as its bytes, and the runtime advances `step_number`
    /// by 1.
    Continue {
        /// Bytes that become the next step's [`Step::payload`].
        payload: Vec<u8>,
    },
    /// Same as [`Self::Continue`], except the next step is scheduled `delay`
    /// from now (e.g. to back off after a 429 from a provider).
    ContinueAfter {
        /// Bytes that become the next step's [`Step::payload`].
        payload: Vec<u8>,
        /// Time to wait before the next step becomes claimable.
        delay: Duration,
    },
    /// The run completed successfully. The runtime acks the step and fires
    /// the configured terminal hook with
    /// [`crate::TerminalStatus::Succeeded`], using `result` as the body.
    Succeed {
        /// Final result bytes delivered to the terminal hook.
        result: Vec<u8>,
    },
    /// The run completed with a failure verdict from the runner: the runner
    /// itself ran to completion, but the workflow's logical answer is "no"
    /// (e.g. a validation rule rejected the input, a policy check denied the
    /// request, or an agent decided the task can't be fulfilled). The
    /// runtime acks the step and fires the terminal hook with
    /// [`crate::TerminalStatus::Failed`], using `reason` as the error.
    ///
    /// This variant is for *workflow-level* failures. *Infrastructure*
    /// failures (network outage, downstream service down, etc.) should be
    /// returned as `Err(StepError::transient)` or
    /// `Err(StepError::permanent)` instead; those dead-letter the step so an
    /// operator can find it via [`taquba::Queue::dead_jobs`]. `Fail` is a
    /// successful execution with a negative outcome and does not
    /// dead-letter.
    Fail {
        /// Human-readable reason recorded on [`crate::RunOutcome::error`].
        reason: String,
    },
    /// The run was cancelled by the runner. Return this when the runner
    /// decides by itself that the workflow should stop early and that this
    /// is not a logical failure (e.g. a downstream cancellation signal
    /// arrived mid-step, or the user-supplied input is now obsolete). The
    /// runtime acks the step and fires the terminal hook with
    /// [`crate::TerminalStatus::Cancelled`], using `reason` as the error.
    ///
    /// When cancellation is requested *externally* by another component in
    /// the process, call [`crate::WorkflowRuntime::cancel`] instead; the
    /// runtime maps that onto the same `Cancelled` terminal state.
    Cancel {
        /// Human-readable reason recorded on [`crate::RunOutcome::error`].
        reason: String,
    },
}
116
/// Failure outcomes the runner can return.
#[derive(Debug)]
pub struct StepError {
    /// Human-readable message recorded on the underlying job's `last_error`.
    pub message: String,
    /// Whether to retry the step or fail the run immediately.
    pub kind: StepErrorKind,
}

impl StepError {
    /// Build a transient error: Taquba retries the step per the queue's
    /// backoff/`max_attempts`. Once `max_attempts` is exhausted, the step is
    /// dead-lettered and the run terminates as failed.
    ///
    /// `message` accepts anything convertible into a `String` and is stored
    /// verbatim in [`StepError::message`].
    #[must_use]
    pub fn transient(message: impl Into<String>) -> Self {
        Self {
            message: message.into(),
            kind: StepErrorKind::Transient,
        }
    }

    /// Build a permanent error: the step is dead-lettered immediately and the
    /// run terminates as failed.
    ///
    /// `message` accepts anything convertible into a `String` and is stored
    /// verbatim in [`StepError::message`].
    #[must_use]
    pub fn permanent(message: impl Into<String>) -> Self {
        Self {
            message: message.into(),
            kind: StepErrorKind::Permanent,
        }
    }
}

impl std::fmt::Display for StepError {
    /// Writes the error message only; the [`StepErrorKind`] is not included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `pad` (unlike `write_str`) applies the caller's width, alignment
        // and precision flags, so `{:>20}` / `{:.40}` behave as they do for
        // a plain string. With no flags the output is the message unchanged.
        f.pad(&self.message)
    }
}

impl std::error::Error for StepError {}

/// Whether a [`StepError`] should retry or fail the run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum StepErrorKind {
    /// Retry per the queue's backoff policy until `max_attempts` is reached.
    Transient,
    /// Dead-letter the step immediately; terminate the run as failed.
    Permanent,
}
164
165/// User-implemented logic that advances a single workflow step.
166///
167/// Implementations must be idempotent for the same `(run_id, step_number)`:
168/// Taquba is at-least-once, so a step can be claimed and processed more than
169/// once if a lease expires before the worker acks. Returning the same
170/// `StepOutcome` for the same input is the easiest way to satisfy this.
171pub trait StepRunner: Send + Sync {
172    /// Process a single step of a workflow run. Return [`StepOutcome::Continue`]
173    /// to enqueue the next step, [`StepOutcome::Succeed`] to finish the run
174    /// successfully, [`StepOutcome::Fail`] to terminate the run as Failed by
175    /// runner verdict, [`StepOutcome::Cancel`] to terminate the run as
176    /// Cancelled by runner verdict, or `Err(StepError)` to retry /
177    /// dead-letter on infrastructure errors.
178    fn run_step(
179        &self,
180        step: &Step,
181    ) -> impl Future<Output = std::result::Result<StepOutcome, StepError>> + Send;
182}