1use obs_proto::obs::v1::{Cardinality, Classification, FieldKind};
16
17use super::erased::EventSchemaErased;
18use crate::envelope::{FieldMeta, FieldRole};
19
/// Leaf column types that the Arrow schema model can assign to a field.
///
/// The enum is `#[non_exhaustive]`, so matches written outside this crate
/// need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ArrowLeafType {
    /// UTF-8 text; [`as_str`](Self::as_str) yields `"utf8"`.
    Utf8,
    /// Dictionary flavour of UTF-8 text (presumably dictionary-encoded
    /// strings); [`as_str`](Self::as_str) yields `"dict_utf8"`.
    DictUtf8,
    /// Signed 64-bit integer; [`as_str`](Self::as_str) yields `"int64"`.
    Int64,
    /// Unsigned 64-bit integer; [`as_str`](Self::as_str) yields `"uint64"`.
    UInt64,
    /// 64-bit float; [`as_str`](Self::as_str) yields `"float64"`.
    Float64,
    /// Boolean; [`as_str`](Self::as_str) yields `"bool"`.
    Bool,
    /// Raw bytes; [`as_str`](Self::as_str) yields `"binary"`.
    Binary,
    /// Nanosecond timestamp; [`as_str`](Self::as_str) yields `"timestamp_ns"`.
    TimestampNs,
}

