wtx 0.43.0

A collection of different transport implementations and related tools focused primarily on web technologies.
Documentation
use crate::{
  collection::ArrayVectorU8,
  http::{HttpError, ReqResBuffer, StatusCode},
  http2::{
    Http2Error, Http2ErrorCode, Http2Params, Scrp, Sorp,
    data_frame::DataFrame,
    frame_init::FrameInit,
    hpack_decoder::HpackDecoder,
    http2_params_send::Http2ParamsSend,
    initial_server_stream_remote::InitialServerStreamRemote,
    misc::{protocol_err, read_header_and_continuations, server_header_stream_state, sorp_mut},
    reader_data::ReaderData,
    stream_receiver::StreamOverallRecvParams,
    stream_state::StreamState,
    u31::U31,
    window::{Windows, WindowsPair},
    window_update_frame::WindowUpdateFrame,
  },
  stream::StreamReader,
  sync::{AtomicBool, AtomicWaker},
};
use core::task::Waker;

#[derive(Debug)]
pub(crate) struct ProcessReceiptFrameTy<'instance, SR> {
  pub(crate) conn_windows: &'instance mut Windows,
  pub(crate) fi: FrameInit,
  pub(crate) hp: &'instance mut Http2Params,
  pub(crate) hpack_dec: &'instance mut HpackDecoder,
  pub(crate) hps: &'instance mut Http2ParamsSend,
  pub(crate) is_conn_open: &'instance AtomicBool,
  pub(crate) last_stream_id: &'instance mut U31,
  pub(crate) rd: &'instance mut ReaderData<SR>,
  pub(crate) read_frame_waker: &'instance AtomicWaker,
  pub(crate) recv_streams_num: &'instance mut u32,
}

