1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use arc_swap::ArcSwap;
5use rsigma_eval::event::Event;
6use rsigma_eval::{
7 CorrelationConfig, CorrelationEngine, CorrelationSnapshot, Engine, Pipeline, ProcessResult,
8 RuleFieldSet, parse_pipeline_file,
9};
10use rsigma_parser::SigmaCollection;
11
12use crate::sources::{self, SourceResolver, TemplateExpander};
13
14pub struct RuntimeEngine {
17 engine: EngineVariant,
18 pipelines: Vec<Pipeline>,
19 pipeline_paths: Vec<PathBuf>,
20 rules_path: std::path::PathBuf,
21 corr_config: CorrelationConfig,
22 include_event: bool,
23 source_resolver: Option<Arc<dyn SourceResolver>>,
24 allow_remote_include: bool,
25 bloom_prefilter: bool,
28 bloom_max_bytes: Option<usize>,
31 #[cfg(feature = "daachorse-index")]
35 cross_rule_ac: bool,
36 rule_field_set: Arc<ArcSwap<RuleFieldSet>>,
41}
42
/// The concrete engine currently backing a `RuntimeEngine`.
///
/// Both payloads are boxed, so the enum itself stays small regardless of the
/// size of the engine types.
enum EngineVariant {
    /// Plain detection engine; selected when the loaded collection has no correlations.
    DetectionOnly(Box<Engine>),
    /// Correlation-capable engine; selected when the loaded collection has correlations.
    WithCorrelations(Box<CorrelationEngine>),
}
47
/// Rule and state counters reported by the runtime engine.
///
/// `PartialEq`/`Eq` allow callers and tests to compare two snapshots directly;
/// `Default` yields the all-zero value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EngineStats {
    /// Number of detection rules in the active engine.
    pub detection_rules: usize,
    /// Number of correlation rules in the active engine (zero for a detection-only engine).
    pub correlation_rules: usize,
    /// Number of correlation state entries (zero for a detection-only engine).
    pub state_entries: usize,
}
55
56impl RuntimeEngine {
57 pub fn new(
58 rules_path: std::path::PathBuf,
59 pipelines: Vec<Pipeline>,
60 corr_config: CorrelationConfig,
61 include_event: bool,
62 ) -> Self {
63 RuntimeEngine {
64 engine: EngineVariant::DetectionOnly(Box::new(Engine::new())),
65 pipelines,
66 pipeline_paths: Vec::new(),
67 rules_path,
68 corr_config,
69 include_event,
70 source_resolver: None,
71 allow_remote_include: false,
72 bloom_prefilter: false,
73 bloom_max_bytes: None,
74 #[cfg(feature = "daachorse-index")]
75 cross_rule_ac: false,
76 rule_field_set: Arc::new(ArcSwap::from_pointee(RuleFieldSet::default())),
77 }
78 }
79
80 pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
88 self.rule_field_set.load_full()
89 }
90
91 pub fn set_bloom_prefilter(&mut self, enabled: bool) {
95 self.bloom_prefilter = enabled;
96 }
97
98 pub fn bloom_prefilter(&self) -> bool {
103 self.bloom_prefilter
104 }
105
106 pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
109 self.bloom_max_bytes = Some(max_bytes);
110 }
111
112 pub fn bloom_max_bytes(&self) -> Option<usize> {
114 self.bloom_max_bytes
115 }
116
117 #[cfg(feature = "daachorse-index")]
124 pub fn set_cross_rule_ac(&mut self, enabled: bool) {
125 self.cross_rule_ac = enabled;
126 }
127
128 #[cfg(feature = "daachorse-index")]
130 pub fn cross_rule_ac(&self) -> bool {
131 self.cross_rule_ac
132 }
133
134 pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
139 self.source_resolver = Some(resolver);
140 }
141
142 pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
144 self.source_resolver.as_ref()
145 }
146
147 pub fn set_allow_remote_include(&mut self, allow: bool) {
149 self.allow_remote_include = allow;
150 }
151
152 pub fn allow_remote_include(&self) -> bool {
154 self.allow_remote_include
155 }
156
157 pub fn set_pipeline_paths(&mut self, paths: Vec<PathBuf>) {
163 self.pipeline_paths = paths;
164 }
165
166 pub fn pipeline_paths(&self) -> &[PathBuf] {
168 &self.pipeline_paths
169 }
170
171 pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
177 let Some(resolver) = &self.source_resolver else {
178 return Ok(());
179 };
180
181 let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
182 for pipeline in &self.pipelines {
183 if pipeline.is_dynamic() {
184 match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
185 Ok(resolved_data) => {
186 let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
187 sources::include::expand_includes(
189 &mut expanded,
190 &resolved_data,
191 self.allow_remote_include,
192 )?;
193 resolved_pipelines.push(expanded);
194 }
195 Err(e) => {
196 return Err(format!(
197 "Failed to resolve dynamic pipeline '{}': {e}",
198 pipeline.name
199 ));
200 }
201 }
202 } else {
203 resolved_pipelines.push(pipeline.clone());
204 }
205 }
206 self.pipelines = resolved_pipelines;
207 Ok(())
208 }
209
210 pub fn load_rules(&mut self) -> Result<EngineStats, String> {
223 let load_span = tracing::info_span!("load_rules", rules_path = %self.rules_path.display());
224 let _enter = load_span.enter();
225 let load_start = std::time::Instant::now();
226
227 if !self.pipeline_paths.is_empty() {
228 self.pipelines = reload_pipelines(&self.pipeline_paths)?;
229 }
230
231 if self.source_resolver.is_some() && self.pipelines.iter().any(|p| p.is_dynamic()) {
240 let handle = tokio::runtime::Handle::try_current().map_err(|_| {
241 "Dynamic pipelines require a tokio runtime; refusing to load rules with \
242 unresolved sources"
243 .to_string()
244 })?;
245 let pipelines = std::mem::take(&mut self.pipelines);
246 let resolver = self.source_resolver.clone().unwrap();
247 let allow_remote = self.allow_remote_include;
248 let resolved = tokio::task::block_in_place(|| {
249 handle.block_on(async {
250 resolve_pipelines_async(&resolver, &pipelines, allow_remote).await
251 })
252 });
253 match resolved {
254 Ok(p) => self.pipelines = p,
255 Err(e) => {
256 self.pipelines = pipelines;
260 return Err(format!("Dynamic source resolution failed: {e}"));
261 }
262 }
263 }
264
265 let previous_state = self.export_state();
266 let collection = load_collection(&self.rules_path)?;
267 let has_correlations = !collection.correlations.is_empty();
268
269 if has_correlations {
270 let mut engine = CorrelationEngine::new(self.corr_config.clone());
271 engine.set_include_event(self.include_event);
272 if let Some(budget) = self.bloom_max_bytes {
273 engine.set_bloom_max_bytes(budget);
274 }
275 engine.set_bloom_prefilter(self.bloom_prefilter);
276 #[cfg(feature = "daachorse-index")]
277 engine.set_cross_rule_ac(self.cross_rule_ac);
278 for p in &self.pipelines {
279 engine.add_pipeline(p.clone());
280 }
281 engine
282 .add_collection(&collection)
283 .map_err(|e| format!("Error compiling rules: {e}"))?;
284
285 if let Some(snapshot) = previous_state {
286 engine.import_state(snapshot);
287 }
288
289 let stats = EngineStats {
290 detection_rules: engine.detection_rule_count(),
291 correlation_rules: engine.correlation_rule_count(),
292 state_entries: engine.state_count(),
293 };
294 self.engine = EngineVariant::WithCorrelations(Box::new(engine));
295 self.refresh_rule_field_set(&collection);
296 tracing::debug!(
297 detection_rules = stats.detection_rules,
298 correlation_rules = stats.correlation_rules,
299 duration_ms = load_start.elapsed().as_millis() as u64,
300 "Rule load complete",
301 );
302 Ok(stats)
303 } else {
304 let mut engine = Engine::new();
305 engine.set_include_event(self.include_event);
306 if let Some(budget) = self.bloom_max_bytes {
307 engine.set_bloom_max_bytes(budget);
308 }
309 engine.set_bloom_prefilter(self.bloom_prefilter);
310 #[cfg(feature = "daachorse-index")]
311 engine.set_cross_rule_ac(self.cross_rule_ac);
312 for p in &self.pipelines {
313 engine.add_pipeline(p.clone());
314 }
315 engine
316 .add_collection(&collection)
317 .map_err(|e| format!("Error compiling rules: {e}"))?;
318
319 let stats = EngineStats {
320 detection_rules: engine.rule_count(),
321 correlation_rules: 0,
322 state_entries: 0,
323 };
324 self.engine = EngineVariant::DetectionOnly(Box::new(engine));
325 self.refresh_rule_field_set(&collection);
326 tracing::debug!(
327 detection_rules = stats.detection_rules,
328 correlation_rules = stats.correlation_rules,
329 duration_ms = load_start.elapsed().as_millis() as u64,
330 "Rule load complete",
331 );
332 Ok(stats)
333 }
334 }
335
336 fn refresh_rule_field_set(&self, collection: &SigmaCollection) {
339 let field_set = RuleFieldSet::collect(collection, &self.pipelines, true);
340 self.rule_field_set.store(Arc::new(field_set));
341 }
342
343 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
348 match &mut self.engine {
349 EngineVariant::DetectionOnly(engine) => engine.evaluate_batch(events),
350 EngineVariant::WithCorrelations(engine) => engine.process_batch(events),
351 }
352 }
353
354 pub fn stats(&self) -> EngineStats {
356 match &self.engine {
357 EngineVariant::DetectionOnly(engine) => EngineStats {
358 detection_rules: engine.rule_count(),
359 correlation_rules: 0,
360 state_entries: 0,
361 },
362 EngineVariant::WithCorrelations(engine) => EngineStats {
363 detection_rules: engine.detection_rule_count(),
364 correlation_rules: engine.correlation_rule_count(),
365 state_entries: engine.state_count(),
366 },
367 }
368 }
369
370 pub fn rules_path(&self) -> &Path {
372 &self.rules_path
373 }
374
375 pub fn pipelines(&self) -> &[Pipeline] {
377 &self.pipelines
378 }
379
380 pub fn corr_config(&self) -> &CorrelationConfig {
382 &self.corr_config
383 }
384
385 pub fn include_event(&self) -> bool {
387 self.include_event
388 }
389
390 pub fn export_state(&self) -> Option<CorrelationSnapshot> {
393 match &self.engine {
394 EngineVariant::DetectionOnly(_) => None,
395 EngineVariant::WithCorrelations(engine) => Some(engine.export_state()),
396 }
397 }
398
399 pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
403 if let EngineVariant::WithCorrelations(engine) = &mut self.engine {
404 engine.import_state(snapshot.clone())
405 } else {
406 true
407 }
408 }
409}
410
411fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
412 let collection = if path.is_dir() {
413 rsigma_parser::parse_sigma_directory(path)
414 .map_err(|e| format!("Error loading rules from {}: {e}", path.display()))?
415 } else {
416 rsigma_parser::parse_sigma_file(path)
417 .map_err(|e| format!("Error loading rule {}: {e}", path.display()))?
418 };
419
420 if !collection.errors.is_empty() {
421 tracing::warn!(
422 count = collection.errors.len(),
423 "Parse errors while loading rules"
424 );
425 for (i, err) in collection.errors.iter().take(3).enumerate() {
426 tracing::warn!(index = i + 1, error = %err, "Rule parse error detail");
427 }
428 }
429
430 Ok(collection)
431}
432
433fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
443 let mut pipelines = Vec::with_capacity(paths.len());
444 for path in paths {
445 let pipeline = parse_pipeline_file(path)
446 .map_err(|e| format!("Error reloading pipeline {}: {e}", path.display()))?;
447 if !pipeline.sources.is_empty() {
448 crate::warn_pipeline_inline_sources(path, &pipeline.name);
449 }
450 pipelines.push(pipeline);
451 }
452 pipelines.sort_by_key(|p| p.priority);
453 Ok(pipelines)
454}
455
456async fn resolve_pipelines_async(
458 resolver: &Arc<dyn SourceResolver>,
459 pipelines: &[Pipeline],
460 allow_remote_include: bool,
461) -> Result<Vec<Pipeline>, String> {
462 let mut resolved_pipelines = Vec::with_capacity(pipelines.len());
463 for pipeline in pipelines {
464 if pipeline.is_dynamic() {
465 let resolved_data = sources::resolve_all(resolver.as_ref(), &pipeline.sources)
466 .await
467 .map_err(|e| {
468 format!(
469 "Failed to resolve dynamic pipeline '{}': {e}",
470 pipeline.name
471 )
472 })?;
473 let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
474 sources::include::expand_includes(&mut expanded, &resolved_data, allow_remote_include)?;
475 resolved_pipelines.push(expanded);
476 } else {
477 resolved_pipelines.push(pipeline.clone());
478 }
479 }
480 Ok(resolved_pipelines)
481}
482
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_deprecation::reset_inline_sources_dedup_for_tests;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes the tests that reset and inspect the inline-sources dedup set.
    static DEDUP_TEST_GUARD: Mutex<()> = Mutex::new(());

    /// Minimal Sigma rule used by every test in this module.
    const RULE_YAML: &str = r#"
