ntex_server/net/
config.rs

1use std::{cell::Cell, cell::RefCell, fmt, future::Future, io, marker, mem, net, rc::Rc};
2
3use ntex_io::Io;
4use ntex_service::{IntoServiceFactory, ServiceFactory, cfg::SharedCfg};
5use ntex_util::{HashMap, future::BoxFuture, future::Ready};
6
7use super::factory::{
8    self, BoxServerService, FactoryService, FactoryServiceType, NetService,
9};
10use super::{Token, builder::bind_addr, socket::Listener};
11
/// Handle to a per-service io configuration slot.
///
/// The state lives behind an `Rc`, so clones of a `Config` refer to the same slot.
#[derive(Clone, Debug)]
pub struct Config(Rc<InnerServiceConfig>);
14
/// State shared by all clones of a `Config`.
#[derive(Debug)]
pub(super) struct InnerServiceConfig {
    /// Io configuration for the service; `Config::default()` starts it as `None`.
    pub(super) config: Cell<Option<SharedCfg>>,
}
19
20impl Default for Config {
21    fn default() -> Self {
22        Self(Rc::new(InnerServiceConfig {
23            config: Cell::new(None),
24        }))
25    }
26}
27
28impl Config {
29    /// Set io config for the service.
30    pub fn config<T: Into<SharedCfg>>(&self, cfg: T) -> &Self {
31        self.0.config.set(Some(cfg.into()));
32        self
33    }
34
35    pub(super) fn get_config(&self) -> Option<SharedCfg> {
36        self.0.config.get()
37    }
38}
39
/// Collects named listeners and the worker start callback for a server.
///
/// The state is kept in an `Rc<RefCell<..>>`, so clones share the same configuration.
#[derive(Clone, Debug)]
pub struct ServiceConfig(pub(super) Rc<RefCell<ServiceConfigInner>>);
42
/// Listeners registered under one service name by `bind()` or `listen()`.
#[derive(Debug)]
struct Socket {
    /// Service name supplied by the caller.
    name: String,
    /// Io configuration of the service; `SharedCfg::default()` until `config()` replaces it.
    config: SharedCfg,
    /// One `(token, listener, io config)` triple per listener.
    sockets: Vec<(Token, Listener, SharedCfg)>,
}
49
/// Mutable state behind `ServiceConfig`.
pub(super) struct ServiceConfigInner {
    /// Token source; `next()` is called on it for every listener that is added.
    token: Token,
    /// Worker start callback; taken out by `into_factory()`.
    apply: Option<Box<dyn OnWorkerStart>>,
    /// Registered services in registration order.
    sockets: Vec<Socket>,
    /// Backlog value handed to `bind_addr` by `bind()`.
    backlog: i32,
}
56
57impl fmt::Debug for ServiceConfigInner {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        f.debug_struct("ServiceConfigInner")
60            .field("token", &self.token)
61            .field("backlog", &self.backlog)
62            .field("sockets", &self.sockets)
63            .finish()
64    }
65}
66
67impl ServiceConfig {
68    pub(super) fn new(token: Token, backlog: i32) -> Self {
69        ServiceConfig(Rc::new(RefCell::new(ServiceConfigInner {
70            token,
71            backlog,
72            sockets: Vec::new(),
73            apply: Some(OnWorkerStartWrapper::create(|_| {
74                not_configured();
75                Ready::Ok::<_, &str>(())
76            })),
77        })))
78    }
79
80    /// Add new service to the server.
81    pub fn bind<U, N: AsRef<str>>(&self, name: N, addr: U) -> io::Result<&Self>
82    where
83        U: net::ToSocketAddrs,
84    {
85        let mut inner = self.0.borrow_mut();
86
87        let sockets = bind_addr(addr, inner.backlog)?;
88        let socket = Socket {
89            name: name.as_ref().to_string(),
90            config: SharedCfg::default(),
91            sockets: sockets
92                .into_iter()
93                .map(|lst| {
94                    (
95                        inner.token.next(),
96                        Listener::from_tcp(lst),
97                        SharedCfg::default(),
98                    )
99                })
100                .collect(),
101        };
102        inner.sockets.push(socket);
103
104        Ok(self)
105    }
106
107    /// Add new service to the server.
108    pub fn listen<N: AsRef<str>>(&self, name: N, lst: net::TcpListener) -> &Self {
109        let mut inner = self.0.borrow_mut();
110        let socket = Socket {
111            name: name.as_ref().to_string(),
112            config: SharedCfg::default(),
113            sockets: vec![(
114                inner.token.next(),
115                Listener::from_tcp(lst),
116                SharedCfg::default(),
117            )],
118        };
119        inner.sockets.push(socket);
120
121        self
122    }
123
124    /// Set io config for configured service.
125    pub fn config<N, T>(&self, name: N, cfg: T) -> &Self
126    where
127        N: AsRef<str>,
128        T: Into<SharedCfg>,
129    {
130        let cfg = cfg.into();
131        let mut inner = self.0.borrow_mut();
132        for sock in &mut inner.sockets {
133            if sock.name == name.as_ref() {
134                sock.config = cfg;
135                for item in &mut sock.sockets {
136                    item.2 = cfg;
137                }
138            }
139        }
140        self
141    }
142
143    /// Register async service configuration function.
144    ///
145    /// This function get called during worker runtime configuration stage.
146    /// It get executed in the worker thread.
147    pub fn on_worker_start<F, R, E>(&self, f: F) -> &Self
148    where
149        F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
150        R: Future<Output = Result<(), E>> + 'static,
151        E: fmt::Display + 'static,
152    {
153        self.0.borrow_mut().apply = Some(OnWorkerStartWrapper::create(f));
154        self
155    }
156
157    pub(super) fn into_factory(
158        self,
159    ) -> (Token, Vec<(Token, String, Listener)>, FactoryServiceType) {
160        let mut inner = self.0.borrow_mut();
161
162        let mut sockets = Vec::new();
163        let mut names = HashMap::default();
164        for (idx, s) in mem::take(&mut inner.sockets).into_iter().enumerate() {
165            names.insert(
166                s.name.clone(),
167                Entry {
168                    idx,
169                    name: s.name.clone(),
170                    config: s.config,
171                    tokens: s
172                        .sockets
173                        .iter()
174                        .map(|(token, _, tag)| (*token, *tag))
175                        .collect(),
176                },
177            );
178
179            sockets.extend(
180                s.sockets
181                    .into_iter()
182                    .map(|(token, lst, _)| (token, s.name.clone(), lst)),
183            );
184        }
185
186        (
187            inner.token,
188            sockets,
189            Box::new(ConfiguredService {
190                names,
191                rt: inner.apply.take().unwrap(),
192            }),
193        )
194    }
195}
196
/// Service factory produced by `ServiceConfig::into_factory()`.
struct ConfiguredService {
    /// Worker start callback that registers the services.
    rt: Box<dyn OnWorkerStart>,
    /// Registered services keyed by service name.
    names: HashMap<String, Entry>,
}
201
202impl FactoryService for ConfiguredService {
203    fn clone_factory(&self) -> FactoryServiceType {
204        Box::new(Self {
205            rt: self.rt.clone(),
206            names: self.names.clone(),
207        })
208    }
209
210    fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
211        // configure services
212        let rt = ServiceRuntime::new(self.names.clone());
213        let cfg_fut = self.rt.run(ServiceRuntime(rt.0.clone()));
214
215        // construct services
216        Box::pin(async move {
217            cfg_fut.await?;
218            rt.validate();
219
220            let names = mem::take(&mut rt.0.borrow_mut().names);
221            let mut services = mem::take(&mut rt.0.borrow_mut().services);
222
223            let mut res = Vec::new();
224            while let Some(svc) = services.pop() {
225                if let Some(svc) = svc {
226                    for entry in names.values() {
227                        if entry.idx == services.len() {
228                            res.push(NetService {
229                                config: entry.config,
230                                name: std::sync::Arc::from(entry.name.clone()),
231                                tokens: entry.tokens.clone(),
232                                factory: svc,
233                            });
234                            break;
235                        }
236                    }
237                }
238            }
239            Ok(res)
240        })
241    }
242}
243
/// Log an error saying that the service is not configured.
///
/// Called by the default worker start callback installed in `ServiceConfig::new()`.
fn not_configured() {
    log::error!("Service is not configured");
}
247
/// Handle given to the worker start callback for registering services by name.
pub struct ServiceRuntime(Rc<RefCell<ServiceRuntimeInner>>);
249
/// Description of one named service.
#[derive(Debug, Clone)]
struct Entry {
    /// Index of this service's slot in `ServiceRuntimeInner::services`.
    idx: usize,
    /// Service name.
    name: String,
    /// Io configuration of the service.
    config: SharedCfg,
    /// `(token, io config)` pair for every listener of the service.
    tokens: Vec<(Token, SharedCfg)>,
}
257
/// State behind `ServiceRuntime`.
struct ServiceRuntimeInner {
    /// Known services keyed by service name.
    names: HashMap<String, Entry>,
    /// Service factories indexed by `Entry::idx`; `None` until `service()` fills the slot.
    services: Vec<Option<BoxServerService>>,
}
262
263impl fmt::Debug for ServiceRuntime {
264    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
265        let inner = self.0.borrow();
266        f.debug_struct("ServiceRuntimer")
267            .field("names", &inner.names)
268            .field("services", &inner.services)
269            .finish()
270    }
271}
272
273impl ServiceRuntime {
274    fn new(names: HashMap<String, Entry>) -> Self {
275        ServiceRuntime(Rc::new(RefCell::new(ServiceRuntimeInner {
276            services: (0..names.len()).map(|_| None).collect(),
277            names,
278        })))
279    }
280
281    fn validate(&self) {
282        let inner = self.0.as_ref().borrow();
283        for (name, item) in &inner.names {
284            if inner.services[item.idx].is_none() {
285                log::error!("Service {name:?} is not configured");
286            }
287        }
288    }
289
290    /// Register service.
291    ///
292    /// Name of the service must be registered during configuration stage with
293    /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
294    pub fn service<T, F>(&self, name: &str, service: F)
295    where
296        F: IntoServiceFactory<T, Io, SharedCfg>,
297        T: ServiceFactory<Io, SharedCfg> + 'static,
298        T::Service: 'static,
299        T::InitError: fmt::Debug,
300    {
301        let mut inner = self.0.borrow_mut();
302        if let Some(entry) = inner.names.get_mut(name) {
303            let idx = entry.idx;
304            inner.services[idx] = Some(factory::create_boxed_factory(
305                name.to_string(),
306                service.into_factory(),
307            ));
308        } else {
309            panic!("Unknown service: {name:?}");
310        }
311    }
312
313    /// Set io config for configured service.
314    pub fn config(&self, name: &str, cfg: SharedCfg) {
315        let mut inner = self.0.borrow_mut();
316        if let Some(entry) = inner.names.get_mut(name) {
317            entry.config = cfg;
318        }
319    }
320}
321
/// Type-erased worker start callback that can be sent to another thread.
trait OnWorkerStart: Send {
    /// Return a boxed copy of this callback.
    fn clone(&self) -> Box<dyn OnWorkerStart>;

