Skip to main content

shuflr_wire/
message.rs

//! Typed message model for `shuflr-wire/1`. Mirrors 005 §3.2-3.4.
//!
//! Enum variants correspond 1:1 to message kinds on the wire. Fields
//! that are opaque bytes on the wire (the protobuf-encoded
//! `OpenStream` / `StreamOpened`, bearer-token bytes, error
//! detail bytes) are `Vec<u8>` here so the codec never has to
//! understand anything more than length-prefixed framing. The
//! transport layer is free to decode those as protobuf / UTF-8.
9
/// Message kinds on the wire. The numeric values are the bytes that
/// appear in the first octet of every frame.
///
/// The values 8, 9 and everything above 12 are not assigned to any
/// variant; [`Kind::from_u8`] rejects them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum Kind {
    Handshake = 0,
    RawFrame = 1,
    ZstdBatch = 2,
    PlainBatch = 3,
    EpochBoundary = 4,
    StreamError = 5,
    StreamClosed = 6,
    Heartbeat = 7,
    AddCredit = 10,
    Cancel = 11,
    Pong = 12,
}

impl Kind {
    /// Decodes the kind byte of a frame.
    ///
    /// Returns `None` when `v` does not match the discriminant of any
    /// variant, so the caller decides how to treat an unknown kind.
    #[must_use]
    pub const fn from_u8(v: u8) -> Option<Self> {
        Some(match v {
            0 => Kind::Handshake,
            1 => Kind::RawFrame,
            2 => Kind::ZstdBatch,
            3 => Kind::PlainBatch,
            4 => Kind::EpochBoundary,
            5 => Kind::StreamError,
            6 => Kind::StreamClosed,
            7 => Kind::Heartbeat,
            10 => Kind::AddCredit,
            11 => Kind::Cancel,
            12 => Kind::Pong,
            _ => return None,
        })
    }

    /// Handshake frames skip xxh3 (see 005 §3.2). Every other frame
    /// carries a trailing u64 xxh3 checksum.
    ///
    /// Returns `false` only for [`Kind::Handshake`].
    #[must_use]
    pub const fn has_checksum(self) -> bool {
        !matches!(self, Kind::Handshake)
    }
}
52
/// One-byte authentication selector; this is the type of
/// `ClientHello::auth_kind`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum AuthKind {
    None = 0,
    Bearer = 1,
    Mtls = 2,
}

impl AuthKind {
    /// Decodes an auth-kind byte.
    ///
    /// Returns `None` for any value other than 0, 1 or 2.
    #[must_use]
    pub const fn from_u8(v: u8) -> Option<Self> {
        match v {
            0 => Some(AuthKind::None),
            1 => Some(AuthKind::Bearer),
            2 => Some(AuthKind::Mtls),
            _ => None,
        }
    }
}
71
/// Mode byte; this is the type held in `ServerHello::chosen_mode`.
///
/// The discriminants (1, 2, 3) are the same numbers used by the
/// same-named `Kind` variants. There is no variant for 0.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum ChosenMode {
    RawFrame = 1,
    ZstdBatch = 2,
    PlainBatch = 3,
}

impl ChosenMode {
    /// Decodes a chosen-mode byte.
    ///
    /// Returns `None` for any value other than 1, 2 or 3 (including 0).
    #[must_use]
    pub const fn from_u8(v: u8) -> Option<Self> {
        match v {
            1 => Some(ChosenMode::RawFrame),
            2 => Some(ChosenMode::ZstdBatch),
            3 => Some(ChosenMode::PlainBatch),
            _ => None,
        }
    }
}
90
/// Outcome byte of a handshake; this is the type of `ServerHello::status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum HandshakeStatus {
    Ok = 0,
    /// Any non-zero status. The `stream_opened` payload on a non-Ok
    /// ServerHello carries a human detail string the transport can log.
    Error = 1,
}

impl HandshakeStatus {
    /// Decodes a status byte. This never fails: 0 is `Ok`, and every
    /// non-zero value collapses to `Error`, so the original byte is
    /// not recoverable from the result.
    #[must_use]
    pub const fn from_u8(v: u8) -> Self {
        match v {
            0 => HandshakeStatus::Ok,
            _ => HandshakeStatus::Error,
        }
    }
}
108
/// Error code byte; this is the type of the `code` field of
/// `Message::StreamError`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum StreamErrorCode {
    Unspecified = 0,
    MalformedRecord = 1,
    OversizedRecord = 2,
    IoError = 3,
    SlowConsumer = 4,
    ServerShutdown = 5,
    AuthDenied = 6,
    ResourceExhausted = 7,
    Internal = 8,
}

