use super::*;
use crate::config::{Config, PipelineConfig, SinkConfig, SystemConfig};
/// Returns a [`Config`] assembled purely from the `Default` values of its
/// system and pipeline sections.
fn minimal_config() -> Config {
    let system = SystemConfig::default();
    let pipeline = PipelineConfig::default();
    Config { system, pipeline }
}
/// An engine created from the minimal configuration holds no sources,
/// transforms, or sinks.
#[test]
fn test_from_config_creates_empty_pipeline() {
    let engine = Engine::from_config(minimal_config()).expect("should create engine");

    let no_sources = engine.sources.is_empty();
    assert!(no_sources);
    let no_transforms = engine.transforms.is_empty();
    assert!(no_transforms);
    let no_sinks = engine.sinks.is_empty();
    assert!(no_sinks);
}
/// A system-level `output_buffer_size` set before construction is readable
/// back through the engine's stored configuration.
#[test]
fn test_from_config_preserves_system_config() {
    let config = {
        let mut cfg = minimal_config();
        cfg.system.output_buffer_size = Some(2048);
        cfg
    };

    let engine = Engine::from_config(config).expect("should create engine");
    let buffer_size = engine.config.system.output_buffer_size();
    assert_eq!(buffer_size, 2048);
}
/// `Engine::build` must fail when a source declares a `type` the engine does
/// not recognise, and the error text must mention "Unknown source type".
#[tokio::test]
async fn test_build_unknown_source_type_returns_error() {
    let config = Config::from_yaml(
        r#"
pipeline:
sources:
- id: test_src
type: unknown_source_type
transforms:
- id: t1
inputs: [test_src]
outputs: [sink1]
sinks:
- id: sink1
type: blackhole
"#,
    )
    .unwrap();
    let mut engine = Engine::from_config(config).unwrap();
    let result = engine.build().await;
    assert!(
        result.is_err(),
        "build should fail for an unknown source type"
    );
    let err = result.unwrap_err().to_string();
    // Echo the actual message on failure, as the other error-text tests in
    // this file do, so a wording change is diagnosable from the test output.
    assert!(err.contains("Unknown source type"), "Error was: {}", err);
}
/// `Engine::build` must fail when a sink declares a `type` the engine does
/// not recognise, and the error text must mention "Unknown sink type".
#[tokio::test]
async fn test_build_unknown_sink_type_returns_error() {
    let config = Config::from_yaml(
        r#"
pipeline:
sources:
- id: test_src
type: http_client
config:
url: "http://example.com"
poll_interval_seconds: 60
transforms:
- id: t1
inputs: [test_src]
outputs: [test_sink]
sinks:
- id: test_sink
type: unknown_sink_type
"#,
    )
    .unwrap();
    let mut engine = Engine::from_config(config).unwrap();
    let result = engine.build().await;
    assert!(
        result.is_err(),
        "build should fail for an unknown sink type"
    );
    let err = result.unwrap_err().to_string();
    // Echo the actual message on failure, as the other error-text tests in
    // this file do, so a wording change is diagnosable from the test output.
    assert!(err.contains("Unknown sink type"), "Error was: {}", err);
}
/// `build_sink` accepts a `blackhole` sink with a null config and the
/// resulting sink reports the configured id.
#[tokio::test]
async fn test_build_blackhole_sink_succeeds() {
    let system = SystemConfig::default();
    let sink_config = SinkConfig {
        id: String::from("blackhole_test"),
        sink_type: String::from("blackhole"),
        config: serde_yaml::Value::Null,
    };

    let built = super::build::build_sink(&sink_config, &system).await;
    assert!(built.is_ok());

    let sink = built.unwrap();
    assert_eq!(sink.id(), "blackhole_test");
}
/// `build_sink` accepts a `console` sink with a null config and the
/// resulting sink reports the configured id.
#[tokio::test]
async fn test_build_console_sink_with_default_config() {
    let system = SystemConfig::default();
    let sink_config = SinkConfig {
        id: String::from("console_test"),
        sink_type: String::from("console"),
        config: serde_yaml::Value::Null,
    };

    let built = super::build::build_sink(&sink_config, &system).await;
    assert!(built.is_ok());

    let sink = built.unwrap();
    assert_eq!(sink.id(), "console_test");
}
/// `build_sink` rejects a `file` sink whose config is null.
#[tokio::test]
async fn test_build_file_sink_requires_path() {
    let system = SystemConfig::default();
    let sink_config = SinkConfig {
        id: String::from("file_test"),
        sink_type: String::from("file"),
        config: serde_yaml::Value::Null,
    };

    let outcome = super::build::build_sink(&sink_config, &system).await;
    let rejected = outcome.is_err();
    assert!(rejected, "File sink should require path config");
}
/// With a system-level `output_buffer_size` configured, `create_node_channels`
/// yields one entry per node: both declared sources, the DLQ/event/audit ids
/// from `crate::source::system`, and the transform.
///
/// NOTE(review): only the set of keys is asserted; the channel capacity the
/// test name refers to is not inspected here.
#[tokio::test]
async fn test_create_source_channels_respects_system_capacity() {
    let yaml = r#"
system:
output_buffer_size: 512
pipeline:
sources:
- id: src1
type: http_client
config:
url: "http://example.com"
- id: src2
type: http_client
config:
url: "http://example.com"
transforms:
- id: t1
inputs: [src1, src2]
outputs: [sink1]
sinks:
- id: sink1
type: blackhole
"#;
    let parsed = Config::from_yaml(yaml).unwrap();
    let mut engine = Engine::from_config(parsed).unwrap();
    engine.build().await.unwrap();

    let channels = engine.create_node_channels();
    assert_eq!(channels.len(), 6);
    // User-declared sources.
    assert!(channels.contains_key("src1"));
    assert!(channels.contains_key("src2"));
    // Ids exported by the system source module.
    assert!(channels.contains_key(crate::source::system::DLQ_SOURCE_ID));
    assert!(channels.contains_key(crate::source::system::EVENT_SOURCE_ID));
    assert!(channels.contains_key(crate::source::system::AUDIT_SOURCE_ID));
    // The transform node.
    assert!(channels.contains_key("t1"));
}
/// Without any `output_buffer_size` in the configuration,
/// `create_node_channels` yields one entry per node: the declared source, the
/// DLQ/event/audit ids from `crate::source::system`, and the transform.
///
/// NOTE(review): only the set of keys is asserted; the default capacity the
/// test name refers to is not inspected here.
#[tokio::test]
async fn test_create_node_channels_uses_default_capacity() {
    let yaml = r#"
pipeline:
sources:
- id: src1
type: http_client
config:
url: "http://example.com"
transforms:
- id: t1
inputs: [src1]
outputs: [sink1]
sinks:
- id: sink1
type: blackhole
"#;
    let parsed = Config::from_yaml(yaml).unwrap();
    let mut engine = Engine::from_config(parsed).unwrap();
    engine.build().await.unwrap();

    let channels = engine.create_node_channels();
    assert_eq!(channels.len(), 5);
    // User-declared source.
    assert!(channels.contains_key("src1"));
    // Ids exported by the system source module.
    assert!(channels.contains_key(crate::source::system::DLQ_SOURCE_ID));
    assert!(channels.contains_key(crate::source::system::EVENT_SOURCE_ID));
    assert!(channels.contains_key(crate::source::system::AUDIT_SOURCE_ID));
    // The transform node.
    assert!(channels.contains_key("t1"));
}
/// With a system-level `output_buffer_size` and a per-source override on
/// `src1`, `create_node_channels` still yields one entry per node: both
/// declared sources, the DLQ/event/audit ids from `crate::source::system`,
/// and the transform.
///
/// NOTE(review): only the set of keys is asserted; the overridden capacity
/// the test name refers to is not inspected here.
#[tokio::test]
async fn test_create_node_channels_respects_source_override() {
    let yaml = r#"
system:
output_buffer_size: 1024
pipeline:
sources:
- id: src1
type: http_client
output_buffer_size: 256
config:
url: "http://example.com"
- id: src2
type: http_client
config:
url: "http://example.com"
transforms:
- id: t1
inputs: [src1, src2]
outputs: [sink1]
sinks:
- id: sink1
type: blackhole
"#;
    let parsed = Config::from_yaml(yaml).unwrap();
    let mut engine = Engine::from_config(parsed).unwrap();
    engine.build().await.unwrap();

    let channels = engine.create_node_channels();
    assert_eq!(channels.len(), 6);
    // User-declared sources.
    assert!(channels.contains_key("src1"));
    assert!(channels.contains_key("src2"));
    // Ids exported by the system source module.
    assert!(channels.contains_key(crate::source::system::DLQ_SOURCE_ID));
    assert!(channels.contains_key(crate::source::system::EVENT_SOURCE_ID));
    assert!(channels.contains_key(crate::source::system::AUDIT_SOURCE_ID));
    // The transform node.
    assert!(channels.contains_key("t1"));
}
/// With the `http-server` feature enabled, a pipeline declaring an
/// `http_server` source builds and the source is registered under its id.
#[cfg(feature = "http-server")]
#[tokio::test]
async fn test_build_http_server_source() {
    let yaml = r#"
pipeline:
sources:
- id: server
type: http_server
config:
bind: "127.0.0.1:0"
path: "/"
transforms:
- id: t1
inputs: [server]
outputs: [sink1]
sinks:
- id: sink1
type: blackhole
"#;
    let parsed = Config::from_yaml(yaml).unwrap();
    let mut engine = Engine::from_config(parsed).unwrap();

    let build_outcome = engine.build().await;
    assert!(build_outcome.is_ok());

    let registered = engine.sources.contains_key("server");
    assert!(registered);
}
/// Without the `http-server` feature, building a pipeline that declares an
/// `http_server` source fails with an error that names the feature.
#[cfg(not(feature = "http-server"))]
#[tokio::test]
async fn test_build_http_server_source_requires_feature() {
    let yaml = r#"
pipeline:
sources:
- id: server
type: http_server
transforms:
- id: t1
inputs: [server]
outputs: [sink1]
sinks:
- id: sink1
type: blackhole
"#;
    let parsed = Config::from_yaml(yaml).unwrap();
    let mut engine = Engine::from_config(parsed).unwrap();

    let build_outcome = engine.build().await;
    assert!(build_outcome.is_err());

    let message = build_outcome.unwrap_err().to_string();
    assert!(message.contains("http-server"));
}
/// A transform that lists `source::system::dlq` as an input, with no sources
/// declared, builds successfully: the DLQ source is registered under that id
/// and the engine exposes system channels.
#[tokio::test]
async fn test_build_system_source_dlq() {
    let yaml = r#"
pipeline:
transforms:
- id: t1
inputs: [source::system::dlq]
outputs: [console]
sinks:
- id: console
type: console
"#;
    let parsed = Config::from_yaml(yaml).unwrap();
    let mut engine = Engine::from_config(parsed).unwrap();

    let build_outcome = engine.build().await;
    assert!(build_outcome.is_ok(), "Should build system DLQ source");

    let dlq_registered = engine.sources.contains_key("source::system::dlq");
    assert!(dlq_registered);

    let has_system_channels = engine.system_channels().is_some();
    assert!(has_system_channels, "System channels should be created");
}
/// A transform input naming a nonexistent system source is rejected by
/// `Engine::from_config` itself, before any build step, with an error that
/// mentions both "unknown input" and the offending id.
#[tokio::test]
async fn test_build_system_source_invalid_id() {
    let yaml = r#"
pipeline:
transforms:
- id: t1
inputs: [source::system::unknown]
outputs: [console]
sinks:
- id: console
type: console
"#;
    let parsed = Config::from_yaml(yaml).unwrap();

    let creation = Engine::from_config(parsed);
    assert!(creation.is_err());

    // `.err().unwrap()` extracts the error without requiring the Ok type to
    // implement `Debug`.
    let message = creation.err().unwrap().to_string();
    let mentions_unknown_input = message.contains("unknown input");
    assert!(mentions_unknown_input, "Error was: {}", message);
    let mentions_offending_id = message.contains("source::system::unknown");
    assert!(mentions_offending_id, "Error was: {}", message);
}
/// Test double implementing `crate::source::Source`; the outcome of its `run`
/// method is selected by `finish_after` and `fail_after`.
struct MockSource {
/// Identifier returned by `id()`.
id: String,
/// When `Some`, `run` returns `Ok(())` once this delay elapses, or earlier if
/// a shutdown signal arrives. Takes precedence over `fail_after`.
finish_after: Option<tokio::time::Duration>,
/// Consulted only when `finish_after` is `None`: `run` returns a
/// "Mock failure" source error once this delay elapses, or `Ok(())` if a
/// shutdown signal arrives first.
fail_after: Option<tokio::time::Duration>,
}
#[async_trait::async_trait]
impl crate::source::Source for MockSource {
    /// Returns the id this mock was constructed with.
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Runs until the configured delay elapses or a shutdown signal arrives.
    ///
    /// * `finish_after` set: resolves to `Ok(())` after the delay.
    /// * otherwise `fail_after` set: resolves to a "Mock failure" source
    ///   error after the delay.
    /// * neither set: waits for shutdown only.
    ///
    /// A shutdown signal always resolves to `Ok(())`. The sender is never
    /// used, so the mock emits no messages.
    async fn run(
        &self,
        _sender: crate::source::MessageSender,
        mut shutdown: broadcast::Receiver<()>,
    ) -> crate::error::Result<()> {
        match (self.finish_after, self.fail_after) {
            // Clean completion wins whenever it is configured.
            (Some(delay), _) => {
                tokio::select! {
                    _ = tokio::time::sleep(delay) => Ok(()),
                    _ = shutdown.recv() => Ok(()),
                }
            }
            (None, Some(delay)) => {
                tokio::select! {
                    _ = tokio::time::sleep(delay) => Err(crate::error::Error::source("Mock failure")),
                    _ = shutdown.recv() => Ok(()),
                }
            }
            (None, None) => {
                let _ = shutdown.recv().await;
                Ok(())
            }
        }
    }
}
/// When the only source fails after 50 ms, `Engine::run` must return within
/// the one-second timeout, and the value it returns must be `Ok`.
#[tokio::test]
async fn test_lifecycle_fail_fast() {
    let mut engine = Engine::from_config(minimal_config()).unwrap();

    let failing_source = Arc::new(MockSource {
        id: String::from("failing"),
        finish_after: None,
        fail_after: Some(tokio::time::Duration::from_millis(50)),
    });
    engine.sources.insert(String::from("failing"), failing_source);

    let deadline = tokio::time::Duration::from_secs(1);
    let outcome = tokio::time::timeout(deadline, engine.run()).await;

    let finished_in_time = outcome.is_ok();
    assert!(
        finished_in_time,
        "Engine should exit within timeout (Fail-Fast triggered)"
    );
    let run_result = outcome.unwrap();
    assert!(run_result.is_ok(), "Engine run should return Ok");
}
/// When the only source finishes cleanly after 50 ms, `Engine::run` must
/// return within the one-second timeout.
///
/// NOTE(review): only the timeout is checked; the value returned by `run`
/// itself is not asserted.
#[tokio::test]
async fn test_lifecycle_auto_shutdown() {
    let mut engine = Engine::from_config(minimal_config()).unwrap();

    let finishing_source = Arc::new(MockSource {
        id: String::from("finishing"),
        finish_after: Some(tokio::time::Duration::from_millis(50)),
        fail_after: None,
    });
    engine
        .sources
        .insert(String::from("finishing"), finishing_source);

    let deadline = tokio::time::Duration::from_secs(1);
    let outcome = tokio::time::timeout(deadline, engine.run()).await;

    let finished_in_time = outcome.is_ok();
    assert!(
        finished_in_time,
        "Engine should Auto-Shutdown when source finishes"
    );
}