1use noether_core::stage::StageId;
2use noether_core::types::NType;
3use serde::{Deserialize, Serialize};
4use std::collections::BTreeMap;
5
/// One node of a composition tree.
///
/// Serialized with serde as an internally tagged enum: the variant name is
/// written to an `"op"` field next to the variant's own fields (for example a
/// `Stage` node carries `"op": "Stage"` plus an `"id"` entry).
///
/// NOTE(review): the descriptions below cover structure only. How each variant
/// is executed is not defined in this file; any remark about runtime behaviour
/// is inferred from names and should be confirmed against the executor.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "op")]
pub enum CompositionNode {
    /// Leaf that refers to a stage by its [`StageId`].
    Stage {
        /// Identifier of the referenced stage; this is the value gathered by
        /// [`collect_stage_ids`].
        id: StageId,
        /// Optional key/value configuration. Deserializes to `None` when the
        /// field is absent and is left out of the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        config: Option<BTreeMap<String, serde_json::Value>>,
    },

    /// Leaf identified by a URL instead of a [`StageId`]; it therefore
    /// contributes nothing to [`collect_stage_ids`].
    RemoteStage {
        /// Address of the remote stage (presumably an HTTP endpoint — confirm).
        url: String,
        /// Declared input type of the remote stage.
        input: NType,
        /// Declared output type of the remote stage.
        output: NType,
    },

    /// Leaf holding a literal JSON `value`; contains no stage references.
    Const { value: serde_json::Value },

    /// Ordered list of child nodes (presumably chained one after another —
    /// confirm against the executor).
    Sequential { stages: Vec<CompositionNode> },

    /// Named child nodes keyed by branch name.
    Parallel {
        /// Branches by name; a `BTreeMap`, so iteration and serialization
        /// follow sorted key order.
        branches: BTreeMap<String, CompositionNode>,
    },

    /// Three-way node made of a predicate and two alternatives (presumably
    /// conditional routing — confirm against the executor).
    Branch {
        /// Node that produces the decision.
        predicate: Box<CompositionNode>,
        /// Alternative associated with a true predicate.
        if_true: Box<CompositionNode>,
        /// Alternative associated with a false predicate.
        if_false: Box<CompositionNode>,
    },

    /// One `source` node paired with a list of `targets`.
    Fanout {
        /// Single upstream node.
        source: Box<CompositionNode>,
        /// Downstream nodes, in declaration order.
        targets: Vec<CompositionNode>,
    },

    /// A list of `sources` paired with one `target`.
    Merge {
        /// Upstream nodes, in declaration order.
        sources: Vec<CompositionNode>,
        /// Single downstream node.
        target: Box<CompositionNode>,
    },

    /// Wrapper around a single node with retry parameters.
    Retry {
        /// The wrapped node.
        stage: Box<CompositionNode>,
        /// Attempt limit (presumably the total number of tries — confirm).
        max_attempts: u32,
        /// Optional delay in milliseconds (presumably between attempts —
        /// confirm). Carries no serde attributes, so `None` is serialized as
        /// an explicit `null`.
        delay_ms: Option<u64>,
    },

