ntex_h2/client/
stream.rs

1use std::task::{Context, Poll};
2use std::{cell::RefCell, collections::VecDeque, fmt, future::poll_fn, pin::Pin, rc::Rc};
3
4use ntex_bytes::Bytes;
5use ntex_http::HeaderMap;
6use ntex_service::{Service, ServiceCtx};
7use ntex_util::{HashMap, Stream as FutStream, future::Either, task::LocalWaker};
8
9use crate::error::OperationError;
10use crate::frame::{Reason, StreamId, WindowSize};
11use crate::message::{Message, MessageKind};
12use crate::{Stream, StreamRef};
13
14#[derive(Clone, Default)]
15pub(super) struct InflightStorage(Rc<InflightStorageInner>);
16
17#[derive(Default)]
18struct InflightStorageInner {
19    inflight: RefCell<HashMap<StreamId, Inflight>>,
20    cb: Option<Box<dyn Fn(StreamId)>>,
21}
22
23#[derive(Debug)]
24pub(super) struct Inflight {
25    _stream: Stream,
26    response: Option<Either<Message, VecDeque<Message>>>,
27    waker: LocalWaker,
28}
29
30impl Inflight {
31    fn pop(&mut self) -> Option<Message> {
32        match self.response.take() {
33            None => None,
34            Some(Either::Left(msg)) => Some(msg),
35            Some(Either::Right(mut msgs)) => {
36                let msg = msgs.pop_front();
37                if !msgs.is_empty() {
38                    self.response = Some(Either::Right(msgs));
39                }
40                msg
41            }
42        }
43    }
44
45    fn push(&mut self, item: Message) {
46        match self.response.take() {
47            Some(Either::Left(msg)) => {
48                let mut msgs = VecDeque::with_capacity(8);
49                msgs.push_back(msg);
50                msgs.push_back(item);
51                self.response = Some(Either::Right(msgs));
52            }
53            Some(Either::Right(mut messages)) => {
54                messages.push_back(item);
55                self.response = Some(Either::Right(messages));
56            }
57            None => self.response = Some(Either::Left(item)),
58        };
59        self.waker.wake();
60    }
61}
62
63#[derive(Debug)]
64/// Send part of the client stream
65pub struct SendStream(StreamRef, ());
66
67impl Drop for SendStream {
68    fn drop(&mut self) {
69        if !self.0.send_state().is_closed() {
70            self.0.reset(Reason::CANCEL);
71
72            if self.0.is_disconnect_on_drop() {
73                self.0.0.con.disconnect_when_ready();
74            }
75        }
76    }
77}
78
79impl SendStream {
80    #[inline]
81    /// Get stream id
82    pub fn id(&self) -> StreamId {
83        self.0.id()
84    }
85
86    #[inline]
87    /// Get io tag
88    pub fn tag(&self) -> &'static str {
89        self.0.tag()
90    }
91
92    #[inline]
93    pub fn stream(&self) -> &StreamRef {
94        &self.0
95    }
96
97    #[inline]
98    /// Get available capacity
99    pub fn available_send_capacity(&self) -> WindowSize {
100        self.0.available_send_capacity()
101    }
102
103    #[inline]
104    /// Wait for available capacity
105    pub async fn send_capacity(&self) -> Result<WindowSize, OperationError> {
106        self.0.send_capacity().await
107    }
108
109    #[inline]
110    /// Send payload
111    pub async fn send_payload(&self, res: Bytes, eof: bool) -> Result<(), OperationError> {
112        self.0.send_payload(res, eof).await
113    }
114
115    #[inline]
116    /// Send trailers
117    pub fn send_trailers(&self, map: HeaderMap) {
118        self.0.send_trailers(map)
119    }
120
121    /// Reset stream
122    ///
123    /// Returns `true` if the stream state is updated and a `Reset` frame
124    /// has been sent to the peer.
125    #[inline]
126    pub fn reset(&self, reason: Reason) -> bool {
127        self.0.reset(reason)
128    }
129
130    #[inline]
131    /// Disconnect connection on stream drop
132    pub fn disconnect_on_drop(&self) {
133        self.0.disconnect_on_drop()
134    }
135
136    #[inline]
137    /// Check for available send capacity
138    pub fn poll_send_capacity(&self, cx: &Context<'_>) -> Poll<Result<WindowSize, OperationError>> {
139        self.0.poll_send_capacity(cx)
140    }
141
142    #[inline]
143    /// Check if send part of stream get reset
144    pub fn poll_send_reset(&self, cx: &Context<'_>) -> Poll<Result<(), OperationError>> {
145        self.0.poll_send_reset(cx)
146    }
147}
148
149#[derive(Debug)]
150/// Receiving part of the client stream
151pub struct RecvStream(StreamRef, InflightStorage);
152
153impl RecvStream {
154    #[inline]
155    /// Get stream id
156    pub fn id(&self) -> StreamId {
157        self.0.id()
158    }
159
160    #[inline]
161    /// Get io tag
162    pub fn tag(&self) -> &'static str {
163        self.0.tag()
164    }
165
166    #[inline]
167    pub fn stream(&self) -> &StreamRef {
168        &self.0
169    }
170
171    #[inline]
172    /// Disconnect connection on stream drop
173    pub fn disconnect_on_drop(&self) {
174        self.0.disconnect_on_drop()
175    }
176
177    /// Attempt to pull out the next value of http/2 stream
178    pub async fn recv(&self) -> Option<Message> {
179        poll_fn(|cx| self.poll_recv(cx)).await
180    }
181
182    /// Attempt to pull out the next value of this http/2 stream, registering
183    /// the current task for wakeup if the value is not yet available,
184    /// and returning None if the stream is exhausted.
185    pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<Message>> {
186        if let Some(inflight) = self.1.0.inflight.borrow_mut().get_mut(&self.0.id()) {
187            if let Some(msg) = inflight.pop() {
188                Poll::Ready(Some(msg))
189            } else if self.0.recv_state().is_closed() {
190                Poll::Ready(None)
191            } else {
192                inflight.waker.register(cx.waker());
193                Poll::Pending
194            }
195        } else {
196            log::warn!(
197                "{}: Stream does not exists, {:?}",
198                self.0.tag(),
199                self.0.id()
200            );
201            Poll::Ready(None)
202        }
203    }
204}
205
206impl Drop for RecvStream {
207    fn drop(&mut self) {
208        if !self.0.recv_state().is_closed() {
209            self.0.reset(Reason::CANCEL);
210
211            if self.0.is_disconnect_on_drop() {
212                self.0.0.con.disconnect_when_ready();
213            }
214        }
215        self.1.0.inflight.borrow_mut().remove(&self.0.id());
216    }
217}
218
219impl FutStream for RecvStream {
220    type Item = Message;
221
222    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
223        self.poll_recv(cx)
224    }
225}
226
227pub(super) struct HandleService(InflightStorage);
228
229impl HandleService {
230    pub(super) fn new(storage: InflightStorage) -> Self {
231        Self(storage)
232    }
233}
234
235impl Service<Message> for HandleService {
236    type Response = ();
237    type Error = ();
238
239    async fn call(&self, msg: Message, _: ServiceCtx<'_, Self>) -> Result<(), ()> {
240        let id = msg.id();
241        if let Some(inflight) = self.0.0.inflight.borrow_mut().get_mut(&id) {
242            let eof = match msg.kind() {
243                MessageKind::Headers { eof, .. } => *eof,
244                MessageKind::Eof(..) | MessageKind::Disconnect(..) => true,
245                _ => false,
246            };
247            inflight.push(msg);
248
249            if eof {
250                self.0.notify(id);
251                log::debug!("Stream {id:?} is closed, notify");
252            }
253        } else if !matches!(msg.kind(), MessageKind::Disconnect(_)) {
254            log::error!(
255                "{}: Received message for unknown stream, {msg:?}",
256                msg.stream().tag(),
257            );
258        }
259        Ok(())
260    }
261}
262
263impl InflightStorage {
264    pub(super) fn new<F>(f: F) -> Self
265    where
266        F: Fn(StreamId) + 'static,
267    {
268        InflightStorage(Rc::new(InflightStorageInner {
269            inflight: Default::default(),
270            cb: Some(Box::new(f)),
271        }))
272    }
273
274    pub(super) fn notify(&self, id: StreamId) {
275        if let Some(ref cb) = self.0.cb {
276            (*cb)(id)
277        }
278    }
279
280    pub(super) fn inflight(&self, stream: Stream) -> (SendStream, RecvStream) {
281        let id = stream.id();
282        let snd = SendStream(stream.clone(), ());
283        let rcv = RecvStream(stream.clone(), self.clone());
284        let inflight = Inflight {
285            _stream: stream,
286            response: None,
287            waker: LocalWaker::default(),
288        };
289        self.0.inflight.borrow_mut().insert(id, inflight);
290        (snd, rcv)
291    }
292}
293
294impl fmt::Debug for InflightStorage {
295    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
296        f.debug_struct("InflightStorage")
297            .field("inflight", &self.0.inflight)
298            .finish()
299    }
300}