1use std::{cell::Cell, cell::RefCell, fmt, future::Future, io, marker, mem, net, rc::Rc};
2
3use ntex_io::Io;
4use ntex_service::{IntoServiceFactory, ServiceFactory, cfg::SharedCfg};
5use ntex_util::{HashMap, future::BoxFuture, future::Ready};
6
7use super::factory::{
8 self, BoxServerService, FactoryService, FactoryServiceType, NetService,
9};
10use super::{Token, builder::bind_addr, socket::Listener};
11
/// Handle to an optional [`SharedCfg`] value.
///
/// The value lives behind an `Rc`, so every clone of a `Config` reads and
/// writes the same underlying cell.
#[derive(Clone, Debug)]
pub struct Config(Rc<InnerServiceConfig>);
14
/// Shared state behind a [`Config`] handle.
#[derive(Debug)]
pub(super) struct InnerServiceConfig {
    /// Configuration stored by `Config::config`; `None` until it is called.
    pub(super) config: Cell<Option<SharedCfg>>,
}
19
20impl Default for Config {
21 fn default() -> Self {
22 Self(Rc::new(InnerServiceConfig {
23 config: Cell::new(None),
24 }))
25 }
26}
27
28impl Config {
29 pub fn config<T: Into<SharedCfg>>(&self, cfg: T) -> &Self {
31 self.0.config.set(Some(cfg.into()));
32 self
33 }
34
35 pub(super) fn get_config(&self) -> Option<SharedCfg> {
36 self.0.config.get()
37 }
38}
39
/// Builder-style handle used to register named listeners and a worker-start
/// callback.
///
/// The state lives behind `Rc<RefCell<..>>`, so clones of a `ServiceConfig`
/// all operate on the same underlying configuration.
#[derive(Clone, Debug)]
pub struct ServiceConfig(pub(super) Rc<RefCell<ServiceConfigInner>>);
42
/// A group of listeners registered under one name by a single `bind` or
/// `listen` call.
#[derive(Debug)]
struct Socket {
    /// Name supplied by the caller; used to look the group up later.
    name: String,
    /// Group-level configuration; starts as `SharedCfg::default()` and is
    /// overwritten by `ServiceConfig::config`.
    config: SharedCfg,
    /// One `(token, listener, per-listener config)` tuple per listener.
    sockets: Vec<(Token, Listener, SharedCfg)>,
}
49
/// Mutable state shared by all clones of a [`ServiceConfig`].
pub(super) struct ServiceConfigInner {
    /// Token source: `token.next()` is called once per registered listener,
    /// and the current value is handed back by `into_factory`.
    token: Token,
    /// Worker-start callback; taken (left as `None`) by `into_factory`.
    apply: Option<Box<dyn OnWorkerStart>>,
    /// Listener groups in registration order.
    sockets: Vec<Socket>,
    /// Backlog value forwarded to `bind_addr` when binding addresses.
    backlog: i32,
}
56
57impl fmt::Debug for ServiceConfigInner {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 f.debug_struct("ServiceConfigInner")
60 .field("token", &self.token)
61 .field("backlog", &self.backlog)
62 .field("sockets", &self.sockets)
63 .finish()
64 }
65}
66
67impl ServiceConfig {
68 pub(super) fn new(token: Token, backlog: i32) -> Self {
69 ServiceConfig(Rc::new(RefCell::new(ServiceConfigInner {
70 token,
71 backlog,
72 sockets: Vec::new(),
73 apply: Some(OnWorkerStartWrapper::create(|_| {
74 not_configured();
75 Ready::Ok::<_, &str>(())
76 })),
77 })))
78 }
79
80 pub fn bind<U, N: AsRef<str>>(&self, name: N, addr: U) -> io::Result<&Self>
82 where
83 U: net::ToSocketAddrs,
84 {
85 let mut inner = self.0.borrow_mut();
86
87 let sockets = bind_addr(addr, inner.backlog)?;
88 let socket = Socket {
89 name: name.as_ref().to_string(),
90 config: SharedCfg::default(),
91 sockets: sockets
92 .into_iter()
93 .map(|lst| {
94 (
95 inner.token.next(),
96 Listener::from_tcp(lst),
97 SharedCfg::default(),
98 )
99 })
100 .collect(),
101 };
102 inner.sockets.push(socket);
103
104 Ok(self)
105 }
106
107 pub fn listen<N: AsRef<str>>(&self, name: N, lst: net::TcpListener) -> &Self {
109 let mut inner = self.0.borrow_mut();
110 let socket = Socket {
111 name: name.as_ref().to_string(),
112 config: SharedCfg::default(),
113 sockets: vec![(
114 inner.token.next(),
115 Listener::from_tcp(lst),
116 SharedCfg::default(),
117 )],
118 };
119 inner.sockets.push(socket);
120
121 self
122 }
123
124 pub fn config<N, T>(&self, name: N, cfg: T) -> &Self
126 where
127 N: AsRef<str>,
128 T: Into<SharedCfg>,
129 {
130 let cfg = cfg.into();
131 let mut inner = self.0.borrow_mut();
132 for sock in &mut inner.sockets {
133 if sock.name == name.as_ref() {
134 sock.config = cfg;
135 for item in &mut sock.sockets {
136 item.2 = cfg;
137 }
138 }
139 }
140 self
141 }
142
143 pub fn on_worker_start<F, R, E>(&self, f: F) -> &Self
148 where
149 F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
150 R: Future<Output = Result<(), E>> + 'static,
151 E: fmt::Display + 'static,
152 {
153 self.0.borrow_mut().apply = Some(OnWorkerStartWrapper::create(f));
154 self
155 }
156
157 pub(super) fn into_factory(
158 self,
159 ) -> (Token, Vec<(Token, String, Listener)>, FactoryServiceType) {
160 let mut inner = self.0.borrow_mut();
161
162 let mut sockets = Vec::new();
163 let mut names = HashMap::default();
164 for (idx, s) in mem::take(&mut inner.sockets).into_iter().enumerate() {
165 names.insert(
166 s.name.clone(),
167 Entry {
168 idx,
169 name: s.name.clone(),
170 config: s.config,
171 tokens: s
172 .sockets
173 .iter()
174 .map(|(token, _, tag)| (*token, *tag))
175 .collect(),
176 },
177 );
178
179 sockets.extend(
180 s.sockets
181 .into_iter()
182 .map(|(token, lst, _)| (token, s.name.clone(), lst)),
183 );
184 }
185
186 (
187 inner.token,
188 sockets,
189 Box::new(ConfiguredService {
190 names,
191 rt: inner.apply.take().unwrap(),
192 }),
193 )
194 }
195}
196
/// Service factory produced by `ServiceConfig::into_factory`.
struct ConfiguredService {
    /// Worker-start callback that registers services on a `ServiceRuntime`.
    rt: Box<dyn OnWorkerStart>,
    /// Registered names mapped to their entries.
    names: HashMap<String, Entry>,
}
201
202impl FactoryService for ConfiguredService {
203 fn clone_factory(&self) -> FactoryServiceType {
204 Box::new(Self {
205 rt: self.rt.clone(),
206 names: self.names.clone(),
207 })
208 }
209
210 fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
211 let rt = ServiceRuntime::new(self.names.clone());
213 let cfg_fut = self.rt.run(ServiceRuntime(rt.0.clone()));
214
215 Box::pin(async move {
217 cfg_fut.await?;
218 rt.validate();
219
220 let names = mem::take(&mut rt.0.borrow_mut().names);
221 let mut services = mem::take(&mut rt.0.borrow_mut().services);
222
223 let mut res = Vec::new();
224 while let Some(svc) = services.pop() {
225 if let Some(svc) = svc {
226 for entry in names.values() {
227 if entry.idx == services.len() {
228 res.push(NetService {
229 config: entry.config,
230 name: std::sync::Arc::from(entry.name.clone()),
231 tokens: entry.tokens.clone(),
232 factory: svc,
233 });
234 break;
235 }
236 }
237 }
238 }
239 Ok(res)
240 })
241 }
242}
243
/// Logs an error saying that the service is not configured.
///
/// Called by the placeholder worker-start callback installed in
/// `ServiceConfig::new`.
fn not_configured() {
    log::error!("Service is not configured");
}
247
/// Handle passed to the worker-start callback; it is used to attach a service
/// factory or a configuration to a registered name.
pub struct ServiceRuntime(Rc<RefCell<ServiceRuntimeInner>>);
249
/// Everything known about one registered service name.
#[derive(Debug, Clone)]
struct Entry {
    /// Position of this name's slot in `ServiceRuntimeInner::services`.
    idx: usize,
    /// The registered name.
    name: String,
    /// Configuration copied into the resulting `NetService`.
    config: SharedCfg,
    /// Listener tokens for this name, each with its own configuration.
    tokens: Vec<(Token, SharedCfg)>,
}
257
/// State shared by the `ServiceRuntime` handles created for one `create` call.
struct ServiceRuntimeInner {
    /// Registered names mapped to their entries.
    names: HashMap<String, Entry>,
    /// One slot per name, indexed by `Entry::idx`; `None` until a service is
    /// registered for that name.
    services: Vec<Option<BoxServerService>>,
}
262
263impl fmt::Debug for ServiceRuntime {
264 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
265 let inner = self.0.borrow();
266 f.debug_struct("ServiceRuntimer")
267 .field("names", &inner.names)
268 .field("services", &inner.services)
269 .finish()
270 }
271}
272
273impl ServiceRuntime {
274 fn new(names: HashMap<String, Entry>) -> Self {
275 ServiceRuntime(Rc::new(RefCell::new(ServiceRuntimeInner {
276 services: (0..names.len()).map(|_| None).collect(),
277 names,
278 })))
279 }
280
281 fn validate(&self) {
282 let inner = self.0.as_ref().borrow();
283 for (name, item) in &inner.names {
284 if inner.services[item.idx].is_none() {
285 log::error!("Service {name:?} is not configured");
286 }
287 }
288 }
289
290 pub fn service<T, F>(&self, name: &str, service: F)
295 where
296 F: IntoServiceFactory<T, Io, SharedCfg>,
297 T: ServiceFactory<Io, SharedCfg> + 'static,
298 T::Service: 'static,
299 T::InitError: fmt::Debug,
300 {
301 let mut inner = self.0.borrow_mut();
302 if let Some(entry) = inner.names.get_mut(name) {
303 let idx = entry.idx;
304 inner.services[idx] = Some(factory::create_boxed_factory(
305 name.to_string(),
306 service.into_factory(),
307 ));
308 } else {
309 panic!("Unknown service: {name:?}");
310 }
311 }
312
313 pub fn config(&self, name: &str, cfg: SharedCfg) {
315 let mut inner = self.0.borrow_mut();
316 if let Some(entry) = inner.names.get_mut(name) {
317 entry.config = cfg;
318 }
319 }
320}
321
/// Type-erased worker-start callback.
///
/// Cloning is a trait method returning a box so that it can be called
/// through `Box<dyn OnWorkerStart>`.
trait OnWorkerStart: Send {
    /// Returns a boxed copy of this callback.
    fn clone(&self) -> Box<dyn OnWorkerStart>;

