1use std::fs::File;
2
3use anyhow::{Context, Result, anyhow};
4use csv::{ByteRecord, Position};
5use itertools::Itertools;
6use log::{debug, info};
7
8use crate::{
9 cli::{BooleanFormat, ProcessArgs},
10 data::{ComparableValue, Value, parse_typed_value},
11 derive::{DerivedColumn, parse_derived_columns},
12 filter::{evaluate_conditions, parse_filters},
13 index::{CsvIndex, IndexVariant, SortDirection},
14 io_utils,
15 schema::{ColumnMeta, ColumnType, Schema},
16};
17
18use encoding_rs::Encoding;
19
20pub fn execute(args: &ProcessArgs) -> Result<()> {
21 let delimiter = io_utils::resolve_input_delimiter(&args.input, args.delimiter);
22 let input_encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
23 let output_delimiter = io_utils::resolve_output_delimiter(
24 args.output.as_deref(),
25 args.output_delimiter,
26 delimiter,
27 );
28 let output_encoding = io_utils::resolve_encoding(args.output_encoding.as_deref())?;
29 info!(
30 "Processing '{}' -> {:?} (delimiter '{}', output '{}')",
31 args.input.display(),
32 args.output
33 .as_ref()
34 .map(|p| p.display().to_string())
35 .unwrap_or_else(|| "stdout".into()),
36 crate::printable_delimiter(delimiter),
37 crate::printable_delimiter(output_delimiter)
38 );
39 let sorts = args
40 .sort
41 .iter()
42 .flat_map(|s| s.split(','))
43 .map(|s| s.trim())
44 .filter(|s| !s.is_empty())
45 .map(SortDirective::parse)
46 .collect::<Result<Vec<_>>>()?;
47 let selected_columns = args
48 .columns
49 .iter()
50 .flat_map(|s| s.split(','))
51 .map(|s| s.trim())
52 .filter(|s| !s.is_empty())
53 .map(|s| s.to_string())
54 .collect::<Vec<_>>();
55 let derived_columns = parse_derived_columns(&args.derives)?;
56 let filters = parse_filters(&args.filters)?;
57
58 let mut reader = io_utils::open_csv_reader_from_path(&args.input, delimiter, true)?;
59 let headers = io_utils::reader_headers(&mut reader, input_encoding)?;
60
61 let mut schema = if let Some(schema_path) = &args.schema {
62 Schema::load(schema_path)?
63 } else {
64 Schema::from_headers(&headers)
65 };
66
67 reconcile_schema_with_headers(&mut schema, &headers)?;
68
69 let maybe_index = if let Some(index_path) = &args.index {
70 Some(CsvIndex::load(index_path)?)
71 } else {
72 None
73 };
74
75 let requested_variant = args
76 .index_variant
77 .as_ref()
78 .map(|name| name.trim())
79 .filter(|name| !name.is_empty())
80 .map(|name| name.to_string());
81
82 if requested_variant.is_some() && maybe_index.is_none() {
83 return Err(anyhow!(
84 "An index variant was specified but no index file was provided"
85 ));
86 }
87
88 let sort_signature = sorts
89 .iter()
90 .map(|s| {
91 (
92 s.column.clone(),
93 if s.ascending {
94 SortDirection::Asc
95 } else {
96 SortDirection::Desc
97 },
98 )
99 })
100 .collect_vec();
101
102 let matching_variant: Option<&IndexVariant> = if let Some(index) = maybe_index.as_ref() {
103 if let Some(name) = requested_variant.as_deref() {
104 if sort_signature.is_empty() {
105 return Err(anyhow!(
106 "Selecting an index variant requires at least one --sort directive"
107 ));
108 }
109 let variant = index.variant_by_name(name).ok_or_else(|| {
110 anyhow!(
111 "Index variant '{name}' not found in {:?}",
112 args.index.as_ref().map(|p| p.display().to_string())
113 )
114 })?;
115 if !variant.matches(&sort_signature) {
116 return Err(anyhow!(
117 "Index variant '{name}' does not match the requested sort order"
118 ));
119 }
120 Some(variant)
121 } else if sort_signature.is_empty() {
122 None
123 } else {
124 index.best_match(&sort_signature)
125 }
126 } else {
127 None
128 };
129
130 let output_path = args.output.as_deref();
131 let mut writer = io_utils::open_csv_writer(output_path, output_delimiter, output_encoding)?;
132
133 let column_map = build_column_map(&headers, &schema);
134 let sort_plan = build_sort_plan(&sorts, &schema, &column_map)?;
135 let filter_conditions = filters;
136
137 let output_plan = OutputPlan::new(
138 &headers,
139 &schema,
140 &selected_columns,
141 &derived_columns,
142 args.row_numbers,
143 args.boolean_format,
144 )?;
145
146 write_headers(&mut writer, &output_plan)?;
147 {
148 let mut engine = ProcessEngine {
149 schema: &schema,
150 headers: &headers,
151 filters: &filter_conditions,
152 derived_columns: &derived_columns,
153 output_plan: &output_plan,
154 writer: &mut writer,
155 limit: args.limit,
156 };
157
158 if let Some(variant) = matching_variant {
159 if io_utils::is_dash(&args.input) {
160 return Err(anyhow!(
161 "Index accelerated processing requires a regular file input"
162 ));
163 }
164 let mut seek_reader = io_utils::open_seekable_csv_reader(&args.input, delimiter, true)?;
165 seek_reader.byte_headers()?;
167 let covered = variant.columns().len();
168 let total = sort_signature.len();
169 info!(
170 "Using index {:?} variant '{}' to accelerate sort",
171 args.index,
172 variant.describe()
173 );
174 if covered < total {
175 debug!(
176 "Index covers {covered}/{total} sort columns; remaining columns will be sorted in-memory"
177 );
178 }
179 engine.process_with_index(&mut seek_reader, input_encoding, variant, &sort_plan)?;
180 } else {
181 if maybe_index.is_some() {
182 debug!("Index present but not used due to incompatible sort signature");
183 }
184 engine.process_in_memory(reader, input_encoding, sort_plan)?;
185 }
186 }
187 writer.flush().context("Flushing output")
188}
189
190fn reconcile_schema_with_headers(schema: &mut Schema, headers: &[String]) -> Result<()> {
191 if schema.columns.is_empty() {
192 schema.columns = headers
193 .iter()
194 .map(|name| ColumnMeta {
195 name: name.clone(),
196 datatype: ColumnType::String,
197 rename: None,
198 value_replacements: Vec::new(),
199 })
200 .collect();
201 return Ok(());
202 }
203
204 schema.validate_headers(headers)?;
205 Ok(())
206}
207
208fn build_column_map(
209 headers: &[String],
210 schema: &Schema,
211) -> std::collections::HashMap<String, usize> {
212 let mut map = std::collections::HashMap::new();
213 for (idx, header) in headers.iter().enumerate() {
214 map.insert(header.clone(), idx);
215 if let Some(rename) = schema
216 .columns
217 .get(idx)
218 .and_then(|column| column.rename.as_ref())
219 .filter(|rename| !rename.is_empty())
220 {
221 map.insert(rename.clone(), idx);
222 }
223 }
224 map
225}
226
227fn build_sort_plan(
228 directives: &[SortDirective],
229 schema: &Schema,
230 column_map: &std::collections::HashMap<String, usize>,
231) -> Result<Vec<SortInstruction>> {
232 directives
233 .iter()
234 .map(|directive| {
235 let idx = column_map
236 .get(&directive.column)
237 .copied()
238 .or_else(|| schema.column_index(&directive.column))
239 .ok_or_else(|| anyhow!("Sort column '{}' not found", directive.column))?;
240 Ok(SortInstruction {
241 index: idx,
242 ascending: directive.ascending,
243 })
244 })
245 .collect()
246}
247
248fn write_headers(
249 writer: &mut csv::Writer<Box<dyn std::io::Write>>,
250 plan: &OutputPlan,
251) -> Result<()> {
252 writer
253 .write_record(plan.headers.iter())
254 .context("Writing output headers")
255}
256
257struct ProcessEngine<'a> {
258 schema: &'a Schema,
259 headers: &'a [String],
260 filters: &'a [crate::filter::FilterCondition],
261 derived_columns: &'a [DerivedColumn],
262 output_plan: &'a OutputPlan,
263 writer: &'a mut csv::Writer<Box<dyn std::io::Write>>,
264 limit: Option<usize>,
265}
266
267impl<'a> ProcessEngine<'a> {
268 fn process_in_memory(
269 &mut self,
270 reader: csv::Reader<Box<dyn std::io::Read>>,
271 encoding: &'static Encoding,
272 sort_plan: Vec<SortInstruction>,
273 ) -> Result<()> {
274 let mut rows: Vec<RowData> = Vec::new();
275
276 for (ordinal, result) in reader.into_byte_records().enumerate() {
277 let record = result.with_context(|| format!("Reading row {}", ordinal + 2))?;
278 let mut raw = io_utils::decode_record(&record, encoding)?;
279 self.schema.apply_replacements_to_row(&mut raw);
280 let typed = parse_row(&raw, self.schema)?;
281
282 if !self.filters.is_empty()
283 && !evaluate_conditions(self.filters, self.schema, self.headers, &raw, &typed)?
284 {
285 continue;
286 }
287
288 rows.push(RowData {
289 raw,
290 typed,
291 ordinal,
292 });
293 }
294
295 if !sort_plan.is_empty() {
296 rows.sort_by(|a, b| compare_rows(a, b, &sort_plan));
297 }
298
299 emit_rows(
300 rows.into_iter(),
301 self.headers,
302 self.derived_columns,
303 self.output_plan,
304 self.writer,
305 self.limit,
306 )
307 }
308
309 fn process_with_index(
310 &mut self,
311 reader: &mut csv::Reader<std::io::BufReader<File>>,
312 encoding: &'static Encoding,
313 variant: &IndexVariant,
314 sort_plan: &[SortInstruction],
315 ) -> Result<()> {
316 let mut record = ByteRecord::new();
317 let mut emitted = 0usize;
318 let mut ordinal = 0usize;
319 let prefix_len = variant.columns().len();
320 let mut current_prefix: Option<Vec<Option<Value>>> = None;
321 let mut bucket: Vec<RowData> = Vec::new();
322
323 for offset in variant.ordered_offsets() {
324 if self.limit.is_some_and(|limit| emitted >= limit) {
325 break;
326 }
327 let mut position = Position::new();
328 position.set_byte(offset);
329 reader.seek(position)?;
330 if !reader.read_byte_record(&mut record)? {
331 break;
332 }
333 let mut raw = io_utils::decode_record(&record, encoding)?;
334 self.schema.apply_replacements_to_row(&mut raw);
335 let typed = parse_row(&raw, self.schema)?;
336 if !self.filters.is_empty()
337 && !evaluate_conditions(self.filters, self.schema, self.headers, &raw, &typed)?
338 {
339 continue;
340 }
341
342 let prefix_key = build_prefix_key(&typed, sort_plan, prefix_len);
343 match current_prefix.as_ref() {
344 Some(existing) if *existing == prefix_key => {}
345 Some(_) => {
346 if self.flush_bucket(&mut bucket, sort_plan, prefix_len, &mut emitted)? {
347 return Ok(());
348 }
349 current_prefix = Some(prefix_key.clone());
350 }
351 None => {
352 current_prefix = Some(prefix_key.clone());
353 }
354 }
355
356 bucket.push(RowData {
357 raw,
358 typed,
359 ordinal,
360 });
361 ordinal += 1;
362 }
363
364 self.flush_bucket(&mut bucket, sort_plan, prefix_len, &mut emitted)?;
365
366 Ok(())
367 }
368
369 fn flush_bucket(
370 &mut self,
371 bucket: &mut Vec<RowData>,
372 sort_plan: &[SortInstruction],
373 prefix_len: usize,
374 emitted: &mut usize,
375 ) -> Result<bool> {
376 if bucket.is_empty() {
377 return Ok(false);
378 }
379
380 let remainder_plan = if prefix_len >= sort_plan.len() {
381 &[][..]
382 } else {
383 &sort_plan[prefix_len..]
384 };
385
386 if !remainder_plan.is_empty() {
387 bucket.sort_by(|a, b| compare_rows(a, b, remainder_plan));
388 }
389
390 for row in bucket.drain(..) {
391 if self.limit.is_some_and(|limit| *emitted >= limit) {
392 return Ok(true);
393 }
394 emit_single_row(
395 &row.raw,
396 &row.typed,
397 *emitted + 1,
398 self.headers,
399 self.derived_columns,
400 self.output_plan,
401 self.writer,
402 )?;
403 *emitted += 1;
404 }
405
406 Ok(false)
407 }
408}
409
410fn build_prefix_key(
411 typed: &[Option<Value>],
412 sort_plan: &[SortInstruction],
413 prefix_len: usize,
414) -> Vec<Option<Value>> {
415 let take = prefix_len.min(sort_plan.len());
416 sort_plan
417 .iter()
418 .take(take)
419 .map(|directive| typed[directive.index].clone())
420 .collect()
421}
422
423fn emit_rows<I>(
424 rows: I,
425 headers: &[String],
426 derived_columns: &[DerivedColumn],
427 output_plan: &OutputPlan,
428 writer: &mut csv::Writer<Box<dyn std::io::Write>>,
429 limit: Option<usize>,
430) -> Result<()>
431where
432 I: Iterator<Item = RowData>,
433{
434 for (written, row) in rows.enumerate() {
435 if limit.is_some_and(|limit| written >= limit) {
436 break;
437 }
438 emit_single_row(
439 &row.raw,
440 &row.typed,
441 written + 1,
442 headers,
443 derived_columns,
444 output_plan,
445 writer,
446 )?;
447 }
448 Ok(())
449}
450
451fn emit_single_row(
452 raw: &[String],
453 typed: &[Option<Value>],
454 row_number: usize,
455 headers: &[String],
456 derived_columns: &[DerivedColumn],
457 output_plan: &OutputPlan,
458 writer: &mut csv::Writer<Box<dyn std::io::Write>>,
459) -> Result<()> {
460 let mut record = Vec::with_capacity(output_plan.fields.len());
461 for field in &output_plan.fields {
462 match field {
463 OutputField::RowNumber => record.push(row_number.to_string()),
464 OutputField::ExistingColumn(idx) => {
465 let raw_value = raw.get(*idx).map(String::as_str).unwrap_or("");
466 let typed_value = typed.get(*idx).and_then(|v| v.as_ref());
467 let formatted = output_plan.format_existing_value(raw_value, typed_value);
468 record.push(formatted);
469 }
470 OutputField::Derived(idx) => {
471 let derived =
472 derived_columns[*idx].evaluate(headers, raw, typed, Some(row_number))?;
473 record.push(derived);
474 }
475 }
476 }
477 writer
478 .write_record(record.iter())
479 .context("Writing output row")
480}
481
482fn parse_row(raw: &[String], schema: &Schema) -> Result<Vec<Option<Value>>> {
483 schema
484 .columns
485 .iter()
486 .enumerate()
487 .map(|(idx, column)| {
488 let value = raw.get(idx).map(|s| s.as_str()).unwrap_or("");
489 let normalized = column.normalize_value(value);
490 parse_typed_value(normalized.as_ref(), &column.datatype)
491 })
492 .collect()
493}
494
495fn compare_rows(a: &RowData, b: &RowData, plan: &[SortInstruction]) -> std::cmp::Ordering {
496 for directive in plan {
497 let left = ComparableValue(a.typed[directive.index].clone());
498 let right = ComparableValue(b.typed[directive.index].clone());
499 let ord = left.cmp(&right);
500 if ord != std::cmp::Ordering::Equal {
501 return if directive.ascending {
502 ord
503 } else {
504 ord.reverse()
505 };
506 }
507 }
508 a.ordinal.cmp(&b.ordinal)
509}
510
511#[derive(Debug)]
512struct RowData {
513 raw: Vec<String>,
514 typed: Vec<Option<Value>>,
515 ordinal: usize,
516}
517
/// A resolved sort key: the column position plus its direction.
///
/// This is a small plain value, so it is `Copy` and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SortInstruction {
    /// Position of the sort column in each row.
    index: usize,
    /// `true` for ascending order, `false` for descending.
    ascending: bool,
}
523
/// A parsed `--sort` directive before column resolution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortDirective {
    /// Column name as written by the user (header name or rename).
    pub column: String,
    /// `true` for ascending order, `false` for descending.
    pub ascending: bool,
}
529
530impl SortDirective {
531 fn parse(spec: &str) -> Result<Self> {
532 let mut parts = spec.split(':');
533 let column = parts
534 .next()
535 .map(|s| s.trim())
536 .filter(|s| !s.is_empty())
537 .ok_or_else(|| anyhow!("Sort directive is missing a column"))?;
538 let direction = parts.next().unwrap_or("asc");
539 let ascending = match direction.to_ascii_lowercase().as_str() {
540 "asc" => true,
541 "desc" => false,
542 other => {
543 return Err(anyhow!("Unknown sort direction '{other}'"));
544 }
545 };
546 Ok(SortDirective {
547 column: column.to_string(),
548 ascending,
549 })
550 }
551}
552
553struct OutputPlan {
554 headers: Vec<String>,
555 fields: Vec<OutputField>,
556 boolean_format: BooleanFormat,
557}
558
559impl OutputPlan {
560 fn new(
561 headers: &[String],
562 schema: &Schema,
563 selected_columns: &[String],
564 derived: &[DerivedColumn],
565 row_numbers: bool,
566 boolean_format: BooleanFormat,
567 ) -> Result<Self> {
568 let mut fields = Vec::new();
569 let mut output_headers = Vec::new();
570 if row_numbers {
571 fields.push(OutputField::RowNumber);
572 output_headers.push("row_number".to_string());
573 }
574 let column_map = build_column_map(headers, schema);
575 let columns_to_use = if selected_columns.is_empty() {
576 headers.to_vec()
577 } else {
578 selected_columns.to_vec()
579 };
580 for column in columns_to_use {
581 let idx = column_map
582 .get(&column)
583 .copied()
584 .ok_or_else(|| anyhow!("Requested column '{column}' not found"))?;
585 fields.push(OutputField::ExistingColumn(idx));
586 output_headers.push(schema.columns[idx].output_name().to_string());
587 }
588 for (idx, derived_column) in derived.iter().enumerate() {
589 fields.push(OutputField::Derived(idx));
590 output_headers.push(derived_column.name.clone());
591 }
592 Ok(OutputPlan {
593 headers: output_headers,
594 fields,
595 boolean_format,
596 })
597 }
598
599 fn format_existing_value(&self, raw: &str, typed: Option<&Value>) -> String {
600 match (self.boolean_format, typed) {
601 (BooleanFormat::Original, _) => raw.to_string(),
602 (BooleanFormat::TrueFalse, Some(Value::Boolean(true))) => "true".to_string(),
603 (BooleanFormat::TrueFalse, Some(Value::Boolean(false))) => "false".to_string(),
604 (BooleanFormat::OneZero, Some(Value::Boolean(true))) => "1".to_string(),
605 (BooleanFormat::OneZero, Some(Value::Boolean(false))) => "0".to_string(),
606 _ => raw.to_string(),
607 }
608 }
609}
610
/// Source of a single output cell.
///
/// This is a small plain value, so it is `Copy` and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OutputField {
    /// The 1-based output row number.
    RowNumber,
    /// The input column at this position.
    ExistingColumn(usize),
    /// The derived column at this position in the derived-column list.
    Derived(usize),
}