Skip to main content

rsigma_runtime/
processor.rs

1use std::sync::Arc;
2
3use parking_lot::Mutex;
4use std::time::Instant;
5
6use arc_swap::ArcSwap;
7use rsigma_eval::{
8    Event, FieldObserver, JsonEvent, ProcessResult, ProcessResultExt, RuleFieldSet, SchemaObserver,
9};
10
11use crate::engine::RuntimeEngine;
12use crate::input::{EventInputDecoded, InputFormat, parse_line};
13use crate::metrics::MetricsHook;
14use crate::tap::{TapPayload, TapRegistry, TapStage};
15
/// Closure that extracts multiple payloads from a single JSON value.
///
/// Used by the daemon's event filter (e.g. jq/jsonpath) to explode a JSON
/// object into sub-events (e.g. `.records[]`). Only applies to JSON input.
///
/// Returning an empty `Vec` drops the line: the batch methods on
/// [`LogProcessor`] then evaluate no event for it and its per-line result
/// stays empty.
pub type EventFilter = dyn Fn(&serde_json::Value) -> Vec<serde_json::Value>;
21
/// Thread-safe handle to the engine, swappable atomically for hot-reload.
///
/// Uses `ArcSwap<Mutex<RuntimeEngine>>` so that:
/// - Detection + correlation processing can acquire `&mut RuntimeEngine` via
///   the inner `Mutex`.
/// - Hot-reload swaps the entire engine atomically without blocking in-flight
///   batches (they hold an `Arc` to the old engine until their batch completes).
///
/// The three optional hooks (`field_observer`, `schema_observer`,
/// `event_tap`) each live in their own `ArcSwap<Option<_>>` and start out as
/// `None`; they are attached and detached through the `set_*` methods.
pub struct LogProcessor {
    /// Current engine. Replaced wholesale by `swap_engine`; every accessor
    /// loads the current pointer and then locks the inner `Mutex`.
    engine: Arc<ArcSwap<Mutex<RuntimeEngine>>>,
    /// Hook notified of parse errors, processed events, per-event latency,
    /// match counts and the correlation state size.
    metrics: Arc<dyn MetricsHook>,
    /// Optional opt-in field observer. When `Some`, every parsed event
    /// flowing through `process_batch_with_format` has its field keys
    /// recorded. When `None`, the batch path skips iteration entirely
    /// so the hot path stays untouched.
    field_observer: ArcSwap<Option<Arc<FieldObserver>>>,
    /// Optional opt-in schema observer. When `Some`, every parsed event
    /// flowing through `process_batch_with_format` is classified and tallied
    /// per schema. When `None`, the batch path skips classification entirely
    /// so the hot path stays untouched.
    schema_observer: ArcSwap<Option<Arc<SchemaObserver>>>,
    /// Optional live event tap. When `Some`, `process_batch_with_format`
    /// offers raw lines and/or decoded events to any active capture
    /// sessions with a non-blocking `try_send`. When `None` (or when no
    /// session is active), the hot path performs one `ArcSwap` load and
    /// skips capture entirely.
    event_tap: ArcSwap<Option<Arc<TapRegistry>>>,
}
49
50impl LogProcessor {
51    /// Create a new processor wrapping the given engine and metrics hook.
52    pub fn new(engine: RuntimeEngine, metrics: Arc<dyn MetricsHook>) -> Self {
53        LogProcessor {
54            engine: Arc::new(ArcSwap::from_pointee(Mutex::new(engine))),
55            metrics,
56            field_observer: ArcSwap::new(Arc::new(None)),
57            schema_observer: ArcSwap::new(Arc::new(None)),
58            event_tap: ArcSwap::new(Arc::new(None)),
59        }
60    }
61
62    /// Attach (or detach) the opt-in field observer.
63    ///
64    /// When set, [`process_batch_with_format`](Self::process_batch_with_format)
65    /// records each parsed event's field keys before evaluation. Pass
66    /// `None` to disable observation; the hot path then performs zero
67    /// extra work. Safe to call at runtime: the swap is wait-free, and
68    /// in-flight batches finish against whichever observer they read.
69    pub fn set_field_observer(&self, observer: Option<Arc<FieldObserver>>) {
70        self.field_observer.store(Arc::new(observer));
71    }
72
73    /// Return the currently-attached field observer, if any.
74    pub fn field_observer(&self) -> Option<Arc<FieldObserver>> {
75        self.field_observer.load_full().as_ref().clone()
76    }
77
78    /// Attach (or detach) the opt-in schema observer.
79    ///
80    /// When set, [`process_batch_with_format`](Self::process_batch_with_format)
81    /// classifies each parsed event before evaluation. Pass `None` to disable
82    /// classification; the hot path then performs zero extra work. Safe to
83    /// call at runtime: the swap is wait-free, and in-flight batches finish
84    /// against whichever observer they read.
85    pub fn set_schema_observer(&self, observer: Option<Arc<SchemaObserver>>) {
86        self.schema_observer.store(Arc::new(observer));
87    }
88
89    /// Return the currently-attached schema observer, if any.
90    pub fn schema_observer(&self) -> Option<Arc<SchemaObserver>> {
91        self.schema_observer.load_full().as_ref().clone()
92    }
93
94    /// Attach (or detach) the live event tap.
95    ///
96    /// When set, [`process_batch_with_format`](Self::process_batch_with_format)
97    /// offers raw lines and decoded events to the registry's active capture
98    /// sessions. Pass `None` to disable the tap; the hot path then performs a
99    /// single `ArcSwap` load and skips capture. Safe to call at runtime: the
100    /// swap is wait-free, and in-flight batches finish against whichever
101    /// registry they read.
102    pub fn set_event_tap(&self, tap: Option<Arc<TapRegistry>>) {
103        self.event_tap.store(Arc::new(tap));
104    }
105
106    /// Return the currently-attached event tap registry, if any.
107    pub fn event_tap(&self) -> Option<Arc<TapRegistry>> {
108        self.event_tap.load_full().as_ref().clone()
109    }
110
111    /// Atomically replace the engine with a new one.
112    ///
113    /// In-flight batches continue against the old engine (they hold an `Arc`
114    /// snapshot). New batches see the replacement on their next call to
115    /// `process_batch_lines`.
116    pub fn swap_engine(&self, new_engine: RuntimeEngine) {
117        self.engine.store(Arc::new(Mutex::new(new_engine)));
118    }
119
    /// Load a snapshot of the current engine for use during reload.
    ///
    /// The caller can lock the returned guard to export state, build a new
    /// engine, import state, and then call `swap_engine`.
    ///
    /// The guard keeps pointing at the engine that was current when this
    /// method was called, even if `swap_engine` installs a replacement
    /// afterwards.
    pub fn engine_snapshot(&self) -> arc_swap::Guard<Arc<Mutex<RuntimeEngine>>> {
        self.engine.load()
    }
127
128    /// Process a batch of raw input lines through the engine.
129    ///
130    /// 1. Parses each line as JSON; on error, increments parse error metrics.
131    /// 2. Applies the `event_filter` closure to extract payloads.
132    /// 3. Evaluates all payloads via `RuntimeEngine::process_batch`.
133    /// 4. Merges per-payload results back into per-line results.
134    /// 5. Updates metrics (events processed, latency, match counts).
135    ///
136    /// Returns one `ProcessResult` per input line.
137    pub fn process_batch_lines(
138        &self,
139        batch: &[String],
140        event_filter: &EventFilter,
141    ) -> Vec<ProcessResult> {
142        let engine_guard = self.engine.load();
143        let mut engine = engine_guard.lock();
144
145        // Phase 1: Parse JSON and apply event filters, tracking line origin.
146        let mut parsed: Vec<(usize, Vec<serde_json::Value>)> = Vec::with_capacity(batch.len());
147        for (line_idx, line) in batch.iter().enumerate() {
148            match serde_json::from_str::<serde_json::Value>(line) {
149                Ok(value) => {
150                    let payloads = event_filter(&value);
151                    if !payloads.is_empty() {
152                        parsed.push((line_idx, payloads));
153                    }
154                }
155                Err(e) => {
156                    self.metrics.on_parse_error();
157                    tracing::debug!(error = %e, "Invalid JSON on input");
158                }
159            }
160        }
161
162        // Flatten: (line_idx, &Value) for each payload across all lines
163        let mut flat: Vec<(usize, &serde_json::Value)> = Vec::new();
164        for (line_idx, payloads) in &parsed {
165            for payload in payloads {
166                flat.push((*line_idx, payload));
167            }
168        }
169
170        if flat.is_empty() {
171            return empty_results(batch.len());
172        }
173
174        // Phase 2: Batch evaluation — parallel detection + sequential correlation
175        let events: Vec<JsonEvent> = flat.iter().map(|(_, v)| JsonEvent::borrow(v)).collect();
176        let event_refs: Vec<&JsonEvent> = events.iter().collect();
177
178        let start = Instant::now();
179        let batch_results = engine.process_batch(&event_refs);
180        let elapsed = start.elapsed().as_secs_f64();
181        let per_event_latency = elapsed / event_refs.len() as f64;
182
183        // Update correlation state metrics while we still hold the lock
184        let stats = engine.stats();
185        self.metrics
186            .set_correlation_state_entries(stats.state_entries as u64);
187
188        // Phase 3: Merge results per input line and update metrics
189        let mut line_results = empty_results(batch.len());
190
191        for ((line_idx, _), result) in flat.iter().zip(batch_results) {
192            self.metrics.on_events_processed(1);
193            self.metrics.observe_processing_latency(per_event_latency);
194            self.metrics
195                .on_detection_matches(result.detection_count() as u64);
196            self.metrics
197                .on_correlation_matches(result.correlation_count() as u64);
198
199            for r in &result {
200                let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
201                let title = &r.header.rule_title;
202                match &r.body {
203                    rsigma_eval::ResultBody::Detection(_) => {
204                        self.metrics.on_detection_match_detail(title, level_str);
205                    }
206                    rsigma_eval::ResultBody::Correlation(body) => {
207                        self.metrics.on_correlation_match_detail(
208                            title,
209                            level_str,
210                            body.correlation_type.as_str(),
211                        );
212                    }
213                }
214            }
215
216            line_results[*line_idx].extend(result);
217        }
218
219        line_results
220    }
221
222    /// Process a batch of raw input lines using the specified input format.
223    ///
224    /// Unlike [`process_batch_lines`](Self::process_batch_lines), this method
225    /// supports all input formats (JSON, syslog, plain, logfmt, CEF). The
226    /// `event_filter` only applies to JSON-decoded events (it extracts multiple
227    /// payloads from one JSON object, e.g. a `records[]` array). Non-JSON
228    /// formats produce exactly one event per line.
229    ///
230    /// Returns one `ProcessResult` per input line.
231    pub fn process_batch_with_format(
232        &self,
233        batch: &[String],
234        format: &InputFormat,
235        event_filter: Option<&EventFilter>,
236    ) -> Vec<ProcessResult> {
237        let engine_guard = self.engine.load();
238        let mut engine = engine_guard.lock();
239
240        // Live event tap: load the active-session snapshot once for the whole
241        // batch. Cheap when disabled (one `ArcSwap` load plus an `Option`
242        // check); the guard is held through the line and event loops below.
243        let tap_guard = self.event_tap.load();
244        let tap_sessions = tap_guard
245            .as_ref()
246            .as_ref()
247            .map(|reg| reg.sessions_snapshot());
248        let tap_has_raw = tap_sessions
249            .as_ref()
250            .is_some_and(|s| s.iter().any(|x| x.stage == TapStage::Raw));
251        let tap_has_decoded = tap_sessions
252            .as_ref()
253            .is_some_and(|s| s.iter().any(|x| x.stage == TapStage::Decoded));
254
255        // Phase 1: Parse each line into decoded events, tracking line origin.
256        // For JSON with an event_filter, one line can produce multiple events.
257        let mut decoded_events: Vec<(usize, EventInputDecoded)> = Vec::with_capacity(batch.len());
258
259        for (line_idx, line) in batch.iter().enumerate() {
260            // Raw-stage tap runs before parsing so a non-redacting raw capture
261            // records every non-empty line, including ones that fail to parse.
262            if tap_has_raw
263                && !line.trim().is_empty()
264                && let Some(sessions) = tap_sessions.as_ref()
265            {
266                for s in sessions.iter().filter(|s| s.stage == TapStage::Raw) {
267                    s.offer(TapPayload::Raw(line.clone()));
268                }
269            }
270
271            let Some(decoded) = parse_line(line, format) else {
272                if !line.trim().is_empty() {
273                    self.metrics.on_parse_error();
274                    tracing::debug!("Failed to parse input line");
275                }
276                continue;
277            };
278
279            // For JSON events with an event filter, apply the filter which
280            // may produce multiple payloads (e.g. `.records[]`).
281            if let Some(filter) = event_filter
282                && let EventInputDecoded::Json(ref json_event) = decoded
283            {
284                let json_value = json_event.to_json();
285                let payloads = filter(&json_value);
286                for payload in payloads {
287                    decoded_events
288                        .push((line_idx, EventInputDecoded::Json(JsonEvent::owned(payload))));
289                }
290                continue;
291            }
292
293            decoded_events.push((line_idx, decoded));
294        }
295
296        if decoded_events.is_empty() {
297            return empty_results(batch.len());
298        }
299
300        // Optional opt-in field observation. Cheap when disabled: one
301        // hazard-pointer `Guard` (no Arc clone) plus an `Option` check.
302        // When enabled, walks each decoded event's field keys before
303        // evaluation. The Guard's lifetime extends through the loop so
304        // the observer cannot be dropped mid-batch even if the daemon
305        // detaches it concurrently.
306        let observer_guard = self.field_observer.load();
307        if let Some(observer) = observer_guard.as_ref() {
308            for (_, decoded) in &decoded_events {
309                observer.observe(decoded);
310            }
311        }
312        drop(observer_guard);
313
314        // Optional opt-in schema observation, with the same cheap-when-disabled
315        // discipline as the field observer above.
316        let schema_guard = self.schema_observer.load();
317        if let Some(observer) = schema_guard.as_ref() {
318            for (_, decoded) in &decoded_events {
319                observer.observe(decoded);
320            }
321        }
322        drop(schema_guard);
323
324        // Decoded-stage tap: offer each decoded event (post-parse,
325        // post-event-filter) to active decoded-stage sessions. The event is
326        // serialized to JSON only when at least one decoded session is active.
327        if tap_has_decoded && let Some(sessions) = tap_sessions.as_ref() {
328            for (_, decoded) in &decoded_events {
329                let value = decoded.to_json();
330                for s in sessions.iter().filter(|s| s.stage == TapStage::Decoded) {
331                    s.offer(TapPayload::Decoded(Box::new(value.clone())));
332                }
333            }
334        }
335
336        // Phase 2: Batch evaluation — parallel detection + sequential correlation
337        let event_refs: Vec<&EventInputDecoded> = decoded_events.iter().map(|(_, e)| e).collect();
338
339        let start = Instant::now();
340        let batch_results = engine.process_batch(&event_refs);
341        let elapsed = start.elapsed().as_secs_f64();
342        let per_event_latency = elapsed / event_refs.len() as f64;
343
344        let stats = engine.stats();
345        self.metrics
346            .set_correlation_state_entries(stats.state_entries as u64);
347
348        // Phase 3: Merge results per input line and update metrics
349        let mut line_results = empty_results(batch.len());
350
351        for ((line_idx, _), result) in decoded_events.iter().zip(batch_results) {
352            self.metrics.on_events_processed(1);
353            self.metrics.observe_processing_latency(per_event_latency);
354            self.metrics
355                .on_detection_matches(result.detection_count() as u64);
356            self.metrics
357                .on_correlation_matches(result.correlation_count() as u64);
358
359            for r in &result {
360                let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
361                let title = &r.header.rule_title;
362                match &r.body {
363                    rsigma_eval::ResultBody::Detection(_) => {
364                        self.metrics.on_detection_match_detail(title, level_str);
365                    }
366                    rsigma_eval::ResultBody::Correlation(body) => {
367                        self.metrics.on_correlation_match_detail(
368                            title,
369                            level_str,
370                            body.correlation_type.as_str(),
371                        );
372                    }
373                }
374            }
375
376            line_results[*line_idx].extend(result);
377        }
378
379        line_results
380    }
381
    /// Reload rules (and pipelines) without blocking in-flight event processing.
    ///
    /// Builds a fresh `RuntimeEngine` with the same configuration as the
    /// current one, re-reads pipeline files from disk (if paths are set),
    /// loads rules into it, imports the old engine's correlation state, and
    /// atomically swaps. In-flight batches that already hold an `Arc` to
    /// the old engine finish undisturbed.
    ///
    /// If pipeline or rule loading fails, the old engine remains active.
    ///
    /// # Errors
    ///
    /// Returns the error from `RuntimeEngine::load_rules` on the new engine.
    /// No swap happens in that case.
    pub fn reload_rules(&self) -> Result<crate::engine::EngineStats, String> {
        // Snapshot the old engine's configuration AND tuning so the
        // replacement reaches `load_rules()` with the same flags. Daemon
        // startup typically sets `set_bloom_prefilter`/`set_bloom_max_bytes`
        // (and `set_cross_rule_ac` behind `daachorse-index`) before the
        // first load; carrying those across the swap keeps hot-reload from
        // silently undoing them.
        let snapshot = self.engine.load();
        let old = snapshot.lock();
        let old_state = old.export_state();
        let rules_path = old.rules_path().to_path_buf();
        let pipelines = old.pipelines().to_vec();
        let pipeline_paths = old.pipeline_paths().to_vec();
        let corr_config = old.corr_config().clone();
        let include_event = old.include_event();
        let resolver = old.source_resolver().cloned();
        let allow_remote_include = old.allow_remote_include();
        let bloom_prefilter = old.bloom_prefilter();
        let bloom_max_bytes = old.bloom_max_bytes();
        let match_detail = old.match_detail();
        let routing = old.routing();
        let logsource_extractor = old.logsource_extractor();
        #[cfg(feature = "daachorse-index")]
        let cross_rule_ac = old.cross_rule_ac();
        // Release the old engine before the rule load below so batches can
        // keep using it in the meantime.
        // NOTE(review): correlation state the old engine accumulates between
        // the `export_state()` above and the `swap_engine()` below is not part
        // of `old_state` and so is not carried over — confirm this is
        // acceptable.
        drop(old);
        drop(snapshot);

        // All setters are applied before `load_rules()`.
        let mut new_engine = RuntimeEngine::new(rules_path, pipelines, corr_config, include_event);
        new_engine.set_pipeline_paths(pipeline_paths);
        new_engine.set_allow_remote_include(allow_remote_include);
        new_engine.set_match_detail(match_detail);
        new_engine.set_bloom_prefilter(bloom_prefilter);
        if let Some(budget) = bloom_max_bytes {
            new_engine.set_bloom_max_bytes(budget);
        }
        #[cfg(feature = "daachorse-index")]
        new_engine.set_cross_rule_ac(cross_rule_ac);
        if let Some(resolver) = resolver {
            new_engine.set_source_resolver(resolver);
        }
        // Carry the schema-routing spec so hot-reload rebuilds the router
        // instead of silently dropping back to a single engine.
        new_engine.set_routing(routing);
        // Carry the logsource extractor so pruning survives hot-reload.
        new_engine.set_logsource_extractor(logsource_extractor);
        // On failure `?` returns here, before the swap: the old engine stays.
        let stats = new_engine.load_rules()?;

        // A rejected import is not fatal: the new engine simply starts with
        // empty correlation state.
        if let Some(state) = old_state
            && !new_engine.import_state(&state)
        {
            tracing::warn!(
                "Incompatible correlation snapshot version during reload, starting fresh"
            );
        }

        self.swap_engine(new_engine);
        Ok(stats)
    }
449
450    /// Return the rules path from the current engine.
451    pub fn rules_path(&self) -> std::path::PathBuf {
452        let snapshot = self.engine.load();
453        let engine = snapshot.lock();
454        engine.rules_path().to_path_buf()
455    }
456
457    /// Return a reference to the metrics hook.
458    pub fn metrics(&self) -> &dyn MetricsHook {
459        &*self.metrics
460    }
461
462    /// Export correlation state from the current engine.
463    pub fn export_state(&self) -> Option<rsigma_eval::CorrelationSnapshot> {
464        let snapshot = self.engine.load();
465        let engine = snapshot.lock();
466        engine.export_state()
467    }
468
469    /// Import correlation state into the current engine.
470    pub fn import_state(&self, snapshot: &rsigma_eval::CorrelationSnapshot) -> bool {
471        let guard = self.engine.load();
472        let mut engine = guard.lock();
473        engine.import_state(snapshot)
474    }
475
476    /// Return summary statistics about the current engine.
477    pub fn stats(&self) -> crate::engine::EngineStats {
478        let snapshot = self.engine.load();
479        let engine = snapshot.lock();
480        engine.stats()
481    }
482
483    /// Read-only snapshot of the correlation window state, filtered by
484    /// correlation id and/or group-key substring. Holds the engine lock only
485    /// for the projection. `None` for a detection-only engine.
486    pub fn introspect_correlations(
487        &self,
488        id: Option<&str>,
489        group: Option<&str>,
490    ) -> Option<rsigma_eval::CorrelationStateSnapshot> {
491        let snapshot = self.engine.load();
492        let engine = snapshot.lock();
493        engine.introspect_correlations(id, group)
494    }
495
496    /// Total rule candidates pruned by logsource on the current engine.
497    pub fn logsource_pruned_total(&self) -> u64 {
498        let snapshot = self.engine.load();
499        let engine = snapshot.lock();
500        engine.logsource_pruned_total()
501    }
502
503    /// Total evaluate calls with no extractable event logsource (fail-open).
504    pub fn logsource_absent_total(&self) -> u64 {
505        let snapshot = self.engine.load();
506        let engine = snapshot.lock();
507        engine.logsource_absent_total()
508    }
509
510    /// Return an immutable snapshot of the current rule field set
511    /// (post-pipeline). The lock is held only long enough to clone the
512    /// `Arc`; the returned value remains valid across reloads.
513    pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
514        let snapshot = self.engine.load();
515        let engine = snapshot.lock();
516        engine.rule_field_set()
517    }
518}
519
520/// Produce a vec of empty `ProcessResult`, one per input line.
521fn empty_results(count: usize) -> Vec<ProcessResult> {
522    (0..count).map(|_| ProcessResult::new()).collect()
523}
524
525#[cfg(test)]
526mod tests {
527    use super::*;
528    use crate::metrics::NoopMetrics;
529    use rsigma_eval::CorrelationConfig;
530
531    fn identity_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
532        vec![v.clone()]
533    }
534
535    fn make_processor(rules_yaml: &str) -> LogProcessor {
536        let dir = tempfile::tempdir().unwrap();
537        let rule_path = dir.path().join("test.yml");
538        std::fs::write(&rule_path, rules_yaml).unwrap();
539
540        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
541        engine.load_rules().unwrap();
542        // Leak the tempdir so the path stays valid
543        std::mem::forget(dir);
544        LogProcessor::new(engine, Arc::new(NoopMetrics))
545    }
546
547    #[test]
548    fn process_batch_lines_valid_json() {
549        let proc = make_processor(
550            r#"
551title: Test Rule
552status: test
553logsource:
554    category: test
555detection:
556    selection:
557        EventID: 1
558    condition: selection
559"#,
560        );
561
562        let batch = vec![
563            r#"{"EventID": 1}"#.to_string(),
564            r#"{"EventID": 2}"#.to_string(),
565        ];
566        let results = proc.process_batch_lines(&batch, &identity_filter);
567        assert_eq!(results.len(), 2);
568        assert!(results[0].detection_count() > 0, "EventID=1 should match");
569        assert!(
570            results[1].detection_count() == 0,
571            "EventID=2 should not match"
572        );
573    }
574
575    #[test]
576    fn process_batch_lines_invalid_json() {
577        let proc = make_processor(
578            r#"
579title: Test Rule
580status: test
581logsource:
582    category: test
583detection:
584    selection:
585        EventID: 1
586    condition: selection
587"#,
588        );
589
590        let batch = vec!["not json".to_string(), r#"{"EventID": 1}"#.to_string()];
591        let results = proc.process_batch_lines(&batch, &identity_filter);
592        assert_eq!(results.len(), 2);
593        assert!(
594            results[0].detection_count() == 0,
595            "invalid JSON produces empty result"
596        );
597        assert!(results[1].detection_count() > 0, "valid line still matches");
598    }
599
600    #[test]
601    fn swap_engine_replaces_rules() {
602        let dir = tempfile::tempdir().unwrap();
603        let rule_path = dir.path().join("test.yml");
604        std::fs::write(
605            &rule_path,
606            r#"
607title: Rule A
608status: test
609logsource:
610    category: test
611detection:
612    selection:
613        EventID: 1
614    condition: selection
615"#,
616        )
617        .unwrap();
618
619        let mut engine = RuntimeEngine::new(
620            rule_path.clone(),
621            vec![],
622            CorrelationConfig::default(),
623            false,
624        );
625        engine.load_rules().unwrap();
626        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
627
628        let batch = vec![r#"{"EventID": 1}"#.to_string()];
629        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
630
631        // Swap to a rule that matches EventID: 99
632        std::fs::write(
633            &rule_path,
634            r#"
635title: Rule B
636status: test
637logsource:
638    category: test
639detection:
640    selection:
641        EventID: 99
642    condition: selection
643"#,
644        )
645        .unwrap();
646
647        let mut new_engine =
648            RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
649        new_engine.load_rules().unwrap();
650        proc.swap_engine(new_engine);
651
652        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);
653
654        let batch2 = vec![r#"{"EventID": 99}"#.to_string()];
655        assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);
656
657        std::mem::forget(dir);
658    }
659
660    #[test]
661    fn reload_rules_preserves_bloom_tuning() {
662        // Daemon startup typically calls `set_bloom_prefilter(true)` and
663        // friends on the initial RuntimeEngine. Previously, `reload_rules`
664        // rebuilt a fresh RuntimeEngine via `RuntimeEngine::new`, which
665        // resets those flags to defaults; the daemon then silently lost
666        // bloom pre-filtering on the first hot-reload. This test pins the
667        // fix by checking the underlying engine's setters after reload.
668        let dir = tempfile::tempdir().unwrap();
669        let rule_path = dir.path().join("test.yml");
670        std::fs::write(
671            &rule_path,
672            r#"
673title: Rule A
674status: test
675logsource:
676    category: test
677detection:
678    selection:
679        EventID: 1
680    condition: selection
681"#,
682        )
683        .unwrap();
684
685        let mut engine = RuntimeEngine::new(
686            rule_path.clone(),
687            vec![],
688            CorrelationConfig::default(),
689            false,
690        );
691        engine.set_bloom_prefilter(true);
692        engine.set_bloom_max_bytes(2 * 1024 * 1024);
693        #[cfg(feature = "daachorse-index")]
694        engine.set_cross_rule_ac(true);
695        engine.load_rules().unwrap();
696
697        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
698        proc.reload_rules().unwrap();
699
700        let snapshot = proc.engine_snapshot();
701        let reloaded = snapshot.lock();
702        assert!(
703            reloaded.bloom_prefilter(),
704            "bloom_prefilter must survive reload_rules"
705        );
706        assert_eq!(
707            reloaded.bloom_max_bytes(),
708            Some(2 * 1024 * 1024),
709            "bloom_max_bytes must survive reload_rules"
710        );
711        #[cfg(feature = "daachorse-index")]
712        assert!(
713            reloaded.cross_rule_ac(),
714            "cross_rule_ac must survive reload_rules"
715        );
716
717        std::mem::forget(dir);
718    }
719
720    #[test]
721    fn reload_rules_preserves_engine() {
722        let dir = tempfile::tempdir().unwrap();
723        let rule_path = dir.path().join("test.yml");
724        std::fs::write(
725            &rule_path,
726            r#"
727title: Rule A
728status: test
729logsource:
730    category: test
731detection:
732    selection:
733        EventID: 1
734    condition: selection
735"#,
736        )
737        .unwrap();
738
739        let mut engine = RuntimeEngine::new(
740            rule_path.clone(),
741            vec![],
742            CorrelationConfig::default(),
743            false,
744        );
745        engine.load_rules().unwrap();
746        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
747
748        let batch = vec![r#"{"EventID": 1}"#.to_string()];
749        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
750
751        // Update the rule file and reload
752        std::fs::write(
753            &rule_path,
754            r#"
755title: Rule B
756status: test
757logsource:
758    category: test
759detection:
760    selection:
761        EventID: 42
762    condition: selection
763"#,
764        )
765        .unwrap();
766
767        let stats = proc.reload_rules().unwrap();
768        assert_eq!(stats.detection_rules, 1);
769
770        // Old rule should no longer match
771        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);
772        // New rule should match
773        let batch2 = vec![r#"{"EventID": 42}"#.to_string()];
774        assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);
775
776        std::mem::forget(dir);
777    }
778
779    #[test]
780    fn reload_re_reads_pipelines_from_disk() {
781        let dir = tempfile::tempdir().unwrap();
782
783        // Rule uses the generic Sigma field name "SourceIP".
784        // The pipeline maps it to what the events actually contain.
785        let rule_path = dir.path().join("test.yml");
786        std::fs::write(
787            &rule_path,
788            r#"
789title: Rule A
790status: test
791logsource:
792    category: test
793detection:
794    selection:
795        SourceIP: "10.0.0.1"
796    condition: selection
797"#,
798        )
799        .unwrap();
800
801        // Pipeline maps the rule's SourceIP field to "src_ip" (event field)
802        let pipeline_path = dir.path().join("pipeline.yml");
803        std::fs::write(
804            &pipeline_path,
805            r#"
806name: Initial Pipeline
807priority: 10
808transformations:
809  - id: rename_field
810    type: field_name_mapping
811    mapping:
812      SourceIP: src_ip
813"#,
814        )
815        .unwrap();
816
817        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
818        let mut engine = RuntimeEngine::new(
819            rule_path.clone(),
820            pipelines,
821            CorrelationConfig::default(),
822            false,
823        );
824        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
825        engine.load_rules().unwrap();
826        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
827
828        // Event uses "src_ip" which the pipeline mapped from SourceIP
829        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
830        assert!(
831            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
832            "src_ip should match because pipeline mapped SourceIP -> src_ip"
833        );
834
835        // Update pipeline to map SourceIP to a different event field name
836        std::fs::write(
837            &pipeline_path,
838            r#"
839name: Updated Pipeline
840priority: 10
841transformations:
842  - id: rename_field
843    type: field_name_mapping
844    mapping:
845      SourceIP: source.ip
846"#,
847        )
848        .unwrap();
849
850        proc.reload_rules().unwrap();
851
852        // src_ip no longer the target, should not match
853        assert!(
854            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0,
855            "after pipeline reload, src_ip should no longer match"
856        );
857
858        // source.ip is now the mapped name, should match
859        let batch2 = vec![r#"{"source.ip": "10.0.0.1"}"#.to_string()];
860        assert!(
861            proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0,
862            "after pipeline reload, source.ip should match"
863        );
864
865        std::mem::forget(dir);
866    }
867
868    #[test]
869    fn reload_with_broken_pipeline_keeps_old_engine() {
870        let dir = tempfile::tempdir().unwrap();
871        let rule_path = dir.path().join("test.yml");
872        std::fs::write(
873            &rule_path,
874            r#"
875title: Rule A
876status: test
877logsource:
878    category: test
879detection:
880    selection:
881        SourceIP: "10.0.0.1"
882    condition: selection
883"#,
884        )
885        .unwrap();
886
887        let pipeline_path = dir.path().join("pipeline.yml");
888        std::fs::write(
889            &pipeline_path,
890            r#"
891name: Working Pipeline
892priority: 10
893transformations:
894  - id: rename_field
895    type: field_name_mapping
896    mapping:
897      SourceIP: src_ip
898"#,
899        )
900        .unwrap();
901
902        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
903        let mut engine = RuntimeEngine::new(
904            rule_path.clone(),
905            pipelines,
906            CorrelationConfig::default(),
907            false,
908        );
909        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
910        engine.load_rules().unwrap();
911        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
912
913        // Verify initial state works (SourceIP mapped to src_ip)
914        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
915        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
916
917        // Write broken YAML to the pipeline file
918        std::fs::write(&pipeline_path, "{{{{ invalid yaml !!!!").unwrap();
919
920        // Reload should fail
921        let result = proc.reload_rules();
922        assert!(result.is_err(), "reload with broken pipeline should fail");
923
924        // Old engine should still be active and working
925        assert!(
926            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
927            "old engine should still work after failed reload"
928        );
929
930        std::mem::forget(dir);
931    }
932
933    #[test]
934    fn custom_event_filter() {
935        let proc = make_processor(
936            r#"
937title: Test Rule
938status: test
939logsource:
940    category: test
941detection:
942    selection:
943        EventID: 1
944    condition: selection
945"#,
946        );
947
948        // Filter that extracts a nested "records" array
949        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
950            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
951                records.clone()
952            } else {
953                vec![v.clone()]
954            }
955        };
956
957        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
958        let results = proc.process_batch_lines(&batch, &filter);
959        assert_eq!(results.len(), 1);
960        assert_eq!(
961            results[0].detection_count(),
962            1,
963            "only EventID=1 from records array should match"
964        );
965    }
966
967    #[test]
968    fn empty_batch_returns_empty() {
969        let proc = make_processor(
970            r#"
971title: Test Rule
972status: test
973logsource:
974    category: test
975detection:
976    selection:
977        EventID: 1
978    condition: selection
979"#,
980        );
981
982        let batch: Vec<String> = vec![];
983        let results = proc.process_batch_lines(&batch, &identity_filter);
984        assert!(results.is_empty());
985    }
986
987    /// Verify MetricsHook is called correctly during processing.
988    #[test]
989    fn metrics_hook_invocations() {
990        use std::sync::atomic::{AtomicU64, Ordering};
991
992        struct CountingMetrics {
993            parse_errors: AtomicU64,
994            events_processed: AtomicU64,
995            detection_matches: AtomicU64,
996        }
997
998        impl MetricsHook for CountingMetrics {
999            fn on_parse_error(&self) {
1000                self.parse_errors.fetch_add(1, Ordering::Relaxed);
1001            }
1002            fn on_events_processed(&self, count: u64) {
1003                self.events_processed.fetch_add(count, Ordering::Relaxed);
1004            }
1005            fn on_detection_matches(&self, count: u64) {
1006                self.detection_matches.fetch_add(count, Ordering::Relaxed);
1007            }
1008            fn on_correlation_matches(&self, _: u64) {}
1009            fn observe_processing_latency(&self, _: f64) {}
1010            fn on_input_queue_depth_change(&self, _: i64) {}
1011            fn on_back_pressure(&self) {}
1012            fn observe_batch_size(&self, _: u64) {}
1013            fn on_output_queue_depth_change(&self, _: i64) {}
1014            fn observe_pipeline_latency(&self, _: f64) {}
1015            fn set_correlation_state_entries(&self, _: u64) {}
1016        }
1017
1018        let dir = tempfile::tempdir().unwrap();
1019        let rule_path = dir.path().join("test.yml");
1020        std::fs::write(
1021            &rule_path,
1022            r#"
1023title: Test Rule
1024status: test
1025logsource:
1026    category: test
1027detection:
1028    selection:
1029        EventID: 1
1030    condition: selection
1031"#,
1032        )
1033        .unwrap();
1034
1035        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
1036        engine.load_rules().unwrap();
1037
1038        let metrics = Arc::new(CountingMetrics {
1039            parse_errors: AtomicU64::new(0),
1040            events_processed: AtomicU64::new(0),
1041            detection_matches: AtomicU64::new(0),
1042        });
1043        let proc = LogProcessor::new(engine, metrics.clone());
1044
1045        let batch = vec![
1046            "not json".to_string(),
1047            r#"{"EventID": 1}"#.to_string(),
1048            r#"{"EventID": 2}"#.to_string(),
1049        ];
1050        proc.process_batch_lines(&batch, &identity_filter);
1051
1052        assert_eq!(metrics.parse_errors.load(Ordering::Relaxed), 1);
1053        assert_eq!(metrics.events_processed.load(Ordering::Relaxed), 2);
1054        assert_eq!(metrics.detection_matches.load(Ordering::Relaxed), 1);
1055
1056        std::mem::forget(dir);
1057    }
1058
1059    /// Verify concurrent processing and swap don't panic (basic thread safety).
1060    #[test]
1061    fn concurrent_swap_and_process() {
1062        let dir = tempfile::tempdir().unwrap();
1063        let rule_path = dir.path().join("test.yml");
1064        std::fs::write(
1065            &rule_path,
1066            r#"
1067title: Rule A
1068status: test
1069logsource:
1070    category: test
1071detection:
1072    selection:
1073        EventID: 1
1074    condition: selection
1075"#,
1076        )
1077        .unwrap();
1078
1079        let mut engine = RuntimeEngine::new(
1080            rule_path.clone(),
1081            vec![],
1082            CorrelationConfig::default(),
1083            false,
1084        );
1085        engine.load_rules().unwrap();
1086        let proc = Arc::new(LogProcessor::new(engine, Arc::new(NoopMetrics)));
1087
1088        let handles: Vec<_> = (0..4)
1089            .map(|i| {
1090                let proc = proc.clone();
1091                let rule_path = rule_path.clone();
1092                std::thread::spawn(move || {
1093                    let batch = vec![r#"{"EventID": 1}"#.to_string()];
1094                    for _ in 0..100 {
1095                        let _ = proc.process_batch_lines(&batch, &identity_filter);
1096                    }
1097                    // Thread 0 does a swap mid-flight
1098                    if i == 0 {
1099                        let mut new_engine = RuntimeEngine::new(
1100                            rule_path,
1101                            vec![],
1102                            CorrelationConfig::default(),
1103                            false,
1104                        );
1105                        new_engine.load_rules().unwrap();
1106                        proc.swap_engine(new_engine);
1107                    }
1108                })
1109            })
1110            .collect();
1111
1112        for h in handles {
1113            h.join().unwrap();
1114        }
1115
1116        std::mem::forget(dir);
1117    }
1118
1119    // --- Tests for process_batch_with_format ---
1120
1121    #[test]
1122    fn format_json_matches() {
1123        let proc = make_processor(
1124            r#"
1125title: Test Rule
1126status: test
1127logsource:
1128    category: test
1129detection:
1130    selection:
1131        EventID: 1
1132    condition: selection
1133"#,
1134        );
1135
1136        let batch = vec![r#"{"EventID": 1}"#.to_string()];
1137        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
1138        assert_eq!(results.len(), 1);
1139        assert!(
1140            results[0].detection_count() > 0,
1141            "JSON EventID=1 should match"
1142        );
1143    }
1144
1145    #[test]
1146    fn format_syslog_extracts_fields() {
1147        let proc = make_processor(
1148            r#"
1149title: Syslog Test
1150status: test
1151logsource:
1152    category: test
1153detection:
1154    selection:
1155        hostname: mymachine
1156    condition: selection
1157"#,
1158        );
1159
1160        let batch = vec!["<34>Oct 11 22:14:15 mymachine su: test message".to_string()];
1161        let results = proc.process_batch_with_format(
1162            &batch,
1163            &InputFormat::Syslog(crate::input::SyslogConfig::default()),
1164            None,
1165        );
1166        assert_eq!(results.len(), 1);
1167        assert!(
1168            results[0].detection_count() > 0,
1169            "syslog hostname=mymachine should match"
1170        );
1171    }
1172
1173    #[test]
1174    fn format_plain_keyword_match() {
1175        let proc = make_processor(
1176            r#"
1177title: Keyword Test
1178status: test
1179logsource:
1180    category: test
1181detection:
1182    keywords:
1183        - "disk full"
1184    condition: keywords
1185"#,
1186        );
1187
1188        let batch = vec!["ERROR: disk full on /dev/sda1".to_string()];
1189        let results = proc.process_batch_with_format(&batch, &InputFormat::Plain, None);
1190        assert_eq!(results.len(), 1);
1191        assert!(
1192            results[0].detection_count() > 0,
1193            "plain keyword 'disk full' should match"
1194        );
1195    }
1196
1197    #[test]
1198    fn format_auto_detects_json() {
1199        let proc = make_processor(
1200            r#"
1201title: Test Rule
1202status: test
1203logsource:
1204    category: test
1205detection:
1206    selection:
1207        EventID: 1
1208    condition: selection
1209"#,
1210        );
1211
1212        let batch = vec![r#"{"EventID": 1}"#.to_string()];
1213        let results = proc.process_batch_with_format(&batch, &InputFormat::default(), None);
1214        assert_eq!(results.len(), 1);
1215        assert!(results[0].detection_count() > 0);
1216    }
1217
1218    #[test]
1219    fn format_json_with_event_filter() {
1220        let proc = make_processor(
1221            r#"
1222title: Test Rule
1223status: test
1224logsource:
1225    category: test
1226detection:
1227    selection:
1228        EventID: 1
1229    condition: selection
1230"#,
1231        );
1232
1233        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
1234            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
1235                records.clone()
1236            } else {
1237                vec![v.clone()]
1238            }
1239        };
1240
1241        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
1242        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, Some(&filter));
1243        assert_eq!(results.len(), 1);
1244        assert_eq!(
1245            results[0].detection_count(),
1246            1,
1247            "only EventID=1 from records array should match"
1248        );
1249    }
1250
1251    #[test]
1252    fn format_empty_lines_skipped() {
1253        let proc = make_processor(
1254            r#"
1255title: Test Rule
1256status: test
1257logsource:
1258    category: test
1259detection:
1260    selection:
1261        EventID: 1
1262    condition: selection
1263"#,
1264        );
1265
1266        let batch = vec![
1267            "".to_string(),
1268            "   ".to_string(),
1269            r#"{"EventID": 1}"#.to_string(),
1270        ];
1271        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
1272        assert_eq!(results.len(), 3);
1273        assert!(results[0].detection_count() == 0);
1274        assert!(results[1].detection_count() == 0);
1275        assert!(results[2].detection_count() > 0);
1276    }
1277
1278    #[cfg(feature = "logfmt")]
1279    #[test]
1280    fn format_logfmt_matches() {
1281        let proc = make_processor(
1282            r#"
1283title: Logfmt Test
1284status: test
1285logsource:
1286    category: test
1287detection:
1288    selection:
1289        level: error
1290    condition: selection
1291"#,
1292        );
1293
1294        let batch = vec!["level=error msg=something host=web01".to_string()];
1295        let results = proc.process_batch_with_format(&batch, &InputFormat::Logfmt, None);
1296        assert_eq!(results.len(), 1);
1297        assert!(
1298            results[0].detection_count() > 0,
1299            "logfmt level=error should match"
1300        );
1301    }
1302
1303    #[cfg(feature = "cef")]
1304    #[test]
1305    fn format_cef_matches() {
1306        let proc = make_processor(
1307            r#"
1308title: CEF Test
1309status: test
1310logsource:
1311    category: test
1312detection:
1313    selection:
1314        deviceVendor: Security
1315    condition: selection
1316"#,
1317        );
1318
1319        let batch = vec!["CEF:0|Security|IDS|1.0|100|Attack|9|src=10.0.0.1".to_string()];
1320        let results = proc.process_batch_with_format(&batch, &InputFormat::Cef, None);
1321        assert_eq!(results.len(), 1);
1322        assert!(
1323            results[0].detection_count() > 0,
1324            "CEF deviceVendor=Security should match"
1325        );
1326    }
1327}