monolake_services/http/
core.rs

1//! Core HTTP service implementation for handling downstream client connections.
2//!
3//! This module provides a high-performance, asynchronous HTTP service that handles
4//! connections from downstream clients. It supports HTTP/1, HTTP/1.1, and HTTP/2 protocols,
5//! and is designed to work with monoio's asynchronous runtime, providing fine-grained
6//! control over various timeouts.
7//!
8//! # Key Components
9//!
10//! - [`HttpCoreService`]: The main service component responsible for handling HTTP connections from
11//!   downstream clients. It can be composed of a stack of handlers implementing the `HttpHandler`
12//!   trait.
13//! - [`HttpServerTimeout`]: Configuration for various timeout settings in the HTTP server.
14//!
15//! # Features
16//!
17//! - Support for HTTP/1, HTTP/1.1, and HTTP/2 protocols
18//! - Composable design allowing a stack of `HttpHandler` implementations
19//! - Automatic protocol detection when combined with `H2Detect`
20//! - Efficient handling of concurrent requests using asynchronous I/O
21//! - Configurable timeout settings for different stages of request processing
22//! - Integration with `service_async` for easy composition in service stacks
23//! - Automatic response encoding and error handling
24//!
25//! # Usage
26//!
27//! `HttpCoreService` is typically used as part of a larger service stack, often in combination
28//! with `H2Detect` for automatic protocol detection. Here's a basic example:
29//!
30//! ```ignore
31//! use service_async::{layer::FactoryLayer, stack::FactoryStack};
32//!
33//! use crate::http::{HttpCoreService, H2Detect};
34//!
35//! let config = Config { /* ... */ };
36//! let stack = FactoryStack::new(config)
37//!     .push(HttpCoreService::layer())
38//!     .push(H2Detect::layer())
39//!     // ... other handlers implementing HttpHandler ...
40//!     ;
41//!
42//! let service = stack.make_async().await.unwrap();
43//! // Use the service to handle incoming HTTP connections from downstream clients
44//! ```
45//!
46//! # Handler Composition
47//!
48//! `HttpCoreService` can be composed of multiple handlers implementing the `HttpHandler` trait.
49//! This allows for a flexible and modular approach to request processing. Handlers can be
50//! chained together to form a processing pipeline, each handling a specific aspect of the
51//! HTTP request/response cycle.
52//!
53//! # Automatic Protocol Detection
54//!
55//! When used in conjunction with `H2Detect`, `HttpCoreService` can automatically
56//! detect whether an incoming connection is using HTTP/1, HTTP/1.1, or HTTP/2, and handle
57//! it appropriately. This allows for seamless support of multiple HTTP versions without
58//! the need for separate server configurations.
59//!
60//! # Performance Considerations
61//!
62//! - Uses monoio's efficient async I/O operations for improved performance
63//! - Implements connection keep-alive for HTTP/1.1 to reduce connection overhead
64//! - Supports HTTP/2 multiplexing for efficient handling of concurrent requests
65//! - Automatic protocol detection allows for optimized handling based on the client's capabilities
66use std::{convert::Infallible, fmt::Debug, pin::Pin, time::Duration};
67
68use bytes::Bytes;
69use certain_map::{Attach, Fork};
70use futures::{stream::FuturesUnordered, StreamExt};
71use http::StatusCode;
72use monoio::io::{sink::SinkExt, stream::Stream, AsyncReadRent, AsyncWriteRent, Split, Splitable};
73use monoio_http::{
74    common::{
75        body::{Body, HttpBody, StreamHint},
76        response::Response,
77    },
78    h1::codec::{
79        decoder::{FillPayload, RequestDecoder},
80        encoder::GenericEncoder,
81    },
82    h2::server::SendResponse,
83};
84use monolake_core::{
85    context::PeerAddr,
86    http::{HttpAccept, HttpHandler},
87    AnyError,
88};
89use service_async::{
90    layer::{layer_fn, FactoryLayer},
91    AsyncMakeService, MakeService, Param, ParamRef, Service,
92};
93use tracing::{error, info, warn};
94
95use super::{generate_response, util::AccompanyPair};
96
97/// Core HTTP service handler supporting both HTTP/1.1 and HTTP/2 protocols.
98///
99/// `HttpCoreService` is responsible for accepting HTTP connections, decoding requests,
100/// routing them through a handler chain, and encoding responses. It supports both
101/// HTTP/1.1 with keep-alive and HTTP/2 with multiplexing.
102/// For implementation details and example usage, see the
103/// [module level documentation](crate::http::core).
104#[derive(Clone)]
105pub struct HttpCoreService<H> {
106    handler_chain: H,
107    http_timeout: HttpServerTimeout,
108}
109
110impl<H> HttpCoreService<H> {
111    pub fn new(handler_chain: H, http_timeout: HttpServerTimeout) -> Self {
112        HttpCoreService {
113            handler_chain,
114            http_timeout,
115        }
116    }
117
118    async fn h1_svc<S, CXIn, CXStore, CXState, Err>(&self, stream: S, ctx: CXIn)
119    where
120        CXIn: ParamRef<PeerAddr> + Fork<Store = CXStore, State = CXState>,
121        CXStore: 'static,
122        for<'a> CXState: Attach<CXStore>,
123        for<'a> H: HttpHandler<
124            <CXState as Attach<CXStore>>::Hdr<'a>,
125            HttpBody,
126            Body = HttpBody,
127            Error = Err,
128        >,
129        Err: Into<AnyError> + Debug,
130        S: Split + AsyncReadRent + AsyncWriteRent,
131    {
132        let (reader, writer) = stream.into_split();
133        let mut decoder = RequestDecoder::new(reader);
134        let mut encoder = GenericEncoder::new(writer);
135        decoder.set_timeout(self.http_timeout.keepalive_timeout);
136
137        loop {
138            // decode request with header timeout
139            let decoded = match self.http_timeout.read_header_timeout {
140                Some(header_timeout) => {
141                    match monoio::time::timeout(header_timeout, decoder.next()).await {
142                        Ok(inner) => inner,
143                        Err(_) => {
144                            info!(
145                                "Connection {:?} decode http header timed out",
146                                ParamRef::<PeerAddr>::param_ref(&ctx),
147                            );
148                            break;
149                        }
150                    }
151                }
152                None => decoder.next().await,
153            };
154
155            let req = match decoded {
156                Some(Ok(req)) => HttpBody::request(req),
157                Some(Err(err)) => {
158                    // decode error
159                    warn!("decode request header failed: {err}");
160                    break;
161                }
162                None => {
163                    // EOF
164                    info!(
165                        "Connection {:?} closed",
166                        ParamRef::<PeerAddr>::param_ref(&ctx),
167                    );
168                    break;
169                }
170            };
171
172            // fork ctx
173            let (mut store, state) = ctx.fork();
174            let forked_ctx = unsafe { state.attach(&mut store) };
175
176            // handle request and reply response
177            // 1. do these things simultaneously: read body and send + handle request
178            let mut acc_fut = AccompanyPair::new(
179                self.handler_chain.handle(req, forked_ctx),
180                decoder.fill_payload(),
181            );
182            let res = unsafe { Pin::new_unchecked(&mut acc_fut) }.await;
183            match res {
184                Ok((resp, should_cont)) => {
185                    // 2. do these things simultaneously: read body and send + handle response
186                    let mut f = acc_fut.replace(encoder.send_and_flush(resp));
187                    match self.http_timeout.read_body_timeout {
188                        None => {
189                            if let Err(e) = unsafe { Pin::new_unchecked(&mut f) }.await {
190                                warn!("error when encode and write response: {e}");
191                                break;
192                            }
193                        }
194                        Some(body_timeout) => {
195                            match monoio::time::timeout(body_timeout, unsafe {
196                                Pin::new_unchecked(&mut f)
197                            })
198                            .await
199                            {
200                                Err(_) => {
201                                    info!(
202                                        "Connection {:?} write timed out",
203                                        ParamRef::<PeerAddr>::param_ref(&ctx),
204                                    );
205                                    break;
206                                }
207                                Ok(Err(e)) => {
208                                    warn!("error when encode and write response: {e}");
209                                    break;
210                                }
211                                _ => (),
212                            }
213                        }
214                    }
215
216                    if !should_cont {
217                        break;
218                    }
219                    if let Err(e) = f.into_accompany().await {
220                        warn!("error when decode request body: {e}");
221                        break;
222                    }
223                }
224                Err(e) => {
225                    // something error when process request(not a biz error)
226                    error!("error when processing request: {e:?}");
227                    if let Err(e) = encoder
228                        .send_and_flush(generate_response::<HttpBody>(
229                            StatusCode::INTERNAL_SERVER_ERROR,
230                            true,
231                        ))
232                        .await
233                    {
234                        warn!("error when reply client: {e}");
235                    }
236                    break;
237                }
238            }
239        }
240    }
241
242    async fn h2_process_response(
243        response: Response<HttpBody>,
244        mut response_handle: SendResponse<Bytes>,
245    ) {
246        let (mut parts, mut body) = response.into_parts();
247        parts.headers.remove("connection");
248        let response = http::Response::from_parts(parts, ());
249
250        match body.stream_hint() {
251            StreamHint::None => {
252                if let Err(e) = response_handle.send_response(response, true) {
253                    error!("H2 frontend response send fail {:?}", e);
254                }
255            }
256            StreamHint::Fixed => {
257                let mut send_stream = match response_handle.send_response(response, false) {
258                    Ok(s) => s,
259                    Err(e) => {
260                        error!("H2 frontend response send fail {:?}", e);
261                        return;
262                    }
263                };
264
265                if let Some(Ok(data)) = body.next_data().await {
266                    let _ = send_stream.send_data(data, true);
267                }
268            }
269            StreamHint::Stream => {
270                let mut send_stream = match response_handle.send_response(response, false) {
271                    Ok(s) => s,
272                    Err(e) => {
273                        error!("H2 frontend response send fail {:?}", e);
274                        return;
275                    }
276                };
277
278                while let Some(Ok(data)) = body.next_data().await {
279                    let _ = send_stream.send_data(data, false);
280                }
281
282                let _ = send_stream.send_data(Bytes::new(), true);
283            }
284        }
285    }
286
287    async fn h2_svc<S, CXIn, CXStore, CXState, Err>(&self, stream: S, ctx: CXIn)
288    where
289        CXIn: ParamRef<PeerAddr> + Fork<Store = CXStore, State = CXState>,
290        CXStore: 'static,
291        for<'a> CXState: Attach<CXStore>,
292        for<'a> H: HttpHandler<
293            <CXState as Attach<CXStore>>::Hdr<'a>,
294            HttpBody,
295            Body = HttpBody,
296            Error = Err,
297        >,
298        Err: Into<AnyError> + Debug,
299        S: Split + AsyncReadRent + AsyncWriteRent + Unpin + 'static,
300    {
301        let mut connection = match monoio_http::h2::server::Builder::new()
302            .initial_window_size(1_000_000)
303            .max_concurrent_streams(1000)
304            .handshake::<S, Bytes>(stream)
305            .await
306        {
307            Ok(c) => {
308                info!(
309                    "H2 handshake complete for {:?}",
310                    ParamRef::<PeerAddr>::param_ref(&ctx),
311                );
312                c
313            }
314            Err(e) => {
315                error!("h2 server build failed: {e:?}");
316                return;
317            }
318        };
319
320        let (tx, mut rx) = local_sync::mpsc::unbounded::channel();
321        let mut backend_resp_stream = FuturesUnordered::new();
322        let mut frontend_resp_stream = FuturesUnordered::new();
323
324        monoio::spawn(async move {
325            let tx = tx.clone();
326            while let Some(result) = connection.accept().await {
327                match tx.send(result) {
328                    Ok(_) => {}
329                    Err(e) => {
330                        error!("Frontend Req send failed {e:?}");
331                        break;
332                    }
333                }
334            }
335        });
336
337        loop {
338            monoio::select! {
339                 Some(Ok((request, response_handle))) = rx.recv() => {
340                        let request = HttpBody::request(request);
341                        // fork ctx
342                        let (mut store, state) = ctx.fork();
343                        backend_resp_stream.push(async move {
344                            let forked_ctx = unsafe { state.attach(&mut store) };
345                            (self.handler_chain.handle(request, forked_ctx).await, response_handle)
346                        });
347                 }
348                 Some(result) = backend_resp_stream.next() => {
349                     match result {
350                         (Ok((response, _)), response_handle) => {
351                             frontend_resp_stream.push(Self::h2_process_response(response, response_handle));
352                         }
353                         (Err(e), mut response_handle) => {
354                             error!("Handler chain returned error : {e:?}");
355                             let (parts, _) = generate_response::<HttpBody>(StatusCode::INTERNAL_SERVER_ERROR, false).into_parts();
356                             let response = http::Response::from_parts(parts, ());
357                             let _ = response_handle.send_response(response, true);
358                         }
359                     }
360                 }
361                 Some(_) = frontend_resp_stream.next() => {
362                 }
363                  else => {
364                     // No more futures to drive, break the loop
365                     // and drop the service.
366                     break;
367                  }
368            }
369        }
370
371        info!(
372            "H2 connection processing complete for {:?}",
373            ParamRef::<PeerAddr>::param_ref(&ctx)
374        );
375    }
376}
377
378impl<H, Stream, CXIn, CXStore, CXState, Err> Service<HttpAccept<Stream, CXIn>>
379    for HttpCoreService<H>
380where
381    CXIn: ParamRef<PeerAddr> + Fork<Store = CXStore, State = CXState>,
382    CXStore: 'static,
383    for<'a> CXState: Attach<CXStore>,
384    for<'a> H:
385        HttpHandler<<CXState as Attach<CXStore>>::Hdr<'a>, HttpBody, Body = HttpBody, Error = Err>,
386    Stream: Split + AsyncReadRent + AsyncWriteRent + Unpin + 'static,
387    Err: Into<AnyError> + Debug,
388{
389    type Response = ();
390    type Error = Infallible;
391
392    async fn call(
393        &self,
394        incoming_stream: HttpAccept<Stream, CXIn>,
395    ) -> Result<Self::Response, Self::Error> {
396        let (use_h2, stream, ctx) = incoming_stream;
397        if use_h2 {
398            self.h2_svc(stream, ctx).await
399        } else {
400            self.h1_svc(stream, ctx).await
401        }
402        Ok(())
403    }
404}
405
406// HttpCoreService is a Service and a MakeService.
407impl<F: MakeService> MakeService for HttpCoreService<F> {
408    type Service = HttpCoreService<F::Service>;
409    type Error = F::Error;
410
411    fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
412        Ok(HttpCoreService {
413            handler_chain: self
414                .handler_chain
415                .make_via_ref(old.map(|o| &o.handler_chain))?,
416            http_timeout: self.http_timeout,
417        })
418    }
419}
420
421impl<F: AsyncMakeService> AsyncMakeService for HttpCoreService<F> {
422    type Service = HttpCoreService<F::Service>;
423    type Error = F::Error;
424
425    async fn make_via_ref(
426        &self,
427        old: Option<&Self::Service>,
428    ) -> Result<Self::Service, Self::Error> {
429        Ok(HttpCoreService {
430            handler_chain: self
431                .handler_chain
432                .make_via_ref(old.map(|o| &o.handler_chain))
433                .await?,
434            http_timeout: self.http_timeout,
435        })
436    }
437}
438/// Represents the timeout settings for the HTTP server.
439///
440/// The `HttpServerTimeout` struct contains three optional fields:
441/// - `keepalive_timeout`: The timeout for keeping the connection alive. If no byte is received
442///   within this timeout, the connection will be closed.
443/// - `read_header_timeout`: The timeout for reading the full HTTP header.
444/// - `read_body_timeout`: The timeout for receiving the full request body.
445///
446/// By default, the `keepalive_timeout` is set to 75 seconds, while the other two timeouts are not
447/// set.
448#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
449pub struct HttpServerTimeout {
450    pub keepalive_timeout: Option<Duration>,
451    pub read_header_timeout: Option<Duration>,
452    pub read_body_timeout: Option<Duration>,
453}
454
455impl Default for HttpServerTimeout {
456    fn default() -> Self {
457        const DEFAULT_KEEPALIVE_SEC: u64 = 75;
458        Self {
459            keepalive_timeout: Some(Duration::from_secs(DEFAULT_KEEPALIVE_SEC)),
460            read_header_timeout: None,
461            read_body_timeout: None,
462        }
463    }
464}
465
466impl<F> HttpCoreService<F> {
467    pub fn layer<C>() -> impl FactoryLayer<C, F, Factory = Self>
468    where
469        C: Param<HttpServerTimeout>,
470    {
471        layer_fn(|c: &C, inner| Self::new(inner, c.param()))
472    }
473}