dianmeng/protocol/http2/codec/
mod.rs

1mod error;
2mod framed_read;
3mod framed_write;
4
5use std::io;
6use std::pin::Pin;
7use std::sync::{Arc, RwLock};
8use std::task::{Context, Poll};use tokio::io::{Interest, Ready, AsyncReadExt};
9
10use futures_core::Stream;
11use tokio::io::{AsyncRead, AsyncWrite};
12use tokio_util::codec::length_delimited;
13use webparse::BinaryMut;
14use webparse::http::http2::encoder::Encoder;
15use webparse::http::http2::frame::Frame;
16use webparse::http::http2::{HeaderIndex, DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, MAX_MAX_FRAME_SIZE};
17
18use crate::ProtResult;
19
20pub use self::framed_read::FramedRead;
21pub use self::framed_write::FramedWrite;
22
23
24#[derive(Debug)]
25pub struct Codec<T> {
26    inner: FramedRead<FramedWrite<T>>,
27    header_index: Arc<RwLock<HeaderIndex>>,
28    header_table_size: usize,
29    max_send_frame_size: usize,
30}
31
32impl<T> Codec<T>
33where
34    T: AsyncRead + AsyncWrite + Unpin,
35{
36    /// Returns a new `Codec` with the default max frame size
37    #[inline]
38    pub fn new(io: T) -> Self {
39        Self::with_max_recv_frame_size(io, DEFAULT_MAX_FRAME_SIZE as usize)
40    }
41
42    /// Returns a new `Codec` with the given maximum frame size
43    pub fn with_max_recv_frame_size(io: T, max_frame_size: usize) -> Self {
44        // Wrap with writer
45        let framed_write = FramedWrite::new(io);
46
47        // Delimit the frames
48        let delimited = length_delimited::Builder::new()
49            .big_endian()
50            .length_field_length(3)
51            .length_adjustment(9)
52            .num_skip(0) // Don't skip the header
53            .new_read(framed_write);
54
55        let mut inner = FramedRead::new(delimited);
56
57        // Use FramedRead's method since it checks the value is within range.
58        // inner.set_max_frame_size(max_frame_size);
59
60        Codec {
61            inner,
62            header_index: Arc::new(RwLock::new(HeaderIndex::new())),
63            header_table_size: DEFAULT_SETTINGS_HEADER_TABLE_SIZE,
64            max_send_frame_size: MAX_MAX_FRAME_SIZE as usize,
65        }
66    }
67
68    pub fn get_mut(&mut self) -> &mut T {
69        self.inner.get_mut().get_mut()
70    }
71
72    // pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
73    //     // self.get_mut().read_exact(buf)
74    // }
75
76    /// Returns `Ready` when the codec can buffer a frame
77    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
78        self.framed_write().poll_ready(cx)
79    }
80
81    /// Returns `Ready` when the codec can buffer a frame
82    pub fn poll_flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
83        self.framed_write().flush(cx)
84    }
85
86    fn framed_write(&mut self) -> &mut FramedWrite<T> {
87        self.inner.get_mut()
88    }
89
90    pub fn send_frame(&mut self, frame: Frame) -> ProtResult<usize> {
91        let mut encoder = Encoder::new_index(self.header_index.clone(), self.max_send_frame_size);
92        let usize = frame.encode(self.framed_write().get_bytes(), &mut encoder)?;
93        Ok(usize)
94    }
95
96    pub fn set_send_header_table_size(&mut self, size: usize) {
97        self.header_table_size = size;
98    }
99    
100    pub fn set_max_send_frame_size(&mut self, size: usize) {
101        self.max_send_frame_size = size;
102    }
103
104    pub fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
105        self.framed_write().shutdown(cx)
106    }
107
108    pub fn set_cache_buf(&mut self, read_buf: BinaryMut, write_buf: BinaryMut) {
109        self.inner.set_cache_buf(read_buf);
110        self.framed_write().set_cache_buf(write_buf);
111    }
112}
113
114impl<T> Stream for Codec<T>
115where
116    T: AsyncRead + Unpin,
117{
118    type Item = ProtResult<Frame>;
119
120    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
121        Pin::new(&mut self.inner).poll_next(cx)
122    }
123}