1use std::sync::Arc;
2
3use parking_lot::Mutex;
4use std::time::Instant;
5
6use arc_swap::ArcSwap;
7use rsigma_eval::{Event, FieldObserver, JsonEvent, ProcessResult, ProcessResultExt, RuleFieldSet};
8
9use crate::engine::RuntimeEngine;
10use crate::input::{EventInputDecoded, InputFormat, parse_line};
11use crate::metrics::MetricsHook;
12
/// Callback that expands one parsed JSON line into zero or more event payloads.
///
/// Returning an empty `Vec` drops the line; returning several payloads fans the
/// line out into several events whose matches are all attributed to the same
/// input line (see `LogProcessor::process_batch_lines`).
pub type EventFilter = dyn Fn(&serde_json::Value) -> Vec<serde_json::Value>;
18
/// Front end that turns batches of raw input lines into per-line detection
/// results using the current [`RuntimeEngine`], reporting to a [`MetricsHook`].
///
/// The engine sits behind an `ArcSwap` so it can be replaced (hot reload)
/// while other threads keep using the processor, and behind a `Mutex` because
/// evaluation needs mutable access to it (e.g. correlation state).
pub struct LogProcessor {
    /// Current engine; replaced atomically by `swap_engine` / `reload_rules`.
    engine: Arc<ArcSwap<Mutex<RuntimeEngine>>>,
    /// Sink for parse errors, processed-event counts, latencies and matches.
    metrics: Arc<dyn MetricsHook>,
    /// Optional observer shown every decoded event in
    /// `process_batch_with_format`; `None` disables observation.
    field_observer: ArcSwap<Option<Arc<FieldObserver>>>,
}
35
36impl LogProcessor {
37 pub fn new(engine: RuntimeEngine, metrics: Arc<dyn MetricsHook>) -> Self {
39 LogProcessor {
40 engine: Arc::new(ArcSwap::from_pointee(Mutex::new(engine))),
41 metrics,
42 field_observer: ArcSwap::new(Arc::new(None)),
43 }
44 }
45
46 pub fn set_field_observer(&self, observer: Option<Arc<FieldObserver>>) {
54 self.field_observer.store(Arc::new(observer));
55 }
56
57 pub fn field_observer(&self) -> Option<Arc<FieldObserver>> {
59 self.field_observer.load_full().as_ref().clone()
60 }
61
62 pub fn swap_engine(&self, new_engine: RuntimeEngine) {
68 self.engine.store(Arc::new(Mutex::new(new_engine)));
69 }
70
71 pub fn engine_snapshot(&self) -> arc_swap::Guard<Arc<Mutex<RuntimeEngine>>> {
76 self.engine.load()
77 }
78
79 pub fn process_batch_lines(
89 &self,
90 batch: &[String],
91 event_filter: &EventFilter,
92 ) -> Vec<ProcessResult> {
93 let engine_guard = self.engine.load();
94 let mut engine = engine_guard.lock();
95
96 let mut parsed: Vec<(usize, Vec<serde_json::Value>)> = Vec::with_capacity(batch.len());
98 for (line_idx, line) in batch.iter().enumerate() {
99 match serde_json::from_str::<serde_json::Value>(line) {
100 Ok(value) => {
101 let payloads = event_filter(&value);
102 if !payloads.is_empty() {
103 parsed.push((line_idx, payloads));
104 }
105 }
106 Err(e) => {
107 self.metrics.on_parse_error();
108 tracing::debug!(error = %e, "Invalid JSON on input");
109 }
110 }
111 }
112
113 let mut flat: Vec<(usize, &serde_json::Value)> = Vec::new();
115 for (line_idx, payloads) in &parsed {
116 for payload in payloads {
117 flat.push((*line_idx, payload));
118 }
119 }
120
121 if flat.is_empty() {
122 return empty_results(batch.len());
123 }
124
125 let events: Vec<JsonEvent> = flat.iter().map(|(_, v)| JsonEvent::borrow(v)).collect();
127 let event_refs: Vec<&JsonEvent> = events.iter().collect();
128
129 let start = Instant::now();
130 let batch_results = engine.process_batch(&event_refs);
131 let elapsed = start.elapsed().as_secs_f64();
132 let per_event_latency = elapsed / event_refs.len() as f64;
133
134 let stats = engine.stats();
136 self.metrics
137 .set_correlation_state_entries(stats.state_entries as u64);
138
139 let mut line_results = empty_results(batch.len());
141
142 for ((line_idx, _), result) in flat.iter().zip(batch_results) {
143 self.metrics.on_events_processed(1);
144 self.metrics.observe_processing_latency(per_event_latency);
145 self.metrics
146 .on_detection_matches(result.detection_count() as u64);
147 self.metrics
148 .on_correlation_matches(result.correlation_count() as u64);
149
150 for r in &result {
151 let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
152 let title = &r.header.rule_title;
153 match &r.body {
154 rsigma_eval::ResultBody::Detection(_) => {
155 self.metrics.on_detection_match_detail(title, level_str);
156 }
157 rsigma_eval::ResultBody::Correlation(body) => {
158 self.metrics.on_correlation_match_detail(
159 title,
160 level_str,
161 body.correlation_type.as_str(),
162 );
163 }
164 }
165 }
166
167 line_results[*line_idx].extend(result);
168 }
169
170 line_results
171 }
172
173 pub fn process_batch_with_format(
183 &self,
184 batch: &[String],
185 format: &InputFormat,
186 event_filter: Option<&EventFilter>,
187 ) -> Vec<ProcessResult> {
188 let engine_guard = self.engine.load();
189 let mut engine = engine_guard.lock();
190
191 let mut decoded_events: Vec<(usize, EventInputDecoded)> = Vec::with_capacity(batch.len());
194
195 for (line_idx, line) in batch.iter().enumerate() {
196 let Some(decoded) = parse_line(line, format) else {
197 if !line.trim().is_empty() {
198 self.metrics.on_parse_error();
199 tracing::debug!("Failed to parse input line");
200 }
201 continue;
202 };
203
204 if let Some(filter) = event_filter
207 && let EventInputDecoded::Json(ref json_event) = decoded
208 {
209 let json_value = json_event.to_json();
210 let payloads = filter(&json_value);
211 for payload in payloads {
212 decoded_events
213 .push((line_idx, EventInputDecoded::Json(JsonEvent::owned(payload))));
214 }
215 continue;
216 }
217
218 decoded_events.push((line_idx, decoded));
219 }
220
221 if decoded_events.is_empty() {
222 return empty_results(batch.len());
223 }
224
225 let observer_guard = self.field_observer.load();
232 if let Some(observer) = observer_guard.as_ref() {
233 for (_, decoded) in &decoded_events {
234 observer.observe(decoded);
235 }
236 }
237 drop(observer_guard);
238
239 let event_refs: Vec<&EventInputDecoded> = decoded_events.iter().map(|(_, e)| e).collect();
241
242 let start = Instant::now();
243 let batch_results = engine.process_batch(&event_refs);
244 let elapsed = start.elapsed().as_secs_f64();
245 let per_event_latency = elapsed / event_refs.len() as f64;
246
247 let stats = engine.stats();
248 self.metrics
249 .set_correlation_state_entries(stats.state_entries as u64);
250
251 let mut line_results = empty_results(batch.len());
253
254 for ((line_idx, _), result) in decoded_events.iter().zip(batch_results) {
255 self.metrics.on_events_processed(1);
256 self.metrics.observe_processing_latency(per_event_latency);
257 self.metrics
258 .on_detection_matches(result.detection_count() as u64);
259 self.metrics
260 .on_correlation_matches(result.correlation_count() as u64);
261
262 for r in &result {
263 let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
264 let title = &r.header.rule_title;
265 match &r.body {
266 rsigma_eval::ResultBody::Detection(_) => {
267 self.metrics.on_detection_match_detail(title, level_str);
268 }
269 rsigma_eval::ResultBody::Correlation(body) => {
270 self.metrics.on_correlation_match_detail(
271 title,
272 level_str,
273 body.correlation_type.as_str(),
274 );
275 }
276 }
277 }
278
279 line_results[*line_idx].extend(result);
280 }
281
282 line_results
283 }
284
285 pub fn reload_rules(&self) -> Result<crate::engine::EngineStats, String> {
295 let snapshot = self.engine.load();
302 let old = snapshot.lock();
303 let old_state = old.export_state();
304 let rules_path = old.rules_path().to_path_buf();
305 let pipelines = old.pipelines().to_vec();
306 let pipeline_paths = old.pipeline_paths().to_vec();
307 let corr_config = old.corr_config().clone();
308 let include_event = old.include_event();
309 let resolver = old.source_resolver().cloned();
310 let allow_remote_include = old.allow_remote_include();
311 let bloom_prefilter = old.bloom_prefilter();
312 let bloom_max_bytes = old.bloom_max_bytes();
313 let match_detail = old.match_detail();
314 #[cfg(feature = "daachorse-index")]
315 let cross_rule_ac = old.cross_rule_ac();
316 drop(old);
317 drop(snapshot);
318
319 let mut new_engine = RuntimeEngine::new(rules_path, pipelines, corr_config, include_event);
320 new_engine.set_pipeline_paths(pipeline_paths);
321 new_engine.set_allow_remote_include(allow_remote_include);
322 new_engine.set_match_detail(match_detail);
323 new_engine.set_bloom_prefilter(bloom_prefilter);
324 if let Some(budget) = bloom_max_bytes {
325 new_engine.set_bloom_max_bytes(budget);
326 }
327 #[cfg(feature = "daachorse-index")]
328 new_engine.set_cross_rule_ac(cross_rule_ac);
329 if let Some(resolver) = resolver {
330 new_engine.set_source_resolver(resolver);
331 }
332 let stats = new_engine.load_rules()?;
333
334 if let Some(state) = old_state
335 && !new_engine.import_state(&state)
336 {
337 tracing::warn!(
338 "Incompatible correlation snapshot version during reload, starting fresh"
339 );
340 }
341
342 self.swap_engine(new_engine);
343 Ok(stats)
344 }
345
346 pub fn rules_path(&self) -> std::path::PathBuf {
348 let snapshot = self.engine.load();
349 let engine = snapshot.lock();
350 engine.rules_path().to_path_buf()
351 }
352
353 pub fn metrics(&self) -> &dyn MetricsHook {
355 &*self.metrics
356 }
357
358 pub fn export_state(&self) -> Option<rsigma_eval::CorrelationSnapshot> {
360 let snapshot = self.engine.load();
361 let engine = snapshot.lock();
362 engine.export_state()
363 }
364
365 pub fn import_state(&self, snapshot: &rsigma_eval::CorrelationSnapshot) -> bool {
367 let guard = self.engine.load();
368 let mut engine = guard.lock();
369 engine.import_state(snapshot)
370 }
371
372 pub fn stats(&self) -> crate::engine::EngineStats {
374 let snapshot = self.engine.load();
375 let engine = snapshot.lock();
376 engine.stats()
377 }
378
379 pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
383 let snapshot = self.engine.load();
384 let engine = snapshot.lock();
385 engine.rule_field_set()
386 }
387}
388
389fn empty_results(count: usize) -> Vec<ProcessResult> {
391 (0..count).map(|_| ProcessResult::new()).collect()
392}
393
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::NoopMetrics;
    use rsigma_eval::CorrelationConfig;

    /// Event filter that passes every parsed line through unchanged.
    fn identity_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
        vec![v.clone()]
    }

    /// Writes `rules_yaml` to a fresh temp dir, loads it into an engine, and
    /// wraps that engine in a `LogProcessor` with no-op metrics.
    fn make_processor(rules_yaml: &str) -> LogProcessor {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        std::fs::write(&rule_path, rules_yaml).unwrap();

        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
        engine.load_rules().unwrap();
        // Prevent the TempDir from deleting the rule file on drop.
        // NOTE(review): this leaks the temp dir on every run.
        std::mem::forget(dir);

        LogProcessor::new(engine, Arc::new(NoopMetrics))
    }

    /// A matching and a non-matching JSON line each get their own result slot.
    #[test]
    fn process_batch_lines_valid_json() {
        let proc = make_processor(
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        );

        let batch = vec![
            r#"{"EventID": 1}"#.to_string(),
            r#"{"EventID": 2}"#.to_string(),
        ];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert_eq!(results.len(), 2);
        assert!(results[0].detection_count() > 0, "EventID=1 should match");
        assert!(
            results[1].detection_count() == 0,
            "EventID=2 should not match"
        );
    }

    /// An invalid JSON line yields an empty slot without affecting its neighbours.
    #[test]
    fn process_batch_lines_invalid_json() {
        let proc = make_processor(
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        );

        let batch = vec!["not json".to_string(), r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert_eq!(results.len(), 2);
        assert!(
            results[0].detection_count() == 0,
            "invalid JSON produces empty result"
        );
        assert!(results[1].detection_count() > 0, "valid line still matches");
    }

    /// After `swap_engine`, only the new engine's rules apply.
    #[test]
    fn swap_engine_replaces_rules() {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        std::fs::write(
            &rule_path,
            r#"
title: Rule A
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        )
        .unwrap();

        let mut engine = RuntimeEngine::new(
            rule_path.clone(),
            vec![],
            CorrelationConfig::default(),
            false,
        );
        engine.load_rules().unwrap();
        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);

        // Overwrite the rule file, then build and swap in a fresh engine from it.
        std::fs::write(
            &rule_path,
            r#"
title: Rule B
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 99
    condition: selection
"#,
        )
        .unwrap();

        let mut new_engine =
            RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
        new_engine.load_rules().unwrap();
        proc.swap_engine(new_engine);

        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);

        let batch2 = vec![r#"{"EventID": 99}"#.to_string()];
        assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);

        // NOTE(review): `dir` is still in scope here; forgetting it only leaks the temp dir.
        std::mem::forget(dir);
    }

    /// Bloom-filter (and, with the feature, cross-rule AC) settings survive `reload_rules`.
    #[test]
    fn reload_rules_preserves_bloom_tuning() {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        std::fs::write(
            &rule_path,
            r#"
title: Rule A
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        )
        .unwrap();

        let mut engine = RuntimeEngine::new(
            rule_path.clone(),
            vec![],
            CorrelationConfig::default(),
            false,
        );
        engine.set_bloom_prefilter(true);
        engine.set_bloom_max_bytes(2 * 1024 * 1024);
        #[cfg(feature = "daachorse-index")]
        engine.set_cross_rule_ac(true);
        engine.load_rules().unwrap();

        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
        proc.reload_rules().unwrap();

        let snapshot = proc.engine_snapshot();
        let reloaded = snapshot.lock();
        assert!(
            reloaded.bloom_prefilter(),
            "bloom_prefilter must survive reload_rules"
        );
        assert_eq!(
            reloaded.bloom_max_bytes(),
            Some(2 * 1024 * 1024),
            "bloom_max_bytes must survive reload_rules"
        );
        #[cfg(feature = "daachorse-index")]
        assert!(
            reloaded.cross_rule_ac(),
            "cross_rule_ac must survive reload_rules"
        );

        std::mem::forget(dir);
    }

    /// `reload_rules` re-reads the rule file and swaps in the new rules.
    #[test]
    fn reload_rules_preserves_engine() {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        std::fs::write(
            &rule_path,
            r#"
title: Rule A
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        )
        .unwrap();

        let mut engine = RuntimeEngine::new(
            rule_path.clone(),
            vec![],
            CorrelationConfig::default(),
            false,
        );
        engine.load_rules().unwrap();
        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);

        // Replace the rule on disk; reload must pick it up.
        std::fs::write(
            &rule_path,
            r#"
title: Rule B
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 42
    condition: selection
"#,
        )
        .unwrap();

        let stats = proc.reload_rules().unwrap();
        assert_eq!(stats.detection_rules, 1);

        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);
        let batch2 = vec![r#"{"EventID": 42}"#.to_string()];
        assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);

        std::mem::forget(dir);
    }

    /// `reload_rules` re-reads pipeline files from the stored pipeline paths,
    /// not just the in-memory pipelines the engine was created with.
    #[test]
    fn reload_re_reads_pipelines_from_disk() {
        let dir = tempfile::tempdir().unwrap();

        let rule_path = dir.path().join("test.yml");
        std::fs::write(
            &rule_path,
            r#"
title: Rule A
status: test
logsource:
    category: test
detection:
    selection:
        SourceIP: "10.0.0.1"
    condition: selection
"#,
        )
        .unwrap();

        // Initial pipeline maps the rule's SourceIP field to `src_ip`.
        let pipeline_path = dir.path().join("pipeline.yml");
        std::fs::write(
            &pipeline_path,
            r#"
name: Initial Pipeline
priority: 10
transformations:
    - id: rename_field
      type: field_name_mapping
      mapping:
          SourceIP: src_ip
"#,
        )
        .unwrap();

        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
        let mut engine = RuntimeEngine::new(
            rule_path.clone(),
            pipelines,
            CorrelationConfig::default(),
            false,
        );
        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
        engine.load_rules().unwrap();
        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));

        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
        assert!(
            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
            "src_ip should match because pipeline mapped SourceIP -> src_ip"
        );

        // Change the mapping on disk to `source.ip`.
        std::fs::write(
            &pipeline_path,
            r#"
name: Updated Pipeline
priority: 10
transformations:
    - id: rename_field
      type: field_name_mapping
      mapping:
          SourceIP: source.ip
"#,
        )
        .unwrap();

        proc.reload_rules().unwrap();

        assert!(
            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0,
            "after pipeline reload, src_ip should no longer match"
        );

        let batch2 = vec![r#"{"source.ip": "10.0.0.1"}"#.to_string()];
        assert!(
            proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0,
            "after pipeline reload, source.ip should match"
        );

        std::mem::forget(dir);
    }

    /// A reload that fails (unparseable pipeline) returns `Err` and leaves the
    /// previously working engine in place.
    #[test]
    fn reload_with_broken_pipeline_keeps_old_engine() {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        std::fs::write(
            &rule_path,
            r#"
title: Rule A
status: test
logsource:
    category: test
detection:
    selection:
        SourceIP: "10.0.0.1"
    condition: selection
"#,
        )
        .unwrap();

        let pipeline_path = dir.path().join("pipeline.yml");
        std::fs::write(
            &pipeline_path,
            r#"
name: Working Pipeline
priority: 10
transformations:
    - id: rename_field
      type: field_name_mapping
      mapping:
          SourceIP: src_ip
"#,
        )
        .unwrap();

        let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
        let mut engine = RuntimeEngine::new(
            rule_path.clone(),
            pipelines,
            CorrelationConfig::default(),
            false,
        );
        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
        engine.load_rules().unwrap();
        let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));

        let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
        assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);

        // Corrupt the pipeline file so the reload cannot succeed.
        std::fs::write(&pipeline_path, "{{{{ invalid yaml !!!!").unwrap();

        let result = proc.reload_rules();
        assert!(result.is_err(), "reload with broken pipeline should fail");

        assert!(
            proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
            "old engine should still work after failed reload"
        );

        std::mem::forget(dir);
    }

    /// A filter that unpacks a `records` array fans one line out into several
    /// events whose matches are all attributed to that line.
    #[test]
    fn custom_event_filter() {
        let proc = make_processor(
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        );

        // Unwrap `{"records": [...]}` envelopes; pass anything else through.
        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
                records.clone()
            } else {
                vec![v.clone()]
            }
        };

        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
        let results = proc.process_batch_lines(&batch, &filter);
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].detection_count(),
            1,
            "only EventID=1 from records array should match"
        );
    }

    /// An empty batch produces an empty result vector.
    #[test]
    fn empty_batch_returns_empty() {
        let proc = make_processor(
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        );

        let batch: Vec<String> = vec![];
        let results = proc.process_batch_lines(&batch, &identity_filter);
        assert!(results.is_empty());
    }

    /// The metrics hook sees one parse error, one processed event per valid
    /// line, and one detection match for this batch.
    #[test]
    fn metrics_hook_invocations() {
        use std::sync::atomic::{AtomicU64, Ordering};

        /// Hook that counts only the callbacks this test asserts on.
        struct CountingMetrics {
            parse_errors: AtomicU64,
            events_processed: AtomicU64,
            detection_matches: AtomicU64,
        }

        impl MetricsHook for CountingMetrics {
            fn on_parse_error(&self) {
                self.parse_errors.fetch_add(1, Ordering::Relaxed);
            }
            fn on_events_processed(&self, count: u64) {
                self.events_processed.fetch_add(count, Ordering::Relaxed);
            }
            fn on_detection_matches(&self, count: u64) {
                self.detection_matches.fetch_add(count, Ordering::Relaxed);
            }
            fn on_correlation_matches(&self, _: u64) {}
            fn observe_processing_latency(&self, _: f64) {}
            fn on_input_queue_depth_change(&self, _: i64) {}
            fn on_back_pressure(&self) {}
            fn observe_batch_size(&self, _: u64) {}
            fn on_output_queue_depth_change(&self, _: i64) {}
            fn observe_pipeline_latency(&self, _: f64) {}
            fn set_correlation_state_entries(&self, _: u64) {}
        }

        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        std::fs::write(
            &rule_path,
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        )
        .unwrap();

        let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
        engine.load_rules().unwrap();

        let metrics = Arc::new(CountingMetrics {
            parse_errors: AtomicU64::new(0),
            events_processed: AtomicU64::new(0),
            detection_matches: AtomicU64::new(0),
        });
        let proc = LogProcessor::new(engine, metrics.clone());

        let batch = vec![
            "not json".to_string(),
            r#"{"EventID": 1}"#.to_string(),
            r#"{"EventID": 2}"#.to_string(),
        ];
        proc.process_batch_lines(&batch, &identity_filter);

        assert_eq!(metrics.parse_errors.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.events_processed.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.detection_matches.load(Ordering::Relaxed), 1);

        std::mem::forget(dir);
    }

    /// Four threads process batches while thread 0 swaps in a fresh engine.
    /// The test checks only that no thread panics.
    #[test]
    fn concurrent_swap_and_process() {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("test.yml");
        std::fs::write(
            &rule_path,
            r#"
title: Rule A
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        )
        .unwrap();

        let mut engine = RuntimeEngine::new(
            rule_path.clone(),
            vec![],
            CorrelationConfig::default(),
            false,
        );
        engine.load_rules().unwrap();
        let proc = Arc::new(LogProcessor::new(engine, Arc::new(NoopMetrics)));

        let handles: Vec<_> = (0..4)
            .map(|i| {
                let proc = proc.clone();
                let rule_path = rule_path.clone();
                std::thread::spawn(move || {
                    let batch = vec![r#"{"EventID": 1}"#.to_string()];
                    for _ in 0..100 {
                        let _ = proc.process_batch_lines(&batch, &identity_filter);
                    }
                    if i == 0 {
                        // One thread also replaces the engine while others may still be running.
                        let mut new_engine = RuntimeEngine::new(
                            rule_path,
                            vec![],
                            CorrelationConfig::default(),
                            false,
                        );
                        new_engine.load_rules().unwrap();
                        proc.swap_engine(new_engine);
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        std::mem::forget(dir);
    }

    /// Explicit JSON format: a matching line is detected.
    #[test]
    fn format_json_matches() {
        let proc = make_processor(
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        );

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "JSON EventID=1 should match"
        );
    }

    /// Syslog format: the header hostname is exposed as a `hostname` field.
    #[test]
    fn format_syslog_extracts_fields() {
        let proc = make_processor(
            r#"
title: Syslog Test
status: test
logsource:
    category: test
detection:
    selection:
        hostname: mymachine
    condition: selection
"#,
        );

        let batch = vec!["<34>Oct 11 22:14:15 mymachine su: test message".to_string()];
        let results = proc.process_batch_with_format(
            &batch,
            &InputFormat::Syslog(crate::input::SyslogConfig::default()),
            None,
        );
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "syslog hostname=mymachine should match"
        );
    }

    /// Plain-text format: keyword detections match against the raw line.
    #[test]
    fn format_plain_keyword_match() {
        let proc = make_processor(
            r#"
title: Keyword Test
status: test
logsource:
    category: test
detection:
    keywords:
        - "disk full"
    condition: keywords
"#,
        );

        let batch = vec!["ERROR: disk full on /dev/sda1".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Plain, None);
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "plain keyword 'disk full' should match"
        );
    }

    /// The default input format handles a JSON line.
    #[test]
    fn format_auto_detects_json() {
        let proc = make_processor(
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        );

        let batch = vec![r#"{"EventID": 1}"#.to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::default(), None);
        assert_eq!(results.len(), 1);
        assert!(results[0].detection_count() > 0);
    }

    /// The event filter also applies to JSON lines in `process_batch_with_format`.
    #[test]
    fn format_json_with_event_filter() {
        let proc = make_processor(
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        );

        // Unwrap `{"records": [...]}` envelopes; pass anything else through.
        let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
            if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
                records.clone()
            } else {
                vec![v.clone()]
            }
        };

        let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, Some(&filter));
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].detection_count(),
            1,
            "only EventID=1 from records array should match"
        );
    }

    /// Blank and whitespace-only lines still occupy result slots, keeping
    /// output positions aligned with input lines.
    #[test]
    fn format_empty_lines_skipped() {
        let proc = make_processor(
            r#"
title: Test Rule
status: test
logsource:
    category: test
detection:
    selection:
        EventID: 1
    condition: selection
"#,
        );

        let batch = vec![
            "".to_string(),
            "   ".to_string(),
            r#"{"EventID": 1}"#.to_string(),
        ];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
        assert_eq!(results.len(), 3);
        assert!(results[0].detection_count() == 0);
        assert!(results[1].detection_count() == 0);
        assert!(results[2].detection_count() > 0);
    }

    /// logfmt format: key=value pairs become fields.
    #[cfg(feature = "logfmt")]
    #[test]
    fn format_logfmt_matches() {
        let proc = make_processor(
            r#"
title: Logfmt Test
status: test
logsource:
    category: test
detection:
    selection:
        level: error
    condition: selection
"#,
        );

        let batch = vec!["level=error msg=something host=web01".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Logfmt, None);
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "logfmt level=error should match"
        );
    }

    /// CEF format: header fields such as `deviceVendor` are matchable.
    #[cfg(feature = "cef")]
    #[test]
    fn format_cef_matches() {
        let proc = make_processor(
            r#"
title: CEF Test
status: test
logsource:
    category: test
detection:
    selection:
        deviceVendor: Security
    condition: selection
"#,
        );

        let batch = vec!["CEF:0|Security|IDS|1.0|100|Attack|9|src=10.0.0.1".to_string()];
        let results = proc.process_batch_with_format(&batch, &InputFormat::Cef, None);
        assert_eq!(results.len(), 1);
        assert!(
            results[0].detection_count() > 0,
            "CEF deviceVendor=Security should match"
        );
    }
}
1196}