Skip to main content

obs_core/envelope/
builder.rs

1//! Envelope construction helpers used by the emit hot path.
2
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use bytes::BytesMut;
6use obs_proto::{
7    __private::Message,
8    obs::v1::{ObsEnvelope, SamplingReason, Severity},
9};
10
11use crate::{callsite::ObsCallsite, envelope::projection::EventSchema};
12
13thread_local! {
14    /// Per-thread reusable encode buffer. Cleared and reused every emit
15    /// so steady-state has no per-event allocation (spec 11 § 5).
16    static EMIT_BUF: std::cell::RefCell<BytesMut> = std::cell::RefCell::new(BytesMut::with_capacity(4096));
17}
18
19/// Newtype wrapper around the wire `ObsEnvelope` so tests and downstream
20/// code don't depend directly on the buffa-generated type's
21/// `Default + Clone` shape (which is private to `obs-proto`'s codegen
22/// boundary).
23#[derive(Debug, Clone, Default)]
24pub struct Envelope(pub ObsEnvelope);
25
26impl Envelope {
27    /// Borrow the inner envelope.
28    #[must_use]
29    pub fn inner(&self) -> &ObsEnvelope {
30        &self.0
31    }
32
33    /// Mutate the inner envelope.
34    pub fn inner_mut(&mut self) -> &mut ObsEnvelope {
35        &mut self.0
36    }
37
38    /// Take the inner envelope.
39    #[must_use]
40    pub fn into_inner(self) -> ObsEnvelope {
41        self.0
42    }
43}
44
45/// Build an envelope for the given event using its declared default
46/// severity. The payload is encoded into a thread-local scratch buffer
47/// and copied into the envelope's `payload` field; labels and lifted
48/// fields are still empty here — `project()` runs next on the emit path.
49///
50/// Hot path. Spec 11 § 4.1 step 3.
51#[must_use]
52pub fn build_envelope<E: EventSchema>(callsite: &ObsCallsite, event: &E) -> ObsEnvelope {
53    build_envelope_at::<E>(callsite, event, E::DEFAULT_SEV)
54}
55
56/// Like [`build_envelope`] but with a caller-specified severity (used
57/// by `emit_at(sev)`).
58#[must_use]
59pub fn build_envelope_at<E: EventSchema>(
60    callsite: &ObsCallsite,
61    event: &E,
62    sev: Severity,
63) -> ObsEnvelope {
64    let _ = callsite; // reserved: callsite metadata threading
65    let payload = EMIT_BUF.with(|cell| {
66        let mut buf = cell.borrow_mut();
67        buf.clear();
68        event.encode_payload(&mut buf);
69        buf.split().freeze().to_vec()
70    });
71
72    let ts_ns = SystemTime::now()
73        .duration_since(UNIX_EPOCH)
74        .map(|d| d.as_nanos() as u64)
75        .unwrap_or(0);
76
77    ObsEnvelope {
78        full_name: E::FULL_NAME.to_string(),
79        schema_hash: E::SCHEMA_HASH,
80        tier: ::buffa::EnumValue::Known(E::TIER),
81        sev: ::buffa::EnumValue::Known(sev),
82        ts_ns,
83        payload,
84        sampling_reason: ::buffa::EnumValue::Known(SamplingReason::HeadRate),
85        ..Default::default()
86    }
87}
88
89/// Encode an `ObsEnvelope` into a `Vec<u8>`. Convenience for tests and
90/// for sinks that ship raw envelope bytes (NDJSON, OTLP).
91#[must_use]
92#[allow(dead_code)] // re-emerges once Phase-3 NDJSON sink uses it
93pub fn encode_envelope(env: &ObsEnvelope) -> Vec<u8> {
94    let mut buf = Vec::with_capacity(env.encoded_len() as usize);
95    env.encode(&mut buf);
96    buf
97}