pub mod channel;
pub mod file;
#[cfg(feature = "http")]
pub mod http;
#[cfg(feature = "kafka")]
pub mod kafka;
#[cfg(feature = "http")]
pub mod loki;
pub mod memory;
#[cfg(feature = "otlp")]
pub mod otlp_grpc;
#[cfg(feature = "remote-write")]
pub mod remote_write;
pub mod retry;
pub mod stdout;
pub mod tcp;
pub mod udp;
use std::collections::HashMap;
use std::path::Path;
use crate::SondaError;
/// Destination for encoded output bytes.
///
/// Every sink built by [`create_sink`] is handed out as a `Box<dyn Sink>`, so
/// callers interact with concrete sinks exclusively through these two methods.
/// Implementors are required to be `Send + Sync`.
pub trait Sink: Send + Sync {
    /// Hands `data` to the sink.
    ///
    /// NOTE(review): whether the bytes are delivered immediately or held until
    /// [`Sink::flush`] is decided by each implementation — check the concrete
    /// sink before relying on either behaviour.
    ///
    /// # Errors
    ///
    /// Returns a [`SondaError`] when the implementation reports a failure.
    fn write(&mut self, data: &[u8]) -> Result<(), SondaError>;
    /// Asks the sink to flush whatever it may be holding back.
    ///
    /// # Errors
    ///
    /// Returns a [`SondaError`] when the implementation reports a failure.
    fn flush(&mut self) -> Result<(), SondaError>;
}
/// Declarative description of a sink, consumed by [`create_sink`].
///
/// With the `config` feature enabled the enum derives `serde::Deserialize` as
/// an internally tagged enum: the `type` field selects the variant (using the
/// renamed tag shown on each variant) and the remaining fields sit next to it.
///
/// Several variants only exist when their Cargo feature is enabled (`http`,
/// `remote-write`, `kafka`, `otlp`).
#[derive(Debug, Clone)]
#[cfg_attr(feature = "config", derive(serde::Deserialize))]
#[cfg_attr(feature = "config", serde(tag = "type"))]
pub enum SinkConfig {
    /// `type: stdout` — built as a `stdout::StdoutSink`; takes no options.
    #[cfg_attr(feature = "config", serde(rename = "stdout"))]
    Stdout,
    /// `type: file` — built as a `file::FileSink`.
    #[cfg_attr(feature = "config", serde(rename = "file"))]
    File {
        /// Path handed to the file sink constructor.
        path: String,
    },
    /// `type: tcp` — built as a `tcp::TcpSink`.
    #[cfg_attr(feature = "config", serde(rename = "tcp"))]
    Tcp {
        /// Address string handed to the TCP sink constructor.
        address: String,
        /// Optional retry settings; `None` when the key is absent. Converted
        /// with `retry::RetryPolicy::from_config` when the sink is built.
        #[cfg_attr(feature = "config", serde(default))]
        retry: Option<retry::RetryConfig>,
    },
    /// `type: udp` — built as a `udp::UdpSink`.
    #[cfg_attr(feature = "config", serde(rename = "udp"))]
    Udp {
        /// Address string handed to the UDP sink constructor.
        address: String,
    },
    /// `type: http_push` — built as an `http::HttpPushSink`
    /// (requires the `http` feature).
    #[cfg(feature = "http")]
    #[cfg_attr(feature = "config", serde(rename = "http_push"))]
    HttpPush {
        /// URL handed to the HTTP push sink constructor.
        url: String,
        /// Content type for the sink; `create_sink` falls back to
        /// `application/octet-stream` when this is `None`.
        content_type: Option<String>,
        /// Batch size for the sink; `create_sink` falls back to
        /// `http::DEFAULT_BATCH_SIZE` when this is `None`.
        batch_size: Option<usize>,
        /// Extra headers; `create_sink` passes an empty map when this is
        /// `None`.
        #[cfg_attr(feature = "config", serde(default))]
        headers: Option<HashMap<String, String>>,
        /// Optional retry settings; `None` when the key is absent.
        #[cfg_attr(feature = "config", serde(default))]
        retry: Option<retry::RetryConfig>,
    },
    /// `type: remote_write` — built as a `remote_write::RemoteWriteSink`
    /// (requires the `remote-write` feature).
    #[cfg(feature = "remote-write")]
    #[cfg_attr(feature = "config", serde(rename = "remote_write"))]
    RemoteWrite {
        /// URL handed to the remote-write sink constructor.
        url: String,
        /// Batch size for the sink; `create_sink` falls back to
        /// `remote_write::DEFAULT_BATCH_SIZE` when this is `None`.
        #[cfg_attr(feature = "config", serde(default))]
        batch_size: Option<usize>,
        /// Optional retry settings; `None` when the key is absent.
        #[cfg_attr(feature = "config", serde(default))]
        retry: Option<retry::RetryConfig>,
    },
    /// `type: kafka` — built as a `kafka::KafkaSink`
    /// (requires the `kafka` feature).
    #[cfg(feature = "kafka")]
    #[cfg_attr(feature = "config", serde(rename = "kafka"))]
    Kafka {
        /// Broker string handed to the Kafka sink constructor.
        brokers: String,
        /// Topic handed to the Kafka sink constructor.
        topic: String,
        /// Optional retry settings; `None` when the key is absent.
        #[cfg_attr(feature = "config", serde(default))]
        retry: Option<retry::RetryConfig>,
    },
    /// `type: loki` — built as a `loki::LokiSink` (requires the `http`
    /// feature). The labels passed to `create_sink` are forwarded to the sink.
    #[cfg(feature = "http")]
    #[cfg_attr(feature = "config", serde(rename = "loki"))]
    Loki {
        /// URL handed to the Loki sink constructor.
        url: String,
        /// Batch size for the sink; `create_sink` falls back to `100` when
        /// this is `None`.
        #[cfg_attr(feature = "config", serde(default))]
        batch_size: Option<usize>,
        /// Optional retry settings; `None` when the key is absent.
        #[cfg_attr(feature = "config", serde(default))]
        retry: Option<retry::RetryConfig>,
    },
    /// `type: otlp_grpc` — built as an `otlp_grpc::OtlpGrpcSink` (requires the
    /// `otlp` feature). The labels passed to `create_sink` are converted to
    /// string-valued OTLP key/value pairs and handed to the sink.
    #[cfg(feature = "otlp")]
    #[cfg_attr(feature = "config", serde(rename = "otlp_grpc"))]
    OtlpGrpc {
        /// Endpoint string handed to the OTLP gRPC sink constructor.
        endpoint: String,
        /// Signal type handed to the OTLP gRPC sink constructor.
        signal_type: otlp_grpc::OtlpSignalType,
        /// Batch size for the sink; `create_sink` falls back to
        /// `otlp_grpc::DEFAULT_BATCH_SIZE` when this is `None`.
        #[cfg_attr(feature = "config", serde(default))]
        batch_size: Option<usize>,
        /// Optional retry settings; `None` when the key is absent.
        #[cfg_attr(feature = "config", serde(default))]
        retry: Option<retry::RetryConfig>,
    },
}
/// Builds the concrete sink described by `config` and returns it boxed behind
/// the [`Sink`] trait.
///
/// `labels` is only consumed by the Loki arm (forwarded as a cloned map) and
/// the OTLP gRPC arm (converted to string-valued key/value pairs); every other
/// arm ignores it. When `labels` is `None` those arms receive an empty
/// collection.
///
/// Optional settings that are left unset in `config` fall back to defaults
/// here: `application/octet-stream` for the HTTP push content type, the
/// per-module `DEFAULT_BATCH_SIZE` constants for HTTP push, remote-write and
/// OTLP, and `100` for the Loki batch size.
///
/// # Errors
///
/// Propagates any error produced while converting the retry settings or while
/// constructing the selected sink.
pub fn create_sink(
    config: &SinkConfig,
    labels: Option<&HashMap<String, String>>,
) -> Result<Box<dyn Sink>, SondaError> {
    // Only feature-gated arms read `labels`; referencing it here keeps builds
    // without those features free of unused-variable warnings.
    let _ = &labels;

    // Shared by every retry-capable arm: map the optional retry settings to an
    // optional policy, keeping a failed conversion as the `Err` side so the
    // call site can propagate it with `?`.
    let to_policy = |settings: Option<&retry::RetryConfig>| {
        settings.map(retry::RetryPolicy::from_config).transpose()
    };

    let sink: Box<dyn Sink> = match config {
        SinkConfig::Stdout => Box::new(stdout::StdoutSink::new()),
        SinkConfig::File { path } => Box::new(file::FileSink::new(Path::new(path))?),
        SinkConfig::Tcp {
            address,
            retry: retry_opts,
        } => {
            let policy = to_policy(retry_opts.as_ref())?;
            Box::new(tcp::TcpSink::new(address, policy)?)
        }
        SinkConfig::Udp { address } => Box::new(udp::UdpSink::new(address)?),
        #[cfg(feature = "http")]
        SinkConfig::HttpPush {
            url,
            content_type,
            batch_size,
            headers,
            retry: retry_opts,
        } => {
            let mime = match content_type {
                Some(explicit) => explicit.as_str(),
                None => "application/octet-stream",
            };
            let batch = batch_size.unwrap_or(http::DEFAULT_BATCH_SIZE);
            let extra_headers = match headers {
                Some(map) => map.clone(),
                None => HashMap::default(),
            };
            let policy = to_policy(retry_opts.as_ref())?;
            Box::new(http::HttpPushSink::new(
                url,
                mime,
                batch,
                extra_headers,
                policy,
            )?)
        }
        #[cfg(feature = "remote-write")]
        SinkConfig::RemoteWrite {
            url,
            batch_size,
            retry: retry_opts,
        } => {
            let batch = batch_size.unwrap_or(remote_write::DEFAULT_BATCH_SIZE);
            let policy = to_policy(retry_opts.as_ref())?;
            Box::new(remote_write::RemoteWriteSink::new(url, batch, policy)?)
        }
        #[cfg(feature = "kafka")]
        SinkConfig::Kafka {
            brokers,
            topic,
            retry: retry_opts,
        } => {
            let policy = to_policy(retry_opts.as_ref())?;
            Box::new(kafka::KafkaSink::new(brokers, topic, policy)?)
        }
        #[cfg(feature = "http")]
        SinkConfig::Loki {
            url,
            batch_size,
            retry: retry_opts,
        } => {
            let batch = batch_size.unwrap_or(100);
            // The Loki sink takes ownership of its label set.
            let stream_labels = labels.cloned().unwrap_or_default();
            let policy = to_policy(retry_opts.as_ref())?;
            Box::new(loki::LokiSink::new(
                url.clone(),
                stream_labels,
                batch,
                policy,
            )?)
        }
        #[cfg(feature = "otlp")]
        SinkConfig::OtlpGrpc {
            endpoint,
            signal_type,
            batch_size,
            retry: retry_opts,
        } => {
            use crate::encoder::otlp::{any_value, AnyValue, KeyValue};

            let batch = batch_size.unwrap_or(otlp_grpc::DEFAULT_BATCH_SIZE);
            // Each label becomes one string-valued key/value pair; a missing
            // label map yields an empty list.
            let attributes: Vec<KeyValue> = labels
                .into_iter()
                .flatten()
                .map(|(name, text)| KeyValue {
                    key: name.clone(),
                    value: Some(AnyValue {
                        value: Some(any_value::Value::StringValue(text.clone())),
                    }),
                })
                .collect();
            let policy = to_policy(retry_opts.as_ref())?;
            Box::new(otlp_grpc::OtlpGrpcSink::new(
                endpoint,
                *signal_type,
                batch,
                attributes,
                policy,
            )?)
        }
    };

    Ok(sink)
}
#[cfg(test)]
mod tests {
    //! Unit tests for [`SinkConfig`] and [`create_sink`].
    //!
    //! Tests that parse YAML are gated on the `config` feature; tests for
    //! optional sinks are additionally gated on the sink's own feature.
    //! Multi-line YAML fixtures are raw strings, so their indentation is part
    //! of the document: nested keys must stay indented under their parent.

