use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};
22
/// Row cap used when `RED_BATCH_MAX_ROWS` is unset, unparsable, or zero.
const DEFAULT_MAX_ROWS: usize = 10_000;
/// Idempotency window, in whole seconds, used when
/// `RED_BATCH_IDEMPOTENCY_WINDOW_SECS` is unset or unparsable.
const DEFAULT_IDEMPOTENCY_WINDOW_SECS: u64 = 300;
/// Name of the environment variable that overrides the row cap.
const ENV_MAX_ROWS: &str = "RED_BATCH_MAX_ROWS";
/// Name of the environment variable that overrides the idempotency window (seconds).
const ENV_IDEMPOTENCY_WINDOW_SECS: &str = "RED_BATCH_IDEMPOTENCY_WINDOW_SECS";
27
/// Tunables for the batch-insert path.
///
/// Both fields are plain values, so the struct is `Copy` and can be compared
/// directly in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BatchInsertConfig {
    /// Maximum number of rows accepted in one batch.
    pub max_rows: usize,
    /// How long a stored response should stay replayable.
    // NOTE(review): presumably passed as `window` to `BatchInsertCache::store`;
    // the call site is not in this file — confirm.
    pub idempotency_window: Duration,
}
36
37impl BatchInsertConfig {
38 pub fn from_env() -> Self {
39 let max_rows = std::env::var(ENV_MAX_ROWS)
40 .ok()
41 .and_then(|v| v.parse::<usize>().ok())
42 .filter(|n| *n > 0)
43 .unwrap_or(DEFAULT_MAX_ROWS);
44 let window_secs = std::env::var(ENV_IDEMPOTENCY_WINDOW_SECS)
45 .ok()
46 .and_then(|v| v.parse::<u64>().ok())
47 .unwrap_or(DEFAULT_IDEMPOTENCY_WINDOW_SECS);
48 Self {
49 max_rows,
50 idempotency_window: Duration::from_secs(window_secs),
51 }
52 }
53}
54
/// A response remembered by [`BatchInsertCache`], returned by value on a hit.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CachedResponse {
    /// Status code of the stored response.
    pub status: u16,
    /// Raw bytes of the stored response body.
    pub body: Vec<u8>,
    /// Moment the entry stops being served; the cache drops entries whose
    /// `expires_at` is not strictly after the `now` it is given.
    expires_at: Instant,
}
65
66pub struct BatchInsertCache {
71 inner: Mutex<HashMap<(String, String), CachedResponse>>,
72}
73
74impl Default for BatchInsertCache {
75 fn default() -> Self {
76 Self::new()
77 }
78}
79
80impl BatchInsertCache {
81 pub fn new() -> Self {
82 Self {
83 inner: Mutex::new(HashMap::new()),
84 }
85 }
86
87 pub fn lookup(&self, collection: &str, key: &str, now: Instant) -> Option<CachedResponse> {
88 let mut guard = self.inner.lock().ok()?;
89 guard.retain(|_, v| v.expires_at > now);
90 guard
91 .get(&(collection.to_string(), key.to_string()))
92 .cloned()
93 }
94
95 pub fn store(
96 &self,
97 collection: &str,
98 key: &str,
99 status: u16,
100 body: Vec<u8>,
101 window: Duration,
102 now: Instant,
103 ) {
104 let Ok(mut guard) = self.inner.lock() else {
105 return;
106 };
107 guard.retain(|_, v| v.expires_at > now);
108 guard.insert(
109 (collection.to_string(), key.to_string()),
110 CachedResponse {
111 status,
112 body,
113 expires_at: now + window,
114 },
115 );
116 }
117
118 #[cfg(test)]
119 pub fn len(&self) -> usize {
120 self.inner.lock().map(|g| g.len()).unwrap_or(0)
121 }
122}
123
124pub fn global_cache() -> &'static BatchInsertCache {
127 static CACHE: OnceLock<BatchInsertCache> = OnceLock::new();
128 CACHE.get_or_init(BatchInsertCache::new)
129}
130
/// Reasons a batch-insert request is rejected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchInsertError {
    /// The request body was not a JSON array of rows.
    BodyNotJsonArray,
    /// The batch held `got` rows, more than the configured `limit`.
    BatchTooLarge { limit: usize, got: usize },
    /// The row at position `index` failed validation; `reason` says why.
    RowParseFailure { index: usize, reason: String },
    /// The row at position `index` was rejected by the analytics schema
    /// registry; `reason` carries the registry's explanation.
    RowSchemaRejected { index: usize, reason: String },
}
142
143impl BatchInsertError {
144 pub fn http_status(&self) -> u16 {
145 match self {
146 Self::BatchTooLarge { .. } => 413,
147 _ => 400,
148 }
149 }
150
151 pub fn code(&self) -> &'static str {
152 match self {
153 Self::BodyNotJsonArray => "BadRequest",
154 Self::BatchTooLarge { .. } => "BatchTooLarge",
155 Self::RowParseFailure { .. } => "RowParseFailure",
156 Self::RowSchemaRejected { .. } => "RowSchemaRejected",
157 }
158 }
159
160 pub fn row_index(&self) -> Option<usize> {
161 match self {
162 Self::RowParseFailure { index, .. } | Self::RowSchemaRejected { index, .. } => {
163 Some(*index)
164 }
165 _ => None,
166 }
167 }
168
169 pub fn message(&self) -> String {
170 match self {
171 Self::BodyNotJsonArray => "request body must be a JSON array of rows".to_string(),
172 Self::BatchTooLarge { limit, got } => {
173 format!("batch size {got} exceeds red.batch.max_rows={limit}")
174 }
175 Self::RowParseFailure { index, reason } => {
176 format!("row {index} failed validation: {reason}")
177 }
178 Self::RowSchemaRejected { index, reason } => {
179 format!("row {index} rejected by AnalyticsSchemaRegistry: {reason}")
180 }
181 }
182 }
183}
184
#[cfg(test)]
mod tests {
    //! Unit tests for the config defaults, the idempotency cache, and the
    //! error-to-response mapping. Cache tests pass `now` explicitly so they
    //! never sleep.

