floe-core 0.3.7

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use std::fs;
use std::path::{Path, PathBuf};

use floe_core::{run, RunOptions};

/// Writes `contents` to the file `name` inside `dir` and returns the file's full path.
///
/// # Panics
///
/// Panics with the message `write csv` when the file cannot be written.
fn write_csv(dir: &Path, name: &str, contents: &str) -> PathBuf {
    let target = dir.join(name);
    // The borrow of `target` ends when `fs::write` returns, so it can be moved out on success.
    fs::write(&target, contents)
        .map(|()| target)
        .expect("write csv")
}

/// Writes `contents` to `config.yml` inside `dir` and returns the path of that file.
///
/// # Panics
///
/// Panics with the message `write config` when the file cannot be written.
fn write_config(dir: &Path, contents: &str) -> PathBuf {
    let config_file = dir.join("config.yml");
    // The borrow of `config_file` ends when `fs::write` returns, so it can be moved out on success.
    fs::write(&config_file, contents)
        .map(|()| config_file)
        .expect("write config")
}

/// Returns the path obtained by appending `entity` to `archive_root`.
///
/// The tests use this as the directory in which they look for an entity's archived files.
fn archive_entity_dir(archive_root: &Path, entity: &str) -> PathBuf {
    let mut entity_dir = archive_root.to_path_buf();
    entity_dir.push(entity);
    entity_dir
}

/// Returns the paths of the files directly inside `dir`, sorted in ascending path order.
///
/// A `dir` that does not exist yields an empty vector. Entries for which
/// [`Path::is_file`] is false (for example sub-directories) are skipped, and nothing is
/// searched recursively.
///
/// # Panics
///
/// Panics with `read dir` when an existing `dir` cannot be listed, and with `dir entry`
/// when an individual entry cannot be read.
fn list_files(dir: &Path) -> Vec<PathBuf> {
    if !dir.exists() {
        return Vec::new();
    }

    let mut found: Vec<PathBuf> = fs::read_dir(dir)
        .expect("read dir")
        .map(|entry| entry.expect("dir entry").path())
        .filter(|candidate| candidate.is_file())
        .collect();
    // `read_dir` gives no ordering guarantee; sort so callers can index deterministically.
    found.sort();
    found
}

/// Renders the YAML configuration text used by the archive tests.
///
/// The document declares a single entity named `customer` with a CSV source at
/// `source_path`, a parquet accepted sink at `accepted_dir`, an archive sink at
/// `archive_dir`, a `warn` severity policy and two string columns (`id`, `name`).
/// Reports are directed to `report_dir`.
///
/// When `incremental_mode` is `Some(mode)`, an `incremental_mode: "<mode>"` line is emitted
/// on the entity, directly before its `source:` key; when it is `None` that line is left out.
///
/// Every path is interpolated through [`Path::display`] into a double-quoted YAML scalar
/// with no escaping.
// NOTE(review): a path containing `\` or `"` would change the meaning of the quoted
// scalar — presumably not an issue for the temp-dir paths the tests pass; confirm if these
// tests are expected to run on Windows.
fn archive_config(
    report_dir: &Path,
    source_path: &Path,
    accepted_dir: &Path,
    archive_dir: &Path,
    incremental_mode: Option<&str>,
) -> String {
    // Either a complete, newline-terminated YAML line or nothing at all, so the template
    // can splice it in front of `source:` without leaving a blank line behind.
    let incremental_block = match incremental_mode {
        Some(mode) => format!("    incremental_mode: \"{mode}\"\n"),
        None => String::new(),
    };

    // Shadow the path parameters with their displayable form for capture by the template.
    let report_dir = report_dir.display();
    let source_path = source_path.display();
    let accepted_dir = accepted_dir.display();
    let archive_dir = archive_dir.display();

    format!(
        r#"version: "0.1"
report:
  path: "{report_dir}"
entities:
  - name: "customer"
{incremental_block}    source:
      format: "csv"
      path: "{source_path}"
    sink:
      accepted:
        format: "parquet"
        path: "{accepted_dir}"
      archive:
        path: "{archive_dir}"
    policy:
      severity: "warn"
    schema:
      columns:
        - name: "id"
          type: "string"
        - name: "name"
          type: "string"
"#
    )
}

