aws_lambda_runtime_proxy/
server.rs

1use crate::LambdaRuntimeApiClient;
2use anyhow::Result;
3use hyper::{
4  body::{Body, Incoming},
5  server::conn::http1,
6  service::service_fn,
7  Request, Response,
8};
9use hyper_util::rt::TokioIo;
10use std::{future::Future, net::SocketAddr};
11use tokio::{io, net::TcpListener};
12use tracing::{debug, error};
13
/// A mock server for the Lambda Runtime API, wrapping a [`TcpListener`]
/// bound to `127.0.0.1`.
///
/// Use [`Self::bind`] to create a new server, and [`Self::serve`] to start serving requests.
///
/// If you want to handle each connection manually, use [`Self::handle_next`].
/// If you want to forward requests to the real Lambda Runtime API, use [`Self::passthrough`].
/// # Examples
/// ```
/// use aws_lambda_runtime_proxy::MockLambdaRuntimeApiServer;
///
/// # async fn t1() {
/// let server = MockLambdaRuntimeApiServer::bind(3000).await.unwrap();
/// // proxy all requests to the real Lambda Runtime API
/// server.passthrough().await;
/// # }
/// ```
pub struct MockLambdaRuntimeApiServer(TcpListener);
30
31impl MockLambdaRuntimeApiServer {
32  /// Create a new server bound to the provided port.
33  pub async fn bind(port: u16) -> io::Result<Self> {
34    let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], port))).await?;
35    debug!("Listening on: {}", listener.local_addr()?);
36    Ok(Self(listener))
37  }
38
39  /// Handle the next incoming connection with the provided processor.
40  pub async fn handle_next<ResBody, Fut>(
41    &self,
42    processor: impl Fn(Request<Incoming>) -> Fut + Send + Sync + 'static,
43  ) -> io::Result<()>
44  where
45    ResBody: hyper::body::Body + Send + 'static,
46    <ResBody as Body>::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
47    Fut: Future<Output = Result<Response<ResBody>>> + Send,
48    <ResBody as Body>::Data: Send,
49  {
50    let (stream, peer) = self.0.accept().await?;
51    debug!("Accepted connection from: {}", peer);
52
53    // in lambda's execution environment there is usually only one connection
54    // but we can't rely on that, so spawn a task for each connection
55    tokio::spawn(async move {
56      if let Err(err) = http1::Builder::new()
57        .serve_connection(
58          TokioIo::new(stream),
59          service_fn(|req| async { processor(req).await }),
60        )
61        .await
62      {
63        error!("Error serving connection: {:?}", err);
64      }
65    });
66
67    Ok(())
68  }
69
70  /// Block the current thread and handle connections with the processor in a loop.
71  pub async fn serve<ResBody, Fut>(
72    &self,
73    processor: impl Fn(Request<Incoming>) -> Fut + Send + Sync + Clone + 'static,
74  ) where
75    Fut: Future<Output = Result<Response<ResBody>>> + Send,
76    ResBody: hyper::body::Body + Send + 'static,
77    <ResBody as Body>::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
78    <ResBody as Body>::Data: Send,
79  {
80    loop {
81      if let Err(err) = self.handle_next(processor.clone()).await {
82        error!("Error handling connection: {:?}", err);
83      }
84    }
85  }
86
87  /// Block the current thread and handle connections in a loop,
88  /// forwarding requests to a new [`LambdaRuntimeApiClient`], and responding with the client's response.
89  pub async fn passthrough(&self) {
90    self
91      .serve(|req| async {
92        // tested and it looks like creating the client every time is faster
93        // than locking an Arc<Mutex<LambdaRuntimeApiClient>> and reuse it.
94        // creating a new client and sending the request usually cost < 1ms.
95        LambdaRuntimeApiClient::new().await?.forward(req).await
96      })
97      .await
98  }
99}