1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Case count rationale: 50 cases each — P28 tests validation (fast), P29 tests
// handler-level aggregate check with large payloads (slower, needs higher body limit).
mod common;
use common::*;
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
use ferrokinesis::store::StoreOptions;
use proptest::prelude::*;
use serde_json::json;
/// P28: a PutRecords call carrying more than 500 records yields a ValidationException.
///
/// The validation layer caps the Records array at 500 entries (len_lte(500)),
/// so an oversized batch is turned away before the request reaches dispatch.
#[test]
fn prop_put_records_exceeds_500_records() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let server = rt.block_on(TestServer::new());

    // Every generated batch size is over the limit: 501 through 550 entries.
    let oversized_counts = 501u32..=550;

    let mut runner = prop_runner(50);
    let outcome = runner.run(&oversized_counts, |record_count| {
        // All entries are identical; only the length of the array is under test.
        let entry = json!({
            "Data": "dGVzdA==",
            "PartitionKey": "k",
        });
        let records: Vec<serde_json::Value> = vec![entry; record_count as usize];

        // Stream doesn't need to exist — validation rejects before dispatch.
        let payload = json!({
            "StreamName": "any-stream",
            "Records": records,
        });

        let (status, body) = rt.block_on(async {
            let res = server.request("PutRecords", &payload).await;
            decode_body(res).await
        });

        prop_assert_eq!(status, 400, "expected 400 for {} records", record_count);

        // A missing or non-string `__type` collapses to "" and fails the check below.
        let err_type = body["__type"].as_str().unwrap_or_default();
        prop_assert!(
            err_type.contains("ValidationException"),
            "expected ValidationException for {} records, got {:?}",
            record_count,
            err_type,
        );
        Ok(())
    });
    outcome.unwrap();
}
/// P29: a PutRecords call whose total payload is > 5 MB yields an
/// InvalidArgumentException.
///
/// No single record reaches 1 MB, yet the combined decoded data plus partition
/// keys adds up to more than 5,242,880 bytes.
#[test]
fn prop_put_records_exceeds_5mb_total() {
    let rt = tokio::runtime::Runtime::new().unwrap();

    // The JSON body with large base64 payloads exceeds axum's default 2 MiB limit,
    // so the server is started with a 10 MiB body limit instead.
    let body_limit = 10 * 1024 * 1024;
    let store_options = StoreOptions {
        create_stream_ms: 0,
        delete_stream_ms: 0,
        update_stream_ms: 0,
        shard_limit: 50,
        ..Default::default()
    };
    let server = rt.block_on(TestServer::with_body_limit(store_options, body_limit));

    let stream_name = unique_stream_name("prop-batch-5mb");
    rt.block_on(server.create_stream(&stream_name, 4));

    let mut runner = prop_runner(50);
    // Batches of 6-10 identical records.
    let batch_sizes = 6u32..=10;
    let outcome = runner.run(&batch_sizes, |n| {
        let record_total = n as usize;
        // floor(5_242_881 / n) + 1 bytes per record: the sum over n records is above
        // 5_242_880 while each record stays below 1 MiB for every n in 6..=10.
        let data_size = 5_242_881usize / record_total + 1;
        let filler = vec![0x41u8; data_size];
        let b64_data = BASE64.encode(&filler);

        let entry = json!({
            "Data": b64_data,
            "PartitionKey": "k",
        });
        let records: Vec<serde_json::Value> = vec![entry; record_total];

        let payload = json!({
            "StreamName": stream_name,
            "Records": records,
        });

        let (status, body) = rt.block_on(async {
            let res = server.request("PutRecords", &payload).await;
            decode_body(res).await
        });

        prop_assert_eq!(
            status,
            400,
            "expected 400 for {} records of {} decoded bytes each",
            n,
            data_size,
        );

        // A missing or non-string `__type` collapses to "" and fails the comparison.
        let err_type = body["__type"].as_str().unwrap_or("");
        prop_assert_eq!(
            err_type,
            "InvalidArgumentException",
            "expected InvalidArgumentException for {} records totaling > 5 MB",
            n,
        );
        Ok(())
    });
    outcome.unwrap();
}