wtx 0.28.0

A collection of different transport implementations and related tools focused primarily on web technologies.
macro_rules! prft {
  ($fi:expr, $hdpm:ident, $pfb:expr, $stream_reader:expr) => {
    ProcessReceiptFrameTy {
      conn_windows: &mut $hdpm.windows,
      fi: $fi,
      hp: &mut $hdpm.hp,
      hpack_dec: &mut $hdpm.hb.hpack_dec,
      hps: &mut $hdpm.hps,
      is_conn_open: &$hdpm.hb.is_conn_open,
      last_stream_id: &mut $hdpm.last_stream_id,
      pfb: $pfb,
      read_frame_waker: &$hdpm.hb.read_frame_waker,
      recv_streams_num: &mut $hdpm.recv_streams_num,
      stream_reader: $stream_reader,
      stream_writer: &mut $hdpm.stream_writer,
      uri_buffer: &mut $hdpm.hb.uri_buffer,
    }
  };
}

use crate::{
  http2::{
    Http2Buffer, Http2Data, Http2Error,
    frame_init::{FrameInit, FrameInitTy},
    go_away_frame::GoAwayFrame,
    misc::{process_higher_operation_err, protocol_err, read_frame, send_go_away, write_array},
    ping_frame::PingFrame,
    process_receipt_frame_ty::ProcessReceiptFrameTy,
    settings_frame::SettingsFrame,
    window_update_frame::WindowUpdateFrame,
  },
  misc::{
    Arc, AtomicWaker, LeaseMut, Lock, RefCounter, StreamReader, StreamWriter,
    partitioned_filled_buffer::PartitionedFilledBuffer,
  },
};
use core::{
  future::{Future, poll_fn},
  mem,
  pin::pin,
  sync::atomic::AtomicBool,
  task::{Poll, ready},
};

pub(crate) async fn frame_reader<HB, HD, SR, SW, const IS_CLIENT: bool>(
  hd: HD,
  is_conn_open: Arc<AtomicBool>,
  max_frame_len: u32,
  mut pfb: PartitionedFilledBuffer,
  read_frame_waker: Arc<AtomicWaker>,
  mut stream_reader: SR,
) where
  HB: LeaseMut<Http2Buffer>,
  HD: RefCounter,
  HD::Item: Lock<Resource = Http2Data<HB, SW, IS_CLIENT>>,
  SR: StreamReader,
  SW: StreamWriter,
{
  let span = _trace_span!("Starting the reading of frames");
  let _e = span._enter();
  loop {
    let fi = match read_frame::<_, false>(
      &is_conn_open,
      max_frame_len,
      &mut pfb,
      &read_frame_waker,
      &mut stream_reader,
    )
    .await
    {
      Err(err) => {
        process_higher_operation_err(&err, &hd).await;
        finish(Some(err), &hd, &mut pfb).await;
        return;
      }
      Ok(None) => {
        finish(None, &hd, &mut pfb).await;
        return;
      }
      Ok(Some(fi)) => fi,
    };
    if let Err(err) = manage_fi(fi, &hd, &is_conn_open, &mut pfb, &mut stream_reader).await {
      process_higher_operation_err(&err, &hd).await;
      finish(Some(err), &hd, &mut pfb).await;
    }
  }
}

#[inline]
async fn finish<HB, HD, SW, const IS_CLIENT: bool>(
  err: Option<crate::Error>,
  hd: &HD,
  pfb: &mut PartitionedFilledBuffer,
) where
  HB: LeaseMut<Http2Buffer>,
  HD: RefCounter,
  HD::Item: Lock<Resource = Http2Data<HB, SW, IS_CLIENT>>,
  SW: StreamWriter,
{
  let mut lock = hd.lock().await;
  let hdpm = lock.parts_mut();
  if let Some(elem) = err {
    *hdpm.frame_reader_error = Some(elem);
  }
  mem::swap(pfb, &mut hdpm.hb.pfb);
  _trace!("Finishing the reading of frames");
}

