Skip to main content

bnto_core/executor/
loop_config.rs

1// Loop container configuration — parsed from node params.
2// Controls error handling and output persistence behavior for loop nodes.
3
4use serde::Deserialize;
5
6/// How the loop handles child node failures.
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
8pub enum OnErrorStrategy {
9    /// Stop the loop on the first child failure (default).
10    #[default]
11    FailFast,
12    /// Record the failure, skip the iteration, continue to the next.
13    Continue,
14}
15
16/// When loop iteration outputs are written to disk.
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
18pub enum OutputPersistence {
19    /// Accumulate all outputs in memory, write after pipeline completes (default).
20    #[default]
21    Deferred,
22    /// Emit output files in events so consumers can write them immediately.
23    Progressive,
24}
25
26/// Loop-specific configuration parsed from a loop node's `params`.
27#[derive(Debug, Clone, PartialEq, Eq, Default)]
28pub struct LoopConfig {
29    pub on_error: OnErrorStrategy,
30    pub output_persistence: OutputPersistence,
31}
32
33/// Intermediate struct for serde deserialization of loop params.
34#[derive(Deserialize)]
35#[serde(rename_all = "camelCase")]
36struct RawLoopParams {
37    #[serde(default)]
38    on_error: Option<String>,
39    #[serde(default)]
40    output_persistence: Option<String>,
41}
42
43/// Parse `LoopConfig` from a node's params map.
44/// Unknown or missing values fall back to safe defaults.
45pub fn parse_loop_config(params: &serde_json::Map<String, serde_json::Value>) -> LoopConfig {
46    let raw: RawLoopParams = serde_json::from_value(serde_json::Value::Object(params.clone()))
47        .unwrap_or(RawLoopParams {
48            on_error: None,
49            output_persistence: None,
50        });
51
52    let on_error = match raw.on_error.as_deref() {
53        Some("continue") => OnErrorStrategy::Continue,
54        _ => OnErrorStrategy::FailFast,
55    };
56
57    let output_persistence = match raw.output_persistence.as_deref() {
58        Some("progressive") => OutputPersistence::Progressive,
59        _ => OutputPersistence::Deferred,
60    };
61
62    LoopConfig {
63        on_error,
64        output_persistence,
65    }
66}
67
68#[cfg(test)]
69mod tests {
70    use super::*;
71
72    #[test]
73    fn empty_params_yields_defaults() {
74        let params = serde_json::Map::new();
75        let config = parse_loop_config(&params);
76        assert_eq!(config.on_error, OnErrorStrategy::FailFast);
77        assert_eq!(config.output_persistence, OutputPersistence::Deferred);
78    }
79
80    #[test]
81    fn on_error_continue_parsed() {
82        let mut params = serde_json::Map::new();
83        params.insert("onError".into(), "continue".into());
84        let config = parse_loop_config(&params);
85        assert_eq!(config.on_error, OnErrorStrategy::Continue);
86    }
87
88    #[test]
89    fn on_error_fail_fast_parsed() {
90        let mut params = serde_json::Map::new();
91        params.insert("onError".into(), "failFast".into());
92        let config = parse_loop_config(&params);
93        assert_eq!(config.on_error, OnErrorStrategy::FailFast);
94    }
95
96    #[test]
97    fn output_persistence_progressive_parsed() {
98        let mut params = serde_json::Map::new();
99        params.insert("outputPersistence".into(), "progressive".into());
100        let config = parse_loop_config(&params);
101        assert_eq!(config.output_persistence, OutputPersistence::Progressive);
102    }
103
104    #[test]
105    fn output_persistence_deferred_parsed() {
106        let mut params = serde_json::Map::new();
107        params.insert("outputPersistence".into(), "deferred".into());
108        let config = parse_loop_config(&params);
109        assert_eq!(config.output_persistence, OutputPersistence::Deferred);
110    }
111
112    #[test]
113    fn unknown_on_error_defaults_to_fail_fast() {
114        let mut params = serde_json::Map::new();
115        params.insert("onError".into(), "garbage".into());
116        let config = parse_loop_config(&params);
117        assert_eq!(config.on_error, OnErrorStrategy::FailFast);
118    }
119
120    #[test]
121    fn unknown_output_persistence_defaults_to_deferred() {
122        let mut params = serde_json::Map::new();
123        params.insert("outputPersistence".into(), "garbage".into());
124        let config = parse_loop_config(&params);
125        assert_eq!(config.output_persistence, OutputPersistence::Deferred);
126    }
127
128    #[test]
129    fn extra_params_ignored() {
130        let mut params = serde_json::Map::new();
131        params.insert("onError".into(), "continue".into());
132        params.insert("mode".into(), "forEach".into());
133        params.insert("unrelated".into(), 42.into());
134        let config = parse_loop_config(&params);
135        assert_eq!(config.on_error, OnErrorStrategy::Continue);
136        assert_eq!(config.output_persistence, OutputPersistence::Deferred);
137    }
138
139    #[test]
140    fn both_fields_parsed_together() {
141        let mut params = serde_json::Map::new();
142        params.insert("onError".into(), "continue".into());
143        params.insert("outputPersistence".into(), "progressive".into());
144        let config = parse_loop_config(&params);
145        assert_eq!(config.on_error, OnErrorStrategy::Continue);
146        assert_eq!(config.output_persistence, OutputPersistence::Progressive);
147    }
148}