multiio 0.2.3

A unified I/O orchestration library for CLI/server applications
Documentation
#![cfg(all(feature = "json", feature = "yaml"))]

use std::fs;
use std::path::PathBuf;

use crate::config::PipelineConfig;
use crate::error::{AggregateError, Stage};
use crate::{ErrorPolicy, MultiioBuilder, default_registry};

#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq)]
struct ConfigData {
    name: String,
    value: i32,
}

fn write_json_object(path: &PathBuf, data: &ConfigData) {
    let json = serde_json::to_string_pretty(data).expect("serialize test data");
    fs::write(path, json).expect("write test input file");
}

#[test]
fn pipeline_e2e_file_to_file_json() {
    let dir = tempfile::tempdir().expect("tempdir");
    let in_path = dir.path().join("input.json");
    let out_path = dir.path().join("output.json");

    let record = ConfigData {
        name: "a".into(),
        value: 1,
    };
    write_json_object(&in_path, &record);

    let yaml = format!(
        r#"
inputs:
  - id: in
    kind: file
    path: {}
    format: json
outputs:
  - id: out
    kind: file
    path: {}
    format: json
error_policy: fast_fail
format_order: ["json", "yaml", "plaintext"]
"#,
        in_path.to_string_lossy(),
        out_path.to_string_lossy()
    );

    let pipeline: PipelineConfig = serde_yaml::from_str(&yaml).expect("parse pipeline yaml");

    let registry = default_registry();
    let builder = MultiioBuilder::from_pipeline_config(pipeline, registry)
        .expect("from_pipeline_config should succeed");

    let engine = builder
        .with_mode(ErrorPolicy::FastFail)
        .build()
        .expect("build engine");

    let vals: Vec<ConfigData> = engine.read_all().expect("read_all");
    assert_eq!(vals.len(), 1);
    assert_eq!(vals[0], record);

    engine.write_all(&vals).expect("write_all");

    let out_bytes = fs::read(&out_path).expect("read output file");
    let decoded: Vec<ConfigData> = serde_json::from_slice(&out_bytes).expect("decode output json");
    assert_eq!(decoded.len(), 1);
    assert_eq!(decoded[0], record);
}

#[test]
fn pipeline_e2e_multi_input_multi_output_json() {
    let dir = tempfile::tempdir().expect("tempdir");
    let in_path1 = dir.path().join("input1.json");
    let in_path2 = dir.path().join("input2.json");
    let out_path1 = dir.path().join("output1.json");
    let out_path2 = dir.path().join("output2.json");

    let record1 = ConfigData {
        name: "a".into(),
        value: 1,
    };
    let record2 = ConfigData {
        name: "b".into(),
        value: 2,
    };
    write_json_object(&in_path1, &record1);
    write_json_object(&in_path2, &record2);

    let yaml = format!(
        r#"
inputs:
  - id: in1
    kind: file
    path: {}
    format: json
  - id: in2
    kind: file
    path: {}
    format: json
outputs:
  - id: out1
    kind: file
    path: {}
    format: json
  - id: out2
    kind: file
    path: {}
    format: json
error_policy: accumulate
format_order: ["json", "yaml", "plaintext"]
"#,
        in_path1.to_string_lossy(),
        in_path2.to_string_lossy(),
        out_path1.to_string_lossy(),
        out_path2.to_string_lossy(),
    );

    let pipeline: PipelineConfig = serde_yaml::from_str(&yaml).expect("parse pipeline yaml");

    let registry = default_registry();
    let builder = MultiioBuilder::from_pipeline_config(pipeline, registry)
        .expect("from_pipeline_config should succeed");

    let engine = builder
        .with_mode(ErrorPolicy::Accumulate)
        .build()
        .expect("build engine");

    let vals: Vec<ConfigData> = engine.read_all().expect("read_all");
    assert_eq!(vals.len(), 2);
    assert_eq!(vals[0], record1);
    assert_eq!(vals[1], record2);

    engine.write_all(&vals).expect("write_all");

    for out in [&out_path1, &out_path2] {
        let out_bytes = fs::read(out).expect("read output file");
        let decoded: Vec<ConfigData> =
            serde_json::from_slice(&out_bytes).expect("decode output json");
        assert_eq!(decoded, vals);
    }
}

