reddb_wire/redwire/builder.rs
1//! Server-side frame construction discipline.
2//!
3//! Every server-emitted frame today is built by stitching together
4//! [`Frame::new`] + [`Frame::with_stream`] + [`Frame::with_flags`] at
5//! the call site. That spreads four invariants across every dispatch
6//! path:
7//!
8//! 1. Correlation-id propagation — each response must echo the
9//! request frame's id (or `0` for unsolicited frames).
10//! 2. `MORE_FRAMES` sequencing — only the *last* frame of a multi-
11//! frame reply may clear the flag.
12//! 3. `MAX_FRAME_SIZE` enforcement — the codec checks on decode but
13//! a producer happily encodes oversized frames the peer will
14//! reject anyway.
15//! 4. Compression policy — callers either opt in by setting
16//! `Flags::COMPRESSED` or do not, and the codec silently falls
17//! back to plaintext on incompressible input.
18//!
19//! [`FrameBuilder`] owns those invariants. The acceptance test for
20//! this module is the deletion test: deleting `builder.rs` forces
21//! frame-construction discipline back to inline `Frame::new` calls
22//! at every dispatch site.
23//!
24//! ```ignore
25//! use reddb_wire::redwire::{FrameBuilder, MessageKind};
26//!
27//! let frame = FrameBuilder::reply_to(request.correlation_id)
28//! .kind(MessageKind::Result)
29//! .payload(body)
30//! .stream_id(42)
31//! .more_frames(false)
32//! .build()?;
33//! ```
34//!
35//! The builder is engine-free — it only depends on [`Frame`],
36//! [`MessageKind`], [`Flags`], and the size constants from this
37//! crate. Server dispatch (auth, session, listener) constructs
38//! frames through the builder; the codec stays focused on bytes.
39
40use super::frame::{Flags, Frame, MessageKind, FRAME_HEADER_SIZE, MAX_FRAME_SIZE};
41
42/// Errors surfaced at `build()` time so they fail at construction
43/// rather than at encode time, where the call site has already lost
44/// the context to recover.
45///
46/// Distinct from [`crate::redwire::codec::FrameError`], which covers
47/// decode-side framing errors. The split keeps producer-side
48/// invariants (this) separate from consumer-side parsing (that).
49#[derive(Debug, Clone, PartialEq, Eq)]
50pub enum BuildError {
51 /// `kind()` was never called — a frame has no default kind so
52 /// the builder refuses to guess.
53 KindMissing,
54 /// The encoded frame would exceed [`MAX_FRAME_SIZE`].
55 PayloadTooLarge { encoded_len: usize, max: u32 },
56 /// Catalog cross-check failed: the requested flag set isn't in
57 /// `MessageKind::allowed_flags()` for the chosen kind.
58 FlagsNotAllowedForKind { kind: MessageKind, flags: u8 },
59}
60
61impl std::fmt::Display for BuildError {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 match self {
64 Self::KindMissing => write!(f, "FrameBuilder: kind() must be called before build()"),
65 Self::PayloadTooLarge { encoded_len, max } => write!(
66 f,
67 "FrameBuilder: encoded frame size {encoded_len} exceeds MAX_FRAME_SIZE ({max})"
68 ),
69 Self::FlagsNotAllowedForKind { kind, flags } => write!(
70 f,
71 "FrameBuilder: flag bits 0x{flags:02x} not allowed on kind {kind:?}"
72 ),
73 }
74 }
75}
76
77impl std::error::Error for BuildError {}
78
/// Whether the caller asked for the payload to be compressed.
///
/// Set through [`FrameBuilder::compress`]. The builder never runs
/// zstd itself — that stays in the codec — it only remembers the
/// request so that [`FrameBuilder::build`] can decide whether the
/// `COMPRESSED` flag should end up on the finished frame.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Compress {
    /// Compression was not requested (the default).
    No,
    /// Compression was requested via `compress(true)`.
    Yes,
}
89
90/// Builder for server-emitted [`Frame`]s.
91///
92/// Construct with [`FrameBuilder::reply_to`] for response frames
93/// (carries the request's correlation id) or
94/// [`FrameBuilder::unsolicited`] for server-initiated frames
95/// (correlation id `0`). All other fields are optional.
96#[derive(Debug, Clone)]
97pub struct FrameBuilder {
98 kind: Option<MessageKind>,
99 correlation_id: u64,
100 stream_id: u16,
101 payload: Vec<u8>,
102 flags: Flags,
103 compress: Compress,
104 /// `true` when the caller has *not* yet declared this is the
105 /// last frame of a multi-frame reply — i.e. `MORE_FRAMES` is
106 /// set. Defaults to `false` (single-frame reply).
107 more_frames: bool,
108}
109
110impl FrameBuilder {
111 /// Reply to a request frame. Echoes the caller's correlation id
112 /// so the client can pair the response with the request.
113 pub fn reply_to(correlation_id: u64) -> Self {
114 Self::with_correlation(correlation_id)
115 }
116
117 /// Server-initiated frame with no request to echo (correlation
118 /// id `0`). Used for notices, unsolicited Bye, etc.
119 pub fn unsolicited() -> Self {
120 Self::with_correlation(0)
121 }
122
123 fn with_correlation(correlation_id: u64) -> Self {
124 Self {
125 kind: None,
126 correlation_id,
127 stream_id: 0,
128 payload: Vec::new(),
129 flags: Flags::empty(),
130 compress: Compress::No,
131 more_frames: false,
132 }
133 }
134
135 pub fn kind(mut self, kind: MessageKind) -> Self {
136 self.kind = Some(kind);
137 self
138 }
139
140 pub fn payload(mut self, payload: Vec<u8>) -> Self {
141 self.payload = payload;
142 self
143 }
144
145 pub fn stream_id(mut self, stream_id: u16) -> Self {
146 self.stream_id = stream_id;
147 self
148 }
149
150 /// Replace the flag set wholesale. Most callers should prefer
151 /// [`Self::more_frames`] / [`Self::compress`] over poking flags
152 /// directly — this exists for the Cancel / Compress / Notice
153 /// control frames that carry caller-defined bits.
154 pub fn flags(mut self, flags: Flags) -> Self {
155 self.flags = flags;
156 self
157 }
158
159 /// Mark this frame as part of a multi-frame reply. Pass `false`
160 /// (the default) on the *last* frame of the burst — the
161 /// `MORE_FRAMES` last-frame invariant is enforced at build()
162 /// time by the flag bit.
163 pub fn more_frames(mut self, more: bool) -> Self {
164 self.more_frames = more;
165 self
166 }
167
168 /// Request that the encoder zstd-compress the payload. The
169 /// codec falls back to plaintext + cleared flag if the payload
170 /// is incompressible (see [`Self::build`]).
171 pub fn compress(mut self, yes: bool) -> Self {
172 self.compress = if yes { Compress::Yes } else { Compress::No };
173 self
174 }
175
176 /// Finalize the frame.
177 ///
178 /// Enforces:
179 /// * `kind()` was set (otherwise [`BuildError::KindMissing`]).
180 /// * Plaintext encoded size <= [`MAX_FRAME_SIZE`] (otherwise
181 /// [`BuildError::PayloadTooLarge`]) — checked against the
182 /// plaintext payload, since the wire form after compression
183 /// can only shrink.
184 /// * `MORE_FRAMES` flag mirrors the `more_frames(bool)` call.
185 /// * `COMPRESSED` flag is set only when compression was
186 /// requested *and* the payload looks compressible. A trivial
187 /// incompressibility heuristic ("the payload is empty or
188 /// too short for zstd to reduce") drops the flag here so
189 /// the encoded bytes match the flag.
190 pub fn build(self) -> Result<Frame, BuildError> {
191 let kind = self.kind.ok_or(BuildError::KindMissing)?;
192 let encoded_len = FRAME_HEADER_SIZE + self.payload.len();
193 if encoded_len > MAX_FRAME_SIZE as usize {
194 return Err(BuildError::PayloadTooLarge {
195 encoded_len,
196 max: MAX_FRAME_SIZE,
197 });
198 }
199
200 let mut flags = self.flags;
201 if self.more_frames {
202 flags = flags.insert(Flags::MORE_FRAMES);
203 } else {
204 // Clear MORE_FRAMES on the last frame of a burst. Stays
205 // a no-op when the caller never set it.
206 flags = Flags::from_bits(flags.bits() & !Flags::MORE_FRAMES.bits());
207 }
208
209 let compressed = match self.compress {
210 Compress::No => false,
211 Compress::Yes => is_payload_compressible(&self.payload),
212 };
213 if compressed {
214 flags = flags.insert(Flags::COMPRESSED);
215 } else {
216 flags = Flags::from_bits(flags.bits() & !Flags::COMPRESSED.bits());
217 }
218
219 // Catalog cross-check: the chosen kind's allowed_flags() is
220 // the producer-side mirror of the codec's decode-time check.
221 // Catches misframed builds (e.g. COMPRESSED on Hello) before
222 // they reach the wire.
223 if !kind.permits_flags(flags) {
224 return Err(BuildError::FlagsNotAllowedForKind {
225 kind,
226 flags: flags.bits(),
227 });
228 }
229
230 Ok(Frame {
231 kind,
232 flags,
233 stream_id: self.stream_id,
234 correlation_id: self.correlation_id,
235 payload: self.payload,
236 })
237 }
238}
239
/// Size-only guess at whether compressing `payload` is worthwhile.
///
/// Returns `true` when the payload is strictly longer than 32
/// bytes. Shorter payloads (including the empty one) are treated as
/// too small to shrink once compression framing overhead is added,
/// so the builder can drop the `COMPRESSED` flag up front instead of
/// relying on the encoder's own fallback. The contents of the
/// payload are not inspected.
fn is_payload_compressible(payload: &[u8]) -> bool {
    // Largest payload length that is still considered not worth
    // compressing; anything longer passes the heuristic.
    const LARGEST_SKIPPED_LEN: usize = 32;
    LARGEST_SKIPPED_LEN < payload.len()
}
248
#[cfg(test)]
mod tests {
    use super::*;

