wasmrs_frames/frames/
mod.rs

1pub(crate) mod f_cancel;
2pub(crate) mod f_error;
3pub(crate) mod f_payload;
4pub(crate) mod f_request_channel;
5pub(crate) mod f_request_fnf;
6pub(crate) mod f_request_n;
7pub(crate) mod f_request_response;
8pub(crate) mod f_request_stream;
9pub(crate) mod header;
10pub(crate) mod metadata;
11pub(crate) mod request_payload;
12
13use bytes::Bytes;
14
15use self::f_cancel::Cancel;
16use self::f_error::ErrorFrame;
17use self::f_payload::PayloadFrame;
18use self::f_request_channel::RequestChannel;
19use self::f_request_fnf::RequestFnF;
20use self::f_request_n::RequestN;
21use self::f_request_response::RequestResponse;
22use self::f_request_stream::RequestStream;
23use crate::{Error, PayloadError};
24
/// The type that holds the bitmask for Frame flags.
///
/// Individual bits are exposed as the `Frame::FLAG_*` constants and can be
/// queried through the [`RSocketFlags`] trait, which is implemented for this type.
pub type FrameFlags = u16;
27
28/// Six (6) bytes reserved for FrameHeader information.
29#[derive()]
30#[cfg_attr(not(target = "wasm32-unknown-unknown"), derive(Debug))]
31#[must_use]
32pub struct FrameHeader {
33  /// The header bytes.
34  pub header: Bytes,
35}
36
#[derive(Clone, Default)]
#[cfg_attr(not(target = "wasm32-unknown-unknown"), derive(Debug))]
#[must_use]
/// A complete [Payload] object that includes metadata and data bytes.
///
/// Both parts are optional; the derived `Default` leaves both as `None`.
pub struct RawPayload {
  /// Metadata bytes if they exist.
  pub metadata: Option<Bytes>,
  /// The core payload data bytes if it exists.
  pub data: Option<Bytes>,
}
47
/// Metadata associated with the frame.
#[derive(Clone, PartialEq)]
#[cfg_attr(not(target = "wasm32-unknown-unknown"), derive(Debug))]
#[cfg_attr(feature = "derive_serde", derive(serde::Serialize, serde::Deserialize))]
#[must_use]
pub struct Metadata {
  /// The operation index.
  pub index: Option<u32>,
  /// Optional extra bytes carried alongside the index.
  // NOTE(review): this field was previously documented as "the stream name",
  // which does not match its name or type; confirm what callers store here.
  pub extra: Option<Bytes>,
}
59
/// Frame types from <https://rsocket.io/about/protocol>
///
/// The numeric identifier of each variant is defined by the `TryFrom<u8>` and
/// `From<FrameType> for u32` conversions below, which are inverses of each
/// other for every variant.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
#[cfg_attr(feature = "derive_serde", derive(serde::Serialize, serde::Deserialize))]
#[allow(missing_docs)]
pub enum FrameType {
  Reserved,
  Setup,
  Lease,
  Keepalive,
  RequestResponse,
  RequestFnf,
  RequestStream,
  RequestChannel,
  RequestN,
  Cancel,
  Payload,
  Err,
  MetadataPush,
  Resume,
  ResumeOk,
  Ext,
}

/// Formats the frame type using its upper-case protocol name (e.g. `REQUEST_FNF`).
impl std::fmt::Display for FrameType {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    let name = match self {
      Self::Reserved => "RESERVED",
      Self::Setup => "SETUP",
      Self::Lease => "LEASE",
      Self::Keepalive => "KEEPALIVE",
      Self::RequestResponse => "REQUEST_RESPONSE",
      Self::RequestFnf => "REQUEST_FNF",
      Self::RequestStream => "REQUEST_STREAM",
      Self::RequestChannel => "REQUEST_CHANNEL",
      Self::RequestN => "REQUEST_N",
      Self::Cancel => "CANCEL",
      Self::Payload => "PAYLOAD",
      Self::Err => "ERROR",
      Self::MetadataPush => "METADATA_PUSH",
      Self::Resume => "RESUME",
      Self::ResumeOk => "RESUME_OK",
      Self::Ext => "EXT",
    };
    f.write_str(name)
  }
}

