use obs_proto::obs::v1::{Cardinality, Classification, FieldKind};
use super::erased::EventSchemaErased;
use crate::envelope::{FieldMeta, FieldRole};
/// Leaf column type assigned to envelope columns and event payload fields.
///
/// Each variant has a stable lowercase name, available through
/// [`ArrowLeafType::as_str`], which is what the JSON schema model emits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ArrowLeafType {
    /// Rendered as `"utf8"`.
    Utf8,
    /// Rendered as `"dict_utf8"`.
    DictUtf8,
    /// Rendered as `"int64"`.
    Int64,
    /// Rendered as `"uint64"`.
    UInt64,
    /// Rendered as `"float64"`.
    Float64,
    /// Rendered as `"bool"`.
    Bool,
    /// Rendered as `"binary"`.
    Binary,
    /// Rendered as `"timestamp_ns"`.
    TimestampNs,
}

impl ArrowLeafType {
    /// Returns the lowercase, snake_case name of this leaf type.
    ///
    /// The function is `const`, so the name can also be used in constant
    /// contexts.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match *self {
            ArrowLeafType::Utf8 => "utf8",
            ArrowLeafType::DictUtf8 => "dict_utf8",
            ArrowLeafType::Int64 => "int64",
            ArrowLeafType::UInt64 => "uint64",
            ArrowLeafType::Float64 => "float64",
            ArrowLeafType::Bool => "bool",
            ArrowLeafType::Binary => "binary",
            ArrowLeafType::TimestampNs => "timestamp_ns",
        }
    }
}
/// Description of a single event payload field in the schema model.
///
/// Instances are built by `arrow_field_for` from a `FieldMeta`.
#[derive(Debug, Clone)]
pub struct ArrowField {
/// Field name, copied from the source `FieldMeta::name`.
pub name: String,
/// Field number, copied from `FieldMeta::number`; emitted as `"tag"` in the JSON model.
pub tag: u32,
/// Leaf column type chosen by `infer_leaf_type`.
pub ty: ArrowLeafType,
/// Field kind, mapped one-to-one from the field's `FieldRole`.
pub kind: FieldKind,
/// Cardinality, copied from the source metadata.
pub cardinality: Cardinality,
/// Classification, copied from the source metadata.
pub classification: Classification,
}
/// Schema-model entry for one event type.
///
/// Built by [`ArrowEventSchema::from_erased`].
#[derive(Debug, Clone)]
pub struct ArrowEventSchema {
/// Full name of the event as reported by the source schema.
pub full_name: String,
/// Payload column name: `payload_` followed by the full name with every
/// `.` replaced by `_` and then lowercased.
pub payload_column: String,
/// One entry per field reported by the source schema, in the same order.
pub fields: Vec<ArrowField>,
/// Schema hash as reported by the source schema; rendered as zero-padded
/// hexadecimal in the JSON model.
pub schema_hash: u64,
}
impl ArrowEventSchema {
#[must_use]
pub fn from_erased(schema: &dyn EventSchemaErased) -> Self {
let full = schema.full_name().to_string();
let payload_column = format!("payload_{}", full.replace('.', "_").to_lowercase());
let fields = schema
.fields()
.iter()
.map(arrow_field_for)
.collect::<Vec<_>>();
Self {
full_name: full,
payload_column,
fields,
schema_hash: schema.schema_hash(),
}
}
}
/// Converts one field's metadata into its schema-model description.
///
/// The name, number, cardinality and classification are copied over, the
/// role is mapped to the `FieldKind` variant of the same name, and the leaf
/// type is chosen by `infer_leaf_type`.
fn arrow_field_for(meta: &FieldMeta) -> ArrowField {
    ArrowField {
        name: meta.name.to_string(),
        tag: meta.number,
        ty: infer_leaf_type(meta),
        // Exhaustive on purpose: a new `FieldRole` variant must be mapped
        // here before the crate compiles again.
        kind: match meta.role {
            FieldRole::Label => FieldKind::Label,
            FieldRole::Attribute => FieldKind::Attribute,
            FieldRole::Measurement => FieldKind::Measurement,
            FieldRole::TraceId => FieldKind::TraceId,
            FieldRole::SpanId => FieldKind::SpanId,
            FieldRole::ParentSpanId => FieldKind::ParentSpanId,
            FieldRole::TimestampNs => FieldKind::TimestampNs,
            FieldRole::DurationNs => FieldKind::DurationNs,
            FieldRole::Forensic => FieldKind::Forensic,
        },
        cardinality: meta.cardinality,
        classification: meta.classification,
    }
}
/// Chooses the leaf column type for a field from its role, cardinality and name.
///
/// * Labels are `DictUtf8` when their cardinality is low or medium, and
///   `Utf8` otherwise.
/// * Trace, span and parent-span ids, attributes and forensic fields are `Utf8`.
/// * Timestamps are `TimestampNs`; durations are `UInt64`.
/// * Measurements are `UInt64` when the field name ends with one of the
///   integer suffixes listed below, and `Float64` otherwise.
fn infer_leaf_type(meta: &FieldMeta) -> ArrowLeafType {
    /// Name suffixes that mark a measurement as an unsigned integer.
    const INTEGER_SUFFIXES: [&str; 8] = [
        "_count", "_total", "_n", "_ms", "_us", "_ns", "_bytes", "_size",
    ];

    match meta.role {
        FieldRole::TimestampNs => ArrowLeafType::TimestampNs,
        FieldRole::DurationNs => ArrowLeafType::UInt64,
        FieldRole::Label => {
            if matches!(meta.cardinality, Cardinality::Low | Cardinality::Medium) {
                ArrowLeafType::DictUtf8
            } else {
                ArrowLeafType::Utf8
            }
        }
        FieldRole::Measurement => {
            let name = meta.name;
            if INTEGER_SUFFIXES
                .iter()
                .any(|&suffix| name.ends_with(suffix))
            {
                ArrowLeafType::UInt64
            } else {
                ArrowLeafType::Float64
            }
        }
        FieldRole::TraceId
        | FieldRole::SpanId
        | FieldRole::ParentSpanId
        | FieldRole::Attribute
        | FieldRole::Forensic => ArrowLeafType::Utf8,
    }
}
/// Collection of per-event schema entries that can be rendered to JSON.
///
/// The default value holds no events.
#[derive(Debug, Clone, Default)]
pub struct ArrowSchemaModel {
/// Event entries. [`ArrowSchemaModel::from_schemas`] stores them sorted by
/// `full_name`; the field is public, so other orders are possible.
pub events: Vec<ArrowEventSchema>,
}
impl ArrowSchemaModel {
    /// Builds a model from the given schemas, ordered by event full name.
    ///
    /// The sort is stable, so schemas sharing a full name keep their input
    /// order relative to each other.
    #[must_use]
    pub fn from_schemas<'a, I>(iter: I) -> Self
    where
        I: IntoIterator<Item = &'a (dyn EventSchemaErased + 'static)>,
    {
        let mut events = iter
            .into_iter()
            .map(ArrowEventSchema::from_erased)
            .collect::<Vec<ArrowEventSchema>>();
        events.sort_by(|lhs, rhs| lhs.full_name.cmp(&rhs.full_name));
        Self { events }
    }

    /// Number of event entries in the model.
    #[must_use]
    pub fn len(&self) -> usize {
        self.events.len()
    }

    /// Returns `true` when the model holds no event entries.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.events.is_empty()
    }

    /// Returns the first entry whose full name equals `full_name`, if any.
    #[must_use]
    pub fn lookup(&self, full_name: &str) -> Option<&ArrowEventSchema> {
        self.events
            .iter()
            .find(|event| event.full_name.as_str() == full_name)
    }

    /// Renders the model as pretty-printed JSON.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `serde_json::to_string_pretty` if
    /// serialization fails.
    pub fn to_json(&self) -> Result<String, serde_json::Error> {
        serde_json::to_string_pretty(&self.to_serializable())
    }

    /// Builds the JSON value describing the `obs_events` table: the envelope
    /// columns, the `labels` / `attrs` map columns, the `payload_proto`
    /// column, and one entry per event with its fields.
    fn to_serializable(&self) -> serde_json::Value {
        use serde_json::{Value, json};

        // `labels` and `attrs` share the same shape and differ only by name.
        let map_column = |name: &str| -> Value {
            json!({
                "name": name,
                "type": "map<dict_utf8, utf8>",
            })
        };

        let mut envelope: Vec<Value> = Vec::with_capacity(ENVELOPE_COLUMNS.len());
        for (name, ty) in ENVELOPE_COLUMNS {
            envelope.push(json!({"name": name, "type": ty.as_str()}));
        }

        let events: Vec<Value> = self
            .events
            .iter()
            .map(|event| {
                let fields: Vec<Value> = event
                    .fields
                    .iter()
                    .map(|field| {
                        json!({
                            "name": field.name,
                            "tag": field.tag,
                            "type": field.ty.as_str(),
                            "kind": kind_str(field.kind),
                            "cardinality": format!("{:?}", field.cardinality),
                            "classification": format!("{:?}", field.classification),
                        })
                    })
                    .collect();
                json!({
                    "full_name": event.full_name,
                    "payload_column": event.payload_column,
                    // `0x` prefix plus 16 hex digits, zero padded.
                    "schema_hash": format!("{:#018x}", event.schema_hash),
                    "fields": fields,
                })
            })
            .collect();

        json!({
            "table": "obs_events",
            "envelope": envelope,
            "labels": map_column("labels"),
            "attrs": map_column("attrs"),
            "payload_proto": json!({
                "name": "payload_proto",
                "type": "binary",
            }),
            "events": events,
        })
    }
}
/// Returns the upper-case name emitted for a field kind in the JSON model.
///
/// Any value without a dedicated arm below is reported as `"ATTRIBUTE"`.
const fn kind_str(kind: FieldKind) -> &'static str {
    match kind {
        FieldKind::TraceId => "TRACE_ID",
        FieldKind::SpanId => "SPAN_ID",
        FieldKind::ParentSpanId => "PARENT_SPAN_ID",
        FieldKind::TimestampNs => "TIMESTAMP_NS",
        FieldKind::DurationNs => "DURATION_NS",
        FieldKind::Label => "LABEL",
        FieldKind::Measurement => "MEASUREMENT",
        FieldKind::Forensic => "FORENSIC",
        FieldKind::Attribute => "ATTRIBUTE",
        // Fallback for every remaining value.
        _ => "ATTRIBUTE",
    }
}
/// Name and leaf type of every envelope column.
///
/// The JSON model emits these under `"envelope"` in exactly this order, so
/// reordering entries changes the rendered output.
pub const ENVELOPE_COLUMNS: &[(&str, ArrowLeafType)] = &[
("ts_ns", ArrowLeafType::TimestampNs),
("full_name", ArrowLeafType::DictUtf8),
("schema_hash", ArrowLeafType::UInt64),
("tier", ArrowLeafType::DictUtf8),
("sev", ArrowLeafType::DictUtf8),
("trace_id", ArrowLeafType::Utf8),
("span_id", ArrowLeafType::Utf8),
("parent_span_id", ArrowLeafType::Utf8),
("service", ArrowLeafType::DictUtf8),
("instance", ArrowLeafType::DictUtf8),
("version", ArrowLeafType::DictUtf8),
("sampling_reason", ArrowLeafType::DictUtf8),
("callsite_id", ArrowLeafType::UInt64),
];
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty model still renders the table name and the envelope columns,
    /// and rendering the same model twice yields byte-identical JSON.
    #[test]
    fn test_should_emit_deterministic_json() {
        let model = ArrowSchemaModel::default();
        let first = model.to_json().expect("json renders");
        let second = model.to_json().expect("json renders");
        assert!(first.contains("obs_events"));
        assert!(first.contains("ts_ns"));
        // The determinism the test name promises: repeated renders must match.
        assert_eq!(first, second, "repeated renders of one model must match");
    }

    /// The envelope must expose the `callsite_id` column.
    #[test]
    fn test_envelope_columns_should_include_callsite_id() {
        assert!(ENVELOPE_COLUMNS.iter().any(|(n, _)| *n == "callsite_id"));
    }
}