1use eva_common::value::Value;
2use serde::{Deserialize, Serialize};
3use std::collections::BTreeMap;
4use std::convert::{TryFrom, TryInto};
5use std::fmt;
6use std::net::IpAddr;
7
8use tokio::io::{AsyncReadExt, AsyncWriteExt};
9
10use log::debug;
11
/// Message attached to the error returned when an encoding byte does not map
/// to any [`Encoding`] variant.
const ERR_UNSUPPORTED_ENCODING: &str = "Unsupported data encoding";
/// Message attached to the error returned when an unpacked payload fails the
/// `is_valid()` check (its `jsonrpc` field is not `"2.0"`).
const ERR_INVALID_REQUEST: &str = "Invalid request";
14
// Public `prelude` submodule; its source file is named explicitly via `#[path]`.
#[path = "prelude.rs"]
pub mod prelude;
17
18use eva_common::ERR_CODE_INTERNAL_RPC;
19use eva_common::ERR_CODE_INVALID_PARAMS;
20use eva_common::ERR_CODE_INVALID_REQUEST;
21use eva_common::ERR_CODE_METHOD_NOT_FOUND;
22
/// JSON RPC error object: a numeric code plus an optional message.
///
/// When `message` is `None`, the field is omitted from the serialized output.
#[derive(Debug, Serialize, Deserialize)]
pub struct JsonRpcError {
    /// Numeric error code.
    pub code: i16,
    /// Optional error text; skipped during serialization when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
}
29
30impl fmt::Display for JsonRpcError {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 if let Some(msg) = self.message.as_ref() {
33 write!(f, "{}", msg)
34 } else {
35 write!(f, "JSON RPC Error")
36 }
37 }
38}
39
40impl From<eva_common::Error> for JsonRpcError {
41 fn from(e: eva_common::Error) -> Self {
42 Self {
43 code: e.kind() as i16,
44 message: e.message().map(ToOwned::to_owned),
45 }
46 }
47}
48
49impl JsonRpcError {
50 pub fn new<T: fmt::Display>(code: i16, message: Option<T>) -> Self {
51 Self {
52 code,
53 message: message.map(|v| v.to_string()),
54 }
55 }
56 pub fn invalid_request<T: fmt::Display>(message: T) -> Self {
57 Self::new(ERR_CODE_INVALID_REQUEST, Some(message))
58 }
59 pub fn method_not_found<T: fmt::Display>(message: T) -> Self {
60 Self::new(ERR_CODE_METHOD_NOT_FOUND, Some(message))
61 }
62 pub fn invalid_params<T: fmt::Display>(message: T) -> Self {
63 Self::new(ERR_CODE_INVALID_PARAMS, Some(message))
64 }
65 pub fn internal_rpc<T: fmt::Display>(message: T) -> Self {
66 Self::new(ERR_CODE_INTERNAL_RPC, Some(message))
67 }
68}
69
/// Pair of strings carried as request credentials.
// NOTE(review): presumably (login, password) — confirm against callers.
pub type Credentials = (String, String);

/// Transport-level metadata that travels alongside a JSON RPC request:
/// where it came from plus optional credentials, key and external token.
#[derive(Debug, Clone)]
pub struct JsonRpcRequestMeta {
    source: RequestSource,
    credentials: Option<Credentials>,
    external_token: Option<String>,
    key: Option<String>,
}

/// Origin of a request, with the data each transport provides.
#[derive(Debug, Clone)]
pub enum RequestSource {
    UnixSocket,
    Socket(IpAddr),
    Http(IpAddr, String),
    Internal(String),
    Mqtt(String),
    Custom(String),
}

#[allow(clippy::must_use_candidate)]
impl JsonRpcRequestMeta {
    /// Common constructor: sets the source and leaves every optional field empty.
    fn with_source(source: RequestSource) -> Self {
        Self {
            source,
            credentials: None,
            external_token: None,
            key: None,
        }
    }

