1use std::{cell::Cell, cell::RefCell, fmt, future::Future, io, marker, mem, net, rc::Rc};
2
3use ntex_bytes::PoolId;
4use ntex_net::Io;
5use ntex_service::{IntoServiceFactory, ServiceFactory};
6use ntex_util::{future::BoxFuture, future::Ready, HashMap};
7
8use super::factory::{
9 self, BoxServerService, FactoryService, FactoryServiceType, NetService,
10};
11use super::{builder::bind_addr, socket::Listener, Token};
12
13#[derive(Clone, Debug)]
14pub struct Config(Rc<InnerServiceConfig>);
15
16#[derive(Debug)]
17pub(super) struct InnerServiceConfig {
18 pub(super) pool: Cell<PoolId>,
19 pub(super) tag: Cell<Option<&'static str>>,
20}
21
22impl Default for Config {
23 fn default() -> Self {
24 Self(Rc::new(InnerServiceConfig {
25 pool: Cell::new(PoolId::DEFAULT),
26 tag: Cell::new(None),
27 }))
28 }
29}
30
31impl Config {
32 pub fn memory_pool(&self, id: PoolId) -> &Self {
36 self.0.pool.set(id);
37 self
38 }
39
40 pub fn tag(&self, tag: &'static str) -> &Self {
42 self.0.tag.set(Some(tag));
43 self
44 }
45
46 pub(super) fn get_pool_id(&self) -> PoolId {
47 self.0.pool.get()
48 }
49
50 pub(super) fn get_tag(&self) -> Option<&'static str> {
51 self.0.tag.get()
52 }
53}
54
55#[derive(Clone, Debug)]
56pub struct ServiceConfig(pub(super) Rc<RefCell<ServiceConfigInner>>);
57
58#[derive(Debug)]
59struct Socket {
60 name: String,
61 sockets: Vec<(Token, Listener, &'static str)>,
62}
63
64pub(super) struct ServiceConfigInner {
65 token: Token,
66 apply: Option<Box<dyn OnWorkerStart>>,
67 sockets: Vec<Socket>,
68 backlog: i32,
69}
70
71impl fmt::Debug for ServiceConfigInner {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 f.debug_struct("ServiceConfigInner")
74 .field("token", &self.token)
75 .field("backlog", &self.backlog)
76 .field("sockets", &self.sockets)
77 .finish()
78 }
79}
80
81impl ServiceConfig {
82 pub(super) fn new(token: Token, backlog: i32) -> Self {
83 ServiceConfig(Rc::new(RefCell::new(ServiceConfigInner {
84 token,
85 backlog,
86 sockets: Vec::new(),
87 apply: Some(OnWorkerStartWrapper::create(|_| {
88 not_configured();
89 Ready::Ok::<_, &str>(())
90 })),
91 })))
92 }
93
94 pub fn bind<U, N: AsRef<str>>(&self, name: N, addr: U) -> io::Result<&Self>
96 where
97 U: net::ToSocketAddrs,
98 {
99 let mut inner = self.0.borrow_mut();
100
101 let sockets = bind_addr(addr, inner.backlog)?;
102 let socket = Socket {
103 name: name.as_ref().to_string(),
104 sockets: sockets
105 .into_iter()
106 .map(|lst| (inner.token.next(), Listener::from_tcp(lst), ""))
107 .collect(),
108 };
109 inner.sockets.push(socket);
110
111 Ok(self)
112 }
113
114 pub fn listen<N: AsRef<str>>(&self, name: N, lst: net::TcpListener) -> &Self {
116 let mut inner = self.0.borrow_mut();
117 let socket = Socket {
118 name: name.as_ref().to_string(),
119 sockets: vec![(inner.token.next(), Listener::from_tcp(lst), "")],
120 };
121 inner.sockets.push(socket);
122
123 self
124 }
125
126 pub fn set_tag<N: AsRef<str>>(&self, name: N, tag: &'static str) -> &Self {
128 let mut inner = self.0.borrow_mut();
129 for sock in &mut inner.sockets {
130 if sock.name == name.as_ref() {
131 for item in &mut sock.sockets {
132 item.2 = tag;
133 }
134 }
135 }
136 self
137 }
138
139 pub fn on_worker_start<F, R, E>(&self, f: F) -> &Self
144 where
145 F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
146 R: Future<Output = Result<(), E>> + 'static,
147 E: fmt::Display + 'static,
148 {
149 self.0.borrow_mut().apply = Some(OnWorkerStartWrapper::create(f));
150 self
151 }
152
153 pub(super) fn into_factory(
154 self,
155 ) -> (Token, Vec<(Token, String, Listener)>, FactoryServiceType) {
156 let mut inner = self.0.borrow_mut();
157
158 let mut sockets = Vec::new();
159 let mut names = HashMap::default();
160 for (idx, s) in mem::take(&mut inner.sockets).into_iter().enumerate() {
161 names.insert(
162 s.name.clone(),
163 Entry {
164 idx,
165 pool: PoolId::DEFAULT,
166 tokens: s
167 .sockets
168 .iter()
169 .map(|(token, _, tag)| (*token, *tag))
170 .collect(),
171 },
172 );
173
174 sockets.extend(
175 s.sockets
176 .into_iter()
177 .map(|(token, lst, _)| (token, s.name.clone(), lst)),
178 );
179 }
180
181 (
182 inner.token,
183 sockets,
184 Box::new(ConfiguredService {
185 rt: inner.apply.take().unwrap(),
186 names,
187 }),
188 )
189 }
190}
191
192struct ConfiguredService {
193 rt: Box<dyn OnWorkerStart>,
194 names: HashMap<String, Entry>,
195}
196
197impl FactoryService for ConfiguredService {
198 fn clone_factory(&self) -> FactoryServiceType {
199 Box::new(Self {
200 rt: self.rt.clone(),
201 names: self.names.clone(),
202 })
203 }
204
205 fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
206 let rt = ServiceRuntime::new(self.names.clone());
208 let cfg_fut = self.rt.run(ServiceRuntime(rt.0.clone()));
209
210 Box::pin(async move {
212 cfg_fut.await?;
213 rt.validate();
214
215 let names = mem::take(&mut rt.0.borrow_mut().names);
216 let mut services = mem::take(&mut rt.0.borrow_mut().services);
217
218 let mut res = Vec::new();
219 while let Some(svc) = services.pop() {
220 if let Some(svc) = svc {
221 for entry in names.values() {
222 if entry.idx == services.len() {
223 res.push(NetService {
224 pool: entry.pool,
225 tokens: entry.tokens.clone(),
226 factory: svc,
227 });
228 break;
229 }
230 }
231 }
232 }
233 Ok(res)
234 })
235 }
236}
237
238fn not_configured() {
239 log::error!("Service is not configured");
240}
241
242pub struct ServiceRuntime(Rc<RefCell<ServiceRuntimeInner>>);
243
244#[derive(Debug, Clone)]
245struct Entry {
246 idx: usize,
247 pool: PoolId,
248 tokens: Vec<(Token, &'static str)>,
249}
250
251struct ServiceRuntimeInner {
252 names: HashMap<String, Entry>,
253 services: Vec<Option<BoxServerService>>,
254}
255
256impl fmt::Debug for ServiceRuntime {
257 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258 let inner = self.0.borrow();
259 f.debug_struct("ServiceRuntimer")
260 .field("names", &inner.names)
261 .field("services", &inner.services)
262 .finish()
263 }
264}
265
266impl ServiceRuntime {
267 fn new(names: HashMap<String, Entry>) -> Self {
268 ServiceRuntime(Rc::new(RefCell::new(ServiceRuntimeInner {
269 services: (0..names.len()).map(|_| None).collect(),
270 names,
271 })))
272 }
273
274 fn validate(&self) {
275 let inner = self.0.as_ref().borrow();
276 for (name, item) in &inner.names {
277 if inner.services[item.idx].is_none() {
278 log::error!("Service {:?} is not configured", name);
279 }
280 }
281 }
282
283 pub fn service<T, F>(&self, name: &str, service: F)
288 where
289 F: IntoServiceFactory<T, Io>,
290 T: ServiceFactory<Io> + 'static,
291 T::Service: 'static,
292 T::InitError: fmt::Debug,
293 {
294 self.service_in(name, PoolId::P0, service)
295 }
296
297 pub fn service_in<T, F>(&self, name: &str, pool: PoolId, service: F)
302 where
303 F: IntoServiceFactory<T, Io>,
304 T: ServiceFactory<Io> + 'static,
305 T::Service: 'static,
306 T::InitError: fmt::Debug,
307 {
308 let mut inner = self.0.borrow_mut();
309 if let Some(entry) = inner.names.get_mut(name) {
310 let idx = entry.idx;
311 entry.pool = pool;
312 inner.services[idx] = Some(factory::create_boxed_factory(
313 name.to_string(),
314 service.into_factory(),
315 ));
316 } else {
317 panic!("Unknown service: {:?}", name);
318 }
319 }
320}
321
322trait OnWorkerStart: Send {
323 fn clone(&self) -> Box<dyn OnWorkerStart>;
324
325 fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>>;
326}
327
328struct OnWorkerStartWrapper<F, R, E> {
329 pub(super) f: F,
330 pub(super) _t: marker::PhantomData<(R, E)>,
331}
332
333impl<F, R, E> OnWorkerStartWrapper<F, R, E>
334where
335 F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
336 R: Future<Output = Result<(), E>> + 'static,
337 E: fmt::Display + 'static,
338{
339 pub(super) fn create(f: F) -> Box<dyn OnWorkerStart + Send> {
340 Box::new(Self {
341 f,
342 _t: marker::PhantomData,
343 })
344 }
345}
346
347unsafe impl<F, R, E> Send for OnWorkerStartWrapper<F, R, E> where F: Send {}
350
351impl<F, R, E> OnWorkerStart for OnWorkerStartWrapper<F, R, E>
352where
353 F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
354 R: Future<Output = Result<(), E>> + 'static,
355 E: fmt::Display + 'static,
356{
357 fn clone(&self) -> Box<dyn OnWorkerStart> {
358 Box::new(Self {
359 f: self.f.clone(),
360 _t: marker::PhantomData,
361 })
362 }
363
364 fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>> {
365 let f = self.f.clone();
366 Box::pin(async move {
367 (f)(rt).await.map_err(|e| {
368 log::error!("On worker start callback failed: {}", e);
369 })
370 })
371 }
372}