#[test]
fn pipeline_unknown_custom_format_accumulates_errors() {
    let dir = tempfile::tempdir().expect("tempdir");
    let good_path = dir.path().join("good.json");
    let bad_path = dir.path().join("bad.json");

    let record = ConfigData {
        name: "good".into(),
        value: 1,
    };
    write_json_object(&good_path, &record);
    write_json_object(&bad_path, &record);

    let yaml = format!(
        r#"
inputs:
  - id: good
    kind: file
    path: {}
    format: json
  - id: bad
    kind: file
    path: {}
    format: custom:missing-format
outputs:
  - id: out
    kind: stdout
error_policy: accumulate
format_order: ["json"]
"#,
        good_path.to_string_lossy(),
        bad_path.to_string_lossy(),
    );

    let pipeline: PipelineConfig = serde_yaml::from_str(&yaml).expect("parse pipeline yaml");

    let registry = default_registry();
    let builder = MultiioBuilder::from_pipeline_config(pipeline, registry)
        .expect("from_pipeline_config should succeed");

    let engine = builder
        .with_mode(ErrorPolicy::Accumulate)
        .build()
        .expect("build engine");

    let result: Result<Vec<ConfigData>, AggregateError> = engine.read_all();
    let agg = result.expect_err("expected aggregate error due to unknown custom format");

    assert_eq!(agg.errors.len(), 1);
    let e = &agg.errors[0];
    assert_eq!(e.stage, Stage::Parse);
    assert_eq!(e.target, "bad");
    let msg = e.error.to_string();
    assert!(msg.contains("Unknown format"));
    assert!(msg.contains("missing-format"));
}

#[test]
fn pipeline_unknown_input_kind_produces_resolve_error() {
    let yaml = r#"
inputs:
  - id: net1
    kind: http
    url: "https://example.com/data.json"
outputs:
  - id: out
    kind: stdout
"#;

    let pipeline: PipelineConfig = serde_yaml::from_str(yaml).expect("parse pipeline yaml");

    let registry = default_registry();
    let result = MultiioBuilder::from_pipeline_config(pipeline, registry);

    let err: AggregateError = match result {
        Err(e) => e,
        Ok(_) => panic!("expected failure due to unknown input kind"),
    };
    assert_eq!(err.errors.len(), 1);
    let e = &err.errors[0];
    assert_eq!(e.stage, Stage::ResolveInput);
    assert_eq!(e.target, "net1");
}

#[test]
fn pipeline_missing_file_path_for_file_input() {
    let yaml = r#"
inputs:
  - id: in
    kind: file
outputs:
  - id: out
    kind: stdout
"#;

    let pipeline: PipelineConfig = serde_yaml::from_str(yaml).expect("parse pipeline yaml");

    let registry = default_registry();
    let result = MultiioBuilder::from_pipeline_config(pipeline, registry);

    let err: AggregateError = match result {
        Err(e) => e,
        Ok(_) => panic!("expected failure due to missing file path"),
    };
    assert_eq!(err.errors.len(), 1);
    let e = &err.errors[0];
    assert_eq!(e.stage, Stage::ResolveInput);
    assert_eq!(e.target, "in");
}

#[test]
fn pipeline_file_exists_policy_from_string() {
    let dir = tempfile::tempdir().expect("tempdir");
    let out_path = dir.path().join("out.json");

    let yaml = format!(
        r#"
inputs:
  - id: in
    kind: stdin
outputs:
  - id: out
    kind: file
    path: {}
    file_exists_policy: append
"#,
        out_path.to_string_lossy()
    );

    let pipeline: PipelineConfig = serde_yaml::from_str(&yaml).expect("parse pipeline yaml");

    let registry = default_registry();
    let builder = MultiioBuilder::from_pipeline_config(pipeline, registry)
        .expect("from_pipeline_config should succeed");

    fs::write(&out_path, b"OLD").expect("write initial output");

    let engine = builder
        .with_mode(ErrorPolicy::FastFail)
        .build()
        .expect("build engine");

    #[derive(Debug, serde::Serialize)]
    struct Dummy {
        x: i32,
    }

    let vals = vec![Dummy { x: 1 }];
    engine.write_all(&vals).expect("write_all");

    let contents = fs::read(&out_path).expect("read output");
    let s = String::from_utf8_lossy(&contents);
    assert!(s.starts_with("OLD"));
}