Skip to main content

reddb_server/runtime/
batch_insert.rs

1//! Issue #582 — Analytics slice 4: `BatchInsertEndpoint` HTTP support.
2//!
3//! Owns the config + in-memory idempotency cache that the
4//! `POST /collections/:name/batch` handler uses to:
5//!
6//! * reject batches exceeding `red.batch.max_rows` with `413
7//!   BatchTooLarge` before any storage work,
8//! * deduplicate requests carrying the same `Idempotency-Key` within
9//!   `red.batch.idempotency_window_secs`, returning the previously
10//!   cached response body verbatim,
11//! * report row-level validation failures by index so the caller can
12//!   pinpoint the row that broke the all-or-nothing commit.
13//!
14//! The actual commit + AnalyticsSchemaRegistry validation runs in the
15//! handler (`server::handlers_entity::handle_batch_insert`). This
16//! module only owns the pieces that need to be unit-testable without
17//! booting a full HTTP server.
18
19use std::collections::HashMap;
20use std::sync::{Mutex, OnceLock};
21use std::time::{Duration, Instant};
22
const DEFAULT_MAX_ROWS: usize = 10_000;
const DEFAULT_IDEMPOTENCY_WINDOW_SECS: u64 = 300;
const ENV_MAX_ROWS: &str = "RED_BATCH_MAX_ROWS";
const ENV_IDEMPOTENCY_WINDOW_SECS: &str = "RED_BATCH_IDEMPOTENCY_WINDOW_SECS";

/// Knobs for the batch endpoint.
///
/// [`BatchInsertConfig::from_env`] reads them from the `RED_BATCH_*`
/// env vars; the `red.batch.*` config keys in the brief route to the
/// same defaults until the broader config-overlay binding lands.
#[derive(Debug, Clone, Copy)]
pub struct BatchInsertConfig {
    /// Row-count limit for a single batch (`red.batch.max_rows`).
    pub max_rows: usize,
    /// Lifetime of a cached response for a given idempotency key
    /// (`red.batch.idempotency_window_secs`).
    pub idempotency_window: Duration,
}

impl Default for BatchInsertConfig {
    /// The defaults from the brief: 10 000 rows and a 300 second window.
    fn default() -> Self {
        Self {
            max_rows: DEFAULT_MAX_ROWS,
            idempotency_window: Duration::from_secs(DEFAULT_IDEMPOTENCY_WINDOW_SECS),
        }
    }
}

impl BatchInsertConfig {
    /// Builds the config from `RED_BATCH_MAX_ROWS` and
    /// `RED_BATCH_IDEMPOTENCY_WINDOW_SECS`.
    ///
    /// A variable that is unset, not valid Unicode, or does not parse as
    /// an unsigned integer falls back to its default. `max_rows` also
    /// falls back when the parsed value is `0`; a window of `0` seconds
    /// is accepted as-is. Surrounding whitespace in a value is ignored,
    /// so `" 500 "` or `"500\n"` is read as `500` rather than silently
    /// discarded.
    pub fn from_env() -> Self {
        let defaults = Self::default();
        let max_rows = Self::parse_env::<usize>(ENV_MAX_ROWS)
            .filter(|&rows| rows > 0)
            .unwrap_or(defaults.max_rows);
        let idempotency_window = Self::parse_env::<u64>(ENV_IDEMPOTENCY_WINDOW_SECS)
            .map(Duration::from_secs)
            .unwrap_or(defaults.idempotency_window);
        Self {
            max_rows,
            idempotency_window,
        }
    }

    /// Reads env var `name`, trims it, and parses it as `T`; `None` when
    /// the variable is missing or the trimmed text does not parse.
    fn parse_env<T: std::str::FromStr>(name: &str) -> Option<T> {
        std::env::var(name).ok()?.trim().parse().ok()
    }
}
54
/// A response previously served for a given `(collection,
/// idempotency-key)`. Stored verbatim so a replay returns byte-for-byte
/// what the caller would have seen the first time, even if the
/// underlying state has since drifted.
#[derive(Debug, Clone)]
pub struct CachedResponse {
    /// HTTP status code that was stored with the response.
    pub status: u16,
    /// Response body bytes, exactly as handed to [`BatchInsertCache::store`].
    pub body: Vec<u8>,
    /// The entry is served only while `expires_at > now`.
    expires_at: Instant,
}

/// In-memory `(collection, idempotency-key) → CachedResponse` map.
/// Pruned lazily on every `lookup`/`store` rather than via a background
/// task — the working set is bounded by the `idempotency_window` and a
/// `Mutex` is cheap enough at the batch-insert call rate.
pub struct BatchInsertCache {
    inner: Mutex<HashMap<(String, String), CachedResponse>>,
}

