use std::io::BufRead;
use rsigma_eval::{CorrelationEngine, Engine, EvaluationResult, Event, FieldObserver, JsonEvent};
use crate::EventFilter;
pub(crate) trait EventProcessor {
fn process<E: Event>(&mut self, event: &E, on_result: &mut dyn FnMut(&EvaluationResult));
}
pub(crate) struct CorrelationProcessor<'a> {
pub engine: &'a mut CorrelationEngine,
}
impl EventProcessor for CorrelationProcessor<'_> {
fn process<E: Event>(&mut self, event: &E, on_result: &mut dyn FnMut(&EvaluationResult)) {
for m in &self.engine.process_event(event) {
on_result(m);
}
}
}
pub(crate) struct DetectionProcessor<'a> {
pub engine: &'a Engine,
}
impl EventProcessor for DetectionProcessor<'_> {
fn process<E: Event>(&mut self, event: &E, on_result: &mut dyn FnMut(&EvaluationResult)) {
for m in &self.engine.evaluate(event) {
on_result(m);
}
}
}
#[inline]
fn observe_event<E: Event + ?Sized>(observer: Option<&FieldObserver>, event: &E) {
if let Some(observer) = observer {
observer.observe(event);
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn stream_events<P: EventProcessor>(
reader: impl BufRead,
event_filter: &EventFilter,
input_format_str: &str,
syslog_tz_str: &str,
syslog_strip_bom: bool,
observe: Option<&FieldObserver>,
processor: &mut P,
on_result: &mut dyn FnMut(&EvaluationResult),
) -> u64 {
let mut line_num = 0u64;
#[cfg(feature = "daemon")]
let format =
crate::commands::parse_input_format(input_format_str, syslog_tz_str, syslog_strip_bom);
#[cfg(not(feature = "daemon"))]
let _ = (input_format_str, syslog_tz_str, syslog_strip_bom);
for line in reader.lines() {
line_num += 1;
let line = match line {
Ok(l) => l,
Err(e) => {
eprintln!("Error reading line {line_num}: {e}");
continue;
}
};
if line.trim().is_empty() {
continue;
}
#[cfg(feature = "daemon")]
process_line(processor, &line, &format, event_filter, observe, on_result);
#[cfg(not(feature = "daemon"))]
process_line_json(processor, &line, event_filter, observe, on_result);
}
line_num
}
#[cfg(feature = "daemon")]
fn process_line<P: EventProcessor>(
processor: &mut P,
line: &str,
format: &rsigma_runtime::InputFormat,
event_filter: &EventFilter,
observe: Option<&FieldObserver>,
on_result: &mut dyn FnMut(&EvaluationResult),
) {
let Some(decoded) = rsigma_runtime::parse_line(line, format) else {
return;
};
if matches!(decoded, rsigma_runtime::EventInputDecoded::Json(_)) {
let json_value = decoded.to_json();
for payload in crate::apply_event_filter(&json_value, event_filter) {
let event = JsonEvent::borrow(&payload);
observe_event(observe, &event);
processor.process(&event, on_result);
}
} else {
observe_event(observe, &decoded);
processor.process(&decoded, on_result);
}
}
#[cfg(not(feature = "daemon"))]
fn process_line_json<P: EventProcessor>(
processor: &mut P,
line: &str,
event_filter: &EventFilter,
observe: Option<&FieldObserver>,
on_result: &mut dyn FnMut(&EvaluationResult),
) {
let value: serde_json::Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(e) => {
eprintln!("Invalid JSON: {e}");
return;
}
};
for payload in crate::apply_event_filter(&value, event_filter) {
let event = JsonEvent::borrow(&payload);
observe_event(observe, &event);
processor.process(&event, on_result);
}
}
#[cfg(feature = "evtx")]
pub(crate) fn stream_evtx_events<P: EventProcessor>(
path: &std::path::Path,
event_filter: &EventFilter,
observe: Option<&FieldObserver>,
processor: &mut P,
on_result: &mut dyn FnMut(&EvaluationResult),
) -> u64 {
let mut reader = rsigma_runtime::EvtxFileReader::open(path).unwrap_or_else(|e| {
eprintln!("Error opening EVTX file '{}': {e}", path.display());
std::process::exit(crate::exit_code::RULE_ERROR);
});
let mut rec_count = 0u64;
for record in reader.records() {
rec_count += 1;
let value = match record {
Ok(v) => v,
Err(e) => {
eprintln!("Error reading EVTX record {rec_count}: {e}");
continue;
}
};
for payload in crate::apply_event_filter(&value, event_filter) {
let event = JsonEvent::borrow(&payload);
observe_event(observe, &event);
processor.process(&event, on_result);
}
}
rec_count
}