1use std::{fmt, sync::Arc, task::Context};
2
3use ntex_io::Io;
4use ntex_service::{Service, ServiceCtx, ServiceFactory, boxed, cfg::SharedCfg};
5use ntex_util::{HashMap, future::join_all, services::Counter};
6
7use crate::ServerConfiguration;
8
9use super::accept::{AcceptNotify, AcceptorCommand};
10use super::factory::{FactoryServiceType, NetService, OnAccept, OnWorkerStart};
11use super::{MAX_CONNS_COUNTER, Token, socket::Connection};
12
13pub(super) type BoxService = boxed::BoxService<Io, (), ()>;
14
15pub struct StreamServer {
17 notify: AcceptNotify,
18 services: Vec<FactoryServiceType>,
19 on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
20 on_accept: Option<Box<dyn OnAccept + Send>>,
21}
22
23impl StreamServer {
24 pub(crate) fn new(
25 notify: AcceptNotify,
26 services: Vec<FactoryServiceType>,
27 on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
28 on_accept: Option<Box<dyn OnAccept + Send>>,
29 ) -> Self {
30 Self {
31 notify,
32 services,
33 on_accept,
34 on_worker_start,
35 }
36 }
37}
38
39impl fmt::Debug for StreamServer {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 f.debug_struct("StreamServer")
42 .field("services", &self.services.len())
43 .finish()
44 }
45}
46
47impl ServerConfiguration for StreamServer {
49 type Item = Connection;
50 type Factory = StreamService;
51
52 async fn create(&self) -> Result<Self::Factory, ()> {
54 for cb in &self.on_worker_start {
56 cb.run().await?;
57 }
58
59 let mut services = Vec::new();
61 for svc in &self.services {
62 services.extend(svc.create().await?);
63 }
64
65 Ok(StreamService {
66 services,
67 on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
68 })
69 }
70
71 fn paused(&self) {
73 self.notify.send(AcceptorCommand::Pause);
74 }
75
76 fn resumed(&self) {
78 self.notify.send(AcceptorCommand::Resume);
79 }
80
81 fn terminate(&self) {
83 self.notify.send(AcceptorCommand::Terminate);
84 }
85
86 async fn stop(&self) {
88 let (tx, rx) = oneshot::channel();
89 self.notify.send(AcceptorCommand::Stop(tx));
90 let _ = rx.await;
91 }
92}
93
94impl Clone for StreamServer {
95 fn clone(&self) -> Self {
96 Self {
97 notify: self.notify.clone(),
98 services: self.services.iter().map(|s| s.clone_factory()).collect(),
99 on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
100 on_worker_start: self.on_worker_start.iter().map(|f| f.clone_fn()).collect(),
101 }
102 }
103}
104
105pub struct StreamService {
106 services: Vec<NetService>,
107 on_accept: Option<Box<dyn OnAccept + Send>>,
108}
109
110impl fmt::Debug for StreamService {
111 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112 f.debug_struct("StreamService")
113 .field("services", &self.services)
114 .finish()
115 }
116}
117
118impl ServiceFactory<Connection> for StreamService {
119 type Response = ();
120 type Error = ();
121 type Service = StreamServiceImpl;
122 type InitError = ();
123
124 async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
125 let mut tokens = HashMap::default();
126 let mut services = Vec::new();
127
128 for info in &self.services {
129 match info.factory.create(info.config).await {
130 Ok(svc) => {
131 log::trace!("Constructed server service for {:?}", info.tokens);
132 services.push(svc);
133 let idx = services.len() - 1;
134 for (token, cfg) in &info.tokens {
135 tokens.insert(*token, (idx, info.name.clone(), *cfg));
136 }
137 }
138 Err(_) => {
139 log::error!("Cannot construct service: {:?}", info.tokens);
140 return Err(());
141 }
142 }
143 }
144
145 Ok(StreamServiceImpl {
146 tokens,
147 services,
148 conns: MAX_CONNS_COUNTER.with(|conns| conns.clone()),
149 on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
150 })
151 }
152}
153
154pub struct StreamServiceImpl {
155 tokens: HashMap<Token, (usize, Arc<str>, SharedCfg)>,
156 services: Vec<BoxService>,
157 conns: Counter,
158 on_accept: Option<Box<dyn OnAccept + Send>>,
159}
160
161impl fmt::Debug for StreamServiceImpl {
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 f.debug_struct("StreamServiceImpl")
164 .field("tokens", &self.tokens)
165 .field("services", &self.services)
166 .field("conns", &self.conns)
167 .finish()
168 }
169}
170
171impl Service<Connection> for StreamServiceImpl {
172 type Response = ();
173 type Error = ();
174
175 async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
176 if !self.conns.is_available() {
177 self.conns.available().await;
178 }
179 for (idx, svc) in self.services.iter().enumerate() {
180 if ctx.ready(svc).await.is_err() {
181 for (idx_, _, cfg) in self.tokens.values() {
182 if idx == *idx_ {
183 log::error!("{}: Service readiness has failed", cfg.tag());
184 break;
185 }
186 }
187 return Err(());
188 }
189 }
190
191 Ok(())
192 }
193
194 #[inline]
195 fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
196 for svc in &self.services {
197 svc.poll(cx)?;
198 }
199 Ok(())
200 }
201
202 async fn shutdown(&self) {
203 let _ = join_all(self.services.iter().map(|svc| svc.shutdown())).await;
204 log::info!(
205 "Worker service shutdown, {} connections",
206 super::num_connections()
207 );
208 }
209
210 async fn call(&self, con: Connection, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
211 if let Some((idx, name, cfg)) = self.tokens.get(&con.token) {
212 let mut io = con.io;
213 if let Some(ref f) = self.on_accept {
214 match f.run(name.clone(), io).await {
215 Ok(st) => io = st,
216 Err(_) => return Err(()),
217 }
218 }
219
220 let stream = io.convert(*cfg).map_err(|e| {
221 log::error!("Cannot convert to an async io stream: {e}");
222 })?;
223
224 let guard = self.conns.get();
225 let _ = ctx.call(&self.services[*idx], stream).await;
226 drop(guard);
227 Ok(())
228 } else {
229 log::error!("Cannot get handler service for connection: {con:?}");
230 Err(())
231 }
232 }
233}