1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use rsigma_eval::event::Event;
5use rsigma_eval::{
6 CorrelationConfig, CorrelationEngine, CorrelationSnapshot, Engine, Pipeline, ProcessResult,
7 parse_pipeline_file,
8};
9use rsigma_parser::SigmaCollection;
10
11use crate::sources::{self, SourceResolver, TemplateExpander};
12
13pub struct RuntimeEngine {
16 engine: EngineVariant,
17 pipelines: Vec<Pipeline>,
18 pipeline_paths: Vec<PathBuf>,
19 rules_path: std::path::PathBuf,
20 corr_config: CorrelationConfig,
21 include_event: bool,
22 source_resolver: Option<Arc<dyn SourceResolver>>,
23 allow_remote_include: bool,
24}
25
/// The concrete engine backing a `RuntimeEngine`.
enum EngineVariant {
    /// Plain detection engine; used when the loaded collection has no correlations.
    DetectionOnly(Engine),
    /// Correlation-capable engine; stored boxed.
    WithCorrelations(Box<CorrelationEngine>),
}
30
/// Counters describing what an engine currently has loaded.
///
/// This is plain data, so it derives the common value-type traits: it can be
/// logged with `{:?}`, compared in tests, copied freely, and zero-initialised
/// through `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EngineStats {
    /// Number of detection rules reported by the engine.
    pub detection_rules: usize,
    /// Number of correlation rules (always `0` for a detection-only engine).
    pub correlation_rules: usize,
    /// Number of correlation state entries (always `0` for a detection-only engine).
    pub state_entries: usize,
}
37
38impl RuntimeEngine {
39 pub fn new(
40 rules_path: std::path::PathBuf,
41 pipelines: Vec<Pipeline>,
42 corr_config: CorrelationConfig,
43 include_event: bool,
44 ) -> Self {
45 RuntimeEngine {
46 engine: EngineVariant::DetectionOnly(Engine::new()),
47 pipelines,
48 pipeline_paths: Vec::new(),
49 rules_path,
50 corr_config,
51 include_event,
52 source_resolver: None,
53 allow_remote_include: false,
54 }
55 }
56
57 pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
62 self.source_resolver = Some(resolver);
63 }
64
65 pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
67 self.source_resolver.as_ref()
68 }
69
70 pub fn set_allow_remote_include(&mut self, allow: bool) {
72 self.allow_remote_include = allow;
73 }
74
75 pub fn allow_remote_include(&self) -> bool {
77 self.allow_remote_include
78 }
79
80 pub fn set_pipeline_paths(&mut self, paths: Vec<PathBuf>) {
86 self.pipeline_paths = paths;
87 }
88
89 pub fn pipeline_paths(&self) -> &[PathBuf] {
91 &self.pipeline_paths
92 }
93
94 pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
100 let Some(resolver) = &self.source_resolver else {
101 return Ok(());
102 };
103
104 let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
105 for pipeline in &self.pipelines {
106 if pipeline.is_dynamic() {
107 match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
108 Ok(resolved_data) => {
109 let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
110 sources::include::expand_includes(
112 &mut expanded,
113 &resolved_data,
114 self.allow_remote_include,
115 )?;
116 resolved_pipelines.push(expanded);
117 }
118 Err(e) => {
119 return Err(format!(
120 "Failed to resolve dynamic pipeline '{}': {e}",
121 pipeline.name
122 ));
123 }
124 }
125 } else {
126 resolved_pipelines.push(pipeline.clone());
127 }
128 }
129 self.pipelines = resolved_pipelines;
130 Ok(())
131 }
132
133 pub fn load_rules(&mut self) -> Result<EngineStats, String> {
146 if !self.pipeline_paths.is_empty() {
147 self.pipelines = reload_pipelines(&self.pipeline_paths)?;
148 }
149
150 if self.source_resolver.is_some() && self.pipelines.iter().any(|p| p.is_dynamic()) {
152 if let Ok(handle) = tokio::runtime::Handle::try_current() {
153 let pipelines = std::mem::take(&mut self.pipelines);
154 let resolver = self.source_resolver.clone().unwrap();
155 let allow_remote = self.allow_remote_include;
156 let resolved = tokio::task::block_in_place(|| {
157 handle.block_on(async {
158 resolve_pipelines_async(&resolver, &pipelines, allow_remote).await
159 })
160 });
161 match resolved {
162 Ok(p) => self.pipelines = p,
163 Err(e) => {
164 self.pipelines = pipelines;
165 tracing::warn!(error = %e, "Dynamic source resolution failed, using unresolved pipelines");
166 }
167 }
168 } else {
169 tracing::warn!("No tokio runtime available for dynamic source resolution");
170 }
171 }
172
173 let previous_state = self.export_state();
174 let collection = load_collection(&self.rules_path)?;
175 let has_correlations = !collection.correlations.is_empty();
176
177 if has_correlations {
178 let mut engine = CorrelationEngine::new(self.corr_config.clone());
179 engine.set_include_event(self.include_event);
180 for p in &self.pipelines {
181 engine.add_pipeline(p.clone());
182 }
183 engine
184 .add_collection(&collection)
185 .map_err(|e| format!("Error compiling rules: {e}"))?;
186
187 if let Some(snapshot) = previous_state {
188 engine.import_state(snapshot);
189 }
190
191 let stats = EngineStats {
192 detection_rules: engine.detection_rule_count(),
193 correlation_rules: engine.correlation_rule_count(),
194 state_entries: engine.state_count(),
195 };
196 self.engine = EngineVariant::WithCorrelations(Box::new(engine));
197 Ok(stats)
198 } else {
199 let mut engine = Engine::new();
200 engine.set_include_event(self.include_event);
201 for p in &self.pipelines {
202 engine.add_pipeline(p.clone());
203 }
204 engine
205 .add_collection(&collection)
206 .map_err(|e| format!("Error compiling rules: {e}"))?;
207
208 let stats = EngineStats {
209 detection_rules: engine.rule_count(),
210 correlation_rules: 0,
211 state_entries: 0,
212 };
213 self.engine = EngineVariant::DetectionOnly(engine);
214 Ok(stats)
215 }
216 }
217
218 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
223 match &mut self.engine {
224 EngineVariant::DetectionOnly(engine) => {
225 let batch_detections = engine.evaluate_batch(events);
226 batch_detections
227 .into_iter()
228 .map(|detections| ProcessResult {
229 detections,
230 correlations: vec![],
231 })
232 .collect()
233 }
234 EngineVariant::WithCorrelations(engine) => engine.process_batch(events),
235 }
236 }
237
238 pub fn stats(&self) -> EngineStats {
240 match &self.engine {
241 EngineVariant::DetectionOnly(engine) => EngineStats {
242 detection_rules: engine.rule_count(),
243 correlation_rules: 0,
244 state_entries: 0,
245 },
246 EngineVariant::WithCorrelations(engine) => EngineStats {
247 detection_rules: engine.detection_rule_count(),
248 correlation_rules: engine.correlation_rule_count(),
249 state_entries: engine.state_count(),
250 },
251 }
252 }
253
254 pub fn rules_path(&self) -> &Path {
256 &self.rules_path
257 }
258
259 pub fn pipelines(&self) -> &[Pipeline] {
261 &self.pipelines
262 }
263
264 pub fn corr_config(&self) -> &CorrelationConfig {
266 &self.corr_config
267 }
268
269 pub fn include_event(&self) -> bool {
271 self.include_event
272 }
273
274 pub fn export_state(&self) -> Option<CorrelationSnapshot> {
277 match &self.engine {
278 EngineVariant::DetectionOnly(_) => None,
279 EngineVariant::WithCorrelations(engine) => Some(engine.export_state()),
280 }
281 }
282
283 pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
287 if let EngineVariant::WithCorrelations(engine) = &mut self.engine {
288 engine.import_state(snapshot.clone())
289 } else {
290 true
291 }
292 }
293}
294
295fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
296 let collection = if path.is_dir() {
297 rsigma_parser::parse_sigma_directory(path)
298 .map_err(|e| format!("Error loading rules from {}: {e}", path.display()))?
299 } else {
300 rsigma_parser::parse_sigma_file(path)
301 .map_err(|e| format!("Error loading rule {}: {e}", path.display()))?
302 };
303
304 if !collection.errors.is_empty() {
305 tracing::warn!(
306 count = collection.errors.len(),
307 "Parse errors while loading rules"
308 );
309 }
310
311 Ok(collection)
312}
313
314fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
316 let mut pipelines = Vec::with_capacity(paths.len());
317 for path in paths {
318 let pipeline = parse_pipeline_file(path)
319 .map_err(|e| format!("Error reloading pipeline {}: {e}", path.display()))?;
320 pipelines.push(pipeline);
321 }
322 pipelines.sort_by_key(|p| p.priority);
323 Ok(pipelines)
324}
325
326async fn resolve_pipelines_async(
328 resolver: &Arc<dyn SourceResolver>,
329 pipelines: &[Pipeline],
330 allow_remote_include: bool,
331) -> Result<Vec<Pipeline>, String> {
332 let mut resolved_pipelines = Vec::with_capacity(pipelines.len());
333 for pipeline in pipelines {
334 if pipeline.is_dynamic() {
335 let resolved_data = sources::resolve_all(resolver.as_ref(), &pipeline.sources)
336 .await
337 .map_err(|e| {
338 format!(
339 "Failed to resolve dynamic pipeline '{}': {e}",
340 pipeline.name
341 )
342 })?;
343 let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
344 sources::include::expand_includes(&mut expanded, &resolved_data, allow_remote_include)?;
345 resolved_pipelines.push(expanded);
346 } else {
347 resolved_pipelines.push(pipeline.clone());
348 }
349 }
350 Ok(resolved_pipelines)
351}