    /// Metadata for a request received over a UNIX socket.
    #[cfg(not(target_os = "windows"))]
    pub fn unix_socket() -> Self {
        Self::with_source(RequestSource::UnixSocket)
    }
    /// Metadata for a request received over a socket from the peer address `ip`.
    pub fn socket(ip: IpAddr) -> Self {
        Self::with_source(RequestSource::Socket(ip))
    }
    /// Metadata for an HTTP request: peer address, agent string and the
    /// optional credentials / key supplied with it.
    pub fn http(
        ip: IpAddr,
        agent: String,
        credentials: Option<Credentials>,
        key: Option<String>,
    ) -> Self {
        let mut meta = Self::with_source(RequestSource::Http(ip, agent));
        meta.credentials = credentials;
        meta.key = key;
        meta
    }
    /// Metadata for an internal request identified by `id`.
    pub fn internal(id: String) -> Self {
        Self::with_source(RequestSource::Internal(id))
    }
    /// Metadata for an MQTT request identified by `id`.
    pub fn mqtt(id: String) -> Self {
        Self::with_source(RequestSource::Mqtt(id))
    }
    /// Metadata for a custom source identified by `id`.
    pub fn custom(id: String) -> Self {
        Self::with_source(RequestSource::Custom(id))
    }

    /// Peer IP address; available for `Socket` and `Http` sources only.
    pub fn ip(&self) -> Option<IpAddr> {
        match &self.source {
            RequestSource::Socket(addr) | RequestSource::Http(addr, _) => Some(*addr),
            RequestSource::UnixSocket
            | RequestSource::Internal(_)
            | RequestSource::Mqtt(_)
            | RequestSource::Custom(_) => None,
        }
    }
    /// Credentials attached to the request, if any.
    pub fn credentials(&self) -> Option<&Credentials> {
        self.credentials.as_ref()
    }
    /// Agent string; available for `Http` sources only.
    pub fn agent(&self) -> Option<&String> {
        if let RequestSource::Http(_, agent) = &self.source {
            Some(agent)
        } else {
            None
        }
    }
    /// Borrowed view of the key, if one is set.
    #[inline]
    pub fn key(&self) -> Option<&str> {
        self.key.as_deref()
    }
    /// Moves the key out of the metadata, leaving `None` behind.
    #[inline]
    pub fn take_key(&mut self) -> Option<String> {
        self.key.take()
    }
    /// Stores an external token, replacing any previous one.
    #[inline]
    pub fn set_external_token(&mut self, token: String) {
        self.external_token = Some(token);
    }
    /// Borrowed view of the external token, if one is set.
    #[inline]
    pub fn external_token(&self) -> Option<&str> {
        self.external_token.as_deref()
    }
}
179
/// Numeric identifier of the JSON encoding (discriminant of [`Encoding::Json`]).
const ENCODING_JSON: u8 = 1;
/// Numeric identifier of the MessagePack encoding (discriminant of [`Encoding::MsgPack`]).
const ENCODING_MSGPACK: u8 = 2;

/// Payload encoding of a JSON RPC message.
///
/// The `u8` representation is the byte the stream protocol puts in position 0
/// of each frame header. `Json` is the default.
#[derive(PartialEq, Debug, Copy, Clone, Default)]
#[repr(u8)]
pub enum Encoding {
    #[default]
    Json = ENCODING_JSON,
    MsgPack = ENCODING_MSGPACK,
}
190
191impl TryFrom<u8> for Encoding {
192 type Error = JsonRpcError;
193 fn try_from(value: u8) -> Result<Self, Self::Error> {
194 match value {
195 x if x == Encoding::Json as u8 => Ok(Encoding::Json),
196 x if x == Encoding::MsgPack as u8 => Ok(Encoding::MsgPack),
197 _ => Err(JsonRpcError::invalid_request(ERR_UNSUPPORTED_ENCODING)),
198 }
199 }
200}
201
/// Returns `None` from the enclosing function when `$req.response_required()`
/// is false, i.e. when the request does not expect a response.
///
/// Expands to a `return None;`, so it can only be used inside a function whose
/// return type accepts `None`.
#[macro_export]
macro_rules! jrpc_q {
    ($req: expr) => {
        if !$req.response_required() {
            return None;
        };
    };
}
210
// Generates `From<$e> for JsonRpcError`, wrapping the source error (via its
// `Display` output) into an `internal_rpc` error.
macro_rules! impl_err_data {
    ($e: path) => {
        impl From<$e> for JsonRpcError {
            fn from(e: $e) -> JsonRpcError {
                JsonRpcError::internal_rpc(e)
            }
        }
    };
}

