1use std::{cell::Cell, cell::RefCell, fmt, io, marker, mem, net, rc::Rc};
2
3use ntex_io::Io;
4use ntex_service::{IntoServiceFactory, ServiceFactory, cfg::SharedCfg};
5use ntex_util::{HashMap, future::BoxFuture, future::Ready};
6
7use super::factory::{
8 self, BoxServerService, FactoryService, FactoryServiceType, NetService,
9};
10use super::{Token, builder::bind_addr, socket::Listener};
11
/// Cloneable handle to an optional, shared [`SharedCfg`] value.
///
/// The value lives behind an `Rc`, so every clone of a `Config` reads and
/// writes the same underlying cell.
#[derive(Clone, Debug)]
pub struct Config(Rc<InnerServiceConfig>);
14
/// Shared state behind a [`Config`] handle.
#[derive(Debug)]
pub(super) struct InnerServiceConfig {
    /// Configuration stored through [`Config::config`]; `None` until one is set.
    pub(super) config: Cell<Option<SharedCfg>>,
}
19
20impl Default for Config {
21 fn default() -> Self {
22 Self(Rc::new(InnerServiceConfig {
23 config: Cell::new(None),
24 }))
25 }
26}
27
28impl Config {
29 pub fn config<T: Into<SharedCfg>>(&self, cfg: T) -> &Self {
31 self.0.config.set(Some(cfg.into()));
32 self
33 }
34
35 pub(super) fn get_config(&self) -> Option<SharedCfg> {
36 self.0.config.get()
37 }
38}
39
/// Cloneable handle to the mutable state collected while registering named
/// listeners and the worker-start callback.
///
/// Clones share the same `Rc<RefCell<..>>`, so changes made through one
/// handle are visible through all of them.
#[derive(Clone, Debug)]
pub struct ServiceConfig(pub(super) Rc<RefCell<ServiceConfigInner>>);
42
/// Listeners created by a single `bind`/`listen` call.
#[derive(Debug)]
struct Socket {
    /// Name the listeners were registered under.
    name: String,
    /// Configuration applied to the registration as a whole.
    config: SharedCfg,
    /// One `(token, listener, config)` triple per listener.
    sockets: Vec<(Token, Listener, SharedCfg)>,
}
49
/// Mutable state shared by all clones of a [`ServiceConfig`].
pub(super) struct ServiceConfigInner {
    /// Token generator; `next()` is called once per registered listener.
    token: Token,
    /// Worker-start callback; taken (left as `None`) by `into_factory`.
    apply: Option<Box<dyn OnWorkerStart>>,
    /// Registrations in the order `bind`/`listen` were called.
    sockets: Vec<Socket>,
    /// Backlog value passed to `bind_addr` when binding new listeners.
    backlog: i32,
}
56
57impl fmt::Debug for ServiceConfigInner {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 f.debug_struct("ServiceConfigInner")
60 .field("token", &self.token)
61 .field("backlog", &self.backlog)
62 .field("sockets", &self.sockets)
63 .finish()
64 }
65}
66
67impl ServiceConfig {
68 pub(super) fn new(token: Token, backlog: i32) -> Self {
69 ServiceConfig(Rc::new(RefCell::new(ServiceConfigInner {
70 token,
71 backlog,
72 sockets: Vec::new(),
73 apply: Some(OnWorkerStartWrapper::create(|_| {
74 not_configured();
75 Ready::Ok::<_, &str>(())
76 })),
77 })))
78 }
79
80 pub fn bind<U, N: AsRef<str>>(&self, name: N, addr: U) -> io::Result<&Self>
82 where
83 U: net::ToSocketAddrs,
84 {
85 let mut inner = self.0.borrow_mut();
86
87 let sockets = bind_addr(addr, inner.backlog)?;
88 let socket = Socket {
89 name: name.as_ref().to_string(),
90 config: SharedCfg::default(),
91 sockets: sockets
92 .into_iter()
93 .map(|lst| {
94 (
95 inner.token.next(),
96 Listener::from_tcp(lst),
97 SharedCfg::default(),
98 )
99 })
100 .collect(),
101 };
102 inner.sockets.push(socket);
103
104 Ok(self)
105 }
106
107 pub fn listen<N: AsRef<str>>(&self, name: N, lst: net::TcpListener) -> &Self {
109 let mut inner = self.0.borrow_mut();
110 let socket = Socket {
111 name: name.as_ref().to_string(),
112 config: SharedCfg::default(),
113 sockets: vec![(
114 inner.token.next(),
115 Listener::from_tcp(lst),
116 SharedCfg::default(),
117 )],
118 };
119 inner.sockets.push(socket);
120
121 self
122 }
123
124 pub fn config<N, T>(&self, name: N, cfg: T) -> &Self
126 where
127 N: AsRef<str>,
128 T: Into<SharedCfg>,
129 {
130 let cfg = cfg.into();
131 let mut inner = self.0.borrow_mut();
132 for sock in &mut inner.sockets {
133 if sock.name == name.as_ref() {
134 sock.config = cfg;
135 for item in &mut sock.sockets {
136 item.2 = cfg;
137 }
138 }
139 }
140 self
141 }
142
143 pub fn on_worker_start<F, E>(&self, f: F) -> &Self
148 where
149 F: AsyncFn(ServiceRuntime) -> Result<(), E> + Send + Clone + 'static,
150 E: fmt::Display + 'static,
151 {
152 self.0.borrow_mut().apply = Some(OnWorkerStartWrapper::create(f));
153 self
154 }
155
156 pub(super) fn into_factory(
157 self,
158 ) -> (Token, Vec<(Token, String, Listener)>, FactoryServiceType) {
159 let mut inner = self.0.borrow_mut();
160
161 let mut sockets = Vec::new();
162 let mut names = HashMap::default();
163 for (idx, s) in mem::take(&mut inner.sockets).into_iter().enumerate() {
164 names.insert(
165 s.name.clone(),
166 Entry {
167 idx,
168 name: s.name.clone(),
169 config: s.config,
170 tokens: s
171 .sockets
172 .iter()
173 .map(|(token, _, tag)| (*token, *tag))
174 .collect(),
175 },
176 );
177
178 sockets.extend(
179 s.sockets
180 .into_iter()
181 .map(|(token, lst, _)| (token, s.name.clone(), lst)),
182 );
183 }
184
185 (
186 inner.token,
187 sockets,
188 Box::new(ConfiguredService {
189 names,
190 rt: inner.apply.take().unwrap(),
191 }),
192 )
193 }
194}
195
/// Service factory produced by `ServiceConfig::into_factory`.
struct ConfiguredService {
    /// Worker-start callback run at the beginning of `create`.
    rt: Box<dyn OnWorkerStart>,
    /// Registered names and their entries; cloned for every `create` call.
    names: HashMap<String, Entry>,
}
200
201impl FactoryService for ConfiguredService {
202 fn clone_factory(&self) -> FactoryServiceType {
203 Box::new(Self {
204 rt: self.rt.clone(),
205 names: self.names.clone(),
206 })
207 }
208
209 fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
210 let rt = ServiceRuntime::new(self.names.clone());
212 let cfg_fut = self.rt.run(ServiceRuntime(rt.0.clone()));
213
214 Box::pin(async move {
216 cfg_fut.await?;
217 rt.validate();
218
219 let names = mem::take(&mut rt.0.borrow_mut().names);
220 let mut services = mem::take(&mut rt.0.borrow_mut().services);
221
222 let mut res = Vec::new();
223 while let Some(svc) = services.pop() {
224 if let Some(svc) = svc {
225 for entry in names.values() {
226 if entry.idx == services.len() {
227 res.push(NetService {
228 config: entry.config,
229 name: std::sync::Arc::from(entry.name.clone()),
230 tokens: entry.tokens.clone(),
231 factory: svc,
232 });
233 break;
234 }
235 }
236 }
237 }
238 Ok(res)
239 })
240 }
241}
242
/// Logs an error stating that no service has been configured.
///
/// Called by the default worker-start callback installed in
/// `ServiceConfig::new`.
fn not_configured() {
    log::error!("Service is not configured");
}
246
/// Handle given to the worker-start callback for attaching service factories
/// and configurations to registered names.
pub struct ServiceRuntime(Rc<RefCell<ServiceRuntimeInner>>);
248
/// Per-name record linking a registered name to its service slot and tokens.
#[derive(Debug, Clone)]
struct Entry {
    /// Index of this name's slot in `ServiceRuntimeInner::services`.
    idx: usize,
    /// Registered name.
    name: String,
    /// Configuration copied into the resulting `NetService`.
    config: SharedCfg,
    /// Listener tokens registered under this name, each with its own config.
    tokens: Vec<(Token, SharedCfg)>,
}
256
/// Shared state behind a [`ServiceRuntime`] handle.
struct ServiceRuntimeInner {
    /// Known names and their entries.
    names: HashMap<String, Entry>,
    /// Service slots addressed by `Entry::idx`; `None` until a service is
    /// attached through `ServiceRuntime::service`.
    services: Vec<Option<BoxServerService>>,
}
261
262impl fmt::Debug for ServiceRuntime {
263 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264 let inner = self.0.borrow();
265 f.debug_struct("ServiceRuntimer")
266 .field("names", &inner.names)
267 .field("services", &inner.services)
268 .finish()
269 }
270}
271
272impl ServiceRuntime {
273 fn new(names: HashMap<String, Entry>) -> Self {
274 ServiceRuntime(Rc::new(RefCell::new(ServiceRuntimeInner {
275 services: (0..names.len()).map(|_| None).collect(),
276 names,
277 })))
278 }
279
280 fn validate(&self) {
281 let inner = self.0.as_ref().borrow();
282 for (name, item) in &inner.names {
283 if inner.services[item.idx].is_none() {
284 log::error!("Service {name:?} is not configured");
285 }
286 }
287 }
288
289 pub fn service<T, F>(&self, name: &str, service: F) -> &Self
294 where
295 F: IntoServiceFactory<T, Io, SharedCfg>,
296 T: ServiceFactory<Io, SharedCfg> + 'static,
297 T::Service: 'static,
298 T::InitError: fmt::Debug,
299 {
300 let mut inner = self.0.borrow_mut();
301 if let Some(entry) = inner.names.get_mut(name) {
302 let idx = entry.idx;
303 inner.services[idx] = Some(factory::create_boxed_factory(
304 name.to_string(),
305 service.into_factory(),
306 ));
307 } else {
308 panic!("Unknown service: {name:?}");
309 }
310 self
311 }
312
313 pub fn config<T: Into<SharedCfg>>(&self, name: &str, cfg: T) -> &Self {
315 let mut inner = self.0.borrow_mut();
316 if let Some(entry) = inner.names.get_mut(name) {
317 entry.config = cfg.into();
318 }
319 self
320 }
321}
322
/// Object-safe, type-erased form of the worker-start callback.
trait OnWorkerStart: Send {
    /// Returns a boxed copy of the callback.
    fn clone(&self) -> Box<dyn OnWorkerStart>;

