#![allow(
clippy::unwrap_used,
clippy::panic,
clippy::todo,
clippy::missing_panics_doc,
clippy::must_use_candidate,
missing_debug_implementations,
clippy::cast_precision_loss,
clippy::clone_on_ref_ptr,
clippy::match_same_arms,
clippy::items_after_statements,
unreachable_pub,
clippy::print_stdout,
clippy::similar_names
)]
mod common;
use std::collections::HashSet;
use std::time::Duration;
#[tokio::test]
async fn test_sub_batching_delivers_all_messages() {
let mut config = common::get_test_config();
config.websocket.message_fetch_batch_size = 20;
config.websocket.max_batch_bytes = 1024;
let app = common::TestApp::spawn_with_config(config).await;
let user_a = app.register_user(&common::generate_username("alice")).await;
let user_b = app.register_user(&common::generate_username("bob")).await;
let message_count = 20;
let mut expected_contents: Vec<Vec<u8>> = Vec::new();
for i in 0..message_count {
let content = format!("sub-batch message {i}").into_bytes();
expected_contents.push(content.clone());
app.send_message(&user_a.token, user_b.device_id, &content).await;
}
let mut ws = app.connect_ws(&user_b.token).await;
let mut received_contents: Vec<Vec<u8>> = Vec::new();
let timeout = Duration::from_secs(10);
let start = std::time::Instant::now();
while received_contents.len() < message_count && start.elapsed() < timeout {
if let Some(env) = ws.receive_envelope_timeout(Duration::from_millis(2000)).await {
received_contents.push(env.message);
} else {
break;
}
}
assert_eq!(
received_contents.len(),
message_count,
"Expected {message_count} messages, got {}",
received_contents.len()
);
let expected_set: HashSet<&[u8]> = expected_contents.iter().map(|v| v.as_slice()).collect();
let received_set: HashSet<&[u8]> = received_contents.iter().map(|v| v.as_slice()).collect();
assert_eq!(expected_set, received_set, "Message contents do not match");
}
#[tokio::test]
async fn test_oversized_single_message_still_delivered() {
let mut config = common::get_test_config();
config.websocket.message_fetch_batch_size = 10;
config.websocket.max_batch_bytes = 1024;
let app = common::TestApp::spawn_with_config(config).await;
let user_a = app.register_user(&common::generate_username("alice")).await;
let user_b = app.register_user(&common::generate_username("bob")).await;
let large_payload = vec![0xABu8; 4096];
app.send_message(&user_a.token, user_b.device_id, &large_payload).await;
let mut ws = app.connect_ws(&user_b.token).await;
let env = ws.receive_envelope().await.expect("Oversized message was not delivered");
assert_eq!(env.message, large_payload);
}
#[tokio::test]
async fn test_mixed_size_messages_all_delivered() {
let mut config = common::get_test_config();
config.websocket.message_fetch_batch_size = 20;
config.websocket.max_batch_bytes = 2048;
let app = common::TestApp::spawn_with_config(config).await;
let user_a = app.register_user(&common::generate_username("alice")).await;
let user_b = app.register_user(&common::generate_username("bob")).await;
let mut expected_contents: Vec<Vec<u8>> = Vec::new();
for i in 0..15 {
let content = if i % 3 == 0 {
vec![i as u8; 3000]
} else {
format!("small {i}").into_bytes()
};
expected_contents.push(content.clone());
app.send_message(&user_a.token, user_b.device_id, &content).await;
}
let mut ws = app.connect_ws(&user_b.token).await;
let mut received_contents: Vec<Vec<u8>> = Vec::new();
let timeout = Duration::from_secs(10);
let start = std::time::Instant::now();
while received_contents.len() < expected_contents.len() && start.elapsed() < timeout {
if let Some(env) = ws.receive_envelope_timeout(Duration::from_millis(2000)).await {
received_contents.push(env.message);
} else {
break;
}
}
assert_eq!(
received_contents.len(),
expected_contents.len(),
"Expected {} messages, got {}",
expected_contents.len(),
received_contents.len()
);
let expected_set: HashSet<&[u8]> = expected_contents.iter().map(|v| v.as_slice()).collect();
let received_set: HashSet<&[u8]> = received_contents.iter().map(|v| v.as_slice()).collect();
assert_eq!(expected_set, received_set, "Message contents do not match");
}
#[tokio::test]
async fn test_batch_delivery_default_config() {
let app = common::TestApp::spawn().await;
let user_a = app.register_user(&common::generate_username("alice")).await;
let user_b = app.register_user(&common::generate_username("bob")).await;
let message_count = 30;
let mut expected: Vec<Vec<u8>> = Vec::new();
for i in 0..message_count {
let content = format!("batch default {i}").into_bytes();
expected.push(content.clone());
app.send_message(&user_a.token, user_b.device_id, &content).await;
}
let mut ws = app.connect_ws(&user_b.token).await;
let mut received: Vec<Vec<u8>> = Vec::new();
let timeout = Duration::from_secs(10);
let start = std::time::Instant::now();
while received.len() < message_count && start.elapsed() < timeout {
if let Some(env) = ws.receive_envelope_timeout(Duration::from_millis(2000)).await {
received.push(env.message);
} else {
break;
}
}
assert_eq!(received.len(), message_count, "Expected {message_count} messages, got {}", received.len());
let expected_set: HashSet<&[u8]> = expected.iter().map(|v| v.as_slice()).collect();
let received_set: HashSet<&[u8]> = received.iter().map(|v| v.as_slice()).collect();
assert_eq!(expected_set, received_set, "Message contents do not match");
}