librpc_json/
client.rs

1use std::sync::{
2    atomic::{AtomicU64, Ordering},
3    Arc,
4};
5
6use async_timer_rs::Timer;
7use futures::channel::mpsc::Receiver;
8use librpc::dispatcher::Dispatcher;
9use serde::{Deserialize, Serialize};
10
11use crate::{
12    object::{Request, Version},
13    result::{RPCError, RPCResult},
14};
15
16/// JSONRPC V2.0 client
17#[derive(Debug, Clone)]
18pub struct Client {
19    id_gen: Arc<AtomicU64>,
20    dispatcher: Dispatcher<Vec<u8>, Vec<u8>, RPCError>,
21}
22
23pub type Responder = librpc::responder::Responder<Vec<u8>, RPCError>;
24pub type Output = Receiver<(Option<u64>, Vec<u8>)>;
25
26impl Client {
27    /// Create new JSONRPC client instance with sending cache quene length.
28    pub fn new(cache_size: usize) -> (Self, Output, Responder) {
29        let (dispatcher, receiver) = Dispatcher::new(cache_size);
30
31        let responder = dispatcher.responder.clone();
32
33        (
34            Client {
35                id_gen: Default::default(),
36                dispatcher,
37            },
38            receiver,
39            responder,
40        )
41    }
42
43    /// Asynchronous send a JSONRPC v2.0 request and wait response
44    pub async fn call<P, R, T>(
45        &mut self,
46        method: &str,
47        params: P,
48        timeout: Option<T>,
49    ) -> RPCResult<R>
50    where
51        P: Serialize,
52        for<'b> R: Deserialize<'b> + Send + 'static,
53        T: Timer + Unpin,
54    {
55        let id = self.id_gen.fetch_add(1, Ordering::SeqCst);
56
57        let request = Request {
58            id: Some(id),
59            method,
60            params,
61            jsonrpc: Version::default(),
62        };
63
64        let data = serde_json::to_vec(&request).expect("Inner error, assembly json request");
65
66        let result = self.dispatcher.call(id, data, timeout).await?.await?;
67
68        Ok(serde_json::from_slice(&result)?)
69    }
70
71    /// Asynchronous send a JSONRPC v2.0 notification
72    pub async fn notification<P>(&mut self, method: &str, params: P) -> RPCResult<()>
73    where
74        P: Serialize,
75    {
76        let request = Request {
77            method,
78            params,
79            id: None,
80            jsonrpc: Version::default(),
81        };
82
83        let data = serde_json::to_vec(&request)?;
84
85        self.dispatcher.notification(data).await?;
86
87        Ok(())
88    }
89}