wmhttp/http2/codec/
framed_write.rs1use std::{
14 io,
15 pin::Pin,
16 task::{ready, Context, Poll},
17};
18
19use algorithm::buf::{BinaryMut, Bt};
20use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
21use webparse::http::http2::{FrameSize, DEFAULT_MAX_FRAME_SIZE};
22
23#[derive(Debug)]
24pub struct FramedWrite<T> {
25 inner: T,
27
28 binary: BinaryMut,
29
30 max_frame_size: FrameSize,
31}
32
33impl<T> FramedWrite<T>
34where
35 T: AsyncRead + AsyncWrite + Unpin,
36{
37 pub fn new(io: T) -> Self {
38 Self {
39 inner: io,
40 binary: BinaryMut::new(),
41 max_frame_size: DEFAULT_MAX_FRAME_SIZE,
42 }
43 }
44
45 pub fn into_io(self) -> T {
46 self.inner
47 }
48
49 pub fn get_mut(&mut self) -> &mut T {
50 &mut self.inner
51 }
52
53 pub fn get_mut_bytes(&mut self) -> &mut BinaryMut {
54 &mut self.binary
55 }
56
57 pub fn get_bytes(&self) -> &BinaryMut {
58 &self.binary
59 }
60
61 pub fn has_capacity(&self) -> bool {
62 self.binary.remaining() < self.max_frame_size as usize
63 }
64
65 pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
66 if !self.has_capacity() {
67 ready!(self.flush(cx))?;
69
70 if !self.has_capacity() {
71 return Poll::Pending;
72 }
73 }
74
75 Poll::Ready(Ok(()))
76 }
77
78 pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
79 let span = tracing::trace_span!("FramedWrite::flush");
80 let _e = span.enter();
81 if !self.binary.has_remaining() {
82 return Poll::Ready(Ok(()));
83 }
84
85 let n = ready!(Pin::new(&mut self.inner).poll_write(cx, self.binary.chunk()))?;
86 self.binary.advance(n);
87 if self.binary.remaining() == 0 && self.binary.cursor() > 10 * self.max_frame_size as usize
88 {
89 self.binary = BinaryMut::new();
90 }
91 Poll::Ready(Ok(()))
92 }
93
94 pub fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
95 ready!(self.flush(cx))?;
96 Pin::new(&mut self.inner).poll_shutdown(cx)
97 }
98
99 pub fn set_cache_buf(&mut self, write_buf: BinaryMut) {
100 self.binary.put_slice(write_buf.chunk());
101 }
102
103 pub fn is_write_end(&self) -> bool {
104 self.binary.is_empty()
105 }
106}
107
108impl<T: AsyncRead + Unpin> AsyncRead for FramedWrite<T> {
109 fn poll_read(
110 mut self: Pin<&mut Self>,
111 cx: &mut Context<'_>,
112 buf: &mut ReadBuf,
113 ) -> Poll<io::Result<()>> {
114 Pin::new(&mut self.inner).poll_read(cx, buf)
115 }
116}