#![allow(dead_code)]
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;
use std::process::{Command as StdCommand, Stdio};
use std::time::{Duration, Instant};
use assert_cmd::Command;
use tempfile::NamedTempFile;
/// Builds an `assert_cmd` [`Command`] that runs the `rsigma` binary.
///
/// # Panics
///
/// Panics with "binary not found" when `Command::cargo_bin` cannot resolve the binary.
#[allow(deprecated)]
pub fn rsigma() -> Command {
    let located = Command::cargo_bin("rsigma");
    located.expect("binary not found")
}
/// Returns the path of the `rsigma` binary as an owned `String`.
///
/// # Panics
///
/// Panics if the resolved path is not valid UTF-8.
pub fn rsigma_bin() -> String {
    let path = assert_cmd::cargo::cargo_bin("rsigma");
    let text = path.to_str().unwrap();
    String::from(text)
}
/// Creates a named temporary file whose name ends in `suffix` and writes `contents` to it.
///
/// The data is flushed before the handle is returned, so the file can be read back through
/// its path right away.
///
/// # Panics
///
/// Panics if the file cannot be created, written, or flushed.
pub fn temp_file(suffix: &str, contents: &str) -> NamedTempFile {
    let mut builder = tempfile::Builder::new();
    builder.suffix(suffix);
    let mut file = builder.tempfile().unwrap();

    let bytes = contents.as_bytes();
    file.write_all(bytes).unwrap();
    file.flush().unwrap();
    file
}
/// Startup milestone recognised in the daemon's stderr output and forwarded over a channel
/// to the thread waiting in `DaemonProcess::spawn`.
enum StartupEvent {
/// An "API server listening" line was seen; the payload is the address extracted from it.
ApiAddr(String),
/// A line containing "Sink started" was seen.
SinkStarted,
}
/// Owns a child process and kills it on drop unless ownership was taken back with `disarm`.
///
/// The inner `Option` is `None` once the child has been handed out.
struct ChildGuard(Option<std::process::Child>);

impl ChildGuard {
    /// Borrows the guarded child mutably.
    ///
    /// # Panics
    ///
    /// Panics with "guard already disarmed" if the guard no longer holds a child.
    fn as_child_mut(&mut self) -> &mut std::process::Child {
        match self.0.as_mut() {
            Some(child) => child,
            None => panic!("guard already disarmed"),
        }
    }

    /// Consumes the guard and returns the child without killing it.
    ///
    /// # Panics
    ///
    /// Panics with "guard already disarmed" if the guard no longer holds a child.
    fn disarm(mut self) -> std::process::Child {
        // Taking the child leaves `None` behind, so the `Drop` impl that runs when `self`
        // goes out of scope has nothing left to kill.
        let Some(child) = self.0.take() else {
            panic!("guard already disarmed")
        };
        child
    }
}

impl Drop for ChildGuard {
    /// Kills and reaps the child if the guard still holds one; errors are ignored.
    fn drop(&mut self) {
        let Some(mut child) = self.0.take() else {
            return;
        };
        let _ = child.kill();
        let _ = child.wait();
    }
}
/// Handle to an `rsigma` daemon child process started by a test.
///
/// The child is killed and reaped when the handle is dropped.
pub struct DaemonProcess {
    /// The running daemon process.
    child: std::process::Child,
    /// Address the daemon reported for its API server, parseable as a socket address.
    api_addr: String,
}

