1use std::sync::Arc;
2
3use parking_lot::Mutex;
4use std::time::Instant;
5
6use arc_swap::ArcSwap;
7use rsigma_eval::{Event, JsonEvent, ProcessResult};
8
9use crate::engine::RuntimeEngine;
10use crate::input::{EventInputDecoded, InputFormat, parse_line};
11use crate::metrics::MetricsHook;
12
/// Callback that expands one parsed JSON input value into the event payloads to evaluate.
///
/// Returning an empty `Vec` drops the input line without evaluating anything. Returning
/// several values (for example the elements of a `records` array) evaluates each one as a
/// separate event whose results are attributed to the same input line.
pub type EventFilter = dyn Fn(&serde_json::Value) -> Vec<serde_json::Value>;
18
/// Evaluates batches of raw input lines against a replaceable [`RuntimeEngine`] and
/// reports parse errors, throughput, latency and matches through a [`MetricsHook`].
pub struct LogProcessor {
    /// The active engine.
    ///
    /// `ArcSwap` allows the engine to be replaced at runtime (`swap_engine`,
    /// `reload_rules`). The inner `Mutex` serialises access because evaluation and
    /// state import need the engine mutably.
    engine: Arc<ArcSwap<Mutex<RuntimeEngine>>>,
    /// Sink for processing metrics.
    metrics: Arc<dyn MetricsHook>,
}
30
31impl LogProcessor {
32 pub fn new(engine: RuntimeEngine, metrics: Arc<dyn MetricsHook>) -> Self {
34 LogProcessor {
35 engine: Arc::new(ArcSwap::from_pointee(Mutex::new(engine))),
36 metrics,
37 }
38 }
39
40 pub fn swap_engine(&self, new_engine: RuntimeEngine) {
46 self.engine.store(Arc::new(Mutex::new(new_engine)));
47 }
48
49 pub fn engine_snapshot(&self) -> arc_swap::Guard<Arc<Mutex<RuntimeEngine>>> {
54 self.engine.load()
55 }
56
57 pub fn process_batch_lines(
67 &self,
68 batch: &[String],
69 event_filter: &EventFilter,
70 ) -> Vec<ProcessResult> {
71 let engine_guard = self.engine.load();
72 let mut engine = engine_guard.lock();
73
74 let mut parsed: Vec<(usize, Vec<serde_json::Value>)> = Vec::with_capacity(batch.len());
76 for (line_idx, line) in batch.iter().enumerate() {
77 match serde_json::from_str::<serde_json::Value>(line) {
78 Ok(value) => {
79 let payloads = event_filter(&value);
80 if !payloads.is_empty() {
81 parsed.push((line_idx, payloads));
82 }
83 }
84 Err(e) => {
85 self.metrics.on_parse_error();
86 tracing::debug!(error = %e, "Invalid JSON on input");
87 }
88 }
89 }
90
91 let mut flat: Vec<(usize, &serde_json::Value)> = Vec::new();
93 for (line_idx, payloads) in &parsed {
94 for payload in payloads {
95 flat.push((*line_idx, payload));
96 }
97 }
98
99 if flat.is_empty() {
100 return empty_results(batch.len());
101 }
102
103 let events: Vec<JsonEvent> = flat.iter().map(|(_, v)| JsonEvent::borrow(v)).collect();
105 let event_refs: Vec<&JsonEvent> = events.iter().collect();
106
107 let start = Instant::now();
108 let batch_results = engine.process_batch(&event_refs);
109 let elapsed = start.elapsed().as_secs_f64();
110 let per_event_latency = elapsed / event_refs.len() as f64;
111
112 let stats = engine.stats();
114 self.metrics
115 .set_correlation_state_entries(stats.state_entries as u64);
116
117 let mut line_results = empty_results(batch.len());
119
120 for ((line_idx, _), result) in flat.iter().zip(batch_results) {
121 self.metrics.on_events_processed(1);
122 self.metrics.observe_processing_latency(per_event_latency);
123 self.metrics
124 .on_detection_matches(result.detections.len() as u64);
125 self.metrics
126 .on_correlation_matches(result.correlations.len() as u64);
127
128 for det in &result.detections {
129 let level_str = det.level.as_ref().map_or("unknown", |l| l.as_str());
130 self.metrics
131 .on_detection_match_detail(&det.rule_title, level_str);
132 }
133 for cor in &result.correlations {
134 let level_str = cor.level.as_ref().map_or("unknown", |l| l.as_str());
135 self.metrics.on_correlation_match_detail(
136 &cor.rule_title,
137 level_str,
138 cor.correlation_type.as_str(),
139 );
140 }
141
142 line_results[*line_idx].detections.extend(result.detections);
143 line_results[*line_idx]
144 .correlations
145 .extend(result.correlations);
146 }
147
148 line_results
149 }
150
151 pub fn process_batch_with_format(
161 &self,
162 batch: &[String],
163 format: &InputFormat,
164 event_filter: Option<&EventFilter>,
165 ) -> Vec<ProcessResult> {
166 let engine_guard = self.engine.load();
167 let mut engine = engine_guard.lock();
168
169 let mut decoded_events: Vec<(usize, EventInputDecoded)> = Vec::with_capacity(batch.len());
172
173 for (line_idx, line) in batch.iter().enumerate() {
174 let Some(decoded) = parse_line(line, format) else {
175 if !line.trim().is_empty() {
176 self.metrics.on_parse_error();
177 tracing::debug!("Failed to parse input line");
178 }
179 continue;
180 };
181
182 if let Some(filter) = event_filter
185 && let EventInputDecoded::Json(ref json_event) = decoded
186 {
187 let json_value = json_event.to_json();
188 let payloads = filter(&json_value);
189 for payload in payloads {
190 decoded_events
191 .push((line_idx, EventInputDecoded::Json(JsonEvent::owned(payload))));
192 }
193 continue;
194 }
195
196 decoded_events.push((line_idx, decoded));
197 }
198
199 if decoded_events.is_empty() {
200 return empty_results(batch.len());
201 }
202
203 let event_refs: Vec<&EventInputDecoded> = decoded_events.iter().map(|(_, e)| e).collect();
205
206 let start = Instant::now();
207 let batch_results = engine.process_batch(&event_refs);
208 let elapsed = start.elapsed().as_secs_f64();
209 let per_event_latency = elapsed / event_refs.len() as f64;
210
211 let stats = engine.stats();
212 self.metrics
213 .set_correlation_state_entries(stats.state_entries as u64);
214
215 let mut line_results = empty_results(batch.len());
217
218 for ((line_idx, _), result) in decoded_events.iter().zip(batch_results) {
219 self.metrics.on_events_processed(1);
220 self.metrics.observe_processing_latency(per_event_latency);
221 self.metrics
222 .on_detection_matches(result.detections.len() as u64);
223 self.metrics
224 .on_correlation_matches(result.correlations.len() as u64);
225
226 for det in &result.detections {
227 let level_str = det.level.as_ref().map_or("unknown", |l| l.as_str());
228 self.metrics
229 .on_detection_match_detail(&det.rule_title, level_str);
230 }
231 for cor in &result.correlations {
232 let level_str = cor.level.as_ref().map_or("unknown", |l| l.as_str());
233 self.metrics.on_correlation_match_detail(
234 &cor.rule_title,
235 level_str,
236 cor.correlation_type.as_str(),
237 );
238 }
239
240 line_results[*line_idx].detections.extend(result.detections);
241 line_results[*line_idx]
242 .correlations
243 .extend(result.correlations);
244 }
245
246 line_results
247 }
248
249 pub fn reload_rules(&self) -> Result<crate::engine::EngineStats, String> {
259 let (
260 old_state,
261 rules_path,
262 pipelines,
263 pipeline_paths,
264 corr_config,
265 include_event,
266 resolver,
267 allow_remote_include,
268 ) = {
269 let snapshot = self.engine.load();
270 let old = snapshot.lock();
271 (
272 old.export_state(),
273 old.rules_path().to_path_buf(),
274 old.pipelines().to_vec(),
275 old.pipeline_paths().to_vec(),
276 old.corr_config().clone(),
277 old.include_event(),
278 old.source_resolver().cloned(),
279 old.allow_remote_include(),
280 )
281 };
282
283 let mut new_engine = RuntimeEngine::new(rules_path, pipelines, corr_config, include_event);
284 new_engine.set_pipeline_paths(pipeline_paths);
285 new_engine.set_allow_remote_include(allow_remote_include);
286 if let Some(resolver) = resolver {
287 new_engine.set_source_resolver(resolver);
288 }
289 let stats = new_engine.load_rules()?;
290
291 if let Some(state) = old_state
292 && !new_engine.import_state(&state)
293 {
294 tracing::warn!(
295 "Incompatible correlation snapshot version during reload, starting fresh"
296 );
297 }
298
299 self.swap_engine(new_engine);
300 Ok(stats)
301 }
302
303 pub fn rules_path(&self) -> std::path::PathBuf {
305 let snapshot = self.engine.load();
306 let engine = snapshot.lock();
307 engine.rules_path().to_path_buf()
308 }
309
310 pub fn metrics(&self) -> &dyn MetricsHook {
312 &*self.metrics
313 }
314
315 pub fn export_state(&self) -> Option<rsigma_eval::CorrelationSnapshot> {
317 let snapshot = self.engine.load();
318 let engine = snapshot.lock();
319 engine.export_state()
320 }
321
322 pub fn import_state(&self, snapshot: &rsigma_eval::CorrelationSnapshot) -> bool {
324 let guard = self.engine.load();
325 let mut engine = guard.lock();
326 engine.import_state(snapshot)
327 }
328
329 pub fn stats(&self) -> crate::engine::EngineStats {
331 let snapshot = self.engine.load();
332 let engine = snapshot.lock();
333 engine.stats()
334 }
335}
336
337fn empty_results(count: usize) -> Vec<ProcessResult> {
339 (0..count)
340 .map(|_| ProcessResult {
341 detections: vec![],
342 correlations: vec![],
343 })
344 .collect()
345}
346
#[cfg(test)]
mod tests {
    // Tests for `LogProcessor`: JSON and multi-format batch processing, engine
    // swapping, rule/pipeline reloads, metrics hook calls, and concurrent use.
    use super::*;
    use crate::metrics::NoopMetrics;
    use rsigma_eval::CorrelationConfig;

