Skip to main content

reddb_wire/redwire/
builder.rs

//! Server-side frame construction discipline.
//!
//! Every server-emitted frame today is built by stitching together
//! [`Frame::new`] + [`Frame::with_stream`] + [`Frame::with_flags`] at
//! the call site. That spreads four invariants across every dispatch
//! path:
//!
//!   1. Correlation-id propagation — each response must echo the
//!      request frame's id (or `0` for unsolicited frames).
//!   2. `MORE_FRAMES` sequencing — only the *last* frame of a multi-
//!      frame reply may clear the flag.
//!   3. `MAX_FRAME_SIZE` enforcement — the codec checks on decode but
//!      a producer happily encodes oversized frames the peer will
//!      reject anyway.
//!   4. Compression policy — callers either opt in by setting
//!      `Flags::COMPRESSED` or do not, and the codec silently falls
//!      back to plaintext on incompressible input.
//!
//! [`FrameBuilder`] owns those invariants. The acceptance test for
//! this module is the deletion test: deleting `builder.rs` forces
//! frame-construction discipline back to inline `Frame::new` calls
//! at every dispatch site.
//!
//! ```ignore
//! use reddb_wire::redwire::{FrameBuilder, MessageKind};
//!
//! let frame = FrameBuilder::reply_to(request.correlation_id)
//!     .kind(MessageKind::Result)
//!     .payload(body)
//!     .stream_id(42)
//!     .more_frames(false)
//!     .build()?;
//! ```
//!
//! The builder is engine-free — it only depends on [`Frame`],
//! [`MessageKind`], [`Flags`], and the size constants from this
//! crate. Server dispatch (auth, session, listener) constructs
//! frames through the builder; the codec stays focused on bytes.
39
40use super::frame::{Flags, Frame, MessageKind, FRAME_HEADER_SIZE, MAX_FRAME_SIZE};
41
42/// Errors surfaced at `build()` time so they fail at construction
43/// rather than at encode time, where the call site has already lost
44/// the context to recover.
45///
46/// Distinct from [`crate::redwire::codec::FrameError`], which covers
47/// decode-side framing errors. The split keeps producer-side
48/// invariants (this) separate from consumer-side parsing (that).
49#[derive(Debug, Clone, PartialEq, Eq)]
50pub enum BuildError {
51    /// `kind()` was never called — a frame has no default kind so
52    /// the builder refuses to guess.
53    KindMissing,
54    /// The encoded frame would exceed [`MAX_FRAME_SIZE`].
55    PayloadTooLarge { encoded_len: usize, max: u32 },
56    /// Catalog cross-check failed: the requested flag set isn't in
57    /// `MessageKind::allowed_flags()` for the chosen kind.
58    FlagsNotAllowedForKind { kind: MessageKind, flags: u8 },
59}
60
61impl std::fmt::Display for BuildError {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        match self {
64            Self::KindMissing => write!(f, "FrameBuilder: kind() must be called before build()"),
65            Self::PayloadTooLarge { encoded_len, max } => write!(
66                f,
67                "FrameBuilder: encoded frame size {encoded_len} exceeds MAX_FRAME_SIZE ({max})"
68            ),
69            Self::FlagsNotAllowedForKind { kind, flags } => write!(
70                f,
71                "FrameBuilder: flag bits 0x{flags:02x} not allowed on kind {kind:?}"
72            ),
73        }
74    }
75}
76
77impl std::error::Error for BuildError {}
78
/// Whether the caller asked for compression via `compress(bool)`.
///
/// The builder never runs zstd itself — that stays in the codec. It
/// only remembers the request so that [`FrameBuilder::build`] can
/// decide whether the `COMPRESSED` flag should survive, dropping it
/// when the payload fails the builder's compressibility heuristic.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Compress {
    /// Compression was not requested (the default).
    No,
    /// Compression was requested with `compress(true)`.
    Yes,
}
89
90/// Builder for server-emitted [`Frame`]s.
91///
92/// Construct with [`FrameBuilder::reply_to`] for response frames
93/// (carries the request's correlation id) or
94/// [`FrameBuilder::unsolicited`] for server-initiated frames
95/// (correlation id `0`). All other fields are optional.
96#[derive(Debug, Clone)]
97pub struct FrameBuilder {
98    kind: Option<MessageKind>,
99    correlation_id: u64,
100    stream_id: u16,
101    payload: Vec<u8>,
102    flags: Flags,
103    compress: Compress,
104    /// `true` when the caller has *not* yet declared this is the
105    /// last frame of a multi-frame reply — i.e. `MORE_FRAMES` is
106    /// set. Defaults to `false` (single-frame reply).
107    more_frames: bool,
108}
109
110impl FrameBuilder {
111    /// Reply to a request frame. Echoes the caller's correlation id
112    /// so the client can pair the response with the request.
113    pub fn reply_to(correlation_id: u64) -> Self {
114        Self::with_correlation(correlation_id)
115    }
116
117    /// Server-initiated frame with no request to echo (correlation
118    /// id `0`). Used for notices, unsolicited Bye, etc.
119    pub fn unsolicited() -> Self {
120        Self::with_correlation(0)
121    }
122
123    fn with_correlation(correlation_id: u64) -> Self {
124        Self {
125            kind: None,
126            correlation_id,
127            stream_id: 0,
128            payload: Vec::new(),
129            flags: Flags::empty(),
130            compress: Compress::No,
131            more_frames: false,
132        }
133    }
134
135    pub fn kind(mut self, kind: MessageKind) -> Self {
136        self.kind = Some(kind);
137        self
138    }
139
140    pub fn payload(mut self, payload: Vec<u8>) -> Self {
141        self.payload = payload;
142        self
143    }
144
145    pub fn stream_id(mut self, stream_id: u16) -> Self {
146        self.stream_id = stream_id;
147        self
148    }
149
150    /// Replace the flag set wholesale. Most callers should prefer
151    /// [`Self::more_frames`] / [`Self::compress`] over poking flags
152    /// directly — this exists for the Cancel / Compress / Notice
153    /// control frames that carry caller-defined bits.
154    pub fn flags(mut self, flags: Flags) -> Self {
155        self.flags = flags;
156        self
157    }
158
159    /// Mark this frame as part of a multi-frame reply. Pass `false`
160    /// (the default) on the *last* frame of the burst — the
161    /// `MORE_FRAMES` last-frame invariant is enforced at build()
162    /// time by the flag bit.
163    pub fn more_frames(mut self, more: bool) -> Self {
164        self.more_frames = more;
165        self
166    }
167
168    /// Request that the encoder zstd-compress the payload. The
169    /// codec falls back to plaintext + cleared flag if the payload
170    /// is incompressible (see [`Self::build`]).
171    pub fn compress(mut self, yes: bool) -> Self {
172        self.compress = if yes { Compress::Yes } else { Compress::No };
173        self
174    }
175
176    /// Finalize the frame.
177    ///
178    /// Enforces:
179    ///   * `kind()` was set (otherwise [`BuildError::KindMissing`]).
180    ///   * Plaintext encoded size <= [`MAX_FRAME_SIZE`] (otherwise
181    ///     [`BuildError::PayloadTooLarge`]) — checked against the
182    ///     plaintext payload, since the wire form after compression
183    ///     can only shrink.
184    ///   * `MORE_FRAMES` flag mirrors the `more_frames(bool)` call.
185    ///   * `COMPRESSED` flag is set only when compression was
186    ///     requested *and* the payload looks compressible. A trivial
187    ///     incompressibility heuristic ("the payload is empty or
188    ///     too short for zstd to reduce") drops the flag here so
189    ///     the encoded bytes match the flag.
190    pub fn build(self) -> Result<Frame, BuildError> {
191        let kind = self.kind.ok_or(BuildError::KindMissing)?;
192        let encoded_len = FRAME_HEADER_SIZE + self.payload.len();
193        if encoded_len > MAX_FRAME_SIZE as usize {
194            return Err(BuildError::PayloadTooLarge {
195                encoded_len,
196                max: MAX_FRAME_SIZE,
197            });
198        }
199
200        let mut flags = self.flags;
201        if self.more_frames {
202            flags = flags.insert(Flags::MORE_FRAMES);
203        } else {
204            // Clear MORE_FRAMES on the last frame of a burst. Stays
205            // a no-op when the caller never set it.
206            flags = Flags::from_bits(flags.bits() & !Flags::MORE_FRAMES.bits());
207        }
208
209        let compressed = match self.compress {
210            Compress::No => false,
211            Compress::Yes => is_payload_compressible(&self.payload),
212        };
213        if compressed {
214            flags = flags.insert(Flags::COMPRESSED);
215        } else {
216            flags = Flags::from_bits(flags.bits() & !Flags::COMPRESSED.bits());
217        }
218
219        // Catalog cross-check: the chosen kind's allowed_flags() is
220        // the producer-side mirror of the codec's decode-time check.
221        // Catches misframed builds (e.g. COMPRESSED on Hello) before
222        // they reach the wire.
223        if !kind.permits_flags(flags) {
224            return Err(BuildError::FlagsNotAllowedForKind {
225                kind,
226                flags: flags.bits(),
227            });
228        }
229
230        Ok(Frame {
231            kind,
232            flags,
233            stream_id: self.stream_id,
234            correlation_id: self.correlation_id,
235            payload: self.payload,
236        })
237    }
238}
239
/// Size-only guess at whether compressing `payload` is worth flagging.
///
/// zstd adds a fixed frame-header overhead (roughly a dozen bytes), so
/// very short payloads cannot shrink. The cutoff used here is 32
/// bytes, which is more conservative than that overhead alone: only
/// payloads strictly longer than 32 bytes count as compressible. The
/// payload's contents are never inspected.
///
/// The codec has its own plaintext fallback at encode time; this check
/// just lets the builder report the cleared flag before encoding.
fn is_payload_compressible(payload: &[u8]) -> bool {
    /// Payloads of this length or shorter are treated as incompressible.
    const INCOMPRESSIBLE_UP_TO: usize = 32;

    let len = payload.len();
    len > INCOMPRESSIBLE_UP_TO
}
248
#[cfg(test)]
mod tests {
    use super::*;

