1use super::{
6 definition::{Step, StepType, Workflow},
7 retry::RetryManager,
8 state::WorkflowState,
9 WorkflowStats,
10};
11use crate::error::CliError;
12
13type Result<T> = std::result::Result<T, CliError>;
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::time::Instant;
17
18#[derive(Clone)]
20pub struct ExecutionContext {
21 workflow: Workflow,
23 variables: HashMap<String, serde_json::Value>,
25 completed: HashMap<String, StepResult>,
27 skipped: Vec<String>,
29 retries: usize,
31}
32
33impl ExecutionContext {
34 pub fn new(workflow: Workflow) -> Self {
36 let mut variables = HashMap::new();
38 for (key, value) in &workflow.variables {
39 let json_value = match value {
40 super::definition::Variable::String(s) => serde_json::Value::String(s.clone()),
41 super::definition::Variable::Number(n) => serde_json::json!(n),
42 super::definition::Variable::Boolean(b) => serde_json::Value::Bool(*b),
43 super::definition::Variable::Array(arr) => serde_json::Value::Array(arr.clone()),
44 super::definition::Variable::Object(obj) => {
45 serde_json::Value::Object(serde_json::Map::from_iter(obj.clone()))
46 }
47 };
48 variables.insert(key.clone(), json_value);
49 }
50
51 Self {
52 workflow,
53 variables,
54 completed: HashMap::new(),
55 skipped: Vec::new(),
56 retries: 0,
57 }
58 }
59
60 pub fn workflow(&self) -> &Workflow {
62 &self.workflow
63 }
64
65 pub fn get_variables(&self) -> HashMap<String, serde_json::Value> {
67 self.variables.clone()
68 }
69
70 pub fn set_variable(&mut self, name: String, value: serde_json::Value) {
72 self.variables.insert(name, value);
73 }
74
75 pub fn complete_step(&mut self, name: &str, result: StepResult) {
77 self.completed.insert(name.to_string(), result);
78 }
79
80 pub fn skip_step(&mut self, name: &str, reason: &str) {
82 self.skipped.push(name.to_string());
83 tracing::info!("Skipping step '{}': {}", name, reason);
84 }
85
86 pub fn completed_steps(&self) -> &HashMap<String, StepResult> {
88 &self.completed
89 }
90
91 pub fn skipped_steps(&self) -> &[String] {
93 &self.skipped
94 }
95
96 pub fn increment_retries(&mut self) {
98 self.retries += 1;
99 }
100
101 pub fn total_retries(&self) -> usize {
103 self.retries
104 }
105
106 pub fn resume_from_state(&mut self, state: WorkflowState) {
108 self.variables = state.variables;
109 self.completed = state.completed_steps;
110 self.skipped = state.skipped_steps;
111 self.retries = state.total_retries;
112 }
113
114 pub fn get_state(&self) -> WorkflowState {
116 WorkflowState {
117 workflow_name: self.workflow.metadata.name.clone(),
118 state: super::state::ExecutionState::Running,
119 variables: self.variables.clone(),
120 completed_steps: self.completed.clone(),
121 skipped_steps: self.skipped.clone(),
122 current_step: None,
123 total_retries: self.retries,
124 last_updated: chrono::Utc::now(),
125 }
126 }
127}
128
129#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct StepResult {
132 pub step_name: String,
134 pub success: bool,
136 pub message: String,
138 pub output: HashMap<String, serde_json::Value>,
140 pub duration_ms: u64,
142 pub attempts: usize,
144}
145
146impl StepResult {
147 pub fn success(step_name: String, message: String, duration_ms: u64) -> Self {
149 Self {
150 step_name,
151 success: true,
152 message,
153 output: HashMap::new(),
154 duration_ms,
155 attempts: 1,
156 }
157 }
158
159 pub fn failure(step_name: String, message: String, duration_ms: u64) -> Self {
161 Self {
162 step_name,
163 success: false,
164 message,
165 output: HashMap::new(),
166 duration_ms,
167 attempts: 1,
168 }
169 }
170
171 pub fn with_output(mut self, key: String, value: serde_json::Value) -> Self {
173 self.output.insert(key, value);
174 self
175 }
176
177 pub fn with_attempts(mut self, attempts: usize) -> Self {
179 self.attempts = attempts;
180 self
181 }
182}
183
184pub struct StepExecutor {
186 retry_manager: RetryManager,
187}
188
189impl StepExecutor {
190 pub fn new() -> Self {
192 Self {
193 retry_manager: RetryManager::new(),
194 }
195 }
196
197 pub async fn execute_step(
199 &self,
200 step: &Step,
201 context: &mut ExecutionContext,
202 ) -> Result<StepResult> {
203 let start_time = Instant::now();
204
205 if let Some(ref for_each_var) = step.for_each {
207 return self.execute_for_each(step, for_each_var, context).await;
208 }
209
210 if let Some(ref retry_strategy) = step.retry {
212 let mut attempts = 0;
213 loop {
214 attempts += 1;
215 match self.execute_step_once(step, context).await {
216 Ok(result) => {
217 let duration = start_time.elapsed().as_millis() as u64;
218 context.complete_step(&step.name, result.clone().with_attempts(attempts));
219 return Ok(result.with_attempts(attempts));
220 }
221 Err(e) if attempts < retry_strategy.max_attempts => {
222 context.increment_retries();
223 let delay = self.retry_manager.calculate_delay(retry_strategy, attempts);
224 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
225 tracing::warn!(
226 "Step '{}' failed (attempt {}), retrying: {}",
227 step.name,
228 attempts,
229 e
230 );
231 continue;
232 }
233 Err(e) => {
234 let duration = start_time.elapsed().as_millis() as u64;
235 let result = StepResult::failure(
236 step.name.clone(),
237 format!("Error: {}", e),
238 duration,
239 )
240 .with_attempts(attempts);
241 context.complete_step(&step.name, result.clone());
242 return Ok(result);
243 }
244 }
245 }
246 } else {
247 let result = self.execute_step_once(step, context).await;
248 let duration = start_time.elapsed().as_millis() as u64;
249
250 match result {
251 Ok(mut result) => {
252 result.duration_ms = duration;
253 context.complete_step(&step.name, result.clone());
254 Ok(result)
255 }
256 Err(e) => {
257 let result =
258 StepResult::failure(step.name.clone(), format!("Error: {}", e), duration);
259 context.complete_step(&step.name, result.clone());
260 Ok(result)
261 }
262 }
263 }
264 }
265
266 async fn execute_step_once(
268 &self,
269 step: &Step,
270 context: &ExecutionContext,
271 ) -> Result<StepResult> {
272 let start_time = Instant::now();
273
274 let resolved_params =
276 self.resolve_parameters(&step.parameters, &context.get_variables())?;
277
278 let result = match step.step_type {
280 StepType::Synthesize => self.execute_synthesize(step, &resolved_params).await,
281 StepType::Validate => self.execute_validate(step, &resolved_params).await,
282 StepType::FileOp => self.execute_file_op(step, &resolved_params).await,
283 StepType::Command => self.execute_command(step, &resolved_params).await,
284 StepType::Script => self.execute_script(step, &resolved_params).await,
285 StepType::Branch => self.execute_branch(step, &resolved_params).await,
286 StepType::Loop => self.execute_loop(step, &resolved_params).await,
287 StepType::Workflow => self.execute_subworkflow(step, &resolved_params).await,
288 StepType::Wait => self.execute_wait(step, &resolved_params).await,
289 StepType::Notify => self.execute_notify(step, &resolved_params).await,
290 }?;
291
292 let duration = start_time.elapsed().as_millis() as u64;
293
294 Ok(StepResult::success(step.name.clone(), result, duration))
295 }
296
297 async fn execute_for_each(
299 &self,
300 step: &Step,
301 for_each_var: &str,
302 context: &mut ExecutionContext,
303 ) -> Result<StepResult> {
304 let variables = context.get_variables();
305
306 let var_name = for_each_var
308 .strip_prefix("${")
309 .and_then(|s| s.strip_suffix('}'))
310 .unwrap_or(for_each_var);
311
312 let items = variables
313 .get(var_name)
314 .and_then(|v| v.as_array())
315 .ok_or_else(|| {
316 CliError::Workflow(format!(
317 "For-each variable '{}' not found or not an array",
318 var_name
319 ))
320 })?;
321
322 let start_time = Instant::now();
323 let mut all_results = Vec::new();
324
325 for (idx, item) in items.iter().enumerate() {
326 let mut step_clone = step.clone();
328 step_clone.for_each = None;
329 step_clone.name = format!("{}[{}]", step.name, idx);
330
331 context.set_variable(format!("{}_item", var_name), item.clone());
333 context.set_variable(format!("{}_index", var_name), serde_json::json!(idx));
334
335 let result = self.execute_step_once(&step_clone, context).await?;
336 all_results.push(result);
337 }
338
339 let duration = start_time.elapsed().as_millis() as u64;
340 let success = all_results.iter().all(|r| r.success);
341
342 Ok(StepResult {
343 step_name: step.name.clone(),
344 success,
345 message: format!("Executed {} iterations", all_results.len()),
346 output: HashMap::new(),
347 duration_ms: duration,
348 attempts: 1,
349 })
350 }
351
352 fn resolve_parameters(
354 &self,
355 params: &HashMap<String, serde_json::Value>,
356 variables: &HashMap<String, serde_json::Value>,
357 ) -> Result<HashMap<String, serde_json::Value>> {
358 let mut resolved = HashMap::new();
359
360 for (key, value) in params {
361 let resolved_value = self.resolve_value(value, variables);
362 resolved.insert(key.clone(), resolved_value);
363 }
364
365 Ok(resolved)
366 }
367
368 fn resolve_value(
370 &self,
371 value: &serde_json::Value,
372 variables: &HashMap<String, serde_json::Value>,
373 ) -> serde_json::Value {
374 match value {
375 serde_json::Value::String(s) => {
376 if let Some(var_name) = s.strip_prefix("${").and_then(|s| s.strip_suffix('}')) {
377 variables
378 .get(var_name)
379 .cloned()
380 .unwrap_or(serde_json::Value::Null)
381 } else {
382 value.clone()
383 }
384 }
385 serde_json::Value::Array(arr) => serde_json::Value::Array(
386 arr.iter()
387 .map(|v| self.resolve_value(v, variables))
388 .collect(),
389 ),
390 serde_json::Value::Object(obj) => serde_json::Value::Object(
391 obj.iter()
392 .map(|(k, v)| (k.clone(), self.resolve_value(v, variables)))
393 .collect(),
394 ),
395 _ => value.clone(),
396 }
397 }
398
399 async fn execute_synthesize(
402 &self,
403 _step: &Step,
404 _params: &HashMap<String, serde_json::Value>,
405 ) -> Result<String> {
406 Ok("Synthesis completed".to_string())
407 }
408
409 async fn execute_validate(
410 &self,
411 _step: &Step,
412 _params: &HashMap<String, serde_json::Value>,
413 ) -> Result<String> {
414 Ok("Validation passed".to_string())
415 }
416
417 async fn execute_file_op(
418 &self,
419 _step: &Step,
420 _params: &HashMap<String, serde_json::Value>,
421 ) -> Result<String> {
422 Ok("File operation completed".to_string())
423 }
424
425 async fn execute_command(
426 &self,
427 _step: &Step,
428 _params: &HashMap<String, serde_json::Value>,
429 ) -> Result<String> {
430 Ok("Command executed".to_string())
431 }
432
433 async fn execute_script(
434 &self,
435 _step: &Step,
436 _params: &HashMap<String, serde_json::Value>,
437 ) -> Result<String> {
438 Ok("Script executed".to_string())
439 }
440
441 async fn execute_branch(
442 &self,
443 _step: &Step,
444 _params: &HashMap<String, serde_json::Value>,
445 ) -> Result<String> {
446 Ok("Branch evaluated".to_string())
447 }
448
449 async fn execute_loop(
450 &self,
451 _step: &Step,
452 _params: &HashMap<String, serde_json::Value>,
453 ) -> Result<String> {
454 Ok("Loop completed".to_string())
455 }
456
457 async fn execute_subworkflow(
458 &self,
459 _step: &Step,
460 _params: &HashMap<String, serde_json::Value>,
461 ) -> Result<String> {
462 Ok("Sub-workflow completed".to_string())
463 }
464
465 async fn execute_wait(
466 &self,
467 _step: &Step,
468 params: &HashMap<String, serde_json::Value>,
469 ) -> Result<String> {
470 if let Some(duration) = params.get("duration_ms") {
471 if let Some(ms) = duration.as_u64() {
472 tokio::time::sleep(tokio::time::Duration::from_millis(ms)).await;
473 }
474 }
475 Ok("Wait completed".to_string())
476 }
477
478 async fn execute_notify(
479 &self,
480 _step: &Step,
481 _params: &HashMap<String, serde_json::Value>,
482 ) -> Result<String> {
483 Ok("Notification sent".to_string())
484 }
485}
486
487impl Default for StepExecutor {
488 fn default() -> Self {
489 Self::new()
490 }
491}
492
493#[derive(Debug, Clone, Serialize, Deserialize)]
495pub struct ExecutionResult {
496 pub workflow_name: String,
498 pub success: bool,
500 pub message: String,
502 pub stats: WorkflowStats,
504}
505
506impl ExecutionResult {
507 pub fn success(workflow_name: String, message: String, stats: WorkflowStats) -> Self {
509 Self {
510 workflow_name,
511 success: true,
512 message,
513 stats,
514 }
515 }
516
517 pub fn failure(workflow_name: String, message: String, stats: WorkflowStats) -> Self {
519 Self {
520 workflow_name,
521 success: false,
522 message,
523 stats,
524 }
525 }
526}
527
528#[cfg(test)]
529mod tests {
530 use super::*;
531 use crate::workflow::definition::StepType;
532
533 #[test]
534 fn test_execution_context_creation() {
535 let workflow = Workflow::new("test", "1.0", "Test workflow");
536 let context = ExecutionContext::new(workflow);
537
538 assert_eq!(context.completed_steps().len(), 0);
539 assert_eq!(context.skipped_steps().len(), 0);
540 assert_eq!(context.total_retries(), 0);
541 }
542
543 #[test]
544 fn test_execution_context_variables() {
545 let mut workflow = Workflow::new("test", "1.0", "Test workflow");
546 workflow.add_variable(
547 "test_var".to_string(),
548 super::super::definition::Variable::String("test_value".to_string()),
549 );
550
551 let context = ExecutionContext::new(workflow);
552 let variables = context.get_variables();
553
554 assert_eq!(variables.len(), 1);
555 assert_eq!(
556 variables.get("test_var").unwrap().as_str().unwrap(),
557 "test_value"
558 );
559 }
560
561 #[test]
562 fn test_step_result_creation() {
563 let result = StepResult::success("step1".to_string(), "Success".to_string(), 100);
564
565 assert!(result.success);
566 assert_eq!(result.step_name, "step1");
567 assert_eq!(result.duration_ms, 100);
568 }
569
570 #[test]
571 fn test_step_result_with_output() {
572 let result = StepResult::success("step1".to_string(), "Success".to_string(), 100)
573 .with_output("key1".to_string(), serde_json::json!("value1"));
574
575 assert_eq!(result.output.len(), 1);
576 assert_eq!(
577 result.output.get("key1").unwrap().as_str().unwrap(),
578 "value1"
579 );
580 }
581
582 #[tokio::test]
583 async fn test_step_executor_creation() {
584 let executor = StepExecutor::new();
585 assert!(true);
587 }
588
589 #[test]
590 fn test_execution_result_success() {
591 let stats = WorkflowStats::new();
592 let result = ExecutionResult::success("test".to_string(), "Done".to_string(), stats);
593
594 assert!(result.success);
595 assert_eq!(result.workflow_name, "test");
596 }
597
598 #[test]
599 fn test_execution_result_failure() {
600 let stats = WorkflowStats::new();
601 let result = ExecutionResult::failure("test".to_string(), "Failed".to_string(), stats);
602
603 assert!(!result.success);
604 assert_eq!(result.message, "Failed");
605 }
606}