mod control;
pub mod request;
pub(crate) use control::{ControlStreamRecv, ControlStreamSend};
pub use request::RequestStream;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum UniStreamType {
Control = 0x00,
Push = 0x01,
QpackEncoder = 0x02,
QpackDecoder = 0x03,
WebTransport = 0x54,
}
impl UniStreamType {
pub fn from_type(t: u64) -> Option<Self> {
match t {
0x00 => Some(Self::Control),
0x01 => Some(Self::Push),
0x02 => Some(Self::QpackEncoder),
0x03 => Some(Self::QpackDecoder),
0x54 => Some(Self::WebTransport),
_ => None,
}
}
pub fn is_reserved(t: u64) -> bool {
t >= 0x21 && (t - 0x21).is_multiple_of(0x1f)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamKind {
ClientBidi,
ServerBidi,
ClientUni,
ServerUni,
}
impl StreamKind {
pub fn from_stream_id(stream_id: u64) -> Self {
match stream_id & 0x03 {
0x00 => Self::ClientBidi,
0x01 => Self::ServerBidi,
0x02 => Self::ClientUni,
0x03 => Self::ServerUni,
_ => unreachable!(),
}
}
pub fn is_bidirectional(self) -> bool {
matches!(self, Self::ClientBidi | Self::ServerBidi)
}
pub fn is_unidirectional(self) -> bool {
matches!(self, Self::ClientUni | Self::ServerUni)
}
pub fn is_client_initiated(self) -> bool {
matches!(self, Self::ClientBidi | Self::ClientUni)
}
pub fn is_server_initiated(self) -> bool {
matches!(self, Self::ServerBidi | Self::ServerUni)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StreamState {
#[default]
Open,
LocalClosed,
RemoteClosed,
Closed,
Reset,
}
impl StreamState {
pub fn close_local(&mut self) {
*self = match *self {
Self::Open => Self::LocalClosed,
Self::RemoteClosed => Self::Closed,
other => other,
};
}
pub fn close_remote(&mut self) {
*self = match *self {
Self::Open => Self::RemoteClosed,
Self::LocalClosed => Self::Closed,
other => other,
};
}
pub fn reset(&mut self) {
*self = Self::Reset;
}
pub fn is_reset(self) -> bool {
matches!(self, Self::Reset)
}
pub fn can_send(self) -> bool {
matches!(self, Self::Open | Self::RemoteClosed)
}
pub fn can_receive(self) -> bool {
matches!(self, Self::Open | Self::LocalClosed)
}
}
#[derive(Debug, Default)]
pub struct SendBuffer {
data: Vec<u8>,
consumed: usize,
fin: bool,
fin_sent: bool,
}
impl SendBuffer {
pub fn new() -> Self {
Self::default()
}
pub fn push(&mut self, data: &[u8]) {
self.data.extend_from_slice(data);
}
pub fn set_fin(&mut self) {
self.fin = true;
}
pub fn is_fin(&self) -> bool {
self.fin
}
pub fn peek(&self) -> &[u8] {
&self.data[self.consumed..]
}
pub fn consume(&mut self, len: usize) {
self.consumed += len;
if self.consumed >= self.data.len() / 2 {
self.data.drain(..self.consumed);
self.consumed = 0;
}
}
pub fn has_pending(&self) -> bool {
self.consumed < self.data.len() || (self.fin && !self.fin_sent)
}
pub fn mark_fin_sent(&mut self) {
self.fin_sent = true;
}
pub fn is_complete(&self) -> bool {
self.consumed >= self.data.len() && self.fin && self.fin_sent
}
}
#[derive(Debug, Default)]
pub struct RecvBuffer {
data: Vec<u8>,
consumed: usize,
fin: bool,
}
impl RecvBuffer {
pub fn new() -> Self {
Self::default()
}
pub fn push(&mut self, data: &[u8]) {
self.data.extend_from_slice(data);
}
pub fn set_fin(&mut self) {
self.fin = true;
}
pub fn is_fin(&self) -> bool {
self.fin
}
pub fn peek(&self) -> &[u8] {
&self.data[self.consumed..]
}
pub fn consume(&mut self, len: usize) {
self.consumed += len;
if self.consumed >= self.data.len() / 2 {
self.data.drain(..self.consumed);
self.consumed = 0;
}
}
pub fn has_data(&self) -> bool {
self.consumed < self.data.len()
}
pub fn is_complete(&self) -> bool {
!self.has_data() && self.fin
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_uni_stream_type() {
assert_eq!(UniStreamType::from_type(0x00), Some(UniStreamType::Control));
assert_eq!(UniStreamType::from_type(0x01), Some(UniStreamType::Push));
assert_eq!(
UniStreamType::from_type(0x02),
Some(UniStreamType::QpackEncoder)
);
assert_eq!(
UniStreamType::from_type(0x03),
Some(UniStreamType::QpackDecoder)
);
assert_eq!(
UniStreamType::from_type(0x54),
Some(UniStreamType::WebTransport)
);
assert_eq!(UniStreamType::from_type(0x99), None);
}
#[test]
fn test_uni_stream_type_reserved() {
assert!(UniStreamType::is_reserved(0x21)); assert!(UniStreamType::is_reserved(0x40)); assert!(!UniStreamType::is_reserved(0x00));
assert!(!UniStreamType::is_reserved(0x20));
}
#[test]
fn test_stream_kind() {
assert_eq!(StreamKind::from_stream_id(0), StreamKind::ClientBidi);
assert_eq!(StreamKind::from_stream_id(1), StreamKind::ServerBidi);
assert_eq!(StreamKind::from_stream_id(2), StreamKind::ClientUni);
assert_eq!(StreamKind::from_stream_id(3), StreamKind::ServerUni);
assert_eq!(StreamKind::from_stream_id(4), StreamKind::ClientBidi);
}
#[test]
fn test_stream_kind_properties() {
assert!(StreamKind::ClientBidi.is_bidirectional());
assert!(!StreamKind::ClientBidi.is_unidirectional());
assert!(StreamKind::ClientBidi.is_client_initiated());
assert!(StreamKind::ServerUni.is_unidirectional());
assert!(!StreamKind::ServerUni.is_bidirectional());
assert!(StreamKind::ServerUni.is_server_initiated());
}
#[test]
fn test_stream_state() {
let mut state = StreamState::Open;
assert!(state.can_send());
assert!(state.can_receive());
state.close_local();
assert_eq!(state, StreamState::LocalClosed);
assert!(!state.can_send());
assert!(state.can_receive());
state.close_remote();
assert_eq!(state, StreamState::Closed);
assert!(!state.can_send());
assert!(!state.can_receive());
}
#[test]
fn test_send_buffer() {
let mut buf = SendBuffer::new();
assert!(!buf.has_pending());
buf.push(b"hello");
assert!(buf.has_pending());
assert_eq!(buf.peek(), b"hello");
buf.consume(3);
assert_eq!(buf.peek(), b"lo");
buf.set_fin();
assert!(buf.is_fin());
assert!(!buf.is_complete());
buf.consume(2);
assert!(!buf.is_complete());
assert!(buf.has_pending());
buf.mark_fin_sent();
assert!(buf.is_complete());
assert!(!buf.has_pending());
}
#[test]
fn test_recv_buffer() {
let mut buf = RecvBuffer::new();
assert!(!buf.has_data());
buf.push(b"world");
assert!(buf.has_data());
assert_eq!(buf.peek(), b"world");
buf.consume(2);
assert_eq!(buf.peek(), b"rld");
buf.set_fin();
assert!(buf.is_fin());
assert!(!buf.is_complete());
buf.consume(3);
assert!(buf.is_complete());
}
}