1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
use std::path::{Path, PathBuf};
use std::sync::Arc;
use rsigma_eval::event::Event;
use rsigma_eval::{
CorrelationConfig, CorrelationEngine, CorrelationSnapshot, Engine, Pipeline, ProcessResult,
parse_pipeline_file,
};
use rsigma_parser::SigmaCollection;
use crate::sources::{self, SourceResolver, TemplateExpander};
/// Runtime-facing wrapper around either a plain `Engine` or a
/// `CorrelationEngine`.
///
/// It remembers everything needed to rebuild the inner engine from scratch
/// (rules location, pipelines, tuning flags) so that `load_rules()` can be
/// called repeatedly, and exposes event processing plus state queries.
pub struct RuntimeEngine {
    /// The engine that is currently active.
    engine: EngineVariant,
    /// Pipelines added to the inner engine on every `load_rules()`.
    pipelines: Vec<Pipeline>,
    /// Pipeline files; when non-empty, `load_rules()` re-reads them from disk.
    pipeline_paths: Vec<PathBuf>,
    /// File or directory the rules are parsed from.
    rules_path: PathBuf,
    /// Configuration cloned into each newly built `CorrelationEngine`.
    corr_config: CorrelationConfig,
    /// Forwarded to the inner engine via `set_include_event` on every load.
    include_event: bool,
    /// Resolver for dynamic pipeline sources; `None` disables resolution.
    source_resolver: Option<Arc<dyn SourceResolver>>,
    /// Passed to include expansion when dynamic pipelines are resolved.
    allow_remote_include: bool,
    /// Opt-in bloom-filter pre-filtering of positive substring matchers.
    /// Handed to the inner detection engine each time rules are (re)loaded.
    bloom_prefilter: bool,
    /// Override for the bloom memory budget, in bytes. With `None` the
    /// eval crate default is left in place.
    bloom_max_bytes: Option<usize>,
    /// Opt-in cross-rule Aho-Corasick pre-filter. Handed to the inner
    /// detection engine each time rules are (re)loaded. Only present when
    /// the `daachorse-index` Cargo feature is enabled.
    #[cfg(feature = "daachorse-index")]
    cross_rule_ac: bool,
}
/// The engine currently backing a `RuntimeEngine`.
///
/// `RuntimeEngine::new` starts out with `DetectionOnly`; `load_rules()` picks
/// the variant again on every (re)load, based on whether the parsed
/// collection contains any correlations.
enum EngineVariant {
/// Plain detection engine, used when no correlations are loaded.
DetectionOnly(Box<Engine>),
/// Correlation-capable engine, used when the collection has correlations.
WithCorrelations(Box<CorrelationEngine>),
}
/// Summary statistics about the loaded engine state.
///
/// Plain counters only, so the type is cheap to copy, compare and log.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct EngineStats {
    /// Number of detection rules held by the active engine.
    pub detection_rules: usize,
    /// Number of correlation rules; `0` for a detection-only engine.
    pub correlation_rules: usize,
    /// Number of correlation state entries; `0` for a detection-only engine.
    pub state_entries: usize,
}
impl RuntimeEngine {
pub fn new(
rules_path: std::path::PathBuf,
pipelines: Vec<Pipeline>,
corr_config: CorrelationConfig,
include_event: bool,
) -> Self {
RuntimeEngine {
engine: EngineVariant::DetectionOnly(Box::new(Engine::new())),
pipelines,
pipeline_paths: Vec::new(),
rules_path,
corr_config,
include_event,
source_resolver: None,
allow_remote_include: false,
bloom_prefilter: false,
bloom_max_bytes: None,
#[cfg(feature = "daachorse-index")]
cross_rule_ac: false,
}
}
/// Enable or disable bloom-filter pre-filtering on the inner detection
/// engine. Off by default. Applies on the next `load_rules()`; pre-load
/// callers should set this before calling `load_rules()`.
pub fn set_bloom_prefilter(&mut self, enabled: bool) {
self.bloom_prefilter = enabled;
}
/// Override the bloom memory budget on the inner detection engine.
/// Applies on the next `load_rules()`.
pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
self.bloom_max_bytes = Some(max_bytes);
}
/// Enable or disable the cross-rule Aho-Corasick pre-filter on the
/// inner detection engine. Off by default; the optimization helps only
/// on substring-heavy rule sets > ~5K rules. Applies on the next
/// `load_rules()`.
///
/// Available behind the `daachorse-index` Cargo feature.
#[cfg(feature = "daachorse-index")]
pub fn set_cross_rule_ac(&mut self, enabled: bool) {
self.cross_rule_ac = enabled;
}
/// Set a source resolver for dynamic pipeline sources.
///
/// When set, `load_rules()` resolves dynamic sources and expands
/// `${source.*}` templates before compiling rules.
pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
self.source_resolver = Some(resolver);
}
/// Get the source resolver, if one is configured.
pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
self.source_resolver.as_ref()
}
/// Allow `include` directives to reference HTTP/NATS sources.
pub fn set_allow_remote_include(&mut self, allow: bool) {
self.allow_remote_include = allow;
}
/// Whether remote includes are allowed.
pub fn allow_remote_include(&self) -> bool {
self.allow_remote_include
}
/// Set the pipeline file paths used for hot-reload.
///
/// When paths are set, `load_rules()` re-reads pipeline YAML from disk
/// before rebuilding the engine. This enables pipeline hot-reload
/// alongside rule hot-reload.
pub fn set_pipeline_paths(&mut self, paths: Vec<PathBuf>) {
self.pipeline_paths = paths;
}
/// Return the pipeline file paths (used by the daemon to set up watchers).
pub fn pipeline_paths(&self) -> &[PathBuf] {
&self.pipeline_paths
}
/// Resolve dynamic sources in all pipelines and expand templates.
///
/// This is the async entry point for source resolution. Call this before
/// `load_rules()` when you have an async context available, or let
/// `load_rules()` handle it synchronously via `tokio::runtime::Handle`.
pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
let Some(resolver) = &self.source_resolver else {
return Ok(());
};
let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
for pipeline in &self.pipelines {
if pipeline.is_dynamic() {
match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
Ok(resolved_data) => {
let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
// Expand include directives
sources::include::expand_includes(
&mut expanded,
&resolved_data,
self.allow_remote_include,
)?;
resolved_pipelines.push(expanded);
}
Err(e) => {
return Err(format!(
"Failed to resolve dynamic pipeline '{}': {e}",
pipeline.name
));
}
}
} else {
resolved_pipelines.push(pipeline.clone());
}
}
self.pipelines = resolved_pipelines;
Ok(())
}
/// Load (or reload) rules from the configured path.
///
/// On reload, correlation state is exported before replacing the engine
/// and re-imported after, so in-flight windows and suppression state
/// survive rule changes (entries for removed correlations are dropped).
///
/// If pipeline paths are set (via [`set_pipeline_paths`](Self::set_pipeline_paths)),
/// pipelines are re-read from disk before rebuilding the engine. If any
/// pipeline file fails to parse, the entire reload is aborted and the
/// old engine remains active.
///
/// Dynamic pipeline sources are resolved if a source resolver is configured.
pub fn load_rules(&mut self) -> Result<EngineStats, String> {
if !self.pipeline_paths.is_empty() {
self.pipelines = reload_pipelines(&self.pipeline_paths)?;
}
// Resolve dynamic sources if a resolver is set
if self.source_resolver.is_some() && self.pipelines.iter().any(|p| p.is_dynamic()) {
if let Ok(handle) = tokio::runtime::Handle::try_current() {
let pipelines = std::mem::take(&mut self.pipelines);
let resolver = self.source_resolver.clone().unwrap();
let allow_remote = self.allow_remote_include;
let resolved = tokio::task::block_in_place(|| {
handle.block_on(async {
resolve_pipelines_async(&resolver, &pipelines, allow_remote).await
})
});
match resolved {
Ok(p) => self.pipelines = p,
Err(e) => {
self.pipelines = pipelines;
tracing::warn!(error = %e, "Dynamic source resolution failed, using unresolved pipelines");
}
}
} else {
tracing::warn!("No tokio runtime available for dynamic source resolution");
}
}
let previous_state = self.export_state();
let collection = load_collection(&self.rules_path)?;
let has_correlations = !collection.correlations.is_empty();
if has_correlations {
let mut engine = CorrelationEngine::new(self.corr_config.clone());
engine.set_include_event(self.include_event);
if let Some(budget) = self.bloom_max_bytes {
engine.set_bloom_max_bytes(budget);
}
engine.set_bloom_prefilter(self.bloom_prefilter);
#[cfg(feature = "daachorse-index")]
engine.set_cross_rule_ac(self.cross_rule_ac);
for p in &self.pipelines {
engine.add_pipeline(p.clone());
}
engine
.add_collection(&collection)
.map_err(|e| format!("Error compiling rules: {e}"))?;
if let Some(snapshot) = previous_state {
engine.import_state(snapshot);
}
let stats = EngineStats {
detection_rules: engine.detection_rule_count(),
correlation_rules: engine.correlation_rule_count(),
state_entries: engine.state_count(),
};
self.engine = EngineVariant::WithCorrelations(Box::new(engine));
Ok(stats)
} else {
let mut engine = Engine::new();
engine.set_include_event(self.include_event);
if let Some(budget) = self.bloom_max_bytes {
engine.set_bloom_max_bytes(budget);
}
engine.set_bloom_prefilter(self.bloom_prefilter);
#[cfg(feature = "daachorse-index")]
engine.set_cross_rule_ac(self.cross_rule_ac);
for p in &self.pipelines {
engine.add_pipeline(p.clone());
}
engine
.add_collection(&collection)
.map_err(|e| format!("Error compiling rules: {e}"))?;
let stats = EngineStats {
detection_rules: engine.rule_count(),
correlation_rules: 0,
state_entries: 0,
};
self.engine = EngineVariant::DetectionOnly(Box::new(engine));
Ok(stats)
}
}
/// Process a batch of events using parallel detection + sequential correlation.
///
/// Delegates to `Engine::evaluate_batch` or `CorrelationEngine::process_batch`
/// depending on whether correlation rules are loaded.
pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
match &mut self.engine {
EngineVariant::DetectionOnly(engine) => {
let batch_detections = engine.evaluate_batch(events);
batch_detections
.into_iter()
.map(|detections| ProcessResult {
detections,
correlations: vec![],
})
.collect()
}
EngineVariant::WithCorrelations(engine) => engine.process_batch(events),
}
}
/// Return summary statistics about the current engine state.
pub fn stats(&self) -> EngineStats {
match &self.engine {
EngineVariant::DetectionOnly(engine) => EngineStats {
detection_rules: engine.rule_count(),
correlation_rules: 0,
state_entries: 0,
},
EngineVariant::WithCorrelations(engine) => EngineStats {
detection_rules: engine.detection_rule_count(),
correlation_rules: engine.correlation_rule_count(),
state_entries: engine.state_count(),
},
}
}
/// Return the path from which rules are loaded.
pub fn rules_path(&self) -> &Path {
&self.rules_path
}
/// Return the configured processing pipelines.
pub fn pipelines(&self) -> &[Pipeline] {
&self.pipelines
}
/// Return the correlation configuration.
pub fn corr_config(&self) -> &CorrelationConfig {
&self.corr_config
}
/// Whether detection results include the matched event.
pub fn include_event(&self) -> bool {
self.include_event
}
/// Export correlation state as a serializable snapshot.
/// Returns `None` if the engine is detection-only (no correlation state to persist).
pub fn export_state(&self) -> Option<CorrelationSnapshot> {
match &self.engine {
EngineVariant::DetectionOnly(_) => None,
EngineVariant::WithCorrelations(engine) => Some(engine.export_state()),
}
}
/// Import previously exported correlation state.
/// Returns `true` if the import succeeded, `false` if the snapshot version
/// is incompatible. No-op (returns `true`) if the engine is detection-only.
pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
if let EngineVariant::WithCorrelations(engine) = &mut self.engine {
engine.import_state(snapshot.clone())
} else {
true
}
}
}
/// Parse the Sigma rules found at `path`.
///
/// A directory is handed to `parse_sigma_directory`, anything else to
/// `parse_sigma_file`. A parser failure is returned as a message that
/// includes the path. Errors recorded inside an otherwise successful
/// collection are not fatal: only their count is logged as a warning.
fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
    let shown = path.display();
    let parsed = if path.is_dir() {
        rsigma_parser::parse_sigma_directory(path)
            .map_err(|e| format!("Error loading rules from {shown}: {e}"))
    } else {
        rsigma_parser::parse_sigma_file(path)
            .map_err(|e| format!("Error loading rule {shown}: {e}"))
    }?;

    let error_count = parsed.errors.len();
    if error_count > 0 {
        tracing::warn!(count = error_count, "Parse errors while loading rules");
    }
    Ok(parsed)
}
/// Parse every pipeline file in `paths` and return the pipelines ordered by
/// ascending `priority`.
///
/// Stops at the first file that fails to parse and returns a message naming
/// that file.
fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
    let mut loaded = paths
        .iter()
        .map(|path| {
            parse_pipeline_file(path)
                .map_err(|e| format!("Error reloading pipeline {}: {e}", path.display()))
        })
        .collect::<Result<Vec<_>, String>>()?;
    loaded.sort_by_key(|pipeline| pipeline.priority);
    Ok(loaded)
}
/// Produce a resolved copy of `pipelines`, in the same order.
///
/// Non-dynamic pipelines are cloned as-is. For each dynamic pipeline the
/// sources are resolved through `resolver`, templates are expanded against
/// the resolved data, and include directives are expanded (with
/// `allow_remote_include` passed through). The first failure aborts the
/// whole operation and is returned as a message.
async fn resolve_pipelines_async(
    resolver: &Arc<dyn SourceResolver>,
    pipelines: &[Pipeline],
    allow_remote_include: bool,
) -> Result<Vec<Pipeline>, String> {
    let mut output = Vec::with_capacity(pipelines.len());
    for pipeline in pipelines {
        // Static pipelines need no work beyond a copy.
        if !pipeline.is_dynamic() {
            output.push(pipeline.clone());
            continue;
        }

        let data = match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
            Ok(data) => data,
            Err(e) => {
                return Err(format!(
                    "Failed to resolve dynamic pipeline '{}': {e}",
                    pipeline.name
                ));
            }
        };

        let mut expanded = TemplateExpander::expand(pipeline, &data);
        sources::include::expand_includes(&mut expanded, &data, allow_remote_include)?;
        output.push(expanded);
    }
    Ok(output)
}