Skip to main content

rsigma_runtime/
engine.rs

1use std::path::Path;
2
3use rsigma_eval::event::Event;
4use rsigma_eval::{
5    CorrelationConfig, CorrelationEngine, CorrelationSnapshot, Engine, Pipeline, ProcessResult,
6};
7use rsigma_parser::SigmaCollection;
8
9/// Wraps a CorrelationEngine (or a plain Engine) and provides the interface
10/// the runtime needs: process events, reload rules, and query state.
11pub struct RuntimeEngine {
12    engine: EngineVariant,
13    pipelines: Vec<Pipeline>,
14    rules_path: std::path::PathBuf,
15    corr_config: CorrelationConfig,
16    include_event: bool,
17}
18
19enum EngineVariant {
20    DetectionOnly(Engine),
21    WithCorrelations(Box<CorrelationEngine>),
22}
23
/// Summary statistics about the loaded engine state.
///
/// A plain value type: it can be copied, compared, and debug-printed, and
/// `EngineStats::default()` yields all-zero counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EngineStats {
    /// Number of detection rules reported by the engine.
    pub detection_rules: usize,
    /// Number of correlation rules reported by the engine (0 when detection-only).
    pub correlation_rules: usize,
    /// Number of correlation state entries reported by the engine (0 when detection-only).
    pub state_entries: usize,
}
30
31impl RuntimeEngine {
32    pub fn new(
33        rules_path: std::path::PathBuf,
34        pipelines: Vec<Pipeline>,
35        corr_config: CorrelationConfig,
36        include_event: bool,
37    ) -> Self {
38        RuntimeEngine {
39            engine: EngineVariant::DetectionOnly(Engine::new()),
40            pipelines,
41            rules_path,
42            corr_config,
43            include_event,
44        }
45    }
46
47    /// Load (or reload) rules from the configured path.
48    ///
49    /// On reload, correlation state is exported before replacing the engine
50    /// and re-imported after, so in-flight windows and suppression state
51    /// survive rule changes (entries for removed correlations are dropped).
52    pub fn load_rules(&mut self) -> Result<EngineStats, String> {
53        let previous_state = self.export_state();
54        let collection = load_collection(&self.rules_path)?;
55        let has_correlations = !collection.correlations.is_empty();
56
57        if has_correlations {
58            let mut engine = CorrelationEngine::new(self.corr_config.clone());
59            engine.set_include_event(self.include_event);
60            for p in &self.pipelines {
61                engine.add_pipeline(p.clone());
62            }
63            engine
64                .add_collection(&collection)
65                .map_err(|e| format!("Error compiling rules: {e}"))?;
66
67            if let Some(snapshot) = previous_state {
68                engine.import_state(snapshot);
69            }
70
71            let stats = EngineStats {
72                detection_rules: engine.detection_rule_count(),
73                correlation_rules: engine.correlation_rule_count(),
74                state_entries: engine.state_count(),
75            };
76            self.engine = EngineVariant::WithCorrelations(Box::new(engine));
77            Ok(stats)
78        } else {
79            let mut engine = Engine::new();
80            engine.set_include_event(self.include_event);
81            for p in &self.pipelines {
82                engine.add_pipeline(p.clone());
83            }
84            engine
85                .add_collection(&collection)
86                .map_err(|e| format!("Error compiling rules: {e}"))?;
87
88            let stats = EngineStats {
89                detection_rules: engine.rule_count(),
90                correlation_rules: 0,
91                state_entries: 0,
92            };
93            self.engine = EngineVariant::DetectionOnly(engine);
94            Ok(stats)
95        }
96    }
97
98    /// Process a batch of events using parallel detection + sequential correlation.
99    ///
100    /// Delegates to `Engine::evaluate_batch` or `CorrelationEngine::process_batch`
101    /// depending on whether correlation rules are loaded.
102    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
103        match &mut self.engine {
104            EngineVariant::DetectionOnly(engine) => {
105                let batch_detections = engine.evaluate_batch(events);
106                batch_detections
107                    .into_iter()
108                    .map(|detections| ProcessResult {
109                        detections,
110                        correlations: vec![],
111                    })
112                    .collect()
113            }
114            EngineVariant::WithCorrelations(engine) => engine.process_batch(events),
115        }
116    }
117
118    /// Return summary statistics about the current engine state.
119    pub fn stats(&self) -> EngineStats {
120        match &self.engine {
121            EngineVariant::DetectionOnly(engine) => EngineStats {
122                detection_rules: engine.rule_count(),
123                correlation_rules: 0,
124                state_entries: 0,
125            },
126            EngineVariant::WithCorrelations(engine) => EngineStats {
127                detection_rules: engine.detection_rule_count(),
128                correlation_rules: engine.correlation_rule_count(),
129                state_entries: engine.state_count(),
130            },
131        }
132    }
133
134    /// Return the path from which rules are loaded.
135    pub fn rules_path(&self) -> &Path {
136        &self.rules_path
137    }
138
139    /// Return the configured processing pipelines.
140    pub fn pipelines(&self) -> &[Pipeline] {
141        &self.pipelines
142    }
143
144    /// Return the correlation configuration.
145    pub fn corr_config(&self) -> &CorrelationConfig {
146        &self.corr_config
147    }
148
149    /// Whether detection results include the matched event.
150    pub fn include_event(&self) -> bool {
151        self.include_event
152    }
153
154    /// Export correlation state as a serializable snapshot.
155    /// Returns `None` if the engine is detection-only (no correlation state to persist).
156    pub fn export_state(&self) -> Option<CorrelationSnapshot> {
157        match &self.engine {
158            EngineVariant::DetectionOnly(_) => None,
159            EngineVariant::WithCorrelations(engine) => Some(engine.export_state()),
160        }
161    }
162
163    /// Import previously exported correlation state.
164    /// Returns `true` if the import succeeded, `false` if the snapshot version
165    /// is incompatible. No-op (returns `true`) if the engine is detection-only.
166    pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
167        if let EngineVariant::WithCorrelations(engine) = &mut self.engine {
168            engine.import_state(snapshot.clone())
169        } else {
170            true
171        }
172    }
173}
174
175fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
176    let collection = if path.is_dir() {
177        rsigma_parser::parse_sigma_directory(path)
178            .map_err(|e| format!("Error loading rules from {}: {e}", path.display()))?
179    } else {
180        rsigma_parser::parse_sigma_file(path)
181            .map_err(|e| format!("Error loading rule {}: {e}", path.display()))?
182    };
183
184    if !collection.errors.is_empty() {
185        tracing::warn!(
186            count = collection.errors.len(),
187            "Parse errors while loading rules"
188        );
189    }
190
191    Ok(collection)
192}