1mod recv;
2use std::sync::atomic::{AtomicUsize, Ordering};
3
4use async_timer_rs::{hashed::Timeout, Timer};
5use completeq_rs::{error::CompleteQError, oneshot::EventReceiver};
6use futures::{
7 channel::mpsc::{self, Sender},
8 SinkExt,
9};
10use recv::*;
11mod send;
12use send::*;
13mod user_event;
14use serde::{Deserialize, Serialize};
15use user_event::*;
16
17use crate::{
18 channel::{RPCData, TransportChannel},
19 map_error, RPCResult, Request,
20};
21
22#[derive(Clone)]
23pub struct Client {
24 output_sender: Sender<RPCData>,
25 completed_q: RPCCompletedQ,
26}
27
28impl Client {
29 pub fn new<C, S>(tag: S, channel: C) -> Self
30 where
31 C: TransportChannel,
32 S: AsRef<str>,
33 {
34 static ID: AtomicUsize = AtomicUsize::new(1);
35
36 let client_id = format!("{}_{}", tag.as_ref(), ID.fetch_add(1, Ordering::SeqCst));
37
38 let (output_sender, output_receiver) = mpsc::channel(100);
39
40 let completed_q = RPCCompletedQ::new();
41
42 let (input, output) = channel.framed();
43
44 C::spawn(send_loop::<C, String>(
45 client_id.clone(),
46 output,
47 output_receiver,
48 completed_q.clone(),
49 ));
50
51 C::spawn(recv_loop::<C, String>(
52 client_id,
53 input,
54 completed_q.clone(),
55 ));
56
57 Self {
58 output_sender,
59 completed_q,
60 }
61 }
62
63 pub async fn send<P>(&mut self, method: &str, params: P) -> RPCResult<Responser<Timeout>>
64 where
65 P: Serialize,
66 {
67 let receiver = self.completed_q.wait_one();
68
69 let request = Request {
70 id: Some(receiver.event_id()),
71 method,
72 params,
73 jsonrpc: crate::Version::default(),
74 };
75
76 let data = serde_json::to_vec(&request).expect("Inner error, assembly json request");
77
78 self.output_sender
79 .send(data.into())
80 .await
81 .map_err(map_error)?;
82
83 Ok(Responser {
84 receiver: Some(receiver),
85 })
86 }
87
88 pub async fn call<P, R>(&mut self, method: &str, params: P) -> RPCResult<R>
89 where
90 P: Serialize,
91 for<'b> R: Deserialize<'b> + Send + 'static,
92 {
93 self.send(method, params).await?.recv().await
94 }
95
96 pub async fn send_with_timer<P, T>(
97 &mut self,
98 method: &str,
99 params: P,
100 timer: T,
101 ) -> RPCResult<Responser<T>>
102 where
103 P: Serialize,
104 T: Timer + Unpin + 'static,
105 {
106 let receiver = self.completed_q.wait_one_with_timer(timer);
107
108 let request = Request {
109 id: Some(receiver.event_id()),
110 method,
111 params,
112 jsonrpc: crate::Version::default(),
113 };
114
115 let data = serde_json::to_vec(&request).expect("Inner error, assembly json request");
116
117 self.output_sender
118 .send(data.into())
119 .await
120 .map_err(map_error)?;
121
122 Ok(Responser {
123 receiver: Some(receiver),
124 })
125 }
126
127 pub async fn call_with_timer<P, T, R>(
128 &mut self,
129 method: &str,
130 params: P,
131 timer: T,
132 ) -> RPCResult<R>
133 where
134 T: Timer + Unpin + 'static,
135 P: Serialize,
136 for<'b> R: Deserialize<'b> + Send + 'static,
137 {
138 self.send_with_timer(method, params, timer)
139 .await?
140 .recv()
141 .await
142 }
143
144 pub async fn notification<P>(&mut self, method: &str, params: P) -> RPCResult<()>
145 where
146 P: Serialize,
147 {
148 let request = Request {
149 method,
150 params,
151 id: None,
152 jsonrpc: crate::Version::default(),
153 };
154
155 let data = serde_json::to_vec(&request)?;
156
157 self.output_sender
158 .send(data.into())
159 .await
160 .map_err(map_error)?;
161
162 Ok(())
163 }
164}
165
166pub struct Responser<T: Timer> {
167 receiver: Option<EventReceiver<RPCEvent, T>>,
168}
169
170impl<T: Timer> Responser<T>
171where
172 T: Unpin,
173{
174 pub async fn recv<R>(&mut self) -> RPCResult<R>
175 where
176 for<'b> R: Deserialize<'b> + Send + 'static,
177 {
178 let value = self
179 .receiver
180 .take()
181 .unwrap()
182 .await
183 .success()
184 .map_err(map_error)?
185 .ok_or(CompleteQError::PipeBroken)??;
186
187 serde_json::from_value(value.clone()).map_err(map_error)
188 }
189}