    /// Named `bindings` plus a `body` node (presumably the body can refer to
    /// the bound results — confirm against the executor).
    Let {
        /// Bound nodes by name, kept in sorted key order.
        bindings: BTreeMap<String, CompositionNode>,
        /// Node associated with the bindings.
        body: Box<CompositionNode>,
    },
}
99
/// A composition tree together with descriptive metadata.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CompositionGraph {
    /// Free-form description of the graph.
    pub description: String,
    /// Top-level node of the composition.
    pub root: CompositionNode,
    /// Version string; `CompositionGraph::new` initialises it to `"0.1.0"`.
    // NOTE(review): unclear whether this versions the graph or the format — confirm.
    pub version: String,
}
107
108impl CompositionGraph {
109 pub fn new(description: impl Into<String>, root: CompositionNode) -> Self {
110 Self {
111 description: description.into(),
112 root,
113 version: "0.1.0".into(),
114 }
115 }
116}
117
118pub fn collect_stage_ids(node: &CompositionNode) -> Vec<&StageId> {
120 let mut ids = Vec::new();
121 collect_ids_recursive(node, &mut ids);
122 ids
123}
124
125fn collect_ids_recursive<'a>(node: &'a CompositionNode, ids: &mut Vec<&'a StageId>) {
126 match node {
127 CompositionNode::Stage { id, .. } => ids.push(id),
128 CompositionNode::RemoteStage { .. } => {} CompositionNode::Const { .. } => {} CompositionNode::Sequential { stages } => {
131 for s in stages {
132 collect_ids_recursive(s, ids);
133 }
134 }
135 CompositionNode::Parallel { branches } => {
136 for b in branches.values() {
137 collect_ids_recursive(b, ids);
138 }
139 }
140 CompositionNode::Branch {
141 predicate,
142 if_true,
143 if_false,
144 } => {
145 collect_ids_recursive(predicate, ids);
146 collect_ids_recursive(if_true, ids);
147 collect_ids_recursive(if_false, ids);
148 }
149 CompositionNode::Fanout { source, targets } => {
150 collect_ids_recursive(source, ids);
151 for t in targets {
152 collect_ids_recursive(t, ids);
153 }
154 }
155 CompositionNode::Merge { sources, target } => {
156 for s in sources {
157 collect_ids_recursive(s, ids);
158 }
159 collect_ids_recursive(target, ids);
160 }
161 CompositionNode::Retry { stage, .. } => {
162 collect_ids_recursive(stage, ids);
163 }
164 CompositionNode::Let { bindings, body } => {
165 for b in bindings.values() {
166 collect_ids_recursive(b, ids);
167 }
168 collect_ids_recursive(body, ids);
169 }
170 }
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Shorthand for a `Stage` leaf with the given id and no config.
    fn stage(id: &str) -> CompositionNode {
        let id = StageId(id.into());
        CompositionNode::Stage { id, config: None }
    }

    /// Parses `encoded` as a `T` and asserts that the result equals `original`.
    fn assert_decodes_to<T>(encoded: &str, original: &T)
    where
        T: serde::de::DeserializeOwned + PartialEq + std::fmt::Debug,
    {
        let decoded: T = serde_json::from_str(encoded).unwrap();
        assert_eq!(*original, decoded);
    }

    #[test]
    fn serde_stage_round_trip() {
        let original = stage("abc123");
        let encoded = serde_json::to_string(&original).unwrap();
        assert_decodes_to(&encoded, &original);
    }

    #[test]
    fn serde_sequential() {
        let stages: Vec<CompositionNode> = ["a", "b", "c"].iter().copied().map(stage).collect();
        let original = CompositionNode::Sequential { stages };
        let encoded = serde_json::to_string_pretty(&original).unwrap();
        assert_decodes_to(&encoded, &original);
    }

    #[test]
    fn serde_parallel() {
        let original = CompositionNode::Parallel {
            branches: BTreeMap::from([
                ("left".into(), stage("a")),
                ("right".into(), stage("b")),
            ]),
        };
        let encoded = serde_json::to_string(&original).unwrap();
        assert_decodes_to(&encoded, &original);
    }

    #[test]
    fn serde_branch() {
        let predicate = Box::new(stage("pred"));
        let if_true = Box::new(stage("yes"));
        let if_false = Box::new(stage("no"));
        let original = CompositionNode::Branch {
            predicate,
            if_true,
            if_false,
        };
        let encoded = serde_json::to_string(&original).unwrap();
        assert_decodes_to(&encoded, &original);
    }

    #[test]
    fn serde_retry() {
        let original = CompositionNode::Retry {
            stage: Box::new(stage("fallible")),
            max_attempts: 3,
            delay_ms: Some(500),
        };
        let encoded = serde_json::to_string(&original).unwrap();
        assert_decodes_to(&encoded, &original);
    }

    #[test]
    fn serde_full_graph() {
        let root = CompositionNode::Sequential {
            stages: vec![stage("parse"), stage("transform"), stage("output")],
        };
        let original = CompositionGraph::new("test pipeline", root);
        let encoded = serde_json::to_string_pretty(&original).unwrap();
        assert_decodes_to(&encoded, &original);
    }

    #[test]
    fn serde_nested_composition() {
        let retried = CompositionNode::Retry {
            stage: Box::new(CompositionNode::Sequential {
                stages: vec![stage("a"), stage("b")],
            }),
            max_attempts: 2,
            delay_ms: None,
        };
        let original = CompositionNode::Sequential {
            stages: vec![stage("input"), retried, stage("output")],
        };
        let encoded = serde_json::to_string(&original).unwrap();
        assert_decodes_to(&encoded, &original);
    }

    #[test]
    fn collect_stage_ids_finds_all() {
        let mut branches = BTreeMap::new();
        branches.insert("x".into(), stage("b"));
        branches.insert("y".into(), stage("c"));
        let tree = CompositionNode::Sequential {
            stages: vec![
                stage("a"),
                CompositionNode::Parallel { branches },
                stage("d"),
            ],
        };
        let found = collect_stage_ids(&tree);
        assert_eq!(found.len(), 4);
    }

    #[test]
    fn json_format_is_tagged() {
        let encoded: serde_json::Value = serde_json::to_value(&stage("abc123")).unwrap();
        assert_eq!(encoded["op"], json!("Stage"));
        assert_eq!(encoded["id"], json!("abc123"));
    }

    #[test]
    fn serde_remote_stage_round_trip() {
        let original = CompositionNode::RemoteStage {
            url: "http://localhost:8080".into(),
            input: NType::record([("count", NType::Number)]),
            output: NType::VNode,
        };
        let encoded = serde_json::to_string(&original).unwrap();
        assert_decodes_to(&encoded, &original);
    }

    #[test]
    fn remote_stage_json_shape() {
        let remote = CompositionNode::RemoteStage {
            url: "http://api.example.com".into(),
            input: NType::Text,
            output: NType::Number,
        };
        let encoded: serde_json::Value = serde_json::to_value(&remote).unwrap();
        assert_eq!(encoded["op"], json!("RemoteStage"));
        assert_eq!(encoded["url"], json!("http://api.example.com"));
        assert!(encoded["input"].is_object());
        assert!(encoded["output"].is_object());
    }

    #[test]
    fn collect_stage_ids_skips_remote_stage() {
        let remote = CompositionNode::RemoteStage {
            url: "http://remote".into(),
            input: NType::Text,
            output: NType::Text,
        };
        let tree = CompositionNode::Sequential {
            stages: vec![stage("local-a"), remote, stage("local-b")],
        };
        let found = collect_stage_ids(&tree);
        // Only the two local stages carry a `StageId`.
        assert_eq!(found.len(), 2);
        let (first, second) = (found[0], found[1]);
        assert_eq!(first.0, "local-a");
        assert_eq!(second.0, "local-b");
    }

    #[test]
    fn remote_stage_in_full_graph_serde() {
        let remote = CompositionNode::RemoteStage {
            url: "http://api:8080".into(),
            input: NType::record([("query", NType::Text)]),
            output: NType::List(Box::new(NType::Text)),
        };
        let original = CompositionGraph::new(
            "full-stack pipeline",
            CompositionNode::Sequential {
                stages: vec![remote, stage("render")],
            },
        );
        let encoded = serde_json::to_string_pretty(&original).unwrap();
        assert_decodes_to(&encoded, &original);
    }
}