ntex_server/net/
service.rs

1use std::{fmt, sync::Arc, task::Context};
2
3use ntex_bytes::{Pool, PoolRef};
4use ntex_net::Io;
5use ntex_service::{boxed, Service, ServiceCtx, ServiceFactory};
6use ntex_util::{future::join_all, services::Counter, HashMap};
7
8use crate::ServerConfiguration;
9
10use super::accept::{AcceptNotify, AcceptorCommand};
11use super::factory::{FactoryServiceType, NetService, OnAccept, OnWorkerStart};
12use super::{socket::Connection, Token, MAX_CONNS_COUNTER};
13
14pub(super) type BoxService = boxed::BoxService<Io, (), ()>;
15
16/// Net streaming server
17pub struct StreamServer {
18    notify: AcceptNotify,
19    services: Vec<FactoryServiceType>,
20    on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
21    on_accept: Option<Box<dyn OnAccept + Send>>,
22}
23
24impl StreamServer {
25    pub(crate) fn new(
26        notify: AcceptNotify,
27        services: Vec<FactoryServiceType>,
28        on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
29        on_accept: Option<Box<dyn OnAccept + Send>>,
30    ) -> Self {
31        Self {
32            notify,
33            services,
34            on_accept,
35            on_worker_start,
36        }
37    }
38}
39
40impl fmt::Debug for StreamServer {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        f.debug_struct("StreamServer")
43            .field("services", &self.services.len())
44            .finish()
45    }
46}
47
48/// Worker service factory.
49impl ServerConfiguration for StreamServer {
50    type Item = Connection;
51    type Factory = StreamService;
52
53    /// Create service factory for handling `WorkerMessage<T>` messages.
54    async fn create(&self) -> Result<Self::Factory, ()> {
55        // on worker start callbacks
56        for cb in &self.on_worker_start {
57            cb.run().await?;
58        }
59
60        // construct services
61        let mut services = Vec::new();
62        for svc in &self.services {
63            services.extend(svc.create().await?);
64        }
65
66        Ok(StreamService {
67            services,
68            on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
69        })
70    }
71
72    /// Server is paused
73    fn paused(&self) {
74        self.notify.send(AcceptorCommand::Pause);
75    }
76
77    /// Server is resumed
78    fn resumed(&self) {
79        self.notify.send(AcceptorCommand::Resume);
80    }
81
82    /// Server is stopped
83    fn terminate(&self) {
84        self.notify.send(AcceptorCommand::Terminate);
85    }
86
87    /// Server is stopped
88    async fn stop(&self) {
89        let (tx, rx) = oneshot::channel();
90        self.notify.send(AcceptorCommand::Stop(tx));
91        let _ = rx.await;
92    }
93}
94
95impl Clone for StreamServer {
96    fn clone(&self) -> Self {
97        Self {
98            notify: self.notify.clone(),
99            services: self.services.iter().map(|s| s.clone_factory()).collect(),
100            on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
101            on_worker_start: self.on_worker_start.iter().map(|f| f.clone_fn()).collect(),
102        }
103    }
104}
105
106pub struct StreamService {
107    services: Vec<NetService>,
108    on_accept: Option<Box<dyn OnAccept + Send>>,
109}
110
111impl fmt::Debug for StreamService {
112    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113        f.debug_struct("StreamService")
114            .field("services", &self.services)
115            .finish()
116    }
117}
118
119impl ServiceFactory<Connection> for StreamService {
120    type Response = ();
121    type Error = ();
122    type Service = StreamServiceImpl;
123    type InitError = ();
124
125    async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
126        let mut tokens = HashMap::default();
127        let mut services = Vec::new();
128
129        for info in &self.services {
130            match info.factory.create(()).await {
131                Ok(svc) => {
132                    log::trace!("Constructed server service for {:?}", info.tokens);
133                    services.push(svc);
134                    let idx = services.len() - 1;
135                    for (token, tag) in &info.tokens {
136                        tokens.insert(
137                            *token,
138                            (
139                                idx,
140                                *tag,
141                                info.name.clone(),
142                                info.pool.pool(),
143                                info.pool.pool_ref(),
144                            ),
145                        );
146                    }
147                }
148                Err(_) => {
149                    log::error!("Cannot construct service: {:?}", info.tokens);
150                    return Err(());
151                }
152            }
153        }
154
155        Ok(StreamServiceImpl {
156            tokens,
157            services,
158            conns: MAX_CONNS_COUNTER.with(|conns| conns.clone()),
159            on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
160        })
161    }
162}
163
164pub struct StreamServiceImpl {
165    tokens: HashMap<Token, (usize, &'static str, Arc<str>, Pool, PoolRef)>,
166    services: Vec<BoxService>,
167    conns: Counter,
168    on_accept: Option<Box<dyn OnAccept + Send>>,
169}
170
171impl fmt::Debug for StreamServiceImpl {
172    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173        f.debug_struct("StreamServiceImpl")
174            .field("tokens", &self.tokens)
175            .field("services", &self.services)
176            .field("conns", &self.conns)
177            .finish()
178    }
179}
180
181impl Service<Connection> for StreamServiceImpl {
182    type Response = ();
183    type Error = ();
184
185    async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
186        if !self.conns.is_available() {
187            self.conns.available().await;
188        }
189        for (idx, svc) in self.services.iter().enumerate() {
190            if ctx.ready(svc).await.is_err() {
191                for (idx_, tag, _, _, _) in self.tokens.values() {
192                    if idx == *idx_ {
193                        log::error!("{}: Service readiness has failed", tag);
194                        break;
195                    }
196                }
197                return Err(());
198            }
199        }
200
201        Ok(())
202    }
203
204    #[inline]
205    fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
206        for svc in &self.services {
207            svc.poll(cx)?;
208        }
209        Ok(())
210    }
211
212    async fn shutdown(&self) {
213        let _ = join_all(self.services.iter().map(|svc| svc.shutdown())).await;
214        log::info!(
215            "Worker service shutdown, {} connections",
216            super::num_connections()
217        );
218    }
219
220    async fn call(&self, con: Connection, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
221        if let Some((idx, tag, name, _, pool)) = self.tokens.get(&con.token) {
222            let mut io = con.io;
223            if let Some(ref f) = self.on_accept {
224                match f.run(name.clone(), io).await {
225                    Ok(st) => io = st,
226                    Err(_) => return Err(()),
227                }
228            }
229
230            let stream: Io<_> = io.try_into().map_err(|e| {
231                log::error!("Cannot convert to an async io stream: {}", e);
232            })?;
233
234            stream.set_tag(tag);
235            stream.set_memory_pool(*pool);
236            let guard = self.conns.get();
237            let _ = ctx.call(&self.services[*idx], stream).await;
238            drop(guard);
239            Ok(())
240        } else {
241            log::error!("Cannot get handler service for connection: {:?}", con);
242            Err(())
243        }
244    }
245}