Skip to main content

rsigma_eval/
schema.rs

1//! Schema classification: recognize the structure of a parsed event.
2//!
3//! Real-world streams mix log schemas: one feed can carry ECS-normalized
4//! events, raw (rendered) Windows Event Log, flat Sysmon JSON, CEF, OCSF, or
5//! vendor-specific shapes, and the wire format is often still JSON while only
6//! the field names differ. This module recognizes which schema a parsed event
7//! belongs to from its *content* (marker fields and values), not from the
8//! input format, so it works regardless of how the event arrived.
9//!
10//! Classification is declarative: each [`SchemaSignature`] is a set of
11//! [`SchemaPredicate`]s that must all hold (logical AND). The
12//! [`SchemaClassifier`] returns the highest-[`specificity`](SchemaSignature::specificity)
13//! signature that matches, breaking ties by name for determinism. Returning
14//! `None` means the event matched no signature ("unknown"), which is the
15//! actionable signal for surfacing unsupported schemas.
16//!
17//! Built-in signatures cover `ecs`, `ocsf`, `windows_eventlog`, `sysmon`,
18//! `cef`, and a low-specificity `generic_json` fallback for structured events
19//! that match no specific security schema. Users extend the set with their own
20//! signatures loaded from YAML (see [`parse_schema_signatures`]).
21//!
22//! Detection-side only: this recognizes events so callers can route them to the
23//! right field-mapping pipeline. It does not collect, transport, or normalize
24//! events.
25
26use std::collections::HashMap;
27use std::fs;
28use std::path::Path;
29use std::sync::Mutex;
30use std::sync::atomic::{AtomicU64, Ordering};
31use std::time::Instant;
32
33use regex::Regex;
34use serde::Deserialize;
35
36use crate::event::Event;
37
38/// A single condition over a parsed event used to recognize a schema.
39///
40/// Field names use the same dot-notation as [`Event::get_field`], so nested
41/// shapes like `Event.System.EventID` or `ecs.version` work whether the event
42/// is nested or carries flattened dotted keys.
43#[derive(Debug, Clone)]
44pub enum SchemaPredicate {
45    /// The named field is present (any non-absent value, including null).
46    FieldPresent(String),
47    /// The named field is absent.
48    FieldAbsent(String),
49    /// At least one of the named fields is present.
50    AnyOf(Vec<String>),
51    /// The field is present and its string-coerced value equals `value`
52    /// (ASCII case-insensitive).
53    Equals { field: String, value: String },
54    /// The field is present and its string-coerced value matches `regex`.
55    Matches { field: String, regex: Regex },
56    /// The event has at least one structured field. Used by the
57    /// `generic_json` fallback to distinguish structured events from
58    /// field-less ones (raw text, empty objects), which stay "unknown".
59    HasAnyField,
60}
61
62impl SchemaPredicate {
63    fn eval<E: Event + ?Sized>(&self, event: &E) -> bool {
64        match self {
65            SchemaPredicate::FieldPresent(f) => event.get_field(f).is_some(),
66            SchemaPredicate::FieldAbsent(f) => event.get_field(f).is_none(),
67            SchemaPredicate::AnyOf(fields) => fields.iter().any(|f| event.get_field(f).is_some()),
68            SchemaPredicate::Equals { field, value } => event
69                .get_field(field)
70                .and_then(|v| v.as_str().map(|s| s.as_ref().eq_ignore_ascii_case(value)))
71                .unwrap_or(false),
72            SchemaPredicate::Matches { field, regex } => event
73                .get_field(field)
74                .and_then(|v| v.as_str().map(|s| regex.is_match(s.as_ref())))
75                .unwrap_or(false),
76            SchemaPredicate::HasAnyField => !event.field_keys().is_empty(),
77        }
78    }
79}
80
81/// A named schema recognizer: every predicate must hold for the signature to
82/// match. Higher `specificity` wins when several signatures match the same
83/// event. Multiple signatures may share a `name` (for example several distinct
84/// ways to recognize Sysmon); the classifier reports the name.
85#[derive(Debug, Clone)]
86pub struct SchemaSignature {
87    /// Schema label reported on a match (for example `ecs`, `sysmon`).
88    pub name: String,
89    /// Conditions that must all hold (logical AND). An empty predicate set
90    /// matches every event; prefer [`SchemaPredicate::HasAnyField`] for a
91    /// structured-event fallback.
92    pub predicates: Vec<SchemaPredicate>,
93    /// Tie-breaking weight; the highest-specificity matching signature wins.
94    pub specificity: u32,
95}
96
97impl SchemaSignature {
98    fn matches<E: Event + ?Sized>(&self, event: &E) -> bool {
99        self.predicates.iter().all(|p| p.eval(event))
100    }
101}
102
/// Outcome of a successful classification.
///
/// Holds the reported schema name together with the specificity of the
/// signature that produced it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaMatch {
    /// Name of the winning signature.
    pub name: String,
    /// Specificity of the winning signature.
    pub specificity: u32,
}
110
111/// Recognizes the schema of parsed events from a set of signatures.
112///
113/// Signatures are sorted once at construction (specificity descending, then
114/// name ascending) so [`classify`](Self::classify) returns the best match with
115/// a single in-order scan.
116#[derive(Debug, Clone)]
117pub struct SchemaClassifier {
118    signatures: Vec<SchemaSignature>,
119}
120
121impl SchemaClassifier {
122    /// Build a classifier from an explicit signature set.
123    pub fn new(mut signatures: Vec<SchemaSignature>) -> Self {
124        signatures.sort_by(|a, b| {
125            b.specificity
126                .cmp(&a.specificity)
127                .then_with(|| a.name.cmp(&b.name))
128        });
129        Self { signatures }
130    }
131
132    /// Build a classifier from the built-in signatures only.
133    pub fn builtin() -> Self {
134        Self::new(builtin_signatures())
135    }
136
137    /// Build a classifier from the built-ins plus user-supplied signatures.
138    /// User signatures are added to (not replacing) the built-ins; a user
139    /// signature with a higher specificity than a built-in wins on overlap.
140    pub fn with_user_signatures(user: Vec<SchemaSignature>) -> Self {
141        let mut signatures = builtin_signatures();
142        signatures.extend(user);
143        Self::new(signatures)
144    }
145
146    /// Classify an event. Returns the highest-specificity matching schema, or
147    /// `None` when the event matches no signature ("unknown").
148    pub fn classify<E: Event + ?Sized>(&self, event: &E) -> Option<SchemaMatch> {
149        self.signatures
150            .iter()
151            .find(|s| s.matches(event))
152            .map(|s| SchemaMatch {
153                name: s.name.clone(),
154                specificity: s.specificity,
155            })
156    }
157
158    /// All matching schema names for an event, most specific first. Useful for
159    /// tuning signatures (seeing what else an event could match). Deduplicated
160    /// by name while preserving order.
161    pub fn classify_all<E: Event + ?Sized>(&self, event: &E) -> Vec<String> {
162        let mut out: Vec<String> = Vec::new();
163        for sig in self.signatures.iter().filter(|s| s.matches(event)) {
164            if !out.iter().any(|n| n == &sig.name) {
165                out.push(sig.name.clone());
166            }
167        }
168        out
169    }
170
171    /// Distinct schema names this classifier can produce, most specific first.
172    pub fn schema_names(&self) -> Vec<&str> {
173        let mut out: Vec<&str> = Vec::new();
174        for sig in &self.signatures {
175            if !out.contains(&sig.name.as_str()) {
176                out.push(sig.name.as_str());
177            }
178        }
179        out
180    }
181}
182
183impl Default for SchemaClassifier {
184    fn default() -> Self {
185        Self::builtin()
186    }
187}
188
189/// The built-in schema signatures, derived from the public schema specs:
190/// Elastic Common Schema, OCSF, the Windows event XML model, Microsoft
191/// Sysmon, and the ArcSight CEF spec.
192fn builtin_signatures() -> Vec<SchemaSignature> {
193    vec![
194        // ECS (Elastic Common Schema): `ecs.version` is the canonical marker.
195        SchemaSignature {
196            name: "ecs".to_string(),
197            specificity: 100,
198            predicates: vec![SchemaPredicate::FieldPresent("ecs.version".to_string())],
199        },
200        // OCSF: class_uid plus metadata.version are mandatory discriminators.
201        SchemaSignature {
202            name: "ocsf".to_string(),
203            specificity: 95,
204            predicates: vec![
205                SchemaPredicate::FieldPresent("class_uid".to_string()),
206                SchemaPredicate::FieldPresent("metadata.version".to_string()),
207            ],
208        },
209        // Rendered Windows Event Log (EVTX decoded to JSON): Event.System.*.
210        SchemaSignature {
211            name: "windows_eventlog".to_string(),
212            specificity: 90,
213            predicates: vec![SchemaPredicate::AnyOf(vec![
214                "Event.System.EventID".to_string(),
215                "Event.System.Provider".to_string(),
216            ])],
217        },
218        // Sysmon (flat) via the operational channel marker.
219        SchemaSignature {
220            name: "sysmon".to_string(),
221            specificity: 88,
222            predicates: vec![SchemaPredicate::Equals {
223                field: "Channel".to_string(),
224                value: "Microsoft-Windows-Sysmon/Operational".to_string(),
225            }],
226        },
227        // Sysmon (flat) via the provider marker.
228        SchemaSignature {
229            name: "sysmon".to_string(),
230            specificity: 88,
231            predicates: vec![SchemaPredicate::Equals {
232                field: "Provider_Name".to_string(),
233                value: "Microsoft-Windows-Sysmon".to_string(),
234            }],
235        },
236        // Sysmon (flat) via field shape when no provider/channel tag is present.
237        SchemaSignature {
238            name: "sysmon".to_string(),
239            specificity: 80,
240            predicates: vec![
241                SchemaPredicate::FieldPresent("EventID".to_string()),
242                SchemaPredicate::FieldPresent("ProcessGuid".to_string()),
243                SchemaPredicate::AnyOf(vec!["Image".to_string(), "CommandLine".to_string()]),
244            ],
245        },
246        // CEF: structured header fields produced by the CEF parser or carried
247        // in JSON (deviceVendor / deviceProduct / signatureId).
248        SchemaSignature {
249            name: "cef".to_string(),
250            specificity: 85,
251            predicates: vec![
252                SchemaPredicate::FieldPresent("deviceVendor".to_string()),
253                SchemaPredicate::FieldPresent("deviceProduct".to_string()),
254                SchemaPredicate::FieldPresent("signatureId".to_string()),
255            ],
256        },
257        // Generic JSON: any structured event that matched no specific schema.
258        SchemaSignature {
259            name: "generic_json".to_string(),
260            specificity: 0,
261            predicates: vec![SchemaPredicate::HasAnyField],
262        },
263    ]
264}
265
/// Names of the built-in schemas, without duplicates, ordered from most to
/// least specific.
pub fn builtin_schema_names() -> Vec<&'static str> {
    const NAMES: [&str; 6] = [
        "ecs",
        "ocsf",
        "windows_eventlog",
        "sysmon",
        "cef",
        "generic_json",
    ];
    NAMES.to_vec()
}
277
278// =============================================================================
279// User-supplied signatures (YAML config)
280// =============================================================================
281
282/// Errors raised while loading user schema signatures.
283#[derive(Debug, thiserror::Error)]
284pub enum SchemaError {
285    /// The signatures file could not be read.
286    #[error("cannot read schema signatures file '{path}': {source}")]
287    Io {
288        path: String,
289        #[source]
290        source: std::io::Error,
291    },
292    /// The signatures YAML failed to parse.
293    #[error("schema signatures YAML parse error: {0}")]
294    Parse(String),
295    /// A predicate carried an invalid regular expression.
296    #[error("invalid regex in schema '{name}': {error}")]
297    InvalidRegex { name: String, error: String },
298}
299
300/// A `{ field: ..., value: ... }` pair used by the `equals` and `matches`
301/// predicate forms.
302#[derive(Debug, Clone, Deserialize)]
303#[serde(deny_unknown_fields)]
304pub struct FieldValueConfig {
305    pub field: String,
306    pub value: String,
307}
308
309/// A predicate as written in YAML: a single-key map, for example
310/// `field_present: ecs.version` or `equals: { field: type, value: alert }`.
311/// Exactly one form must be set per list entry.
312#[derive(Debug, Clone, Default, Deserialize)]
313#[serde(deny_unknown_fields)]
314pub struct SchemaPredicateConfig {
315    /// `field_present: <field>`
316    #[serde(default)]
317    pub field_present: Option<String>,
318    /// `field_absent: <field>`
319    #[serde(default)]
320    pub field_absent: Option<String>,
321    /// `any_of: [<field>, ...]`
322    #[serde(default)]
323    pub any_of: Option<Vec<String>>,
324    /// `equals: { field: <field>, value: <value> }`
325    #[serde(default)]
326    pub equals: Option<FieldValueConfig>,
327    /// `matches: { field: <field>, value: <regex> }`
328    #[serde(default)]
329    pub matches: Option<FieldValueConfig>,
330}
331
332impl SchemaPredicateConfig {
333    fn build(self, schema_name: &str) -> Result<SchemaPredicate, SchemaError> {
334        let mut chosen: Option<SchemaPredicate> = None;
335        let mut set = 0u32;
336        if let Some(f) = self.field_present {
337            set += 1;
338            chosen = Some(SchemaPredicate::FieldPresent(f));
339        }
340        if let Some(f) = self.field_absent {
341            set += 1;
342            chosen = Some(SchemaPredicate::FieldAbsent(f));
343        }
344        if let Some(fields) = self.any_of {
345            set += 1;
346            chosen = Some(SchemaPredicate::AnyOf(fields));
347        }
348        if let Some(fv) = self.equals {
349            set += 1;
350            chosen = Some(SchemaPredicate::Equals {
351                field: fv.field,
352                value: fv.value,
353            });
354        }
355        if let Some(fv) = self.matches {
356            set += 1;
357            chosen = Some(SchemaPredicate::Matches {
358                field: fv.field,
359                regex: Regex::new(&fv.value).map_err(|e| SchemaError::InvalidRegex {
360                    name: schema_name.to_string(),
361                    error: e.to_string(),
362                })?,
363            });
364        }
365        match (set, chosen) {
366            (1, Some(p)) => Ok(p),
367            (0, _) => Err(SchemaError::Parse(format!(
368                "schema '{schema_name}': a predicate has no condition (expected one of \
369                 field_present, field_absent, any_of, equals, matches)"
370            ))),
371            _ => Err(SchemaError::Parse(format!(
372                "schema '{schema_name}': a predicate sets multiple conditions; use one per list item"
373            ))),
374        }
375    }
376}
377
378/// A signature as written in YAML.
379#[derive(Debug, Clone, Deserialize)]
380pub struct SchemaSignatureConfig {
381    /// Schema label reported on a match.
382    pub name: String,
383    /// Tie-breaking weight (default 50, above `generic_json` and below the
384    /// strong built-ins by default).
385    #[serde(default = "default_user_specificity")]
386    pub specificity: u32,
387    /// Conditions that must all hold.
388    #[serde(default, rename = "match")]
389    pub predicates: Vec<SchemaPredicateConfig>,
390}
391
392fn default_user_specificity() -> u32 {
393    50
394}
395
396/// Top-level YAML document holding a `schemas:` list and an optional
397/// `routing:` section.
398#[derive(Debug, Clone, Default, Deserialize)]
399pub struct SchemaSignaturesFile {
400    #[serde(default)]
401    pub schemas: Vec<SchemaSignatureConfig>,
402    #[serde(default)]
403    pub routing: Option<RoutingConfig>,
404}
405
406impl SchemaSignatureConfig {
407    fn build(self) -> Result<SchemaSignature, SchemaError> {
408        let name = self.name;
409        let predicates = self
410            .predicates
411            .into_iter()
412            .map(|p| p.build(&name))
413            .collect::<Result<Vec<_>, _>>()?;
414        Ok(SchemaSignature {
415            name,
416            predicates,
417            specificity: self.specificity,
418        })
419    }
420}
421
422/// Parse user schema signatures from a YAML string.
423pub fn parse_schema_signatures(yaml: &str) -> Result<Vec<SchemaSignature>, SchemaError> {
424    let file: SchemaSignaturesFile =
425        yaml_serde::from_str(yaml).map_err(|e| SchemaError::Parse(e.to_string()))?;
426    file.schemas.into_iter().map(|s| s.build()).collect()
427}
428
429/// Load user schema signatures from a YAML file path.
430pub fn load_schema_signatures(path: &Path) -> Result<Vec<SchemaSignature>, SchemaError> {
431    let content = fs::read_to_string(path).map_err(|e| SchemaError::Io {
432        path: path.display().to_string(),
433        source: e,
434    })?;
435    parse_schema_signatures(&content)
436}
437
438/// Parse both the user signatures and the optional routing section from a YAML
439/// string.
440pub fn parse_schema_config(
441    yaml: &str,
442) -> Result<(Vec<SchemaSignature>, Option<RoutingConfig>), SchemaError> {
443    let file: SchemaSignaturesFile =
444        yaml_serde::from_str(yaml).map_err(|e| SchemaError::Parse(e.to_string()))?;
445    let signatures = file
446        .schemas
447        .into_iter()
448        .map(|s| s.build())
449        .collect::<Result<Vec<_>, _>>()?;
450    Ok((signatures, file.routing))
451}
452
453/// Load both the user signatures and the optional routing section from a YAML
454/// file path.
455pub fn load_schema_config(
456    path: &Path,
457) -> Result<(Vec<SchemaSignature>, Option<RoutingConfig>), SchemaError> {
458    let content = fs::read_to_string(path).map_err(|e| SchemaError::Io {
459        path: path.display().to_string(),
460        source: e,
461    })?;
462    parse_schema_config(&content)
463}
464
465// =============================================================================
466// Routing: schema -> pipeline-set bindings and the dispatch plan
467// =============================================================================
468
469/// What to do with an event whose schema matched no signature.
470#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
471#[serde(rename_all = "snake_case")]
472pub enum OnUnknown {
473    /// Evaluate against the default pipeline-set and log a warning.
474    #[default]
475    Warn,
476    /// Drop the event without evaluating.
477    Drop,
478    /// Evaluate against the default pipeline-set without logging.
479    Passthrough,
480    /// Drop the event and flag it as an error (non-zero exit / error counter).
481    Error,
482}
483
484/// A `schema -> pipelines` binding: events recognized as `schema` are
485/// evaluated against the engine built from `pipelines`.
486#[derive(Debug, Clone, Deserialize)]
487pub struct SchemaBinding {
488    pub schema: String,
489    /// Pipeline names or file paths, resolved by the caller.
490    #[serde(default)]
491    pub pipelines: Vec<String>,
492}
493
494/// The `routing:` section of a schema config file.
495#[derive(Debug, Clone, Default, Deserialize)]
496pub struct RoutingConfig {
497    #[serde(default)]
498    pub on_unknown: OnUnknown,
499    #[serde(default)]
500    pub bindings: Vec<SchemaBinding>,
501    /// Pipelines applied to known-but-unbound schemas and to the
502    /// unknown-fallback path. Empty means "rules with no pipeline".
503    #[serde(default)]
504    pub default_pipelines: Vec<String>,
505}
506
/// Routing outcome for a single event, as returned by [`RoutingPlan::decide`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RouteDecision {
    /// Evaluate with the pipeline-set at index `set`. `unknown` is `true` when
    /// the event matched no signature and fell through to the default set.
    Evaluate { set: usize, unknown: bool },
    /// Skip evaluation and discard the event (`on_unknown: drop`).
    Drop,
    /// Discard the event and flag it as an error (`on_unknown: error`).
    Error,
}
518
519/// A resolved routing plan: the deduplicated pipeline-sets to build one engine
520/// each, plus the schema-to-set mapping and the unknown-handling policy.
521///
522/// Pure data: it decides *which* pipeline-set an event routes to, leaving the
523/// engine construction and dispatch to the caller. The default set (index 0)
524/// is always present, so there is always a fallback target.
525#[derive(Debug, Clone)]
526pub struct RoutingPlan {
527    /// Deduplicated pipeline-sets. Index 0 is always the default set.
528    pipeline_sets: Vec<Vec<String>>,
529    /// Recognized schema name -> pipeline-set index.
530    schema_to_set: HashMap<String, usize>,
531    on_unknown: OnUnknown,
532}
533
534impl RoutingPlan {
535    /// Build a plan from a routing config, deduplicating identical
536    /// pipeline-sets so the caller compiles each distinct set once.
537    pub fn from_config(config: &RoutingConfig) -> Self {
538        // Index 0 is always the default set.
539        let mut pipeline_sets: Vec<Vec<String>> = vec![config.default_pipelines.clone()];
540        let mut schema_to_set: HashMap<String, usize> = HashMap::new();
541
542        for binding in &config.bindings {
543            let idx = pipeline_sets
544                .iter()
545                .position(|s| s == &binding.pipelines)
546                .unwrap_or_else(|| {
547                    pipeline_sets.push(binding.pipelines.clone());
548                    pipeline_sets.len() - 1
549                });
550            schema_to_set.insert(binding.schema.clone(), idx);
551        }
552
553        RoutingPlan {
554            pipeline_sets,
555            schema_to_set,
556            on_unknown: config.on_unknown,
557        }
558    }
559
560    /// The deduplicated pipeline-sets, in index order (set 0 is the default).
561    /// The caller builds one engine per entry.
562    pub fn pipeline_sets(&self) -> &[Vec<String>] {
563        &self.pipeline_sets
564    }
565
566    /// The configured unknown-handling policy.
567    pub fn on_unknown(&self) -> OnUnknown {
568        self.on_unknown
569    }
570
571    /// Decide how to route an event given its classified schema (or `None`
572    /// when nothing matched).
573    pub fn decide(&self, schema: Option<&str>) -> RouteDecision {
574        match schema {
575            // Recognized and bound: its own set.
576            Some(s) if self.schema_to_set.contains_key(s) => RouteDecision::Evaluate {
577                set: self.schema_to_set[s],
578                unknown: false,
579            },
580            // Recognized but unbound: the default set, not flagged unknown.
581            Some(_) => RouteDecision::Evaluate {
582                set: 0,
583                unknown: false,
584            },
585            // Unrecognized: per the unknown policy.
586            None => match self.on_unknown {
587                OnUnknown::Warn | OnUnknown::Passthrough => RouteDecision::Evaluate {
588                    set: 0,
589                    unknown: true,
590                },
591                OnUnknown::Drop => RouteDecision::Drop,
592                OnUnknown::Error => RouteDecision::Error,
593            },
594        }
595    }
596}
597
598// =============================================================================
599// SchemaObserver: opt-in per-schema counting for reporting
600// =============================================================================
601
/// A single per-schema tally, as reported by [`SchemaObserver::snapshot`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaCountEntry {
    /// The recognized schema name.
    pub schema: String,
    /// Events classified as `schema` since the last reset.
    pub count: u64,
}
610
611/// Immutable snapshot of a [`SchemaObserver`] at one moment.
612#[derive(Debug, Clone, Default)]
613pub struct SchemaObservation {
614    /// Per-schema counts, sorted by descending count then ascending name.
615    pub by_schema: Vec<SchemaCountEntry>,
616    /// Events classified into a known schema since the last reset.
617    pub classified: u64,
618    /// Events that matched no signature since the last reset.
619    pub unknown: u64,
620    /// Total events observed since the last reset (`classified + unknown`).
621    pub events_observed: u64,
622    /// Lifetime total of classified events, ignoring resets. Monotonic, so it
623    /// can drive Prometheus counters across observer resets.
624    pub lifetime_classified: u64,
625    /// Lifetime total of unknown events, ignoring resets. Monotonic.
626    pub lifetime_unknown: u64,
627    /// Seconds since the observer was created (or last reset).
628    pub uptime_seconds: f64,
629}
630
631/// Opt-in counter that classifies each observed event and tallies per-schema
632/// (and unknown) totals. Mirrors the design of [`FieldObserver`](crate::FieldObserver):
633/// shared behind an `Arc`, cheap repeated snapshots, monotonic lifetime
634/// counters for a Prometheus bridge. The schema set is small and bounded, so
635/// there is no key cap.
636pub struct SchemaObserver {
637    classifier: SchemaClassifier,
638    counts: Mutex<HashMap<String, u64>>,
639    unknown: AtomicU64,
640    events_observed: AtomicU64,
641    lifetime_classified: AtomicU64,
642    lifetime_unknown: AtomicU64,
643    start: Mutex<Instant>,
644}
645
646impl SchemaObserver {
647    /// Create an observer backed by the given classifier.
648    pub fn new(classifier: SchemaClassifier) -> Self {
649        Self {
650            classifier,
651            counts: Mutex::new(HashMap::new()),
652            unknown: AtomicU64::new(0),
653            events_observed: AtomicU64::new(0),
654            lifetime_classified: AtomicU64::new(0),
655            lifetime_unknown: AtomicU64::new(0),
656            start: Mutex::new(Instant::now()),
657        }
658    }
659
660    /// Create an observer using the built-in classifier.
661    pub fn builtin() -> Self {
662        Self::new(SchemaClassifier::builtin())
663    }
664
665    /// Classify an event and update the counters. Takes `&self` so the
666    /// observer can be shared behind an `Arc`.
667    pub fn observe<E: Event + ?Sized>(&self, event: &E) {
668        self.events_observed.fetch_add(1, Ordering::Relaxed);
669        match self.classifier.classify(event) {
670            Some(m) => {
671                self.lifetime_classified.fetch_add(1, Ordering::Relaxed);
672                let mut counts = self.counts.lock().expect("schema observer mutex poisoned");
673                *counts.entry(m.name).or_insert(0) += 1;
674            }
675            None => {
676                self.unknown.fetch_add(1, Ordering::Relaxed);
677                self.lifetime_unknown.fetch_add(1, Ordering::Relaxed);
678            }
679        }
680    }
681
682    /// Snapshot the current counts, sorted by descending count then name.
683    pub fn snapshot(&self) -> SchemaObservation {
684        let counts = self.counts.lock().expect("schema observer mutex poisoned");
685        let mut by_schema: Vec<SchemaCountEntry> = counts
686            .iter()
687            .map(|(schema, count)| SchemaCountEntry {
688                schema: schema.clone(),
689                count: *count,
690            })
691            .collect();
692        let classified: u64 = counts.values().sum();
693        drop(counts);
694        by_schema.sort_by(|a, b| b.count.cmp(&a.count).then_with(|| a.schema.cmp(&b.schema)));
695        let unknown = self.unknown.load(Ordering::Relaxed);
696        SchemaObservation {
697            by_schema,
698            classified,
699            unknown,
700            events_observed: self.events_observed.load(Ordering::Relaxed),
701            lifetime_classified: self.lifetime_classified.load(Ordering::Relaxed),
702            lifetime_unknown: self.lifetime_unknown.load(Ordering::Relaxed),
703            uptime_seconds: self
704                .start
705                .lock()
706                .expect("schema observer start mutex poisoned")
707                .elapsed()
708                .as_secs_f64(),
709        }
710    }
711
712    /// Reset the since-last-reset counters (lifetime totals are preserved).
713    /// Returns the previous `(classified, unknown)` pair.
714    pub fn reset(&self) -> (u64, u64) {
715        let mut counts = self.counts.lock().expect("schema observer mutex poisoned");
716        let previous_classified: u64 = counts.values().sum();
717        counts.clear();
718        drop(counts);
719        let previous_unknown = self.unknown.swap(0, Ordering::Relaxed);
720        self.events_observed.store(0, Ordering::Relaxed);
721        *self
722            .start
723            .lock()
724            .expect("schema observer start mutex poisoned") = Instant::now();
725        (previous_classified, previous_unknown)
726    }
727
728    /// Lifetime classified total, ignoring resets. Monotonic.
729    pub fn lifetime_classified(&self) -> u64 {
730        self.lifetime_classified.load(Ordering::Relaxed)
731    }
732
733    /// Lifetime unknown total, ignoring resets. Monotonic.
734    pub fn lifetime_unknown(&self) -> u64 {
735        self.lifetime_unknown.load(Ordering::Relaxed)
736    }
737}
738
#[cfg(test)]
mod tests {
    // Coverage: built-in schema recognition, user-defined YAML signatures,
    // the per-schema observer counters, and schema-to-pipeline routing.
    use super::*;
    use crate::event::JsonEvent;
    use serde_json::json;

