Crate hyper_caching_body

Crate hyper_caching_body 

Source
Expand description

Cacheable HTTP body

See CachingBody for details

§Example


use hyper::{server::conn::http1, service::service_fn};
use hyper_caching_body::CachingBody;
use hyper_util::rt::TokioIo;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
   let in_addr: SocketAddr = ([127, 0, 0, 1], 3001).into();
   let out_addr: SocketAddr = ([127, 0, 0, 1], 8080).into();

   let out_addr_clone = out_addr;

   let listener = TcpListener::bind(in_addr).await?;

   println!("Listening on http://{}", in_addr);
   println!("Proxying on http://{}", out_addr);

   loop {
       let (stream, _) = listener.accept().await?;
       let io = TokioIo::new(stream);

       let service = service_fn(move |mut req| {
           let uri_string = format!(
               "http://{}{}",
               out_addr_clone,
               req.uri()
                   .path_and_query()
                   .map(|x| x.as_str())
                   .unwrap_or("/")
           );
           let uri = uri_string.parse().unwrap();
           *req.uri_mut() = uri;

           let host = req.uri().host().expect("uri has no host");
           let port = req.uri().port_u16().unwrap_or(80);
           let addr = format!("{}:{}", host, port);

           async move {
               let client_stream = TcpStream::connect(addr).await.unwrap();
               let io = TokioIo::new(client_stream);

               let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
               tokio::task::spawn(async move {
                   if let Err(err) = conn.await {
                       println!("Connection failed: {:?}", err);
                   }
               });

               // Here we create a channel to receive buffer contents in another task

               let (tx, mut rx) = tokio::sync::mpsc::channel(1);
               let res = sender
                   .send_request(req)
                   .await
                   .map(|r| r.map(|b| CachingBody::new(b, tx))); // Wrap the body

               // Spawn a task to receive buffe`r contents in another task
               tokio::task::spawn(async move {
                   if let Some(body) = rx.recv().await {
                       dbg!(body);
                   }
               });
               res
           }
       });

       tokio::task::spawn(async move {
           if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
               println!("Failed to serve the connection: {:?}", err);
           }
       });
   }
}

Structs§

CachingBody
A wrapper for hyper::body::Incoming that caches the contents in-flight On each