Skip to main content

reddb_wire/redwire/
builder.rs

1//! Server-side frame construction discipline.
2//!
3//! Every server-emitted frame today is built by stitching together
4//! [`Frame::new`] + [`Frame::with_stream`] + [`Frame::with_flags`] at
5//! the call site. That spreads four invariants across every dispatch
6//! path:
7//!
8//!   1. Correlation-id propagation — each response must echo the
9//!      request frame's id (or `0` for unsolicited frames).
10//!   2. `MORE_FRAMES` sequencing — only the *last* frame of a multi-
11//!      frame reply may clear the flag.
12//!   3. `MAX_FRAME_SIZE` enforcement — the codec checks on decode but
13//!      a producer happily encodes oversized frames the peer will
14//!      reject anyway.
15//!   4. Compression policy — callers either opt in by setting
16//!      `Flags::COMPRESSED` or do not, and the codec silently falls
17//!      back to plaintext on incompressible input.
18//!
19//! [`FrameBuilder`] owns those invariants. The acceptance test for
20//! this module is the deletion test: deleting `builder.rs` forces
21//! frame-construction discipline back to inline `Frame::new` calls
22//! at every dispatch site.
23//!
24//! ```ignore
25//! use reddb_wire::redwire::{FrameBuilder, MessageKind};
26//!
27//! let frame = FrameBuilder::reply_to(request.correlation_id)
28//!     .kind(MessageKind::Result)
29//!     .payload(body)
30//!     .stream_id(42)
31//!     .more_frames(false)
32//!     .build()?;
33//! ```
34//!
35//! The builder is engine-free — it only depends on [`Frame`],
36//! [`MessageKind`], [`Flags`], and the size constants from this
37//! crate. Server dispatch (auth, session, listener) constructs
38//! frames through the builder; the codec stays focused on bytes.
39
40use super::frame::{Flags, Frame, MessageKind, FRAME_HEADER_SIZE, MAX_FRAME_SIZE};
41
42/// Errors surfaced at `build()` time so they fail at construction
43/// rather than at encode time, where the call site has already lost
44/// the context to recover.
45///
46/// Distinct from [`crate::redwire::codec::FrameError`], which covers
47/// decode-side framing errors. The split keeps producer-side
48/// invariants (this) separate from consumer-side parsing (that).
49#[derive(Debug, Clone, PartialEq, Eq)]
50pub enum BuildError {
51    /// `kind()` was never called — a frame has no default kind so
52    /// the builder refuses to guess.
53    KindMissing,
54    /// The encoded frame would exceed [`MAX_FRAME_SIZE`].
55    PayloadTooLarge { encoded_len: usize, max: u32 },
56    /// Catalog cross-check failed: the requested flag set isn't in
57    /// `MessageKind::allowed_flags()` for the chosen kind.
58    FlagsNotAllowedForKind { kind: MessageKind, flags: u8 },
59}
60
61impl std::fmt::Display for BuildError {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        match self {
64            Self::KindMissing => write!(f, "FrameBuilder: kind() must be called before build()"),
65            Self::PayloadTooLarge { encoded_len, max } => write!(
66                f,
67                "FrameBuilder: encoded frame size {encoded_len} exceeds MAX_FRAME_SIZE ({max})"
68            ),
69            Self::FlagsNotAllowedForKind { kind, flags } => write!(
70                f,
71                "FrameBuilder: flag bits 0x{flags:02x} not allowed on kind {kind:?}"
72            ),
73        }
74    }
75}
76
77impl std::error::Error for BuildError {}
78
/// Whether the caller asked for the payload to be compressed, as
/// recorded by `compress(true|false)`.
///
/// The builder never compresses anything itself — the zstd call is
/// left to the codec. It only remembers the request so that
/// [`FrameBuilder::build`] can decide whether the `COMPRESSED` flag
/// should be set or dropped.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Compress {
    /// Compression was not requested (the default).
    No,
    /// Compression was requested by the caller.
    Yes,
}
89
90/// Builder for RedWire [`Frame`]s.
91///
92/// Construct with [`FrameBuilder::request`] for client requests,
93/// [`FrameBuilder::reply_to`] for response frames (carries the
94/// request's correlation id), or [`FrameBuilder::unsolicited`] for
95/// server-initiated frames (correlation id `0`). All other fields are
96/// optional.
97#[derive(Debug, Clone)]
98pub struct FrameBuilder {
99    kind: Option<MessageKind>,
100    correlation_id: u64,
101    stream_id: u16,
102    payload: Vec<u8>,
103    flags: Flags,
104    compress: Compress,
105    /// `true` when the caller has *not* yet declared this is the
106    /// last frame of a multi-frame reply — i.e. `MORE_FRAMES` is
107    /// set. Defaults to `false` (single-frame reply).
108    more_frames: bool,
109}
110
111impl FrameBuilder {
112    /// Client request frame. Carries the caller's new correlation id
113    /// so the server can echo it in its reply.
114    pub fn request(correlation_id: u64) -> Self {
115        Self::with_correlation(correlation_id)
116    }
117
118    /// Reply to a request frame. Echoes the caller's correlation id
119    /// so the client can pair the response with the request.
120    pub fn reply_to(correlation_id: u64) -> Self {
121        Self::with_correlation(correlation_id)
122    }
123
124    /// Server-initiated frame with no request to echo (correlation
125    /// id `0`). Used for notices, unsolicited Bye, etc.
126    pub fn unsolicited() -> Self {
127        Self::with_correlation(0)
128    }
129
130    fn with_correlation(correlation_id: u64) -> Self {
131        Self {
132            kind: None,
133            correlation_id,
134            stream_id: 0,
135            payload: Vec::new(),
136            flags: Flags::empty(),
137            compress: Compress::No,
138            more_frames: false,
139        }
140    }
141
142    pub fn kind(mut self, kind: MessageKind) -> Self {
143        self.kind = Some(kind);
144        self
145    }
146
147    pub fn payload(mut self, payload: Vec<u8>) -> Self {
148        self.payload = payload;
149        self
150    }
151
152    pub fn stream_id(mut self, stream_id: u16) -> Self {
153        self.stream_id = stream_id;
154        self
155    }
156
157    /// Replace the flag set wholesale. Most callers should prefer
158    /// [`Self::more_frames`] / [`Self::compress`] over poking flags
159    /// directly — this exists for the Cancel / Compress / Notice
160    /// control frames that carry caller-defined bits.
161    pub fn flags(mut self, flags: Flags) -> Self {
162        self.flags = flags;
163        self
164    }
165
166    /// Mark this frame as part of a multi-frame reply. Pass `false`
167    /// (the default) on the *last* frame of the burst — the
168    /// `MORE_FRAMES` last-frame invariant is enforced at build()
169    /// time by the flag bit.
170    pub fn more_frames(mut self, more: bool) -> Self {
171        self.more_frames = more;
172        self
173    }
174
175    /// Request that the encoder zstd-compress the payload. The
176    /// codec falls back to plaintext + cleared flag if the payload
177    /// is incompressible (see [`Self::build`]).
178    pub fn compress(mut self, yes: bool) -> Self {
179        self.compress = if yes { Compress::Yes } else { Compress::No };
180        self
181    }
182
183    /// Finalize the frame.
184    ///
185    /// Enforces:
186    ///   * `kind()` was set (otherwise [`BuildError::KindMissing`]).
187    ///   * Plaintext encoded size <= [`MAX_FRAME_SIZE`] (otherwise
188    ///     [`BuildError::PayloadTooLarge`]) — checked against the
189    ///     plaintext payload, since the wire form after compression
190    ///     can only shrink.
191    ///   * `MORE_FRAMES` flag mirrors the `more_frames(bool)` call.
192    ///   * `COMPRESSED` flag is set only when compression was
193    ///     requested *and* the payload looks compressible. A trivial
194    ///     incompressibility heuristic ("the payload is empty or
195    ///     too short for zstd to reduce") drops the flag here so
196    ///     the encoded bytes match the flag.
197    pub fn build(self) -> Result<Frame, BuildError> {
198        let kind = self.kind.ok_or(BuildError::KindMissing)?;
199        let encoded_len = FRAME_HEADER_SIZE + self.payload.len();
200        if encoded_len > MAX_FRAME_SIZE as usize {
201            return Err(BuildError::PayloadTooLarge {
202                encoded_len,
203                max: MAX_FRAME_SIZE,
204            });
205        }
206
207        if !kind.permits_flags(self.flags) {
208            return Err(BuildError::FlagsNotAllowedForKind {
209                kind,
210                flags: self.flags.bits(),
211            });
212        }
213
214        let mut flags = self.flags;
215        if self.more_frames {
216            flags = flags.insert(Flags::MORE_FRAMES);
217        } else {
218            // Clear MORE_FRAMES on the last frame of a burst. Stays
219            // a no-op when the caller never set it.
220            flags = Flags::from_bits(flags.bits() & !Flags::MORE_FRAMES.bits());
221        }
222
223        let compressed = match self.compress {
224            Compress::No => false,
225            Compress::Yes => is_payload_compressible(&self.payload),
226        };
227        if compressed {
228            flags = flags.insert(Flags::COMPRESSED);
229        } else {
230            flags = Flags::from_bits(flags.bits() & !Flags::COMPRESSED.bits());
231        }
232
233        // Catalog cross-check: the chosen kind's allowed_flags() is
234        // the producer-side mirror of the codec's decode-time check.
235        // Catches misframed builds (e.g. COMPRESSED on Hello) before
236        // they reach the wire.
237        if !kind.permits_flags(flags) {
238            return Err(BuildError::FlagsNotAllowedForKind {
239                kind,
240                flags: flags.bits(),
241            });
242        }
243
244        Ok(Frame {
245            kind,
246            flags,
247            stream_id: self.stream_id,
248            correlation_id: self.correlation_id,
249            payload: self.payload,
250        })
251    }
252}
253
254pub fn build_reply_frame(
255    correlation_id: u64,
256    kind: MessageKind,
257    payload: Vec<u8>,
258) -> Result<Frame, BuildError> {
259    FrameBuilder::reply_to(correlation_id)
260        .kind(kind)
261        .payload(payload)
262        .build()
263}
264
265pub fn build_error_frame(correlation_id: u64, message: &str) -> Result<Frame, BuildError> {
266    build_reply_frame(
267        correlation_id,
268        MessageKind::Error,
269        message.as_bytes().to_vec(),
270    )
271}
272
273pub fn build_error_frame_lossy(correlation_id: u64, message: &str) -> Frame {
274    build_error_frame(correlation_id, message).unwrap_or_else(|_| {
275        Frame::new(
276            MessageKind::Error,
277            correlation_id,
278            b"redwire error frame too large".to_vec(),
279        )
280    })
281}
282
283pub fn build_dispatch_reply_frame(
284    correlation_id: u64,
285    kind: MessageKind,
286    payload: Vec<u8>,
287) -> Frame {
288    build_reply_frame(correlation_id, kind, payload)
289        .unwrap_or_else(|err| build_error_frame_lossy(correlation_id, &err.to_string()))
290}
291
292/// Adapt an older 5-byte handler envelope into a RedWire reply frame.
293///
294/// Some in-process fast paths return `[u32 length][u8 kind][body...]`.
295/// RedWire uses the same message-kind discriminator for `kind`, but owns the
296/// outer frame header. This helper keeps that compatibility bridge in the wire
297/// crate instead of letting runtime code parse kind bytes directly.
298pub fn rewrap_length_prefixed_handler_response(raw_bytes: &[u8], correlation_id: u64) -> Frame {
299    if raw_bytes.len() < 5 {
300        return build_error_frame_lossy(
301            correlation_id,
302            "fast-path handler returned a truncated frame",
303        );
304    }
305    let kind = MessageKind::from_u8(raw_bytes[4]).unwrap_or(MessageKind::Error);
306    build_dispatch_reply_frame(correlation_id, kind, raw_bytes[5..].to_vec())
307}
308
309pub fn build_query_frame(correlation_id: u64, sql: &str) -> Result<Frame, BuildError> {
310    build_request_frame(correlation_id, MessageKind::Query, sql.as_bytes().to_vec())
311}
312
313pub fn build_query_with_params_frame(
314    correlation_id: u64,
315    payload: Vec<u8>,
316) -> Result<Frame, BuildError> {
317    build_request_frame(correlation_id, MessageKind::QueryWithParams, payload)
318}
319
320pub fn build_bulk_insert_frame(correlation_id: u64, payload: Vec<u8>) -> Result<Frame, BuildError> {
321    build_request_frame(correlation_id, MessageKind::BulkInsert, payload)
322}
323
324pub fn build_get_frame(correlation_id: u64, payload: Vec<u8>) -> Result<Frame, BuildError> {
325    build_request_frame(correlation_id, MessageKind::Get, payload)
326}
327
328pub fn build_delete_frame(correlation_id: u64, payload: Vec<u8>) -> Result<Frame, BuildError> {
329    build_request_frame(correlation_id, MessageKind::Delete, payload)
330}
331
332pub fn build_bulk_insert_binary_frame(
333    correlation_id: u64,
334    payload: Vec<u8>,
335) -> Result<Frame, BuildError> {
336    build_request_frame(correlation_id, MessageKind::BulkInsertBinary, payload)
337}
338
339pub fn build_ping_frame(correlation_id: u64) -> Result<Frame, BuildError> {
340    build_request_frame(correlation_id, MessageKind::Ping, Vec::new())
341}
342
343pub fn build_bye_frame(correlation_id: u64) -> Result<Frame, BuildError> {
344    build_request_frame(correlation_id, MessageKind::Bye, Vec::new())
345}
346
347pub fn build_request_frame(
348    correlation_id: u64,
349    kind: MessageKind,
350    payload: Vec<u8>,
351) -> Result<Frame, BuildError> {
352    FrameBuilder::request(correlation_id)
353        .kind(kind)
354        .payload(payload)
355        .build()
356}
357
/// Cheap size-only heuristic for "could compression shrink this?".
///
/// A payload has to clear zstd's own framing overhead (roughly a
/// dozen bytes) before it can shrink at all, so anything of 32 bytes
/// or fewer is treated as incompressible. Payload contents are never
/// inspected. The codec still catches pathological inputs at encode
/// time and falls back to plaintext; this check only lets the builder
/// report the cleared flag before encoding.
fn is_payload_compressible(payload: &[u8]) -> bool {
    // Payloads of this length or shorter are reported as incompressible.
    const INCOMPRESSIBLE_UP_TO: usize = 32;
    payload.len() > INCOMPRESSIBLE_UP_TO
}
366
#[cfg(test)]
mod tests {
    use super::*;