title: TestRule
id: 11111111-1111-1111-1111-111111111111
status: experimental
logsource:
    product: test
detection:
    selection:
        EventID: 1
    condition: selection
"#;

    /// Pipeline that declares an inline `sources` section.
    const PIPELINE_WITH_SOURCES: &str = r#"
name: legacy_pipeline_with_inline_sources
priority: 50
sources:
  - id: threat_feed
    type: file
    path: /tmp/does-not-matter.json
    format: json
transformations:
  - type: value_placeholders
"#;

    /// Pipeline without any `sources` section.
    const PIPELINE_NO_SOURCES: &str = r#"
name: simple_pipeline
priority: 10
transformations:
  - id: rename
    type: field_name_mapping
    mapping:
      EventID: event.id
"#;

    /// Acquires the dedup guard even if an earlier test panicked while holding it.
    ///
    /// The mutex protects no data, so a poisoned lock is safe to reuse; without
    /// this, one failing test would fail every other guarded test as well.
    fn lock_dedup_guard() -> MutexGuard<'static, ()> {
        DEDUP_TEST_GUARD
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Reports whether the canonical form of `path` is in the dedup set.
    fn dedup_set_contains(path: &Path) -> bool {
        let canonical = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
        crate::pipeline_deprecation::tests_only_snapshot().contains(&canonical)
    }

    /// Writes the rule and the given pipeline into `dir` and builds an engine
    /// seeded with the parsed pipeline. Returns the engine and the pipeline path.
    fn engine_with_pipeline(
        dir: &Path,
        pipeline_file: &str,
        pipeline_yaml: &str,
    ) -> (RuntimeEngine, PathBuf) {
        let rule_path = dir.join("rule.yml");
        std::fs::write(&rule_path, RULE_YAML).unwrap();

        let pipeline_path = dir.join(pipeline_file);
        std::fs::write(&pipeline_path, pipeline_yaml).unwrap();
        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();

        let engine = RuntimeEngine::new(
            rule_path,
            vec![pipeline],
            CorrelationConfig::default(),
            false,
        );
        (engine, pipeline_path)
    }

    #[test]
    fn load_rules_surfaces_inline_sources_deprecation_through_runtime() {
        let _guard = lock_dedup_guard();
        reset_inline_sources_dedup_for_tests();

        let dir = tempfile::tempdir().unwrap();
        let (mut engine, pipeline_path) =
            engine_with_pipeline(dir.path(), "pipeline.yml", PIPELINE_WITH_SOURCES);
        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
        engine.load_rules().unwrap();

        assert!(
            dedup_set_contains(&pipeline_path),
            "RuntimeEngine::load_rules should route inline sources through \
             warn_pipeline_inline_sources so the daemon hot-reload path \
             covers the deprecation; the canonical pipeline path was not \
             recorded in the dedup set."
        );
    }

    #[test]
    fn load_rules_does_not_warn_when_pipeline_has_no_inline_sources() {
        let _guard = lock_dedup_guard();
        reset_inline_sources_dedup_for_tests();

        let dir = tempfile::tempdir().unwrap();
        let (mut engine, pipeline_path) =
            engine_with_pipeline(dir.path(), "clean.yml", PIPELINE_NO_SOURCES);
        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
        engine.load_rules().unwrap();

        assert!(
            !dedup_set_contains(&pipeline_path),
            "a pipeline without inline sources must not register in the \
             deprecation dedup set."
        );
    }

    #[test]
    fn hot_reload_dedups_inline_sources_warning_for_same_pipeline_path() {
        let _guard = lock_dedup_guard();
        reset_inline_sources_dedup_for_tests();

        let dir = tempfile::tempdir().unwrap();
        let (mut engine, pipeline_path) =
            engine_with_pipeline(dir.path(), "pipeline.yml", PIPELINE_WITH_SOURCES);
        engine.set_pipeline_paths(vec![pipeline_path.clone()]);

        // First load records the pipeline path.
        engine.load_rules().unwrap();
        assert!(dedup_set_contains(&pipeline_path));

        // A second load of the same path must leave the set unchanged.
        let canonical = pipeline_path.canonicalize().unwrap();
        let before = crate::pipeline_deprecation::tests_only_snapshot();
        engine.load_rules().unwrap();
        let after = crate::pipeline_deprecation::tests_only_snapshot();

        assert_eq!(
            before, after,
            "second load_rules should not change the dedup set",
        );
        assert!(after.contains(&canonical));
    }

    // Multi-thread flavor: `load_rules` resolves dynamic sources through
    // `tokio::task::block_in_place`.
    #[tokio::test(flavor = "multi_thread")]
    async fn load_rules_fails_closed_when_dynamic_source_resolution_fails() {
        let dir = tempfile::tempdir().unwrap();

        // The source file is never created, so resolution has to fail.
        let missing = dir.path().join("missing.json");
        let pipeline_yaml = format!(
            r#"
name: dynamic_missing
priority: 10
sources:
  - id: feed
    type: file
    path: {}
    format: json
    on_error: fail
transformations:
  - type: value_placeholders
"#,
            missing.display(),
        );
        let (mut engine, _pipeline_path) =
            engine_with_pipeline(dir.path(), "pipeline.yml", &pipeline_yaml);
        assert!(
            engine.pipelines().iter().any(|p| p.is_dynamic()),
            "fixture should produce a dynamic pipeline"
        );
        engine.set_source_resolver(Arc::new(sources::DefaultSourceResolver::new()));

        let err = engine
            .load_rules()
            .expect_err("missing source must cause load_rules to fail closed");
        assert!(
            err.contains("Dynamic source resolution failed"),
            "error should explain the fail-closed path; got: {err}"
        );
    }
}