1use std::{convert::Infallible, fmt::Debug, pin::Pin, time::Duration};
67
68use bytes::Bytes;
69use certain_map::{Attach, Fork};
70use futures::{stream::FuturesUnordered, StreamExt};
71use http::StatusCode;
72use monoio::io::{sink::SinkExt, stream::Stream, AsyncReadRent, AsyncWriteRent, Split, Splitable};
73use monoio_http::{
74 common::{
75 body::{Body, HttpBody, StreamHint},
76 response::Response,
77 },
78 h1::codec::{
79 decoder::{FillPayload, RequestDecoder},
80 encoder::GenericEncoder,
81 },
82 h2::server::SendResponse,
83};
84use monolake_core::{
85 context::PeerAddr,
86 http::{HttpAccept, HttpHandler},
87 AnyError,
88};
89use service_async::{
90 layer::{layer_fn, FactoryLayer},
91 AsyncMakeService, MakeService, Param, ParamRef, Service,
92};
93use tracing::{error, info, warn};
94
95use super::{generate_response, util::AccompanyPair};
96
97#[derive(Clone)]
105pub struct HttpCoreService<H> {
106 handler_chain: H,
107 http_timeout: HttpServerTimeout,
108}
109
110impl<H> HttpCoreService<H> {
111 pub fn new(handler_chain: H, http_timeout: HttpServerTimeout) -> Self {
112 HttpCoreService {
113 handler_chain,
114 http_timeout,
115 }
116 }
117
118 async fn h1_svc<S, CXIn, CXStore, CXState, Err>(&self, stream: S, ctx: CXIn)
119 where
120 CXIn: ParamRef<PeerAddr> + Fork<Store = CXStore, State = CXState>,
121 CXStore: 'static,
122 for<'a> CXState: Attach<CXStore>,
123 for<'a> H: HttpHandler<
124 <CXState as Attach<CXStore>>::Hdr<'a>,
125 HttpBody,
126 Body = HttpBody,
127 Error = Err,
128 >,
129 Err: Into<AnyError> + Debug,
130 S: Split + AsyncReadRent + AsyncWriteRent,
131 {
132 let (reader, writer) = stream.into_split();
133 let mut decoder = RequestDecoder::new(reader);
134 let mut encoder = GenericEncoder::new(writer);
135 decoder.set_timeout(self.http_timeout.keepalive_timeout);
136
137 loop {
138 let decoded = match self.http_timeout.read_header_timeout {
140 Some(header_timeout) => {
141 match monoio::time::timeout(header_timeout, decoder.next()).await {
142 Ok(inner) => inner,
143 Err(_) => {
144 info!(
145 "Connection {:?} decode http header timed out",
146 ParamRef::<PeerAddr>::param_ref(&ctx),
147 );
148 break;
149 }
150 }
151 }
152 None => decoder.next().await,
153 };
154
155 let req = match decoded {
156 Some(Ok(req)) => HttpBody::request(req),
157 Some(Err(err)) => {
158 warn!("decode request header failed: {err}");
160 break;
161 }
162 None => {
163 info!(
165 "Connection {:?} closed",
166 ParamRef::<PeerAddr>::param_ref(&ctx),
167 );
168 break;
169 }
170 };
171
172 let (mut store, state) = ctx.fork();
174 let forked_ctx = unsafe { state.attach(&mut store) };
175
176 let mut acc_fut = AccompanyPair::new(
179 self.handler_chain.handle(req, forked_ctx),
180 decoder.fill_payload(),
181 );
182 let res = unsafe { Pin::new_unchecked(&mut acc_fut) }.await;
183 match res {
184 Ok((resp, should_cont)) => {
185 let mut f = acc_fut.replace(encoder.send_and_flush(resp));
187 match self.http_timeout.read_body_timeout {
188 None => {
189 if let Err(e) = unsafe { Pin::new_unchecked(&mut f) }.await {
190 warn!("error when encode and write response: {e}");
191 break;
192 }
193 }
194 Some(body_timeout) => {
195 match monoio::time::timeout(body_timeout, unsafe {
196 Pin::new_unchecked(&mut f)
197 })
198 .await
199 {
200 Err(_) => {
201 info!(
202 "Connection {:?} write timed out",
203 ParamRef::<PeerAddr>::param_ref(&ctx),
204 );
205 break;
206 }
207 Ok(Err(e)) => {
208 warn!("error when encode and write response: {e}");
209 break;
210 }
211 _ => (),
212 }
213 }
214 }
215
216 if !should_cont {
217 break;
218 }
219 if let Err(e) = f.into_accompany().await {
220 warn!("error when decode request body: {e}");
221 break;
222 }
223 }
224 Err(e) => {
225 error!("error when processing request: {e:?}");
227 if let Err(e) = encoder
228 .send_and_flush(generate_response::<HttpBody>(
229 StatusCode::INTERNAL_SERVER_ERROR,
230 true,
231 ))
232 .await
233 {
234 warn!("error when reply client: {e}");
235 }
236 break;
237 }
238 }
239 }
240 }
241
242 async fn h2_process_response(
243 response: Response<HttpBody>,
244 mut response_handle: SendResponse<Bytes>,
245 ) {
246 let (mut parts, mut body) = response.into_parts();
247 parts.headers.remove("connection");
248 let response = http::Response::from_parts(parts, ());
249
250 match body.stream_hint() {
251 StreamHint::None => {
252 if let Err(e) = response_handle.send_response(response, true) {
253 error!("H2 frontend response send fail {:?}", e);
254 }
255 }
256 StreamHint::Fixed => {
257 let mut send_stream = match response_handle.send_response(response, false) {
258 Ok(s) => s,
259 Err(e) => {
260 error!("H2 frontend response send fail {:?}", e);
261 return;
262 }
263 };
264
265 if let Some(Ok(data)) = body.next_data().await {
266 let _ = send_stream.send_data(data, true);
267 }
268 }
269 StreamHint::Stream => {
270 let mut send_stream = match response_handle.send_response(response, false) {
271 Ok(s) => s,
272 Err(e) => {
273 error!("H2 frontend response send fail {:?}", e);
274 return;
275 }
276 };
277
278 while let Some(Ok(data)) = body.next_data().await {
279 let _ = send_stream.send_data(data, false);
280 }
281
282 let _ = send_stream.send_data(Bytes::new(), true);
283 }
284 }
285 }
286
287 async fn h2_svc<S, CXIn, CXStore, CXState, Err>(&self, stream: S, ctx: CXIn)
288 where
289 CXIn: ParamRef<PeerAddr> + Fork<Store = CXStore, State = CXState>,
290 CXStore: 'static,
291 for<'a> CXState: Attach<CXStore>,
292 for<'a> H: HttpHandler<
293 <CXState as Attach<CXStore>>::Hdr<'a>,
294 HttpBody,
295 Body = HttpBody,
296 Error = Err,
297 >,
298 Err: Into<AnyError> + Debug,
299 S: Split + AsyncReadRent + AsyncWriteRent + Unpin + 'static,
300 {
301 let mut connection = match monoio_http::h2::server::Builder::new()
302 .initial_window_size(1_000_000)
303 .max_concurrent_streams(1000)
304 .handshake::<S, Bytes>(stream)
305 .await
306 {
307 Ok(c) => {
308 info!(
309 "H2 handshake complete for {:?}",
310 ParamRef::<PeerAddr>::param_ref(&ctx),
311 );
312 c
313 }
314 Err(e) => {
315 error!("h2 server build failed: {e:?}");
316 return;
317 }
318 };
319
320 let (tx, mut rx) = local_sync::mpsc::unbounded::channel();
321 let mut backend_resp_stream = FuturesUnordered::new();
322 let mut frontend_resp_stream = FuturesUnordered::new();
323
324 monoio::spawn(async move {
325 let tx = tx.clone();
326 while let Some(result) = connection.accept().await {
327 match tx.send(result) {
328 Ok(_) => {}
329 Err(e) => {
330 error!("Frontend Req send failed {e:?}");
331 break;
332 }
333 }
334 }
335 });
336
337 loop {
338 monoio::select! {
339 Some(Ok((request, response_handle))) = rx.recv() => {
340 let request = HttpBody::request(request);
341 let (mut store, state) = ctx.fork();
343 backend_resp_stream.push(async move {
344 let forked_ctx = unsafe { state.attach(&mut store) };
345 (self.handler_chain.handle(request, forked_ctx).await, response_handle)
346 });
347 }
348 Some(result) = backend_resp_stream.next() => {
349 match result {
350 (Ok((response, _)), response_handle) => {
351 frontend_resp_stream.push(Self::h2_process_response(response, response_handle));
352 }
353 (Err(e), mut response_handle) => {
354 error!("Handler chain returned error : {e:?}");
355 let (parts, _) = generate_response::<HttpBody>(StatusCode::INTERNAL_SERVER_ERROR, false).into_parts();
356 let response = http::Response::from_parts(parts, ());
357 let _ = response_handle.send_response(response, true);
358 }
359 }
360 }
361 Some(_) = frontend_resp_stream.next() => {
362 }
363 else => {
364 break;
367 }
368 }
369 }
370
371 info!(
372 "H2 connection processing complete for {:?}",
373 ParamRef::<PeerAddr>::param_ref(&ctx)
374 );
375 }
376}
377
378impl<H, Stream, CXIn, CXStore, CXState, Err> Service<HttpAccept<Stream, CXIn>>
379 for HttpCoreService<H>
380where
381 CXIn: ParamRef<PeerAddr> + Fork<Store = CXStore, State = CXState>,
382 CXStore: 'static,
383 for<'a> CXState: Attach<CXStore>,
384 for<'a> H:
385 HttpHandler<<CXState as Attach<CXStore>>::Hdr<'a>, HttpBody, Body = HttpBody, Error = Err>,
386 Stream: Split + AsyncReadRent + AsyncWriteRent + Unpin + 'static,
387 Err: Into<AnyError> + Debug,
388{
389 type Response = ();
390 type Error = Infallible;
391
392 async fn call(
393 &self,
394 incoming_stream: HttpAccept<Stream, CXIn>,
395 ) -> Result<Self::Response, Self::Error> {
396 let (use_h2, stream, ctx) = incoming_stream;
397 if use_h2 {
398 self.h2_svc(stream, ctx).await
399 } else {
400 self.h1_svc(stream, ctx).await
401 }
402 Ok(())
403 }
404}
405
406impl<F: MakeService> MakeService for HttpCoreService<F> {
408 type Service = HttpCoreService<F::Service>;
409 type Error = F::Error;
410
411 fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
412 Ok(HttpCoreService {
413 handler_chain: self
414 .handler_chain
415 .make_via_ref(old.map(|o| &o.handler_chain))?,
416 http_timeout: self.http_timeout,
417 })
418 }
419}
420
421impl<F: AsyncMakeService> AsyncMakeService for HttpCoreService<F> {
422 type Service = HttpCoreService<F::Service>;
423 type Error = F::Error;
424
425 async fn make_via_ref(
426 &self,
427 old: Option<&Self::Service>,
428 ) -> Result<Self::Service, Self::Error> {
429 Ok(HttpCoreService {
430 handler_chain: self
431 .handler_chain
432 .make_via_ref(old.map(|o| &o.handler_chain))
433 .await?,
434 http_timeout: self.http_timeout,
435 })
436 }
437}
438#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
449pub struct HttpServerTimeout {
450 pub keepalive_timeout: Option<Duration>,
451 pub read_header_timeout: Option<Duration>,
452 pub read_body_timeout: Option<Duration>,
453}
454
455impl Default for HttpServerTimeout {
456 fn default() -> Self {
457 const DEFAULT_KEEPALIVE_SEC: u64 = 75;
458 Self {
459 keepalive_timeout: Some(Duration::from_secs(DEFAULT_KEEPALIVE_SEC)),
460 read_header_timeout: None,
461 read_body_timeout: None,
462 }
463 }
464}
465
466impl<F> HttpCoreService<F> {
467 pub fn layer<C>() -> impl FactoryLayer<C, F, Factory = Self>
468 where
469 C: Param<HttpServerTimeout>,
470 {
471 layer_fn(|c: &C, inner| Self::new(inner, c.param()))
472 }
473}