Skip to main content

nxs/
query.rs

1//! Zero-allocation query engine for .nxb files.
2//!
3//! # Usage
4//!
5//! ```no_run
6//! use nxs::query::{Reader, And, eq, gt};
7//!
8//! let data = std::fs::read("data.nxb").unwrap();
9//! let reader = Reader::new(&data).unwrap();
10//!
11//! for record in reader.where_pred(And(eq("active", true), gt("score", 80.0f64))) {
12//!     println!("{:?}", record.get_str("username"));
13//! }
14//! ```
15
16use crate::column_prefetch::ColumnWarmState;
17use crate::error::{NxsError, Result};
18use crate::layout::{
19    col_var_parts, column_sector_len, is_var_sigil, null_bitmap_bytes, var_str_at,
20};
21use crate::prefetch::PrefetchEngine;
22
23pub use crate::prefetch::{AccessHint, CacheStats, OpenOptions};
24
25// ── Format constants ──────────────────────────────────────────────────────────
26use crate::consts::{
27    FLAG_COLUMNAR, FLAG_PAX, FLAG_SCHEMA_EMBEDDED, MAGIC_FILE, MAGIC_FOOTER, MAGIC_OBJ,
28};
29
/// Data-sector layout (OLAP.md).
///
/// [`Reader::new`] picks the variant from the header flags: `FLAG_COLUMNAR`
/// selects [`Layout::Columnar`], `FLAG_PAX` selects [`Layout::Pax`], and
/// [`Layout::Row`] is used when neither bit is set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Layout {
    /// Records are located through a per-record tail index of byte offsets.
    Row,
    /// Every schema key has its own buffer, described by the tail entries.
    Columnar,
    /// Records are grouped into pages; each page stores one sector per field.
    Pax,
}
37
/// Bytes per entry in the PAX tail index (see conformance `generate.rs`).
///
/// The accessors in this file read three fields from each entry: the first
/// record index of the page at bytes `4..12` (u64), the page's record count at
/// bytes `12..16` (u32) and the page's byte offset at bytes `16..24` (u64).
const PAX_TAIL_ENTRY_BYTES: usize = 28;
40
41fn footer_size(flags: u16) -> usize {
42    if flags & FLAG_PAX != 0 {
43        28
44    } else if flags & FLAG_COLUMNAR != 0 {
45        20
46    } else {
47        12
48    }
49}
50
/// Report whether record `rec` is marked present in the null bitmap `bm`.
///
/// Bit `rec % 8` (least-significant bit first) of byte `rec / 8` holds the flag.
/// A record index that lies beyond the end of the bitmap is reported as absent
/// instead of panicking, so a truncated bitmap cannot crash the reader.
fn col_bit(bm: &[u8], rec: usize) -> bool {
    bm.get(rec / 8)
        .map_or(false, |&byte| (byte >> (rec % 8)) & 1 == 1)
}
54
55// ── Reader ────────────────────────────────────────────────────────────────────
56
/// Zero-copy reader for a `.nxb` buffer; supports row, columnar, and PAX layouts.
///
/// When opened with [`Self::with_options`], prefetch state is protected by internal
/// mutexes so the reader remains [`Send`] + [`Sync`].
pub struct Reader<'a> {
    /// The complete file buffer every accessor decodes from.
    data: &'a [u8],
    /// Schema key names; empty when the file has no embedded schema.
    keys: Vec<String>,
    /// Schema sigil bytes, parallel to `keys`.
    key_sigils: Vec<u8>,
    /// Key name -> slot index (position in `keys`).
    key_index: std::collections::HashMap<String, usize>,
    /// Number of top-level records.
    record_count: usize,
    /// Row layout: byte offset of the first tail-index entry (just past the
    /// u32 record count). Columnar/PAX: the tail pointer read from the footer.
    tail_start: usize,
    /// Layout detected from the header flags.
    layout: Layout,
    /// Per-slot column buffer offsets (columnar only; empty otherwise).
    col_buf_off: Vec<u64>,
    /// Per-slot column buffer lengths (columnar only; empty otherwise).
    col_buf_len: Vec<u64>,
    /// Row prefetch engine; `Some` only after `with_options` on a row-layout file.
    prefetch: Option<PrefetchEngine>,
    /// Column warm-up state used by `prefetch_column` and reported by `cache_stats`.
    column: ColumnWarmState,
}
74
75impl<'a> Reader<'a> {
76    /// Validate the file header, detect layout, and build the schema index.
77    pub fn new(data: &'a [u8]) -> Result<Self> {
78        if data.len() < 32 {
79            return Err(NxsError::OutOfBounds);
80        }
81        if u32::from_le_bytes(data[0..4].try_into().map_err(|_| NxsError::OutOfBounds)?)
82            != MAGIC_FILE
83        {
84            return Err(NxsError::BadMagic);
85        }
86        if u32::from_le_bytes(
87            data[data.len() - 4..]
88                .try_into()
89                .map_err(|_| NxsError::OutOfBounds)?,
90        ) != MAGIC_FOOTER
91        {
92            return Err(NxsError::BadMagic);
93        }
94
95        let flags = u16::from_le_bytes(data[6..8].try_into().map_err(|_| NxsError::OutOfBounds)?);
96        if flags & FLAG_COLUMNAR != 0 && flags & FLAG_PAX != 0 {
97            return Err(NxsError::InvalidFlags);
98        }
99        let preamble_tail =
100            u64::from_le_bytes(data[16..24].try_into().map_err(|_| NxsError::OutOfBounds)?);
101        if flags & FLAG_COLUMNAR != 0 && preamble_tail == 0 {
102            return Err(NxsError::IncompatibleFlags);
103        }
104
105        let (keys, key_sigils, _schema_end) = if flags & FLAG_SCHEMA_EMBEDDED != 0 {
106            parse_schema(data, 32)?
107        } else {
108            (vec![], vec![], 32)
109        };
110
111        let key_index: std::collections::HashMap<String, usize> = keys
112            .iter()
113            .enumerate()
114            .map(|(i, k)| (k.clone(), i))
115            .collect();
116
117        let (layout, record_count, tail_start, col_buf_off, col_buf_len) = if flags & FLAG_COLUMNAR
118            != 0
119        {
120            let footer = footer_size(flags);
121            let fo = data.len() - footer;
122            let tail_ptr = u64::from_le_bytes(
123                data[fo..fo + 8]
124                    .try_into()
125                    .map_err(|_| NxsError::OutOfBounds)?,
126            ) as usize;
127            let record_count = u64::from_le_bytes(
128                data[fo + 8..fo + 16]
129                    .try_into()
130                    .map_err(|_| NxsError::OutOfBounds)?,
131            ) as usize;
132            let kc = keys.len();
133            let tail_end = tail_ptr
134                .checked_add(kc.checked_mul(20).ok_or(NxsError::OutOfBounds)?)
135                .ok_or(NxsError::OutOfBounds)?;
136            if tail_ptr >= fo || tail_end > fo {
137                return Err(NxsError::OutOfBounds);
138            }
139            let mut off = vec![0u64; kc];
140            let mut len = vec![0u64; kc];
141            for i in 0..kc {
142                let e = tail_ptr + i * 20;
143                let fid = u16::from_le_bytes(
144                    data[e..e + 2]
145                        .try_into()
146                        .map_err(|_| NxsError::OutOfBounds)?,
147                ) as usize;
148                if fid >= kc {
149                    return Err(NxsError::OutOfBounds);
150                }
151                off[fid] = u64::from_le_bytes(
152                    data[e + 4..e + 12]
153                        .try_into()
154                        .map_err(|_| NxsError::OutOfBounds)?,
155                );
156                len[fid] = u64::from_le_bytes(
157                    data[e + 12..e + 20]
158                        .try_into()
159                        .map_err(|_| NxsError::OutOfBounds)?,
160                );
161            }
162            (Layout::Columnar, record_count, tail_ptr, off, len)
163        } else if flags & FLAG_PAX != 0 {
164            let footer = footer_size(flags);
165            let fo = data.len() - footer;
166            let tail_ptr = u64::from_le_bytes(
167                data[fo..fo + 8]
168                    .try_into()
169                    .map_err(|_| NxsError::OutOfBounds)?,
170            ) as usize;
171            let record_count = u64::from_le_bytes(
172                data[fo + 8..fo + 16]
173                    .try_into()
174                    .map_err(|_| NxsError::OutOfBounds)?,
175            ) as usize;
176            (Layout::Pax, record_count, tail_ptr, vec![], vec![])
177        } else {
178            let mut tail_ptr = usize::try_from(preamble_tail).map_err(|_| NxsError::OutOfBounds)?;
179            if tail_ptr == 0 {
180                if data.len() < 44 {
181                    return Err(NxsError::OutOfBounds);
182                }
183                tail_ptr = u64::from_le_bytes(
184                    data[data.len() - 12..data.len() - 4]
185                        .try_into()
186                        .map_err(|_| NxsError::OutOfBounds)?,
187                ) as usize;
188            }
189            if tail_ptr > data.len().saturating_sub(4) {
190                return Err(NxsError::OutOfBounds);
191            }
192            let record_count =
193                u32::from_le_bytes(data[tail_ptr..tail_ptr + 4].try_into().unwrap()) as usize;
194            (Layout::Row, record_count, tail_ptr + 4, vec![], vec![])
195        };
196
197        Ok(Self {
198            data,
199            keys,
200            key_sigils,
201            key_index,
202            record_count,
203            tail_start,
204            layout,
205            col_buf_off,
206            col_buf_len,
207            prefetch: None,
208            column: ColumnWarmState::default(),
209        })
210    }
211
212    /// Open with prefetch options (row-layout viewport cache; phase 1+2).
213    pub fn with_options(data: &'a [u8], options: OpenOptions) -> Result<Self> {
214        options.validate()?;
215        let mut reader = Self::new(data)?;
216        if reader.layout == Layout::Row {
217            let prefetch = PrefetchEngine::new(options, data.len());
218            if prefetch.strategy() == crate::prefetch::PrefetchStrategy::Eager {
219                prefetch.start_eager_background(data.to_vec(), reader.tail_start);
220            }
221            reader.prefetch = Some(prefetch);
222        }
223        Ok(reader)
224    }
225
226    /// Wait for in-progress eager / background prefetch (§8).
227    pub fn warmup(&self) {
228        if let Some(prefetch) = &self.prefetch {
229            prefetch.warmup();
230        }
231    }
232
233    /// Stop scheduling speculative and eager prefetch (§8.1).
234    pub fn pause_prefetch(&self) {
235        if let Some(prefetch) = &self.prefetch {
236            prefetch.pause_prefetch();
237        }
238    }
239
240    /// Resume speculative prefetch after [`Self::pause_prefetch`].
241    pub fn resume_prefetch(&self) {
242        if let Some(prefetch) = &self.prefetch {
243            prefetch.resume_prefetch();
244        }
245    }
246
247    /// Prefetch a single column buffer (columnar layout only; §7.4).
248    pub fn prefetch_column(&self, key: &str) -> Result<()> {
249        if self.layout != Layout::Columnar {
250            return Err(NxsError::UnsupportedLayout);
251        }
252        let slot = *self
253            .key_index
254            .get(key)
255            .ok_or_else(|| NxsError::ParseError(format!("key not found: {key}")))?;
256        let off = *self.col_buf_off.get(slot).ok_or(NxsError::OutOfBounds)? as usize;
257        let len = *self.col_buf_len.get(slot).ok_or(NxsError::OutOfBounds)? as usize;
258        let end = off.checked_add(len).ok_or(NxsError::OutOfBounds)?;
259        if end > self.data.len() {
260            return Err(NxsError::OutOfBounds);
261        }
262        if self.column.prefetch(slot) {
263            const PAGE: usize = 4096;
264            let sector = &self.data[off..end];
265            for page_start in (0..sector.len()).step_by(PAGE) {
266                std::hint::black_box(sector[page_start]);
267            }
268        }
269        Ok(())
270    }
271
272    /// Prefetch pages covering records `[start_index, end_index]` (row layout only).
273    pub fn prefetch_viewport(&self, start_index: usize, end_index: usize) -> Result<()> {
274        if self.layout != Layout::Row {
275            return Ok(());
276        }
277        if let Some(prefetch) = &self.prefetch {
278            prefetch.prefetch_viewport(
279                self.data,
280                self.tail_start,
281                self.record_count,
282                start_index,
283                end_index,
284            );
285        }
286        Ok(())
287    }
288
289    /// Page-cache statistics. Row prefetch counters are zero when opened via
290    /// [`Self::new`]; columnar readers may still report [`CacheStats::column_fetches_issued`].
291    pub fn cache_stats(&self) -> CacheStats {
292        let mut stats = if let Some(prefetch) = &self.prefetch {
293            prefetch.cache_stats()
294        } else {
295            CacheStats {
296                pages_cached: 0,
297                pages_max: 0,
298                memory_used_bytes: 0,
299                cache_hits: 0,
300                cache_misses: 0,
301                fetches_issued: 0,
302                column_fetches_issued: 0,
303                strategy: "disabled".to_string(),
304                pattern: "unknown".to_string(),
305            }
306        };
307        if self.layout == Layout::Columnar {
308            stats.column_fetches_issued = self.column.fetches();
309        }
310        stats
311    }
312
313    fn touch_record_page(&self, index: usize) {
314        if self.layout != Layout::Row {
315            return;
316        }
317        let Some(prefetch) = &self.prefetch else {
318            return;
319        };
320        prefetch.on_access(self.data, self.tail_start, self.record_count, index);
321    }
322
    /// Row, columnar, or PAX layout, as detected from the header flags when the
    /// reader was constructed.
    pub fn layout(&self) -> Layout {
        self.layout
    }
327
328    /// Sum `key` as f64 across all records (uses column buffers when columnar/PAX).
329    pub fn col_sum_f64(&self, key: &str) -> Option<f64> {
330        let slot = self.slot(key)?;
331        match self.layout {
332            Layout::Row => {
333                let mut sum = 0.0;
334                let mut any = false;
335                for rec in self.all() {
336                    if let Some(v) = rec.get_f64(key) {
337                        sum += v;
338                        any = true;
339                    }
340                }
341                any.then_some(sum)
342            }
343            Layout::Columnar => {
344                let (bm, vals) = self.col_field_parts(slot).ok()?;
345                Some(crate::col_reduce::sum_f64_column(
346                    vals,
347                    bm,
348                    self.record_count,
349                ))
350            }
351            Layout::Pax => {
352                let mut sum = 0.0;
353                for i in 0..self.record_count {
354                    if let Some(v) = self.pax_get_f64(i, slot) {
355                        sum += v;
356                    }
357                }
358                Some(sum)
359            }
360        }
361    }
362
363    /// Zero-copy slice of a column's dense numeric value buffer (columnar only).
364    pub fn col_buffer(&self, key: &str) -> Option<&[u8]> {
365        if self.layout != Layout::Columnar {
366            return None;
367        }
368        let slot = self.slot(key)?;
369        if is_var_sigil(self.key_sigils.get(slot).copied().unwrap_or(0)) {
370            return None;
371        }
372        let (_, vals) = self.col_field_parts(slot).ok()?;
373        Some(vals)
374    }
375
376    /// Zero-copy string/binary column (`offsets` + `values`); columnar only.
377    pub fn col_var_buffer(&self, key: &str) -> Result<crate::arrow_project::VarColumnView<'_>> {
378        if self.layout != Layout::Columnar {
379            return Err(NxsError::UnsupportedFieldType);
380        }
381        let slot = self.slot(key).ok_or(NxsError::OutOfBounds)?;
382        if !is_var_sigil(self.key_sigils.get(slot).copied().unwrap_or(0)) {
383            return Err(NxsError::UnsupportedFieldType);
384        }
385        let (bm, offsets, values) = self.col_field_var_parts(slot)?;
386        Ok(crate::arrow_project::VarColumnView {
387            null_bitmap: bm,
388            offsets,
389            values,
390            record_count: self.record_count,
391        })
392    }
393
    /// Return the raw sector bytes of field `slot` inside PAX page `page_idx`.
    ///
    /// The page's byte offset is read from its tail-index entry, the page header
    /// is validated, and the sectors of fields `0..slot` are skipped one by one
    /// using `column_sector_len`.
    ///
    /// # Errors
    ///
    /// * [`NxsError::OutOfBounds`] — the tail entry, page header or a sector lies
    ///   outside the buffer, or `slot` is not smaller than the page's field count.
    /// * [`NxsError::InvalidPageMagic`] — the page does not start with `MAGIC_PAGE`.
    /// * Any error returned by `column_sector_len`.
    fn pax_column_sector(&self, page_idx: usize, slot: usize) -> Result<&[u8]> {
        // Hex digits spell ASCII "NXSP" when read most-significant byte first.
        const MAGIC_PAGE: u32 = 0x4E58_5350;
        // Byte offset of this page's entry in the tail index.
        let e = page_idx
            .checked_mul(PAX_TAIL_ENTRY_BYTES)
            .and_then(|n| self.tail_start.checked_add(n))
            .ok_or(NxsError::OutOfBounds)?;
        // The page offset is the u64 at bytes 16..24 of the entry.
        let page_off_start = e.checked_add(16).ok_or(NxsError::OutOfBounds)?;
        let page_off_end = e.checked_add(24).ok_or(NxsError::OutOfBounds)?;
        let poff = u64::from_le_bytes(
            self.data
                .get(page_off_start..page_off_end)
                .ok_or(NxsError::OutOfBounds)?
                .try_into()
                .map_err(|_| NxsError::OutOfBounds)?,
        ) as usize;
        // Guarantees the 24-byte page header is inside the buffer, which makes the
        // direct slice indexing below panic-free.
        if poff > self.data.len().saturating_sub(24) {
            return Err(NxsError::OutOfBounds);
        }
        if u32::from_le_bytes(
            self.data[poff..poff + 4]
                .try_into()
                .map_err(|_| NxsError::OutOfBounds)?,
        ) != MAGIC_PAGE
        {
            return Err(NxsError::InvalidPageMagic);
        }
        // Record count as stored in the page header (u32 at header bytes 16..20).
        let rc = u32::from_le_bytes(
            self.data[poff + 16..poff + 20]
                .try_into()
                .map_err(|_| NxsError::OutOfBounds)?,
        ) as usize;
        // Number of field sectors in this page (u16 at header bytes 20..22).
        let field_count = u16::from_le_bytes(
            self.data[poff + 20..poff + 22]
                .try_into()
                .map_err(|_| NxsError::OutOfBounds)?,
        ) as usize;
        if slot >= field_count {
            return Err(NxsError::OutOfBounds);
        }
        // Sectors start right after the header; skip the ones before `slot`.
        let mut body = poff.checked_add(24).ok_or(NxsError::OutOfBounds)?;
        for fi in 0..slot {
            if body > self.data.len() {
                return Err(NxsError::OutOfBounds);
            }
            // Fields without a schema sigil fall back to b'='.
            let sig = self.key_sigils.get(fi).copied().unwrap_or(b'=');
            let slen = column_sector_len(&self.data[body..], rc, sig)?;
            body = body.checked_add(slen).ok_or(NxsError::OutOfBounds)?;
        }
        if body > self.data.len() {
            return Err(NxsError::OutOfBounds);
        }
        let sig = self.key_sigils.get(slot).copied().unwrap_or(b'=');
        let slen = column_sector_len(&self.data[body..], rc, sig)?;
        // Overflow-safe form of `body + slen > data.len()`.
        if body > self.data.len().saturating_sub(slen) {
            return Err(NxsError::OutOfBounds);
        }
        Ok(&self.data[body..body + slen])
    }
