use core::{
mem,
pin::Pin,
task::{Context, Poll},
};
use crate::{
body::{Body, Frame, SizeHint},
bytes::Bytes,
error::BodyError,
};
use super::{
dispatcher::{Message, Shared},
proto::frame::{settings, stream_id::StreamId},
};
/// Request body of a single stream, read out of the state shared with the dispatcher.
///
/// Frames are pulled from the stream's receive queue when the body is polled, and the
/// number of data bytes handed to the consumer is tracked so it can be reported back
/// through window updates.
pub struct RequestBody {
    /// Identifier of the stream this body belongs to; key into the shared stream map.
    stream_id: StreamId,
    /// Size hint supplied at construction and returned unchanged by `size_hint`.
    size: SizeHint,
    /// Handle to the state shared with the dispatcher; borrowed on every access.
    ctx: Shared,
    /// Data bytes already yielded to the consumer but not yet reported via a window update.
    pending_window: usize,
}
impl RequestBody {
    /// Creates the body for stream `stream_id`, backed by the shared state `ctx`.
    ///
    /// `size` is stored as-is and later reported by `size_hint`. The counter of
    /// consumed-but-unreported bytes starts at zero.
    pub(super) fn new(stream_id: StreamId, size: SizeHint, ctx: Shared) -> Self {
        // Nothing has been consumed yet, so there is nothing to report.
        let pending_window = 0;

        Self {
            stream_id,
            size,
            ctx,
            pending_window,
        }
    }
}
impl Drop for RequestBody {
    /// Tells the shared state that this body is gone and hands over whatever consumed
    /// byte count has not been reported yet.
    fn drop(&mut self) {
        // Zero the counter and keep the leftover amount (which may be zero).
        let unreported = mem::take(&mut self.pending_window);

        let mut guard = self.ctx.borrow_mut();
        let shared = &mut *guard;

        // Notify the shared state first, then queue the remaining window update.
        shared.request_body_drop(&self.stream_id);
        shared.queue.push_window_update(unreported);
    }
}
impl Body for RequestBody {
    type Data = Bytes;
    type Error = BodyError;

    /// Yields the next frame queued for this stream.
    ///
    /// Outcomes, checked in this order:
    /// - a frame is queued: it is returned; data frames also add their length to the
    ///   pending window counter, which is flushed as window updates once it reaches
    ///   three quarters of `settings::DEFAULT_INITIAL_WINDOW_SIZE`;
    /// - the stream has an error stored: the error is taken and returned;
    /// - the stream reports end-of-file: `None` is returned;
    /// - otherwise the task's waker is stored on the stream and `Pending` is returned.
    ///
    /// # Panics
    ///
    /// Panics if the shared stream map has no entry for this body's stream id.
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Bytes>, BodyError>>> {
        let body = self.get_mut();
        let mut guard = body.ctx.borrow_mut();
        let state = &mut *guard;
        let stream = state.stream_map.get_mut(&body.stream_id).unwrap();

        match stream.recv.queue.pop_front(&mut state.frame_buf) {
            Some(frame) => {
                if let Some(bytes) = frame.data_ref() {
                    body.pending_window += bytes.len();

                    let threshold = settings::DEFAULT_INITIAL_WINDOW_SIZE as usize * 3 / 4;
                    if body.pending_window >= threshold {
                        // Flush the accumulated amount in one go and restart the counter.
                        let refund = mem::take(&mut body.pending_window);
                        stream.recv.window += refund;
                        state.queue.push_window_update(refund);
                        state.queue.messages.push_back(Message::WindowUpdate {
                            stream_id: body.stream_id,
                            size: refund,
                        });
                    }
                }

                Poll::Ready(Some(Ok(frame)))
            }
            None => {
                if let Some(err) = stream.recv.take_error() {
                    return Poll::Ready(Some(Err(err.into())));
                }

                if stream.recv.is_eof() {
                    return Poll::Ready(None);
                }

                // Nothing available yet: remember who to wake when that changes.
                stream.recv.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }

    /// Returns `true` once the stream reports end-of-file and no frames remain queued.
    ///
    /// # Panics
    ///
    /// Panics if the shared stream map has no entry for this body's stream id.
    fn is_end_stream(&self) -> bool {
        let state = self.ctx.borrow();
        let stream = state.stream_map.get(&self.stream_id).unwrap();

        if !stream.recv.is_eof() {
            return false;
        }

        stream.recv.queue.is_empty()
    }

    /// Returns the size hint given at construction.
    #[inline]
    fn size_hint(&self) -> SizeHint {
        self.size
    }
}
impl From<RequestBody> for crate::body::RequestBody {
    /// Wraps this module's request body in the `H2` variant of the crate-level body type.
    fn from(h2_body: RequestBody) -> Self {
        Self::H2(h2_body)
    }
}