1use std::collections::HashMap;
20use std::sync::{Mutex, OnceLock};
21use std::time::{Duration, Instant};
22
/// Row limit used by `BatchInsertConfig::from_env` when `RED_BATCH_MAX_ROWS`
/// is unset, unparsable, or zero.
const DEFAULT_MAX_ROWS: usize = 10_000;
/// Idempotency window, in seconds, used by `BatchInsertConfig::from_env` when
/// `RED_BATCH_IDEMPOTENCY_WINDOW_SECS` is unset or unparsable.
const DEFAULT_IDEMPOTENCY_WINDOW_SECS: u64 = 300;
/// Name of the environment variable that overrides the row limit.
const ENV_MAX_ROWS: &str = "RED_BATCH_MAX_ROWS";
/// Name of the environment variable that overrides the idempotency window
/// (whole seconds).
const ENV_IDEMPOTENCY_WINDOW_SECS: &str = "RED_BATCH_IDEMPOTENCY_WINDOW_SECS";
27
28#[derive(Debug, Clone, Copy)]
32pub struct BatchInsertConfig {
33 pub max_rows: usize,
34 pub idempotency_window: Duration,
35}
36
37impl BatchInsertConfig {
38 pub fn from_env() -> Self {
39 let max_rows = std::env::var(ENV_MAX_ROWS)
40 .ok()
41 .and_then(|v| v.parse::<usize>().ok())
42 .filter(|n| *n > 0)
43 .unwrap_or(DEFAULT_MAX_ROWS);
44 let window_secs = std::env::var(ENV_IDEMPOTENCY_WINDOW_SECS)
45 .ok()
46 .and_then(|v| v.parse::<u64>().ok())
47 .unwrap_or(DEFAULT_IDEMPOTENCY_WINDOW_SECS);
48 Self {
49 max_rows,
50 idempotency_window: Duration::from_secs(window_secs),
51 }
52 }
53}
54
/// A response remembered by [`BatchInsertCache`] together with its expiry.
#[derive(Debug, Clone)]
pub struct CachedResponse {
    /// Status code that was passed to [`BatchInsertCache::store`].
    pub status: u16,
    /// Body bytes that were passed to [`BatchInsertCache::store`].
    pub body: Vec<u8>,
    /// The entry is dropped once the caller-supplied `now` reaches this instant.
    expires_at: Instant,
}

/// In-memory, mutex-guarded map from `(collection, key)` to a
/// [`CachedResponse`], with time-based expiry.
///
/// Callers pass `now` explicitly, so expiry is driven entirely by the
/// instants they supply.
pub struct BatchInsertCache {
    inner: Mutex<HashMap<(String, String), CachedResponse>>,
}

impl Default for BatchInsertCache {
    /// Equivalent to [`BatchInsertCache::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl BatchInsertCache {
    /// Creates an empty cache.
    pub fn new() -> Self {
        Self {
            inner: Mutex::new(HashMap::new()),
        }
    }

    /// Returns a clone of the live entry stored under `(collection, key)`.
    ///
    /// Every entry whose expiry is not strictly after `now` is evicted first,
    /// so an entry is never returned at or past its expiry instant.
    /// Returns `None` on a miss, and also when the mutex is poisoned.
    pub fn lookup(&self, collection: &str, key: &str, now: Instant) -> Option<CachedResponse> {
        let mut guard = self.inner.lock().ok()?;
        guard.retain(|_, entry| entry.expires_at > now);
        guard
            .get(&(collection.to_string(), key.to_string()))
            .cloned()
    }

