1use std::{fmt, future::Future, marker::PhantomData, sync::Arc};
2
3use ntex_bytes::PoolId;
4use ntex_net::Io;
5use ntex_service::{boxed, Service, ServiceCtx, ServiceFactory};
6use ntex_util::future::{BoxFuture, Ready};
7
8use super::{socket::Stream, Config, Token};
9
/// Boxed service factory that handles [`Io`] and uses `()` for every other
/// type parameter.
pub(super) type BoxServerService = boxed::BoxServiceFactory<(), Io, (), (), ()>;
/// Boxed [`FactoryService`] trait object.
pub(crate) type FactoryServiceType = Box<dyn FactoryService>;
12
/// A named, boxed service factory together with the tokens it is attached to.
#[derive(Debug)]
pub(crate) struct NetService {
    /// Service name.
    pub(crate) name: Arc<str>,
    /// `(token, tag)` pairs associated with this service.
    pub(crate) tokens: Vec<(Token, &'static str)>,
    /// Type-erased factory for the service.
    pub(crate) factory: BoxServerService,
    /// Pool id for this service.
    pub(crate) pool: PoolId,
}
20
/// Object-safe, `Send` producer of [`NetService`] values.
pub(crate) trait FactoryService: Send {
    /// Name for the given token; the default implementation returns `""`.
    fn name(&self, _: Token) -> &str {
        ""
    }

    /// Sets the tag for the given token; the default implementation does
    /// nothing.
    fn set_tag(&mut self, _: Token, _: &'static str) {}

    /// Returns a boxed [`FactoryService`] derived from `self`.
    fn clone_factory(&self) -> Box<dyn FactoryService>;

    /// Returns a boxed future resolving to the created services, or to
    /// `Err(())` on failure.
    fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>>;
}
32
33pub(crate) fn create_boxed_factory<S>(name: String, factory: S) -> BoxServerService
34where
35 S: ServiceFactory<Io> + 'static,
36{
37 boxed::factory(ServerServiceFactory {
38 name: Arc::from(name),
39 factory,
40 })
41}
42
43pub(crate) fn create_factory_service<F, R>(
44 name: String,
45 tokens: Vec<(Token, &'static str)>,
46 factory: F,
47) -> Box<dyn FactoryService>
48where
49 F: Fn(Config) -> R + Send + Clone + 'static,
50 R: ServiceFactory<Io> + 'static,
51{
52 Box::new(Factory {
53 tokens,
54 name: name.clone(),
55 factory: move |cfg| {
56 Ready::Ok::<_, &'static str>(create_boxed_factory(name.clone(), (factory)(cfg)))
57 },
58 _t: PhantomData,
59 })
60}
61
/// Closure-backed [`FactoryService`] state.
///
/// `R` and `E` are carried only as a `PhantomData` marker; no value of either
/// type is stored.
struct Factory<F, R, E> {
    /// Service name.
    name: String,
    /// `(token, tag)` pairs for this service.
    tokens: Vec<(Token, &'static str)>,
    /// User supplied factory function.
    factory: F,
    /// Marker tying `R` and `E` to the struct.
    _t: PhantomData<(R, E)>,
}
68
69impl<F, R, E> FactoryService for Factory<F, R, E>
70where
71 F: Fn(Config) -> R + Send + Clone + 'static,
72 R: Future<Output = Result<BoxServerService, E>> + 'static,
73 E: fmt::Display + 'static,
74{
75 fn name(&self, _: Token) -> &str {
76 &self.name
77 }
78
79 fn clone_factory(&self) -> Box<dyn FactoryService> {
80 Box::new(Self {
81 name: self.name.clone(),
82 tokens: self.tokens.clone(),
83 factory: self.factory.clone(),
84 _t: PhantomData,
85 })
86 }
87
88 fn set_tag(&mut self, token: Token, tag: &'static str) {
89 for item in &mut self.tokens {
90 if item.0 == token {
91 item.1 = tag;
92 }
93 }
94 }
95
96 fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
97 let cfg = Config::default();
98 let name = self.name.clone();
99 let mut tokens = self.tokens.clone();
100 let factory_fut = (self.factory)(cfg.clone());
101
102 Box::pin(async move {
103 let factory = factory_fut.await.map_err(|_| {
104 log::error!("Cannot create {:?} service", name);
105 })?;
106 if let Some(tag) = cfg.get_tag() {
107 for item in &mut tokens {
108 item.1 = tag;
109 }
110 }
111
112 Ok(vec![NetService {
113 tokens,
114 factory,
115 name: Arc::from(name),
116 pool: cfg.get_pool_id(),
117 }])
118 })
119 }
120}
121
/// Pairs a service factory with the name used in its log messages.
struct ServerServiceFactory<S> {
    /// Service name.
    name: Arc<str>,
    /// Wrapped service factory.
    factory: S,
}
126
127impl<S> ServiceFactory<Io> for ServerServiceFactory<S>
128where
129 S: ServiceFactory<Io>,
130{
131 type Response = ();
132 type Error = ();
133 type Service = ServerService<S::Service>;
134 type InitError = ();
135
136 async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
137 self.factory
138 .create(())
139 .await
140 .map(|inner| ServerService { inner })
141 .map_err(|_| log::error!("Cannot construct {:?} service", self.name))
142 }
143}
144
/// Wrapper around a service that maps its response and error to `()`.
struct ServerService<S> {
    /// Wrapped service.
    inner: S,
}
148
149impl<S> Service<Io> for ServerService<S>
150where
151 S: Service<Io>,
152{
153 type Response = ();
154 type Error = ();
155
156 async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
157 ctx.ready(&self.inner).await.map_err(|_| ())
158 }
159
160 async fn call(&self, req: Io, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
161 ctx.call(&self.inner, req).await.map(|_| ()).map_err(|_| ())
162 }
163
164 ntex_service::forward_shutdown!(inner);
165}
166
// SAFETY: `R` and `E` occur in `Factory` only inside its `PhantomData` marker,
// so no value of either type is stored; the only generic field is `F`, which
// this impl requires to be `Send`.
// NOTE(review): this also relies on `Token` being `Send` — confirm.
unsafe impl<F, R, E> Send for Factory<F, R, E> where F: Send {}
170
/// Object-safe callback that returns a boxed future.
pub(crate) trait OnWorkerStart {
    /// Returns a boxed, `Send` callback derived from `self`.
    fn clone_fn(&self) -> Box<dyn OnWorkerStart + Send>;

