aws_lambda_runtime_proxy/
client.rs

1use std::{
2  env::{self, VarError},
3  sync::LazyLock,
4};
5
6use anyhow::Result;
7use http_body_util::{BodyExt, Full};
8use hyper::{
9  body::{Body, Bytes, Incoming},
10  client::conn::http1::{self, SendRequest},
11  Request, Response,
12};
13use hyper_util::rt::TokioIo;
14use tokio::net::TcpStream;
15use tracing::error;
16
17/// An http client for the Lambda Runtime API.
18/// A new-type wrapper around [`SendRequest<ReqBody>`].
19/// # Examples
20/// ```
21/// # use hyper::{body::Incoming, Request};
22/// use aws_lambda_runtime_proxy::LambdaRuntimeApiClient;
23///
24/// # async fn t1(req: Request<Incoming>) {
25/// let mut client = LambdaRuntimeApiClient::new().await.unwrap();
26/// // forward the original request to the runtime API
27/// client.forward(req).await.unwrap();
28/// # }
29/// # async fn t2(req: Request<Incoming>) {
30/// # let mut client = LambdaRuntimeApiClient::new().await.unwrap();
31/// // construct a custom request and send it to the runtime API
32/// client.as_mut().send_request(req).await.unwrap();
33/// # }
34/// ```
35pub struct LambdaRuntimeApiClient<ReqBody>(SendRequest<ReqBody>);
36
37impl<ReqBody> AsRef<SendRequest<ReqBody>> for LambdaRuntimeApiClient<ReqBody> {
38  fn as_ref(&self) -> &SendRequest<ReqBody> {
39    &self.0
40  }
41}
42impl<ReqBody> AsMut<SendRequest<ReqBody>> for LambdaRuntimeApiClient<ReqBody> {
43  fn as_mut(&mut self) -> &mut SendRequest<ReqBody> {
44    &mut self.0
45  }
46}
47
48impl<ReqBody: Body + Send + 'static> LambdaRuntimeApiClient<ReqBody> {
49  /// Create a new client and connect to the runtime API.
50  pub async fn new() -> Result<Self>
51  where
52    ReqBody::Data: Send,
53    ReqBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
54  {
55    static REAL_RUNTIME_API: LazyLock<Result<String, VarError>> =
56      LazyLock::new(|| env::var("AWS_LAMBDA_RUNTIME_API"));
57
58    let stream = TcpStream::connect(REAL_RUNTIME_API.as_ref()?).await?;
59    let io = TokioIo::new(stream);
60    let (sender, conn) = http1::handshake(io).await?;
61
62    // Spawn a task to poll the connection, driving the HTTP state
63    tokio::spawn(async move {
64      if let Err(err) = conn.await {
65        error!("Connection failed: {:?}", err);
66      }
67    });
68
69    Ok(Self(sender))
70  }
71}
72
73impl LambdaRuntimeApiClient<Incoming> {
74  /// Send a request to the runtime API and return the response.
75  pub async fn forward(&mut self, req: Request<Incoming>) -> Result<Response<Full<Bytes>>> {
76    let res = self.as_mut().send_request(req).await?;
77    let (parts, body) = res.into_parts();
78    let bytes = body.collect().await?.to_bytes();
79    Ok(Response::from_parts(parts, Full::new(bytes)))
80
81    // TODO: why we can't just return `self.send_request(req).await`?
82    // tested and it works but will add ~40ms latency when serving API GW event (maybe for all big event), why?
83    // maybe because of the `Incoming` type? can we stream the body instead of buffering it?
84  }
85}