use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use regex::Regex;
use serde::Deserialize;
use crate::event::Event;
/// A single condition that a schema signature checks against an event.
#[derive(Debug, Clone)]
pub enum SchemaPredicate {
/// Holds when the named field is present on the event.
FieldPresent(String),
/// Holds when the named field is not present on the event.
FieldAbsent(String),
/// Holds when at least one of the listed fields is present.
AnyOf(Vec<String>),
/// Holds when the value of `field` is readable via `as_str` and equals
/// `value`, ignoring ASCII case.
Equals { field: String, value: String },
/// Holds when the value of `field` is readable via `as_str` and `regex`
/// matches it.
Matches { field: String, regex: Regex },
/// Holds when the event reports at least one field key.
HasAnyField,
}
impl SchemaPredicate {
    /// Tests this predicate against `event`.
    ///
    /// Presence checks only look at whether `get_field` yields a value.
    /// `Equals` and `Matches` additionally need the value to be readable
    /// through `as_str`; a missing field or a value without a string form
    /// makes them `false`.
    fn eval<E: Event + ?Sized>(&self, event: &E) -> bool {
        match self {
            Self::FieldPresent(name) => event.get_field(name).is_some(),
            Self::FieldAbsent(name) => event.get_field(name).is_none(),
            Self::AnyOf(names) => {
                // Stop at the first field that is present.
                for name in names {
                    if event.get_field(name).is_some() {
                        return true;
                    }
                }
                false
            }
            Self::Equals { field, value } => match event.get_field(field) {
                Some(found) => match found.as_str() {
                    Some(text) => text.as_ref().eq_ignore_ascii_case(value),
                    None => false,
                },
                None => false,
            },
            Self::Matches { field, regex } => match event.get_field(field) {
                Some(found) => match found.as_str() {
                    Some(text) => regex.is_match(text.as_ref()),
                    None => false,
                },
                None => false,
            },
            Self::HasAnyField => !event.field_keys().is_empty(),
        }
    }
}
/// A named group of predicates that identifies one event schema.
///
/// A signature matches an event only when every predicate holds.
#[derive(Debug, Clone)]
pub struct SchemaSignature {
/// Schema name reported when the signature matches.
pub name: String,
/// Conditions that must all hold for the signature to match.
pub predicates: Vec<SchemaPredicate>,
/// Ranking weight; `SchemaClassifier` orders signatures by this value,
/// highest first.
pub specificity: u32,
}
impl SchemaSignature {
    /// Returns `true` when every predicate of this signature holds for
    /// `event`. A signature without predicates therefore matches any event.
    fn matches<E: Event + ?Sized>(&self, event: &E) -> bool {
        for predicate in &self.predicates {
            if !predicate.eval(event) {
                return false;
            }
        }
        true
    }
}
/// Result of classifying an event: the winning schema and its specificity.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaMatch {
/// Name of the signature that matched.
pub name: String,
/// Specificity of the signature that matched.
pub specificity: u32,
}
/// Classifies events by testing them against an ordered list of signatures.
#[derive(Debug, Clone)]
pub struct SchemaClassifier {
// Kept sorted by descending specificity, then ascending name (see `new`).
signatures: Vec<SchemaSignature>,
}
impl SchemaClassifier {
    /// Builds a classifier from `signatures`, ordering them by descending
    /// specificity and, for equal specificity, ascending name.
    ///
    /// The sort is stable, so signatures that tie on both keys keep their
    /// input order.
    pub fn new(mut signatures: Vec<SchemaSignature>) -> Self {
        // Swapping the specificity operands yields the descending order.
        signatures.sort_by(|lhs, rhs| {
            (rhs.specificity, &lhs.name).cmp(&(lhs.specificity, &rhs.name))
        });
        Self { signatures }
    }

    /// Classifier that knows only the built-in signatures.
    pub fn builtin() -> Self {
        Self::new(builtin_signatures())
    }

    /// Classifier over the built-in signatures followed by `user`.
    pub fn with_user_signatures(user: Vec<SchemaSignature>) -> Self {
        let combined: Vec<SchemaSignature> =
            builtin_signatures().into_iter().chain(user).collect();
        Self::new(combined)
    }

    /// Returns the first signature (in classifier order) that matches
    /// `event`, or `None` when no signature matches.
    pub fn classify<E: Event + ?Sized>(&self, event: &E) -> Option<SchemaMatch> {
        for signature in &self.signatures {
            if signature.matches(event) {
                return Some(SchemaMatch {
                    name: signature.name.clone(),
                    specificity: signature.specificity,
                });
            }
        }
        None
    }

