use std::fs::File;
use std::io::{self, BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::process;
use std::sync::Arc;
use clap::Args;
use rsigma_eval::{
CorrelationEngine, Engine, Event, FieldObserver, JsonEvent, Pipeline, RuleFieldSet,
};
use rsigma_parser::SigmaCollection;
use crate::EventFilter;
// Command-line arguments of the `eval` subcommand; consumed by `cmd_eval`.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn `///`
// doc comments into user-visible help text, so documenting the fields with `///`
// would change the program's `--help` output.
#[derive(Args, Debug)]
pub(crate) struct EvalArgs {
// `-r` / `--rules`: rules location, handed to `crate::load_collection`.
#[arg(short, long)]
pub rules: PathBuf,
// `-e` / `--event`: optional event argument. `resolve_event_source` treats a value
// starting with `@` as a file path, any other value as inline JSON, and a missing
// value as "read from stdin".
#[arg(short, long)]
pub event: Option<String>,
// `--pretty`: forwarded to `crate::print_json` for every emitted result.
#[arg(long)]
pub pretty: bool,
// `-p` / `--pipeline`: pipeline paths, handed to `crate::load_pipelines`.
#[arg(short = 'p', long = "pipeline")]
pub pipelines: Vec<PathBuf>,
// `--jq`: event filter expression; mutually exclusive with `--jsonpath`.
// Passed to `crate::build_event_filter`.
#[arg(long = "jq", conflicts_with = "jsonpath")]
pub jq: Option<String>,
// `--jsonpath`: event filter expression; mutually exclusive with `--jq`.
// Passed to `crate::build_event_filter`.
#[arg(long = "jsonpath", conflicts_with = "jq")]
pub jsonpath: Option<String>,
// `--suppress`: forwarded verbatim to `crate::build_correlation_config`.
#[arg(long = "suppress")]
pub suppress: Option<String>,
// `--action`: restricted by clap to `alert` or `reset`; forwarded to
// `crate::build_correlation_config`.
#[arg(long = "action", value_parser = ["alert", "reset"])]
pub action: Option<String>,
// `--no-detections`: forwarded to `crate::build_correlation_config`.
#[arg(long = "no-detections")]
pub no_detections: bool,
// `--include-event`: forwarded to the engine's `set_include_event`.
#[arg(long = "include-event")]
pub include_event: bool,
// `--correlation-event-mode` (default `none`): forwarded to
// `crate::build_correlation_config`.
#[arg(long = "correlation-event-mode", default_value = "none")]
pub correlation_event_mode: String,
// `--max-correlation-events` (default 10): forwarded to
// `crate::build_correlation_config`.
#[arg(long = "max-correlation-events", default_value = "10")]
pub max_correlation_events: usize,
// `--timestamp-field`: forwarded to `crate::build_correlation_config`.
#[arg(long = "timestamp-field")]
pub timestamp_fields: Vec<String>,
// `--input-format` (default `auto`): only interpreted when the `daemon` feature is
// enabled (via `crate::commands::parse_input_format`); ignored otherwise.
#[arg(long = "input-format", default_value = "auto")]
pub input_format: String,
// `--syslog-tz` (default `+00:00`): parsed together with `--input-format`; likewise
// only used when the `daemon` feature is enabled.
#[arg(long = "syslog-tz", default_value = "+00:00")]
pub syslog_tz: String,
// `--fail-on-detection`: not read by `cmd_eval` (it binds the field to `_`).
// NOTE(review): presumably inspected by the caller before `cmd_eval` runs — confirm.
#[arg(long = "fail-on-detection")]
pub fail_on_detection: bool,
// `--bloom-prefilter`: forwarded to the engine's `set_bloom_prefilter`.
#[arg(long = "bloom-prefilter")]
pub bloom_prefilter: bool,
// `--bloom-max-bytes`: when present, forwarded to the engine's `set_bloom_max_bytes`.
#[arg(long = "bloom-max-bytes")]
pub bloom_max_bytes: Option<usize>,
// `--cross-rule-ac`: only exists when the `daachorse-index` feature is enabled;
// forwarded to the engine's `set_cross_rule_ac`.
#[cfg(feature = "daachorse-index")]
#[arg(long = "cross-rule-ac")]
pub cross_rule_ac: bool,
// `--observe-fields`: enables field observation and the report rendered by
// `render_field_report` at the end of `cmd_eval`.
#[arg(long = "observe-fields")]
pub observe_fields: bool,
// `--observe-fields-max-keys` (default 10_000, must be non-zero): passed to
// `FieldObserver::new`.
#[arg(
long = "observe-fields-max-keys",
default_value_t = std::num::NonZeroUsize::new(10_000).unwrap(),
)]
pub observe_fields_max_keys: std::num::NonZeroUsize,
// `--observe-fields-report PATH`: requires `--observe-fields`. When set, the field
// report is written to this file; otherwise it is printed to stderr.
#[arg(
long = "observe-fields-report",
value_name = "PATH",
requires = "observe_fields"
)]
pub observe_fields_report: Option<PathBuf>,
}
/// Where the events to evaluate come from, as decided by `resolve_event_source`.
enum EventSource {
/// The `--event` argument itself, parsed as one JSON document.
SingleJson(String),
/// A file given as `@<path>`; it is read line by line, one event per non-blank line.
NdjsonFile(PathBuf),
/// A file given as `@<path>` whose extension is `evtx` (ASCII case-insensitive).
/// Only available when the `evtx` feature is enabled.
#[cfg(feature = "evtx")]
EvtxFile(PathBuf),
/// No `--event` argument was given: events are read line by line from standard input.
Stdin,
}
/// Map the optional `--event` argument to an [`EventSource`].
///
/// * `None` selects standard input.
/// * A value starting with `@` names a file. The file must exist, otherwise an
///   error is printed and the process exits with `RULE_ERROR`. With the `evtx`
///   feature enabled, a path whose extension is `evtx` (ASCII case-insensitive)
///   becomes [`EventSource::EvtxFile`]; every other path is an
///   [`EventSource::NdjsonFile`].
/// * Any other value is kept as inline JSON text.
fn resolve_event_source(event_json: Option<String>) -> EventSource {
    let raw = match event_json {
        Some(raw) => raw,
        None => return EventSource::Stdin,
    };

    // Only the `@<path>` form refers to a file; everything else is inline JSON.
    let path = match raw.strip_prefix('@') {
        Some(rest) => PathBuf::from(rest),
        None => return EventSource::SingleJson(raw),
    };

    if !path.exists() {
        eprintln!("Event file not found: {}", path.display());
        process::exit(crate::exit_code::RULE_ERROR);
    }

    #[cfg(feature = "evtx")]
    {
        let is_evtx = path
            .extension()
            .is_some_and(|ext| ext.eq_ignore_ascii_case("evtx"));
        if is_evtx {
            return EventSource::EvtxFile(path);
        }
    }

    EventSource::NdjsonFile(path)
}
/// Run the `eval` subcommand.
///
/// Loads the rule collection and the pipelines, builds the event filter, the
/// event source and the correlation configuration, and then dispatches to
/// `cmd_eval_with_correlations` when the collection contains correlation rules
/// or to `cmd_eval_detection_only` otherwise. When `--observe-fields` is set,
/// a field report is rendered after evaluation.
///
/// Returns whatever the selected evaluation function returns, i.e. `true` when
/// at least one match was produced. `fail_on_detection` is deliberately not
/// read here.
pub(crate) fn cmd_eval(args: EvalArgs) -> bool {
    // Exhaustive destructuring: adding a field to `EvalArgs` forces a decision here.
    let EvalArgs {
        rules: rules_path,
        event: event_json,
        pretty,
        pipelines: pipeline_paths,
        jq,
        jsonpath,
        suppress,
        action,
        no_detections,
        include_event,
        correlation_event_mode,
        max_correlation_events,
        timestamp_fields,
        input_format,
        syslog_tz,
        fail_on_detection: _,
        bloom_prefilter,
        bloom_max_bytes,
        #[cfg(feature = "daachorse-index")]
        cross_rule_ac,
        observe_fields,
        observe_fields_max_keys,
        observe_fields_report,
    } = args;

    let collection = crate::load_collection(&rules_path);
    let pipelines = crate::load_pipelines(&pipeline_paths);

    let uses_dynamic_sources = pipelines.iter().any(|p| p.is_dynamic());
    if uses_dynamic_sources {
        eprintln!(
            " note: dynamic sources are not resolved by `rsigma engine eval`. \
             Use `rsigma pipeline resolve` to inspect sources or `rsigma engine daemon` to evaluate \
             events with dynamic pipelines."
        );
    }

    let detection_only = collection.correlations.is_empty();
    let event_filter = crate::build_event_filter(jq, jsonpath);
    let event_source = resolve_event_source(event_json);
    // NOTE(review): the meaning of the trailing "wallclock" argument is defined by
    // `crate::build_correlation_config`, which is not visible here.
    let corr_config = crate::build_correlation_config(
        suppress,
        action,
        no_detections,
        correlation_event_mode,
        max_correlation_events,
        timestamp_fields,
        "wallclock",
    );

    // The observation context only exists when `--observe-fields` was requested.
    let observe_ctx: Option<ObserveContext> = observe_fields.then(|| ObserveContext {
        observer: Arc::new(FieldObserver::new(observe_fields_max_keys.get())),
        rule_field_set: RuleFieldSet::collect(&collection, &pipelines, true),
        report_path: observe_fields_report,
    });
    let observe_ref = observe_ctx.as_ref();

    let had_matches = if detection_only {
        cmd_eval_detection_only(
            collection,
            &rules_path,
            event_source,
            pretty,
            &pipelines,
            &event_filter,
            include_event,
            &input_format,
            &syslog_tz,
            bloom_prefilter,
            bloom_max_bytes,
            #[cfg(feature = "daachorse-index")]
            cross_rule_ac,
            observe_ref,
        )
    } else {
        cmd_eval_with_correlations(
            collection,
            &rules_path,
            event_source,
            pretty,
            &pipelines,
            &event_filter,
            corr_config,
            include_event,
            &input_format,
            &syslog_tz,
            bloom_prefilter,
            bloom_max_bytes,
            #[cfg(feature = "daachorse-index")]
            cross_rule_ac,
            observe_ref,
        )
    };

    match observe_ref {
        Some(ctx) => render_field_report(ctx),
        None => {}
    }

    had_matches
}
/// Evaluate events against detection *and* correlation rules.
///
/// Builds a [`CorrelationEngine`] from `config`, applies the engine options
/// (`include_event`, the bloom settings and, with the `daachorse-index`
/// feature, `cross_rule_ac`), registers every pipeline and compiles
/// `collection`. A compile error is printed and terminates the process with
/// `RULE_ERROR`. Events are then taken from `event_source`, optionally recorded
/// by `observe`, and every result is printed through `crate::print_json`.
///
/// For a single inline event, "No matches." is printed on stderr both when an
/// event produces no result and when the event filter selects nothing at all,
/// matching the behaviour of `cmd_eval_detection_only`.
///
/// Returns `true` when at least one detection or correlation result was produced.
#[allow(clippy::too_many_arguments)]
fn cmd_eval_with_correlations(
    collection: SigmaCollection,
    rules_path: &std::path::Path,
    event_source: EventSource,
    pretty: bool,
    pipelines: &[Pipeline],
    event_filter: &EventFilter,
    config: rsigma_eval::CorrelationConfig,
    include_event: bool,
    input_format_str: &str,
    syslog_tz_str: &str,
    bloom_prefilter: bool,
    bloom_max_bytes: Option<usize>,
    #[cfg(feature = "daachorse-index")] cross_rule_ac: bool,
    observe: Option<&ObserveContext>,
) -> bool {
    let mut engine = CorrelationEngine::new(config);
    engine.set_include_event(include_event);
    if let Some(budget) = bloom_max_bytes {
        engine.set_bloom_max_bytes(budget);
    }
    engine.set_bloom_prefilter(bloom_prefilter);
    #[cfg(feature = "daachorse-index")]
    engine.set_cross_rule_ac(cross_rule_ac);
    for p in pipelines {
        engine.add_pipeline(p.clone());
    }
    if let Err(e) = engine.add_collection(&collection) {
        eprintln!("Error compiling rules: {e}");
        process::exit(crate::exit_code::RULE_ERROR);
    }

    eprintln!(
        "Loaded {} detection rules + {} correlation rules from {}",
        engine.detection_rule_count(),
        engine.correlation_rule_count(),
        rules_path.display(),
    );
    tracing::info!(
        detection_rules = engine.detection_rule_count(),
        correlation_rules = engine.correlation_rule_count(),
        rules_path = %rules_path.display(),
        "Rules loaded",
    );

    match event_source {
        EventSource::SingleJson(json_str) => {
            let value: serde_json::Value = match serde_json::from_str(&json_str) {
                Ok(v) => v,
                Err(e) => {
                    eprintln!("Invalid JSON event: {e}");
                    process::exit(crate::exit_code::RULE_ERROR);
                }
            };
            let payloads = crate::apply_event_filter(&value, event_filter);
            if payloads.is_empty() {
                // The filter selected nothing, so the loop below never runs; without
                // this message the command would finish without any feedback.
                eprintln!("No matches.");
            }
            let mut had_matches = false;
            for payload in &payloads {
                let event = JsonEvent::borrow(payload);
                observe_event(observe, &event);
                let result = engine.process_event(&event);
                if result.is_empty() {
                    eprintln!("No matches.");
                } else {
                    had_matches = true;
                    for m in &result {
                        crate::print_json(m, pretty);
                    }
                }
            }
            had_matches
        }
        EventSource::NdjsonFile(path) => {
            let file = File::open(&path).unwrap_or_else(|e| {
                eprintln!("Error opening event file '{}': {e}", path.display());
                process::exit(crate::exit_code::RULE_ERROR);
            });
            let reader = BufReader::new(file);
            let (det_count, corr_count, line_num) = eval_stream_corr(
                &mut engine,
                reader,
                event_filter,
                pretty,
                input_format_str,
                syslog_tz_str,
                observe,
            );
            eprintln!(
                "Processed {line_num} events, {det_count} detection matches, {corr_count} correlation matches."
            );
            det_count > 0 || corr_count > 0
        }
        #[cfg(feature = "evtx")]
        EventSource::EvtxFile(path) => {
            let (det_count, corr_count, rec_count) =
                eval_evtx_corr(&mut engine, &path, event_filter, pretty, observe);
            eprintln!(
                "Processed {rec_count} EVTX records, {det_count} detection matches, {corr_count} correlation matches."
            );
            det_count > 0 || corr_count > 0
        }
        EventSource::Stdin => {
            let stdin = io::stdin();
            let (det_count, corr_count, line_num) = eval_stream_corr(
                &mut engine,
                stdin.lock(),
                event_filter,
                pretty,
                input_format_str,
                syslog_tz_str,
                observe,
            );
            eprintln!(
                "Processed {line_num} events, {det_count} detection matches, {corr_count} correlation matches."
            );
            det_count > 0 || corr_count > 0
        }
    }
}
#[allow(clippy::too_many_arguments)]
fn eval_stream_corr(
engine: &mut CorrelationEngine,
reader: impl BufRead,
event_filter: &EventFilter,
pretty: bool,
input_format_str: &str,
syslog_tz_str: &str,
observe: Option<&ObserveContext>,
) -> (u64, u64, u64) {
let mut line_num = 0u64;
let mut det_count = 0u64;
let mut corr_count = 0u64;
#[cfg(feature = "daemon")]
let format = crate::commands::parse_input_format(input_format_str, syslog_tz_str);
#[cfg(not(feature = "daemon"))]
let _ = (input_format_str, syslog_tz_str);
for line in reader.lines() {
line_num += 1;
let line = match line {
Ok(l) => l,
Err(e) => {
eprintln!("Error reading line {line_num}: {e}");
continue;
}
};
if line.trim().is_empty() {
continue;
}
#[cfg(feature = "daemon")]
{
eval_line_corr(
engine,
&line,
&format,
event_filter,
pretty,
&mut det_count,
&mut corr_count,
observe,
);
}
#[cfg(not(feature = "daemon"))]
{
eval_line_corr_json(
engine,
&line,
event_filter,
pretty,
&mut det_count,
&mut corr_count,
observe,
);
}
}
(det_count, corr_count, line_num)
}
/// Decode one input line with `rsigma_runtime::parse_line` and run it through
/// the correlation engine (`daemon` feature only).
///
/// Lines that `parse_line` does not decode are skipped without any output.
/// JSON events go through the event filter first, and each resulting payload
/// is evaluated separately; every other decoded event is evaluated as-is.
/// Each result is printed and counted in `det_count` or `corr_count`
/// depending on `is_detection()`.
#[cfg(feature = "daemon")]
#[allow(clippy::too_many_arguments)]
fn eval_line_corr(
    engine: &mut CorrelationEngine,
    line: &str,
    format: &rsigma_runtime::InputFormat,
    event_filter: &EventFilter,
    pretty: bool,
    det_count: &mut u64,
    corr_count: &mut u64,
    observe: Option<&ObserveContext>,
) {
    let Some(decoded) = rsigma_runtime::parse_line(line, format) else {
        return;
    };

    // Non-JSON events bypass the event filter and are evaluated directly.
    if !matches!(decoded, rsigma_runtime::EventInputDecoded::Json(_)) {
        observe_event(observe, &decoded);
        let outcomes = engine.process_event(&decoded);
        for outcome in &outcomes {
            let counter = if outcome.is_detection() {
                &mut *det_count
            } else {
                &mut *corr_count
            };
            *counter += 1;
            crate::print_json(outcome, pretty);
        }
        return;
    }

    let json_value = decoded.to_json();
    for payload in crate::apply_event_filter(&json_value, event_filter) {
        let event = JsonEvent::borrow(&payload);
        observe_event(observe, &event);
        let outcomes = engine.process_event(&event);
        for outcome in &outcomes {
            let counter = if outcome.is_detection() {
                &mut *det_count
            } else {
                &mut *corr_count
            };
            *counter += 1;
            crate::print_json(outcome, pretty);
        }
    }
}
/// Parse one input line as JSON and run it through the correlation engine
/// (used when the `daemon` feature is disabled).
///
/// A line that is not valid JSON is reported on stderr and skipped. Each
/// payload selected by the event filter is evaluated separately; every result
/// is printed and counted in `det_count` or `corr_count` depending on
/// `is_detection()`.
#[cfg(not(feature = "daemon"))]
#[allow(clippy::too_many_arguments)]
fn eval_line_corr_json(
    engine: &mut CorrelationEngine,
    line: &str,
    event_filter: &EventFilter,
    pretty: bool,
    det_count: &mut u64,
    corr_count: &mut u64,
    observe: Option<&ObserveContext>,
) {
    let parsed: Result<serde_json::Value, _> = serde_json::from_str(line);
    let value = match parsed {
        Err(e) => {
            eprintln!("Invalid JSON: {e}");
            return;
        }
        Ok(v) => v,
    };

    for payload in crate::apply_event_filter(&value, event_filter) {
        let event = JsonEvent::borrow(&payload);
        observe_event(observe, &event);
        let outcomes = engine.process_event(&event);
        for outcome in &outcomes {
            let counter = if outcome.is_detection() {
                &mut *det_count
            } else {
                &mut *corr_count
            };
            *counter += 1;
            crate::print_json(outcome, pretty);
        }
    }
}
/// Evaluate events against detection rules only (no correlation rules loaded).
///
/// Builds an [`Engine`], applies the engine options (`include_event`, the
/// bloom settings and, with the `daachorse-index` feature, `cross_rule_ac`),
/// registers every pipeline and compiles `collection`. A compile error is
/// printed and terminates the process with `RULE_ERROR`. Events are then taken
/// from `event_source`, optionally recorded by `observe`, and every match is
/// printed through `crate::print_json`.
///
/// Returns `true` when at least one match was produced.
#[allow(clippy::too_many_arguments)]
fn cmd_eval_detection_only(
    collection: SigmaCollection,
    rules_path: &std::path::Path,
    event_source: EventSource,
    pretty: bool,
    pipelines: &[Pipeline],
    event_filter: &EventFilter,
    include_event: bool,
    input_format_str: &str,
    syslog_tz_str: &str,
    bloom_prefilter: bool,
    bloom_max_bytes: Option<usize>,
    #[cfg(feature = "daachorse-index")] cross_rule_ac: bool,
    observe: Option<&ObserveContext>,
) -> bool {
    let mut engine = Engine::new();
    engine.set_include_event(include_event);
    if let Some(budget) = bloom_max_bytes {
        engine.set_bloom_max_bytes(budget);
    }
    engine.set_bloom_prefilter(bloom_prefilter);
    #[cfg(feature = "daachorse-index")]
    engine.set_cross_rule_ac(cross_rule_ac);
    for pipeline in pipelines {
        engine.add_pipeline(pipeline.clone());
    }
    if let Err(e) = engine.add_collection(&collection) {
        eprintln!("Error compiling rules: {e}");
        process::exit(crate::exit_code::RULE_ERROR);
    }

    let rule_total = engine.rule_count();
    eprintln!("Loaded {} rules from {}", rule_total, rules_path.display());
    tracing::info!(
        detection_rules = engine.rule_count(),
        correlation_rules = 0,
        rules_path = %rules_path.display(),
        "Rules loaded",
    );

    match event_source {
        EventSource::SingleJson(json_str) => {
            let value: serde_json::Value = serde_json::from_str(&json_str).unwrap_or_else(|e| {
                eprintln!("Invalid JSON event: {e}");
                process::exit(crate::exit_code::RULE_ERROR);
            });

            let payloads = crate::apply_event_filter(&value, event_filter);
            // Nothing selected by the event filter: report it and stop early.
            if payloads.is_empty() {
                eprintln!("No matches.");
                return false;
            }

            let mut had_matches = false;
            for payload in &payloads {
                let event = JsonEvent::borrow(payload);
                observe_event(observe, &event);
                let hits = engine.evaluate(&event);
                if hits.is_empty() {
                    eprintln!("No matches.");
                    continue;
                }
                had_matches = true;
                for hit in &hits {
                    crate::print_json(hit, pretty);
                }
            }
            had_matches
        }
        EventSource::NdjsonFile(path) => {
            let file = match File::open(&path) {
                Ok(f) => f,
                Err(e) => {
                    eprintln!("Error opening event file '{}': {e}", path.display());
                    process::exit(crate::exit_code::RULE_ERROR);
                }
            };
            let (match_count, line_num) = eval_stream_detect(
                &engine,
                BufReader::new(file),
                event_filter,
                pretty,
                input_format_str,
                syslog_tz_str,
                observe,
            );
            eprintln!("Processed {line_num} events, {match_count} matches.");
            match_count > 0
        }
        #[cfg(feature = "evtx")]
        EventSource::EvtxFile(path) => {
            let (match_count, rec_count) =
                eval_evtx_detect(&engine, &path, event_filter, pretty, observe);
            eprintln!("Processed {rec_count} EVTX records, {match_count} matches.");
            match_count > 0
        }
        EventSource::Stdin => {
            let stdin = io::stdin();
            let locked = stdin.lock();
            let (match_count, line_num) = eval_stream_detect(
                &engine,
                locked,
                event_filter,
                pretty,
                input_format_str,
                syslog_tz_str,
                observe,
            );
            eprintln!("Processed {line_num} events, {match_count} matches.");
            match_count > 0
        }
    }
}
#[allow(clippy::too_many_arguments)]
fn eval_stream_detect(
engine: &Engine,
reader: impl BufRead,
event_filter: &EventFilter,
pretty: bool,
input_format_str: &str,
syslog_tz_str: &str,
observe: Option<&ObserveContext>,
) -> (u64, u64) {
let mut line_num = 0u64;
let mut match_count = 0u64;
#[cfg(feature = "daemon")]
let format = crate::commands::parse_input_format(input_format_str, syslog_tz_str);
#[cfg(not(feature = "daemon"))]
let _ = (input_format_str, syslog_tz_str);
for line in reader.lines() {
line_num += 1;
let line = match line {
Ok(l) => l,
Err(e) => {
eprintln!("Error reading line {line_num}: {e}");
continue;
}
};
if line.trim().is_empty() {
continue;
}
#[cfg(feature = "daemon")]
{
eval_line_detect(
engine,
&line,
&format,
event_filter,
pretty,
&mut match_count,
observe,
);
}
#[cfg(not(feature = "daemon"))]
{
eval_line_detect_json(
engine,
&line,
event_filter,
pretty,
&mut match_count,
observe,
);
}
}
(match_count, line_num)
}
/// Decode one input line with `rsigma_runtime::parse_line` and evaluate it
/// against the detection engine (`daemon` feature only).
///
/// Lines that `parse_line` does not decode are skipped without any output.
/// JSON events go through the event filter first, and each resulting payload
/// is evaluated separately; every other decoded event is evaluated as-is.
/// Each match is printed and increments `match_count`.
#[cfg(feature = "daemon")]
#[allow(clippy::too_many_arguments)]
fn eval_line_detect(
    engine: &Engine,
    line: &str,
    format: &rsigma_runtime::InputFormat,
    event_filter: &EventFilter,
    pretty: bool,
    match_count: &mut u64,
    observe: Option<&ObserveContext>,
) {
    let Some(decoded) = rsigma_runtime::parse_line(line, format) else {
        return;
    };

    // Non-JSON events bypass the event filter and are evaluated directly.
    if !matches!(decoded, rsigma_runtime::EventInputDecoded::Json(_)) {
        observe_event(observe, &decoded);
        let hits = engine.evaluate(&decoded);
        for hit in &hits {
            *match_count += 1;
            crate::print_json(hit, pretty);
        }
        return;
    }

    let json_value = decoded.to_json();
    for payload in crate::apply_event_filter(&json_value, event_filter) {
        let event = JsonEvent::borrow(&payload);
        observe_event(observe, &event);
        let hits = engine.evaluate(&event);
        for hit in &hits {
            *match_count += 1;
            crate::print_json(hit, pretty);
        }
    }
}
/// Parse one input line as JSON and evaluate it against the detection engine
/// (used when the `daemon` feature is disabled).
///
/// A line that is not valid JSON is reported on stderr and skipped. Each
/// payload selected by the event filter is evaluated separately; every match
/// is printed and increments `match_count`.
#[cfg(not(feature = "daemon"))]
fn eval_line_detect_json(
    engine: &Engine,
    line: &str,
    event_filter: &EventFilter,
    pretty: bool,
    match_count: &mut u64,
    observe: Option<&ObserveContext>,
) {
    let parsed: Result<serde_json::Value, _> = serde_json::from_str(line);
    let value = match parsed {
        Err(e) => {
            eprintln!("Invalid JSON: {e}");
            return;
        }
        Ok(v) => v,
    };

    for payload in crate::apply_event_filter(&value, event_filter) {
        let event = JsonEvent::borrow(&payload);
        observe_event(observe, &event);
        let hits = engine.evaluate(&event);
        for hit in &hits {
            *match_count += 1;
            crate::print_json(hit, pretty);
        }
    }
}
/// Run every record of an EVTX file through the correlation engine
/// (`evtx` feature only).
///
/// Failure to open the file is reported and terminates the process with
/// `RULE_ERROR`. A record that cannot be read is reported on stderr and
/// skipped, but still counted as a record.
///
/// Returns `(detection_matches, correlation_matches, records_read)`.
#[cfg(feature = "evtx")]
fn eval_evtx_corr(
    engine: &mut CorrelationEngine,
    path: &std::path::Path,
    event_filter: &EventFilter,
    pretty: bool,
    observe: Option<&ObserveContext>,
) -> (u64, u64, u64) {
    let mut reader = match rsigma_runtime::EvtxFileReader::open(path) {
        Ok(r) => r,
        Err(e) => {
            eprintln!("Error opening EVTX file '{}': {e}", path.display());
            process::exit(crate::exit_code::RULE_ERROR);
        }
    };

    let (mut det_count, mut corr_count, mut rec_count) = (0u64, 0u64, 0u64);
    for record in reader.records() {
        rec_count += 1;
        let value = match record {
            Err(e) => {
                eprintln!("Error reading EVTX record {rec_count}: {e}");
                continue;
            }
            Ok(v) => v,
        };

        for payload in crate::apply_event_filter(&value, event_filter) {
            let event = JsonEvent::borrow(&payload);
            observe_event(observe, &event);
            let outcomes = engine.process_event(&event);
            for outcome in &outcomes {
                let counter = if outcome.is_detection() {
                    &mut det_count
                } else {
                    &mut corr_count
                };
                *counter += 1;
                crate::print_json(outcome, pretty);
            }
        }
    }

    (det_count, corr_count, rec_count)
}
/// Evaluate every record of an EVTX file against the detection engine
/// (`evtx` feature only).
///
/// Failure to open the file is reported and terminates the process with
/// `RULE_ERROR`. A record that cannot be read is reported on stderr and
/// skipped, but still counted as a record.
///
/// Returns `(matches, records_read)`.
#[cfg(feature = "evtx")]
fn eval_evtx_detect(
    engine: &Engine,
    path: &std::path::Path,
    event_filter: &EventFilter,
    pretty: bool,
    observe: Option<&ObserveContext>,
) -> (u64, u64) {
    let mut reader = match rsigma_runtime::EvtxFileReader::open(path) {
        Ok(r) => r,
        Err(e) => {
            eprintln!("Error opening EVTX file '{}': {e}", path.display());
            process::exit(crate::exit_code::RULE_ERROR);
        }
    };

    let (mut match_count, mut rec_count) = (0u64, 0u64);
    for record in reader.records() {
        rec_count += 1;
        let value = match record {
            Err(e) => {
                eprintln!("Error reading EVTX record {rec_count}: {e}");
                continue;
            }
            Ok(v) => v,
        };

        for payload in crate::apply_event_filter(&value, event_filter) {
            let event = JsonEvent::borrow(&payload);
            observe_event(observe, &event);
            let hits = engine.evaluate(&event);
            for hit in &hits {
                match_count += 1;
                crate::print_json(hit, pretty);
            }
        }
    }

    (match_count, rec_count)
}
/// State needed for `--observe-fields`: created in `cmd_eval`, fed by
/// `observe_event`, and turned into a report by `render_field_report`.
struct ObserveContext {
/// Receives every evaluated event via `observe` and yields a snapshot for the report.
observer: Arc<FieldObserver>,
/// Collected from the loaded rule collection and pipelines; the snapshot is
/// compared against it to compute coverage.
rule_field_set: RuleFieldSet,
/// Destination file for the report; `None` means the report is printed to stderr.
report_path: Option<PathBuf>,
}
/// Record `event` with the field observer when observation is enabled.
///
/// Does nothing when `ctx` is `None`.
#[inline]
fn observe_event<E: Event + ?Sized>(ctx: Option<&ObserveContext>, event: &E) {
    let Some(active) = ctx else {
        return;
    };
    active.observer.observe(event);
}
/// Maximum number of rule titles listed per missing field in the field report;
/// entries with more titles are marked `"truncated": true`.
const EVAL_MISSING_RULE_TITLES_CAP: usize = 10;
/// Build the field observation report and emit it.
///
/// The report is a JSON object with three keys:
/// * `summary` — counters taken from the observer snapshot and the coverage result;
/// * `unknown` — one `{field, count}` entry per item of the coverage's `unknown` list;
/// * `missing` — one entry per item of the coverage's `missing` list, with at most
///   `EVAL_MISSING_RULE_TITLES_CAP` rule titles and a `truncated` flag.
///
/// The pretty-printed report is written to `ctx.report_path` when set (a write
/// failure is reported on stderr), otherwise it is printed to stderr, ignoring
/// any error from that write.
fn render_field_report(ctx: &ObserveContext) {
    let snapshot = ctx.observer.snapshot();
    let coverage = snapshot.coverage(&ctx.rule_field_set);

    let mut unknown_entries: Vec<serde_json::Value> = Vec::new();
    for entry in coverage.unknown.iter() {
        let field: &str = &entry.field;
        unknown_entries.push(serde_json::json!({ "field": field, "count": entry.count }));
    }

    let mut missing_entries: Vec<serde_json::Value> = Vec::new();
    for (name, origin) in coverage.missing.iter() {
        let total = origin.rule_titles.len();
        // Only the first few titles are listed; `truncated` signals that more exist.
        let rule_titles: Vec<&str> = origin
            .rule_titles
            .iter()
            .take(EVAL_MISSING_RULE_TITLES_CAP)
            .map(String::as_str)
            .collect();
        let sources: Vec<&str> = origin.sources.iter().map(|s| s.as_str()).collect();
        missing_entries.push(serde_json::json!({
            "field": name,
            "rule_count": total,
            "sources": sources,
            "rule_titles": rule_titles,
            "truncated": total > EVAL_MISSING_RULE_TITLES_CAP,
        }));
    }

    let report = serde_json::json!({
        "summary": {
            "events_observed": snapshot.events_observed,
            "unique_keys_observed": snapshot.unique_keys,
            "rule_fields_loaded": ctx.rule_field_set.len(),
            "overflow_dropped": snapshot.overflow_dropped,
            "max_keys": snapshot.max_keys,
            "uptime_seconds": snapshot.uptime_seconds,
            "intersection_count": coverage.intersection_count,
            "unknown_count": unknown_entries.len(),
            "missing_count": missing_entries.len(),
        },
        "unknown": unknown_entries,
        "missing": missing_entries,
    });

    // Fall back to the compact form if pretty-printing fails.
    let serialized = match serde_json::to_string_pretty(&report) {
        Ok(text) => text,
        Err(_) => report.to_string(),
    };

    if let Some(path) = ctx.report_path.as_deref() {
        if let Err(e) = write_report_to_file(path, &serialized) {
            eprintln!(
                "Failed to write field observation report to {}: {e}",
                path.display()
            );
        }
        return;
    }

    // Best effort: a failure to write to stderr is deliberately ignored.
    let _ = writeln!(io::stderr(), "{serialized}");
}
/// Write `serialized` followed by a single newline to `path`.
///
/// The file is created, or truncated if it already exists.
///
/// # Errors
///
/// Returns the underlying I/O error when the file cannot be created or written.
fn write_report_to_file(path: &Path, serialized: &str) -> std::io::Result<()> {
    let mut out = File::create(path)?;
    for chunk in [serialized.as_bytes(), &b"\n"[..]] {
        out.write_all(chunk)?;
    }
    out.flush()
}