use super::H2ErrorCode;
/// Lifecycle state of a single stream, advanced by [`StreamLifecycle::on_event`].
///
/// The variant names follow the HTTP/2 stream-state vocabulary ("local" is this
/// endpoint, "remote" is the peer). A fresh value is [`Idle`](Self::Idle).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub(super) enum StreamLifecycle {
/// Initial state; left on the first HEADERS in either direction or on a reset.
#[default]
Idle,
/// Neither direction has ended yet.
Open,
/// The local side has ended its direction; only inbound frames still advance the stream.
HalfClosedLocal,
/// The peer has ended its direction; only outbound frames still advance the stream.
HalfClosedRemote,
/// Terminal state; `reason` records whether the stream ended normally or was reset.
Closed { reason: CloseReason },
}
/// Why a stream reached [`StreamLifecycle::Closed`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CloseReason {
/// Both directions finished with an end-of-stream flag.
EndStream,
/// A reset was sent or received while the stream was not yet closed; carries that reset's code.
Reset(H2ErrorCode),
}
/// A frame-level occurrence fed into [`StreamLifecycle::on_event`].
///
/// `Send*` variants describe frames emitted locally, `Recv*` variants frames
/// coming from the peer. `end_stream` tells whether the frame ends that
/// direction of the stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum StreamEvent {
/// Headers sent by the local side.
SendHeaders { end_stream: bool },
/// Headers received from the peer.
RecvHeaders { end_stream: bool },
/// Data sent by the local side.
SendData { end_stream: bool },
/// Data received from the peer.
RecvData { end_stream: bool },
/// Reset sent by the local side, with its error code.
SendReset(H2ErrorCode),
/// Reset received from the peer, with its error code.
RecvReset(H2ErrorCode),
}
/// Rejection produced by [`StreamLifecycle::on_event`] when an inbound event is
/// not valid in the current state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct StreamProtocolError {
/// Whether the violation is scoped to the stream or to the whole connection.
pub(super) level: ErrorLevel,
/// HTTP/2 error code describing the violation.
pub(super) code: H2ErrorCode,
}
impl StreamProtocolError {
fn new(level: ErrorLevel, code: H2ErrorCode) -> Self {
Self { level, code }
}
}
/// Scope of a [`StreamProtocolError`].
///
/// NOTE(review): how each level is acted upon (presumably resetting the stream
/// vs. tearing down the connection) is decided by the caller and is not visible
/// in this file — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum ErrorLevel {
/// The violation concerns only the offending stream.
Stream,
/// The violation concerns the connection as a whole.
Connection,
}
impl StreamLifecycle {
pub(super) fn is_closed(self) -> bool {
matches!(self, Self::Closed { .. })
}
pub(super) fn is_reset(self) -> bool {
matches!(
self,
Self::Closed {
reason: CloseReason::Reset(_)
}
)
}
pub(super) fn recv_closed(self) -> bool {
matches!(self, Self::HalfClosedRemote | Self::Closed { .. })
}
pub(super) fn send_closed(self) -> bool {
matches!(self, Self::HalfClosedLocal | Self::Closed { .. })
}
pub(super) fn on_event(&mut self, ev: StreamEvent) -> Result<(), StreamProtocolError> {
use CloseReason::{EndStream, Reset};
use ErrorLevel::{Connection, Stream};
use StreamEvent::{RecvData, RecvHeaders, RecvReset, SendData, SendHeaders, SendReset};
use StreamLifecycle::{Closed, HalfClosedLocal, HalfClosedRemote, Idle, Open};
if let SendReset(code) | RecvReset(code) = ev {
if !self.is_closed() {
*self = Closed {
reason: Reset(code),
};
}
return Ok(());
}
#[allow(
clippy::match_same_arms,
reason = "per-state arms kept for §5.1 legibility"
)]
let next = match (*self, ev) {
(Idle, SendHeaders { end_stream }) => {
if end_stream {
HalfClosedLocal
} else {
Open
}
}
(Idle, RecvHeaders { end_stream }) => {
if end_stream {
HalfClosedRemote
} else {
Open
}
}
(Open, SendHeaders { end_stream } | SendData { end_stream }) => {
if end_stream {
HalfClosedLocal
} else {
Open
}
}
(Open, RecvHeaders { end_stream } | RecvData { end_stream }) => {
if end_stream {
HalfClosedRemote
} else {
Open
}
}
(HalfClosedLocal, RecvHeaders { end_stream } | RecvData { end_stream }) => {
if end_stream {
Closed { reason: EndStream }
} else {
HalfClosedLocal
}
}
(HalfClosedRemote, SendHeaders { end_stream } | SendData { end_stream }) => {
if end_stream {
Closed { reason: EndStream }
} else {
HalfClosedRemote
}
}
(HalfClosedRemote, RecvHeaders { .. } | RecvData { .. })
| (Closed { .. }, RecvHeaders { .. } | RecvData { .. }) => {
return Err(StreamProtocolError::new(Stream, H2ErrorCode::StreamClosed));
}
(Idle, RecvData { .. }) => {
return Err(StreamProtocolError::new(
Connection,
H2ErrorCode::ProtocolError,
));
}
(_, SendHeaders { .. } | SendData { .. }) => {
debug_assert!(
false,
"illegal local h2 send transition: {self:?} <- {ev:?}"
);
return Ok(());
}
(_, SendReset(_) | RecvReset(_)) => unreachable!("resets handled above"),
};
*self = next;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::{
        CloseReason::{EndStream, Reset},
        ErrorLevel::{Connection, Stream},
        StreamEvent::{self, RecvData, RecvHeaders, RecvReset, SendData, SendHeaders, SendReset},
        StreamLifecycle::{self, Closed, HalfClosedLocal, HalfClosedRemote, Idle, Open},
        StreamProtocolError,
    };
    use crate::h2::H2ErrorCode;

    /// Applies one event to a copy of `state` and yields the resulting state,
    /// or the rejection returned by `on_event`.
    fn step(
        mut state: StreamLifecycle,
        ev: StreamEvent,
    ) -> Result<StreamLifecycle, StreamProtocolError> {
        state.on_event(ev).map(|()| state)
    }

    /// Drives `start` through `steps`, checking the state after every event.
    /// Every event in the table must be accepted.
    #[track_caller]
    fn walk(start: StreamLifecycle, steps: &[(StreamEvent, StreamLifecycle)]) {
        let mut state = start;
        for &(ev, expected) in steps {
            state.on_event(ev).expect("legal transition");
            assert_eq!(state, expected, "after {ev:?}");
        }
    }

    /// The stream-level `StreamClosed` rejection expected for late inbound frames.
    fn stream_closed() -> StreamProtocolError {
        StreamProtocolError {
            level: Stream,
            code: H2ErrorCode::StreamClosed,
        }
    }

    #[test]
    fn client_request_response_lifecycle() {
        let finished = Closed { reason: EndStream };
        walk(
            Idle,
            &[
                (SendHeaders { end_stream: false }, Open),
                (SendData { end_stream: true }, HalfClosedLocal),
                (RecvHeaders { end_stream: false }, HalfClosedLocal),
                (RecvData { end_stream: true }, finished),
            ],
        );
    }

    #[test]
    fn server_request_response_lifecycle() {
        let finished = Closed { reason: EndStream };
        walk(
            Idle,
            &[
                (RecvHeaders { end_stream: false }, Open),
                (RecvData { end_stream: true }, HalfClosedRemote),
                (SendHeaders { end_stream: false }, HalfClosedRemote),
                (SendData { end_stream: true }, finished),
            ],
        );
    }

    #[test]
    fn bodyless_get_collapses_through_half_closed() {
        let finished = Closed { reason: EndStream };
        walk(
            Idle,
            &[
                (RecvHeaders { end_stream: true }, HalfClosedRemote),
                (SendHeaders { end_stream: true }, finished),
            ],
        );
    }

    #[test]
    fn bidi_upgrade_survives_peer_half_close_then_completes() {
        let finished = Closed { reason: EndStream };
        walk(
            Idle,
            &[
                (RecvHeaders { end_stream: false }, Open),
                (SendHeaders { end_stream: false }, Open),
                (RecvData { end_stream: true }, HalfClosedRemote),
                (SendData { end_stream: false }, HalfClosedRemote),
                (SendData { end_stream: true }, finished),
            ],
        );
    }

    #[test]
    fn reset_from_any_live_state_closes_with_reason() {
        let closed_by_peer = Closed {
            reason: Reset(H2ErrorCode::Cancel),
        };
        let closed_locally = Closed {
            reason: Reset(H2ErrorCode::InternalError),
        };
        for start in [Idle, Open, HalfClosedLocal, HalfClosedRemote] {
            assert_eq!(
                step(start, RecvReset(H2ErrorCode::Cancel)),
                Ok(closed_by_peer),
                "peer RST from {start:?}",
            );
            assert_eq!(
                step(start, SendReset(H2ErrorCode::InternalError)),
                Ok(closed_locally),
                "local RST from {start:?}",
            );
        }
    }

    #[test]
    fn late_reset_on_closed_stream_is_ignored() {
        // A stream that ended normally keeps `EndStream` as its reason.
        let ended = Closed { reason: EndStream };
        assert_eq!(step(ended, RecvReset(H2ErrorCode::Cancel)), Ok(ended));

        // A stream that was already reset keeps the first reset's code.
        let already_reset = Closed {
            reason: Reset(H2ErrorCode::Cancel),
        };
        assert_eq!(
            step(already_reset, RecvReset(H2ErrorCode::InternalError)),
            Ok(already_reset),
        );
    }

    #[test]
    fn inbound_after_peer_end_stream_is_lenient_stream_error() {
        let rejected = Err(stream_closed());
        assert_eq!(
            step(HalfClosedRemote, RecvData { end_stream: false }),
            rejected
        );
        assert_eq!(
            step(HalfClosedRemote, RecvData { end_stream: true }),
            rejected
        );
        assert_eq!(
            step(HalfClosedRemote, RecvHeaders { end_stream: true }),
            rejected
        );
    }

    #[test]
    fn inbound_on_closed_stream_is_stream_error_regardless_of_reason() {
        let rejected = Err(stream_closed());
        let ended = Closed { reason: EndStream };
        let reset = Closed {
            reason: Reset(H2ErrorCode::Cancel),
        };
        assert_eq!(step(ended, RecvData { end_stream: false }), rejected);
        assert_eq!(step(reset, RecvHeaders { end_stream: true }), rejected);
    }

    #[test]
    fn peer_data_before_headers_is_connection_error() {
        let expected = StreamProtocolError {
            level: Connection,
            code: H2ErrorCode::ProtocolError,
        };
        assert_eq!(
            step(Idle, RecvData { end_stream: false }),
            Err(expected),
        );
    }

    #[test]
    #[should_panic(expected = "illegal local h2 send transition")]
    fn local_send_after_our_end_stream_asserts() {
        let _ = step(HalfClosedLocal, SendData { end_stream: false });
    }
}