1use std::sync::Arc;
2
3use parking_lot::Mutex;
4use std::time::Instant;
5
6use arc_swap::ArcSwap;
7use rsigma_eval::{Event, FieldObserver, JsonEvent, ProcessResult, ProcessResultExt, RuleFieldSet};
8
9use crate::engine::RuntimeEngine;
10use crate::input::{EventInputDecoded, InputFormat, parse_line};
11use crate::metrics::MetricsHook;
12
/// Callback that maps one parsed JSON value to the event payloads to evaluate.
///
/// The processor evaluates every returned payload and attributes the matches to
/// the input line the value came from. Returning an empty vector means nothing
/// is evaluated for that line.
pub type EventFilter = dyn Fn(&serde_json::Value) -> Vec<serde_json::Value>;
18
/// Front end that feeds batches of log lines to a replaceable [`RuntimeEngine`]
/// and reports what happened through a [`MetricsHook`].
pub struct LogProcessor {
    /// Currently installed engine. The `ArcSwap` allows the whole engine to be
    /// replaced (`swap_engine`), while the `Mutex` guards access to whichever
    /// engine is installed.
    engine: Arc<ArcSwap<Mutex<RuntimeEngine>>>,
    /// Receives parse-error, throughput, latency and match notifications.
    metrics: Arc<dyn MetricsHook>,
    /// Optional observer that `process_batch_with_format` feeds every decoded
    /// event to before evaluation; `None` disables observation.
    field_observer: ArcSwap<Option<Arc<FieldObserver>>>,
}
35
36impl LogProcessor {
37 pub fn new(engine: RuntimeEngine, metrics: Arc<dyn MetricsHook>) -> Self {
39 LogProcessor {
40 engine: Arc::new(ArcSwap::from_pointee(Mutex::new(engine))),
41 metrics,
42 field_observer: ArcSwap::new(Arc::new(None)),
43 }
44 }
45
46 pub fn set_field_observer(&self, observer: Option<Arc<FieldObserver>>) {
54 self.field_observer.store(Arc::new(observer));
55 }
56
57 pub fn field_observer(&self) -> Option<Arc<FieldObserver>> {
59 self.field_observer.load_full().as_ref().clone()
60 }
61
62 pub fn swap_engine(&self, new_engine: RuntimeEngine) {
68 self.engine.store(Arc::new(Mutex::new(new_engine)));
69 }
70
    /// Returns a guard over the engine that is installed at the time of the call.
    ///
    /// Callers lock the contained mutex to inspect or mutate the engine.
    pub fn engine_snapshot(&self) -> arc_swap::Guard<Arc<Mutex<RuntimeEngine>>> {
        self.engine.load()
    }
78
79 pub fn process_batch_lines(
89 &self,
90 batch: &[String],
91 event_filter: &EventFilter,
92 ) -> Vec<ProcessResult> {
93 let engine_guard = self.engine.load();
94 let mut engine = engine_guard.lock();
95
96 let mut parsed: Vec<(usize, Vec<serde_json::Value>)> = Vec::with_capacity(batch.len());
98 for (line_idx, line) in batch.iter().enumerate() {
99 match serde_json::from_str::<serde_json::Value>(line) {
100 Ok(value) => {
101 let payloads = event_filter(&value);
102 if !payloads.is_empty() {
103 parsed.push((line_idx, payloads));
104 }
105 }
106 Err(e) => {
107 self.metrics.on_parse_error();
108 tracing::debug!(error = %e, "Invalid JSON on input");
109 }
110 }
111 }
112
113 let mut flat: Vec<(usize, &serde_json::Value)> = Vec::new();
115 for (line_idx, payloads) in &parsed {
116 for payload in payloads {
117 flat.push((*line_idx, payload));
118 }
119 }
120
121 if flat.is_empty() {
122 return empty_results(batch.len());
123 }
124
125 let events: Vec<JsonEvent> = flat.iter().map(|(_, v)| JsonEvent::borrow(v)).collect();
127 let event_refs: Vec<&JsonEvent> = events.iter().collect();
128
129 let start = Instant::now();
130 let batch_results = engine.process_batch(&event_refs);
131 let elapsed = start.elapsed().as_secs_f64();
132 let per_event_latency = elapsed / event_refs.len() as f64;
133
134 let stats = engine.stats();
136 self.metrics
137 .set_correlation_state_entries(stats.state_entries as u64);
138
139 let mut line_results = empty_results(batch.len());
141
142 for ((line_idx, _), result) in flat.iter().zip(batch_results) {
143 self.metrics.on_events_processed(1);
144 self.metrics.observe_processing_latency(per_event_latency);
145 self.metrics
146 .on_detection_matches(result.detection_count() as u64);
147 self.metrics
148 .on_correlation_matches(result.correlation_count() as u64);
149
150 for r in &result {
151 let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
152 let title = &r.header.rule_title;
153 match &r.body {
154 rsigma_eval::ResultBody::Detection(_) => {
155 self.metrics.on_detection_match_detail(title, level_str);
156 }
157 rsigma_eval::ResultBody::Correlation(body) => {
158 self.metrics.on_correlation_match_detail(
159 title,
160 level_str,
161 body.correlation_type.as_str(),
162 );
163 }
164 }
165 }
166
167 line_results[*line_idx].extend(result);
168 }
169
170 line_results
171 }
172
173 pub fn process_batch_with_format(
183 &self,
184 batch: &[String],
185 format: &InputFormat,
186 event_filter: Option<&EventFilter>,
187 ) -> Vec<ProcessResult> {
188 let engine_guard = self.engine.load();
189 let mut engine = engine_guard.lock();
190
191 let mut decoded_events: Vec<(usize, EventInputDecoded)> = Vec::with_capacity(batch.len());
194
195 for (line_idx, line) in batch.iter().enumerate() {
196 let Some(decoded) = parse_line(line, format) else {
197 if !line.trim().is_empty() {
198 self.metrics.on_parse_error();
199 tracing::debug!("Failed to parse input line");
200 }
201 continue;
202 };
203
204 if let Some(filter) = event_filter
207 && let EventInputDecoded::Json(ref json_event) = decoded
208 {
209 let json_value = json_event.to_json();
210 let payloads = filter(&json_value);
211 for payload in payloads {
212 decoded_events
213 .push((line_idx, EventInputDecoded::Json(JsonEvent::owned(payload))));
214 }
215 continue;
216 }
217
218 decoded_events.push((line_idx, decoded));
219 }
220
221 if decoded_events.is_empty() {
222 return empty_results(batch.len());
223 }
224
225 let observer_guard = self.field_observer.load();
232 if let Some(observer) = observer_guard.as_ref() {
233 for (_, decoded) in &decoded_events {
234 observer.observe(decoded);
235 }
236 }
237 drop(observer_guard);
238
239 let event_refs: Vec<&EventInputDecoded> = decoded_events.iter().map(|(_, e)| e).collect();
241
242 let start = Instant::now();
243 let batch_results = engine.process_batch(&event_refs);
244 let elapsed = start.elapsed().as_secs_f64();
245 let per_event_latency = elapsed / event_refs.len() as f64;
246
247 let stats = engine.stats();
248 self.metrics
249 .set_correlation_state_entries(stats.state_entries as u64);
250
251 let mut line_results = empty_results(batch.len());
253
254 for ((line_idx, _), result) in decoded_events.iter().zip(batch_results) {
255 self.metrics.on_events_processed(1);
256 self.metrics.observe_processing_latency(per_event_latency);
257 self.metrics
258 .on_detection_matches(result.detection_count() as u64);
259 self.metrics
260 .on_correlation_matches(result.correlation_count() as u64);
261
262 for r in &result {
263 let level_str = r.header.level.as_ref().map_or("unknown", |l| l.as_str());
264 let title = &r.header.rule_title;
265 match &r.body {
266 rsigma_eval::ResultBody::Detection(_) => {
267 self.metrics.on_detection_match_detail(title, level_str);
268 }
269 rsigma_eval::ResultBody::Correlation(body) => {
270 self.metrics.on_correlation_match_detail(
271 title,
272 level_str,
273 body.correlation_type.as_str(),
274 );
275 }
276 }
277 }
278
279 line_results[*line_idx].extend(result);
280 }
281
282 line_results
283 }
284
    /// Builds a fresh engine with the current engine's configuration, loads
    /// the rules into it and installs it in place of the current engine.
    ///
    /// State exported from the current engine is imported into the new one.
    /// If the import is rejected, a warning is logged and the new engine
    /// starts without that state.
    ///
    /// # Errors
    ///
    /// Returns the error from `load_rules` when loading fails. The swap is not
    /// reached in that case, so the current engine stays installed.
    pub fn reload_rules(&self) -> Result<crate::engine::EngineStats, String> {
        // Copy everything needed out of the current engine while holding its lock.
        let snapshot = self.engine.load();
        let old = snapshot.lock();
        let old_state = old.export_state();
        let rules_path = old.rules_path().to_path_buf();
        let pipelines = old.pipelines().to_vec();
        let pipeline_paths = old.pipeline_paths().to_vec();
        let corr_config = old.corr_config().clone();
        let include_event = old.include_event();
        let resolver = old.source_resolver().cloned();
        let allow_remote_include = old.allow_remote_include();
        let bloom_prefilter = old.bloom_prefilter();
        let bloom_max_bytes = old.bloom_max_bytes();
        #[cfg(feature = "daachorse-index")]
        let cross_rule_ac = old.cross_rule_ac();
        // Release the mutex and the ArcSwap guard before the rule load below, so
        // the current engine stays usable while the replacement is built.
        // NOTE(review): anything the current engine records between this point
        // and the swap is not part of `old_state` — confirm that is acceptable.
        drop(old);
        drop(snapshot);

        let mut new_engine = RuntimeEngine::new(rules_path, pipelines, corr_config, include_event);
        new_engine.set_pipeline_paths(pipeline_paths);
        new_engine.set_allow_remote_include(allow_remote_include);
        new_engine.set_bloom_prefilter(bloom_prefilter);
        if let Some(budget) = bloom_max_bytes {
            new_engine.set_bloom_max_bytes(budget);
        }
        #[cfg(feature = "daachorse-index")]
        new_engine.set_cross_rule_ac(cross_rule_ac);
        if let Some(resolver) = resolver {
            new_engine.set_source_resolver(resolver);
        }
        // A failed load returns here, before the swap.
        let stats = new_engine.load_rules()?;

        if let Some(state) = old_state
            && !new_engine.import_state(&state)
        {
            tracing::warn!(
                "Incompatible correlation snapshot version during reload, starting fresh"
            );
        }

        self.swap_engine(new_engine);
        Ok(stats)
    }
343
344 pub fn rules_path(&self) -> std::path::PathBuf {
346 let snapshot = self.engine.load();
347 let engine = snapshot.lock();
348 engine.rules_path().to_path_buf()
349 }
350
351 pub fn metrics(&self) -> &dyn MetricsHook {
353 &*self.metrics
354 }
355
356 pub fn export_state(&self) -> Option<rsigma_eval::CorrelationSnapshot> {
358 let snapshot = self.engine.load();
359 let engine = snapshot.lock();
360 engine.export_state()
361 }
362
363 pub fn import_state(&self, snapshot: &rsigma_eval::CorrelationSnapshot) -> bool {
365 let guard = self.engine.load();
366 let mut engine = guard.lock();
367 engine.import_state(snapshot)
368 }
369
370 pub fn stats(&self) -> crate::engine::EngineStats {
372 let snapshot = self.engine.load();
373 let engine = snapshot.lock();
374 engine.stats()
375 }
376
377 pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
381 let snapshot = self.engine.load();
382 let engine = snapshot.lock();
383 engine.rule_field_set()
384 }
385}
386
387fn empty_results(count: usize) -> Vec<ProcessResult> {
389 (0..count).map(|_| ProcessResult::new()).collect()
390}
391
392#[cfg(test)]
393mod tests {
394 use super::*;
395 use crate::metrics::NoopMetrics;
396 use rsigma_eval::CorrelationConfig;
397
398 fn identity_filter(v: &serde_json::Value) -> Vec<serde_json::Value> {
399 vec![v.clone()]
400 }
401
402 fn make_processor(rules_yaml: &str) -> LogProcessor {
403 let dir = tempfile::tempdir().unwrap();
404 let rule_path = dir.path().join("test.yml");
405 std::fs::write(&rule_path, rules_yaml).unwrap();
406
407 let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
408 engine.load_rules().unwrap();
409 std::mem::forget(dir);
411 LogProcessor::new(engine, Arc::new(NoopMetrics))
412 }
413
414 #[test]
415 fn process_batch_lines_valid_json() {
416 let proc = make_processor(
417 r#"
418title: Test Rule
419status: test
420logsource:
421 category: test
422detection:
423 selection:
424 EventID: 1
425 condition: selection
426"#,
427 );
428
429 let batch = vec![
430 r#"{"EventID": 1}"#.to_string(),
431 r#"{"EventID": 2}"#.to_string(),
432 ];
433 let results = proc.process_batch_lines(&batch, &identity_filter);
434 assert_eq!(results.len(), 2);
435 assert!(results[0].detection_count() > 0, "EventID=1 should match");
436 assert!(
437 results[1].detection_count() == 0,
438 "EventID=2 should not match"
439 );
440 }
441
442 #[test]
443 fn process_batch_lines_invalid_json() {
444 let proc = make_processor(
445 r#"
446title: Test Rule
447status: test
448logsource:
449 category: test
450detection:
451 selection:
452 EventID: 1
453 condition: selection
454"#,
455 );
456
457 let batch = vec!["not json".to_string(), r#"{"EventID": 1}"#.to_string()];
458 let results = proc.process_batch_lines(&batch, &identity_filter);
459 assert_eq!(results.len(), 2);
460 assert!(
461 results[0].detection_count() == 0,
462 "invalid JSON produces empty result"
463 );
464 assert!(results[1].detection_count() > 0, "valid line still matches");
465 }
466
467 #[test]
468 fn swap_engine_replaces_rules() {
469 let dir = tempfile::tempdir().unwrap();
470 let rule_path = dir.path().join("test.yml");
471 std::fs::write(
472 &rule_path,
473 r#"
474title: Rule A
475status: test
476logsource:
477 category: test
478detection:
479 selection:
480 EventID: 1
481 condition: selection
482"#,
483 )
484 .unwrap();
485
486 let mut engine = RuntimeEngine::new(
487 rule_path.clone(),
488 vec![],
489 CorrelationConfig::default(),
490 false,
491 );
492 engine.load_rules().unwrap();
493 let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
494
495 let batch = vec![r#"{"EventID": 1}"#.to_string()];
496 assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
497
498 std::fs::write(
500 &rule_path,
501 r#"
502title: Rule B
503status: test
504logsource:
505 category: test
506detection:
507 selection:
508 EventID: 99
509 condition: selection
510"#,
511 )
512 .unwrap();
513
514 let mut new_engine =
515 RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
516 new_engine.load_rules().unwrap();
517 proc.swap_engine(new_engine);
518
519 assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);
520
521 let batch2 = vec![r#"{"EventID": 99}"#.to_string()];
522 assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);
523
524 std::mem::forget(dir);
525 }
526
527 #[test]
528 fn reload_rules_preserves_bloom_tuning() {
529 let dir = tempfile::tempdir().unwrap();
536 let rule_path = dir.path().join("test.yml");
537 std::fs::write(
538 &rule_path,
539 r#"
540title: Rule A
541status: test
542logsource:
543 category: test
544detection:
545 selection:
546 EventID: 1
547 condition: selection
548"#,
549 )
550 .unwrap();
551
552 let mut engine = RuntimeEngine::new(
553 rule_path.clone(),
554 vec![],
555 CorrelationConfig::default(),
556 false,
557 );
558 engine.set_bloom_prefilter(true);
559 engine.set_bloom_max_bytes(2 * 1024 * 1024);
560 #[cfg(feature = "daachorse-index")]
561 engine.set_cross_rule_ac(true);
562 engine.load_rules().unwrap();
563
564 let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
565 proc.reload_rules().unwrap();
566
567 let snapshot = proc.engine_snapshot();
568 let reloaded = snapshot.lock();
569 assert!(
570 reloaded.bloom_prefilter(),
571 "bloom_prefilter must survive reload_rules"
572 );
573 assert_eq!(
574 reloaded.bloom_max_bytes(),
575 Some(2 * 1024 * 1024),
576 "bloom_max_bytes must survive reload_rules"
577 );
578 #[cfg(feature = "daachorse-index")]
579 assert!(
580 reloaded.cross_rule_ac(),
581 "cross_rule_ac must survive reload_rules"
582 );
583
584 std::mem::forget(dir);
585 }
586
587 #[test]
588 fn reload_rules_preserves_engine() {
589 let dir = tempfile::tempdir().unwrap();
590 let rule_path = dir.path().join("test.yml");
591 std::fs::write(
592 &rule_path,
593 r#"
594title: Rule A
595status: test
596logsource:
597 category: test
598detection:
599 selection:
600 EventID: 1
601 condition: selection
602"#,
603 )
604 .unwrap();
605
606 let mut engine = RuntimeEngine::new(
607 rule_path.clone(),
608 vec![],
609 CorrelationConfig::default(),
610 false,
611 );
612 engine.load_rules().unwrap();
613 let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
614
615 let batch = vec![r#"{"EventID": 1}"#.to_string()];
616 assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
617
618 std::fs::write(
620 &rule_path,
621 r#"
622title: Rule B
623status: test
624logsource:
625 category: test
626detection:
627 selection:
628 EventID: 42
629 condition: selection
630"#,
631 )
632 .unwrap();
633
634 let stats = proc.reload_rules().unwrap();
635 assert_eq!(stats.detection_rules, 1);
636
637 assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0);
639 let batch2 = vec![r#"{"EventID": 42}"#.to_string()];
641 assert!(proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0);
642
643 std::mem::forget(dir);
644 }
645
646 #[test]
647 fn reload_re_reads_pipelines_from_disk() {
648 let dir = tempfile::tempdir().unwrap();
649
650 let rule_path = dir.path().join("test.yml");
653 std::fs::write(
654 &rule_path,
655 r#"
656title: Rule A
657status: test
658logsource:
659 category: test
660detection:
661 selection:
662 SourceIP: "10.0.0.1"
663 condition: selection
664"#,
665 )
666 .unwrap();
667
668 let pipeline_path = dir.path().join("pipeline.yml");
670 std::fs::write(
671 &pipeline_path,
672 r#"
673name: Initial Pipeline
674priority: 10
675transformations:
676 - id: rename_field
677 type: field_name_mapping
678 mapping:
679 SourceIP: src_ip
680"#,
681 )
682 .unwrap();
683
684 let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
685 let mut engine = RuntimeEngine::new(
686 rule_path.clone(),
687 pipelines,
688 CorrelationConfig::default(),
689 false,
690 );
691 engine.set_pipeline_paths(vec![pipeline_path.clone()]);
692 engine.load_rules().unwrap();
693 let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
694
695 let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
697 assert!(
698 proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
699 "src_ip should match because pipeline mapped SourceIP -> src_ip"
700 );
701
702 std::fs::write(
704 &pipeline_path,
705 r#"
706name: Updated Pipeline
707priority: 10
708transformations:
709 - id: rename_field
710 type: field_name_mapping
711 mapping:
712 SourceIP: source.ip
713"#,
714 )
715 .unwrap();
716
717 proc.reload_rules().unwrap();
718
719 assert!(
721 proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() == 0,
722 "after pipeline reload, src_ip should no longer match"
723 );
724
725 let batch2 = vec![r#"{"source.ip": "10.0.0.1"}"#.to_string()];
727 assert!(
728 proc.process_batch_lines(&batch2, &identity_filter)[0].detection_count() > 0,
729 "after pipeline reload, source.ip should match"
730 );
731
732 std::mem::forget(dir);
733 }
734
735 #[test]
736 fn reload_with_broken_pipeline_keeps_old_engine() {
737 let dir = tempfile::tempdir().unwrap();
738 let rule_path = dir.path().join("test.yml");
739 std::fs::write(
740 &rule_path,
741 r#"
742title: Rule A
743status: test
744logsource:
745 category: test
746detection:
747 selection:
748 SourceIP: "10.0.0.1"
749 condition: selection
750"#,
751 )
752 .unwrap();
753
754 let pipeline_path = dir.path().join("pipeline.yml");
755 std::fs::write(
756 &pipeline_path,
757 r#"
758name: Working Pipeline
759priority: 10
760transformations:
761 - id: rename_field
762 type: field_name_mapping
763 mapping:
764 SourceIP: src_ip
765"#,
766 )
767 .unwrap();
768
769 let pipelines = vec![rsigma_eval::parse_pipeline_file(&pipeline_path).unwrap()];
770 let mut engine = RuntimeEngine::new(
771 rule_path.clone(),
772 pipelines,
773 CorrelationConfig::default(),
774 false,
775 );
776 engine.set_pipeline_paths(vec![pipeline_path.clone()]);
777 engine.load_rules().unwrap();
778 let proc = LogProcessor::new(engine, Arc::new(NoopMetrics));
779
780 let batch = vec![r#"{"src_ip": "10.0.0.1"}"#.to_string()];
782 assert!(proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0);
783
784 std::fs::write(&pipeline_path, "{{{{ invalid yaml !!!!").unwrap();
786
787 let result = proc.reload_rules();
789 assert!(result.is_err(), "reload with broken pipeline should fail");
790
791 assert!(
793 proc.process_batch_lines(&batch, &identity_filter)[0].detection_count() > 0,
794 "old engine should still work after failed reload"
795 );
796
797 std::mem::forget(dir);
798 }
799
800 #[test]
801 fn custom_event_filter() {
802 let proc = make_processor(
803 r#"
804title: Test Rule
805status: test
806logsource:
807 category: test
808detection:
809 selection:
810 EventID: 1
811 condition: selection
812"#,
813 );
814
815 let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
817 if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
818 records.clone()
819 } else {
820 vec![v.clone()]
821 }
822 };
823
824 let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
825 let results = proc.process_batch_lines(&batch, &filter);
826 assert_eq!(results.len(), 1);
827 assert_eq!(
828 results[0].detection_count(),
829 1,
830 "only EventID=1 from records array should match"
831 );
832 }
833
834 #[test]
835 fn empty_batch_returns_empty() {
836 let proc = make_processor(
837 r#"
838title: Test Rule
839status: test
840logsource:
841 category: test
842detection:
843 selection:
844 EventID: 1
845 condition: selection
846"#,
847 );
848
849 let batch: Vec<String> = vec![];
850 let results = proc.process_batch_lines(&batch, &identity_filter);
851 assert!(results.is_empty());
852 }
853
854 #[test]
856 fn metrics_hook_invocations() {
857 use std::sync::atomic::{AtomicU64, Ordering};
858
859 struct CountingMetrics {
860 parse_errors: AtomicU64,
861 events_processed: AtomicU64,
862 detection_matches: AtomicU64,
863 }
864
865 impl MetricsHook for CountingMetrics {
866 fn on_parse_error(&self) {
867 self.parse_errors.fetch_add(1, Ordering::Relaxed);
868 }
869 fn on_events_processed(&self, count: u64) {
870 self.events_processed.fetch_add(count, Ordering::Relaxed);
871 }
872 fn on_detection_matches(&self, count: u64) {
873 self.detection_matches.fetch_add(count, Ordering::Relaxed);
874 }
875 fn on_correlation_matches(&self, _: u64) {}
876 fn observe_processing_latency(&self, _: f64) {}
877 fn on_input_queue_depth_change(&self, _: i64) {}
878 fn on_back_pressure(&self) {}
879 fn observe_batch_size(&self, _: u64) {}
880 fn on_output_queue_depth_change(&self, _: i64) {}
881 fn observe_pipeline_latency(&self, _: f64) {}
882 fn set_correlation_state_entries(&self, _: u64) {}
883 }
884
885 let dir = tempfile::tempdir().unwrap();
886 let rule_path = dir.path().join("test.yml");
887 std::fs::write(
888 &rule_path,
889 r#"
890title: Test Rule
891status: test
892logsource:
893 category: test
894detection:
895 selection:
896 EventID: 1
897 condition: selection
898"#,
899 )
900 .unwrap();
901
902 let mut engine = RuntimeEngine::new(rule_path, vec![], CorrelationConfig::default(), false);
903 engine.load_rules().unwrap();
904
905 let metrics = Arc::new(CountingMetrics {
906 parse_errors: AtomicU64::new(0),
907 events_processed: AtomicU64::new(0),
908 detection_matches: AtomicU64::new(0),
909 });
910 let proc = LogProcessor::new(engine, metrics.clone());
911
912 let batch = vec![
913 "not json".to_string(),
914 r#"{"EventID": 1}"#.to_string(),
915 r#"{"EventID": 2}"#.to_string(),
916 ];
917 proc.process_batch_lines(&batch, &identity_filter);
918
919 assert_eq!(metrics.parse_errors.load(Ordering::Relaxed), 1);
920 assert_eq!(metrics.events_processed.load(Ordering::Relaxed), 2);
921 assert_eq!(metrics.detection_matches.load(Ordering::Relaxed), 1);
922
923 std::mem::forget(dir);
924 }
925
926 #[test]
928 fn concurrent_swap_and_process() {
929 let dir = tempfile::tempdir().unwrap();
930 let rule_path = dir.path().join("test.yml");
931 std::fs::write(
932 &rule_path,
933 r#"
934title: Rule A
935status: test
936logsource:
937 category: test
938detection:
939 selection:
940 EventID: 1
941 condition: selection
942"#,
943 )
944 .unwrap();
945
946 let mut engine = RuntimeEngine::new(
947 rule_path.clone(),
948 vec![],
949 CorrelationConfig::default(),
950 false,
951 );
952 engine.load_rules().unwrap();
953 let proc = Arc::new(LogProcessor::new(engine, Arc::new(NoopMetrics)));
954
955 let handles: Vec<_> = (0..4)
956 .map(|i| {
957 let proc = proc.clone();
958 let rule_path = rule_path.clone();
959 std::thread::spawn(move || {
960 let batch = vec![r#"{"EventID": 1}"#.to_string()];
961 for _ in 0..100 {
962 let _ = proc.process_batch_lines(&batch, &identity_filter);
963 }
964 if i == 0 {
966 let mut new_engine = RuntimeEngine::new(
967 rule_path,
968 vec![],
969 CorrelationConfig::default(),
970 false,
971 );
972 new_engine.load_rules().unwrap();
973 proc.swap_engine(new_engine);
974 }
975 })
976 })
977 .collect();
978
979 for h in handles {
980 h.join().unwrap();
981 }
982
983 std::mem::forget(dir);
984 }
985
986 #[test]
989 fn format_json_matches() {
990 let proc = make_processor(
991 r#"
992title: Test Rule
993status: test
994logsource:
995 category: test
996detection:
997 selection:
998 EventID: 1
999 condition: selection
1000"#,
1001 );
1002
1003 let batch = vec![r#"{"EventID": 1}"#.to_string()];
1004 let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
1005 assert_eq!(results.len(), 1);
1006 assert!(
1007 results[0].detection_count() > 0,
1008 "JSON EventID=1 should match"
1009 );
1010 }
1011
1012 #[test]
1013 fn format_syslog_extracts_fields() {
1014 let proc = make_processor(
1015 r#"
1016title: Syslog Test
1017status: test
1018logsource:
1019 category: test
1020detection:
1021 selection:
1022 hostname: mymachine
1023 condition: selection
1024"#,
1025 );
1026
1027 let batch = vec!["<34>Oct 11 22:14:15 mymachine su: test message".to_string()];
1028 let results = proc.process_batch_with_format(
1029 &batch,
1030 &InputFormat::Syslog(crate::input::SyslogConfig::default()),
1031 None,
1032 );
1033 assert_eq!(results.len(), 1);
1034 assert!(
1035 results[0].detection_count() > 0,
1036 "syslog hostname=mymachine should match"
1037 );
1038 }
1039
1040 #[test]
1041 fn format_plain_keyword_match() {
1042 let proc = make_processor(
1043 r#"
1044title: Keyword Test
1045status: test
1046logsource:
1047 category: test
1048detection:
1049 keywords:
1050 - "disk full"
1051 condition: keywords
1052"#,
1053 );
1054
1055 let batch = vec!["ERROR: disk full on /dev/sda1".to_string()];
1056 let results = proc.process_batch_with_format(&batch, &InputFormat::Plain, None);
1057 assert_eq!(results.len(), 1);
1058 assert!(
1059 results[0].detection_count() > 0,
1060 "plain keyword 'disk full' should match"
1061 );
1062 }
1063
1064 #[test]
1065 fn format_auto_detects_json() {
1066 let proc = make_processor(
1067 r#"
1068title: Test Rule
1069status: test
1070logsource:
1071 category: test
1072detection:
1073 selection:
1074 EventID: 1
1075 condition: selection
1076"#,
1077 );
1078
1079 let batch = vec![r#"{"EventID": 1}"#.to_string()];
1080 let results = proc.process_batch_with_format(&batch, &InputFormat::default(), None);
1081 assert_eq!(results.len(), 1);
1082 assert!(results[0].detection_count() > 0);
1083 }
1084
1085 #[test]
1086 fn format_json_with_event_filter() {
1087 let proc = make_processor(
1088 r#"
1089title: Test Rule
1090status: test
1091logsource:
1092 category: test
1093detection:
1094 selection:
1095 EventID: 1
1096 condition: selection
1097"#,
1098 );
1099
1100 let filter = |v: &serde_json::Value| -> Vec<serde_json::Value> {
1101 if let Some(records) = v.get("records").and_then(|r| r.as_array()) {
1102 records.clone()
1103 } else {
1104 vec![v.clone()]
1105 }
1106 };
1107
1108 let batch = vec![r#"{"records": [{"EventID": 1}, {"EventID": 2}]}"#.to_string()];
1109 let results = proc.process_batch_with_format(&batch, &InputFormat::Json, Some(&filter));
1110 assert_eq!(results.len(), 1);
1111 assert_eq!(
1112 results[0].detection_count(),
1113 1,
1114 "only EventID=1 from records array should match"
1115 );
1116 }
1117
1118 #[test]
1119 fn format_empty_lines_skipped() {
1120 let proc = make_processor(
1121 r#"
1122title: Test Rule
1123status: test
1124logsource:
1125 category: test
1126detection:
1127 selection:
1128 EventID: 1
1129 condition: selection
1130"#,
1131 );
1132
1133 let batch = vec![
1134 "".to_string(),
1135 " ".to_string(),
1136 r#"{"EventID": 1}"#.to_string(),
1137 ];
1138 let results = proc.process_batch_with_format(&batch, &InputFormat::Json, None);
1139 assert_eq!(results.len(), 3);
1140 assert!(results[0].detection_count() == 0);
1141 assert!(results[1].detection_count() == 0);
1142 assert!(results[2].detection_count() > 0);
1143 }
1144
1145 #[cfg(feature = "logfmt")]
1146 #[test]
1147 fn format_logfmt_matches() {
1148 let proc = make_processor(
1149 r#"
1150title: Logfmt Test
1151status: test
1152logsource:
1153 category: test
1154detection:
1155 selection:
1156 level: error
1157 condition: selection
1158"#,
1159 );
1160
1161 let batch = vec!["level=error msg=something host=web01".to_string()];
1162 let results = proc.process_batch_with_format(&batch, &InputFormat::Logfmt, None);
1163 assert_eq!(results.len(), 1);
1164 assert!(
1165 results[0].detection_count() > 0,
1166 "logfmt level=error should match"
1167 );
1168 }
1169
1170 #[cfg(feature = "cef")]
1171 #[test]
1172 fn format_cef_matches() {
1173 let proc = make_processor(
1174 r#"
1175title: CEF Test
1176status: test
1177logsource:
1178 category: test
1179detection:
1180 selection:
1181 deviceVendor: Security
1182 condition: selection
1183"#,
1184 );
1185
1186 let batch = vec!["CEF:0|Security|IDS|1.0|100|Attack|9|src=10.0.0.1".to_string()];
1187 let results = proc.process_batch_with_format(&batch, &InputFormat::Cef, None);
1188 assert_eq!(results.len(), 1);
1189 assert!(
1190 results[0].detection_count() > 0,
1191 "CEF deviceVendor=Security should match"
1192 );
1193 }
1194}