Skip to main content

reddb_wire/redwire/
frame.rs

1//! RedWire frame layout — 16-byte header + payload, little-endian.
2//!
3//! ```text
4//! ┌──────────────────────────────────────────────────────────┐
5//! │ Header (16 bytes)                                         │
6//! │   u32   length         total frame size, incl. header     │
7//! │   u8    kind           MessageKind                         │
8//! │   u8    flags          COMPRESSED | MORE_FRAMES | …        │
9//! │   u16   stream_id      0 = unsolicited; otherwise multiplex│
10//! │   u64   correlation_id request↔response pairing           │
11//! ├──────────────────────────────────────────────────────────┤
12//! │ Payload (length - 16 bytes)                               │
13//! └──────────────────────────────────────────────────────────┘
14//! ```
15//!
16//! Data-plane kinds live at 0x01..0x0F; handshake / lifecycle at
17//! 0x10..0x1F; control plane at 0x20..0x3F.
18
19pub const FRAME_HEADER_SIZE: usize = 16;
20pub const MAX_FRAME_SIZE: u32 = 16 * 1024 * 1024;
21
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct Frame {
24    pub kind: MessageKind,
25    pub flags: Flags,
26    pub stream_id: u16,
27    pub correlation_id: u64,
28    pub payload: Vec<u8>,
29}
30
31impl Frame {
32    pub fn new(kind: MessageKind, correlation_id: u64, payload: Vec<u8>) -> Self {
33        Self {
34            kind,
35            flags: Flags::empty(),
36            stream_id: 0,
37            correlation_id,
38            payload,
39        }
40    }
41
42    pub fn with_stream(mut self, stream_id: u16) -> Self {
43        self.stream_id = stream_id;
44        self
45    }
46
47    pub fn with_flags(mut self, flags: Flags) -> Self {
48        self.flags = flags;
49        self
50    }
51
52    pub fn encoded_len(&self) -> u32 {
53        (FRAME_HEADER_SIZE + self.payload.len()) as u32
54    }
55}
56
57/// Single-byte message-kind discriminator. Numeric values are
58/// part of the wire spec — never repurpose a value once shipped.
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60#[repr(u8)]
61pub enum MessageKind {
62    // Data-plane codes.
63    Query = 0x01,
64    Result = 0x02,
65    Error = 0x03,
66    BulkInsert = 0x04,
67    BulkOk = 0x05,
68    BulkInsertBinary = 0x06,
69    QueryBinary = 0x07,
70    BulkInsertPrevalidated = 0x08,
71    BulkStreamStart = 0x09,
72    BulkStreamRows = 0x0A,
73    BulkStreamCommit = 0x0B,
74    BulkStreamAck = 0x0C,
75    Prepare = 0x0D,
76    PreparedOk = 0x0E,
77    ExecutePrepared = 0x0F,
78
79    // Handshake / lifecycle.
80    Hello = 0x10,
81    HelloAck = 0x11,
82    AuthRequest = 0x12,
83    AuthResponse = 0x13,
84    AuthOk = 0x14,
85    AuthFail = 0x15,
86    Bye = 0x16,
87    Ping = 0x17,
88    Pong = 0x18,
89    Get = 0x19,
90    Delete = 0x1A,
91    DeleteOk = 0x1B,
92
93    // Control plane.
94    Cancel = 0x20,
95    Compress = 0x21,
96    SetSession = 0x22,
97    Notice = 0x23,
98
99    // Streamed responses.
100    RowDescription = 0x24,
101    StreamEnd = 0x25,
102
103    // RedDB-native data plane.
104    VectorSearch = 0x26,
105    GraphTraverse = 0x27,
106    QueryWithParams = 0x28,
107
108    // Output stream lifecycle (issue #762 / PRD #759 S3). Streamed
109    // class — these envelopes describe an in-flight multiplexed
110    // stream over the existing `stream_id` field.
111    OpenStream = 0x29,
112    OpenAck = 0x2A,
113    StreamChunk = 0x2B,
114    StreamError = 0x2C,
115    StreamCancel = 0x2D,
116
117    // Live queue wait (issue #917 / PRD #915). A `QueueWaitOpen`
118    // registers an async waiter on a queue; the server pushes a
119    // `QueueEventPush` the instant a message becomes deliverable.
120    // Distinct from the OpenStream/StreamChunk output-stream family
121    // (which stays query-result pull) — these carry queue delivery,
122    // multiplexed over the frame's `stream_id` like the other
123    // streamed envelopes.
124    QueueWaitOpen = 0x2E,
125    QueueEventPush = 0x2F,
126    // Live queue-wait timeout (issue #919 / PRD #915). Pushed when a
127    // wait elapses with no deliverable message — a distinct kind so the
128    // outcome is unambiguous on the wire: not a `QueueEventPush`
129    // (delivery), not a `StreamError` (cancellation / failure).
130    QueueWaitTimeout = 0x30,
131}
132
133/// Coarse routing class for a `MessageKind`.
134///
135/// The numeric ranges in the wire spec (0x01..0x0F data plane,
136/// 0x10..0x1F handshake/lifecycle, 0x20..0x3F control plane) are
137/// turned into a typed catalog so dispatch sites can interrogate
138/// a kind's role without re-implementing the comment-grouped match
139/// arms. `Streamed` is split out from `DataPlane` for kinds that
140/// describe an in-flight stream envelope rather than a request/reply.
141#[derive(Debug, Clone, Copy, PartialEq, Eq)]
142pub enum MessageClass {
143    DataPlane,
144    Handshake,
145    ControlPlane,
146    Streamed,
147}
148
149/// Who is allowed to put this kind on the wire.
150///
151/// The handshake and lifecycle frames split cleanly between the two
152/// peers (Hello is client→server, HelloAck is server→client, etc.);
153/// the data-plane request/reply pairs follow the same split. `Both`
154/// is reserved for symmetric frames such as `Bye` (either side may
155/// initiate the disconnect).
156#[derive(Debug, Clone, Copy, PartialEq, Eq)]
157pub enum MessageDirection {
158    ClientToServer,
159    ServerToClient,
160    Both,
161}
162
163impl MessageKind {
164    /// Routing class derived from the comment-grouped wire ranges.
165    pub fn class(&self) -> MessageClass {
166        match self {
167            // 0x01..0x0F — data plane request/reply pairs. The
168            // BulkStream* family is in this range for backward
169            // compatibility but is reclassified as `Streamed` so
170            // dispatch can treat it as a long-running envelope.
171            Self::Query
172            | Self::Result
173            | Self::Error
174            | Self::BulkInsert
175            | Self::BulkOk
176            | Self::BulkInsertBinary
177            | Self::QueryBinary
178            | Self::BulkInsertPrevalidated
179            | Self::Prepare
180            | Self::PreparedOk
181            | Self::ExecutePrepared
182            | Self::Get
183            | Self::Delete
184            | Self::DeleteOk
185            | Self::VectorSearch
186            | Self::GraphTraverse
187            | Self::QueryWithParams => MessageClass::DataPlane,
188
189            // BulkStream* + RowDescription/StreamEnd describe an
190            // in-flight stream rather than a single round trip.
191            // OpenStream / OpenAck / StreamChunk / StreamError /
192            // StreamCancel (issue #762) also describe an in-flight
193            // multiplexed stream and share the same class.
194            Self::BulkStreamStart
195            | Self::BulkStreamRows
196            | Self::BulkStreamCommit
197            | Self::BulkStreamAck
198            | Self::RowDescription
199            | Self::StreamEnd
200            | Self::OpenStream
201            | Self::OpenAck
202            | Self::StreamChunk
203            | Self::StreamError
204            | Self::StreamCancel
205            // Live queue-wait envelopes (issue #917) describe an
206            // in-flight subscription multiplexed by `stream_id`.
207            | Self::QueueWaitOpen
208            | Self::QueueEventPush
209            // Live queue-wait timeout push (issue #919) — same streamed
210            // class, multiplexed by `stream_id`.
211            | Self::QueueWaitTimeout => MessageClass::Streamed,
212
213            // 0x10..0x1F — handshake / lifecycle.
214            Self::Hello
215            | Self::HelloAck
216            | Self::AuthRequest
217            | Self::AuthResponse
218            | Self::AuthOk
219            | Self::AuthFail
220            | Self::Bye
221            | Self::Ping
222            | Self::Pong => MessageClass::Handshake,
223
224            // 0x20..0x3F — control plane.
225            Self::Cancel | Self::Compress | Self::SetSession | Self::Notice => {
226                MessageClass::ControlPlane
227            }
228        }
229    }
230
231    /// Bitset of `Flags` values this kind may legitimately carry.
232    ///
233    /// Pinned conservatively: `MORE_FRAMES` is universal (any frame
234    /// may be split), but `COMPRESSED` is whitelisted only on kinds
235    /// whose payloads are big enough to benefit from compression.
236    /// Handshake/lifecycle payloads (Hello, AuthRequest, Ping, …)
237    /// are tiny and stay uncompressed today; future contributors
238    /// who want to flip that decision must update both the matrix
239    /// and the unit tests that pin it.
240    pub fn allowed_flags(&self) -> Flags {
241        match self {
242            // Handshake / lifecycle — tiny payloads, never
243            // compressed today.
244            Self::Hello
245            | Self::HelloAck
246            | Self::AuthRequest
247            | Self::AuthResponse
248            | Self::AuthOk
249            | Self::AuthFail
250            | Self::Bye
251            | Self::Ping
252            | Self::Pong => Flags::MORE_FRAMES,
253
254            // Everything else may carry both documented flags.
255            _ => Flags::COMPRESSED.insert(Flags::MORE_FRAMES),
256        }
257    }
258
259    /// `true` when this kind belongs to the handshake/lifecycle group
260    /// (Hello, AuthRequest, AuthOk, …, Bye, Ping, Pong). Equivalent to
261    /// `class() == MessageClass::Handshake` and exists so dispatch sites
262    /// can read the predicate without importing `MessageClass`.
263    pub fn is_handshake(&self) -> bool {
264        matches!(self.class(), MessageClass::Handshake)
265    }
266
267    /// `true` when every flag bit in `flags` is in `allowed_flags()`.
268    /// The catalog is the single source of truth for which flag bits a
269    /// kind may carry; both the codec (decode side) and the builder
270    /// (encode side) consult this so a misframed frame fails at the
271    /// boundary rather than reaching the dispatch arms.
272    pub fn permits_flags(&self, flags: Flags) -> bool {
273        let allowed = self.allowed_flags().bits();
274        (flags.bits() & !allowed) == 0
275    }
276
277    /// Which peer is allowed to originate this kind.
278    pub fn direction(&self) -> MessageDirection {
279        match self {
280            // Client-originated requests.
281            Self::Hello
282            | Self::AuthResponse
283            | Self::Query
284            | Self::QueryBinary
285            | Self::BulkInsert
286            | Self::BulkInsertBinary
287            | Self::BulkInsertPrevalidated
288            | Self::BulkStreamStart
289            | Self::BulkStreamRows
290            | Self::BulkStreamCommit
291            | Self::Prepare
292            | Self::ExecutePrepared
293            | Self::Get
294            | Self::Delete
295            | Self::Cancel
296            | Self::Compress
297            | Self::SetSession
298            | Self::VectorSearch
299            | Self::GraphTraverse
300            | Self::QueryWithParams
301            | Self::OpenStream
302            | Self::StreamCancel
303            // Client opens a live queue wait (issue #917).
304            | Self::QueueWaitOpen => MessageDirection::ClientToServer,
305
306            // `StreamChunk` is symmetric (issue #764 / PRD #759 S5):
307            // the server emits chunks on an *output* stream, and the
308            // client emits chunks of rows on an *input* stream. Both
309            // are routed by the frame's `stream_id`, so the kind has
310            // to be legal in either direction.
311            Self::StreamChunk => MessageDirection::Both,
312
313            // Server-originated replies / push frames.
314            Self::HelloAck
315            | Self::AuthRequest
316            | Self::AuthOk
317            | Self::AuthFail
318            | Self::Result
319            | Self::Error
320            | Self::BulkOk
321            | Self::BulkStreamAck
322            | Self::PreparedOk
323            | Self::DeleteOk
324            | Self::Notice
325            | Self::RowDescription
326            | Self::StreamEnd
327            | Self::OpenAck
328            | Self::StreamError
329            // Server pushes the delivered queue message (issue #917)
330            // and the distinct wait-timeout outcome (issue #919).
331            | Self::QueueEventPush
332            | Self::QueueWaitTimeout => MessageDirection::ServerToClient,
333
334            // Symmetric — either peer may initiate. (`StreamChunk` is
335            // also symmetric but has its own arm above — see issue
336            // #764.)
337            Self::Bye | Self::Ping | Self::Pong => MessageDirection::Both,
338        }
339    }
340
341    pub fn from_u8(byte: u8) -> Option<Self> {
342        match byte {
343            0x01 => Some(Self::Query),
344            0x02 => Some(Self::Result),
345            0x03 => Some(Self::Error),
346            0x04 => Some(Self::BulkInsert),
347            0x05 => Some(Self::BulkOk),
348            0x06 => Some(Self::BulkInsertBinary),
349            0x07 => Some(Self::QueryBinary),
350            0x08 => Some(Self::BulkInsertPrevalidated),
351            0x09 => Some(Self::BulkStreamStart),
352            0x0A => Some(Self::BulkStreamRows),
353            0x0B => Some(Self::BulkStreamCommit),
354            0x0C => Some(Self::BulkStreamAck),
355            0x0D => Some(Self::Prepare),
356            0x0E => Some(Self::PreparedOk),
357            0x0F => Some(Self::ExecutePrepared),
358            0x10 => Some(Self::Hello),
359            0x11 => Some(Self::HelloAck),
360            0x12 => Some(Self::AuthRequest),
361            0x13 => Some(Self::AuthResponse),
362            0x14 => Some(Self::AuthOk),
363            0x15 => Some(Self::AuthFail),
364            0x16 => Some(Self::Bye),
365            0x17 => Some(Self::Ping),
366            0x18 => Some(Self::Pong),
367            0x19 => Some(Self::Get),
368            0x1A => Some(Self::Delete),
369            0x1B => Some(Self::DeleteOk),
370            0x20 => Some(Self::Cancel),
371            0x21 => Some(Self::Compress),
372            0x22 => Some(Self::SetSession),
373            0x23 => Some(Self::Notice),
374            0x24 => Some(Self::RowDescription),
375            0x25 => Some(Self::StreamEnd),
376            0x26 => Some(Self::VectorSearch),
377            0x27 => Some(Self::GraphTraverse),
378            0x28 => Some(Self::QueryWithParams),
379            0x29 => Some(Self::OpenStream),
380            0x2A => Some(Self::OpenAck),
381            0x2B => Some(Self::StreamChunk),
382            0x2C => Some(Self::StreamError),
383            0x2D => Some(Self::StreamCancel),
384            0x2E => Some(Self::QueueWaitOpen),
385            0x2F => Some(Self::QueueEventPush),
386            0x30 => Some(Self::QueueWaitTimeout),
387            _ => None,
388        }
389    }
390}
391
392#[derive(Debug, Clone, Copy, PartialEq, Eq)]
393pub struct Flags(u8);
394
395impl Flags {
396    pub const COMPRESSED: Self = Self(0b0000_0001);
397    pub const MORE_FRAMES: Self = Self(0b0000_0010);
398
399    pub const fn empty() -> Self {
400        Self(0)
401    }
402
403    pub const fn bits(self) -> u8 {
404        self.0
405    }
406
407    pub const fn from_bits(bits: u8) -> Self {
408        Self(bits)
409    }
410
411    pub const fn contains(self, other: Self) -> bool {
412        (self.0 & other.0) == other.0
413    }
414
415    pub const fn insert(self, other: Self) -> Self {
416        Self(self.0 | other.0)
417    }
418}
419
420impl std::ops::BitOr for Flags {
421    type Output = Self;
422    fn bitor(self, rhs: Self) -> Self {
423        self.insert(rhs)
424    }
425}
426
427#[cfg(test)]
428mod catalog_tests {
429    use super::*;
430
431    /// Every kind known to the wire spec — kept in sync with the
432    /// `from_u8` table. New entries must be added here so the
433    /// matrix tests below cover them.
434    const ALL_KINDS: &[MessageKind] = &[
435        MessageKind::Query,
436        MessageKind::Result,
437        MessageKind::Error,
438        MessageKind::BulkInsert,
439        MessageKind::BulkOk,
440        MessageKind::BulkInsertBinary,
441        MessageKind::QueryBinary,
442        MessageKind::BulkInsertPrevalidated,
443        MessageKind::BulkStreamStart,
444        MessageKind::BulkStreamRows,
445        MessageKind::BulkStreamCommit,
446        MessageKind::BulkStreamAck,
447        MessageKind::Prepare,
448        MessageKind::PreparedOk,
449        MessageKind::ExecutePrepared,
450        MessageKind::Hello,
451        MessageKind::HelloAck,
452        MessageKind::AuthRequest,
453        MessageKind::AuthResponse,
454        MessageKind::AuthOk,
455        MessageKind::AuthFail,
456        MessageKind::Bye,
457        MessageKind::Ping,
458        MessageKind::Pong,
459        MessageKind::Get,
460        MessageKind::Delete,
461        MessageKind::DeleteOk,
462        MessageKind::Cancel,
463        MessageKind::Compress,
464        MessageKind::SetSession,
465        MessageKind::Notice,
466        MessageKind::RowDescription,
467        MessageKind::StreamEnd,
468        MessageKind::VectorSearch,
469        MessageKind::GraphTraverse,
470        MessageKind::QueryWithParams,
471        MessageKind::OpenStream,
472        MessageKind::OpenAck,
473        MessageKind::StreamChunk,
474        MessageKind::StreamError,
475        MessageKind::StreamCancel,
476        MessageKind::QueueWaitOpen,
477        MessageKind::QueueEventPush,
478        MessageKind::QueueWaitTimeout,
479    ];
480
481    #[test]
482    fn class_matrix_is_pinned() {
483        // Handshake / lifecycle (0x10..0x1F minus Get/Delete/DeleteOk
484        // which are data plane despite the historic numbering).
485        assert_eq!(MessageKind::Hello.class(), MessageClass::Handshake);
486        assert_eq!(MessageKind::HelloAck.class(), MessageClass::Handshake);
487        assert_eq!(MessageKind::AuthRequest.class(), MessageClass::Handshake);
488        assert_eq!(MessageKind::AuthResponse.class(), MessageClass::Handshake);
489        assert_eq!(MessageKind::AuthOk.class(), MessageClass::Handshake);
490        assert_eq!(MessageKind::AuthFail.class(), MessageClass::Handshake);
491        assert_eq!(MessageKind::Bye.class(), MessageClass::Handshake);
492        assert_eq!(MessageKind::Ping.class(), MessageClass::Handshake);
493        assert_eq!(MessageKind::Pong.class(), MessageClass::Handshake);
494
495        // Data plane.
496        assert_eq!(MessageKind::Query.class(), MessageClass::DataPlane);
497        assert_eq!(MessageKind::Result.class(), MessageClass::DataPlane);
498        assert_eq!(MessageKind::BulkInsert.class(), MessageClass::DataPlane);
499        assert_eq!(MessageKind::Get.class(), MessageClass::DataPlane);
500        assert_eq!(MessageKind::Delete.class(), MessageClass::DataPlane);
501        assert_eq!(MessageKind::DeleteOk.class(), MessageClass::DataPlane);
502        assert_eq!(MessageKind::VectorSearch.class(), MessageClass::DataPlane);
503        assert_eq!(MessageKind::GraphTraverse.class(), MessageClass::DataPlane);
504        assert_eq!(
505            MessageKind::QueryWithParams.class(),
506            MessageClass::DataPlane
507        );
508
509        // Streamed envelopes.
510        assert_eq!(MessageKind::BulkStreamStart.class(), MessageClass::Streamed);
511        assert_eq!(MessageKind::BulkStreamRows.class(), MessageClass::Streamed);
512        assert_eq!(
513            MessageKind::BulkStreamCommit.class(),
514            MessageClass::Streamed
515        );
516        assert_eq!(MessageKind::BulkStreamAck.class(), MessageClass::Streamed);
517        assert_eq!(MessageKind::RowDescription.class(), MessageClass::Streamed);
518        assert_eq!(MessageKind::StreamEnd.class(), MessageClass::Streamed);
519
520        // Output stream lifecycle envelopes (issue #762). All in
521        // the Streamed class — they describe in-flight multiplexed
522        // streams identified by the frame's `stream_id`.
523        assert_eq!(MessageKind::OpenStream.class(), MessageClass::Streamed);
524        assert_eq!(MessageKind::OpenAck.class(), MessageClass::Streamed);
525        assert_eq!(MessageKind::StreamChunk.class(), MessageClass::Streamed);
526        assert_eq!(MessageKind::StreamError.class(), MessageClass::Streamed);
527        assert_eq!(MessageKind::StreamCancel.class(), MessageClass::Streamed);
528
529        // Live queue-wait envelopes (issue #917) — Streamed class,
530        // multiplexed by `stream_id` like the output-stream family.
531        assert_eq!(MessageKind::QueueWaitOpen.class(), MessageClass::Streamed);
532        assert_eq!(MessageKind::QueueEventPush.class(), MessageClass::Streamed);
533        // Live queue-wait timeout push (issue #919).
534        assert_eq!(
535            MessageKind::QueueWaitTimeout.class(),
536            MessageClass::Streamed
537        );
538
539        // Control plane.
540        assert_eq!(MessageKind::Cancel.class(), MessageClass::ControlPlane);
541        assert_eq!(MessageKind::Compress.class(), MessageClass::ControlPlane);
542        assert_eq!(MessageKind::SetSession.class(), MessageClass::ControlPlane);
543        assert_eq!(MessageKind::Notice.class(), MessageClass::ControlPlane);
544
545        // Coverage check — every catalogued kind has a class.
546        for k in ALL_KINDS {
547            let _ = k.class();
548        }
549    }
550
551    #[test]
552    fn allowed_flags_matrix_is_pinned() {
553        // Handshake / lifecycle: MORE_FRAMES only — no COMPRESSED on
554        // tiny control-frame payloads. Flipping this requires updating
555        // the matrix below in lockstep.
556        let handshake = [
557            MessageKind::Hello,
558            MessageKind::HelloAck,
559            MessageKind::AuthRequest,
560            MessageKind::AuthResponse,
561            MessageKind::AuthOk,
562            MessageKind::AuthFail,
563            MessageKind::Bye,
564            MessageKind::Ping,
565            MessageKind::Pong,
566        ];
567        for k in handshake {
568            let f = k.allowed_flags();
569            assert!(
570                f.contains(Flags::MORE_FRAMES),
571                "{k:?} must allow MORE_FRAMES"
572            );
573            assert!(
574                !f.contains(Flags::COMPRESSED),
575                "{k:?} must NOT allow COMPRESSED today"
576            );
577        }
578
579        // Everything else: both documented flags allowed.
580        for k in ALL_KINDS {
581            if handshake.contains(k) {
582                continue;
583            }
584            let f = k.allowed_flags();
585            assert!(
586                f.contains(Flags::MORE_FRAMES),
587                "{k:?} must allow MORE_FRAMES"
588            );
589            assert!(f.contains(Flags::COMPRESSED), "{k:?} must allow COMPRESSED");
590        }
591    }
592
593    #[test]
594    fn every_kind_has_unique_byte_value() {
595        // The byte value is the wire spec — two kinds sharing a value
596        // would silently corrupt dispatch. The catalog must reject it.
597        let mut seen = std::collections::HashSet::new();
598        for k in ALL_KINDS {
599            let byte = *k as u8;
600            assert!(
601                seen.insert(byte),
602                "byte 0x{byte:02x} reused by {k:?}; catalog has a duplicate"
603            );
604        }
605    }
606
607    #[test]
608    fn from_u8_round_trips_for_every_kind() {
609        for k in ALL_KINDS {
610            let byte = *k as u8;
611            let decoded = MessageKind::from_u8(byte).unwrap_or_else(|| {
612                panic!("from_u8 returned None for catalog entry {k:?} (0x{byte:02x})")
613            });
614            assert_eq!(
615                decoded, *k,
616                "from_u8(0x{byte:02x}) must round-trip back to {k:?}"
617            );
618        }
619    }
620
621    #[test]
622    fn permits_flags_matches_allowed_flags() {
623        // Handshake kinds reject COMPRESSED, accept MORE_FRAMES.
624        assert!(MessageKind::Ping.permits_flags(Flags::MORE_FRAMES));
625        assert!(MessageKind::Ping.permits_flags(Flags::empty()));
626        assert!(!MessageKind::Ping.permits_flags(Flags::COMPRESSED));
627        assert!(!MessageKind::Ping.permits_flags(Flags::COMPRESSED | Flags::MORE_FRAMES));
628
629        // Streamed kinds accept both documented bits — the
630        // MORE_FRAMES invariant for in-flight stream envelopes is
631        // declared here through `allowed_flags`.
632        assert!(MessageKind::BulkStreamRows.permits_flags(Flags::MORE_FRAMES));
633        assert!(MessageKind::BulkStreamRows.permits_flags(Flags::COMPRESSED));
634        assert!(MessageKind::RowDescription.permits_flags(Flags::MORE_FRAMES));
635        assert!(MessageKind::StreamEnd.permits_flags(Flags::MORE_FRAMES));
636    }
637
638    #[test]
639    fn direction_matrix_is_pinned() {
640        // Client → Server.
641        for k in [
642            MessageKind::Hello,
643            MessageKind::AuthResponse,
644            MessageKind::Query,
645            MessageKind::QueryBinary,
646            MessageKind::BulkInsert,
647            MessageKind::BulkInsertBinary,
648            MessageKind::BulkInsertPrevalidated,
649            MessageKind::BulkStreamStart,
650            MessageKind::BulkStreamRows,
651            MessageKind::BulkStreamCommit,
652            MessageKind::Prepare,
653            MessageKind::ExecutePrepared,
654            MessageKind::Get,
655            MessageKind::Delete,
656            MessageKind::Cancel,
657            MessageKind::Compress,
658            MessageKind::SetSession,
659            MessageKind::VectorSearch,
660            MessageKind::GraphTraverse,
661            MessageKind::QueryWithParams,
662            MessageKind::OpenStream,
663            MessageKind::StreamCancel,
664            MessageKind::QueueWaitOpen,
665        ] {
666            assert_eq!(
667                k.direction(),
668                MessageDirection::ClientToServer,
669                "{k:?} should be client-originated"
670            );
671        }
672
673        // Server → Client.
674        for k in [
675            MessageKind::HelloAck,
676            MessageKind::AuthRequest,
677            MessageKind::AuthOk,
678            MessageKind::AuthFail,
679            MessageKind::Result,
680            MessageKind::Error,
681            MessageKind::BulkOk,
682            MessageKind::BulkStreamAck,
683            MessageKind::PreparedOk,
684            MessageKind::DeleteOk,
685            MessageKind::Notice,
686            MessageKind::RowDescription,
687            MessageKind::StreamEnd,
688            MessageKind::OpenAck,
689            MessageKind::StreamError,
690            MessageKind::QueueEventPush,
691            MessageKind::QueueWaitTimeout,
692        ] {
693            assert_eq!(
694                k.direction(),
695                MessageDirection::ServerToClient,
696                "{k:?} should be server-originated"
697            );
698        }
699
700        // Symmetric. `StreamChunk` (issue #764) is symmetric: the
701        // server emits it on output streams, the client emits it on
702        // input streams.
703        for k in [
704            MessageKind::Bye,
705            MessageKind::Ping,
706            MessageKind::Pong,
707            MessageKind::StreamChunk,
708        ] {
709            assert_eq!(
710                k.direction(),
711                MessageDirection::Both,
712                "{k:?} should be symmetric"
713            );
714        }
715    }
716}