Skip to main content

ntex_h2/
stream.rs

1use std::{cell::Cell, cmp, fmt, future::poll_fn, mem, ops, rc::Rc, task::Context, task::Poll};
2
3use ntex_bytes::Bytes;
4use ntex_error::Error;
5use ntex_http::{HeaderMap, StatusCode, header::CONTENT_LENGTH};
6use ntex_util::task::LocalWaker;
7
8use crate::error::{OperationError, StreamError};
9use crate::frame::{
10    Data, Headers, PseudoHeaders, Reason, Reset, StreamId, WindowSize, WindowUpdate,
11};
12use crate::{connection::Connection, frame, message::Message, window::Window};
13
/// HTTP/2 Stream
///
/// Owning wrapper around a [`StreamRef`]. It dereferences to the inner
/// `StreamRef`, and dropping it resets the stream with `Reason::CANCEL`
/// unless both halves are already closed.
pub struct Stream(StreamRef);
16
/// Stream capacity information
///
/// Represents a number of received bytes that are accounted against a
/// stream. The bytes are released back to the stream either explicitly via
/// `consume` or, for whatever is left, when the value is dropped.
#[derive(Debug)]
pub struct Capacity {
    /// Bytes still held by this instance
    size: Cell<u32>,
    /// Stream the bytes are accounted against
    stream: Rc<StreamState>,
}
23
24impl Capacity {
25    fn new(size: u32, stream: &Rc<StreamState>) -> Self {
26        stream.add_recv_capacity(size);
27
28        Self {
29            size: Cell::new(size),
30            stream: stream.clone(),
31        }
32    }
33
34    #[inline]
35    /// Size of capacity
36    pub fn size(&self) -> usize {
37        self.size.get() as usize
38    }
39
40    /// Consume specified amount of capacity.
41    ///
42    /// # Panics
43    ///
44    /// Panics if provided size larger than capacity.
45    pub fn consume(&self, sz: u32) {
46        let size = self.size.get();
47        if let Some(sz) = size.checked_sub(sz) {
48            log::trace!(
49                "{}: {:?} capacity consumed from {} to {}",
50                self.stream.tag(),
51                self.stream.id,
52                size,
53                sz
54            );
55            self.size.set(sz);
56            self.stream.consume_capacity(size - sz);
57        } else {
58            panic!("Capacity overflow");
59        }
60    }
61}
62
63/// Panics if capacity belongs to different streams
64impl ops::Add for Capacity {
65    type Output = Self;
66
67    fn add(self, other: Self) -> Self {
68        if Rc::ptr_eq(&self.stream, &other.stream) {
69            let size = Cell::new(self.size.get() + other.size.get());
70            self.size.set(0);
71            other.size.set(0);
72            Self {
73                size,
74                stream: self.stream.clone(),
75            }
76        } else {
77            panic!("Cannot add capacity from different streams");
78        }
79    }
80}
81
82/// Panics if capacity belongs to different streams
83impl ops::AddAssign for Capacity {
84    fn add_assign(&mut self, other: Self) {
85        if Rc::ptr_eq(&self.stream, &other.stream) {
86            let size = self.size.get() + other.size.get();
87            self.size.set(size);
88            other.size.set(0);
89        } else {
90            panic!("Cannot add capacity from different streams");
91        }
92    }
93}
94
95impl Drop for Capacity {
96    fn drop(&mut self) {
97        let size = self.size.get();
98        if size > 0 {
99            self.stream.consume_capacity(size);
100        }
101    }
102}
103
/// State related to a stream's content-length validation
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(super) enum ContentLength {
    /// Initial state; received DATA frames are not checked against a length
    Omitted,
    /// Received DATA frames must carry an empty payload
    Head,
    /// Number of payload bytes that are still expected
    Remaining(u64),
}
111
/// Reference-counted handle to the shared state of a stream.
///
/// Clones point to the same `StreamState`; two handles compare equal only
/// when they point to the same state.
#[derive(Clone, Debug)]
pub struct StreamRef(pub(crate) Rc<StreamState>);
114
bitflags::bitflags! {
    // Boolean per-stream flags stored in `StreamState::flags`.
    #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    struct StreamFlags: u8 {
        // stream has been opened from the remote side
        const REMOTE = 0b0000_0001;
        // stream has been marked as failed
        const FAILED = 0b0000_0010;
        // set by `disconnect_on_drop()`, queried by `is_disconnect_on_drop()`
        const DISCONNECT_ON_DROP = 0b0000_0100;
        // a task polled for send capacity and found none
        const WAIT_FOR_CAPACITY  = 0b0000_1000;
    }
}
124
/// Shared state of a single HTTP/2 stream.
///
/// All mutable parts live in `Cell`s, the state is shared through `Rc`.
pub(crate) struct StreamState {
    /// The h2 stream identifier
    id: StreamId,
    /// Boolean stream flags
    flags: Cell<StreamFlags>,
    /// Progress of content-length validation for received DATA frames
    content_length: Cell<ContentLength>,
    /// Receive part
    recv: Cell<HalfState>,
    /// Receive flow-control window
    recv_window: Cell<Window>,
    /// Received bytes accounted via `add_recv_capacity` and not yet released
    /// via `consume_capacity`
    recv_size: Cell<u32>,
    /// Send part
    send: Cell<HalfState>,
    /// Send flow-control window
    send_window: Cell<Window>,
    /// Waker of the task waiting for send capacity
    send_cap: LocalWaker,
    /// Waker of the task registered through `poll_send_reset`
    send_reset: LocalWaker,
    /// Connection config
    pub(crate) con: Connection,
    /// error state
    error: Cell<Option<Error<OperationError>>>,
}
144
/// State of one direction (send or receive) of a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum HalfState {
    /// Initial state, headers have not been processed yet
    Idle,
    /// Headers have been processed, payload may follow
    Payload,
    /// This direction is closed, optionally with a reset reason
    Closed(Option<Reason>),
}
151
152impl HalfState {
153    pub(crate) fn is_closed(self) -> bool {
154        matches!(self, HalfState::Closed(_))
155    }
156}
157
158impl StreamState {
159    fn tag(&self) -> &'static str {
160        self.con.tag()
161    }
162
163    fn state_send_payload(&self) {
164        self.send.set(HalfState::Payload);
165    }
166
167    fn state_send_close(&self, reason: Option<Reason>) {
168        log::trace!(
169            "{}: {:?} send side is closed with reason {:?}",
170            self.tag(),
171            self.id,
172            reason
173        );
174        self.send.set(HalfState::Closed(reason));
175        self.send_cap.wake();
176        self.review_state();
177    }
178
179    fn state_recv_payload(&self) {
180        self.recv.set(HalfState::Payload);
181    }
182
183    fn state_recv_close(&self, reason: Option<Reason>) {
184        log::trace!("{}: {:?} receive side is closed", self.tag(), self.id);
185        self.recv.set(HalfState::Closed(reason));
186        self.review_state();
187    }
188
189    fn reset_stream(&self, reason: Option<Reason>) {
190        self.recv.set(HalfState::Closed(reason));
191        self.send.set(HalfState::Closed(None));
192        if let Some(reason) = reason {
193            self.error.set(Some(Error::new(
194                OperationError::LocalReset(reason),
195                self.con.service(),
196            )));
197        }
198        self.review_state();
199    }
200
201    fn remote_reset_stream(&self, reason: Reason) {
202        self.recv.set(HalfState::Closed(None));
203        self.send.set(HalfState::Closed(Some(reason)));
204        self.error.set(Some(Error::new(
205            OperationError::RemoteReset(reason),
206            self.con.service(),
207        )));
208        self.review_state();
209    }
210
211    fn failed(&self, err: Error<OperationError>) {
212        if !self.recv.get().is_closed() {
213            self.recv.set(HalfState::Closed(None));
214        }
215        if !self.send.get().is_closed() {
216            self.send.set(HalfState::Closed(None));
217        }
218        self.error.set(Some(err));
219        self.insert_flag(StreamFlags::FAILED);
220        self.review_state();
221    }
222
223    fn insert_flag(&self, f: StreamFlags) {
224        let mut flags = self.flags.get();
225        flags.insert(f);
226        self.flags.set(flags);
227    }
228
229    fn remove_flag(&self, f: StreamFlags) {
230        let mut flags = self.flags.get();
231        flags.remove(f);
232        self.flags.set(flags);
233    }
234
235    fn check_error(&self) -> Result<(), Error<OperationError>> {
236        if let Some(err) = self.error.take() {
237            self.error.set(Some(err.clone()));
238            Err(err)
239        } else {
240            Ok(())
241        }
242    }
243
244    fn review_state(&self) {
245        if self.recv.get().is_closed() {
246            self.send_reset.wake();
247
248            if let HalfState::Closed(reason) = self.send.get() {
249                // stream is closed
250                if let Some(reason) = reason {
251                    log::trace!(
252                        "{}: {:?} is closed with remote reset {:?}, dropping stream",
253                        self.tag(),
254                        self.id,
255                        reason
256                    );
257                } else {
258                    log::trace!(
259                        "{}: {:?} both sides are closed, dropping stream",
260                        self.tag(),
261                        self.id
262                    );
263                }
264                self.send_cap.wake();
265                self.con.drop_stream(self.id);
266            }
267        }
268    }
269
270    /// added new capacity, update recevice window size
271    fn add_recv_capacity(&self, size: u32) {
272        let cap = self.recv_size.get();
273        self.recv_size.set(cap + size);
274        self.recv_window.set(self.recv_window.get().dec(size));
275        log::trace!(
276            "{}: {:?} capacity incresed from {} to {}",
277            self.tag(),
278            self.id,
279            cap,
280            cap + size
281        );
282
283        // connection level recv window
284        self.con.add_recv_capacity(size);
285    }
286
287    /// check and update recevice window size
288    fn consume_capacity(&self, size: u32) {
289        let cap = self.recv_size.get();
290        let size = cap - size;
291        log::trace!(
292            "{}: {:?} capacity decresed from {} to {}",
293            self.tag(),
294            self.id,
295            cap,
296            size
297        );
298
299        self.recv_size.set(size);
300        let mut window = self.recv_window.get();
301        if let Some(val) = window.update(
302            size,
303            self.con.config().window_sz,
304            self.con.config().window_sz_threshold,
305        ) {
306            log::trace!(
307                "{}: {:?} capacity decresed below threshold {} increase by {} ({})",
308                self.tag(),
309                self.id,
310                self.con.config().window_sz_threshold,
311                val,
312                self.con.config().window_sz,
313            );
314            self.recv_window.set(window);
315            self.con.encode(WindowUpdate::new(self.id, val));
316        }
317    }
318}
319
320impl StreamRef {
321    pub(crate) fn new(id: StreamId, remote: bool, con: Connection) -> Self {
322        // if peer has accepted settings, we can use local config window size
323        // otherwise use default window size
324        let recv_window = if con.settings_processed() {
325            Window::new(con.config().window_sz)
326        } else {
327            Window::new(frame::DEFAULT_INITIAL_WINDOW_SIZE)
328        };
329        let send_window = Window::new(con.remote_window_size());
330
331        StreamRef(Rc::new(StreamState {
332            id,
333            con,
334            recv: Cell::new(HalfState::Idle),
335            recv_window: Cell::new(recv_window),
336            recv_size: Cell::new(0),
337            send: Cell::new(HalfState::Idle),
338            send_window: Cell::new(send_window),
339            send_cap: LocalWaker::new(),
340            send_reset: LocalWaker::new(),
341            error: Cell::new(None),
342            content_length: Cell::new(ContentLength::Omitted),
343            flags: Cell::new(if remote {
344                StreamFlags::REMOTE
345            } else {
346                StreamFlags::empty()
347            }),
348        }))
349    }
350
351    #[inline]
352    pub fn id(&self) -> StreamId {
353        self.0.id
354    }
355
356    #[inline]
357    pub fn tag(&self) -> &'static str {
358        self.0.con.tag()
359    }
360
361    /// Check if stream has been opened from remote side
362    #[inline]
363    pub fn is_remote(&self) -> bool {
364        self.0.flags.get().contains(StreamFlags::REMOTE)
365    }
366
367    /// Check if stream has failed
368    #[inline]
369    pub fn is_failed(&self) -> bool {
370        self.0.flags.get().contains(StreamFlags::FAILED)
371    }
372
373    pub(crate) fn service(&self) -> &'static str {
374        self.0.con.service()
375    }
376
377    pub(crate) fn send_state(&self) -> HalfState {
378        self.0.send.get()
379    }
380
381    pub(crate) fn recv_state(&self) -> HalfState {
382        self.0.recv.get()
383    }
384
385    pub(crate) fn disconnect_on_drop(&self) {
386        self.0.insert_flag(StreamFlags::DISCONNECT_ON_DROP);
387    }
388
389    pub(crate) fn is_disconnect_on_drop(&self) -> bool {
390        self.0.flags.get().contains(StreamFlags::DISCONNECT_ON_DROP)
391    }
392
393    /// Reset stream
394    ///
395    /// Returns `true` if the stream state is updated and a `Reset` frame
396    /// has been sent to the peer.
397    #[inline]
398    pub fn reset(&self, reason: Reason) -> bool {
399        if !self.0.recv.get().is_closed() || !self.0.send.get().is_closed() {
400            self.0.con.encode(Reset::new(self.0.id, reason));
401            self.0.reset_stream(Some(reason));
402            true
403        } else {
404            false
405        }
406    }
407
408    /// Get capacity instance for current stream
409    #[inline]
410    pub fn empty_capacity(&self) -> Capacity {
411        Capacity {
412            size: Cell::new(0),
413            stream: self.0.clone(),
414        }
415    }
416
417    #[inline]
418    pub(crate) fn into_stream(self) -> Stream {
419        Stream(self)
420    }
421
422    pub(crate) fn send_headers(&self, mut hdrs: Headers) {
423        hdrs.set_end_headers();
424        if hdrs.is_end_stream() {
425            self.0.state_send_close(None);
426        } else {
427            self.0.state_send_payload();
428        }
429        log::trace!(
430            "{}: send headers {:#?} eos: {:?}",
431            self.tag(),
432            hdrs,
433            hdrs.is_end_stream()
434        );
435
436        if hdrs
437            .pseudo()
438            .status
439            .is_some_and(|status| status.is_informational())
440        {
441            self.0.content_length.set(ContentLength::Head);
442        }
443        self.0.con.encode(hdrs);
444    }
445
446    pub(crate) fn set_go_away(&self, reason: Reason) {
447        self.0.remote_reset_stream(reason);
448    }
449
450    pub(crate) fn set_failed_stream(&self, err: Error<OperationError>) {
451        self.0.failed(err);
452    }
453
454    pub(crate) fn recv_headers(
455        &self,
456        hdrs: Headers,
457    ) -> Result<Option<Message>, Error<StreamError>> {
458        log::trace!(
459            "{}: processing HEADERS for {:?}:\n{:#?}\nrecv_state:{:?}, send_state: {:?}",
460            self.tag(),
461            self.0.id,
462            hdrs,
463            self.0.recv.get(),
464            self.0.send.get(),
465        );
466
467        match self.0.recv.get() {
468            HalfState::Idle => {
469                let eof = hdrs.is_end_stream();
470                if eof {
471                    self.0.state_recv_close(None);
472                } else {
473                    self.0.state_recv_payload();
474                }
475                let (pseudo, headers) = hdrs.into_parts();
476
477                if self.0.content_length.get() != ContentLength::Head
478                    && let Some(content_length) = headers.get(CONTENT_LENGTH)
479                {
480                    if let Some(v) = parse_u64(content_length.as_bytes()) {
481                        self.0.content_length.set(ContentLength::Remaining(v));
482                    } else {
483                        proto_err!(stream: "could not parse content-length; stream={:?}", self.0.id);
484                        return Err(Error::new(
485                            StreamError::InvalidContentLength,
486                            self.service(),
487                        ));
488                    }
489                }
490                Ok(Some(Message::new(pseudo, headers, eof, self)))
491            }
492            HalfState::Payload => {
493                // trailers
494                if hdrs.is_end_stream() {
495                    self.0.state_recv_close(None);
496                    Ok(Some(Message::trailers(hdrs.into_fields(), self)))
497                } else {
498                    Err(Error::new(StreamError::TrailersWithoutEos, self.service()))
499                }
500            }
501            HalfState::Closed(_) => Err(Error::new(StreamError::Closed, self.service())),
502        }
503    }
504
505    pub(crate) fn recv_data(&self, data: Data) -> Result<Option<Message>, Error<StreamError>> {
506        let cap = Capacity::new(data.payload().len() as u32, &self.0);
507        log::trace!(
508            "{}: processing DATA frame for {:?}, len: {:?}",
509            self.tag(),
510            self.0.id,
511            data.payload().len()
512        );
513
514        match self.0.recv.get() {
515            HalfState::Payload => {
516                let eof = data.is_end_stream();
517
518                // Returns `Err` when the decrement cannot be completed due to overflow
519                match self.0.content_length.get() {
520                    ContentLength::Remaining(rem) => {
521                        match rem.checked_sub(data.payload().len() as u64) {
522                            Some(val) => {
523                                self.0.content_length.set(ContentLength::Remaining(val));
524                                if eof && val != 0 {
525                                    return Err(Error::new(
526                                        StreamError::WrongPayloadLength,
527                                        self.service(),
528                                    ));
529                                }
530                            }
531                            None => {
532                                return Err(Error::new(
533                                    StreamError::WrongPayloadLength,
534                                    self.service(),
535                                ));
536                            }
537                        }
538                    }
539                    ContentLength::Head => {
540                        if !data.payload().is_empty() {
541                            return Err(Error::new(StreamError::NonEmptyPayload, self.service()));
542                        }
543                    }
544                    ContentLength::Omitted => (),
545                }
546
547                if eof {
548                    self.0.state_recv_close(None);
549                    Ok(Some(Message::eof_data(data.into_payload(), self)))
550                } else {
551                    Ok(Some(Message::data(data.into_payload(), cap, self)))
552                }
553            }
554            HalfState::Idle => Err(Error::new(
555                StreamError::Idle("DATA framed received"),
556                self.service(),
557            )),
558            HalfState::Closed(_) => Err(Error::new(StreamError::Closed, self.service())),
559        }
560    }
561
562    pub(crate) fn recv_rst_stream(&self, frm: Reset) {
563        self.0.remote_reset_stream(frm.reason());
564    }
565
566    pub(crate) fn recv_window_update_connection(&self) {
567        if self.0.flags.get().contains(StreamFlags::WAIT_FOR_CAPACITY)
568            && self.0.send_window.get().window_size() > 0
569        {
570            self.0.send_cap.wake();
571        }
572    }
573
574    pub(crate) fn recv_window_update(&self, frm: WindowUpdate) -> Result<(), Error<StreamError>> {
575        if frm.size_increment() == 0 {
576            Err(Error::new(
577                StreamError::WindowZeroUpdateValue,
578                self.service(),
579            ))
580        } else {
581            let window = self
582                .0
583                .send_window
584                .get()
585                .inc(frm.size_increment())
586                .map_err(|()| Error::new(StreamError::WindowOverflowed, self.service()))?;
587            self.0.send_window.set(window);
588
589            if window.window_size() > 0 {
590                self.0.send_cap.wake();
591            }
592            Ok(())
593        }
594    }
595
596    pub(crate) fn update_send_window(&self, upd: i32) -> Result<(), Error<StreamError>> {
597        let orig = self.0.send_window.get();
598        let window = match upd.cmp(&0) {
599            cmp::Ordering::Less => orig.dec(upd.unsigned_abs()), // We must decrease the (remote) window
600            cmp::Ordering::Greater => orig
601                .inc(upd)
602                .map_err(|()| Error::new(StreamError::WindowOverflowed, self.service()))?,
603            cmp::Ordering::Equal => return Ok(()),
604        };
605        log::trace!(
606            "{}: Updating send window size from {} to {}",
607            self.tag(),
608            orig.window_size,
609            window.window_size
610        );
611        self.0.send_window.set(window);
612        Ok(())
613    }
614
615    pub(crate) fn update_recv_window(
616        &self,
617        upd: i32,
618    ) -> Result<Option<WindowSize>, Error<StreamError>> {
619        let mut window = match upd.cmp(&0) {
620            cmp::Ordering::Less => self.0.recv_window.get().dec(upd.unsigned_abs()), // We must decrease the (local) window
621            cmp::Ordering::Greater => self
622                .0
623                .recv_window
624                .get()
625                .inc(upd)
626                .map_err(|()| Error::new(StreamError::WindowOverflowed, self.service()))?,
627            cmp::Ordering::Equal => return Ok(None),
628        };
629        if let Some(val) = window.update(
630            self.0.recv_size.get(),
631            self.0.con.config().window_sz,
632            self.0.con.config().window_sz_threshold,
633        ) {
634            self.0.recv_window.set(window);
635            Ok(Some(val))
636        } else {
637            self.0.recv_window.set(window);
638            Ok(None)
639        }
640    }
641
642    /// Send stream response
643    pub fn send_response(
644        &self,
645        status: StatusCode,
646        headers: HeaderMap,
647        eof: bool,
648    ) -> Result<(), Error<OperationError>> {
649        self.0.check_error()?;
650
651        match self.0.send.get() {
652            HalfState::Idle => {
653                let pseudo = PseudoHeaders::response(status);
654                let mut hdrs = Headers::new(self.0.id, pseudo, headers, eof);
655
656                if eof {
657                    hdrs.set_end_stream();
658                    self.0.state_send_close(None);
659                } else {
660                    self.0.state_send_payload();
661                }
662                self.0.con.encode(hdrs);
663                Ok(())
664            }
665            HalfState::Payload => Err(Error::new(OperationError::Payload, self.0.con.service())),
666            HalfState::Closed(r) => {
667                Err(Error::new(OperationError::Closed(r), self.0.con.service()))
668            }
669        }
670    }
671
672    /// Send payload
673    pub async fn send_payload(
674        &self,
675        mut res: Bytes,
676        eof: bool,
677    ) -> Result<(), Error<OperationError>> {
678        match self.0.send.get() {
679            HalfState::Payload => {
680                // check if stream is disconnected
681                self.0.check_error()?;
682
683                log::trace!(
684                    "{}: {:?} sending {} bytes, eof: {}, send: {:?}",
685                    self.0.tag(),
686                    self.0.id,
687                    res.len(),
688                    eof,
689                    self.0.send.get()
690                );
691
692                // eof and empty data
693                if eof && res.is_empty() {
694                    let mut data = Data::new(self.0.id, Bytes::new());
695                    data.set_end_stream();
696                    self.0.state_send_close(None);
697
698                    // write to io buffer
699                    self.0.con.encode(data);
700                    return Ok(());
701                }
702
703                loop {
704                    // calaculate available send window size
705                    let win = self.available_send_capacity() as usize;
706                    if win > 0 {
707                        let size =
708                            cmp::min(win, cmp::min(res.len(), self.0.con.remote_frame_size()));
709                        let mut data = if size >= res.len() {
710                            Data::new(self.0.id, mem::replace(&mut res, Bytes::new()))
711                        } else {
712                            log::trace!(
713                                "{}: {:?} sending {} out of {} bytes",
714                                self.0.tag(),
715                                self.0.id,
716                                size,
717                                res.len()
718                            );
719                            Data::new(self.0.id, res.split_to(size))
720                        };
721                        if eof && res.is_empty() {
722                            data.set_end_stream();
723                            self.0.state_send_close(None);
724                        }
725
726                        // update send window
727                        self.0
728                            .send_window
729                            .set(self.0.send_window.get().dec(size as u32));
730
731                        // update connection send window
732                        self.0.con.consume_send_window(size as u32);
733
734                        // write to io buffer
735                        self.0.con.encode(data);
736                        if res.is_empty() {
737                            return Ok(());
738                        }
739                    } else {
740                        log::trace!(
741                            "{}: Not enough sending capacity for {:?} remaining {:?}",
742                            self.0.tag(),
743                            self.0.id,
744                            res.len()
745                        );
746                        // wait for available send window
747                        self.send_capacity().await?;
748                    }
749                }
750            }
751            HalfState::Idle => Err(Error::new(OperationError::Idle, self.0.con.service())),
752            HalfState::Closed(reason) => Err(Error::new(
753                OperationError::Closed(reason),
754                self.0.con.service(),
755            )),
756        }
757    }
758
759    /// Send client trailers and close stream
760    pub fn send_trailers(&self, map: HeaderMap) {
761        if self.0.send.get() == HalfState::Payload {
762            let mut hdrs = Headers::trailers(self.0.id, map);
763            hdrs.set_end_headers();
764            hdrs.set_end_stream();
765            self.0.con.encode(hdrs);
766            self.0.state_send_close(None);
767        }
768    }
769
770    pub fn available_send_capacity(&self) -> WindowSize {
771        cmp::min(
772            self.0.send_window.get().window_size(),
773            self.0.con.send_window_size(),
774        )
775    }
776
777    pub async fn send_capacity(&self) -> Result<WindowSize, Error<OperationError>> {
778        poll_fn(|cx| self.poll_send_capacity(cx)).await
779    }
780
781    /// Check for available send capacity
782    pub fn poll_send_capacity(
783        &self,
784        cx: &Context<'_>,
785    ) -> Poll<Result<WindowSize, Error<OperationError>>> {
786        self.0.check_error()?;
787        self.0.con.check_error()?;
788
789        let win = self.available_send_capacity();
790        if win > 0 {
791            self.0.remove_flag(StreamFlags::WAIT_FOR_CAPACITY);
792            Poll::Ready(Ok(win))
793        } else {
794            self.0.insert_flag(StreamFlags::WAIT_FOR_CAPACITY);
795            self.0.send_cap.register(cx.waker());
796            Poll::Pending
797        }
798    }
799
800    /// Check if send part of stream get reset
801    pub fn poll_send_reset(&self, cx: &Context<'_>) -> Poll<Result<(), Error<OperationError>>> {
802        if self.0.send.get().is_closed() {
803            Poll::Ready(Ok(()))
804        } else {
805            self.0.check_error()?;
806            self.0.con.check_error()?;
807            self.0.send_reset.register(cx.waker());
808            Poll::Pending
809        }
810    }
811}
812
813impl PartialEq for StreamRef {
814    fn eq(&self, other: &StreamRef) -> bool {
815        Rc::as_ptr(&self.0) == Rc::as_ptr(&other.0)
816    }
817}
818
/// Gives access to every `StreamRef` method through `Stream`.
impl ops::Deref for Stream {
    type Target = StreamRef;

