1use super::frame::{Flags, Frame, MessageKind, FRAME_HEADER_SIZE, MAX_FRAME_SIZE};
41
42#[derive(Debug, Clone, PartialEq, Eq)]
50pub enum BuildError {
51 KindMissing,
54 PayloadTooLarge { encoded_len: usize, max: u32 },
56 FlagsNotAllowedForKind { kind: MessageKind, flags: u8 },
59}
60
61impl std::fmt::Display for BuildError {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 match self {
64 Self::KindMissing => write!(f, "FrameBuilder: kind() must be called before build()"),
65 Self::PayloadTooLarge { encoded_len, max } => write!(
66 f,
67 "FrameBuilder: encoded frame size {encoded_len} exceeds MAX_FRAME_SIZE ({max})"
68 ),
69 Self::FlagsNotAllowedForKind { kind, flags } => write!(
70 f,
71 "FrameBuilder: flag bits 0x{flags:02x} not allowed on kind {kind:?}"
72 ),
73 }
74 }
75}
76
77impl std::error::Error for BuildError {}
78
/// Whether the caller asked for the frame to be marked as compressed.
///
/// A private two-state enum that `FrameBuilder::compress` fills in from a `bool`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Compress {
    /// Compression was not requested.
    No,
    /// Compression was requested.
    Yes,
}
89
90#[derive(Debug, Clone)]
98pub struct FrameBuilder {
99 kind: Option<MessageKind>,
100 correlation_id: u64,
101 stream_id: u16,
102 payload: Vec<u8>,
103 flags: Flags,
104 compress: Compress,
105 more_frames: bool,
109}
110
111impl FrameBuilder {
112 pub fn request(correlation_id: u64) -> Self {
115 Self::with_correlation(correlation_id)
116 }
117
118 pub fn reply_to(correlation_id: u64) -> Self {
121 Self::with_correlation(correlation_id)
122 }
123
124 pub fn unsolicited() -> Self {
127 Self::with_correlation(0)
128 }
129
130 fn with_correlation(correlation_id: u64) -> Self {
131 Self {
132 kind: None,
133 correlation_id,
134 stream_id: 0,
135 payload: Vec::new(),
136 flags: Flags::empty(),
137 compress: Compress::No,
138 more_frames: false,
139 }
140 }
141
142 pub fn kind(mut self, kind: MessageKind) -> Self {
143 self.kind = Some(kind);
144 self
145 }
146
147 pub fn payload(mut self, payload: Vec<u8>) -> Self {
148 self.payload = payload;
149 self
150 }
151
152 pub fn stream_id(mut self, stream_id: u16) -> Self {
153 self.stream_id = stream_id;
154 self
155 }
156
157 pub fn flags(mut self, flags: Flags) -> Self {
162 self.flags = flags;
163 self
164 }
165
166 pub fn more_frames(mut self, more: bool) -> Self {
171 self.more_frames = more;
172 self
173 }
174
175 pub fn compress(mut self, yes: bool) -> Self {
179 self.compress = if yes { Compress::Yes } else { Compress::No };
180 self
181 }
182
183 pub fn build(self) -> Result<Frame, BuildError> {
198 let kind = self.kind.ok_or(BuildError::KindMissing)?;
199 let encoded_len = FRAME_HEADER_SIZE + self.payload.len();
200 if encoded_len > MAX_FRAME_SIZE as usize {
201 return Err(BuildError::PayloadTooLarge {
202 encoded_len,
203 max: MAX_FRAME_SIZE,
204 });
205 }
206
207 if !kind.permits_flags(self.flags) {
208 return Err(BuildError::FlagsNotAllowedForKind {
209 kind,
210 flags: self.flags.bits(),
211 });
212 }
213
214 let mut flags = self.flags;
215 if self.more_frames {
216 flags = flags.insert(Flags::MORE_FRAMES);
217 } else {
218 flags = Flags::from_bits(flags.bits() & !Flags::MORE_FRAMES.bits());
221 }
222
223 let compressed = match self.compress {
224 Compress::No => false,
225 Compress::Yes => is_payload_compressible(&self.payload),
226 };
227 if compressed {
228 flags = flags.insert(Flags::COMPRESSED);
229 } else {
230 flags = Flags::from_bits(flags.bits() & !Flags::COMPRESSED.bits());
231 }
232
233 if !kind.permits_flags(flags) {
238 return Err(BuildError::FlagsNotAllowedForKind {
239 kind,
240 flags: flags.bits(),
241 });
242 }
243
244 Ok(Frame {
245 kind,
246 flags,
247 stream_id: self.stream_id,
248 correlation_id: self.correlation_id,
249 payload: self.payload,
250 })
251 }
252}
253
254pub fn build_reply_frame(
255 correlation_id: u64,
256 kind: MessageKind,
257 payload: Vec<u8>,
258) -> Result<Frame, BuildError> {
259 FrameBuilder::reply_to(correlation_id)
260 .kind(kind)
261 .payload(payload)
262 .build()
263}
264
265pub fn build_error_frame(correlation_id: u64, message: &str) -> Result<Frame, BuildError> {
266 build_reply_frame(
267 correlation_id,
268 MessageKind::Error,
269 message.as_bytes().to_vec(),
270 )
271}
272
273pub fn build_error_frame_lossy(correlation_id: u64, message: &str) -> Frame {
274 build_error_frame(correlation_id, message).unwrap_or_else(|_| {
275 Frame::new(
276 MessageKind::Error,
277 correlation_id,
278 b"redwire error frame too large".to_vec(),
279 )
280 })
281}
282
283pub fn build_dispatch_reply_frame(
284 correlation_id: u64,
285 kind: MessageKind,
286 payload: Vec<u8>,
287) -> Frame {
288 build_reply_frame(correlation_id, kind, payload)
289 .unwrap_or_else(|err| build_error_frame_lossy(correlation_id, &err.to_string()))
290}
291
292pub fn rewrap_length_prefixed_handler_response(raw_bytes: &[u8], correlation_id: u64) -> Frame {
299 if raw_bytes.len() < 5 {
300 return build_error_frame_lossy(
301 correlation_id,
302 "fast-path handler returned a truncated frame",
303 );
304 }
305 let kind = MessageKind::from_u8(raw_bytes[4]).unwrap_or(MessageKind::Error);
306 build_dispatch_reply_frame(correlation_id, kind, raw_bytes[5..].to_vec())
307}
308
309pub fn build_query_frame(correlation_id: u64, sql: &str) -> Result<Frame, BuildError> {
310 build_request_frame(correlation_id, MessageKind::Query, sql.as_bytes().to_vec())
311}
312
313pub fn build_query_with_params_frame(
314 correlation_id: u64,
315 payload: Vec<u8>,
316) -> Result<Frame, BuildError> {
317 build_request_frame(correlation_id, MessageKind::QueryWithParams, payload)
318}
319
320pub fn build_bulk_insert_frame(correlation_id: u64, payload: Vec<u8>) -> Result<Frame, BuildError> {
321 build_request_frame(correlation_id, MessageKind::BulkInsert, payload)
322}
323
324pub fn build_get_frame(correlation_id: u64, payload: Vec<u8>) -> Result<Frame, BuildError> {
325 build_request_frame(correlation_id, MessageKind::Get, payload)
326}
327
328pub fn build_delete_frame(correlation_id: u64, payload: Vec<u8>) -> Result<Frame, BuildError> {
329 build_request_frame(correlation_id, MessageKind::Delete, payload)
330}
331
332pub fn build_bulk_insert_binary_frame(
333 correlation_id: u64,
334 payload: Vec<u8>,
335) -> Result<Frame, BuildError> {
336 build_request_frame(correlation_id, MessageKind::BulkInsertBinary, payload)
337}
338
339pub fn build_ping_frame(correlation_id: u64) -> Result<Frame, BuildError> {
340 build_request_frame(correlation_id, MessageKind::Ping, Vec::new())
341}
342
343pub fn build_bye_frame(correlation_id: u64) -> Result<Frame, BuildError> {
344 build_request_frame(correlation_id, MessageKind::Bye, Vec::new())
345}
346
347pub fn build_request_frame(
348 correlation_id: u64,
349 kind: MessageKind,
350 payload: Vec<u8>,
351) -> Result<Frame, BuildError> {
352 FrameBuilder::request(correlation_id)
353 .kind(kind)
354 .payload(payload)
355 .build()
356}
357
/// Size-only heuristic for whether a payload should be marked compressed.
///
/// Returns `true` when the payload is longer than 32 bytes; the bytes
/// themselves are not inspected.
fn is_payload_compressible(payload: &[u8]) -> bool {
    const LARGEST_UNCOMPRESSED_LEN: usize = 32;
    payload.len() > LARGEST_UNCOMPRESSED_LEN
}
366
#[cfg(test)]
mod tests {
    use super::*;

