jsonrpc_rs/
client.rs

1mod recv;
2use std::sync::atomic::{AtomicUsize, Ordering};
3
4use async_timer_rs::{hashed::Timeout, Timer};
5use completeq_rs::{error::CompleteQError, oneshot::EventReceiver};
6use futures::{
7    channel::mpsc::{self, Sender},
8    SinkExt,
9};
10use recv::*;
11mod send;
12use send::*;
13mod user_event;
14use serde::{Deserialize, Serialize};
15use user_event::*;
16
17use crate::{
18    channel::{RPCData, TransportChannel},
19    map_error, RPCResult, Request,
20};
21
22#[derive(Clone)]
23pub struct Client {
24    output_sender: Sender<RPCData>,
25    completed_q: RPCCompletedQ,
26}
27
28impl Client {
29    pub fn new<C, S>(tag: S, channel: C) -> Self
30    where
31        C: TransportChannel,
32        S: AsRef<str>,
33    {
34        static ID: AtomicUsize = AtomicUsize::new(1);
35
36        let client_id = format!("{}_{}", tag.as_ref(), ID.fetch_add(1, Ordering::SeqCst));
37
38        let (output_sender, output_receiver) = mpsc::channel(100);
39
40        let completed_q = RPCCompletedQ::new();
41
42        let (input, output) = channel.framed();
43
44        C::spawn(send_loop::<C, String>(
45            client_id.clone(),
46            output,
47            output_receiver,
48            completed_q.clone(),
49        ));
50
51        C::spawn(recv_loop::<C, String>(
52            client_id,
53            input,
54            completed_q.clone(),
55        ));
56
57        Self {
58            output_sender,
59            completed_q,
60        }
61    }
62
63    pub async fn send<P>(&mut self, method: &str, params: P) -> RPCResult<Responser<Timeout>>
64    where
65        P: Serialize,
66    {
67        let receiver = self.completed_q.wait_one();
68
69        let request = Request {
70            id: Some(receiver.event_id()),
71            method,
72            params,
73            jsonrpc: crate::Version::default(),
74        };
75
76        let data = serde_json::to_vec(&request).expect("Inner error, assembly json request");
77
78        self.output_sender
79            .send(data.into())
80            .await
81            .map_err(map_error)?;
82
83        Ok(Responser {
84            receiver: Some(receiver),
85        })
86    }
87
88    pub async fn call<P, R>(&mut self, method: &str, params: P) -> RPCResult<R>
89    where
90        P: Serialize,
91        for<'b> R: Deserialize<'b> + Send + 'static,
92    {
93        self.send(method, params).await?.recv().await
94    }
95
96    pub async fn send_with_timer<P, T>(
97        &mut self,
98        method: &str,
99        params: P,
100        timer: T,
101    ) -> RPCResult<Responser<T>>
102    where
103        P: Serialize,
104        T: Timer + Unpin + 'static,
105    {
106        let receiver = self.completed_q.wait_one_with_timer(timer);
107
108        let request = Request {
109            id: Some(receiver.event_id()),
110            method,
111            params,
112            jsonrpc: crate::Version::default(),
113        };
114
115        let data = serde_json::to_vec(&request).expect("Inner error, assembly json request");
116
117        self.output_sender
118            .send(data.into())
119            .await
120            .map_err(map_error)?;
121
122        Ok(Responser {
123            receiver: Some(receiver),
124        })
125    }
126
127    pub async fn call_with_timer<P, T, R>(
128        &mut self,
129        method: &str,
130        params: P,
131        timer: T,
132    ) -> RPCResult<R>
133    where
134        T: Timer + Unpin + 'static,
135        P: Serialize,
136        for<'b> R: Deserialize<'b> + Send + 'static,
137    {
138        self.send_with_timer(method, params, timer)
139            .await?
140            .recv()
141            .await
142    }
143
144    pub async fn notification<P>(&mut self, method: &str, params: P) -> RPCResult<()>
145    where
146        P: Serialize,
147    {
148        let request = Request {
149            method,
150            params,
151            id: None,
152            jsonrpc: crate::Version::default(),
153        };
154
155        let data = serde_json::to_vec(&request)?;
156
157        self.output_sender
158            .send(data.into())
159            .await
160            .map_err(map_error)?;
161
162        Ok(())
163    }
164}
165
166pub struct Responser<T: Timer> {
167    receiver: Option<EventReceiver<RPCEvent, T>>,
168}
169
170impl<T: Timer> Responser<T>
171where
172    T: Unpin,
173{
174    pub async fn recv<R>(&mut self) -> RPCResult<R>
175    where
176        for<'b> R: Deserialize<'b> + Send + 'static,
177    {
178        let value = self
179            .receiver
180            .take()
181            .unwrap()
182            .await
183            .success()
184            .map_err(map_error)?
185            .ok_or(CompleteQError::PipeBroken)??;
186
187        serde_json::from_value(value.clone()).map_err(map_error)
188    }
189}