Skip to main content

obs_build/
options.rs

1//! Decoder for the `(obs.v1.event)` / `(obs.v1.field)` custom options
2//! stored as `__buffa_unknown_fields` on the buffa-generated
3//! `MessageOptions`/`FieldOptions`.
4//!
5//! The bytes follow the standard protobuf wire format:
6//!
7//! ```text
8//!   tag = (field_number << 3) | wire_type
9//!   field_number = 80001  (event extension)
10//!   field_number = 80002  (field extension)
11//!   wire_type    = 2      (LEN-delimited; submessage)
12//! ```
13//!
14//! 80001 << 3 | 2 = 640010 = varint `0x8A 0x88 0x27`
15//! 80002 << 3 | 2 = 640018 = varint `0x92 0x88 0x27`
16//!
17//! Inside the LEN payload we decode an `EventMeta` / `FieldMeta` whose
18//! shape is fixed (spec 12 § 2). See `docs/research/spike-buffa-reflect.md`
19//! for the validation memo.
20
21use obs_proto::obs::v1::{Cardinality, Classification, FieldKind, MetricKind, Severity, Tier};
22
/// Tag prefix for `(obs.v1.event)` (field 80001, wire type 2).
///
/// This is the varint encoding of `(80001 << 3) | 2 = 640010`, least
/// significant 7-bit group first.
const EVENT_TAG_BYTES: [u8; 3] = [0x8A, 0x88, 0x27];
/// Tag prefix for `(obs.v1.field)` (field 80002, wire type 2).
///
/// This is the varint encoding of `(80002 << 3) | 2 = 640018`, least
/// significant 7-bit group first.
const FIELD_TAG_BYTES: [u8; 3] = [0x92, 0x88, 0x27];
27
/// Decoded `(obs.v1.event)` payload.
///
/// Every field is `None` until the decoder sees the matching field in
/// the option payload; `Default` therefore yields an all-`None` value.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct EventOptions {
    /// Tier declared by the schema; default `Tier::Log` if absent.
    ///
    /// NOTE(review): the decoder itself leaves this `None` when the field
    /// is missing — the `Tier::Log` default is presumably applied by the
    /// consumer; confirm against the caller.
    pub tier: Option<Tier>,
    /// Default severity; default `Severity::Info` if absent.
    ///
    /// NOTE(review): as with `tier`, the decoder stores `None` when the
    /// field is missing; the default is presumably applied downstream.
    pub default_sev: Option<Severity>,
    /// Sibling full_name when this event participates in a
    /// Started/Completed pair (spec 93 P1-7).
    pub paired_with: Option<String>,
}
40
/// Decoded `(obs.v1.field)` payload.
///
/// Every field is `None` until the decoder sees the matching field in
/// the option payload.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct FieldOptions {
    /// Field role.
    pub kind: Option<FieldKind>,
    /// Cardinality cap.
    pub cardinality: Option<Cardinality>,
    /// PII / SECRET classification.
    pub classification: Option<Classification>,
    /// Metric spec when `kind = MEASUREMENT`.
    ///
    /// Populated from the nested length-delimited sub-message (field 4).
    pub metric: Option<MetricSpec>,
}
54
/// Decoded `(obs.v1.MetricSpec)`.
///
/// Carried as a nested sub-message inside the `(obs.v1.field)` option.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct MetricSpec {
    /// Counter / Gauge / Histogram.
    pub kind: Option<MetricKind>,
    /// UCUM unit (`"ms"`, `"By"`, `"1"`, …).
    pub unit: Option<String>,
    /// Histogram buckets.
    ///
    /// Decoded from little-endian doubles, either packed into one
    /// length-delimited field or sent as individual fixed64 values.
    pub bounds: Vec<f64>,
}
66
/// Errors returned by the option scanner.
///
/// Within this file only [`CodegenError::OptionDecode`] is constructed;
/// the remaining variants are presumably produced by other stages of the
/// build pipeline that share this error type.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum CodegenError {
    /// The protoc invocation failed.
    #[error("protoc failed: {0}")]
    Protoc(String),

    /// The descriptor set could not be read.
    #[error("descriptor set IO: {0}")]
    DescriptorIo(#[source] std::io::Error),

    /// The descriptor set could not be decoded.
    #[error("descriptor decode failed: {0}")]
    DescriptorDecode(String),

    /// The buffa-build invocation failed.
    #[error("buffa-build failed: {0}")]
    Buffa(String),

    /// The custom option bytes could not be decoded.
    #[error("option decode failed for `{path}`: {detail}")]
    OptionDecode {
        /// Fully qualified message/field path.
        path: String,
        /// Human-readable detail.
        detail: String,
    },

    /// IO error while writing generated files.
    #[error("output IO: {0}")]
    OutputIo(#[source] std::io::Error),
}
100
101/// Scan an `__buffa_unknown_fields` byte string for the
102/// `(obs.v1.event)` extension and return the decoded payload.
103///
104/// Returns `Ok(None)` if the option is absent; `Err` only when the
105/// bytes are malformed (truncated, invalid varint, etc.).
106///
107/// # Errors
108///
109/// Returns [`CodegenError::OptionDecode`] when the wire payload is
110/// truncated or contains an invalid sub-message.
111#[doc(hidden)]
112pub fn read_event_options(bytes: &[u8], path: &str) -> Result<Option<EventOptions>, CodegenError> {
113    let Some(payload) = find_tag_payload(bytes, &EVENT_TAG_BYTES) else {
114        return Ok(None);
115    };
116    let mut out = EventOptions::default();
117    walk_message(payload, |field, kind, value| {
118        match (field, kind) {
119            // tier (1, varint)
120            (1, WireKind::Varint) => {
121                if let Some(v) = value.varint() {
122                    out.tier = decode_tier(v as i32);
123                }
124            }
125            // default_sev (2, varint)
126            (2, WireKind::Varint) => {
127                if let Some(v) = value.varint() {
128                    out.default_sev = decode_severity(v as i32);
129                }
130            }
131            // paired_with (3, length-delimited string) — spec 93 P1-7.
132            (3, WireKind::Length) => {
133                if let Some(s) = value.length()
134                    && let Ok(s) = std::str::from_utf8(s)
135                {
136                    out.paired_with = Some(s.to_string());
137                }
138            }
139            _ => {}
140        }
141    })
142    .map_err(|detail| CodegenError::OptionDecode {
143        path: path.to_string(),
144        detail: detail.to_string(),
145    })?;
146    Ok(Some(out))
147}
148
149/// Scan an `__buffa_unknown_fields` byte string for the
150/// `(obs.v1.field)` extension. See [`read_event_options`].
151///
152/// # Errors
153///
154/// Returns [`CodegenError::OptionDecode`] when the wire payload is
155/// truncated or contains an invalid sub-message.
156#[doc(hidden)]
157pub fn read_field_options(bytes: &[u8], path: &str) -> Result<Option<FieldOptions>, CodegenError> {
158    let Some(payload) = find_tag_payload(bytes, &FIELD_TAG_BYTES) else {
159        return Ok(None);
160    };
161    let mut out = FieldOptions::default();
162    walk_message(payload, |field, kind, value| match (field, kind) {
163        (1, WireKind::Varint) => {
164            out.kind = value.varint().and_then(|v| decode_field_kind(v as i32))
165        }
166        (2, WireKind::Varint) => {
167            out.cardinality = value.varint().and_then(|v| decode_cardinality(v as i32))
168        }
169        (3, WireKind::Varint) => {
170            out.classification = value.varint().and_then(|v| decode_classification(v as i32))
171        }
172        (4, WireKind::Length) => {
173            if let Some(submsg) = value.length() {
174                let mut spec = MetricSpec::default();
175                let _ = walk_message(submsg, |sf, sk, sv| match (sf, sk) {
176                    (1, WireKind::Varint) => {
177                        spec.kind = sv.varint().and_then(|v| decode_metric_kind(v as i32))
178                    }
179                    (2, WireKind::Length) => {
180                        if let Some(s) = sv.length()
181                            && let Ok(s) = std::str::from_utf8(s)
182                        {
183                            spec.unit = Some(s.to_string());
184                        }
185                    }
186                    (3, WireKind::Length) => {
187                        // Packed repeated double — 8 bytes each.
188                        if let Some(s) = sv.length() {
189                            for chunk in s.chunks_exact(8) {
190                                if let Ok(arr) = <[u8; 8]>::try_from(chunk) {
191                                    spec.bounds.push(f64::from_le_bytes(arr));
192                                }
193                            }
194                        }
195                    }
196                    (3, WireKind::Fixed64) => {
197                        if let Some(b) = sv.fixed64() {
198                            spec.bounds.push(f64::from_le_bytes(b));
199                        }
200                    }
201                    _ => {}
202                });
203                out.metric = Some(spec);
204            }
205        }
206        _ => {}
207    })
208    .map_err(|detail| CodegenError::OptionDecode {
209        path: path.to_string(),
210        detail: detail.to_string(),
211    })?;
212    Ok(Some(out))
213}
214
215fn decode_tier(i: i32) -> Option<Tier> {
216    Some(match i {
217        1 => Tier::Log,
218        2 => Tier::Metric,
219        3 => Tier::Trace,
220        4 => Tier::Audit,
221        _ => Tier::Unspecified,
222    })
223}
224
225fn decode_severity(i: i32) -> Option<Severity> {
226    Some(match i {
227        1 => Severity::Trace,
228        2 => Severity::Debug,
229        3 => Severity::Info,
230        4 => Severity::Warn,
231        5 => Severity::Error,
232        6 => Severity::Fatal,
233        _ => Severity::Unspecified,
234    })
235}
236
237fn decode_field_kind(i: i32) -> Option<FieldKind> {
238    Some(match i {
239        1 => FieldKind::Label,
240        2 => FieldKind::Attribute,
241        3 => FieldKind::Measurement,
242        4 => FieldKind::TraceId,
243        5 => FieldKind::SpanId,
244        6 => FieldKind::ParentSpanId,
245        7 => FieldKind::TimestampNs,
246        8 => FieldKind::DurationNs,
247        9 => FieldKind::Forensic,
248        _ => FieldKind::Unspecified,
249    })
250}
251
252fn decode_cardinality(i: i32) -> Option<Cardinality> {
253    Some(match i {
254        1 => Cardinality::Low,
255        2 => Cardinality::Medium,
256        3 => Cardinality::High,
257        4 => Cardinality::Unbounded,
258        _ => Cardinality::Unspecified,
259    })
260}
261
262fn decode_classification(i: i32) -> Option<Classification> {
263    Some(match i {
264        1 => Classification::Internal,
265        2 => Classification::Pii,
266        3 => Classification::Secret,
267        _ => Classification::Unspecified,
268    })
269}
270
271fn decode_metric_kind(i: i32) -> Option<MetricKind> {
272    Some(match i {
273        1 => MetricKind::Counter,
274        2 => MetricKind::Gauge,
275        3 => MetricKind::Histogram,
276        _ => MetricKind::Unspecified,
277    })
278}
279
280// ─── Minimal protobuf wire-format scanner ──────────────────────────────
281
/// The protobuf wire types that `walk_message` hands to its visitor.
#[derive(Clone, Copy, PartialEq, Eq)]
enum WireKind {
    /// Wire type 0: a varint-encoded integer.
    Varint,
    /// Wire type 1: eight raw bytes.
    Fixed64,
    /// Wire type 2: a varint length followed by that many bytes.
    Length,
    /// Wire type 5: four raw bytes.
    Fixed32,
}
289
/// One decoded field value, borrowed from the scanned payload where the
/// wire type is length-delimited.
enum WireValue<'a> {
    /// Decoded varint value.
    Varint(u64),
    /// Raw bytes of a fixed64 field, in wire order.
    Fixed64([u8; 8]),
    /// Body of a length-delimited field.
    Length(&'a [u8]),
    /// Raw bytes of a fixed32 field, in wire order.
    #[allow(dead_code)] // emitted by walk_message; no current consumer
    Fixed32([u8; 4]),
}

impl<'a> WireValue<'a> {
    /// The varint value, or `None` for any other variant.
    fn varint(&self) -> Option<u64> {
        if let Self::Varint(number) = *self {
            Some(number)
        } else {
            None
        }
    }

    /// The fixed64 bytes, or `None` for any other variant.
    fn fixed64(&self) -> Option<[u8; 8]> {
        if let Self::Fixed64(raw) = *self {
            Some(raw)
        } else {
            None
        }
    }

    /// The length-delimited body, or `None` for any other variant.
    fn length(&self) -> Option<&'a [u8]> {
        if let Self::Length(body) = *self {
            Some(body)
        } else {
            None
        }
    }
}
318
/// Failure reported by the wire-format scanner; wraps a fixed
/// description of what went wrong.
#[derive(Debug)]
struct WireScanError(&'static str);

impl std::fmt::Display for WireScanError {
    /// Writes the wrapped description verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(reason) = self;
        f.write_str(reason)
    }
}
327
328/// Find the LEN payload of the first occurrence of a specific tag
329/// prefix in the byte stream. Used to locate the `(obs.v1.event)` or
330/// `(obs.v1.field)` extension payload.
331fn find_tag_payload<'a>(bytes: &'a [u8], tag: &[u8]) -> Option<&'a [u8]> {
332    let mut i = 0;
333    while i + tag.len() <= bytes.len() {
334        if &bytes[i..i + tag.len()] == tag {
335            // Tag matched; next is varint LEN.
336            let mut j = i + tag.len();
337            let (len, consumed) = read_varint(&bytes[j..]).ok()?;
338            j += consumed;
339            let start = j;
340            let end = start.checked_add(len as usize)?;
341            if end > bytes.len() {
342                return None;
343            }
344            return Some(&bytes[start..end]);
345        }
346        i += 1;
347    }
348    None
349}
350
351fn walk_message<F>(payload: &[u8], mut visit: F) -> Result<(), WireScanError>
352where
353    F: FnMut(u32, WireKind, WireValue<'_>),
354{
355    let mut i = 0;
356    while i < payload.len() {
357        let (tag, consumed) =
358            read_varint(&payload[i..]).map_err(|_| WireScanError("invalid tag varint"))?;
359        i += consumed;
360        let field = (tag >> 3) as u32;
361        let wire = tag & 0b111;
362        match wire {
363            0 => {
364                let (v, c) = read_varint(&payload[i..])
365                    .map_err(|_| WireScanError("invalid value varint"))?;
366                i += c;
367                visit(field, WireKind::Varint, WireValue::Varint(v));
368            }
369            1 => {
370                if i + 8 > payload.len() {
371                    return Err(WireScanError("truncated fixed64"));
372                }
373                let mut arr = [0u8; 8];
374                arr.copy_from_slice(&payload[i..i + 8]);
375                i += 8;
376                visit(field, WireKind::Fixed64, WireValue::Fixed64(arr));
377            }
378            2 => {
379                let (len, c) =
380                    read_varint(&payload[i..]).map_err(|_| WireScanError("invalid LEN varint"))?;
381                i += c;
382                let end = i
383                    .checked_add(len as usize)
384                    .ok_or(WireScanError("LEN overflow"))?;
385                if end > payload.len() {
386                    return Err(WireScanError("truncated LEN payload"));
387                }
388                visit(field, WireKind::Length, WireValue::Length(&payload[i..end]));
389                i = end;
390            }
391            5 => {
392                if i + 4 > payload.len() {
393                    return Err(WireScanError("truncated fixed32"));
394                }
395                let mut arr = [0u8; 4];
396                arr.copy_from_slice(&payload[i..i + 4]);
397                i += 4;
398                visit(field, WireKind::Fixed32, WireValue::Fixed32(arr));
399            }
400            _ => return Err(WireScanError("unknown wire type")),
401        }
402    }
403    Ok(())
404}
405
/// Decode one base-128 varint from the front of `bytes`.
///
/// Returns the decoded value and the number of bytes consumed
/// (1 to 10). Bytes after the varint are ignored.
///
/// # Errors
///
/// Returns a static description when `bytes` ends before the varint
/// terminates, when the varint is longer than 10 bytes, or when its
/// 10th byte carries bits that do not fit in a `u64`.
fn read_varint(bytes: &[u8]) -> Result<(u64, usize), &'static str> {
    /// A `u64` needs at most ceil(64 / 7) = 10 varint bytes.
    const MAX_LEN: usize = 10;

    let mut value: u64 = 0;
    for (idx, &byte) in bytes.iter().enumerate().take(MAX_LEN) {
        let group = u64::from(byte & 0x7f);
        // Nine groups cover 63 bits, so the 10th may contribute only the
        // top bit; anything larger would be shifted out and silently lost.
        if idx == MAX_LEN - 1 && group > 1 {
            return Err("varint overflows u64");
        }
        value |= group << (7 * idx);
        if byte & 0x80 == 0 {
            return Ok((value, idx + 1));
        }
    }
    if bytes.len() < MAX_LEN {
        Err("truncated varint")
    } else {
        Err("varint too long")
    }
}
418
// Unit tests driving the public readers with hand-assembled wire bytes.
#[cfg(test)]
mod tests {
    use super::*;

