dsq_cli/
executor.rs

1//! Filter execution engine for dsq CLI
2//!
3//! This module provides the main execution logic for running jq-compatible
4//! filters on data files through the command-line interface.
5
6use std::borrow::Cow;
7use std::fmt::Write;
8use std::io::{self, Read};
9use std::path::Path;
10
11use polars::prelude::SerWriter;
12
13use crate::config::Config;
14use dsq_core::error::{Error, Result};
15use dsq_core::filter::{FilterCompiler, FilterExecutor as CoreFilterExecutor};
16use dsq_core::io::{read_file, write_file};
17use dsq_core::Value;
18
19/// Main executor for dsq operations
20pub struct Executor {
21    config: Config,
22    pub filter_executor: CoreFilterExecutor,
23}
24
25impl Executor {
26    /// Create a new executor with the given configuration
27    pub fn new(config: Config) -> Self {
28        let executor_config = config.to_executor_config();
29        let filter_executor = CoreFilterExecutor::with_config(executor_config);
30        Self {
31            config,
32            filter_executor,
33        }
34    }
35
36    /// Execute a filter on input data
37    pub async fn execute_filter(
38        &mut self,
39        filter: &str,
40        input_path: Option<&Path>,
41        output_path: Option<&Path>,
42    ) -> Result<()> {
43        // Read input data
44        let input_value = if let Some(path) = input_path {
45            self.read_input(path).await?
46        } else {
47            // Read from stdin
48            self.read_from_stdin().await?
49        };
50
51        self.execute_filter_on_value(filter, input_value, output_path)
52            .await
53    }
54
55    /// Execute a filter on a value directly
56    pub async fn execute_filter_on_value(
57        &mut self,
58        filter: &str,
59        input_value: Value,
60        output_path: Option<&Path>,
61    ) -> Result<()> {
62        // Execute the filter
63        #[cfg(feature = "profiling")]
64        coz::progress!("filter_execution");
65
66        let result = self.filter_executor.execute_str(filter, input_value)?;
67        let mut result_value = result.value;
68
69        #[cfg(feature = "profiling")]
70        coz::progress!("filter_complete");
71
72        // Apply limit if specified
73        if let Some(limit) = self.config.io.limit {
74            result_value = self.apply_limit(result_value, limit)?;
75        }
76
77        // Write output
78        if let Some(path) = output_path {
79            self.write_output(&result_value, path).await?;
80        } else {
81            // Write to stdout
82            self.write_to_stdout(&result_value)?;
83        }
84
85        #[cfg(feature = "profiling")]
86        coz::progress!("output_complete");
87
88        // Handle exit status
89        if self.config.display.exit_status {
90            let exit_code = match &result_value {
91                Value::Null => 1,
92                Value::Bool(false) => 1,
93                Value::Array(arr) if arr.is_empty() => 1,
94                Value::String(s) if s.is_empty() => 1,
95                _ => 0,
96            };
97            std::process::exit(exit_code);
98        }
99
100        // Print execution stats if requested
101        if self.config.debug.verbosity > 0 {
102            eprintln!(
103                "Execution time: {} ms",
104                result
105                    .stats
106                    .as_ref()
107                    .map(|s| s.execution_time.as_millis() as u64)
108                    .unwrap_or(0)
109            );
110            eprintln!(
111                "Operations: {}",
112                result
113                    .stats
114                    .as_ref()
115                    .map(|s| s.operations_executed)
116                    .unwrap_or(0)
117            );
118        }
119
120        Ok(())
121    }
122
123    /// Apply limit to a value
124    fn apply_limit(&self, value: Value, limit: usize) -> Result<Value> {
125        match value {
126            Value::Array(arr) => {
127                let limited = arr.into_iter().take(limit).collect();
128                Ok(Value::Array(limited))
129            }
130            Value::DataFrame(df) => Ok(Value::DataFrame(df.head(Some(limit)))),
131            // For other types, return as-is (limit doesn't apply)
132            other => Ok(other),
133        }
134    }
135
136    /// Validate that a filter is syntactically correct
137    pub fn validate_filter(&self, filter: &str) -> Result<()> {
138        // For now, just try to compile it
139        let compiler = FilterCompiler::new();
140        let _compiled = compiler.compile_str(filter)?;
141        Ok(())
142    }
143
144    /// Explain what a filter does
145    pub fn explain_filter(&self, filter: &str) -> Result<String> {
146        Ok(dsq_core::filter::explain_filter(filter)?)
147    }
148
149    /// Read input from a file path
150    pub async fn read_input(&self, path: &Path) -> Result<Value> {
151        let read_options = self.config.to_read_options();
152        let result = read_file(path, &read_options).await?;
153
154        #[cfg(feature = "profiling")]
155        coz::progress!("input_read");
156
157        Ok(result)
158    }
159
160    /// Read input from stdin
161    async fn read_from_stdin(&self) -> Result<Value> {
162        use std::io::BufRead;
163        let stdin = io::stdin();
164        let mut reader = io::BufReader::new(stdin);
165
166        // Read first line to detect format
167        let mut first_line = String::new();
168        reader.read_line(&mut first_line)?;
169
170        // Try to detect format from first non-whitespace character
171        let trimmed = first_line.trim();
172        if trimmed.starts_with('{') || trimmed.starts_with('[') {
173            // Assume JSON - read entire input
174            let mut buffer = first_line;
175            reader.read_to_string(&mut buffer)?;
176            let json_value: serde_json::Value = serde_json::from_str(&buffer)
177                .map_err(|e| Error::operation(Cow::Owned(format!("Invalid JSON: {}", e))))?;
178            Ok(Value::from_json(json_value))
179        } else {
180            // Assume CSV - write to temp file and read
181            use std::io::Write;
182            let mut temp_file = tempfile::NamedTempFile::new()?;
183            temp_file.write_all(first_line.as_bytes())?;
184            io::copy(&mut reader, &mut temp_file)?;
185            let temp_path = temp_file.path().to_path_buf();
186            let read_options = self.config.to_read_options();
187            read_file(&temp_path, &read_options).await
188        }
189    }
190
191    /// Write output to a file path
192    async fn write_output(&self, value: &Value, path: &Path) -> Result<()> {
193        let write_options = self.config.to_write_options();
194        write_file(value, path, &write_options).await
195    }
196
197    /// Write output to stdout
198    pub fn write_to_stdout(&self, value: &Value) -> Result<()> {
199        use dsq_core::DataFormat;
200
201        // Handle raw output
202        if self.config.display.raw_output {
203            match value {
204                Value::String(s) => {
205                    println!("{}", s);
206                    return Ok(());
207                }
208                Value::Array(arr) => {
209                    for item in arr {
210                        if let Value::String(s) = item {
211                            println!("{}", s);
212                        } else {
213                            let json = item.to_json()?;
214                            println!("{}", json);
215                        }
216                    }
217                    return Ok(());
218                }
219                _ => {
220                    let json = value.to_json()?;
221                    println!("{}", json);
222                    return Ok(());
223                }
224            }
225        }
226
227        let output_format = self
228            .config
229            .io
230            .default_output_format
231            .unwrap_or(DataFormat::Json);
232
233        match output_format {
234            DataFormat::Json => {
235                // Write as JSON to stdout
236                let json_value = value.to_json()?;
237                let json_str = if self.config.display.compact {
238                    serde_json::to_string(&json_value)
239                } else {
240                    serde_json::to_string_pretty(&json_value)
241                }
242                .map_err(|e| {
243                    Error::operation(Cow::Owned(format!("JSON serialization error: {}", e)))
244                })?;
245                println!("{}", json_str);
246            }
247            DataFormat::JsonCompact => {
248                // Write as compact JSON to stdout
249                let json_value = value.to_json()?;
250                let json_str = serde_json::to_string(&json_value)
251                    .map_err(|e| Error::operation(format!("JSON serialization error: {}", e)))?;
252                println!("{}", json_str);
253            }
254            DataFormat::JsonLines => {
255                // Write as NDJSON to stdout
256                let json_value = value.to_json()?;
257                match json_value {
258                    serde_json::Value::Array(arr) => {
259                        for item in arr {
260                            let json_str = serde_json::to_string(&item).map_err(|e| {
261                                Error::operation(format!("JSON serialization error: {}", e))
262                            })?;
263                            println!("{}", json_str);
264                        }
265                    }
266                    _ => {
267                        // For non-arrays, output as single line
268                        let json_str = serde_json::to_string(&json_value).map_err(|e| {
269                            Error::operation(format!("JSON serialization error: {}", e))
270                        })?;
271                        println!("{}", json_str);
272                    }
273                }
274            }
275            DataFormat::Csv => {
276                match value {
277                    Value::DataFrame(df) => {
278                        // Write as CSV to stdout - avoid clone by using a mutable reference
279                        use polars::prelude::CsvWriter;
280                        use std::io::BufWriter;
281                        let stdout = std::io::stdout();
282                        let mut writer = BufWriter::with_capacity(65536, stdout.lock());
283                        CsvWriter::new(&mut writer)
284                            .include_header(true)
285                            .finish(&mut df.clone())
286                            .map_err(|e| Error::operation(format!("CSV write error: {}", e)))?;
287                    }
288                    Value::LazyFrame(lf) => {
289                        let df = lf.clone().collect()?;
290                        self.write_to_stdout(&Value::DataFrame(df))?;
291                    }
292                    _ => {
293                        // For non-DataFrame values, fall back to JSON
294                        let json_value = value.to_json()?;
295                        let json_str = serde_json::to_string_pretty(&json_value).map_err(|e| {
296                            Error::operation(format!("JSON serialization error: {}", e))
297                        })?;
298                        println!("{}", json_str);
299                    }
300                }
301            }
302            DataFormat::Adt => {
303                match value {
304                    Value::DataFrame(df) => {
305                        // Write as ADT to stdout with buffering for better performance
306                        use std::io::{self, BufWriter, Write};
307
308                        const FIELD_SEPARATOR: u8 = 31;
309                        const RECORD_SEPARATOR: u8 = 30;
310
311                        let stdout_handle = io::stdout();
312                        let mut stdout = BufWriter::with_capacity(65536, stdout_handle.lock());
313
314                        // Write header
315                        let headers: Vec<&str> =
316                            df.get_column_names().iter().map(|s| s.as_str()).collect();
317                        for (i, header) in headers.iter().enumerate() {
318                            if i > 0 {
319                                stdout.write_all(&[FIELD_SEPARATOR])?;
320                            }
321                            stdout.write_all(header.as_bytes())?;
322                        }
323                        stdout.write_all(&[RECORD_SEPARATOR])?;
324
325                        // Write data rows
326                        let height = df.height();
327                        let mut value_buffer = String::new(); // Pre-allocated buffer for value formatting
328                        for row_idx in 0..height {
329                            for (col_idx, column) in df.get_columns().iter().enumerate() {
330                                if col_idx > 0 {
331                                    stdout.write_all(&[FIELD_SEPARATOR])?;
332                                }
333
334                                value_buffer.clear(); // Reuse the buffer
335                                match column.get(row_idx).map_err(|e| {
336                                    Error::operation(Cow::Owned(format!(
337                                        "Failed to get column value: {}",
338                                        e
339                                    )))
340                                })? {
341                                    polars::prelude::AnyValue::String(s) => {
342                                        value_buffer.push_str(s)
343                                    }
344                                    polars::prelude::AnyValue::Int64(i) => {
345                                        write!(value_buffer, "{}", i).unwrap()
346                                    }
347                                    polars::prelude::AnyValue::Float64(f) => {
348                                        write!(value_buffer, "{}", f).unwrap()
349                                    }
350                                    polars::prelude::AnyValue::Boolean(b) => {
351                                        write!(value_buffer, "{}", b).unwrap()
352                                    }
353                                    polars::prelude::AnyValue::Null => {} // buffer remains empty
354                                    other => write!(value_buffer, "{}", other).unwrap(),
355                                };
356
357                                stdout.write_all(value_buffer.as_bytes())?;
358                            }
359                            stdout.write_all(&[RECORD_SEPARATOR])?;
360                        }
361                        stdout.flush()?;
362                    }
363                    Value::LazyFrame(lf) => {
364                        let df = lf.clone().collect()?;
365                        self.write_to_stdout(&Value::DataFrame(df))?;
366                    }
367                    _ => {
368                        // For non-DataFrame values, fall back to JSON
369                        let json_value = value.to_json()?;
370                        let json_str = serde_json::to_string_pretty(&json_value).map_err(|e| {
371                            Error::operation(format!("JSON serialization error: {}", e))
372                        })?;
373                        println!("{}", json_str);
374                    }
375                }
376            }
377            _ => {
378                // For other formats, fall back to string representation
379                println!("{}", value);
380            }
381        }
382
383        Ok(())
384    }
385}
386
#[cfg(test)]
mod tests {
    use super::*;

