Skip to main content

obs_core/registry/
arrow.rs

1//! Lightweight, dependency-free Arrow schema descriptor.
2//!
3//! Spec 14 § 4 / KD5: the unified Arrow schema for the `obs_events`
4//! table is built once at observer init from the registry, not per
5//! Parquet file or per ClickHouse INSERT. Spec 22 § 1.1 gives the
6//! column groups.
7//!
8//! `obs-core` does not depend on `arrow-rs` or `arrow-schema`; the full
9//! crate set lives in `obs-parquet` / `obs-clickhouse`. We instead
10//! expose a tiny logical model here that captures the per-event field
11//! shape (name, role, classification, primitive type) so downstream
12//! sinks can translate into their respective target representations
13//! without having to re-parse descriptors.
14
15use obs_proto::obs::v1::{Cardinality, Classification, FieldKind};
16
17use super::erased::EventSchemaErased;
18use crate::envelope::{FieldMeta, FieldRole};
19
/// Logical Arrow type for a single leaf field.
///
/// This is the subset of `arrow_schema::DataType` that can be produced
/// from `EventSchema::FIELDS`; each analytical sink maps these variants
/// onto its own physical representation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ArrowLeafType {
    /// UTF-8 string of variable length.
    Utf8,
    /// UTF-8 string of variable length, dictionary-encoded (LowCardinality).
    DictUtf8,
    /// Signed 64-bit integer.
    Int64,
    /// Unsigned 64-bit integer.
    UInt64,
    /// 64-bit floating point number.
    Float64,
    /// Boolean flag.
    Bool,
    /// Raw bytes of variable length (the `payload_proto` raw fallback).
    Binary,
    /// Timestamp in nanoseconds, encoded as `fixed64`.
    TimestampNs,
}

