mod common;
use camber::http::{Request, Router};
use camber::runtime;
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;
use std::time::Duration;
/// Reads up to `count` Server-Sent Events from `reader` and returns them in
/// arrival order.
///
/// Each returned string is one event: its non-empty lines joined with `\n`,
/// with the line terminators (`\r`, `\n`) stripped. A blank line ends the
/// event that is currently being accumulated; blank lines with nothing
/// accumulated are ignored.
///
/// Lines that consist solely of ASCII hex digits are skipped. NOTE(review):
/// this looks like a heuristic for dropping chunked transfer-encoding size
/// lines; an SSE line made up only of hex digits would be dropped as well.
///
/// Reading stops as soon as `count` events were collected, or on end of
/// stream, or on any read error (for example a read timeout). An event that
/// was not yet terminated by a blank line at that point is discarded.
/// With `count == 0` nothing is read and an empty vector is returned.
///
/// Accepts any [`BufRead`] implementation, so it works on a
/// `BufReader<TcpStream>` as well as on in-memory readers.
fn read_sse_events<R: BufRead + ?Sized>(reader: &mut R, count: usize) -> Vec<String> {
    let mut events = Vec::new();
    let mut pending = String::new();
    let mut raw = String::new();

    // Checking the limit before every read means `count == 0` never touches
    // the reader instead of blocking until the first event shows up.
    while events.len() < count {
        raw.clear();
        match reader.read_line(&mut raw) {
            Ok(0) | Err(_) => break,
            Ok(_) => {}
        }

        let content = raw.trim_end_matches(['\r', '\n']);

        if content.is_empty() {
            // Blank line: dispatch whatever has been accumulated so far.
            if !pending.is_empty() {
                events.push(std::mem::take(&mut pending));
            }
            continue;
        }

        if content.bytes().all(|b| b.is_ascii_hexdigit()) {
            continue;
        }

        if !pending.is_empty() {
            pending.push('\n');
        }
        pending.push_str(content);
    }

    events
}
/// Consumes lines from `reader` up to and including the first line that is
/// empty after trimming whitespace, i.e. the blank line that ends an HTTP
/// header section. The status line and all header lines are discarded.
///
/// Returns early, without reporting anything, on end of stream or on any
/// read error.
///
/// Accepts any [`BufRead`] implementation, so it works on a
/// `BufReader<TcpStream>` as well as on in-memory readers.
fn skip_http_headers<R: BufRead + ?Sized>(reader: &mut R) {
    let mut raw = String::new();
    loop {
        raw.clear();
        match reader.read_line(&mut raw) {
            Ok(0) | Err(_) => return,
            // The blank separator line has been consumed; the body follows.
            Ok(_) if raw.trim().is_empty() => return,
            Ok(_) => {}
        }
    }
}
/// An SSE route whose handler writes three `message` events must answer with
/// status 200, a `text/event-stream` content type and `Cache-Control:
/// no-cache`, and deliver the three events in the order they were written.
#[test]
fn sse_streams_multiple_events() {
    common::test_runtime()
        .shutdown_timeout(Duration::from_secs(2))
        .run(|| {
            let mut router = Router::new();
            router.get_sse(
                "/events",
                |_req: &Request, writer: &mut camber::http::SseWriter| {
                    for i in 0..3 {
                        writer.event("message", &format!("data-{i}"))?;
                    }
                    Ok(())
                },
            );
            let addr = common::spawn_server(router);

            // Speak raw HTTP/1.1 over a plain socket so the response bytes
            // can be inspected line by line.
            let mut conn = TcpStream::connect(addr).unwrap();
            conn.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
            conn.write_all(b"GET /events HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
                .unwrap();
            conn.flush().unwrap();

            let mut reader = BufReader::new(conn);

            let mut status_line = String::new();
            reader.read_line(&mut status_line).unwrap();
            assert!(
                status_line.starts_with("HTTP/1.1 200"),
                "expected 200, got: {status_line}"
            );

            // Collect the header lines up to the blank separator line.
            let mut headers = Vec::new();
            let mut buf = String::new();
            loop {
                buf.clear();
                reader.read_line(&mut buf).unwrap();
                let header = buf.trim_end();
                if header.is_empty() {
                    break;
                }
                headers.push(header.to_owned());
            }

            // Header name is matched case-insensitively, the value verbatim.
            let header_has = |name: &str, value: &str| {
                headers
                    .iter()
                    .any(|h| h.to_lowercase().contains(name) && h.contains(value))
            };
            assert!(
                header_has("content-type", "text/event-stream"),
                "missing Content-Type: text/event-stream, got: {headers:?}"
            );
            assert!(
                header_has("cache-control", "no-cache"),
                "missing Cache-Control: no-cache, got: {headers:?}"
            );

            let events = read_sse_events(&mut reader, 3);
            assert_eq!(events.len(), 3, "expected 3 events, got: {events:?}");
            for (i, event) in events.iter().enumerate() {
                let expected = format!("event: message\ndata: data-{i}");
                assert_eq!(event, &expected, "event {i} mismatch: {event}");
            }

            runtime::request_shutdown();
        })
        .unwrap();
}
/// A router configured with `max_request_body(10)` must still serve its SSE
/// route with status 200 and stream the event when the request carries a
/// 1024-byte body.
#[test]
fn sse_route_ignores_request_body_limit() {
    common::test_runtime()
        .shutdown_timeout(Duration::from_secs(2))
        .run(|| {
            let mut router = Router::new().max_request_body(10);
            router.get_sse(
                "/events",
                |_req: &Request, writer: &mut camber::http::SseWriter| {
                    writer.event("ping", "hello")?;
                    Ok(())
                },
            );
            let addr = common::spawn_server(router);

            let mut conn = TcpStream::connect(addr).unwrap();
            conn.set_read_timeout(Some(Duration::from_secs(5))).unwrap();

            // The body is far larger than the 10-byte limit set on the router.
            let payload = "x".repeat(1024);
            write!(
                conn,
                "GET /events HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nContent-Length: {}\r\n\r\n{}",
                payload.len(),
                payload,
            )
            .unwrap();
            conn.flush().unwrap();

            let mut reader = BufReader::new(conn);

            let mut status_line = String::new();
            reader.read_line(&mut status_line).unwrap();
            assert!(
                status_line.starts_with("HTTP/1.1 200"),
                "expected 200, got: {status_line}"
            );

            // Gather the header section as newline-separated text.
            let mut headers_text = String::new();
            let mut buf = String::new();
            loop {
                buf.clear();
                reader.read_line(&mut buf).unwrap();
                let header = buf.trim_end();
                if header.is_empty() {
                    break;
                }
                headers_text.push_str(header);
                headers_text.push('\n');
            }
            assert!(
                headers_text.to_lowercase().contains("text/event-stream"),
                "expected text/event-stream, got headers: {headers_text}"
            );

            let events = read_sse_events(&mut reader, 1);
            assert_eq!(events.len(), 1);
            assert_eq!(events[0], "event: ping\ndata: hello");

            runtime::request_shutdown();
        })
        .unwrap();
}
/// After the client drops its connection, an endless SSE handler must stop
/// attempting writes: the attempt counter, sampled twice 200 ms apart after a
/// 300 ms settling period, must not change between the two samples.
#[test]
fn sse_client_disconnect_stops_handler() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    common::test_runtime()
        .shutdown_timeout(Duration::from_secs(2))
        .run(|| {
            // Counts every write attempt the handler makes.
            let attempts = Arc::new(AtomicUsize::new(0));
            let attempts_in_handler = Arc::clone(&attempts);

            let mut router = Router::new();
            router.get_sse(
                "/stream",
                move |_req: &Request, writer: &mut camber::http::SseWriter| {
                    loop {
                        attempts_in_handler.fetch_add(1, Ordering::SeqCst);
                        // A failed write ends the handler without an error.
                        if writer.event("tick", "ping").is_err() {
                            return Ok(());
                        }
                        std::thread::sleep(Duration::from_millis(50));
                    }
                },
            );
            let addr = common::spawn_server(router);

            let mut conn = TcpStream::connect(addr).unwrap();
            conn.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
            conn.write_all(b"GET /stream HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
                .unwrap();
            conn.flush().unwrap();

            let mut reader = BufReader::new(conn);
            skip_http_headers(&mut reader);
            let events = read_sse_events(&mut reader, 2);
            assert_eq!(events.len(), 2, "expected 2 events before disconnect");

            // Dropping the reader drops the socket it owns, which
            // disconnects the client.
            drop(reader);

            // Let the handler run into the closed connection, then verify the
            // counter has stopped moving.
            std::thread::sleep(Duration::from_millis(300));
            let settled = attempts.load(Ordering::SeqCst);
            std::thread::sleep(Duration::from_millis(200));
            let later = attempts.load(Ordering::SeqCst);
            assert_eq!(
                settled, later,
                "handler should have stopped writing events"
            );

            runtime::request_shutdown();
        })
        .unwrap();
}