atento_core/
chain.rs

1use crate::errors::{AtentoError, Result};
2use crate::executor::CommandExecutor;
3use crate::input::Input;
4use crate::interpreter::{Interpreter, default_interpreters};
5use crate::parameter::Parameter;
6use crate::result_ref::ResultRef;
7use crate::step::{Step, StepResult};
8use indexmap::IndexMap;
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, HashSet};
11use std::time::Instant;
12
/// Chain-wide timeout, in seconds, used when a chain definition does not set `timeout`.
const DEFAULT_CHAIN_TIMEOUT: u64 = 300;

/// Returns [`DEFAULT_CHAIN_TIMEOUT`].
///
/// serde's `#[serde(default = "...")]` attribute takes the path of a function,
/// not of a constant, so the constant is exposed through this thin wrapper.
fn default_chain_timeout() -> u64 {
    DEFAULT_CHAIN_TIMEOUT
}
19
20#[derive(Debug, Deserialize)]
21#[serde(from = "ChainHelper")]
22pub struct Chain {
23    pub name: Option<String>,
24    pub timeout: u64,
25    pub interpreters: HashMap<String, Interpreter>,
26    pub parameters: HashMap<String, Parameter>,
27    pub steps: IndexMap<String, Step>,
28    pub results: HashMap<String, ResultRef>,
29}
30
31// Helper struct for deserialization
32#[derive(Deserialize)]
33struct ChainHelper {
34    name: Option<String>,
35    #[serde(default = "default_chain_timeout")]
36    timeout: u64,
37    #[serde(default)]
38    interpreters: HashMap<String, Interpreter>,
39    #[serde(default)]
40    parameters: HashMap<String, Parameter>,
41    #[serde(default)]
42    steps: IndexMap<String, Step>,
43    #[serde(default)]
44    results: HashMap<String, ResultRef>,
45}
46
47impl From<ChainHelper> for Chain {
48    fn from(helper: ChainHelper) -> Self {
49        // Start with default interpreters
50        let mut interpreters: HashMap<String, Interpreter> =
51            default_interpreters().into_iter().collect();
52
53        // Override with user-provided interpreters
54        interpreters.extend(helper.interpreters);
55
56        Chain {
57            name: helper.name,
58            timeout: helper.timeout,
59            interpreters,
60            parameters: helper.parameters,
61            steps: helper.steps,
62            results: helper.results,
63        }
64    }
65}
66
67#[derive(Debug, Serialize)]
68pub struct ChainResult {
69    #[serde(skip_serializing_if = "Option::is_none")]
70    pub name: Option<String>,
71    pub duration_ms: u128,
72    #[serde(skip_serializing_if = "Option::is_none")]
73    pub parameters: Option<HashMap<String, String>>,
74    #[serde(skip_serializing_if = "Option::is_none")]
75    pub steps: Option<IndexMap<String, StepResult>>,
76    #[serde(skip_serializing_if = "Option::is_none")]
77    pub results: Option<HashMap<String, String>>,
78    #[serde(skip_serializing_if = "Vec::is_empty")]
79    pub errors: Vec<AtentoError>,
80    pub status: String,
81}
82
83impl Default for Chain {
84    fn default() -> Self {
85        Self {
86            name: None,
87            timeout: default_chain_timeout(),
88            parameters: HashMap::new(),
89            interpreters: HashMap::new(),
90            steps: IndexMap::new(),
91            results: HashMap::new(),
92        }
93    }
94}
95
96impl Chain {
97    fn make_output_key(step_key: &str, output_key: &str) -> String {
98        format!("steps.{step_key}.outputs.{output_key}")
99    }
100
101    /// Validates the chain structure.
102    ///
103    /// # Errors
104    /// Returns validation errors for unresolved references, forward references, or invalid patterns.
105    pub fn validate(&self) -> Result<()> {
106        let parameter_keys: HashSet<String> = self
107            .parameters
108            .keys()
109            .map(|k| format!("parameters.{k}"))
110            .collect();
111
112        let mut step_output_keys = HashSet::new();
113
114        for (step_key, step) in &self.steps {
115            for (input_key, input) in &step.inputs {
116                if let Input::Ref { ref_ } = input
117                    && !parameter_keys.contains(ref_)
118                    && !step_output_keys.contains(ref_)
119                {
120                    let forward_decl = self
121                        .steps
122                        .keys()
123                        .skip_while(|k| *k != step_key)
124                        .skip(1)
125                        .any(|k| {
126                            self.steps[k]
127                                .outputs
128                                .keys()
129                                .any(|out_name| Self::make_output_key(k, out_name) == *ref_)
130                        });
131
132                    if forward_decl {
133                        return Err(AtentoError::Validation(format!(
134                            "Input '{input_key}' in step '{step_key}' references '{ref_}', which is a future step output"
135                        )));
136                    }
137
138                    return Err(AtentoError::UnresolvedReference {
139                        reference: ref_.clone(),
140                        context: format!("step '{step_key}'"),
141                    });
142                }
143            }
144
145            step.validate(step_key)?;
146
147            for (out_key, out) in &step.outputs {
148                if out.pattern.is_empty() {
149                    return Err(AtentoError::Validation(format!(
150                        "Output '{out_key}' in step '{step_key}' has empty capture pattern"
151                    )));
152                }
153
154                step_output_keys.insert(Self::make_output_key(step_key, out_key));
155            }
156        }
157
158        for (result_key, result) in &self.results {
159            if !step_output_keys.contains(&result.ref_) {
160                return Err(AtentoError::UnresolvedReference {
161                    reference: result.ref_.clone(),
162                    context: format!("chain result '{result_key}'"),
163                });
164            }
165        }
166
167        Ok(())
168    }
169
170    fn resolve_input(
171        &self,
172        input_name: &str,
173        input: &Input,
174        step_name: &str,
175        resolved_outputs: &HashMap<String, String>,
176    ) -> Result<String> {
177        match input {
178            Input::Inline { .. } => input.to_string_value().map_err(|e| {
179                AtentoError::Execution(format!("Input '{input_name}' in step '{step_name}': {e}"))
180            }),
181
182            Input::Ref { ref_ } => {
183                let param_key = ref_.strip_prefix("parameters.").unwrap_or(ref_);
184
185                if let Some(param) = self.parameters.get(param_key) {
186                    param.to_string_value().map_err(|e| {
187                        AtentoError::Execution(format!(
188                            "Parameter '{input_name}' in step '{step_name}': {e}"
189                        ))
190                    })
191                } else if let Some(output) = resolved_outputs.get(ref_) {
192                    Ok(output.clone())
193                } else {
194                    Err(AtentoError::UnresolvedReference {
195                        reference: ref_.clone(),
196                        context: format!("step '{step_name}'"),
197                    })
198                }
199            }
200        }
201    }
202
203    fn check_timeout(&self, start_time: &Instant, step_name: &str) -> Result<u64> {
204        if self.timeout == 0 {
205            return Ok(0);
206        }
207
208        let elapsed = start_time.elapsed().as_secs();
209        if elapsed >= self.timeout {
210            return Err(AtentoError::Timeout {
211                context: format!("Chain timed out before step '{step_name}'"),
212                timeout_secs: self.timeout,
213            });
214        }
215
216        Ok(self.timeout.saturating_sub(elapsed))
217    }
218
219    fn resolve_step_inputs(
220        &self,
221        step: &Step,
222        step_name: &str,
223        resolved_outputs: &HashMap<String, String>,
224    ) -> Result<HashMap<String, String>> {
225        step.inputs
226            .iter()
227            .map(|(input_name, input)| {
228                self.resolve_input(input_name, input, step_name, resolved_outputs)
229                    .map(|val| (input_name.clone(), val))
230            })
231            .collect()
232    }
233
234    fn lookup_interpreter(&self, step: &Step, step_name: &str) -> Result<&Interpreter> {
235        self.interpreters.get(&step.interpreter).ok_or_else(|| {
236            AtentoError::Validation(format!(
237                "Unknown interpreter '{}' in step '{}'",
238                step.interpreter, step_name
239            ))
240        })
241    }
242
243    fn process_step_result(
244        step_name: &str,
245        step_result: &StepResult,
246        resolved_outputs: &mut HashMap<String, String>,
247    ) -> Option<AtentoError> {
248        // Store step outputs
249        for (k, v) in &step_result.outputs {
250            resolved_outputs.insert(Self::make_output_key(step_name, k), v.clone());
251        }
252
253        // Check for step error
254        step_result
255            .error
256            .as_ref()
257            .map(|err| AtentoError::StepExecution {
258                step: step_name.to_string(),
259                reason: err.to_string(),
260            })
261    }
262
263    fn collect_chain_results(
264        &self,
265        resolved_outputs: &HashMap<String, String>,
266    ) -> (HashMap<String, String>, Vec<AtentoError>) {
267        let mut final_results = HashMap::new();
268        let mut errors = Vec::new();
269
270        for (result_name, result_ref) in &self.results {
271            if let Some(val) = resolved_outputs.get(&result_ref.ref_) {
272                final_results.insert(result_name.clone(), val.clone());
273            } else {
274                errors.push(AtentoError::UnresolvedReference {
275                    reference: result_ref.ref_.clone(),
276                    context: format!("Unresolved Reference '{result_name}'"),
277                });
278            }
279        }
280
281        (final_results, errors)
282    }
283
284    fn serialize_parameters(&self) -> (Option<HashMap<String, String>>, Vec<AtentoError>) {
285        if self.parameters.is_empty() {
286            return (None, Vec::new());
287        }
288
289        match self
290            .parameters
291            .iter()
292            .map(|(k, v)| v.to_string_value().map(|s| (k.clone(), s)))
293            .collect::<Result<HashMap<_, _>>>()
294        {
295            Ok(params) => (Some(params), Vec::new()),
296            Err(e) => (None, vec![e]),
297        }
298    }
299
300    /// Executes the chain with a custom executor (useful for testing).
301    ///
302    /// # Errors
303    /// Returns an error if timeout is exceeded, a step fails, or output resolution fails.
304    pub fn run_with_executor<E: CommandExecutor>(&self, executor: &E) -> ChainResult {
305        let start_time = Instant::now();
306        let mut resolved_outputs = HashMap::new();
307        let mut step_results = IndexMap::new();
308        let mut chain_errors = Vec::new();
309
310        for (step_name, step) in &self.steps {
311            // Check timeout
312            let time_left = match self.check_timeout(&start_time, step_name) {
313                Ok(time) => time,
314                Err(e) => {
315                    chain_errors.push(e);
316                    break;
317                }
318            };
319
320            // Resolve step inputs
321            let step_inputs = match self.resolve_step_inputs(step, step_name, &resolved_outputs) {
322                Ok(inputs) => inputs,
323                Err(e) => {
324                    chain_errors.push(e);
325                    break;
326                }
327            };
328
329            // Lookup interpreter
330            let interpreter = match self.lookup_interpreter(step, step_name) {
331                Ok(interp) => interp,
332                Err(e) => {
333                    chain_errors.push(e);
334                    break;
335                }
336            };
337
338            // Run step
339            let step_result = step.run(executor, &step_inputs, time_left, interpreter);
340
341            // Process result and check for errors
342            if let Some(err) =
343                Self::process_step_result(step_name, &step_result, &mut resolved_outputs)
344            {
345                chain_errors.push(err);
346                step_results.insert(step_name.clone(), step_result);
347                break;
348            }
349
350            step_results.insert(step_name.clone(), step_result);
351        }
352
353        // Collect chain results and parameters
354        let (final_results, mut result_errors) = self.collect_chain_results(&resolved_outputs);
355        chain_errors.append(&mut result_errors);
356
357        let (parameters, mut param_errors) = self.serialize_parameters();
358        chain_errors.append(&mut param_errors);
359
360        let status = if chain_errors.is_empty() { "ok" } else { "nok" }.to_string();
361
362        ChainResult {
363            name: self.name.clone(),
364            duration_ms: start_time.elapsed().as_millis(),
365            parameters,
366            steps: if step_results.is_empty() {
367                None
368            } else {
369                Some(step_results)
370            },
371            results: if final_results.is_empty() {
372                None
373            } else {
374                Some(final_results)
375            },
376            errors: chain_errors,
377            status,
378        }
379    }
380
381    /// Executes the chain using the system executor.
382    ///
383    /// # Errors
384    /// Returns an error if timeout is exceeded, a step fails, or output resolution fails.
385    #[must_use]
386    pub fn run(&self) -> ChainResult {
387        use crate::executor::SystemExecutor;
388        let executor = SystemExecutor;
389        self.run_with_executor(&executor)
390    }
391}