1use std::borrow::Cow;
7use std::fmt::Write;
8use std::io::{self, Read};
9use std::path::Path;
10
11use polars::prelude::SerWriter;
12
13use crate::config::Config;
14use dsq_core::error::{Error, Result};
15use dsq_core::filter::{FilterCompiler, FilterExecutor as CoreFilterExecutor};
16use dsq_core::io::{read_file, write_file};
17use dsq_core::Value;
18
19pub struct Executor {
21 config: Config,
22 pub filter_executor: CoreFilterExecutor,
23}
24
25impl Executor {
26 pub fn new(config: Config) -> Self {
28 let executor_config = config.to_executor_config();
29 let filter_executor = CoreFilterExecutor::with_config(executor_config);
30 Self {
31 config,
32 filter_executor,
33 }
34 }
35
36 pub async fn execute_filter(
38 &mut self,
39 filter: &str,
40 input_path: Option<&Path>,
41 output_path: Option<&Path>,
42 ) -> Result<()> {
43 let input_value = if let Some(path) = input_path {
45 self.read_input(path).await?
46 } else {
47 self.read_from_stdin().await?
49 };
50
51 self.execute_filter_on_value(filter, input_value, output_path)
52 .await
53 }
54
55 pub async fn execute_filter_on_value(
57 &mut self,
58 filter: &str,
59 input_value: Value,
60 output_path: Option<&Path>,
61 ) -> Result<()> {
62 #[cfg(feature = "profiling")]
64 coz::progress!("filter_execution");
65
66 let result = self.filter_executor.execute_str(filter, input_value)?;
67 let mut result_value = result.value;
68
69 #[cfg(feature = "profiling")]
70 coz::progress!("filter_complete");
71
72 if let Some(limit) = self.config.io.limit {
74 result_value = self.apply_limit(result_value, limit)?;
75 }
76
77 if let Some(path) = output_path {
79 self.write_output(&result_value, path).await?;
80 } else {
81 self.write_to_stdout(&result_value)?;
83 }
84
85 #[cfg(feature = "profiling")]
86 coz::progress!("output_complete");
87
88 if self.config.display.exit_status {
90 let exit_code = match &result_value {
91 Value::Null => 1,
92 Value::Bool(false) => 1,
93 Value::Array(arr) if arr.is_empty() => 1,
94 Value::String(s) if s.is_empty() => 1,
95 _ => 0,
96 };
97 std::process::exit(exit_code);
98 }
99
100 if self.config.debug.verbosity > 0 {
102 eprintln!(
103 "Execution time: {} ms",
104 result
105 .stats
106 .as_ref()
107 .map(|s| s.execution_time.as_millis() as u64)
108 .unwrap_or(0)
109 );
110 eprintln!(
111 "Operations: {}",
112 result
113 .stats
114 .as_ref()
115 .map(|s| s.operations_executed)
116 .unwrap_or(0)
117 );
118 }
119
120 Ok(())
121 }
122
123 fn apply_limit(&self, value: Value, limit: usize) -> Result<Value> {
125 match value {
126 Value::Array(arr) => {
127 let limited = arr.into_iter().take(limit).collect();
128 Ok(Value::Array(limited))
129 }
130 Value::DataFrame(df) => Ok(Value::DataFrame(df.head(Some(limit)))),
131 other => Ok(other),
133 }
134 }
135
136 pub fn validate_filter(&self, filter: &str) -> Result<()> {
138 let compiler = FilterCompiler::new();
140 let _compiled = compiler.compile_str(filter)?;
141 Ok(())
142 }
143
144 pub fn explain_filter(&self, filter: &str) -> Result<String> {
146 Ok(dsq_core::filter::explain_filter(filter)?)
147 }
148
149 pub async fn read_input(&self, path: &Path) -> Result<Value> {
151 let read_options = self.config.to_read_options();
152 let result = read_file(path, &read_options).await?;
153
154 #[cfg(feature = "profiling")]
155 coz::progress!("input_read");
156
157 Ok(result)
158 }
159
160 async fn read_from_stdin(&self) -> Result<Value> {
162 use std::io::BufRead;
163 let stdin = io::stdin();
164 let mut reader = io::BufReader::new(stdin);
165
166 let mut first_line = String::new();
168 reader.read_line(&mut first_line)?;
169
170 let trimmed = first_line.trim();
172 if trimmed.starts_with('{') || trimmed.starts_with('[') {
173 let mut buffer = first_line;
175 reader.read_to_string(&mut buffer)?;
176 let json_value: serde_json::Value = serde_json::from_str(&buffer)
177 .map_err(|e| Error::operation(Cow::Owned(format!("Invalid JSON: {}", e))))?;
178 Ok(Value::from_json(json_value))
179 } else {
180 use std::io::Write;
182 let mut temp_file = tempfile::NamedTempFile::new()?;
183 temp_file.write_all(first_line.as_bytes())?;
184 io::copy(&mut reader, &mut temp_file)?;
185 let temp_path = temp_file.path().to_path_buf();
186 let read_options = self.config.to_read_options();
187 read_file(&temp_path, &read_options).await
188 }
189 }
190
191 async fn write_output(&self, value: &Value, path: &Path) -> Result<()> {
193 let write_options = self.config.to_write_options();
194 write_file(value, path, &write_options).await
195 }
196
197 pub fn write_to_stdout(&self, value: &Value) -> Result<()> {
199 use dsq_core::DataFormat;
200
201 if self.config.display.raw_output {
203 match value {
204 Value::String(s) => {
205 println!("{}", s);
206 return Ok(());
207 }
208 Value::Array(arr) => {
209 for item in arr {
210 if let Value::String(s) = item {
211 println!("{}", s);
212 } else {
213 let json = item.to_json()?;
214 println!("{}", json);
215 }
216 }
217 return Ok(());
218 }
219 _ => {
220 let json = value.to_json()?;
221 println!("{}", json);
222 return Ok(());
223 }
224 }
225 }
226
227 let output_format = self
228 .config
229 .io
230 .default_output_format
231 .unwrap_or(DataFormat::Json);
232
233 match output_format {
234 DataFormat::Json => {
235 let json_value = value.to_json()?;
237 let json_str = if self.config.display.compact {
238 serde_json::to_string(&json_value)
239 } else {
240 serde_json::to_string_pretty(&json_value)
241 }
242 .map_err(|e| {
243 Error::operation(Cow::Owned(format!("JSON serialization error: {}", e)))
244 })?;
245 println!("{}", json_str);
246 }
247 DataFormat::JsonCompact => {
248 let json_value = value.to_json()?;
250 let json_str = serde_json::to_string(&json_value)
251 .map_err(|e| Error::operation(format!("JSON serialization error: {}", e)))?;
252 println!("{}", json_str);
253 }
254 DataFormat::JsonLines => {
255 let json_value = value.to_json()?;
257 match json_value {
258 serde_json::Value::Array(arr) => {
259 for item in arr {
260 let json_str = serde_json::to_string(&item).map_err(|e| {
261 Error::operation(format!("JSON serialization error: {}", e))
262 })?;
263 println!("{}", json_str);
264 }
265 }
266 _ => {
267 let json_str = serde_json::to_string(&json_value).map_err(|e| {
269 Error::operation(format!("JSON serialization error: {}", e))
270 })?;
271 println!("{}", json_str);
272 }
273 }
274 }
275 DataFormat::Csv => {
276 match value {
277 Value::DataFrame(df) => {
278 use polars::prelude::CsvWriter;
280 use std::io::BufWriter;
281 let stdout = std::io::stdout();
282 let mut writer = BufWriter::with_capacity(65536, stdout.lock());
283 CsvWriter::new(&mut writer)
284 .include_header(true)
285 .finish(&mut df.clone())
286 .map_err(|e| Error::operation(format!("CSV write error: {}", e)))?;
287 }
288 Value::LazyFrame(lf) => {
289 let df = lf.clone().collect()?;
290 self.write_to_stdout(&Value::DataFrame(df))?;
291 }
292 _ => {
293 let json_value = value.to_json()?;
295 let json_str = serde_json::to_string_pretty(&json_value).map_err(|e| {
296 Error::operation(format!("JSON serialization error: {}", e))
297 })?;
298 println!("{}", json_str);
299 }
300 }
301 }
302 DataFormat::Adt => {
303 match value {
304 Value::DataFrame(df) => {
305 use std::io::{self, BufWriter, Write};
307
308 const FIELD_SEPARATOR: u8 = 31;
309 const RECORD_SEPARATOR: u8 = 30;
310
311 let stdout_handle = io::stdout();
312 let mut stdout = BufWriter::with_capacity(65536, stdout_handle.lock());
313
314 let headers: Vec<&str> =
316 df.get_column_names().iter().map(|s| s.as_str()).collect();
317 for (i, header) in headers.iter().enumerate() {
318 if i > 0 {
319 stdout.write_all(&[FIELD_SEPARATOR])?;
320 }
321 stdout.write_all(header.as_bytes())?;
322 }
323 stdout.write_all(&[RECORD_SEPARATOR])?;
324
325 let height = df.height();
327 let mut value_buffer = String::new(); for row_idx in 0..height {
329 for (col_idx, column) in df.get_columns().iter().enumerate() {
330 if col_idx > 0 {
331 stdout.write_all(&[FIELD_SEPARATOR])?;
332 }
333
334 value_buffer.clear(); match column.get(row_idx).map_err(|e| {
336 Error::operation(Cow::Owned(format!(
337 "Failed to get column value: {}",
338 e
339 )))
340 })? {
341 polars::prelude::AnyValue::String(s) => {
342 value_buffer.push_str(s)
343 }
344 polars::prelude::AnyValue::Int64(i) => {
345 write!(value_buffer, "{}", i).unwrap()
346 }
347 polars::prelude::AnyValue::Float64(f) => {
348 write!(value_buffer, "{}", f).unwrap()
349 }
350 polars::prelude::AnyValue::Boolean(b) => {
351 write!(value_buffer, "{}", b).unwrap()
352 }
353 polars::prelude::AnyValue::Null => {} other => write!(value_buffer, "{}", other).unwrap(),
355 };
356
357 stdout.write_all(value_buffer.as_bytes())?;
358 }
359 stdout.write_all(&[RECORD_SEPARATOR])?;
360 }
361 stdout.flush()?;
362 }
363 Value::LazyFrame(lf) => {
364 let df = lf.clone().collect()?;
365 self.write_to_stdout(&Value::DataFrame(df))?;
366 }
367 _ => {
368 let json_value = value.to_json()?;
370 let json_str = serde_json::to_string_pretty(&json_value).map_err(|e| {
371 Error::operation(format!("JSON serialization error: {}", e))
372 })?;
373 println!("{}", json_str);
374 }
375 }
376 }
377 _ => {
378 println!("{}", value);
380 }
381 }
382
383 Ok(())
384 }
385}
386
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `{"name": "Alice", "age": 30}` object shared by several tests.
    fn sample_object() -> Value {
        dsq_core::utils::object([
            ("name", dsq_core::Value::string("Alice")),
            ("age", dsq_core::Value::int(30)),
        ])
    }

    #[tokio::test]
    async fn test_execute_filter_on_value_identity() {
        let mut executor = Executor::new(Config::default());

        let result = executor
            .execute_filter_on_value(".", sample_object(), None)
            .await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_validate_filter() {
        let executor = Executor::new(Config::default());

        assert!(executor.validate_filter(".").is_ok());
        assert!(executor.validate_filter(".name").is_ok());
        assert!(executor.validate_filter("invalid syntax +++").is_err());
    }

    #[test]
    fn test_explain_filter() {
        let executor = Executor::new(Config::default());

        let explanation = executor
            .explain_filter(".")
            .expect("explaining the identity filter should succeed");
        assert!(!explanation.is_empty());
    }

    #[tokio::test]
    async fn test_execute_filter_on_value_with_field_filter() {
        let mut executor = Executor::new(Config::default());

        let result = executor
            .execute_filter_on_value(".name", sample_object(), None)
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_execute_filter_on_value_with_limit() {
        let mut config = Config::default();
        config.io.limit = Some(1);
        let mut executor = Executor::new(config);
        let input_value = Value::Array(vec![
            dsq_core::Value::int(1),
            dsq_core::Value::int(2),
            dsq_core::Value::int(3),
        ]);

        let result = executor
            .execute_filter_on_value(".", input_value, None)
            .await;
        assert!(result.is_ok());
    }

    // The end-to-end limit test above only checks success; these pin that the limit
    // is actually applied.
    #[test]
    fn test_apply_limit_truncates_array() {
        let executor = Executor::new(Config::default());
        let input = Value::Array(vec![Value::int(1), Value::int(2), Value::int(3)]);

        match executor.apply_limit(input, 2).expect("apply_limit on an array") {
            Value::Array(items) => assert_eq!(items.len(), 2),
            _ => panic!("apply_limit must keep the array variant"),
        }
    }

    #[test]
    fn test_apply_limit_larger_than_array_keeps_all_elements() {
        let executor = Executor::new(Config::default());
        let input = Value::Array(vec![Value::int(1), Value::int(2), Value::int(3)]);

        match executor.apply_limit(input, 10).expect("apply_limit on an array") {
            Value::Array(items) => assert_eq!(items.len(), 3),
            _ => panic!("apply_limit must keep the array variant"),
        }
    }

    #[test]
    fn test_apply_limit_passes_scalars_through() {
        let executor = Executor::new(Config::default());

        let limited = executor
            .apply_limit(Value::Null, 1)
            .expect("apply_limit on a scalar");
        assert!(matches!(limited, Value::Null));
    }

    #[test]
    fn test_apply_limit_truncates_dataframe() {
        use polars::prelude::*;

        let executor = Executor::new(Config::default());
        let df = df! {
            "n" => &[1, 2, 3],
        }
        .unwrap();

        match executor
            .apply_limit(Value::DataFrame(df), 2)
            .expect("apply_limit on a DataFrame")
        {
            Value::DataFrame(limited) => assert_eq!(limited.height(), 2),
            _ => panic!("apply_limit must keep the DataFrame variant"),
        }
    }

    #[tokio::test]
    async fn test_execute_filter_on_value_invalid_filter() {
        let mut executor = Executor::new(Config::default());

        let result = executor
            .execute_filter_on_value("invalid +++", Value::Null, None)
            .await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_read_input() {
        let executor = Executor::new(Config::default());
        let temp_file = tempfile::NamedTempFile::new().unwrap();
        std::fs::write(&temp_file, r#"{"name": "test"}"#).unwrap();

        let result = executor.read_input(temp_file.path()).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_write_output() {
        use polars::prelude::*;

        let executor = Executor::new(Config::default());
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("test.csv");

        let df = df! {
            "name" => &["Alice", "Bob"],
            "age" => &[30, 25],
        }
        .unwrap();
        let value = Value::DataFrame(df);

        let result = executor.write_output(&value, &path).await;
        assert!(result.is_ok());

        let content = std::fs::read_to_string(&path).unwrap();
        assert!(content.contains("Alice"));
        assert!(content.contains("Bob"));
    }

    #[tokio::test]
    async fn test_execute_filter_with_file() {
        let mut executor = Executor::new(Config::default());
        let temp_file = tempfile::NamedTempFile::new().unwrap();
        std::fs::write(&temp_file, r#"{"name": "Alice"}"#).unwrap();

        let result = executor
            .execute_filter(".name", Some(temp_file.path()), None)
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_execute_filter_on_value_with_stats() {
        let mut config = Config::default();
        config.debug.verbosity = 1;
        let mut executor = Executor::new(config);

        let result = executor
            .execute_filter_on_value(".", Value::Null, None)
            .await;
        assert!(result.is_ok());
    }
}