    /// Invokes the callback with `rt`; the returned future resolves to
    /// `Err(())` when the callback fails.
    fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>>;
}
327
328struct OnWorkerStartWrapper<F, R, E> {
329 pub(super) f: F,
330 pub(super) _t: marker::PhantomData<(R, E)>,
331}
332
333impl<F, R, E> OnWorkerStartWrapper<F, R, E>
334where
335 F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
336 R: Future<Output = Result<(), E>> + 'static,
337 E: fmt::Display + 'static,
338{
339 pub(super) fn create(f: F) -> Box<dyn OnWorkerStart + Send> {
340 Box::new(Self {
341 f,
342 _t: marker::PhantomData,
343 })
344 }
345}
346
347unsafe impl<F, R, E> Send for OnWorkerStartWrapper<F, R, E> where F: Send {}
350
351impl<F, R, E> OnWorkerStart for OnWorkerStartWrapper<F, R, E>
352where
353 F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
354 R: Future<Output = Result<(), E>> + 'static,
355 E: fmt::Display + 'static,
356{
357 fn clone(&self) -> Box<dyn OnWorkerStart> {
358 Box::new(Self {
359 f: self.f.clone(),
360 _t: marker::PhantomData,
361 })
362 }
363
364 fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>> {
365 let f = self.f.clone();
366 Box::pin(async move {
367 (f)(rt).await.map_err(|e| {
368 log::error!("On worker start callback failed: {e}");
369 })
370 })
371 }
372}