1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use arc_swap::ArcSwap;
5use rsigma_eval::event::Event;
6use rsigma_eval::{
7 CorrelationConfig, CorrelationEngine, CorrelationSnapshot, Engine, Pipeline, ProcessResult,
8 RuleFieldSet, parse_pipeline_file,
9};
10use rsigma_parser::SigmaCollection;
11
12use crate::sources::{self, SourceResolver, TemplateExpander};
13
14pub struct RuntimeEngine {
17 engine: EngineVariant,
18 pipelines: Vec<Pipeline>,
19 pipeline_paths: Vec<PathBuf>,
20 rules_path: std::path::PathBuf,
21 corr_config: CorrelationConfig,
22 include_event: bool,
23 source_resolver: Option<Arc<dyn SourceResolver>>,
24 allow_remote_include: bool,
25 bloom_prefilter: bool,
28 bloom_max_bytes: Option<usize>,
31 #[cfg(feature = "daachorse-index")]
35 cross_rule_ac: bool,
36 rule_field_set: Arc<ArcSwap<RuleFieldSet>>,
41}
42
43enum EngineVariant {
44 DetectionOnly(Box<Engine>),
45 WithCorrelations(Box<CorrelationEngine>),
46}
47
/// Counters describing the currently loaded engine.
///
/// The type is plain data (three `usize` counters), so it derives the common
/// traits to make it loggable, comparable and cheap to copy.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct EngineStats {
    /// Number of detection rules held by the engine.
    pub detection_rules: usize,
    /// Number of correlation rules; always `0` for a detection-only engine.
    pub correlation_rules: usize,
    /// Number of correlation state entries; always `0` for a detection-only
    /// engine.
    pub state_entries: usize,
}
54
55impl RuntimeEngine {
56 pub fn new(
57 rules_path: std::path::PathBuf,
58 pipelines: Vec<Pipeline>,
59 corr_config: CorrelationConfig,
60 include_event: bool,
61 ) -> Self {
62 RuntimeEngine {
63 engine: EngineVariant::DetectionOnly(Box::new(Engine::new())),
64 pipelines,
65 pipeline_paths: Vec::new(),
66 rules_path,
67 corr_config,
68 include_event,
69 source_resolver: None,
70 allow_remote_include: false,
71 bloom_prefilter: false,
72 bloom_max_bytes: None,
73 #[cfg(feature = "daachorse-index")]
74 cross_rule_ac: false,
75 rule_field_set: Arc::new(ArcSwap::from_pointee(RuleFieldSet::default())),
76 }
77 }
78
79 pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
87 self.rule_field_set.load_full()
88 }
89
90 pub fn set_bloom_prefilter(&mut self, enabled: bool) {
94 self.bloom_prefilter = enabled;
95 }
96
97 pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
100 self.bloom_max_bytes = Some(max_bytes);
101 }
102
/// Stores the cross-rule AC flag (only compiled with the `daachorse-index`
/// feature). It is only handed to the engine the next time `load_rules`
/// builds one; the active engine is not touched.
#[cfg(feature = "daachorse-index")]
pub fn set_cross_rule_ac(&mut self, enabled: bool) {
    self.cross_rule_ac = enabled;
}
113
114 pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
119 self.source_resolver = Some(resolver);
120 }
121
122 pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
124 self.source_resolver.as_ref()
125 }
126
127 pub fn set_allow_remote_include(&mut self, allow: bool) {
129 self.allow_remote_include = allow;
130 }
131
132 pub fn allow_remote_include(&self) -> bool {
134 self.allow_remote_include
135 }
136
137 pub fn set_pipeline_paths(&mut self, paths: Vec<PathBuf>) {
143 self.pipeline_paths = paths;
144 }
145
146 pub fn pipeline_paths(&self) -> &[PathBuf] {
148 &self.pipeline_paths
149 }
150
151 pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
157 let Some(resolver) = &self.source_resolver else {
158 return Ok(());
159 };
160
161 let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
162 for pipeline in &self.pipelines {
163 if pipeline.is_dynamic() {
164 match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
165 Ok(resolved_data) => {
166 let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
167 sources::include::expand_includes(
169 &mut expanded,
170 &resolved_data,
171 self.allow_remote_include,
172 )?;
173 resolved_pipelines.push(expanded);
174 }
175 Err(e) => {
176 return Err(format!(
177 "Failed to resolve dynamic pipeline '{}': {e}",
178 pipeline.name
179 ));
180 }
181 }
182 } else {
183 resolved_pipelines.push(pipeline.clone());
184 }
185 }
186 self.pipelines = resolved_pipelines;
187 Ok(())
188 }
189
190 pub fn load_rules(&mut self) -> Result<EngineStats, String> {
203 let load_span = tracing::info_span!("load_rules", rules_path = %self.rules_path.display());
204 let _enter = load_span.enter();
205 let load_start = std::time::Instant::now();
206
207 if !self.pipeline_paths.is_empty() {
208 self.pipelines = reload_pipelines(&self.pipeline_paths)?;
209 }
210
211 if self.source_resolver.is_some() && self.pipelines.iter().any(|p| p.is_dynamic()) {
213 if let Ok(handle) = tokio::runtime::Handle::try_current() {
214 let pipelines = std::mem::take(&mut self.pipelines);
215 let resolver = self.source_resolver.clone().unwrap();
216 let allow_remote = self.allow_remote_include;
217 let resolved = tokio::task::block_in_place(|| {
218 handle.block_on(async {
219 resolve_pipelines_async(&resolver, &pipelines, allow_remote).await
220 })
221 });
222 match resolved {
223 Ok(p) => self.pipelines = p,
224 Err(e) => {
225 self.pipelines = pipelines;
226 tracing::warn!(error = %e, "Dynamic source resolution failed, using unresolved pipelines");
227 }
228 }
229 } else {
230 tracing::warn!("No tokio runtime available for dynamic source resolution");
231 }
232 }
233
234 let previous_state = self.export_state();
235 let collection = load_collection(&self.rules_path)?;
236 let has_correlations = !collection.correlations.is_empty();
237
238 if has_correlations {
239 let mut engine = CorrelationEngine::new(self.corr_config.clone());
240 engine.set_include_event(self.include_event);
241 if let Some(budget) = self.bloom_max_bytes {
242 engine.set_bloom_max_bytes(budget);
243 }
244 engine.set_bloom_prefilter(self.bloom_prefilter);
245 #[cfg(feature = "daachorse-index")]
246 engine.set_cross_rule_ac(self.cross_rule_ac);
247 for p in &self.pipelines {
248 engine.add_pipeline(p.clone());
249 }
250 engine
251 .add_collection(&collection)
252 .map_err(|e| format!("Error compiling rules: {e}"))?;
253
254 if let Some(snapshot) = previous_state {
255 engine.import_state(snapshot);
256 }
257
258 let stats = EngineStats {
259 detection_rules: engine.detection_rule_count(),
260 correlation_rules: engine.correlation_rule_count(),
261 state_entries: engine.state_count(),
262 };
263 self.engine = EngineVariant::WithCorrelations(Box::new(engine));
264 self.refresh_rule_field_set(&collection);
265 tracing::debug!(
266 detection_rules = stats.detection_rules,
267 correlation_rules = stats.correlation_rules,
268 duration_ms = load_start.elapsed().as_millis() as u64,
269 "Rule load complete",
270 );
271 Ok(stats)
272 } else {
273 let mut engine = Engine::new();
274 engine.set_include_event(self.include_event);
275 if let Some(budget) = self.bloom_max_bytes {
276 engine.set_bloom_max_bytes(budget);
277 }
278 engine.set_bloom_prefilter(self.bloom_prefilter);
279 #[cfg(feature = "daachorse-index")]
280 engine.set_cross_rule_ac(self.cross_rule_ac);
281 for p in &self.pipelines {
282 engine.add_pipeline(p.clone());
283 }
284 engine
285 .add_collection(&collection)
286 .map_err(|e| format!("Error compiling rules: {e}"))?;
287
288 let stats = EngineStats {
289 detection_rules: engine.rule_count(),
290 correlation_rules: 0,
291 state_entries: 0,
292 };
293 self.engine = EngineVariant::DetectionOnly(Box::new(engine));
294 self.refresh_rule_field_set(&collection);
295 tracing::debug!(
296 detection_rules = stats.detection_rules,
297 correlation_rules = stats.correlation_rules,
298 duration_ms = load_start.elapsed().as_millis() as u64,
299 "Rule load complete",
300 );
301 Ok(stats)
302 }
303 }
304
305 fn refresh_rule_field_set(&self, collection: &SigmaCollection) {
308 let field_set = RuleFieldSet::collect(collection, &self.pipelines, true);
309 self.rule_field_set.store(Arc::new(field_set));
310 }
311
312 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
317 match &mut self.engine {
318 EngineVariant::DetectionOnly(engine) => engine.evaluate_batch(events),
319 EngineVariant::WithCorrelations(engine) => engine.process_batch(events),
320 }
321 }
322
323 pub fn stats(&self) -> EngineStats {
325 match &self.engine {
326 EngineVariant::DetectionOnly(engine) => EngineStats {
327 detection_rules: engine.rule_count(),
328 correlation_rules: 0,
329 state_entries: 0,
330 },
331 EngineVariant::WithCorrelations(engine) => EngineStats {
332 detection_rules: engine.detection_rule_count(),
333 correlation_rules: engine.correlation_rule_count(),
334 state_entries: engine.state_count(),
335 },
336 }
337 }
338
339 pub fn rules_path(&self) -> &Path {
341 &self.rules_path
342 }
343
344 pub fn pipelines(&self) -> &[Pipeline] {
346 &self.pipelines
347 }
348
349 pub fn corr_config(&self) -> &CorrelationConfig {
351 &self.corr_config
352 }
353
354 pub fn include_event(&self) -> bool {
356 self.include_event
357 }
358
359 pub fn export_state(&self) -> Option<CorrelationSnapshot> {
362 match &self.engine {
363 EngineVariant::DetectionOnly(_) => None,
364 EngineVariant::WithCorrelations(engine) => Some(engine.export_state()),
365 }
366 }
367
368 pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
372 if let EngineVariant::WithCorrelations(engine) = &mut self.engine {
373 engine.import_state(snapshot.clone())
374 } else {
375 true
376 }
377 }
378}
379
380fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
381 let collection = if path.is_dir() {
382 rsigma_parser::parse_sigma_directory(path)
383 .map_err(|e| format!("Error loading rules from {}: {e}", path.display()))?
384 } else {
385 rsigma_parser::parse_sigma_file(path)
386 .map_err(|e| format!("Error loading rule {}: {e}", path.display()))?
387 };
388
389 if !collection.errors.is_empty() {
390 tracing::warn!(
391 count = collection.errors.len(),
392 "Parse errors while loading rules"
393 );
394 for (i, err) in collection.errors.iter().take(3).enumerate() {
395 tracing::warn!(index = i + 1, error = %err, "Rule parse error detail");
396 }
397 }
398
399 Ok(collection)
400}
401
402fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
404 let mut pipelines = Vec::with_capacity(paths.len());
405 for path in paths {
406 let pipeline = parse_pipeline_file(path)
407 .map_err(|e| format!("Error reloading pipeline {}: {e}", path.display()))?;
408 pipelines.push(pipeline);
409 }
410 pipelines.sort_by_key(|p| p.priority);
411 Ok(pipelines)
412}
413
414async fn resolve_pipelines_async(
416 resolver: &Arc<dyn SourceResolver>,
417 pipelines: &[Pipeline],
418 allow_remote_include: bool,
419) -> Result<Vec<Pipeline>, String> {
420 let mut resolved_pipelines = Vec::with_capacity(pipelines.len());
421 for pipeline in pipelines {
422 if pipeline.is_dynamic() {
423 let resolved_data = sources::resolve_all(resolver.as_ref(), &pipeline.sources)
424 .await
425 .map_err(|e| {
426 format!(
427 "Failed to resolve dynamic pipeline '{}': {e}",
428 pipeline.name
429 )
430 })?;
431 let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
432 sources::include::expand_includes(&mut expanded, &resolved_data, allow_remote_include)?;
433 resolved_pipelines.push(expanded);
434 } else {
435 resolved_pipelines.push(pipeline.clone());
436 }
437 }
438 Ok(resolved_pipelines)
439}