csv_managed/
process.rs

1use std::fs::File;
2
3use anyhow::{Context, Result, anyhow};
4use csv::{ByteRecord, Position};
5use itertools::Itertools;
6use log::{debug, info};
7
8use crate::{
9    cli::{BooleanFormat, ProcessArgs},
10    data::{ComparableValue, Value},
11    derive::{DerivedColumn, parse_derived_columns},
12    filter::{evaluate_conditions, parse_filters},
13    index::{CsvIndex, IndexVariant, SortDirection},
14    io_utils,
15    rows::{evaluate_filter_expressions, parse_typed_row},
16    schema::{ColumnMeta, ColumnType, Schema},
17    table,
18};
19
20use encoding_rs::Encoding;
21
22pub fn execute(args: &ProcessArgs) -> Result<()> {
23    let delimiter = io_utils::resolve_input_delimiter(&args.input, args.delimiter);
24    let input_encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
25    let output_path = args.output.as_deref();
26    let writing_to_stdout = output_path.is_none_or(io_utils::is_dash);
27
28    if args.preview && args.output.is_some() {
29        return Err(anyhow!("--preview cannot be combined with --output"));
30    }
31    let mut limit = args.limit;
32    if args.preview && limit.is_none() {
33        limit = Some(10);
34    }
35    let use_table_output = if args.preview {
36        true
37    } else {
38        args.table && writing_to_stdout
39    };
40    let output_delimiter =
41        io_utils::resolve_output_delimiter(output_path, args.output_delimiter, delimiter);
42    let output_encoding = io_utils::resolve_encoding(args.output_encoding.as_deref())?;
43    info!(
44        "Processing '{}' -> {:?} (delimiter '{}', output '{}')",
45        args.input.display(),
46        output_path
47            .map(|p| p.display().to_string())
48            .unwrap_or_else(|| "stdout".into()),
49        crate::printable_delimiter(delimiter),
50        crate::printable_delimiter(output_delimiter)
51    );
52    let sorts = args
53        .sort
54        .iter()
55        .flat_map(|s| s.split(','))
56        .map(|s| s.trim())
57        .filter(|s| !s.is_empty())
58        .map(SortDirective::parse)
59        .collect::<Result<Vec<_>>>()?;
60    let selected_columns = args
61        .columns
62        .iter()
63        .flat_map(|s| s.split(','))
64        .map(|s| s.trim())
65        .filter(|s| !s.is_empty())
66        .map(|s| s.to_string())
67        .collect::<Vec<_>>();
68    let derived_columns = parse_derived_columns(&args.derives)?;
69    let filters = parse_filters(&args.filters)?;
70
71    let mut reader;
72    let headers: Vec<String>;
73    let mut schema: Schema;
74
75    if let Some(schema_path) = &args.schema {
76        schema = Schema::load(schema_path)?;
77        let expects_headers = schema.expects_headers();
78        reader = io_utils::open_csv_reader_from_path(&args.input, delimiter, expects_headers)?;
79        headers = if expects_headers {
80            io_utils::reader_headers(&mut reader, input_encoding)?
81        } else {
82            schema.headers()
83        };
84    } else {
85        let layout =
86            crate::schema::detect_csv_layout(&args.input, delimiter, input_encoding, None)?;
87        reader = io_utils::open_csv_reader_from_path(&args.input, delimiter, layout.has_headers)?;
88        headers = if layout.has_headers {
89            io_utils::reader_headers(&mut reader, input_encoding)?
90        } else {
91            layout.headers.clone()
92        };
93        schema = Schema::from_headers(&headers);
94        schema.has_headers = layout.has_headers;
95    }
96
97    reconcile_schema_with_headers(&mut schema, &headers)?;
98
99    if args.apply_mappings && args.skip_mappings {
100        return Err(anyhow!(
101            "--apply-mappings and --skip-mappings cannot be used together"
102        ));
103    }
104    let schema_has_mappings = schema.has_transformations();
105    let apply_mappings = if args.skip_mappings {
106        false
107    } else if args.apply_mappings {
108        if !schema_has_mappings {
109            debug!("--apply-mappings requested but schema defines no datatype mappings");
110        }
111        schema_has_mappings
112    } else {
113        schema_has_mappings
114    };
115
116    let maybe_index = if let Some(index_path) = &args.index {
117        Some(CsvIndex::load(index_path)?)
118    } else {
119        None
120    };
121
122    let requested_variant = args
123        .index_variant
124        .as_ref()
125        .map(|name| name.trim())
126        .filter(|name| !name.is_empty())
127        .map(|name| name.to_string());
128
129    if requested_variant.is_some() && maybe_index.is_none() {
130        return Err(anyhow!(
131            "An index variant was specified but no index file was provided"
132        ));
133    }
134
135    let sort_signature = sorts
136        .iter()
137        .map(|s| {
138            (
139                s.column.clone(),
140                if s.ascending {
141                    SortDirection::Asc
142                } else {
143                    SortDirection::Desc
144                },
145            )
146        })
147        .collect_vec();
148
149    let matching_variant: Option<&IndexVariant> = if let Some(index) = maybe_index.as_ref() {
150        if let Some(name) = requested_variant.as_deref() {
151            if sort_signature.is_empty() {
152                return Err(anyhow!(
153                    "Selecting an index variant requires at least one --sort directive"
154                ));
155            }
156            let variant = index.variant_by_name(name).ok_or_else(|| {
157                anyhow!(
158                    "Index variant '{name}' not found in {:?}",
159                    args.index.as_ref().map(|p| p.display().to_string())
160                )
161            })?;
162            if !variant.matches(&sort_signature) {
163                return Err(anyhow!(
164                    "Index variant '{name}' does not match the requested sort order"
165                ));
166            }
167            Some(variant)
168        } else if sort_signature.is_empty() {
169            None
170        } else {
171            index.best_match(&sort_signature)
172        }
173    } else {
174        None
175    };
176
177    let column_map = build_column_map(&headers, &schema);
178    let sort_plan = build_sort_plan(&sorts, &schema, &column_map)?;
179    let filter_conditions = filters;
180
181    let output_plan = OutputPlan::new(
182        &headers,
183        &schema,
184        &selected_columns,
185        &derived_columns,
186        args.row_numbers,
187        args.boolean_format,
188    )?;
189
190    if args.table && !use_table_output && !args.preview {
191        debug!("--table requested but output will remain CSV because a file path was provided");
192    }
193
194    if use_table_output {
195        let mut rows_for_table = Vec::new();
196        {
197            let mut engine = ProcessEngine {
198                schema: &schema,
199                headers: &headers,
200                filters: &filter_conditions,
201                filter_exprs: &args.filter_exprs,
202                derived_columns: &derived_columns,
203                output_plan: &output_plan,
204                sink: OutputSink::Table(&mut rows_for_table),
205                limit,
206                apply_mappings,
207            };
208
209            if let Some(variant) = matching_variant {
210                if io_utils::is_dash(&args.input) {
211                    return Err(anyhow!(
212                        "Index accelerated processing requires a regular file input"
213                    ));
214                }
215                let mut seek_reader =
216                    io_utils::open_seekable_csv_reader(&args.input, delimiter, true)?;
217                // Read and discard headers to align reader position with data start.
218                seek_reader.byte_headers()?;
219                let covered = variant.columns().len();
220                let total = sort_signature.len();
221                info!(
222                    "Using index {:?} variant '{}' to accelerate sort",
223                    args.index,
224                    variant.describe()
225                );
226                if covered < total {
227                    debug!(
228                        "Index covers {covered}/{total} sort columns; remaining columns will be sorted in-memory"
229                    );
230                }
231                engine.process_with_index(&mut seek_reader, input_encoding, variant, &sort_plan)?;
232            } else {
233                if maybe_index.is_some() {
234                    debug!("Index present but not used due to incompatible sort signature");
235                }
236                engine.process_in_memory(reader, input_encoding, sort_plan)?;
237            }
238        }
239
240        table::print_table(output_plan.headers(), &rows_for_table);
241        if args.preview {
242            info!(
243                "Displayed {} row(s) from {:?}",
244                rows_for_table.len(),
245                args.input
246            );
247        }
248        Ok(())
249    } else {
250        let mut writer = io_utils::open_csv_writer(output_path, output_delimiter, output_encoding)?;
251        write_headers(&mut writer, &output_plan)?;
252        {
253            let mut engine = ProcessEngine {
254                schema: &schema,
255                headers: &headers,
256                filters: &filter_conditions,
257                filter_exprs: &args.filter_exprs,
258                derived_columns: &derived_columns,
259                output_plan: &output_plan,
260                sink: OutputSink::Csv(&mut writer),
261                limit,
262                apply_mappings,
263            };
264
265            if let Some(variant) = matching_variant {
266                if io_utils::is_dash(&args.input) {
267                    return Err(anyhow!(
268                        "Index accelerated processing requires a regular file input"
269                    ));
270                }
271                let mut seek_reader =
272                    io_utils::open_seekable_csv_reader(&args.input, delimiter, true)?;
273                // Read and discard headers to align reader position with data start.
274                seek_reader.byte_headers()?;
275                let covered = variant.columns().len();
276                let total = sort_signature.len();
277                info!(
278                    "Using index {:?} variant '{}' to accelerate sort",
279                    args.index,
280                    variant.describe()
281                );
282                if covered < total {
283                    debug!(
284                        "Index covers {covered}/{total} sort columns; remaining columns will be sorted in-memory"
285                    );
286                }
287                engine.process_with_index(&mut seek_reader, input_encoding, variant, &sort_plan)?;
288            } else {
289                if maybe_index.is_some() {
290                    debug!("Index present but not used due to incompatible sort signature");
291                }
292                engine.process_in_memory(reader, input_encoding, sort_plan)?;
293            }
294        }
295        writer.flush().context("Flushing output")
296    }
297}
298
299fn reconcile_schema_with_headers(schema: &mut Schema, headers: &[String]) -> Result<()> {
300    if schema.columns.is_empty() {
301        schema.columns = headers
302            .iter()
303            .map(|name| ColumnMeta {
304                name: name.clone(),
305                datatype: ColumnType::String,
306                rename: None,
307                value_replacements: Vec::new(),
308                datatype_mappings: Vec::new(),
309            })
310            .collect();
311        return Ok(());
312    }
313
314    schema.validate_headers(headers)?;
315    Ok(())
316}
317
318fn build_column_map(
319    headers: &[String],
320    schema: &Schema,
321) -> std::collections::HashMap<String, usize> {
322    let mut map = std::collections::HashMap::new();
323    for (idx, header) in headers.iter().enumerate() {
324        map.insert(header.clone(), idx);
325        if let Some(rename) = schema
326            .columns
327            .get(idx)
328            .and_then(|column| column.rename.as_ref())
329            .filter(|rename| !rename.is_empty())
330        {
331            map.insert(rename.clone(), idx);
332        }
333    }
334    map
335}
336
337fn build_sort_plan(
338    directives: &[SortDirective],
339    schema: &Schema,
340    column_map: &std::collections::HashMap<String, usize>,
341) -> Result<Vec<SortInstruction>> {
342    directives
343        .iter()
344        .map(|directive| {
345            let idx = column_map
346                .get(&directive.column)
347                .copied()
348                .or_else(|| schema.column_index(&directive.column))
349                .ok_or_else(|| anyhow!("Sort column '{}' not found", directive.column))?;
350            Ok(SortInstruction {
351                index: idx,
352                ascending: directive.ascending,
353            })
354        })
355        .collect()
356}
357
358fn write_headers(
359    writer: &mut csv::Writer<Box<dyn std::io::Write>>,
360    plan: &OutputPlan,
361) -> Result<()> {
362    writer
363        .write_record(plan.headers.iter())
364        .context("Writing output headers")
365}
366
367enum OutputSink<'a> {
368    Csv(&'a mut csv::Writer<Box<dyn std::io::Write>>),
369    Table(&'a mut Vec<Vec<String>>),
370}
371
372struct ProcessEngine<'a, 'b> {
373    schema: &'a Schema,
374    headers: &'a [String],
375    filters: &'a [crate::filter::FilterCondition],
376    filter_exprs: &'a [String],
377    derived_columns: &'a [DerivedColumn],
378    output_plan: &'a OutputPlan,
379    sink: OutputSink<'b>,
380    limit: Option<usize>,
381    apply_mappings: bool,
382}
383
384impl<'a, 'b> ProcessEngine<'a, 'b> {
385    fn process_in_memory(
386        &mut self,
387        reader: csv::Reader<Box<dyn std::io::Read>>,
388        encoding: &'static Encoding,
389        sort_plan: Vec<SortInstruction>,
390    ) -> Result<()> {
391        let mut rows: Vec<RowData> = Vec::new();
392
393        for (ordinal, result) in reader.into_byte_records().enumerate() {
394            let record = result.with_context(|| format!("Reading row {}", ordinal + 2))?;
395            let mut raw = io_utils::decode_record(&record, encoding)?;
396            if self.apply_mappings {
397                self.schema
398                    .apply_transformations_to_row(&mut raw)
399                    .with_context(|| {
400                        format!("Applying datatype mappings to row {}", ordinal + 2)
401                    })?;
402            }
403            self.schema.apply_replacements_to_row(&mut raw);
404            let typed = parse_typed_row(self.schema, &raw)?;
405
406            if !self.filters.is_empty()
407                && !evaluate_conditions(self.filters, self.schema, self.headers, &raw, &typed)?
408            {
409                continue;
410            }
411
412            if !self.filter_exprs.is_empty()
413                && !evaluate_filter_expressions(
414                    self.filter_exprs,
415                    self.headers,
416                    &raw,
417                    &typed,
418                    Some(ordinal + 1),
419                )?
420            {
421                continue;
422            }
423
424            rows.push(RowData {
425                raw,
426                typed,
427                ordinal,
428            });
429        }
430
431        if !sort_plan.is_empty() {
432            rows.sort_by(|a, b| compare_rows(a, b, &sort_plan));
433        }
434
435        for (written, row) in rows.into_iter().enumerate() {
436            if self.limit.is_some_and(|limit| written >= limit) {
437                break;
438            }
439            self.emit_row(&row.raw, &row.typed, written + 1)?;
440        }
441
442        Ok(())
443    }
444
445    fn process_with_index(
446        &mut self,
447        reader: &mut csv::Reader<std::io::BufReader<File>>,
448        encoding: &'static Encoding,
449        variant: &IndexVariant,
450        sort_plan: &[SortInstruction],
451    ) -> Result<()> {
452        let mut record = ByteRecord::new();
453        let mut emitted = 0usize;
454        let mut ordinal = 0usize;
455        let prefix_len = variant.columns().len();
456        let mut current_prefix: Option<Vec<Option<Value>>> = None;
457        let mut bucket: Vec<RowData> = Vec::new();
458
459        for offset in variant.ordered_offsets() {
460            if self.limit.is_some_and(|limit| emitted >= limit) {
461                break;
462            }
463            let mut position = Position::new();
464            position.set_byte(offset);
465            reader.seek(position)?;
466            if !reader.read_byte_record(&mut record)? {
467                break;
468            }
469            let mut raw = io_utils::decode_record(&record, encoding)?;
470            if self.apply_mappings {
471                self.schema
472                    .apply_transformations_to_row(&mut raw)
473                    .with_context(|| {
474                        format!(
475                            "Applying datatype mappings to indexed row at byte offset {}",
476                            offset
477                        )
478                    })?;
479            }
480            self.schema.apply_replacements_to_row(&mut raw);
481            let typed = parse_typed_row(self.schema, &raw)?;
482            if !self.filters.is_empty()
483                && !evaluate_conditions(self.filters, self.schema, self.headers, &raw, &typed)?
484            {
485                continue;
486            }
487
488            if !self.filter_exprs.is_empty()
489                && !evaluate_filter_expressions(
490                    self.filter_exprs,
491                    self.headers,
492                    &raw,
493                    &typed,
494                    Some(ordinal + 1),
495                )?
496            {
497                continue;
498            }
499
500            let prefix_key = build_prefix_key(&typed, sort_plan, prefix_len);
501            match current_prefix.as_ref() {
502                Some(existing) if *existing == prefix_key => {}
503                Some(_) => {
504                    if self.flush_bucket(&mut bucket, sort_plan, prefix_len, &mut emitted)? {
505                        return Ok(());
506                    }
507                    current_prefix = Some(prefix_key.clone());
508                }
509                None => {
510                    current_prefix = Some(prefix_key.clone());
511                }
512            }
513
514            bucket.push(RowData {
515                raw,
516                typed,
517                ordinal,
518            });
519            ordinal += 1;
520        }
521
522        self.flush_bucket(&mut bucket, sort_plan, prefix_len, &mut emitted)?;
523
524        Ok(())
525    }
526
527    fn flush_bucket(
528        &mut self,
529        bucket: &mut Vec<RowData>,
530        sort_plan: &[SortInstruction],
531        prefix_len: usize,
532        emitted: &mut usize,
533    ) -> Result<bool> {
534        if bucket.is_empty() {
535            return Ok(false);
536        }
537
538        let remainder_plan = if prefix_len >= sort_plan.len() {
539            &[][..]
540        } else {
541            &sort_plan[prefix_len..]
542        };
543
544        if !remainder_plan.is_empty() {
545            bucket.sort_by(|a, b| compare_rows(a, b, remainder_plan));
546        }
547
548        for row in bucket.drain(..) {
549            if self.limit.is_some_and(|limit| *emitted >= limit) {
550                return Ok(true);
551            }
552            self.emit_row(&row.raw, &row.typed, *emitted + 1)?;
553            *emitted += 1;
554        }
555
556        Ok(false)
557    }
558
559    fn emit_row(
560        &mut self,
561        raw: &[String],
562        typed: &[Option<Value>],
563        row_number: usize,
564    ) -> Result<()> {
565        let record = build_output_record(
566            raw,
567            typed,
568            row_number,
569            self.headers,
570            self.derived_columns,
571            self.output_plan,
572        )?;
573
574        match &mut self.sink {
575            OutputSink::Csv(writer) => writer
576                .write_record(record.iter())
577                .context("Writing output row"),
578            OutputSink::Table(rows) => {
579                rows.push(record);
580                Ok(())
581            }
582        }
583    }
584}
585
586fn build_prefix_key(
587    typed: &[Option<Value>],
588    sort_plan: &[SortInstruction],
589    prefix_len: usize,
590) -> Vec<Option<Value>> {
591    let take = prefix_len.min(sort_plan.len());
592    sort_plan
593        .iter()
594        .take(take)
595        .map(|directive| typed[directive.index].clone())
596        .collect()
597}
598
599fn build_output_record(
600    raw: &[String],
601    typed: &[Option<Value>],
602    row_number: usize,
603    headers: &[String],
604    derived_columns: &[DerivedColumn],
605    output_plan: &OutputPlan,
606) -> Result<Vec<String>> {
607    let mut record = Vec::with_capacity(output_plan.fields.len());
608    for field in &output_plan.fields {
609        match field {
610            OutputField::RowNumber => record.push(row_number.to_string()),
611            OutputField::ExistingColumn(idx) => {
612                let raw_value = raw.get(*idx).map(String::as_str).unwrap_or("");
613                let typed_value = typed.get(*idx).and_then(|v| v.as_ref());
614                let formatted = output_plan.format_existing_value(raw_value, typed_value);
615                record.push(formatted);
616            }
617            OutputField::Derived(idx) => {
618                let derived =
619                    derived_columns[*idx].evaluate(headers, raw, typed, Some(row_number))?;
620                record.push(derived);
621            }
622        }
623    }
624    Ok(record)
625}
626
627fn compare_rows(a: &RowData, b: &RowData, plan: &[SortInstruction]) -> std::cmp::Ordering {
628    for directive in plan {
629        let left = ComparableValue(a.typed[directive.index].clone());
630        let right = ComparableValue(b.typed[directive.index].clone());
631        let ord = left.cmp(&right);
632        if ord != std::cmp::Ordering::Equal {
633            return if directive.ascending {
634                ord
635            } else {
636                ord.reverse()
637            };
638        }
639    }
640    a.ordinal.cmp(&b.ordinal)
641}
642
643#[derive(Debug)]
644struct RowData {
645    raw: Vec<String>,
646    typed: Vec<Option<Value>>,
647    ordinal: usize,
648}
649
650#[derive(Debug)]
651struct SortInstruction {
652    index: usize,
653    ascending: bool,
654}
655
656#[derive(Debug)]
657pub struct SortDirective {
658    pub column: String,
659    pub ascending: bool,
660}
661
662impl SortDirective {
663    fn parse(spec: &str) -> Result<Self> {
664        let mut parts = spec.split(':');
665        let column = parts
666            .next()
667            .map(|s| s.trim())
668            .filter(|s| !s.is_empty())
669            .ok_or_else(|| anyhow!("Sort directive is missing a column"))?;
670        let direction = parts.next().unwrap_or("asc");
671        let ascending = match direction.to_ascii_lowercase().as_str() {
672            "asc" => true,
673            "desc" => false,
674            other => {
675                return Err(anyhow!("Unknown sort direction '{other}'"));
676            }
677        };
678        Ok(SortDirective {
679            column: column.to_string(),
680            ascending,
681        })
682    }
683}
684
685struct OutputPlan {
686    headers: Vec<String>,
687    fields: Vec<OutputField>,
688    boolean_format: BooleanFormat,
689}
690
691impl OutputPlan {
692    fn new(
693        headers: &[String],
694        schema: &Schema,
695        selected_columns: &[String],
696        derived: &[DerivedColumn],
697        row_numbers: bool,
698        boolean_format: BooleanFormat,
699    ) -> Result<Self> {
700        let mut fields = Vec::new();
701        let mut output_headers = Vec::new();
702        if row_numbers {
703            fields.push(OutputField::RowNumber);
704            output_headers.push("row_number".to_string());
705        }
706        let column_map = build_column_map(headers, schema);
707        let columns_to_use = if selected_columns.is_empty() {
708            headers.to_vec()
709        } else {
710            selected_columns.to_vec()
711        };
712        for column in columns_to_use {
713            let idx = column_map
714                .get(&column)
715                .copied()
716                .ok_or_else(|| anyhow!("Requested column '{column}' not found"))?;
717            fields.push(OutputField::ExistingColumn(idx));
718            output_headers.push(schema.columns[idx].output_name().to_string());
719        }
720        for (idx, derived_column) in derived.iter().enumerate() {
721            fields.push(OutputField::Derived(idx));
722            output_headers.push(derived_column.name.clone());
723        }
724        Ok(OutputPlan {
725            headers: output_headers,
726            fields,
727            boolean_format,
728        })
729    }
730
731    fn format_existing_value(&self, raw: &str, typed: Option<&Value>) -> String {
732        match (self.boolean_format, typed) {
733            (BooleanFormat::Original, _) => raw.to_string(),
734            (BooleanFormat::TrueFalse, Some(Value::Boolean(true))) => "true".to_string(),
735            (BooleanFormat::TrueFalse, Some(Value::Boolean(false))) => "false".to_string(),
736            (BooleanFormat::OneZero, Some(Value::Boolean(true))) => "1".to_string(),
737            (BooleanFormat::OneZero, Some(Value::Boolean(false))) => "0".to_string(),
738            _ => raw.to_string(),
739        }
740    }
741
742    fn headers(&self) -> &[String] {
743        &self.headers
744    }
745}
746
747#[derive(Debug)]
748enum OutputField {
749    RowNumber,
750    ExistingColumn(usize),
751    Derived(usize),
752}