Skip to main content

rsigma_runtime/
processor.rs

1use std::sync::Arc;
2
3use parking_lot::Mutex;
4use std::time::Instant;
5
6use arc_swap::ArcSwap;
7use rsigma_eval::{Event, FieldObserver, JsonEvent, ProcessResult, ProcessResultExt, RuleFieldSet};
8
9use crate::engine::RuntimeEngine;
10use crate::input::{EventInputDecoded, InputFormat, parse_line};
11use crate::metrics::MetricsHook;
12
/// Closure that extracts multiple payloads from a single JSON value.
///
/// Used by the daemon's event filter (e.g. jq/jsonpath) to explode a JSON
/// object into sub-events (e.g. `.records[]`). Only applies to JSON input.
///
/// This is an unsized trait-object type, so the processing methods take it
/// by reference (`&EventFilter`). Every returned value is evaluated as its
/// own event; returning an empty vector means the input line contributes
/// no events.
pub type EventFilter = dyn Fn(&serde_json::Value) -> Vec<serde_json::Value>;
18
/// Thread-safe handle to the engine, swappable atomically for hot-reload.
///
/// Uses `ArcSwap<Mutex<RuntimeEngine>>` so that:
/// - Detection + correlation processing can acquire `&mut RuntimeEngine` via
///   the inner `Mutex`.
/// - Hot-reload swaps the entire engine atomically without blocking in-flight
///   batches (they hold an `Arc` to the old engine until their batch completes).
pub struct LogProcessor {
    /// The current engine. Readers load a snapshot and lock the inner
    /// `Mutex`; `swap_engine` stores a freshly built replacement.
    engine: Arc<ArcSwap<Mutex<RuntimeEngine>>>,
    /// Hook that receives parse-error, event-count, latency, match and
    /// correlation-state metrics emitted while processing batches.
    metrics: Arc<dyn MetricsHook>,
    /// Optional opt-in field observer. When `Some`, every parsed event
    /// flowing through `process_batch_with_format` has its field keys
    /// recorded. When `None`, the batch path skips iteration entirely
    /// so the hot path stays untouched.
    field_observer: ArcSwap<Option<Arc<FieldObserver>>>,
}
35
36impl LogProcessor {
37    /// Create a new processor wrapping the given engine and metrics hook.
38    pub fn new(engine: RuntimeEngine, metrics: Arc<dyn MetricsHook>) -> Self {
39        LogProcessor {
40            engine: Arc::new(ArcSwap::from_pointee(Mutex::new(engine))),
41            metrics,
42            field_observer: ArcSwap::new(Arc::new(None)),
43        }
44    }
45
46    /// Attach (or detach) the opt-in field observer.
47    ///
48    /// When set, [`process_batch_with_format`](Self::process_batch_with_format)
49    /// records each parsed event's field keys before evaluation. Pass
50    /// `None` to disable observation; the hot path then performs zero
51    /// extra work. Safe to call at runtime: the swap is wait-free, and
52    /// in-flight batches finish against whichever observer they read.
53    pub fn set_field_observer(&self, observer: Option<Arc<FieldObserver>>) {
54        self.field_observer.store(Arc::new(observer));
55    }
56
57    /// Return the currently-attached field observer, if any.
58    pub fn field_observer(&self) -> Option<Arc<FieldObserver>> {
59        self.field_observer.load_full().as_ref().clone()
60    }
61
62    /// Atomically replace the engine with a new one.
63    ///
64    /// In-flight batches continue against the old engine (they hold an `Arc`
65    /// snapshot). New batches see the replacement on their next call to
66    /// `process_batch_lines`.
67    pub fn swap_engine(&self, new_engine: RuntimeEngine) {
68        self.engine.store(Arc::new(Mutex::new(new_engine)));
69    }
70
71    /// Load a snapshot of the current engine for use during reload.
72    ///
73    /// The caller can lock the returned guard to export state, build a new
74    /// engine, import state, and then call `swap_engine`.
75    pub fn engine_snapshot(&self) -> arc_swap::Guard<Arc<Mutex<RuntimeEngine>>> {
76        self.engine.load()
77    }
78
79    /// Process a batch of raw input lines through the engine.
80    ///
81    /// 1. Parses each line as JSON; on error, increments parse error metrics.
82    /// 2. Applies the `event_filter` closure to extract payloads.
83    /// 3. Evaluates all payloads via `RuntimeEngine::process_batch`.
84    /// 4. Merges per-payload results back into per-line results.
85    /// 5. Updates metrics (events processed, latency, match counts).
86    ///
87    /// Returns one `ProcessResult` per input line.
88    pub fn process_batch_lines(
89        &self,
90        batch: &[String],
91        event_filter: &EventFilter,
92    ) -> Vec<ProcessResult> {
93        let engine_guard = self.engine.load();
94        let mut engine = engine_guard.lock();
95
96        // Phase 1: Parse JSON and apply event filters, tracking line origin.
97        let mut parsed: Vec<(usize, Vec<serde_json::Value>)> = Vec::with_capacity(batch.len());
98        for (line_idx, line) in batch.iter().enumerate() {
99            match serde_json::from_str::<serde_json::Value>(line) {
100                Ok(value) => {
101                    let payloads = event_filter(&value);
102                    if !payloads.is_empty() {
103                        parsed.push((line_idx, payloads));
104                    }
105                }
106                Err(e) => {
107                    self.metrics.on_parse_error();
108                    tracing::debug!(error = %e, "Invalid JSON on input");
109                }
110            }
111        }
112
113        // Flatten: (line_idx, &Value) for each payload across all lines
114        let mut flat: Vec<(usize, &serde_json::Value)> = Vec::new();
115        for (line_idx, payloads) in &parsed {
116            for payload in payloads {
117                flat.push((*line_idx, payload));
118            }
119        }
120
121        if flat.is_empty() {
122            return empty_results(batch.len());
123        }
124
125        // Phase 2: Batch evaluation — parallel detection + sequential correlation
126        let events: Vec<JsonEvent> = flat.iter().map(|(_, v)| JsonEvent::borrow(v)).collect();
127        let event_refs: Vec<&JsonEvent> = events.iter().collect();
128
129        let start = Instant::now();
130        let batch_results = engine.process_batch(&event_refs);
131        let elapsed = start.elapsed().as_secs_f64();
132        let per_event_latency = elapsed / event_refs.len() as f64;
133
134        // Update correlation state metrics while we still hold the lock
135        let stats = engine.stats();
136        self.metrics
137            .set_correlation_state_entries(stats.state_entries as u64);
138
139        // Phase 3: Merge results per input line and update metrics
140        let mut line_results = empty_results(batch.len());
141
142        for ((line_idx, _), result) in flat.iter().zip(batch_results) {
143            self.metrics.on_events_processed(1);
144            self.metrics.observe_processing_latency(per_event_latency);
145            self.metrics
146                .on_detection_matches(result.detection_count() as u64);
147            self.metrics
148                .on_correlation_matches(result.correlation_count() as u64);
149
150            for r in &result {
151                let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
152                let title = &r.header.rule_title;
153                match &r.body {
154                    rsigma_eval::ResultBody::Detection(_) => {
155                        self.metrics.on_detection_match_detail(title, level_str);
156                    }
157                    rsigma_eval::ResultBody::Correlation(body) => {
158                        self.metrics.on_correlation_match_detail(
159                            title,
160                            level_str,
161                            body.correlation_type.as_str(),
162                        );
163                    }
164                }
165            }
166
167            line_results[*line_idx].extend(result);
168        }
169
170        line_results
171    }
172
173    /// Process a batch of raw input lines using the specified input format.
174    ///
175    /// Unlike [`process_batch_lines`](Self::process_batch_lines), this method
176    /// supports all input formats (JSON, syslog, plain, logfmt, CEF). The
177    /// `event_filter` only applies to JSON-decoded events (it extracts multiple
178    /// payloads from one JSON object, e.g. a `records[]` array). Non-JSON
179    /// formats produce exactly one event per line.
180    ///
181    /// Returns one `ProcessResult` per input line.
182    pub fn process_batch_with_format(
183        &self,
184        batch: &[String],
185        format: &InputFormat,
186        event_filter: Option<&EventFilter>,
187    ) -> Vec<ProcessResult> {
188        let engine_guard = self.engine.load();
189        let mut engine = engine_guard.lock();
190
191        // Phase 1: Parse each line into decoded events, tracking line origin.
192        // For JSON with an event_filter, one line can produce multiple events.
193        let mut decoded_events: Vec<(usize, EventInputDecoded)> = Vec::with_capacity(batch.len());
194
195        for (line_idx, line) in batch.iter().enumerate() {
196            let Some(decoded) = parse_line(line, format) else {
197                if !line.trim().is_empty() {
198                    self.metrics.on_parse_error();
199                    tracing::debug!("Failed to parse input line");
200                }
201                continue;
202            };
203
204            // For JSON events with an event filter, apply the filter which
205            // may produce multiple payloads (e.g. `.records[]`).
206            if let Some(filter) = event_filter
207                && let EventInputDecoded::Json(ref json_event) = decoded
208            {
209                let json_value = json_event.to_json();
210                let payloads = filter(&json_value);
211                for payload in payloads {
212                    decoded_events
213                        .push((line_idx, EventInputDecoded::Json(JsonEvent::owned(payload))));
214                }
215                continue;
216            }
217
218            decoded_events.push((line_idx, decoded));
219        }
220
221        if decoded_events.is_empty() {
222            return empty_results(batch.len());
223        }
224
225        // Optional opt-in field observation. Cheap when disabled: one
226        // hazard-pointer `Guard` (no Arc clone) plus an `Option` check.
227        // When enabled, walks each decoded event's field keys before
228        // evaluation. The Guard's lifetime extends through the loop so
229        // the observer cannot be dropped mid-batch even if the daemon
230        // detaches it concurrently.
231        let observer_guard = self.field_observer.load();
232        if let Some(observer) = observer_guard.as_ref() {
233            for (_, decoded) in &decoded_events {
234                observer.observe(decoded);
235            }
236        }
237        drop(observer_guard);
238
239        // Phase 2: Batch evaluation — parallel detection + sequential correlation
240        let event_refs: Vec<&EventInputDecoded> = decoded_events.iter().map(|(_, e)| e).collect();
241
242        let start = Instant::now();
243        let batch_results = engine.process_batch(&event_refs);
244        let elapsed = start.elapsed().as_secs_f64();
245        let per_event_latency = elapsed / event_refs.len() as f64;
246
247        let stats = engine.stats();
248        self.metrics
249            .set_correlation_state_entries(stats.state_entries as u64);
250
251        // Phase 3: Merge results per input line and update metrics
252        let mut line_results = empty_results(batch.len());
253
254        for ((line_idx, _), result) in decoded_events.iter().zip(batch_results) {
255            self.metrics.on_events_processed(1);
256            self.metrics.observe_processing_latency(per_event_latency);
257            self.metrics
258                .on_detection_matches(result.detection_count() as u64);
259            self.metrics
260                .on_correlation_matches(result.correlation_count() as u64);
261
262            for r in &result {
263                let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
264                let title = &r.header.rule_title;
265                match &r.body {
266                    rsigma_eval::ResultBody::Detection(_) => {
267                        self.metrics.on_detection_match_detail(title, level_str);
268                    }
269                    rsigma_eval::ResultBody::Correlation(body) => {
270                        self.metrics.on_correlation_match_detail(
271                            title,
272                            level_str,
273                            body.correlation_type.as_str(),
274                        );
275                    }
276                }
277            }
278
279            line_results[*line_idx].extend(result);
280        }
281
282        line_results
283    }
284
285    /// Reload rules (and pipelines) without blocking in-flight event processing.
286    ///
287    /// Builds a fresh `RuntimeEngine` with the same configuration as the
288    /// current one, re-reads pipeline files from disk (if paths are set),
289    /// loads rules into it, imports the old engine's correlation state, and
290    /// atomically swaps. In-flight batches that already hold an `Arc` to
291    /// the old engine finish undisturbed.
292    ///
293    /// If pipeline or rule loading fails, the old engine remains active.
294    pub fn reload_rules(&self) -> Result<crate::engine::EngineStats, String> {
295        // Snapshot the old engine's configuration AND tuning so the
296        // replacement reaches `load_rules()` with the same flags. Daemon
297        // startup typically sets `set_bloom_prefilter`/`set_bloom_max_bytes`
298        // (and `set_cross_rule_ac` behind `daachorse-index`) before the
299        // first load; carrying those across the swap keeps hot-reload from
300        // silently undoing them.
301        let snapshot = self.engine.load();
302        let old = snapshot.lock();
303        let old_state = old.export_state();
304        let rules_path = old.rules_path().to_path_buf();
305        let pipelines = old.pipelines().to_vec();
306        let pipeline_paths = old.pipeline_paths().to_vec();
307        let corr_config = old.corr_config().clone();
308        let include_event = old.include_event();
309        let resolver = old.source_resolver().cloned();
310        let allow_remote_include = old.allow_remote_include();
311        let bloom_prefilter = old.bloom_prefilter();
312        let bloom_max_bytes = old.bloom_max_bytes();
313        #[cfg(feature = "daachorse-index")]
314        let cross_rule_ac = old.cross_rule_ac();
315        drop(old);
316        drop(snapshot);
317
318        let mut new_engine = RuntimeEngine::new(rules_path, pipelines, corr_config, include_event);
319        new_engine.set_pipeline_paths(pipeline_paths);
320        new_engine.set_allow_remote_include(allow_remote_include);
321        new_engine.set_bloom_prefilter(bloom_prefilter);
322        if let Some(budget) = bloom_max_bytes {
323            new_engine.set_bloom_max_bytes(budget);
324        }
325        #[cfg(feature = "daachorse-index")]
326        new_engine.set_cross_rule_ac(cross_rule_ac);
327        if let Some(resolver) = resolver {
328            new_engine.set_source_resolver(resolver);
329        }
330        let stats = new_engine.load_rules()?;
331
332        if let Some(state) = old_state
333            && !new_engine.import_state(&state)
334        {
335            tracing::warn!(
336                "Incompatible correlation snapshot version during reload, starting fresh"
337            );
338        }
339
340        self.swap_engine(new_engine);
341        Ok(stats)
342    }
343
344    /// Return the rules path from the current engine.
345    pub fn rules_path(&self) -> std::path::PathBuf {
346        let snapshot = self.engine.load();
347        let engine = snapshot.lock();
348        engine.rules_path().to_path_buf()
349    }
350
351    /// Return a reference to the metrics hook.
352    pub fn metrics(&self) -> &dyn MetricsHook {
353        &*self.metrics
354    }
355
356    /// Export correlation state from the current engine.
357    pub fn export_state(&self) -> Option<rsigma_eval::CorrelationSnapshot> {
358        let snapshot = self.engine.load();
359        let engine = snapshot.lock();
360        engine.export_state()
361    }
362
363    /// Import correlation state into the current engine.
364    pub fn import_state(&self, snapshot: &rsigma_eval::CorrelationSnapshot) -> bool {
365        let guard = self.engine.load();
366        let mut engine = guard.lock();
367        engine.import_state(snapshot)
368    }
369
370    /// Return summary statistics about the current engine.
371    pub fn stats(&self) -> crate::engine::EngineStats {
372        let snapshot = self.engine.load();
373        let engine = snapshot.lock();
374        engine.stats()
375    }
376
377    /// Return an immutable snapshot of the current rule field set
378    /// (post-pipeline). The lock is held only long enough to clone the
379    /// `Arc`; the returned value remains valid across reloads.
380    pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
381        let snapshot = self.engine.load();
382        let engine = snapshot.lock();
383        engine.rule_field_set()
384    }
385}
386
387/// Produce a vec of empty `ProcessResult`, one per input line.
388fn empty_results(count: usize) -> Vec<ProcessResult> {
389    (0..count).map(|_| ProcessResult::new()).collect()
390}
391
392#[cfg(test)]
393mod tests {
394    use super::*;
395    use crate::metrics::NoopMetrics;
396    use rsigma_eval::CorrelationConfig;
397
398    fn identity_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
399        vec![v.clone()]
400    }
401
402    fn make_processor(rules_yaml: &str) -> LogProcessor {
403        let dir = tempfile::tempdir().unwrap();
404        let rule_path = dir.path().join("test.yml");
405        std::fs::write(&rule_path, rules_yaml).unwrap();
406
407        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
408        engine.load_rules().unwrap();
409        // Leak the tempdir so the path stays valid
410        std::mem::forget(dir);
411        LogProcessor::new(engine, Arc::new(NoopMetrics))
412    }
413
414    #[test]
415    fn process_batch_lines_valid_json() {
416        let proc = make_processor(
417            r#"
418title: Test Rule
419status: test
420logsource:
421    category: test
422detection:
423    selection:
424        EventID: 1
425    condition: selection
426"#,
427        );
428
429        let batch = vec![
430            r#"{"EventID": 1}"#.to_string(),
431            r#"{"EventID": 2}"#.to_string(),
432        ];
433        let results = proc.process_batch_lines(&batch, &identity_filter);
434        assert_eq!(results.len(), 2);
435        assert!(results[0].detection_count() > 0, "EventID=1 should match");
436        assert!(
437            results[1].detection_count() == 0,
438            "EventID=2 should not match"
439        );
440    }
441
442    #[test]
443    fn process_batch_lines_invalid_json() {
444        let proc = make_processor(
445            r#"
446title: Test Rule
447status: test
448logsource:
449    category: test
450detection:
451    selection:
452        EventID: 1
453    condition: selection
454"#,
455        );
456
457        let batch = vec!["not json".to_string(), r#"{"EventID": 1}"#.to_string()];
458        let results = proc.process_batch_lines(&batch, &identity_filter);
459        assert_eq!(results.len(), 2);
460        assert!(
461            results[0].detection_count() == 0,
462            "invalid JSON produces empty result"
463        );
464        assert!(results[1].detection_count() > 0, "valid line still matches");
465    }
466
467    #[test]
468    fn swap_engine_replaces_rules() {
469        let dir = tempfile::tempdir().unwrap();
470        let rule_path = dir.path().join("test.yml");
471        std::fs::write(
472            &rule_path,
473            r#"
474title: Rule A
475status: test
476logsource:
477    category: test
478detection:
479    selection:
480        EventID: 1
481    condition: selection
482"#,
483        )
484        .unwrap();
485
486        let mut engine = RuntimeEngine::new(
487            rule_path.clone(),
488            vec![],
489            CorrelationConfig::default(),
490            false,
491        );
492        engine.load_rules().unwrap();
493        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
494
495        let batch = vec![r#"{"EventID": 1}"#.to_string()];
496        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
497
498        // Swap to a rule that matches EventID: 99
499        std::fs::write(
500            &rule_path,
501            r#"
502title: Rule B
503status: test
504logsource:
505    category: test
506detection:
507    selection:
508        EventID: 99
509    condition: selection
510"#,
511        )
512        .unwrap();
513
514        let mut new_engine =
515            RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
516        new_engine.load_rules().unwrap();
517        proc.swap_engine(new_engine);
518
519        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);
520
521        let batch2 = vec![r#"{"EventID": 99}"#.to_string()];
522        assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);
523
524        std::mem::forget(dir);
525    }
526
527    #[test]
528    fn reload_rules_preserves_bloom_tuning() {
529        // Daemon startup typically calls `set_bloom_prefilter(true)` and
530        // friends on the initial RuntimeEngine. Previously, `reload_rules`
531        // rebuilt a fresh RuntimeEngine via `RuntimeEngine::new`, which
532        // resets those flags to defaults; the daemon then silently lost
533        // bloom pre-filtering on the first hot-reload. This test pins the
534        // fix by checking the underlying engine's setters after reload.
535        let dir = tempfile::tempdir().unwrap();
536        let rule_path = dir.path().join("test.yml");
537        std::fs::write(
538            &rule_path,
539            r#"
540title: Rule A
541status: test
542logsource:
543    category: test
544detection:
545    selection:
546        EventID: 1
547    condition: selection
548"#,
549        )
550        .unwrap();
551
552        let mut engine = RuntimeEngine::new(
553            rule_path.clone(),
554            vec![],
555            CorrelationConfig::default(),
556            false,
557        );
558        engine.set_bloom_prefilter(true);
559        engine.set_bloom_max_bytes(2 * 1024 * 1024);
560        #[cfg(feature = "daachorse-index")]
561        engine.set_cross_rule_ac(true);
562        engine.load_rules().unwrap();
563
564        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
565        proc.reload_rules().unwrap();
566
567        let snapshot = proc.engine_snapshot();
568        let reloaded = snapshot.lock();
569        assert!(
570            reloaded.bloom_prefilter(),
571            "bloom_prefilter must survive reload_rules"
572        );
573        assert_eq!(
574            reloaded.bloom_max_bytes(),
575            Some(2 * 1024 * 1024),
576            "bloom_max_bytes must survive reload_rules"
577        );
578        #[cfg(feature = "daachorse-index")]
579        assert!(
580            reloaded.cross_rule_ac(),
581            "cross_rule_ac must survive reload_rules"
582        );
583
584        std::mem::forget(dir);
585    }
586
587    #[test]
588    fn reload_rules_preserves_engine() {
589        let dir = tempfile::tempdir().unwrap();
590        let rule_path = dir.path().join("test.yml");
591        std::fs::write(
592            &rule_path,
593            r#"
594title: Rule A
595status: test
596logsource:
597    category: test
598detection:
599    selection:
600        EventID: 1
601    condition: selection
602"#,
603        )
604        .unwrap();
605
606        let mut engine = RuntimeEngine::new(
607            rule_path.clone(),
608            vec![],
609            CorrelationConfig::default(),
610            false,
611        );
612        engine.load_rules().unwrap();
613        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
614
615        let batch = vec![r#"{"EventID": 1}"#.to_string()];
616        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
617
618        // Update the rule file and reload
619        std::fs::write(
620            &rule_path,
621            r#"
622title: Rule B
623status: test
624logsource:
625    category: test
626detection:
627    selection:
628        EventID: 42
629    condition: selection
630"#,
631        )
632        .unwrap();
633
634        let stats = proc.reload_rules().unwrap();
635        assert_eq!(stats.detection_rules, 1);
636
637        // Old rule should no longer match
638        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);
639        // New rule should match
640        let batch2 = vec![r#"{"EventID": 42}"#.to_string()];
641        assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);
642
643        std::mem::forget(dir);
644    }
645
646    #[test]
647    fn reload_re_reads_pipelines_from_disk() {
648        let dir = tempfile::tempdir().unwrap();
649
650        // Rule uses the generic Sigma field name "SourceIP".
651        // The pipeline maps it to what the events actually contain.
652        let rule_path = dir.path().join("test.yml");
653        std::fs::write(
654            &rule_path,
655            r#"
656title: Rule A
657status: test
658logsource:
659    category: test
660detection:
661    selection:
662        SourceIP: "10.0.0.1"
663    condition: selection
664"#,
665        )
666        .unwrap();
667
668        // Pipeline maps the rule's SourceIP field to "src_ip" (event field)
669        let pipeline_path = dir.path().join("pipeline.yml");
670        std::fs::write(
671            &pipeline_path,
672            r#"
673name: Initial Pipeline
674priority: 10
675transformations:
676  - id: rename_field
677    type: field_name_mapping
678    mapping:
679      SourceIP: src_ip
680"#,
681        )
682        .unwrap();
683
684        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
685        let mut engine = RuntimeEngine::new(
686            rule_path.clone(),
687            pipelines,
688            CorrelationConfig::default(),
689            false,
690        );
691        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
692        engine.load_rules().unwrap();
693        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
694
695        // Event uses "src_ip" which the pipeline mapped from SourceIP
696        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
697        assert!(
698            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
699            "src_ip should match because pipeline mapped SourceIP -> src_ip"
700        );
701
702        // Update pipeline to map SourceIP to a different event field name
703        std::fs::write(
704            &pipeline_path,
705            r#"
706name: Updated Pipeline
707priority: 10
708transformations:
709  - id: rename_field
710    type: field_name_mapping
711    mapping:
712      SourceIP: source.ip
713"#,
714        )
715        .unwrap();
716
717        proc.reload_rules().unwrap();
718
719        // src_ip no longer the target, should not match
720        assert!(
721            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0,
722            "after pipeline reload, src_ip should no longer match"
723        );
724
725        // source.ip is now the mapped name, should match
726        let batch2 = vec![r#"{"source.ip": "10.0.0.1"}"#.to_string()];
727        assert!(
728            proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0,
729            "after pipeline reload, source.ip should match"
730        );
731
732        std::mem::forget(dir);
733    }
734
735    #[test]
736    fn reload_with_broken_pipeline_keeps_old_engine() {
737        let dir = tempfile::tempdir().unwrap();
738        let rule_path = dir.path().join("test.yml");
739        std::fs::write(
740            &rule_path,
741            r#"
742title: Rule A
743status: test
744logsource:
745    category: test
746detection:
747    selection:
748        SourceIP: "10.0.0.1"
749    condition: selection
750"#,
751        )
752        .unwrap();
753
754        let pipeline_path = dir.path().join("pipeline.yml");
755        std::fs::write(
756            &pipeline_path,
757            r#"
758name: Working Pipeline
759priority: 10
760transformations:
761  - id: rename_field
762    type: field_name_mapping
763    mapping:
764      SourceIP: src_ip
765"#,
766        )
767        .unwrap();
768
769        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
770        let mut engine = RuntimeEngine::new(
771            rule_path.clone(),
772            pipelines,
773            CorrelationConfig::default(),
774            false,
775        );
776        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
777        engine.load_rules().unwrap();
778        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
779
780        // Verify initial state works (SourceIP mapped to src_ip)
781        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
782        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
783
784        // Write broken YAML to the pipeline file
785        std::fs::write(&pipeline_path, "{{{{ invalid yaml !!!!").unwrap();
786
787        // Reload should fail
788        let result = proc.reload_rules();
789        assert!(result.is_err(), "reload with broken pipeline should fail");
790
791        // Old engine should still be active and working
792        assert!(
793            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
794            "old engine should still work after failed reload"
795        );
796
797        std::mem::forget(dir);
798    }
799
800    #[test]
801    fn custom_event_filter() {
802        let proc = make_processor(
803            r#"
804title: Test Rule
805status: test
806logsource:
807    category: test
808detection:
809    selection:
810        EventID: 1
811    condition: selection
812"#,
813        );
814
815        // Filter that extracts a nested "records" array
816        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
817            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
818                records.clone()
819            } else {
820                vec![v.clone()]
821            }
822        };
823
824        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
825        let results = proc.process_batch_lines(&batch, &filter);
826        assert_eq!(results.len(), 1);
827        assert_eq!(
828            results[0].detection_count(),
829            1,
830            "only EventID=1 from records array should match"
831        );
832    }
833
834    #[test]
835    fn empty_batch_returns_empty() {
836        let proc = make_processor(
837            r#"
838title: Test Rule
839status: test
840logsource:
841    category: test
842detection:
843    selection:
844        EventID: 1
845    condition: selection
846"#,
847        );
848
849        let batch: Vec<String> = vec![];
850        let results = proc.process_batch_lines(&batch, &identity_filter);
851        assert!(results.is_empty());
852    }
853
854    /// Verify MetricsHook is called correctly during processing.
855    #[test]
856    fn metrics_hook_invocations() {
857        use std::sync::atomic::{AtomicU64, Ordering};
858
859        struct CountingMetrics {
860            parse_errors: AtomicU64,
861            events_processed: AtomicU64,
862            detection_matches: AtomicU64,
863        }
864
865        impl MetricsHook for CountingMetrics {
866            fn on_parse_error(&self) {
867                self.parse_errors.fetch_add(1, Ordering::Relaxed);
868            }
869            fn on_events_processed(&self, count: u64) {
870                self.events_processed.fetch_add(count, Ordering::Relaxed);
871            }
872            fn on_detection_matches(&self, count: u64) {
873                self.detection_matches.fetch_add(count, Ordering::Relaxed);
874            }
875            fn on_correlation_matches(&self, _: u64) {}
876            fn observe_processing_latency(&self, _: f64) {}
877            fn on_input_queue_depth_change(&self, _: i64) {}
878            fn on_back_pressure(&self) {}
879            fn observe_batch_size(&self, _: u64) {}
880            fn on_output_queue_depth_change(&self, _: i64) {}
881            fn observe_pipeline_latency(&self, _: f64) {}
882            fn set_correlation_state_entries(&self, _: u64) {}
883        }
884
885        let dir = tempfile::tempdir().unwrap();
886        let rule_path = dir.path().join("test.yml");
887        std::fs::write(
888            &rule_path,
889            r#"
890title: Test Rule
891status: test
892logsource:
893    category: test
894detection:
895    selection:
896        EventID: 1
897    condition: selection
898"#,
899        )
900        .unwrap();
901
902        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
903        engine.load_rules().unwrap();
904
905        let metrics = Arc::new(CountingMetrics {
906            parse_errors: AtomicU64::new(0),
907            events_processed: AtomicU64::new(0),
908            detection_matches: AtomicU64::new(0),
909        });
910        let proc = LogProcessor::new(engine, metrics.clone());
911
912        let batch = vec![
913            "not json".to_string(),
914            r#"{"EventID": 1}"#.to_string(),
915            r#"{"EventID": 2}"#.to_string(),
916        ];
917        proc.process_batch_lines(&batch, &identity_filter);
918
919        assert_eq!(metrics.parse_errors.load(Ordering::Relaxed), 1);
920        assert_eq!(metrics.events_processed.load(Ordering::Relaxed), 2);
921        assert_eq!(metrics.detection_matches.load(Ordering::Relaxed), 1);
922
923        std::mem::forget(dir);
924    }
925
926    /// Verify concurrent processing and swap don't panic (basic thread safety).
927    #[test]
928    fn concurrent_swap_and_process() {
929        let dir = tempfile::tempdir().unwrap();
930        let rule_path = dir.path().join("test.yml");
931        std::fs::write(
932            &rule_path,
933            r#"
934title: Rule A
935status: test
936logsource:
937    category: test
938detection:
939    selection:
940        EventID: 1
941    condition: selection
942"#,
943        )
944        .unwrap();
945
946        let mut engine = RuntimeEngine::new(
947            rule_path.clone(),
948            vec![],
949            CorrelationConfig::default(),
950            false,
951        );
952        engine.load_rules().unwrap();
953        let proc = Arc::new(LogProcessor::new(engine, Arc::new(NoopMetrics)));
954
955        let handles: Vec<_> = (0..4)
956            .map(|i| {
957                let proc = proc.clone();
958                let rule_path = rule_path.clone();
959                std::thread::spawn(move || {
960                    let batch = vec![r#"{"EventID": 1}"#.to_string()];
961                    for _ in 0..100 {
962                        let _ = proc.process_batch_lines(&batch, &identity_filter);
963                    }
964                    // Thread 0 does a swap mid-flight
965                    if i == 0 {
966                        let mut new_engine = RuntimeEngine::new(
967                            rule_path,
968                            vec![],
969                            CorrelationConfig::default(),
970                            false,
971                        );
972                        new_engine.load_rules().unwrap();
973                        proc.swap_engine(new_engine);
974                    }
975                })
976            })
977            .collect();
978
979        for h in handles {
980            h.join().unwrap();
981        }
982
983        std::mem::forget(dir);
984    }
985
986    // --- Tests for process_batch_with_format ---
987
988    #[test]
989    fn format_json_matches() {
990        let proc = make_processor(
991            r#"
992title: Test Rule
993status: test
994logsource:
995    category: test
996detection:
997    selection:
998        EventID: 1
999    condition: selection
1000"#,
1001        );
1002
1003        let batch = vec![r#"{"EventID": 1}"#.to_string()];
1004        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
1005        assert_eq!(results.len(), 1);
1006        assert!(
1007            results[0].detection_count() > 0,
1008            "JSON EventID=1 should match"
1009        );
1010    }
1011
1012    #[test]
1013    fn format_syslog_extracts_fields() {
1014        let proc = make_processor(
1015            r#"
1016title: Syslog Test
1017status: test
1018logsource:
1019    category: test
1020detection:
1021    selection:
1022        hostname: mymachine
1023    condition: selection
1024"#,
1025        );
1026
1027        let batch = vec!["<34>Oct 11 22:14:15 mymachine su: test message".to_string()];
1028        let results = proc.process_batch_with_format(
1029            &batch,
1030            &InputFormat::Syslog(crate::input::SyslogConfig::default()),
1031            None,
1032        );
1033        assert_eq!(results.len(), 1);
1034        assert!(
1035            results[0].detection_count() > 0,
1036            "syslog hostname=mymachine should match"
1037        );
1038    }
1039
1040    #[test]
1041    fn format_plain_keyword_match() {
1042        let proc = make_processor(
1043            r#"
1044title: Keyword Test
1045status: test
1046logsource:
1047    category: test
1048detection:
1049    keywords:
1050        - "disk full"
1051    condition: keywords
1052"#,
1053        );
1054
1055        let batch = vec!["ERROR: disk full on /dev/sda1".to_string()];
1056        let results = proc.process_batch_with_format(&batch, &InputFormat::Plain, None);
1057        assert_eq!(results.len(), 1);
1058        assert!(
1059            results[0].detection_count() > 0,
1060            "plain keyword 'disk full' should match"
1061        );
1062    }
1063
1064    #[test]
1065    fn format_auto_detects_json() {
1066        let proc = make_processor(
1067            r#"
1068title: Test Rule
1069status: test
1070logsource:
1071    category: test
1072detection:
1073    selection:
1074        EventID: 1
1075    condition: selection
1076"#,
1077        );
1078
1079        let batch = vec![r#"{"EventID": 1}"#.to_string()];
1080        let results = proc.process_batch_with_format(&batch, &InputFormat::default(), None);
1081        assert_eq!(results.len(), 1);
1082        assert!(results[0].detection_count() > 0);
1083    }
1084
1085    #[test]
1086    fn format_json_with_event_filter() {
1087        let proc = make_processor(
1088            r#"
1089title: Test Rule
1090status: test
1091logsource:
1092    category: test
1093detection:
1094    selection:
1095        EventID: 1
1096    condition: selection
1097"#,
1098        );
1099
1100        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
1101            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
1102                records.clone()
1103            } else {
1104                vec![v.clone()]
1105            }
1106        };
1107
1108        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
1109        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, Some(&filter));
1110        assert_eq!(results.len(), 1);
1111        assert_eq!(
1112            results[0].detection_count(),
1113            1,
1114            "only EventID=1 from records array should match"
1115        );
1116    }
1117
1118    #[test]
1119    fn format_empty_lines_skipped() {
1120        let proc = make_processor(
1121            r#"
1122title: Test Rule
1123status: test
1124logsource:
1125    category: test
1126detection:
1127    selection:
1128        EventID: 1
1129    condition: selection
1130"#,
1131        );
1132
1133        let batch = vec![
1134            "".to_string(),
1135            "   ".to_string(),
1136            r#"{"EventID": 1}"#.to_string(),
1137        ];
1138        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
1139        assert_eq!(results.len(), 3);
1140        assert!(results[0].detection_count() == 0);
1141        assert!(results[1].detection_count() == 0);
1142        assert!(results[2].detection_count() > 0);
1143    }
1144
1145    #[cfg(feature = "logfmt")]
1146    #[test]
1147    fn format_logfmt_matches() {
1148        let proc = make_processor(
1149            r#"
1150title: Logfmt Test
1151status: test
1152logsource:
1153    category: test
1154detection:
1155    selection:
1156        level: error
1157    condition: selection
1158"#,
1159        );
1160
1161        let batch = vec!["level=error msg=something host=web01".to_string()];
1162        let results = proc.process_batch_with_format(&batch, &InputFormat::Logfmt, None);
1163        assert_eq!(results.len(), 1);
1164        assert!(
1165            results[0].detection_count() > 0,
1166            "logfmt level=error should match"
1167        );
1168    }
1169
1170    #[cfg(feature = "cef")]
1171    #[test]
1172    fn format_cef_matches() {
1173        let proc = make_processor(
1174            r#"
1175title: CEF Test
1176status: test
1177logsource:
1178    category: test
1179detection:
1180    selection:
1181        deviceVendor: Security
1182    condition: selection
1183"#,
1184        );
1185
1186        let batch = vec!["CEF:0|Security|IDS|1.0|100|Attack|9|src=10.0.0.1".to_string()];
1187        let results = proc.process_batch_with_format(&batch, &InputFormat::Cef, None);
1188        assert_eq!(results.len(), 1);
1189        assert!(
1190            results[0].detection_count() > 0,
1191            "CEF deviceVendor=Security should match"
1192        );
1193    }
1194}