use std::fs::File;
use std::io::{self, BufReader, Write};
use std::path::{Path, PathBuf};
use std::process;
use std::sync::Arc;
use clap::parser::ValueSource;
use clap::{ArgMatches, Args};
use rsigma_eval::{
CorrelationConfig, CorrelationEngine, Engine, EvaluationResult, FieldObserver, JsonEvent,
LogSourceExtractor, MatchDetailLevel, OnUnknown, Pipeline, ResultBody, RouteOutcome,
RoutingPlan, RuleFieldSet, SchemaClassifier, SchemaRouter, load_schema_config,
};
use rsigma_parser::SigmaCollection;
#[cfg(feature = "evtx")]
use super::eval_stream::stream_evtx_events;
use super::eval_stream::{
CorrelationProcessor, DetectionProcessor, RoutingProcessor, stream_events,
};
use crate::EventFilter;
use crate::config;
use crate::exit_code;
use crate::output::{DelimitedWriter, OutputCtx, OutputFormat, Tabular};
// Command-line arguments of the `eval` command. Most of them can also come from
// the config file; `overlay_eval_config` applies a config value only when the
// matching flag was not given on the command line or via an environment variable.
//
// NOTE(review): plain `//` comments are deliberate here — clap's derive turns
// `///` doc comments into `--help` text, which would change the CLI output.
#[derive(Args, Debug)]
pub(crate) struct EvalArgs {
    // Config file handed to `config::load_and_merge` by `apply_eval_config`.
    #[arg(long = "config", value_name = "PATH")]
    pub config: Option<PathBuf>,
    // Pass the loaded config to `config::print_dry_run` and exit with SUCCESS
    // without evaluating anything.
    #[arg(long = "dry-run")]
    pub dry_run: bool,
    // Rules path; `eval.rules` in the config file fills it when omitted.
    // `cmd_eval` exits with CONFIG_ERROR when neither source provides one.
    #[arg(short, long)]
    pub rules: Option<PathBuf>,
    // Inline JSON event, or `@path` to read events from a file (see
    // `resolve_event_source`); events are read from stdin when omitted.
    #[arg(short, long)]
    pub event: Option<String>,
    // Pretty-print JSON output (see `MatchRenderer::new`).
    #[arg(long)]
    pub pretty: bool,
    // Processing pipelines (`-p`, repeatable), loaded with `crate::load_pipelines`.
    #[arg(short = 'p', long = "pipeline")]
    pub pipelines: Vec<PathBuf>,
    // jq expression for the event filter; mutually exclusive with --jsonpath.
    #[arg(long = "jq", conflicts_with = "jsonpath")]
    pub jq: Option<String>,
    // JSONPath expression for the event filter; mutually exclusive with --jq.
    #[arg(long = "jsonpath", conflicts_with = "jq")]
    pub jsonpath: Option<String>,
    // Correlation setting forwarded to `crate::build_correlation_config`.
    #[arg(long = "suppress")]
    pub suppress: Option<String>,
    // Correlation setting forwarded to `crate::build_correlation_config`;
    // restricted to `alert` or `reset`.
    #[arg(long = "action", value_parser = ["alert", "reset"])]
    pub action: Option<String>,
    // Correlation setting forwarded to `crate::build_correlation_config`.
    #[arg(long = "no-detections")]
    pub no_detections: bool,
    // Forwarded to the engine / schema router via `set_include_event`.
    #[arg(long = "include-event")]
    pub include_event: bool,
    // One of off|summary|full; `cmd_eval` parses it into `MatchDetailLevel`
    // and falls back to `Off` if parsing fails.
    #[arg(long = "match-detail", value_parser = ["off", "summary", "full"], default_value = "off")]
    pub match_detail: String,
    // Correlation setting forwarded to `crate::build_correlation_config`.
    #[arg(long = "correlation-event-mode", default_value = "none")]
    pub correlation_event_mode: String,
    // Correlation setting forwarded to `crate::build_correlation_config`.
    #[arg(long = "max-correlation-events", default_value = "10")]
    pub max_correlation_events: usize,
    // Correlation state cap forwarded to `crate::build_correlation_config`;
    // default comes from `crate::config::defaults::MAX_STATE_ENTRIES`.
    #[arg(long = "max-state-entries", default_value_t = crate::config::defaults::MAX_STATE_ENTRIES, value_parser = clap::value_parser!(usize))]
    pub max_state_entries: usize,
    // Correlation setting forwarded to `crate::build_correlation_config`.
    #[arg(long = "max-group-entries")]
    pub max_group_entries: Option<usize>,
    // Repeatable; forwarded to `crate::build_correlation_config`.
    #[arg(long = "timestamp-field")]
    pub timestamp_fields: Vec<String>,
    // Input format name forwarded to `stream_events` for file/stdin input.
    #[arg(long = "input-format", default_value = "auto")]
    pub input_format: String,
    // Syslog timezone offset forwarded to `stream_events`.
    #[arg(long = "syslog-tz", default_value = "+00:00")]
    pub syslog_tz: String,
    // Defaults to true; `ArgAction::Set` means it takes an explicit value
    // (e.g. `--syslog-strip-bom false`). Forwarded to `stream_events`.
    #[arg(long = "syslog-strip-bom", default_value_t = true, action = clap::ArgAction::Set)]
    pub syslog_strip_bom: bool,
    // Not read inside `cmd_eval` (destructured as `_`); presumably the caller
    // maps a `true` return to a failing exit code — confirm at the call site.
    #[arg(long = "fail-on-detection")]
    pub fail_on_detection: bool,
    // Engine tuning; only applied on the non-routed paths (`cmd_eval` does not
    // pass it to `build_schema_router`).
    #[arg(long = "bloom-prefilter")]
    pub bloom_prefilter: bool,
    // Budget passed to `set_bloom_max_bytes`; non-routed paths only.
    #[arg(long = "bloom-max-bytes")]
    pub bloom_max_bytes: Option<usize>,
    // Only with the `daachorse-index` feature; passed to `set_cross_rule_ac`
    // on the non-routed paths.
    #[cfg(feature = "daachorse-index")]
    #[arg(long = "cross-rule-ac")]
    pub cross_rule_ac: bool,
    // Feed every event to a `FieldObserver` and emit a coverage report.
    #[arg(long = "observe-fields")]
    pub observe_fields: bool,
    // Capacity passed to `FieldObserver::new` (default 10 000).
    #[arg(
        long = "observe-fields-max-keys",
        default_value_t = std::num::NonZeroUsize::new(10_000).unwrap(),
    )]
    pub observe_fields_max_keys: std::num::NonZeroUsize,
    // Write the field report to this file instead of stderr; needs --observe-fields.
    #[arg(
        long = "observe-fields-report",
        value_name = "PATH",
        requires = "observe_fields"
    )]
    pub observe_fields_report: Option<PathBuf>,
    // Route events through a `SchemaRouter` instead of a single engine.
    #[arg(long = "schema-routing")]
    pub schema_routing: bool,
    // Schema signatures / routing config loaded with `load_schema_config`.
    #[arg(long = "schema-config", value_name = "PATH")]
    pub schema_config: Option<PathBuf>,
    // Unknown-schema policy override (warn|drop|passthrough|error, parsed by
    // `parse_on_unknown`).
    #[arg(long = "on-unknown", value_name = "POLICY")]
    pub on_unknown: Option<String>,
    // Forwarded to `crate::logsource_opts::build_logsource_extractor`.
    #[arg(long = "logsource-routing")]
    pub logsource_routing: bool,
    // Forwarded to `crate::logsource_opts::build_logsource_extractor`.
    #[arg(long = "logsource-field-map", value_name = "MAP")]
    pub logsource_field_map: Option<String>,
    // Forwarded to `crate::logsource_opts::build_logsource_extractor`.
    #[arg(long = "event-logsource", value_name = "LOGSOURCE")]
    pub event_logsource: Option<String>,
    // Print the correlation engine's introspection snapshot to stderr after
    // evaluation; ignored (with a warning) without correlation rules or with
    // --schema-routing.
    #[arg(long = "dump-correlation-state")]
    pub dump_correlation_state: bool,
}
/// Where `cmd_eval` reads events from; produced by `resolve_event_source`.
enum EventSource {
    /// A single JSON document passed inline through `--event`.
    SingleJson(String),
    /// A file given as `--event @path`, streamed through `stream_events` with
    /// the configured input format.
    NdjsonFile(PathBuf),
    /// A `--event @path` file with an `.evtx` extension (only with the `evtx`
    /// feature), read through `stream_evtx_events`.
    #[cfg(feature = "evtx")]
    EvtxFile(PathBuf),
    /// No `--event` given: events are read from standard input.
    Stdin,
}
/// Picks the event source for `cmd_eval` from the raw `--event` value.
///
/// * `None` reads events from stdin.
/// * A value starting with `@` names a file. A missing file is reported on
///   stderr and terminates the process with `RULE_ERROR`. With the `evtx`
///   feature, an `.evtx` extension (ASCII case-insensitive) selects the EVTX
///   reader; any other file becomes [`EventSource::NdjsonFile`].
/// * Anything else is treated as one inline JSON event.
fn resolve_event_source(event_json: Option<String>) -> EventSource {
    let Some(raw) = event_json else {
        return EventSource::Stdin;
    };
    let Some(path) = raw.strip_prefix('@').map(PathBuf::from) else {
        return EventSource::SingleJson(raw);
    };

    if !path.exists() {
        eprintln!("Event file not found: {}", path.display());
        process::exit(crate::exit_code::RULE_ERROR);
    }

    #[cfg(feature = "evtx")]
    {
        let is_evtx = path
            .extension()
            .is_some_and(|ext| ext.eq_ignore_ascii_case("evtx"));
        if is_evtx {
            return EventSource::EvtxFile(path);
        }
    }

    EventSource::NdjsonFile(path)
}
/// Layers the config file under the already-parsed CLI arguments.
///
/// The config is loaded with `config::load_and_merge` (honouring `--config`).
/// With `--dry-run`, that loaded config is handed to `config::print_dry_run`
/// and the process exits with `SUCCESS`. Otherwise `overlay_eval_config` fills
/// in values the user did not set explicitly.
pub(crate) fn apply_eval_config(args: &mut EvalArgs, matches: &ArgMatches) {
    let file_config = config::load_and_merge(args.config.as_deref());
    if !args.dry_run {
        overlay_eval_config(args, matches, file_config);
        return;
    }
    config::print_dry_run("eval", &file_config);
    process::exit(exit_code::SUCCESS);
}
/// Copies values from the `eval` section of the config file into `args`,
/// without overriding anything the user set explicitly.
///
/// A config value is used only when the matching argument's clap value source
/// is neither `CommandLine` nor `EnvVariable`. As a result, clap defaults lose
/// to the config file and explicit flags win over it. Logsource dimension
/// tables are flattened with `crate::logsource_opts::dims_to_kv` and skipped
/// when that returns `None`.
fn overlay_eval_config(
    args: &mut EvalArgs,
    matches: &ArgMatches,
    base: config::RsigmaConfigPartial,
) {
    let explicit = |id: &str| {
        matches!(
            matches.value_source(id),
            Some(ValueSource::CommandLine | ValueSource::EnvVariable)
        )
    };
    let Some(eval) = base.eval else {
        return;
    };

    if let Some(v) = eval.rules.filter(|_| !explicit("rules")) {
        args.rules = Some(v);
    }
    if let Some(v) = eval.pipelines.filter(|_| !explicit("pipelines")) {
        args.pipelines = v;
    }
    if let Some(v) = eval.input_format.filter(|_| !explicit("input_format")) {
        args.input_format = v;
    }
    if let Some(v) = eval.syslog_tz.filter(|_| !explicit("syslog_tz")) {
        args.syslog_tz = v;
    }
    if let Some(v) = eval
        .syslog_strip_bom
        .filter(|_| !explicit("syslog_strip_bom"))
    {
        args.syslog_strip_bom = v;
    }
    if let Some(v) = eval
        .fail_on_detection
        .filter(|_| !explicit("fail_on_detection"))
    {
        args.fail_on_detection = v;
    }

    if let Some(schema) = eval.schema {
        if let Some(v) = schema.routing.filter(|_| !explicit("schema_routing")) {
            args.schema_routing = v;
        }
        if let Some(v) = schema.config.filter(|_| !explicit("schema_config")) {
            args.schema_config = Some(v);
        }
        if let Some(v) = schema.on_unknown.filter(|_| !explicit("on_unknown")) {
            args.on_unknown = Some(v);
        }
    }

    if let Some(ls) = eval.logsource_routing {
        if let Some(v) = ls.enabled.filter(|_| !explicit("logsource_routing")) {
            args.logsource_routing = v;
        }
        // The dimension table is flattened only when the flag was not explicit,
        // and an all-empty table (`dims_to_kv` -> None) leaves the arg alone.
        if let Some(kv) = ls
            .field_map
            .filter(|_| !explicit("logsource_field_map"))
            .and_then(|fm| {
                crate::logsource_opts::dims_to_kv(
                    fm.product.as_deref(),
                    fm.service.as_deref(),
                    fm.category.as_deref(),
                )
            })
        {
            args.logsource_field_map = Some(kv);
        }
        if let Some(kv) = ls
            .event_logsource
            .filter(|_| !explicit("event_logsource"))
            .and_then(|el| {
                crate::logsource_opts::dims_to_kv(
                    el.product.as_deref(),
                    el.service.as_deref(),
                    el.category.as_deref(),
                )
            })
        {
            args.event_logsource = Some(kv);
        }
    }
}
/// Runs `eval`: loads rules and pipelines, evaluates events from the source
/// selected by `--event`, and renders every result.
///
/// Exactly one evaluation strategy is used:
/// * `--schema-routing`: a [`SchemaRouter`] classifies and dispatches events
///   (`cmd_eval_routed`);
/// * otherwise, when the collection has correlation rules:
///   `cmd_eval_with_correlations`;
/// * otherwise: `cmd_eval_detection_only`.
///
/// Afterwards the renderer is flushed and, with `--observe-fields`, the field
/// report is emitted. A missing rules path or logsource configuration error
/// terminates the process with `CONFIG_ERROR`. Returns `true` when anything
/// matched.
pub(crate) fn cmd_eval(args: EvalArgs, ctx: OutputCtx) -> bool {
    let EvalArgs {
        config: _,
        dry_run: _,
        rules: rules_opt,
        event: event_json,
        pretty,
        pipelines: pipeline_paths,
        jq,
        jsonpath,
        suppress,
        action,
        no_detections,
        include_event,
        match_detail,
        correlation_event_mode,
        max_correlation_events,
        max_state_entries,
        max_group_entries,
        timestamp_fields,
        input_format,
        syslog_tz,
        syslog_strip_bom,
        fail_on_detection: _,
        bloom_prefilter,
        bloom_max_bytes,
        #[cfg(feature = "daachorse-index")]
        cross_rule_ac,
        observe_fields,
        observe_fields_max_keys,
        observe_fields_report,
        schema_routing,
        schema_config,
        on_unknown,
        logsource_routing,
        logsource_field_map,
        event_logsource,
        dump_correlation_state,
    } = args;

    let Some(rules_path) = rules_opt else {
        eprintln!("error: no rules path; set --rules or eval.rules in the config file");
        process::exit(exit_code::CONFIG_ERROR);
    };
    let collection = crate::load_collection(&rules_path);
    let pipelines = crate::load_pipelines(&pipeline_paths);

    let has_dynamic_pipeline = pipelines.iter().any(|p| p.is_dynamic());
    if has_dynamic_pipeline && ctx.show_progress() {
        eprintln!(
            " note: dynamic sources are not resolved by `rsigma engine eval`. \
             Use `rsigma pipeline resolve` to inspect sources or `rsigma engine daemon` to evaluate \
             events with dynamic pipelines."
        );
    }

    let has_correlations = !collection.correlations.is_empty();
    let dump_unsupported = schema_routing || !has_correlations;
    if dump_correlation_state && dump_unsupported {
        eprintln!(
            "warning: --dump-correlation-state needs correlation rules and is not supported with \
             --schema-routing; ignoring"
        );
    }

    let match_detail: MatchDetailLevel = match_detail.parse().unwrap_or(MatchDetailLevel::Off);
    let event_filter = crate::build_event_filter(jq, jsonpath);
    let event_source = resolve_event_source(event_json);

    #[cfg(feature = "evtx")]
    let evtx_input = matches!(event_source, EventSource::EvtxFile(_));
    #[cfg(not(feature = "evtx"))]
    let evtx_input = false;

    let logsource_extractor = crate::logsource_opts::build_logsource_extractor(
        logsource_routing,
        logsource_field_map.as_deref(),
        event_logsource.as_deref(),
        evtx_input,
    )
    .unwrap_or_else(|e| {
        eprintln!("error: {e}");
        process::exit(exit_code::CONFIG_ERROR)
    });

    let corr_config = crate::build_correlation_config(
        suppress,
        action,
        no_detections,
        correlation_event_mode,
        max_correlation_events,
        max_state_entries,
        max_group_entries,
        timestamp_fields,
        "wallclock",
    );

    let observe_ctx: Option<ObserveContext> = observe_fields.then(|| ObserveContext {
        observer: Arc::new(FieldObserver::new(observe_fields_max_keys.get())),
        rule_field_set: RuleFieldSet::collect(&collection, &pipelines, true),
        report_path: observe_fields_report,
    });
    let observe_ref = observe_ctx.as_ref();
    let mut renderer = MatchRenderer::new(ctx, pretty);

    let had_matches = if schema_routing {
        let mut router = build_schema_router(
            &collection,
            schema_config.as_deref(),
            on_unknown.as_deref(),
            corr_config,
            include_event,
            match_detail,
            logsource_extractor,
        );
        cmd_eval_routed(
            event_source,
            &mut renderer,
            &mut router,
            &event_filter,
            &input_format,
            &syslog_tz,
            syslog_strip_bom,
            observe_ref,
        )
    } else if has_correlations {
        cmd_eval_with_correlations(
            collection,
            &rules_path,
            event_source,
            &mut renderer,
            &pipelines,
            &event_filter,
            corr_config,
            include_event,
            match_detail,
            &input_format,
            &syslog_tz,
            syslog_strip_bom,
            bloom_prefilter,
            bloom_max_bytes,
            #[cfg(feature = "daachorse-index")]
            cross_rule_ac,
            logsource_extractor,
            observe_ref,
            dump_correlation_state,
        )
    } else {
        cmd_eval_detection_only(
            collection,
            &rules_path,
            event_source,
            &mut renderer,
            &pipelines,
            &event_filter,
            include_event,
            match_detail,
            &input_format,
            &syslog_tz,
            syslog_strip_bom,
            bloom_prefilter,
            bloom_max_bytes,
            #[cfg(feature = "daachorse-index")]
            cross_rule_ac,
            logsource_extractor,
            observe_ref,
        )
    };

    // Table output is buffered until here; the field report follows the matches.
    renderer.flush();
    if let Some(octx) = observe_ref {
        render_field_report(octx);
    }
    had_matches
}
/// Parses an unknown-schema policy name, ignoring ASCII case.
///
/// Accepts `warn`, `drop`, `passthrough`, or `error`. Any other value prints an
/// error to stderr and exits with `CONFIG_ERROR`. The error echoes the input
/// exactly as it was given.
fn parse_on_unknown(s: &str) -> OnUnknown {
    match s.to_ascii_lowercase().as_str() {
        "warn" => OnUnknown::Warn,
        "drop" => OnUnknown::Drop,
        "passthrough" => OnUnknown::Passthrough,
        "error" => OnUnknown::Error,
        _ => {
            // Report the raw input, not the lowercased copy used for matching,
            // so the message matches what the user typed (CLI or config file).
            eprintln!(
                "Invalid --on-unknown policy '{s}' (expected warn, drop, passthrough, or error)"
            );
            process::exit(exit_code::CONFIG_ERROR);
        }
    }
}
fn build_schema_router(
collection: &SigmaCollection,
schema_config: Option<&Path>,
on_unknown_override: Option<&str>,
corr_config: CorrelationConfig,
include_event: bool,
match_detail: MatchDetailLevel,
logsource_extractor: Option<LogSourceExtractor>,
) -> SchemaRouter {
let (signatures, routing) = match schema_config {
Some(path) => match load_schema_config(path) {
Ok(v) => v,
Err(e) => {
eprintln!("Error loading schema config: {e}");
process::exit(exit_code::CONFIG_ERROR);
}
},
None => (Vec::new(), None),
};
let classifier = if signatures.is_empty() {
SchemaClassifier::builtin()
} else {
SchemaClassifier::with_user_signatures(signatures)
};
let mut routing = routing.unwrap_or_default();
if let Some(policy) = on_unknown_override {
routing.on_unknown = parse_on_unknown(policy);
}
if routing.bindings.is_empty() {
eprintln!(
" note: --schema-routing is on but no routing bindings are configured; \
every event routes to the default pipeline-set. Add a routing section to --schema-config."
);
}
let plan = RoutingPlan::from_config(&routing);
let pipeline_sets: Vec<Vec<Pipeline>> = plan
.pipeline_sets()
.iter()
.map(|names| {
let paths: Vec<PathBuf> = names.iter().map(PathBuf::from).collect();
crate::load_pipelines(&paths)
})
.collect();
match SchemaRouter::build(
collection,
classifier,
plan,
pipeline_sets,
corr_config,
include_event,
match_detail,
logsource_extractor,
) {
Ok(r) => r,
Err(e) => {
eprintln!("Error building schema router: {e}");
process::exit(exit_code::RULE_ERROR);
}
}
}
#[allow(clippy::too_many_arguments)]
fn cmd_eval_routed(
event_source: EventSource,
renderer: &mut MatchRenderer,
router: &mut SchemaRouter,
event_filter: &EventFilter,
input_format_str: &str,
syslog_tz_str: &str,
syslog_strip_bom: bool,
observe: Option<&ObserveContext>,
) -> bool {
let mut det_count = 0u64;
let mut corr_count = 0u64;
let mut unknown = 0u64;
let mut dropped = 0u64;
let events: u64;
match event_source {
EventSource::SingleJson(json_str) => {
events = 1;
let value: serde_json::Value = match serde_json::from_str(&json_str) {
Ok(v) => v,
Err(e) => {
eprintln!("Invalid JSON event: {e}");
process::exit(crate::exit_code::RULE_ERROR);
}
};
for payload in crate::apply_event_filter(&value, event_filter) {
let event = JsonEvent::borrow(&payload);
if let Some(octx) = observe {
octx.observer.observe(&event);
}
let routed = router.route(&event);
match routed.outcome {
RouteOutcome::EvaluatedUnknown => unknown += 1,
RouteOutcome::Dropped | RouteOutcome::Errored => dropped += 1,
RouteOutcome::Evaluated => {}
}
for m in &routed.results {
if m.is_correlation() {
corr_count += 1;
} else {
det_count += 1;
}
renderer.emit(m);
}
}
}
EventSource::NdjsonFile(path) => {
let file = File::open(&path).unwrap_or_else(|e| {
eprintln!("Error opening event file '{}': {e}", path.display());
process::exit(crate::exit_code::RULE_ERROR);
});
let observer = observe.map(|c| c.observer.as_ref());
let mut processor = RoutingProcessor::new(router);
events = stream_events(
BufReader::new(file),
event_filter,
input_format_str,
syslog_tz_str,
syslog_strip_bom,
observer,
&mut processor,
&mut |m| {
if m.is_correlation() {
corr_count += 1;
} else {
det_count += 1;
}
renderer.emit(m);
},
);
unknown = processor.unknown;
dropped = processor.dropped + processor.errored;
}
#[cfg(feature = "evtx")]
EventSource::EvtxFile(path) => {
let observer = observe.map(|c| c.observer.as_ref());
let mut processor = RoutingProcessor::new(router);
events = stream_evtx_events(&path, event_filter, observer, &mut processor, &mut |m| {
if m.is_correlation() {
corr_count += 1;
} else {
det_count += 1;
}
renderer.emit(m);
});
unknown = processor.unknown;
dropped = processor.dropped + processor.errored;
}
EventSource::Stdin => {
let stdin = io::stdin();
let observer = observe.map(|c| c.observer.as_ref());
let mut processor = RoutingProcessor::new(router);
events = stream_events(
stdin.lock(),
event_filter,
input_format_str,
syslog_tz_str,
syslog_strip_bom,
observer,
&mut processor,
&mut |m| {
if m.is_correlation() {
corr_count += 1;
} else {
det_count += 1;
}
renderer.emit(m);
},
);
unknown = processor.unknown;
dropped = processor.dropped + processor.errored;
}
}
if renderer.ctx().show_stats() {
eprintln!(
"Processed {events} events, {det_count} detection matches, {corr_count} correlation matches, {unknown} unknown schema, {dropped} dropped."
);
}
det_count > 0 || corr_count > 0
}
/// Evaluates events with a [`CorrelationEngine`], producing both detection and
/// correlation results.
///
/// Setup:
/// * the engine is configured from `config` and the per-run knobs;
/// * the CLI `pipelines` are registered;
/// * `collection` is compiled, exiting with `RULE_ERROR` on failure.
///
/// Every result is forwarded to `renderer`. For file, EVTX and stdin input, a
/// summary line is printed when stats are enabled. For an inline event,
/// "No matches." is printed in progress mode when nothing matched. That
/// includes the case where the event filter yields no payload, which matches
/// `cmd_eval_detection_only`.
///
/// With `dump_correlation_state`, the engine's introspection snapshot is
/// printed to stderr once all events have been processed.
///
/// Returns `true` when at least one detection or correlation result was produced.
#[allow(clippy::too_many_arguments)]
fn cmd_eval_with_correlations(
    collection: SigmaCollection,
    rules_path: &std::path::Path,
    event_source: EventSource,
    renderer: &mut MatchRenderer,
    pipelines: &[Pipeline],
    event_filter: &EventFilter,
    config: rsigma_eval::CorrelationConfig,
    include_event: bool,
    match_detail: MatchDetailLevel,
    input_format_str: &str,
    syslog_tz_str: &str,
    syslog_strip_bom: bool,
    bloom_prefilter: bool,
    bloom_max_bytes: Option<usize>,
    #[cfg(feature = "daachorse-index")] cross_rule_ac: bool,
    logsource_extractor: Option<LogSourceExtractor>,
    observe: Option<&ObserveContext>,
    dump_correlation_state: bool,
) -> bool {
    let mut engine = CorrelationEngine::new(config);
    engine.set_include_event(include_event);
    engine.set_match_detail(match_detail);
    if let Some(budget) = bloom_max_bytes {
        engine.set_bloom_max_bytes(budget);
    }
    engine.set_bloom_prefilter(bloom_prefilter);
    #[cfg(feature = "daachorse-index")]
    engine.set_cross_rule_ac(cross_rule_ac);
    engine.set_logsource_extractor(logsource_extractor);
    for p in pipelines {
        engine.add_pipeline(p.clone());
    }
    if let Err(e) = engine.add_collection(&collection) {
        eprintln!("Error compiling rules: {e}");
        process::exit(crate::exit_code::RULE_ERROR);
    }
    if renderer.ctx().show_progress() {
        eprintln!(
            "Loaded {} detection rules + {} correlation rules from {}",
            engine.detection_rule_count(),
            engine.correlation_rule_count(),
            rules_path.display(),
        );
    }
    tracing::info!(
        detection_rules = engine.detection_rule_count(),
        correlation_rules = engine.correlation_rule_count(),
        rules_path = %rules_path.display(),
        "Rules loaded",
    );
    let had_matches = match event_source {
        EventSource::SingleJson(json_str) => {
            let value: serde_json::Value = match serde_json::from_str(&json_str) {
                Ok(v) => v,
                Err(e) => {
                    eprintln!("Invalid JSON event: {e}");
                    process::exit(crate::exit_code::RULE_ERROR);
                }
            };
            let payloads = crate::apply_event_filter(&value, event_filter);
            // A filter that selects nothing is reported the same way as in
            // the detection-only path instead of producing no output at all.
            if payloads.is_empty() && renderer.ctx().show_progress() {
                eprintln!("No matches.");
            }
            let mut had_matches = false;
            for payload in &payloads {
                let event = JsonEvent::borrow(payload);
                if let Some(octx) = observe {
                    octx.observer.observe(&event);
                }
                let result = engine.process_event(&event);
                if result.is_empty() {
                    if renderer.ctx().show_progress() {
                        eprintln!("No matches.");
                    }
                } else {
                    had_matches = true;
                    for m in &result {
                        renderer.emit(m);
                    }
                }
            }
            had_matches
        }
        EventSource::NdjsonFile(path) => {
            let file = File::open(&path).unwrap_or_else(|e| {
                eprintln!("Error opening event file '{}': {e}", path.display());
                process::exit(crate::exit_code::RULE_ERROR);
            });
            let reader = BufReader::new(file);
            let mut det_count = 0u64;
            let mut corr_count = 0u64;
            // Scoped so the processor's `&mut engine` ends before the
            // optional state dump below.
            let line_num = {
                let observer = observe.map(|c| c.observer.as_ref());
                let mut processor = CorrelationProcessor {
                    engine: &mut engine,
                };
                stream_events(
                    reader,
                    event_filter,
                    input_format_str,
                    syslog_tz_str,
                    syslog_strip_bom,
                    observer,
                    &mut processor,
                    &mut |m| {
                        if m.is_detection() {
                            det_count += 1;
                        } else {
                            corr_count += 1;
                        }
                        renderer.emit(m);
                    },
                )
            };
            if renderer.ctx().show_stats() {
                eprintln!(
                    "Processed {line_num} events, {det_count} detection matches, {corr_count} correlation matches."
                );
            }
            det_count > 0 || corr_count > 0
        }
        #[cfg(feature = "evtx")]
        EventSource::EvtxFile(path) => {
            let mut det_count = 0u64;
            let mut corr_count = 0u64;
            let rec_count = {
                let observer = observe.map(|c| c.observer.as_ref());
                let mut processor = CorrelationProcessor {
                    engine: &mut engine,
                };
                stream_evtx_events(&path, event_filter, observer, &mut processor, &mut |m| {
                    if m.is_detection() {
                        det_count += 1;
                    } else {
                        corr_count += 1;
                    }
                    renderer.emit(m);
                })
            };
            if renderer.ctx().show_stats() {
                eprintln!(
                    "Processed {rec_count} EVTX records, {det_count} detection matches, {corr_count} correlation matches."
                );
            }
            det_count > 0 || corr_count > 0
        }
        EventSource::Stdin => {
            let stdin = io::stdin();
            let mut det_count = 0u64;
            let mut corr_count = 0u64;
            let line_num = {
                let observer = observe.map(|c| c.observer.as_ref());
                let mut processor = CorrelationProcessor {
                    engine: &mut engine,
                };
                stream_events(
                    stdin.lock(),
                    event_filter,
                    input_format_str,
                    syslog_tz_str,
                    syslog_strip_bom,
                    observer,
                    &mut processor,
                    &mut |m| {
                        if m.is_detection() {
                            det_count += 1;
                        } else {
                            corr_count += 1;
                        }
                        renderer.emit(m);
                    },
                )
            };
            if renderer.ctx().show_stats() {
                eprintln!(
                    "Processed {line_num} events, {det_count} detection matches, {corr_count} correlation matches."
                );
            }
            det_count > 0 || corr_count > 0
        }
    };
    if dump_correlation_state {
        dump_correlation_snapshot(&engine);
    }
    had_matches
}
/// Prints the correlation engine's `introspect()` snapshot to stderr as
/// pretty JSON, under a `--- correlation state snapshot ---` header.
///
/// A serialization failure is reported on stderr instead of being propagated.
fn dump_correlation_snapshot(engine: &CorrelationEngine) {
    let snapshot = engine.introspect();
    let rendered = serde_json::to_string_pretty(&snapshot);
    match rendered {
        Err(err) => eprintln!("failed to serialize correlation state snapshot: {err}"),
        Ok(text) => eprintln!("--- correlation state snapshot ---\n{text}"),
    }
}
#[allow(clippy::too_many_arguments)]
fn cmd_eval_detection_only(
collection: SigmaCollection,
rules_path: &std::path::Path,
event_source: EventSource,
renderer: &mut MatchRenderer,
pipelines: &[Pipeline],
event_filter: &EventFilter,
include_event: bool,
match_detail: MatchDetailLevel,
input_format_str: &str,
syslog_tz_str: &str,
syslog_strip_bom: bool,
bloom_prefilter: bool,
bloom_max_bytes: Option<usize>,
#[cfg(feature = "daachorse-index")] cross_rule_ac: bool,
logsource_extractor: Option<LogSourceExtractor>,
observe: Option<&ObserveContext>,
) -> bool {
let mut engine = Engine::new();
engine.set_include_event(include_event);
engine.set_match_detail(match_detail);
if let Some(budget) = bloom_max_bytes {
engine.set_bloom_max_bytes(budget);
}
engine.set_bloom_prefilter(bloom_prefilter);
#[cfg(feature = "daachorse-index")]
engine.set_cross_rule_ac(cross_rule_ac);
engine.set_logsource_extractor(logsource_extractor);
for p in pipelines {
engine.add_pipeline(p.clone());
}
if let Err(e) = engine.add_collection(&collection) {
eprintln!("Error compiling rules: {e}");
process::exit(crate::exit_code::RULE_ERROR);
}
if renderer.ctx().show_progress() {
eprintln!(
"Loaded {} rules from {}",
engine.rule_count(),
rules_path.display()
);
}
tracing::info!(
detection_rules = engine.rule_count(),
correlation_rules = 0,
rules_path = %rules_path.display(),
"Rules loaded",
);
match event_source {
EventSource::SingleJson(json_str) => {
let value: serde_json::Value = match serde_json::from_str(&json_str) {
Ok(v) => v,
Err(e) => {
eprintln!("Invalid JSON event: {e}");
process::exit(crate::exit_code::RULE_ERROR);
}
};
let mut had_matches = false;
let payloads = crate::apply_event_filter(&value, event_filter);
if payloads.is_empty() {
if renderer.ctx().show_progress() {
eprintln!("No matches.");
}
} else {
for payload in &payloads {
let event = JsonEvent::borrow(payload);
if let Some(octx) = observe {
octx.observer.observe(&event);
}
let matches = engine.evaluate(&event);
if matches.is_empty() {
if renderer.ctx().show_progress() {
eprintln!("No matches.");
}
} else {
had_matches = true;
for m in &matches {
renderer.emit(m);
}
}
}
}
had_matches
}
EventSource::NdjsonFile(path) => {
let file = File::open(&path).unwrap_or_else(|e| {
eprintln!("Error opening event file '{}': {e}", path.display());
process::exit(crate::exit_code::RULE_ERROR);
});
let reader = BufReader::new(file);
let mut match_count = 0u64;
let line_num = {
let observer = observe.map(|c| c.observer.as_ref());
let mut processor = DetectionProcessor { engine: &engine };
stream_events(
reader,
event_filter,
input_format_str,
syslog_tz_str,
syslog_strip_bom,
observer,
&mut processor,
&mut |m| {
match_count += 1;
renderer.emit(m);
},
)
};
if renderer.ctx().show_stats() {
eprintln!("Processed {line_num} events, {match_count} matches.");
}
match_count > 0
}
#[cfg(feature = "evtx")]
EventSource::EvtxFile(path) => {
let mut match_count = 0u64;
let rec_count = {
let observer = observe.map(|c| c.observer.as_ref());
let mut processor = DetectionProcessor { engine: &engine };
stream_evtx_events(&path, event_filter, observer, &mut processor, &mut |m| {
match_count += 1;
renderer.emit(m);
})
};
if renderer.ctx().show_stats() {
eprintln!("Processed {rec_count} EVTX records, {match_count} matches.");
}
match_count > 0
}
EventSource::Stdin => {
let stdin = io::stdin();
let mut match_count = 0u64;
let line_num = {
let observer = observe.map(|c| c.observer.as_ref());
let mut processor = DetectionProcessor { engine: &engine };
stream_events(
stdin.lock(),
event_filter,
input_format_str,
syslog_tz_str,
syslog_strip_bom,
observer,
&mut processor,
&mut |m| {
match_count += 1;
renderer.emit(m);
},
)
};
if renderer.ctx().show_stats() {
eprintln!("Processed {line_num} events, {match_count} matches.");
}
match_count > 0
}
}
}
/// State for `--observe-fields`: the observer fed with every evaluated event,
/// the rule-referenced field set it is compared against, and the report target.
struct ObserveContext {
    /// Shared observer; each evaluation path calls `observe` on it per event.
    observer: Arc<FieldObserver>,
    /// Fields collected from the loaded rule collection and CLI pipelines via
    /// `RuleFieldSet::collect`.
    rule_field_set: RuleFieldSet,
    /// `--observe-fields-report` path; `None` sends the report to stderr.
    report_path: Option<PathBuf>,
}
/// Upper bound on the rule titles listed per missing field in the field
/// report; entries with more titles are flagged with `"truncated": true`.
const EVAL_MISSING_RULE_TITLES_CAP: usize = 10;

