1use std::task::{Context, Poll};
2use std::{cell::RefCell, collections::VecDeque, fmt, future::poll_fn, pin::Pin, rc::Rc};
3
4use ntex_bytes::Bytes;
5use ntex_http::HeaderMap;
6use ntex_service::{Service, ServiceCtx};
7use ntex_util::{HashMap, Stream as FutStream, future::Either, task::LocalWaker};
8
9use crate::error::OperationError;
10use crate::frame::{Reason, StreamId, WindowSize};
11use crate::message::{Message, MessageKind};
12use crate::{Stream, StreamRef};
13
14#[derive(Clone, Default)]
15pub(super) struct InflightStorage(Rc<InflightStorageInner>);
16
17#[derive(Default)]
18struct InflightStorageInner {
19 inflight: RefCell<HashMap<StreamId, Inflight>>,
20 cb: Option<Box<dyn Fn(StreamId)>>,
21}
22
23#[derive(Debug)]
24pub(super) struct Inflight {
25 _stream: Stream,
26 response: Option<Either<Message, VecDeque<Message>>>,
27 waker: LocalWaker,
28}
29
30impl Inflight {
31 fn pop(&mut self) -> Option<Message> {
32 match self.response.take() {
33 None => None,
34 Some(Either::Left(msg)) => Some(msg),
35 Some(Either::Right(mut msgs)) => {
36 let msg = msgs.pop_front();
37 if !msgs.is_empty() {
38 self.response = Some(Either::Right(msgs));
39 }
40 msg
41 }
42 }
43 }
44
45 fn push(&mut self, item: Message) {
46 match self.response.take() {
47 Some(Either::Left(msg)) => {
48 let mut msgs = VecDeque::with_capacity(8);
49 msgs.push_back(msg);
50 msgs.push_back(item);
51 self.response = Some(Either::Right(msgs));
52 }
53 Some(Either::Right(mut messages)) => {
54 messages.push_back(item);
55 self.response = Some(Either::Right(messages));
56 }
57 None => self.response = Some(Either::Left(item)),
58 };
59 self.waker.wake();
60 }
61}
62
63#[derive(Debug)]
64pub struct SendStream(StreamRef, ());
66
67impl Drop for SendStream {
68 fn drop(&mut self) {
69 if !self.0.send_state().is_closed() {
70 self.0.reset(Reason::CANCEL);
71
72 if self.0.is_disconnect_on_drop() {
73 self.0.0.con.disconnect_when_ready();
74 }
75 }
76 }
77}
78
79impl SendStream {
80 #[inline]
81 pub fn id(&self) -> StreamId {
83 self.0.id()
84 }
85
86 #[inline]
87 pub fn tag(&self) -> &'static str {
89 self.0.tag()
90 }
91
92 #[inline]
93 pub fn stream(&self) -> &StreamRef {
94 &self.0
95 }
96
97 #[inline]
98 pub fn available_send_capacity(&self) -> WindowSize {
100 self.0.available_send_capacity()
101 }
102
103 #[inline]
104 pub async fn send_capacity(&self) -> Result<WindowSize, OperationError> {
106 self.0.send_capacity().await
107 }
108
109 #[inline]
110 pub async fn send_payload(&self, res: Bytes, eof: bool) -> Result<(), OperationError> {
112 self.0.send_payload(res, eof).await
113 }
114
115 #[inline]
116 pub fn send_trailers(&self, map: HeaderMap) {
118 self.0.send_trailers(map)
119 }
120
121 #[inline]
126 pub fn reset(&self, reason: Reason) -> bool {
127 self.0.reset(reason)
128 }
129
130 #[inline]
131 pub fn disconnect_on_drop(&self) {
133 self.0.disconnect_on_drop()
134 }
135
136 #[inline]
137 pub fn poll_send_capacity(&self, cx: &Context<'_>) -> Poll<Result<WindowSize, OperationError>> {
139 self.0.poll_send_capacity(cx)
140 }
141
142 #[inline]
143 pub fn poll_send_reset(&self, cx: &Context<'_>) -> Poll<Result<(), OperationError>> {
145 self.0.poll_send_reset(cx)
146 }
147}
148
149#[derive(Debug)]
150pub struct RecvStream(StreamRef, InflightStorage);
152
153impl RecvStream {
154 #[inline]
155 pub fn id(&self) -> StreamId {
157 self.0.id()
158 }
159
160 #[inline]
161 pub fn tag(&self) -> &'static str {
163 self.0.tag()
164 }
165
166 #[inline]
167 pub fn stream(&self) -> &StreamRef {
168 &self.0
169 }
170
171 #[inline]
172 pub fn disconnect_on_drop(&self) {
174 self.0.disconnect_on_drop()
175 }
176
177 pub async fn recv(&self) -> Option<Message> {
179 poll_fn(|cx| self.poll_recv(cx)).await
180 }
181
182 pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<Message>> {
186 if let Some(inflight) = self.1.0.inflight.borrow_mut().get_mut(&self.0.id()) {
187 if let Some(msg) = inflight.pop() {
188 Poll::Ready(Some(msg))
189 } else if self.0.recv_state().is_closed() {
190 Poll::Ready(None)
191 } else {
192 inflight.waker.register(cx.waker());
193 Poll::Pending
194 }
195 } else {
196 log::warn!(
197 "{}: Stream does not exists, {:?}",
198 self.0.tag(),
199 self.0.id()
200 );
201 Poll::Ready(None)
202 }
203 }
204}
205
206impl Drop for RecvStream {
207 fn drop(&mut self) {
208 if !self.0.recv_state().is_closed() {
209 self.0.reset(Reason::CANCEL);
210
211 if self.0.is_disconnect_on_drop() {
212 self.0.0.con.disconnect_when_ready();
213 }
214 }
215 self.1.0.inflight.borrow_mut().remove(&self.0.id());
216 }
217}
218
219impl FutStream for RecvStream {
220 type Item = Message;
221
222 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
223 self.poll_recv(cx)
224 }
225}
226
227pub(super) struct HandleService(InflightStorage);
228
229impl HandleService {
230 pub(super) fn new(storage: InflightStorage) -> Self {
231 Self(storage)
232 }
233}
234
235impl Service<Message> for HandleService {
236 type Response = ();
237 type Error = ();
238
239 async fn call(&self, msg: Message, _: ServiceCtx<'_, Self>) -> Result<(), ()> {
240 let id = msg.id();
241 if let Some(inflight) = self.0.0.inflight.borrow_mut().get_mut(&id) {
242 let eof = match msg.kind() {
243 MessageKind::Headers { eof, .. } => *eof,
244 MessageKind::Eof(..) | MessageKind::Disconnect(..) => true,
245 _ => false,
246 };
247 inflight.push(msg);
248
249 if eof {
250 self.0.notify(id);
251 log::debug!("Stream {id:?} is closed, notify");
252 }
253 } else if !matches!(msg.kind(), MessageKind::Disconnect(_)) {
254 log::error!(
255 "{}: Received message for unknown stream, {msg:?}",
256 msg.stream().tag(),
257 );
258 }
259 Ok(())
260 }
261}
262
263impl InflightStorage {
264 pub(super) fn new<F>(f: F) -> Self
265 where
266 F: Fn(StreamId) + 'static,
267 {
268 InflightStorage(Rc::new(InflightStorageInner {
269 inflight: Default::default(),
270 cb: Some(Box::new(f)),
271 }))
272 }
273
274 pub(super) fn notify(&self, id: StreamId) {
275 if let Some(ref cb) = self.0.cb {
276 (*cb)(id)
277 }
278 }
279
280 pub(super) fn inflight(&self, stream: Stream) -> (SendStream, RecvStream) {
281 let id = stream.id();
282 let snd = SendStream(stream.clone(), ());
283 let rcv = RecvStream(stream.clone(), self.clone());
284 let inflight = Inflight {
285 _stream: stream,
286 response: None,
287 waker: LocalWaker::default(),
288 };
289 self.0.inflight.borrow_mut().insert(id, inflight);
290 (snd, rcv)
291 }
292}
293
294impl fmt::Debug for InflightStorage {
295 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
296 f.debug_struct("InflightStorage")
297 .field("inflight", &self.0.inflight)
298 .finish()
299 }
300}