1use std::{cell::Cell, cmp, fmt, future::poll_fn, ops, rc::Rc, task::Context, task::Poll};
2
3use ntex_bytes::{BytePages, Bytes};
4use ntex_error::{Error, ErrorMapping};
5use ntex_http::{HeaderMap, StatusCode, header::CONTENT_LENGTH};
6use ntex_util::{future::Either, task::LocalWaker};
7
8use crate::error::{OperationError, StreamError};
9use crate::frame::{
10 Data, Head, Headers, Kind, PseudoHeaders, Reason, Reset, StreamId, WindowSize, WindowUpdate,
11};
12use crate::{connection::Connection, frame, message::Message, window::Window};
13
/// Owning handle to a stream.
///
/// Dereferences to [`StreamRef`]. Dropping it calls
/// `reset(Reason::CANCEL)`, which sends a reset unless both halves of the
/// stream are already closed.
pub struct Stream(StreamRef);
16
/// Receive capacity accounted against a single stream.
///
/// Whatever amount is still held when the value is dropped is handed back
/// to the stream through `StreamState::consume_capacity`.
#[derive(Debug)]
pub struct Capacity {
    /// Number of bytes still held by this value.
    size: Cell<u32>,
    /// Stream the capacity is accounted against.
    stream: Rc<StreamState>,
}
23
24impl Capacity {
25 fn new(size: u32, stream: &Rc<StreamState>) -> Self {
26 if size != 0 {
27 stream.add_recv_capacity(size);
28 }
29
30 Self {
31 size: Cell::new(size),
32 stream: stream.clone(),
33 }
34 }
35
36 #[inline]
37 pub fn size(&self) -> usize {
39 self.size.get() as usize
40 }
41
42 pub fn consume(&self, sz: u32) {
48 let size = self.size.get();
49 if let Some(sz) = size.checked_sub(sz) {
50 #[cfg(feature = "trace")]
51 log::trace!(
52 "{}: {:?} capacity consumed from {size} to {sz}",
53 self.stream.tag(),
54 self.stream.id,
55 );
56 self.size.set(sz);
57 self.stream.consume_capacity(size - sz);
58 } else {
59 panic!("Capacity overflow");
60 }
61 }
62}
63
64impl ops::Add for Capacity {
66 type Output = Self;
67
68 fn add(self, other: Self) -> Self {
69 if Rc::ptr_eq(&self.stream, &other.stream) {
70 let size = Cell::new(self.size.get() + other.size.get());
71 self.size.set(0);
72 other.size.set(0);
73 Self {
74 size,
75 stream: self.stream.clone(),
76 }
77 } else {
78 panic!("Cannot add capacity from different streams");
79 }
80 }
81}
82
83impl ops::AddAssign for Capacity {
85 fn add_assign(&mut self, other: Self) {
86 if Rc::ptr_eq(&self.stream, &other.stream) {
87 let size = self.size.get() + other.size.get();
88 self.size.set(size);
89 other.size.set(0);
90 } else {
91 panic!("Cannot add capacity from different streams");
92 }
93 }
94}
95
96impl Drop for Capacity {
97 fn drop(&mut self) {
98 let size = self.size.get();
99 if size > 0 {
100 self.stream.consume_capacity(size);
101 }
102 }
103}
104
/// Content-length bookkeeping kept per stream.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(super) enum ContentLength {
    /// Initial value; `recv_data` applies no length check in this state.
    Omitted,
    /// Set by `send_headers` when the outgoing status is informational;
    /// `recv_data` then rejects any non-empty payload and `recv_headers`
    /// skips parsing of the `content-length` header.
    Head,
    /// Number of payload bytes still expected, as parsed from the received
    /// `content-length` header and decremented by every DATA frame.
    Remaining(u64),
}
112
/// Reference-counted handle to the shared `StreamState`.
///
/// Cloning only clones the inner `Rc`; equality compares the pointers, so
/// two handles are equal exactly when they share the same state.
#[derive(Clone, Debug)]
pub struct StreamRef(pub(crate) Rc<StreamState>);
115
bitflags::bitflags! {
    /// Per-stream boolean markers stored in `StreamState::flags`.
    #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    struct StreamFlags: u8 {
        /// Set by `StreamRef::new` when the stream is created with `remote == true`.
        const REMOTE = 0b0000_0001;
        /// Set by `StreamState::failed`.
        const FAILED = 0b0000_0010;
        /// Set by `StreamRef::disconnect_on_drop`; the consumer of this flag
        /// is outside this file.
        const DISCONNECT_ON_DROP = 0b0000_0100;
        /// Set by `poll_send_capacity` while no send window is available and
        /// cleared once capacity is reported again.
        const WAIT_FOR_CAPACITY = 0b0000_1000;
    }
}
125
/// Shared, interior-mutable state of a single stream.
pub(crate) struct StreamState {
    /// Identifier of the stream.
    id: StreamId,
    /// Boolean markers, see `StreamFlags`.
    flags: Cell<StreamFlags>,
    /// Content-length bookkeeping used while receiving DATA frames.
    content_length: Cell<ContentLength>,
    /// State of the receiving half.
    recv: Cell<HalfState>,
    /// Flow-control window of the receiving half.
    recv_window: Cell<Window>,
    /// Received bytes whose capacity has not been released yet
    /// (raised by `add_recv_capacity`, lowered by `consume_capacity`).
    recv_size: Cell<u32>,
    /// State of the sending half.
    send: Cell<HalfState>,
    /// Flow-control window of the sending half.
    send_window: Cell<Window>,
    /// Waker registered by `poll_send_capacity`.
    send_cap: LocalWaker,
    /// Waker registered by `poll_send_reset`.
    send_reset: LocalWaker,
    /// Connection this stream belongs to.
    pub(crate) con: Connection,
    /// Error recorded by a reset or failure; reported by `check_error`.
    error: Cell<Option<Error<OperationError>>>,
}
145
/// State of one direction (send or receive) of a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum HalfState {
    /// Initial state assigned by `StreamRef::new`.
    Idle,
    /// Headers without end-of-stream were processed; payload may follow.
    Payload,
    /// The half is closed, optionally carrying the reset reason.
    Closed(Option<Reason>),
}
152
153impl HalfState {
154 pub(crate) fn is_closed(self) -> bool {
155 matches!(self, HalfState::Closed(_))
156 }
157}
158
159impl StreamState {
160 #[allow(dead_code)]
161 fn tag(&self) -> &'static str {
162 self.con.tag()
163 }
164
165 fn state_send_payload(&self) {
166 self.send.set(HalfState::Payload);
167 }
168
169 fn state_send_close(&self, reason: Option<Reason>) {
170 #[cfg(feature = "trace")]
171 log::trace!(
172 "{}: {:?} send side is closed with reason {reason:?}",
173 self.tag(),
174 self.id
175 );
176 self.send.set(HalfState::Closed(reason));
177 self.send_cap.wake();
178 self.review_state();
179 }
180
181 fn state_recv_payload(&self) {
182 self.recv.set(HalfState::Payload);
183 }
184
185 fn state_recv_close(&self, reason: Option<Reason>) {
186 #[cfg(feature = "trace")]
187 log::trace!("{}: {:?} receive side is closed", self.tag(), self.id);
188 self.recv.set(HalfState::Closed(reason));
189 self.review_state();
190 }
191
192 fn reset_stream(&self, reason: Option<Reason>) {
193 self.recv.set(HalfState::Closed(reason));
194 self.send.set(HalfState::Closed(None));
195 if let Some(reason) = reason {
196 self.error.set(Some(Error::new(
197 OperationError::LocalReset(reason),
198 self.con.service(),
199 )));
200 }
201 self.review_state();
202 }
203
204 fn remote_reset_stream(&self, reason: Reason) {
205 self.recv.set(HalfState::Closed(None));
206 self.send.set(HalfState::Closed(Some(reason)));
207 self.error.set(Some(Error::new(
208 OperationError::RemoteReset(reason),
209 self.con.service(),
210 )));
211 self.review_state();
212 }
213
214 fn failed(&self, err: Error<OperationError>) {
215 if !self.recv.get().is_closed() {
216 self.recv.set(HalfState::Closed(None));
217 }
218 if !self.send.get().is_closed() {
219 self.send.set(HalfState::Closed(None));
220 }
221 self.error.set(Some(err));
222 self.insert_flag(StreamFlags::FAILED);
223 self.review_state();
224 }
225
226 fn insert_flag(&self, f: StreamFlags) {
227 let mut flags = self.flags.get();
228 flags.insert(f);
229 self.flags.set(flags);
230 }
231
232 fn remove_flag(&self, f: StreamFlags) {
233 let mut flags = self.flags.get();
234 flags.remove(f);
235 self.flags.set(flags);
236 }
237
238 fn check_error(&self) -> Result<(), Error<OperationError>> {
239 if let Some(err) = self.error.take() {
240 self.error.set(Some(err.clone()));
241 Err(err)
242 } else {
243 Ok(())
244 }
245 }
246
247 #[allow(clippy::used_underscore_binding)]
248 fn review_state(&self) {
249 if self.recv.get().is_closed() {
250 self.send_reset.wake();
251
252 if let HalfState::Closed(_reason) = self.send.get() {
253 #[cfg(feature = "trace")]
255 if let Some(reason) = _reason {
256 log::trace!(
257 "{}: {:?} is closed with remote reset {reason:?}, dropping stream",
258 self.tag(),
259 self.id
260 );
261 } else {
262 log::trace!(
263 "{}: {:?} both sides are closed, dropping stream",
264 self.tag(),
265 self.id
266 );
267 }
268 self.send_cap.wake();
269 self.con.drop_stream(self.id);
270 }
271 }
272 }
273
274 fn add_recv_capacity(&self, size: u32) {
276 let cap = self.recv_size.get();
277 self.recv_size.set(cap + size);
278 self.recv_window.set(self.recv_window.get().dec(size));
279
280 #[cfg(feature = "trace")]
281 log::trace!(
282 "{}: {:?} capacity incresed from {cap} to {} ({size})",
283 self.tag(),
284 self.id,
285 cap + size
286 );
287
288 self.con.add_recv_capacity(size);
290 }
291
292 fn consume_capacity(&self, size: u32) {
294 let cap = self.recv_size.get();
295 let size = cap - size;
296
297 #[cfg(feature = "trace")]
298 log::trace!(
299 "{}: {:?} capacity decresed from {cap} to {size}",
300 self.tag(),
301 self.id,
302 );
303
304 self.recv_size.set(size);
305 let mut window = self.recv_window.get();
306 if let Some(val) = window.update(
307 size,
308 self.con.config().window_sz,
309 self.con.config().window_sz_threshold,
310 ) {
311 #[cfg(feature = "trace")]
312 log::trace!(
313 "{}: {:?} capacity decresed below threshold {} increase by {val} ({})",
314 self.tag(),
315 self.id,
316 self.con.config().window_sz_threshold,
317 self.con.config().window_sz,
318 );
319 self.recv_window.set(window);
320 self.con.encode(WindowUpdate::new(self.id, val));
321 }
322 }
323}
324
325impl StreamRef {
326 pub(crate) fn new(id: StreamId, remote: bool, con: Connection) -> Self {
327 let recv_window = if con.settings_processed() {
330 Window::new(con.config().window_sz)
331 } else {
332 Window::new(frame::DEFAULT_INITIAL_WINDOW_SIZE)
333 };
334 let send_window = Window::new(con.remote_window_size());
335
336 StreamRef(Rc::new(StreamState {
337 id,
338 con,
339 recv: Cell::new(HalfState::Idle),
340 recv_window: Cell::new(recv_window),
341 recv_size: Cell::new(0),
342 send: Cell::new(HalfState::Idle),
343 send_window: Cell::new(send_window),
344 send_cap: LocalWaker::new(),
345 send_reset: LocalWaker::new(),
346 error: Cell::new(None),
347 content_length: Cell::new(ContentLength::Omitted),
348 flags: Cell::new(if remote {
349 StreamFlags::REMOTE
350 } else {
351 StreamFlags::empty()
352 }),
353 }))
354 }
355
356 #[inline]
357 pub fn id(&self) -> StreamId {
358 self.0.id
359 }
360
361 #[inline]
362 pub fn tag(&self) -> &'static str {
363 self.0.con.tag()
364 }
365
366 #[inline]
368 pub fn is_remote(&self) -> bool {
369 self.0.flags.get().contains(StreamFlags::REMOTE)
370 }
371
372 #[inline]
374 pub fn is_failed(&self) -> bool {
375 self.0.flags.get().contains(StreamFlags::FAILED)
376 }
377
378 pub(crate) fn service(&self) -> &'static str {
379 self.0.con.service()
380 }
381
382 pub(crate) fn send_state(&self) -> HalfState {
383 self.0.send.get()
384 }
385
386 pub(crate) fn recv_state(&self) -> HalfState {
387 self.0.recv.get()
388 }
389
390 pub(crate) fn disconnect_on_drop(&self) {
391 self.0.insert_flag(StreamFlags::DISCONNECT_ON_DROP);
392 }
393
394 pub(crate) fn is_disconnect_on_drop(&self) -> bool {
395 self.0.flags.get().contains(StreamFlags::DISCONNECT_ON_DROP)
396 }
397
398 #[inline]
403 pub fn reset(&self, reason: Reason) -> bool {
404 if !self.0.recv.get().is_closed() || !self.0.send.get().is_closed() {
405 self.0.con.encode(Reset::new(self.0.id, reason));
406 self.0.reset_stream(Some(reason));
407 true
408 } else {
409 false
410 }
411 }
412
413 #[inline]
415 pub fn empty_capacity(&self) -> Capacity {
416 Capacity {
417 size: Cell::new(0),
418 stream: self.0.clone(),
419 }
420 }
421
422 #[inline]
423 pub(crate) fn into_stream(self) -> Stream {
424 Stream(self)
425 }
426
427 pub(crate) fn send_headers(&self, mut hdrs: Headers) {
428 hdrs.set_end_headers();
429 if hdrs.is_end_stream() {
430 self.0.state_send_close(None);
431 } else {
432 self.0.state_send_payload();
433 }
434 #[cfg(feature = "trace")]
435 log::trace!(
436 "{}: send headers {hdrs:#?} eos: {:?}",
437 self.tag(),
438 hdrs.is_end_stream()
439 );
440
441 if hdrs
442 .pseudo()
443 .status
444 .is_some_and(|status| status.is_informational())
445 {
446 self.0.content_length.set(ContentLength::Head);
447 }
448 self.0.con.encode(hdrs);
449 }
450
451 pub(crate) fn set_go_away(&self, reason: Reason) {
452 self.0.remote_reset_stream(reason);
453 }
454
455 pub(crate) fn set_failed_stream(&self, err: Error<OperationError>) {
456 self.0.failed(err);
457 }
458
459 pub(crate) fn recv_headers(
460 &self,
461 hdrs: Headers,
462 ) -> Result<Option<Message>, Error<StreamError>> {
463 #[cfg(feature = "trace")]
464 log::trace!(
465 "{}: processing HEADERS for {:?}:\n{hdrs:#?}\nrecv_state:{:?}, send_state: {:?}",
466 self.tag(),
467 self.0.id,
468 self.0.recv.get(),
469 self.0.send.get(),
470 );
471
472 match self.0.recv.get() {
473 HalfState::Idle => {
474 let eof = hdrs.is_end_stream();
475 if eof {
476 self.0.state_recv_close(None);
477 } else {
478 self.0.state_recv_payload();
479 }
480 let (pseudo, headers) = hdrs.into_parts();
481
482 if self.0.content_length.get() != ContentLength::Head
483 && let Some(content_length) = headers.get(CONTENT_LENGTH)
484 {
485 if let Some(v) = parse_u64(content_length.as_bytes()) {
486 self.0.content_length.set(ContentLength::Remaining(v));
487 } else {
488 proto_err!(stream: "could not parse content-length; stream={:?}", self.0.id);
489 return Err(Error::new(
490 StreamError::InvalidContentLength,
491 self.service(),
492 ));
493 }
494 }
495 Ok(Some(Message::new(pseudo, headers, eof, self)))
496 }
497 HalfState::Payload => {
498 if hdrs.is_end_stream() {
500 self.0.state_recv_close(None);
501 Ok(Some(Message::trailers(hdrs.into_fields(), self)))
502 } else {
503 Err(Error::new(StreamError::TrailersWithoutEos, self.service()))
504 }
505 }
506 HalfState::Closed(_) => Err(Error::new(StreamError::Closed, self.service())),
507 }
508 }
509
510 pub(crate) fn recv_data(&self, data: Data) -> Result<Option<Message>, Error<StreamError>> {
511 let cap = Capacity::new(data.payload().len() as u32, &self.0);
512
513 #[cfg(feature = "trace")]
514 log::trace!(
515 "{}: processing DATA frame for {:?}, len: {:?}",
516 self.tag(),
517 self.0.id,
518 data.payload().len()
519 );
520
521 match self.0.recv.get() {
522 HalfState::Payload => {
523 let eof = data.is_end_stream();
524
525 match self.0.content_length.get() {
527 ContentLength::Remaining(rem) => {
528 match rem.checked_sub(data.payload().len() as u64) {
529 Some(val) => {
530 self.0.content_length.set(ContentLength::Remaining(val));
531 if eof && val != 0 {
532 return Err(Error::new(
533 StreamError::WrongPayloadLength,
534 self.service(),
535 ));
536 }
537 }
538 None => {
539 return Err(Error::new(
540 StreamError::WrongPayloadLength,
541 self.service(),
542 ));
543 }
544 }
545 }
546 ContentLength::Head => {
547 if !data.payload().is_empty() {
548 return Err(Error::new(StreamError::NonEmptyPayload, self.service()));
549 }
550 }
551 ContentLength::Omitted => (),
552 }
553
554 if eof {
555 self.0.state_recv_close(None);
556 Ok(Some(Message::eof_data(data.into_payload(), self)))
557 } else {
558 Ok(Some(Message::data(data.into_payload(), cap, self)))
559 }
560 }
561 HalfState::Idle => Err(Error::new(
562 StreamError::Idle("DATA framed received"),
563 self.service(),
564 )),
565 HalfState::Closed(_) => Err(Error::new(StreamError::Closed, self.service())),
566 }
567 }
568
569 pub(crate) fn recv_rst_stream(&self, frm: Reset) {
570 self.0.remote_reset_stream(frm.reason());
571 }
572
573 pub(crate) fn recv_window_update_connection(&self) {
574 if self.0.flags.get().contains(StreamFlags::WAIT_FOR_CAPACITY)
575 && self.0.send_window.get().window_size() > 0
576 {
577 self.0.send_cap.wake();
578 }
579 }
580
581 pub(crate) fn recv_window_update(&self, frm: WindowUpdate) -> Result<(), Error<StreamError>> {
582 if frm.size_increment() == 0 {
583 Err(Error::new(
584 StreamError::WindowZeroUpdateValue,
585 self.service(),
586 ))
587 } else {
588 let window = self
589 .0
590 .send_window
591 .get()
592 .inc(frm.size_increment())
593 .map_err(|()| Error::new(StreamError::WindowOverflowed, self.service()))?;
594 self.0.send_window.set(window);
595
596 if window.window_size() > 0 {
597 self.0.send_cap.wake();
598 }
599 Ok(())
600 }
601 }
602
603 pub(crate) fn update_send_window(&self, upd: i32) -> Result<(), Error<StreamError>> {
604 let orig = self.0.send_window.get();
605 let window = match upd.cmp(&0) {
606 cmp::Ordering::Less => orig.dec(upd.unsigned_abs()), cmp::Ordering::Greater => orig
608 .inc(upd)
609 .map_err(|()| Error::new(StreamError::WindowOverflowed, self.service()))?,
610 cmp::Ordering::Equal => return Ok(()),
611 };
612 #[cfg(feature = "trace")]
613 log::trace!(
614 "{}: Updating send window size from {} to {}",
615 self.tag(),
616 orig.window_size,
617 window.window_size
618 );
619 self.0.send_window.set(window);
620 Ok(())
621 }
622
623 pub(crate) fn update_recv_window(
624 &self,
625 upd: i32,
626 ) -> Result<Option<WindowSize>, Error<StreamError>> {
627 let mut window = match upd.cmp(&0) {
628 cmp::Ordering::Less => self.0.recv_window.get().dec(upd.unsigned_abs()), cmp::Ordering::Greater => self
630 .0
631 .recv_window
632 .get()
633 .inc(upd)
634 .map_err(|()| Error::new(StreamError::WindowOverflowed, self.service()))?,
635 cmp::Ordering::Equal => return Ok(None),
636 };
637 if let Some(val) = window.update(
638 self.0.recv_size.get(),
639 self.0.con.config().window_sz,
640 self.0.con.config().window_sz_threshold,
641 ) {
642 self.0.recv_window.set(window);
643 Ok(Some(val))
644 } else {
645 self.0.recv_window.set(window);
646 Ok(None)
647 }
648 }
649
650 pub fn send_response(
652 &self,
653 status: StatusCode,
654 headers: HeaderMap,
655 eof: bool,
656 ) -> Result<(), Error<OperationError>> {
657 self.0.check_error()?;
658
659 match self.0.send.get() {
660 HalfState::Idle => {
661 let pseudo = PseudoHeaders::response(status);
662 let mut hdrs = Headers::new(self.0.id, pseudo, headers, eof);
663
664 if eof {
665 hdrs.set_end_stream();
666 self.0.state_send_close(None);
667 } else {
668 self.0.state_send_payload();
669 }
670 self.0.con.encode(hdrs);
671 Ok(())
672 }
673 HalfState::Payload => Err(Error::new(OperationError::Payload, self.0.con.service())),
674 HalfState::Closed(r) => {
675 Err(Error::new(OperationError::Closed(r), self.0.con.service()))
676 }
677 }
678 }
679
680 pub async fn send_payload<D>(&self, data: D, eof: bool) -> Result<(), Error<OperationError>>
682 where
683 Bytes: From<D>,
684 {
685 self.send_pages(Bytes::from(data), eof).await
686 }
687
688 pub async fn send_pages<D>(&self, data: D, eof: bool) -> Result<(), Error<OperationError>>
690 where
691 StreamData: From<D>,
692 {
693 let mut data = StreamData::from(data);
694
695 match self.0.send.get() {
696 HalfState::Payload => {
697 self.0.check_error()?;
699
700 #[cfg(feature = "trace")]
701 log::trace!(
702 "{}: {:?} sending {} bytes, eof: {eof}, send: {:?}",
703 self.0.tag(),
704 self.0.id,
705 res.len(),
706 self.0.send.get()
707 );
708
709 if eof && data.is_empty() {
711 let mut data = Data::new(self.0.id, Bytes::new());
712 data.set_end_stream();
713 self.0.state_send_close(None);
714
715 self.0.con.encode(data);
717 return Ok(());
718 }
719
720 loop {
721 let win = self.available_send_capacity() as usize;
723 if win > 0 {
724 let size =
725 cmp::min(win, cmp::min(data.len(), self.0.con.remote_frame_size()));
726
727 let empty = self
729 .0
730 .con
731 .encode_data_frame(|buf| {
732 data.encode(self.0.tag(), self.0.id, size, eof, buf)
733 })
734 .map_err(|_| OperationError::Disconnected)
735 .into_error()?;
736
737 if eof && empty {
738 self.0.state_send_close(None);
739 }
740
741 self.0
743 .send_window
744 .set(self.0.send_window.get().dec(size as u32));
745
746 self.0.con.consume_send_window(size as u32);
748
749 if empty {
750 return Ok(());
751 }
752 } else {
753 #[cfg(feature = "trace")]
754 log::trace!(
755 "{}: Not enough sending capacity for {:?} remaining {:?}",
756 self.0.tag(),
757 self.0.id,
758 data.len()
759 );
760 self.send_capacity().await?;
762 }
763 }
764 }
765 HalfState::Idle => Err(Error::new(OperationError::Idle, self.0.con.service())),
766 HalfState::Closed(reason) => Err(Error::new(
767 OperationError::Closed(reason),
768 self.0.con.service(),
769 )),
770 }
771 }
772
773 pub fn send_trailers(&self, map: HeaderMap) {
775 if self.0.send.get() == HalfState::Payload {
776 let mut hdrs = Headers::trailers(self.0.id, map);
777 hdrs.set_end_headers();
778 hdrs.set_end_stream();
779 self.0.con.encode(hdrs);
780 self.0.state_send_close(None);
781 }
782 }
783
784 pub fn available_send_capacity(&self) -> WindowSize {
785 cmp::min(
786 self.0.send_window.get().window_size(),
787 self.0.con.send_window_size(),
788 )
789 }
790
791 pub async fn send_capacity(&self) -> Result<WindowSize, Error<OperationError>> {
792 poll_fn(|cx| self.poll_send_capacity(cx)).await
793 }
794
795 pub fn poll_send_capacity(
797 &self,
798 cx: &Context<'_>,
799 ) -> Poll<Result<WindowSize, Error<OperationError>>> {
800 self.0.check_error()?;
801 self.0.con.check_error()?;
802
803 let win = self.available_send_capacity();
804 if win > 0 {
805 self.0.remove_flag(StreamFlags::WAIT_FOR_CAPACITY);
806 Poll::Ready(Ok(win))
807 } else {
808 self.0.insert_flag(StreamFlags::WAIT_FOR_CAPACITY);
809 self.0.send_cap.register(cx.waker());
810 Poll::Pending
811 }
812 }
813
814 pub fn poll_send_reset(&self, cx: &Context<'_>) -> Poll<Result<(), Error<OperationError>>> {
816 if self.0.send.get().is_closed() {
817 Poll::Ready(Ok(()))
818 } else {
819 self.0.check_error()?;
820 self.0.con.check_error()?;
821 self.0.send_reset.register(cx.waker());
822 Poll::Pending
823 }
824 }
825}
826
827impl PartialEq for StreamRef {
828 fn eq(&self, other: &StreamRef) -> bool {
829 Rc::as_ptr(&self.0) == Rc::as_ptr(&other.0)
830 }
831}
832
impl ops::Deref for Stream {
    type Target = StreamRef;

