requiem_server/
service.rs1use std::marker::PhantomData;
2use std::net::SocketAddr;
3use std::task::{Context, Poll};
4use std::time::Duration;
5
6use requiem_rt::spawn;
7use requiem_service::{self as actix, Service, ServiceFactory as ActixServiceFactory};
8use requiem_utils::counter::CounterGuard;
9use futures::future::{err, ok, LocalBoxFuture, Ready};
10use futures::{FutureExt, TryFutureExt};
11use log::error;
12
13use super::Token;
14use crate::socket::{FromStream, StdStream};
15
16pub(crate) enum ServerMessage {
18 Connect(StdStream),
20 Shutdown(Duration),
22 ForceShutdown,
24}
25
26pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
27 type Factory: actix::ServiceFactory<Config = (), Request = Stream>;
28
29 fn create(&self) -> Self::Factory;
30}
31
32pub(crate) trait InternalServiceFactory: Send {
33 fn name(&self, token: Token) -> &str;
34
35 fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
36
37 fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>;
38}
39
40pub(crate) type BoxedServerService = Box<
41 dyn Service<
42 Request = (Option<CounterGuard>, ServerMessage),
43 Response = (),
44 Error = (),
45 Future = Ready<Result<(), ()>>,
46 >,
47>;
48
49pub(crate) struct StreamService<T> {
50 service: T,
51}
52
53impl<T> StreamService<T> {
54 pub(crate) fn new(service: T) -> Self {
55 StreamService { service }
56 }
57}
58
59impl<T, I> Service for StreamService<T>
60where
61 T: Service<Request = I>,
62 T::Future: 'static,
63 T::Error: 'static,
64 I: FromStream,
65{
66 type Request = (Option<CounterGuard>, ServerMessage);
67 type Response = ();
68 type Error = ();
69 type Future = Ready<Result<(), ()>>;
70
71 fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
72 self.service.poll_ready(ctx).map_err(|_| ())
73 }
74
75 fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
76 match req {
77 ServerMessage::Connect(stream) => {
78 let stream = FromStream::from_stdstream(stream).map_err(|e| {
79 error!("Can not convert to an async tcp stream: {}", e);
80 });
81
82 if let Ok(stream) = stream {
83 let f = self.service.call(stream);
84 spawn(async move {
85 let _ = f.await;
86 drop(guard);
87 });
88 ok(())
89 } else {
90 err(())
91 }
92 }
93 _ => ok(()),
94 }
95 }
96}
97
98pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
99 name: String,
100 inner: F,
101 token: Token,
102 addr: SocketAddr,
103 _t: PhantomData<Io>,
104}
105
106impl<F, Io> StreamNewService<F, Io>
107where
108 F: ServiceFactory<Io>,
109 Io: FromStream + Send + 'static,
110{
111 pub(crate) fn create(
112 name: String,
113 token: Token,
114 inner: F,
115 addr: SocketAddr,
116 ) -> Box<dyn InternalServiceFactory> {
117 Box::new(Self {
118 name,
119 token,
120 inner,
121 addr,
122 _t: PhantomData,
123 })
124 }
125}
126
127impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
128where
129 F: ServiceFactory<Io>,
130 Io: FromStream + Send + 'static,
131{
132 fn name(&self, _: Token) -> &str {
133 &self.name
134 }
135
136 fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
137 Box::new(Self {
138 name: self.name.clone(),
139 inner: self.inner.clone(),
140 token: self.token,
141 addr: self.addr,
142 _t: PhantomData,
143 })
144 }
145
146 fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
147 let token = self.token;
148 self.inner
149 .create()
150 .new_service(())
151 .map_err(|_| ())
152 .map_ok(move |inner| {
153 let service: BoxedServerService = Box::new(StreamService::new(inner));
154 vec![(token, service)]
155 })
156 .boxed_local()
157 }
158}
159
160impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
161 fn name(&self, token: Token) -> &str {
162 self.as_ref().name(token)
163 }
164
165 fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
166 self.as_ref().clone_factory()
167 }
168
169 fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
170 self.as_ref().create()
171 }
172}
173
174impl<F, T, I> ServiceFactory<I> for F
175where
176 F: Fn() -> T + Send + Clone + 'static,
177 T: actix::ServiceFactory<Config = (), Request = I>,
178 I: FromStream,
179{
180 type Factory = T;
181
182 fn create(&self) -> T {
183 (self)()
184 }
185}