mod common;
use common::{spawn_test_server, spawn_test_server_with_limits, test_client, unique_stream_name};
/// A single JSON object appended to an `application/json` stream is expected
/// to be read back wrapped in a one-element JSON array.
#[tokio::test]
async fn test_single_json_value() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = format!("{base_url}/v1/stream/{stream_name}");

    // `send()` resolving to `Ok` only means a response arrived; an HTTP error
    // status still has to be checked explicitly, otherwise a rejected setup
    // request only shows up later as a confusing body mismatch.
    let created = client
        .put(&stream_url)
        .header("Content-Type", "application/json")
        .send()
        .await
        .expect("PUT (create stream) request failed");
    assert!(
        created.status().is_success(),
        "stream creation was rejected: {}",
        created.status()
    );

    let appended = client
        .post(&stream_url)
        .header("Content-Type", "application/json")
        .body(r#"{"event":"click","x":100}"#)
        .send()
        .await
        .expect("POST (append) request failed");
    assert!(
        appended.status().is_success(),
        "append was rejected: {}",
        appended.status()
    );

    let response = client
        .get(&stream_url)
        .send()
        .await
        .expect("GET (read stream) request failed");
    assert_eq!(response.status(), 200);
    let body = response.text().await.expect("response body was not readable text");
    assert_eq!(body, r#"[{"event":"click","x":100}]"#);
}
/// Appending a top-level JSON array of two objects to an `application/json`
/// stream is expected to read back as a flat array of those two objects, in
/// order, rather than as an array nested inside another array.
#[tokio::test]
async fn test_array_flattening() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = format!("{base_url}/v1/stream/{stream_name}");

    // A response with an error status still comes back as `Ok`, so the setup
    // requests are checked explicitly instead of being discarded.
    let created = client
        .put(&stream_url)
        .header("Content-Type", "application/json")
        .send()
        .await
        .expect("PUT (create stream) request failed");
    assert!(
        created.status().is_success(),
        "stream creation was rejected: {}",
        created.status()
    );

    let appended = client
        .post(&stream_url)
        .header("Content-Type", "application/json")
        .body(r#"[{"event":"click","x":100},{"event":"scroll","y":50}]"#)
        .send()
        .await
        .expect("POST (append) request failed");
    assert!(
        appended.status().is_success(),
        "array append was rejected: {}",
        appended.status()
    );

    let response = client
        .get(&stream_url)
        .send()
        .await
        .expect("GET (read stream) request failed");
    assert_eq!(response.status(), 200);
    let body = response.text().await.expect("response body was not readable text");
    assert_eq!(
        body,
        r#"[{"event":"click","x":100},{"event":"scroll","y":50}]"#
    );
}
/// Posting an empty JSON array (`[]`) to an `application/json` stream is
/// expected to be answered with `400`.
#[tokio::test]
async fn test_empty_array_rejection() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = format!("{base_url}/v1/stream/{stream_name}");

    // Set-up: create the JSON stream that the append below targets.
    let create_request = client
        .put(&stream_url)
        .header("Content-Type", "application/json");
    create_request.send().await.unwrap();

    // The append under test: an array with no elements.
    let append_request = client
        .post(&stream_url)
        .header("Content-Type", "application/json")
        .body("[]");
    let rejection = append_request.send().await.unwrap();

    assert_eq!(rejection.status(), 400);
}
/// Posting a body that is not valid JSON to an `application/json` stream is
/// expected to be answered with `400`.
#[tokio::test]
async fn test_invalid_json() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = format!("{base_url}/v1/stream/{stream_name}");

    // Set-up: create the JSON stream that the append below targets.
    let create_request = client
        .put(&stream_url)
        .header("Content-Type", "application/json");
    create_request.send().await.unwrap();

    // The append under test: braces around text that does not parse as JSON.
    let append_request = client
        .post(&stream_url)
        .header("Content-Type", "application/json")
        .body("{invalid json}");
    let rejection = append_request.send().await.unwrap();

    assert_eq!(rejection.status(), 400);
}
/// Arrays nested inside the elements of an appended top-level array are
/// expected to come back unchanged: the read-back body equals the posted
/// array, with the inner `tags` / `items` arrays intact.
#[tokio::test]
async fn test_nested_arrays_preserved() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = format!("{base_url}/v1/stream/{stream_name}");

    // A response with an error status still comes back as `Ok`, so the setup
    // requests are checked explicitly instead of being discarded.
    let created = client
        .put(&stream_url)
        .header("Content-Type", "application/json")
        .send()
        .await
        .expect("PUT (create stream) request failed");
    assert!(
        created.status().is_success(),
        "stream creation was rejected: {}",
        created.status()
    );

    let appended = client
        .post(&stream_url)
        .header("Content-Type", "application/json")
        .body(r#"[{"tags":["a","b"]},{"items":[1,2,3]}]"#)
        .send()
        .await
        .expect("POST (append) request failed");
    assert!(
        appended.status().is_success(),
        "array append was rejected: {}",
        appended.status()
    );

    let response = client
        .get(&stream_url)
        .send()
        .await
        .expect("GET (read stream) request failed");
    assert_eq!(response.status(), 200);
    let body = response.text().await.expect("response body was not readable text");
    assert_eq!(body, r#"[{"tags":["a","b"]},{"items":[1,2,3]}]"#);
}
/// Reading an `application/json` stream that was created but never appended to
/// is expected to return `200` with the body `[]`.
#[tokio::test]
async fn test_empty_stream_returns_empty_array() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = format!("{base_url}/v1/stream/{stream_name}");

    // Create the stream and append nothing to it.
    let create_request = client
        .put(&stream_url)
        .header("Content-Type", "application/json");
    create_request.send().await.unwrap();

    let read = client.get(&stream_url).send().await.unwrap();
    assert_eq!(read.status(), 200);

    let payload = read.text().await.unwrap();
    assert_eq!(payload, "[]");
}
/// Interleaving single-object appends with an array append is expected to read
/// back as one flat array holding every object in the order it was sent.
#[tokio::test]
async fn test_mixed_batching() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = format!("{base_url}/v1/stream/{stream_name}");

    // A response with an error status still comes back as `Ok`, so the setup
    // requests are checked explicitly instead of being discarded.
    let created = client
        .put(&stream_url)
        .header("Content-Type", "application/json")
        .send()
        .await
        .expect("PUT (create stream) request failed");
    assert!(
        created.status().is_success(),
        "stream creation was rejected: {}",
        created.status()
    );

    // Appends are awaited one at a time so they reach the server in this exact
    // order: object, two-element array, object.
    for payload in [r#"{"a":1}"#, r#"[{"b":2},{"c":3}]"#, r#"{"d":4}"#] {
        let appended = client
            .post(&stream_url)
            .header("Content-Type", "application/json")
            .body(payload)
            .send()
            .await
            .expect("POST (append) request failed");
        assert!(
            appended.status().is_success(),
            "append of {payload} was rejected: {}",
            appended.status()
        );
    }

    let response = client
        .get(&stream_url)
        .send()
        .await
        .expect("GET (read stream) request failed");
    assert_eq!(response.status(), 200);
    let body = response.text().await.expect("response body was not readable text");
    assert_eq!(body, r#"[{"a":1},{"b":2},{"c":3},{"d":4}]"#);
}
/// On a `text/plain` stream, a body that happens to look like a JSON array is
/// expected to be read back byte-for-byte as it was posted.
#[tokio::test]
async fn test_non_json_no_flattening() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = format!("{base_url}/v1/stream/{stream_name}");

    // A response with an error status still comes back as `Ok`, so the setup
    // requests are checked explicitly instead of being discarded.
    let created = client
        .put(&stream_url)
        .header("Content-Type", "text/plain")
        .send()
        .await
        .expect("PUT (create stream) request failed");
    assert!(
        created.status().is_success(),
        "stream creation was rejected: {}",
        created.status()
    );

    let appended = client
        .post(&stream_url)
        .header("Content-Type", "text/plain")
        .body(r#"[{"a":1},{"b":2}]"#)
        .send()
        .await
        .expect("POST (append) request failed");
    assert!(
        appended.status().is_success(),
        "text append was rejected: {}",
        appended.status()
    );

    let response = client
        .get(&stream_url)
        .send()
        .await
        .expect("GET (read stream) request failed");
    assert_eq!(response.status(), 200);
    let body = response.text().await.expect("response body was not readable text");
    assert_eq!(body, r#"[{"a":1},{"b":2}]"#);
}
/// A stream created with `application/json; charset=utf-8` is expected to
/// accept an append sent as plain `application/json` (status `204`) and to
/// read back as a JSON array containing the appended object.
#[tokio::test]
async fn test_json_charset_ignored() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = format!("{base_url}/v1/stream/{stream_name}");

    // Create the stream with a charset parameter on the content type.
    let create_request = client
        .put(&stream_url)
        .header("Content-Type", "application/json; charset=utf-8");
    create_request.send().await.unwrap();

    // Append without the charset parameter.
    let append_request = client
        .post(&stream_url)
        .header("Content-Type", "application/json")
        .body(r#"{"test":true}"#);
    let appended = append_request.send().await.unwrap();
    assert_eq!(appended.status(), 204);

    let read = client.get(&stream_url).send().await.unwrap();
    assert_eq!(read.status(), 200);

    let payload = read.text().await.unwrap();
    assert_eq!(payload, r#"[{"test":true}]"#);
}
/// A JSON array append that is answered with `413` is expected to leave the
/// stream empty: a subsequent read must return `[]`, i.e. none of the array's
/// elements may have been kept.
#[tokio::test]
async fn test_json_array_append_is_atomic_on_limit_error() {
    // NOTE(review): the meaning of the two limit arguments is defined in
    // `common`; presumably the second (20) is a small byte limit that the long
    // second array element exceeds — confirm against the helper.
    let (base_url, _port) = spawn_test_server_with_limits(1024 * 1024, 20).await;
    let client = test_client();
    let stream_name = unique_stream_name();
    let stream_url = format!("{base_url}/v1/stream/{stream_name}");

    let create_request = client
        .put(&stream_url)
        .header("Content-Type", "application/json");
    create_request.send().await.unwrap();

    // Two-element array: a short string followed by a much longer one.
    let append_request = client
        .post(&stream_url)
        .header("Content-Type", "application/json")
        .body(r#"["12345","12345678901234567890"]"#);
    let rejection = append_request.send().await.unwrap();
    assert_eq!(rejection.status(), 413);

    // Nothing from the rejected batch may be visible to readers.
    let read = client.get(&stream_url).send().await.unwrap();
    assert_eq!(read.status(), 200);

    let payload = read.text().await.unwrap();
    assert_eq!(payload, "[]");
}