use crate::webtransport::error::{WtError, WtResult};
pub type WtStreamId = u64;
pub mod stream_id {
use super::WtStreamId;
#[must_use]
pub const fn is_client_initiated(id: WtStreamId) -> bool {
id & 0x01 == 0
}
#[must_use]
pub const fn is_server_initiated(id: WtStreamId) -> bool {
id & 0x01 == 1
}
#[must_use]
pub const fn is_bidirectional(id: WtStreamId) -> bool {
id & 0x02 == 0
}
#[must_use]
pub const fn is_unidirectional(id: WtStreamId) -> bool {
id & 0x02 == 2
}
#[must_use]
pub const fn next(id: WtStreamId) -> WtStreamId {
id + 4
}
#[must_use]
pub const fn first(client: bool, bidirectional: bool) -> WtStreamId {
let initiator = if client { 0 } else { 1 };
let direction = if bidirectional { 0 } else { 2 };
initiator | direction
}
#[must_use]
pub const fn stream_type(id: WtStreamId) -> u8 {
(id & 0x03) as u8
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SendState {
#[default]
Ready,
Send,
DataSent,
ResetSent,
DataRecvd,
ResetRecvd,
}
impl SendState {
#[must_use]
pub const fn can_send(&self) -> bool {
matches!(self, Self::Ready | Self::Send)
}
#[must_use]
pub const fn is_terminal(&self) -> bool {
matches!(self, Self::DataRecvd | Self::ResetRecvd)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RecvState {
#[default]
Recv,
SizeKnown,
DataRecvd,
ResetRecvd,
DataRead,
ResetRead,
}
impl RecvState {
#[must_use]
pub const fn can_recv(&self) -> bool {
matches!(self, Self::Recv | Self::SizeKnown)
}
#[must_use]
pub const fn is_terminal(&self) -> bool {
matches!(self, Self::DataRead | Self::ResetRead)
}
}
#[derive(Debug)]
pub struct WtStream {
id: WtStreamId,
bidirectional: bool,
send_state: SendState,
recv_state: RecvState,
send_offset: u64,
recv_offset: u64,
send_max: u64,
recv_max: u64,
stop_sending_sent: bool,
stop_sending_received: bool,
has_received_data: bool,
}
impl WtStream {
#[must_use]
pub fn new(id: WtStreamId, initial_max_data: u64, bidirectional: bool) -> Self {
Self {
id,
bidirectional,
send_state: SendState::Ready,
recv_state: RecvState::Recv,
send_offset: 0,
recv_offset: 0,
send_max: initial_max_data,
recv_max: initial_max_data,
stop_sending_sent: false,
stop_sending_received: false,
has_received_data: false,
}
}
#[must_use]
pub const fn id(&self) -> WtStreamId {
self.id
}
#[must_use]
pub const fn is_bidirectional(&self) -> bool {
self.bidirectional
}
#[must_use]
pub const fn send_state(&self) -> SendState {
self.send_state
}
#[must_use]
pub const fn recv_state(&self) -> RecvState {
self.recv_state
}
#[must_use]
pub const fn can_send(&self) -> bool {
self.send_state.can_send()
}
#[must_use]
pub const fn can_recv(&self) -> bool {
self.recv_state.can_recv()
}
#[must_use]
pub const fn send_offset(&self) -> u64 {
self.send_offset
}
#[must_use]
pub const fn recv_offset(&self) -> u64 {
self.recv_offset
}
#[must_use]
pub fn send_available(&self) -> u64 {
self.send_max.saturating_sub(self.send_offset)
}
#[must_use]
pub fn recv_available(&self) -> u64 {
self.recv_max.saturating_sub(self.recv_offset)
}
#[must_use]
pub const fn send_max(&self) -> u64 {
self.send_max
}
#[must_use]
pub const fn recv_max(&self) -> u64 {
self.recv_max
}
pub fn send_data(&mut self, size: u64, fin: bool) -> WtResult<()> {
if !self.send_state.can_send() {
return Err(WtError::stream_state_error("cannot send in current state"));
}
let new_offset = self.send_offset.saturating_add(size);
if new_offset > self.send_max {
return Err(WtError::flow_control_error("stream send limit exceeded"));
}
self.send_offset = new_offset;
self.send_state = SendState::Send;
if fin {
self.send_state = SendState::DataSent;
}
Ok(())
}
pub fn send_reset(&mut self) {
self.send_state = SendState::ResetSent;
}
pub fn recv_data(&mut self, size: u64, fin: bool) -> WtResult<()> {
if !self.recv_state.can_recv() {
return Err(WtError::stream_state_error(
"cannot receive in current state",
));
}
let new_offset = self.recv_offset.saturating_add(size);
if new_offset > self.recv_max {
return Err(WtError::flow_control_error("stream recv limit exceeded"));
}
self.recv_offset = new_offset;
if fin {
self.recv_state = RecvState::SizeKnown;
}
Ok(())
}
pub fn recv_reset(&mut self) {
self.recv_state = RecvState::ResetRecvd;
}
pub fn update_send_max(&mut self, maximum: u64) -> WtResult<()> {
if maximum < self.send_max {
return Err(WtError::flow_control_error(
"WT_MAX_STREAM_DATA value decreased",
));
}
self.send_max = maximum;
Ok(())
}
pub fn update_recv_max(&mut self, maximum: u64) {
if maximum > self.recv_max {
self.recv_max = maximum;
}
}
#[must_use]
pub const fn is_closed(&self) -> bool {
self.send_state.is_terminal() && self.recv_state.is_terminal()
}
#[must_use]
pub const fn stop_sending_sent(&self) -> bool {
self.stop_sending_sent
}
#[must_use]
pub const fn stop_sending_received(&self) -> bool {
self.stop_sending_received
}
pub fn set_stop_sending_sent(&mut self) {
self.stop_sending_sent = true;
}
pub fn set_stop_sending_received(&mut self) {
self.stop_sending_received = true;
}
#[must_use]
pub const fn has_received_data(&self) -> bool {
self.has_received_data
}
pub fn set_has_received_data(&mut self) {
self.has_received_data = true;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_stream_id_client_bidi() {
let id = stream_id::first(true, true);
assert_eq!(id, 0);
assert!(stream_id::is_client_initiated(id));
assert!(stream_id::is_bidirectional(id));
let next = stream_id::next(id);
assert_eq!(next, 4);
assert!(stream_id::is_client_initiated(next));
assert!(stream_id::is_bidirectional(next));
}
#[test]
fn test_stream_id_server_bidi() {
let id = stream_id::first(false, true);
assert_eq!(id, 1);
assert!(stream_id::is_server_initiated(id));
assert!(stream_id::is_bidirectional(id));
}
#[test]
fn test_stream_id_client_uni() {
let id = stream_id::first(true, false);
assert_eq!(id, 2);
assert!(stream_id::is_client_initiated(id));
assert!(stream_id::is_unidirectional(id));
}
#[test]
fn test_stream_id_server_uni() {
let id = stream_id::first(false, false);
assert_eq!(id, 3);
assert!(stream_id::is_server_initiated(id));
assert!(stream_id::is_unidirectional(id));
}
#[test]
fn test_stream_creation() {
let stream = WtStream::new(0, 65536, true);
assert_eq!(stream.id(), 0);
assert!(stream.is_bidirectional());
assert_eq!(stream.send_state(), SendState::Ready);
assert_eq!(stream.recv_state(), RecvState::Recv);
assert!(stream.can_send());
assert!(stream.can_recv());
}
#[test]
fn test_send_data() {
let mut stream = WtStream::new(0, 65536, true);
stream.send_data(100, false).unwrap();
assert_eq!(stream.send_state(), SendState::Send);
assert_eq!(stream.send_offset(), 100);
assert!(stream.can_send());
stream.send_data(100, true).unwrap();
assert_eq!(stream.send_state(), SendState::DataSent);
assert_eq!(stream.send_offset(), 200);
assert!(!stream.can_send());
}
#[test]
fn test_recv_data() {
let mut stream = WtStream::new(0, 65536, true);
stream.recv_data(100, false).unwrap();
assert_eq!(stream.recv_state(), RecvState::Recv);
assert_eq!(stream.recv_offset(), 100);
assert!(stream.can_recv());
stream.recv_data(100, true).unwrap();
assert_eq!(stream.recv_state(), RecvState::SizeKnown);
assert_eq!(stream.recv_offset(), 200);
}
#[test]
fn test_send_reset() {
let mut stream = WtStream::new(0, 65536, true);
stream.send_data(100, false).unwrap();
stream.send_reset();
assert_eq!(stream.send_state(), SendState::ResetSent);
assert!(!stream.can_send());
}
#[test]
fn test_recv_reset() {
let mut stream = WtStream::new(0, 65536, true);
stream.recv_data(100, false).unwrap();
stream.recv_reset();
assert_eq!(stream.recv_state(), RecvState::ResetRecvd);
assert!(!stream.can_recv());
}
#[test]
fn test_update_send_max() {
let mut stream = WtStream::new(0, 65536, true);
assert_eq!(stream.send_available(), 65536);
stream.update_send_max(131072).unwrap();
assert_eq!(stream.send_available(), 131072);
assert!(stream.update_send_max(32768).is_err());
}
}