    use super::*;

    // ---- factory: stdout -------------------------------------------------

    /// The stdout sink can always be constructed.
    #[test]
    fn create_sink_stdout_returns_ok() {
        let result = create_sink(&SinkConfig::Stdout, None);
        assert!(result.is_ok());
    }

    /// An empty write followed by a flush succeeds on the stdout sink.
    #[test]
    fn create_sink_stdout_write_and_flush_succeed() {
        let mut sink = create_sink(&SinkConfig::Stdout, None).unwrap();
        assert!(sink.write(b"").is_ok());
        assert!(sink.flush().is_ok());
    }

    /// `type: stdout` selects the `Stdout` variant.
    #[cfg(feature = "config")]
    #[test]
    fn sink_config_stdout_deserializes_from_yaml() {
        let yaml = "type: stdout";
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert!(matches!(config, SinkConfig::Stdout));
    }

    /// A cloned config is as usable as the original.
    #[test]
    fn sink_config_is_cloneable() {
        let config = SinkConfig::Stdout;
        let cloned = config.clone();
        assert!(create_sink(&config, None).is_ok());
        assert!(create_sink(&cloned, None).is_ok());
    }

    /// `Debug` output names the variant.
    #[test]
    fn sink_config_is_debuggable() {
        let config = SinkConfig::Stdout;
        let s = format!("{config:?}");
        assert!(s.contains("Stdout"));
    }

