Skip to main content

obs_core/wire/
envelope_codec.rs

1//! Length-prefixed envelope framing for stream transports.
2//!
3//! The wire record is:
4//!
5//! ```text
6//!   [ u32 BE length ][ buffa-encoded ObsEnvelope ]
7//! ```
8//!
9//! No per-record CRC — stream transports (vsock, unix socket, TCP) are
10//! reliable byte-streams; buffa decode catches truncation. Any
11//! transport that needs integrity beyond TCP-level checksums can wrap
12//! the framed stream in its own MAC layer.
13//!
14//! Boundary-review § 3.5 (moved upstream from `tok-initd::obs::ObsEnvelopeCodec`).
15//!
16//! # Usage
17//!
18//! Reuse a single [`buffa::SizeCache`] across every envelope in one
19//! flush window — [`encode_into_with_cache`] calls `clear()` before
20//! each encode so the backing storage is amortised without leaking
21//! stale state between envelopes.
22//!
23//! ```no_run
24//! use obs_core::wire::envelope_codec;
25//! use obs_proto::obs::v1::ObsEnvelope;
26//!
27//! let env = ObsEnvelope::default();
28//! let mut buf = Vec::with_capacity(4096);
29//! let mut cache = buffa::SizeCache::new();
30//! envelope_codec::encode_into_with_cache(&env, &mut buf, &mut cache);
31//!
32//! // On the other side:
33//! if let Some((decoded, consumed)) =
34//!     envelope_codec::decode_frame(&buf, 1 << 20).expect("framing ok")
35//! {
36//!     assert_eq!(consumed, buf.len());
37//!     let _ = decoded;
38//! }
39//! ```
40
41use std::io;
42
43use buffa::{Message, SizeCache};
44use obs_proto::obs::v1::ObsEnvelope;
45
46/// Encode `env` into `out`, length-prefixed. Reuses `out`'s capacity
47/// so caller can amortise allocations across envelopes within one
48/// flush window.
49///
50/// Internally builds a fresh `SizeCache` per call. Prefer
51/// [`encode_into_with_cache`] on batched paths so the cache amortises
52/// across envelopes in the same flush window.
53pub fn encode_into(env: &ObsEnvelope, out: &mut Vec<u8>) {
54    let mut cache = SizeCache::new();
55    encode_into_with_cache(env, out, &mut cache);
56}
57
58/// Encode `env` into `out` reusing a caller-owned [`SizeCache`].
59///
60/// The cache is cleared before each encode so subsequent calls see a
61/// fresh computation but do not reallocate the backing storage. Drop
62/// the cache when the flush window closes or simply keep it alive for
63/// the life of the writer task — neither mutates the envelope.
64pub fn encode_into_with_cache(env: &ObsEnvelope, out: &mut Vec<u8>, cache: &mut SizeCache) {
65    cache.clear();
66    let len = env.compute_size(cache);
67    out.reserve(4 + len as usize);
68    out.extend_from_slice(&len.to_be_bytes());
69    env.write_to(cache, out);
70}
71
72/// Decode one length-prefixed envelope from `buf`.
73///
74/// Returns `Ok(None)` when the buffer is too short to contain a full
75/// length + payload; otherwise returns the decoded envelope and the
76/// number of bytes consumed. Caller is responsible for draining
77/// consumed bytes.
78///
79/// `max_frame` bounds the declared length — anything larger is treated
80/// as a framing error (a well-behaved emitter never produces envelopes
81/// that big; crossing the limit usually means the stream is
82/// desynchronised).
83///
84/// # Errors
85///
86/// Returns `io::ErrorKind::InvalidData` when the declared length
87/// exceeds `max_frame` or when buffa decoding fails.
88pub fn decode_frame(buf: &[u8], max_frame: usize) -> io::Result<Option<(ObsEnvelope, usize)>> {
89    let Some(prefix) = buf.get(..4) else {
90        return Ok(None);
91    };
92    let Ok(prefix) = <[u8; 4]>::try_from(prefix) else {
93        // `buf.get(..4)` already guarantees the 4-byte slice; this
94        // branch is unreachable but keeps the `try_from` happy without
95        // an index or panic.
96        return Ok(None);
97    };
98    let len = u32::from_be_bytes(prefix) as usize;
99    if len > max_frame {
100        return Err(io::Error::new(
101            io::ErrorKind::InvalidData,
102            format!("obs envelope frame too large: {len} > {max_frame}"),
103        ));
104    }
105    let Some(payload) = buf.get(4..4 + len) else {
106        return Ok(None);
107    };
108    let env = ObsEnvelope::decode_from_slice(payload)
109        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
110    Ok(Some((env, 4 + len)))
111}
112
113#[cfg(test)]
114mod tests {
115    use obs_proto::obs::v1::{Severity as PSeverity, Tier as PTier};
116
117    use super::*;
118
119    fn sample_env() -> ObsEnvelope {
120        ObsEnvelope {
121            full_name: "obs.test.EnvelopeCodec".to_string(),
122            schema_hash: 0xdead_beef,
123            tier: buffa::EnumValue::Known(PTier::TIER_LOG),
124            sev: buffa::EnumValue::Known(PSeverity::SEVERITY_INFO),
125            ts_ns: 42,
126            service: "obs-core".to_string(),
127            instance: "test".to_string(),
128            version: "0.0.0".to_string(),
129            ..Default::default()
130        }
131    }
132
133    #[test]
134    fn test_encode_decode_round_trips() {
135        let env = sample_env();
136        let mut buf = Vec::new();
137        encode_into(&env, &mut buf);
138        let (decoded, consumed) = decode_frame(&buf, 1 << 20).expect("ok").expect("some");
139        assert_eq!(consumed, buf.len());
140        assert_eq!(decoded.full_name, env.full_name);
141        assert_eq!(decoded.schema_hash, env.schema_hash);
142    }
143
144    #[test]
145    fn test_decode_frame_returns_none_on_short_buffer() {
146        let got = decode_frame(&[0, 0, 0, 4], 1 << 20).expect("no err");
147        assert!(got.is_none(), "incomplete buffer must return Ok(None)");
148    }
149
150    #[test]
151    fn test_decode_frame_rejects_oversize() {
152        let mut buf = [0u8; 4];
153        buf.copy_from_slice(&u32::MAX.to_be_bytes());
154        let err = decode_frame(&buf, 1024).expect_err("must reject");
155        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
156    }
157
158    #[test]
159    fn test_encode_into_with_cache_amortises_across_envelopes() {
160        // Two sequential encodes into the same cache must produce the
161        // same bytes as two independent `encode_into` calls — proves
162        // `clear()` happens on the second pass.
163        let env = sample_env();
164        let mut cache = SizeCache::new();
165        let mut batched = Vec::new();
166        encode_into_with_cache(&env, &mut batched, &mut cache);
167        encode_into_with_cache(&env, &mut batched, &mut cache);
168
169        let mut independent = Vec::new();
170        encode_into(&env, &mut independent);
171        encode_into(&env, &mut independent);
172
173        assert_eq!(batched, independent);
174    }
175}