    /// Reply builder preset to `MessageKind::Result`, the kind most
    /// tests below use.
    fn result_reply(correlation_id: u64) -> FrameBuilder {
        FrameBuilder::reply_to(correlation_id).kind(MessageKind::Result)
    }

    #[test]
    fn reply_to_propagates_correlation_id() {
        // Dispatch-site contract: a response echoes the request's
        // correlation id so the client can pair the two.
        let reply = result_reply(0xABCD)
            .payload(b"ok".to_vec())
            .build()
            .expect("build");
        assert_eq!(reply.correlation_id, 0xABCD);
        assert_eq!(reply.kind, MessageKind::Result);
        assert_eq!(reply.payload, b"ok");
    }

    #[test]
    fn request_builders_choose_client_message_kinds() {
        let query_frame = build_query_frame(1, "select 1").expect("query");
        assert_eq!(query_frame.kind, MessageKind::Query);
        assert_eq!(query_frame.correlation_id, 1);
        assert_eq!(query_frame.payload, b"select 1");

        let ping_frame = build_ping_frame(2).expect("ping");
        assert_eq!(ping_frame.kind, MessageKind::Ping);
        assert!(ping_frame.payload.is_empty());

        let bye_frame = build_bye_frame(3).expect("bye");
        assert_eq!(bye_frame.kind, MessageKind::Bye);
        assert!(bye_frame.payload.is_empty());
    }

