Skip to main content

obs_core/envelope/
builder.rs

//! Envelope construction helpers used by the emit hot path.
2
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use bytes::BytesMut;
6use obs_proto::{__private::Message, obs::v1::ObsEnvelope};
7use obs_types::{SamplingReason, Severity};
8
9use crate::{callsite::ObsCallsite, envelope::projection::EventSchema};
10
11thread_local! {
12    /// Per-thread reusable encode buffer. Cleared and reused every emit
13    /// so steady-state has no per-event allocation (spec 11 § 5).
14    static EMIT_BUF: std::cell::RefCell<BytesMut> = std::cell::RefCell::new(BytesMut::with_capacity(4096));
15}
16
17/// Newtype wrapper around the wire `ObsEnvelope` so tests and downstream
18/// code don't depend directly on the buffa-generated type's
19/// `Default + Clone` shape (which is private to `obs-proto`'s codegen
20/// boundary).
21#[derive(Debug, Clone, Default)]
22pub struct Envelope(pub ObsEnvelope);
23
24impl Envelope {
25    /// Borrow the inner envelope.
26    #[must_use]
27    pub fn inner(&self) -> &ObsEnvelope {
28        &self.0
29    }
30
31    /// Mutate the inner envelope.
32    pub fn inner_mut(&mut self) -> &mut ObsEnvelope {
33        &mut self.0
34    }
35
36    /// Take the inner envelope.
37    #[must_use]
38    pub fn into_inner(self) -> ObsEnvelope {
39        self.0
40    }
41}
42
43/// Build an envelope for the given event using its declared default
44/// severity. The payload is encoded into a thread-local scratch buffer
45/// and copied into the envelope's `payload` field; labels and lifted
46/// fields are still empty here — `project()` runs next on the emit path.
47///
48/// Hot path. Spec 11 § 4.1 step 3.
49#[must_use]
50pub fn build_envelope<E: EventSchema>(callsite: &ObsCallsite, event: &E) -> ObsEnvelope {
51    build_envelope_at::<E>(callsite, event, E::DEFAULT_SEV)
52}
53
54/// Like [`build_envelope`] but with a caller-specified severity (used
55/// by `emit_at(sev)`).
56#[must_use]
57pub fn build_envelope_at<E: EventSchema>(
58    callsite: &ObsCallsite,
59    event: &E,
60    sev: Severity,
61) -> ObsEnvelope {
62    let _ = callsite; // reserved: callsite metadata threading
63    let payload = EMIT_BUF.with(|cell| {
64        let mut buf = cell.borrow_mut();
65        buf.clear();
66        event.encode_payload(&mut buf);
67        buf.split().freeze().to_vec()
68    });
69
70    let ts_ns = SystemTime::now()
71        .duration_since(UNIX_EPOCH)
72        .map(|d| d.as_nanos() as u64)
73        .unwrap_or(0);
74
75    ObsEnvelope {
76        full_name: E::FULL_NAME.to_string(),
77        schema_hash: E::SCHEMA_HASH,
78        tier: ::buffa::EnumValue::Known(tier_to_proto(E::TIER)),
79        sev: ::buffa::EnumValue::Known(sev_to_proto(sev)),
80        ts_ns,
81        payload,
82        sampling_reason: ::buffa::EnumValue::Known(sampling_reason_to_proto(
83            SamplingReason::HeadRate,
84        )),
85        ..Default::default()
86    }
87}
88
89#[allow(non_snake_case, non_upper_case_globals)]
90fn tier_to_proto(t: obs_types::Tier) -> obs_proto::obs::v1::Tier {
91    use obs_proto::obs::v1::Tier as P;
92    match t {
93        obs_types::Tier::Log => P::TIER_LOG,
94        obs_types::Tier::Metric => P::TIER_METRIC,
95        obs_types::Tier::Trace => P::TIER_TRACE,
96        obs_types::Tier::Audit => P::TIER_AUDIT,
97        _ => P::TIER_UNSPECIFIED,
98    }
99}
100
101#[allow(non_snake_case, non_upper_case_globals)]
102fn sev_to_proto(s: obs_types::Severity) -> obs_proto::obs::v1::Severity {
103    use obs_proto::obs::v1::Severity as P;
104    match s {
105        obs_types::Severity::Trace => P::SEVERITY_TRACE,
106        obs_types::Severity::Debug => P::SEVERITY_DEBUG,
107        obs_types::Severity::Info => P::SEVERITY_INFO,
108        obs_types::Severity::Warn => P::SEVERITY_WARN,
109        obs_types::Severity::Error => P::SEVERITY_ERROR,
110        obs_types::Severity::Fatal => P::SEVERITY_FATAL,
111        _ => P::SEVERITY_UNSPECIFIED,
112    }
113}
114
115#[allow(non_snake_case, non_upper_case_globals)]
116fn sampling_reason_to_proto(r: SamplingReason) -> obs_proto::obs::v1::SamplingReason {
117    use obs_proto::obs::v1::SamplingReason as P;
118    match r {
119        SamplingReason::HeadRate => P::SAMPLING_REASON_HEAD_RATE,
120        SamplingReason::TailError => P::SAMPLING_REASON_TAIL_ERROR,
121        SamplingReason::Slow => P::SAMPLING_REASON_SLOW,
122        SamplingReason::Forensic => P::SAMPLING_REASON_FORENSIC,
123        SamplingReason::Audit => P::SAMPLING_REASON_AUDIT,
124        SamplingReason::Runtime => P::SAMPLING_REASON_RUNTIME,
125        SamplingReason::Override => P::SAMPLING_REASON_OVERRIDE,
126        _ => P::SAMPLING_REASON_UNSPECIFIED,
127    }
128}
129
130/// Encode an `ObsEnvelope` into a `Vec<u8>`. Convenience for tests and
131/// for sinks that ship raw envelope bytes (NDJSON, OTLP).
132#[must_use]
133#[allow(dead_code)] // re-emerges once Phase-3 NDJSON sink uses it
134pub fn encode_envelope(env: &ObsEnvelope) -> Vec<u8> {
135    let mut buf = Vec::with_capacity(env.encoded_len() as usize);
136    env.encode(&mut buf);
137    buf
138}