// taquba-workflow 0.2.0
//
// Durable, at-least-once workflow runtime on top of the Taquba task queue.
// Particularly well suited to AI agent runs.
// Documentation
use std::collections::HashMap;
use std::future::Future;
use std::time::Duration;

use tokio_util::sync::CancellationToken;

/// A single step within a workflow run, handed to [`StepRunner::run_step`].
///
/// Mirrors [`taquba::JobRecord`]: the `payload` is opaque application bytes
/// and `headers` carries user metadata you set at submission (reserved
/// `workflow.*` keys are filtered out before the runner sees them).
///
/// `Step` derives `Clone`, so cloning it copies `payload` and `headers` and
/// clones `cancel_token`. NOTE(review): per the `tokio-util` documentation a
/// cloned `CancellationToken` shares cancellation state with the original,
/// so a cloned `Step` should observe the same cancellation -- confirm against
/// the `tokio-util` version this crate pins.
#[derive(Debug, Clone)]
pub struct Step {
    /// Caller-visible run identifier (the value passed to or generated by
    /// [`crate::RunSpec`]).
    pub run_id: String,
    /// Zero-based step number. Step 0 is always the first step of a run, with
    /// the original submission input as its `payload`.
    pub step_number: u32,
    /// Application-defined bytes. For step 0 this is the submission `input`;
    /// for later steps it is the bytes returned by the previous step's
    /// [`StepOutcome::Continue`] / [`StepOutcome::ContinueAfter`].
    pub payload: Vec<u8>,
    /// Submitter-supplied metadata, threaded through every step of the run.
    /// Reserved `workflow.*` headers are stripped before the runner sees them.
    pub headers: HashMap<String, String>,
    /// The Taquba job ID for this step, useful for tracing and lease renewal.
    pub job_id: String,
    /// How many times Taquba has attempted to deliver this step. `1` on the
    /// first attempt; `>1` after a lease expiry / nack retry.
    pub attempts: u32,
    /// Cooperative cancellation signal for the run. The runtime cancels
    /// this token when [`crate::WorkflowRuntime::cancel`] is called while
    /// this step is in flight, so a long-running runner (e.g. an LLM call,
    /// a slow HTTP request) can short-circuit instead of running to
    /// completion. Typical use:
    ///
    /// ```ignore
    /// tokio::select! {
    ///     out = do_slow_work(step) => out,
    ///     _ = step.cancel_token.cancelled() => {
    ///         Ok(StepOutcome::Cancel { reason: "cooperative".into() })
    ///     }
    /// }
    /// ```
    ///
    /// Runners that ignore the token remain correct: the runtime still
    /// discards the outcome of a cancelled step and fires the terminal
    /// hook with [`crate::TerminalStatus::Cancelled`]. Watching the token
    /// only reduces cancellation latency for slow steps; it doesn't
    /// change semantics.
    ///
    /// The token is run-scoped, not step-scoped: once cancelled, every
    /// subsequent step of the run (including any retry of this one)
    /// observes `is_cancelled() == true` immediately.
    pub cancel_token: CancellationToken,
}

/// What the runner wants the runtime to do after this step.
///
/// Returned as the `Ok` side of [`StepRunner::run_step`]. The enum is
/// `#[non_exhaustive]`, so `match` expressions outside this crate need a
/// wildcard arm.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum StepOutcome {
    /// Run is not finished. Enqueue the next step immediately with `payload`
    /// as its bytes. The runtime advances `step_number` by 1.
    Continue {
        /// Bytes to hand to the next step's [`Step::payload`].
        payload: Vec<u8>,
    },
    /// Like [`Self::Continue`] but the next step is scheduled `delay` from
    /// now (e.g. backing off after a 429 from a provider).
    ContinueAfter {
        /// Bytes to hand to the next step's [`Step::payload`].
        payload: Vec<u8>,
        /// How long to wait before the next step becomes claimable.
        delay: Duration,
    },
    /// The run is finished successfully. The runtime acks the step and fires
    /// the configured terminal hook with
    /// [`crate::TerminalStatus::Succeeded`] and `result` as the body.
    Succeed {
        /// Final result bytes handed to the terminal hook.
        result: Vec<u8>,
    },
    /// The run is finished as failed by the runner's verdict; the runner
    /// ran to completion but the workflow's logical outcome is "no" (e.g.
    /// a validation rule rejected the input, a policy check denied the
    /// request, an agent decided the task can't be fulfilled). The runtime
    /// acks the step and fires the terminal hook with
    /// [`crate::TerminalStatus::Failed`] and `reason` as the error.
    ///
    /// Use this for *workflow-level* failures. For *infrastructure*
    /// failures (network outage, downstream service down, etc.) return an
    /// `Err` built with [`StepError::transient`] or
    /// [`StepError::permanent`] instead; those dead-letter the step so an
    /// operator can find it via [`taquba::Queue::dead_jobs`]. `Fail` is a
    /// successful execution with a negative outcome and does not
    /// dead-letter.
    Fail {
        /// Human-readable reason recorded on [`crate::RunOutcome::error`].
        reason: String,
    },
    /// The run is finished as cancelled by the runner. Use this when the
    /// runner decides on its own that the workflow should stop early
    /// without it being a logical failure (e.g. a downstream cancellation
    /// signal arrived mid-step, the user-supplied input is now obsolete).
    /// The runtime acks the step and fires the terminal hook with
    /// [`crate::TerminalStatus::Cancelled`] and `reason` as the error.
    ///
    /// For *external* cancellation requested by another component in the
    /// process, call [`crate::WorkflowRuntime::cancel`] instead; the
    /// runtime translates that into the same `Cancelled` terminal state.
    Cancel {
        /// Human-readable reason recorded on [`crate::RunOutcome::error`].
        reason: String,
    },
}

