// rsigma_runtime/processor.rs

use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Instant;

use arc_swap::ArcSwap;
use rsigma_eval::{Event, JsonEvent, ProcessResult};

use crate::engine::RuntimeEngine;
use crate::input::{EventInputDecoded, InputFormat, parse_line};
use crate::metrics::MetricsHook;
10
11/// Closure that extracts multiple payloads from a single JSON value.
12///
13/// Used by the daemon's event filter (e.g. jq/jsonpath) to explode a JSON
14/// object into sub-events (e.g. `.records[]`). Only applies to JSON input.
15pub type EventFilter = dyn Fn(&serde_json::Value) -> Vec<serde_json::Value>;
16
17/// Thread-safe handle to the engine, swappable atomically for hot-reload.
18///
19/// Uses `ArcSwap<Mutex<RuntimeEngine>>` so that:
20/// - Detection + correlation processing can acquire `&mut RuntimeEngine` via
21///   the inner `Mutex`.
22/// - Hot-reload swaps the entire engine atomically without blocking in-flight
23///   batches (they hold an `Arc` to the old engine until their batch completes).
24pub struct LogProcessor {
25    engine: Arc<ArcSwap<Mutex<RuntimeEngine>>>,
26    metrics: Arc<dyn MetricsHook>,
27}
28
29impl LogProcessor {
30    /// Create a new processor wrapping the given engine and metrics hook.
31    pub fn new(engine: RuntimeEngine, metrics: Arc<dyn MetricsHook>) -> Self {
32        LogProcessor {
33            engine: Arc::new(ArcSwap::from_pointee(Mutex::new(engine))),
34            metrics,
35        }
36    }
37
38    /// Atomically replace the engine with a new one.
39    ///
40    /// In-flight batches continue against the old engine (they hold an `Arc`
41    /// snapshot). New batches see the replacement on their next call to
42    /// `process_batch_lines`.
43    pub fn swap_engine(&self, new_engine: RuntimeEngine) {
44        self.engine.store(Arc::new(Mutex::new(new_engine)));
45    }
46
47    /// Load a snapshot of the current engine for use during reload.
48    ///
49    /// The caller can lock the returned guard to export state, build a new
50    /// engine, import state, and then call `swap_engine`.
51    pub fn engine_snapshot(&self) -> arc_swap::Guard<Arc<Mutex<RuntimeEngine>>> {
52        self.engine.load()
53    }
54
55    /// Process a batch of raw input lines through the engine.
56    ///
57    /// 1. Parses each line as JSON; on error, increments parse error metrics.
58    /// 2. Applies the `event_filter` closure to extract payloads.
59    /// 3. Evaluates all payloads via `RuntimeEngine::process_batch`.
60    /// 4. Merges per-payload results back into per-line results.
61    /// 5. Updates metrics (events processed, latency, match counts).
62    ///
63    /// Returns one `ProcessResult` per input line.
64    pub fn process_batch_lines(
65        &self,
66        batch: &[String],
67        event_filter: &EventFilter,
68    ) -> Vec<ProcessResult> {
69        let engine_guard = self.engine.load();
70        let mut engine = engine_guard.lock().unwrap();
71
72        // Phase 1: Parse JSON and apply event filters, tracking line origin.
73        let mut parsed: Vec<(usize, Vec<serde_json::Value>)> = Vec::with_capacity(batch.len());
74        for (line_idx, line) in batch.iter().enumerate() {
75            match serde_json::from_str::<serde_json::Value>(line) {
76                Ok(value) => {
77                    let payloads = event_filter(&value);
78                    if !payloads.is_empty() {
79                        parsed.push((line_idx, payloads));
80                    }
81                }
82                Err(e) => {
83                    self.metrics.on_parse_error();
84                    tracing::debug!(error = %e, "Invalid JSON on input");
85                }
86            }
87        }
88
89        // Flatten: (line_idx, &Value) for each payload across all lines
90        let mut flat: Vec<(usize, &serde_json::Value)> = Vec::new();
91        for (line_idx, payloads) in &parsed {
92            for payload in payloads {
93                flat.push((*line_idx, payload));
94            }
95        }
96
97        if flat.is_empty() {
98            return empty_results(batch.len());
99        }
100
101        // Phase 2: Batch evaluation — parallel detection + sequential correlation
102        let events: Vec<JsonEvent> = flat.iter().map(|(_, v)| JsonEvent::borrow(v)).collect();
103        let event_refs: Vec<&JsonEvent> = events.iter().collect();
104
105        let start = Instant::now();
106        let batch_results = engine.process_batch(&event_refs);
107        let elapsed = start.elapsed().as_secs_f64();
108        let per_event_latency = elapsed / event_refs.len() as f64;
109
110        // Update correlation state metrics while we still hold the lock
111        let stats = engine.stats();
112        self.metrics
113            .set_correlation_state_entries(stats.state_entries as u64);
114
115        // Phase 3: Merge results per input line and update metrics
116        let mut line_results = empty_results(batch.len());
117
118        for ((line_idx, _), result) in flat.iter().zip(batch_results) {
119            self.metrics.on_events_processed(1);
120            self.metrics.observe_processing_latency(per_event_latency);
121            self.metrics
122                .on_detection_matches(result.detections.len() as u64);
123            self.metrics
124                .on_correlation_matches(result.correlations.len() as u64);
125
126            for det in &result.detections {
127                let level_str = det.level.as_ref().map_or("unknown", |l| l.as_str());
128                self.metrics
129                    .on_detection_match_detail(&det.rule_title, level_str);
130            }
131            for cor in &result.correlations {
132                let level_str = cor.level.as_ref().map_or("unknown", |l| l.as_str());
133                self.metrics.on_correlation_match_detail(
134                    &cor.rule_title,
135                    level_str,
136                    cor.correlation_type.as_str(),
137                );
138            }
139
140            line_results[*line_idx].detections.extend(result.detections);
141            line_results[*line_idx]
142                .correlations
143                .extend(result.correlations);
144        }
145
146        line_results
147    }
148
149    /// Process a batch of raw input lines using the specified input format.
150    ///
151    /// Unlike [`process_batch_lines`](Self::process_batch_lines), this method
152    /// supports all input formats (JSON, syslog, plain, logfmt, CEF). The
153    /// `event_filter` only applies to JSON-decoded events (it extracts multiple
154    /// payloads from one JSON object, e.g. a `records[]` array). Non-JSON
155    /// formats produce exactly one event per line.
156    ///
157    /// Returns one `ProcessResult` per input line.
158    pub fn process_batch_with_format(
159        &self,
160        batch: &[String],
161        format: &InputFormat,
162        event_filter: Option<&EventFilter>,
163    ) -> Vec<ProcessResult> {
164        let engine_guard = self.engine.load();
165        let mut engine = engine_guard.lock().unwrap();
166
167        // Phase 1: Parse each line into decoded events, tracking line origin.
168        // For JSON with an event_filter, one line can produce multiple events.
169        let mut decoded_events: Vec<(usize, EventInputDecoded)> = Vec::with_capacity(batch.len());
170
171        for (line_idx, line) in batch.iter().enumerate() {
172            let Some(decoded) = parse_line(line, format) else {
173                if !line.trim().is_empty() {
174                    self.metrics.on_parse_error();
175                    tracing::debug!("Failed to parse input line");
176                }
177                continue;
178            };
179
180            // For JSON events with an event filter, apply the filter which
181            // may produce multiple payloads (e.g. `.records[]`).
182            if let Some(filter) = event_filter
183                && let EventInputDecoded::Json(ref json_event) = decoded
184            {
185                let json_value = json_event.to_json();
186                let payloads = filter(&json_value);
187                for payload in payloads {
188                    decoded_events
189                        .push((line_idx, EventInputDecoded::Json(JsonEvent::owned(payload))));
190                }
191                continue;
192            }
193
194            decoded_events.push((line_idx, decoded));
195        }
196
197        if decoded_events.is_empty() {
198            return empty_results(batch.len());
199        }
200
201        // Phase 2: Batch evaluation — parallel detection + sequential correlation
202        let event_refs: Vec<&EventInputDecoded> = decoded_events.iter().map(|(_, e)| e).collect();
203
204        let start = Instant::now();
205        let batch_results = engine.process_batch(&event_refs);
206        let elapsed = start.elapsed().as_secs_f64();
207        let per_event_latency = elapsed / event_refs.len() as f64;
208
209        let stats = engine.stats();
210        self.metrics
211            .set_correlation_state_entries(stats.state_entries as u64);
212
213        // Phase 3: Merge results per input line and update metrics
214        let mut line_results = empty_results(batch.len());
215
216        for ((line_idx, _), result) in decoded_events.iter().zip(batch_results) {
217            self.metrics.on_events_processed(1);
218            self.metrics.observe_processing_latency(per_event_latency);
219            self.metrics
220                .on_detection_matches(result.detections.len() as u64);
221            self.metrics
222                .on_correlation_matches(result.correlations.len() as u64);
223
224            for det in &result.detections {
225                let level_str = det.level.as_ref().map_or("unknown", |l| l.as_str());
226                self.metrics
227                    .on_detection_match_detail(&det.rule_title, level_str);
228            }
229            for cor in &result.correlations {
230                let level_str = cor.level.as_ref().map_or("unknown", |l| l.as_str());
231                self.metrics.on_correlation_match_detail(
232                    &cor.rule_title,
233                    level_str,
234                    cor.correlation_type.as_str(),
235                );
236            }
237
238            line_results[*line_idx].detections.extend(result.detections);
239            line_results[*line_idx]
240                .correlations
241                .extend(result.correlations);
242        }
243
244        line_results
245    }
246
247    /// Reload rules without blocking in-flight event processing.
248    ///
249    /// Builds a fresh `RuntimeEngine` with the same configuration as the
250    /// current one, loads rules into it, imports the old engine's correlation
251    /// state, and atomically swaps. In-flight batches that already hold an
252    /// `Arc` to the old engine finish undisturbed.
253    pub fn reload_rules(&self) -> Result<crate::engine::EngineStats, String> {
254        let (old_state, rules_path, pipelines, corr_config, include_event) = {
255            let snapshot = self.engine.load();
256            let old = snapshot.lock().unwrap();
257            (
258                old.export_state(),
259                old.rules_path().to_path_buf(),
260                old.pipelines().to_vec(),
261                old.corr_config().clone(),
262                old.include_event(),
263            )
264        };
265
266        let mut new_engine = RuntimeEngine::new(rules_path, pipelines, corr_config, include_event);
267        let stats = new_engine.load_rules()?;
268
269        if let Some(state) = old_state
270            && !new_engine.import_state(&state)
271        {
272            tracing::warn!(
273                "Incompatible correlation snapshot version during reload, starting fresh"
274            );
275        }
276
277        self.swap_engine(new_engine);
278        Ok(stats)
279    }
280
281    /// Return the rules path from the current engine.
282    pub fn rules_path(&self) -> std::path::PathBuf {
283        let snapshot = self.engine.load();
284        let engine = snapshot.lock().unwrap();
285        engine.rules_path().to_path_buf()
286    }
287
288    /// Return a reference to the metrics hook.
289    pub fn metrics(&self) -> &dyn MetricsHook {
290        &*self.metrics
291    }
292
293    /// Export correlation state from the current engine.
294    pub fn export_state(&self) -> Option<rsigma_eval::CorrelationSnapshot> {
295        let snapshot = self.engine.load();
296        let engine = snapshot.lock().unwrap();
297        engine.export_state()
298    }
299
300    /// Import correlation state into the current engine.
301    pub fn import_state(&self, snapshot: &rsigma_eval::CorrelationSnapshot) -> bool {
302        let guard = self.engine.load();
303        let mut engine = guard.lock().unwrap();
304        engine.import_state(snapshot)
305    }
306
307    /// Return summary statistics about the current engine.
308    pub fn stats(&self) -> crate::engine::EngineStats {
309        let snapshot = self.engine.load();
310        let engine = snapshot.lock().unwrap();
311        engine.stats()
312    }
313}
314
315/// Produce a vec of empty `ProcessResult`, one per input line.
316fn empty_results(count: usize) -> Vec<ProcessResult> {
317    (0..count)
318        .map(|_| ProcessResult {
319            detections: vec![],
320            correlations: vec![],
321        })
322        .collect()
323}
324
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::NoopMetrics;
    use rsigma_eval::CorrelationConfig;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Filter that passes each JSON value through unchanged.
    fn identity_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
        vec![v.clone()]
    }

    /// Filter that explodes a top-level `records` array into one payload per
    /// element, and passes any other value through unchanged.
    fn records_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
        match v.get("records").and_then(|r| r.as_array()) {
            Some(records) => records.clone(),
            None => vec![v.clone()],
        }
    }

    /// YAML for a single detection rule matching `EventID: <event_id>`.
    fn event_id_rule(title: &str, event_id: u32) -> String {
        format!(
            "
title: {title}
status: test
logsource:
    category: test
detection:
    selection:
        EventID: {event_id}
    condition: selection
"
        )
    }

    /// Write `rules_yaml` to `test.yml` inside a fresh temp dir.
    ///
    /// Keep the returned `TempDir` alive for as long as the path is used; it
    /// removes the directory when dropped.
    fn write_rule_file(rules_yaml: &str) -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        std::fs::write(&rule_path, rules_yaml).unwrap();
        (dir, rule_path)
    }

    /// Build an engine with default settings and load the rules at `rule_path`.
    fn load_engine(rule_path: &Path) -> RuntimeEngine {
        let mut engine = RuntimeEngine::new(
            rule_path.to_path_buf(),
            vec![],
            CorrelationConfig::default(),
            false,
        );
        engine.load_rules().unwrap();
        engine
    }

    fn make_processor_with_metrics(
        rules_yaml: &str,
        metrics: Arc<dyn MetricsHook>,
    ) -> LogProcessor {
        let (dir, rule_path) = write_rule_file(rules_yaml);
        let engine = load_engine(&rule_path);
        // Leak the tempdir so the rules path stays valid for the processor's lifetime.
        std::mem::forget(dir);
        LogProcessor::new(engine, metrics)
    }

    fn make_processor(rules_yaml: &str) -> LogProcessor {
        make_processor_with_metrics(rules_yaml, Arc::new(NoopMetrics))
    }

    /// Metrics hook that counts parse errors, processed events and detection matches.
    #[derive(Default)]
    struct CountingMetrics {
        parse_errors: AtomicU64,
        events_processed: AtomicU64,
        detection_matches: AtomicU64,
    }

    impl MetricsHook for CountingMetrics {
        fn on_parse_error(&self) {
            self.parse_errors.fetch_add(1, Ordering::Relaxed);
        }
        fn on_events_processed(&self, count: u64) {
            self.events_processed.fetch_add(count, Ordering::Relaxed);
        }
        fn on_detection_matches(&self, count: u64) {
            self.detection_matches.fetch_add(count, Ordering::Relaxed);
        }
        fn on_correlation_matches(&self, _: u64) {}
        fn observe_processing_latency(&self, _: f64) {}
        fn on_input_queue_depth_change(&self, _: i64) {}
        fn on_back_pressure(&self) {}
        fn observe_batch_size(&self, _: u64) {}
        fn on_output_queue_depth_change(&self, _: i64) {}
        fn observe_pipeline_latency(&self, _: f64) {}
        fn set_correlation_state_entries(&self, _: u64) {}
    }

    #[test]
    fn process_batch_lines_valid_json() {
        let proc = make_processor(&event_id_rule("Test Rule", 1));

        let batch = vec![
            r#"{"EventID": 1}"#.to_string(),
            r#"{"EventID": 2}"#.to_string(),
        ];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert_eq!(results.len(), 2);
        assert!(!results[0].detections.is_empty(), "EventID=1 should match");
        assert!(
            results[1].detections.is_empty(),
            "EventID=2 should not match"
        );
    }

    #[test]
    fn process_batch_lines_invalid_json() {
        let proc = make_processor(&event_id_rule("Test Rule", 1));

        let batch = vec!["not json".to_string(), r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert_eq!(results.len(), 2);
        assert!(
            results[0].detections.is_empty(),
            "invalid JSON produces empty result"
        );
        assert!(
            !results[1].detections.is_empty(),
            "valid line still matches"
        );
    }

    #[test]
    fn swap_engine_replaces_rules() {
        let (_dir, rule_path) = write_rule_file(&event_id_rule("Rule A", 1));
        let proc = LogProcessor::new(load_engine(&rule_path), Arc::new(NoopMetrics));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        assert!(
            !proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );

        // Swap to a rule that matches EventID: 99
        std::fs::write(&rule_path, event_id_rule("Rule B", 99)).unwrap();
        proc.swap_engine(load_engine(&rule_path));

        assert!(
            proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );

        let batch2 = vec![r#"{"EventID": 99}"#.to_string()];
        assert!(
            !proc.process_batch_lines(&batch2, &identity_filter)[0]
                .detections
                .is_empty()
        );
    }

    #[test]
    fn reload_rules_preserves_engine() {
        let (_dir, rule_path) = write_rule_file(&event_id_rule("Rule A", 1));
        let proc = LogProcessor::new(load_engine(&rule_path), Arc::new(NoopMetrics));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        assert!(
            !proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );

        // Update the rule file and reload
        std::fs::write(&rule_path, event_id_rule("Rule B", 42)).unwrap();

        let stats = proc.reload_rules().unwrap();
        assert_eq!(stats.detection_rules, 1);

        // Old rule should no longer match
        assert!(
            proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );
        // New rule should match
        let batch2 = vec![r#"{"EventID": 42}"#.to_string()];
        assert!(
            !proc.process_batch_lines(&batch2, &identity_filter)[0]
                .detections
                .is_empty()
        );
    }

    #[test]
    fn stats_reports_loaded_rules() {
        let proc = make_processor(&event_id_rule("Test Rule", 1));
        assert_eq!(proc.stats().detection_rules, 1);
    }

    #[test]
    fn custom_event_filter() {
        let proc = make_processor(&event_id_rule("Test Rule", 1));

        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
        let results = proc.process_batch_lines(&batch, &records_filter);
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].detections.len(),
            1,
            "only EventID=1 from records array should match"
        );
    }

    #[test]
    fn filter_dropping_all_payloads_yields_empty_results() {
        let proc = make_processor(&event_id_rule("Test Rule", 1));
        let drop_all = |_: &serde_json::Value| -> Vec<serde_json::Value> { Vec::new() };
        let batch = vec![r#"{"EventID": 1}"#.to_string()];

        let results = proc.process_batch_lines(&batch, &drop_all);
        assert_eq!(results.len(), 1);
        assert!(results[0].detections.is_empty());

        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, Some(&drop_all));
        assert_eq!(results.len(), 1);
        assert!(results[0].detections.is_empty());
    }

    #[test]
    fn empty_batch_returns_empty() {
        let proc = make_processor(&event_id_rule("Test Rule", 1));

        let batch: Vec<String> = vec![];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert!(results.is_empty());
    }

    /// Verify MetricsHook is called correctly during processing.
    #[test]
    fn metrics_hook_invocations() {
        let metrics = Arc::new(CountingMetrics::default());
        let proc = make_processor_with_metrics(&event_id_rule("Test Rule", 1), metrics.clone());

        let batch = vec![
            "not json".to_string(),
            r#"{"EventID": 1}"#.to_string(),
            r#"{"EventID": 2}"#.to_string(),
        ];
        proc.process_batch_lines(&batch, &identity_filter);

        assert_eq!(metrics.parse_errors.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.events_processed.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.detection_matches.load(Ordering::Relaxed), 1);
    }

    /// Verify concurrent processing and swap don't panic (basic thread safety).
    #[test]
    fn concurrent_swap_and_process() {
        let (_dir, rule_path) = write_rule_file(&event_id_rule("Rule A", 1));
        let proc = Arc::new(LogProcessor::new(
            load_engine(&rule_path),
            Arc::new(NoopMetrics),
        ));

        let handles: Vec<_> = (0..4)
            .map(|i| {
                let proc = Arc::clone(&proc);
                let rule_path = rule_path.clone();
                std::thread::spawn(move || {
                    let batch = vec![r#"{"EventID": 1}"#.to_string()];
                    for _ in 0..100 {
                        let _ = proc.process_batch_lines(&batch, &identity_filter);
                    }
                    // Thread 0 does a swap mid-flight
                    if i == 0 {
                        proc.swap_engine(load_engine(&rule_path));
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }
    }

    // --- Tests for process_batch_with_format ---

    #[test]
    fn format_json_matches() {
        let proc = make_processor(&event_id_rule("Test Rule", 1));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "JSON EventID=1 should match"
        );
    }

    #[test]
    fn format_syslog_extracts_fields() {
        let proc = make_processor(
            r#"
title: Syslog Test
status: test
logsource:
    category: test
detection:
    selection:
        hostname: mymachine
    condition: selection
"#,
        );

        let batch = vec!["<34>Oct 11 22:14:15 mymachine su: test message".to_string()];
        let results = proc.process_batch_with_format(
            &batch,
            &InputFormat::Syslog(crate::input::SyslogConfig::default()),
            None,
        );
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "syslog hostname=mymachine should match"
        );
    }

    #[test]
    fn format_plain_keyword_match() {
        let proc = make_processor(
            r#"
title: Keyword Test
status: test
logsource:
    category: test
detection:
    keywords:
        - "disk full"
    condition: keywords
"#,
        );

        let batch = vec!["ERROR: disk full on /dev/sda1".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Plain, None);
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "plain keyword 'disk full' should match"
        );
    }

    #[test]
    fn format_auto_detects_json() {
        let proc = make_processor(&event_id_rule("Test Rule", 1));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::default(), None);
        assert_eq!(results.len(), 1);
        assert!(!results[0].detections.is_empty());
    }

    #[test]
    fn format_json_with_event_filter() {
        let proc = make_processor(&event_id_rule("Test Rule", 1));

        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
        let results =
            proc.process_batch_with_format(&batch, &InputFormat::Json, Some(&records_filter));
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].detections.len(),
            1,
            "only EventID=1 from records array should match"
        );
    }

    #[test]
    fn format_empty_lines_skipped() {
        let proc = make_processor(&event_id_rule("Test Rule", 1));

        let batch = vec![
            "".to_string(),
            "   ".to_string(),
            r#"{"EventID": 1}"#.to_string(),
        ];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
        assert_eq!(results.len(), 3);
        assert!(results[0].detections.is_empty());
        assert!(results[1].detections.is_empty());
        assert!(!results[2].detections.is_empty());
    }

    #[cfg(feature = "logfmt")]
    #[test]
    fn format_logfmt_matches() {
        let proc = make_processor(
            r#"
title: Logfmt Test
status: test
logsource:
    category: test
detection:
    selection:
        level: error
    condition: selection
"#,
        );

        let batch = vec!["level=error msg=something host=web01".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Logfmt, None);
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "logfmt level=error should match"
        );
    }

    #[cfg(feature = "cef")]
    #[test]
    fn format_cef_matches() {
        let proc = make_processor(
            r#"
title: CEF Test
status: test
logsource:
    category: test
detection:
    selection:
        deviceVendor: Security
    condition: selection
"#,
        );

        let batch = vec!["CEF:0|Security|IDS|1.0|100|Attack|9|src=10.0.0.1".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Cef, None);
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "CEF deviceVendor=Security should match"
        );
    }
}