rulemorph 0.3.2

YAML-based declarative data transformation engine for CSV/JSON to JSON
Documentation
use csv::{ReaderBuilder, StringRecordsIntoIter};
use serde_json::{Map, Value as JsonValue};
use std::collections::HashSet;

use crate::error::{TransformError, TransformErrorKind};
use crate::model::RuleFile;

use super::{NormalizationOptions, enforce_records_limit};

/// Builds a lazy iterator that turns CSV `input` into one JSON object per row.
///
/// Column names come from the first CSV row when `input.csv.has_header` is
/// true, otherwise from `input.csv.columns`. A leading UTF-8 BOM in `input` is
/// ignored. The returned [`CsvRecords`] checks each row's width against the
/// resolved column names itself, so the underlying csv reader is configured as
/// `flexible` and does not apply its own first-row-based width check.
///
/// # Errors
///
/// Returns an `InvalidInput` error when `input.csv` is missing, when
/// `csv.delimiter` is not exactly one byte, when the header row cannot be read,
/// when `has_header=false` and `csv.columns` is missing or empty, or when the
/// resolved column names fail `validate_headers` (blank or duplicated).
pub(crate) fn normalize_csv_records_iter<'a>(
    rule: &RuleFile,
    input: &'a str,
    options: &NormalizationOptions,
) -> Result<CsvRecords<'a>, TransformError> {
    let csv_spec = rule.input.csv.as_ref().ok_or_else(|| {
        TransformError::new(
            TransformErrorKind::InvalidInput,
            "input.csv is required when format=csv",
        )
    })?;
    let &[delimiter] = csv_spec.delimiter.as_bytes() else {
        return Err(TransformError::new(
            TransformErrorKind::InvalidInput,
            "csv.delimiter must be a single-byte character",
        ));
    };

    // Drop a leading UTF-8 BOM before parsing. Otherwise it can end up inside
    // the first field (the first header, or the first data value when there is
    // no header row) and keep a quoted first field from being parsed as quoted.
    let input = input.strip_prefix('\u{feff}').unwrap_or(input);

    let mut reader = ReaderBuilder::new()
        .delimiter(delimiter)
        .has_headers(csv_spec.has_header)
        // Row width is validated against the resolved headers by `CsvRecords`.
        // A non-flexible reader would instead compare every row with the FIRST
        // row it read. With `has_header=false`, one mis-sized first row would
        // then make every later, correctly sized row fail as well.
        .flexible(true)
        .from_reader(input.as_bytes());

    let headers = if csv_spec.has_header {
        let header_record = reader.headers().map_err(|err| {
            TransformError::new(
                TransformErrorKind::InvalidInput,
                format!("failed to read csv header: {}", err),
            )
        })?;
        header_record
            .iter()
            .enumerate()
            .map(|(index, value)| {
                // Defensive: remove any BOM still prefixing the first header
                // (e.g. input that started with more than one BOM).
                if index == 0 {
                    value.trim_start_matches('\u{feff}').to_string()
                } else {
                    value.to_string()
                }
            })
            .collect::<Vec<_>>()
    } else {
        let columns = csv_spec
            .columns
            .as_ref()
            .filter(|columns| !columns.is_empty())
            .ok_or_else(|| {
                TransformError::new(
                    TransformErrorKind::InvalidInput,
                    "csv.columns is required when has_header=false",
                )
            })?;
        columns.iter().map(|column| column.name.clone()).collect()
    };
    validate_headers(&headers)?;

    Ok(CsvRecords {
        records: reader.into_records(),
        headers,
        options: options.clone(),
        count: 0,
    })
}

/// Lazy CSV-to-JSON row iterator created by `normalize_csv_records_iter`.
///
/// Each call to `next` pulls one row from the borrowed CSV input and turns it
/// into a JSON object keyed by `headers`.
pub(crate) struct CsvRecords<'a> {
    /// Column names used as JSON object keys, one per expected field.
    headers: Vec<String>,
    /// Remaining CSV rows, read from the borrowed input bytes.
    records: StringRecordsIntoIter<&'a [u8]>,
    /// Rows that passed the per-row checks so far; fed to `enforce_records_limit`.
    count: usize,
    /// Normalization limits (e.g. `max_text_bytes`) applied to each row.
    options: NormalizationOptions,
}

impl Iterator for CsvRecords<'_> {
    type Item = Result<JsonValue, TransformError>;

    fn next(&mut self) -> Option<Self::Item> {
        let record = match self.records.next()? {
            Ok(record) => record,
            Err(err) => {
                return Some(Err(TransformError::new(
                    TransformErrorKind::InvalidInput,
                    format!("failed to read csv record: {}", err),
                )));
            }
        };
        if record.len() != self.headers.len() {
            return Some(Err(TransformError::new(
                TransformErrorKind::InvalidInput,
                format!(
                    "csv record has {} fields but expected {}",
                    record.len(),
                    self.headers.len()
                ),
            )));
        }

        let mut obj = Map::new();
        for (index, name) in self.headers.iter().enumerate() {
            if let Some(value) = record.get(index) {
                if value.len() > self.options.max_text_bytes {
                    return Some(Err(TransformError::new(
                        TransformErrorKind::InvalidInput,
                        "input exceeds max_text_bytes",
                    )));
                }
                obj.insert(name.clone(), JsonValue::String(value.to_string()));
            }
        }

        self.count += 1;
        if let Err(err) = enforce_records_limit(self.count, &self.options) {
            return Some(Err(err));
        }

        Some(Ok(JsonValue::Object(obj)))
    }
}

/// Checks that every resolved CSV column name can serve as a JSON object key.
///
/// Headers are inspected in order, and the first offending header decides the
/// error that is reported.
///
/// # Errors
///
/// Returns an `InvalidInput` error when a header is empty or whitespace-only,
/// or when a header exactly repeats an earlier one. Duplicates are compared
/// untrimmed, so `"a"` and `"a "` are distinct.
fn validate_headers(headers: &[String]) -> Result<(), TransformError> {
    let mut names: HashSet<&str> = HashSet::with_capacity(headers.len());
    headers.iter().try_for_each(|header| {
        let problem = if header.trim().is_empty() {
            Some("csv header must not be blank")
        } else if !names.insert(header.as_str()) {
            Some("csv header must be unique")
        } else {
            None
        };
        match problem {
            Some(message) => Err(TransformError::new(
                TransformErrorKind::InvalidInput,
                message,
            )),
            None => Ok(()),
        }
    })
}