etl-unit 0.1.0

Semantic data model for ETL units — qualities and measurements over subjects and time. Built on Polars.
use std::{collections::HashMap, error::Error, fs::File, io::BufReader, path::Path};

use clap::Parser;
use serde::{Deserialize, Serialize};

// ============================================================================
// CLI Arguments
// ============================================================================

// Command-line options for the converter.
//
// Type-level notes are plain `//` comments on purpose: clap turns `///` doc
// comments into `--help` text, so only the per-field `///` lines below are help.
#[derive(Debug, Parser)]
#[command(version, about = "Converts ChartJS JSON configuration to CSV for ETL testing")]
struct Args {
    /// Path to the input ChartJS JSON configuration file
    #[arg(short = 'i', long = "input", default_value = "pump_data_configjs.json")]
    input: String,

    /// Path to write the output CSV file
    #[arg(short = 'o', long = "output", default_value = "tests/fixtures/pump_data.csv")]
    output: String,

    /// Maximum number of data points to export (truncates the data)
    // Upper bound on exported rows; fewer are written if the input has fewer labels.
    #[arg(short = 'l', long = "limit", default_value_t = 300)]
    limit: usize,
}

// ============================================================================
// Input Structures (ChartJS JSON)
// ============================================================================

/// A single series from the ChartJS `datasets` array.
///
/// `label` may be absent in the JSON. Individual `data` points may be JSON
/// `null`, which deserialize to `None`.
#[derive(Debug, Deserialize)]
struct ChartDataset {
    label: Option<String>,
    data: Vec<Option<f64>>,
}

/// The `data` object of a ChartJS configuration: the x-axis `labels` and the
/// series plotted against them (matched up by position).
#[derive(Debug, Deserialize)]
struct ChartData {
    labels: Vec<String>,
    datasets: Vec<ChartDataset>,
}

/// Top level of a ChartJS configuration file.
///
/// Only the `data` key is read; serde ignores any other top-level keys.
#[derive(Debug, Deserialize)]
struct ChartJsRoot {
    data: ChartData,
}

// ============================================================================
// Output Structure (CSV Row)
// ============================================================================

#[derive(Serialize, Debug, Default)]
struct OutputRow {
    station_id: String,
    observation_time: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    sump: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    suction: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    discharge: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    fuel: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    engine_1: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    engine_2: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    engine_3: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    engine_4: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    engine_5: Option<f64>,
}

// ============================================================================
// Logic
// ============================================================================

fn main() -> Result<(), Box<dyn Error>> {
    let args = Args::parse();

    println!("Reading from '{}'...", args.input);
    if !Path::new(&args.input).exists() {
        eprintln!("Error: Input file '{}' not found.", args.input);
        // We return Ok here to avoid panic stacks in CLI usage,
        // but normally you might return an error code.
        return Ok(());
    }

    // 1. Parse JSON
    let file = File::open(&args.input)?;
    let reader = BufReader::new(file);
    let root: ChartJsRoot = serde_json::from_reader(reader)?;

    // 2. Prepare Data Columns
    let mut mapped_datasets: HashMap<String, Vec<Option<f64>>> = HashMap::new();

    for dataset in root.data.datasets {
        if let Some(label) = dataset.label {
            let key = map_label_to_column(&label);
            if let Some(col_name) = key {
                mapped_datasets.insert(col_name.clone(), dataset.data);
                println!("Mapped '{}' -> '{}'", label, col_name);
            } else {
                println!("Skipping dataset: '{}' (No mapping found)", label);
            }
        }
    }

    // 3. Write CSV
    let output_path = Path::new(&args.output);
    if let Some(parent) = output_path.parent() {
        std::fs::create_dir_all(parent)?;
    }

    let mut wtr = csv::Writer::from_path(output_path)?;

    // We explicitly iterate up to the limit or the number of labels available
    let count = root.data.labels.len().min(args.limit);

    for i in 0..count {
        let time = &root.data.labels[i];

        // Construct the row
        let row = OutputRow {
            station_id: "Station_A".to_string(),
            observation_time: time.clone(),
            sump: get_value(&mapped_datasets, "sump", i),
            suction: get_value(&mapped_datasets, "suction", i),
            discharge: get_value(&mapped_datasets, "discharge", i),
            fuel: get_value(&mapped_datasets, "fuel", i),
            engine_1: get_value(&mapped_datasets, "engine_1", i),
            engine_2: get_value(&mapped_datasets, "engine_2", i),
            engine_3: get_value(&mapped_datasets, "engine_3", i),
            engine_4: get_value(&mapped_datasets, "engine_4", i),
            engine_5: get_value(&mapped_datasets, "engine_5", i),
        };

        wtr.serialize(row)?;
    }

    wtr.flush()?;
    println!("Successfully wrote {} rows to '{}'", count, args.output);

    Ok(())
}

/// Maps a ChartJS dataset label to a schema column name.
///
/// Matching is case-insensitive and substring-based. Keywords are tried in a
/// fixed order and the first hit wins: `sump`, `suction`, `discharge`, `fuel`,
/// then `engine 1` through `engine 5`. For example, "Engine 2 Fuel Rate" maps
/// to `fuel`.
///
/// An engine number only matches when no further digit follows it directly,
/// so "Engine 12" does not map to `engine_1`.
///
/// Returns `None` when no keyword matches.
fn map_label_to_column(label: &str) -> Option<String> {
    // Checked before the engine numbers, in this priority order.
    const KEYWORD_COLUMNS: [&str; 4] = ["sump", "suction", "discharge", "fuel"];
    // Highest engine number that has a column in the output schema.
    const ENGINE_COUNT: u32 = 5;

    let l = label.to_lowercase();

    if let Some(column) = KEYWORD_COLUMNS.iter().copied().find(|kw| l.contains(*kw)) {
        return Some(column.to_string());
    }

    (1..=ENGINE_COUNT)
        .find(|&n| mentions_engine(&l, n))
        .map(|n| format!("engine_{}", n))
}

/// Returns `true` if `label` (expected to be lowercased already) contains
/// `engine <n>` with no other digit immediately after `n`.
fn mentions_engine(label: &str, n: u32) -> bool {
    let needle = format!("engine {}", n);
    label.match_indices(needle.as_str()).any(|(start, matched)| {
        !label[start + matched.len()..].starts_with(|c: char| c.is_ascii_digit())
    })
}

/// Fetches the reading at position `index` from the column named `key`.
///
/// Never panics: yields `None` when the column is not in `map`, when `index`
/// lies past the end of that column's data, or when the stored reading is
/// itself `None`.
fn get_value(map: &HashMap<String, Vec<Option<f64>>>, key: &str, index: usize) -> Option<f64> {
    let series = map.get(key)?;
    // `get` is the bounds check; `copied().flatten()` collapses the nested Option.
    series.get(index).copied().flatten()
}