1use crate::{Error, Frame};
2use std::fmt;
3
4#[cfg(feature = "rpc")]
5mod async_client;
6#[cfg(feature = "rpc")]
7#[allow(clippy::module_name_repetitions)]
8pub use async_client::{DummyHandlers, Options, Rpc, RpcClient, RpcHandlers};
9
/// Frame type byte (first metadata byte) of an RPC notification.
pub const RPC_NOTIFICATION: u8 = 0x00;
/// Frame type byte (first metadata byte) of an RPC request.
pub const RPC_REQUEST: u8 = 0x01;
/// Frame type byte (first metadata byte) of an RPC reply.
pub const RPC_REPLY: u8 = 0x11;
/// Frame type byte (first metadata byte) of an RPC error reply.
pub const RPC_ERROR: u8 = 0x12;
14
/// Error code set by [`RpcError::not_found`].
pub const RPC_ERROR_CODE_NOT_FOUND: i16 = -32001;
/// Error code set by [`RpcError::parse`].
pub const RPC_ERROR_CODE_PARSE: i16 = -32700;
/// Error code set by [`RpcError::invalid`].
pub const RPC_ERROR_CODE_INVALID_REQUEST: i16 = -32600;
/// Error code set by [`RpcError::method`].
pub const RPC_ERROR_CODE_METHOD_NOT_FOUND: i16 = -32601;
/// Error code set by [`RpcError::params`].
pub const RPC_ERROR_CODE_INVALID_METHOD_PARAMS: i16 = -32602;
/// Error code set by [`RpcError::internal`].
pub const RPC_ERROR_CODE_INTERNAL: i16 = -32603;
21
/// Kind of an RPC event.
///
/// Each variant's discriminant is the corresponding frame type byte, so the
/// enum can be cast to `u8` to obtain the wire value.
#[allow(clippy::module_name_repetitions)]
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
#[repr(u8)]
pub enum RpcEventKind {
    /// Notification (`RPC_NOTIFICATION`).
    Notification = RPC_NOTIFICATION,
    /// Request (`RPC_REQUEST`).
    Request = RPC_REQUEST,
    /// Reply (`RPC_REPLY`).
    Reply = RPC_REPLY,
    /// Error reply (`RPC_ERROR`).
    ErrorReply = RPC_ERROR,
}
31
/// Converts any displayable value into RPC error data.
///
/// The value is formatted with its `Display` implementation and the resulting
/// UTF-8 bytes are returned wrapped in `Some`, ready to be passed to the
/// `RpcError` constructors.
#[allow(clippy::module_name_repetitions)]
#[inline]
pub fn rpc_err_str(v: impl fmt::Display) -> Option<Vec<u8>> {
    // `to_string` already yields an owned buffer; reuse it instead of copying.
    Some(v.to_string().into_bytes())
}
37
38impl fmt::Display for RpcEventKind {
39 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40 write!(
41 f,
42 "{}",
43 match self {
44 RpcEventKind::Notification => "notifcation",
45 RpcEventKind::Request => "request",
46 RpcEventKind::Reply => "reply",
47 RpcEventKind::ErrorReply => "error reply",
48 }
49 )
50 }
51}
52
/// An RPC event parsed from a [`Frame`].
///
/// Instances are created with `RpcEvent::try_from(frame)`, which validates the
/// RPC metadata and records where the RPC payload starts.
#[allow(clippy::module_name_repetitions)]
#[derive(Debug)]
pub struct RpcEvent {
    // Kind of the event, determined from the first metadata byte.
    kind: RpcEventKind,
    // The frame the event was parsed from.
    frame: Frame,
    // Offset inside the frame payload at which the RPC payload starts.
    payload_pos: usize,
    // Whether id / method / code are read from the frame header instead of
    // the frame payload.
    use_header: bool,
}
61
62impl RpcEvent {
63 #[inline]
64 pub fn kind(&self) -> RpcEventKind {
65 self.kind
66 }
67 #[inline]
68 pub fn frame(&self) -> &Frame {
69 &self.frame
70 }
71 #[inline]
72 pub fn sender(&self) -> &str {
73 self.frame.sender()
74 }
75 #[inline]
76 pub fn primary_sender(&self) -> &str {
77 self.frame.primary_sender()
78 }
79 #[inline]
80 pub fn payload(&self) -> &[u8] {
81 &self.frame().payload()[self.payload_pos..]
82 }
83 #[inline]
87 pub fn id(&self) -> u32 {
88 u32::from_le_bytes(
89 if self.use_header {
90 &self.frame.header().unwrap()[1..5]
91 } else {
92 &self.frame.payload()[1..5]
93 }
94 .try_into()
95 .unwrap(),
96 )
97 }
98 #[inline]
99 pub fn is_response_required(&self) -> bool {
100 self.id() != 0
101 }
102 #[inline]
106 pub fn method(&self) -> &[u8] {
107 if self.use_header {
108 let header = self.frame.header.as_ref().unwrap();
109 &header[5..header.len() - 1]
110 } else {
111 &self.frame().payload()[5..self.payload_pos - 1]
112 }
113 }
114 #[inline]
115 pub fn parse_method(&self) -> Result<&str, Error> {
116 std::str::from_utf8(self.method()).map_err(Into::into)
117 }
118 #[inline]
122 pub fn code(&self) -> i16 {
123 if self.kind == RpcEventKind::ErrorReply {
124 i16::from_le_bytes(
125 if self.use_header {
126 &self.frame.header().unwrap()[5..7]
127 } else {
128 &self.frame.payload()[5..7]
129 }
130 .try_into()
131 .unwrap(),
132 )
133 } else {
134 0
135 }
136 }
137}
138
139impl TryFrom<Frame> for RpcEvent {
140 type Error = Error;
141 fn try_from(frame: Frame) -> Result<Self, Self::Error> {
142 let (body, use_header) = frame
143 .header()
144 .map_or_else(|| (frame.payload(), false), |h| (h, true));
145 if body.is_empty() {
146 Err(Error::data("Empty RPC frame"))
147 } else {
148 macro_rules! check_len {
149 ($len: expr) => {
150 if body.len() < $len {
151 return Err(Error::data("Invalid RPC frame"));
152 }
153 };
154 }
155 match body[0] {
156 RPC_NOTIFICATION => Ok(RpcEvent {
157 kind: RpcEventKind::Notification,
158 frame,
159 payload_pos: usize::from(!use_header),
160 use_header: false,
161 }),
162 RPC_REQUEST => {
163 check_len!(6);
164 if use_header {
165 Ok(RpcEvent {
166 kind: RpcEventKind::Request,
167 frame,
168 payload_pos: 0,
169 use_header: true,
170 })
171 } else {
172 let mut sp = body[5..].splitn(2, |c| *c == 0);
173 let method = sp.next().ok_or_else(|| Error::data("No RPC method"))?;
174 let payload_pos = 6 + method.len();
175 sp.next()
176 .ok_or_else(|| Error::data("No RPC params block"))?;
177 Ok(RpcEvent {
178 kind: RpcEventKind::Request,
179 frame,
180 payload_pos,
181 use_header: false,
182 })
183 }
184 }
185 RPC_REPLY => {
186 check_len!(5);
187 Ok(RpcEvent {
188 kind: RpcEventKind::Reply,
189 frame,
190 payload_pos: if use_header { 0 } else { 5 },
191 use_header,
192 })
193 }
194 RPC_ERROR => {
195 check_len!(7);
196 Ok(RpcEvent {
197 kind: RpcEventKind::ErrorReply,
198 frame,
199 payload_pos: if use_header { 0 } else { 7 },
200 use_header,
201 })
202 }
203 v => Err(Error::data(format!("Unsupported RPC frame code {}", v))),
204 }
205 }
206 }
207}
208
/// An RPC error: a numeric code plus optional raw error data.
#[allow(clippy::module_name_repetitions)]
#[derive(Debug)]
pub struct RpcError {
    // Numeric error code (see the `RPC_ERROR_CODE_*` constants).
    code: i16,
    // Optional error data as raw bytes.
    data: Option<Vec<u8>>,
}
215
216impl TryFrom<&RpcEvent> for RpcError {
217 type Error = Error;
218 #[inline]
219 fn try_from(event: &RpcEvent) -> Result<Self, Self::Error> {
220 if event.kind() == RpcEventKind::ErrorReply {
221 Ok(RpcError::new(event.code(), Some(event.payload().to_vec())))
222 } else {
223 Err(Error::data("not a RPC error"))
224 }
225 }
226}
227
228impl RpcError {
229 #[inline]
230 pub fn new(code: i16, data: Option<Vec<u8>>) -> Self {
231 Self { code, data }
232 }
233 #[inline]
234 pub fn code(&self) -> i16 {
235 self.code
236 }
237 #[inline]
238 pub fn data(&self) -> Option<&[u8]> {
239 self.data.as_deref()
240 }
241 #[inline]
242 pub fn method(err: Option<Vec<u8>>) -> Self {
243 Self {
244 code: RPC_ERROR_CODE_METHOD_NOT_FOUND,
245 data: err,
246 }
247 }
248 #[inline]
249 pub fn not_found(err: Option<Vec<u8>>) -> Self {
250 Self {
251 code: RPC_ERROR_CODE_NOT_FOUND,
252 data: err,
253 }
254 }
255 #[inline]
256 pub fn params(err: Option<Vec<u8>>) -> Self {
257 Self {
258 code: RPC_ERROR_CODE_INVALID_METHOD_PARAMS,
259 data: err,
260 }
261 }
262 #[inline]
263 pub fn parse(err: Option<Vec<u8>>) -> Self {
264 Self {
265 code: RPC_ERROR_CODE_PARSE,
266 data: err,
267 }
268 }
269 #[inline]
270 pub fn invalid(err: Option<Vec<u8>>) -> Self {
271 Self {
272 code: RPC_ERROR_CODE_INVALID_REQUEST,
273 data: err,
274 }
275 }
276 #[inline]
277 pub fn internal(err: Option<Vec<u8>>) -> Self {
278 Self {
279 code: RPC_ERROR_CODE_INTERNAL,
280 data: err,
281 }
282 }
283 #[inline]
285 pub fn convert_data(v: impl fmt::Display) -> Vec<u8> {
286 v.to_string().as_bytes().to_vec()
287 }
288}
289
290impl From<Error> for RpcError {
291 #[inline]
292 fn from(e: Error) -> RpcError {
293 RpcError {
294 code: -32000 - e.kind() as i16,
295 data: None,
296 }
297 }
298}
299
#[cfg(feature = "broker-rpc")]
impl From<rmp_serde::encode::Error> for RpcError {
    /// Maps an encoding failure to an internal RPC error whose data is the
    /// error message.
    #[inline]
    fn from(e: rmp_serde::encode::Error) -> RpcError {
        // `to_string` already yields an owned buffer; reuse it instead of copying.
        RpcError::internal(Some(e.to_string().into_bytes()))
    }
}
310
311impl From<regex::Error> for RpcError {
312 #[inline]
313 fn from(e: regex::Error) -> RpcError {
314 RpcError {
315 code: RPC_ERROR_CODE_PARSE,
316 data: Some(e.to_string().as_bytes().to_vec()),
317 }
318 }
319}
320
321impl From<std::io::Error> for RpcError {
322 #[inline]
323 fn from(e: std::io::Error) -> RpcError {
324 RpcError {
325 code: RPC_ERROR_CODE_INTERNAL,
326 data: Some(e.to_string().as_bytes().to_vec()),
327 }
328 }
329}
330
#[cfg(feature = "broker-rpc")]
impl From<rmp_serde::decode::Error> for RpcError {
    /// Maps a decoding failure to a parse RPC error whose data is the error
    /// message.
    #[inline]
    fn from(e: rmp_serde::decode::Error) -> RpcError {
        // `to_string` already yields an owned buffer; reuse it instead of copying.
        RpcError::parse(Some(e.to_string().into_bytes()))
    }
}
341
342impl fmt::Display for RpcError {
343 #[inline]
344 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
345 write!(f, "rpc error code: {}", self.code)
346 }
347}
348
// Marker impl: lets `RpcError` be used as a standard error (e.g. boxed as
// `dyn std::error::Error`); no source error is exposed.
impl std::error::Error for RpcError {}
350
/// Result of an RPC call: optional raw reply data on success, an
/// [`RpcError`] on failure.
#[allow(clippy::module_name_repetitions)]
pub type RpcResult = Result<Option<Vec<u8>>, RpcError>;
353
354#[inline]
355pub(crate) fn prepare_call_payload(method: &str, id_bytes: &[u8]) -> Vec<u8> {
356 let m = method.as_bytes();
357 let mut payload = Vec::with_capacity(m.len() + 6);
358 payload.push(RPC_REQUEST);
359 payload.extend(id_bytes);
360 payload.extend(m);
361 payload.push(0x00);
362 payload
363}