1use std::{fmt, sync::Arc, task::Context};
2
3use ntex_io::Io;
4use ntex_service::{Service, ServiceCtx, ServiceFactory, boxed, cfg::SharedCfg};
5use ntex_util::{HashMap, future::join_all, services::Counter};
6
7use crate::ServerConfiguration;
8
9use super::accept::{AcceptNotify, AcceptorCommand};
10use super::factory::{FactoryServiceType, NetService, OnAccept, OnWorkerStart};
11use super::{MAX_CONNS_COUNTER, Token, socket::Connection};
12
13pub(super) type BoxService = boxed::BoxService<Io, (), ()>;
14
15pub struct StreamServer {
17 notify: AcceptNotify,
18 services: Vec<FactoryServiceType>,
19 on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
20 on_accept: Option<Box<dyn OnAccept + Send>>,
21}
22
23impl StreamServer {
24 pub(crate) fn new(
25 notify: AcceptNotify,
26 services: Vec<FactoryServiceType>,
27 on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
28 on_accept: Option<Box<dyn OnAccept + Send>>,
29 ) -> Self {
30 Self {
31 notify,
32 services,
33 on_worker_start,
34 on_accept,
35 }
36 }
37}
38
39impl fmt::Debug for StreamServer {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 f.debug_struct("StreamServer")
42 .field("services", &self.services.len())
43 .finish()
44 }
45}
46
47impl ServerConfiguration for StreamServer {
49 type Item = Connection;
50 type Factory = StreamService;
51
52 async fn create(&self) -> Result<Self::Factory, ()> {
54 for cb in &self.on_worker_start {
56 cb.run().await?;
57 }
58
59 let mut services = Vec::new();
61 for svc in &self.services {
62 services.extend(svc.create().await?);
63 }
64
65 Ok(StreamService {
66 services,
67 on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
68 })
69 }
70
71 fn paused(&self) {
73 self.notify.send(AcceptorCommand::Pause);
74 }
75
76 fn resumed(&self) {
78 self.notify.send(AcceptorCommand::Resume);
79 }
80
81 fn terminate(&self) {
83 self.notify.send(AcceptorCommand::Terminate);
84 }
85
86 async fn stop(&self) {
88 let (tx, rx) = oneshot::channel();
89 self.notify.send(AcceptorCommand::Stop(tx));
90 let _ = rx.await;
91 }
92}
93
94impl Clone for StreamServer {
95 fn clone(&self) -> Self {
96 Self {
97 notify: self.notify.clone(),
98 services: self.services.iter().map(|s| s.clone_factory()).collect(),
99 on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
100 on_worker_start: self.on_worker_start.iter().map(|f| f.clone_fn()).collect(),
101 }
102 }
103}
104
105pub struct StreamService {
106 services: Vec<NetService>,
107 on_accept: Option<Box<dyn OnAccept + Send>>,
108}
109
110impl fmt::Debug for StreamService {
111 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112 f.debug_struct("StreamService")
113 .field("services", &self.services)
114 .finish()
115 }
116}
117
118impl ServiceFactory<Connection> for StreamService {
119 type Response = ();
120 type Error = ();
121 type Service = StreamServiceImpl;
122 type InitError = ();
123
124 async fn create(&self, _r: ()) -> Result<Self::Service, Self::InitError> {
125 let mut tokens = HashMap::default();
126 let mut services = Vec::new();
127
128 for info in &self.services {
129 if let Ok(svc) = info.factory.create(info.config).await {
130 log::trace!("Constructed server service for {:?}", info.tokens);
131 services.push(svc);
132 let idx = services.len() - 1;
133 for (token, cfg) in &info.tokens {
134 tokens.insert(*token, (idx, info.name.clone(), *cfg));
135 }
136 } else {
137 log::error!("Cannot construct service: {:?}", info.tokens);
138 return Err(());
139 }
140 }
141
142 Ok(StreamServiceImpl {
143 tokens,
144 services,
145 conns: MAX_CONNS_COUNTER.with(Clone::clone),
146 on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
147 })
148 }
149}
150
151pub struct StreamServiceImpl {
152 tokens: HashMap<Token, (usize, Arc<str>, SharedCfg)>,
153 services: Vec<BoxService>,
154 conns: Counter,
155 on_accept: Option<Box<dyn OnAccept + Send>>,
156}
157
158impl fmt::Debug for StreamServiceImpl {
159 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160 f.debug_struct("StreamServiceImpl")
161 .field("tokens", &self.tokens)
162 .field("services", &self.services)
163 .field("conns", &self.conns)
164 .finish()
165 }
166}
167
168impl Service<Connection> for StreamServiceImpl {
169 type Response = ();
170 type Error = ();
171
172 async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
173 if !self.conns.is_available() {
174 self.conns.available().await;
175 }
176 for (idx, svc) in self.services.iter().enumerate() {
177 if ctx.ready(svc).await.is_err() {
178 for (idx_, _, cfg) in self.tokens.values() {
179 if idx == *idx_ {
180 log::error!("{}: Service readiness has failed", cfg.tag());
181 break;
182 }
183 }
184 return Err(());
185 }
186 }
187
188 Ok(())
189 }
190
191 #[inline]
192 fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
193 for svc in &self.services {
194 svc.poll(cx)?;
195 }
196 Ok(())
197 }
198
199 async fn shutdown(&self) {
200 let _ = join_all(self.services.iter().map(Service::shutdown)).await;
201 log::info!(
202 "Worker service shutdown, {} connections",
203 super::num_connections()
204 );
205 }
206
207 async fn call(&self, con: Connection, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
208 if let Some((idx, name, cfg)) = self.tokens.get(&con.token) {
209 let mut io = con.io;
210 if let Some(ref f) = self.on_accept {
211 match f.run(name.clone(), io).await {
212 Ok(st) => io = st,
213 Err(()) => return Err(()),
214 }
215 }
216
217 let stream = io.convert(*cfg).map_err(|e| {
218 log::error!("Cannot convert to an async io stream: {e}");
219 })?;
220
221 let guard = self.conns.get();
222 let _ = ctx.call(&self.services[*idx], stream).await;
223 drop(guard);
224 Ok(())
225 } else {
226 log::error!("Cannot get handler service for connection: {con:?}");
227 Err(())
228 }
229 }
230}