    /// Small object fixture shared by several tests.
    fn sample_object() -> Value {
        dsq_core::utils::object([
            ("name", dsq_core::Value::string("Alice")),
            ("age", dsq_core::Value::int(30)),
        ])
    }

    #[tokio::test]
    async fn test_execute_filter_on_value_identity() {
        let mut executor = Executor::new(Config::default());

        // Identity filter "." should run without error.
        let result = executor
            .execute_filter_on_value(".", sample_object(), None)
            .await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_validate_filter() {
        let executor = Executor::new(Config::default());

        assert!(executor.validate_filter(".").is_ok());
        assert!(executor.validate_filter(".name").is_ok());
        assert!(executor.validate_filter("invalid syntax +++").is_err());
    }

    #[test]
    fn test_explain_filter() {
        let executor = Executor::new(Config::default());

        let explanation = executor
            .explain_filter(".")
            .expect("explaining '.' should succeed");
        assert!(!explanation.is_empty());
    }

    #[tokio::test]
    async fn test_execute_filter_on_value_field_access() {
        let mut executor = Executor::new(Config::default());

        let result = executor
            .execute_filter_on_value(".name", sample_object(), None)
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_execute_filter_on_value_with_limit() {
        let mut config = Config::default();
        config.io.limit = Some(1);
        let mut executor = Executor::new(config);
        let input_value = Value::Array(vec![
            dsq_core::Value::int(1),
            dsq_core::Value::int(2),
            dsq_core::Value::int(3),
        ]);

        let result = executor
            .execute_filter_on_value(".", input_value, None)
            .await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_apply_limit_truncates_array() {
        let executor = Executor::new(Config::default());
        let input = Value::Array(vec![
            dsq_core::Value::int(1),
            dsq_core::Value::int(2),
            dsq_core::Value::int(3),
        ]);

        match executor.apply_limit(input, 2).unwrap() {
            Value::Array(items) => assert_eq!(items.len(), 2),
            _ => panic!("expected an array after applying a limit"),
        }
    }

    #[test]
    fn test_apply_limit_larger_than_array_keeps_everything() {
        let executor = Executor::new(Config::default());
        let input = Value::Array(vec![dsq_core::Value::int(1), dsq_core::Value::int(2)]);

        match executor.apply_limit(input, 10).unwrap() {
            Value::Array(items) => assert_eq!(items.len(), 2),
            _ => panic!("expected an array after applying a limit"),
        }
    }

    #[test]
    fn test_apply_limit_truncates_dataframe() {
        use polars::prelude::*;

        let executor = Executor::new(Config::default());
        let df = df! {
            "n" => &[1i64, 2, 3],
        }
        .unwrap();

        match executor.apply_limit(Value::DataFrame(df), 2).unwrap() {
            Value::DataFrame(limited) => assert_eq!(limited.height(), 2),
            _ => panic!("expected a DataFrame after applying a limit"),
        }
    }

    #[test]
    fn test_apply_limit_leaves_scalars_untouched() {
        let executor = Executor::new(Config::default());

        let result = executor.apply_limit(Value::Null, 1).unwrap();
        assert!(matches!(result, Value::Null));
    }

    #[tokio::test]
    async fn test_execute_filter_on_value_invalid_filter() {
        let mut executor = Executor::new(Config::default());

        let result = executor
            .execute_filter_on_value("invalid +++", Value::Null, None)
            .await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_read_input() {
        let executor = Executor::new(Config::default());
        let temp_file = tempfile::NamedTempFile::new().unwrap();
        std::fs::write(&temp_file, r#"{"name": "test"}"#).unwrap();

        let result = executor.read_input(temp_file.path()).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_write_output() {
        use polars::prelude::*;

        let config = Config::default();
        let executor = Executor::new(config);
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("test.csv");

        let df = df! {
            "name" => &["Alice", "Bob"],
            "age" => &[30, 25],
        }
        .unwrap();
        let value = Value::DataFrame(df);

        let result = executor.write_output(&value, &path).await;
        assert!(result.is_ok());

        let content = std::fs::read_to_string(&path).unwrap();
        assert!(content.contains("Alice"));
        assert!(content.contains("Bob"));
    }

    #[tokio::test]
    async fn test_execute_filter_with_file() {
        let mut executor = Executor::new(Config::default());
        let temp_file = tempfile::NamedTempFile::new().unwrap();
        std::fs::write(&temp_file, r#"{"name": "Alice"}"#).unwrap();

        let result = executor
            .execute_filter(".name", Some(temp_file.path()), None)
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_execute_filter_on_value_with_stats() {
        let mut config = Config::default();
        config.debug.verbosity = 1;
        let mut executor = Executor::new(config);

        // Stats go to stderr via `eprintln!`, so only success is asserted here.
        let result = executor
            .execute_filter_on_value(".", Value::Null, None)
            .await;
        assert!(result.is_ok());
    }
}