    /// Invoke the callback with `rt`; the returned future yields `Err(())` on failure.
    fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>>;
}
327
328struct OnWorkerStartWrapper<F, R, E> {
329    pub(super) f: F,
330    pub(super) _t: marker::PhantomData<(R, E)>,
331}
332
333impl<F, R, E> OnWorkerStartWrapper<F, R, E>
334where
335    F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
336    R: Future<Output = Result<(), E>> + 'static,
337    E: fmt::Display + 'static,
338{
339    pub(super) fn create(f: F) -> Box<dyn OnWorkerStart + Send> {
340        Box::new(Self {
341            f,
342            _t: marker::PhantomData,
343        })
344    }
345}
346
347// SAFETY: Send cannot be provided authomatically because of R param
348// but R always get executed in one thread and never leave it
349unsafe impl<F, R, E> Send for OnWorkerStartWrapper<F, R, E> where F: Send {}
350
351impl<F, R, E> OnWorkerStart for OnWorkerStartWrapper<F, R, E>
352where
353    F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
354    R: Future<Output = Result<(), E>> + 'static,
355    E: fmt::Display + 'static,
356{
357    fn clone(&self) -> Box<dyn OnWorkerStart> {
358        Box::new(Self {
359            f: self.f.clone(),
360            _t: marker::PhantomData,
361        })
362    }
363
364    fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>> {
365        let f = self.f.clone();
366        Box::pin(async move {
367            (f)(rt).await.map_err(|e| {
368                log::error!("On worker start callback failed: {e}");
369            })
370        })
371    }
372}