use std::{
collections::{HashMap, HashSet, VecDeque},
fmt,
};
use fluke_buffet::{Piece, PieceCore};
use tokio::sync::Notify;
use crate::Response;
use super::body::StreamIncoming;
use fluke_h2_parse::{FrameType, KnownErrorCode, Settings, StreamId};
/// Connection-wide HTTP/2 bookkeeping: the stream table, both endpoints'
/// settings, and the connection-level capacity counters.
pub(crate) struct ConnState {
/// State of every tracked stream, keyed by stream ID.
pub(crate) streams: HashMap<StreamId, StreamState>,
/// Starts out as `StreamId(0)` (see the `Default` impl).
/// NOTE(review): presumably the highest stream ID seen from the peer so far —
/// confirm against the code that updates it.
pub(crate) last_stream_id: StreamId,
/// Settings for our side of the connection; `Default` seeds
/// `incoming_capacity` from its `initial_window_size`.
pub(crate) self_settings: Settings,
/// Settings for the peer's side; `Default` seeds `outgoing_capacity` from its
/// `initial_window_size`, and `mk_stream_outgoing` seeds each new stream's
/// outgoing capacity from it as well.
pub(crate) peer_settings: Settings,
/// NOTE(review): presumably notified whenever there may be outgoing data to
/// write — confirm with the code that awaits it.
pub(crate) send_data_maybe: Notify,
/// IDs of streams flagged as having pending data.
/// NOTE(review): presumably outgoing body data waiting to be written — confirm.
pub(crate) streams_with_pending_data: HashSet<StreamId>,
/// Connection-level capacity counter for incoming data; signed (`i64`), and
/// initialised from `self_settings.initial_window_size`.
pub(crate) incoming_capacity: i64,
/// Connection-level capacity counter for outgoing data; signed (`i64`), and
/// initialised from `peer_settings.initial_window_size`.
pub(crate) outgoing_capacity: i64,
}
impl Default for ConnState {
fn default() -> Self {
let mut s = Self {
streams: Default::default(),
last_stream_id: StreamId(0),
self_settings: Default::default(),
peer_settings: Default::default(),
send_data_maybe: Default::default(),
streams_with_pending_data: Default::default(),
incoming_capacity: 0,
outgoing_capacity: 0,
};
s.incoming_capacity = s.self_settings.initial_window_size as _;
s.outgoing_capacity = s.peer_settings.initial_window_size as _;
s
}
}
impl ConnState {
pub(crate) fn mk_stream_outgoing(&self) -> StreamOutgoing {
StreamOutgoing {
headers: HeadersOutgoing::WaitingForHeaders,
body: BodyOutgoing::StillReceiving(Default::default()),
capacity: self.peer_settings.initial_window_size as _,
}
}
}
/// Per-stream state, tracking which halves (incoming / outgoing) of the
/// stream are still held.
#[derive(Default)]
pub(crate) enum StreamState {
/// Both halves are present.
Open {
incoming: StreamIncoming,
outgoing: StreamOutgoing,
},
/// Only the outgoing half is kept.
/// NOTE(review): presumably the peer has finished sending (RFC 9113
/// "half-closed (remote)") — confirm against the state transitions.
HalfClosedRemote {
outgoing: StreamOutgoing,
},
/// Only the incoming half is kept.
/// NOTE(review): presumably we have finished sending (RFC 9113
/// "half-closed (local)") — confirm against the state transitions.
HalfClosedLocal {
incoming: StreamIncoming,
},
/// Data-less `Default` variant.
/// NOTE(review): presumably a placeholder left behind (e.g. by
/// `std::mem::take`) while moving from one state to another — confirm.
#[default]
Transition,
}
impl StreamState {
pub(crate) fn outgoing_mut(&mut self) -> Option<&mut StreamOutgoing> {
match self {
StreamState::Open { outgoing, .. } => Some(outgoing),
StreamState::HalfClosedRemote { outgoing, .. } => Some(outgoing),
_ => None,
}
}
}
/// Outgoing half of a stream: header progress, queued body, and the stream's
/// outgoing capacity counter.
pub(crate) struct StreamOutgoing {
/// Progress of the outgoing headers (see `HeadersOutgoing`).
pub(crate) headers: HeadersOutgoing,
/// Outgoing body queue and its lifecycle (see `BodyOutgoing`).
pub(crate) body: BodyOutgoing,
/// Signed capacity counter; `ConnState::mk_stream_outgoing` initialises it
/// from the peer's `initial_window_size`.
/// NOTE(review): presumably the stream-level send window, which may go
/// negative after a settings change — confirm.
pub(crate) capacity: i64,
}
/// Progress of a stream's outgoing headers.
///
/// `WroteAll` is the `Default`, so `std::mem::take` (used by `take_piece`)
/// leaves the value in the "nothing more to write" state.
#[derive(Default)]
pub(crate) enum HeadersOutgoing {
/// No header data is held yet; `has_more_to_write` still reports `true`.
WaitingForHeaders,
/// Holds a piece of header data.
/// NOTE(review): presumably the full encoded header block, none of which
/// has been written yet — confirm.
WroteNone(Piece),
/// Holds a piece of header data.
/// NOTE(review): presumably the remainder still to be written after a
/// partial write — confirm.
WroteSome(Piece),
/// Nothing left to write; `has_more_to_write` reports `false`.
#[default]
WroteAll,
}
impl HeadersOutgoing {
    /// Reports whether header output is still outstanding, i.e. the state is
    /// anything other than `WroteAll`.
    #[inline(always)]
    pub(crate) fn has_more_to_write(&self) -> bool {
        !matches!(self, HeadersOutgoing::WroteAll)
    }

