Skip to main content

obs_core/registry/
mod.rs

1//! Schema registry — `EventSchemaErased` object-safe trait, the
2//! `linkme`-collected `EVENT_SCHEMAS` distributed slice, the runtime
3//! `SchemaRegistry`, and the `ScrubbedEnvelope` worker→sink handoff.
4//!
5//! Spec 14.
6
7mod arrow;
8mod callsite_registry;
9mod erased;
10mod payload_decode;
11mod scrubbed;
12mod scrubber;
13
14use std::{collections::HashMap, sync::Arc};
15
16use linkme::distributed_slice;
17use obs_proto::obs::v1::ObsEnvelope;
18pub use scrubber::scrub_payload;
19
20pub use self::{
21    arrow::{ArrowEventSchema, ArrowField, ArrowLeafType, ArrowSchemaModel, ENVELOPE_COLUMNS},
22    callsite_registry::{
23        CallsiteRecord, CallsiteSource, ObsCallsiteRegistry, callsite_id, perturb_to_nonzero,
24    },
25    erased::{
26        ArrowStructBuilder, DecodeError, EventSchemaErased, OtelAttributeView, OtlpValue,
27        ScrubError, Sealed,
28    },
29    scrubbed::ScrubbedEnvelope,
30};
31
32/// Decode a buffa payload into JSON using runtime field metadata.
33///
34/// This is the dynamic-schema sibling of
35/// [`EventSchemaErased::render_json`], used by tools such as `obs
36/// decode --schemas` that load descriptors at runtime instead of
37/// linking generated Rust schema objects.
38pub fn render_payload_json(
39    payload: &[u8],
40    fields: &[crate::envelope::FieldMeta],
41    out: &mut serde_json::Map<String, serde_json::Value>,
42) -> Result<(), DecodeError> {
43    payload_decode::render_json_default(payload, fields, out)
44}
45
/// The link-time distributed slice every `EventSchema` codegen
/// registers into. Walked once at observer init to build the runtime
/// `SchemaRegistry`. See spec 14 § 3.
///
/// Each element is a `&'static dyn EventSchemaErased`. Element order is
/// whatever order `linkme` / the linker produces for a given build.
/// `SchemaRegistry::from_link_section` relies on that order only to pick
/// which schema keeps a `schema_hash` slot when two schemas collide.
///
/// **Cross-crate registration footgun**: cargo will not link an rlib
/// the binary doesn't reference. A schema-only crate must be
/// referenced from the binary (`use the_crate as _;`); see
/// `docs/research/spike-linkme.md`.
#[distributed_slice]
pub static EVENT_SCHEMAS: [&'static dyn EventSchemaErased] = [..];
56
57/// Runtime registry: by-name and by-hash lookup populated from the
58/// `linkme` distributed slice at observer init. Owned by
59/// `StandardObserver`; sinks receive `Arc<SchemaRegistry>` at
60/// construction.
61#[derive(Clone)]
62pub struct SchemaRegistry {
63    by_name: HashMap<&'static str, &'static dyn EventSchemaErased>,
64    by_hash: HashMap<u64, &'static dyn EventSchemaErased>,
65    arrow: Arc<ArrowSchemaModel>,
66}
67
68impl std::fmt::Debug for SchemaRegistry {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        let mut names: Vec<_> = self.by_name.keys().copied().collect();
71        names.sort_unstable();
72        f.debug_struct("SchemaRegistry")
73            .field("len", &self.by_name.len())
74            .field("names", &names)
75            .finish()
76    }
77}
78
79impl SchemaRegistry {
80    /// Walk `EVENT_SCHEMAS` and assemble the runtime registry. Called
81    /// once at `StandardObserver::build()`. Spec 14 § 4.
82    ///
83    /// Detects `schema_hash` collisions (two distinct events that
84    /// happen to share the same first-8-byte BLAKE3 prefix) and emits
85    /// `obs.runtime.v1.ObsCallsiteHashCollision` once per collision.
86    /// Spec 14 § 8 row 2 / spec 31 § 10 / spec 93 P2-9.
87    #[must_use]
88    pub fn from_link_section() -> Self {
89        let mut by_name = HashMap::with_capacity(EVENT_SCHEMAS.len());
90        let mut by_hash: HashMap<u64, &'static dyn EventSchemaErased> =
91            HashMap::with_capacity(EVENT_SCHEMAS.len());
92        for &schema in EVENT_SCHEMAS {
93            by_name.insert(schema.full_name(), schema);
94            if let Some(existing) = by_hash.get(&schema.schema_hash())
95                && existing.full_name() != schema.full_name()
96            {
97                crate::self_events_public::emit_callsite_hash_collision(
98                    schema.schema_hash(),
99                    existing.full_name(),
100                    schema.full_name(),
101                );
102                // Keep the first-registered entry (deterministic w.r.t.
103                // linkme order on a given build).
104                continue;
105            }
106            by_hash.insert(schema.schema_hash(), schema);
107        }
108        let arrow = Arc::new(ArrowSchemaModel::from_schemas(
109            EVENT_SCHEMAS
110                .iter()
111                .copied()
112                .map(|s| s as &dyn EventSchemaErased),
113        ));
114        Self {
115            by_name,
116            by_hash,
117            arrow,
118        }
119    }
120
121    /// Empty registry. Useful for tests that don't care about decoding.
122    #[must_use]
123    pub fn empty() -> Self {
124        Self {
125            by_name: HashMap::new(),
126            by_hash: HashMap::new(),
127            arrow: Arc::new(ArrowSchemaModel::default()),
128        }
129    }
130
131    /// The unified Arrow schema model assembled at registry init.
132    /// Used by `ParquetSink::from_registry`, `ClickHouseSink` DDL emit,
133    /// and the CLI's `obs migrate {parquet,clickhouse}` paths.
134    /// Spec 14 § 4 KD5.
135    #[must_use]
136    pub fn arrow_schema(&self) -> Arc<ArrowSchemaModel> {
137        Arc::clone(&self.arrow)
138    }
139
140    /// Hot-path lookup: try `schema_hash` first (8-byte u64), then
141    /// fall back to `full_name` for foreign-producer interop.
142    /// Spec 14 § 4.1.
143    #[must_use]
144    pub fn lookup(&self, env: &ObsEnvelope) -> Option<&'static dyn EventSchemaErased> {
145        self.by_hash
146            .get(&env.schema_hash)
147            .copied()
148            .or_else(|| self.by_name.get(env.full_name.as_str()).copied())
149    }
150
151    /// Lookup by `full_name` only (no schema_hash dispatch). Used by
152    /// the per-event Struct dispatch in `obs-parquet`'s record-batch
153    /// builder: callers walk a registry-backed column array and need
154    /// the raw schema for the matching event without an envelope to
155    /// hand. Spec 94 § 2.8.
156    #[must_use]
157    pub fn lookup_by_full_name(&self, full_name: &str) -> Option<&'static dyn EventSchemaErased> {
158        self.by_name.get(full_name).copied()
159    }
160
161    /// Number of registered schemas.
162    #[must_use]
163    pub fn len(&self) -> usize {
164        self.by_name.len()
165    }
166
167    /// True if no schemas are registered.
168    #[must_use]
169    pub fn is_empty(&self) -> bool {
170        self.by_name.is_empty()
171    }
172
173    /// Iterate all registered schemas (used by `obs schema show`,
174    /// `obs migrate`, the bridge pre-warm path).
175    pub fn iter(&self) -> impl Iterator<Item = &'static dyn EventSchemaErased> + '_ {
176        self.by_name.values().copied()
177    }
178}
179
180impl Default for SchemaRegistry {
181    fn default() -> Self {
182        Self::from_link_section()
183    }
184}
185
186/// Convenience: shared `Arc<SchemaRegistry>` for sink construction.
187pub type SharedRegistry = Arc<SchemaRegistry>;
188
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_should_build_empty_when_no_schemas_registered() {
        let r = SchemaRegistry::empty();
        assert!(r.is_empty());
        assert_eq!(r.len(), 0);
        assert!(r.iter().next().is_none());
        assert!(r.lookup_by_full_name("obs.test.v1.Missing").is_none());
        assert!(format!("{r:?}").starts_with("SchemaRegistry"));
    }

    #[test]
    fn test_should_walk_link_section() {
        // The link section may be empty in tests until the test binary
        // pulls in obs-proto's built-ins via `use obs_proto as _;`, so
        // only assert invariants that hold for any slice contents.
        let r = SchemaRegistry::from_link_section();
        // Duplicate names collapse, so the registry can only shrink.
        assert!(r.len() <= EVENT_SCHEMAS.len());
        assert_eq!(r.iter().count(), r.len());
        for schema in r.iter() {
            let name = schema.full_name();
            let found = r
                .lookup_by_full_name(name)
                .expect("every iterated schema must be findable by its full_name");
            assert_eq!(found.full_name(), name);
        }
    }
}