Skip to main content

ntex_server/net/
service.rs

1use std::{fmt, sync::Arc, task::Context};
2
3use ntex_io::Io;
4use ntex_service::{Service, ServiceCtx, ServiceFactory, boxed, cfg::SharedCfg};
5use ntex_util::{HashMap, future::join_all, services::Counter};
6
7use crate::ServerConfiguration;
8
9use super::accept::{AcceptNotify, AcceptorCommand};
10use super::factory::{FactoryServiceType, NetService, OnAccept, OnWorkerStart};
11use super::{MAX_CONNS_COUNTER, Token, socket::Connection};
12
13pub(super) type BoxService = boxed::BoxService<Io, (), ()>;
14
15/// Net streaming server
16pub struct StreamServer {
17    notify: AcceptNotify,
18    services: Vec<FactoryServiceType>,
19    on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
20    on_accept: Option<Box<dyn OnAccept + Send>>,
21}
22
23impl StreamServer {
24    pub(crate) fn new(
25        notify: AcceptNotify,
26        services: Vec<FactoryServiceType>,
27        on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
28        on_accept: Option<Box<dyn OnAccept + Send>>,
29    ) -> Self {
30        Self {
31            notify,
32            services,
33            on_worker_start,
34            on_accept,
35        }
36    }
37}
38
39impl fmt::Debug for StreamServer {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        f.debug_struct("StreamServer")
42            .field("services", &self.services.len())
43            .finish()
44    }
45}
46
47/// Worker service factory.
48impl ServerConfiguration for StreamServer {
49    type Item = Connection;
50    type Factory = StreamService;
51
52    /// Create service factory for handling `WorkerMessage<T>` messages.
53    async fn create(&self) -> Result<Self::Factory, ()> {
54        // on worker start callbacks
55        for cb in &self.on_worker_start {
56            cb.run().await?;
57        }
58
59        // construct services
60        let mut services = Vec::new();
61        for svc in &self.services {
62            services.extend(svc.create().await?);
63        }
64
65        Ok(StreamService {
66            services,
67            on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
68        })
69    }
70
71    /// Server is paused
72    fn paused(&self) {
73        self.notify.send(AcceptorCommand::Pause);
74    }
75
76    /// Server is resumed
77    fn resumed(&self) {
78        self.notify.send(AcceptorCommand::Resume);
79    }
80
81    /// Server is stopped
82    fn terminate(&self) {
83        self.notify.send(AcceptorCommand::Terminate);
84    }
85
86    /// Server is stopped
87    async fn stop(&self) {
88        let (tx, rx) = oneshot::channel();
89        self.notify.send(AcceptorCommand::Stop(tx));
90        let _ = rx.await;
91    }
92}
93
94impl Clone for StreamServer {
95    fn clone(&self) -> Self {
96        Self {
97            notify: self.notify.clone(),
98            services: self.services.iter().map(|s| s.clone_factory()).collect(),
99            on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
100            on_worker_start: self.on_worker_start.iter().map(|f| f.clone_fn()).collect(),
101        }
102    }
103}
104
105pub struct StreamService {
106    services: Vec<NetService>,
107    on_accept: Option<Box<dyn OnAccept + Send>>,
108}
109
110impl fmt::Debug for StreamService {
111    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112        f.debug_struct("StreamService")
113            .field("services", &self.services)
114            .finish()
115    }
116}
117
118impl ServiceFactory<Connection> for StreamService {
119    type Response = ();
120    type Error = ();
121    type Service = StreamServiceImpl;
122    type InitError = ();
123
124    async fn create(&self, _r: ()) -> Result<Self::Service, Self::InitError> {
125        let mut tokens = HashMap::default();
126        let mut services = Vec::new();
127
128        for info in &self.services {
129            if let Ok(svc) = info.factory.create(info.config).await {
130                log::trace!("Constructed server service for {:?}", info.tokens);
131                services.push(svc);
132                let idx = services.len() - 1;
133                for (token, cfg) in &info.tokens {
134                    tokens.insert(*token, (idx, info.name.clone(), *cfg));
135                }
136            } else {
137                log::error!("Cannot construct service: {:?}", info.tokens);
138                return Err(());
139            }
140        }
141
142        Ok(StreamServiceImpl {
143            tokens,
144            services,
145            conns: MAX_CONNS_COUNTER.with(Clone::clone),
146            on_accept: self.on_accept.as_ref().map(|f| f.clone_fn()),
147        })
148    }
149}
150
151pub struct StreamServiceImpl {
152    tokens: HashMap<Token, (usize, Arc<str>, SharedCfg)>,
153    services: Vec<BoxService>,
154    conns: Counter,
155    on_accept: Option<Box<dyn OnAccept + Send>>,
156}
157
158impl fmt::Debug for StreamServiceImpl {
159    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160        f.debug_struct("StreamServiceImpl")
161            .field("tokens", &self.tokens)
162            .field("services", &self.services)
163            .field("conns", &self.conns)
164            .finish()
165    }
166}
167
168impl Service<Connection> for StreamServiceImpl {
169    type Response = ();
170    type Error = ();
171
172    async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
173        if !self.conns.is_available() {
174            self.conns.available().await;
175        }
176        for (idx, svc) in self.services.iter().enumerate() {
177            if ctx.ready(svc).await.is_err() {
178                for (idx_, _, cfg) in self.tokens.values() {
179                    if idx == *idx_ {
180                        log::error!("{}: Service readiness has failed", cfg.tag());
181                        break;
182                    }
183                }
184                return Err(());
185            }
186        }
187
188        Ok(())
189    }
190
191    #[inline]
192    fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
193        for svc in &self.services {
194            svc.poll(cx)?;
195        }
196        Ok(())
197    }
198
199    async fn shutdown(&self) {
200        let _ = join_all(self.services.iter().map(Service::shutdown)).await;
201        log::info!(
202            "Worker service shutdown, {} connections",
203            super::num_connections()
204        );
205    }
206
207    async fn call(&self, con: Connection, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
208        if let Some((idx, name, cfg)) = self.tokens.get(&con.token) {
209            let mut io = con.io;
210            if let Some(ref f) = self.on_accept {
211                match f.run(name.clone(), io).await {
212                    Ok(st) => io = st,
213                    Err(()) => return Err(()),
214                }
215            }
216
217            let stream = io.convert(*cfg).map_err(|e| {
218                log::error!("Cannot convert to an async io stream: {e}");
219            })?;
220
221            let guard = self.conns.get();
222            let _ = ctx.call(&self.services[*idx], stream).await;
223            drop(guard);
224            Ok(())
225        } else {
226            log::error!("Cannot get handler service for connection: {con:?}");
227            Err(())
228        }
229    }
230}