forked_tarpc/server/
tokio.rs

1use super::{Channel, Requests, Serve};
2use futures::{prelude::*, ready, task::*};
3use pin_project::pin_project;
4use std::pin::Pin;
5
/// A future that drives the server by [spawning](tokio::spawn) a [`TokioChannelExecutor`]
/// for each new channel. Returned by
/// [`Incoming::execute`](crate::server::incoming::Incoming::execute).
#[must_use]
#[pin_project]
#[derive(Debug)]
pub struct TokioServerExecutor<T, S> {
    /// Pinned source of channels; the `Future` impl polls it as a stream.
    #[pin]
    inner: T,
    /// Service that is cloned once for every channel pulled from `inner`.
    serve: S,
}
17
18impl<T, S> TokioServerExecutor<T, S> {
19    pub(crate) fn new(inner: T, serve: S) -> Self {
20        Self { inner, serve }
21    }
22}
23
/// A future that drives the server by [spawning](tokio::spawn) each [response
/// handler](super::InFlightRequest::execute) on tokio's default executor. Returned by
/// [`Channel::execute`](crate::server::Channel::execute).
#[must_use]
#[pin_project]
#[derive(Debug)]
pub struct TokioChannelExecutor<T, S> {
    /// Pinned source of incoming requests; the `Future` impl polls it as a stream.
    #[pin]
    inner: T,
    /// Service that is cloned once for every request pulled from `inner`.
    serve: S,
}
35
36impl<T, S> TokioServerExecutor<T, S> {
37    fn inner_pin_mut<'a>(self: &'a mut Pin<&mut Self>) -> Pin<&'a mut T> {
38        self.as_mut().project().inner
39    }
40}
41
42impl<T, S> TokioChannelExecutor<T, S> {
43    fn inner_pin_mut<'a>(self: &'a mut Pin<&mut Self>) -> Pin<&'a mut T> {
44        self.as_mut().project().inner
45    }
46}
47
48// Send + 'static execution helper methods.
49
50impl<C> Requests<C>
51where
52    C: Channel,
53    C::Req: Send + 'static,
54    C::Resp: Send + 'static,
55{
56    /// Executes all requests using the given service function. Requests are handled concurrently
57    /// by [spawning](::tokio::spawn) each handler on tokio's default executor.
58    pub fn execute<S>(self, serve: S) -> TokioChannelExecutor<Self, S>
59    where
60        S: Serve<C::Req, Resp = C::Resp> + Send + 'static,
61    {
62        TokioChannelExecutor { inner: self, serve }
63    }
64}
65
66impl<St, C, Se> Future for TokioServerExecutor<St, Se>
67where
68    St: Sized + Stream<Item = C>,
69    C: Channel + Send + 'static,
70    C::Req: Send + 'static,
71    C::Resp: Send + 'static,
72    Se: Serve<C::Req, Resp = C::Resp> + Send + 'static + Clone,
73    Se::Fut: Send,
74{
75    type Output = ();
76
77    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
78        while let Some(channel) = ready!(self.inner_pin_mut().poll_next(cx)) {
79            tokio::spawn(channel.execute(self.serve.clone()));
80        }
81        tracing::info!("Server shutting down.");
82        Poll::Ready(())
83    }
84}
85
86impl<C, S> Future for TokioChannelExecutor<Requests<C>, S>
87where
88    C: Channel + 'static,
89    C::Req: Send + 'static,
90    C::Resp: Send + 'static,
91    S: Serve<C::Req, Resp = C::Resp> + Send + 'static + Clone,
92    S::Fut: Send,
93{
94    type Output = ();
95
96    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
97        while let Some(response_handler) = ready!(self.inner_pin_mut().poll_next(cx)) {
98            match response_handler {
99                Ok(resp) => {
100                    let server = self.serve.clone();
101                    tokio::spawn(async move {
102                        resp.execute(server).await;
103                    });
104                }
105                Err(e) => {
106                    tracing::warn!("Requests stream errored out: {}", e);
107                    break;
108                }
109            }
110        }
111        Poll::Ready(())
112    }
113}