/// Maps a numeric frame type identifier to its [`FrameType`].
///
/// # Errors
///
/// Returns an error string for any identifier other than `0..=14` and `63`.
impl TryFrom<u8> for FrameType {
  type Error = String;
  fn try_from(index: u8) -> Result<Self, Self::Error> {
    match index {
      0 => Ok(Self::Reserved),
      1 => Ok(Self::Setup),
      2 => Ok(Self::Lease),
      3 => Ok(Self::Keepalive),
      4 => Ok(Self::RequestResponse),
      5 => Ok(Self::RequestFnf),
      6 => Ok(Self::RequestStream),
      7 => Ok(Self::RequestChannel),
      8 => Ok(Self::RequestN),
      9 => Ok(Self::Cancel),
      10 => Ok(Self::Payload),
      11 => Ok(Self::Err),
      12 => Ok(Self::MetadataPush),
      13 => Ok(Self::Resume),
      14 => Ok(Self::ResumeOk),
      63 => Ok(Self::Ext),
      _ => Err("invalid index for FrameType".to_string()),
    }
  }
}

/// Maps a [`FrameType`] back to its numeric identifier.
///
/// This conversion is total: every variant, including [`FrameType::Reserved`],
/// yields the same number that `TryFrom<u8>` accepts for it.
impl From<FrameType> for u32 {
  fn from(value: FrameType) -> Self {
    match value {
      // `TryFrom<u8>` produces `Reserved` for 0, so the reverse mapping must
      // return 0 rather than panic on a value that can legitimately exist.
      FrameType::Reserved => 0,
      FrameType::Setup => 1,
      FrameType::Lease => 2,
      FrameType::Keepalive => 3,
      FrameType::RequestResponse => 4,
      FrameType::RequestFnf => 5,
      FrameType::RequestStream => 6,
      FrameType::RequestChannel => 7,
      FrameType::RequestN => 8,
      FrameType::Cancel => 9,
      FrameType::Payload => 10,
      FrameType::Err => 11,
      FrameType::MetadataPush => 12,
      FrameType::Resume => 13,
      FrameType::ResumeOk => 14,
      FrameType::Ext => 63,
    }
  }
}
153
/// Frame flags come from <https://rsocket.io/about/protocol>
///
/// Each variant has a short textual form (see `Display`) and a small integer
/// index (see the `u32` conversions).
#[derive(Clone, Copy)]
#[cfg_attr(not(target = "wasm32-unknown-unknown"), derive(Debug))]
#[must_use]
#[allow(missing_docs)]
pub enum FrameFlag {
  Metadata,
  Follows,
  Complete,
  Next,
  Ignore,
}

/// Writes the abbreviated name of the flag.
impl std::fmt::Display for FrameFlag {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    f.write_str(match *self {
      FrameFlag::Metadata => "M",
      FrameFlag::Follows => "FRS",
      FrameFlag::Complete => "CL",
      FrameFlag::Next => "N",
      FrameFlag::Ignore => "I",
    })
  }
}

/// Looks up a flag by its index (`0..=4`); any other index is an error.
impl TryFrom<u32> for FrameFlag {
  type Error = String;
  fn try_from(index: u32) -> Result<Self, Self::Error> {
    let flag = match index {
      0 => FrameFlag::Metadata,
      1 => FrameFlag::Follows,
      2 => FrameFlag::Complete,
      3 => FrameFlag::Next,
      4 => FrameFlag::Ignore,
      _ => return Err("invalid index for FrameFlag".to_string()),
    };
    Ok(flag)
  }
}

