1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
//! Fluent builder for constructing a [`Workflow`].
//!
//! Use [`WorkflowBuilder::new`] to start, chain configuration methods, and
//! call [`WorkflowBuilder::build`] to produce a validated [`Workflow`].
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use blazen_events::{InputRequestEvent, InputResponseEvent};
use blazen_llm::retry::RetryConfig;
use crate::error::WorkflowError;
use crate::step::{ParallelSubWorkflowsStep, StepKind, StepRegistration, SubWorkflowStep};
use crate::workflow::Workflow;
/// Async callback for handling input requests inline (without pausing).
///
/// When registered via [`WorkflowBuilder::input_handler`], the event loop
/// invokes this callback instead of auto-pausing when an
/// [`InputRequestEvent`] arrives. The callback receives the request by value
/// and returns a boxed, `Send` future. That future resolves either to the
/// [`InputResponseEvent`] that is injected back into the event queue, or to
/// a [`WorkflowError`].
///
/// The callback itself must be `Send + Sync` because it is shared behind an
/// [`Arc`].
pub type InputHandlerFn = Arc<
    dyn Fn(
            InputRequestEvent,
        )
            -> Pin<Box<dyn Future<Output = Result<InputResponseEvent, WorkflowError>> + Send>>
        + Send
        + Sync,
