use crate::{
http::{Headers, StatusCode},
http2::{
Http2Buffer, Http2Data, Http2RecvStatus, Http2SendStatus, SendDataMode,
hpack_static_headers::{HpackStaticRequestHeaders, HpackStaticResponseHeaders},
misc::{check_content_length, frame_reader_rslt, sorp_mut, status_recv, status_send},
send_data_mode::SendDataModeBytes,
send_msg::{
encode_headers, write_standalone_data, write_standalone_headers, write_standalone_trailers,
},
u31::U31,
window::WindowsPair,
},
misc::{Arc, LeaseMut, Lock, RefCounter, StreamWriter, Vector, facades::span::_Span},
};
use core::{
future::{Future, poll_fn},
mem,
pin::pin,
sync::atomic::AtomicBool,
task::{Poll, ready},
};
/// Borrowed handle over a single HTTP/2 stream.
///
/// Every operation locks the shared connection data reachable through `hd` and then works on
/// the entry registered under `stream_id`. The `IS_CLIENT` flag is forwarded to the underlying
/// `Http2Data` type and to the header-writing routine.
#[derive(Debug)]
pub struct CommonStream<'instance, HD, const IS_CLIENT: bool> {
/// Reference-counted handle whose lock protects the shared connection data.
pub(crate) hd: &'instance mut HD,
/// Shared flag handed to the status and write helpers; presumably `true` while the
/// connection is usable -- confirm against the helpers that read it.
pub(crate) is_conn_open: &'instance Arc<AtomicBool>,
/// Span entered at the beginning of the send/receive operations.
pub(crate) span: &'instance mut _Span,
/// Identifier used to look up this stream's entries in the shared buffers.
pub(crate) stream_id: U31,
}
impl<HB, HD, SW, const IS_CLIENT: bool> CommonStream<'_, HD, IS_CLIENT>
where
HB: LeaseMut<Http2Buffer>,
HD: RefCounter,
HD::Item: Lock<Resource = Http2Data<HB, SW, IS_CLIENT>>,
SW: StreamWriter,
{
/// Removes the entries registered for this stream from the shared connection data.
///
/// When `linger` is `true` the method first sleeps for 50 milliseconds. It then locks the
/// shared data, removes the entries stored under this stream's id from both the `scrp` and
/// `sorp` collections and wakes the waker held by each removed entry.
///
/// # Errors
///
/// Fails only if the optional sleep returns an error.
#[inline]
pub async fn clear(&self, linger: bool) -> crate::Result<()> {
    /// Delay applied before the entries are removed when lingering is requested.
    const LINGER: core::time::Duration = core::time::Duration::from_millis(50);

    if linger {
        crate::misc::sleep(LINGER).await?;
    }
    let mut guard = self.hd.lock().await;
    let parts = guard.parts_mut();
    if let Some(removed) = parts.hb.scrp.remove(&self.stream_id) {
        removed.waker.wake();
    }
    if let Some(removed) = parts.hb.sorp.remove(&self.stream_id) {
        removed.waker.wake();
    }
    Ok(())
}
/// Receives body bytes that were buffered for this stream.
///
/// Each poll locks the shared connection data and then:
///
/// 1. Asks `status_recv` whether the stream already has a final status. The closure handed
///    to it validates the content length and hands over the whole buffered body; when
///    `status_recv` yields a status, that status is returned as is.
/// 2. Otherwise, if the stream's `has_one_or_more_data_frames` flag is set and the body
///    buffer is not empty, the buffered bytes are returned as [`Http2RecvStatus::Ongoing`]
///    and the internal buffer is left empty.
/// 3. Otherwise the task's waker is stored in the stream entry and the call stays pending.
///
/// # Errors
///
/// Returns an error if the stream entry cannot be resolved by `sorp_mut`, if `status_recv`
/// (including the content-length check) fails, or if `frame_reader_rslt` reports an error
/// for the stored frame-reader state while data is about to be handed out.
#[inline]
pub async fn recv_data(&mut self) -> crate::Result<Http2RecvStatus<Vector<u8>, Vector<u8>>> {
    let _e = self.span._enter();
    _trace!("Fetching data");
    let mut pin = pin!(self.hd.lock());
    poll_fn(|cx| {
        let mut lock = lock_pin!(cx, self.hd, pin);
        let hdpm = lock.parts_mut();
        let sorp = sorp_mut(&mut hdpm.hb.sorp, self.stream_id)?;
        if let Some(elem) = status_recv(self.is_conn_open, sorp, |local_sorp| {
            check_content_length(local_sorp)?;
            Ok(mem::take(&mut local_sorp.rrb.body))
        })? {
            return Poll::Ready(Ok(elem));
        }
        if sorp.has_one_or_more_data_frames && !sorp.rrb.body.is_empty() {
            frame_reader_rslt(hdpm.frame_reader_error)?;
            // Move the buffered bytes out instead of cloning them and clearing the original:
            // the caller receives the same bytes and the buffer ends up empty either way, but
            // no copy of the payload is made.
            let rslt = mem::take(&mut sorp.rrb.body);
            Poll::Ready(Ok(Http2RecvStatus::Ongoing(rslt)))
        } else {
            sorp.waker.clone_from(cx.waker());
            Poll::Pending
        }
    })
    .await
}
/// Receives the trailers of this stream.
///
/// Each poll locks the shared connection data and asks `status_recv` for a final status; the
/// closure handed to it takes the headers buffered for the stream. When a status is
/// available it is returned. Otherwise the task's waker is stored in the stream entry, the
/// stored frame-reader state is checked and the call stays pending.
///
/// # Errors
///
/// Returns an error if the stream entry cannot be resolved by `sorp_mut`, if `status_recv`
/// fails, or if `frame_reader_rslt` reports an error while waiting.
#[inline]
pub async fn recv_trailers(&mut self) -> crate::Result<Http2RecvStatus<Headers, ()>> {
    let _e = self.span._enter();
    _trace!("Fetching trailers");
    let mut lock_fut = pin!(self.hd.lock());
    poll_fn(|cx| {
        let mut guard = lock_pin!(cx, self.hd, lock_fut);
        let parts = guard.parts_mut();
        let stream = sorp_mut(&mut parts.hb.sorp, self.stream_id)?;
        let finished = status_recv(self.is_conn_open, stream, |entry| {
            Ok(mem::take(&mut entry.rrb.headers))
        })?;
        if let Some(status) = finished {
            return Poll::Ready(Ok(status));
        }
        // The waker is registered before the frame-reader state is inspected.
        stream.waker.clone_from(cx.waker());
        frame_reader_rslt(parts.frame_reader_error)?;
        Poll::Pending
    })
    .await
}
/// Forwards `value` to `WindowsPair::withdrawn_recv` for this stream.
///
/// The pair is built from the connection-level windows and the windows of this stream's
/// entry. Judging by the names this replenishes receive-side flow-control capacity --
/// confirm against `WindowsPair`. How values outside the 31-bit range are treated depends
/// on `U31::from_u32`.
///
/// # Errors
///
/// Returns an error if the stream entry cannot be resolved by `sorp_mut` or if
/// `withdrawn_recv` fails.
#[inline]
pub async fn release_capacity(&mut self, value: u32) -> crate::Result<()> {
    let amount = U31::from_u32(value);
    let mut guard = self.hd.lock().await;
    let parts = guard.parts_mut();
    let stream = sorp_mut(&mut parts.hb.sorp, self.stream_id)?;
    let mut windows = WindowsPair::new(parts.windows, &mut stream.windows);
    windows
        .withdrawn_recv(parts.hp, self.is_conn_open, parts.stream_writer, self.stream_id, amount)
        .await
}
/// Forwards `value` to `WindowsPair::withdrawn_send` for this stream.
///
/// The pair is built from the connection-level windows and the windows of this stream's
/// entry. Judging by the names this withdraws send-side flow-control capacity -- confirm
/// against `WindowsPair`. How values outside the 31-bit range are treated depends on
/// `U31::from_u32`.
///
/// # Errors
///
/// Returns an error if the stream entry cannot be resolved by `sorp_mut` or if
/// `withdrawn_send` fails.
#[inline]
pub async fn reserve_capacity(&mut self, value: u32) -> crate::Result<()> {
    let amount = U31::from_u32(value);
    let mut guard = self.hd.lock().await;
    let parts = guard.parts_mut();
    let stream = sorp_mut(&mut parts.hb.sorp, self.stream_id)?;
    let mut windows = WindowsPair::new(parts.windows, &mut stream.windows);
    windows.withdrawn_send(Some(self.stream_id), amount)
}
/// Sends `data` on this stream, optionally marking the end of the stream with `is_eos`.
///
/// Each poll locks the shared connection data, returns early with the status produced by
/// `status_send` if there is one, and otherwise hands the currently available send window to
/// `write_standalone_data`. The call completes with [`Http2SendStatus::Ok`] once the local
/// `has_data` flag, which is passed by mutable reference to `write_standalone_data`, is set.
///
/// # Errors
///
/// Returns an error if the stream entry cannot be resolved by `sorp_mut` or if
/// `write_standalone_data` fails.
#[inline]
pub async fn send_data<'bytes, B, const IS_SCATTERED: bool>(
&mut self,
mut data: SendDataMode<B, IS_SCATTERED>,
is_eos: bool,
) -> crate::Result<Http2SendStatus>
where
B: SendDataModeBytes<'bytes, IS_SCATTERED>,
{
let _e = self.span._enter();
_trace!("Sending data");
// Flipped by `write_standalone_data`; presumably once payload was actually written.
let mut has_data = false;
let mut pin = pin!(self.hd.lock());
poll_fn(|cx| {
let mut lock = lock_pin!(cx, self.hd, pin);
let hdpm = lock.parts_mut();
let sorp = sorp_mut(&mut hdpm.hb.sorp, self.stream_id)?;
if let Some(elem) = status_send::<false>(self.is_conn_open, sorp) {
return Poll::Ready(Ok(elem));
}
let mut wp = WindowsPair::new(hdpm.windows, &mut sorp.windows);
// Proceed only when the available send window converts to a `u32` of at least 1.
// Otherwise the task wakes itself and returns `Pending`, i.e. it is polled again
// instead of waiting for an external wake-up.
// NOTE(review): this re-polls continuously while the window is exhausted -- confirm
// that this is intended.
let Ok(available_send @ 1..=u32::MAX) = u32::try_from(wp.available_send()) else {
cx.waker().wake_by_ref();
return Poll::Pending;
};
// A fresh write future is created on every poll; `has_data` carries progress across
// polls because it lives outside of the closure.
let fut = write_standalone_data(
available_send,
&mut data,
is_eos,
&mut has_data,
false,
self.is_conn_open,
hdpm.hps.max_frame_len,
hdpm.stream_writer,
self.stream_id,
&mut wp,
);
// `ready!` propagates `Pending` from the write future; errors are returned through `?`.
let _ = ready!(pin!(fut).poll(cx))?;
if has_data {
Poll::Ready(Ok(Http2SendStatus::Ok))
} else {
// Nothing was recorded as sent yet: request another poll right away.
cx.waker().wake_by_ref();
Poll::Pending
}
})
.await
}
// Expands a crate macro defined outside of this file; judging by its name it generates the
// `send_go_away` method for this type -- confirm against the macro definition.
send_go_away_method!();
/// Sends `headers` together with `status_code` on this stream.
///
/// After locking the shared connection data, the status produced by `status_send` is
/// returned immediately if there is one. Otherwise the headers are encoded into the shared
/// HPACK buffer -- always with empty static request headers and `status_code` as the static
/// response status -- and written through `write_standalone_headers`. `is_eos` is forwarded
/// to the writer.
///
/// # Errors
///
/// Returns an error if the stream entry cannot be resolved by `sorp_mut`, if the headers
/// cannot be encoded or if writing fails.
#[inline]
pub async fn send_headers(
    &mut self,
    headers: &Headers,
    is_eos: bool,
    status_code: StatusCode,
) -> crate::Result<Http2SendStatus> {
    let _e = self.span._enter();
    _trace!("Sending headers");
    let response_headers = HpackStaticResponseHeaders { status_code: Some(status_code) };
    let mut lock = self.hd.lock().await;
    let parts = lock.parts_mut();
    let stream = sorp_mut(&mut parts.hb.sorp, self.stream_id)?;
    if let Some(status) = status_send::<false>(self.is_conn_open, stream) {
        return Ok(status);
    }
    encode_headers::<false>(
        headers,
        (&mut parts.hb.hpack_enc, &mut parts.hb.hpack_enc_buffer),
        (HpackStaticRequestHeaders::EMPTY, response_headers),
    )?;
    let _ = write_standalone_headers::<_, IS_CLIENT>(
        &mut parts.hb.hpack_enc_buffer,
        (HpackStaticRequestHeaders::EMPTY, response_headers),
        self.is_conn_open,
        is_eos,
        parts.hps.max_frame_len,
        parts.stream_writer,
        self.stream_id,
    )
    .await?;
    Ok(Http2SendStatus::Ok)
}
/// Asks `send_reset_stream` to reset this stream with `error_code`.
///
/// The shared connection data is locked for the duration of the call. The outcome of
/// `send_reset_stream` is intentionally discarded, so this method never reports a failure.
#[inline]
pub async fn send_reset(&self, error_code: crate::http2::Http2ErrorCode) {
    let mut lock = self.hd.lock().await;
    let parts = lock.parts_mut();
    // Best effort: the result is deliberately ignored.
    let _ = crate::http2::misc::send_reset_stream(
        error_code,
        &mut parts.hb.scrp,
        &mut parts.hb.sorp,
        parts.stream_writer,
        self.stream_id,
    )
    .await;
}
/// Sends `trailers` on this stream.
///
/// After locking the shared connection data, the status produced by `status_send` is
/// returned immediately if there is one. Otherwise the trailers are written through
/// `write_standalone_trailers` using the shared HPACK encoder and buffer.
///
/// # Errors
///
/// Returns an error if the stream entry cannot be resolved by `sorp_mut` or if writing the
/// trailers fails.
#[inline]
pub async fn send_trailers(&mut self, trailers: &Headers) -> crate::Result<Http2SendStatus> {
    let _e = self.span._enter();
    _trace!("Sending {} trailers", trailers.headers_len());
    let mut guard = self.hd.lock().await;
    let parts = guard.parts_mut();
    let stream = sorp_mut(&mut parts.hb.sorp, self.stream_id)?;
    match status_send::<false>(self.is_conn_open, stream) {
        Some(status) => Ok(status),
        None => {
            write_standalone_trailers(
                trailers,
                (&mut parts.hb.hpack_enc, &mut parts.hb.hpack_enc_buffer),
                self.is_conn_open,
                parts.hps.max_frame_len,
                parts.stream_writer,
                self.stream_id,
            )
            .await?;
            Ok(Http2SendStatus::Ok)
        }
    }
}
/// Returns the identifier of this stream as a plain `u32`.
#[inline]
pub const fn stream_id(&self) -> u32 {
    let id = self.stream_id;
    id.u32()
}
/// Returns the windows currently stored in this stream's entry.
///
/// # Errors
///
/// Returns an error if the stream entry cannot be resolved by `sorp_mut`.
#[inline]
pub async fn windows(&self) -> crate::Result<crate::http2::Windows> {
    let mut guard = self.hd.lock().await;
    let parts = guard.parts_mut();
    let snapshot = sorp_mut(&mut parts.hb.sorp, self.stream_id)?.windows;
    Ok(snapshot)
}
}