    /// Exposes the wrapped [`StreamRef`] so its methods can be called on a `Stream`.
    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
841
impl Drop for Stream {
    /// Resets the stream with `Reason::CANCEL`; `reset` itself does nothing
    /// when both halves are already closed. The returned flag is ignored.
    fn drop(&mut self) {
        self.0.reset(Reason::CANCEL);
    }
}
847
848impl fmt::Debug for Stream {
849 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
850 let mut builder = f.debug_struct("Stream");
851 builder
852 .field("stream_id", &self.0.0.id)
853 .field("recv_state", &self.0.0.recv.get())
854 .field("send_state", &self.0.0.send.get())
855 .finish()
856 }
857}
858
859impl fmt::Debug for StreamState {
860 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
861 let mut builder = f.debug_struct("StreamState");
862 builder
863 .field("id", &self.id)
864 .field("recv", &self.recv.get())
865 .field("recv_window", &self.recv_window.get())
866 .field("recv_size", &self.recv_size.get())
867 .field("send", &self.send.get())
868 .field("send_window", &self.send_window.get())
869 .field("flags", &self.flags.get())
870 .finish()
871 }
872}
873
874pub(super) fn parse_u64(src: &[u8]) -> Option<u64> {
875 if src.len() > 19 {
876 None
878 } else {
879 let mut ret = 0;
880 for &d in src {
881 if !d.is_ascii_digit() {
882 return None;
883 }
884
885 ret *= 10;
886 ret += u64::from(d - b'0');
887 }
888
889 Some(ret)
890 }
891}
892
/// Payload accepted by `StreamRef::send_pages`: either a single `Bytes`
/// buffer or a `BytePages` collection.
#[derive(Debug)]
pub struct StreamData(Either<Bytes, BytePages>);
895
896impl From<Bytes> for StreamData {
897 fn from(bytes: Bytes) -> Self {
898 Self(Either::Left(bytes))
899 }
900}
901
902impl From<BytePages> for StreamData {
903 fn from(bytes: BytePages) -> Self {
904 Self(Either::Right(bytes))
905 }
906}
907
908impl StreamData {
909 fn len(&self) -> usize {
910 match &self.0 {
911 Either::Left(b) => b.len(),
912 Either::Right(b) => b.len(),
913 }
914 }
915
916 fn is_empty(&self) -> bool {
917 match &self.0 {
918 Either::Left(b) => b.is_empty(),
919 Either::Right(b) => b.is_empty(),
920 }
921 }
922
923 fn encode(
924 &mut self,
925 _tag: &'static str,
926 id: StreamId,
927 mut size: usize,
928 eof: bool,
929 dst: &mut BytePages,
930 ) -> bool {
931 #[cfg(feature = "trace")]
932 log::trace!("{_tag}: {id:?} sending {size} out of {} bytes", self.len());
933
934 if size >= self.len() {
935 Head::new(Kind::Data, if eof { 0x1 } else { 0 }, id).encode(size, dst);
936 match &mut self.0 {
937 Either::Left(b) => {
938 dst.append(b.clone());
939 *b = Bytes::new();
940 }
941 Either::Right(pages) => pages.move_to(dst),
942 }
943 true
944 } else {
945 Head::new(Kind::Data, 0, id).encode(size, dst);
946
947 match &mut self.0 {
948 Either::Left(b) => dst.append(b.split_to(size)),
949 Either::Right(pages) => {
950 while let Some(mut page) = pages.take() {
951 let len = cmp::min(size, page.len());
952 size -= len;
953 if page.len() == len {
954 dst.append(page);
955 } else {
956 dst.append(page.split_to(len));
957 pages.prepend(page);
958 break;
959 }
960 }
961 }
962 }
963 false
964 }
965 }
966}