1use noether_core::stage::StageId;
2use noether_core::types::NType;
3use serde::{Deserialize, Serialize};
4use std::collections::BTreeMap;
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
14#[serde(rename_all = "lowercase")]
15pub enum Pinning {
16 #[default]
21 Signature,
22 Both,
26}
27
28impl Pinning {
29 pub fn is_signature(&self) -> bool {
32 matches!(self, Pinning::Signature)
33 }
34}
35
36#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
38#[serde(tag = "op")]
39pub enum CompositionNode {
40 Stage {
69 id: StageId,
70 #[serde(default, skip_serializing_if = "Pinning::is_signature")]
71 pinning: Pinning,
72 #[serde(default, skip_serializing_if = "Option::is_none")]
73 config: Option<BTreeMap<String, serde_json::Value>>,
74 },
75
76 RemoteStage {
83 url: String,
85 input: NType,
87 output: NType,
89 },
90
91 Const { value: serde_json::Value },
94
95 Sequential { stages: Vec<CompositionNode> },
97
98 Parallel {
103 branches: BTreeMap<String, CompositionNode>,
104 },
105
106 Branch {
108 predicate: Box<CompositionNode>,
109 if_true: Box<CompositionNode>,
110 if_false: Box<CompositionNode>,
111 },
112
113 Fanout {
115 source: Box<CompositionNode>,
116 targets: Vec<CompositionNode>,
117 },
118
119 Merge {
121 sources: Vec<CompositionNode>,
122 target: Box<CompositionNode>,
123 },
124
125 Retry {
127 stage: Box<CompositionNode>,
128 max_attempts: u32,
129 delay_ms: Option<u64>,
130 },
131
132 Let {
150 bindings: BTreeMap<String, CompositionNode>,
151 body: Box<CompositionNode>,
152 },
153}
154
155impl CompositionNode {
156 pub fn stage(id: impl Into<String>) -> Self {
160 Self::Stage {
161 id: StageId(id.into()),
162 pinning: Pinning::Signature,
163 config: None,
164 }
165 }
166
167 pub fn stage_pinned(id: impl Into<String>) -> Self {
170 Self::Stage {
171 id: StageId(id.into()),
172 pinning: Pinning::Both,
173 config: None,
174 }
175 }
176}
177
178pub fn resolve_stage_ref<'a, S>(
209 id: &StageId,
210 pinning: Pinning,
211 store: &'a S,
212) -> Option<&'a noether_core::stage::Stage>
213where
214 S: noether_store::StageStore + ?Sized,
215{
216 use noether_core::stage::{SignatureId, StageLifecycle};
217 match pinning {
218 Pinning::Signature => {
219 let sig = SignatureId(id.0.clone());
220 if let Some(stage) = store.get_by_signature(&sig) {
221 return Some(stage);
222 }
223 match store.get(id).ok().flatten() {
227 Some(s) if matches!(s.lifecycle, StageLifecycle::Active) => Some(s),
228 _ => None,
229 }
230 }
231 Pinning::Both => store.get(id).ok().flatten(),
232 }
233}
234
235#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
237pub struct CompositionGraph {
238 pub description: String,
239 pub root: CompositionNode,
240 pub version: String,
241}
242
243impl CompositionGraph {
244 pub fn new(description: impl Into<String>, root: CompositionNode) -> Self {
245 Self {
246 description: description.into(),
247 root,
248 version: "0.1.0".into(),
249 }
250 }
251}
252
253pub fn collect_stage_ids(node: &CompositionNode) -> Vec<&StageId> {
255 let mut ids = Vec::new();
256 collect_ids_recursive(node, &mut ids);
257 ids
258}
259
260fn collect_ids_recursive<'a>(node: &'a CompositionNode, ids: &mut Vec<&'a StageId>) {
261 match node {
262 CompositionNode::Stage { id, .. } => ids.push(id),
263 CompositionNode::RemoteStage { .. } => {} CompositionNode::Const { .. } => {} CompositionNode::Sequential { stages } => {
266 for s in stages {
267 collect_ids_recursive(s, ids);
268 }
269 }
270 CompositionNode::Parallel { branches } => {
271 for b in branches.values() {
272 collect_ids_recursive(b, ids);
273 }
274 }
275 CompositionNode::Branch {
276 predicate,
277 if_true,
278 if_false,
279 } => {
280 collect_ids_recursive(predicate, ids);
281 collect_ids_recursive(if_true, ids);
282 collect_ids_recursive(if_false, ids);
283 }
284 CompositionNode::Fanout { source, targets } => {
285 collect_ids_recursive(source, ids);
286 for t in targets {
287 collect_ids_recursive(t, ids);
288 }
289 }
290 CompositionNode::Merge { sources, target } => {
291 for s in sources {
292 collect_ids_recursive(s, ids);
293 }
294 collect_ids_recursive(target, ids);
295 }
296 CompositionNode::Retry { stage, .. } => {
297 collect_ids_recursive(stage, ids);
298 }
299 CompositionNode::Let { bindings, body } => {
300 for b in bindings.values() {
301 collect_ids_recursive(b, ids);
302 }
303 collect_ids_recursive(body, ids);
304 }
305 }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Shorthand for a signature-pinned stage reference with no config.
    fn stage(id: &str) -> CompositionNode {
        CompositionNode::stage(id)
    }

    /// Runs `collect_stage_ids` and flattens the result to plain strings so
    /// tests can assert on the exact ids and their order.
    fn collected(node: &CompositionNode) -> Vec<&str> {
        collect_stage_ids(node)
            .into_iter()
            .map(|id| id.0.as_str())
            .collect()
    }

    #[test]
    fn serde_stage_round_trip() {
        let node = stage("abc123");
        let json = serde_json::to_string(&node).unwrap();
        let parsed: CompositionNode = serde_json::from_str(&json).unwrap();
        assert_eq!(node, parsed);
    }

    #[test]
    fn serde_sequential() {
        let node = CompositionNode::Sequential {
            stages: vec![stage("a"), stage("b"), stage("c")],
        };
        let json = serde_json::to_string_pretty(&node).unwrap();
        let parsed: CompositionNode = serde_json::from_str(&json).unwrap();
        assert_eq!(node, parsed);
    }

    #[test]
    fn serde_parallel() {
        let mut branches = BTreeMap::new();
        branches.insert("left".into(), stage("a"));
        branches.insert("right".into(), stage("b"));
        let node = CompositionNode::Parallel { branches };
        let json = serde_json::to_string(&node).unwrap();
        let parsed: CompositionNode = serde_json::from_str(&json).unwrap();
        assert_eq!(node, parsed);
    }

    #[test]
    fn serde_branch() {
        let node = CompositionNode::Branch {
            predicate: Box::new(stage("pred")),
            if_true: Box::new(stage("yes")),
            if_false: Box::new(stage("no")),
        };
        let json = serde_json::to_string(&node).unwrap();
        let parsed: CompositionNode = serde_json::from_str(&json).unwrap();
        assert_eq!(node, parsed);
    }

    #[test]
    fn serde_retry() {
        let node = CompositionNode::Retry {
            stage: Box::new(stage("fallible")),
            max_attempts: 3,
            delay_ms: Some(500),
        };
        let json = serde_json::to_string(&node).unwrap();
        let parsed: CompositionNode = serde_json::from_str(&json).unwrap();
        assert_eq!(node, parsed);
    }

    #[test]
    fn serde_full_graph() {
        let graph = CompositionGraph::new(
            "test pipeline",
            CompositionNode::Sequential {
                stages: vec![stage("parse"), stage("transform"), stage("output")],
            },
        );
        let json = serde_json::to_string_pretty(&graph).unwrap();
        let parsed: CompositionGraph = serde_json::from_str(&json).unwrap();
        assert_eq!(graph, parsed);
    }

    #[test]
    fn serde_nested_composition() {
        let node = CompositionNode::Sequential {
            stages: vec![
                stage("input"),
                CompositionNode::Retry {
                    stage: Box::new(CompositionNode::Sequential {
                        stages: vec![stage("a"), stage("b")],
                    }),
                    max_attempts: 2,
                    delay_ms: None,
                },
                stage("output"),
            ],
        };
        let json = serde_json::to_string(&node).unwrap();
        let parsed: CompositionNode = serde_json::from_str(&json).unwrap();
        assert_eq!(node, parsed);
    }

    #[test]
    fn collect_stage_ids_finds_all() {
        let node = CompositionNode::Sequential {
            stages: vec![
                stage("a"),
                CompositionNode::Parallel {
                    branches: BTreeMap::from([("x".into(), stage("b")), ("y".into(), stage("c"))]),
                },
                stage("d"),
            ],
        };
        // Check the ids themselves, not just the count: parallel branches are
        // visited in key order ("x" before "y"), so the result is fully
        // deterministic.
        assert_eq!(collected(&node), ["a", "b", "c", "d"]);
    }

    #[test]
    fn collect_stage_ids_walks_every_composite_variant() {
        let node = CompositionNode::Let {
            bindings: BTreeMap::from([
                (
                    "first".into(),
                    CompositionNode::Retry {
                        stage: Box::new(stage("retry")),
                        max_attempts: 2,
                        delay_ms: None,
                    },
                ),
                ("second".into(), CompositionNode::Const { value: json!(1) }),
            ]),
            body: Box::new(CompositionNode::Branch {
                predicate: Box::new(stage("pred")),
                if_true: Box::new(CompositionNode::Fanout {
                    source: Box::new(stage("src")),
                    targets: vec![stage("t1"), stage("t2")],
                }),
                if_false: Box::new(CompositionNode::Merge {
                    sources: vec![stage("m1"), stage("m2")],
                    target: Box::new(stage("tgt")),
                }),
            }),
        };
        // Bindings come first (in key order, `Const` adds nothing), then the
        // body: predicate, if_true (source then targets), if_false (sources
        // then target).
        assert_eq!(
            collected(&node),
            ["retry", "pred", "src", "t1", "t2", "m1", "m2", "tgt"]
        );
    }

    #[test]
    fn json_format_is_tagged() {
        let node = stage("abc123");
        let v: serde_json::Value = serde_json::to_value(&node).unwrap();
        assert_eq!(v["op"], json!("Stage"));
        assert_eq!(v["id"], json!("abc123"));
    }

    #[test]
    fn default_pinning_omitted_from_json() {
        let node = stage("abc123");
        let v: serde_json::Value = serde_json::to_value(&node).unwrap();
        assert!(
            v.get("pinning").is_none(),
            "default Signature pinning should be omitted from JSON, got: {v}"
        );
    }

    #[test]
    fn both_pinning_serialises_explicitly() {
        let node = CompositionNode::stage_pinned("impl_abc");
        let v: serde_json::Value = serde_json::to_value(&node).unwrap();
        assert_eq!(v["pinning"], json!("both"));
    }

    #[test]
    fn legacy_graph_without_pinning_deserialises() {
        // A document with no `pinning` key must still parse and fall back to
        // the default mode.
        let legacy = json!({
            "op": "Stage",
            "id": "legacy_hash",
        });
        let parsed: CompositionNode = serde_json::from_value(legacy).unwrap();
        match parsed {
            CompositionNode::Stage { id, pinning, .. } => {
                assert_eq!(id.0, "legacy_hash");
                assert_eq!(pinning, Pinning::Signature);
            }
            _ => panic!("expected Stage variant"),
        }
    }

    #[test]
    fn explicit_both_pinning_deserialises() {
        let pinned = json!({
            "op": "Stage",
            "id": "impl_xyz",
            "pinning": "both",
        });
        let parsed: CompositionNode = serde_json::from_value(pinned).unwrap();
        match parsed {
            CompositionNode::Stage { pinning, .. } => {
                assert_eq!(pinning, Pinning::Both);
            }
            _ => panic!("expected Stage variant"),
        }
    }

    #[test]
    fn serde_remote_stage_round_trip() {
        let node = CompositionNode::RemoteStage {
            url: "http://localhost:8080".into(),
            input: NType::record([("count", NType::Number)]),
            output: NType::VNode,
        };
        let json = serde_json::to_string(&node).unwrap();
        let parsed: CompositionNode = serde_json::from_str(&json).unwrap();
        assert_eq!(node, parsed);
    }

    #[test]
    fn remote_stage_json_shape() {
        let node = CompositionNode::RemoteStage {
            url: "http://api.example.com".into(),
            input: NType::Text,
            output: NType::Number,
        };
        let v: serde_json::Value = serde_json::to_value(&node).unwrap();
        assert_eq!(v["op"], json!("RemoteStage"));
        assert_eq!(v["url"], json!("http://api.example.com"));
        assert!(v["input"].is_object());
        assert!(v["output"].is_object());
    }

    #[test]
    fn collect_stage_ids_skips_remote_stage() {
        let node = CompositionNode::Sequential {
            stages: vec![
                stage("local-a"),
                CompositionNode::RemoteStage {
                    url: "http://remote".into(),
                    input: NType::Text,
                    output: NType::Text,
                },
                stage("local-b"),
            ],
        };
        // Only the two local stages carry a `StageId`.
        assert_eq!(collected(&node), ["local-a", "local-b"]);
    }

    #[test]
    fn remote_stage_in_full_graph_serde() {
        let graph = CompositionGraph::new(
            "full-stack pipeline",
            CompositionNode::Sequential {
                stages: vec![
                    CompositionNode::RemoteStage {
                        url: "http://api:8080".into(),
                        input: NType::record([("query", NType::Text)]),
                        output: NType::List(Box::new(NType::Text)),
                    },
                    stage("render"),
                ],
            },
        );
        let json = serde_json::to_string_pretty(&graph).unwrap();
        let parsed: CompositionGraph = serde_json::from_str(&json).unwrap();
        assert_eq!(graph, parsed);
    }
}