use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::Bytes;
use h2::Reason;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::other;
pub struct Stream {
pub send_stream: SendStream,
pub recv_stream: RecvStream,
}
impl Stream {
pub fn new(s1: h2::SendStream<Bytes>, s2: h2::RecvStream) -> Stream {
Stream {
send_stream: SendStream::new(s1),
recv_stream: RecvStream::new(s2),
}
}
}
pub struct RecvStream {
inner: h2::RecvStream,
buf: Bytes,
read_closed: bool,
}
impl RecvStream {
fn new(s: h2::RecvStream) -> RecvStream {
RecvStream {
inner: s,
buf: Bytes::new(),
read_closed: false,
}
}
pub fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Option<io::Result<Bytes>>> {
self.inner
.poll_data(cx)
.map_err(|e| other(&format!("poll_data err: {:?}", e.to_string())))
}
fn release_capacity(&mut self, size: usize) -> io::Result<()> {
self.inner
.flow_control()
.release_capacity(size)
.map_err(|e| other(&e.to_string()))
}
}
pub struct SendStream {
inner: h2::SendStream<Bytes>,
}
impl SendStream {
fn new(s: h2::SendStream<Bytes>) -> SendStream {
SendStream { inner: s }
}
pub fn send_data(&mut self, data: Bytes, end_of_stream: bool) -> io::Result<()> {
self.inner.send_data(data, end_of_stream).map_err(|e| {
other(&format!(
"poll_write err: {:?}, end_of_stream {}",
e.to_string(),
end_of_stream
))
})
}
pub fn reserve_capacity(&mut self, capacity: usize) {
self.inner.reserve_capacity(capacity);
}
pub fn send_reset(&mut self, reason: Reason) {
self.inner.send_reset(reason);
}
}
impl AsyncRead for Stream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
loop {
if !self.recv_stream.buf.is_empty() {
let pos = self.recv_stream.buf.len().min(buf.remaining());
buf.put_slice(&self.recv_stream.buf.split_to(pos));
return Poll::Ready(Ok(()));
}
if !self.recv_stream.read_closed {
if let Some(data) = ready!(self.recv_stream.poll_data(cx)) {
let data = data?;
self.recv_stream.release_capacity(data.len())?;
self.recv_stream.buf = data;
} else {
self.recv_stream.read_closed = self.recv_stream.inner.is_end_stream();
}
}
if self.recv_stream.read_closed && self.recv_stream.buf.is_empty() {
return Poll::Ready(Ok(()));
}
}
}
}
impl AsyncWrite for Stream {
fn poll_write(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
self.send_stream.reserve_capacity(buf.len());
self.send_stream
.send_data(Bytes::copy_from_slice(buf), false)?;
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.send_stream.send_data(Bytes::new(), true)?;
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
}