ntex_server/net/
factory.rs

1use std::{fmt, future::Future, marker::PhantomData, sync::Arc};
2
3use ntex_bytes::PoolId;
4use ntex_net::Io;
5use ntex_service::{boxed, Service, ServiceCtx, ServiceFactory};
6use ntex_util::future::{BoxFuture, Ready};
7
8use super::{socket::Stream, Config, Token};
9
/// Type-erased (boxed) service factory for [`Io`] requests; every other type
/// parameter of the boxed factory is `()`.
pub(super) type BoxServerService = boxed::BoxServiceFactory<(), Io, (), (), ()>;
/// Owned trait object holding any [`FactoryService`] implementation.
pub(crate) type FactoryServiceType = Box<dyn FactoryService>;
12
/// Result of [`FactoryService::create`]: a boxed service factory bundled with
/// its name, its tokens and the memory pool it should use.
#[derive(Debug)]
pub(crate) struct NetService {
    /// Name of the service.
    pub(crate) name: Arc<str>,
    /// Tokens handled by this service, each paired with its tag.
    pub(crate) tokens: Vec<(Token, &'static str)>,
    /// Boxed factory that builds the actual `Io` service.
    pub(crate) factory: BoxServerService,
    /// Memory pool id (filled from the service `Config` when created in this file).
    pub(crate) pool: PoolId,
}
20
/// Interface of a sendable, clonable service factory that can asynchronously
/// produce [`NetService`] values.
pub(crate) trait FactoryService: Send {
    /// Returns the service name associated with the given token.
    ///
    /// The default implementation returns an empty string.
    fn name(&self, _: Token) -> &str {
        ""
    }

    /// Assigns a tag to the given token.
    ///
    /// The default implementation does nothing.
    fn set_tag(&mut self, _: Token, _: &'static str) {}

    /// Returns a boxed copy of this factory.
    fn clone_factory(&self) -> Box<dyn FactoryService>;

    /// Returns a future that resolves to the created services, or to `Err(())`
    /// when creation fails.
    fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>>;
}
32
33pub(crate) fn create_boxed_factory<S>(name: String, factory: S) -> BoxServerService
34where
35    S: ServiceFactory<Io> + 'static,
36{
37    boxed::factory(ServerServiceFactory {
38        name: Arc::from(name),
39        factory,
40    })
41}
42
43pub(crate) fn create_factory_service<F, R>(
44    name: String,
45    tokens: Vec<(Token, &'static str)>,
46    factory: F,
47) -> Box<dyn FactoryService>
48where
49    F: Fn(Config) -> R + Send + Clone + 'static,
50    R: ServiceFactory<Io> + 'static,
51{
52    Box::new(Factory {
53        tokens,
54        name: name.clone(),
55        factory: move |cfg| {
56            Ready::Ok::<_, &'static str>(create_boxed_factory(name.clone(), (factory)(cfg)))
57        },
58        _t: PhantomData,
59    })
60}
61
62struct Factory<F, R, E> {
63    name: String,
64    tokens: Vec<(Token, &'static str)>,
65    factory: F,
66    _t: PhantomData<(R, E)>,
67}
68
69impl<F, R, E> FactoryService for Factory<F, R, E>
70where
71    F: Fn(Config) -> R + Send + Clone + 'static,
72    R: Future<Output = Result<BoxServerService, E>> + 'static,
73    E: fmt::Display + 'static,
74{
75    fn name(&self, _: Token) -> &str {
76        &self.name
77    }
78
79    fn clone_factory(&self) -> Box<dyn FactoryService> {
80        Box::new(Self {
81            name: self.name.clone(),
82            tokens: self.tokens.clone(),
83            factory: self.factory.clone(),
84            _t: PhantomData,
85        })
86    }
87
88    fn set_tag(&mut self, token: Token, tag: &'static str) {
89        for item in &mut self.tokens {
90            if item.0 == token {
91                item.1 = tag;
92            }
93        }
94    }
95
96    fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
97        let cfg = Config::default();
98        let name = self.name.clone();
99        let mut tokens = self.tokens.clone();
100        let factory_fut = (self.factory)(cfg.clone());
101
102        Box::pin(async move {
103            let factory = factory_fut.await.map_err(|_| {
104                log::error!("Cannot create {:?} service", name);
105            })?;
106            if let Some(tag) = cfg.get_tag() {
107                for item in &mut tokens {
108                    item.1 = tag;
109                }
110            }
111
112            Ok(vec![NetService {
113                tokens,
114                factory,
115                name: Arc::from(name),
116                pool: cfg.get_pool_id(),
117            }])
118        })
119    }
120}
121
122struct ServerServiceFactory<S> {
123    name: Arc<str>,
124    factory: S,
125}
126
127impl<S> ServiceFactory<Io> for ServerServiceFactory<S>
128where
129    S: ServiceFactory<Io>,
130{
131    type Response = ();
132    type Error = ();
133    type Service = ServerService<S::Service>;
134    type InitError = ();
135
136    async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
137        self.factory
138            .create(())
139            .await
140            .map(|inner| ServerService { inner })
141            .map_err(|_| log::error!("Cannot construct {:?} service", self.name))
142    }
143}
144
145struct ServerService<S> {
146    inner: S,
147}
148
149impl<S> Service<Io> for ServerService<S>
150where
151    S: Service<Io>,
152{
153    type Response = ();
154    type Error = ();
155
156    async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
157        ctx.ready(&self.inner).await.map_err(|_| ())
158    }
159
160    async fn call(&self, req: Io, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
161        ctx.call(&self.inner, req).await.map(|_| ()).map_err(|_| ())
162    }
163
164    ntex_service::forward_shutdown!(inner);
165}
166
// SAFETY: `Send` cannot be provided automatically because of the `E` and `R`
// params, but `R` is always executed in one thread and never leaves it.
unsafe impl<F, R, E> Send for Factory<F, R, E> where F: Send {}
170
/// Clonable "on worker start" callback.
pub(crate) trait OnWorkerStart {
    /// Returns a boxed, sendable copy of this callback.
    fn clone_fn(&self) -> Box<dyn OnWorkerStart + Send>;