impl Default for BatchInsertCache {
    fn default() -> Self {
        Self::new()
    }
}

impl BatchInsertCache {
    /// Creates an empty cache.
    pub fn new() -> Self {
        Self {
            inner: Mutex::new(HashMap::new()),
        }
    }

    /// Returns a clone of the entry stored under `(collection, key)` if
    /// it has not expired as of `now`.
    ///
    /// Every expired entry is dropped from the map before the lookup.
    /// Returns `None` on a miss, on an expired entry, or when the mutex
    /// is poisoned.
    pub fn lookup(&self, collection: &str, key: &str, now: Instant) -> Option<CachedResponse> {
        let mut guard = self.inner.lock().ok()?;
        guard.retain(|_, v| v.expires_at > now);
        guard
            .get(&(collection.to_string(), key.to_string()))
            .cloned()
    }

    /// Stores `status` + `body` under `(collection, key)`, replacing any
    /// existing entry, to be served until `now + window`.
    ///
    /// Entries already expired as of `now` are dropped first. The call is
    /// best-effort: it silently does nothing when the mutex is poisoned
    /// or when no expiry instant can be represented at all.
    ///
    /// `window` may be arbitrarily large (it is operator-configurable).
    /// `Instant + Duration` panics on overflow, so the expiry is computed
    /// with `checked_add`; a window that cannot be represented is clamped
    /// to a 100-year horizon instead of panicking.
    pub fn store(
        &self,
        collection: &str,
        key: &str,
        status: u16,
        body: Vec<u8>,
        window: Duration,
        now: Instant,
    ) {
        // Stand-in for "effectively forever" when `now + window` overflows.
        const FALLBACK_WINDOW: Duration = Duration::from_secs(100 * 365 * 24 * 60 * 60);

        // Computed before taking the lock so nothing fallible runs while
        // the guard is held (a panic there would poison the mutex and
        // disable the cache for the rest of the process).
        let Some(expires_at) = now
            .checked_add(window)
            .or_else(|| now.checked_add(FALLBACK_WINDOW))
        else {
            return;
        };

        let Ok(mut guard) = self.inner.lock() else {
            return;
        };
        guard.retain(|_, v| v.expires_at > now);
        guard.insert(
            (collection.to_string(), key.to_string()),
            CachedResponse {
                status,
                body,
                expires_at,
            },
        );
    }

    /// Number of entries currently held (test-only helper); `0` when the
    /// mutex is poisoned.
    #[cfg(test)]
    pub fn len(&self) -> usize {
        self.inner.lock().map(|g| g.len()).unwrap_or(0)
    }
}
123
124/// Process-wide cache. The brief states the dedup window is "in-memory
125/// per primary" — one process, one cache.
126pub fn global_cache() -> &'static BatchInsertCache {
127    static CACHE: OnceLock<BatchInsertCache> = OnceLock::new();
128    CACHE.get_or_init(BatchInsertCache::new)
129}
130
/// Failure modes of a batch-insert request, with their HTTP mapping.
#[derive(Debug, Clone, PartialEq)]
pub enum BatchInsertError {
    /// The request body did not deserialize to a JSON array.
    BodyNotJsonArray,
    /// The batch carried more rows than allowed
    /// (`rows.len() > config.max_rows`).
    BatchTooLarge { limit: usize, got: usize },
    /// Row `index` failed parsing / shape checks.
    RowParseFailure { index: usize, reason: String },
    /// Row `index` was rejected by the AnalyticsSchemaRegistry.
    RowSchemaRejected { index: usize, reason: String },
}

impl BatchInsertError {
    /// HTTP status for this error: `413` for an oversized batch, `400`
    /// for every other variant.
    pub fn http_status(&self) -> u16 {
        if matches!(self, Self::BatchTooLarge { .. }) {
            413
        } else {
            400
        }
    }

    /// Short machine-readable error code for this variant.
    pub fn code(&self) -> &'static str {
        match self {
            Self::RowSchemaRejected { .. } => "RowSchemaRejected",
            Self::RowParseFailure { .. } => "RowParseFailure",
            Self::BatchTooLarge { .. } => "BatchTooLarge",
            Self::BodyNotJsonArray => "BadRequest",
        }
    }

    /// Index of the offending row for the row-level variants, `None` for
    /// errors that concern the request as a whole.
    pub fn row_index(&self) -> Option<usize> {
        match self {
            Self::RowParseFailure { index, .. } => Some(*index),
            Self::RowSchemaRejected { index, .. } => Some(*index),
            Self::BodyNotJsonArray | Self::BatchTooLarge { .. } => None,
        }
    }

    /// Human-readable description of the error.
    pub fn message(&self) -> String {
        match self {
            Self::RowSchemaRejected { index, reason } => {
                format!("row {index} rejected by AnalyticsSchemaRegistry: {reason}")
            }
            Self::RowParseFailure { index, reason } => {
                format!("row {index} failed validation: {reason}")
            }
            Self::BatchTooLarge { limit, got } => {
                format!("batch size {got} exceeds red.batch.max_rows={limit}")
            }
            Self::BodyNotJsonArray => "request body must be a JSON array of rows".to_string(),
        }
    }
}
184
#[cfg(test)]
mod tests {
    use super::*;

