mod arrow;
mod callsite_registry;
mod erased;
mod payload_decode;
mod scrubbed;
mod scrubber;
use std::{collections::HashMap, sync::Arc};
use linkme::distributed_slice;
use obs_proto::obs::v1::ObsEnvelope;
pub use scrubber::scrub_payload;
pub use self::{
arrow::{ArrowEventSchema, ArrowField, ArrowLeafType, ArrowSchemaModel, ENVELOPE_COLUMNS},
callsite_registry::{
CallsiteRecord, CallsiteSource, ObsCallsiteRegistry, callsite_id, perturb_to_nonzero,
},
erased::{
ArrowStructBuilder, DecodeError, EventSchemaErased, OtelAttributeView, OtlpValue,
ScrubError, Sealed,
},
scrubbed::ScrubbedEnvelope,
};
/// Renders a serialized event `payload` into the JSON object `out`, guided by
/// the field metadata in `fields`.
///
/// This is the public entry point over the module-private default JSON
/// renderer in `payload_decode`; it adds no behavior of its own.
///
/// # Errors
///
/// Returns whatever [`DecodeError`] the underlying renderer reports.
pub fn render_payload_json(
    payload: &[u8],
    fields: &[crate::envelope::FieldMeta],
    out: &mut serde_json::Map<String, serde_json::Value>,
) -> Result<(), DecodeError> {
    payload_decode::render_json_default(payload, fields, out)?;
    Ok(())
}
/// Link-time collection of every event schema compiled into the binary.
///
/// The `[..]` initializer marks this as the `linkme` slice declaration; other
/// statics contribute entries with `#[distributed_slice(EVENT_SCHEMAS)]`.
/// [`SchemaRegistry::from_link_section`] walks this slice to build its indexes.
/// NOTE(review): treat element order as unspecified — nothing here should
/// depend on it.
#[distributed_slice]
pub static EVENT_SCHEMAS: [&'static dyn EventSchemaErased] = [..];
/// Runtime index of event schemas, searchable by full name or by schema hash,
/// together with an Arrow schema model derived from the same schemas.
///
/// `Clone` copies both hash maps (they only hold `&'static` schema
/// references); the Arrow model is shared through its `Arc`.
#[derive(Clone)]
pub struct SchemaRegistry {
    /// Schemas keyed by `full_name()`; one entry per distinct name.
    by_name: HashMap<&'static str, &'static dyn EventSchemaErased>,
    /// Schemas keyed by `schema_hash()`. When two different names share a
    /// hash, only the first one registered is stored here, so this map can
    /// hold fewer entries than `by_name`.
    by_hash: HashMap<u64, &'static dyn EventSchemaErased>,
    /// Arrow schema model built from the registered schemas; cloned cheaply
    /// via `Arc`.
    arrow: Arc<ArrowSchemaModel>,
}
impl std::fmt::Debug for SchemaRegistry {
    /// Prints the number of registered names plus the names themselves in
    /// sorted order (so output is stable regardless of hash-map layout).
    /// The schema objects and the Arrow model are intentionally omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let count = self.by_name.len();
        let mut sorted_names = Vec::with_capacity(count);
        sorted_names.extend(self.by_name.keys().copied());
        sorted_names.sort_unstable();

        let mut summary = f.debug_struct("SchemaRegistry");
        summary.field("len", &count);
        summary.field("names", &sorted_names);
        summary.finish()
    }
}
impl SchemaRegistry {
    /// Builds a registry from every schema contributed to [`EVENT_SCHEMAS`].
    ///
    /// - The by-name index holds one entry per distinct `full_name()`; if the
    ///   same name appears more than once, the entry walked last wins.
    /// - The by-hash index keeps the first schema seen for each
    ///   `schema_hash()`. When a later schema with a *different* name yields
    ///   the same hash, a collision self-event is emitted and the later schema
    ///   is kept out of the hash index. It stays reachable by name, and
    ///   [`Self::lookup`] uses the envelope's name to disambiguate.
    /// - The Arrow schema model is built from every link-section entry.
    #[must_use]
    pub fn from_link_section() -> Self {
        let mut by_name = HashMap::with_capacity(EVENT_SCHEMAS.len());
        let mut by_hash: HashMap<u64, &'static dyn EventSchemaErased> =
            HashMap::with_capacity(EVENT_SCHEMAS.len());
        for &schema in EVENT_SCHEMAS {
            let name = schema.full_name();
            let hash = schema.schema_hash();
            by_name.insert(name, schema);
            // Distinct names sharing a hash: report it and keep the first
            // registrant in the hash index rather than silently replacing it.
            if let Some(existing) = by_hash.get(&hash)
                && existing.full_name() != name
            {
                crate::self_events_public::emit_callsite_hash_collision(
                    hash,
                    existing.full_name(),
                    name,
                );
                continue;
            }
            by_hash.insert(hash, schema);
        }
        let arrow = Arc::new(ArrowSchemaModel::from_schemas(
            EVENT_SCHEMAS
                .iter()
                .copied()
                .map(|s| s as &dyn EventSchemaErased),
        ));
        Self {
            by_name,
            by_hash,
            arrow,
        }
    }

