1use std::path::Path;
2
3use anyhow::{Context, Result, anyhow};
4use log::info;
5
6use crate::{cli::AppendArgs, data::parse_typed_value, io_utils, schema::Schema};
7
8pub fn execute(args: &AppendArgs) -> Result<()> {
9 if args.inputs.is_empty() {
10 return Err(anyhow!("At least one input file must be provided"));
11 }
12
13 let delimiter = io_utils::resolve_input_delimiter(&args.inputs[0], args.delimiter);
14 let input_encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
15 let output_delimiter =
16 io_utils::resolve_output_delimiter(args.output.as_deref(), None, delimiter);
17 let output_encoding = io_utils::resolve_encoding(args.output_encoding.as_deref())?;
18
19 let schema = if let Some(path) = &args.schema {
20 Some(Schema::load(path).with_context(|| format!("Loading schema from {path:?}"))?)
21 } else {
22 None
23 };
24
25 let mut baseline_headers: Option<Vec<String>> = None;
26 let mut writer =
27 io_utils::open_csv_writer(args.output.as_deref(), output_delimiter, output_encoding)?;
28 let mut total_rows = 0usize;
29 let context = AppendContext {
30 delimiter,
31 encoding: input_encoding,
32 schema: schema.as_ref(),
33 };
34
35 {
36 let mut state = AppendState {
37 writer: &mut writer,
38 baseline_headers: &mut baseline_headers,
39 total_rows: &mut total_rows,
40 };
41
42 for (idx, input) in args.inputs.iter().enumerate() {
43 append_single(input.as_path(), idx == 0, &context, &mut state)?;
44 info!("✓ Appended {input:?}");
45 }
46 }
47
48 info!("Wrote {total_rows} data row(s) to output");
49 Ok(())
50}
51
52struct AppendContext<'schema> {
53 delimiter: u8,
54 encoding: &'static encoding_rs::Encoding,
55 schema: Option<&'schema Schema>,
56}
57
58struct AppendState<'writer> {
59 writer: &'writer mut csv::Writer<Box<dyn std::io::Write>>,
60 baseline_headers: &'writer mut Option<Vec<String>>,
61 total_rows: &'writer mut usize,
62}
63
64fn append_single(
65 path: &Path,
66 write_header: bool,
67 context: &AppendContext<'_>,
68 state: &mut AppendState<'_>,
69) -> Result<()> {
70 let (mut reader, headers, expects_headers) = if let Some(schema) = context.schema {
71 let expects_headers = schema.expects_headers();
72 let mut reader =
73 io_utils::open_csv_reader_from_path(path, context.delimiter, expects_headers)?;
74 let headers = if expects_headers {
75 io_utils::reader_headers(&mut reader, context.encoding)?
76 } else {
77 schema.headers()
78 };
79 (reader, headers, expects_headers)
80 } else {
81 let layout =
82 crate::schema::detect_csv_layout(path, context.delimiter, context.encoding, None)?;
83 let mut reader =
84 io_utils::open_csv_reader_from_path(path, context.delimiter, layout.has_headers)?;
85 let headers = if layout.has_headers {
86 io_utils::reader_headers(&mut reader, context.encoding)?
87 } else {
88 layout.headers.clone()
89 };
90 (reader, headers, layout.has_headers)
91 };
92
93 if let Some(schema) = context.schema {
94 schema
95 .validate_headers(&headers)
96 .with_context(|| format!("Validating headers for {path:?}"))?;
97 } else if let Some(baseline) = state.baseline_headers {
98 if headers.len() != baseline.len()
99 || !headers
100 .iter()
101 .zip(baseline.iter())
102 .all(|(left, right)| left == right)
103 {
104 return Err(anyhow!(
105 "Header mismatch between {path:?} and baseline ({baseline:?})"
106 ));
107 }
108 } else {
109 *state.baseline_headers = Some(headers.clone());
110 }
111
112 if write_header {
113 if let Some(schema) = context.schema {
114 if schema.expects_headers() {
115 let output_headers = schema.output_headers();
116 state
117 .writer
118 .write_record(output_headers.iter())
119 .with_context(|| "Writing output headers")?;
120 }
121 } else if expects_headers {
122 state
123 .writer
124 .write_record(headers.iter())
125 .with_context(|| "Writing output headers")?;
126 }
127 }
128
129 for (row_idx, record) in reader.byte_records().enumerate() {
130 let record = record.with_context(|| format!("Reading row {} in {path:?}", row_idx + 2))?;
131 let mut decoded = io_utils::decode_record(&record, context.encoding)?;
132 if let Some(schema) = context.schema {
133 if schema.has_transformations() {
134 schema
135 .apply_transformations_to_row(&mut decoded)
136 .with_context(|| {
137 format!(
138 "Applying datatype mappings to row {} in {path:?}",
139 row_idx + 2
140 )
141 })?;
142 }
143 schema.apply_replacements_to_row(&mut decoded);
144 validate_record(schema, &decoded, row_idx + 2)?;
145 }
146 state
147 .writer
148 .write_record(decoded.iter())
149 .with_context(|| format!("Writing row {} from {path:?}", row_idx + 2))?;
150 *state.total_rows += 1;
151 }
152
153 Ok(())
154}
155
156fn validate_record(schema: &Schema, record: &[String], row_index: usize) -> Result<()> {
157 for (idx, column) in schema.columns.iter().enumerate() {
158 let value = record.get(idx).map(|s| s.as_str()).unwrap_or("");
159 let normalized = column.normalize_value(value);
160 if normalized.is_empty() {
161 continue;
162 }
163 parse_typed_value(normalized.as_ref(), &column.datatype)
164 .with_context(|| format!("Row {row_index} column '{}'", column.output_name()))?;
165 }
166 Ok(())
167}