    /// Returns the names of all matching signatures in classifier order,
    /// listing each name once.
    pub fn classify_all<E: Event + ?Sized>(&self, event: &E) -> Vec<String> {
        let mut names: Vec<String> = Vec::new();
        for signature in &self.signatures {
            if !signature.matches(event) {
                continue;
            }
            // Several signatures may share one schema name.
            if names.contains(&signature.name) {
                continue;
            }
            names.push(signature.name.clone());
        }
        names
    }

    /// Returns every known schema name in classifier order, without
    /// duplicates.
    pub fn schema_names(&self) -> Vec<&str> {
        let mut names: Vec<&str> = Vec::new();
        for name in self.signatures.iter().map(|signature| signature.name.as_str()) {
            if !names.iter().any(|seen| *seen == name) {
                names.push(name);
            }
        }
        names
    }
}
impl Default for SchemaClassifier {
fn default() -> Self {
Self::builtin()
}
}
fn builtin_signatures() -> Vec<SchemaSignature> {
vec![
SchemaSignature {
name: "ecs".to_string(),
specificity: 100,
predicates: vec![SchemaPredicate::FieldPresent("ecs.version".to_string())],
},
SchemaSignature {
name: "ocsf".to_string(),
specificity: 95,
predicates: vec![
SchemaPredicate::FieldPresent("class_uid".to_string()),
SchemaPredicate::FieldPresent("metadata.version".to_string()),
],
},
SchemaSignature {
name: "windows_eventlog".to_string(),
specificity: 90,
predicates: vec![SchemaPredicate::AnyOf(vec![
"Event.System.EventID".to_string(),
"Event.System.Provider".to_string(),
])],
},
SchemaSignature {
name: "sysmon".to_string(),
specificity: 88,
predicates: vec![SchemaPredicate::Equals {
field: "Channel".to_string(),
value: "Microsoft-Windows-Sysmon/Operational".to_string(),
}],
},
SchemaSignature {
name: "sysmon".to_string(),
specificity: 88,
predicates: vec![SchemaPredicate::Equals {
field: "Provider_Name".to_string(),
value: "Microsoft-Windows-Sysmon".to_string(),
}],
},
SchemaSignature {
name: "sysmon".to_string(),
specificity: 80,
predicates: vec![
SchemaPredicate::FieldPresent("EventID".to_string()),
SchemaPredicate::FieldPresent("ProcessGuid".to_string()),
SchemaPredicate::AnyOf(vec!["Image".to_string(), "CommandLine".to_string()]),
],
},
SchemaSignature {
name: "cef".to_string(),
specificity: 85,
predicates: vec![
SchemaPredicate::FieldPresent("deviceVendor".to_string()),
SchemaPredicate::FieldPresent("deviceProduct".to_string()),
SchemaPredicate::FieldPresent("signatureId".to_string()),
],
},
SchemaSignature {
name: "generic_json".to_string(),
specificity: 0,
predicates: vec![SchemaPredicate::HasAnyField],
},
]
}
/// Returns the names of the built-in schemas as a fixed list.
pub fn builtin_schema_names() -> Vec<&'static str> {
    const NAMES: [&str; 6] = [
        "ecs",
        "ocsf",
        "windows_eventlog",
        "sysmon",
        "cef",
        "generic_json",
    ];
    NAMES.to_vec()
}
/// Errors produced while loading or parsing schema signature configuration.
#[derive(Debug, thiserror::Error)]
pub enum SchemaError {
/// The signatures file could not be read.
#[error("cannot read schema signatures file '{path}': {source}")]
Io {
/// Display form of the path that failed to load.
path: String,
/// Underlying I/O error.
#[source]
source: std::io::Error,
},
/// The YAML could not be deserialized, or a predicate entry was malformed
/// (no condition, or more than one condition).
#[error("schema signatures YAML parse error: {0}")]
Parse(String),
/// A `matches` predicate carried a pattern that failed to compile.
#[error("invalid regex in schema '{name}': {error}")]
InvalidRegex { name: String, error: String },
}
/// A `field` / `value` pair used by the `equals` and `matches` predicate
/// configs. Unknown keys are rejected during deserialization.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct FieldValueConfig {
/// Name of the event field to inspect.
pub field: String,
/// Expected value (`equals`) or regex pattern (`matches`).
pub value: String,
}
/// Deserialized form of one predicate entry.
///
/// Every key is optional at the serde level, but `build` accepts an entry
/// only when exactly one of them is set. Unknown keys are rejected during
/// deserialization.
#[derive(Debug, Clone, Default, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SchemaPredicateConfig {
/// Becomes `SchemaPredicate::FieldPresent`.
#[serde(default)]
pub field_present: Option<String>,
/// Becomes `SchemaPredicate::FieldAbsent`.
#[serde(default)]
pub field_absent: Option<String>,
/// Becomes `SchemaPredicate::AnyOf`.
#[serde(default)]
pub any_of: Option<Vec<String>>,
/// Becomes `SchemaPredicate::Equals`.
#[serde(default)]
pub equals: Option<FieldValueConfig>,
/// Becomes `SchemaPredicate::Matches`; `value` is compiled as a regex.
#[serde(default)]
pub matches: Option<FieldValueConfig>,
}
impl SchemaPredicateConfig {
fn build(self, schema_name: &str) -> Result<SchemaPredicate, SchemaError> {
let mut chosen: Option<SchemaPredicate> = None;
let mut set = 0u32;
if let Some(f) = self.field_present {
set += 1;
chosen = Some(SchemaPredicate::FieldPresent(f));
}
if let Some(f) = self.field_absent {
set += 1;
chosen = Some(SchemaPredicate::FieldAbsent(f));
}
if let Some(fields) = self.any_of {
set += 1;
chosen = Some(SchemaPredicate::AnyOf(fields));
}
if let Some(fv) = self.equals {
set += 1;
chosen = Some(SchemaPredicate::Equals {
field: fv.field,
value: fv.value,
});
}
if let Some(fv) = self.matches {
set += 1;
chosen = Some(SchemaPredicate::Matches {
field: fv.field,
regex: Regex::new(&fv.value).map_err(|e| SchemaError::InvalidRegex {
name: schema_name.to_string(),
error: e.to_string(),
})?,
});
}
match (set, chosen) {
(1, Some(p)) => Ok(p),
(0, _) => Err(SchemaError::Parse(format!(
"schema '{schema_name}': a predicate has no condition (expected one of \
field_present, field_absent, any_of, equals, matches)"
))),
_ => Err(SchemaError::Parse(format!(
"schema '{schema_name}': a predicate sets multiple conditions; use one per list item"
))),
}
}
}
/// Deserialized form of one user-defined schema signature.
#[derive(Debug, Clone, Deserialize)]
pub struct SchemaSignatureConfig {
/// Schema name.
pub name: String,
/// Ranking weight; falls back to `default_user_specificity()` when omitted.
#[serde(default = "default_user_specificity")]
pub specificity: u32,
/// Predicate entries, read from the `match` key; empty when omitted.
// NOTE(review): an empty list produces a signature that matches every
// event, because the match check is an `all` over the predicates. Confirm
// this is intended.
#[serde(default, rename = "match")]
pub predicates: Vec<SchemaPredicateConfig>,
}
/// Specificity given to a user-defined schema that does not set one.
fn default_user_specificity() -> u32 {
    const DEFAULT_USER_SPECIFICITY: u32 = 50;
    DEFAULT_USER_SPECIFICITY
}
/// Top-level layout of a schema signatures YAML document.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct SchemaSignaturesFile {
/// User-defined schema signatures; empty when the key is omitted.
#[serde(default)]
pub schemas: Vec<SchemaSignatureConfig>,
/// Optional routing section.
#[serde(default)]
pub routing: Option<RoutingConfig>,
}
impl SchemaSignatureConfig {
    /// Converts this config into a [`SchemaSignature`], building each
    /// predicate in order and stopping at the first one that fails.
    fn build(self) -> Result<SchemaSignature, SchemaError> {
        let Self {
            name,
            specificity,
            predicates: configs,
        } = self;

        let mut predicates = Vec::with_capacity(configs.len());
        for config in configs {
            predicates.push(config.build(&name)?);
        }

        Ok(SchemaSignature {
            name,
            predicates,
            specificity,
        })
    }
}
/// Parses a schema signatures YAML document and returns its signatures.
///
/// Any `routing` section in the document is ignored.
///
/// # Errors
///
/// Returns [`SchemaError::Parse`] when the YAML cannot be deserialized, or
/// the error of the first schema that fails to build.
pub fn parse_schema_signatures(yaml: &str) -> Result<Vec<SchemaSignature>, SchemaError> {
    let parsed: SchemaSignaturesFile = match yaml_serde::from_str(yaml) {
        Ok(file) => file,
        Err(e) => return Err(SchemaError::Parse(e.to_string())),
    };

    let mut signatures = Vec::with_capacity(parsed.schemas.len());
    for config in parsed.schemas {
        signatures.push(config.build()?);
    }
    Ok(signatures)
}
/// Reads the file at `path` and parses it with [`parse_schema_signatures`].
///
/// # Errors
///
/// Returns [`SchemaError::Io`] when the file cannot be read, otherwise any
/// error from parsing.
pub fn load_schema_signatures(path: &Path) -> Result<Vec<SchemaSignature>, SchemaError> {
    match fs::read_to_string(path) {
        Ok(content) => parse_schema_signatures(&content),
        Err(source) => Err(SchemaError::Io {
            path: path.display().to_string(),
            source,
        }),
    }
}
pub fn parse_schema_config(
yaml: &str,
) -> Result<(Vec<SchemaSignature>, Option<RoutingConfig>), SchemaError> {
let file: SchemaSignaturesFile =
yaml_serde::from_str(yaml).map_err(|e| SchemaError::Parse(e.to_string()))?;
let signatures = file
.schemas
.into_iter()
.map(|s| s.build())
.collect::<Result<Vec<_>, _>>()?;
Ok((signatures, file.routing))
}
/// Reads the file at `path` and parses it with [`parse_schema_config`].
///
/// # Errors
///
/// Returns [`SchemaError::Io`] when the file cannot be read, otherwise any
/// error from parsing.
pub fn load_schema_config(
    path: &Path,
) -> Result<(Vec<SchemaSignature>, Option<RoutingConfig>), SchemaError> {
    match fs::read_to_string(path) {
        Ok(content) => parse_schema_config(&content),
        Err(source) => Err(SchemaError::Io {
            path: path.display().to_string(),
            source,
        }),
    }
}
/// Policy for events that could not be classified into any schema.
///
/// Deserialized from snake_case names (`warn`, `drop`, `passthrough`,
/// `error`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OnUnknown {
/// Default. `RoutingPlan::decide` evaluates the default pipeline set and
/// flags the decision as unknown.
#[default]
Warn,
/// `RoutingPlan::decide` returns `RouteDecision::Drop`.
Drop,
/// Routed by `RoutingPlan::decide` exactly like `Warn`.
Passthrough,
/// `RoutingPlan::decide` returns `RouteDecision::Error`.
Error,
}
/// Binds a schema name to the list of pipelines used for its events.
#[derive(Debug, Clone, Deserialize)]
pub struct SchemaBinding {
/// Schema name the binding applies to.
pub schema: String,
/// Pipeline names for that schema; empty when omitted.
#[serde(default)]
pub pipelines: Vec<String>,
}
/// Deserialized `routing` section of the schema configuration.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct RoutingConfig {
/// Policy for unclassified events; `OnUnknown::Warn` when omitted.
#[serde(default)]
pub on_unknown: OnUnknown,
/// Per-schema pipeline bindings; empty when omitted.
#[serde(default)]
pub bindings: Vec<SchemaBinding>,
/// Pipelines used for schemas without a binding; empty when omitted.
#[serde(default)]
pub default_pipelines: Vec<String>,
}
/// Outcome of routing one event, as returned by `RoutingPlan::decide`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RouteDecision {
/// Evaluate the pipeline set at index `set` of `RoutingPlan::pipeline_sets`.
/// `unknown` is true when the event had no schema.
Evaluate { set: usize, unknown: bool },
/// Returned for an event without a schema under `OnUnknown::Drop`.
Drop,
/// Returned for an event without a schema under `OnUnknown::Error`.
Error,
}
/// Routing table built from a `RoutingConfig`.
#[derive(Debug, Clone)]
pub struct RoutingPlan {
// Distinct pipeline lists; index 0 always holds the default pipelines.
pipeline_sets: Vec<Vec<String>>,
// Maps a bound schema name to its index in `pipeline_sets`.
schema_to_set: HashMap<String, usize>,
// Policy applied when an event has no schema.
on_unknown: OnUnknown,
}
impl RoutingPlan {
    /// Builds a plan from `config`.
    ///
    /// Set 0 is always `default_pipelines`. Each binding reuses an existing
    /// set with an identical pipeline list, or appends a new one. When a
    /// schema is bound more than once, the last binding wins.
    pub fn from_config(config: &RoutingConfig) -> Self {
        let mut pipeline_sets: Vec<Vec<String>> = vec![config.default_pipelines.clone()];
        let mut schema_to_set: HashMap<String, usize> = HashMap::new();

        for binding in &config.bindings {
            let existing = pipeline_sets
                .iter()
                .position(|set| *set == binding.pipelines);
            let idx = match existing {
                Some(found) => found,
                None => {
                    pipeline_sets.push(binding.pipelines.clone());
                    pipeline_sets.len() - 1
                }
            };
            schema_to_set.insert(binding.schema.clone(), idx);
        }

        Self {
            pipeline_sets,
            schema_to_set,
            on_unknown: config.on_unknown,
        }
    }

