mod common;
use common::{rsigma, temp_file};
use insta::assert_snapshot;
use predicates::prelude::*;
use tempfile::TempDir;
/// Single Sigma detection rule used as the default fixture: it selects
/// Windows `process_creation` events whose `CommandLine` contains `whoami`.
///
/// The nested keys must stay indented. Written flat, `logsource`, `detection`
/// and `selection` are empty and `CommandLine|contains` becomes a top-level
/// key, so the rule carries no detection logic at all.
const SIMPLE_DETECTION: &str = r#"
title: Detect Whoami
id: 00000000-0000-0000-0000-000000000100
status: test
logsource:
    category: process_creation
    product: windows
detection:
    selection:
        CommandLine|contains: 'whoami'
    condition: selection
level: medium
"#;
/// Two-document Sigma fixture: a `Failed Login` detection rule followed by a
/// `Brute Force` correlation rule that counts that detection (`event_count`,
/// grouped by `src_ip`, `timespan: 5m`, `gte: 5`).
///
/// The nested keys must stay indented. Written flat, `detection` and
/// `correlation` are empty mappings and their intended children become
/// unrelated top-level keys.
const CORRELATION_RULES: &str = r#"
title: Failed Login
id: 00000000-0000-0000-0000-000000000200
status: test
logsource:
    category: auth
    product: generic
detection:
    selection:
        EventType: login_failure
    condition: selection
level: low
---
title: Brute Force
correlation:
    type: event_count
    rules:
        - 00000000-0000-0000-0000-000000000200
    group-by:
        - src_ip
    timespan: 5m
    condition:
        gte: 5
