csv_managed/
append.rs

1use std::path::Path;
2
3use anyhow::{Context, Result, anyhow};
4use log::info;
5
6use crate::{cli::AppendArgs, data::parse_typed_value, io_utils, schema::Schema};
7
8pub fn execute(args: &AppendArgs) -> Result<()> {
9    if args.inputs.is_empty() {
10        return Err(anyhow!("At least one input file must be provided"));
11    }
12
13    let delimiter = io_utils::resolve_input_delimiter(&args.inputs[0], args.delimiter);
14    let input_encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
15    let output_delimiter =
16        io_utils::resolve_output_delimiter(args.output.as_deref(), None, delimiter);
17    let output_encoding = io_utils::resolve_encoding(args.output_encoding.as_deref())?;
18
19    let schema = if let Some(path) = &args.schema {
20        Some(Schema::load(path).with_context(|| format!("Loading schema from {path:?}"))?)
21    } else {
22        None
23    };
24
25    let mut baseline_headers: Option<Vec<String>> = None;
26    let mut writer =
27        io_utils::open_csv_writer(args.output.as_deref(), output_delimiter, output_encoding)?;
28    let mut total_rows = 0usize;
29    let context = AppendContext {
30        delimiter,
31        encoding: input_encoding,
32        schema: schema.as_ref(),
33    };
34
35    {
36        let mut state = AppendState {
37            writer: &mut writer,
38            baseline_headers: &mut baseline_headers,
39            total_rows: &mut total_rows,
40        };
41
42        for (idx, input) in args.inputs.iter().enumerate() {
43            append_single(input.as_path(), idx == 0, &context, &mut state)?;
44            info!("✓ Appended {input:?}");
45        }
46    }
47
48    info!("Wrote {total_rows} data row(s) to output");
49    Ok(())
50}
51
52struct AppendContext<'schema> {
53    delimiter: u8,
54    encoding: &'static encoding_rs::Encoding,
55    schema: Option<&'schema Schema>,
56}
57
58struct AppendState<'writer> {
59    writer: &'writer mut csv::Writer<Box<dyn std::io::Write>>,
60    baseline_headers: &'writer mut Option<Vec<String>>,
61    total_rows: &'writer mut usize,
62}
63
64fn append_single(
65    path: &Path,
66    write_header: bool,
67    context: &AppendContext<'_>,
68    state: &mut AppendState<'_>,
69) -> Result<()> {
70    let (mut reader, headers, expects_headers) = if let Some(schema) = context.schema {
71        let expects_headers = schema.expects_headers();
72        let mut reader =
73            io_utils::open_csv_reader_from_path(path, context.delimiter, expects_headers)?;
74        let headers = if expects_headers {
75            io_utils::reader_headers(&mut reader, context.encoding)?
76        } else {
77            schema.headers()
78        };
79        (reader, headers, expects_headers)
80    } else {
81        let layout =
82            crate::schema::detect_csv_layout(path, context.delimiter, context.encoding, None)?;
83        let mut reader =
84            io_utils::open_csv_reader_from_path(path, context.delimiter, layout.has_headers)?;
85        let headers = if layout.has_headers {
86            io_utils::reader_headers(&mut reader, context.encoding)?
87        } else {
88            layout.headers.clone()
89        };
90        (reader, headers, layout.has_headers)
91    };
92
93    if let Some(schema) = context.schema {
94        schema
95            .validate_headers(&headers)
96            .with_context(|| format!("Validating headers for {path:?}"))?;
97    } else if let Some(baseline) = state.baseline_headers {
98        if headers.len() != baseline.len()
99            || !headers
100                .iter()
101                .zip(baseline.iter())
102                .all(|(left, right)| left == right)
103        {
104            return Err(anyhow!(
105                "Header mismatch between {path:?} and baseline ({baseline:?})"
106            ));
107        }
108    } else {
109        *state.baseline_headers = Some(headers.clone());
110    }
111
112    if write_header {
113        if let Some(schema) = context.schema {
114            if schema.expects_headers() {
115                let output_headers = schema.output_headers();
116                state
117                    .writer
118                    .write_record(output_headers.iter())
119                    .with_context(|| "Writing output headers")?;
120            }
121        } else if expects_headers {
122            state
123                .writer
124                .write_record(headers.iter())
125                .with_context(|| "Writing output headers")?;
126        }
127    }
128
129    for (row_idx, record) in reader.byte_records().enumerate() {
130        let record = record.with_context(|| format!("Reading row {} in {path:?}", row_idx + 2))?;
131        let mut decoded = io_utils::decode_record(&record, context.encoding)?;
132        if let Some(schema) = context.schema {
133            if schema.has_transformations() {
134                schema
135                    .apply_transformations_to_row(&mut decoded)
136                    .with_context(|| {
137                        format!(
138                            "Applying datatype mappings to row {} in {path:?}",
139                            row_idx + 2
140                        )
141                    })?;
142            }
143            schema.apply_replacements_to_row(&mut decoded);
144            validate_record(schema, &decoded, row_idx + 2)?;
145        }
146        state
147            .writer
148            .write_record(decoded.iter())
149            .with_context(|| format!("Writing row {} from {path:?}", row_idx + 2))?;
150        *state.total_rows += 1;
151    }
152
153    Ok(())
154}
155
156fn validate_record(schema: &Schema, record: &[String], row_index: usize) -> Result<()> {
157    for (idx, column) in schema.columns.iter().enumerate() {
158        let value = record.get(idx).map(|s| s.as_str()).unwrap_or("");
159        let normalized = column.normalize_value(value);
160        if normalized.is_empty() {
161            continue;
162        }
163        parse_typed_value(normalized.as_ref(), &column.datatype)
164            .with_context(|| format!("Row {row_index} column '{}'", column.output_name()))?;
165    }
166    Ok(())
167}