/// Archiving relocates only the file named as the entity's source: a second CSV in the
/// same input directory, which the config never references, must stay where it is.
#[test]
fn archive_moves_only_processed_input_not_local_siblings() {
    let workspace = tempfile::TempDir::new().expect("temp dir");
    let root = workspace.path();
    let report_dir = root.join("report");
    let archive_dir = root.join("archive");
    let accepted_dir = root.join("out/accepted/customer");
    let input_dir = root.join("in");

    // Two files share the input directory; only `data.csv` is configured as the source.
    fs::create_dir_all(&input_dir).expect("create input dir");
    let ingested = write_csv(&input_dir, "data.csv", "id,name\n1,alice\n");
    let bystander = write_csv(&input_dir, "sibling.csv", "id,name\n99,keep\n");

    let config_path = write_config(
        root,
        &archive_config(&report_dir, &ingested, &accepted_dir, &archive_dir, Some("archive")),
    );

    let options = RunOptions {
        profile: None,
        run_id: Some("archive-sibling-it".to_string()),
        entities: Vec::new(),
        dry_run: false,
    };
    let outcome = run(&config_path, options).expect("run config");

    assert!(!ingested.exists(), "processed file should be archived");
    assert!(
        bystander.exists(),
        "non-ingested sibling must remain in place"
    );

    // Exactly one artifact is expected under `<archive>/customer`; its name must carry the
    // original stem and the run id, and keep the `.csv` extension.
    let archived = list_files(&archive_entity_dir(&archive_dir, "customer"));
    assert_eq!(archived.len(), 1);
    let only_archived = &archived[0];
    let file_name = only_archived
        .file_name()
        .and_then(|name| name.to_str())
        .expect("archived filename");
    assert!(file_name.starts_with("data__run-archive-sibling-it__src-"));
    assert!(file_name.ends_with(".csv"));

    // The run report must point at the very same archived file found on disk.
    let report = &outcome.entity_outcomes[0].report;
    assert_eq!(report.files.len(), 1);
    let reported_path = report.files[0]
        .output
        .archived_path
        .as_ref()
        .expect("archived path in report");
    assert_eq!(reported_path, &only_archived.display().to_string());
}

/// Two runs that ingest a source with the same file name must each leave their own
/// archived artifact; the second run must not overwrite what the first one archived.
#[test]
fn archive_repeated_runs_do_not_overwrite_same_source_filename() {
    let workspace = tempfile::TempDir::new().expect("temp dir");
    let root = workspace.path();
    let report_dir = root.join("report");
    let archive_dir = root.join("archive");
    let accepted_dir = root.join("out/accepted/customer");
    let input_dir = root.join("in");

    fs::create_dir_all(&input_dir).expect("create input dir");
    let source_path = input_dir.join("data.csv");

    // The config is written once, before the source exists, and reused by both runs.
    let config_path = write_config(
        root,
        &archive_config(&report_dir, &source_path, &accepted_dir, &archive_dir, Some("archive")),
    );

    // Each round recreates `data.csv` with distinct content and ingests it under its own
    // run id: (csv content, run id, write failure message, run failure message).
    let rounds = [
        ("id,name\n1,first\n", "archive-run-1", "write first source", "first run"),
        ("id,name\n2,second\n", "archive-run-2", "write second source", "second run"),
    ];
    for (csv, run_id, write_msg, run_msg) in rounds {
        fs::write(&source_path, csv).expect(write_msg);
        run(
            &config_path,
            RunOptions {
                profile: None,
                run_id: Some(run_id.to_string()),
                entities: Vec::new(),
                dry_run: false,
            },
        )
        .expect(run_msg);
    }

    let archived = list_files(&archive_entity_dir(&archive_dir, "customer"));
    assert_eq!(archived.len(), 2, "expected two archived artifacts");

    // Both run ids must show up in the archived file names.
    let names: Vec<String> = archived
        .iter()
        .map(|path| {
            path.file_name()
                .and_then(|name| name.to_str())
                .expect("name")
                .to_string()
        })
        .collect();
    let some_name_contains = |marker: &str| names.iter().any(|name| name.contains(marker));
    assert!(some_name_contains("__run-archive-run-1__src-"));
    assert!(some_name_contains("__run-archive-run-2__src-"));

    // Both payloads must survive, proving neither archive replaced the other.
    let contents: Vec<String> = archived
        .iter()
        .map(|path| fs::read_to_string(path).expect("read archived file"))
        .collect();
    let some_content_contains = |row: &str| contents.iter().any(|content| content.contains(row));
    assert!(some_content_contains("1,first"));
    assert!(some_content_contains("2,second"));
}

/// A config that declares an archive sink but no `incremental_mode` key must still
/// archive the ingested input and report the archive sink as enabled.
#[test]
fn legacy_archive_config_still_archives_inputs() {
    let workspace = tempfile::TempDir::new().expect("temp dir");
    let root = workspace.path();
    let report_dir = root.join("report");
    let archive_dir = root.join("archive");
    let accepted_dir = root.join("out/accepted/customer");
    let input_dir = root.join("in");

    fs::create_dir_all(&input_dir).expect("create input dir");
    let ingested = write_csv(&input_dir, "data.csv", "id,name\n1,alice\n");

    // `None` leaves the `incremental_mode` line out of the generated YAML.
    let config_path = write_config(
        root,
        &archive_config(&report_dir, &ingested, &accepted_dir, &archive_dir, None),
    );

    let options = RunOptions {
        profile: None,
        run_id: Some("archive-legacy-compat".to_string()),
        entities: Vec::new(),
        dry_run: false,
    };
    let outcome = run(&config_path, options).expect("run config");

    assert!(outcome.entity_outcomes[0].report.sink.archive.enabled);
    assert!(!ingested.exists(), "processed file should be archived");

    let archived = list_files(&archive_entity_dir(&archive_dir, "customer"));
    assert_eq!(archived.len(), 1);
}