Skip to main content

reddb_wire/redwire/
frame.rs

1//! RedWire frame layout — 16-byte header + payload, little-endian.
2//!
3//! ```text
4//! ┌──────────────────────────────────────────────────────────┐
5//! │ Header (16 bytes)                                         │
6//! │   u32   length         total frame size, incl. header     │
7//! │   u8    kind           MessageKind                         │
8//! │   u8    flags          COMPRESSED | MORE_FRAMES | …        │
9//! │   u16   stream_id      0 = unsolicited; otherwise multiplex│
10//! │   u64   correlation_id request↔response pairing           │
11//! ├──────────────────────────────────────────────────────────┤
12//! │ Payload (length - 16 bytes)                               │
13//! └──────────────────────────────────────────────────────────┘
14//! ```
15//!
16//! Data-plane kinds live at 0x01..0x0F; handshake / lifecycle at
17//! 0x10..0x1F; control plane at 0x20..0x3F.
18
19pub const FRAME_HEADER_SIZE: usize = 16;
20pub const MAX_FRAME_SIZE: u32 = 16 * 1024 * 1024;
21
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct Frame {
24    pub kind: MessageKind,
25    pub flags: Flags,
26    pub stream_id: u16,
27    pub correlation_id: u64,
28    pub payload: Vec<u8>,
29}
30
31impl Frame {
32    pub fn new(kind: MessageKind, correlation_id: u64, payload: Vec<u8>) -> Self {
33        Self {
34            kind,
35            flags: Flags::empty(),
36            stream_id: 0,
37            correlation_id,
38            payload,
39        }
40    }
41
42    pub fn with_stream(mut self, stream_id: u16) -> Self {
43        self.stream_id = stream_id;
44        self
45    }
46
47    pub fn with_flags(mut self, flags: Flags) -> Self {
48        self.flags = flags;
49        self
50    }
51
52    pub fn encoded_len(&self) -> u32 {
53        (FRAME_HEADER_SIZE + self.payload.len()) as u32
54    }
55}
56
57/// Single-byte message-kind discriminator. Numeric values are
58/// part of the wire spec — never repurpose a value once shipped.
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60#[repr(u8)]
61pub enum MessageKind {
62    // Data-plane codes.
63    Query = 0x01,
64    Result = 0x02,
65    Error = 0x03,
66    BulkInsert = 0x04,
67    BulkOk = 0x05,
68    BulkInsertBinary = 0x06,
69    QueryBinary = 0x07,
70    BulkInsertPrevalidated = 0x08,
71    BulkStreamStart = 0x09,
72    BulkStreamRows = 0x0A,
73    BulkStreamCommit = 0x0B,
74    BulkStreamAck = 0x0C,
75    Prepare = 0x0D,
76    PreparedOk = 0x0E,
77    ExecutePrepared = 0x0F,
78
79    // Handshake / lifecycle.
80    Hello = 0x10,
81    HelloAck = 0x11,
82    AuthRequest = 0x12,
83    AuthResponse = 0x13,
84    AuthOk = 0x14,
85    AuthFail = 0x15,
86    Bye = 0x16,
87    Ping = 0x17,
88    Pong = 0x18,
89    Get = 0x19,
90    Delete = 0x1A,
91    DeleteOk = 0x1B,
92
93    // Control plane.
94    Cancel = 0x20,
95    Compress = 0x21,
96    SetSession = 0x22,
97    Notice = 0x23,
98
99    // Streamed responses.
100    RowDescription = 0x24,
101    StreamEnd = 0x25,
102
103    // RedDB-native data plane.
104    VectorSearch = 0x26,
105    GraphTraverse = 0x27,
106    QueryWithParams = 0x28,
107
108    // Output stream lifecycle (issue #762 / PRD #759 S3). Streamed
109    // class — these envelopes describe an in-flight multiplexed
110    // stream over the existing `stream_id` field.
111    OpenStream = 0x29,
112    OpenAck = 0x2A,
113    StreamChunk = 0x2B,
114    StreamError = 0x2C,
115    StreamCancel = 0x2D,
116}
117
118/// Coarse routing class for a `MessageKind`.
119///
120/// The numeric ranges in the wire spec (0x01..0x0F data plane,
121/// 0x10..0x1F handshake/lifecycle, 0x20..0x3F control plane) are
122/// turned into a typed catalog so dispatch sites can interrogate
123/// a kind's role without re-implementing the comment-grouped match
124/// arms. `Streamed` is split out from `DataPlane` for kinds that
125/// describe an in-flight stream envelope rather than a request/reply.
126#[derive(Debug, Clone, Copy, PartialEq, Eq)]
127pub enum MessageClass {
128    DataPlane,
129    Handshake,
130    ControlPlane,
131    Streamed,
132}
133
134/// Who is allowed to put this kind on the wire.
135///
136/// The handshake and lifecycle frames split cleanly between the two
137/// peers (Hello is client→server, HelloAck is server→client, etc.);
138/// the data-plane request/reply pairs follow the same split. `Both`
139/// is reserved for symmetric frames such as `Bye` (either side may
140/// initiate the disconnect).
141#[derive(Debug, Clone, Copy, PartialEq, Eq)]
142pub enum MessageDirection {
143    ClientToServer,
144    ServerToClient,
145    Both,
146}
147
148impl MessageKind {
149    /// Routing class derived from the comment-grouped wire ranges.
150    pub fn class(&self) -> MessageClass {
151        match self {
152            // 0x01..0x0F — data plane request/reply pairs. The
153            // BulkStream* family is in this range for backward
154            // compatibility but is reclassified as `Streamed` so
155            // dispatch can treat it as a long-running envelope.
156            Self::Query
157            | Self::Result
158            | Self::Error
159            | Self::BulkInsert
160            | Self::BulkOk
161            | Self::BulkInsertBinary
162            | Self::QueryBinary
163            | Self::BulkInsertPrevalidated
164            | Self::Prepare
165            | Self::PreparedOk
166            | Self::ExecutePrepared
167            | Self::Get
168            | Self::Delete
169            | Self::DeleteOk
170            | Self::VectorSearch
171            | Self::GraphTraverse
172            | Self::QueryWithParams => MessageClass::DataPlane,
173
174            // BulkStream* + RowDescription/StreamEnd describe an
175            // in-flight stream rather than a single round trip.
176            // OpenStream / OpenAck / StreamChunk / StreamError /
177            // StreamCancel (issue #762) also describe an in-flight
178            // multiplexed stream and share the same class.
179            Self::BulkStreamStart
180            | Self::BulkStreamRows
181            | Self::BulkStreamCommit
182            | Self::BulkStreamAck
183            | Self::RowDescription
184            | Self::StreamEnd
185            | Self::OpenStream
186            | Self::OpenAck
187            | Self::StreamChunk
188            | Self::StreamError
189            | Self::StreamCancel => MessageClass::Streamed,
190
191            // 0x10..0x1F — handshake / lifecycle.
192            Self::Hello
193            | Self::HelloAck
194            | Self::AuthRequest
195            | Self::AuthResponse
196            | Self::AuthOk
197            | Self::AuthFail
198            | Self::Bye
199            | Self::Ping
200            | Self::Pong => MessageClass::Handshake,
201
202            // 0x20..0x3F — control plane.
203            Self::Cancel | Self::Compress | Self::SetSession | Self::Notice => {
204                MessageClass::ControlPlane
205            }
206        }
207    }
208
209    /// Bitset of `Flags` values this kind may legitimately carry.
210    ///
211    /// Pinned conservatively: `MORE_FRAMES` is universal (any frame
212    /// may be split), but `COMPRESSED` is whitelisted only on kinds
213    /// whose payloads are big enough to benefit from compression.
214    /// Handshake/lifecycle payloads (Hello, AuthRequest, Ping, …)
215    /// are tiny and stay uncompressed today; future contributors
216    /// who want to flip that decision must update both the matrix
217    /// and the unit tests that pin it.
218    pub fn allowed_flags(&self) -> Flags {
219        match self {
220            // Handshake / lifecycle — tiny payloads, never
221            // compressed today.
222            Self::Hello
223            | Self::HelloAck
224            | Self::AuthRequest
225            | Self::AuthResponse
226            | Self::AuthOk
227            | Self::AuthFail
228            | Self::Bye
229            | Self::Ping
230            | Self::Pong => Flags::MORE_FRAMES,
231
232            // Everything else may carry both documented flags.
233            _ => Flags::COMPRESSED.insert(Flags::MORE_FRAMES),
234        }
235    }
236
237    /// `true` when this kind belongs to the handshake/lifecycle group
238    /// (Hello, AuthRequest, AuthOk, …, Bye, Ping, Pong). Equivalent to
239    /// `class() == MessageClass::Handshake` and exists so dispatch sites
240    /// can read the predicate without importing `MessageClass`.
241    pub fn is_handshake(&self) -> bool {
242        matches!(self.class(), MessageClass::Handshake)
243    }
244
245    /// `true` when every flag bit in `flags` is in `allowed_flags()`.
246    /// The catalog is the single source of truth for which flag bits a
247    /// kind may carry; both the codec (decode side) and the builder
248    /// (encode side) consult this so a misframed frame fails at the
249    /// boundary rather than reaching the dispatch arms.
250    pub fn permits_flags(&self, flags: Flags) -> bool {
251        let allowed = self.allowed_flags().bits();
252        (flags.bits() & !allowed) == 0
253    }
254
255    /// Which peer is allowed to originate this kind.
256    pub fn direction(&self) -> MessageDirection {
257        match self {
258            // Client-originated requests.
259            Self::Hello
260            | Self::AuthResponse
261            | Self::Query
262            | Self::QueryBinary
263            | Self::BulkInsert
264            | Self::BulkInsertBinary
265            | Self::BulkInsertPrevalidated
266            | Self::BulkStreamStart
267            | Self::BulkStreamRows
268            | Self::BulkStreamCommit
269            | Self::Prepare
270            | Self::ExecutePrepared
271            | Self::Get
272            | Self::Delete
273            | Self::Cancel
274            | Self::Compress
275            | Self::SetSession
276            | Self::VectorSearch
277            | Self::GraphTraverse
278            | Self::QueryWithParams
279            | Self::OpenStream
280            | Self::StreamCancel => MessageDirection::ClientToServer,
281
282            // `StreamChunk` is symmetric (issue #764 / PRD #759 S5):
283            // the server emits chunks on an *output* stream, and the
284            // client emits chunks of rows on an *input* stream. Both
285            // are routed by the frame's `stream_id`, so the kind has
286            // to be legal in either direction.
287            Self::StreamChunk => MessageDirection::Both,
288
289            // Server-originated replies / push frames.
290            Self::HelloAck
291            | Self::AuthRequest
292            | Self::AuthOk
293            | Self::AuthFail
294            | Self::Result
295            | Self::Error
296            | Self::BulkOk
297            | Self::BulkStreamAck
298            | Self::PreparedOk
299            | Self::DeleteOk
300            | Self::Notice
301            | Self::RowDescription
302            | Self::StreamEnd
303            | Self::OpenAck
304            | Self::StreamError => MessageDirection::ServerToClient,
305
306            // Symmetric — either peer may initiate. (`StreamChunk` is
307            // also symmetric but has its own arm above — see issue
308            // #764.)
309            Self::Bye | Self::Ping | Self::Pong => MessageDirection::Both,
310        }
311    }
312
313    pub fn from_u8(byte: u8) -> Option<Self> {
314        match byte {
315            0x01 => Some(Self::Query),
316            0x02 => Some(Self::Result),
317            0x03 => Some(Self::Error),
318            0x04 => Some(Self::BulkInsert),
319            0x05 => Some(Self::BulkOk),
320            0x06 => Some(Self::BulkInsertBinary),
321            0x07 => Some(Self::QueryBinary),
322            0x08 => Some(Self::BulkInsertPrevalidated),
323            0x09 => Some(Self::BulkStreamStart),
324            0x0A => Some(Self::BulkStreamRows),
325            0x0B => Some(Self::BulkStreamCommit),
326            0x0C => Some(Self::BulkStreamAck),
327            0x0D => Some(Self::Prepare),
328            0x0E => Some(Self::PreparedOk),
329            0x0F => Some(Self::ExecutePrepared),
330            0x10 => Some(Self::Hello),
331            0x11 => Some(Self::HelloAck),
332            0x12 => Some(Self::AuthRequest),
333            0x13 => Some(Self::AuthResponse),
334            0x14 => Some(Self::AuthOk),
335            0x15 => Some(Self::AuthFail),
336            0x16 => Some(Self::Bye),
337            0x17 => Some(Self::Ping),
338            0x18 => Some(Self::Pong),
339            0x19 => Some(Self::Get),
340            0x1A => Some(Self::Delete),
341            0x1B => Some(Self::DeleteOk),
342            0x20 => Some(Self::Cancel),
343            0x21 => Some(Self::Compress),
344            0x22 => Some(Self::SetSession),
345            0x23 => Some(Self::Notice),
346            0x24 => Some(Self::RowDescription),
347            0x25 => Some(Self::StreamEnd),
348            0x26 => Some(Self::VectorSearch),
349            0x27 => Some(Self::GraphTraverse),
350            0x28 => Some(Self::QueryWithParams),
351            0x29 => Some(Self::OpenStream),
352            0x2A => Some(Self::OpenAck),
353            0x2B => Some(Self::StreamChunk),
354            0x2C => Some(Self::StreamError),
355            0x2D => Some(Self::StreamCancel),
356            _ => None,
357        }
358    }
359}
360
361#[derive(Debug, Clone, Copy, PartialEq, Eq)]
362pub struct Flags(u8);
363
364impl Flags {
365    pub const COMPRESSED: Self = Self(0b0000_0001);
366    pub const MORE_FRAMES: Self = Self(0b0000_0010);
367
368    pub const fn empty() -> Self {
369        Self(0)
370    }
371
372    pub const fn bits(self) -> u8 {
373        self.0
374    }
375
376    pub const fn from_bits(bits: u8) -> Self {
377        Self(bits)
378    }
379
380    pub const fn contains(self, other: Self) -> bool {
381        (self.0 & other.0) == other.0
382    }
383
384    pub const fn insert(self, other: Self) -> Self {
385        Self(self.0 | other.0)
386    }
387}
388
389impl std::ops::BitOr for Flags {
390    type Output = Self;
391    fn bitor(self, rhs: Self) -> Self {
392        self.insert(rhs)
393    }
394}
395
396#[cfg(test)]
397mod catalog_tests {
398    use super::*;
399
400    /// Every kind known to the wire spec — kept in sync with the
401    /// `from_u8` table. New entries must be added here so the
402    /// matrix tests below cover them.
403    const ALL_KINDS: &[MessageKind] = &[
404        MessageKind::Query,
405        MessageKind::Result,
406        MessageKind::Error,
407        MessageKind::BulkInsert,
408        MessageKind::BulkOk,
409        MessageKind::BulkInsertBinary,
410        MessageKind::QueryBinary,
411        MessageKind::BulkInsertPrevalidated,
412        MessageKind::BulkStreamStart,
413        MessageKind::BulkStreamRows,
414        MessageKind::BulkStreamCommit,
415        MessageKind::BulkStreamAck,
416        MessageKind::Prepare,
417        MessageKind::PreparedOk,
418        MessageKind::ExecutePrepared,
419        MessageKind::Hello,
420        MessageKind::HelloAck,
421        MessageKind::AuthRequest,
422        MessageKind::AuthResponse,
423        MessageKind::AuthOk,
424        MessageKind::AuthFail,
425        MessageKind::Bye,
426        MessageKind::Ping,
427        MessageKind::Pong,
428        MessageKind::Get,
429        MessageKind::Delete,
430        MessageKind::DeleteOk,
431        MessageKind::Cancel,
432        MessageKind::Compress,
433        MessageKind::SetSession,
434        MessageKind::Notice,
435        MessageKind::RowDescription,
436        MessageKind::StreamEnd,
437        MessageKind::VectorSearch,
438        MessageKind::GraphTraverse,
439        MessageKind::QueryWithParams,
440        MessageKind::OpenStream,
441        MessageKind::OpenAck,
442        MessageKind::StreamChunk,
443        MessageKind::StreamError,
444        MessageKind::StreamCancel,
445    ];
446
447    #[test]
448    fn class_matrix_is_pinned() {
449        // Handshake / lifecycle (0x10..0x1F minus Get/Delete/DeleteOk
450        // which are data plane despite the historic numbering).
451        assert_eq!(MessageKind::Hello.class(), MessageClass::Handshake);
452        assert_eq!(MessageKind::HelloAck.class(), MessageClass::Handshake);
453        assert_eq!(MessageKind::AuthRequest.class(), MessageClass::Handshake);
454        assert_eq!(MessageKind::AuthResponse.class(), MessageClass::Handshake);
455        assert_eq!(MessageKind::AuthOk.class(), MessageClass::Handshake);
456        assert_eq!(MessageKind::AuthFail.class(), MessageClass::Handshake);
457        assert_eq!(MessageKind::Bye.class(), MessageClass::Handshake);
458        assert_eq!(MessageKind::Ping.class(), MessageClass::Handshake);
459        assert_eq!(MessageKind::Pong.class(), MessageClass::Handshake);
460
461        // Data plane.
462        assert_eq!(MessageKind::Query.class(), MessageClass::DataPlane);
463        assert_eq!(MessageKind::Result.class(), MessageClass::DataPlane);
464        assert_eq!(MessageKind::BulkInsert.class(), MessageClass::DataPlane);
465        assert_eq!(MessageKind::Get.class(), MessageClass::DataPlane);
466        assert_eq!(MessageKind::Delete.class(), MessageClass::DataPlane);
467        assert_eq!(MessageKind::DeleteOk.class(), MessageClass::DataPlane);
468        assert_eq!(MessageKind::VectorSearch.class(), MessageClass::DataPlane);
469        assert_eq!(MessageKind::GraphTraverse.class(), MessageClass::DataPlane);
470        assert_eq!(
471            MessageKind::QueryWithParams.class(),
472            MessageClass::DataPlane
473        );
474
475        // Streamed envelopes.
476        assert_eq!(MessageKind::BulkStreamStart.class(), MessageClass::Streamed);
477        assert_eq!(MessageKind::BulkStreamRows.class(), MessageClass::Streamed);
478        assert_eq!(
479            MessageKind::BulkStreamCommit.class(),
480            MessageClass::Streamed
481        );
482        assert_eq!(MessageKind::BulkStreamAck.class(), MessageClass::Streamed);
483        assert_eq!(MessageKind::RowDescription.class(), MessageClass::Streamed);
484        assert_eq!(MessageKind::StreamEnd.class(), MessageClass::Streamed);
485
486        // Output stream lifecycle envelopes (issue #762). All in
487        // the Streamed class — they describe in-flight multiplexed
488        // streams identified by the frame's `stream_id`.
489        assert_eq!(MessageKind::OpenStream.class(), MessageClass::Streamed);
490        assert_eq!(MessageKind::OpenAck.class(), MessageClass::Streamed);
491        assert_eq!(MessageKind::StreamChunk.class(), MessageClass::Streamed);
492        assert_eq!(MessageKind::StreamError.class(), MessageClass::Streamed);
493        assert_eq!(MessageKind::StreamCancel.class(), MessageClass::Streamed);
494
495        // Control plane.
496        assert_eq!(MessageKind::Cancel.class(), MessageClass::ControlPlane);
497        assert_eq!(MessageKind::Compress.class(), MessageClass::ControlPlane);
498        assert_eq!(MessageKind::SetSession.class(), MessageClass::ControlPlane);
499        assert_eq!(MessageKind::Notice.class(), MessageClass::ControlPlane);
500
501        // Coverage check — every catalogued kind has a class.
502        for k in ALL_KINDS {
503            let _ = k.class();
504        }
505    }
506
507    #[test]
508    fn allowed_flags_matrix_is_pinned() {
509        // Handshake / lifecycle: MORE_FRAMES only — no COMPRESSED on
510        // tiny control-frame payloads. Flipping this requires updating
511        // the matrix below in lockstep.
512        let handshake = [
513            MessageKind::Hello,
514            MessageKind::HelloAck,
515            MessageKind::AuthRequest,
516            MessageKind::AuthResponse,
517            MessageKind::AuthOk,
518            MessageKind::AuthFail,
519            MessageKind::Bye,
520            MessageKind::Ping,
521            MessageKind::Pong,
522        ];
523        for k in handshake {
524            let f = k.allowed_flags();
525            assert!(
526                f.contains(Flags::MORE_FRAMES),
527                "{k:?} must allow MORE_FRAMES"
528            );
529            assert!(
530                !f.contains(Flags::COMPRESSED),
531                "{k:?} must NOT allow COMPRESSED today"
532            );
533        }
534
535        // Everything else: both documented flags allowed.
536        for k in ALL_KINDS {
537            if handshake.contains(k) {
538                continue;
539            }
540            let f = k.allowed_flags();
541            assert!(
542                f.contains(Flags::MORE_FRAMES),
543                "{k:?} must allow MORE_FRAMES"
544            );
545            assert!(f.contains(Flags::COMPRESSED), "{k:?} must allow COMPRESSED");
546        }
547    }
548
549    #[test]
550    fn every_kind_has_unique_byte_value() {
551        // The byte value is the wire spec — two kinds sharing a value
552        // would silently corrupt dispatch. The catalog must reject it.
553        let mut seen = std::collections::HashSet::new();
554        for k in ALL_KINDS {
555            let byte = *k as u8;
556            assert!(
557                seen.insert(byte),
558                "byte 0x{byte:02x} reused by {k:?}; catalog has a duplicate"
559            );
560        }
561    }
562
563    #[test]
564    fn from_u8_round_trips_for_every_kind() {
565        for k in ALL_KINDS {
566            let byte = *k as u8;
567            let decoded = MessageKind::from_u8(byte).unwrap_or_else(|| {
568                panic!("from_u8 returned None for catalog entry {k:?} (0x{byte:02x})")
569            });
570            assert_eq!(
571                decoded, *k,
572                "from_u8(0x{byte:02x}) must round-trip back to {k:?}"
573            );
574        }
575    }
576
577    #[test]
578    fn permits_flags_matches_allowed_flags() {
579        // Handshake kinds reject COMPRESSED, accept MORE_FRAMES.
580        assert!(MessageKind::Ping.permits_flags(Flags::MORE_FRAMES));
581        assert!(MessageKind::Ping.permits_flags(Flags::empty()));
582        assert!(!MessageKind::Ping.permits_flags(Flags::COMPRESSED));
583        assert!(!MessageKind::Ping.permits_flags(Flags::COMPRESSED | Flags::MORE_FRAMES));
584
585        // Streamed kinds accept both documented bits — the
586        // MORE_FRAMES invariant for in-flight stream envelopes is
587        // declared here through `allowed_flags`.
588        assert!(MessageKind::BulkStreamRows.permits_flags(Flags::MORE_FRAMES));
589        assert!(MessageKind::BulkStreamRows.permits_flags(Flags::COMPRESSED));
590        assert!(MessageKind::RowDescription.permits_flags(Flags::MORE_FRAMES));
591        assert!(MessageKind::StreamEnd.permits_flags(Flags::MORE_FRAMES));
592    }
593
594    #[test]
595    fn direction_matrix_is_pinned() {
596        // Client → Server.
597        for k in [
598            MessageKind::Hello,
599            MessageKind::AuthResponse,
600            MessageKind::Query,
601            MessageKind::QueryBinary,
602            MessageKind::BulkInsert,
603            MessageKind::BulkInsertBinary,
604            MessageKind::BulkInsertPrevalidated,
605            MessageKind::BulkStreamStart,
606            MessageKind::BulkStreamRows,
607            MessageKind::BulkStreamCommit,
608            MessageKind::Prepare,
609            MessageKind::ExecutePrepared,
610            MessageKind::Get,
611            MessageKind::Delete,
612            MessageKind::Cancel,
613            MessageKind::Compress,
614            MessageKind::SetSession,
615            MessageKind::VectorSearch,
616            MessageKind::GraphTraverse,
617            MessageKind::QueryWithParams,
618            MessageKind::OpenStream,
619            MessageKind::StreamCancel,
620        ] {
621            assert_eq!(
622                k.direction(),
623                MessageDirection::ClientToServer,
624                "{k:?} should be client-originated"
625            );
626        }
627
628        // Server → Client.
629        for k in [
630            MessageKind::HelloAck,
631            MessageKind::AuthRequest,
632            MessageKind::AuthOk,
633            MessageKind::AuthFail,
634            MessageKind::Result,
635            MessageKind::Error,
636            MessageKind::BulkOk,
637            MessageKind::BulkStreamAck,
638            MessageKind::PreparedOk,
639            MessageKind::DeleteOk,
640            MessageKind::Notice,
641            MessageKind::RowDescription,
642            MessageKind::StreamEnd,
643            MessageKind::OpenAck,
644            MessageKind::StreamError,
645        ] {
646            assert_eq!(
647                k.direction(),
648                MessageDirection::ServerToClient,
649                "{k:?} should be server-originated"
650            );
651        }
652
653        // Symmetric. `StreamChunk` (issue #764) is symmetric: the
654        // server emits it on output streams, the client emits it on
655        // input streams.
656        for k in [
657            MessageKind::Bye,
658            MessageKind::Ping,
659            MessageKind::Pong,
660            MessageKind::StreamChunk,
661        ] {
662            assert_eq!(
663                k.direction(),
664                MessageDirection::Both,
665                "{k:?} should be symmetric"
666            );
667        }
668    }
669}