mod common;
use common::{spawn_test_server, test_client, unique_stream_name};
#[tokio::test]
async fn test_create_stream_returns_201() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
let response = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.send()
.await
.expect("Failed to send request");
assert_eq!(response.status(), 201, "Expected 201 Created");
let location = response
.headers()
.get("location")
.expect("Missing Location header")
.to_str()
.unwrap();
assert!(
location.ends_with(&format!("/v1/stream/{stream_name}")),
"Location should end with /v1/stream/{{name}}, got: {location}"
);
assert!(
location.starts_with("http"),
"Location should be absolute URL, got: {location}"
);
let content_type = response
.headers()
.get("content-type")
.expect("Missing Content-Type header")
.to_str()
.unwrap();
assert_eq!(content_type, "text/plain");
let next_offset = response
.headers()
.get("Stream-Next-Offset")
.expect("Missing Stream-Next-Offset header")
.to_str()
.unwrap();
assert_eq!(next_offset, "0000000000000000_0000000000000000");
assert_eq!(
response
.headers()
.get("X-Content-Type-Options")
.unwrap()
.to_str()
.unwrap(),
"nosniff"
);
assert_eq!(
response
.headers()
.get("Cross-Origin-Resource-Policy")
.unwrap()
.to_str()
.unwrap(),
"cross-origin"
);
}
#[tokio::test]
async fn test_idempotent_create_returns_200() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
let response1 = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.send()
.await
.unwrap();
assert_eq!(response1.status(), 201);
let response2 = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.send()
.await
.unwrap();
assert_eq!(
response2.status(),
200,
"Expected 200 OK for idempotent create"
);
assert!(response2.headers().get("content-type").is_some());
assert!(response2.headers().get("Stream-Next-Offset").is_some());
}
#[tokio::test]
async fn test_config_mismatch_returns_409() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.send()
.await
.unwrap();
let response = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "application/json")
.send()
.await
.unwrap();
assert_eq!(response.status(), 409, "Expected 409 Conflict");
}
#[tokio::test]
async fn test_put_with_body_creates_and_appends() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
let response = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.body("initial data")
.send()
.await
.unwrap();
assert_eq!(response.status(), 201, "Expected 201 Created");
let read_response = client
.get(format!("{base_url}/v1/stream/{stream_name}?offset=-1"))
.send()
.await
.unwrap();
assert_eq!(read_response.status(), 200);
let body = read_response.text().await.unwrap();
assert_eq!(body, "initial data");
}
#[tokio::test]
async fn test_missing_content_type_defaults_to_octet_stream() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
let response = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.send()
.await
.unwrap();
assert_eq!(
response.status(),
201,
"Expected 201 Created with default Content-Type"
);
let content_type = response
.headers()
.get("content-type")
.expect("Missing Content-Type header")
.to_str()
.unwrap();
assert_eq!(content_type, "application/octet-stream");
}
#[tokio::test]
async fn test_content_type_case_insensitive() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
let response1 = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.send()
.await
.unwrap();
assert_eq!(response1.status(), 201);
let response2 = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "TEXT/PLAIN")
.send()
.await
.unwrap();
assert_eq!(
response2.status(),
200,
"Expected 200 for case-insensitive match"
);
}
#[tokio::test]
async fn test_content_type_charset_stripped() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
let response1 = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain; charset=utf-8")
.send()
.await
.unwrap();
assert_eq!(response1.status(), 201);
let content_type = response1
.headers()
.get("content-type")
.unwrap()
.to_str()
.unwrap();
assert_eq!(content_type, "text/plain");
let response2 = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.send()
.await
.unwrap();
assert_eq!(response2.status(), 200);
}
#[tokio::test]
async fn test_ttl_validation_leading_zeros() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
let response = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.header("Stream-TTL", "0123") .send()
.await
.unwrap();
assert_eq!(
response.status(),
400,
"Expected 400 for leading zeros in TTL"
);
}
#[tokio::test]
async fn test_ttl_validation_floats() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
let response = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.header("Stream-TTL", "3600.5") .send()
.await
.unwrap();
assert_eq!(response.status(), 400, "Expected 400 for float TTL");
}
#[tokio::test]
async fn test_ttl_validation_scientific_notation() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
let response = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.header("Stream-TTL", "1e3") .send()
.await
.unwrap();
assert_eq!(
response.status(),
400,
"Expected 400 for scientific notation in TTL"
);
}
#[tokio::test]
async fn test_both_ttl_and_expires_at_returns_400() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
let response = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.header("Stream-TTL", "3600")
.header("Stream-Expires-At", "2025-12-31T23:59:59Z")
.send()
.await
.unwrap();
assert_eq!(
response.status(),
400,
"Expected 400 for both TTL and Expires-At"
);
}
#[tokio::test]
async fn test_valid_ttl() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
let response = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.header("Stream-TTL", "3600")
.send()
.await
.unwrap();
assert_eq!(response.status(), 201, "Expected 201 for valid TTL");
}
#[tokio::test]
async fn test_head_returns_metadata() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "application/json")
.send()
.await
.unwrap();
let response = client
.head(format!("{base_url}/v1/stream/{stream_name}"))
.send()
.await
.unwrap();
assert_eq!(response.status(), 200, "Expected 200 OK");
let content_type = response
.headers()
.get("content-type")
.expect("Missing Content-Type header")
.to_str()
.unwrap();
assert_eq!(content_type, "application/json");
let next_offset = response
.headers()
.get("Stream-Next-Offset")
.expect("Missing Stream-Next-Offset header")
.to_str()
.unwrap();
assert_eq!(next_offset, "0000000000000000_0000000000000000");
let cache_control = response
.headers()
.get("cache-control")
.expect("Missing Cache-Control header")
.to_str()
.unwrap();
assert_eq!(cache_control, "no-store");
assert!(response.headers().get("Stream-Closed").is_none());
let body_bytes = response.bytes().await.unwrap();
assert!(body_bytes.is_empty(), "HEAD response should have no body");
}
#[tokio::test]
async fn test_head_nonexistent_returns_404() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
let response = client
.head(format!("{base_url}/v1/stream/{stream_name}"))
.send()
.await
.unwrap();
assert_eq!(
response.status(),
404,
"Expected 404 for non-existent stream"
);
}
#[tokio::test]
async fn test_head_includes_ttl_metadata() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.header("Stream-TTL", "7200")
.send()
.await
.unwrap();
let response = client
.head(format!("{base_url}/v1/stream/{stream_name}"))
.send()
.await
.unwrap();
assert_eq!(response.status(), 200);
let ttl = response
.headers()
.get("Stream-TTL")
.expect("Missing Stream-TTL header")
.to_str()
.unwrap()
.parse::<u64>()
.unwrap();
assert!((7198..=7200).contains(&ttl), "TTL should be close to 7200");
assert!(response.headers().get("Stream-Expires-At").is_some());
}
#[tokio::test]
async fn test_head_includes_closed_flag() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.header("Stream-Closed", "true")
.send()
.await
.unwrap();
let response = client
.head(format!("{base_url}/v1/stream/{stream_name}"))
.send()
.await
.unwrap();
assert_eq!(response.status(), 200);
let closed = response
.headers()
.get("Stream-Closed")
.expect("Missing Stream-Closed header")
.to_str()
.unwrap();
assert_eq!(closed, "true");
}
#[tokio::test]
async fn test_delete_stream_returns_204() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.send()
.await
.unwrap();
let response = client
.delete(format!("{base_url}/v1/stream/{stream_name}"))
.send()
.await
.unwrap();
assert_eq!(response.status(), 204, "Expected 204 No Content");
let head_response = client
.head(format!("{base_url}/v1/stream/{stream_name}"))
.send()
.await
.unwrap();
assert_eq!(head_response.status(), 404, "Expected 404 after deletion");
}
#[tokio::test]
async fn test_delete_nonexistent_returns_404() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
let response = client
.delete(format!("{base_url}/v1/stream/{stream_name}"))
.send()
.await
.unwrap();
assert_eq!(
response.status(),
404,
"Expected 404 for non-existent stream"
);
}
#[tokio::test]
async fn test_recreate_after_delete_with_different_config() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = unique_stream_name();
client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.send()
.await
.unwrap();
client
.delete(format!("{base_url}/v1/stream/{stream_name}"))
.send()
.await
.unwrap();
let response = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "application/json")
.send()
.await
.unwrap();
assert_eq!(
response.status(),
201,
"Expected 201 for recreation with different config"
);
let head_response = client
.head(format!("{base_url}/v1/stream/{stream_name}"))
.send()
.await
.unwrap();
let content_type = head_response
.headers()
.get("content-type")
.unwrap()
.to_str()
.unwrap();
assert_eq!(content_type, "application/json");
}