use crate::LambdaInvocation;
use futures::{future::BoxFuture, ready, FutureExt, TryFutureExt};
use hyper::body::Incoming;
use lambda_runtime_api_client::{body::Body, BoxError, Client};
use pin_project::pin_project;
use std::{future::Future, pin::Pin, sync::Arc, task};
use tower::Service;
use tracing::error;
/// Tower service that pairs an inner request-producing service with a shared
/// Runtime API [`Client`].
///
/// The inner service turns a [`LambdaInvocation`] into an `http::Request<Body>`;
/// this wrapper then passes that request to the client (see the `Service` impl).
pub struct RuntimeApiClientService<S> {
/// Wrapped service whose future resolves to the HTTP request to send.
inner: S,
/// Shared client handle; cloned into every future returned by `call`.
client: Arc<Client>,
}
impl<S> RuntimeApiClientService<S> {
pub fn new(inner: S, client: Arc<Client>) -> Self {
Self { inner, client }
}
}
impl<S> Service<LambdaInvocation> for RuntimeApiClientService<S>
where
S: Service<LambdaInvocation, Error = BoxError>,
S::Future: Future<Output = Result<http::Request<Body>, BoxError>>,
{
type Response = ();
type Error = S::Error;
type Future = RuntimeApiClientFuture<S::Future>;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: LambdaInvocation) -> Self::Future {
let request_fut = self.inner.call(req);
let client = self.client.clone();
RuntimeApiClientFuture::First(request_fut, client)
}
}
/// Future returned by `RuntimeApiClientService::call`.
///
/// It moves through two stages: first the wrapped future `F` is driven until
/// it yields an HTTP request, then the boxed future obtained from the client
/// for that request is driven to completion.
#[pin_project(project = RuntimeApiClientFutureProj)]
pub enum RuntimeApiClientFuture<F> {
/// Waiting on the request-producing future; the client is kept alongside
/// so it can be used as soon as the request is ready.
First(#[pin] F, Arc<Client>),
/// Waiting on the boxed future that resolves to the HTTP response (or an error).
Second(#[pin] BoxFuture<'static, Result<http::Response<Incoming>, BoxError>>),
}
impl<F> Future for RuntimeApiClientFuture<F>
where
F: Future<Output = Result<http::Request<Body>, BoxError>>,
{
type Output = Result<(), BoxError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
task::Poll::Ready(loop {
match self.as_mut().project() {
RuntimeApiClientFutureProj::First(fut, client) => match ready!(fut.poll(cx)) {
Ok(ok) => {
let next_fut = client
.call(ok)
.map_err(|err| {
error!(error = ?err, "failed to send request to Lambda Runtime API");
err
})
.boxed();
self.set(RuntimeApiClientFuture::Second(next_fut));
}
Err(err) => break Err(err),
},
RuntimeApiClientFutureProj::Second(fut) => break ready!(fut.poll(cx)).map(|_| ()),
}
})
}
}