    /// Common starting point for most tests: a `Result` reply with
    /// the given correlation id and payload.
    fn result_reply(correlation_id: u64, payload: Vec<u8>) -> FrameBuilder {
        FrameBuilder::reply_to(correlation_id)
            .kind(MessageKind::Result)
            .payload(payload)
    }

    #[test]
    fn reply_to_propagates_correlation_id() {
        // Same shape as a dispatch site: the response has to carry
        // the request's correlation id for the client to pair them.
        let built = result_reply(0xABCD, b"ok".to_vec()).build().expect("build");
        assert_eq!(built.correlation_id, 0xABCD);
        assert_eq!(built.kind, MessageKind::Result);
        assert_eq!(built.payload, b"ok");
    }

    #[test]
    fn unsolicited_uses_zero_correlation() {
        let notice = FrameBuilder::unsolicited()
            .kind(MessageKind::Notice)
            .payload(b"server-side notice".to_vec())
            .build()
            .expect("build");
        assert_eq!(notice.correlation_id, 0);
    }

    #[test]
    fn missing_kind_rejected() {
        let outcome = FrameBuilder::reply_to(1).build();
        assert_eq!(outcome.unwrap_err(), BuildError::KindMissing);
    }

    #[test]
    fn more_frames_last_frame_clears_the_flag() {
        // MORE_FRAMES last-frame invariant: intermediate frames of a
        // burst carry the flag, the final one must not — otherwise
        // the client keeps waiting for frames that never arrive.
        let middle = result_reply(7, vec![0; 8])
            .more_frames(true)
            .build()
            .expect("build middle");
        let last = result_reply(7, vec![0; 8])
            .more_frames(false)
            .build()
            .expect("build last");

        assert!(
            middle.flags.contains(Flags::MORE_FRAMES),
            "middle frame must carry MORE_FRAMES"
        );
        assert!(
            !last.flags.contains(Flags::MORE_FRAMES),
            "last frame must clear MORE_FRAMES"
        );
    }