    /// Stores a response under `(collection, key)` that expires `window`
    /// after `now`, replacing any previous entry for the same pair.
    ///
    /// Expired entries are evicted before the insert. If `now + window` is
    /// not representable, the expiry is clamped to roughly 100 years after
    /// `now`. If even that is not representable, nothing is stored. A
    /// poisoned mutex makes this a silent no-op.
    pub fn store(
        &self,
        collection: &str,
        key: &str,
        status: u16,
        body: Vec<u8>,
        window: Duration,
        now: Instant,
    ) {
        /// Stand-in window used only when `now + window` overflows.
        const OVERFLOW_FALLBACK_WINDOW: Duration = Duration::from_secs(100 * 365 * 24 * 60 * 60);

        // `Instant + Duration` panics on overflow. Compute the expiry with
        // checked arithmetic *before* locking, so an oversized window can
        // neither panic nor poison the mutex (a poisoned mutex disables the
        // cache for good).
        let Some(expires_at) = now
            .checked_add(window)
            .or_else(|| now.checked_add(OVERFLOW_FALLBACK_WINDOW))
        else {
            return;
        };

        let Ok(mut guard) = self.inner.lock() else {
            return;
        };
        guard.retain(|_, entry| entry.expires_at > now);
        guard.insert(
            (collection.to_string(), key.to_string()),
            CachedResponse {
                status,
                body,
                expires_at,
            },
        );
    }

    /// Number of entries currently held, or 0 if the mutex is poisoned
    /// (test builds only).
    #[cfg(test)]
    pub fn len(&self) -> usize {
        self.inner.lock().map(|g| g.len()).unwrap_or(0)
    }
}
123
124pub fn global_cache() -> &'static BatchInsertCache {
127 static CACHE: OnceLock<BatchInsertCache> = OnceLock::new();
128 CACHE.get_or_init(BatchInsertCache::new)
129}
130
/// Reasons a batch-insert request is rejected.
///
/// Each variant maps to an HTTP status ([`BatchInsertError::http_status`]),
/// a stable code string ([`BatchInsertError::code`]), and a human-readable
/// message ([`BatchInsertError::message`], also used by `Display`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchInsertError {
    /// The request body was not a JSON array of rows.
    BodyNotJsonArray,
    /// The batch held `got` rows, more than the configured `limit`.
    BatchTooLarge { limit: usize, got: usize },
    /// The row at `index` failed validation for the given `reason`.
    RowParseFailure { index: usize, reason: String },
    /// The row at `index` was rejected by the schema registry for `reason`.
    RowSchemaRejected { index: usize, reason: String },
}

impl BatchInsertError {
    /// HTTP status for this error: 413 for an oversized batch, 400 otherwise.
    pub fn http_status(&self) -> u16 {
        // Exhaustive on purpose: a new variant must choose its status here
        // instead of silently inheriting 400.
        match self {
            Self::BatchTooLarge { .. } => 413,
            Self::BodyNotJsonArray
            | Self::RowParseFailure { .. }
            | Self::RowSchemaRejected { .. } => 400,
        }
    }

    /// Short code string identifying the kind of error.
    pub fn code(&self) -> &'static str {
        match self {
            Self::BodyNotJsonArray => "BadRequest",
            Self::BatchTooLarge { .. } => "BatchTooLarge",
            Self::RowParseFailure { .. } => "RowParseFailure",
            Self::RowSchemaRejected { .. } => "RowSchemaRejected",
        }
    }

    /// Index of the offending row for the per-row variants, `None` for
    /// errors that concern the whole request.
    pub fn row_index(&self) -> Option<usize> {
        match self {
            Self::RowParseFailure { index, .. } | Self::RowSchemaRejected { index, .. } => {
                Some(*index)
            }
            Self::BodyNotJsonArray | Self::BatchTooLarge { .. } => None,
        }
    }

    /// Human-readable description of the error.
    pub fn message(&self) -> String {
        match self {
            Self::BodyNotJsonArray => "request body must be a JSON array of rows".to_string(),
            Self::BatchTooLarge { limit, got } => {
                format!("batch size {got} exceeds red.batch.max_rows={limit}")
            }
            Self::RowParseFailure { index, reason } => {
                format!("row {index} failed validation: {reason}")
            }
            Self::RowSchemaRejected { index, reason } => {
                format!("row {index} rejected by AnalyticsSchemaRegistry: {reason}")
            }
        }
    }
}

impl std::fmt::Display for BatchInsertError {
    /// Writes the same text as [`BatchInsertError::message`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message())
    }
}

// Lets the error flow through `?` into `Box<dyn Error>` and similar sinks.
impl std::error::Error for BatchInsertError {}
184
#[cfg(test)]
mod tests {
    use super::*;