impl ArrowLeafType {
    /// Returns the stable, lowercase wire-type name shared by the Parquet
    /// and ClickHouse code generators.
    ///
    /// These strings are written into the JSON schema snapshot, so renaming
    /// one is a schema change for every downstream consumer.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match *self {
            ArrowLeafType::Utf8 => "utf8",
            ArrowLeafType::DictUtf8 => "dict_utf8",
            ArrowLeafType::Int64 => "int64",
            ArrowLeafType::UInt64 => "uint64",
            ArrowLeafType::Float64 => "float64",
            ArrowLeafType::Bool => "bool",
            ArrowLeafType::Binary => "binary",
            ArrowLeafType::TimestampNs => "timestamp_ns",
        }
    }
}
59
/// One leaf-typed field in the Arrow schema for a per-event payload
/// struct. Built from a `FieldMeta` by `arrow_field_for`.
#[derive(Debug, Clone)]
pub struct ArrowField {
    /// Snake-case proto field name, e.g. `latency_ms`.
    pub name: String,
    /// The proto tag — provides stable column identity across renames.
    pub tag: u32,
    /// Logical primitive type (inferred from the field's role, its
    /// cardinality and, for measurements, its name suffix).
    pub ty: ArrowLeafType,
    /// `kind:` from the proto annotation.
    pub kind: FieldKind,
    /// Cardinality cap, copied verbatim from the field metadata. Only LABEL
    /// fields consult it (Low/Medium select dictionary encoding).
    /// NOTE(review): presumably `Unspecified` for non-LABEL fields — this
    /// module does not enforce that; confirm against codegen.
    pub cardinality: Cardinality,
    /// Classification (drives PII redaction).
    pub classification: Classification,
}
77
/// One per-event payload struct. Combined into the unified
/// `obs_events` table by [`ArrowSchemaModel`].
#[derive(Debug, Clone)]
pub struct ArrowEventSchema {
    /// Stable identity, matches `EventSchema::FULL_NAME`.
    pub full_name: String,
    /// The column name for this event's per-event Nested struct
    /// (spec 22 § 1.1): `payload_` followed by `full_name` with every `.`
    /// replaced by `_` and the whole string lowercased, e.g.
    /// `obs.v1.HttpRequest` → `payload_obs_v1_httprequest`. Note this is
    /// plain lowercasing, not CamelCase→snake_case conversion.
    pub payload_column: String,
    /// One Arrow field per declared schema field, in declaration order.
    pub fields: Vec<ArrowField>,
    /// First 8 bytes of BLAKE3 over the canonical descriptor.
    pub schema_hash: u64,
}
92
93impl ArrowEventSchema {
94    /// Build an `ArrowEventSchema` from an [`EventSchemaErased`] view.
95    /// The leaf-type inference uses `FieldRole` plus a small heuristic
96    /// on the field name suffix (`*_ns`, `*_ms`, `*_count`, `*_id`).
97    /// Codegen will eventually own the type table directly; this
98    /// keeps Phase-4 sinks operational against the Phase-1/2 codegen.
99    #[must_use]
100    pub fn from_erased(schema: &dyn EventSchemaErased) -> Self {
101        let full = schema.full_name().to_string();
102        let payload_column = format!("payload_{}", full.replace('.', "_").to_lowercase());
103        let fields = schema
104            .fields()
105            .iter()
106            .map(arrow_field_for)
107            .collect::<Vec<_>>();
108        Self {
109            full_name: full,
110            payload_column,
111            fields,
112            schema_hash: schema.schema_hash(),
113        }
114    }
115}
116
117fn arrow_field_for(meta: &FieldMeta) -> ArrowField {
118    let kind = match meta.role {
119        FieldRole::Label => FieldKind::Label,
120        FieldRole::Attribute => FieldKind::Attribute,
121        FieldRole::Measurement => FieldKind::Measurement,
122        FieldRole::TraceId => FieldKind::TraceId,
123        FieldRole::SpanId => FieldKind::SpanId,
124        FieldRole::ParentSpanId => FieldKind::ParentSpanId,
125        FieldRole::TimestampNs => FieldKind::TimestampNs,
126        FieldRole::DurationNs => FieldKind::DurationNs,
127        FieldRole::Forensic => FieldKind::Forensic,
128    };
129    let ty = infer_leaf_type(meta);
130    ArrowField {
131        name: meta.name.to_string(),
132        tag: meta.number,
133        ty,
134        kind,
135        cardinality: meta.cardinality,
136        classification: meta.classification,
137    }
138}
139
140fn infer_leaf_type(meta: &FieldMeta) -> ArrowLeafType {
141    match meta.role {
142        FieldRole::Label => match meta.cardinality {
143            Cardinality::Low | Cardinality::Medium => ArrowLeafType::DictUtf8,
144            _ => ArrowLeafType::Utf8,
145        },
146        FieldRole::TraceId | FieldRole::SpanId | FieldRole::ParentSpanId => ArrowLeafType::Utf8,
147        FieldRole::TimestampNs => ArrowLeafType::TimestampNs,
148        FieldRole::DurationNs => ArrowLeafType::UInt64,
149        FieldRole::Measurement => {
150            // Heuristic: name suffix decides numeric width.
151            let n = meta.name;
152            let is_int = n.ends_with("_count")
153                || n.ends_with("_total")
154                || n.ends_with("_n")
155                || n.ends_with("_ms")
156                || n.ends_with("_us")
157                || n.ends_with("_ns")
158                || n.ends_with("_bytes")
159                || n.ends_with("_size");
160            if is_int {
161                ArrowLeafType::UInt64
162            } else {
163                ArrowLeafType::Float64
164            }
165        }
166        FieldRole::Attribute | FieldRole::Forensic => ArrowLeafType::Utf8,
167    }
168}
169
/// Unified `obs_events` table schema. Used by `ParquetSink` to pre-
/// compute the file-level Arrow schema, by `ClickHouseSink` to emit the
/// `CREATE TABLE` DDL, and by `obs migrate {parquet,clickhouse}` for
/// CI-driven migrations. Spec 14 § 4 / KD5 + spec 22 § 1.
///
/// `Default` yields an empty model (no registered events).
#[derive(Debug, Clone, Default)]
pub struct ArrowSchemaModel {
    /// Per-event payload structs, sorted by `full_name` when built via
    /// `ArrowSchemaModel::from_schemas`. The field is public, so callers
    /// that push or edit entries directly are responsible for keeping it
    /// sorted if they rely on deterministic output.
    pub events: Vec<ArrowEventSchema>,
}
179
180impl ArrowSchemaModel {
181    /// Build the model by walking a sequence of schemas. Sorted
182    /// output makes downstream codegen byte-identical across runs
183    /// (spec 12 § 1.2).
184    #[must_use]
185    pub fn from_schemas<'a, I>(iter: I) -> Self
186    where
187        I: IntoIterator<Item = &'a (dyn EventSchemaErased + 'static)>,
188    {
189        let mut events: Vec<_> = iter
190            .into_iter()
191            .map(ArrowEventSchema::from_erased)
192            .collect();
193        events.sort_by(|a, b| a.full_name.cmp(&b.full_name));
194        Self { events }
195    }
196
197    /// Number of registered event types.
198    #[must_use]
199    pub fn len(&self) -> usize {
200        self.events.len()
201    }
202
203    /// True if no events are registered.
204    #[must_use]
205    pub fn is_empty(&self) -> bool {
206        self.events.is_empty()
207    }
208
209    /// Look up one event's struct by `full_name`.
210    #[must_use]
211    pub fn lookup(&self, full_name: &str) -> Option<&ArrowEventSchema> {
212        self.events.iter().find(|e| e.full_name == full_name)
213    }
214
215    /// Emit the model as a stable JSON string. Used by
216    /// `obs migrate parquet` and as the snapshot format for CI diffing.
217    /// Output is deterministic.
218    ///
219    /// # Errors
220    ///
221    /// Returns `serde_json::Error` if serialization fails (cannot
222    /// happen in practice for our owned structures).
223    pub fn to_json(&self) -> Result<String, serde_json::Error> {
224        let v = self.to_serializable();
225        serde_json::to_string_pretty(&v)
226    }
227
228    fn to_serializable(&self) -> serde_json::Value {
229        use serde_json::{Value, json};
230        let envelope_columns: Vec<Value> = ENVELOPE_COLUMNS
231            .iter()
232            .map(|(name, ty)| json!({"name": name, "type": ty.as_str()}))
233            .collect();
234        let labels = json!({
235            "name": "labels",
236            "type": "map<dict_utf8, utf8>",
237        });
238        let attrs = json!({
239            "name": "attrs",
240            "type": "map<dict_utf8, utf8>",
241        });
242        let payload_proto = json!({
243            "name": "payload_proto",
244            "type": "binary",
245        });
246        let mut payloads = Vec::with_capacity(self.events.len());
247        for evt in &self.events {
248            let columns: Vec<Value> = evt
249                .fields
250                .iter()
251                .map(|f| {
252                    json!({
253                        "name": f.name,
254                        "tag": f.tag,
255                        "type": f.ty.as_str(),
256                        "kind": kind_str(f.kind),
257                        "cardinality": format!("{:?}", f.cardinality),
258                        "classification": format!("{:?}", f.classification),
259                    })
260                })
261                .collect();
262            payloads.push(json!({
263                "full_name": evt.full_name,
264                "payload_column": evt.payload_column,
265                "schema_hash": format!("{:#018x}", evt.schema_hash),
266                "fields": columns,
267            }));
268        }
269        json!({
270            "table": "obs_events",
271            "envelope": envelope_columns,
272            "labels": labels,
273            "attrs": attrs,
274            "payload_proto": payload_proto,
275            "events": payloads,
276        })
277    }
278}
279
280const fn kind_str(k: FieldKind) -> &'static str {
281    match k {
282        FieldKind::Label => "LABEL",
283        FieldKind::Attribute => "ATTRIBUTE",
284        FieldKind::Measurement => "MEASUREMENT",
285        FieldKind::TraceId => "TRACE_ID",
286        FieldKind::SpanId => "SPAN_ID",
287        FieldKind::ParentSpanId => "PARENT_SPAN_ID",
288        FieldKind::TimestampNs => "TIMESTAMP_NS",
289        FieldKind::DurationNs => "DURATION_NS",
290        FieldKind::Forensic => "FORENSIC",
291        _ => "ATTRIBUTE",
292    }
293}
294
/// Envelope columns emitted by every analytical sink. Spec 22 § 1.1
/// "Envelope" + "Resource" rows, intentionally short names matching
/// the ClickHouse template in spec 22 § 3.
///
/// Each entry is a `(column name, leaf type)` pair. The slice order is the
/// order in which the columns appear in the JSON schema snapshot, so
/// reordering or renaming entries changes that snapshot.
pub const ENVELOPE_COLUMNS: &[(&str, ArrowLeafType)] = &[
    ("ts_ns", ArrowLeafType::TimestampNs),
    ("full_name", ArrowLeafType::DictUtf8),
    ("schema_hash", ArrowLeafType::UInt64),
    ("tier", ArrowLeafType::DictUtf8),
    ("sev", ArrowLeafType::DictUtf8),
    ("trace_id", ArrowLeafType::Utf8),
    ("span_id", ArrowLeafType::Utf8),
    ("parent_span_id", ArrowLeafType::Utf8),
    ("service", ArrowLeafType::DictUtf8),
    ("instance", ArrowLeafType::DictUtf8),
    ("version", ArrowLeafType::DictUtf8),
    ("sampling_reason", ArrowLeafType::DictUtf8),
    ("callsite_id", ArrowLeafType::UInt64),
];
313
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_should_emit_deterministic_json() {
        let model = ArrowSchemaModel::default();
        let first = model.to_json().expect("json renders");
        let second = model.to_json().expect("json renders");
        // The snapshot is diffed in CI, so repeated renders must match
        // byte-for-byte.
        assert_eq!(first, second);
        assert!(first.contains("obs_events"));
        assert!(first.contains("ts_ns"));
    }

    #[test]
    fn test_default_model_should_be_empty() {
        let model = ArrowSchemaModel::default();
        assert!(model.is_empty());
        assert_eq!(model.len(), 0);
        assert!(model.lookup("obs.v1.Anything").is_none());
    }

    #[test]
    fn test_envelope_columns_should_include_callsite_id() {
        assert!(ENVELOPE_COLUMNS.iter().any(|(n, _)| *n == "callsite_id"));
    }

    #[test]
    fn test_envelope_column_names_should_be_unique() {
        let mut names: Vec<&str> = ENVELOPE_COLUMNS.iter().map(|(n, _)| *n).collect();
        names.sort_unstable();
        names.dedup();
        assert_eq!(names.len(), ENVELOPE_COLUMNS.len(), "duplicate envelope column name");
    }

    #[test]
    fn test_leaf_type_names_should_be_unique() {
        let all = [
            ArrowLeafType::Utf8,
            ArrowLeafType::DictUtf8,
            ArrowLeafType::Int64,
            ArrowLeafType::UInt64,
            ArrowLeafType::Float64,
            ArrowLeafType::Bool,
            ArrowLeafType::Binary,
            ArrowLeafType::TimestampNs,
        ];
        let mut names: Vec<&str> = all.iter().map(ArrowLeafType::as_str).collect();
        names.sort_unstable();
        names.dedup();
        assert_eq!(names.len(), all.len(), "two leaf types share a wire name");
    }
}