floe-core 0.3.7

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use std::fs;
use std::path::{Path, PathBuf};

use floe_core::report::FileStatus;
use floe_core::{run, RunOptions};
use polars::prelude::{ParquetReader, SerReader};

fn write_json(dir: &Path, name: &str, contents: &str) -> PathBuf {
    let path = dir.join(name);
    fs::write(&path, contents).expect("write json");
    path
}

fn write_config(dir: &Path, contents: &str) -> PathBuf {
    let path = dir.join("config.yml");
    fs::write(&path, contents).expect("write config");
    path
}

fn parquet_files(dir: &Path) -> Vec<PathBuf> {
    let mut parquet_files = Vec::new();
    if dir.exists() {
        for entry in fs::read_dir(dir).expect("read accepted dir") {
            let entry = entry.expect("read entry");
            if entry.path().extension().and_then(|ext| ext.to_str()) == Some("parquet") {
                parquet_files.push(entry.path());
            }
        }
    }
    parquet_files
}

#[test]
fn json_selector_strict_rejects_non_scalar() {
    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let root = temp_dir.path();
    let input_dir = root.join("in");
    let accepted_dir = root.join("out/accepted/customer");
    let rejected_dir = root.join("out/rejected/customer");
    let report_dir = root.join("report");

    fs::create_dir_all(&input_dir).expect("create input dir");
    write_json(
        &input_dir,
        "data.json",
        r#"[{"id":"1","user":{"name":"Ada"}}]"#,
    );

    let yaml = format!(
        r#"version: "0.1"
report:
  path: "{report_dir}"
entities:
  - name: "customer"
    source:
      format: "json"
      path: "{input_dir}"
      cast_mode: "strict"
    sink:
      accepted:
        format: "parquet"
        path: "{accepted_dir}"
      rejected:
        format: "csv"
        path: "{rejected_dir}"
    policy:
      severity: "reject"
    schema:
      columns:
        - name: "id"
          type: "string"
        - name: "user_payload"
          source: "user"
          type: "string"
"#,
        report_dir = report_dir.display(),
        input_dir = input_dir.display(),
        accepted_dir = accepted_dir.display(),
        rejected_dir = rejected_dir.display(),
    );
    let config_path = write_config(root, &yaml);

    let outcome = run(
        &config_path,
        RunOptions {
            profile: None,
            run_id: Some("it-json-strict".to_string()),
            entities: Vec::new(),
            dry_run: false,
        },
    )
    .expect("run config");

    let file_report = &outcome.entity_outcomes[0].report.files[0];
    assert_eq!(file_report.status, FileStatus::Rejected);
    assert_eq!(
        file_report
            .mismatch
            .error
            .as_ref()
            .expect("mismatch error")
            .rule,
        "json_selector_non_scalar"
    );
}

#[test]
fn json_selector_coerce_stringifies_non_scalar() {
    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let root = temp_dir.path();
    let input_dir = root.join("in");
    let accepted_dir = root.join("out/accepted/customer");
    let rejected_dir = root.join("out/rejected/customer");
    let report_dir = root.join("report");

    fs::create_dir_all(&input_dir).expect("create input dir");
    write_json(
        &input_dir,
        "data.json",
        r#"[{"id":"1","user":{"name":"Ada"}}]"#,
    );

    let yaml = format!(
        r#"version: "0.1"
report:
  path: "{report_dir}"
entities:
  - name: "customer"
    source:
      format: "json"
      path: "{input_dir}"
      cast_mode: "coerce"
    sink:
      accepted:
        format: "parquet"
        path: "{accepted_dir}"
      rejected:
        format: "csv"
        path: "{rejected_dir}"
    policy:
      severity: "warn"
    schema:
      columns:
        - name: "id"
          type: "string"
        - name: "user_payload"
          source: "user"
          type: "string"
"#,
        report_dir = report_dir.display(),
        input_dir = input_dir.display(),
        accepted_dir = accepted_dir.display(),
        rejected_dir = rejected_dir.display(),
    );
    let config_path = write_config(root, &yaml);

    let outcome = run(
        &config_path,
        RunOptions {
            profile: None,
            run_id: Some("it-json-coerce".to_string()),
            entities: Vec::new(),
            dry_run: false,
        },
    )
    .expect("run config");

    let file_report = &outcome.entity_outcomes[0].report.files[0];
    assert_eq!(file_report.status, FileStatus::Success);

    let files = parquet_files(&accepted_dir);
    assert_eq!(files.len(), 1);
    let file = fs::File::open(&files[0]).expect("open parquet");
    let df = ParquetReader::new(file).finish().expect("read parquet");
    let value = df
        .column("user_payload")
        .expect("user_payload column")
        .as_materialized_series()
        .str()
        .expect("string column")
        .get(0)
        .expect("value");
    assert_eq!(value, "{\"name\":\"Ada\"}");
}

#[test]
fn json_selector_mismatch_uses_top_level_only() {
    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let root = temp_dir.path();
    let input_dir = root.join("in");
    let accepted_dir = root.join("out/accepted/customer");
    let rejected_dir = root.join("out/rejected/customer");
    let report_dir = root.join("report");

    fs::create_dir_all(&input_dir).expect("create input dir");
    write_json(
        &input_dir,
        "data.json",
        r#"[{"id":"1","user":{"name":"Ada"},"extra":"x"}]"#,
    );

    let yaml = format!(
        r#"version: "0.1"
report:
  path: "{report_dir}"
entities:
  - name: "customer"
    source:
      format: "json"
      path: "{input_dir}"
      cast_mode: "strict"
    sink:
      accepted:
        format: "parquet"
        path: "{accepted_dir}"
      rejected:
        format: "csv"
        path: "{rejected_dir}"
    policy:
      severity: "reject"
    schema:
      mismatch:
        extra_columns: "reject_file"
      columns:
        - name: "id"
          type: "string"
        - name: "user_name"
          source: "user.name"
          type: "string"
"#,
        report_dir = report_dir.display(),
        input_dir = input_dir.display(),
        accepted_dir = accepted_dir.display(),
        rejected_dir = rejected_dir.display(),
    );
    let config_path = write_config(root, &yaml);

    let outcome = run(
        &config_path,
        RunOptions {
            profile: None,
            run_id: Some("it-json-mismatch".to_string()),
            entities: Vec::new(),
            dry_run: false,
        },
    )
    .expect("run config");

    let file_report = &outcome.entity_outcomes[0].report.files[0];
    assert_eq!(file_report.status, FileStatus::Rejected);
    assert_eq!(file_report.mismatch.missing_columns, Vec::<String>::new());
    assert_eq!(
        file_report.mismatch.extra_columns,
        vec!["extra".to_string(), "user".to_string()]
    );
}