452
453    fn pax_page_field_var_parts(
454        &self,
455        page_idx: usize,
456        slot: usize,
457    ) -> Result<(&[u8], &[u8], &[u8])> {
458        let sector = self.pax_column_sector(page_idx, slot)?;
459        let rc = self
460            .pax_page_rec_count(page_idx)
461            .ok_or(NxsError::OutOfBounds)? as usize;
462        col_var_parts(sector, rc)
463    }
464
465    fn pax_locate_record(&self, record_index: usize) -> Option<(usize, usize)> {
466        let mut lo = 0i32;
467        let mut hi = self.page_count().saturating_sub(1) as i32;
468        while lo <= hi {
469            let mid = ((lo + hi) / 2) as usize;
470            let start = self.pax_page_rec_start(mid)?;
471            let count = self.pax_page_rec_count(mid)?;
472            if (record_index as u64) < start {
473                hi = mid as i32 - 1;
474            } else if record_index >= start as usize + count as usize {
475                lo = mid as i32 + 1;
476            } else {
477                let local = record_index - start as usize;
478                return Some((mid, local));
479            }
480        }
481        None
482    }
483
484    fn pax_get_f64(&self, record_index: usize, slot: usize) -> Option<f64> {
485        let (pi, local) = self.pax_locate_record(record_index)?;
486        if is_var_sigil(*self.key_sigils.get(slot)?) {
487            return None;
488        }
489        let (bm, vals) = self.pax_page_field_parts(pi, slot).ok()?;
490        if !col_bit(bm, local) {
491            return None;
492        }
493        let off = local * 8;
494        Some(f64::from_le_bytes(vals.get(off..off + 8)?.try_into().ok()?))
495    }
496
497    fn pax_get_i64(&self, record_index: usize, slot: usize) -> Option<i64> {
498        let (pi, local) = self.pax_locate_record(record_index)?;
499        if is_var_sigil(*self.key_sigils.get(slot)?) {
500            return None;
501        }
502        let (bm, vals) = self.pax_page_field_parts(pi, slot).ok()?;
503        if !col_bit(bm, local) {
504            return None;
505        }
506        let off = local * 8;
507        Some(i64::from_le_bytes(vals.get(off..off + 8)?.try_into().ok()?))
508    }
509
510    fn pax_get_bool(&self, record_index: usize, slot: usize) -> Option<bool> {
511        let (pi, local) = self.pax_locate_record(record_index)?;
512        if is_var_sigil(*self.key_sigils.get(slot)?) {
513            return None;
514        }
515        let (bm, vals) = self.pax_page_field_parts(pi, slot).ok()?;
516        if !col_bit(bm, local) {
517            return None;
518        }
519        Some(vals.get(local * 8)? != &0)
520    }
521
522    fn pax_get_str(&self, record_index: usize, slot: usize) -> Option<&str> {
523        let (pi, local) = self.pax_locate_record(record_index)?;
524        if self.key_sigils.get(slot).copied() != Some(b'"') {
525            return None;
526        }
527        let (bm, offsets, values) = self.pax_page_field_var_parts(pi, slot).ok()?;
528        if !col_bit(bm, local) {
529            return None;
530        }
531        var_str_at(offsets, values, local)
532    }
533
534    fn page_count(&self) -> usize {
535        if self.layout != Layout::Pax {
536            return 0;
537        }
538        let tp = self.tail_start;
539        if tp > self.data.len().saturating_sub(4) {
540            return 0;
541        }
542        // page count stored in footer; re-read from footer
543        let fo = self.data.len() - footer_size(FLAG_PAX);
544        u32::from_le_bytes(self.data[fo + 16..fo + 20].try_into().unwrap_or([0; 4])) as usize
545    }
546
547    fn pax_page_rec_start(&self, page_idx: usize) -> Option<u64> {
548        let e = page_idx
549            .checked_mul(PAX_TAIL_ENTRY_BYTES)
550            .and_then(|n| self.tail_start.checked_add(n))?;
551        let start = e.checked_add(4)?;
552        let end = e.checked_add(12)?;
553        Some(u64::from_le_bytes(
554            self.data.get(start..end)?.try_into().ok()?,
555        ))
556    }
557
558    fn pax_page_rec_count(&self, page_idx: usize) -> Option<u32> {
559        let e = page_idx
560            .checked_mul(PAX_TAIL_ENTRY_BYTES)
561            .and_then(|n| self.tail_start.checked_add(n))?;
562        let start = e.checked_add(12)?;
563        let end = e.checked_add(16)?;
564        Some(u32::from_le_bytes(
565            self.data.get(start..end)?.try_into().ok()?,
566        ))
567    }
568
569    fn pax_page_field_parts(&self, page_idx: usize, slot: usize) -> Result<(&[u8], &[u8])> {
570        let sector = self.pax_column_sector(page_idx, slot)?;
571        let rc = self
572            .pax_page_rec_count(page_idx)
573            .ok_or(NxsError::OutOfBounds)? as usize;
574        let bm_len = null_bitmap_bytes(rc);
575        if sector.len() < bm_len {
576            return Err(NxsError::OutOfBounds);
577        }
578        let vals_end = bm_len + rc * 8;
579        if sector.len() < vals_end {
580            return Err(NxsError::OutOfBounds);
581        }
582        Ok((&sector[..bm_len], &sector[bm_len..vals_end]))
583    }
584
585    /// Variable-length column parts (null bitmap, u32 offsets, values) for columnar layout.
586    pub fn col_field_var_parts(&self, slot: usize) -> Result<(&[u8], &[u8], &[u8])> {
587        let off = *self.col_buf_off.get(slot).ok_or(NxsError::OutOfBounds)? as usize;
588        let len = *self.col_buf_len.get(slot).ok_or(NxsError::OutOfBounds)? as usize;
589        let end = off.checked_add(len).ok_or(NxsError::OutOfBounds)?;
590        if end > self.data.len() {
591            return Err(NxsError::OutOfBounds);
592        }
593        col_var_parts(&self.data[off..end], self.record_count)
594    }
595
596    fn col_field_parts(&self, slot: usize) -> Result<(&[u8], &[u8])> {
597        if self
598            .key_sigils
599            .get(slot)
600            .copied()
601            .map(is_var_sigil)
602            .unwrap_or(false)
603        {
604            return Err(NxsError::UnsupportedFieldType);
605        }
606        let off = *self.col_buf_off.get(slot).ok_or(NxsError::OutOfBounds)? as usize;
607        let len = *self.col_buf_len.get(slot).ok_or(NxsError::OutOfBounds)? as usize;
608        let end = off.checked_add(len).ok_or(NxsError::OutOfBounds)?;
609        if end > self.data.len() {
610            return Err(NxsError::OutOfBounds);
611        }
612        let bm_len = null_bitmap_bytes(self.record_count);
613        let vals_len = self.record_count.saturating_mul(8);
614        let vals_end = bm_len.checked_add(vals_len).ok_or(NxsError::OutOfBounds)?;
615        if len < vals_end {
616            return Err(NxsError::OutOfBounds);
617        }
618        let sector = &self.data[off..end];
619        Ok((&sector[..bm_len], &sector[bm_len..vals_end]))
620    }
621
    /// Number of top-level records in the file, as decoded when the reader was
    /// constructed.
    pub fn record_count(&self) -> usize {
        self.record_count
    }
