1use std::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use http::{Request, Response};
8use http_body::Body;
9use pin_project_lite::pin_project;
10use tokio::sync::{mpsc, oneshot};
11
12use super::{body::Incoming, proto::http2::client::ResponseFutMap, Error};
13
14type RetryPromise<T, U> = oneshot::Receiver<Result<U, TrySendError<T>>>;
15
16pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
17 let (tx, rx) = mpsc::unbounded_channel();
18 let (giver, taker) = want::new();
19 (
20 Sender {
21 buffered_once: false,
22 giver,
23 inner: tx,
24 },
25 Receiver { inner: rx, taker },
26 )
27}
28
29#[derive(Debug)]
36pub struct TrySendError<T> {
37 pub(crate) error: Error,
38 pub(crate) message: Option<T>,
39}
40
41pub(crate) struct Sender<T, U> {
46 buffered_once: bool,
50 giver: want::Giver,
55 inner: mpsc::UnboundedSender<Envelope<T, U>>,
57}
58
59pub(crate) struct UnboundedSender<T, U> {
64 giver: want::SharedGiver,
66 inner: mpsc::UnboundedSender<Envelope<T, U>>,
67}
68
69impl<T, U> Sender<T, U> {
70 #[inline]
71 pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<super::Result<()>> {
72 self.giver.poll_want(cx).map_err(|_| Error::new_closed())
73 }
74
75 #[inline]
76 pub(crate) fn is_ready(&self) -> bool {
77 self.giver.is_wanting()
78 }
79
80 pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
81 if self.giver.give() || !self.buffered_once {
82 self.buffered_once = true;
87 } else {
88 return Err(val);
89 };
90
91 let (tx, rx) = oneshot::channel();
92 self.inner
93 .send(Envelope(Some((val, Callback(Some(tx))))))
94 .map(move |_| rx)
95 .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
96 }
97
98 #[inline]
99 pub(crate) fn unbound(self) -> UnboundedSender<T, U> {
100 UnboundedSender {
101 giver: self.giver.shared(),
102 inner: self.inner,
103 }
104 }
105}
106
107impl<T, U> UnboundedSender<T, U> {
108 #[inline]
109 pub(crate) fn is_ready(&self) -> bool {
110 !self.giver.is_canceled()
111 }
112
113 #[inline]
114 pub(crate) fn is_closed(&self) -> bool {
115 self.giver.is_canceled()
116 }
117
118 pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
119 let (tx, rx) = oneshot::channel();
120 self.inner
121 .send(Envelope(Some((val, Callback(Some(tx))))))
122 .map(move |_| rx)
123 .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
124 }
125}
126
127impl<T, U> Clone for UnboundedSender<T, U> {
128 #[inline]
129 fn clone(&self) -> Self {
130 UnboundedSender {
131 giver: self.giver.clone(),
132 inner: self.inner.clone(),
133 }
134 }
135}
136
137pub(crate) struct Receiver<T, U> {
138 inner: mpsc::UnboundedReceiver<Envelope<T, U>>,
139 taker: want::Taker,
140}
141
142impl<T, U> Receiver<T, U> {
143 pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<(T, Callback<T, U>)>> {
144 match self.inner.poll_recv(cx) {
145 Poll::Ready(item) => {
146 Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))
147 }
148 Poll::Pending => {
149 self.taker.want();
150 Poll::Pending
151 }
152 }
153 }
154
155 #[inline]
156 pub(crate) fn close(&mut self) {
157 self.taker.cancel();
158 self.inner.close();
159 }
160
161 #[inline]
162 pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> {
163 use futures_util::FutureExt;
164 match self.inner.recv().now_or_never() {
165 Some(Some(mut env)) => env.0.take(),
166 _ => None,
167 }
168 }
169}
170
171impl<T, U> Drop for Receiver<T, U> {
172 #[inline]
173 fn drop(&mut self) {
174 self.taker.cancel();
177 }
178}
179
180struct Envelope<T, U>(Option<(T, Callback<T, U>)>);
181
182impl<T, U> Drop for Envelope<T, U> {
183 fn drop(&mut self) {
184 if let Some((val, cb)) = self.0.take() {
185 cb.send(Err(TrySendError {
186 error: Error::new_canceled().with("connection closed"),
187 message: Some(val),
188 }));
189 }
190 }
191}
192
193pub(crate) struct Callback<T, U>(Option<oneshot::Sender<Result<U, TrySendError<T>>>>);
194
195impl<T, U> Drop for Callback<T, U> {
196 fn drop(&mut self) {
197 if let Some(tx) = self.0.take() {
198 let _ = tx.send(Err(TrySendError {
199 error: dispatch_gone(),
200 message: None,
201 }));
202 }
203 }
204}
205
206#[cold]
207fn dispatch_gone() -> Error {
208 Error::new_user_dispatch_gone().with(if std::thread::panicking() {
210 "user code panicked"
211 } else {
212 "runtime dropped the dispatch task"
213 })
214}
215
216impl<T, U> Callback<T, U> {
217 const MISSING_SENDER: &'static str = "callback sender missing";
218
219 #[inline]
220 pub(crate) fn is_canceled(&self) -> bool {
221 self.0.as_ref().expect(Self::MISSING_SENDER).is_closed()
222 }
223
224 #[inline]
225 pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
226 self.0.as_mut().expect(Self::MISSING_SENDER).poll_closed(cx)
227 }
228
229 #[inline]
230 pub(crate) fn send(mut self, val: Result<U, TrySendError<T>>) {
231 let _ = self.0.take().expect(Self::MISSING_SENDER).send(val);
232 }
233}
234
235impl<T> TrySendError<T> {
236 #[inline]
242 pub fn take_message(&mut self) -> Option<T> {
243 self.message.take()
244 }
245
246 #[inline]
248 pub fn into_error(self) -> Error {
249 self.error
250 }
251}
252
253pin_project! {
254 pub(crate) struct SendWhen<B>
255 where
256 B: Body,
257 B: 'static,
258 {
259 #[pin]
260 pub(crate) when: ResponseFutMap<B>,
261 #[pin]
262 pub(crate) call_back: Option<Callback<Request<B>, Response<Incoming>>>,
263 }
264}
265
266impl<B> Future for SendWhen<B>
267where
268 B: Body + 'static,
269 B::Data: Send,
270{
271 type Output = ();
272
273 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
274 let mut this = self.project();
275 let mut call_back = this.call_back.take().expect("polled after complete");
276
277 match Pin::new(&mut this.when).poll(cx) {
278 Poll::Ready(Ok(res)) => {
279 call_back.send(Ok(res));
280 Poll::Ready(())
281 }
282 Poll::Pending => {
283 match call_back.poll_canceled(cx) {
285 Poll::Ready(v) => v,
286 Poll::Pending => {
287 this.call_back.set(Some(call_back));
289 return Poll::Pending;
290 }
291 };
292 trace!("send_when canceled");
293 this.when.as_mut().cancel();
296 Poll::Ready(())
297 }
298 Poll::Ready(Err((error, message))) => {
299 call_back.send(Err(TrySendError { error, message }));
300 Poll::Ready(())
301 }
302 }
303 }
304}