    /// Returns a boxed future resolving to `Ok(())` or `Err(())`.
    fn run(&self) -> BoxFuture<'static, Result<(), ()>>;
}
176
/// Holds a callback `f`; `R` and `E` are carried only as a `PhantomData`
/// marker.
pub(super) struct OnWorkerStartWrapper<F, R, E> {
    /// The wrapped callback.
    pub(super) f: F,
    /// Marker tying `R` and `E` to the struct.
    pub(super) _t: PhantomData<(R, E)>,
}
181
// SAFETY: the only data stored in `OnWorkerStartWrapper` is `f: F`, which this
// impl requires to be `Send`; `R` and `E` occur only inside `PhantomData`, so
// no value of either type is held by the struct.
unsafe impl<F, R, E> Send for OnWorkerStartWrapper<F, R, E> where F: Send {}
183
184impl<F, R, E> OnWorkerStartWrapper<F, R, E>
185where
186 F: Fn() -> R + Send + Clone + 'static,
187 R: Future<Output = Result<(), E>> + 'static,
188 E: fmt::Display + 'static,
189{
190 pub(super) fn create(f: F) -> Box<dyn OnWorkerStart + Send> {
191 Box::new(Self { f, _t: PhantomData })
192 }
193}
194
195impl<F, R, E> OnWorkerStart for OnWorkerStartWrapper<F, R, E>
196where
197 F: Fn() -> R + Send + Clone + 'static,
198 R: Future<Output = Result<(), E>> + 'static,
199 E: fmt::Display + 'static,
200{
201 fn clone_fn(&self) -> Box<dyn OnWorkerStart + Send> {
202 Box::new(Self {
203 f: self.f.clone(),
204 _t: PhantomData,
205 })
206 }
207
208 fn run(&self) -> BoxFuture<'static, Result<(), ()>> {
209 let f = self.f.clone();
210 Box::pin(async move {
211 (f)().await.map_err(|e| {
212 log::error!("On worker start callback failed: {}", e);
213 })
214 })
215 }
216}
217
/// Object-safe callback that takes a name and a [`Stream`] and returns a
/// boxed future.
pub(crate) trait OnAccept {
    /// Returns a boxed, `Send` callback derived from `self`.
    fn clone_fn(&self) -> Box<dyn OnAccept + Send>;

    /// Returns a boxed future resolving to a [`Stream`] or to `Err(())`.
    fn run(&self, name: Arc<str>, stream: Stream)
        -> BoxFuture<'static, Result<Stream, ()>>;
}
224
/// Holds a callback `f`; `R` and `E` are carried only as a `PhantomData`
/// marker.
pub(super) struct OnAcceptWrapper<F, R, E> {
    /// The wrapped callback.
    pub(super) f: F,
    /// Marker tying `R` and `E` to the struct.
    pub(super) _t: PhantomData<(R, E)>,
}
229
// SAFETY: the only data stored in `OnAcceptWrapper` is `f: F`, which this impl
// requires to be `Send`; `R` and `E` occur only inside `PhantomData`, so no
// value of either type is held by the struct.
unsafe impl<F, R, E> Send for OnAcceptWrapper<F, R, E> where F: Send {}
231
232impl<F, R, E> OnAcceptWrapper<F, R, E>
233where
234 F: Fn(Arc<str>, Stream) -> R + Send + Clone + 'static,
235 R: Future<Output = Result<Stream, E>> + 'static,
236 E: fmt::Display + 'static,
237{
238 pub(super) fn create(f: F) -> Box<dyn OnAccept + Send> {
239 Box::new(Self { f, _t: PhantomData })
240 }
241}
242
243impl<F, R, E> OnAccept for OnAcceptWrapper<F, R, E>
244where
245 F: Fn(Arc<str>, Stream) -> R + Send + Clone + 'static,
246 R: Future<Output = Result<Stream, E>> + 'static,
247 E: fmt::Display + 'static,
248{
249 fn clone_fn(&self) -> Box<dyn OnAccept + Send> {
250 Box::new(Self {
251 f: self.f.clone(),
252 _t: PhantomData,
253 })
254 }
255
256 fn run(
257 &self,
258 name: Arc<str>,
259 stream: Stream,
260 ) -> BoxFuture<'static, Result<Stream, ()>> {
261 let f = self.f.clone();
262 Box::pin(async move {
263 (f)(name, stream).await.map_err(|e| {
264 log::error!("On accept callback failed: {}", e);
265 })
266 })
267 }
268}