    /// Common starting point: a `Result` reply to the request with
    /// the given correlation id.
    fn result_reply(correlation_id: u64) -> FrameBuilder {
        FrameBuilder::reply_to(correlation_id).kind(MessageKind::Result)
    }

    #[test]
    fn reply_to_propagates_correlation_id() {
        // Same shape as a dispatch site: the response has to echo
        // the request's correlation id for the client to match it.
        let built = result_reply(0xABCD)
            .payload(b"ok".to_vec())
            .build()
            .expect("build");

        assert_eq!(built.correlation_id, 0xABCD);
        assert_eq!(built.kind, MessageKind::Result);
        assert_eq!(built.payload, b"ok");
    }

    #[test]
    fn unsolicited_uses_zero_correlation() {
        let notice = FrameBuilder::unsolicited()
            .kind(MessageKind::Notice)
            .payload(b"server-side notice".to_vec())
            .build()
            .expect("build");

        assert_eq!(notice.correlation_id, 0);
    }

    #[test]
    fn missing_kind_rejected() {
        let outcome = FrameBuilder::reply_to(1).build();
        assert_eq!(outcome.unwrap_err(), BuildError::KindMissing);
    }

    #[test]
    fn more_frames_last_frame_clears_the_flag() {
        // Only intermediate frames of a burst may carry MORE_FRAMES;
        // if the final frame kept it the client would wait forever.
        let burst_frame = |more: bool| result_reply(7).payload(vec![0; 8]).more_frames(more);

        let middle = burst_frame(true).build().expect("build middle");
        assert!(
            middle.flags.contains(Flags::MORE_FRAMES),
            "middle frame must carry MORE_FRAMES"
        );

        let last = burst_frame(false).build().expect("build last");
        assert!(
            !last.flags.contains(Flags::MORE_FRAMES),
            "last frame must clear MORE_FRAMES"
        );
    }