626
    /// Schema key names, in slot order. Empty when the file carries no embedded
    /// schema.
    pub fn keys(&self) -> &[String] {
        &self.keys
    }
631
    /// Schema sigil bytes, parallel to `keys()`: entry `i` is the sigil of the
    /// key in slot `i`.
    pub fn key_sigils(&self) -> &[u8] {
        &self.key_sigils
    }
636
    /// Resolve a key name to its slot index. O(1) via HashMap.
    ///
    /// Returns `None` when `key` is not part of the schema.
    pub fn slot(&self, key: &str) -> Option<usize> {
        self.key_index.get(key).copied()
    }
641
642    /// Access a single record by zero-based index. O(1) via tail-index.
643    pub fn record(&self, i: usize) -> Option<Record<'a, '_>> {
644        if i >= self.record_count {
645            return None;
646        }
647        self.touch_record_page(i);
648        let offset = if self.layout == Layout::Row {
649            let entry = self.tail_start + i * 10;
650            u64::from_le_bytes(self.data.get(entry + 2..entry + 10)?.try_into().ok()?) as usize
651        } else {
652            i
653        };
654        Some(Record {
655            data: self.data,
656            reader: self,
657            offset,
658        })
659    }
660
661    /// Return an iterator over all records.
662    pub fn all(&'a self) -> Records<'a, 'a, AlwaysTrue> {
663        Records {
664            reader: self,
665            pred: AlwaysTrue,
666            index: 0,
667        }
668    }
669
    /// Return a lazy iterator over records matching `pred`.
    ///
    /// No record is examined here; the iterator starts at record index 0 and
    /// does its work as it is advanced.
    pub fn where_pred<P: Predicate>(&'a self, pred: P) -> Records<'a, 'a, P> {
        Records {
            reader: self,
            pred,
            index: 0,
        }
    }