    /// Idempotency window used by every test that stores an entry.
    const WINDOW: Duration = Duration::from_secs(60);

    /// Stores a `200` response carrying `body` under `(collection, key)`
    /// at `now`, valid for [`WINDOW`].
    fn put(cache: &BatchInsertCache, collection: &str, key: &str, body: &[u8], now: Instant) {
        cache.store(collection, key, 200, body.to_vec(), WINDOW, now);
    }

    #[test]
    fn default_config_matches_brief() {
        let BatchInsertConfig {
            max_rows,
            idempotency_window,
        } = BatchInsertConfig {
            max_rows: DEFAULT_MAX_ROWS,
            idempotency_window: Duration::from_secs(DEFAULT_IDEMPOTENCY_WINDOW_SECS),
        };
        assert_eq!(max_rows, 10_000);
        assert_eq!(idempotency_window, Duration::from_secs(300));
    }

    #[test]
    fn cache_returns_stored_within_window() {
        let body: &[u8] = b"{\"ok\":true,\"count\":3}";
        let cache = BatchInsertCache::new();
        let stored_at = Instant::now();
        put(&cache, "events", "abc", body, stored_at);

        let half_way = stored_at + Duration::from_secs(30);
        let hit = cache
            .lookup("events", "abc", half_way)
            .expect("cached entry should still be live");
        assert_eq!(hit.status, 200);
        assert_eq!(hit.body, body);
    }

    #[test]
    fn cache_evicts_after_window() {
        let cache = BatchInsertCache::new();
        let stored_at = Instant::now();
        put(&cache, "events", "abc", b"{}", stored_at);

        let past_expiry = stored_at + Duration::from_secs(61);
        assert!(cache.lookup("events", "abc", past_expiry).is_none());
        // Lookup after expiry also prunes the entry from the map.
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn cache_misses_for_unknown_key() {
        let cache = BatchInsertCache::new();
        let miss = cache.lookup("events", "never-stored", Instant::now());
        assert!(miss.is_none());
    }

    #[test]
    fn cache_namespaced_by_collection() {
        // Same key in two collections must map to two independent entries.
        let entries: [(&str, &[u8]); 2] = [("c1", b"a"), ("c2", b"b")];
        let cache = BatchInsertCache::new();
        let now = Instant::now();
        for &(collection, body) in entries.iter() {
            put(&cache, collection, "k", body, now);
        }
        for &(collection, body) in entries.iter() {
            assert_eq!(cache.lookup(collection, "k", now).unwrap().body, body);
        }
    }

    #[test]
    fn batch_too_large_maps_to_413() {
        let err = BatchInsertError::BatchTooLarge { limit: 10, got: 11 };
        assert_eq!(err.http_status(), 413);
        assert_eq!(err.code(), "BatchTooLarge");
        assert!(err.row_index().is_none());
        let message = err.message();
        assert!(message.contains("10"));
        assert!(message.contains("11"));
    }

    #[test]
    fn row_schema_rejected_carries_index() {
        let reason = "AnalyticsSchemaError:TypeMismatch:click:v1:user_id".to_string();
        let err = BatchInsertError::RowSchemaRejected { index: 3, reason };
        assert_eq!(err.http_status(), 400);
        assert_eq!(err.code(), "RowSchemaRejected");
        assert_eq!(err.row_index(), Some(3));
        let message = err.message();
        assert!(message.contains("row 3"));
        assert!(message.contains("TypeMismatch"));
    }

    #[test]
    fn row_parse_failure_carries_index() {
        let reason = "missing fields".to_string();
        let err = BatchInsertError::RowParseFailure { index: 7, reason };
        assert_eq!(err.http_status(), 400);
        assert_eq!(err.row_index(), Some(7));
        assert!(err.message().contains("row 7"));
    }

    #[test]
    fn body_not_json_array_is_400() {
        let err = BatchInsertError::BodyNotJsonArray;
        assert_eq!(err.http_status(), 400);
        assert!(err.row_index().is_none());
    }
}