#![allow(clippy::disallowed_methods)]
use std::io::{Read, Write};
use std::net::TcpStream;
use std::time::Duration;
fn server_host() -> String {
std::env::var("APR_TEST_SERVER_HOST").unwrap_or_else(|_| "127.0.0.1:8080".to_string())
}
#[test]
#[ignore = "requires running server: apr serve model.gguf --port 8080"]
fn d50_01_abrupt_disconnect_during_request() {
let host = server_host();
if let Ok(mut stream) = TcpStream::connect(&host) {
let _ = stream.set_write_timeout(Some(Duration::from_secs(1)));
let _ = stream.write_all(b"POST /v1/chat/completions HTTP/1.1\r\nHost: localhost\r\nContent-Length: 1000\r\n\r\n{\"partial\":");
drop(stream);
println!("Sent partial request and disconnected");
}
std::thread::sleep(Duration::from_millis(500));
if let Ok(mut stream) = TcpStream::connect(&host) {
let _ = stream.set_read_timeout(Some(Duration::from_secs(5)));
let _ = stream.set_write_timeout(Some(Duration::from_secs(1)));
let _ = stream
.write_all(b"GET /health HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n");
let _ = stream.flush();
let mut response = String::new();
let _ = stream.read_to_string(&mut response);
assert!(
response.contains("200") || response.contains("OK"),
"Server should respond to health check after client abort. Got: {response}"
);
println!(
"Server recovered: {}",
response.lines().next().unwrap_or("")
);
} else {
panic!("Server became unresponsive after client abort");
}
}
#[test]
#[ignore = "requires running server: apr serve model.gguf --port 8080"]
fn d50_02_multiple_abrupt_disconnects() {
let host = server_host();
for i in 0..10 {
if let Ok(mut stream) = TcpStream::connect(&host) {
let _ = stream.set_write_timeout(Some(Duration::from_millis(100)));
let _ = stream.write_all(b"GET /health HTTP/1.1\r\n");
drop(stream);
println!("Abort {}/10", i + 1);
}
std::thread::sleep(Duration::from_millis(50));
}
std::thread::sleep(Duration::from_secs(1));
if let Ok(mut stream) = TcpStream::connect(&host) {
let _ = stream.set_read_timeout(Some(Duration::from_secs(5)));
let _ = stream.set_write_timeout(Some(Duration::from_secs(1)));
let _ = stream
.write_all(b"GET /health HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n");
let _ = stream.flush();
let mut response = String::new();
let _ = stream.read_to_string(&mut response);
assert!(
response.contains("200"),
"Server should remain healthy after 10 abrupt disconnects"
);
println!("Server healthy after 10 aborts");
} else {
panic!("Server unresponsive after multiple aborts");
}
}
#[test]
#[ignore = "requires running server: apr serve model.gguf --port 8080"]
fn d50_03_disconnect_during_streaming() {
let host = server_host();
if let Ok(mut stream) = TcpStream::connect(&host) {
let _ = stream.set_write_timeout(Some(Duration::from_secs(1)));
let _ = stream.set_read_timeout(Some(Duration::from_millis(500)));
let body = r#"{"model":"test","messages":[{"role":"user","content":"Count from 1 to 100"}],"stream":true,"max_tokens":50}"#;
let request = format!(
"POST /v1/chat/completions HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
body.len(),
body
);
let _ = stream.write_all(request.as_bytes());
let _ = stream.flush();
let mut buffer = [0u8; 256];
if let Ok(n) = stream.read(&mut buffer) {
if n > 0 {
println!("Received {n} bytes before abort");
}
}
drop(stream);
println!("Disconnected during streaming");
}
std::thread::sleep(Duration::from_secs(2));
if let Ok(mut stream) = TcpStream::connect(&host) {
let _ = stream.set_read_timeout(Some(Duration::from_secs(5)));
let _ = stream.set_write_timeout(Some(Duration::from_secs(1)));
let _ = stream
.write_all(b"GET /health HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n");
let _ = stream.flush();
let mut response = String::new();
let _ = stream.read_to_string(&mut response);
assert!(
response.contains("200"),
"Server should be healthy after streaming abort"
);
println!("Server healthy after streaming abort");
} else {
panic!("Server unresponsive after streaming abort");
}
}
#[test]
#[ignore = "requires running server: apr serve model.gguf --port 8080"]
fn d50_04_concurrent_disconnects() {
use std::thread;
let host = server_host();
let handles: Vec<_> = (0..20)
.map(|i| {
let host = host.clone();
thread::spawn(move || {
if let Ok(mut stream) = TcpStream::connect(&host) {
let _ = stream.set_write_timeout(Some(Duration::from_millis(100)));
let _ = stream.write_all(
format!("GET /health HTTP/1.1\r\nX-Request: {i}\r\n").as_bytes(),
);
}
})
})
.collect();
for handle in handles {
let _ = handle.join();
}
println!("All 20 concurrent aborts completed");
std::thread::sleep(Duration::from_secs(1));
if let Ok(mut stream) = TcpStream::connect(&host) {
let _ = stream.set_read_timeout(Some(Duration::from_secs(5)));
let _ = stream.set_write_timeout(Some(Duration::from_secs(1)));
let _ = stream
.write_all(b"GET /health HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n");
let _ = stream.flush();
let mut response = String::new();
let _ = stream.read_to_string(&mut response);
assert!(
response.contains("200"),
"Server should handle concurrent disconnects"
);
println!("Server healthy after concurrent aborts");
} else {
panic!("Server unresponsive after concurrent aborts");
}
}
#[test]
#[ignore = "requires running server: apr serve model.gguf --port 8080"]
fn d50_05_idle_connection_timeout() {
let host = server_host();
let mut idle_streams = Vec::new();
for i in 0..5 {
if let Ok(stream) = TcpStream::connect(&host) {
println!("Opened idle connection {}", i + 1);
idle_streams.push(stream);
}
}
println!("Waiting for server timeout...");
std::thread::sleep(Duration::from_secs(5));
drop(idle_streams);
if let Ok(mut stream) = TcpStream::connect(&host) {
let _ = stream.set_read_timeout(Some(Duration::from_secs(5)));
let _ = stream.set_write_timeout(Some(Duration::from_secs(1)));
let _ = stream
.write_all(b"GET /health HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n");
let _ = stream.flush();
let mut response = String::new();
let _ = stream.read_to_string(&mut response);
assert!(
response.contains("200"),
"Server should handle idle connections gracefully"
);
println!("Server healthy after idle connections");
} else {
panic!("Server unresponsive after idle connections");
}
}
#[cfg(test)]
mod unit_tests {
use super::*;
#[test]
fn test_server_host_contains_port() {
let host = server_host();
assert!(
host.contains(':'),
"Server host should include port: {host}"
);
}
}