1use crate::handler_registry::HandlerRegistry;
6use crate::operations::DataOperations;
7use crate::traits::DataWriteOptions;
8use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10use std::fs;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct WorkflowStep {
15 pub operation: String,
16 pub input: Option<String>,
17 pub output: Option<String>,
18 pub args: Option<serde_json::Value>,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct WorkflowConfig {
24 pub name: String,
25 pub description: Option<String>,
26 pub steps: Vec<WorkflowStep>,
27}
28
29pub struct WorkflowExecutor {
31 registry: HandlerRegistry,
32}
33
34impl WorkflowExecutor {
35 pub fn new() -> Self {
36 Self {
37 registry: HandlerRegistry::new(),
38 }
39 }
40
41 pub fn execute(&self, config_path: &str) -> Result<()> {
43 let config_str = fs::read_to_string(config_path)
44 .with_context(|| format!("Failed to read workflow config: {}", config_path))?;
45
46 let config: WorkflowConfig = toml::from_str(&config_str)
47 .or_else(|_| serde_json::from_str(&config_str))
48 .with_context(|| "Failed to parse workflow config. Expected TOML or JSON")?;
49
50 self.execute_config(&config)
51 }
52
53 pub fn execute_config(&self, config: &WorkflowConfig) -> Result<()> {
55 println!("Executing workflow: {}", config.name);
56
57 let mut current_data: Option<Vec<Vec<String>>> = None;
58
59 for (step_idx, step) in config.steps.iter().enumerate() {
60 println!("Step {}: {}", step_idx + 1, step.operation);
61
62 let input_data = if let Some(ref input) = step.input {
64 self.registry.read(input)?
65 } else if let Some(ref data) = current_data {
66 data.clone()
67 } else {
68 anyhow::bail!("No input data available for step {}", step_idx + 1);
69 };
70
71 let output_data =
73 self.execute_step(&step.operation, &input_data, step.args.as_ref())?;
74
75 if let Some(ref output) = step.output {
77 let mut options = DataWriteOptions::default();
78 let out = output.to_lowercase();
79 if out.ends_with(".parquet") || out.ends_with(".avro") {
80 options.include_headers = true;
81 }
82 self.registry.write(output, &output_data, options)?;
83 println!(" Output saved to: {}", output);
84 }
85
86 current_data = Some(output_data);
87 }
88
89 Ok(())
90 }
91
92 fn execute_step(
93 &self,
94 operation: &str,
95 data: &[Vec<String>],
96 args: Option<&serde_json::Value>,
97 ) -> Result<Vec<Vec<String>>> {
98 let mut result = data.to_vec();
99 let ops = DataOperations::new();
100
101 match operation {
102 "read" => Ok(data.to_vec()),
103
104 "filter" => {
105 if let Some(args) = args {
106 if let Some(column_idx) = args.get("column").and_then(|v| v.as_u64()) {
107 if let Some(where_clause) = args.get("where").and_then(|v| v.as_str()) {
108 result = ops.filter_rows(&result, column_idx as usize, where_clause, "")?;
109 }
110 }
111 }
112 Ok(result)
113 }
114
115 "sort" => {
116 if let Some(args) = args {
117 if let Some(column_idx) = args.get("column").and_then(|v| v.as_u64()) {
118 let ascending = args.get("ascending")
119 .and_then(|v| v.as_bool())
120 .unwrap_or(true);
121
122 use crate::operations::types::SortOrder;
123 let order = if ascending { SortOrder::Ascending } else { SortOrder::Descending };
124 ops.sort_by_column(&mut result, column_idx as usize, order)?;
125 }
126 }
127 Ok(result)
128 }
129
130 "transform" => {
131 if let Some(args) = args {
132 if let Some(op_type) = args.get("operation").and_then(|v| v.as_str()) {
133 match op_type {
134 "replace" => {
135 if let Some(find) = args.get("find").and_then(|v| v.as_str()) {
136 if let Some(replace) = args.get("replace").and_then(|v| v.as_str()) {
137 if let Some(column_idx) = args.get("column").and_then(|v| v.as_u64()) {
138 let _count = ops.replace(&mut result, column_idx as usize, find, replace);
139 println!(" Replaced '{}' with '{}' in column {}", find, replace, column_idx);
140 }
141 }
142 }
143 }
144 "dedupe" => {
145 let count = ops.deduplicate_mut(&mut result);
146 println!(" Removed {} duplicate rows", count);
147 }
148 "transpose" => {
149 result = ops.transpose(&result);
150 }
151 "fillna" => {
152 if let Some(value) = args.get("value").and_then(|v| v.as_str()) {
153 ops.fillna(&mut result, value);
154 }
155 }
156 "dropna" => {
157 result = ops.dropna(&result);
158 }
159 _ => anyhow::bail!("Unknown transform operation: {}", op_type),
160 }
161 }
162 }
163 Ok(result)
164 }
165
166 "mutate" => {
167 if let Some(args) = args {
168 if let Some(_column) = args.get("column").and_then(|v| v.as_str()) {
169 if let Some(_formula) = args.get("formula").and_then(|v| v.as_str()) {
170 for row in &mut result {
173 row.push("MUTATED".to_string());
174 }
175 }
176 }
177 }
178 Ok(result)
179 }
180
181 "select" => {
182 if let Some(args) = args {
183 if let Some(columns) = args.get("columns").and_then(|v| v.as_array()) {
184 let column_names: Vec<&str> = columns
185 .iter()
186 .filter_map(|v| v.as_str())
187 .collect();
188
189 result = ops.select_columns_by_name(&result, &column_names)?;
190 }
191 }
192 Ok(result)
193 }
194
195 "describe" => {
196 let desc = ops.describe(&result)?;
197 println!(" Statistics: {:?}", desc);
198 Ok(desc)
199 }
200
201 _ => anyhow::bail!("Unknown operation: {}", operation),
202 }
203 }
204}