    // A reply builder hands correlation id, kind and payload to the frame unchanged.
    #[test]
    fn reply_to_propagates_correlation_id() {
        let builder = FrameBuilder::reply_to(0xABCD)
            .kind(MessageKind::Result)
            .payload(b"ok".to_vec());
        let built = builder.build().expect("build");

        assert_eq!(built.correlation_id, 0xABCD);
        assert_eq!(built.kind, MessageKind::Result);
        assert_eq!(built.payload, b"ok");
    }

    // Each per-kind request helper stamps its own message kind.
    #[test]
    fn request_builders_choose_client_message_kinds() {
        let query_frame = build_query_frame(1, "select 1").expect("query");
        assert_eq!(query_frame.kind, MessageKind::Query);
        assert_eq!(query_frame.correlation_id, 1);
        assert_eq!(query_frame.payload, b"select 1");

        let ping_frame = build_ping_frame(2).expect("ping");
        assert_eq!(ping_frame.kind, MessageKind::Ping);
        assert!(ping_frame.payload.is_empty());

        let bye_frame = build_bye_frame(3).expect("bye");
        assert_eq!(bye_frame.kind, MessageKind::Bye);
        assert!(bye_frame.payload.is_empty());
    }

    #[test]
    fn unsolicited_uses_zero_correlation() {
        let builder = FrameBuilder::unsolicited()
            .kind(MessageKind::Notice)
            .payload(b"server-side notice".to_vec());
        let notice = builder.build().expect("build");
        assert_eq!(notice.correlation_id, 0);
    }

