ntex_server/net/
config.rs

1use std::{cell::Cell, cell::RefCell, fmt, io, marker, mem, net, rc::Rc};
2
3use ntex_io::Io;
4use ntex_service::{IntoServiceFactory, ServiceFactory, cfg::SharedCfg};
5use ntex_util::{HashMap, future::BoxFuture, future::Ready};
6
7use super::factory::{
8    self, BoxServerService, FactoryService, FactoryServiceType, NetService,
9};
10use super::{Token, builder::bind_addr, socket::Listener};
11
12#[derive(Clone, Debug)]
13pub struct Config(Rc<InnerServiceConfig>);
14
15#[derive(Debug)]
16pub(super) struct InnerServiceConfig {
17    pub(super) config: Cell<Option<SharedCfg>>,
18}
19
20impl Default for Config {
21    fn default() -> Self {
22        Self(Rc::new(InnerServiceConfig {
23            config: Cell::new(None),
24        }))
25    }
26}
27
28impl Config {
29    /// Set io config for the service.
30    pub fn config<T: Into<SharedCfg>>(&self, cfg: T) -> &Self {
31        self.0.config.set(Some(cfg.into()));
32        self
33    }
34
35    pub(super) fn get_config(&self) -> Option<SharedCfg> {
36        self.0.config.get()
37    }
38}
39
40#[derive(Clone, Debug)]
41pub struct ServiceConfig(pub(super) Rc<RefCell<ServiceConfigInner>>);
42
43#[derive(Debug)]
44struct Socket {
45    name: String,
46    config: SharedCfg,
47    sockets: Vec<(Token, Listener, SharedCfg)>,
48}
49
50pub(super) struct ServiceConfigInner {
51    token: Token,
52    apply: Option<Box<dyn OnWorkerStart>>,
53    sockets: Vec<Socket>,
54    backlog: i32,
55}
56
57impl fmt::Debug for ServiceConfigInner {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        f.debug_struct("ServiceConfigInner")
60            .field("token", &self.token)
61            .field("backlog", &self.backlog)
62            .field("sockets", &self.sockets)
63            .finish()
64    }
65}
66
67impl ServiceConfig {
68    pub(super) fn new(token: Token, backlog: i32) -> Self {
69        ServiceConfig(Rc::new(RefCell::new(ServiceConfigInner {
70            token,
71            backlog,
72            sockets: Vec::new(),
73            apply: Some(OnWorkerStartWrapper::create(|_| {
74                not_configured();
75                Ready::Ok::<_, &str>(())
76            })),
77        })))
78    }
79
80    /// Add new service to the server.
81    pub fn bind<U, N: AsRef<str>>(&self, name: N, addr: U) -> io::Result<&Self>
82    where
83        U: net::ToSocketAddrs,
84    {
85        let mut inner = self.0.borrow_mut();
86
87        let sockets = bind_addr(addr, inner.backlog)?;
88        let socket = Socket {
89            name: name.as_ref().to_string(),
90            config: SharedCfg::default(),
91            sockets: sockets
92                .into_iter()
93                .map(|lst| {
94                    (
95                        inner.token.next(),
96                        Listener::from_tcp(lst),
97                        SharedCfg::default(),
98                    )
99                })
100                .collect(),
101        };
102        inner.sockets.push(socket);
103
104        Ok(self)
105    }
106
107    /// Add new service to the server.
108    pub fn listen<N: AsRef<str>>(&self, name: N, lst: net::TcpListener) -> &Self {
109        let mut inner = self.0.borrow_mut();
110        let socket = Socket {
111            name: name.as_ref().to_string(),
112            config: SharedCfg::default(),
113            sockets: vec![(
114                inner.token.next(),
115                Listener::from_tcp(lst),
116                SharedCfg::default(),
117            )],
118        };
119        inner.sockets.push(socket);
120
121        self
122    }
123
124    /// Set io config for configured service.
125    pub fn config<N, T>(&self, name: N, cfg: T) -> &Self
126    where
127        N: AsRef<str>,
128        T: Into<SharedCfg>,
129    {
130        let cfg = cfg.into();
131        let mut inner = self.0.borrow_mut();
132        for sock in &mut inner.sockets {
133            if sock.name == name.as_ref() {
134                sock.config = cfg;
135                for item in &mut sock.sockets {
136                    item.2 = cfg;
137                }
138            }
139        }
140        self
141    }
142
143    /// Register async service configuration function.
144    ///
145    /// This function get called during worker runtime configuration stage.
146    /// It get executed in the worker thread.
147    pub fn on_worker_start<F, E>(&self, f: F) -> &Self
148    where
149        F: AsyncFn(ServiceRuntime) -> Result<(), E> + Send + Clone + 'static,
150        E: fmt::Display + 'static,
151    {
152        self.0.borrow_mut().apply = Some(OnWorkerStartWrapper::create(f));
153        self
154    }
155
156    pub(super) fn into_factory(
157        self,
158    ) -> (Token, Vec<(Token, String, Listener)>, FactoryServiceType) {
159        let mut inner = self.0.borrow_mut();
160
161        let mut sockets = Vec::new();
162        let mut names = HashMap::default();
163        for (idx, s) in mem::take(&mut inner.sockets).into_iter().enumerate() {
164            names.insert(
165                s.name.clone(),
166                Entry {
167                    idx,
168                    name: s.name.clone(),
169                    config: s.config,
170                    tokens: s
171                        .sockets
172                        .iter()
173                        .map(|(token, _, tag)| (*token, *tag))
174                        .collect(),
175                },
176            );
177
178            sockets.extend(
179                s.sockets
180                    .into_iter()
181                    .map(|(token, lst, _)| (token, s.name.clone(), lst)),
182            );
183        }
184
185        (
186            inner.token,
187            sockets,
188            Box::new(ConfiguredService {
189                names,
190                rt: inner.apply.take().unwrap(),
191            }),
192        )
193    }
194}
195
196struct ConfiguredService {
197    rt: Box<dyn OnWorkerStart>,
198    names: HashMap<String, Entry>,
199}
200
201impl FactoryService for ConfiguredService {
202    fn clone_factory(&self) -> FactoryServiceType {
203        Box::new(Self {
204            rt: self.rt.clone(),
205            names: self.names.clone(),
206        })
207    }
208
209    fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
210        // configure services
211        let rt = ServiceRuntime::new(self.names.clone());
212        let cfg_fut = self.rt.run(ServiceRuntime(rt.0.clone()));
213
214        // construct services
215        Box::pin(async move {
216            cfg_fut.await?;
217            rt.validate();
218
219            let names = mem::take(&mut rt.0.borrow_mut().names);
220            let mut services = mem::take(&mut rt.0.borrow_mut().services);
221
222            let mut res = Vec::new();
223            while let Some(svc) = services.pop() {
224                if let Some(svc) = svc {
225                    for entry in names.values() {
226                        if entry.idx == services.len() {
227                            res.push(NetService {
228                                config: entry.config,
229                                name: std::sync::Arc::from(entry.name.clone()),
230                                tokens: entry.tokens.clone(),
231                                factory: svc,
232                            });
233                            break;
234                        }
235                    }
236                }
237            }
238            Ok(res)
239        })
240    }
241}
242
243fn not_configured() {
244    log::error!("Service is not configured");
245}
246
247pub struct ServiceRuntime(Rc<RefCell<ServiceRuntimeInner>>);
248
249#[derive(Debug, Clone)]
250struct Entry {
251    idx: usize,
252    name: String,
253    config: SharedCfg,
254    tokens: Vec<(Token, SharedCfg)>,
255}
256
257struct ServiceRuntimeInner {
258    names: HashMap<String, Entry>,
259    services: Vec<Option<BoxServerService>>,
260}
261
262impl fmt::Debug for ServiceRuntime {
263    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264        let inner = self.0.borrow();
265        f.debug_struct("ServiceRuntimer")
266            .field("names", &inner.names)
267            .field("services", &inner.services)
268            .finish()
269    }
270}
271
272impl ServiceRuntime {
273    fn new(names: HashMap<String, Entry>) -> Self {
274        ServiceRuntime(Rc::new(RefCell::new(ServiceRuntimeInner {
275            services: (0..names.len()).map(|_| None).collect(),
276            names,
277        })))
278    }
279
280    fn validate(&self) {
281        let inner = self.0.as_ref().borrow();
282        for (name, item) in &inner.names {
283            if inner.services[item.idx].is_none() {
284                log::error!("Service {name:?} is not configured");
285            }
286        }
287    }
288
289    /// Register service.
290    ///
291    /// Name of the service must be registered during configuration stage with
292    /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
293    pub fn service<T, F>(&self, name: &str, service: F) -> &Self
294    where
295        F: IntoServiceFactory<T, Io, SharedCfg>,
296        T: ServiceFactory<Io, SharedCfg> + 'static,
297        T::Service: 'static,
298        T::InitError: fmt::Debug,
299    {
300        let mut inner = self.0.borrow_mut();
301        if let Some(entry) = inner.names.get_mut(name) {
302            let idx = entry.idx;
303            inner.services[idx] = Some(factory::create_boxed_factory(
304                name.to_string(),
305                service.into_factory(),
306            ));
307        } else {
308            panic!("Unknown service: {name:?}");
309        }
310        self
311    }
312
313    /// Set io config for configured service.
314    pub fn config<T: Into<SharedCfg>>(&self, name: &str, cfg: T) -> &Self {
315        let mut inner = self.0.borrow_mut();
316        if let Some(entry) = inner.names.get_mut(name) {
317            entry.config = cfg.into();
318        }
319        self
320    }
321}
322
323trait OnWorkerStart: Send {
324    fn clone(&self) -> Box<dyn OnWorkerStart>;
325
326    fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>>;
327}
328
329struct OnWorkerStartWrapper<F, E> {
330    pub(super) f: F,
331    pub(super) _t: marker::PhantomData<E>,
332}
333
334impl<F, E> OnWorkerStartWrapper<F, E>
335where
336    F: AsyncFn(ServiceRuntime) -> Result<(), E> + Send + Clone + 'static,
337    E: fmt::Display + 'static,
338{
339    pub(super) fn create(f: F) -> Box<dyn OnWorkerStart + Send> {
340        Box::new(Self {
341            f,
342            _t: marker::PhantomData,
343        })
344    }
345}
346
347// SAFETY: Send cannot be provided authomatically because of R param
348// but R always get executed in one thread and never leave it
349unsafe impl<F, E> Send for OnWorkerStartWrapper<F, E> where F: Send {}
350
351impl<F, E> OnWorkerStart for OnWorkerStartWrapper<F, E>
352where
353    F: AsyncFn(ServiceRuntime) -> Result<(), E> + Send + Clone + 'static,
354    E: fmt::Display + 'static,
355{
356    fn clone(&self) -> Box<dyn OnWorkerStart> {
357        Box::new(Self {
358            f: self.f.clone(),
359            _t: marker::PhantomData,
360        })
361    }
362
363    fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>> {
364        let f = self.f.clone();
365        Box::pin(async move {
366            (f)(rt).await.map_err(|e| {
367                log::error!("On worker start callback failed: {e}");
368            })
369        })
370    }
371}