    #[test]
    fn unsolicited_uses_zero_correlation() {
        let notice = FrameBuilder::unsolicited()
            .kind(MessageKind::Notice)
            .payload(b"server-side notice".to_vec())
            .build()
            .expect("build");
        assert_eq!(notice.correlation_id, 0);
    }

    #[test]
    fn missing_kind_rejected() {
        let outcome = FrameBuilder::reply_to(1).build();
        assert_eq!(outcome.unwrap_err(), BuildError::KindMissing);
    }

    #[test]
    fn more_frames_last_frame_clears_the_flag() {
        // MORE_FRAMES last-frame invariant: intermediate frames of a
        // burst carry the flag, the final one must not — otherwise
        // the client keeps waiting for more.
        let burst_frame = |more: bool| {
            result_reply(7)
                .payload(vec![0; 8])
                .more_frames(more)
                .build()
        };

        let middle = burst_frame(true).expect("build middle");
        assert!(
            middle.flags.contains(Flags::MORE_FRAMES),
            "middle frame must carry MORE_FRAMES"
        );

        let last = burst_frame(false).expect("build last");
        assert!(
            !last.flags.contains(Flags::MORE_FRAMES),
            "last frame must clear MORE_FRAMES"
        );
    }

    #[test]
    fn more_frames_default_is_last_frame() {
        // The common single-frame reply is implicitly the last frame;
        // callers must not need to clear the flag themselves.
        let pong = FrameBuilder::reply_to(1)
            .kind(MessageKind::Pong)
            .build()
            .expect("build");
        assert!(!pong.flags.contains(Flags::MORE_FRAMES));
    }

