Skip to main content

nodedb_query/msgpack_scan/
index.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Per-document structural index for O(1) field access.
4//!
5//! When a query accesses multiple fields from the same document (e.g.,
6//! GROUP BY + aggregate + filter), building a `FieldIndex` once and
7//! reusing it for all field lookups avoids repeated O(N) key scanning.
8//!
9//! Uses a flat array with linear search for small docs (≤ 16 fields) to
10//! avoid HashMap allocation overhead. Falls back to HashMap for large docs.
11
12use crate::msgpack_scan::reader::{map_header, skip_value, str_bounds};
13
14/// Threshold: docs with more fields than this use HashMap, otherwise flat array.
15const HASH_THRESHOLD: usize = 16;
16
17/// Pre-computed field offset table for a single MessagePack document.
18pub struct FieldIndex {
19    inner: IndexInner,
20}
21
22enum IndexInner {
23    /// Flat array for small documents — linear scan, zero HashMap overhead.
24    Flat(Vec<(Box<str>, usize, usize)>),
25    /// HashMap for large documents — O(1) lookup.
26    Map(std::collections::HashMap<Box<str>, (usize, usize)>),
27}
28
29impl FieldIndex {
30    /// Build an index for the msgpack map at `offset` in `buf`.
31    ///
32    /// Scans all map keys once and records value byte ranges.
33    /// Returns `None` if `buf` is not a valid map at `offset`.
34    pub fn build(buf: &[u8], offset: usize) -> Option<Self> {
35        let (count, mut pos) = map_header(buf, offset)?;
36
37        if count <= HASH_THRESHOLD {
38            let mut entries = Vec::with_capacity(count);
39            for _ in 0..count {
40                let key_str = if let Some((start, len)) = str_bounds(buf, pos) {
41                    std::str::from_utf8(buf.get(start..start + len)?).ok()
42                } else {
43                    None
44                };
45                pos = skip_value(buf, pos)?;
46                let value_start = pos;
47                let value_end = skip_value(buf, pos)?;
48                if let Some(key) = key_str {
49                    entries.push((key.into(), value_start, value_end));
50                }
51                pos = value_end;
52            }
53            Some(Self {
54                inner: IndexInner::Flat(entries),
55            })
56        } else {
57            // Cap pre-allocation: adversarial buffers may claim enormous counts.
58            // Actual insertions are bounded by the buffer size, so over-allocating
59            // wastes memory and under-allocating just triggers rehashing.
60            let cap = count.min(buf.len() / 2 + 1);
61            let mut offsets = std::collections::HashMap::with_capacity(cap);
62            for _ in 0..count {
63                let key_str = if let Some((start, len)) = str_bounds(buf, pos) {
64                    std::str::from_utf8(buf.get(start..start + len)?).ok()
65                } else {
66                    None
67                };
68                pos = skip_value(buf, pos)?;
69                let value_start = pos;
70                let value_end = skip_value(buf, pos)?;
71                if let Some(key) = key_str {
72                    offsets.insert(key.into(), (value_start, value_end));
73                }
74                pos = value_end;
75            }
76            Some(Self {
77                inner: IndexInner::Map(offsets),
78            })
79        }
80    }
81
82    /// Create an empty index (no fields).
83    pub fn empty() -> Self {
84        Self {
85            inner: IndexInner::Flat(Vec::new()),
86        }
87    }
88
89    /// Look up a field's byte range.
90    #[inline]
91    pub fn get(&self, field: &str) -> Option<(usize, usize)> {
92        match &self.inner {
93            IndexInner::Flat(entries) => entries
94                .iter()
95                .find(|(k, _, _)| k.as_ref() == field)
96                .map(|(_, s, e)| (*s, *e)),
97            IndexInner::Map(map) => map.get(field).copied(),
98        }
99    }
100
101    /// Number of indexed fields.
102    #[inline]
103    pub fn len(&self) -> usize {
104        match &self.inner {
105            IndexInner::Flat(entries) => entries.len(),
106            IndexInner::Map(map) => map.len(),
107        }
108    }
109
110    /// Whether the index is empty.
111    #[inline]
112    pub fn is_empty(&self) -> bool {
113        self.len() == 0
114    }
115}
116
117#[cfg(test)]
118mod tests {
119    use super::*;
120    use crate::msgpack_scan::reader::{read_f64, read_i64, read_str};
121    use serde_json::json;
122
123    fn encode(v: &serde_json::Value) -> Vec<u8> {
124        nodedb_types::json_msgpack::json_to_msgpack(v).expect("encode")
125    }
126
127    #[test]
128    fn build_and_lookup() {
129        let buf = encode(&json!({"name": "alice", "age": 30, "score": 99.5}));
130        let idx = FieldIndex::build(&buf, 0).unwrap();
131
132        assert_eq!(idx.len(), 3);
133
134        let (s, _) = idx.get("name").unwrap();
135        assert_eq!(read_str(&buf, s), Some("alice"));
136
137        let (s, _) = idx.get("age").unwrap();
138        assert_eq!(read_i64(&buf, s), Some(30));
139
140        let (s, _) = idx.get("score").unwrap();
141        assert_eq!(read_f64(&buf, s), Some(99.5));
142    }
143
144    #[test]
145    fn missing_field() {
146        let buf = encode(&json!({"x": 1}));
147        let idx = FieldIndex::build(&buf, 0).unwrap();
148        assert!(idx.get("y").is_none());
149    }
150
151    #[test]
152    fn empty_map() {
153        let buf = encode(&json!({}));
154        let idx = FieldIndex::build(&buf, 0).unwrap();
155        assert!(idx.is_empty());
156    }
157
158    #[test]
159    fn small_doc_uses_flat() {
160        let mut map = serde_json::Map::new();
161        for i in 0..10 {
162            map.insert(format!("f{i}"), json!(i));
163        }
164        let buf = encode(&serde_json::Value::Object(map));
165        let idx = FieldIndex::build(&buf, 0).unwrap();
166        assert_eq!(idx.len(), 10);
167        assert!(matches!(idx.inner, IndexInner::Flat(_)));
168    }
169
170    #[test]
171    fn large_doc_uses_hashmap() {
172        let mut map = serde_json::Map::new();
173        for i in 0..20 {
174            map.insert(format!("field_{i}"), json!(i));
175        }
176        let buf = encode(&serde_json::Value::Object(map));
177        let idx = FieldIndex::build(&buf, 0).unwrap();
178        assert_eq!(idx.len(), 20);
179        assert!(matches!(idx.inner, IndexInner::Map(_)));
180
181        for i in 0..20 {
182            let (s, _) = idx.get(&format!("field_{i}")).unwrap();
183            assert_eq!(read_i64(&buf, s), Some(i));
184        }
185    }
186
187    #[test]
188    fn not_a_map() {
189        let buf = encode(&json!([1, 2, 3]));
190        assert!(FieldIndex::build(&buf, 0).is_none());
191    }
192
193    #[test]
194    fn indexed_vs_sequential_same_result() {
195        let buf = encode(&json!({"a": 1, "b": "two", "c": 3.0}));
196        let idx = FieldIndex::build(&buf, 0).unwrap();
197
198        for field in &["a", "b", "c"] {
199            let indexed = idx.get(field);
200            let sequential = crate::msgpack_scan::field::extract_field(&buf, 0, field);
201            assert_eq!(indexed, sequential, "mismatch for field {field}");
202        }
203    }
204
205    // ── Fuzz-style tests ───────────────────────────────────────────────────
206
207    /// Truncate valid msgpack at every byte position — FieldIndex::build must
208    /// never panic; it should return None on truncated input.
209    #[test]
210    fn fuzz_truncated_buffers() {
211        let docs = [
212            json!({"name": "alice", "age": 30, "score": 9.5}),
213            json!({"a": 1, "b": 2, "c": 3, "d": 4, "e": 5}),
214            json!({"nested": {"inner": 42}}),
215        ];
216
217        for doc in &docs {
218            let full = encode(doc);
219            for truncate_at in 0..full.len() {
220                let slice = &full[..truncate_at];
221                // Must not panic — None is the valid outcome for truncated data.
222                let _ = FieldIndex::build(slice, 0);
223            }
224        }
225    }
226
227    /// Deterministic random byte sequences — FieldIndex::build must never panic.
228    #[test]
229    fn fuzz_random_payloads() {
230        let mut state: u64 = 0xabad1dea_deadc0de;
231        let next = |s: &mut u64| -> u8 {
232            *s = s
233                .wrapping_mul(6364136223846793005)
234                .wrapping_add(1442695040888963407);
235            (*s >> 33) as u8
236        };
237
238        let mut buf = [0u8; 128];
239        for _ in 0..1000 {
240            let len = (next(&mut state) as usize % 128) + 1;
241            for b in buf[..len].iter_mut() {
242                *b = next(&mut state);
243            }
244            let slice = &buf[..len];
245            let idx = FieldIndex::build(slice, 0);
246            // If build succeeded, get() on the result must not panic either.
247            if let Some(ref idx) = idx {
248                let _ = idx.get("any_key");
249                let _ = idx.len();
250                let _ = idx.is_empty();
251            }
252        }
253    }
254
255    /// Adversarial map headers — FieldIndex::build must return None.
256    #[test]
257    fn fuzz_adversarial_map_count() {
258        // MAP32 claiming 0xffffffff pairs with empty body
259        let buf = [0xdfu8, 0xff, 0xff, 0xff, 0xff];
260        assert_eq!(FieldIndex::build(&buf, 0).map(|_| ()), None);
261
262        // MAP16 claiming 0xffff pairs with empty body
263        let buf = [0xdeu8, 0xff, 0xff];
264        assert_eq!(FieldIndex::build(&buf, 0).map(|_| ()), None);
265
266        // Fixmap claiming 15 pairs but only 1 byte
267        let buf = [0x8fu8];
268        assert_eq!(FieldIndex::build(&buf, 0).map(|_| ()), None);
269    }
270
271    /// Non-map inputs must return None from build.
272    #[test]
273    fn fuzz_non_map_inputs() {
274        let array_buf = encode(&json!([1, 2, 3]));
275        assert!(FieldIndex::build(&array_buf, 0).is_none());
276
277        let int_buf = encode(&json!(99));
278        assert!(FieldIndex::build(&int_buf, 0).is_none());
279
280        assert!(FieldIndex::build(&[], 0).is_none());
281
282        let nil_buf = [0xc0u8];
283        assert!(FieldIndex::build(&nil_buf, 0).is_none());
284    }
285
286    /// Out-of-bounds offset must return None.
287    #[test]
288    fn fuzz_out_of_bounds_offset() {
289        let buf = encode(&json!({"x": 1}));
290        assert!(FieldIndex::build(&buf, buf.len() + 100).is_none());
291    }
292
293    /// Threshold boundary: a 16-field doc should use Flat, 17-field should
294    /// use HashMap. Fuzz both paths with truncation.
295    #[test]
296    fn fuzz_flat_vs_hashmap_threshold_truncation() {
297        // 16 fields — uses Flat path
298        let mut map16 = serde_json::Map::new();
299        for i in 0..16 {
300            map16.insert(format!("f{i}"), json!(i));
301        }
302        let buf16 = encode(&serde_json::Value::Object(map16));
303        let idx16 = FieldIndex::build(&buf16, 0).unwrap();
304        assert!(matches!(idx16.inner, IndexInner::Flat(_)));
305        assert_eq!(idx16.len(), 16);
306
307        // Truncate the 16-field buffer
308        for t in 0..buf16.len() {
309            let _ = FieldIndex::build(&buf16[..t], 0);
310        }
311
312        // 17 fields — uses HashMap path
313        let mut map17 = serde_json::Map::new();
314        for i in 0..17 {
315            map17.insert(format!("g{i}"), json!(i));
316        }
317        let buf17 = encode(&serde_json::Value::Object(map17));
318        let idx17 = FieldIndex::build(&buf17, 0).unwrap();
319        assert!(matches!(idx17.inner, IndexInner::Map(_)));
320        assert_eq!(idx17.len(), 17);
321
322        // Truncate the 17-field buffer
323        for t in 0..buf17.len() {
324            let _ = FieldIndex::build(&buf17[..t], 0);
325        }
326    }
327
328    /// Build a valid index then look up every present and absent key.
329    #[test]
330    fn fuzz_lookup_all_present_and_absent_keys() {
331        let mut map = serde_json::Map::new();
332        for i in 0..20u64 {
333            map.insert(format!("key{i}"), json!(i));
334        }
335        let buf = encode(&serde_json::Value::Object(map));
336        let idx = FieldIndex::build(&buf, 0).unwrap();
337
338        for i in 0..20u64 {
339            let k = format!("key{i}");
340            let (start, _end) = idx.get(&k).unwrap();
341            assert_eq!(read_i64(&buf, start), Some(i as i64));
342        }
343
344        // These keys are absent
345        for absent in &["KEY0", "key20", "key-1", "", "key 0"] {
346            assert!(idx.get(absent).is_none(), "key '{absent}' should be absent");
347        }
348    }
349}