    /// Runs `classifier` over `value` and returns the winning schema name, if any.
    fn schema_of(classifier: &SchemaClassifier, value: &serde_json::Value) -> Option<String> {
        classifier
            .classify(&JsonEvent::borrow(value))
            .map(|hit| hit.name)
    }

    /// Classifies `value` with a freshly built set of built-in signatures.
    fn builtin_schema(value: &serde_json::Value) -> Option<String> {
        schema_of(&SchemaClassifier::builtin(), value)
    }

    /// Asserts that the built-in signatures label `value` as `expected`.
    #[track_caller]
    fn assert_builtin(value: serde_json::Value, expected: &str) {
        assert_eq!(builtin_schema(&value).as_deref(), Some(expected));
    }

    /// Builds a routing binding from borrowed schema and pipeline names.
    fn binding(schema: &str, pipelines: &[&str]) -> SchemaBinding {
        SchemaBinding {
            schema: String::from(schema),
            pipelines: pipelines.iter().copied().map(String::from).collect(),
        }
    }

    #[test]
    fn recognizes_ecs_by_version_marker() {
        assert_builtin(
            json!({"ecs": {"version": "8.11.0"}, "process": {"command_line": "whoami"}}),
            "ecs",
        );
    }

    #[test]
    fn recognizes_ecs_with_flattened_keys() {
        assert_builtin(
            json!({"ecs.version": "8.11.0", "process.command_line": "whoami"}),
            "ecs",
        );
    }

