rpc_it/
codec.rs

1//! # Codec
2//!
3//! [`Codec`] is a trait that encodes/decodes data frame into underlying RPC protocol.
4
5use std::{
6    borrow::Cow,
7    num::{NonZeroU64, NonZeroUsize},
8    ops::Range,
9};
10
11use bytes::BytesMut;
12use enum_as_inner::EnumAsInner;
13use erased_serde::{Deserializer, Serialize};
14use serde::Deserialize;
15
16/// Splits data stream into frames. For example, for implmenting JSON-RPC over TCP,
17/// this would split the stream into JSON-RPC objects delimited by objects.
18pub trait Framing: Send + Sync + 'static + Unpin {
19    /// Advance internal parsing status
20    ///
21    /// # Returns
22    ///
23    /// - `Ok(Some(Range))` if a frame is found. Returns the range, represents `(valid_data_end,
24    ///   next_frame_start)` respectively.
25    /// - `Ok(None)` if a frame is not found. The buffer should be kept as-is, and the next
26    ///   [`Self::advance`] call should be called with the same buffer, but extended with more data
27    ///   from the underlying transport.
28    /// - `Err(...)` if any error occurs during framing.
29    fn try_framing(&mut self, buffer: &[u8]) -> Result<Option<FramingAdvanceResult>, FramingError>;
30
31    /// Called after every successful frame parsing
32    fn advance(&mut self) {}
33
34    /// Returns hint for the next buffer size. This is used to pre-allocate the buffer for the
35    /// next [`Self::advance`] call.
36    fn next_buffer_size(&self) -> Option<NonZeroUsize> {
37        // Default is no. Usually, this is only providable on protocols with frame headers.
38        None
39    }
40}
41
42#[derive(Default, Debug, Clone, Copy)]
43pub struct FramingAdvanceResult {
44    pub valid_data_end: usize,
45    pub next_frame_start: usize,
46}
47
48#[derive(Debug, thiserror::Error)]
49pub enum FramingError {
50    #[error("Broken buffer. The connection should be closed. Context: {0}")]
51    Broken(Cow<'static, str>),
52
53    #[error("Error occurred, but internal state can be restored after {0} bytes")]
54    Recoverable(usize),
55}
56
57#[derive(Debug, Clone, EnumAsInner)]
58pub enum ReqId {
59    U64(u64),
60    Bytes(Range<usize>),
61}
62
63#[derive(Debug, Clone, EnumAsInner)]
64pub enum ReqIdRef<'a> {
65    U64(u64),
66    Bytes(&'a [u8]),
67}
68
69impl ReqId {
70    pub fn make_ref<'a>(&self, buffer: &'a [u8]) -> ReqIdRef<'a> {
71        match self {
72            ReqId::U64(x) => ReqIdRef::U64(*x),
73            ReqId::Bytes(x) => ReqIdRef::Bytes(&buffer[x.clone()]),
74        }
75    }
76}
77
78/// Parses/Encodes data frame.
79///
80/// This is a trait that encodes/decodes data frame into underlying RPC protocol, and generally
81/// responsible for any protocol-specific data frame handling.
82///
83/// The codec, should trivially be clone-able.
84pub trait Codec: Send + Sync + 'static + std::fmt::Debug {
85    /// Encodes notify frame
86    fn encode_notify(
87        &self,
88        method: &str,
89        params: &dyn Serialize,
90        write: &mut BytesMut,
91    ) -> Result<(), EncodeError> {
92        let _ = (method, params, write);
93        Err(EncodeError::UnsupportedFeature("Notify is not supported by this codec".into()))
94    }
95
96    /// Encodes request frame
97    ///
98    /// It should internally generate appropriate request ID, and provide deterministic hash of the
99    /// internally generated request ID. This generated ID will be fed to [`Codec::decode_inbound`]
100    /// to match the response to the request.
101    ///
102    /// It is best to the output hash be deterministic for input `req_id_hint`, but it is not
103    /// required.
104    ///
105    /// - `req_id_hint` is guaranteed to be odd number.
106    ///
107    /// # Returns
108    ///
109    /// Should return for deterministic hash of the (internally generated) request ID.
110    ///
111    /// This is used to match the response to the request.
112    fn encode_request(
113        &self,
114        method: &str,
115        req_id_hint: NonZeroU64,
116        params: &dyn Serialize,
117        write: &mut BytesMut,
118    ) -> Result<NonZeroU64, EncodeError> {
119        let _ = (method, req_id_hint, params, write);
120        Err(EncodeError::UnsupportedFeature("Request is not supported by this codec".into()))
121    }
122
123    /// Encodes response frame
124    ///
125    /// - `req_id`: The original request ID.
126    /// - `encode_as_error`: If true, the response should be encoded as error response.
127    /// - `response`: The response object.
128    /// - `write`: The writer to write the response to.
129    fn encode_response(
130        &self,
131        req_id: ReqIdRef,
132        encode_as_error: bool,
133        response: &dyn Serialize,
134        write: &mut BytesMut,
135    ) -> Result<(), EncodeError> {
136        let _ = (req_id, response, encode_as_error, write);
137        Err(EncodeError::UnsupportedFeature("Response is not supported by this codec".into()))
138    }
139
140    /// Encodes predefined response error. See [`PredefinedResponseError`].
141    fn encode_response_predefined(
142        &self,
143        req_id: ReqIdRef,
144        response: &PredefinedResponseError,
145        write: &mut BytesMut,
146    ) -> Result<(), EncodeError> {
147        self.encode_response(req_id, true, response, write)
148    }
149
150    /// Decodes inbound frame, and identifies the frame type.
151    ///
152    /// If `Response` is received, the deterministic hash should be calculated from the request ID.
153    ///
154    /// # Returns
155    ///
156    /// Returns the frame type, and the range of the frame.
157    fn decode_inbound(&self, data: &[u8]) -> Result<(InboundFrameType, Range<usize>), DecodeError> {
158        let _ = data;
159        Err(DecodeError::UnsupportedFeature("This codec is write-only.".into()))
160    }
161
162    /// Decodes the payload of the inbound frame.
163    ///
164    /// Codec implementation should call `decode` with created [`Deserializer`] object.
165    /// Its type information can be erased using `<dyn erased_serde::Deserializer>::erase`
166    fn decode_payload<'a>(
167        &self,
168        payload: &'a [u8],
169        decode: &mut dyn FnMut(&mut dyn Deserializer<'a>) -> Result<(), erased_serde::Error>,
170    ) -> Result<(), DecodeError> {
171        let _ = (payload, decode);
172        Err(DecodeError::UnsupportedFeature("This codec is write-only.".into()))
173    }
174
175    /// Tries decode the payload to [`PredefinedResponseError`].
176    fn try_decode_predef_error<'a>(&self, payload: &'a [u8]) -> Option<PredefinedResponseError> {
177        let _ = (payload,);
178        let mut error = None;
179        self.decode_payload(payload, &mut |de| {
180            let result = PredefinedResponseError::deserialize(de);
181            error = Some(result?);
182            Ok(())
183        })
184        .ok()?;
185        error
186    }
187}
188
189/// Some predefined reponse error types. Most of them are automatically generated from the server.
190///
191/// In default, errors are serialized as following form:
192///
193/// ```json
194/// {
195///     "code": "PARSE_FAILED",
196///     "detail": "Failed to parse argument. (hint: Type 'i32' is expected)"
197/// }
198/// ```
199///
200/// The encoded error types may be customized by [`Codec::encode_response_predefined`].
201#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize, EnumAsInner)]
202#[non_exhaustive]
203#[serde(rename_all = "SCREAMING_SNAKE_CASE", tag = "code", content = "detail")]
204pub enum PredefinedResponseError {
205    /// You should not use this error type directly. This error type is used internally by the
206    /// library when the request object is dropped before sending request. To specify intentional
207    /// abort, use [`PredefinedResponseError::Aborted`] instead.
208    #[error("Request object was dropped by server.")]
209    Unhandled,
210
211    /// Use this when you want to abort the request intentionally.
212    #[error("Request was intentionally aborted")]
213    Aborted,
214
215    /// The typename will be obtained from [`std::any::type_name`]
216    #[error("Failed to parse argument. (hint: Type '{0}' is expected)")]
217    ParseFailed(Cow<'static, str>),
218
219    /// Invalid request/notify handler type
220    #[error("Notify handler received request message")]
221    NotifyHandler,
222
223    /// Internal error. This is only generated by the user.
224    #[error("Internal error: {0}")]
225    Internal(i32),
226
227    /// Internal error. This is only generated by the user.
228    #[error("Internal error: {0} ({1})")]
229    InternalDetailed(i32, String),
230}
231
232#[derive(Debug, thiserror::Error)]
233pub enum EncodeError {
234    #[error("Unsupported feature: {0}")]
235    UnsupportedFeature(Cow<'static, str>),
236
237    #[error("Unsupported data format: {0}")]
238    UnsupportedDataFormat(Cow<'static, str>),
239
240    #[error("Serialization failed: {0}")]
241    SerializeError(Box<dyn std::error::Error + Send + Sync + 'static>),
242}
243
244#[derive(Debug, thiserror::Error)]
245pub enum DecodeError {
246    #[error("Unsupported feature: {0}")]
247    UnsupportedFeature(Cow<'static, str>),
248
249    #[error("Unsupported data format: {0}")]
250    InvalidFormat(Cow<'static, str>),
251
252    #[error("Parsing error from decoder: {0}")]
253    ParseFailed(#[from] erased_serde::Error),
254
255    #[error("Other error reported from decoder: {0}")]
256    Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
257}
258
259/// Inbound frame type parsed by codec.
260#[derive(Debug)]
261pub enum InboundFrameType {
262    Notify { method: Range<usize> },
263    Request { method: Range<usize>, req_id: ReqId },
264    Response { req_id: ReqId, req_id_hash: u64, is_error: bool },
265}