use super::{ClosedReason, H2Driver};
use crate::h2::{H2ErrorCode, H2Settings, frame};
use futures_lite::io::{AsyncRead, AsyncWrite};
use std::{
io,
pin::Pin,
task::{Context, Poll, ready},
};
impl<T> H2Driver<T>
where
T: AsyncRead + AsyncWrite + Unpin + Send,
{
pub(super) fn poll_flush_outbound(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
while self.write_cursor < self.write_buf.len() {
let n = ready!(
Pin::new(&mut self.transport).poll_write(cx, &self.write_buf[self.write_cursor..])
)?;
if n == 0 {
return Poll::Ready(Err(io::Error::from(io::ErrorKind::WriteZero)));
}
self.write_cursor += n;
}
self.write_buf.clear();
self.write_cursor = 0;
if self.write_flush_pending {
ready!(Pin::new(&mut self.transport).poll_flush(cx))?;
self.write_flush_pending = false;
}
Poll::Ready(Ok(()))
}
pub(super) fn queue_frame(
&mut self,
max_len: usize,
encode: impl FnOnce(&mut [u8]) -> Option<usize>,
) {
let start = self.write_buf.len();
self.write_buf.resize(start + max_len, 0);
let n = encode(&mut self.write_buf[start..]).expect("buffer sized from max_len");
self.write_buf.truncate(start + n);
self.write_flush_pending = true;
}
pub(super) fn queue_settings(&mut self) {
let settings = H2Settings::from_config(self.connection.context().config());
self.queue_frame(frame::settings::encoded_len(&settings), |buf| {
frame::settings::encode(&settings, buf)
});
}
pub(super) fn queue_client_preface(&mut self) {
self.write_buf
.extend_from_slice(super::recv::CLIENT_PREFACE);
self.write_flush_pending = true;
}
pub(super) fn queue_settings_ack(&mut self) {
self.queue_frame(
frame::settings::ACK_ENCODED_LEN,
frame::settings::encode_ack,
);
}
pub(super) fn queue_ping_ack(&mut self, opaque_data: [u8; 8]) {
self.queue_frame(frame::ping::ENCODED_LEN, |buf| {
frame::ping::encode(opaque_data, true, buf)
});
}
pub(super) fn queue_active_ping(&mut self, opaque_data: [u8; 8]) {
self.queue_frame(frame::ping::ENCODED_LEN, |buf| {
frame::ping::encode(opaque_data, false, buf)
});
}
pub(super) fn queue_window_update(&mut self, stream_id: u32, increment: u32) {
self.queue_frame(frame::window_update::ENCODED_LEN, |buf| {
frame::window_update::encode(stream_id, increment, buf)
});
}
pub(super) fn queue_goaway(&mut self, last_stream_id: u32, code: H2ErrorCode) {
self.queue_frame(frame::goaway::encoded_len(0), |buf| {
frame::goaway::encode(last_stream_id, code, &[], buf)
});
}
pub(super) fn queue_rst_stream(&mut self, stream_id: u32, code: H2ErrorCode) {
self.queue_frame(frame::rst_stream::ENCODED_LEN, |buf| {
frame::rst_stream::encode(stream_id, code, buf)
});
self.closed_streams.record(stream_id, ClosedReason::Reset);
}
}