1use std::path::Path;
2
3use anyhow::{Context, Result, anyhow};
4use log::info;
5
6use crate::{cli::AppendArgs, data::parse_typed_value, io_utils, schema::Schema};
7
8pub fn execute(args: &AppendArgs) -> Result<()> {
9 if args.inputs.is_empty() {
10 return Err(anyhow!("At least one input file must be provided"));
11 }
12
13 let delimiter = io_utils::resolve_input_delimiter(&args.inputs[0], args.delimiter);
14 let input_encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
15 let output_delimiter =
16 io_utils::resolve_output_delimiter(args.output.as_deref(), None, delimiter);
17 let output_encoding = io_utils::resolve_encoding(args.output_encoding.as_deref())?;
18
19 let schema = if let Some(path) = &args.schema {
20 Some(Schema::load(path).with_context(|| format!("Loading schema from {path:?}"))?)
21 } else {
22 None
23 };
24
25 let mut baseline_headers: Option<Vec<String>> = None;
26 let mut writer =
27 io_utils::open_csv_writer(args.output.as_deref(), output_delimiter, output_encoding)?;
28 let mut total_rows = 0usize;
29 let context = AppendContext {
30 delimiter,
31 encoding: input_encoding,
32 schema: schema.as_ref(),
33 };
34
35 {
36 let mut state = AppendState {
37 writer: &mut writer,
38 baseline_headers: &mut baseline_headers,
39 total_rows: &mut total_rows,
40 };
41
42 for (idx, input) in args.inputs.iter().enumerate() {
43 append_single(input.as_path(), idx == 0, &context, &mut state)?;
44 info!("✓ Appended {input:?}");
45 }
46 }
47
48 info!("Wrote {total_rows} data row(s) to output");
49 Ok(())
50}
51
52struct AppendContext<'schema> {
53 delimiter: u8,
54 encoding: &'static encoding_rs::Encoding,
55 schema: Option<&'schema Schema>,
56}
57
58struct AppendState<'writer> {
59 writer: &'writer mut csv::Writer<Box<dyn std::io::Write>>,
60 baseline_headers: &'writer mut Option<Vec<String>>,
61 total_rows: &'writer mut usize,
62}
63
64fn append_single(
65 path: &Path,
66 write_header: bool,
67 context: &AppendContext<'_>,
68 state: &mut AppendState<'_>,
69) -> Result<()> {
70 let mut reader = io_utils::open_csv_reader_from_path(path, context.delimiter, true)?;
71 let headers = io_utils::reader_headers(&mut reader, context.encoding)?;
72
73 if let Some(schema) = context.schema {
74 schema
75 .validate_headers(&headers)
76 .with_context(|| format!("Validating headers for {path:?}"))?;
77 } else if let Some(baseline) = state.baseline_headers {
78 if headers.len() != baseline.len()
79 || !headers
80 .iter()
81 .zip(baseline.iter())
82 .all(|(left, right)| left == right)
83 {
84 return Err(anyhow!(
85 "Header mismatch between {path:?} and baseline ({baseline:?})"
86 ));
87 }
88 } else {
89 *state.baseline_headers = Some(headers.clone());
90 }
91
92 if write_header {
93 if let Some(schema) = context.schema {
94 let output_headers = schema.output_headers();
95 state
96 .writer
97 .write_record(output_headers.iter())
98 .with_context(|| "Writing output headers")?;
99 } else {
100 state
101 .writer
102 .write_record(headers.iter())
103 .with_context(|| "Writing output headers")?;
104 }
105 }
106
107 for (row_idx, record) in reader.byte_records().enumerate() {
108 let record = record.with_context(|| format!("Reading row {} in {path:?}", row_idx + 2))?;
109 let mut decoded = io_utils::decode_record(&record, context.encoding)?;
110 if let Some(schema) = context.schema {
111 schema.apply_replacements_to_row(&mut decoded);
112 validate_record(schema, &decoded, row_idx + 2)?;
113 }
114 state
115 .writer
116 .write_record(decoded.iter())
117 .with_context(|| format!("Writing row {} from {path:?}", row_idx + 2))?;
118 *state.total_rows += 1;
119 }
120
121 Ok(())
122}
123
124fn validate_record(schema: &Schema, record: &[String], row_index: usize) -> Result<()> {
125 for (idx, column) in schema.columns.iter().enumerate() {
126 let value = record.get(idx).map(|s| s.as_str()).unwrap_or("");
127 let normalized = column.normalize_value(value);
128 if normalized.is_empty() {
129 continue;
130 }
131 parse_typed_value(normalized.as_ref(), &column.datatype)
132 .with_context(|| format!("Row {row_index} column '{}'", column.output_name()))?;
133 }
134 Ok(())
135}