use std::{
num::NonZeroUsize,
time::{Duration, Instant},
};
use rstest::{fixture, rstest};
use crate::message_assembler::{
AssembledMessage,
CorrelationId,
EnvelopeId,
EnvelopeRouting,
FirstFrameHeader,
FirstFrameInput,
MessageAssemblyError,
MessageAssemblyState,
MessageKey,
MessageSeriesError,
test_helpers::{continuation_header, first_header, first_header_with_total},
};
/// Fixture producing a fresh [`MessageAssemblyState`] built from a 1024
/// size bound and a 30 second duration.
#[fixture]
fn state_with_defaults() -> MessageAssemblyState {
    let size_bound = NonZeroUsize::new(1024).expect("non-zero");
    let thirty_seconds = Duration::from_secs(30);
    MessageAssemblyState::new(size_bound, thirty_seconds)
}
/// A non-final first frame followed by a final continuation frame yields one
/// assembled message with the first frame's metadata and the concatenated
/// body, and leaves nothing buffered afterwards.
#[rstest]
fn state_tracks_single_message_assembly(
    #[from(state_with_defaults)] mut state: MessageAssemblyState,
) {
    let header = FirstFrameHeader {
        message_key: MessageKey(1),
        metadata_len: 2,
        body_len: 5,
        total_body_len: Some(10),
        is_last: false,
    };
    let first_input = FirstFrameInput::new(
        &header,
        EnvelopeRouting::default(),
        vec![0x01, 0x02],
        b"hello",
    )
    .expect("valid input");

    // The first frame is not the last one, so no message is produced yet.
    let pending = state
        .accept_first_frame(first_input)
        .expect("accept first frame");
    assert!(pending.is_none());
    assert_eq!(state.buffered_count(), 1);

    // The continuation is flagged as final and completes the message.
    let closing = continuation_header(1, 1, 5, true);
    let outcome = state
        .accept_continuation_frame(&closing, b"world")
        .expect("accept continuation");
    let assembled = outcome.expect("should complete");

    assert_eq!(assembled.message_key(), MessageKey(1));
    assert_eq!(assembled.metadata(), &[0x01, 0x02]);
    assert_eq!(assembled.body(), b"helloworld");
    assert_eq!(state.buffered_count(), 0);
}
/// Two messages whose frames are interleaved are tracked independently:
/// each completes with its own body and the buffered count drops by one
/// per completion.
#[rstest]
fn state_tracks_multiple_interleaved_messages(
    #[from(state_with_defaults)] mut state: MessageAssemblyState,
) {
    // Open both assemblies before completing either of them.
    let header_a = first_header(1, 2, false);
    let input_a = FirstFrameInput::new(&header_a, EnvelopeRouting::default(), vec![], b"A1")
        .expect("valid input");
    state.accept_first_frame(input_a).expect("first frame 1");

    let header_b = first_header(2, 2, false);
    let input_b = FirstFrameInput::new(&header_b, EnvelopeRouting::default(), vec![], b"B1")
        .expect("valid input");
    state.accept_first_frame(input_b).expect("first frame 2");

    assert_eq!(state.buffered_count(), 2);

    // Finish message 1; message 2 must remain buffered.
    let closing_a = continuation_header(1, 1, 2, true);
    let assembled_a = state
        .accept_continuation_frame(&closing_a, b"A2")
        .expect("continuation 1")
        .expect("message 1 should complete");
    assert_eq!(assembled_a.body(), b"A1A2");
    assert_eq!(state.buffered_count(), 1);

    // Finish message 2; nothing is left buffered.
    let closing_b = continuation_header(2, 1, 2, true);
    let assembled_b = state
        .accept_continuation_frame(&closing_b, b"B2")
        .expect("continuation 2")
        .expect("message 2 should complete");
    assert_eq!(assembled_b.body(), b"B1B2");
    assert_eq!(state.buffered_count(), 0);
}
/// A continuation frame for a key that has no buffered first frame is
/// rejected with `MessageSeriesError::MissingFirstFrame` for that key.
#[rstest]
fn state_rejects_continuation_without_first_frame(
    #[from(state_with_defaults)] mut state: MessageAssemblyState,
) {
    let orphan = continuation_header(99, 1, 4, false);
    let outcome = state.accept_continuation_frame(&orphan, b"data");
    let err = outcome.expect_err("should reject");

    assert!(matches!(
        err,
        MessageAssemblyError::Series(MessageSeriesError::MissingFirstFrame {
            key: MessageKey(99)
        })
    ));
}
/// Submitting a second first frame for a key that is still being assembled
/// is rejected with `MessageAssemblyError::DuplicateFirstFrame`.
#[rstest]
fn state_rejects_duplicate_first_frame(
    #[from(state_with_defaults)] mut state: MessageAssemblyState,
) {
    let header = first_header(1, 5, false);

    let original = FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], b"hello")
        .expect("valid input");
    state.accept_first_frame(original).expect("first frame");

    // Same header (and therefore same key) while the first assembly is open.
    let repeat = FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], b"again")
        .expect("valid input");
    let err = state
        .accept_first_frame(repeat)
        .expect_err("should reject duplicate");

    assert!(matches!(
        err,
        MessageAssemblyError::DuplicateFirstFrame { key: MessageKey(1) }
    ));
}
/// Parameters for one `state_enforces_size_limit` case.
struct SizeLimitCase {
/// Number of zero bytes sent as the body of the first frame.
first_body_len: usize,
/// When `Some`, the first header is built with `first_header_with_total`
/// using this value; when `None`, `first_header` is used instead.
total_body_len: Option<usize>,
/// When `Some`, the first frame is expected to be accepted and a
/// continuation frame with this many body bytes is expected to be rejected.
/// When `None`, the first frame itself is expected to be rejected.
continuation_body_len: Option<usize>,
/// Value the `attempted` field of the `MessageTooLarge` error must equal.
expected_attempted: usize,
}
/// With the state built from a bound of 10, each case must fail with
/// `MessageTooLarge` carrying the expected `attempted` value, and no
/// assembly may remain buffered afterwards.
///
/// Cases without a continuation length expect the first frame to be
/// rejected; cases with one expect the continuation frame to be rejected.
#[rstest]
#[case::first_frame_exceeds_limit(SizeLimitCase {
    first_body_len: 20,
    total_body_len: None,
    continuation_body_len: None,
    expected_attempted: 20,
})]
#[case::continuation_exceeds_limit(SizeLimitCase {
    first_body_len: 5,
    total_body_len: None,
    continuation_body_len: Some(10),
    expected_attempted: 15,
})]
#[case::declared_total_exceeds_limit(SizeLimitCase {
    first_body_len: 5,
    total_body_len: Some(20),
    continuation_body_len: None,
    expected_attempted: 20,
})]
fn state_enforces_size_limit(#[case] params: SizeLimitCase) {
    let bound = NonZeroUsize::new(10).expect("non-zero");
    let mut state = MessageAssemblyState::new(bound, Duration::from_secs(30));

    let first_body = vec![0u8; params.first_body_len];
    let first = if let Some(total) = params.total_body_len {
        first_header_with_total(1, params.first_body_len, total)
    } else {
        // The flag is true exactly when the case sends no continuation frame.
        let no_continuation = params.continuation_body_len.is_none();
        first_header(1, params.first_body_len, no_continuation)
    };
    let input = FirstFrameInput::new(&first, EnvelopeRouting::default(), vec![], &first_body)
        .expect("valid input");

    // Drive the state up to the frame that is expected to be rejected and
    // capture the resulting error; both paths share the assertion below.
    let err = match params.continuation_body_len {
        None => state
            .accept_first_frame(input)
            .expect_err("should reject oversized first frame"),
        Some(cont_len) => {
            state.accept_first_frame(input).expect("first frame");
            let cont = continuation_header(1, 1, cont_len, true);
            let cont_body = vec![0u8; cont_len];
            state
                .accept_continuation_frame(&cont, &cont_body)
                .expect_err("should reject oversized continuation")
        }
    };

    assert!(matches!(
        err,
        MessageAssemblyError::MessageTooLarge {
            key: MessageKey(1),
            attempted,
            ..
        } if attempted == params.expected_attempted
    ));
    assert_eq!(state.buffered_count(), 0);
}
/// An assembly accepted at a given instant is evicted by a purge performed
/// 31 seconds later on a state constructed with a 30 second duration; the
/// purge reports the evicted key.
#[test]
fn state_purges_expired_assemblies() {
    let size_bound = NonZeroUsize::new(1024).expect("non-zero");
    let mut state = MessageAssemblyState::new(size_bound, Duration::from_secs(30));

    let started_at = Instant::now();
    let header = first_header(1, 5, false);
    let input = FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], b"hello")
        .expect("valid input");
    state
        .accept_first_frame_at(input, started_at)
        .expect("accept first frame");
    assert_eq!(state.buffered_count(), 1);

    // Use an explicit later instant instead of sleeping.
    let later = started_at + Duration::from_secs(31);
    let evicted = state.purge_expired_at(later);

    assert_eq!(evicted, vec![MessageKey(1)]);
    assert_eq!(state.buffered_count(), 0);
}
/// A first frame flagged `is_last` completes immediately: the message is
/// returned from `accept_first_frame` and nothing is buffered.
#[rstest]
fn state_returns_single_frame_message_immediately(
    #[from(state_with_defaults)] mut state: MessageAssemblyState,
) {
    let header = FirstFrameHeader {
        message_key: MessageKey(1),
        metadata_len: 1,
        body_len: 5,
        total_body_len: None,
        is_last: true,
    };
    let input = FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![0xaa], b"hello")
        .expect("valid input");

    let outcome = state.accept_first_frame(input).expect("accept first frame");
    let assembled = outcome.expect("single frame should complete");

    assert_eq!(assembled.message_key(), MessageKey(1));
    assert_eq!(assembled.metadata(), &[0xaa]);
    assert_eq!(assembled.body(), b"hello");
    assert_eq!(state.buffered_count(), 0);
}
/// The routing supplied with the first frame (envelope id and correlation
/// id) is what the completed message reports.
#[rstest]
fn completed_assembly_preserves_first_frame_routing_metadata(
    #[from(state_with_defaults)] mut state: MessageAssemblyState,
) {
    let supplied = EnvelopeRouting {
        envelope_id: EnvelopeId(42),
        correlation_id: Some(CorrelationId(99)),
    };
    let header = first_header(1, 5, false);
    let input = FirstFrameInput::new(&header, supplied, vec![], b"hello").expect("valid input");
    state.accept_first_frame(input).expect("first frame");

    let closing = continuation_header(1, 1, 5, true);
    let outcome = state
        .accept_continuation_frame(&closing, b"world")
        .expect("accept continuation");
    let assembled = outcome.expect("message should complete");

    assert_eq!(
        assembled.routing().envelope_id,
        EnvelopeId(42),
        "envelope_id should come from first frame"
    );
    assert_eq!(
        assembled.routing().correlation_id,
        Some(CorrelationId(99)),
        "correlation_id should come from first frame"
    );
}
/// When two assemblies are in flight at once, each completed message
/// reports the routing given with its own first frame, not the other's.
#[rstest]
fn interleaved_assemblies_preserve_distinct_routing_metadata(
    #[from(state_with_defaults)] mut state: MessageAssemblyState,
) {
    // Open assembly 1 with envelope 10 / correlation 100.
    let routing_a = EnvelopeRouting {
        envelope_id: EnvelopeId(10),
        correlation_id: Some(CorrelationId(100)),
    };
    let header_a = first_header(1, 2, false);
    let input_a = FirstFrameInput::new(&header_a, routing_a, vec![], b"A1").expect("valid input");
    state.accept_first_frame(input_a).expect("first frame 1");

    // Open assembly 2 with envelope 20 / correlation 200.
    let routing_b = EnvelopeRouting {
        envelope_id: EnvelopeId(20),
        correlation_id: Some(CorrelationId(200)),
    };
    let header_b = first_header(2, 2, false);
    let input_b = FirstFrameInput::new(&header_b, routing_b, vec![], b"B1").expect("valid input");
    state.accept_first_frame(input_b).expect("first frame 2");

    let closing_a = continuation_header(1, 1, 2, true);
    let assembled_a = state
        .accept_continuation_frame(&closing_a, b"A2")
        .expect("continuation 1")
        .expect("message 1 should complete");
    assert_eq!(assembled_a.routing().envelope_id, EnvelopeId(10));
    assert_eq!(
        assembled_a.routing().correlation_id,
        Some(CorrelationId(100))
    );
    assert_eq!(assembled_a.body(), b"A1A2");

    let closing_b = continuation_header(2, 1, 2, true);
    let assembled_b = state
        .accept_continuation_frame(&closing_b, b"B2")
        .expect("continuation 2")
        .expect("message 2 should complete");
    assert_eq!(assembled_b.routing().envelope_id, EnvelopeId(20));
    assert_eq!(
        assembled_b.routing().correlation_id,
        Some(CorrelationId(200))
    );
    assert_eq!(assembled_b.body(), b"B1B2");
}
/// `AssembledMessage::into_body` consumes the message and returns the body
/// bytes it was constructed with.
#[test]
fn assembled_message_into_body() {
    let body = AssembledMessage::new(
        MessageKey(1),
        EnvelopeRouting::default(),
        vec![0x01],
        vec![0x02, 0x03],
    )
    .into_body();

    assert_eq!(body, vec![0x02, 0x03]);
}