/// Failure outcomes the runner can return.
#[derive(Debug)]
pub struct StepError {
    /// Human-readable message recorded on the underlying job's `last_error`.
    pub message: String,
    /// Whether to retry the step or fail the run immediately.
    pub kind: StepErrorKind,
}

impl StepError {
    /// Build a transient error: Taquba retries the step per the queue's
    /// backoff/`max_attempts`. Once `max_attempts` is exhausted, the step is
    /// dead-lettered and the run terminates as failed.
    pub fn transient(message: impl Into<String>) -> Self {
        Self {
            message: message.into(),
            kind: StepErrorKind::Transient,
        }
    }

    /// Build a permanent error: the step is dead-lettered immediately and the
    /// run terminates as failed.
    pub fn permanent(message: impl Into<String>) -> Self {
        Self {
            message: message.into(),
            kind: StepErrorKind::Permanent,
        }
    }
}

/// Displays a [`StepError`] as its bare `message`; the `kind` is not part of
/// the rendered text and formatter width/alignment flags are not applied.
impl std::fmt::Display for StepError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { message, .. } = self;
        f.write_str(message)
    }
}

// Empty impl: `StepError` stores only a message and a kind, so every
// `std::error::Error` method keeps its default (no `source()` is reported).
impl std::error::Error for StepError {}

/// Whether a [`StepError`] should retry or fail the run.
///
/// Stored in [`StepError::kind`]. The enum is `#[non_exhaustive]`, so
/// `match` expressions outside this crate need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum StepErrorKind {
    /// Retry per the queue's backoff policy until `max_attempts` is reached.
    /// This is the kind set by [`StepError::transient`].
    Transient,
    /// Dead-letter the step immediately; terminate the run as failed.
    /// This is the kind set by [`StepError::permanent`].
    Permanent,
}

/// User-implemented logic that advances a single workflow step.
///
/// Implementations must be idempotent for the same `(run_id, step_number)`:
/// Taquba is at-least-once, so a step can be claimed and processed more than
/// once if a lease expires before the worker acks. Returning the same
/// `StepOutcome` for the same input is the easiest way to satisfy this.
///
/// The `Send + Sync` supertraits and the `+ Send` bound on the returned
/// future mean an implementation must be shareable across threads and its
/// future must not hold non-`Send` values across an `.await`.
pub trait StepRunner: Send + Sync {
    /// Process a single step of a workflow run.
    ///
    /// Return:
    /// - [`StepOutcome::Continue`] to enqueue the next step immediately, or
    ///   [`StepOutcome::ContinueAfter`] to enqueue it after a delay;
    /// - [`StepOutcome::Succeed`] to finish the run successfully;
    /// - [`StepOutcome::Fail`] to terminate the run as Failed by runner
    ///   verdict;
    /// - [`StepOutcome::Cancel`] to terminate the run as Cancelled by runner
    ///   verdict;
    /// - `Err(StepError)` to retry / dead-letter on infrastructure errors.
    ///
    /// The method is declared as returning `impl Future + Send` rather than
    /// as an `async fn` so that the `Send` bound on the future is part of the
    /// trait contract.
    fn run_step(
        &self,
        step: &Step,
    ) -> impl Future<Output = std::result::Result<StepOutcome, StepError>> + Send;
}