use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

use async_timer_rs::Timer;
use futures::channel::mpsc::Receiver;
use librpc::dispatcher::Dispatcher;
use serde::{Deserialize, Serialize};

use crate::{
    object::{Request, Version},
    result::{RPCError, RPCResult},
};
15
16#[derive(Debug, Clone)]
18pub struct Client {
19 id_gen: Arc<AtomicU64>,
20 dispatcher: Dispatcher<Vec<u8>, Vec<u8>, RPCError>,
21}
22
23pub type Responder = librpc::responder::Responder<Vec<u8>, RPCError>;
24pub type Output = Receiver<(Option<u64>, Vec<u8>)>;
25
26impl Client {
27 pub fn new(cache_size: usize) -> (Self, Output, Responder) {
29 let (dispatcher, receiver) = Dispatcher::new(cache_size);
30
31 let responder = dispatcher.responder.clone();
32
33 (
34 Client {
35 id_gen: Default::default(),
36 dispatcher,
37 },
38 receiver,
39 responder,
40 )
41 }
42
43 pub async fn call<P, R, T>(
45 &mut self,
46 method: &str,
47 params: P,
48 timeout: Option<T>,
49 ) -> RPCResult<R>
50 where
51 P: Serialize,
52 for<'b> R: Deserialize<'b> + Send + 'static,
53 T: Timer + Unpin,
54 {
55 let id = self.id_gen.fetch_add(1, Ordering::SeqCst);
56
57 let request = Request {
58 id: Some(id),
59 method,
60 params,
61 jsonrpc: Version::default(),
62 };
63
64 let data = serde_json::to_vec(&request).expect("Inner error, assembly json request");
65
66 let result = self.dispatcher.call(id, data, timeout).await?.await?;
67
68 Ok(serde_json::from_slice(&result)?)
69 }
70
71 pub async fn notification<P>(&mut self, method: &str, params: P) -> RPCResult<()>
73 where
74 P: Serialize,
75 {
76 let request = Request {
77 method,
78 params,
79 id: None,
80 jsonrpc: Version::default(),
81 };
82
83 let data = serde_json::to_vec(&request)?;
84
85 self.dispatcher.notification(data).await?;
86
87 Ok(())
88 }
89}