1use std::sync::Arc;
2
3use parking_lot::Mutex;
4use std::time::Instant;
5
6use arc_swap::ArcSwap;
7use rsigma_eval::{
8 Event, FieldObserver, JsonEvent, ProcessResult, ProcessResultExt, RuleFieldSet, SchemaObserver,
9};
10
11use crate::engine::RuntimeEngine;
12use crate::input::{EventInputDecoded, InputFormat, parse_line};
13use crate::metrics::MetricsHook;
14use crate::tap::{TapPayload, TapRegistry, TapStage};
15
/// Callback that expands one parsed JSON value into the payloads to evaluate.
///
/// The processor calls the filter once per successfully parsed JSON input and
/// evaluates every returned value as its own event. Returning an empty `Vec`
/// means nothing from that input is evaluated.
pub type EventFilter = dyn Fn(&serde_json::Value) -> Vec<serde_json::Value>;
21
/// Feeds batches of raw input lines through a replaceable [`RuntimeEngine`]
/// and reports what happened to a [`MetricsHook`].
pub struct LogProcessor {
    /// Current engine. The whole `Mutex<RuntimeEngine>` is replaced on
    /// `swap_engine`; the mutex guards access to one engine instance.
    engine: Arc<ArcSwap<Mutex<RuntimeEngine>>>,
    /// Sink for parse-error, throughput, latency and match metrics.
    metrics: Arc<dyn MetricsHook>,
    /// Optional observer that is shown every decoded event in
    /// `process_batch_with_format`.
    field_observer: ArcSwap<Option<Arc<FieldObserver>>>,
    /// Optional second observer, also shown every decoded event in
    /// `process_batch_with_format`.
    schema_observer: ArcSwap<Option<Arc<SchemaObserver>>>,
    /// Optional tap registry whose sessions are offered raw lines and/or
    /// decoded events in `process_batch_with_format`.
    event_tap: ArcSwap<Option<Arc<TapRegistry>>>,
}
49
50impl LogProcessor {
51 pub fn new(engine: RuntimeEngine, metrics: Arc<dyn MetricsHook>) -> Self {
53 LogProcessor {
54 engine: Arc::new(ArcSwap::from_pointee(Mutex::new(engine))),
55 metrics,
56 field_observer: ArcSwap::new(Arc::new(None)),
57 schema_observer: ArcSwap::new(Arc::new(None)),
58 event_tap: ArcSwap::new(Arc::new(None)),
59 }
60 }
61
62 pub fn set_field_observer(&self, observer: Option<Arc<FieldObserver>>) {
70 self.field_observer.store(Arc::new(observer));
71 }
72
73 pub fn field_observer(&self) -> Option<Arc<FieldObserver>> {
75 self.field_observer.load_full().as_ref().clone()
76 }
77
78 pub fn set_schema_observer(&self, observer: Option<Arc<SchemaObserver>>) {
86 self.schema_observer.store(Arc::new(observer));
87 }
88
89 pub fn schema_observer(&self) -> Option<Arc<SchemaObserver>> {
91 self.schema_observer.load_full().as_ref().clone()
92 }
93
94 pub fn set_event_tap(&self, tap: Option<Arc<TapRegistry>>) {
103 self.event_tap.store(Arc::new(tap));
104 }
105
106 pub fn event_tap(&self) -> Option<Arc<TapRegistry>> {
108 self.event_tap.load_full().as_ref().clone()
109 }
110
111 pub fn swap_engine(&self, new_engine: RuntimeEngine) {
117 self.engine.store(Arc::new(Mutex::new(new_engine)));
118 }
119
120 pub fn engine_snapshot(&self) -> arc_swap::Guard<Arc<Mutex<RuntimeEngine>>> {
125 self.engine.load()
126 }
127
128 pub fn process_batch_lines(
138 &self,
139 batch: &[String],
140 event_filter: &EventFilter,
141 ) -> Vec<ProcessResult> {
142 let engine_guard = self.engine.load();
143 let mut engine = engine_guard.lock();
144
145 let mut parsed: Vec<(usize, Vec<serde_json::Value>)> = Vec::with_capacity(batch.len());
147 for (line_idx, line) in batch.iter().enumerate() {
148 match serde_json::from_str::<serde_json::Value>(line) {
149 Ok(value) => {
150 let payloads = event_filter(&value);
151 if !payloads.is_empty() {
152 parsed.push((line_idx, payloads));
153 }
154 }
155 Err(e) => {
156 self.metrics.on_parse_error();
157 tracing::debug!(error = %e, "Invalid JSON on input");
158 }
159 }
160 }
161
162 let mut flat: Vec<(usize, &serde_json::Value)> = Vec::new();
164 for (line_idx, payloads) in &parsed {
165 for payload in payloads {
166 flat.push((*line_idx, payload));
167 }
168 }
169
170 if flat.is_empty() {
171 return empty_results(batch.len());
172 }
173
174 let events: Vec<JsonEvent> = flat.iter().map(|(_, v)| JsonEvent::borrow(v)).collect();
176 let event_refs: Vec<&JsonEvent> = events.iter().collect();
177
178 let start = Instant::now();
179 let batch_results = engine.process_batch(&event_refs);
180 let elapsed = start.elapsed().as_secs_f64();
181 let per_event_latency = elapsed / event_refs.len() as f64;
182
183 let stats = engine.stats();
185 self.metrics
186 .set_correlation_state_entries(stats.state_entries as u64);
187
188 let mut line_results = empty_results(batch.len());
190
191 for ((line_idx, _), result) in flat.iter().zip(batch_results) {
192 self.metrics.on_events_processed(1);
193 self.metrics.observe_processing_latency(per_event_latency);
194 self.metrics
195 .on_detection_matches(result.detection_count() as u64);
196 self.metrics
197 .on_correlation_matches(result.correlation_count() as u64);
198
199 for r in &result {
200 let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
201 let title = &r.header.rule_title;
202 match &r.body {
203 rsigma_eval::ResultBody::Detection(_) => {
204 self.metrics.on_detection_match_detail(title, level_str);
205 }
206 rsigma_eval::ResultBody::Correlation(body) => {
207 self.metrics.on_correlation_match_detail(
208 title,
209 level_str,
210 body.correlation_type.as_str(),
211 );
212 }
213 }
214 }
215
216 line_results[*line_idx].extend(result);
217 }
218
219 line_results
220 }
221
222 pub fn process_batch_with_format(
232 &self,
233 batch: &[String],
234 format: &InputFormat,
235 event_filter: Option<&EventFilter>,
236 ) -> Vec<ProcessResult> {
237 let engine_guard = self.engine.load();
238 let mut engine = engine_guard.lock();
239
240 let tap_guard = self.event_tap.load();
244 let tap_sessions = tap_guard
245 .as_ref()
246 .as_ref()
247 .map(|reg| reg.sessions_snapshot());
248 let tap_has_raw = tap_sessions
249 .as_ref()
250 .is_some_and(|s| s.iter().any(|x| x.stage == TapStage::Raw));
251 let tap_has_decoded = tap_sessions
252 .as_ref()
253 .is_some_and(|s| s.iter().any(|x| x.stage == TapStage::Decoded));
254
255 let mut decoded_events: Vec<(usize, EventInputDecoded)> = Vec::with_capacity(batch.len());
258
259 for (line_idx, line) in batch.iter().enumerate() {
260 if tap_has_raw
263 && !line.trim().is_empty()
264 && let Some(sessions) = tap_sessions.as_ref()
265 {
266 for s in sessions.iter().filter(|s| s.stage == TapStage::Raw) {
267 s.offer(TapPayload::Raw(line.clone()));
268 }
269 }
270
271 let Some(decoded) = parse_line(line, format) else {
272 if !line.trim().is_empty() {
273 self.metrics.on_parse_error();
274 tracing::debug!("Failed to parse input line");
275 }
276 continue;
277 };
278
279 if let Some(filter) = event_filter
282 && let EventInputDecoded::Json(ref json_event) = decoded
283 {
284 let json_value = json_event.to_json();
285 let payloads = filter(&json_value);
286 for payload in payloads {
287 decoded_events
288 .push((line_idx, EventInputDecoded::Json(JsonEvent::owned(payload))));
289 }
290 continue;
291 }
292
293 decoded_events.push((line_idx, decoded));
294 }
295
296 if decoded_events.is_empty() {
297 return empty_results(batch.len());
298 }
299
300 let observer_guard = self.field_observer.load();
307 if let Some(observer) = observer_guard.as_ref() {
308 for (_, decoded) in &decoded_events {
309 observer.observe(decoded);
310 }
311 }
312 drop(observer_guard);
313
314 let schema_guard = self.schema_observer.load();
317 if let Some(observer) = schema_guard.as_ref() {
318 for (_, decoded) in &decoded_events {
319 observer.observe(decoded);
320 }
321 }
322 drop(schema_guard);
323
324 if tap_has_decoded && let Some(sessions) = tap_sessions.as_ref() {
328 for (_, decoded) in &decoded_events {
329 let value = decoded.to_json();
330 for s in sessions.iter().filter(|s| s.stage == TapStage::Decoded) {
331 s.offer(TapPayload::Decoded(Box::new(value.clone())));
332 }
333 }
334 }
335
336 let event_refs: Vec<&EventInputDecoded> = decoded_events.iter().map(|(_, e)| e).collect();
338
339 let start = Instant::now();
340 let batch_results = engine.process_batch(&event_refs);
341 let elapsed = start.elapsed().as_secs_f64();
342 let per_event_latency = elapsed / event_refs.len() as f64;
343
344 let stats = engine.stats();
345 self.metrics
346 .set_correlation_state_entries(stats.state_entries as u64);
347
348 let mut line_results = empty_results(batch.len());
350
351 for ((line_idx, _), result) in decoded_events.iter().zip(batch_results) {
352 self.metrics.on_events_processed(1);
353 self.metrics.observe_processing_latency(per_event_latency);
354 self.metrics
355 .on_detection_matches(result.detection_count() as u64);
356 self.metrics
357 .on_correlation_matches(result.correlation_count() as u64);
358
359 for r in &result {
360 let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
361 let title = &r.header.rule_title;
362 match &r.body {
363 rsigma_eval::ResultBody::Detection(_) => {
364 self.metrics.on_detection_match_detail(title, level_str);
365 }
366 rsigma_eval::ResultBody::Correlation(body) => {
367 self.metrics.on_correlation_match_detail(
368 title,
369 level_str,
370 body.correlation_type.as_str(),
371 );
372 }
373 }
374 }
375
376 line_results[*line_idx].extend(result);
377 }
378
379 line_results
380 }
381
382 pub fn reload_rules(&self) -> Result<crate::engine::EngineStats, String> {
392 let snapshot = self.engine.load();
399 let old = snapshot.lock();
400 let old_state = old.export_state();
401 let rules_path = old.rules_path().to_path_buf();
402 let pipelines = old.pipelines().to_vec();
403 let pipeline_paths = old.pipeline_paths().to_vec();
404 let corr_config = old.corr_config().clone();
405 let include_event = old.include_event();
406 let resolver = old.source_resolver().cloned();
407 let allow_remote_include = old.allow_remote_include();
408 let bloom_prefilter = old.bloom_prefilter();
409 let bloom_max_bytes = old.bloom_max_bytes();
410 let match_detail = old.match_detail();
411 let routing = old.routing();
412 let logsource_extractor = old.logsource_extractor();
413 #[cfg(feature = "daachorse-index")]
414 let cross_rule_ac = old.cross_rule_ac();
415 drop(old);
416 drop(snapshot);
417
418 let mut new_engine = RuntimeEngine::new(rules_path, pipelines, corr_config, include_event);
419 new_engine.set_pipeline_paths(pipeline_paths);
420 new_engine.set_allow_remote_include(allow_remote_include);
421 new_engine.set_match_detail(match_detail);
422 new_engine.set_bloom_prefilter(bloom_prefilter);
423 if let Some(budget) = bloom_max_bytes {
424 new_engine.set_bloom_max_bytes(budget);
425 }
426 #[cfg(feature = "daachorse-index")]
427 new_engine.set_cross_rule_ac(cross_rule_ac);
428 if let Some(resolver) = resolver {
429 new_engine.set_source_resolver(resolver);
430 }
431 new_engine.set_routing(routing);
434 new_engine.set_logsource_extractor(logsource_extractor);
436 let stats = new_engine.load_rules()?;
437
438 if let Some(state) = old_state
439 && !new_engine.import_state(&state)
440 {
441 tracing::warn!(
442 "Incompatible correlation snapshot version during reload, starting fresh"
443 );
444 }
445
446 self.swap_engine(new_engine);
447 Ok(stats)
448 }
449
450 pub fn rules_path(&self) -> std::path::PathBuf {
452 let snapshot = self.engine.load();
453 let engine = snapshot.lock();
454 engine.rules_path().to_path_buf()
455 }
456
457 pub fn metrics(&self) -> &dyn MetricsHook {
459 &*self.metrics
460 }
461
462 pub fn export_state(&self) -> Option<rsigma_eval::CorrelationSnapshot> {
464 let snapshot = self.engine.load();
465 let engine = snapshot.lock();
466 engine.export_state()
467 }
468
469 pub fn import_state(&self, snapshot: &rsigma_eval::CorrelationSnapshot) -> bool {
471 let guard = self.engine.load();
472 let mut engine = guard.lock();
473 engine.import_state(snapshot)
474 }
475
476 pub fn stats(&self) -> crate::engine::EngineStats {
478 let snapshot = self.engine.load();
479 let engine = snapshot.lock();
480 engine.stats()
481 }
482
483 pub fn introspect_correlations(
487 &self,
488 id: Option<&str>,
489 group: Option<&str>,
490 ) -> Option<rsigma_eval::CorrelationStateSnapshot> {
491 let snapshot = self.engine.load();
492 let engine = snapshot.lock();
493 engine.introspect_correlations(id, group)
494 }
495
496 pub fn logsource_pruned_total(&self) -> u64 {
498 let snapshot = self.engine.load();
499 let engine = snapshot.lock();
500 engine.logsource_pruned_total()
501 }
502
503 pub fn logsource_absent_total(&self) -> u64 {
505 let snapshot = self.engine.load();
506 let engine = snapshot.lock();
507 engine.logsource_absent_total()
508 }
509
510 pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
514 let snapshot = self.engine.load();
515 let engine = snapshot.lock();
516 engine.rule_field_set()
517 }
518}
519
520fn empty_results(count: usize) -> Vec<ProcessResult> {
522 (0..count).map(|_| ProcessResult::new()).collect()
523}
524
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};

    use tempfile::TempDir;

    use super::*;
    use crate::metrics::NoopMetrics;
    use rsigma_eval::CorrelationConfig;

    // Fixtures.
    //
    // Every helper that creates a temp directory hands the `TempDir` back to
    // the caller. Tests bind it (`_dir`) so the directory lives exactly as
    // long as the test and is removed on drop, instead of being leaked.

    /// Builds a minimal rule whose only selection is `field: value`.
    ///
    /// `value` is spliced in verbatim, so string values must carry their own
    /// YAML quoting.
    fn selection_rule(title: &str, field: &str, value: &str) -> String {
        format!(
            r#"
title: {title}
status: test
logsource:
    category: test
detection:
    selection:
        {field}: {value}
    condition: selection
"#
        )
    }

    /// The rule most tests use: matches `EventID: 1`.
    fn event_id_rule() -> String {
        selection_rule("Test Rule", "EventID", "1")
    }

    /// Builds a pipeline that renames `SourceIP` to `target`.
    fn mapping_pipeline(name: &str, target: &str) -> String {
        format!(
            r#"
name: {name}
priority: 10
transformations:
  - id: rename_field
    type: field_name_mapping
    mapping:
      SourceIP: {target}
"#
        )
    }

    /// Filter that passes every value through unchanged.
    fn identity_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
        vec![v.clone()]
    }

    /// Filter that unwraps a top-level `records` array into separate events
    /// and passes anything else through unchanged.
    fn records_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
        if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
            records.clone()
        } else {
            vec![v.clone()]
        }
    }

    /// Writes `yaml` to `test.yml` inside `dir` and returns its path.
    fn write_rule(dir: &TempDir, yaml: &str) -> PathBuf {
        let rule_path = dir.path().join("test.yml");
        std::fs::write(&rule_path, yaml).unwrap();
        rule_path
    }

    /// Creates an engine with default settings and loads the rules at `rule_path`.
    fn load_engine(rule_path: &Path) -> RuntimeEngine {
        let mut engine = RuntimeEngine::new(
            rule_path.to_path_buf(),
            vec![],
            CorrelationConfig::default(),
            false,
        );
        engine.load_rules().unwrap();
        engine
    }

    /// Builds a processor over a rule file and also returns the file's path.
    fn make_processor_with_path(rules_yaml: &str) -> (LogProcessor, PathBuf, TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = write_rule(&dir, rules_yaml);
        let proc = LogProcessor::new(load_engine(&rule_path), Arc::new(NoopMetrics));
        (proc, rule_path, dir)
    }

    /// Builds a processor over a rule file.
    fn make_processor(rules_yaml: &str) -> (LogProcessor, TempDir) {
        let (proc, _rule_path, dir) = make_processor_with_path(rules_yaml);
        (proc, dir)
    }

    /// Builds a processor whose rule selects `SourceIP: "10.0.0.1"` behind a
    /// pipeline that renames `SourceIP` to `src_ip`. Returns the pipeline path
    /// so tests can rewrite it before reloading.
    fn make_pipeline_processor(pipeline_name: &str) -> (LogProcessor, PathBuf, TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = write_rule(
            &dir,
            &selection_rule("Rule A", "SourceIP", r#""10.0.0.1""#),
        );

        let pipeline_path = dir.path().join("pipeline.yml");
        std::fs::write(&pipeline_path, mapping_pipeline(pipeline_name, "src_ip")).unwrap();

        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
        let mut engine =
            RuntimeEngine::new(rule_path, pipelines, CorrelationConfig::default(), false);
        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
        engine.load_rules().unwrap();

        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
        (proc, pipeline_path, dir)
    }

    // process_batch_lines

    #[test]
    fn process_batch_lines_valid_json() {
        let (proc, _dir) = make_processor(&event_id_rule());

        let batch = vec![
            r#"{"EventID": 1}"#.to_string(),
            r#"{"EventID": 2}"#.to_string(),
        ];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert_eq!(results.len(), 2);
        assert!(results[0].detection_count() > 0, "EventID=1 should match");
        assert!(
            results[1].detection_count() == 0,
            "EventID=2 should not match"
        );
    }

    #[test]
    fn process_batch_lines_invalid_json() {
        let (proc, _dir) = make_processor(&event_id_rule());

        let batch = vec!["not json".to_string(), r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert_eq!(results.len(), 2);
        assert!(
            results[0].detection_count() == 0,
            "invalid JSON produces empty result"
        );
        assert!(results[1].detection_count() > 0, "valid line still matches");
    }

    #[test]
    fn swap_engine_replaces_rules() {
        let (proc, rule_path, _dir) =
            make_processor_with_path(&selection_rule("Rule A", "EventID", "1"));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);

        std::fs::write(&rule_path, selection_rule("Rule B", "EventID", "99")).unwrap();
        proc.swap_engine(load_engine(&rule_path));

        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);

        let batch2 = vec![r#"{"EventID": 99}"#.to_string()];
        assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);
    }

    // reload_rules

    #[test]
    fn reload_rules_preserves_bloom_tuning() {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = write_rule(&dir, &selection_rule("Rule A", "EventID", "1"));

        let mut engine =
            RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
        engine.set_bloom_prefilter(true);
        engine.set_bloom_max_bytes(2 * 1024 * 1024);
        #[cfg(feature = "daachorse-index")]
        engine.set_cross_rule_ac(true);
        engine.load_rules().unwrap();

        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
        proc.reload_rules().unwrap();

        let snapshot = proc.engine_snapshot();
        let reloaded = snapshot.lock();
        assert!(
            reloaded.bloom_prefilter(),
            "bloom_prefilter must survive reload_rules"
        );
        assert_eq!(
            reloaded.bloom_max_bytes(),
            Some(2 * 1024 * 1024),
            "bloom_max_bytes must survive reload_rules"
        );
        #[cfg(feature = "daachorse-index")]
        assert!(
            reloaded.cross_rule_ac(),
            "cross_rule_ac must survive reload_rules"
        );
    }

    #[test]
    fn reload_rules_preserves_engine() {
        let (proc, rule_path, _dir) =
            make_processor_with_path(&selection_rule("Rule A", "EventID", "1"));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);

        std::fs::write(&rule_path, selection_rule("Rule B", "EventID", "42")).unwrap();

        let stats = proc.reload_rules().unwrap();
        assert_eq!(stats.detection_rules, 1);

        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);
        let batch2 = vec![r#"{"EventID": 42}"#.to_string()];
        assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);
    }

    #[test]
    fn reload_re_reads_pipelines_from_disk() {
        let (proc, pipeline_path, _dir) = make_pipeline_processor("Initial Pipeline");

        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
        assert!(
            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
            "src_ip should match because pipeline mapped SourceIP -> src_ip"
        );

        std::fs::write(
            &pipeline_path,
            mapping_pipeline("Updated Pipeline", "source.ip"),
        )
        .unwrap();

        proc.reload_rules().unwrap();

        assert!(
            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0,
            "after pipeline reload, src_ip should no longer match"
        );

        let batch2 = vec![r#"{"source.ip": "10.0.0.1"}"#.to_string()];
        assert!(
            proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0,
            "after pipeline reload, source.ip should match"
        );
    }

    #[test]
    fn reload_with_broken_pipeline_keeps_old_engine() {
        let (proc, pipeline_path, _dir) = make_pipeline_processor("Working Pipeline");

        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);

        std::fs::write(&pipeline_path, "{{{{ invalid yaml !!!!").unwrap();

        let result = proc.reload_rules();
        assert!(result.is_err(), "reload with broken pipeline should fail");

        assert!(
            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
            "old engine should still work after failed reload"
        );
    }

    // Filters, edge cases, metrics, concurrency

    #[test]
    fn custom_event_filter() {
        let (proc, _dir) = make_processor(&event_id_rule());

        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
        let results = proc.process_batch_lines(&batch, &records_filter);
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].detection_count(),
            1,
            "only EventID=1 from records array should match"
        );
    }

    #[test]
    fn empty_batch_returns_empty() {
        let (proc, _dir) = make_processor(&event_id_rule());

        let batch: Vec<String> = vec![];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert!(results.is_empty());
    }

    #[test]
    fn metrics_hook_invocations() {
        use std::sync::atomic::{AtomicU64, Ordering};

        struct CountingMetrics {
            parse_errors: AtomicU64,
            events_processed: AtomicU64,
            detection_matches: AtomicU64,
        }

        impl MetricsHook for CountingMetrics {
            fn on_parse_error(&self) {
                self.parse_errors.fetch_add(1, Ordering::Relaxed);
            }
            fn on_events_processed(&self, count: u64) {
                self.events_processed.fetch_add(count, Ordering::Relaxed);
            }
            fn on_detection_matches(&self, count: u64) {
                self.detection_matches.fetch_add(count, Ordering::Relaxed);
            }
            fn on_correlation_matches(&self, _: u64) {}
            fn observe_processing_latency(&self, _: f64) {}
            fn on_input_queue_depth_change(&self, _: i64) {}
            fn on_back_pressure(&self) {}
            fn observe_batch_size(&self, _: u64) {}
            fn on_output_queue_depth_change(&self, _: i64) {}
            fn observe_pipeline_latency(&self, _: f64) {}
            fn set_correlation_state_entries(&self, _: u64) {}
        }

        let dir = tempfile::tempdir().unwrap();
        let rule_path = write_rule(&dir, &event_id_rule());

        let metrics = Arc::new(CountingMetrics {
            parse_errors: AtomicU64::new(0),
            events_processed: AtomicU64::new(0),
            detection_matches: AtomicU64::new(0),
        });
        let proc = LogProcessor::new(load_engine(&rule_path), metrics.clone());

        let batch = vec![
            "not json".to_string(),
            r#"{"EventID": 1}"#.to_string(),
            r#"{"EventID": 2}"#.to_string(),
        ];
        proc.process_batch_lines(&batch, &identity_filter);

        assert_eq!(metrics.parse_errors.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.events_processed.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.detection_matches.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn concurrent_swap_and_process() {
        let (proc, rule_path, _dir) =
            make_processor_with_path(&selection_rule("Rule A", "EventID", "1"));
        let proc = Arc::new(proc);

        let handles: Vec<_> = (0..4)
            .map(|i| {
                let proc = proc.clone();
                let rule_path = rule_path.clone();
                std::thread::spawn(move || {
                    let batch = vec![r#"{"EventID": 1}"#.to_string()];
                    for _ in 0..100 {
                        let _ = proc.process_batch_lines(&batch, &identity_filter);
                    }
                    // One worker also swaps the engine while the others run.
                    if i == 0 {
                        proc.swap_engine(load_engine(&rule_path));
                    }
                })
            })
            .collect();

        // All workers finish before `_dir` is dropped at the end of the test.
        for h in handles {
            h.join().unwrap();
        }
    }

    // process_batch_with_format

    #[test]
    fn format_json_matches() {
        let (proc, _dir) = make_processor(&event_id_rule());

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "JSON EventID=1 should match"
        );
    }

    #[test]
    fn format_syslog_extracts_fields() {
        let (proc, _dir) = make_processor(&selection_rule("Syslog Test", "hostname", "mymachine"));

        let batch = vec!["<34>Oct 11 22:14:15 mymachine su: test message".to_string()];
        let results = proc.process_batch_with_format(
            &batch,
            &InputFormat::Syslog(crate::input::SyslogConfig::default()),
            None,
        );
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "syslog hostname=mymachine should match"
        );
    }

    #[test]
    fn format_plain_keyword_match() {
        let (proc, _dir) = make_processor(
            r#"
title: Keyword Test
status: test
logsource:
    category: test
detection:
    keywords:
        - "disk full"
    condition: keywords
"#,
        );

        let batch = vec!["ERROR: disk full on /dev/sda1".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Plain, None);
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "plain keyword 'disk full' should match"
        );
    }

    #[test]
    fn format_auto_detects_json() {
        let (proc, _dir) = make_processor(&event_id_rule());

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::default(), None);
        assert_eq!(results.len(), 1);
        assert!(results[0].detection_count() > 0);
    }

    #[test]
    fn format_json_with_event_filter() {
        let (proc, _dir) = make_processor(&event_id_rule());

        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
        let results =
            proc.process_batch_with_format(&batch, &InputFormat::Json, Some(&records_filter));
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].detection_count(),
            1,
            "only EventID=1 from records array should match"
        );
    }

    #[test]
    fn format_empty_lines_skipped() {
        let (proc, _dir) = make_processor(&event_id_rule());

        let batch = vec![
            "".to_string(),
            "  ".to_string(),
            r#"{"EventID": 1}"#.to_string(),
        ];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
        assert_eq!(results.len(), 3);
        assert!(results[0].detection_count() == 0);
        assert!(results[1].detection_count() == 0);
        assert!(results[2].detection_count() > 0);
    }

    #[cfg(feature = "logfmt")]
    #[test]
    fn format_logfmt_matches() {
        let (proc, _dir) = make_processor(&selection_rule("Logfmt Test", "level", "error"));

        let batch = vec!["level=error msg=something host=web01".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Logfmt, None);
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "logfmt level=error should match"
        );
    }

    #[cfg(feature = "cef")]
    #[test]
    fn format_cef_matches() {
        let (proc, _dir) = make_processor(&selection_rule("CEF Test", "deviceVendor", "Security"));

        let batch = vec!["CEF:0|Security|IDS|1.0|100|Attack|9|src=10.0.0.1".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Cef, None);
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "CEF deviceVendor=Security should match"
        );
    }
}