    /// Borrow the wrapped `StreamRef`.
    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
827
828impl Drop for Stream {
829    fn drop(&mut self) {
830        self.0.reset(Reason::CANCEL);
831    }
832}
833
834impl fmt::Debug for Stream {
835    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
836        let mut builder = f.debug_struct("Stream");
837        builder
838            .field("stream_id", &self.0.0.id)
839            .field("recv_state", &self.0.0.recv.get())
840            .field("send_state", &self.0.0.send.get())
841            .finish()
842    }
843}
844
845impl fmt::Debug for StreamState {
846    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
847        let mut builder = f.debug_struct("StreamState");
848        builder
849            .field("id", &self.id)
850            .field("recv", &self.recv.get())
851            .field("recv_window", &self.recv_window.get())
852            .field("recv_size", &self.recv_size.get())
853            .field("send", &self.send.get())
854            .field("send_window", &self.send_window.get())
855            .field("flags", &self.flags.get())
856            .finish()
857    }
858}
859
/// Parse an unsigned decimal number from ASCII digits, such as the value of
/// a `content-length` header.
///
/// Returns `None` when `src`
///
/// * is empty - a number needs at least one digit,
/// * contains anything but ASCII digits (no sign, no whitespace),
/// * is longer than 19 bytes - every number of up to 19 digits fits into
///   `u64`, longer input is rejected instead of risking an overflow.
pub(crate) fn parse_u64(src: &[u8]) -> Option<u64> {
    if src.is_empty() || src.len() > 19 {
        return None;
    }

    // cannot overflow: at most 19 digits are accumulated
    src.iter().try_fold(0u64, |acc, &d| {
        d.is_ascii_digit()
            .then(|| acc * 10 + u64::from(d - b'0'))
    })
}
877}