Skip to main content

taquba_workflow/
lib.rs

1//! Durable, at-least-once workflow runtime on top of the [Taquba] task queue.
2//!
3//! `taquba-workflow` is the plumbing for any multi-step process that
4//! benefits from durable state between steps: idempotent step execution,
5//! retries with backoff, graceful shutdown / restart, and terminal-state
6//! notifications. Implement [`StepRunner`] with bytes-in / bytes-out
7//! per-step logic and the runtime persists everything else.
8//!
9//! It's particularly well-suited for **AI agent runs**, where each step is
10//! one LLM call (or one full agent loop) and a process restart between
11//! steps shouldn't lose expensive intermediate work. See
12//! `examples/rig_agent.rs` for a Rig integration. The runtime itself is
13//! framework-neutral: equally usable for ETL pipelines, document
14//! processing, payment flows, etc.
15//!
16//! # What this is / isn't
17//!
18//! `taquba-workflow` is an **imperative step orchestrator**: at each step,
19//! the runner code decides what happens next by returning a
20//! [`StepOutcome`] (Continue, Succeed, Fail, Cancel). External cancellation
21//! is supported via [`WorkflowRuntime::cancel`]. This crate is *not*:
22//!
23//! - **A DAG executor.** There's no declarative graph definition, no
24//!   built-in fan-out / fan-in, no dependency-driven scheduling.
25//! - **An event-sourced workflow engine.** There's no event-history
26//!   replay, no per-side-effect recording.
27//!
28//! # Single-process by design
29//!
30//! The submission API and worker pool live in the same binary and share one
31//! `Arc<Queue>`.
32//!
33//! # Configuring the queue
34//!
35//! Per-queue retention ([`taquba::QueueConfig::keep_done_jobs`] and
36//! [`taquba::QueueConfig::dead_retention`]) is set on the
37//! [`taquba::Queue`] before it's handed to the runtime. Pick an explicit
38//! name via [`WorkflowRuntimeBuilder::queue_name`] and key
39//! [`taquba::OpenOptions::queue_configs`] on the same string.
40//!
41//! ```no_run
42//! # use std::collections::HashMap;
43//! # use std::sync::Arc;
44//! # use std::time::Duration;
45//! # use taquba::{OpenOptions, Queue, QueueConfig, object_store::memory::InMemory};
46//! # use taquba_workflow::{NoopTerminalHook, StepError, StepOutcome, StepRunner, WorkflowRuntime, Step};
47//! # struct EchoRunner;
48//! # impl StepRunner for EchoRunner {
49//! #     async fn run_step(&self, step: &Step) -> Result<StepOutcome, StepError> {
50//! #         Ok(StepOutcome::Succeed { result: step.payload.clone() })
51//! #     }
52//! # }
53//! # async fn run() -> taquba_workflow::Result<()> {
54//! let store = Arc::new(InMemory::new());
55//! let opts = OpenOptions {
56//!     queue_configs: HashMap::from([(
57//!         "agent-runs".to_string(),
58//!         QueueConfig {
59//!             keep_done_jobs: Some(Duration::from_secs(24 * 60 * 60)),
60//!             ..QueueConfig::default()
61//!         },
62//!     )]),
63//!     ..OpenOptions::default()
64//! };
65//! let queue = Arc::new(Queue::open_with_options(store, "db", opts).await?);
66//! let runtime = WorkflowRuntime::builder(queue, EchoRunner, NoopTerminalHook)
67//!     .queue_name("agent-runs") // same string as in queue_configs
68//!     .build();
69//! # let _ = runtime;
70//! # Ok(()) }
71//! ```
72//!
73//! # Quick start
74//!
75//! ```no_run
76//! use std::sync::Arc;
77//! use taquba::{Queue, object_store::memory::InMemory};
78//! use taquba_workflow::{
79//!     NoopTerminalHook, RunSpec, Step, StepError, StepOutcome, StepRunner, WorkflowRuntime,
80//! };
81//!
82//! struct EchoRunner;
83//!
84//! impl StepRunner for EchoRunner {
85//!     async fn run_step(&self, step: &Step) -> Result<StepOutcome, StepError> {
86//!         Ok(StepOutcome::Succeed { result: step.payload.clone() })
87//!     }
88//! }
89//!
90//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
91//! let queue = Arc::new(Queue::open(Arc::new(InMemory::new()), "demo").await?);
92//!
93//! let runtime = WorkflowRuntime::builder(queue, EchoRunner, NoopTerminalHook).build();
94//!
95//! let runtime_for_worker = runtime.clone();
96//! tokio::spawn(async move {
97//!     runtime_for_worker.run(std::future::pending::<()>()).await
98//! });
99//!
100//! let outcome = runtime.submit(RunSpec {
101//!     input: b"hello".to_vec(),
102//!     ..Default::default()
103//! }).await?;
104//! println!("submitted run {}", outcome.run_id);
105//! # Ok(()) }
106//! ```
107//!
108//! # Cancellation
109//!
110//! Call [`WorkflowRuntime::cancel`] to cancel an active run from outside
111//! the runner:
112//!
113//! - If the current step is **pending or scheduled**, the queued step job
114//!   is removed and the terminal hook fires from within the `cancel` call,
115//!   before that call returns.
116//! - If the current step is **running**, cancellation is delivered via
117//!   [`Step::cancel_token`] (a `tokio_util::sync::CancellationToken`).
118//!   Runners that watch the token can short-circuit immediately:
119//!
120//!   ```ignore
121//!   tokio::select! {
122//!       out = call_llm(step) => out,
123//!       _ = step.cancel_token.cancelled() => {
124//!           Ok(StepOutcome::Cancel { reason: "cooperative".into() })
125//!       }
126//!   }
127//!   ```
128//!
129//!   Runners that ignore the token are allowed to run to completion
130//!   (futures cannot be safely aborted mid-step). Whether or not the runner
131//!   watches the token, its [`StepOutcome`] is discarded, any pending
132//!   transient retry is suppressed, and the worker fires the terminal hook with
133//!   [`TerminalStatus::Cancelled`] once the step returns. Watching the
134//!   token only reduces cancellation latency for slow steps; it doesn't
135//!   change semantics.
136//!
137//! While a cancellation is in flight, [`WorkflowRuntime::status`] reports a
138//! [`RunState::Cancelling`] overlay until the run's registry entry is dropped.
139//!
140//! `cancel` returns `Ok(false)` if the run is unknown or already
141//! terminal in this runtime. It only reaches runs submitted to this
142//! [`WorkflowRuntime`] instance; a second runtime in the same process
143//! (sharing the queue) maintains its own registry.
144//!
145//! # Idempotency model
146//!
147//! Each step is enqueued with [`taquba::EnqueueOptions::dedup_key`] of
148//! `"run:{run_id}:{step_number}"`. This guarantees that no two pending or
149//! scheduled jobs exist for the same `(run_id, step_number)` at the same
150//! time. Taquba is at-least-once though, so a step can still be claimed and
151//! executed more than once if its lease expires before ack: implementations
152//! of [`StepRunner`] must be idempotent for the same input.
153//!
154//! # Duplicate submissions
155//!
156//! [`WorkflowRuntime::submit`] is idempotent on `(run_id, spec.input)`.
157//! A re-submission of an active run that carries the same input is a
158//! no-op and the returned [`SubmitOutcome`] has `newly_submitted = false`.
159//! A re-submission that carries a *different* input is rejected with
160//! [`Error::InputMismatch`]: reusing a `run_id` with new content is a
161//! programmer error; pick a fresh `run_id` for a new run.
162//!
163//! Duplicates are caught from two sources, in order:
164//!
165//! 1. An in-process registry catches duplicates within the same runtime.
166//! 2. A **durable per-run record** written atomically with the step-0
167//!    enqueue (via [`taquba::Queue::enqueue_with_kv`]) catches
168//!    duplicates across process restarts, even after step 0 has been
169//!    claimed and its dedup key released. The record carries a SHA-256
170//!    digest of the original input so the cross-restart mismatch check works
171//!    even when the in-memory registry is empty. The record is cleaned
172//!    up when the run reaches a terminal state.
173//!
174//! # Reserved headers
175//!
176//! Step jobs reserve the `workflow.*` header prefix; concretely,
177//! [`HEADER_RUN_ID`] and [`HEADER_STEP`] are set by the runtime on every
178//! step. Submitter-supplied headers must not start with `workflow.`;
179//! submission rejects any that do. All other user headers are threaded
180//! through every step and surfaced to the [`TerminalHook`].
181//!
182//! [Taquba]: https://docs.rs/taquba
183
184#![warn(missing_docs)]
185
186mod error;
187mod runner;
188mod runtime;
189mod terminal;
190
191pub use error::{Error, Result};
192pub use runner::{Step, StepError, StepErrorKind, StepOutcome, StepRunner};
193pub use runtime::{
194    HEADER_RUN_ID, HEADER_STEP, RESERVED_HEADER_PREFIX, RunSpec, RunState, RunStatus,
195    SubmitOutcome, WorkflowRuntime, WorkflowRuntimeBuilder,
196};
197#[cfg(feature = "webhooks")]
198pub use terminal::WebhookTerminalHook;
199pub use terminal::{NoopTerminalHook, RunOutcome, TerminalHook, TerminalStatus};