rjrpc/
lib.rs

1use eva_common::value::Value;
2use serde::{Deserialize, Serialize};
3use std::collections::BTreeMap;
4use std::convert::{TryFrom, TryInto};
5use std::fmt;
6use std::net::IpAddr;
7
8use tokio::io::{AsyncReadExt, AsyncWriteExt};
9
10use log::debug;
11
12const ERR_UNSUPPORTED_ENCODING: &str = "Unsupported data encoding";
13const ERR_INVALID_REQUEST: &str = "Invalid request";
14
15#[path = "prelude.rs"]
16pub mod prelude;
17
18use eva_common::ERR_CODE_INTERNAL_RPC;
19use eva_common::ERR_CODE_INVALID_PARAMS;
20use eva_common::ERR_CODE_INVALID_REQUEST;
21use eva_common::ERR_CODE_METHOD_NOT_FOUND;
22
23#[derive(Debug, Serialize, Deserialize)]
24pub struct JsonRpcError {
25    pub code: i16,
26    #[serde(skip_serializing_if = "Option::is_none")]
27    pub message: Option<String>,
28}
29
30impl fmt::Display for JsonRpcError {
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        if let Some(msg) = self.message.as_ref() {
33            write!(f, "{}", msg)
34        } else {
35            write!(f, "JSON RPC Error")
36        }
37    }
38}
39
40impl From<eva_common::Error> for JsonRpcError {
41    fn from(e: eva_common::Error) -> Self {
42        Self {
43            code: e.kind() as i16,
44            message: e.message().map(ToOwned::to_owned),
45        }
46    }
47}
48
49impl JsonRpcError {
50    pub fn new<T: fmt::Display>(code: i16, message: Option<T>) -> Self {
51        Self {
52            code,
53            message: message.map(|v| v.to_string()),
54        }
55    }
56    pub fn invalid_request<T: fmt::Display>(message: T) -> Self {
57        Self::new(ERR_CODE_INVALID_REQUEST, Some(message))
58    }
59    pub fn method_not_found<T: fmt::Display>(message: T) -> Self {
60        Self::new(ERR_CODE_METHOD_NOT_FOUND, Some(message))
61    }
62    pub fn invalid_params<T: fmt::Display>(message: T) -> Self {
63        Self::new(ERR_CODE_INVALID_PARAMS, Some(message))
64    }
65    pub fn internal_rpc<T: fmt::Display>(message: T) -> Self {
66        Self::new(ERR_CODE_INTERNAL_RPC, Some(message))
67    }
68}
69
70pub type Credentials = (String, String);
71
72#[derive(Debug, Clone)]
73pub struct JsonRpcRequestMeta {
74    source: RequestSource,
75    credentials: Option<Credentials>,
76    external_token: Option<String>,
77    key: Option<String>,
78}
79
80#[derive(Debug, Clone)]
81pub enum RequestSource {
82    UnixSocket,
83    Socket(IpAddr),
84    Http(IpAddr, String),
85    Internal(String),
86    Mqtt(String),
87    Custom(String),
88}
89
90#[allow(clippy::must_use_candidate)]
91impl JsonRpcRequestMeta {
92    #[cfg(not(target_os = "windows"))]
93    pub fn unix_socket() -> Self {
94        Self {
95            source: RequestSource::UnixSocket,
96            credentials: None,
97            external_token: None,
98            key: None,
99        }
100    }
101    pub fn socket(ip: IpAddr) -> Self {
102        Self {
103            source: RequestSource::Socket(ip),
104            credentials: None,
105            external_token: None,
106            key: None,
107        }
108    }
109    pub fn http(
110        ip: IpAddr,
111        agent: String,
112        credentials: Option<Credentials>,
113        key: Option<String>,
114    ) -> Self {
115        Self {
116            source: RequestSource::Http(ip, agent),
117            credentials,
118            external_token: None,
119            key,
120        }
121    }
122    pub fn internal(id: String) -> Self {
123        Self {
124            source: RequestSource::Internal(id),
125            credentials: None,
126            external_token: None,
127            key: None,
128        }
129    }
130    pub fn mqtt(id: String) -> Self {
131        Self {
132            source: RequestSource::Mqtt(id),
133            credentials: None,
134            external_token: None,
135            key: None,
136        }
137    }
138    pub fn custom(id: String) -> Self {
139        Self {
140            source: RequestSource::Custom(id),
141            credentials: None,
142            external_token: None,
143            key: None,
144        }
145    }
146
147    pub fn ip(&self) -> Option<IpAddr> {
148        match self.source {
149            RequestSource::Socket(addr) | RequestSource::Http(addr, _) => Some(addr),
150            _ => None,
151        }
152    }
153    pub fn credentials(&self) -> Option<&Credentials> {
154        self.credentials.as_ref()
155    }
156    pub fn agent(&self) -> Option<&String> {
157        match self.source {
158            RequestSource::Http(_, ref agent) => Some(agent),
159            _ => None,
160        }
161    }
162    #[inline]
163    pub fn key(&self) -> Option<&str> {
164        self.key.as_deref()
165    }
166    #[inline]
167    pub fn take_key(&mut self) -> Option<String> {
168        self.key.take()
169    }
170    #[inline]
171    pub fn set_external_token(&mut self, token: String) {
172        self.external_token = Some(token);
173    }
174    #[inline]
175    pub fn external_token(&self) -> Option<&str> {
176        self.external_token.as_deref()
177    }
178}
179
180const ENCODING_JSON: u8 = 1;
181const ENCODING_MSGPACK: u8 = 2;
182
183#[derive(PartialEq, Debug, Copy, Clone, Default)]
184#[repr(u8)]
185pub enum Encoding {
186    #[default]
187    Json = ENCODING_JSON,
188    MsgPack = ENCODING_MSGPACK,
189}
190
191impl TryFrom<u8> for Encoding {
192    type Error = JsonRpcError;
193    fn try_from(value: u8) -> Result<Self, Self::Error> {
194        match value {
195            x if x == Encoding::Json as u8 => Ok(Encoding::Json),
196            x if x == Encoding::MsgPack as u8 => Ok(Encoding::MsgPack),
197            _ => Err(JsonRpcError::invalid_request(ERR_UNSUPPORTED_ENCODING)),
198        }
199    }
200}
201
202#[macro_export]
203macro_rules! jrpc_q {
204    ($req: expr) => {
205        if !$req.response_required() {
206            return None;
207        };
208    };
209}
210
211macro_rules! impl_err_data {
212    ($e: path) => {
213        impl From<$e> for JsonRpcError {
214            fn from(e: $e) -> JsonRpcError {
215                JsonRpcError::internal_rpc(e)
216            }
217        }
218    };
219}
220
221impl_err_data!(rmp_serde::decode::Error);
222impl_err_data!(rmp_serde::encode::Error);
223impl_err_data!(serde_json::Error);
224
225fn unpack_data<'de, V: Deserialize<'de>>(src: &'de [u8], enc: Encoding) -> Result<V, JsonRpcError> {
226    Ok(match enc {
227        Encoding::Json => serde_json::from_slice(src)?,
228        Encoding::MsgPack => rmp_serde::from_slice(src)?,
229    })
230}
231
232fn pack_data<V: Serialize>(data: &V, enc: Encoding) -> Result<Vec<u8>, JsonRpcError> {
233    Ok(match enc {
234        Encoding::Json => serde_json::to_vec(data)?,
235        Encoding::MsgPack => rmp_serde::to_vec_named(data)?,
236    })
237}
238
239impl fmt::Display for Encoding {
240    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241        write!(
242            f,
243            "{}",
244            match self {
245                Encoding::Json => "application/json",
246                Encoding::MsgPack => "application/msgpack",
247            }
248        )
249    }
250}
251
252#[derive(Debug)]
253pub struct JrpcException {
254    code: u16,
255    headers: Vec<(String, String)>,
256    content: String,
257}
258
259impl JrpcException {
260    pub fn new(code: u16, content: String) -> Self {
261        Self {
262            code,
263            headers: Vec::new(),
264            content,
265        }
266    }
267    #[inline]
268    pub fn set_header(&mut self, header: String, value: String) {
269        self.headers.push((header, value));
270    }
271    #[inline]
272    pub fn code(&self) -> u16 {
273        self.code
274    }
275    #[inline]
276    pub fn content(&self) -> &str {
277        &self.content
278    }
279}
280
281#[derive(Serialize, Deserialize, Debug)]
282#[serde(deny_unknown_fields)]
283pub struct JsonRpcResponse {
284    jsonrpc: String,
285    pub id: Value,
286    #[serde(skip_serializing_if = "Option::is_none")]
287    pub result: Option<Value>,
288    #[serde(skip_serializing_if = "Option::is_none")]
289    pub error: Option<JsonRpcError>,
290    #[serde(skip, default)]
291    pub encoding: Encoding,
292    #[serde(skip, default)]
293    pub jexception: Option<JrpcException>,
294}
295
296#[allow(clippy::must_use_candidate)]
297impl JsonRpcResponse {
298    /// # Errors
299    ///
300    /// Will return `Err` if failed to pack the response
301    pub fn pack(&self) -> Result<Vec<u8>, JsonRpcError> {
302        pack_data(self, self.encoding)
303    }
304
305    /// # Errors
306    ///
307    /// Will return `Err` if failed to unpack the response
308    pub fn unpack(src: &[u8], encoding: Encoding) -> Result<Self, JsonRpcError> {
309        let response: Self = unpack_data(src, encoding)?;
310        if response.is_valid() {
311            Ok(response)
312        } else {
313            Err(JsonRpcError::invalid_request(ERR_INVALID_REQUEST))
314        }
315    }
316
317    #[must_use]
318    pub fn is_valid(&self) -> bool {
319        self.jsonrpc == "2.0"
320    }
321
322    pub fn is_ok(&self) -> bool {
323        self.error.is_none()
324    }
325
326    pub fn is_err(&self) -> bool {
327        self.error.is_some()
328    }
329    /// jexceptions special exception
330    ///
331    /// If jexceptions, batches stop processing
332    /// JrpcException is
333    pub fn jexception(&mut self, jexception: JrpcException) {
334        self.jexception = Some(jexception);
335    }
336    pub fn take_jexception(&mut self) -> Option<JrpcException> {
337        self.jexception.take()
338    }
339}
340
341#[derive(Serialize, Deserialize, Debug)]
342#[serde(deny_unknown_fields)]
343pub struct JsonRpcRequest {
344    jsonrpc: String,
345    id: Option<Value>,
346    pub method: String,
347    #[serde(default)]
348    pub params: Option<Value>,
349    #[serde(skip, default)]
350    encoding: Encoding,
351}
352
353#[allow(clippy::must_use_candidate)]
354impl<'a> JsonRpcRequest {
355    #[inline]
356    pub fn new(id: Option<Value>, method: &str, params: Option<Value>, encoding: Encoding) -> Self {
357        Self {
358            id,
359            jsonrpc: "2.0".to_owned(),
360            method: method.to_owned(),
361            params,
362            encoding,
363        }
364    }
365    #[inline]
366    pub fn set_params(&mut self, value: Value) {
367        self.params = Some(value);
368    }
369    #[inline]
370    pub fn response_required(&self) -> bool {
371        self.id.is_some()
372    }
373    #[inline]
374    pub fn respond(&self, result: Value) -> Option<JsonRpcResponse> {
375        self.id.as_ref().map(|id| JsonRpcResponse {
376            jsonrpc: self.jsonrpc.clone(),
377            id: id.clone(),
378            result: Some(result),
379            error: None,
380            encoding: self.encoding,
381            jexception: None,
382        })
383    }
384
385    /// # Errors
386    ///
387    /// Will return `Err` if failed to unpack the request
388    #[inline]
389    pub fn unpack(src: &[u8], encoding: Encoding) -> Result<Self, JsonRpcError> {
390        let request: JsonRpcRequest = unpack_data(src, encoding)?;
391        if request.is_valid() {
392            Ok(request)
393        } else {
394            Err(JsonRpcError::invalid_request(ERR_INVALID_REQUEST))
395        }
396    }
397
398    /// # Errors
399    ///
400    /// Will return `Err` if failed to pack the request
401    #[inline]
402    pub fn pack(&self) -> Result<Vec<u8>, JsonRpcError> {
403        pack_data(self, self.encoding)
404    }
405
406    #[inline]
407    pub fn respond_ok(&self) -> Option<JsonRpcResponse> {
408        let mut map = BTreeMap::new();
409        map.insert(Value::String("ok".to_owned()), Value::Bool(true));
410        self.respond(Value::Map(map))
411    }
412
413    #[inline]
414    pub fn is_valid(&self) -> bool {
415        self.jsonrpc == "2.0"
416    }
417
418    #[inline]
419    pub fn error(&self, err: JsonRpcError) -> Option<JsonRpcResponse> {
420        self.id.as_ref().map(|id| JsonRpcResponse {
421            jsonrpc: self.jsonrpc.clone(),
422            id: id.clone(),
423            result: None,
424            error: Some(err),
425            encoding: self.encoding,
426            jexception: None,
427        })
428    }
429
430    #[inline]
431    pub fn jexception(&self, jexception: JrpcException) -> Option<JsonRpcResponse> {
432        self.id.as_ref().map(|id| JsonRpcResponse {
433            jsonrpc: self.jsonrpc.clone(),
434            id: id.clone(),
435            result: None,
436            error: None,
437            encoding: self.encoding,
438            jexception: Some(jexception),
439        })
440    }
441
442    #[inline]
443    pub fn method_not_found(&self) -> Option<JsonRpcResponse> {
444        self.error(JsonRpcError::method_not_found(&self.method))
445    }
446
447    #[inline]
448    pub fn invalid_params(&'a self, message: &'a str) -> Option<JsonRpcResponse> {
449        self.error(JsonRpcError::invalid_params(message))
450    }
451
452    #[inline]
453    pub fn take_params(&'a mut self) -> Option<Value> {
454        self.params.take()
455    }
456}
457
458/*
459 * Universal binary proto header (4 bytes)
460 * Byte 0: data encoding (1 - Json, 2 - MsgPack), u8
461 * Byte 1-3: frame length, u32
462 */
463#[derive(Debug)]
464pub struct JsonRpcBatch {
465    pub single: bool,
466    requests: Vec<JsonRpcRequest>,
467    pub responses: Vec<JsonRpcResponse>,
468    pub encoding: Encoding,
469    id: u32,
470    meta: JsonRpcRequestMeta,
471    jexception: Option<JrpcException>,
472}
473
474impl JsonRpcBatch {
475    #[must_use]
476    pub fn new(meta: JsonRpcRequestMeta) -> Self {
477        JsonRpcBatch {
478            single: true,
479            requests: Vec::new(),
480            responses: Vec::new(),
481            encoding: Encoding::Json,
482            id: 0,
483            meta,
484            jexception: None,
485        }
486    }
487
488    pub fn add(&mut self, req: JsonRpcRequest) {
489        if !self.requests.is_empty() {
490            self.single = false;
491        }
492        self.requests.push(req);
493    }
494
495    pub fn prepare(&mut self, method: &str, params: Option<Value>) -> u32 {
496        self.id += 1;
497        let req = JsonRpcRequest::new(Some(Value::U32(self.id)), method, params, self.encoding);
498        self.add(req);
499        self.id
500    }
501
502    pub fn prepare_muted(&mut self, method: &str, params: Option<Value>) -> u32 {
503        let req = JsonRpcRequest::new(None, method, params, self.encoding);
504        self.add(req);
505        self.id
506    }
507
508    pub async fn process<F, Fut>(&mut self, mut processor: F)
509    where
510        F: FnMut(JsonRpcRequest, JsonRpcRequestMeta) -> Fut,
511        Fut: std::future::Future<Output = Option<JsonRpcResponse>>,
512    {
513        while let Some(req) = self.requests.pop() {
514            if let Some(mut v) = processor(req, self.meta.clone()).await {
515                if let Some(jexception) = v.take_jexception() {
516                    self.jexception = Some(jexception);
517                    break;
518                }
519                self.responses.push(v);
520            }
521        }
522    }
523
524    pub fn take_jexception(&mut self) -> Option<JrpcException> {
525        self.jexception.take()
526    }
527
528    /// # Errors
529    ///
530    /// Will return `Err` if failed to serialize request(s)
531    pub fn pack_requests(
532        &self,
533        encoding: Option<Encoding>,
534    ) -> Result<Option<Vec<u8>>, JsonRpcError> {
535        let enc = encoding.unwrap_or(self.encoding);
536        Ok(if self.requests.is_empty() {
537            None
538        } else if self.single {
539            if let Some(v) = self.requests.first() {
540                Some(pack_data(v, enc)?)
541            } else {
542                None
543            }
544        } else {
545            Some(pack_data(&self.requests, enc)?)
546        })
547    }
548
549    /// # Errors
550    ///
551    /// Will return `Err` if failed to serialize response(s)
552    pub fn pack_responces(
553        &self,
554        encoding: Option<Encoding>,
555    ) -> Result<Option<Vec<u8>>, JsonRpcError> {
556        let enc = encoding.unwrap_or(self.encoding);
557        Ok(if self.responses.is_empty() {
558            None
559        } else if self.single {
560            if let Some(v) = self.responses.first() {
561                Some(pack_data(&v, enc)?)
562            } else {
563                None
564            }
565        } else {
566            Some(pack_data(&self.responses, enc)?)
567        })
568    }
569
570    /// # Errors
571    ///
572    /// Will return `Err` if failed to deserialize requests(s)
573    pub fn unpack_requests(
574        &mut self,
575        src: &[u8],
576        encoding: Option<Encoding>,
577    ) -> Result<(), JsonRpcError> {
578        let enc = encoding.unwrap_or(self.encoding);
579        match enc {
580            Encoding::Json => {
581                if let Ok(v) = serde_json::from_slice(src) {
582                    self.requests = v;
583                    self.single = false;
584                } else {
585                    let request: JsonRpcRequest = serde_json::from_slice(src)?;
586                    self.requests.clear();
587                    self.requests.push(request);
588                    self.single = true;
589                }
590            }
591            Encoding::MsgPack => {
592                if let Ok(v) = rmp_serde::from_slice(src) {
593                    self.requests = v;
594                    self.single = false;
595                } else {
596                    let request: JsonRpcRequest = rmp_serde::from_slice(src)?;
597                    self.requests.clear();
598                    self.requests.push(request);
599                    self.single = true;
600                }
601            }
602        }
603        for r in &self.requests {
604            if !r.is_valid() {
605                return Err(JsonRpcError::invalid_request(ERR_INVALID_REQUEST));
606            }
607        }
608        Ok(())
609    }
610
611    /// # Errors
612    ///
613    /// Will return `Err` if failed to deserialize response(s)
614    pub fn unpack_responses(
615        &mut self,
616        src: &[u8],
617        encoding: Option<Encoding>,
618    ) -> Result<(), JsonRpcError> {
619        if src.is_empty() {
620            return Ok(());
621        }
622        let enc = encoding.unwrap_or(self.encoding);
623        match enc {
624            Encoding::Json => {
625                if let Ok(v) = serde_json::from_slice(src) {
626                    self.responses = v;
627                    self.single = false;
628                } else {
629                    let response: JsonRpcResponse = serde_json::from_slice(src)?;
630                    self.responses.clear();
631                    self.responses.push(response);
632                    self.single = true;
633                }
634            }
635            Encoding::MsgPack => {
636                if let Ok(v) = rmp_serde::from_slice(src) {
637                    self.responses = v;
638                    self.single = false;
639                } else {
640                    let response: JsonRpcResponse = rmp_serde::from_slice(src)?;
641                    self.responses.clear();
642                    self.responses.push(response);
643                    self.single = true;
644                }
645            }
646        }
647        Ok(())
648    }
649}
650
651#[cfg(feature = "http")]
652#[path = "http.rs"]
653pub mod http;
654
655macro_rules! parse_request_header {
656    ($stream:expr, $buf:expr, $encoding:expr, $len:expr) => {
657        match $stream.read_exact(&mut $buf).await {
658            Ok(_) => {
659                $encoding = match $buf[0].try_into() {
660                    Ok(v) => v,
661                    Err(e) => {
662                        debug!("{}", e);
663                        break;
664                    }
665                };
666                $len = u32::from_le_bytes([$buf[1], $buf[2], $buf[3], $buf[4]]);
667            }
668            Err(ref e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
669                break;
670            }
671            Err(e) => {
672                debug!("API read error {}", e);
673                break;
674            }
675        }
676    };
677}
678
679pub async fn jrpc_stream_worker<T, F, Fut>(
680    stream: &mut T,
681    meta: JsonRpcRequestMeta,
682    mut processor: F,
683) where
684    F: FnMut(JsonRpcRequest, JsonRpcRequestMeta) -> Fut,
685    Fut: std::future::Future<Output = Option<JsonRpcResponse>>,
686    T: AsyncReadExt + AsyncWriteExt + Unpin,
687{
688    loop {
689        let mut header = [0_u8; 5];
690        let encoding: Encoding;
691        let frame_len: u32;
692        parse_request_header!(stream, header, encoding, frame_len);
693        let mut buf: Vec<u8> = vec![0; frame_len as usize];
694        match stream.read_exact(&mut buf).await {
695            Ok(_) => {
696                let mut batch = JsonRpcBatch::new(meta.clone());
697                batch.encoding = encoding;
698                if let Err(e) = batch.unpack_requests(&buf, None) {
699                    debug!("JSON RPC payload unpack error {}", e);
700                    break;
701                }
702                #[allow(clippy::redundant_closure)]
703                batch.process(|x, m| processor(x, m)).await;
704                if let Some(_jexception) = batch.take_jexception() {
705                    // can't send jexception to socket stream
706                    // just closing it
707                    break;
708                }
709                let response_buf = match batch.pack_responces(None) {
710                    Ok(v) => v,
711                    Err(e) => {
712                        debug!("JSON RPC response serialize error {}", e);
713                        break;
714                    }
715                };
716                if let Some(resp) = response_buf {
717                    let mut response = vec![header[0]];
718                    #[allow(clippy::cast_possible_truncation)]
719                    response.extend((resp.len() as u32).to_le_bytes());
720                    response.extend(resp);
721                    if let Err(e) = stream.write_all(&response).await {
722                        debug!("JSON RPC API response write error {}", e);
723                        break;
724                    }
725                }
726            }
727            Err(e) => {
728                debug!("Socket error {}", e);
729                break;
730            }
731        }
732    }
733}
734
735impl From<std::net::SocketAddr> for JsonRpcRequestMeta {
736    fn from(addr: std::net::SocketAddr) -> Self {
737        JsonRpcRequestMeta::socket(addr.ip())
738    }
739}
740
741#[cfg(not(target_os = "windows"))]
742impl From<tokio::net::unix::SocketAddr> for JsonRpcRequestMeta {
743    fn from(_addr: tokio::net::unix::SocketAddr) -> Self {
744        JsonRpcRequestMeta::unix_socket()
745    }
746}
747
748#[macro_export]
749macro_rules! jrpc_stream_server {
750    ($listener: expr, $processor: expr) => {
751        loop {
752            match $listener.accept().await {
753                Ok((mut stream, addr)) => {
754                    debug!("JSON RPC server new connection: {:?}", addr);
755                    let meta = addr.into();
756                    tokio::spawn(async move {
757                        jrpc_stream_worker(&mut stream, meta, processor).await;
758                    });
759                }
760                Err(e) => {
761                    debug!("JSON RPC server errror: {}", e);
762                }
763            }
764        }
765    };
766}