// rsigma_runtime/processor.rs

1use std::sync::Arc;
2
3use parking_lot::Mutex;
4use std::time::Instant;
5
6use arc_swap::ArcSwap;
7use rsigma_eval::{Event, FieldObserver, JsonEvent, ProcessResult, ProcessResultExt, RuleFieldSet};
8
9use crate::engine::RuntimeEngine;
10use crate::input::{EventInputDecoded, InputFormat, parse_line};
11use crate::metrics::MetricsHook;
12use crate::tap::{TapPayload, TapRegistry, TapStage};
13
14/// Closure that extracts multiple payloads from a single JSON value.
15///
16/// Used by the daemon's event filter (e.g. jq/jsonpath) to explode a JSON
17/// object into sub-events (e.g. `.records[]`). Only applies to JSON input.
18pub type EventFilter = dyn Fn(&serde_json::Value) -> Vec<serde_json::Value>;
19
20/// Thread-safe handle to the engine, swappable atomically for hot-reload.
21///
22/// Uses `ArcSwap<Mutex<RuntimeEngine>>` so that:
23/// - Detection + correlation processing can acquire `&mut RuntimeEngine` via
24///   the inner `Mutex`.
25/// - Hot-reload swaps the entire engine atomically without blocking in-flight
26///   batches (they hold an `Arc` to the old engine until their batch completes).
27pub struct LogProcessor {
28    engine: Arc<ArcSwap<Mutex<RuntimeEngine>>>,
29    metrics: Arc<dyn MetricsHook>,
30    /// Optional opt-in field observer. When `Some`, every parsed event
31    /// flowing through `process_batch_with_format` has its field keys
32    /// recorded. When `None`, the batch path skips iteration entirely
33    /// so the hot path stays untouched.
34    field_observer: ArcSwap<Option<Arc<FieldObserver>>>,
35    /// Optional live event tap. When `Some`, `process_batch_with_format`
36    /// offers raw lines and/or decoded events to any active capture
37    /// sessions with a non-blocking `try_send`. When `None` (or when no
38    /// session is active), the hot path performs one `ArcSwap` load and
39    /// skips capture entirely.
40    event_tap: ArcSwap<Option<Arc<TapRegistry>>>,
41}
42
43impl LogProcessor {
44    /// Create a new processor wrapping the given engine and metrics hook.
45    pub fn new(engine: RuntimeEngine, metrics: Arc<dyn MetricsHook>) -> Self {
46        LogProcessor {
47            engine: Arc::new(ArcSwap::from_pointee(Mutex::new(engine))),
48            metrics,
49            field_observer: ArcSwap::new(Arc::new(None)),
50            event_tap: ArcSwap::new(Arc::new(None)),
51        }
52    }
53
54    /// Attach (or detach) the opt-in field observer.
55    ///
56    /// When set, [`process_batch_with_format`](Self::process_batch_with_format)
57    /// records each parsed event's field keys before evaluation. Pass
58    /// `None` to disable observation; the hot path then performs zero
59    /// extra work. Safe to call at runtime: the swap is wait-free, and
60    /// in-flight batches finish against whichever observer they read.
61    pub fn set_field_observer(&self, observer: Option<Arc<FieldObserver>>) {
62        self.field_observer.store(Arc::new(observer));
63    }
64
65    /// Return the currently-attached field observer, if any.
66    pub fn field_observer(&self) -> Option<Arc<FieldObserver>> {
67        self.field_observer.load_full().as_ref().clone()
68    }
69
70    /// Attach (or detach) the live event tap.
71    ///
72    /// When set, [`process_batch_with_format`](Self::process_batch_with_format)
73    /// offers raw lines and decoded events to the registry's active capture
74    /// sessions. Pass `None` to disable the tap; the hot path then performs a
75    /// single `ArcSwap` load and skips capture. Safe to call at runtime: the
76    /// swap is wait-free, and in-flight batches finish against whichever
77    /// registry they read.
78    pub fn set_event_tap(&self, tap: Option<Arc<TapRegistry>>) {
79        self.event_tap.store(Arc::new(tap));
80    }
81
82    /// Return the currently-attached event tap registry, if any.
83    pub fn event_tap(&self) -> Option<Arc<TapRegistry>> {
84        self.event_tap.load_full().as_ref().clone()
85    }
86
87    /// Atomically replace the engine with a new one.
88    ///
89    /// In-flight batches continue against the old engine (they hold an `Arc`
90    /// snapshot). New batches see the replacement on their next call to
91    /// `process_batch_lines`.
92    pub fn swap_engine(&self, new_engine: RuntimeEngine) {
93        self.engine.store(Arc::new(Mutex::new(new_engine)));
94    }
95
96    /// Load a snapshot of the current engine for use during reload.
97    ///
98    /// The caller can lock the returned guard to export state, build a new
99    /// engine, import state, and then call `swap_engine`.
100    pub fn engine_snapshot(&self) -> arc_swap::Guard<Arc<Mutex<RuntimeEngine>>> {
101        self.engine.load()
102    }
103
104    /// Process a batch of raw input lines through the engine.
105    ///
106    /// 1. Parses each line as JSON; on error, increments parse error metrics.
107    /// 2. Applies the `event_filter` closure to extract payloads.
108    /// 3. Evaluates all payloads via `RuntimeEngine::process_batch`.
109    /// 4. Merges per-payload results back into per-line results.
110    /// 5. Updates metrics (events processed, latency, match counts).
111    ///
112    /// Returns one `ProcessResult` per input line.
113    pub fn process_batch_lines(
114        &self,
115        batch: &[String],
116        event_filter: &EventFilter,
117    ) -> Vec<ProcessResult> {
118        let engine_guard = self.engine.load();
119        let mut engine = engine_guard.lock();
120
121        // Phase 1: Parse JSON and apply event filters, tracking line origin.
122        let mut parsed: Vec<(usize, Vec<serde_json::Value>)> = Vec::with_capacity(batch.len());
123        for (line_idx, line) in batch.iter().enumerate() {
124            match serde_json::from_str::<serde_json::Value>(line) {
125                Ok(value) => {
126                    let payloads = event_filter(&value);
127                    if !payloads.is_empty() {
128                        parsed.push((line_idx, payloads));
129                    }
130                }
131                Err(e) => {
132                    self.metrics.on_parse_error();
133                    tracing::debug!(error = %e, "Invalid JSON on input");
134                }
135            }
136        }
137
138        // Flatten: (line_idx, &Value) for each payload across all lines
139        let mut flat: Vec<(usize, &serde_json::Value)> = Vec::new();
140        for (line_idx, payloads) in &parsed {
141            for payload in payloads {
142                flat.push((*line_idx, payload));
143            }
144        }
145
146        if flat.is_empty() {
147            return empty_results(batch.len());
148        }
149
150        // Phase 2: Batch evaluation — parallel detection + sequential correlation
151        let events: Vec<JsonEvent> = flat.iter().map(|(_, v)| JsonEvent::borrow(v)).collect();
152        let event_refs: Vec<&JsonEvent> = events.iter().collect();
153
154        let start = Instant::now();
155        let batch_results = engine.process_batch(&event_refs);
156        let elapsed = start.elapsed().as_secs_f64();
157        let per_event_latency = elapsed / event_refs.len() as f64;
158
159        // Update correlation state metrics while we still hold the lock
160        let stats = engine.stats();
161        self.metrics
162            .set_correlation_state_entries(stats.state_entries as u64);
163
164        // Phase 3: Merge results per input line and update metrics
165        let mut line_results = empty_results(batch.len());
166
167        for ((line_idx, _), result) in flat.iter().zip(batch_results) {
168            self.metrics.on_events_processed(1);
169            self.metrics.observe_processing_latency(per_event_latency);
170            self.metrics
171                .on_detection_matches(result.detection_count() as u64);
172            self.metrics
173                .on_correlation_matches(result.correlation_count() as u64);
174
175            for r in &result {
176                let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
177                let title = &r.header.rule_title;
178                match &r.body {
179                    rsigma_eval::ResultBody::Detection(_) => {
180                        self.metrics.on_detection_match_detail(title, level_str);
181                    }
182                    rsigma_eval::ResultBody::Correlation(body) => {
183                        self.metrics.on_correlation_match_detail(
184                            title,
185                            level_str,
186                            body.correlation_type.as_str(),
187                        );
188                    }
189                }
190            }
191
192            line_results[*line_idx].extend(result);
193        }
194
195        line_results
196    }
197
198    /// Process a batch of raw input lines using the specified input format.
199    ///
200    /// Unlike [`process_batch_lines`](Self::process_batch_lines), this method
201    /// supports all input formats (JSON, syslog, plain, logfmt, CEF). The
202    /// `event_filter` only applies to JSON-decoded events (it extracts multiple
203    /// payloads from one JSON object, e.g. a `records[]` array). Non-JSON
204    /// formats produce exactly one event per line.
205    ///
206    /// Returns one `ProcessResult` per input line.
207    pub fn process_batch_with_format(
208        &self,
209        batch: &[String],
210        format: &InputFormat,
211        event_filter: Option<&EventFilter>,
212    ) -> Vec<ProcessResult> {
213        let engine_guard = self.engine.load();
214        let mut engine = engine_guard.lock();
215
216        // Live event tap: load the active-session snapshot once for the whole
217        // batch. Cheap when disabled (one `ArcSwap` load plus an `Option`
218        // check); the guard is held through the line and event loops below.
219        let tap_guard = self.event_tap.load();
220        let tap_sessions = tap_guard
221            .as_ref()
222            .as_ref()
223            .map(|reg| reg.sessions_snapshot());
224        let tap_has_raw = tap_sessions
225            .as_ref()
226            .is_some_and(|s| s.iter().any(|x| x.stage == TapStage::Raw));
227        let tap_has_decoded = tap_sessions
228            .as_ref()
229            .is_some_and(|s| s.iter().any(|x| x.stage == TapStage::Decoded));
230
231        // Phase 1: Parse each line into decoded events, tracking line origin.
232        // For JSON with an event_filter, one line can produce multiple events.
233        let mut decoded_events: Vec<(usize, EventInputDecoded)> = Vec::with_capacity(batch.len());
234
235        for (line_idx, line) in batch.iter().enumerate() {
236            // Raw-stage tap runs before parsing so a non-redacting raw capture
237            // records every non-empty line, including ones that fail to parse.
238            if tap_has_raw
239                && !line.trim().is_empty()
240                && let Some(sessions) = tap_sessions.as_ref()
241            {
242                for s in sessions.iter().filter(|s| s.stage == TapStage::Raw) {
243                    s.offer(TapPayload::Raw(line.clone()));
244                }
245            }
246
247            let Some(decoded) = parse_line(line, format) else {
248                if !line.trim().is_empty() {
249                    self.metrics.on_parse_error();
250                    tracing::debug!("Failed to parse input line");
251                }
252                continue;
253            };
254
255            // For JSON events with an event filter, apply the filter which
256            // may produce multiple payloads (e.g. `.records[]`).
257            if let Some(filter) = event_filter
258                && let EventInputDecoded::Json(ref json_event) = decoded
259            {
260                let json_value = json_event.to_json();
261                let payloads = filter(&json_value);
262                for payload in payloads {
263                    decoded_events
264                        .push((line_idx, EventInputDecoded::Json(JsonEvent::owned(payload))));
265                }
266                continue;
267            }
268
269            decoded_events.push((line_idx, decoded));
270        }
271
272        if decoded_events.is_empty() {
273            return empty_results(batch.len());
274        }
275
276        // Optional opt-in field observation. Cheap when disabled: one
277        // hazard-pointer `Guard` (no Arc clone) plus an `Option` check.
278        // When enabled, walks each decoded event's field keys before
279        // evaluation. The Guard's lifetime extends through the loop so
280        // the observer cannot be dropped mid-batch even if the daemon
281        // detaches it concurrently.
282        let observer_guard = self.field_observer.load();
283        if let Some(observer) = observer_guard.as_ref() {
284            for (_, decoded) in &decoded_events {
285                observer.observe(decoded);
286            }
287        }
288        drop(observer_guard);
289
290        // Decoded-stage tap: offer each decoded event (post-parse,
291        // post-event-filter) to active decoded-stage sessions. The event is
292        // serialized to JSON only when at least one decoded session is active.
293        if tap_has_decoded && let Some(sessions) = tap_sessions.as_ref() {
294            for (_, decoded) in &decoded_events {
295                let value = decoded.to_json();
296                for s in sessions.iter().filter(|s| s.stage == TapStage::Decoded) {
297                    s.offer(TapPayload::Decoded(Box::new(value.clone())));
298                }
299            }
300        }
301
302        // Phase 2: Batch evaluation — parallel detection + sequential correlation
303        let event_refs: Vec<&EventInputDecoded> = decoded_events.iter().map(|(_, e)| e).collect();
304
305        let start = Instant::now();
306        let batch_results = engine.process_batch(&event_refs);
307        let elapsed = start.elapsed().as_secs_f64();
308        let per_event_latency = elapsed / event_refs.len() as f64;
309
310        let stats = engine.stats();
311        self.metrics
312            .set_correlation_state_entries(stats.state_entries as u64);
313
314        // Phase 3: Merge results per input line and update metrics
315        let mut line_results = empty_results(batch.len());
316
317        for ((line_idx, _), result) in decoded_events.iter().zip(batch_results) {
318            self.metrics.on_events_processed(1);
319            self.metrics.observe_processing_latency(per_event_latency);
320            self.metrics
321                .on_detection_matches(result.detection_count() as u64);
322            self.metrics
323                .on_correlation_matches(result.correlation_count() as u64);
324
325            for r in &result {
326                let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
327                let title = &r.header.rule_title;
328                match &r.body {
329                    rsigma_eval::ResultBody::Detection(_) => {
330                        self.metrics.on_detection_match_detail(title, level_str);
331                    }
332                    rsigma_eval::ResultBody::Correlation(body) => {
333                        self.metrics.on_correlation_match_detail(
334                            title,
335                            level_str,
336                            body.correlation_type.as_str(),
337                        );
338                    }
339                }
340            }
341
342            line_results[*line_idx].extend(result);
343        }
344
345        line_results
346    }
347
348    /// Reload rules (and pipelines) without blocking in-flight event processing.
349    ///
350    /// Builds a fresh `RuntimeEngine` with the same configuration as the
351    /// current one, re-reads pipeline files from disk (if paths are set),
352    /// loads rules into it, imports the old engine's correlation state, and
353    /// atomically swaps. In-flight batches that already hold an `Arc` to
354    /// the old engine finish undisturbed.
355    ///
356    /// If pipeline or rule loading fails, the old engine remains active.
357    pub fn reload_rules(&self) -> Result<crate::engine::EngineStats, String> {
358        // Snapshot the old engine's configuration AND tuning so the
359        // replacement reaches `load_rules()` with the same flags. Daemon
360        // startup typically sets `set_bloom_prefilter`/`set_bloom_max_bytes`
361        // (and `set_cross_rule_ac` behind `daachorse-index`) before the
362        // first load; carrying those across the swap keeps hot-reload from
363        // silently undoing them.
364        let snapshot = self.engine.load();
365        let old = snapshot.lock();
366        let old_state = old.export_state();
367        let rules_path = old.rules_path().to_path_buf();
368        let pipelines = old.pipelines().to_vec();
369        let pipeline_paths = old.pipeline_paths().to_vec();
370        let corr_config = old.corr_config().clone();
371        let include_event = old.include_event();
372        let resolver = old.source_resolver().cloned();
373        let allow_remote_include = old.allow_remote_include();
374        let bloom_prefilter = old.bloom_prefilter();
375        let bloom_max_bytes = old.bloom_max_bytes();
376        let match_detail = old.match_detail();
377        #[cfg(feature = "daachorse-index")]
378        let cross_rule_ac = old.cross_rule_ac();
379        drop(old);
380        drop(snapshot);
381
382        let mut new_engine = RuntimeEngine::new(rules_path, pipelines, corr_config, include_event);
383        new_engine.set_pipeline_paths(pipeline_paths);
384        new_engine.set_allow_remote_include(allow_remote_include);
385        new_engine.set_match_detail(match_detail);
386        new_engine.set_bloom_prefilter(bloom_prefilter);
387        if let Some(budget) = bloom_max_bytes {
388            new_engine.set_bloom_max_bytes(budget);
389        }
390        #[cfg(feature = "daachorse-index")]
391        new_engine.set_cross_rule_ac(cross_rule_ac);
392        if let Some(resolver) = resolver {
393            new_engine.set_source_resolver(resolver);
394        }
395        let stats = new_engine.load_rules()?;
396
397        if let Some(state) = old_state
398            && !new_engine.import_state(&state)
399        {
400            tracing::warn!(
401                "Incompatible correlation snapshot version during reload, starting fresh"
402            );
403        }
404
405        self.swap_engine(new_engine);
406        Ok(stats)
407    }
408
409    /// Return the rules path from the current engine.
410    pub fn rules_path(&self) -> std::path::PathBuf {
411        let snapshot = self.engine.load();
412        let engine = snapshot.lock();
413        engine.rules_path().to_path_buf()
414    }
415
416    /// Return a reference to the metrics hook.
417    pub fn metrics(&self) -> &dyn MetricsHook {
418        &*self.metrics
419    }
420
421    /// Export correlation state from the current engine.
422    pub fn export_state(&self) -> Option<rsigma_eval::CorrelationSnapshot> {
423        let snapshot = self.engine.load();
424        let engine = snapshot.lock();
425        engine.export_state()
426    }
427
428    /// Import correlation state into the current engine.
429    pub fn import_state(&self, snapshot: &rsigma_eval::CorrelationSnapshot) -> bool {
430        let guard = self.engine.load();
431        let mut engine = guard.lock();
432        engine.import_state(snapshot)
433    }
434
435    /// Return summary statistics about the current engine.
436    pub fn stats(&self) -> crate::engine::EngineStats {
437        let snapshot = self.engine.load();
438        let engine = snapshot.lock();
439        engine.stats()
440    }
441
442    /// Return an immutable snapshot of the current rule field set
443    /// (post-pipeline). The lock is held only long enough to clone the
444    /// `Arc`; the returned value remains valid across reloads.
445    pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
446        let snapshot = self.engine.load();
447        let engine = snapshot.lock();
448        engine.rule_field_set()
449    }
450}
451
452/// Produce a vec of empty `ProcessResult`, one per input line.
453fn empty_results(count: usize) -> Vec<ProcessResult> {
454    (0..count).map(|_| ProcessResult::new()).collect()
455}
456
457#[cfg(test)]
458mod tests {
459    use super::*;
460    use crate::metrics::NoopMetrics;
461    use rsigma_eval::CorrelationConfig;
462
463    fn identity_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
464        vec![v.clone()]
465    }
466
467    fn make_processor(rules_yaml: &str) -> LogProcessor {
468        let dir = tempfile::tempdir().unwrap();
469        let rule_path = dir.path().join("test.yml");
470        std::fs::write(&rule_path, rules_yaml).unwrap();
471
472        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
473        engine.load_rules().unwrap();
474        // Leak the tempdir so the path stays valid
475        std::mem::forget(dir);
476        LogProcessor::new(engine, Arc::new(NoopMetrics))
477    }
478
479    #[test]
480    fn process_batch_lines_valid_json() {
481        let proc = make_processor(
482            r#"
483title: Test Rule
484status: test
485logsource:
486    category: test
487detection:
488    selection:
489        EventID: 1
490    condition: selection
491"#,
492        );
493
494        let batch = vec![
495            r#"{"EventID": 1}"#.to_string(),
496            r#"{"EventID": 2}"#.to_string(),
497        ];
498        let results = proc.process_batch_lines(&batch, &identity_filter);
499        assert_eq!(results.len(), 2);
500        assert!(results[0].detection_count() > 0, "EventID=1 should match");
501        assert!(
502            results[1].detection_count() == 0,
503            "EventID=2 should not match"
504        );
505    }
506
507    #[test]
508    fn process_batch_lines_invalid_json() {
509        let proc = make_processor(
510            r#"
511title: Test Rule
512status: test
513logsource:
514    category: test
515detection:
516    selection:
517        EventID: 1
518    condition: selection
519"#,
520        );
521
522        let batch = vec!["not json".to_string(), r#"{"EventID": 1}"#.to_string()];
523        let results = proc.process_batch_lines(&batch, &identity_filter);
524        assert_eq!(results.len(), 2);
525        assert!(
526            results[0].detection_count() == 0,
527            "invalid JSON produces empty result"
528        );
529        assert!(results[1].detection_count() > 0, "valid line still matches");
530    }
531
532    #[test]
533    fn swap_engine_replaces_rules() {
534        let dir = tempfile::tempdir().unwrap();
535        let rule_path = dir.path().join("test.yml");
536        std::fs::write(
537            &rule_path,
538            r#"
539title: Rule A
540status: test
541logsource:
542    category: test
543detection:
544    selection:
545        EventID: 1
546    condition: selection
547"#,
548        )
549        .unwrap();
550
551        let mut engine = RuntimeEngine::new(
552            rule_path.clone(),
553            vec![],
554            CorrelationConfig::default(),
555            false,
556        );
557        engine.load_rules().unwrap();
558        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
559
560        let batch = vec![r#"{"EventID": 1}"#.to_string()];
561        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
562
563        // Swap to a rule that matches EventID: 99
564        std::fs::write(
565            &rule_path,
566            r#"
567title: Rule B
568status: test
569logsource:
570    category: test
571detection:
572    selection:
573        EventID: 99
574    condition: selection
575"#,
576        )
577        .unwrap();
578
579        let mut new_engine =
580            RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
581        new_engine.load_rules().unwrap();
582        proc.swap_engine(new_engine);
583
584        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);
585
586        let batch2 = vec![r#"{"EventID": 99}"#.to_string()];
587        assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);
588
589        std::mem::forget(dir);
590    }
591
592    #[test]
593    fn reload_rules_preserves_bloom_tuning() {
594        // Daemon startup typically calls `set_bloom_prefilter(true)` and
595        // friends on the initial RuntimeEngine. Previously, `reload_rules`
596        // rebuilt a fresh RuntimeEngine via `RuntimeEngine::new`, which
597        // resets those flags to defaults; the daemon then silently lost
598        // bloom pre-filtering on the first hot-reload. This test pins the
599        // fix by checking the underlying engine's setters after reload.
600        let dir = tempfile::tempdir().unwrap();
601        let rule_path = dir.path().join("test.yml");
602        std::fs::write(
603            &rule_path,
604            r#"
605title: Rule A
606status: test
607logsource:
608    category: test
609detection:
610    selection:
611        EventID: 1
612    condition: selection
613"#,
614        )
615        .unwrap();
616
617        let mut engine = RuntimeEngine::new(
618            rule_path.clone(),
619            vec![],
620            CorrelationConfig::default(),
621            false,
622        );
623        engine.set_bloom_prefilter(true);
624        engine.set_bloom_max_bytes(2 * 1024 * 1024);
625        #[cfg(feature = "daachorse-index")]
626        engine.set_cross_rule_ac(true);
627        engine.load_rules().unwrap();
628
629        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
630        proc.reload_rules().unwrap();
631
632        let snapshot = proc.engine_snapshot();
633        let reloaded = snapshot.lock();
634        assert!(
635            reloaded.bloom_prefilter(),
636            "bloom_prefilter must survive reload_rules"
637        );
638        assert_eq!(
639            reloaded.bloom_max_bytes(),
640            Some(2 * 1024 * 1024),
641            "bloom_max_bytes must survive reload_rules"
642        );
643        #[cfg(feature = "daachorse-index")]
644        assert!(
645            reloaded.cross_rule_ac(),
646            "cross_rule_ac must survive reload_rules"
647        );
648
649        std::mem::forget(dir);
650    }
651
652    #[test]
653    fn reload_rules_preserves_engine() {
654        let dir = tempfile::tempdir().unwrap();
655        let rule_path = dir.path().join("test.yml");
656        std::fs::write(
657            &rule_path,
658            r#"
659title: Rule A
660status: test
661logsource:
662    category: test
663detection:
664    selection:
665        EventID: 1
666    condition: selection
667"#,
668        )
669        .unwrap();
670
671        let mut engine = RuntimeEngine::new(
672            rule_path.clone(),
673            vec![],
674            CorrelationConfig::default(),
675            false,
676        );
677        engine.load_rules().unwrap();
678        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
679
680        let batch = vec![r#"{"EventID": 1}"#.to_string()];
681        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
682
683        // Update the rule file and reload
684        std::fs::write(
685            &rule_path,
686            r#"
687title: Rule B
688status: test
689logsource:
690    category: test
691detection:
692    selection:
693        EventID: 42
694    condition: selection
695"#,
696        )
697        .unwrap();
698
699        let stats = proc.reload_rules().unwrap();
700        assert_eq!(stats.detection_rules, 1);
701
702        // Old rule should no longer match
703        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);
704        // New rule should match
705        let batch2 = vec![r#"{"EventID": 42}"#.to_string()];
706        assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);
707
708        std::mem::forget(dir);
709    }
710
711    #[test]
712    fn reload_re_reads_pipelines_from_disk() {
713        let dir = tempfile::tempdir().unwrap();
714
715        // Rule uses the generic Sigma field name "SourceIP".
716        // The pipeline maps it to what the events actually contain.
717        let rule_path = dir.path().join("test.yml");
718        std::fs::write(
719            &rule_path,
720            r#"
721title: Rule A
722status: test
723logsource:
724    category: test
725detection:
726    selection:
727        SourceIP: "10.0.0.1"
728    condition: selection
729"#,
730        )
731        .unwrap();
732
733        // Pipeline maps the rule's SourceIP field to "src_ip" (event field)
734        let pipeline_path = dir.path().join("pipeline.yml");
735        std::fs::write(
736            &pipeline_path,
737            r#"
738name: Initial Pipeline
739priority: 10
740transformations:
741  - id: rename_field
742    type: field_name_mapping
743    mapping:
744      SourceIP: src_ip
745"#,
746        )
747        .unwrap();
748
749        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
750        let mut engine = RuntimeEngine::new(
751            rule_path.clone(),
752            pipelines,
753            CorrelationConfig::default(),
754            false,
755        );
756        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
757        engine.load_rules().unwrap();
758        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
759
760        // Event uses "src_ip" which the pipeline mapped from SourceIP
761        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
762        assert!(
763            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
764            "src_ip should match because pipeline mapped SourceIP -> src_ip"
765        );
766
767        // Update pipeline to map SourceIP to a different event field name
768        std::fs::write(
769            &pipeline_path,
770            r#"
771name: Updated Pipeline
772priority: 10
773transformations:
774  - id: rename_field
775    type: field_name_mapping
776    mapping:
777      SourceIP: source.ip
778"#,
779        )
780        .unwrap();
781
782        proc.reload_rules().unwrap();
783
784        // src_ip no longer the target, should not match
785        assert!(
786            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0,
787            "after pipeline reload, src_ip should no longer match"
788        );
789
790        // source.ip is now the mapped name, should match
791        let batch2 = vec![r#"{"source.ip": "10.0.0.1"}"#.to_string()];
792        assert!(
793            proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0,
794            "after pipeline reload, source.ip should match"
795        );
796
797        std::mem::forget(dir);
798    }
799
800    #[test]
801    fn reload_with_broken_pipeline_keeps_old_engine() {
802        let dir = tempfile::tempdir().unwrap();
803        let rule_path = dir.path().join("test.yml");
804        std::fs::write(
805            &rule_path,
806            r#"
807title: Rule A
808status: test
809logsource:
810    category: test
811detection:
812    selection:
813        SourceIP: "10.0.0.1"
814    condition: selection
815"#,
816        )
817        .unwrap();
818
819        let pipeline_path = dir.path().join("pipeline.yml");
820        std::fs::write(
821            &pipeline_path,
822            r#"
823name: Working Pipeline
824priority: 10
825transformations:
826  - id: rename_field
827    type: field_name_mapping
828    mapping:
829      SourceIP: src_ip
830"#,
831        )
832        .unwrap();
833
834        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
835        let mut engine = RuntimeEngine::new(
836            rule_path.clone(),
837            pipelines,
838            CorrelationConfig::default(),
839            false,
840        );
841        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
842        engine.load_rules().unwrap();
843        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
844
845        // Verify initial state works (SourceIP mapped to src_ip)
846        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
847        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
848
849        // Write broken YAML to the pipeline file
850        std::fs::write(&pipeline_path, "{{{{ invalid yaml !!!!").unwrap();
851
852        // Reload should fail
853        let result = proc.reload_rules();
854        assert!(result.is_err(), "reload with broken pipeline should fail");
855
856        // Old engine should still be active and working
857        assert!(
858            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
859            "old engine should still work after failed reload"
860        );
861
862        std::mem::forget(dir);
863    }
864
865    #[test]
866    fn custom_event_filter() {
867        let proc = make_processor(
868            r#"
869title: Test Rule
870status: test
871logsource:
872    category: test
873detection:
874    selection:
875        EventID: 1
876    condition: selection
877"#,
878        );
879
880        // Filter that extracts a nested "records" array
881        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
882            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
883                records.clone()
884            } else {
885                vec![v.clone()]
886            }
887        };
888
889        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
890        let results = proc.process_batch_lines(&batch, &filter);
891        assert_eq!(results.len(), 1);
892        assert_eq!(
893            results[0].detection_count(),
894            1,
895            "only EventID=1 from records array should match"
896        );
897    }
898
899    #[test]
900    fn empty_batch_returns_empty() {
901        let proc = make_processor(
902            r#"
903title: Test Rule
904status: test
905logsource:
906    category: test
907detection:
908    selection:
909        EventID: 1
910    condition: selection
911"#,
912        );
913
914        let batch: Vec<String> = vec![];
915        let results = proc.process_batch_lines(&batch, &identity_filter);
916        assert!(results.is_empty());
917    }
918
919    /// Verify MetricsHook is called correctly during processing.
920    #[test]
921    fn metrics_hook_invocations() {
922        use std::sync::atomic::{AtomicU64, Ordering};
923
924        struct CountingMetrics {
925            parse_errors: AtomicU64,
926            events_processed: AtomicU64,
927            detection_matches: AtomicU64,
928        }
929
930        impl MetricsHook for CountingMetrics {
931            fn on_parse_error(&self) {
932                self.parse_errors.fetch_add(1, Ordering::Relaxed);
933            }
934            fn on_events_processed(&self, count: u64) {
935                self.events_processed.fetch_add(count, Ordering::Relaxed);
936            }
937            fn on_detection_matches(&self, count: u64) {
938                self.detection_matches.fetch_add(count, Ordering::Relaxed);
939            }
940            fn on_correlation_matches(&self, _: u64) {}
941            fn observe_processing_latency(&self, _: f64) {}
942            fn on_input_queue_depth_change(&self, _: i64) {}
943            fn on_back_pressure(&self) {}
944            fn observe_batch_size(&self, _: u64) {}
945            fn on_output_queue_depth_change(&self, _: i64) {}
946            fn observe_pipeline_latency(&self, _: f64) {}
947            fn set_correlation_state_entries(&self, _: u64) {}
948        }
949
950        let dir = tempfile::tempdir().unwrap();
951        let rule_path = dir.path().join("test.yml");
952        std::fs::write(
953            &rule_path,
954            r#"
955title: Test Rule
956status: test
957logsource:
958    category: test
959detection:
960    selection:
961        EventID: 1
962    condition: selection
963"#,
964        )
965        .unwrap();
966
967        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
968        engine.load_rules().unwrap();
969
970        let metrics = Arc::new(CountingMetrics {
971            parse_errors: AtomicU64::new(0),
972            events_processed: AtomicU64::new(0),
973            detection_matches: AtomicU64::new(0),
974        });
975        let proc = LogProcessor::new(engine, metrics.clone());
976
977        let batch = vec![
978            "not json".to_string(),
979            r#"{"EventID": 1}"#.to_string(),
980            r#"{"EventID": 2}"#.to_string(),
981        ];
982        proc.process_batch_lines(&batch, &identity_filter);
983
984        assert_eq!(metrics.parse_errors.load(Ordering::Relaxed), 1);
985        assert_eq!(metrics.events_processed.load(Ordering::Relaxed), 2);
986        assert_eq!(metrics.detection_matches.load(Ordering::Relaxed), 1);
987
988        std::mem::forget(dir);
989    }
990
991    /// Verify concurrent processing and swap don't panic (basic thread safety).
992    #[test]
993    fn concurrent_swap_and_process() {
994        let dir = tempfile::tempdir().unwrap();
995        let rule_path = dir.path().join("test.yml");
996        std::fs::write(
997            &rule_path,
998            r#"
999title: Rule A
1000status: test
1001logsource:
1002    category: test
1003detection:
1004    selection:
1005        EventID: 1
1006    condition: selection
1007"#,
1008        )
1009        .unwrap();
1010
1011        let mut engine = RuntimeEngine::new(
1012            rule_path.clone(),
1013            vec![],
1014            CorrelationConfig::default(),
1015            false,
1016        );
1017        engine.load_rules().unwrap();
1018        let proc = Arc::new(LogProcessor::new(engine, Arc::new(NoopMetrics)));
1019
1020        let handles: Vec<_> = (0..4)
1021            .map(|i| {
1022                let proc = proc.clone();
1023                let rule_path = rule_path.clone();
1024                std::thread::spawn(move || {
1025                    let batch = vec![r#"{"EventID": 1}"#.to_string()];
1026                    for _ in 0..100 {
1027                        let _ = proc.process_batch_lines(&batch, &identity_filter);
1028                    }
1029                    // Thread 0 does a swap mid-flight
1030                    if i == 0 {
1031                        let mut new_engine = RuntimeEngine::new(
1032                            rule_path,
1033                            vec![],
1034                            CorrelationConfig::default(),
1035                            false,
1036                        );
1037                        new_engine.load_rules().unwrap();
1038                        proc.swap_engine(new_engine);
1039                    }
1040                })
1041            })
1042            .collect();
1043
1044        for h in handles {
1045            h.join().unwrap();
1046        }
1047
1048        std::mem::forget(dir);
1049    }
1050
1051    // --- Tests for process_batch_with_format ---
1052
1053    #[test]
1054    fn format_json_matches() {
1055        let proc = make_processor(
1056            r#"
1057title: Test Rule
1058status: test
1059logsource:
1060    category: test
1061detection:
1062    selection:
1063        EventID: 1
1064    condition: selection
1065"#,
1066        );
1067
1068        let batch = vec![r#"{"EventID": 1}"#.to_string()];
1069        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
1070        assert_eq!(results.len(), 1);
1071        assert!(
1072            results[0].detection_count() > 0,
1073            "JSON EventID=1 should match"
1074        );
1075    }
1076
1077    #[test]
1078    fn format_syslog_extracts_fields() {
1079        let proc = make_processor(
1080            r#"
1081title: Syslog Test
1082status: test
1083logsource:
1084    category: test
1085detection:
1086    selection:
1087        hostname: mymachine
1088    condition: selection
1089"#,
1090        );
1091
1092        let batch = vec!["<34>Oct 11 22:14:15 mymachine su: test message".to_string()];
1093        let results = proc.process_batch_with_format(
1094            &batch,
1095            &InputFormat::Syslog(crate::input::SyslogConfig::default()),
1096            None,
1097        );
1098        assert_eq!(results.len(), 1);
1099        assert!(
1100            results[0].detection_count() > 0,
1101            "syslog hostname=mymachine should match"
1102        );
1103    }
1104
1105    #[test]
1106    fn format_plain_keyword_match() {
1107        let proc = make_processor(
1108            r#"
1109title: Keyword Test
1110status: test
1111logsource:
1112    category: test
1113detection:
1114    keywords:
1115        - "disk full"
1116    condition: keywords
1117"#,
1118        );
1119
1120        let batch = vec!["ERROR: disk full on /dev/sda1".to_string()];
1121        let results = proc.process_batch_with_format(&batch, &InputFormat::Plain, None);
1122        assert_eq!(results.len(), 1);
1123        assert!(
1124            results[0].detection_count() > 0,
1125            "plain keyword 'disk full' should match"
1126        );
1127    }
1128
1129    #[test]
1130    fn format_auto_detects_json() {
1131        let proc = make_processor(
1132            r#"
1133title: Test Rule
1134status: test
1135logsource:
1136    category: test
1137detection:
1138    selection:
1139        EventID: 1
1140    condition: selection
1141"#,
1142        );
1143
1144        let batch = vec![r#"{"EventID": 1}"#.to_string()];
1145        let results = proc.process_batch_with_format(&batch, &InputFormat::default(), None);
1146        assert_eq!(results.len(), 1);
1147        assert!(results[0].detection_count() > 0);
1148    }
1149
1150    #[test]
1151    fn format_json_with_event_filter() {
1152        let proc = make_processor(
1153            r#"
1154title: Test Rule
1155status: test
1156logsource:
1157    category: test
1158detection:
1159    selection:
1160        EventID: 1
1161    condition: selection
1162"#,
1163        );
1164
1165        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
1166            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
1167                records.clone()
1168            } else {
1169                vec![v.clone()]
1170            }
1171        };
1172
1173        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
1174        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, Some(&filter));
1175        assert_eq!(results.len(), 1);
1176        assert_eq!(
1177            results[0].detection_count(),
1178            1,
1179            "only EventID=1 from records array should match"
1180        );
1181    }
1182
1183    #[test]
1184    fn format_empty_lines_skipped() {
1185        let proc = make_processor(
1186            r#"
1187title: Test Rule
1188status: test
1189logsource:
1190    category: test
1191detection:
1192    selection:
1193        EventID: 1
1194    condition: selection
1195"#,
1196        );
1197
1198        let batch = vec![
1199            "".to_string(),
1200            "   ".to_string(),
1201            r#"{"EventID": 1}"#.to_string(),
1202        ];
1203        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
1204        assert_eq!(results.len(), 3);
1205        assert!(results[0].detection_count() == 0);
1206        assert!(results[1].detection_count() == 0);
1207        assert!(results[2].detection_count() > 0);
1208    }
1209
1210    #[cfg(feature = "logfmt")]
1211    #[test]
1212    fn format_logfmt_matches() {
1213        let proc = make_processor(
1214            r#"
1215title: Logfmt Test
1216status: test
1217logsource:
1218    category: test
1219detection:
1220    selection:
1221        level: error
1222    condition: selection
1223"#,
1224        );
1225
1226        let batch = vec!["level=error msg=something host=web01".to_string()];
1227        let results = proc.process_batch_with_format(&batch, &InputFormat::Logfmt, None);
1228        assert_eq!(results.len(), 1);
1229        assert!(
1230            results[0].detection_count() > 0,
1231            "logfmt level=error should match"
1232        );
1233    }
1234
1235    #[cfg(feature = "cef")]
1236    #[test]
1237    fn format_cef_matches() {
1238        let proc = make_processor(
1239            r#"
1240title: CEF Test
1241status: test
1242logsource:
1243    category: test
1244detection:
1245    selection:
1246        deviceVendor: Security
1247    condition: selection
1248"#,
1249        );
1250
1251        let batch = vec!["CEF:0|Security|IDS|1.0|100|Attack|9|src=10.0.0.1".to_string()];
1252        let results = proc.process_batch_with_format(&batch, &InputFormat::Cef, None);
1253        assert_eq!(results.len(), 1);
1254        assert!(
1255            results[0].detection_count() > 0,
1256            "CEF deviceVendor=Security should match"
1257        );
1258    }
1259}