ntex_server/net/
service.rs

1use std::{fmt, sync::Arc, task::Context};
2
3use ntex_io::Io;
4use ntex_service::{Service, ServiceCtx, ServiceFactory, boxed, cfg::SharedCfg};
5use ntex_util::{HashMap, future::join_all, services::Counter};
6
7use crate::ServerConfiguration;
8
9use super::accept::{AcceptNotify, AcceptorCommand};
10use super::factory::{FactoryServiceType, NetService, OnAccept, OnWorkerStart};
11use super::{MAX_CONNS_COUNTER, Token, socket::Connection};
12
13pub(super) type BoxService = boxed::BoxService<Io, (), ()>;
14
15/// Net streaming server
16pub struct StreamServer {
17    notify: AcceptNotify,
18    services: Vec<FactoryServiceType>,
19    on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
20    on_accept: Option<Box<dyn OnAccept + Send>>,
21}
22
23impl StreamServer {
24    pub(crate) fn new(
25        notify: AcceptNotify,
26        services: Vec<FactoryServiceType>,
27        on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
28        on_accept: Option<Box<dyn OnAccept + Send>>,
29    ) -> Self {
30        Self {
31            notify,
32            services,
33            on_accept,
34            on_worker_start,
35        }
36    }
37}
38
39impl fmt::Debug for StreamServer {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        f.debug_struct("StreamServer")
42            .field("services", &self.services.len())
43            .finish()
44    }
45}
46
47/// Worker service factory.
48impl ServerConfiguration for StreamServer {
49    type Item = Connection;
50    type Factory = StreamService;
51
52    /// Create service factory for handling `WorkerMessage<T>` messages.
53    async fn create(&self) -> Result<Self::Factory, ()> {
54        // on worker start callbacks
55        for cb in &self.on_worker_start {
56            cb.run().await?;
57        }
58
59        // construct services
60        let mut services = Vec::new();
61        for svc in &self.services {
62            services.extend(svc.create().await?);
63        }
64
65        Ok(StreamService {
66            services,
67            on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
68        })
69    }
70
71    /// Server is paused
72    fn paused(&self) {
73        self.notify.send(AcceptorCommand::Pause);
74    }
75
76    /// Server is resumed
77    fn resumed(&self) {
78        self.notify.send(AcceptorCommand::Resume);
79    }
80
81    /// Server is stopped
82    fn terminate(&self) {
83        self.notify.send(AcceptorCommand::Terminate);
84    }
85
86    /// Server is stopped
87    async fn stop(&self) {
88        let (tx, rx) = oneshot::channel();
89        self.notify.send(AcceptorCommand::Stop(tx));
90        let _ = rx.await;
91    }
92}
93
94impl Clone for StreamServer {
95    fn clone(&self) -> Self {
96        Self {
97            notify: self.notify.clone(),
98            services: self.services.iter().map(|s| s.clone_factory()).collect(),
99            on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
100            on_worker_start: self.on_worker_start.iter().map(|f| f.clone_fn()).collect(),
101        }
102    }
103}
104
105pub struct StreamService {
106    services: Vec<NetService>,
107    on_accept: Option<Box<dyn OnAccept + Send>>,
108}
109
110impl fmt::Debug for StreamService {
111    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112        f.debug_struct("StreamService")
113            .field("services", &self.services)
114            .finish()
115    }
116}
117
118impl ServiceFactory<Connection> for StreamService {
119    type Response = ();
120    type Error = ();
121    type Service = StreamServiceImpl;
122    type InitError = ();
123
124    async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
125        let mut tokens = HashMap::default();
126        let mut services = Vec::new();
127
128        for info in &self.services {
129            match info.factory.create(info.config).await {
130                Ok(svc) => {
131                    log::trace!("Constructed server service for {:?}", info.tokens);
132                    services.push(svc);
133                    let idx = services.len() - 1;
134                    for (token, cfg) in &info.tokens {
135                        tokens.insert(*token, (idx, info.name.clone(), *cfg));
136                    }
137                }
138                Err(_) => {
139                    log::error!("Cannot construct service: {:?}", info.tokens);
140                    return Err(());
141                }
142            }
143        }
144
145        Ok(StreamServiceImpl {
146            tokens,
147            services,
148            conns: MAX_CONNS_COUNTER.with(|conns| conns.clone()),
149            on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
150        })
151    }
152}
153
154pub struct StreamServiceImpl {
155    tokens: HashMap<Token, (usize, Arc<str>, SharedCfg)>,
156    services: Vec<BoxService>,
157    conns: Counter,
158    on_accept: Option<Box<dyn OnAccept + Send>>,
159}
160
161impl fmt::Debug for StreamServiceImpl {
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163        f.debug_struct("StreamServiceImpl")
164            .field("tokens", &self.tokens)
165            .field("services", &self.services)
166            .field("conns", &self.conns)
167            .finish()
168    }
169}
170
171impl Service<Connection> for StreamServiceImpl {
172    type Response = ();
173    type Error = ();
174
175    async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
176        if !self.conns.is_available() {
177            self.conns.available().await;
178        }
179        for (idx, svc) in self.services.iter().enumerate() {
180            if ctx.ready(svc).await.is_err() {
181                for (idx_, _, cfg) in self.tokens.values() {
182                    if idx == *idx_ {
183                        log::error!("{}: Service readiness has failed", cfg.tag());
184                        break;
185                    }
186                }
187                return Err(());
188            }
189        }
190
191        Ok(())
192    }
193
194    #[inline]
195    fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
196        for svc in &self.services {
197            svc.poll(cx)?;
198        }
199        Ok(())
200    }
201
202    async fn shutdown(&self) {
203        let _ = join_all(self.services.iter().map(|svc| svc.shutdown())).await;
204        log::info!(
205            "Worker service shutdown, {} connections",
206            super::num_connections()
207        );
208    }
209
210    async fn call(&self, con: Connection, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
211        if let Some((idx, name, cfg)) = self.tokens.get(&con.token) {
212            let mut io = con.io;
213            if let Some(ref f) = self.on_accept {
214                match f.run(name.clone(), io).await {
215                    Ok(st) => io = st,
216                    Err(_) => return Err(()),
217                }
218            }
219
220            let stream = io.convert(*cfg).map_err(|e| {
221                log::error!("Cannot convert to an async io stream: {e}");
222            })?;
223
224            let guard = self.conns.get();
225            let _ = ctx.call(&self.services[*idx], stream).await;
226            drop(guard);
227            Ok(())
228        } else {
229            log::error!("Cannot get handler service for connection: {con:?}");
230            Err(())
231        }
232    }
233}