use crate::LambdaRuntimeApiClient;
use anyhow::Result;
use hyper::{
body::{Body, Incoming},
server::conn::http1,
service::service_fn,
Request, Response,
};
use hyper_util::rt::TokioIo;
use std::{future::Future, net::SocketAddr};
use tokio::{io, net::TcpListener};
use tracing::{debug, error};
pub struct MockLambdaRuntimeApiServer(TcpListener);
impl MockLambdaRuntimeApiServer {
pub async fn bind(port: u16) -> io::Result<Self> {
let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], port))).await?;
debug!("Listening on: {}", listener.local_addr()?);
Ok(Self(listener))
}
pub async fn handle_next<ResBody, Fut>(
&self,
processor: impl Fn(Request<Incoming>) -> Fut + Send + Sync + 'static,
) -> io::Result<()>
where
ResBody: hyper::body::Body + Send + 'static,
<ResBody as Body>::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
Fut: Future<Output = Result<Response<ResBody>>> + Send,
<ResBody as Body>::Data: Send,
{
let (stream, peer) = self.0.accept().await?;
debug!("Accepted connection from: {}", peer);
tokio::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(
TokioIo::new(stream),
service_fn(|req| async { processor(req).await }),
)
.await
{
error!("Error serving connection: {:?}", err);
}
});
Ok(())
}
pub async fn serve<ResBody, Fut>(
&self,
processor: impl Fn(Request<Incoming>) -> Fut + Send + Sync + Clone + 'static,
) where
Fut: Future<Output = Result<Response<ResBody>>> + Send,
ResBody: hyper::body::Body + Send + 'static,
<ResBody as Body>::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
<ResBody as Body>::Data: Send,
{
loop {
if let Err(err) = self.handle_next(processor.clone()).await {
error!("Error handling connection: {:?}", err);
}
}
}
pub async fn passthrough(&self) {
self
.serve(|req| async {
LambdaRuntimeApiClient::new().await?.forward(req).await
})
.await
}
}