    #[test]
    fn recognizes_ocsf_by_class_and_metadata() {
        assert_builtin(
            json!({"class_uid": 1001, "category_uid": 1, "metadata": {"version": "1.1.0"}}),
            "ocsf",
        );
    }

    #[test]
    fn recognizes_rendered_windows_event_log() {
        assert_builtin(
            json!({
                "Event": {
                    "System": {"EventID": 4688, "Provider": "Microsoft-Windows-Security-Auditing"}
                }
            }),
            "windows_eventlog",
        );
    }

    #[test]
    fn recognizes_sysmon_by_channel() {
        assert_builtin(
            json!({
                "Channel": "Microsoft-Windows-Sysmon/Operational",
                "EventID": 1,
                "Image": "C:/cmd.exe"
            }),
            "sysmon",
        );
    }

    #[test]
    fn recognizes_sysmon_by_provider() {
        assert_builtin(
            json!({"Provider_Name": "Microsoft-Windows-Sysmon", "EventID": 3}),
            "sysmon",
        );
    }

    #[test]
    fn recognizes_flat_sysmon_by_field_shape() {
        assert_builtin(
            json!({"EventID": 1, "ProcessGuid": "{abc}", "CommandLine": "cmd /c whoami"}),
            "sysmon",
        );
    }