678}
679
680// ── Record ────────────────────────────────────────────────────────────────────
681
/// A lazy view into a single NYXO object within the buffer.
/// Field reads decode directly from the mapped bytes — no allocation.
pub struct Record<'data, 'reader> {
    /// The complete file buffer.
    data: &'data [u8],
    /// Reader that produced this record; supplies schema, layout and column access.
    reader: &'reader Reader<'data>,
    /// Row layout: byte offset of the object in `data`.
    /// Columnar/PAX layout: the zero-based record index.
    offset: usize,
}
689
690impl<'data, 'reader> Record<'data, 'reader> {
    /// Resolve the byte offset of slot `s` within this object. Returns `None` if absent.
    ///
    /// Delegates to `resolve_slot`, passing `self.offset` as the object's byte
    /// offset; the getters in this impl only call it from their row-layout branch.
    fn resolve(&self, slot: usize) -> Option<usize> {
        resolve_slot(self.data, self.offset, slot)
    }
695
696    /// Read an `i64` field.
697    pub fn get_i64(&self, key: &str) -> Option<i64> {
698        let slot = self.reader.slot(key)?;
699        match self.reader.layout {
700            Layout::Columnar => {
701                if is_var_sigil(*self.reader.key_sigils.get(slot)?) {
702                    return None;
703                }
704                let ri = self.offset;
705                let (bm, vals) = self.reader.col_field_parts(slot).ok()?;
706                if !col_bit(bm, ri) {
707                    return None;
708                }
709                let off = ri * 8;
710                Some(i64::from_le_bytes(vals.get(off..off + 8)?.try_into().ok()?))
711            }
712            Layout::Pax => self.reader.pax_get_i64(self.offset, slot),
713            Layout::Row => {
714                let off = self.resolve(slot)?;
715                Some(i64::from_le_bytes(
716                    self.data.get(off..off + 8)?.try_into().ok()?,
717                ))
718            }
719        }
720    }
721
722    /// Read an `f64` field.
723    pub fn get_f64(&self, key: &str) -> Option<f64> {
724        let slot = self.reader.slot(key)?;
725        if self.reader.layout == Layout::Columnar {
726            if is_var_sigil(*self.reader.key_sigils.get(slot)?) {
727                return None;
728            }
729            let ri = self.offset;
730            let (bm, vals) = self.reader.col_field_parts(slot).ok()?;
731            if !col_bit(bm, ri) {
732                return None;
733            }
734            let off = ri * 8;
735            return Some(f64::from_le_bytes(vals.get(off..off + 8)?.try_into().ok()?));
736        }
737        if self.reader.layout == Layout::Pax {
738            return self.reader.pax_get_f64(self.offset, slot);
739        }
740        let off = self.resolve(slot)?;
741        Some(f64::from_le_bytes(
742            self.data.get(off..off + 8)?.try_into().ok()?,
743        ))
744    }
745
746    /// Read a `bool` field.
747    pub fn get_bool(&self, key: &str) -> Option<bool> {
748        let slot = self.reader.slot(key)?;
749        match self.reader.layout {
750            Layout::Columnar => {
751                if is_var_sigil(*self.reader.key_sigils.get(slot)?) {
752                    return None;
753                }
754                let ri = self.offset;
755                let (bm, vals) = self.reader.col_field_parts(slot).ok()?;
756                if !col_bit(bm, ri) {
757                    return None;
758                }
759                Some(vals.get(ri * 8)? != &0)
760            }
761            Layout::Pax => self.reader.pax_get_bool(self.offset, slot),
762            Layout::Row => {
763                let off = self.resolve(slot)?;
764                Some(*self.data.get(off)? != 0)
765            }
766        }
767    }
768
769    /// Read a `&str` field (zero-copy slice into the buffer).
770    pub fn get_str(&self, key: &str) -> Option<&str> {
771        let slot = self.reader.slot(key)?;
772        match self.reader.layout {
773            Layout::Columnar => {
774                if self.reader.key_sigils.get(slot).copied() != Some(b'"') {
775                    return None;
776                }
777                let ri = self.offset;
778                let (bm, offsets, values) = self.reader.col_field_var_parts(slot).ok()?;
779                if !col_bit(bm, ri) {
780                    return None;
781                }
782                var_str_at(offsets, values, ri)
783            }
784            Layout::Pax => self.reader.pax_get_str(self.offset, slot),
785            Layout::Row => {
786                let off = self.resolve(slot)?;
787                let len =
788                    u32::from_le_bytes(self.data.get(off..off + 4)?.try_into().ok()?) as usize;
789                let bytes = self.data.get(off + 4..off + 4 + len)?;
790                std::str::from_utf8(bytes).ok()
791            }
792        }
793    }
794
795    /// Walk a dot-notated path and read the leaf as `&str`.
796    /// Example: `record.get_str_path("address.city")`
797    pub fn get_str_path(&self, dot_path: &str) -> Option<&str> {
798        let (leaf_off, data) = self.walk_path(dot_path)?;
799        let len = u32::from_le_bytes(data.get(leaf_off..leaf_off + 4)?.try_into().ok()?) as usize;
800        let bytes = data.get(leaf_off + 4..leaf_off + 4 + len)?;
801        std::str::from_utf8(bytes).ok()
802    }
803
804    /// Walk a dot-notated path and read the leaf as `i64`.
805    pub fn get_i64_path(&self, dot_path: &str) -> Option<i64> {
806        let (off, data) = self.walk_path(dot_path)?;
807        Some(i64::from_le_bytes(data.get(off..off + 8)?.try_into().ok()?))
808    }
809
810    /// Walk a dot-notated path and read the leaf as `f64`.
811    pub fn get_f64_path(&self, dot_path: &str) -> Option<f64> {
812        let (off, data) = self.walk_path(dot_path)?;
813        Some(f64::from_le_bytes(data.get(off..off + 8)?.try_into().ok()?))
814    }
815
816    /// Walk a dot-notated path and read the leaf as `bool`.
817    pub fn get_bool_path(&self, dot_path: &str) -> Option<bool> {
818        let (off, data) = self.walk_path(dot_path)?;
819        Some(*data.get(off)? != 0)
820    }
821
822    /// Navigate all but the last path segment, returning (leaf_offset, data).
823    fn walk_path(&self, dot_path: &str) -> Option<(usize, &'data [u8])> {
824        // Use plain split('.') so arbitrarily deep paths work correctly.
825        // Previously splitn(8, '.') silently concatenated segments 8+ into the 8th
826        // key string, causing incorrect lookups with no error for paths > 8 segments.
827        let mut parts = dot_path.split('.');
828        let mut obj_offset = self.offset;
829        let data = self.data;
830        let mut part = parts.next()?;
831        loop {
832            let slot = self.reader.slot(part)?;
833            let field_off = resolve_slot(data, obj_offset, slot)?;
834            match parts.next() {
835                None => return Some((field_off, data)),
836                Some(next) => {
837                    // intermediate: must be NYXO
838                    let magic =
839                        u32::from_le_bytes(data.get(field_off..field_off + 4)?.try_into().ok()?);
840                    if magic != MAGIC_OBJ {
841                        return None;
842                    }
843                    obj_offset = field_off;
844                    part = next;
845                }
846            }
847        }
848    }
849}
850
851// ── Iterator ──────────────────────────────────────────────────────────────────
852
/// A lazy iterator over records filtered by `P`.
/// Does not allocate; predicate evaluation reads directly from the buffer.
///
/// Records are visited in index order, from `index` up to the reader's record
/// count; only those for which the predicate returns `true` are yielded.
pub struct Records<'data, 'reader, P: Predicate> {
    /// Reader whose records are being scanned.
    reader: &'reader Reader<'data>,
    /// Filter applied to every candidate record.
    pred: P,
    /// Zero-based index of the next record to examine.
    index: usize,
}
860
861impl<'data, 'reader, P: Predicate> Iterator for Records<'data, 'reader, P> {
862    type Item = Record<'data, 'reader>;
863
864    fn next(&mut self) -> Option<Self::Item> {
865        let r = self.reader;
866        loop {
867            if self.index >= r.record_count {
868                return None;
869            }
870            let i = self.index;
871            self.index += 1;
872            // For Row layout: look up absolute byte offset via the per-record tail-index entry
873            // (each entry is 10 bytes: 2-byte flags + 8-byte absolute offset).
874            // For Columnar/PAX layout: there is no row tail-index; the record is identified
875            // by its zero-based record index directly.
876            let abs = match r.layout {
877                Layout::Row => {
878                    let entry = r.tail_start + i * 10;
879                    u64::from_le_bytes(r.data.get(entry + 2..entry + 10)?.try_into().ok()?) as usize
880                }
881                Layout::Columnar | Layout::Pax => i,
882            };
883            if self.pred.test(r.data, r, abs) {
884                return Some(Record {
885                    data: r.data,
886                    reader: r,
887                    offset: abs,
888                });
889            }
890        }
891    }
892}
893
894// ── Predicates ────────────────────────────────────────────────────────────────
895
/// A predicate tests a record in-place without allocation.
///
/// Implementations in this module dispatch on the reader's layout to locate the
/// field they inspect.
pub trait Predicate {
    /// Return `true` if the record identified by `obj_offset` satisfies the predicate.
    ///
    /// * `data` - the buffer the record is read from.
    /// * `reader` - reader used for key lookup and layout information.
    /// * `obj_offset` - for the Row layout, the byte offset of the record's NYXO
    ///   object; for Columnar/PAX layouts, the zero-based record index.
    fn test(&self, data: &[u8], reader: &Reader<'_>, obj_offset: usize) -> bool;
}
900
901/// Always-true predicate for `Reader::all()`.
902pub struct AlwaysTrue;
903impl Predicate for AlwaysTrue {
904    fn test(&self, _: &[u8], _: &Reader<'_>, _: usize) -> bool {
905        true
906    }
907}
908
/// Equality predicate: matches records whose field `key` equals `value`.
///
/// `Predicate` is implemented for `bool`, `&str`, `i64` and `f64` values.
pub struct Eq<'k, V> {
    /// Name of the field to compare.
    pub key: &'k str,
    /// Value the field must equal.
    pub value: V,
}

