ntex_server/net/
config.rs

1use std::{cell::Cell, cell::RefCell, fmt, future::Future, io, marker, mem, net, rc::Rc};
2
3use ntex_bytes::PoolId;
4use ntex_net::Io;
5use ntex_service::{IntoServiceFactory, ServiceFactory};
6use ntex_util::{future::BoxFuture, future::Ready, HashMap};
7
8use super::factory::{
9    self, BoxServerService, FactoryService, FactoryServiceType, NetService,
10};
11use super::{builder::bind_addr, socket::Listener, Token};
12
13#[derive(Clone, Debug)]
14pub struct Config(Rc<InnerServiceConfig>);
15
16#[derive(Debug)]
17pub(super) struct InnerServiceConfig {
18    pub(super) pool: Cell<PoolId>,
19    pub(super) tag: Cell<Option<&'static str>>,
20}
21
22impl Default for Config {
23    fn default() -> Self {
24        Self(Rc::new(InnerServiceConfig {
25            pool: Cell::new(PoolId::DEFAULT),
26            tag: Cell::new(None),
27        }))
28    }
29}
30
31impl Config {
32    /// Set memory pool for the service.
33    ///
34    /// Use specified memory pool for memory allocations.
35    pub fn memory_pool(&self, id: PoolId) -> &Self {
36        self.0.pool.set(id);
37        self
38    }
39
40    /// Set io tag for the service.
41    pub fn tag(&self, tag: &'static str) -> &Self {
42        self.0.tag.set(Some(tag));
43        self
44    }
45
46    pub(super) fn get_pool_id(&self) -> PoolId {
47        self.0.pool.get()
48    }
49
50    pub(super) fn get_tag(&self) -> Option<&'static str> {
51        self.0.tag.get()
52    }
53}
54
55#[derive(Clone, Debug)]
56pub struct ServiceConfig(pub(super) Rc<RefCell<ServiceConfigInner>>);
57
58#[derive(Debug)]
59struct Socket {
60    name: String,
61    sockets: Vec<(Token, Listener, &'static str)>,
62}
63
64pub(super) struct ServiceConfigInner {
65    token: Token,
66    apply: Option<Box<dyn OnWorkerStart>>,
67    sockets: Vec<Socket>,
68    backlog: i32,
69}
70
71impl fmt::Debug for ServiceConfigInner {
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        f.debug_struct("ServiceConfigInner")
74            .field("token", &self.token)
75            .field("backlog", &self.backlog)
76            .field("sockets", &self.sockets)
77            .finish()
78    }
79}
80
81impl ServiceConfig {
82    pub(super) fn new(token: Token, backlog: i32) -> Self {
83        ServiceConfig(Rc::new(RefCell::new(ServiceConfigInner {
84            token,
85            backlog,
86            sockets: Vec::new(),
87            apply: Some(OnWorkerStartWrapper::create(|_| {
88                not_configured();
89                Ready::Ok::<_, &str>(())
90            })),
91        })))
92    }
93
94    /// Add new service to the server.
95    pub fn bind<U, N: AsRef<str>>(&self, name: N, addr: U) -> io::Result<&Self>
96    where
97        U: net::ToSocketAddrs,
98    {
99        let mut inner = self.0.borrow_mut();
100
101        let sockets = bind_addr(addr, inner.backlog)?;
102        let socket = Socket {
103            name: name.as_ref().to_string(),
104            sockets: sockets
105                .into_iter()
106                .map(|lst| (inner.token.next(), Listener::from_tcp(lst), ""))
107                .collect(),
108        };
109        inner.sockets.push(socket);
110
111        Ok(self)
112    }
113
114    /// Add new service to the server.
115    pub fn listen<N: AsRef<str>>(&self, name: N, lst: net::TcpListener) -> &Self {
116        let mut inner = self.0.borrow_mut();
117        let socket = Socket {
118            name: name.as_ref().to_string(),
119            sockets: vec![(inner.token.next(), Listener::from_tcp(lst), "")],
120        };
121        inner.sockets.push(socket);
122
123        self
124    }
125
126    /// Set io tag for configured service.
127    pub fn set_tag<N: AsRef<str>>(&self, name: N, tag: &'static str) -> &Self {
128        let mut inner = self.0.borrow_mut();
129        for sock in &mut inner.sockets {
130            if sock.name == name.as_ref() {
131                for item in &mut sock.sockets {
132                    item.2 = tag;
133                }
134            }
135        }
136        self
137    }
138
139    /// Register async service configuration function.
140    ///
141    /// This function get called during worker runtime configuration stage.
142    /// It get executed in the worker thread.
143    pub fn on_worker_start<F, R, E>(&self, f: F) -> &Self
144    where
145        F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
146        R: Future<Output = Result<(), E>> + 'static,
147        E: fmt::Display + 'static,
148    {
149        self.0.borrow_mut().apply = Some(OnWorkerStartWrapper::create(f));
150        self
151    }
152
153    pub(super) fn into_factory(
154        self,
155    ) -> (Token, Vec<(Token, String, Listener)>, FactoryServiceType) {
156        let mut inner = self.0.borrow_mut();
157
158        let mut sockets = Vec::new();
159        let mut names = HashMap::default();
160        for (idx, s) in mem::take(&mut inner.sockets).into_iter().enumerate() {
161            names.insert(
162                s.name.clone(),
163                Entry {
164                    idx,
165                    pool: PoolId::DEFAULT,
166                    name: s.name.clone(),
167                    tokens: s
168                        .sockets
169                        .iter()
170                        .map(|(token, _, tag)| (*token, *tag))
171                        .collect(),
172                },
173            );
174
175            sockets.extend(
176                s.sockets
177                    .into_iter()
178                    .map(|(token, lst, _)| (token, s.name.clone(), lst)),
179            );
180        }
181
182        (
183            inner.token,
184            sockets,
185            Box::new(ConfiguredService {
186                rt: inner.apply.take().unwrap(),
187                names,
188            }),
189        )
190    }
191}
192
193struct ConfiguredService {
194    rt: Box<dyn OnWorkerStart>,
195    names: HashMap<String, Entry>,
196}
197
198impl FactoryService for ConfiguredService {
199    fn clone_factory(&self) -> FactoryServiceType {
200        Box::new(Self {
201            rt: self.rt.clone(),
202            names: self.names.clone(),
203        })
204    }
205
206    fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
207        // configure services
208        let rt = ServiceRuntime::new(self.names.clone());
209        let cfg_fut = self.rt.run(ServiceRuntime(rt.0.clone()));
210
211        // construct services
212        Box::pin(async move {
213            cfg_fut.await?;
214            rt.validate();
215
216            let names = mem::take(&mut rt.0.borrow_mut().names);
217            let mut services = mem::take(&mut rt.0.borrow_mut().services);
218
219            let mut res = Vec::new();
220            while let Some(svc) = services.pop() {
221                if let Some(svc) = svc {
222                    for entry in names.values() {
223                        if entry.idx == services.len() {
224                            res.push(NetService {
225                                name: std::sync::Arc::from(entry.name.clone()),
226                                pool: entry.pool,
227                                tokens: entry.tokens.clone(),
228                                factory: svc,
229                            });
230                            break;
231                        }
232                    }
233                }
234            }
235            Ok(res)
236        })
237    }
238}
239
240fn not_configured() {
241    log::error!("Service is not configured");
242}
243
244pub struct ServiceRuntime(Rc<RefCell<ServiceRuntimeInner>>);
245
246#[derive(Debug, Clone)]
247struct Entry {
248    idx: usize,
249    pool: PoolId,
250    name: String,
251    tokens: Vec<(Token, &'static str)>,
252}
253
254struct ServiceRuntimeInner {
255    names: HashMap<String, Entry>,
256    services: Vec<Option<BoxServerService>>,
257}
258
259impl fmt::Debug for ServiceRuntime {
260    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261        let inner = self.0.borrow();
262        f.debug_struct("ServiceRuntimer")
263            .field("names", &inner.names)
264            .field("services", &inner.services)
265            .finish()
266    }
267}
268
269impl ServiceRuntime {
270    fn new(names: HashMap<String, Entry>) -> Self {
271        ServiceRuntime(Rc::new(RefCell::new(ServiceRuntimeInner {
272            services: (0..names.len()).map(|_| None).collect(),
273            names,
274        })))
275    }
276
277    fn validate(&self) {
278        let inner = self.0.as_ref().borrow();
279        for (name, item) in &inner.names {
280            if inner.services[item.idx].is_none() {
281                log::error!("Service {:?} is not configured", name);
282            }
283        }
284    }
285
286    /// Register service.
287    ///
288    /// Name of the service must be registered during configuration stage with
289    /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
290    pub fn service<T, F>(&self, name: &str, service: F)
291    where
292        F: IntoServiceFactory<T, Io>,
293        T: ServiceFactory<Io> + 'static,
294        T::Service: 'static,
295        T::InitError: fmt::Debug,
296    {
297        self.service_in(name, PoolId::P0, service)
298    }
299
300    /// Register service with memory pool.
301    ///
302    /// Name of the service must be registered during configuration stage with
303    /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
304    pub fn service_in<T, F>(&self, name: &str, pool: PoolId, service: F)
305    where
306        F: IntoServiceFactory<T, Io>,
307        T: ServiceFactory<Io> + 'static,
308        T::Service: 'static,
309        T::InitError: fmt::Debug,
310    {
311        let mut inner = self.0.borrow_mut();
312        if let Some(entry) = inner.names.get_mut(name) {
313            let idx = entry.idx;
314            entry.pool = pool;
315            inner.services[idx] = Some(factory::create_boxed_factory(
316                name.to_string(),
317                service.into_factory(),
318            ));
319        } else {
320            panic!("Unknown service: {:?}", name);
321        }
322    }
323}
324
325trait OnWorkerStart: Send {
326    fn clone(&self) -> Box<dyn OnWorkerStart>;
327
328    fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>>;
329}
330
331struct OnWorkerStartWrapper<F, R, E> {
332    pub(super) f: F,
333    pub(super) _t: marker::PhantomData<(R, E)>,
334}
335
336impl<F, R, E> OnWorkerStartWrapper<F, R, E>
337where
338    F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
339    R: Future<Output = Result<(), E>> + 'static,
340    E: fmt::Display + 'static,
341{
342    pub(super) fn create(f: F) -> Box<dyn OnWorkerStart + Send> {
343        Box::new(Self {
344            f,
345            _t: marker::PhantomData,
346        })
347    }
348}
349
350// SAFETY: Send cannot be provided authomatically because of R param
351// but R always get executed in one thread and never leave it
352unsafe impl<F, R, E> Send for OnWorkerStartWrapper<F, R, E> where F: Send {}
353
354impl<F, R, E> OnWorkerStart for OnWorkerStartWrapper<F, R, E>
355where
356    F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
357    R: Future<Output = Result<(), E>> + 'static,
358    E: fmt::Display + 'static,
359{
360    fn clone(&self) -> Box<dyn OnWorkerStart> {
361        Box::new(Self {
362            f: self.f.clone(),
363            _t: marker::PhantomData,
364        })
365    }
366
367    fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>> {
368        let f = self.f.clone();
369        Box::pin(async move {
370            (f)(rt).await.map_err(|e| {
371                log::error!("On worker start callback failed: {}", e);
372            })
373        })
374    }
375}