    #[test]
    fn recognizes_cef_structured_fields() {
        assert_builtin(
            json!({
                "deviceVendor": "Security",
                "deviceProduct": "IDS",
                "signatureId": "100",
                "src": "10.0.0.1"
            }),
            "cef",
        );
    }

    #[test]
    fn falls_back_to_generic_json_for_unrecognized_structured_events() {
        assert_builtin(json!({"some_vendor_field": "x", "another": 1}), "generic_json");
    }

    #[test]
    fn fieldless_events_are_unknown() {
        // Neither an empty object nor a bare JSON string exposes a named
        // field, so nothing matches -- not even the generic_json fallback.
        for value in [json!({}), json!("just a string")] {
            assert_eq!(builtin_schema(&value), None);
        }
    }

    #[test]
    fn specificity_prefers_specific_schema_over_generic() {
        // An ECS marker plus an arbitrary vendor blob: both `ecs` and the
        // `generic_json` fallback apply, and the more specific one must win.
        let raw = json!({"ecs.version": "8.0.0", "vendor_blob": {"x": 1}});
        let event = JsonEvent::borrow(&raw);
        let classifier = SchemaClassifier::builtin();

        let winner = classifier.classify(&event).unwrap();
        assert_eq!(winner.name, "ecs");
        assert_eq!(winner.specificity, 100);

        // The fallback still shows up among the candidates, just ranked lower.
        let candidates = classifier.classify_all(&event);
        assert_eq!(candidates.first().map(|top| top.as_str()), Some("ecs"));
        assert!(candidates.iter().any(|name| name == "generic_json"));
    }

