taquba-workflow 0.3.0

Durable, at-least-once workflow runtime on top of the Taquba task queue. Particularly well-suited for AI agent runs.

taquba-workflow

Durable, at-least-once workflow runtime on top of the Taquba durable task queue.

taquba-workflow is the plumbing for any multi-step process that benefits from durable state between steps: idempotent step execution, retries with backoff, graceful restart, and terminal-state notifications. Implement StepRunner with bytes-in / bytes-out per-step logic; the runtime persists everything else.

Particularly well-suited for AI agent runs (see examples/rig_agent.rs for a Rig integration), but the runtime itself is framework-neutral and equally usable for ETL pipelines, document processing, payment flows, etc.

What this is / isn't

taquba-workflow is an imperative step orchestrator: at each step the runner decides what happens next by returning a StepOutcome (Continue, ContinueAfter, Succeed, Fail, or Cancel). External cancellation is supported via WorkflowRuntime::cancel. It is not:

  • A DAG executor. There's no declarative graph, no fan-out / fan-in, no dependency-driven scheduling.
  • An event-sourced workflow engine. There's no event-history replay, no per-side-effect recording.
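The imperative control flow can be modeled in a few lines of std-only Rust. This is not the crate's API: Outcome, run_step, and drive are hypothetical names, payloads are plain byte vectors, and the in-memory loop stands in for the runtime, which actually persists each step as a queue job between iterations.

```rust
/// Hypothetical stand-in for StepOutcome; payloads are raw bytes, as in the real runner.
#[derive(Debug, PartialEq)]
enum Outcome {
    Continue(Vec<u8>),
    Succeed(Vec<u8>),
    Fail(String),
}

/// Hypothetical two-stage runner: step 0 "researches", step 1 "writes".
/// The runner alone decides what happens next; there is no declared graph.
fn run_step(step_number: u32, payload: &[u8]) -> Outcome {
    match step_number {
        0 => {
            let mut notes = b"notes on ".to_vec();
            notes.extend_from_slice(payload);
            Outcome::Continue(notes)
        }
        1 => {
            let mut draft = b"draft from ".to_vec();
            draft.extend_from_slice(payload);
            Outcome::Succeed(draft)
        }
        n => Outcome::Fail(format!("unexpected step {n}")),
    }
}

/// In-memory driver: feeds each Continue payload into the next step
/// until the runner returns a terminal outcome.
fn drive(input: Vec<u8>) -> (u32, Outcome) {
    let mut step = 0;
    let mut payload = input;
    loop {
        match run_step(step, &payload) {
            Outcome::Continue(next) => {
                payload = next;
                step += 1;
            }
            terminal => return (step, terminal),
        }
    }
}
```

Because the next step is chosen by ordinary code, branching or looping is just a match on the step number and payload, which is also why fan-out and dependency graphs are out of scope.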

Install

cargo add taquba-workflow taquba
cargo add tokio --features full

Enable the webhooks feature for WebhookTerminalHook:

cargo add taquba-workflow --features webhooks

Quick start

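use std::sync::Arc;
use taquba::{Queue, object_store::memory::InMemory};
use taquba_workflow::{
    NoopTerminalHook, RunSpec, Step, StepError, StepOutcome, StepRunner, WorkflowRuntime,
};

// A runner that finishes every run in a single step.
struct EchoRunner;

impl StepRunner for EchoRunner {
    async fn run_step(&self, step: &Step) -> Result<StepOutcome, StepError> {
        // Echo the current step's payload back as the run's result.
        Ok(StepOutcome::Succeed { result: step.payload.clone() })
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Queue backed by an in-memory object store: nothing survives a restart.
    let queue = Arc::new(Queue::open(Arc::new(InMemory::new()), "demo").await?);

    // NoopTerminalHook: nothing is notified when a run finishes.
    let runtime = WorkflowRuntime::builder(queue, EchoRunner, NoopTerminalHook).build();

    // Run the worker loop in the background; `pending()` means it is never asked to shut down.
    let worker = runtime.clone();
    tokio::spawn(async move { worker.run(std::future::pending::<()>()).await });

    // Submit a run; every RunSpec field except `input` keeps its default.
    let handle = runtime.submit(RunSpec {
        input: b"hello".to_vec(),
        ..Default::default()
    }).await?;
    println!("submitted run {}", handle.run_id);

    // main returns right after submitting, which shuts down the Tokio runtime and the
    // spawned worker; a real program would stay alive until the run terminates.
    Ok(())
}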

Examples

cargo run -p taquba-workflow --example single_step
cargo run -p taquba-workflow --example multi_step
ANTHROPIC_API_KEY=... cargo run -p taquba-workflow --example rig_agent
OPENAI_API_KEY=...    cargo run -p taquba-workflow --example rig_agent

rig_agent is a two-stage AI agent (research, then write) that demonstrates between-step durability: kill the process after step 0 and a fresh process resumes at step 1.

Step outcomes

| Outcome | Effect |
| --- | --- |
| StepOutcome::Continue { payload } | Enqueue the next step immediately. |
| StepOutcome::ContinueAfter { payload, delay } | Schedule the next step to run after delay. |
| StepOutcome::Succeed { result } | Ack; terminal hook fires Succeeded. |
| StepOutcome::Fail { reason } | Ack; terminal hook fires Failed. A runner verdict, so not dead-lettered. |
| StepOutcome::Cancel { reason } | Ack; terminal hook fires Cancelled. A runner verdict, so not dead-lettered. |
| Err(StepError::transient(_)) | Retry with backoff up to max_attempts, then dead-letter. |
| Err(StepError::permanent(_)) | Dead-letter immediately. |

StepOutcome::Fail and StepOutcome::Cancel differ from Err(StepError::permanent(_)) in intent: the first two are runner verdicts and are acked normally, while an error signals an infrastructure failure and is dead-lettered so operators can find it via queue.dead_jobs().
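As a rough model of the table (hypothetical types, std only, payloads omitted), the decision the runtime makes for a finished step might look like the function below. The retry rule assumes attempt counts from 1 and that the attempt equal to max_attempts is the last one; the crate's exact counting and backoff schedule may differ.

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Outcome { Continue, ContinueAfter(Duration), Succeed, Fail, Cancel }

#[derive(Debug, PartialEq)]
enum StepErr { Transient, Permanent }

#[derive(Debug, PartialEq)]
enum Terminal { Succeeded, Failed, Cancelled }

/// What happens to the step job (hypothetical names).
#[derive(Debug, PartialEq)]
enum Action {
    EnqueueNext,
    ScheduleNext(Duration),
    Ack(Terminal),
    Retry,
    DeadLetter,
}

/// `attempt` is the 1-based number of the attempt that just finished (assumption).
fn action(result: Result<Outcome, StepErr>, attempt: u32, max_attempts: u32) -> Action {
    match result {
        Ok(Outcome::Continue) => Action::EnqueueNext,
        Ok(Outcome::ContinueAfter(delay)) => Action::ScheduleNext(delay),
        // Runner verdicts: acked normally, never dead-lettered.
        Ok(Outcome::Succeed) => Action::Ack(Terminal::Succeeded),
        Ok(Outcome::Fail) => Action::Ack(Terminal::Failed),
        Ok(Outcome::Cancel) => Action::Ack(Terminal::Cancelled),
        // Infrastructure errors: transient ones retry until attempts run out.
        Err(StepErr::Transient) if attempt < max_attempts => Action::Retry,
        Err(StepErr::Transient) | Err(StepErr::Permanent) => Action::DeadLetter,
    }
}
```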

Cancellation

Call WorkflowRuntime::cancel(run_id) to cancel an active run from outside the runner:

  • If the current step is pending or scheduled, the queued step job is removed and the terminal hook fires with Cancelled before cancel returns.

  • If the current step is running, cancellation is delivered via Step::cancel_token (a tokio_util::sync::CancellationToken). Runners that watch the token can short-circuit immediately:

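    tokio::select! {
        // The runner's normal work (`call_llm` is a placeholder for your step logic).
        out = call_llm(step) => out,
        // Resolves as soon as WorkflowRuntime::cancel trips the token.
        _ = step.cancel_token.cancelled() => {
            Ok(StepOutcome::Cancel { reason: "cooperative".into() })
        }
    }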
    

    Runners that ignore the token are allowed to run to completion (futures cannot be safely aborted mid-step). In both cases the runner's StepOutcome is discarded, any pending transient retry is suppressed, and the worker fires the terminal hook with Cancelled once the step returns. Watching the token only reduces cancellation latency for slow steps; it doesn't change semantics.

While a cancellation is in flight, WorkflowRuntime::status reports the run as RunState::Cancelling until its entry is dropped from the runtime's registry.

cancel returns Ok(false) if the run is unknown to this runtime or already terminal. It only reaches runs submitted to this WorkflowRuntime instance; a second runtime in the same process (sharing the queue) maintains its own registry.
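The cancellation rules can be summarized as a small state machine. The sketch below is a hypothetical in-memory model (State, Registry, and step_returned are illustrative names, not crate API); it leaves out the actual job removal, the cancellation token, and persistence, and the behavior of a repeated cancel while a run is already cancelling is an assumption.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq)]
enum State { Queued, Running, Cancelling, Terminal }

/// Hypothetical model of one runtime's registry.
#[derive(Default)]
struct Registry {
    runs: HashMap<u64, State>,
    cancelled_hooks: Vec<u64>, // run ids whose terminal hook fired with Cancelled
}

impl Registry {
    /// Mirrors the documented contract: false for unknown or terminal runs.
    fn cancel(&mut self, run_id: u64) -> bool {
        match self.runs.get(&run_id).copied() {
            None | Some(State::Terminal) => false,
            Some(State::Queued) => {
                // Pending/scheduled: job removed, hook fires before cancel returns.
                self.runs.insert(run_id, State::Terminal);
                self.cancelled_hooks.push(run_id);
                true
            }
            Some(State::Running) => {
                // Running: the token is tripped, but the step keeps going until it returns.
                self.runs.insert(run_id, State::Cancelling);
                true
            }
            // Assumption: a repeat cancel while cancelling just reports success.
            Some(State::Cancelling) => true,
        }
    }

    /// Called when a running step returns. Returns true if the runner's outcome
    /// is used, false if it is discarded because the run was cancelled.
    fn step_returned(&mut self, run_id: u64) -> bool {
        if self.runs.get(&run_id) == Some(&State::Cancelling) {
            // Outcome discarded, any transient retry suppressed, hook fires Cancelled.
            self.runs.insert(run_id, State::Terminal);
            self.cancelled_hooks.push(run_id);
            false
        } else {
            true
        }
    }
}
```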

Reserved headers

Step jobs reserve the workflow.* prefix; submission rejects user headers starting with it. All other headers on RunSpec::headers are carried through every step and reach the terminal hook via RunOutcome::headers.

| Key | Meaning |
| --- | --- |
| workflow.run_id | Run identifier. |
| workflow.step | Zero-based step number. |
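A minimal sketch of the reserved-prefix rule, assuming headers are string-to-string maps and run ids are strings. check_user_headers and step_headers are hypothetical helpers; the real runtime performs this validation itself at submission.

```rust
use std::collections::HashMap;

/// Prefix reserved for the runtime's own step-job headers.
const RESERVED_PREFIX: &str = "workflow.";

/// Hypothetical pre-submit check: reject any user header under `workflow.`.
fn check_user_headers(headers: &HashMap<String, String>) -> Result<(), String> {
    match headers.keys().find(|k| k.starts_with(RESERVED_PREFIX)) {
        Some(k) => Err(format!("header {:?} uses the reserved {:?} prefix", k, RESERVED_PREFIX)),
        None => Ok(()),
    }
}

/// Hypothetical: the headers a step job might carry, i.e. the user's
/// headers plus the two reserved keys.
fn step_headers(user: &HashMap<String, String>, run_id: &str, step: u32) -> HashMap<String, String> {
    let mut headers = user.clone();
    headers.insert("workflow.run_id".to_string(), run_id.to_string());
    headers.insert("workflow.step".to_string(), step.to_string());
    headers
}
```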

Idempotency

Each step is enqueued with dedup_key = "run:{run_id}:{step_number}", which prevents duplicate copies of the same step from being enqueued at the same time. Taquba is still at-least-once, however: a step can be claimed and executed twice if its lease expires before it is acked. StepRunner impls must therefore be idempotent for the same (run_id, step_number).
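One common way to make a side-effecting step idempotent is to record its result under a key derived from (run_id, step_number) and return the recorded result on redelivery. The sketch below reuses the documented dedup key format for that; EffectStore and run_once are hypothetical. A real store must be durable and shared across workers: the in-memory HashMap used here loses everything on restart and does not cover a crash between performing the effect and recording it.

```rust
use std::collections::HashMap;

/// Dedup key in the documented format "run:{run_id}:{step_number}".
fn dedup_key(run_id: &str, step_number: u32) -> String {
    format!("run:{run_id}:{step_number}")
}

/// Hypothetical memo of completed side effects, keyed per (run_id, step_number).
struct EffectStore {
    done: HashMap<String, Vec<u8>>,
    effects_performed: u32,
}

impl EffectStore {
    fn run_once(&mut self, run_id: &str, step_number: u32, payload: &[u8]) -> Vec<u8> {
        let key = dedup_key(run_id, step_number);
        if let Some(prev) = self.done.get(&key) {
            // Redelivered step: reuse the recorded result instead of repeating the effect.
            return prev.clone();
        }
        // Stands in for a real side effect (API call, write, charge).
        self.effects_performed += 1;
        let out = payload.to_ascii_uppercase();
        self.done.insert(key, out.clone());
        out
    }
}
```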

Duplicate submissions

WorkflowRuntime::submit rejects re-submission of a run_id that is still active, using two checks in order:

  1. An in-process registry catches duplicates within the same runtime.
  2. A durable per-run record written atomically with the step-0 enqueue (via Taquba's enqueue_with_kv) catches duplicates across process restarts, even after step 0 has been claimed and its dedup key released. The record is cleaned up when the run reaches a terminal state.

Both paths surface as Error::DuplicateRun.
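The two-stage check can be modeled with plain sets. Everything here is hypothetical: DurableStore stands in for the queue plus its per-run records (the real write goes through enqueue_with_kv), a fresh HashSet simulates the in-process registry after a restart, and the assumption that a run_id becomes reusable once its run is terminal is inferred from the record cleanup rather than stated by the crate.

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
enum SubmitError { DuplicateRun }

/// Hypothetical stand-in for state that survives restarts.
#[derive(Default)]
struct DurableStore {
    run_records: HashSet<String>,
    queued_steps: Vec<(String, u32)>,
}

/// `active` models the in-process registry, which is lost on restart.
fn submit(active: &mut HashSet<String>, store: &mut DurableStore, run_id: &str) -> Result<(), SubmitError> {
    // 1. Cheap in-process check: duplicates within this runtime.
    if active.contains(run_id) {
        return Err(SubmitError::DuplicateRun);
    }
    // 2. Durable check: duplicates across restarts, even after step 0 was claimed.
    if store.run_records.contains(run_id) {
        return Err(SubmitError::DuplicateRun);
    }
    // Models the atomic write of the run record together with the step-0 job.
    store.run_records.insert(run_id.to_string());
    store.queued_steps.push((run_id.to_string(), 0));
    active.insert(run_id.to_string());
    Ok(())
}

/// Terminal state: both records are cleaned up (in this model the id is then reusable).
fn finish(active: &mut HashSet<String>, store: &mut DurableStore, run_id: &str) {
    active.remove(run_id);
    store.run_records.remove(run_id);
}
```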

Terminal hook

TerminalHook::on_termination fires once per run on Succeeded, Failed, or Cancelled, receiving the submitter's headers and the runner's result or error. WebhookTerminalHook (behind the webhooks feature) fires HTTP callbacks via taquba-webhooks; set the per-run URL on RunSpec::headers["callback_url"].
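A sketch of what a webhook-style hook does with the run's headers, using hypothetical types rather than the crate's TerminalHook trait or RunOutcome: it looks up callback_url and records the callback it would send instead of making an HTTP request. The fired-set guard is defensive; the crate documents once-per-run delivery, but a hook with external side effects costs little to make idempotent.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Debug, Clone, Copy, PartialEq)]
enum Status { Succeeded, Failed, Cancelled }

/// Hypothetical mirror of what a terminal hook receives.
struct Termination<'a> {
    run_id: &'a str,
    status: Status,
    headers: &'a HashMap<String, String>,
}

/// Hypothetical hook: collects (url, status) pairs instead of POSTing them.
#[derive(Default)]
struct RecordingHook {
    fired: HashSet<String>,
    sent: Vec<(String, Status)>,
}

impl RecordingHook {
    fn on_termination(&mut self, t: &Termination<'_>) {
        // Defensive guard: ignore a second notification for the same run.
        if !self.fired.insert(t.run_id.to_string()) {
            return;
        }
        // Runs submitted without a callback_url header are skipped.
        if let Some(url) = t.headers.get("callback_url") {
            self.sent.push((url.clone(), t.status));
        }
    }
}
```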

License

Apache-2.0