1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
//! # Workflow Validation
//!
//! Run with: `cargo run --example workflow_validation`
//!
//! Demonstrates [`Workflow::validate`] and [`Workflow::validate_initial_state`] — the two
//! static checks that catch misconfigured workflows before any async execution happens.
//!
//! Three scenarios are shown:
//!
//! 1. **Happy path** — a well-formed workflow validates and runs cleanly.
//! 2. **Misconfigured split** — a `register_split` whose `join_state` is neither registered
//! nor an exit state; `validate()` catches the dangling transition target.
//! 3. **Bad initial state** — `validate_initial_state` rejects an unregistered start state.
//!
//! `validate()` is also called implicitly by `orchestrate` (cached via `OnceLock`), but
//! calling it explicitly lets you surface errors earlier — at build time or on startup —
//! rather than on the first run.
use cano::prelude::*;
// ---------------------------------------------------------------------------
// State enum
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step {
Prepare,
Process,
Aggregate,
Done,
// A state that we intentionally leave unregistered to provoke validation failures.
Orphan,
}
// ---------------------------------------------------------------------------
// Tasks
// ---------------------------------------------------------------------------
struct PrepareTask;
#[task(state = Step)]
impl PrepareTask {
async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
println!(" prepare: loading work items");
Ok(TaskResult::Single(Step::Process))
}
}
struct WorkerTask {
id: usize,
}
#[task(state = Step)]
impl WorkerTask {
async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
println!(" worker {}: done", self.id);
Ok(TaskResult::Single(Step::Aggregate))
}
}
struct AggregateTask;
#[task(state = Step)]
impl AggregateTask {
async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
println!(" aggregate: collecting results");
Ok(TaskResult::Single(Step::Done))
}
}
// ---------------------------------------------------------------------------
// Entry point
// ---------------------------------------------------------------------------
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("=== Workflow Validation Demo ===\n");
// -----------------------------------------------------------------------
// Scenario 1: Happy path — validate() passes, orchestrate succeeds.
// -----------------------------------------------------------------------
println!("-- Scenario 1: well-formed workflow --");
{
let workers = vec![WorkerTask { id: 1 }, WorkerTask { id: 2 }];
let join_config = JoinConfig::new(JoinStrategy::All, Step::Aggregate);
let workflow = Workflow::bare()
.register(Step::Prepare, PrepareTask)
.register_split(Step::Process, workers, join_config)
.register(Step::Aggregate, AggregateTask)
.add_exit_state(Step::Done);
// Explicit pre-run validation — useful in service startup code.
match workflow.validate() {
Ok(()) => println!(" validate() -> Ok"),
Err(e) => println!(" validate() -> Err: {e}"),
}
// validate_initial_state lets you check that a specific entry point is sane.
match workflow.validate_initial_state(&Step::Prepare) {
Ok(()) => println!(" validate_initial_state(Prepare) -> Ok"),
Err(e) => println!(" validate_initial_state(Prepare) -> Err: {e}"),
}
let result = workflow.orchestrate(Step::Prepare).await?;
println!(" orchestrate -> {result:?}\n");
}
// -----------------------------------------------------------------------
// Scenario 2: Dangling split join_state — validate() rejects it.
//
// The split at `Process` targets `Step::Orphan` as its join_state, but
// `Orphan` is neither registered as a task handler nor declared as an exit
// state. Orchestration would always fail at runtime after the split
// completed — validate() surfaces this error immediately.
// -----------------------------------------------------------------------
println!("-- Scenario 2: split join_state is neither registered nor an exit state --");
{
let workers = vec![WorkerTask { id: 1 }, WorkerTask { id: 2 }];
// join_state points to an unregistered, non-exit state: Orphan.
let bad_join = JoinConfig::new(JoinStrategy::All, Step::Orphan);
let workflow = Workflow::bare()
.register(Step::Prepare, PrepareTask)
.register_split(Step::Process, workers, bad_join)
// Orphan is intentionally not registered and not added as an exit state.
.add_exit_state(Step::Done);
match workflow.validate() {
Ok(()) => println!(" validate() -> Ok (unexpected!)"),
Err(e) => println!(" validate() -> Err (expected): {e}"),
}
// Also no exit states at all — another validate() failure path.
let no_exit = Workflow::bare().register(Step::Prepare, PrepareTask);
match no_exit.validate() {
Ok(()) => println!(" no-exit-state validate() -> Ok (unexpected!)"),
Err(e) => println!(" no-exit-state validate() -> Err (expected): {e}"),
}
println!();
}
// -----------------------------------------------------------------------
// Scenario 3: validate_initial_state rejects an unregistered start state.
//
// The workflow is otherwise valid, but we pass an unregistered state as
// the initial entry point. validate_initial_state() catches this before
// any tasks run; orchestrate() would surface the same error on first call.
// -----------------------------------------------------------------------
println!("-- Scenario 3: unregistered initial state --");
{
let workflow = Workflow::bare()
.register(Step::Prepare, PrepareTask)
.register(Step::Aggregate, AggregateTask)
.add_exit_state(Step::Done);
// `Process` is not registered in this workflow.
match workflow.validate_initial_state(&Step::Process) {
Ok(()) => println!(" validate_initial_state(Process) -> Ok (unexpected!)"),
Err(e) => println!(" validate_initial_state(Process) -> Err (expected): {e}"),
}
// Exit states are valid initial states (they're skipped immediately).
match workflow.validate_initial_state(&Step::Done) {
Ok(()) => println!(" validate_initial_state(Done) -> Ok (exit states are valid)"),
Err(e) => println!(" validate_initial_state(Done) -> Err (unexpected!): {e}"),
}
}
println!("\n=== Done ===");
Ok(())
}