    #[test]
    fn payload_at_max_size_accepted() {
        let largest_body = vec![0u8; (MAX_FRAME_SIZE as usize) - FRAME_HEADER_SIZE];
        let frame = result_reply(1)
            .payload(largest_body)
            .build()
            .expect("build at limit");
        assert_eq!(frame.encoded_len(), MAX_FRAME_SIZE);
    }

    #[test]
    fn payload_over_max_size_rejected() {
        // MAX_FRAME_SIZE is the on-wire upper bound; the builder
        // rejects oversize plaintext instead of emitting a frame the
        // peer would refuse anyway.
        let one_too_many = (MAX_FRAME_SIZE as usize) - FRAME_HEADER_SIZE + 1;
        let err = result_reply(1)
            .payload(vec![0u8; one_too_many])
            .build()
            .unwrap_err();
        assert_eq!(
            err,
            BuildError::PayloadTooLarge {
                encoded_len: MAX_FRAME_SIZE as usize + 1,
                max: MAX_FRAME_SIZE,
            }
        );
    }

    #[test]
    fn compression_fallback_drops_flag_for_incompressible_payload() {
        // Compression requested on a payload too short for zstd to
        // reduce: the flag must be dropped so the wire form does not
        // claim COMPRESSED over a plaintext body.
        let frame = result_reply(1)
            .payload(b"tiny".to_vec())
            .compress(true)
            .build()
            .expect("build");
        assert!(
            !frame.flags.contains(Flags::COMPRESSED),
            "incompressible payload must not carry COMPRESSED"
        );
    }

