pub struct StoreValidator { /* private fields */ }
A built-in interceptor hook that asserts store state after steps.
Constructed fluently: chain `expect`, `expect_array`, and `expect_map` to
declare the keys and shapes that must be present. Use `after_step` to scope
expectations to a specific step. When any expectation fails after a step,
the pipeline aborts with `OperationError::HookAbort`.
Implementations§
impl StoreValidator
pub fn new() -> Self
Constructs a validator with no expectations and no scope.
Examples found in repository?
13 fn main() -> Result<(), Box<dyn std::error::Error>> {
14 // ── Happy path: derived outputs resolved at compile time ──
15 println!("=== Valid pipeline ===");
16 {
17 let mut pipe = Pipeline::default();
18
19 pipe.var("number", 1)?;
20 pipe.var("string", "example")?;
21 pipe.array("array")?.push(1)?.push(2)?.push(3)?;
22
23 // Derived output name from a variable ref: "string" → "example"
24 pipe.step::<SetVar>(
25 "set_var_1",
26 params!(
27 "name" => Param::reference("string"),
28 "value" => Param::reference("number"),
29 ),
30 )?;
31
32 // Derived output name from a literal: "greeting"
33 pipe.step::<SetVar>(
34 "set_var_2",
35 params!(
36 "name" => "greeting",
37 "value" => "hello world",
38 ),
39 )?;
40
41 // Chained: use derived output "greeting" from set_var_2 as a variable reference
42 pipe.step::<SetVar>(
43 "set_var_3",
44 params!(
45 "name" => "farewell",
46 "value" => Param::reference("greeting"),
47 ),
48 )?;
49
50 // Template referencing a derived output
51 pipe.step::<SetVar>(
52 "set_var_4",
53 params!(
54 "name" => "message",
55 "value" => Param::template(vec![
56 Param::reference("greeting"),
57 Param::literal(" and goodbye"),
58 ]),
59 ),
60 )?;
61
62 pipe.returns(
63 "summary",
64 params!(
65 "original_number" => Param::reference("number"),
66 "original_string" => Param::reference("string"),
67 "the_array" => Param::reference("array"),
68 "greeting" => Param::reference("greeting"),
69 "derived_from_ref" => Param::reference("example"),
70 "farewell" => Param::reference("farewell"),
71 "message" => Param::reference("message"),
72 ),
73 )?;
74
75 // Logger: writes each event to stdout
76 pipe.hook(Logger::new().writer(std::io::stdout()));
77
78 // Profiler: tracks per-step wall-clock duration
79 let profiler = Profiler::new().writer(std::io::stdout());
80 let timings = profiler.timings();
81 pipe.hook(profiler);
82
83 // EventLog: collects structured event records for post-mortem inspection
84 let event_log = EventLog::new();
85 let log = event_log.log();
86 pipe.hook(event_log);
87
88 // Timeout: aborts if pipeline exceeds 5 seconds
89 pipe.hook(Timeout::new(Duration::from_secs(5)));
90
91 // StoreValidator: assert "greeting" exists as Text after set_var_2
92 pipe.hook(
93 StoreValidator::new()
94 .after_step("set_var_2")
95 .expect("greeting", Type::Text),
96 );
97
98 let complete = pipe.compile()?.run().wait()?;
99 complete.debug();
100
101 // Read profiler timings programmatically
102 println!("\nProgrammatic timings:");
103 for (name, dur) in timings.lock().unwrap().iter() {
104 println!(" {} took {:.3}ms", name, dur.as_secs_f64() * 1000.0);
105 }
106
107 // Read event log
108 println!("\nEvent log ({} events):", log.lock().unwrap().len());
109 for record in log.lock().unwrap().iter() {
110 println!(
111 " [{:.3}ms] {:?}{}",
112 record.elapsed.as_secs_f64() * 1000.0,
113 record.kind,
114 record
115 .step_name
116 .as_deref()
117 .map(|s| format!(" ({})", s))
118 .unwrap_or_default(),
119 );
120 }
121 }
122
123 // ── Error: step references a variable that doesn't exist ──
124 println!("\n=== Unresolved step reference ===");
125 {
126 let mut pipe = Pipeline::default();
127 pipe.var("number", 1)?;
128
129 pipe.step::<SetVar>(
130 "bad_step",
131 params!(
132 "name" => "output",
133 "value" => Param::reference("does_not_exist"),
134 ),
135 )?;
136
137 match pipe.compile() {
138 Err(e) => println!(" Caught: {}", e),
139 Ok(_) => println!(" ERROR: should have failed!"),
140 }
141 }
142
143 // ── Error: return references something that was never produced ──
144 println!("\n=== Unresolved return reference ===");
145 {
146 let mut pipe = Pipeline::default();
147 pipe.var("number", 1)?;
148
149 pipe.step::<SetVar>(
150 "ok_step",
151 params!(
152 "name" => "output",
153 "value" => Param::reference("number"),
154 ),
155 )?;
156
157 pipe.returns(
158 "result",
159 params!(
160 "good" => Param::reference("output"),
161 "bad" => Param::reference("never_created"),
162 ),
163 )?;
164
165 match pipe.compile() {
166 Err(e) => println!(" Caught: {}", e),
167 Ok(_) => println!(" ERROR: should have failed!"),
168 }
169 }
170
171 // ── Error: step references a later step's output (ordering) ──
172 println!("\n=== Forward reference (wrong order) ===");
173 {
174 let mut pipe = Pipeline::default();
175 pipe.var("base", "start")?;
176
177 // This step tries to reference "later_output" which doesn't exist yet
178 pipe.step::<SetVar>(
179 "step_1",
180 params!(
181 "name" => "early",
182 "value" => Param::reference("later_output"),
183 ),
184 )?;
185
186 pipe.step::<SetVar>(
187 "step_2",
188 params!(
189 "name" => "later_output",
190 "value" => Param::reference("base"),
191 ),
192 )?;
193
194 match pipe.compile() {
195 Err(e) => println!(" Caught: {}", e),
196 Ok(_) => println!(" ERROR: should have failed!"),
197 }
198 }
199
200 // ── Error: duplicate variable name ──
201 println!("\n=== Duplicate variable ===");
202 {
203 let mut pipe = Pipeline::default();
204 pipe.var("x", 1)?;
205 match pipe.var("x", 2) {
206 Err(e) => println!(" Caught: {}", e),
207 Ok(_) => println!(" ERROR: should have failed!"),
208 }
209 }
210
211 // ── Error: duplicate step name ──
212 println!("\n=== Duplicate step ===");
213 {
214 let mut pipe = Pipeline::default();
215 pipe.var("v", "val")?;
216
217 pipe.step::<SetVar>("same_name", params!("name" => "a", "value" => "b"))?;
218 match pipe.step::<SetVar>("same_name", params!("name" => "c", "value" => "d")) {
219 Err(e) => println!(" Caught: {}", e),
220 Ok(_) => println!(" ERROR: should have failed!"),
221 }
222 }
223
224 // ── StepFilter: deny a step from executing ──
225 println!("\n=== StepFilter deny ===");
226 {
227 let mut pipe = Pipeline::default();
228 pipe.var("x", 1)?;
229
230 pipe.step::<SetVar>("allowed_step", params!("name" => "a", "value" => "ok"))?;
231 pipe.step::<SetVar>(
232 "blocked_step",
233 params!("name" => "b", "value" => Param::reference("a")),
234 )?;
235
236 pipe.returns("result", params!("a" => Param::reference("a")))?;
237
238 pipe.hook(StepFilter::deny(["blocked_step"]));
239
240 match pipe.compile()?.run().wait() {
241 Err(e) => println!(" Caught: {}", e),
242 Ok(_) => println!(" ERROR: should have been denied!"),
243 }
244 }
245
246 Ok(())
247 }
pub fn name(self, name: impl Into<String>) -> Self
Overrides the hook name used in abort messages.
pub fn after_step(self, step_name: impl Into<String>) -> Self
Only validate after a specific step. Applies to all subsequent `expect` calls
until `after_step` is called again or `after_any_step` resets it.
Examples found in repository?
13 fn main() -> Result<(), Box<dyn std::error::Error>> {
14 // ── Happy path: derived outputs resolved at compile time ──
15 println!("=== Valid pipeline ===");
16 {
17 let mut pipe = Pipeline::default();
18
19 pipe.var("number", 1)?;
20 pipe.var("string", "example")?;
21 pipe.array("array")?.push(1)?.push(2)?.push(3)?;
22
23 // Derived output name from a variable ref: "string" → "example"
24 pipe.step::<SetVar>(
25 "set_var_1",
26 params!(
27 "name" => Param::reference("string"),
28 "value" => Param::reference("number"),
29 ),
30 )?;
31
32 // Derived output name from a literal: "greeting"
33 pipe.step::<SetVar>(
34 "set_var_2",
35 params!(
36 "name" => "greeting",
37 "value" => "hello world",
38 ),
39 )?;
40
41 // Chained: use derived output "greeting" from set_var_2 as a variable reference
42 pipe.step::<SetVar>(
43 "set_var_3",
44 params!(
45 "name" => "farewell",
46 "value" => Param::reference("greeting"),
47 ),
48 )?;
49
50 // Template referencing a derived output
51 pipe.step::<SetVar>(
52 "set_var_4",
53 params!(
54 "name" => "message",
55 "value" => Param::template(vec![
56 Param::reference("greeting"),
57 Param::literal(" and goodbye"),
58 ]),
59 ),
60 )?;
61
62 pipe.returns(
63 "summary",
64 params!(
65 "original_number" => Param::reference("number"),
66 "original_string" => Param::reference("string"),
67 "the_array" => Param::reference("array"),
68 "greeting" => Param::reference("greeting"),
69 "derived_from_ref" => Param::reference("example"),
70 "farewell" => Param::reference("farewell"),
71 "message" => Param::reference("message"),
72 ),
73 )?;
74
75 // Logger: writes each event to stdout
76 pipe.hook(Logger::new().writer(std::io::stdout()));
77
78 // Profiler: tracks per-step wall-clock duration
79 let profiler = Profiler::new().writer(std::io::stdout());
80 let timings = profiler.timings();
81 pipe.hook(profiler);
82
83 // EventLog: collects structured event records for post-mortem inspection
84 let event_log = EventLog::new();
85 let log = event_log.log();
86 pipe.hook(event_log);
87
88 // Timeout: aborts if pipeline exceeds 5 seconds
89 pipe.hook(Timeout::new(Duration::from_secs(5)));
90
91 // StoreValidator: assert "greeting" exists as Text after set_var_2
92 pipe.hook(
93 StoreValidator::new()
94 .after_step("set_var_2")
95 .expect("greeting", Type::Text),
96 );
97
98 let complete = pipe.compile()?.run().wait()?;
99 complete.debug();
100
101 // Read profiler timings programmatically
102 println!("\nProgrammatic timings:");
103 for (name, dur) in timings.lock().unwrap().iter() {
104 println!(" {} took {:.3}ms", name, dur.as_secs_f64() * 1000.0);
105 }
106
107 // Read event log
108 println!("\nEvent log ({} events):", log.lock().unwrap().len());
109 for record in log.lock().unwrap().iter() {
110 println!(
111 " [{:.3}ms] {:?}{}",
112 record.elapsed.as_secs_f64() * 1000.0,
113 record.kind,
114 record
115 .step_name
116 .as_deref()
117 .map(|s| format!(" ({})", s))
118 .unwrap_or_default(),
119 );
120 }
121 }
122
123 // ── Error: step references a variable that doesn't exist ──
124 println!("\n=== Unresolved step reference ===");
125 {
126 let mut pipe = Pipeline::default();
127 pipe.var("number", 1)?;
128
129 pipe.step::<SetVar>(
130 "bad_step",
131 params!(
132 "name" => "output",
133 "value" => Param::reference("does_not_exist"),
134 ),
135 )?;
136
137 match pipe.compile() {
138 Err(e) => println!(" Caught: {}", e),
139 Ok(_) => println!(" ERROR: should have failed!"),
140 }
141 }
142
143 // ── Error: return references something that was never produced ──
144 println!("\n=== Unresolved return reference ===");
145 {
146 let mut pipe = Pipeline::default();
147 pipe.var("number", 1)?;
148
149 pipe.step::<SetVar>(
150 "ok_step",
151 params!(
152 "name" => "output",
153 "value" => Param::reference("number"),
154 ),
155 )?;
156
157 pipe.returns(
158 "result",
159 params!(
160 "good" => Param::reference("output"),
161 "bad" => Param::reference("never_created"),
162 ),
163 )?;
164
165 match pipe.compile() {
166 Err(e) => println!(" Caught: {}", e),
167 Ok(_) => println!(" ERROR: should have failed!"),
168 }
169 }
170
171 // ── Error: step references a later step's output (ordering) ──
172 println!("\n=== Forward reference (wrong order) ===");
173 {
174 let mut pipe = Pipeline::default();
175 pipe.var("base", "start")?;
176
177 // This step tries to reference "later_output" which doesn't exist yet
178 pipe.step::<SetVar>(
179 "step_1",
180 params!(
181 "name" => "early",
182 "value" => Param::reference("later_output"),
183 ),
184 )?;
185
186 pipe.step::<SetVar>(
187 "step_2",
188 params!(
189 "name" => "later_output",
190 "value" => Param::reference("base"),
191 ),
192 )?;
193
194 match pipe.compile() {
195 Err(e) => println!(" Caught: {}", e),
196 Ok(_) => println!(" ERROR: should have failed!"),
197 }
198 }
199
200 // ── Error: duplicate variable name ──
201 println!("\n=== Duplicate variable ===");
202 {
203 let mut pipe = Pipeline::default();
204 pipe.var("x", 1)?;
205 match pipe.var("x", 2) {
206 Err(e) => println!(" Caught: {}", e),
207 Ok(_) => println!(" ERROR: should have failed!"),
208 }
209 }
210
211 // ── Error: duplicate step name ──
212 println!("\n=== Duplicate step ===");
213 {
214 let mut pipe = Pipeline::default();
215 pipe.var("v", "val")?;
216
217 pipe.step::<SetVar>("same_name", params!("name" => "a", "value" => "b"))?;
218 match pipe.step::<SetVar>("same_name", params!("name" => "c", "value" => "d")) {
219 Err(e) => println!(" Caught: {}", e),
220 Ok(_) => println!(" ERROR: should have failed!"),
221 }
222 }
223
224 // ── StepFilter: deny a step from executing ──
225 println!("\n=== StepFilter deny ===");
226 {
227 let mut pipe = Pipeline::default();
228 pipe.var("x", 1)?;
229
230 pipe.step::<SetVar>("allowed_step", params!("name" => "a", "value" => "ok"))?;
231 pipe.step::<SetVar>(
232 "blocked_step",
233 params!("name" => "b", "value" => Param::reference("a")),
234 )?;
235
236 pipe.returns("result", params!("a" => Param::reference("a")))?;
237
238 pipe.hook(StepFilter::deny(["blocked_step"]));
239
240 match pipe.compile()?.run().wait() {
241 Err(e) => println!(" Caught: {}", e),
242 Ok(_) => println!(" ERROR: should have been denied!"),
243 }
244 }
245
246 Ok(())
247 }
pub fn after_any_step(self) -> Self
Resets the scope so that subsequent `expect` calls validate after every step.
pub fn expect(self, key: impl Into<String>, ty: Type) -> Self
Expect a store key to exist with the given scalar type.
Examples found in repository?
13 fn main() -> Result<(), Box<dyn std::error::Error>> {
14 // ── Happy path: derived outputs resolved at compile time ──
15 println!("=== Valid pipeline ===");
16 {
17 let mut pipe = Pipeline::default();
18
19 pipe.var("number", 1)?;
20 pipe.var("string", "example")?;
21 pipe.array("array")?.push(1)?.push(2)?.push(3)?;
22
23 // Derived output name from a variable ref: "string" → "example"
24 pipe.step::<SetVar>(
25 "set_var_1",
26 params!(
27 "name" => Param::reference("string"),
28 "value" => Param::reference("number"),
29 ),
30 )?;
31
32 // Derived output name from a literal: "greeting"
33 pipe.step::<SetVar>(
34 "set_var_2",
35 params!(
36 "name" => "greeting",
37 "value" => "hello world",
38 ),
39 )?;
40
41 // Chained: use derived output "greeting" from set_var_2 as a variable reference
42 pipe.step::<SetVar>(
43 "set_var_3",
44 params!(
45 "name" => "farewell",
46 "value" => Param::reference("greeting"),
47 ),
48 )?;
49
50 // Template referencing a derived output
51 pipe.step::<SetVar>(
52 "set_var_4",
53 params!(
54 "name" => "message",
55 "value" => Param::template(vec![
56 Param::reference("greeting"),
57 Param::literal(" and goodbye"),
58 ]),
59 ),
60 )?;
61
62 pipe.returns(
63 "summary",
64 params!(
65 "original_number" => Param::reference("number"),
66 "original_string" => Param::reference("string"),
67 "the_array" => Param::reference("array"),
68 "greeting" => Param::reference("greeting"),
69 "derived_from_ref" => Param::reference("example"),
70 "farewell" => Param::reference("farewell"),
71 "message" => Param::reference("message"),
72 ),
73 )?;
74
75 // Logger: writes each event to stdout
76 pipe.hook(Logger::new().writer(std::io::stdout()));
77
78 // Profiler: tracks per-step wall-clock duration
79 let profiler = Profiler::new().writer(std::io::stdout());
80 let timings = profiler.timings();
81 pipe.hook(profiler);
82
83 // EventLog: collects structured event records for post-mortem inspection
84 let event_log = EventLog::new();
85 let log = event_log.log();
86 pipe.hook(event_log);
87
88 // Timeout: aborts if pipeline exceeds 5 seconds
89 pipe.hook(Timeout::new(Duration::from_secs(5)));
90
91 // StoreValidator: assert "greeting" exists as Text after set_var_2
92 pipe.hook(
93 StoreValidator::new()
94 .after_step("set_var_2")
95 .expect("greeting", Type::Text),
96 );
97
98 let complete = pipe.compile()?.run().wait()?;
99 complete.debug();
100
101 // Read profiler timings programmatically
102 println!("\nProgrammatic timings:");
103 for (name, dur) in timings.lock().unwrap().iter() {
104 println!(" {} took {:.3}ms", name, dur.as_secs_f64() * 1000.0);
105 }
106
107 // Read event log
108 println!("\nEvent log ({} events):", log.lock().unwrap().len());
109 for record in log.lock().unwrap().iter() {
110 println!(
111 " [{:.3}ms] {:?}{}",
112 record.elapsed.as_secs_f64() * 1000.0,
113 record.kind,
114 record
115 .step_name
116 .as_deref()
117 .map(|s| format!(" ({})", s))
118 .unwrap_or_default(),
119 );
120 }
121 }
122
123 // ── Error: step references a variable that doesn't exist ──
124 println!("\n=== Unresolved step reference ===");
125 {
126 let mut pipe = Pipeline::default();
127 pipe.var("number", 1)?;
128
129 pipe.step::<SetVar>(
130 "bad_step",
131 params!(
132 "name" => "output",
133 "value" => Param::reference("does_not_exist"),
134 ),
135 )?;
136
137 match pipe.compile() {
138 Err(e) => println!(" Caught: {}", e),
139 Ok(_) => println!(" ERROR: should have failed!"),
140 }
141 }
142
143 // ── Error: return references something that was never produced ──
144 println!("\n=== Unresolved return reference ===");
145 {
146 let mut pipe = Pipeline::default();
147 pipe.var("number", 1)?;
148
149 pipe.step::<SetVar>(
150 "ok_step",
151 params!(
152 "name" => "output",
153 "value" => Param::reference("number"),
154 ),
155 )?;
156
157 pipe.returns(
158 "result",
159 params!(
160 "good" => Param::reference("output"),
161 "bad" => Param::reference("never_created"),
162 ),
163 )?;
164
165 match pipe.compile() {
166 Err(e) => println!(" Caught: {}", e),
167 Ok(_) => println!(" ERROR: should have failed!"),
168 }
169 }
170
171 // ── Error: step references a later step's output (ordering) ──
172 println!("\n=== Forward reference (wrong order) ===");
173 {
174 let mut pipe = Pipeline::default();
175 pipe.var("base", "start")?;
176
177 // This step tries to reference "later_output" which doesn't exist yet
178 pipe.step::<SetVar>(
179 "step_1",
180 params!(
181 "name" => "early",
182 "value" => Param::reference("later_output"),
183 ),
184 )?;
185
186 pipe.step::<SetVar>(
187 "step_2",
188 params!(
189 "name" => "later_output",
190 "value" => Param::reference("base"),
191 ),
192 )?;
193
194 match pipe.compile() {
195 Err(e) => println!(" Caught: {}", e),
196 Ok(_) => println!(" ERROR: should have failed!"),
197 }
198 }
199
200 // ── Error: duplicate variable name ──
201 println!("\n=== Duplicate variable ===");
202 {
203 let mut pipe = Pipeline::default();
204 pipe.var("x", 1)?;
205 match pipe.var("x", 2) {
206 Err(e) => println!(" Caught: {}", e),
207 Ok(_) => println!(" ERROR: should have failed!"),
208 }
209 }
210
211 // ── Error: duplicate step name ──
212 println!("\n=== Duplicate step ===");
213 {
214 let mut pipe = Pipeline::default();
215 pipe.var("v", "val")?;
216
217 pipe.step::<SetVar>("same_name", params!("name" => "a", "value" => "b"))?;
218 match pipe.step::<SetVar>("same_name", params!("name" => "c", "value" => "d")) {
219 Err(e) => println!(" Caught: {}", e),
220 Ok(_) => println!(" ERROR: should have failed!"),
221 }
222 }
223
224 // ── StepFilter: deny a step from executing ──
225 println!("\n=== StepFilter deny ===");
226 {
227 let mut pipe = Pipeline::default();
228 pipe.var("x", 1)?;
229
230 pipe.step::<SetVar>("allowed_step", params!("name" => "a", "value" => "ok"))?;
231 pipe.step::<SetVar>(
232 "blocked_step",
233 params!("name" => "b", "value" => Param::reference("a")),
234 )?;
235
236 pipe.returns("result", params!("a" => Param::reference("a")))?;
237
238 pipe.hook(StepFilter::deny(["blocked_step"]));
239
240 match pipe.compile()?.run().wait() {
241 Err(e) => println!(" Caught: {}", e),
242 Ok(_) => println!(" ERROR: should have been denied!"),
243 }
244 }
245
246 Ok(())
247 }
pub fn expect_array(self, key: impl Into<String>) -> Self
Expect a store key to hold an array.
pub fn expect_map(self, key: impl Into<String>) -> Self
Expect a store key to hold a map.