Skip to main content

ntex_server/net/
config.rs

1use std::{cell::Cell, cell::RefCell, fmt, io, marker, mem, net, rc::Rc};
2
3use ntex_io::Io;
4use ntex_service::{IntoServiceFactory, ServiceFactory, cfg::SharedCfg};
5use ntex_util::{HashMap, future::BoxFuture, future::Ready};
6
7use super::factory::{
8    self, BoxServerService, FactoryService, FactoryServiceType, NetService,
9};
10use super::{Token, builder::bind_addr, socket::Listener};
11
/// Cheaply clonable handle to an optional, shared io configuration.
///
/// The state lives behind an `Rc`, so every clone of a `Config` observes
/// the value stored through any other clone.
#[derive(Clone, Debug)]
pub struct Config(Rc<InnerServiceConfig>);
14
/// Shared state behind a `Config` handle.
#[derive(Debug)]
pub(super) struct InnerServiceConfig {
    /// Stored io configuration; `None` until `Config::config()` sets it.
    pub(super) config: Cell<Option<SharedCfg>>,
}
19
20impl Default for Config {
21    fn default() -> Self {
22        Self(Rc::new(InnerServiceConfig {
23            config: Cell::new(None),
24        }))
25    }
26}
27
28impl Config {
29    /// Set io config for the service.
30    pub fn config<T: Into<SharedCfg>>(&self, cfg: T) -> &Self {
31        self.0.config.set(Some(cfg.into()));
32        self
33    }
34
35    pub(super) fn get_config(&self) -> Option<SharedCfg> {
36        self.0.config.get()
37    }
38}
39
/// Builder-style handle used to declare named listeners and worker-start
/// callbacks.
///
/// The state is kept in an `Rc<RefCell<..>>`, so clones share one
/// underlying configuration.
#[derive(Clone, Debug)]
pub struct ServiceConfig(pub(super) Rc<RefCell<ServiceConfigInner>>);
42
/// One named group of listeners created by a single `bind()`/`listen()` call.
#[derive(Debug)]
struct Socket {
    /// Service name given to `bind()`/`listen()`.
    name: String,
    /// Io configuration of the named service.
    config: SharedCfg,
    /// Listeners of this group, each with its token and its own copy of the
    /// io configuration.
    sockets: Vec<(Token, Listener, SharedCfg)>,
}
49
/// Mutable state shared by all clones of a `ServiceConfig`.
pub(super) struct ServiceConfigInner {
    /// Token state; `next()` is called on it for every listener that is added.
    token: Token,
    /// `true` once `on_worker_start()` has replaced the placeholder callback.
    on_start_set: bool,
    /// Callbacks run when the services are created.
    on_start: Vec<Box<dyn OnWorkerStart>>,
    /// Listener groups in registration order.
    sockets: Vec<Socket>,
    /// Backlog value handed to `bind_addr()` by `bind()`.
    backlog: i32,
}
57
58impl fmt::Debug for ServiceConfigInner {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        f.debug_struct("ServiceConfigInner")
61            .field("token", &self.token)
62            .field("backlog", &self.backlog)
63            .field("sockets", &self.sockets)
64            .finish()
65    }
66}
67
68impl ServiceConfig {
69    pub(super) fn new(token: Token, backlog: i32) -> Self {
70        ServiceConfig(Rc::new(RefCell::new(ServiceConfigInner {
71            token,
72            backlog,
73            sockets: Vec::new(),
74            on_start_set: false,
75            on_start: vec![OnWorkerStartWrapper::create(|_| {
76                not_configured();
77                Ready::Ok::<_, &str>(())
78            })],
79        })))
80    }
81
82    /// Add new service to the server.
83    pub fn bind<U, N: AsRef<str>>(&self, name: N, addr: U) -> io::Result<&Self>
84    where
85        U: net::ToSocketAddrs,
86    {
87        let mut inner = self.0.borrow_mut();
88
89        let sockets = bind_addr(addr, inner.backlog)?;
90        let socket = Socket {
91            name: name.as_ref().to_string(),
92            config: SharedCfg::default(),
93            sockets: sockets
94                .into_iter()
95                .map(|lst| {
96                    (
97                        inner.token.next(),
98                        Listener::from_tcp(lst),
99                        SharedCfg::default(),
100                    )
101                })
102                .collect(),
103        };
104        inner.sockets.push(socket);
105
106        Ok(self)
107    }
108
109    /// Add new service to the server.
110    pub fn listen<N: AsRef<str>>(&self, name: N, lst: net::TcpListener) -> &Self {
111        let mut inner = self.0.borrow_mut();
112        let socket = Socket {
113            name: name.as_ref().to_string(),
114            config: SharedCfg::default(),
115            sockets: vec![(
116                inner.token.next(),
117                Listener::from_tcp(lst),
118                SharedCfg::default(),
119            )],
120        };
121        inner.sockets.push(socket);
122
123        self
124    }
125
126    /// Set io config for configured service.
127    pub fn config<N, T>(&self, name: N, cfg: T) -> &Self
128    where
129        N: AsRef<str>,
130        T: Into<SharedCfg>,
131    {
132        let cfg = cfg.into();
133        let mut inner = self.0.borrow_mut();
134        for sock in &mut inner.sockets {
135            if sock.name == name.as_ref() {
136                sock.config = cfg;
137                for item in &mut sock.sockets {
138                    item.2 = cfg;
139                }
140            }
141        }
142        self
143    }
144
145    /// Register async service configuration function.
146    ///
147    /// This function get called during worker runtime configuration stage.
148    /// It get executed in the worker thread.
149    pub fn on_worker_start<F, E>(&self, f: F) -> &Self
150    where
151        F: AsyncFn(ServiceRuntime) -> Result<(), E> + Send + Clone + 'static,
152        E: fmt::Display + 'static,
153    {
154        let mut inner = self.0.borrow_mut();
155        if !inner.on_start_set {
156            inner.on_start.clear();
157            inner.on_start_set = true;
158        }
159        inner.on_start.push(OnWorkerStartWrapper::create(f));
160        self
161    }
162
163    pub(super) fn into_factory(
164        self,
165    ) -> (Token, Vec<(Token, String, Listener)>, FactoryServiceType) {
166        let mut inner = self.0.borrow_mut();
167
168        let mut sockets = Vec::new();
169        let mut names = HashMap::default();
170        for (idx, s) in mem::take(&mut inner.sockets).into_iter().enumerate() {
171            names.insert(
172                s.name.clone(),
173                Entry {
174                    idx,
175                    name: s.name.clone(),
176                    config: s.config,
177                    tokens: s
178                        .sockets
179                        .iter()
180                        .map(|(token, _, tag)| (*token, *tag))
181                        .collect(),
182                },
183            );
184
185            sockets.extend(
186                s.sockets
187                    .into_iter()
188                    .map(|(token, lst, _)| (token, s.name.clone(), lst)),
189            );
190        }
191
192        (
193            inner.token,
194            sockets,
195            Box::new(ConfiguredService {
196                names,
197                on_start: mem::take(&mut inner.on_start),
198            }),
199        )
200    }
201}
202
/// Factory produced by `ServiceConfig::into_factory()`.
struct ConfiguredService {
    /// Registered services keyed by name.
    names: HashMap<String, Entry>,
    /// Callbacks run by `create()` before the services are collected.
    on_start: Vec<Box<dyn OnWorkerStart>>,
}
207
208impl FactoryService for ConfiguredService {
209    fn clone_factory(&self) -> FactoryServiceType {
210        Box::new(Self {
211            names: self.names.clone(),
212            on_start: self.on_start.iter().map(|cb| (*cb).clone()).collect(),
213        })
214    }
215
216    fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
217        // configure services
218        let rt = ServiceRuntime::new(self.names.clone());
219        let on_start: Vec<_> = self
220            .on_start
221            .iter()
222            .map(|cb| cb.run(ServiceRuntime(rt.0.clone())))
223            .collect();
224
225        // construct services
226        Box::pin(async move {
227            for fut in on_start {
228                fut.await?;
229            }
230            rt.validate();
231
232            let names = mem::take(&mut rt.0.borrow_mut().names);
233            let mut services = mem::take(&mut rt.0.borrow_mut().services);
234
235            let mut res = Vec::new();
236            while let Some(svc) = services.pop() {
237                if let Some(svc) = svc {
238                    for entry in names.values() {
239                        if entry.idx == services.len() {
240                            res.push(NetService {
241                                config: entry.config,
242                                name: std::sync::Arc::from(entry.name.clone()),
243                                tokens: entry.tokens.clone(),
244                                factory: svc,
245                            });
246                            break;
247                        }
248                    }
249                }
250            }
251            Ok(res)
252        })
253    }
254}
255
/// Logs an error saying that the service is not configured.
///
/// Called by the placeholder worker-start callback installed in
/// `ServiceConfig::new()`.
fn not_configured() {
    log::error!("Service is not configured");
}
259
/// Handle given to worker-start callbacks for registering services by name.
///
/// The state sits behind an `Rc<RefCell<..>>`; `ConfiguredService::create()`
/// gives every callback a handle to the same state.
pub struct ServiceRuntime(Rc<RefCell<ServiceRuntimeInner>>);
261
/// Per-name record describing one configured service.
#[derive(Debug, Clone)]
struct Entry {
    /// Index of this service's slot in `ServiceRuntimeInner::services`.
    idx: usize,
    /// Service name.
    name: String,
    /// Io configuration of the service.
    config: SharedCfg,
    /// Listener tokens of the service, each with its own configuration copy.
    tokens: Vec<(Token, SharedCfg)>,
}
269
/// State shared by all `ServiceRuntime` handles of one `create()` call.
struct ServiceRuntimeInner {
    /// Known services keyed by name.
    names: HashMap<String, Entry>,
    /// One slot per service, addressed by `Entry::idx`; `None` until
    /// `ServiceRuntime::service()` fills it.
    services: Vec<Option<BoxServerService>>,
}
274
275impl fmt::Debug for ServiceRuntime {
276    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
277        let inner = self.0.borrow();
278        f.debug_struct("ServiceRuntimer")
279            .field("names", &inner.names)
280            .field("services", &inner.services)
281            .finish()
282    }
283}
284
285impl ServiceRuntime {
286    fn new(names: HashMap<String, Entry>) -> Self {
287        ServiceRuntime(Rc::new(RefCell::new(ServiceRuntimeInner {
288            services: (0..names.len()).map(|_| None).collect(),
289            names,
290        })))
291    }
292
293    fn validate(&self) {
294        let inner = self.0.as_ref().borrow();
295        for (name, item) in &inner.names {
296            if inner.services[item.idx].is_none() {
297                log::error!("Service {name:?} is not configured");
298            }
299        }
300    }
301
302    /// Register service.
303    ///
304    /// Name of the service must be registered during configuration stage with
305    /// `ServiceConfig::bind()` or `ServiceConfig::listen()` methods.
306    ///
307    /// # Panics
308    ///
309    /// Panics if service with specified name is registered already
310    pub fn service<T, F>(&self, name: &str, service: F) -> &Self
311    where
312        F: IntoServiceFactory<T, Io, SharedCfg>,
313        T: ServiceFactory<Io, SharedCfg> + 'static,
314        T::Service: 'static,
315        T::InitError: fmt::Debug,
316    {
317        let mut inner = self.0.borrow_mut();
318        if let Some(entry) = inner.names.get_mut(name) {
319            let idx = entry.idx;
320            inner.services[idx] = Some(factory::create_boxed_factory(
321                name.to_string(),
322                service.into_factory(),
323            ));
324        } else {
325            panic!("Unknown service: {name:?}");
326        }
327        self
328    }
329
330    /// Set io config for configured service.
331    pub fn config<T: Into<SharedCfg>>(&self, name: &str, cfg: T) -> &Self {
332        let mut inner = self.0.borrow_mut();
333        if let Some(entry) = inner.names.get_mut(name) {
334            entry.config = cfg.into();
335        }
336        self
337    }
338}
339
/// Object-safe wrapper around a worker-start callback.
trait OnWorkerStart: Send {
    /// Returns a boxed copy of this callback.
    fn clone(&self) -> Box<dyn OnWorkerStart>;

