Skip to main content

rsigma_runtime/
processor.rs

1use std::sync::Arc;
2
3use parking_lot::Mutex;
4use std::time::Instant;
5
6use arc_swap::ArcSwap;
7use rsigma_eval::{Event, FieldObserver, JsonEvent, ProcessResult, ProcessResultExt, RuleFieldSet};
8
9use crate::engine::RuntimeEngine;
10use crate::input::{EventInputDecoded, InputFormat, parse_line};
11use crate::metrics::MetricsHook;
12
/// Closure that extracts zero or more payloads from a single JSON value.
///
/// Used by the daemon's event filter (e.g. jq/jsonpath) to explode a JSON
/// object into sub-events (e.g. `.records[]`). Only applies to JSON input.
///
/// Every returned payload is evaluated as its own event and its results are
/// merged back into the input line it came from. Returning an empty `Vec`
/// means the line contributes no events.
pub type EventFilter = dyn Fn(&serde_json::Value) -> Vec<serde_json::Value>;
18
/// Thread-safe handle to the engine, swappable atomically for hot-reload.
///
/// Uses `ArcSwap<Mutex<RuntimeEngine>>` so that:
/// - Detection + correlation processing can acquire `&mut RuntimeEngine` via
///   the inner `Mutex`.
/// - Hot-reload swaps the entire engine atomically without blocking in-flight
///   batches (they hold an `Arc` to the old engine until their batch completes).
pub struct LogProcessor {
    /// The live engine. Readers load a snapshot and lock the inner `Mutex`;
    /// `swap_engine` replaces the whole `Arc<Mutex<_>>` in one store.
    engine: Arc<ArcSwap<Mutex<RuntimeEngine>>>,
    /// Sink for parse-error, throughput, latency and match metrics emitted
    /// by the batch-processing methods.
    metrics: Arc<dyn MetricsHook>,
    /// Optional opt-in field observer. When `Some`, every parsed event
    /// flowing through `process_batch_with_format` has its field keys
    /// recorded. When `None`, the batch path skips iteration entirely
    /// so the hot path stays untouched.
    field_observer: ArcSwap<Option<Arc<FieldObserver>>>,
}
35
36impl LogProcessor {
37    /// Create a new processor wrapping the given engine and metrics hook.
38    pub fn new(engine: RuntimeEngine, metrics: Arc<dyn MetricsHook>) -> Self {
39        LogProcessor {
40            engine: Arc::new(ArcSwap::from_pointee(Mutex::new(engine))),
41            metrics,
42            field_observer: ArcSwap::new(Arc::new(None)),
43        }
44    }
45
46    /// Attach (or detach) the opt-in field observer.
47    ///
48    /// When set, [`process_batch_with_format`](Self::process_batch_with_format)
49    /// records each parsed event's field keys before evaluation. Pass
50    /// `None` to disable observation; the hot path then performs zero
51    /// extra work. Safe to call at runtime: the swap is wait-free, and
52    /// in-flight batches finish against whichever observer they read.
53    pub fn set_field_observer(&self, observer: Option<Arc<FieldObserver>>) {
54        self.field_observer.store(Arc::new(observer));
55    }
56
57    /// Return the currently-attached field observer, if any.
58    pub fn field_observer(&self) -> Option<Arc<FieldObserver>> {
59        self.field_observer.load_full().as_ref().clone()
60    }
61
62    /// Atomically replace the engine with a new one.
63    ///
64    /// In-flight batches continue against the old engine (they hold an `Arc`
65    /// snapshot). New batches see the replacement on their next call to
66    /// `process_batch_lines`.
67    pub fn swap_engine(&self, new_engine: RuntimeEngine) {
68        self.engine.store(Arc::new(Mutex::new(new_engine)));
69    }
70
71    /// Load a snapshot of the current engine for use during reload.
72    ///
73    /// The caller can lock the returned guard to export state, build a new
74    /// engine, import state, and then call `swap_engine`.
75    pub fn engine_snapshot(&self) -> arc_swap::Guard<Arc<Mutex<RuntimeEngine>>> {
76        self.engine.load()
77    }
78
79    /// Process a batch of raw input lines through the engine.
80    ///
81    /// 1. Parses each line as JSON; on error, increments parse error metrics.
82    /// 2. Applies the `event_filter` closure to extract payloads.
83    /// 3. Evaluates all payloads via `RuntimeEngine::process_batch`.
84    /// 4. Merges per-payload results back into per-line results.
85    /// 5. Updates metrics (events processed, latency, match counts).
86    ///
87    /// Returns one `ProcessResult` per input line.
88    pub fn process_batch_lines(
89        &self,
90        batch: &[String],
91        event_filter: &EventFilter,
92    ) -> Vec<ProcessResult> {
93        let engine_guard = self.engine.load();
94        let mut engine = engine_guard.lock();
95
96        // Phase 1: Parse JSON and apply event filters, tracking line origin.
97        let mut parsed: Vec<(usize, Vec<serde_json::Value>)> = Vec::with_capacity(batch.len());
98        for (line_idx, line) in batch.iter().enumerate() {
99            match serde_json::from_str::<serde_json::Value>(line) {
100                Ok(value) => {
101                    let payloads = event_filter(&value);
102                    if !payloads.is_empty() {
103                        parsed.push((line_idx, payloads));
104                    }
105                }
106                Err(e) => {
107                    self.metrics.on_parse_error();
108                    tracing::debug!(error = %e, "Invalid JSON on input");
109                }
110            }
111        }
112
113        // Flatten: (line_idx, &Value) for each payload across all lines
114        let mut flat: Vec<(usize, &serde_json::Value)> = Vec::new();
115        for (line_idx, payloads) in &parsed {
116            for payload in payloads {
117                flat.push((*line_idx, payload));
118            }
119        }
120
121        if flat.is_empty() {
122            return empty_results(batch.len());
123        }
124
125        // Phase 2: Batch evaluation — parallel detection + sequential correlation
126        let events: Vec<JsonEvent> = flat.iter().map(|(_, v)| JsonEvent::borrow(v)).collect();
127        let event_refs: Vec<&JsonEvent> = events.iter().collect();
128
129        let start = Instant::now();
130        let batch_results = engine.process_batch(&event_refs);
131        let elapsed = start.elapsed().as_secs_f64();
132        let per_event_latency = elapsed / event_refs.len() as f64;
133
134        // Update correlation state metrics while we still hold the lock
135        let stats = engine.stats();
136        self.metrics
137            .set_correlation_state_entries(stats.state_entries as u64);
138
139        // Phase 3: Merge results per input line and update metrics
140        let mut line_results = empty_results(batch.len());
141
142        for ((line_idx, _), result) in flat.iter().zip(batch_results) {
143            self.metrics.on_events_processed(1);
144            self.metrics.observe_processing_latency(per_event_latency);
145            self.metrics
146                .on_detection_matches(result.detection_count() as u64);
147            self.metrics
148                .on_correlation_matches(result.correlation_count() as u64);
149
150            for r in &result {
151                let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
152                let title = &r.header.rule_title;
153                match &r.body {
154                    rsigma_eval::ResultBody::Detection(_) => {
155                        self.metrics.on_detection_match_detail(title, level_str);
156                    }
157                    rsigma_eval::ResultBody::Correlation(body) => {
158                        self.metrics.on_correlation_match_detail(
159                            title,
160                            level_str,
161                            body.correlation_type.as_str(),
162                        );
163                    }
164                }
165            }
166
167            line_results[*line_idx].extend(result);
168        }
169
170        line_results
171    }
172
173    /// Process a batch of raw input lines using the specified input format.
174    ///
175    /// Unlike [`process_batch_lines`](Self::process_batch_lines), this method
176    /// supports all input formats (JSON, syslog, plain, logfmt, CEF). The
177    /// `event_filter` only applies to JSON-decoded events (it extracts multiple
178    /// payloads from one JSON object, e.g. a `records[]` array). Non-JSON
179    /// formats produce exactly one event per line.
180    ///
181    /// Returns one `ProcessResult` per input line.
182    pub fn process_batch_with_format(
183        &self,
184        batch: &[String],
185        format: &InputFormat,
186        event_filter: Option<&EventFilter>,
187    ) -> Vec<ProcessResult> {
188        let engine_guard = self.engine.load();
189        let mut engine = engine_guard.lock();
190
191        // Phase 1: Parse each line into decoded events, tracking line origin.
192        // For JSON with an event_filter, one line can produce multiple events.
193        let mut decoded_events: Vec<(usize, EventInputDecoded)> = Vec::with_capacity(batch.len());
194
195        for (line_idx, line) in batch.iter().enumerate() {
196            let Some(decoded) = parse_line(line, format) else {
197                if !line.trim().is_empty() {
198                    self.metrics.on_parse_error();
199                    tracing::debug!("Failed to parse input line");
200                }
201                continue;
202            };
203
204            // For JSON events with an event filter, apply the filter which
205            // may produce multiple payloads (e.g. `.records[]`).
206            if let Some(filter) = event_filter
207                && let EventInputDecoded::Json(ref json_event) = decoded
208            {
209                let json_value = json_event.to_json();
210                let payloads = filter(&json_value);
211                for payload in payloads {
212                    decoded_events
213                        .push((line_idx, EventInputDecoded::Json(JsonEvent::owned(payload))));
214                }
215                continue;
216            }
217
218            decoded_events.push((line_idx, decoded));
219        }
220
221        if decoded_events.is_empty() {
222            return empty_results(batch.len());
223        }
224
225        // Optional opt-in field observation. Cheap when disabled: one
226        // hazard-pointer `Guard` (no Arc clone) plus an `Option` check.
227        // When enabled, walks each decoded event's field keys before
228        // evaluation. The Guard's lifetime extends through the loop so
229        // the observer cannot be dropped mid-batch even if the daemon
230        // detaches it concurrently.
231        let observer_guard = self.field_observer.load();
232        if let Some(observer) = observer_guard.as_ref() {
233            for (_, decoded) in &decoded_events {
234                observer.observe(decoded);
235            }
236        }
237        drop(observer_guard);
238
239        // Phase 2: Batch evaluation — parallel detection + sequential correlation
240        let event_refs: Vec<&EventInputDecoded> = decoded_events.iter().map(|(_, e)| e).collect();
241
242        let start = Instant::now();
243        let batch_results = engine.process_batch(&event_refs);
244        let elapsed = start.elapsed().as_secs_f64();
245        let per_event_latency = elapsed / event_refs.len() as f64;
246
247        let stats = engine.stats();
248        self.metrics
249            .set_correlation_state_entries(stats.state_entries as u64);
250
251        // Phase 3: Merge results per input line and update metrics
252        let mut line_results = empty_results(batch.len());
253
254        for ((line_idx, _), result) in decoded_events.iter().zip(batch_results) {
255            self.metrics.on_events_processed(1);
256            self.metrics.observe_processing_latency(per_event_latency);
257            self.metrics
258                .on_detection_matches(result.detection_count() as u64);
259            self.metrics
260                .on_correlation_matches(result.correlation_count() as u64);
261
262            for r in &result {
263                let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
264                let title = &r.header.rule_title;
265                match &r.body {
266                    rsigma_eval::ResultBody::Detection(_) => {
267                        self.metrics.on_detection_match_detail(title, level_str);
268                    }
269                    rsigma_eval::ResultBody::Correlation(body) => {
270                        self.metrics.on_correlation_match_detail(
271                            title,
272                            level_str,
273                            body.correlation_type.as_str(),
274                        );
275                    }
276                }
277            }
278
279            line_results[*line_idx].extend(result);
280        }
281
282        line_results
283    }
284
285    /// Reload rules (and pipelines) without blocking in-flight event processing.
286    ///
287    /// Builds a fresh `RuntimeEngine` with the same configuration as the
288    /// current one, re-reads pipeline files from disk (if paths are set),
289    /// loads rules into it, imports the old engine's correlation state, and
290    /// atomically swaps. In-flight batches that already hold an `Arc` to
291    /// the old engine finish undisturbed.
292    ///
293    /// If pipeline or rule loading fails, the old engine remains active.
294    pub fn reload_rules(&self) -> Result<crate::engine::EngineStats, String> {
295        // Snapshot the old engine's configuration AND tuning so the
296        // replacement reaches `load_rules()` with the same flags. Daemon
297        // startup typically sets `set_bloom_prefilter`/`set_bloom_max_bytes`
298        // (and `set_cross_rule_ac` behind `daachorse-index`) before the
299        // first load; carrying those across the swap keeps hot-reload from
300        // silently undoing them.
301        let snapshot = self.engine.load();
302        let old = snapshot.lock();
303        let old_state = old.export_state();
304        let rules_path = old.rules_path().to_path_buf();
305        let pipelines = old.pipelines().to_vec();
306        let pipeline_paths = old.pipeline_paths().to_vec();
307        let corr_config = old.corr_config().clone();
308        let include_event = old.include_event();
309        let resolver = old.source_resolver().cloned();
310        let allow_remote_include = old.allow_remote_include();
311        let bloom_prefilter = old.bloom_prefilter();
312        let bloom_max_bytes = old.bloom_max_bytes();
313        let match_detail = old.match_detail();
314        #[cfg(feature = "daachorse-index")]
315        let cross_rule_ac = old.cross_rule_ac();
316        drop(old);
317        drop(snapshot);
318
319        let mut new_engine = RuntimeEngine::new(rules_path, pipelines, corr_config, include_event);
320        new_engine.set_pipeline_paths(pipeline_paths);
321        new_engine.set_allow_remote_include(allow_remote_include);
322        new_engine.set_match_detail(match_detail);
323        new_engine.set_bloom_prefilter(bloom_prefilter);
324        if let Some(budget) = bloom_max_bytes {
325            new_engine.set_bloom_max_bytes(budget);
326        }
327        #[cfg(feature = "daachorse-index")]
328        new_engine.set_cross_rule_ac(cross_rule_ac);
329        if let Some(resolver) = resolver {
330            new_engine.set_source_resolver(resolver);
331        }
332        let stats = new_engine.load_rules()?;
333
334        if let Some(state) = old_state
335            && !new_engine.import_state(&state)
336        {
337            tracing::warn!(
338                "Incompatible correlation snapshot version during reload, starting fresh"
339            );
340        }
341
342        self.swap_engine(new_engine);
343        Ok(stats)
344    }
345
346    /// Return the rules path from the current engine.
347    pub fn rules_path(&self) -> std::path::PathBuf {
348        let snapshot = self.engine.load();
349        let engine = snapshot.lock();
350        engine.rules_path().to_path_buf()
351    }
352
353    /// Return a reference to the metrics hook.
354    pub fn metrics(&self) -> &dyn MetricsHook {
355        &*self.metrics
356    }
357
358    /// Export correlation state from the current engine.
359    pub fn export_state(&self) -> Option<rsigma_eval::CorrelationSnapshot> {
360        let snapshot = self.engine.load();
361        let engine = snapshot.lock();
362        engine.export_state()
363    }
364
365    /// Import correlation state into the current engine.
366    pub fn import_state(&self, snapshot: &rsigma_eval::CorrelationSnapshot) -> bool {
367        let guard = self.engine.load();
368        let mut engine = guard.lock();
369        engine.import_state(snapshot)
370    }
371
372    /// Return summary statistics about the current engine.
373    pub fn stats(&self) -> crate::engine::EngineStats {
374        let snapshot = self.engine.load();
375        let engine = snapshot.lock();
376        engine.stats()
377    }
378
379    /// Return an immutable snapshot of the current rule field set
380    /// (post-pipeline). The lock is held only long enough to clone the
381    /// `Arc`; the returned value remains valid across reloads.
382    pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
383        let snapshot = self.engine.load();
384        let engine = snapshot.lock();
385        engine.rule_field_set()
386    }
387}
388
389/// Produce a vec of empty `ProcessResult`, one per input line.
390fn empty_results(count: usize) -> Vec<ProcessResult> {
391    (0..count).map(|_| ProcessResult::new()).collect()
392}
393
394#[cfg(test)]
395mod tests {
396    use super::*;
397    use crate::metrics::NoopMetrics;
398    use rsigma_eval::CorrelationConfig;
399
400    fn identity_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
401        vec![v.clone()]
402    }
403
404    fn make_processor(rules_yaml: &str) -> LogProcessor {
405        let dir = tempfile::tempdir().unwrap();
406        let rule_path = dir.path().join("test.yml");
407        std::fs::write(&rule_path, rules_yaml).unwrap();
408
409        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
410        engine.load_rules().unwrap();
411        // Leak the tempdir so the path stays valid
412        std::mem::forget(dir);
413        LogProcessor::new(engine, Arc::new(NoopMetrics))
414    }
415
416    #[test]
417    fn process_batch_lines_valid_json() {
418        let proc = make_processor(
419            r#"
420title: Test Rule
421status: test
422logsource:
423    category: test
424detection:
425    selection:
426        EventID: 1
427    condition: selection
428"#,
429        );
430
431        let batch = vec![
432            r#"{"EventID": 1}"#.to_string(),
433            r#"{"EventID": 2}"#.to_string(),
434        ];
435        let results = proc.process_batch_lines(&batch, &identity_filter);
436        assert_eq!(results.len(), 2);
437        assert!(results[0].detection_count() > 0, "EventID=1 should match");
438        assert!(
439            results[1].detection_count() == 0,
440            "EventID=2 should not match"
441        );
442    }
443
444    #[test]
445    fn process_batch_lines_invalid_json() {
446        let proc = make_processor(
447            r#"
448title: Test Rule
449status: test
450logsource:
451    category: test
452detection:
453    selection:
454        EventID: 1
455    condition: selection
456"#,
457        );
458
459        let batch = vec!["not json".to_string(), r#"{"EventID": 1}"#.to_string()];
460        let results = proc.process_batch_lines(&batch, &identity_filter);
461        assert_eq!(results.len(), 2);
462        assert!(
463            results[0].detection_count() == 0,
464            "invalid JSON produces empty result"
465        );
466        assert!(results[1].detection_count() > 0, "valid line still matches");
467    }
468
469    #[test]
470    fn swap_engine_replaces_rules() {
471        let dir = tempfile::tempdir().unwrap();
472        let rule_path = dir.path().join("test.yml");
473        std::fs::write(
474            &rule_path,
475            r#"
476title: Rule A
477status: test
478logsource:
479    category: test
480detection:
481    selection:
482        EventID: 1
483    condition: selection
484"#,
485        )
486        .unwrap();
487
488        let mut engine = RuntimeEngine::new(
489            rule_path.clone(),
490            vec![],
491            CorrelationConfig::default(),
492            false,
493        );
494        engine.load_rules().unwrap();
495        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
496
497        let batch = vec![r#"{"EventID": 1}"#.to_string()];
498        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
499
500        // Swap to a rule that matches EventID: 99
501        std::fs::write(
502            &rule_path,
503            r#"
504title: Rule B
505status: test
506logsource:
507    category: test
508detection:
509    selection:
510        EventID: 99
511    condition: selection
512"#,
513        )
514        .unwrap();
515
516        let mut new_engine =
517            RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
518        new_engine.load_rules().unwrap();
519        proc.swap_engine(new_engine);
520
521        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);
522
523        let batch2 = vec![r#"{"EventID": 99}"#.to_string()];
524        assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);
525
526        std::mem::forget(dir);
527    }
528
529    #[test]
530    fn reload_rules_preserves_bloom_tuning() {
531        // Daemon startup typically calls `set_bloom_prefilter(true)` and
532        // friends on the initial RuntimeEngine. Previously, `reload_rules`
533        // rebuilt a fresh RuntimeEngine via `RuntimeEngine::new`, which
534        // resets those flags to defaults; the daemon then silently lost
535        // bloom pre-filtering on the first hot-reload. This test pins the
536        // fix by checking the underlying engine's setters after reload.
537        let dir = tempfile::tempdir().unwrap();
538        let rule_path = dir.path().join("test.yml");
539        std::fs::write(
540            &rule_path,
541            r#"
542title: Rule A
543status: test
544logsource:
545    category: test
546detection:
547    selection:
548        EventID: 1
549    condition: selection
550"#,
551        )
552        .unwrap();
553
554        let mut engine = RuntimeEngine::new(
555            rule_path.clone(),
556            vec![],
557            CorrelationConfig::default(),
558            false,
559        );
560        engine.set_bloom_prefilter(true);
561        engine.set_bloom_max_bytes(2 * 1024 * 1024);
562        #[cfg(feature = "daachorse-index")]
563        engine.set_cross_rule_ac(true);
564        engine.load_rules().unwrap();
565
566        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
567        proc.reload_rules().unwrap();
568
569        let snapshot = proc.engine_snapshot();
570        let reloaded = snapshot.lock();
571        assert!(
572            reloaded.bloom_prefilter(),
573            "bloom_prefilter must survive reload_rules"
574        );
575        assert_eq!(
576            reloaded.bloom_max_bytes(),
577            Some(2 * 1024 * 1024),
578            "bloom_max_bytes must survive reload_rules"
579        );
580        #[cfg(feature = "daachorse-index")]
581        assert!(
582            reloaded.cross_rule_ac(),
583            "cross_rule_ac must survive reload_rules"
584        );
585
586        std::mem::forget(dir);
587    }
588
589    #[test]
590    fn reload_rules_preserves_engine() {
591        let dir = tempfile::tempdir().unwrap();
592        let rule_path = dir.path().join("test.yml");
593        std::fs::write(
594            &rule_path,
595            r#"
596title: Rule A
597status: test
598logsource:
599    category: test
600detection:
601    selection:
602        EventID: 1
603    condition: selection
604"#,
605        )
606        .unwrap();
607
608        let mut engine = RuntimeEngine::new(
609            rule_path.clone(),
610            vec![],
611            CorrelationConfig::default(),
612            false,
613        );
614        engine.load_rules().unwrap();
615        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
616
617        let batch = vec![r#"{"EventID": 1}"#.to_string()];
618        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
619
620        // Update the rule file and reload
621        std::fs::write(
622            &rule_path,
623            r#"
624title: Rule B
625status: test
626logsource:
627    category: test
628detection:
629    selection:
630        EventID: 42
631    condition: selection
632"#,
633        )
634        .unwrap();
635
636        let stats = proc.reload_rules().unwrap();
637        assert_eq!(stats.detection_rules, 1);
638
639        // Old rule should no longer match
640        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);
641        // New rule should match
642        let batch2 = vec![r#"{"EventID": 42}"#.to_string()];
643        assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);
644
645        std::mem::forget(dir);
646    }
647
648    #[test]
649    fn reload_re_reads_pipelines_from_disk() {
650        let dir = tempfile::tempdir().unwrap();
651
652        // Rule uses the generic Sigma field name "SourceIP".
653        // The pipeline maps it to what the events actually contain.
654        let rule_path = dir.path().join("test.yml");
655        std::fs::write(
656            &rule_path,
657            r#"
658title: Rule A
659status: test
660logsource:
661    category: test
662detection:
663    selection:
664        SourceIP: "10.0.0.1"
665    condition: selection
666"#,
667        )
668        .unwrap();
669
670        // Pipeline maps the rule's SourceIP field to "src_ip" (event field)
671        let pipeline_path = dir.path().join("pipeline.yml");
672        std::fs::write(
673            &pipeline_path,
674            r#"
675name: Initial Pipeline
676priority: 10
677transformations:
678  - id: rename_field
679    type: field_name_mapping
680    mapping:
681      SourceIP: src_ip
682"#,
683        )
684        .unwrap();
685
686        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
687        let mut engine = RuntimeEngine::new(
688            rule_path.clone(),
689            pipelines,
690            CorrelationConfig::default(),
691            false,
692        );
693        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
694        engine.load_rules().unwrap();
695        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
696
697        // Event uses "src_ip" which the pipeline mapped from SourceIP
698        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
699        assert!(
700            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
701            "src_ip should match because pipeline mapped SourceIP -> src_ip"
702        );
703
704        // Update pipeline to map SourceIP to a different event field name
705        std::fs::write(
706            &pipeline_path,
707            r#"
708name: Updated Pipeline
709priority: 10
710transformations:
711  - id: rename_field
712    type: field_name_mapping
713    mapping:
714      SourceIP: source.ip
715"#,
716        )
717        .unwrap();
718
719        proc.reload_rules().unwrap();
720
721        // src_ip no longer the target, should not match
722        assert!(
723            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0,
724            "after pipeline reload, src_ip should no longer match"
725        );
726
727        // source.ip is now the mapped name, should match
728        let batch2 = vec![r#"{"source.ip": "10.0.0.1"}"#.to_string()];
729        assert!(
730            proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0,
731            "after pipeline reload, source.ip should match"
732        );
733
734        std::mem::forget(dir);
735    }
736
737    #[test]
738    fn reload_with_broken_pipeline_keeps_old_engine() {
739        let dir = tempfile::tempdir().unwrap();
740        let rule_path = dir.path().join("test.yml");
741        std::fs::write(
742            &rule_path,
743            r#"
744title: Rule A
745status: test
746logsource:
747    category: test
748detection:
749    selection:
750        SourceIP: "10.0.0.1"
751    condition: selection
752"#,
753        )
754        .unwrap();
755
756        let pipeline_path = dir.path().join("pipeline.yml");
757        std::fs::write(
758            &pipeline_path,
759            r#"
760name: Working Pipeline
761priority: 10
762transformations:
763  - id: rename_field
764    type: field_name_mapping
765    mapping:
766      SourceIP: src_ip
767"#,
768        )
769        .unwrap();
770
771        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
772        let mut engine = RuntimeEngine::new(
773            rule_path.clone(),
774            pipelines,
775            CorrelationConfig::default(),
776            false,
777        );
778        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
779        engine.load_rules().unwrap();
780        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
781
782        // Verify initial state works (SourceIP mapped to src_ip)
783        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
784        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
785
786        // Write broken YAML to the pipeline file
787        std::fs::write(&pipeline_path, "{{{{ invalid yaml !!!!").unwrap();
788
789        // Reload should fail
790        let result = proc.reload_rules();
791        assert!(result.is_err(), "reload with broken pipeline should fail");
792
793        // Old engine should still be active and working
794        assert!(
795            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
796            "old engine should still work after failed reload"
797        );
798
799        std::mem::forget(dir);
800    }
801
802    #[test]
803    fn custom_event_filter() {
804        let proc = make_processor(
805            r#"
806title: Test Rule
807status: test
808logsource:
809    category: test
810detection:
811    selection:
812        EventID: 1
813    condition: selection
814"#,
815        );
816
817        // Filter that extracts a nested "records" array
818        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
819            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
820                records.clone()
821            } else {
822                vec![v.clone()]
823            }
824        };
825
826        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
827        let results = proc.process_batch_lines(&batch, &filter);
828        assert_eq!(results.len(), 1);
829        assert_eq!(
830            results[0].detection_count(),
831            1,
832            "only EventID=1 from records array should match"
833        );
834    }
835
836    #[test]
837    fn empty_batch_returns_empty() {
838        let proc = make_processor(
839            r#"
840title: Test Rule
841status: test
842logsource:
843    category: test
844detection:
845    selection:
846        EventID: 1
847    condition: selection
848"#,
849        );
850
851        let batch: Vec<String> = vec![];
852        let results = proc.process_batch_lines(&batch, &identity_filter);
853        assert!(results.is_empty());
854    }
855
/// Verify MetricsHook is called correctly during processing.
///
/// A batch of three lines is processed: one that is not JSON, one matching
/// the rule (`EventID: 1`) and one that does not match. The counting hook is
/// then expected to have seen one parse error, two processed events and one
/// detection match.
#[test]
fn metrics_hook_invocations() {
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Metrics hook that only counts the three callbacks under test.
    #[derive(Default)]
    struct CountingMetrics {
        parse_errors: AtomicU64,
        events_processed: AtomicU64,
        detection_matches: AtomicU64,
    }

    impl MetricsHook for CountingMetrics {
        fn on_parse_error(&self) {
            self.parse_errors.fetch_add(1, Ordering::Relaxed);
        }
        fn on_events_processed(&self, count: u64) {
            self.events_processed.fetch_add(count, Ordering::Relaxed);
        }
        fn on_detection_matches(&self, count: u64) {
            self.detection_matches.fetch_add(count, Ordering::Relaxed);
        }
        // The remaining callbacks are irrelevant to this test.
        fn on_correlation_matches(&self, _: u64) {}
        fn observe_processing_latency(&self, _: f64) {}
        fn on_input_queue_depth_change(&self, _: i64) {}
        fn on_back_pressure(&self) {}
        fn observe_batch_size(&self, _: u64) {}
        fn on_output_queue_depth_change(&self, _: i64) {}
        fn observe_pipeline_latency(&self, _: f64) {}
        fn set_correlation_state_entries(&self, _: u64) {}
    }

    // Declared first so it is dropped last: the rule file outlives the engine
    // and processor, and the directory is removed when the test returns
    // instead of being leaked.
    let dir = tempfile::tempdir().unwrap();
    let rule_path = dir.path().join("test.yml");
    std::fs::write(
        &rule_path,
        r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
    )
    .unwrap();

    let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
    engine.load_rules().unwrap();

    let metrics = Arc::new(CountingMetrics::default());
    let proc = LogProcessor::new(engine, metrics.clone());

    let batch = vec![
        "not json".to_string(),
        r#"{"EventID": 1}"#.to_string(),
        r#"{"EventID": 2}"#.to_string(),
    ];
    proc.process_batch_lines(&batch, &identity_filter);

    assert_eq!(metrics.parse_errors.load(Ordering::Relaxed), 1);
    assert_eq!(metrics.events_processed.load(Ordering::Relaxed), 2);
    assert_eq!(metrics.detection_matches.load(Ordering::Relaxed), 1);
}
927
/// Verify concurrent processing and swap don't panic (basic thread safety).
///
/// Four threads each push the same one-line batch through a shared processor
/// 100 times. Thread 0 additionally loads a fresh engine from the same rule
/// file and swaps it in once its own loop has finished. The test passes when
/// every thread joins without panicking; no detection results are checked.
#[test]
fn concurrent_swap_and_process() {
    // Declared first so it is dropped last: the rule file stays on disk for
    // the reload done by thread 0, and the directory is removed when the test
    // returns instead of being leaked.
    let dir = tempfile::tempdir().unwrap();
    let rule_path = dir.path().join("test.yml");
    std::fs::write(
        &rule_path,
        r#"
title: Rule A
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
    )
    .unwrap();

    let mut engine = RuntimeEngine::new(
        rule_path.clone(),
        vec![],
        CorrelationConfig::default(),
        false,
    );
    engine.load_rules().unwrap();
    let proc = Arc::new(LogProcessor::new(engine, Arc::new(NoopMetrics)));

    let handles: Vec<_> = (0..4)
        .map(|i| {
            let proc = proc.clone();
            let rule_path = rule_path.clone();
            std::thread::spawn(move || {
                let batch = vec![r#"{"EventID": 1}"#.to_string()];
                for _ in 0..100 {
                    let _ = proc.process_batch_lines(&batch, &identity_filter);
                }
                // Thread 0 swaps the engine after its own batches are done;
                // the other threads may still be processing at that point.
                if i == 0 {
                    let mut new_engine = RuntimeEngine::new(
                        rule_path,
                        vec![],
                        CorrelationConfig::default(),
                        false,
                    );
                    new_engine.load_rules().unwrap();
                    proc.swap_engine(new_engine);
                }
            })
        })
        .collect();

    // A panic inside any worker surfaces here as a failed `join`.
    for h in handles {
        h.join().unwrap();
    }
}
987
988    // --- Tests for process_batch_with_format ---
989
/// With the JSON input format selected explicitly, a JSON line whose
/// `EventID` is 1 produces at least one detection.
#[test]
fn format_json_matches() {
    let rule_yaml = r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#;
    let proc = make_processor(rule_yaml);

    let lines = vec![r#"{"EventID": 1}"#.to_string()];
    let format = InputFormat::Json;
    let results = proc.process_batch_with_format(&lines, &format, None);

    assert_eq!(results.len(), 1);
    let detections = results[0].detection_count();
    assert!(detections > 0, "JSON EventID=1 should match");
}
1013
/// With the syslog input format (default config), a rule selecting on
/// `hostname: mymachine` is expected to match the sample syslog line.
#[test]
fn format_syslog_extracts_fields() {
    let rule_yaml = r#"
title: Syslog Test
status: test
logsource:
    category: test
detection:
    selection:
        hostname: mymachine
    condition: selection
"#;
    let proc = make_processor(rule_yaml);

    let lines = vec!["<34>Oct 11 22:14:15 mymachine su: test message".to_string()];
    let format = InputFormat::Syslog(crate::input::SyslogConfig::default());
    let results = proc.process_batch_with_format(&lines, &format, None);

    assert_eq!(results.len(), 1);
    let detections = results[0].detection_count();
    assert!(detections > 0, "syslog hostname=mymachine should match");
}
1041
/// With the plain input format, a keyword rule for "disk full" is expected
/// to match a raw text line containing that phrase.
#[test]
fn format_plain_keyword_match() {
    let rule_yaml = r#"
title: Keyword Test
status: test
logsource:
    category: test
detection:
    keywords:
        - "disk full"
    condition: keywords
"#;
    let proc = make_processor(rule_yaml);

    let lines = vec!["ERROR: disk full on /dev/sda1".to_string()];
    let format = InputFormat::Plain;
    let results = proc.process_batch_with_format(&lines, &format, None);

    assert_eq!(results.len(), 1);
    let detections = results[0].detection_count();
    assert!(detections > 0, "plain keyword 'disk full' should match");
}
1065
/// With `InputFormat::default()` (presumably auto-detection, going by the
/// test name; confirm against `InputFormat`), a JSON line whose `EventID`
/// is 1 produces at least one detection.
#[test]
fn format_auto_detects_json() {
    let rule_yaml = r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#;
    let proc = make_processor(rule_yaml);

    let lines = vec![r#"{"EventID": 1}"#.to_string()];
    let format = InputFormat::default();
    let results = proc.process_batch_with_format(&lines, &format, None);

    assert_eq!(results.len(), 1);
    let detections = results[0].detection_count();
    assert!(detections > 0);
}
1086
/// An event filter passed to `process_batch_with_format` is applied to JSON
/// input.
///
/// The filter turns a top-level `records` array into one payload per
/// element. The test expects one result for the one input line, carrying
/// exactly one detection (the `EventID: 1` record).
#[test]
fn format_json_with_event_filter() {
    let rule_yaml = r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#;
    let proc = make_processor(rule_yaml);

    // Explode a nested "records" array; any other value passes through as-is.
    let explode_records = |value: &serde_json::Value| -> Vec<serde_json::Value> {
        match value.get("records").and_then(|records| records.as_array()) {
            Some(records) => records.clone(),
            None => vec![value.clone()],
        }
    };

    let lines = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
    let format = InputFormat::Json;
    let results = proc.process_batch_with_format(&lines, &format, Some(&explode_records));

    assert_eq!(results.len(), 1);
    assert_eq!(
        results[0].detection_count(),
        1,
        "only EventID=1 from records array should match"
    );
}
1119
/// Empty and whitespace-only lines produce no detections.
///
/// The test expects one result per input line (three in total): the empty
/// and whitespace-only lines carry zero detections, while the JSON line with
/// `EventID: 1` carries at least one.
#[test]
fn format_empty_lines_skipped() {
    let proc = make_processor(
        r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
    );

    let batch = vec![
        "".to_string(),
        "   ".to_string(),
        r#"{"EventID": 1}"#.to_string(),
    ];
    let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
    assert_eq!(results.len(), 3);
    // `assert_eq!` reports the actual count on failure, unlike `assert!(a == b)`.
    assert_eq!(
        results[0].detection_count(),
        0,
        "empty line should produce no detections"
    );
    assert_eq!(
        results[1].detection_count(),
        0,
        "whitespace-only line should produce no detections"
    );
    assert!(
        results[2].detection_count() > 0,
        "JSON EventID=1 should match"
    );
}
1146
/// With the logfmt input format, a rule selecting on `level: error` is
/// expected to match a `key=value` line carrying `level=error`.
///
/// Only compiled when the `logfmt` feature is enabled.
#[cfg(feature = "logfmt")]
#[test]
fn format_logfmt_matches() {
    let rule_yaml = r#"
title: Logfmt Test
status: test
logsource:
    category: test
detection:
    selection:
        level: error
    condition: selection
"#;
    let proc = make_processor(rule_yaml);

    let lines = vec!["level=error msg=something host=web01".to_string()];
    let format = InputFormat::Logfmt;
    let results = proc.process_batch_with_format(&lines, &format, None);

    assert_eq!(results.len(), 1);
    let detections = results[0].detection_count();
    assert!(detections > 0, "logfmt level=error should match");
}
1171
/// With the CEF input format, a rule selecting on `deviceVendor: Security`
/// is expected to match the sample CEF line.
///
/// Only compiled when the `cef` feature is enabled.
#[cfg(feature = "cef")]
#[test]
fn format_cef_matches() {
    let rule_yaml = r#"
title: CEF Test
status: test
logsource:
    category: test
detection:
    selection:
        deviceVendor: Security
    condition: selection
"#;
    let proc = make_processor(rule_yaml);

    let lines = vec!["CEF:0|Security|IDS|1.0|100|Attack|9|src=10.0.0.1".to_string()];
    let format = InputFormat::Cef;
    let results = proc.process_batch_with_format(&lines, &format, None);

    assert_eq!(results.len(), 1);
    let detections = results[0].detection_count();
    assert!(detections > 0, "CEF deviceVendor=Security should match");
}
1196}