nodedb_query/msgpack_scan/
sidecar.rs1use crate::msgpack_scan::reader::{map_header, skip_value, str_bounds};
17
18const SIDECAR_MAGIC: u32 = 0x4E494458;
20const SIDECAR_MAGIC_LE: [u8; 4] = SIDECAR_MAGIC.to_le_bytes();
21
22const ENTRY_SIZE: usize = 16;
24
25const TRAILER_SIZE: usize = 6;
27
28#[derive(Debug, Clone, Copy)]
30pub struct SidecarEntry {
31 pub field_hash: u64,
32 pub value_offset: u32,
33 pub value_len: u32,
34}
35
36#[inline]
38fn fnv1a_hash(bytes: &[u8]) -> u64 {
39 let mut hash: u64 = 0xcbf29ce484222325;
40 for &b in bytes {
41 hash ^= b as u64;
42 hash = hash.wrapping_mul(0x100000001b3);
43 }
44 hash
45}
46
47pub fn has_sidecar(buf: &[u8]) -> bool {
49 if buf.len() < TRAILER_SIZE {
50 return false;
51 }
52 let magic_start = buf.len() - 4;
53 buf[magic_start..] == SIDECAR_MAGIC_LE
54}
55
56pub fn msgpack_bytes(buf: &[u8]) -> &[u8] {
60 if !has_sidecar(buf) {
61 return buf;
62 }
63 let Some(msgpack_len) = sidecar_msgpack_len(buf) else {
64 return buf;
65 };
66 &buf[..msgpack_len]
67}
68
69fn sidecar_msgpack_len(buf: &[u8]) -> Option<usize> {
71 if buf.len() < TRAILER_SIZE {
72 return None;
73 }
74 let entry_count = read_entry_count(buf)?;
75 let sidecar_size = (entry_count as usize) * ENTRY_SIZE + TRAILER_SIZE;
76 buf.len().checked_sub(sidecar_size)
77}
78
79fn read_entry_count(buf: &[u8]) -> Option<u16> {
81 if buf.len() < TRAILER_SIZE {
82 return None;
83 }
84 let count_start = buf.len() - TRAILER_SIZE;
85 let bytes = buf.get(count_start..count_start + 2)?;
86 Some(u16::from_le_bytes([bytes[0], bytes[1]]))
87}
88
89fn binary_search_entries(
94 buf: &[u8],
95 entries_start: usize,
96 count: usize,
97 target: u64,
98) -> Option<usize> {
99 let mut lo = 0usize;
100 let mut hi = count;
101 while lo < hi {
102 let mid = lo + (hi - lo) / 2;
103 let base = entries_start + mid * ENTRY_SIZE;
104 let hash_bytes: [u8; 8] = buf.get(base..base + 8)?.try_into().ok()?;
105 let hash = u64::from_le_bytes(hash_bytes);
106 match hash.cmp(&target) {
107 std::cmp::Ordering::Less => lo = mid + 1,
108 std::cmp::Ordering::Equal => return Some(mid),
109 std::cmp::Ordering::Greater => hi = mid,
110 }
111 }
112 None
113}
114
115fn verify_entry(msgpack: &[u8], value_offset: usize, value_len: usize, field: &str) -> bool {
119 match crate::msgpack_scan::field::extract_field(msgpack, 0, field) {
120 Some((start, end)) => start == value_offset && (end - start) == value_len,
121 None => false,
122 }
123}
124
125pub fn build_sidecar(msgpack: &[u8]) -> Option<Vec<u8>> {
131 let (count, mut pos) = map_header(msgpack, 0)?;
132 if count > u16::MAX as usize {
133 return None;
134 }
135
136 let mut entries: Vec<SidecarEntry> = Vec::with_capacity(count);
137
138 for _ in 0..count {
139 let (key_data_start, key_data_len) = str_bounds(msgpack, pos)?;
141 let key_bytes = msgpack.get(key_data_start..key_data_start + key_data_len)?;
142 let field_hash = fnv1a_hash(key_bytes);
143
144 pos = skip_value(msgpack, pos)?;
146 let value_start = pos;
147 let value_end = skip_value(msgpack, pos)?;
148
149 let value_len = value_end.checked_sub(value_start)?;
150 if value_start > u32::MAX as usize || value_len > u32::MAX as usize {
151 return None;
152 }
153
154 entries.push(SidecarEntry {
155 field_hash,
156 value_offset: value_start as u32,
157 value_len: value_len as u32,
158 });
159
160 pos = value_end;
161 }
162
163 entries.sort_unstable_by_key(|e| e.field_hash);
165
166 let entry_count = entries.len();
167 let total_len = msgpack.len() + entry_count * ENTRY_SIZE + TRAILER_SIZE;
168 let mut out = Vec::with_capacity(total_len);
169 out.extend_from_slice(msgpack);
170
171 for e in &entries {
172 out.extend_from_slice(&e.field_hash.to_le_bytes());
173 out.extend_from_slice(&e.value_offset.to_le_bytes());
174 out.extend_from_slice(&e.value_len.to_le_bytes());
175 }
176
177 out.extend_from_slice(&(entry_count as u16).to_le_bytes());
179 out.extend_from_slice(&SIDECAR_MAGIC_LE);
180
181 Some(out)
182}
183
184pub fn sidecar_lookup(buf: &[u8], field: &str) -> Option<(usize, usize)> {
191 if !has_sidecar(buf) {
192 return None;
193 }
194 let entry_count = read_entry_count(buf)? as usize;
195 if entry_count == 0 {
196 return None;
197 }
198
199 let msgpack_len = sidecar_msgpack_len(buf)?;
200 let target_hash = fnv1a_hash(field.as_bytes());
201 let entries_start = msgpack_len;
202 let msgpack = &buf[..msgpack_len];
203
204 let mid = binary_search_entries(buf, entries_start, entry_count, target_hash)?;
205
206 let first = {
208 let mut i = mid;
209 while i > 0 {
210 let base = entries_start + (i - 1) * ENTRY_SIZE;
211 let hash_bytes: [u8; 8] = buf.get(base..base + 8)?.try_into().ok()?;
212 if u64::from_le_bytes(hash_bytes) != target_hash {
213 break;
214 }
215 i -= 1;
216 }
217 i
218 };
219
220 let last = {
222 let mut i = mid + 1;
223 while i < entry_count {
224 let base = entries_start + i * ENTRY_SIZE;
225 let hash_bytes: [u8; 8] = buf.get(base..base + 8)?.try_into().ok()?;
226 if u64::from_le_bytes(hash_bytes) != target_hash {
227 break;
228 }
229 i += 1;
230 }
231 i
232 };
233
234 for i in first..last {
236 let base = entries_start + i * ENTRY_SIZE;
237 let offset_bytes: [u8; 4] = buf.get(base + 8..base + 12)?.try_into().ok()?;
238 let len_bytes: [u8; 4] = buf.get(base + 12..base + 16)?.try_into().ok()?;
239 let value_offset = u32::from_le_bytes(offset_bytes) as usize;
240 let value_len = u32::from_le_bytes(len_bytes) as usize;
241 let value_end = value_offset.checked_add(value_len)?;
242 if value_end > msgpack_len {
243 return None;
244 }
245 if verify_entry(msgpack, value_offset, value_len, field) {
246 return Some((value_offset, value_end));
247 }
248 }
249 None
250}
251
252pub struct SidecarFieldIndex<'a> {
258 entries: Vec<SidecarEntry>,
259 msgpack: &'a [u8],
261}
262
263impl<'a> SidecarFieldIndex<'a> {
264 pub fn get(&self, field: &str) -> Option<(usize, usize)> {
269 let hash = fnv1a_hash(field.as_bytes());
270 let count = self.entries.len();
271 if count == 0 {
272 return None;
273 }
274
275 let mid = self.entries.partition_point(|e| e.field_hash < hash);
277 if mid >= count || self.entries[mid].field_hash != hash {
278 return None;
279 }
280
281 let first = {
283 let mut i = mid;
284 while i > 0 && self.entries[i - 1].field_hash == hash {
285 i -= 1;
286 }
287 i
288 };
289
290 let last = {
292 let mut i = mid + 1;
293 while i < count && self.entries[i].field_hash == hash {
294 i += 1;
295 }
296 i
297 };
298
299 for i in first..last {
301 let e = &self.entries[i];
302 let value_offset = e.value_offset as usize;
303 let value_len = e.value_len as usize;
304 if verify_entry(self.msgpack, value_offset, value_len, field) {
305 let value_end = value_offset + value_len;
306 return Some((value_offset, value_end));
307 }
308 }
309 None
310 }
311
312 pub fn len(&self) -> usize {
314 self.entries.len()
315 }
316
317 pub fn is_empty(&self) -> bool {
319 self.entries.is_empty()
320 }
321
322 pub fn msgpack_len(&self) -> usize {
324 self.msgpack.len()
325 }
326
327 pub fn msgpack_bytes(&self) -> &[u8] {
329 self.msgpack
330 }
331}
332
333pub fn field_index_from_sidecar(buf: &[u8]) -> Option<SidecarFieldIndex<'_>> {
337 if !has_sidecar(buf) {
338 return None;
339 }
340 let entry_count = read_entry_count(buf)? as usize;
341 let msgpack_len = sidecar_msgpack_len(buf)?;
342 let entries_start = msgpack_len;
343
344 let mut entries = Vec::with_capacity(entry_count);
345 for i in 0..entry_count {
346 let base = entries_start + i * ENTRY_SIZE;
347 let hash_bytes: [u8; 8] = buf.get(base..base + 8)?.try_into().ok()?;
348 let offset_bytes: [u8; 4] = buf.get(base + 8..base + 12)?.try_into().ok()?;
349 let len_bytes: [u8; 4] = buf.get(base + 12..base + 16)?.try_into().ok()?;
350 entries.push(SidecarEntry {
351 field_hash: u64::from_le_bytes(hash_bytes),
352 value_offset: u32::from_le_bytes(offset_bytes),
353 value_len: u32::from_le_bytes(len_bytes),
354 });
355 }
356
357 entries.sort_unstable_by_key(|e| e.field_hash);
360
361 let msgpack = &buf[..msgpack_len];
362 Some(SidecarFieldIndex { entries, msgpack })
363}
364
365#[cfg(test)]
368mod tests {
369 use super::*;
370 use crate::msgpack_scan::reader::{read_f64, read_i64, read_str};
371 use serde_json::json;
372
373 fn encode(v: &serde_json::Value) -> Vec<u8> {
374 nodedb_types::json_msgpack::json_to_msgpack(v).expect("encode")
375 }
376
377 #[test]
380 fn roundtrip_simple() {
381 let raw = encode(&json!({"name": "alice", "age": 30, "score": 99.5}));
382 let indexed = build_sidecar(&raw).expect("build_sidecar");
383
384 assert!(has_sidecar(&indexed));
385 assert_eq!(msgpack_bytes(&indexed), raw.as_slice());
386
387 let (s, _) = sidecar_lookup(&indexed, "name").expect("name");
388 assert_eq!(read_str(&indexed, s), Some("alice"));
389
390 let (s, _) = sidecar_lookup(&indexed, "age").expect("age");
391 assert_eq!(read_i64(&indexed, s), Some(30));
392
393 let (s, _) = sidecar_lookup(&indexed, "score").expect("score");
394 assert_eq!(read_f64(&indexed, s), Some(99.5));
395 }
396
397 #[test]
398 fn missing_field_returns_none() {
399 let raw = encode(&json!({"x": 1}));
400 let indexed = build_sidecar(&raw).unwrap();
401 assert!(sidecar_lookup(&indexed, "y").is_none());
402 }
403
404 #[test]
407 fn empty_map_has_sidecar() {
408 let raw = encode(&json!({}));
409 let indexed = build_sidecar(&raw).expect("empty map");
410 assert!(has_sidecar(&indexed));
411 assert_eq!(msgpack_bytes(&indexed), raw.as_slice());
412 assert!(sidecar_lookup(&indexed, "anything").is_none());
413 }
414
415 #[test]
418 fn raw_msgpack_has_no_sidecar() {
419 let raw = encode(&json!({"k": "v"}));
420 assert!(!has_sidecar(&raw));
421 assert_eq!(msgpack_bytes(&raw), raw.as_slice());
422 assert!(sidecar_lookup(&raw, "k").is_none());
423 }
424
425 #[test]
428 fn field_index_from_sidecar_basic() {
429 let raw = encode(&json!({"a": 1, "b": "hello"}));
430 let indexed = build_sidecar(&raw).unwrap();
431 let idx = field_index_from_sidecar(&indexed).expect("parse sidecar");
432
433 assert_eq!(idx.len(), 2);
434 assert!(!idx.is_empty());
435 assert_eq!(idx.msgpack_len(), raw.len());
436
437 let (s, _) = idx.get("a").expect("a");
438 assert_eq!(read_i64(&indexed, s), Some(1));
439
440 let (s, _) = idx.get("b").expect("b");
441 assert_eq!(read_str(&indexed, s), Some("hello"));
442
443 assert!(idx.get("missing").is_none());
444 }
445
446 #[test]
447 fn field_index_from_sidecar_none_on_raw() {
448 let raw = encode(&json!({"x": 42}));
449 assert!(field_index_from_sidecar(&raw).is_none());
450 }
451
452 #[test]
455 fn sidecar_matches_field_index() {
456 use crate::msgpack_scan::index::FieldIndex;
457
458 let raw = encode(&json!({"alpha": 100, "beta": "foo", "gamma": 3.125}));
459 let indexed = build_sidecar(&raw).unwrap();
460 let fi = FieldIndex::build(&raw, 0).unwrap();
461
462 for field in &["alpha", "beta", "gamma", "missing"] {
463 let sidecar_result = sidecar_lookup(&indexed, field);
464 let fi_result = fi.get(field);
465 assert_eq!(sidecar_result, fi_result, "mismatch for field '{field}'");
466 }
467 }
468
469 #[test]
472 fn empty_slice_has_no_sidecar() {
473 assert!(!has_sidecar(&[]));
474 assert_eq!(msgpack_bytes(&[]), &[] as &[u8]);
475 }
476
477 #[test]
478 fn short_slice_has_no_sidecar() {
479 assert!(!has_sidecar(&[0x01, 0x02, 0x03]));
480 }
481
482 #[test]
483 fn sidecar_is_transparent_to_msgpack_bytes() {
484 let raw = encode(&json!({"z": 99}));
486 assert_eq!(msgpack_bytes(&raw), raw.as_slice());
487
488 let indexed = build_sidecar(&raw).unwrap();
490 assert_eq!(msgpack_bytes(&indexed), raw.as_slice());
491 }
492
493 #[test]
494 fn corrupted_magic_detected_as_no_sidecar() {
495 let raw = encode(&json!({"x": 1}));
496 let mut indexed = build_sidecar(&raw).unwrap();
497 *indexed.last_mut().unwrap() ^= 0xff;
499 assert!(!has_sidecar(&indexed));
500 }
501
502 #[test]
503 fn many_fields_roundtrip() {
504 let mut map = serde_json::Map::new();
505 for i in 0u64..50 {
506 map.insert(format!("field_{i}"), json!(i * 10));
507 }
508 let raw = encode(&serde_json::Value::Object(map));
509 let indexed = build_sidecar(&raw).unwrap();
510
511 assert!(has_sidecar(&indexed));
512 assert_eq!(msgpack_bytes(&indexed), raw.as_slice());
513
514 for i in 0u64..50 {
515 let key = format!("field_{i}");
516 let (s, _) =
517 sidecar_lookup(&indexed, &key).unwrap_or_else(|| panic!("missing key {key}"));
518 assert_eq!(read_i64(&indexed, s), Some((i * 10) as i64));
519 }
520 }
521
522 #[test]
525 fn build_is_deterministic() {
526 let raw = encode(&json!({"p": 1, "q": 2, "r": 3}));
527 let a = build_sidecar(&raw).unwrap();
528 let b = build_sidecar(&raw).unwrap();
529 assert_eq!(a, b);
530 }
531
532 fn test_fnv1a(s: &str) -> u64 {
537 let mut hash: u64 = 0xcbf29ce484222325;
538 for &b in s.as_bytes() {
539 hash ^= b as u64;
540 hash = hash.wrapping_mul(0x100000001b3);
541 }
542 hash
543 }
544
545 #[test]
553 fn hash_collision_resolved_correctly() {
554 use crate::msgpack_scan::field::extract_field;
555
556 let raw = encode(&json!({"field_a": 111, "field_b": 222}));
558
559 let (a_start, a_end) = extract_field(&raw, 0, "field_a").expect("field_a range");
561 let (b_start, b_end) = extract_field(&raw, 0, "field_b").expect("field_b range");
562
563 let hash_a = test_fnv1a("field_a");
567 let hash_b = test_fnv1a("field_b");
568
569 let collision_hash = hash_a;
571
572 let mut buf = raw.clone();
576
577 buf.extend_from_slice(&collision_hash.to_le_bytes());
579 buf.extend_from_slice(&(a_start as u32).to_le_bytes());
580 buf.extend_from_slice(&((a_end - a_start) as u32).to_le_bytes());
581
582 buf.extend_from_slice(&collision_hash.to_le_bytes());
584 buf.extend_from_slice(&(b_start as u32).to_le_bytes());
585 buf.extend_from_slice(&((b_end - b_start) as u32).to_le_bytes());
586
587 buf.extend_from_slice(&2u16.to_le_bytes());
589 buf.extend_from_slice(&SIDECAR_MAGIC_LE);
590
591 assert!(has_sidecar(&buf), "constructed buffer must have sidecar");
592
593 let (la_start, la_end) = sidecar_lookup(&buf, "field_a").expect("field_a must be found");
596 assert_eq!(
597 (la_start, la_end),
598 (a_start, a_end),
599 "field_a range mismatch"
600 );
601
602 assert_ne!(
610 sidecar_lookup(&buf, "field_a"),
611 Some((b_start, b_end)),
612 "field_a lookup must not return field_b's range"
613 );
614
615 let mut buf2 = raw.clone();
618 buf2.extend_from_slice(&hash_b.to_le_bytes());
620 buf2.extend_from_slice(&(a_start as u32).to_le_bytes());
621 buf2.extend_from_slice(&((a_end - a_start) as u32).to_le_bytes());
622 buf2.extend_from_slice(&hash_b.to_le_bytes());
624 buf2.extend_from_slice(&(b_start as u32).to_le_bytes());
625 buf2.extend_from_slice(&((b_end - b_start) as u32).to_le_bytes());
626 buf2.extend_from_slice(&2u16.to_le_bytes());
627 buf2.extend_from_slice(&SIDECAR_MAGIC_LE);
628
629 assert!(has_sidecar(&buf2));
630
631 let (lb_start, lb_end) = sidecar_lookup(&buf2, "field_b")
635 .expect("field_b must be found despite collision at its hash");
636 assert_eq!(
637 (lb_start, lb_end),
638 (b_start, b_end),
639 "field_b range mismatch"
640 );
641 assert_ne!(
642 Some((lb_start, lb_end)),
643 Some((a_start, a_end)),
644 "must not return field_a's range for field_b"
645 );
646
647 let idx = field_index_from_sidecar(&buf2).expect("parse collision sidecar");
649 let (ib_start, ib_end) = idx.get("field_b").expect("idx field_b");
650 assert_eq!((ib_start, ib_end), (b_start, b_end));
651 assert_ne!((ib_start, ib_end), (a_start, a_end));
652 }
653}