1use crate::lagrange::ast::CompositionNode;
33
34pub fn canonicalise(node: &CompositionNode) -> CompositionNode {
37 let with_canonical_children = canonicalise_children(node);
43 canonicalise_node(with_canonical_children)
44}
45
46fn canonicalise_children(node: &CompositionNode) -> CompositionNode {
50 match node {
51 CompositionNode::Stage { .. }
52 | CompositionNode::RemoteStage { .. }
53 | CompositionNode::Const { .. } => node.clone(),
54
55 CompositionNode::Sequential { stages } => CompositionNode::Sequential {
56 stages: stages.iter().map(canonicalise).collect(),
57 },
58
59 CompositionNode::Parallel { branches } => {
60 let branches = branches
62 .iter()
63 .map(|(k, v)| (k.clone(), canonicalise(v)))
64 .collect();
65 CompositionNode::Parallel { branches }
66 }
67
68 CompositionNode::Branch {
69 predicate,
70 if_true,
71 if_false,
72 } => CompositionNode::Branch {
73 predicate: Box::new(canonicalise(predicate)),
74 if_true: Box::new(canonicalise(if_true)),
75 if_false: Box::new(canonicalise(if_false)),
76 },
77
78 CompositionNode::Fanout { source, targets } => CompositionNode::Fanout {
79 source: Box::new(canonicalise(source)),
80 targets: targets.iter().map(canonicalise).collect(),
81 },
82
83 CompositionNode::Merge { sources, target } => CompositionNode::Merge {
84 sources: sources.iter().map(canonicalise).collect(),
85 target: Box::new(canonicalise(target)),
86 },
87
88 CompositionNode::Retry {
89 stage,
90 max_attempts,
91 delay_ms,
92 } => CompositionNode::Retry {
93 stage: Box::new(canonicalise(stage)),
94 max_attempts: *max_attempts,
95 delay_ms: *delay_ms,
96 },
97
98 CompositionNode::Let { bindings, body } => {
99 let bindings = bindings
100 .iter()
101 .map(|(k, v)| (k.clone(), canonicalise(v)))
102 .collect();
103 CompositionNode::Let {
104 bindings,
105 body: Box::new(canonicalise(body)),
106 }
107 }
108 }
109}
110
111fn canonicalise_node(node: CompositionNode) -> CompositionNode {
114 match node {
115 CompositionNode::Sequential { stages } => {
117 let flattened: Vec<CompositionNode> = stages
118 .into_iter()
119 .flat_map(|s| match s {
120 CompositionNode::Sequential { stages: inner } => inner,
121 other => vec![other],
122 })
123 .collect();
124
125 if flattened.len() == 1 {
126 flattened.into_iter().next().unwrap()
127 } else {
128 CompositionNode::Sequential { stages: flattened }
129 }
130 }
131
132 CompositionNode::Retry {
135 stage,
136 max_attempts,
137 delay_ms,
138 } => {
139 if max_attempts <= 1 {
140 return *stage;
141 }
142 if let CompositionNode::Retry {
143 stage: inner_stage,
144 max_attempts: inner_attempts,
145 delay_ms: inner_delay,
146 } = *stage
147 {
148 if inner_delay == delay_ms {
149 let combined = max_attempts.saturating_mul(inner_attempts);
150 return canonicalise_node(CompositionNode::Retry {
156 stage: inner_stage,
157 max_attempts: combined,
158 delay_ms,
159 });
160 }
161 return CompositionNode::Retry {
165 stage: Box::new(CompositionNode::Retry {
166 stage: inner_stage,
167 max_attempts: inner_attempts,
168 delay_ms: inner_delay,
169 }),
170 max_attempts,
171 delay_ms,
172 };
173 }
174 CompositionNode::Retry {
175 stage,
176 max_attempts,
177 delay_ms,
178 }
179 }
180
181 CompositionNode::Let { bindings, body } => {
183 if bindings.is_empty() {
184 return *body;
185 }
186 CompositionNode::Let { bindings, body }
187 }
188
189 other => other,
191 }
192}
193
#[cfg(test)]
mod tests {
    use super::*;
    use noether_core::stage::StageId;
    use serde_json::json;
    use std::collections::BTreeMap;

    /// Builds a leaf `Stage` node with the given id, signature pinning and no config.
    fn stage(id: &str) -> CompositionNode {
        let id = StageId(id.into());
        CompositionNode::Stage {
            id,
            pinning: crate::lagrange::Pinning::Signature,
            config: None,
        }
    }

    /// Wraps `stages` in a `Sequential` node.
    fn seq(stages: Vec<CompositionNode>) -> CompositionNode {
        CompositionNode::Sequential { stages }
    }

    // Leaf nodes come back exactly as they went in.
    #[test]
    fn atomic_nodes_unchanged() {
        let leaf = stage("a");
        assert_eq!(canonicalise(&leaf), leaf);

        let constant = CompositionNode::Const { value: json!(42) };
        assert_eq!(canonicalise(&constant), constant);
    }

    // A sequence of one stage is replaced by that stage.
    #[test]
    fn sequential_singleton_collapses() {
        let input = seq(vec![stage("a")]);
        assert_eq!(canonicalise(&input), stage("a"));
    }

    // A nested sequence in first position is spliced into its parent.
    #[test]
    fn sequential_nested_flattens_left() {
        let input = seq(vec![seq(vec![stage("a"), stage("b")]), stage("c")]);
        let expected = seq(vec![stage("a"), stage("b"), stage("c")]);
        assert_eq!(canonicalise(&input), expected);
    }

    // A nested sequence in last position is spliced into its parent.
    #[test]
    fn sequential_nested_flattens_right() {
        let input = seq(vec![stage("a"), seq(vec![stage("b"), stage("c")])]);
        let expected = seq(vec![stage("a"), stage("b"), stage("c")]);
        assert_eq!(canonicalise(&input), expected);
    }