    #[test]
    fn schema_names_lists_builtins_most_specific_first() {
        let classifier = SchemaClassifier::builtin();
        let ordered = classifier.schema_names();
        assert_eq!(ordered.first().copied(), Some("ecs"));
        assert!(ordered.contains(&"generic_json"));
        // The fallback has the lowest specificity, so it is listed last.
        assert_eq!(ordered.last().copied(), Some("generic_json"));
    }

    #[test]
    fn parses_user_signatures_from_yaml() {
        const YAML: &str = r#"
schemas:
  - name: my_vendor
    specificity: 70
    match:
      - field_present: vendor.product
      - equals:
          field: event_type
          value: alert
      - any_of: [a, b]
"#;
        let signatures = parse_schema_signatures(YAML).expect("parse");
        assert_eq!(signatures.len(), 1);
        let sig = &signatures[0];
        assert_eq!(sig.name, "my_vendor");
        assert_eq!(sig.specificity, 70);
        assert_eq!(sig.predicates.len(), 3);

        // The event spells "ALERT" while the signature says "alert"; the
        // match is still expected to succeed.
        let classifier = SchemaClassifier::with_user_signatures(signatures);
        let event = json!({"vendor": {"product": "X"}, "event_type": "ALERT", "a": 1});
        assert_eq!(schema_of(&classifier, &event).as_deref(), Some("my_vendor"));
    }

