csv_managed/
append.rs

1use std::path::Path;
2
3use anyhow::{Context, Result, anyhow};
4use log::info;
5
6use crate::{cli::AppendArgs, data::parse_typed_value, io_utils, schema::Schema};
7
8pub fn execute(args: &AppendArgs) -> Result<()> {
9    if args.inputs.is_empty() {
10        return Err(anyhow!("At least one input file must be provided"));
11    }
12
13    let delimiter = io_utils::resolve_input_delimiter(&args.inputs[0], args.delimiter);
14    let input_encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
15    let output_delimiter =
16        io_utils::resolve_output_delimiter(args.output.as_deref(), None, delimiter);
17    let output_encoding = io_utils::resolve_encoding(args.output_encoding.as_deref())?;
18
19    let schema = if let Some(path) = &args.schema {
20        Some(Schema::load(path).with_context(|| format!("Loading schema from {path:?}"))?)
21    } else {
22        None
23    };
24
25    let mut baseline_headers: Option<Vec<String>> = None;
26    let mut writer =
27        io_utils::open_csv_writer(args.output.as_deref(), output_delimiter, output_encoding)?;
28    let mut total_rows = 0usize;
29    let context = AppendContext {
30        delimiter,
31        encoding: input_encoding,
32        schema: schema.as_ref(),
33    };
34
35    {
36        let mut state = AppendState {
37            writer: &mut writer,
38            baseline_headers: &mut baseline_headers,
39            total_rows: &mut total_rows,
40        };
41
42        for (idx, input) in args.inputs.iter().enumerate() {
43            append_single(input.as_path(), idx == 0, &context, &mut state)?;
44            info!("✓ Appended {input:?}");
45        }
46    }
47
48    info!("Wrote {total_rows} data row(s) to output");
49    Ok(())
50}
51
52struct AppendContext<'schema> {
53    delimiter: u8,
54    encoding: &'static encoding_rs::Encoding,
55    schema: Option<&'schema Schema>,
56}
57
58struct AppendState<'writer> {
59    writer: &'writer mut csv::Writer<Box<dyn std::io::Write>>,
60    baseline_headers: &'writer mut Option<Vec<String>>,
61    total_rows: &'writer mut usize,
62}
63
64fn append_single(
65    path: &Path,
66    write_header: bool,
67    context: &AppendContext<'_>,
68    state: &mut AppendState<'_>,
69) -> Result<()> {
70    let mut reader = io_utils::open_csv_reader_from_path(path, context.delimiter, true)?;
71    let headers = io_utils::reader_headers(&mut reader, context.encoding)?;
72
73    if let Some(schema) = context.schema {
74        schema
75            .validate_headers(&headers)
76            .with_context(|| format!("Validating headers for {path:?}"))?;
77    } else if let Some(baseline) = state.baseline_headers {
78        if headers.len() != baseline.len()
79            || !headers
80                .iter()
81                .zip(baseline.iter())
82                .all(|(left, right)| left == right)
83        {
84            return Err(anyhow!(
85                "Header mismatch between {path:?} and baseline ({baseline:?})"
86            ));
87        }
88    } else {
89        *state.baseline_headers = Some(headers.clone());
90    }
91
92    if write_header {
93        if let Some(schema) = context.schema {
94            let output_headers = schema.output_headers();
95            state
96                .writer
97                .write_record(output_headers.iter())
98                .with_context(|| "Writing output headers")?;
99        } else {
100            state
101                .writer
102                .write_record(headers.iter())
103                .with_context(|| "Writing output headers")?;
104        }
105    }
106
107    for (row_idx, record) in reader.byte_records().enumerate() {
108        let record = record.with_context(|| format!("Reading row {} in {path:?}", row_idx + 2))?;
109        let mut decoded = io_utils::decode_record(&record, context.encoding)?;
110        if let Some(schema) = context.schema {
111            schema.apply_replacements_to_row(&mut decoded);
112            validate_record(schema, &decoded, row_idx + 2)?;
113        }
114        state
115            .writer
116            .write_record(decoded.iter())
117            .with_context(|| format!("Writing row {} from {path:?}", row_idx + 2))?;
118        *state.total_rows += 1;
119    }
120
121    Ok(())
122}
123
124fn validate_record(schema: &Schema, record: &[String], row_index: usize) -> Result<()> {
125    for (idx, column) in schema.columns.iter().enumerate() {
126        let value = record.get(idx).map(|s| s.as_str()).unwrap_or("");
127        let normalized = column.normalize_value(value);
128        if normalized.is_empty() {
129            continue;
130        }
131        parse_typed_value(normalized.as_ref(), &column.datatype)
132            .with_context(|| format!("Row {row_index} column '{}'", column.output_name()))?;
133    }
134    Ok(())
135}