Skip to main content

obs_core/registry/
mod.rs

1//! Schema registry — `EventSchemaErased` object-safe trait, the
2//! `linkme`-collected `EVENT_SCHEMAS` distributed slice, the runtime
3//! `SchemaRegistry`, and the `ScrubbedEnvelope` worker→sink handoff.
4//!
5//! Spec 14.
6
7mod arrow;
8mod callsite_registry;
9mod erased;
10mod payload_decode;
11mod scrubbed;
12mod scrubber;
13
14use std::{collections::HashMap, sync::Arc};
15
16use linkme::distributed_slice;
17use obs_proto::obs::v1::ObsEnvelope;
18pub use scrubber::scrub_payload;
19
20pub use self::{
21    arrow::{ArrowEventSchema, ArrowField, ArrowLeafType, ArrowSchemaModel, ENVELOPE_COLUMNS},
22    callsite_registry::{
23        CallsiteRecord, CallsiteSource, ObsCallsiteRegistry, callsite_id, perturb_to_nonzero,
24    },
25    erased::{
26        ArrowStructBuilder, DecodeError, EventSchemaErased, OtelAttributeView, OtlpValue,
27        ScrubError, Sealed,
28    },
29    scrubbed::ScrubbedEnvelope,
30};
31
/// The link-time distributed slice every `EventSchema` codegen
/// registers into. Walked once at observer init to build the runtime
/// `SchemaRegistry`. See spec 14 § 3.
///
/// Each element is a `&'static dyn EventSchemaErased` trait object.
/// `SchemaRegistry::from_link_section` reads each entry's `full_name()`
/// and `schema_hash()` to build its by-name and by-hash indexes, and
/// feeds the whole slice to `ArrowSchemaModel::from_schemas`.
///
/// **Cross-crate registration footgun**: cargo will not link an rlib
/// the binary doesn't reference. A schema-only crate must be
/// referenced from the binary (`use the_crate as _;`); see
/// `docs/research/spike-linkme.md`.
#[distributed_slice]
pub static EVENT_SCHEMAS: [&'static dyn EventSchemaErased] = [..];
42
43/// Runtime registry: by-name and by-hash lookup populated from the
44/// `linkme` distributed slice at observer init. Owned by
45/// `StandardObserver`; sinks receive `Arc<SchemaRegistry>` at
46/// construction.
47#[derive(Clone)]
48pub struct SchemaRegistry {
49    by_name: HashMap<&'static str, &'static dyn EventSchemaErased>,
50    by_hash: HashMap<u64, &'static dyn EventSchemaErased>,
51    arrow: Arc<ArrowSchemaModel>,
52}
53
54impl std::fmt::Debug for SchemaRegistry {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        let mut names: Vec<_> = self.by_name.keys().copied().collect();
57        names.sort_unstable();
58        f.debug_struct("SchemaRegistry")
59            .field("len", &self.by_name.len())
60            .field("names", &names)
61            .finish()
62    }
63}
64
65impl SchemaRegistry {
66    /// Walk `EVENT_SCHEMAS` and assemble the runtime registry. Called
67    /// once at `StandardObserver::build()`. Spec 14 § 4.
68    ///
69    /// Detects `schema_hash` collisions (two distinct events that
70    /// happen to share the same first-8-byte BLAKE3 prefix) and emits
71    /// `obs.runtime.v1.ObsCallsiteHashCollision` once per collision.
72    /// Spec 14 § 8 row 2 / spec 31 § 10 / spec 93 P2-9.
73    #[must_use]
74    pub fn from_link_section() -> Self {
75        let mut by_name = HashMap::with_capacity(EVENT_SCHEMAS.len());
76        let mut by_hash: HashMap<u64, &'static dyn EventSchemaErased> =
77            HashMap::with_capacity(EVENT_SCHEMAS.len());
78        for &schema in EVENT_SCHEMAS {
79            by_name.insert(schema.full_name(), schema);
80            if let Some(existing) = by_hash.get(&schema.schema_hash())
81                && existing.full_name() != schema.full_name()
82            {
83                crate::self_events_public::emit_callsite_hash_collision(
84                    schema.schema_hash(),
85                    existing.full_name(),
86                    schema.full_name(),
87                );
88                // Keep the first-registered entry (deterministic w.r.t.
89                // linkme order on a given build).
90                continue;
91            }
92            by_hash.insert(schema.schema_hash(), schema);
93        }
94        let arrow = Arc::new(ArrowSchemaModel::from_schemas(
95            EVENT_SCHEMAS
96                .iter()
97                .copied()
98                .map(|s| s as &dyn EventSchemaErased),
99        ));
100        Self {
101            by_name,
102            by_hash,
103            arrow,
104        }
105    }
106
107    /// Empty registry. Useful for tests that don't care about decoding.
108    #[must_use]
109    pub fn empty() -> Self {
110        Self {
111            by_name: HashMap::new(),
112            by_hash: HashMap::new(),
113            arrow: Arc::new(ArrowSchemaModel::default()),
114        }
115    }
116
117    /// The unified Arrow schema model assembled at registry init.
118    /// Used by `ParquetSink::from_registry`, `ClickHouseSink` DDL emit,
119    /// and the CLI's `obs migrate {parquet,clickhouse}` paths.
120    /// Spec 14 § 4 KD5.
121    #[must_use]
122    pub fn arrow_schema(&self) -> Arc<ArrowSchemaModel> {
123        Arc::clone(&self.arrow)
124    }
125
126    /// Hot-path lookup: try `schema_hash` first (8-byte u64), then
127    /// fall back to `full_name` for foreign-producer interop.
128    /// Spec 14 § 4.1.
129    #[must_use]
130    pub fn lookup(&self, env: &ObsEnvelope) -> Option<&'static dyn EventSchemaErased> {
131        self.by_hash
132            .get(&env.schema_hash)
133            .copied()
134            .or_else(|| self.by_name.get(env.full_name.as_str()).copied())
135    }
136
137    /// Lookup by `full_name` only (no schema_hash dispatch). Used by
138    /// the per-event Struct dispatch in `obs-parquet`'s record-batch
139    /// builder: callers walk a registry-backed column array and need
140    /// the raw schema for the matching event without an envelope to
141    /// hand. Spec 94 § 2.8.
142    #[must_use]
143    pub fn lookup_by_full_name(&self, full_name: &str) -> Option<&'static dyn EventSchemaErased> {
144        self.by_name.get(full_name).copied()
145    }
146
147    /// Number of registered schemas.
148    #[must_use]
149    pub fn len(&self) -> usize {
150        self.by_name.len()
151    }
152
153    /// True if no schemas are registered.
154    #[must_use]
155    pub fn is_empty(&self) -> bool {
156        self.by_name.is_empty()
157    }
158
159    /// Iterate all registered schemas (used by `obs schema show`,
160    /// `obs migrate`, the bridge pre-warm path).
161    pub fn iter(&self) -> impl Iterator<Item = &'static dyn EventSchemaErased> + '_ {
162        self.by_name.values().copied()
163    }
164}
165
166impl Default for SchemaRegistry {
167    fn default() -> Self {
168        Self::from_link_section()
169    }
170}
171
172/// Convenience: shared `Arc<SchemaRegistry>` for sink construction.
173pub type SharedRegistry = Arc<SchemaRegistry>;
174
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_should_build_empty_when_no_schemas_registered() {
        let r = SchemaRegistry::empty();
        assert!(r.is_empty());
        assert_eq!(r.iter().count(), 0);
        assert!(r.lookup_by_full_name("obs.test.v1.DoesNotExist").is_none());
    }

    #[test]
    fn test_should_walk_link_section() {
        // The link section may be empty in tests until the test binary
        // pulls in obs-proto's built-ins via `use obs_proto as _;`, so
        // only assert invariants that hold for any slice contents.
        let r = SchemaRegistry::from_link_section();
        assert_eq!(r.iter().count(), r.len());
        for schema in r.iter() {
            let by_name = r
                .lookup_by_full_name(schema.full_name())
                .expect("every iterated schema must resolve by its own full_name");
            assert_eq!(by_name.full_name(), schema.full_name());

            // An envelope carrying both identifiers must dispatch to the
            // schema it names.
            let env = ObsEnvelope {
                schema_hash: schema.schema_hash(),
                full_name: schema.full_name().to_string(),
                ..Default::default()
            };
            let via_env = r
                .lookup(&env)
                .expect("every iterated schema must resolve via envelope lookup");
            assert_eq!(via_env.full_name(), schema.full_name());
        }
    }

    #[test]
    fn test_should_share_one_arrow_model() {
        let r = SchemaRegistry::empty();
        assert!(Arc::ptr_eq(&r.arrow_schema(), &r.arrow_schema()));
    }
}