    /// Event filter that evaluates each parsed line as a single event.
    fn identity_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
        vec![v.clone()]
    }

    /// Writes `rules_yaml` to a temporary rule file and builds a processor for it.
    fn make_processor(rules_yaml: &str) -> LogProcessor {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        std::fs::write(&rule_path, rules_yaml).unwrap();

        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
        engine.load_rules().unwrap();
        // Leak the TempDir so the directory is not removed when `dir` is dropped;
        // the engine keeps `rule_path`.
        std::mem::forget(dir);
        LogProcessor::new(engine, Arc::new(NoopMetrics))
    }

    /// A matching and a non-matching JSON line each get their own result slot.
    #[test]
    fn process_batch_lines_valid_json() {
        let proc = make_processor(
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        );

        let batch = vec![
            r#"{"EventID": 1}"#.to_string(),
            r#"{"EventID": 2}"#.to_string(),
        ];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert_eq!(results.len(), 2);
        assert!(!results[0].detections.is_empty(), "EventID=1 should match");
        assert!(
            results[1].detections.is_empty(),
            "EventID=2 should not match"
        );
    }

    /// An invalid JSON line yields an empty result without affecting other lines.
    #[test]
    fn process_batch_lines_invalid_json() {
        let proc = make_processor(
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        );

        let batch = vec!["not json".to_string(), r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert_eq!(results.len(), 2);
        assert!(
            results[0].detections.is_empty(),
            "invalid JSON produces empty result"
        );
        assert!(
            !results[1].detections.is_empty(),
            "valid line still matches"
        );
    }

    /// After `swap_engine`, only the new engine's rules are applied.
    #[test]
    fn swap_engine_replaces_rules() {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        std::fs::write(
            &rule_path,
            r#"
title: Rule A
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        )
        .unwrap();

        let mut engine = RuntimeEngine::new(
            rule_path.clone(),
            vec![],
            CorrelationConfig::default(),
            false,
        );
        engine.load_rules().unwrap();
        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        assert!(
            !proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );

        // Overwrite the rule file, then build and swap in a fresh engine from it.
        std::fs::write(
            &rule_path,
            r#"
title: Rule B
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 99
    condition: selection
"#,
        )
        .unwrap();

        let mut new_engine =
            RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
        new_engine.load_rules().unwrap();
        proc.swap_engine(new_engine);

        assert!(
            proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );

        let batch2 = vec![r#"{"EventID": 99}"#.to_string()];
        assert!(
            !proc.process_batch_lines(&batch2, &identity_filter)[0]
                .detections
                .is_empty()
        );

        // Keep the temp directory on disk (see `make_processor`).
        std::mem::forget(dir);
    }

    /// `reload_rules` re-reads the rule file and switches to the new rules.
    #[test]
    fn reload_rules_preserves_engine() {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        std::fs::write(
            &rule_path,
            r#"
title: Rule A
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        )
        .unwrap();

        let mut engine = RuntimeEngine::new(
            rule_path.clone(),
            vec![],
            CorrelationConfig::default(),
            false,
        );
        engine.load_rules().unwrap();
        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        assert!(
            !proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );

        // Change the rule on disk; the processor itself is not told about it.
        std::fs::write(
            &rule_path,
            r#"
title: Rule B
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 42
    condition: selection
"#,
        )
        .unwrap();

        let stats = proc.reload_rules().unwrap();
        assert_eq!(stats.detection_rules, 1);

        // The old rule (EventID 1) no longer matches...
        assert!(
            proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );
        // ...and the new rule (EventID 42) does.
        let batch2 = vec![r#"{"EventID": 42}"#.to_string()];
        assert!(
            !proc.process_batch_lines(&batch2, &identity_filter)[0]
                .detections
                .is_empty()
        );

        std::mem::forget(dir);
    }

    /// `reload_rules` re-parses pipeline files from `pipeline_paths`, not the
    /// in-memory pipelines the engine was built with.
    #[test]
    fn reload_re_reads_pipelines_from_disk() {
        let dir = tempfile::tempdir().unwrap();

        let rule_path = dir.path().join("test.yml");
        std::fs::write(
            &rule_path,
            r#"
title: Rule A
status: test
logsource:
    category: test
detection:
    selection:
        SourceIP: "10.0.0.1"
    condition: selection
"#,
        )
        .unwrap();

        // Pipeline that renames SourceIP to src_ip.
        let pipeline_path = dir.path().join("pipeline.yml");
        std::fs::write(
            &pipeline_path,
            r#"
name: Initial Pipeline
priority: 10
transformations:
  - id: rename_field
    type: field_name_mapping
    mapping:
      SourceIP: src_ip
"#,
        )
        .unwrap();

        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
        let mut engine = RuntimeEngine::new(
            rule_path.clone(),
            pipelines,
            CorrelationConfig::default(),
            false,
        );
        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
        engine.load_rules().unwrap();
        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));

        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
        assert!(
            !proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty(),
            "src_ip should match because pipeline mapped SourceIP -> src_ip"
        );

        // Change the mapping target on disk to source.ip.
        std::fs::write(
            &pipeline_path,
            r#"
name: Updated Pipeline
priority: 10
transformations:
  - id: rename_field
    type: field_name_mapping
    mapping:
      SourceIP: source.ip
"#,
        )
        .unwrap();

        proc.reload_rules().unwrap();

        assert!(
            proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty(),
            "after pipeline reload, src_ip should no longer match"
        );

        let batch2 = vec![r#"{"source.ip": "10.0.0.1"}"#.to_string()];
        assert!(
            !proc.process_batch_lines(&batch2, &identity_filter)[0]
                .detections
                .is_empty(),
            "after pipeline reload, source.ip should match"
        );

        std::mem::forget(dir);
    }

    /// A reload that fails (unparseable pipeline) returns `Err` and leaves the
    /// previously active engine in place.
    #[test]
    fn reload_with_broken_pipeline_keeps_old_engine() {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        std::fs::write(
            &rule_path,
            r#"
title: Rule A
status: test
logsource:
    category: test
detection:
    selection:
        SourceIP: "10.0.0.1"
    condition: selection
"#,
        )
        .unwrap();

        let pipeline_path = dir.path().join("pipeline.yml");
        std::fs::write(
            &pipeline_path,
            r#"
name: Working Pipeline
priority: 10
transformations:
  - id: rename_field
    type: field_name_mapping
    mapping:
      SourceIP: src_ip
"#,
        )
        .unwrap();

        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
        let mut engine = RuntimeEngine::new(
            rule_path.clone(),
            pipelines,
            CorrelationConfig::default(),
            false,
        );
        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
        engine.load_rules().unwrap();
        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));

        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
        assert!(
            !proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );

        // Corrupt the pipeline file so the reload cannot parse it.
        std::fs::write(&pipeline_path, "{{{{ invalid yaml !!!!").unwrap();

        let result = proc.reload_rules();
        assert!(result.is_err(), "reload with broken pipeline should fail");

        assert!(
            !proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty(),
            "old engine should still work after failed reload"
        );

        std::mem::forget(dir);
    }

    /// A filter that fans one line out into several events attributes all their
    /// matches to that single line.
    #[test]
    fn custom_event_filter() {
        let proc = make_processor(
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        );

        // Expand a `records` array into one event per element; otherwise pass through.
        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
                records.clone()
            } else {
                vec![v.clone()]
            }
        };

        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
        let results = proc.process_batch_lines(&batch, &filter);
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].detections.len(),
            1,
            "only EventID=1 from records array should match"
        );
    }

    /// An empty batch produces an empty result vector.
    #[test]
    fn empty_batch_returns_empty() {
        let proc = make_processor(
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        );

        let batch: Vec<String> = vec![];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert!(results.is_empty());
    }

    /// Parse errors, processed events and detection matches are reported to the hook.
    #[test]
    fn metrics_hook_invocations() {
        use std::sync::atomic::{AtomicU64, Ordering};

        // Hook that counts only the three metrics under test; the rest are no-ops.
        struct CountingMetrics {
            parse_errors: AtomicU64,
            events_processed: AtomicU64,
            detection_matches: AtomicU64,
        }

        impl MetricsHook for CountingMetrics {
            fn on_parse_error(&self) {
                self.parse_errors.fetch_add(1, Ordering::Relaxed);
            }
            fn on_events_processed(&self, count: u64) {
                self.events_processed.fetch_add(count, Ordering::Relaxed);
            }
            fn on_detection_matches(&self, count: u64) {
                self.detection_matches.fetch_add(count, Ordering::Relaxed);
            }
            fn on_correlation_matches(&self, _: u64) {}
            fn observe_processing_latency(&self, _: f64) {}
            fn on_input_queue_depth_change(&self, _: i64) {}
            fn on_back_pressure(&self) {}
            fn observe_batch_size(&self, _: u64) {}
            fn on_output_queue_depth_change(&self, _: i64) {}
            fn observe_pipeline_latency(&self, _: f64) {}
            fn set_correlation_state_entries(&self, _: u64) {}
        }

        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        std::fs::write(
            &rule_path,
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        )
        .unwrap();

        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
        engine.load_rules().unwrap();

        let metrics = Arc::new(CountingMetrics {
            parse_errors: AtomicU64::new(0),
            events_processed: AtomicU64::new(0),
            detection_matches: AtomicU64::new(0),
        });
        let proc = LogProcessor::new(engine, metrics.clone());

        // One invalid line, one matching event, one non-matching event.
        let batch = vec![
            "not json".to_string(),
            r#"{"EventID": 1}"#.to_string(),
            r#"{"EventID": 2}"#.to_string(),
        ];
        proc.process_batch_lines(&batch, &identity_filter);

        assert_eq!(metrics.parse_errors.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.events_processed.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.detection_matches.load(Ordering::Relaxed), 1);

        std::mem::forget(dir);
    }

    /// Several threads process batches while one of them swaps the engine; the
    /// test passes if every thread completes without panicking.
    #[test]
    fn concurrent_swap_and_process() {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        std::fs::write(
            &rule_path,
            r#"
title: Rule A
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        )
        .unwrap();

        let mut engine = RuntimeEngine::new(
            rule_path.clone(),
            vec![],
            CorrelationConfig::default(),
            false,
        );
        engine.load_rules().unwrap();
        let proc = Arc::new(LogProcessor::new(engine, Arc::new(NoopMetrics)));

        let handles: Vec<_> = (0..4)
            .map(|i| {
                let proc = proc.clone();
                let rule_path = rule_path.clone();
                std::thread::spawn(move || {
                    let batch = vec![r#"{"EventID": 1}"#.to_string()];
                    for _ in 0..100 {
                        let _ = proc.process_batch_lines(&batch, &identity_filter);
                    }
                    // Only thread 0 swaps, after its own processing loop.
                    if i == 0 {
                        let mut new_engine = RuntimeEngine::new(
                            rule_path,
                            vec![],
                            CorrelationConfig::default(),
                            false,
                        );
                        new_engine.load_rules().unwrap();
                        proc.swap_engine(new_engine);
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        std::mem::forget(dir);
    }

    /// Explicit JSON format evaluates the decoded event.
    #[test]
    fn format_json_matches() {
        let proc = make_processor(
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        );

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "JSON EventID=1 should match"
        );
    }

    /// Syslog decoding exposes the header hostname as a matchable field.
    #[test]
    fn format_syslog_extracts_fields() {
        let proc = make_processor(
            r#"
title: Syslog Test
status: test
logsource:
    category: test
detection:
    selection:
        hostname: mymachine
    condition: selection
"#,
        );

        let batch = vec!["<34>Oct 11 22:14:15 mymachine su: test message".to_string()];
        let results = proc.process_batch_with_format(
            &batch,
            &InputFormat::Syslog(crate::input::SyslogConfig::default()),
            None,
        );
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "syslog hostname=mymachine should match"
        );
    }

    /// Plain-text input can be matched by keyword rules.
    #[test]
    fn format_plain_keyword_match() {
        let proc = make_processor(
            r#"
title: Keyword Test
status: test
logsource:
    category: test
detection:
    keywords:
        - "disk full"
    condition: keywords
"#,
        );

        let batch = vec!["ERROR: disk full on /dev/sda1".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Plain, None);
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "plain keyword 'disk full' should match"
        );
    }

    /// The default input format handles JSON lines.
    #[test]
    fn format_auto_detects_json() {
        let proc = make_processor(
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        );

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::default(), None);
        assert_eq!(results.len(), 1);
        assert!(!results[0].detections.is_empty());
    }

    /// The optional event filter is applied to JSON input in the format-aware path.
    #[test]
    fn format_json_with_event_filter() {
        let proc = make_processor(
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        );

        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
                records.clone()
            } else {
                vec![v.clone()]
            }
        };

        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, Some(&filter));
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].detections.len(),
            1,
            "only EventID=1 from records array should match"
        );
    }

    /// Blank and whitespace-only lines keep their result slots but match nothing.
    #[test]
    fn format_empty_lines_skipped() {
        let proc = make_processor(
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        );

        let batch = vec![
            "".to_string(),
            " ".to_string(),
            r#"{"EventID": 1}"#.to_string(),
        ];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
        assert_eq!(results.len(), 3);
        assert!(results[0].detections.is_empty());
        assert!(results[1].detections.is_empty());
        assert!(!results[2].detections.is_empty());
    }

    /// logfmt key/value pairs become matchable fields (requires the `logfmt` feature).
    #[cfg(feature = "logfmt")]
    #[test]
    fn format_logfmt_matches() {
        let proc = make_processor(
            r#"
title: Logfmt Test
status: test
logsource:
    category: test
detection:
    selection:
        level: error
    condition: selection
"#,
        );

        let batch = vec!["level=error msg=something host=web01".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Logfmt, None);
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "logfmt level=error should match"
        );
    }

    /// CEF header fields such as deviceVendor are matchable (requires the `cef` feature).
    #[cfg(feature = "cef")]
    #[test]
    fn format_cef_matches() {
        let proc = make_processor(
            r#"
title: CEF Test
status: test
logsource:
    category: test
detection:
    selection:
        deviceVendor: Security
    condition: selection
"#,
        );

        let batch = vec!["CEF:0|Security|IDS|1.0|100|Attack|9|src=10.0.0.1".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Cef, None);
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "CEF deviceVendor=Security should match"
        );
    }
}
1128}