// msgpack_rpc/endpoint.rs

use std::collections::HashMap;
use std::io;

use std::marker::Unpin;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::channel::{mpsc, oneshot};
use futures::io::{AsyncRead, AsyncWrite};
use futures::{ready, Future, FutureExt, Sink, Stream, TryFutureExt};
use rmpv::Value;
use tokio_util::codec::{Decoder, Framed};
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

use crate::codec::Codec;
use crate::message::Response as MsgPackResponse;
use crate::message::{Message, Notification, Request};

/// The `Service` trait defines how a `MessagePack-RPC` server handles requests and notifications.
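///
/// A minimal sketch of an implementation (the `Echo` type here is hypothetical, not part of this
/// crate) whose requests complete immediately:
///
/// ```
/// use futures::future::{self, Ready};
/// use rmp_rpc::{Service, Value};
///
/// struct Echo;
///
/// impl Service for Echo {
///     type RequestFuture = Ready<Result<Value, Value>>;
///
///     fn handle_request(&mut self, method: &str, params: &[Value]) -> Self::RequestFuture {
///         match method {
///             // Echo the parameters straight back to the caller.
///             "echo" => future::ready(Ok(Value::Array(params.to_vec()))),
///             _ => future::ready(Err(Value::from("unknown method"))),
///         }
///     }
///
///     fn handle_notification(&mut self, _method: &str, _params: &[Value]) {}
/// }
/// ```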
pub trait Service: Send {
    /// The type of future returned by `handle_request`. This future will be spawned on the event
    /// loop, and when it is complete then the result will be sent back to the client that made the
    /// request.
    ///
    /// Note that if your `handle_request` method involves only a simple and quick computation,
    /// then you can set `RequestFuture` to `futures::future::Ready<Result<Value, Value>>` and
    /// return `future::ready(...)`, which is a future that completes immediately. You only need a
    /// "real" future if there's some longer computation or I/O that needs to be deferred.
    type RequestFuture: Future<Output = Result<Value, Value>> + 'static + Send;

    /// Handle a `MessagePack-RPC` request.
    ///
    /// The name of the request is `method`, and the parameters are given in `params`.
    ///
    /// Note that this method is called synchronously within the main event loop, and so it should
    /// return quickly. If you need to run a longer computation, put it in a future and return it.
    fn handle_request(&mut self, method: &str, params: &[Value]) -> Self::RequestFuture;

    /// Handle a `MessagePack-RPC` notification.
    ///
    /// Note that this method is called synchronously within the main event loop, and so it should
    /// return quickly. If you need to run a longer computation, put it in a future and spawn it on
    /// the event loop.
    fn handle_notification(&mut self, method: &str, params: &[Value]);
}

/// This is a beefed-up version of [`Service`](Service), in which the various handler
/// methods also get access to a [`Client`](Client), which allows them to send requests and
/// notifications to the same msgpack-rpc client that made the original request.
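///
/// A minimal sketch of an implementation (the `Relay` type is hypothetical) that echoes every
/// notification back to the peer that sent it:
///
/// ```
/// use futures::future::{self, Ready};
/// use rmp_rpc::{Client, ServiceWithClient, Value};
///
/// struct Relay;
///
/// impl ServiceWithClient for Relay {
///     type RequestFuture = Ready<Result<Value, Value>>;
///
///     fn handle_request(
///         &mut self,
///         _client: &mut Client,
///         method: &str,
///         _params: &[Value],
///     ) -> Self::RequestFuture {
///         future::ready(Err(Value::from(format!("unknown method {}", method))))
///     }
///
///     fn handle_notification(&mut self, client: &mut Client, method: &str, params: &[Value]) {
///         // Send the notification back to the peer; dropping the returned `Ack` just means we
///         // don't wait for it to be written out.
///         let _ = client.notify(method, params);
///     }
/// }
/// ```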
pub trait ServiceWithClient {
    /// The type of future returned by [`handle_request`](ServiceWithClient::handle_request).
    type RequestFuture: Future<Output = Result<Value, Value>> + 'static + Send;

    /// Handle a `MessagePack-RPC` request.
    ///
    /// This differs from [`Service::handle_request`](Service::handle_request) in that you also get
    /// access to a [`Client`](Client) for sending requests and notifications.
    fn handle_request(
        &mut self,
        client: &mut Client,
        method: &str,
        params: &[Value],
    ) -> Self::RequestFuture;

    /// Handle a `MessagePack-RPC` notification.
    ///
    /// This differs from [`Service::handle_notification`](Service::handle_notification) in that
    /// you also get access to a [`Client`](Client) for sending requests and notifications.
    fn handle_notification(&mut self, client: &mut Client, method: &str, params: &[Value]);
}

// Given a service that doesn't require access to a client, we can also treat it as a service that
// does require access to a client.
impl<S: Service> ServiceWithClient for S {
    type RequestFuture = <S as Service>::RequestFuture;

    fn handle_request(
        &mut self,
        _client: &mut Client,
        method: &str,
        params: &[Value],
    ) -> Self::RequestFuture {
        self.handle_request(method, params)
    }

    fn handle_notification(&mut self, _client: &mut Client, method: &str, params: &[Value]) {
        self.handle_notification(method, params);
    }
}

struct Server<S> {
    service: S,
    // This will receive responses from the service (or possibly from whatever worker tasks the
    // service spawned). The u32 is the id of the request that the response answers.
    pending_responses: mpsc::UnboundedReceiver<(u32, Result<Value, Value>)>,
    // We hand out a clone of this whenever we call `service.handle_request`.
    response_sender: mpsc::UnboundedSender<(u32, Result<Value, Value>)>,
    // TODO: We partially add backpressure by ensuring that the pending responses get sent out
    // before we accept new requests. However, it could be that there are lots of response
    // computations out there that haven't sent pending responses yet; we don't yet have a way to
    // apply backpressure there.
}

impl<S: ServiceWithClient> Server<S> {
    fn new(service: S) -> Self {
        let (send, recv) = mpsc::unbounded();

        Server {
            service,
            pending_responses: recv,
            response_sender: send,
        }
    }

    // Pushes all responses (that are ready) onto the sink, to send back to the client.
    //
    // Returns `Poll::Ready` if all of the pending responses were successfully sent on their way.
    // (This does not necessarily mean that they were received yet.)
    fn send_responses<T: AsyncRead + AsyncWrite>(
        &mut self,
        cx: &mut Context,
        mut sink: Pin<&mut Transport<T>>,
    ) -> Poll<io::Result<()>> {
        trace!("Server: flushing responses");
        loop {
            ready!(sink.as_mut().poll_ready(cx)?);
            match Pin::new(&mut self.pending_responses).poll_next(cx) {
                Poll::Ready(Some((id, result))) => {
                    let msg = Message::Response(MsgPackResponse { id, result });
                    sink.as_mut().start_send(msg).unwrap();
                }
                Poll::Ready(None) => panic!("we store the sender, it can't be dropped"),
                Poll::Pending => return sink.as_mut().poll_flush(cx),
            }
        }
    }

    fn spawn_request_worker<F: Future<Output = Result<Value, Value>> + 'static + Send>(
        &self,
        id: u32,
        f: F,
    ) {
        trace!("spawning a new task");

        // XXX Is this even a worthwhile optimization to reintroduce? Does it work correctly with
        //     the no-op waker?
        /*
        // The simplest implementation of this function would just spawn a future immediately, but
        // as an optimization let's check if the future is immediately ready and avoid spawning in
        // that case.
        match f.poll(&mut Context::from_waker(futures::task::noop_waker_ref())) {
            Ok(Async::Ready(result)) => {
                trace!("the task is already done, no need to spawn it on the event loop");
                // An error in unbounded_send means that the receiver has been dropped, which
                // means that the Server has stopped running. There is no meaningful way to
                // signal an error from here (but the client should see an error anyway,
                // because its stream will end before it gets a response).
                let _ = self.response_sender.unbounded_send((id, Ok(result)));
            }
            Err(e) => {
                trace!("the task failed, no need to spawn it on the event loop");
                let _ = self.response_sender.unbounded_send((id, Err(e)));
            }
            Ok(Async::NotReady) => {
                trace!("spawning the task on the event loop");
                // Ok, we can't avoid it: spawn a future on the event loop.
                let send = self.response_sender.clone();
                tokio::spawn(
                    f.map(move |result| send.unbounded_send((id, result))),
                );
            }
        }
        */

        trace!("spawning the task on the event loop");
        let send = self.response_sender.clone();
        tokio::spawn(f.map(move |result| send.unbounded_send((id, result))));
    }
}

// We need to write three different endpoints: client, server, and client+server. This trait helps
// us avoid code duplication by defining the two main roles of an endpoint.
trait MessageHandler {
    // We just received `msg` on our input stream. Handle it.
    fn handle_incoming(&mut self, msg: Message);

    // Try to push out all of the outgoing messages (e.g. responses in the case of a server,
    // notifications+requests in the case of a client) onto the sink. Return `Poll::Ready(Ok(()))`
    // if we managed to push them all out and flush the sink.
    fn send_outgoing<T: AsyncRead + AsyncWrite>(
        &mut self,
        cx: &mut Context,
        sink: Pin<&mut Transport<T>>,
    ) -> Poll<io::Result<()>>;

    // Is the endpoint finished? This is only relevant for clients, since servers and
    // client+servers will never voluntarily stop.
    fn is_finished(&self) -> bool {
        false
    }
}

type ResponseTx = oneshot::Sender<Result<Value, Value>>;
/// Future response to a request. It resolves once the response is available.
///
/// Note that there are two different kinds of "errors" that can be signalled. If the connection
/// was closed before a response was received, the future resolves to `Err(Value::Nil)`. If the
/// server encountered an error while handling the request and sent back an error message, the
/// future resolves to `Err(value)` containing that message.
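///
/// A minimal sketch of awaiting a `Response` (assuming an already-connected `client`):
///
/// ```
/// # use rmp_rpc::Client;
/// # async fn demo(client: Client) {
/// match client.request("status", &[]).await {
///     Ok(value) => println!("server answered: {:?}", value),
///     Err(err) => println!("server signalled an error: {:?}", err),
/// }
/// # }
/// ```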
pub struct Response(oneshot::Receiver<Result<Value, Value>>);

type AckTx = oneshot::Sender<()>;

/// A future that resolves once a notification has actually been sent to the server. It does not
/// guarantee that the server received it, just that it has been sent.
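///
/// A minimal sketch of awaiting an `Ack` (assuming an already-connected `client`):
///
/// ```
/// # use rmp_rpc::Client;
/// # async fn demo(client: Client) {
/// // Resolves to Ok(()) once the notification has been written to the transport.
/// let _ = client.notify("log", &["hello".into()]).await;
/// # }
/// ```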
pub struct Ack(oneshot::Receiver<()>);

// TODO: perhaps make these bounded (for better backpressure)
type RequestTx = mpsc::UnboundedSender<(Request, ResponseTx)>;
type RequestRx = mpsc::UnboundedReceiver<(Request, ResponseTx)>;

type NotificationTx = mpsc::UnboundedSender<(Notification, AckTx)>;
type NotificationRx = mpsc::UnboundedReceiver<(Notification, AckTx)>;

impl Future for Response {
    type Output = Result<Value, Value>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        trace!("Response: polling");
        Poll::Ready(match ready!(Pin::new(&mut self.0).poll(cx)) {
            Ok(Ok(v)) => Ok(v),
            Ok(Err(v)) => Err(v),
            Err(_) => Err(Value::Nil),
        })
    }
}

impl Future for Ack {
    type Output = Result<(), ()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        trace!("Ack: polling");
        Pin::new(&mut self.0).poll(cx).map_err(|_| ())
    }
}

struct InnerClient {
    client_closed: bool,
    request_id: u32,
    requests_rx: RequestRx,
    notifications_rx: NotificationRx,
    pending_requests: HashMap<u32, ResponseTx>,
    pending_notifications: Vec<AckTx>,
}

impl InnerClient {
    fn new() -> (Self, Client) {
        let (requests_tx, requests_rx) = mpsc::unbounded();
        let (notifications_tx, notifications_rx) = mpsc::unbounded();

        let client_proxy = Client::from_channels(requests_tx, notifications_tx);

        let client = InnerClient {
            client_closed: false,
            request_id: 0,
            requests_rx,
            notifications_rx,
            pending_requests: HashMap::new(),
            pending_notifications: Vec::new(),
        };

        (client, client_proxy)
    }

    fn process_notifications<T: AsyncRead + AsyncWrite>(
        &mut self,
        cx: &mut Context,
        mut stream: Pin<&mut Transport<T>>,
    ) -> io::Result<()> {
        // Don't try to process notifications after the notifications channel was closed, because
        // trying to read from it might cause panics.
        if self.client_closed {
            return Ok(());
        }

        trace!("Polling client notifications channel");

        while let Poll::Ready(()) = stream.as_mut().poll_ready(cx)? {
            match Pin::new(&mut self.notifications_rx).poll_next(cx) {
                Poll::Ready(Some((notification, ack_sender))) => {
                    trace!("Got notification from client.");
                    stream
                        .as_mut()
                        .start_send(Message::Notification(notification))?;
                    self.pending_notifications.push(ack_sender);
                }
                Poll::Ready(None) => {
                    trace!("Client closed the notifications channel.");
                    self.client_closed = true;
                    break;
                }
                Poll::Pending => {
                    trace!("No new notification from client");
                    break;
                }
            }
        }
        Ok(())
    }

    fn send_messages<T: AsyncRead + AsyncWrite>(
        &mut self,
        cx: &mut Context,
        mut stream: Pin<&mut Transport<T>>,
    ) -> Poll<io::Result<()>> {
        self.process_requests(cx, stream.as_mut())?;
        self.process_notifications(cx, stream.as_mut())?;

        match stream.poll_flush(cx)? {
            Poll::Ready(()) => {
                self.acknowledge_notifications();
                Poll::Ready(Ok(()))
            }
            Poll::Pending => Poll::Pending,
        }
    }

    fn process_requests<T: AsyncRead + AsyncWrite>(
        &mut self,
        cx: &mut Context,
        mut stream: Pin<&mut Transport<T>>,
    ) -> io::Result<()> {
        // Don't try to process requests after the requests channel was closed, because
        // trying to read from it might cause panics.
        if self.client_closed {
            return Ok(());
        }
        trace!("Polling client requests channel");
        while let Poll::Ready(()) = stream.as_mut().poll_ready(cx)? {
            match Pin::new(&mut self.requests_rx).poll_next(cx) {
                Poll::Ready(Some((mut request, response_sender))) => {
                    self.request_id += 1;
                    trace!("Got request from client: {:?}", request);
                    request.id = self.request_id;
                    stream.as_mut().start_send(Message::Request(request))?;
                    self.pending_requests
                        .insert(self.request_id, response_sender);
                }
                Poll::Ready(None) => {
                    trace!("Client closed the requests channel.");
                    self.client_closed = true;
                    break;
                }
                Poll::Pending => {
                    trace!("No new request from client");
                    break;
                }
            }
        }
        Ok(())
    }

    fn process_response(&mut self, response: MsgPackResponse) {
        if let Some(response_tx) = self.pending_requests.remove(&response.id) {
            trace!("Forwarding response to the client.");
            if let Err(e) = response_tx.send(response.result) {
                warn!("Failed to send response to client: {:?}", e);
            }
        } else {
            warn!("no pending request found for response {}", &response.id);
        }
    }

    fn acknowledge_notifications(&mut self) {
        for chan in self.pending_notifications.drain(..) {
            trace!("Acknowledging notification.");
            if let Err(e) = chan.send(()) {
                warn!("Failed to send ack to client: {:?}", e);
            }
        }
    }
}

struct Transport<T>(Framed<Compat<T>, Codec>);

impl<T> Transport<T>
where
    T: AsyncRead + AsyncWrite,
{
    fn inner(self: Pin<&mut Self>) -> Pin<&mut Framed<Compat<T>, Codec>> {
        unsafe { self.map_unchecked_mut(|this| &mut this.0) }
    }
}

impl<T> Stream for Transport<T>
where
    T: AsyncRead + AsyncWrite,
{
    type Item = io::Result<Message>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        trace!("Transport: polling");
        self.inner().poll_next(cx)
    }
}

impl<T> Sink<Message> for Transport<T>
where
    T: AsyncRead + AsyncWrite,
{
    type Error = io::Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        self.inner().poll_ready(cx)
    }

    fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
        self.inner().start_send(item)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        self.inner().poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        self.inner().poll_close(cx)
    }
}

impl<S: Service> MessageHandler for Server<S> {
    fn handle_incoming(&mut self, msg: Message) {
        match msg {
            Message::Request(req) => {
                let f = self.service.handle_request(&req.method, &req.params);
                self.spawn_request_worker(req.id, f);
            }
            Message::Notification(note) => {
                self.service.handle_notification(&note.method, &note.params);
            }
            Message::Response(_) => {
                trace!("This endpoint doesn't handle responses, ignoring the msg.");
            }
        };
    }

    fn send_outgoing<T: AsyncRead + AsyncWrite>(
        &mut self,
        cx: &mut Context,
        sink: Pin<&mut Transport<T>>,
    ) -> Poll<io::Result<()>> {
        self.send_responses(cx, sink)
    }
}

impl MessageHandler for InnerClient {
    fn handle_incoming(&mut self, msg: Message) {
        trace!("Received {:?}", msg);
        if let Message::Response(response) = msg {
            self.process_response(response);
        } else {
            trace!("This endpoint only handles responses, ignoring the msg.");
        }
    }

    fn send_outgoing<T: AsyncRead + AsyncWrite>(
        &mut self,
        cx: &mut Context,
        sink: Pin<&mut Transport<T>>,
    ) -> Poll<io::Result<()>> {
        self.send_messages(cx, sink)
    }

    fn is_finished(&self) -> bool {
        self.client_closed
            && self.pending_requests.is_empty()
            && self.pending_notifications.is_empty()
    }
}

struct ClientAndServer<S> {
    inner_client: InnerClient,
    server: Server<S>,
    client: Client,
}

impl<S: ServiceWithClient> MessageHandler for ClientAndServer<S> {
    fn handle_incoming(&mut self, msg: Message) {
        match msg {
            Message::Request(req) => {
                let f =
                    self.server
                        .service
                        .handle_request(&mut self.client, &req.method, &req.params);
                self.server.spawn_request_worker(req.id, f);
            }
            Message::Notification(note) => {
                self.server.service.handle_notification(
                    &mut self.client,
                    &note.method,
                    &note.params,
                );
            }
            Message::Response(response) => self.inner_client.process_response(response),
        };
    }

    fn send_outgoing<T: AsyncRead + AsyncWrite>(
        &mut self,
        cx: &mut Context,
        mut sink: Pin<&mut Transport<T>>,
    ) -> Poll<io::Result<()>> {
        if let Poll::Ready(()) = self.server.send_responses(cx, sink.as_mut())? {
            self.inner_client.send_messages(cx, sink)
        } else {
            Poll::Pending
        }
    }
}

struct InnerEndpoint<MH, T> {
    handler: MH,
    stream: Transport<T>,
}

impl<MH: MessageHandler + Unpin, T: AsyncRead + AsyncWrite> Future for InnerEndpoint<MH, T> {
    type Output = io::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        trace!("InnerEndpoint: polling");
        // Try to flush out all the responses that are queued up. If this doesn't succeed yet, our
        // output sink is full. In that case, we'll apply some backpressure to our input stream by
        // not reading from it.

        // XXX It's sound to hand out a plain `&mut MH` for the handler here, since `MH: Unpin`.
        let (handler, mut stream) = unsafe {
            let this = self.get_unchecked_mut();
            (&mut this.handler, Pin::new_unchecked(&mut this.stream))
        };
        if let Poll::Pending = handler.send_outgoing(cx, stream.as_mut())? {
            trace!("Sink not yet flushed, waiting...");
            return Poll::Pending;
        }

        trace!("Polling stream.");
        while let Poll::Ready(msg) = stream.as_mut().poll_next(cx)? {
            if let Some(msg) = msg {
                handler.handle_incoming(msg);
            } else {
                trace!("Stream closed by remote peer.");
                // FIXME: not sure if we should still continue sending responses here. Is it
                // possible that the client closed the stream only one way and is still waiting
                // for responses? Not for TCP at least, but maybe for other transport types?
                return Poll::Ready(Ok(()));
            }
        }

        if handler.is_finished() {
            trace!("inner client finished, exiting...");
            Poll::Ready(Ok(()))
        } else {
            trace!("notifying the reactor that we're not done yet");
            Poll::Pending
        }
    }
}

/// Creates a future for running a `Service` on a stream.
///
/// The returned future will run until the stream is closed; if the stream encounters an error,
/// then the future will propagate it and terminate.
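///
/// A minimal sketch of serving over TCP (the `Echo` service type is hypothetical, and `compat()`
/// bridges tokio's I/O types to the `futures` traits expected here):
///
/// ```no_run
/// use rmp_rpc::{serve, Service, Value};
/// use tokio::net::TcpListener;
/// use tokio_util::compat::TokioAsyncReadCompatExt;
///
/// # struct Echo;
/// # impl Service for Echo {
/// #     type RequestFuture = futures::future::Ready<Result<Value, Value>>;
/// #     fn handle_request(&mut self, _: &str, params: &[Value]) -> Self::RequestFuture {
/// #         futures::future::ready(Ok(Value::Array(params.to_vec())))
/// #     }
/// #     fn handle_notification(&mut self, _: &str, _: &[Value]) {}
/// # }
/// #[tokio::main]
/// async fn main() -> std::io::Result<()> {
///     let mut listener = TcpListener::bind("127.0.0.1:54321").await?;
///     loop {
///         let (socket, _) = listener.accept().await?;
///         // Run one endpoint per connection, each on its own task.
///         tokio::spawn(serve(socket.compat(), Echo));
///     }
/// }
/// ```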
pub fn serve<'a, S: Service + Unpin + 'a, T: AsyncRead + AsyncWrite + 'a + Send>(
    stream: T,
    service: S,
) -> impl Future<Output = io::Result<()>> + 'a + Send {
    ServerEndpoint::new(stream, service)
}

struct ServerEndpoint<S, T> {
    inner: InnerEndpoint<Server<S>, T>,
}

impl<S: Service + Unpin, T: AsyncRead + AsyncWrite> ServerEndpoint<S, T> {
    pub fn new(stream: T, service: S) -> Self {
        let stream = FuturesAsyncWriteCompatExt::compat_write(stream);
        ServerEndpoint {
            inner: InnerEndpoint {
                stream: Transport(Codec.framed(stream)),
                handler: Server::new(service),
            },
        }
    }
}

impl<S: Service + Unpin, T: AsyncRead + AsyncWrite> Future for ServerEndpoint<S, T> {
    type Output = io::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        trace!("ServerEndpoint: polling");
        unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll(cx)
    }
}

/// A `Future` for running both a client and a server at the same time.
///
/// The client part will be provided to the
/// [`ServiceWithClient::handle_request`](ServiceWithClient::handle_request) and
/// [`ServiceWithClient::handle_notification`](ServiceWithClient::handle_notification) methods,
/// so that the server can send back requests and notifications as part of its handling duties. You
/// may also access the client with the [`client()`](#method.client) method if you want to send
/// additional requests.
///
/// The returned future needs to be spawned onto a task in order to actually run the server (and
/// the client). It will run until the stream is closed; if the stream encounters an error, the
/// future will propagate it and terminate.
///
/// ```
/// use std::io;
/// use rmp_rpc::ServiceWithClient;
/// # use rmp_rpc::{Client, Endpoint, Value};
/// use std::net::SocketAddr;
/// use tokio::net::TcpListener;
/// use tokio_util::compat::TokioAsyncReadCompatExt;
///
/// struct MyService;
/// impl ServiceWithClient for MyService {
/// // ...
/// # type RequestFuture = futures::future::Ready<Result<Value, Value>>;
/// # fn handle_request(&mut self, _: &mut Client, _: &str, _: &[Value]) -> Self::RequestFuture {
/// #     unimplemented!();
/// # }
/// # fn handle_notification(&mut self, _: &mut Client, _: &str, _: &[Value]) {
/// #     unimplemented!();
/// # }
/// }
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
///     let addr: SocketAddr = "127.0.0.1:54321".parse().unwrap();
///
///     // Here's the simplest version: we listen for incoming TCP connections and run an
///     // endpoint on each one.
///     let server = async {
/// #        if false {
/// #            return Ok::<(), io::Error>(())
/// #        }
///         let mut listener = TcpListener::bind(&addr).await?;
///         loop {
///             // Each time the listener finds a new connection, start up an endpoint to handle
///             // it.
///             let (socket, _) = listener.accept().await?;
///             if let Err(e) = Endpoint::new(socket.compat(), MyService).await {
///                 println!("error on endpoint {}", e);
///             }
///         }
///     };
///
///     // Uncomment this to actually run the server (it blocks; press ^C to stop):
///     // server.await?;
///
///     // Here's an alternative, where we take a handle to the client and spawn the endpoint
///     // on its own task.
///     let addr: SocketAddr = "127.0.0.1:65432".parse().unwrap();
///     let server = async {
/// #        if false {
/// #            return Ok::<(), io::Error>(())
/// #        }
///         let mut listener = TcpListener::bind(&addr).await?;
///         loop {
///             let (socket, _) = listener.accept().await?;
///             let end = Endpoint::new(socket.compat(), MyService);
///             let client = end.client();
///
///             // Spawn the endpoint. It will do its own thing, while we can use the client
///             // to send requests.
///             tokio::spawn(end);
///
///             // Send a request with method name "hello" and argument "world!".
///             match client.request("hello", &["world!".into()]).await {
///                 Ok(response) => println!("{:?}", response),
///                 Err(e) => println!("got an error: {:?}", e),
///             };
///             // Because we `await` the response right here, we only move on to the next
///             // connection once the response has been received and printed. If you wanted
///             // to keep accepting connections while waiting for responses, you could move
///             // this request handling onto its own task as well.
///         }
///     };
///
///     // Uncomment this to actually run the server (it blocks; press ^C to stop):
///     // server.await?;
///
///     Ok(())
/// }
/// ```
pub struct Endpoint<S, T> {
    inner: InnerEndpoint<ClientAndServer<S>, T>,
}

impl<S: ServiceWithClient + Unpin, T: AsyncRead + AsyncWrite> Endpoint<S, T> {
    /// Creates a new `Endpoint` on `stream`, using `service` to handle requests and notifications.
    pub fn new(stream: T, service: S) -> Self {
        let (inner_client, client) = InnerClient::new();
        let stream = FuturesAsyncWriteCompatExt::compat_write(stream);
        Endpoint {
            inner: InnerEndpoint {
                stream: Transport(Codec.framed(stream)),
                handler: ClientAndServer {
                    inner_client,
                    client,
                    server: Server::new(service),
                },
            },
        }
    }

    /// Returns a handle to the client half of this `Endpoint`, which can be used for sending
    /// requests and notifications.
    pub fn client(&self) -> Client {
        self.inner.handler.client.clone()
    }
}

impl<S: ServiceWithClient + Unpin, T: AsyncRead + AsyncWrite> Future for Endpoint<S, T> {
    type Output = io::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        trace!("Endpoint: polling");
        unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll(cx)
    }
}

/// A client that sends requests and notifications to a remote MessagePack-RPC server.
#[derive(Clone)]
pub struct Client {
    requests_tx: RequestTx,
    notifications_tx: NotificationTx,
}

impl Client {
    /// Creates a new `Client` that can be used to send requests and notifications on the given
    /// stream.
    ///
    /// ```
    /// use std::io;
    /// use std::net::SocketAddr;
    ///
    /// use rmp_rpc::Client;
    /// use tokio::net::TcpStream;
    /// use tokio_util::compat::TokioAsyncReadCompatExt;
    ///
    /// let addr: SocketAddr = "127.0.0.1:54321".parse().unwrap();
    ///
    /// let f = async {
    ///     // Connect to the server, then send a notification and a request.
    ///     let socket = TcpStream::connect(&addr).await?;
    ///     let client = Client::new(socket.compat());
    ///
    ///     // Use the client to send a notification.
    ///     // The future returned by client.notify() finishes when the notification
    ///     // has been sent, in case we care about that. We can also just drop it.
    ///     client.notify("hello", &[]);
    ///
    ///     // Use the client to send a request with the method "dostuff", and two parameters:
    ///     // the string "foo" and the integer "42".
    ///     // The future returned by client.request() finishes when the response
    ///     // is received.
    ///     if let Ok(resp) = client.request("dostuff", &["foo".into(), 42.into()]).await {
    ///         println!("Response: {:?}", resp);
    ///     }
    ///     Ok::<_, io::Error>(())
    /// };
    ///
    /// // To actually run the client, `await` the future `f` from within a Tokio runtime
    /// // (for example inside an `#[tokio::main]` async fn). It finishes once the response
    /// // has been received and printed.
    /// ```
    ///
    /// # Panics
    ///
    /// This function will panic if it is called outside the context of a Tokio runtime, since
    /// it spawns the connection task with `tokio::spawn`.
    pub fn new<T: AsyncRead + AsyncWrite + 'static + Send>(stream: T) -> Self {
        let (inner_client, client) = InnerClient::new();
        let stream = FuturesAsyncWriteCompatExt::compat_write(stream);
        let endpoint = InnerEndpoint {
            stream: Transport(Codec.framed(stream)),
            handler: inner_client,
        };
        // We swallow io::Errors. The client will see an error if it has any outstanding requests
        // or if it tries to send anything, because the endpoint has terminated.
        tokio::spawn(
            endpoint.map_err(|e| trace!("Client endpoint closed because of an error: {}", e)),
        );
        client
    }

    fn from_channels(requests_tx: RequestTx, notifications_tx: NotificationTx) -> Self {
        Client {
            requests_tx,
            notifications_tx,
        }
    }

    /// Send a `MessagePack-RPC` request.
    pub fn request(&self, method: &str, params: &[Value]) -> Response {
        trace!("New request (method={}, params={:?})", method, params);
        let request = Request {
            id: 0,
            method: method.to_owned(),
            params: Vec::from(params),
        };
        let (tx, rx) = oneshot::channel();
        // If send returns an Err, it's because the other side has been dropped. By ignoring it,
        // we are just dropping the `tx`, which means the `rx` will resolve to `Canceled` when
        // polled. `Response` translates that into an error result, which conveys the problem to
        // the caller.
        let _ = self.requests_tx.unbounded_send((request, tx));
        Response(rx)
    }

    /// Send a `MessagePack-RPC` notification.
    pub fn notify(&self, method: &str, params: &[Value]) -> Ack {
        trace!("New notification (method={}, params={:?})", method, params);
        let notification = Notification {
            method: method.to_owned(),
            params: Vec::from(params),
        };
        let (tx, rx) = oneshot::channel();
        let _ = self.notifications_tx.unbounded_send((notification, tx));
        Ack(rx)
    }
}

impl Future for Client {
    type Output = io::Result<()>;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
        trace!("Client: polling");
        Poll::Ready(Ok(()))
    }
}