Skip to main content

xitca_web/
server.rs

1use std::{fmt, future::Future, sync::Arc, time::Duration};
2
3use xitca_http::{
4    HttpServiceBuilder,
5    config::{DEFAULT_HEADER_LIMIT, DEFAULT_READ_BUF_LIMIT, DEFAULT_WRITE_BUF_LIMIT, HttpServiceConfig},
6};
7use xitca_server::{Builder, ServerFuture, net::IntoListener};
8use xitca_service::ServiceExt;
9
10use crate::{
11    body::{Body, RequestBody},
12    bytes::Bytes,
13    http::{Request, RequestExt, Response},
14    service::{Service, ready::ReadyService},
15};
16
/// Multi-protocol HTTP server builder.
///
/// Wraps a user supplied service `S` together with the underlying server [`Builder`] and
/// the [`HttpServiceConfig`] applied to every listener registered through the `bind*` /
/// `listen` methods.
///
/// The three const generic parameters are forwarded unchanged to [`HttpServiceConfig`] and
/// default to `DEFAULT_HEADER_LIMIT`, `DEFAULT_READ_BUF_LIMIT` and `DEFAULT_WRITE_BUF_LIMIT`.
pub struct HttpServer<
    S,
    const HEADER_LIMIT: usize = DEFAULT_HEADER_LIMIT,
    const READ_BUF_LIMIT: usize = DEFAULT_READ_BUF_LIMIT,
    const WRITE_BUF_LIMIT: usize = DEFAULT_WRITE_BUF_LIMIT,