    // ---- deserialization: built-in variants --------------------------------

    /// `type: file` plus `path` selects the `File` variant.
    #[cfg(feature = "config")]
    #[test]
    fn sink_config_file_deserializes_with_type_field() {
        let yaml = "type: file\npath: /tmp/sonda-mod-test.txt";
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert!(
            matches!(config, SinkConfig::File { ref path } if path == "/tmp/sonda-mod-test.txt")
        );
    }

    /// `type: tcp` plus `address` selects the `Tcp` variant.
    #[cfg(feature = "config")]
    #[test]
    fn sink_config_tcp_deserializes_with_type_field() {
        let yaml = "type: tcp\naddress: \"127.0.0.1:9999\"";
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert!(
            matches!(config, SinkConfig::Tcp { ref address, .. } if address == "127.0.0.1:9999")
        );
    }

    /// `type: udp` plus `address` selects the `Udp` variant.
    #[cfg(feature = "config")]
    #[test]
    fn sink_config_udp_deserializes_with_type_field() {
        let yaml = "type: udp\naddress: \"127.0.0.1:9999\"";
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert!(matches!(config, SinkConfig::Udp { ref address } if address == "127.0.0.1:9999"));
    }

    // ---- deserialization: rejected inputs ----------------------------------

    /// An unrecognised `type` tag is an error.
    #[cfg(feature = "config")]
    #[test]
    fn sink_config_unknown_type_returns_error() {
        let yaml = "type: no_such_sink";
        let result: Result<SinkConfig, _> = serde_yaml_ng::from_str(yaml);
        assert!(
            result.is_err(),
            "unknown type tag should fail deserialization"
        );
    }

