Skip to main content

ntex_h2/
stream.rs

1use std::{cell::Cell, cmp, fmt, future::poll_fn, mem, ops, rc::Rc, task::Context, task::Poll};
2
3use ntex_bytes::Bytes;
4use ntex_error::Error;
5use ntex_http::{HeaderMap, StatusCode, header::CONTENT_LENGTH};
6use ntex_util::task::LocalWaker;
7
8use crate::error::{OperationError, StreamError};
9use crate::frame::{
10    Data, Headers, PseudoHeaders, Reason, Reset, StreamId, WindowSize, WindowUpdate,
11};
12use crate::{connection::Connection, frame, message::Message, window::Window};
13
/// HTTP/2 Stream
///
/// Owning wrapper around a [`StreamRef`]. It dereferences to the wrapped
/// `StreamRef`, and dropping it calls `reset(Reason::CANCEL)` on the stream
/// (a no-op when both halves are already closed).
pub struct Stream(StreamRef);
16
/// Stream capacity information
///
/// Holds a number of bytes accounted against the receive capacity of one
/// stream. Bytes are handed back to the stream either explicitly through
/// `consume` or, for whatever is still held, when the value is dropped.
#[derive(Debug)]
pub struct Capacity {
    /// Number of bytes currently held by this instance.
    size: Cell<u32>,
    /// Shared state of the stream this capacity belongs to.
    stream: Rc<StreamState>,
}
23
24impl Capacity {
25    fn new(size: u32, stream: &Rc<StreamState>) -> Self {
26        if size != 0 {
27            stream.add_recv_capacity(size);
28        }
29
30        Self {
31            size: Cell::new(size),
32            stream: stream.clone(),
33        }
34    }
35
36    #[inline]
37    /// Size of capacity
38    pub fn size(&self) -> usize {
39        self.size.get() as usize
40    }
41
42    /// Consume specified amount of capacity.
43    ///
44    /// # Panics
45    ///
46    /// Panics if provided size larger than capacity.
47    pub fn consume(&self, sz: u32) {
48        let size = self.size.get();
49        if let Some(sz) = size.checked_sub(sz) {
50            #[cfg(feature = "trace")]
51            log::trace!(
52                "{}: {:?} capacity consumed from {size} to {sz}",
53                self.stream.tag(),
54                self.stream.id,
55            );
56            self.size.set(sz);
57            self.stream.consume_capacity(size - sz);
58        } else {
59            panic!("Capacity overflow");
60        }
61    }
62}
63
64/// Panics if capacity belongs to different streams
65impl ops::Add for Capacity {
66    type Output = Self;
67
68    fn add(self, other: Self) -> Self {
69        if Rc::ptr_eq(&self.stream, &other.stream) {
70            let size = Cell::new(self.size.get() + other.size.get());
71            self.size.set(0);
72            other.size.set(0);
73            Self {
74                size,
75                stream: self.stream.clone(),
76            }
77        } else {
78            panic!("Cannot add capacity from different streams");
79        }
80    }
81}
82
83/// Panics if capacity belongs to different streams
84impl ops::AddAssign for Capacity {
85    fn add_assign(&mut self, other: Self) {
86        if Rc::ptr_eq(&self.stream, &other.stream) {
87            let size = self.size.get() + other.size.get();
88            self.size.set(size);
89            other.size.set(0);
90        } else {
91            panic!("Cannot add capacity from different streams");
92        }
93    }
94}
95
96impl Drop for Capacity {
97    fn drop(&mut self) {
98        let size = self.size.get();
99        if size > 0 {
100            self.stream.consume_capacity(size);
101        }
102    }
103}
104
/// State related to a stream's content-length validation
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(super) enum ContentLength {
    /// No length is tracked; received DATA payload sizes are not checked.
    Omitted,
    /// Any non-empty DATA payload is rejected with `StreamError::NonEmptyPayload`;
    /// a received `content-length` header is ignored in this state.
    Head,
    /// Number of payload bytes still expected, initialised from the received
    /// `content-length` header and decremented by every DATA frame.
    Remaining(u64),
}
112
/// Reference-counted handle to the shared state of a stream.
///
/// Cloning only clones the inner `Rc`; two handles compare equal when they
/// point at the same `StreamState` allocation.
#[derive(Clone, Debug)]
pub struct StreamRef(pub(crate) Rc<StreamState>);
115
// Boolean per-stream state, stored in `StreamState::flags`.
bitflags::bitflags! {
    #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    struct StreamFlags: u8 {
        // stream was created with `remote == true` (see `StreamRef::new`)
        const REMOTE = 0b0000_0001;
        // set by `StreamState::failed`
        const FAILED = 0b0000_0010;
        // set by `StreamRef::disconnect_on_drop`
        // NOTE(review): the code acting on this flag is not in this file
        const DISCONNECT_ON_DROP = 0b0000_0100;
        // `poll_send_capacity` found no send capacity and registered a waker
        const WAIT_FOR_CAPACITY  = 0b0000_1000;
    }
}
125
/// Shared, interior-mutable state of a single HTTP/2 stream.
///
/// All mutable parts live in `Cell`s, so the state is updated through shared
/// references held by `StreamRef` and `Capacity`.
pub(crate) struct StreamState {
    /// The h2 stream identifier
    id: StreamId,
    /// Boolean stream state, see `StreamFlags`
    flags: Cell<StreamFlags>,
    /// Content-length validation state for received DATA frames
    content_length: Cell<ContentLength>,
    /// Receive part: state of the receive half
    recv: Cell<HalfState>,
    /// Receive flow-control window
    recv_window: Cell<Window>,
    /// Received bytes currently held by `Capacity` instances
    recv_size: Cell<u32>,
    /// Send part: state of the send half
    send: Cell<HalfState>,
    /// Send flow-control window
    send_window: Cell<Window>,
    /// Waker of the task waiting for send capacity
    send_cap: LocalWaker,
    /// Waker of the task registered by `poll_send_reset`
    send_reset: LocalWaker,
    /// Connection config
    pub(crate) con: Connection,
    /// error state
    error: Cell<Option<Error<OperationError>>>,
}
145
/// State of one direction (send or receive) of a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum HalfState {
    /// Initial state: no HEADERS frame has been processed for this half yet.
    Idle,
    /// HEADERS without end-of-stream were processed; payload may follow.
    Payload,
    /// The half is closed, optionally carrying the reset reason.
    Closed(Option<Reason>),
}
152
153impl HalfState {
154    pub(crate) fn is_closed(self) -> bool {
155        matches!(self, HalfState::Closed(_))
156    }
157}
158
159impl StreamState {
160    #[allow(dead_code)]
161    fn tag(&self) -> &'static str {
162        self.con.tag()
163    }
164
165    fn state_send_payload(&self) {
166        self.send.set(HalfState::Payload);
167    }
168
169    fn state_send_close(&self, reason: Option<Reason>) {
170        #[cfg(feature = "trace")]
171        log::trace!(
172            "{}: {:?} send side is closed with reason {reason:?}",
173            self.tag(),
174            self.id
175        );
176        self.send.set(HalfState::Closed(reason));
177        self.send_cap.wake();
178        self.review_state();
179    }
180
181    fn state_recv_payload(&self) {
182        self.recv.set(HalfState::Payload);
183    }
184
185    fn state_recv_close(&self, reason: Option<Reason>) {
186        #[cfg(feature = "trace")]
187        log::trace!("{}: {:?} receive side is closed", self.tag(), self.id);
188        self.recv.set(HalfState::Closed(reason));
189        self.review_state();
190    }
191
192    fn reset_stream(&self, reason: Option<Reason>) {
193        self.recv.set(HalfState::Closed(reason));
194        self.send.set(HalfState::Closed(None));
195        if let Some(reason) = reason {
196            self.error.set(Some(Error::new(
197                OperationError::LocalReset(reason),
198                self.con.service(),
199            )));
200        }
201        self.review_state();
202    }
203
204    fn remote_reset_stream(&self, reason: Reason) {
205        self.recv.set(HalfState::Closed(None));
206        self.send.set(HalfState::Closed(Some(reason)));
207        self.error.set(Some(Error::new(
208            OperationError::RemoteReset(reason),
209            self.con.service(),
210        )));
211        self.review_state();
212    }
213
214    fn failed(&self, err: Error<OperationError>) {
215        if !self.recv.get().is_closed() {
216            self.recv.set(HalfState::Closed(None));
217        }
218        if !self.send.get().is_closed() {
219            self.send.set(HalfState::Closed(None));
220        }
221        self.error.set(Some(err));
222        self.insert_flag(StreamFlags::FAILED);
223        self.review_state();
224    }
225
226    fn insert_flag(&self, f: StreamFlags) {
227        let mut flags = self.flags.get();
228        flags.insert(f);
229        self.flags.set(flags);
230    }
231
232    fn remove_flag(&self, f: StreamFlags) {
233        let mut flags = self.flags.get();
234        flags.remove(f);
235        self.flags.set(flags);
236    }
237
238    fn check_error(&self) -> Result<(), Error<OperationError>> {
239        if let Some(err) = self.error.take() {
240            self.error.set(Some(err.clone()));
241            Err(err)
242        } else {
243            Ok(())
244        }
245    }
246
247    #[allow(clippy::used_underscore_binding)]
248    fn review_state(&self) {
249        if self.recv.get().is_closed() {
250            self.send_reset.wake();
251
252            if let HalfState::Closed(_reason) = self.send.get() {
253                // stream is closed
254                #[cfg(feature = "trace")]
255                if let Some(reason) = _reason {
256                    log::trace!(
257                        "{}: {:?} is closed with remote reset {reason:?}, dropping stream",
258                        self.tag(),
259                        self.id
260                    );
261                } else {
262                    log::trace!(
263                        "{}: {:?} both sides are closed, dropping stream",
264                        self.tag(),
265                        self.id
266                    );
267                }
268                self.send_cap.wake();
269                self.con.drop_stream(self.id);
270            }
271        }
272    }
273
274    /// added new capacity, update recevice window size
275    fn add_recv_capacity(&self, size: u32) {
276        let cap = self.recv_size.get();
277        self.recv_size.set(cap + size);
278        self.recv_window.set(self.recv_window.get().dec(size));
279
280        #[cfg(feature = "trace")]
281        log::trace!(
282            "{}: {:?} capacity incresed from {cap} to {} ({size})",
283            self.tag(),
284            self.id,
285            cap + size
286        );
287
288        // connection level recv window
289        self.con.add_recv_capacity(size);
290    }
291
292    /// check and update recevice window size
293    fn consume_capacity(&self, size: u32) {
294        let cap = self.recv_size.get();
295        let size = cap - size;
296
297        #[cfg(feature = "trace")]
298        log::trace!(
299            "{}: {:?} capacity decresed from {cap} to {size}",
300            self.tag(),
301            self.id,
302        );
303
304        self.recv_size.set(size);
305        let mut window = self.recv_window.get();
306        if let Some(val) = window.update(
307            size,
308            self.con.config().window_sz,
309            self.con.config().window_sz_threshold,
310        ) {
311            #[cfg(feature = "trace")]
312            log::trace!(
313                "{}: {:?} capacity decresed below threshold {} increase by {val} ({})",
314                self.tag(),
315                self.id,
316                self.con.config().window_sz_threshold,
317                self.con.config().window_sz,
318            );
319            self.recv_window.set(window);
320            self.con.encode(WindowUpdate::new(self.id, val));
321        }
322    }
323}
324
impl StreamRef {
    /// Create the shared state for a new stream.
    ///
    /// Both halves start in `HalfState::Idle`; `remote` controls whether the
    /// `REMOTE` flag is set.
    pub(crate) fn new(id: StreamId, remote: bool, con: Connection) -> Self {
        // if peer has accepted settings, we can use local config window size
        // otherwise use default window size
        let recv_window = if con.settings_processed() {
            Window::new(con.config().window_sz)
        } else {
            Window::new(frame::DEFAULT_INITIAL_WINDOW_SIZE)
        };
        let send_window = Window::new(con.remote_window_size());

        StreamRef(Rc::new(StreamState {
            id,
            con,
            recv: Cell::new(HalfState::Idle),
            recv_window: Cell::new(recv_window),
            recv_size: Cell::new(0),
            send: Cell::new(HalfState::Idle),
            send_window: Cell::new(send_window),
            send_cap: LocalWaker::new(),
            send_reset: LocalWaker::new(),
            error: Cell::new(None),
            content_length: Cell::new(ContentLength::Omitted),
            flags: Cell::new(if remote {
                StreamFlags::REMOTE
            } else {
                StreamFlags::empty()
            }),
        }))
    }

