requiem_server/
service.rs

1use std::marker::PhantomData;
2use std::net::SocketAddr;
3use std::task::{Context, Poll};
4use std::time::Duration;
5
6use requiem_rt::spawn;
7use requiem_service::{self as actix, Service, ServiceFactory as ActixServiceFactory};
8use requiem_utils::counter::CounterGuard;
9use futures::future::{err, ok, LocalBoxFuture, Ready};
10use futures::{FutureExt, TryFutureExt};
11use log::error;
12
13use super::Token;
14use crate::socket::{FromStream, StdStream};
15
/// Server message
///
/// Travels, paired with an optional `CounterGuard`, as the request of a
/// `BoxedServerService`. `StreamService` in this file acts only on
/// `Connect`; it answers both shutdown variants with an immediate `Ok(())`.
pub(crate) enum ServerMessage {
    /// New stream
    ///
    /// Carries the standard-library stream that `StreamService` converts
    /// with `FromStream::from_stdstream` before calling the wrapped service.
    Connect(StdStream),
    /// Graceful shutdown
    ///
    /// NOTE(review): the `Duration` is presumably the grace period; confirm
    /// against the code that sends this message.
    Shutdown(Duration),
    /// Force shutdown
    ForceShutdown,
}
25
/// Factory of actix service factories for streams of type `Stream`.
///
/// Implementors must be `Send + Clone + 'static`. This file provides a
/// blanket implementation for every `Fn() -> T` closure that meets those
/// bounds, so a plain closure can be used wherever this trait is required.
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
    /// The actix service factory produced by `create`. It takes no
    /// configuration (`Config = ()`) and its services receive `Stream` as
    /// their request.
    type Factory: actix::ServiceFactory<Config = (), Request = Stream>;

    /// Builds a new `Self::Factory`.
    fn create(&self) -> Self::Factory;
}
31
/// Type-erased service factory used inside the crate as
/// `Box<dyn InternalServiceFactory>`.
///
/// `StreamNewService` implements it on top of a user-supplied
/// `ServiceFactory`.
pub(crate) trait InternalServiceFactory: Send {
    /// Returns the name associated with `token`.
    fn name(&self, token: Token) -> &str;

    /// Returns a boxed copy of this factory.
    fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;

    /// Starts building the services.
    ///
    /// The returned future resolves to a list of `(Token, service)` pairs,
    /// or to `Err(())` when construction fails (the error carries no detail).
    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>;
}
39
/// Boxed, type-erased service as returned by `InternalServiceFactory::create`.
///
/// Its request is an optional `CounterGuard` paired with a `ServerMessage`;
/// it produces no response value and no error detail, and its future is an
/// already-resolved `Ready`.
pub(crate) type BoxedServerService = Box<
    dyn Service<
        Request = (Option<CounterGuard>, ServerMessage),
        Response = (),
        Error = (),
        Future = Ready<Result<(), ()>>,
    >,
>;
48
/// Wrapper around a stream-level service `T`.
///
/// The `Service` implementation for this type (further down in this file)
/// is what lets the wrapped service be driven by `ServerMessage` requests.
pub(crate) struct StreamService<T> {
    /// The wrapped service.
    service: T,
}

impl<T> StreamService<T> {
    /// Wraps `service` without modifying it.
    pub(crate) fn new(service: T) -> Self {
        Self { service }
    }
}
58
59impl<T, I> Service for StreamService<T>
60where
61    T: Service<Request = I>,
62    T::Future: 'static,
63    T::Error: 'static,
64    I: FromStream,
65{
66    type Request = (Option<CounterGuard>, ServerMessage);
67    type Response = ();
68    type Error = ();
69    type Future = Ready<Result<(), ()>>;
70
71    fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
72        self.service.poll_ready(ctx).map_err(|_| ())
73    }
74
75    fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
76        match req {
77            ServerMessage::Connect(stream) => {
78                let stream = FromStream::from_stdstream(stream).map_err(|e| {
79                    error!("Can not convert to an async tcp stream: {}", e);
80                });
81
82                if let Ok(stream) = stream {
83                    let f = self.service.call(stream);
84                    spawn(async move {
85                        let _ = f.await;
86                        drop(guard);
87                    });
88                    ok(())
89                } else {
90                    err(())
91                }
92            }
93            _ => ok(()),
94        }
95    }
96}
97
/// Adapter that exposes a user-supplied `ServiceFactory<Io>` through the
/// type-erased `InternalServiceFactory` trait.
pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
    /// Name returned by `InternalServiceFactory::name`.
    name: String,
    /// The user-supplied factory.
    inner: F,
    /// Token paired with the service produced by `InternalServiceFactory::create`.
    token: Token,
    /// Socket address given at construction; it is stored and copied by
    /// `clone_factory` but not otherwise read in this file.
    addr: SocketAddr,
    /// Marker tying the struct to the stream type `Io` without storing one.
    _t: PhantomData<Io>,
}
105
106impl<F, Io> StreamNewService<F, Io>
107where
108    F: ServiceFactory<Io>,
109    Io: FromStream + Send + 'static,
110{
111    pub(crate) fn create(
112        name: String,
113        token: Token,
114        inner: F,
115        addr: SocketAddr,
116    ) -> Box<dyn InternalServiceFactory> {
117        Box::new(Self {
118            name,
119            token,
120            inner,
121            addr,
122            _t: PhantomData,
123        })
124    }
125}
126
127impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
128where
129    F: ServiceFactory<Io>,
130    Io: FromStream + Send + 'static,
131{
132    fn name(&self, _: Token) -> &str {
133        &self.name
134    }
135
136    fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
137        Box::new(Self {
138            name: self.name.clone(),
139            inner: self.inner.clone(),
140            token: self.token,
141            addr: self.addr,
142            _t: PhantomData,
143        })
144    }
145
146    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
147        let token = self.token;
148        self.inner
149            .create()
150            .new_service(())
151            .map_err(|_| ())
152            .map_ok(move |inner| {
153                let service: BoxedServerService = Box::new(StreamService::new(inner));
154                vec![(token, service)]
155            })
156            .boxed_local()
157    }
158}
159
160impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
161    fn name(&self, token: Token) -> &str {
162        self.as_ref().name(token)
163    }
164
165    fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
166        self.as_ref().clone_factory()
167    }
168
169    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
170        self.as_ref().create()
171    }
172}
173
174impl<F, T, I> ServiceFactory<I> for F
175where
176    F: Fn() -> T + Send + Clone + 'static,
177    T: actix::ServiceFactory<Config = (), Request = I>,
178    I: FromStream,
179{
180    type Factory = T;
181
182    fn create(&self) -> T {
183        (self)()
184    }
185}