impl DaemonProcess {
    /// Spawns the `rsigma` binary with `args` and blocks until the daemon is ready.
    ///
    /// Readiness requires all of the following:
    /// 1. a stderr line containing "API server listening" that carries a JSON
    ///    `fields.addr` string,
    /// 2. a stderr line containing "Sink started",
    /// 3. a successful TCP connection to the reported address.
    ///
    /// Steps 1 and 2 share a 10 second budget; step 3 has its own 5 second budget.
    ///
    /// # Panics
    ///
    /// Panics if the process cannot be spawned, if stderr closes before startup finishes
    /// (typically because the daemon exited), if startup times out, if the reported address
    /// does not parse, or if the API never becomes reachable. On every panic path the child
    /// is killed before unwinding continues.
    pub fn spawn(args: &[&str]) -> Self {
        let child = StdCommand::new(rsigma_bin())
            .args(args)
            .stdin(Stdio::null())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .expect("failed to spawn rsigma engine daemon");
        // From here on any panic must not leak the child; the guard kills it on unwind.
        let mut guard = ChildGuard(Some(child));

        // Drain stdout in the background so the daemon never blocks on a full pipe.
        if let Some(stdout) = guard.as_child_mut().stdout.take() {
            std::thread::spawn(move || {
                let mut sink = std::io::sink();
                let _ = std::io::copy(&mut BufReader::new(stdout), &mut sink);
            });
        }

        let stderr = guard
            .as_child_mut()
            .stderr
            .take()
            .expect("stderr was configured as piped");
        let (tx, rx) = std::sync::mpsc::channel::<StartupEvent>();
        std::thread::spawn(move || {
            for line in BufReader::new(stderr).lines() {
                let line = match line {
                    Ok(line) => line,
                    // A line that is not valid UTF-8 has already been consumed from the
                    // pipe, so skip it and keep draining instead of abandoning stderr.
                    Err(e) if e.kind() == std::io::ErrorKind::InvalidData => continue,
                    Err(_) => return,
                };
                // Send failures are ignored on purpose: once `spawn` has returned the
                // receiver is gone, but the loop must keep draining stderr.
                if line.contains("API server listening")
                    && let Some(addr) = extract_addr(&line)
                {
                    let _ = tx.send(StartupEvent::ApiAddr(addr));
                }
                if line.contains("Sink started") {
                    let _ = tx.send(StartupEvent::SinkStarted);
                }
            }
        });

        let mut api_addr = String::new();
        let mut sink_started = false;
        let handshake_deadline = Instant::now() + Duration::from_secs(10);
        while !sink_started || api_addr.is_empty() {
            let remaining = handshake_deadline
                .checked_duration_since(Instant::now())
                .unwrap_or(Duration::ZERO);
            match rx.recv_timeout(remaining) {
                Ok(StartupEvent::ApiAddr(addr)) => api_addr = addr,
                Ok(StartupEvent::SinkStarted) => sink_started = true,
                Err(std::sync::mpsc::RecvTimeoutError::Timeout) => panic!(
                    "daemon did not finish startup within 10s (api_addr={api_addr:?}, sink_started={sink_started})"
                ),
                // The sender is dropped only when the stderr reader stops, i.e. on EOF or a
                // read error. Report that instead of a misleading timeout.
                Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
                    // Best effort: the exit status may not be available yet.
                    let exit_status = guard.as_child_mut().try_wait().ok().flatten();
                    panic!(
                        "daemon stderr closed before startup finished (exit_status={exit_status:?}, api_addr={api_addr:?}, sink_started={sink_started})"
                    )
                }
            }
        }

        let socket: std::net::SocketAddr = api_addr
            .parse()
            .unwrap_or_else(|e| panic!("invalid api_addr {api_addr:?}: {e}"));