    /// Stream identifier
    #[inline]
    pub fn id(&self) -> StreamId {
        self.0.id
    }

    /// Tag of the connection this stream belongs to
    #[inline]
    pub fn tag(&self) -> &'static str {
        self.0.con.tag()
    }

    /// Check if stream has been opened from remote side
    #[inline]
    pub fn is_remote(&self) -> bool {
        self.0.flags.get().contains(StreamFlags::REMOTE)
    }

    /// Check if stream has failed
    #[inline]
    pub fn is_failed(&self) -> bool {
        self.0.flags.get().contains(StreamFlags::FAILED)
    }

    /// Value of the connection's `service()`, attached to the errors created
    /// by this stream.
    pub(crate) fn service(&self) -> &'static str {
        self.0.con.service()
    }

    /// Current state of the send half
    pub(crate) fn send_state(&self) -> HalfState {
        self.0.send.get()
    }

    /// Current state of the receive half
    pub(crate) fn recv_state(&self) -> HalfState {
        self.0.recv.get()
    }

    /// Set the `DISCONNECT_ON_DROP` flag
    pub(crate) fn disconnect_on_drop(&self) {
        self.0.insert_flag(StreamFlags::DISCONNECT_ON_DROP);
    }

    /// Check if the `DISCONNECT_ON_DROP` flag is set
    pub(crate) fn is_disconnect_on_drop(&self) -> bool {
        self.0.flags.get().contains(StreamFlags::DISCONNECT_ON_DROP)
    }

    /// Reset stream
    ///
    /// Returns `true` if the stream state is updated and a `Reset` frame
    /// has been sent to the peer. Returns `false` without doing anything
    /// when both halves are already closed.
    #[inline]
    pub fn reset(&self, reason: Reason) -> bool {
        if !self.0.recv.get().is_closed() || !self.0.send.get().is_closed() {
            self.0.con.encode(Reset::new(self.0.id, reason));
            self.0.reset_stream(Some(reason));
            true
        } else {
            false
        }
    }

    /// Get capacity instance for current stream
    ///
    /// The returned capacity holds zero bytes.
    #[inline]
    pub fn empty_capacity(&self) -> Capacity {
        Capacity {
            size: Cell::new(0),
            stream: self.0.clone(),
        }
    }

    /// Wrap the reference into an owning `Stream`
    #[inline]
    pub(crate) fn into_stream(self) -> Stream {
        Stream(self)
    }

    /// Send a HEADERS frame and update the send half accordingly.
    ///
    /// The send half is closed when the frame carries end-of-stream and
    /// switched to `Payload` otherwise. The state is updated before the frame
    /// is encoded.
    pub(crate) fn send_headers(&self, mut hdrs: Headers) {
        hdrs.set_end_headers();
        if hdrs.is_end_stream() {
            self.0.state_send_close(None);
        } else {
            self.0.state_send_payload();
        }
        #[cfg(feature = "trace")]
        log::trace!(
            "{}: send headers {hdrs:#?} eos: {:?}",
            self.tag(),
            hdrs.is_end_stream()
        );

        // headers carrying an informational status switch content-length
        // validation to `Head`, i.e. received DATA must be empty
        if hdrs
            .pseudo()
            .status
            .is_some_and(|status| status.is_informational())
        {
            self.0.content_length.set(ContentLength::Head);
        }
        self.0.con.encode(hdrs);
    }

    /// Apply the remote-reset state with the given reason.
    // NOTE(review): presumably called while processing GOAWAY — confirm against caller
    pub(crate) fn set_go_away(&self, reason: Reason) {
        self.0.remote_reset_stream(reason);
    }

    /// Mark the stream as failed with the given error
    pub(crate) fn set_failed_stream(&self, err: Error<OperationError>) {
        self.0.failed(err);
    }

    /// Process a received HEADERS frame.
    ///
    /// In `Idle` state the frame opens the receive half and yields a header
    /// message; in `Payload` state it is treated as trailers and must carry
    /// end-of-stream.
    ///
    /// # Errors
    ///
    /// * `StreamError::InvalidContentLength` - `content-length` is not a valid number
    /// * `StreamError::TrailersWithoutEos` - trailers without end-of-stream
    /// * `StreamError::Closed` - receive half is already closed
    pub(crate) fn recv_headers(
        &self,
        hdrs: Headers,
    ) -> Result<Option<Message>, Error<StreamError>> {
        #[cfg(feature = "trace")]
        log::trace!(
            "{}: processing HEADERS for {:?}:\n{hdrs:#?}\nrecv_state:{:?}, send_state: {:?}",
            self.tag(),
            self.0.id,
            self.0.recv.get(),
            self.0.send.get(),
        );

        match self.0.recv.get() {
            HalfState::Idle => {
                let eof = hdrs.is_end_stream();
                if eof {
                    self.0.state_recv_close(None);
                } else {
                    self.0.state_recv_payload();
                }
                let (pseudo, headers) = hdrs.into_parts();

                // remember the declared length, unless payload is required
                // to be empty anyway (`Head`)
                if self.0.content_length.get() != ContentLength::Head
                    && let Some(content_length) = headers.get(CONTENT_LENGTH)
                {
                    if let Some(v) = parse_u64(content_length.as_bytes()) {
                        self.0.content_length.set(ContentLength::Remaining(v));
                    } else {
                        proto_err!(stream: "could not parse content-length; stream={:?}", self.0.id);
                        return Err(Error::new(
                            StreamError::InvalidContentLength,
                            self.service(),
                        ));
                    }
                }
                Ok(Some(Message::new(pseudo, headers, eof, self)))
            }
            HalfState::Payload => {
                // trailers
                if hdrs.is_end_stream() {
                    self.0.state_recv_close(None);
                    Ok(Some(Message::trailers(hdrs.into_fields(), self)))
                } else {
                    Err(Error::new(StreamError::TrailersWithoutEos, self.service()))
                }
            }
            HalfState::Closed(_) => Err(Error::new(StreamError::Closed, self.service())),
        }
    }

    /// Process a received DATA frame.
    ///
    /// The payload size is accounted as receive capacity first, then checked
    /// against the content-length state. The capacity is attached to the
    /// returned message for non-final frames; in every other case it is
    /// released when this function returns.
    ///
    /// # Errors
    ///
    /// * `StreamError::WrongPayloadLength` - payload does not match `content-length`
    /// * `StreamError::NonEmptyPayload` - payload received where none is allowed
    /// * `StreamError::Idle` - DATA received before HEADERS
    /// * `StreamError::Closed` - receive half is already closed
    pub(crate) fn recv_data(&self, data: Data) -> Result<Option<Message>, Error<StreamError>> {
        let cap = Capacity::new(data.payload().len() as u32, &self.0);

        #[cfg(feature = "trace")]
        log::trace!(
            "{}: processing DATA frame for {:?}, len: {:?}",
            self.tag(),
            self.0.id,
            data.payload().len()
        );

        match self.0.recv.get() {
            HalfState::Payload => {
                let eof = data.is_end_stream();

                // validate the payload size against the declared content-length
                match self.0.content_length.get() {
                    ContentLength::Remaining(rem) => {
                        match rem.checked_sub(data.payload().len() as u64) {
                            Some(val) => {
                                self.0.content_length.set(ContentLength::Remaining(val));
                                // stream ended before the declared length was received
                                if eof && val != 0 {
                                    return Err(Error::new(
                                        StreamError::WrongPayloadLength,
                                        self.service(),
                                    ));
                                }
                            }
                            // more payload than declared
                            None => {
                                return Err(Error::new(
                                    StreamError::WrongPayloadLength,
                                    self.service(),
                                ));
                            }
                        }
                    }
                    ContentLength::Head => {
                        if !data.payload().is_empty() {
                            return Err(Error::new(StreamError::NonEmptyPayload, self.service()));
                        }
                    }
                    ContentLength::Omitted => (),
                }

                if eof {
                    self.0.state_recv_close(None);
                    Ok(Some(Message::eof_data(data.into_payload(), self)))
                } else {
                    Ok(Some(Message::data(data.into_payload(), cap, self)))
                }
            }
            HalfState::Idle => Err(Error::new(
                StreamError::Idle("DATA framed received"),
                self.service(),
            )),
            HalfState::Closed(_) => Err(Error::new(StreamError::Closed, self.service())),
        }
    }

    /// Process a received RST_STREAM frame
    pub(crate) fn recv_rst_stream(&self, frm: Reset) {
        self.0.remote_reset_stream(frm.reason());
    }

    /// Wake the task waiting for send capacity, provided the stream is
    /// flagged as waiting and its own send window is not empty.
    // NOTE(review): presumably invoked after a connection-level WINDOW_UPDATE — confirm against caller
    pub(crate) fn recv_window_update_connection(&self) {
        if self.0.flags.get().contains(StreamFlags::WAIT_FOR_CAPACITY)
            && self.0.send_window.get().window_size() > 0
        {
            self.0.send_cap.wake();
        }
    }

    /// Process a received stream-level WINDOW_UPDATE frame.
    ///
    /// Grows the send window and wakes the task waiting for send capacity
    /// when the resulting window is not empty.
    ///
    /// # Errors
    ///
    /// * `StreamError::WindowZeroUpdateValue` - increment is zero
    /// * `StreamError::WindowOverflowed` - increment overflows the window
    pub(crate) fn recv_window_update(&self, frm: WindowUpdate) -> Result<(), Error<StreamError>> {
        if frm.size_increment() == 0 {
            Err(Error::new(
                StreamError::WindowZeroUpdateValue,
                self.service(),
            ))
        } else {
            let window = self
                .0
                .send_window
                .get()
                .inc(frm.size_increment())
                .map_err(|()| Error::new(StreamError::WindowOverflowed, self.service()))?;
            self.0.send_window.set(window);

            if window.window_size() > 0 {
                self.0.send_cap.wake();
            }
            Ok(())
        }
    }

    /// Apply a signed delta to the send window.
    ///
    /// A zero delta is a no-op.
    ///
    /// # Errors
    ///
    /// Returns `StreamError::WindowOverflowed` if a positive delta overflows
    /// the window.
    pub(crate) fn update_send_window(&self, upd: i32) -> Result<(), Error<StreamError>> {
        let orig = self.0.send_window.get();
        let window = match upd.cmp(&0) {
            cmp::Ordering::Less => orig.dec(upd.unsigned_abs()), // We must decrease the (remote) window
            cmp::Ordering::Greater => orig
                .inc(upd)
                .map_err(|()| Error::new(StreamError::WindowOverflowed, self.service()))?,
            cmp::Ordering::Equal => return Ok(()),
        };
        #[cfg(feature = "trace")]
        log::trace!(
            "{}: Updating send window size from {} to {}",
            self.tag(),
            orig.window_size,
            window.window_size
        );
        self.0.send_window.set(window);
        Ok(())
    }

    /// Apply a signed delta to the receive window.
    ///
    /// A zero delta is a no-op returning `Ok(None)`. Otherwise the adjusted
    /// window is stored and the result of `Window::update` is returned.
    // NOTE(review): the returned value is presumably the increment to announce
    // to the peer — confirm against `Window::update` and the caller
    ///
    /// # Errors
    ///
    /// Returns `StreamError::WindowOverflowed` if a positive delta overflows
    /// the window.
    pub(crate) fn update_recv_window(
        &self,
        upd: i32,
    ) -> Result<Option<WindowSize>, Error<StreamError>> {
        let mut window = match upd.cmp(&0) {
            cmp::Ordering::Less => self.0.recv_window.get().dec(upd.unsigned_abs()), // We must decrease the (local) window
            cmp::Ordering::Greater => self
                .0
                .recv_window
                .get()
                .inc(upd)
                .map_err(|()| Error::new(StreamError::WindowOverflowed, self.service()))?,
            cmp::Ordering::Equal => return Ok(None),
        };
        // the window is stored in both branches; only the return value differs
        if let Some(val) = window.update(
            self.0.recv_size.get(),
            self.0.con.config().window_sz,
            self.0.con.config().window_sz_threshold,
        ) {
            self.0.recv_window.set(window);
            Ok(Some(val))
        } else {
            self.0.recv_window.set(window);
            Ok(None)
        }
    }

    /// Send stream response
    ///
    /// Sends response HEADERS with the given status and headers. With `eof`
    /// set the send half is closed, otherwise it switches to `Payload`.
    ///
    /// # Errors
    ///
    /// * the error recorded on the stream, if any
    /// * `OperationError::Payload` - headers have already been sent
    /// * `OperationError::Closed` - send half is closed
    pub fn send_response(
        &self,
        status: StatusCode,
        headers: HeaderMap,
        eof: bool,
    ) -> Result<(), Error<OperationError>> {
        self.0.check_error()?;

        match self.0.send.get() {
            HalfState::Idle => {
                let pseudo = PseudoHeaders::response(status);
                let mut hdrs = Headers::new(self.0.id, pseudo, headers, eof);

                if eof {
                    hdrs.set_end_stream();
                    self.0.state_send_close(None);
                } else {
                    self.0.state_send_payload();
                }
                self.0.con.encode(hdrs);
                Ok(())
            }
            HalfState::Payload => Err(Error::new(OperationError::Payload, self.0.con.service())),
            HalfState::Closed(r) => {
                Err(Error::new(OperationError::Closed(r), self.0.con.service()))
            }
        }
    }

    /// Send payload
    ///
    /// Splits `res` into DATA frames limited by the available send capacity
    /// and the remote frame size, and waits for capacity whenever none is
    /// available. With `eof` set the last frame carries end-of-stream and
    /// the send half is closed.
    ///
    /// # Errors
    ///
    /// * `OperationError::Idle` - headers have not been sent yet
    /// * `OperationError::Closed` - send half is closed
    /// * the error recorded on the stream or the connection, if any
    pub async fn send_payload(
        &self,
        mut res: Bytes,
        eof: bool,
    ) -> Result<(), Error<OperationError>> {
        match self.0.send.get() {
            HalfState::Payload => {
                // check if stream is disconnected
                self.0.check_error()?;

                #[cfg(feature = "trace")]
                log::trace!(
                    "{}: {:?} sending {} bytes, eof: {eof}, send: {:?}",
                    self.0.tag(),
                    self.0.id,
                    res.len(),
                    self.0.send.get()
                );

                // eof and empty data
                if eof && res.is_empty() {
                    let mut data = Data::new(self.0.id, Bytes::new());
                    data.set_end_stream();
                    self.0.state_send_close(None);

                    // write to io buffer
                    self.0.con.encode(data);
                    return Ok(());
                }

                loop {
                    // calculate available send window size
                    let win = self.available_send_capacity() as usize;
                    if win > 0 {
                        let size =
                            cmp::min(win, cmp::min(res.len(), self.0.con.remote_frame_size()));
                        // take everything that is left, or split off one frame
                        let mut data = if size >= res.len() {
                            Data::new(self.0.id, mem::replace(&mut res, Bytes::new()))
                        } else {
                            #[cfg(feature = "trace")]
                            log::trace!(
                                "{}: {:?} sending {size} out of {} bytes",
                                self.0.tag(),
                                self.0.id,
                                res.len()
                            );
                            Data::new(self.0.id, res.split_to(size))
                        };
                        // last frame of the payload
                        if eof && res.is_empty() {
                            data.set_end_stream();
                            self.0.state_send_close(None);
                        }

                        // update send window
                        self.0
                            .send_window
                            .set(self.0.send_window.get().dec(size as u32));

                        // update connection send window
                        self.0.con.consume_send_window(size as u32);

                        // write to io buffer
                        self.0.con.encode(data);
                        if res.is_empty() {
                            return Ok(());
                        }
                    } else {
                        #[cfg(feature = "trace")]
                        log::trace!(
                            "{}: Not enough sending capacity for {:?} remaining {:?}",
                            self.0.tag(),
                            self.0.id,
                            res.len()
                        );
                        // wait for available send window
                        self.send_capacity().await?;
                    }
                }
            }
            HalfState::Idle => Err(Error::new(OperationError::Idle, self.0.con.service())),
            HalfState::Closed(reason) => Err(Error::new(
                OperationError::Closed(reason),
                self.0.con.service(),
            )),
        }
    }

    /// Send client trailers and close stream
    ///
    /// Does nothing unless the send half is in `Payload` state.
    pub fn send_trailers(&self, map: HeaderMap) {
        if self.0.send.get() == HalfState::Payload {
            let mut hdrs = Headers::trailers(self.0.id, map);
            hdrs.set_end_headers();
            hdrs.set_end_stream();
            self.0.con.encode(hdrs);
            self.0.state_send_close(None);
        }
    }

    /// Currently available send capacity: the smaller of the stream send
    /// window and the connection send window.
    pub fn available_send_capacity(&self) -> WindowSize {
        cmp::min(
            self.0.send_window.get().window_size(),
            self.0.con.send_window_size(),
        )
    }

    /// Wait until send capacity is available and return it.
    ///
    /// Async wrapper around `poll_send_capacity`.
    pub async fn send_capacity(&self) -> Result<WindowSize, Error<OperationError>> {
        poll_fn(|cx| self.poll_send_capacity(cx)).await
    }

    /// Check for available send capacity
    ///
    /// Returns the available capacity when it is non-zero. Otherwise sets the
    /// `WAIT_FOR_CAPACITY` flag, registers the waker and returns `Pending`.
    /// A recorded stream or connection error is returned immediately.
    pub fn poll_send_capacity(
        &self,
        cx: &Context<'_>,
    ) -> Poll<Result<WindowSize, Error<OperationError>>> {
        self.0.check_error()?;
        self.0.con.check_error()?;

        let win = self.available_send_capacity();
        if win > 0 {
            self.0.remove_flag(StreamFlags::WAIT_FOR_CAPACITY);
            Poll::Ready(Ok(win))
        } else {
            self.0.insert_flag(StreamFlags::WAIT_FOR_CAPACITY);
            self.0.send_cap.register(cx.waker());
            Poll::Pending
        }
    }

    /// Check if send part of stream get reset
    ///
    /// Resolves with `Ok(())` once the send half is closed. While it is open,
    /// a recorded stream or connection error is returned; otherwise the waker
    /// is registered and `Pending` is returned.
    pub fn poll_send_reset(&self, cx: &Context<'_>) -> Poll<Result<(), Error<OperationError>>> {
        if self.0.send.get().is_closed() {
            Poll::Ready(Ok(()))
        } else {
            self.0.check_error()?;
            self.0.con.check_error()?;
            self.0.send_reset.register(cx.waker());
            Poll::Pending
        }
    }
}
821
822impl PartialEq for StreamRef {
823    fn eq(&self, other: &StreamRef) -> bool {
824        Rc::as_ptr(&self.0) == Rc::as_ptr(&other.0)
825    }
826}
827
828impl ops::Deref for Stream {
829    type Target = StreamRef;
830
831    #[inline]
832    fn deref(&self) -> &Self::Target {
833        &self.0
834    }
835}
836
837impl Drop for Stream {
838    fn drop(&mut self) {
839        self.0.reset(Reason::CANCEL);
840    }
841}
842
843impl fmt::Debug for Stream {
844    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
845        let mut builder = f.debug_struct("Stream");
846        builder
847            .field("stream_id", &self.0.0.id)
848            .field("recv_state", &self.0.0.recv.get())
849            .field("send_state", &self.0.0.send.get())
850            .finish()
851    }
852}
853
854impl fmt::Debug for StreamState {
855    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
856        let mut builder = f.debug_struct("StreamState");
857        builder
858            .field("id", &self.id)
859            .field("recv", &self.recv.get())
860            .field("recv_window", &self.recv_window.get())
861            .field("recv_size", &self.recv_size.get())
862            .field("send", &self.send.get())
863            .field("send_window", &self.send_window.get())
864            .field("flags", &self.flags.get())
865            .finish()
866    }
867}
868
/// Parse an unsigned decimal number made of ASCII digits only.
///
/// Used for the `content-length` header value, whose grammar is `1*DIGIT`.
///
/// Returns `None` when `src`
/// * is empty,
/// * is longer than 19 bytes (every 19-digit number fits into `u64`, a
///   20-digit one may not, so longer input is rejected outright), or
/// * contains anything but ASCII digits (signs and whitespace included).
pub(crate) fn parse_u64(src: &[u8]) -> Option<u64> {
    if src.is_empty() || src.len() > 19 {
        return None;
    }

    // cannot overflow: at most 19 digits, see above
    src.iter().try_fold(0u64, |acc, &d| {
        d.is_ascii_digit()
            .then(|| acc * 10 + u64::from(d - b'0'))
    })
}