    // Several levels of nesting end up as one flat sequence.
    #[test]
    fn sequential_deeply_nested_flattens() {
        let left = seq(vec![stage("a"), seq(vec![stage("b"), stage("c")])]);
        let right = seq(vec![stage("d")]);
        let input = seq(vec![left, right]);

        let expected = seq(vec![stage("a"), stage("b"), stage("c"), stage("d")]);
        assert_eq!(canonicalise(&input), expected);
    }

    // A retry limited to one attempt is replaced by the wrapped stage.
    #[test]
    fn retry_single_attempt_collapses() {
        let input = CompositionNode::Retry {
            stage: Box::new(stage("a")),
            max_attempts: 1,
            delay_ms: Some(500),
        };
        assert_eq!(canonicalise(&input), stage("a"));
    }

    // Zero attempts is treated the same way as one attempt.
    #[test]
    fn retry_zero_attempts_also_collapses() {
        let input = CompositionNode::Retry {
            stage: Box::new(stage("a")),
            max_attempts: 0,
            delay_ms: None,
        };
        assert_eq!(canonicalise(&input), stage("a"));
    }

    // Nested retries sharing a delay merge, multiplying their attempt counts.
    #[test]
    fn retry_nested_same_delay_multiplies() {
        let inner = CompositionNode::Retry {
            stage: Box::new(stage("a")),
            max_attempts: 3,
            delay_ms: Some(100),
        };
        let input = CompositionNode::Retry {
            stage: Box::new(inner),
            max_attempts: 4,
            delay_ms: Some(100),
        };

        let expected = CompositionNode::Retry {
            stage: Box::new(stage("a")),
            max_attempts: 12,
            delay_ms: Some(100),
        };
        assert_eq!(canonicalise(&input), expected);
    }

    // Nested retries with different delays keep both layers.
    #[test]
    fn retry_nested_different_delay_preserved() {
        let inner = CompositionNode::Retry {
            stage: Box::new(stage("a")),
            max_attempts: 3,
            delay_ms: Some(100),
        };
        let input = CompositionNode::Retry {
            stage: Box::new(inner),
            max_attempts: 4,
            delay_ms: Some(200),
        };

        // The outer layer must survive with its own parameters...
        let outer_stage = match canonicalise(&input) {
            CompositionNode::Retry {
                stage,
                max_attempts: 4,
                delay_ms: Some(200),
            } => stage,
            other => panic!("expected outer Retry, got {:?}", other),
        };

        // ...and so must the inner one.
        match *outer_stage {
            CompositionNode::Retry {
                max_attempts: 3,
                delay_ms: Some(100),
                ..
            } => {}
            other => panic!("expected inner Retry, got {:?}", other),
        }
    }

    // A `Let` without bindings is replaced by its body.
    #[test]
    fn empty_let_collapses_to_body() {
        let input = CompositionNode::Let {
            bindings: BTreeMap::new(),
            body: Box::new(stage("body")),
        };
        assert_eq!(canonicalise(&input), stage("body"));
    }

    // A `Let` with at least one binding is kept as-is.
    #[test]
    fn non_empty_let_preserved() {
        let mut bindings = BTreeMap::new();
        bindings.insert("x".into(), stage("compute_x"));

        let input = CompositionNode::Let {
            bindings,
            body: Box::new(stage("body")),
        };
        assert_eq!(canonicalise(&input), input);
    }

    // Canonicalising an already canonical tree changes nothing.
    #[test]
    fn canonicalise_is_idempotent() {
        let input = seq(vec![
            seq(vec![stage("a"), stage("b")]),
            CompositionNode::Retry {
                stage: Box::new(stage("c")),
                max_attempts: 1,
                delay_ms: None,
            },
            CompositionNode::Let {
                bindings: BTreeMap::new(),
                body: Box::new(stage("d")),
            },
        ]);

        let first_pass = canonicalise(&input);
        let second_pass = canonicalise(&first_pass);
        assert_eq!(first_pass, second_pass);
    }

    // Insertion order into the branch map does not affect the canonical form.
    #[test]
    fn parallel_branches_preserved_under_btreemap() {
        let mut forward = BTreeMap::new();
        forward.insert("alpha".into(), stage("x"));
        forward.insert("beta".into(), stage("y"));

        let mut reversed = BTreeMap::new();
        reversed.insert("beta".into(), stage("y"));
        reversed.insert("alpha".into(), stage("x"));

        let first = CompositionNode::Parallel { branches: forward };
        let second = CompositionNode::Parallel { branches: reversed };

        assert_eq!(canonicalise(&first), canonicalise(&second));
    }

    // Fanout targets are positional: swapping them yields a different graph.
    #[test]
    fn fanout_target_order_preserved() {
        let fanout = |targets: Vec<CompositionNode>| CompositionNode::Fanout {
            source: Box::new(stage("src")),
            targets,
        };

        let a_then_b = fanout(vec![stage("a"), stage("b")]);
        let b_then_a = fanout(vec![stage("b"), stage("a")]);

        assert_ne!(canonicalise(&a_then_b), canonicalise(&b_then_a));
    }

    // A rewrite inside a child is visible in the parent's canonical form.
    #[test]
    fn inner_canonicalisation_bubbles_up() {
        let single_attempt = CompositionNode::Retry {
            stage: Box::new(stage("b")),
            max_attempts: 1,
            delay_ms: Some(50),
        };
        let input = seq(vec![stage("a"), single_attempt, stage("c")]);

        let expected = seq(vec![stage("a"), stage("b"), stage("c")]);
        assert_eq!(canonicalise(&input), expected);
    }
}