use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use http::{Request, Response};
use http_body::Body;
use pin_project_lite::pin_project;
use tokio::sync::{mpsc, oneshot};
#[cfg(feature = "http2")]
use super::proto::h2::client::ResponseFutMap;
use super::{Error, body::Incoming};
/// Stand-in for the HTTP/2 `ResponseFutMap` type, compiled only when the
/// `http2` feature is disabled (with the feature enabled the real type is
/// imported from `super::proto::h2::client`).
///
/// It stores nothing but a `PhantomData<B>` marker, which lets `SendWhen`
/// keep naming the type in either configuration.
#[cfg(not(feature = "http2"))]
pub(crate) struct ResponseFutMap<B>(std::marker::PhantomData<B>);
/// `Future` implementation for the placeholder `ResponseFutMap`.
///
/// The output type mirrors what `SendWhen` consumes: either a response, or an
/// error paired with the request that may be handed back to the caller.
#[cfg(not(feature = "http2"))]
impl<B> Future for ResponseFutMap<B> {
    type Output = Result<Response<Incoming>, (Error, Option<Request<B>>)>;

    /// Always reports `Poll::Pending`.
    ///
    /// The context is ignored, so no waker is ever registered by this future.
    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Pending
    }
}
/// Receiving half of the one-shot channel handed out by `try_send`.
///
/// It resolves to `Ok(U)` when a reply is delivered through the matching
/// `Callback`, or to a `TrySendError<T>` that may still carry the original
/// message of type `T`.
pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, TrySendError<T>>>;
/// Error reported back to the sender of a dispatched message.
///
/// Besides the underlying `Error`, it can carry the message that was not
/// processed so the caller may reuse it: an `Envelope` dropped while still
/// holding its payload returns the message (`Some`), whereas a `Callback`
/// dropped without being answered reports `None`.
#[derive(Debug)]
pub struct TrySendError<T> {
/// The reason the message could not be completed.
pub(crate) error: Error,
/// The undelivered message, when it could be recovered.
pub(crate) message: Option<T>,
}
/// Creates a connected `Sender` / `Receiver` pair.
///
/// The two halves share an unbounded mpsc queue for the envelopes and a
/// `want` giver/taker pair used by the receiver to signal demand. The sender
/// starts with `buffered_once` cleared, i.e. it has not yet used its single
/// "send without demand" allowance.
pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
    let (giver, taker) = want::new();
    let (queue_tx, queue_rx) = mpsc::unbounded_channel();

    (
        Sender {
            buffered_once: false,
            giver,
            inner: queue_tx,
        },
        Receiver {
            inner: queue_rx,
            taker,
        },
    )
}
/// Sending half of the dispatch channel created by `channel`.
///
/// The underlying queue is unbounded; sends are throttled by `can_send`,
/// which consults the `giver` and the `buffered_once` flag.
pub(crate) struct Sender<T, U> {
/// Starts as `false` and is set to `true` by `can_send` whenever it permits
/// a send; while `false`, one message is allowed even if the receiver has
/// not signalled demand.
buffered_once: bool,
/// Demand signal shared with the receiver's `want::Taker`; also reports
/// closure (see `poll_ready`).
giver: want::Giver,
/// Queue carrying each message together with its reply callback.
inner: mpsc::UnboundedSender<Envelope<T, U>>,
}
/// Variant of `Sender` produced by `Sender::unbound`.
///
/// Its `try_send` performs no demand check, and unlike `Sender` it
/// implements `Clone`.
pub(crate) struct UnboundedSender<T, U> {
/// Shared giver; in this type it is only queried via `is_canceled` to
/// detect that the receiving side has gone away.
giver: want::SharedGiver,
/// Queue carrying each message together with its reply callback.
inner: mpsc::UnboundedSender<Envelope<T, U>>,
}
impl<T, U> Sender<T, U> {
pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<super::Result<()>> {
self.giver.poll_want(cx).map_err(|_| Error::new_closed())
}
pub(crate) fn is_ready(&self) -> bool {
self.giver.is_wanting()
}
fn can_send(&mut self) -> bool {
if self.giver.give() || !self.buffered_once {
self.buffered_once = true;
true
} else {
false
}
}
pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
if !self.can_send() {
return Err(val);
}
let (tx, rx) = oneshot::channel();
self.inner
.send(Envelope(Some((val, Callback(Some(tx))))))
.map(move |_| rx)
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}
pub(crate) fn unbound(self) -> UnboundedSender<T, U> {
UnboundedSender {
giver: self.giver.shared(),
inner: self.inner,
}
}
}
impl<T, U> UnboundedSender<T, U> {
pub(crate) fn is_ready(&self) -> bool {
!self.giver.is_canceled()
}
pub(crate) fn is_closed(&self) -> bool {
self.giver.is_canceled()
}
pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
let (tx, rx) = oneshot::channel();
self.inner
.send(Envelope(Some((val, Callback(Some(tx))))))
.map(move |_| rx)
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}
}
impl<T, U> Clone for UnboundedSender<T, U> {
fn clone(&self) -> Self {
UnboundedSender {
giver: self.giver.clone(),
inner: self.inner.clone(),
}
}
}
/// Receiving half of the dispatch channel created by `channel`.
pub(crate) struct Receiver<T, U> {
/// Queue of envelopes produced by the sender halves.
inner: mpsc::UnboundedReceiver<Envelope<T, U>>,
/// Used to signal demand (`want`) when the queue is empty and to announce
/// closure (`cancel`) on `close` or drop.
taker: want::Taker,
}
impl<T, U> Receiver<T, U> {
    /// Polls for the next message and its reply callback.
    ///
    /// Yields `None` once the queue reports that it is finished. When nothing
    /// is available yet, demand is signalled through the taker before
    /// returning `Poll::Pending`.
    pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<(T, Callback<T, U>)>> {
        let polled = self.inner.poll_recv(cx);
        if polled.is_pending() {
            // Queue is empty: tell the sending side another message is wanted.
            self.taker.want();
        }
        polled.map(|next| next.map(|mut envelope| envelope.0.take().expect("envelope not dropped")))
    }

