Skip to main content

Crate taquba_workflow

Crate taquba_workflow 

Source
Expand description

Durable, at-least-once workflow runtime on top of the Taquba task queue.

taquba-workflow is the plumbing for any multi-step process that benefits from durable state between steps: idempotent step execution, retries with backoff, graceful shutdown / restart, and terminal-state notifications. Implement StepRunner with bytes-in / bytes-out per-step logic and the runtime persists everything else.

It’s particularly well-suited for AI agent runs, where each step is one LLM call (or one full agent loop) and a process restart between steps shouldn’t lose expensive intermediate work. See examples/rig_agent.rs for a Rig integration. The runtime itself is framework-neutral: equally usable for ETL pipelines, document processing, payment flows, etc.

§What this is / isn’t

taquba-workflow is an imperative step orchestrator: at each step, the runner code decides what happens next by returning a StepOutcome (Continue, Succeed, Fail, Cancel). External cancellation is supported via WorkflowRuntime::cancel. It is not:

  • A DAG executor. There’s no declarative graph definition, no built-in fan-out / fan-in, no dependency-driven scheduling.
  • An event-sourced workflow engine. There’s no event-history replay, no per-side-effect recording.

§Single-process by design

The submission API and worker pool live in the same binary and share one Arc<Queue>.

§Quick start

use std::sync::Arc;
use taquba::{Queue, object_store::memory::InMemory};
use taquba_workflow::{
    NoopTerminalHook, RunSpec, Step, StepError, StepOutcome, StepRunner, WorkflowRuntime,
};

struct EchoRunner;

impl StepRunner for EchoRunner {
    async fn run_step(&self, step: &Step) -> Result<StepOutcome, StepError> {
        Ok(StepOutcome::Succeed { result: step.payload.clone() })
    }
}

let queue = Arc::new(Queue::open(Arc::new(InMemory::new()), "demo").await?);

let runtime = WorkflowRuntime::builder(queue, EchoRunner, NoopTerminalHook).build();

let runtime_for_worker = runtime.clone();
tokio::spawn(async move {
    runtime_for_worker.run(std::future::pending::<()>()).await
});

let handle = runtime.submit(RunSpec {
    input: b"hello".to_vec(),
    ..Default::default()
}).await?;
println!("submitted run {}", handle.run_id);

§Cancellation

Call WorkflowRuntime::cancel to cancel an active run from outside the runner:

  • If the current step is pending or scheduled, the queued step job is removed and the terminal hook fires from the cancel call before it returns.

  • If the current step is running, cancellation is delivered via Step::cancel_token (a tokio_util::sync::CancellationToken). Runners that watch the token can short-circuit immediately:

    tokio::select! {
        out = call_llm(step) => out,
        _ = step.cancel_token.cancelled() => {
            Ok(StepOutcome::Cancel { reason: "cooperative".into() })
        }
    }

    Runners that ignore the token are allowed to run to completion (futures cannot be safely aborted mid-step). In both cases the runner’s StepOutcome is discarded, any pending transient retry is suppressed, and the worker fires the terminal hook with TerminalStatus::Cancelled once the step returns. Watching the token only reduces cancellation latency for slow steps; it doesn’t change semantics.

While termination is in flight, WorkflowRuntime::status reports a RunState::Cancelling overlay until the entry is dropped.

cancel returns Ok(false) if the run is unknown or already terminal in this runtime. It only reaches runs submitted to this WorkflowRuntime instance; a second runtime in the same process (sharing the queue) maintains its own registry.

§Idempotency model

Each step is enqueued with taquba::EnqueueOptions::dedup_key of "run:{run_id}:{step_number}". This guarantees that no two pending or scheduled jobs exist for the same (run_id, step_number) at the same time. Taquba is at-least-once though, so a step can still be claimed and executed more than once if its lease expires before ack: implementations of StepRunner must be idempotent for the same input.

§Reserved headers

Step jobs reserve the workflow.* header prefix; concretely HEADER_RUN_ID and HEADER_STEP are set by the runtime on every step. Submitter-supplied headers must not start with workflow.; submission rejects them. All other user headers are threaded through every step and surfaced to the TerminalHook.

Structs§

NoopTerminalHook
A no-op terminal hook. Useful when the user only cares about run observation via tracing or external state.
RunHandle
Returned by WorkflowRuntime::submit.
RunOutcome
Information passed to a TerminalHook when a run reaches a terminal state.
RunSpec
Spec passed to WorkflowRuntime::submit.
RunStatus
In-memory status snapshot for an active run. Returned by WorkflowRuntime::status. Terminal runs are not retained; once the terminal hook fires, the registry entry is removed.
Step
A single step within a workflow run, handed to StepRunner::run_step.
StepError
Failure outcomes the runner can return.
WebhookTerminalHook
Convenience terminal hook that fires an HTTP webhook delivery via taquba-webhooks whenever a run terminates.
WorkflowRuntime
Durable runtime for workflow runs. Cheap to clone (internally Arc).
WorkflowRuntimeBuilder
Builder for WorkflowRuntime.

Enums§

Error
Errors returned by the runtime’s submission and worker paths.
RunState
Lifecycle state tracked in RunStatus::state.
StepErrorKind
Whether a StepError should retry or fail the run.
StepOutcome
What the runner wants the runtime to do after this step.
TerminalStatus
Terminal state of a workflow run, passed to a TerminalHook.

Constants§

HEADER_RUN_ID
Header key carrying the run identifier on every step job.
HEADER_STEP
Header key carrying the zero-based step number on every step job.
RESERVED_HEADER_PREFIX
Reserved prefix the runtime owns on step-job headers. Submitter-supplied headers must not start with this prefix; if they do, the runtime treats them as its own and strips them before invoking the runner.

Traits§

StepRunner
User-implemented logic that advances a single workflow step.
TerminalHook
User-implemented hook fired once per run when the run reaches a terminal state.

Type Aliases§

Result
Result alias used throughout the crate.