tokio_jsonrpc/endpoint.rs

// Copyright 2017 tokio-jsonrpc Developers
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

//! The endpoint of the JSON RPC connection.
//!
//! This module helps build the endpoints of the connection. The endpoints act as both client
//! and server at the same time. If you want a client-only endpoint, use
//! [`Empty`](../server/struct.Empty.html) as the server or the relevant
//! [`Endpoint`](struct.Endpoint.html) constructor. If you want a server-only endpoint,
//! simply don't call any RPCs or notifications and forget about the returned
//! [`Client`](struct.Client.html) structure.
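//!
//! A minimal sketch (not compiled here) of such a server-only endpoint ‒ `connection` stands for
//! any framed transport, `my_server` for any [`Server`](../server/trait.Server.html)
//! implementation and `handle` for the reactor handle the endpoint runs on:
//!
//! ```rust,ignore
//! let (_client, finished) = Endpoint::new(connection, my_server).start(&handle);
//! // Just let the server run; the client part is never used.
//! handle.spawn(finished.map_err(|_| ()));
//! ```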

use std::error::Error;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::io::{self, Error as IoError, ErrorKind};
use std::collections::HashMap;
use std::rc::Rc;
use std::time::Duration;
use std::cell::RefCell;

use futures::{Future, IntoFuture, Sink, Stream};
use futures::future::Either;
use futures::stream::{self, empty, unfold, Once};
use futures::unsync::mpsc::{channel, Sender};
use futures::unsync::oneshot::{channel as one_channel, Sender as OneSender};
#[cfg(test)]
use futures::unsync::oneshot::Receiver as OneReceiver;
use serde_json::{to_value, Value};
use slog::{Discard, Logger};
use tokio_core::reactor::{Handle, Timeout};

use message::{Broken, Message, Notification, Parsed, Request, Response, RpcError};
use server::{Empty as EmptyServer, Server};

/// A guard that terminates the connection once dropped.
///
/// A trick to terminate once all the `Rc` handles are forgotten.
struct DropTerminator(Option<OneSender<()>>);

impl Drop for DropTerminator {
    fn drop(&mut self) {
        // Don't care about the result. If the other side is gone, we just have nothing to do.
        let _ = self.0.take().unwrap().send(());
    }
}

type RcDrop = Rc<DropTerminator>;

/// An internal part of `ServerCtl`.
///
/// The `ServerCtl` is just a thin `Rc` wrapper around this.
struct ServerCtlInternal {
    // Stop processing requests
    stop: bool,
    // Terminate the nice way (if all the others drop too)
    terminator: Option<RcDrop>,
    // Terminate right now
    killer: Option<OneSender<()>>,
    // Info needed to be able to create new clients
    idmap: IDMap,
    handle: Handle,
    sender: Option<Sender<Message>>,
    logger: Logger,
}

/// An error indicating that the connection has already been terminated.
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct AlreadyTerminated;

impl Display for AlreadyTerminated {
    fn fmt(&self, f: &mut Formatter) -> FmtResult {
        write!(f, "Connection already terminated")
    }
}

impl Error for AlreadyTerminated {
    fn description(&self) -> &str {
        "Connection already terminated"
    }
}

/// A handle to control the server.
///
/// An instance is provided to each [`Server`](../server/trait.Server.html) callback and it can be
/// used to manipulate the server (currently only to terminate it) or to create a client for the
/// server's own use.
#[derive(Clone)]
pub struct ServerCtl(Rc<RefCell<ServerCtlInternal>>);

impl ServerCtl {
    /// Perform a cleanup when terminating in some way.
    ///
    /// Also perform some operation on the internals (access provided for convenience).
    fn cleanup<R, F: FnOnce(&mut ServerCtlInternal) -> R>(&self, f: F) -> R {
        let mut internal = self.0.borrow_mut();
        debug!(internal.logger, "Server cleanup");
        internal.stop = true;
        internal.sender.take();
        f(&mut internal)
    }
    /// Stop answering RPCs and calling notifications.
    ///
    /// Also terminate the connection if the client handle has been dropped and all ongoing RPC
    /// answers were received.
    pub fn terminate(&self) {
        self.cleanup(|internal| {
            // Drop the reference count for this one
            internal.terminator.take();
        });
    }
    /// Kill the connection.
    ///
    /// Like, right now. Without a goodbye.
    pub fn kill(&self) {
        self.cleanup(|internal| {
            // The option might be None, but only if we already called this before.
            internal.killer.take().map(|s| s.send(()));
        });
    }
    /// Create a new client for the current endpoint.
    ///
    /// This is a way in which the server may access the other endpoint (e.g. call RPCs or send
    /// notifications to the other side).
    ///
    /// If the server got terminated by some means (calling `kill`, `terminate`, dropping all
    /// endpoints or losing the connection), it returns `Err(AlreadyTerminated)`.
    ///
    /// Note that it's generally safe to `unwrap` the result, unless you clone the `ServerCtl` and
    /// store it somewhere. If you do clone and keep it, there's a chance of race conditions and
    /// you should check for the error.
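    ///
    /// A rough sketch (not compiled here) of how a [`Server`](../server/trait.Server.html)
    /// callback might use this; the "status" notification and the `handle` are only illustrative,
    /// the handle being one from the reactor the endpoint runs on:
    ///
    /// ```rust,ignore
    /// // Inside a server callback that received `ctl: &ServerCtl`
    /// let client = ctl.client().expect("The connection is still alive");
    /// let notified = client.notify("status".to_owned(), None)
    ///     .map(|_client| ())
    ///     .map_err(|_| ());
    /// handle.spawn(notified);
    /// ```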
    pub fn client(&self) -> Result<Client, AlreadyTerminated> {
        let internal = self.0.borrow();
        let terminator = internal.terminator.as_ref().ok_or(AlreadyTerminated)?;
        let sender = internal.sender.as_ref().ok_or(AlreadyTerminated)?;
        Ok(Client::new(
            &internal.idmap,
            self,
            &internal.handle,
            terminator,
            sender,
            internal.logger.clone(),
        ))
    }
    // This one is for unit tests, not part of the general-purpose API. It creates a dummy
    // ServerCtl that does nothing, but still can be passed to the Server for checking.
    //
    // It returns:
    // * The ServerCtl itself
    // * Drop future (fires when the corresponding server would get dropped)
    // * Kill future (fires when kill is signalled)
    #[doc(hidden)]
    #[cfg(test)]
    pub fn new_test() -> (Self, OneReceiver<()>, OneReceiver<()>) {
        let (drop_sender, drop_receiver) = one_channel();
        let (kill_sender, kill_receiver) = one_channel();
        let (msg_sender, _msg_receiver) = channel(1);
        let terminator = DropTerminator(Some(drop_sender));
        let core = ::tokio_core::reactor::Core::new().unwrap();
        let handle = core.handle();

        let ctl = ServerCtl(Rc::new(RefCell::new(ServerCtlInternal {
            stop: false,
            terminator: Some(Rc::new(terminator)),
            killer: Some(kill_sender),
            idmap: Default::default(),
            handle: handle,
            sender: Some(msg_sender),
            logger: Logger::root(Discard, o!()),
        })));
        (ctl, drop_receiver, kill_receiver)
    }
}

// Our own BoxFuture & friends that are *not* Send. We don't do Send.
type BoxFuture<T, E> = Box<Future<Item = T, Error = E>>;
type FutureMessage = BoxFuture<Option<Message>, IoError>;
type BoxStream<T, E> = Box<Stream<Item = T, Error = E>>;
// None in error means end the stream, please
type FutureMessageStream = BoxStream<FutureMessage, IoError>;

type IDMap = Rc<RefCell<HashMap<String, OneSender<Response>>>>;

// A futures::stream::once that takes only the success value, for convenience.
fn once<T, E>(item: T) -> Once<T, E> {
    stream::once(Ok(item))
}

fn shouldnt_happen<E>(_: E) -> IoError {
    IoError::new(ErrorKind::Other, "Shouldn't happen")
}

fn do_request<RpcServer: Server + 'static>(
    server: &RpcServer, ctl: &ServerCtl, request: Request, logger: &Logger
) -> FutureMessage {
    match server.rpc(ctl, &request.method, &request.params) {
        None => {
            trace!(logger, "Server refused RPC {}", request.method);
            let reply = request.error(RpcError::method_not_found(request.method.clone()));
            Box::new(Ok(Some(reply)).into_future())
        },
        Some(future) => {
            trace!(logger, "Server accepted RPC {}", request.method);
            let result = future.into_future().then(move |result| match result {
                Err(err) => Ok(Some(request.error(err))),
                Ok(result) => Ok(Some(
                    request.reply(to_value(result).expect("Bad result type")),
                )),
            });
            Box::new(result)
        },
    }
}

fn do_notification<RpcServer: Server>(
    server: &RpcServer, ctl: &ServerCtl, notification: &Notification, logger: &Logger
) -> FutureMessage {
    match server.notification(ctl, &notification.method, &notification.params) {
        None => {
            trace!(
                logger,
                "Server refused notification {}",
                notification.method
            );
            Box::new(Ok(None).into_future())
        },
        // We ignore both success and error, so we convert it into something for now
        Some(future) => {
            trace!(
                logger,
                "Server accepted notification {}",
                notification.method
            );
            Box::new(future.into_future().then(|_| Ok(None)))
        },
    }
}

// To process a batch using the same set of parallel executors as the whole server, we produce a
// stream of the computations which return nothing, but gather the results. Then we add yet another
// future at the end of that stream that takes the gathered results and wraps them into the real
// message ‒ the result of the whole batch.
fn do_batch<RpcServer: Server + 'static>(
    server: &RpcServer, ctl: &ServerCtl, idmap: &IDMap, logger: &Logger, msg: Vec<Message>
) -> FutureMessageStream {
    // Create a large enough channel. We may be unable to pick up the results until the final
    // future gets its turn, so a shorter one could lead to a deadlock.
    let (sender, receiver) = channel(msg.len());
    // Each message produces a single stream of futures. Create those streams right now, so we
    // don't have to keep the server alive long into the future.
    let small_streams: Vec<_> = msg.into_iter()
        .map(|sub| -> Result<_, IoError> {
            let sender = sender.clone();
            // This part is a bit convoluted. do_msg returns a stream of futures. We want to
            // take each of these futures (the outer and_then), run it to completion (the inner
            // and_then), send its result through the sender if it provided one and then
            // convert the result to None.
            //
            // Note that do_msg may return an arbitrary number of work futures, but only at most
            // one of them is supposed to provide a resulting value which would be sent through the
            // channel. Unfortunately, there's no way to know which one it'll be, so we have to
            // clone the sender all over the place.
            //
            // Also, it is a bit unfortunate that we need to allocate so many times here. We may
            // try doing something about that in the future, but without implementing custom future
            // and stream types, this seems the best we can do.
            let all_sent = do_msg(server, ctl, idmap, logger, Ok(sub)).and_then(
                move |future_message| -> Result<FutureMessage, _> {
                    let sender = sender.clone();
                    let msg_sent =
                        future_message.and_then(move |response: Option<Message>| match response {
                            None => Either::A(Ok(None).into_future()),
                            Some(msg) => {
                                Either::B(sender.send(msg).map_err(shouldnt_happen).map(|_| None))
                            },
                        });
                    Ok(Box::new(msg_sent))
                },
            );
            Ok(all_sent)
        })
        .collect();
    // We make it into a stream of streams and flatten it to get one big stream
    let subs_stream = stream::iter_result(small_streams).flatten();
    // Once all the results are produced, wrap them into a batch and return that one
    let collected = receiver.collect().map_err(shouldnt_happen).map(|results| {
        if results.is_empty() {
            // The spec says to send nothing at all if there are no results
            None
        } else {
            Some(Message::Batch(results))
        }
    });
    let streamed: Once<FutureMessage, _> = once(Box::new(collected));
    // Connect the wrapping single-item stream after the work stream
    Box::new(subs_stream.chain(streamed))
}

fn do_response(idmap: &IDMap, logger: &Logger, response: Response) -> FutureMessageStream {
    let maybe_sender = response
        .id
        .as_str()
        .and_then(|id| idmap.borrow_mut().remove(id));
    if let Some(sender) = maybe_sender {
        trace!(logger, "Received an RPC response"; "id" => format!("{:?}", response.id));
        // Don't care about the result, if the other side went away, it doesn't need the response
        // and that's OK with us.
        drop(sender.send(response));
    } else {
        error!(logger, "Unexpected RPC response"; "id" => format!("{:?}", response.id));
    }
    Box::new(empty())
}

// Handle a single message and turn it into an arbitrary number of futures that may be worked on
// in parallel, but only at most one of which returns a response message
fn do_msg<RpcServer: Server + 'static>(
    server: &RpcServer, ctl: &ServerCtl, idmap: &IDMap, logger: &Logger, msg: Parsed
) -> FutureMessageStream {
    let terminated = ctl.0.borrow().stop;
    trace!(logger, "Do a message"; "terminated" => terminated, "message" => format!("{:?}", msg));
    if terminated {
        if let Ok(Message::Response(response)) = msg {
            do_response(idmap, logger, response);
        }
        Box::new(empty())
    } else {
        match msg {
            Err(broken) => {
                let err: FutureMessage = Box::new(Ok(Some(broken.reply())).into_future());
                Box::new(once(err))
            },
            Ok(Message::Request(req)) => Box::new(once(do_request(server, ctl, req, logger))),
            Ok(Message::Notification(notif)) => {
                Box::new(once(do_notification(server, ctl, &notif, logger)))
            },
            Ok(Message::Batch(batch)) => do_batch(server, ctl, idmap, logger, batch),
            Ok(Message::UnmatchedSub(value)) => {
                do_msg(server, ctl, idmap, logger, Err(Broken::Unmatched(value)))
            },
            Ok(Message::Response(response)) => do_response(idmap, logger, response),
        }
    }
}

/// Internal part of the client.
///
/// Just for convenience, as we need to deconstruct and reconstruct the client repeatedly ‒ this
/// way there's only one extra item to move around.
#[derive(Clone)]
struct ClientData {
    /// Mapping from IDs to the oneshots to wake up the recipient futures.
    idmap: IDMap,
    /// The control of the server.
    ctl: ServerCtl,
    handle: Handle,
    /// Keep the connection alive as long as the client is alive.
    terminator: RcDrop,
    logger: Logger,
}

/// The client part of the endpoint.
///
/// This can be used to call RPCs and send notifications to the other end. There's no direct
/// constructor; it is created through the [`Endpoint`](struct.Endpoint.html).
#[derive(Clone)]
pub struct Client {
    sender: Sender<Message>,
    data: ClientData,
}

pub type Notified = BoxFuture<Client, IoError>;
pub type RpcFinished = BoxFuture<Option<Response>, IoError>;
pub type RpcSent = BoxFuture<(Client, RpcFinished), IoError>;

impl Client {
    /// A constructor (a private one).
    fn new(
        idmap: &IDMap, ctl: &ServerCtl, handle: &Handle, terminator: &RcDrop,
        sender: &Sender<Message>, logger: Logger,
    ) -> Self {
        debug!(logger, "Creating a new client");
        Client {
            sender: sender.clone(),
            data: ClientData {
                idmap: idmap.clone(),
                ctl: ctl.clone(),
                handle: handle.clone(),
                terminator: terminator.clone(),
                logger,
            },
        }
    }
    /// Call an RPC.
    ///
    /// Construct an RPC message and send it to the other end. It returns a future that resolves
    /// once the message is sent. It yields the `Client` back (it is unavailable while the message
    /// is being sent) and another future that resolves once the answer is received (or once a
    /// timeout happens, in which case the result is `None`).
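    ///
    /// A minimal sketch (not compiled here), assuming a `client` obtained from a started
    /// [`Endpoint`](struct.Endpoint.html) and the reactor `handle` it runs on; the "ping" method
    /// is only illustrative:
    ///
    /// ```rust,ignore
    /// let call = client.call("ping".to_owned(), None, None)
    ///     .and_then(|(_client, answer)| answer)
    ///     .map(|_response| println!("The ping got answered"))
    ///     .map_err(|_| ());
    /// handle.spawn(call);
    /// ```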
    pub fn call(self, method: String, params: Option<Value>, timeout: Option<Duration>) -> RpcSent {
        // We have to deconstruct self now, because the sender's send takes ownership of it for a
        // while. We construct it back once the message is passed on.
        let data = self.data;
        trace!(data.logger, "Calling RPC {}", method);
        let msg = Message::request(method, params);
        let id = match msg {
            Message::Request(Request {
                id: Value::String(ref id),
                ..
            }) => id.clone(),
            _ => unreachable!("We produce only string IDs"),
        };
        let (sender, receiver) = one_channel();
        let rc_terminator = data.terminator.clone();
        let logger_cloned = data.logger.clone();
        let received = receiver
            .map_err(|_| IoError::new(io::ErrorKind::Other, "Lost connection"))
            .map(Some)
            .then(move |r| {
                trace!(logger_cloned, "Received RPC answer");
                drop(rc_terminator);
                r
            });
        let completed: RpcFinished = match timeout {
            Some(time) => {
                // If we were provided with a timeout, select what happens first.
                let timeout = match Timeout::new(time, &data.handle) {
                    Err(e) => return Box::new(Err(e).into_future()),
                    Ok(t) => t,
                };
                let idmap = data.idmap.clone();
                let id = id.clone();
                let logger_cloned = data.logger.clone();
                let completed = timeout
                    .then(move |r| {
                        trace!(logger_cloned, "RPC timed out");
                        r
                    })
                    .map(|_| None)
                    .select(received)
                    .map(|(r, _)| r)
                    .map_err(|(e, _)| e)
                    // Make sure the ID/sender is removed even when the timeout wins.
                    // This is a NOOP in case the real result arrives, since it is already deleted
                    // by then, but that doesn't matter and this is simpler.
                    .then(move |r| {
                        idmap.borrow_mut().remove(&id);
                        r
                    });
                Box::new(completed)
            },
            // If we don't have the timeout, simply pass the future to get the response through.
            None => Box::new(received),
        };
        data.idmap.borrow_mut().insert(id, sender);
        // Ensure the connection is kept alive until the answer comes
        let sent = self.sender
            .send(msg)
            .map_err(shouldnt_happen)
            .map(move |sender| {
                let client = Client { sender, data };
                (client, completed)
            });
        Box::new(sent)
    }
    /// Send a notification.
    ///
    /// It creates a notification message and sends it. It returns a future that resolves once the
    /// message is sent and yields the client back for further use.
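    ///
    /// A minimal sketch (not compiled here), assuming a `client` from a started
    /// [`Endpoint`](struct.Endpoint.html) and the reactor `handle` it runs on; the "heartbeat"
    /// notification and its parameters are only illustrative:
    ///
    /// ```rust,ignore
    /// let notified = client.notify("heartbeat".to_owned(), Some(json!([42])))
    ///     .map(|_client| ())
    ///     .map_err(|_| ());
    /// handle.spawn(notified);
    /// ```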
    pub fn notify(self, method: String, params: Option<Value>) -> Notified {
        let data = self.data;
        trace!(data.logger, "Sending notification {}", method);
        let future = self.sender
            .send(Message::notification(method, params))
            .map_err(shouldnt_happen)
            .map(move |sender| Client { sender, data });
        Box::new(future)
    }
    /// Get the server control.
    ///
    /// That allows terminating the server, etc.
    pub fn server_ctl(&self) -> &ServerCtl {
        &self.data.ctl
    }
}

/// The builder structure for the endpoint.
///
/// This is used to create the endpoint ‒ both the server and client part at once.
///
/// # Examples
///
/// This will create a connection, build a client-only endpoint on it (e.g. the server is a dummy)
/// and send an RPC to the other side, printing the result once it comes.
///
/// ```rust,no_run
/// # extern crate tokio_core;
/// # extern crate tokio_io;
/// # extern crate tokio_jsonrpc;
/// # extern crate futures;
/// # #[macro_use]
/// # extern crate serde_json;
/// #
/// # use std::time::Duration;
/// # use tokio_core::reactor::Core;
/// # use tokio_core::net::TcpStream;
/// # use tokio_io::AsyncRead;
/// # use tokio_jsonrpc::{LineCodec, Server, ServerCtl, RpcError, Endpoint};
/// # use tokio_jsonrpc::message::Response;
/// # use futures::{Future, Stream};
/// # use serde_json::Value;
/// #
/// # fn main() {
/// let mut core = Core::new().unwrap();
/// let handle = core.handle();
///
/// let request = TcpStream::connect(&"127.0.0.1:2346".parse().unwrap(), &handle)
///     .and_then(move |stream| {
///         // Create a client on top of the connection
///         let (client, _finished) = Endpoint::client_only(stream.framed(LineCodec::new()))
///             .start(&handle);
///         // Call a method with some parameters and a 10 seconds timeout
///         client.call("request".to_owned(),
///                     Some(json!(["param1", "param2"])),
///                     Some(Duration::new(10, 0)))
///             .and_then(|(_client, future_result)| future_result)
///             .map(|response| {
///                 match response {
///                     None => println!("A timeout happened"),
///                     Some(Response { result, .. }) => println!("The answer is {:?}", result),
///                 }
///             })
///     });
///
/// core.run(request).unwrap();
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct Endpoint<Connection, RpcServer> {
    connection: Connection,
    server: RpcServer,
    parallel: usize,
    logger: Logger,
}

impl<Connection, RpcServer> Endpoint<Connection, RpcServer>
where
    Connection: Stream<Item = Parsed, Error = IoError>,
    Connection: Sink<SinkItem = Message, SinkError = IoError>,
    Connection: Send + 'static,
    RpcServer: Server + 'static,
{
    /// Create the endpoint builder.
    ///
    /// Pass it the connection to build the endpoint on and the server to use internally.
    pub fn new(connection: Connection, server: RpcServer) -> Self {
        Endpoint {
            connection,
            server,
            parallel: 1,
            logger: Logger::root(Discard, o!()),
        }
    }
    /// Set how many RPCs may be processed in parallel.
    ///
    /// As the RPC may be a future, it is possible to have multiple of them in the pipeline at
    /// once. By default, no parallelism is allowed on one endpoint. Calling this sets how many
    /// parallel futures there may be at one time on this particular endpoint.
    ///
    /// This influences the server half only (e.g. protects it from being overloaded by a client)
    /// and is per single client. The client doesn't limit the number of sent requests in any way,
    /// since that can be easily managed by the caller.
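    ///
    /// A short sketch (not compiled here) allowing up to 8 RPCs in flight at once; `connection`
    /// stands for any framed transport, `server` for any
    /// [`Server`](../server/trait.Server.html) implementation and `handle` for a reactor handle:
    ///
    /// ```rust,ignore
    /// let (client, finished) = Endpoint::new(connection, server)
    ///     .parallel(8)
    ///     .start(&handle);
    /// ```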
    pub fn parallel(self, parallel: usize) -> Self {
        Endpoint { parallel, ..self }
    }
    /// Sets the logger used by the endpoint.
    ///
    /// By default, nothing is logged anywhere. But if you specify a logger, it'll be used for
    /// logging various messages. The logger is used verbatim ‒ if you want the endpoint to use a
    /// child logger, inherit it on your side.
    pub fn logger(self, logger: Logger) -> Self {
        Endpoint { logger, ..self }
    }
    /// Start the endpoint.
    ///
    /// Once all configuration is set, this creates the actual endpoint pair ‒ both the server and
    /// the client. It returns the client and a future that resolves once the server terminates.
    /// The future yields an error if anything went wrong while running the server. The server is
    /// started on the provided handle. It can be manipulated by the
    /// [`ServerCtl`](struct.ServerCtl.html), which is accessible through the returned
    /// [`Client`](struct.Client.html) and is passed to each
    /// [`Server`](../server/trait.Server.html) callback.
    // TODO: Describe how this works.
    // TODO: Some cleanup. This looks a *bit* hairy and complex.
    // TODO: Should we return a better error/return the error once the thing resolves?
    pub fn start(self, handle: &Handle) -> (Client, Box<Future<Item = (), Error = IoError>>) {
        debug!(self.logger, "Starting endpoint"; "parallel" => self.parallel);
        let logger = self.logger;
        let (terminator_sender, terminator_receiver) = one_channel();
        let (killer_sender, killer_receiver) = one_channel();
        let (sender, receiver) = channel(32);
        let idmap = Rc::new(RefCell::new(HashMap::new()));
        let rc_terminator = Rc::new(DropTerminator(Some(terminator_sender)));
        let ctl = ServerCtl(Rc::new(RefCell::new(ServerCtlInternal {
            stop: false,
            terminator: Some(rc_terminator.clone()),
            killer: Some(killer_sender),
            idmap: idmap.clone(),
            handle: handle.clone(),
            sender: Some(sender.clone()),
            logger: logger.clone(),
        })));
        let client = Client::new(
            &idmap,
            &ctl,
            handle,
            &rc_terminator,
            &sender,
            logger.clone(),
        );
        let (sink, stream) = self.connection.split();
        // Create a future for each received item that'll return something. Run some of them in
        // parallel.

        // TODO: Have a concrete enum-type for the futures so we don't have to allocate and box it.

        // A trick to terminate when the receiver fires. We mix a None into the stream from another
        // stream and stop when we find it, as a marker.
        let terminator = terminator_receiver
            .map(|_| None)
            .map_err(shouldnt_happen)
            .into_stream();
        // Move out of self, otherwise the closure captures self, not only server :-|
        let server = self.server;
        server.initialized(&ctl);
        let idmap_cloned = idmap.clone();
        let logger_cloned = logger.clone();
        // A stream that contains no elements, but cleans the idmap once called (to kill the RPC
        // futures)
        let ctl_clone = ctl.clone();
        let cleaner = unfold((), move |_| -> Option<Result<_, _>> {
            let mut idmap = idmap_cloned.borrow_mut();
            debug!(logger_cloned, "Dropping unanswered RPCs (EOS)"; "outstanding" => idmap.len());
            idmap.clear();
            // Terminate the server manually when we reach the end of input, because it holds the
            // client alive ‒ this will end the messages from the client endpoint.
            ctl_clone.terminate();
            Some(Ok((None, ())))
        });
        let idmap_cloned = idmap.clone();
        let logger_cloned = logger.clone();
        let answers = stream
            .map(Some)
            .chain(cleaner)
            .select(terminator)
            .take_while(|m| Ok(m.is_some()))
            .map(move |parsed| do_msg(&server, &ctl, &idmap, &logger_cloned, parsed.unwrap()))
            .flatten()
            .buffer_unordered(self.parallel)
            .filter_map(|message| message);
        let logger_cloned = logger.clone();
        // Take both the client RPCs and the answers
        let outbound = answers.select(receiver.map_err(shouldnt_happen));
        let (error_sender, error_receiver) = one_channel::<Option<IoError>>();
        // And send them all (or kill it, if it happens first)
        let transmitted = sink.send_all(outbound)
            .map(|_| ())
            .select(killer_receiver.map_err(shouldnt_happen))
            .then(move |result| {
                // This will hopefully kill the RPC futures
                // We kill on both ends, because we may kill the connection or the other side may.
                let mut idmap = idmap_cloned.borrow_mut();
                debug!(logger_cloned, "Dropping unanswered RPCs"; "outstanding" => idmap.len());
                idmap.clear();
                match result {
                    Ok(_) => {
                        debug!(logger_cloned, "Outbound stream ended successfully");
                        // Don't care about result (the other side simply doesn't care about the
                        // notification on error).
                        drop(error_sender.send(None));
                        Ok(())
                    },
                    Err((e, _select_next)) => {
                        debug!(logger_cloned, "Outbound stream ended with an error";
                               "error" => format!("{}", e));
                        // Don't care about result (the other side simply doesn't care about the
                        // notification on error).
                        drop(error_sender.send(Some(e)));
                        Err(())
                    },
                }
            });
        // Once the last thing is sent, we're done
        handle.spawn(transmitted);
        let finished_errors = error_receiver
            .map_err(shouldnt_happen)
            .and_then(|maybe_error| match maybe_error {
                None => Ok(()),
                Some(e) => Err(e),
            });
        debug!(logger, "Started endpoint");
        (client, Box::new(finished_errors))
    }
}

impl<Connection> Endpoint<Connection, EmptyServer>
where
    Connection: Stream<Item = Parsed, Error = IoError>,
    Connection: Sink<SinkItem = Message, SinkError = IoError>,
    Connection: Send + 'static,
{
    /// Create an endpoint with an [`Empty`](../server/struct.Empty.html) server.
    ///
    /// If you want a client-only endpoint, you can use this instead of `new`.
    pub fn client_only(connection: Connection) -> Self {
        Self::new(connection, EmptyServer)
    }
}