aws_lambda_runtime_proxy/
client.rs1use std::{
2 env::{self, VarError},
3 sync::LazyLock,
4};
5
6use anyhow::Result;
7use http_body_util::{BodyExt, Full};
8use hyper::{
9 body::{Body, Bytes, Incoming},
10 client::conn::http1::{self, SendRequest},
11 Request, Response,
12};
13use hyper_util::rt::TokioIo;
14use tokio::net::TcpStream;
15use tracing::error;
16
17pub struct LambdaRuntimeApiClient<ReqBody>(SendRequest<ReqBody>);
36
37impl<ReqBody> AsRef<SendRequest<ReqBody>> for LambdaRuntimeApiClient<ReqBody> {
38 fn as_ref(&self) -> &SendRequest<ReqBody> {
39 &self.0
40 }
41}
42impl<ReqBody> AsMut<SendRequest<ReqBody>> for LambdaRuntimeApiClient<ReqBody> {
43 fn as_mut(&mut self) -> &mut SendRequest<ReqBody> {
44 &mut self.0
45 }
46}
47
48impl<ReqBody: Body + Send + 'static> LambdaRuntimeApiClient<ReqBody> {
49 pub async fn new() -> Result<Self>
51 where
52 ReqBody::Data: Send,
53 ReqBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
54 {
55 static REAL_RUNTIME_API: LazyLock<Result<String, VarError>> =
56 LazyLock::new(|| env::var("AWS_LAMBDA_RUNTIME_API"));
57
58 let stream = TcpStream::connect(REAL_RUNTIME_API.as_ref()?).await?;
59 let io = TokioIo::new(stream);
60 let (sender, conn) = http1::handshake(io).await?;
61
62 tokio::spawn(async move {
64 if let Err(err) = conn.await {
65 error!("Connection failed: {:?}", err);
66 }
67 });
68
69 Ok(Self(sender))
70 }
71}
72
73impl LambdaRuntimeApiClient<Incoming> {
74 pub async fn forward(&mut self, req: Request<Incoming>) -> Result<Response<Full<Bytes>>> {
76 let res = self.as_mut().send_request(req).await?;
77 let (parts, body) = res.into_parts();
78 let bytes = body.collect().await?.to_bytes();
79 Ok(Response::from_parts(parts, Full::new(bytes)))
80
81 }
85}