use crate::{
http::{MsgBufferString, MsgData, Request, StatusCode, u31::U31},
http2::{
CommonStream, Http2Buffer, Http2Inner, Http2RecvStatus, Http2SendStatus,
hpack_static_headers::{HpackStaticRequestHeaders, HpackStaticResponseHeaders},
misc::{manage_recurrent_receiving_of_overall_stream, process_higher_operation_err},
stream_receiver::StreamOverallRecvParams,
stream_state::StreamState,
window::Windows,
write_functions::send_msg,
},
misc::{Lease, LeaseMut, span::Span},
stream::StreamWriter,
sync::Arc,
};
use core::{future::poll_fn, pin::pin, task::Waker};
#[derive(Debug)]
pub struct ClientStream<HB, SW> {
inner: Arc<Http2Inner<HB, SW, true>>,
linger: bool,
span: Span,
stream_id: U31,
windows: Windows,
}
impl<HB, SW> ClientStream<HB, SW> {
pub(crate) const fn new(
inner: Arc<Http2Inner<HB, SW, true>>,
linger: bool,
span: Span,
stream_id: U31,
) -> Self {
Self { inner, linger, span, stream_id, windows: Windows::new() }
}
}
impl<HB, SW> ClientStream<HB, SW>
where
HB: LeaseMut<Http2Buffer>,
SW: StreamWriter,
{
#[inline]
pub const fn common(&mut self) -> CommonStream<'_, HB, SW, true> {
let Self { inner, linger, span, stream_id, windows: _ } = self;
CommonStream { inner, linger: *linger, span, stream_id: *stream_id }
}
#[inline]
pub async fn recv_res(
&mut self,
) -> crate::Result<(Http2RecvStatus<StatusCode, ()>, MsgBufferString)> {
let Self { inner, linger: _, span, stream_id, windows: _ } = self;
let _e = span.enter();
_trace!("Receiving response");
let mut lock_pin = pin!(inner.hd.lock());
let rslt = poll_fn(|cx| {
let mut lock = lock_pin!(cx, inner.hd, lock_pin);
let hdpm = lock.parts_mut();
manage_recurrent_receiving_of_overall_stream(
cx,
hdpm,
&inner.is_conn_open,
*stream_id,
|_, status_code, _, _| status_code,
)
})
.await;
if let Err(err) = &rslt {
process_higher_operation_err(err, inner).await;
}
rslt
}
#[inline]
pub async fn send_req<MD>(&mut self, req: Request<MD>) -> crate::Result<Http2SendStatus>
where
MD: MsgData,
MD::Body: Lease<[u8]>,
{
let Self { inner, linger: _, span, stream_id, windows } = self;
let _e = span.enter();
_trace!("Sending request");
let uri = req.msg_data.uri();
send_msg::<_, _, true>(
req.msg_data.body().lease(),
req.msg_data.headers(),
inner,
(
HpackStaticRequestHeaders {
authority: uri.authority(),
method: Some(req.method),
path: uri.relative_reference_slash(),
protocol: None,
scheme: uri.scheme(),
},
HpackStaticResponseHeaders::EMPTY,
),
*stream_id,
|hdpm| {
if let Some(scrp) = hdpm.hb.scrps.remove(stream_id) {
*windows = scrp.windows;
}
drop(hdpm.hb.sorps.insert(
*stream_id,
StreamOverallRecvParams {
body_len: 0,
content_length: None,
has_initial_header: false,
has_one_or_more_data_frames: false,
is_stream_open: true,
msg_buffer: MsgBufferString::default(),
status_code: StatusCode::Ok,
stream_state: StreamState::HalfClosedLocal,
waker: Waker::noop().clone(),
windows: *windows,
},
));
},
)
.await
}
}