use std::io::{BufRead, BufReader, Write};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use camber::http::{Request, Response, Router, SseWriter};
#[tokio::test(flavor = "multi_thread")]
async fn serve_async_handles_request() {
let mut router = Router::new();
router.get("/", |_req: &Request| async { Response::text(200, "hello") });
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(camber::http::serve_async(listener, router));
let resp = reqwest::get(format!("http://{addr}/")).await.unwrap();
assert_eq!(resp.status(), 200);
assert_eq!(resp.text().await.unwrap(), "hello");
}
#[tokio::test(flavor = "multi_thread")]
async fn serve_async_sse_stream() {
let mut router = Router::new();
router.get_sse("/events", |_req: &Request, writer: &mut SseWriter| {
for i in 0..3 {
writer.event("message", &format!("data-{i}"))?;
}
Ok(())
});
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(camber::http::serve_async(listener, router));
let events = tokio::task::spawn_blocking(move || {
let mut stream = std::net::TcpStream::connect(addr).unwrap();
stream
.set_read_timeout(Some(Duration::from_secs(5)))
.unwrap();
write!(
stream,
"GET /events HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"
)
.unwrap();
stream.flush().unwrap();
let mut reader = BufReader::new(stream);
let mut line = String::new();
loop {
line.clear();
reader.read_line(&mut line).unwrap();
if line.trim().is_empty() {
break;
}
}
read_sse_events(&mut reader, 3)
})
.await
.unwrap();
assert_eq!(events.len(), 3);
for (i, event) in events.iter().enumerate() {
assert_eq!(event, &format!("event: message\ndata: data-{i}"));
}
}
#[tokio::test(flavor = "multi_thread")]
async fn serve_async_shares_tokio_runtime() {
let (tx, rx) = tokio::sync::oneshot::channel::<bool>();
let tx = Arc::new(Mutex::new(Some(tx)));
let mut router = Router::new();
let tx_clone = Arc::clone(&tx);
router.get("/check", move |_req: &Request| {
let sender = tx_clone.lock().unwrap_or_else(|e| e.into_inner()).take();
async move {
match sender {
Some(tx) => {
tokio::runtime::Handle::current().spawn(async move {
let _ = tx.send(true);
});
Response::text(200, "spawned")
}
None => Response::text(200, "already sent"),
}
}
});
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(camber::http::serve_async(listener, router));
let resp = reqwest::get(format!("http://{addr}/check")).await.unwrap();
assert_eq!(resp.status(), 200);
let result = tokio::time::timeout(Duration::from_secs(2), rx)
.await
.expect("timeout waiting for spawned task")
.expect("oneshot channel dropped");
assert!(result);
}
fn read_sse_events(reader: &mut BufReader<std::net::TcpStream>, count: usize) -> Vec<String> {
let mut events = Vec::new();
let mut current = String::new();
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line) {
Ok(0) | Err(_) => break,
Ok(_) => {}
}
let trimmed = line.trim_end_matches(&['\r', '\n'][..]);
if !trimmed.is_empty() && trimmed.bytes().all(|b| b.is_ascii_hexdigit()) {
continue;
}
match trimmed.is_empty() {
true if !current.is_empty() => {
events.push(std::mem::take(&mut current));
if events.len() >= count {
break;
}
}
true => {}
false => {
if !current.is_empty() {
current.push('\n');
}
current.push_str(trimmed);
}
}
}
events
}