1use std::sync::Arc;
2
3use parking_lot::Mutex;
4use std::time::Instant;
5
6use arc_swap::ArcSwap;
7use rsigma_eval::{Event, FieldObserver, JsonEvent, ProcessResult, ProcessResultExt, RuleFieldSet};
8
9use crate::engine::RuntimeEngine;
10use crate::input::{EventInputDecoded, InputFormat, parse_line};
11use crate::metrics::MetricsHook;
12use crate::tap::{TapPayload, TapRegistry, TapStage};
13
14pub type EventFilter = dyn Fn(&serde_json::Value) -> Vec<serde_json::Value>;
19
20pub struct LogProcessor {
28 engine: Arc<ArcSwap<Mutex<RuntimeEngine>>>,
29 metrics: Arc<dyn MetricsHook>,
30 field_observer: ArcSwap<Option<Arc<FieldObserver>>>,
35 event_tap: ArcSwap<Option<Arc<TapRegistry>>>,
41}
42
43impl LogProcessor {
44 pub fn new(engine: RuntimeEngine, metrics: Arc<dyn MetricsHook>) -> Self {
46 LogProcessor {
47 engine: Arc::new(ArcSwap::from_pointee(Mutex::new(engine))),
48 metrics,
49 field_observer: ArcSwap::new(Arc::new(None)),
50 event_tap: ArcSwap::new(Arc::new(None)),
51 }
52 }
53
54 pub fn set_field_observer(&self, observer: Option<Arc<FieldObserver>>) {
62 self.field_observer.store(Arc::new(observer));
63 }
64
65 pub fn field_observer(&self) -> Option<Arc<FieldObserver>> {
67 self.field_observer.load_full().as_ref().clone()
68 }
69
70 pub fn set_event_tap(&self, tap: Option<Arc<TapRegistry>>) {
79 self.event_tap.store(Arc::new(tap));
80 }
81
82 pub fn event_tap(&self) -> Option<Arc<TapRegistry>> {
84 self.event_tap.load_full().as_ref().clone()
85 }
86
87 pub fn swap_engine(&self, new_engine: RuntimeEngine) {
93 self.engine.store(Arc::new(Mutex::new(new_engine)));
94 }
95
96 pub fn engine_snapshot(&self) -> arc_swap::Guard<Arc<Mutex<RuntimeEngine>>> {
101 self.engine.load()
102 }
103
104 pub fn process_batch_lines(
114 &self,
115 batch: &[String],
116 event_filter: &EventFilter,
117 ) -> Vec<ProcessResult> {
118 let engine_guard = self.engine.load();
119 let mut engine = engine_guard.lock();
120
121 let mut parsed: Vec<(usize, Vec<serde_json::Value>)> = Vec::with_capacity(batch.len());
123 for (line_idx, line) in batch.iter().enumerate() {
124 match serde_json::from_str::<serde_json::Value>(line) {
125 Ok(value) => {
126 let payloads = event_filter(&value);
127 if !payloads.is_empty() {
128 parsed.push((line_idx, payloads));
129 }
130 }
131 Err(e) => {
132 self.metrics.on_parse_error();
133 tracing::debug!(error = %e, "Invalid JSON on input");
134 }
135 }
136 }
137
138 let mut flat: Vec<(usize, &serde_json::Value)> = Vec::new();
140 for (line_idx, payloads) in &parsed {
141 for payload in payloads {
142 flat.push((*line_idx, payload));
143 }
144 }
145
146 if flat.is_empty() {
147 return empty_results(batch.len());
148 }
149
150 let events: Vec<JsonEvent> = flat.iter().map(|(_, v)| JsonEvent::borrow(v)).collect();
152 let event_refs: Vec<&JsonEvent> = events.iter().collect();
153
154 let start = Instant::now();
155 let batch_results = engine.process_batch(&event_refs);
156 let elapsed = start.elapsed().as_secs_f64();
157 let per_event_latency = elapsed / event_refs.len() as f64;
158
159 let stats = engine.stats();
161 self.metrics
162 .set_correlation_state_entries(stats.state_entries as u64);
163
164 let mut line_results = empty_results(batch.len());
166
167 for ((line_idx, _), result) in flat.iter().zip(batch_results) {
168 self.metrics.on_events_processed(1);
169 self.metrics.observe_processing_latency(per_event_latency);
170 self.metrics
171 .on_detection_matches(result.detection_count() as u64);
172 self.metrics
173 .on_correlation_matches(result.correlation_count() as u64);
174
175 for r in &result {
176 let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
177 let title = &r.header.rule_title;
178 match &r.body {
179 rsigma_eval::ResultBody::Detection(_) => {
180 self.metrics.on_detection_match_detail(title, level_str);
181 }
182 rsigma_eval::ResultBody::Correlation(body) => {
183 self.metrics.on_correlation_match_detail(
184 title,
185 level_str,
186 body.correlation_type.as_str(),
187 );
188 }
189 }
190 }
191
192 line_results[*line_idx].extend(result);
193 }
194
195 line_results
196 }
197
198 pub fn process_batch_with_format(
208 &self,
209 batch: &[String],
210 format: &InputFormat,
211 event_filter: Option<&EventFilter>,
212 ) -> Vec<ProcessResult> {
213 let engine_guard = self.engine.load();
214 let mut engine = engine_guard.lock();
215
216 let tap_guard = self.event_tap.load();
220 let tap_sessions = tap_guard
221 .as_ref()
222 .as_ref()
223 .map(|reg| reg.sessions_snapshot());
224 let tap_has_raw = tap_sessions
225 .as_ref()
226 .is_some_and(|s| s.iter().any(|x| x.stage == TapStage::Raw));
227 let tap_has_decoded = tap_sessions
228 .as_ref()
229 .is_some_and(|s| s.iter().any(|x| x.stage == TapStage::Decoded));
230
231 let mut decoded_events: Vec<(usize, EventInputDecoded)> = Vec::with_capacity(batch.len());
234
235 for (line_idx, line) in batch.iter().enumerate() {
236 if tap_has_raw
239 && !line.trim().is_empty()
240 && let Some(sessions) = tap_sessions.as_ref()
241 {
242 for s in sessions.iter().filter(|s| s.stage == TapStage::Raw) {
243 s.offer(TapPayload::Raw(line.clone()));
244 }
245 }
246
247 let Some(decoded) = parse_line(line, format) else {
248 if !line.trim().is_empty() {
249 self.metrics.on_parse_error();
250 tracing::debug!("Failed to parse input line");
251 }
252 continue;
253 };
254
255 if let Some(filter) = event_filter
258 && let EventInputDecoded::Json(ref json_event) = decoded
259 {
260 let json_value = json_event.to_json();
261 let payloads = filter(&json_value);
262 for payload in payloads {
263 decoded_events
264 .push((line_idx, EventInputDecoded::Json(JsonEvent::owned(payload))));
265 }
266 continue;
267 }
268
269 decoded_events.push((line_idx, decoded));
270 }
271
272 if decoded_events.is_empty() {
273 return empty_results(batch.len());
274 }
275
276 let observer_guard = self.field_observer.load();
283 if let Some(observer) = observer_guard.as_ref() {
284 for (_, decoded) in &decoded_events {
285 observer.observe(decoded);
286 }
287 }
288 drop(observer_guard);
289
290 if tap_has_decoded && let Some(sessions) = tap_sessions.as_ref() {
294 for (_, decoded) in &decoded_events {
295 let value = decoded.to_json();
296 for s in sessions.iter().filter(|s| s.stage == TapStage::Decoded) {
297 s.offer(TapPayload::Decoded(Box::new(value.clone())));
298 }
299 }
300 }
301
302 let event_refs: Vec<&EventInputDecoded> = decoded_events.iter().map(|(_, e)| e).collect();
304
305 let start = Instant::now();
306 let batch_results = engine.process_batch(&event_refs);
307 let elapsed = start.elapsed().as_secs_f64();
308 let per_event_latency = elapsed / event_refs.len() as f64;
309
310 let stats = engine.stats();
311 self.metrics
312 .set_correlation_state_entries(stats.state_entries as u64);
313
314 let mut line_results = empty_results(batch.len());
316
317 for ((line_idx, _), result) in decoded_events.iter().zip(batch_results) {
318 self.metrics.on_events_processed(1);
319 self.metrics.observe_processing_latency(per_event_latency);
320 self.metrics
321 .on_detection_matches(result.detection_count() as u64);
322 self.metrics
323 .on_correlation_matches(result.correlation_count() as u64);
324
325 for r in &result {
326 let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
327 let title = &r.header.rule_title;
328 match &r.body {
329 rsigma_eval::ResultBody::Detection(_) => {
330 self.metrics.on_detection_match_detail(title, level_str);
331 }
332 rsigma_eval::ResultBody::Correlation(body) => {
333 self.metrics.on_correlation_match_detail(
334 title,
335 level_str,
336 body.correlation_type.as_str(),
337 );
338 }
339 }
340 }
341
342 line_results[*line_idx].extend(result);
343 }
344
345 line_results
346 }
347
348 pub fn reload_rules(&self) -> Result<crate::engine::EngineStats, String> {
358 let snapshot = self.engine.load();
365 let old = snapshot.lock();
366 let old_state = old.export_state();
367 let rules_path = old.rules_path().to_path_buf();
368 let pipelines = old.pipelines().to_vec();
369 let pipeline_paths = old.pipeline_paths().to_vec();
370 let corr_config = old.corr_config().clone();
371 let include_event = old.include_event();
372 let resolver = old.source_resolver().cloned();
373 let allow_remote_include = old.allow_remote_include();
374 let bloom_prefilter = old.bloom_prefilter();
375 let bloom_max_bytes = old.bloom_max_bytes();
376 let match_detail = old.match_detail();
377 #[cfg(feature = "daachorse-index")]
378 let cross_rule_ac = old.cross_rule_ac();
379 drop(old);
380 drop(snapshot);
381
382 let mut new_engine = RuntimeEngine::new(rules_path, pipelines, corr_config, include_event);
383 new_engine.set_pipeline_paths(pipeline_paths);
384 new_engine.set_allow_remote_include(allow_remote_include);
385 new_engine.set_match_detail(match_detail);
386 new_engine.set_bloom_prefilter(bloom_prefilter);
387 if let Some(budget) = bloom_max_bytes {
388 new_engine.set_bloom_max_bytes(budget);
389 }
390 #[cfg(feature = "daachorse-index")]
391 new_engine.set_cross_rule_ac(cross_rule_ac);
392 if let Some(resolver) = resolver {
393 new_engine.set_source_resolver(resolver);
394 }
395 let stats = new_engine.load_rules()?;
396
397 if let Some(state) = old_state
398 && !new_engine.import_state(&state)
399 {
400 tracing::warn!(
401 "Incompatible correlation snapshot version during reload, starting fresh"
402 );
403 }
404
405 self.swap_engine(new_engine);
406 Ok(stats)
407 }
408
409 pub fn rules_path(&self) -> std::path::PathBuf {
411 let snapshot = self.engine.load();
412 let engine = snapshot.lock();
413 engine.rules_path().to_path_buf()
414 }
415
416 pub fn metrics(&self) -> &dyn MetricsHook {
418 &*self.metrics
419 }
420
421 pub fn export_state(&self) -> Option<rsigma_eval::CorrelationSnapshot> {
423 let snapshot = self.engine.load();
424 let engine = snapshot.lock();
425 engine.export_state()
426 }
427
428 pub fn import_state(&self, snapshot: &rsigma_eval::CorrelationSnapshot) -> bool {
430 let guard = self.engine.load();
431 let mut engine = guard.lock();
432 engine.import_state(snapshot)
433 }
434
435 pub fn stats(&self) -> crate::engine::EngineStats {
437 let snapshot = self.engine.load();
438 let engine = snapshot.lock();
439 engine.stats()
440 }
441
442 pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
446 let snapshot = self.engine.load();
447 let engine = snapshot.lock();
448 engine.rule_field_set()
449 }
450}
451
452fn empty_results(count: usize) -> Vec<ProcessResult> {
454 (0..count).map(|_| ProcessResult::new()).collect()
455}
456
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::NoopMetrics;
    use rsigma_eval::CorrelationConfig;
    use std::path::{Path, PathBuf};
    use tempfile::TempDir;

    // Fixtures hand their `TempDir` back to the test so the directory lives
    // exactly as long as the test needs it and is removed afterwards, instead
    // of being leaked on disk with `std::mem::forget`.

    /// Passes every decoded JSON value through unchanged.
    fn identity_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
        vec![v.clone()]
    }

    /// Renders a Sigma rule whose `selection` consists of the single
    /// `field: value` line given in `selection`.
    fn detection_rule(title: &str, selection: &str) -> String {
        format!(
            r#"
title: {title}
status: test
logsource:
    category: test
detection:
    selection:
        {selection}
    condition: selection
"#
        )
    }

    /// Renders a processing pipeline that renames `SourceIP` to `target`.
    fn field_mapping_pipeline(name: &str, target: &str) -> String {
        format!(
            r#"
name: {name}
priority: 10
transformations:
    - id: rename_field
      type: field_name_mapping
      mapping:
          SourceIP: {target}
"#
        )
    }

    /// Writes `contents` to `path`, panicking on I/O failure.
    fn write_file(path: &Path, contents: &str) {
        std::fs::write(path, contents).unwrap();
    }

    /// Creates a temp dir containing `test.yml` with `rules_yaml`. Dropping the
    /// returned `TempDir` deletes the directory, so keep it bound (e.g. `_dir`).
    fn rules_dir(rules_yaml: &str) -> (TempDir, PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        write_file(&rule_path, rules_yaml);
        (dir, rule_path)
    }

    /// Builds an engine for `rule_path` (no pipelines, default correlation
    /// config, events not included) and loads its rules.
    fn loaded_engine(rule_path: PathBuf) -> RuntimeEngine {
        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
        engine.load_rules().unwrap();
        engine
    }

    /// Builds a processor for `rules_yaml`, returning the temp dir that backs
    /// its rule file alongside it.
    fn make_processor(rules_yaml: &str) -> (LogProcessor, TempDir) {
        let (dir, rule_path) = rules_dir(rules_yaml);
        let proc = LogProcessor::new(loaded_engine(rule_path), Arc::new(NoopMetrics));
        (proc, dir)
    }

    /// Processor for the `EventID: 1` rule shared by most tests.
    fn event_id_1_processor() -> (LogProcessor, TempDir) {
        make_processor(&detection_rule("Test Rule", "EventID: 1"))
    }

    /// Processor whose rule matches `SourceIP: "10.0.0.1"` through the pipeline
    /// file `<dir>/pipeline.yml`, which initially maps `SourceIP` to `target`.
    /// Returns the processor, its temp dir and the pipeline file path.
    fn pipeline_processor(pipeline_name: &str, target: &str) -> (LogProcessor, TempDir, PathBuf) {
        let (dir, rule_path) = rules_dir(&detection_rule("Rule A", "SourceIP: \"10.0.0.1\""));
        let pipeline_path = dir.path().join("pipeline.yml");
        write_file(&pipeline_path, &field_mapping_pipeline(pipeline_name, target));

        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
        let mut engine = RuntimeEngine::new(rule_path, pipelines, CorrelationConfig::default(), false);
        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
        engine.load_rules().unwrap();

        (LogProcessor::new(engine, Arc::new(NoopMetrics)), dir, pipeline_path)
    }

    #[test]
    fn process_batch_lines_valid_json() {
        let (proc, _dir) = event_id_1_processor();

        let batch = vec![
            r#"{"EventID": 1}"#.to_string(),
            r#"{"EventID": 2}"#.to_string(),
        ];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert_eq!(results.len(), 2);
        assert!(results[0].detection_count() > 0, "EventID=1 should match");
        assert!(
            results[1].detection_count() == 0,
            "EventID=2 should not match"
        );
    }

    #[test]
    fn process_batch_lines_invalid_json() {
        let (proc, _dir) = event_id_1_processor();

        let batch = vec!["not json".to_string(), r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert_eq!(results.len(), 2);
        assert!(
            results[0].detection_count() == 0,
            "invalid JSON produces empty result"
        );
        assert!(results[1].detection_count() > 0, "valid line still matches");
    }

    #[test]
    fn swap_engine_replaces_rules() {
        let (_dir, rule_path) = rules_dir(&detection_rule("Rule A", "EventID: 1"));
        let proc = LogProcessor::new(loaded_engine(rule_path.clone()), Arc::new(NoopMetrics));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);

        write_file(&rule_path, &detection_rule("Rule B", "EventID: 99"));
        proc.swap_engine(loaded_engine(rule_path));

        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);

        let batch2 = vec![r#"{"EventID": 99}"#.to_string()];
        assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);
    }

    #[test]
    fn reload_rules_preserves_bloom_tuning() {
        let (_dir, rule_path) = rules_dir(&detection_rule("Rule A", "EventID: 1"));

        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
        engine.set_bloom_prefilter(true);
        engine.set_bloom_max_bytes(2 * 1024 * 1024);
        #[cfg(feature = "daachorse-index")]
        engine.set_cross_rule_ac(true);
        engine.load_rules().unwrap();

        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
        proc.reload_rules().unwrap();

        let snapshot = proc.engine_snapshot();
        let reloaded = snapshot.lock();
        assert!(
            reloaded.bloom_prefilter(),
            "bloom_prefilter must survive reload_rules"
        );
        assert_eq!(
            reloaded.bloom_max_bytes(),
            Some(2 * 1024 * 1024),
            "bloom_max_bytes must survive reload_rules"
        );
        #[cfg(feature = "daachorse-index")]
        assert!(
            reloaded.cross_rule_ac(),
            "cross_rule_ac must survive reload_rules"
        );
    }

    #[test]
    fn reload_rules_preserves_engine() {
        let (_dir, rule_path) = rules_dir(&detection_rule("Rule A", "EventID: 1"));
        let proc = LogProcessor::new(loaded_engine(rule_path.clone()), Arc::new(NoopMetrics));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);

        write_file(&rule_path, &detection_rule("Rule B", "EventID: 42"));

        let stats = proc.reload_rules().unwrap();
        assert_eq!(stats.detection_rules, 1);

        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);
        let batch2 = vec![r#"{"EventID": 42}"#.to_string()];
        assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);
    }

    #[test]
    fn reload_re_reads_pipelines_from_disk() {
        let (proc, _dir, pipeline_path) = pipeline_processor("Initial Pipeline", "src_ip");

        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
        assert!(
            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
            "src_ip should match because pipeline mapped SourceIP -> src_ip"
        );

        write_file(
            &pipeline_path,
            &field_mapping_pipeline("Updated Pipeline", "source.ip"),
        );

        proc.reload_rules().unwrap();

        assert!(
            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0,
            "after pipeline reload, src_ip should no longer match"
        );

        let batch2 = vec![r#"{"source.ip": "10.0.0.1"}"#.to_string()];
        assert!(
            proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0,
            "after pipeline reload, source.ip should match"
        );
    }

    #[test]
    fn reload_with_broken_pipeline_keeps_old_engine() {
        let (proc, _dir, pipeline_path) = pipeline_processor("Working Pipeline", "src_ip");

        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);

        write_file(&pipeline_path, "{{{{ invalid yaml !!!!");

        let result = proc.reload_rules();
        assert!(result.is_err(), "reload with broken pipeline should fail");

        assert!(
            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
            "old engine should still work after failed reload"
        );
    }

    #[test]
    fn custom_event_filter() {
        let (proc, _dir) = event_id_1_processor();

        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
                records.clone()
            } else {
                vec![v.clone()]
            }
        };

        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
        let results = proc.process_batch_lines(&batch, &filter);
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].detection_count(),
            1,
            "only EventID=1 from records array should match"
        );
    }

    #[test]
    fn empty_batch_returns_empty() {
        let (proc, _dir) = event_id_1_processor();

        let batch: Vec<String> = vec![];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert!(results.is_empty());
    }

    #[test]
    fn metrics_hook_invocations() {
        use std::sync::atomic::{AtomicU64, Ordering};

        struct CountingMetrics {
            parse_errors: AtomicU64,
            events_processed: AtomicU64,
            detection_matches: AtomicU64,
        }

        impl MetricsHook for CountingMetrics {
            fn on_parse_error(&self) {
                self.parse_errors.fetch_add(1, Ordering::Relaxed);
            }
            fn on_events_processed(&self, count: u64) {
                self.events_processed.fetch_add(count, Ordering::Relaxed);
            }
            fn on_detection_matches(&self, count: u64) {
                self.detection_matches.fetch_add(count, Ordering::Relaxed);
            }
            fn on_correlation_matches(&self, _: u64) {}
            fn observe_processing_latency(&self, _: f64) {}
            fn on_input_queue_depth_change(&self, _: i64) {}
            fn on_back_pressure(&self) {}
            fn observe_batch_size(&self, _: u64) {}
            fn on_output_queue_depth_change(&self, _: i64) {}
            fn observe_pipeline_latency(&self, _: f64) {}
            fn set_correlation_state_entries(&self, _: u64) {}
        }

        let (_dir, rule_path) = rules_dir(&detection_rule("Test Rule", "EventID: 1"));

        let metrics = Arc::new(CountingMetrics {
            parse_errors: AtomicU64::new(0),
            events_processed: AtomicU64::new(0),
            detection_matches: AtomicU64::new(0),
        });
        let proc = LogProcessor::new(loaded_engine(rule_path), metrics.clone());

        let batch = vec![
            "not json".to_string(),
            r#"{"EventID": 1}"#.to_string(),
            r#"{"EventID": 2}"#.to_string(),
        ];
        proc.process_batch_lines(&batch, &identity_filter);

        assert_eq!(metrics.parse_errors.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.events_processed.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.detection_matches.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn concurrent_swap_and_process() {
        let (_dir, rule_path) = rules_dir(&detection_rule("Rule A", "EventID: 1"));
        let proc = Arc::new(LogProcessor::new(
            loaded_engine(rule_path.clone()),
            Arc::new(NoopMetrics),
        ));

        let handles: Vec<_> = (0..4)
            .map(|i| {
                let proc = Arc::clone(&proc);
                let rule_path = rule_path.clone();
                std::thread::spawn(move || {
                    let batch = vec![r#"{"EventID": 1}"#.to_string()];
                    for _ in 0..100 {
                        let _ = proc.process_batch_lines(&batch, &identity_filter);
                    }
                    // One thread hot-swaps the engine while the others may
                    // still be processing.
                    if i == 0 {
                        proc.swap_engine(loaded_engine(rule_path));
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }
    }

    #[test]
    fn format_json_matches() {
        let (proc, _dir) = event_id_1_processor();

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "JSON EventID=1 should match"
        );
    }

    #[test]
    fn format_syslog_extracts_fields() {
        let (proc, _dir) = make_processor(&detection_rule("Syslog Test", "hostname: mymachine"));

        let batch = vec!["<34>Oct 11 22:14:15 mymachine su: test message".to_string()];
        let results = proc.process_batch_with_format(
            &batch,
            &InputFormat::Syslog(crate::input::SyslogConfig::default()),
            None,
        );
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "syslog hostname=mymachine should match"
        );
    }

    #[test]
    fn format_plain_keyword_match() {
        let (proc, _dir) = make_processor(
            r#"
title: Keyword Test
status: test
logsource:
    category: test
detection:
    keywords:
        - "disk full"
    condition: keywords
"#,
        );

        let batch = vec!["ERROR: disk full on /dev/sda1".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Plain, None);
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "plain keyword 'disk full' should match"
        );
    }

    #[test]
    fn format_auto_detects_json() {
        let (proc, _dir) = event_id_1_processor();

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::default(), None);
        assert_eq!(results.len(), 1);
        assert!(results[0].detection_count() > 0);
    }

    #[test]
    fn format_json_with_event_filter() {
        let (proc, _dir) = event_id_1_processor();

        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
                records.clone()
            } else {
                vec![v.clone()]
            }
        };

        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, Some(&filter));
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].detection_count(),
            1,
            "only EventID=1 from records array should match"
        );
    }

    #[test]
    fn format_empty_lines_skipped() {
        let (proc, _dir) = event_id_1_processor();

        let batch = vec![
            "".to_string(),
            "   ".to_string(),
            r#"{"EventID": 1}"#.to_string(),
        ];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
        assert_eq!(results.len(), 3);
        assert!(results[0].detection_count() == 0);
        assert!(results[1].detection_count() == 0);
        assert!(results[2].detection_count() > 0);
    }

    #[cfg(feature = "logfmt")]
    #[test]
    fn format_logfmt_matches() {
        let (proc, _dir) = make_processor(&detection_rule("Logfmt Test", "level: error"));

        let batch = vec!["level=error msg=something host=web01".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Logfmt, None);
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "logfmt level=error should match"
        );
    }

    #[cfg(feature = "cef")]
    #[test]
    fn format_cef_matches() {
        let (proc, _dir) = make_processor(&detection_rule("CEF Test", "deviceVendor: Security"));

        let batch = vec!["CEF:0|Security|IDS|1.0|100|Attack|9|src=10.0.0.1".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Cef, None);
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "CEF deviceVendor=Security should match"
        );
    }
}