// Serializer / deserializer errors that `?` may convert into `JsonRpcError`.
impl_err_data!(rmp_serde::decode::Error);
impl_err_data!(rmp_serde::encode::Error);
impl_err_data!(serde_json::Error);
224
225fn unpack_data<'de, V: Deserialize<'de>>(src: &'de [u8], enc: Encoding) -> Result<V, JsonRpcError> {
226 Ok(match enc {
227 Encoding::Json => serde_json::from_slice(src)?,
228 Encoding::MsgPack => rmp_serde::from_slice(src)?,
229 })
230}
231
232fn pack_data<V: Serialize>(data: &V, enc: Encoding) -> Result<Vec<u8>, JsonRpcError> {
233 Ok(match enc {
234 Encoding::Json => serde_json::to_vec(data)?,
235 Encoding::MsgPack => rmp_serde::to_vec_named(data)?,
236 })
237}
238
239impl fmt::Display for Encoding {
240 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241 write!(
242 f,
243 "{}",
244 match self {
245 Encoding::Json => "application/json",
246 Encoding::MsgPack => "application/msgpack",
247 }
248 )
249 }
250}
251
/// Out-of-band result a request handler can attach to a response: a numeric
/// code, a list of header pairs and a text body.
// NOTE(review): looks like an HTTP-style status/headers/body triple — confirm
// against the code that consumes it.
#[derive(Debug)]
pub struct JrpcException {
    code: u16,
    headers: Vec<(String, String)>,
    content: String,
}

