taquba_workflow/runner.rs
1use std::collections::HashMap;
2use std::future::Future;
3use std::time::Duration;
4
5use tokio_util::sync::CancellationToken;
6
7/// A single step within a workflow run, handed to [`StepRunner::run_step`].
8///
9/// Mirrors [`taquba::JobRecord`]: the `payload` is opaque application bytes
10/// and `headers` carries user metadata you set at submission (reserved
11/// `workflow.*` keys are filtered out before the runner sees them).
12#[derive(Debug, Clone)]
13pub struct Step {
14 /// Caller-visible run identifier (the value passed to or generated by
15 /// [`crate::RunSpec`]).
16 pub run_id: String,
17 /// Zero-based step number. Step 0 is always the first step of a run, with
18 /// the original submission input as its `payload`.
19 pub step_number: u32,
20 /// Application-defined bytes. For step 0 this is the submission `input`;
21 /// for later steps it is the bytes returned by the previous step's
22 /// [`StepOutcome::Continue`] / [`StepOutcome::ContinueAfter`].
23 pub payload: Vec<u8>,
24 /// Submitter-supplied metadata, threaded through every step of the run.
25 /// Reserved `workflow.*` headers are stripped before the runner sees them.
26 pub headers: HashMap<String, String>,
27 /// The Taquba job ID for this step, useful for tracing and lease renewal.
28 pub job_id: String,
29 /// How many times Taquba has attempted to deliver this step. `1` on the
30 /// first attempt; `>1` after a lease expiry / nack retry.
31 pub attempts: u32,
32 /// Cooperative cancellation signal for the run. The runtime cancels
33 /// this token when [`crate::WorkflowRuntime::cancel`] is called while
34 /// this step is in flight, so a long-running runner (e.g. an LLM call,
35 /// a slow HTTP request) can short-circuit instead of running to
36 /// completion. Typical use:
37 ///
38 /// ```ignore
39 /// tokio::select! {
40 /// out = do_slow_work(step) => out,
41 /// _ = step.cancel_token.cancelled() => {
42 /// Ok(StepOutcome::Cancel { reason: "cooperative".into() })
43 /// }
44 /// }
45 /// ```
46 ///
47 /// Runners that ignore the token remain correct: the runtime still
48 /// discards the outcome of a cancelled step and fires the terminal
49 /// hook with [`crate::TerminalStatus::Cancelled`]. Watching the token
50 /// only reduces cancellation latency for slow steps; it doesn't
51 /// change semantics.
52 ///
53 /// The token is run-scoped, not step-scoped: once cancelled, every
54 /// subsequent step of the run (including any retry of this one)
55 /// observes `is_cancelled() == true` immediately.
56 pub cancel_token: CancellationToken,
57}
58
/// The runner's instruction to the runtime once a step has finished.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum StepOutcome {
    /// The run continues. The next step is enqueued immediately and
    /// receives `payload` as its bytes; the runtime bumps `step_number`
    /// by 1.
    Continue {
        /// Bytes delivered to the next step as [`Step::payload`].
        payload: Vec<u8>,
    },
    /// Same as [`Self::Continue`], except the next step is scheduled
    /// `delay` from now (for example to back off after a provider
    /// answered with a 429).
    ContinueAfter {
        /// Bytes delivered to the next step as [`Step::payload`].
        payload: Vec<u8>,
        /// Time to wait before the next step becomes claimable.
        delay: Duration,
    },
    /// The run completed successfully. The runtime acks the step and fires
    /// the configured terminal hook with
    /// [`crate::TerminalStatus::Succeeded`], using `result` as the body.
    Succeed {
        /// Final result bytes passed to the terminal hook.
        result: Vec<u8>,
    },
    /// The run ends as failed because the runner says so: the runner itself
    /// ran to completion, but the logical answer of the workflow is "no"
    /// (a validation rule rejected the input, a policy check denied the
    /// request, an agent concluded the task can't be fulfilled, ...). The
    /// runtime acks the step and fires the terminal hook with
    /// [`crate::TerminalStatus::Failed`], using `reason` as the error.
    ///
    /// Reserve this for *workflow-level* failures. *Infrastructure*
    /// failures (network outage, downstream service down, etc.) should be
    /// reported as `Err(StepError::transient)` or
    /// `Err(StepError::permanent)` instead; those dead-letter the step so
    /// an operator can locate it through [`taquba::Queue::dead_jobs`].
    /// `Fail` represents a successful execution with a negative outcome
    /// and does not dead-letter.
    Fail {
        /// Human-readable reason stored on [`crate::RunOutcome::error`].
        reason: String,
    },
    /// The run ends as cancelled on the runner's own initiative. Pick this
    /// when the runner decides the workflow should stop early without that
    /// being a logical failure (e.g. a downstream cancellation signal
    /// arrived mid-step, or the user-supplied input has become obsolete).
    /// The runtime acks the step and fires the terminal hook with
    /// [`crate::TerminalStatus::Cancelled`], using `reason` as the error.
    ///
    /// When cancellation is requested *externally* by another component in
    /// the process, call [`crate::WorkflowRuntime::cancel`] instead; the
    /// runtime maps that onto the same `Cancelled` terminal state.
    Cancel {
        /// Human-readable reason stored on [`crate::RunOutcome::error`].
        reason: String,
    },
}
116
/// Failure outcomes the runner can return.
///
/// Pairs a human-readable `message` with a [`StepErrorKind`] that tells the
/// runtime whether the step should be retried or the run failed right away.
/// Build values with [`StepError::transient`] or [`StepError::permanent`].
///
/// `Clone`, `PartialEq` and `Eq` are derived so errors can be duplicated and
/// compared directly (e.g. with `assert_eq!` in runner unit tests), matching
/// the derives already present on [`StepErrorKind`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StepError {
    /// Human-readable message recorded on the underlying job's `last_error`.
    pub message: String,
    /// Whether to retry the step or fail the run immediately.
    pub kind: StepErrorKind,
}

