use std::path::{Path, PathBuf};
use std::sync::Arc;
use arc_swap::ArcSwap;
use rsigma_eval::event::Event;
use rsigma_eval::{
CorrelationConfig, CorrelationEngine, CorrelationSnapshot, Engine, Pipeline, ProcessResult,
RuleFieldSet, parse_pipeline_file,
};
use rsigma_parser::SigmaCollection;
use crate::sources::{self, SourceResolver, TemplateExpander};
/// Holds the active rule engine together with every setting `load_rules`
/// needs in order to rebuild it from scratch.
pub struct RuntimeEngine {
/// Currently installed engine (detection-only or correlation-capable).
engine: EngineVariant,
/// Pipelines added to each engine built by `load_rules`.
pipelines: Vec<Pipeline>,
/// Pipeline files that `load_rules` re-parses on every call; skipped when empty.
pipeline_paths: Vec<PathBuf>,
/// File or directory the Sigma rules are parsed from.
rules_path: std::path::PathBuf,
/// Configuration cloned into each newly built `CorrelationEngine`.
corr_config: CorrelationConfig,
/// Forwarded to the engine's `set_include_event` on every load.
include_event: bool,
/// Resolver for dynamic pipeline sources; `None` disables dynamic resolution.
source_resolver: Option<Arc<dyn SourceResolver>>,
/// Passed through to `sources::include::expand_includes` during resolution.
allow_remote_include: bool,
/// Forwarded to the engine's `set_bloom_prefilter` on every load.
bloom_prefilter: bool,
/// Optional budget forwarded to the engine's `set_bloom_max_bytes`;
/// when `None` the setter is not called at all.
bloom_max_bytes: Option<usize>,
/// Forwarded to the engine's `set_cross_rule_ac` (only compiled with the
/// `daachorse-index` feature).
#[cfg(feature = "daachorse-index")]
cross_rule_ac: bool,
/// Shared, swappable field set that is recomputed after each successful load.
rule_field_set: Arc<ArcSwap<RuleFieldSet>>,
}
/// The two engine kinds a `RuntimeEngine` can host. `load_rules` picks one
/// depending on whether the parsed rule collection contains correlations.
enum EngineVariant {
/// Plain detection engine; used when the collection has no correlations.
DetectionOnly(Box<Engine>),
/// Correlation-capable engine; used when the collection has correlations.
WithCorrelations(Box<CorrelationEngine>),
}
/// Rule and correlation-state counts reported for an engine.
///
/// Returned by `RuntimeEngine::load_rules` and `RuntimeEngine::stats`.
/// Derives `Default`, `PartialEq` and `Eq` so callers can zero-initialise a
/// value and compare snapshots (for example before and after a reload).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct EngineStats {
    /// Number of detection rules compiled into the engine.
    pub detection_rules: usize,
    /// Number of correlation rules; always `0` for a detection-only engine.
    pub correlation_rules: usize,
    /// Number of correlation state entries; always `0` for a detection-only engine.
    pub state_entries: usize,
}
impl RuntimeEngine {
pub fn new(
rules_path: std::path::PathBuf,
pipelines: Vec<Pipeline>,
corr_config: CorrelationConfig,
include_event: bool,
) -> Self {
RuntimeEngine {
engine: EngineVariant::DetectionOnly(Box::new(Engine::new())),
pipelines,
pipeline_paths: Vec::new(),
rules_path,
corr_config,
include_event,
source_resolver: None,
allow_remote_include: false,
bloom_prefilter: false,
bloom_max_bytes: None,
#[cfg(feature = "daachorse-index")]
cross_rule_ac: false,
rule_field_set: Arc::new(ArcSwap::from_pointee(RuleFieldSet::default())),
}
}
/// Returns the rule field set currently stored in the shared `ArcSwap`,
/// as an owned `Arc`.
pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
self.rule_field_set.load_full()
}
/// Sets the bloom-prefilter flag. It is forwarded to the engine's
/// `set_bloom_prefilter` the next time `load_rules` builds an engine.
pub fn set_bloom_prefilter(&mut self, enabled: bool) {
self.bloom_prefilter = enabled;
}
/// Returns the stored bloom-prefilter flag.
pub fn bloom_prefilter(&self) -> bool {
self.bloom_prefilter
}
/// Stores a bloom byte budget. `load_rules` forwards it to the engine's
/// `set_bloom_max_bytes` whenever a budget has been set.
pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
self.bloom_max_bytes = Some(max_bytes);
}
/// Returns the stored bloom byte budget, or `None` if none was set.
pub fn bloom_max_bytes(&self) -> Option<usize> {
self.bloom_max_bytes
}
/// Sets the cross-rule AC flag. It is forwarded to the engine's
/// `set_cross_rule_ac` the next time `load_rules` builds an engine.
/// Only available with the `daachorse-index` feature.
#[cfg(feature = "daachorse-index")]
pub fn set_cross_rule_ac(&mut self, enabled: bool) {
self.cross_rule_ac = enabled;
}
/// Returns the stored cross-rule AC flag.
/// Only available with the `daachorse-index` feature.
#[cfg(feature = "daachorse-index")]
pub fn cross_rule_ac(&self) -> bool {
self.cross_rule_ac
}
/// Installs the resolver used by `resolve_dynamic_pipelines` and
/// `load_rules` to resolve the sources of dynamic pipelines.
pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
self.source_resolver = Some(resolver);
}
/// Returns the configured source resolver, if any.
pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
self.source_resolver.as_ref()
}
/// Sets the flag that is passed to `sources::include::expand_includes`
/// when dynamic pipelines are expanded.
pub fn set_allow_remote_include(&mut self, allow: bool) {
self.allow_remote_include = allow;
}
/// Returns the stored allow-remote-include flag.
pub fn allow_remote_include(&self) -> bool {
self.allow_remote_include
}
/// Sets the pipeline file paths. When the list is non-empty, `load_rules`
/// re-parses the pipelines from these files on every call.
pub fn set_pipeline_paths(&mut self, paths: Vec<PathBuf>) {
self.pipeline_paths = paths;
}
/// Returns the stored pipeline file paths.
pub fn pipeline_paths(&self) -> &[PathBuf] {
&self.pipeline_paths
}
pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
let Some(resolver) = &self.source_resolver else {
return Ok(());
};
let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
for pipeline in &self.pipelines {
if pipeline.is_dynamic() {
match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
Ok(resolved_data) => {
let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
sources::include::expand_includes(
&mut expanded,
&resolved_data,
self.allow_remote_include,
)?;
resolved_pipelines.push(expanded);
}
Err(e) => {
return Err(format!(
"Failed to resolve dynamic pipeline '{}': {e}",
pipeline.name
));
}
}
} else {
resolved_pipelines.push(pipeline.clone());
}
}
self.pipelines = resolved_pipelines;
Ok(())
}
/// Loads (or hot-reloads) the rules at `rules_path` and installs a freshly
/// built engine.
///
/// Steps, in order:
/// 1. If pipeline paths are registered, re-parse the pipelines from disk.
/// 2. If a source resolver is set and any pipeline is dynamic, resolve the
///    sources by blocking on the current tokio runtime.
/// 3. Parse the rule collection and build a `CorrelationEngine` when it
///    contains correlations, otherwise a plain `Engine`.
/// 4. For a correlation engine, carry over the state exported from the
///    engine being replaced.
/// 5. Install the new engine and refresh the shared rule field set.
///
/// # Errors
///
/// Returns an error string when a pipeline file cannot be re-parsed, when
/// dynamic sources are required but no multi-threaded tokio runtime is
/// available, when source resolution fails, or when rules fail to load or
/// compile. The previously installed engine stays in place on error.
pub fn load_rules(&mut self) -> Result<EngineStats, String> {
    let load_span = tracing::info_span!("load_rules", rules_path = %self.rules_path.display());
    let _enter = load_span.enter();
    let load_start = std::time::Instant::now();

    if !self.pipeline_paths.is_empty() {
        self.pipelines = reload_pipelines(&self.pipeline_paths)?;
    }

    // Only keep the resolver around when there is actually something to resolve.
    let dynamic_resolver = self
        .source_resolver
        .clone()
        .filter(|_| self.pipelines.iter().any(|p| p.is_dynamic()));
    if let Some(resolver) = dynamic_resolver {
        let handle = tokio::runtime::Handle::try_current().map_err(|_| {
            "Dynamic pipelines require a tokio runtime; refusing to load rules with \
             unresolved sources"
                .to_string()
        })?;
        // `block_in_place` panics on a current-thread runtime, so report that
        // case as an error instead of crashing the caller.
        if matches!(
            handle.runtime_flavor(),
            tokio::runtime::RuntimeFlavor::CurrentThread
        ) {
            return Err(
                "Dynamic pipelines require a multi-threaded tokio runtime; refusing to load \
                 rules with unresolved sources"
                    .to_string(),
            );
        }

        let pipelines = std::mem::take(&mut self.pipelines);
        let allow_remote = self.allow_remote_include;
        let resolved = tokio::task::block_in_place(|| {
            handle.block_on(resolve_pipelines_async(&resolver, &pipelines, allow_remote))
        });
        match resolved {
            Ok(p) => self.pipelines = p,
            Err(e) => {
                // Put the unresolved pipelines back so a later retry can succeed.
                self.pipelines = pipelines;
                return Err(format!("Dynamic source resolution failed: {e}"));
            }
        }
    }

    let collection = load_collection(&self.rules_path)?;

    let stats = if collection.correlations.is_empty() {
        let mut engine = Engine::new();
        engine.set_include_event(self.include_event);
        if let Some(budget) = self.bloom_max_bytes {
            engine.set_bloom_max_bytes(budget);
        }
        engine.set_bloom_prefilter(self.bloom_prefilter);
        #[cfg(feature = "daachorse-index")]
        engine.set_cross_rule_ac(self.cross_rule_ac);
        for p in &self.pipelines {
            engine.add_pipeline(p.clone());
        }
        engine
            .add_collection(&collection)
            .map_err(|e| format!("Error compiling rules: {e}"))?;

        let stats = EngineStats {
            detection_rules: engine.rule_count(),
            correlation_rules: 0,
            state_entries: 0,
        };
        self.engine = EngineVariant::DetectionOnly(Box::new(engine));
        stats
    } else {
        let mut engine = CorrelationEngine::new(self.corr_config.clone());
        engine.set_include_event(self.include_event);
        if let Some(budget) = self.bloom_max_bytes {
            engine.set_bloom_max_bytes(budget);
        }
        engine.set_bloom_prefilter(self.bloom_prefilter);
        #[cfg(feature = "daachorse-index")]
        engine.set_cross_rule_ac(self.cross_rule_ac);
        for p in &self.pipelines {
            engine.add_pipeline(p.clone());
        }
        engine
            .add_collection(&collection)
            .map_err(|e| format!("Error compiling rules: {e}"))?;

        // Carry correlation state over from the engine being replaced. The
        // snapshot is taken only here, once the new rules are known to compile.
        // `import_state` returns a bool; `false` is assumed to mean the
        // snapshot was rejected (confirm against rsigma_eval).
        let state_restored = match self.export_state() {
            Some(snapshot) => engine.import_state(snapshot),
            None => true,
        };
        if !state_restored {
            tracing::warn!("Previous correlation state was not restored into the reloaded engine");
        }

        let stats = EngineStats {
            detection_rules: engine.detection_rule_count(),
            correlation_rules: engine.correlation_rule_count(),
            state_entries: engine.state_count(),
        };
        self.engine = EngineVariant::WithCorrelations(Box::new(engine));
        stats
    };

    self.refresh_rule_field_set(&collection);

    // Saturate rather than silently truncate the u128 millisecond count.
    let duration_ms = u64::try_from(load_start.elapsed().as_millis()).unwrap_or(u64::MAX);
    tracing::debug!(
        detection_rules = stats.detection_rules,
        correlation_rules = stats.correlation_rules,
        duration_ms,
        "Rule load complete",
    );
    Ok(stats)
}
/// Recomputes the rule field set from `collection` and the current
/// pipelines, then publishes it through the shared `ArcSwap`.
fn refresh_rule_field_set(&self, collection: &SigmaCollection) {
    // NOTE(review): the meaning of the trailing `true` is defined by
    // `RuleFieldSet::collect` in rsigma_eval and is not visible here.
    self.rule_field_set.store(Arc::new(RuleFieldSet::collect(
        collection,
        &self.pipelines,
        true,
    )));
}
/// Runs a batch of events through the installed engine.
///
/// A detection-only engine is driven through `evaluate_batch`; a
/// correlation engine through `process_batch`.
pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
    match self.engine {
        EngineVariant::WithCorrelations(ref mut correlating) => correlating.process_batch(events),
        EngineVariant::DetectionOnly(ref mut detector) => detector.evaluate_batch(events),
    }
}
/// Reports rule and state counts for the installed engine.
///
/// A detection-only engine always reports zero correlation rules and zero
/// state entries.
pub fn stats(&self) -> EngineStats {
    let (detection_rules, correlation_rules, state_entries) = match &self.engine {
        EngineVariant::DetectionOnly(detector) => (detector.rule_count(), 0, 0),
        EngineVariant::WithCorrelations(correlating) => (
            correlating.detection_rule_count(),
            correlating.correlation_rule_count(),
            correlating.state_count(),
        ),
    };
    EngineStats {
        detection_rules,
        correlation_rules,
        state_entries,
    }
}
/// Returns the path the rules are loaded from.
pub fn rules_path(&self) -> &Path {
&self.rules_path
}
/// Returns the pipelines currently held by this runtime engine.
pub fn pipelines(&self) -> &[Pipeline] {
&self.pipelines
}
/// Returns the correlation configuration used when building a `CorrelationEngine`.
pub fn corr_config(&self) -> &CorrelationConfig {
&self.corr_config
}
/// Returns the stored include-event flag.
pub fn include_event(&self) -> bool {
self.include_event
}
/// Exports the correlation state of the installed engine.
///
/// Returns `None` when the installed engine is detection-only.
pub fn export_state(&self) -> Option<CorrelationSnapshot> {
    if let EngineVariant::WithCorrelations(correlating) = &self.engine {
        Some(correlating.export_state())
    } else {
        None
    }
}
/// Imports a clone of `snapshot` into the installed correlation engine and
/// returns that engine's result.
///
/// Returns `true` without doing anything when the installed engine is
/// detection-only.
pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
    match &mut self.engine {
        EngineVariant::WithCorrelations(correlating) => correlating.import_state(snapshot.clone()),
        EngineVariant::DetectionOnly(_) => true,
    }
}
}
fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
let collection = if path.is_dir() {
rsigma_parser::parse_sigma_directory(path)
.map_err(|e| format!("Error loading rules from {}: {e}", path.display()))?
} else {
rsigma_parser::parse_sigma_file(path)
.map_err(|e| format!("Error loading rule {}: {e}", path.display()))?
};
if !collection.errors.is_empty() {
tracing::warn!(
count = collection.errors.len(),
"Parse errors while loading rules"
);
for (i, err) in collection.errors.iter().take(3).enumerate() {
tracing::warn!(index = i + 1, error = %err, "Rule parse error detail");
}
}
Ok(collection)
}
/// Re-parses every pipeline file in `paths` and returns the pipelines sorted
/// by ascending `priority` (stable sort).
///
/// A pipeline that declares inline `sources` is reported through
/// `crate::warn_pipeline_inline_sources`.
///
/// # Errors
///
/// Stops at the first file that fails to parse and returns an error string
/// naming it.
fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
    let mut loaded = paths
        .iter()
        .map(|path| -> Result<Pipeline, String> {
            let pipeline = parse_pipeline_file(path)
                .map_err(|e| format!("Error reloading pipeline {}: {e}", path.display()))?;
            if !pipeline.sources.is_empty() {
                crate::warn_pipeline_inline_sources(path, &pipeline.name);
            }
            Ok(pipeline)
        })
        .collect::<Result<Vec<_>, String>>()?;
    loaded.sort_by_key(|p| p.priority);
    Ok(loaded)
}
/// Produces a resolved copy of `pipelines`, preserving their order.
///
/// Non-dynamic pipelines are cloned as-is. For each dynamic pipeline the
/// sources are resolved through `resolver`, templates are expanded with the
/// resolved data, and includes are expanded (with `allow_remote_include`
/// passed through).
///
/// # Errors
///
/// Returns an error string at the first pipeline whose sources cannot be
/// resolved or whose includes cannot be expanded.
async fn resolve_pipelines_async(
    resolver: &Arc<dyn SourceResolver>,
    pipelines: &[Pipeline],
    allow_remote_include: bool,
) -> Result<Vec<Pipeline>, String> {
    let mut output = Vec::with_capacity(pipelines.len());
    for pipeline in pipelines {
        // Static pipelines pass through untouched.
        if !pipeline.is_dynamic() {
            output.push(pipeline.clone());
            continue;
        }

        let resolved_data = match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await
        {
            Ok(data) => data,
            Err(e) => {
                return Err(format!(
                    "Failed to resolve dynamic pipeline '{}': {e}",
                    pipeline.name
                ));
            }
        };

        let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
        sources::include::expand_includes(&mut expanded, &resolved_data, allow_remote_include)?;
        output.push(expanded);
    }
    Ok(output)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::pipeline_deprecation::reset_inline_sources_dedup_for_tests;
use std::sync::Mutex;
// Serialises the tests that reset and inspect the shared inline-sources
// dedup set, so they do not interfere with each other when run in parallel.
static DEDUP_TEST_GUARD: Mutex<()> = Mutex::new(());
// Minimal Sigma detection rule written to disk by the tests below.
const RULE_YAML: &str = r#"
title: TestRule
id: 11111111-1111-1111-1111-111111111111
status: experimental
logsource:
product: test
detection:
selection:
EventID: 1
condition: selection
"#;
// Pipeline fixture that declares inline `sources`; used to trigger the
// inline-sources deprecation warning path in `reload_pipelines`.
const PIPELINE_WITH_SOURCES: &str = r#"
name: legacy_pipeline_with_inline_sources
priority: 50
sources:
- id: threat_feed
type: file
path: /tmp/does-not-matter.json
format: json
transformations:
- type: value_placeholders
"#;
// Pipeline fixture without a `sources` section; must not trigger the warning.
const PIPELINE_NO_SOURCES: &str = r#"
name: simple_pipeline
priority: 10
transformations:
- id: rename
type: field_name_mapping
mapping:
EventID: event.id
"#;
// Returns whether the deprecation dedup snapshot contains `path`, looked up
// by its canonical form (falling back to the path as given when
// canonicalisation fails).
fn dedup_set_contains(path: &Path) -> bool {
    let lookup_key = match path.canonicalize() {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    };
    let recorded = crate::pipeline_deprecation::tests_only_snapshot();
    recorded.contains(&lookup_key)
}
// Reloading a pipeline file that declares inline sources through
// `RuntimeEngine::load_rules` must record that file in the dedup set.
#[test]
fn load_rules_surfaces_inline_sources_deprecation_through_runtime() {
    // A poisoned guard only means another test panicked while holding it.
    // The shared set is reset right below, so recover the lock instead of
    // turning that unrelated failure into a failure of this test.
    let _guard = DEDUP_TEST_GUARD
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    reset_inline_sources_dedup_for_tests();

    let dir = tempfile::tempdir().unwrap();
    let rule_path = dir.path().join("rule.yml");
    std::fs::write(&rule_path, RULE_YAML).unwrap();
    let pipeline_path = dir.path().join("pipeline.yml");
    std::fs::write(&pipeline_path, PIPELINE_WITH_SOURCES).unwrap();
    let pipeline = parse_pipeline_file(&pipeline_path).unwrap();

    let mut engine = RuntimeEngine::new(
        rule_path,
        vec![pipeline],
        CorrelationConfig::default(),
        false,
    );
    // Registering the path makes `load_rules` re-parse the file from disk.
    engine.set_pipeline_paths(vec![pipeline_path.clone()]);
    engine.load_rules().unwrap();

    assert!(
        dedup_set_contains(&pipeline_path),
        "RuntimeEngine::load_rules should route inline sources through \
         warn_pipeline_inline_sources so the daemon hot-reload path \
         covers the deprecation; the canonical pipeline path was not \
         recorded in the dedup set."
    );
}
// A pipeline file without inline sources must not be recorded in the
// dedup set when it is reloaded through `RuntimeEngine::load_rules`.
#[test]
fn load_rules_does_not_warn_when_pipeline_has_no_inline_sources() {
    // A poisoned guard only means another test panicked while holding it.
    // The shared set is reset right below, so recover the lock instead of
    // turning that unrelated failure into a failure of this test.
    let _guard = DEDUP_TEST_GUARD
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    reset_inline_sources_dedup_for_tests();

    let dir = tempfile::tempdir().unwrap();
    let rule_path = dir.path().join("rule.yml");
    std::fs::write(&rule_path, RULE_YAML).unwrap();
    let pipeline_path = dir.path().join("clean.yml");
    std::fs::write(&pipeline_path, PIPELINE_NO_SOURCES).unwrap();
    let pipeline = parse_pipeline_file(&pipeline_path).unwrap();

    let mut engine = RuntimeEngine::new(
        rule_path,
        vec![pipeline],
        CorrelationConfig::default(),
        false,
    );
    engine.set_pipeline_paths(vec![pipeline_path.clone()]);
    engine.load_rules().unwrap();

    assert!(
        !dedup_set_contains(&pipeline_path),
        "a pipeline without inline sources must not register in the \
         deprecation dedup set."
    );
}
// Calling `load_rules` twice for the same pipeline file must leave the
// dedup set unchanged after the second call.
#[test]
fn hot_reload_dedups_inline_sources_warning_for_same_pipeline_path() {
    // A poisoned guard only means another test panicked while holding it.
    // The shared set is reset right below, so recover the lock instead of
    // turning that unrelated failure into a failure of this test.
    let _guard = DEDUP_TEST_GUARD
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    reset_inline_sources_dedup_for_tests();

    let dir = tempfile::tempdir().unwrap();
    let rule_path = dir.path().join("rule.yml");
    std::fs::write(&rule_path, RULE_YAML).unwrap();
    let pipeline_path = dir.path().join("pipeline.yml");
    std::fs::write(&pipeline_path, PIPELINE_WITH_SOURCES).unwrap();
    let pipeline = parse_pipeline_file(&pipeline_path).unwrap();

    let mut engine = RuntimeEngine::new(
        rule_path,
        vec![pipeline],
        CorrelationConfig::default(),
        false,
    );
    engine.set_pipeline_paths(vec![pipeline_path.clone()]);

    // First load records the pipeline path.
    engine.load_rules().unwrap();
    assert!(dedup_set_contains(&pipeline_path));
    let canonical = pipeline_path.canonicalize().unwrap();

    // Second load (the hot-reload case) must not alter the set.
    let before = crate::pipeline_deprecation::tests_only_snapshot();
    engine.load_rules().unwrap();
    let after = crate::pipeline_deprecation::tests_only_snapshot();
    assert_eq!(
        before, after,
        "second load_rules should not change the dedup set",
    );
    assert!(after.contains(&canonical));
}
// A dynamic pipeline whose file source does not exist must make
// `load_rules` return an error instead of loading rules with unresolved
// sources. Runs on the multi-thread flavor because `load_rules` uses
// `tokio::task::block_in_place`.
#[tokio::test(flavor = "multi_thread")]
async fn load_rules_fails_closed_when_dynamic_source_resolution_fails() {
    let workspace = tempfile::tempdir().unwrap();
    let root = workspace.path();

    let rules_file = root.join("rule.yml");
    std::fs::write(&rules_file, RULE_YAML).unwrap();

    // This file is never created, so resolving the source has to fail.
    let absent_feed = root.join("missing.json");
    let pipeline_file = root.join("pipeline.yml");
    let yaml = format!(
        r#"
name: dynamic_missing
priority: 10
sources:
- id: feed
type: file
path: {}
format: json
on_error: fail
transformations:
- type: value_placeholders
"#,
        absent_feed.display(),
    );
    std::fs::write(&pipeline_file, yaml).unwrap();

    let dynamic_pipeline = parse_pipeline_file(&pipeline_file).unwrap();
    assert!(
        dynamic_pipeline.is_dynamic(),
        "fixture should produce a dynamic pipeline"
    );

    let mut runtime = RuntimeEngine::new(
        rules_file,
        vec![dynamic_pipeline],
        CorrelationConfig::default(),
        false,
    );
    runtime.set_source_resolver(Arc::new(sources::DefaultSourceResolver::new()));

    let err = runtime
        .load_rules()
        .expect_err("missing source must cause load_rules to fail closed");
    assert!(
        err.contains("Dynamic source resolution failed"),
        "error should explain the fail-closed path; got: {err}"
    );
}
}