1pub const FRAME_HEADER_SIZE: usize = 16;
20pub const MAX_FRAME_SIZE: u32 = 16 * 1024 * 1024;
21
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct Frame {
24 pub kind: MessageKind,
25 pub flags: Flags,
26 pub stream_id: u16,
27 pub correlation_id: u64,
28 pub payload: Vec<u8>,
29}
30
31impl Frame {
32 pub fn new(kind: MessageKind, correlation_id: u64, payload: Vec<u8>) -> Self {
33 Self {
34 kind,
35 flags: Flags::empty(),
36 stream_id: 0,
37 correlation_id,
38 payload,
39 }
40 }
41
42 pub fn with_stream(mut self, stream_id: u16) -> Self {
43 self.stream_id = stream_id;
44 self
45 }
46
47 pub fn with_flags(mut self, flags: Flags) -> Self {
48 self.flags = flags;
49 self
50 }
51
52 pub fn encoded_len(&self) -> u32 {
53 (FRAME_HEADER_SIZE + self.payload.len()) as u32
54 }
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60#[repr(u8)]
61pub enum MessageKind {
62 Query = 0x01,
64 Result = 0x02,
65 Error = 0x03,
66 BulkInsert = 0x04,
67 BulkOk = 0x05,
68 BulkInsertBinary = 0x06,
69 QueryBinary = 0x07,
70 BulkInsertPrevalidated = 0x08,
71 BulkStreamStart = 0x09,
72 BulkStreamRows = 0x0A,
73 BulkStreamCommit = 0x0B,
74 BulkStreamAck = 0x0C,
75 Prepare = 0x0D,
76 PreparedOk = 0x0E,
77 ExecutePrepared = 0x0F,
78
79 Hello = 0x10,
81 HelloAck = 0x11,
82 AuthRequest = 0x12,
83 AuthResponse = 0x13,
84 AuthOk = 0x14,
85 AuthFail = 0x15,
86 Bye = 0x16,
87 Ping = 0x17,
88 Pong = 0x18,
89 Get = 0x19,
90 Delete = 0x1A,
91 DeleteOk = 0x1B,
92
93 Cancel = 0x20,
95 Compress = 0x21,
96 SetSession = 0x22,
97 Notice = 0x23,
98
99 RowDescription = 0x24,
101 StreamEnd = 0x25,
102
103 VectorSearch = 0x26,
105 GraphTraverse = 0x27,
106 QueryWithParams = 0x28,
107
108 OpenStream = 0x29,
112 OpenAck = 0x2A,
113 StreamChunk = 0x2B,
114 StreamError = 0x2C,
115 StreamCancel = 0x2D,
116
117 QueueWaitOpen = 0x2E,
125 QueueEventPush = 0x2F,
126 QueueWaitTimeout = 0x30,
131}
132
133#[derive(Debug, Clone, Copy, PartialEq, Eq)]
142pub enum MessageClass {
143 DataPlane,
144 Handshake,
145 ControlPlane,
146 Streamed,
147}
148
149#[derive(Debug, Clone, Copy, PartialEq, Eq)]
157pub enum MessageDirection {
158 ClientToServer,
159 ServerToClient,
160 Both,
161}
162
163impl MessageKind {
164 pub fn class(&self) -> MessageClass {
166 match self {
167 Self::Query
172 | Self::Result
173 | Self::Error
174 | Self::BulkInsert
175 | Self::BulkOk
176 | Self::BulkInsertBinary
177 | Self::QueryBinary
178 | Self::BulkInsertPrevalidated
179 | Self::Prepare
180 | Self::PreparedOk
181 | Self::ExecutePrepared
182 | Self::Get
183 | Self::Delete
184 | Self::DeleteOk
185 | Self::VectorSearch
186 | Self::GraphTraverse
187 | Self::QueryWithParams => MessageClass::DataPlane,
188
189 Self::BulkStreamStart
195 | Self::BulkStreamRows
196 | Self::BulkStreamCommit
197 | Self::BulkStreamAck
198 | Self::RowDescription
199 | Self::StreamEnd
200 | Self::OpenStream
201 | Self::OpenAck
202 | Self::StreamChunk
203 | Self::StreamError
204 | Self::StreamCancel
205 | Self::QueueWaitOpen
208 | Self::QueueEventPush
209 | Self::QueueWaitTimeout => MessageClass::Streamed,
212
213 Self::Hello
215 | Self::HelloAck
216 | Self::AuthRequest
217 | Self::AuthResponse
218 | Self::AuthOk
219 | Self::AuthFail
220 | Self::Bye
221 | Self::Ping
222 | Self::Pong => MessageClass::Handshake,
223
224 Self::Cancel | Self::Compress | Self::SetSession | Self::Notice => {
226 MessageClass::ControlPlane
227 }
228 }
229 }
230
231 pub fn allowed_flags(&self) -> Flags {
241 match self {
242 Self::Hello
245 | Self::HelloAck
246 | Self::AuthRequest
247 | Self::AuthResponse
248 | Self::AuthOk
249 | Self::AuthFail
250 | Self::Bye
251 | Self::Ping
252 | Self::Pong => Flags::MORE_FRAMES,
253
254 _ => Flags::COMPRESSED.insert(Flags::MORE_FRAMES),
256 }
257 }
258
259 pub fn is_handshake(&self) -> bool {
264 matches!(self.class(), MessageClass::Handshake)
265 }
266
267 pub fn permits_flags(&self, flags: Flags) -> bool {
273 let allowed = self.allowed_flags().bits();
274 (flags.bits() & !allowed) == 0
275 }
276
277 pub fn direction(&self) -> MessageDirection {
279 match self {
280 Self::Hello
282 | Self::AuthResponse
283 | Self::Query
284 | Self::QueryBinary
285 | Self::BulkInsert
286 | Self::BulkInsertBinary
287 | Self::BulkInsertPrevalidated
288 | Self::BulkStreamStart
289 | Self::BulkStreamRows
290 | Self::BulkStreamCommit
291 | Self::Prepare
292 | Self::ExecutePrepared
293 | Self::Get
294 | Self::Delete
295 | Self::Cancel
296 | Self::Compress
297 | Self::SetSession
298 | Self::VectorSearch
299 | Self::GraphTraverse
300 | Self::QueryWithParams
301 | Self::OpenStream
302 | Self::StreamCancel
303 | Self::QueueWaitOpen => MessageDirection::ClientToServer,
305
306 Self::StreamChunk => MessageDirection::Both,
312
313 Self::HelloAck
315 | Self::AuthRequest
316 | Self::AuthOk
317 | Self::AuthFail
318 | Self::Result
319 | Self::Error
320 | Self::BulkOk
321 | Self::BulkStreamAck
322 | Self::PreparedOk
323 | Self::DeleteOk
324 | Self::Notice
325 | Self::RowDescription
326 | Self::StreamEnd
327 | Self::OpenAck
328 | Self::StreamError
329 | Self::QueueEventPush
332 | Self::QueueWaitTimeout => MessageDirection::ServerToClient,
333
334 Self::Bye | Self::Ping | Self::Pong => MessageDirection::Both,
338 }
339 }
340
341 pub fn from_u8(byte: u8) -> Option<Self> {
342 match byte {
343 0x01 => Some(Self::Query),
344 0x02 => Some(Self::Result),
345 0x03 => Some(Self::Error),
346 0x04 => Some(Self::BulkInsert),
347 0x05 => Some(Self::BulkOk),
348 0x06 => Some(Self::BulkInsertBinary),
349 0x07 => Some(Self::QueryBinary),
350 0x08 => Some(Self::BulkInsertPrevalidated),
351 0x09 => Some(Self::BulkStreamStart),
352 0x0A => Some(Self::BulkStreamRows),
353 0x0B => Some(Self::BulkStreamCommit),
354 0x0C => Some(Self::BulkStreamAck),
355 0x0D => Some(Self::Prepare),
356 0x0E => Some(Self::PreparedOk),
357 0x0F => Some(Self::ExecutePrepared),
358 0x10 => Some(Self::Hello),
359 0x11 => Some(Self::HelloAck),
360 0x12 => Some(Self::AuthRequest),
361 0x13 => Some(Self::AuthResponse),
362 0x14 => Some(Self::AuthOk),
363 0x15 => Some(Self::AuthFail),
364 0x16 => Some(Self::Bye),
365 0x17 => Some(Self::Ping),
366 0x18 => Some(Self::Pong),
367 0x19 => Some(Self::Get),
368 0x1A => Some(Self::Delete),
369 0x1B => Some(Self::DeleteOk),
370 0x20 => Some(Self::Cancel),
371 0x21 => Some(Self::Compress),
372 0x22 => Some(Self::SetSession),
373 0x23 => Some(Self::Notice),
374 0x24 => Some(Self::RowDescription),
375 0x25 => Some(Self::StreamEnd),
376 0x26 => Some(Self::VectorSearch),
377 0x27 => Some(Self::GraphTraverse),
378 0x28 => Some(Self::QueryWithParams),
379 0x29 => Some(Self::OpenStream),
380 0x2A => Some(Self::OpenAck),
381 0x2B => Some(Self::StreamChunk),
382 0x2C => Some(Self::StreamError),
383 0x2D => Some(Self::StreamCancel),
384 0x2E => Some(Self::QueueWaitOpen),
385 0x2F => Some(Self::QueueEventPush),
386 0x30 => Some(Self::QueueWaitTimeout),
387 _ => None,
388 }
389 }
390}
391
392#[derive(Debug, Clone, Copy, PartialEq, Eq)]
393pub struct Flags(u8);
394
395impl Flags {
396 pub const COMPRESSED: Self = Self(0b0000_0001);
397 pub const MORE_FRAMES: Self = Self(0b0000_0010);
398
399 pub const fn empty() -> Self {
400 Self(0)
401 }
402
403 pub const fn bits(self) -> u8 {
404 self.0
405 }
406
407 pub const fn from_bits(bits: u8) -> Self {
408 Self(bits)
409 }
410
411 pub const fn contains(self, other: Self) -> bool {
412 (self.0 & other.0) == other.0
413 }
414
415 pub const fn insert(self, other: Self) -> Self {
416 Self(self.0 | other.0)
417 }
418}
419
420impl std::ops::BitOr for Flags {
421 type Output = Self;
422 fn bitor(self, rhs: Self) -> Self {
423 self.insert(rhs)
424 }
425}
426
427#[cfg(test)]
428mod catalog_tests {
429 use super::*;
430
431 const ALL_KINDS: &[MessageKind] = &[
435 MessageKind::Query,
436 MessageKind::Result,
437 MessageKind::Error,
438 MessageKind::BulkInsert,
439 MessageKind::BulkOk,
440 MessageKind::BulkInsertBinary,
441 MessageKind::QueryBinary,
442 MessageKind::BulkInsertPrevalidated,
443 MessageKind::BulkStreamStart,
444 MessageKind::BulkStreamRows,
445 MessageKind::BulkStreamCommit,
446 MessageKind::BulkStreamAck,
447 MessageKind::Prepare,
448 MessageKind::PreparedOk,
449 MessageKind::ExecutePrepared,
450 MessageKind::Hello,
451 MessageKind::HelloAck,
452 MessageKind::AuthRequest,
453 MessageKind::AuthResponse,
454 MessageKind::AuthOk,
455 MessageKind::AuthFail,
456 MessageKind::Bye,
457 MessageKind::Ping,
458 MessageKind::Pong,
459 MessageKind::Get,
460 MessageKind::Delete,
461 MessageKind::DeleteOk,
462 MessageKind::Cancel,
463 MessageKind::Compress,
464 MessageKind::SetSession,
465 MessageKind::Notice,
466 MessageKind::RowDescription,
467 MessageKind::StreamEnd,
468 MessageKind::VectorSearch,
469 MessageKind::GraphTraverse,
470 MessageKind::QueryWithParams,
471 MessageKind::OpenStream,
472 MessageKind::OpenAck,
473 MessageKind::StreamChunk,
474 MessageKind::StreamError,
475 MessageKind::StreamCancel,
476 MessageKind::QueueWaitOpen,
477 MessageKind::QueueEventPush,
478 MessageKind::QueueWaitTimeout,
479 ];
480
481 #[test]
482 fn class_matrix_is_pinned() {
483 assert_eq!(MessageKind::Hello.class(), MessageClass::Handshake);
486 assert_eq!(MessageKind::HelloAck.class(), MessageClass::Handshake);
487 assert_eq!(MessageKind::AuthRequest.class(), MessageClass::Handshake);
488 assert_eq!(MessageKind::AuthResponse.class(), MessageClass::Handshake);
489 assert_eq!(MessageKind::AuthOk.class(), MessageClass::Handshake);
490 assert_eq!(MessageKind::AuthFail.class(), MessageClass::Handshake);
491 assert_eq!(MessageKind::Bye.class(), MessageClass::Handshake);
492 assert_eq!(MessageKind::Ping.class(), MessageClass::Handshake);
493 assert_eq!(MessageKind::Pong.class(), MessageClass::Handshake);
494
495 assert_eq!(MessageKind::Query.class(), MessageClass::DataPlane);
497 assert_eq!(MessageKind::Result.class(), MessageClass::DataPlane);
498 assert_eq!(MessageKind::BulkInsert.class(), MessageClass::DataPlane);
499 assert_eq!(MessageKind::Get.class(), MessageClass::DataPlane);
500 assert_eq!(MessageKind::Delete.class(), MessageClass::DataPlane);
501 assert_eq!(MessageKind::DeleteOk.class(), MessageClass::DataPlane);
502 assert_eq!(MessageKind::VectorSearch.class(), MessageClass::DataPlane);
503 assert_eq!(MessageKind::GraphTraverse.class(), MessageClass::DataPlane);
504 assert_eq!(
505 MessageKind::QueryWithParams.class(),
506 MessageClass::DataPlane
507 );
508
509 assert_eq!(MessageKind::BulkStreamStart.class(), MessageClass::Streamed);
511 assert_eq!(MessageKind::BulkStreamRows.class(), MessageClass::Streamed);
512 assert_eq!(
513 MessageKind::BulkStreamCommit.class(),
514 MessageClass::Streamed
515 );
516 assert_eq!(MessageKind::BulkStreamAck.class(), MessageClass::Streamed);
517 assert_eq!(MessageKind::RowDescription.class(), MessageClass::Streamed);
518 assert_eq!(MessageKind::StreamEnd.class(), MessageClass::Streamed);
519
520 assert_eq!(MessageKind::OpenStream.class(), MessageClass::Streamed);
524 assert_eq!(MessageKind::OpenAck.class(), MessageClass::Streamed);
525 assert_eq!(MessageKind::StreamChunk.class(), MessageClass::Streamed);
526 assert_eq!(MessageKind::StreamError.class(), MessageClass::Streamed);
527 assert_eq!(MessageKind::StreamCancel.class(), MessageClass::Streamed);
528
529 assert_eq!(MessageKind::QueueWaitOpen.class(), MessageClass::Streamed);
532 assert_eq!(MessageKind::QueueEventPush.class(), MessageClass::Streamed);
533 assert_eq!(
535 MessageKind::QueueWaitTimeout.class(),
536 MessageClass::Streamed
537 );
538
539 assert_eq!(MessageKind::Cancel.class(), MessageClass::ControlPlane);
541 assert_eq!(MessageKind::Compress.class(), MessageClass::ControlPlane);
542 assert_eq!(MessageKind::SetSession.class(), MessageClass::ControlPlane);
543 assert_eq!(MessageKind::Notice.class(), MessageClass::ControlPlane);
544
545 for k in ALL_KINDS {
547 let _ = k.class();
548 }
549 }
550
551 #[test]
552 fn allowed_flags_matrix_is_pinned() {
553 let handshake = [
557 MessageKind::Hello,
558 MessageKind::HelloAck,
559 MessageKind::AuthRequest,
560 MessageKind::AuthResponse,
561 MessageKind::AuthOk,
562 MessageKind::AuthFail,
563 MessageKind::Bye,
564 MessageKind::Ping,
565 MessageKind::Pong,
566 ];
567 for k in handshake {
568 let f = k.allowed_flags();
569 assert!(
570 f.contains(Flags::MORE_FRAMES),
571 "{k:?} must allow MORE_FRAMES"
572 );
573 assert!(
574 !f.contains(Flags::COMPRESSED),
575 "{k:?} must NOT allow COMPRESSED today"
576 );
577 }
578
579 for k in ALL_KINDS {
581 if handshake.contains(k) {
582 continue;
583 }
584 let f = k.allowed_flags();
585 assert!(
586 f.contains(Flags::MORE_FRAMES),
587 "{k:?} must allow MORE_FRAMES"
588 );
589 assert!(f.contains(Flags::COMPRESSED), "{k:?} must allow COMPRESSED");
590 }
591 }
592
593 #[test]
594 fn every_kind_has_unique_byte_value() {
595 let mut seen = std::collections::HashSet::new();
598 for k in ALL_KINDS {
599 let byte = *k as u8;
600 assert!(
601 seen.insert(byte),
602 "byte 0x{byte:02x} reused by {k:?}; catalog has a duplicate"
603 );
604 }
605 }
606
607 #[test]
608 fn from_u8_round_trips_for_every_kind() {
609 for k in ALL_KINDS {
610 let byte = *k as u8;
611 let decoded = MessageKind::from_u8(byte).unwrap_or_else(|| {
612 panic!("from_u8 returned None for catalog entry {k:?} (0x{byte:02x})")
613 });
614 assert_eq!(
615 decoded, *k,
616 "from_u8(0x{byte:02x}) must round-trip back to {k:?}"
617 );
618 }
619 }
620
621 #[test]
622 fn permits_flags_matches_allowed_flags() {
623 assert!(MessageKind::Ping.permits_flags(Flags::MORE_FRAMES));
625 assert!(MessageKind::Ping.permits_flags(Flags::empty()));
626 assert!(!MessageKind::Ping.permits_flags(Flags::COMPRESSED));
627 assert!(!MessageKind::Ping.permits_flags(Flags::COMPRESSED | Flags::MORE_FRAMES));
628
629 assert!(MessageKind::BulkStreamRows.permits_flags(Flags::MORE_FRAMES));
633 assert!(MessageKind::BulkStreamRows.permits_flags(Flags::COMPRESSED));
634 assert!(MessageKind::RowDescription.permits_flags(Flags::MORE_FRAMES));
635 assert!(MessageKind::StreamEnd.permits_flags(Flags::MORE_FRAMES));
636 }
637
638 #[test]
639 fn direction_matrix_is_pinned() {
640 for k in [
642 MessageKind::Hello,
643 MessageKind::AuthResponse,
644 MessageKind::Query,
645 MessageKind::QueryBinary,
646 MessageKind::BulkInsert,
647 MessageKind::BulkInsertBinary,
648 MessageKind::BulkInsertPrevalidated,
649 MessageKind::BulkStreamStart,
650 MessageKind::BulkStreamRows,
651 MessageKind::BulkStreamCommit,
652 MessageKind::Prepare,
653 MessageKind::ExecutePrepared,
654 MessageKind::Get,
655 MessageKind::Delete,
656 MessageKind::Cancel,
657 MessageKind::Compress,
658 MessageKind::SetSession,
659 MessageKind::VectorSearch,
660 MessageKind::GraphTraverse,
661 MessageKind::QueryWithParams,
662 MessageKind::OpenStream,
663 MessageKind::StreamCancel,
664 MessageKind::QueueWaitOpen,
665 ] {
666 assert_eq!(
667 k.direction(),
668 MessageDirection::ClientToServer,
669 "{k:?} should be client-originated"
670 );
671 }
672
673 for k in [
675 MessageKind::HelloAck,
676 MessageKind::AuthRequest,
677 MessageKind::AuthOk,
678 MessageKind::AuthFail,
679 MessageKind::Result,
680 MessageKind::Error,
681 MessageKind::BulkOk,
682 MessageKind::BulkStreamAck,
683 MessageKind::PreparedOk,
684 MessageKind::DeleteOk,
685 MessageKind::Notice,
686 MessageKind::RowDescription,
687 MessageKind::StreamEnd,
688 MessageKind::OpenAck,
689 MessageKind::StreamError,
690 MessageKind::QueueEventPush,
691 MessageKind::QueueWaitTimeout,
692 ] {
693 assert_eq!(
694 k.direction(),
695 MessageDirection::ServerToClient,
696 "{k:?} should be server-originated"
697 );
698 }
699
700 for k in [
704 MessageKind::Bye,
705 MessageKind::Ping,
706 MessageKind::Pong,
707 MessageKind::StreamChunk,
708 ] {
709 assert_eq!(
710 k.direction(),
711 MessageDirection::Both,
712 "{k:?} should be symmetric"
713 );
714 }
715 }
716}