/// Emits the `--observe-fields` coverage report as pretty-printed JSON.
///
/// The observer's snapshot is compared with the rule field set. The report
/// holds:
/// * a `summary` object of counters;
/// * an `unknown` list (field and count per entry);
/// * a `missing` list (field, rule count, sources, and up to
///   [`EVAL_MISSING_RULE_TITLES_CAP`] rule titles).
///
/// The report is written to `report_path` when set, otherwise to stderr. A
/// write failure is reported on stderr rather than propagated.
fn render_field_report(ctx: &ObserveContext) {
    let snapshot = ctx.observer.snapshot();
    let coverage = snapshot.coverage(&ctx.rule_field_set);

    let mut unknown_entries: Vec<serde_json::Value> = Vec::new();
    for entry in coverage.unknown.iter() {
        let field: &str = &entry.field;
        unknown_entries.push(serde_json::json!({ "field": field, "count": entry.count }));
    }

    let mut missing_entries: Vec<serde_json::Value> = Vec::new();
    for (name, origin) in coverage.missing.iter() {
        let rule_count = origin.rule_titles.len();
        let truncated = rule_count > EVAL_MISSING_RULE_TITLES_CAP;
        let rule_titles: Vec<&str> = origin
            .rule_titles
            .iter()
            .take(EVAL_MISSING_RULE_TITLES_CAP)
            .map(String::as_str)
            .collect();
        let sources: Vec<&str> = origin.sources.iter().map(|s| s.as_str()).collect();
        missing_entries.push(serde_json::json!({
            "field": name,
            "rule_count": rule_count,
            "sources": sources,
            "rule_titles": rule_titles,
            "truncated": truncated,
        }));
    }

    let report = serde_json::json!({
        "summary": {
            "events_observed": snapshot.events_observed,
            "unique_keys_observed": snapshot.unique_keys,
            "rule_fields_loaded": ctx.rule_field_set.len(),
            "overflow_dropped": snapshot.overflow_dropped,
            "max_keys": snapshot.max_keys,
            "uptime_seconds": snapshot.uptime_seconds,
            "intersection_count": coverage.intersection_count,
            "unknown_count": unknown_entries.len(),
            "missing_count": missing_entries.len(),
        },
        "unknown": unknown_entries,
        "missing": missing_entries,
    });
    let serialized = serde_json::to_string_pretty(&report).unwrap_or_else(|_| report.to_string());

    if let Some(path) = ctx.report_path.as_deref() {
        if let Err(e) = write_report_to_file(path, &serialized) {
            eprintln!(
                "Failed to write field observation report to {}: {e}",
                path.display()
            );
        }
    } else {
        let _ = writeln!(io::stderr(), "{serialized}");
    }
}
/// Writes `serialized` followed by a newline to `path`, creating the file or
/// truncating an existing one.
///
/// # Errors
/// Returns any I/O error raised while creating, writing or flushing the file.
fn write_report_to_file(path: &Path, serialized: &str) -> std::io::Result<()> {
    let mut report = File::create(path)?;
    writeln!(report, "{serialized}")?;
    report.flush()
}
/// Renders evaluation results in the output format selected by `OutputCtx`.
struct MatchRenderer {
    /// Global output settings (format selection, progress/stats visibility).
    ctx: OutputCtx,
    /// Format-specific rendering strategy chosen in `MatchRenderer::new`.
    state: RenderState,
}
/// Per-format rendering strategy used by `MatchRenderer`.
enum RenderState {
    /// Each result goes to `crate::output::render_json` as soon as it is emitted.
    Json { pretty: bool },
    /// Each result becomes a CSV/TSV row pushed to the `DelimitedWriter`.
    Delimited(DelimitedWriter),
    /// Rows are buffered and handed to `crate::output::render_table` on `flush`.
    Table(Vec<EvalRow>),
}
impl MatchRenderer {
    /// Chooses the rendering strategy from the output context and `--pretty`.
    ///
    /// `--pretty` without an explicitly chosen output format forces pretty JSON.
    /// Otherwise the format decides:
    /// * JSON is pretty when `--pretty` or the context asks for it;
    /// * NDJSON is always compact;
    /// * CSV/TSV stream rows;
    /// * tables are buffered until `flush`.
    fn new(ctx: OutputCtx, pretty_flag: bool) -> Self {
        let force_pretty_json = pretty_flag && !ctx.explicit_format;
        let state = match ctx.format {
            _ if force_pretty_json => RenderState::Json { pretty: true },
            OutputFormat::Json => RenderState::Json {
                pretty: pretty_flag || ctx.pretty_json(),
            },
            OutputFormat::Ndjson => RenderState::Json { pretty: false },
            OutputFormat::Csv => {
                RenderState::Delimited(DelimitedWriter::new(',', EvalRow::headers()))
            }
            OutputFormat::Tsv => {
                RenderState::Delimited(DelimitedWriter::new('\t', EvalRow::headers()))
            }
            OutputFormat::Table => RenderState::Table(Vec::new()),
        };
        Self { ctx, state }
    }