    /// Idempotency window shared by the cache tests.
    const ONE_MINUTE: Duration = Duration::from_secs(60);

    /// Stores `body` under `(collection, key)` as a 200 response with a
    /// one-minute window that starts at `at`.
    fn put(cache: &BatchInsertCache, collection: &str, key: &str, body: &[u8], at: Instant) {
        cache.store(collection, key, 200, body.to_vec(), ONE_MINUTE, at);
    }

    #[test]
    fn default_config_matches_brief() {
        let idempotency_window = Duration::from_secs(DEFAULT_IDEMPOTENCY_WINDOW_SECS);
        let cfg = BatchInsertConfig {
            max_rows: DEFAULT_MAX_ROWS,
            idempotency_window,
        };
        assert_eq!(cfg.max_rows, 10_000);
        assert_eq!(cfg.idempotency_window, Duration::from_secs(300));
    }

    #[test]
    fn cache_returns_stored_within_window() {
        let payload: &[u8] = &b"{\"ok\":true,\"count\":3}"[..];
        let cache = BatchInsertCache::new();
        let stored_at = Instant::now();
        put(&cache, "events", "abc", payload, stored_at);

        // Half-way through the window the entry must still be served.
        let half_way = stored_at + Duration::from_secs(30);
        let hit = cache
            .lookup("events", "abc", half_way)
            .expect("cached entry should still be live");
        assert_eq!(hit.status, 200);
        assert_eq!(hit.body, payload);
    }

    #[test]
    fn cache_evicts_after_window() {
        let cache = BatchInsertCache::new();
        let stored_at = Instant::now();
        put(&cache, "events", "abc", &b"{}"[..], stored_at);

        // One second past the window: the lookup misses and sweeps the entry.
        let past_expiry = stored_at + Duration::from_secs(61);
        let miss = cache.lookup("events", "abc", past_expiry);
        assert!(miss.is_none());
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn cache_misses_for_unknown_key() {
        let cache = BatchInsertCache::new();
        let miss = cache.lookup("events", "never-stored", Instant::now());
        assert!(miss.is_none());
    }

    #[test]
    fn cache_namespaced_by_collection() {
        let cache = BatchInsertCache::new();
        let now = Instant::now();
        // Same key in two collections must map to two independent entries.
        let entries: [(&str, &[u8]); 2] = [("c1", &b"a"[..]), ("c2", &b"b"[..])];
        for &(collection, body) in entries.iter() {
            put(&cache, collection, "k", body, now);
        }
        for &(collection, body) in entries.iter() {
            assert_eq!(cache.lookup(collection, "k", now).unwrap().body, body);
        }
    }

    #[test]
    fn batch_too_large_maps_to_413() {
        let err = BatchInsertError::BatchTooLarge { limit: 10, got: 11 };
        assert_eq!(err.http_status(), 413);
        assert_eq!(err.code(), "BatchTooLarge");
        assert!(err.row_index().is_none());

        let text = err.message();
        for needle in ["10", "11"].iter() {
            assert!(text.contains(*needle));
        }
    }

    #[test]
    fn row_schema_rejected_carries_index() {
        let reason = "AnalyticsSchemaError:TypeMismatch:click:v1:user_id".to_string();
        let err = BatchInsertError::RowSchemaRejected { index: 3, reason };
        assert_eq!(err.http_status(), 400);
        assert_eq!(err.code(), "RowSchemaRejected");
        assert_eq!(err.row_index(), Some(3));

        let text = err.message();
        assert!(text.contains("row 3"));
        assert!(text.contains("TypeMismatch"));
    }

    #[test]
    fn row_parse_failure_carries_index() {
        let reason = "missing fields".to_string();
        let err = BatchInsertError::RowParseFailure { index: 7, reason };
        assert_eq!(err.http_status(), 400);
        assert_eq!(err.row_index(), Some(7));
        assert!(err.message().contains("row 7"));
    }

    #[test]
    fn body_not_json_array_is_400() {
        let err = BatchInsertError::BodyNotJsonArray;
        assert_eq!(err.http_status(), 400);
        assert!(err.row_index().is_none());
    }
}