rsrs_core/
frame.rs

1mod macros;
2mod validators;
3mod version;
4
5mod setup;
6mod error;
7mod lease;
8mod keepalive;
9mod request_response;
10mod request_fnf;
11mod request_stream;
12mod request_channel;
13mod request_n;
14mod cancel;
15mod payload;
16mod metadata_push;
17mod ext;
18mod resume;
19mod resume_ok;
20
21use recode::bytes::BytesMut;
22use recode::util::EncoderExt;
23use recode::{Decoder, Encoder, RawDecoder};
24
25use crate::{FrameFlags, FrameHeader, FrameType};
26#[doc(inline)]
27pub use self::setup::{Setup, SetupBuilder};
28#[doc(inline)]
29pub use self::error::{Error, ErrorBuilder, ErrorCode};
30#[doc(inline)]
31pub use self::lease::{Lease, LeaseBuilder};
32#[doc(inline)]
33pub use self::keepalive::{Keepalive, KeepaliveBuilder};
34#[doc(inline)]
35pub use self::request_response::{RequestResponse, RequestResponseBuilder};
36#[doc(inline)]
37pub use self::request_fnf::{RequestFNF, RequestFNFBuilder};
38#[doc(inline)]
39pub use self::request_stream::{RequestStream, RequestStreamBuilder};
40#[doc(inline)]
41pub use self::request_channel::{RequestChannel, RequestChannelBuilder};
42#[doc(inline)]
43pub use self::request_n::{RequestN, RequestNBuilder};
44#[doc(inline)]
45pub use self::cancel::{Cancel, CancelBuilder};
46#[doc(inline)]
47pub use self::payload::{Payload, PayloadBuilder};
48#[doc(inline)]
49pub use self::metadata_push::{MetadataPush, MetadataPushBuilder};
50#[doc(inline)]
51pub use self::ext::{Ext, ExtBuilder};
52#[doc(inline)]
53pub use self::resume::{Resume, ResumeBuilder};
54#[doc(inline)]
55pub use self::resume_ok::{ResumeOk, ResumeOkBuilder};
56
57/// A trait to be implemented by all frame variants.
58pub trait FrameVariant: Into<Frame> {
59    /// Mask that flags of a variant must be in.
60    const FLAGS_MASK: FrameFlags;
61
62    /// Flags that must be set for a variant.
63    const REQUIRED_FLAGS: FrameFlags = FrameFlags::empty();
64
65    /// Gets the flags of the variant.
66    fn flags(&self) -> FrameFlags;
67}
68
69/// An enum of all frame variants supported by RSocket.
70#[derive(Debug, Clone)]
71pub enum Frame {
72    Setup(Setup),
73    Error(Error),
74    Lease(Lease),
75    Keepalive(Keepalive),
76    RequestResponse(RequestResponse),
77    RequestFNF(RequestFNF),
78    RequestStream(RequestStream),
79    RequestChannel(RequestChannel),
80    RequestN(RequestN),
81    Cancel(Cancel),
82    Payload(Payload),
83    MetadataPush(MetadataPush),
84    Ext(Ext),
85    Resume(Resume),
86    ResumeOk(ResumeOk),
87}
88
89impl Frame {
90    #[inline(always)]
91    fn decode_inner(
92        buf: &mut BytesMut,
93        header: FrameHeader,
94    ) -> crate::Result<Self> {
95        #[inline]
96        fn dec<T>(
97            buf: &mut BytesMut,
98            header: FrameHeader,
99        ) -> crate::Result<Frame>
100        where
101            T: FrameVariant + Decoder<Error = crate::Error>,
102        {
103            // check for flags outside of the mask
104            if header.flags().intersects(!T::FLAGS_MASK) {
105                return Err(crate::Error::UnexpectedFlag {
106                    flag: header.flags(),
107                    frame_type: header.frame_type(),
108                    message: "frame has flags outside of its mask",
109                });
110            }
111
112            // check for missing required flags
113            if !header.flags().contains(T::REQUIRED_FLAGS) {
114                return Err(crate::Error::MissingFlag {
115                    flag: T::REQUIRED_FLAGS,
116                    frame_type: header.frame_type(),
117                    message: "frame is missing required flags",
118                });
119            }
120
121            T::decode(buf).map(Into::into)
122        }
123
124        match header.frame_type() {
125            | FrameType::Setup => dec::<Setup>(buf, header),
126            | FrameType::Lease => dec::<Lease>(buf, header),
127            | FrameType::Keepalive => dec::<Keepalive>(buf, header),
128            | FrameType::RequestResponse => dec::<RequestResponse>(buf, header),
129            | FrameType::RequestFNF => dec::<RequestFNF>(buf, header),
130            | FrameType::RequestStream => dec::<RequestStream>(buf, header),
131            | FrameType::RequestChannel => dec::<RequestChannel>(buf, header),
132            | FrameType::RequestN => dec::<RequestN>(buf, header),
133            | FrameType::Cancel => dec::<Cancel>(buf, header),
134            | FrameType::Payload => dec::<Payload>(buf, header),
135            | FrameType::Error => dec::<Error>(buf, header),
136            | FrameType::MetadataPush => dec::<MetadataPush>(buf, header),
137            | FrameType::Resume => dec::<Resume>(buf, header),
138            | FrameType::ResumeOk => dec::<ResumeOk>(buf, header),
139            | FrameType::Ext => dec::<Ext>(buf, header),
140            | FrameType::Unknown(_) => todo!(),
141        }
142    }
143}
144
145impl Decoder for Frame {
146    type Error = crate::Error;
147
148    fn decode(buf: &mut BytesMut) -> crate::Result<Self> {
149        let (header, _) = FrameHeader::raw_decode(buf.as_ref())?;
150
151        match Self::decode_inner(buf, header) {
152            | Ok(frame) => Ok(frame),
153            | Err(err) => {
154                if !header.flags().contains(FrameFlags::IGNORE) {
155                    return Err(err);
156                }
157
158                // ignoreable frame
159                Err(crate::Error::IgnoreableFrame(Box::new(err)))
160            }
161        }
162    }
163}
164
165impl Encoder for Frame {
166    type Error = crate::Error;
167
168    #[inline]
169    fn encode(item: &Self, buf: &mut BytesMut) -> Result<(), crate::Error> {
170        match item {
171            | Frame::Setup(v) => v.encode_to(buf),
172            | Frame::Error(v) => v.encode_to(buf),
173            | Frame::Lease(v) => v.encode_to(buf),
174            | Frame::Keepalive(v) => v.encode_to(buf),
175            | Frame::RequestResponse(v) => v.encode_to(buf),
176            | Frame::RequestFNF(v) => v.encode_to(buf),
177            | Frame::RequestStream(v) => v.encode_to(buf),
178            | Frame::RequestChannel(v) => v.encode_to(buf),
179            | Frame::RequestN(v) => v.encode_to(buf),
180            | Frame::Cancel(v) => v.encode_to(buf),
181            | Frame::Payload(v) => v.encode_to(buf),
182            | Frame::MetadataPush(v) => v.encode_to(buf),
183            | Frame::Resume(v) => v.encode_to(buf),
184            | Frame::ResumeOk(v) => v.encode_to(buf),
185            | Frame::Ext(v) => v.encode_to(buf),
186        }
187    }
188
189    #[inline]
190    fn size_of(item: &Self) -> usize {
191        match item {
192            | Frame::Setup(v) => v.size(),
193            | Frame::Error(v) => v.size(),
194            | Frame::Lease(v) => v.size(),
195            | Frame::Keepalive(v) => v.size(),
196            | Frame::RequestResponse(v) => v.size(),
197            | Frame::RequestFNF(v) => v.size(),
198            | Frame::RequestStream(v) => v.size(),
199            | Frame::RequestChannel(v) => v.size(),
200            | Frame::RequestN(v) => v.size(),
201            | Frame::Cancel(v) => v.size(),
202            | Frame::Payload(v) => v.size(),
203            | Frame::MetadataPush(v) => v.size(),
204            | Frame::Resume(v) => v.size(),
205            | Frame::ResumeOk(v) => v.size(),
206            | Frame::Ext(v) => v.size(),
207        }
208    }
209}