use std::time::{Duration, Instant};
use rstest::rstest;
use super::{
connection_budgeted_state,
dual_budgeted_state,
in_flight_budgeted_state,
nz,
submit_first,
submit_first_at,
unbounded_state,
};
use crate::message_assembler::{
MessageAssemblyError,
MessageAssemblyState,
MessageKey,
test_helpers::continuation_header,
};
fn state_for_dimension(dimension: &str) -> MessageAssemblyState {
match dimension {
"connection" => connection_budgeted_state(),
"in_flight" => in_flight_budgeted_state(),
_ => panic!("unknown budget dimension: {dimension}"),
}
}
fn assert_budget_exceeded(err: &MessageAssemblyError, dimension: &str, key: u64, attempted: usize) {
match dimension {
"connection" => assert!(
matches!(
err,
MessageAssemblyError::ConnectionBudgetExceeded {
key: k,
attempted: a,
..
} if k == &MessageKey(key) && *a == attempted
),
"expected ConnectionBudgetExceeded(key={key}, attempted={attempted}), got: {err:?}"
),
"in_flight" => assert!(
matches!(
err,
MessageAssemblyError::InFlightBudgetExceeded {
key: k,
attempted: a,
..
} if k == &MessageKey(key) && *a == attempted
),
"expected InFlightBudgetExceeded(key={key}, attempted={attempted}), got: {err:?}"
),
_ => panic!("unknown budget dimension: {dimension}"),
}
}
fn assert_budget_exceeded_any(err: &MessageAssemblyError, dimension: &str) {
match dimension {
"connection" => assert!(
matches!(err, MessageAssemblyError::ConnectionBudgetExceeded { .. }),
"expected ConnectionBudgetExceeded, got: {err:?}"
),
"in_flight" => assert!(
matches!(err, MessageAssemblyError::InFlightBudgetExceeded { .. }),
"expected InFlightBudgetExceeded, got: {err:?}"
),
_ => panic!("unknown budget dimension: {dimension}"),
}
}
#[rstest]
#[case::connection("connection")]
#[case::in_flight("in_flight")]
fn budget_allows_frames_within_limit(#[case] dimension: &str) {
let mut state = state_for_dimension(dimension);
submit_first(&mut state, 1, &[0u8; 10], false).expect("within budget");
assert_eq!(state.total_buffered_bytes(), 10);
}
#[rstest]
#[case::connection("connection")]
#[case::in_flight("in_flight")]
fn budget_rejects_first_frame_exceeding_limit(#[case] dimension: &str) {
let mut state = state_for_dimension(dimension);
let err = submit_first(&mut state, 1, &[0u8; 21], false).expect_err("should reject");
assert_budget_exceeded(&err, dimension, 1, 21);
assert_eq!(state.buffered_count(), 0);
}
#[rstest]
#[case::connection("connection")]
#[case::in_flight("in_flight")]
fn budget_rejects_continuation_exceeding_limit(#[case] dimension: &str) {
let mut state = state_for_dimension(dimension);
submit_first(&mut state, 1, &[0u8; 10], false).expect("first frame");
let cont = continuation_header(1, 1, 11, true);
let err = state
.accept_continuation_frame(&cont, &[0u8; 11])
.expect_err("should reject");
assert_budget_exceeded_any(&err, dimension);
assert_eq!(state.buffered_count(), 0);
}
#[rstest]
fn connection_budget_frees_partial_on_violation(
#[from(connection_budgeted_state)] mut state: MessageAssemblyState,
) {
submit_first(&mut state, 1, &[0u8; 8], false).expect("first");
submit_first(&mut state, 2, &[0u8; 8], false).expect("second");
assert_eq!(state.buffered_count(), 2);
let cont = continuation_header(1, 1, 5, false);
let err = state
.accept_continuation_frame(&cont, &[0u8; 5])
.expect_err("should reject");
assert!(matches!(
err,
MessageAssemblyError::ConnectionBudgetExceeded { .. }
));
assert_eq!(state.buffered_count(), 1);
assert_eq!(state.total_buffered_bytes(), 8);
}
#[rstest]
fn dual_budget_in_flight_triggers_before_connection(
#[from(dual_budgeted_state)] mut state: MessageAssemblyState,
) {
let err = submit_first(&mut state, 1, &[0u8; 21], false).expect_err("should reject");
assert!(
matches!(err, MessageAssemblyError::InFlightBudgetExceeded { .. }),
"expected in-flight to trigger first, got: {err:?}"
);
}
#[test]
fn dual_budget_connection_triggers_when_in_flight_not_exceeded() {
let mut state = MessageAssemblyState::with_budgets(
nz(1024),
Duration::from_secs(30),
Some(nz(15)),
Some(nz(20)),
);
let err = submit_first(&mut state, 1, &[0u8; 16], false).expect_err("should reject");
assert!(
matches!(err, MessageAssemblyError::ConnectionBudgetExceeded { .. }),
"expected connection to trigger first, got: {err:?}"
);
}
#[rstest]
fn no_budgets_allows_large_frames(#[from(unbounded_state)] mut state: MessageAssemblyState) {
submit_first(&mut state, 1, &[0u8; 500], false).expect("no budget enforcement");
submit_first(&mut state, 2, &[0u8; 500], false).expect("no budget enforcement");
assert_eq!(state.total_buffered_bytes(), 1000);
}
#[rstest]
fn budget_violation_does_not_affect_other_assemblies(
#[from(connection_budgeted_state)] mut state: MessageAssemblyState,
) {
submit_first(&mut state, 1, &[0u8; 10], false).expect("first");
assert_eq!(state.buffered_count(), 1);
let err = submit_first(&mut state, 2, &[0u8; 11], false).expect_err("should reject");
assert!(matches!(
err,
MessageAssemblyError::ConnectionBudgetExceeded { .. }
));
assert_eq!(state.buffered_count(), 1);
assert_eq!(state.total_buffered_bytes(), 10);
let cont = continuation_header(1, 1, 3, true);
let msg = state
.accept_continuation_frame(&cont, b"fin")
.expect("cont")
.expect("complete");
assert_eq!(msg.body(), &[&[0u8; 10][..], b"fin"].concat());
}
#[test]
fn headroom_reclaimed_after_purge_allows_new_assembly() {
let mut state =
MessageAssemblyState::with_budgets(nz(1024), Duration::from_secs(30), Some(nz(20)), None);
let now = Instant::now();
submit_first_at(&mut state, 1, &[0u8; 15], now).expect("first");
assert_eq!(state.total_buffered_bytes(), 15);
let err = submit_first(&mut state, 2, &[0u8; 6], false).expect_err("over budget");
assert!(matches!(
err,
MessageAssemblyError::ConnectionBudgetExceeded { .. }
));
let future = now + Duration::from_secs(31);
state.purge_expired_at(future);
assert_eq!(state.total_buffered_bytes(), 0);
submit_first(&mut state, 3, &[0u8; 6], false).expect("within reclaimed budget");
assert_eq!(state.total_buffered_bytes(), 6);
}
#[test]
fn headroom_reclaimed_after_completion_allows_new_frame() {
let mut state =
MessageAssemblyState::with_budgets(nz(1024), Duration::from_secs(30), None, Some(nz(20)));
submit_first(&mut state, 1, &[0u8; 15], false).expect("first");
let cont = continuation_header(1, 1, 3, true);
state
.accept_continuation_frame(&cont, b"end")
.expect("cont")
.expect("complete");
assert_eq!(state.total_buffered_bytes(), 0);
submit_first(&mut state, 2, &[0u8; 20], false).expect("within reclaimed budget");
assert_eq!(state.total_buffered_bytes(), 20);
}
#[rstest]
fn single_frame_message_not_subject_to_aggregate_budgets(
#[from(connection_budgeted_state)] mut state: MessageAssemblyState,
) {
submit_first(&mut state, 1, &[0u8; 19], false).expect("first");
assert_eq!(state.total_buffered_bytes(), 19);
let msg = submit_first(&mut state, 2, b"big-single-frame-payload", true)
.expect("single frame accepted")
.expect("should complete immediately");
assert_eq!(msg.body(), b"big-single-frame-payload");
assert_eq!(state.total_buffered_bytes(), 19);
}