/// Build an [`Eq`] predicate for `key` and `value`.
pub fn eq<'k, V>(key: &'k str, value: V) -> Eq<'k, V> {
    Eq { key, value }
}
918
919/// Helper: resolve the byte offset of a field for Row layout, or read directly for
920/// Columnar/PAX layout using the reader's column buffers.
921/// Returns `None` if the field is absent or the layout doesn't support direct resolution.
922#[allow(dead_code)]
923fn row_field_offset(data: &[u8], reader: &Reader<'_>, off: usize, slot: usize) -> Option<usize> {
924    // For Row layout `off` is the byte offset to a NYXO object.
925    // For Columnar/PAX `off` is the record index — row-oriented slot resolution is invalid.
926    match reader.layout {
927        Layout::Row => resolve_slot(data, off, slot),
928        Layout::Columnar | Layout::Pax => None,
929    }
930}
931
932impl Predicate for Eq<'_, bool> {
933    fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
934        let Some(slot) = reader.slot(self.key) else {
935            return false;
936        };
937        match reader.layout {
938            Layout::Columnar => {
939                if is_var_sigil(*reader.key_sigils.get(slot).unwrap_or(&0)) {
940                    return false;
941                }
942                let Ok((bm, vals)) = reader.col_field_parts(slot) else {
943                    return false;
944                };
945                if !col_bit(bm, off) {
946                    return false;
947                }
948                vals.get(off * 8)
949                    .map(|&b| (b != 0) == self.value)
950                    .unwrap_or(false)
951            }
952            Layout::Pax => reader
953                .pax_get_bool(off, slot)
954                .map(|v| v == self.value)
955                .unwrap_or(false),
956            Layout::Row => {
957                let Some(foff) = resolve_slot(data, off, slot) else {
958                    return false;
959                };
960                data.get(foff)
961                    .map(|&b| (b != 0) == self.value)
962                    .unwrap_or(false)
963            }
964        }
965    }
966}
967
968impl<'k> Predicate for Eq<'k, &str> {
969    fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
970        let Some(slot) = reader.slot(self.key) else {
971            return false;
972        };
973        match reader.layout {
974            Layout::Columnar => reader
975                .col_field_var_parts(slot)
976                .ok()
977                .and_then(|(bm, offsets, values)| {
978                    if !col_bit(bm, off) {
979                        return None;
980                    }
981                    crate::layout::var_str_at(offsets, values, off)
982                })
983                .map(|s| s == self.value)
984                .unwrap_or(false),
985            Layout::Pax => reader
986                .pax_get_str(off, slot)
987                .map(|s| s == self.value)
988                .unwrap_or(false),
989            Layout::Row => {
990                let Some(foff) = resolve_slot(data, off, slot) else {
991                    return false;
992                };
993                let Some(len_bytes) = data.get(foff..foff + 4) else {
994                    return false;
995                };
996                let len = u32::from_le_bytes(len_bytes.try_into().unwrap()) as usize;
997                data.get(foff + 4..foff + 4 + len)
998                    .and_then(|b| std::str::from_utf8(b).ok())
999                    .map(|s| s == self.value)
1000                    .unwrap_or(false)
1001            }
1002        }
1003    }
1004}
1005
1006impl Predicate for Eq<'_, i64> {
1007    fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1008        let Some(slot) = reader.slot(self.key) else {
1009            return false;
1010        };
1011        match reader.layout {
1012            Layout::Columnar => {
1013                if is_var_sigil(*reader.key_sigils.get(slot).unwrap_or(&0)) {
1014                    return false;
1015                }
1016                let Ok((bm, vals)) = reader.col_field_parts(slot) else {
1017                    return false;
1018                };
1019                if !col_bit(bm, off) {
1020                    return false;
1021                }
1022                let o = off * 8;
1023                vals.get(o..o + 8)
1024                    .and_then(|b| b.try_into().ok())
1025                    .map(|b| i64::from_le_bytes(b) == self.value)
1026                    .unwrap_or(false)
1027            }
1028            Layout::Pax => reader
1029                .pax_get_i64(off, slot)
1030                .map(|v| v == self.value)
1031                .unwrap_or(false),
1032            Layout::Row => {
1033                let Some(foff) = resolve_slot(data, off, slot) else {
1034                    return false;
1035                };
1036                data.get(foff..foff + 8)
1037                    .and_then(|b| b.try_into().ok())
1038                    .map(|b| i64::from_le_bytes(b) == self.value)
1039                    .unwrap_or(false)
1040            }
1041        }
1042    }
1043}
1044
1045impl Predicate for Eq<'_, f64> {
1046    fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1047        let Some(slot) = reader.slot(self.key) else {
1048            return false;
1049        };
1050        match reader.layout {
1051            Layout::Columnar => {
1052                if is_var_sigil(*reader.key_sigils.get(slot).unwrap_or(&0)) {
1053                    return false;
1054                }
1055                let Ok((bm, vals)) = reader.col_field_parts(slot) else {
1056                    return false;
1057                };
1058                if !col_bit(bm, off) {
1059                    return false;
1060                }
1061                let o = off * 8;
1062                vals.get(o..o + 8)
1063                    .and_then(|b| b.try_into().ok())
1064                    .map(|b| f64::from_le_bytes(b) == self.value)
1065                    .unwrap_or(false)
1066            }
1067            Layout::Pax => reader
1068                .pax_get_f64(off, slot)
1069                .map(|v| v == self.value)
1070                .unwrap_or(false),
1071            Layout::Row => {
1072                let Some(foff) = resolve_slot(data, off, slot) else {
1073                    return false;
1074                };
1075                data.get(foff..foff + 8)
1076                    .and_then(|b| b.try_into().ok())
1077                    .map(|b| f64::from_le_bytes(b) == self.value)
1078                    .unwrap_or(false)
1079            }
1080        }
1081    }
1082}
1083
/// Greater-than predicate: matches records whose field `key` is strictly
/// greater than `value`.
///
/// `Predicate` is implemented for `f64` and `i64` values.
pub struct Gt<'k, V> {
    /// Name of the field to compare.
    pub key: &'k str,
    /// Exclusive lower bound for the field.
    pub value: V,
}