    /// A bare scalar without a `type` field is an error.
    #[cfg(feature = "config")]
    #[test]
    fn sink_config_missing_type_field_returns_error() {
        let yaml = "stdout";
        let result: Result<SinkConfig, _> = serde_yaml_ng::from_str(yaml);
        assert!(
            result.is_err(),
            "missing type field should fail deserialization"
        );
    }

    /// The YAML-tag (`!stdout`) spelling is not accepted.
    #[cfg(feature = "config")]
    #[test]
    fn sink_config_old_external_tag_format_is_rejected() {
        let yaml = "!stdout";
        let result: Result<SinkConfig, _> = serde_yaml_ng::from_str(yaml);
        assert!(
            result.is_err(),
            "externally-tagged YAML format must be rejected in favour of internally-tagged"
        );
    }

    /// `file` without `path` is an error.
    #[cfg(feature = "config")]
    #[test]
    fn sink_config_file_requires_path_field() {
        let yaml = "type: file";
        let result: Result<SinkConfig, _> = serde_yaml_ng::from_str(yaml);
        assert!(
            result.is_err(),
            "file variant without path should fail deserialization"
        );
    }

    /// `tcp` without `address` is an error.
    #[cfg(feature = "config")]
    #[test]
    fn sink_config_tcp_requires_address_field() {
        let yaml = "type: tcp";
        let result: Result<SinkConfig, _> = serde_yaml_ng::from_str(yaml);
        assert!(
            result.is_err(),
            "tcp variant without address should fail deserialization"
        );
    }

