forked_tarpc/server/
tokio.rs1use super::{Channel, Requests, Serve};
2use futures::{prelude::*, ready, task::*};
3use pin_project::pin_project;
4use std::pin::Pin;
5
6#[must_use]
10#[pin_project]
11#[derive(Debug)]
12pub struct TokioServerExecutor<T, S> {
13 #[pin]
14 inner: T,
15 serve: S,
16}
17
18impl<T, S> TokioServerExecutor<T, S> {
19 pub(crate) fn new(inner: T, serve: S) -> Self {
20 Self { inner, serve }
21 }
22}
23
24#[must_use]
28#[pin_project]
29#[derive(Debug)]
30pub struct TokioChannelExecutor<T, S> {
31 #[pin]
32 inner: T,
33 serve: S,
34}
35
36impl<T, S> TokioServerExecutor<T, S> {
37 fn inner_pin_mut<'a>(self: &'a mut Pin<&mut Self>) -> Pin<&'a mut T> {
38 self.as_mut().project().inner
39 }
40}
41
42impl<T, S> TokioChannelExecutor<T, S> {
43 fn inner_pin_mut<'a>(self: &'a mut Pin<&mut Self>) -> Pin<&'a mut T> {
44 self.as_mut().project().inner
45 }
46}
47
48impl<C> Requests<C>
51where
52 C: Channel,
53 C::Req: Send + 'static,
54 C::Resp: Send + 'static,
55{
56 pub fn execute<S>(self, serve: S) -> TokioChannelExecutor<Self, S>
59 where
60 S: Serve<C::Req, Resp = C::Resp> + Send + 'static,
61 {
62 TokioChannelExecutor { inner: self, serve }
63 }
64}
65
66impl<St, C, Se> Future for TokioServerExecutor<St, Se>
67where
68 St: Sized + Stream<Item = C>,
69 C: Channel + Send + 'static,
70 C::Req: Send + 'static,
71 C::Resp: Send + 'static,
72 Se: Serve<C::Req, Resp = C::Resp> + Send + 'static + Clone,
73 Se::Fut: Send,
74{
75 type Output = ();
76
77 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
78 while let Some(channel) = ready!(self.inner_pin_mut().poll_next(cx)) {
79 tokio::spawn(channel.execute(self.serve.clone()));
80 }
81 tracing::info!("Server shutting down.");
82 Poll::Ready(())
83 }
84}
85
86impl<C, S> Future for TokioChannelExecutor<Requests<C>, S>
87where
88 C: Channel + 'static,
89 C::Req: Send + 'static,
90 C::Resp: Send + 'static,
91 S: Serve<C::Req, Resp = C::Resp> + Send + 'static + Clone,
92 S::Fut: Send,
93{
94 type Output = ();
95
96 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
97 while let Some(response_handler) = ready!(self.inner_pin_mut().poll_next(cx)) {
98 match response_handler {
99 Ok(resp) => {
100 let server = self.serve.clone();
101 tokio::spawn(async move {
102 resp.execute(server).await;
103 });
104 }
105 Err(e) => {
106 tracing::warn!("Requests stream errored out: {}", e);
107 break;
108 }
109 }
110 }
111 Poll::Ready(())
112 }
113}