/// Build a [`Gt`] predicate for `key` and `value`.
pub fn gt<'k, V>(key: &'k str, value: V) -> Gt<'k, V> {
    Gt { key, value }
}
1093
1094impl Predicate for Gt<'_, f64> {
1095    fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1096        let Some(slot) = reader.slot(self.key) else {
1097            return false;
1098        };
1099        match reader.layout {
1100            Layout::Columnar => {
1101                if is_var_sigil(*reader.key_sigils.get(slot).unwrap_or(&0)) {
1102                    return false;
1103                }
1104                let Ok((bm, vals)) = reader.col_field_parts(slot) else {
1105                    return false;
1106                };
1107                if !col_bit(bm, off) {
1108                    return false;
1109                }
1110                let o = off * 8;
1111                vals.get(o..o + 8)
1112                    .and_then(|b| b.try_into().ok())
1113                    .map(|b| f64::from_le_bytes(b) > self.value)
1114                    .unwrap_or(false)
1115            }
1116            Layout::Pax => reader
1117                .pax_get_f64(off, slot)
1118                .map(|v| v > self.value)
1119                .unwrap_or(false),
1120            Layout::Row => {
1121                let Some(foff) = resolve_slot(data, off, slot) else {
1122                    return false;
1123                };
1124                data.get(foff..foff + 8)
1125                    .and_then(|b| b.try_into().ok())
1126                    .map(|b| f64::from_le_bytes(b) > self.value)
1127                    .unwrap_or(false)
1128            }
1129        }
1130    }
1131}
1132
1133impl Predicate for Gt<'_, i64> {
1134    fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1135        let Some(slot) = reader.slot(self.key) else {
1136            return false;
1137        };
1138        match reader.layout {
1139            Layout::Columnar => {
1140                if is_var_sigil(*reader.key_sigils.get(slot).unwrap_or(&0)) {
1141                    return false;
1142                }
1143                let Ok((bm, vals)) = reader.col_field_parts(slot) else {
1144                    return false;
1145                };
1146                if !col_bit(bm, off) {
1147                    return false;
1148                }
1149                let o = off * 8;
1150                vals.get(o..o + 8)
1151                    .and_then(|b| b.try_into().ok())
1152                    .map(|b| i64::from_le_bytes(b) > self.value)
1153                    .unwrap_or(false)
1154            }
1155            Layout::Pax => reader
1156                .pax_get_i64(off, slot)
1157                .map(|v| v > self.value)
1158                .unwrap_or(false),
1159            Layout::Row => {
1160                let Some(foff) = resolve_slot(data, off, slot) else {
1161                    return false;
1162                };
1163                data.get(foff..foff + 8)
1164                    .and_then(|b| b.try_into().ok())
1165                    .map(|b| i64::from_le_bytes(b) > self.value)
1166                    .unwrap_or(false)
1167            }
1168        }
1169    }
1170}
1171
/// Less-than predicate: matches records whose field `key` is strictly less
/// than `value`.
///
/// `Predicate` is implemented for `f64` and `i64` values.
pub struct Lt<'k, V> {
    /// Name of the field to compare.
    pub key: &'k str,
    /// Exclusive upper bound for the field.
    pub value: V,
}

