1use std::fs::File;
2
3use anyhow::{Context, Result, anyhow};
4use csv::{ByteRecord, Position};
5use itertools::Itertools;
6use log::{debug, info};
7
8use crate::{
9 cli::{BooleanFormat, ProcessArgs},
10 data::{ComparableValue, Value},
11 derive::{DerivedColumn, parse_derived_columns},
12 filter::{evaluate_conditions, parse_filters},
13 index::{CsvIndex, IndexVariant, SortDirection},
14 io_utils,
15 rows::{evaluate_filter_expressions, parse_typed_row},
16 schema::{ColumnMeta, ColumnType, Schema},
17 table,
18};
19
20use encoding_rs::Encoding;
21
/// Entry point for the `process` subcommand.
///
/// Reads the input CSV, reconciles it with an explicit or detected schema,
/// optionally applies datatype mappings and value replacements, filters and
/// sorts rows (using an on-disk index to accelerate the sort when a
/// compatible variant is available), then writes the result either as CSV
/// (to a file or stdout) or as a rendered text table (`--table`/`--preview`).
///
/// # Errors
/// Returns an error for conflicting flags (`--preview` + `--output`,
/// `--apply-mappings` + `--skip-mappings`), unknown columns/variants,
/// schema/header mismatches, and any underlying I/O or parse failure.
pub fn execute(args: &ProcessArgs) -> Result<()> {
    let delimiter = io_utils::resolve_input_delimiter(&args.input, args.delimiter);
    let input_encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
    let output_path = args.output.as_deref();
    // No --output, or an explicit "-", both mean stdout.
    let writing_to_stdout = output_path.is_none_or(io_utils::is_dash);

    if args.preview && args.output.is_some() {
        return Err(anyhow!("--preview cannot be combined with --output"));
    }
    // --preview implies a default cap of 10 rows unless --limit was given.
    let mut limit = args.limit;
    if args.preview && limit.is_none() {
        limit = Some(10);
    }
    // Table rendering is forced by --preview; otherwise --table only takes
    // effect when writing to stdout (files always get CSV).
    let use_table_output = if args.preview {
        true
    } else {
        args.table && writing_to_stdout
    };
    let output_delimiter =
        io_utils::resolve_output_delimiter(output_path, args.output_delimiter, delimiter);
    let output_encoding = io_utils::resolve_encoding(args.output_encoding.as_deref())?;
    info!(
        "Processing '{}' -> {:?} (delimiter '{}', output '{}')",
        args.input.display(),
        output_path
            .map(|p| p.display().to_string())
            .unwrap_or_else(|| "stdout".into()),
        crate::printable_delimiter(delimiter),
        crate::printable_delimiter(output_delimiter)
    );
    // --sort may be repeated and each occurrence may hold a comma-separated
    // list; split, trim, drop empties, then parse each directive.
    let sorts = args
        .sort
        .iter()
        .flat_map(|s| s.split(','))
        .map(|s| s.trim())
        .filter(|s| !s.is_empty())
        .map(SortDirective::parse)
        .collect::<Result<Vec<_>>>()?;
    // Same comma-splitting convention for --columns.
    let selected_columns = args
        .columns
        .iter()
        .flat_map(|s| s.split(','))
        .map(|s| s.trim())
        .filter(|s| !s.is_empty())
        .map(|s| s.to_string())
        .collect::<Vec<_>>();
    let derived_columns = parse_derived_columns(&args.derives)?;
    let filters = parse_filters(&args.filters)?;

    let mut reader;
    let headers: Vec<String>;
    let mut schema: Schema;

    if let Some(schema_path) = &args.schema {
        // Explicit schema: it decides whether the file carries a header row
        // and supplies the header names when the file does not.
        schema = Schema::load(schema_path)?;
        let expects_headers = schema.expects_headers();
        reader = io_utils::open_csv_reader_from_path(&args.input, delimiter, expects_headers)?;
        headers = if expects_headers {
            io_utils::reader_headers(&mut reader, input_encoding)?
        } else {
            schema.headers()
        };
    } else {
        // No schema: sniff the layout (header presence and names) and build
        // a string-typed schema from the headers.
        let layout =
            crate::schema::detect_csv_layout(&args.input, delimiter, input_encoding, None)?;
        reader = io_utils::open_csv_reader_from_path(&args.input, delimiter, layout.has_headers)?;
        headers = if layout.has_headers {
            io_utils::reader_headers(&mut reader, input_encoding)?
        } else {
            layout.headers.clone()
        };
        schema = Schema::from_headers(&headers);
        schema.has_headers = layout.has_headers;
    }

    reconcile_schema_with_headers(&mut schema, &headers)?;

    if args.apply_mappings && args.skip_mappings {
        return Err(anyhow!(
            "--apply-mappings and --skip-mappings cannot be used together"
        ));
    }
    // Mappings run by default whenever the schema defines any; --skip-mappings
    // disables them, and --apply-mappings merely warns when there are none.
    let schema_has_mappings = schema.has_transformations();
    let apply_mappings = if args.skip_mappings {
        false
    } else if args.apply_mappings {
        if !schema_has_mappings {
            debug!("--apply-mappings requested but schema defines no datatype mappings");
        }
        schema_has_mappings
    } else {
        schema_has_mappings
    };

    let maybe_index = if let Some(index_path) = &args.index {
        Some(CsvIndex::load(index_path)?)
    } else {
        None
    };

    // A blank/whitespace-only --index-variant is treated as absent.
    let requested_variant = args
        .index_variant
        .as_ref()
        .map(|name| name.trim())
        .filter(|name| !name.is_empty())
        .map(|name| name.to_string());

    if requested_variant.is_some() && maybe_index.is_none() {
        return Err(anyhow!(
            "An index variant was specified but no index file was provided"
        ));
    }

    // (column, direction) signature used to match sort directives against
    // index variants.
    let sort_signature = sorts
        .iter()
        .map(|s| {
            (
                s.column.clone(),
                if s.ascending {
                    SortDirection::Asc
                } else {
                    SortDirection::Desc
                },
            )
        })
        .collect_vec();

    // Resolve which index variant (if any) will drive the sort:
    // - an explicitly named variant must exist and match the sort signature;
    // - otherwise pick the best automatic match, or none when not sorting.
    let matching_variant: Option<&IndexVariant> = if let Some(index) = maybe_index.as_ref() {
        if let Some(name) = requested_variant.as_deref() {
            if sort_signature.is_empty() {
                return Err(anyhow!(
                    "Selecting an index variant requires at least one --sort directive"
                ));
            }
            let variant = index.variant_by_name(name).ok_or_else(|| {
                anyhow!(
                    "Index variant '{name}' not found in {:?}",
                    args.index.as_ref().map(|p| p.display().to_string())
                )
            })?;
            if !variant.matches(&sort_signature) {
                return Err(anyhow!(
                    "Index variant '{name}' does not match the requested sort order"
                ));
            }
            Some(variant)
        } else if sort_signature.is_empty() {
            None
        } else {
            index.best_match(&sort_signature)
        }
    } else {
        None
    };

    let column_map = build_column_map(&headers, &schema);
    let sort_plan = build_sort_plan(&sorts, &schema, &column_map)?;
    let filter_conditions = filters;

    let output_plan = OutputPlan::new(
        &headers,
        &schema,
        &selected_columns,
        &derived_columns,
        args.row_numbers,
        args.boolean_format,
    )?;

    if args.table && !use_table_output && !args.preview {
        debug!("--table requested but output will remain CSV because a file path was provided");
    }

    if use_table_output {
        // Table path: buffer formatted rows, then render them at the end.
        let mut rows_for_table = Vec::new();
        {
            let mut engine = ProcessEngine {
                schema: &schema,
                headers: &headers,
                filters: &filter_conditions,
                filter_exprs: &args.filter_exprs,
                derived_columns: &derived_columns,
                output_plan: &output_plan,
                sink: OutputSink::Table(&mut rows_for_table),
                limit,
                apply_mappings,
            };

            if let Some(variant) = matching_variant {
                // The index stores byte offsets, so the input must be seekable.
                if io_utils::is_dash(&args.input) {
                    return Err(anyhow!(
                        "Index accelerated processing requires a regular file input"
                    ));
                }
                let mut seek_reader =
                    io_utils::open_seekable_csv_reader(&args.input, delimiter, true)?;
                // Consume the header row so seeks land on data records.
                seek_reader.byte_headers()?;
                let covered = variant.columns().len();
                let total = sort_signature.len();
                info!(
                    "Using index {:?} variant '{}' to accelerate sort",
                    args.index,
                    variant.describe()
                );
                if covered < total {
                    debug!(
                        "Index covers {covered}/{total} sort columns; remaining columns will be sorted in-memory"
                    );
                }
                engine.process_with_index(&mut seek_reader, input_encoding, variant, &sort_plan)?;
            } else {
                if maybe_index.is_some() {
                    debug!("Index present but not used due to incompatible sort signature");
                }
                engine.process_in_memory(reader, input_encoding, sort_plan)?;
            }
        }

        table::print_table(output_plan.headers(), &rows_for_table);
        if args.preview {
            info!(
                "Displayed {} row(s) from {:?}",
                rows_for_table.len(),
                args.input
            );
        }
        Ok(())
    } else {
        // CSV path: stream rows straight to the writer.
        let mut writer = io_utils::open_csv_writer(output_path, output_delimiter, output_encoding)?;
        write_headers(&mut writer, &output_plan)?;
        {
            let mut engine = ProcessEngine {
                schema: &schema,
                headers: &headers,
                filters: &filter_conditions,
                filter_exprs: &args.filter_exprs,
                derived_columns: &derived_columns,
                output_plan: &output_plan,
                sink: OutputSink::Csv(&mut writer),
                limit,
                apply_mappings,
            };

            if let Some(variant) = matching_variant {
                // The index stores byte offsets, so the input must be seekable.
                if io_utils::is_dash(&args.input) {
                    return Err(anyhow!(
                        "Index accelerated processing requires a regular file input"
                    ));
                }
                let mut seek_reader =
                    io_utils::open_seekable_csv_reader(&args.input, delimiter, true)?;
                // Consume the header row so seeks land on data records.
                seek_reader.byte_headers()?;
                let covered = variant.columns().len();
                let total = sort_signature.len();
                info!(
                    "Using index {:?} variant '{}' to accelerate sort",
                    args.index,
                    variant.describe()
                );
                if covered < total {
                    debug!(
                        "Index covers {covered}/{total} sort columns; remaining columns will be sorted in-memory"
                    );
                }
                engine.process_with_index(&mut seek_reader, input_encoding, variant, &sort_plan)?;
            } else {
                if maybe_index.is_some() {
                    debug!("Index present but not used due to incompatible sort signature");
                }
                engine.process_in_memory(reader, input_encoding, sort_plan)?;
            }
        }
        writer.flush().context("Flushing output")
    }
}
298
299fn reconcile_schema_with_headers(schema: &mut Schema, headers: &[String]) -> Result<()> {
300 if schema.columns.is_empty() {
301 schema.columns = headers
302 .iter()
303 .map(|name| ColumnMeta {
304 name: name.clone(),
305 datatype: ColumnType::String,
306 rename: None,
307 value_replacements: Vec::new(),
308 datatype_mappings: Vec::new(),
309 })
310 .collect();
311 return Ok(());
312 }
313
314 schema.validate_headers(headers)?;
315 Ok(())
316}
317
318fn build_column_map(
319 headers: &[String],
320 schema: &Schema,
321) -> std::collections::HashMap<String, usize> {
322 let mut map = std::collections::HashMap::new();
323 for (idx, header) in headers.iter().enumerate() {
324 map.insert(header.clone(), idx);
325 if let Some(rename) = schema
326 .columns
327 .get(idx)
328 .and_then(|column| column.rename.as_ref())
329 .filter(|rename| !rename.is_empty())
330 {
331 map.insert(rename.clone(), idx);
332 }
333 }
334 map
335}
336
337fn build_sort_plan(
338 directives: &[SortDirective],
339 schema: &Schema,
340 column_map: &std::collections::HashMap<String, usize>,
341) -> Result<Vec<SortInstruction>> {
342 directives
343 .iter()
344 .map(|directive| {
345 let idx = column_map
346 .get(&directive.column)
347 .copied()
348 .or_else(|| schema.column_index(&directive.column))
349 .ok_or_else(|| anyhow!("Sort column '{}' not found", directive.column))?;
350 Ok(SortInstruction {
351 index: idx,
352 ascending: directive.ascending,
353 })
354 })
355 .collect()
356}
357
358fn write_headers(
359 writer: &mut csv::Writer<Box<dyn std::io::Write>>,
360 plan: &OutputPlan,
361) -> Result<()> {
362 writer
363 .write_record(plan.headers.iter())
364 .context("Writing output headers")
365}
366
/// Where processed rows go: straight to a CSV writer, or into an
/// in-memory buffer that is later rendered as a text table.
enum OutputSink<'a> {
    /// Streams each record to the CSV writer as it is emitted.
    Csv(&'a mut csv::Writer<Box<dyn std::io::Write>>),
    /// Collects formatted rows for later table rendering.
    Table(&'a mut Vec<Vec<String>>),
}
371
/// Row-processing pipeline shared by the table and CSV output paths:
/// decode, (optionally) apply mappings, apply replacements, type, filter,
/// then emit into the configured sink up to an optional row limit.
struct ProcessEngine<'a, 'b> {
    schema: &'a Schema,
    headers: &'a [String],
    /// Parsed `--filter` conditions; empty slice disables condition filtering.
    filters: &'a [crate::filter::FilterCondition],
    /// Raw filter expression strings; empty slice disables expression filtering.
    filter_exprs: &'a [String],
    derived_columns: &'a [DerivedColumn],
    output_plan: &'a OutputPlan,
    /// Destination for emitted rows (CSV writer or table buffer).
    sink: OutputSink<'b>,
    /// Maximum number of rows to emit; `None` means unlimited.
    limit: Option<usize>,
    /// Whether schema datatype mappings are applied to each raw row.
    apply_mappings: bool,
}
383
impl<'a, 'b> ProcessEngine<'a, 'b> {
    /// Reads every record into memory, filtering as rows arrive, then sorts
    /// the survivors (when a sort plan exists) and emits up to `self.limit`.
    fn process_in_memory(
        &mut self,
        reader: csv::Reader<Box<dyn std::io::Read>>,
        encoding: &'static Encoding,
        sort_plan: Vec<SortInstruction>,
    ) -> Result<()> {
        let mut rows: Vec<RowData> = Vec::new();

        for (ordinal, result) in reader.into_byte_records().enumerate() {
            // ordinal + 2 = 1-based file line assuming a header row precedes
            // the data. NOTE(review): for headerless input this overstates
            // the reported row number by one — confirm intent.
            let record = result.with_context(|| format!("Reading row {}", ordinal + 2))?;
            let mut raw = io_utils::decode_record(&record, encoding)?;
            if self.apply_mappings {
                self.schema
                    .apply_transformations_to_row(&mut raw)
                    .with_context(|| {
                        format!("Applying datatype mappings to row {}", ordinal + 2)
                    })?;
            }
            self.schema.apply_replacements_to_row(&mut raw);
            let typed = parse_typed_row(self.schema, &raw)?;

            // Condition filters first; a row failing any is dropped.
            if !self.filters.is_empty()
                && !evaluate_conditions(self.filters, self.schema, self.headers, &raw, &typed)?
            {
                continue;
            }

            // Then expression filters, which also see a 1-based row number.
            if !self.filter_exprs.is_empty()
                && !evaluate_filter_expressions(
                    self.filter_exprs,
                    self.headers,
                    &raw,
                    &typed,
                    Some(ordinal + 1),
                )?
            {
                continue;
            }

            rows.push(RowData {
                raw,
                typed,
                ordinal,
            });
        }

        if !sort_plan.is_empty() {
            rows.sort_by(|a, b| compare_rows(a, b, &sort_plan));
        }

        // Emit after sorting so the limit applies to the sorted order.
        for (written, row) in rows.into_iter().enumerate() {
            if self.limit.is_some_and(|limit| written >= limit) {
                break;
            }
            self.emit_row(&row.raw, &row.typed, written + 1)?;
        }

        Ok(())
    }

    /// Walks rows in the order dictated by the index variant, buffering runs
    /// of rows that share the same indexed sort-key prefix ("buckets") and
    /// sorting each bucket in memory by the remaining sort columns only.
    fn process_with_index(
        &mut self,
        reader: &mut csv::Reader<std::io::BufReader<File>>,
        encoding: &'static Encoding,
        variant: &IndexVariant,
        sort_plan: &[SortInstruction],
    ) -> Result<()> {
        let mut record = ByteRecord::new();
        let mut emitted = 0usize;
        let mut ordinal = 0usize;
        // Number of leading sort columns the index already orders.
        let prefix_len = variant.columns().len();
        // Sort-key prefix of the bucket currently being accumulated.
        let mut current_prefix: Option<Vec<Option<Value>>> = None;
        let mut bucket: Vec<RowData> = Vec::new();

        for offset in variant.ordered_offsets() {
            // Stop seeking once the emit limit has been reached.
            if self.limit.is_some_and(|limit| emitted >= limit) {
                break;
            }
            let mut position = Position::new();
            position.set_byte(offset);
            reader.seek(position)?;
            if !reader.read_byte_record(&mut record)? {
                break;
            }
            let mut raw = io_utils::decode_record(&record, encoding)?;
            if self.apply_mappings {
                self.schema
                    .apply_transformations_to_row(&mut raw)
                    .with_context(|| {
                        format!(
                            "Applying datatype mappings to indexed row at byte offset {}",
                            offset
                        )
                    })?;
            }
            self.schema.apply_replacements_to_row(&mut raw);
            let typed = parse_typed_row(self.schema, &raw)?;
            if !self.filters.is_empty()
                && !evaluate_conditions(self.filters, self.schema, self.headers, &raw, &typed)?
            {
                continue;
            }

            // NOTE(review): here `ordinal` counts rows that passed the
            // condition filters, while process_in_memory passes the raw read
            // index — the row number seen by filter expressions differs
            // between the two paths. Confirm which is intended.
            if !self.filter_exprs.is_empty()
                && !evaluate_filter_expressions(
                    self.filter_exprs,
                    self.headers,
                    &raw,
                    &typed,
                    Some(ordinal + 1),
                )?
            {
                continue;
            }

            // A change in the indexed prefix closes the current bucket:
            // flush it (sorted by the residual columns) before starting a
            // new one. flush_bucket returns true when the limit was hit.
            let prefix_key = build_prefix_key(&typed, sort_plan, prefix_len);
            match current_prefix.as_ref() {
                Some(existing) if *existing == prefix_key => {}
                Some(_) => {
                    if self.flush_bucket(&mut bucket, sort_plan, prefix_len, &mut emitted)? {
                        return Ok(());
                    }
                    current_prefix = Some(prefix_key.clone());
                }
                None => {
                    current_prefix = Some(prefix_key.clone());
                }
            }

            bucket.push(RowData {
                raw,
                typed,
                ordinal,
            });
            ordinal += 1;
        }

        // Flush whatever remains in the final bucket.
        self.flush_bucket(&mut bucket, sort_plan, prefix_len, &mut emitted)?;

        Ok(())
    }

    /// Sorts a bucket by the sort columns the index does not cover, then
    /// emits its rows. Returns `Ok(true)` when the emit limit was reached
    /// mid-bucket (any undrained rows are discarded with the bucket).
    fn flush_bucket(
        &mut self,
        bucket: &mut Vec<RowData>,
        sort_plan: &[SortInstruction],
        prefix_len: usize,
        emitted: &mut usize,
    ) -> Result<bool> {
        if bucket.is_empty() {
            return Ok(false);
        }

        // Only the sort columns beyond the indexed prefix still need sorting.
        let remainder_plan = if prefix_len >= sort_plan.len() {
            &[][..]
        } else {
            &sort_plan[prefix_len..]
        };

        if !remainder_plan.is_empty() {
            bucket.sort_by(|a, b| compare_rows(a, b, remainder_plan));
        }

        for row in bucket.drain(..) {
            if self.limit.is_some_and(|limit| *emitted >= limit) {
                return Ok(true);
            }
            self.emit_row(&row.raw, &row.typed, *emitted + 1)?;
            *emitted += 1;
        }

        Ok(false)
    }

    /// Formats one row per the output plan and sends it to the active sink.
    /// `row_number` is the 1-based position in the emitted output.
    fn emit_row(
        &mut self,
        raw: &[String],
        typed: &[Option<Value>],
        row_number: usize,
    ) -> Result<()> {
        let record = build_output_record(
            raw,
            typed,
            row_number,
            self.headers,
            self.derived_columns,
            self.output_plan,
        )?;

        match &mut self.sink {
            OutputSink::Csv(writer) => writer
                .write_record(record.iter())
                .context("Writing output row"),
            OutputSink::Table(rows) => {
                rows.push(record);
                Ok(())
            }
        }
    }
}
585
586fn build_prefix_key(
587 typed: &[Option<Value>],
588 sort_plan: &[SortInstruction],
589 prefix_len: usize,
590) -> Vec<Option<Value>> {
591 let take = prefix_len.min(sort_plan.len());
592 sort_plan
593 .iter()
594 .take(take)
595 .map(|directive| typed[directive.index].clone())
596 .collect()
597}
598
599fn build_output_record(
600 raw: &[String],
601 typed: &[Option<Value>],
602 row_number: usize,
603 headers: &[String],
604 derived_columns: &[DerivedColumn],
605 output_plan: &OutputPlan,
606) -> Result<Vec<String>> {
607 let mut record = Vec::with_capacity(output_plan.fields.len());
608 for field in &output_plan.fields {
609 match field {
610 OutputField::RowNumber => record.push(row_number.to_string()),
611 OutputField::ExistingColumn(idx) => {
612 let raw_value = raw.get(*idx).map(String::as_str).unwrap_or("");
613 let typed_value = typed.get(*idx).and_then(|v| v.as_ref());
614 let formatted = output_plan.format_existing_value(raw_value, typed_value);
615 record.push(formatted);
616 }
617 OutputField::Derived(idx) => {
618 let derived =
619 derived_columns[*idx].evaluate(headers, raw, typed, Some(row_number))?;
620 record.push(derived);
621 }
622 }
623 }
624 Ok(record)
625}
626
627fn compare_rows(a: &RowData, b: &RowData, plan: &[SortInstruction]) -> std::cmp::Ordering {
628 for directive in plan {
629 let left = ComparableValue(a.typed[directive.index].clone());
630 let right = ComparableValue(b.typed[directive.index].clone());
631 let ord = left.cmp(&right);
632 if ord != std::cmp::Ordering::Equal {
633 return if directive.ascending {
634 ord
635 } else {
636 ord.reverse()
637 };
638 }
639 }
640 a.ordinal.cmp(&b.ordinal)
641}
642
/// One fully decoded row buffered for filtering, sorting, and emission.
#[derive(Debug)]
struct RowData {
    /// Cell text after decoding, mappings, and replacements.
    raw: Vec<String>,
    /// Typed values produced by `parse_typed_row` from `raw`.
    typed: Vec<Option<Value>>,
    /// Zero-based position used as a stable sort tiebreaker.
    ordinal: usize,
}
649
/// A sort directive resolved to a concrete column position.
#[derive(Debug)]
struct SortInstruction {
    /// Index of the column within the row.
    index: usize,
    /// `true` for ascending, `false` for descending.
    ascending: bool,
}
655
/// A parsed `--sort` entry (column still by name, not yet resolved to
/// an index — see `build_sort_plan`).
#[derive(Debug)]
pub struct SortDirective {
    /// Column name as given on the command line.
    pub column: String,
    /// `true` for ascending, `false` for descending.
    pub ascending: bool,
}
661
662impl SortDirective {
663 fn parse(spec: &str) -> Result<Self> {
664 let mut parts = spec.split(':');
665 let column = parts
666 .next()
667 .map(|s| s.trim())
668 .filter(|s| !s.is_empty())
669 .ok_or_else(|| anyhow!("Sort directive is missing a column"))?;
670 let direction = parts.next().unwrap_or("asc");
671 let ascending = match direction.to_ascii_lowercase().as_str() {
672 "asc" => true,
673 "desc" => false,
674 other => {
675 return Err(anyhow!("Unknown sort direction '{other}'"));
676 }
677 };
678 Ok(SortDirective {
679 column: column.to_string(),
680 ascending,
681 })
682 }
683}
684
/// Precomputed output layout: the header row to emit plus, per output
/// cell, where its value comes from.
struct OutputPlan {
    /// Output header names, including `row_number` and derived names.
    headers: Vec<String>,
    /// One source descriptor per output cell, in emit order.
    fields: Vec<OutputField>,
    /// How boolean-typed cells are rendered.
    boolean_format: BooleanFormat,
}
690
691impl OutputPlan {
692 fn new(
693 headers: &[String],
694 schema: &Schema,
695 selected_columns: &[String],
696 derived: &[DerivedColumn],
697 row_numbers: bool,
698 boolean_format: BooleanFormat,
699 ) -> Result<Self> {
700 let mut fields = Vec::new();
701 let mut output_headers = Vec::new();
702 if row_numbers {
703 fields.push(OutputField::RowNumber);
704 output_headers.push("row_number".to_string());
705 }
706 let column_map = build_column_map(headers, schema);
707 let columns_to_use = if selected_columns.is_empty() {
708 headers.to_vec()
709 } else {
710 selected_columns.to_vec()
711 };
712 for column in columns_to_use {
713 let idx = column_map
714 .get(&column)
715 .copied()
716 .ok_or_else(|| anyhow!("Requested column '{column}' not found"))?;
717 fields.push(OutputField::ExistingColumn(idx));
718 output_headers.push(schema.columns[idx].output_name().to_string());
719 }
720 for (idx, derived_column) in derived.iter().enumerate() {
721 fields.push(OutputField::Derived(idx));
722 output_headers.push(derived_column.name.clone());
723 }
724 Ok(OutputPlan {
725 headers: output_headers,
726 fields,
727 boolean_format,
728 })
729 }
730
731 fn format_existing_value(&self, raw: &str, typed: Option<&Value>) -> String {
732 match (self.boolean_format, typed) {
733 (BooleanFormat::Original, _) => raw.to_string(),
734 (BooleanFormat::TrueFalse, Some(Value::Boolean(true))) => "true".to_string(),
735 (BooleanFormat::TrueFalse, Some(Value::Boolean(false))) => "false".to_string(),
736 (BooleanFormat::OneZero, Some(Value::Boolean(true))) => "1".to_string(),
737 (BooleanFormat::OneZero, Some(Value::Boolean(false))) => "0".to_string(),
738 _ => raw.to_string(),
739 }
740 }
741
742 fn headers(&self) -> &[String] {
743 &self.headers
744 }
745}
746
/// Source of a single output cell.
#[derive(Debug)]
enum OutputField {
    /// The 1-based emitted row number.
    RowNumber,
    /// An input column, by index into the row.
    ExistingColumn(usize),
    /// A derived column, by index into the derived-column list.
    Derived(usize),
}