    /// Takes the held piece out of `self`, leaving `self` as `WroteAll` (the
    /// `Default` variant) in every case.
    ///
    /// Returns the piece carried by `WroteNone` / `WroteSome`, or an empty
    /// piece for the variants that carry none.
    #[inline(always)]
    pub(crate) fn take_piece(&mut self) -> Piece {
        let previous = std::mem::take(self);
        if let Self::WroteNone(piece) | Self::WroteSome(piece) = previous {
            piece
        } else {
            Piece::empty()
        }
    }
}
/// Outgoing body of a stream: a queue of pieces plus where the body is in its
/// lifecycle.
pub(crate) enum BodyOutgoing {
/// Holds the queued pieces; `pop_front` never leaves this state on its own.
/// NOTE(review): presumably more pieces may still arrive from the
/// producer — confirm.
StillReceiving(VecDeque<Piece>),
/// Holds the remaining queued pieces; `pop_front` switches to `DoneSending`
/// as soon as this queue is empty.
DoneReceiving(VecDeque<Piece>),
/// Nothing queued and nothing more expected; `push_back` treats receiving a
/// piece in this state as unreachable.
DoneSending,
}
impl fmt::Debug for BodyOutgoing {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
BodyOutgoing::StillReceiving(pieces) => f
.debug_tuple("BodyOutgoing::StillReceiving")
.field(&pieces.len())
.finish(),
BodyOutgoing::DoneReceiving(pieces) => f
.debug_tuple("BodyOutgoing::DoneReceiving")
.field(&pieces.len())
.finish(),
BodyOutgoing::DoneSending => f.debug_tuple("BodyOutgoing::DoneSending").finish(),
}
}
}
impl BodyOutgoing {
#[inline(always)]
pub(crate) fn might_receive_more(&self) -> bool {
match self {
BodyOutgoing::StillReceiving(_) => true,
BodyOutgoing::DoneReceiving(_) => true,
BodyOutgoing::DoneSending => false,
}
}
#[inline(always)]
pub(crate) fn has_more_to_write(&self) -> bool {
match self {
BodyOutgoing::StillReceiving(_) => true,
BodyOutgoing::DoneReceiving(_) => true,
BodyOutgoing::DoneSending => false,
}
}
#[inline(always)]
pub(crate) fn pop_front(&mut self) -> Option<Piece> {
match self {
BodyOutgoing::StillReceiving(pieces) => pieces.pop_front(),
BodyOutgoing::DoneReceiving(pieces) => {
let piece = pieces.pop_front();
if pieces.is_empty() {
*self = BodyOutgoing::DoneSending;
}
piece
}
BodyOutgoing::DoneSending => None,
}
}
#[inline(always)]
pub(crate) fn push_front(&mut self, piece: Piece) {
match self {
BodyOutgoing::StillReceiving(pieces) => pieces.push_front(piece),
BodyOutgoing::DoneReceiving(pieces) => pieces.push_front(piece),
BodyOutgoing::DoneSending => {
*self = BodyOutgoing::DoneReceiving([piece].into());
}
}
}
#[inline(always)]
pub(crate) fn push_back(&mut self, piece: Piece) {
match self {
BodyOutgoing::StillReceiving(pieces) => pieces.push_back(piece),
BodyOutgoing::DoneReceiving(pieces) => pieces.push_back(piece),
BodyOutgoing::DoneSending => {
unreachable!("received a piece after we were done sending")
}
}
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum H2ConnectionError {
#[error("frame too large: {frame_type:?} frame of size {frame_size} exceeds max frame size of {max_frame_size}")]
FrameTooLarge {
frame_type: FrameType,
frame_size: u32,
max_frame_size: u32,
},
#[error("remote hung up while reading payload of {frame_type:?} with length {frame_size}")]
IncompleteFrame {
frame_type: FrameType,
frame_size: u32,
},
#[error("headers frame had invalid priority: stream {stream_id} depends on itself")]
HeadersInvalidPriority { stream_id: StreamId },
#[error("client tried to initiate an even-numbered stream")]
ClientSidShouldBeOdd,
#[error("client stream IDs should be numerically increasing")]
ClientSidShouldBeNumericallyIncreasing {
stream_id: StreamId,
last_stream_id: StreamId,
},
#[error("received {frame_type:?} frame with Padded flag but empty payload")]
PaddedFrameEmpty { frame_type: FrameType },
#[error("received {frame_type:?} with Padded flag but payload was shorter than padding")]
PaddedFrameTooShort {
frame_type: FrameType,
padding_length: usize,
frame_size: u32,
},
#[error("on stream {stream_id}, expected continuation frame, but got {frame_type:?}")]
ExpectedContinuationFrame {
stream_id: StreamId,
frame_type: Option<FrameType>,
},
#[error("expected continuation from for stream {stream_id}, but got continuation for stream {continuation_stream_id}")]
ExpectedContinuationForStream {
stream_id: StreamId,
continuation_stream_id: StreamId,
},
#[error("on stream {stream_id}, received unexpected continuation frame")]
UnexpectedContinuationFrame { stream_id: StreamId },
#[error("compression error: {0:?}")]
CompressionError(String),
#[error("client sent a push promise frame, clients aren't allowed to do that, cf. RFC9113 section 8.4")]
ClientSentPushPromise,
#[error("received window update for unknown/closed stream {stream_id}")]
WindowUpdateForUnknownOrClosedStream { stream_id: StreamId },
#[error("other error: {0:?}")]
Internal(#[from] eyre::Report),
#[error("error reading/parsing H2 frame: {0:?}")]
ReadError(eyre::Report),
#[error("error writing H2 frame: {0:?}")]
WriteError(std::io::Error),
#[error("received rst frame for unknown stream")]
RstStreamForUnknownStream { stream_id: StreamId },
#[error("received frame for closed stream {stream_id}")]
StreamClosed { stream_id: StreamId },
#[error("received ping frame frame with non-zero stream id")]
PingFrameWithNonZeroStreamId { stream_id: StreamId },
#[error("received ping frame with invalid length {len}")]
PingFrameInvalidLength { len: u32 },
#[error("received settings frame with invalid length {len}")]
SettingsAckWithPayload { len: u32 },
#[error("received settings frame with non-zero stream id")]
SettingsWithNonZeroStreamId { stream_id: StreamId },
#[error("received goaway frame with non-zero stream id")]
GoAwayWithNonZeroStreamId { stream_id: StreamId },
#[error("zero increment in window update frame for stream")]
WindowUpdateZeroIncrement,
#[error("received window update that made the window size overflow")]
WindowUpdateOverflow,
#[error("received frame that would cause the window size to underflow")]
WindowUnderflow { stream_id: StreamId },
#[error("received initial window size settings update that made the connection window size overflow")]
StreamWindowSizeOverflowDueToSettings { stream_id: StreamId },
#[error("received window update frame with invalid length {len}")]
WindowUpdateInvalidLength { len: usize },
}
impl H2ConnectionError {
pub(crate) fn as_known_error_code(&self) -> KnownErrorCode {
match self {
H2ConnectionError::FrameTooLarge { .. } => KnownErrorCode::FrameSizeError,
H2ConnectionError::PaddedFrameEmpty { .. } => KnownErrorCode::FrameSizeError,
H2ConnectionError::PaddedFrameTooShort { .. } => KnownErrorCode::FrameSizeError,
H2ConnectionError::PingFrameInvalidLength { .. } => KnownErrorCode::FrameSizeError,
H2ConnectionError::SettingsAckWithPayload { .. } => KnownErrorCode::FrameSizeError,
H2ConnectionError::WindowUpdateInvalidLength { .. } => KnownErrorCode::FrameSizeError,
H2ConnectionError::WindowUpdateOverflow => KnownErrorCode::FlowControlError,
H2ConnectionError::WindowUnderflow { .. } => KnownErrorCode::FlowControlError,
H2ConnectionError::StreamWindowSizeOverflowDueToSettings { .. } => {
KnownErrorCode::FlowControlError
}
H2ConnectionError::CompressionError(_) => KnownErrorCode::CompressionError,
H2ConnectionError::StreamClosed { .. } => KnownErrorCode::StreamClosed,
H2ConnectionError::Internal(_) => KnownErrorCode::InternalError,
_ => KnownErrorCode::ProtocolError,
}
}
}
/// Stream-level HTTP/2 errors.
///
/// `as_known_error_code` maps each variant to the `KnownErrorCode` reported
/// for it.
#[derive(Debug, thiserror::Error)]
pub(crate) enum H2StreamError {
/// The number of bytes received in DATA frames disagreed with the announced
/// content-length.
/// NOTE(review): the `allow(dead_code)` below suggests nothing constructs
/// this variant yet — confirm.
#[allow(dead_code)]
#[error("received {data_length} bytes in data frames but content-length announced {content_length} bytes")]
DataLengthDoesNotMatchContentLength {
data_length: u64,
content_length: u64,
},
/// The stream was refused because it would exceed the maximum number of
/// concurrent streams.
#[error("refused stream (would exceed max concurrent streams)")]
RefusedStream,
/// Trailers arrived without the EndStream flag.
#[error("trailers must have EndStream flag set")]
TrailersNotEndStream,
/// An RST_STREAM frame was received.
#[error("received RST_STREAM frame")]
ReceivedRstStream,
/// A PRIORITY frame had an invalid size; `frame_size` is kept on the error
/// but is not part of the message.
#[error("received PRIORITY frame with invalid size")]
InvalidPriorityFrameSize { frame_size: u32 },
/// The stream is closed.
#[error("stream closed")]
StreamClosed,
/// An RST_STREAM frame did not have the expected size of 4.
#[error("received RST_STREAM frame with invalid size, expected 4 got {frame_size}")]
InvalidRstStreamFrameSize { frame_size: u32 },
/// A WINDOW_UPDATE frame made the window size overflow.
#[error("received WINDOW_UPDATE that made the window size overflow")]
WindowUpdateOverflow,
}
impl H2StreamError {
pub(crate) fn as_known_error_code(&self) -> KnownErrorCode {
use H2StreamError::*;
use KnownErrorCode as Code;
match self {
StreamClosed => Code::StreamClosed,
RefusedStream => Code::RefusedStream,
InvalidPriorityFrameSize { .. } => Code::FrameSizeError,
InvalidRstStreamFrameSize { .. } => Code::FrameSizeError,
WindowUpdateOverflow => Code::FlowControlError,
_ => Code::ProtocolError,
}
}
}
/// Distinguishes a header block from a trailer block.
/// NOTE(review): presumably used while decoding HEADERS frames to tell the
/// initial headers from trailers — confirm at the use sites.
#[derive(Debug, Clone, Copy)]
pub(crate) enum HeadersOrTrailers {
/// The block is the headers.
Headers,
/// The block is the trailers.
Trailers,
}
/// An event tagged with the stream it belongs to.
/// NOTE(review): the payload variants (response headers, body chunk, body end)
/// suggest these flow from the response side towards the connection writer —
/// confirm with the channel that carries them.
#[derive(Debug)]
pub(crate) struct H2Event {
/// Stream the event applies to.
pub(crate) stream_id: StreamId,
/// What happened on that stream.
pub(crate) payload: H2EventPayload,
}
/// Payload of an `H2Event`. Its `Debug` impl prints only the variant name,
/// never the carried data.
pub(crate) enum H2EventPayload {
/// Carries a `Response`.
Headers(Response),
/// Carries one chunk of body data.
BodyChunk(PieceCore),
/// Marks the end of the body; carries no data.
BodyEnd,
}
impl fmt::Debug for H2EventPayload {
    /// Writes only the variant name; the carried response or body chunk is
    /// deliberately left out of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let variant = match self {
            Self::Headers(_) => "Headers",
            Self::BodyChunk(_) => "BodyChunk",
            Self::BodyEnd => "BodyEnd",
        };
        f.write_str(variant)
    }
}