    /// `udp` without `address` is an error.
    #[cfg(feature = "config")]
    #[test]
    fn sink_config_udp_requires_address_field() {
        let yaml = "type: udp";
        let result: Result<SinkConfig, _> = serde_yaml_ng::from_str(yaml);
        assert!(
            result.is_err(),
            "udp variant without address should fail deserialization"
        );
    }

    // ---- trait bounds and derives ------------------------------------------

    /// Compile-time check that `SinkConfig` is `Send + Sync`.
    #[test]
    fn sink_config_is_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<SinkConfig>();
    }

    /// `Clone` and `Debug` work for the `File` variant.
    #[test]
    fn sink_config_file_is_cloneable_and_debuggable() {
        let config = SinkConfig::File {
            path: "/tmp/test.txt".to_string(),
        };
        let cloned = config.clone();
        assert!(matches!(cloned, SinkConfig::File { ref path } if path == "/tmp/test.txt"));
        let s = format!("{config:?}");
        assert!(s.contains("File"));
    }

    /// `Clone` and `Debug` work for the `Tcp` variant.
    #[test]
    fn sink_config_tcp_is_cloneable_and_debuggable() {
        let config = SinkConfig::Tcp {
            address: "127.0.0.1:9999".to_string(),
            retry: None,
        };
        let cloned = config.clone();
        assert!(
            matches!(cloned, SinkConfig::Tcp { ref address, .. } if address == "127.0.0.1:9999")
        );
        let s = format!("{config:?}");
        assert!(s.contains("Tcp"));
    }

    /// `Clone` and `Debug` work for the `Udp` variant.
    #[test]
    fn sink_config_udp_is_cloneable_and_debuggable() {
        let config = SinkConfig::Udp {
            address: "127.0.0.1:9999".to_string(),
        };
        let cloned = config.clone();
        assert!(matches!(cloned, SinkConfig::Udp { ref address } if address == "127.0.0.1:9999"));
        let s = format!("{config:?}");
        assert!(s.contains("Udp"));
    }

    // ---- full scenario documents -------------------------------------------
    //
    // `generator`, `encoder` and `sink` are nested mappings: their keys must be
    // indented under the parent, otherwise the document has several top-level
    // `type` keys and the sections themselves parse as null.

    /// A scenario with a Prometheus text encoder and a TCP sink parses.
    #[cfg(feature = "config")]
    #[test]
    fn scenario_yaml_with_tcp_sink_deserializes_correctly() {
        use crate::config::ScenarioConfig;
        let yaml = r#"
name: test_metric
rate: 100.0
generator:
  type: constant
  value: 1.0
encoder:
  type: prometheus_text
sink:
  type: tcp
  address: "127.0.0.1:4321"
"#;
        let config: ScenarioConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert_eq!(config.name, "test_metric");
        assert!(matches!(
            config.encoder,
            crate::encoder::EncoderConfig::PrometheusText { .. }
        ));
        assert!(
            matches!(config.sink, SinkConfig::Tcp { ref address, .. } if address == "127.0.0.1:4321")
        );
    }

    /// A scenario with a JSON-lines encoder and a file sink parses.
    #[cfg(feature = "config")]
    #[test]
    fn scenario_yaml_with_file_sink_and_json_encoder_deserializes_correctly() {
        use crate::config::ScenarioConfig;
        let yaml = r#"
name: file_json_test
rate: 10.0
generator:
  type: constant
  value: 42.0
encoder:
  type: json_lines
sink:
  type: file
  path: /tmp/sonda-file-json-test.txt
"#;
        let config: ScenarioConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert!(matches!(
            config.encoder,
            crate::encoder::EncoderConfig::JsonLines { .. }
        ));
        assert!(
            matches!(config.sink, SinkConfig::File { ref path } if path == "/tmp/sonda-file-json-test.txt")
        );
    }

    /// A scenario with an Influx line-protocol encoder and a UDP sink parses.
    #[cfg(feature = "config")]
    #[test]
    fn scenario_yaml_with_udp_sink_and_influx_encoder_deserializes_correctly() {
        use crate::config::ScenarioConfig;
        let yaml = r#"
name: udp_influx_test
rate: 50.0
generator:
  type: constant
  value: 0.0
encoder:
  type: influx_lp
  field_key: "bytes"
sink:
  type: udp
  address: "127.0.0.1:5555"
"#;
        let config: ScenarioConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert!(matches!(
            config.encoder,
            crate::encoder::EncoderConfig::InfluxLineProtocol { field_key: Some(ref k), .. } if k == "bytes"
        ));
        assert!(
            matches!(config.sink, SinkConfig::Udp { ref address } if address == "127.0.0.1:5555")
        );
    }

    // ---- kafka --------------------------------------------------------------

    /// `type: kafka` with `brokers` and `topic` selects the `Kafka` variant.
    #[cfg(all(feature = "kafka", feature = "config"))]
    #[test]
    fn sink_config_kafka_deserializes_with_type_field() {
        let yaml = "type: kafka\nbrokers: \"127.0.0.1:9092\"\ntopic: sonda-test";
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert!(
            matches!(config, SinkConfig::Kafka { ref brokers, ref topic, .. }
                if brokers == "127.0.0.1:9092" && topic == "sonda-test")
        );
    }

    /// `kafka` without `brokers` is an error.
    #[cfg(all(feature = "kafka", feature = "config"))]
    #[test]
    fn sink_config_kafka_requires_brokers_field() {
        let yaml = "type: kafka\ntopic: sonda-test";
        let result: Result<SinkConfig, _> = serde_yaml_ng::from_str(yaml);
        assert!(
            result.is_err(),
            "kafka variant without brokers should fail deserialization"
        );
    }

    /// `kafka` without `topic` is an error.
    #[cfg(all(feature = "kafka", feature = "config"))]
    #[test]
    fn sink_config_kafka_requires_topic_field() {
        let yaml = "type: kafka\nbrokers: \"127.0.0.1:9092\"";
        let result: Result<SinkConfig, _> = serde_yaml_ng::from_str(yaml);
        assert!(
            result.is_err(),
            "kafka variant without topic should fail deserialization"
        );
    }

    /// `Clone` and `Debug` work for the `Kafka` variant.
    #[cfg(feature = "kafka")]
    #[test]
    fn sink_config_kafka_is_cloneable_and_debuggable() {
        let config = SinkConfig::Kafka {
            brokers: "127.0.0.1:9092".to_string(),
            topic: "sonda-test".to_string(),
            retry: None,
        };
        let cloned = config.clone();
        assert!(
            matches!(cloned, SinkConfig::Kafka { ref brokers, ref topic, .. }
                if brokers == "127.0.0.1:9092" && topic == "sonda-test")
        );
        let s = format!("{config:?}");
        assert!(s.contains("Kafka"));
    }

    /// Building a Kafka sink against an unreachable broker fails.
    #[cfg(feature = "kafka")]
    #[test]
    #[ignore = "requires network timeout which is slow; run with --ignored when desired"]
    fn create_sink_kafka_with_unreachable_broker_returns_err() {
        let config = SinkConfig::Kafka {
            brokers: "127.0.0.1:1".to_string(),
            topic: "sonda-test".to_string(),
            retry: None,
        };
        let result = create_sink(&config, None);
        assert!(
            result.is_err(),
            "create_sink should propagate the broker connection failure"
        );
    }

    /// Building a Kafka sink with an empty broker string fails.
    #[cfg(feature = "kafka")]
    #[test]
    fn create_sink_kafka_with_empty_broker_returns_err() {
        let config = SinkConfig::Kafka {
            brokers: String::new(),
            topic: "sonda-test".to_string(),
            retry: None,
        };
        let result = create_sink(&config, None);
        assert!(
            result.is_err(),
            "create_sink should reject an empty broker string"
        );
    }

    // ---- http_push: headers --------------------------------------------------

    /// Header entries nested under `headers:` land in the `headers` map.
    #[cfg(all(feature = "http", feature = "config"))]
    #[test]
    fn sink_config_http_push_with_headers_deserializes() {
        // The header entries must be indented under `headers:`; at column 0
        // they would be unknown top-level keys and `headers` would be null.
        let yaml = r#"
type: http_push
url: "http://localhost:8428/api/v1/write"
headers:
  Content-Type: "application/x-protobuf"
  Content-Encoding: "snappy"
  X-Prometheus-Remote-Write-Version: "0.1.0"
"#;
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).expect("should deserialize");
        match config {
            SinkConfig::HttpPush { url, headers, .. } => {
                assert_eq!(url, "http://localhost:8428/api/v1/write");
                let hdr = headers.expect("headers should be Some");
                assert_eq!(
                    hdr.get("Content-Type").map(String::as_str),
                    Some("application/x-protobuf")
                );
                assert_eq!(
                    hdr.get("Content-Encoding").map(String::as_str),
                    Some("snappy")
                );
                assert_eq!(
                    hdr.get("X-Prometheus-Remote-Write-Version")
                        .map(String::as_str),
                    Some("0.1.0")
                );
            }
            other => panic!("expected HttpPush, got {other:?}"),
        }
    }

    /// Omitting `headers` leaves the field as `None`.
    #[cfg(all(feature = "http", feature = "config"))]
    #[test]
    fn sink_config_http_push_without_headers_is_backward_compatible() {
        let yaml = r#"
type: http_push
url: "http://localhost:9090/push"
content_type: "text/plain"
"#;
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).expect("should deserialize");
        match config {
            SinkConfig::HttpPush {
                url,
                headers,
                content_type,
                ..
            } => {
                assert_eq!(url, "http://localhost:9090/push");
                assert_eq!(content_type.as_deref(), Some("text/plain"));
                assert!(
                    headers.is_none(),
                    "headers should default to None when not specified"
                );
            }
            other => panic!("expected HttpPush, got {other:?}"),
        }
    }

    /// `headers: {}` yields `Some` with an empty map, not `None`.
    #[cfg(all(feature = "http", feature = "config"))]
    #[test]
    fn sink_config_http_push_with_empty_headers_map_deserializes() {
        let yaml = r#"
type: http_push
url: "http://localhost:9090/push"
headers: {}
"#;
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).expect("should deserialize");
        match config {
            SinkConfig::HttpPush { headers, .. } => {
                let hdr = headers.expect("headers should be Some even when empty");
                assert!(
                    hdr.is_empty(),
                    "empty headers map should deserialize as empty HashMap"
                );
            }
            other => panic!("expected HttpPush, got {other:?}"),
        }
    }

    /// `Clone` and `Debug` work for `HttpPush`, including its headers.
    #[cfg(feature = "http")]
    #[test]
    fn sink_config_http_push_with_headers_is_cloneable_and_debuggable() {
        let mut hdr = HashMap::new();
        hdr.insert("X-Custom".to_string(), "val".to_string());
        let config = SinkConfig::HttpPush {
            url: "http://localhost:9090/push".to_string(),
            content_type: None,
            batch_size: None,
            headers: Some(hdr),
            retry: None,
        };
        let cloned = config.clone();
        let debug_str = format!("{cloned:?}");
        assert!(debug_str.contains("HttpPush"));
        assert!(debug_str.contains("X-Custom"));
    }

    // ---- http feature gating ---------------------------------------------------

    /// With the `http` feature an `HttpPush` sink can be constructed.
    #[cfg(feature = "http")]
    #[test]
    fn http_feature_enables_http_push_variant() {
        let config = SinkConfig::HttpPush {
            url: "http://127.0.0.1:19999/push".to_string(),
            content_type: None,
            batch_size: None,
            headers: None,
            retry: None,
        };
        let result = create_sink(&config, None);
        assert!(
            result.is_ok(),
            "HttpPush variant must be available when http feature is enabled"
        );
    }

    /// With the `http` feature a `Loki` sink can be constructed.
    #[cfg(feature = "http")]
    #[test]
    fn http_feature_enables_loki_variant() {
        let config = SinkConfig::Loki {
            url: "http://127.0.0.1:19999".to_string(),
            batch_size: None,
            retry: None,
        };
        let result = create_sink(&config, None);
        assert!(
            result.is_ok(),
            "Loki variant must be available when http feature is enabled"
        );
    }

    /// With the `http` feature `type: http_push` parses.
    #[cfg(all(feature = "http", feature = "config"))]
    #[test]
    fn http_feature_enables_http_push_deserialization() {
        let yaml = "type: http_push\nurl: \"http://localhost:9090/push\"";
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).expect("should deserialize");
        assert!(matches!(config, SinkConfig::HttpPush { .. }));
    }

    /// With the `http` feature `type: loki` parses.
    #[cfg(all(feature = "http", feature = "config"))]
    #[test]
    fn http_feature_enables_loki_deserialization() {
        let yaml = "type: loki\nurl: \"http://localhost:3100\"";
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).expect("should deserialize");
        assert!(matches!(config, SinkConfig::Loki { .. }));
    }

    /// The stdout sink does not depend on the `http` feature.
    #[test]
    fn non_http_sinks_available_without_http_feature() {
        assert!(create_sink(&SinkConfig::Stdout, None).is_ok());
    }

    // ---- retry settings -----------------------------------------------------------
    //
    // The retry fields must be indented under `retry:`; at column 0 they would
    // be unknown top-level keys and `retry` would be null.

    /// A nested `retry` mapping on a TCP sink is parsed field by field.
    #[cfg(feature = "config")]
    #[test]
    fn sink_config_tcp_with_retry_deserializes() {
        let yaml = r#"
type: tcp
address: "127.0.0.1:9999"
retry:
  max_attempts: 3
  initial_backoff: 100ms
  max_backoff: 5s
"#;
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).expect("should deserialize");
        match config {
            SinkConfig::Tcp { address, retry } => {
                assert_eq!(address, "127.0.0.1:9999");
                let r = retry.expect("retry should be Some");
                assert_eq!(r.max_attempts, 3);
                assert_eq!(r.initial_backoff, "100ms");
                assert_eq!(r.max_backoff, "5s");
            }
            other => panic!("expected SinkConfig::Tcp, got {other:?}"),
        }
    }

    /// Omitting `retry` on a TCP sink leaves it as `None`.
    #[cfg(feature = "config")]
    #[test]
    fn sink_config_tcp_without_retry_has_none() {
        let yaml = "type: tcp\naddress: \"127.0.0.1:9999\"";
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).expect("should deserialize");
        match config {
            SinkConfig::Tcp { retry, .. } => {
                assert!(retry.is_none(), "retry should default to None");
            }
            other => panic!("expected SinkConfig::Tcp, got {other:?}"),
        }
    }

    /// A nested `retry` mapping on an HTTP push sink is parsed field by field.
    #[cfg(all(feature = "http", feature = "config"))]
    #[test]
    fn sink_config_http_push_with_retry_deserializes() {
        let yaml = r#"
type: http_push
url: "http://localhost:9090/push"
retry:
  max_attempts: 5
  initial_backoff: 200ms
  max_backoff: 10s
"#;
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).expect("should deserialize");
        match config {
            SinkConfig::HttpPush { retry, .. } => {
                let r = retry.expect("retry should be Some");
                assert_eq!(r.max_attempts, 5);
                assert_eq!(r.initial_backoff, "200ms");
                assert_eq!(r.max_backoff, "10s");
            }
            other => panic!("expected HttpPush, got {other:?}"),
        }
    }

    /// Omitting `retry` on an HTTP push sink leaves it as `None`.
    #[cfg(all(feature = "http", feature = "config"))]
    #[test]
    fn sink_config_http_push_without_retry_is_backward_compatible() {
        let yaml = "type: http_push\nurl: \"http://localhost:9090/push\"";
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).expect("should deserialize");
        match config {
            SinkConfig::HttpPush { retry, .. } => {
                assert!(retry.is_none(), "retry should default to None");
            }
            other => panic!("expected HttpPush, got {other:?}"),
        }
    }
}