//! Durable, at-least-once workflow runtime on top of the [Taquba] task queue.
//!
//! `taquba-workflow` is the plumbing for any multi-step process that
//! benefits from durable state between steps: idempotent step execution,
//! retries with backoff, graceful shutdown / restart, and terminal-state
//! notifications. Implement [`StepRunner`] with bytes-in / bytes-out
//! per-step logic and the runtime persists everything else.
//!
//! It's particularly well-suited for **AI agent runs**, where each step is
//! one LLM call (or one full agent loop) and a process restart between
//! steps shouldn't lose expensive intermediate work. See
//! `examples/rig_agent.rs` for a Rig integration. The runtime itself is
//! framework-neutral: equally usable for ETL pipelines, document
//! processing, payment flows, etc.
//!
//! # What this is / isn't
//!
//! `taquba-workflow` is an **imperative step orchestrator**: at each step,
//! the runner code decides what happens next by returning a
//! [`StepOutcome`] (Continue, Succeed, Fail, Cancel). External cancellation
//! is supported via [`WorkflowRuntime::cancel`]. It is *not*:
//!
//! - **A DAG executor.** There's no declarative graph definition, no
//!   built-in fan-out / fan-in, no dependency-driven scheduling.
//! - **An event-sourced workflow engine.** There's no event-history
//!   replay, no per-side-effect recording.
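//!
//! To make "imperative" concrete, here is a toy model of the control flow,
//! using only the standard library. `ToyOutcome`, `toy_step`, and `drive`
//! are hypothetical names for illustration only; the real [`StepOutcome`]
//! has more variants and different fields, and the real runtime persists
//! state between steps rather than looping in memory.
//!
//! ```rust
//! /// Toy stand-in for a runner's per-step decision (not this crate's type).
//! enum ToyOutcome {
//!     Continue(u64),
//!     Succeed(u64),
//! }
//!
//! /// One imperative step: the code itself picks what happens next.
//! fn toy_step(n: u64) -> ToyOutcome {
//!     if n >= 100 {
//!         ToyOutcome::Succeed(n)
//!     } else {
//!         ToyOutcome::Continue(n * 2 + 1)
//!     }
//! }
//!
//! /// Runs steps until a terminal outcome; returns (result, steps taken).
//! fn drive(mut state: u64) -> (u64, u32) {
//!     let mut steps = 0;
//!     loop {
//!         steps += 1;
//!         match toy_step(state) {
//!             ToyOutcome::Continue(next) => state = next,
//!             ToyOutcome::Succeed(result) => return (result, steps),
//!         }
//!     }
//! }
//! ```
//!
//! No graph is declared up front: the path a run takes is whatever the
//! step code returns at each point.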
//!
//! # Single-process by design
//!
//! The submission API and worker pool live in the same binary and share one
//! `Arc<Queue>`.
//!
//! # Quick start
//!
//! ```no_run
//! use std::sync::Arc;
//! use taquba::{Queue, object_store::memory::InMemory};
//! use taquba_workflow::{
//!     NoopTerminalHook, RunSpec, Step, StepError, StepOutcome, StepRunner, WorkflowRuntime,
//! };
//!
//! // A runner that finishes every run in a single step, echoing the payload.
//! struct EchoRunner;
//!
//! impl StepRunner for EchoRunner {
//!     async fn run_step(&self, step: &Step) -> Result<StepOutcome, StepError> {
//!         Ok(StepOutcome::Succeed { result: step.payload.clone() })
//!     }
//! }
//!
//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
//! // In-memory object store, convenient for a demo.
//! let queue = Arc::new(Queue::open(Arc::new(InMemory::new()), "demo").await?);
//!
//! let runtime = WorkflowRuntime::builder(queue, EchoRunner, NoopTerminalHook).build();
//!
//! // Run the worker loop in a background task. The future passed to `run`
//! // never resolves.
//! let runtime_for_worker = runtime.clone();
//! tokio::spawn(async move {
//!     runtime_for_worker.run(std::future::pending::<()>()).await
//! });
//!
//! // Submit a run with `hello` as its input.
//! let handle = runtime.submit(RunSpec {
//!     input: b"hello".to_vec(),
//!     ..Default::default()
//! }).await?;
//! println!("submitted run {}", handle.run_id);
//! # Ok(()) }
//! ```
//!
//! # Cancellation
//!
//! Call [`WorkflowRuntime::cancel`] to cancel an active run from outside
//! the runner:
//!
//! - If the current step is **pending or scheduled**, the queued step job
//!   is removed and the terminal hook fires from the `cancel` call before
//!   it returns.
//! - If the current step is **running**, cancellation is delivered via
//!   [`Step::cancel_token`] (a `tokio_util::sync::CancellationToken`).
//!   Runners that watch the token can short-circuit immediately:
//!
//!   ```ignore
//!   tokio::select! {
//!       out = call_llm(step) => out,
//!       _ = step.cancel_token.cancelled() => {
//!           Ok(StepOutcome::Cancel { reason: "cooperative".into() })
//!       }
//!   }
//!   ```
//!
//!   Runners that ignore the token are allowed to run to completion
//!   (futures cannot be safely aborted mid-step). In both cases the
//!   runner's [`StepOutcome`] is discarded, any pending transient retry
//!   is suppressed, and the worker fires the terminal hook with
//!   [`TerminalStatus::Cancelled`] once the step returns. Watching the
//!   token only reduces cancellation latency for slow steps; it doesn't
//!   change semantics.
//!
//! While termination is in flight, [`WorkflowRuntime::status`] reports a
//! [`RunState::Cancelling`] overlay until the entry is dropped.
//!
//! `cancel` returns `Ok(false)` if the run is unknown or already
//! terminal in this runtime. It only reaches runs submitted to this
//! [`WorkflowRuntime`] instance; a second runtime in the same process
//! (sharing the queue) maintains its own registry.
//!
//! # Idempotency model
//!
//! Each step is enqueued with [`taquba::EnqueueOptions::dedup_key`] set to
//! `"run:{run_id}:{step_number}"`. This guarantees that no two pending or
//! scheduled jobs exist for the same `(run_id, step_number)` at the same
//! time. Taquba delivers at-least-once, however, so a step can still be
//! claimed and executed more than once if its lease expires before the job
//! is acked. Implementations of [`StepRunner`] must therefore be idempotent
//! for the same input.
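//!
//! One way a runner can meet that requirement is to key each side effect
//! by the same `(run_id, step_number)` pair, so a redelivered step finds
//! the earlier result instead of repeating the work. The sketch below uses
//! only the standard library. `step_dedup_key` and `Ledger` are
//! hypothetical names, not part of this crate, and the `&str` / `u32`
//! parameter types are assumptions.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Hypothetical helper mirroring the documented dedup-key format.
//! fn step_dedup_key(run_id: &str, step_number: u32) -> String {
//!     format!("run:{run_id}:{step_number}")
//! }
//!
//! /// Hypothetical in-memory stand-in for an external system that
//! /// accepts an idempotency key.
//! #[derive(Default)]
//! struct Ledger {
//!     applied: HashMap<String, Vec<u8>>,
//! }
//!
//! impl Ledger {
//!     /// Performs the effect at most once per key. A repeated call with
//!     /// the same key returns the stored result unchanged.
//!     fn apply_once(&mut self, key: &str, payload: &[u8]) -> Vec<u8> {
//!         self.applied
//!             .entry(key.to_owned())
//!             .or_insert_with(|| payload.to_ascii_uppercase())
//!             .clone()
//!     }
//! }
//! ```
//!
//! In a real runner the ledger would have to be durable (a database row,
//! or an idempotency key passed to the downstream API), since an in-memory
//! map does not survive the restart that causes the redelivery.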
//!
//! # Reserved headers
//!
//! Step jobs reserve the `workflow.*` header prefix; concretely
//! [`HEADER_RUN_ID`] and [`HEADER_STEP`] are set by the runtime on every
//! step. Submitter-supplied headers must not start with `workflow.`; submission
//! rejects them. All other user headers are threaded through every step and
//! surfaced to the [`TerminalHook`].
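//!
//! Callers that want to catch an offending header before submitting can
//! apply the same prefix rule themselves. This is a minimal sketch, not
//! the runtime's own check: `RESERVED_PREFIX` and `first_reserved_header`
//! are hypothetical names, and the prefix value is taken from the prose
//! above rather than from [`RESERVED_HEADER_PREFIX`].
//!
//! ```rust
//! /// Assumed prefix value, copied from the documentation text.
//! const RESERVED_PREFIX: &str = "workflow.";
//!
//! /// Returns the first header name that uses the reserved prefix, if any.
//! fn first_reserved_header<'a>(names: &[&'a str]) -> Option<&'a str> {
//!     names
//!         .iter()
//!         .copied()
//!         .find(|name| name.starts_with(RESERVED_PREFIX))
//! }
//! ```
//!
//! The match is on the full `workflow.` prefix including the dot, so a
//! name such as `workflows` is not affected.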
//!
//! [Taquba]: https://docs.rs/taquba

#![warn(missing_docs)]

mod error;
mod runner;
mod runtime;
mod terminal;

pub use error::{Error, Result};
pub use runner::{Step, StepError, StepErrorKind, StepOutcome, StepRunner};
pub use runtime::{
    HEADER_RUN_ID, HEADER_STEP, RESERVED_HEADER_PREFIX, RunHandle, RunSpec, RunState, RunStatus,
    WorkflowRuntime, WorkflowRuntimeBuilder,
};
#[cfg(feature = "webhooks")]
pub use terminal::WebhookTerminalHook;
pub use terminal::{NoopTerminalHook, RunOutcome, TerminalHook, TerminalStatus};