1use std::task::{Context, Poll};
2use std::{cell::RefCell, collections::VecDeque, fmt, future::poll_fn, pin::Pin, rc::Rc};
3
4use ntex_bytes::Bytes;
5use ntex_http::HeaderMap;
6use ntex_service::{Service, ServiceCtx};
7use ntex_util::{future::Either, task::LocalWaker, HashMap, Stream as FutStream};
8
9use crate::error::OperationError;
10use crate::frame::{Reason, StreamId, WindowSize};
11use crate::message::{Message, MessageKind};
12use crate::{Stream, StreamRef};
13
/// Cloneable handle to the table of in-flight streams.
///
/// The state lives behind an `Rc`, so every clone refers to the same
/// `InflightStorageInner`.
#[derive(Clone, Default)]
pub(super) struct InflightStorage(Rc<InflightStorageInner>);
16
/// State shared by all clones of `InflightStorage`.
#[derive(Default)]
struct InflightStorageInner {
    /// Per-stream receive state, keyed by stream id.
    inflight: RefCell<HashMap<StreamId, Inflight>>,
    /// Optional callback run by `InflightStorage::notify`; it is `None` when
    /// the storage is built through `Default`.
    cb: Option<Box<dyn Fn(StreamId)>>,
}
22
/// Receive-side bookkeeping for a single stream.
#[derive(Debug)]
pub(super) struct Inflight {
    /// Owned stream handle; never read in this file.
    // NOTE(review): presumably held only to keep the stream alive while the
    // entry exists — confirm against `Stream`'s drop behaviour.
    _stream: Stream,
    /// Messages not yet consumed: `None` when empty, `Left` for exactly one
    /// message, `Right` for a queue of messages.
    response: Option<Either<Message, VecDeque<Message>>>,
    /// Waker registered by `RecvStream::poll_recv` and woken on every `push`.
    waker: LocalWaker,
}
29
30impl Inflight {
31 fn pop(&mut self) -> Option<Message> {
32 match self.response.take() {
33 None => None,
34 Some(Either::Left(msg)) => Some(msg),
35 Some(Either::Right(mut msgs)) => {
36 let msg = msgs.pop_front();
37 if !msgs.is_empty() {
38 self.response = Some(Either::Right(msgs));
39 }
40 msg
41 }
42 }
43 }
44
45 fn push(&mut self, item: Message) {
46 match self.response.take() {
47 Some(Either::Left(msg)) => {
48 let mut msgs = VecDeque::with_capacity(8);
49 msgs.push_back(msg);
50 msgs.push_back(item);
51 self.response = Some(Either::Right(msgs));
52 }
53 Some(Either::Right(mut messages)) => {
54 messages.push_back(item);
55 self.response = Some(Either::Right(messages));
56 }
57 None => self.response = Some(Either::Left(item)),
58 };
59 self.waker.wake();
60 }
61}
62
/// Sending half of a stream, as handed out by `InflightStorage::inflight`.
///
/// Wraps a `StreamRef`; the second field is a unit placeholder.
#[derive(Debug)]
pub struct SendStream(StreamRef, ());
66
67impl Drop for SendStream {
68 fn drop(&mut self) {
69 if !self.0.send_state().is_closed() {
70 self.0.reset(Reason::CANCEL);
71
72 if self.0.is_disconnect_on_drop() {
73 self.0 .0.con.disconnect_when_ready();
74 }
75 }
76 }
77}
78
79impl SendStream {
80 #[inline]
81 pub fn id(&self) -> StreamId {
83 self.0.id()
84 }
85
86 #[inline]
87 pub fn tag(&self) -> &'static str {
89 self.0.tag()
90 }
91
92 #[inline]
93 pub fn stream(&self) -> &StreamRef {
94 &self.0
95 }
96
97 #[inline]
98 pub fn available_send_capacity(&self) -> WindowSize {
100 self.0.available_send_capacity()
101 }
102
103 #[inline]
104 pub async fn send_capacity(&self) -> Result<WindowSize, OperationError> {
106 self.0.send_capacity().await
107 }
108
109 #[inline]
110 pub async fn send_payload(&self, res: Bytes, eof: bool) -> Result<(), OperationError> {
112 self.0.send_payload(res, eof).await
113 }
114
115 #[inline]
116 pub fn send_trailers(&self, map: HeaderMap) {
118 self.0.send_trailers(map)
119 }
120
121 #[inline]
122 pub fn reset(&self, reason: Reason) {
124 self.0.reset(reason)
125 }
126
127 #[inline]
128 pub fn disconnect_on_drop(&self) {
130 self.0.disconnect_on_drop()
131 }
132
133 #[inline]
134 pub fn poll_send_capacity(&self, cx: &Context<'_>) -> Poll<Result<WindowSize, OperationError>> {
136 self.0.poll_send_capacity(cx)
137 }
138
139 #[inline]
140 pub fn poll_send_reset(&self, cx: &Context<'_>) -> Poll<Result<(), OperationError>> {
142 self.0.poll_send_reset(cx)
143 }
144}
145
/// Receiving half of a stream, as handed out by `InflightStorage::inflight`.
///
/// Holds the stream reference together with the storage whose in-flight table
/// buffers the messages for this stream.
#[derive(Debug)]
pub struct RecvStream(StreamRef, InflightStorage);
149
150impl RecvStream {
151 #[inline]
152 pub fn id(&self) -> StreamId {
154 self.0.id()
155 }
156
157 #[inline]
158 pub fn tag(&self) -> &'static str {
160 self.0.tag()
161 }
162
163 #[inline]
164 pub fn stream(&self) -> &StreamRef {
165 &self.0
166 }
167
168 #[inline]
169 pub fn disconnect_on_drop(&self) {
171 self.0.disconnect_on_drop()
172 }
173
174 pub async fn recv(&self) -> Option<Message> {
176 poll_fn(|cx| self.poll_recv(cx)).await
177 }
178
179 pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<Message>> {
183 if let Some(inflight) = self.1 .0.inflight.borrow_mut().get_mut(&self.0.id()) {
184 if let Some(msg) = inflight.pop() {
185 Poll::Ready(Some(msg))
186 } else if self.0.recv_state().is_closed() {
187 Poll::Ready(None)
188 } else {
189 inflight.waker.register(cx.waker());
190 Poll::Pending
191 }
192 } else {
193 log::warn!(
194 "{}: Stream does not exists, {:?}",
195 self.0.tag(),
196 self.0.id()
197 );
198 Poll::Ready(None)
199 }
200 }
201}
202
203impl Drop for RecvStream {
204 fn drop(&mut self) {
205 if !self.0.recv_state().is_closed() {
206 self.0.reset(Reason::CANCEL);
207
208 if self.0.is_disconnect_on_drop() {
209 self.0 .0.con.disconnect_when_ready();
210 }
211 }
212 self.1 .0.inflight.borrow_mut().remove(&self.0.id());
213 }
214}
215
216impl FutStream for RecvStream {
217 type Item = Message;
218
219 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
220 self.poll_recv(cx)
221 }
222}
223
/// Service that routes incoming `Message`s into the in-flight table of the
/// wrapped `InflightStorage`.
pub(super) struct HandleService(InflightStorage);
225
226impl HandleService {
227 pub(super) fn new(storage: InflightStorage) -> Self {
228 Self(storage)
229 }
230}
231
232impl Service<Message> for HandleService {
233 type Response = ();
234 type Error = ();
235
236 async fn call(&self, msg: Message, _: ServiceCtx<'_, Self>) -> Result<(), ()> {
237 let id = msg.id();
238 if let Some(inflight) = self.0 .0.inflight.borrow_mut().get_mut(&id) {
239 let eof = match msg.kind() {
240 MessageKind::Headers { eof, .. } => *eof,
241 MessageKind::Eof(..) | MessageKind::Disconnect(..) => true,
242 _ => false,
243 };
244 inflight.push(msg);
245
246 if eof {
247 self.0.notify(id);
248 log::debug!("Stream {:?} is closed, notify", id);
249 }
250 } else if !matches!(msg.kind(), MessageKind::Disconnect(_)) {
251 log::error!(
252 "{}: Received message for unknown stream, {:?}",
253 msg.stream().tag(),
254 msg
255 );
256 }
257 Ok(())
258 }
259}
260
261impl InflightStorage {
262 pub(super) fn new<F>(f: F) -> Self
263 where
264 F: Fn(StreamId) + 'static,
265 {
266 InflightStorage(Rc::new(InflightStorageInner {
267 inflight: Default::default(),
268 cb: Some(Box::new(f)),
269 }))
270 }
271
272 pub(super) fn notify(&self, id: StreamId) {
273 if let Some(ref cb) = self.0.cb {
274 (*cb)(id)
275 }
276 }
277
278 pub(super) fn inflight(&self, stream: Stream) -> (SendStream, RecvStream) {
279 let id = stream.id();
280 let snd = SendStream(stream.clone(), ());
281 let rcv = RecvStream(stream.clone(), self.clone());
282 let inflight = Inflight {
283 _stream: stream,
284 response: None,
285 waker: LocalWaker::default(),
286 };
287 self.0.inflight.borrow_mut().insert(id, inflight);
288 (snd, rcv)
289 }
290}
291
292impl fmt::Debug for InflightStorage {
293 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294 f.debug_struct("InflightStorage")
295 .field("inflight", &self.0.inflight)
296 .finish()
297 }
298}