    #[test]
    fn more_frames_default_is_last_frame() {
        // The common single-frame reply is implicitly the last
        // frame; callers should not need to clear the flag by hand.
        let pong = FrameBuilder::reply_to(1)
            .kind(MessageKind::Pong)
            .build()
            .expect("build");
        assert!(!pong.flags.contains(Flags::MORE_FRAMES));
    }

    #[test]
    fn payload_at_max_size_accepted() {
        let largest_payload = (MAX_FRAME_SIZE as usize) - FRAME_HEADER_SIZE;
        let frame = result_reply(1, vec![0u8; largest_payload])
            .build()
            .expect("build at limit");
        assert_eq!(frame.encoded_len(), MAX_FRAME_SIZE);
    }

    #[test]
    fn payload_over_max_size_rejected() {
        // MAX_FRAME_SIZE bounds the on-wire size. The builder rejects
        // oversize plaintext up front instead of letting the encoder
        // emit a frame the peer would refuse.
        let one_too_many = (MAX_FRAME_SIZE as usize) - FRAME_HEADER_SIZE + 1;
        let err = result_reply(1, vec![0u8; one_too_many])
            .build()
            .unwrap_err();
        match &err {
            BuildError::PayloadTooLarge { encoded_len, max } => {
                assert_eq!(*max, MAX_FRAME_SIZE);
                assert_eq!(*encoded_len, MAX_FRAME_SIZE as usize + 1);
            }
            other => panic!("expected PayloadTooLarge, got {other:?}"),
        }
    }

