Skip to main content

nodedb_query/msgpack_scan/
index.rs

1//! Per-document structural index for O(1) field access.
2//!
3//! When a query accesses multiple fields from the same document (e.g.,
4//! GROUP BY + aggregate + filter), building a `FieldIndex` once and
5//! reusing it for all field lookups avoids repeated O(N) key scanning.
6//!
7//! Uses a flat array with linear search for small docs (≤ 16 fields) to
8//! avoid HashMap allocation overhead. Falls back to HashMap for large docs.
9
10use crate::msgpack_scan::reader::{map_header, skip_value, str_bounds};
11
12/// Threshold: docs with more fields than this use HashMap, otherwise flat array.
13const HASH_THRESHOLD: usize = 16;
14
15/// Pre-computed field offset table for a single MessagePack document.
16pub struct FieldIndex {
17    inner: IndexInner,
18}
19
20enum IndexInner {
21    /// Flat array for small documents — linear scan, zero HashMap overhead.
22    Flat(Vec<(Box<str>, usize, usize)>),
23    /// HashMap for large documents — O(1) lookup.
24    Map(std::collections::HashMap<Box<str>, (usize, usize)>),
25}
26
27impl FieldIndex {
28    /// Build an index for the msgpack map at `offset` in `buf`.
29    ///
30    /// Scans all map keys once and records value byte ranges.
31    /// Returns `None` if `buf` is not a valid map at `offset`.
32    pub fn build(buf: &[u8], offset: usize) -> Option<Self> {
33        let (count, mut pos) = map_header(buf, offset)?;
34
35        if count <= HASH_THRESHOLD {
36            let mut entries = Vec::with_capacity(count);
37            for _ in 0..count {
38                let key_str = if let Some((start, len)) = str_bounds(buf, pos) {
39                    std::str::from_utf8(buf.get(start..start + len)?).ok()
40                } else {
41                    None
42                };
43                pos = skip_value(buf, pos)?;
44                let value_start = pos;
45                let value_end = skip_value(buf, pos)?;
46                if let Some(key) = key_str {
47                    entries.push((key.into(), value_start, value_end));
48                }
49                pos = value_end;
50            }
51            Some(Self {
52                inner: IndexInner::Flat(entries),
53            })
54        } else {
55            // Cap pre-allocation: adversarial buffers may claim enormous counts.
56            // Actual insertions are bounded by the buffer size, so over-allocating
57            // wastes memory and under-allocating just triggers rehashing.
58            let cap = count.min(buf.len() / 2 + 1);
59            let mut offsets = std::collections::HashMap::with_capacity(cap);
60            for _ in 0..count {
61                let key_str = if let Some((start, len)) = str_bounds(buf, pos) {
62                    std::str::from_utf8(buf.get(start..start + len)?).ok()
63                } else {
64                    None
65                };
66                pos = skip_value(buf, pos)?;
67                let value_start = pos;
68                let value_end = skip_value(buf, pos)?;
69                if let Some(key) = key_str {
70                    offsets.insert(key.into(), (value_start, value_end));
71                }
72                pos = value_end;
73            }
74            Some(Self {
75                inner: IndexInner::Map(offsets),
76            })
77        }
78    }
79
80    /// Create an empty index (no fields).
81    pub fn empty() -> Self {
82        Self {
83            inner: IndexInner::Flat(Vec::new()),
84        }
85    }
86
87    /// Look up a field's byte range.
88    #[inline]
89    pub fn get(&self, field: &str) -> Option<(usize, usize)> {
90        match &self.inner {
91            IndexInner::Flat(entries) => entries
92                .iter()
93                .find(|(k, _, _)| k.as_ref() == field)
94                .map(|(_, s, e)| (*s, *e)),
95            IndexInner::Map(map) => map.get(field).copied(),
96        }
97    }
98
99    /// Number of indexed fields.
100    #[inline]
101    pub fn len(&self) -> usize {
102        match &self.inner {
103            IndexInner::Flat(entries) => entries.len(),
104            IndexInner::Map(map) => map.len(),
105        }
106    }
107
108    /// Whether the index is empty.
109    #[inline]
110    pub fn is_empty(&self) -> bool {
111        self.len() == 0
112    }
113}
114
115#[cfg(test)]
116mod tests {
117    use super::*;
118    use crate::msgpack_scan::reader::{read_f64, read_i64, read_str};
119    use serde_json::json;
120
121    fn encode(v: &serde_json::Value) -> Vec<u8> {
122        nodedb_types::json_msgpack::json_to_msgpack(v).expect("encode")
123    }
124
125    #[test]
126    fn build_and_lookup() {
127        let buf = encode(&json!({"name": "alice", "age": 30, "score": 99.5}));
128        let idx = FieldIndex::build(&buf, 0).unwrap();
129
130        assert_eq!(idx.len(), 3);
131
132        let (s, _) = idx.get("name").unwrap();
133        assert_eq!(read_str(&buf, s), Some("alice"));
134
135        let (s, _) = idx.get("age").unwrap();
136        assert_eq!(read_i64(&buf, s), Some(30));
137
138        let (s, _) = idx.get("score").unwrap();
139        assert_eq!(read_f64(&buf, s), Some(99.5));
140    }
141
142    #[test]
143    fn missing_field() {
144        let buf = encode(&json!({"x": 1}));
145        let idx = FieldIndex::build(&buf, 0).unwrap();
146        assert!(idx.get("y").is_none());
147    }
148
149    #[test]
150    fn empty_map() {
151        let buf = encode(&json!({}));
152        let idx = FieldIndex::build(&buf, 0).unwrap();
153        assert!(idx.is_empty());
154    }
155
156    #[test]
157    fn small_doc_uses_flat() {
158        let mut map = serde_json::Map::new();
159        for i in 0..10 {
160            map.insert(format!("f{i}"), json!(i));
161        }
162        let buf = encode(&serde_json::Value::Object(map));
163        let idx = FieldIndex::build(&buf, 0).unwrap();
164        assert_eq!(idx.len(), 10);
165        assert!(matches!(idx.inner, IndexInner::Flat(_)));
166    }
167
168    #[test]
169    fn large_doc_uses_hashmap() {
170        let mut map = serde_json::Map::new();
171        for i in 0..20 {
172            map.insert(format!("field_{i}"), json!(i));
173        }
174        let buf = encode(&serde_json::Value::Object(map));
175        let idx = FieldIndex::build(&buf, 0).unwrap();
176        assert_eq!(idx.len(), 20);
177        assert!(matches!(idx.inner, IndexInner::Map(_)));
178
179        for i in 0..20 {
180            let (s, _) = idx.get(&format!("field_{i}")).unwrap();
181            assert_eq!(read_i64(&buf, s), Some(i));
182        }
183    }
184
185    #[test]
186    fn not_a_map() {
187        let buf = encode(&json!([1, 2, 3]));
188        assert!(FieldIndex::build(&buf, 0).is_none());
189    }
190
191    #[test]
192    fn indexed_vs_sequential_same_result() {
193        let buf = encode(&json!({"a": 1, "b": "two", "c": 3.0}));
194        let idx = FieldIndex::build(&buf, 0).unwrap();
195
196        for field in &["a", "b", "c"] {
197            let indexed = idx.get(field);
198            let sequential = crate::msgpack_scan::field::extract_field(&buf, 0, field);
199            assert_eq!(indexed, sequential, "mismatch for field {field}");
200        }
201    }
202
203    // ── Fuzz-style tests ───────────────────────────────────────────────────
204
205    /// Truncate valid msgpack at every byte position — FieldIndex::build must
206    /// never panic; it should return None on truncated input.
207    #[test]
208    fn fuzz_truncated_buffers() {
209        let docs = [
210            json!({"name": "alice", "age": 30, "score": 9.5}),
211            json!({"a": 1, "b": 2, "c": 3, "d": 4, "e": 5}),
212            json!({"nested": {"inner": 42}}),
213        ];
214
215        for doc in &docs {
216            let full = encode(doc);
217            for truncate_at in 0..full.len() {
218                let slice = &full[..truncate_at];
219                // Must not panic — None is the valid outcome for truncated data.
220                let _ = FieldIndex::build(slice, 0);
221            }
222        }
223    }
224
225    /// Deterministic random byte sequences — FieldIndex::build must never panic.
226    #[test]
227    fn fuzz_random_payloads() {
228        let mut state: u64 = 0xabad1dea_deadc0de;
229        let next = |s: &mut u64| -> u8 {
230            *s = s
231                .wrapping_mul(6364136223846793005)
232                .wrapping_add(1442695040888963407);
233            (*s >> 33) as u8
234        };
235
236        let mut buf = [0u8; 128];
237        for _ in 0..1000 {
238            let len = (next(&mut state) as usize % 128) + 1;
239            for b in buf[..len].iter_mut() {
240                *b = next(&mut state);
241            }
242            let slice = &buf[..len];
243            let idx = FieldIndex::build(slice, 0);
244            // If build succeeded, get() on the result must not panic either.
245            if let Some(ref idx) = idx {
246                let _ = idx.get("any_key");
247                let _ = idx.len();
248                let _ = idx.is_empty();
249            }
250        }
251    }
252
253    /// Adversarial map headers — FieldIndex::build must return None.
254    #[test]
255    fn fuzz_adversarial_map_count() {
256        // MAP32 claiming 0xffffffff pairs with empty body
257        let buf = [0xdfu8, 0xff, 0xff, 0xff, 0xff];
258        assert_eq!(FieldIndex::build(&buf, 0).map(|_| ()), None);
259
260        // MAP16 claiming 0xffff pairs with empty body
261        let buf = [0xdeu8, 0xff, 0xff];
262        assert_eq!(FieldIndex::build(&buf, 0).map(|_| ()), None);
263
264        // Fixmap claiming 15 pairs but only 1 byte
265        let buf = [0x8fu8];
266        assert_eq!(FieldIndex::build(&buf, 0).map(|_| ()), None);
267    }
268
269    /// Non-map inputs must return None from build.
270    #[test]
271    fn fuzz_non_map_inputs() {
272        let array_buf = encode(&json!([1, 2, 3]));
273        assert!(FieldIndex::build(&array_buf, 0).is_none());
274
275        let int_buf = encode(&json!(99));
276        assert!(FieldIndex::build(&int_buf, 0).is_none());
277
278        assert!(FieldIndex::build(&[], 0).is_none());
279
280        let nil_buf = [0xc0u8];
281        assert!(FieldIndex::build(&nil_buf, 0).is_none());
282    }
283
284    /// Out-of-bounds offset must return None.
285    #[test]
286    fn fuzz_out_of_bounds_offset() {
287        let buf = encode(&json!({"x": 1}));
288        assert!(FieldIndex::build(&buf, buf.len() + 100).is_none());
289    }
290
291    /// Threshold boundary: a 16-field doc should use Flat, 17-field should
292    /// use HashMap. Fuzz both paths with truncation.
293    #[test]
294    fn fuzz_flat_vs_hashmap_threshold_truncation() {
295        // 16 fields — uses Flat path
296        let mut map16 = serde_json::Map::new();
297        for i in 0..16 {
298            map16.insert(format!("f{i}"), json!(i));
299        }
300        let buf16 = encode(&serde_json::Value::Object(map16));
301        let idx16 = FieldIndex::build(&buf16, 0).unwrap();
302        assert!(matches!(idx16.inner, IndexInner::Flat(_)));
303        assert_eq!(idx16.len(), 16);
304
305        // Truncate the 16-field buffer
306        for t in 0..buf16.len() {
307            let _ = FieldIndex::build(&buf16[..t], 0);
308        }
309
310        // 17 fields — uses HashMap path
311        let mut map17 = serde_json::Map::new();
312        for i in 0..17 {
313            map17.insert(format!("g{i}"), json!(i));
314        }
315        let buf17 = encode(&serde_json::Value::Object(map17));
316        let idx17 = FieldIndex::build(&buf17, 0).unwrap();
317        assert!(matches!(idx17.inner, IndexInner::Map(_)));
318        assert_eq!(idx17.len(), 17);
319
320        // Truncate the 17-field buffer
321        for t in 0..buf17.len() {
322            let _ = FieldIndex::build(&buf17[..t], 0);
323        }
324    }
325
326    /// Build a valid index then look up every present and absent key.
327    #[test]
328    fn fuzz_lookup_all_present_and_absent_keys() {
329        let mut map = serde_json::Map::new();
330        for i in 0..20u64 {
331            map.insert(format!("key{i}"), json!(i));
332        }
333        let buf = encode(&serde_json::Value::Object(map));
334        let idx = FieldIndex::build(&buf, 0).unwrap();
335
336        for i in 0..20u64 {
337            let k = format!("key{i}");
338            let (start, _end) = idx.get(&k).unwrap();
339            assert_eq!(read_i64(&buf, start), Some(i as i64));
340        }
341
342        // These keys are absent
343        for absent in &["KEY0", "key20", "key-1", "", "key 0"] {
344            assert!(idx.get(absent).is_none(), "key '{absent}' should be absent");
345        }
346    }
347}