1use std::collections::HashMap;
2use std::io;
3
4use futures::sync::{mpsc, oneshot};
5use futures::{Async, Future, Poll, Stream};
6use serde_json::Value;
7use tokio::io::{AsyncRead, AsyncWrite};
8
9use super::errors::RpcError;
10use super::message::Response as ResponseMessage;
11use super::message::{Message, Notification, Request};
12use super::transport::Transport;
13
14type RequestRx = mpsc::UnboundedReceiver<(Request, ResponseTx)>;
15type RequestTx = mpsc::UnboundedSender<(Request, ResponseTx)>;
16type NotificationTx = mpsc::UnboundedSender<(Notification, AckTx)>;
17type NotificationRx = mpsc::UnboundedReceiver<(Notification, AckTx)>;
18
19type ResponseTx = oneshot::Sender<Result<Value, Value>>;
20type AckTx = oneshot::Sender<()>;
21
22pub struct Response(oneshot::Receiver<Result<Value, Value>>);
24
25impl Future for Response {
26 type Item = Result<Value, Value>;
27 type Error = RpcError;
28
29 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
30 self.0
31 .poll()
32 .map_err(|oneshot::Canceled| RpcError::ResponseCanceled)
33 }
34}
35
36pub struct Ack(oneshot::Receiver<()>);
40
41impl Future for Ack {
42 type Item = ();
43 type Error = RpcError;
44
45 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
46 self.0
47 .poll()
48 .map_err(|oneshot::Canceled| RpcError::AckCanceled)
49 }
50}
51
52pub struct InnerClient {
53 shutting_down: bool,
54 request_id: u64,
55 requests_rx: RequestRx,
56 notifications_rx: NotificationRx,
57 pending_requests: HashMap<u64, ResponseTx>,
58 pending_notifications: Vec<AckTx>,
59 shutdown_rx: mpsc::UnboundedReceiver<()>,
60}
61
62impl InnerClient {
63 pub fn new() -> (Self, Client) {
64 let (requests_tx, requests_rx) = mpsc::unbounded();
65 let (notifications_tx, notifications_rx) = mpsc::unbounded();
66 let (shutdown_tx, shutdown_rx) = mpsc::unbounded();
67
68 let client_proxy = Client::new(requests_tx, notifications_tx, shutdown_tx);
69
70 let client = InnerClient {
71 shutting_down: false,
72 request_id: 0,
73 requests_rx,
74 notifications_rx,
75 pending_requests: HashMap::new(),
76 pending_notifications: Vec::new(),
77 shutdown_rx,
78 };
79
80 (client, client_proxy)
81 }
82
83 pub fn shutdown(&mut self) {
84 debug!("shutting down inner client");
85 self.shutting_down = true;
86 }
87
88 pub fn is_shutting_down(&self) -> bool {
89 self.shutting_down
90 }
91
92 pub fn process_shutdown_signals(&mut self) {
93 trace!("polling shutdown signal channel");
94 loop {
95 match self.shutdown_rx.poll() {
96 Ok(Async::Ready(Some(()))) => {
97 info!("Received shutdown signal");
98 self.shutdown();
99 break;
105 }
106 Ok(Async::Ready(None)) => {
107 warn!("client closed the shutdown signal channel");
108 self.shutdown();
109 break;
110 }
111 Ok(Async::NotReady) => {
112 trace!("no shutdown signal from client");
113 break;
114 }
115 Err(()) => {
116 error!("an error occured while polling the shutdown signal channel");
117 panic!("an error occured while polling the shutdown signal channel");
118 }
119 }
120 }
121 }
122
123 pub fn process_notifications<T: AsyncRead + AsyncWrite>(&mut self, stream: &mut Transport<T>) {
124 trace!("polling client notifications channel");
125 loop {
126 match self.notifications_rx.poll() {
127 Ok(Async::Ready(Some((notification, ack_sender)))) => {
128 trace!("sending notification: {:?}", notification);
129 stream.send(Message::Notification(notification));
130 self.pending_notifications.push(ack_sender);
131 }
132 Ok(Async::NotReady) => {
133 trace!("no new notification from client");
134 break;
135 }
136 Ok(Async::Ready(None)) => {
137 warn!("client closed the notifications channel");
138 self.shutdown();
139 break;
140 }
141 Err(()) => {
142 error!("an error occured while polling the notifications channel");
145 panic!("an error occured while polling the notifications channel");
146 }
147 }
148 }
149 }
150
151 pub fn process_requests<T: AsyncRead + AsyncWrite>(&mut self, stream: &mut Transport<T>) {
152 trace!("polling client requests channel");
153 loop {
154 match self.requests_rx.poll() {
155 Ok(Async::Ready(Some((mut request, response_sender)))) => {
156 self.request_id += 1;
157 trace!("sending request: {:?}", request);
158 request.id = self.request_id;
159 stream.send(Message::Request(request));
160 self.pending_requests
161 .insert(self.request_id, response_sender);
162 }
163 Ok(Async::Ready(None)) => {
164 warn!("client closed the requests channel.");
165 self.shutdown();
166 break;
167 }
168 Ok(Async::NotReady) => {
169 trace!("no new request from client");
170 break;
171 }
172 Err(()) => {
173 panic!("An error occured while polling the requests channel");
176 }
177 }
178 }
179 }
180
181 pub fn process_response(&mut self, response: ResponseMessage) {
182 if self.is_shutting_down() {
183 return;
184 }
185 if let Some(response_tx) = self.pending_requests.remove(&response.id) {
186 trace!("forwarding response to the client.");
187 if let Err(e) = response_tx.send(response.result) {
188 warn!("Failed to send response to client: {:?}", e);
189 }
190 } else {
191 warn!("no pending request found for response {}", &response.id);
192 }
193 }
194
195 pub fn acknowledge_notifications(&mut self) {
196 for chan in self.pending_notifications.drain(..) {
197 trace!("acknowledging notification.");
198 if let Err(e) = chan.send(()) {
199 warn!("Failed to send ack to client: {:?}", e);
200 }
201 }
202 }
203}
204
205#[derive(Clone)]
213pub struct Client {
214 requests_tx: RequestTx,
215 notifications_tx: NotificationTx,
216 shutdown_tx: mpsc::UnboundedSender<()>,
217}
218
219impl Client {
220 fn new(
221 requests_tx: RequestTx,
222 notifications_tx: NotificationTx,
223 shutdown_tx: mpsc::UnboundedSender<()>,
224 ) -> Self {
225 Client {
226 requests_tx,
227 notifications_tx,
228 shutdown_tx,
229 }
230 }
231
232 pub fn request(&self, method: &str, params: Value) -> Response {
233 trace!(
234 "forwarding request to endpoint (method={}, params={:?})",
235 method,
236 params
237 );
238 let request = Request {
239 id: 0,
240 method: method.to_owned(),
241 params,
242 };
243 let (tx, rx) = oneshot::channel();
244 let _ = mpsc::UnboundedSender::unbounded_send(&self.requests_tx, (request, tx));
249 Response(rx)
250 }
251
252 pub fn notify(&self, method: &str, params: Value) -> Ack {
253 trace!(
254 "forwarding notification to endpoint (method={}, params={:?})",
255 method,
256 params
257 );
258 let notification = Notification {
259 method: method.to_owned(),
260 params,
261 };
262 let (tx, rx) = oneshot::channel();
263 let _ = mpsc::UnboundedSender::unbounded_send(&self.notifications_tx, (notification, tx));
264 Ack(rx)
265 }
266
267 pub fn shutdown(&self) {
271 let _ = mpsc::UnboundedSender::unbounded_send(&self.shutdown_tx, ());
272 }
273}
274
275impl Future for Client {
276 type Item = ();
277 type Error = io::Error;
278
279 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
280 Ok(Async::Ready(()))
281 }
282}