impl StepError {
    /// Build a transient error: Taquba retries the step per the queue's
    /// backoff/`max_attempts`. Once `max_attempts` is exhausted, the step is
    /// dead-lettered and the run terminates as failed.
    ///
    /// `message` accepts anything convertible into a `String`.
    #[must_use]
    pub fn transient(message: impl Into<String>) -> Self {
        Self {
            message: message.into(),
            kind: StepErrorKind::Transient,
        }
    }

    /// Build a permanent error: the step is dead-lettered immediately and the
    /// run terminates as failed.
    ///
    /// `message` accepts anything convertible into a `String`.
    #[must_use]
    pub fn permanent(message: impl Into<String>) -> Self {
        Self {
            message: message.into(),
            kind: StepErrorKind::Permanent,
        }
    }
}

impl std::fmt::Display for StepError {
    /// Writes only the `message`; the `kind` is not part of the rendered
    /// text.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

impl std::error::Error for StepError {}

/// Whether a [`StepError`] should retry or fail the run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum StepErrorKind {
    /// Retry per the queue's backoff policy until `max_attempts` is reached.
    Transient,
    /// Dead-letter the step immediately; terminate the run as failed.
    Permanent,
}
164
165/// User-implemented logic that advances a single workflow step.
166///
167/// Implementations must be idempotent for the same `(run_id, step_number)`:
168/// Taquba is at-least-once, so a step can be claimed and processed more than
169/// once if a lease expires before the worker acks. Returning the same
170/// `StepOutcome` for the same input is the easiest way to satisfy this.
171pub trait StepRunner: Send + Sync {
172 /// Process a single step of a workflow run. Return [`StepOutcome::Continue`]
173 /// to enqueue the next step, [`StepOutcome::Succeed`] to finish the run
174 /// successfully, [`StepOutcome::Fail`] to terminate the run as Failed by
175 /// runner verdict, [`StepOutcome::Cancel`] to terminate the run as
176 /// Cancelled by runner verdict, or `Err(StepError)` to retry /
177 /// dead-letter on infrastructure errors.
178 fn run_step(
179 &self,
180 step: &Step,
181 ) -> impl Future<Output = std::result::Result<StepOutcome, StepError>> + Send;
182}