1#![forbid(unsafe_code)]
28
29#[cfg(feature = "compression")]
30pub mod compression;
31pub mod extractor;
32pub mod middleware;
33pub mod response;
34pub mod router;
35#[cfg(feature = "sse")]
36pub mod sse;
37#[cfg(feature = "static-files")]
38pub mod static_files;
39#[cfg(feature = "tower")]
40pub mod tower_compat;
41#[cfg(feature = "tower")]
42pub mod tower_middleware;
43#[cfg(feature = "websocket")]
44pub mod ws;
45#[cfg(feature = "websocket")]
46pub mod ws_frame;
47
48#[cfg(feature = "compression")]
49pub use compression::{Compression, CompressionAlgorithm, CompressionConfig};
50
51#[cfg(feature = "sse")]
52pub use sse::{SseEvent, SseResponse, SseSender};
53#[cfg(feature = "static-files")]
54pub use static_files::{ServeDir, ServeFile};
55
56#[cfg(feature = "tls")]
57pub mod tls;
58#[cfg(feature = "tls")]
59pub use tls::{PeerCertInfo, TlsConfig};
60
61#[cfg(feature = "h3")]
62pub mod h3;
63
64#[cfg(feature = "tower")]
65pub use tower_compat::RouterMakeService;
66#[cfg(feature = "tower")]
67pub use tower_middleware::{LoggingLayer, RequestIdLayer};
68
69#[cfg(feature = "websocket")]
70pub use ws::{CloseFrame, Message, WebSocket, WebSocketUpgrade};
71
72use bytes::Bytes;
73use http_body_util::Full;
74use hyper::service::service_fn;
75use hyper_util::rt::TokioExecutor;
76use hyper_util::server::conn::auto;
77use std::future::Future;
78use std::net::SocketAddr;
79use std::pin::Pin;
80use std::sync::Arc;
81use tokio::net::TcpListener;
82
83use middleware::MiddlewarePipeline;
84use oxihttp_core::OxiHttpError;
85
86pub use extractor::{FromRequestParts, RequestParts, TypedHeader};
87pub use middleware::{BodyLimitConfig, CorsConfig, RateLimiter, TimeoutConfig};
88pub use router::{Request, Router};
89
90#[derive(Debug, Clone, Default)]
96pub struct ServerHttp2Settings {
97 pub initial_stream_window_size: Option<u32>,
99 pub initial_connection_window_size: Option<u32>,
101 pub adaptive_window: Option<bool>,
103 pub max_concurrent_streams: Option<u32>,
105 pub max_frame_size: Option<u32>,
107 pub keep_alive_interval: Option<std::time::Duration>,
109 pub keep_alive_timeout: Option<std::time::Duration>,
111}
112
113fn configure_auto_builder(builder: &mut auto::Builder<TokioExecutor>, h2: &ServerHttp2Settings) {
118 let mut h2b = builder.http2();
119 if let Some(sz) = h2.initial_stream_window_size {
120 h2b.initial_stream_window_size(sz);
121 }
122 if let Some(sz) = h2.initial_connection_window_size {
123 h2b.initial_connection_window_size(sz);
124 }
125 if let Some(adaptive) = h2.adaptive_window {
126 h2b.adaptive_window(adaptive);
127 }
128 if let Some(n) = h2.max_concurrent_streams {
129 h2b.max_concurrent_streams(n);
130 }
131 if let Some(sz) = h2.max_frame_size {
132 h2b.max_frame_size(sz);
133 }
134 if let Some(interval) = h2.keep_alive_interval {
135 h2b.keep_alive_interval(interval);
136 }
137 if let Some(timeout) = h2.keep_alive_timeout {
138 h2b.keep_alive_timeout(timeout);
139 }
140}
141
142pub struct Server;
149
150impl Server {
151 pub fn bind(addr: &str) -> ServerBuilder {
153 ServerBuilder {
154 addr: addr.to_string(),
155 middleware: MiddlewarePipeline::new(),
156 graceful_shutdown: None,
157 max_connections: None,
158 tcp_nodelay: None,
159 tcp_keepalive: None,
160 http2_settings: None,
161 #[cfg(feature = "tls")]
162 tls: None,
163 alpn_protocols: Vec::new(),
164 #[cfg(feature = "tower")]
165 tower_layers: Vec::new(),
166 }
167 }
168}
169
170pub struct ServerBuilder {
172 addr: String,
173 middleware: MiddlewarePipeline,
174 graceful_shutdown: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
175 max_connections: Option<usize>,
176 tcp_nodelay: Option<bool>,
178 tcp_keepalive: Option<std::time::Duration>,
180 http2_settings: Option<ServerHttp2Settings>,
182 #[cfg(feature = "tls")]
183 tls: Option<tls::TlsConfig>,
184 alpn_protocols: Vec<Vec<u8>>,
191 #[cfg(feature = "tower")]
192 tower_layers: Vec<Arc<dyn tower_compat::ErasedLayer>>,
193}
194
195impl ServerBuilder {
196 pub fn with_cors(mut self, config: CorsConfig) -> Self {
198 self.middleware = self.middleware.with_cors(config);
199 self
200 }
201
202 pub fn with_body_limit(mut self, max_bytes: u64) -> Self {
204 self.middleware = self.middleware.with_body_limit(max_bytes);
205 self
206 }
207
208 pub fn with_rate_limiter(mut self, limiter: RateLimiter) -> Self {
210 self.middleware = self.middleware.with_rate_limiter(limiter);
211 self
212 }
213
214 pub fn with_timeout(mut self, duration: std::time::Duration) -> Self {
216 self.middleware = self.middleware.with_timeout(duration);
217 self
218 }
219
220 #[cfg(feature = "tower")]
229 pub fn with_layer<L>(mut self, layer: L) -> Self
230 where
231 L: tower_layer::Layer<tower_compat::BoxedRouterService> + Send + Sync + Clone + 'static,
232 L::Service: tower_service::Service<
233 http::Request<hyper::body::Incoming>,
234 Response = http::Response<Full<Bytes>>,
235 Error = OxiHttpError,
236 > + Clone
237 + Send
238 + 'static,
239 <L::Service as tower_service::Service<http::Request<hyper::body::Incoming>>>::Future:
240 Send + 'static,
241 {
242 self.tower_layers
243 .push(Arc::new(tower_compat::OwnedLayer(layer)));
244 self
245 }
246
247 pub fn with_max_connections(mut self, n: usize) -> Self {
249 self.max_connections = Some(n);
250 self
251 }
252
253 pub fn with_tcp_nodelay(mut self, nodelay: bool) -> Self {
255 self.tcp_nodelay = Some(nodelay);
256 self
257 }
258
259 pub fn with_tcp_keepalive(mut self, duration: std::time::Duration) -> Self {
261 self.tcp_keepalive = Some(duration);
262 self
263 }
264
265 pub fn with_http2_settings(mut self, settings: ServerHttp2Settings) -> Self {
267 self.http2_settings = Some(settings);
268 self
269 }
270
271 pub fn with_graceful_shutdown<F>(mut self, signal: F) -> Self
273 where
274 F: Future<Output = ()> + Send + 'static,
275 {
276 self.graceful_shutdown = Some(Box::pin(signal));
277 self
278 }
279
280 pub fn shutdown_on_ctrl_c(self) -> Self {
282 self.with_graceful_shutdown(async {
283 let _ = tokio::signal::ctrl_c().await;
284 })
285 }
286
287 #[cfg(feature = "tls")]
292 pub fn with_tls(mut self, config: tls::TlsConfig) -> Self {
293 self.tls = Some(config);
294 self
295 }
296
297 #[cfg(feature = "tls")]
302 pub fn with_tls_from_pem(
303 mut self,
304 cert_pem: &[u8],
305 key_pem: &[u8],
306 ) -> Result<Self, OxiHttpError> {
307 self.tls = Some(tls::TlsConfig::from_pem_with_alpn(
308 cert_pem,
309 key_pem,
310 &self.alpn_protocols,
311 )?);
312 Ok(self)
313 }
314
315 #[cfg(feature = "tls")]
320 pub fn with_tls_from_der(
321 mut self,
322 certs: Vec<rustls_pki_types::CertificateDer<'static>>,
323 key: rustls_pki_types::PrivateKeyDer<'static>,
324 ) -> Result<Self, OxiHttpError> {
325 self.tls = Some(tls::TlsConfig::from_der_with_alpn(
326 certs,
327 key,
328 &self.alpn_protocols,
329 )?);
330 Ok(self)
331 }
332
333 #[cfg(feature = "tls")]
353 pub fn with_alpn<P: AsRef<[u8]>>(mut self, protocols: impl IntoIterator<Item = P>) -> Self {
354 self.alpn_protocols = protocols.into_iter().map(|p| p.as_ref().to_vec()).collect();
355 self
356 }
357
358 pub async fn serve(self, router: Router) -> Result<(), OxiHttpError> {
363 let listener = TcpListener::bind(&self.addr)
364 .await
365 .map_err(|e| OxiHttpError::Io(Arc::new(e)))?;
366 run_server(
367 listener,
368 router,
369 self.middleware,
370 self.max_connections,
371 self.tcp_nodelay,
372 self.tcp_keepalive,
373 self.http2_settings,
374 self.graceful_shutdown,
375 #[cfg(feature = "tls")]
376 self.tls,
377 #[cfg(feature = "tower")]
378 self.tower_layers,
379 )
380 .await
381 }
382
383 pub async fn serve_with_addr(
387 self,
388 router: Router,
389 ) -> Result<
390 (
391 SocketAddr,
392 tokio::task::JoinHandle<Result<(), OxiHttpError>>,
393 ),
394 OxiHttpError,
395 > {
396 let listener = TcpListener::bind(&self.addr)
397 .await
398 .map_err(|e| OxiHttpError::Io(Arc::new(e)))?;
399 let addr = listener
400 .local_addr()
401 .map_err(|e| OxiHttpError::Io(Arc::new(e)))?;
402
403 let router = Arc::new(router);
404 let middleware = Arc::new(self.middleware);
405 let connection_semaphore = self
406 .max_connections
407 .map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
408 let graceful_shutdown = self.graceful_shutdown;
409 let tcp_nodelay = self.tcp_nodelay;
410 let tcp_keepalive = self.tcp_keepalive;
411 let http2_settings = self.http2_settings.map(Arc::new);
412
413 #[cfg(feature = "tls")]
414 let tls_acceptor = self.tls.map(|c| Arc::new(c.acceptor));
415
416 #[cfg(feature = "tower")]
417 let tower_layers = self.tower_layers;
418 #[cfg(not(feature = "tower"))]
419 let tower_layers: Vec<()> = Vec::new();
420
421 let handle = tokio::spawn(async move {
422 let accept_handle = tokio::spawn(accept_loop(
423 listener,
424 router,
425 middleware,
426 connection_semaphore,
427 tcp_nodelay,
428 tcp_keepalive,
429 http2_settings,
430 tower_layers,
431 #[cfg(feature = "tls")]
432 tls_acceptor,
433 ));
434
435 if let Some(shutdown) = graceful_shutdown {
436 tokio::select! {
437 _ = shutdown => {}
438 result = accept_handle => {
439 if let Err(e) = result {
440 return Err(OxiHttpError::Server(format!(
441 "accept loop panicked: {e}"
442 )));
443 }
444 }
445 }
446 } else {
447 accept_handle
448 .await
449 .map_err(|e| OxiHttpError::Server(format!("accept loop panicked: {e}")))?;
450 }
451
452 Ok(())
453 });
454
455 Ok((addr, handle))
456 }
457}
458
459pub struct BoundServer {
465 listener: TcpListener,
466 addr: SocketAddr,
467 inner: ServerBuilder,
468}
469
470impl BoundServer {
471 pub fn local_addr(&self) -> SocketAddr {
473 self.addr
474 }
475
476 pub async fn serve(self, router: Router) -> Result<(), OxiHttpError> {
478 run_server(
479 self.listener,
480 router,
481 self.inner.middleware,
482 self.inner.max_connections,
483 self.inner.tcp_nodelay,
484 self.inner.tcp_keepalive,
485 self.inner.http2_settings,
486 self.inner.graceful_shutdown,
487 #[cfg(feature = "tls")]
488 self.inner.tls,
489 #[cfg(feature = "tower")]
490 self.inner.tower_layers,
491 )
492 .await
493 }
494}
495
496impl ServerBuilder {
497 pub async fn listen(self) -> Result<BoundServer, OxiHttpError> {
513 let listener = TcpListener::bind(&self.addr)
514 .await
515 .map_err(|e| OxiHttpError::Io(Arc::new(e)))?;
516 let addr = listener
517 .local_addr()
518 .map_err(|e| OxiHttpError::Io(Arc::new(e)))?;
519 Ok(BoundServer {
520 listener,
521 addr,
522 inner: self,
523 })
524 }
525}
526
527impl std::fmt::Debug for ServerBuilder {
528 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
529 let mut s = f.debug_struct("ServerBuilder");
530 s.field("addr", &self.addr)
531 .field("middleware", &self.middleware)
532 .field("max_connections", &self.max_connections)
533 .field("tcp_nodelay", &self.tcp_nodelay)
534 .field("tcp_keepalive", &self.tcp_keepalive)
535 .field("http2_settings", &self.http2_settings)
536 .field("alpn_protocols_count", &self.alpn_protocols.len());
537 #[cfg(feature = "tls")]
538 s.field("tls", &self.tls);
539 #[cfg(feature = "tower")]
540 s.field("tower_layers", &self.tower_layers.len());
541 s.finish()
542 }
543}
544
545#[allow(clippy::too_many_arguments)]
551async fn run_server(
552 listener: TcpListener,
553 router: Router,
554 middleware: MiddlewarePipeline,
555 max_connections: Option<usize>,
556 tcp_nodelay: Option<bool>,
557 tcp_keepalive: Option<std::time::Duration>,
558 http2_settings: Option<ServerHttp2Settings>,
559 graceful_shutdown: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
560 #[cfg(feature = "tls")] tls: Option<tls::TlsConfig>,
561 #[cfg(feature = "tower")] tower_layers: Vec<Arc<dyn tower_compat::ErasedLayer>>,
562) -> Result<(), OxiHttpError> {
563 let router = Arc::new(router);
564 let middleware = Arc::new(middleware);
565 let connection_semaphore = max_connections.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
566 let http2_settings = http2_settings.map(Arc::new);
567
568 #[cfg(feature = "tls")]
569 let tls_acceptor = tls.map(|c| Arc::new(c.acceptor));
570
571 #[cfg(feature = "tower")]
572 let tower_layers_val = tower_layers;
573 #[cfg(not(feature = "tower"))]
574 let tower_layers_val: Vec<()> = Vec::new();
575
576 let accept_handle = tokio::spawn(accept_loop(
578 listener,
579 router,
580 middleware,
581 connection_semaphore,
582 tcp_nodelay,
583 tcp_keepalive,
584 http2_settings,
585 tower_layers_val,
586 #[cfg(feature = "tls")]
587 tls_acceptor,
588 ));
589
590 if let Some(shutdown) = graceful_shutdown {
592 tokio::select! {
593 _ = shutdown => {
594 }
596 result = accept_handle => {
597 if let Err(e) = result {
598 return Err(OxiHttpError::Server(format!("accept loop panicked: {e}")));
599 }
600 }
601 }
602 } else {
603 accept_handle
604 .await
605 .map_err(|e| OxiHttpError::Server(format!("accept loop panicked: {e}")))?;
606 }
607
608 Ok(())
609}
610
611#[cfg(not(feature = "tower"))]
616#[allow(clippy::too_many_arguments)]
617async fn accept_loop(
618 listener: TcpListener,
619 router: Arc<Router>,
620 middleware: Arc<MiddlewarePipeline>,
621 semaphore: Option<Arc<tokio::sync::Semaphore>>,
622 tcp_nodelay: Option<bool>,
623 tcp_keepalive: Option<std::time::Duration>,
624 http2_settings: Option<Arc<ServerHttp2Settings>>,
625 _tower_layers: Vec<()>,
626 #[cfg(feature = "tls")] tls_acceptor: Option<Arc<tokio_rustls::TlsAcceptor>>,
627) {
628 loop {
629 let accept_result = listener.accept().await;
630 let (stream, _remote_addr) = match accept_result {
631 Ok(conn) => conn,
632 Err(_) => continue,
633 };
634
635 if let Some(nodelay) = tcp_nodelay {
637 let _ = stream.set_nodelay(nodelay);
638 }
639 if let Some(ka_dur) = tcp_keepalive {
640 let ka = socket2::TcpKeepalive::new().with_time(ka_dur);
641 let _ = socket2::SockRef::from(&stream).set_tcp_keepalive(&ka);
642 }
643
644 let router = Arc::clone(&router);
645 let middleware = Arc::clone(&middleware);
646 let permit = if let Some(ref sem) = semaphore {
647 match sem.clone().try_acquire_owned() {
648 Ok(p) => Some(p),
649 Err(_) => continue,
650 }
651 } else {
652 None
653 };
654
655 #[cfg(feature = "tls")]
656 let tls = tls_acceptor.clone();
657 let h2_cfg = http2_settings.clone();
658
659 tokio::spawn(async move {
660 #[cfg(feature = "tls")]
661 if let Some(acceptor) = tls {
662 if let Ok(tls_stream) = acceptor.accept(stream).await {
663 use oxitls::TlsConnectionExt as _;
667 let conn_info = tls_stream.tls_connection_info();
668 let (_, server_conn) = tls_stream.get_ref();
669 let peer_info: Arc<tls::PeerCertInfo> = Arc::new(tls::PeerCertInfo {
670 peer_certificates: server_conn
671 .peer_certificates()
672 .map(|certs| certs.iter().map(|c| c.clone().into_owned()).collect())
673 .unwrap_or_default(),
674 alpn_protocol: conn_info.alpn_protocol.clone(),
675 protocol_version: server_conn.protocol_version().map(|v| format!("{v:?}")),
677 version: conn_info.version,
678 cipher_suite: conn_info.cipher_suite,
679 sni: conn_info.sni.clone(),
680 });
681
682 let svc = service_fn(move |mut req: hyper::Request<hyper::body::Incoming>| {
683 let router = Arc::clone(&router);
684 let middleware = Arc::clone(&middleware);
685 let pi = peer_info.clone();
686 async move {
687 req.extensions_mut().insert(pi);
688 dispatch_with_middleware(router, middleware, req).await
689 }
690 });
691 let mut builder = auto::Builder::new(TokioExecutor::new());
692 if let Some(ref h2) = h2_cfg {
693 configure_auto_builder(&mut builder, h2);
694 }
695 let io = hyper_util::rt::TokioIo::new(tls_stream);
696 let _ = builder.serve_connection_with_upgrades(io, svc).await;
697 }
698 drop(permit);
699 return;
700 }
701
702 let mut builder = auto::Builder::new(TokioExecutor::new());
705 if let Some(ref h2) = h2_cfg {
706 configure_auto_builder(&mut builder, h2);
707 }
708 let svc = service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
709 let router = Arc::clone(&router);
710 let middleware = Arc::clone(&middleware);
711 async move { dispatch_with_middleware(router, middleware, req).await }
712 });
713 let io = hyper_util::rt::TokioIo::new(stream);
714 let _ = builder.serve_connection_with_upgrades(io, svc).await;
715 drop(permit);
716 });
717 }
718}
719
720#[cfg(feature = "tower")]
725#[allow(clippy::too_many_arguments)]
726async fn accept_loop(
727 listener: TcpListener,
728 router: Arc<Router>,
729 middleware: Arc<MiddlewarePipeline>,
730 semaphore: Option<Arc<tokio::sync::Semaphore>>,
731 tcp_nodelay: Option<bool>,
732 tcp_keepalive: Option<std::time::Duration>,
733 http2_settings: Option<Arc<ServerHttp2Settings>>,
734 tower_layers: Vec<Arc<dyn tower_compat::ErasedLayer>>,
735 #[cfg(feature = "tls")] tls_acceptor: Option<Arc<tokio_rustls::TlsAcceptor>>,
736) {
737 use tower_service::Service as _;
738
739 let layered_svc = tower_compat::build_layered_service(Arc::clone(&router), &tower_layers);
741
742 loop {
743 let accept_result = listener.accept().await;
744 let (stream, _remote_addr) = match accept_result {
745 Ok(conn) => conn,
746 Err(_) => continue,
747 };
748
749 if let Some(nodelay) = tcp_nodelay {
751 let _ = stream.set_nodelay(nodelay);
752 }
753 if let Some(ka_dur) = tcp_keepalive {
754 let ka = socket2::TcpKeepalive::new().with_time(ka_dur);
755 let _ = socket2::SockRef::from(&stream).set_tcp_keepalive(&ka);
756 }
757
758 let middleware = Arc::clone(&middleware);
759 let permit = if let Some(ref sem) = semaphore {
760 match sem.clone().try_acquire_owned() {
761 Ok(p) => Some(p),
762 Err(_) => continue,
763 }
764 } else {
765 None
766 };
767
768 #[cfg(feature = "tls")]
769 let tls = tls_acceptor.clone();
770 let h2_cfg = http2_settings.clone();
771
772 let conn_svc = layered_svc.clone();
774
775 tokio::spawn(async move {
776 let mut builder = auto::Builder::new(TokioExecutor::new());
777 if let Some(ref h2) = h2_cfg {
778 configure_auto_builder(&mut builder, h2);
779 }
780
781 #[cfg(feature = "tls")]
782 if let Some(acceptor) = tls {
783 if let Ok(tls_stream) = acceptor.accept(stream).await {
784 use oxitls::TlsConnectionExt as _;
788 let conn_info = tls_stream.tls_connection_info();
789 let (_, server_conn) = tls_stream.get_ref();
790 let peer_info: Arc<tls::PeerCertInfo> = Arc::new(tls::PeerCertInfo {
791 peer_certificates: server_conn
792 .peer_certificates()
793 .map(|certs| certs.iter().map(|c| c.clone().into_owned()).collect())
794 .unwrap_or_default(),
795 alpn_protocol: conn_info.alpn_protocol.clone(),
796 protocol_version: server_conn.protocol_version().map(|v| format!("{v:?}")),
798 version: conn_info.version,
799 cipher_suite: conn_info.cipher_suite,
800 sni: conn_info.sni.clone(),
801 });
802
803 let svc_tls =
804 service_fn(move |mut req: hyper::Request<hyper::body::Incoming>| {
805 let middleware = Arc::clone(&middleware);
806 let mut svc = conn_svc.clone();
809 let peer_info = Arc::clone(&peer_info);
810 async move {
811 req.extensions_mut().insert(peer_info);
812
813 if let Some(result) = middleware.pre_handle(&req).await {
815 return result.map_err(|e| OxiHttpError::Server(e.to_string()));
816 }
817
818 let origin = req
819 .headers()
820 .get(http::header::ORIGIN)
821 .and_then(|v| v.to_str().ok())
822 .map(|s| s.to_string());
823
824 let handler_result =
825 if let Some(ref timeout_config) = middleware.timeout {
826 match tokio::time::timeout(
827 timeout_config.duration,
828 svc.call(req),
829 )
830 .await
831 {
832 Ok(result) => result,
833 Err(_) => middleware::TimeoutConfig::timeout_response(),
834 }
835 } else {
836 svc.call(req).await
837 };
838
839 match handler_result {
840 Ok(mut resp) => {
841 middleware.post_handle(&mut resp, origin.as_deref());
842 Ok(resp)
843 }
844 Err(e) => {
845 let mut resp = hyper::Response::builder()
846 .status(http::StatusCode::INTERNAL_SERVER_ERROR)
847 .body(Full::new(Bytes::from(e.to_string())))
848 .map_err(|e2| OxiHttpError::Server(e2.to_string()))?;
849 middleware.post_handle(&mut resp, origin.as_deref());
850 Ok(resp)
851 }
852 }
853 }
854 });
855
856 let io = hyper_util::rt::TokioIo::new(tls_stream);
857 let _ = builder.serve_connection_with_upgrades(io, svc_tls).await;
858 }
859 drop(permit);
860 return;
861 }
862
863 let svc = service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
864 let middleware = Arc::clone(&middleware);
865 let mut svc = conn_svc.clone();
868 async move {
869 if let Some(result) = middleware.pre_handle(&req).await {
871 return result.map_err(|e| OxiHttpError::Server(e.to_string()));
872 }
873
874 let origin = req
875 .headers()
876 .get(http::header::ORIGIN)
877 .and_then(|v| v.to_str().ok())
878 .map(|s| s.to_string());
879
880 let handler_result = if let Some(ref timeout_config) = middleware.timeout {
884 match tokio::time::timeout(timeout_config.duration, svc.call(req)).await {
885 Ok(result) => result,
886 Err(_) => middleware::TimeoutConfig::timeout_response(),
887 }
888 } else {
889 svc.call(req).await
890 };
891
892 match handler_result {
893 Ok(mut resp) => {
894 middleware.post_handle(&mut resp, origin.as_deref());
895 Ok(resp)
896 }
897 Err(e) => {
898 let mut resp = hyper::Response::builder()
899 .status(http::StatusCode::INTERNAL_SERVER_ERROR)
900 .body(Full::new(Bytes::from(e.to_string())))
901 .map_err(|e2| OxiHttpError::Server(e2.to_string()))?;
902 middleware.post_handle(&mut resp, origin.as_deref());
903 Ok(resp)
904 }
905 }
906 }
907 });
908
909 let io = hyper_util::rt::TokioIo::new(stream);
910 let _ = builder.serve_connection_with_upgrades(io, svc).await;
911 drop(permit);
912 });
913 }
914}
915
916#[cfg(not(feature = "tower"))]
921async fn dispatch_with_middleware(
922 router: Arc<Router>,
923 middleware: Arc<MiddlewarePipeline>,
924 req: hyper::Request<hyper::body::Incoming>,
925) -> Result<hyper::Response<Full<Bytes>>, OxiHttpError> {
926 if let Some(result) = middleware.pre_handle(&req).await {
928 return result.map_err(|e| OxiHttpError::Server(e.to_string()));
929 }
930
931 let origin = req
932 .headers()
933 .get(http::header::ORIGIN)
934 .and_then(|v| v.to_str().ok())
935 .map(|s| s.to_string());
936
937 let handler_result = if let Some(ref timeout_config) = middleware.timeout {
939 match tokio::time::timeout(timeout_config.duration, router.dispatch(req)).await {
940 Ok(result) => result,
941 Err(_) => middleware::TimeoutConfig::timeout_response(),
942 }
943 } else {
944 router.dispatch(req).await
945 };
946
947 match handler_result {
948 Ok(mut resp) => {
949 middleware.post_handle(&mut resp, origin.as_deref());
950 Ok(resp)
951 }
952 Err(e) => {
953 let mut resp = hyper::Response::builder()
954 .status(http::StatusCode::INTERNAL_SERVER_ERROR)
955 .body(Full::new(Bytes::from(e.to_string())))
956 .map_err(|e2| OxiHttpError::Server(e2.to_string()))?;
957 middleware.post_handle(&mut resp, origin.as_deref());
958 Ok(resp)
959 }
960 }
961}