    #[test]
    fn compression_fallback_drops_flag_for_incompressible_payload() {
        // Compression is requested, but the payload is too short to
        // shrink. The builder must drop the flag; otherwise the wire
        // form would claim COMPRESSED over a plaintext body.
        let frame = result_reply(1, b"tiny".to_vec())
            .compress(true)
            .build()
            .expect("build");
        assert!(
            !frame.flags.contains(Flags::COMPRESSED),
            "incompressible payload must not carry COMPRESSED"
        );
    }

    #[test]
    fn compression_kept_for_compressible_payload() {
        let repetitive = b"abcabcabc".repeat(16);
        let frame = result_reply(1, repetitive)
            .compress(true)
            .build()
            .expect("build");
        assert!(frame.flags.contains(Flags::COMPRESSED));
    }

    #[test]
    fn flags_not_allowed_for_kind_rejected_at_build() {
        // Hello is a handshake kind and the catalog forbids
        // COMPRESSED on it. The builder mirrors the codec's
        // decode-time check so the misframed build fails before any
        // bytes reach the wire.
        let err = FrameBuilder::reply_to(1)
            .kind(MessageKind::Hello)
            .flags(Flags::COMPRESSED)
            .build()
            .unwrap_err();
        match &err {
            BuildError::FlagsNotAllowedForKind { kind, flags } => {
                assert_eq!(*kind, MessageKind::Hello);
                assert_eq!(*flags, Flags::COMPRESSED.bits());
            }
            other => panic!("expected FlagsNotAllowedForKind, got {other:?}"),
        }
    }

    #[test]
    fn stream_id_propagates() {
        let frame = FrameBuilder::reply_to(1)
            .kind(MessageKind::Result)
            .stream_id(0xBEEF)
            .build()
            .expect("build");
        assert_eq!(frame.stream_id, 0xBEEF);
    }
}