// rsigma_runtime/processor.rs

1use std::sync::{Arc, Mutex};
2use std::time::Instant;
3
4use arc_swap::ArcSwap;
5use rsigma_eval::{Event, JsonEvent, ProcessResult};
6
7use crate::engine::RuntimeEngine;
8use crate::input::{EventInputDecoded, InputFormat, parse_line};
9use crate::metrics::MetricsHook;
10
/// Closure that extracts multiple payloads from a single JSON value.
///
/// Used by the daemon's event filter (e.g. jq/jsonpath) to explode a JSON
/// object into sub-events (e.g. `.records[]`). Only applies to JSON input.
///
/// Every value the closure returns is evaluated as its own event, and the
/// matches of all of them are merged into the result of the input line they
/// came from. Returning an empty vector means the line contributes no events.
pub type EventFilter = dyn Fn(&serde_json::Value) -> Vec<serde_json::Value>;
16
/// Thread-safe handle to the engine, swappable atomically for hot-reload.
///
/// Uses `ArcSwap<Mutex<RuntimeEngine>>` so that:
/// - Detection + correlation processing can acquire `&mut RuntimeEngine` via
///   the inner `Mutex`.
/// - Hot-reload replaces the whole engine with a single `ArcSwap` store
///   instead of mutating the engine that batches are currently locking.
pub struct LogProcessor {
    /// Current engine; the outer `ArcSwap` is what `swap_engine` replaces.
    engine: Arc<ArcSwap<Mutex<RuntimeEngine>>>,
    /// Sink for parse-error, throughput, latency and match-count metrics.
    metrics: Arc<dyn MetricsHook>,
}
28
29impl LogProcessor {
30    /// Create a new processor wrapping the given engine and metrics hook.
31    pub fn new(engine: RuntimeEngine, metrics: Arc<dyn MetricsHook>) -> Self {
32        LogProcessor {
33            engine: Arc::new(ArcSwap::from_pointee(Mutex::new(engine))),
34            metrics,
35        }
36    }
37
38    /// Atomically replace the engine with a new one.
39    ///
40    /// In-flight batches continue against the old engine (they hold an `Arc`
41    /// snapshot). New batches see the replacement on their next call to
42    /// `process_batch_lines`.
43    pub fn swap_engine(&self, new_engine: RuntimeEngine) {
44        self.engine.store(Arc::new(Mutex::new(new_engine)));
45    }
46
47    /// Load a snapshot of the current engine for use during reload.
48    ///
49    /// The caller can lock the returned guard to export state, build a new
50    /// engine, import state, and then call `swap_engine`.
51    pub fn engine_snapshot(&self) -> arc_swap::Guard<Arc<Mutex<RuntimeEngine>>> {
52        self.engine.load()
53    }
54
55    /// Process a batch of raw input lines through the engine.
56    ///
57    /// 1. Parses each line as JSON; on error, increments parse error metrics.
58    /// 2. Applies the `event_filter` closure to extract payloads.
59    /// 3. Evaluates all payloads via `RuntimeEngine::process_batch`.
60    /// 4. Merges per-payload results back into per-line results.
61    /// 5. Updates metrics (events processed, latency, match counts).
62    ///
63    /// Returns one `ProcessResult` per input line.
64    pub fn process_batch_lines(
65        &self,
66        batch: &[String],
67        event_filter: &EventFilter,
68    ) -> Vec<ProcessResult> {
69        let engine_guard = self.engine.load();
70        let mut engine = engine_guard.lock().unwrap();
71
72        // Phase 1: Parse JSON and apply event filters, tracking line origin.
73        let mut parsed: Vec<(usize, Vec<serde_json::Value>)> = Vec::with_capacity(batch.len());
74        for (line_idx, line) in batch.iter().enumerate() {
75            match serde_json::from_str::<serde_json::Value>(line) {
76                Ok(value) => {
77                    let payloads = event_filter(&value);
78                    if !payloads.is_empty() {
79                        parsed.push((line_idx, payloads));
80                    }
81                }
82                Err(e) => {
83                    self.metrics.on_parse_error();
84                    tracing::debug!(error = %e, "Invalid JSON on input");
85                }
86            }
87        }
88
89        // Flatten: (line_idx, &Value) for each payload across all lines
90        let mut flat: Vec<(usize, &serde_json::Value)> = Vec::new();
91        for (line_idx, payloads) in &parsed {
92            for payload in payloads {
93                flat.push((*line_idx, payload));
94            }
95        }
96
97        if flat.is_empty() {
98            return empty_results(batch.len());
99        }
100
101        // Phase 2: Batch evaluation — parallel detection + sequential correlation
102        let events: Vec<JsonEvent> = flat.iter().map(|(_, v)| JsonEvent::borrow(v)).collect();
103        let event_refs: Vec<&JsonEvent> = events.iter().collect();
104
105        let start = Instant::now();
106        let batch_results = engine.process_batch(&event_refs);
107        let elapsed = start.elapsed().as_secs_f64();
108        let per_event_latency = elapsed / event_refs.len() as f64;
109
110        // Update correlation state metrics while we still hold the lock
111        let stats = engine.stats();
112        self.metrics
113            .set_correlation_state_entries(stats.state_entries as u64);
114
115        // Phase 3: Merge results per input line and update metrics
116        let mut line_results = empty_results(batch.len());
117
118        for ((line_idx, _), result) in flat.iter().zip(batch_results) {
119            self.metrics.on_events_processed(1);
120            self.metrics.observe_processing_latency(per_event_latency);
121            self.metrics
122                .on_detection_matches(result.detections.len() as u64);
123            self.metrics
124                .on_correlation_matches(result.correlations.len() as u64);
125
126            line_results[*line_idx].detections.extend(result.detections);
127            line_results[*line_idx]
128                .correlations
129                .extend(result.correlations);
130        }
131
132        line_results
133    }
134
135    /// Process a batch of raw input lines using the specified input format.
136    ///
137    /// Unlike [`process_batch_lines`](Self::process_batch_lines), this method
138    /// supports all input formats (JSON, syslog, plain, logfmt, CEF). The
139    /// `event_filter` only applies to JSON-decoded events (it extracts multiple
140    /// payloads from one JSON object, e.g. a `records[]` array). Non-JSON
141    /// formats produce exactly one event per line.
142    ///
143    /// Returns one `ProcessResult` per input line.
144    pub fn process_batch_with_format(
145        &self,
146        batch: &[String],
147        format: &InputFormat,
148        event_filter: Option<&EventFilter>,
149    ) -> Vec<ProcessResult> {
150        let engine_guard = self.engine.load();
151        let mut engine = engine_guard.lock().unwrap();
152
153        // Phase 1: Parse each line into decoded events, tracking line origin.
154        // For JSON with an event_filter, one line can produce multiple events.
155        let mut decoded_events: Vec<(usize, EventInputDecoded)> = Vec::with_capacity(batch.len());
156
157        for (line_idx, line) in batch.iter().enumerate() {
158            let Some(decoded) = parse_line(line, format) else {
159                if !line.trim().is_empty() {
160                    self.metrics.on_parse_error();
161                    tracing::debug!("Failed to parse input line");
162                }
163                continue;
164            };
165
166            // For JSON events with an event filter, apply the filter which
167            // may produce multiple payloads (e.g. `.records[]`).
168            if let Some(filter) = event_filter
169                && let EventInputDecoded::Json(ref json_event) = decoded
170            {
171                let json_value = json_event.to_json();
172                let payloads = filter(&json_value);
173                for payload in payloads {
174                    decoded_events
175                        .push((line_idx, EventInputDecoded::Json(JsonEvent::owned(payload))));
176                }
177                continue;
178            }
179
180            decoded_events.push((line_idx, decoded));
181        }
182
183        if decoded_events.is_empty() {
184            return empty_results(batch.len());
185        }
186
187        // Phase 2: Batch evaluation — parallel detection + sequential correlation
188        let event_refs: Vec<&EventInputDecoded> = decoded_events.iter().map(|(_, e)| e).collect();
189
190        let start = Instant::now();
191        let batch_results = engine.process_batch(&event_refs);
192        let elapsed = start.elapsed().as_secs_f64();
193        let per_event_latency = elapsed / event_refs.len() as f64;
194
195        let stats = engine.stats();
196        self.metrics
197            .set_correlation_state_entries(stats.state_entries as u64);
198
199        // Phase 3: Merge results per input line and update metrics
200        let mut line_results = empty_results(batch.len());
201
202        for ((line_idx, _), result) in decoded_events.iter().zip(batch_results) {
203            self.metrics.on_events_processed(1);
204            self.metrics.observe_processing_latency(per_event_latency);
205            self.metrics
206                .on_detection_matches(result.detections.len() as u64);
207            self.metrics
208                .on_correlation_matches(result.correlations.len() as u64);
209
210            line_results[*line_idx].detections.extend(result.detections);
211            line_results[*line_idx]
212                .correlations
213                .extend(result.correlations);
214        }
215
216        line_results
217    }
218
219    /// Reload rules without blocking in-flight event processing.
220    ///
221    /// Builds a fresh `RuntimeEngine` with the same configuration as the
222    /// current one, loads rules into it, imports the old engine's correlation
223    /// state, and atomically swaps. In-flight batches that already hold an
224    /// `Arc` to the old engine finish undisturbed.
225    pub fn reload_rules(&self) -> Result<crate::engine::EngineStats, String> {
226        let (old_state, rules_path, pipelines, corr_config, include_event) = {
227            let snapshot = self.engine.load();
228            let old = snapshot.lock().unwrap();
229            (
230                old.export_state(),
231                old.rules_path().to_path_buf(),
232                old.pipelines().to_vec(),
233                old.corr_config().clone(),
234                old.include_event(),
235            )
236        };
237
238        let mut new_engine = RuntimeEngine::new(rules_path, pipelines, corr_config, include_event);
239        let stats = new_engine.load_rules()?;
240
241        if let Some(state) = old_state
242            && !new_engine.import_state(&state)
243        {
244            tracing::warn!(
245                "Incompatible correlation snapshot version during reload, starting fresh"
246            );
247        }
248
249        self.swap_engine(new_engine);
250        Ok(stats)
251    }
252
253    /// Return the rules path from the current engine.
254    pub fn rules_path(&self) -> std::path::PathBuf {
255        let snapshot = self.engine.load();
256        let engine = snapshot.lock().unwrap();
257        engine.rules_path().to_path_buf()
258    }
259
260    /// Return a reference to the metrics hook.
261    pub fn metrics(&self) -> &dyn MetricsHook {
262        &*self.metrics
263    }
264
265    /// Export correlation state from the current engine.
266    pub fn export_state(&self) -> Option<rsigma_eval::CorrelationSnapshot> {
267        let snapshot = self.engine.load();
268        let engine = snapshot.lock().unwrap();
269        engine.export_state()
270    }
271
272    /// Import correlation state into the current engine.
273    pub fn import_state(&self, snapshot: &rsigma_eval::CorrelationSnapshot) -> bool {
274        let guard = self.engine.load();
275        let mut engine = guard.lock().unwrap();
276        engine.import_state(snapshot)
277    }
278
279    /// Return summary statistics about the current engine.
280    pub fn stats(&self) -> crate::engine::EngineStats {
281        let snapshot = self.engine.load();
282        let engine = snapshot.lock().unwrap();
283        engine.stats()
284    }
285}
286
287/// Produce a vec of empty `ProcessResult`, one per input line.
288fn empty_results(count: usize) -> Vec<ProcessResult> {
289    (0..count)
290        .map(|_| ProcessResult {
291            detections: vec![],
292            correlations: vec![],
293        })
294        .collect()
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::NoopMetrics;
    use rsigma_eval::CorrelationConfig;
    use std::path::{Path, PathBuf};

    /// A processor together with the on-disk rule file that backs it.
    struct Fixture {
        proc: LogProcessor,
        rule_path: PathBuf,
        // Keeps the temp directory alive for the duration of the test and
        // removes it on drop instead of leaking it on disk.
        _dir: tempfile::TempDir,
    }

    /// Render a minimal rule whose `selection` block contains `selection`
    /// (for example `"EventID: 1"`).
    fn selection_rule(title: &str, selection: &str) -> String {
        format!(
            r#"
title: {title}
status: test
logsource:
    category: test
detection:
    selection:
        {selection}
    condition: selection
"#
        )
    }

    /// Filter that passes the event through unchanged.
    fn identity_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
        vec![v.clone()]
    }

    /// Filter that explodes a nested `records` array, falling back to the
    /// event itself when there is none.
    fn records_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
        match v.get("records").and_then(|r| r.as_array()) {
            Some(records) => records.clone(),
            None => vec![v.clone()],
        }
    }

    /// Build an engine for `rule_path` with default settings and load its rules.
    fn load_engine(rule_path: &Path) -> RuntimeEngine {
        let mut engine = RuntimeEngine::new(
            rule_path.to_path_buf(),
            vec![],
            CorrelationConfig::default(),
            false,
        );
        engine.load_rules().unwrap();
        engine
    }

    /// Write `rules_yaml` into a fresh temp dir and wrap a loaded engine.
    fn fixture(rules_yaml: &str, metrics: Arc<dyn MetricsHook>) -> Fixture {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        std::fs::write(&rule_path, rules_yaml).unwrap();
        let proc = LogProcessor::new(load_engine(&rule_path), metrics);
        Fixture {
            proc,
            rule_path,
            _dir: dir,
        }
    }

    fn make_processor(rules_yaml: &str) -> Fixture {
        fixture(rules_yaml, Arc::new(NoopMetrics))
    }

    /// Fixture for the rule most tests use: `EventID: 1`.
    fn event_id_1_processor() -> Fixture {
        make_processor(&selection_rule("Test Rule", "EventID: 1"))
    }

    #[test]
    fn process_batch_lines_valid_json() {
        let fx = event_id_1_processor();

        let batch = vec![
            r#"{"EventID": 1}"#.to_string(),
            r#"{"EventID": 2}"#.to_string(),
        ];
        let results = fx.proc.process_batch_lines(&batch, &identity_filter);
        assert_eq!(results.len(), 2);
        assert!(!results[0].detections.is_empty(), "EventID=1 should match");
        assert!(
            results[1].detections.is_empty(),
            "EventID=2 should not match"
        );
    }

    #[test]
    fn process_batch_lines_invalid_json() {
        let fx = event_id_1_processor();

        let batch = vec!["not json".to_string(), r#"{"EventID": 1}"#.to_string()];
        let results = fx.proc.process_batch_lines(&batch, &identity_filter);
        assert_eq!(results.len(), 2);
        assert!(
            results[0].detections.is_empty(),
            "invalid JSON produces empty result"
        );
        assert!(
            !results[1].detections.is_empty(),
            "valid line still matches"
        );
    }

    #[test]
    fn swap_engine_replaces_rules() {
        let fx = make_processor(&selection_rule("Rule A", "EventID: 1"));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        assert!(
            !fx.proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );

        // Swap to a rule that matches EventID: 99
        std::fs::write(&fx.rule_path, selection_rule("Rule B", "EventID: 99")).unwrap();
        fx.proc.swap_engine(load_engine(&fx.rule_path));

        assert!(
            fx.proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );

        let batch2 = vec![r#"{"EventID": 99}"#.to_string()];
        assert!(
            !fx.proc.process_batch_lines(&batch2, &identity_filter)[0]
                .detections
                .is_empty()
        );
    }

    #[test]
    fn reload_rules_preserves_engine() {
        let fx = make_processor(&selection_rule("Rule A", "EventID: 1"));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        assert!(
            !fx.proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );

        // Update the rule file and reload
        std::fs::write(&fx.rule_path, selection_rule("Rule B", "EventID: 42")).unwrap();

        let stats = fx.proc.reload_rules().unwrap();
        assert_eq!(stats.detection_rules, 1);

        // Old rule should no longer match
        assert!(
            fx.proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );
        // New rule should match
        let batch2 = vec![r#"{"EventID": 42}"#.to_string()];
        assert!(
            !fx.proc.process_batch_lines(&batch2, &identity_filter)[0]
                .detections
                .is_empty()
        );
    }

    #[test]
    fn custom_event_filter() {
        let fx = event_id_1_processor();

        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
        let results = fx.proc.process_batch_lines(&batch, &records_filter);
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].detections.len(),
            1,
            "only EventID=1 from records array should match"
        );
    }

    #[test]
    fn empty_batch_returns_empty() {
        let fx = event_id_1_processor();

        let batch: Vec<String> = vec![];
        let results = fx.proc.process_batch_lines(&batch, &identity_filter);
        assert!(results.is_empty());
    }

    /// Verify MetricsHook is called correctly during processing.
    #[test]
    fn metrics_hook_invocations() {
        use std::sync::atomic::{AtomicU64, Ordering};

        struct CountingMetrics {
            parse_errors: AtomicU64,
            events_processed: AtomicU64,
            detection_matches: AtomicU64,
        }

        impl MetricsHook for CountingMetrics {
            fn on_parse_error(&self) {
                self.parse_errors.fetch_add(1, Ordering::Relaxed);
            }
            fn on_events_processed(&self, count: u64) {
                self.events_processed.fetch_add(count, Ordering::Relaxed);
            }
            fn on_detection_matches(&self, count: u64) {
                self.detection_matches.fetch_add(count, Ordering::Relaxed);
            }
            fn on_correlation_matches(&self, _: u64) {}
            fn observe_processing_latency(&self, _: f64) {}
            fn on_input_queue_depth_change(&self, _: i64) {}
            fn on_back_pressure(&self) {}
            fn observe_batch_size(&self, _: u64) {}
            fn on_output_queue_depth_change(&self, _: i64) {}
            fn observe_pipeline_latency(&self, _: f64) {}
            fn set_correlation_state_entries(&self, _: u64) {}
        }

        let metrics = Arc::new(CountingMetrics {
            parse_errors: AtomicU64::new(0),
            events_processed: AtomicU64::new(0),
            detection_matches: AtomicU64::new(0),
        });
        let fx = fixture(&selection_rule("Test Rule", "EventID: 1"), metrics.clone());

        let batch = vec![
            "not json".to_string(),
            r#"{"EventID": 1}"#.to_string(),
            r#"{"EventID": 2}"#.to_string(),
        ];
        fx.proc.process_batch_lines(&batch, &identity_filter);

        assert_eq!(metrics.parse_errors.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.events_processed.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.detection_matches.load(Ordering::Relaxed), 1);
    }

    /// Verify concurrent processing and swap don't panic (basic thread safety).
    #[test]
    fn concurrent_swap_and_process() {
        // Bind every field by name so the temp dir guard lives until the end
        // of the test (a `..` pattern would drop it immediately).
        let Fixture {
            proc,
            rule_path,
            _dir,
        } = make_processor(&selection_rule("Rule A", "EventID: 1"));
        let proc = Arc::new(proc);

        let handles: Vec<_> = (0..4)
            .map(|i| {
                let proc = proc.clone();
                let rule_path = rule_path.clone();
                std::thread::spawn(move || {
                    let batch = vec![r#"{"EventID": 1}"#.to_string()];
                    for _ in 0..100 {
                        let _ = proc.process_batch_lines(&batch, &identity_filter);
                    }
                    // Thread 0 swaps the engine while the others may still
                    // be processing.
                    if i == 0 {
                        proc.swap_engine(load_engine(&rule_path));
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }
    }

    // --- Tests for process_batch_with_format ---

    #[test]
    fn format_json_matches() {
        let fx = event_id_1_processor();

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        let results = fx
            .proc
            .process_batch_with_format(&batch, &InputFormat::Json, None);
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "JSON EventID=1 should match"
        );
    }

    #[test]
    fn format_syslog_extracts_fields() {
        let fx = make_processor(&selection_rule("Syslog Test", "hostname: mymachine"));

        let batch = vec!["<34>Oct 11 22:14:15 mymachine su: test message".to_string()];
        let results = fx.proc.process_batch_with_format(
            &batch,
            &InputFormat::Syslog(crate::input::SyslogConfig::default()),
            None,
        );
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "syslog hostname=mymachine should match"
        );
    }

    #[test]
    fn format_plain_keyword_match() {
        let fx = make_processor(
            r#"
title: Keyword Test
status: test
logsource:
    category: test
detection:
    keywords:
        - "disk full"
    condition: keywords
"#,
        );

        let batch = vec!["ERROR: disk full on /dev/sda1".to_string()];
        let results = fx
            .proc
            .process_batch_with_format(&batch, &InputFormat::Plain, None);
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "plain keyword 'disk full' should match"
        );
    }

    #[test]
    fn format_auto_detects_json() {
        let fx = event_id_1_processor();

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        let results = fx
            .proc
            .process_batch_with_format(&batch, &InputFormat::default(), None);
        assert_eq!(results.len(), 1);
        assert!(!results[0].detections.is_empty());
    }

    #[test]
    fn format_json_with_event_filter() {
        let fx = event_id_1_processor();

        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
        let results =
            fx.proc
                .process_batch_with_format(&batch, &InputFormat::Json, Some(&records_filter));
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].detections.len(),
            1,
            "only EventID=1 from records array should match"
        );
    }

    #[test]
    fn format_empty_lines_skipped() {
        let fx = event_id_1_processor();

        let batch = vec![
            "".to_string(),
            "   ".to_string(),
            r#"{"EventID": 1}"#.to_string(),
        ];
        let results = fx
            .proc
            .process_batch_with_format(&batch, &InputFormat::Json, None);
        assert_eq!(results.len(), 3);
        assert!(results[0].detections.is_empty());
        assert!(results[1].detections.is_empty());
        assert!(!results[2].detections.is_empty());
    }

    #[cfg(feature = "logfmt")]
    #[test]
    fn format_logfmt_matches() {
        let fx = make_processor(&selection_rule("Logfmt Test", "level: error"));

        let batch = vec!["level=error msg=something host=web01".to_string()];
        let results = fx
            .proc
            .process_batch_with_format(&batch, &InputFormat::Logfmt, None);
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "logfmt level=error should match"
        );
    }

    #[cfg(feature = "cef")]
    #[test]
    fn format_cef_matches() {
        let fx = make_processor(&selection_rule("CEF Test", "deviceVendor: Security"));

        let batch = vec!["CEF:0|Security|IDS|1.0|100|Attack|9|src=10.0.0.1".to_string()];
        let results = fx
            .proc
            .process_batch_with_format(&batch, &InputFormat::Cef, None);
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "CEF deviceVendor=Security should match"
        );
    }
}