1use std::sync::Arc;
2
3use parking_lot::Mutex;
4use std::time::Instant;
5
6use arc_swap::ArcSwap;
7use rsigma_eval::{Event, FieldObserver, JsonEvent, ProcessResult, ProcessResultExt, RuleFieldSet};
8
9use crate::engine::RuntimeEngine;
10use crate::input::{EventInputDecoded, InputFormat, parse_line};
11use crate::metrics::MetricsHook;
12
/// Callback that expands one parsed JSON value into the event payloads that
/// should be evaluated.
///
/// Returning an empty vector means nothing is evaluated for that input line.
pub type EventFilter = dyn Fn(&serde_json::Value) -> Vec<serde_json::Value>;
18
/// Evaluates batches of raw log lines against a replaceable [`RuntimeEngine`]
/// and reports what happened through a [`MetricsHook`].
pub struct LogProcessor {
    /// Currently active engine. The `ArcSwap` lets the engine be replaced
    /// through `&self`; the `Mutex` guards access to the engine itself.
    engine: Arc<ArcSwap<Mutex<RuntimeEngine>>>,
    /// Sink for parse-error, throughput, latency and match metrics.
    metrics: Arc<dyn MetricsHook>,
    /// Optional observer that `process_batch_with_format` feeds with every
    /// decoded event; `None` when no observer is installed.
    field_observer: ArcSwap<Option<Arc<FieldObserver>>>,
}
35
36impl LogProcessor {
37 pub fn new(engine: RuntimeEngine, metrics: Arc<dyn MetricsHook>) -> Self {
39 LogProcessor {
40 engine: Arc::new(ArcSwap::from_pointee(Mutex::new(engine))),
41 metrics,
42 field_observer: ArcSwap::new(Arc::new(None)),
43 }
44 }
45
46 pub fn set_field_observer(&self, observer: Option<Arc<FieldObserver>>) {
54 self.field_observer.store(Arc::new(observer));
55 }
56
57 pub fn field_observer(&self) -> Option<Arc<FieldObserver>> {
59 self.field_observer.load_full().as_ref().clone()
60 }
61
62 pub fn swap_engine(&self, new_engine: RuntimeEngine) {
68 self.engine.store(Arc::new(Mutex::new(new_engine)));
69 }
70
71 pub fn engine_snapshot(&self) -> arc_swap::Guard<Arc<Mutex<RuntimeEngine>>> {
76 self.engine.load()
77 }
78
79 pub fn process_batch_lines(
89 &self,
90 batch: &[String],
91 event_filter: &EventFilter,
92 ) -> Vec<ProcessResult> {
93 let engine_guard = self.engine.load();
94 let mut engine = engine_guard.lock();
95
96 let mut parsed: Vec<(usize, Vec<serde_json::Value>)> = Vec::with_capacity(batch.len());
98 for (line_idx, line) in batch.iter().enumerate() {
99 match serde_json::from_str::<serde_json::Value>(line) {
100 Ok(value) => {
101 let payloads = event_filter(&value);
102 if !payloads.is_empty() {
103 parsed.push((line_idx, payloads));
104 }
105 }
106 Err(e) => {
107 self.metrics.on_parse_error();
108 tracing::debug!(error = %e, "Invalid JSON on input");
109 }
110 }
111 }
112
113 let mut flat: Vec<(usize, &serde_json::Value)> = Vec::new();
115 for (line_idx, payloads) in &parsed {
116 for payload in payloads {
117 flat.push((*line_idx, payload));
118 }
119 }
120
121 if flat.is_empty() {
122 return empty_results(batch.len());
123 }
124
125 let events: Vec<JsonEvent> = flat.iter().map(|(_, v)| JsonEvent::borrow(v)).collect();
127 let event_refs: Vec<&JsonEvent> = events.iter().collect();
128
129 let start = Instant::now();
130 let batch_results = engine.process_batch(&event_refs);
131 let elapsed = start.elapsed().as_secs_f64();
132 let per_event_latency = elapsed / event_refs.len() as f64;
133
134 let stats = engine.stats();
136 self.metrics
137 .set_correlation_state_entries(stats.state_entries as u64);
138
139 let mut line_results = empty_results(batch.len());
141
142 for ((line_idx, _), result) in flat.iter().zip(batch_results) {
143 self.metrics.on_events_processed(1);
144 self.metrics.observe_processing_latency(per_event_latency);
145 self.metrics
146 .on_detection_matches(result.detection_count() as u64);
147 self.metrics
148 .on_correlation_matches(result.correlation_count() as u64);
149
150 for r in &result {
151 let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
152 let title = &r.header.rule_title;
153 match &r.body {
154 rsigma_eval::ResultBody::Detection(_) => {
155 self.metrics.on_detection_match_detail(title, level_str);
156 }
157 rsigma_eval::ResultBody::Correlation(body) => {
158 self.metrics.on_correlation_match_detail(
159 title,
160 level_str,
161 body.correlation_type.as_str(),
162 );
163 }
164 }
165 }
166
167 line_results[*line_idx].extend(result);
168 }
169
170 line_results
171 }
172
173 pub fn process_batch_with_format(
183 &self,
184 batch: &[String],
185 format: &InputFormat,
186 event_filter: Option<&EventFilter>,
187 ) -> Vec<ProcessResult> {
188 let engine_guard = self.engine.load();
189 let mut engine = engine_guard.lock();
190
191 let mut decoded_events: Vec<(usize, EventInputDecoded)> = Vec::with_capacity(batch.len());
194
195 for (line_idx, line) in batch.iter().enumerate() {
196 let Some(decoded) = parse_line(line, format) else {
197 if !line.trim().is_empty() {
198 self.metrics.on_parse_error();
199 tracing::debug!("Failed to parse input line");
200 }
201 continue;
202 };
203
204 if let Some(filter) = event_filter
207 && let EventInputDecoded::Json(ref json_event) = decoded
208 {
209 let json_value = json_event.to_json();
210 let payloads = filter(&json_value);
211 for payload in payloads {
212 decoded_events
213 .push((line_idx, EventInputDecoded::Json(JsonEvent::owned(payload))));
214 }
215 continue;
216 }
217
218 decoded_events.push((line_idx, decoded));
219 }
220
221 if decoded_events.is_empty() {
222 return empty_results(batch.len());
223 }
224
225 let observer_guard = self.field_observer.load();
232 if let Some(observer) = observer_guard.as_ref() {
233 for (_, decoded) in &decoded_events {
234 observer.observe(decoded);
235 }
236 }
237 drop(observer_guard);
238
239 let event_refs: Vec<&EventInputDecoded> = decoded_events.iter().map(|(_, e)| e).collect();
241
242 let start = Instant::now();
243 let batch_results = engine.process_batch(&event_refs);
244 let elapsed = start.elapsed().as_secs_f64();
245 let per_event_latency = elapsed / event_refs.len() as f64;
246
247 let stats = engine.stats();
248 self.metrics
249 .set_correlation_state_entries(stats.state_entries as u64);
250
251 let mut line_results = empty_results(batch.len());
253
254 for ((line_idx, _), result) in decoded_events.iter().zip(batch_results) {
255 self.metrics.on_events_processed(1);
256 self.metrics.observe_processing_latency(per_event_latency);
257 self.metrics
258 .on_detection_matches(result.detection_count() as u64);
259 self.metrics
260 .on_correlation_matches(result.correlation_count() as u64);
261
262 for r in &result {
263 let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
264 let title = &r.header.rule_title;
265 match &r.body {
266 rsigma_eval::ResultBody::Detection(_) => {
267 self.metrics.on_detection_match_detail(title, level_str);
268 }
269 rsigma_eval::ResultBody::Correlation(body) => {
270 self.metrics.on_correlation_match_detail(
271 title,
272 level_str,
273 body.correlation_type.as_str(),
274 );
275 }
276 }
277 }
278
279 line_results[*line_idx].extend(result);
280 }
281
282 line_results
283 }
284
285 pub fn reload_rules(&self) -> Result<crate::engine::EngineStats, String> {
295 let (
296 old_state,
297 rules_path,
298 pipelines,
299 pipeline_paths,
300 corr_config,
301 include_event,
302 resolver,
303 allow_remote_include,
304 ) = {
305 let snapshot = self.engine.load();
306 let old = snapshot.lock();
307 (
308 old.export_state(),
309 old.rules_path().to_path_buf(),
310 old.pipelines().to_vec(),
311 old.pipeline_paths().to_vec(),
312 old.corr_config().clone(),
313 old.include_event(),
314 old.source_resolver().cloned(),
315 old.allow_remote_include(),
316 )
317 };
318
319 let mut new_engine = RuntimeEngine::new(rules_path, pipelines, corr_config, include_event);
320 new_engine.set_pipeline_paths(pipeline_paths);
321 new_engine.set_allow_remote_include(allow_remote_include);
322 if let Some(resolver) = resolver {
323 new_engine.set_source_resolver(resolver);
324 }
325 let stats = new_engine.load_rules()?;
326
327 if let Some(state) = old_state
328 && !new_engine.import_state(&state)
329 {
330 tracing::warn!(
331 "Incompatible correlation snapshot version during reload, starting fresh"
332 );
333 }
334
335 self.swap_engine(new_engine);
336 Ok(stats)
337 }
338
339 pub fn rules_path(&self) -> std::path::PathBuf {
341 let snapshot = self.engine.load();
342 let engine = snapshot.lock();
343 engine.rules_path().to_path_buf()
344 }
345
346 pub fn metrics(&self) -> &dyn MetricsHook {
348 &*self.metrics
349 }
350
351 pub fn export_state(&self) -> Option<rsigma_eval::CorrelationSnapshot> {
353 let snapshot = self.engine.load();
354 let engine = snapshot.lock();
355 engine.export_state()
356 }
357
358 pub fn import_state(&self, snapshot: &rsigma_eval::CorrelationSnapshot) -> bool {
360 let guard = self.engine.load();
361 let mut engine = guard.lock();
362 engine.import_state(snapshot)
363 }
364
365 pub fn stats(&self) -> crate::engine::EngineStats {
367 let snapshot = self.engine.load();
368 let engine = snapshot.lock();
369 engine.stats()
370 }
371
372 pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
376 let snapshot = self.engine.load();
377 let engine = snapshot.lock();
378 engine.rule_field_set()
379 }
380}
381
382fn empty_results(count: usize) -> Vec<ProcessResult> {
384 (0..count).map(|_| ProcessResult::new()).collect()
385}
386
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::NoopMetrics;
    use rsigma_eval::CorrelationConfig;
    use std::path::{Path, PathBuf};
    use tempfile::TempDir;

    // Every fixture keeps its `TempDir` alive for the duration of the test and
    // lets it drop at the end, so the directory is removed instead of leaked.

    /// Event filter that evaluates each JSON value as-is.
    fn identity_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
        vec![v.clone()]
    }

    /// Event filter that expands a top-level `records` array into one event
    /// per element and passes every other value through unchanged.
    fn unwrap_records(v: &serde_json::Value) -> Vec<serde_json::Value> {
        match v.get("records").and_then(|r| r.as_array()) {
            Some(records) => records.clone(),
            None => vec![v.clone()],
        }
    }

    /// Sigma rule whose single selection is `field: value` (`value` is raw YAML).
    fn selection_rule(title: &str, field: &str, value: &str) -> String {
        format!(
            r#"
title: {title}
status: test
logsource:
    category: test
detection:
    selection:
        {field}: {value}
    condition: selection
"#
        )
    }

    /// Pipeline that renames the rule field `SourceIP` to `target`.
    fn mapping_pipeline(name: &str, target: &str) -> String {
        format!(
            r#"
name: {name}
priority: 10
transformations:
  - id: rename_field
    type: field_name_mapping
    mapping:
      SourceIP: {target}
"#
        )
    }

    /// Writes `yaml` to `test.yml` inside `dir` and returns its path.
    fn write_rule(dir: &TempDir, yaml: &str) -> PathBuf {
        let path = dir.path().join("test.yml");
        std::fs::write(&path, yaml).unwrap();
        path
    }

    /// Builds an engine without pipelines for `rule_path` and loads its rules.
    fn loaded_engine(rule_path: &Path) -> RuntimeEngine {
        let mut engine = RuntimeEngine::new(
            rule_path.to_path_buf(),
            vec![],
            CorrelationConfig::default(),
            false,
        );
        engine.load_rules().unwrap();
        engine
    }

    /// Builds an engine that uses, and remembers the path of, one pipeline file.
    fn loaded_engine_with_pipeline(rule_path: &Path, pipeline_path: &Path) -> RuntimeEngine {
        let pipelines = vec![rsigma_eval::parse_pipeline_file(pipeline_path).unwrap()];
        let mut engine = RuntimeEngine::new(
            rule_path.to_path_buf(),
            pipelines,
            CorrelationConfig::default(),
            false,
        );
        engine.set_pipeline_paths(vec![pipeline_path.to_path_buf()]);
        engine.load_rules().unwrap();
        engine
    }

    /// Processor over `rules_yaml`, plus the temp directory holding the rule.
    fn make_processor(rules_yaml: &str) -> (LogProcessor, TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = write_rule(&dir, rules_yaml);
        let proc = LogProcessor::new(loaded_engine(&rule_path), Arc::new(NoopMetrics));
        (proc, dir)
    }

    /// Processor with the default `EventID: 1` rule.
    fn event_id_processor() -> (LogProcessor, TempDir) {
        make_processor(&selection_rule("Test Rule", "EventID", "1"))
    }

    /// Converts string literals into an owned batch.
    fn lines(raw: &[&str]) -> Vec<String> {
        raw.iter().map(|l| l.to_string()).collect()
    }

    /// Whether `line`, processed alone as JSON, yields at least one detection.
    fn matches_line(proc: &LogProcessor, line: &str) -> bool {
        proc.process_batch_lines(&lines(&[line]), &identity_filter)[0].detection_count() > 0
    }

    #[test]
    fn process_batch_lines_valid_json() {
        let (proc, _dir) = event_id_processor();

        let batch = lines(&[r#"{"EventID": 1}"#, r#"{"EventID": 2}"#]);
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert_eq!(results.len(), 2);
        assert!(results[0].detection_count() > 0, "EventID=1 should match");
        assert!(
            results[1].detection_count() == 0,
            "EventID=2 should not match"
        );
    }

    #[test]
    fn process_batch_lines_invalid_json() {
        let (proc, _dir) = event_id_processor();

        let batch = lines(&["not json", r#"{"EventID": 1}"#]);
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert_eq!(results.len(), 2);
        assert!(
            results[0].detection_count() == 0,
            "invalid JSON produces empty result"
        );
        assert!(results[1].detection_count() > 0, "valid line still matches");
    }

    #[test]
    fn swap_engine_replaces_rules() {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = write_rule(&dir, &selection_rule("Rule A", "EventID", "1"));
        let proc = LogProcessor::new(loaded_engine(&rule_path), Arc::new(NoopMetrics));

        assert!(matches_line(&proc, r#"{"EventID": 1}"#));

        std::fs::write(&rule_path, selection_rule("Rule B", "EventID", "99")).unwrap();
        proc.swap_engine(loaded_engine(&rule_path));

        assert!(!matches_line(&proc, r#"{"EventID": 1}"#));
        assert!(matches_line(&proc, r#"{"EventID": 99}"#));
    }

    #[test]
    fn reload_rules_preserves_engine() {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = write_rule(&dir, &selection_rule("Rule A", "EventID", "1"));
        let proc = LogProcessor::new(loaded_engine(&rule_path), Arc::new(NoopMetrics));

        assert!(matches_line(&proc, r#"{"EventID": 1}"#));

        std::fs::write(&rule_path, selection_rule("Rule B", "EventID", "42")).unwrap();

        let stats = proc.reload_rules().unwrap();
        assert_eq!(stats.detection_rules, 1);

        assert!(!matches_line(&proc, r#"{"EventID": 1}"#));
        assert!(matches_line(&proc, r#"{"EventID": 42}"#));
    }

    #[test]
    fn reload_re_reads_pipelines_from_disk() {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = write_rule(&dir, &selection_rule("Rule A", "SourceIP", "\"10.0.0.1\""));

        let pipeline_path = dir.path().join("pipeline.yml");
        std::fs::write(
            &pipeline_path,
            mapping_pipeline("Initial Pipeline", "src_ip"),
        )
        .unwrap();

        let proc = LogProcessor::new(
            loaded_engine_with_pipeline(&rule_path, &pipeline_path),
            Arc::new(NoopMetrics),
        );

        assert!(
            matches_line(&proc, r#"{"src_ip": "10.0.0.1"}"#),
            "src_ip should match because pipeline mapped SourceIP -> src_ip"
        );

        std::fs::write(
            &pipeline_path,
            mapping_pipeline("Updated Pipeline", "source.ip"),
        )
        .unwrap();

        proc.reload_rules().unwrap();

        assert!(
            !matches_line(&proc, r#"{"src_ip": "10.0.0.1"}"#),
            "after pipeline reload, src_ip should no longer match"
        );
        assert!(
            matches_line(&proc, r#"{"source.ip": "10.0.0.1"}"#),
            "after pipeline reload, source.ip should match"
        );
    }

    #[test]
    fn reload_with_broken_pipeline_keeps_old_engine() {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = write_rule(&dir, &selection_rule("Rule A", "SourceIP", "\"10.0.0.1\""));

        let pipeline_path = dir.path().join("pipeline.yml");
        std::fs::write(
            &pipeline_path,
            mapping_pipeline("Working Pipeline", "src_ip"),
        )
        .unwrap();

        let proc = LogProcessor::new(
            loaded_engine_with_pipeline(&rule_path, &pipeline_path),
            Arc::new(NoopMetrics),
        );

        assert!(matches_line(&proc, r#"{"src_ip": "10.0.0.1"}"#));

        std::fs::write(&pipeline_path, "{{{{ invalid yaml !!!!").unwrap();

        let result = proc.reload_rules();
        assert!(result.is_err(), "reload with broken pipeline should fail");

        assert!(
            matches_line(&proc, r#"{"src_ip": "10.0.0.1"}"#),
            "old engine should still work after failed reload"
        );
    }

    #[test]
    fn custom_event_filter() {
        let (proc, _dir) = event_id_processor();

        let batch = lines(&[r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#]);
        let results = proc.process_batch_lines(&batch, &unwrap_records);
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].detection_count(),
            1,
            "only EventID=1 from records array should match"
        );
    }

    #[test]
    fn empty_batch_returns_empty() {
        let (proc, _dir) = event_id_processor();

        let batch: Vec<String> = vec![];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert!(results.is_empty());
    }

    #[test]
    fn metrics_hook_invocations() {
        use std::sync::atomic::{AtomicU64, Ordering};

        struct CountingMetrics {
            parse_errors: AtomicU64,
            events_processed: AtomicU64,
            detection_matches: AtomicU64,
        }

        impl MetricsHook for CountingMetrics {
            fn on_parse_error(&self) {
                self.parse_errors.fetch_add(1, Ordering::Relaxed);
            }
            fn on_events_processed(&self, count: u64) {
                self.events_processed.fetch_add(count, Ordering::Relaxed);
            }
            fn on_detection_matches(&self, count: u64) {
                self.detection_matches.fetch_add(count, Ordering::Relaxed);
            }
            fn on_correlation_matches(&self, _: u64) {}
            fn observe_processing_latency(&self, _: f64) {}
            fn on_input_queue_depth_change(&self, _: i64) {}
            fn on_back_pressure(&self) {}
            fn observe_batch_size(&self, _: u64) {}
            fn on_output_queue_depth_change(&self, _: i64) {}
            fn observe_pipeline_latency(&self, _: f64) {}
            fn set_correlation_state_entries(&self, _: u64) {}
        }

        let dir = tempfile::tempdir().unwrap();
        let rule_path = write_rule(&dir, &selection_rule("Test Rule", "EventID", "1"));

        let metrics = Arc::new(CountingMetrics {
            parse_errors: AtomicU64::new(0),
            events_processed: AtomicU64::new(0),
            detection_matches: AtomicU64::new(0),
        });
        let proc = LogProcessor::new(loaded_engine(&rule_path), metrics.clone());

        let batch = lines(&["not json", r#"{"EventID": 1}"#, r#"{"EventID": 2}"#]);
        proc.process_batch_lines(&batch, &identity_filter);

        assert_eq!(metrics.parse_errors.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.events_processed.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.detection_matches.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn concurrent_swap_and_process() {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = write_rule(&dir, &selection_rule("Rule A", "EventID", "1"));
        let proc = Arc::new(LogProcessor::new(
            loaded_engine(&rule_path),
            Arc::new(NoopMetrics),
        ));

        let handles: Vec<_> = (0..4)
            .map(|i| {
                let proc = proc.clone();
                let rule_path = rule_path.clone();
                std::thread::spawn(move || {
                    let batch = lines(&[r#"{"EventID": 1}"#]);
                    for _ in 0..100 {
                        let _ = proc.process_batch_lines(&batch, &identity_filter);
                    }
                    if i == 0 {
                        proc.swap_engine(loaded_engine(&rule_path));
                    }
                })
            })
            .collect();

        // All threads are joined before `dir` drops, so the rule file stays
        // on disk for the engine built inside thread 0.
        for h in handles {
            h.join().unwrap();
        }
    }

    #[test]
    fn format_json_matches() {
        let (proc, _dir) = event_id_processor();

        let batch = lines(&[r#"{"EventID": 1}"#]);
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "JSON EventID=1 should match"
        );
    }

    #[test]
    fn format_syslog_extracts_fields() {
        let (proc, _dir) = make_processor(&selection_rule("Syslog Test", "hostname", "mymachine"));

        let batch = lines(&["<34>Oct 11 22:14:15 mymachine su: test message"]);
        let results = proc.process_batch_with_format(
            &batch,
            &InputFormat::Syslog(crate::input::SyslogConfig::default()),
            None,
        );
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "syslog hostname=mymachine should match"
        );
    }

    #[test]
    fn format_plain_keyword_match() {
        let (proc, _dir) = make_processor(
            r#"
title: Keyword Test
status: test
logsource:
    category: test
detection:
    keywords:
        - "disk full"
    condition: keywords
"#,
        );

        let batch = lines(&["ERROR: disk full on /dev/sda1"]);
        let results = proc.process_batch_with_format(&batch, &InputFormat::Plain, None);
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "plain keyword 'disk full' should match"
        );
    }

    #[test]
    fn format_auto_detects_json() {
        let (proc, _dir) = event_id_processor();

        let batch = lines(&[r#"{"EventID": 1}"#]);
        let results = proc.process_batch_with_format(&batch, &InputFormat::default(), None);
        assert_eq!(results.len(), 1);
        assert!(results[0].detection_count() > 0);
    }

    #[test]
    fn format_json_with_event_filter() {
        let (proc, _dir) = event_id_processor();

        let batch = lines(&[r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#]);
        let results =
            proc.process_batch_with_format(&batch, &InputFormat::Json, Some(&unwrap_records));
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].detection_count(),
            1,
            "only EventID=1 from records array should match"
        );
    }

    #[test]
    fn format_empty_lines_skipped() {
        let (proc, _dir) = event_id_processor();

        let batch = lines(&["", "  ", r#"{"EventID": 1}"#]);
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
        assert_eq!(results.len(), 3);
        assert!(results[0].detection_count() == 0);
        assert!(results[1].detection_count() == 0);
        assert!(results[2].detection_count() > 0);
    }

    #[cfg(feature = "logfmt")]
    #[test]
    fn format_logfmt_matches() {
        let (proc, _dir) = make_processor(&selection_rule("Logfmt Test", "level", "error"));

        let batch = lines(&["level=error msg=something host=web01"]);
        let results = proc.process_batch_with_format(&batch, &InputFormat::Logfmt, None);
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "logfmt level=error should match"
        );
    }

    #[cfg(feature = "cef")]
    #[test]
    fn format_cef_matches() {
        let (proc, _dir) = make_processor(&selection_rule("CEF Test", "deviceVendor", "Security"));

        let batch = lines(&["CEF:0|Security|IDS|1.0|100|Attack|9|src=10.0.0.1"]);
        let results = proc.process_batch_with_format(&batch, &InputFormat::Cef, None);
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "CEF deviceVendor=Security should match"
        );
    }
}