    /// Invokes the callback with `rt`; the returned future resolves to
    /// `Err(())` when the callback fails.
    fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>>;
}
345
/// Adapter that stores a callback `F` and records its error type `E`.
struct OnWorkerStartWrapper<F, E> {
    /// The wrapped callback.
    pub(super) f: F,
    /// Marker for the callback's error type; no `E` value is stored.
    pub(super) _t: marker::PhantomData<E>,
}
350
351impl<F, E> OnWorkerStartWrapper<F, E>
352where
353    F: AsyncFn(ServiceRuntime) -> Result<(), E> + Send + Clone + 'static,
354    E: fmt::Display + 'static,
355{
356    pub(super) fn create(f: F) -> Box<dyn OnWorkerStart + Send> {
357        Box::new(Self {
358            f,
359            _t: marker::PhantomData,
360        })
361    }
362}
363
// SAFETY: `Send` cannot be provided automatically because of the `E` param.
// `E` appears only inside `PhantomData`, so the wrapper never stores an `E`
// value, and the only data-carrying field, `f`, is required to be `Send`.
// NOTE(review): values of type `E` are created and consumed inside the
// future returned by `run()`; presumably that future is always driven on a
// single thread and never leaves it — confirm.
unsafe impl<F, E> Send for OnWorkerStartWrapper<F, E> where F: Send {}
367
368impl<F, E> OnWorkerStart for OnWorkerStartWrapper<F, E>
369where
370    F: AsyncFn(ServiceRuntime) -> Result<(), E> + Send + Clone + 'static,
371    E: fmt::Display + 'static,
372{
373    fn clone(&self) -> Box<dyn OnWorkerStart> {
374        Box::new(Self {
375            f: self.f.clone(),
376            _t: marker::PhantomData,
377        })
378    }
379
380    fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>> {
381        let f = self.f.clone();
382        Box::pin(async move {
383            (f)(rt).await.map_err(|e| {
384                log::error!("On worker start callback failed: {e}");
385            })
386        })
387    }
388}