Skip to main content

pipeline/
pipeline.rs

1//! End-to-end pipeline: variables, steps, returns, hooks, and validation.
2//!
3//! Shows the typestate flow — build with `Pipeline::default`, register
4//! variables and steps, then `compile()` to check every reference before
5//! `run().wait()` executes. The happy path attaches the standard hooks
6//! (Logger, Profiler, EventLog, Timeout, StoreValidator) and reads the
7//! profiler/event-log handles after completion. The error cases show `compile()`
8//! rejecting unresolved/forward refs, `var`/`step` rejecting duplicates, and a `StepFilter` deny.
9
10use panopticon_core::{extend::Type, params, prelude::*};
11use std::time::Duration;
12
13fn main() -> Result<(), Box<dyn std::error::Error>> {
14    // ── Happy path: derived outputs resolved at compile time ──
15    println!("=== Valid pipeline ===");
16    {
17        let mut pipe = Pipeline::default();
18
19        pipe.var("number", 1)?;
20        pipe.var("string", "example")?;
21        pipe.array("array")?.push(1)?.push(2)?.push(3)?;
22
23        // Derived output name from a variable ref: "string" → "example"
24        pipe.step::<SetVar>(
25            "set_var_1",
26            params!(
27                "name" => Param::reference("string"),
28                "value" => Param::reference("number"),
29            ),
30        )?;
31
32        // Derived output name from a literal: "greeting"
33        pipe.step::<SetVar>(
34            "set_var_2",
35            params!(
36                "name" => "greeting",
37                "value" => "hello world",
38            ),
39        )?;
40
41        // Chained: use derived output "greeting" from set_var_2 as a variable reference
42        pipe.step::<SetVar>(
43            "set_var_3",
44            params!(
45                "name" => "farewell",
46                "value" => Param::reference("greeting"),
47            ),
48        )?;
49
50        // Template referencing a derived output
51        pipe.step::<SetVar>(
52            "set_var_4",
53            params!(
54                "name" => "message",
55                "value" => Param::template(vec![
56                    Param::reference("greeting"),
57                    Param::literal(" and goodbye"),
58                ]),
59            ),
60        )?;
61
62        pipe.returns(
63            "summary",
64            params!(
65                "original_number" => Param::reference("number"),
66                "original_string" => Param::reference("string"),
67                "the_array" => Param::reference("array"),
68                "greeting" => Param::reference("greeting"),
69                "derived_from_ref" => Param::reference("example"),
70                "farewell" => Param::reference("farewell"),
71                "message" => Param::reference("message"),
72            ),
73        )?;
74
75        // Logger: writes each event to stdout
76        pipe.hook(Logger::new().writer(std::io::stdout()));
77
78        // Profiler: tracks per-step wall-clock duration
79        let profiler = Profiler::new().writer(std::io::stdout());
80        let timings = profiler.timings();
81        pipe.hook(profiler);
82
83        // EventLog: collects structured event records for post-mortem inspection
84        let event_log = EventLog::new();
85        let log = event_log.log();
86        pipe.hook(event_log);
87
88        // Timeout: aborts if pipeline exceeds 5 seconds
89        pipe.hook(Timeout::new(Duration::from_secs(5)));
90
91        // StoreValidator: assert "greeting" exists as Text after set_var_2
92        pipe.hook(
93            StoreValidator::new()
94                .after_step("set_var_2")
95                .expect("greeting", Type::Text),
96        );
97
98        let complete = pipe.compile()?.run().wait()?;
99        complete.debug();
100
101        // Read profiler timings programmatically
102        println!("\nProgrammatic timings:");
103        for (name, dur) in timings.lock().unwrap().iter() {
104            println!("  {} took {:.3}ms", name, dur.as_secs_f64() * 1000.0);
105        }
106
107        // Read event log
108        println!("\nEvent log ({} events):", log.lock().unwrap().len());
109        for record in log.lock().unwrap().iter() {
110            println!(
111                "  [{:.3}ms] {:?}{}",
112                record.elapsed.as_secs_f64() * 1000.0,
113                record.kind,
114                record
115                    .step_name
116                    .as_deref()
117                    .map(|s| format!(" ({})", s))
118                    .unwrap_or_default(),
119            );
120        }
121    }
122
123    // ── Error: step references a variable that doesn't exist ──
124    println!("\n=== Unresolved step reference ===");
125    {
126        let mut pipe = Pipeline::default();
127        pipe.var("number", 1)?;
128
129        pipe.step::<SetVar>(
130            "bad_step",
131            params!(
132                "name" => "output",
133                "value" => Param::reference("does_not_exist"),
134            ),
135        )?;
136
137        match pipe.compile() {
138            Err(e) => println!("  Caught: {}", e),
139            Ok(_) => println!("  ERROR: should have failed!"),
140        }
141    }
142
143    // ── Error: return references something that was never produced ──
144    println!("\n=== Unresolved return reference ===");
145    {
146        let mut pipe = Pipeline::default();
147        pipe.var("number", 1)?;
148
149        pipe.step::<SetVar>(
150            "ok_step",
151            params!(
152                "name" => "output",
153                "value" => Param::reference("number"),
154            ),
155        )?;
156
157        pipe.returns(
158            "result",
159            params!(
160                "good" => Param::reference("output"),
161                "bad" => Param::reference("never_created"),
162            ),
163        )?;
164
165        match pipe.compile() {
166            Err(e) => println!("  Caught: {}", e),
167            Ok(_) => println!("  ERROR: should have failed!"),
168        }
169    }
170
171    // ── Error: step references a later step's output (ordering) ──
172    println!("\n=== Forward reference (wrong order) ===");
173    {
174        let mut pipe = Pipeline::default();
175        pipe.var("base", "start")?;
176
177        // This step tries to reference "later_output" which doesn't exist yet
178        pipe.step::<SetVar>(
179            "step_1",
180            params!(
181                "name" => "early",
182                "value" => Param::reference("later_output"),
183            ),
184        )?;
185
186        pipe.step::<SetVar>(
187            "step_2",
188            params!(
189                "name" => "later_output",
190                "value" => Param::reference("base"),
191            ),
192        )?;
193
194        match pipe.compile() {
195            Err(e) => println!("  Caught: {}", e),
196            Ok(_) => println!("  ERROR: should have failed!"),
197        }
198    }
199
200    // ── Error: duplicate variable name ──
201    println!("\n=== Duplicate variable ===");
202    {
203        let mut pipe = Pipeline::default();
204        pipe.var("x", 1)?;
205        match pipe.var("x", 2) {
206            Err(e) => println!("  Caught: {}", e),
207            Ok(_) => println!("  ERROR: should have failed!"),
208        }
209    }
210
211    // ── Error: duplicate step name ──
212    println!("\n=== Duplicate step ===");
213    {
214        let mut pipe = Pipeline::default();
215        pipe.var("v", "val")?;
216
217        pipe.step::<SetVar>("same_name", params!("name" => "a", "value" => "b"))?;
218        match pipe.step::<SetVar>("same_name", params!("name" => "c", "value" => "d")) {
219            Err(e) => println!("  Caught: {}", e),
220            Ok(_) => println!("  ERROR: should have failed!"),
221        }
222    }
223
224    // ── StepFilter: deny a step from executing ──
225    println!("\n=== StepFilter deny ===");
226    {
227        let mut pipe = Pipeline::default();
228        pipe.var("x", 1)?;
229
230        pipe.step::<SetVar>("allowed_step", params!("name" => "a", "value" => "ok"))?;
231        pipe.step::<SetVar>(
232            "blocked_step",
233            params!("name" => "b", "value" => Param::reference("a")),
234        )?;
235
236        pipe.returns("result", params!("a" => Param::reference("a")))?;
237
238        pipe.hook(StepFilter::deny(["blocked_step"]));
239
240        match pipe.compile()?.run().wait() {
241            Err(e) => println!("  Caught: {}", e),
242            Ok(_) => println!("  ERROR: should have been denied!"),
243        }
244    }
245
246    Ok(())
247}