    /// Starts the callback and returns a future resolving to `Ok(())` on
    /// success or `Err(())` on failure.
    fn run(&self) -> BoxFuture<'static, Result<(), ()>>;
}
176
177pub(super) struct OnWorkerStartWrapper<F, R, E> {
178    pub(super) f: F,
179    pub(super) _t: PhantomData<(R, E)>,
180}
181
182unsafe impl<F, R, E> Send for OnWorkerStartWrapper<F, R, E> where F: Send {}
183
184impl<F, R, E> OnWorkerStartWrapper<F, R, E>
185where
186    F: Fn() -> R + Send + Clone + 'static,
187    R: Future<Output = Result<(), E>> + 'static,
188    E: fmt::Display + 'static,
189{
190    pub(super) fn create(f: F) -> Box<dyn OnWorkerStart + Send> {
191        Box::new(Self { f, _t: PhantomData })
192    }
193}
194
195impl<F, R, E> OnWorkerStart for OnWorkerStartWrapper<F, R, E>
196where
197    F: Fn() -> R + Send + Clone + 'static,
198    R: Future<Output = Result<(), E>> + 'static,
199    E: fmt::Display + 'static,
200{
201    fn clone_fn(&self) -> Box<dyn OnWorkerStart + Send> {
202        Box::new(Self {
203            f: self.f.clone(),
204            _t: PhantomData,
205        })
206    }
207
208    fn run(&self) -> BoxFuture<'static, Result<(), ()>> {
209        let f = self.f.clone();
210        Box::pin(async move {
211            (f)().await.map_err(|e| {
212                log::error!("On worker start callback failed: {}", e);
213            })
214        })
215    }
216}
217
/// Clonable "on accept" callback that receives a name and a [`Stream`].
pub(crate) trait OnAccept {
    /// Returns a boxed, sendable copy of this callback.
    fn clone_fn(&self) -> Box<dyn OnAccept + Send>;

    /// Starts the callback for `stream` and returns a future resolving to a
    /// `Stream` on success or `Err(())` on failure.
    ///
    /// NOTE(review): `name` is presumably the name of the service that
    /// accepted the stream - confirm against the caller.
    fn run(&self, name: Arc<str>, stream: Stream)
        -> BoxFuture<'static, Result<Stream, ()>>;
}
224
225pub(super) struct OnAcceptWrapper<F, R, E> {
226    pub(super) f: F,
227    pub(super) _t: PhantomData<(R, E)>,
228}
229
230unsafe impl<F, R, E> Send for OnAcceptWrapper<F, R, E> where F: Send {}
231
232impl<F, R, E> OnAcceptWrapper<F, R, E>
233where
234    F: Fn(Arc<str>, Stream) -> R + Send + Clone + 'static,
235    R: Future<Output = Result<Stream, E>> + 'static,
236    E: fmt::Display + 'static,
237{
238    pub(super) fn create(f: F) -> Box<dyn OnAccept + Send> {
239        Box::new(Self { f, _t: PhantomData })
240    }
241}
242
243impl<F, R, E> OnAccept for OnAcceptWrapper<F, R, E>
244where
245    F: Fn(Arc<str>, Stream) -> R + Send + Clone + 'static,
246    R: Future<Output = Result<Stream, E>> + 'static,
247    E: fmt::Display + 'static,
248{
249    fn clone_fn(&self) -> Box<dyn OnAccept + Send> {
250        Box::new(Self {
251            f: self.f.clone(),
252            _t: PhantomData,
253        })
254    }
255
256    fn run(
257        &self,
258        name: Arc<str>,
259        stream: Stream,
260    ) -> BoxFuture<'static, Result<Stream, ()>> {
261        let f = self.f.clone();
262        Box::pin(async move {
263            (f)(name, stream).await.map_err(|e| {
264                log::error!("On accept callback failed: {}", e);
265            })
266        })
267    }
268}