ntex_server/net/
config.rs

1use std::{cell::Cell, cell::RefCell, fmt, future::Future, io, marker, mem, net, rc::Rc};
2
3use ntex_bytes::PoolId;
4use ntex_net::Io;
5use ntex_service::{IntoServiceFactory, ServiceFactory};
6use ntex_util::{future::BoxFuture, future::Ready, HashMap};
7
8use super::factory::{
9    self, BoxServerService, FactoryService, FactoryServiceType, NetService,
10};
11use super::{builder::bind_addr, socket::Listener, Token};
12
13#[derive(Clone, Debug)]
14pub struct Config(Rc<InnerServiceConfig>);
15
16#[derive(Debug)]
17pub(super) struct InnerServiceConfig {
18    pub(super) pool: Cell<PoolId>,
19    pub(super) tag: Cell<Option<&'static str>>,
20}
21
22impl Default for Config {
23    fn default() -> Self {
24        Self(Rc::new(InnerServiceConfig {
25            pool: Cell::new(PoolId::DEFAULT),
26            tag: Cell::new(None),
27        }))
28    }
29}
30
31impl Config {
32    /// Set memory pool for the service.
33    ///
34    /// Use specified memory pool for memory allocations.
35    pub fn memory_pool(&self, id: PoolId) -> &Self {
36        self.0.pool.set(id);
37        self
38    }
39
40    /// Set io tag for the service.
41    pub fn tag(&self, tag: &'static str) -> &Self {
42        self.0.tag.set(Some(tag));
43        self
44    }
45
46    pub(super) fn get_pool_id(&self) -> PoolId {
47        self.0.pool.get()
48    }
49
50    pub(super) fn get_tag(&self) -> Option<&'static str> {
51        self.0.tag.get()
52    }
53}
54
55#[derive(Clone, Debug)]
56pub struct ServiceConfig(pub(super) Rc<RefCell<ServiceConfigInner>>);
57
58#[derive(Debug)]
59struct Socket {
60    name: String,
61    sockets: Vec<(Token, Listener, &'static str)>,
62}
63
64pub(super) struct ServiceConfigInner {
65    token: Token,
66    apply: Option<Box<dyn OnWorkerStart>>,
67    sockets: Vec<Socket>,
68    backlog: i32,
69}
70
71impl fmt::Debug for ServiceConfigInner {
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        f.debug_struct("ServiceConfigInner")
74            .field("token", &self.token)
75            .field("backlog", &self.backlog)
76            .field("sockets", &self.sockets)
77            .finish()
78    }
79}
80
81impl ServiceConfig {
82    pub(super) fn new(token: Token, backlog: i32) -> Self {
83        ServiceConfig(Rc::new(RefCell::new(ServiceConfigInner {
84            token,
85            backlog,
86            sockets: Vec::new(),
87            apply: Some(OnWorkerStartWrapper::create(|_| {
88                not_configured();
89                Ready::Ok::<_, &str>(())
90            })),
91        })))
92    }
93
94    /// Add new service to the server.
95    pub fn bind<U, N: AsRef<str>>(&self, name: N, addr: U) -> io::Result<&Self>
96    where
97        U: net::ToSocketAddrs,
98    {
99        let mut inner = self.0.borrow_mut();
100
101        let sockets = bind_addr(addr, inner.backlog)?;
102        let socket = Socket {
103            name: name.as_ref().to_string(),
104            sockets: sockets
105                .into_iter()
106                .map(|lst| (inner.token.next(), Listener::from_tcp(lst), ""))
107                .collect(),
108        };
109        inner.sockets.push(socket);
110
111        Ok(self)
112    }
113
114    /// Add new service to the server.
115    pub fn listen<N: AsRef<str>>(&self, name: N, lst: net::TcpListener) -> &Self {
116        let mut inner = self.0.borrow_mut();
117        let socket = Socket {
118            name: name.as_ref().to_string(),
119            sockets: vec![(inner.token.next(), Listener::from_tcp(lst), "")],
120        };
121        inner.sockets.push(socket);
122
123        self
124    }
125
126    /// Set io tag for configured service.
127    pub fn set_tag<N: AsRef<str>>(&self, name: N, tag: &'static str) -> &Self {
128        let mut inner = self.0.borrow_mut();
129        for sock in &mut inner.sockets {
130            if sock.name == name.as_ref() {
131                for item in &mut sock.sockets {
132                    item.2 = tag;
133                }
134            }
135        }
136        self
137    }
138
139    /// Register async service configuration function.
140    ///
141    /// This function get called during worker runtime configuration stage.
142    /// It get executed in the worker thread.
143    pub fn on_worker_start<F, R, E>(&self, f: F) -> &Self
144    where
145        F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
146        R: Future<Output = Result<(), E>> + 'static,
147        E: fmt::Display + 'static,
148    {
149        self.0.borrow_mut().apply = Some(OnWorkerStartWrapper::create(f));
150        self
151    }
152
153    pub(super) fn into_factory(
154        self,
155    ) -> (Token, Vec<(Token, String, Listener)>, FactoryServiceType) {
156        let mut inner = self.0.borrow_mut();
157
158        let mut sockets = Vec::new();
159        let mut names = HashMap::default();
160        for (idx, s) in mem::take(&mut inner.sockets).into_iter().enumerate() {
161            names.insert(
162                s.name.clone(),
163                Entry {
164                    idx,
165                    pool: PoolId::DEFAULT,
166                    tokens: s
167                        .sockets
168                        .iter()
169                        .map(|(token, _, tag)| (*token, *tag))
170                        .collect(),
171                },
172            );
173
174            sockets.extend(
175                s.sockets
176                    .into_iter()
177                    .map(|(token, lst, _)| (token, s.name.clone(), lst)),
178            );
179        }
180
181        (
182            inner.token,
183            sockets,
184            Box::new(ConfiguredService {
185                rt: inner.apply.take().unwrap(),
186                names,
187            }),
188        )
189    }
190}
191
192struct ConfiguredService {
193    rt: Box<dyn OnWorkerStart>,
194    names: HashMap<String, Entry>,
195}
196
197impl FactoryService for ConfiguredService {
198    fn clone_factory(&self) -> FactoryServiceType {
199        Box::new(Self {
200            rt: self.rt.clone(),
201            names: self.names.clone(),
202        })
203    }
204
205    fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
206        // configure services
207        let rt = ServiceRuntime::new(self.names.clone());
208        let cfg_fut = self.rt.run(ServiceRuntime(rt.0.clone()));
209
210        // construct services
211        Box::pin(async move {
212            cfg_fut.await?;
213            rt.validate();
214
215            let names = mem::take(&mut rt.0.borrow_mut().names);
216            let mut services = mem::take(&mut rt.0.borrow_mut().services);
217
218            let mut res = Vec::new();
219            while let Some(svc) = services.pop() {
220                if let Some(svc) = svc {
221                    for entry in names.values() {
222                        if entry.idx == services.len() {
223                            res.push(NetService {
224                                pool: entry.pool,
225                                tokens: entry.tokens.clone(),
226                                factory: svc,
227                            });
228                            break;
229                        }
230                    }
231                }
232            }
233            Ok(res)
234        })
235    }
236}
237
238fn not_configured() {
239    log::error!("Service is not configured");
240}
241
242pub struct ServiceRuntime(Rc<RefCell<ServiceRuntimeInner>>);
243
244#[derive(Debug, Clone)]
245struct Entry {
246    idx: usize,
247    pool: PoolId,
248    tokens: Vec<(Token, &'static str)>,
249}
250
251struct ServiceRuntimeInner {
252    names: HashMap<String, Entry>,
253    services: Vec<Option<BoxServerService>>,
254}
255
256impl fmt::Debug for ServiceRuntime {
257    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258        let inner = self.0.borrow();
259        f.debug_struct("ServiceRuntimer")
260            .field("names", &inner.names)
261            .field("services", &inner.services)
262            .finish()
263    }
264}
265
266impl ServiceRuntime {
267    fn new(names: HashMap<String, Entry>) -> Self {
268        ServiceRuntime(Rc::new(RefCell::new(ServiceRuntimeInner {
269            services: (0..names.len()).map(|_| None).collect(),
270            names,
271        })))
272    }
273
274    fn validate(&self) {
275        let inner = self.0.as_ref().borrow();
276        for (name, item) in &inner.names {
277            if inner.services[item.idx].is_none() {
278                log::error!("Service {:?} is not configured", name);
279            }
280        }
281    }
282
283    /// Register service.
284    ///
285    /// Name of the service must be registered during configuration stage with
286    /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
287    pub fn service<T, F>(&self, name: &str, service: F)
288    where
289        F: IntoServiceFactory<T, Io>,
290        T: ServiceFactory<Io> + 'static,
291        T::Service: 'static,
292        T::InitError: fmt::Debug,
293    {
294        self.service_in(name, PoolId::P0, service)
295    }
296
297    /// Register service with memory pool.
298    ///
299    /// Name of the service must be registered during configuration stage with
300    /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
301    pub fn service_in<T, F>(&self, name: &str, pool: PoolId, service: F)
302    where
303        F: IntoServiceFactory<T, Io>,
304        T: ServiceFactory<Io> + 'static,
305        T::Service: 'static,
306        T::InitError: fmt::Debug,
307    {
308        let mut inner = self.0.borrow_mut();
309        if let Some(entry) = inner.names.get_mut(name) {
310            let idx = entry.idx;
311            entry.pool = pool;
312            inner.services[idx] = Some(factory::create_boxed_factory(
313                name.to_string(),
314                service.into_factory(),
315            ));
316        } else {
317            panic!("Unknown service: {:?}", name);
318        }
319    }
320}
321
322trait OnWorkerStart: Send {
323    fn clone(&self) -> Box<dyn OnWorkerStart>;
324
325    fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>>;
326}
327
328struct OnWorkerStartWrapper<F, R, E> {
329    pub(super) f: F,
330    pub(super) _t: marker::PhantomData<(R, E)>,
331}
332
333impl<F, R, E> OnWorkerStartWrapper<F, R, E>
334where
335    F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
336    R: Future<Output = Result<(), E>> + 'static,
337    E: fmt::Display + 'static,
338{
339    pub(super) fn create(f: F) -> Box<dyn OnWorkerStart + Send> {
340        Box::new(Self {
341            f,
342            _t: marker::PhantomData,
343        })
344    }
345}
346
347// SAFETY: Send cannot be provided authomatically because of R param
348// but R always get executed in one thread and never leave it
349unsafe impl<F, R, E> Send for OnWorkerStartWrapper<F, R, E> where F: Send {}
350
351impl<F, R, E> OnWorkerStart for OnWorkerStartWrapper<F, R, E>
352where
353    F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
354    R: Future<Output = Result<(), E>> + 'static,
355    E: fmt::Display + 'static,
356{
357    fn clone(&self) -> Box<dyn OnWorkerStart> {
358        Box::new(Self {
359            f: self.f.clone(),
360            _t: marker::PhantomData,
361        })
362    }
363
364    fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>> {
365        let f = self.f.clone();
366        Box::pin(async move {
367            (f)(rt).await.map_err(|e| {
368                log::error!("On worker start callback failed: {}", e);
369            })
370        })
371    }
372}