1pub(crate) mod f_cancel;
2pub(crate) mod f_error;
3pub(crate) mod f_payload;
4pub(crate) mod f_request_channel;
5pub(crate) mod f_request_fnf;
6pub(crate) mod f_request_n;
7pub(crate) mod f_request_response;
8pub(crate) mod f_request_stream;
9pub(crate) mod header;
10pub(crate) mod metadata;
11pub(crate) mod request_payload;
12
13use bytes::Bytes;
14
15use self::f_cancel::Cancel;
16use self::f_error::ErrorFrame;
17use self::f_payload::PayloadFrame;
18use self::f_request_channel::RequestChannel;
19use self::f_request_fnf::RequestFnF;
20use self::f_request_n::RequestN;
21use self::f_request_response::RequestResponse;
22use self::f_request_stream::RequestStream;
23use crate::{Error, PayloadError};
24
25pub type FrameFlags = u16;
27
28#[derive()]
30#[cfg_attr(not(target = "wasm32-unknown-unknown"), derive(Debug))]
31#[must_use]
32pub struct FrameHeader {
33 pub header: Bytes,
35}
36
37#[derive(Clone, Default)]
38#[cfg_attr(not(target = "wasm32-unknown-unknown"), derive(Debug))]
39#[must_use]
40pub struct RawPayload {
42 pub metadata: Option<Bytes>,
44 pub data: Option<Bytes>,
46}
47
48#[derive(Clone, PartialEq)]
50#[cfg_attr(not(target = "wasm32-unknown-unknown"), derive(Debug))]
51#[cfg_attr(feature = "derive_serde", derive(serde::Serialize, serde::Deserialize))]
52#[must_use]
53pub struct Metadata {
54 pub index: Option<u32>,
56 pub extra: Option<Bytes>,
58}
59
60#[derive(Copy, Clone, Eq, PartialEq, Debug)]
61#[cfg_attr(feature = "derive_serde", derive(serde::Serialize, serde::Deserialize))]
62#[allow(missing_docs)]
64pub enum FrameType {
65 Reserved,
66 Setup,
67 Lease,
68 Keepalive,
69 RequestResponse,
70 RequestFnf,
71 RequestStream,
72 RequestChannel,
73 RequestN,
74 Cancel,
75 Payload,
76 Err,
77 MetadataPush,
78 Resume,
79 ResumeOk,
80 Ext,
81}
82impl std::fmt::Display for FrameType {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 let name = match self {
85 Self::Reserved => "RESERVED",
86 Self::Setup => "SETUP",
87 Self::Lease => "LEASE",
88 Self::Keepalive => "KEEPALIVE",
89 Self::RequestResponse => "REQUEST_RESPONSE",
90 Self::RequestFnf => "REQUEST_FNF",
91 Self::RequestStream => "REQUEST_STREAM",
92 Self::RequestChannel => "REQUEST_CHANNEL",
93 Self::RequestN => "REQUEST_N",
94 Self::Cancel => "CANCEL",
95 Self::Payload => "PAYLOAD",
96 Self::Err => "ERROR",
97 Self::MetadataPush => "METADATA_PUSH",
98 Self::Resume => "RESUME",
99 Self::ResumeOk => "RESUME_OK",
100 Self::Ext => "EXT",
101 };
102 f.write_str(name)
103 }
104}
105
106impl TryFrom<u8> for FrameType {
107 type Error = String;
108 fn try_from(index: u8) -> Result<Self, Self::Error> {
109 match index {
110 0 => Ok(Self::Reserved),
111 1 => Ok(Self::Setup),
112 2 => Ok(Self::Lease),
113 3 => Ok(Self::Keepalive),
114 4 => Ok(Self::RequestResponse),
115 5 => Ok(Self::RequestFnf),
116 6 => Ok(Self::RequestStream),
117 7 => Ok(Self::RequestChannel),
118 8 => Ok(Self::RequestN),
119 9 => Ok(Self::Cancel),
120 10 => Ok(Self::Payload),
121 11 => Ok(Self::Err),
122 12 => Ok(Self::MetadataPush),
123 13 => Ok(Self::Resume),
124 14 => Ok(Self::ResumeOk),
125 63 => Ok(Self::Ext),
126 _ => Err("invalid index for FrameType".to_string()),
127 }
128 }
129}
130
131impl From<FrameType> for u32 {
132 fn from(value: FrameType) -> Self {
133 match value {
134 FrameType::Reserved => unreachable!("reserved frame type"),
135 FrameType::Setup => 1,
136 FrameType::Lease => 2,
137 FrameType::Keepalive => 3,
138 FrameType::RequestResponse => 4,
139 FrameType::RequestFnf => 5,
140 FrameType::RequestStream => 6,
141 FrameType::RequestChannel => 7,
142 FrameType::RequestN => 8,
143 FrameType::Cancel => 9,
144 FrameType::Payload => 10,
145 FrameType::Err => 11,
146 FrameType::MetadataPush => 12,
147 FrameType::Resume => 13,
148 FrameType::ResumeOk => 14,
149 FrameType::Ext => 63,
150 }
151 }
152}
153
154#[derive(Clone, Copy)]
155#[cfg_attr(not(target = "wasm32-unknown-unknown"), derive(Debug))]
156#[must_use]
157#[allow(missing_docs)]
159pub enum FrameFlag {
160 Metadata,
161 Follows,
162 Complete,
163 Next,
164 Ignore,
165}
166impl std::fmt::Display for FrameFlag {
167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168 let flag = match self {
169 Self::Metadata => "M",
170 Self::Follows => "FRS",
171 Self::Complete => "CL",
172 Self::Next => "N",
173 Self::Ignore => "I",
174 };
175 f.write_str(flag)
176 }
177}
178impl TryFrom<u32> for FrameFlag {
179 type Error = String;
180 fn try_from(index: u32) -> Result<Self, Self::Error> {
181 match index {
182 0 => Ok(Self::Metadata),
183 1 => Ok(Self::Follows),
184 2 => Ok(Self::Complete),
185 3 => Ok(Self::Next),
186 4 => Ok(Self::Ignore),
187 _ => Err("invalid index for FrameFlag".to_string()),
188 }
189 }
190}
191
192impl From<FrameFlag> for u32 {
193 fn from(value: FrameFlag) -> Self {
194 match value {
195 FrameFlag::Metadata => 0,
196 FrameFlag::Follows => 1,
197 FrameFlag::Complete => 2,
198 FrameFlag::Next => 3,
199 FrameFlag::Ignore => 4,
200 }
201 }
202}
203
204#[derive(Copy, Clone)]
206#[cfg_attr(not(target = "wasm32-unknown-unknown"), derive(Debug))]
207#[must_use]
208#[allow(missing_docs)]
210pub enum ErrorCode {
211 InvalidSetup,
212 UnsupportedSetup,
213 RejectedSetup,
214 RejectedResume,
215 ConnectionError,
216 ConnectionClose,
217 ApplicationError,
218 Rejected,
219 Canceled,
220 Invalid,
221 Reserved,
222}
223impl TryFrom<u32> for ErrorCode {
224 type Error = String;
225 fn try_from(index: u32) -> Result<Self, Self::Error> {
226 match index {
227 1 => Ok(Self::InvalidSetup),
228 2 => Ok(Self::UnsupportedSetup),
229 3 => Ok(Self::RejectedSetup),
230 4 => Ok(Self::RejectedResume),
231 257 => Ok(Self::ConnectionError),
232 258 => Ok(Self::ConnectionClose),
233 513 => Ok(Self::ApplicationError),
234 514 => Ok(Self::Rejected),
235 515 => Ok(Self::Canceled),
236 516 => Ok(Self::Invalid),
237 4294967295 => Ok(Self::Reserved),
238 _ => Err("invalid index for ErrorCode".to_string()),
239 }
240 }
241}
242
243impl From<ErrorCode> for u32 {
244 fn from(value: ErrorCode) -> Self {
245 match value {
246 ErrorCode::InvalidSetup => 1,
247 ErrorCode::UnsupportedSetup => 2,
248 ErrorCode::RejectedSetup => 3,
249 ErrorCode::RejectedResume => 4,
250 ErrorCode::ConnectionError => 257,
251 ErrorCode::ConnectionClose => 258,
252 ErrorCode::ApplicationError => 513,
253 ErrorCode::Rejected => 514,
254 ErrorCode::Canceled => 515,
255 ErrorCode::Invalid => 516,
256 ErrorCode::Reserved => 4294967295,
257 }
258 }
259}
260
261#[derive(Clone)]
262#[cfg_attr(not(target = "wasm32-unknown-unknown"), derive(Debug))]
263#[must_use]
264#[allow(missing_docs)]
266pub enum Frame {
267 PayloadFrame(PayloadFrame),
268 Cancel(Cancel),
269 ErrorFrame(ErrorFrame),
270 RequestN(RequestN),
271 RequestResponse(RequestResponse),
272 RequestFnF(RequestFnF),
273 RequestStream(RequestStream),
274 RequestChannel(RequestChannel),
275}
276
277impl RawPayload {
278 pub fn new(metadata: Bytes, data: Bytes) -> Self {
280 Self {
281 metadata: Some(metadata),
282 data: Some(data),
283 }
284 }
285
286 pub fn new_data(metadata: Option<Bytes>, data: Option<Bytes>) -> Self {
288 Self { metadata, data }
289 }
290
291 pub fn empty() -> Self {
293 Self {
294 metadata: None,
295 data: None,
296 }
297 }
298
299 pub fn parse_metadata(&mut self) -> Result<Metadata, Error> {
301 if self.metadata.is_none() {
302 return Err(crate::Error::MetadataNotFound);
303 }
304 let bytes = self.metadata.as_mut().unwrap();
305 Metadata::decode(bytes)
306 }
307}
308
309impl From<Frame> for Result<Option<RawPayload>, PayloadError> {
310 fn from(frame: Frame) -> Self {
311 match frame {
312 Frame::PayloadFrame(frame) => Ok(Some(RawPayload::new(frame.metadata, frame.data))),
313 Frame::Cancel(_frame) => todo!(),
314 Frame::ErrorFrame(frame) => Err(crate::PayloadError::new(frame.code, frame.data, frame.metadata)),
315 Frame::RequestN(_frame) => todo!(),
316 Frame::RequestResponse(frame) => Ok(Some(RawPayload::new(frame.0.metadata, frame.0.data))),
317 Frame::RequestFnF(frame) => Ok(Some(RawPayload::new(frame.0.metadata, frame.0.data))),
318 Frame::RequestStream(frame) => Ok(Some(RawPayload::new(frame.0.metadata, frame.0.data))),
319 Frame::RequestChannel(frame) => Ok(Some(RawPayload::new(frame.0.metadata, frame.0.data))),
320 }
321 }
322}
323
324fn to_v0_metadata(m: &mut Bytes) {
325 if !m.is_empty() && m[0] == 0xca {
326 _ = m.split_to(4);
327 }
328}
329
330impl Frame {
331 pub(crate) const LEN_HEADER: usize = 6;
332 pub const FLAG_IGNORE: FrameFlags = 1 << 4;
334 pub const FLAG_NEXT: FrameFlags = 1 << 5;
336 pub const FLAG_COMPLETE: FrameFlags = 1 << 6;
338 pub const FLAG_FOLLOW: FrameFlags = 1 << 7;
340 pub const FLAG_METADATA: FrameFlags = 1 << 8;
342 pub const REQUEST_MAX: u32 = 0x7FFF_FFFF; #[must_use]
346 pub fn stream_id(&self) -> u32 {
348 match self {
349 Frame::PayloadFrame(frame) => frame.stream_id,
350 Frame::Cancel(frame) => frame.stream_id,
351 Frame::ErrorFrame(frame) => frame.stream_id,
352 Frame::RequestN(frame) => frame.stream_id,
353 Frame::RequestResponse(frame) => frame.0.stream_id,
354 Frame::RequestFnF(frame) => frame.0.stream_id,
355 Frame::RequestStream(frame) => frame.0.stream_id,
356 Frame::RequestChannel(frame) => frame.0.stream_id,
357 }
358 }
359
360 #[must_use]
361 pub fn get_flag(&self) -> FrameFlags {
363 match self {
364 Frame::PayloadFrame(frame) => frame.get_flag(),
365 Frame::Cancel(frame) => frame.get_flag(),
366 Frame::ErrorFrame(frame) => frame.get_flag(),
367 Frame::RequestN(frame) => frame.get_flag(),
368 Frame::RequestResponse(frame) => frame.get_flag(),
369 Frame::RequestFnF(frame) => frame.get_flag(),
370 Frame::RequestStream(frame) => frame.get_flag(),
371 Frame::RequestChannel(frame) => frame.get_flag(),
372 }
373 }
374
375 #[must_use]
376 pub fn frame_type(&self) -> FrameType {
378 match self {
379 Frame::PayloadFrame(_) => FrameType::Payload,
380 Frame::Cancel(_) => FrameType::Cancel,
381 Frame::ErrorFrame(_) => FrameType::Err,
382 Frame::RequestN(_) => FrameType::RequestN,
383 Frame::RequestResponse(_) => FrameType::RequestResponse,
384 Frame::RequestFnF(_) => FrameType::RequestFnf,
385 Frame::RequestStream(_) => FrameType::RequestStream,
386 Frame::RequestChannel(_) => FrameType::RequestChannel,
387 }
388 }
389
390 pub fn decode(mut bytes: Bytes) -> Result<Frame, (u32, Error)> {
392 let header = FrameHeader::from_bytes(bytes.split_to(Frame::LEN_HEADER));
393 let stream_id = header.stream_id();
394 Self::_decode(header, bytes).map_err(|e| (stream_id, e))
395 }
396
397 pub(crate) fn _decode(header: FrameHeader, buffer: Bytes) -> Result<Frame, Error> {
398 let frame = match header.frame_type() {
399 FrameType::Reserved => todo!(),
400 FrameType::Setup => todo!(),
401 FrameType::RequestResponse => {
402 Frame::RequestResponse(f_request_response::RequestResponse::decode_frame(&header, buffer)?)
403 }
404 FrameType::RequestFnf => Frame::RequestFnF(f_request_fnf::RequestFnF::decode_frame(&header, buffer)?),
405 FrameType::RequestStream => Frame::RequestStream(f_request_stream::RequestStream::decode_frame(&header, buffer)?),
406 FrameType::RequestChannel => {
407 Frame::RequestChannel(f_request_channel::RequestChannel::decode_frame(&header, buffer)?)
408 }
409 FrameType::RequestN => Frame::RequestN(f_request_n::RequestN::decode_frame(&header, buffer)?),
410 FrameType::Cancel => Frame::Cancel(Cancel {
411 stream_id: header.stream_id(),
412 }),
413 FrameType::Payload => Frame::PayloadFrame(f_payload::PayloadFrame::decode_frame(&header, buffer)?),
414 FrameType::Err => Frame::ErrorFrame(f_error::ErrorFrame::decode_frame(&header, buffer)?),
415 FrameType::Ext => todo!(),
416 _ => unreachable!("invalid frame type"),
417 };
418 Ok(frame)
419 }
420
421 #[must_use]
422 pub fn encode(self) -> Bytes {
424 match self {
425 Frame::PayloadFrame(f) => f.encode(),
426 Frame::Cancel(f) => f.encode(),
427 Frame::ErrorFrame(f) => f.encode(),
428 Frame::RequestN(f) => f.encode(),
429 Frame::RequestResponse(f) => f.encode(),
430 Frame::RequestFnF(f) => f.encode(),
431 Frame::RequestStream(f) => f.encode(),
432 Frame::RequestChannel(f) => f.encode(),
433 }
434 }
435
436 pub fn make_v0_metadata(&mut self) {
438 match self {
439 Frame::PayloadFrame(ref mut f) => to_v0_metadata(&mut f.metadata),
440
441 Frame::ErrorFrame(ref mut f) => {
442 f.metadata.as_mut().map(to_v0_metadata);
443 }
444 Frame::RequestChannel(f) => to_v0_metadata(&mut f.0.metadata),
445 Frame::RequestFnF(f) => to_v0_metadata(&mut f.0.metadata),
446 Frame::RequestResponse(f) => to_v0_metadata(&mut f.0.metadata),
447 Frame::RequestStream(f) => to_v0_metadata(&mut f.0.metadata),
448 _ => {}
449 }
450 }
451
452 pub fn new_error(stream_id: u32, e: PayloadError) -> Frame {
454 Frame::ErrorFrame(ErrorFrame {
455 stream_id,
456 metadata: e.metadata,
457 code: e.code,
458 data: e.msg,
459 })
460 }
461
462 pub fn new_cancel(stream_id: u32) -> Frame {
464 Frame::Cancel(Cancel { stream_id })
465 }
466
467 pub fn new_request_n(stream_id: u32, n: u32, _flags: FrameFlags) -> Frame {
469 Frame::RequestN(RequestN { stream_id, n })
470 }
471
472 pub fn new_request_response(stream_id: u32, payload: RawPayload, flags: FrameFlags) -> Frame {
474 Frame::RequestResponse(RequestResponse::from_payload(stream_id, payload, flags, 0))
475 }
476
477 pub fn new_request_stream(stream_id: u32, payload: RawPayload, flags: FrameFlags) -> Frame {
479 Frame::RequestStream(RequestStream::from_payload(stream_id, payload, flags, 0))
480 }
481
482 pub fn new_request_channel(stream_id: u32, payload: RawPayload, flags: FrameFlags, initial_n: u32) -> Frame {
484 Frame::RequestChannel(RequestChannel::from_payload(stream_id, payload, flags, initial_n))
485 }
486
487 pub fn new_request_fnf(stream_id: u32, payload: RawPayload, flags: FrameFlags) -> Frame {
489 Frame::RequestFnF(RequestFnF::from_payload(stream_id, payload, flags, 0))
490 }
491
492 pub fn new_payload(stream_id: u32, payload: RawPayload, flags: FrameFlags) -> Frame {
494 Frame::PayloadFrame(PayloadFrame::from_payload(stream_id, payload, flags))
495 }
496}
497
498trait RSocketFrame<T> {
499 const FRAME_TYPE: FrameType;
500 fn check_type(header: &FrameHeader) -> Result<(), Error> {
501 if header.frame_type() == Self::FRAME_TYPE {
502 Ok(())
503 } else {
504 Err(crate::Error::WrongType)
505 }
506 }
507 fn kind(&self) -> FrameType {
508 Self::FRAME_TYPE
509 }
510 fn stream_id(&self) -> u32;
511 fn decode_all(buffer: Bytes) -> Result<T, Error>;
512 fn decode_frame(header: &FrameHeader, buffer: Bytes) -> Result<T, Error>;
513 fn encode(self) -> Bytes;
514 fn gen_header(&self) -> FrameHeader;
515 fn get_flag(&self) -> FrameFlags {
516 0
517 }
518}
519
520pub trait RSocketFlags {
522 fn flag_next(&self) -> bool;
524 fn flag_metadata(&self) -> bool;
526 fn flag_complete(&self) -> bool;
528 fn flag_follows(&self) -> bool;
530 fn flag_ignore(&self) -> bool;
532}
533
534impl RSocketFlags for FrameFlags {
535 fn flag_next(&self) -> bool {
536 self & Frame::FLAG_NEXT == Frame::FLAG_NEXT
537 }
538
539 fn flag_metadata(&self) -> bool {
540 self & Frame::FLAG_METADATA == Frame::FLAG_METADATA
541 }
542
543 fn flag_complete(&self) -> bool {
544 self & Frame::FLAG_COMPLETE == Frame::FLAG_COMPLETE
545 }
546
547 fn flag_follows(&self) -> bool {
548 self & Frame::FLAG_FOLLOW == Frame::FLAG_FOLLOW
549 }
550
551 fn flag_ignore(&self) -> bool {
552 self & Frame::FLAG_IGNORE == Frame::FLAG_IGNORE
553 }
554}