    /// Cancels the taker and then closes the queue, in that order.
    pub(crate) fn close(&mut self) {
        self.taker.cancel();
        self.inner.close();
    }

    /// Returns an already-queued message without waiting.
    ///
    /// `None` is returned when no message is immediately available, when the
    /// queue has ended, or when the received envelope is empty.
    pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> {
        use futures_util::FutureExt;

        // First `?`: the receive future was not immediately ready.
        // Second `?`: the queue yielded `None`.
        let mut envelope = self.inner.recv().now_or_never()??;
        envelope.0.take()
    }
}
impl<T, U> Drop for Receiver<T, U> {
    /// Cancels the taker as soon as the receiver is dropped.
    ///
    /// This body runs before the struct's fields (including the mpsc
    /// receiver) are themselves dropped.
    fn drop(&mut self) {
        self.taker.cancel();
    }
}
/// A queued message together with the callback used to answer it.
///
/// The payload is an `Option` so it can be taken out on delivery; if the
/// envelope is dropped while still full, its `Drop` impl reports a canceled
/// error through the callback and hands the message back.
struct Envelope<T, U>(Option<(T, Callback<T, U>)>);
impl<T, U> Drop for Envelope<T, U> {
    /// Reports undelivered messages back to their sender.
    ///
    /// If the payload is still present, the callback receives a canceled
    /// error annotated with "connection closed", along with the original
    /// message. An already-emptied envelope does nothing.
    fn drop(&mut self) {
        let (undelivered, callback) = match self.0.take() {
            Some(payload) => payload,
            None => return,
        };

        let canceled = TrySendError {
            error: Error::new_canceled().with("connection closed"),
            message: Some(undelivered),
        };
        callback.send(Err(canceled));
    }
}
/// Reply handle for one dispatched message.
///
/// Wraps the one-shot sender whose receiving half is the `RetryPromise`
/// returned by `try_send`. The `Option` is emptied by `send`; if the callback
/// is dropped while it is still populated, its `Drop` impl sends a
/// "dispatch gone" error with no message attached.
pub(crate) struct Callback<T, U>(Option<oneshot::Sender<Result<U, TrySendError<T>>>>);
impl<T, U> Drop for Callback<T, U> {
    /// Notifies the waiting side when a callback is discarded unanswered.
    ///
    /// If the one-shot sender is still present, a `dispatch_gone()` error
    /// with no message is sent through it; a failure to deliver is ignored.
    fn drop(&mut self) {
        let reply_tx = match self.0.take() {
            Some(reply_tx) => reply_tx,
            None => return,
        };

        let gone = TrySendError {
            error: dispatch_gone(),
            message: None,
        };
        // The promise may already have been dropped; nothing to do then.
        let _ = reply_tx.send(Err(gone));
    }
}
#[cold]
fn dispatch_gone() -> Error {
Error::new_user_dispatch_gone().with(if std::thread::panicking() {
"user code panicked"
} else {
"runtime dropped the dispatch task"
})
}
impl<T, U> Callback<T, U> {
pub(crate) fn is_canceled(&self) -> bool {
if let Some(ref tx) = self.0 {
return tx.is_closed();
}
unreachable!()
}
pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
if let Some(ref mut tx) = self.0 {
return tx.poll_closed(cx);
}
unreachable!()
}
pub(crate) fn send(mut self, val: Result<U, TrySendError<T>>) {
let _ = self.0.take().unwrap().send(val);
}
}
impl<T> TrySendError<T> {
    /// Takes the undelivered message out of the error, if one is attached.
    ///
    /// The slot is left empty, so a second call returns `None`.
    pub fn take_message(&mut self) -> Option<T> {
        self.message.take()
    }

    /// Consumes the value and returns only the underlying error, discarding
    /// any attached message.
    pub fn into_error(self) -> Error {
        let Self { error, .. } = self;
        error
    }
}
pin_project! {
/// Future that waits on a `ResponseFutMap` and forwards its outcome to a
/// `Callback`, finishing early if the callback is canceled first.
pub struct SendWhen<B>
where
B: Body,
B: 'static,
{
// The response future being driven.
#[pin]
pub(crate) when: ResponseFutMap<B>,
// Reply handle; taken on every poll and put back only while the future
// is still pending, so it is `None` after completion.
#[pin]
pub(crate) call_back: Option<Callback<Request<B>, Response<Incoming>>>,
}
}
impl<B> Future for SendWhen<B>
where
B: Body + 'static,
B::Data: Send,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let mut call_back = this.call_back.take().expect("polled after complete");
match Pin::new(&mut this.when).poll(cx) {
Poll::Ready(Ok(res)) => {
call_back.send(Ok(res));
Poll::Ready(())
}
Poll::Pending => {
match call_back.poll_canceled(cx) {
Poll::Ready(v) => v,
Poll::Pending => {
this.call_back.set(Some(call_back));
return Poll::Pending;
}
};
trace!("send_when canceled");
Poll::Ready(())
}
Poll::Ready(Err((error, message))) => {
call_back.send(Err(TrySendError { error, message }));
Poll::Ready(())
}
}
}
}