1use std::collections::HashMap;
27use std::fs;
28use std::path::Path;
29use std::sync::Mutex;
30use std::sync::atomic::{AtomicU64, Ordering};
31use std::time::Instant;
32
33use regex::Regex;
34use serde::Deserialize;
35
36use crate::event::Event;
37
38#[derive(Debug, Clone)]
44pub enum SchemaPredicate {
45 FieldPresent(String),
47 FieldAbsent(String),
49 AnyOf(Vec<String>),
51 Equals { field: String, value: String },
54 Matches { field: String, regex: Regex },
56 HasAnyField,
60}
61
62impl SchemaPredicate {
63 fn eval<E: Event + ?Sized>(&self, event: &E) -> bool {
64 match self {
65 SchemaPredicate::FieldPresent(f) => event.get_field(f).is_some(),
66 SchemaPredicate::FieldAbsent(f) => event.get_field(f).is_none(),
67 SchemaPredicate::AnyOf(fields) => fields.iter().any(|f| event.get_field(f).is_some()),
68 SchemaPredicate::Equals { field, value } => event
69 .get_field(field)
70 .and_then(|v| v.as_str().map(|s| s.as_ref().eq_ignore_ascii_case(value)))
71 .unwrap_or(false),
72 SchemaPredicate::Matches { field, regex } => event
73 .get_field(field)
74 .and_then(|v| v.as_str().map(|s| regex.is_match(s.as_ref())))
75 .unwrap_or(false),
76 SchemaPredicate::HasAnyField => !event.field_keys().is_empty(),
77 }
78 }
79}
80
81#[derive(Debug, Clone)]
86pub struct SchemaSignature {
87 pub name: String,
89 pub predicates: Vec<SchemaPredicate>,
93 pub specificity: u32,
95}
96
97impl SchemaSignature {
98 fn matches<E: Event + ?Sized>(&self, event: &E) -> bool {
99 self.predicates.iter().all(|p| p.eval(event))
100 }
101}
102
/// Result of a successful classification: the winning signature's name and
/// its specificity.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaMatch {
    /// Name of the matching schema.
    pub name: String,
    /// Specificity of the signature that matched.
    pub specificity: u32,
}
110
111#[derive(Debug, Clone)]
117pub struct SchemaClassifier {
118 signatures: Vec<SchemaSignature>,
119}
120
121impl SchemaClassifier {
122 pub fn new(mut signatures: Vec<SchemaSignature>) -> Self {
124 signatures.sort_by(|a, b| {
125 b.specificity
126 .cmp(&a.specificity)
127 .then_with(|| a.name.cmp(&b.name))
128 });
129 Self { signatures }
130 }
131
132 pub fn builtin() -> Self {
134 Self::new(builtin_signatures())
135 }
136
137 pub fn with_user_signatures(user: Vec<SchemaSignature>) -> Self {
141 let mut signatures = builtin_signatures();
142 signatures.extend(user);
143 Self::new(signatures)
144 }
145
146 pub fn classify<E: Event + ?Sized>(&self, event: &E) -> Option<SchemaMatch> {
149 self.signatures
150 .iter()
151 .find(|s| s.matches(event))
152 .map(|s| SchemaMatch {
153 name: s.name.clone(),
154 specificity: s.specificity,
155 })
156 }
157
158 pub fn classify_all<E: Event + ?Sized>(&self, event: &E) -> Vec<String> {
162 let mut out: Vec<String> = Vec::new();
163 for sig in self.signatures.iter().filter(|s| s.matches(event)) {
164 if !out.iter().any(|n| n == &sig.name) {
165 out.push(sig.name.clone());
166 }
167 }
168 out
169 }
170
171 pub fn schema_names(&self) -> Vec<&str> {
173 let mut out: Vec<&str> = Vec::new();
174 for sig in &self.signatures {
175 if !out.contains(&sig.name.as_str()) {
176 out.push(sig.name.as_str());
177 }
178 }
179 out
180 }
181}
182
183impl Default for SchemaClassifier {
184 fn default() -> Self {
185 Self::builtin()
186 }
187}
188
189fn builtin_signatures() -> Vec<SchemaSignature> {
193 vec![
194 SchemaSignature {
196 name: "ecs".to_string(),
197 specificity: 100,
198 predicates: vec![SchemaPredicate::FieldPresent("ecs.version".to_string())],
199 },
200 SchemaSignature {
202 name: "ocsf".to_string(),
203 specificity: 95,
204 predicates: vec![
205 SchemaPredicate::FieldPresent("class_uid".to_string()),
206 SchemaPredicate::FieldPresent("metadata.version".to_string()),
207 ],
208 },
209 SchemaSignature {
211 name: "windows_eventlog".to_string(),
212 specificity: 90,
213 predicates: vec![SchemaPredicate::AnyOf(vec![
214 "Event.System.EventID".to_string(),
215 "Event.System.Provider".to_string(),
216 ])],
217 },
218 SchemaSignature {
220 name: "sysmon".to_string(),
221 specificity: 88,
222 predicates: vec![SchemaPredicate::Equals {
223 field: "Channel".to_string(),
224 value: "Microsoft-Windows-Sysmon/Operational".to_string(),
225 }],
226 },
227 SchemaSignature {
229 name: "sysmon".to_string(),
230 specificity: 88,
231 predicates: vec![SchemaPredicate::Equals {
232 field: "Provider_Name".to_string(),
233 value: "Microsoft-Windows-Sysmon".to_string(),
234 }],
235 },
236 SchemaSignature {
238 name: "sysmon".to_string(),
239 specificity: 80,
240 predicates: vec![
241 SchemaPredicate::FieldPresent("EventID".to_string()),
242 SchemaPredicate::FieldPresent("ProcessGuid".to_string()),
243 SchemaPredicate::AnyOf(vec!["Image".to_string(), "CommandLine".to_string()]),
244 ],
245 },
246 SchemaSignature {
249 name: "cef".to_string(),
250 specificity: 85,
251 predicates: vec![
252 SchemaPredicate::FieldPresent("deviceVendor".to_string()),
253 SchemaPredicate::FieldPresent("deviceProduct".to_string()),
254 SchemaPredicate::FieldPresent("signatureId".to_string()),
255 ],
256 },
257 SchemaSignature {
259 name: "generic_json".to_string(),
260 specificity: 0,
261 predicates: vec![SchemaPredicate::HasAnyField],
262 },
263 ]
264}
265
/// Names of the builtin schemas, most specific first, each listed once.
///
/// Maintained by hand alongside `builtin_signatures`.
pub fn builtin_schema_names() -> Vec<&'static str> {
    const NAMES: [&str; 6] = [
        "ecs",
        "ocsf",
        "windows_eventlog",
        "sysmon",
        "cef",
        "generic_json",
    ];
    NAMES.to_vec()
}
277
278#[derive(Debug, thiserror::Error)]
284pub enum SchemaError {
285 #[error("cannot read schema signatures file '{path}': {source}")]
287 Io {
288 path: String,
289 #[source]
290 source: std::io::Error,
291 },
292 #[error("schema signatures YAML parse error: {0}")]
294 Parse(String),
295 #[error("invalid regex in schema '{name}': {error}")]
297 InvalidRegex { name: String, error: String },
298}
299
300#[derive(Debug, Clone, Deserialize)]
303#[serde(deny_unknown_fields)]
304pub struct FieldValueConfig {
305 pub field: String,
306 pub value: String,
307}
308
309#[derive(Debug, Clone, Default, Deserialize)]
313#[serde(deny_unknown_fields)]
314pub struct SchemaPredicateConfig {
315 #[serde(default)]
317 pub field_present: Option<String>,
318 #[serde(default)]
320 pub field_absent: Option<String>,
321 #[serde(default)]
323 pub any_of: Option<Vec<String>>,
324 #[serde(default)]
326 pub equals: Option<FieldValueConfig>,
327 #[serde(default)]
329 pub matches: Option<FieldValueConfig>,
330}
331
332impl SchemaPredicateConfig {
333 fn build(self, schema_name: &str) -> Result<SchemaPredicate, SchemaError> {
334 let mut chosen: Option<SchemaPredicate> = None;
335 let mut set = 0u32;
336 if let Some(f) = self.field_present {
337 set += 1;
338 chosen = Some(SchemaPredicate::FieldPresent(f));
339 }
340 if let Some(f) = self.field_absent {
341 set += 1;
342 chosen = Some(SchemaPredicate::FieldAbsent(f));
343 }
344 if let Some(fields) = self.any_of {
345 set += 1;
346 chosen = Some(SchemaPredicate::AnyOf(fields));
347 }
348 if let Some(fv) = self.equals {
349 set += 1;
350 chosen = Some(SchemaPredicate::Equals {
351 field: fv.field,
352 value: fv.value,
353 });
354 }
355 if let Some(fv) = self.matches {
356 set += 1;
357 chosen = Some(SchemaPredicate::Matches {
358 field: fv.field,
359 regex: Regex::new(&fv.value).map_err(|e| SchemaError::InvalidRegex {
360 name: schema_name.to_string(),
361 error: e.to_string(),
362 })?,
363 });
364 }
365 match (set, chosen) {
366 (1, Some(p)) => Ok(p),
367 (0, _) => Err(SchemaError::Parse(format!(
368 "schema '{schema_name}': a predicate has no condition (expected one of \
369 field_present, field_absent, any_of, equals, matches)"
370 ))),
371 _ => Err(SchemaError::Parse(format!(
372 "schema '{schema_name}': a predicate sets multiple conditions; use one per list item"
373 ))),
374 }
375 }
376}
377
378#[derive(Debug, Clone, Deserialize)]
380pub struct SchemaSignatureConfig {
381 pub name: String,
383 #[serde(default = "default_user_specificity")]
386 pub specificity: u32,
387 #[serde(default, rename = "match")]
389 pub predicates: Vec<SchemaPredicateConfig>,
390}
391
/// Specificity given to a user signature whose YAML omits `specificity`.
///
/// 50 ranks above the builtin catch-all `generic_json` (0) and below the other
/// builtins (80–100).
fn default_user_specificity() -> u32 {
    const DEFAULT_USER_SPECIFICITY: u32 = 50;
    DEFAULT_USER_SPECIFICITY
}
395
396#[derive(Debug, Clone, Default, Deserialize)]
399pub struct SchemaSignaturesFile {
400 #[serde(default)]
401 pub schemas: Vec<SchemaSignatureConfig>,
402 #[serde(default)]
403 pub routing: Option<RoutingConfig>,
404}
405
406impl SchemaSignatureConfig {
407 fn build(self) -> Result<SchemaSignature, SchemaError> {
408 let name = self.name;
409 let predicates = self
410 .predicates
411 .into_iter()
412 .map(|p| p.build(&name))
413 .collect::<Result<Vec<_>, _>>()?;
414 Ok(SchemaSignature {
415 name,
416 predicates,
417 specificity: self.specificity,
418 })
419 }
420}
421
422pub fn parse_schema_signatures(yaml: &str) -> Result<Vec<SchemaSignature>, SchemaError> {
424 let file: SchemaSignaturesFile =
425 yaml_serde::from_str(yaml).map_err(|e| SchemaError::Parse(e.to_string()))?;
426 file.schemas.into_iter().map(|s| s.build()).collect()
427}
428
429pub fn load_schema_signatures(path: &Path) -> Result<Vec<SchemaSignature>, SchemaError> {
431 let content = fs::read_to_string(path).map_err(|e| SchemaError::Io {
432 path: path.display().to_string(),
433 source: e,
434 })?;
435 parse_schema_signatures(&content)
436}
437
438pub fn parse_schema_config(
441 yaml: &str,
442) -> Result<(Vec<SchemaSignature>, Option<RoutingConfig>), SchemaError> {
443 let file: SchemaSignaturesFile =
444 yaml_serde::from_str(yaml).map_err(|e| SchemaError::Parse(e.to_string()))?;
445 let signatures = file
446 .schemas
447 .into_iter()
448 .map(|s| s.build())
449 .collect::<Result<Vec<_>, _>>()?;
450 Ok((signatures, file.routing))
451}
452
453pub fn load_schema_config(
456 path: &Path,
457) -> Result<(Vec<SchemaSignature>, Option<RoutingConfig>), SchemaError> {
458 let content = fs::read_to_string(path).map_err(|e| SchemaError::Io {
459 path: path.display().to_string(),
460 source: e,
461 })?;
462 parse_schema_config(&content)
463}
464
465#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
471#[serde(rename_all = "snake_case")]
472pub enum OnUnknown {
473 #[default]
475 Warn,
476 Drop,
478 Passthrough,
480 Error,
482}
483
484#[derive(Debug, Clone, Deserialize)]
487pub struct SchemaBinding {
488 pub schema: String,
489 #[serde(default)]
491 pub pipelines: Vec<String>,
492}
493
494#[derive(Debug, Clone, Default, Deserialize)]
496pub struct RoutingConfig {
497 #[serde(default)]
498 pub on_unknown: OnUnknown,
499 #[serde(default)]
500 pub bindings: Vec<SchemaBinding>,
501 #[serde(default)]
504 pub default_pipelines: Vec<String>,
505}
506
/// Outcome of `RoutingPlan::decide` for one event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RouteDecision {
    /// Evaluate the event against `pipeline_sets()[set]`. `unknown` is `true`
    /// when the event had no schema.
    Evaluate {
        set: usize,
        unknown: bool,
    },
    /// Discard the event (`on_unknown: drop`).
    Drop,
    /// Treat the event as an error (`on_unknown: error`).
    Error,
}
518
519#[derive(Debug, Clone)]
526pub struct RoutingPlan {
527 pipeline_sets: Vec<Vec<String>>,
529 schema_to_set: HashMap<String, usize>,
531 on_unknown: OnUnknown,
532}
533
534impl RoutingPlan {
535 pub fn from_config(config: &RoutingConfig) -> Self {
538 let mut pipeline_sets: Vec<Vec<String>> = vec![config.default_pipelines.clone()];
540 let mut schema_to_set: HashMap<String, usize> = HashMap::new();
541
542 for binding in &config.bindings {
543 let idx = pipeline_sets
544 .iter()
545 .position(|s| s == &binding.pipelines)
546 .unwrap_or_else(|| {
547 pipeline_sets.push(binding.pipelines.clone());
548 pipeline_sets.len() - 1
549 });
550 schema_to_set.insert(binding.schema.clone(), idx);
551 }
552
553 RoutingPlan {
554 pipeline_sets,
555 schema_to_set,
556 on_unknown: config.on_unknown,
557 }
558 }
559
560 pub fn pipeline_sets(&self) -> &[Vec<String>] {
563 &self.pipeline_sets
564 }
565
566 pub fn on_unknown(&self) -> OnUnknown {
568 self.on_unknown
569 }
570
571 pub fn decide(&self, schema: Option<&str>) -> RouteDecision {
574 match schema {
575 Some(s) if self.schema_to_set.contains_key(s) => RouteDecision::Evaluate {
577 set: self.schema_to_set[s],
578 unknown: false,
579 },
580 Some(_) => RouteDecision::Evaluate {
582 set: 0,
583 unknown: false,
584 },
585 None => match self.on_unknown {
587 OnUnknown::Warn | OnUnknown::Passthrough => RouteDecision::Evaluate {
588 set: 0,
589 unknown: true,
590 },
591 OnUnknown::Drop => RouteDecision::Drop,
592 OnUnknown::Error => RouteDecision::Error,
593 },
594 }
595 }
596}
597
/// How many events were attributed to one schema in the current window.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaCountEntry {
    /// Schema name.
    pub schema: String,
    /// Events classified as `schema` since creation or the last reset.
    pub count: u64,
}
610
611#[derive(Debug, Clone, Default)]
613pub struct SchemaObservation {
614 pub by_schema: Vec<SchemaCountEntry>,
616 pub classified: u64,
618 pub unknown: u64,
620 pub events_observed: u64,
622 pub lifetime_classified: u64,
625 pub lifetime_unknown: u64,
627 pub uptime_seconds: f64,
629}
630
631pub struct SchemaObserver {
637 classifier: SchemaClassifier,
638 counts: Mutex<HashMap<String, u64>>,
639 unknown: AtomicU64,
640 events_observed: AtomicU64,
641 lifetime_classified: AtomicU64,
642 lifetime_unknown: AtomicU64,
643 start: Mutex<Instant>,
644}
645
646impl SchemaObserver {
647 pub fn new(classifier: SchemaClassifier) -> Self {
649 Self {
650 classifier,
651 counts: Mutex::new(HashMap::new()),
652 unknown: AtomicU64::new(0),
653 events_observed: AtomicU64::new(0),
654 lifetime_classified: AtomicU64::new(0),
655 lifetime_unknown: AtomicU64::new(0),
656 start: Mutex::new(Instant::now()),
657 }
658 }
659
660 pub fn builtin() -> Self {
662 Self::new(SchemaClassifier::builtin())
663 }
664
665 pub fn observe<E: Event + ?Sized>(&self, event: &E) {
668 self.events_observed.fetch_add(1, Ordering::Relaxed);
669 match self.classifier.classify(event) {
670 Some(m) => {
671 self.lifetime_classified.fetch_add(1, Ordering::Relaxed);
672 let mut counts = self.counts.lock().expect("schema observer mutex poisoned");
673 *counts.entry(m.name).or_insert(0) += 1;
674 }
675 None => {
676 self.unknown.fetch_add(1, Ordering::Relaxed);
677 self.lifetime_unknown.fetch_add(1, Ordering::Relaxed);
678 }
679 }
680 }
681
682 pub fn snapshot(&self) -> SchemaObservation {
684 let counts = self.counts.lock().expect("schema observer mutex poisoned");
685 let mut by_schema: Vec<SchemaCountEntry> = counts
686 .iter()
687 .map(|(schema, count)| SchemaCountEntry {
688 schema: schema.clone(),
689 count: *count,
690 })
691 .collect();
692 let classified: u64 = counts.values().sum();
693 drop(counts);
694 by_schema.sort_by(|a, b| b.count.cmp(&a.count).then_with(|| a.schema.cmp(&b.schema)));
695 let unknown = self.unknown.load(Ordering::Relaxed);
696 SchemaObservation {
697 by_schema,
698 classified,
699 unknown,
700 events_observed: self.events_observed.load(Ordering::Relaxed),
701 lifetime_classified: self.lifetime_classified.load(Ordering::Relaxed),
702 lifetime_unknown: self.lifetime_unknown.load(Ordering::Relaxed),
703 uptime_seconds: self
704 .start
705 .lock()
706 .expect("schema observer start mutex poisoned")
707 .elapsed()
708 .as_secs_f64(),
709 }
710 }
711
712 pub fn reset(&self) -> (u64, u64) {
715 let mut counts = self.counts.lock().expect("schema observer mutex poisoned");
716 let previous_classified: u64 = counts.values().sum();
717 counts.clear();
718 drop(counts);
719 let previous_unknown = self.unknown.swap(0, Ordering::Relaxed);
720 self.events_observed.store(0, Ordering::Relaxed);
721 *self
722 .start
723 .lock()
724 .expect("schema observer start mutex poisoned") = Instant::now();
725 (previous_classified, previous_unknown)
726 }
727
728 pub fn lifetime_classified(&self) -> u64 {
730 self.lifetime_classified.load(Ordering::Relaxed)
731 }
732
733 pub fn lifetime_unknown(&self) -> u64 {
735 self.lifetime_unknown.load(Ordering::Relaxed)
736 }
737}
738
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::JsonEvent;
    use serde_json::json;

    /// Classifies `value` with the builtin classifier and returns only the
    /// winning schema name.
    fn classify(value: &serde_json::Value) -> Option<String> {
        SchemaClassifier::builtin()
            .classify(&JsonEvent::borrow(value))
            .map(|m| m.name)
    }

    #[test]
    fn recognizes_ecs_by_version_marker() {
        // Nested-object form of `ecs.version`.
        let v = json!({"ecs": {"version": "8.11.0"}, "process": {"command_line": "whoami"}});
        assert_eq!(classify(&v).as_deref(), Some("ecs"));
    }

    #[test]
    fn recognizes_ecs_with_flattened_keys() {
        // The same marker as a literal dotted key.
        let v = json!({"ecs.version": "8.11.0", "process.command_line": "whoami"});
        assert_eq!(classify(&v).as_deref(), Some("ecs"));
    }

    #[test]
    fn recognizes_ocsf_by_class_and_metadata() {
        // Both `class_uid` and `metadata.version` are required.
        let v = json!({"class_uid": 1001, "category_uid": 1, "metadata": {"version": "1.1.0"}});
        assert_eq!(classify(&v).as_deref(), Some("ocsf"));
    }

    #[test]
    fn recognizes_rendered_windows_event_log() {
        let v = json!({"Event": {"System": {"EventID": 4688, "Provider": "Microsoft-Windows-Security-Auditing"}}});
        assert_eq!(classify(&v).as_deref(), Some("windows_eventlog"));
    }

    #[test]
    fn recognizes_sysmon_by_channel() {
        let v = json!({"Channel": "Microsoft-Windows-Sysmon/Operational", "EventID": 1, "Image": "C:/cmd.exe"});
        assert_eq!(classify(&v).as_deref(), Some("sysmon"));
    }

    #[test]
    fn recognizes_sysmon_by_provider() {
        let v = json!({"Provider_Name": "Microsoft-Windows-Sysmon", "EventID": 3});
        assert_eq!(classify(&v).as_deref(), Some("sysmon"));
    }

    #[test]
    fn recognizes_flat_sysmon_by_field_shape() {
        // No channel/provider: matched by EventID + ProcessGuid + CommandLine.
        let v = json!({"EventID": 1, "ProcessGuid": "{abc}", "CommandLine": "cmd /c whoami"});
        assert_eq!(classify(&v).as_deref(), Some("sysmon"));
    }

    #[test]
    fn recognizes_cef_structured_fields() {
        let v = json!({"deviceVendor": "Security", "deviceProduct": "IDS", "signatureId": "100", "src": "10.0.0.1"});
        assert_eq!(classify(&v).as_deref(), Some("cef"));
    }

    #[test]
    fn falls_back_to_generic_json_for_unrecognized_structured_events() {
        let v = json!({"some_vendor_field": "x", "another": 1});
        assert_eq!(classify(&v).as_deref(), Some("generic_json"));
    }

    #[test]
    fn fieldless_events_are_unknown() {
        // An empty object fails even `HasAnyField`, so nothing matches.
        assert_eq!(classify(&json!({})), None);
        // A bare string is expected to expose no field keys either.
        assert_eq!(classify(&json!("just a string")), None);
    }

    #[test]
    fn specificity_prefers_specific_schema_over_generic() {
        // Matches both `ecs` (100) and `generic_json` (0).
        let v = json!({"ecs.version": "8.0.0", "vendor_blob": {"x": 1}});
        let cls = SchemaClassifier::builtin();
        let m = cls.classify(&JsonEvent::borrow(&v)).unwrap();
        assert_eq!(m.name, "ecs");
        assert_eq!(m.specificity, 100);
        let all = cls.classify_all(&JsonEvent::borrow(&v));
        // `classify_all` is ordered most specific first and still lists the fallback.
        assert_eq!(all.first().map(String::as_str), Some("ecs"));
        assert!(all.iter().any(|n| n == "generic_json"));
    }

    #[test]
    fn schema_names_lists_builtins_most_specific_first() {
        let classifier = SchemaClassifier::builtin();
        let names = classifier.schema_names();
        assert_eq!(names.first(), Some(&"ecs"));
        assert!(names.contains(&"generic_json"));
        assert_eq!(names.last(), Some(&"generic_json"));
        // Duplicate "sysmon" signatures collapse to a single name.
    }

    #[test]
    fn parses_user_signatures_from_yaml() {
        let yaml = r#"
schemas:
  - name: my_vendor
    specificity: 70
    match:
      - field_present: vendor.product
      - equals:
          field: event_type
          value: alert
      - any_of: [a, b]
"#;
        let sigs = parse_schema_signatures(yaml).expect("parse");
        assert_eq!(sigs.len(), 1);
        assert_eq!(sigs[0].name, "my_vendor");
        assert_eq!(sigs[0].specificity, 70);
        assert_eq!(sigs[0].predicates.len(), 3);

        // "ALERT" vs "alert": `equals` compares ignoring ASCII case.
        let cls = SchemaClassifier::with_user_signatures(sigs);
        let v = json!({"vendor": {"product": "X"}, "event_type": "ALERT", "a": 1});
        assert_eq!(
            cls.classify(&JsonEvent::borrow(&v))
                .map(|m| m.name)
                .as_deref(),
            Some("my_vendor")
        );
    }

    #[test]
    fn user_signature_with_invalid_regex_is_rejected() {
        let yaml = r#"
schemas:
  - name: bad
    match:
      - matches:
          field: msg
          value: "([unclosed"
"#;
        let err = parse_schema_signatures(yaml).unwrap_err();
        assert!(matches!(err, SchemaError::InvalidRegex { .. }));
    }

    #[test]
    fn user_regex_signature_matches_field_value() {
        // In YAML double quotes `\\d` becomes `\d`, so the regex is `^CEF:\d`.
        let yaml = r#"
schemas:
  - name: cef_raw
    specificity: 60
    match:
      - matches:
          field: message
          value: "^CEF:\\d"
"#;
        let sigs = parse_schema_signatures(yaml).expect("parse");
        let cls = SchemaClassifier::with_user_signatures(sigs);
        let v = json!({"message": "CEF:0|Vendor|Product|1.0|100|Name|9|src=1.2.3.4"});
        assert_eq!(
            cls.classify(&JsonEvent::borrow(&v))
                .map(|m| m.name)
                .as_deref(),
            Some("cef_raw")
        );
    }

    #[test]
    fn observer_counts_per_schema_and_unknown() {
        let observer = SchemaObserver::builtin();
        observer.observe(&JsonEvent::borrow(&json!({"ecs.version": "8.0.0"})));
        observer.observe(&JsonEvent::borrow(&json!({"ecs.version": "8.1.0"})));
        observer.observe(&JsonEvent::borrow(
            &json!({"class_uid": 1001, "metadata": {"version": "1.1.0"}}),
        ));
        // Fieldless, so counted as unknown.
        observer.observe(&JsonEvent::borrow(&json!({})));

        let snap = observer.snapshot();
        assert_eq!(snap.events_observed, 4);
        assert_eq!(snap.classified, 3);
        assert_eq!(snap.unknown, 1);
        // `by_schema` is sorted by descending count, so ecs (2) comes first.
        assert_eq!(snap.by_schema[0].schema, "ecs");
        assert_eq!(snap.by_schema[0].count, 2);
        let ocsf = snap.by_schema.iter().find(|e| e.schema == "ocsf").unwrap();
        assert_eq!(ocsf.count, 1);
    }

    #[test]
    fn routing_plan_dedups_pipeline_sets() {
        let config = RoutingConfig {
            on_unknown: OnUnknown::Warn,
            default_pipelines: vec![],
            bindings: vec![
                SchemaBinding {
                    schema: "ecs".to_string(),
                    pipelines: vec!["ecs_windows".to_string()],
                },
                SchemaBinding {
                    schema: "winlogbeat".to_string(),
                    pipelines: vec!["ecs_windows".to_string()],
                },
                SchemaBinding {
                    schema: "sysmon".to_string(),
                    pipelines: vec!["sysmon".to_string()],
                },
            ],
        };
        let plan = RoutingPlan::from_config(&config);
        // Sets: default (empty), [ecs_windows], [sysmon].
        assert_eq!(plan.pipeline_sets().len(), 3);
        let ecs = plan.decide(Some("ecs"));
        // Identical pipeline lists share one set index.
        let win = plan.decide(Some("winlogbeat"));
        assert_eq!(ecs, win);
        assert!(matches!(
            ecs,
            RouteDecision::Evaluate { unknown: false, .. }
        ));
        assert_ne!(plan.decide(Some("sysmon")), ecs);
    }

    #[test]
    fn routing_decides_bound_unbound_and_unknown() {
        let config = RoutingConfig {
            on_unknown: OnUnknown::Warn,
            default_pipelines: vec![],
            bindings: vec![SchemaBinding {
                schema: "ecs".to_string(),
                pipelines: vec!["ecs_windows".to_string()],
            }],
        };
        let plan = RoutingPlan::from_config(&config);
        assert!(matches!(
            // Bound schema.
            plan.decide(Some("ecs")),
            RouteDecision::Evaluate { unknown: false, .. }
        ));
        assert_eq!(
            // Classified but unbound: default set, not flagged unknown.
            plan.decide(Some("cef")),
            RouteDecision::Evaluate {
                set: 0,
                unknown: false
            }
        );
        assert_eq!(
            // Unclassified under `Warn`: default set, flagged unknown.
            plan.decide(None),
            RouteDecision::Evaluate {
                set: 0,
                unknown: true
            }
        );
    }

    #[test]
    fn routing_on_unknown_policies() {
        let base = |policy| RoutingConfig {
            on_unknown: policy,
            default_pipelines: vec![],
            bindings: vec![],
        };
        assert_eq!(
            RoutingPlan::from_config(&base(OnUnknown::Drop)).decide(None),
            RouteDecision::Drop
        );
        assert_eq!(
            RoutingPlan::from_config(&base(OnUnknown::Error)).decide(None),
            RouteDecision::Error
        );
        assert_eq!(
            RoutingPlan::from_config(&base(OnUnknown::Passthrough)).decide(None),
            RouteDecision::Evaluate {
                set: 0,
                unknown: true
            }
        );
    }

    #[test]
    fn parses_routing_section_from_yaml() {
        let yaml = r#"
schemas:
  - name: my_vendor
    match:
      - field_present: vendor.id
routing:
  on_unknown: drop
  default_pipelines: [base]
  bindings:
    - schema: ecs
      pipelines: [ecs_windows]
    - schema: my_vendor
      pipelines: [vendor_map, base]
"#;
        let (sigs, routing) = parse_schema_config(yaml).expect("parse");
        assert_eq!(sigs.len(), 1);
        let routing = routing.expect("routing present");
        assert_eq!(routing.on_unknown, OnUnknown::Drop);
        assert_eq!(routing.default_pipelines, vec!["base".to_string()]);
        assert_eq!(routing.bindings.len(), 2);
        let plan = RoutingPlan::from_config(&routing);
        // Sets: [base], [ecs_windows], [vendor_map, base].
        assert_eq!(plan.pipeline_sets().len(), 3);
        assert_eq!(plan.decide(None), RouteDecision::Drop);
    }

    #[test]
    fn observer_reset_preserves_lifetime_counters() {
        let observer = SchemaObserver::builtin();
        observer.observe(&JsonEvent::borrow(&json!({"ecs.version": "8.0.0"})));
        observer.observe(&JsonEvent::borrow(&json!({})));
        // `reset` returns the pre-reset (classified, unknown) pair.
        let (classified, unknown) = observer.reset();
        assert_eq!(classified, 1);
        assert_eq!(unknown, 1);

        let snap = observer.snapshot();
        assert_eq!(snap.classified, 0);
        assert_eq!(snap.unknown, 0);
        assert_eq!(snap.events_observed, 0);
        // Lifetime counters survive the reset.
        assert_eq!(snap.lifetime_classified, 1);
        assert_eq!(snap.lifetime_unknown, 1);
    }
}