>;
/// Chainable builder that accumulates steps and run-time settings, then
/// turns them into a [`Workflow`] via [`WorkflowBuilder::build`].
pub struct WorkflowBuilder {
    /// Workflow name, handed through unchanged to the built [`Workflow`].
    name: String,
    /// Registered steps, kept in the order they were added.
    steps: Vec<StepKind>,
    /// Overall execution deadline; `None` means no workflow-level limit.
    timeout: Option<Duration>,
    /// Workflow-wide default retry policy for LLM calls. Step-level and
    /// per-call settings override it; pipeline-level and provider-level
    /// defaults are overridden by it.
    retry_config: Option<Arc<RetryConfig>>,
    /// Inline responder for input requests, used instead of pausing the
    /// workflow (human-in-the-loop without a pause).
    input_handler: Option<InputHandlerFn>,
    /// When `true`, lifecycle events are pushed to the broadcast stream
    /// automatically.
    auto_publish_events: bool,
    /// How live session references are treated when the workflow is paused
    /// or snapshotted.
    session_pause_policy: crate::session_ref::SessionPausePolicy,
    /// Durable checkpoint store; only present with the `persist` feature.
    #[cfg(feature = "persist")]
    checkpoint_store: Option<Arc<dyn blazen_persist::CheckpointStore>>,
    /// When `true`, a checkpoint is attempted after every step; only present
    /// with the `persist` feature.
    #[cfg(feature = "persist")]
    checkpoint_after_step: bool,
    /// When `true`, an append-only history of workflow events is recorded;
    /// only present with the `telemetry` feature.
    #[cfg(feature = "telemetry")]
    collect_history: bool,
}
impl WorkflowBuilder {
    /// Workflow-level execution timeout installed by [`WorkflowBuilder::new`]:
    /// five minutes. Expressed with `Duration::from_secs`, which has been a
    /// `const fn` since early stable Rust, so no recent toolchain is needed.
    const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5 * 60);

    /// Create a new builder with the given workflow name.
    ///
    /// The builder starts with:
    /// - no steps;
    /// - a five-minute workflow timeout;
    /// - no workflow-level retry config;
    /// - no inline input handler;
    /// - lifecycle-event auto-publishing enabled;
    /// - the default `SessionPausePolicy`.
    ///
    /// With the `persist` feature, no checkpoint store is set and per-step
    /// checkpointing is off. With the `telemetry` feature, history
    /// collection is off.
    #[must_use]
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            steps: Vec::new(),
            timeout: Some(Self::DEFAULT_TIMEOUT),
            retry_config: None,
            input_handler: None,
            auto_publish_events: true,
            session_pause_policy: crate::session_ref::SessionPausePolicy::default(),
            #[cfg(feature = "persist")]
            checkpoint_store: None,
            #[cfg(feature = "persist")]
            checkpoint_after_step: false,
            #[cfg(feature = "telemetry")]
            collect_history: false,
        }
    }

    /// Register a regular handler-backed step.
    ///
    /// Follow with [`step_timeout`](Self::step_timeout) or
    /// [`step_retry`](Self::step_retry) to configure this step.
    #[must_use]
    pub fn step(mut self, registration: StepRegistration) -> Self {
        self.steps.push(StepKind::Regular(registration));
        self
    }

    /// Register a sub-workflow step that runs another [`Workflow`] as
    /// its handler.
    ///
    /// Sub-workflow steps accept [`step_timeout`](Self::step_timeout) and
    /// [`step_retry`](Self::step_retry) just like regular steps.
    #[must_use]
    pub fn add_subworkflow_step(mut self, step: SubWorkflowStep) -> Self {
        self.steps.push(StepKind::SubWorkflow(step));
        self
    }

    /// Register a parallel sub-workflow fan-out step.
    ///
    /// Per-step timeout/retry setters panic on this kind of step.
    /// Configure each branch's `SubWorkflowStep` individually instead.
    #[must_use]
    pub fn add_parallel_subworkflows(mut self, step: ParallelSubWorkflowsStep) -> Self {
        self.steps.push(StepKind::ParallelSubWorkflows(step));
        self
    }

    /// Borrow the timeout slot of the most recently registered step for
    /// mutation.
    ///
    /// Both regular (handler-backed) steps and sub-workflow steps carry a
    /// timeout slot.
    ///
    /// Panics, naming `caller` in the message, if:
    /// - no step is registered yet; or
    /// - the most recent step is a `ParallelSubWorkflows` fan-out, whose
    ///   branches must be configured individually.
    fn last_timeout_slot_mut(&mut self, caller: &str) -> &mut Option<Duration> {
        match self.steps.last_mut() {
            Some(StepKind::Regular(reg)) => &mut reg.timeout,
            Some(StepKind::SubWorkflow(step)) => &mut step.timeout,
            Some(StepKind::ParallelSubWorkflows(_)) => panic!(
                "{caller}() is not supported on a ParallelSubWorkflows step; \
                 set per-branch timeouts on each SubWorkflowStep instead"
            ),
            None => panic!(
                "{caller}() called before any step was registered; \
                 call .step(...) or .add_subworkflow_step(...) first"
            ),
        }
    }

    /// Borrow the retry-config slot of the most recently registered step
    /// for mutation.
    ///
    /// Both regular (handler-backed) steps and sub-workflow steps carry a
    /// retry slot.
    ///
    /// Panics, naming `caller` in the message, if:
    /// - no step is registered yet; or
    /// - the most recent step is a `ParallelSubWorkflows` fan-out.
    fn last_retry_slot_mut(&mut self, caller: &str) -> &mut Option<Arc<RetryConfig>> {
        match self.steps.last_mut() {
            Some(StepKind::Regular(reg)) => &mut reg.retry_config,
            Some(StepKind::SubWorkflow(step)) => &mut step.retry_config,
            Some(StepKind::ParallelSubWorkflows(_)) => panic!(
                "{caller}() is not supported on a ParallelSubWorkflows step; \
                 set per-branch retries on each SubWorkflowStep instead"
            ),
            None => panic!(
                "{caller}() called before any step was registered; \
                 call .step(...) or .add_subworkflow_step(...) first"
            ),
        }
    }

    /// Retry config that disables retries (`max_retries = 0`) and leaves
    /// every other knob at its `RetryConfig::default()` value. Shared by
    /// [`no_retry`](Self::no_retry) and [`no_step_retry`](Self::no_step_retry)
    /// so the two stay in sync.
    fn disabled_retry() -> Arc<RetryConfig> {
        Arc::new(RetryConfig {
            max_retries: 0,
            ..RetryConfig::default()
        })
    }

    /// Set the timeout on the most recently registered step.
    ///
    /// Applies to regular and sub-workflow steps.
    ///
    /// # Panics
    ///
    /// Panics if no step has been registered yet, or if the most
    /// recently registered step is a `ParallelSubWorkflows` (set
    /// per-branch timeouts instead).
    #[must_use]
    pub fn step_timeout(mut self, timeout: Duration) -> Self {
        *self.last_timeout_slot_mut("step_timeout") = Some(timeout);
        self
    }

    /// Clear the timeout on the most recently registered step, so it runs
    /// without a per-step limit.
    ///
    /// # Panics
    ///
    /// Panics if no step has been registered yet, or if the most
    /// recently registered step is a `ParallelSubWorkflows`.
    #[must_use]
    pub fn no_step_timeout(mut self) -> Self {
        *self.last_timeout_slot_mut("no_step_timeout") = None;
        self
    }

    /// Set the retry config on the most recently registered step.
    ///
    /// # Panics
    ///
    /// Panics if no step has been registered yet, or if the most
    /// recently registered step is a `ParallelSubWorkflows`.
    #[must_use]
    pub fn step_retry(mut self, config: RetryConfig) -> Self {
        *self.last_retry_slot_mut("step_retry") = Some(Arc::new(config));
        self
    }

    /// Disable retries on the most recently registered step
    /// (`max_retries = 0`).
    ///
    /// # Panics
    ///
    /// Panics if no step has been registered yet, or if the most
    /// recently registered step is a `ParallelSubWorkflows`.
    #[must_use]
    pub fn no_step_retry(mut self) -> Self {
        *self.last_retry_slot_mut("no_step_retry") = Some(Self::disabled_retry());
        self
    }

    /// Set the maximum execution time for the workflow.
    ///
    /// Replaces the five-minute default installed by
    /// [`WorkflowBuilder::new`].
    #[must_use]
    pub fn timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Disable the execution timeout (workflow runs until `StopEvent`).
    #[must_use]
    pub fn no_timeout(mut self) -> Self {
        self.timeout = None;
        self
    }

    /// Set a default retry configuration for every LLM call inside this
    /// workflow. Step / per-call overrides take precedence; pipeline /
    /// provider defaults take lower precedence.
    #[must_use]
    pub fn retry_config(mut self, config: RetryConfig) -> Self {
        self.retry_config = Some(Arc::new(config));
        self
    }

    /// Disable workflow-level retries (`max_retries = 0`). Step / per-call
    /// overrides still take precedence.
    #[must_use]
    pub fn no_retry(mut self) -> Self {
        self.retry_config = Some(Self::disabled_retry());
        self
    }

    /// Clear any workflow-level retry config, so no workflow-level default
    /// is applied.
    #[must_use]
    pub fn clear_retry_config(mut self) -> Self {
        self.retry_config = None;
        self
    }

    /// Register an inline handler for [`InputRequestEvent`]s.
    ///
    /// When set, the event loop calls this handler instead of
    /// auto-pausing when an input request arrives. The handler returns an
    /// [`InputResponseEvent`], which is injected back into the event queue.
    /// This lets the workflow continue without interruption.
    #[must_use]
    pub fn input_handler(mut self, handler: InputHandlerFn) -> Self {
        self.input_handler = Some(handler);
        self
    }

    /// Enable or disable automatic publishing of lifecycle events to the
    /// broadcast stream.
    ///
    /// When enabled, the event loop publishes `DynamicEvent`s with type
    /// `"blazen::lifecycle"` at key decision points: event routed, step
    /// started, step completed and step failed.
    ///
    /// It also publishes a typed
    /// [`ProgressEvent`](blazen_events::ProgressEvent) with
    /// [`ProgressKind::Workflow`](blazen_events::ProgressKind::Workflow)
    /// after each step completes. Consumers that subscribe via
    /// [`WorkflowHandler::stream_events`](crate::WorkflowHandler::stream_events)
    /// receive these alongside any events published by steps.
    ///
    /// Defaults to `true`. Call this method with `false` to opt out
    /// (e.g. for benchmarks or extremely event-noisy workflows).
    #[must_use]
    pub fn auto_publish_events(mut self, enabled: bool) -> Self {
        self.auto_publish_events = enabled;
        self
    }

    /// Configure the policy applied to live session references when the
    /// workflow is paused or snapshotted. Defaults to `PickleOrError`.
    #[must_use]
    pub fn session_pause_policy(mut self, policy: crate::session_ref::SessionPausePolicy) -> Self {
        self.session_pause_policy = policy;
        self
    }

    /// Enable collection of an append-only history of workflow events.
    ///
    /// When enabled, the event loop records a chronological log of
    /// everything that happens during the workflow run: events received,
    /// steps dispatched, steps completed/failed, pauses, and completion.
    /// The history can be retrieved via
    /// [`WorkflowHandler::collect_history`](crate::WorkflowHandler::collect_history)
    /// after the workflow completes.
    ///
    /// Requires the `telemetry` feature.
    #[cfg(feature = "telemetry")]
    #[must_use]
    pub fn with_history(mut self) -> Self {
        self.collect_history = true;
        self
    }

    /// Set the checkpoint store for durable persistence.
    ///
    /// When a checkpoint store is configured, the workflow can persist its
    /// state to durable storage for crash recovery or migration.
    ///
    /// Requires the `persist` feature.
    #[cfg(feature = "persist")]
    #[must_use]
    pub fn checkpoint_store(mut self, store: Arc<dyn blazen_persist::CheckpointStore>) -> Self {
        self.checkpoint_store = Some(store);
        self
    }

    /// Enable or disable automatic checkpointing after each step completes.
    ///
    /// When enabled (and a checkpoint store is configured), the workflow
    /// saves a checkpoint after each event is dispatched to step handlers.
    /// Checkpointing is best-effort: errors are logged but do not fail the
    /// workflow.
    ///
    /// Defaults to `false`.
    ///
    /// Requires the `persist` feature.
    #[cfg(feature = "persist")]
    #[must_use]
    pub fn checkpoint_after_step(mut self, enabled: bool) -> Self {
        self.checkpoint_after_step = enabled;
        self
    }

    /// Validate the configuration and produce a [`Workflow`].
    ///
    /// Steps are indexed by the event types they accept. Each step is
    /// cloned into the handler list of every event type reported by its
    /// `accepts()`, and each list keeps registration order. A step whose
    /// `accepts()` is empty is therefore never placed in the registry.
    ///
    /// # Errors
    ///
    /// Returns [`WorkflowError::ValidationFailed`] if no steps are registered.
    pub fn build(self) -> crate::error::Result<Workflow> {
        if self.steps.is_empty() {
            return Err(WorkflowError::ValidationFailed(
                "workflow must have at least one step".into(),
            ));
        }

        // Build the event-type -> handlers registry.
        let mut registry: HashMap<String, Vec<StepKind>> = HashMap::new();
        for step in self.steps {
            for &event_type in step.accepts() {
                registry
                    .entry(event_type.to_owned())
                    .or_default()
                    .push(step.clone());
            }
        }

        Ok(Workflow {
            name: self.name,
            step_registry: registry,
            timeout: self.timeout,
            retry_config: self.retry_config,
            input_handler: self.input_handler,
            auto_publish_events: self.auto_publish_events,
            session_pause_policy: self.session_pause_policy,
            #[cfg(feature = "persist")]
            checkpoint_store: self.checkpoint_store,
            #[cfg(feature = "persist")]
            checkpoint_after_step: self.checkpoint_after_step,
            #[cfg(feature = "telemetry")]
            collect_history: self.collect_history,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::step::{StepFn, StepOutput, StepRegistration};
    use blazen_events::{Event, StartEvent, StopEvent};

    /// Build a trivial regular step that accepts `StartEvent` and
    /// immediately emits a `StopEvent` with a `null` result.
    fn make_step(name: &str) -> StepRegistration {
        let handler: StepFn = Arc::new(|_event, _ctx| {
            Box::pin(async move {
                Ok(StepOutput::Single(Box::new(StopEvent {
                    result: serde_json::json!(null),
                })))
            })
        });
        StepRegistration::new(
            name.to_owned(),
            vec![StartEvent::event_type()],
            vec![StopEvent::event_type()],
            handler,
            0,
        )
    }

    /// Helper: borrow the `StepRegistration` underlying a builder step,
    /// asserting it is a `Regular` variant (the only one these tests
    /// construct).
    fn as_reg(kind: &crate::step::StepKind) -> &StepRegistration {
        match kind {
            crate::step::StepKind::Regular(reg) => reg,
            other => panic!("expected StepKind::Regular, got {other:?}"),
        }
    }

    #[test]
    fn new_applies_documented_defaults() {
        let builder = WorkflowBuilder::new("test");
        assert_eq!(builder.name, "test");
        assert!(builder.steps.is_empty());
        assert_eq!(builder.timeout, Some(Duration::from_secs(300)));
        assert!(builder.retry_config.is_none());
        assert!(builder.input_handler.is_none());
        assert!(builder.auto_publish_events);
    }

    #[test]
    fn timeout_and_no_timeout_update_workflow_timeout() {
        let builder = WorkflowBuilder::new("test").timeout(Duration::from_secs(7));
        assert_eq!(builder.timeout, Some(Duration::from_secs(7)));
        let builder = builder.no_timeout();
        assert!(builder.timeout.is_none());
    }

    #[test]
    fn no_retry_sets_zero_workflow_retries() {
        let builder = WorkflowBuilder::new("test").no_retry();
        assert_eq!(
            builder.retry_config.as_ref().map(|config| config.max_retries),
            Some(0)
        );
    }

    #[test]
    fn clear_retry_config_removes_workflow_retries() {
        let builder = WorkflowBuilder::new("test")
            .retry_config(RetryConfig::default())
            .clear_retry_config();
        assert!(builder.retry_config.is_none());
    }

    #[test]
    fn build_rejects_empty_workflow() {
        let result = WorkflowBuilder::new("empty").build();
        assert!(matches!(result, Err(WorkflowError::ValidationFailed(_))));
    }

    #[test]
    fn build_registers_step_under_each_accepted_event_type() {
        let Ok(workflow) = WorkflowBuilder::new("test").step(make_step("a")).build() else {
            panic!("a workflow with one step should build");
        };
        let handlers = workflow
            .step_registry
            .get(StartEvent::event_type())
            .expect("StartEvent handlers registered");
        assert_eq!(handlers.len(), 1);
        let _ = as_reg(&handlers[0]);
        // No step accepts StopEvent, so it must not appear as a routing key.
        assert!(!workflow.step_registry.contains_key(StopEvent::event_type()));
    }

    #[test]
    fn step_timeout_sets_timeout_on_last_step() {
        let builder = WorkflowBuilder::new("test")
            .step(make_step("a"))
            .step_timeout(Duration::from_millis(100));
        let last = as_reg(builder.steps.last().expect("step registered"));
        assert_eq!(last.timeout, Some(Duration::from_millis(100)));
    }

    #[test]
    fn step_timeout_only_affects_most_recent_step() {
        let builder = WorkflowBuilder::new("test")
            .step(make_step("a"))
            .step_timeout(Duration::from_millis(100))
            .step(make_step("b"))
            .step_timeout(Duration::from_millis(250));
        assert_eq!(builder.steps.len(), 2);
        assert_eq!(
            as_reg(&builder.steps[0]).timeout,
            Some(Duration::from_millis(100))
        );
        assert_eq!(
            as_reg(&builder.steps[1]).timeout,
            Some(Duration::from_millis(250))
        );
    }

    #[test]
    fn no_step_timeout_clears_timeout_on_last_step() {
        let builder = WorkflowBuilder::new("test")
            .step(make_step("a"))
            .step_timeout(Duration::from_secs(1))
            .no_step_timeout();
        let last = as_reg(builder.steps.last().expect("step registered"));
        assert!(last.timeout.is_none());
    }

    #[test]
    #[should_panic(expected = "step_timeout() called before any step was registered")]
    fn step_timeout_panics_without_step() {
        let _ = WorkflowBuilder::new("test").step_timeout(Duration::from_millis(100));
    }

    #[test]
    #[should_panic(expected = "no_step_timeout() called before any step was registered")]
    fn no_step_timeout_panics_without_step() {
        let _ = WorkflowBuilder::new("test").no_step_timeout();
    }

    #[test]
    fn step_retry_sets_retry_config_on_last_step() {
        let builder = WorkflowBuilder::new("test")
            .step(make_step("a"))
            .step_retry(RetryConfig {
                max_retries: 9,
                ..RetryConfig::default()
            });
        let last = as_reg(builder.steps.last().expect("step registered"));
        assert_eq!(last.retry_config.as_ref().unwrap().max_retries, 9);
    }

    #[test]
    fn step_retry_only_affects_most_recent_step() {
        let builder = WorkflowBuilder::new("test")
            .step(make_step("a"))
            .step_retry(RetryConfig {
                max_retries: 3,
                ..RetryConfig::default()
            })
            .step(make_step("b"))
            .step_retry(RetryConfig {
                max_retries: 5,
                ..RetryConfig::default()
            });
        assert_eq!(builder.steps.len(), 2);
        assert_eq!(
            as_reg(&builder.steps[0])
                .retry_config
                .as_ref()
                .unwrap()
                .max_retries,
            3
        );
        assert_eq!(
            as_reg(&builder.steps[1])
                .retry_config
                .as_ref()
                .unwrap()
                .max_retries,
            5
        );
    }

    #[test]
    fn no_step_retry_sets_zero_retries_on_last_step() {
        let builder = WorkflowBuilder::new("test")
            .step(make_step("a"))
            .no_step_retry();
        let last = as_reg(builder.steps.last().expect("step registered"));
        assert_eq!(last.retry_config.as_ref().unwrap().max_retries, 0);
    }

    #[test]
    #[should_panic(expected = "step_retry() called before any step was registered")]
    fn step_retry_panics_without_step() {
        let _ = WorkflowBuilder::new("test").step_retry(RetryConfig::default());
    }

    #[test]
    #[should_panic(expected = "no_step_retry() called before any step was registered")]
    fn no_step_retry_panics_without_step() {
        let _ = WorkflowBuilder::new("test").no_step_retry();
    }
}