    /// Invokes the callback with `rt`; the future resolves to `Err(())` when
    /// the callback reports a failure.
    fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>>;
}
328
/// Adapter that implements [`OnWorkerStart`] for an async callback `F`
/// returning `Result<(), E>`.
struct OnWorkerStartWrapper<F, E> {
    /// The user-supplied callback.
    pub(super) f: F,
    /// Records the error type `E` without storing a value of it.
    pub(super) _t: marker::PhantomData<E>,
}
333
334impl<F, E> OnWorkerStartWrapper<F, E>
335where
336 F: AsyncFn(ServiceRuntime) -> Result<(), E> + Send + Clone + 'static,
337 E: fmt::Display + 'static,
338{
339 pub(super) fn create(f: F) -> Box<dyn OnWorkerStart + Send> {
340 Box::new(Self {
341 f,
342 _t: marker::PhantomData,
343 })
344 }
345}
346
// SAFETY: the only value stored in the wrapper is `f: F`, and this impl
// requires `F: Send`. `E` appears solely inside `PhantomData<E>`, so no value
// of type `E` is ever held by the wrapper or moved across threads with it.
unsafe impl<F, E> Send for OnWorkerStartWrapper<F, E> where F: Send {}
350
351impl<F, E> OnWorkerStart for OnWorkerStartWrapper<F, E>
352where
353 F: AsyncFn(ServiceRuntime) -> Result<(), E> + Send + Clone + 'static,
354 E: fmt::Display + 'static,
355{
356 fn clone(&self) -> Box<dyn OnWorkerStart> {
357 Box::new(Self {
358 f: self.f.clone(),
359 _t: marker::PhantomData,
360 })
361 }
362
363 fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>> {
364 let f = self.f.clone();
365 Box::pin(async move {
366 (f)(rt).await.map_err(|e| {
367 log::error!("On worker start callback failed: {e}");
368 })
369 })
370 }
371}