csv_managed/
process.rs

1use std::fs::File;
2
3use anyhow::{Context, Result, anyhow};
4use csv::{ByteRecord, Position};
5use itertools::Itertools;
6use log::{debug, info};
7
8use crate::{
9    cli::{BooleanFormat, ProcessArgs},
10    data::{ComparableValue, Value, parse_typed_value},
11    derive::{DerivedColumn, parse_derived_columns},
12    filter::{evaluate_conditions, parse_filters},
13    index::{CsvIndex, IndexVariant, SortDirection},
14    io_utils,
15    schema::{ColumnMeta, ColumnType, Schema},
16};
17
18use encoding_rs::Encoding;
19
20pub fn execute(args: &ProcessArgs) -> Result<()> {
21    let delimiter = io_utils::resolve_input_delimiter(&args.input, args.delimiter);
22    let input_encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
23    let output_delimiter = io_utils::resolve_output_delimiter(
24        args.output.as_deref(),
25        args.output_delimiter,
26        delimiter,
27    );
28    let output_encoding = io_utils::resolve_encoding(args.output_encoding.as_deref())?;
29    info!(
30        "Processing '{}' -> {:?} (delimiter '{}', output '{}')",
31        args.input.display(),
32        args.output
33            .as_ref()
34            .map(|p| p.display().to_string())
35            .unwrap_or_else(|| "stdout".into()),
36        crate::printable_delimiter(delimiter),
37        crate::printable_delimiter(output_delimiter)
38    );
39    let sorts = args
40        .sort
41        .iter()
42        .flat_map(|s| s.split(','))
43        .map(|s| s.trim())
44        .filter(|s| !s.is_empty())
45        .map(SortDirective::parse)
46        .collect::<Result<Vec<_>>>()?;
47    let selected_columns = args
48        .columns
49        .iter()
50        .flat_map(|s| s.split(','))
51        .map(|s| s.trim())
52        .filter(|s| !s.is_empty())
53        .map(|s| s.to_string())
54        .collect::<Vec<_>>();
55    let derived_columns = parse_derived_columns(&args.derives)?;
56    let filters = parse_filters(&args.filters)?;
57
58    let mut reader = io_utils::open_csv_reader_from_path(&args.input, delimiter, true)?;
59    let headers = io_utils::reader_headers(&mut reader, input_encoding)?;
60
61    let mut schema = if let Some(schema_path) = &args.schema {
62        Schema::load(schema_path)?
63    } else {
64        Schema::from_headers(&headers)
65    };
66
67    reconcile_schema_with_headers(&mut schema, &headers)?;
68
69    let maybe_index = if let Some(index_path) = &args.index {
70        Some(CsvIndex::load(index_path)?)
71    } else {
72        None
73    };
74
75    let requested_variant = args
76        .index_variant
77        .as_ref()
78        .map(|name| name.trim())
79        .filter(|name| !name.is_empty())
80        .map(|name| name.to_string());
81
82    if requested_variant.is_some() && maybe_index.is_none() {
83        return Err(anyhow!(
84            "An index variant was specified but no index file was provided"
85        ));
86    }
87
88    let sort_signature = sorts
89        .iter()
90        .map(|s| {
91            (
92                s.column.clone(),
93                if s.ascending {
94                    SortDirection::Asc
95                } else {
96                    SortDirection::Desc
97                },
98            )
99        })
100        .collect_vec();
101
102    let matching_variant: Option<&IndexVariant> = if let Some(index) = maybe_index.as_ref() {
103        if let Some(name) = requested_variant.as_deref() {
104            if sort_signature.is_empty() {
105                return Err(anyhow!(
106                    "Selecting an index variant requires at least one --sort directive"
107                ));
108            }
109            let variant = index.variant_by_name(name).ok_or_else(|| {
110                anyhow!(
111                    "Index variant '{name}' not found in {:?}",
112                    args.index.as_ref().map(|p| p.display().to_string())
113                )
114            })?;
115            if !variant.matches(&sort_signature) {
116                return Err(anyhow!(
117                    "Index variant '{name}' does not match the requested sort order"
118                ));
119            }
120            Some(variant)
121        } else if sort_signature.is_empty() {
122            None
123        } else {
124            index.best_match(&sort_signature)
125        }
126    } else {
127        None
128    };
129
130    let output_path = args.output.as_deref();
131    let mut writer = io_utils::open_csv_writer(output_path, output_delimiter, output_encoding)?;
132
133    let column_map = build_column_map(&headers, &schema);
134    let sort_plan = build_sort_plan(&sorts, &schema, &column_map)?;
135    let filter_conditions = filters;
136
137    let output_plan = OutputPlan::new(
138        &headers,
139        &schema,
140        &selected_columns,
141        &derived_columns,
142        args.row_numbers,
143        args.boolean_format,
144    )?;
145
146    write_headers(&mut writer, &output_plan)?;
147    {
148        let mut engine = ProcessEngine {
149            schema: &schema,
150            headers: &headers,
151            filters: &filter_conditions,
152            derived_columns: &derived_columns,
153            output_plan: &output_plan,
154            writer: &mut writer,
155            limit: args.limit,
156        };
157
158        if let Some(variant) = matching_variant {
159            if io_utils::is_dash(&args.input) {
160                return Err(anyhow!(
161                    "Index accelerated processing requires a regular file input"
162                ));
163            }
164            let mut seek_reader = io_utils::open_seekable_csv_reader(&args.input, delimiter, true)?;
165            // Read and discard headers to align reader position with data start.
166            seek_reader.byte_headers()?;
167            let covered = variant.columns().len();
168            let total = sort_signature.len();
169            info!(
170                "Using index {:?} variant '{}' to accelerate sort",
171                args.index,
172                variant.describe()
173            );
174            if covered < total {
175                debug!(
176                    "Index covers {covered}/{total} sort columns; remaining columns will be sorted in-memory"
177                );
178            }
179            engine.process_with_index(&mut seek_reader, input_encoding, variant, &sort_plan)?;
180        } else {
181            if maybe_index.is_some() {
182                debug!("Index present but not used due to incompatible sort signature");
183            }
184            engine.process_in_memory(reader, input_encoding, sort_plan)?;
185        }
186    }
187    writer.flush().context("Flushing output")
188}
189
190fn reconcile_schema_with_headers(schema: &mut Schema, headers: &[String]) -> Result<()> {
191    if schema.columns.is_empty() {
192        schema.columns = headers
193            .iter()
194            .map(|name| ColumnMeta {
195                name: name.clone(),
196                datatype: ColumnType::String,
197                rename: None,
198                value_replacements: Vec::new(),
199            })
200            .collect();
201        return Ok(());
202    }
203
204    schema.validate_headers(headers)?;
205    Ok(())
206}
207
208fn build_column_map(
209    headers: &[String],
210    schema: &Schema,
211) -> std::collections::HashMap<String, usize> {
212    let mut map = std::collections::HashMap::new();
213    for (idx, header) in headers.iter().enumerate() {
214        map.insert(header.clone(), idx);
215        if let Some(rename) = schema
216            .columns
217            .get(idx)
218            .and_then(|column| column.rename.as_ref())
219            .filter(|rename| !rename.is_empty())
220        {
221            map.insert(rename.clone(), idx);
222        }
223    }
224    map
225}
226
227fn build_sort_plan(
228    directives: &[SortDirective],
229    schema: &Schema,
230    column_map: &std::collections::HashMap<String, usize>,
231) -> Result<Vec<SortInstruction>> {
232    directives
233        .iter()
234        .map(|directive| {
235            let idx = column_map
236                .get(&directive.column)
237                .copied()
238                .or_else(|| schema.column_index(&directive.column))
239                .ok_or_else(|| anyhow!("Sort column '{}' not found", directive.column))?;
240            Ok(SortInstruction {
241                index: idx,
242                ascending: directive.ascending,
243            })
244        })
245        .collect()
246}
247
248fn write_headers(
249    writer: &mut csv::Writer<Box<dyn std::io::Write>>,
250    plan: &OutputPlan,
251) -> Result<()> {
252    writer
253        .write_record(plan.headers.iter())
254        .context("Writing output headers")
255}
256
/// Borrowed state shared by the two row-processing strategies
/// (`process_in_memory` and `process_with_index`).
struct ProcessEngine<'a> {
    /// Schema used to apply value replacements and to parse raw fields into
    /// typed values.
    schema: &'a Schema,
    /// Input header names; forwarded to filter evaluation and derived-column
    /// evaluation.
    headers: &'a [String],
    /// Filter conditions a row must satisfy to be kept; an empty slice keeps
    /// every row.
    filters: &'a [crate::filter::FilterCondition],
    /// Derived columns evaluated per emitted row.
    derived_columns: &'a [DerivedColumn],
    /// Layout of each output record.
    output_plan: &'a OutputPlan,
    /// Destination CSV writer.
    writer: &'a mut csv::Writer<Box<dyn std::io::Write>>,
    /// Maximum number of rows to emit; `None` means no limit.
    limit: Option<usize>,
}
266
267impl<'a> ProcessEngine<'a> {
268    fn process_in_memory(
269        &mut self,
270        reader: csv::Reader<Box<dyn std::io::Read>>,
271        encoding: &'static Encoding,
272        sort_plan: Vec<SortInstruction>,
273    ) -> Result<()> {
274        let mut rows: Vec<RowData> = Vec::new();
275
276        for (ordinal, result) in reader.into_byte_records().enumerate() {
277            let record = result.with_context(|| format!("Reading row {}", ordinal + 2))?;
278            let mut raw = io_utils::decode_record(&record, encoding)?;
279            self.schema.apply_replacements_to_row(&mut raw);
280            let typed = parse_row(&raw, self.schema)?;
281
282            if !self.filters.is_empty()
283                && !evaluate_conditions(self.filters, self.schema, self.headers, &raw, &typed)?
284            {
285                continue;
286            }
287
288            rows.push(RowData {
289                raw,
290                typed,
291                ordinal,
292            });
293        }
294
295        if !sort_plan.is_empty() {
296            rows.sort_by(|a, b| compare_rows(a, b, &sort_plan));
297        }
298
299        emit_rows(
300            rows.into_iter(),
301            self.headers,
302            self.derived_columns,
303            self.output_plan,
304            self.writer,
305            self.limit,
306        )
307    }
308
309    fn process_with_index(
310        &mut self,
311        reader: &mut csv::Reader<std::io::BufReader<File>>,
312        encoding: &'static Encoding,
313        variant: &IndexVariant,
314        sort_plan: &[SortInstruction],
315    ) -> Result<()> {
316        let mut record = ByteRecord::new();
317        let mut emitted = 0usize;
318        let mut ordinal = 0usize;
319        let prefix_len = variant.columns().len();
320        let mut current_prefix: Option<Vec<Option<Value>>> = None;
321        let mut bucket: Vec<RowData> = Vec::new();
322
323        for offset in variant.ordered_offsets() {
324            if self.limit.is_some_and(|limit| emitted >= limit) {
325                break;
326            }
327            let mut position = Position::new();
328            position.set_byte(offset);
329            reader.seek(position)?;
330            if !reader.read_byte_record(&mut record)? {
331                break;
332            }
333            let mut raw = io_utils::decode_record(&record, encoding)?;
334            self.schema.apply_replacements_to_row(&mut raw);
335            let typed = parse_row(&raw, self.schema)?;
336            if !self.filters.is_empty()
337                && !evaluate_conditions(self.filters, self.schema, self.headers, &raw, &typed)?
338            {
339                continue;
340            }
341
342            let prefix_key = build_prefix_key(&typed, sort_plan, prefix_len);
343            match current_prefix.as_ref() {
344                Some(existing) if *existing == prefix_key => {}
345                Some(_) => {
346                    if self.flush_bucket(&mut bucket, sort_plan, prefix_len, &mut emitted)? {
347                        return Ok(());
348                    }
349                    current_prefix = Some(prefix_key.clone());
350                }
351                None => {
352                    current_prefix = Some(prefix_key.clone());
353                }
354            }
355
356            bucket.push(RowData {
357                raw,
358                typed,
359                ordinal,
360            });
361            ordinal += 1;
362        }
363
364        self.flush_bucket(&mut bucket, sort_plan, prefix_len, &mut emitted)?;
365
366        Ok(())
367    }
368
369    fn flush_bucket(
370        &mut self,
371        bucket: &mut Vec<RowData>,
372        sort_plan: &[SortInstruction],
373        prefix_len: usize,
374        emitted: &mut usize,
375    ) -> Result<bool> {
376        if bucket.is_empty() {
377            return Ok(false);
378        }
379
380        let remainder_plan = if prefix_len >= sort_plan.len() {
381            &[][..]
382        } else {
383            &sort_plan[prefix_len..]
384        };
385
386        if !remainder_plan.is_empty() {
387            bucket.sort_by(|a, b| compare_rows(a, b, remainder_plan));
388        }
389
390        for row in bucket.drain(..) {
391            if self.limit.is_some_and(|limit| *emitted >= limit) {
392                return Ok(true);
393            }
394            emit_single_row(
395                &row.raw,
396                &row.typed,
397                *emitted + 1,
398                self.headers,
399                self.derived_columns,
400                self.output_plan,
401                self.writer,
402            )?;
403            *emitted += 1;
404        }
405
406        Ok(false)
407    }
408}
409
410fn build_prefix_key(
411    typed: &[Option<Value>],
412    sort_plan: &[SortInstruction],
413    prefix_len: usize,
414) -> Vec<Option<Value>> {
415    let take = prefix_len.min(sort_plan.len());
416    sort_plan
417        .iter()
418        .take(take)
419        .map(|directive| typed[directive.index].clone())
420        .collect()
421}
422
423fn emit_rows<I>(
424    rows: I,
425    headers: &[String],
426    derived_columns: &[DerivedColumn],
427    output_plan: &OutputPlan,
428    writer: &mut csv::Writer<Box<dyn std::io::Write>>,
429    limit: Option<usize>,
430) -> Result<()>
431where
432    I: Iterator<Item = RowData>,
433{
434    for (written, row) in rows.enumerate() {
435        if limit.is_some_and(|limit| written >= limit) {
436            break;
437        }
438        emit_single_row(
439            &row.raw,
440            &row.typed,
441            written + 1,
442            headers,
443            derived_columns,
444            output_plan,
445            writer,
446        )?;
447    }
448    Ok(())
449}
450
451fn emit_single_row(
452    raw: &[String],
453    typed: &[Option<Value>],
454    row_number: usize,
455    headers: &[String],
456    derived_columns: &[DerivedColumn],
457    output_plan: &OutputPlan,
458    writer: &mut csv::Writer<Box<dyn std::io::Write>>,
459) -> Result<()> {
460    let mut record = Vec::with_capacity(output_plan.fields.len());
461    for field in &output_plan.fields {
462        match field {
463            OutputField::RowNumber => record.push(row_number.to_string()),
464            OutputField::ExistingColumn(idx) => {
465                let raw_value = raw.get(*idx).map(String::as_str).unwrap_or("");
466                let typed_value = typed.get(*idx).and_then(|v| v.as_ref());
467                let formatted = output_plan.format_existing_value(raw_value, typed_value);
468                record.push(formatted);
469            }
470            OutputField::Derived(idx) => {
471                let derived =
472                    derived_columns[*idx].evaluate(headers, raw, typed, Some(row_number))?;
473                record.push(derived);
474            }
475        }
476    }
477    writer
478        .write_record(record.iter())
479        .context("Writing output row")
480}
481
482fn parse_row(raw: &[String], schema: &Schema) -> Result<Vec<Option<Value>>> {
483    schema
484        .columns
485        .iter()
486        .enumerate()
487        .map(|(idx, column)| {
488            let value = raw.get(idx).map(|s| s.as_str()).unwrap_or("");
489            let normalized = column.normalize_value(value);
490            parse_typed_value(normalized.as_ref(), &column.datatype)
491        })
492        .collect()
493}
494
495fn compare_rows(a: &RowData, b: &RowData, plan: &[SortInstruction]) -> std::cmp::Ordering {
496    for directive in plan {
497        let left = ComparableValue(a.typed[directive.index].clone());
498        let right = ComparableValue(b.typed[directive.index].clone());
499        let ord = left.cmp(&right);
500        if ord != std::cmp::Ordering::Equal {
501            return if directive.ascending {
502                ord
503            } else {
504                ord.reverse()
505            };
506        }
507    }
508    a.ordinal.cmp(&b.ordinal)
509}
510
/// A row that passed the filters and is buffered for sorting and output.
#[derive(Debug)]
struct RowData {
    /// Decoded field text, after schema value replacements were applied.
    raw: Vec<String>,
    /// Typed values produced by `parse_row`, one per schema column.
    typed: Vec<Option<Value>>,
    /// Sequence number assigned while reading; used by `compare_rows` as the
    /// final tie-breaker.
    ordinal: usize,
}
517
/// A sort directive resolved to a column position.
#[derive(Debug)]
struct SortInstruction {
    /// Position of the sort column within a row's typed values.
    index: usize,
    /// `true` for ascending order, `false` for descending.
    ascending: bool,
}
523
/// A sort request parsed from a `column[:direction]` specification.
#[derive(Debug)]
pub struct SortDirective {
    /// Column name exactly as written in the specification (trimmed).
    pub column: String,
    /// `true` for ascending order, `false` for descending.
    pub ascending: bool,
}
529
530impl SortDirective {
531    fn parse(spec: &str) -> Result<Self> {
532        let mut parts = spec.split(':');
533        let column = parts
534            .next()
535            .map(|s| s.trim())
536            .filter(|s| !s.is_empty())
537            .ok_or_else(|| anyhow!("Sort directive is missing a column"))?;
538        let direction = parts.next().unwrap_or("asc");
539        let ascending = match direction.to_ascii_lowercase().as_str() {
540            "asc" => true,
541            "desc" => false,
542            other => {
543                return Err(anyhow!("Unknown sort direction '{other}'"));
544            }
545        };
546        Ok(SortDirective {
547            column: column.to_string(),
548            ascending,
549        })
550    }
551}
552
/// Describes the shape of every output record.
struct OutputPlan {
    /// Output header names, in output order.
    headers: Vec<String>,
    /// Source of each output field, in the same order as `headers`.
    fields: Vec<OutputField>,
    /// How boolean-typed values of existing columns are rendered.
    boolean_format: BooleanFormat,
}
558
559impl OutputPlan {
560    fn new(
561        headers: &[String],
562        schema: &Schema,
563        selected_columns: &[String],
564        derived: &[DerivedColumn],
565        row_numbers: bool,
566        boolean_format: BooleanFormat,
567    ) -> Result<Self> {
568        let mut fields = Vec::new();
569        let mut output_headers = Vec::new();
570        if row_numbers {
571            fields.push(OutputField::RowNumber);
572            output_headers.push("row_number".to_string());
573        }
574        let column_map = build_column_map(headers, schema);
575        let columns_to_use = if selected_columns.is_empty() {
576            headers.to_vec()
577        } else {
578            selected_columns.to_vec()
579        };
580        for column in columns_to_use {
581            let idx = column_map
582                .get(&column)
583                .copied()
584                .ok_or_else(|| anyhow!("Requested column '{column}' not found"))?;
585            fields.push(OutputField::ExistingColumn(idx));
586            output_headers.push(schema.columns[idx].output_name().to_string());
587        }
588        for (idx, derived_column) in derived.iter().enumerate() {
589            fields.push(OutputField::Derived(idx));
590            output_headers.push(derived_column.name.clone());
591        }
592        Ok(OutputPlan {
593            headers: output_headers,
594            fields,
595            boolean_format,
596        })
597    }
598
599    fn format_existing_value(&self, raw: &str, typed: Option<&Value>) -> String {
600        match (self.boolean_format, typed) {
601            (BooleanFormat::Original, _) => raw.to_string(),
602            (BooleanFormat::TrueFalse, Some(Value::Boolean(true))) => "true".to_string(),
603            (BooleanFormat::TrueFalse, Some(Value::Boolean(false))) => "false".to_string(),
604            (BooleanFormat::OneZero, Some(Value::Boolean(true))) => "1".to_string(),
605            (BooleanFormat::OneZero, Some(Value::Boolean(false))) => "0".to_string(),
606            _ => raw.to_string(),
607        }
608    }
609}
610
/// Source of a single field in an output record.
#[derive(Debug)]
enum OutputField {
    /// The number of the row being written (numbering starts at 1).
    RowNumber,
    /// The input column at the given position.
    ExistingColumn(usize),
    /// The derived column at the given position in the derived-column list.
    Derived(usize),
}
616}