//! Durable, at-least-once workflow runtime on top of the [Taquba] task queue.
//!
//! `taquba-workflow` is the plumbing for any multi-step process that
//! benefits from durable state between steps: idempotent step execution,
//! retries with backoff, graceful shutdown / restart, and terminal-state
//! notifications. Implement [`StepRunner`] with bytes-in / bytes-out
//! per-step logic and the runtime persists everything else.
//!
//! It's particularly well-suited for **AI agent runs**, where each step is
//! one LLM call (or one full agent loop) and a process restart between
//! steps shouldn't lose expensive intermediate work. See
//! `examples/rig_agent.rs` for a Rig integration. The runtime itself is
//! framework-neutral: equally usable for ETL pipelines, document
//! processing, payment flows, etc.
//!
//! # What this is / isn't
//!
//! `taquba-workflow` is an **imperative step orchestrator**: at each step,
//! the runner code decides what happens next by returning a
//! [`StepOutcome`] (such as `Continue`, `ContinueAfter`, `Succeed`, `Fail`,
//! or `Cancel`). External cancellation is supported via
//! [`WorkflowRuntime::cancel`]. It is *not*:
//!
//! - **A DAG executor.** There's no declarative graph definition, no
//! built-in fan-out / fan-in, no dependency-driven scheduling.
//! - **An event-sourced workflow engine.** There's no event-history
//! replay, no per-side-effect recording.
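//!
//! A toy driver loop shows the difference from a DAG engine most clearly.
//! The sketch below uses only std and is *not* this crate's API. `Outcome`,
//! `drive`, and `shout` are hypothetical names, and the loop leaves out
//! everything the runtime actually adds: durable enqueueing, retries,
//! leases, and hooks. Its only purpose is to show that the runner's return
//! value picks the next step. There is no graph.
//!
//! ```rust
//! // Hypothetical, simplified stand-in for `StepOutcome`.
//! #[derive(Debug, PartialEq)]
//! enum Outcome {
//!     Continue(Vec<u8>), // run another step with this payload
//!     Succeed(Vec<u8>),  // finish the run with this result
//!     Fail(String),      // finish the run with an error message
//! }
//!
//! // Calls `step_fn` with each step number and the previous step's payload
//! // until it returns a terminal outcome or `max_steps` is reached.
//! fn drive<F>(input: Vec<u8>, max_steps: u32, mut step_fn: F) -> Outcome
//! where
//!     F: FnMut(u32, &[u8]) -> Outcome,
//! {
//!     let mut payload = input;
//!     for step_number in 0..max_steps {
//!         match step_fn(step_number, &payload) {
//!             // The runner chose to continue: its payload feeds the next step.
//!             Outcome::Continue(next) => payload = next,
//!             // Succeed or Fail ends the run.
//!             terminal => return terminal,
//!         }
//!     }
//!     Outcome::Fail(format!("gave up after {max_steps} steps"))
//! }
//!
//! // Example runner: appends `!` on every step and succeeds on step 2.
//! fn shout(step_number: u32, payload: &[u8]) -> Outcome {
//!     let mut next = payload.to_vec();
//!     next.push(b'!');
//!     if step_number < 2 {
//!         Outcome::Continue(next)
//!     } else {
//!         Outcome::Succeed(next)
//!     }
//! }
//! ```
//!
//! The sketch adds the `max_steps` cap to keep a runaway runner from looping
//! forever. The text above does not say whether the runtime enforces a cap
//! of its own.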
//!
//! # Single-process by design
//!
//! The submission API and worker pool live in the same binary and share one
//! `Arc<Queue>`.
//!
//! # Configuring the queue
//!
//! Per-queue retention ([`taquba::QueueConfig::keep_done_jobs`] and
//! [`taquba::QueueConfig::dead_retention`]) is set on the
//! [`taquba::Queue`] before it's handed to the runtime. Pick an explicit
//! name via [`WorkflowRuntimeBuilder::queue_name`] and use the same string
//! as the key in [`taquba::OpenOptions::queue_configs`].
//!
//! ```no_run
//! # use std::collections::HashMap;
//! # use std::sync::Arc;
//! # use std::time::Duration;
//! # use taquba::{OpenOptions, Queue, QueueConfig, object_store::memory::InMemory};
//! # use taquba_workflow::{NoopTerminalHook, StepError, StepOutcome, StepRunner, WorkflowRuntime, Step};
//! # struct EchoRunner;
//! # impl StepRunner for EchoRunner {
//! #     async fn run_step(&self, step: &Step) -> Result<StepOutcome, StepError> {
//! #         Ok(StepOutcome::Succeed { result: step.payload.clone() })
//! #     }
//! # }
//! # async fn run() -> taquba_workflow::Result<()> {
//! let store = Arc::new(InMemory::new());
//! let opts = OpenOptions {
//!     queue_configs: HashMap::from([(
//!         "agent-runs".to_string(),
//!         QueueConfig {
//!             keep_done_jobs: Some(Duration::from_secs(24 * 60 * 60)),
//!             ..QueueConfig::default()
//!         },
//!     )]),
//!     ..OpenOptions::default()
//! };
//! let queue = Arc::new(Queue::open_with_options(store.clone(), "db", opts).await?);
//! let runtime = WorkflowRuntime::builder(queue, store, EchoRunner, NoopTerminalHook)
//!     .queue_name("agent-runs") // same string as in queue_configs
//!     .build();
//! # let _ = runtime;
//! # Ok(()) }
//! ```
//!
//! # Quick start
//!
//! ```no_run
//! use std::sync::Arc;
//! use taquba::{Queue, object_store::memory::InMemory};
//! use taquba_workflow::{
//!     NoopTerminalHook, RunSpec, Step, StepError, StepOutcome, StepRunner, WorkflowRuntime,
//! };
//!
//! struct EchoRunner;
//!
//! impl StepRunner for EchoRunner {
//!     async fn run_step(&self, step: &Step) -> Result<StepOutcome, StepError> {
//!         Ok(StepOutcome::Succeed { result: step.payload.clone() })
//!     }
//! }
//!
//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
//! let store = Arc::new(InMemory::new());
//! let queue = Arc::new(Queue::open(store.clone(), "demo").await?);
//!
//! let runtime = WorkflowRuntime::builder(queue, store, EchoRunner, NoopTerminalHook).build();
//!
//! let runtime_for_worker = runtime.clone();
//! tokio::spawn(async move {
//!     // `pending()` never resolves, so the worker runs until the process exits.
//!     runtime_for_worker.run(std::future::pending::<()>()).await
//! });
//!
//! let outcome = runtime.submit(RunSpec {
//!     input: b"hello".to_vec(),
//!     ..Default::default()
//! }).await?;
//! println!("submitted run {}", outcome.run_id);
//! # Ok(()) }
//! ```
//!
//! # Cancellation
//!
//! Call [`WorkflowRuntime::cancel`] to cancel an active run from outside
//! the runner:
//!
//! - If the current step is **pending or scheduled**, the queued step job
//! is removed and the terminal hook fires from the `cancel` call before
//! it returns.
//! - If the current step is **running**, cancellation is delivered via
//! [`Step::cancel_token`] (a `tokio_util::sync::CancellationToken`).
//! Runners that watch the token can short-circuit immediately:
//!
//! ```ignore
//! tokio::select! {
//!     out = call_llm(step) => out,
//!     _ = step.cancel_token.cancelled() => {
//!         Ok(StepOutcome::Cancel { reason: "cooperative".into() })
//!     }
//! }
//! ```
//!
//! Runners that ignore the token are allowed to run to completion. The
//! runtime does not abort a step's future partway through, because doing so
//! could leave the step's side effects half-applied. In both cases the
//! runner's [`StepOutcome`] is discarded, any pending transient retry is
//! suppressed, and the worker fires the terminal hook with
//! [`TerminalStatus::Cancelled`] once the step returns. Watching the token
//! only reduces cancellation latency for slow steps; it doesn't change
//! semantics.
//!
//! While cancellation is in flight, [`WorkflowRuntime::status`] reports
//! [`RunState::Cancelling`] for the run, overriding its underlying state,
//! until the run's entry is removed from the runtime's in-process registry.
//!
//! `cancel` returns `Ok(false)` if the run is unknown or already
//! terminal in this runtime. It only reaches runs submitted to this
//! [`WorkflowRuntime`] instance; a second runtime in the same process
//! (sharing the queue) maintains its own registry.
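//!
//! The rule that a cancel request overrides whatever the step returns can be
//! modelled without async code. In the std-only sketch below, `Token` is a
//! hypothetical stand-in for `CancellationToken` built on an
//! `Arc<AtomicBool>`. `resolve` and `cooperative_step` are illustrative
//! helpers, not runtime functions, and a plain `Result` stands in for
//! [`StepOutcome`].
//!
//! ```rust
//! use std::sync::atomic::{AtomicBool, Ordering};
//! use std::sync::Arc;
//!
//! // Hypothetical stand-in for a cancellation token; clones share one flag.
//! #[derive(Clone, Default)]
//! struct Token(Arc<AtomicBool>);
//!
//! impl Token {
//!     fn cancel(&self) {
//!         self.0.store(true, Ordering::SeqCst);
//!     }
//!     fn is_cancelled(&self) -> bool {
//!         self.0.load(Ordering::SeqCst)
//!     }
//! }
//!
//! #[derive(Debug, PartialEq)]
//! enum Terminal {
//!     Succeeded,
//!     Failed,
//!     Cancelled,
//! }
//!
//! // Decides the terminal status once a step returns. A pending cancel wins
//! // over whatever the runner produced, whether or not it watched the token.
//! fn resolve(token: &Token, runner_result: Result<(), String>) -> Terminal {
//!     if token.is_cancelled() {
//!         return Terminal::Cancelled;
//!     }
//!     match runner_result {
//!         Ok(()) => Terminal::Succeeded,
//!         Err(_) => Terminal::Failed,
//!     }
//! }
//!
//! // A slow step that checks the token between chunks of work. Returns how
//! // many chunks it completed, together with its own result.
//! fn cooperative_step(token: &Token, chunks: u32) -> (u32, Result<(), String>) {
//!     let mut done = 0;
//!     for _ in 0..chunks {
//!         if token.is_cancelled() {
//!             return (done, Err("stopped early".to_string()));
//!         }
//!         done += 1;
//!     }
//!     (done, Ok(()))
//! }
//! ```
//!
//! `cooperative_step` shows why watching the token only affects latency. A
//! cancelled step does less work before returning, but `resolve` yields
//! `Cancelled` either way.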
//!
//! # Idempotency model
//!
//! Each step is enqueued with [`taquba::EnqueueOptions::dedup_key`] of
//! `"run:{run_id}:{step_number}"`. This guarantees that no two pending or
//! scheduled jobs exist for the same `(run_id, step_number)` at the same
//! time. Taquba delivers at-least-once, though: a step can still be
//! claimed and executed more than once if its lease expires before the job
//! is acked, so implementations
//! of [`StepRunner`] must be idempotent for the same input.
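//!
//! The std-only sketch below gives a rough model of this guarantee by
//! keeping a set of pending dedup keys. Only the key format comes from the
//! text above. `PendingKeys` and its methods are hypothetical and say
//! nothing about how Taquba stores keys. The model also shows the limit of
//! the guarantee: a key only blocks duplicates while its job is pending or
//! scheduled, and claiming the job releases it.
//!
//! ```rust
//! use std::collections::HashSet;
//!
//! // The dedup key format documented above.
//! fn dedup_key(run_id: &str, step_number: u32) -> String {
//!     format!("run:{run_id}:{step_number}")
//! }
//!
//! // Hypothetical model of the keys held by pending or scheduled jobs.
//! #[derive(Default)]
//! struct PendingKeys(HashSet<String>);
//!
//! impl PendingKeys {
//!     // Returns true if the step job was enqueued, false if deduplicated.
//!     fn enqueue(&mut self, run_id: &str, step_number: u32) -> bool {
//!         self.0.insert(dedup_key(run_id, step_number))
//!     }
//!
//!     // A worker claiming the job releases its key.
//!     fn claim(&mut self, run_id: &str, step_number: u32) -> bool {
//!         self.0.remove(&dedup_key(run_id, step_number))
//!     }
//! }
//! ```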
//!
//! # Memoizing within-step side effects
//!
//! Because retries can re-execute a step, expensive non-idempotent side
//! effects (LLM calls, paid APIs, multi-stage processing) need a place
//! to record their result so retries observe the cached value instead
//! of paying twice. [`Step::memo`] is a per-step durable key-value
//! store scoped to `(run_id, step_number)`:
//!
//! ```ignore
//! // Inside StepRunner::run_step:
//! if let Some(cached) = step.memo.get("draft").await? {
//!     return Ok(StepOutcome::Succeed { result: cached });
//! }
//! let draft = expensive_call(&step.payload).await?;
//! step.memo.put("draft", &draft).await?;
//! Ok(StepOutcome::Succeed { result: draft })
//! ```
//!
//! Memo entries live in the object store passed to
//! [`WorkflowRuntime::builder`] under the path prefix configured by
//! [`WorkflowRuntimeBuilder::memo_prefix`] (default `"workflow-memo"`).
//! `Memo` is strictly per-step; the durable channel between steps is
//! [`StepOutcome::Continue`]'s payload, not memo.
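//!
//! An in-memory map is enough to model the effect on retries. The std-only
//! sketch below is hypothetical: `MemoSketch` and `attempt` are not part of
//! the crate, and the real [`Step::memo`] is async and backed by the object
//! store. The sketch keys entries by `(run_id, step_number, key)` and counts
//! how often the "expensive" call actually runs.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! // Hypothetical in-memory memo; entries are scoped by (run_id, step_number).
//! #[derive(Default)]
//! struct MemoSketch {
//!     entries: HashMap<(String, u32, String), Vec<u8>>,
//! }
//!
//! impl MemoSketch {
//!     fn get(&self, run_id: &str, step: u32, key: &str) -> Option<Vec<u8>> {
//!         self.entries
//!             .get(&(run_id.to_string(), step, key.to_string()))
//!             .cloned()
//!     }
//!
//!     fn put(&mut self, run_id: &str, step: u32, key: &str, value: &[u8]) {
//!         self.entries
//!             .insert((run_id.to_string(), step, key.to_string()), value.to_vec());
//!     }
//! }
//!
//! // One attempt at a step. A retry finds the stored draft and skips the
//! // expensive call; `calls` counts how often that call really ran.
//! fn attempt(memo: &mut MemoSketch, run_id: &str, step: u32, input: &[u8], calls: &mut u32) -> Vec<u8> {
//!     if let Some(cached) = memo.get(run_id, step, "draft") {
//!         return cached;
//!     }
//!     *calls += 1; // stands in for a paid LLM or API call
//!     let draft = input.to_ascii_uppercase();
//!     memo.put(run_id, step, "draft", &draft);
//!     draft
//! }
//! ```
//!
//! A retry of step 0 hits the cache, so the call counter stays at one. Step
//! 1 gets its own entry even though it uses the same `"draft"` key.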
//!
//! # Memo retention
//!
//! By default memo entries are retained indefinitely (appropriate for
//! short-lived runs or workloads that manage cleanup externally). To
//! enable automatic cleanup, configure a retention window via
//! [`WorkflowRuntimeBuilder::memo_retention`]:
//!
//! ```ignore
//! let runtime = WorkflowRuntime::builder(queue, store, runner, hook)
//!     .memo_retention(Duration::from_secs(24 * 60 * 60))
//!     .build();
//! ```
//!
//! When retention is set, the runtime writes a small terminal marker
//! for every terminal state (Succeeded, Failed, Cancelled) and
//! [`WorkflowRuntime::run`] spawns a background sweeper that lists
//! those markers and clears both the memos and the marker for any run whose
//! marker is older than the retention window. The first sweep fires
//! on startup so a restarted process catches markers left behind by
//! an earlier one.
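//!
//! At its core, the sweep's eligibility check is arithmetic on marker
//! timestamps. The std-only sketch below assumes each marker carries a
//! millisecond timestamp read from the runtime's clock. `Marker` and
//! `expired_runs` are hypothetical names. The sketch also chooses to treat
//! an age exactly equal to the window as not yet expired.
//!
//! ```rust
//! // Hypothetical terminal marker: which run it belongs to and when it was
//! // written, in milliseconds since the Unix epoch.
//! struct Marker {
//!     run_id: String,
//!     written_at_ms: u64,
//! }
//!
//! // Returns the run ids whose markers are strictly older than the window.
//! fn expired_runs(markers: &[Marker], now_ms: u64, retention_ms: u64) -> Vec<&str> {
//!     markers
//!         .iter()
//!         // saturating_sub guards against markers stamped "in the future".
//!         .filter(|m| now_ms.saturating_sub(m.written_at_ms) > retention_ms)
//!         .map(|m| m.run_id.as_str())
//!         .collect()
//! }
//! ```
//!
//! A sweeper built on this could clear a run's memos before deleting its
//! marker. Then a sweep interrupted partway would leave the marker in place
//! for the next pass. That ordering is an assumption of this sketch, not a
//! documented property of the runtime.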
//!
//! Advanced cleanup policies (selective retention, externally-driven
//! sweeps) can be built directly on [`MemoStore::list_terminal_markers`],
//! [`MemoStore::clear_memos_for_run`], and
//! [`MemoStore::delete_terminal_marker`] without configuring
//! [`WorkflowRuntimeBuilder::memo_retention`].
//!
//! # Time injection
//!
//! Every timestamp the runtime writes (the `submitted_at_ms` on
//! the durable per-run record, the `run_at` it computes when a
//! step returns [`StepOutcome::ContinueAfter`], and the terminal
//! marker timestamps the memo-retention sweep consumes) is read
//! through a [`taquba::Clock`] rather than `SystemTime::now()`. By
//! default the runtime inherits the clock its [`taquba::Queue`]
//! was opened with, so passing a [`taquba::MockClock`] to
//! [`taquba::OpenOptions::clock`] virtualises both the queue and
//! the workflow runtime in lockstep:
//!
//! ```rust,ignore
//! let clock = MockClock::new(1_700_000_000_000);
//! let opts = OpenOptions {
//!     clock: Arc::new(clock.clone()),
//!     ..OpenOptions::default()
//! };
//! let queue = Arc::new(Queue::open_with_options(store.clone(), "db", opts).await?);
//! let runtime = WorkflowRuntime::builder(queue, store, runner, hook).build();
//! // `runtime` reads the same clock as `queue`; `clock.advance(...)`
//! // moves every time-based decision the runtime makes.
//! ```
//!
//! Override the inherited default via
//! [`WorkflowRuntimeBuilder::clock`] when a test or specialised
//! setup needs the runtime on a different time source than the
//! queue. The common case for production callers is to leave the
//! default and let the queue's `SystemClock` flow through.
//!
//! This makes downstream tests deterministic:
//! [`StepOutcome::ContinueAfter`] delays, memo-retention sweep
//! eligibility, and terminal-marker ages all advance under
//! explicit `MockClock::advance` calls rather than wall-clock
//! waits.
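//!
//! Underneath, this is ordinary dependency injection of a time source. The
//! std-only sketch below defines its own `Clock` trait, `WallClock`,
//! `FakeClock`, and `run_at_ms`, all hypothetical. They play the roles of
//! [`taquba::Clock`], its system clock, and [`taquba::MockClock`], but the
//! sketch does not claim those types' actual methods. The point is that a
//! delayed step's due time is computed from the injected clock. Advancing a
//! fake clock therefore makes the step due with no real waiting.
//!
//! ```rust
//! use std::sync::atomic::{AtomicU64, Ordering};
//! use std::sync::Arc;
//! use std::time::{Duration, SystemTime, UNIX_EPOCH};
//!
//! // Hypothetical clock abstraction; the real trait may differ.
//! trait Clock: Send + Sync {
//!     fn now_ms(&self) -> u64;
//! }
//!
//! // Production clock: reads the system wall clock.
//! struct WallClock;
//!
//! impl Clock for WallClock {
//!     fn now_ms(&self) -> u64 {
//!         SystemTime::now()
//!             .duration_since(UNIX_EPOCH)
//!             .map(|d| d.as_millis() as u64)
//!             .unwrap_or(0)
//!     }
//! }
//!
//! // Test clock that only moves when `advance` is called; clones share state.
//! #[derive(Clone)]
//! struct FakeClock(Arc<AtomicU64>);
//!
//! impl FakeClock {
//!     fn new(start_ms: u64) -> Self {
//!         FakeClock(Arc::new(AtomicU64::new(start_ms)))
//!     }
//!     fn advance(&self, by: Duration) {
//!         self.0.fetch_add(by.as_millis() as u64, Ordering::SeqCst);
//!     }
//! }
//!
//! impl Clock for FakeClock {
//!     fn now_ms(&self) -> u64 {
//!         self.0.load(Ordering::SeqCst)
//!     }
//! }
//!
//! // When a step delayed by `delay` becomes due, per the injected clock.
//! fn run_at_ms(clock: &dyn Clock, delay: Duration) -> u64 {
//!     clock.now_ms() + delay.as_millis() as u64
//! }
//! ```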
//!
//! # Duplicate submissions
//!
//! [`WorkflowRuntime::submit`] is idempotent on `(run_id, spec.input)`.
//! A re-submission of an active run that carries the same input is a
//! no-op and the returned [`SubmitOutcome`] has `newly_submitted = false`.
//! A re-submission that carries a *different* input is rejected with
//! [`Error::InputMismatch`]: reusing a `run_id` with new content is a
//! programmer error; pick a fresh `run_id` for a new run.
//!
//! Duplicates are caught from two sources, in order:
//!
//! 1. An in-process registry catches duplicates within the same runtime.
//! 2. A **durable per-run record** written atomically with the step-0
//! enqueue (via [`taquba::Queue::enqueue_with_kv`]) catches
//! duplicates across process restarts, even after step 0 has been
//! claimed and its dedup key released. The record carries a SHA-256
//! of the original input so the cross-restart mismatch check works
//! even when the in-memory registry is empty. The record is cleaned
//! up when the run reaches a terminal state.
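//!
//! The per-run check can be sketched as a map from `run_id` to an input
//! fingerprint. This std-only model is hypothetical: `Registry`,
//! `SubmitError`, and `fingerprint` are not the crate's types. It also
//! substitutes `DefaultHasher` for SHA-256, which std does not provide.
//!
//! ```rust
//! use std::collections::hash_map::DefaultHasher;
//! use std::collections::HashMap;
//! use std::hash::{Hash, Hasher};
//!
//! #[derive(Debug, PartialEq)]
//! enum SubmitError {
//!     InputMismatch,
//! }
//!
//! // Stand-in fingerprint. DefaultHasher is only acceptable in memory: its
//! // output is not guaranteed to be stable across Rust releases, so a
//! // durable record should store a stable digest such as SHA-256 instead.
//! fn fingerprint(input: &[u8]) -> u64 {
//!     let mut hasher = DefaultHasher::new();
//!     input.hash(&mut hasher);
//!     hasher.finish()
//! }
//!
//! // Hypothetical registry of active runs: run_id -> input fingerprint.
//! #[derive(Default)]
//! struct Registry(HashMap<String, u64>);
//!
//! impl Registry {
//!     // Ok(true): new run. Ok(false): duplicate with the same input.
//!     // Err(InputMismatch): the run_id is active with a different input.
//!     fn submit(&mut self, run_id: &str, input: &[u8]) -> Result<bool, SubmitError> {
//!         let fp = fingerprint(input);
//!         match self.0.get(run_id).copied() {
//!             Some(existing) if existing == fp => Ok(false),
//!             Some(_) => Err(SubmitError::InputMismatch),
//!             None => {
//!                 self.0.insert(run_id.to_string(), fp);
//!                 Ok(true)
//!             }
//!         }
//!     }
//! }
//! ```
//!
//! The three results line up with the behaviour described above. `Ok(true)`
//! corresponds to a fresh submission, `Ok(false)` to
//! `newly_submitted = false`, and the error to [`Error::InputMismatch`].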
//!
//! # Reserved headers
//!
//! Step jobs reserve the `workflow.*` header prefix; concretely
//! [`HEADER_RUN_ID`] and [`HEADER_STEP`] are set by the runtime on every
//! step. Submitter-supplied headers must not start with `workflow.`; submission
//! rejects them. All other user headers are threaded through every step and
//! surfaced to the [`TerminalHook`].
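//!
//! A submitter may want to check this rule before calling
//! [`WorkflowRuntime::submit`]. The sketch below is a hypothetical
//! pre-flight helper. It assumes headers are held in a
//! `HashMap<String, String>`, which may not match the crate's header type,
//! and it checks only the `workflow.` prefix described above.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! // Prefix the runtime reserves for its own step headers.
//! const RESERVED_PREFIX: &str = "workflow.";
//!
//! // Hypothetical pre-flight check: rejects any header under the reserved prefix.
//! fn check_user_headers(headers: &HashMap<String, String>) -> Result<(), String> {
//!     match headers.keys().find(|name| name.starts_with(RESERVED_PREFIX)) {
//!         Some(name) => Err(format!(
//!             "header `{}` uses the reserved `{}` prefix",
//!             name, RESERVED_PREFIX
//!         )),
//!         None => Ok(()),
//!     }
//! }
//! ```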
//!
//! [Taquba]: https://docs.rs/taquba
pub use ;
pub use ;
pub use ;
pub use ;
pub use WebhookTerminalHook;
pub use ;