1use std::{cell::Cell, cell::RefCell, fmt, io, marker, mem, net, rc::Rc};
2
3use ntex_io::Io;
4use ntex_service::{IntoServiceFactory, ServiceFactory, cfg::SharedCfg};
5use ntex_util::{HashMap, future::BoxFuture, future::Ready};
6
7use super::factory::{
8 self, BoxServerService, FactoryService, FactoryServiceType, NetService,
9};
10use super::{Token, builder::bind_addr, socket::Listener};
11
/// Clonable handle to a shared, optional [`SharedCfg`] slot.
///
/// The slot lives behind an `Rc`, so every clone of a `Config` reads and
/// writes the same stored value.
#[derive(Clone, Debug)]
pub struct Config(Rc<InnerServiceConfig>);
14
/// Shared state behind [`Config`].
#[derive(Debug)]
pub(super) struct InnerServiceConfig {
    /// Configuration stored by `Config::config`; `None` until it is called.
    pub(super) config: Cell<Option<SharedCfg>>,
}
19
20impl Default for Config {
21 fn default() -> Self {
22 Self(Rc::new(InnerServiceConfig {
23 config: Cell::new(None),
24 }))
25 }
26}
27
28impl Config {
29 pub fn config<T: Into<SharedCfg>>(&self, cfg: T) -> &Self {
31 self.0.config.set(Some(cfg.into()));
32 self
33 }
34
35 pub(super) fn get_config(&self) -> Option<SharedCfg> {
36 self.0.config.get()
37 }
38}
39
/// Clonable builder handle used to register named listeners, per-name
/// configuration and on-worker-start callbacks.
///
/// All clones share one `ServiceConfigInner` through `Rc<RefCell<..>>`.
#[derive(Clone, Debug)]
pub struct ServiceConfig(pub(super) Rc<RefCell<ServiceConfigInner>>);
42
/// A group of listeners registered under one name by a single `bind` or
/// `listen` call.
#[derive(Debug)]
struct Socket {
    /// Name the group was registered under.
    name: String,
    /// Configuration for the group as a whole.
    config: SharedCfg,
    /// One `(token, listener, configuration)` triple per listener in the group.
    sockets: Vec<(Token, Listener, SharedCfg)>,
}
49
/// Mutable state shared by all clones of a `ServiceConfig`.
pub(super) struct ServiceConfigInner {
    /// Token state; `next()` is called on it for every registered listener and
    /// the final value is returned from `into_factory`.
    token: Token,
    /// `true` once `on_worker_start` has discarded the placeholder callback.
    on_start_set: bool,
    /// Callbacks to run when a worker starts.
    on_start: Vec<Box<dyn OnWorkerStart>>,
    /// Listener groups in registration order.
    sockets: Vec<Socket>,
    /// Backlog value forwarded to `bind_addr`.
    backlog: i32,
}
57
58impl fmt::Debug for ServiceConfigInner {
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 f.debug_struct("ServiceConfigInner")
61 .field("token", &self.token)
62 .field("backlog", &self.backlog)
63 .field("sockets", &self.sockets)
64 .finish()
65 }
66}
67
68impl ServiceConfig {
69 pub(super) fn new(token: Token, backlog: i32) -> Self {
70 ServiceConfig(Rc::new(RefCell::new(ServiceConfigInner {
71 token,
72 backlog,
73 sockets: Vec::new(),
74 on_start_set: false,
75 on_start: vec![OnWorkerStartWrapper::create(|_| {
76 not_configured();
77 Ready::Ok::<_, &str>(())
78 })],
79 })))
80 }
81
82 pub fn bind<U, N: AsRef<str>>(&self, name: N, addr: U) -> io::Result<&Self>
84 where
85 U: net::ToSocketAddrs,
86 {
87 let mut inner = self.0.borrow_mut();
88
89 let sockets = bind_addr(addr, inner.backlog)?;
90 let socket = Socket {
91 name: name.as_ref().to_string(),
92 config: SharedCfg::default(),
93 sockets: sockets
94 .into_iter()
95 .map(|lst| {
96 (
97 inner.token.next(),
98 Listener::from_tcp(lst),
99 SharedCfg::default(),
100 )
101 })
102 .collect(),
103 };
104 inner.sockets.push(socket);
105
106 Ok(self)
107 }
108
109 pub fn listen<N: AsRef<str>>(&self, name: N, lst: net::TcpListener) -> &Self {
111 let mut inner = self.0.borrow_mut();
112 let socket = Socket {
113 name: name.as_ref().to_string(),
114 config: SharedCfg::default(),
115 sockets: vec![(
116 inner.token.next(),
117 Listener::from_tcp(lst),
118 SharedCfg::default(),
119 )],
120 };
121 inner.sockets.push(socket);
122
123 self
124 }
125
126 pub fn config<N, T>(&self, name: N, cfg: T) -> &Self
128 where
129 N: AsRef<str>,
130 T: Into<SharedCfg>,
131 {
132 let cfg = cfg.into();
133 let mut inner = self.0.borrow_mut();
134 for sock in &mut inner.sockets {
135 if sock.name == name.as_ref() {
136 sock.config = cfg;
137 for item in &mut sock.sockets {
138 item.2 = cfg;
139 }
140 }
141 }
142 self
143 }
144
145 pub fn on_worker_start<F, E>(&self, f: F) -> &Self
150 where
151 F: AsyncFn(ServiceRuntime) -> Result<(), E> + Send + Clone + 'static,
152 E: fmt::Display + 'static,
153 {
154 let mut inner = self.0.borrow_mut();
155 if !inner.on_start_set {
156 inner.on_start.clear();
157 inner.on_start_set = true;
158 }
159 inner.on_start.push(OnWorkerStartWrapper::create(f));
160 self
161 }
162
163 pub(super) fn into_factory(
164 self,
165 ) -> (Token, Vec<(Token, String, Listener)>, FactoryServiceType) {
166 let mut inner = self.0.borrow_mut();
167
168 let mut sockets = Vec::new();
169 let mut names = HashMap::default();
170 for (idx, s) in mem::take(&mut inner.sockets).into_iter().enumerate() {
171 names.insert(
172 s.name.clone(),
173 Entry {
174 idx,
175 name: s.name.clone(),
176 config: s.config,
177 tokens: s
178 .sockets
179 .iter()
180 .map(|(token, _, tag)| (*token, *tag))
181 .collect(),
182 },
183 );
184
185 sockets.extend(
186 s.sockets
187 .into_iter()
188 .map(|(token, lst, _)| (token, s.name.clone(), lst)),
189 );
190 }
191
192 (
193 inner.token,
194 sockets,
195 Box::new(ConfiguredService {
196 names,
197 on_start: mem::take(&mut inner.on_start),
198 }),
199 )
200 }
201}
202
/// Factory built by `ServiceConfig::into_factory`.
struct ConfiguredService {
    /// Registered names and their entries.
    names: HashMap<String, Entry>,
    /// Callbacks executed by `create` before the services are collected.
    on_start: Vec<Box<dyn OnWorkerStart>>,
}
207
208impl FactoryService for ConfiguredService {
209 fn clone_factory(&self) -> FactoryServiceType {
210 Box::new(Self {
211 names: self.names.clone(),
212 on_start: self.on_start.iter().map(|cb| (*cb).clone()).collect(),
213 })
214 }
215
216 fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
217 let rt = ServiceRuntime::new(self.names.clone());
219 let on_start: Vec<_> = self
220 .on_start
221 .iter()
222 .map(|cb| cb.run(ServiceRuntime(rt.0.clone())))
223 .collect();
224
225 Box::pin(async move {
227 for fut in on_start {
228 fut.await?;
229 }
230 rt.validate();
231
232 let names = mem::take(&mut rt.0.borrow_mut().names);
233 let mut services = mem::take(&mut rt.0.borrow_mut().services);
234
235 let mut res = Vec::new();
236 while let Some(svc) = services.pop() {
237 if let Some(svc) = svc {
238 for entry in names.values() {
239 if entry.idx == services.len() {
240 res.push(NetService {
241 config: entry.config,
242 name: std::sync::Arc::from(entry.name.clone()),
243 tokens: entry.tokens.clone(),
244 factory: svc,
245 });
246 break;
247 }
248 }
249 }
250 }
251 Ok(res)
252 })
253 }
254}
255
/// Logs an error saying the service is not configured.
///
/// Called from the placeholder start callback installed by `ServiceConfig::new`.
fn not_configured() {
    log::error!("Service is not configured");
}
259
/// Handle passed to on-worker-start callbacks; services and configuration are
/// attached to registered names through it.
///
/// The state is held in `Rc<RefCell<..>>`, so handles built from the same `Rc`
/// observe each other's changes.
pub struct ServiceRuntime(Rc<RefCell<ServiceRuntimeInner>>);
261
/// Per-name record carried from `ServiceConfig` into `ServiceRuntime`.
#[derive(Debug, Clone)]
struct Entry {
    /// Index of this name's slot in `ServiceRuntimeInner::services`.
    idx: usize,
    /// Registered name.
    name: String,
    /// Configuration handed to the resulting `NetService`.
    config: SharedCfg,
    /// Token and configuration of every listener registered under the name.
    tokens: Vec<(Token, SharedCfg)>,
}
269
/// Shared state behind [`ServiceRuntime`].
struct ServiceRuntimeInner {
    /// Registered names and their entries.
    names: HashMap<String, Entry>,
    /// One slot per name, indexed by `Entry::idx`; `None` until a service is
    /// registered for that name.
    services: Vec<Option<BoxServerService>>,
}
274
275impl fmt::Debug for ServiceRuntime {
276 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
277 let inner = self.0.borrow();
278 f.debug_struct("ServiceRuntimer")
279 .field("names", &inner.names)
280 .field("services", &inner.services)
281 .finish()
282 }
283}
284
285impl ServiceRuntime {
286 fn new(names: HashMap<String, Entry>) -> Self {
287 ServiceRuntime(Rc::new(RefCell::new(ServiceRuntimeInner {
288 services: (0..names.len()).map(|_| None).collect(),
289 names,
290 })))
291 }
292
293 fn validate(&self) {
294 let inner = self.0.as_ref().borrow();
295 for (name, item) in &inner.names {
296 if inner.services[item.idx].is_none() {
297 log::error!("Service {name:?} is not configured");
298 }
299 }
300 }
301
302 pub fn service<T, F>(&self, name: &str, service: F) -> &Self
311 where
312 F: IntoServiceFactory<T, Io, SharedCfg>,
313 T: ServiceFactory<Io, SharedCfg> + 'static,
314 T::Service: 'static,
315 T::InitError: fmt::Debug,
316 {
317 let mut inner = self.0.borrow_mut();
318 if let Some(entry) = inner.names.get_mut(name) {
319 let idx = entry.idx;
320 inner.services[idx] = Some(factory::create_boxed_factory(
321 name.to_string(),
322 service.into_factory(),
323 ));
324 } else {
325 panic!("Unknown service: {name:?}");
326 }
327 self
328 }
329
330 pub fn config<T: Into<SharedCfg>>(&self, name: &str, cfg: T) -> &Self {
332 let mut inner = self.0.borrow_mut();
333 if let Some(entry) = inner.names.get_mut(name) {
334 entry.config = cfg.into();
335 }
336 self
337 }
338}
339
/// Type-erased, sendable on-worker-start callback.
trait OnWorkerStart: Send {
    /// Returns a boxed copy of this callback.
    fn clone(&self) -> Box<dyn OnWorkerStart>;

