Skip to main content

liminal/protocol/
frame.rs

1use super::{
2    causal::MessageId,
3    envelope::{MessageEnvelope, SchemaId},
4    error::ProtocolError,
5    version::ProtocolVersion,
6};
7
8/// Number of bytes in every serialized frame header.
9pub const HEADER_LEN: usize = 10;
10
11/// Frame-flag bit set on a [`Frame::ConversationMessage`] to request a correlated
12/// reply.
13///
14/// A client sets this bit on the request frame of a request-reply round trip. The
15/// server, after delivering the message to the conversation participant, drains
16/// the participant's reply and sends it back as a `ConversationMessage` carrying
17/// the same `conversation_id` (the correlation key) and this same flag bit. A
18/// `ConversationMessage` WITHOUT this bit keeps the pre-existing fire-and-forget
19/// semantics: the server stays silent on success. The bit travels in the frame
20/// header's `flags` byte, which the codec already round-trips, so no wire-format
21/// change is required.
22pub const CONVERSATION_REPLY_REQUESTED_FLAG: u8 = 0x01;
23
24/// Frame-flag bit set on a [`Frame::Publish`] to declare that the frame body
25/// carries a trailing idempotency-key string field (the dedup-on-delivery key).
26///
27/// A publisher sets this bit when it wants the server to consult its dedup cache
28/// keyed by the trailing idempotency key, delivering the message to subscribers
29/// AT MOST ONCE across re-publishes of the same key. A `Publish` frame WITHOUT
30/// this bit keeps the pre-existing wire layout EXACTLY: no trailing field is
31/// written and none is read, so a no-key publish is byte-identical to before.
32/// The bit travels in the frame header's `flags` byte, which the codec already
33/// round-trips, so no header-format change is required (the 13-L0 precedent).
34pub const PUBLISH_IDEMPOTENCY_KEY_FLAG: u8 = 0x02;
35
36/// Frame-flag bit set on a [`Frame::PublishAck`] to report a GENUINE delivery
37/// ack: the published message was accepted by at least one live subscriber on
38/// this publish.
39///
40/// This is distinct from the backpressure `Accept`/`Defer`/`Reject` signal: a
41/// `PublishAck` always means the publish was processed without error, but only a
42/// `PublishAck` carrying this bit means a subscriber actually received the
43/// message. An ack WITHOUT this bit means the publish succeeded but reached no
44/// subscriber (an empty channel, or a duplicate suppressed by dedup-on-delivery),
45/// so a caller that needs a true delivery ack can observe the difference. The bit
46/// rides the existing `flags` byte, so no wire-format change is required.
47pub const PUBLISH_DELIVERED_FLAG: u8 = 0x01;
48
49/// Status byte prefixing a [`Frame::WorkerRegisterAck`] payload that signals the
50/// registration was accepted (no further payload follows).
51pub(crate) const WORKER_REGISTER_ACK_ACCEPTED: u8 = 0x00;
52
53/// Status byte prefixing a [`Frame::WorkerRegisterAck`] payload that signals the
54/// registration was rejected (a length-prefixed reason string follows).
55pub(crate) const WORKER_REGISTER_ACK_REJECTED: u8 = 0x01;
56
57/// Protocol frame categories and their stable wire discriminants.
58#[derive(Clone, Copy, Debug, PartialEq, Eq)]
59pub enum FrameType {
60    /// Connection request.
61    Connect,
62    /// Successful connection response.
63    ConnectAck,
64    /// Failed connection response.
65    ConnectError,
66    /// Connection close notification.
67    Disconnect,
68    /// Channel subscription request.
69    Subscribe,
70    /// Successful subscription response.
71    SubscribeAck,
72    /// Failed subscription response.
73    SubscribeError,
74    /// Channel unsubscription request.
75    Unsubscribe,
76    /// Channel publish request.
77    Publish,
78    /// Successful publish response.
79    PublishAck,
80    /// Failed publish response.
81    PublishError,
82    /// Conversation lifecycle open.
83    ConversationOpen,
84    /// Conversation message delivery.
85    ConversationMessage,
86    /// Conversation lifecycle close.
87    ConversationClose,
88    /// Conversation processing error.
89    ConversationError,
90    /// In-band backpressure acceptance.
91    Accept,
92    /// In-band backpressure deferral.
93    Defer,
94    /// In-band backpressure rejection.
95    Reject,
96    /// Connection keepalive ping.
97    Ping,
98    /// Connection keepalive pong.
99    Pong,
100    /// Server-initiated push of an opaque payload to a connected client.
101    Push,
102    /// Client-initiated correlated reply to a server push.
103    PushReply,
104    /// Worker self-registration announcing identity and routing dimensions.
105    WorkerRegister,
106    /// Server acknowledgement of a worker registration (accepted or rejected).
107    WorkerRegisterAck,
108    /// Forward-compatible frame type not known to this implementation.
109    Unknown(u8),
110}
111
112impl FrameType {
113    /// Return true when this frame type must appear on stream 0.
114    #[must_use]
115    pub const fn is_control(self) -> bool {
116        matches!(
117            self,
118            Self::Connect
119                | Self::ConnectAck
120                | Self::ConnectError
121                | Self::Disconnect
122                | Self::Ping
123                | Self::Pong
124                | Self::WorkerRegister
125                | Self::WorkerRegisterAck
126        )
127    }
128}
129
130impl From<u8> for FrameType {
131    fn from(value: u8) -> Self {
132        match value {
133            0x01 => Self::Connect,
134            0x02 => Self::ConnectAck,
135            0x03 => Self::ConnectError,
136            0x04 => Self::Disconnect,
137            0x05 => Self::Subscribe,
138            0x06 => Self::SubscribeAck,
139            0x07 => Self::SubscribeError,
140            0x08 => Self::Unsubscribe,
141            0x09 => Self::Publish,
142            0x0A => Self::PublishAck,
143            0x0B => Self::PublishError,
144            0x0C => Self::ConversationOpen,
145            0x0D => Self::ConversationMessage,
146            0x0E => Self::ConversationClose,
147            0x0F => Self::ConversationError,
148            0x10 => Self::Accept,
149            0x11 => Self::Defer,
150            0x12 => Self::Reject,
151            0x13 => Self::Ping,
152            0x14 => Self::Pong,
153            0x15 => Self::Push,
154            0x16 => Self::PushReply,
155            0x17 => Self::WorkerRegister,
156            0x18 => Self::WorkerRegisterAck,
157            unknown => Self::Unknown(unknown),
158        }
159    }
160}
161
162impl From<FrameType> for u8 {
163    fn from(value: FrameType) -> Self {
164        match value {
165            FrameType::Connect => 0x01,
166            FrameType::ConnectAck => 0x02,
167            FrameType::ConnectError => 0x03,
168            FrameType::Disconnect => 0x04,
169            FrameType::Subscribe => 0x05,
170            FrameType::SubscribeAck => 0x06,
171            FrameType::SubscribeError => 0x07,
172            FrameType::Unsubscribe => 0x08,
173            FrameType::Publish => 0x09,
174            FrameType::PublishAck => 0x0A,
175            FrameType::PublishError => 0x0B,
176            FrameType::ConversationOpen => 0x0C,
177            FrameType::ConversationMessage => 0x0D,
178            FrameType::ConversationClose => 0x0E,
179            FrameType::ConversationError => 0x0F,
180            FrameType::Accept => 0x10,
181            FrameType::Defer => 0x11,
182            FrameType::Reject => 0x12,
183            FrameType::Ping => 0x13,
184            FrameType::Pong => 0x14,
185            FrameType::Push => 0x15,
186            FrameType::PushReply => 0x16,
187            FrameType::WorkerRegister => 0x17,
188            FrameType::WorkerRegisterAck => 0x18,
189            FrameType::Unknown(type_id) => type_id,
190        }
191    }
192}
193
194/// Fixed-size frame prefix: type, flags, stream identifier, and payload length.
195#[derive(Clone, Copy, Debug, PartialEq, Eq)]
196pub struct FrameHeader {
197    /// Frame type read from or written to byte 0.
198    pub frame_type: FrameType,
199    /// Frame flags read from or written to byte 1.
200    pub flags: u8,
201    /// Stream identifier read from or written to bytes 2..6.
202    pub stream_id: u32,
203    /// Payload length read from or written to bytes 6..10.
204    pub payload_length: u32,
205}
206
207impl FrameHeader {
208    /// Serialized header length in bytes.
209    pub const WIRE_LEN: usize = HEADER_LEN;
210}
211
212/// Self-describing worker registration carried by [`Frame::WorkerRegister`].
213///
214/// A worker announces the routing dimensions it serves plus a stable identity so
215/// the server can associate the worker with its connection and the application can
216/// route work to it. `node` is optional locality (the routing model treats node as
217/// an optional dimension); the codec encodes it with a presence byte rather than
218/// flattening `None` to an empty string.
219#[derive(Clone, Debug, PartialEq, Eq)]
220pub struct WorkerRegistration {
221    /// Namespaces this worker serves.
222    pub namespaces: Vec<String>,
223    /// Task queue this worker pulls from.
224    pub task_queue: String,
225    /// Optional node locality; `None` when the worker declares no node affinity.
226    pub node: Option<String>,
227    /// Activity types this worker can execute.
228    pub activity_types: Vec<String>,
229    /// Stable worker identity.
230    pub identity: String,
231}
232
233/// Outcome of a worker registration, carried by [`Frame::WorkerRegisterAck`].
234#[derive(Clone, Debug, PartialEq, Eq)]
235pub enum WorkerRegisterOutcome {
236    /// The server accepted the registration.
237    Accepted,
238    /// The server rejected the registration, with a human-readable reason.
239    Rejected {
240        /// Human-readable rejection reason surfaced to the worker.
241        reason: String,
242    },
243}
244
245/// A typed protocol frame body plus the header metadata required to encode it.
246#[derive(Clone, Debug, PartialEq, Eq)]
247pub enum Frame {
248    /// Connection request carrying a supported version range and opaque auth token.
249    Connect {
250        flags: u8,
251        min_version: ProtocolVersion,
252        max_version: ProtocolVersion,
253        auth_token: Vec<u8>,
254    },
255    /// Connection success carrying the negotiated protocol version and server capabilities.
256    ConnectAck {
257        flags: u8,
258        selected_version: ProtocolVersion,
259        capabilities: u32,
260    },
261    /// Connection failure carrying a numeric reason and optional message.
262    ConnectError {
263        flags: u8,
264        reason_code: u16,
265        message: Option<String>,
266    },
267    /// Connection close notification with no payload.
268    Disconnect { flags: u8 },
269    /// Channel subscription request carrying a channel and accepted schema hashes.
270    Subscribe {
271        flags: u8,
272        stream_id: u32,
273        channel: String,
274        accepted_schemas: Vec<SchemaId>,
275        max_in_flight: u32,
276    },
277    /// Channel subscription success carrying a subscription id and selected schema.
278    SubscribeAck {
279        flags: u8,
280        stream_id: u32,
281        subscription_id: u64,
282        selected_schema: SchemaId,
283    },
284    /// Channel subscription failure carrying a numeric reason and optional message.
285    SubscribeError {
286        flags: u8,
287        stream_id: u32,
288        reason_code: u16,
289        message: Option<String>,
290    },
291    /// Channel unsubscription request carrying the subscription id.
292    Unsubscribe {
293        flags: u8,
294        stream_id: u32,
295        subscription_id: u64,
296    },
297    /// Publish request carrying a channel and typed message envelope.
298    ///
299    /// `idempotency_key` is `Some` only when the [`PUBLISH_IDEMPOTENCY_KEY_FLAG`]
300    /// flag bit is set; it is the dedup-on-delivery key the server feeds to its
301    /// dedup cache. When `None` (and the flag clear) the frame is byte-identical
302    /// on the wire to a pre-13-L1 publish.
303    Publish {
304        flags: u8,
305        stream_id: u32,
306        channel: String,
307        envelope: MessageEnvelope,
308        idempotency_key: Option<String>,
309    },
310    /// Publish success carrying the accepted message id.
311    PublishAck {
312        flags: u8,
313        stream_id: u32,
314        message_id: u64,
315    },
316    /// Publish failure carrying a numeric reason and optional message.
317    PublishError {
318        flags: u8,
319        stream_id: u32,
320        reason_code: u16,
321        message: Option<String>,
322    },
323    /// Conversation open carrying a conversation id and subject.
324    ConversationOpen {
325        flags: u8,
326        stream_id: u32,
327        conversation_id: u64,
328        subject: String,
329    },
330    /// Conversation message carrying a conversation id and typed message envelope.
331    ConversationMessage {
332        flags: u8,
333        stream_id: u32,
334        conversation_id: u64,
335        envelope: MessageEnvelope,
336    },
337    /// Conversation close carrying a conversation id and optional reason.
338    ConversationClose {
339        flags: u8,
340        stream_id: u32,
341        conversation_id: u64,
342        reason_code: Option<u16>,
343        message: Option<String>,
344    },
345    /// Conversation failure carrying a conversation id, numeric reason, and optional message.
346    ConversationError {
347        flags: u8,
348        stream_id: u32,
349        conversation_id: u64,
350        reason_code: u16,
351        message: Option<String>,
352    },
353    /// Backpressure acceptance for a delivered message.
354    Accept {
355        flags: u8,
356        stream_id: u32,
357        referenced_message_id: MessageId,
358    },
359    /// Backpressure deferral for a buffered message.
360    Defer {
361        flags: u8,
362        stream_id: u32,
363        referenced_message_id: MessageId,
364        reason: Option<String>,
365    },
366    /// Backpressure rejection for a shed message.
367    Reject {
368        flags: u8,
369        stream_id: u32,
370        referenced_message_id: MessageId,
371        reason: Option<String>,
372    },
373    /// Connection keepalive ping.
374    Ping { flags: u8 },
375    /// Connection keepalive pong.
376    Pong { flags: u8 },
377    /// Server-initiated push carrying a correlation id and an opaque payload.
378    ///
379    /// A server writes this frame to a connected client over the client's existing
380    /// connection (server-to-client, the inverse of every other request frame). The
381    /// `correlation_id` is the key the server uses to match the client's later
382    /// [`Frame::PushReply`] back to this push; the `payload` is opaque application
383    /// bytes the server hands the client. This is an application-stream frame, so
384    /// `stream_id` is non-zero like a publish or conversation message.
385    Push {
386        flags: u8,
387        stream_id: u32,
388        correlation_id: u64,
389        payload: Vec<u8>,
390    },
391    /// Client-initiated correlated reply to a [`Frame::Push`].
392    ///
393    /// After handling a pushed frame the client writes this back on the same
394    /// connection, echoing the push's `correlation_id` so the server can match the
395    /// reply to the originating push. The `payload` is the client's opaque answer.
396    PushReply {
397        flags: u8,
398        stream_id: u32,
399        correlation_id: u64,
400        payload: Vec<u8>,
401    },
402    /// Worker self-registration over an established connection.
403    ///
404    /// A worker sends this control frame (stream 0) after the connection
405    /// handshake to announce its identity and routing dimensions. The server
406    /// associates the registration with the connection's process id and surfaces
407    /// it to the application via the connection-notifier hook, then answers with a
408    /// [`Frame::WorkerRegisterAck`]. `node` is optional locality and is encoded
409    /// with a presence byte, never flattened to an empty string.
410    WorkerRegister {
411        flags: u8,
412        registration: WorkerRegistration,
413    },
414    /// Server acknowledgement of a [`Frame::WorkerRegister`].
415    ///
416    /// Carries the registration outcome: [`WorkerRegisterOutcome::Accepted`] when
417    /// the server (and any configured notifier) accepted the worker, or
418    /// [`WorkerRegisterOutcome::Rejected`] carrying a human-readable reason when it
419    /// did not. The acknowledgement is synchronous so a worker never believes it is
420    /// registered when the application rejected it.
421    WorkerRegisterAck {
422        flags: u8,
423        outcome: WorkerRegisterOutcome,
424    },
425    /// Forward-compatible frame preserved after length-delimited skipping.
426    Unknown {
427        type_id: u8,
428        flags: u8,
429        stream_id: u32,
430        payload: Vec<u8>,
431    },
432}
433
434impl Frame {
435    /// Construct a ping frame, enforcing the stream-0 control-frame invariant.
436    ///
437    /// # Errors
438    ///
439    /// Returns [`ProtocolError::InvalidStream`] when `stream_id` is not zero.
440    pub fn new_ping(stream_id: u32) -> Result<Self, ProtocolError> {
441        validate_stream(FrameType::Ping, stream_id)?;
442        Ok(Self::Ping { flags: 0 })
443    }
444
445    /// Construct a publish frame, enforcing the non-zero application-stream invariant.
446    ///
447    /// # Errors
448    ///
449    /// Returns [`ProtocolError::InvalidStream`] when `stream_id` is zero.
450    pub fn new_publish(
451        stream_id: u32,
452        channel: impl Into<String>,
453        envelope: MessageEnvelope,
454    ) -> Result<Self, ProtocolError> {
455        validate_stream(FrameType::Publish, stream_id)?;
456        Ok(Self::Publish {
457            flags: 0,
458            stream_id,
459            channel: channel.into(),
460            envelope,
461            idempotency_key: None,
462        })
463    }
464
465    /// Construct a publish frame carrying an idempotency key for dedup-on-delivery.
466    ///
467    /// The returned frame has [`PUBLISH_IDEMPOTENCY_KEY_FLAG`] set and serializes
468    /// the trailing key field, so the server consults its dedup cache for this key.
469    ///
470    /// # Errors
471    ///
472    /// Returns [`ProtocolError::InvalidStream`] when `stream_id` is zero.
473    pub fn new_publish_with_idempotency_key(
474        stream_id: u32,
475        channel: impl Into<String>,
476        envelope: MessageEnvelope,
477        idempotency_key: impl Into<String>,
478    ) -> Result<Self, ProtocolError> {
479        validate_stream(FrameType::Publish, stream_id)?;
480        Ok(Self::Publish {
481            flags: PUBLISH_IDEMPOTENCY_KEY_FLAG,
482            stream_id,
483            channel: channel.into(),
484            envelope,
485            idempotency_key: Some(idempotency_key.into()),
486        })
487    }
488
489    /// Construct a server-to-client push frame, enforcing the non-zero
490    /// application-stream invariant.
491    ///
492    /// # Errors
493    ///
494    /// Returns [`ProtocolError::InvalidStream`] when `stream_id` is zero.
495    pub fn new_push(
496        stream_id: u32,
497        correlation_id: u64,
498        payload: Vec<u8>,
499    ) -> Result<Self, ProtocolError> {
500        validate_stream(FrameType::Push, stream_id)?;
501        Ok(Self::Push {
502            flags: 0,
503            stream_id,
504            correlation_id,
505            payload,
506        })
507    }
508
509    /// Construct a client-to-server push reply frame, echoing the correlation id of
510    /// the originating push, and enforcing the non-zero application-stream invariant.
511    ///
512    /// # Errors
513    ///
514    /// Returns [`ProtocolError::InvalidStream`] when `stream_id` is zero.
515    pub fn new_push_reply(
516        stream_id: u32,
517        correlation_id: u64,
518        payload: Vec<u8>,
519    ) -> Result<Self, ProtocolError> {
520        validate_stream(FrameType::PushReply, stream_id)?;
521        Ok(Self::PushReply {
522            flags: 0,
523            stream_id,
524            correlation_id,
525            payload,
526        })
527    }
528
529    /// Return the frame type represented by this frame body.
530    #[must_use]
531    pub const fn frame_type(&self) -> FrameType {
532        match self {
533            Self::Connect { .. } => FrameType::Connect,
534            Self::ConnectAck { .. } => FrameType::ConnectAck,
535            Self::ConnectError { .. } => FrameType::ConnectError,
536            Self::Disconnect { .. } => FrameType::Disconnect,
537            Self::Subscribe { .. } => FrameType::Subscribe,
538            Self::SubscribeAck { .. } => FrameType::SubscribeAck,
539            Self::SubscribeError { .. } => FrameType::SubscribeError,
540            Self::Unsubscribe { .. } => FrameType::Unsubscribe,
541            Self::Publish { .. } => FrameType::Publish,
542            Self::PublishAck { .. } => FrameType::PublishAck,
543            Self::PublishError { .. } => FrameType::PublishError,
544            Self::ConversationOpen { .. } => FrameType::ConversationOpen,
545            Self::ConversationMessage { .. } => FrameType::ConversationMessage,
546            Self::ConversationClose { .. } => FrameType::ConversationClose,
547            Self::ConversationError { .. } => FrameType::ConversationError,
548            Self::Accept { .. } => FrameType::Accept,
549            Self::Defer { .. } => FrameType::Defer,
550            Self::Reject { .. } => FrameType::Reject,
551            Self::Ping { .. } => FrameType::Ping,
552            Self::Pong { .. } => FrameType::Pong,
553            Self::Push { .. } => FrameType::Push,
554            Self::PushReply { .. } => FrameType::PushReply,
555            Self::WorkerRegister { .. } => FrameType::WorkerRegister,
556            Self::WorkerRegisterAck { .. } => FrameType::WorkerRegisterAck,
557            Self::Unknown { type_id, .. } => FrameType::Unknown(*type_id),
558        }
559    }
560
561    /// Return the frame flags stored in the fixed header.
562    #[must_use]
563    pub const fn flags(&self) -> u8 {
564        match self {
565            Self::Connect { flags, .. }
566            | Self::ConnectAck { flags, .. }
567            | Self::ConnectError { flags, .. }
568            | Self::Disconnect { flags, .. }
569            | Self::Subscribe { flags, .. }
570            | Self::SubscribeAck { flags, .. }
571            | Self::SubscribeError { flags, .. }
572            | Self::Unsubscribe { flags, .. }
573            | Self::Publish { flags, .. }
574            | Self::PublishAck { flags, .. }
575            | Self::PublishError { flags, .. }
576            | Self::ConversationOpen { flags, .. }
577            | Self::ConversationMessage { flags, .. }
578            | Self::ConversationClose { flags, .. }
579            | Self::ConversationError { flags, .. }
580            | Self::Accept { flags, .. }
581            | Self::Defer { flags, .. }
582            | Self::Reject { flags, .. }
583            | Self::Ping { flags }
584            | Self::Pong { flags }
585            | Self::Push { flags, .. }
586            | Self::PushReply { flags, .. }
587            | Self::WorkerRegister { flags, .. }
588            | Self::WorkerRegisterAck { flags, .. }
589            | Self::Unknown { flags, .. } => *flags,
590        }
591    }
592
593    /// Return the stream id stored in the fixed header.
594    #[must_use]
595    pub const fn stream_id(&self) -> u32 {
596        match self {
597            Self::Connect { .. }
598            | Self::ConnectAck { .. }
599            | Self::ConnectError { .. }
600            | Self::Disconnect { .. }
601            | Self::Ping { .. }
602            | Self::Pong { .. }
603            | Self::WorkerRegister { .. }
604            | Self::WorkerRegisterAck { .. } => 0,
605            Self::Subscribe { stream_id, .. }
606            | Self::SubscribeAck { stream_id, .. }
607            | Self::SubscribeError { stream_id, .. }
608            | Self::Unsubscribe { stream_id, .. }
609            | Self::Publish { stream_id, .. }
610            | Self::PublishAck { stream_id, .. }
611            | Self::PublishError { stream_id, .. }
612            | Self::ConversationOpen { stream_id, .. }
613            | Self::ConversationMessage { stream_id, .. }
614            | Self::ConversationClose { stream_id, .. }
615            | Self::ConversationError { stream_id, .. }
616            | Self::Accept { stream_id, .. }
617            | Self::Defer { stream_id, .. }
618            | Self::Reject { stream_id, .. }
619            | Self::Push { stream_id, .. }
620            | Self::PushReply { stream_id, .. }
621            | Self::Unknown { stream_id, .. } => *stream_id,
622        }
623    }
624
625    /// Validate the stream invariant for this frame.
626    pub(crate) fn validate(&self) -> Result<(), ProtocolError> {
627        validate_stream(self.frame_type(), self.stream_id())?;
628
629        if let Self::Subscribe { max_in_flight, .. } = self {
630            if *max_in_flight == 0 {
631                return Err(ProtocolError::codec(
632                    "max_in_flight must be greater than zero",
633                ));
634            }
635        }
636
637        Ok(())
638    }
639}
640
641/// Validate stream placement for a known frame type.
642///
643/// # Errors
644///
645/// Returns [`ProtocolError::InvalidStream`] when `stream_id` is invalid for
646/// `frame_type`.
647pub fn validate_stream(frame_type: FrameType, stream_id: u32) -> Result<(), ProtocolError> {
648    if matches!(frame_type, FrameType::Unknown(_)) {
649        return Ok(());
650    }
651
652    let valid = if frame_type.is_control() {
653        stream_id == 0
654    } else {
655        stream_id >= 1
656    };
657
658    if valid {
659        Ok(())
660    } else {
661        Err(ProtocolError::invalid_stream(frame_type, stream_id))
662    }
663}
664
665#[cfg(test)]
666mod tests;