level: high
"#;
/// `convert --target postgres` on a single rule file exits successfully and
/// prints the expected `SELECT` statement.
#[test]
fn convert_simple_rule_to_postgres() {
    let rule_file = temp_file(".yml", SIMPLE_DETECTION);
    let rule_path = rule_file.path().to_str().unwrap();

    let result = rsigma()
        .args(["convert", rule_path, "--target", "postgres"])
        .output()
        .unwrap();

    assert!(result.status.success());
    let stdout = String::from_utf8_lossy(&result.stdout);
    assert_snapshot!(stdout, @r#"SELECT * FROM security_events WHERE "CommandLine" ILIKE '%whoami%'"#);
}
/// `--format view` wraps the converted query in a `CREATE OR REPLACE VIEW`
/// statement whose name is taken from the rule id.
#[test]
fn convert_with_format_view() {
    let rule_file = temp_file(".yml", SIMPLE_DETECTION);
    let rule_path = rule_file.path().to_str().unwrap();

    let result = rsigma()
        .args([
            "convert", rule_path, "--target", "postgres", "--format", "view",
        ])
        .output()
        .unwrap();

    assert!(result.status.success());
    let stdout = String::from_utf8_lossy(&result.stdout);
    assert_snapshot!(stdout, @r#"CREATE OR REPLACE VIEW sigma_00000000_0000_0000_0000_000000000100 AS SELECT * FROM security_events WHERE "CommandLine" ILIKE '%whoami%'"#);
}
/// `--format timescaledb` adds a `time_bucket('1 hour', time)` column to the
/// generated query.
#[test]
fn convert_with_format_timescaledb() {
    let rule_file = temp_file(".yml", SIMPLE_DETECTION);
    let rule_path = rule_file.path().to_str().unwrap();

    let result = rsigma()
        .args([
            "convert",
            rule_path,
            "--target",
            "postgres",
            "--format",
            "timescaledb",
        ])
        .output()
        .unwrap();

    assert!(result.status.success());
    let stdout = String::from_utf8_lossy(&result.stdout);
    assert_snapshot!(stdout, @r#"SELECT time_bucket('1 hour', time) AS bucket, * FROM security_events WHERE "CommandLine" ILIKE '%whoami%'"#);
}
/// Passing a directory to `convert` converts every rule file inside it: the
/// output must mention both the `whoami` and the `ipconfig` rule.
#[test]
fn convert_directory_of_rules() {
    // Nested keys must stay indented; a flat document leaves `logsource`,
    // `detection` and `selection` empty, so the rule would have no
    // detection logic to convert.
    let second_rule = r#"
title: Detect Ipconfig
id: 00000000-0000-0000-0000-000000000101
status: test
logsource:
    category: process_creation
    product: windows
detection:
    selection:
        CommandLine|contains: 'ipconfig'
    condition: selection
level: low
"#;

    let dir = TempDir::new().unwrap();
    std::fs::write(dir.path().join("rule_a.yml"), SIMPLE_DETECTION).unwrap();
    std::fs::write(dir.path().join("rule_b.yml"), second_rule).unwrap();

    let output = rsigma()
        .args([
            "convert",
            dir.path().to_str().unwrap(),
            "--target",
            "postgres",
        ])
        .output()
        .unwrap();

    assert!(output.status.success());
    let stdout = String::from_utf8_lossy(&output.stdout);
    assert!(stdout.contains("whoami"), "should contain whoami rule");
    assert!(stdout.contains("ipconfig"), "should contain ipconfig rule");
}
/// A pipeline passed with `-p` is applied before conversion: the
/// `field_name_mapping` transformation renames `CommandLine` to
/// `process.command_line` in the generated SQL.
#[test]
fn convert_with_pipeline() {
    // `mapping` has to be nested inside the transformation item and the field
    // pair inside `mapping`; at top level the transformation would carry no
    // mapping and the field would not be renamed.
    let pipeline = temp_file(
        ".yml",
        r#"
name: test-pipeline
priority: 10
transformations:
  - type: field_name_mapping
    mapping:
      CommandLine: process.command_line
"#,
    );
    let rule = temp_file(".yml", SIMPLE_DETECTION);

    let output = rsigma()
        .args([
            "convert",
            rule.path().to_str().unwrap(),
            "--target",
            "postgres",
            "-p",
            pipeline.path().to_str().unwrap(),
        ])
        .output()
        .unwrap();

    assert!(output.status.success());
    assert_snapshot!(String::from_utf8_lossy(&output.stdout), @r#"SELECT * FROM security_events WHERE "process.command_line" ILIKE '%whoami%'"#);
}
/// With `--skip-unsupported`, a rule that converts cleanly still succeeds and
/// produces the plain `SELECT` statement.
#[test]
fn convert_skip_unsupported() {
    let rule_file = temp_file(".yml", SIMPLE_DETECTION);
    let rule_path = rule_file.path().to_str().unwrap();

    let result = rsigma()
        .args([
            "convert",
            rule_path,
            "--target",
            "postgres",
            "--skip-unsupported",
        ])
        .output()
        .unwrap();

    assert!(result.status.success());
    let stdout = String::from_utf8_lossy(&result.stdout);
    assert_snapshot!(stdout, @r#"SELECT * FROM security_events WHERE "CommandLine" ILIKE '%whoami%'"#);
}
/// Converting for the `test_mandatory_pipeline` target without a pipeline
/// fails, and stderr contains "requires a pipeline".
#[test]
fn convert_requires_pipeline_error() {
    let rule_file = temp_file(".yml", SIMPLE_DETECTION);
    let rule_path = rule_file.path().to_str().unwrap();
    let mentions_pipeline = predicate::str::contains("requires a pipeline");

    rsigma()
        .args(["convert", rule_path, "--target", "test_mandatory_pipeline"])
        .assert()
        .failure()
        .stderr(mentions_pipeline);
}
/// An unknown `--target` makes `convert` fail and print the unknown name
/// together with the list of available targets on stderr.
#[test]
fn convert_invalid_target() {
    let rule_file = temp_file(".yml", SIMPLE_DETECTION);
    let rule_path = rule_file.path().to_str().unwrap();

    let result = rsigma()
        .args(["convert", rule_path, "--target", "nonexistent_backend"])
        .output()
        .unwrap();

    assert!(!result.status.success());
    let stderr = String::from_utf8_lossy(&result.stderr);
    assert_snapshot!(stderr, @"
Unknown target: nonexistent_backend
Available targets: postgres, lynxdb, test
");
}
/// An unknown `--format` for the `postgres` backend makes `convert` fail and
/// list the formats that backend supports on stderr.
#[test]
fn convert_invalid_format() {
    let rule_file = temp_file(".yml", SIMPLE_DETECTION);
    let rule_path = rule_file.path().to_str().unwrap();

    let result = rsigma()
        .args([
            "convert",
            rule_path,
            "--target",
            "postgres",
            "--format",
            "nonexistent_format",
        ])
        .output()
        .unwrap();

    assert!(!result.status.success());
    let stderr = String::from_utf8_lossy(&result.stderr);
    assert_snapshot!(stderr, @"
Unknown format 'nonexistent_format' for backend 'postgres'
Available: default (Plain PostgreSQL SQL), view (CREATE OR REPLACE VIEW for each rule), timescaledb (TimescaleDB-optimized queries with time_bucket()), continuous_aggregate (CREATE MATERIALIZED VIEW ... WITH (timescaledb.continuous)), sliding_window (Correlation queries using window functions for per-row sliding detection)
");
}
/// Converting the detection + correlation fixture emits one query for the
/// detection rule and one aggregate query (`GROUP BY` / `HAVING COUNT(*) >= 5`)
/// for the correlation rule.
#[test]
fn convert_correlation_rule() {
    let rules_file = temp_file(".yml", CORRELATION_RULES);
    let rules_path = rules_file.path().to_str().unwrap();

    let result = rsigma()
        .args(["convert", rules_path, "--target", "postgres"])
        .output()
        .unwrap();

    assert!(result.status.success());
    let stdout = String::from_utf8_lossy(&result.stdout);
    assert_snapshot!(stdout, @r#"
SELECT * FROM security_events WHERE "EventType" = 'login_failure'
WITH combined_events AS (SELECT * FROM security_events WHERE "EventType" = 'login_failure') SELECT src_ip, COUNT(*) AS event_count FROM combined_events GROUP BY src_ip HAVING COUNT(*) >= 5
"#);
}
/// With `--output <path>`, the converted query is written to that file; the
/// file content is checked against the expected `SELECT` statement.
#[test]
fn convert_to_file_output() {
    let rule_file = temp_file(".yml", SIMPLE_DETECTION);
    let out_dir = TempDir::new().unwrap();
    let sql_path = out_dir.path().join("output.sql");

    let rule_arg = rule_file.path().to_str().unwrap();
    let output_arg = sql_path.to_str().unwrap();
    rsigma()
        .args([
            "convert", rule_arg, "--target", "postgres", "--output", output_arg,
        ])
        .assert()
        .success();

    let written = std::fs::read_to_string(&sql_path).unwrap();
    assert_snapshot!(written, @r#"SELECT * FROM security_events WHERE "CommandLine" ILIKE '%whoami%'"#);
}
/// `list-targets` succeeds and prints the available conversion targets.
#[test]
fn list_targets() {
    let result = rsigma().args(["list-targets"]).output().unwrap();

    assert!(result.status.success());
    let stdout = String::from_utf8_lossy(&result.stdout);
    assert_snapshot!(stdout, @"
Available conversion targets:
postgres - PostgreSQL/TimescaleDB (aliases: postgresql, pg)
lynxdb - LynxDB log analytics engine
test - Backend-neutral test backend
");
}
/// `list-formats postgres` succeeds and prints every output format the
/// `postgres` backend offers.
#[test]
fn list_formats_postgres() {
    let result = rsigma()
        .args(["list-formats", "postgres"])
        .output()
        .unwrap();

    assert!(result.status.success());
    let stdout = String::from_utf8_lossy(&result.stdout);
    assert_snapshot!(stdout, @"
Available formats for 'postgres':
default - Plain PostgreSQL SQL
view - CREATE OR REPLACE VIEW for each rule
timescaledb - TimescaleDB-optimized queries with time_bucket()
continuous_aggregate - CREATE MATERIALIZED VIEW ... WITH (timescaledb.continuous)
sliding_window - Correlation queries using window functions for per-row sliding detection
");
}
/// `list-formats` with an unknown target fails and prints the unknown name
/// together with the list of available targets on stderr.
#[test]
fn list_formats_invalid_target() {
    let result = rsigma()
        .args(["list-formats", "nonexistent"])
        .output()
        .unwrap();

    assert!(!result.status.success());
    let stderr = String::from_utf8_lossy(&result.stderr);
    assert_snapshot!(stderr, @"
Unknown target: nonexistent
Available targets: postgres, lynxdb, test
");
}