mod common;
use futures_util::{SinkExt, StreamExt};
use std::time::Duration;
use tokio_tungstenite::tungstenite::Message;
#[tokio::test]
#[ignore]
async fn test_rest_oversized_body_rejected() {
let model_dir = common::model_dir();
let (port, shutdown) = common::start_server(&model_dir).await;
let client = reqwest::Client::builder()
.build()
.expect("Failed to build reqwest client");
let oversized_body: Vec<u8> = vec![0u8; 51 * 1024 * 1024];
let response = client
.post(format!("http://127.0.0.1:{port}/v1/transcribe"))
.body(oversized_body)
.send()
.await
.expect("Request should complete (connection not refused)");
assert_eq!(
response.status().as_u16(),
413,
"Expected 413 Payload Too Large for oversized body"
);
let body_text = response
.text()
.await
.expect("Response body should be readable");
if let Ok(body) = serde_json::from_str::<serde_json::Value>(&body_text) {
assert_eq!(
body["code"], "payload_too_large",
"Handler guard body must carry code='payload_too_large', got: {body}"
);
}
let _ = shutdown.send(());
}
#[tokio::test]
#[ignore]
async fn test_ws_oversized_frame_rejected() {
let model_dir = common::model_dir();
let (port, shutdown) = common::start_server(&model_dir).await;
let (mut ws, _) = tokio_tungstenite::connect_async_with_config(
format!("ws://127.0.0.1:{port}/ws"),
Some({
let mut cfg = tokio_tungstenite::tungstenite::protocol::WebSocketConfig::default();
cfg.max_message_size = None;
cfg.max_frame_size = None;
cfg
}),
false,
)
.await
.expect("WebSocket connection failed");
let _ready = tokio::time::timeout(Duration::from_secs(5), ws.next())
.await
.expect("timeout waiting for Ready")
.expect("stream ended")
.expect("ws error");
let oversized: Vec<u8> = vec![0u8; 600 * 1024];
ws.send(Message::Binary(oversized.into()))
.await
.expect("send should succeed on client side");
let next = tokio::time::timeout(Duration::from_secs(5), ws.next()).await;
match next {
Ok(Some(Ok(Message::Close(_)))) | Ok(None) => {
}
Ok(Some(Err(_))) => {
}
Ok(Some(Ok(other))) => {
panic!("Expected close after oversized frame, got: {other:?}");
}
Err(_) => {
panic!("Timeout waiting for server to close connection after oversized frame");
}
}
let health = reqwest::get(format!("http://127.0.0.1:{port}/health"))
.await
.expect("Health check failed after oversized frame test");
assert!(health.status().is_success(), "Server unhealthy after test");
let _ = shutdown.send(());
}
#[tokio::test]
#[ignore]
async fn test_ws_fifth_client_hangs() {
let model_dir = common::model_dir();
let (port, shutdown) = common::start_server(&model_dir).await;
let mut clients = Vec::new();
for _ in 0..4 {
let (sink, stream, _ready) = common::ws_connect(port).await;
clients.push((sink, stream));
}
let (mut fifth_ws, _) = tokio_tungstenite::connect_async(format!("ws://127.0.0.1:{port}/ws"))
.await
.expect("TCP connection for 5th client should succeed");
let result = tokio::time::timeout(Duration::from_secs(3), fifth_ws.next()).await;
assert!(
result.is_err(),
"5th client should NOT receive Ready while pool is saturated, but got: {result:?}"
);
let stop_json = serde_json::to_string(&serde_json::json!({"type": "stop"})).unwrap();
for (mut sink, mut stream) in clients {
sink.send(Message::Text(stop_json.clone().into()))
.await
.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(5), stream.next()).await;
}
let _ = shutdown.send(());
}
#[tokio::test]
#[ignore]
async fn test_rest_saturated_pool_returns_503() {
let model_dir = common::model_dir();
let (port, shutdown) = common::start_server(&model_dir).await;
let mut clients = Vec::new();
for _ in 0..4 {
let (sink, stream, _ready) = common::ws_connect(port).await;
clients.push((sink, stream));
}
let wav = common::generate_wav(1, 16000);
let client = reqwest::Client::new();
let response = tokio::time::timeout(
Duration::from_secs(35),
client
.post(format!("http://127.0.0.1:{port}/v1/transcribe"))
.body(wav)
.send(),
)
.await
.expect("Test timed out before server returned 503 — check pool timeout in http.rs")
.expect("HTTP request failed");
assert_eq!(
response.status().as_u16(),
503,
"Expected 503 Service Unavailable when pool is saturated"
);
let body_text = response
.text()
.await
.expect("Response body should be readable");
let body: serde_json::Value =
serde_json::from_str(&body_text).expect("Response body should be JSON");
assert_eq!(
body["code"], "timeout",
"Expected code='timeout', got: {body}"
);
let stop_json = serde_json::to_string(&serde_json::json!({"type": "stop"})).unwrap();
for (mut sink, mut stream) in clients {
sink.send(Message::Text(stop_json.clone().into()))
.await
.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(5), stream.next()).await;
}
let _ = shutdown.send(());
}
#[tokio::test]
#[ignore]
async fn test_ws_idle_timeout() {
let model_dir = common::model_dir();
let (port, shutdown) = common::start_server(&model_dir).await;
let (mut ws, _) = tokio_tungstenite::connect_async(format!("ws://127.0.0.1:{port}/ws"))
.await
.expect("WebSocket connection failed");
let _ready = tokio::time::timeout(Duration::from_secs(5), ws.next())
.await
.expect("timeout waiting for Ready")
.expect("stream ended")
.expect("ws error");
let result = tokio::time::timeout(Duration::from_secs(310), ws.next()).await;
match result {
Ok(None) => {
}
Ok(Some(Ok(Message::Close(_)))) => {
}
Ok(Some(Err(_))) => {
}
Ok(Some(Ok(other))) => {
panic!("Expected idle-timeout close, got unexpected message: {other:?}");
}
Err(_) => {
panic!(
"Server did not close the idle connection within 310 seconds \
(expected 300-second idle timeout)"
);
}
}
let _ = shutdown.send(());
}