#[inline]
async fn manage_fi<HB, HD, SR, SW, const IS_CLIENT: bool>(
  fi: FrameInit,
  hd: &HD,
  is_conn_open: &AtomicBool,
  pfb: &mut PartitionedFilledBuffer,
  stream_reader: &mut SR,
) -> crate::Result<()>
where
  HB: LeaseMut<Http2Buffer>,
  HD: RefCounter,
  HD::Item: Lock<Resource = Http2Data<HB, SW, IS_CLIENT>>,
  SR: StreamReader,
  SW: StreamWriter,
{
  match fi.ty {
    FrameInitTy::Continuation => {
      return Err(protocol_err(Http2Error::InvalidContinuationFrame));
    }
    FrameInitTy::Data => {
      let mut lock = hd.lock().await;
      let mut hdpm = lock.parts_mut();
      prft!(fi, hdpm, pfb, stream_reader).data(&mut hdpm.hb.sorp).await?;
    }
    FrameInitTy::GoAway => {
      let gaf = GoAwayFrame::read(pfb._current(), fi)?;
      send_go_away(gaf.error_code(), &mut hd.lock().await.parts_mut()).await;
    }
    FrameInitTy::Headers => {
      let mut lock = hd.lock().await;
      let mut hdpm = lock.parts_mut();
      if hdpm.hb.scrp.contains_key(&fi.stream_id) {
        return Err(protocol_err(Http2Error::UnexpectedNonControlFrame));
      }
      if IS_CLIENT {
        prft!(fi, hdpm, pfb, stream_reader).header_client(&mut hdpm.hb.sorp).await?;
      } else if let Some(elem) = hdpm.hb.sorp.get_mut(&fi.stream_id) {
        prft!(fi, hdpm, pfb, stream_reader).header_server_trailer(elem).await?;
      } else if let Some(ish) = hdpm.hb.initial_server_headers.front_mut() {
        let prft = prft!(fi, hdpm, pfb, stream_reader);
        let rslt = prft.header_server_init(ish, &mut hdpm.hb.sorp).await;
        ish.waker.wake_by_ref();
        hdpm.hb.initial_server_headers.increase_cursor();
        rslt?;
      } else {
        drop(lock);
        let mut lock_pin = pin!(hd.lock());
        poll_fn(|cx| {
          let mut local_lock = lock_pin!(cx, hd, lock_pin);
          let mut local_hdpm = local_lock.parts_mut();
          let Some(ish) = local_hdpm.hb.initial_server_headers.front_mut() else {
            cx.waker().wake_by_ref();
            return Poll::Pending;
          };
          let prft = prft!(fi, local_hdpm, pfb, stream_reader);
          let poll = pin!(prft.header_server_init(ish, &mut local_hdpm.hb.sorp)).poll(cx);
          let rslt = ready!(poll);
          ish.waker.wake_by_ref();
          local_hdpm.hb.initial_server_headers.increase_cursor();
          Poll::Ready(rslt)
        })
        .await?;
      }
    }
    FrameInitTy::Ping => {
      let mut pf = PingFrame::read(pfb._current(), fi)?;
      if !pf.has_ack() {
        pf.set_ack();
        write_array([&pf.bytes()], is_conn_open, hd.lock().await.parts_mut().stream_writer).await?;
      }
    }
    FrameInitTy::Reset => {
      let mut lock = hd.lock().await;
      let mut hdpm = lock.parts_mut();
      let prft = prft!(fi, hdpm, pfb, stream_reader);
      prft.reset(&mut hdpm.hb.scrp, &mut hdpm.hb.sorp).await?;
    }
    FrameInitTy::Settings => {
      let sf = SettingsFrame::read(pfb._current(), fi)?;
      if !sf.has_ack() {
        let mut lock = hd.lock().await;
        let hdpm = lock.parts_mut();
        hdpm.hps.update(&mut hdpm.hb.hpack_enc, &mut hdpm.hb.scrp, &sf, &mut hdpm.hb.sorp)?;
        let array = &mut [0; 45];
        write_array(
          [SettingsFrame::ack().bytes(array)],
          is_conn_open,
          lock.parts_mut().stream_writer,
        )
        .await?;
      }
    }
    FrameInitTy::WindowUpdate => {
      if fi.stream_id.is_zero() {
        let wuf = WindowUpdateFrame::read(pfb._current(), fi)?;
        hd.lock().await.parts_mut().windows.send_mut().deposit(None, wuf.size_increment().i32())?;
      } else {
        let mut lock = hd.lock().await;
        let mut hdpm = lock.parts_mut();
        let prft = prft!(fi, hdpm, pfb, stream_reader);
        prft.window_update(&mut hdpm.hb.scrp, &mut hdpm.hb.sorp)?;
      }
    }
  }
  Ok(())
}