1mod arrow;
8mod callsite_registry;
9mod erased;
10mod payload_decode;
11mod scrubbed;
12mod scrubber;
13
14use std::{collections::HashMap, sync::Arc};
15
16use linkme::distributed_slice;
17use obs_proto::obs::v1::ObsEnvelope;
18pub use scrubber::scrub_payload;
19
20pub use self::{
21 arrow::{ArrowEventSchema, ArrowField, ArrowLeafType, ArrowSchemaModel, ENVELOPE_COLUMNS},
22 callsite_registry::{
23 CallsiteRecord, CallsiteSource, ObsCallsiteRegistry, callsite_id, perturb_to_nonzero,
24 },
25 erased::{
26 ArrowStructBuilder, DecodeError, EventSchemaErased, OtelAttributeView, OtlpValue,
27 ScrubError, Sealed,
28 },
29 scrubbed::ScrubbedEnvelope,
30};
31
32pub fn render_payload_json(
39 payload: &[u8],
40 fields: &[crate::envelope::FieldMeta],
41 out: &mut serde_json::Map<String, serde_json::Value>,
42) -> Result<(), DecodeError> {
43 payload_decode::render_json_default(payload, fields, out)
44}
45
46#[distributed_slice]
55pub static EVENT_SCHEMAS: [&'static dyn EventSchemaErased] = [..];
56
57#[derive(Clone)]
62pub struct SchemaRegistry {
63 by_name: HashMap<&'static str, &'static dyn EventSchemaErased>,
64 by_hash: HashMap<u64, &'static dyn EventSchemaErased>,
65 arrow: Arc<ArrowSchemaModel>,
66}
67
68impl std::fmt::Debug for SchemaRegistry {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 let mut names: Vec<_> = self.by_name.keys().copied().collect();
71 names.sort_unstable();
72 f.debug_struct("SchemaRegistry")
73 .field("len", &self.by_name.len())
74 .field("names", &names)
75 .finish()
76 }
77}
78
79impl SchemaRegistry {
80 #[must_use]
88 pub fn from_link_section() -> Self {
89 let mut by_name = HashMap::with_capacity(EVENT_SCHEMAS.len());
90 let mut by_hash: HashMap<u64, &'static dyn EventSchemaErased> =
91 HashMap::with_capacity(EVENT_SCHEMAS.len());
92 for &schema in EVENT_SCHEMAS {
93 by_name.insert(schema.full_name(), schema);
94 if let Some(existing) = by_hash.get(&schema.schema_hash())
95 && existing.full_name() != schema.full_name()
96 {
97 crate::self_events_public::emit_callsite_hash_collision(
98 schema.schema_hash(),
99 existing.full_name(),
100 schema.full_name(),
101 );
102 continue;
105 }
106 by_hash.insert(schema.schema_hash(), schema);
107 }
108 let arrow = Arc::new(ArrowSchemaModel::from_schemas(
109 EVENT_SCHEMAS
110 .iter()
111 .copied()
112 .map(|s| s as &dyn EventSchemaErased),
113 ));
114 Self {
115 by_name,
116 by_hash,
117 arrow,
118 }
119 }
120
121 #[must_use]
123 pub fn empty() -> Self {
124 Self {
125 by_name: HashMap::new(),
126 by_hash: HashMap::new(),
127 arrow: Arc::new(ArrowSchemaModel::default()),
128 }
129 }
130
131 #[must_use]
136 pub fn arrow_schema(&self) -> Arc<ArrowSchemaModel> {
137 Arc::clone(&self.arrow)
138 }
139
140 #[must_use]
144 pub fn lookup(&self, env: &ObsEnvelope) -> Option<&'static dyn EventSchemaErased> {
145 self.by_hash
146 .get(&env.schema_hash)
147 .copied()
148 .or_else(|| self.by_name.get(env.full_name.as_str()).copied())
149 }
150
151 #[must_use]
157 pub fn lookup_by_full_name(&self, full_name: &str) -> Option<&'static dyn EventSchemaErased> {
158 self.by_name.get(full_name).copied()
159 }
160
161 #[must_use]
163 pub fn len(&self) -> usize {
164 self.by_name.len()
165 }
166
167 #[must_use]
169 pub fn is_empty(&self) -> bool {
170 self.by_name.is_empty()
171 }
172
173 pub fn iter(&self) -> impl Iterator<Item = &'static dyn EventSchemaErased> + '_ {
176 self.by_name.values().copied()
177 }
178}
179
180impl Default for SchemaRegistry {
181 fn default() -> Self {
182 Self::from_link_section()
183 }
184}
185
186pub type SharedRegistry = Arc<SchemaRegistry>;
188
#[cfg(test)]
mod tests {
    use super::*;

    /// An explicitly empty registry reports itself as empty through every
    /// accessor.
    #[test]
    fn test_should_build_empty_when_no_schemas_registered() {
        let registry = SchemaRegistry::empty();
        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
        assert_eq!(registry.iter().count(), 0);
        assert!(registry.lookup_by_full_name("no.such.Event").is_none());
    }

    /// Walking the link section must not panic, and whatever schemas happen
    /// to be linked into the test binary must be internally consistent.
    #[test]
    fn test_should_walk_link_section() {
        let registry = SchemaRegistry::from_link_section();
        // These hold for any set of linked schemas, including none at all.
        assert_eq!(registry.iter().count(), registry.len());
        for schema in registry.iter() {
            assert!(registry.lookup_by_full_name(schema.full_name()).is_some());
        }
    }
}