    #[test]
    fn user_signature_with_invalid_regex_is_rejected() {
        let yaml = r#"
schemas:
  - name: bad
    match:
      - matches:
          field: msg
          value: "([unclosed"
"#;
        let parsed = parse_schema_signatures(yaml);
        assert!(matches!(parsed, Err(SchemaError::InvalidRegex { .. })));
    }

    #[test]
    fn user_regex_signature_matches_field_value() {
        let yaml = r#"
schemas:
  - name: cef_raw
    specificity: 60
    match:
      - matches:
          field: message
          value: "^CEF:\\d"
"#;
        let classifier =
            SchemaClassifier::with_user_signatures(parse_schema_signatures(yaml).expect("parse"));
        let line = json!({"message": "CEF:0|Vendor|Product|1.0|100|Name|9|src=1.2.3.4"});
        assert_eq!(schema_of(&classifier, &line).as_deref(), Some("cef_raw"));
    }

    #[test]
    fn observer_counts_per_schema_and_unknown() {
        let observer = SchemaObserver::builtin();
        let stream = [
            json!({"ecs.version": "8.0.0"}),
            json!({"ecs.version": "8.1.0"}),
            json!({"class_uid": 1001, "metadata": {"version": "1.1.0"}}),
            json!({}),
        ];
        for raw in &stream {
            observer.observe(&JsonEvent::borrow(raw));
        }

        let snap = observer.snapshot();
        assert_eq!(
            (snap.events_observed, snap.classified, snap.unknown),
            (4, 3, 1)
        );
        // Entries are ordered by descending count, so the two ECS events lead.
        let leader = &snap.by_schema[0];
        assert_eq!(leader.schema, "ecs");
        assert_eq!(leader.count, 2);
        let ocsf = snap
            .by_schema
            .iter()
            .find(|entry| entry.schema == "ocsf")
            .unwrap();
        assert_eq!(ocsf.count, 1);
    }

