Skip to main content

ntex_h2/
stream.rs

1use std::{cell::Cell, cmp, fmt, future::poll_fn, ops, rc::Rc, task::Context, task::Poll};
2
3use ntex_bytes::{BytePages, Bytes};
4use ntex_error::{Error, ErrorMapping};
5use ntex_http::{HeaderMap, StatusCode, header::CONTENT_LENGTH};
6use ntex_util::{future::Either, task::LocalWaker};
7
8use crate::error::{OperationError, StreamError};
9use crate::frame::{
10    Data, Head, Headers, Kind, PseudoHeaders, Reason, Reset, StreamId, WindowSize, WindowUpdate,
11};
12use crate::{connection::Connection, frame, message::Message, window::Window};
13
14/// HTTP/2 Stream
15pub struct Stream(StreamRef);
16
17/// Stream capacity information
18#[derive(Debug)]
19pub struct Capacity {
20    size: Cell<u32>,
21    stream: Rc<StreamState>,
22}
23
24impl Capacity {
25    fn new(size: u32, stream: &Rc<StreamState>) -> Self {
26        if size != 0 {
27            stream.add_recv_capacity(size);
28        }
29
30        Self {
31            size: Cell::new(size),
32            stream: stream.clone(),
33        }
34    }
35
36    #[inline]
37    /// Size of capacity
38    pub fn size(&self) -> usize {
39        self.size.get() as usize
40    }
41
42    /// Consume specified amount of capacity.
43    ///
44    /// # Panics
45    ///
46    /// Panics if provided size larger than capacity.
47    pub fn consume(&self, sz: u32) {
48        let size = self.size.get();
49        if let Some(sz) = size.checked_sub(sz) {
50            #[cfg(feature = "trace")]
51            log::trace!(
52                "{}: {:?} capacity consumed from {size} to {sz}",
53                self.stream.tag(),
54                self.stream.id,
55            );
56            self.size.set(sz);
57            self.stream.consume_capacity(size - sz);
58        } else {
59            panic!("Capacity overflow");
60        }
61    }
62}
63
64/// Panics if capacity belongs to different streams
65impl ops::Add for Capacity {
66    type Output = Self;
67
68    fn add(self, other: Self) -> Self {
69        if Rc::ptr_eq(&self.stream, &other.stream) {
70            let size = Cell::new(self.size.get() + other.size.get());
71            self.size.set(0);
72            other.size.set(0);
73            Self {
74                size,
75                stream: self.stream.clone(),
76            }
77        } else {
78            panic!("Cannot add capacity from different streams");
79        }
80    }
81}
82
83/// Panics if capacity belongs to different streams
84impl ops::AddAssign for Capacity {
85    fn add_assign(&mut self, other: Self) {
86        if Rc::ptr_eq(&self.stream, &other.stream) {
87            let size = self.size.get() + other.size.get();
88            self.size.set(size);
89            other.size.set(0);
90        } else {
91            panic!("Cannot add capacity from different streams");
92        }
93    }
94}
95
96impl Drop for Capacity {
97    fn drop(&mut self) {
98        let size = self.size.get();
99        if size > 0 {
100            self.stream.consume_capacity(size);
101        }
102    }
103}
104
105/// State related to a stream's content-length validation
106#[derive(Debug, Copy, Clone, PartialEq, Eq)]
107pub(super) enum ContentLength {
108    Omitted,
109    Head,
110    Remaining(u64),
111}
112
113#[derive(Clone, Debug)]
114pub struct StreamRef(pub(crate) Rc<StreamState>);
115
116bitflags::bitflags! {
117    #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
118    struct StreamFlags: u8 {
119        const REMOTE = 0b0000_0001;
120        const FAILED = 0b0000_0010;
121        const DISCONNECT_ON_DROP = 0b0000_0100;
122        const WAIT_FOR_CAPACITY  = 0b0000_1000;
123    }
124}
125
126pub(crate) struct StreamState {
127    /// The h2 stream identifier
128    id: StreamId,
129    flags: Cell<StreamFlags>,
130    content_length: Cell<ContentLength>,
131    /// Receive part
132    recv: Cell<HalfState>,
133    recv_window: Cell<Window>,
134    recv_size: Cell<u32>,
135    /// Send part
136    send: Cell<HalfState>,
137    send_window: Cell<Window>,
138    send_cap: LocalWaker,
139    send_reset: LocalWaker,
140    /// Connection config
141    pub(crate) con: Connection,
142    /// error state
143    error: Cell<Option<Error<OperationError>>>,
144}
145
146#[derive(Debug, Clone, Copy, PartialEq, Eq)]
147pub(crate) enum HalfState {
148    Idle,
149    Payload,
150    Closed(Option<Reason>),
151}
152
153impl HalfState {
154    pub(crate) fn is_closed(self) -> bool {
155        matches!(self, HalfState::Closed(_))
156    }
157}
158
159impl StreamState {
160    #[allow(dead_code)]
161    fn tag(&self) -> &'static str {
162        self.con.tag()
163    }
164
165    fn state_send_payload(&self) {
166        self.send.set(HalfState::Payload);
167    }
168
169    fn state_send_close(&self, reason: Option<Reason>) {
170        #[cfg(feature = "trace")]
171        log::trace!(
172            "{}: {:?} send side is closed with reason {reason:?}",
173            self.tag(),
174            self.id
175        );
176        self.send.set(HalfState::Closed(reason));
177        self.send_cap.wake();
178        self.review_state();
179    }
180
181    fn state_recv_payload(&self) {
182        self.recv.set(HalfState::Payload);
183    }
184
185    fn state_recv_close(&self, reason: Option<Reason>) {
186        #[cfg(feature = "trace")]
187        log::trace!("{}: {:?} receive side is closed", self.tag(), self.id);
188        self.recv.set(HalfState::Closed(reason));
189        self.review_state();
190    }
191
192    fn reset_stream(&self, reason: Option<Reason>) {
193        self.recv.set(HalfState::Closed(reason));
194        self.send.set(HalfState::Closed(None));
195        if let Some(reason) = reason {
196            self.error.set(Some(Error::new(
197                OperationError::LocalReset(reason),
198                self.con.service(),
199            )));
200        }
201        self.review_state();
202    }
203
204    fn remote_reset_stream(&self, reason: Reason) {
205        self.recv.set(HalfState::Closed(None));
206        self.send.set(HalfState::Closed(Some(reason)));
207        self.error.set(Some(Error::new(
208            OperationError::RemoteReset(reason),
209            self.con.service(),
210        )));
211        self.review_state();
212    }
213
214    fn failed(&self, err: Error<OperationError>) {
215        if !self.recv.get().is_closed() {
216            self.recv.set(HalfState::Closed(None));
217        }
218        if !self.send.get().is_closed() {
219            self.send.set(HalfState::Closed(None));
220        }
221        self.error.set(Some(err));
222        self.insert_flag(StreamFlags::FAILED);
223        self.review_state();
224    }
225
226    fn insert_flag(&self, f: StreamFlags) {
227        let mut flags = self.flags.get();
228        flags.insert(f);
229        self.flags.set(flags);
230    }
231
232    fn remove_flag(&self, f: StreamFlags) {
233        let mut flags = self.flags.get();
234        flags.remove(f);
235        self.flags.set(flags);
236    }
237
238    fn check_error(&self) -> Result<(), Error<OperationError>> {
239        if let Some(err) = self.error.take() {
240            self.error.set(Some(err.clone()));
241            Err(err)
242        } else {
243            Ok(())
244        }
245    }
246
247    #[allow(clippy::used_underscore_binding)]
248    fn review_state(&self) {
249        if self.recv.get().is_closed() {
250            self.send_reset.wake();
251
252            if let HalfState::Closed(_reason) = self.send.get() {
253                // stream is closed
254                #[cfg(feature = "trace")]
255                if let Some(reason) = _reason {
256                    log::trace!(
257                        "{}: {:?} is closed with remote reset {reason:?}, dropping stream",
258                        self.tag(),
259                        self.id
260                    );
261                } else {
262                    log::trace!(
263                        "{}: {:?} both sides are closed, dropping stream",
264                        self.tag(),
265                        self.id
266                    );
267                }
268                self.send_cap.wake();
269                self.con.drop_stream(self.id);
270            }
271        }
272    }
273
274    /// added new capacity, update recevice window size
275    fn add_recv_capacity(&self, size: u32) {
276        let cap = self.recv_size.get();
277        self.recv_size.set(cap + size);
278        self.recv_window.set(self.recv_window.get().dec(size));
279
280        #[cfg(feature = "trace")]
281        log::trace!(
282            "{}: {:?} capacity incresed from {cap} to {} ({size})",
283            self.tag(),
284            self.id,
285            cap + size
286        );
287
288        // connection level recv window
289        self.con.add_recv_capacity(size);
290    }
291
292    /// check and update recevice window size
293    fn consume_capacity(&self, size: u32) {
294        let cap = self.recv_size.get();
295        let size = cap - size;
296
297        #[cfg(feature = "trace")]
298        log::trace!(
299            "{}: {:?} capacity decresed from {cap} to {size}",
300            self.tag(),
301            self.id,
302        );
303
304        self.recv_size.set(size);
305        let mut window = self.recv_window.get();
306        if let Some(val) = window.update(
307            size,
308            self.con.config().window_sz,
309            self.con.config().window_sz_threshold,
310        ) {
311            #[cfg(feature = "trace")]
312            log::trace!(
313                "{}: {:?} capacity decresed below threshold {} increase by {val} ({})",
314                self.tag(),
315                self.id,
316                self.con.config().window_sz_threshold,
317                self.con.config().window_sz,
318            );
319            self.recv_window.set(window);
320            self.con.encode(WindowUpdate::new(self.id, val));
321        }
322    }
323}
324
325impl StreamRef {
326    pub(crate) fn new(id: StreamId, remote: bool, con: Connection) -> Self {
327        // if peer has accepted settings, we can use local config window size
328        // otherwise use default window size
329        let recv_window = if con.settings_processed() {
330            Window::new(con.config().window_sz)
331        } else {
332            Window::new(frame::DEFAULT_INITIAL_WINDOW_SIZE)
333        };
334        let send_window = Window::new(con.remote_window_size());
335
336        StreamRef(Rc::new(StreamState {
337            id,
338            con,
339            recv: Cell::new(HalfState::Idle),
340            recv_window: Cell::new(recv_window),
341            recv_size: Cell::new(0),
342            send: Cell::new(HalfState::Idle),
343            send_window: Cell::new(send_window),
344            send_cap: LocalWaker::new(),
345            send_reset: LocalWaker::new(),
346            error: Cell::new(None),
347            content_length: Cell::new(ContentLength::Omitted),
348            flags: Cell::new(if remote {
349                StreamFlags::REMOTE
350            } else {
351                StreamFlags::empty()
352            }),
353        }))
354    }
355
356    #[inline]
357    pub fn id(&self) -> StreamId {
358        self.0.id
359    }
360
361    #[inline]
362    pub fn tag(&self) -> &'static str {
363        self.0.con.tag()
364    }
365
366    /// Check if stream has been opened from remote side
367    #[inline]
368    pub fn is_remote(&self) -> bool {
369        self.0.flags.get().contains(StreamFlags::REMOTE)
370    }
371
372    /// Check if stream has failed
373    #[inline]
374    pub fn is_failed(&self) -> bool {
375        self.0.flags.get().contains(StreamFlags::FAILED)
376    }
377
378    pub(crate) fn service(&self) -> &'static str {
379        self.0.con.service()
380    }
381
382    pub(crate) fn send_state(&self) -> HalfState {
383        self.0.send.get()
384    }
385
386    pub(crate) fn recv_state(&self) -> HalfState {
387        self.0.recv.get()
388    }
389
390    pub(crate) fn disconnect_on_drop(&self) {
391        self.0.insert_flag(StreamFlags::DISCONNECT_ON_DROP);
392    }
393
394    pub(crate) fn is_disconnect_on_drop(&self) -> bool {
395        self.0.flags.get().contains(StreamFlags::DISCONNECT_ON_DROP)
396    }
397
398    /// Reset stream
399    ///
400    /// Returns `true` if the stream state is updated and a `Reset` frame
401    /// has been sent to the peer.
402    #[inline]
403    pub fn reset(&self, reason: Reason) -> bool {
404        if !self.0.recv.get().is_closed() || !self.0.send.get().is_closed() {
405            self.0.con.encode(Reset::new(self.0.id, reason));
406            self.0.reset_stream(Some(reason));
407            true
408        } else {
409            false
410        }
411    }
412
413    /// Get capacity instance for current stream
414    #[inline]
415    pub fn empty_capacity(&self) -> Capacity {
416        Capacity {
417            size: Cell::new(0),
418            stream: self.0.clone(),
419        }
420    }
421
422    #[inline]
423    pub(crate) fn into_stream(self) -> Stream {
424        Stream(self)
425    }
426
427    pub(crate) fn send_headers(&self, mut hdrs: Headers) {
428        hdrs.set_end_headers();
429        if hdrs.is_end_stream() {
430            self.0.state_send_close(None);
431        } else {
432            self.0.state_send_payload();
433        }
434        #[cfg(feature = "trace")]
435        log::trace!(
436            "{}: send headers {hdrs:#?} eos: {:?}",
437            self.tag(),
438            hdrs.is_end_stream()
439        );
440
441        if hdrs
442            .pseudo()
443            .status
444            .is_some_and(|status| status.is_informational())
445        {
446            self.0.content_length.set(ContentLength::Head);
447        }
448        self.0.con.encode(hdrs);
449    }
450
451    pub(crate) fn set_go_away(&self, reason: Reason) {
452        self.0.remote_reset_stream(reason);
453    }
454
455    pub(crate) fn set_failed_stream(&self, err: Error<OperationError>) {
456        self.0.failed(err);
457    }
458
459    pub(crate) fn recv_headers(
460        &self,
461        hdrs: Headers,
462    ) -> Result<Option<Message>, Error<StreamError>> {
463        #[cfg(feature = "trace")]
464        log::trace!(
465            "{}: processing HEADERS for {:?}:\n{hdrs:#?}\nrecv_state:{:?}, send_state: {:?}",
466            self.tag(),
467            self.0.id,
468            self.0.recv.get(),
469            self.0.send.get(),
470        );
471
472        match self.0.recv.get() {
473            HalfState::Idle => {
474                let eof = hdrs.is_end_stream();
475                if eof {
476                    self.0.state_recv_close(None);
477                } else {
478                    self.0.state_recv_payload();
479                }
480                let (pseudo, headers) = hdrs.into_parts();
481
482                if self.0.content_length.get() != ContentLength::Head
483                    && let Some(content_length) = headers.get(CONTENT_LENGTH)
484                {
485                    if let Some(v) = parse_u64(content_length.as_bytes()) {
486                        self.0.content_length.set(ContentLength::Remaining(v));
487                    } else {
488                        proto_err!(stream: "could not parse content-length; stream={:?}", self.0.id);
489                        return Err(Error::new(
490                            StreamError::InvalidContentLength,
491                            self.service(),
492                        ));
493                    }
494                }
495                Ok(Some(Message::new(pseudo, headers, eof, self)))
496            }
497            HalfState::Payload => {
498                // trailers
499                if hdrs.is_end_stream() {
500                    self.0.state_recv_close(None);
501                    Ok(Some(Message::trailers(hdrs.into_fields(), self)))
502                } else {
503                    Err(Error::new(StreamError::TrailersWithoutEos, self.service()))
504                }
505            }
506            HalfState::Closed(_) => Err(Error::new(StreamError::Closed, self.service())),
507        }
508    }
509
510    pub(crate) fn recv_data(&self, data: Data) -> Result<Option<Message>, Error<StreamError>> {
511        let cap = Capacity::new(data.payload().len() as u32, &self.0);
512
513        #[cfg(feature = "trace")]
514        log::trace!(
515            "{}: processing DATA frame for {:?}, len: {:?}",
516            self.tag(),
517            self.0.id,
518            data.payload().len()
519        );
520
521        match self.0.recv.get() {
522            HalfState::Payload => {
523                let eof = data.is_end_stream();
524
525                // Returns `Err` when the decrement cannot be completed due to overflow
526                match self.0.content_length.get() {
527                    ContentLength::Remaining(rem) => {
528                        match rem.checked_sub(data.payload().len() as u64) {
529                            Some(val) => {
530                                self.0.content_length.set(ContentLength::Remaining(val));
531                                if eof && val != 0 {
532                                    return Err(Error::new(
533                                        StreamError::WrongPayloadLength,
534                                        self.service(),
535                                    ));
536                                }
537                            }
538                            None => {
539                                return Err(Error::new(
540                                    StreamError::WrongPayloadLength,
541                                    self.service(),
542                                ));
543                            }
544                        }
545                    }
546                    ContentLength::Head => {
547                        if !data.payload().is_empty() {
548                            return Err(Error::new(StreamError::NonEmptyPayload, self.service()));
549                        }
550                    }
551                    ContentLength::Omitted => (),
552                }
553
554                if eof {
555                    self.0.state_recv_close(None);
556                    Ok(Some(Message::eof_data(data.into_payload(), self)))
557                } else {
558                    Ok(Some(Message::data(data.into_payload(), cap, self)))
559                }
560            }
561            HalfState::Idle => Err(Error::new(
562                StreamError::Idle("DATA framed received"),
563                self.service(),
564            )),
565            HalfState::Closed(_) => Err(Error::new(StreamError::Closed, self.service())),
566        }
567    }
568
569    pub(crate) fn recv_rst_stream(&self, frm: Reset) {
570        self.0.remote_reset_stream(frm.reason());
571    }
572
573    pub(crate) fn recv_window_update_connection(&self) {
574        if self.0.flags.get().contains(StreamFlags::WAIT_FOR_CAPACITY)
575            && self.0.send_window.get().window_size() > 0
576        {
577            self.0.send_cap.wake();
578        }
579    }
580
581    pub(crate) fn recv_window_update(&self, frm: WindowUpdate) -> Result<(), Error<StreamError>> {
582        if frm.size_increment() == 0 {
583            Err(Error::new(
584                StreamError::WindowZeroUpdateValue,
585                self.service(),
586            ))
587        } else {
588            let window = self
589                .0
590                .send_window
591                .get()
592                .inc(frm.size_increment())
593                .map_err(|()| Error::new(StreamError::WindowOverflowed, self.service()))?;
594            self.0.send_window.set(window);
595
596            if window.window_size() > 0 {
597                self.0.send_cap.wake();
598            }
599            Ok(())
600        }
601    }
602
603    pub(crate) fn update_send_window(&self, upd: i32) -> Result<(), Error<StreamError>> {
604        let orig = self.0.send_window.get();
605        let window = match upd.cmp(&0) {
606            cmp::Ordering::Less => orig.dec(upd.unsigned_abs()), // We must decrease the (remote) window
607            cmp::Ordering::Greater => orig
608                .inc(upd)
609                .map_err(|()| Error::new(StreamError::WindowOverflowed, self.service()))?,
610            cmp::Ordering::Equal => return Ok(()),
611        };
612        #[cfg(feature = "trace")]
613        log::trace!(
614            "{}: Updating send window size from {} to {}",
615            self.tag(),
616            orig.window_size,
617            window.window_size
618        );
619        self.0.send_window.set(window);
620        Ok(())
621    }
622
623    pub(crate) fn update_recv_window(
624        &self,
625        upd: i32,
626    ) -> Result<Option<WindowSize>, Error<StreamError>> {
627        let mut window = match upd.cmp(&0) {
628            cmp::Ordering::Less => self.0.recv_window.get().dec(upd.unsigned_abs()), // We must decrease the (local) window
629            cmp::Ordering::Greater => self
630                .0
631                .recv_window
632                .get()
633                .inc(upd)
634                .map_err(|()| Error::new(StreamError::WindowOverflowed, self.service()))?,
635            cmp::Ordering::Equal => return Ok(None),
636        };
637        if let Some(val) = window.update(
638            self.0.recv_size.get(),
639            self.0.con.config().window_sz,
640            self.0.con.config().window_sz_threshold,
641        ) {
642            self.0.recv_window.set(window);
643            Ok(Some(val))
644        } else {
645            self.0.recv_window.set(window);
646            Ok(None)
647        }
648    }
649
650    /// Send stream response
651    pub fn send_response(
652        &self,
653        status: StatusCode,
654        headers: HeaderMap,
655        eof: bool,
656    ) -> Result<(), Error<OperationError>> {
657        self.0.check_error()?;
658
659        match self.0.send.get() {
660            HalfState::Idle => {
661                let pseudo = PseudoHeaders::response(status);
662                let mut hdrs = Headers::new(self.0.id, pseudo, headers, eof);
663
664                if eof {
665                    hdrs.set_end_stream();
666                    self.0.state_send_close(None);
667                } else {
668                    self.0.state_send_payload();
669                }
670                self.0.con.encode(hdrs);
671                Ok(())
672            }
673            HalfState::Payload => Err(Error::new(OperationError::Payload, self.0.con.service())),
674            HalfState::Closed(r) => {
675                Err(Error::new(OperationError::Closed(r), self.0.con.service()))
676            }
677        }
678    }
679
680    /// Send payload
681    pub async fn send_payload<D>(&self, data: D, eof: bool) -> Result<(), Error<OperationError>>
682    where
683        Bytes: From<D>,
684    {
685        self.send_pages(Bytes::from(data), eof).await
686    }
687
688    /// Send payload
689    pub async fn send_pages<D>(&self, data: D, eof: bool) -> Result<(), Error<OperationError>>
690    where
691        StreamData: From<D>,
692    {
693        let mut data = StreamData::from(data);
694
695        match self.0.send.get() {
696            HalfState::Payload => {
697                // check if stream is disconnected
698                self.0.check_error()?;
699
700                #[cfg(feature = "trace")]
701                log::trace!(
702                    "{}: {:?} sending {} bytes, eof: {eof}, send: {:?}",
703                    self.0.tag(),
704                    self.0.id,
705                    res.len(),
706                    self.0.send.get()
707                );
708
709                // eof and empty data
710                if eof && data.is_empty() {
711                    let mut data = Data::new(self.0.id, Bytes::new());
712                    data.set_end_stream();
713                    self.0.state_send_close(None);
714
715                    // write to io buffer
716                    self.0.con.encode(data);
717                    return Ok(());
718                }
719
720                loop {
721                    // calaculate available send window size
722                    let win = self.available_send_capacity() as usize;
723                    if win > 0 {
724                        let size =
725                            cmp::min(win, cmp::min(data.len(), self.0.con.remote_frame_size()));
726
727                        // write to io buffer
728                        let empty = self
729                            .0
730                            .con
731                            .encode_data_frame(|buf| {
732                                data.encode(self.0.tag(), self.0.id, size, eof, buf)
733                            })
734                            .map_err(|_| OperationError::Disconnected)
735                            .into_error()?;
736
737                        if eof && empty {
738                            self.0.state_send_close(None);
739                        }
740
741                        // update send window
742                        self.0
743                            .send_window
744                            .set(self.0.send_window.get().dec(size as u32));
745
746                        // update connection send window
747                        self.0.con.consume_send_window(size as u32);
748
749                        if empty {
750                            return Ok(());
751                        }
752                    } else {
753                        #[cfg(feature = "trace")]
754                        log::trace!(
755                            "{}: Not enough sending capacity for {:?} remaining {:?}",
756                            self.0.tag(),
757                            self.0.id,
758                            data.len()
759                        );
760                        // wait for available send window
761                        self.send_capacity().await?;
762                    }
763                }
764            }
765            HalfState::Idle => Err(Error::new(OperationError::Idle, self.0.con.service())),
766            HalfState::Closed(reason) => Err(Error::new(
767                OperationError::Closed(reason),
768                self.0.con.service(),
769            )),
770        }
771    }
772
773    /// Send client trailers and close stream
774    pub fn send_trailers(&self, map: HeaderMap) {
775        if self.0.send.get() == HalfState::Payload {
776            let mut hdrs = Headers::trailers(self.0.id, map);
777            hdrs.set_end_headers();
778            hdrs.set_end_stream();
779            self.0.con.encode(hdrs);
780            self.0.state_send_close(None);
781        }
782    }
783
784    pub fn available_send_capacity(&self) -> WindowSize {
785        cmp::min(
786            self.0.send_window.get().window_size(),
787            self.0.con.send_window_size(),
788        )
789    }
790
791    pub async fn send_capacity(&self) -> Result<WindowSize, Error<OperationError>> {
792        poll_fn(|cx| self.poll_send_capacity(cx)).await
793    }
794
795    /// Check for available send capacity
796    pub fn poll_send_capacity(
797        &self,
798        cx: &Context<'_>,
799    ) -> Poll<Result<WindowSize, Error<OperationError>>> {
800        self.0.check_error()?;
801        self.0.con.check_error()?;
802
803        let win = self.available_send_capacity();
804        if win > 0 {
805            self.0.remove_flag(StreamFlags::WAIT_FOR_CAPACITY);
806            Poll::Ready(Ok(win))
807        } else {
808            self.0.insert_flag(StreamFlags::WAIT_FOR_CAPACITY);
809            self.0.send_cap.register(cx.waker());
810            Poll::Pending
811        }
812    }
813
814    /// Check if send part of stream get reset
815    pub fn poll_send_reset(&self, cx: &Context<'_>) -> Poll<Result<(), Error<OperationError>>> {
816        if self.0.send.get().is_closed() {
817            Poll::Ready(Ok(()))
818        } else {
819            self.0.check_error()?;
820            self.0.con.check_error()?;
821            self.0.send_reset.register(cx.waker());
822            Poll::Pending
823        }
824    }
825}
826
827impl PartialEq for StreamRef {
828    fn eq(&self, other: &StreamRef) -> bool {
829        Rc::as_ptr(&self.0) == Rc::as_ptr(&other.0)
830    }
831}
832
833impl ops::Deref for Stream {
834    type Target = StreamRef;
835
836    #[inline]
837    fn deref(&self) -> &Self::Target {
838        &self.0
839    }
840}
841
842impl Drop for Stream {
843    fn drop(&mut self) {
844        self.0.reset(Reason::CANCEL);
845    }
846}
847
848impl fmt::Debug for Stream {
849    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
850        let mut builder = f.debug_struct("Stream");
851        builder
852            .field("stream_id", &self.0.0.id)
853            .field("recv_state", &self.0.0.recv.get())
854            .field("send_state", &self.0.0.send.get())
855            .finish()
856    }
857}
858
859impl fmt::Debug for StreamState {
860    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
861        let mut builder = f.debug_struct("StreamState");
862        builder
863            .field("id", &self.id)
864            .field("recv", &self.recv.get())
865            .field("recv_window", &self.recv_window.get())
866            .field("recv_size", &self.recv_size.get())
867            .field("send", &self.send.get())
868            .field("send_window", &self.send_window.get())
869            .field("flags", &self.flags.get())
870            .finish()
871    }
872}
873
874pub(super) fn parse_u64(src: &[u8]) -> Option<u64> {
875    if src.len() > 19 {
876        // At danger for overflow...
877        None
878    } else {
879        let mut ret = 0;
880        for &d in src {
881            if !d.is_ascii_digit() {
882                return None;
883            }
884
885            ret *= 10;
886            ret += u64::from(d - b'0');
887        }
888
889        Some(ret)
890    }
891}
892
893#[derive(Debug)]
894pub struct StreamData(Either<Bytes, BytePages>);
895
896impl From<Bytes> for StreamData {
897    fn from(bytes: Bytes) -> Self {
898        Self(Either::Left(bytes))
899    }
900}
901
902impl From<BytePages> for StreamData {
903    fn from(bytes: BytePages) -> Self {
904        Self(Either::Right(bytes))
905    }
906}
907
908impl StreamData {
909    fn len(&self) -> usize {
910        match &self.0 {
911            Either::Left(b) => b.len(),
912            Either::Right(b) => b.len(),
913        }
914    }
915
916    fn is_empty(&self) -> bool {
917        match &self.0 {
918            Either::Left(b) => b.is_empty(),
919            Either::Right(b) => b.is_empty(),
920        }
921    }
922
923    fn encode(
924        &mut self,
925        _tag: &'static str,
926        id: StreamId,
927        mut size: usize,
928        eof: bool,
929        dst: &mut BytePages,
930    ) -> bool {
931        #[cfg(feature = "trace")]
932        log::trace!("{_tag}: {id:?} sending {size} out of {} bytes", self.len());
933
934        if size >= self.len() {
935            Head::new(Kind::Data, if eof { 0x1 } else { 0 }, id).encode(size, dst);
936            match &mut self.0 {
937                Either::Left(b) => {
938                    dst.append(b.clone());
939                    *b = Bytes::new();
940                }
941                Either::Right(pages) => pages.move_to(dst),
942            }
943            true
944        } else {
945            Head::new(Kind::Data, 0, id).encode(size, dst);
946
947            match &mut self.0 {
948                Either::Left(b) => dst.append(b.split_to(size)),
949                Either::Right(pages) => {
950                    while let Some(mut page) = pages.take() {
951                        let len = cmp::min(size, page.len());
952                        size -= len;
953                        if page.len() == len {
954                            dst.append(page);
955                        } else {
956                            dst.append(page.split_to(len));
957                            pages.prepend(page);
958                            break;
959                        }
960                    }
961                }
962            }
963            false
964        }
965    }
966}