1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
use crate::{
    default_status_handler, Context, DynHandler, DynStatusHandler, Model, Request, Response,
    Status, StatusHandler, TargetHandler,
};
use futures::task::Poll;
use http::{Request as HttpRequest, Response as HttpResponse};
use hyper::server::conn::{AddrIncoming, AddrStream};
use hyper::service::Service;
use hyper::Body as HyperBody;
use hyper::Server;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;

/// An application: a request handler, a status handler and a model, each held in an `Arc`.
pub struct App<M: Model> {
    /// Handler invoked first for every request passed to [`App::serve`].
    handler: Arc<DynHandler<M>>,
    /// Handler invoked with the `Status` when `handler` returns an error.
    status_handler: Arc<DynStatusHandler<M>>,
    /// Model shared by every clone of this application.
    pub(crate) model: Arc<M>,
}

/// Hand-written `Clone`: every field is an `Arc`, so a clone shares the same
/// handlers and model instead of copying them.
impl<M: Model> Clone for App<M> {
    fn clone(&self) -> Self {
        Self {
            handler: Arc::clone(&self.handler),
            status_handler: Arc::clone(&self.status_handler),
            model: Arc::clone(&self.model),
        }
    }
}

/// Per-connection service: an [`App`] paired with the address of the remote peer.
pub struct HttpService<M: Model> {
    app: App<M>,
    addr: SocketAddr,
}

impl<M: Model> HttpService<M> {
    /// Pairs `app` with the peer address `addr`.
    pub fn new(app: App<M>, addr: SocketAddr) -> Self {
        Self { app, addr }
    }
}

impl<M: Model> Clone for HttpService<M> {
    fn clone(&self) -> Self {
        Self {
            app: self.app.clone(),
            addr: self.addr,
        }
    }
}

impl<M: Model> App<M> {
    /// Creates an application from `handler` and `model`, installing
    /// `default_status_handler` as the status handler.
    pub fn new(handler: Arc<DynHandler<M>>, model: Arc<M>) -> Self {
        let status_handler: Arc<DynStatusHandler<M>> =
            Arc::from(Box::new(default_status_handler).dynamic());
        Self {
            handler,
            status_handler,
            model,
        }
    }

    /// Replaces the status handler with `handler` and returns `self` for chaining.
    pub fn handle_status<F>(
        &mut self,
        handler: impl StatusHandler<M, StatusFuture = F>,
    ) -> &mut Self
    where
        F: 'static + Future<Output = Result<(), Status>> + Send,
    {
        let boxed = Box::new(handler).dynamic();
        self.status_handler = Arc::from(boxed);
        self
    }

    /// Same as [`App::handle_status`], but with the parameter typed as a plain
    /// function or closure.
    pub fn handle_status_fn<F>(
        &mut self,
        handler: impl 'static + Sync + Send + Fn(Context<M>, Status) -> F,
    ) -> &mut Self
    where
        F: 'static + Future<Output = Result<(), Status>> + Send,
    {
        self.handle_status(handler)
    }

    /// Runs `req` through the handler and yields the response stored in the context.
    ///
    /// When the handler fails, its `Status` is passed to the status handler.
    ///
    /// # Errors
    ///
    /// Returns the `Status` produced by the status handler if that handler fails too.
    pub async fn serve(&self, req: Request, peer_addr: SocketAddr) -> Result<Response, Status> {
        let mut ctx = Context::new(req, self.clone(), peer_addr);
        let app = self.clone();
        match (app.handler)(ctx.clone()).await {
            Ok(_) => {}
            Err(status) => {
                (app.status_handler)(ctx.clone(), status).await?;
            }
        }
        // Move the response out of the context, leaving a default value behind.
        let response = std::mem::take(&mut ctx.response);
        Ok(response)
    }

    /// Logs the listening address, binds a hyper server to `addr` and serves a
    /// clone of this application on it.
    ///
    /// NOTE(review): hyper documents `Server::bind` as panicking when the address
    /// cannot be bound — confirm for the hyper version in use.
    pub fn listen(&self, addr: SocketAddr) -> hyper::Server<AddrIncoming, App<M>> {
        log::info!("Server is listening on: http://{}", &addr);
        let builder = Server::bind(&addr);
        builder.serve(self.clone())
    }
}

// Expands to a `poll_ready` method that always reports the service as ready.
macro_rules! impl_poll_ready {
    () => {
        fn poll_ready(
            &mut self,
            _cx: &mut std::task::Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }
    };
}

/// Boxed future resolving to the [`HttpService`] created for one connection.
type AppFuture<M> =
    Pin<Box<dyn 'static + Future<Output = Result<HttpService<M>, std::io::Error>> + Send>>;

impl<M: Model> Service<&AddrStream> for App<M> {
    type Response = HttpService<M>;
    type Error = std::io::Error;
    type Future = AppFuture<M>;

    impl_poll_ready!();

    /// Builds an [`HttpService`] from a clone of this application and the
    /// stream's remote address.
    fn call(&mut self, stream: &AddrStream) -> Self::Future {
        let peer = stream.remote_addr();
        let app = self.clone();
        Box::pin(async move {
            let service = HttpService::new(app, peer);
            Ok(service)
        })
    }
}

/// Boxed future resolving to the HTTP response for one request.
type HttpFuture =
    Pin<Box<dyn 'static + Future<Output = Result<HttpResponse<HyperBody>, Status>> + Send>>;

impl<M: Model> Service<HttpRequest<HyperBody>> for HttpService<M> {
    type Response = HttpResponse<HyperBody>;
    type Error = Status;
    type Future = HttpFuture;

    impl_poll_ready!();

    /// Converts the request, passes it to [`App::serve`] with the stored peer
    /// address, and converts the resulting response back for hyper.
    fn call(&mut self, req: HttpRequest<HyperBody>) -> Self::Future {
        // The future must be `'static`, so it owns a clone of the service.
        let service = self.clone();
        Box::pin(async move {
            let response: Response = service.app.serve(req.into(), service.addr).await?;
            Ok(response.into())
        })
    }
}

#[cfg(test)]
mod tests {
    use crate::{Group, Request};
    use std::time::Instant;

    /// Serves one request through an app whose single handler prints the time
    /// spent in `next`.
    #[tokio::test]
    async fn gate_simple() -> Result<(), Box<dyn std::error::Error>> {
        let app = Group::new()
            .handle_fn(|_ctx, next| {
                async move {
                    let inbound = Instant::now();
                    next().await?;
                    println!("time elapsed: {} ms", inbound.elapsed().as_millis());
                    Ok(())
                }
            })
            .app(());
        let peer_addr = "127.0.0.1:8080".parse()?;
        let _resp = app.serve(Request::new(), peer_addr).await?;
        Ok(())
    }
}