impl<'instance, SR> ProcessReceiptFrameTy<'instance, SR>
where
  SR: StreamReader,
{
  pub(crate) async fn data(self, sorp: &mut Sorp) -> crate::Result<ArrayVectorU8<u8, 26>> {
    let Some(elem) = sorp.get_mut(&self.fi.stream_id) else {
      if self.fi.stream_id <= *self.last_stream_id {
        return Err(crate::Error::Http2ErrorGoAway(
          Http2ErrorCode::StreamClosed,
          Http2Error::UnknownDataStreamReceiver,
        ));
      }
      return Err(protocol_err(Http2Error::UnknownDataStreamReceiver));
    };
    if elem.stream_state.recv_eos() {
      return Err(crate::Error::Http2ErrorGoAway(
        Http2ErrorCode::StreamClosed,
        Http2Error::InvalidReceivedFrameAfterEos,
      ));
    }
    let local_body_len_opt = elem.body_len.checked_add(self.fi.data_len);
    let Some(local_body_len) = local_body_len_opt.filter(|el| *el <= self.hp.max_body_len()) else {
      return Err(protocol_err(Http2Error::LargeBodyLen(local_body_len_opt)));
    };
    elem.body_len = local_body_len;
    let (df, body_bytes) = DataFrame::read(self.rd.pfb.current(), self.fi)?;
    elem.rrb.body.extend_from_copyable_slice(body_bytes)?;
    elem.has_one_or_more_data_frames = true;
    if df.has_eos() {
      elem.stream_state = StreamState::HalfClosedRemote;
    }
    elem.waker.wake_by_ref();
    WindowsPair::new(self.conn_windows, &mut elem.windows).withdrawn_recv(
      self.hp,
      self.fi.stream_id,
      df.data_len(),
    )
  }

  pub(crate) async fn header_client(self, sorp: &mut Sorp) -> crate::Result<()> {
    let elem = sorp_mut(sorp, self.fi.stream_id)?;
    let has_eos = if elem.has_initial_header {
      read_header_and_continuations::<_, _, true, true>(
        self.fi,
        self.is_conn_open,
        self.hp,
        self.hpack_dec,
        self.rd,
        self.read_frame_waker,
        &mut elem.rrb,
        |_| Ok(()),
      )
      .await?
      .1
    } else {
      let (_, has_eos, status_code) = read_header_and_continuations::<_, _, true, false>(
        self.fi,
        self.is_conn_open,
        self.hp,
        self.hpack_dec,
        self.rd,
        self.read_frame_waker,
        &mut elem.rrb,
        |hf| hf.hsresh().status_code.ok_or_else(|| HttpError::MissingResponseStatusCode.into()),
      )
      .await?;
      elem.has_initial_header = true;
      elem.status_code = status_code;
      has_eos
    };
    if has_eos {
      elem.stream_state = StreamState::Closed;
      elem.waker.wake_by_ref();
    }
    Ok(())
  }

  pub(crate) async fn header_server_init(
    self,
    sorp: &mut Sorp,
  ) -> crate::Result<InitialServerStreamRemote> {
    if self.fi.stream_id <= *self.last_stream_id || self.fi.stream_id.u32().is_multiple_of(2) {
      return Err(protocol_err(Http2Error::UnexpectedStreamId));
    }
    if *self.recv_streams_num >= self.hp.max_recv_streams_num() {
      return Err(protocol_err(Http2Error::ExceedAmountOfOpenedStreams));
    }
    *self.recv_streams_num = self.recv_streams_num.wrapping_add(1);
    *self.last_stream_id = self.fi.stream_id;
    let mut rrb = ReqResBuffer::empty();
    let tuple = read_header_and_continuations::<_, _, false, false>(
      self.fi,
      self.is_conn_open,
      self.hp,
      self.hpack_dec,
      self.rd,
      self.read_frame_waker,
      &mut rrb,
      |hf| Ok((hf.hsreqh().method.ok_or(HttpError::MissingRequestMethod)?, hf.hsreqh().protocol)),
    )
    .await?;
    let (content_length, has_eos, (method, protocol)) = tuple;
    let stream_state = server_header_stream_state(has_eos);
    drop(sorp.insert(
      self.fi.stream_id,
      StreamOverallRecvParams {
        body_len: 0,
        content_length,
        has_initial_header: true,
        has_one_or_more_data_frames: false,
        is_stream_open: true,
        rrb,
        status_code: StatusCode::Ok,
        stream_state,
        waker: Waker::noop().clone(),
        windows: Windows::initial(self.hp, self.hps),
      },
    ));
    Ok(InitialServerStreamRemote { method, protocol, stream_id: self.fi.stream_id })
  }

  pub(crate) async fn header_server_trailer(
    self,
    sorp: &mut StreamOverallRecvParams,
  ) -> crate::Result<()> {
    if sorp.stream_state.recv_eos() {
      return Err(protocol_err(Http2Error::UnexpectedHeaderFrame));
    }
    let (_, has_eos, _) = read_header_and_continuations::<_, _, false, true>(
      self.fi,
      self.is_conn_open,
      self.hp,
      self.hpack_dec,
      self.rd,
      self.read_frame_waker,
      &mut sorp.rrb,
      |_| Ok(()),
    )
    .await?;
    sorp.stream_state = server_header_stream_state(has_eos);
    if has_eos {
      sorp.waker.wake_by_ref();
    }
    Ok(())
  }

  pub(crate) fn window_update(self, scrp: &mut Scrp, sorp: &mut Sorp) -> crate::Result<()> {
    if let Some(elem) = scrp.get_mut(&self.fi.stream_id) {
      self.do_window_update(&mut elem.windows, &elem.waker)?;
      return Ok(());
    };
    if let Some(elem) = sorp.get_mut(&self.fi.stream_id) {
      self.do_window_update(&mut elem.windows, &elem.waker)?;
      return Ok(());
    };
    Err(protocol_err(Http2Error::UnknownWindowUpdateStreamReceiver))
  }

  fn do_window_update(self, windows: &mut Windows, waker: &Waker) -> crate::Result<()> {
    let wuf = WindowUpdateFrame::read(self.rd.pfb.current(), self.fi)?;
    windows.send_mut().deposit(Some(self.fi.stream_id), wuf.size_increment().i32())?;
    waker.wake_by_ref();
    Ok(())
  }
}