    /// Output context this renderer was built with.
    fn ctx(&self) -> &OutputCtx {
        &self.ctx
    }

    /// Renders (or buffers, for tables) a single evaluation result.
    fn emit(&mut self, m: &EvaluationResult) {
        match &mut self.state {
            RenderState::Json { pretty } => crate::output::render_json(m, *pretty),
            RenderState::Delimited(writer) => {
                writer.push(&EvalRow::from_result(m).row());
            }
            RenderState::Table(rows) => rows.push(EvalRow::from_result(m)),
        }
    }

    /// Prints the buffered table; streaming formats have nothing to flush.
    fn flush(&mut self) {
        match &self.state {
            RenderState::Table(rows) => {
                crate::output::render_table(rows);
            }
            RenderState::Json { .. } | RenderState::Delimited(_) => {}
        }
    }
}
/// One table/CSV/TSV row summarising an `EvaluationResult`.
#[derive(Clone)]
struct EvalRow {
    /// Rule level, or `-` when the result carries none.
    level: String,
    /// Rule title from the result header.
    rule: String,
    /// `detection`, or the correlation type name.
    kind: String,
    /// Matched `field=value` pairs (detections) or group key plus aggregate
    /// (correlations), capped to `DETAIL_MAX` characters by `truncate`.
    detail: String,
}
/// Column headers, in the same order as the values returned by `EvalRow::row`.
const ROW_HEADERS: &[&str] = &["LEVEL", "RULE", "TYPE", "DETAIL"];
/// Maximum number of characters (not bytes) kept in the `DETAIL` column.
const DETAIL_MAX: usize = 200;
impl EvalRow {
    /// Flattens an evaluation result into a display row.
    ///
    /// Detections list `field=value` pairs joined by `", "`. Correlations show
    /// `k=v` group pairs, then ` | `, then `agg=<value>`; the group part and
    /// separator are omitted when there is no group key. The detail text is
    /// passed through `truncate`.
    fn from_result(m: &EvaluationResult) -> Self {
        let level = match m.header.level.as_ref() {
            Some(l) => l.as_str().to_string(),
            None => "-".to_string(),
        };
        let (kind, detail) = match &m.body {
            ResultBody::Detection(d) => {
                let pairs: Vec<String> = d
                    .matched_fields
                    .iter()
                    .map(|fm| format!("{}={}", fm.field, summarize_value(&fm.value)))
                    .collect();
                (String::from("detection"), pairs.join(", "))
            }
            ResultBody::Correlation(c) => {
                let group: Vec<String> = c
                    .group_key
                    .iter()
                    .map(|(k, v)| format!("{k}={v}"))
                    .collect();
                let group = group.join(", ");
                let aggregate = format!("agg={}", c.aggregated_value);
                let detail = if group.is_empty() {
                    aggregate
                } else {
                    format!("{group} | {aggregate}")
                };
                (c.correlation_type.as_str().to_string(), detail)
            }
        };
        Self {
            level,
            rule: m.header.rule_title.clone(),
            kind,
            detail: truncate(detail),
        }
    }
}
impl Tabular for EvalRow {
    /// Column headers: LEVEL, RULE, TYPE, DETAIL.
    fn headers() -> &'static [&'static str] {
        ROW_HEADERS
    }

    /// Owned cell values, in the same order as [`ROW_HEADERS`].
    fn row(&self) -> Vec<String> {
        let Self {
            level,
            rule,
            kind,
            detail,
        } = self;
        [level, rule, kind, detail].into_iter().cloned().collect()
    }
}
fn summarize_value(v: &serde_json::Value) -> String {
match v {
serde_json::Value::String(s) => s.clone(),
serde_json::Value::Number(n) => n.to_string(),
serde_json::Value::Bool(b) => b.to_string(),
serde_json::Value::Null => "null".to_string(),
other => other.to_string(),
}
}
fn truncate(mut s: String) -> String {
if s.chars().count() <= DETAIL_MAX {
return s;
}
let truncated: String = s.chars().take(DETAIL_MAX - 1).collect();
s.clear();
s.push_str(&truncated);
s.push('…');
s
}
#[cfg(test)]
mod tests {
    use super::*;
    use clap::{Command, FromArgMatches};

