1use std::sync::{Arc, Mutex};
2use std::time::Instant;
3
4use arc_swap::ArcSwap;
5use rsigma_eval::{Event, JsonEvent, ProcessResult};
6
7use crate::engine::RuntimeEngine;
8use crate::input::{EventInputDecoded, InputFormat, parse_line};
9use crate::metrics::MetricsHook;
10
/// Callback that expands one decoded JSON line into zero or more events to
/// evaluate (for example, unwrapping a `records` array into its elements).
///
/// Returning an empty `Vec` drops the line: it produces no events and is not
/// counted as a parse error.
pub type EventFilter = dyn Fn(&serde_json::Value) -> Vec<serde_json::Value>;

/// Turns batches of raw input lines into per-line detection and correlation
/// results using a replaceable [`RuntimeEngine`].
pub struct LogProcessor {
    /// Currently active engine. `ArcSwap` lets the engine be replaced while
    /// other threads keep the snapshot they already loaded. The inner `Mutex`
    /// serialises access because evaluation needs `&mut RuntimeEngine`.
    engine: Arc<ArcSwap<Mutex<RuntimeEngine>>>,
    /// Receives parse-error, throughput, latency, match-count and
    /// correlation-state metrics.
    metrics: Arc<dyn MetricsHook>,
}
28
29impl LogProcessor {
30 pub fn new(engine: RuntimeEngine, metrics: Arc<dyn MetricsHook>) -> Self {
32 LogProcessor {
33 engine: Arc::new(ArcSwap::from_pointee(Mutex::new(engine))),
34 metrics,
35 }
36 }
37
38 pub fn swap_engine(&self, new_engine: RuntimeEngine) {
44 self.engine.store(Arc::new(Mutex::new(new_engine)));
45 }
46
47 pub fn engine_snapshot(&self) -> arc_swap::Guard<Arc<Mutex<RuntimeEngine>>> {
52 self.engine.load()
53 }
54
55 pub fn process_batch_lines(
65 &self,
66 batch: &[String],
67 event_filter: &EventFilter,
68 ) -> Vec<ProcessResult> {
69 let engine_guard = self.engine.load();
70 let mut engine = engine_guard.lock().unwrap();
71
72 let mut parsed: Vec<(usize, Vec<serde_json::Value>)> = Vec::with_capacity(batch.len());
74 for (line_idx, line) in batch.iter().enumerate() {
75 match serde_json::from_str::<serde_json::Value>(line) {
76 Ok(value) => {
77 let payloads = event_filter(&value);
78 if !payloads.is_empty() {
79 parsed.push((line_idx, payloads));
80 }
81 }
82 Err(e) => {
83 self.metrics.on_parse_error();
84 tracing::debug!(error = %e, "Invalid JSON on input");
85 }
86 }
87 }
88
89 let mut flat: Vec<(usize, &serde_json::Value)> = Vec::new();
91 for (line_idx, payloads) in &parsed {
92 for payload in payloads {
93 flat.push((*line_idx, payload));
94 }
95 }
96
97 if flat.is_empty() {
98 return empty_results(batch.len());
99 }
100
101 let events: Vec<JsonEvent> = flat.iter().map(|(_, v)| JsonEvent::borrow(v)).collect();
103 let event_refs: Vec<&JsonEvent> = events.iter().collect();
104
105 let start = Instant::now();
106 let batch_results = engine.process_batch(&event_refs);
107 let elapsed = start.elapsed().as_secs_f64();
108 let per_event_latency = elapsed / event_refs.len() as f64;
109
110 let stats = engine.stats();
112 self.metrics
113 .set_correlation_state_entries(stats.state_entries as u64);
114
115 let mut line_results = empty_results(batch.len());
117
118 for ((line_idx, _), result) in flat.iter().zip(batch_results) {
119 self.metrics.on_events_processed(1);
120 self.metrics.observe_processing_latency(per_event_latency);
121 self.metrics
122 .on_detection_matches(result.detections.len() as u64);
123 self.metrics
124 .on_correlation_matches(result.correlations.len() as u64);
125
126 line_results[*line_idx].detections.extend(result.detections);
127 line_results[*line_idx]
128 .correlations
129 .extend(result.correlations);
130 }
131
132 line_results
133 }
134
135 pub fn process_batch_with_format(
145 &self,
146 batch: &[String],
147 format: &InputFormat,
148 event_filter: Option<&EventFilter>,
149 ) -> Vec<ProcessResult> {
150 let engine_guard = self.engine.load();
151 let mut engine = engine_guard.lock().unwrap();
152
153 let mut decoded_events: Vec<(usize, EventInputDecoded)> = Vec::with_capacity(batch.len());
156
157 for (line_idx, line) in batch.iter().enumerate() {
158 let Some(decoded) = parse_line(line, format) else {
159 if !line.trim().is_empty() {
160 self.metrics.on_parse_error();
161 tracing::debug!("Failed to parse input line");
162 }
163 continue;
164 };
165
166 if let Some(filter) = event_filter
169 && let EventInputDecoded::Json(ref json_event) = decoded
170 {
171 let json_value = json_event.to_json();
172 let payloads = filter(&json_value);
173 for payload in payloads {
174 decoded_events
175 .push((line_idx, EventInputDecoded::Json(JsonEvent::owned(payload))));
176 }
177 continue;
178 }
179
180 decoded_events.push((line_idx, decoded));
181 }
182
183 if decoded_events.is_empty() {
184 return empty_results(batch.len());
185 }
186
187 let event_refs: Vec<&EventInputDecoded> = decoded_events.iter().map(|(_, e)| e).collect();
189
190 let start = Instant::now();
191 let batch_results = engine.process_batch(&event_refs);
192 let elapsed = start.elapsed().as_secs_f64();
193 let per_event_latency = elapsed / event_refs.len() as f64;
194
195 let stats = engine.stats();
196 self.metrics
197 .set_correlation_state_entries(stats.state_entries as u64);
198
199 let mut line_results = empty_results(batch.len());
201
202 for ((line_idx, _), result) in decoded_events.iter().zip(batch_results) {
203 self.metrics.on_events_processed(1);
204 self.metrics.observe_processing_latency(per_event_latency);
205 self.metrics
206 .on_detection_matches(result.detections.len() as u64);
207 self.metrics
208 .on_correlation_matches(result.correlations.len() as u64);
209
210 line_results[*line_idx].detections.extend(result.detections);
211 line_results[*line_idx]
212 .correlations
213 .extend(result.correlations);
214 }
215
216 line_results
217 }
218
219 pub fn reload_rules(&self) -> Result<crate::engine::EngineStats, String> {
226 let (old_state, rules_path, pipelines, corr_config, include_event) = {
227 let snapshot = self.engine.load();
228 let old = snapshot.lock().unwrap();
229 (
230 old.export_state(),
231 old.rules_path().to_path_buf(),
232 old.pipelines().to_vec(),
233 old.corr_config().clone(),
234 old.include_event(),
235 )
236 };
237
238 let mut new_engine = RuntimeEngine::new(rules_path, pipelines, corr_config, include_event);
239 let stats = new_engine.load_rules()?;
240
241 if let Some(state) = old_state
242 && !new_engine.import_state(&state)
243 {
244 tracing::warn!(
245 "Incompatible correlation snapshot version during reload, starting fresh"
246 );
247 }
248
249 self.swap_engine(new_engine);
250 Ok(stats)
251 }
252
253 pub fn rules_path(&self) -> std::path::PathBuf {
255 let snapshot = self.engine.load();
256 let engine = snapshot.lock().unwrap();
257 engine.rules_path().to_path_buf()
258 }
259
260 pub fn metrics(&self) -> &dyn MetricsHook {
262 &*self.metrics
263 }
264
265 pub fn export_state(&self) -> Option<rsigma_eval::CorrelationSnapshot> {
267 let snapshot = self.engine.load();
268 let engine = snapshot.lock().unwrap();
269 engine.export_state()
270 }
271
272 pub fn import_state(&self, snapshot: &rsigma_eval::CorrelationSnapshot) -> bool {
274 let guard = self.engine.load();
275 let mut engine = guard.lock().unwrap();
276 engine.import_state(snapshot)
277 }
278
279 pub fn stats(&self) -> crate::engine::EngineStats {
281 let snapshot = self.engine.load();
282 let engine = snapshot.lock().unwrap();
283 engine.stats()
284 }
285}
286
287fn empty_results(count: usize) -> Vec<ProcessResult> {
289 (0..count)
290 .map(|_| ProcessResult {
291 detections: vec![],
292 correlations: vec![],
293 })
294 .collect()
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::NoopMetrics;
    use rsigma_eval::CorrelationConfig;

    fn identity_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
        vec![v.clone()]
    }

    /// Filter that expands a top-level `records` array into its elements and
    /// passes any other value through unchanged.
    fn records_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
        if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
            records.clone()
        } else {
            vec![v.clone()]
        }
    }

    /// Single-selection detection rule titled `title` matching `EventID: event_id`.
    fn event_id_rule(title: &str, event_id: u32) -> String {
        format!(
            "
title: {title}
status: test
logsource:
    category: test
detection:
    selection:
        EventID: {event_id}
    condition: selection
"
        )
    }

    /// Writes `rules_yaml` to `test.yml` inside a fresh temp dir.
    ///
    /// Returns the `TempDir` guard and the rule file path. Dropping the
    /// guard deletes the directory, so keep it alive for the test's duration.
    fn write_rule(rules_yaml: &str) -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        std::fs::write(&rule_path, rules_yaml).unwrap();
        (dir, rule_path)
    }

    /// Creates an engine for `rule_path` with default settings and loads its rules.
    fn load_engine(rule_path: std::path::PathBuf) -> RuntimeEngine {
        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
        engine.load_rules().unwrap();
        engine
    }

    /// Returns a processor for `rules_yaml` plus the `TempDir` holding the rule
    /// file.
    ///
    /// Bind the guard (e.g. `_dir`) so the directory lives as long as the
    /// test and is removed afterwards.
    fn make_processor(rules_yaml: &str) -> (LogProcessor, tempfile::TempDir) {
        let (dir, rule_path) = write_rule(rules_yaml);
        let proc = LogProcessor::new(load_engine(rule_path), Arc::new(NoopMetrics));
        (proc, dir)
    }

    #[test]
    fn process_batch_lines_valid_json() {
        let (proc, _dir) = make_processor(&event_id_rule("Test Rule", 1));

        let batch = vec![
            r#"{"EventID": 1}"#.to_string(),
            r#"{"EventID": 2}"#.to_string(),
        ];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert_eq!(results.len(), 2);
        assert!(!results[0].detections.is_empty(), "EventID=1 should match");
        assert!(
            results[1].detections.is_empty(),
            "EventID=2 should not match"
        );
    }

    #[test]
    fn process_batch_lines_invalid_json() {
        let (proc, _dir) = make_processor(&event_id_rule("Test Rule", 1));

        let batch = vec!["not json".to_string(), r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert_eq!(results.len(), 2);
        assert!(
            results[0].detections.is_empty(),
            "invalid JSON produces empty result"
        );
        assert!(
            !results[1].detections.is_empty(),
            "valid line still matches"
        );
    }

    #[test]
    fn swap_engine_replaces_rules() {
        let (_dir, rule_path) = write_rule(&event_id_rule("Rule A", 1));
        let proc = LogProcessor::new(load_engine(rule_path.clone()), Arc::new(NoopMetrics));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        assert!(
            !proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );

        std::fs::write(&rule_path, event_id_rule("Rule B", 99)).unwrap();
        proc.swap_engine(load_engine(rule_path));

        assert!(
            proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );

        let batch2 = vec![r#"{"EventID": 99}"#.to_string()];
        assert!(
            !proc.process_batch_lines(&batch2, &identity_filter)[0]
                .detections
                .is_empty()
        );
    }

    #[test]
    fn reload_rules_preserves_engine() {
        let (_dir, rule_path) = write_rule(&event_id_rule("Rule A", 1));
        let proc = LogProcessor::new(load_engine(rule_path.clone()), Arc::new(NoopMetrics));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        assert!(
            !proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );

        std::fs::write(&rule_path, event_id_rule("Rule B", 42)).unwrap();

        let stats = proc.reload_rules().unwrap();
        assert_eq!(stats.detection_rules, 1);

        // The old rule is gone and the new one is active.
        assert!(
            proc.process_batch_lines(&batch, &identity_filter)[0]
                .detections
                .is_empty()
        );
        let batch2 = vec![r#"{"EventID": 42}"#.to_string()];
        assert!(
            !proc.process_batch_lines(&batch2, &identity_filter)[0]
                .detections
                .is_empty()
        );
    }

    #[test]
    fn custom_event_filter() {
        let (proc, _dir) = make_processor(&event_id_rule("Test Rule", 1));

        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
        let results = proc.process_batch_lines(&batch, &records_filter);
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].detections.len(),
            1,
            "only EventID=1 from records array should match"
        );
    }

    #[test]
    fn empty_batch_returns_empty() {
        let (proc, _dir) = make_processor(&event_id_rule("Test Rule", 1));

        let batch: Vec<String> = vec![];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert!(results.is_empty());
    }

    #[test]
    fn metrics_hook_invocations() {
        use std::sync::atomic::{AtomicU64, Ordering};

        struct CountingMetrics {
            parse_errors: AtomicU64,
            events_processed: AtomicU64,
            detection_matches: AtomicU64,
        }

        impl MetricsHook for CountingMetrics {
            fn on_parse_error(&self) {
                self.parse_errors.fetch_add(1, Ordering::Relaxed);
            }
            fn on_events_processed(&self, count: u64) {
                self.events_processed.fetch_add(count, Ordering::Relaxed);
            }
            fn on_detection_matches(&self, count: u64) {
                self.detection_matches.fetch_add(count, Ordering::Relaxed);
            }
            fn on_correlation_matches(&self, _: u64) {}
            fn observe_processing_latency(&self, _: f64) {}
            fn on_input_queue_depth_change(&self, _: i64) {}
            fn on_back_pressure(&self) {}
            fn observe_batch_size(&self, _: u64) {}
            fn on_output_queue_depth_change(&self, _: i64) {}
            fn observe_pipeline_latency(&self, _: f64) {}
            fn set_correlation_state_entries(&self, _: u64) {}
        }

        let (_dir, rule_path) = write_rule(&event_id_rule("Test Rule", 1));
        let engine = load_engine(rule_path);

        let metrics = Arc::new(CountingMetrics {
            parse_errors: AtomicU64::new(0),
            events_processed: AtomicU64::new(0),
            detection_matches: AtomicU64::new(0),
        });
        let proc = LogProcessor::new(engine, metrics.clone());

        let batch = vec![
            "not json".to_string(),
            r#"{"EventID": 1}"#.to_string(),
            r#"{"EventID": 2}"#.to_string(),
        ];
        proc.process_batch_lines(&batch, &identity_filter);

        assert_eq!(metrics.parse_errors.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.events_processed.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.detection_matches.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn concurrent_swap_and_process() {
        let (_dir, rule_path) = write_rule(&event_id_rule("Rule A", 1));
        let proc = Arc::new(LogProcessor::new(
            load_engine(rule_path.clone()),
            Arc::new(NoopMetrics),
        ));

        // Several threads evaluate while one of them swaps the engine.
        let handles: Vec<_> = (0..4)
            .map(|i| {
                let proc = Arc::clone(&proc);
                let rule_path = rule_path.clone();
                std::thread::spawn(move || {
                    let batch = vec![r#"{"EventID": 1}"#.to_string()];
                    for _ in 0..100 {
                        let _ = proc.process_batch_lines(&batch, &identity_filter);
                    }
                    if i == 0 {
                        proc.swap_engine(load_engine(rule_path));
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }
    }

    #[test]
    fn format_json_matches() {
        let (proc, _dir) = make_processor(&event_id_rule("Test Rule", 1));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "JSON EventID=1 should match"
        );
    }

    #[test]
    fn format_syslog_extracts_fields() {
        let (proc, _dir) = make_processor(
            r#"
title: Syslog Test
status: test
logsource:
    category: test
detection:
    selection:
        hostname: mymachine
    condition: selection
"#,
        );

        let batch = vec!["<34>Oct 11 22:14:15 mymachine su: test message".to_string()];
        let results = proc.process_batch_with_format(
            &batch,
            &InputFormat::Syslog(crate::input::SyslogConfig::default()),
            None,
        );
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "syslog hostname=mymachine should match"
        );
    }

    #[test]
    fn format_plain_keyword_match() {
        let (proc, _dir) = make_processor(
            r#"
title: Keyword Test
status: test
logsource:
    category: test
detection:
    keywords:
        - "disk full"
    condition: keywords
"#,
        );

        let batch = vec!["ERROR: disk full on /dev/sda1".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Plain, None);
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "plain keyword 'disk full' should match"
        );
    }

    #[test]
    fn format_auto_detects_json() {
        let (proc, _dir) = make_processor(&event_id_rule("Test Rule", 1));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::default(), None);
        assert_eq!(results.len(), 1);
        assert!(!results[0].detections.is_empty());
    }

    #[test]
    fn format_json_with_event_filter() {
        let (proc, _dir) = make_processor(&event_id_rule("Test Rule", 1));

        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
        let results =
            proc.process_batch_with_format(&batch, &InputFormat::Json, Some(&records_filter));
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].detections.len(),
            1,
            "only EventID=1 from records array should match"
        );
    }

    #[test]
    fn format_empty_lines_skipped() {
        let (proc, _dir) = make_processor(&event_id_rule("Test Rule", 1));

        let batch = vec![
            "".to_string(),
            "   ".to_string(),
            r#"{"EventID": 1}"#.to_string(),
        ];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
        assert_eq!(results.len(), 3);
        assert!(results[0].detections.is_empty());
        assert!(results[1].detections.is_empty());
        assert!(!results[2].detections.is_empty());
    }

    #[cfg(feature = "logfmt")]
    #[test]
    fn format_logfmt_matches() {
        let (proc, _dir) = make_processor(
            r#"
title: Logfmt Test
status: test
logsource:
    category: test
detection:
    selection:
        level: error
    condition: selection
"#,
        );

        let batch = vec!["level=error msg=something host=web01".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Logfmt, None);
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "logfmt level=error should match"
        );
    }

    #[cfg(feature = "cef")]
    #[test]
    fn format_cef_matches() {
        let (proc, _dir) = make_processor(
            r#"
title: CEF Test
status: test
logsource:
    category: test
detection:
    selection:
        deviceVendor: Security
    condition: selection
"#,
        );

        let batch = vec!["CEF:0|Security|IDS|1.0|100|Attack|9|src=10.0.0.1".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Cef, None);
        assert_eq!(results.len(), 1);
        assert!(
            !results[0].detections.is_empty(),
            "CEF deviceVendor=Security should match"
        );
    }
}