/// Returns the index of the flag, the inverse of `TryFrom<u32>`.
impl From<FrameFlag> for u32 {
  fn from(flag: FrameFlag) -> Self {
    match flag {
      FrameFlag::Metadata => 0,
      FrameFlag::Follows => 1,
      FrameFlag::Complete => 2,
      FrameFlag::Next => 3,
      FrameFlag::Ignore => 4,
    }
  }
}
203
/// RSocket Error Codes
///
/// Error codes come from <https://rsocket.io/about/protocol>
#[derive(Copy, Clone)]
#[cfg_attr(not(target = "wasm32-unknown-unknown"), derive(Debug))]
#[must_use]
#[allow(missing_docs)]
pub enum ErrorCode {
  InvalidSetup,
  UnsupportedSetup,
  RejectedSetup,
  RejectedResume,
  ConnectionError,
  ConnectionClose,
  ApplicationError,
  Rejected,
  Canceled,
  Invalid,
  Reserved,
}

/// Resolves a numeric error code to its [`ErrorCode`]; unknown codes are an error.
impl TryFrom<u32> for ErrorCode {
  type Error = String;
  fn try_from(index: u32) -> Result<Self, Self::Error> {
    let code = match index {
      0x0000_0001 => ErrorCode::InvalidSetup,
      0x0000_0002 => ErrorCode::UnsupportedSetup,
      0x0000_0003 => ErrorCode::RejectedSetup,
      0x0000_0004 => ErrorCode::RejectedResume,
      0x0000_0101 => ErrorCode::ConnectionError,
      0x0000_0102 => ErrorCode::ConnectionClose,
      0x0000_0201 => ErrorCode::ApplicationError,
      0x0000_0202 => ErrorCode::Rejected,
      0x0000_0203 => ErrorCode::Canceled,
      0x0000_0204 => ErrorCode::Invalid,
      0xFFFF_FFFF => ErrorCode::Reserved,
      _ => return Err("invalid index for ErrorCode".to_string()),
    };
    Ok(code)
  }
}

