mod common;
use common::{spawn_test_server, test_client, unique_stream_name};
/// Happy-path read: after creating a `text/plain` stream and appending one
/// 13-byte message, a plain GET must answer 200 with the expected metadata
/// headers and the message as the body.
#[tokio::test]
async fn test_read_returns_200() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    // Rebuilds the stream URL on demand so each request gets its own owned String.
    let stream_url = || format!("{base_url}/v1/stream/{stream_name}");

    // Create the stream, then append a single message.
    client
        .put(stream_url())
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();
    client
        .post(stream_url())
        .header("Content-Type", "text/plain")
        .body("Hello, world!")
        .send()
        .await
        .unwrap();

    let read = client.get(stream_url()).send().await.unwrap();
    assert_eq!(read.status(), 200, "Expected 200 OK");

    // (header name, panic message if absent, exact expected value) — checked in this order.
    let expected_headers = [
        ("content-type", "Missing Content-Type header", "text/plain"),
        (
            "Stream-Next-Offset",
            "Missing Stream-Next-Offset header",
            "0000000000000001_000000000000000d",
        ),
        ("Stream-Up-To-Date", "Missing Stream-Up-To-Date header", "true"),
        (
            "etag",
            "Missing ETag header",
            "\"-1:0000000000000001_000000000000000d\"",
        ),
        ("cache-control", "Missing Cache-Control header", "no-store"),
    ];
    for (name, missing, expected) in expected_headers {
        let actual = read.headers().get(name).expect(missing).to_str().unwrap();
        assert_eq!(actual, expected);
    }

    // Reading the body consumes the response, so it comes last.
    let payload = read.text().await.unwrap();
    assert_eq!(payload, "Hello, world!");
}
/// A stream that has been created but never appended to must read back as
/// 200 with an empty body.
#[tokio::test]
async fn test_read_empty_stream() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = || format!("{base_url}/v1/stream/{stream_name}");

    // Create only; no appends.
    let create = client
        .put(stream_url())
        .header("Content-Type", "text/plain");
    create.send().await.unwrap();

    let read = client.get(stream_url()).send().await.unwrap();
    assert_eq!(read.status(), 200);

    let payload = read.text().await.unwrap();
    assert!(payload.is_empty(), "Empty stream should return empty body");
}
/// Three separate appends must come back from a single GET as one body, in
/// append order with nothing inserted between them.
#[tokio::test]
async fn test_read_multiple_messages_concatenated() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = || format!("{base_url}/v1/stream/{stream_name}");

    client
        .put(stream_url())
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();

    // Append sequentially; each request completes before the next starts.
    for chunk in ["first", "second", "third"] {
        client
            .post(stream_url())
            .header("Content-Type", "text/plain")
            .body(chunk)
            .send()
            .await
            .unwrap();
    }

    let read = client.get(stream_url()).send().await.unwrap();
    let payload = read.text().await.unwrap();
    assert_eq!(payload, "firstsecondthird", "Messages should be concatenated");
}
/// Reading with `offset=-1` must return everything appended so far.
#[tokio::test]
async fn test_read_from_start_sentinel() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = || format!("{base_url}/v1/stream/{stream_name}");

    // Create the stream and append one message.
    let create = client
        .put(stream_url())
        .header("Content-Type", "text/plain");
    create.send().await.unwrap();

    let append = client
        .post(stream_url())
        .header("Content-Type", "text/plain")
        .body("data");
    append.send().await.unwrap();

    // `-1` is the sentinel under test.
    let from_start = format!("{}?offset=-1", stream_url());
    let read = client.get(from_start).send().await.unwrap();

    let payload = read.text().await.unwrap();
    assert_eq!(payload, "data");
}
/// Reading with `offset=now` must skip data that already exists: the body is
/// empty and `Stream-Up-To-Date` is `true`.
#[tokio::test]
async fn test_read_from_now_sentinel() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = || format!("{base_url}/v1/stream/{stream_name}");

    client
        .put(stream_url())
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();
    // This message exists before the read and so must not be returned.
    client
        .post(stream_url())
        .header("Content-Type", "text/plain")
        .body("data")
        .send()
        .await
        .unwrap();

    let from_now = format!("{}?offset=now", stream_url());
    let read = client.get(from_now).send().await.unwrap();

    // Copy the header out first: `text()` consumes the response.
    let caught_up = {
        let raw = read.headers().get("Stream-Up-To-Date").unwrap();
        raw.to_str().unwrap().to_string()
    };
    let payload = read.text().await.unwrap();

    assert!(
        payload.is_empty(),
        "Reading from 'now' should return empty body"
    );
    assert_eq!(caught_up, "true");
}
/// Verifies that a reader can resume from the offset the server handed out.
///
/// Appends "first" and "second", reads the whole stream, appends "third",
/// then reads again starting at the `Stream-Next-Offset` returned by the
/// first read. Only "third" may come back.
///
/// The resume offset is taken from the first response rather than typed in by
/// hand, so the test exercises the real resume contract; the literal value is
/// still asserted to keep the wire format pinned.
#[tokio::test]
async fn test_resumable_reads() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();

    client
        .put(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();
    client
        .post(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .body("first")
        .send()
        .await
        .unwrap();
    client
        .post(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .body("second")
        .send()
        .await
        .unwrap();

    let response1 = client
        .get(format!("{base_url}/v1/stream/{stream_name}"))
        .send()
        .await
        .unwrap();
    // Capture the resume point before `text()` consumes the response.
    let resume_offset = response1
        .headers()
        .get("Stream-Next-Offset")
        .expect("Missing Stream-Next-Offset header")
        .to_str()
        .unwrap()
        .to_string();
    let body1 = response1.text().await.unwrap();
    assert_eq!(body1, "firstsecond");
    // Two appends totalling 11 (0xb) bytes. Presumably the format is
    // `<append count>_<byte length>` in hex — confirm against the server.
    assert_eq!(
        resume_offset, "0000000000000002_000000000000000b",
        "Stream-Next-Offset should point just past the data already read"
    );

    client
        .post(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .body("third")
        .send()
        .await
        .unwrap();

    let response2 = client
        .get(format!(
            "{base_url}/v1/stream/{stream_name}?offset={resume_offset}"
        ))
        .send()
        .await
        .unwrap();
    let body2 = response2.text().await.unwrap();
    assert_eq!(
        body2, "third",
        "Should only return new data after resume offset"
    );
}
/// A GET issued right after an append completes must already see that append.
#[tokio::test]
async fn test_read_your_writes() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = || format!("{base_url}/v1/stream/{stream_name}");

    let create = client
        .put(stream_url())
        .header("Content-Type", "text/plain");
    create.send().await.unwrap();

    let append = client
        .post(stream_url())
        .header("Content-Type", "text/plain")
        .body("immediate");
    append.send().await.unwrap();

    // No delay or retry between the write and this read.
    let read = client.get(stream_url()).send().await.unwrap();
    let payload = read.text().await.unwrap();
    assert_eq!(payload, "immediate", "Read-your-writes must be consistent");
}
/// Reading a stream whose final append carried `Stream-Closed: true` must
/// return the data, echo `Stream-Closed: true`, and use an ETag ending in `:c`.
#[tokio::test]
async fn test_read_closed_stream_at_tail() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = || format!("{base_url}/v1/stream/{stream_name}");

    client
        .put(stream_url())
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();
    // The append and the close travel in the same request.
    client
        .post(stream_url())
        .header("Content-Type", "text/plain")
        .header("Stream-Closed", "true")
        .body("final")
        .send()
        .await
        .unwrap();

    let read = client.get(stream_url()).send().await.unwrap();

    // Copy both headers out before `text()` consumes the response.
    let closed_flag = {
        let raw = read
            .headers()
            .get("Stream-Closed")
            .expect("Missing Stream-Closed header");
        raw.to_str().unwrap().to_string()
    };
    let entity_tag = {
        let raw = read.headers().get("etag").unwrap();
        raw.to_str().unwrap().to_string()
    };
    let payload = read.text().await.unwrap();

    assert_eq!(payload, "final");
    assert_eq!(closed_flag, "true");
    assert!(
        entity_tag.ends_with(":c\""),
        "ETag should have :c suffix for closed stream at tail"
    );
}
/// Replaying a read with `If-None-Match` set to the ETag from a previous read
/// must yield 304, still carrying `Stream-Next-Offset` and `Stream-Up-To-Date`.
#[tokio::test]
async fn test_if_none_match_returns_304() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = || format!("{base_url}/v1/stream/{stream_name}");

    client
        .put(stream_url())
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();
    client
        .post(stream_url())
        .header("Content-Type", "text/plain")
        .body("data")
        .send()
        .await
        .unwrap();

    // The first read exists only to obtain the current ETag.
    let first = client.get(stream_url()).send().await.unwrap();
    let entity_tag = first.headers().get("etag").unwrap().to_str().unwrap();

    // The stream is unchanged in between, so the validator must still match.
    let conditional = client
        .get(stream_url())
        .header("If-None-Match", entity_tag)
        .send()
        .await
        .unwrap();
    assert_eq!(conditional.status(), 304, "Expected 304 Not Modified");

    let revalidated = conditional.headers();
    assert!(revalidated.get("Stream-Next-Offset").is_some());
    assert!(revalidated.get("Stream-Up-To-Date").is_some());
}
/// A GET for a stream name that was never created must answer 404.
#[tokio::test]
async fn test_read_nonexistent_stream_returns_404() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();

    // Deliberately no PUT: the name is fresh and nothing was created under it.
    let missing_url = format!("{base_url}/v1/stream/{stream_name}");
    let request = client.get(missing_url);
    let reply = request.send().await.unwrap();

    assert_eq!(reply.status(), 404, "Expected 404 Not Found");
}
/// A GET with an unparseable `offset` query value on an existing stream must
/// answer 400.
///
/// The setup PUT is checked for a 2xx status so that the 400 is attributable
/// to the bad offset on a stream that really exists, not to a server that
/// rejects the offset before it ever looks the stream up.
#[tokio::test]
async fn test_invalid_offset_returns_400() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();

    let created = client
        .put(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();
    // `unwrap()` above only catches transport errors; an HTTP error status
    // would otherwise go unnoticed.
    assert!(
        created.status().is_success(),
        "Stream creation failed with status {}",
        created.status()
    );

    let response = client
        .get(format!("{base_url}/v1/stream/{stream_name}?offset=invalid"))
        .send()
        .await
        .unwrap();
    assert_eq!(response.status(), 400, "Expected 400 Bad Request");
}
/// Checks that the headers of each read describe the same snapshot as its body.
///
/// After each of two 8-byte appends, a full read must return the accumulated
/// body together with a matching `Stream-Next-Offset` and ETag. Resuming from
/// the first read's offset must then yield only the second message.
#[tokio::test]
async fn test_response_headers_match_body_snapshot() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = || format!("{base_url}/v1/stream/{stream_name}");

    client
        .put(stream_url())
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();

    // --- Snapshot 1: one message ---
    client
        .post(stream_url())
        .header("Content-Type", "text/plain")
        .body("message1")
        .send()
        .await
        .unwrap();

    let first_read = client.get(stream_url()).send().await.unwrap();
    // Headers are copied out before `text()` consumes the response.
    let (offset_after_first, etag_after_first) = {
        let headers = first_read.headers();
        let offset = headers.get("Stream-Next-Offset").unwrap();
        let offset = offset.to_str().unwrap().to_string();
        let etag = headers.get("etag").unwrap();
        let etag = etag.to_str().unwrap().to_string();
        (offset, etag)
    };
    let first_body = first_read.text().await.unwrap();
    assert_eq!(first_body, "message1");
    assert_eq!(offset_after_first, "0000000000000001_0000000000000008");
    assert_eq!(etag_after_first, "\"-1:0000000000000001_0000000000000008\"");

    // --- Snapshot 2: two messages ---
    client
        .post(stream_url())
        .header("Content-Type", "text/plain")
        .body("message2")
        .send()
        .await
        .unwrap();

    let second_read = client.get(stream_url()).send().await.unwrap();
    let (offset_after_second, etag_after_second) = {
        let headers = second_read.headers();
        let offset = headers.get("Stream-Next-Offset").unwrap();
        let offset = offset.to_str().unwrap().to_string();
        let etag = headers.get("etag").unwrap();
        let etag = etag.to_str().unwrap().to_string();
        (offset, etag)
    };
    let second_body = second_read.text().await.unwrap();
    assert_eq!(second_body, "message1message2");
    assert_eq!(offset_after_second, "0000000000000002_0000000000000010");
    assert_eq!(etag_after_second, "\"-1:0000000000000002_0000000000000010\"");

    // --- Resume from the first snapshot's offset ---
    let resume_url = format!("{}?offset={offset_after_first}", stream_url());
    let resumed = client.get(resume_url).send().await.unwrap();
    let resumed_body = resumed.text().await.unwrap();
    assert_eq!(
        resumed_body, "message2",
        "Resuming from first read's next_offset should return only new data, \
         proving the offset was consistent with the first read's body"
    );
}