msgpack_rpc/endpoint.rs
1use std::collections::HashMap;
2use std::io;
3
4use std::marker::Unpin;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use futures::channel::{mpsc, oneshot};
9use futures::io::{AsyncRead, AsyncWrite};
10use futures::{ready, Future, FutureExt, Sink, Stream, TryFutureExt};
11use rmpv::Value;
12use tokio_util::codec::{Decoder, Framed};
13use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
14
15use crate::codec::Codec;
16use crate::message::Response as MsgPackResponse;
17use crate::message::{Message, Notification, Request};
18
19/// The `Service` trait defines how a `MessagePack-RPC` server handles requests and notifications.
20pub trait Service: Send {
21 /// The type of future returned by `handle_request`. This future will be spawned on the event
22 /// loop, and when it is complete then the result will be sent back to the client that made the
23 /// request.
24 ///
25 /// Note that if your `handle_request` method involves only a simple and quick computation,
26 /// then you can set `RequestFut` to `Result<Value, Value>` (which gets turned into a future
27 /// that completes immediately). You only need to use a "real" future if there's some longer
28 /// computation or I/O that needs to be deferred.
29 type RequestFuture: Future<Output = Result<Value, Value>> + 'static + Send;
30
31 /// Handle a `MessagePack-RPC` request.
32 ///
33 /// The name of the request is `method`, and the parameters are given in `params`.
34 ///
35 /// Note that this method is called synchronously within the main event loop, and so it should
36 /// return quickly. If you need to run a longer computation, put it in a future and return it.
37 fn handle_request(&mut self, method: &str, params: &[Value]) -> Self::RequestFuture;
38
39 /// Handle a `MessagePack-RPC` notification.
40 ///
41 /// Note that this method is called synchronously within the main event loop, and so it should
42 /// return quickly. If you need to run a longer computation, put it in a future and spawn it on
43 /// the event loop.
44 fn handle_notification(&mut self, method: &str, params: &[Value]);
45}
46
47/// This is a beefed-up version of [`Service`](Service), in which the various handler
48/// methods also get access to a [`Client`](Client), which allows them to send requests and
49/// notifications to the same msgpack-rpc client that made the original request.
50pub trait ServiceWithClient {
51 /// The type of future returned by [`handle_request`](ServiceWithClient::handle_request).
52 type RequestFuture: Future<Output = Result<Value, Value>> + 'static + Send;
53
54 /// Handle a `MessagePack-RPC` request.
55 ///
56 /// This differs from [`Service::handle_request`](Service::handle_request) in that you also get
57 /// access to a [`Client`](Client) for sending requests and notifications.
58 fn handle_request(
59 &mut self,
60 client: &mut Client,
61 method: &str,
62 params: &[Value],
63 ) -> Self::RequestFuture;
64
65 /// Handle a `MessagePack-RPC` notification.
66 ///
67 /// This differs from [`Service::handle_notification`](Service::handle_notification) in that
68 /// you also get access to a [`Client`](Client) for sending requests and notifications.
69 fn handle_notification(&mut self, client: &mut Client, method: &str, params: &[Value]);
70}
71
72// Given a service that doesn't require access to a client, we can also treat it as a service that
73// does require access to a client.
74impl<S: Service> ServiceWithClient for S {
75 type RequestFuture = <S as Service>::RequestFuture;
76
77 fn handle_request(
78 &mut self,
79 _client: &mut Client,
80 method: &str,
81 params: &[Value],
82 ) -> Self::RequestFuture {
83 self.handle_request(method, params)
84 }
85
86 fn handle_notification(&mut self, _client: &mut Client, method: &str, params: &[Value]) {
87 self.handle_notification(method, params);
88 }
89}
90
91struct Server<S> {
92 service: S,
93 // This will receive responses from the service (or possibly from whatever worker tasks that
94 // the service spawned). The u32 contains the id of the request that the response is for.
95 pending_responses: mpsc::UnboundedReceiver<(u32, Result<Value, Value>)>,
96 // We hand out a clone of this whenever we call `service.handle_request`.
97 response_sender: mpsc::UnboundedSender<(u32, Result<Value, Value>)>,
98 // TODO: We partially add backpressure by ensuring that the pending responses get sent out
99 // before we accept new requests. However, it could be that there are lots of response
100 // computations out there that haven't sent pending responses yet; we don't yet have a way to
101 // apply backpressure there.
102}
103
104impl<S: ServiceWithClient> Server<S> {
105 fn new(service: S) -> Self {
106 let (send, recv) = mpsc::unbounded();
107
108 Server {
109 service,
110 pending_responses: recv,
111 response_sender: send,
112 }
113 }
114
115 // Pushes all responses (that are ready) onto the stream, to send back to the client.
116 //
117 // Returns Async::Ready if all of the pending responses were successfully sent on their way.
118 // (This does not necessarily mean that they were received yet.)
119 fn send_responses<T: AsyncRead + AsyncWrite>(
120 &mut self,
121 cx: &mut Context,
122 mut sink: Pin<&mut Transport<T>>,
123 ) -> Poll<io::Result<()>> {
124 trace!("Server: flushing responses");
125 loop {
126 ready!(sink.as_mut().poll_ready(cx)?);
127 match Pin::new(&mut self.pending_responses).poll_next(cx) {
128 Poll::Ready(Some((id, result))) => {
129 let msg = Message::Response(MsgPackResponse { id, result });
130 sink.as_mut().start_send(msg).unwrap();
131 }
132 Poll::Ready(None) => panic!("we store the sender, it can't be dropped"),
133 Poll::Pending => return sink.as_mut().poll_flush(cx),
134 }
135 }
136 }
137
138 fn spawn_request_worker<F: Future<Output = Result<Value, Value>> + 'static + Send>(
139 &self,
140 id: u32,
141 f: F,
142 ) {
143 trace!("spawning a new task");
144
145 // XXX Is this even a worthwhile optimization to reintroduce? Does it work correctly with
146 // the no-op waker?
147 /*
148 // The simplest implementation of this function would just spawn a future immediately, but
149 // as an optimization let's check if the future is immediately ready and avoid spawning in
150 // that case.
151 match f.poll(&mut Context::from_waker(futures::task::noop_waker_ref())) {
152 Ok(Async::Ready(result)) => {
153 trace!("the task is already done, no need to spawn it on the event loop");
154 // An error in unbounded_send means that the receiver has been dropped, which
155 // means that the Server has stopped running. There is no meaningful way to
156 // signal an error from here (but the client should see an error anyway,
157 // because its stream will end before it gets a response).
158 let _ = self.response_sender.unbounded_send((id, Ok(result)));
159 }
160 Err(e) => {
161 trace!("the task failed, no need to spawn it on the event loop");
162 let _ = self.response_sender.unbounded_send((id, Err(e)));
163 }
164 Ok(Async::NotReady) => {
165 trace!("spawning the task on the event loop");
166 // Ok, we can't avoid it: spawn a future on the event loop.
167 let send = self.response_sender.clone();
168 tokio::spawn(
169 f.map(move |result| send.unbounded_send((id, result))),
170 );
171 }
172 }
173 */
174
175 trace!("spawning the task on the event loop");
176 let send = self.response_sender.clone();
177 tokio::spawn(f.map(move |result| send.unbounded_send((id, result))));
178 }
179}
180
181// We need to write three different endpoints: client, server, and client+server. This trait helps
182// us avoid code duplication by defining the two main roles of an endpoint.
183trait MessageHandler {
184 // We just received `msg` on our input stream. Handle it.
185 fn handle_incoming(&mut self, msg: Message);
186
187 // Try to push out all of the outgoing messages (e.g. responses in the case of a server,
188 // notifications+requests in the case of a client) onto the sink. Return Ok(Async::Ready(()))
189 // if we managed to push them all out and flush the sink.
190 fn send_outgoing<T: AsyncRead + AsyncWrite>(
191 &mut self,
192 cx: &mut Context,
193 sink: Pin<&mut Transport<T>>,
194 ) -> Poll<io::Result<()>>;
195
196 // Is the endpoint finished? This is only relevant for clients, since servers and
197 // client+servers will never voluntarily stop.
198 fn is_finished(&self) -> bool {
199 false
200 }
201}
202
203type ResponseTx = oneshot::Sender<Result<Value, Value>>;
204/// Future response to a request. It resolved once the response is available.
205///
206/// Note that there are two different kinds of "errors" that can be signalled. If we resolve to an
207/// `Err(Cancelled)`, it means that the connection was closed before a response was received. If we
208/// resolve to `Ok(Async::Ready(Err(value)))`, it means that the server encountered an error when
209/// responding to the request, and it send back an error message.
210pub struct Response(oneshot::Receiver<Result<Value, Value>>);
211
212type AckTx = oneshot::Sender<()>;
213
214/// A future that resolves when a notification has been effictively sent to the server. It does not
215/// guarantees that the server receives it, just that it has been sent.
216pub struct Ack(oneshot::Receiver<()>);
217
218// TODO: perhaps make these bounded (for better backpressure)
219type RequestTx = mpsc::UnboundedSender<(Request, ResponseTx)>;
220type RequestRx = mpsc::UnboundedReceiver<(Request, ResponseTx)>;
221
222type NotificationTx = mpsc::UnboundedSender<(Notification, AckTx)>;
223type NotificationRx = mpsc::UnboundedReceiver<(Notification, AckTx)>;
224
225impl Future for Response {
226 type Output = Result<Value, Value>;
227
228 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
229 trace!("Response: polling");
230 Poll::Ready(match ready!(Pin::new(&mut self.0).poll(cx)) {
231 Ok(Ok(v)) => Ok(v),
232 Ok(Err(v)) => Err(v),
233 Err(_) => Err(Value::Nil),
234 })
235 }
236}
237
238impl Future for Ack {
239 type Output = Result<(), ()>;
240
241 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
242 trace!("Ack: polling");
243 Pin::new(&mut self.0).poll(cx).map_err(|_| ())
244 }
245}
246
247struct InnerClient {
248 client_closed: bool,
249 request_id: u32,
250 requests_rx: RequestRx,
251 notifications_rx: NotificationRx,
252 pending_requests: HashMap<u32, ResponseTx>,
253 pending_notifications: Vec<AckTx>,
254}
255
256impl InnerClient {
257 fn new() -> (Self, Client) {
258 let (requests_tx, requests_rx) = mpsc::unbounded();
259 let (notifications_tx, notifications_rx) = mpsc::unbounded();
260
261 let client_proxy = Client::from_channels(requests_tx, notifications_tx);
262
263 let client = InnerClient {
264 client_closed: false,
265 request_id: 0,
266 requests_rx,
267 notifications_rx,
268 pending_requests: HashMap::new(),
269 pending_notifications: Vec::new(),
270 };
271
272 (client, client_proxy)
273 }
274
275 fn process_notifications<T: AsyncRead + AsyncWrite>(
276 &mut self,
277 cx: &mut Context,
278 mut stream: Pin<&mut Transport<T>>,
279 ) -> io::Result<()> {
280 // Don't try to process notifications after the notifications channel was closed, because
281 // trying to read from it might cause panics.
282 if self.client_closed {
283 return Ok(());
284 }
285
286 trace!("Polling client notifications channel");
287
288 while let Poll::Ready(()) = stream.as_mut().poll_ready(cx)? {
289 match Pin::new(&mut self.notifications_rx).poll_next(cx) {
290 Poll::Ready(Some((notification, ack_sender))) => {
291 trace!("Got notification from client.");
292 stream
293 .as_mut()
294 .start_send(Message::Notification(notification))?;
295 self.pending_notifications.push(ack_sender);
296 }
297 Poll::Ready(None) => {
298 trace!("Client closed the notifications channel.");
299 self.client_closed = true;
300 break;
301 }
302 Poll::Pending => {
303 trace!("No new notification from client");
304 break;
305 }
306 }
307 }
308 Ok(())
309 }
310
311 fn send_messages<T: AsyncRead + AsyncWrite>(
312 &mut self,
313 cx: &mut Context,
314 mut stream: Pin<&mut Transport<T>>,
315 ) -> Poll<io::Result<()>> {
316 self.process_requests(cx, stream.as_mut())?;
317 self.process_notifications(cx, stream.as_mut())?;
318
319 match stream.poll_flush(cx)? {
320 Poll::Ready(()) => {
321 self.acknowledge_notifications();
322 Poll::Ready(Ok(()))
323 }
324 Poll::Pending => Poll::Pending,
325 }
326 }
327
328 fn process_requests<T: AsyncRead + AsyncWrite>(
329 &mut self,
330 cx: &mut Context,
331 mut stream: Pin<&mut Transport<T>>,
332 ) -> io::Result<()> {
333 // Don't try to process requests after the requests channel was closed, because
334 // trying to read from it might cause panics.
335 if self.client_closed {
336 return Ok(());
337 }
338 trace!("Polling client requests channel");
339 while let Poll::Ready(()) = stream.as_mut().poll_ready(cx)? {
340 match Pin::new(&mut self.requests_rx).poll_next(cx) {
341 Poll::Ready(Some((mut request, response_sender))) => {
342 self.request_id += 1;
343 trace!("Got request from client: {:?}", request);
344 request.id = self.request_id;
345 stream.as_mut().start_send(Message::Request(request))?;
346 self.pending_requests
347 .insert(self.request_id, response_sender);
348 }
349 Poll::Ready(None) => {
350 trace!("Client closed the requests channel.");
351 self.client_closed = true;
352 break;
353 }
354 Poll::Pending => {
355 trace!("No new request from client");
356 break;
357 }
358 }
359 }
360 Ok(())
361 }
362
363 fn process_response(&mut self, response: MsgPackResponse) {
364 if let Some(response_tx) = self.pending_requests.remove(&response.id) {
365 trace!("Forwarding response to the client.");
366 if let Err(e) = response_tx.send(response.result) {
367 warn!("Failed to send response to client: {:?}", e);
368 }
369 } else {
370 warn!("no pending request found for response {}", &response.id);
371 }
372 }
373
374 fn acknowledge_notifications(&mut self) {
375 for chan in self.pending_notifications.drain(..) {
376 trace!("Acknowledging notification.");
377 if let Err(e) = chan.send(()) {
378 warn!("Failed to send ack to client: {:?}", e);
379 }
380 }
381 }
382}
383
384struct Transport<T>(Framed<Compat<T>, Codec>);
385
386impl<T> Transport<T>
387where
388 T: AsyncRead + AsyncWrite,
389{
390 fn inner(self: Pin<&mut Self>) -> Pin<&mut Framed<Compat<T>, Codec>> {
391 unsafe { self.map_unchecked_mut(|this| &mut this.0) }
392 }
393}
394
395impl<T> Stream for Transport<T>
396where
397 T: AsyncRead + AsyncWrite,
398{
399 type Item = io::Result<Message>;
400
401 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
402 trace!("Transport: polling");
403 self.inner().poll_next(cx)
404 }
405}
406
407impl<T> Sink<Message> for Transport<T>
408where
409 T: AsyncRead + AsyncWrite,
410{
411 type Error = io::Error;
412
413 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
414 self.inner().poll_ready(cx)
415 }
416
417 fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
418 self.inner().start_send(item)
419 }
420
421 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
422 self.inner().poll_flush(cx)
423 }
424
425 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
426 self.inner().poll_close(cx)
427 }
428}
429
430impl<S: Service> MessageHandler for Server<S> {
431 fn handle_incoming(&mut self, msg: Message) {
432 match msg {
433 Message::Request(req) => {
434 let f = self.service.handle_request(&req.method, &req.params);
435 self.spawn_request_worker(req.id, f);
436 }
437 Message::Notification(note) => {
438 self.service.handle_notification(¬e.method, ¬e.params);
439 }
440 Message::Response(_) => {
441 trace!("This endpoint doesn't handle responses, ignoring the msg.");
442 }
443 };
444 }
445
446 fn send_outgoing<T: AsyncRead + AsyncWrite>(
447 &mut self,
448 cx: &mut Context,
449 sink: Pin<&mut Transport<T>>,
450 ) -> Poll<io::Result<()>> {
451 self.send_responses(cx, sink)
452 }
453}
454
455impl MessageHandler for InnerClient {
456 fn handle_incoming(&mut self, msg: Message) {
457 trace!("Received {:?}", msg);
458 if let Message::Response(response) = msg {
459 self.process_response(response);
460 } else {
461 trace!("This endpoint only handles reponses, ignoring the msg.");
462 }
463 }
464
465 fn send_outgoing<T: AsyncRead + AsyncWrite>(
466 &mut self,
467 cx: &mut Context,
468 sink: Pin<&mut Transport<T>>,
469 ) -> Poll<io::Result<()>> {
470 self.send_messages(cx, sink)
471 }
472
473 fn is_finished(&self) -> bool {
474 self.client_closed
475 && self.pending_requests.is_empty()
476 && self.pending_notifications.is_empty()
477 }
478}
479
480struct ClientAndServer<S> {
481 inner_client: InnerClient,
482 server: Server<S>,
483 client: Client,
484}
485
486impl<S: ServiceWithClient> MessageHandler for ClientAndServer<S> {
487 fn handle_incoming(&mut self, msg: Message) {
488 match msg {
489 Message::Request(req) => {
490 let f =
491 self.server
492 .service
493 .handle_request(&mut self.client, &req.method, &req.params);
494 self.server.spawn_request_worker(req.id, f);
495 }
496 Message::Notification(note) => {
497 self.server.service.handle_notification(
498 &mut self.client,
499 ¬e.method,
500 ¬e.params,
501 );
502 }
503 Message::Response(response) => self.inner_client.process_response(response),
504 };
505 }
506
507 fn send_outgoing<T: AsyncRead + AsyncWrite>(
508 &mut self,
509 cx: &mut Context,
510 mut sink: Pin<&mut Transport<T>>,
511 ) -> Poll<io::Result<()>> {
512 if let Poll::Ready(()) = self.server.send_responses(cx, sink.as_mut())? {
513 self.inner_client.send_messages(cx, sink)
514 } else {
515 Poll::Pending
516 }
517 }
518}
519
520struct InnerEndpoint<MH, T> {
521 handler: MH,
522 stream: Transport<T>,
523}
524
525impl<MH: MessageHandler + Unpin, T: AsyncRead + AsyncWrite> Future for InnerEndpoint<MH, T> {
526 type Output = io::Result<()>;
527
528 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
529 trace!("InnerEndpoint: polling");
530 // Try to flush out all the responses that are queued up. If this doesn't succeed yet, our
531 // output sink is full. In that case, we'll apply some backpressure to our input stream by
532 // not reading from it.
533
534 // XXX It's sound to return unpin MH (ie handler: &mut MH) here since it is Unpin
535 let (handler, mut stream) = unsafe {
536 let this = self.get_unchecked_mut();
537 (&mut this.handler, Pin::new_unchecked(&mut this.stream))
538 };
539 if let Poll::Pending = handler.send_outgoing(cx, stream.as_mut())? {
540 trace!("Sink not yet flushed, waiting...");
541 return Poll::Pending;
542 }
543
544 trace!("Polling stream.");
545 while let Poll::Ready(msg) = stream.as_mut().poll_next(cx)? {
546 if let Some(msg) = msg {
547 handler.handle_incoming(msg);
548 } else {
549 trace!("Stream closed by remote peer.");
550 // FIXME: not sure if we should still continue sending responses here. Is it
551 // possible that the client closed the stream only one way and is still waiting
552 // for response? Not for TCP at least, but maybe for other transport types?
553 return Poll::Ready(Ok(()));
554 }
555 }
556
557 if handler.is_finished() {
558 trace!("inner client finished, exiting...");
559 Poll::Ready(Ok(()))
560 } else {
561 trace!("notifying the reactor that we're not done yet");
562 Poll::Pending
563 }
564 }
565}
566
567/// Creates a future for running a `Service` on a stream.
568///
569/// The returned future will run until the stream is closed; if the stream encounters an error,
570/// then the future will propagate it and terminate.
571pub fn serve<'a, S: Service + Unpin + 'a, T: AsyncRead + AsyncWrite + 'a + Send>(
572 stream: T,
573 service: S,
574) -> impl Future<Output = io::Result<()>> + 'a + Send {
575 ServerEndpoint::new(stream, service)
576}
577
578struct ServerEndpoint<S, T> {
579 inner: InnerEndpoint<Server<S>, T>,
580}
581
582impl<S: Service + Unpin, T: AsyncRead + AsyncWrite> ServerEndpoint<S, T> {
583 pub fn new(stream: T, service: S) -> Self {
584 let stream = FuturesAsyncWriteCompatExt::compat_write(stream);
585 ServerEndpoint {
586 inner: InnerEndpoint {
587 stream: Transport(Codec.framed(stream)),
588 handler: Server::new(service),
589 },
590 }
591 }
592}
593
594impl<S: Service + Unpin, T: AsyncRead + AsyncWrite> Future for ServerEndpoint<S, T> {
595 type Output = io::Result<()>;
596
597 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
598 trace!("ServerEndpoint: polling");
599 unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll(cx)
600 }
601}
602
603/// A `Future` for running both a client and a server at the same time.
604///
605/// The client part will be provided to the
606/// [`ServiceWithClient::handle_request`](ServiceWithClient::handle_request) and
607/// [`ServiceWithClient::handle_notification`](ServiceWithClient::handle_notification) methods,
608/// so that the server can send back requests and notifications as part of its handling duties. You
609/// may also access the client with the [`client()`](#method.client) method if you want to send
610/// additional requests.
611///
612/// The returned future needs to be spawned onto a task in order to actually run the server (and
613/// the client). It will run until the stream is closed; if the stream encounters an error, the
614/// future will propagate it and terminate.
615///
616/// ```
617/// use std::io;
618/// use rmp_rpc::ServiceWithClient;
619/// # use rmp_rpc::{Client, Endpoint, Value};
620/// use std::net::SocketAddr;
621/// use tokio::net::TcpListener;
622/// use tokio_util::compat::TokioAsyncReadCompatExt;
623///
624/// struct MyService;
625/// impl ServiceWithClient for MyService {
626/// // ...
627/// # type RequestFuture = futures::future::Ready<Result<Value, Value>>;
628/// # fn handle_request(&mut self, _: &mut Client, _: &str, _: &[Value]) -> Self::RequestFuture {
629/// # unimplemented!();
630/// # }
631/// # fn handle_notification(&mut self, _: &mut Client, _: &str, _: &[Value]) {
632/// # unimplemented!();
633/// # }
634/// }
635///
636/// #[tokio::main]
637/// async fn main() -> io::Result<()> {
638/// let addr: SocketAddr = "127.0.0.1:54321".parse().unwrap();
639///
640/// // Here's the simplest version: we listen for incoming TCP connections and run an
641/// // endpoint on each one.
642/// let server = async {
643/// # if false {
644/// # return Ok::<(), io::Error>(())
645/// # }
646/// let mut listener = TcpListener::bind(&addr).await?;
647/// loop {
648/// // Each time the listener finds a new connection, start up an endpoint to handle
649/// // it.
650/// let (socket, _) = listener.accept().await?;
651/// if let Err(e) = Endpoint::new(socket.compat(), MyService).await {
652/// println!("error on endpoint {}", e);
653/// }
654/// }
655/// };
656///
657/// // Uncomment this to run the server on the tokio event loop. This is blocking.
658/// // Press ^C to stop
659/// // tokio::run(server);
660///
661/// // Here's an alternative, where we take a handle to the client and spawn the endpoint
662/// // on its own task.
663/// let addr: SocketAddr = "127.0.0.1:65432".parse().unwrap();
664/// let server = async {
665/// # if false {
666/// # return Ok::<(), io::Error>(())
667/// # }
668/// let mut listener = TcpListener::bind(&addr).await?;
669/// loop {
670/// let (socket, _) = listener.accept().await?;
671/// let end = Endpoint::new(socket.compat(), MyService);
672/// let client = end.client();
673///
674/// // Spawn the endpoint. It will do its own thing, while we can use the client
675/// // to send requests.
676/// tokio::spawn(end);
677///
678/// // Send a request with method name "hello" and argument "world!".
679/// match client.request("hello", &["world!".into()]).await {
680/// Ok(response) => println!("{:?}", response),
681/// Err(e) => println!("got an error: {:?}", e),
682/// };
683/// // We're returning the future that came from `client.request`. This means that
684/// // `server` (and therefore our entire program) will terminate once the
685/// // response is received and the messages are printed. If you wanted to keep
686/// // the endpoint running even after the response is received, you could
687/// // (instead of spawning `end` on its own task) `join` the two futures (i.e.
688/// // `end` and the one returned by `client.request`).
689/// }
690/// };
691///
692/// // Uncomment this to run the server on the tokio event loop. This is blocking.
693/// // Press ^C to stop
694/// // tokio::run(server);
695///
696/// Ok(())
697/// }
698/// ```
699pub struct Endpoint<S, T> {
700 inner: InnerEndpoint<ClientAndServer<S>, T>,
701}
702
703impl<S: ServiceWithClient + Unpin, T: AsyncRead + AsyncWrite> Endpoint<S, T> {
704 /// Creates a new `Endpoint` on `stream`, using `service` to handle requests and notifications.
705 pub fn new(stream: T, service: S) -> Self {
706 let (inner_client, client) = InnerClient::new();
707 let stream = FuturesAsyncWriteCompatExt::compat_write(stream);
708 Endpoint {
709 inner: InnerEndpoint {
710 stream: Transport(Codec.framed(stream)),
711 handler: ClientAndServer {
712 inner_client,
713 client,
714 server: Server::new(service),
715 },
716 },
717 }
718 }
719
720 /// Returns a handle to the client half of this `Endpoint`, which can be used for sending
721 /// requests and notifications.
722 pub fn client(&self) -> Client {
723 self.inner.handler.client.clone()
724 }
725}
726
727impl<S: ServiceWithClient + Unpin, T: AsyncRead + AsyncWrite> Future for Endpoint<S, T> {
728 type Output = io::Result<()>;
729
730 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
731 trace!("Endpoint: polling");
732 unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll(cx)
733 }
734}
735
736/// A client that sends requests and notifications to a remote MessagePack-RPC server.
737#[derive(Clone)]
738pub struct Client {
739 requests_tx: RequestTx,
740 notifications_tx: NotificationTx,
741}
742
743impl Client {
744 /// Creates a new `Client` that can be used to send requests and notifications on the given
745 /// stream.
746 ///
747 /// ```
748 /// use std::io;
749 /// use std::net::SocketAddr;
750 ///
751 /// use rmp_rpc::Client;
752 /// use tokio::net::TcpStream;
753 /// use tokio_util::compat::TokioAsyncReadCompatExt;
754 ///
755 /// let addr: SocketAddr = "127.0.0.1:54321".parse().unwrap();
756 ///
757 /// let f = async {
758 /// // Create a future that connects to the server, and send a notification and a request.
759 /// let socket = TcpStream::connect(&addr).await?;
760 /// let client = Client::new(socket.compat());
761 ///
762 /// // Use the client to send a notification.
763 /// // The future returned by client.notify() finishes when the notification
764 /// // has been sent, in case we care about that. We can also just drop it.
765 /// client.notify("hello", &[]);
766 ///
767 /// // Use the client to send a request with the method "dostuff", and two parameters:
768 /// // the string "foo" and the integer "42".
769 /// // The future returned by client.request() finishes when the response
770 /// // is received.
771 /// if let Ok(resp) = client.request("dostuff", &["foo".into(), 42.into()]).await {
772 /// println!("Response: {:?}", resp);
773 /// }
774 /// Ok::<_, io::Error>(())
775 /// };
776 ///
777 /// // Uncomment this to run the client, blocking until the response was received and the
778 /// // message was printed.
779 /// // tokio::run(f);
780 /// ```
781 /// # Panics
782 ///
783 /// This function will panic if the default executor is not set or if spawning
784 /// onto the default executor returns an error. To avoid the panic, use
785 ///
786 /// [`DefaultExecutor`](tokio::executor::DefaultExecutor)
787 pub fn new<T: AsyncRead + AsyncWrite + 'static + Send>(stream: T) -> Self {
788 let (inner_client, client) = InnerClient::new();
789 let stream = FuturesAsyncWriteCompatExt::compat_write(stream);
790 let endpoint = InnerEndpoint {
791 stream: Transport(Codec.framed(stream)),
792 handler: inner_client,
793 };
794 // We swallow io::Errors. The client will see an error if it has any outstanding requests
795 // or if it tries to send anything, because the endpoint has terminated.
796 tokio::spawn(
797 endpoint.map_err(|e| trace!("Client endpoint closed because of an error: {}", e)),
798 );
799 client
800 }
801
802 fn from_channels(requests_tx: RequestTx, notifications_tx: NotificationTx) -> Self {
803 Client {
804 requests_tx,
805 notifications_tx,
806 }
807 }
808
809 /// Send a `MessagePack-RPC` request
810 pub fn request(&self, method: &str, params: &[Value]) -> Response {
811 trace!("New request (method={}, params={:?})", method, params);
812 let request = Request {
813 id: 0,
814 method: method.to_owned(),
815 params: Vec::from(params),
816 };
817 let (tx, rx) = oneshot::channel();
818 // If send returns an Err, its because the other side has been dropped. By ignoring it,
819 // we are just dropping the `tx`, which will mean the rx will return Canceled when
820 // polled. In turn, that is translated into a BrokenPipe, which conveys the proper
821 // error.
822 let _ = mpsc::UnboundedSender::unbounded_send(&self.requests_tx, (request, tx));
823 Response(rx)
824 }
825
826 /// Send a `MessagePack-RPC` notification
827 pub fn notify(&self, method: &str, params: &[Value]) -> Ack {
828 trace!("New notification (method={}, params={:?})", method, params);
829 let notification = Notification {
830 method: method.to_owned(),
831 params: Vec::from(params),
832 };
833 let (tx, rx) = oneshot::channel();
834 let _ = mpsc::UnboundedSender::unbounded_send(&self.notifications_tx, (notification, tx));
835 Ack(rx)
836 }
837}
838
839impl Future for Client {
840 type Output = io::Result<()>;
841
842 fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
843 trace!("Client: polling");
844 Poll::Ready(Ok(()))
845 }
846}