    use super::*;

    #[test]
    fn default_config_matches_brief() {
        let cfg = BatchInsertConfig {
            max_rows: DEFAULT_MAX_ROWS,
            idempotency_window: Duration::from_secs(DEFAULT_IDEMPOTENCY_WINDOW_SECS),
        };
        assert_eq!(cfg.max_rows, 10_000);
        assert_eq!(cfg.idempotency_window, Duration::from_secs(300));
    }

    #[test]
    fn cache_returns_stored_within_window() {
        let cache = BatchInsertCache::new();
        let now = Instant::now();
        cache.store(
            "events",
            "abc",
            200,
            b"{\"ok\":true,\"count\":3}".to_vec(),
            Duration::from_secs(60),
            now,
        );
        let hit = cache
            .lookup("events", "abc", now + Duration::from_secs(30))
            .expect("cached entry should still be live");
        assert_eq!(hit.status, 200);
        assert_eq!(hit.body, b"{\"ok\":true,\"count\":3}");
    }

    #[test]
    fn cache_evicts_after_window() {
        let cache = BatchInsertCache::new();
        let now = Instant::now();
        cache.store(
            "events",
            "abc",
            200,
            b"{}".to_vec(),
            Duration::from_secs(60),
            now,
        );
        assert!(cache
            .lookup("events", "abc", now + Duration::from_secs(61))
            .is_none());
        // The lookup itself purges expired entries.
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn cache_zero_window_is_never_served() {
        let cache = BatchInsertCache::new();
        let now = Instant::now();
        cache.store("events", "abc", 200, b"{}".to_vec(), Duration::ZERO, now);
        // Expiry equals `now`, and only entries strictly after `now` are live.
        assert!(cache.lookup("events", "abc", now).is_none());
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn cache_misses_for_unknown_key() {
        let cache = BatchInsertCache::new();
        assert!(cache
            .lookup("events", "never-stored", Instant::now())
            .is_none());
    }

    #[test]
    fn cache_namespaced_by_collection() {
        let cache = BatchInsertCache::new();
        let now = Instant::now();
        cache.store("c1", "k", 200, b"a".to_vec(), Duration::from_secs(60), now);
        cache.store("c2", "k", 200, b"b".to_vec(), Duration::from_secs(60), now);
        assert_eq!(cache.lookup("c1", "k", now).unwrap().body, b"a");
        assert_eq!(cache.lookup("c2", "k", now).unwrap().body, b"b");
    }

    #[test]
    fn cache_store_overwrites_same_key() {
        let cache = BatchInsertCache::new();
        let now = Instant::now();
        cache.store("c", "k", 200, b"old".to_vec(), Duration::from_secs(60), now);
        cache.store("c", "k", 201, b"new".to_vec(), Duration::from_secs(60), now);
        let hit = cache.lookup("c", "k", now).unwrap();
        assert_eq!(hit.status, 201);
        assert_eq!(hit.body, b"new");
        assert_eq!(cache.len(), 1);
    }

    #[test]
    fn batch_too_large_maps_to_413() {
        let err = BatchInsertError::BatchTooLarge { limit: 10, got: 11 };
        assert_eq!(err.http_status(), 413);
        assert_eq!(err.code(), "BatchTooLarge");
        assert!(err.row_index().is_none());
        assert!(err.message().contains("10"));
        assert!(err.message().contains("11"));
    }

    #[test]
    fn row_schema_rejected_carries_index() {
        let err = BatchInsertError::RowSchemaRejected {
            index: 3,
            reason: "AnalyticsSchemaError:TypeMismatch:click:v1:user_id".to_string(),
        };
        assert_eq!(err.http_status(), 400);
        assert_eq!(err.code(), "RowSchemaRejected");
        assert_eq!(err.row_index(), Some(3));
        assert!(err.message().contains("row 3"));
        assert!(err.message().contains("TypeMismatch"));
    }

    #[test]
    fn row_parse_failure_carries_index() {
        let err = BatchInsertError::RowParseFailure {
            index: 7,
            reason: "missing fields".to_string(),
        };
        assert_eq!(err.http_status(), 400);
        assert_eq!(err.code(), "RowParseFailure");
        assert_eq!(err.row_index(), Some(7));
        assert!(err.message().contains("row 7"));
    }

    #[test]
    fn body_not_json_array_is_400() {
        let err = BatchInsertError::BodyNotJsonArray;
        assert_eq!(err.http_status(), 400);
        assert_eq!(err.code(), "BadRequest");
        assert!(err.row_index().is_none());
    }
}