ntex_server/net/
service.rs

1use std::{fmt, task::Context};
2
3use ntex_bytes::{Pool, PoolRef};
4use ntex_net::Io;
5use ntex_service::{boxed, Service, ServiceCtx, ServiceFactory};
6use ntex_util::{future::join_all, services::Counter, HashMap};
7
8use crate::ServerConfiguration;
9
10use super::accept::{AcceptNotify, AcceptorCommand};
11use super::factory::{FactoryServiceType, NetService, OnWorkerStart};
12use super::{socket::Connection, Token, MAX_CONNS_COUNTER};
13
14pub(super) type BoxService = boxed::BoxService<Io, (), ()>;
15
16/// Net streaming server
17pub struct StreamServer {
18    notify: AcceptNotify,
19    services: Vec<FactoryServiceType>,
20    on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
21}
22
23impl StreamServer {
24    pub(crate) fn new(
25        notify: AcceptNotify,
26        services: Vec<FactoryServiceType>,
27        on_worker_start: Vec<Box<dyn OnWorkerStart + Send>>,
28    ) -> Self {
29        Self {
30            notify,
31            services,
32            on_worker_start,
33        }
34    }
35}
36
37impl fmt::Debug for StreamServer {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        f.debug_struct("StreamServer")
40            .field("services", &self.services.len())
41            .finish()
42    }
43}
44
45/// Worker service factory.
46impl ServerConfiguration for StreamServer {
47    type Item = Connection;
48    type Factory = StreamService;
49
50    /// Create service factory for handling `WorkerMessage<T>` messages.
51    async fn create(&self) -> Result<Self::Factory, ()> {
52        // on worker start callbacks
53        for cb in &self.on_worker_start {
54            cb.run().await?;
55        }
56
57        // construct services
58        let mut services = Vec::new();
59        for svc in &self.services {
60            services.extend(svc.create().await?);
61        }
62
63        Ok(StreamService { services })
64    }
65
66    /// Server is paused
67    fn paused(&self) {
68        self.notify.send(AcceptorCommand::Pause);
69    }
70
71    /// Server is resumed
72    fn resumed(&self) {
73        self.notify.send(AcceptorCommand::Resume);
74    }
75
76    /// Server is stopped
77    fn terminate(&self) {
78        self.notify.send(AcceptorCommand::Terminate);
79    }
80
81    /// Server is stopped
82    async fn stop(&self) {
83        let (tx, rx) = oneshot::channel();
84        self.notify.send(AcceptorCommand::Stop(tx));
85        let _ = rx.await;
86    }
87}
88
89impl Clone for StreamServer {
90    fn clone(&self) -> Self {
91        Self {
92            notify: self.notify.clone(),
93            services: self.services.iter().map(|s| s.clone_factory()).collect(),
94            on_worker_start: self.on_worker_start.iter().map(|f| f.clone_fn()).collect(),
95        }
96    }
97}
98
99#[derive(Debug)]
100pub struct StreamService {
101    services: Vec<NetService>,
102}
103
104impl ServiceFactory<Connection> for StreamService {
105    type Response = ();
106    type Error = ();
107    type Service = StreamServiceImpl;
108    type InitError = ();
109
110    async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
111        let mut tokens = HashMap::default();
112        let mut services = Vec::new();
113
114        for info in &self.services {
115            match info.factory.create(()).await {
116                Ok(svc) => {
117                    log::debug!("Constructed server service for {:?}", info.tokens);
118                    services.push(svc);
119                    let idx = services.len() - 1;
120                    for (token, tag) in &info.tokens {
121                        tokens.insert(
122                            *token,
123                            (idx, *tag, info.pool.pool(), info.pool.pool_ref()),
124                        );
125                    }
126                }
127                Err(_) => {
128                    log::error!("Cannot construct service: {:?}", info.tokens);
129                    return Err(());
130                }
131            }
132        }
133
134        Ok(StreamServiceImpl {
135            tokens,
136            services,
137            conns: MAX_CONNS_COUNTER.with(|conns| conns.clone()),
138        })
139    }
140}
141
142#[derive(Debug)]
143pub struct StreamServiceImpl {
144    tokens: HashMap<Token, (usize, &'static str, Pool, PoolRef)>,
145    services: Vec<BoxService>,
146    conns: Counter,
147}
148
149impl Service<Connection> for StreamServiceImpl {
150    type Response = ();
151    type Error = ();
152
153    async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
154        if !self.conns.is_available() {
155            self.conns.available().await;
156        }
157        for (idx, svc) in self.services.iter().enumerate() {
158            if ctx.ready(svc).await.is_err() {
159                for (idx_, tag, _, _) in self.tokens.values() {
160                    if idx == *idx_ {
161                        log::error!("{}: Service readiness has failed", tag);
162                        break;
163                    }
164                }
165                return Err(());
166            }
167        }
168
169        Ok(())
170    }
171
172    #[inline]
173    fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
174        for svc in &self.services {
175            svc.poll(cx)?;
176        }
177        Ok(())
178    }
179
180    async fn shutdown(&self) {
181        let _ = join_all(self.services.iter().map(|svc| svc.shutdown())).await;
182        log::info!(
183            "Worker service shutdown, {} connections",
184            super::num_connections()
185        );
186    }
187
188    async fn call(&self, con: Connection, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
189        if let Some((idx, tag, _, pool)) = self.tokens.get(&con.token) {
190            let stream: Io<_> = con.io.try_into().map_err(|e| {
191                log::error!("Cannot convert to an async io stream: {}", e);
192            })?;
193
194            stream.set_tag(tag);
195            stream.set_memory_pool(*pool);
196            let guard = self.conns.get();
197            let _ = ctx.call(&self.services[*idx], stream).await;
198            drop(guard);
199            Ok(())
200        } else {
201            log::error!("Cannot get handler service for connection: {:?}", con);
202            Err(())
203        }
204    }
205}