1use crate::errors::{AtentoError, Result};
2use crate::executor::CommandExecutor;
3use crate::input::Input;
4use crate::interpreter::{Interpreter, default_interpreters};
5use crate::parameter::Parameter;
6use crate::result_ref::ResultRef;
7use crate::step::{Step, StepResult};
8use indexmap::IndexMap;
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, HashSet};
11use std::time::Instant;
12
13const DEFAULT_CHAIN_TIMEOUT: u64 = 300;
14
15fn default_chain_timeout() -> u64 {
17 DEFAULT_CHAIN_TIMEOUT
18}
19
20#[derive(Debug, Deserialize)]
21#[serde(from = "ChainHelper")]
22pub struct Chain {
23 pub name: Option<String>,
24 pub timeout: u64,
25 pub interpreters: HashMap<String, Interpreter>,
26 pub parameters: HashMap<String, Parameter>,
27 pub steps: IndexMap<String, Step>,
28 pub results: HashMap<String, ResultRef>,
29}
30
31#[derive(Deserialize)]
33struct ChainHelper {
34 name: Option<String>,
35 #[serde(default = "default_chain_timeout")]
36 timeout: u64,
37 #[serde(default)]
38 interpreters: HashMap<String, Interpreter>,
39 #[serde(default)]
40 parameters: HashMap<String, Parameter>,
41 #[serde(default)]
42 steps: IndexMap<String, Step>,
43 #[serde(default)]
44 results: HashMap<String, ResultRef>,
45}
46
47impl From<ChainHelper> for Chain {
48 fn from(helper: ChainHelper) -> Self {
49 let mut interpreters: HashMap<String, Interpreter> =
51 default_interpreters().into_iter().collect();
52
53 interpreters.extend(helper.interpreters);
55
56 Chain {
57 name: helper.name,
58 timeout: helper.timeout,
59 interpreters,
60 parameters: helper.parameters,
61 steps: helper.steps,
62 results: helper.results,
63 }
64 }
65}
66
67#[derive(Debug, Serialize)]
68pub struct ChainResult {
69 #[serde(skip_serializing_if = "Option::is_none")]
70 pub name: Option<String>,
71 pub duration_ms: u128,
72 #[serde(skip_serializing_if = "Option::is_none")]
73 pub parameters: Option<HashMap<String, String>>,
74 #[serde(skip_serializing_if = "Option::is_none")]
75 pub steps: Option<IndexMap<String, StepResult>>,
76 #[serde(skip_serializing_if = "Option::is_none")]
77 pub results: Option<HashMap<String, String>>,
78 #[serde(skip_serializing_if = "Vec::is_empty")]
79 pub errors: Vec<AtentoError>,
80 pub status: String,
81}
82
83impl Default for Chain {
84 fn default() -> Self {
85 Self {
86 name: None,
87 timeout: default_chain_timeout(),
88 parameters: HashMap::new(),
89 interpreters: HashMap::new(),
90 steps: IndexMap::new(),
91 results: HashMap::new(),
92 }
93 }
94}
95
96impl Chain {
97 fn make_output_key(step_key: &str, output_key: &str) -> String {
98 format!("steps.{step_key}.outputs.{output_key}")
99 }
100
101 pub fn validate(&self) -> Result<()> {
106 let parameter_keys: HashSet<String> = self
107 .parameters
108 .keys()
109 .map(|k| format!("parameters.{k}"))
110 .collect();
111
112 let mut step_output_keys = HashSet::new();
113
114 for (step_key, step) in &self.steps {
115 for (input_key, input) in &step.inputs {
116 if let Input::Ref { ref_ } = input
117 && !parameter_keys.contains(ref_)
118 && !step_output_keys.contains(ref_)
119 {
120 let forward_decl = self
121 .steps
122 .keys()
123 .skip_while(|k| *k != step_key)
124 .skip(1)
125 .any(|k| {
126 self.steps[k]
127 .outputs
128 .keys()
129 .any(|out_name| Self::make_output_key(k, out_name) == *ref_)
130 });
131
132 if forward_decl {
133 return Err(AtentoError::Validation(format!(
134 "Input '{input_key}' in step '{step_key}' references '{ref_}', which is a future step output"
135 )));
136 }
137
138 return Err(AtentoError::UnresolvedReference {
139 reference: ref_.clone(),
140 context: format!("step '{step_key}'"),
141 });
142 }
143 }
144
145 step.validate(step_key)?;
146
147 for (out_key, out) in &step.outputs {
148 if out.pattern.is_empty() {
149 return Err(AtentoError::Validation(format!(
150 "Output '{out_key}' in step '{step_key}' has empty capture pattern"
151 )));
152 }
153
154 step_output_keys.insert(Self::make_output_key(step_key, out_key));
155 }
156 }
157
158 for (result_key, result) in &self.results {
159 if !step_output_keys.contains(&result.ref_) {
160 return Err(AtentoError::UnresolvedReference {
161 reference: result.ref_.clone(),
162 context: format!("chain result '{result_key}'"),
163 });
164 }
165 }
166
167 Ok(())
168 }
169
170 fn resolve_input(
171 &self,
172 input_name: &str,
173 input: &Input,
174 step_name: &str,
175 resolved_outputs: &HashMap<String, String>,
176 ) -> Result<String> {
177 match input {
178 Input::Inline { .. } => input.to_string_value().map_err(|e| {
179 AtentoError::Execution(format!("Input '{input_name}' in step '{step_name}': {e}"))
180 }),
181
182 Input::Ref { ref_ } => {
183 let param_key = ref_.strip_prefix("parameters.").unwrap_or(ref_);
184
185 if let Some(param) = self.parameters.get(param_key) {
186 param.to_string_value().map_err(|e| {
187 AtentoError::Execution(format!(
188 "Parameter '{input_name}' in step '{step_name}': {e}"
189 ))
190 })
191 } else if let Some(output) = resolved_outputs.get(ref_) {
192 Ok(output.clone())
193 } else {
194 Err(AtentoError::UnresolvedReference {
195 reference: ref_.clone(),
196 context: format!("step '{step_name}'"),
197 })
198 }
199 }
200 }
201 }
202
203 fn check_timeout(&self, start_time: &Instant, step_name: &str) -> Result<u64> {
204 if self.timeout == 0 {
205 return Ok(0);
206 }
207
208 let elapsed = start_time.elapsed().as_secs();
209 if elapsed >= self.timeout {
210 return Err(AtentoError::Timeout {
211 context: format!("Chain timed out before step '{step_name}'"),
212 timeout_secs: self.timeout,
213 });
214 }
215
216 Ok(self.timeout.saturating_sub(elapsed))
217 }
218
219 fn resolve_step_inputs(
220 &self,
221 step: &Step,
222 step_name: &str,
223 resolved_outputs: &HashMap<String, String>,
224 ) -> Result<HashMap<String, String>> {
225 step.inputs
226 .iter()
227 .map(|(input_name, input)| {
228 self.resolve_input(input_name, input, step_name, resolved_outputs)
229 .map(|val| (input_name.clone(), val))
230 })
231 .collect()
232 }
233
234 fn lookup_interpreter(&self, step: &Step, step_name: &str) -> Result<&Interpreter> {
235 self.interpreters.get(&step.interpreter).ok_or_else(|| {
236 AtentoError::Validation(format!(
237 "Unknown interpreter '{}' in step '{}'",
238 step.interpreter, step_name
239 ))
240 })
241 }
242
243 fn process_step_result(
244 step_name: &str,
245 step_result: &StepResult,
246 resolved_outputs: &mut HashMap<String, String>,
247 ) -> Option<AtentoError> {
248 for (k, v) in &step_result.outputs {
250 resolved_outputs.insert(Self::make_output_key(step_name, k), v.clone());
251 }
252
253 step_result
255 .error
256 .as_ref()
257 .map(|err| AtentoError::StepExecution {
258 step: step_name.to_string(),
259 reason: err.to_string(),
260 })
261 }
262
263 fn collect_chain_results(
264 &self,
265 resolved_outputs: &HashMap<String, String>,
266 ) -> (HashMap<String, String>, Vec<AtentoError>) {
267 let mut final_results = HashMap::new();
268 let mut errors = Vec::new();
269
270 for (result_name, result_ref) in &self.results {
271 if let Some(val) = resolved_outputs.get(&result_ref.ref_) {
272 final_results.insert(result_name.clone(), val.clone());
273 } else {
274 errors.push(AtentoError::UnresolvedReference {
275 reference: result_ref.ref_.clone(),
276 context: format!("Unresolved Reference '{result_name}'"),
277 });
278 }
279 }
280
281 (final_results, errors)
282 }
283
284 fn serialize_parameters(&self) -> (Option<HashMap<String, String>>, Vec<AtentoError>) {
285 if self.parameters.is_empty() {
286 return (None, Vec::new());
287 }
288
289 match self
290 .parameters
291 .iter()
292 .map(|(k, v)| v.to_string_value().map(|s| (k.clone(), s)))
293 .collect::<Result<HashMap<_, _>>>()
294 {
295 Ok(params) => (Some(params), Vec::new()),
296 Err(e) => (None, vec![e]),
297 }
298 }
299
300 pub fn run_with_executor<E: CommandExecutor>(&self, executor: &E) -> ChainResult {
305 let start_time = Instant::now();
306 let mut resolved_outputs = HashMap::new();
307 let mut step_results = IndexMap::new();
308 let mut chain_errors = Vec::new();
309
310 for (step_name, step) in &self.steps {
311 let time_left = match self.check_timeout(&start_time, step_name) {
313 Ok(time) => time,
314 Err(e) => {
315 chain_errors.push(e);
316 break;
317 }
318 };
319
320 let step_inputs = match self.resolve_step_inputs(step, step_name, &resolved_outputs) {
322 Ok(inputs) => inputs,
323 Err(e) => {
324 chain_errors.push(e);
325 break;
326 }
327 };
328
329 let interpreter = match self.lookup_interpreter(step, step_name) {
331 Ok(interp) => interp,
332 Err(e) => {
333 chain_errors.push(e);
334 break;
335 }
336 };
337
338 let step_result = step.run(executor, &step_inputs, time_left, interpreter);
340
341 if let Some(err) =
343 Self::process_step_result(step_name, &step_result, &mut resolved_outputs)
344 {
345 chain_errors.push(err);
346 step_results.insert(step_name.clone(), step_result);
347 break;
348 }
349
350 step_results.insert(step_name.clone(), step_result);
351 }
352
353 let (final_results, mut result_errors) = self.collect_chain_results(&resolved_outputs);
355 chain_errors.append(&mut result_errors);
356
357 let (parameters, mut param_errors) = self.serialize_parameters();
358 chain_errors.append(&mut param_errors);
359
360 let status = if chain_errors.is_empty() { "ok" } else { "nok" }.to_string();
361
362 ChainResult {
363 name: self.name.clone(),
364 duration_ms: start_time.elapsed().as_millis(),
365 parameters,
366 steps: if step_results.is_empty() {
367 None
368 } else {
369 Some(step_results)
370 },
371 results: if final_results.is_empty() {
372 None
373 } else {
374 Some(final_results)
375 },
376 errors: chain_errors,
377 status,
378 }
379 }
380
381 #[must_use]
386 pub fn run(&self) -> ChainResult {
387 use crate::executor::SystemExecutor;
388 let executor = SystemExecutor;
389 self.run_with_executor(&executor)
390 }
391}