use super::*;
/// A minimal, well-formed pipeline document parses into one source and one sink.
#[test]
fn test_from_yaml_valid() {
    // Components must be nested under `pipeline`, and each item's fields under
    // its `- id:` entry, for the counts asserted below to be reachable.
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    assert_eq!(config.pipeline.sources.len(), 1);
    assert_eq!(config.pipeline.sinks.len(), 1);
}
/// Syntactically broken YAML (unterminated flow sequence) must be reported as an error.
#[test]
fn test_from_yaml_invalid() {
    let malformed = "invalid: [yaml: content";
    let outcome = Config::from_yaml(malformed);
    assert!(outcome.is_err());
}
/// Two sources sharing the same `id` are rejected by validation.
#[test]
fn test_validate_duplicate_source_id() {
    let yaml = r#"
pipeline:
  sources:
    - id: dup
      type: http_client
    - id: dup
      type: http_client
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err();
    assert!(err.to_string().contains("Duplicate source ID"));
}
/// Two transforms sharing the same `id` are rejected by validation.
#[test]
fn test_validate_duplicate_transform_id() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: dup
      inputs: [src1]
      outputs: [sink1]
    - id: dup
      inputs: [src1]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err();
    assert!(err.to_string().contains("Duplicate transform ID"));
}
/// Two sinks sharing the same `id` are rejected by validation.
#[test]
fn test_validate_duplicate_sink_id() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [dup]
  sinks:
    - id: dup
      type: console
    - id: dup
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err();
    assert!(err.to_string().contains("Duplicate sink ID"));
}
/// A transform whose input names no declared node fails validation.
#[test]
fn test_validate_unknown_input_transform() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [unknown_node]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err();
    assert!(err.to_string().contains("unknown input"));
}
/// A transform whose output names no declared node fails validation.
#[test]
fn test_validate_unknown_output_transform() {
    // No sinks are declared at all, so `nonexistent` cannot resolve.
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [nonexistent]
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err();
    assert!(err.to_string().contains("unknown output"));
}
/// Listing a sink as a transform input is rejected: inputs must be sources or transforms.
#[test]
fn test_validate_transform_input_must_be_emitter() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [sink1]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err();
    assert!(err.to_string().contains("must be source or transform"));
}
/// A linear chain `src1 -> t1 -> t2 -> sink1`, declared on both ends of each link, validates.
#[test]
fn test_validate_transform_to_transform_chain() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [t2]
    - id: t2
      inputs: [t1]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    assert!(config.validate().is_ok());
}
/// A transform may fan out to several downstream transforms that share one sink.
#[test]
fn test_validate_transform_chain_with_fanout() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [t2, t3]
    - id: t2
      inputs: [t1]
      outputs: [sink1]
    - id: t3
      inputs: [t1]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    assert!(config.validate().is_ok());
}
/// A transform-to-transform link declared only via the upstream `outputs` is accepted.
#[test]
fn test_validate_transform_chain_outputs_only_link() {
    // `t2` deliberately has no `inputs`; its only upstream link is `t1.outputs`.
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [t2]
    - id: t2
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    assert!(config.validate().is_ok());
}
/// A transform-to-transform link declared only via the downstream `inputs` is accepted.
#[test]
fn test_validate_transform_chain_inputs_only_link() {
    // `t1.outputs` does not mention `t2`; the link exists only in `t2.inputs`.
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [sink1]
    - id: t2
      inputs: [t1]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    assert!(config.validate().is_ok());
}
/// A transform that lists itself as an input is rejected.
#[test]
fn test_validate_transform_self_reference_input() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [t1]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err();
    assert!(err.to_string().contains("cannot reference itself"));
}
/// A transform that lists itself as an output is rejected.
#[test]
fn test_validate_transform_self_reference_output() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [t1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err();
    assert!(err.to_string().contains("cannot reference itself"));
}
/// Two transforms feeding each other (`t1 -> t2 -> t1`) are reported as a cycle.
#[test]
fn test_validate_transform_chain_cycle_detection() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [src1, t2]
      outputs: [t2]
    - id: t2
      inputs: [t1]
      outputs: [t1, sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err();
    assert!(err.to_string().contains("Cycle detected"));
}
/// A transform with no `inputs` and no upstream node pointing at it is rejected.
#[test]
fn test_validate_transform_requires_inputs() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err();
    assert!(err.to_string().contains("has no inputs defined"));
}
/// A straight `src1 -> t1 -> sink1` pipeline contains no cycle and validates cleanly.
#[test]
fn test_validate_no_cycles() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    assert!(config.validate().is_ok());
}
/// `SystemConfig::default()` reports the built-in buffer sizes and shutdown timeout.
#[test]
fn test_system_config_default() {
    let defaults = SystemConfig::default();

    assert_eq!(defaults.output_buffer_size(), 1024);
    assert_eq!(defaults.channel_size(), 256);

    let expected_timeout = std::time::Duration::from_secs(5);
    assert_eq!(defaults.shutdown_timeout(), expected_timeout);
}
/// The built-in `source::system::event` ID is a valid transform input without being declared.
#[test]
fn test_validate_system_source_input_ok() {
    let yaml = r#"
pipeline:
  transforms:
    - id: t1
      inputs: [source::system::event]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    assert!(config.validate().is_ok());
}
/// An unrecognised ID in the `source::system::` namespace is treated as an unknown input.
#[test]
fn test_validate_system_source_unknown_rejected() {
    let yaml = r#"
pipeline:
  transforms:
    - id: t1
      inputs: [source::system::unknown]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err();
    assert!(err.to_string().contains("unknown input"));
}
/// The built-in `sink::system::event` ID is a valid transform output without being declared.
#[test]
fn test_validate_system_sink_output_ok() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [sink::system::event]
"#;
    let config = Config::from_yaml(yaml).unwrap();
    assert!(config.validate().is_ok());
}
/// User-declared components may not reuse the reserved system source/sink IDs.
#[test]
fn test_validate_system_ids_reserved() {
    let yaml = r#"
pipeline:
  sources:
    - id: source::system::event
      type: http_client
  transforms:
    - id: t1
      inputs: [source::system::event]
      outputs: [sink::system::event]
  sinks:
    - id: sink::system::event
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err();
    assert!(err.to_string().contains("reserved for system use"));
}
/// Explicit values in a standalone `SystemConfig` document are surfaced by the accessors.
#[test]
fn test_system_config_serde() {
    // The document is deserialized directly as `SystemConfig`, so its keys are top-level.
    let document = r#"
output_buffer_size: 2048
channel_size: 512
shutdown_timeout: 8s
"#;
    let parsed = serde_yaml::from_str::<SystemConfig>(document).unwrap();

    assert_eq!(parsed.output_buffer_size(), 2048);
    assert_eq!(parsed.channel_size(), 512);

    let expected_timeout = std::time::Duration::from_secs(8);
    assert_eq!(parsed.shutdown_timeout(), expected_timeout);
}
/// Merging keeps silence fields the overlay omits (`window`) and replaces those it sets (`key`).
#[test]
fn test_system_notify_silence_merge_overrides() {
    let base = Config::from_yaml(
        r#"
system:
  notify:
    silence:
      window: 2h
      key: "{{ $.name }}"
pipeline: {}
"#,
    )
    .unwrap();
    let other = Config::from_yaml(
        r#"
system:
  notify:
    silence:
      key: "{{ $.message }}"
pipeline: {}
"#,
    )
    .unwrap();
    let mut merged = base;
    merged.merge(other);
    let silence = merged.system.notify.silence.as_ref().unwrap();
    // 2h == 7200s: the base window must survive the merge.
    assert_eq!(silence.window, Some(Duration::from_secs(7200)));
    assert_eq!(silence.key.as_deref(), Some("{{ $.message }}"));
}
/// An active window with a `start` but no `end` fails validation, naming the missing field.
#[test]
fn test_validate_rejects_system_notify_active_window_missing_end() {
    let yaml = r#"
system:
  notify:
    active_window:
      start: "08:00"
pipeline: {}
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err().to_string();
    assert!(err.contains("system.notify.active_window.end"));
}
/// An active window with an unrecognised timezone fails validation, naming the field.
#[test]
fn test_validate_rejects_system_notify_active_window_invalid_timezone() {
    let yaml = r#"
system:
  notify:
    active_window:
      start: "08:00"
      end: "22:00"
      timezone: "Not/AZone"
pipeline: {}
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err().to_string();
    assert!(err.contains("system.notify.active_window.timezone"));
}
/// A per-source `output_buffer_size` is parsed onto that source entry.
#[test]
fn test_source_config_output_buffer_size() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
      output_buffer_size: 256