    #[test]
    fn routing_plan_dedups_pipeline_sets() {
        let config = RoutingConfig {
            on_unknown: OnUnknown::Warn,
            default_pipelines: Vec::new(),
            bindings: vec![
                binding("ecs", &["ecs_windows"]),
                binding("winlogbeat", &["ecs_windows"]),
                binding("sysmon", &["sysmon"]),
            ],
        };
        let plan = RoutingPlan::from_config(&config);
        // The (empty) default set at index 0, one shared `ecs_windows` set,
        // and one `sysmon` set: three distinct sets in total.
        assert_eq!(plan.pipeline_sets().len(), 3);

        // `ecs` and `winlogbeat` name the same pipelines, so they share a set.
        let ecs_route = plan.decide(Some("ecs"));
        assert_eq!(ecs_route, plan.decide(Some("winlogbeat")));
        assert!(matches!(
            ecs_route,
            RouteDecision::Evaluate { unknown: false, .. }
        ));
        // `sysmon` lands in a different set.
        assert_ne!(plan.decide(Some("sysmon")), ecs_route);
    }

    #[test]
    fn routing_decides_bound_unbound_and_unknown() {
        let config = RoutingConfig {
            on_unknown: OnUnknown::Warn,
            default_pipelines: Vec::new(),
            bindings: vec![binding("ecs", &["ecs_windows"])],
        };
        let plan = RoutingPlan::from_config(&config);
        let default_set = |unknown| RouteDecision::Evaluate { set: 0, unknown };

        // Bound schema: routed to its own set and not flagged.
        assert!(matches!(
            plan.decide(Some("ecs")),
            RouteDecision::Evaluate { unknown: false, .. }
        ));
        // Recognized but unbound: default set, still not flagged.
        assert_eq!(plan.decide(Some("cef")), default_set(false));
        // Unrecognized under `Warn`: default set, flagged as unknown.
        assert_eq!(plan.decide(None), default_set(true));
    }

