busrt/rpc/
mod.rs

1use crate::{Error, Frame};
2use std::fmt;
3
4#[cfg(feature = "rpc")]
5mod async_client;
6#[cfg(feature = "rpc")]
7#[allow(clippy::module_name_repetitions)]
8pub use async_client::{DummyHandlers, Options, Rpc, RpcClient, RpcHandlers};
9
/// Frame type byte (first byte of the RPC frame body) marking a notification.
pub const RPC_NOTIFICATION: u8 = 0x00;
/// Frame type byte (first byte of the RPC frame body) marking a request.
pub const RPC_REQUEST: u8 = 0x01;
/// Frame type byte (first byte of the RPC frame body) marking a reply.
pub const RPC_REPLY: u8 = 0x11;
/// Frame type byte (first byte of the RPC frame body) marking an error reply.
pub const RPC_ERROR: u8 = 0x12;

/// Error code set by [`RpcError::not_found`].
pub const RPC_ERROR_CODE_NOT_FOUND: i16 = -32001;
/// Error code set by [`RpcError::parse`].
pub const RPC_ERROR_CODE_PARSE: i16 = -32700;
/// Error code set by [`RpcError::invalid`].
pub const RPC_ERROR_CODE_INVALID_REQUEST: i16 = -32600;
/// Error code set by [`RpcError::method`].
pub const RPC_ERROR_CODE_METHOD_NOT_FOUND: i16 = -32601;
/// Error code set by [`RpcError::params`].
pub const RPC_ERROR_CODE_INVALID_METHOD_PARAMS: i16 = -32602;
/// Error code set by [`RpcError::internal`].
pub const RPC_ERROR_CODE_INTERNAL: i16 = -32603;
21
22#[allow(clippy::module_name_repetitions)]
23#[derive(Debug, Eq, PartialEq, Copy, Clone)]
24#[repr(u8)]
25pub enum RpcEventKind {
26    Notification = RPC_NOTIFICATION,
27    Request = RPC_REQUEST,
28    Reply = RPC_REPLY,
29    ErrorReply = RPC_ERROR,
30}
31
/// Formats `v` through its [`fmt::Display`] implementation and returns the
/// resulting UTF-8 bytes wrapped in `Some`.
///
/// The return type matches the `Option<Vec<u8>>` data argument taken by the
/// [`RpcError`] constructors. This function never returns `None`.
#[allow(clippy::module_name_repetitions)]
#[inline]
pub fn rpc_err_str(v: impl fmt::Display) -> Option<Vec<u8>> {
    // `into_bytes` reuses the String's own buffer instead of copying it into
    // a second allocation (`as_bytes().to_vec()`).
    Some(v.to_string().into_bytes())
}
37
38impl fmt::Display for RpcEventKind {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        write!(
41            f,
42            "{}",
43            match self {
44                RpcEventKind::Notification => "notifcation",
45                RpcEventKind::Request => "request",
46                RpcEventKind::Reply => "reply",
47                RpcEventKind::ErrorReply => "error reply",
48            }
49        )
50    }
51}
52
53#[allow(clippy::module_name_repetitions)]
54#[derive(Debug)]
55pub struct RpcEvent {
56    kind: RpcEventKind,
57    frame: Frame,
58    payload_pos: usize,
59    use_header: bool,
60}
61
62impl RpcEvent {
63    #[inline]
64    pub fn kind(&self) -> RpcEventKind {
65        self.kind
66    }
67    #[inline]
68    pub fn frame(&self) -> &Frame {
69        &self.frame
70    }
71    #[inline]
72    pub fn sender(&self) -> &str {
73        self.frame.sender()
74    }
75    #[inline]
76    pub fn primary_sender(&self) -> &str {
77        self.frame.primary_sender()
78    }
79    #[inline]
80    pub fn payload(&self) -> &[u8] {
81        &self.frame().payload()[self.payload_pos..]
82    }
83    /// # Panics
84    ///
85    /// Should not panic
86    #[inline]
87    pub fn id(&self) -> u32 {
88        u32::from_le_bytes(
89            if self.use_header {
90                &self.frame.header().unwrap()[1..5]
91            } else {
92                &self.frame.payload()[1..5]
93            }
94            .try_into()
95            .unwrap(),
96        )
97    }
98    #[inline]
99    pub fn is_response_required(&self) -> bool {
100        self.id() != 0
101    }
102    /// # Panics
103    ///
104    /// Should not panic
105    #[inline]
106    pub fn method(&self) -> &[u8] {
107        if self.use_header {
108            let header = self.frame.header.as_ref().unwrap();
109            &header[5..header.len() - 1]
110        } else {
111            &self.frame().payload()[5..self.payload_pos - 1]
112        }
113    }
114    #[inline]
115    pub fn parse_method(&self) -> Result<&str, Error> {
116        std::str::from_utf8(self.method()).map_err(Into::into)
117    }
118    /// # Panics
119    ///
120    /// Should not panic
121    #[inline]
122    pub fn code(&self) -> i16 {
123        if self.kind == RpcEventKind::ErrorReply {
124            i16::from_le_bytes(
125                if self.use_header {
126                    &self.frame.header().unwrap()[5..7]
127                } else {
128                    &self.frame.payload()[5..7]
129                }
130                .try_into()
131                .unwrap(),
132            )
133        } else {
134            0
135        }
136    }
137}
138
139impl TryFrom<Frame> for RpcEvent {
140    type Error = Error;
141    fn try_from(frame: Frame) -> Result<Self, Self::Error> {
142        let (body, use_header) = frame
143            .header()
144            .map_or_else(|| (frame.payload(), false), |h| (h, true));
145        if body.is_empty() {
146            Err(Error::data("Empty RPC frame"))
147        } else {
148            macro_rules! check_len {
149                ($len: expr) => {
150                    if body.len() < $len {
151                        return Err(Error::data("Invalid RPC frame"));
152                    }
153                };
154            }
155            match body[0] {
156                RPC_NOTIFICATION => Ok(RpcEvent {
157                    kind: RpcEventKind::Notification,
158                    frame,
159                    payload_pos: usize::from(!use_header),
160                    use_header: false,
161                }),
162                RPC_REQUEST => {
163                    check_len!(6);
164                    if use_header {
165                        Ok(RpcEvent {
166                            kind: RpcEventKind::Request,
167                            frame,
168                            payload_pos: 0,
169                            use_header: true,
170                        })
171                    } else {
172                        let mut sp = body[5..].splitn(2, |c| *c == 0);
173                        let method = sp.next().ok_or_else(|| Error::data("No RPC method"))?;
174                        let payload_pos = 6 + method.len();
175                        sp.next()
176                            .ok_or_else(|| Error::data("No RPC params block"))?;
177                        Ok(RpcEvent {
178                            kind: RpcEventKind::Request,
179                            frame,
180                            payload_pos,
181                            use_header: false,
182                        })
183                    }
184                }
185                RPC_REPLY => {
186                    check_len!(5);
187                    Ok(RpcEvent {
188                        kind: RpcEventKind::Reply,
189                        frame,
190                        payload_pos: if use_header { 0 } else { 5 },
191                        use_header,
192                    })
193                }
194                RPC_ERROR => {
195                    check_len!(7);
196                    Ok(RpcEvent {
197                        kind: RpcEventKind::ErrorReply,
198                        frame,
199                        payload_pos: if use_header { 0 } else { 7 },
200                        use_header,
201                    })
202                }
203                v => Err(Error::data(format!("Unsupported RPC frame code {}", v))),
204            }
205        }
206    }
207}
208
/// An RPC error: a numeric code plus optional raw data bytes.
#[allow(clippy::module_name_repetitions)]
#[derive(Debug)]
pub struct RpcError {
    /// Numeric error code (see the `RPC_ERROR_CODE_*` constants).
    code: i16,
    /// Optional raw bytes attached to the error.
    data: Option<Vec<u8>>,
}
215
216impl TryFrom<&RpcEvent> for RpcError {
217    type Error = Error;
218    #[inline]
219    fn try_from(event: &RpcEvent) -> Result<Self, Self::Error> {
220        if event.kind() == RpcEventKind::ErrorReply {
221            Ok(RpcError::new(event.code(), Some(event.payload().to_vec())))
222        } else {
223            Err(Error::data("not a RPC error"))
224        }
225    }
226}
227
228impl RpcError {
229    #[inline]
230    pub fn new(code: i16, data: Option<Vec<u8>>) -> Self {
231        Self { code, data }
232    }
233    #[inline]
234    pub fn code(&self) -> i16 {
235        self.code
236    }
237    #[inline]
238    pub fn data(&self) -> Option<&[u8]> {
239        self.data.as_deref()
240    }
241    #[inline]
242    pub fn method(err: Option<Vec<u8>>) -> Self {
243        Self {
244            code: RPC_ERROR_CODE_METHOD_NOT_FOUND,
245            data: err,
246        }
247    }
248    #[inline]
249    pub fn not_found(err: Option<Vec<u8>>) -> Self {
250        Self {
251            code: RPC_ERROR_CODE_NOT_FOUND,
252            data: err,
253        }
254    }
255    #[inline]
256    pub fn params(err: Option<Vec<u8>>) -> Self {
257        Self {
258            code: RPC_ERROR_CODE_INVALID_METHOD_PARAMS,
259            data: err,
260        }
261    }
262    #[inline]
263    pub fn parse(err: Option<Vec<u8>>) -> Self {
264        Self {
265            code: RPC_ERROR_CODE_PARSE,
266            data: err,
267        }
268    }
269    #[inline]
270    pub fn invalid(err: Option<Vec<u8>>) -> Self {
271        Self {
272            code: RPC_ERROR_CODE_INVALID_REQUEST,
273            data: err,
274        }
275    }
276    #[inline]
277    pub fn internal(err: Option<Vec<u8>>) -> Self {
278        Self {
279            code: RPC_ERROR_CODE_INTERNAL,
280            data: err,
281        }
282    }
283    /// Converts displayable to Vec<u8>
284    #[inline]
285    pub fn convert_data(v: impl fmt::Display) -> Vec<u8> {
286        v.to_string().as_bytes().to_vec()
287    }
288}
289
290impl From<Error> for RpcError {
291    #[inline]
292    fn from(e: Error) -> RpcError {
293        RpcError {
294            code: -32000 - e.kind() as i16,
295            data: None,
296        }
297    }
298}
299
#[cfg(feature = "broker-rpc")]
impl From<rmp_serde::encode::Error> for RpcError {
    /// Maps an `rmp_serde` encoding error to an RPC error with code
    /// [`RPC_ERROR_CODE_INTERNAL`]; the error's display text becomes the data.
    #[inline]
    fn from(e: rmp_serde::encode::Error) -> RpcError {
        RpcError {
            code: RPC_ERROR_CODE_INTERNAL,
            // `into_bytes` reuses the String's buffer instead of copying it.
            data: Some(e.to_string().into_bytes()),
        }
    }
}
310
311impl From<regex::Error> for RpcError {
312    #[inline]
313    fn from(e: regex::Error) -> RpcError {
314        RpcError {
315            code: RPC_ERROR_CODE_PARSE,
316            data: Some(e.to_string().as_bytes().to_vec()),
317        }
318    }
319}
320
321impl From<std::io::Error> for RpcError {
322    #[inline]
323    fn from(e: std::io::Error) -> RpcError {
324        RpcError {
325            code: RPC_ERROR_CODE_INTERNAL,
326            data: Some(e.to_string().as_bytes().to_vec()),
327        }
328    }
329}
330
#[cfg(feature = "broker-rpc")]
impl From<rmp_serde::decode::Error> for RpcError {
    /// Maps an `rmp_serde` decoding error to an RPC error with code
    /// [`RPC_ERROR_CODE_PARSE`]; the error's display text becomes the data.
    #[inline]
    fn from(e: rmp_serde::decode::Error) -> RpcError {
        RpcError {
            code: RPC_ERROR_CODE_PARSE,
            // `into_bytes` reuses the String's buffer instead of copying it.
            data: Some(e.to_string().into_bytes()),
        }
    }
}
341
342impl fmt::Display for RpcError {
343    #[inline]
344    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
345        write!(f, "rpc error code: {}", self.code)
346    }
347}
348
349impl std::error::Error for RpcError {}
350
351#[allow(clippy::module_name_repetitions)]
352pub type RpcResult = Result<Option<Vec<u8>>, RpcError>;
353
354#[inline]
355pub(crate) fn prepare_call_payload(method: &str, id_bytes: &[u8]) -> Vec<u8> {
356    let m = method.as_bytes();
357    let mut payload = Vec::with_capacity(m.len() + 6);
358    payload.push(RPC_REQUEST);
359    payload.extend(id_bytes);
360    payload.extend(m);
361    payload.push(0x00);
362    payload
363}