    // Event option with two varint fields behind the 3-byte extension tag.
    #[test]
    fn test_should_decode_event_options_from_spike_payload() {
        // Bytes captured from docs/research/spike-buffa-reflect.md:
        // 8a 88 27 04 08 01 10 03 → (obs.v1.event) = { tier: 1, default_sev: 3 }
        // Layout: tag (8a 88 27), LEN = 4, then `08 01` (field 1 varint = 1)
        // and `10 03` (field 2 varint = 3).
        let bytes = [0x8a, 0x88, 0x27, 0x04, 0x08, 0x01, 0x10, 0x03];
        let opts = read_event_options(&bytes, "test").unwrap().unwrap();
        assert_eq!(opts.tier, Some(Tier::Log));
        assert_eq!(opts.default_sev, Some(Severity::Info));
    }

    // Field option with three varint fields behind the 3-byte extension tag.
    #[test]
    fn test_should_decode_field_options_from_spike_payload() {
        // 92 88 27 06 08 02 10 03 18 02 → (obs.v1.field) = {
        //     kind: ATTRIBUTE(2), cardinality: HIGH(3), classification: PII(2)
        // }
        // Layout: tag (92 88 27), LEN = 6, then fields 1, 2 and 3 as varints.
        let bytes = [0x92, 0x88, 0x27, 0x06, 0x08, 0x02, 0x10, 0x03, 0x18, 0x02];
        let opts = read_field_options(&bytes, "test").unwrap().unwrap();
        assert_eq!(opts.kind, Some(FieldKind::Attribute));
        assert_eq!(opts.cardinality, Some(Cardinality::High));
        assert_eq!(opts.classification, Some(Classification::Pii));
    }

    // Input without the extension tag yields `Ok(None)`, not an error.
    #[test]
    fn test_should_return_none_when_tag_absent() {
        let bytes = [0x00, 0x01, 0x02];
        assert!(read_event_options(&bytes, "test").unwrap().is_none());
    }
}
450}