    /// Parses `argv` the way the real CLI does and returns both the typed args
    /// and the raw matches (needed to inspect value sources).
    fn parse(argv: &[&str]) -> (EvalArgs, ArgMatches) {
        let cmd = EvalArgs::augment_args(Command::new("eval"));
        let matches = cmd.get_matches_from(argv);
        let args = EvalArgs::from_arg_matches(&matches).expect("valid args");
        (args, matches)
    }

    /// Deserializes a YAML snippet into the partial config layer.
    fn partial(yaml: &str) -> config::RsigmaConfigPartial {
        yaml_serde::from_str(yaml).expect("valid partial")
    }

    #[test]
    fn cli_flag_beats_config_file() {
        let (mut args, matches) = parse(&["eval", "--rules", "/cli/rules"]);
        let base = partial("eval:\n  rules: /file/rules\n  fail_on_detection: true\n");
        overlay_eval_config(&mut args, &matches, base);
        assert_eq!(args.rules.as_deref(), Some(Path::new("/cli/rules")));
        assert!(args.fail_on_detection);
    }

    #[test]
    fn config_fills_unset_rules() {
        let (mut args, matches) = parse(&["eval"]);
        let base = partial("eval:\n  rules: /file/rules\n");
        overlay_eval_config(&mut args, &matches, base);
        assert_eq!(args.rules.as_deref(), Some(Path::new("/file/rules")));
    }