"#;
    let config = Config::from_yaml(yaml).unwrap();
    assert_eq!(config.pipeline.sources[0].output_buffer_size, Some(256));
}
/// A transform that declares no processing steps still validates.
#[test]
fn test_validate_transform_allows_empty_steps() {
    // The `steps` key is omitted entirely from `t1`.
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    assert!(config.validate().is_ok());
}
/// `system.sinks.dir` and `system.sinks.dlq` are parsed as filesystem paths.
#[test]
fn test_system_sinks_config_parses() {
    let yaml = r#"
system:
  sinks:
    dir: data
    dlq: data/custom_dlq.jsonl
pipeline: {}
"#;
    let config = Config::from_yaml(yaml).unwrap();
    assert_eq!(
        config.system.sinks.dir.as_ref().unwrap(),
        &std::path::PathBuf::from("data")
    );
    assert_eq!(
        config.system.sinks.dlq.as_ref().unwrap(),
        &std::path::PathBuf::from("data/custom_dlq.jsonl")
    );
}
/// Merging two configs keeps the base `sinks.dir` and picks up the overlay's `sinks.event`.
#[test]
fn test_system_sinks_config_merge_overrides() {
    let data_dir = std::path::PathBuf::from("data");
    let event_file = std::path::PathBuf::from("event.jsonl");

    let mut merged = Config::default();
    merged.system.sinks.dir = Some(data_dir.clone());

    let mut overlay = Config::default();
    overlay.system.sinks.event = Some(event_file.clone());

    merged.merge(overlay);

    assert_eq!(merged.system.sinks.dir.as_ref().unwrap(), &data_dir);
    assert_eq!(merged.system.sinks.event.as_ref().unwrap(), &event_file);
}
/// The top-level key `global` is accepted as an alias for `system`.
#[test]
fn test_system_config_accepts_global_alias() {
    let yaml = r#"
global:
  output_buffer_size: 256
  channel_size: 128
  shutdown_timeout: 3s
pipeline: {}
"#;
    let config = Config::from_yaml(yaml).unwrap();
    assert_eq!(config.system.output_buffer_size(), 256);
    assert_eq!(config.system.channel_size(), 128);
    assert_eq!(
        config.system.shutdown_timeout(),
        std::time::Duration::from_secs(3)
    );
}
/// Zero-sized system buffers fail validation; the error names `system.output_buffer_size`.
#[test]
fn test_validate_rejects_zero_system_buffers() {
    let yaml = r#"
system:
  output_buffer_size: 0
  channel_size: 0
pipeline: {}
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err().to_string();
    assert!(err.contains("system.output_buffer_size"));
}
/// A source with `output_buffer_size: 0` fails validation; the error names that source.
#[test]
fn test_validate_rejects_zero_source_output_buffer() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
      output_buffer_size: 0
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [sink1]
      steps: []
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err().to_string();
    assert!(err.contains("Source 'src1'"));
}
/// A transform with `output_buffer_size: 0` fails validation; the error names that transform.
#[test]
fn test_validate_rejects_zero_transform_output_buffer() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [sink1]
      output_buffer_size: 0
      steps: []
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err().to_string();
    assert!(err.contains("Transform 't1'"));
}
/// Loading a directory merges every YAML file in it into a single `Config`.
///
/// Two fragments are written to a temporary directory: one declaring the
/// source, one declaring the transform and sink. The loaded config must
/// contain both the source and the sink.
#[test]
fn test_from_dir_merge() {
    use tempfile::tempdir;

    // `dir` must stay alive until the config is loaded; dropping it removes the files.
    let dir = tempdir().unwrap();
    let path = dir.path();

    let sources_yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
"#;
    std::fs::write(path.join("01_sources.yaml"), sources_yaml).unwrap();

    let sinks_yaml = r#"
pipeline:
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    std::fs::write(path.join("02_sinks.yaml"), sinks_yaml).unwrap();

    let config = Config::from_file(path).unwrap();
    assert_eq!(config.pipeline.sources.len(), 1);
    assert_eq!(config.pipeline.sources[0].id, "src1");
    assert_eq!(config.pipeline.sinks.len(), 1);
    assert_eq!(config.pipeline.sinks[0].id, "sink1");
}
/// Loading a directory that contains no YAML files is an error with a descriptive message.
#[test]
fn test_from_dir_empty_fail() {
    let empty_dir = tempfile::tempdir().unwrap();
    let outcome = Config::from_file(empty_dir.path());

    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("No .yaml or .yml files"));
}
/// A declared source that nothing consumes is reported by validation.
#[test]
fn test_validate_unused_source() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
    - id: unused
      type: http_client
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err();
    assert!(err.to_string().contains("Source 'unused' is not used"));
}
/// A declared sink that nothing writes to is reported by validation.
#[test]
fn test_validate_unused_sink() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
    - id: unused
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let err = config.validate().unwrap_err();
    assert!(err.to_string().contains("Sink 'unused' is not used"));
}
/// `${VAR}` placeholders in the YAML are replaced with the environment variable's value.
#[test]
fn test_env_var_substitution() {
    // NOTE(review): the "/tmp" fallback only applies when HOME is unset, in which
    // case the `${HOME}` substitution below presumably fails anyway — confirm.
    let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".to_string());
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
      config:
        path: "${HOME}"
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let path = config.pipeline.sources[0]
        .config
        .get("path")
        .and_then(|v| v.as_str())
        .unwrap();
    assert_eq!(path, home);
}
/// Referencing an environment variable that is not set makes parsing fail.
#[test]
fn test_env_var_missing_error() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
      config:
        url: "${NONEXISTENT_PIPEFLOW_VAR}"
