wmhttp/http2/codec/
framed_write.rs

// Copyright 2022 - 2023 Wenmeng. See the COPYRIGHT
// file at the top-level directory of this distribution.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//
// Author: tickbh
// -----
// Created Date: 2023/09/14 09:42:25
12
13use std::{
14    io,
15    pin::Pin,
16    task::{ready, Context, Poll},
17};
18
19use algorithm::buf::{BinaryMut, Bt};
20use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
21use webparse::http::http2::{FrameSize, DEFAULT_MAX_FRAME_SIZE};
22
23#[derive(Debug)]
24pub struct FramedWrite<T> {
25    /// Upstream `AsyncWrite`
26    inner: T,
27
28    binary: BinaryMut,
29
30    max_frame_size: FrameSize,
31}
32
33impl<T> FramedWrite<T>
34where
35    T: AsyncRead + AsyncWrite + Unpin,
36{
37    pub fn new(io: T) -> Self {
38        Self {
39            inner: io,
40            binary: BinaryMut::new(),
41            max_frame_size: DEFAULT_MAX_FRAME_SIZE,
42        }
43    }
44
45    pub fn into_io(self) -> T {
46        self.inner
47    }
48
49    pub fn get_mut(&mut self) -> &mut T {
50        &mut self.inner
51    }
52
53    pub fn get_mut_bytes(&mut self) -> &mut BinaryMut {
54        &mut self.binary
55    }
56
57    pub fn get_bytes(&self) -> &BinaryMut {
58        &self.binary
59    }
60
61    pub fn has_capacity(&self) -> bool {
62        self.binary.remaining() < self.max_frame_size as usize
63    }
64
65    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
66        if !self.has_capacity() {
67            // Try flushing
68            ready!(self.flush(cx))?;
69
70            if !self.has_capacity() {
71                return Poll::Pending;
72            }
73        }
74
75        Poll::Ready(Ok(()))
76    }
77
78    pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
79        let span = tracing::trace_span!("FramedWrite::flush");
80        let _e = span.enter();
81        if !self.binary.has_remaining() {
82            return Poll::Ready(Ok(()));
83        }
84
85        let n = ready!(Pin::new(&mut self.inner).poll_write(cx, self.binary.chunk()))?;
86        self.binary.advance(n);
87        if self.binary.remaining() == 0 && self.binary.cursor() > 10 * self.max_frame_size as usize
88        {
89            self.binary = BinaryMut::new();
90        }
91        Poll::Ready(Ok(()))
92    }
93
94    pub fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
95        ready!(self.flush(cx))?;
96        Pin::new(&mut self.inner).poll_shutdown(cx)
97    }
98
99    pub fn set_cache_buf(&mut self, write_buf: BinaryMut) {
100        self.binary.put_slice(write_buf.chunk());
101    }
102
103    pub fn is_write_end(&self) -> bool {
104        self.binary.is_empty()
105    }
106}
107
108impl<T: AsyncRead + Unpin> AsyncRead for FramedWrite<T> {
109    fn poll_read(
110        mut self: Pin<&mut Self>,
111        cx: &mut Context<'_>,
112        buf: &mut ReadBuf,
113    ) -> Poll<io::Result<()>> {
114        Pin::new(&mut self.inner).poll_read(cx, buf)
115    }
116}