1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
//! `MultiPacketWorld` fixture for rstest-bdd tests.
//!
//! Provides test fixtures to verify message ordering, back-pressure handling,
//! and channel lifecycle.
use std::{error::Error, fmt};
use rstest::fixture;
use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_util::sync::CancellationToken;
use wireframe::{connection::ConnectionActor, response::Response};
/// Re-export `TestResult` from `wireframe_testing` for use in steps.
pub use wireframe_testing::TestResult;
use crate::build_small_queues;
/// Newtype around [`wireframe::WireframeError`] that implements
/// [`std::error::Error`], letting a failed actor run be boxed as a trait
/// object.
#[derive(Debug)]
struct WireframeRunError(wireframe::WireframeError);

impl fmt::Display for WireframeRunError {
    /// Writes the wrapped error using its `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(inner) = self;
        write!(f, "{inner:?}")
    }
}

// Marker impl only: the default `Error` methods are used unchanged.
impl Error for WireframeRunError {}
#[derive(Debug, Default)]
/// Test world exercising multi-packet channel behaviours and back-pressure.
///
/// A successful `process*` call overwrites both fields with the outcome of
/// that run; the `verify*` methods then assert on the recorded state.
pub struct MultiPacketWorld {
/// Frames collected by the most recent successful `process*` call.
messages: Vec<u8>,
/// `true` when the last run was `process_overflow` and its second
/// `try_send` was rejected with `TrySendError::Full`; reset to `false` by
/// the non-overflow scenarios.
is_overflow_error: bool,
}
/// rstest fixture yielding a default-initialised [`MultiPacketWorld`]: no
/// recorded messages and the overflow flag cleared.
// rustfmt collapses simple fixtures into one line, which triggers unused_braces.
#[rustfmt::skip]
#[fixture]
pub fn multi_packet_world() -> MultiPacketWorld {
MultiPacketWorld::default()
}
impl MultiPacketWorld {
async fn collect_frames_from(rx: mpsc::Receiver<u8>) -> TestResult<Vec<u8>> {
let (queues, handle) = build_small_queues::<u8>()?;
let shutdown = CancellationToken::new();
let mut actor: ConnectionActor<_, ()> =
ConnectionActor::new(queues, handle, None, shutdown);
actor
.set_multi_packet(Some(rx))
.map_err(|e| format!("set_multi_packet failed: {e}"))?;
let mut frames = Vec::new();
actor
.run(&mut frames)
.await
.map_err(WireframeRunError)
.map_err(Box::<dyn std::error::Error + Send + Sync>::from)?;
Ok(frames)
}
/// Send a single byte with back-pressure then close the channel.
async fn send_with_backpressure(sender: mpsc::Sender<u8>, value: u8) -> TestResult<()> {
sender.send(value).await?;
drop(sender);
Ok(())
}
/// Helper method to process messages through a multi-packet response built
/// via [`Response::with_channel`].
///
/// # Errors
/// Returns an error if the response cannot be converted to a multi-packet
/// stream or if producer tasks fail.
async fn process_messages(&mut self, messages: &[u8]) -> TestResult {
let (sender, response): (mpsc::Sender<u8>, Response<u8, ()>) = Response::with_channel(4);
let Response::MultiPacket(rx) = response else {
return Err("helper did not return a MultiPacket response".into());
};
let payload = messages.to_vec();
let producer = tokio::spawn(Self::send_payload(sender, payload));
let frames = Self::collect_frames_from(rx).await?;
producer.await?;
self.messages = frames;
self.is_overflow_error = false;
Ok(())
}
/// Send each byte to the channel, stopping silently if the receiver closes
/// to simulate a producer completing without error when the consumer is
/// gone.
async fn send_payload(sender: mpsc::Sender<u8>, payload: Vec<u8>) {
for msg in payload {
if sender.send(msg).await.is_err() {
return;
}
}
}
/// Send messages through a multi-packet response and record them.
///
/// # Errors
/// Returns an error if the response cannot be converted to a multi-packet
/// stream or if producer tasks fail.
pub async fn process(&mut self) -> TestResult { self.process_messages(&[1, 2, 3]).await }
/// Record zero messages from a closed channel.
///
/// # Errors
/// Returns an error if the response cannot be converted to a multi-packet
/// stream or if producer tasks fail.
pub async fn process_empty(&mut self) -> TestResult { self.process_messages(&[]).await }
/// Attempt to send more messages than the channel can buffer at once.
///
/// # Errors
/// Returns an error if sending to the channel fails unexpectedly or the
/// producer task returns an error.
pub async fn process_overflow(&mut self) -> TestResult {
let (sender, response): (mpsc::Sender<u8>, Response<u8, ()>) = Response::with_channel(1);
let Response::MultiPacket(rx) = response else {
return Err("helper did not return a MultiPacket response".into());
};
sender.try_send(1)?;
let overflow_error = matches!(sender.try_send(2), Err(TrySendError::Full(2)));
let producer = tokio::spawn(Self::send_with_backpressure(sender, 2));
let frames = Self::collect_frames_from(rx).await?;
// Unwrap JoinError from await, then the task's Result
producer.await??;
self.messages = frames;
self.is_overflow_error = overflow_error;
Ok(())
}
/// Verify that no messages were received.
///
/// # Panics
/// Panics if any messages are present.
pub fn verify_empty(&self) {
assert!(self.messages.is_empty());
}
/// Verify messages were received in order.
///
/// # Panics
///
/// Panics if the messages are not in the expected order.
pub fn verify(&self) {
assert_eq!(self.messages, vec![1, 2, 3]);
}
/// Verify that the channel enforced back-pressure.
///
/// # Panics
/// Panics if no overflow occurred or if the expected messages are missing.
pub fn verify_overflow(&self) {
assert!(
self.is_overflow_error,
"expected overflow error when channel capacity was exceeded",
);
assert_eq!(self.messages, vec![1, 2]);
}
}