data_transform/repl/
mod.rs

1use colored::*;
2use polars::prelude::*;
3use rustyline::error::ReadlineError;
4use rustyline::DefaultEditor;
5use std::collections::HashMap;
6
7use crate::error::Result;
8use crate::executor::Executor;
9use crate::parser::{parse, ast::Statement};
10
11pub struct Repl {
12    editor: DefaultEditor,
13    executor: Executor,
14
15    // Current state
16    current: Option<DataFrame>,
17
18    // History for undo/redo
19    history: Vec<DataFrame>,
20    history_position: usize,
21    max_history: usize,
22
23    // Operation history (for .history command)
24    operation_log: Vec<String>,
25
26    // Variable snapshots: stores complete variable state at each history point
27    variable_snapshots: Vec<std::collections::HashMap<String, DataFrame>>,
28}
29
30impl Repl {
31    pub fn new() -> Result<Self> {
32        Ok(Self {
33            editor: DefaultEditor::new()
34                .map_err(|e| crate::error::DtransformError::ReadlineError(e.to_string()))?,
35            executor: Executor::new(),
36            current: None,
37            history: Vec::new(),
38            history_position: 0,
39            max_history: 10,
40            operation_log: Vec::new(),
41            variable_snapshots: Vec::new(),
42        })
43    }
44
45    pub fn run(&mut self) -> Result<()> {
46        println!("{}", "Data Transform REPL v0.1.2".bright_blue().bold());
47        println!("Type .help for help, .exit to quit");
48        println!(
49            "Use .undo/.redo to step through operations\n"
50        );
51
52        let mut accumulated_input = String::new();
53
54        loop {
55            let prompt = if accumulated_input.is_empty() {
56                ">> "
57            } else {
58                ".. "
59            };
60
61            let readline = self.editor.readline(prompt);
62            match readline {
63                Ok(line) => {
64                    // Check if line continues (ends with pipe)
65                    let trimmed = line.trim();
66
67                    if trimmed.is_empty() && accumulated_input.is_empty() {
68                        continue;
69                    }
70
71                    // Append to accumulated input
72                    if !accumulated_input.is_empty() {
73                        accumulated_input.push('\n');
74                    }
75                    accumulated_input.push_str(&line);
76
77                    // Check if we should continue reading (line ends with |)
78                    if trimmed.ends_with('|') {
79                        continue;
80                    }
81
82                    // We have a complete statement, process it
83                    let _ = self.editor.add_history_entry(accumulated_input.as_str());
84
85                    // Normalize multi-line input: replace newlines with spaces
86                    let normalized = accumulated_input.replace('\n', " ");
87
88                    if let Err(e) = self.handle_input(&normalized) {
89                        eprintln!("{}: {}", "Error".red().bold(), e.display_friendly());
90                    }
91
92                    // Reset for next statement
93                    accumulated_input.clear();
94                }
95                Err(ReadlineError::Interrupted) => {
96                    println!("^C");
97                    accumulated_input.clear();
98                    continue;
99                }
100                Err(ReadlineError::Eof) => {
101                    println!("Goodbye!");
102                    break;
103                }
104                Err(err) => {
105                    eprintln!("Error: {:?}", err);
106                    break;
107                }
108            }
109        }
110        Ok(())
111    }
112
113    fn handle_input(&mut self, input: &str) -> Result<()> {
114        // Handle special commands
115        if input.starts_with('.') {
116            return self.handle_command(input);
117        }
118
119        // Parse statement (could be assignment or pipeline)
120        let statement = parse(input)?;
121        let operation_desc = self.describe_statement(&statement);
122
123        match statement {
124            Statement::Assignment { name, pipeline } => {
125                // Execute pipeline
126                let result = self.executor.execute_pipeline(pipeline)?;
127
128                // Store in executor's variable map
129                self.executor.set_variable(name.clone(), result.clone());
130
131                // Also set as current for _
132                self.current = Some(result.clone());
133                self.save_to_history(Some(name.clone()));
134
135                self.operation_log.push(format!("{} = ...", name));
136
137                println!(
138                    "{}: {} ({} rows × {} cols)",
139                    "Stored".green(),
140                    name,
141                    result.height(),
142                    result.width()
143                );
144                self.preview_result(&result);
145            }
146            Statement::Pipeline(pipeline) => {
147                // If pipeline has no source, use current table
148                let has_source = pipeline.source.is_some();
149                let pipeline_to_execute = if !has_source {
150                    // Use current table as source
151                    if let Some(ref current_df) = self.current {
152                        // Create a temporary variable for the current table
153                        self.executor.set_variable("_".to_string(), current_df.clone());
154
155                        let mut modified_pipeline = pipeline;
156                        modified_pipeline.source = Some(crate::parser::ast::Source::Variable("_".to_string()));
157                        modified_pipeline
158                    } else {
159                        pipeline
160                    }
161                } else {
162                    pipeline
163                };
164
165                // Execute pipeline
166                let result = self.executor.execute_pipeline(pipeline_to_execute)?;
167
168                // Save to history for undo
169                self.current = Some(result.clone());
170                self.save_to_history(None);
171
172                self.operation_log.push(operation_desc);
173
174                // Preview
175                self.preview_result(&result);
176            }
177        }
178
179        Ok(())
180    }
181
182    fn describe_statement(&self, statement: &Statement) -> String {
183        match statement {
184            Statement::Assignment { name, .. } => format!("{} = ...", name),
185            Statement::Pipeline(pipeline) => {
186                if pipeline.operations.is_empty() {
187                    "read(...)".to_string()
188                } else {
189                    format!("{} operation(s)", pipeline.operations.len())
190                }
191            }
192        }
193    }
194
195    fn save_to_history(&mut self, _variable_name: Option<String>) {
196        if let Some(ref current) = self.current {
197            // Truncate future if we're in the middle of history
198            self.history.truncate(self.history_position);
199            self.variable_snapshots.truncate(self.history_position);
200
201            // Save current dataframe state
202            self.history.push(current.clone());
203
204            // Save complete variable snapshot
205            let snapshot = self.executor.get_all_variables();
206            self.variable_snapshots.push(snapshot);
207
208            // Limit history size
209            if self.history.len() > self.max_history {
210                self.history.remove(0);
211                self.variable_snapshots.remove(0);
212            } else {
213                self.history_position += 1;
214            }
215        }
216    }
217
218    fn handle_command(&mut self, cmd: &str) -> Result<()> {
219        let parts: Vec<&str> = cmd.split_whitespace().collect();
220
221        match parts[0] {
222            ".help" => self.show_help(),
223            ".exit" | ".quit" => std::process::exit(0),
224            ".schema" => self.show_schema()?,
225            ".undo" => {
226                let n = parts.get(1).and_then(|s| s.parse().ok()).unwrap_or(1);
227                self.undo(n)?;
228            }
229            ".redo" => {
230                let n = parts.get(1).and_then(|s| s.parse().ok()).unwrap_or(1);
231                self.redo(n)?;
232            }
233            ".history" => self.show_history(),
234            ".vars" | ".variables" => self.show_variables(),
235            ".clear" => self.clear(),
236            _ => println!("Unknown command: {}. Type .help for help.", parts[0]),
237        }
238        Ok(())
239    }
240
241    fn undo(&mut self, n: usize) -> Result<()> {
242        if self.history_position == 0 {
243            return Err(crate::error::DtransformError::InvalidOperation(
244                "No more history to undo".to_string(),
245            ));
246        }
247
248        let steps = n.min(self.history_position);
249        let new_position = self.history_position - steps;
250
251        self.history_position = new_position;
252
253        // Restore dataframe state
254        self.current = if self.history_position == 0 {
255            None
256        } else {
257            Some(self.history[self.history_position - 1].clone())
258        };
259
260        // Restore variable snapshot
261        if self.history_position > 0 {
262            let snapshot = self.variable_snapshots[self.history_position - 1].clone();
263            self.executor.restore_variables(snapshot);
264        } else {
265            // At position 0, clear all variables
266            self.executor.restore_variables(HashMap::new());
267        }
268
269        println!("{} {} step(s)", "Undid".yellow(), steps);
270
271        if let Some(ref df) = self.current {
272            self.preview_result(df);
273        }
274
275        Ok(())
276    }
277
278    fn redo(&mut self, n: usize) -> Result<()> {
279        if self.history_position >= self.history.len() {
280            return Err(crate::error::DtransformError::InvalidOperation(
281                "No more history to redo".to_string(),
282            ));
283        }
284
285        let steps = n.min(self.history.len() - self.history_position);
286        self.history_position += steps;
287
288        // Restore dataframe state
289        self.current = Some(self.history[self.history_position - 1].clone());
290
291        // Restore variable snapshot
292        let snapshot = self.variable_snapshots[self.history_position - 1].clone();
293        self.executor.restore_variables(snapshot);
294
295        println!("{} {} step(s)", "Redid".yellow(), steps);
296
297        if let Some(ref df) = self.current {
298            self.preview_result(df);
299        }
300
301        Ok(())
302    }
303
304    fn show_history(&self) {
305        println!("{}", "Operation History:".bright_blue());
306        for (i, op) in self.operation_log.iter().enumerate() {
307            let marker = if i == self.history_position - 1 {
308                " ← current"
309            } else {
310                ""
311            };
312            println!("  {}. {}{}", i + 1, op, marker.green());
313        }
314
315        if self.operation_log.is_empty() {
316            println!("  (no operations yet)");
317        }
318    }
319
320    fn show_variables(&self) {
321        println!("{}", "Stored Variables:".bright_blue());
322        let vars = self.executor.list_variables();
323
324        if vars.is_empty() {
325            println!("  (no variables stored)");
326        } else {
327            for name in vars {
328                if let Some(df) = self.executor.get_variable(&name) {
329                    println!(
330                        "  {} → {} rows × {} cols",
331                        name,
332                        df.height(),
333                        df.width()
334                    );
335                }
336            }
337        }
338    }
339
340    fn clear(&mut self) {
341        self.current = None;
342        self.history.clear();
343        self.history_position = 0;
344        self.operation_log.clear();
345        self.variable_snapshots.clear();
346        println!("{}", "Cleared current table and history".yellow());
347    }
348
349    fn show_help(&self) {
350        println!("{}", "Available commands:".bright_blue());
351        println!("  .help          - Show this help");
352        println!("  .exit          - Exit REPL");
353        println!("  .schema        - Show current table schema");
354        println!("  .undo [n]      - Undo last n operations (default: 1)");
355        println!("  .redo [n]      - Redo last n operations (default: 1)");
356        println!("  .history       - Show operation history");
357        println!("  .vars          - Show stored variables");
358        println!("  .clear         - Clear current table and history");
359        println!("\n{}", "Multi-line statements:".bright_blue());
360        println!("  Lines ending with | continue to the next line");
361        println!("  The prompt changes to .. for continuation");
362        println!("  Example:");
363        println!("    >> data = read('data.csv') |");
364        println!("    .. filter(price > 100) |");
365        println!("    .. select(product, quantity)");
366        println!("\n{}", "Example usage:".bright_blue());
367        println!("  data = read('data.csv')");
368        println!("  data | select($1, $2) | filter(age > 25)");
369        println!("  .undo 2");
370        println!("\n{}", "Quick reference:".bright_blue());
371        println!("  Pipe operations:        read('file.csv') | select($1,$2) | filter(age > 25)");
372        println!("  Rename columns:         rename(old_name -> new_name)");
373        println!("  Bulk rename:            rename_all(lowercase)");
374        println!("  Smart selection:        select(re('^Sales_'))  # regex");
375        println!("                          select(types(Number))  # by type");
376        println!("  String operations:      mutate(email = email.lower())");
377    }
378
379    fn show_schema(&self) -> Result<()> {
380        if let Some(ref df) = self.current {
381            println!("{}", "Schema:".bright_blue());
382            let schema = df.schema();
383
384            for (i, (name, field)) in schema.iter().enumerate() {
385                println!("  {}. {} ({})", i + 1, name, field);
386            }
387
388            println!(
389                "\n{} rows × {} columns",
390                df.height(),
391                df.width()
392            );
393        } else {
394            println!("No table loaded. Use read() to load data or a variable name.");
395        }
396        Ok(())
397    }
398
399    fn preview_result(&self, df: &DataFrame) {
400        let rows = df.height();
401        let cols = df.width();
402
403        println!(
404            "\n{}",
405            format!("[Table: {} rows × {} cols]", rows, cols).bright_green()
406        );
407
408        // Show first few rows
409        let preview = df.head(Some(5));
410        println!("{}", preview);
411
412        if rows > 5 {
413            println!("... {} more rows", rows - 5);
414        }
415        println!();
416    }
417}