    #[test]
    fn syslog_strip_bom_default_on_config_off_flag_wins() {
        let (args, _) = parse(&["eval", "--rules", "/r"]);
        assert!(args.syslog_strip_bom);
        let (mut args, matches) = parse(&["eval", "--rules", "/r"]);
        let base = partial("eval:\n  syslog_strip_bom: false\n");
        overlay_eval_config(&mut args, &matches, base);
        assert!(!args.syslog_strip_bom);
        let (mut args, matches) = parse(&["eval", "--rules", "/r", "--syslog-strip-bom", "true"]);
        let base = partial("eval:\n  syslog_strip_bom: false\n");
        overlay_eval_config(&mut args, &matches, base);
        assert!(args.syslog_strip_bom);
    }

    #[test]
    fn schema_routing_from_config_file() {
        let (mut args, matches) = parse(&["eval", "--rules", "/r"]);
        // `routing`, `config` and `on_unknown` must be nested under `schema`.
        let base = partial(
            "eval:\n  schema:\n    routing: true\n    config: /file/schema.yml\n    on_unknown: drop\n",
        );
        overlay_eval_config(&mut args, &matches, base);
        assert!(args.schema_routing);
        assert_eq!(
            args.schema_config.as_deref(),
            Some(Path::new("/file/schema.yml"))
        );
        assert_eq!(args.on_unknown.as_deref(), Some("drop"));
    }