impl ArrowLeafType {
    /// Returns the lowercase textual tag for this leaf type.
    ///
    /// The tag is a `'static` string, and the function is `const`, so it can
    /// be evaluated in constant contexts.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        // The variants are disjoint, so arm order has no effect on the result.
        match *self {
            Self::TimestampNs => "timestamp_ns",
            Self::Binary => "binary",
            Self::Bool => "bool",
            Self::Float64 => "float64",
            Self::UInt64 => "uint64",
            Self::Int64 => "int64",
            Self::DictUtf8 => "dict_utf8",
            Self::Utf8 => "utf8",
        }
    }
}
59
60#[derive(Debug, Clone)]
63pub struct ArrowField {
64 pub name: String,
66 pub tag: u32,
68 pub ty: ArrowLeafType,
70 pub kind: FieldKind,
72 pub cardinality: Cardinality,
74 pub classification: Classification,
76}
77
78#[derive(Debug, Clone)]
81pub struct ArrowEventSchema {
82 pub full_name: String,
84 pub payload_column: String,
87 pub fields: Vec<ArrowField>,
89 pub schema_hash: u64,
91}
92
93impl ArrowEventSchema {
94 #[must_use]
100 pub fn from_erased(schema: &dyn EventSchemaErased) -> Self {
101 let full = schema.full_name().to_string();
102 let payload_column = format!("payload_{}", full.replace('.', "_").to_lowercase());
103 let fields = schema
104 .fields()
105 .iter()
106 .map(arrow_field_for)
107 .collect::<Vec<_>>();
108 Self {
109 full_name: full,
110 payload_column,
111 fields,
112 schema_hash: schema.schema_hash(),
113 }
114 }
115}
116
117fn arrow_field_for(meta: &FieldMeta) -> ArrowField {
118 let kind = match meta.role {
119 FieldRole::Label => FieldKind::Label,
120 FieldRole::Attribute => FieldKind::Attribute,
121 FieldRole::Measurement => FieldKind::Measurement,
122 FieldRole::TraceId => FieldKind::TraceId,
123 FieldRole::SpanId => FieldKind::SpanId,
124 FieldRole::ParentSpanId => FieldKind::ParentSpanId,
125 FieldRole::TimestampNs => FieldKind::TimestampNs,
126 FieldRole::DurationNs => FieldKind::DurationNs,
127 FieldRole::Forensic => FieldKind::Forensic,
128 };
129 let ty = infer_leaf_type(meta);
130 ArrowField {
131 name: meta.name.to_string(),
132 tag: meta.number,
133 ty,
134 kind,
135 cardinality: meta.cardinality,
136 classification: meta.classification,
137 }
138}
139
140fn infer_leaf_type(meta: &FieldMeta) -> ArrowLeafType {
141 match meta.role {
142 FieldRole::Label => match meta.cardinality {
143 Cardinality::Low | Cardinality::Medium => ArrowLeafType::DictUtf8,
144 _ => ArrowLeafType::Utf8,
145 },
146 FieldRole::TraceId | FieldRole::SpanId | FieldRole::ParentSpanId => ArrowLeafType::Utf8,
147 FieldRole::TimestampNs => ArrowLeafType::TimestampNs,
148 FieldRole::DurationNs => ArrowLeafType::UInt64,
149 FieldRole::Measurement => {
150 let n = meta.name;
152 let is_int = n.ends_with("_count")
153 || n.ends_with("_total")
154 || n.ends_with("_n")
155 || n.ends_with("_ms")
156 || n.ends_with("_us")
157 || n.ends_with("_ns")
158 || n.ends_with("_bytes")
159 || n.ends_with("_size");
160 if is_int {
161 ArrowLeafType::UInt64
162 } else {
163 ArrowLeafType::Float64
164 }
165 }
166 FieldRole::Attribute | FieldRole::Forensic => ArrowLeafType::Utf8,
167 }
168}
169
170#[derive(Debug, Clone, Default)]
175pub struct ArrowSchemaModel {
176 pub events: Vec<ArrowEventSchema>,
178}
179
180impl ArrowSchemaModel {
181 #[must_use]
185 pub fn from_schemas<'a, I>(iter: I) -> Self
186 where
187 I: IntoIterator<Item = &'a (dyn EventSchemaErased + 'static)>,
188 {
189 let mut events: Vec<_> = iter
190 .into_iter()
191 .map(ArrowEventSchema::from_erased)
192 .collect();
193 events.sort_by(|a, b| a.full_name.cmp(&b.full_name));
194 Self { events }
195 }
196
197 #[must_use]
199 pub fn len(&self) -> usize {
200 self.events.len()
201 }
202
203 #[must_use]
205 pub fn is_empty(&self) -> bool {
206 self.events.is_empty()
207 }
208
209 #[must_use]
211 pub fn lookup(&self, full_name: &str) -> Option<&ArrowEventSchema> {
212 self.events.iter().find(|e| e.full_name == full_name)
213 }
214
215 pub fn to_json(&self) -> Result<String, serde_json::Error> {
224 let v = self.to_serializable();
225 serde_json::to_string_pretty(&v)
226 }
227
228 fn to_serializable(&self) -> serde_json::Value {
229 use serde_json::{Value, json};
230 let envelope_columns: Vec<Value> = ENVELOPE_COLUMNS
231 .iter()
232 .map(|(name, ty)| json!({"name": name, "type": ty.as_str()}))
233 .collect();
234 let labels = json!({
235 "name": "labels",
236 "type": "map<dict_utf8, utf8>",
237 });
238 let attrs = json!({
239 "name": "attrs",
240 "type": "map<dict_utf8, utf8>",
241 });
242 let payload_proto = json!({
243 "name": "payload_proto",
244 "type": "binary",
245 });
246 let mut payloads = Vec::with_capacity(self.events.len());
247 for evt in &self.events {
248 let columns: Vec<Value> = evt
249 .fields
250 .iter()
251 .map(|f| {
252 json!({
253 "name": f.name,
254 "tag": f.tag,
255 "type": f.ty.as_str(),
256 "kind": kind_str(f.kind),
257 "cardinality": format!("{:?}", f.cardinality),
258 "classification": format!("{:?}", f.classification),
259 })
260 })
261 .collect();
262 payloads.push(json!({
263 "full_name": evt.full_name,
264 "payload_column": evt.payload_column,
265 "schema_hash": format!("{:#018x}", evt.schema_hash),
266 "fields": columns,
267 }));
268 }
269 json!({
270 "table": "obs_events",
271 "envelope": envelope_columns,
272 "labels": labels,
273 "attrs": attrs,
274 "payload_proto": payload_proto,
275 "events": payloads,
276 })
277 }
278}
279
280const fn kind_str(k: FieldKind) -> &'static str {
281 match k {
282 FieldKind::Label => "LABEL",
283 FieldKind::Attribute => "ATTRIBUTE",
284 FieldKind::Measurement => "MEASUREMENT",
285 FieldKind::TraceId => "TRACE_ID",
286 FieldKind::SpanId => "SPAN_ID",
287 FieldKind::ParentSpanId => "PARENT_SPAN_ID",
288 FieldKind::TimestampNs => "TIMESTAMP_NS",
289 FieldKind::DurationNs => "DURATION_NS",
290 FieldKind::Forensic => "FORENSIC",
291 _ => "ATTRIBUTE",
292 }
293}
294
295pub const ENVELOPE_COLUMNS: &[(&str, ArrowLeafType)] = &[
299 ("ts_ns", ArrowLeafType::TimestampNs),
300 ("full_name", ArrowLeafType::DictUtf8),
301 ("schema_hash", ArrowLeafType::UInt64),
302 ("tier", ArrowLeafType::DictUtf8),
303 ("sev", ArrowLeafType::DictUtf8),
304 ("trace_id", ArrowLeafType::Utf8),
305 ("span_id", ArrowLeafType::Utf8),
306 ("parent_span_id", ArrowLeafType::Utf8),
307 ("service", ArrowLeafType::DictUtf8),
308 ("instance", ArrowLeafType::DictUtf8),
309 ("version", ArrowLeafType::DictUtf8),
310 ("sampling_reason", ArrowLeafType::DictUtf8),
311 ("callsite_id", ArrowLeafType::UInt64),
312];
313
#[cfg(test)]
mod tests {
    use super::*;

    /// Rendering the same model twice must give byte-identical JSON, and the
    /// document must contain the table name and every envelope column name.
    #[test]
    fn test_should_emit_deterministic_json() {
        let model = ArrowSchemaModel::default();
        let first = model.to_json().expect("json renders");
        let second = model.to_json().expect("json renders");

        // The determinism check the test name promises.
        assert_eq!(first, second, "repeated renders must be byte-identical");

        assert!(first.contains("obs_events"));
        assert!(first.contains("ts_ns"));
        for (name, _) in ENVELOPE_COLUMNS {
            assert!(
                first.contains(name),
                "envelope column `{}` missing from rendered JSON",
                name
            );
        }
    }

    /// The envelope must expose the `callsite_id` column.
    #[test]
    fn test_envelope_columns_should_include_callsite_id() {
        assert!(ENVELOPE_COLUMNS.iter().any(|&(name, _)| name == "callsite_id"));
    }
}