nodedb_query/msgpack_scan/
sidecar.rs1use crate::msgpack_scan::reader::{map_header, skip_value, str_bounds};
19
/// Magic number marking the end of a buffer that carries a sidecar.
/// Its bytes 0x4E 0x49 0x44 0x58 are the ASCII characters "NIDX".
const SIDECAR_MAGIC: u32 = 0x4E494458;
/// Little-endian byte encoding of `SIDECAR_MAGIC`; this is the form written
/// as the last four bytes of the buffer and compared against on detection.
const SIDECAR_MAGIC_LE: [u8; 4] = SIDECAR_MAGIC.to_le_bytes();

/// Serialized size of one sidecar entry in bytes:
/// 8-byte field hash + 4-byte value offset + 4-byte value length.
const ENTRY_SIZE: usize = 16;

/// Serialized size of the trailer in bytes: 2-byte entry count + 4-byte magic.
const TRAILER_SIZE: usize = 6;
29
/// One decoded sidecar entry describing where a field's value lives inside
/// the MessagePack payload.
#[derive(Debug, Clone, Copy)]
pub struct SidecarEntry {
    /// FNV-1a hash of the field's key bytes.
    pub field_hash: u64,
    /// Byte offset of the encoded value, measured from the start of the
    /// MessagePack payload.
    pub value_offset: u32,
    /// Length in bytes of the encoded value.
    pub value_len: u32,
}
37
/// Computes the 64-bit FNV-1a hash of `bytes`.
///
/// Starting from the offset basis, each input byte is XOR-ed into the
/// accumulator, which is then multiplied (wrapping) by the FNV prime.
/// An empty input yields the offset basis itself.
#[inline]
fn fnv1a_hash(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0100_0000_01b3;

    bytes
        .iter()
        .fold(OFFSET_BASIS, |acc, &byte| (acc ^ u64::from(byte)).wrapping_mul(PRIME))
}
48
49pub fn has_sidecar(buf: &[u8]) -> bool {
51 if buf.len() < TRAILER_SIZE {
52 return false;
53 }
54 let magic_start = buf.len() - 4;
55 buf[magic_start..] == SIDECAR_MAGIC_LE
56}
57
58pub fn msgpack_bytes(buf: &[u8]) -> &[u8] {
62 if !has_sidecar(buf) {
63 return buf;
64 }
65 let Some(msgpack_len) = sidecar_msgpack_len(buf) else {
66 return buf;
67 };
68 &buf[..msgpack_len]
69}
70
71fn sidecar_msgpack_len(buf: &[u8]) -> Option<usize> {
73 if buf.len() < TRAILER_SIZE {
74 return None;
75 }
76 let entry_count = read_entry_count(buf)?;
77 let sidecar_size = (entry_count as usize) * ENTRY_SIZE + TRAILER_SIZE;
78 buf.len().checked_sub(sidecar_size)
79}
80
81fn read_entry_count(buf: &[u8]) -> Option<u16> {
83 if buf.len() < TRAILER_SIZE {
84 return None;
85 }
86 let count_start = buf.len() - TRAILER_SIZE;
87 let bytes = buf.get(count_start..count_start + 2)?;
88 Some(u16::from_le_bytes([bytes[0], bytes[1]]))
89}
90
91fn binary_search_entries(
96 buf: &[u8],
97 entries_start: usize,
98 count: usize,
99 target: u64,
100) -> Option<usize> {
101 let mut lo = 0usize;
102 let mut hi = count;
103 while lo < hi {
104 let mid = lo + (hi - lo) / 2;
105 let base = entries_start + mid * ENTRY_SIZE;
106 let hash_bytes: [u8; 8] = buf.get(base..base + 8)?.try_into().ok()?;
107 let hash = u64::from_le_bytes(hash_bytes);
108 match hash.cmp(&target) {
109 std::cmp::Ordering::Less => lo = mid + 1,
110 std::cmp::Ordering::Equal => return Some(mid),
111 std::cmp::Ordering::Greater => hi = mid,
112 }
113 }
114 None
115}
116
117fn verify_entry(msgpack: &[u8], value_offset: usize, value_len: usize, field: &str) -> bool {
121 match crate::msgpack_scan::field::extract_field(msgpack, 0, field) {
122 Some((start, end)) => start == value_offset && (end - start) == value_len,
123 None => false,
124 }
125}
126
127pub fn build_sidecar(msgpack: &[u8]) -> Option<Vec<u8>> {
133 let (count, mut pos) = map_header(msgpack, 0)?;
134 if count > u16::MAX as usize {
135 return None;
136 }
137
138 let mut entries: Vec<SidecarEntry> = Vec::with_capacity(count);
139
140 for _ in 0..count {
141 let (key_data_start, key_data_len) = str_bounds(msgpack, pos)?;
143 let key_bytes = msgpack.get(key_data_start..key_data_start + key_data_len)?;
144 let field_hash = fnv1a_hash(key_bytes);
145
146 pos = skip_value(msgpack, pos)?;
148 let value_start = pos;
149 let value_end = skip_value(msgpack, pos)?;
150
151 let value_len = value_end.checked_sub(value_start)?;
152 if value_start > u32::MAX as usize || value_len > u32::MAX as usize {
153 return None;
154 }
155
156 entries.push(SidecarEntry {
157 field_hash,
158 value_offset: value_start as u32,
159 value_len: value_len as u32,
160 });
161
162 pos = value_end;
163 }
164
165 entries.sort_unstable_by_key(|e| e.field_hash);
167
168 let entry_count = entries.len();
169 let total_len = msgpack.len() + entry_count * ENTRY_SIZE + TRAILER_SIZE;
170 let mut out = Vec::with_capacity(total_len);
171 out.extend_from_slice(msgpack);
172
173 for e in &entries {
174 out.extend_from_slice(&e.field_hash.to_le_bytes());
175 out.extend_from_slice(&e.value_offset.to_le_bytes());
176 out.extend_from_slice(&e.value_len.to_le_bytes());
177 }
178
179 out.extend_from_slice(&(entry_count as u16).to_le_bytes());
181 out.extend_from_slice(&SIDECAR_MAGIC_LE);
182
183 Some(out)
184}
185
186pub fn sidecar_lookup(buf: &[u8], field: &str) -> Option<(usize, usize)> {
193 if !has_sidecar(buf) {
194 return None;
195 }
196 let entry_count = read_entry_count(buf)? as usize;
197 if entry_count == 0 {
198 return None;
199 }
200
201 let msgpack_len = sidecar_msgpack_len(buf)?;
202 let target_hash = fnv1a_hash(field.as_bytes());
203 let entries_start = msgpack_len;
204 let msgpack = &buf[..msgpack_len];
205
206 let mid = binary_search_entries(buf, entries_start, entry_count, target_hash)?;
207
208 let first = {
210 let mut i = mid;
211 while i > 0 {
212 let base = entries_start + (i - 1) * ENTRY_SIZE;
213 let hash_bytes: [u8; 8] = buf.get(base..base + 8)?.try_into().ok()?;
214 if u64::from_le_bytes(hash_bytes) != target_hash {
215 break;
216 }
217 i -= 1;
218 }
219 i
220 };
221
222 let last = {
224 let mut i = mid + 1;
225 while i < entry_count {
226 let base = entries_start + i * ENTRY_SIZE;
227 let hash_bytes: [u8; 8] = buf.get(base..base + 8)?.try_into().ok()?;
228 if u64::from_le_bytes(hash_bytes) != target_hash {
229 break;
230 }
231 i += 1;
232 }
233 i
234 };
235
236 for i in first..last {
238 let base = entries_start + i * ENTRY_SIZE;
239 let offset_bytes: [u8; 4] = buf.get(base + 8..base + 12)?.try_into().ok()?;
240 let len_bytes: [u8; 4] = buf.get(base + 12..base + 16)?.try_into().ok()?;
241 let value_offset = u32::from_le_bytes(offset_bytes) as usize;
242 let value_len = u32::from_le_bytes(len_bytes) as usize;
243 let value_end = value_offset.checked_add(value_len)?;
244 if value_end > msgpack_len {
245 return None;
246 }
247 if verify_entry(msgpack, value_offset, value_len, field) {
248 return Some((value_offset, value_end));
249 }
250 }
251 None
252}
253
/// Decoded, in-memory form of a sidecar, borrowing the MessagePack payload it
/// indexes. Built by `field_index_from_sidecar`.
pub struct SidecarFieldIndex<'a> {
    /// Decoded entries; `field_index_from_sidecar` sorts them by `field_hash`
    /// so lookups can binary-search.
    entries: Vec<SidecarEntry>,
    /// The MessagePack payload (the buffer with the sidecar stripped).
    msgpack: &'a [u8],
}
264
265impl<'a> SidecarFieldIndex<'a> {
266 pub fn get(&self, field: &str) -> Option<(usize, usize)> {
271 let hash = fnv1a_hash(field.as_bytes());
272 let count = self.entries.len();
273 if count == 0 {
274 return None;
275 }
276
277 let mid = self.entries.partition_point(|e| e.field_hash < hash);
279 if mid >= count || self.entries[mid].field_hash != hash {
280 return None;
281 }
282
283 let first = {
285 let mut i = mid;
286 while i > 0 && self.entries[i - 1].field_hash == hash {
287 i -= 1;
288 }
289 i
290 };
291
292 let last = {
294 let mut i = mid + 1;
295 while i < count && self.entries[i].field_hash == hash {
296 i += 1;
297 }
298 i
299 };
300
301 for i in first..last {
303 let e = &self.entries[i];
304 let value_offset = e.value_offset as usize;
305 let value_len = e.value_len as usize;
306 if verify_entry(self.msgpack, value_offset, value_len, field) {
307 let value_end = value_offset + value_len;
308 return Some((value_offset, value_end));
309 }
310 }
311 None
312 }
313
314 pub fn len(&self) -> usize {
316 self.entries.len()
317 }
318
319 pub fn is_empty(&self) -> bool {
321 self.entries.is_empty()
322 }
323
324 pub fn msgpack_len(&self) -> usize {
326 self.msgpack.len()
327 }
328
329 pub fn msgpack_bytes(&self) -> &[u8] {
331 self.msgpack
332 }
333}
334
335pub fn field_index_from_sidecar(buf: &[u8]) -> Option<SidecarFieldIndex<'_>> {
339 if !has_sidecar(buf) {
340 return None;
341 }
342 let entry_count = read_entry_count(buf)? as usize;
343 let msgpack_len = sidecar_msgpack_len(buf)?;
344 let entries_start = msgpack_len;
345
346 let mut entries = Vec::with_capacity(entry_count);
347 for i in 0..entry_count {
348 let base = entries_start + i * ENTRY_SIZE;
349 let hash_bytes: [u8; 8] = buf.get(base..base + 8)?.try_into().ok()?;
350 let offset_bytes: [u8; 4] = buf.get(base + 8..base + 12)?.try_into().ok()?;
351 let len_bytes: [u8; 4] = buf.get(base + 12..base + 16)?.try_into().ok()?;
352 entries.push(SidecarEntry {
353 field_hash: u64::from_le_bytes(hash_bytes),
354 value_offset: u32::from_le_bytes(offset_bytes),
355 value_len: u32::from_le_bytes(len_bytes),
356 });
357 }
358
359 entries.sort_unstable_by_key(|e| e.field_hash);
362
363 let msgpack = &buf[..msgpack_len];
364 Some(SidecarFieldIndex { entries, msgpack })
365}
366
// Unit tests for sidecar construction, detection, and lookup.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::msgpack_scan::reader::{read_f64, read_i64, read_str};
    use serde_json::json;

    /// Encodes a JSON value as MessagePack bytes, panicking on failure.
    fn encode(v: &serde_json::Value) -> Vec<u8> {
        nodedb_types::json_msgpack::json_to_msgpack(v).expect("encode")
    }

    /// A built sidecar is detected, strips back to the original bytes, and
    /// resolves string, integer, and float fields to readable values.
    #[test]
    fn roundtrip_simple() {
        let raw = encode(&json!({"name": "alice", "age": 30, "score": 99.5}));
        let indexed = build_sidecar(&raw).expect("build_sidecar");

        assert!(has_sidecar(&indexed));
        assert_eq!(msgpack_bytes(&indexed), raw.as_slice());

        let (s, _) = sidecar_lookup(&indexed, "name").expect("name");
        assert_eq!(read_str(&indexed, s), Some("alice"));

        let (s, _) = sidecar_lookup(&indexed, "age").expect("age");
        assert_eq!(read_i64(&indexed, s), Some(30));

        let (s, _) = sidecar_lookup(&indexed, "score").expect("score");
        assert_eq!(read_f64(&indexed, s), Some(99.5));
    }

    /// Looking up a key that is not in the map yields `None`.
    #[test]
    fn missing_field_returns_none() {
        let raw = encode(&json!({"x": 1}));
        let indexed = build_sidecar(&raw).unwrap();
        assert!(sidecar_lookup(&indexed, "y").is_none());
    }

    /// An empty map still gets a (zero-entry) sidecar; lookups find nothing.
    #[test]
    fn empty_map_has_sidecar() {
        let raw = encode(&json!({}));
        let indexed = build_sidecar(&raw).expect("empty map");
        assert!(has_sidecar(&indexed));
        assert_eq!(msgpack_bytes(&indexed), raw.as_slice());
        assert!(sidecar_lookup(&indexed, "anything").is_none());
    }

    /// Plain MessagePack without a sidecar is passed through untouched and
    /// sidecar lookups on it return `None`.
    #[test]
    fn raw_msgpack_has_no_sidecar() {
        let raw = encode(&json!({"k": "v"}));
        assert!(!has_sidecar(&raw));
        assert_eq!(msgpack_bytes(&raw), raw.as_slice());
        assert!(sidecar_lookup(&raw, "k").is_none());
    }

    /// The decoded index reports the right sizes and resolves fields.
    #[test]
    fn field_index_from_sidecar_basic() {
        let raw = encode(&json!({"a": 1, "b": "hello"}));
        let indexed = build_sidecar(&raw).unwrap();
        let idx = field_index_from_sidecar(&indexed).expect("parse sidecar");

        assert_eq!(idx.len(), 2);
        assert!(!idx.is_empty());
        assert_eq!(idx.msgpack_len(), raw.len());

        let (s, _) = idx.get("a").expect("a");
        assert_eq!(read_i64(&indexed, s), Some(1));

        let (s, _) = idx.get("b").expect("b");
        assert_eq!(read_str(&indexed, s), Some("hello"));

        assert!(idx.get("missing").is_none());
    }

    /// Decoding an index from a buffer without a sidecar yields `None`.
    #[test]
    fn field_index_from_sidecar_none_on_raw() {
        let raw = encode(&json!({"x": 42}));
        assert!(field_index_from_sidecar(&raw).is_none());
    }

    /// Sidecar lookups agree with `FieldIndex` built over the raw bytes,
    /// for present and absent fields alike.
    #[test]
    fn sidecar_matches_field_index() {
        use crate::msgpack_scan::index::FieldIndex;

        let raw = encode(&json!({"alpha": 100, "beta": "foo", "gamma": 3.125}));
        let indexed = build_sidecar(&raw).unwrap();
        let fi = FieldIndex::build(&raw, 0).unwrap();

        for field in &["alpha", "beta", "gamma", "missing"] {
            let sidecar_result = sidecar_lookup(&indexed, field);
            let fi_result = fi.get(field);
            assert_eq!(sidecar_result, fi_result, "mismatch for field '{field}'");
        }
    }

    /// An empty slice has no sidecar and is returned as-is.
    #[test]
    fn empty_slice_has_no_sidecar() {
        assert!(!has_sidecar(&[]));
        assert_eq!(msgpack_bytes(&[]), &[] as &[u8]);
    }

    /// A slice shorter than the trailer cannot carry a sidecar.
    #[test]
    fn short_slice_has_no_sidecar() {
        assert!(!has_sidecar(&[0x01, 0x02, 0x03]));
    }

    /// `msgpack_bytes` returns the same payload with or without a sidecar.
    #[test]
    fn sidecar_is_transparent_to_msgpack_bytes() {
        let raw = encode(&json!({"z": 99}));
        assert_eq!(msgpack_bytes(&raw), raw.as_slice());

        let indexed = build_sidecar(&raw).unwrap();
        assert_eq!(msgpack_bytes(&indexed), raw.as_slice());
    }

    /// Flipping the bits of the final magic byte disables sidecar detection.
    #[test]
    fn corrupted_magic_detected_as_no_sidecar() {
        let raw = encode(&json!({"x": 1}));
        let mut indexed = build_sidecar(&raw).unwrap();
        *indexed.last_mut().unwrap() ^= 0xff;
        assert!(!has_sidecar(&indexed));
    }

    /// All 50 fields of a larger map resolve to their values.
    #[test]
    fn many_fields_roundtrip() {
        let mut map = serde_json::Map::new();
        for i in 0u64..50 {
            map.insert(format!("field_{i}"), json!(i * 10));
        }
        let raw = encode(&serde_json::Value::Object(map));
        let indexed = build_sidecar(&raw).unwrap();

        assert!(has_sidecar(&indexed));
        assert_eq!(msgpack_bytes(&indexed), raw.as_slice());

        for i in 0u64..50 {
            let key = format!("field_{i}");
            let (s, _) =
                sidecar_lookup(&indexed, &key).unwrap_or_else(|| panic!("missing key {key}"));
            assert_eq!(read_i64(&indexed, s), Some((i * 10) as i64));
        }
    }

    /// Building twice from the same input produces identical bytes.
    #[test]
    fn build_is_deterministic() {
        let raw = encode(&json!({"p": 1, "q": 2, "r": 3}));
        let a = build_sidecar(&raw).unwrap();
        let b = build_sidecar(&raw).unwrap();
        assert_eq!(a, b);
    }

    /// Test-local FNV-1a over a string's bytes, using the same offset basis
    /// and prime as `fnv1a_hash`.
    fn test_fnv1a(s: &str) -> u64 {
        let mut hash: u64 = 0xcbf29ce484222325;
        for &b in s.as_bytes() {
            hash ^= b as u64;
            hash = hash.wrapping_mul(0x100000001b3);
        }
        hash
    }

    /// Hand-builds sidecars in which two entries share one hash and checks
    /// that lookups return only the range that really belongs to the field.
    #[test]
    fn hash_collision_resolved_correctly() {
        use crate::msgpack_scan::field::extract_field;

        let raw = encode(&json!({"field_a": 111, "field_b": 222}));

        // Ground-truth value ranges, taken straight from the payload.
        let (a_start, a_end) = extract_field(&raw, 0, "field_a").expect("field_a range");
        let (b_start, b_end) = extract_field(&raw, 0, "field_b").expect("field_b range");

        let hash_a = test_fnv1a("field_a");
        let hash_b = test_fnv1a("field_b");

        // Simulated collision: both entries are filed under field_a's hash.
        let collision_hash = hash_a;

        let mut buf = raw.clone();

        // Entry 1: colliding hash -> field_a's range.
        buf.extend_from_slice(&collision_hash.to_le_bytes());
        buf.extend_from_slice(&(a_start as u32).to_le_bytes());
        buf.extend_from_slice(&((a_end - a_start) as u32).to_le_bytes());

        // Entry 2: same hash -> field_b's range.
        buf.extend_from_slice(&collision_hash.to_le_bytes());
        buf.extend_from_slice(&(b_start as u32).to_le_bytes());
        buf.extend_from_slice(&((b_end - b_start) as u32).to_le_bytes());

        // Trailer: entry count followed by the magic.
        buf.extend_from_slice(&2u16.to_le_bytes());
        buf.extend_from_slice(&SIDECAR_MAGIC_LE);

        assert!(has_sidecar(&buf), "constructed buffer must have sidecar");

        let (la_start, la_end) = sidecar_lookup(&buf, "field_a").expect("field_a must be found");
        assert_eq!(
            (la_start, la_end),
            (a_start, a_end),
            "field_a range mismatch"
        );

        assert_ne!(
            sidecar_lookup(&buf, "field_a"),
            Some((b_start, b_end)),
            "field_a lookup must not return field_b's range"
        );

        // Second buffer: both entries are filed under field_b's hash, with
        // the entry pointing at field_a's range placed first.
        let mut buf2 = raw.clone();
        buf2.extend_from_slice(&hash_b.to_le_bytes());
        buf2.extend_from_slice(&(a_start as u32).to_le_bytes());
        buf2.extend_from_slice(&((a_end - a_start) as u32).to_le_bytes());
        buf2.extend_from_slice(&hash_b.to_le_bytes());
        buf2.extend_from_slice(&(b_start as u32).to_le_bytes());
        buf2.extend_from_slice(&((b_end - b_start) as u32).to_le_bytes());
        buf2.extend_from_slice(&2u16.to_le_bytes());
        buf2.extend_from_slice(&SIDECAR_MAGIC_LE);

        assert!(has_sidecar(&buf2));

        let (lb_start, lb_end) = sidecar_lookup(&buf2, "field_b")
            .expect("field_b must be found despite collision at its hash");
        assert_eq!(
            (lb_start, lb_end),
            (b_start, b_end),
            "field_b range mismatch"
        );
        assert_ne!(
            Some((lb_start, lb_end)),
            Some((a_start, a_end)),
            "must not return field_a's range for field_b"
        );

        // The decoded index must resolve the same collision the same way.
        let idx = field_index_from_sidecar(&buf2).expect("parse collision sidecar");
        let (ib_start, ib_end) = idx.get("field_b").expect("idx field_b");
        assert_eq!((ib_start, ib_end), (b_start, b_end));
        assert_ne!((ib_start, ib_end), (a_start, a_end));
    }
}