tokio_jsonrpc/endpoint.rs
1// Copyright 2017 tokio-jsonrpc Developers
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8//! The endpoint of the JSON RPC connection.
9//!
10//! This module helps building the endpoints of the connection. The endpoints act as both client
11//! and server at the same time. If you want a client-only endpoint, use
12//! [`Empty`](../server/struct.Empty.html) as the server or the relevant
13//! [`Endpoint`](struct.Endpoint.html)'s constructor. If you want a server-only endpoint,
14//! simply don't call any RPCs or notifications and forget about the returned
15//! [`Client`](struct.Client.html) structure.
16
17use std::error::Error;
18use std::fmt::{Display, Formatter, Result as FmtResult};
19use std::io::{self, Error as IoError, ErrorKind};
20use std::collections::HashMap;
21use std::rc::Rc;
22use std::time::Duration;
23use std::cell::RefCell;
24
25use futures::{Future, IntoFuture, Sink, Stream};
26use futures::future::Either;
27use futures::stream::{self, empty, unfold, Once};
28use futures::unsync::mpsc::{channel, Sender};
29use futures::unsync::oneshot::{channel as one_channel, Sender as OneSender};
30#[cfg(test)]
31use futures::unsync::oneshot::Receiver as OneReceiver;
32use serde_json::{to_value, Value};
33use slog::{Discard, Logger};
34use tokio_core::reactor::{Handle, Timeout};
35
36use message::{Broken, Message, Notification, Parsed, Request, Response, RpcError};
37use server::{Empty as EmptyServer, Server};
38
39/// Thing that terminates the connection once dropped.
40///
41/// A trick to terminate when all Rcs are forgotten.
42struct DropTerminator(Option<OneSender<()>>);
43
44impl Drop for DropTerminator {
45 fn drop(&mut self) {
46 // Don't care about the result. If the other side is gone, we just have nothing to do.
47 let _ = self.0.take().unwrap().send(());
48 }
49}
50
51type RcDrop = Rc<DropTerminator>;
52
53/// An internal part of `ServerCtl`.
54///
55/// The `ServerCtl` is just a thin Rc wrapper around this.
56struct ServerCtlInternal {
57 // Stop processing requests
58 stop: bool,
59 // Terminate the nice way (if all others also drop)
60 terminator: Option<RcDrop>,
61 // Terminate right now
62 killer: Option<OneSender<()>>,
63 // Info to be able to create a new clients
64 idmap: IDMap,
65 handle: Handle,
66 sender: Option<Sender<Message>>,
67 logger: Logger,
68}
69
70/// An error indicator when a connection has been already terminated.
71#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
72pub struct AlreadyTerminated;
73
74impl Display for AlreadyTerminated {
75 fn fmt(&self, f: &mut Formatter) -> FmtResult {
76 write!(f, "Connection already terminated")
77 }
78}
79
80impl Error for AlreadyTerminated {
81 fn description(&self) -> &str {
82 "Connection already terminated"
83 }
84}
85
86/// A handle to control the server.
87///
88/// An instance is provided to each [`Server`](../server/trait.Server.html) callback and it can be
89/// used to manipulate the server (currently only to terminate the server) or to create a client
90/// for the use of the server.
91#[derive(Clone)]
92pub struct ServerCtl(Rc<RefCell<ServerCtlInternal>>);
93
94impl ServerCtl {
95 /// Perform a cleanup when terminating in some way.
96 ///
97 /// And perform some operation on the internal (access for convenience)
98 fn cleanup<R, F: FnOnce(&mut ServerCtlInternal) -> R>(&self, f: F) -> R {
99 let mut internal = self.0.borrow_mut();
100 debug!(internal.logger, "Server cleanup");
101 internal.stop = true;
102 internal.sender.take();
103 f(&mut internal)
104 }
105 /// Stop answering RPCs and calling notifications.
106 ///
107 /// Also terminate the connection if the client handle has been dropped and all ongoing RPC
108 /// answers were received.
109 pub fn terminate(&self) {
110 self.cleanup(|internal| {
111 // Drop the reference count for this one
112 internal.terminator.take();
113 });
114 }
115 /// Kill the connection.
116 ///
117 /// Like, right now. Without a goodbye.
118 pub fn kill(&self) {
119 self.cleanup(|internal| {
120 // The option might be None, but only after we called it already.
121 internal.killer.take().map(|s| s.send(()));
122 });
123 }
124 /// Create a new client for the current endpoint.
125 ///
126 /// This is a way in which the server may access the other endpoint (eg. call RPCs or send
127 /// notifications to the other side).
128 ///
129 /// If the server got terminated by some means (calling `kill`, `terminate`, dropping all
130 /// endpoints or by losing the connection), it returns `Err(AlreadyTerminated)`.
131 ///
132 /// Note that it's generally safe to `unwrap` the result, unless you clone the `ServerCtl` and
133 /// store it somewhere. If you do clone and keep it, there's a chance for race conditions and
134 /// you should check for the error conditions.
135 pub fn client(&self) -> Result<Client, AlreadyTerminated> {
136 let internal = self.0.borrow();
137 let terminator = internal.terminator.as_ref().ok_or(AlreadyTerminated)?;
138 let sender = internal.sender.as_ref().ok_or(AlreadyTerminated)?;
139 Ok(Client::new(
140 &internal.idmap,
141 self,
142 &internal.handle,
143 terminator,
144 sender,
145 internal.logger.clone(),
146 ))
147 }
148 // This one is for unit tests, not part of the general-purpose API. It creates a dummy
149 // ServerCtl that does nothing, but still can be passed to the Server for checking.
150 //
151 // It returns:
152 // * The ServerCtl itself
153 // * Drop future (fires when the corresponding server would get droppend)
154 // * Kill future (fires when kill is signalled)
155 #[doc(hidden)]
156 #[cfg(test)]
157 pub fn new_test() -> (Self, OneReceiver<()>, OneReceiver<()>) {
158 let (drop_sender, drop_receiver) = one_channel();
159 let (kill_sender, kill_receiver) = one_channel();
160 let (msg_sender, _msg_receiver) = channel(1);
161 let terminator = DropTerminator(Some(drop_sender));
162 let core = ::tokio_core::reactor::Core::new().unwrap();
163 let handle = core.handle();
164
165 let ctl = ServerCtl(Rc::new(RefCell::new(ServerCtlInternal {
166 stop: false,
167 terminator: Some(Rc::new(terminator)),
168 killer: Some(kill_sender),
169 idmap: Default::default(),
170 handle: handle,
171 sender: Some(msg_sender),
172 logger: Logger::root(Discard, o!()),
173 })));
174 (ctl, drop_receiver, kill_receiver)
175 }
176}
177
178// Our own BoxFuture & friends that is *not* send. We don't do send.
179type BoxFuture<T, E> = Box<Future<Item = T, Error = E>>;
180type FutureMessage = BoxFuture<Option<Message>, IoError>;
181type BoxStream<T, E> = Box<Stream<Item = T, Error = E>>;
182// None in error means end the stream, please
183type FutureMessageStream = BoxStream<FutureMessage, IoError>;
184
185type IDMap = Rc<RefCell<HashMap<String, OneSender<Response>>>>;
186
187// A future::stream::once that takes only the success value, for convenience.
188fn once<T, E>(item: T) -> Once<T, E> {
189 stream::once(Ok(item))
190}
191
192fn shouldnt_happen<E>(_: E) -> IoError {
193 IoError::new(ErrorKind::Other, "Shouldn't happen")
194}
195
196fn do_request<RpcServer: Server + 'static>(
197 server: &RpcServer, ctl: &ServerCtl, request: Request, logger: &Logger
198) -> FutureMessage {
199 match server.rpc(ctl, &request.method, &request.params) {
200 None => {
201 trace!(logger, "Server refused RPC {}", request.method);
202 let reply = request.error(RpcError::method_not_found(request.method.clone()));
203 Box::new(Ok(Some(reply)).into_future())
204 },
205 Some(future) => {
206 trace!(logger, "Server accepted RPC {}", request.method);
207 let result = future.into_future().then(move |result| match result {
208 Err(err) => Ok(Some(request.error(err))),
209 Ok(result) => Ok(Some(
210 request.reply(to_value(result).expect("Bad result type")),
211 )),
212 });
213 Box::new(result)
214 },
215 }
216}
217
218fn do_notification<RpcServer: Server>(
219 server: &RpcServer, ctl: &ServerCtl, notification: &Notification, logger: &Logger
220) -> FutureMessage {
221 match server.notification(ctl, ¬ification.method, ¬ification.params) {
222 None => {
223 trace!(
224 logger,
225 "Server refused notification {}",
226 notification.method
227 );
228 Box::new(Ok(None).into_future())
229 },
230 // We ignore both success and error, so we convert it into something for now
231 Some(future) => {
232 trace!(
233 logger,
234 "Server accepted notification {}",
235 notification.method
236 );
237 Box::new(future.into_future().then(|_| Ok(None)))
238 },
239 }
240}
241
242// To process a batch using the same set of parallel executors as the whole server, we produce a
243// stream of the computations which return nothing, but gather the results. Then we add yet another
244// future at the end of that stream that takes the gathered results and wraps them into the real
245// message ‒ the result of the whole batch.
246fn do_batch<RpcServer: Server + 'static>(
247 server: &RpcServer, ctl: &ServerCtl, idmap: &IDMap, logger: &Logger, msg: Vec<Message>
248) -> FutureMessageStream {
249 // Create a large enough channel. We may be unable to pick up the results until the final
250 // future gets its turn, so shorter one could lead to a deadlock.
251 let (sender, receiver) = channel(msg.len());
252 // Each message produces a single stream of futures. Create that streams (right now, so we
253 // don't have to keep server long into the future).
254 let small_streams: Vec<_> = msg.into_iter()
255 .map(|sub| -> Result<_, IoError> {
256 let sender = sender.clone();
257 // This part is a bit convoluted. The do_msg returns a stream of futures. We want to
258 // take each of these futures (the outer and_then), run it to completion (the inner
259 // and_then), send its result through the sender if it provided one and then
260 // convert the result to None.
261 //
262 // Note that do_msg may return arbitrary number of work futures, but only at most one
263 // of them is supposed to provide a resulting value which would be sent through the
264 // channel. Unfortunately, there's no way to know which one it'll be, so we have to
265 // clone the sender all over the place.
266 //
267 // Also, it is a bit unfortunate how we need to allocate so many times here. We may try
268 // doing something about that in the future, but without implementing custom future and
269 // stream types, this seems the best we can do.
270 let all_sent = do_msg(server, ctl, idmap, logger, Ok(sub)).and_then(
271 move |future_message| -> Result<FutureMessage, _> {
272 let sender = sender.clone();
273 let msg_sent =
274 future_message.and_then(move |response: Option<Message>| match response {
275 None => Either::A(Ok(None).into_future()),
276 Some(msg) => {
277 Either::B(sender.send(msg).map_err(shouldnt_happen).map(|_| None))
278 },
279 });
280 Ok(Box::new(msg_sent))
281 },
282 );
283 Ok(all_sent)
284 })
285 .collect();
286 // We make it into a stream of streams and flatten it to get one big stream
287 let subs_stream = stream::iter_result(small_streams).flatten();
288 // Once all the results are produced, wrap them into a batch and return that one
289 let collected = receiver.collect().map_err(shouldnt_happen).map(|results| {
290 if results.is_empty() {
291 // The spec says to send nothing at all if there are no results
292 None
293 } else {
294 Some(Message::Batch(results))
295 }
296 });
297 let streamed: Once<FutureMessage, _> = once(Box::new(collected));
298 // Connect the wrapping single-item stream after the work stream
299 Box::new(subs_stream.chain(streamed))
300}
301
302fn do_response(idmap: &IDMap, logger: &Logger, response: Response) -> FutureMessageStream {
303 let maybe_sender = response
304 .id
305 .as_str()
306 .and_then(|id| idmap.borrow_mut().remove(id));
307 if let Some(sender) = maybe_sender {
308 trace!(logger, "Received an RPC response"; "id" => format!("{:?}", response.id));
309 // Don't care about the result, if the other side went away, it doesn't need the response
310 // and that's OK with us.
311 drop(sender.send(response));
312 } else {
313 error!(logger, "Unexpected RPC response"; "id" => format!("{:?}", response.id));
314 }
315 Box::new(empty())
316}
317
318// Handle single message and turn it into an arbitrary number of futures that may be worked on in
319// parallel, but only at most one of which returns a response message
320fn do_msg<RpcServer: Server + 'static>(
321 server: &RpcServer, ctl: &ServerCtl, idmap: &IDMap, logger: &Logger, msg: Parsed
322) -> FutureMessageStream {
323 let terminated = ctl.0.borrow().stop;
324 trace!(logger, "Do a message"; "terminated" => terminated, "message" => format!("{:?}", msg));
325 if terminated {
326 if let Ok(Message::Response(response)) = msg {
327 do_response(idmap, logger, response);
328 }
329 Box::new(empty())
330 } else {
331 match msg {
332 Err(broken) => {
333 let err: FutureMessage = Box::new(Ok(Some(broken.reply())).into_future());
334 Box::new(once(err))
335 },
336 Ok(Message::Request(req)) => Box::new(once(do_request(server, ctl, req, logger))),
337 Ok(Message::Notification(notif)) => {
338 Box::new(once(do_notification(server, ctl, ¬if, logger)))
339 },
340 Ok(Message::Batch(batch)) => do_batch(server, ctl, idmap, logger, batch),
341 Ok(Message::UnmatchedSub(value)) => {
342 do_msg(server, ctl, idmap, logger, Err(Broken::Unmatched(value)))
343 },
344 Ok(Message::Response(response)) => do_response(idmap, logger, response),
345 }
346 }
347}
348
349/// Internal part of the client.
350///
351/// Just for convenience, as we need to deconstruct and construct it repeatedly, so this way we
352/// have only one item extra.
353#[derive(Clone)]
354struct ClientData {
355 /// Mapping from IDs to the oneshots to wake up the recipient futures.
356 idmap: IDMap,
357 /// The control of the server.
358 ctl: ServerCtl,
359 handle: Handle,
360 /// Keep the connection alive as long as the client is alive.
361 terminator: RcDrop,
362 logger: Logger,
363}
364
365/// The client part of the endpoint.
366///
367/// This can be used to call RPCs and send notifications to the other end. There's no direct
368/// constructor, it is created through the [Endpoint](struct.Endpoint.html).
369#[derive(Clone)]
370pub struct Client {
371 sender: Sender<Message>,
372 data: ClientData,
373}
374
375pub type Notified = BoxFuture<Client, IoError>;
376pub type RpcFinished = BoxFuture<Option<Response>, IoError>;
377pub type RpcSent = BoxFuture<(Client, RpcFinished), IoError>;
378
379impl Client {
380 /// A constructor (a private one).
381 fn new(
382 idmap: &IDMap, ctl: &ServerCtl, handle: &Handle, terminator: &RcDrop,
383 sender: &Sender<Message>, logger: Logger,
384 ) -> Self {
385 debug!(logger, "Creating a new client");
386 Client {
387 sender: sender.clone(),
388 data: ClientData {
389 idmap: idmap.clone(),
390 ctl: ctl.clone(),
391 handle: handle.clone(),
392 terminator: terminator.clone(),
393 logger,
394 },
395 }
396 }
397 /// Call a RPC.
398 ///
399 /// Construct an RPC message and send it to the other end. It returns a future that resolves
400 /// once the message is sent. It yields the Client back (it is blocked for the time of sending)
401 /// and another future that resolves once the answer is received (or once a timeout happens, in
402 /// which case the result is None).
403 pub fn call(self, method: String, params: Option<Value>, timeout: Option<Duration>) -> RpcSent {
404 // We have to deconstruct self now, because the sender's send takes ownership for it for a
405 // while. We construct it back once the message is passed on.
406 let data = self.data;
407 trace!(data.logger, "Calling RPC {}", method);
408 let msg = Message::request(method, params);
409 let id = match msg {
410 Message::Request(Request {
411 id: Value::String(ref id),
412 ..
413 }) => id.clone(),
414 _ => unreachable!("We produce only string IDs"),
415 };
416 let (sender, receiver) = one_channel();
417 let rc_terminator = data.terminator.clone();
418 let logger_cloned = data.logger.clone();
419 let received = receiver
420 .map_err(|_| IoError::new(io::ErrorKind::Other, "Lost connection"))
421 .map(Some)
422 .then(move |r| {
423 trace!(logger_cloned, "Received RPC answer");
424 drop(rc_terminator);
425 r
426 });
427 let completed: RpcFinished = match timeout {
428 Some(time) => {
429 // If we were provided with a timeout, select what happens first.
430 let timeout = match Timeout::new(time, &data.handle) {
431 Err(e) => return Box::new(Err(e).into_future()),
432 Ok(t) => t,
433 };
434 let idmap = data.idmap.clone();
435 let id = id.clone();
436 let logger_cloned = data.logger.clone();
437 let completed = timeout
438 .then(move |r| {
439 trace!(logger_cloned, "RPC timed out");
440 r
441 })
442 .map(|_| None)
443 .select(received)
444 .map(|(r, _)| r)
445 .map_err(|(e, _)| e)
446 // Make sure the ID/sender is removed even when timeout wins.
447 // This is a NOOP in case the real result arrives, since it is already deleted
448 // by then, but that doesn't matter and this is simpler.
449 .then(move |r| {
450 idmap.borrow_mut().remove(&id);
451 r
452 });
453 Box::new(completed)
454 },
455 // If we don't have the timeout, simply pass the future to get the response through.
456 None => Box::new(received),
457 };
458 data.idmap.borrow_mut().insert(id, sender);
459 // Ensure the connection is kept alive until the answer comes
460 let sent = self.sender
461 .send(msg)
462 .map_err(shouldnt_happen)
463 .map(move |sender| {
464 let client = Client { sender, data };
465 (client, completed)
466 });
467 Box::new(sent)
468 }
469 /// Send a notification.
470 ///
471 /// It creates a notification message and sends it. It returs a future that resolves once the
472 /// message is sent and yields the client back for further use.
473 pub fn notify(self, method: String, params: Option<Value>) -> Notified {
474 let data = self.data;
475 trace!(data.logger, "Sending notification {}", method);
476 let future = self.sender
477 .send(Message::notification(method, params))
478 .map_err(shouldnt_happen)
479 .map(move |sender| Client { sender, data });
480 Box::new(future)
481 }
482 /// Get the server control.
483 ///
484 /// That allows terminating the server, etc.
485 pub fn server_ctl(&self) -> &ServerCtl {
486 &self.data.ctl
487 }
488}
489
490/// The builder structure for the end point.
491///
492/// This is used to create the endpoint ‒ both the server and client part at once.
493///
494/// # Examples
495///
496/// This will create a connection, build a client-only endpoint (eg. the server is dummy) on it and
497/// send an RPC to the other side, printing the result once it comes.
498///
499/// ```rust,no_run
500/// # extern crate tokio_core;
501/// # extern crate tokio_io;
502/// # extern crate tokio_jsonrpc;
503/// # extern crate futures;
504/// # #[macro_use]
505/// # extern crate serde_json;
506/// #
507/// # use std::time::Duration;
508/// # use tokio_core::reactor::Core;
509/// # use tokio_core::net::TcpStream;
510/// # use tokio_io::AsyncRead;
511/// # use tokio_jsonrpc::{LineCodec, Server, ServerCtl, RpcError, Endpoint};
512/// # use tokio_jsonrpc::message::Response;
513/// # use futures::{Future, Stream};
514/// # use serde_json::Value;
515/// #
516/// # fn main() {
517/// let mut core = Core::new().unwrap();
518/// let handle = core.handle();
519///
520/// let request = TcpStream::connect(&"127.0.0.1:2346".parse().unwrap(), &handle)
521/// .map(move |stream| {
522/// // Create a client on top of the connection
523/// let (client, _finished) = Endpoint::client_only(stream.framed(LineCodec::new()))
524/// .start(&handle);
525/// // Call a method with some parameters and a 10 seconds timeout
526/// client.call("request".to_owned(),
527/// Some(json!(["param1", "param2"])),
528/// Some(Duration::new(10, 0)))
529/// .and_then(|(_client, future_result)| future_result)
530/// .map(|response| {
531/// match response {
532/// None => println!("A timeout happened"),
533/// Some(Response { result, .. }) => println!("The answer is {:?}", result),
534/// }
535/// })
536/// });
537///
538/// core.run(request).unwrap();
539/// # }
540/// ```
541#[derive(Clone, Debug)]
542pub struct Endpoint<Connection, RpcServer> {
543 connection: Connection,
544 server: RpcServer,
545 parallel: usize,
546 logger: Logger,
547}
548
549impl<Connection, RpcServer> Endpoint<Connection, RpcServer>
550where
551 Connection: Stream<Item = Parsed, Error = IoError>,
552 Connection: Sink<SinkItem = Message, SinkError = IoError>,
553 Connection: Send + 'static,
554 RpcServer: Server + 'static,
555{
556 /// Create the endpoint builder.
557 ///
558 /// Pass it the connection to build the endpoint on and the server to use internally.
559 pub fn new(connection: Connection, server: RpcServer) -> Self {
560 Endpoint {
561 connection,
562 server,
563 parallel: 1,
564 logger: Logger::root(Discard, o!()),
565 }
566 }
567 /// Set how many RPCs may be process in parallel.
568 ///
569 /// As the RPC may be a future, it is possible to have multiple of them in the pipeline at
570 /// once. By default, no parallelism is allowed on one endpoint. Calling this sets how many
571 /// parallel futures there may be at one time on this particular endpoint.
572 ///
573 /// This influences the server half only (eg. protects it from being overloaded by a client)
574 /// and is par single client. The client doesn't limit the amount of sent requests in any way,
575 /// since it can be easily managed by the caller.
576 pub fn parallel(self, parallel: usize) -> Self {
577 Endpoint { parallel, ..self }
578 }
579 /// Sets the logger used by the endpoint.
580 ///
581 /// By default, nothing is logged anywhere. But if you specify a logger, it'll be used for
582 /// logging various messages. The logger is used verbatim ‒ if you want the endpoint to use a
583 /// child logger, inherit it on your side.
584 pub fn logger(self, logger: Logger) -> Self {
585 Endpoint { logger, ..self }
586 }
587 /// Start the endpoint.
588 ///
589 /// Once all configuration is set, this creates the actual endpoint pair ‒ both the server and
590 /// the client. It returns the client and a future that resolves once the server terminates.
591 /// The future yields if there was any error running the server. The server is started on the
592 /// provided handle. It can be manipulated by the [`ServerCtl`](struct.ServerCtl.html), which
593 /// is accessible through the returned [`Client`](struct.Client.html) and is passed to each
594 /// [`Server`](../server/trait.Server.html) callback.
595 // TODO: Description how this works.
596 // TODO: Some cleanup. This looks a *bit* hairy and complex.
597 // TODO: Should we return a better error/return the error once thing resolves?
598 pub fn start(self, handle: &Handle) -> (Client, Box<Future<Item = (), Error = IoError>>) {
599 debug!(self.logger, "Starting endpoint"; "parallel" => self.parallel);
600 let logger = self.logger;
601 let (terminator_sender, terminator_receiver) = one_channel();
602 let (killer_sender, killer_receiver) = one_channel();
603 let (sender, receiver) = channel(32);
604 let idmap = Rc::new(RefCell::new(HashMap::new()));
605 let rc_terminator = Rc::new(DropTerminator(Some(terminator_sender)));
606 let ctl = ServerCtl(Rc::new(RefCell::new(ServerCtlInternal {
607 stop: false,
608 terminator: Some(rc_terminator.clone()),
609 killer: Some(killer_sender),
610 idmap: idmap.clone(),
611 handle: handle.clone(),
612 sender: Some(sender.clone()),
613 logger: logger.clone(),
614 })));
615 let client = Client::new(
616 &idmap,
617 &ctl,
618 handle,
619 &rc_terminator,
620 &sender,
621 logger.clone(),
622 );
623 let (sink, stream) = self.connection.split();
624 // Create a future for each received item that'll return something. Run some of them in
625 // parallel.
626
627 // TODO: Have a concrete enum-type for the futures so we don't have to allocate and box it.
628
629 // A trick to terminate when the receiver fires. We mix a None into the stream from another
630 // stream and stop when we find it, as a marker.
631 let terminator = terminator_receiver
632 .map(|_| None)
633 .map_err(shouldnt_happen)
634 .into_stream();
635 // Move out of self, otherwise the closure captures self, not only server :-|
636 let server = self.server;
637 server.initialized(&ctl);
638 let idmap_cloned = idmap.clone();
639 let logger_cloned = logger.clone();
640 // A stream that contains no elements, but cleans the idmap once called (to kill the RPC
641 // futures)
642 let ctl_clone = ctl.clone();
643 let cleaner = unfold((), move |_| -> Option<Result<_, _>> {
644 let mut idmap = idmap_cloned.borrow_mut();
645 debug!(logger_cloned, "Dropping unanswered RPCs (EOS)"; "outstanding" => idmap.len());
646 idmap.clear();
647 // Terminate the server manually when we reach the end of input, because it holds the
648 // client alive ‒ this will end the messages from the client endpoint.
649 ctl_clone.terminate();
650 Some(Ok((None, ())))
651 });
652 let idmap_cloned = idmap.clone();
653 let logger_cloned = logger.clone();
654 let answers = stream
655 .map(Some)
656 .chain(cleaner)
657 .select(terminator)
658 .take_while(|m| Ok(m.is_some()))
659 .map(move |parsed| do_msg(&server, &ctl, &idmap, &logger_cloned, parsed.unwrap()))
660 .flatten()
661 .buffer_unordered(self.parallel)
662 .filter_map(|message| message);
663 let logger_cloned = logger.clone();
664 // Take both the client RPCs and the answers
665 let outbound = answers.select(receiver.map_err(shouldnt_happen));
666 let (error_sender, error_receiver) = one_channel::<Option<IoError>>();
667 // And send them all (or kill it, if it happens first)
668 let transmitted = sink.send_all(outbound)
669 .map(|_| ())
670 .select(killer_receiver.map_err(shouldnt_happen))
671 .then(move |result| {
672 // This will hopefully kill the RPC futures
673 // We kill on both ends, because we may kill the connection or the other side may.
674 let mut idmap = idmap_cloned.borrow_mut();
675 debug!(logger_cloned, "Dropping unanswered RPCs"; "outstanding" => idmap.len());
676 idmap.clear();
677 match result {
678 Ok(_) => {
679 debug!(logger_cloned, "Outbound stream ended successfully");
680 // Don't care about result (the other side simply doesn't care about the
681 // notification on error).
682 drop(error_sender.send(None));
683 Ok(())
684 },
685 Err((e, _select_next)) => {
686 debug!(logger_cloned, "Outbound stream ended with an error";
687 "error" => format!("{}", e));
688 // Don't care about result (the other side simply doesn't care about the
689 // notification on error).
690 drop(error_sender.send(Some(e)));
691 Err(())
692 },
693 }
694 });
695 // Once the last thing is sent, we're done
696 handle.spawn(transmitted);
697 let finished_errors = error_receiver
698 .map_err(shouldnt_happen)
699 .and_then(|maybe_error| match maybe_error {
700 None => Ok(()),
701 Some(e) => Err(e),
702 });
703 debug!(logger, "Started endpoint");
704 (client, Box::new(finished_errors))
705 }
706}
707
708impl<Connection> Endpoint<Connection, EmptyServer>
709where
710 Connection: Stream<Item = Parsed, Error = IoError>,
711 Connection: Sink<SinkItem = Message, SinkError = IoError>,
712 Connection: Send + 'static,
713{
714 /// Create an endpoint with [`Empty`](../server/struct.Empty.html).
715 ///
716 /// If you want to have client only, you can use this instead of `new`.
717 pub fn client_only(connection: Connection) -> Self {
718 Self::new(connection, EmptyServer)
719 }
720}