obs_core/wire/envelope_codec.rs
1//! Length-prefixed envelope framing for stream transports.
2//!
3//! The wire record is:
4//!
5//! ```text
6//! [ u32 BE length ][ buffa-encoded ObsEnvelope ]
7//! ```
8//!
9//! No per-record CRC — stream transports (vsock, unix socket, TCP) are
10//! reliable byte-streams; buffa decode catches truncation. Any
11//! transport that needs integrity beyond TCP-level checksums can wrap
12//! the framed stream in its own MAC layer.
13//!
14//! Boundary-review § 3.5 (moved upstream from `tok-initd::obs::ObsEnvelopeCodec`).
15//!
16//! # Usage
17//!
18//! Reuse a single [`buffa::SizeCache`] across every envelope in one
19//! flush window — [`encode_into_with_cache`] calls `clear()` before
20//! each encode so the backing storage is amortised without leaking
21//! stale state between envelopes.
22//!
23//! ```no_run
24//! use obs_core::wire::envelope_codec;
25//! use obs_proto::obs::v1::ObsEnvelope;
26//!
27//! let env = ObsEnvelope::default();
28//! let mut buf = Vec::with_capacity(4096);
29//! let mut cache = buffa::SizeCache::new();
30//! envelope_codec::encode_into_with_cache(&env, &mut buf, &mut cache);
31//!
32//! // On the other side:
33//! if let Some((decoded, consumed)) =
34//! envelope_codec::decode_frame(&buf, 1 << 20).expect("framing ok")
35//! {
36//! assert_eq!(consumed, buf.len());
37//! let _ = decoded;
38//! }
39//! ```
40
41use std::io;
42
43use buffa::{Message, SizeCache};
44use obs_proto::obs::v1::ObsEnvelope;
45
46/// Encode `env` into `out`, length-prefixed. Reuses `out`'s capacity
47/// so caller can amortise allocations across envelopes within one
48/// flush window.
49///
50/// Internally builds a fresh `SizeCache` per call. Prefer
51/// [`encode_into_with_cache`] on batched paths so the cache amortises
52/// across envelopes in the same flush window.
53pub fn encode_into(env: &ObsEnvelope, out: &mut Vec<u8>) {
54 let mut cache = SizeCache::new();
55 encode_into_with_cache(env, out, &mut cache);
56}
57
58/// Encode `env` into `out` reusing a caller-owned [`SizeCache`].
59///
60/// The cache is cleared before each encode so subsequent calls see a
61/// fresh computation but do not reallocate the backing storage. Drop
62/// the cache when the flush window closes or simply keep it alive for
63/// the life of the writer task — neither mutates the envelope.
64pub fn encode_into_with_cache(env: &ObsEnvelope, out: &mut Vec<u8>, cache: &mut SizeCache) {
65 cache.clear();
66 let len = env.compute_size(cache);
67 out.reserve(4 + len as usize);
68 out.extend_from_slice(&len.to_be_bytes());
69 env.write_to(cache, out);
70}
71
72/// Decode one length-prefixed envelope from `buf`.
73///
74/// Returns `Ok(None)` when the buffer is too short to contain a full
75/// length + payload; otherwise returns the decoded envelope and the
76/// number of bytes consumed. Caller is responsible for draining
77/// consumed bytes.
78///
79/// `max_frame` bounds the declared length — anything larger is treated
80/// as a framing error (a well-behaved emitter never produces envelopes
81/// that big; crossing the limit usually means the stream is
82/// desynchronised).
83///
84/// # Errors
85///
86/// Returns `io::ErrorKind::InvalidData` when the declared length
87/// exceeds `max_frame` or when buffa decoding fails.
88pub fn decode_frame(buf: &[u8], max_frame: usize) -> io::Result<Option<(ObsEnvelope, usize)>> {
89 let Some(prefix) = buf.get(..4) else {
90 return Ok(None);
91 };
92 let Ok(prefix) = <[u8; 4]>::try_from(prefix) else {
93 // `buf.get(..4)` already guarantees the 4-byte slice; this
94 // branch is unreachable but keeps the `try_from` happy without
95 // an index or panic.
96 return Ok(None);
97 };
98 let len = u32::from_be_bytes(prefix) as usize;
99 if len > max_frame {
100 return Err(io::Error::new(
101 io::ErrorKind::InvalidData,
102 format!("obs envelope frame too large: {len} > {max_frame}"),
103 ));
104 }
105 let Some(payload) = buf.get(4..4 + len) else {
106 return Ok(None);
107 };
108 let env = ObsEnvelope::decode_from_slice(payload)
109 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
110 Ok(Some((env, 4 + len)))
111}
112
#[cfg(test)]
mod tests {
    use obs_proto::obs::v1::{Severity as PSeverity, Tier as PTier};