/// Returns the numeric error code, the inverse of `TryFrom<u32>`.
impl From<ErrorCode> for u32 {
  fn from(code: ErrorCode) -> Self {
    match code {
      ErrorCode::InvalidSetup => 0x0000_0001,
      ErrorCode::UnsupportedSetup => 0x0000_0002,
      ErrorCode::RejectedSetup => 0x0000_0003,
      ErrorCode::RejectedResume => 0x0000_0004,
      ErrorCode::ConnectionError => 0x0000_0101,
      ErrorCode::ConnectionClose => 0x0000_0102,
      ErrorCode::ApplicationError => 0x0000_0201,
      ErrorCode::Rejected => 0x0000_0202,
      ErrorCode::Canceled => 0x0000_0203,
      ErrorCode::Invalid => 0x0000_0204,
      ErrorCode::Reserved => 0xFFFF_FFFF,
    }
  }
}
260
#[derive(Clone)]
#[cfg_attr(not(target = "wasm32-unknown-unknown"), derive(Debug))]
#[must_use]
/// An enum that can hold any type of wasmrs frame.
///
/// Each variant wraps the concrete frame struct from the matching `f_*` submodule.
#[allow(missing_docs)]
pub enum Frame {
  PayloadFrame(PayloadFrame),
  Cancel(Cancel),
  ErrorFrame(ErrorFrame),
  RequestN(RequestN),
  RequestResponse(RequestResponse),
  RequestFnF(RequestFnF),
  RequestStream(RequestStream),
  RequestChannel(RequestChannel),
}
276
277impl RawPayload {
278  /// Create a new payload with the passed metadata and data bytes.
279  pub fn new(metadata: Bytes, data: Bytes) -> Self {
280    Self {
281      metadata: Some(metadata),
282      data: Some(data),
283    }
284  }
285
286  /// Create new payload with just data and no metadata set yet.
287  pub fn new_data(metadata: Option<Bytes>, data: Option<Bytes>) -> Self {
288    Self { metadata, data }
289  }
290
291  /// Create an empty payload.
292  pub fn empty() -> Self {
293    Self {
294      metadata: None,
295      data: None,
296    }
297  }
298
299  /// Parse the metadata bytes into a [Metadata] object.
300  pub fn parse_metadata(&mut self) -> Result<Metadata, Error> {
301    if self.metadata.is_none() {
302      return Err(crate::Error::MetadataNotFound);
303    }
304    let bytes = self.metadata.as_mut().unwrap();
305    Metadata::decode(bytes)
306  }
307}
308
309impl From<Frame> for Result<Option<RawPayload>, PayloadError> {
310  fn from(frame: Frame) -> Self {
311    match frame {
312      Frame::PayloadFrame(frame) => Ok(Some(RawPayload::new(frame.metadata, frame.data))),
313      Frame::Cancel(_frame) => todo!(),
314      Frame::ErrorFrame(frame) => Err(crate::PayloadError::new(frame.code, frame.data, frame.metadata)),
315      Frame::RequestN(_frame) => todo!(),
316      Frame::RequestResponse(frame) => Ok(Some(RawPayload::new(frame.0.metadata, frame.0.data))),
317      Frame::RequestFnF(frame) => Ok(Some(RawPayload::new(frame.0.metadata, frame.0.data))),
318      Frame::RequestStream(frame) => Ok(Some(RawPayload::new(frame.0.metadata, frame.0.data))),
319      Frame::RequestChannel(frame) => Ok(Some(RawPayload::new(frame.0.metadata, frame.0.data))),
320    }
321  }
322}
323
324fn to_v0_metadata(m: &mut Bytes) {
325  if !m.is_empty() && m[0] == 0xca {
326    _ = m.split_to(4);
327  }
328}
329
330impl Frame {
331  pub(crate) const LEN_HEADER: usize = 6;
332  /// The IGNORE bit
333  pub const FLAG_IGNORE: FrameFlags = 1 << 4;
334  /// The NEXT bit
335  pub const FLAG_NEXT: FrameFlags = 1 << 5;
336  /// The COMPLETE bit
337  pub const FLAG_COMPLETE: FrameFlags = 1 << 6;
338  /// The FOLLOW bit
339  pub const FLAG_FOLLOW: FrameFlags = 1 << 7;
340  /// The METADATA bit
341  pub const FLAG_METADATA: FrameFlags = 1 << 8;
342  /// The maximum number of N for RequestN
343  pub const REQUEST_MAX: u32 = 0x7FFF_FFFF; // 2147483647
344
345  #[must_use]
346  /// Get the stream id for the frame.
347  pub fn stream_id(&self) -> u32 {
348    match self {
349      Frame::PayloadFrame(frame) => frame.stream_id,
350      Frame::Cancel(frame) => frame.stream_id,
351      Frame::ErrorFrame(frame) => frame.stream_id,
352      Frame::RequestN(frame) => frame.stream_id,
353      Frame::RequestResponse(frame) => frame.0.stream_id,
354      Frame::RequestFnF(frame) => frame.0.stream_id,
355      Frame::RequestStream(frame) => frame.0.stream_id,
356      Frame::RequestChannel(frame) => frame.0.stream_id,
357    }
358  }
359
360  #[must_use]
361  /// Get the set flags for the frame.
362  pub fn get_flag(&self) -> FrameFlags {
363    match self {
364      Frame::PayloadFrame(frame) => frame.get_flag(),
365      Frame::Cancel(frame) => frame.get_flag(),
366      Frame::ErrorFrame(frame) => frame.get_flag(),
367      Frame::RequestN(frame) => frame.get_flag(),
368      Frame::RequestResponse(frame) => frame.get_flag(),
369      Frame::RequestFnF(frame) => frame.get_flag(),
370      Frame::RequestStream(frame) => frame.get_flag(),
371      Frame::RequestChannel(frame) => frame.get_flag(),
372    }
373  }
374
375  #[must_use]
376  /// Get the [FrameType].
377  pub fn frame_type(&self) -> FrameType {
378    match self {
379      Frame::PayloadFrame(_) => FrameType::Payload,
380      Frame::Cancel(_) => FrameType::Cancel,
381      Frame::ErrorFrame(_) => FrameType::Err,
382      Frame::RequestN(_) => FrameType::RequestN,
383      Frame::RequestResponse(_) => FrameType::RequestResponse,
384      Frame::RequestFnF(_) => FrameType::RequestFnf,
385      Frame::RequestStream(_) => FrameType::RequestStream,
386      Frame::RequestChannel(_) => FrameType::RequestChannel,
387    }
388  }
389
390  /// Decode [Bytes] into a [Frame].
391  pub fn decode(mut bytes: Bytes) -> Result<Frame, (u32, Error)> {
392    let header = FrameHeader::from_bytes(bytes.split_to(Frame::LEN_HEADER));
393    let stream_id = header.stream_id();
394    Self::_decode(header, bytes).map_err(|e| (stream_id, e))
395  }
396
397  pub(crate) fn _decode(header: FrameHeader, buffer: Bytes) -> Result<Frame, Error> {
398    let frame = match header.frame_type() {
399      FrameType::Reserved => todo!(),
400      FrameType::Setup => todo!(),
401      FrameType::RequestResponse => {
402        Frame::RequestResponse(f_request_response::RequestResponse::decode_frame(&header, buffer)?)
403      }
404      FrameType::RequestFnf => Frame::RequestFnF(f_request_fnf::RequestFnF::decode_frame(&header, buffer)?),
405      FrameType::RequestStream => Frame::RequestStream(f_request_stream::RequestStream::decode_frame(&header, buffer)?),
406      FrameType::RequestChannel => {
407        Frame::RequestChannel(f_request_channel::RequestChannel::decode_frame(&header, buffer)?)
408      }
409      FrameType::RequestN => Frame::RequestN(f_request_n::RequestN::decode_frame(&header, buffer)?),
410      FrameType::Cancel => Frame::Cancel(Cancel {
411        stream_id: header.stream_id(),
412      }),
413      FrameType::Payload => Frame::PayloadFrame(f_payload::PayloadFrame::decode_frame(&header, buffer)?),
414      FrameType::Err => Frame::ErrorFrame(f_error::ErrorFrame::decode_frame(&header, buffer)?),
415      FrameType::Ext => todo!(),
416      _ => unreachable!("invalid frame type"),
417    };
418    Ok(frame)
419  }
420
421  #[must_use]
422  /// Encode the [Frame] into a byte buffer.
423  pub fn encode(self) -> Bytes {
424    match self {
425      Frame::PayloadFrame(f) => f.encode(),
426      Frame::Cancel(f) => f.encode(),
427      Frame::ErrorFrame(f) => f.encode(),
428      Frame::RequestN(f) => f.encode(),
429      Frame::RequestResponse(f) => f.encode(),
430      Frame::RequestFnF(f) => f.encode(),
431      Frame::RequestStream(f) => f.encode(),
432      Frame::RequestChannel(f) => f.encode(),
433    }
434  }
435
436  /// Convert the metadata to v0 metadata.
437  pub fn make_v0_metadata(&mut self) {
438    match self {
439      Frame::PayloadFrame(ref mut f) => to_v0_metadata(&mut f.metadata),
440
441      Frame::ErrorFrame(ref mut f) => {
442        f.metadata.as_mut().map(to_v0_metadata);
443      }
444      Frame::RequestChannel(f) => to_v0_metadata(&mut f.0.metadata),
445      Frame::RequestFnF(f) => to_v0_metadata(&mut f.0.metadata),
446      Frame::RequestResponse(f) => to_v0_metadata(&mut f.0.metadata),
447      Frame::RequestStream(f) => to_v0_metadata(&mut f.0.metadata),
448      _ => {}
449    }
450  }
451
452  /// Create a new [ErrorFrame].
453  pub fn new_error(stream_id: u32, e: PayloadError) -> Frame {
454    Frame::ErrorFrame(ErrorFrame {
455      stream_id,
456      metadata: e.metadata,
457      code: e.code,
458      data: e.msg,
459    })
460  }
461
462  /// Create a new [Cancel] frame.
463  pub fn new_cancel(stream_id: u32) -> Frame {
464    Frame::Cancel(Cancel { stream_id })
465  }
466
467  /// Create a new [RequestN] frame.
468  pub fn new_request_n(stream_id: u32, n: u32, _flags: FrameFlags) -> Frame {
469    Frame::RequestN(RequestN { stream_id, n })
470  }
471
472  /// Create a new [RequestResponse] frame.
473  pub fn new_request_response(stream_id: u32, payload: RawPayload, flags: FrameFlags) -> Frame {
474    Frame::RequestResponse(RequestResponse::from_payload(stream_id, payload, flags, 0))
475  }
476
477  /// Create a new [RequestStream] frame.
478  pub fn new_request_stream(stream_id: u32, payload: RawPayload, flags: FrameFlags) -> Frame {
479    Frame::RequestStream(RequestStream::from_payload(stream_id, payload, flags, 0))
480  }
481
482  /// Create a new [RequestChannel] frame.
483  pub fn new_request_channel(stream_id: u32, payload: RawPayload, flags: FrameFlags, initial_n: u32) -> Frame {
484    Frame::RequestChannel(RequestChannel::from_payload(stream_id, payload, flags, initial_n))
485  }
486
487  /// Create a new [RequestFnF] (Fire & Forget) frame
488  pub fn new_request_fnf(stream_id: u32, payload: RawPayload, flags: FrameFlags) -> Frame {
489    Frame::RequestFnF(RequestFnF::from_payload(stream_id, payload, flags, 0))
490  }
491
492  /// Create a new [PayloadFrame].
493  pub fn new_payload(stream_id: u32, payload: RawPayload, flags: FrameFlags) -> Frame {
494    Frame::PayloadFrame(PayloadFrame::from_payload(stream_id, payload, flags))
495  }
496}
497
498trait RSocketFrame<T> {
499  const FRAME_TYPE: FrameType;
500  fn check_type(header: &FrameHeader) -> Result<(), Error> {
501    if header.frame_type() == Self::FRAME_TYPE {
502      Ok(())
503    } else {
504      Err(crate::Error::WrongType)
505    }
506  }
507  fn kind(&self) -> FrameType {
508    Self::FRAME_TYPE
509  }
510  fn stream_id(&self) -> u32;
511  fn decode_all(buffer: Bytes) -> Result<T, Error>;
512  fn decode_frame(header: &FrameHeader, buffer: Bytes) -> Result<T, Error>;
513  fn encode(self) -> Bytes;
514  fn gen_header(&self) -> FrameHeader;
515  fn get_flag(&self) -> FrameFlags {
516    0
517  }
518}
519
/// Utility trait to check the flags of a frame.
///
/// Implemented in this module for [`FrameFlags`], where each method tests the
/// matching `Frame::FLAG_*` bit.
pub trait RSocketFlags {
  /// Check if the next flag is set.
  fn flag_next(&self) -> bool;
  /// Check if the metadata flag is set.
  fn flag_metadata(&self) -> bool;
  /// Check if the complete flag is set.
  fn flag_complete(&self) -> bool;
  /// Check if the follow flag is set.
  fn flag_follows(&self) -> bool;
  /// Check if the ignore flag is set.
  fn flag_ignore(&self) -> bool;
}
533
534impl RSocketFlags for FrameFlags {
535  fn flag_next(&self) -> bool {
536    self & Frame::FLAG_NEXT == Frame::FLAG_NEXT
537  }
538
539  fn flag_metadata(&self) -> bool {
540    self & Frame::FLAG_METADATA == Frame::FLAG_METADATA
541  }
542
543  fn flag_complete(&self) -> bool {
544    self & Frame::FLAG_COMPLETE == Frame::FLAG_COMPLETE
545  }
546
547  fn flag_follows(&self) -> bool {
548    self & Frame::FLAG_FOLLOW == Frame::FLAG_FOLLOW
549  }
550
551  fn flag_ignore(&self) -> bool {
552    self & Frame::FLAG_IGNORE == Frame::FLAG_IGNORE
553  }
554}