1pub const FRAME_HEADER_SIZE: usize = 16;
20pub const MAX_FRAME_SIZE: u32 = 16 * 1024 * 1024;
21
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct Frame {
24 pub kind: MessageKind,
25 pub flags: Flags,
26 pub stream_id: u16,
27 pub correlation_id: u64,
28 pub payload: Vec<u8>,
29}
30
31impl Frame {
32 pub fn new(kind: MessageKind, correlation_id: u64, payload: Vec<u8>) -> Self {
33 Self {
34 kind,
35 flags: Flags::empty(),
36 stream_id: 0,
37 correlation_id,
38 payload,
39 }
40 }
41
42 pub fn with_stream(mut self, stream_id: u16) -> Self {
43 self.stream_id = stream_id;
44 self
45 }
46
47 pub fn with_flags(mut self, flags: Flags) -> Self {
48 self.flags = flags;
49 self
50 }
51
52 pub fn encoded_len(&self) -> u32 {
53 (FRAME_HEADER_SIZE + self.payload.len()) as u32
54 }
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60#[repr(u8)]
61pub enum MessageKind {
62 Query = 0x01,
64 Result = 0x02,
65 Error = 0x03,
66 BulkInsert = 0x04,
67 BulkOk = 0x05,
68 BulkInsertBinary = 0x06,
69 QueryBinary = 0x07,
70 BulkInsertPrevalidated = 0x08,
71 BulkStreamStart = 0x09,
72 BulkStreamRows = 0x0A,
73 BulkStreamCommit = 0x0B,
74 BulkStreamAck = 0x0C,
75 Prepare = 0x0D,
76 PreparedOk = 0x0E,
77 ExecutePrepared = 0x0F,
78
79 Hello = 0x10,
81 HelloAck = 0x11,
82 AuthRequest = 0x12,
83 AuthResponse = 0x13,
84 AuthOk = 0x14,
85 AuthFail = 0x15,
86 Bye = 0x16,
87 Ping = 0x17,
88 Pong = 0x18,
89 Get = 0x19,
90 Delete = 0x1A,
91 DeleteOk = 0x1B,
92
93 Cancel = 0x20,
95 Compress = 0x21,
96 SetSession = 0x22,
97 Notice = 0x23,
98
99 RowDescription = 0x24,
101 StreamEnd = 0x25,
102
103 VectorSearch = 0x26,
105 GraphTraverse = 0x27,
106 QueryWithParams = 0x28,
107
108 OpenStream = 0x29,
112 OpenAck = 0x2A,
113 StreamChunk = 0x2B,
114 StreamError = 0x2C,
115 StreamCancel = 0x2D,
116}
117
118#[derive(Debug, Clone, Copy, PartialEq, Eq)]
127pub enum MessageClass {
128 DataPlane,
129 Handshake,
130 ControlPlane,
131 Streamed,
132}
133
134#[derive(Debug, Clone, Copy, PartialEq, Eq)]
142pub enum MessageDirection {
143 ClientToServer,
144 ServerToClient,
145 Both,
146}
147
148impl MessageKind {
149 pub fn class(&self) -> MessageClass {
151 match self {
152 Self::Query
157 | Self::Result
158 | Self::Error
159 | Self::BulkInsert
160 | Self::BulkOk
161 | Self::BulkInsertBinary
162 | Self::QueryBinary
163 | Self::BulkInsertPrevalidated
164 | Self::Prepare
165 | Self::PreparedOk
166 | Self::ExecutePrepared
167 | Self::Get
168 | Self::Delete
169 | Self::DeleteOk
170 | Self::VectorSearch
171 | Self::GraphTraverse
172 | Self::QueryWithParams => MessageClass::DataPlane,
173
174 Self::BulkStreamStart
180 | Self::BulkStreamRows
181 | Self::BulkStreamCommit
182 | Self::BulkStreamAck
183 | Self::RowDescription
184 | Self::StreamEnd
185 | Self::OpenStream
186 | Self::OpenAck
187 | Self::StreamChunk
188 | Self::StreamError
189 | Self::StreamCancel => MessageClass::Streamed,
190
191 Self::Hello
193 | Self::HelloAck
194 | Self::AuthRequest
195 | Self::AuthResponse
196 | Self::AuthOk
197 | Self::AuthFail
198 | Self::Bye
199 | Self::Ping
200 | Self::Pong => MessageClass::Handshake,
201
202 Self::Cancel | Self::Compress | Self::SetSession | Self::Notice => {
204 MessageClass::ControlPlane
205 }
206 }
207 }
208
209 pub fn allowed_flags(&self) -> Flags {
219 match self {
220 Self::Hello
223 | Self::HelloAck
224 | Self::AuthRequest
225 | Self::AuthResponse
226 | Self::AuthOk
227 | Self::AuthFail
228 | Self::Bye
229 | Self::Ping
230 | Self::Pong => Flags::MORE_FRAMES,
231
232 _ => Flags::COMPRESSED.insert(Flags::MORE_FRAMES),
234 }
235 }
236
237 pub fn is_handshake(&self) -> bool {
242 matches!(self.class(), MessageClass::Handshake)
243 }
244
245 pub fn permits_flags(&self, flags: Flags) -> bool {
251 let allowed = self.allowed_flags().bits();
252 (flags.bits() & !allowed) == 0
253 }
254
255 pub fn direction(&self) -> MessageDirection {
257 match self {
258 Self::Hello
260 | Self::AuthResponse
261 | Self::Query
262 | Self::QueryBinary
263 | Self::BulkInsert
264 | Self::BulkInsertBinary
265 | Self::BulkInsertPrevalidated
266 | Self::BulkStreamStart
267 | Self::BulkStreamRows
268 | Self::BulkStreamCommit
269 | Self::Prepare
270 | Self::ExecutePrepared
271 | Self::Get
272 | Self::Delete
273 | Self::Cancel
274 | Self::Compress
275 | Self::SetSession
276 | Self::VectorSearch
277 | Self::GraphTraverse
278 | Self::QueryWithParams
279 | Self::OpenStream
280 | Self::StreamCancel => MessageDirection::ClientToServer,
281
282 Self::StreamChunk => MessageDirection::Both,
288
289 Self::HelloAck
291 | Self::AuthRequest
292 | Self::AuthOk
293 | Self::AuthFail
294 | Self::Result
295 | Self::Error
296 | Self::BulkOk
297 | Self::BulkStreamAck
298 | Self::PreparedOk
299 | Self::DeleteOk
300 | Self::Notice
301 | Self::RowDescription
302 | Self::StreamEnd
303 | Self::OpenAck
304 | Self::StreamError => MessageDirection::ServerToClient,
305
306 Self::Bye | Self::Ping | Self::Pong => MessageDirection::Both,
310 }
311 }
312
313 pub fn from_u8(byte: u8) -> Option<Self> {
314 match byte {
315 0x01 => Some(Self::Query),
316 0x02 => Some(Self::Result),
317 0x03 => Some(Self::Error),
318 0x04 => Some(Self::BulkInsert),
319 0x05 => Some(Self::BulkOk),
320 0x06 => Some(Self::BulkInsertBinary),
321 0x07 => Some(Self::QueryBinary),
322 0x08 => Some(Self::BulkInsertPrevalidated),
323 0x09 => Some(Self::BulkStreamStart),
324 0x0A => Some(Self::BulkStreamRows),
325 0x0B => Some(Self::BulkStreamCommit),
326 0x0C => Some(Self::BulkStreamAck),
327 0x0D => Some(Self::Prepare),
328 0x0E => Some(Self::PreparedOk),
329 0x0F => Some(Self::ExecutePrepared),
330 0x10 => Some(Self::Hello),
331 0x11 => Some(Self::HelloAck),
332 0x12 => Some(Self::AuthRequest),
333 0x13 => Some(Self::AuthResponse),
334 0x14 => Some(Self::AuthOk),
335 0x15 => Some(Self::AuthFail),
336 0x16 => Some(Self::Bye),
337 0x17 => Some(Self::Ping),
338 0x18 => Some(Self::Pong),
339 0x19 => Some(Self::Get),
340 0x1A => Some(Self::Delete),
341 0x1B => Some(Self::DeleteOk),
342 0x20 => Some(Self::Cancel),
343 0x21 => Some(Self::Compress),
344 0x22 => Some(Self::SetSession),
345 0x23 => Some(Self::Notice),
346 0x24 => Some(Self::RowDescription),
347 0x25 => Some(Self::StreamEnd),
348 0x26 => Some(Self::VectorSearch),
349 0x27 => Some(Self::GraphTraverse),
350 0x28 => Some(Self::QueryWithParams),
351 0x29 => Some(Self::OpenStream),
352 0x2A => Some(Self::OpenAck),
353 0x2B => Some(Self::StreamChunk),
354 0x2C => Some(Self::StreamError),
355 0x2D => Some(Self::StreamCancel),
356 _ => None,
357 }
358 }
359}
360
361#[derive(Debug, Clone, Copy, PartialEq, Eq)]
362pub struct Flags(u8);
363
364impl Flags {
365 pub const COMPRESSED: Self = Self(0b0000_0001);
366 pub const MORE_FRAMES: Self = Self(0b0000_0010);
367
368 pub const fn empty() -> Self {
369 Self(0)
370 }
371
372 pub const fn bits(self) -> u8 {
373 self.0
374 }
375
376 pub const fn from_bits(bits: u8) -> Self {
377 Self(bits)
378 }
379
380 pub const fn contains(self, other: Self) -> bool {
381 (self.0 & other.0) == other.0
382 }
383
384 pub const fn insert(self, other: Self) -> Self {
385 Self(self.0 | other.0)
386 }
387}
388
389impl std::ops::BitOr for Flags {
390 type Output = Self;
391 fn bitor(self, rhs: Self) -> Self {
392 self.insert(rhs)
393 }
394}
395
396#[cfg(test)]
397mod catalog_tests {
398 use super::*;
399
400 const ALL_KINDS: &[MessageKind] = &[
404 MessageKind::Query,
405 MessageKind::Result,
406 MessageKind::Error,
407 MessageKind::BulkInsert,
408 MessageKind::BulkOk,
409 MessageKind::BulkInsertBinary,
410 MessageKind::QueryBinary,
411 MessageKind::BulkInsertPrevalidated,
412 MessageKind::BulkStreamStart,
413 MessageKind::BulkStreamRows,
414 MessageKind::BulkStreamCommit,
415 MessageKind::BulkStreamAck,
416 MessageKind::Prepare,
417 MessageKind::PreparedOk,
418 MessageKind::ExecutePrepared,
419 MessageKind::Hello,
420 MessageKind::HelloAck,
421 MessageKind::AuthRequest,
422 MessageKind::AuthResponse,
423 MessageKind::AuthOk,
424 MessageKind::AuthFail,
425 MessageKind::Bye,
426 MessageKind::Ping,
427 MessageKind::Pong,
428 MessageKind::Get,
429 MessageKind::Delete,
430 MessageKind::DeleteOk,
431 MessageKind::Cancel,
432 MessageKind::Compress,
433 MessageKind::SetSession,
434 MessageKind::Notice,
435 MessageKind::RowDescription,
436 MessageKind::StreamEnd,
437 MessageKind::VectorSearch,
438 MessageKind::GraphTraverse,
439 MessageKind::QueryWithParams,
440 MessageKind::OpenStream,
441 MessageKind::OpenAck,
442 MessageKind::StreamChunk,
443 MessageKind::StreamError,
444 MessageKind::StreamCancel,
445 ];
446
447 #[test]
448 fn class_matrix_is_pinned() {
449 assert_eq!(MessageKind::Hello.class(), MessageClass::Handshake);
452 assert_eq!(MessageKind::HelloAck.class(), MessageClass::Handshake);
453 assert_eq!(MessageKind::AuthRequest.class(), MessageClass::Handshake);
454 assert_eq!(MessageKind::AuthResponse.class(), MessageClass::Handshake);
455 assert_eq!(MessageKind::AuthOk.class(), MessageClass::Handshake);
456 assert_eq!(MessageKind::AuthFail.class(), MessageClass::Handshake);
457 assert_eq!(MessageKind::Bye.class(), MessageClass::Handshake);
458 assert_eq!(MessageKind::Ping.class(), MessageClass::Handshake);
459 assert_eq!(MessageKind::Pong.class(), MessageClass::Handshake);
460
461 assert_eq!(MessageKind::Query.class(), MessageClass::DataPlane);
463 assert_eq!(MessageKind::Result.class(), MessageClass::DataPlane);
464 assert_eq!(MessageKind::BulkInsert.class(), MessageClass::DataPlane);
465 assert_eq!(MessageKind::Get.class(), MessageClass::DataPlane);
466 assert_eq!(MessageKind::Delete.class(), MessageClass::DataPlane);
467 assert_eq!(MessageKind::DeleteOk.class(), MessageClass::DataPlane);
468 assert_eq!(MessageKind::VectorSearch.class(), MessageClass::DataPlane);
469 assert_eq!(MessageKind::GraphTraverse.class(), MessageClass::DataPlane);
470 assert_eq!(
471 MessageKind::QueryWithParams.class(),
472 MessageClass::DataPlane
473 );
474
475 assert_eq!(MessageKind::BulkStreamStart.class(), MessageClass::Streamed);
477 assert_eq!(MessageKind::BulkStreamRows.class(), MessageClass::Streamed);
478 assert_eq!(
479 MessageKind::BulkStreamCommit.class(),
480 MessageClass::Streamed
481 );
482 assert_eq!(MessageKind::BulkStreamAck.class(), MessageClass::Streamed);
483 assert_eq!(MessageKind::RowDescription.class(), MessageClass::Streamed);
484 assert_eq!(MessageKind::StreamEnd.class(), MessageClass::Streamed);
485
486 assert_eq!(MessageKind::OpenStream.class(), MessageClass::Streamed);
490 assert_eq!(MessageKind::OpenAck.class(), MessageClass::Streamed);
491 assert_eq!(MessageKind::StreamChunk.class(), MessageClass::Streamed);
492 assert_eq!(MessageKind::StreamError.class(), MessageClass::Streamed);
493 assert_eq!(MessageKind::StreamCancel.class(), MessageClass::Streamed);
494
495 assert_eq!(MessageKind::Cancel.class(), MessageClass::ControlPlane);
497 assert_eq!(MessageKind::Compress.class(), MessageClass::ControlPlane);
498 assert_eq!(MessageKind::SetSession.class(), MessageClass::ControlPlane);
499 assert_eq!(MessageKind::Notice.class(), MessageClass::ControlPlane);
500
501 for k in ALL_KINDS {
503 let _ = k.class();
504 }
505 }
506
507 #[test]
508 fn allowed_flags_matrix_is_pinned() {
509 let handshake = [
513 MessageKind::Hello,
514 MessageKind::HelloAck,
515 MessageKind::AuthRequest,
516 MessageKind::AuthResponse,
517 MessageKind::AuthOk,
518 MessageKind::AuthFail,
519 MessageKind::Bye,
520 MessageKind::Ping,
521 MessageKind::Pong,
522 ];
523 for k in handshake {
524 let f = k.allowed_flags();
525 assert!(
526 f.contains(Flags::MORE_FRAMES),
527 "{k:?} must allow MORE_FRAMES"
528 );
529 assert!(
530 !f.contains(Flags::COMPRESSED),
531 "{k:?} must NOT allow COMPRESSED today"
532 );
533 }
534
535 for k in ALL_KINDS {
537 if handshake.contains(k) {
538 continue;
539 }
540 let f = k.allowed_flags();
541 assert!(
542 f.contains(Flags::MORE_FRAMES),
543 "{k:?} must allow MORE_FRAMES"
544 );
545 assert!(f.contains(Flags::COMPRESSED), "{k:?} must allow COMPRESSED");
546 }
547 }
548
549 #[test]
550 fn every_kind_has_unique_byte_value() {
551 let mut seen = std::collections::HashSet::new();
554 for k in ALL_KINDS {
555 let byte = *k as u8;
556 assert!(
557 seen.insert(byte),
558 "byte 0x{byte:02x} reused by {k:?}; catalog has a duplicate"
559 );
560 }
561 }
562
563 #[test]
564 fn from_u8_round_trips_for_every_kind() {
565 for k in ALL_KINDS {
566 let byte = *k as u8;
567 let decoded = MessageKind::from_u8(byte).unwrap_or_else(|| {
568 panic!("from_u8 returned None for catalog entry {k:?} (0x{byte:02x})")
569 });
570 assert_eq!(
571 decoded, *k,
572 "from_u8(0x{byte:02x}) must round-trip back to {k:?}"
573 );
574 }
575 }
576
577 #[test]
578 fn permits_flags_matches_allowed_flags() {
579 assert!(MessageKind::Ping.permits_flags(Flags::MORE_FRAMES));
581 assert!(MessageKind::Ping.permits_flags(Flags::empty()));
582 assert!(!MessageKind::Ping.permits_flags(Flags::COMPRESSED));
583 assert!(!MessageKind::Ping.permits_flags(Flags::COMPRESSED | Flags::MORE_FRAMES));
584
585 assert!(MessageKind::BulkStreamRows.permits_flags(Flags::MORE_FRAMES));
589 assert!(MessageKind::BulkStreamRows.permits_flags(Flags::COMPRESSED));
590 assert!(MessageKind::RowDescription.permits_flags(Flags::MORE_FRAMES));
591 assert!(MessageKind::StreamEnd.permits_flags(Flags::MORE_FRAMES));
592 }
593
594 #[test]
595 fn direction_matrix_is_pinned() {
596 for k in [
598 MessageKind::Hello,
599 MessageKind::AuthResponse,
600 MessageKind::Query,
601 MessageKind::QueryBinary,
602 MessageKind::BulkInsert,
603 MessageKind::BulkInsertBinary,
604 MessageKind::BulkInsertPrevalidated,
605 MessageKind::BulkStreamStart,
606 MessageKind::BulkStreamRows,
607 MessageKind::BulkStreamCommit,
608 MessageKind::Prepare,
609 MessageKind::ExecutePrepared,
610 MessageKind::Get,
611 MessageKind::Delete,
612 MessageKind::Cancel,
613 MessageKind::Compress,
614 MessageKind::SetSession,
615 MessageKind::VectorSearch,
616 MessageKind::GraphTraverse,
617 MessageKind::QueryWithParams,
618 MessageKind::OpenStream,
619 MessageKind::StreamCancel,
620 ] {
621 assert_eq!(
622 k.direction(),
623 MessageDirection::ClientToServer,
624 "{k:?} should be client-originated"
625 );
626 }
627
628 for k in [
630 MessageKind::HelloAck,
631 MessageKind::AuthRequest,
632 MessageKind::AuthOk,
633 MessageKind::AuthFail,
634 MessageKind::Result,
635 MessageKind::Error,
636 MessageKind::BulkOk,
637 MessageKind::BulkStreamAck,
638 MessageKind::PreparedOk,
639 MessageKind::DeleteOk,
640 MessageKind::Notice,
641 MessageKind::RowDescription,
642 MessageKind::StreamEnd,
643 MessageKind::OpenAck,
644 MessageKind::StreamError,
645 ] {
646 assert_eq!(
647 k.direction(),
648 MessageDirection::ServerToClient,
649 "{k:?} should be server-originated"
650 );
651 }
652
653 for k in [
657 MessageKind::Bye,
658 MessageKind::Ping,
659 MessageKind::Pong,
660 MessageKind::StreamChunk,
661 ] {
662 assert_eq!(
663 k.direction(),
664 MessageDirection::Both,
665 "{k:?} should be symmetric"
666 );
667 }
668 }
669}