Durable, at-least-once workflow runtime on top of the Taquba task queue.
taquba-workflow is the plumbing for any multi-step process that
benefits from durable state between steps: idempotent step execution,
retries with backoff, graceful shutdown / restart, and terminal-state
notifications. Implement StepRunner with bytes-in / bytes-out
per-step logic and the runtime persists everything else.
It’s particularly well-suited for AI agent runs, where each step is
one LLM call (or one full agent loop) and a process restart between
steps shouldn’t lose expensive intermediate work. See
examples/rig_agent.rs for a Rig integration. The runtime itself is
framework-neutral: equally usable for ETL pipelines, document
processing, payment flows, etc.
§What this is / isn’t
taquba-workflow is an imperative step orchestrator: at each step,
the runner code decides what happens next by returning a
StepOutcome (Continue, Succeed, Fail, Cancel). External cancellation
is supported via WorkflowRuntime::cancel. It is not:
- A DAG executor. There’s no declarative graph definition, no built-in fan-out / fan-in, no dependency-driven scheduling.
- An event-sourced workflow engine. There’s no event-history replay, no per-side-effect recording.
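The imperative model can be sketched without the crate itself. The following is a toy, std-only model and not the crate's API: `Outcome`, `decide`, and `drive` are hypothetical names, and the fields on each variant are assumptions (the real `StepOutcome` variants may carry different data). It only shows the control flow: the step logic returns an outcome, and a loop acts on it.

```rust
/// Toy stand-in for the crate's `StepOutcome`; the field shapes are assumptions.
#[derive(Debug, PartialEq)]
enum Outcome {
    Continue { next_payload: Vec<u8> },
    Succeed { result: Vec<u8> },
    Fail { reason: String },
}

/// Hypothetical per-step logic: fail on empty input, append one byte per step,
/// and succeed once the payload is at least three bytes long.
fn decide(step_number: u32, payload: &[u8]) -> Outcome {
    if payload.is_empty() {
        return Outcome::Fail {
            reason: format!("empty payload at step {step_number}"),
        };
    }
    if payload.len() >= 3 {
        return Outcome::Succeed { result: payload.to_vec() };
    }
    let mut next = payload.to_vec();
    next.push(b'!');
    Outcome::Continue { next_payload: next }
}

/// Minimal in-memory driver: each iteration asks the step logic what happens next.
/// Returns the terminal outcome and the number of steps executed.
fn drive(input: Vec<u8>) -> (Outcome, u32) {
    let mut payload = input;
    let mut step_number = 0;
    loop {
        match decide(step_number, &payload) {
            Outcome::Continue { next_payload } => {
                payload = next_payload;
                step_number += 1;
            }
            terminal => return (terminal, step_number + 1),
        }
    }
}
```

In this sketch nothing is persisted between iterations; in the real runtime, the state between steps is what the queue makes durable.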
§Single-process by design
The submission API and worker pool live in the same binary and share one
Arc<Queue>.
§Quick start
use std::sync::Arc;

use taquba::{Queue, object_store::memory::InMemory};
use taquba_workflow::{
    NoopTerminalHook, RunSpec, Step, StepError, StepOutcome, StepRunner, WorkflowRuntime,
};

// Runner that finishes every run in one step by echoing the step payload.
struct EchoRunner;

impl StepRunner for EchoRunner {
    async fn run_step(&self, step: &Step) -> Result<StepOutcome, StepError> {
        Ok(StepOutcome::Succeed { result: step.payload.clone() })
    }
}

// In-memory object store: nothing survives the process, which is fine for a demo.
let queue = Arc::new(Queue::open(Arc::new(InMemory::new()), "demo").await?);
let runtime = WorkflowRuntime::builder(queue, EchoRunner, NoopTerminalHook).build();

// Drive the workers in the background. The future passed to `run` never resolves here.
let runtime_for_worker = runtime.clone();
tokio::spawn(async move {
    runtime_for_worker.run(std::future::pending::<()>()).await
});

let handle = runtime.submit(RunSpec {
    input: b"hello".to_vec(),
    ..Default::default()
}).await?;
println!("submitted run {}", handle.run_id);
§Cancellation
Call WorkflowRuntime::cancel to cancel an active run from outside
the runner:
- If the current step is pending or scheduled, the queued step job is removed and the terminal hook fires from the cancel call before it returns.
- If the current step is running, cancellation is delivered via Step::cancel_token (a tokio_util::sync::CancellationToken). Runners that watch the token can short-circuit immediately:

      tokio::select! {
          out = call_llm(step) => out,
          _ = step.cancel_token.cancelled() => {
              Ok(StepOutcome::Cancel { reason: "cooperative".into() })
          }
      }

  Runners that ignore the token are allowed to run to completion (futures cannot be safely aborted mid-step). In both cases the runner’s StepOutcome is discarded, any pending transient retry is suppressed, and the worker fires the terminal hook with TerminalStatus::Cancelled once the step returns. Watching the token only reduces cancellation latency for slow steps; it doesn’t change semantics.
While termination is in flight, WorkflowRuntime::status reports a
RunState::Cancelling overlay until the entry is dropped.
cancel returns Ok(false) if the run is unknown or already
terminal in this runtime. It only reaches runs submitted to this
WorkflowRuntime instance; a second runtime in the same process
(sharing the queue) maintains its own registry.
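The rule that watching the token affects latency but not the result can be illustrated with a toy, std-only model. This is not the crate's implementation: an `AtomicBool` stands in for the `CancellationToken`, and `Terminal`, `watching_runner`, `ignoring_runner`, and `resolve` are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Toy terminal status; the real crate uses `TerminalStatus`.
#[derive(Debug, PartialEq)]
enum Terminal {
    Succeeded,
    Cancelled,
}

/// A runner that checks the flag before each chunk of work and stops early.
/// Returns the number of chunks it actually processed.
fn watching_runner(cancel: &AtomicBool, chunks: u32) -> u32 {
    let mut done = 0;
    for _ in 0..chunks {
        if cancel.load(Ordering::SeqCst) {
            break;
        }
        done += 1;
    }
    done
}

/// A runner that never looks at the flag and always processes every chunk.
fn ignoring_runner(_cancel: &AtomicBool, chunks: u32) -> u32 {
    chunks
}

/// Runtime side of the model: if cancellation was requested, the runner's own
/// outcome is discarded and the run ends as cancelled.
fn resolve(cancel: &AtomicBool, runner_outcome: Terminal) -> Terminal {
    if cancel.load(Ordering::SeqCst) {
        Terminal::Cancelled
    } else {
        runner_outcome
    }
}
```

Both runners end in `Terminal::Cancelled` once the flag is set; the watching runner simply gets there after doing less work.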
§Idempotency model
Each step is enqueued with taquba::EnqueueOptions::dedup_key of
"run:{run_id}:{step_number}". This guarantees that no two pending or
scheduled jobs exist for the same (run_id, step_number) at the same
time. Taquba is at-least-once though, so a step can still be claimed and
executed more than once if its lease expires before ack: implementations
of StepRunner must be idempotent for the same input.
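One common way to make a step safe under redelivery is to record its result under the same key the runtime deduplicates on and to return the recorded result on a repeat. The sketch below is std-only and illustrative: `StepResults` and `run_once` are hypothetical names, and the in-memory `HashMap` stands in for whatever durable store a real runner would use. Only the key format comes from the description above.

```rust
use std::collections::HashMap;

/// Builds the dedup key in the "run:{run_id}:{step_number}" format.
fn dedup_key(run_id: &str, step_number: u32) -> String {
    format!("run:{run_id}:{step_number}")
}

/// Hypothetical result store; a real runner would back this with durable state.
#[derive(Default)]
struct StepResults {
    by_key: HashMap<String, Vec<u8>>,
    /// Counts how many times the underlying work actually ran.
    executions: u32,
}

impl StepResults {
    /// Runs `work` at most once per (run_id, step_number). A redelivered step
    /// gets the stored result back instead of repeating the side effect.
    fn run_once(
        &mut self,
        run_id: &str,
        step_number: u32,
        work: impl FnOnce() -> Vec<u8>,
    ) -> Vec<u8> {
        let key = dedup_key(run_id, step_number);
        if let Some(cached) = self.by_key.get(&key) {
            return cached.clone();
        }
        self.executions += 1;
        let result = work();
        self.by_key.insert(key, result.clone());
        result
    }
}
```

A crash between `work()` and the insert would still repeat the work in this sketch, so the side effect itself should tolerate a repeat or use its own idempotency key.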
§Reserved headers
Step jobs reserve the workflow.* header prefix; concretely
HEADER_RUN_ID and HEADER_STEP are set by the runtime on every
step. Submitter-supplied headers must not start with workflow.; submission
rejects them. All other user headers are threaded through every step and
surfaced to the TerminalHook.
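A caller can check headers before submitting to avoid a rejection. The following std-only sketch assumes the reserved prefix is the literal string `workflow.` (taken from the wording above, not read from `RESERVED_HEADER_PREFIX`); `validate_user_headers` is a hypothetical helper, not part of the crate.

```rust
use std::collections::BTreeMap;

/// Assumed value of the reserved prefix, based on the `workflow.*` wording.
const RESERVED_PREFIX: &str = "workflow.";

/// Returns `Err` with the first offending key (in sorted order) if any
/// user-supplied header uses the reserved prefix, otherwise `Ok(())`.
fn validate_user_headers(headers: &BTreeMap<String, String>) -> Result<(), String> {
    match headers.keys().find(|key| key.starts_with(RESERVED_PREFIX)) {
        Some(key) => Err(key.clone()),
        None => Ok(()),
    }
}
```

Note that only the exact prefix is matched: a key such as `workflows.team` does not start with `workflow.` and passes this check.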
Structs§
- NoopTerminalHook: A no-op terminal hook. Useful when the user only cares about run observation via tracing or external state.
- RunHandle: Returned by WorkflowRuntime::submit.
- RunOutcome: Information passed to a TerminalHook when a run reaches a terminal state.
- RunSpec: Spec passed to WorkflowRuntime::submit.
- RunStatus: In-memory status snapshot for an active run. Returned by WorkflowRuntime::status. Terminal runs are not retained; once the terminal hook fires, the registry entry is removed.
- Step: A single step within a workflow run, handed to StepRunner::run_step.
- StepError: Failure outcomes the runner can return.
- WebhookTerminalHook: Convenience terminal hook that fires an HTTP webhook delivery via taquba-webhooks whenever a run terminates.
- WorkflowRuntime: Durable runtime for workflow runs. Cheap to clone (internally Arc).
- WorkflowRuntimeBuilder: Builder for WorkflowRuntime.
Enums§
- Error: Errors returned by the runtime’s submission and worker paths.
- RunState: Lifecycle state tracked in RunStatus::state.
- StepErrorKind: Whether a StepError should retry or fail the run.
- StepOutcome: What the runner wants the runtime to do after this step.
- TerminalStatus: Terminal state of a workflow run, passed to a TerminalHook.
Constants§
- HEADER_RUN_ID: Header key carrying the run identifier on every step job.
- HEADER_STEP: Header key carrying the zero-based step number on every step job.
- RESERVED_HEADER_PREFIX: Reserved prefix the runtime owns on step-job headers. Submitter-supplied headers must not start with this prefix; if they do, the runtime treats them as its own and strips them before invoking the runner.
Traits§
- StepRunner: User-implemented logic that advances a single workflow step.
- TerminalHook: User-implemented hook fired once per run when the run reaches a terminal state.
Type Aliases§
- Result: Result alias used throughout the crate.