mod common;
use common::{spawn_test_server, test_client, unique_stream_name};
/// Sends a body-less `PUT {base_url}/v1/stream/{name}` with `Content-Type: text/plain`
/// to create a stream for a test, and returns the generated stream name.
///
/// The name comes from `unique_stream_name()`.
///
/// # Panics
///
/// Panics if the request cannot be sent, or if the server answers with a
/// non-success status. Failing here keeps a broken fixture from surfacing as a
/// confusing assertion failure later in the calling test.
async fn setup_stream(base_url: &str, client: &reqwest::Client) -> String {
    let name = unique_stream_name();
    let response = client
        .put(format!("{base_url}/v1/stream/{name}"))
        .header("Content-Type", "text/plain")
        .send()
        .await
        .expect("failed to send stream-creation PUT request");
    assert!(
        response.status().is_success(),
        "stream-creation PUT returned unexpected status {}",
        response.status()
    );
    name
}
/// A body-less POST carrying `Stream-Closed: true` against a freshly set-up
/// stream must answer 204, echo `Stream-Closed: true`, and include a
/// `Stream-Next-Offset` header.
#[tokio::test]
async fn test_close_response_includes_stream_closed_header() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream = setup_stream(&base_url, &client).await;
    let url = format!("{base_url}/v1/stream/{stream}");

    // Build the close request first, then dispatch it.
    let close_request = client
        .post(url.as_str())
        .header("Content-Type", "text/plain")
        .header("Stream-Closed", "true");
    let response = close_request.send().await.unwrap();

    assert_eq!(response.status(), 204);

    let headers = response.headers();
    let closed_value = headers
        .get("Stream-Closed")
        .expect("Missing Stream-Closed header on close response");
    assert_eq!(closed_value.to_str().unwrap(), "true");
    assert!(
        headers.contains_key("Stream-Next-Offset"),
        "Missing Stream-Next-Offset on close response"
    );
}
/// A POST that both appends a body and carries `Stream-Closed: true` must
/// answer 204 with `Stream-Closed: true` and the exact expected
/// `Stream-Next-Offset` value.
#[tokio::test]
async fn test_close_with_data_response_includes_stream_closed_header() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream = setup_stream(&base_url, &client).await;
    let url = format!("{base_url}/v1/stream/{stream}");

    let response = client
        .post(url.as_str())
        .header("Content-Type", "text/plain")
        .header("Stream-Closed", "true")
        .body("final message")
        .send()
        .await
        .unwrap();
    assert_eq!(response.status(), 204);

    let headers = response.headers();
    let closed_value = headers
        .get("Stream-Closed")
        .expect("Missing Stream-Closed header on close-with-data response");
    assert_eq!(closed_value.to_str().unwrap(), "true");

    // The trailing hex `d` is 13, the byte length of "final message";
    // presumably the second half of the offset is a byte position (TODO confirm).
    let offset_value = headers
        .get("Stream-Next-Offset")
        .expect("Missing Stream-Next-Offset");
    assert_eq!(offset_value.to_str().unwrap(), "0000000000000001_000000000000000d");
}
/// Sending the close request a second time to an already-closed stream must
/// still answer 204 with `Stream-Closed: true` and a `Stream-Next-Offset`
/// header.
#[tokio::test]
async fn test_idempotent_close_returns_204_with_headers() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let name = setup_stream(&base_url, &client).await;
    let url = format!("{base_url}/v1/stream/{name}");

    // Precondition: the first close must actually succeed, otherwise the
    // second request below would not be exercising the idempotent path.
    let first_close = client
        .post(url.as_str())
        .header("Content-Type", "text/plain")
        .header("Stream-Closed", "true")
        .send()
        .await
        .expect("failed to send first close request");
    assert!(
        first_close.status().is_success(),
        "first close should succeed before testing idempotency, got {}",
        first_close.status()
    );

    // Request under test: identical close sent again.
    let response = client
        .post(url.as_str())
        .header("Content-Type", "text/plain")
        .header("Stream-Closed", "true")
        .send()
        .await
        .expect("failed to send repeated close request");
    assert_eq!(response.status(), 204);

    let closed = response
        .headers()
        .get("Stream-Closed")
        .expect("Missing Stream-Closed on idempotent close")
        .to_str()
        .unwrap();
    assert_eq!(closed, "true");
    assert!(
        response.headers().get("Stream-Next-Offset").is_some(),
        "Missing Stream-Next-Offset on idempotent close"
    );
}
/// A PUT to a new stream name with `Stream-Closed: true` must answer 201 and
/// echo `Stream-Closed: true`.
#[tokio::test]
async fn test_put_created_closed_response_includes_stream_closed() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream = unique_stream_name();
    let url = format!("{base_url}/v1/stream/{stream}");

    let create_request = client
        .put(url.as_str())
        .header("Content-Type", "text/plain")
        .header("Stream-Closed", "true");
    let response = create_request.send().await.unwrap();

    assert_eq!(response.status(), 201);

    let closed_value = response
        .headers()
        .get("Stream-Closed")
        .expect("Missing Stream-Closed on PUT create-closed response");
    assert_eq!(closed_value.to_str().unwrap(), "true");
}
/// Repeating an identical PUT (with `Stream-Closed: true`) for a stream that
/// was already created closed must answer 200 and echo `Stream-Closed: true`.
#[tokio::test]
async fn test_put_idempotent_recreate_closed_includes_header() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let name = unique_stream_name();
    let url = format!("{base_url}/v1/stream/{name}");

    // Precondition: the first PUT must create the stream, so that the second
    // PUT below really is the idempotent re-create path.
    let created = client
        .put(url.as_str())
        .header("Content-Type", "text/plain")
        .header("Stream-Closed", "true")
        .send()
        .await
        .expect("failed to send initial PUT request");
    assert_eq!(
        created.status(),
        201,
        "initial PUT should create the closed stream"
    );

    // Request under test: the same PUT again.
    let response = client
        .put(url.as_str())
        .header("Content-Type", "text/plain")
        .header("Stream-Closed", "true")
        .send()
        .await
        .expect("failed to send repeated PUT request");
    assert_eq!(response.status(), 200);

    let closed = response
        .headers()
        .get("Stream-Closed")
        .expect("Missing Stream-Closed on idempotent PUT of closed stream")
        .to_str()
        .unwrap();
    assert_eq!(closed, "true");
}
/// Appending to a closed stream must be rejected with 409, and the rejection
/// must carry `Stream-Closed: true` plus the expected `Stream-Next-Offset`.
#[tokio::test]
async fn test_closed_stream_reject_includes_next_offset() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let name = setup_stream(&base_url, &client).await;
    let url = format!("{base_url}/v1/stream/{name}");

    // Precondition 1: append "data". The exact offset asserted at the end
    // depends on this append having been accepted.
    let append = client
        .post(url.as_str())
        .header("Content-Type", "text/plain")
        .body("data")
        .send()
        .await
        .expect("failed to send append request");
    assert!(
        append.status().is_success(),
        "initial append should succeed, got {}",
        append.status()
    );

    // Precondition 2: close the stream.
    let close = client
        .post(url.as_str())
        .header("Content-Type", "text/plain")
        .header("Stream-Closed", "true")
        .send()
        .await
        .expect("failed to send close request");
    assert!(
        close.status().is_success(),
        "close should succeed before testing rejection, got {}",
        close.status()
    );

    // Request under test: append after close.
    let response = client
        .post(url.as_str())
        .header("Content-Type", "text/plain")
        .body("rejected")
        .send()
        .await
        .expect("failed to send post-close append request");
    assert_eq!(response.status(), 409);

    let closed = response
        .headers()
        .get("Stream-Closed")
        .expect("Missing Stream-Closed on 409 response")
        .to_str()
        .unwrap();
    assert_eq!(closed, "true");

    // The trailing hex `4` matches the byte length of "data", i.e. the
    // rejected body did not advance the offset.
    let next_offset = response
        .headers()
        .get("Stream-Next-Offset")
        .expect("Missing Stream-Next-Offset on 409 response")
        .to_str()
        .unwrap();
    assert_eq!(next_offset, "0000000000000001_0000000000000004");
}
/// A POST with `Stream-Closed: false` must be treated as a plain append:
/// 204, no `Stream-Closed` response header, and a follow-up append is still
/// accepted.
#[tokio::test]
async fn test_close_with_non_true_value_ignored() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream = setup_stream(&base_url, &client).await;
    let url = format!("{base_url}/v1/stream/{stream}");

    // Append while sending a non-"true" close marker.
    let first = client
        .post(url.as_str())
        .header("Content-Type", "text/plain")
        .header("Stream-Closed", "false")
        .body("still open")
        .send()
        .await
        .unwrap();
    assert_eq!(first.status(), 204);
    assert!(
        !first.headers().contains_key("Stream-Closed"),
        "Stream-Closed should not be present when value is 'false'"
    );

    // A second, ordinary append checks that the stream was not closed.
    let second = client
        .post(url.as_str())
        .header("Content-Type", "text/plain")
        .body("more data")
        .send()
        .await
        .unwrap();
    assert_eq!(second.status(), 204, "Stream should still accept appends");
}
/// Reading an open stream with `offset=-1` after three appends must answer
/// 200 with `Stream-Up-To-Date: true` and must NOT include `Stream-Closed`.
#[tokio::test]
async fn test_read_open_stream_omits_closed_header() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let name = setup_stream(&base_url, &client).await;
    let url = format!("{base_url}/v1/stream/{name}");

    // Precondition: every append must be accepted, otherwise the read below
    // would not be observing the stream state this test is about.
    for msg in &["first", "second", "third"] {
        let append = client
            .post(url.as_str())
            .header("Content-Type", "text/plain")
            .body(*msg)
            .send()
            .await
            .expect("failed to send append request");
        assert!(
            append.status().is_success(),
            "append of {msg:?} should succeed, got {}",
            append.status()
        );
    }

    let response = client
        .get(format!("{base_url}/v1/stream/{name}?offset=-1"))
        .send()
        .await
        .expect("failed to send read request");
    assert_eq!(response.status(), 200);

    let up_to_date = response
        .headers()
        .get("Stream-Up-To-Date")
        .expect("Missing Stream-Up-To-Date on read of open stream")
        .to_str()
        .unwrap();
    assert_eq!(up_to_date, "true", "Should be at tail after reading all");
    assert!(
        response.headers().get("Stream-Closed").is_none(),
        "Stream-Closed must not be present on an open stream"
    );
}
/// Reading a closed stream at the offset returned by the close response must
/// answer 200 with both `Stream-Up-To-Date: true` and `Stream-Closed: true`.
#[tokio::test]
async fn test_read_at_tail_includes_closed_header() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let name = setup_stream(&base_url, &client).await;
    let url = format!("{base_url}/v1/stream/{name}");

    // Precondition 1: append one message.
    let append = client
        .post(url.as_str())
        .header("Content-Type", "text/plain")
        .body("hello")
        .send()
        .await
        .expect("failed to send append request");
    assert!(
        append.status().is_success(),
        "initial append should succeed, got {}",
        append.status()
    );

    // Precondition 2: close the stream and capture the offset it reports.
    let close_response = client
        .post(url.as_str())
        .header("Content-Type", "text/plain")
        .header("Stream-Closed", "true")
        .send()
        .await
        .expect("failed to send close request");
    assert!(
        close_response.status().is_success(),
        "close should succeed before reading at tail, got {}",
        close_response.status()
    );
    let final_offset = close_response
        .headers()
        .get("Stream-Next-Offset")
        .expect("Missing Stream-Next-Offset on close response")
        .to_str()
        .unwrap()
        .to_string();

    // Request under test: read starting at the offset reported by the close.
    let response = client
        .get(format!("{base_url}/v1/stream/{name}?offset={final_offset}"))
        .send()
        .await
        .expect("failed to send read request");
    assert_eq!(response.status(), 200);

    let up_to_date = response
        .headers()
        .get("Stream-Up-To-Date")
        .expect("Missing Stream-Up-To-Date at tail of closed stream")
        .to_str()
        .unwrap();
    assert_eq!(up_to_date, "true");

    let closed = response
        .headers()
        .get("Stream-Closed")
        .expect("Missing Stream-Closed at tail of closed stream")
        .to_str()
        .unwrap();
    assert_eq!(closed, "true");
}