    /// All distinct pipeline sets; index 0 is the default set.
    pub fn pipeline_sets(&self) -> &[Vec<String>] {
        self.pipeline_sets.as_slice()
    }

    /// Policy applied to events without a schema.
    pub fn on_unknown(&self) -> OnUnknown {
        self.on_unknown
    }

    /// Decides how to route an event classified as `schema`.
    ///
    /// A bound schema evaluates its own set, and an unbound schema evaluates
    /// the default set (index 0). `None` follows the `on_unknown` policy.
    pub fn decide(&self, schema: Option<&str>) -> RouteDecision {
        match schema {
            Some(name) => {
                // Unbound schemas fall back to the default set.
                let set = self.schema_to_set.get(name).copied().unwrap_or(0);
                RouteDecision::Evaluate {
                    set,
                    unknown: false,
                }
            }
            None => match self.on_unknown {
                OnUnknown::Drop => RouteDecision::Drop,
                OnUnknown::Error => RouteDecision::Error,
                OnUnknown::Warn | OnUnknown::Passthrough => RouteDecision::Evaluate {
                    set: 0,
                    unknown: true,
                },
            },
        }
    }
}
/// Number of events observed for one schema.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaCountEntry {
/// Schema name.
pub schema: String,
/// Events classified as this schema.
pub count: u64,
}
/// Snapshot of a `SchemaObserver`'s counters.
#[derive(Debug, Clone, Default)]
pub struct SchemaObservation {
/// Per-schema counts, sorted by descending count and then schema name.
pub by_schema: Vec<SchemaCountEntry>,
/// Sum of the per-schema counts (cleared by `reset`).
pub classified: u64,
/// Events that matched no schema (cleared by `reset`).
pub unknown: u64,
/// Total events observed (cleared by `reset`).
pub events_observed: u64,
/// Classified events; not cleared by `reset`.
pub lifetime_classified: u64,
/// Unclassified events; not cleared by `reset`.
pub lifetime_unknown: u64,
/// Seconds elapsed since the observer was created or last reset.
pub uptime_seconds: f64,
}
/// Classifies events and keeps per-schema and unknown counters.
///
/// Counters live behind a mutex or in atomics, so every method takes `&self`.
pub struct SchemaObserver {
// Classifier applied to every observed event.
classifier: SchemaClassifier,
// Per-schema counts since the last reset.
counts: Mutex<HashMap<String, u64>>,
// Unclassified events since the last reset.
unknown: AtomicU64,
// All events seen since the last reset.
events_observed: AtomicU64,
// Classified events; `reset` leaves this untouched.
lifetime_classified: AtomicU64,
// Unclassified events; `reset` leaves this untouched.
lifetime_unknown: AtomicU64,
// Start of the current observation window; replaced by `reset`.
start: Mutex<Instant>,
}
impl SchemaObserver {
    /// Creates an observer with all counters at zero and the observation
    /// window starting now.
    pub fn new(classifier: SchemaClassifier) -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            classifier,
            counts: Mutex::new(HashMap::new()),
            unknown: zero(),
            events_observed: zero(),
            lifetime_classified: zero(),
            lifetime_unknown: zero(),
            start: Mutex::new(Instant::now()),
        }
    }

    /// Observer backed by the built-in classifier.
    pub fn builtin() -> Self {
        Self::new(SchemaClassifier::builtin())
    }

    /// Classifies `event` and updates the matching counters.
    ///
    /// # Panics
    ///
    /// Panics if the counts mutex is poisoned.
    pub fn observe<E: Event + ?Sized>(&self, event: &E) {
        self.events_observed.fetch_add(1, Ordering::Relaxed);
        if let Some(matched) = self.classifier.classify(event) {
            self.lifetime_classified.fetch_add(1, Ordering::Relaxed);
            let mut guard = self.counts.lock().expect("schema observer mutex poisoned");
            *guard.entry(matched.name).or_insert(0) += 1;
        } else {
            self.unknown.fetch_add(1, Ordering::Relaxed);
            self.lifetime_unknown.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Returns a copy of the current counters.
    ///
    /// `by_schema` is sorted by descending count, then ascending schema name.
    ///
    /// # Panics
    ///
    /// Panics if either internal mutex is poisoned.
    pub fn snapshot(&self) -> SchemaObservation {
        // Hold the counts lock only while copying the map out.
        let (mut by_schema, classified) = {
            let guard = self.counts.lock().expect("schema observer mutex poisoned");
            let entries: Vec<SchemaCountEntry> = guard
                .iter()
                .map(|(schema, count)| SchemaCountEntry {
                    schema: schema.clone(),
                    count: *count,
                })
                .collect();
            let total: u64 = guard.values().sum();
            (entries, total)
        };
        by_schema.sort_by(|lhs, rhs| (rhs.count, &lhs.schema).cmp(&(lhs.count, &rhs.schema)));

        let unknown = self.unknown.load(Ordering::Relaxed);
        let events_observed = self.events_observed.load(Ordering::Relaxed);
        let lifetime_classified = self.lifetime_classified.load(Ordering::Relaxed);
        let lifetime_unknown = self.lifetime_unknown.load(Ordering::Relaxed);
        let uptime_seconds = self
            .start
            .lock()
            .expect("schema observer start mutex poisoned")
            .elapsed()
            .as_secs_f64();

        SchemaObservation {
            by_schema,
            classified,
            unknown,
            events_observed,
            lifetime_classified,
            lifetime_unknown,
            uptime_seconds,
        }
    }

    /// Clears the per-window counters and restarts the uptime clock.
    ///
    /// Returns `(classified, unknown)` as they stood before the reset. The
    /// lifetime counters are left untouched.
    ///
    /// # Panics
    ///
    /// Panics if either internal mutex is poisoned.
    pub fn reset(&self) -> (u64, u64) {
        let previous_classified: u64 = {
            let mut guard = self.counts.lock().expect("schema observer mutex poisoned");
            let total = guard.values().sum();
            guard.clear();
            total
        };
        let previous_unknown = self.unknown.swap(0, Ordering::Relaxed);
        self.events_observed.store(0, Ordering::Relaxed);

        let mut started = self
            .start
            .lock()
            .expect("schema observer start mutex poisoned");
        *started = Instant::now();

        (previous_classified, previous_unknown)
    }

    /// Classified events; not cleared by `reset`.
    pub fn lifetime_classified(&self) -> u64 {
        self.lifetime_classified.load(Ordering::Relaxed)
    }

    /// Unclassified events; not cleared by `reset`.
    pub fn lifetime_unknown(&self) -> u64 {
        self.lifetime_unknown.load(Ordering::Relaxed)
    }
}
// Unit tests for classification, user-signature parsing, routing, and the
// observer counters.
// NOTE(review): the YAML fixtures in the raw strings below appear without
// nested indentation in this view; confirm the literals carry the nesting
// the parser needs.
#[cfg(test)]
mod tests {
use super::*;
use crate::event::JsonEvent;
use serde_json::json;
// Classifies `value` with the built-in classifier and returns the matched
// schema name, if any.
fn classify(value: &serde_json::Value) -> Option<String> {
SchemaClassifier::builtin()
.classify(&JsonEvent::borrow(value))
.map(|m| m.name)
}
// ECS is recognized from a nested `ecs.version` marker.
#[test]
fn recognizes_ecs_by_version_marker() {
let v = json!({"ecs": {"version": "8.11.0"}, "process": {"command_line": "whoami"}});
assert_eq!(classify(&v).as_deref(), Some("ecs"));
}
// ECS is also recognized when the marker is a flattened dotted key.
#[test]
fn recognizes_ecs_with_flattened_keys() {
let v = json!({"ecs.version": "8.11.0", "process.command_line": "whoami"});
assert_eq!(classify(&v).as_deref(), Some("ecs"));
}
// OCSF needs both `class_uid` and `metadata.version`.
#[test]
fn recognizes_ocsf_by_class_and_metadata() {
let v = json!({"class_uid": 1001, "category_uid": 1, "metadata": {"version": "1.1.0"}});
assert_eq!(classify(&v).as_deref(), Some("ocsf"));
}
#[test]
fn recognizes_rendered_windows_event_log() {
let v = json!({"Event": {"System": {"EventID": 4688, "Provider": "Microsoft-Windows-Security-Auditing"}}});
assert_eq!(classify(&v).as_deref(), Some("windows_eventlog"));
}
// The three tests below cover the three alternative sysmon signatures.
#[test]
fn recognizes_sysmon_by_channel() {
let v = json!({"Channel": "Microsoft-Windows-Sysmon/Operational", "EventID": 1, "Image": "C:/cmd.exe"});
assert_eq!(classify(&v).as_deref(), Some("sysmon"));
}
#[test]
fn recognizes_sysmon_by_provider() {
let v = json!({"Provider_Name": "Microsoft-Windows-Sysmon", "EventID": 3});
assert_eq!(classify(&v).as_deref(), Some("sysmon"));
}
#[test]
fn recognizes_flat_sysmon_by_field_shape() {
let v = json!({"EventID": 1, "ProcessGuid": "{abc}", "CommandLine": "cmd /c whoami"});
assert_eq!(classify(&v).as_deref(), Some("sysmon"));
}
#[test]
fn recognizes_cef_structured_fields() {
let v = json!({"deviceVendor": "Security", "deviceProduct": "IDS", "signatureId": "100", "src": "10.0.0.1"});
assert_eq!(classify(&v).as_deref(), Some("cef"));
}
// Any event with at least one field falls back to `generic_json`.
#[test]
fn falls_back_to_generic_json_for_unrecognized_structured_events() {
let v = json!({"some_vendor_field": "x", "another": 1});
assert_eq!(classify(&v).as_deref(), Some("generic_json"));
}
// Events without fields match nothing, not even `generic_json`.
#[test]
fn fieldless_events_are_unknown() {
assert_eq!(classify(&json!({})), None);
assert_eq!(classify(&json!("just a string")), None);
}
// `classify` returns the most specific match; `classify_all` lists every
// match with the most specific first.
#[test]
fn specificity_prefers_specific_schema_over_generic() {
let v = json!({"ecs.version": "8.0.0", "vendor_blob": {"x": 1}});
let cls = SchemaClassifier::builtin();
let m = cls.classify(&JsonEvent::borrow(&v)).unwrap();
assert_eq!(m.name, "ecs");
assert_eq!(m.specificity, 100);
let all = cls.classify_all(&JsonEvent::borrow(&v));
assert_eq!(all.first().map(String::as_str), Some("ecs"));
assert!(all.iter().any(|n| n == "generic_json"));
}
#[test]
fn schema_names_lists_builtins_most_specific_first() {
let classifier = SchemaClassifier::builtin();
let names = classifier.schema_names();
assert_eq!(names.first(), Some(&"ecs"));
assert!(names.contains(&"generic_json"));
assert_eq!(names.last(), Some(&"generic_json"));
}
// A user signature parsed from YAML is merged with the built-ins; the
// `equals` predicate matches "ALERT" against "alert" (ASCII case-insensitive).
#[test]
fn parses_user_signatures_from_yaml() {
let yaml = r#"
schemas:
- name: my_vendor
specificity: 70
match:
- field_present: vendor.product
- equals:
field: event_type
value: alert
- any_of: [a, b]
"#;
let sigs = parse_schema_signatures(yaml).expect("parse");
assert_eq!(sigs.len(), 1);
assert_eq!(sigs[0].name, "my_vendor");
assert_eq!(sigs[0].specificity, 70);
assert_eq!(sigs[0].predicates.len(), 3);
let cls = SchemaClassifier::with_user_signatures(sigs);
let v = json!({"vendor": {"product": "X"}, "event_type": "ALERT", "a": 1});
assert_eq!(
cls.classify(&JsonEvent::borrow(&v))
.map(|m| m.name)
.as_deref(),
Some("my_vendor")
);
}
// An uncompilable pattern surfaces as `SchemaError::InvalidRegex`.
#[test]
fn user_signature_with_invalid_regex_is_rejected() {
let yaml = r#"
schemas:
- name: bad
match:
- matches:
field: msg
value: "([unclosed"
"#;
let err = parse_schema_signatures(yaml).unwrap_err();
assert!(matches!(err, SchemaError::InvalidRegex { .. }));
}
#[test]
fn user_regex_signature_matches_field_value() {
let yaml = r#"
schemas:
- name: cef_raw
specificity: 60
match:
- matches:
field: message
value: "^CEF:\\d"
"#;
let sigs = parse_schema_signatures(yaml).expect("parse");
let cls = SchemaClassifier::with_user_signatures(sigs);
let v = json!({"message": "CEF:0|Vendor|Product|1.0|100|Name|9|src=1.2.3.4"});
assert_eq!(
cls.classify(&JsonEvent::borrow(&v))
.map(|m| m.name)
.as_deref(),
Some("cef_raw")
);
}
// Two ECS events, one OCSF event and one fieldless event: three classified,
// one unknown.
#[test]
fn observer_counts_per_schema_and_unknown() {
let observer = SchemaObserver::builtin();
observer.observe(&JsonEvent::borrow(&json!({"ecs.version": "8.0.0"})));
observer.observe(&JsonEvent::borrow(&json!({"ecs.version": "8.1.0"})));
observer.observe(&JsonEvent::borrow(
&json!({"class_uid": 1001, "metadata": {"version": "1.1.0"}}),
));
observer.observe(&JsonEvent::borrow(&json!({})));
let snap = observer.snapshot();
assert_eq!(snap.events_observed, 4);
assert_eq!(snap.classified, 3);
assert_eq!(snap.unknown, 1);
assert_eq!(snap.by_schema[0].schema, "ecs");
assert_eq!(snap.by_schema[0].count, 2);
let ocsf = snap.by_schema.iter().find(|e| e.schema == "ocsf").unwrap();
assert_eq!(ocsf.count, 1);
}
// Bindings with identical pipeline lists share one set: default + 2 distinct.
#[test]
fn routing_plan_dedups_pipeline_sets() {
let config = RoutingConfig {
on_unknown: OnUnknown::Warn,
default_pipelines: vec![],
bindings: vec![
SchemaBinding {
schema: "ecs".to_string(),
pipelines: vec!["ecs_windows".to_string()],
},
SchemaBinding {
schema: "winlogbeat".to_string(),
pipelines: vec!["ecs_windows".to_string()],
},
SchemaBinding {
schema: "sysmon".to_string(),
pipelines: vec!["sysmon".to_string()],
},
],
};
let plan = RoutingPlan::from_config(&config);
assert_eq!(plan.pipeline_sets().len(), 3);
let ecs = plan.decide(Some("ecs"));
let win = plan.decide(Some("winlogbeat"));
assert_eq!(ecs, win);
assert!(matches!(
ecs,
RouteDecision::Evaluate { unknown: false, .. }
));
assert_ne!(plan.decide(Some("sysmon")), ecs);
}
// Bound schema -> its own set; unbound schema -> set 0; no schema -> set 0
// flagged unknown (under `Warn`).
#[test]
fn routing_decides_bound_unbound_and_unknown() {
let config = RoutingConfig {
on_unknown: OnUnknown::Warn,
default_pipelines: vec![],
bindings: vec![SchemaBinding {
schema: "ecs".to_string(),
pipelines: vec!["ecs_windows".to_string()],
}],
};
let plan = RoutingPlan::from_config(&config);
assert!(matches!(
plan.decide(Some("ecs")),
RouteDecision::Evaluate { unknown: false, .. }
));
assert_eq!(
plan.decide(Some("cef")),
RouteDecision::Evaluate {
set: 0,
unknown: false
}
);
assert_eq!(
plan.decide(None),
RouteDecision::Evaluate {
set: 0,
unknown: true
}
);
}
#[test]
fn routing_on_unknown_policies() {
let base = |policy| RoutingConfig {
on_unknown: policy,
default_pipelines: vec![],
bindings: vec![],
};
assert_eq!(
RoutingPlan::from_config(&base(OnUnknown::Drop)).decide(None),
RouteDecision::Drop
);
assert_eq!(
RoutingPlan::from_config(&base(OnUnknown::Error)).decide(None),
RouteDecision::Error
);
assert_eq!(
RoutingPlan::from_config(&base(OnUnknown::Passthrough)).decide(None),
RouteDecision::Evaluate {
set: 0,
unknown: true
}
);
}
// The `routing` section is parsed alongside the schemas.
#[test]
fn parses_routing_section_from_yaml() {
let yaml = r#"
schemas:
- name: my_vendor
match:
- field_present: vendor.id
routing:
on_unknown: drop
default_pipelines: [base]
bindings:
- schema: ecs
pipelines: [ecs_windows]
- schema: my_vendor
pipelines: [vendor_map, base]
"#;
let (sigs, routing) = parse_schema_config(yaml).expect("parse");
assert_eq!(sigs.len(), 1);
let routing = routing.expect("routing present");
assert_eq!(routing.on_unknown, OnUnknown::Drop);
assert_eq!(routing.default_pipelines, vec!["base".to_string()]);
assert_eq!(routing.bindings.len(), 2);
let plan = RoutingPlan::from_config(&routing);
assert_eq!(plan.pipeline_sets().len(), 3);
assert_eq!(plan.decide(None), RouteDecision::Drop);
}
// `reset` returns the previous window counts and clears them, but leaves
// the lifetime counters alone.
#[test]
fn observer_reset_preserves_lifetime_counters() {
let observer = SchemaObserver::builtin();
observer.observe(&JsonEvent::borrow(&json!({"ecs.version": "8.0.0"})));
observer.observe(&JsonEvent::borrow(&json!({})));
let (classified, unknown) = observer.reset();
assert_eq!(classified, 1);
assert_eq!(unknown, 1);
let snap = observer.snapshot();
assert_eq!(snap.classified, 0);
assert_eq!(snap.unknown, 0);
assert_eq!(snap.events_observed, 0);
assert_eq!(snap.lifetime_classified, 1);
assert_eq!(snap.lifetime_unknown, 1);
}
}