    /// Invokes the callback with `rt` and returns its boxed future; the
    /// future resolves to `Err(())` when the callback fails.
    fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>>;
}
345
/// Adapter that stores a user callback `F` so it can be used as a boxed
/// `OnWorkerStart`.
struct OnWorkerStartWrapper<F, E> {
    /// The wrapped callback.
    pub(super) f: F,
    /// Marker tying the callback's error type `E` to the wrapper; no `E`
    /// value is stored.
    pub(super) _t: marker::PhantomData<E>,
}
350
351impl<F, E> OnWorkerStartWrapper<F, E>
352where
353 F: AsyncFn(ServiceRuntime) -> Result<(), E> + Send + Clone + 'static,
354 E: fmt::Display + 'static,
355{
356 pub(super) fn create(f: F) -> Box<dyn OnWorkerStart + Send> {
357 Box::new(Self {
358 f,
359 _t: marker::PhantomData,
360 })
361 }
362}
363
// SAFETY: the only value stored in the wrapper is `f: F`, and this impl
// requires `F: Send`. The `_t: PhantomData<E>` field is a zero-sized marker,
// so no `E` value is held by the wrapper or moved between threads with it.
unsafe impl<F, E> Send for OnWorkerStartWrapper<F, E> where F: Send {}
367
368impl<F, E> OnWorkerStart for OnWorkerStartWrapper<F, E>
369where
370 F: AsyncFn(ServiceRuntime) -> Result<(), E> + Send + Clone + 'static,
371 E: fmt::Display + 'static,
372{
373 fn clone(&self) -> Box<dyn OnWorkerStart> {
374 Box::new(Self {
375 f: self.f.clone(),
376 _t: marker::PhantomData,
377 })
378 }
379
380 fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>> {
381 let f = self.f.clone();
382 Box::pin(async move {
383 (f)(rt).await.map_err(|e| {
384 log::error!("On worker start callback failed: {e}");
385 })
386 })
387 }
388}