#![cfg(feature = "daemon-otlp")]
mod common;
use common::{DaemonProcess, SIMPLE_RULE, http_get, poll_until, temp_file};
use opentelemetry_proto::tonic::{
collector::logs::v1::ExportLogsServiceRequest,
common::v1::{AnyValue, KeyValue, KeyValueList, any_value},
logs::v1::{LogRecord, ResourceLogs, ScopeLogs},
resource::v1::Resource,
};
use prost::Message;
use std::time::Duration;
/// Wraps `s` in an OTLP [`AnyValue`] holding the string variant.
fn string_val(s: &str) -> AnyValue {
    let text = any_value::Value::StringValue(String::from(s));
    AnyValue { value: Some(text) }
}
/// Builds an OTLP [`KeyValue`] pairing `key` with `value`.
///
/// Any field other than `key` and `value` is left at its `Default`.
fn kv(key: &str, value: AnyValue) -> KeyValue {
    let owned_key = String::from(key);
    KeyValue {
        value: Some(value),
        key: owned_key,
        ..Default::default()
    }
}
/// Assembles an OTLP logs export request containing a single log record.
///
/// The record has severity text `"INFO"` and a key/value-list body made of
/// `fields`. It sits in one scope under one resource whose only attribute is
/// `service.name = "test-service"`.
fn make_otlp_request(fields: Vec<KeyValue>) -> ExportLogsServiceRequest {
    // Build from the innermost message outwards so each layer stays readable.
    let record_body = AnyValue {
        value: Some(any_value::Value::KvlistValue(KeyValueList { values: fields })),
    };
    let record = LogRecord {
        severity_text: String::from("INFO"),
        body: Some(record_body),
        ..Default::default()
    };
    let scope = ScopeLogs {
        log_records: vec![record],
        ..Default::default()
    };
    let resource = Resource {
        attributes: vec![kv("service.name", string_val("test-service"))],
        ..Default::default()
    };
    let resource_logs = ResourceLogs {
        resource: Some(resource),
        scope_logs: vec![scope],
        ..Default::default()
    };

    ExportLogsServiceRequest {
        resource_logs: vec![resource_logs],
    }
}
/// Waits until `/api/v1/status` reports `detection_matches >= 1`.
///
/// The probe is handed to `poll_until` with a 5-second budget. A response
/// that is not valid JSON, or that lacks an unsigned `detection_matches`
/// field, counts as "not yet".
///
/// # Panics
///
/// Panics if `poll_until` gives up without the probe succeeding.
fn wait_for_detection_match(daemon: &DaemonProcess) {
    let probe = || -> Option<()> {
        let (_, raw) = http_get(&daemon.url("/api/v1/status"));
        let status: serde_json::Value = serde_json::from_str(&raw).ok()?;
        let matches = status["detection_matches"].as_u64()?;
        if matches >= 1 { Some(()) } else { None }
    };

    poll_until(Duration::from_secs(5), probe)
        .expect("detection_matches never reached 1 within 5s");
}
/// A protobuf-encoded OTLP export is answered with HTTP 200 and a JSON reply
/// reporting zero rejected log records.
#[test]
fn otlp_http_protobuf_accepted() {
    let rule = temp_file(".yml", SIMPLE_RULE);
    let daemon = DaemonProcess::spawn_http(rule.path().to_str().unwrap());

    let fields = vec![kv("CommandLine", string_val("something benign"))];
    let payload = make_otlp_request(fields).encode_to_vec();

    let endpoint = daemon.url("/v1/logs");
    let response = ureq::post(&endpoint)
        .header("Content-Type", "application/x-protobuf")
        .send(&payload[..])
        .expect("OTLP POST failed");
    assert_eq!(response.status().as_u16(), 200);

    let reply_text = response.into_body().read_to_string().unwrap();
    let reply: serde_json::Value = serde_json::from_str(&reply_text).unwrap();
    assert_eq!(reply["partialSuccess"]["rejectedLogRecords"], 0);
}
/// A protobuf OTLP record whose `CommandLine` mentions `malware.exe` is
/// expected to show up in the daemon's status counters as both a processed
/// event and a detection match.
#[test]
fn otlp_http_protobuf_triggers_detection() {
    let rule = temp_file(".yml", SIMPLE_RULE);
    let daemon = DaemonProcess::spawn_http(rule.path().to_str().unwrap());

    let fields = vec![kv("CommandLine", string_val("run malware.exe now"))];
    let payload = make_otlp_request(fields).encode_to_vec();

    let endpoint = daemon.url("/v1/logs");
    let response = ureq::post(&endpoint)
        .header("Content-Type", "application/x-protobuf")
        .send(&payload[..])
        .expect("OTLP POST failed");
    assert_eq!(response.status().as_u16(), 200);

    // Detection runs after the HTTP reply, so wait for the counter to move.
    wait_for_detection_match(&daemon);

    let (_, raw_status) = http_get(&daemon.url("/api/v1/status"));
    let status: serde_json::Value = serde_json::from_str(&raw_status).unwrap();

    let processed = status["events_processed"].as_u64().unwrap();
    assert!(
        processed >= 1,
        "events_processed should be >= 1 after OTLP ingestion"
    );

    let matched = status["detection_matches"].as_u64().unwrap();
    assert!(
        matched >= 1,
        "detection_matches should be >= 1 for matching OTLP event"
    );
}
/// A JSON-encoded OTLP export is answered with HTTP 200 and a JSON reply
/// reporting zero rejected log records.
#[test]
fn otlp_http_json_accepted() {
    let rule = temp_file(".yml", SIMPLE_RULE);
    let daemon = DaemonProcess::spawn_http(rule.path().to_str().unwrap());

    let fields = vec![kv("CommandLine", string_val("benign process"))];
    let payload = serde_json::to_string(&make_otlp_request(fields)).unwrap();

    let endpoint = daemon.url("/v1/logs");
    let response = ureq::post(&endpoint)
        .header("Content-Type", "application/json")
        .send(payload.as_bytes())
        .expect("OTLP JSON POST failed");
    assert_eq!(response.status().as_u16(), 200);

    let reply_text = response.into_body().read_to_string().unwrap();
    let reply: serde_json::Value = serde_json::from_str(&reply_text).unwrap();
    assert_eq!(reply["partialSuccess"]["rejectedLogRecords"], 0);
}
/// A JSON OTLP record whose `CommandLine` mentions `malware.exe` is expected
/// to raise the daemon's `detection_matches` counter to at least one.
#[test]
fn otlp_http_json_triggers_detection() {
    let rule = temp_file(".yml", SIMPLE_RULE);
    let daemon = DaemonProcess::spawn_http(rule.path().to_str().unwrap());

    let fields = vec![kv("CommandLine", string_val("launch malware.exe"))];
    let payload = serde_json::to_string(&make_otlp_request(fields)).unwrap();

    let endpoint = daemon.url("/v1/logs");
    let response = ureq::post(&endpoint)
        .header("Content-Type", "application/json")
        .send(payload.as_bytes())
        .expect("OTLP JSON POST failed");
    assert_eq!(response.status().as_u16(), 200);

    // Detection runs after the HTTP reply, so wait for the counter to move.
    wait_for_detection_match(&daemon);

    let (_, raw_status) = http_get(&daemon.url("/api/v1/status"));
    let status: serde_json::Value = serde_json::from_str(&raw_status).unwrap();
    let matched = status["detection_matches"].as_u64().unwrap();
    assert!(
        matched >= 1,
        "detection_matches should be >= 1 for matching OTLP JSON event"
    );
}
/// A gzip-compressed protobuf export sent with `Content-Encoding: gzip` is
/// answered with HTTP 200 and is expected to produce a detection match.
#[test]
fn otlp_http_gzip_protobuf_accepted() {
    use flate2::Compression;
    use flate2::write::GzEncoder;
    use std::io::Write;

    let rule = temp_file(".yml", SIMPLE_RULE);
    let daemon = DaemonProcess::spawn_http(rule.path().to_str().unwrap());

    let fields = vec![kv("CommandLine", string_val("malware.exe gzip test"))];
    let protobuf = make_otlp_request(fields).encode_to_vec();

    // Compress the encoded message into an in-memory gzip stream.
    let mut gz = GzEncoder::new(Vec::new(), Compression::default());
    gz.write_all(&protobuf).unwrap();
    let gzipped = gz.finish().unwrap();

    let endpoint = daemon.url("/v1/logs");
    let response = ureq::post(&endpoint)
        .header("Content-Type", "application/x-protobuf")
        .header("Content-Encoding", "gzip")
        .send(&gzipped[..])
        .expect("OTLP gzip POST failed");
    assert_eq!(response.status().as_u16(), 200);

    // Detection runs after the HTTP reply, so wait for the counter to move.
    wait_for_detection_match(&daemon);

    let (_, raw_status) = http_get(&daemon.url("/api/v1/status"));
    let status: serde_json::Value = serde_json::from_str(&raw_status).unwrap();
    let matched = status["detection_matches"].as_u64().unwrap();
    assert!(
        matched >= 1,
        "detection_matches should be >= 1 for gzip-compressed OTLP event"
    );
}
/// Posting with `Content-Type: text/plain` must surface as a 415 status
/// error from `ureq`.
#[test]
fn otlp_http_unsupported_content_type_returns_415() {
    let rule = temp_file(".yml", SIMPLE_RULE);
    let daemon = DaemonProcess::spawn_http(rule.path().to_str().unwrap());

    let endpoint = daemon.url("/v1/logs");
    let other = ureq::post(&endpoint)
        .header("Content-Type", "text/plain")
        .send("not otlp");

    assert!(
        matches!(other, Err(ureq::Error::StatusCode(415))),
        "expected 415 Unsupported Media Type, got {other:?}"
    );
}
/// Bytes that do not decode as a protobuf export request must surface as a
/// 400 status error from `ureq`.
#[test]
fn otlp_http_malformed_protobuf_returns_400() {
    let rule = temp_file(".yml", SIMPLE_RULE);
    let daemon = DaemonProcess::spawn_http(rule.path().to_str().unwrap());

    let garbage = &b"not valid protobuf"[..];
    let endpoint = daemon.url("/v1/logs");
    let other = ureq::post(&endpoint)
        .header("Content-Type", "application/x-protobuf")
        .send(garbage);

    if !matches!(other, Err(ureq::Error::StatusCode(400))) {
        panic!("expected 400 Bad Request, got {other:?}");
    }
}
/// A body that is not well-formed JSON, sent as `application/json`, must
/// surface as a 400 status error from `ureq`.
#[test]
fn otlp_http_malformed_json_returns_400() {
    let rule = temp_file(".yml", SIMPLE_RULE);
    let daemon = DaemonProcess::spawn_http(rule.path().to_str().unwrap());

    let broken_json = "{not valid json".as_bytes();
    let endpoint = daemon.url("/v1/logs");
    let other = ureq::post(&endpoint)
        .header("Content-Type", "application/json")
        .send(broken_json);

    if let Err(ureq::Error::StatusCode(400)) = other {
        // Rejected with the expected status.
    } else {
        panic!("expected 400 Bad Request, got {other:?}");
    }
}
/// After one OTLP export, `/metrics` must answer 200 with a body that
/// mentions both `rsigma_otlp_requests_total` and
/// `rsigma_otlp_log_records_total`.
#[test]
fn otlp_metrics_exposed_after_request() {
    let rule = temp_file(".yml", SIMPLE_RULE);
    let daemon = DaemonProcess::spawn_http(rule.path().to_str().unwrap());

    let fields = vec![kv("CommandLine", string_val("test"))];
    let payload = make_otlp_request(fields).encode_to_vec();

    // Only delivery matters here; the response itself is discarded.
    let endpoint = daemon.url("/v1/logs");
    ureq::post(&endpoint)
        .header("Content-Type", "application/x-protobuf")
        .send(&payload[..])
        .expect("OTLP POST failed");

    let scrape = || {
        let (status, body) = http_get(&daemon.url("/metrics"));
        let ready = status == 200
            && body.contains("rsigma_otlp_requests_total")
            && body.contains("rsigma_otlp_log_records_total");
        if ready { Some(body) } else { None }
    };
    let metrics = poll_until(Duration::from_secs(5), scrape)
        .expect("OTLP metrics never appeared within 5s");

    assert!(
        metrics.contains("rsigma_otlp_requests_total"),
        "metrics should contain rsigma_otlp_requests_total"
    );
    assert!(
        metrics.contains("rsigma_otlp_log_records_total"),
        "metrics should contain rsigma_otlp_log_records_total"
    );
}