    #[test]
    fn more_frames_default_is_last_frame() {
        // The common single-frame reply is implicitly "last": no
        // explicit more_frames(false) call should be necessary.
        let pong = FrameBuilder::reply_to(1)
            .kind(MessageKind::Pong)
            .build()
            .expect("build");

        assert!(!pong.flags.contains(Flags::MORE_FRAMES));
    }

    #[test]
    fn payload_at_max_size_accepted() {
        let largest_payload_len = (MAX_FRAME_SIZE as usize) - FRAME_HEADER_SIZE;
        let frame = result_reply(1)
            .payload(vec![0u8; largest_payload_len])
            .build()
            .expect("build at limit");

        assert_eq!(frame.encoded_len(), MAX_FRAME_SIZE);
    }

    #[test]
    fn payload_over_max_size_rejected() {
        // MAX_FRAME_SIZE bounds the on-wire size. One byte past the
        // limit must be refused by the builder rather than encoded
        // into a frame the peer would reject.
        let one_too_many = (MAX_FRAME_SIZE as usize) - FRAME_HEADER_SIZE + 1;
        let err = result_reply(1)
            .payload(vec![0u8; one_too_many])
            .build()
            .unwrap_err();

        match &err {
            BuildError::PayloadTooLarge { encoded_len, max } => {
                assert_eq!(*max, MAX_FRAME_SIZE);
                assert_eq!(*encoded_len, MAX_FRAME_SIZE as usize + 1);
            }
            _ => panic!("expected PayloadTooLarge, got {err:?}"),
        }
    }

    #[test]
    fn compression_fallback_drops_flag_for_incompressible_payload() {
        // Compression is requested on a payload too short to
        // shrink. The flag must be dropped so the frame does not
        // claim COMPRESSED over a plaintext body.
        let frame = result_reply(1)
            .payload(b"tiny".to_vec())
            .compress(true)
            .build()
            .expect("build");

        assert!(
            !frame.flags.contains(Flags::COMPRESSED),
            "incompressible payload must not carry COMPRESSED"
        );
    }

    #[test]
    fn compression_kept_for_compressible_payload() {
        let repetitive = b"abcabcabc".repeat(16);
        let frame = result_reply(1)
            .payload(repetitive)
            .compress(true)
            .build()
            .expect("build");

        assert!(frame.flags.contains(Flags::COMPRESSED));
    }

    #[test]
    fn flags_not_allowed_for_kind_rejected_at_build() {
        // Hello is a handshake kind and the catalog forbids
        // COMPRESSED on it. The builder mirrors the codec's
        // decode-time check so the mistake is caught before any
        // bytes reach the wire.
        let err = FrameBuilder::reply_to(1)
            .kind(MessageKind::Hello)
            .flags(Flags::COMPRESSED)
            .build()
            .unwrap_err();

        match &err {
            BuildError::FlagsNotAllowedForKind { kind, flags } => {
                assert_eq!(*kind, MessageKind::Hello);
                assert_eq!(*flags, Flags::COMPRESSED.bits());
            }
            _ => panic!("expected FlagsNotAllowedForKind, got {err:?}"),
        }
    }

    #[test]
    fn stream_id_propagates() {
        let frame = result_reply(1).stream_id(0xBEEF).build().expect("build");
        assert_eq!(frame.stream_id, 0xBEEF);
    }
}