    #[test]
    fn missing_kind_rejected() {
        let outcome = FrameBuilder::reply_to(1).build();
        assert_eq!(outcome.unwrap_err(), BuildError::KindMissing);
    }

    // MORE_FRAMES follows the `more_frames` setting in both directions.
    #[test]
    fn more_frames_last_frame_clears_the_flag() {
        let build_part = |more: bool, label: &str| {
            FrameBuilder::reply_to(7)
                .kind(MessageKind::Result)
                .payload(vec![0; 8])
                .more_frames(more)
                .build()
                .expect(label)
        };

        let middle = build_part(true, "build middle");
        assert!(
            middle.flags.contains(Flags::MORE_FRAMES),
            "middle frame must carry MORE_FRAMES"
        );

        let last = build_part(false, "build last");
        assert!(
            !last.flags.contains(Flags::MORE_FRAMES),
            "last frame must clear MORE_FRAMES"
        );
    }

    #[test]
    fn more_frames_default_is_last_frame() {
        let pong = FrameBuilder::reply_to(1)
            .kind(MessageKind::Pong)
            .build()
            .expect("build");
        let carries_more = pong.flags.contains(Flags::MORE_FRAMES);
        assert!(!carries_more);
    }

    // A payload that fills the frame exactly to the limit is still accepted.
    #[test]
    fn payload_at_max_size_accepted() {
        let largest_payload_len = (MAX_FRAME_SIZE as usize) - FRAME_HEADER_SIZE;
        let built = FrameBuilder::reply_to(1)
            .kind(MessageKind::Result)
            .payload(vec![0u8; largest_payload_len])
            .build()
            .expect("build at limit");
        assert_eq!(built.encoded_len(), MAX_FRAME_SIZE);
    }

    // One byte past the limit is rejected, and the error reports both sizes.
    #[test]
    fn payload_over_max_size_rejected() {
        let oversize = (MAX_FRAME_SIZE as usize) - FRAME_HEADER_SIZE + 1;
        let failure = FrameBuilder::reply_to(1)
            .kind(MessageKind::Result)
            .payload(vec![0u8; oversize])
            .build()
            .unwrap_err();
        match &failure {
            BuildError::PayloadTooLarge { encoded_len, max } => {
                assert_eq!(*max, MAX_FRAME_SIZE);
                assert_eq!(*encoded_len, MAX_FRAME_SIZE as usize + 1);
            }
            other => panic!("expected PayloadTooLarge, got {other:?}"),
        }
    }

