ntex_h2/client/
stream.rs

1use std::task::{Context, Poll};
2use std::{cell::RefCell, collections::VecDeque, fmt, future::poll_fn, pin::Pin, rc::Rc};
3
4use ntex_bytes::Bytes;
5use ntex_http::HeaderMap;
6use ntex_service::{Service, ServiceCtx};
7use ntex_util::{future::Either, task::LocalWaker, HashMap, Stream as FutStream};
8
9use crate::error::OperationError;
10use crate::frame::{Reason, StreamId, WindowSize};
11use crate::message::{Message, MessageKind};
12use crate::{Stream, StreamRef};
13
14#[derive(Clone, Default)]
15pub(super) struct InflightStorage(Rc<InflightStorageInner>);
16
17#[derive(Default)]
18struct InflightStorageInner {
19    inflight: RefCell<HashMap<StreamId, Inflight>>,
20    cb: Option<Box<dyn Fn(StreamId)>>,
21}
22
23#[derive(Debug)]
24pub(super) struct Inflight {
25    _stream: Stream,
26    response: Option<Either<Message, VecDeque<Message>>>,
27    waker: LocalWaker,
28}
29
30impl Inflight {
31    fn pop(&mut self) -> Option<Message> {
32        match self.response.take() {
33            None => None,
34            Some(Either::Left(msg)) => Some(msg),
35            Some(Either::Right(mut msgs)) => {
36                let msg = msgs.pop_front();
37                if !msgs.is_empty() {
38                    self.response = Some(Either::Right(msgs));
39                }
40                msg
41            }
42        }
43    }
44
45    fn push(&mut self, item: Message) {
46        match self.response.take() {
47            Some(Either::Left(msg)) => {
48                let mut msgs = VecDeque::with_capacity(8);
49                msgs.push_back(msg);
50                msgs.push_back(item);
51                self.response = Some(Either::Right(msgs));
52            }
53            Some(Either::Right(mut messages)) => {
54                messages.push_back(item);
55                self.response = Some(Either::Right(messages));
56            }
57            None => self.response = Some(Either::Left(item)),
58        };
59        self.waker.wake();
60    }
61}
62
63#[derive(Debug)]
64/// Send part of the client stream
65pub struct SendStream(StreamRef, ());
66
67impl Drop for SendStream {
68    fn drop(&mut self) {
69        if !self.0.send_state().is_closed() {
70            self.0.reset(Reason::CANCEL);
71
72            if self.0.is_disconnect_on_drop() {
73                self.0 .0.con.disconnect_when_ready();
74            }
75        }
76    }
77}
78
79impl SendStream {
80    #[inline]
81    /// Get stream id
82    pub fn id(&self) -> StreamId {
83        self.0.id()
84    }
85
86    #[inline]
87    /// Get io tag
88    pub fn tag(&self) -> &'static str {
89        self.0.tag()
90    }
91
92    #[inline]
93    pub fn stream(&self) -> &StreamRef {
94        &self.0
95    }
96
97    #[inline]
98    /// Get available capacity
99    pub fn available_send_capacity(&self) -> WindowSize {
100        self.0.available_send_capacity()
101    }
102
103    #[inline]
104    /// Wait for available capacity
105    pub async fn send_capacity(&self) -> Result<WindowSize, OperationError> {
106        self.0.send_capacity().await
107    }
108
109    #[inline]
110    /// Send payload
111    pub async fn send_payload(&self, res: Bytes, eof: bool) -> Result<(), OperationError> {
112        self.0.send_payload(res, eof).await
113    }
114
115    #[inline]
116    /// Send trailers
117    pub fn send_trailers(&self, map: HeaderMap) {
118        self.0.send_trailers(map)
119    }
120
121    #[inline]
122    /// Reset stream
123    pub fn reset(&self, reason: Reason) {
124        self.0.reset(reason)
125    }
126
127    #[inline]
128    /// Disconnect connection on stream drop
129    pub fn disconnect_on_drop(&self) {
130        self.0.disconnect_on_drop()
131    }
132
133    #[inline]
134    /// Check for available send capacity
135    pub fn poll_send_capacity(&self, cx: &Context<'_>) -> Poll<Result<WindowSize, OperationError>> {
136        self.0.poll_send_capacity(cx)
137    }
138
139    #[inline]
140    /// Check if send part of stream get reset
141    pub fn poll_send_reset(&self, cx: &Context<'_>) -> Poll<Result<(), OperationError>> {
142        self.0.poll_send_reset(cx)
143    }
144}
145
146#[derive(Debug)]
147/// Receiving part of the client stream
148pub struct RecvStream(StreamRef, InflightStorage);
149
150impl RecvStream {
151    #[inline]
152    /// Get stream id
153    pub fn id(&self) -> StreamId {
154        self.0.id()
155    }
156
157    #[inline]
158    /// Get io tag
159    pub fn tag(&self) -> &'static str {
160        self.0.tag()
161    }
162
163    #[inline]
164    pub fn stream(&self) -> &StreamRef {
165        &self.0
166    }
167
168    #[inline]
169    /// Disconnect connection on stream drop
170    pub fn disconnect_on_drop(&self) {
171        self.0.disconnect_on_drop()
172    }
173
174    /// Attempt to pull out the next value of http/2 stream
175    pub async fn recv(&self) -> Option<Message> {
176        poll_fn(|cx| self.poll_recv(cx)).await
177    }
178
179    /// Attempt to pull out the next value of this http/2 stream, registering
180    /// the current task for wakeup if the value is not yet available,
181    /// and returning None if the stream is exhausted.
182    pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<Message>> {
183        if let Some(inflight) = self.1 .0.inflight.borrow_mut().get_mut(&self.0.id()) {
184            if let Some(msg) = inflight.pop() {
185                Poll::Ready(Some(msg))
186            } else if self.0.recv_state().is_closed() {
187                Poll::Ready(None)
188            } else {
189                inflight.waker.register(cx.waker());
190                Poll::Pending
191            }
192        } else {
193            log::warn!(
194                "{}: Stream does not exists, {:?}",
195                self.0.tag(),
196                self.0.id()
197            );
198            Poll::Ready(None)
199        }
200    }
201}
202
203impl Drop for RecvStream {
204    fn drop(&mut self) {
205        if !self.0.recv_state().is_closed() {
206            self.0.reset(Reason::CANCEL);
207
208            if self.0.is_disconnect_on_drop() {
209                self.0 .0.con.disconnect_when_ready();
210            }
211        }
212        self.1 .0.inflight.borrow_mut().remove(&self.0.id());
213    }
214}
215
216impl FutStream for RecvStream {
217    type Item = Message;
218
219    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
220        self.poll_recv(cx)
221    }
222}
223
224pub(super) struct HandleService(InflightStorage);
225
226impl HandleService {
227    pub(super) fn new(storage: InflightStorage) -> Self {
228        Self(storage)
229    }
230}
231
232impl Service<Message> for HandleService {
233    type Response = ();
234    type Error = ();
235
236    async fn call(&self, msg: Message, _: ServiceCtx<'_, Self>) -> Result<(), ()> {
237        let id = msg.id();
238        if let Some(inflight) = self.0 .0.inflight.borrow_mut().get_mut(&id) {
239            let eof = match msg.kind() {
240                MessageKind::Headers { eof, .. } => *eof,
241                MessageKind::Eof(..) | MessageKind::Disconnect(..) => true,
242                _ => false,
243            };
244            inflight.push(msg);
245
246            if eof {
247                self.0.notify(id);
248                log::debug!("Stream {:?} is closed, notify", id);
249            }
250        } else if !matches!(msg.kind(), MessageKind::Disconnect(_)) {
251            log::error!(
252                "{}: Received message for unknown stream, {:?}",
253                msg.stream().tag(),
254                msg
255            );
256        }
257        Ok(())
258    }
259}
260
261impl InflightStorage {
262    pub(super) fn new<F>(f: F) -> Self
263    where
264        F: Fn(StreamId) + 'static,
265    {
266        InflightStorage(Rc::new(InflightStorageInner {
267            inflight: Default::default(),
268            cb: Some(Box::new(f)),
269        }))
270    }
271
272    pub(super) fn notify(&self, id: StreamId) {
273        if let Some(ref cb) = self.0.cb {
274            (*cb)(id)
275        }
276    }
277
278    pub(super) fn inflight(&self, stream: Stream) -> (SendStream, RecvStream) {
279        let id = stream.id();
280        let snd = SendStream(stream.clone(), ());
281        let rcv = RecvStream(stream.clone(), self.clone());
282        let inflight = Inflight {
283            _stream: stream,
284            response: None,
285            waker: LocalWaker::default(),
286        };
287        self.0.inflight.borrow_mut().insert(id, inflight);
288        (snd, rcv)
289    }
290}
291
292impl fmt::Debug for InflightStorage {
293    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294        f.debug_struct("InflightStorage")
295            .field("inflight", &self.0.inflight)
296            .finish()
297    }
298}