    #[test]
    fn routing_on_unknown_policies() {
        // Each `on_unknown` policy and the decision it must produce for an
        // unrecognized event when no bindings or default pipelines exist.
        for (policy, expected) in [
            (OnUnknown::Drop, RouteDecision::Drop),
            (OnUnknown::Error, RouteDecision::Error),
            (
                OnUnknown::Passthrough,
                RouteDecision::Evaluate {
                    set: 0,
                    unknown: true,
                },
            ),
        ] {
            let config = RoutingConfig {
                on_unknown: policy,
                default_pipelines: Vec::new(),
                bindings: Vec::new(),
            };
            assert_eq!(RoutingPlan::from_config(&config).decide(None), expected);
        }
    }

    #[test]
    fn parses_routing_section_from_yaml() {
        let yaml = r#"
schemas:
  - name: my_vendor
    match:
      - field_present: vendor.id
routing:
  on_unknown: drop
  default_pipelines: [base]
  bindings:
    - schema: ecs
      pipelines: [ecs_windows]
    - schema: my_vendor
      pipelines: [vendor_map, base]
"#;
        let (signatures, routing) = parse_schema_config(yaml).expect("parse");
        assert_eq!(signatures.len(), 1);

        let routing = routing.expect("routing present");
        assert_eq!(routing.on_unknown, OnUnknown::Drop);
        assert_eq!(routing.default_pipelines, vec![String::from("base")]);
        assert_eq!(routing.bindings.len(), 2);

        // default [base], ecs [ecs_windows], my_vendor [vendor_map, base]:
        // three distinct pipeline sets.
        let plan = RoutingPlan::from_config(&routing);
        assert_eq!(plan.pipeline_sets().len(), 3);
        assert_eq!(plan.decide(None), RouteDecision::Drop);
    }

    #[test]
    fn observer_reset_preserves_lifetime_counters() {
        let observer = SchemaObserver::builtin();
        for raw in [json!({"ecs.version": "8.0.0"}), json!({})] {
            observer.observe(&JsonEvent::borrow(&raw));
        }
        // reset() hands back the (classified, unknown) counts it cleared.
        assert_eq!(observer.reset(), (1, 1));

        let snap = observer.snapshot();
        assert_eq!(
            (snap.classified, snap.unknown, snap.events_observed),
            (0, 0, 0)
        );
        // Lifetime totals survive the reset for the Prometheus bridge.
        assert_eq!((snap.lifetime_classified, snap.lifetime_unknown), (1, 1));
    }
}