    #[test]
    fn compression_fallback_drops_flag_for_incompressible_payload() {
        let builder = FrameBuilder::reply_to(1)
            .kind(MessageKind::Result)
            .payload(b"tiny".to_vec())
            .compress(true);
        let built = builder.build().expect("build");
        assert!(
            !built.flags.contains(Flags::COMPRESSED),
            "incompressible payload must not carry COMPRESSED"
        );
    }

    #[test]
    fn compression_kept_for_compressible_payload() {
        let repetitive = b"abcabcabc".repeat(16);
        let builder = FrameBuilder::reply_to(1)
            .kind(MessageKind::Result)
            .payload(repetitive)
            .compress(true);
        let built = builder.build().expect("build");
        assert!(built.flags.contains(Flags::COMPRESSED));
    }

    // Caller-supplied flags the kind forbids are reported with their raw bits.
    #[test]
    fn flags_not_allowed_for_kind_rejected_at_build() {
        let failure = FrameBuilder::reply_to(1)
            .kind(MessageKind::Hello)
            .flags(Flags::COMPRESSED)
            .build()
            .unwrap_err();
        match &failure {
            BuildError::FlagsNotAllowedForKind { kind, flags } => {
                assert_eq!(*kind, MessageKind::Hello);
                assert_eq!(*flags, Flags::COMPRESSED.bits());
            }
            other => panic!("expected FlagsNotAllowedForKind, got {other:?}"),
        }
    }

    #[test]
    fn stream_id_propagates() {
        let builder = FrameBuilder::reply_to(1)
            .kind(MessageKind::Result)
            .stream_id(0xBEEF);
        let built = builder.build().expect("build");
        assert_eq!(built.stream_id, 0xBEEF);
    }

    #[test]
    fn generic_reply_builders_pin_server_frame_contracts() {
        let pong = build_reply_frame(7, MessageKind::Pong, b"ok".to_vec()).expect("reply frame");
        assert_eq!(pong.correlation_id, 7);
        assert_eq!(pong.kind, MessageKind::Pong);
        assert_eq!(pong.payload, b"ok");

        let error_frame = build_error_frame(8, "bad request").expect("error frame");
        assert_eq!(error_frame.kind, MessageKind::Error);
        assert_eq!(error_frame.correlation_id, 8);
        assert_eq!(error_frame.payload, b"bad request");

        let dispatched = build_dispatch_reply_frame(9, MessageKind::Result, b"rows".to_vec());
        assert_eq!(dispatched.kind, MessageKind::Result);
        assert_eq!(dispatched.correlation_id, 9);
    }

    // Four skipped bytes, one kind byte, then the payload; shorter input is an error.
    #[test]
    fn rewraps_length_prefixed_handler_response() {
        let raw = [4u8, 0, 0, 0, MessageKind::BulkOk as u8, b'o', b'k'];

        let rewrapped = rewrap_length_prefixed_handler_response(&raw, 12);
        assert_eq!(rewrapped.correlation_id, 12);
        assert_eq!(rewrapped.kind, MessageKind::BulkOk);
        assert_eq!(rewrapped.payload, b"ok");

        let header_only = &raw[..4];
        let truncated = rewrap_length_prefixed_handler_response(header_only, 13);
        assert_eq!(truncated.kind, MessageKind::Error);
        assert_eq!(truncated.correlation_id, 13);
        assert_eq!(
            truncated.payload,
            b"fast-path handler returned a truncated frame"
        );
    }

    #[test]
    fn lossy_error_builder_never_panics_on_oversized_payload() {
        let oversized_message = "x".repeat(MAX_FRAME_SIZE as usize);
        let fallback = build_error_frame_lossy(11, &oversized_message);
        assert_eq!(fallback.kind, MessageKind::Error);
        assert_eq!(fallback.correlation_id, 11);
        assert_eq!(fallback.payload, b"redwire error frame too large");
    }
}