impl StreamErrorCode {
    /// Decodes an error-code byte. This never fails: values 1..=8 map
    /// to their variants, and 0 as well as every unrecognised value
    /// decode to `Unspecified`.
    #[must_use]
    pub const fn from_u8(v: u8) -> Self {
        match v {
            0 => StreamErrorCode::Unspecified,
            1 => StreamErrorCode::MalformedRecord,
            2 => StreamErrorCode::OversizedRecord,
            3 => StreamErrorCode::IoError,
            4 => StreamErrorCode::SlowConsumer,
            5 => StreamErrorCode::ServerShutdown,
            6 => StreamErrorCode::AuthDenied,
            7 => StreamErrorCode::ResourceExhausted,
            8 => StreamErrorCode::Internal,
            // Unknown codes are tolerated rather than rejected.
            _ => StreamErrorCode::Unspecified,
        }
    }
}
138
/// Decoded payload for a `PlainBatch` (kind=3).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchPayload {
    /// 64-bit batch identifier.
    pub batch_id: u64,
    /// 32-bit epoch number attached to the batch.
    pub epoch: u32,
    /// Record bodies as raw bytes; presumably one inner `Vec<u8>` per
    /// record, in wire order (confirm against the codec).
    pub records: Vec<Vec<u8>>,
}
146
147#[derive(Debug, Clone, PartialEq, Eq)]
148pub struct ClientHello {
149    /// bit 0 raw-frame, bit 1 zstd-batch, bit 2 xxh3 on control msgs.
150    /// bits 3-7 reserved (must be 0).
151    pub capability_flags: u8,
152    pub auth_kind: AuthKind,
153    /// Max 65535 bytes (u16 length on wire).
154    pub auth: Vec<u8>,
155    /// Opaque `shuflr.v1.OpenStream` protobuf bytes. Max
156    /// `MAX_MESSAGE_BYTES` minus the fixed-size fields.
157    pub open_stream: Vec<u8>,
158}
159
160#[derive(Debug, Clone, PartialEq, Eq)]
161pub struct ServerHello {
162    pub status: HandshakeStatus,
163    pub chosen_mode: Option<ChosenMode>,
164    pub initial_credit: u64,
165    pub server_version: u8,
166    pub max_message_bytes: u32,
167    /// Opaque `shuflr.v1.StreamOpened` protobuf bytes, or a UTF-8
168    /// error detail when `status != Ok`.
169    pub stream_opened: Vec<u8>,
170}
171
172/// A decoded `shuflr-wire/1` message.
173#[derive(Debug, Clone, PartialEq, Eq)]
174pub enum Message {
175    ClientHello(ClientHello),
176    ServerHello(ServerHello),
177    RawFrame {
178        frame_id: u32,
179        /// 32-byte ChaCha20 seed the server used to Fisher-Yates the
180        /// records inside this frame. The client reseeds and replays
181        /// to produce byte-identical output to the in-process
182        /// `chunk-shuffled` pipeline.
183        perm_seed: [u8; 32],
184        zstd_bytes: Vec<u8>,
185    },
186    ZstdBatch {
187        batch_id: u64,
188        epoch: u32,
189        n_records: u32,
190        zstd_bytes: Vec<u8>,
191    },
192    PlainBatch(BatchPayload),
193    EpochBoundary {
194        completed_epoch: u32,
195        records_in_epoch: u64,
196    },
197    StreamError {
198        code: StreamErrorCode,
199        fatal: bool,
200        detail: Vec<u8>,
201    },
202    StreamClosed {
203        total_records: u64,
204        epochs_completed: u32,
205    },
206    Heartbeat {
207        now_unix_nanos: u64,
208    },
209    AddCredit {
210        add_bytes: u64,
211    },
212    Cancel {
213        reason: Vec<u8>,
214    },
215    Pong {
216        now_unix_nanos: u64,
217    },
218}
219
220impl Message {
221    pub fn kind(&self) -> Kind {
222        match self {
223            Message::ClientHello(_) | Message::ServerHello(_) => Kind::Handshake,
224            Message::RawFrame { .. } => Kind::RawFrame,
225            Message::ZstdBatch { .. } => Kind::ZstdBatch,
226            Message::PlainBatch(_) => Kind::PlainBatch,
227            Message::EpochBoundary { .. } => Kind::EpochBoundary,
228            Message::StreamError { .. } => Kind::StreamError,
229            Message::StreamClosed { .. } => Kind::StreamClosed,
230            Message::Heartbeat { .. } => Kind::Heartbeat,
231            Message::AddCredit { .. } => Kind::AddCredit,
232            Message::Cancel { .. } => Kind::Cancel,
233            Message::Pong { .. } => Kind::Pong,
234        }
235    }
236}