/// Build an [`Lt`] predicate for `key` and `value`.
pub fn lt<'k, V>(key: &'k str, value: V) -> Lt<'k, V> {
    Lt { key, value }
}
1181
1182impl Predicate for Lt<'_, f64> {
1183    fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1184        let Some(slot) = reader.slot(self.key) else {
1185            return false;
1186        };
1187        match reader.layout {
1188            Layout::Columnar => {
1189                if is_var_sigil(*reader.key_sigils.get(slot).unwrap_or(&0)) {
1190                    return false;
1191                }
1192                let Ok((bm, vals)) = reader.col_field_parts(slot) else {
1193                    return false;
1194                };
1195                if !col_bit(bm, off) {
1196                    return false;
1197                }
1198                let o = off * 8;
1199                vals.get(o..o + 8)
1200                    .and_then(|b| b.try_into().ok())
1201                    .map(|b| f64::from_le_bytes(b) < self.value)
1202                    .unwrap_or(false)
1203            }
1204            Layout::Pax => reader
1205                .pax_get_f64(off, slot)
1206                .map(|v| v < self.value)
1207                .unwrap_or(false),
1208            Layout::Row => {
1209                let Some(foff) = resolve_slot(data, off, slot) else {
1210                    return false;
1211                };
1212                data.get(foff..foff + 8)
1213                    .and_then(|b| b.try_into().ok())
1214                    .map(|b| f64::from_le_bytes(b) < self.value)
1215                    .unwrap_or(false)
1216            }
1217        }
1218    }
1219}
1220
1221impl Predicate for Lt<'_, i64> {
1222    fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1223        let Some(slot) = reader.slot(self.key) else {
1224            return false;
1225        };
1226        match reader.layout {
1227            Layout::Columnar => {
1228                if is_var_sigil(*reader.key_sigils.get(slot).unwrap_or(&0)) {
1229                    return false;
1230                }
1231                let Ok((bm, vals)) = reader.col_field_parts(slot) else {
1232                    return false;
1233                };
1234                if !col_bit(bm, off) {
1235                    return false;
1236                }
1237                let o = off * 8;
1238                vals.get(o..o + 8)
1239                    .and_then(|b| b.try_into().ok())
1240                    .map(|b| i64::from_le_bytes(b) < self.value)
1241                    .unwrap_or(false)
1242            }
1243            Layout::Pax => reader
1244                .pax_get_i64(off, slot)
1245                .map(|v| v < self.value)
1246                .unwrap_or(false),
1247            Layout::Row => {
1248                let Some(foff) = resolve_slot(data, off, slot) else {
1249                    return false;
1250                };
1251                data.get(foff..foff + 8)
1252                    .and_then(|b| b.try_into().ok())
1253                    .map(|b| i64::from_le_bytes(b) < self.value)
1254                    .unwrap_or(false)
1255            }
1256        }
1257    }
1258}
1259
1260/// `And(p1, p2)` — logical AND of two predicates.
1261pub struct And<A, B>(pub A, pub B);
1262
1263impl<A: Predicate, B: Predicate> Predicate for And<A, B> {
1264    fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1265        self.0.test(data, reader, off) && self.1.test(data, reader, off)
1266    }
1267}
1268
1269/// `Or(p1, p2)` — logical OR of two predicates.
1270pub struct Or<A, B>(pub A, pub B);
1271
1272impl<A: Predicate, B: Predicate> Predicate for Or<A, B> {
1273    fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1274        self.0.test(data, reader, off) || self.1.test(data, reader, off)
1275    }
1276}
1277
1278/// `Not(p)` — logical NOT.
1279pub struct Not<P>(pub P);
1280
1281impl<P: Predicate> Predicate for Not<P> {
1282    fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1283        !self.0.test(data, reader, off)
1284    }
1285}
1286
1287// ── Schema parser ─────────────────────────────────────────────────────────────
1288
1289pub(crate) fn parse_schema(data: &[u8], offset: usize) -> Result<(Vec<String>, Vec<u8>, usize)> {
1290    if offset + 2 > data.len() {
1291        return Err(NxsError::OutOfBounds);
1292    }
1293    let key_count = u16::from_le_bytes(
1294        data[offset..offset + 2]
1295            .try_into()
1296            .map_err(|_| NxsError::OutOfBounds)?,
1297    ) as usize;
1298    let mut pos = offset + 2;
1299
1300    if pos + key_count > data.len() {
1301        return Err(NxsError::OutOfBounds);
1302    }
1303    let sigils = data[pos..pos + key_count].to_vec();
1304    pos += key_count;
1305
1306    let mut keys = Vec::with_capacity(key_count);
1307    for _ in 0..key_count {
1308        let start = pos;
1309        while pos < data.len() && data[pos] != 0 {
1310            pos += 1;
1311        }
1312        if pos >= data.len() {
1313            return Err(NxsError::OutOfBounds);
1314        }
1315        keys.push(
1316            std::str::from_utf8(&data[start..pos])
1317                .map_err(|_| NxsError::ParseError("invalid utf-8 key".into()))?
1318                .to_owned(),
1319        );
1320        pos += 1; // skip null terminator
1321    }
1322    // align to 8 bytes
1323    if pos % 8 != 0 {
1324        pos += 8 - pos % 8;
1325    }
1326    Ok((keys, sigils, pos))
1327}
1328
1329// ── resolveSlot ───────────────────────────────────────────────────────────────
1330
/// Locate the value stored for `slot` in the NYXO object at `obj_offset`.
///
/// After the 8-byte object header comes a presence bitmask: each byte carries
/// seven slot bits (least-significant bit first) and uses its top bit as a
/// "more bytes follow" flag. The bitmask is followed by a table of little-endian
/// `u16` offsets, one per present slot, each relative to `obj_offset`.
///
/// Returns the absolute byte offset of the value, or `None` when the slot is not
/// present, the bitmask ends before reaching it, or any read falls outside `data`.
pub(crate) fn resolve_slot(data: &[u8], obj_offset: usize, slot: usize) -> Option<usize> {
    // Skip the NYXO magic (4 bytes) and the length (4 bytes).
    let mut cursor = obj_offset.checked_add(8)?;
    // Slot number described by the next bit to be inspected.
    let mut next_slot: usize = 0;
    // Present slots seen before `slot`; this is the value's index in the offset table.
    let mut earlier_present: usize = 0;

    // Walk mask bytes until the one holding `slot`'s bit has been processed.
    let mut mask_byte = loop {
        let byte = *data.get(cursor)?;
        cursor = cursor.checked_add(1)?;

        let mut hit = false;
        for shift in 0..7usize {
            let present = (byte >> shift) & 1 == 1;
            if next_slot == slot {
                if !present {
                    return None;
                }
                hit = true;
            } else if next_slot < slot && present {
                earlier_present = earlier_present.checked_add(1)?;
            }
            next_slot = next_slot.checked_add(1)?;
        }

        if hit {
            break byte;
        }
        // The mask ended before it reached `slot`.
        if byte & 0x80 == 0 {
            return None;
        }
    };

    // The offset table starts after the final mask byte; step over any that remain.
    while mask_byte & 0x80 != 0 {
        mask_byte = *data.get(cursor)?;
        cursor = cursor.checked_add(1)?;
    }

    let entry_start = cursor.checked_add(earlier_present.checked_mul(2)?)?;
    let entry_end = entry_start.checked_add(2)?;
    let entry = data.get(entry_start..entry_end)?;
    let rel = usize::from(u16::from_le_bytes([entry[0], entry[1]]));
    obj_offset.checked_add(rel)
}
1375
1376// ── Tests ─────────────────────────────────────────────────────────────────────
1377
// Unit tests for the reader, record accessors, and predicate iterators.
// Fixtures are built in memory by the helpers below; only
// `columnar_conformance_vector_col_sum` reads a file from disk.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::writer::{NxsWriter, Schema};

    /// Build a five-record buffer with keys `id`, `username`, `score`, `active`
    /// using `NxsWriter`.
    fn make_nxb() -> Vec<u8> {
        let schema = Schema::new(&["id", "username", "score", "active"]);
        let mut w = NxsWriter::new(&schema);
        for (id, name, score, active) in [
            (1i64, "alice", 95.0f64, true),
            (2i64, "bob", 42.0f64, false),
            (3i64, "carol", 88.0f64, true),
            (4i64, "dave", 15.0f64, false),
            (5i64, "eve", 77.0f64, true),
        ] {
            w.begin_object();
            w.write_i64(crate::writer::Slot(0), id);
            w.write_str(crate::writer::Slot(1), name);
            w.write_f64(crate::writer::Slot(2), score);
            w.write_bool(crate::writer::Slot(3), active);
            w.end_object();
        }
        w.finish()
    }

    // The reader reports the record count and the keys in schema order.
    #[test]
    fn reader_opens_and_counts() {
        let data = make_nxb();
        let r = Reader::new(&data).unwrap();
        assert_eq!(r.record_count(), 5);
        assert_eq!(r.keys(), &["id", "username", "score", "active"]);
    }

    // Random access by index returns every field of that record.
    #[test]
    fn record_access_by_index() {
        let data = make_nxb();
        let r = Reader::new(&data).unwrap();
        let rec = r.record(2).unwrap();
        assert_eq!(rec.get_str("username"), Some("carol"));
        assert_eq!(rec.get_i64("id"), Some(3));
        assert!((rec.get_f64("score").unwrap() - 88.0).abs() < 1e-9);
        assert_eq!(rec.get_bool("active"), Some(true));
    }

    // `all()` yields one item per record.
    #[test]
    fn all_iterates_every_record() {
        let data = make_nxb();
        let r = Reader::new(&data).unwrap();
        assert_eq!(r.all().count(), 5);
    }

    // Boolean equality keeps matching records in file order.
    #[test]
    fn where_eq_bool() {
        let data = make_nxb();
        let r = Reader::new(&data).unwrap();
        let active: Vec<_> = r
            .where_pred(eq("active", true))
            .map(|rec| rec.get_str("username").unwrap().to_owned())
            .collect();
        assert_eq!(active, vec!["alice", "carol", "eve"]);
    }

    // Strict greater-than on an f64 field.
    #[test]
    fn where_gt_f64() {
        let data = make_nxb();
        let r = Reader::new(&data).unwrap();
        let count = r.where_pred(gt("score", 80.0f64)).count();
        assert_eq!(count, 2); // alice(95) + carol(88)
    }

    // Strict less-than on an f64 field.
    #[test]
    fn where_lt_f64() {
        let data = make_nxb();
        let r = Reader::new(&data).unwrap();
        let count = r.where_pred(lt("score", 50.0f64)).count();
        assert_eq!(count, 2); // bob(42) + dave(15)
    }

    // `And` requires both sub-predicates to match.
    #[test]
    fn where_and() {
        let data = make_nxb();
        let r = Reader::new(&data).unwrap();
        let count = r
            .where_pred(And(eq("active", true), gt("score", 80.0f64)))
            .count();
        assert_eq!(count, 2); // alice + carol
    }

    // `Or` requires at least one sub-predicate to match.
    #[test]
    fn where_or() {
        let data = make_nxb();
        let r = Reader::new(&data).unwrap();
        let count = r
            .where_pred(Or(gt("score", 90.0f64), lt("score", 20.0f64)))
            .count();
        assert_eq!(count, 2); // alice(95) + dave(15)
    }

    // `Not` inverts the wrapped predicate.
    #[test]
    fn where_not() {
        let data = make_nxb();
        let r = Reader::new(&data).unwrap();
        let count = r.where_pred(Not(eq("active", true))).count();
        assert_eq!(count, 2); // bob + dave
    }

    // Taking only the first item returns the first record.
    #[test]
    fn early_termination() {
        let data = make_nxb();
        let r = Reader::new(&data).unwrap();
        let first = r.all().next().unwrap();
        assert_eq!(first.get_str("username"), Some("alice"));
    }

    // A key that is not in the schema never matches.
    #[test]
    fn unknown_key_matches_nothing() {
        let data = make_nxb();
        let r = Reader::new(&data).unwrap();
        assert_eq!(r.where_pred(eq("nonexistent", true)).count(), 0);
    }

    // A path without dots reads a top-level field.
    #[test]
    fn get_str_path_single_segment() {
        let data = make_nxb();
        let r = Reader::new(&data).unwrap();
        let rec = r.record(0).unwrap();
        assert_eq!(rec.get_str_path("username"), Some("alice"));
    }

    // A path whose segments are not known keys yields `None`.
    #[test]
    fn get_str_path_absent_returns_none() {
        let data = make_nxb();
        let r = Reader::new(&data).unwrap();
        let rec = r.record(0).unwrap();
        assert_eq!(rec.get_str_path("no.such.path"), None);
    }

    /// Build a five-record columnar buffer with keys `id`, `score`, `active`
    /// using `finish_columnar`.
    fn make_columnar_nxb() -> Vec<u8> {
        use crate::layout::{finish_columnar, Cell, RecordRow};
        let keys = vec!["id".to_string(), "score".to_string(), "active".to_string()];
        let rows: Vec<RecordRow> = vec![
            RecordRow {
                cells: vec![Cell::I64(1), Cell::F64(95.0), Cell::Bool(true)],
            },
            RecordRow {
                cells: vec![Cell::I64(2), Cell::F64(42.0), Cell::Bool(false)],
            },
            RecordRow {
                cells: vec![Cell::I64(3), Cell::F64(88.0), Cell::Bool(true)],
            },
            RecordRow {
                cells: vec![Cell::I64(4), Cell::F64(15.0), Cell::Bool(false)],
            },
            RecordRow {
                cells: vec![Cell::I64(5), Cell::F64(77.0), Cell::Bool(true)],
            },
        ];
        finish_columnar(&keys, &rows).unwrap()
    }

    // Iteration and predicates on a columnar buffer address records by index.
    #[test]
    fn columnar_where_pred_iterates_correctly() {
        let data = make_columnar_nxb();
        let r = Reader::new(&data).unwrap();
        assert_eq!(r.layout(), Layout::Columnar);
        assert_eq!(r.record_count(), 5);

        // Test all() iterates every record without reading garbage
        assert_eq!(r.all().count(), 5);

        // Test where_pred with eq on bool (Layout::Columnar must not use row tail-index)
        let active_ids: Vec<i64> = r
            .where_pred(eq("active", true))
            .filter_map(|rec| rec.get_i64("id"))
            .collect();
        assert_eq!(active_ids, vec![1, 3, 5]);

        // Test where_pred with gt on f64
        let high_score_ids: Vec<i64> = r
            .where_pred(gt("score", 80.0f64))
            .filter_map(|rec| rec.get_i64("id"))
            .collect();
        assert_eq!(high_score_ids, vec![1, 3]);

        // Test that record values are correct (not garbage from a bad offset)
        let rec = r.record(2).unwrap();
        assert_eq!(rec.get_i64("id"), Some(3));
        assert!((rec.get_f64("score").unwrap() - 88.0).abs() < 1e-9);
        assert_eq!(rec.get_bool("active"), Some(true));
    }

    // Regression test: paths with more than eight segments are split segment by segment.
    #[test]
    fn walk_path_deep_segments_returns_none_not_wrong_key() {
        // Before the fix, splitn(8, '.') would concatenate segments 8+ into "seg8.seg9..."
        // and try to look up a key with a literal dot in its name — returning None silently
        // rather than traversing correctly. With plain split('.') every segment is looked
        // up individually, so a path with 9 non-existent segments correctly returns None.
        let data = make_nxb();
        let r = Reader::new(&data).unwrap();
        let rec = r.record(0).unwrap();

        // 9-segment path — all segments non-existent, must return None (not garbage)
        let deep = "a.b.c.d.e.f.g.h.i"; // 9 segments
        assert_eq!(rec.get_str_path(deep), None);
        assert_eq!(rec.get_i64_path(deep), None);

        // 10-segment path
        let deeper = "a.b.c.d.e.f.g.h.i.j"; // 10 segments
        assert_eq!(rec.get_str_path(deeper), None);

        // Existing single-level key at depth 1 must still work
        assert_eq!(rec.get_str_path("username"), Some("alice"));
    }

    // Checks column aggregation against a conformance vector on disk.
    // NOTE: the test returns early (and therefore passes) when the file cannot be read.
    #[test]
    fn columnar_conformance_vector_col_sum() {
        let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../conformance/columnar_flat8_dense_100.nxb");
        let data = match std::fs::read(&path) {
            Ok(d) => d,
            Err(_) => return,
        };
        let r = Reader::new(&data).unwrap();
        assert_eq!(r.layout(), Layout::Columnar);
        assert_eq!(r.record_count(), 100);
        let sum = r.col_sum_f64("score").unwrap();
        let want: f64 = (0..100).map(|i| i as f64 * 0.5).sum();
        assert!((sum - want).abs() < 1e-9, "sum {sum} want {want}");
        let buf = r.col_buffer("score").unwrap();
        assert_eq!(buf.len(), 100 * 8);
    }

    // String columns survive a columnar write/read round trip, both through
    // record accessors and through the raw variable-width column parts.
    #[test]
    fn columnar_strings_roundtrip() {
        use crate::layout::{finish_columnar, null_bitmap_bytes, var_str_at, Cell, RecordRow};

        let keys = vec!["id".into(), "name".into(), "score".into()];
        let mut rows = Vec::new();
        for i in 0..100usize {
            rows.push(RecordRow {
                cells: vec![
                    Cell::I64(i as i64),
                    Cell::Str(format!("user_{i}")),
                    Cell::F64(i as f64 * 1.25),
                ],
            });
        }
        let bytes = finish_columnar(&keys, &rows).unwrap();
        let r = Reader::new(&bytes).unwrap();
        assert_eq!(r.record_count(), 100);
        for i in 0..100 {
            let rec = r.record(i).unwrap();
            assert_eq!(rec.get_i64("id"), Some(i as i64));
            let want = format!("user_{i}");
            assert_eq!(rec.get_str("name"), Some(want.as_str()));
            assert!((rec.get_f64("score").unwrap() - i as f64 * 1.25).abs() < 1e-9);
        }
        let (bm, offsets, values) = r.col_field_var_parts(1).unwrap();
        assert_eq!(bm.len(), null_bitmap_bytes(100));
        assert_eq!(offsets.len(), 101 * 4);
        assert!(!values.is_empty());
        assert_eq!(var_str_at(offsets, values, 42), Some("user_42"));
    }

    // PAX with 128 records per page: spot-check records on both sides of page boundaries.
    #[test]
    fn pax_strings_roundtrip_across_pages() {
        use crate::layout::{finish_pax, Cell, RecordRow};

        let keys = vec!["id".into(), "name".into(), "score".into()];
        let rows: Vec<RecordRow> = (0..300usize)
            .map(|i| RecordRow {
                cells: vec![
                    Cell::I64(i as i64),
                    Cell::Str(format!("user_{i}")),
                    Cell::F64(i as f64),
                ],
            })
            .collect();
        let bytes = finish_pax(&keys, &rows, 128).unwrap();
        let r = Reader::new(&bytes).unwrap();
        assert_eq!(r.record_count(), 300);
        for i in [0usize, 127, 128, 257, 299] {
            let rec = r.record(i).unwrap();
            let want = format!("user_{i}");
            assert_eq!(rec.get_str("name"), Some(want.as_str()));
            assert_eq!(rec.get_i64("id"), Some(i as i64));
        }
    }
}