        // The log line can precede the moment the listener accepts connections, so probe
        // the socket until a connection succeeds.
        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            if TcpStream::connect_timeout(&socket, Duration::from_millis(200)).is_ok() {
                return Self {
                    child: guard.disarm(),
                    api_addr,
                };
            }
            if Instant::now() >= deadline {
                panic!("daemon API at {api_addr} never became reachable within 5s");
            }
            std::thread::sleep(Duration::from_millis(25));
        }
    }

    /// Spawns the daemon with HTTP input, loading rules from `rule_path`.
    ///
    /// The API is bound to `127.0.0.1:0`; the address actually in use is taken from the
    /// daemon's startup output.
    pub fn spawn_http(rule_path: &str) -> Self {
        Self::spawn(&[
            "engine",
            "daemon",
            "-r",
            rule_path,
            "--input",
            "http",
            "--api-addr",
            "127.0.0.1:0",
        ])
    }

    /// Builds an `http://` URL for `path` on the daemon's API address.
    ///
    /// `path` is appended verbatim, so it should start with `/`.
    pub fn url(&self, path: &str) -> String {
        format!("http://{}{path}", self.api_addr)
    }

    /// Returns the API address reported by the daemon.
    pub fn api_addr(&self) -> &str {
        &self.api_addr
    }

    /// Kills the daemon and reaps it; errors (e.g. already exited) are ignored.
    fn kill(&mut self) {
        let _ = self.child.kill();
        let _ = self.child.wait();
    }
}

impl Drop for DaemonProcess {
    /// Stops the daemon when the handle goes out of scope.
    fn drop(&mut self) {
        self.kill();
    }
}
/// Extracts the `fields.addr` string from a JSON-formatted log line.
///
/// Returns `None` when `line` is not valid JSON or when `fields.addr` is absent or not a
/// string.
fn extract_addr(line: &str) -> Option<String> {
    let parsed: serde_json::Value = serde_json::from_str(line).ok()?;
    let addr = parsed.get("fields")?.get("addr")?.as_str()?;
    Some(addr.to_owned())
}
pub fn http_get(url: &str) -> (u16, String) {
let resp = ureq::get(url).call().expect("HTTP GET failed");
let status = resp.status().as_u16();
let body = resp.into_body().read_to_string().unwrap();
(status, body)
}
pub fn http_post(url: &str, body: &str) -> (u16, String) {
match ureq::post(url).send(body) {
Ok(resp) => {
let status = resp.status().as_u16();
let body = resp.into_body().read_to_string().unwrap();
(status, body)
}
Err(ureq::Error::StatusCode(code)) => (code, String::new()),
Err(e) => panic!("HTTP POST failed: {e}"),
}
}
/// Calls `check` repeatedly until it yields `Some` or `deadline` has elapsed.
///
/// `check` always runs at least once, and runs one final time once the deadline is reached.
/// Between attempts the thread sleeps for 50 ms, or for the time remaining if that is
/// shorter, so the call does not overshoot the deadline by a full polling interval.
///
/// A `deadline` too large to be added to the current instant (for example `Duration::MAX`)
/// is treated as "no deadline": polling continues until `check` succeeds.
///
/// Returns the value produced by `check`, or `None` on timeout.
pub fn poll_until<T>(deadline: Duration, mut check: impl FnMut() -> Option<T>) -> Option<T> {
    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    // `Instant + Duration` panics on overflow; `checked_add` yields `None` instead.
    let end = Instant::now().checked_add(deadline);
    loop {
        if let Some(v) = check() {
            return Some(v);
        }
        let pause = match end {
            Some(end) => {
                let remaining = end.saturating_duration_since(Instant::now());
                if remaining.is_zero() {
                    return None;
                }
                remaining.min(POLL_INTERVAL)
            }
            None => POLL_INTERVAL,
        };
        std::thread::sleep(pause);
    }
}
/// YAML rule fixture titled "Test Rule" (level `high`) whose only detection entry is
/// `CommandLine|contains: "malware"` under `selection`, with `condition: selection`.
pub const SIMPLE_RULE: &str = r#"
title: Test Rule
id: 00000000-0000-0000-0000-000000000001
status: test
logsource:
category: test
product: test
detection:
selection:
CommandLine|contains: "malware"
condition: selection
level: high
"#;
/// YAML pipeline fixture named `test-pipeline` (priority 10) containing a single
/// `field_name_mapping` transformation from `CommandLine` to `process.command_line`.
pub const PIPELINE_YAML: &str = r#"
name: test-pipeline
priority: 10
transformations:
- type: field_name_mapping
mapping:
CommandLine: process.command_line
"#;