Skip to main content

rsigma_runtime/
processor.rs

1use std::sync::Arc;
2
3use parking_lot::Mutex;
4use std::time::Instant;
5
6use arc_swap::ArcSwap;
7use rsigma_eval::{Event, JsonEvent, ProcessResult};
8
9use crate::engine::RuntimeEngine;
10use crate::input::{EventInputDecoded, InputFormat, parse_line};
11use crate::metrics::MetricsHook;
12
/// Closure that extracts multiple payloads from a single JSON value.
///
/// Used by the daemon's event filter (e.g. jq/jsonpath) to explode a JSON
/// object into sub-events (e.g. `.records[]`). Only applies to JSON input.
///
/// The closure borrows the parsed value and returns zero or more owned
/// payloads; each returned payload is evaluated as its own event. Returning
/// an empty `Vec` means the line contributes no events.
///
/// This is an unsized `dyn Fn` type, so it is always used behind a
/// reference (`&EventFilter`).
pub type EventFilter = dyn Fn(&serde_json::Value) -> Vec<serde_json::Value>;
18
/// Thread-safe handle to the engine, swappable atomically for hot-reload.
///
/// Uses `ArcSwap<Mutex<RuntimeEngine>>` so that:
/// - Detection + correlation processing can acquire `&mut RuntimeEngine` via
///   the inner `Mutex`.
/// - Hot-reload swaps the entire engine atomically without blocking in-flight
///   batches (they hold an `Arc` to the old engine until their batch completes).
pub struct LogProcessor {
    /// The current engine. The `ArcSwap` slot is replaced as a whole on
    /// reload; the inner `Mutex` is locked for every access to the engine.
    engine: Arc<ArcSwap<Mutex<RuntimeEngine>>>,
    /// Hook that receives parse-error, throughput, latency, and match metrics.
    metrics: Arc<dyn MetricsHook>,
}
30
31impl LogProcessor {
32    /// Create a new processor wrapping the given engine and metrics hook.
33    pub fn new(engine: RuntimeEngine, metrics: Arc<dyn MetricsHook>) -> Self {
34        LogProcessor {
35            engine: Arc::new(ArcSwap::from_pointee(Mutex::new(engine))),
36            metrics,
37        }
38    }
39
40    /// Atomically replace the engine with a new one.
41    ///
42    /// In-flight batches continue against the old engine (they hold an `Arc`
43    /// snapshot). New batches see the replacement on their next call to
44    /// `process_batch_lines`.
45    pub fn swap_engine(&self, new_engine: RuntimeEngine) {
46        self.engine.store(Arc::new(Mutex::new(new_engine)));
47    }
48
49    /// Load a snapshot of the current engine for use during reload.
50    ///
51    /// The caller can lock the returned guard to export state, build a new
52    /// engine, import state, and then call `swap_engine`.
53    pub fn engine_snapshot(&self) -> arc_swap::Guard<Arc<Mutex<RuntimeEngine>>> {
54        self.engine.load()
55    }
56
57    /// Process a batch of raw input lines through the engine.
58    ///
59    /// 1. Parses each line as JSON; on error, increments parse error metrics.
60    /// 2. Applies the `event_filter` closure to extract payloads.
61    /// 3. Evaluates all payloads via `RuntimeEngine::process_batch`.
62    /// 4. Merges per-payload results back into per-line results.
63    /// 5. Updates metrics (events processed, latency, match counts).
64    ///
65    /// Returns one `ProcessResult` per input line.
66    pub fn process_batch_lines(
67        &self,
68        batch: &[String],
69        event_filter: &EventFilter,
70    ) -> Vec<ProcessResult> {
71        let engine_guard = self.engine.load();
72        let mut engine = engine_guard.lock();
73
74        // Phase 1: Parse JSON and apply event filters, tracking line origin.
75        let mut parsed: Vec<(usize, Vec<serde_json::Value>)> = Vec::with_capacity(batch.len());
76        for (line_idx, line) in batch.iter().enumerate() {
77            match serde_json::from_str::<serde_json::Value>(line) {
78                Ok(value) => {
79                    let payloads = event_filter(&value);
80                    if !payloads.is_empty() {
81                        parsed.push((line_idx, payloads));
82                    }
83                }
84                Err(e) => {
85                    self.metrics.on_parse_error();
86                    tracing::debug!(error = %e, "Invalid JSON on input");
87                }
88            }
89        }
90
91        // Flatten: (line_idx, &Value) for each payload across all lines
92        let mut flat: Vec<(usize, &serde_json::Value)> = Vec::new();
93        for (line_idx, payloads) in &parsed {
94            for payload in payloads {
95                flat.push((*line_idx, payload));
96            }
97        }
98
99        if flat.is_empty() {
100            return empty_results(batch.len());
101        }
102
103        // Phase 2: Batch evaluation — parallel detection + sequential correlation
104        let events: Vec<JsonEvent> = flat.iter().map(|(_, v)| JsonEvent::borrow(v)).collect();
105        let event_refs: Vec<&JsonEvent> = events.iter().collect();
106
107        let start = Instant::now();
108        let batch_results = engine.process_batch(&event_refs);
109        let elapsed = start.elapsed().as_secs_f64();
110        let per_event_latency = elapsed / event_refs.len() as f64;
111
112        // Update correlation state metrics while we still hold the lock
113        let stats = engine.stats();
114        self.metrics
115            .set_correlation_state_entries(stats.state_entries as u64);
116
117        // Phase 3: Merge results per input line and update metrics
118        let mut line_results = empty_results(batch.len());
119
120        for ((line_idx, _), result) in flat.iter().zip(batch_results) {
121            self.metrics.on_events_processed(1);
122            self.metrics.observe_processing_latency(per_event_latency);
123            self.metrics
124                .on_detection_matches(result.detections.len() as u64);
125            self.metrics
126                .on_correlation_matches(result.correlations.len() as u64);
127
128            for det in &result.detections {
129                let level_str = det.level.as_ref().map_or("unknown", |l| l.as_str());
130                self.metrics
131                    .on_detection_match_detail(&det.rule_title, level_str);
132            }
133            for cor in &result.correlations {
134                let level_str = cor.level.as_ref().map_or("unknown", |l| l.as_str());
135                self.metrics.on_correlation_match_detail(
136                    &cor.rule_title,
137                    level_str,
138                    cor.correlation_type.as_str(),
139                );
140            }
141
142            line_results[*line_idx].detections.extend(result.detections);
143            line_results[*line_idx]
144                .correlations
145                .extend(result.correlations);
146        }
147
148        line_results
149    }
150
151    /// Process a batch of raw input lines using the specified input format.
152    ///
153    /// Unlike [`process_batch_lines`](Self::process_batch_lines), this method
154    /// supports all input formats (JSON, syslog, plain, logfmt, CEF). The
155    /// `event_filter` only applies to JSON-decoded events (it extracts multiple
156    /// payloads from one JSON object, e.g. a `records[]` array). Non-JSON
157    /// formats produce exactly one event per line.
158    ///
159    /// Returns one `ProcessResult` per input line.
160    pub fn process_batch_with_format(
161        &self,
162        batch: &[String],
163        format: &InputFormat,
164        event_filter: Option<&EventFilter>,
165    ) -> Vec<ProcessResult> {
166        let engine_guard = self.engine.load();
167        let mut engine = engine_guard.lock();
168
169        // Phase 1: Parse each line into decoded events, tracking line origin.
170        // For JSON with an event_filter, one line can produce multiple events.
171        let mut decoded_events: Vec<(usize, EventInputDecoded)> = Vec::with_capacity(batch.len());
172
173        for (line_idx, line) in batch.iter().enumerate() {
174            let Some(decoded) = parse_line(line, format) else {
175                if !line.trim().is_empty() {
176                    self.metrics.on_parse_error();
177                    tracing::debug!("Failed to parse input line");
178                }
179                continue;
180            };
181
182            // For JSON events with an event filter, apply the filter which
183            // may produce multiple payloads (e.g. `.records[]`).
184            if let Some(filter) = event_filter
185                && let EventInputDecoded::Json(ref json_event) = decoded
186            {
187                let json_value = json_event.to_json();
188                let payloads = filter(&json_value);
189                for payload in payloads {
190                    decoded_events
191                        .push((line_idx, EventInputDecoded::Json(JsonEvent::owned(payload))));
192                }
193                continue;
194            }
195
196            decoded_events.push((line_idx, decoded));
197        }
198
199        if decoded_events.is_empty() {
200            return empty_results(batch.len());
201        }
202
203        // Phase 2: Batch evaluation — parallel detection + sequential correlation
204        let event_refs: Vec<&EventInputDecoded> = decoded_events.iter().map(|(_, e)| e).collect();
205
206        let start = Instant::now();
207        let batch_results = engine.process_batch(&event_refs);
208        let elapsed = start.elapsed().as_secs_f64();
209        let per_event_latency = elapsed / event_refs.len() as f64;
210
211        let stats = engine.stats();
212        self.metrics
213            .set_correlation_state_entries(stats.state_entries as u64);
214
215        // Phase 3: Merge results per input line and update metrics
216        let mut line_results = empty_results(batch.len());
217
218        for ((line_idx, _), result) in decoded_events.iter().zip(batch_results) {
219            self.metrics.on_events_processed(1);
220            self.metrics.observe_processing_latency(per_event_latency);
221            self.metrics
222                .on_detection_matches(result.detections.len() as u64);
223            self.metrics
224                .on_correlation_matches(result.correlations.len() as u64);
225
226            for det in &result.detections {
227                let level_str = det.level.as_ref().map_or("unknown", |l| l.as_str());
228                self.metrics
229                    .on_detection_match_detail(&det.rule_title, level_str);
230            }
231            for cor in &result.correlations {
232                let level_str = cor.level.as_ref().map_or("unknown", |l| l.as_str());
233                self.metrics.on_correlation_match_detail(
234                    &cor.rule_title,
235                    level_str,
236                    cor.correlation_type.as_str(),
237                );
238            }
239
240            line_results[*line_idx].detections.extend(result.detections);
241            line_results[*line_idx]
242                .correlations
243                .extend(result.correlations);
244        }
245
246        line_results
247    }
248
249    /// Reload rules (and pipelines) without blocking in-flight event processing.
250    ///
251    /// Builds a fresh `RuntimeEngine` with the same configuration as the
252    /// current one, re-reads pipeline files from disk (if paths are set),
253    /// loads rules into it, imports the old engine's correlation state, and
254    /// atomically swaps. In-flight batches that already hold an `Arc` to
255    /// the old engine finish undisturbed.
256    ///
257    /// If pipeline or rule loading fails, the old engine remains active.
258    pub fn reload_rules(&self) -> Result<crate::engine::EngineStats, String> {
259        let (
260            old_state,
261            rules_path,
262            pipelines,
263            pipeline_paths,
264            corr_config,
265            include_event,
266            resolver,
267            allow_remote_include,
268        ) = {
269            let snapshot = self.engine.load();
270            let old = snapshot.lock();
271            (
272                old.export_state(),
273                old.rules_path().to_path_buf(),
274                old.pipelines().to_vec(),
275                old.pipeline_paths().to_vec(),
276                old.corr_config().clone(),
277                old.include_event(),
278                old.source_resolver().cloned(),
279                old.allow_remote_include(),
280            )
281        };
282
283        let mut new_engine = RuntimeEngine::new(rules_path, pipelines, corr_config, include_event);
284        new_engine.set_pipeline_paths(pipeline_paths);
285        new_engine.set_allow_remote_include(allow_remote_include);
286        if let Some(resolver) = resolver {
287            new_engine.set_source_resolver(resolver);
288        }
289        let stats = new_engine.load_rules()?;
290
291        if let Some(state) = old_state
292            && !new_engine.import_state(&state)
293        {
294            tracing::warn!(
295                "Incompatible correlation snapshot version during reload, starting fresh"
296            );
297        }
298
299        self.swap_engine(new_engine);
300        Ok(stats)
301    }
302
303    /// Return the rules path from the current engine.
304    pub fn rules_path(&self) -> std::path::PathBuf {
305        let snapshot = self.engine.load();
306        let engine = snapshot.lock();
307        engine.rules_path().to_path_buf()
308    }
309
310    /// Return a reference to the metrics hook.
311    pub fn metrics(&self) -> &dyn MetricsHook {
312        &*self.metrics
313    }
314
315    /// Export correlation state from the current engine.
316    pub fn export_state(&self) -> Option<rsigma_eval::CorrelationSnapshot> {
317        let snapshot = self.engine.load();
318        let engine = snapshot.lock();
319        engine.export_state()
320    }
321
322    /// Import correlation state into the current engine.
323    pub fn import_state(&self, snapshot: &rsigma_eval::CorrelationSnapshot) -> bool {
324        let guard = self.engine.load();
325        let mut engine = guard.lock();
326        engine.import_state(snapshot)
327    }
328
329    /// Return summary statistics about the current engine.
330    pub fn stats(&self) -> crate::engine::EngineStats {
331        let snapshot = self.engine.load();
332        let engine = snapshot.lock();
333        engine.stats()
334    }
335}
336
337/// Produce a vec of empty `ProcessResult`, one per input line.
338fn empty_results(count: usize) -> Vec<ProcessResult> {
339    (0..count)
340        .map(|_| ProcessResult {
341            detections: vec![],
342            correlations: vec![],
343        })
344        .collect()
345}
346
347#[cfg(test)]
348mod tests {
349    use super::*;
350    use crate::metrics::NoopMetrics;
351    use rsigma_eval::CorrelationConfig;
352
353    fn identity_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
354        vec![v.clone()]
355    }
356
357    fn make_processor(rules_yaml: &str) -> LogProcessor {
358        let dir = tempfile::tempdir().unwrap();
359        let rule_path = dir.path().join("test.yml");
360        std::fs::write(&rule_path, rules_yaml).unwrap();
361
362        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
363        engine.load_rules().unwrap();
364        // Leak the tempdir so the path stays valid
365        std::mem::forget(dir);
366        LogProcessor::new(engine, Arc::new(NoopMetrics))
367    }
368
369    #[test]
370    fn process_batch_lines_valid_json() {
371        let proc = make_processor(
372            r#"
373title: Test Rule
374status: test
375logsource:
376    category: test
377detection:
378    selection:
379        EventID: 1
380    condition: selection
381"#,
382        );
383
384        let batch = vec![
385            r#"{"EventID": 1}"#.to_string(),
386            r#"{"EventID": 2}"#.to_string(),
387        ];
388        let results = proc.process_batch_lines(&batch, &identity_filter);
389        assert_eq!(results.len(), 2);
390        assert!(!results[0].detections.is_empty(), "EventID=1 should match");
391        assert!(
392            results[1].detections.is_empty(),
393            "EventID=2 should not match"
394        );
395    }
396
397    #[test]
398    fn process_batch_lines_invalid_json() {
399        let proc = make_processor(
400            r#"
401title: Test Rule
402status: test
403logsource:
404    category: test
405detection:
406    selection:
407        EventID: 1
408    condition: selection
409"#,
410        );
411
412        let batch = vec!["not json".to_string(), r#"{"EventID": 1}"#.to_string()];
413        let results = proc.process_batch_lines(&batch, &identity_filter);
414        assert_eq!(results.len(), 2);
415        assert!(
416            results[0].detections.is_empty(),
417            "invalid JSON produces empty result"
418        );
419        assert!(
420            !results[1].detections.is_empty(),
421            "valid line still matches"
422        );
423    }
424
425    #[test]
426    fn swap_engine_replaces_rules() {
427        let dir = tempfile::tempdir().unwrap();
428        let rule_path = dir.path().join("test.yml");
429        std::fs::write(
430            &rule_path,
431            r#"
432title: Rule A
433status: test
434logsource:
435    category: test
436detection:
437    selection:
438        EventID: 1
439    condition: selection
440"#,
441        )
442        .unwrap();
443
444        let mut engine = RuntimeEngine::new(
445            rule_path.clone(),
446            vec![],
447            CorrelationConfig::default(),
448            false,
449        );
450        engine.load_rules().unwrap();
451        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
452
453        let batch = vec![r#"{"EventID": 1}"#.to_string()];
454        assert!(
455            !proc.process_batch_lines(&batch, &identity_filter)[0]
456                .detections
457                .is_empty()
458        );
459
460        // Swap to a rule that matches EventID: 99
461        std::fs::write(
462            &rule_path,
463            r#"
464title: Rule B
465status: test
466logsource:
467    category: test
468detection:
469    selection:
470        EventID: 99
471    condition: selection
472"#,
473        )
474        .unwrap();
475
476        let mut new_engine =
477            RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
478        new_engine.load_rules().unwrap();
479        proc.swap_engine(new_engine);
480
481        assert!(
482            proc.process_batch_lines(&batch, &identity_filter)[0]
483                .detections
484                .is_empty()
485        );
486
487        let batch2 = vec![r#"{"EventID": 99}"#.to_string()];
488        assert!(
489            !proc.process_batch_lines(&batch2, &identity_filter)[0]
490                .detections
491                .is_empty()
492        );
493
494        std::mem::forget(dir);
495    }
496
497    #[test]
498    fn reload_rules_preserves_engine() {
499        let dir = tempfile::tempdir().unwrap();
500        let rule_path = dir.path().join("test.yml");
501        std::fs::write(
502            &rule_path,
503            r#"
504title: Rule A
505status: test
506logsource:
507    category: test
508detection:
509    selection:
510        EventID: 1
511    condition: selection
512"#,
513        )
514        .unwrap();
515
516        let mut engine = RuntimeEngine::new(
517            rule_path.clone(),
518            vec![],
519            CorrelationConfig::default(),
520            false,
521        );
522        engine.load_rules().unwrap();
523        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
524
525        let batch = vec![r#"{"EventID": 1}"#.to_string()];
526        assert!(
527            !proc.process_batch_lines(&batch, &identity_filter)[0]
528                .detections
529                .is_empty()
530        );
531
532        // Update the rule file and reload
533        std::fs::write(
534            &rule_path,
535            r#"
536title: Rule B
537status: test
538logsource:
539    category: test
540detection:
541    selection:
542        EventID: 42
543    condition: selection
544"#,
545        )
546        .unwrap();
547
548        let stats = proc.reload_rules().unwrap();
549        assert_eq!(stats.detection_rules, 1);
550
551        // Old rule should no longer match
552        assert!(
553            proc.process_batch_lines(&batch, &identity_filter)[0]
554                .detections
555                .is_empty()
556        );
557        // New rule should match
558        let batch2 = vec![r#"{"EventID": 42}"#.to_string()];
559        assert!(
560            !proc.process_batch_lines(&batch2, &identity_filter)[0]
561                .detections
562                .is_empty()
563        );
564
565        std::mem::forget(dir);
566    }
567
568    #[test]
569    fn reload_re_reads_pipelines_from_disk() {
570        let dir = tempfile::tempdir().unwrap();
571
572        // Rule uses the generic Sigma field name "SourceIP".
573        // The pipeline maps it to what the events actually contain.
574        let rule_path = dir.path().join("test.yml");
575        std::fs::write(
576            &rule_path,
577            r#"
578title: Rule A
579status: test
580logsource:
581    category: test
582detection:
583    selection:
584        SourceIP: "10.0.0.1"
585    condition: selection
586"#,
587        )
588        .unwrap();
589
590        // Pipeline maps the rule's SourceIP field to "src_ip" (event field)
591        let pipeline_path = dir.path().join("pipeline.yml");
592        std::fs::write(
593            &pipeline_path,
594            r#"
595name: Initial Pipeline
596priority: 10
597transformations:
598  - id: rename_field
599    type: field_name_mapping
600    mapping:
601      SourceIP: src_ip
602"#,
603        )
604        .unwrap();
605
606        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
607        let mut engine = RuntimeEngine::new(
608            rule_path.clone(),
609            pipelines,
610            CorrelationConfig::default(),
611            false,
612        );
613        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
614        engine.load_rules().unwrap();
615        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
616
617        // Event uses "src_ip" which the pipeline mapped from SourceIP
618        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
619        assert!(
620            !proc.process_batch_lines(&batch, &identity_filter)[0]
621                .detections
622                .is_empty(),
623            "src_ip should match because pipeline mapped SourceIP -> src_ip"
624        );
625
626        // Update pipeline to map SourceIP to a different event field name
627        std::fs::write(
628            &pipeline_path,
629            r#"
630name: Updated Pipeline
631priority: 10
632transformations:
633  - id: rename_field
634    type: field_name_mapping
635    mapping:
636      SourceIP: source.ip
637"#,
638        )
639        .unwrap();
640
641        proc.reload_rules().unwrap();
642
643        // src_ip no longer the target, should not match
644        assert!(
645            proc.process_batch_lines(&batch, &identity_filter)[0]
646                .detections
647                .is_empty(),
648            "after pipeline reload, src_ip should no longer match"
649        );
650
651        // source.ip is now the mapped name, should match
652        let batch2 = vec![r#"{"source.ip": "10.0.0.1"}"#.to_string()];
653        assert!(
654            !proc.process_batch_lines(&batch2, &identity_filter)[0]
655                .detections
656                .is_empty(),
657            "after pipeline reload, source.ip should match"
658        );
659
660        std::mem::forget(dir);
661    }
662
663    #[test]
664    fn reload_with_broken_pipeline_keeps_old_engine() {
665        let dir = tempfile::tempdir().unwrap();
666        let rule_path = dir.path().join("test.yml");
667        std::fs::write(
668            &rule_path,
669            r#"
670title: Rule A
671status: test
672logsource:
673    category: test
674detection:
675    selection:
676        SourceIP: "10.0.0.1"
677    condition: selection
678"#,
679        )
680        .unwrap();
681
682        let pipeline_path = dir.path().join("pipeline.yml");
683        std::fs::write(
684            &pipeline_path,
685            r#"
686name: Working Pipeline
687priority: 10
688transformations:
689  - id: rename_field
690    type: field_name_mapping
691    mapping:
692      SourceIP: src_ip
693"#,
694        )
695        .unwrap();
696
697        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
698        let mut engine = RuntimeEngine::new(
699            rule_path.clone(),
700            pipelines,
701            CorrelationConfig::default(),
702            false,
703        );
704        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
705        engine.load_rules().unwrap();
706        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
707
708        // Verify initial state works (SourceIP mapped to src_ip)
709        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
710        assert!(
711            !proc.process_batch_lines(&batch, &identity_filter)[0]
712                .detections
713                .is_empty()
714        );
715
716        // Write broken YAML to the pipeline file
717        std::fs::write(&pipeline_path, "{{{{ invalid yaml !!!!").unwrap();
718
719        // Reload should fail
720        let result = proc.reload_rules();
721        assert!(result.is_err(), "reload with broken pipeline should fail");
722
723        // Old engine should still be active and working
724        assert!(
725            !proc.process_batch_lines(&batch, &identity_filter)[0]
726                .detections
727                .is_empty(),
728            "old engine should still work after failed reload"
729        );
730
731        std::mem::forget(dir);
732    }
733
734    #[test]
735    fn custom_event_filter() {
736        let proc = make_processor(
737            r#"
738title: Test Rule
739status: test
740logsource:
741    category: test
742detection:
743    selection:
744        EventID: 1
745    condition: selection
746"#,
747        );
748
749        // Filter that extracts a nested "records" array
750        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
751            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
752                records.clone()
753            } else {
754                vec![v.clone()]
755            }
756        };
757
758        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
759        let results = proc.process_batch_lines(&batch, &filter);
760        assert_eq!(results.len(), 1);
761        assert_eq!(
762            results[0].detections.len(),
763            1,
764            "only EventID=1 from records array should match"
765        );
766    }
767
768    #[test]
769    fn empty_batch_returns_empty() {
770        let proc = make_processor(
771            r#"
772title: Test Rule
773status: test
774logsource:
775    category: test
776detection:
777    selection:
778        EventID: 1
779    condition: selection
780"#,
781        );
782
783        let batch: Vec<String> = vec![];
784        let results = proc.process_batch_lines(&batch, &identity_filter);
785        assert!(results.is_empty());
786    }
787
788    /// Verify MetricsHook is called correctly during processing.
789    #[test]
790    fn metrics_hook_invocations() {
791        use std::sync::atomic::{AtomicU64, Ordering};
792
793        struct CountingMetrics {
794            parse_errors: AtomicU64,
795            events_processed: AtomicU64,
796            detection_matches: AtomicU64,
797        }
798
799        impl MetricsHook for CountingMetrics {
800            fn on_parse_error(&self) {
801                self.parse_errors.fetch_add(1, Ordering::Relaxed);
802            }
803            fn on_events_processed(&self, count: u64) {
804                self.events_processed.fetch_add(count, Ordering::Relaxed);
805            }
806            fn on_detection_matches(&self, count: u64) {
807                self.detection_matches.fetch_add(count, Ordering::Relaxed);
808            }
809            fn on_correlation_matches(&self, _: u64) {}
810            fn observe_processing_latency(&self, _: f64) {}
811            fn on_input_queue_depth_change(&self, _: i64) {}
812            fn on_back_pressure(&self) {}
813            fn observe_batch_size(&self, _: u64) {}
814            fn on_output_queue_depth_change(&self, _: i64) {}
815            fn observe_pipeline_latency(&self, _: f64) {}
816            fn set_correlation_state_entries(&self, _: u64) {}
817        }
818
819        let dir = tempfile::tempdir().unwrap();
820        let rule_path = dir.path().join("test.yml");
821        std::fs::write(
822            &rule_path,
823            r#"
824title: Test Rule
825status: test
826logsource:
827    category: test
828detection:
829    selection:
830        EventID: 1
831    condition: selection
832"#,
833        )
834        .unwrap();
835
836        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
837        engine.load_rules().unwrap();
838
839        let metrics = Arc::new(CountingMetrics {
840            parse_errors: AtomicU64::new(0),
841            events_processed: AtomicU64::new(0),
842            detection_matches: AtomicU64::new(0),
843        });
844        let proc = LogProcessor::new(engine, metrics.clone());
845
846        let batch = vec![
847            "not json".to_string(),
848            r#"{"EventID": 1}"#.to_string(),
849            r#"{"EventID": 2}"#.to_string(),
850        ];
851        proc.process_batch_lines(&batch, &identity_filter);
852
853        assert_eq!(metrics.parse_errors.load(Ordering::Relaxed), 1);
854        assert_eq!(metrics.events_processed.load(Ordering::Relaxed), 2);
855        assert_eq!(metrics.detection_matches.load(Ordering::Relaxed), 1);
856
857        std::mem::forget(dir);
858    }
859
860    /// Verify concurrent processing and swap don't panic (basic thread safety).
861    #[test]
862    fn concurrent_swap_and_process() {
863        let dir = tempfile::tempdir().unwrap();
864        let rule_path = dir.path().join("test.yml");
865        std::fs::write(
866            &rule_path,
867            r#"
868title: Rule A
869status: test
870logsource:
871    category: test
872detection:
873    selection:
874        EventID: 1
875    condition: selection
876"#,
877        )
878        .unwrap();
879
880        let mut engine = RuntimeEngine::new(
881            rule_path.clone(),
882            vec![],
883            CorrelationConfig::default(),
884            false,
885        );
886        engine.load_rules().unwrap();
887        let proc = Arc::new(LogProcessor::new(engine, Arc::new(NoopMetrics)));
888
889        let handles: Vec<_> = (0..4)
890            .map(|i| {
891                let proc = proc.clone();
892                let rule_path = rule_path.clone();
893                std::thread::spawn(move || {
894                    let batch = vec![r#"{"EventID": 1}"#.to_string()];
895                    for _ in 0..100 {
896                        let _ = proc.process_batch_lines(&batch, &identity_filter);
897                    }
898                    // Thread 0 does a swap mid-flight
899                    if i == 0 {
900                        let mut new_engine = RuntimeEngine::new(
901                            rule_path,
902                            vec![],
903                            CorrelationConfig::default(),
904                            false,
905                        );
906                        new_engine.load_rules().unwrap();
907                        proc.swap_engine(new_engine);
908                    }
909                })
910            })
911            .collect();
912
913        for h in handles {
914            h.join().unwrap();
915        }
916
917        std::mem::forget(dir);
918    }
919
920    // --- Tests for process_batch_with_format ---
921
922    #[test]
923    fn format_json_matches() {
924        let proc = make_processor(
925            r#"
926title: Test Rule
927status: test
928logsource:
929    category: test
930detection:
931    selection:
932        EventID: 1
933    condition: selection
934"#,
935        );
936
937        let batch = vec![r#"{"EventID": 1}"#.to_string()];
938        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
939        assert_eq!(results.len(), 1);
940        assert!(
941            !results[0].detections.is_empty(),
942            "JSON EventID=1 should match"
943        );
944    }
945
946    #[test]
947    fn format_syslog_extracts_fields() {
948        let proc = make_processor(
949            r#"
950title: Syslog Test
951status: test
952logsource:
953    category: test
954detection:
955    selection:
956        hostname: mymachine
957    condition: selection
958"#,
959        );
960
961        let batch = vec!["<34>Oct 11 22:14:15 mymachine su: test message".to_string()];
962        let results = proc.process_batch_with_format(
963            &batch,
964            &InputFormat::Syslog(crate::input::SyslogConfig::default()),
965            None,
966        );
967        assert_eq!(results.len(), 1);
968        assert!(
969            !results[0].detections.is_empty(),
970            "syslog hostname=mymachine should match"
971        );
972    }
973
974    #[test]
975    fn format_plain_keyword_match() {
976        let proc = make_processor(
977            r#"
978title: Keyword Test
979status: test
980logsource:
981    category: test
982detection:
983    keywords:
984        - "disk full"
985    condition: keywords
986"#,
987        );
988
989        let batch = vec!["ERROR: disk full on /dev/sda1".to_string()];
990        let results = proc.process_batch_with_format(&batch, &InputFormat::Plain, None);
991        assert_eq!(results.len(), 1);
992        assert!(
993            !results[0].detections.is_empty(),
994            "plain keyword 'disk full' should match"
995        );
996    }
997
998    #[test]
999    fn format_auto_detects_json() {
1000        let proc = make_processor(
1001            r#"
1002title: Test Rule
1003status: test
1004logsource:
1005    category: test
1006detection:
1007    selection:
1008        EventID: 1
1009    condition: selection
1010"#,
1011        );
1012
1013        let batch = vec![r#"{"EventID": 1}"#.to_string()];
1014        let results = proc.process_batch_with_format(&batch, &InputFormat::default(), None);
1015        assert_eq!(results.len(), 1);
1016        assert!(!results[0].detections.is_empty());
1017    }
1018
1019    #[test]
1020    fn format_json_with_event_filter() {
1021        let proc = make_processor(
1022            r#"
1023title: Test Rule
1024status: test
1025logsource:
1026    category: test
1027detection:
1028    selection:
1029        EventID: 1
1030    condition: selection
1031"#,
1032        );
1033
1034        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
1035            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
1036                records.clone()
1037            } else {
1038                vec![v.clone()]
1039            }
1040        };
1041
1042        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
1043        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, Some(&filter));
1044        assert_eq!(results.len(), 1);
1045        assert_eq!(
1046            results[0].detections.len(),
1047            1,
1048            "only EventID=1 from records array should match"
1049        );
1050    }
1051
1052    #[test]
1053    fn format_empty_lines_skipped() {
1054        let proc = make_processor(
1055            r#"
1056title: Test Rule
1057status: test
1058logsource:
1059    category: test
1060detection:
1061    selection:
1062        EventID: 1
1063    condition: selection
1064"#,
1065        );
1066
1067        let batch = vec![
1068            "".to_string(),
1069            "   ".to_string(),
1070            r#"{"EventID": 1}"#.to_string(),
1071        ];
1072        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
1073        assert_eq!(results.len(), 3);
1074        assert!(results[0].detections.is_empty());
1075        assert!(results[1].detections.is_empty());
1076        assert!(!results[2].detections.is_empty());
1077    }
1078
1079    #[cfg(feature = "logfmt")]
1080    #[test]
1081    fn format_logfmt_matches() {
1082        let proc = make_processor(
1083            r#"
1084title: Logfmt Test
1085status: test
1086logsource:
1087    category: test
1088detection:
1089    selection:
1090        level: error
1091    condition: selection
1092"#,
1093        );
1094
1095        let batch = vec!["level=error msg=something host=web01".to_string()];
1096        let results = proc.process_batch_with_format(&batch, &InputFormat::Logfmt, None);
1097        assert_eq!(results.len(), 1);
1098        assert!(
1099            !results[0].detections.is_empty(),
1100            "logfmt level=error should match"
1101        );
1102    }
1103
1104    #[cfg(feature = "cef")]
1105    #[test]
1106    fn format_cef_matches() {
1107        let proc = make_processor(
1108            r#"
1109title: CEF Test
1110status: test
1111logsource:
1112    category: test
1113detection:
1114    selection:
1115        deviceVendor: Security
1116    condition: selection
1117"#,
1118        );
1119
1120        let batch = vec!["CEF:0|Security|IDS|1.0|100|Attack|9|src=10.0.0.1".to_string()];
1121        let results = proc.process_batch_with_format(&batch, &InputFormat::Cef, None);
1122        assert_eq!(results.len(), 1);
1123        assert!(
1124            !results[0].detections.is_empty(),
1125            "CEF deviceVendor=Security should match"
1126        );
1127    }
1128}