> {
    // Application service; kept behind an `Arc` and cloned once per registered listener.
    service: Arc<S>,
    // Underlying server builder that collects listeners and thread settings.
    builder: Builder,
    // When true, listeners are registered with the io-uring flavored http service.
    enable_io_uring: bool,
    // Http service configuration handed to `HttpServiceBuilder::with_config`.
    config: HttpServiceConfig<HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>,
}
29
30impl<S> HttpServer<S>
31where
32    S: Send + Sync + 'static,
33{
34    pub fn serve(service: S) -> Self {
35        Self {
36            service: Arc::new(service),
37            builder: Builder::new(),
38            enable_io_uring: false,
39            config: HttpServiceConfig::default(),
40        }
41    }
42}
43
44impl<S, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize>
45    HttpServer<S, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>
46where
47    S: Send + Sync + 'static,
48{
49    /// Set number of threads dedicated to accepting connections.
50    ///
51    /// Default set to 1.
52    ///
53    /// # Panics:
54    /// When receive 0 as number of server thread.
55    pub fn server_threads(mut self, num: usize) -> Self {
56        self.builder = self.builder.server_threads(num);
57        self
58    }
59
60    /// Set number of workers to start.
61    ///
62    /// Default set to available logical cpu as workers count.
63    ///
64    /// # Panics:
65    /// When received 0 as number of worker thread.
66    pub fn worker_threads(mut self, num: usize) -> Self {
67        self.builder = self.builder.worker_threads(num);
68        self
69    }
70
71    /// Set max number of threads for each worker's blocking task thread pool.
72    ///
73    /// One thread pool is set up **per worker**; not shared across workers.
74    pub fn worker_max_blocking_threads(mut self, num: usize) -> Self {
75        self.builder = self.builder.worker_max_blocking_threads(num);
76        self
77    }
78
79    /// Disable signal listening.
80    ///
81    /// `tokio::signal` is used for listening and it only functions in tokio runtime 1.x.
82    /// Disabling it would enable server runs in other async runtimes.
83    pub fn disable_signal(mut self) -> Self {
84        self.builder = self.builder.disable_signal();
85        self
86    }
87
88    pub fn backlog(mut self, num: u32) -> Self {
89        self.builder = self.builder.backlog(num);
90        self
91    }
92
93    /// Disable vectored write even when IO is able to perform it.
94    ///
95    /// This is beneficial when dealing with small size of response body.
96    pub fn disable_vectored_write(mut self) -> Self {
97        self.config = self.config.disable_vectored_write();
98        self
99    }
100
101    /// Change keep alive duration for Http/1 connection.
102    ///
103    /// Connection kept idle for this duration would be closed.
104    pub fn keep_alive_timeout(mut self, dur: Duration) -> Self {
105        self.config = self.config.keep_alive_timeout(dur);
106        self
107    }
108
109    /// Change request timeout for Http/1 connection.
110    ///
111    /// Connection can not finish it's request for this duration would be closed.
112    ///
113    /// This timeout is also used in Http/2 connection handshake phrase.
114    pub fn request_head_timeout(mut self, dur: Duration) -> Self {
115        self.config = self.config.request_head_timeout(dur);
116        self
117    }
118
119    /// Change tls accept timeout for Http/1 and Http/2 connection.
120    ///
121    /// Connection can not finish tls handshake for this duration would be closed.
122    pub fn tls_accept_timeout(mut self, dur: Duration) -> Self {
123        self.config = self.config.tls_accept_timeout(dur);
124        self
125    }
126
127    /// Enable HTTP/2 cleartext (h2c) with prior knowledge.
128    ///
129    /// When enabled, the server peeks at the first bytes of each connection to detect whether
130    /// the client is speaking HTTP/2 (via the connection preface) or HTTP/1.1, and dispatches
131    /// accordingly. This allows a single listener to serve both protocols without TLS-based
132    /// ALPN negotiation.
133    ///
134    /// Typically required for gRPC over plaintext, as most gRPC clients expect HTTP/2.
135    pub fn h2c_prior_knowledge(mut self) -> Self {
136        self.config = self.config.peek_protocol();
137        self
138    }
139
140    /// Change max size for request head.
141    ///
142    /// Request has a bigger head than it would be reject with error.
143    /// Request body has a bigger continuous read would be force to yield.
144    ///
145    /// Default to 1mb.
146    pub fn max_read_buf_size<const READ_BUF_LIMIT_2: usize>(
147        self,
148    ) -> HttpServer<S, HEADER_LIMIT, READ_BUF_LIMIT_2, WRITE_BUF_LIMIT> {
149        self.mutate_const_generic::<HEADER_LIMIT, READ_BUF_LIMIT_2, WRITE_BUF_LIMIT>()
150    }
151
152    /// Change max size for write buffer size.
153    ///
154    /// When write buffer hit limit it would force a drain write to Io stream until it's empty
155    /// (or connection closed by error or remote peer).
156    ///
157    /// Default to 408kb.
158    pub fn max_write_buf_size<const WRITE_BUF_LIMIT_2: usize>(
159        self,
160    ) -> HttpServer<S, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT_2> {
161        self.mutate_const_generic::<HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT_2>()
162    }
163
164    /// Change max header fields for one request.
165    ///
166    /// Default to 64.
167    pub fn max_request_headers<const HEADER_LIMIT_2: usize>(
168        self,
169    ) -> HttpServer<S, HEADER_LIMIT_2, READ_BUF_LIMIT, WRITE_BUF_LIMIT> {
170        self.mutate_const_generic::<HEADER_LIMIT_2, READ_BUF_LIMIT, WRITE_BUF_LIMIT>()
171    }
172
173    #[doc(hidden)]
174    pub fn on_worker_start<FS, Fut>(mut self, on_start: FS) -> Self
175    where
176        FS: Fn() -> Fut + Send + Sync + 'static,
177        Fut: Future + Send + 'static,
178    {
179        self.builder = self.builder.on_worker_start(on_start);
180        self
181    }
182
183    #[cfg(feature = "io-uring")]
184    pub fn enable_io_uring(mut self) -> Self {
185        self.enable_io_uring = true;
186        self
187    }
188
189    #[cfg(not(target_family = "wasm"))]
190    pub fn bind<A, ResB>(mut self, addr: A) -> std::io::Result<Self>
191    where
192        A: std::net::ToSocketAddrs,
193        S: Service + 'static,
194        S::Response: ReadyService + Service<Request<RequestExt<RequestBody>>, Response = Response<ResB>> + 'static,
195        S::Error: fmt::Debug,
196        <S::Response as Service<Request<RequestExt<RequestBody>>>>::Error: fmt::Debug,
197        ResB: Body<Data = Bytes> + 'static,
198        ResB::Error: fmt::Debug + 'static,
199    {
200        let http = HttpServiceBuilder::with_config(self.config);
201        let service = self.service.clone();
202        let name = "xitca-web";
203
204        self.builder = if self.enable_io_uring {
205            #[cfg(feature = "io-uring")]
206            {
207                self.builder.bind(name, addr, service.enclosed(http.io_uring()))?
208            }
209
210            #[cfg(not(feature = "io-uring"))]
211            unreachable!()
212        } else {
213            self.builder.bind(name, addr, service.enclosed(http))?
214        };
215
216        Ok(self)
217    }
218
219    pub fn listen<ResB, L>(mut self, listener: L) -> std::io::Result<Self>
220    where
221        S: Service + 'static,
222        S::Response: ReadyService + Service<Request<RequestExt<RequestBody>>, Response = Response<ResB>> + 'static,
223        S::Error: fmt::Debug,
224        <S::Response as Service<Request<RequestExt<RequestBody>>>>::Error: fmt::Debug,
225        ResB: Body<Data = Bytes> + 'static,
226        ResB::Error: fmt::Debug + 'static,
227        L: IntoListener + 'static,
228    {
229        let http = HttpServiceBuilder::with_config(self.config);
230        let service = self.service.clone();
231        let name = "xitca-web";
232
233        self.builder = if self.enable_io_uring {
234            #[cfg(feature = "io-uring")]
235            {
236                self.builder.listen(name, listener, service.enclosed(http.io_uring()))
237            }
238
239            #[cfg(not(feature = "io-uring"))]
240            unreachable!()
241        } else {
242            self.builder.listen(name, listener, service.enclosed(http))
243        };
244
245        Ok(self)
246    }
247
248    #[cfg(feature = "openssl")]
249    pub fn bind_openssl<A: std::net::ToSocketAddrs, ResB>(
250        mut self,
251        addr: A,
252        mut builder: xitca_tls::openssl::ssl::SslAcceptorBuilder,
253    ) -> std::io::Result<Self>
254    where
255        S: Service + 'static,
256        S::Response: ReadyService + Service<Request<RequestExt<RequestBody>>, Response = Response<ResB>> + 'static,
257        S::Error: fmt::Debug,
258        <S::Response as Service<Request<RequestExt<RequestBody>>>>::Error: fmt::Debug,
259        ResB: Body<Data = Bytes> + 'static,
260        ResB::Error: fmt::Debug + 'static,
261    {
262        let config = self.config;
263
264        const H11: &[u8] = b"\x08http/1.1";
265
266        const H2: &[u8] = b"\x02h2";
267
268        builder.set_alpn_select_callback(|_, protocols| {
269            if protocols.windows(3).any(|window| window == H2) {
270                #[cfg(feature = "http2")]
271                {
272                    Ok(b"h2")
273                }
274                #[cfg(not(feature = "http2"))]
275                Err(xitca_tls::openssl::ssl::AlpnError::ALERT_FATAL)
276            } else if protocols.windows(9).any(|window| window == H11) {
277                Ok(b"http/1.1")
278            } else {
279                Err(xitca_tls::openssl::ssl::AlpnError::NOACK)
280            }
281        });
282
283        #[cfg(not(feature = "http2"))]
284        let protos = H11.to_vec();
285
286        #[cfg(feature = "http2")]
287        let protos = H11.iter().chain(H2).cloned().collect::<Vec<_>>();
288
289        builder.set_alpn_protos(&protos)?;
290
291        let acceptor = builder.build();
292        let name = "xitca-web-openssl";
293        let http = HttpServiceBuilder::with_config(config).openssl(acceptor);
294        let service = self.service.clone();
295
296        self.builder = if self.enable_io_uring {
297            #[cfg(feature = "io-uring")]
298            {
299                self.builder.bind(name, addr, service.enclosed(http.io_uring()))?
300            }
301
302            #[cfg(not(feature = "io-uring"))]
303            unreachable!()
304        } else {
305            self.builder.bind(name, addr, service.enclosed(http))?
306        };
307
308        Ok(self)
309    }
310
311    #[cfg(feature = "rustls")]
312    pub fn bind_rustls<A: std::net::ToSocketAddrs, ResB>(
313        mut self,
314        addr: A,
315        #[cfg_attr(not(all(feature = "http1", feature = "http2")), allow(unused_mut))]
316        mut config: xitca_tls::rustls::ServerConfig,
317    ) -> std::io::Result<Self>
318    where
319        S: Service + 'static,
320        S::Response: ReadyService + Service<Request<RequestExt<RequestBody>>, Response = Response<ResB>> + 'static,
321        S::Error: fmt::Debug,
322        <S::Response as Service<Request<RequestExt<RequestBody>>>>::Error: fmt::Debug,
323        ResB: Body<Data = Bytes> + 'static,
324        ResB::Error: fmt::Debug + 'static,
325    {
326        let service_config = self.config;
327
328        #[cfg(feature = "http2")]
329        config.alpn_protocols.push("h2".into());
330
331        #[cfg(feature = "http1")]
332        config.alpn_protocols.push("http/1.1".into());
333
334        let config = std::sync::Arc::new(config);
335        let http = HttpServiceBuilder::with_config(service_config).rustls(config);
336        let service = self.service.clone();
337        let name = "xitca-web-rustls";
338
339        self.builder = if self.enable_io_uring {
340            #[cfg(feature = "io-uring")]
341            {
342                self.builder.bind(name, addr, service.enclosed(http.io_uring()))?
343            }
344
345            #[cfg(not(feature = "io-uring"))]
346            unreachable!()
347        } else {
348            self.builder.bind(name, addr, service.enclosed(http))?
349        };
350
351        Ok(self)
352    }
353
354    #[cfg(unix)]
355    pub fn bind_unix<P: AsRef<std::path::Path>, ResB>(mut self, path: P) -> std::io::Result<Self>
356    where
357        S: Service + 'static,
358        S::Response: ReadyService + Service<Request<RequestExt<RequestBody>>, Response = Response<ResB>> + 'static,
359        S::Error: fmt::Debug,
360        <S::Response as Service<Request<RequestExt<RequestBody>>>>::Error: fmt::Debug,
361        ResB: Body<Data = Bytes> + 'static,
362        ResB::Error: fmt::Debug + 'static,
363    {
364        let http = HttpServiceBuilder::with_config(self.config);
365        let name = "xitca-web";
366        let service = self.service.clone();
367        self.builder = if self.enable_io_uring {
368            #[cfg(feature = "io-uring")]
369            {
370                self.builder.bind_unix(name, path, service.enclosed(http.io_uring()))?
371            }
372
373            #[cfg(not(feature = "io-uring"))]
374            unreachable!()
375        } else {
376            self.builder.bind_unix(name, path, service.enclosed(http))?
377        };
378
379        Ok(self)
380    }
381
382    #[cfg(feature = "http3")]
383    pub fn bind_h3<A: std::net::ToSocketAddrs, ResB>(
384        mut self,
385        addr: A,
386        config: xitca_io::net::QuicConfig,
387    ) -> std::io::Result<Self>
388    where
389        S: Service + 'static,
390        S::Response: ReadyService + Service<Request<RequestExt<RequestBody>>, Response = Response<ResB>> + 'static,
391        S::Error: fmt::Debug,
392        <S::Response as Service<Request<RequestExt<RequestBody>>>>::Error: fmt::Debug,
393        ResB: Body<Data = Bytes> + 'static,
394        ResB::Error: fmt::Debug + 'static,
395    {
396        let service = self
397            .service
398            .clone()
399            .enclosed(HttpServiceBuilder::with_config(self.config));
400
401        self.builder = self.builder.bind_h3("xitca-web", addr, config, service)?;
402        Ok(self)
403    }
404
405    pub fn run(self) -> ServerFuture {
406        self.builder.build()
407    }
408
409    fn mutate_const_generic<const HEADER_LIMIT2: usize, const READ_BUF_LIMIT2: usize, const WRITE_BUF_LIMIT2: usize>(
410        self,
411    ) -> HttpServer<S, HEADER_LIMIT2, READ_BUF_LIMIT2, WRITE_BUF_LIMIT2> {
412        HttpServer {
413            service: self.service,
414            enable_io_uring: self.enable_io_uring,
415            builder: self.builder,
416            config: self
417                .config
418                .mutate_const_generic::<HEADER_LIMIT2, READ_BUF_LIMIT2, WRITE_BUF_LIMIT2>(),
419        }
420    }
421}