1mod arrow;
8mod callsite_registry;
9mod erased;
10mod payload_decode;
11mod scrubbed;
12mod scrubber;
13
14use std::{collections::HashMap, sync::Arc};
15
16use linkme::distributed_slice;
17use obs_proto::obs::v1::ObsEnvelope;
18pub use scrubber::scrub_payload;
19
20pub use self::{
21 arrow::{ArrowEventSchema, ArrowField, ArrowLeafType, ArrowSchemaModel, ENVELOPE_COLUMNS},
22 callsite_registry::{
23 CallsiteRecord, CallsiteSource, ObsCallsiteRegistry, callsite_id, perturb_to_nonzero,
24 },
25 erased::{
26 ArrowStructBuilder, DecodeError, EventSchemaErased, OtelAttributeView, OtlpValue,
27 ScrubError, Sealed,
28 },
29 scrubbed::ScrubbedEnvelope,
30};
31
/// Link-time collection of every event schema compiled into the binary.
///
/// The slice is assembled by `linkme`'s `#[distributed_slice]`; entries are
/// presumably contributed elsewhere via `#[distributed_slice(EVENT_SCHEMAS)]`
/// statics — TODO confirm against the schema derive/macro. It is read by
/// `SchemaRegistry::from_link_section` to build the name and hash indexes and
/// the Arrow schema model. The slice may contain duplicate full names or
/// colliding schema hashes; the registry resolves those, not this slice.
#[distributed_slice]
pub static EVENT_SCHEMAS: [&'static dyn EventSchemaErased] = [..];
42
43#[derive(Clone)]
48pub struct SchemaRegistry {
49 by_name: HashMap<&'static str, &'static dyn EventSchemaErased>,
50 by_hash: HashMap<u64, &'static dyn EventSchemaErased>,
51 arrow: Arc<ArrowSchemaModel>,
52}
53
54impl std::fmt::Debug for SchemaRegistry {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 let mut names: Vec<_> = self.by_name.keys().copied().collect();
57 names.sort_unstable();
58 f.debug_struct("SchemaRegistry")
59 .field("len", &self.by_name.len())
60 .field("names", &names)
61 .finish()
62 }
63}
64
65impl SchemaRegistry {
66 #[must_use]
74 pub fn from_link_section() -> Self {
75 let mut by_name = HashMap::with_capacity(EVENT_SCHEMAS.len());
76 let mut by_hash: HashMap<u64, &'static dyn EventSchemaErased> =
77 HashMap::with_capacity(EVENT_SCHEMAS.len());
78 for &schema in EVENT_SCHEMAS {
79 by_name.insert(schema.full_name(), schema);
80 if let Some(existing) = by_hash.get(&schema.schema_hash())
81 && existing.full_name() != schema.full_name()
82 {
83 crate::self_events_public::emit_callsite_hash_collision(
84 schema.schema_hash(),
85 existing.full_name(),
86 schema.full_name(),
87 );
88 continue;
91 }
92 by_hash.insert(schema.schema_hash(), schema);
93 }
94 let arrow = Arc::new(ArrowSchemaModel::from_schemas(
95 EVENT_SCHEMAS
96 .iter()
97 .copied()
98 .map(|s| s as &dyn EventSchemaErased),
99 ));
100 Self {
101 by_name,
102 by_hash,
103 arrow,
104 }
105 }
106
107 #[must_use]
109 pub fn empty() -> Self {
110 Self {
111 by_name: HashMap::new(),
112 by_hash: HashMap::new(),
113 arrow: Arc::new(ArrowSchemaModel::default()),
114 }
115 }
116
117 #[must_use]
122 pub fn arrow_schema(&self) -> Arc<ArrowSchemaModel> {
123 Arc::clone(&self.arrow)
124 }
125
126 #[must_use]
130 pub fn lookup(&self, env: &ObsEnvelope) -> Option<&'static dyn EventSchemaErased> {
131 self.by_hash
132 .get(&env.schema_hash)
133 .copied()
134 .or_else(|| self.by_name.get(env.full_name.as_str()).copied())
135 }
136
137 #[must_use]
143 pub fn lookup_by_full_name(&self, full_name: &str) -> Option<&'static dyn EventSchemaErased> {
144 self.by_name.get(full_name).copied()
145 }
146
147 #[must_use]
149 pub fn len(&self) -> usize {
150 self.by_name.len()
151 }
152
153 #[must_use]
155 pub fn is_empty(&self) -> bool {
156 self.by_name.is_empty()
157 }
158
159 pub fn iter(&self) -> impl Iterator<Item = &'static dyn EventSchemaErased> + '_ {
162 self.by_name.values().copied()
163 }
164}
165
166impl Default for SchemaRegistry {
167 fn default() -> Self {
168 Self::from_link_section()
169 }
170}
171
172pub type SharedRegistry = Arc<SchemaRegistry>;
174
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty registry reports no entries through every accessor.
    #[test]
    fn test_should_build_empty_when_no_schemas_registered() {
        let r = SchemaRegistry::empty();
        assert!(r.is_empty());
        assert_eq!(r.len(), 0);
        assert_eq!(r.iter().count(), 0);
        assert!(r.lookup_by_full_name("does.not.Exist").is_none());
    }

    /// Building from the link section must produce a self-consistent index,
    /// not merely avoid panicking.
    #[test]
    fn test_should_walk_link_section() {
        let r = SchemaRegistry::from_link_section();
        // Duplicate full names collapse into one entry, so the registry can
        // never hold more names than the slice has elements.
        assert!(r.len() <= EVENT_SCHEMAS.len());
        assert_eq!(r.is_empty(), EVENT_SCHEMAS.is_empty());
        assert_eq!(r.iter().count(), r.len());
        for schema in r.iter() {
            let found = r
                .lookup_by_full_name(schema.full_name())
                .expect("every indexed schema must be reachable by its own name");
            assert_eq!(found.full_name(), schema.full_name());
        }
    }
}