use crate::{
collection::Vector,
http::{ReqResBuffer, ReqResData, Request, StatusCode},
http2::{
CommonStream, Http2Buffer, Http2Inner, Http2RecvStatus, Http2SendStatus,
hpack_static_headers::{HpackStaticRequestHeaders, HpackStaticResponseHeaders},
misc::{
connection_state, frame_reader_rslt, manage_recurrent_receiving_of_overall_stream,
process_higher_operation_err,
},
stream_receiver::StreamOverallRecvParams,
stream_state::StreamState,
u31::U31,
window::Windows,
write_functions::send_msg,
},
misc::{Lease, LeaseMut, span::Span},
stream::StreamWriter,
sync::Arc,
};
use core::{future::poll_fn, pin::pin, task::Poll};
#[derive(Debug)]
pub struct ClientStream<HB, SW> {
inner: Arc<Http2Inner<HB, SW, true>>,
span: Span,
stream_id: U31,
windows: Windows,
}
impl<HB, SW> ClientStream<HB, SW> {
pub(crate) const fn new(
inner: Arc<Http2Inner<HB, SW, true>>,
span: Span,
stream_id: U31,
) -> Self {
Self { inner, span, stream_id, windows: Windows::new() }
}
}
impl<HB, SW> ClientStream<HB, SW>
where
HB: LeaseMut<Http2Buffer>,
SW: StreamWriter,
{
#[inline]
pub const fn common(&mut self) -> CommonStream<'_, HB, SW, true> {
let Self { inner, span, stream_id, windows: _ } = self;
CommonStream { inner, span, stream_id: *stream_id }
}
#[inline]
pub async fn recv_res(
&mut self,
mut rrb: ReqResBuffer,
) -> crate::Result<(Http2RecvStatus<StatusCode, ()>, ReqResBuffer)> {
rrb.clear();
let Self { inner, span, stream_id, windows } = self;
let rrb_opt = &mut Some(rrb);
let _e = span.enter();
_trace!("Receiving response");
let mut lock_pin = pin!(inner.hd.lock());
let rslt = poll_fn(|cx| {
let mut lock = lock_pin!(cx, inner.hd, lock_pin);
let hdpm = lock.parts_mut();
if let Some(elem) = rrb_opt.take() {
if connection_state(&inner.is_conn_open).is_closed() {
frame_reader_rslt(hdpm.frame_reader_error)?;
return Poll::Ready(Ok((Http2RecvStatus::ClosedConnection, elem)));
}
drop(hdpm.hb.sorps.insert(
*stream_id,
StreamOverallRecvParams {
body_len: 0,
content_length: None,
has_initial_header: false,
has_one_or_more_data_frames: false,
is_stream_open: true,
rrb: elem,
status_code: StatusCode::Ok,
stream_state: StreamState::HalfClosedLocal,
waker: cx.waker().clone(),
windows: *windows,
},
));
Poll::Pending
} else {
manage_recurrent_receiving_of_overall_stream(
cx,
hdpm,
&inner.is_conn_open,
*stream_id,
|_, status_code, _, _| status_code,
)
}
})
.await;
if let Err(err) = &rslt {
process_higher_operation_err(err, inner).await;
}
rslt
}
#[inline]
pub async fn send_req<RRD>(
&mut self,
enc_buffer: &mut Vector<u8>,
req: Request<RRD>,
) -> crate::Result<Http2SendStatus>
where
RRD: ReqResData,
RRD::Body: Lease<[u8]>,
{
let Self { inner, span, stream_id, windows } = self;
let _e = span.enter();
_trace!("Sending request");
let uri = req.rrd.uri();
send_msg::<_, _, true>(
req.rrd.body().lease(),
enc_buffer,
req.rrd.headers(),
inner,
(
HpackStaticRequestHeaders {
authority: uri.authority(),
method: Some(req.method),
path: uri.relative_reference_slash(),
protocol: None,
scheme: uri.scheme(),
},
HpackStaticResponseHeaders::EMPTY,
),
*stream_id,
|hdpm| {
if let Some(scrp) = hdpm.hb.scrps.remove(stream_id) {
*windows = scrp.windows;
}
},
)
.await
}
}