use crate::lagrange::ast::CompositionNode;
pub fn canonicalise(node: &CompositionNode) -> CompositionNode {
let with_canonical_children = canonicalise_children(node);
canonicalise_node(with_canonical_children)
}
fn canonicalise_children(node: &CompositionNode) -> CompositionNode {
match node {
CompositionNode::Stage { .. }
| CompositionNode::RemoteStage { .. }
| CompositionNode::Const { .. } => node.clone(),
CompositionNode::Sequential { stages } => CompositionNode::Sequential {
stages: stages.iter().map(canonicalise).collect(),
},
CompositionNode::Parallel { branches } => {
let branches = branches
.iter()
.map(|(k, v)| (k.clone(), canonicalise(v)))
.collect();
CompositionNode::Parallel { branches }
}
CompositionNode::Branch {
predicate,
if_true,
if_false,
} => CompositionNode::Branch {
predicate: Box::new(canonicalise(predicate)),
if_true: Box::new(canonicalise(if_true)),
if_false: Box::new(canonicalise(if_false)),
},
CompositionNode::Fanout { source, targets } => CompositionNode::Fanout {
source: Box::new(canonicalise(source)),
targets: targets.iter().map(canonicalise).collect(),
},
CompositionNode::Merge { sources, target } => CompositionNode::Merge {
sources: sources.iter().map(canonicalise).collect(),
target: Box::new(canonicalise(target)),
},
CompositionNode::Retry {
stage,
max_attempts,
delay_ms,
} => CompositionNode::Retry {
stage: Box::new(canonicalise(stage)),
max_attempts: *max_attempts,
delay_ms: *delay_ms,
},
CompositionNode::Let { bindings, body } => {
let bindings = bindings
.iter()
.map(|(k, v)| (k.clone(), canonicalise(v)))
.collect();
CompositionNode::Let {
bindings,
body: Box::new(canonicalise(body)),
}
}
}
}
fn canonicalise_node(node: CompositionNode) -> CompositionNode {
match node {
CompositionNode::Sequential { stages } => {
let flattened: Vec<CompositionNode> = stages
.into_iter()
.flat_map(|s| match s {
CompositionNode::Sequential { stages: inner } => inner,
other => vec![other],
})
.collect();
if flattened.len() == 1 {
flattened.into_iter().next().unwrap()
} else {
CompositionNode::Sequential { stages: flattened }
}
}
CompositionNode::Retry {
stage,
max_attempts,
delay_ms,
} => {
if max_attempts <= 1 {
return *stage;
}
if let CompositionNode::Retry {
stage: inner_stage,
max_attempts: inner_attempts,
delay_ms: inner_delay,
} = *stage
{
if inner_delay == delay_ms {
let combined = max_attempts.saturating_mul(inner_attempts);
return canonicalise_node(CompositionNode::Retry {
stage: inner_stage,
max_attempts: combined,
delay_ms,
});
}
return CompositionNode::Retry {
stage: Box::new(CompositionNode::Retry {
stage: inner_stage,
max_attempts: inner_attempts,
delay_ms: inner_delay,
}),
max_attempts,
delay_ms,
};
}
CompositionNode::Retry {
stage,
max_attempts,
delay_ms,
}
}
CompositionNode::Let { bindings, body } => {
if bindings.is_empty() {
return *body;
}
CompositionNode::Let { bindings, body }
}
other => other,
}
}
#[cfg(test)]
mod tests {
use super::*;
use noether_core::stage::StageId;
use serde_json::json;
use std::collections::BTreeMap;
fn stage(id: &str) -> CompositionNode {
CompositionNode::Stage {
id: StageId(id.into()),
pinning: crate::lagrange::Pinning::Signature,
config: None,
}
}
#[test]
fn atomic_nodes_unchanged() {
assert_eq!(canonicalise(&stage("a")), stage("a"));
let c = CompositionNode::Const { value: json!(42) };
assert_eq!(canonicalise(&c), c);
}
#[test]
fn sequential_singleton_collapses() {
let g = CompositionNode::Sequential {
stages: vec![stage("a")],
};
assert_eq!(canonicalise(&g), stage("a"));
}
#[test]
fn sequential_nested_flattens_left() {
let g = CompositionNode::Sequential {
stages: vec![
CompositionNode::Sequential {
stages: vec![stage("a"), stage("b")],
},
stage("c"),
],
};
let expected = CompositionNode::Sequential {
stages: vec![stage("a"), stage("b"), stage("c")],
};
assert_eq!(canonicalise(&g), expected);
}
#[test]
fn sequential_nested_flattens_right() {
let g = CompositionNode::Sequential {
stages: vec![
stage("a"),
CompositionNode::Sequential {
stages: vec![stage("b"), stage("c")],
},
],
};
let expected = CompositionNode::Sequential {
stages: vec![stage("a"), stage("b"), stage("c")],
};
assert_eq!(canonicalise(&g), expected);
}
#[test]
fn sequential_deeply_nested_flattens() {
let g = CompositionNode::Sequential {
stages: vec![
CompositionNode::Sequential {
stages: vec![
stage("a"),
CompositionNode::Sequential {
stages: vec![stage("b"), stage("c")],
},
],
},
CompositionNode::Sequential {
stages: vec![stage("d")],
},
],
};
let expected = CompositionNode::Sequential {
stages: vec![stage("a"), stage("b"), stage("c"), stage("d")],
};
assert_eq!(canonicalise(&g), expected);
}
#[test]
fn retry_single_attempt_collapses() {
let g = CompositionNode::Retry {
stage: Box::new(stage("a")),
max_attempts: 1,
delay_ms: Some(500),
};
assert_eq!(canonicalise(&g), stage("a"));
}
#[test]
fn retry_zero_attempts_also_collapses() {
let g = CompositionNode::Retry {
stage: Box::new(stage("a")),
max_attempts: 0,
delay_ms: None,
};
assert_eq!(canonicalise(&g), stage("a"));
}
#[test]
fn retry_nested_same_delay_multiplies() {
let g = CompositionNode::Retry {
stage: Box::new(CompositionNode::Retry {
stage: Box::new(stage("a")),
max_attempts: 3,
delay_ms: Some(100),
}),
max_attempts: 4,
delay_ms: Some(100),
};
let expected = CompositionNode::Retry {
stage: Box::new(stage("a")),
max_attempts: 12,
delay_ms: Some(100),
};
assert_eq!(canonicalise(&g), expected);
}
#[test]
fn retry_nested_different_delay_preserved() {
let g = CompositionNode::Retry {
stage: Box::new(CompositionNode::Retry {
stage: Box::new(stage("a")),
max_attempts: 3,
delay_ms: Some(100),
}),
max_attempts: 4,
delay_ms: Some(200),
};
let canonical = canonicalise(&g);
match canonical {
CompositionNode::Retry {
stage: outer_stage,
max_attempts: 4,
delay_ms: Some(200),
} => match *outer_stage {
CompositionNode::Retry {
max_attempts: 3,
delay_ms: Some(100),
..
} => {}
other => panic!("expected inner Retry, got {:?}", other),
},
other => panic!("expected outer Retry, got {:?}", other),
}
}
#[test]
fn empty_let_collapses_to_body() {
let g = CompositionNode::Let {
bindings: BTreeMap::new(),
body: Box::new(stage("body")),
};
assert_eq!(canonicalise(&g), stage("body"));
}
#[test]
fn non_empty_let_preserved() {
let mut bindings = BTreeMap::new();
bindings.insert("x".into(), stage("compute_x"));
let g = CompositionNode::Let {
bindings: bindings.clone(),
body: Box::new(stage("body")),
};
assert_eq!(canonicalise(&g), g);
}
#[test]
fn canonicalise_is_idempotent() {
let g = CompositionNode::Sequential {
stages: vec![
CompositionNode::Sequential {
stages: vec![stage("a"), stage("b")],
},
CompositionNode::Retry {
stage: Box::new(stage("c")),
max_attempts: 1,
delay_ms: None,
},
CompositionNode::Let {
bindings: BTreeMap::new(),
body: Box::new(stage("d")),
},
],
};
let once = canonicalise(&g);
let twice = canonicalise(&once);
assert_eq!(once, twice);
}
#[test]
fn parallel_branches_preserved_under_btreemap() {
let mut a = BTreeMap::new();
a.insert("alpha".into(), stage("x"));
a.insert("beta".into(), stage("y"));
let mut b = BTreeMap::new();
b.insert("beta".into(), stage("y"));
b.insert("alpha".into(), stage("x"));
let g1 = CompositionNode::Parallel { branches: a };
let g2 = CompositionNode::Parallel { branches: b };
assert_eq!(canonicalise(&g1), canonicalise(&g2));
}
#[test]
fn fanout_target_order_preserved() {
let g1 = CompositionNode::Fanout {
source: Box::new(stage("src")),
targets: vec![stage("a"), stage("b")],
};
let g2 = CompositionNode::Fanout {
source: Box::new(stage("src")),
targets: vec![stage("b"), stage("a")],
};
assert_ne!(canonicalise(&g1), canonicalise(&g2));
}
#[test]
fn inner_canonicalisation_bubbles_up() {
let g = CompositionNode::Sequential {
stages: vec![
stage("a"),
CompositionNode::Retry {
stage: Box::new(stage("b")),
max_attempts: 1,
delay_ms: Some(50),
},
stage("c"),
],
};
let expected = CompositionNode::Sequential {
stages: vec![stage("a"), stage("b"), stage("c")],
};
assert_eq!(canonicalise(&g), expected);
}
}