use crate::service::{Layer, Service};
use http::Response;
use http_body::{Body, Frame, SizeHint};
use pin_project::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
use zipkin::{Detached, Kind, OpenSpan};
/// A [`Layer`] that wraps an inner service in a [`WaitForSpansService`].
///
/// The resulting service runs the inner call under a
/// `conjure-runtime: wait-for-headers` span and attaches a
/// `conjure-runtime: wait-for-body` span to the returned response body.
pub struct WaitForSpansLayer;

impl<S> Layer<S> for WaitForSpansLayer {
    type Service = WaitForSpansService<S>;

    /// Consumes the layer and returns `inner` wrapped in a [`WaitForSpansService`].
    fn layer(self, inner: S) -> Self::Service {
        WaitForSpansService { inner }
    }
}
pub struct WaitForSpansService<S> {
inner: S,
}
impl<S, R, B> Service<R> for WaitForSpansService<S>
where
S: Service<R, Response = Response<B>>,
{
type Response = Response<WaitForSpansBody<B>>;
type Error = S::Error;
async fn call(&self, req: R) -> Result<Self::Response, Self::Error> {
let response = zipkin::next_span()
.with_name("conjure-runtime: wait-for-headers")
.with_kind(Kind::Client)
.detach()
.bind(self.inner.call(req))
.await?;
Ok(response.map(|body| WaitForSpansBody {
body,
_span: zipkin::next_span()
.with_name("conjure-runtime: wait-for-body")
.detach(),
}))
}
}
#[pin_project]
pub struct WaitForSpansBody<B> {
#[pin]
body: B,
_span: OpenSpan<Detached>,
}
impl<B> Body for WaitForSpansBody<B>
where
B: Body,
{
type Data = B::Data;
type Error = B::Error;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
self.project().body.poll_frame(cx)
}
fn is_end_stream(&self) -> bool {
self.body.is_end_stream()
}
fn size_hint(&self) -> SizeHint {
self.body.size_hint()
}
}