    use super::*;

    /// Frame limit for tests that are not exercising the limit itself.
    const MAX_FRAME: usize = 1 << 20;

    /// Builds an envelope with several non-default fields so the encoded
    /// payload is not empty.
    fn sample_env() -> ObsEnvelope {
        ObsEnvelope {
            full_name: "obs.test.EnvelopeCodec".to_string(),
            schema_hash: 0xdead_beef,
            tier: buffa::EnumValue::Known(PTier::TIER_LOG),
            sev: buffa::EnumValue::Known(PSeverity::SEVERITY_INFO),
            ts_ns: 42,
            service: "obs-core".to_string(),
            instance: "test".to_string(),
            version: "0.0.0".to_string(),
            ..Default::default()
        }
    }

    /// Frames `env` into a fresh buffer.
    fn framed(env: &ObsEnvelope) -> Vec<u8> {
        let mut buf = Vec::new();
        encode_into(env, &mut buf);
        buf
    }

    #[test]
    fn test_encode_decode_round_trips() {
        let env = sample_env();
        let buf = framed(&env);
        let (decoded, consumed) = decode_frame(&buf, MAX_FRAME).expect("ok").expect("some");
        assert_eq!(consumed, buf.len());
        assert_eq!(decoded.full_name, env.full_name);
        assert_eq!(decoded.schema_hash, env.schema_hash);
    }

    #[test]
    fn test_decode_frame_returns_none_on_short_buffer() {
        // Complete prefix declaring 4 payload bytes, but no payload at all.
        let got = decode_frame(&[0, 0, 0, 4], MAX_FRAME).expect("no err");
        assert!(got.is_none(), "incomplete buffer must return Ok(None)");
    }

    #[test]
    fn test_decode_frame_returns_none_on_incomplete_prefix() {
        // Zero to three bytes can never hold the 4-byte length prefix.
        let prefix = [0u8, 0, 0];
        for have in 0..=prefix.len() {
            let partial = prefix.get(..have).expect("in bounds");
            let got = decode_frame(partial, MAX_FRAME).expect("no err");
            assert!(got.is_none(), "{have}-byte prefix must return Ok(None)");
        }
    }

    #[test]
    fn test_decode_frame_returns_none_on_truncated_frame() {
        // A real frame missing its final byte is incomplete, not corrupt.
        let mut buf = framed(&sample_env());
        buf.pop();
        let got = decode_frame(&buf, MAX_FRAME).expect("no err");
        assert!(got.is_none(), "truncated frame must return Ok(None)");
    }

    #[test]
    fn test_decode_frame_rejects_oversize() {
        let mut buf = [0u8; 4];
        buf.copy_from_slice(&u32::MAX.to_be_bytes());
        let err = decode_frame(&buf, 1024).expect_err("must reject");
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    #[test]
    fn test_decode_frame_limit_is_inclusive() {
        // A payload of exactly `max_frame` bytes is accepted; one byte
        // less of headroom is a framing error.
        let buf = framed(&sample_env());
        let payload_len = buf.len() - 4;
        assert!(payload_len > 0, "sample envelope must encode to a non-empty payload");

        let got = decode_frame(&buf, payload_len).expect("at the limit is ok");
        assert!(got.is_some());

        let err = decode_frame(&buf, payload_len - 1).expect_err("over the limit");
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    #[test]
    fn test_decode_frame_consumes_one_frame_at_a_time() {
        let first = sample_env();
        let second = ObsEnvelope {
            full_name: "obs.test.Second".to_string(),
            ..sample_env()
        };

        let mut stream = framed(&first);
        let first_len = stream.len();
        encode_into(&second, &mut stream);

        let (head, used_head) = decode_frame(&stream, MAX_FRAME).expect("ok").expect("some");
        assert_eq!(used_head, first_len);
        assert_eq!(head.full_name, first.full_name);

        let rest = stream.get(used_head..).expect("remainder in bounds");
        let (tail, used_tail) = decode_frame(rest, MAX_FRAME).expect("ok").expect("some");
        assert_eq!(used_head + used_tail, stream.len());
        assert_eq!(tail.full_name, second.full_name);
    }

    #[test]
    fn test_encode_into_with_cache_amortises_across_envelopes() {
        // Two sequential encodes into the same cache must produce the
        // same bytes as two independent `encode_into` calls — i.e. reusing
        // the cache does not change the output of the second encode.
        let env = sample_env();
        let mut cache = SizeCache::new();
        let mut batched = Vec::new();
        encode_into_with_cache(&env, &mut batched, &mut cache);
        encode_into_with_cache(&env, &mut batched, &mut cache);

        let mut independent = Vec::new();
        encode_into(&env, &mut independent);
        encode_into(&env, &mut independent);

        assert_eq!(batched, independent);
    }
}