mod common;
use common::{spawn_test_server, test_client};
use durable_streams_server::Config;
use durable_streams_server::protocol::problem::ProblemDetails;
use std::sync::Arc;
use tokio::net::TcpListener;
#[tokio::test]
async fn nested_stream_name_full_lifecycle() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = "slides/abc123";
let create = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.send()
.await
.expect("create request failed");
assert_eq!(create.status(), 201);
let location = create
.headers()
.get("location")
.expect("missing location header")
.to_str()
.unwrap();
assert!(
location.ends_with("/v1/stream/slides/abc123"),
"location should preserve nested name, got: {location}"
);
let append = client
.post(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "text/plain")
.body("hello nested world")
.send()
.await
.expect("append request failed");
assert_eq!(append.status(), 204);
let head = client
.head(format!("{base_url}/v1/stream/{stream_name}"))
.send()
.await
.expect("head request failed");
assert_eq!(head.status(), 200);
assert_eq!(
head.headers().get("content-type").unwrap().to_str().unwrap(),
"text/plain"
);
let read = client
.get(format!("{base_url}/v1/stream/{stream_name}"))
.send()
.await
.expect("read request failed");
assert_eq!(read.status(), 200);
let body = read.text().await.expect("failed to read body");
assert!(body.contains("hello nested world"));
let delete = client
.delete(format!("{base_url}/v1/stream/{stream_name}"))
.send()
.await
.expect("delete request failed");
assert_eq!(delete.status(), 204);
let head_after = client
.head(format!("{base_url}/v1/stream/{stream_name}"))
.send()
.await
.expect("head-after-delete request failed");
assert_eq!(head_after.status(), 404);
}
#[tokio::test]
async fn deeply_nested_stream_within_limits() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let stream_name = "org/project/type/resource-42";
let create = client
.put(format!("{base_url}/v1/stream/{stream_name}"))
.header("Content-Type", "application/json")
.send()
.await
.expect("create request failed");
assert_eq!(create.status(), 201);
}
#[tokio::test]
async fn stream_name_exceeding_segment_limit_rejected() {
let config = Config {
max_stream_name_segments: 3,
..Config::default()
};
let storage = Arc::new(durable_streams_server::InMemoryStorage::new(
config.max_memory_bytes,
config.max_stream_bytes,
));
let listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("failed to bind");
let addr = listener.local_addr().expect("failed to read local addr");
let app = durable_streams_server::build_router(storage, &config);
tokio::spawn(async move {
axum::serve(listener, app).await.expect("server failed");
});
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let client = test_client();
let base_url = format!("http://127.0.0.1:{}", addr.port());
let create = client
.put(format!("{base_url}/v1/stream/a/b/c/d"))
.header("Content-Type", "text/plain")
.send()
.await
.expect("create request failed");
assert_eq!(create.status(), 400);
let problem: ProblemDetails =
serde_json::from_str(&create.text().await.unwrap()).expect("problem parse failed");
assert_eq!(problem.code, "INVALID_STREAM_NAME");
let detail = problem.detail.as_deref().unwrap();
assert!(
detail.contains("4 path segments") && detail.contains("maximum of 3"),
"expected detail with actual count and limit, got: {detail}"
);
let create_ok = client
.put(format!("{base_url}/v1/stream/a/b/c"))
.header("Content-Type", "text/plain")
.send()
.await
.expect("create request failed");
assert_eq!(create_ok.status(), 201);
}
#[tokio::test]
async fn stream_name_exceeding_byte_limit_rejected() {
let config = Config {
max_stream_name_bytes: 20,
..Config::default()
};
let storage = Arc::new(durable_streams_server::InMemoryStorage::new(
config.max_memory_bytes,
config.max_stream_bytes,
));
let listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("failed to bind");
let addr = listener.local_addr().expect("failed to read local addr");
let app = durable_streams_server::build_router(storage, &config);
tokio::spawn(async move {
axum::serve(listener, app).await.expect("server failed");
});
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let client = test_client();
let base_url = format!("http://127.0.0.1:{}", addr.port());
let long_name = "this-name-is-definitely-too-long";
let create = client
.put(format!("{base_url}/v1/stream/{long_name}"))
.header("Content-Type", "text/plain")
.send()
.await
.expect("create request failed");
assert_eq!(create.status(), 400);
let problem: ProblemDetails =
serde_json::from_str(&create.text().await.unwrap()).expect("problem parse failed");
assert_eq!(problem.code, "INVALID_STREAM_NAME");
let detail = problem.detail.as_deref().unwrap();
assert!(
detail.contains("bytes") && detail.contains("maximum of 20"),
"expected detail with actual length and limit, got: {detail}"
);
}
#[tokio::test]
async fn empty_segment_rejected() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let response = client
.put(format!("{base_url}/v1/stream/a//b"))
.header("Content-Type", "text/plain")
.send()
.await
.expect("request failed");
assert_eq!(response.status(), 400);
let problem: ProblemDetails =
serde_json::from_str(&response.text().await.unwrap()).expect("problem parse failed");
assert_eq!(problem.code, "INVALID_STREAM_NAME");
assert!(
problem
.detail
.as_deref()
.unwrap()
.contains("empty segments"),
"expected empty segments detail, got: {:?}",
problem.detail
);
}
#[tokio::test]
async fn error_response_includes_instance() {
let config = Config {
max_stream_name_bytes: 5,
..Config::default()
};
let storage = Arc::new(durable_streams_server::InMemoryStorage::new(
config.max_memory_bytes,
config.max_stream_bytes,
));
let listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("failed to bind");
let addr = listener.local_addr().expect("failed to read local addr");
let app = durable_streams_server::build_router(storage, &config);
tokio::spawn(async move {
axum::serve(listener, app).await.expect("server failed");
});
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let client = test_client();
let base_url = format!("http://127.0.0.1:{}", addr.port());
let response = client
.put(format!("{base_url}/v1/stream/too-long-name"))
.header("Content-Type", "text/plain")
.send()
.await
.expect("request failed");
assert_eq!(response.status(), 400);
let problem: ProblemDetails =
serde_json::from_str(&response.text().await.unwrap()).expect("problem parse failed");
assert!(
problem.instance.is_some(),
"rejection response should include instance field"
);
assert!(
problem.instance.as_deref().unwrap().contains("/v1/stream/too-long-name"),
"instance should contain the request path, got: {:?}",
problem.instance
);
}
#[test]
fn config_validate_rejects_zero_max_stream_name_bytes() {
let config = Config {
max_stream_name_bytes: 0,
..Config::default()
};
let err = config.validate().unwrap_err();
assert!(
err.contains("max_stream_name_bytes"),
"expected error about max_stream_name_bytes, got: {err}"
);
}
#[test]
fn config_validate_rejects_zero_max_stream_name_segments() {
let config = Config {
max_stream_name_segments: 0,
..Config::default()
};
let err = config.validate().unwrap_err();
assert!(
err.contains("max_stream_name_segments"),
"expected error about max_stream_name_segments, got: {err}"
);
}
#[tokio::test]
async fn flat_stream_names_still_work() {
let (base_url, _port) = spawn_test_server().await;
let client = test_client();
let create = client
.put(format!("{base_url}/v1/stream/simple-name"))
.header("Content-Type", "text/plain")
.send()
.await
.expect("create request failed");
assert_eq!(create.status(), 201);
let head = client
.head(format!("{base_url}/v1/stream/simple-name"))
.send()
.await
.expect("head request failed");
assert_eq!(head.status(), 200);
}