"#;
    let err = Config::from_yaml(yaml).unwrap_err();
    assert!(err.to_string().contains("environment variable"));
}
/// Brace-less `$NAME` tokens are not environment placeholders and pass through unchanged.
#[test]
fn test_env_var_preserves_dollar_var_syntax() {
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
      config:
        id_field: "$UUID"
        time_field: "$NOW"
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let id_field = config.pipeline.sources[0]
        .config
        .get("id_field")
        .and_then(|v| v.as_str())
        .unwrap();
    assert_eq!(id_field, "$UUID");
    let time_field = config.pipeline.sources[0]
        .config
        .get("time_field")
        .and_then(|v| v.as_str())
        .unwrap();
    assert_eq!(time_field, "$NOW");
}
/// `${...}` bodies that are not valid variable names are left untouched rather than substituted.
#[test]
fn test_env_var_ignores_invalid_syntax() {
    // `expr1` contains spaces and `$.`; `expr2` contains hyphens.
    let yaml = r#"
pipeline:
  sources:
    - id: src1
      type: http_client
      config:
        expr1: "${ $.price }"
        expr2: "${invalid-var-name}"
  transforms:
    - id: t1
      inputs: [src1]
      outputs: [sink1]
  sinks:
    - id: sink1
      type: console
"#;
    let config = Config::from_yaml(yaml).unwrap();
    let expr1 = config.pipeline.sources[0]
        .config
        .get("expr1")
        .and_then(|v| v.as_str())
        .unwrap();
    assert_eq!(expr1, "${ $.price }");
    let expr2 = config.pipeline.sources[0]
        .config
        .get("expr2")
        .and_then(|v| v.as_str())
        .unwrap();
    assert_eq!(expr2, "${invalid-var-name}");
}