use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use std::{error::Error as StdError, future::Future};
use bytes::Bytes;
use dyn_clone::{clone_box, DynClone};
use futures::FutureExt;
use http_body::Frame;
use tonic::body::Body as TonicBody;
use tonic::codegen::Service;
use crate::utils::CondSend;
dyn_clone::clone_trait_object!(AbstractTransport);
type BoxedResponse = http::Response<BoxedBody>;
type BoxedError = Box<dyn StdError + Sync + Send + 'static>;
type BoxedResponseFuture =
Pin<Box<dyn ConditionalSendFuture<Output = Result<BoxedResponse, BoxedError>> + 'static>>;
pub(crate) struct BoxedTransport {
inner: Box<dyn AbstractTransport + Send + Sync>,
}
pub(crate) struct BoxedBody {
inner: Box<dyn AbstractBody + Unpin + Send + 'static>,
}
pub(crate) trait ConditionalSendFuture: Future + CondSend {}
trait AbstractBody {
fn poll_frame_inner(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Bytes>, BoxedError>>>;
}
impl<T: Future + CondSend> ConditionalSendFuture for T {}
trait AbstractTransport: DynClone {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxedError>>;
fn call(&mut self, req: http::Request<TonicBody>) -> BoxedResponseFuture;
}
trait AbstractResponse {
fn boxed(self) -> http::Response<BoxedBody>;
}
impl http_body::Body for BoxedBody {
type Data = Bytes;
type Error = BoxedError;
fn poll_frame(
mut self: Pin<&mut BoxedBody>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
AbstractBody::poll_frame_inner(Pin::new(self.inner.deref_mut()), cx)
}
}
impl<T> AbstractBody for T
where
T: http_body::Body<Data = Bytes>,
<T as http_body::Body>::Error: StdError + Send + Sync + 'static,
{
fn poll_frame_inner(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Bytes>, BoxedError>>> {
let ready = ready!(self.poll_frame(cx));
let Some(result) = ready else {
return Poll::Ready(None);
};
Poll::Ready(Some(result.map_err(box_err)))
}
}
impl Clone for BoxedTransport {
fn clone(&self) -> Self {
Self {
inner: clone_box(&*self.inner),
}
}
}
impl<B> AbstractResponse for http::Response<B>
where
B: AbstractBody + Unpin + Send + 'static,
{
fn boxed(self) -> http::Response<BoxedBody> {
self.map(boxed_body)
}
}
impl<T> AbstractTransport for T
where
T: Service<http::Request<TonicBody>> + Clone,
<T as Service<http::Request<TonicBody>>>::Response: AbstractResponse,
<T as Service<http::Request<TonicBody>>>::Error: StdError + Send + Sync + 'static,
<T as Service<http::Request<TonicBody>>>::Future: CondSend + 'static,
{
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxedError>> {
Service::poll_ready(self, cx).map_err(box_err)
}
fn call(&mut self, req: http::Request<TonicBody>) -> BoxedResponseFuture {
Box::pin(Service::call(self, req).map(|response| match response {
Ok(response) => Ok(response.boxed()),
Err(e) => Err(box_err(e)),
}))
}
}
impl Service<http::Request<TonicBody>> for BoxedTransport {
type Response = http::Response<BoxedBody>;
type Error = BoxedError;
type Future = BoxedResponseFuture;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: http::Request<TonicBody>) -> Self::Future {
self.inner.call(req)
}
}
pub(crate) fn boxed<T, B>(transport: T) -> BoxedTransport
where
B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
<B as http_body::Body>::Error: StdError + Sync + Send,
T: Service<http::Request<TonicBody>, Response = http::Response<B>>
+ Send
+ Sync
+ Clone
+ 'static,
<T as Service<http::Request<TonicBody>>>::Error: StdError + Send + Sync + 'static,
<T as Service<http::Request<TonicBody>>>::Future: CondSend + 'static,
{
BoxedTransport {
inner: Box::new(transport),
}
}
fn boxed_body<B>(body: B) -> BoxedBody
where
B: AbstractBody + Send + Unpin + 'static,
{
BoxedBody {
inner: Box::new(body),
}
}
fn box_err<E: StdError + Send + Sync + 'static>(e: E) -> BoxedError {
Box::new(e)
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(not(target_arch = "wasm32"))]
#[allow(clippy::diverging_sub_expression)]
#[allow(dead_code)]
#[allow(unreachable_code)]
#[allow(unused_variables)]
fn can_box_tonic_channel() {
let endpoint: tonic::transport::Endpoint = unimplemented!();
let _boxed = boxed(endpoint.connect_lazy());
}
#[cfg(target_arch = "wasm32")]
#[allow(clippy::diverging_sub_expression)]
#[allow(dead_code)]
#[allow(unreachable_code)]
#[allow(unused_variables)]
fn can_box_grpc_web_client() {
let client: tonic_web_wasm_client::Client = unimplemented!();
let _boxed = boxed(client);
}
}