ntex_server/net/
service.rs1use std::{fmt, task::Context};
2
3use ntex_bytes::{Pool, PoolRef};
4use ntex_net::Io;
5use ntex_service::{boxed, Service, ServiceCtx, ServiceFactory};
6use ntex_util::{future::join_all, services::Counter, HashMap};
7
8use crate::ServerConfiguration;
9
10use super::accept::{AcceptNotify, AcceptorCommand};
11use super::factory::{FactoryServiceType, NetService, OnWorkerStart};
12use super::{socket::Connection, Token, MAX_CONNS_COUNTER};
13
/// Alias for `boxed::BoxService` instantiated with [`Io`] and `()` for its two
/// remaining type parameters; it is the element type of
/// `StreamServiceImpl::services`.
// NOTE(review): presumably the parameters are (request, response, error) — confirm
// against the `ntex_service::boxed` documentation.
14pub(super) type BoxService = boxed::BoxService<Io, (), ()>;
15
16pub struct StreamServer {
18 notify: AcceptNotify,
19 services: Vec<FactoryServiceType>,
20 on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
21}
22
23impl StreamServer {
24 pub(crate) fn new(
25 notify: AcceptNotify,
26 services: Vec<FactoryServiceType>,
27 on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
28 ) -> Self {
29 Self {
30 notify,
31 services,
32 on_worker_start,
33 }
34 }
35}
36
37impl fmt::Debug for StreamServer {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 f.debug_struct("StreamServer")
40 .field("services", &self.services.len())
41 .finish()
42 }
43}
44
45impl ServerConfiguration for StreamServer {
47 type Item = Connection;
48 type Factory = StreamService;
49
50 async fn create(&self) -> Result<Self::Factory, ()> {
52 for cb in &self.on_worker_start {
54 cb.run().await?;
55 }
56
57 let mut services = Vec::new();
59 for svc in &self.services {
60 services.extend(svc.create().await?);
61 }
62
63 Ok(StreamService { services })
64 }
65
66 fn paused(&self) {
68 self.notify.send(AcceptorCommand::Pause);
69 }
70
71 fn resumed(&self) {
73 self.notify.send(AcceptorCommand::Resume);
74 }
75
76 fn terminate(&self) {
78 self.notify.send(AcceptorCommand::Terminate);
79 }
80
81 async fn stop(&self) {
83 let (tx, rx) = oneshot::channel();
84 self.notify.send(AcceptorCommand::Stop(tx));
85 let _ = rx.await;
86 }
87}
88
89impl Clone for StreamServer {
90 fn clone(&self) -> Self {
91 Self {
92 notify: self.notify.clone(),
93 services: self.services.iter().map(|s| s.clone_factory()).collect(),
94 on_worker_start: self.on_worker_start.iter().map(|f| f.clone_fn()).collect(),
95 }
96 }
97}
98
99#[derive(Debug)]
100pub struct StreamService {
101 services: Vec<NetService>,
102}
103
104impl ServiceFactory<Connection> for StreamService {
105 type Response = ();
106 type Error = ();
107 type Service = StreamServiceImpl;
108 type InitError = ();
109
110 async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
111 let mut tokens = HashMap::default();
112 let mut services = Vec::new();
113
114 for info in &self.services {
115 match info.factory.create(()).await {
116 Ok(svc) => {
117 log::debug!("Constructed server service for {:?}", info.tokens);
118 services.push(svc);
119 let idx = services.len() - 1;
120 for (token, tag) in &info.tokens {
121 tokens.insert(
122 *token,
123 (idx, *tag, info.pool.pool(), info.pool.pool_ref()),
124 );
125 }
126 }
127 Err(_) => {
128 log::error!("Cannot construct service: {:?}", info.tokens);
129 return Err(());
130 }
131 }
132 }
133
134 Ok(StreamServiceImpl {
135 tokens,
136 services,
137 conns: MAX_CONNS_COUNTER.with(|conns| conns.clone()),
138 })
139 }
140}
141
142#[derive(Debug)]
143pub struct StreamServiceImpl {
144 tokens: HashMap<Token, (usize, &'static str, Pool, PoolRef)>,
145 services: Vec<BoxService>,
146 conns: Counter,
147}
148
149impl Service<Connection> for StreamServiceImpl {
150 type Response = ();
151 type Error = ();
152
153 async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
154 if !self.conns.is_available() {
155 self.conns.available().await;
156 }
157 for (idx, svc) in self.services.iter().enumerate() {
158 if ctx.ready(svc).await.is_err() {
159 for (idx_, tag, _, _) in self.tokens.values() {
160 if idx == *idx_ {
161 log::error!("{}: Service readiness has failed", tag);
162 break;
163 }
164 }
165 return Err(());
166 }
167 }
168
169 Ok(())
170 }
171
172 #[inline]
173 fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
174 for svc in &self.services {
175 svc.poll(cx)?;
176 }
177 Ok(())
178 }
179
180 async fn shutdown(&self) {
181 let _ = join_all(self.services.iter().map(|svc| svc.shutdown())).await;
182 log::info!(
183 "Worker service shutdown, {} connections",
184 super::num_connections()
185 );
186 }
187
188 async fn call(&self, con: Connection, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
189 if let Some((idx, tag, _, pool)) = self.tokens.get(&con.token) {
190 let stream: Io<_> = con.io.try_into().map_err(|e| {
191 log::error!("Cannot convert to an async io stream: {}", e);
192 })?;
193
194 stream.set_tag(tag);
195 stream.set_memory_pool(*pool);
196 let guard = self.conns.get();
197 let _ = ctx.call(&self.services[*idx], stream).await;
198 drop(guard);
199 Ok(())
200 } else {
201 log::error!("Cannot get handler service for connection: {:?}", con);
202 Err(())
203 }
204 }
205}