1use std::{fmt, sync::Arc, task::Context};
2
3use ntex_bytes::{Pool, PoolRef};
4use ntex_net::Io;
5use ntex_service::{boxed, Service, ServiceCtx, ServiceFactory};
6use ntex_util::{future::join_all, services::Counter, HashMap};
7
8use crate::ServerConfiguration;
9
10use super::accept::{AcceptNotify, AcceptorCommand};
11use super::factory::{FactoryServiceType, NetService, OnAccept, OnWorkerStart};
12use super::{socket::Connection, Token, MAX_CONNS_COUNTER};
13
14pub(super) type BoxService = boxed::BoxService<Io, (), ()>;
15
16pub struct StreamServer {
18 notify: AcceptNotify,
19 services: Vec<FactoryServiceType>,
20 on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
21 on_accept: Option<Box<dyn OnAccept + Send>>,
22}
23
24impl StreamServer {
25 pub(crate) fn new(
26 notify: AcceptNotify,
27 services: Vec<FactoryServiceType>,
28 on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
29 on_accept: Option<Box<dyn OnAccept + Send>>,
30 ) -> Self {
31 Self {
32 notify,
33 services,
34 on_accept,
35 on_worker_start,
36 }
37 }
38}
39
40impl fmt::Debug for StreamServer {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 f.debug_struct("StreamServer")
43 .field("services", &self.services.len())
44 .finish()
45 }
46}
47
48impl ServerConfiguration for StreamServer {
50 type Item = Connection;
51 type Factory = StreamService;
52
53 async fn create(&self) -> Result<Self::Factory, ()> {
55 for cb in &self.on_worker_start {
57 cb.run().await?;
58 }
59
60 let mut services = Vec::new();
62 for svc in &self.services {
63 services.extend(svc.create().await?);
64 }
65
66 Ok(StreamService {
67 services,
68 on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
69 })
70 }
71
72 fn paused(&self) {
74 self.notify.send(AcceptorCommand::Pause);
75 }
76
77 fn resumed(&self) {
79 self.notify.send(AcceptorCommand::Resume);
80 }
81
82 fn terminate(&self) {
84 self.notify.send(AcceptorCommand::Terminate);
85 }
86
87 async fn stop(&self) {
89 let (tx, rx) = oneshot::channel();
90 self.notify.send(AcceptorCommand::Stop(tx));
91 let _ = rx.await;
92 }
93}
94
95impl Clone for StreamServer {
96 fn clone(&self) -> Self {
97 Self {
98 notify: self.notify.clone(),
99 services: self.services.iter().map(|s| s.clone_factory()).collect(),
100 on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
101 on_worker_start: self.on_worker_start.iter().map(|f| f.clone_fn()).collect(),
102 }
103 }
104}
105
106pub struct StreamService {
107 services: Vec<NetService>,
108 on_accept: Option<Box<dyn OnAccept + Send>>,
109}
110
111impl fmt::Debug for StreamService {
112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113 f.debug_struct("StreamService")
114 .field("services", &self.services)
115 .finish()
116 }
117}
118
119impl ServiceFactory<Connection> for StreamService {
120 type Response = ();
121 type Error = ();
122 type Service = StreamServiceImpl;
123 type InitError = ();
124
125 async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
126 let mut tokens = HashMap::default();
127 let mut services = Vec::new();
128
129 for info in &self.services {
130 match info.factory.create(()).await {
131 Ok(svc) => {
132 log::trace!("Constructed server service for {:?}", info.tokens);
133 services.push(svc);
134 let idx = services.len() - 1;
135 for (token, tag) in &info.tokens {
136 tokens.insert(
137 *token,
138 (
139 idx,
140 *tag,
141 info.name.clone(),
142 info.pool.pool(),
143 info.pool.pool_ref(),
144 ),
145 );
146 }
147 }
148 Err(_) => {
149 log::error!("Cannot construct service: {:?}", info.tokens);
150 return Err(());
151 }
152 }
153 }
154
155 Ok(StreamServiceImpl {
156 tokens,
157 services,
158 conns: MAX_CONNS_COUNTER.with(|conns| conns.clone()),
159 on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
160 })
161 }
162}
163
164pub struct StreamServiceImpl {
165 tokens: HashMap<Token, (usize, &'static str, Arc<str>, Pool, PoolRef)>,
166 services: Vec<BoxService>,
167 conns: Counter,
168 on_accept: Option<Box<dyn OnAccept + Send>>,
169}
170
171impl fmt::Debug for StreamServiceImpl {
172 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173 f.debug_struct("StreamServiceImpl")
174 .field("tokens", &self.tokens)
175 .field("services", &self.services)
176 .field("conns", &self.conns)
177 .finish()
178 }
179}
180
181impl Service<Connection> for StreamServiceImpl {
182 type Response = ();
183 type Error = ();
184
185 async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
186 if !self.conns.is_available() {
187 self.conns.available().await;
188 }
189 for (idx, svc) in self.services.iter().enumerate() {
190 if ctx.ready(svc).await.is_err() {
191 for (idx_, tag, _, _, _) in self.tokens.values() {
192 if idx == *idx_ {
193 log::error!("{}: Service readiness has failed", tag);
194 break;
195 }
196 }
197 return Err(());
198 }
199 }
200
201 Ok(())
202 }
203
204 #[inline]
205 fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
206 for svc in &self.services {
207 svc.poll(cx)?;
208 }
209 Ok(())
210 }
211
212 async fn shutdown(&self) {
213 let _ = join_all(self.services.iter().map(|svc| svc.shutdown())).await;
214 log::info!(
215 "Worker service shutdown, {} connections",
216 super::num_connections()
217 );
218 }
219
220 async fn call(&self, con: Connection, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
221 if let Some((idx, tag, name, _, pool)) = self.tokens.get(&con.token) {
222 let mut io = con.io;
223 if let Some(ref f) = self.on_accept {
224 match f.run(name.clone(), io).await {
225 Ok(st) => io = st,
226 Err(_) => return Err(()),
227 }
228 }
229
230 let stream: Io<_> = io.try_into().map_err(|e| {
231 log::error!("Cannot convert to an async io stream: {}", e);
232 })?;
233
234 stream.set_tag(tag);
235 stream.set_memory_pool(*pool);
236 let guard = self.conns.get();
237 let _ = ctx.call(&self.services[*idx], stream).await;
238 drop(guard);
239 Ok(())
240 } else {
241 log::error!("Cannot get handler service for connection: {:?}", con);
242 Err(())
243 }
244 }
245}