    #[test]
    fn schema_routing_flag_beats_config_file() {
        let (mut args, matches) = parse(&["eval", "--rules", "/r", "--on-unknown", "error"]);
        let base = partial("eval:\n  schema:\n    routing: true\n    on_unknown: drop\n");
        overlay_eval_config(&mut args, &matches, base);
        assert!(args.schema_routing);
        assert_eq!(args.on_unknown.as_deref(), Some("error"));
    }

    #[test]
    fn input_format_flag_beats_config_file_and_tz_is_filled() {
        let (mut args, matches) = parse(&["eval", "--rules", "/r", "--input-format", "json"]);
        let base = partial("eval:\n  input_format: syslog\n  syslog_tz: \"+02:00\"\n");
        overlay_eval_config(&mut args, &matches, base);
        assert_eq!(args.input_format, "json");
        assert_eq!(args.syslog_tz, "+02:00");
    }

    #[test]
    fn inline_event_and_stdin_sources() {
        assert!(matches!(
            resolve_event_source(Some("{\"a\":1}".to_string())),
            EventSource::SingleJson(s) if s == "{\"a\":1}"
        ));
        assert!(matches!(resolve_event_source(None), EventSource::Stdin));
    }

    #[test]
    fn truncate_keeps_short_and_caps_long_details() {
        assert_eq!(truncate("short".to_string()), "short");
        let exact = "a".repeat(DETAIL_MAX);
        assert_eq!(truncate(exact.clone()), exact);
        let cut = truncate("é".repeat(DETAIL_MAX + 5));
        assert_eq!(cut.chars().count(), DETAIL_MAX);
        assert!(cut.ends_with('…'));
    }

    #[test]
    fn summarize_value_unquotes_strings_only() {
        assert_eq!(summarize_value(&serde_json::json!("cmd.exe")), "cmd.exe");
        assert_eq!(summarize_value(&serde_json::json!(42)), "42");
        assert_eq!(summarize_value(&serde_json::json!(true)), "true");
        assert_eq!(summarize_value(&serde_json::Value::Null), "null");
        assert_eq!(summarize_value(&serde_json::json!(["a", 1])), r#"["a",1]"#);
    }
}