// rsigma_runtime/processor.rs

1use std::sync::Arc;
2
3use parking_lot::Mutex;
4use std::time::Instant;
5
6use arc_swap::ArcSwap;
7use rsigma_eval::{Event, FieldObserver, JsonEvent, ProcessResult, ProcessResultExt, RuleFieldSet};
8
9use crate::engine::RuntimeEngine;
10use crate::input::{EventInputDecoded, InputFormat, parse_line};
11use crate::metrics::MetricsHook;
12
13/// Closure that extracts multiple payloads from a single JSON value.
14///
15/// Used by the daemon's event filter (e.g. jq/jsonpath) to explode a JSON
16/// object into sub-events (e.g. `.records[]`). Only applies to JSON input.
17pub type EventFilter = dyn Fn(&serde_json::Value) -> Vec<serde_json::Value>;
18
19/// Thread-safe handle to the engine, swappable atomically for hot-reload.
20///
21/// Uses `ArcSwap<Mutex<RuntimeEngine>>` so that:
22/// - Detection + correlation processing can acquire `&mut RuntimeEngine` via
23///   the inner `Mutex`.
24/// - Hot-reload swaps the entire engine atomically without blocking in-flight
25///   batches (they hold an `Arc` to the old engine until their batch completes).
26pub struct LogProcessor {
27    engine: Arc<ArcSwap<Mutex<RuntimeEngine>>>,
28    metrics: Arc<dyn MetricsHook>,
29    /// Optional opt-in field observer. When `Some`, every parsed event
30    /// flowing through `process_batch_with_format` has its field keys
31    /// recorded. When `None`, the batch path skips iteration entirely
32    /// so the hot path stays untouched.
33    field_observer: ArcSwap<Option<Arc<FieldObserver>>>,
34}
35
36impl LogProcessor {
37    /// Create a new processor wrapping the given engine and metrics hook.
38    pub fn new(engine: RuntimeEngine, metrics: Arc<dyn MetricsHook>) -> Self {
39        LogProcessor {
40            engine: Arc::new(ArcSwap::from_pointee(Mutex::new(engine))),
41            metrics,
42            field_observer: ArcSwap::new(Arc::new(None)),
43        }
44    }
45
46    /// Attach (or detach) the opt-in field observer.
47    ///
48    /// When set, [`process_batch_with_format`](Self::process_batch_with_format)
49    /// records each parsed event's field keys before evaluation. Pass
50    /// `None` to disable observation; the hot path then performs zero
51    /// extra work. Safe to call at runtime: the swap is wait-free, and
52    /// in-flight batches finish against whichever observer they read.
53    pub fn set_field_observer(&self, observer: Option<Arc<FieldObserver>>) {
54        self.field_observer.store(Arc::new(observer));
55    }
56
57    /// Return the currently-attached field observer, if any.
58    pub fn field_observer(&self) -> Option<Arc<FieldObserver>> {
59        self.field_observer.load_full().as_ref().clone()
60    }
61
62    /// Atomically replace the engine with a new one.
63    ///
64    /// In-flight batches continue against the old engine (they hold an `Arc`
65    /// snapshot). New batches see the replacement on their next call to
66    /// `process_batch_lines`.
67    pub fn swap_engine(&self, new_engine: RuntimeEngine) {
68        self.engine.store(Arc::new(Mutex::new(new_engine)));
69    }
70
71    /// Load a snapshot of the current engine for use during reload.
72    ///
73    /// The caller can lock the returned guard to export state, build a new
74    /// engine, import state, and then call `swap_engine`.
75    pub fn engine_snapshot(&self) -> arc_swap::Guard<Arc<Mutex<RuntimeEngine>>> {
76        self.engine.load()
77    }
78
79    /// Process a batch of raw input lines through the engine.
80    ///
81    /// 1. Parses each line as JSON; on error, increments parse error metrics.
82    /// 2. Applies the `event_filter` closure to extract payloads.
83    /// 3. Evaluates all payloads via `RuntimeEngine::process_batch`.
84    /// 4. Merges per-payload results back into per-line results.
85    /// 5. Updates metrics (events processed, latency, match counts).
86    ///
87    /// Returns one `ProcessResult` per input line.
88    pub fn process_batch_lines(
89        &self,
90        batch: &[String],
91        event_filter: &EventFilter,
92    ) -> Vec<ProcessResult> {
93        let engine_guard = self.engine.load();
94        let mut engine = engine_guard.lock();
95
96        // Phase 1: Parse JSON and apply event filters, tracking line origin.
97        let mut parsed: Vec<(usize, Vec<serde_json::Value>)> = Vec::with_capacity(batch.len());
98        for (line_idx, line) in batch.iter().enumerate() {
99            match serde_json::from_str::<serde_json::Value>(line) {
100                Ok(value) => {
101                    let payloads = event_filter(&value);
102                    if !payloads.is_empty() {
103                        parsed.push((line_idx, payloads));
104                    }
105                }
106                Err(e) => {
107                    self.metrics.on_parse_error();
108                    tracing::debug!(error = %e, "Invalid JSON on input");
109                }
110            }
111        }
112
113        // Flatten: (line_idx, &Value) for each payload across all lines
114        let mut flat: Vec<(usize, &serde_json::Value)> = Vec::new();
115        for (line_idx, payloads) in &parsed {
116            for payload in payloads {
117                flat.push((*line_idx, payload));
118            }
119        }
120
121        if flat.is_empty() {
122            return empty_results(batch.len());
123        }
124
125        // Phase 2: Batch evaluation — parallel detection + sequential correlation
126        let events: Vec<JsonEvent> = flat.iter().map(|(_, v)| JsonEvent::borrow(v)).collect();
127        let event_refs: Vec<&JsonEvent> = events.iter().collect();
128
129        let start = Instant::now();
130        let batch_results = engine.process_batch(&event_refs);
131        let elapsed = start.elapsed().as_secs_f64();
132        let per_event_latency = elapsed / event_refs.len() as f64;
133
134        // Update correlation state metrics while we still hold the lock
135        let stats = engine.stats();
136        self.metrics
137            .set_correlation_state_entries(stats.state_entries as u64);
138
139        // Phase 3: Merge results per input line and update metrics
140        let mut line_results = empty_results(batch.len());
141
142        for ((line_idx, _), result) in flat.iter().zip(batch_results) {
143            self.metrics.on_events_processed(1);
144            self.metrics.observe_processing_latency(per_event_latency);
145            self.metrics
146                .on_detection_matches(result.detection_count() as u64);
147            self.metrics
148                .on_correlation_matches(result.correlation_count() as u64);
149
150            for r in &result {
151                let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
152                let title = &r.header.rule_title;
153                match &r.body {
154                    rsigma_eval::ResultBody::Detection(_) => {
155                        self.metrics.on_detection_match_detail(title, level_str);
156                    }
157                    rsigma_eval::ResultBody::Correlation(body) => {
158                        self.metrics.on_correlation_match_detail(
159                            title,
160                            level_str,
161                            body.correlation_type.as_str(),
162                        );
163                    }
164                }
165            }
166
167            line_results[*line_idx].extend(result);
168        }
169
170        line_results
171    }
172
173    /// Process a batch of raw input lines using the specified input format.
174    ///
175    /// Unlike [`process_batch_lines`](Self::process_batch_lines), this method
176    /// supports all input formats (JSON, syslog, plain, logfmt, CEF). The
177    /// `event_filter` only applies to JSON-decoded events (it extracts multiple
178    /// payloads from one JSON object, e.g. a `records[]` array). Non-JSON
179    /// formats produce exactly one event per line.
180    ///
181    /// Returns one `ProcessResult` per input line.
182    pub fn process_batch_with_format(
183        &self,
184        batch: &[String],
185        format: &InputFormat,
186        event_filter: Option<&EventFilter>,
187    ) -> Vec<ProcessResult> {
188        let engine_guard = self.engine.load();
189        let mut engine = engine_guard.lock();
190
191        // Phase 1: Parse each line into decoded events, tracking line origin.
192        // For JSON with an event_filter, one line can produce multiple events.
193        let mut decoded_events: Vec<(usize, EventInputDecoded)> = Vec::with_capacity(batch.len());
194
195        for (line_idx, line) in batch.iter().enumerate() {
196            let Some(decoded) = parse_line(line, format) else {
197                if !line.trim().is_empty() {
198                    self.metrics.on_parse_error();
199                    tracing::debug!("Failed to parse input line");
200                }
201                continue;
202            };
203
204            // For JSON events with an event filter, apply the filter which
205            // may produce multiple payloads (e.g. `.records[]`).
206            if let Some(filter) = event_filter
207                && let EventInputDecoded::Json(ref json_event) = decoded
208            {
209                let json_value = json_event.to_json();
210                let payloads = filter(&json_value);
211                for payload in payloads {
212                    decoded_events
213                        .push((line_idx, EventInputDecoded::Json(JsonEvent::owned(payload))));
214                }
215                continue;
216            }
217
218            decoded_events.push((line_idx, decoded));
219        }
220
221        if decoded_events.is_empty() {
222            return empty_results(batch.len());
223        }
224
225        // Optional opt-in field observation. Cheap when disabled: one
226        // hazard-pointer `Guard` (no Arc clone) plus an `Option` check.
227        // When enabled, walks each decoded event's field keys before
228        // evaluation. The Guard's lifetime extends through the loop so
229        // the observer cannot be dropped mid-batch even if the daemon
230        // detaches it concurrently.
231        let observer_guard = self.field_observer.load();
232        if let Some(observer) = observer_guard.as_ref() {
233            for (_, decoded) in &decoded_events {
234                observer.observe(decoded);
235            }
236        }
237        drop(observer_guard);
238
239        // Phase 2: Batch evaluation — parallel detection + sequential correlation
240        let event_refs: Vec<&EventInputDecoded> = decoded_events.iter().map(|(_, e)| e).collect();
241
242        let start = Instant::now();
243        let batch_results = engine.process_batch(&event_refs);
244        let elapsed = start.elapsed().as_secs_f64();
245        let per_event_latency = elapsed / event_refs.len() as f64;
246
247        let stats = engine.stats();
248        self.metrics
249            .set_correlation_state_entries(stats.state_entries as u64);
250
251        // Phase 3: Merge results per input line and update metrics
252        let mut line_results = empty_results(batch.len());
253
254        for ((line_idx, _), result) in decoded_events.iter().zip(batch_results) {
255            self.metrics.on_events_processed(1);
256            self.metrics.observe_processing_latency(per_event_latency);
257            self.metrics
258                .on_detection_matches(result.detection_count() as u64);
259            self.metrics
260                .on_correlation_matches(result.correlation_count() as u64);
261
262            for r in &result {
263                let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
264                let title = &r.header.rule_title;
265                match &r.body {
266                    rsigma_eval::ResultBody::Detection(_) => {
267                        self.metrics.on_detection_match_detail(title, level_str);
268                    }
269                    rsigma_eval::ResultBody::Correlation(body) => {
270                        self.metrics.on_correlation_match_detail(
271                            title,
272                            level_str,
273                            body.correlation_type.as_str(),
274                        );
275                    }
276                }
277            }
278
279            line_results[*line_idx].extend(result);
280        }
281
282        line_results
283    }
284
285    /// Reload rules (and pipelines) without blocking in-flight event processing.
286    ///
287    /// Builds a fresh `RuntimeEngine` with the same configuration as the
288    /// current one, re-reads pipeline files from disk (if paths are set),
289    /// loads rules into it, imports the old engine's correlation state, and
290    /// atomically swaps. In-flight batches that already hold an `Arc` to
291    /// the old engine finish undisturbed.
292    ///
293    /// If pipeline or rule loading fails, the old engine remains active.
294    pub fn reload_rules(&self) -> Result<crate::engine::EngineStats, String> {
295        let (
296            old_state,
297            rules_path,
298            pipelines,
299            pipeline_paths,
300            corr_config,
301            include_event,
302            resolver,
303            allow_remote_include,
304        ) = {
305            let snapshot = self.engine.load();
306            let old = snapshot.lock();
307            (
308                old.export_state(),
309                old.rules_path().to_path_buf(),
310                old.pipelines().to_vec(),
311                old.pipeline_paths().to_vec(),
312                old.corr_config().clone(),
313                old.include_event(),
314                old.source_resolver().cloned(),
315                old.allow_remote_include(),
316            )
317        };
318
319        let mut new_engine = RuntimeEngine::new(rules_path, pipelines, corr_config, include_event);
320        new_engine.set_pipeline_paths(pipeline_paths);
321        new_engine.set_allow_remote_include(allow_remote_include);
322        if let Some(resolver) = resolver {
323            new_engine.set_source_resolver(resolver);
324        }
325        let stats = new_engine.load_rules()?;
326
327        if let Some(state) = old_state
328            && !new_engine.import_state(&state)
329        {
330            tracing::warn!(
331                "Incompatible correlation snapshot version during reload, starting fresh"
332            );
333        }
334
335        self.swap_engine(new_engine);
336        Ok(stats)
337    }
338
339    /// Return the rules path from the current engine.
340    pub fn rules_path(&self) -> std::path::PathBuf {
341        let snapshot = self.engine.load();
342        let engine = snapshot.lock();
343        engine.rules_path().to_path_buf()
344    }
345
346    /// Return a reference to the metrics hook.
347    pub fn metrics(&self) -> &dyn MetricsHook {
348        &*self.metrics
349    }
350
351    /// Export correlation state from the current engine.
352    pub fn export_state(&self) -> Option<rsigma_eval::CorrelationSnapshot> {
353        let snapshot = self.engine.load();
354        let engine = snapshot.lock();
355        engine.export_state()
356    }
357
358    /// Import correlation state into the current engine.
359    pub fn import_state(&self, snapshot: &rsigma_eval::CorrelationSnapshot) -> bool {
360        let guard = self.engine.load();
361        let mut engine = guard.lock();
362        engine.import_state(snapshot)
363    }
364
365    /// Return summary statistics about the current engine.
366    pub fn stats(&self) -> crate::engine::EngineStats {
367        let snapshot = self.engine.load();
368        let engine = snapshot.lock();
369        engine.stats()
370    }
371
372    /// Return an immutable snapshot of the current rule field set
373    /// (post-pipeline). The lock is held only long enough to clone the
374    /// `Arc`; the returned value remains valid across reloads.
375    pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
376        let snapshot = self.engine.load();
377        let engine = snapshot.lock();
378        engine.rule_field_set()
379    }
380}
381
382/// Produce a vec of empty `ProcessResult`, one per input line.
383fn empty_results(count: usize) -> Vec<ProcessResult> {
384    (0..count).map(|_| ProcessResult::new()).collect()
385}
386
387#[cfg(test)]
388mod tests {
389    use super::*;
390    use crate::metrics::NoopMetrics;
391    use rsigma_eval::CorrelationConfig;
392
393    fn identity_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
394        vec![v.clone()]
395    }
396
397    fn make_processor(rules_yaml: &str) -> LogProcessor {
398        let dir = tempfile::tempdir().unwrap();
399        let rule_path = dir.path().join("test.yml");
400        std::fs::write(&rule_path, rules_yaml).unwrap();
401
402        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
403        engine.load_rules().unwrap();
404        // Leak the tempdir so the path stays valid
405        std::mem::forget(dir);
406        LogProcessor::new(engine, Arc::new(NoopMetrics))
407    }
408
409    #[test]
410    fn process_batch_lines_valid_json() {
411        let proc = make_processor(
412            r#"
413title: Test Rule
414status: test
415logsource:
416    category: test
417detection:
418    selection:
419        EventID: 1
420    condition: selection
421"#,
422        );
423
424        let batch = vec![
425            r#"{"EventID": 1}"#.to_string(),
426            r#"{"EventID": 2}"#.to_string(),
427        ];
428        let results = proc.process_batch_lines(&batch, &identity_filter);
429        assert_eq!(results.len(), 2);
430        assert!(results[0].detection_count() > 0, "EventID=1 should match");
431        assert!(
432            results[1].detection_count() == 0,
433            "EventID=2 should not match"
434        );
435    }
436
437    #[test]
438    fn process_batch_lines_invalid_json() {
439        let proc = make_processor(
440            r#"
441title: Test Rule
442status: test
443logsource:
444    category: test
445detection:
446    selection:
447        EventID: 1
448    condition: selection
449"#,
450        );
451
452        let batch = vec!["not json".to_string(), r#"{"EventID": 1}"#.to_string()];
453        let results = proc.process_batch_lines(&batch, &identity_filter);
454        assert_eq!(results.len(), 2);
455        assert!(
456            results[0].detection_count() == 0,
457            "invalid JSON produces empty result"
458        );
459        assert!(results[1].detection_count() > 0, "valid line still matches");
460    }
461
462    #[test]
463    fn swap_engine_replaces_rules() {
464        let dir = tempfile::tempdir().unwrap();
465        let rule_path = dir.path().join("test.yml");
466        std::fs::write(
467            &rule_path,
468            r#"
469title: Rule A
470status: test
471logsource:
472    category: test
473detection:
474    selection:
475        EventID: 1
476    condition: selection
477"#,
478        )
479        .unwrap();
480
481        let mut engine = RuntimeEngine::new(
482            rule_path.clone(),
483            vec![],
484            CorrelationConfig::default(),
485            false,
486        );
487        engine.load_rules().unwrap();
488        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
489
490        let batch = vec![r#"{"EventID": 1}"#.to_string()];
491        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
492
493        // Swap to a rule that matches EventID: 99
494        std::fs::write(
495            &rule_path,
496            r#"
497title: Rule B
498status: test
499logsource:
500    category: test
501detection:
502    selection:
503        EventID: 99
504    condition: selection
505"#,
506        )
507        .unwrap();
508
509        let mut new_engine =
510            RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
511        new_engine.load_rules().unwrap();
512        proc.swap_engine(new_engine);
513
514        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);
515
516        let batch2 = vec![r#"{"EventID": 99}"#.to_string()];
517        assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);
518
519        std::mem::forget(dir);
520    }
521
522    #[test]
523    fn reload_rules_preserves_engine() {
524        let dir = tempfile::tempdir().unwrap();
525        let rule_path = dir.path().join("test.yml");
526        std::fs::write(
527            &rule_path,
528            r#"
529title: Rule A
530status: test
531logsource:
532    category: test
533detection:
534    selection:
535        EventID: 1
536    condition: selection
537"#,
538        )
539        .unwrap();
540
541        let mut engine = RuntimeEngine::new(
542            rule_path.clone(),
543            vec![],
544            CorrelationConfig::default(),
545            false,
546        );
547        engine.load_rules().unwrap();
548        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
549
550        let batch = vec![r#"{"EventID": 1}"#.to_string()];
551        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
552
553        // Update the rule file and reload
554        std::fs::write(
555            &rule_path,
556            r#"
557title: Rule B
558status: test
559logsource:
560    category: test
561detection:
562    selection:
563        EventID: 42
564    condition: selection
565"#,
566        )
567        .unwrap();
568
569        let stats = proc.reload_rules().unwrap();
570        assert_eq!(stats.detection_rules, 1);
571
572        // Old rule should no longer match
573        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);
574        // New rule should match
575        let batch2 = vec![r#"{"EventID": 42}"#.to_string()];
576        assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);
577
578        std::mem::forget(dir);
579    }
580
581    #[test]
582    fn reload_re_reads_pipelines_from_disk() {
583        let dir = tempfile::tempdir().unwrap();
584
585        // Rule uses the generic Sigma field name "SourceIP".
586        // The pipeline maps it to what the events actually contain.
587        let rule_path = dir.path().join("test.yml");
588        std::fs::write(
589            &rule_path,
590            r#"
591title: Rule A
592status: test
593logsource:
594    category: test
595detection:
596    selection:
597        SourceIP: "10.0.0.1"
598    condition: selection
599"#,
600        )
601        .unwrap();
602
603        // Pipeline maps the rule's SourceIP field to "src_ip" (event field)
604        let pipeline_path = dir.path().join("pipeline.yml");
605        std::fs::write(
606            &pipeline_path,
607            r#"
608name: Initial Pipeline
609priority: 10
610transformations:
611  - id: rename_field
612    type: field_name_mapping
613    mapping:
614      SourceIP: src_ip
615"#,
616        )
617        .unwrap();
618
619        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
620        let mut engine = RuntimeEngine::new(
621            rule_path.clone(),
622            pipelines,
623            CorrelationConfig::default(),
624            false,
625        );
626        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
627        engine.load_rules().unwrap();
628        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
629
630        // Event uses "src_ip" which the pipeline mapped from SourceIP
631        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
632        assert!(
633            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
634            "src_ip should match because pipeline mapped SourceIP -> src_ip"
635        );
636
637        // Update pipeline to map SourceIP to a different event field name
638        std::fs::write(
639            &pipeline_path,
640            r#"
641name: Updated Pipeline
642priority: 10
643transformations:
644  - id: rename_field
645    type: field_name_mapping
646    mapping:
647      SourceIP: source.ip
648"#,
649        )
650        .unwrap();
651
652        proc.reload_rules().unwrap();
653
654        // src_ip no longer the target, should not match
655        assert!(
656            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0,
657            "after pipeline reload, src_ip should no longer match"
658        );
659
660        // source.ip is now the mapped name, should match
661        let batch2 = vec![r#"{"source.ip": "10.0.0.1"}"#.to_string()];
662        assert!(
663            proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0,
664            "after pipeline reload, source.ip should match"
665        );
666
667        std::mem::forget(dir);
668    }
669
670    #[test]
671    fn reload_with_broken_pipeline_keeps_old_engine() {
672        let dir = tempfile::tempdir().unwrap();
673        let rule_path = dir.path().join("test.yml");
674        std::fs::write(
675            &rule_path,
676            r#"
677title: Rule A
678status: test
679logsource:
680    category: test
681detection:
682    selection:
683        SourceIP: "10.0.0.1"
684    condition: selection
685"#,
686        )
687        .unwrap();
688
689        let pipeline_path = dir.path().join("pipeline.yml");
690        std::fs::write(
691            &pipeline_path,
692            r#"
693name: Working Pipeline
694priority: 10
695transformations:
696  - id: rename_field
697    type: field_name_mapping
698    mapping:
699      SourceIP: src_ip
700"#,
701        )
702        .unwrap();
703
704        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
705        let mut engine = RuntimeEngine::new(
706            rule_path.clone(),
707            pipelines,
708            CorrelationConfig::default(),
709            false,
710        );
711        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
712        engine.load_rules().unwrap();
713        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
714
715        // Verify initial state works (SourceIP mapped to src_ip)
716        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
717        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
718
719        // Write broken YAML to the pipeline file
720        std::fs::write(&pipeline_path, "{{{{ invalid yaml !!!!").unwrap();
721
722        // Reload should fail
723        let result = proc.reload_rules();
724        assert!(result.is_err(), "reload with broken pipeline should fail");
725
726        // Old engine should still be active and working
727        assert!(
728            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
729            "old engine should still work after failed reload"
730        );
731
732        std::mem::forget(dir);
733    }
734
735    #[test]
736    fn custom_event_filter() {
737        let proc = make_processor(
738            r#"
739title: Test Rule
740status: test
741logsource:
742    category: test
743detection:
744    selection:
745        EventID: 1
746    condition: selection
747"#,
748        );
749
750        // Filter that extracts a nested "records" array
751        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
752            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
753                records.clone()
754            } else {
755                vec![v.clone()]
756            }
757        };
758
759        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
760        let results = proc.process_batch_lines(&batch, &filter);
761        assert_eq!(results.len(), 1);
762        assert_eq!(
763            results[0].detection_count(),
764            1,
765            "only EventID=1 from records array should match"
766        );
767    }
768
769    #[test]
770    fn empty_batch_returns_empty() {
771        let proc = make_processor(
772            r#"
773title: Test Rule
774status: test
775logsource:
776    category: test
777detection:
778    selection:
779        EventID: 1
780    condition: selection
781"#,
782        );
783
784        let batch: Vec<String> = vec![];
785        let results = proc.process_batch_lines(&batch, &identity_filter);
786        assert!(results.is_empty());
787    }
788
789    /// Verify MetricsHook is called correctly during processing.
790    #[test]
791    fn metrics_hook_invocations() {
792        use std::sync::atomic::{AtomicU64, Ordering};
793
794        struct CountingMetrics {
795            parse_errors: AtomicU64,
796            events_processed: AtomicU64,
797            detection_matches: AtomicU64,
798        }
799
800        impl MetricsHook for CountingMetrics {
801            fn on_parse_error(&self) {
802                self.parse_errors.fetch_add(1, Ordering::Relaxed);
803            }
804            fn on_events_processed(&self, count: u64) {
805                self.events_processed.fetch_add(count, Ordering::Relaxed);
806            }
807            fn on_detection_matches(&self, count: u64) {
808                self.detection_matches.fetch_add(count, Ordering::Relaxed);
809            }
810            fn on_correlation_matches(&self, _: u64) {}
811            fn observe_processing_latency(&self, _: f64) {}
812            fn on_input_queue_depth_change(&self, _: i64) {}
813            fn on_back_pressure(&self) {}
814            fn observe_batch_size(&self, _: u64) {}
815            fn on_output_queue_depth_change(&self, _: i64) {}
816            fn observe_pipeline_latency(&self, _: f64) {}
817            fn set_correlation_state_entries(&self, _: u64) {}
818        }
819
820        let dir = tempfile::tempdir().unwrap();
821        let rule_path = dir.path().join("test.yml");
822        std::fs::write(
823            &rule_path,
824            r#"
825title: Test Rule
826status: test
827logsource:
828    category: test
829detection:
830    selection:
831        EventID: 1
832    condition: selection
833"#,
834        )
835        .unwrap();
836
837        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
838        engine.load_rules().unwrap();
839
840        let metrics = Arc::new(CountingMetrics {
841            parse_errors: AtomicU64::new(0),
842            events_processed: AtomicU64::new(0),
843            detection_matches: AtomicU64::new(0),
844        });
845        let proc = LogProcessor::new(engine, metrics.clone());
846
847        let batch = vec![
848            "not json".to_string(),
849            r#"{"EventID": 1}"#.to_string(),
850            r#"{"EventID": 2}"#.to_string(),
851        ];
852        proc.process_batch_lines(&batch, &identity_filter);
853
854        assert_eq!(metrics.parse_errors.load(Ordering::Relaxed), 1);
855        assert_eq!(metrics.events_processed.load(Ordering::Relaxed), 2);
856        assert_eq!(metrics.detection_matches.load(Ordering::Relaxed), 1);
857
858        std::mem::forget(dir);
859    }
860
861    /// Verify concurrent processing and swap don't panic (basic thread safety).
862    #[test]
863    fn concurrent_swap_and_process() {
864        let dir = tempfile::tempdir().unwrap();
865        let rule_path = dir.path().join("test.yml");
866        std::fs::write(
867            &rule_path,
868            r#"
869title: Rule A
870status: test
871logsource:
872    category: test
873detection:
874    selection:
875        EventID: 1
876    condition: selection
877"#,
878        )
879        .unwrap();
880
881        let mut engine = RuntimeEngine::new(
882            rule_path.clone(),
883            vec![],
884            CorrelationConfig::default(),
885            false,
886        );
887        engine.load_rules().unwrap();
888        let proc = Arc::new(LogProcessor::new(engine, Arc::new(NoopMetrics)));
889
890        let handles: Vec<_> = (0..4)
891            .map(|i| {
892                let proc = proc.clone();
893                let rule_path = rule_path.clone();
894                std::thread::spawn(move || {
895                    let batch = vec![r#"{"EventID": 1}"#.to_string()];
896                    for _ in 0..100 {
897                        let _ = proc.process_batch_lines(&batch, &identity_filter);
898                    }
899                    // Thread 0 does a swap mid-flight
900                    if i == 0 {
901                        let mut new_engine = RuntimeEngine::new(
902                            rule_path,
903                            vec![],
904                            CorrelationConfig::default(),
905                            false,
906                        );
907                        new_engine.load_rules().unwrap();
908                        proc.swap_engine(new_engine);
909                    }
910                })
911            })
912            .collect();
913
914        for h in handles {
915            h.join().unwrap();
916        }
917
918        std::mem::forget(dir);
919    }
920
921    // --- Tests for process_batch_with_format ---
922
923    #[test]
924    fn format_json_matches() {
925        let proc = make_processor(
926            r#"
927title: Test Rule
928status: test
929logsource:
930    category: test
931detection:
932    selection:
933        EventID: 1
934    condition: selection
935"#,
936        );
937
938        let batch = vec![r#"{"EventID": 1}"#.to_string()];
939        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
940        assert_eq!(results.len(), 1);
941        assert!(
942            results[0].detection_count() > 0,
943            "JSON EventID=1 should match"
944        );
945    }
946
947    #[test]
948    fn format_syslog_extracts_fields() {
949        let proc = make_processor(
950            r#"
951title: Syslog Test
952status: test
953logsource:
954    category: test
955detection:
956    selection:
957        hostname: mymachine
958    condition: selection
959"#,
960        );
961
962        let batch = vec!["<34>Oct 11 22:14:15 mymachine su: test message".to_string()];
963        let results = proc.process_batch_with_format(
964            &batch,
965            &InputFormat::Syslog(crate::input::SyslogConfig::default()),
966            None,
967        );
968        assert_eq!(results.len(), 1);
969        assert!(
970            results[0].detection_count() > 0,
971            "syslog hostname=mymachine should match"
972        );
973    }
974
975    #[test]
976    fn format_plain_keyword_match() {
977        let proc = make_processor(
978            r#"
979title: Keyword Test
980status: test
981logsource:
982    category: test
983detection:
984    keywords:
985        - "disk full"
986    condition: keywords
987"#,
988        );
989
990        let batch = vec!["ERROR: disk full on /dev/sda1".to_string()];
991        let results = proc.process_batch_with_format(&batch, &InputFormat::Plain, None);
992        assert_eq!(results.len(), 1);
993        assert!(
994            results[0].detection_count() > 0,
995            "plain keyword 'disk full' should match"
996        );
997    }
998
999    #[test]
1000    fn format_auto_detects_json() {
1001        let proc = make_processor(
1002            r#"
1003title: Test Rule
1004status: test
1005logsource:
1006    category: test
1007detection:
1008    selection:
1009        EventID: 1
1010    condition: selection
1011"#,
1012        );
1013
1014        let batch = vec![r#"{"EventID": 1}"#.to_string()];
1015        let results = proc.process_batch_with_format(&batch, &InputFormat::default(), None);
1016        assert_eq!(results.len(), 1);
1017        assert!(results[0].detection_count() > 0);
1018    }
1019
1020    #[test]
1021    fn format_json_with_event_filter() {
1022        let proc = make_processor(
1023            r#"
1024title: Test Rule
1025status: test
1026logsource:
1027    category: test
1028detection:
1029    selection:
1030        EventID: 1
1031    condition: selection
1032"#,
1033        );
1034
1035        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
1036            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
1037                records.clone()
1038            } else {
1039                vec![v.clone()]
1040            }
1041        };
1042
1043        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
1044        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, Some(&filter));
1045        assert_eq!(results.len(), 1);
1046        assert_eq!(
1047            results[0].detection_count(),
1048            1,
1049            "only EventID=1 from records array should match"
1050        );
1051    }
1052
1053    #[test]
1054    fn format_empty_lines_skipped() {
1055        let proc = make_processor(
1056            r#"
1057title: Test Rule
1058status: test
1059logsource:
1060    category: test
1061detection:
1062    selection:
1063        EventID: 1
1064    condition: selection
1065"#,
1066        );
1067
1068        let batch = vec![
1069            "".to_string(),
1070            "   ".to_string(),
1071            r#"{"EventID": 1}"#.to_string(),
1072        ];
1073        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
1074        assert_eq!(results.len(), 3);
1075        assert!(results[0].detection_count() == 0);
1076        assert!(results[1].detection_count() == 0);
1077        assert!(results[2].detection_count() > 0);
1078    }
1079
1080    #[cfg(feature = "logfmt")]
1081    #[test]
1082    fn format_logfmt_matches() {
1083        let proc = make_processor(
1084            r#"
1085title: Logfmt Test
1086status: test
1087logsource:
1088    category: test
1089detection:
1090    selection:
1091        level: error
1092    condition: selection
1093"#,
1094        );
1095
1096        let batch = vec!["level=error msg=something host=web01".to_string()];
1097        let results = proc.process_batch_with_format(&batch, &InputFormat::Logfmt, None);
1098        assert_eq!(results.len(), 1);
1099        assert!(
1100            results[0].detection_count() > 0,
1101            "logfmt level=error should match"
1102        );
1103    }
1104
1105    #[cfg(feature = "cef")]
1106    #[test]
1107    fn format_cef_matches() {
1108        let proc = make_processor(
1109            r#"
1110title: CEF Test
1111status: test
1112logsource:
1113    category: test
1114detection:
1115    selection:
1116        deviceVendor: Security
1117    condition: selection
1118"#,
1119        );
1120
1121        let batch = vec!["CEF:0|Security|IDS|1.0|100|Attack|9|src=10.0.0.1".to_string()];
1122        let results = proc.process_batch_with_format(&batch, &InputFormat::Cef, None);
1123        assert_eq!(results.len(), 1);
1124        assert!(
1125            results[0].detection_count() > 0,
1126            "CEF deviceVendor=Security should match"
1127        );
1128    }
1129}