pub mod channel;
pub mod file;
pub mod http;
#[cfg(feature = "kafka")]
pub mod kafka;
pub mod loki;
pub mod memory;
#[cfg(feature = "remote-write")]
pub mod remote_write;
pub mod stdout;
pub mod tcp;
pub mod udp;
use std::collections::HashMap;
use std::path::Path;
use serde::Deserialize;
use crate::SondaError;
/// A destination that accepts raw bytes.
///
/// [`create_sink`] hands implementations back as `Box<dyn Sink>`. The
/// `Send + Sync` supertraits require every implementation to be safe to move
/// to, and share between, threads.
pub trait Sink: Send + Sync {
/// Hands `data` to the sink.
///
/// Whether the bytes are emitted immediately or held back (for example for
/// batching) is decided by each implementation and is not visible from this
/// trait. Failures are reported as a [`SondaError`].
fn write(&mut self, data: &[u8]) -> Result<(), SondaError>;
/// Flushes the sink.
///
/// NOTE(review): presumably this pushes out anything a previous `write`
/// left buffered — confirm against the individual implementations.
/// Failures are reported as a [`SondaError`].
fn flush(&mut self) -> Result<(), SondaError>;
}
/// Declarative description of which sink to build and how to configure it.
///
/// Deserialized with serde as an internally tagged enum: the variant is
/// selected by a `type` field that sits next to the variant's own fields
/// (for example `type: tcp` together with `address: "127.0.0.1:9999"`).
/// [`create_sink`] turns a value of this type into a live sink.
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type")]
pub enum SinkConfig {
/// `type: stdout` — no settings; built as `stdout::StdoutSink`.
#[serde(rename = "stdout")]
Stdout,
/// `type: file` — built as `file::FileSink`.
#[serde(rename = "file")]
File {
/// Required. Converted to a `Path` and handed to `file::FileSink::new`.
path: String,
},
/// `type: tcp` — built as `tcp::TcpSink`.
#[serde(rename = "tcp")]
Tcp {
/// Required. Passed unchanged to `tcp::TcpSink::new`.
address: String,
},
/// `type: udp` — built as `udp::UdpSink`.
#[serde(rename = "udp")]
Udp {
/// Required. Passed unchanged to `udp::UdpSink::new`.
address: String,
},
/// `type: http_push` — built as `http::HttpPushSink`.
#[serde(rename = "http_push")]
HttpPush {
/// Required. Passed unchanged to `http::HttpPushSink::new`.
url: String,
/// Optional. `create_sink` substitutes `application/octet-stream` when absent.
content_type: Option<String>,
/// Optional. `create_sink` substitutes `http::DEFAULT_BATCH_SIZE` when absent.
batch_size: Option<usize>,
/// Optional header name/value pairs. `None` when the key is omitted;
/// `create_sink` then passes an empty map to the sink.
#[serde(default)]
headers: Option<HashMap<String, String>>,
},
/// `type: remote_write` — built as `remote_write::RemoteWriteSink`.
/// Only compiled in when the `remote-write` feature is enabled.
#[cfg(feature = "remote-write")]
#[serde(rename = "remote_write")]
RemoteWrite {
/// Required. Passed unchanged to `remote_write::RemoteWriteSink::new`.
url: String,
/// Optional. `create_sink` substitutes `remote_write::DEFAULT_BATCH_SIZE` when absent.
#[serde(default)]
batch_size: Option<usize>,
},
/// `type: kafka` — built as `kafka::KafkaSink`.
/// Only compiled in when the `kafka` feature is enabled.
#[cfg(feature = "kafka")]
#[serde(rename = "kafka")]
Kafka {
/// Required. Passed unchanged to `kafka::KafkaSink::new`.
brokers: String,
/// Required. Passed unchanged to `kafka::KafkaSink::new`.
topic: String,
},
/// `type: loki` — built as `loki::LokiSink`.
#[serde(rename = "loki")]
Loki {
/// Required. Cloned and handed to `loki::LokiSink::new`.
url: String,
/// Label name/value pairs; an empty map when the key is omitted.
/// Cloned and handed to `loki::LokiSink::new`.
#[serde(default)]
labels: HashMap<String, String>,
/// Optional. `create_sink` substitutes `100` when absent.
#[serde(default)]
batch_size: Option<usize>,
},
}
pub fn create_sink(config: &SinkConfig) -> Result<Box<dyn Sink>, SondaError> {
match config {
SinkConfig::Stdout => Ok(Box::new(stdout::StdoutSink::new())),
SinkConfig::File { path } => Ok(Box::new(file::FileSink::new(Path::new(path))?)),
SinkConfig::Tcp { address } => Ok(Box::new(tcp::TcpSink::new(address)?)),
SinkConfig::Udp { address } => Ok(Box::new(udp::UdpSink::new(address)?)),
SinkConfig::HttpPush {
url,
content_type,
batch_size,
headers,
} => {
let ct = content_type
.as_deref()
.unwrap_or("application/octet-stream");
let bs = batch_size.unwrap_or(http::DEFAULT_BATCH_SIZE);
let h = headers.clone().unwrap_or_default();
Ok(Box::new(http::HttpPushSink::new(url, ct, bs, h)?))
}
#[cfg(feature = "remote-write")]
SinkConfig::RemoteWrite { url, batch_size } => {
let bs = batch_size.unwrap_or(remote_write::DEFAULT_BATCH_SIZE);
Ok(Box::new(remote_write::RemoteWriteSink::new(url, bs)?))
}
#[cfg(feature = "kafka")]
SinkConfig::Kafka { brokers, topic } => {
Ok(Box::new(kafka::KafkaSink::new(brokers, topic)?))
}
SinkConfig::Loki {
url,
labels,
batch_size,
} => {
let bs = batch_size.unwrap_or(100);
Ok(Box::new(loki::LokiSink::new(
url.clone(),
labels.clone(),
bs,
)?))
}
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for [`SinkConfig`] deserialization and the [`create_sink`] factory.
    //!
    //! YAML fixtures that contain nested mappings (`generator`, `encoder`,
    //! `sink`, `headers`) must keep their nested keys indented: without the
    //! indentation the parent key parses as a null scalar and the nested keys
    //! become top-level siblings, which makes the fixture fail to deserialize.

    use super::*;

    // ---- Factory: stdout -------------------------------------------------

    /// The factory succeeds for the stdout variant.
    #[test]
    fn create_sink_stdout_returns_ok() {
        let result = create_sink(&SinkConfig::Stdout);
        assert!(result.is_ok());
    }

    /// A sink obtained from the factory is usable through the `Sink` trait object.
    #[test]
    fn create_sink_stdout_write_and_flush_succeed() {
        let mut sink = create_sink(&SinkConfig::Stdout).unwrap();
        // An empty payload keeps the test output clean while still exercising `write`.
        assert!(sink.write(b"").is_ok());
        assert!(sink.flush().is_ok());
    }

    // ---- Deserialization: basic variants ---------------------------------

    /// `type: stdout` selects the unit variant.
    #[test]
    fn sink_config_stdout_deserializes_from_yaml() {
        let yaml = "type: stdout";
        let config: SinkConfig = serde_yaml::from_str(yaml).unwrap();
        assert!(matches!(config, SinkConfig::Stdout));
    }

    /// Both the original and its clone can be fed to the factory.
    #[test]
    fn sink_config_is_cloneable() {
        let config = SinkConfig::Stdout;
        let cloned = config.clone();
        assert!(create_sink(&config).is_ok());
        assert!(create_sink(&cloned).is_ok());
    }

    /// The `Debug` output names the variant.
    #[test]
    fn sink_config_is_debuggable() {
        let config = SinkConfig::Stdout;
        let s = format!("{config:?}");
        assert!(s.contains("Stdout"));
    }

    /// `type: file` plus `path` yields the file variant with that path.
    #[test]
    fn sink_config_file_deserializes_with_type_field() {
        let yaml = "type: file\npath: /tmp/sonda-mod-test.txt";
        let config: SinkConfig = serde_yaml::from_str(yaml).unwrap();
        assert!(
            matches!(config, SinkConfig::File { ref path } if path == "/tmp/sonda-mod-test.txt")
        );
    }

    /// `type: tcp` plus `address` yields the TCP variant with that address.
    #[test]
    fn sink_config_tcp_deserializes_with_type_field() {
        let yaml = "type: tcp\naddress: \"127.0.0.1:9999\"";
        let config: SinkConfig = serde_yaml::from_str(yaml).unwrap();
        assert!(matches!(config, SinkConfig::Tcp { ref address } if address == "127.0.0.1:9999"));
    }

    /// `type: udp` plus `address` yields the UDP variant with that address.
    #[test]
    fn sink_config_udp_deserializes_with_type_field() {
        let yaml = "type: udp\naddress: \"127.0.0.1:9999\"";
        let config: SinkConfig = serde_yaml::from_str(yaml).unwrap();
        assert!(matches!(config, SinkConfig::Udp { ref address } if address == "127.0.0.1:9999"));
    }

    // ---- Deserialization: rejected input ---------------------------------

    /// A tag that matches no variant is a deserialization error.
    #[test]
    fn sink_config_unknown_type_returns_error() {
        let yaml = "type: no_such_sink";
        let result: Result<SinkConfig, _> = serde_yaml::from_str(yaml);
        assert!(
            result.is_err(),
            "unknown type tag should fail deserialization"
        );
    }

    /// A bare scalar carries no `type` field and is rejected.
    #[test]
    fn sink_config_missing_type_field_returns_error() {
        let yaml = "stdout";
        let result: Result<SinkConfig, _> = serde_yaml::from_str(yaml);
        assert!(
            result.is_err(),
            "missing type field should fail deserialization"
        );
    }

    /// The YAML-tag (`!stdout`) spelling of the variant is rejected.
    #[test]
    fn sink_config_old_external_tag_format_is_rejected() {
        let yaml = "!stdout";
        let result: Result<SinkConfig, _> = serde_yaml::from_str(yaml);
        assert!(
            result.is_err(),
            "externally-tagged YAML format must be rejected in favour of internally-tagged"
        );
    }

    /// `path` is mandatory for the file variant.
    #[test]
    fn sink_config_file_requires_path_field() {
        let yaml = "type: file";
        let result: Result<SinkConfig, _> = serde_yaml::from_str(yaml);
        assert!(
            result.is_err(),
            "file variant without path should fail deserialization"
        );
    }

    /// `address` is mandatory for the TCP variant.
    #[test]
    fn sink_config_tcp_requires_address_field() {
        let yaml = "type: tcp";
        let result: Result<SinkConfig, _> = serde_yaml::from_str(yaml);
        assert!(
            result.is_err(),
            "tcp variant without address should fail deserialization"
        );
    }

    /// `address` is mandatory for the UDP variant.
    #[test]
    fn sink_config_udp_requires_address_field() {
        let yaml = "type: udp";
        let result: Result<SinkConfig, _> = serde_yaml::from_str(yaml);
        assert!(
            result.is_err(),
            "udp variant without address should fail deserialization"
        );
    }

    // ---- Trait contracts --------------------------------------------------

    /// Compile-time check: the config type is `Send + Sync`.
    #[test]
    fn sink_config_is_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<SinkConfig>();
    }

    /// Cloning the file variant preserves `path`; `Debug` names the variant.
    #[test]
    fn sink_config_file_is_cloneable_and_debuggable() {
        let config = SinkConfig::File {
            path: "/tmp/test.txt".to_string(),
        };
        let cloned = config.clone();
        assert!(matches!(cloned, SinkConfig::File { ref path } if path == "/tmp/test.txt"));
        let s = format!("{config:?}");
        assert!(s.contains("File"));
    }

    /// Cloning the TCP variant preserves `address`; `Debug` names the variant.
    #[test]
    fn sink_config_tcp_is_cloneable_and_debuggable() {
        let config = SinkConfig::Tcp {
            address: "127.0.0.1:9999".to_string(),
        };
        let cloned = config.clone();
        assert!(matches!(cloned, SinkConfig::Tcp { ref address } if address == "127.0.0.1:9999"));
        let s = format!("{config:?}");
        assert!(s.contains("Tcp"));
    }

    /// Cloning the UDP variant preserves `address`; `Debug` names the variant.
    #[test]
    fn sink_config_udp_is_cloneable_and_debuggable() {
        let config = SinkConfig::Udp {
            address: "127.0.0.1:9999".to_string(),
        };
        let cloned = config.clone();
        assert!(matches!(cloned, SinkConfig::Udp { ref address } if address == "127.0.0.1:9999"));
        let s = format!("{config:?}");
        assert!(s.contains("Udp"));
    }

    // ---- Full scenario documents -------------------------------------------

    /// A complete scenario with a nested TCP sink section deserializes.
    #[test]
    fn scenario_yaml_with_tcp_sink_deserializes_correctly() {
        use crate::config::ScenarioConfig;
        // Nested keys are indented so they belong to their parent mapping.
        let yaml = r#"
name: test_metric
rate: 100.0
generator:
  type: constant
  value: 1.0
encoder:
  type: prometheus_text
sink:
  type: tcp
  address: "127.0.0.1:4321"
"#;
        let config: ScenarioConfig = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(config.name, "test_metric");
        assert!(matches!(
            config.encoder,
            crate::encoder::EncoderConfig::PrometheusText
        ));
        assert!(
            matches!(config.sink, SinkConfig::Tcp { ref address } if address == "127.0.0.1:4321")
        );
    }

    /// A complete scenario combining the JSON-lines encoder with a file sink deserializes.
    #[test]
    fn scenario_yaml_with_file_sink_and_json_encoder_deserializes_correctly() {
        use crate::config::ScenarioConfig;
        // Nested keys are indented so they belong to their parent mapping.
        let yaml = r#"
name: file_json_test
rate: 10.0
generator:
  type: constant
  value: 42.0
encoder:
  type: json_lines
sink:
  type: file
  path: /tmp/sonda-file-json-test.txt
"#;
        let config: ScenarioConfig = serde_yaml::from_str(yaml).unwrap();
        assert!(matches!(
            config.encoder,
            crate::encoder::EncoderConfig::JsonLines
        ));
        assert!(
            matches!(config.sink, SinkConfig::File { ref path } if path == "/tmp/sonda-file-json-test.txt")
        );
    }

    /// A complete scenario combining the Influx encoder with a UDP sink deserializes.
    #[test]
    fn scenario_yaml_with_udp_sink_and_influx_encoder_deserializes_correctly() {
        use crate::config::ScenarioConfig;
        // Nested keys are indented so they belong to their parent mapping.
        let yaml = r#"
name: udp_influx_test
rate: 50.0
generator:
  type: constant
  value: 0.0
encoder:
  type: influx_lp
  field_key: "bytes"
sink:
  type: udp
  address: "127.0.0.1:5555"
"#;
        let config: ScenarioConfig = serde_yaml::from_str(yaml).unwrap();
        assert!(matches!(
            config.encoder,
            crate::encoder::EncoderConfig::InfluxLineProtocol { field_key: Some(ref k) } if k == "bytes"
        ));
        assert!(
            matches!(config.sink, SinkConfig::Udp { ref address } if address == "127.0.0.1:5555")
        );
    }

    // ---- Kafka (feature-gated) ----------------------------------------------

    /// `type: kafka` with both fields yields the Kafka variant.
    #[cfg(feature = "kafka")]
    #[test]
    fn sink_config_kafka_deserializes_with_type_field() {
        let yaml = "type: kafka\nbrokers: \"127.0.0.1:9092\"\ntopic: sonda-test";
        let config: SinkConfig = serde_yaml::from_str(yaml).unwrap();
        assert!(
            matches!(config, SinkConfig::Kafka { ref brokers, ref topic }
                if brokers == "127.0.0.1:9092" && topic == "sonda-test")
        );
    }

    /// `brokers` is mandatory for the Kafka variant.
    #[cfg(feature = "kafka")]
    #[test]
    fn sink_config_kafka_requires_brokers_field() {
        let yaml = "type: kafka\ntopic: sonda-test";
        let result: Result<SinkConfig, _> = serde_yaml::from_str(yaml);
        assert!(
            result.is_err(),
            "kafka variant without brokers should fail deserialization"
        );
    }

    /// `topic` is mandatory for the Kafka variant.
    #[cfg(feature = "kafka")]
    #[test]
    fn sink_config_kafka_requires_topic_field() {
        let yaml = "type: kafka\nbrokers: \"127.0.0.1:9092\"";
        let result: Result<SinkConfig, _> = serde_yaml::from_str(yaml);
        assert!(
            result.is_err(),
            "kafka variant without topic should fail deserialization"
        );
    }

    /// Cloning the Kafka variant preserves both fields; `Debug` names the variant.
    #[cfg(feature = "kafka")]
    #[test]
    fn sink_config_kafka_is_cloneable_and_debuggable() {
        let config = SinkConfig::Kafka {
            brokers: "127.0.0.1:9092".to_string(),
            topic: "sonda-test".to_string(),
        };
        let cloned = config.clone();
        assert!(
            matches!(cloned, SinkConfig::Kafka { ref brokers, ref topic }
                if brokers == "127.0.0.1:9092" && topic == "sonda-test")
        );
        let s = format!("{config:?}");
        assert!(s.contains("Kafka"));
    }

    /// The factory surfaces a connection failure instead of hiding it.
    #[cfg(feature = "kafka")]
    #[test]
    #[ignore = "requires network timeout which is slow; run with --ignored when desired"]
    fn create_sink_kafka_with_unreachable_broker_returns_err() {
        let config = SinkConfig::Kafka {
            brokers: "127.0.0.1:1".to_string(),
            topic: "sonda-test".to_string(),
        };
        let result = create_sink(&config);
        assert!(
            result.is_err(),
            "create_sink should propagate the broker connection failure"
        );
    }

    /// An empty broker string is rejected by the factory.
    #[cfg(feature = "kafka")]
    #[test]
    fn create_sink_kafka_with_empty_broker_returns_err() {
        let config = SinkConfig::Kafka {
            brokers: String::new(),
            topic: "sonda-test".to_string(),
        };
        let result = create_sink(&config);
        assert!(
            result.is_err(),
            "create_sink should reject an empty broker string"
        );
    }

    // ---- HTTP push headers ----------------------------------------------------

    /// A nested `headers` mapping is deserialized into `Some(map)` with every entry.
    #[test]
    fn sink_config_http_push_with_headers_deserializes() {
        // The header entries are indented so they are children of `headers`.
        let yaml = r#"
type: http_push
url: "http://localhost:8428/api/v1/write"
headers:
  Content-Type: "application/x-protobuf"
  Content-Encoding: "snappy"
  X-Prometheus-Remote-Write-Version: "0.1.0"
"#;
        let config: SinkConfig = serde_yaml::from_str(yaml).expect("should deserialize");
        match config {
            SinkConfig::HttpPush { url, headers, .. } => {
                assert_eq!(url, "http://localhost:8428/api/v1/write");
                let hdr = headers.expect("headers should be Some");
                assert_eq!(
                    hdr.get("Content-Type").map(String::as_str),
                    Some("application/x-protobuf")
                );
                assert_eq!(
                    hdr.get("Content-Encoding").map(String::as_str),
                    Some("snappy")
                );
                assert_eq!(
                    hdr.get("X-Prometheus-Remote-Write-Version")
                        .map(String::as_str),
                    Some("0.1.0")
                );
            }
            other => panic!("expected HttpPush, got {other:?}"),
        }
    }

    /// Omitting `headers` still deserializes and leaves the field as `None`.
    #[test]
    fn sink_config_http_push_without_headers_is_backward_compatible() {
        let yaml = r#"
type: http_push
url: "http://localhost:9090/push"
content_type: "text/plain"
"#;
        let config: SinkConfig = serde_yaml::from_str(yaml).expect("should deserialize");
        match config {
            SinkConfig::HttpPush {
                url,
                headers,
                content_type,
                ..
            } => {
                assert_eq!(url, "http://localhost:9090/push");
                assert_eq!(content_type.as_deref(), Some("text/plain"));
                assert!(
                    headers.is_none(),
                    "headers should default to None when not specified"
                );
            }
            other => panic!("expected HttpPush, got {other:?}"),
        }
    }

    /// An explicit empty mapping is `Some(empty map)`, distinct from an omitted key.
    #[test]
    fn sink_config_http_push_with_empty_headers_map_deserializes() {
        let yaml = r#"
type: http_push
url: "http://localhost:9090/push"
headers: {}
"#;
        let config: SinkConfig = serde_yaml::from_str(yaml).expect("should deserialize");
        match config {
            SinkConfig::HttpPush { headers, .. } => {
                let hdr = headers.expect("headers should be Some even when empty");
                assert!(
                    hdr.is_empty(),
                    "empty headers map should deserialize as empty HashMap"
                );
            }
            other => panic!("expected HttpPush, got {other:?}"),
        }
    }

    /// Cloning keeps the header entries, and `Debug` shows both the variant and a header name.
    #[test]
    fn sink_config_http_push_with_headers_is_cloneable_and_debuggable() {
        let mut hdr = HashMap::new();
        hdr.insert("X-Custom".to_string(), "val".to_string());
        let config = SinkConfig::HttpPush {
            url: "http://localhost:9090/push".to_string(),
            content_type: None,
            batch_size: None,
            headers: Some(hdr),
        };
        let cloned = config.clone();
        let debug_str = format!("{cloned:?}");
        assert!(debug_str.contains("HttpPush"));
        assert!(debug_str.contains("X-Custom"));
    }
}