    #[test]
    fn compression_kept_for_compressible_payload() {
        let repetitive = b"abcabcabc".repeat(16);
        let frame = result_reply(1)
            .payload(repetitive)
            .compress(true)
            .build()
            .expect("build");
        assert!(frame.flags.contains(Flags::COMPRESSED));
    }

    #[test]
    fn flags_not_allowed_for_kind_rejected_at_build() {
        // Hello is a handshake kind and the catalog forbids
        // COMPRESSED on it. The builder mirrors the codec's
        // decode-time check so the misframed build fails before any
        // bytes hit the wire.
        let err = FrameBuilder::reply_to(1)
            .kind(MessageKind::Hello)
            .flags(Flags::COMPRESSED)
            .build()
            .unwrap_err();
        assert_eq!(
            err,
            BuildError::FlagsNotAllowedForKind {
                kind: MessageKind::Hello,
                flags: Flags::COMPRESSED.bits(),
            }
        );
    }

    #[test]
    fn stream_id_propagates() {
        let frame = result_reply(1).stream_id(0xBEEF).build().expect("build");
        assert_eq!(frame.stream_id, 0xBEEF);
    }

    #[test]
    fn generic_reply_builders_pin_server_frame_contracts() {
        let pong = build_reply_frame(7, MessageKind::Pong, b"ok".to_vec()).expect("reply frame");
        assert_eq!(pong.correlation_id, 7);
        assert_eq!(pong.kind, MessageKind::Pong);
        assert_eq!(pong.payload, b"ok");

        let error_frame = build_error_frame(8, "bad request").expect("error frame");
        assert_eq!(error_frame.kind, MessageKind::Error);
        assert_eq!(error_frame.correlation_id, 8);
        assert_eq!(error_frame.payload, b"bad request");

        let dispatched = build_dispatch_reply_frame(9, MessageKind::Result, b"rows".to_vec());
        assert_eq!(dispatched.kind, MessageKind::Result);
        assert_eq!(dispatched.correlation_id, 9);
    }

    #[test]
    fn rewraps_length_prefixed_handler_response() {
        let envelope = [4u8, 0, 0, 0, MessageKind::BulkOk as u8, b'o', b'k'];

        let rewrapped = rewrap_length_prefixed_handler_response(&envelope, 12);
        assert_eq!(rewrapped.correlation_id, 12);
        assert_eq!(rewrapped.kind, MessageKind::BulkOk);
        assert_eq!(rewrapped.payload, b"ok");

        // Four bytes cannot even hold the kind byte.
        let truncated = rewrap_length_prefixed_handler_response(&envelope[..4], 13);
        assert_eq!(truncated.kind, MessageKind::Error);
        assert_eq!(truncated.correlation_id, 13);
        assert_eq!(
            truncated.payload,
            b"fast-path handler returned a truncated frame"
        );
    }

    #[test]
    fn lossy_error_builder_never_panics_on_oversized_payload() {
        let oversized_message = "x".repeat(MAX_FRAME_SIZE as usize);
        let fallback = build_error_frame_lossy(11, &oversized_message);
        assert_eq!(fallback.kind, MessageKind::Error);
        assert_eq!(fallback.correlation_id, 11);
        assert_eq!(fallback.payload, b"redwire error frame too large");
    }
}