1use std::{cell::Cell, cell::RefCell, fmt, future::Future, io, marker, mem, net, rc::Rc};
2
3use ntex_bytes::PoolId;
4use ntex_net::Io;
5use ntex_service::{IntoServiceFactory, ServiceFactory};
6use ntex_util::{future::BoxFuture, future::Ready, HashMap};
7
8use super::factory::{
9 self, BoxServerService, FactoryService, FactoryServiceType, NetService,
10};
11use super::{builder::bind_addr, socket::Listener, Token};
12
13#[derive(Clone, Debug)]
14pub struct Config(Rc<InnerServiceConfig>);
15
16#[derive(Debug)]
17pub(super) struct InnerServiceConfig {
18 pub(super) pool: Cell<PoolId>,
19 pub(super) tag: Cell<Option<&'static str>>,
20}
21
22impl Default for Config {
23 fn default() -> Self {
24 Self(Rc::new(InnerServiceConfig {
25 pool: Cell::new(PoolId::DEFAULT),
26 tag: Cell::new(None),
27 }))
28 }
29}
30
31impl Config {
32 pub fn memory_pool(&self, id: PoolId) -> &Self {
36 self.0.pool.set(id);
37 self
38 }
39
40 pub fn tag(&self, tag: &'static str) -> &Self {
42 self.0.tag.set(Some(tag));
43 self
44 }
45
46 pub(super) fn get_pool_id(&self) -> PoolId {
47 self.0.pool.get()
48 }
49
50 pub(super) fn get_tag(&self) -> Option<&'static str> {
51 self.0.tag.get()
52 }
53}
54
55#[derive(Clone, Debug)]
56pub struct ServiceConfig(pub(super) Rc<RefCell<ServiceConfigInner>>);
57
58#[derive(Debug)]
59struct Socket {
60 name: String,
61 sockets: Vec<(Token, Listener, &'static str)>,
62}
63
64pub(super) struct ServiceConfigInner {
65 token: Token,
66 apply: Option<Box<dyn OnWorkerStart>>,
67 sockets: Vec<Socket>,
68 backlog: i32,
69}
70
71impl fmt::Debug for ServiceConfigInner {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 f.debug_struct("ServiceConfigInner")
74 .field("token", &self.token)
75 .field("backlog", &self.backlog)
76 .field("sockets", &self.sockets)
77 .finish()
78 }
79}
80
81impl ServiceConfig {
82 pub(super) fn new(token: Token, backlog: i32) -> Self {
83 ServiceConfig(Rc::new(RefCell::new(ServiceConfigInner {
84 token,
85 backlog,
86 sockets: Vec::new(),
87 apply: Some(OnWorkerStartWrapper::create(|_| {
88 not_configured();
89 Ready::Ok::<_, &str>(())
90 })),
91 })))
92 }
93
94 pub fn bind<U, N: AsRef<str>>(&self, name: N, addr: U) -> io::Result<&Self>
96 where
97 U: net::ToSocketAddrs,
98 {
99 let mut inner = self.0.borrow_mut();
100
101 let sockets = bind_addr(addr, inner.backlog)?;
102 let socket = Socket {
103 name: name.as_ref().to_string(),
104 sockets: sockets
105 .into_iter()
106 .map(|lst| (inner.token.next(), Listener::from_tcp(lst), ""))
107 .collect(),
108 };
109 inner.sockets.push(socket);
110
111 Ok(self)
112 }
113
114 pub fn listen<N: AsRef<str>>(&self, name: N, lst: net::TcpListener) -> &Self {
116 let mut inner = self.0.borrow_mut();
117 let socket = Socket {
118 name: name.as_ref().to_string(),
119 sockets: vec![(inner.token.next(), Listener::from_tcp(lst), "")],
120 };
121 inner.sockets.push(socket);
122
123 self
124 }
125
126 pub fn set_tag<N: AsRef<str>>(&self, name: N, tag: &'static str) -> &Self {
128 let mut inner = self.0.borrow_mut();
129 for sock in &mut inner.sockets {
130 if sock.name == name.as_ref() {
131 for item in &mut sock.sockets {
132 item.2 = tag;
133 }
134 }
135 }
136 self
137 }
138
139 pub fn on_worker_start<F, R, E>(&self, f: F) -> &Self
144 where
145 F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
146 R: Future<Output = Result<(), E>> + 'static,
147 E: fmt::Display + 'static,
148 {
149 self.0.borrow_mut().apply = Some(OnWorkerStartWrapper::create(f));
150 self
151 }
152
153 pub(super) fn into_factory(
154 self,
155 ) -> (Token, Vec<(Token, String, Listener)>, FactoryServiceType) {
156 let mut inner = self.0.borrow_mut();
157
158 let mut sockets = Vec::new();
159 let mut names = HashMap::default();
160 for (idx, s) in mem::take(&mut inner.sockets).into_iter().enumerate() {
161 names.insert(
162 s.name.clone(),
163 Entry {
164 idx,
165 pool: PoolId::DEFAULT,
166 name: s.name.clone(),
167 tokens: s
168 .sockets
169 .iter()
170 .map(|(token, _, tag)| (*token, *tag))
171 .collect(),
172 },
173 );
174
175 sockets.extend(
176 s.sockets
177 .into_iter()
178 .map(|(token, lst, _)| (token, s.name.clone(), lst)),
179 );
180 }
181
182 (
183 inner.token,
184 sockets,
185 Box::new(ConfiguredService {
186 rt: inner.apply.take().unwrap(),
187 names,
188 }),
189 )
190 }
191}
192
193struct ConfiguredService {
194 rt: Box<dyn OnWorkerStart>,
195 names: HashMap<String, Entry>,
196}
197
198impl FactoryService for ConfiguredService {
199 fn clone_factory(&self) -> FactoryServiceType {
200 Box::new(Self {
201 rt: self.rt.clone(),
202 names: self.names.clone(),
203 })
204 }
205
206 fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
207 let rt = ServiceRuntime::new(self.names.clone());
209 let cfg_fut = self.rt.run(ServiceRuntime(rt.0.clone()));
210
211 Box::pin(async move {
213 cfg_fut.await?;
214 rt.validate();
215
216 let names = mem::take(&mut rt.0.borrow_mut().names);
217 let mut services = mem::take(&mut rt.0.borrow_mut().services);
218
219 let mut res = Vec::new();
220 while let Some(svc) = services.pop() {
221 if let Some(svc) = svc {
222 for entry in names.values() {
223 if entry.idx == services.len() {
224 res.push(NetService {
225 name: std::sync::Arc::from(entry.name.clone()),
226 pool: entry.pool,
227 tokens: entry.tokens.clone(),
228 factory: svc,
229 });
230 break;
231 }
232 }
233 }
234 }
235 Ok(res)
236 })
237 }
238}
239
240fn not_configured() {
241 log::error!("Service is not configured");
242}
243
244pub struct ServiceRuntime(Rc<RefCell<ServiceRuntimeInner>>);
245
246#[derive(Debug, Clone)]
247struct Entry {
248 idx: usize,
249 pool: PoolId,
250 name: String,
251 tokens: Vec<(Token, &'static str)>,
252}
253
254struct ServiceRuntimeInner {
255 names: HashMap<String, Entry>,
256 services: Vec<Option<BoxServerService>>,
257}
258
259impl fmt::Debug for ServiceRuntime {
260 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261 let inner = self.0.borrow();
262 f.debug_struct("ServiceRuntimer")
263 .field("names", &inner.names)
264 .field("services", &inner.services)
265 .finish()
266 }
267}
268
269impl ServiceRuntime {
270 fn new(names: HashMap<String, Entry>) -> Self {
271 ServiceRuntime(Rc::new(RefCell::new(ServiceRuntimeInner {
272 services: (0..names.len()).map(|_| None).collect(),
273 names,
274 })))
275 }
276
277 fn validate(&self) {
278 let inner = self.0.as_ref().borrow();
279 for (name, item) in &inner.names {
280 if inner.services[item.idx].is_none() {
281 log::error!("Service {:?} is not configured", name);
282 }
283 }
284 }
285
286 pub fn service<T, F>(&self, name: &str, service: F)
291 where
292 F: IntoServiceFactory<T, Io>,
293 T: ServiceFactory<Io> + 'static,
294 T::Service: 'static,
295 T::InitError: fmt::Debug,
296 {
297 self.service_in(name, PoolId::P0, service)
298 }
299
300 pub fn service_in<T, F>(&self, name: &str, pool: PoolId, service: F)
305 where
306 F: IntoServiceFactory<T, Io>,
307 T: ServiceFactory<Io> + 'static,
308 T::Service: 'static,
309 T::InitError: fmt::Debug,
310 {
311 let mut inner = self.0.borrow_mut();
312 if let Some(entry) = inner.names.get_mut(name) {
313 let idx = entry.idx;
314 entry.pool = pool;
315 inner.services[idx] = Some(factory::create_boxed_factory(
316 name.to_string(),
317 service.into_factory(),
318 ));
319 } else {
320 panic!("Unknown service: {:?}", name);
321 }
322 }
323}
324
325trait OnWorkerStart: Send {
326 fn clone(&self) -> Box<dyn OnWorkerStart>;
327
328 fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>>;
329}
330
331struct OnWorkerStartWrapper<F, R, E> {
332 pub(super) f: F,
333 pub(super) _t: marker::PhantomData<(R, E)>,
334}
335
336impl<F, R, E> OnWorkerStartWrapper<F, R, E>
337where
338 F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
339 R: Future<Output = Result<(), E>> + 'static,
340 E: fmt::Display + 'static,
341{
342 pub(super) fn create(f: F) -> Box<dyn OnWorkerStart + Send> {
343 Box::new(Self {
344 f,
345 _t: marker::PhantomData,
346 })
347 }
348}
349
350unsafe impl<F, R, E> Send for OnWorkerStartWrapper<F, R, E> where F: Send {}
353
354impl<F, R, E> OnWorkerStart for OnWorkerStartWrapper<F, R, E>
355where
356 F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
357 R: Future<Output = Result<(), E>> + 'static,
358 E: fmt::Display + 'static,
359{
360 fn clone(&self) -> Box<dyn OnWorkerStart> {
361 Box::new(Self {
362 f: self.f.clone(),
363 _t: marker::PhantomData,
364 })
365 }
366
367 fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>> {
368 let f = self.f.clone();
369 Box::pin(async move {
370 (f)(rt).await.map_err(|e| {
371 log::error!("On worker start callback failed: {}", e);
372 })
373 })
374 }
375}