Skip to main content

xrl/protocol/
client.rs

1use std::collections::HashMap;
2use std::io;
3
4use futures::sync::{mpsc, oneshot};
5use futures::{Async, Future, Poll, Stream};
6use serde_json::Value;
7use tokio::io::{AsyncRead, AsyncWrite};
8
9use super::errors::RpcError;
10use super::message::Response as ResponseMessage;
11use super::message::{Message, Notification, Request};
12use super::transport::Transport;
13
14type RequestRx = mpsc::UnboundedReceiver<(Request, ResponseTx)>;
15type RequestTx = mpsc::UnboundedSender<(Request, ResponseTx)>;
16type NotificationTx = mpsc::UnboundedSender<(Notification, AckTx)>;
17type NotificationRx = mpsc::UnboundedReceiver<(Notification, AckTx)>;
18
19type ResponseTx = oneshot::Sender<Result<Value, Value>>;
20type AckTx = oneshot::Sender<()>;
21
22/// Future response to a request. It resolved once the response is available.
23pub struct Response(oneshot::Receiver<Result<Value, Value>>);
24
25impl Future for Response {
26    type Item = Result<Value, Value>;
27    type Error = RpcError;
28
29    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
30        self.0
31            .poll()
32            .map_err(|oneshot::Canceled| RpcError::ResponseCanceled)
33    }
34}
35
36/// A future that resolves when a notification has been effectively sent to the
37/// server. It does not guarantees that the server receives it, just that it
38/// has been sent.
39pub struct Ack(oneshot::Receiver<()>);
40
41impl Future for Ack {
42    type Item = ();
43    type Error = RpcError;
44
45    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
46        self.0
47            .poll()
48            .map_err(|oneshot::Canceled| RpcError::AckCanceled)
49    }
50}
51
52pub struct InnerClient {
53    shutting_down: bool,
54    request_id: u64,
55    requests_rx: RequestRx,
56    notifications_rx: NotificationRx,
57    pending_requests: HashMap<u64, ResponseTx>,
58    pending_notifications: Vec<AckTx>,
59    shutdown_rx: mpsc::UnboundedReceiver<()>,
60}
61
62impl InnerClient {
63    pub fn new() -> (Self, Client) {
64        let (requests_tx, requests_rx) = mpsc::unbounded();
65        let (notifications_tx, notifications_rx) = mpsc::unbounded();
66        let (shutdown_tx, shutdown_rx) = mpsc::unbounded();
67
68        let client_proxy = Client::new(requests_tx, notifications_tx, shutdown_tx);
69
70        let client = InnerClient {
71            shutting_down: false,
72            request_id: 0,
73            requests_rx,
74            notifications_rx,
75            pending_requests: HashMap::new(),
76            pending_notifications: Vec::new(),
77            shutdown_rx,
78        };
79
80        (client, client_proxy)
81    }
82
83    pub fn shutdown(&mut self) {
84        debug!("shutting down inner client");
85        self.shutting_down = true;
86    }
87
88    pub fn is_shutting_down(&self) -> bool {
89        self.shutting_down
90    }
91
92    pub fn process_shutdown_signals(&mut self) {
93        trace!("polling shutdown signal channel");
94        loop {
95            match self.shutdown_rx.poll() {
96                Ok(Async::Ready(Some(()))) => {
97                    info!("Received shutdown signal");
98                    self.shutdown();
99                    // Note that in theory, we should continue polling
100                    // until NotReady, but since we're shutting down
101                    // anyway, the Endpoint is going to be dropped so
102                    // it does not matter if the rest of the IO events
103                    // are being polled or not.
104                    break;
105                }
106                Ok(Async::Ready(None)) => {
107                    warn!("client closed the shutdown signal channel");
108                    self.shutdown();
109                    break;
110                }
111                Ok(Async::NotReady) => {
112                    trace!("no shutdown signal from client");
113                    break;
114                }
115                Err(()) => {
116                    error!("an error occured while polling the shutdown signal channel");
117                    panic!("an error occured while polling the shutdown signal channel");
118                }
119            }
120        }
121    }
122
123    pub fn process_notifications<T: AsyncRead + AsyncWrite>(&mut self, stream: &mut Transport<T>) {
124        trace!("polling client notifications channel");
125        loop {
126            match self.notifications_rx.poll() {
127                Ok(Async::Ready(Some((notification, ack_sender)))) => {
128                    trace!("sending notification: {:?}", notification);
129                    stream.send(Message::Notification(notification));
130                    self.pending_notifications.push(ack_sender);
131                }
132                Ok(Async::NotReady) => {
133                    trace!("no new notification from client");
134                    break;
135                }
136                Ok(Async::Ready(None)) => {
137                    warn!("client closed the notifications channel");
138                    self.shutdown();
139                    break;
140                }
141                Err(()) => {
142                    // I have no idea how this should be handled.
143                    // The documentation does not tell what may trigger an error.
144                    error!("an error occured while polling the notifications channel");
145                    panic!("an error occured while polling the notifications channel");
146                }
147            }
148        }
149    }
150
151    pub fn process_requests<T: AsyncRead + AsyncWrite>(&mut self, stream: &mut Transport<T>) {
152        trace!("polling client requests channel");
153        loop {
154            match self.requests_rx.poll() {
155                Ok(Async::Ready(Some((mut request, response_sender)))) => {
156                    self.request_id += 1;
157                    trace!("sending request: {:?}", request);
158                    request.id = self.request_id;
159                    stream.send(Message::Request(request));
160                    self.pending_requests
161                        .insert(self.request_id, response_sender);
162                }
163                Ok(Async::Ready(None)) => {
164                    warn!("client closed the requests channel.");
165                    self.shutdown();
166                    break;
167                }
168                Ok(Async::NotReady) => {
169                    trace!("no new request from client");
170                    break;
171                }
172                Err(()) => {
173                    // I have no idea how this should be handled.
174                    // The documentation does not tell what may trigger an error.
175                    panic!("An error occured while polling the requests channel");
176                }
177            }
178        }
179    }
180
181    pub fn process_response(&mut self, response: ResponseMessage) {
182        if self.is_shutting_down() {
183            return;
184        }
185        if let Some(response_tx) = self.pending_requests.remove(&response.id) {
186            trace!("forwarding response to the client.");
187            if let Err(e) = response_tx.send(response.result) {
188                warn!("Failed to send response to client: {:?}", e);
189            }
190        } else {
191            warn!("no pending request found for response {}", &response.id);
192        }
193    }
194
195    pub fn acknowledge_notifications(&mut self) {
196        for chan in self.pending_notifications.drain(..) {
197            trace!("acknowledging notification.");
198            if let Err(e) = chan.send(()) {
199                warn!("Failed to send ack to client: {:?}", e);
200            }
201        }
202    }
203}
204
205/// `Client` can be used to send Xi-RPC requests and notifications. It
206/// implements `Clone` so multiple clients can be instantiated. When
207/// all the `Client` instances are dropped, the Xi-RPC endoint shuts
208/// down. If the Xi-RPC endpoint shuts down while there are still
209/// `Client` instances, `Client::request()`, `Client::notify()` and
210/// `Client::shutdown()` can still be called on these instances, but
211/// will have no effect.
212#[derive(Clone)]
213pub struct Client {
214    requests_tx: RequestTx,
215    notifications_tx: NotificationTx,
216    shutdown_tx: mpsc::UnboundedSender<()>,
217}
218
219impl Client {
220    fn new(
221        requests_tx: RequestTx,
222        notifications_tx: NotificationTx,
223        shutdown_tx: mpsc::UnboundedSender<()>,
224    ) -> Self {
225        Client {
226            requests_tx,
227            notifications_tx,
228            shutdown_tx,
229        }
230    }
231
232    pub fn request(&self, method: &str, params: Value) -> Response {
233        trace!(
234            "forwarding request to endpoint (method={}, params={:?})",
235            method,
236            params
237        );
238        let request = Request {
239            id: 0,
240            method: method.to_owned(),
241            params,
242        };
243        let (tx, rx) = oneshot::channel();
244        // If send returns an Err, its because the other side has been dropped.
245        // By ignoring it, we are just dropping the `tx`, which will mean the
246        // rx will return Canceled when polled. In turn, that is translated
247        // into a BrokenPipe, which conveys the proper error.
248        let _ = mpsc::UnboundedSender::unbounded_send(&self.requests_tx, (request, tx));
249        Response(rx)
250    }
251
252    pub fn notify(&self, method: &str, params: Value) -> Ack {
253        trace!(
254            "forwarding notification to endpoint (method={}, params={:?})",
255            method,
256            params
257        );
258        let notification = Notification {
259            method: method.to_owned(),
260            params,
261        };
262        let (tx, rx) = oneshot::channel();
263        let _ = mpsc::UnboundedSender::unbounded_send(&self.notifications_tx, (notification, tx));
264        Ack(rx)
265    }
266
267    /// Forces the Xi-RPC endpoint to shut down. After this, the the
268    /// `request()`, `notify()` and `shutdown()` methods can still be
269    /// called but will have not effect.
270    pub fn shutdown(&self) {
271        let _ = mpsc::UnboundedSender::unbounded_send(&self.shutdown_tx, ());
272    }
273}
274
275impl Future for Client {
276    type Item = ();
277    type Error = io::Error;
278
279    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
280        Ok(Async::Ready(()))
281    }
282}