dianmeng/protocol/http2/codec/
mod.rs1mod error;
2mod framed_read;
3mod framed_write;
4
5use std::io;
6use std::pin::Pin;
7use std::sync::{Arc, RwLock};
8use std::task::{Context, Poll};use tokio::io::{Interest, Ready, AsyncReadExt};
9
10use futures_core::Stream;
11use tokio::io::{AsyncRead, AsyncWrite};
12use tokio_util::codec::length_delimited;
13use webparse::BinaryMut;
14use webparse::http::http2::encoder::Encoder;
15use webparse::http::http2::frame::Frame;
16use webparse::http::http2::{HeaderIndex, DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, MAX_MAX_FRAME_SIZE};
17
18use crate::ProtResult;
19
20pub use self::framed_read::FramedRead;
21pub use self::framed_write::FramedWrite;
22
23
24#[derive(Debug)]
25pub struct Codec<T> {
26 inner: FramedRead<FramedWrite<T>>,
27 header_index: Arc<RwLock<HeaderIndex>>,
28 header_table_size: usize,
29 max_send_frame_size: usize,
30}
31
32impl<T> Codec<T>
33where
34 T: AsyncRead + AsyncWrite + Unpin,
35{
36 #[inline]
38 pub fn new(io: T) -> Self {
39 Self::with_max_recv_frame_size(io, DEFAULT_MAX_FRAME_SIZE as usize)
40 }
41
42 pub fn with_max_recv_frame_size(io: T, max_frame_size: usize) -> Self {
44 let framed_write = FramedWrite::new(io);
46
47 let delimited = length_delimited::Builder::new()
49 .big_endian()
50 .length_field_length(3)
51 .length_adjustment(9)
52 .num_skip(0) .new_read(framed_write);
54
55 let mut inner = FramedRead::new(delimited);
56
57 Codec {
61 inner,
62 header_index: Arc::new(RwLock::new(HeaderIndex::new())),
63 header_table_size: DEFAULT_SETTINGS_HEADER_TABLE_SIZE,
64 max_send_frame_size: MAX_MAX_FRAME_SIZE as usize,
65 }
66 }
67
68 pub fn get_mut(&mut self) -> &mut T {
69 self.inner.get_mut().get_mut()
70 }
71
72 pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
78 self.framed_write().poll_ready(cx)
79 }
80
81 pub fn poll_flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
83 self.framed_write().flush(cx)
84 }
85
86 fn framed_write(&mut self) -> &mut FramedWrite<T> {
87 self.inner.get_mut()
88 }
89
90 pub fn send_frame(&mut self, frame: Frame) -> ProtResult<usize> {
91 let mut encoder = Encoder::new_index(self.header_index.clone(), self.max_send_frame_size);
92 let usize = frame.encode(self.framed_write().get_bytes(), &mut encoder)?;
93 Ok(usize)
94 }
95
96 pub fn set_send_header_table_size(&mut self, size: usize) {
97 self.header_table_size = size;
98 }
99
100 pub fn set_max_send_frame_size(&mut self, size: usize) {
101 self.max_send_frame_size = size;
102 }
103
104 pub fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
105 self.framed_write().shutdown(cx)
106 }
107
108 pub fn set_cache_buf(&mut self, read_buf: BinaryMut, write_buf: BinaryMut) {
109 self.inner.set_cache_buf(read_buf);
110 self.framed_write().set_cache_buf(write_buf);
111 }
112}
113
114impl<T> Stream for Codec<T>
115where
116 T: AsyncRead + Unpin,
117{
118 type Item = ProtResult<Frame>;
119
120 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
121 Pin::new(&mut self.inner).poll_next(cx)
122 }
123}