impl JrpcException {
    /// Creates an exception with the given `code` and `content` and no headers.
    pub fn new(code: u16, content: String) -> Self {
        let headers = Vec::new();
        Self {
            code,
            headers,
            content,
        }
    }
    /// Appends a `(header, value)` pair; existing entries with the same name are kept.
    #[inline]
    pub fn set_header(&mut self, header: String, value: String) {
        let entry = (header, value);
        self.headers.push(entry);
    }
    /// Numeric code of the exception.
    #[inline]
    pub fn code(&self) -> u16 {
        self.code
    }
    /// Text body of the exception.
    #[inline]
    pub fn content(&self) -> &str {
        self.content.as_str()
    }
}
280
/// JSON RPC response message.
///
/// Unknown fields are rejected on deserialization. `encoding` and `jexception`
/// are never serialized or deserialized; after deserialization they hold their
/// `Default` values.
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct JsonRpcResponse {
    /// Protocol version string; `is_valid()` requires it to be `"2.0"`.
    jsonrpc: String,
    /// Identifier copied from the request this response answers.
    pub id: Value,
    /// Result payload; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<Value>,
    /// Error payload; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<JsonRpcError>,
    /// Encoding used by `pack()`; not part of the wire format.
    #[serde(skip, default)]
    pub encoding: Encoding,
    /// Optional out-of-band exception; not part of the wire format.
    #[serde(skip, default)]
    pub jexception: Option<JrpcException>,
}
295
296#[allow(clippy::must_use_candidate)]
297impl JsonRpcResponse {
298 pub fn pack(&self) -> Result<Vec<u8>, JsonRpcError> {
302 pack_data(self, self.encoding)
303 }
304
305 pub fn unpack(src: &[u8], encoding: Encoding) -> Result<Self, JsonRpcError> {
309 let response: Self = unpack_data(src, encoding)?;
310 if response.is_valid() {
311 Ok(response)
312 } else {
313 Err(JsonRpcError::invalid_request(ERR_INVALID_REQUEST))
314 }
315 }
316
317 #[must_use]
318 pub fn is_valid(&self) -> bool {
319 self.jsonrpc == "2.0"
320 }
321
322 pub fn is_ok(&self) -> bool {
323 self.error.is_none()
324 }
325
326 pub fn is_err(&self) -> bool {
327 self.error.is_some()
328 }
329 pub fn jexception(&mut self, jexception: JrpcException) {
334 self.jexception = Some(jexception);
335 }
336 pub fn take_jexception(&mut self) -> Option<JrpcException> {
337 self.jexception.take()
338 }
339}
340
/// JSON RPC request message.
///
/// Unknown fields are rejected on deserialization. `encoding` is never
/// serialized or deserialized; after deserialization it holds its `Default` value.
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct JsonRpcRequest {
    /// Protocol version string; `is_valid()` requires it to be `"2.0"`.
    jsonrpc: String,
    /// Request identifier; when `None`, no response is produced for the request.
    id: Option<Value>,
    /// Name of the method to call.
    pub method: String,
    /// Optional method parameters; defaults to `None` when absent from the payload.
    #[serde(default)]
    pub params: Option<Value>,
    /// Encoding used by `pack()` and copied into generated responses; not part
    /// of the wire format.
    #[serde(skip, default)]
    encoding: Encoding,
}
352
353#[allow(clippy::must_use_candidate)]
354impl<'a> JsonRpcRequest {
355 #[inline]
356 pub fn new(id: Option<Value>, method: &str, params: Option<Value>, encoding: Encoding) -> Self {
357 Self {
358 id,
359 jsonrpc: "2.0".to_owned(),
360 method: method.to_owned(),
361 params,
362 encoding,
363 }
364 }
365 #[inline]
366 pub fn set_params(&mut self, value: Value) {
367 self.params = Some(value);
368 }
369 #[inline]
370 pub fn response_required(&self) -> bool {
371 self.id.is_some()
372 }
373 #[inline]
374 pub fn respond(&self, result: Value) -> Option<JsonRpcResponse> {
375 self.id.as_ref().map(|id| JsonRpcResponse {
376 jsonrpc: self.jsonrpc.clone(),
377 id: id.clone(),
378 result: Some(result),
379 error: None,
380 encoding: self.encoding,
381 jexception: None,
382 })
383 }
384
385 #[inline]
389 pub fn unpack(src: &[u8], encoding: Encoding) -> Result<Self, JsonRpcError> {
390 let request: JsonRpcRequest = unpack_data(src, encoding)?;
391 if request.is_valid() {
392 Ok(request)
393 } else {
394 Err(JsonRpcError::invalid_request(ERR_INVALID_REQUEST))
395 }
396 }
397
398 #[inline]
402 pub fn pack(&self) -> Result<Vec<u8>, JsonRpcError> {
403 pack_data(self, self.encoding)
404 }
405
406 #[inline]
407 pub fn respond_ok(&self) -> Option<JsonRpcResponse> {
408 let mut map = BTreeMap::new();
409 map.insert(Value::String("ok".to_owned()), Value::Bool(true));
410 self.respond(Value::Map(map))
411 }
412
413 #[inline]
414 pub fn is_valid(&self) -> bool {
415 self.jsonrpc == "2.0"
416 }
417
418 #[inline]
419 pub fn error(&self, err: JsonRpcError) -> Option<JsonRpcResponse> {
420 self.id.as_ref().map(|id| JsonRpcResponse {
421 jsonrpc: self.jsonrpc.clone(),
422 id: id.clone(),
423 result: None,
424 error: Some(err),
425 encoding: self.encoding,
426 jexception: None,
427 })
428 }
429
430 #[inline]
431 pub fn jexception(&self, jexception: JrpcException) -> Option<JsonRpcResponse> {
432 self.id.as_ref().map(|id| JsonRpcResponse {
433 jsonrpc: self.jsonrpc.clone(),
434 id: id.clone(),
435 result: None,
436 error: None,
437 encoding: self.encoding,
438 jexception: Some(jexception),
439 })
440 }
441
442 #[inline]
443 pub fn method_not_found(&self) -> Option<JsonRpcResponse> {
444 self.error(JsonRpcError::method_not_found(&self.method))
445 }
446
447 #[inline]
448 pub fn invalid_params(&'a self, message: &'a str) -> Option<JsonRpcResponse> {
449 self.error(JsonRpcError::invalid_params(message))
450 }
451
452 #[inline]
453 pub fn take_params(&'a mut self) -> Option<Value> {
454 self.params.take()
455 }
456}
457
/// A set of JSON RPC requests together with the responses collected for them.
#[derive(Debug)]
pub struct JsonRpcBatch {
    /// `true` while the batch is packed as one bare message rather than an array.
    pub single: bool,
    /// Requests waiting to be packed or processed.
    requests: Vec<JsonRpcRequest>,
    /// Responses produced by `process` or loaded by `unpack_responses`.
    pub responses: Vec<JsonRpcResponse>,
    /// Default encoding for pack/unpack calls that do not specify one.
    pub encoding: Encoding,
    /// Counter used by `prepare` to assign request ids.
    id: u32,
    /// Metadata handed (cloned) to the processor for every request.
    meta: JsonRpcRequestMeta,
    /// Exception raised by a processed request, if any.
    jexception: Option<JrpcException>,
}
473
474impl JsonRpcBatch {
475 #[must_use]
476 pub fn new(meta: JsonRpcRequestMeta) -> Self {
477 JsonRpcBatch {
478 single: true,
479 requests: Vec::new(),
480 responses: Vec::new(),
481 encoding: Encoding::Json,
482 id: 0,
483 meta,
484 jexception: None,
485 }
486 }
487
488 pub fn add(&mut self, req: JsonRpcRequest) {
489 if !self.requests.is_empty() {
490 self.single = false;
491 }
492 self.requests.push(req);
493 }
494
495 pub fn prepare(&mut self, method: &str, params: Option<Value>) -> u32 {
496 self.id += 1;
497 let req = JsonRpcRequest::new(Some(Value::U32(self.id)), method, params, self.encoding);
498 self.add(req);
499 self.id
500 }
501
502 pub fn prepare_muted(&mut self, method: &str, params: Option<Value>) -> u32 {
503 let req = JsonRpcRequest::new(None, method, params, self.encoding);
504 self.add(req);
505 self.id
506 }
507
508 pub async fn process<F, Fut>(&mut self, mut processor: F)
509 where
510 F: FnMut(JsonRpcRequest, JsonRpcRequestMeta) -> Fut,
511 Fut: std::future::Future<Output = Option<JsonRpcResponse>>,
512 {
513 while let Some(req) = self.requests.pop() {
514 if let Some(mut v) = processor(req, self.meta.clone()).await {
515 if let Some(jexception) = v.take_jexception() {
516 self.jexception = Some(jexception);
517 break;
518 }
519 self.responses.push(v);
520 }
521 }
522 }
523
524 pub fn take_jexception(&mut self) -> Option<JrpcException> {
525 self.jexception.take()
526 }
527
528 pub fn pack_requests(
532 &self,
533 encoding: Option<Encoding>,
534 ) -> Result<Option<Vec<u8>>, JsonRpcError> {
535 let enc = encoding.unwrap_or(self.encoding);
536 Ok(if self.requests.is_empty() {
537 None
538 } else if self.single {
539 if let Some(v) = self.requests.first() {
540 Some(pack_data(v, enc)?)
541 } else {
542 None
543 }
544 } else {
545 Some(pack_data(&self.requests, enc)?)
546 })
547 }
548
549 pub fn pack_responces(
553 &self,
554 encoding: Option<Encoding>,
555 ) -> Result<Option<Vec<u8>>, JsonRpcError> {
556 let enc = encoding.unwrap_or(self.encoding);
557 Ok(if self.responses.is_empty() {
558 None
559 } else if self.single {
560 if let Some(v) = self.responses.first() {
561 Some(pack_data(&v, enc)?)
562 } else {
563 None
564 }
565 } else {
566 Some(pack_data(&self.responses, enc)?)
567 })
568 }
569
570 pub fn unpack_requests(
574 &mut self,
575 src: &[u8],
576 encoding: Option<Encoding>,
577 ) -> Result<(), JsonRpcError> {
578 let enc = encoding.unwrap_or(self.encoding);
579 match enc {
580 Encoding::Json => {
581 if let Ok(v) = serde_json::from_slice(src) {
582 self.requests = v;
583 self.single = false;
584 } else {
585 let request: JsonRpcRequest = serde_json::from_slice(src)?;
586 self.requests.clear();
587 self.requests.push(request);
588 self.single = true;
589 }
590 }
591 Encoding::MsgPack => {
592 if let Ok(v) = rmp_serde::from_slice(src) {
593 self.requests = v;
594 self.single = false;
595 } else {
596 let request: JsonRpcRequest = rmp_serde::from_slice(src)?;
597 self.requests.clear();
598 self.requests.push(request);
599 self.single = true;
600 }
601 }
602 }
603 for r in &self.requests {
604 if !r.is_valid() {
605 return Err(JsonRpcError::invalid_request(ERR_INVALID_REQUEST));
606 }
607 }
608 Ok(())
609 }
610
611 pub fn unpack_responses(
615 &mut self,
616 src: &[u8],
617 encoding: Option<Encoding>,
618 ) -> Result<(), JsonRpcError> {
619 if src.is_empty() {
620 return Ok(());
621 }
622 let enc = encoding.unwrap_or(self.encoding);
623 match enc {
624 Encoding::Json => {
625 if let Ok(v) = serde_json::from_slice(src) {
626 self.responses = v;
627 self.single = false;
628 } else {
629 let response: JsonRpcResponse = serde_json::from_slice(src)?;
630 self.responses.clear();
631 self.responses.push(response);
632 self.single = true;
633 }
634 }
635 Encoding::MsgPack => {
636 if let Ok(v) = rmp_serde::from_slice(src) {
637 self.responses = v;
638 self.single = false;
639 } else {
640 let response: JsonRpcResponse = rmp_serde::from_slice(src)?;
641 self.responses.clear();
642 self.responses.push(response);
643 self.single = true;
644 }
645 }
646 }
647 Ok(())
648 }
649}
650
// Optional `http` submodule, compiled only when the `http` feature is enabled;
// its source file is named explicitly via `#[path]`.
#[cfg(feature = "http")]
#[path = "http.rs"]
pub mod http;
654
// Reads one frame header from `$stream` into `$buf` and decodes it:
// byte 0 is the encoding identifier (stored into `$encoding`), bytes 1..=4 are
// the payload length as a little-endian u32 (stored into `$len`).
//
// Expands to `break` on any failure, so it must be used inside a loop:
// an unknown encoding byte and read errors are logged at debug level,
// while an unexpected EOF (peer closed the stream) ends the loop silently.
macro_rules! parse_request_header {
    ($stream:expr, $buf:expr, $encoding:expr, $len:expr) => {
        match $stream.read_exact(&mut $buf).await {
            Ok(_) => {
                $encoding = match $buf[0].try_into() {
                    Ok(v) => v,
                    Err(e) => {
                        debug!("{}", e);
                        break;
                    }
                };
                $len = u32::from_le_bytes([$buf[1], $buf[2], $buf[3], $buf[4]]);
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                break;
            }
            Err(e) => {
                debug!("API read error {}", e);
                break;
            }
        }
    };
}
678
679pub async fn jrpc_stream_worker<T, F, Fut>(
680 stream: &mut T,
681 meta: JsonRpcRequestMeta,
682 mut processor: F,
683) where
684 F: FnMut(JsonRpcRequest, JsonRpcRequestMeta) -> Fut,
685 Fut: std::future::Future<Output = Option<JsonRpcResponse>>,
686 T: AsyncReadExt + AsyncWriteExt + Unpin,
687{
688 loop {
689 let mut header = [0_u8; 5];
690 let encoding: Encoding;
691 let frame_len: u32;
692 parse_request_header!(stream, header, encoding, frame_len);
693 let mut buf: Vec<u8> = vec![0; frame_len as usize];
694 match stream.read_exact(&mut buf).await {
695 Ok(_) => {
696 let mut batch = JsonRpcBatch::new(meta.clone());
697 batch.encoding = encoding;
698 if let Err(e) = batch.unpack_requests(&buf, None) {
699 debug!("JSON RPC payload unpack error {}", e);
700 break;
701 }
702 #[allow(clippy::redundant_closure)]
703 batch.process(|x, m| processor(x, m)).await;
704 if let Some(_jexception) = batch.take_jexception() {
705 break;
708 }
709 let response_buf = match batch.pack_responces(None) {
710 Ok(v) => v,
711 Err(e) => {
712 debug!("JSON RPC response serialize error {}", e);
713 break;
714 }
715 };
716 if let Some(resp) = response_buf {
717 let mut response = vec![header[0]];
718 #[allow(clippy::cast_possible_truncation)]
719 response.extend((resp.len() as u32).to_le_bytes());
720 response.extend(resp);
721 if let Err(e) = stream.write_all(&response).await {
722 debug!("JSON RPC API response write error {}", e);
723 break;
724 }
725 }
726 }
727 Err(e) => {
728 debug!("Socket error {}", e);
729 break;
730 }
731 }
732 }
733}
734
735impl From<std::net::SocketAddr> for JsonRpcRequestMeta {
736 fn from(addr: std::net::SocketAddr) -> Self {
737 JsonRpcRequestMeta::socket(addr.ip())
738 }
739}
740
741#[cfg(not(target_os = "windows"))]
742impl From<tokio::net::unix::SocketAddr> for JsonRpcRequestMeta {
743 fn from(_addr: tokio::net::unix::SocketAddr) -> Self {
744 JsonRpcRequestMeta::unix_socket()
745 }
746}
747
/// Accept loop for a JSON RPC stream server.
///
/// For every connection accepted from `$listener`, spawns a tokio task that
/// runs `jrpc_stream_worker` with `$processor` and metadata derived from the
/// peer address. Accept errors are logged at debug level and the loop goes on;
/// the expansion never terminates on its own.
///
/// `$processor` is evaluated inside the spawned `async move` block once per
/// connection, so it must be usable repeatedly (e.g. a function path or a
/// `Copy` closure).
///
/// The call site must have `debug!`, `jrpc_stream_worker` and `tokio` in scope.
#[macro_export]
macro_rules! jrpc_stream_server {
    ($listener: expr, $processor: expr) => {
        loop {
            match $listener.accept().await {
                Ok((mut stream, addr)) => {
                    debug!("JSON RPC server new connection: {:?}", addr);
                    let meta = addr.into();
                    tokio::spawn(async move {
                        // Use the macro argument: a bare `processor` identifier
                        // would ignore whatever the caller passed in.
                        jrpc_stream_worker(&mut stream, meta, $processor).await;
                    });
                }
                Err(e) => {
                    debug!("JSON RPC server error: {}", e);
                }
            }
        }
    };
}