// xls_rs/workflow.rs

//! Workflow orchestration
//!
//! Provides pipeline execution for chaining multiple data operations, driven by a
//! workflow configuration written in TOML or JSON.

use crate::handler_registry::HandlerRegistry;
use crate::operations::DataOperations;
use crate::traits::DataWriteOptions;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::Path;

12/// Workflow step
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct WorkflowStep {
15    pub operation: String,
16    pub input: Option<String>,
17    pub output: Option<String>,
18    pub args: Option<serde_json::Value>,
19}
20
21/// Workflow configuration
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct WorkflowConfig {
24    pub name: String,
25    pub description: Option<String>,
26    pub steps: Vec<WorkflowStep>,
27}
28
29/// Workflow executor
30pub struct WorkflowExecutor {
31    registry: HandlerRegistry,
32}
33
34impl WorkflowExecutor {
35    pub fn new() -> Self {
36        Self {
37            registry: HandlerRegistry::new(),
38        }
39    }
40
41    /// Execute workflow from config file
42    pub fn execute(&self, config_path: &str) -> Result<()> {
43        let config_str = fs::read_to_string(config_path)
44            .with_context(|| format!("Failed to read workflow config: {}", config_path))?;
45
46        let config: WorkflowConfig = toml::from_str(&config_str)
47            .or_else(|_| serde_json::from_str(&config_str))
48            .with_context(|| "Failed to parse workflow config. Expected TOML or JSON")?;
49
50        self.execute_config(&config)
51    }
52
53    /// Execute workflow from an in-memory configuration (same semantics as [`Self::execute`]).
54    pub fn execute_config(&self, config: &WorkflowConfig) -> Result<()> {
55        println!("Executing workflow: {}", config.name);
56
57        let mut current_data: Option<Vec<Vec<String>>> = None;
58
59        for (step_idx, step) in config.steps.iter().enumerate() {
60            println!("Step {}: {}", step_idx + 1, step.operation);
61
62            // Get input data
63            let input_data = if let Some(ref input) = step.input {
64                self.registry.read(input)?
65            } else if let Some(ref data) = current_data {
66                data.clone()
67            } else {
68                anyhow::bail!("No input data available for step {}", step_idx + 1);
69            };
70
71            // Execute operation
72            let output_data =
73                self.execute_step(&step.operation, &input_data, step.args.as_ref())?;
74
75            // Save output if specified
76            if let Some(ref output) = step.output {
77                let mut options = DataWriteOptions::default();
78                let out = output.to_lowercase();
79                if out.ends_with(".parquet") || out.ends_with(".avro") {
80                    options.include_headers = true;
81                }
82                self.registry.write(output, &output_data, options)?;
83                println!("  Output saved to: {}", output);
84            }
85
86            current_data = Some(output_data);
87        }
88
89        Ok(())
90    }
91
92    fn execute_step(
93        &self,
94        operation: &str,
95        data: &[Vec<String>],
96        args: Option<&serde_json::Value>,
97    ) -> Result<Vec<Vec<String>>> {
98        let mut result = data.to_vec();
99        let ops = DataOperations::new();
100
101        match operation {
102            "read" => Ok(data.to_vec()),
103
104            "filter" => {
105                if let Some(args) = args {
106                    if let Some(column_idx) = args.get("column").and_then(|v| v.as_u64()) {
107                        if let Some(where_clause) = args.get("where").and_then(|v| v.as_str()) {
108                            result = ops.filter_rows(&result, column_idx as usize, where_clause, "")?;
109                        }
110                    }
111                }
112                Ok(result)
113            }
114
115            "sort" => {
116                if let Some(args) = args {
117                    if let Some(column_idx) = args.get("column").and_then(|v| v.as_u64()) {
118                        let ascending = args.get("ascending")
119                            .and_then(|v| v.as_bool())
120                            .unwrap_or(true);
121
122                        use crate::operations::types::SortOrder;
123                        let order = if ascending { SortOrder::Ascending } else { SortOrder::Descending };
124                        ops.sort_by_column(&mut result, column_idx as usize, order)?;
125                    }
126                }
127                Ok(result)
128            }
129
130            "transform" => {
131                if let Some(args) = args {
132                    if let Some(op_type) = args.get("operation").and_then(|v| v.as_str()) {
133                        match op_type {
134                            "replace" => {
135                                if let Some(find) = args.get("find").and_then(|v| v.as_str()) {
136                                    if let Some(replace) = args.get("replace").and_then(|v| v.as_str()) {
137                                        if let Some(column_idx) = args.get("column").and_then(|v| v.as_u64()) {
138                                            let _count = ops.replace(&mut result, column_idx as usize, find, replace);
139                                            println!("  Replaced '{}' with '{}' in column {}", find, replace, column_idx);
140                                        }
141                                    }
142                                }
143                            }
144                            "dedupe" => {
145                                let count = ops.deduplicate_mut(&mut result);
146                                println!("  Removed {} duplicate rows", count);
147                            }
148                            "transpose" => {
149                                result = ops.transpose(&result);
150                            }
151                            "fillna" => {
152                                if let Some(value) = args.get("value").and_then(|v| v.as_str()) {
153                                    ops.fillna(&mut result, value);
154                                }
155                            }
156                            "dropna" => {
157                                result = ops.dropna(&result);
158                            }
159                            _ => anyhow::bail!("Unknown transform operation: {}", op_type),
160                        }
161                    }
162                }
163                Ok(result)
164            }
165
166            "mutate" => {
167                if let Some(args) = args {
168                    if let Some(_column) = args.get("column").and_then(|v| v.as_str()) {
169                        if let Some(_formula) = args.get("formula").and_then(|v| v.as_str()) {
170                            // For now, just add a placeholder column
171                            // Full formula evaluation with mutate is complex
172                            for row in &mut result {
173                                row.push("MUTATED".to_string());
174                            }
175                        }
176                    }
177                }
178                Ok(result)
179            }
180
181            "select" => {
182                if let Some(args) = args {
183                    if let Some(columns) = args.get("columns").and_then(|v| v.as_array()) {
184                        let column_names: Vec<&str> = columns
185                            .iter()
186                            .filter_map(|v| v.as_str())
187                            .collect();
188
189                        result = ops.select_columns_by_name(&result, &column_names)?;
190                    }
191                }
192                Ok(result)
193            }
194
195            "describe" => {
196                let desc = ops.describe(&result)?;
197                println!("  Statistics: {:?}", desc);
198                Ok(desc)
199            }
200
201            _ => anyhow::bail!("Unknown operation: {}", operation),
202        }
203    }
204}