    /// Creates a registry with no schemas and a default Arrow schema model.
    #[must_use]
    pub fn empty() -> Self {
        Self {
            by_name: HashMap::new(),
            by_hash: HashMap::new(),
            arrow: Arc::new(ArrowSchemaModel::default()),
        }
    }

    /// Returns a shared handle to the Arrow schema model (refcount bump only).
    #[must_use]
    pub fn arrow_schema(&self) -> Arc<ArrowSchemaModel> {
        Arc::clone(&self.arrow)
    }

    /// Resolves the schema for `env`.
    ///
    /// The hash index is tried first. If the hash hit's `full_name()` matches
    /// `env.full_name`, it is returned directly. If the names disagree, the
    /// hash belongs to a different schema (a collision recorded at build time).
    /// In that case the name index is consulted, and the hash hit is returned
    /// only when the name is unknown, e.g. an empty `full_name`. Without a hash
    /// hit, the lookup falls back to the name index alone.
    #[must_use]
    pub fn lookup(&self, env: &ObsEnvelope) -> Option<&'static dyn EventSchemaErased> {
        let by_name = || self.by_name.get(env.full_name.as_str()).copied();
        match self.by_hash.get(&env.schema_hash).copied() {
            Some(schema) if schema.full_name() == env.full_name.as_str() => Some(schema),
            // Hash hit for a different name: prefer the schema the envelope
            // names, so a collided schema is not decoded with the wrong layout.
            Some(schema) => by_name().or(Some(schema)),
            None => by_name(),
        }
    }

    /// Looks up a schema by its exact `full_name()`.
    #[must_use]
    pub fn lookup_by_full_name(&self, full_name: &str) -> Option<&'static dyn EventSchemaErased> {
        self.by_name.get(full_name).copied()
    }

    /// Number of distinct schema names registered.
    #[must_use]
    pub fn len(&self) -> usize {
        self.by_name.len()
    }

    /// Returns `true` when no schema names are registered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.by_name.is_empty()
    }

    /// Iterates over every registered schema (one per distinct name), in
    /// unspecified hash-map order.
    pub fn iter(&self) -> impl Iterator<Item = &'static dyn EventSchemaErased> + '_ {
        self.by_name.values().copied()
    }
}
impl Default for SchemaRegistry {
fn default() -> Self {
Self::from_link_section()
}
}
/// Reference-counted handle for sharing one [`SchemaRegistry`] among owners.
pub type SharedRegistry = Arc<SchemaRegistry>;
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty registry reports no entries and resolves no names.
    #[test]
    fn test_should_build_empty_when_no_schemas_registered() {
        let r = SchemaRegistry::empty();
        assert!(r.is_empty());
        assert_eq!(r.len(), 0);
        assert_eq!(r.iter().count(), 0);
        assert!(r.lookup_by_full_name("does.not.Exist").is_none());
    }

    /// Walking the link section yields a self-consistent registry.
    #[test]
    fn test_should_walk_link_section() {
        let r = SchemaRegistry::from_link_section();

        // Duplicate full names collapse to one entry, so the registry can
        // never hold more schemas than the link section contributed.
        assert!(r.len() <= EVENT_SCHEMAS.len());
        assert_eq!(r.iter().count(), r.len());

        for schema in r.iter() {
            let found = r
                .lookup_by_full_name(schema.full_name())
                .expect("every iterated schema must be reachable by its own name");
            assert_eq!(found.full_name(), schema.full_name());
        }
        for &schema in EVENT_SCHEMAS {
            assert!(
                r.lookup_by_full_name(schema.full_name()).is_some(),
                "link-section schema missing from by-name index"
            );
        }

        let first = r.arrow_schema();
        let second = r.arrow_schema();
        assert!(std::sync::Arc::ptr_eq(&first, &second));

        assert!(format!("{r:?}").contains("SchemaRegistry"));
    }
}