use crate::{
http::{Method, MsgBufferString, MsgData, Protocol, Response, u31::U31},
http2::{
CommonStream, Http2Buffer, Http2Inner, Http2RecvStatus, Http2SendStatus,
hpack_static_headers::{HpackStaticRequestHeaders, HpackStaticResponseHeaders},
misc::{manage_recurrent_receiving_of_overall_stream, process_higher_operation_err},
stream_receiver::StreamControlRecvParams,
write_functions::send_msg,
},
misc::{Lease, LeaseMut, SingleTypeStorage, span::Span},
stream::StreamWriter,
sync::Arc,
};
use core::{future::poll_fn, pin::pin, task::Waker};
/// Handle to a single HTTP/2 stream from which a request is received (`recv_req`) and to which
/// a response is sent (`send_res`).
///
/// `HB` and `SW` are forwarded to [`Http2Inner`]; the inherent methods require
/// `HB: LeaseMut<Http2Buffer>` and `SW: StreamWriter`.
#[derive(Debug)]
pub struct ServerStream<HB, SW> {
// Shared connection state; cloning the stream clones this `Arc`, not the state itself.
// NOTE(review): the `false` const argument presumably selects the server role -- confirm
// against the `Http2Inner` definition.
inner: Arc<Http2Inner<HB, SW, false>>,
// Forwarded unchanged into `CommonStream` by `common`; its exact semantics are defined there.
linger: bool,
// Request method supplied at construction and exposed through `method`.
method: Method,
// Optional protocol supplied at construction and exposed through `protocol`.
protocol: Option<Protocol>,
// Span entered for the duration of `recv_req` and `send_res`.
span: Span,
// Identifier of this stream within the connection.
stream_id: U31,
}
impl<HB, SW> ServerStream<HB, SW>
where
  HB: LeaseMut<Http2Buffer>,
  SW: StreamWriter,
{
  /// Builds a stream handle from its already-established parts.
  ///
  /// Performs no I/O and no validation: every argument is stored exactly as given.
  pub(crate) const fn new(
    inner: Arc<Http2Inner<HB, SW, false>>,
    linger: bool,
    method: Method,
    protocol: Option<Protocol>,
    span: Span,
    stream_id: U31,
  ) -> Self {
    Self { inner, linger, method, protocol, span, stream_id }
  }

  /// Returns a [`CommonStream`] view over this stream.
  ///
  /// The view borrows the shared connection state and the span, and carries copies of the
  /// `linger` flag and the stream identifier.
  #[inline]
  pub const fn common(&mut self) -> CommonStream<'_, HB, SW, false> {
    let Self { inner, linger, method: _, protocol: _, span, stream_id } = self;
    CommonStream { inner, linger: *linger, span, stream_id: *stream_id }
  }

  /// Method that was supplied when this stream was created.
  #[inline]
  pub const fn method(&self) -> Method {
    self.method
  }

  /// Protocol that was supplied when this stream was created, if any.
  #[inline]
  pub const fn protocol(&self) -> Option<Protocol> {
    self.protocol
  }

  /// Receives the request associated with this stream.
  ///
  /// Repeatedly polls `manage_recurrent_receiving_of_overall_stream` under the connection lock
  /// until it resolves, then returns whatever it produced.
  ///
  /// # Errors
  ///
  /// Any error produced while receiving is first handed to `process_higher_operation_err` and
  /// then returned to the caller.
  #[inline]
  pub async fn recv_req(&mut self) -> crate::Result<(Http2RecvStatus<(), ()>, MsgBufferString)> {
    let Self { inner, linger: _, method: _, protocol: _, span, stream_id } = self;
    let _e = span.enter();
    _trace!("Receiving request");
    // The lock future lives in its own scope so that it is dropped before the error handling
    // below awaits on `inner` again.
    let rslt = {
      let mut lock_pin = pin!(inner.hd.lock());
      poll_fn(|cx| {
        let mut lock = lock_pin!(cx, inner.hd, lock_pin);
        manage_recurrent_receiving_of_overall_stream(
          cx,
          lock.parts_mut(),
          &inner.is_conn_open,
          *stream_id,
          // Stores the receive-control parameters for this stream with a no-op waker.
          // Whatever `insert` returns (presumably the previous entry) is discarded.
          |hdpm, _, stream_state, windows| {
            drop(hdpm.hb.scrps.insert(
              *stream_id,
              StreamControlRecvParams {
                is_stream_open: true,
                stream_state,
                waker: Waker::noop().clone(),
                windows,
              },
            ));
          },
        )
      })
      .await
    };
    if let Err(err) = &rslt {
      process_higher_operation_err(err, inner).await;
    }
    rslt
  }

  /// Sends `res` (status code, headers and body) to the peer through `send_msg`.
  ///
  /// The status reported by `send_msg` is returned untouched. In particular,
  /// [`Http2SendStatus::ClosedConnection`] reaches the caller instead of being reported as
  /// [`Http2SendStatus::Ok`], so a response that could not be delivered is never mistaken for a
  /// delivered one.
  ///
  /// # Errors
  ///
  /// Propagates any error returned by `send_msg`.
  #[inline]
  pub async fn send_res<MD>(&mut self, res: Response<MD>) -> crate::Result<Http2SendStatus>
  where
    MD: MsgData,
    MD::Body: Lease<[u8]>,
  {
    let Self { inner, linger: _, method: _, protocol: _, span, stream_id } = self;
    let _e = span.enter();
    _trace!("Sending response");
    let hss = send_msg::<_, _, false>(
      res.msg_data.body().lease(),
      res.msg_data.headers(),
      inner,
      (
        // A response carries no request pseudo-headers, only the status code.
        HpackStaticRequestHeaders::EMPTY,
        HpackStaticResponseHeaders { status_code: Some(res.status_code) },
      ),
      *stream_id,
      |_| {},
    )
    .await?;
    Ok(hss)
  }
}
/// Identity lease: a `ServerStream` can be borrowed as itself.
impl<HB, SW> Lease<ServerStream<HB, SW>> for ServerStream<HB, SW> {
  /// Returns the very same instance.
  #[inline]
  fn lease(&self) -> &Self {
    self
  }
}
/// Identity mutable lease: a `ServerStream` can be mutably borrowed as itself.
impl<HB, SW> LeaseMut<ServerStream<HB, SW>> for ServerStream<HB, SW> {
  /// Returns the very same instance.
  #[inline]
  fn lease_mut(&mut self) -> &mut Self {
    self
  }
}
/// Exposes the two generic parameters of the stream as the associated `Item` type.
impl<HB, SW> SingleTypeStorage for ServerStream<HB, SW> {
// Tuple of the buffer type and the writer type, in that order.
type Item = (HB, SW);
}
impl<HB, SW> Clone for ServerStream<HB, SW> {
#[inline]
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
linger: self.linger,
method: self.method,
protocol: self.protocol,
span: self.span.clone(),
stream_id: self.stream_id,
}
}
}