1use crate::column_prefetch::ColumnWarmState;
17use crate::error::{NxsError, Result};
18use crate::layout::{
19 col_var_parts, column_sector_len, is_var_sigil, null_bitmap_bytes, var_str_at,
20};
21use crate::prefetch::PrefetchEngine;
22
23pub use crate::prefetch::{AccessHint, CacheStats, OpenOptions};
24
25use crate::consts::{
27 FLAG_COLUMNAR, FLAG_PAX, FLAG_SCHEMA_EMBEDDED, MAGIC_FILE, MAGIC_FOOTER, MAGIC_OBJ,
28};
29
/// Physical record layout of an opened file.
///
/// `Reader::new` selects the variant from the header flag bits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Layout {
    /// Chosen when neither the columnar nor the PAX flag is set.
    Row,
    /// Chosen when the columnar flag is set.
    Columnar,
    /// Chosen when the PAX flag is set.
    Pax,
}
37
/// Byte stride between consecutive entries of the PAX tail index.
///
/// Within an entry this file reads the page's first record number (`u64` at +4),
/// its record count (`u32` at +12) and the page byte offset (`u64` at +16).
const PAX_TAIL_ENTRY_BYTES: usize = 28;
40
41fn footer_size(flags: u16) -> usize {
42 if flags & FLAG_PAX != 0 {
43 28
44 } else if flags & FLAG_COLUMNAR != 0 {
45 20
46 } else {
47 12
48 }
49}
50
/// Returns whether bit `rec` is set in the bitmap `bm`.
///
/// Bits are numbered least-significant first within each byte. An index that
/// falls past the end of `bm` reads as "not set" instead of panicking, because
/// `rec` can originate from callers of the public `Predicate::test`.
fn col_bit(bm: &[u8], rec: usize) -> bool {
    bm.get(rec / 8)
        .map_or(false, |&byte| (byte >> (rec % 8)) & 1 == 1)
}
54
/// Zero-copy reader over a borrowed byte buffer holding one file.
pub struct Reader<'a> {
    /// The complete file contents.
    data: &'a [u8],
    /// Schema key names; empty when the file embeds no schema.
    keys: Vec<String>,
    /// One type sigil byte per key, parallel to `keys`.
    key_sigils: Vec<u8>,
    /// Key name to slot index (position in `keys`).
    key_index: std::collections::HashMap<String, usize>,
    /// Number of records declared by the file.
    record_count: usize,
    /// Row layout: byte offset of the first tail index entry.
    /// Columnar / PAX: the tail pointer read from the footer.
    tail_start: usize,
    /// Layout selected from the header flags.
    layout: Layout,
    /// Per-slot column sector byte offsets (columnar layout only, else empty).
    col_buf_off: Vec<u64>,
    /// Per-slot column sector byte lengths (columnar layout only, else empty).
    col_buf_len: Vec<u64>,
    /// Prefetch engine; only installed by `with_options` for row-layout files.
    prefetch: Option<PrefetchEngine>,
    /// Column warm-up bookkeeping used by `prefetch_column` and `cache_stats`.
    column: ColumnWarmState,
}
74
75impl<'a> Reader<'a> {
76 pub fn new(data: &'a [u8]) -> Result<Self> {
78 if data.len() < 32 {
79 return Err(NxsError::OutOfBounds);
80 }
81 if u32::from_le_bytes(data[0..4].try_into().map_err(|_| NxsError::OutOfBounds)?)
82 != MAGIC_FILE
83 {
84 return Err(NxsError::BadMagic);
85 }
86 if u32::from_le_bytes(
87 data[data.len() - 4..]
88 .try_into()
89 .map_err(|_| NxsError::OutOfBounds)?,
90 ) != MAGIC_FOOTER
91 {
92 return Err(NxsError::BadMagic);
93 }
94
95 let flags = u16::from_le_bytes(data[6..8].try_into().map_err(|_| NxsError::OutOfBounds)?);
96 if flags & FLAG_COLUMNAR != 0 && flags & FLAG_PAX != 0 {
97 return Err(NxsError::InvalidFlags);
98 }
99 let preamble_tail =
100 u64::from_le_bytes(data[16..24].try_into().map_err(|_| NxsError::OutOfBounds)?);
101 if flags & FLAG_COLUMNAR != 0 && preamble_tail == 0 {
102 return Err(NxsError::IncompatibleFlags);
103 }
104
105 let (keys, key_sigils, _schema_end) = if flags & FLAG_SCHEMA_EMBEDDED != 0 {
106 parse_schema(data, 32)?
107 } else {
108 (vec![], vec![], 32)
109 };
110
111 let key_index: std::collections::HashMap<String, usize> = keys
112 .iter()
113 .enumerate()
114 .map(|(i, k)| (k.clone(), i))
115 .collect();
116
117 let (layout, record_count, tail_start, col_buf_off, col_buf_len) = if flags & FLAG_COLUMNAR
118 != 0
119 {
120 let footer = footer_size(flags);
121 let fo = data.len() - footer;
122 let tail_ptr = u64::from_le_bytes(
123 data[fo..fo + 8]
124 .try_into()
125 .map_err(|_| NxsError::OutOfBounds)?,
126 ) as usize;
127 let record_count = u64::from_le_bytes(
128 data[fo + 8..fo + 16]
129 .try_into()
130 .map_err(|_| NxsError::OutOfBounds)?,
131 ) as usize;
132 let kc = keys.len();
133 let tail_end = tail_ptr
134 .checked_add(kc.checked_mul(20).ok_or(NxsError::OutOfBounds)?)
135 .ok_or(NxsError::OutOfBounds)?;
136 if tail_ptr >= fo || tail_end > fo {
137 return Err(NxsError::OutOfBounds);
138 }
139 let mut off = vec![0u64; kc];
140 let mut len = vec![0u64; kc];
141 for i in 0..kc {
142 let e = tail_ptr + i * 20;
143 let fid = u16::from_le_bytes(
144 data[e..e + 2]
145 .try_into()
146 .map_err(|_| NxsError::OutOfBounds)?,
147 ) as usize;
148 if fid >= kc {
149 return Err(NxsError::OutOfBounds);
150 }
151 off[fid] = u64::from_le_bytes(
152 data[e + 4..e + 12]
153 .try_into()
154 .map_err(|_| NxsError::OutOfBounds)?,
155 );
156 len[fid] = u64::from_le_bytes(
157 data[e + 12..e + 20]
158 .try_into()
159 .map_err(|_| NxsError::OutOfBounds)?,
160 );
161 }
162 (Layout::Columnar, record_count, tail_ptr, off, len)
163 } else if flags & FLAG_PAX != 0 {
164 let footer = footer_size(flags);
165 let fo = data.len() - footer;
166 let tail_ptr = u64::from_le_bytes(
167 data[fo..fo + 8]
168 .try_into()
169 .map_err(|_| NxsError::OutOfBounds)?,
170 ) as usize;
171 let record_count = u64::from_le_bytes(
172 data[fo + 8..fo + 16]
173 .try_into()
174 .map_err(|_| NxsError::OutOfBounds)?,
175 ) as usize;
176 (Layout::Pax, record_count, tail_ptr, vec![], vec![])
177 } else {
178 let mut tail_ptr = usize::try_from(preamble_tail).map_err(|_| NxsError::OutOfBounds)?;
179 if tail_ptr == 0 {
180 if data.len() < 44 {
181 return Err(NxsError::OutOfBounds);
182 }
183 tail_ptr = u64::from_le_bytes(
184 data[data.len() - 12..data.len() - 4]
185 .try_into()
186 .map_err(|_| NxsError::OutOfBounds)?,
187 ) as usize;
188 }
189 if tail_ptr > data.len().saturating_sub(4) {
190 return Err(NxsError::OutOfBounds);
191 }
192 let record_count =
193 u32::from_le_bytes(data[tail_ptr..tail_ptr + 4].try_into().unwrap()) as usize;
194 (Layout::Row, record_count, tail_ptr + 4, vec![], vec![])
195 };
196
197 Ok(Self {
198 data,
199 keys,
200 key_sigils,
201 key_index,
202 record_count,
203 tail_start,
204 layout,
205 col_buf_off,
206 col_buf_len,
207 prefetch: None,
208 column: ColumnWarmState::default(),
209 })
210 }
211
212 pub fn with_options(data: &'a [u8], options: OpenOptions) -> Result<Self> {
214 options.validate()?;
215 let mut reader = Self::new(data)?;
216 if reader.layout == Layout::Row {
217 let prefetch = PrefetchEngine::new(options, data.len());
218 if prefetch.strategy() == crate::prefetch::PrefetchStrategy::Eager {
219 prefetch.start_eager_background(data.to_vec(), reader.tail_start);
220 }
221 reader.prefetch = Some(prefetch);
222 }
223 Ok(reader)
224 }
225
226 pub fn warmup(&self) {
228 if let Some(prefetch) = &self.prefetch {
229 prefetch.warmup();
230 }
231 }
232
233 pub fn pause_prefetch(&self) {
235 if let Some(prefetch) = &self.prefetch {
236 prefetch.pause_prefetch();
237 }
238 }
239
240 pub fn resume_prefetch(&self) {
242 if let Some(prefetch) = &self.prefetch {
243 prefetch.resume_prefetch();
244 }
245 }
246
247 pub fn prefetch_column(&self, key: &str) -> Result<()> {
249 if self.layout != Layout::Columnar {
250 return Err(NxsError::UnsupportedLayout);
251 }
252 let slot = *self
253 .key_index
254 .get(key)
255 .ok_or_else(|| NxsError::ParseError(format!("key not found: {key}")))?;
256 let off = *self.col_buf_off.get(slot).ok_or(NxsError::OutOfBounds)? as usize;
257 let len = *self.col_buf_len.get(slot).ok_or(NxsError::OutOfBounds)? as usize;
258 let end = off.checked_add(len).ok_or(NxsError::OutOfBounds)?;
259 if end > self.data.len() {
260 return Err(NxsError::OutOfBounds);
261 }
262 if self.column.prefetch(slot) {
263 const PAGE: usize = 4096;
264 let sector = &self.data[off..end];
265 for page_start in (0..sector.len()).step_by(PAGE) {
266 std::hint::black_box(sector[page_start]);
267 }
268 }
269 Ok(())
270 }
271
272 pub fn prefetch_viewport(&self, start_index: usize, end_index: usize) -> Result<()> {
274 if self.layout != Layout::Row {
275 return Ok(());
276 }
277 if let Some(prefetch) = &self.prefetch {
278 prefetch.prefetch_viewport(
279 self.data,
280 self.tail_start,
281 self.record_count,
282 start_index,
283 end_index,
284 );
285 }
286 Ok(())
287 }
288
289 pub fn cache_stats(&self) -> CacheStats {
292 let mut stats = if let Some(prefetch) = &self.prefetch {
293 prefetch.cache_stats()
294 } else {
295 CacheStats {
296 pages_cached: 0,
297 pages_max: 0,
298 memory_used_bytes: 0,
299 cache_hits: 0,
300 cache_misses: 0,
301 fetches_issued: 0,
302 column_fetches_issued: 0,
303 strategy: "disabled".to_string(),
304 pattern: "unknown".to_string(),
305 }
306 };
307 if self.layout == Layout::Columnar {
308 stats.column_fetches_issued = self.column.fetches();
309 }
310 stats
311 }
312
313 fn touch_record_page(&self, index: usize) {
314 if self.layout != Layout::Row {
315 return;
316 }
317 let Some(prefetch) = &self.prefetch else {
318 return;
319 };
320 prefetch.on_access(self.data, self.tail_start, self.record_count, index);
321 }
322
    /// Returns the layout detected when the file was opened.
    pub fn layout(&self) -> Layout {
        self.layout
    }
327
328 pub fn col_sum_f64(&self, key: &str) -> Option<f64> {
330 let slot = self.slot(key)?;
331 match self.layout {
332 Layout::Row => {
333 let mut sum = 0.0;
334 let mut any = false;
335 for rec in self.all() {
336 if let Some(v) = rec.get_f64(key) {
337 sum += v;
338 any = true;
339 }
340 }
341 any.then_some(sum)
342 }
343 Layout::Columnar => {
344 let (bm, vals) = self.col_field_parts(slot).ok()?;
345 Some(crate::col_reduce::sum_f64_column(
346 vals,
347 bm,
348 self.record_count,
349 ))
350 }
351 Layout::Pax => {
352 let mut sum = 0.0;
353 for i in 0..self.record_count {
354 if let Some(v) = self.pax_get_f64(i, slot) {
355 sum += v;
356 }
357 }
358 Some(sum)
359 }
360 }
361 }
362
363 pub fn col_buffer(&self, key: &str) -> Option<&[u8]> {
365 if self.layout != Layout::Columnar {
366 return None;
367 }
368 let slot = self.slot(key)?;
369 if is_var_sigil(self.key_sigils.get(slot).copied().unwrap_or(0)) {
370 return None;
371 }
372 let (_, vals) = self.col_field_parts(slot).ok()?;
373 Some(vals)
374 }
375
376 pub fn col_var_buffer(&self, key: &str) -> Result<crate::arrow_project::VarColumnView<'_>> {
378 if self.layout != Layout::Columnar {
379 return Err(NxsError::UnsupportedFieldType);
380 }
381 let slot = self.slot(key).ok_or(NxsError::OutOfBounds)?;
382 if !is_var_sigil(self.key_sigils.get(slot).copied().unwrap_or(0)) {
383 return Err(NxsError::UnsupportedFieldType);
384 }
385 let (bm, offsets, values) = self.col_field_var_parts(slot)?;
386 Ok(crate::arrow_project::VarColumnView {
387 null_bitmap: bm,
388 offsets,
389 values,
390 record_count: self.record_count,
391 })
392 }
393
394 fn pax_column_sector(&self, page_idx: usize, slot: usize) -> Result<&[u8]> {
395 const MAGIC_PAGE: u32 = 0x4E58_5350;
396 let e = page_idx
397 .checked_mul(PAX_TAIL_ENTRY_BYTES)
398 .and_then(|n| self.tail_start.checked_add(n))
399 .ok_or(NxsError::OutOfBounds)?;
400 let page_off_start = e.checked_add(16).ok_or(NxsError::OutOfBounds)?;
401 let page_off_end = e.checked_add(24).ok_or(NxsError::OutOfBounds)?;
402 let poff = u64::from_le_bytes(
403 self.data
404 .get(page_off_start..page_off_end)
405 .ok_or(NxsError::OutOfBounds)?
406 .try_into()
407 .map_err(|_| NxsError::OutOfBounds)?,
408 ) as usize;
409 if poff > self.data.len().saturating_sub(24) {
410 return Err(NxsError::OutOfBounds);
411 }
412 if u32::from_le_bytes(
413 self.data[poff..poff + 4]
414 .try_into()
415 .map_err(|_| NxsError::OutOfBounds)?,
416 ) != MAGIC_PAGE
417 {
418 return Err(NxsError::InvalidPageMagic);
419 }
420 let rc = u32::from_le_bytes(
421 self.data[poff + 16..poff + 20]
422 .try_into()
423 .map_err(|_| NxsError::OutOfBounds)?,
424 ) as usize;
425 let field_count = u16::from_le_bytes(
426 self.data[poff + 20..poff + 22]
427 .try_into()
428 .map_err(|_| NxsError::OutOfBounds)?,
429 ) as usize;
430 if slot >= field_count {
431 return Err(NxsError::OutOfBounds);
432 }
433 let mut body = poff.checked_add(24).ok_or(NxsError::OutOfBounds)?;
434 for fi in 0..slot {
435 if body > self.data.len() {
436 return Err(NxsError::OutOfBounds);
437 }
438 let sig = self.key_sigils.get(fi).copied().unwrap_or(b'=');
439 let slen = column_sector_len(&self.data[body..], rc, sig)?;
440 body = body.checked_add(slen).ok_or(NxsError::OutOfBounds)?;
441 }
442 if body > self.data.len() {
443 return Err(NxsError::OutOfBounds);
444 }
445 let sig = self.key_sigils.get(slot).copied().unwrap_or(b'=');
446 let slen = column_sector_len(&self.data[body..], rc, sig)?;
447 if body > self.data.len().saturating_sub(slen) {
448 return Err(NxsError::OutOfBounds);
449 }
450 Ok(&self.data[body..body + slen])
451 }
452
453 fn pax_page_field_var_parts(
454 &self,
455 page_idx: usize,
456 slot: usize,
457 ) -> Result<(&[u8], &[u8], &[u8])> {
458 let sector = self.pax_column_sector(page_idx, slot)?;
459 let rc = self
460 .pax_page_rec_count(page_idx)
461 .ok_or(NxsError::OutOfBounds)? as usize;
462 col_var_parts(sector, rc)
463 }
464
465 fn pax_locate_record(&self, record_index: usize) -> Option<(usize, usize)> {
466 let mut lo = 0i32;
467 let mut hi = self.page_count().saturating_sub(1) as i32;
468 while lo <= hi {
469 let mid = ((lo + hi) / 2) as usize;
470 let start = self.pax_page_rec_start(mid)?;
471 let count = self.pax_page_rec_count(mid)?;
472 if (record_index as u64) < start {
473 hi = mid as i32 - 1;
474 } else if record_index >= start as usize + count as usize {
475 lo = mid as i32 + 1;
476 } else {
477 let local = record_index - start as usize;
478 return Some((mid, local));
479 }
480 }
481 None
482 }
483
484 fn pax_get_f64(&self, record_index: usize, slot: usize) -> Option<f64> {
485 let (pi, local) = self.pax_locate_record(record_index)?;
486 if is_var_sigil(*self.key_sigils.get(slot)?) {
487 return None;
488 }
489 let (bm, vals) = self.pax_page_field_parts(pi, slot).ok()?;
490 if !col_bit(bm, local) {
491 return None;
492 }
493 let off = local * 8;
494 Some(f64::from_le_bytes(vals.get(off..off + 8)?.try_into().ok()?))
495 }
496
497 fn pax_get_i64(&self, record_index: usize, slot: usize) -> Option<i64> {
498 let (pi, local) = self.pax_locate_record(record_index)?;
499 if is_var_sigil(*self.key_sigils.get(slot)?) {
500 return None;
501 }
502 let (bm, vals) = self.pax_page_field_parts(pi, slot).ok()?;
503 if !col_bit(bm, local) {
504 return None;
505 }
506 let off = local * 8;
507 Some(i64::from_le_bytes(vals.get(off..off + 8)?.try_into().ok()?))
508 }
509
510 fn pax_get_bool(&self, record_index: usize, slot: usize) -> Option<bool> {
511 let (pi, local) = self.pax_locate_record(record_index)?;
512 if is_var_sigil(*self.key_sigils.get(slot)?) {
513 return None;
514 }
515 let (bm, vals) = self.pax_page_field_parts(pi, slot).ok()?;
516 if !col_bit(bm, local) {
517 return None;
518 }
519 Some(vals.get(local * 8)? != &0)
520 }
521
522 fn pax_get_str(&self, record_index: usize, slot: usize) -> Option<&str> {
523 let (pi, local) = self.pax_locate_record(record_index)?;
524 if self.key_sigils.get(slot).copied() != Some(b'"') {
525 return None;
526 }
527 let (bm, offsets, values) = self.pax_page_field_var_parts(pi, slot).ok()?;
528 if !col_bit(bm, local) {
529 return None;
530 }
531 var_str_at(offsets, values, local)
532 }
533
534 fn page_count(&self) -> usize {
535 if self.layout != Layout::Pax {
536 return 0;
537 }
538 let tp = self.tail_start;
539 if tp > self.data.len().saturating_sub(4) {
540 return 0;
541 }
542 let fo = self.data.len() - footer_size(FLAG_PAX);
544 u32::from_le_bytes(self.data[fo + 16..fo + 20].try_into().unwrap_or([0; 4])) as usize
545 }
546
547 fn pax_page_rec_start(&self, page_idx: usize) -> Option<u64> {
548 let e = page_idx
549 .checked_mul(PAX_TAIL_ENTRY_BYTES)
550 .and_then(|n| self.tail_start.checked_add(n))?;
551 let start = e.checked_add(4)?;
552 let end = e.checked_add(12)?;
553 Some(u64::from_le_bytes(
554 self.data.get(start..end)?.try_into().ok()?,
555 ))
556 }
557
558 fn pax_page_rec_count(&self, page_idx: usize) -> Option<u32> {
559 let e = page_idx
560 .checked_mul(PAX_TAIL_ENTRY_BYTES)
561 .and_then(|n| self.tail_start.checked_add(n))?;
562 let start = e.checked_add(12)?;
563 let end = e.checked_add(16)?;
564 Some(u32::from_le_bytes(
565 self.data.get(start..end)?.try_into().ok()?,
566 ))
567 }
568
569 fn pax_page_field_parts(&self, page_idx: usize, slot: usize) -> Result<(&[u8], &[u8])> {
570 let sector = self.pax_column_sector(page_idx, slot)?;
571 let rc = self
572 .pax_page_rec_count(page_idx)
573 .ok_or(NxsError::OutOfBounds)? as usize;
574 let bm_len = null_bitmap_bytes(rc);
575 if sector.len() < bm_len {
576 return Err(NxsError::OutOfBounds);
577 }
578 let vals_end = bm_len + rc * 8;
579 if sector.len() < vals_end {
580 return Err(NxsError::OutOfBounds);
581 }
582 Ok((§or[..bm_len], §or[bm_len..vals_end]))
583 }
584
585 pub fn col_field_var_parts(&self, slot: usize) -> Result<(&[u8], &[u8], &[u8])> {
587 let off = *self.col_buf_off.get(slot).ok_or(NxsError::OutOfBounds)? as usize;
588 let len = *self.col_buf_len.get(slot).ok_or(NxsError::OutOfBounds)? as usize;
589 let end = off.checked_add(len).ok_or(NxsError::OutOfBounds)?;
590 if end > self.data.len() {
591 return Err(NxsError::OutOfBounds);
592 }
593 col_var_parts(&self.data[off..end], self.record_count)
594 }
595
596 fn col_field_parts(&self, slot: usize) -> Result<(&[u8], &[u8])> {
597 if self
598 .key_sigils
599 .get(slot)
600 .copied()
601 .map(is_var_sigil)
602 .unwrap_or(false)
603 {
604 return Err(NxsError::UnsupportedFieldType);
605 }
606 let off = *self.col_buf_off.get(slot).ok_or(NxsError::OutOfBounds)? as usize;
607 let len = *self.col_buf_len.get(slot).ok_or(NxsError::OutOfBounds)? as usize;
608 let end = off.checked_add(len).ok_or(NxsError::OutOfBounds)?;
609 if end > self.data.len() {
610 return Err(NxsError::OutOfBounds);
611 }
612 let bm_len = null_bitmap_bytes(self.record_count);
613 let vals_len = self.record_count.saturating_mul(8);
614 let vals_end = bm_len.checked_add(vals_len).ok_or(NxsError::OutOfBounds)?;
615 if len < vals_end {
616 return Err(NxsError::OutOfBounds);
617 }
618 let sector = &self.data[off..end];
619 Ok((§or[..bm_len], §or[bm_len..vals_end]))
620 }
621
    /// Returns the number of records declared by the file.
    pub fn record_count(&self) -> usize {
        self.record_count
    }
626
    /// Returns the schema key names in slot order (empty without an embedded schema).
    pub fn keys(&self) -> &[String] {
        &self.keys
    }
631
    /// Returns the per-key type sigil bytes, parallel to `keys()`.
    pub fn key_sigils(&self) -> &[u8] {
        &self.key_sigils
    }
636
    /// Looks up the slot index of `key`, or `None` if the schema has no such key.
    pub fn slot(&self, key: &str) -> Option<usize> {
        self.key_index.get(key).copied()
    }
641
642 pub fn record(&self, i: usize) -> Option<Record<'a, '_>> {
644 if i >= self.record_count {
645 return None;
646 }
647 self.touch_record_page(i);
648 let offset = if self.layout == Layout::Row {
649 let entry = self.tail_start + i * 10;
650 u64::from_le_bytes(self.data.get(entry + 2..entry + 10)?.try_into().ok()?) as usize
651 } else {
652 i
653 };
654 Some(Record {
655 data: self.data,
656 reader: self,
657 offset,
658 })
659 }
660
661 pub fn all(&'a self) -> Records<'a, 'a, AlwaysTrue> {
663 Records {
664 reader: self,
665 pred: AlwaysTrue,
666 index: 0,
667 }
668 }
669
    /// Returns an iterator over the records for which `pred` tests true,
    /// starting at index 0.
    pub fn where_pred<P: Predicate>(&'a self, pred: P) -> Records<'a, 'a, P> {
        Records {
            reader: self,
            pred,
            index: 0,
        }
    }
678}
679
/// Handle to one record of a `Reader`.
pub struct Record<'data, 'reader> {
    /// The complete file contents (same buffer the reader borrows).
    data: &'data [u8],
    /// The reader this record came from.
    reader: &'reader Reader<'data>,
    /// Row layout: byte offset taken from the tail index entry.
    /// Columnar / PAX: the record index.
    offset: usize,
}
689
690impl<'data, 'reader> Record<'data, 'reader> {
    /// Resolves `slot` for this record via `resolve_slot`, passing the
    /// record's stored `offset` as the object offset.
    fn resolve(&self, slot: usize) -> Option<usize> {
        resolve_slot(self.data, self.offset, slot)
    }
695
696 pub fn get_i64(&self, key: &str) -> Option<i64> {
698 let slot = self.reader.slot(key)?;
699 match self.reader.layout {
700 Layout::Columnar => {
701 if is_var_sigil(*self.reader.key_sigils.get(slot)?) {
702 return None;
703 }
704 let ri = self.offset;
705 let (bm, vals) = self.reader.col_field_parts(slot).ok()?;
706 if !col_bit(bm, ri) {
707 return None;
708 }
709 let off = ri * 8;
710 Some(i64::from_le_bytes(vals.get(off..off + 8)?.try_into().ok()?))
711 }
712 Layout::Pax => self.reader.pax_get_i64(self.offset, slot),
713 Layout::Row => {
714 let off = self.resolve(slot)?;
715 Some(i64::from_le_bytes(
716 self.data.get(off..off + 8)?.try_into().ok()?,
717 ))
718 }
719 }
720 }
721
722 pub fn get_f64(&self, key: &str) -> Option<f64> {
724 let slot = self.reader.slot(key)?;
725 if self.reader.layout == Layout::Columnar {
726 if is_var_sigil(*self.reader.key_sigils.get(slot)?) {
727 return None;
728 }
729 let ri = self.offset;
730 let (bm, vals) = self.reader.col_field_parts(slot).ok()?;
731 if !col_bit(bm, ri) {
732 return None;
733 }
734 let off = ri * 8;
735 return Some(f64::from_le_bytes(vals.get(off..off + 8)?.try_into().ok()?));
736 }
737 if self.reader.layout == Layout::Pax {
738 return self.reader.pax_get_f64(self.offset, slot);
739 }
740 let off = self.resolve(slot)?;
741 Some(f64::from_le_bytes(
742 self.data.get(off..off + 8)?.try_into().ok()?,
743 ))
744 }
745
746 pub fn get_bool(&self, key: &str) -> Option<bool> {
748 let slot = self.reader.slot(key)?;
749 match self.reader.layout {
750 Layout::Columnar => {
751 if is_var_sigil(*self.reader.key_sigils.get(slot)?) {
752 return None;
753 }
754 let ri = self.offset;
755 let (bm, vals) = self.reader.col_field_parts(slot).ok()?;
756 if !col_bit(bm, ri) {
757 return None;
758 }
759 Some(vals.get(ri * 8)? != &0)
760 }
761 Layout::Pax => self.reader.pax_get_bool(self.offset, slot),
762 Layout::Row => {
763 let off = self.resolve(slot)?;
764 Some(*self.data.get(off)? != 0)
765 }
766 }
767 }
768
769 pub fn get_str(&self, key: &str) -> Option<&str> {
771 let slot = self.reader.slot(key)?;
772 match self.reader.layout {
773 Layout::Columnar => {
774 if self.reader.key_sigils.get(slot).copied() != Some(b'"') {
775 return None;
776 }
777 let ri = self.offset;
778 let (bm, offsets, values) = self.reader.col_field_var_parts(slot).ok()?;
779 if !col_bit(bm, ri) {
780 return None;
781 }
782 var_str_at(offsets, values, ri)
783 }
784 Layout::Pax => self.reader.pax_get_str(self.offset, slot),
785 Layout::Row => {
786 let off = self.resolve(slot)?;
787 let len =
788 u32::from_le_bytes(self.data.get(off..off + 4)?.try_into().ok()?) as usize;
789 let bytes = self.data.get(off + 4..off + 4 + len)?;
790 std::str::from_utf8(bytes).ok()
791 }
792 }
793 }
794
795 pub fn get_str_path(&self, dot_path: &str) -> Option<&str> {
798 let (leaf_off, data) = self.walk_path(dot_path)?;
799 let len = u32::from_le_bytes(data.get(leaf_off..leaf_off + 4)?.try_into().ok()?) as usize;
800 let bytes = data.get(leaf_off + 4..leaf_off + 4 + len)?;
801 std::str::from_utf8(bytes).ok()
802 }
803
804 pub fn get_i64_path(&self, dot_path: &str) -> Option<i64> {
806 let (off, data) = self.walk_path(dot_path)?;
807 Some(i64::from_le_bytes(data.get(off..off + 8)?.try_into().ok()?))
808 }
809
810 pub fn get_f64_path(&self, dot_path: &str) -> Option<f64> {
812 let (off, data) = self.walk_path(dot_path)?;
813 Some(f64::from_le_bytes(data.get(off..off + 8)?.try_into().ok()?))
814 }
815
816 pub fn get_bool_path(&self, dot_path: &str) -> Option<bool> {
818 let (off, data) = self.walk_path(dot_path)?;
819 Some(*data.get(off)? != 0)
820 }
821
822 fn walk_path(&self, dot_path: &str) -> Option<(usize, &'data [u8])> {
824 let mut parts = dot_path.split('.');
828 let mut obj_offset = self.offset;
829 let data = self.data;
830 let mut part = parts.next()?;
831 loop {
832 let slot = self.reader.slot(part)?;
833 let field_off = resolve_slot(data, obj_offset, slot)?;
834 match parts.next() {
835 None => return Some((field_off, data)),
836 Some(next) => {
837 let magic =
839 u32::from_le_bytes(data.get(field_off..field_off + 4)?.try_into().ok()?);
840 if magic != MAGIC_OBJ {
841 return None;
842 }
843 obj_offset = field_off;
844 part = next;
845 }
846 }
847 }
848 }
849}
850
/// Iterator over the records of a `Reader` that satisfy a predicate.
pub struct Records<'data, 'reader, P: Predicate> {
    /// The reader being scanned.
    reader: &'reader Reader<'data>,
    /// Filter applied to every candidate record.
    pred: P,
    /// Index of the next record to examine.
    index: usize,
}
860
861impl<'data, 'reader, P: Predicate> Iterator for Records<'data, 'reader, P> {
862 type Item = Record<'data, 'reader>;
863
864 fn next(&mut self) -> Option<Self::Item> {
865 let r = self.reader;
866 loop {
867 if self.index >= r.record_count {
868 return None;
869 }
870 let i = self.index;
871 self.index += 1;
872 let abs = match r.layout {
877 Layout::Row => {
878 let entry = r.tail_start + i * 10;
879 u64::from_le_bytes(r.data.get(entry + 2..entry + 10)?.try_into().ok()?) as usize
880 }
881 Layout::Columnar | Layout::Pax => i,
882 };
883 if self.pred.test(r.data, r, abs) {
884 return Some(Record {
885 data: r.data,
886 reader: r,
887 offset: abs,
888 });
889 }
890 }
891 }
892}
893
/// A filter evaluated against one record at a time.
pub trait Predicate {
    /// Returns whether the record matches.
    ///
    /// `obj_offset` is what the `Records` iterator passes: the record's byte
    /// offset for row files, its record index for columnar and PAX files.
    fn test(&self, data: &[u8], reader: &Reader<'_>, obj_offset: usize) -> bool;
}
900
/// Predicate that accepts every record.
pub struct AlwaysTrue;
impl Predicate for AlwaysTrue {
    /// Always returns `true`, ignoring all arguments.
    fn test(&self, _: &[u8], _: &Reader<'_>, _: usize) -> bool {
        true
    }
}
908
/// Predicate matching records whose field `key` equals `value`.
pub struct Eq<'k, V> {
    /// Schema key of the field to compare.
    pub key: &'k str,
    /// Value the field must equal.
    pub value: V,
}
914
/// Builds an equality predicate for `key` and `value`.
pub fn eq<'k, V>(key: &'k str, value: V) -> crate::query::Eq<'k, V> {
    crate::query::Eq { key, value }
}
918
919#[allow(dead_code)]
923fn row_field_offset(data: &[u8], reader: &Reader<'_>, off: usize, slot: usize) -> Option<usize> {
924 match reader.layout {
927 Layout::Row => resolve_slot(data, off, slot),
928 Layout::Columnar | Layout::Pax => None,
929 }
930}
931
932impl Predicate for Eq<'_, bool> {
933 fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
934 let Some(slot) = reader.slot(self.key) else {
935 return false;
936 };
937 match reader.layout {
938 Layout::Columnar => {
939 if is_var_sigil(*reader.key_sigils.get(slot).unwrap_or(&0)) {
940 return false;
941 }
942 let Ok((bm, vals)) = reader.col_field_parts(slot) else {
943 return false;
944 };
945 if !col_bit(bm, off) {
946 return false;
947 }
948 vals.get(off * 8)
949 .map(|&b| (b != 0) == self.value)
950 .unwrap_or(false)
951 }
952 Layout::Pax => reader
953 .pax_get_bool(off, slot)
954 .map(|v| v == self.value)
955 .unwrap_or(false),
956 Layout::Row => {
957 let Some(foff) = resolve_slot(data, off, slot) else {
958 return false;
959 };
960 data.get(foff)
961 .map(|&b| (b != 0) == self.value)
962 .unwrap_or(false)
963 }
964 }
965 }
966}
967
968impl<'k> Predicate for Eq<'k, &str> {
969 fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
970 let Some(slot) = reader.slot(self.key) else {
971 return false;
972 };
973 match reader.layout {
974 Layout::Columnar => reader
975 .col_field_var_parts(slot)
976 .ok()
977 .and_then(|(bm, offsets, values)| {
978 if !col_bit(bm, off) {
979 return None;
980 }
981 crate::layout::var_str_at(offsets, values, off)
982 })
983 .map(|s| s == self.value)
984 .unwrap_or(false),
985 Layout::Pax => reader
986 .pax_get_str(off, slot)
987 .map(|s| s == self.value)
988 .unwrap_or(false),
989 Layout::Row => {
990 let Some(foff) = resolve_slot(data, off, slot) else {
991 return false;
992 };
993 let Some(len_bytes) = data.get(foff..foff + 4) else {
994 return false;
995 };
996 let len = u32::from_le_bytes(len_bytes.try_into().unwrap()) as usize;
997 data.get(foff + 4..foff + 4 + len)
998 .and_then(|b| std::str::from_utf8(b).ok())
999 .map(|s| s == self.value)
1000 .unwrap_or(false)
1001 }
1002 }
1003 }
1004}
1005
1006impl Predicate for Eq<'_, i64> {
1007 fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1008 let Some(slot) = reader.slot(self.key) else {
1009 return false;
1010 };
1011 match reader.layout {
1012 Layout::Columnar => {
1013 if is_var_sigil(*reader.key_sigils.get(slot).unwrap_or(&0)) {
1014 return false;
1015 }
1016 let Ok((bm, vals)) = reader.col_field_parts(slot) else {
1017 return false;
1018 };
1019 if !col_bit(bm, off) {
1020 return false;
1021 }
1022 let o = off * 8;
1023 vals.get(o..o + 8)
1024 .and_then(|b| b.try_into().ok())
1025 .map(|b| i64::from_le_bytes(b) == self.value)
1026 .unwrap_or(false)
1027 }
1028 Layout::Pax => reader
1029 .pax_get_i64(off, slot)
1030 .map(|v| v == self.value)
1031 .unwrap_or(false),
1032 Layout::Row => {
1033 let Some(foff) = resolve_slot(data, off, slot) else {
1034 return false;
1035 };
1036 data.get(foff..foff + 8)
1037 .and_then(|b| b.try_into().ok())
1038 .map(|b| i64::from_le_bytes(b) == self.value)
1039 .unwrap_or(false)
1040 }
1041 }
1042 }
1043}
1044
1045impl Predicate for Eq<'_, f64> {
1046 fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1047 let Some(slot) = reader.slot(self.key) else {
1048 return false;
1049 };
1050 match reader.layout {
1051 Layout::Columnar => {
1052 if is_var_sigil(*reader.key_sigils.get(slot).unwrap_or(&0)) {
1053 return false;
1054 }
1055 let Ok((bm, vals)) = reader.col_field_parts(slot) else {
1056 return false;
1057 };
1058 if !col_bit(bm, off) {
1059 return false;
1060 }
1061 let o = off * 8;
1062 vals.get(o..o + 8)
1063 .and_then(|b| b.try_into().ok())
1064 .map(|b| f64::from_le_bytes(b) == self.value)
1065 .unwrap_or(false)
1066 }
1067 Layout::Pax => reader
1068 .pax_get_f64(off, slot)
1069 .map(|v| v == self.value)
1070 .unwrap_or(false),
1071 Layout::Row => {
1072 let Some(foff) = resolve_slot(data, off, slot) else {
1073 return false;
1074 };
1075 data.get(foff..foff + 8)
1076 .and_then(|b| b.try_into().ok())
1077 .map(|b| f64::from_le_bytes(b) == self.value)
1078 .unwrap_or(false)
1079 }
1080 }
1081 }
1082}
1083
/// Predicate matching records whose field `key` is strictly greater than `value`.
pub struct Gt<'k, V> {
    /// Schema key of the field to compare.
    pub key: &'k str,
    /// Exclusive lower bound for the field.
    pub value: V,
}
1089
/// Builds a greater-than predicate for `key` and `value`.
pub fn gt<'k, V>(key: &'k str, value: V) -> crate::query::Gt<'k, V> {
    crate::query::Gt { key, value }
}
1093
1094impl Predicate for Gt<'_, f64> {
1095 fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1096 let Some(slot) = reader.slot(self.key) else {
1097 return false;
1098 };
1099 match reader.layout {
1100 Layout::Columnar => {
1101 if is_var_sigil(*reader.key_sigils.get(slot).unwrap_or(&0)) {
1102 return false;
1103 }
1104 let Ok((bm, vals)) = reader.col_field_parts(slot) else {
1105 return false;
1106 };
1107 if !col_bit(bm, off) {
1108 return false;
1109 }
1110 let o = off * 8;
1111 vals.get(o..o + 8)
1112 .and_then(|b| b.try_into().ok())
1113 .map(|b| f64::from_le_bytes(b) > self.value)
1114 .unwrap_or(false)
1115 }
1116 Layout::Pax => reader
1117 .pax_get_f64(off, slot)
1118 .map(|v| v > self.value)
1119 .unwrap_or(false),
1120 Layout::Row => {
1121 let Some(foff) = resolve_slot(data, off, slot) else {
1122 return false;
1123 };
1124 data.get(foff..foff + 8)
1125 .and_then(|b| b.try_into().ok())
1126 .map(|b| f64::from_le_bytes(b) > self.value)
1127 .unwrap_or(false)
1128 }
1129 }
1130 }
1131}
1132
1133impl Predicate for Gt<'_, i64> {
1134 fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1135 let Some(slot) = reader.slot(self.key) else {
1136 return false;
1137 };
1138 match reader.layout {
1139 Layout::Columnar => {
1140 if is_var_sigil(*reader.key_sigils.get(slot).unwrap_or(&0)) {
1141 return false;
1142 }
1143 let Ok((bm, vals)) = reader.col_field_parts(slot) else {
1144 return false;
1145 };
1146 if !col_bit(bm, off) {
1147 return false;
1148 }
1149 let o = off * 8;
1150 vals.get(o..o + 8)
1151 .and_then(|b| b.try_into().ok())
1152 .map(|b| i64::from_le_bytes(b) > self.value)
1153 .unwrap_or(false)
1154 }
1155 Layout::Pax => reader
1156 .pax_get_i64(off, slot)
1157 .map(|v| v > self.value)
1158 .unwrap_or(false),
1159 Layout::Row => {
1160 let Some(foff) = resolve_slot(data, off, slot) else {
1161 return false;
1162 };
1163 data.get(foff..foff + 8)
1164 .and_then(|b| b.try_into().ok())
1165 .map(|b| i64::from_le_bytes(b) > self.value)
1166 .unwrap_or(false)
1167 }
1168 }
1169 }
1170}
1171
/// Predicate matching records whose field `key` is strictly less than `value`.
pub struct Lt<'k, V> {
    /// Schema key of the field to compare.
    pub key: &'k str,
    /// Exclusive upper bound for the field.
    pub value: V,
}
1177
/// Builds a less-than predicate for `key` and `value`.
pub fn lt<'k, V>(key: &'k str, value: V) -> crate::query::Lt<'k, V> {
    crate::query::Lt { key, value }
}
1181
1182impl Predicate for Lt<'_, f64> {
1183 fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1184 let Some(slot) = reader.slot(self.key) else {
1185 return false;
1186 };
1187 match reader.layout {
1188 Layout::Columnar => {
1189 if is_var_sigil(*reader.key_sigils.get(slot).unwrap_or(&0)) {
1190 return false;
1191 }
1192 let Ok((bm, vals)) = reader.col_field_parts(slot) else {
1193 return false;
1194 };
1195 if !col_bit(bm, off) {
1196 return false;
1197 }
1198 let o = off * 8;
1199 vals.get(o..o + 8)
1200 .and_then(|b| b.try_into().ok())
1201 .map(|b| f64::from_le_bytes(b) < self.value)
1202 .unwrap_or(false)
1203 }
1204 Layout::Pax => reader
1205 .pax_get_f64(off, slot)
1206 .map(|v| v < self.value)
1207 .unwrap_or(false),
1208 Layout::Row => {
1209 let Some(foff) = resolve_slot(data, off, slot) else {
1210 return false;
1211 };
1212 data.get(foff..foff + 8)
1213 .and_then(|b| b.try_into().ok())
1214 .map(|b| f64::from_le_bytes(b) < self.value)
1215 .unwrap_or(false)
1216 }
1217 }
1218 }
1219}
1220
1221impl Predicate for Lt<'_, i64> {
1222 fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1223 let Some(slot) = reader.slot(self.key) else {
1224 return false;
1225 };
1226 match reader.layout {
1227 Layout::Columnar => {
1228 if is_var_sigil(*reader.key_sigils.get(slot).unwrap_or(&0)) {
1229 return false;
1230 }
1231 let Ok((bm, vals)) = reader.col_field_parts(slot) else {
1232 return false;
1233 };
1234 if !col_bit(bm, off) {
1235 return false;
1236 }
1237 let o = off * 8;
1238 vals.get(o..o + 8)
1239 .and_then(|b| b.try_into().ok())
1240 .map(|b| i64::from_le_bytes(b) < self.value)
1241 .unwrap_or(false)
1242 }
1243 Layout::Pax => reader
1244 .pax_get_i64(off, slot)
1245 .map(|v| v < self.value)
1246 .unwrap_or(false),
1247 Layout::Row => {
1248 let Some(foff) = resolve_slot(data, off, slot) else {
1249 return false;
1250 };
1251 data.get(foff..foff + 8)
1252 .and_then(|b| b.try_into().ok())
1253 .map(|b| i64::from_le_bytes(b) < self.value)
1254 .unwrap_or(false)
1255 }
1256 }
1257 }
1258}
1259
1260pub struct And<A, B>(pub A, pub B);
1262
1263impl<A: Predicate, B: Predicate> Predicate for And<A, B> {
1264 fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1265 self.0.test(data, reader, off) && self.1.test(data, reader, off)
1266 }
1267}
1268
1269pub struct Or<A, B>(pub A, pub B);
1271
1272impl<A: Predicate, B: Predicate> Predicate for Or<A, B> {
1273 fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1274 self.0.test(data, reader, off) || self.1.test(data, reader, off)
1275 }
1276}
1277
1278pub struct Not<P>(pub P);
1280
1281impl<P: Predicate> Predicate for Not<P> {
1282 fn test(&self, data: &[u8], reader: &Reader<'_>, off: usize) -> bool {
1283 !self.0.test(data, reader, off)
1284 }
1285}
1286
1287pub(crate) fn parse_schema(data: &[u8], offset: usize) -> Result<(Vec<String>, Vec<u8>, usize)> {
1290 if offset + 2 > data.len() {
1291 return Err(NxsError::OutOfBounds);
1292 }
1293 let key_count = u16::from_le_bytes(
1294 data[offset..offset + 2]
1295 .try_into()
1296 .map_err(|_| NxsError::OutOfBounds)?,
1297 ) as usize;
1298 let mut pos = offset + 2;
1299
1300 if pos + key_count > data.len() {
1301 return Err(NxsError::OutOfBounds);
1302 }
1303 let sigils = data[pos..pos + key_count].to_vec();
1304 pos += key_count;
1305
1306 let mut keys = Vec::with_capacity(key_count);
1307 for _ in 0..key_count {
1308 let start = pos;
1309 while pos < data.len() && data[pos] != 0 {
1310 pos += 1;
1311 }
1312 if pos >= data.len() {
1313 return Err(NxsError::OutOfBounds);
1314 }
1315 keys.push(
1316 std::str::from_utf8(&data[start..pos])
1317 .map_err(|_| NxsError::ParseError("invalid utf-8 key".into()))?
1318 .to_owned(),
1319 );
1320 pos += 1; }
1322 if pos % 8 != 0 {
1324 pos += 8 - pos % 8;
1325 }
1326 Ok((keys, sigils, pos))
1327}
1328
/// Locates the value of `slot` inside the object that starts at `obj_offset`.
///
/// Eight bytes past `obj_offset` the object carries a presence bitmask: each
/// byte contributes seven slot flags in its low bits (slot 0 is bit 0 of the
/// first byte) and uses bit `0x80` to signal that another mask byte follows.
/// Directly after the mask comes a table of little-endian `u16` offsets, one
/// per present slot, each relative to `obj_offset`.
///
/// Returns the absolute position `obj_offset + relative_offset`, or `None`
/// when the slot is not present, the mask ends before reaching it, any read
/// falls outside `data`, or an index computation would overflow.
pub(crate) fn resolve_slot(data: &[u8], obj_offset: usize, slot: usize) -> Option<usize> {
    let mut cursor = obj_offset.checked_add(8)?;
    // Seven flags per mask byte: which byte and which bit describe `slot`.
    let (target_byte, target_bit) = (slot / 7, slot % 7);

    // Number of present slots that precede `slot`, i.e. its offset-table index.
    let mut rank: usize = 0;
    let mut byte_no: usize = 0;
    let mut mask_byte: u8;
    loop {
        mask_byte = *data.get(cursor)?;
        cursor = cursor.checked_add(1)?;
        let present = mask_byte & 0x7F;

        if byte_no == target_byte {
            if present & (1u8 << target_bit) == 0 {
                return None;
            }
            // Only flags below the target bit precede it within this byte.
            let preceding = present & ((1u8 << target_bit) - 1);
            rank = rank.checked_add(preceding.count_ones() as usize)?;
            break;
        }

        rank = rank.checked_add(present.count_ones() as usize)?;
        if mask_byte & 0x80 == 0 {
            // The mask ended before reaching the byte that holds `slot`.
            return None;
        }
        byte_no += 1;
    }

    // Skip whatever mask bytes remain so `cursor` lands on the offset table.
    while mask_byte & 0x80 != 0 {
        mask_byte = *data.get(cursor)?;
        cursor = cursor.checked_add(1)?;
    }

    let entry_start = cursor.checked_add(rank.checked_mul(2)?)?;
    let entry_end = entry_start.checked_add(2)?;
    let entry = data.get(entry_start..entry_end)?;
    let relative = usize::from(u16::from_le_bytes(entry.try_into().ok()?));
    obj_offset.checked_add(relative)
}
1375
#[cfg(test)]
mod tests {
    use super::*;
    use crate::writer::{NxsWriter, Schema};

    /// Builds a five-record file with keys `id`, `username`, `score` and
    /// `active` through `NxsWriter`.
    fn make_nxb() -> Vec<u8> {
        use crate::writer::Slot;

        let people: [(i64, &str, f64, bool); 5] = [
            (1, "alice", 95.0, true),
            (2, "bob", 42.0, false),
            (3, "carol", 88.0, true),
            (4, "dave", 15.0, false),
            (5, "eve", 77.0, true),
        ];
        let schema = Schema::new(&["id", "username", "score", "active"]);
        let mut writer = NxsWriter::new(&schema);
        for &(id, username, score, active) in people.iter() {
            writer.begin_object();
            writer.write_i64(Slot(0), id);
            writer.write_str(Slot(1), username);
            writer.write_f64(Slot(2), score);
            writer.write_bool(Slot(3), active);
            writer.end_object();
        }
        writer.finish()
    }

    /// Opens a reader over `bytes`, panicking if the buffer is rejected.
    fn open_reader(bytes: &[u8]) -> Reader<'_> {
        Reader::new(bytes).unwrap()
    }

    #[test]
    fn reader_opens_and_counts() {
        let bytes = make_nxb();
        let reader = open_reader(&bytes);
        assert_eq!(reader.record_count(), 5);
        assert_eq!(reader.keys(), &["id", "username", "score", "active"]);
    }

    #[test]
    fn record_access_by_index() {
        let bytes = make_nxb();
        let reader = open_reader(&bytes);
        // Index 2 is the third fixture row (carol).
        let carol = reader.record(2).unwrap();
        assert_eq!(carol.get_str("username"), Some("carol"));
        assert_eq!(carol.get_i64("id"), Some(3));
        assert!((carol.get_f64("score").unwrap() - 88.0).abs() < 1e-9);
        assert_eq!(carol.get_bool("active"), Some(true));
    }

    #[test]
    fn all_iterates_every_record() {
        let bytes = make_nxb();
        let reader = open_reader(&bytes);
        assert_eq!(reader.all().count(), 5);
    }

    #[test]
    fn where_eq_bool() {
        let bytes = make_nxb();
        let reader = open_reader(&bytes);
        let active_names: Vec<String> = reader
            .where_pred(eq("active", true))
            .map(|rec| rec.get_str("username").unwrap().to_owned())
            .collect();
        assert_eq!(active_names, vec!["alice", "carol", "eve"]);
    }

    #[test]
    fn where_gt_f64() {
        let bytes = make_nxb();
        let reader = open_reader(&bytes);
        // Scores above 80: alice (95) and carol (88).
        assert_eq!(reader.where_pred(gt("score", 80.0f64)).count(), 2);
    }

    #[test]
    fn where_lt_f64() {
        let bytes = make_nxb();
        let reader = open_reader(&bytes);
        // Scores below 50: bob (42) and dave (15).
        assert_eq!(reader.where_pred(lt("score", 50.0f64)).count(), 2);
    }

    #[test]
    fn where_and() {
        let bytes = make_nxb();
        let reader = open_reader(&bytes);
        // Active with a score above 80: alice and carol.
        let both = And(eq("active", true), gt("score", 80.0f64));
        assert_eq!(reader.where_pred(both).count(), 2);
    }

    #[test]
    fn where_or() {
        let bytes = make_nxb();
        let reader = open_reader(&bytes);
        // Score above 90 (alice) or below 20 (dave).
        let either = Or(gt("score", 90.0f64), lt("score", 20.0f64));
        assert_eq!(reader.where_pred(either).count(), 2);
    }

    #[test]
    fn where_not() {
        let bytes = make_nxb();
        let reader = open_reader(&bytes);
        // Inactive rows: bob and dave.
        let inactive = Not(eq("active", true));
        assert_eq!(reader.where_pred(inactive).count(), 2);
    }

    #[test]
    fn early_termination() {
        let bytes = make_nxb();
        let reader = open_reader(&bytes);
        // Only the first item of the iterator is pulled.
        let head = reader.all().next().unwrap();
        assert_eq!(head.get_str("username"), Some("alice"));
    }

    #[test]
    fn unknown_key_matches_nothing() {
        let bytes = make_nxb();
        let reader = open_reader(&bytes);
        let matches = reader.where_pred(eq("nonexistent", true)).count();
        assert_eq!(matches, 0);
    }

    #[test]
    fn get_str_path_single_segment() {
        let bytes = make_nxb();
        let reader = open_reader(&bytes);
        let first = reader.record(0).unwrap();
        assert_eq!(first.get_str_path("username"), Some("alice"));
    }

    #[test]
    fn get_str_path_absent_returns_none() {
        let bytes = make_nxb();
        let reader = open_reader(&bytes);
        let first = reader.record(0).unwrap();
        assert_eq!(first.get_str_path("no.such.path"), None);
    }

    /// Builds a five-record columnar file with keys `id`, `score`, `active`.
    fn make_columnar_nxb() -> Vec<u8> {
        use crate::layout::{finish_columnar, Cell, RecordRow};

        let specs: [(i64, f64, bool); 5] = [
            (1, 95.0, true),
            (2, 42.0, false),
            (3, 88.0, true),
            (4, 15.0, false),
            (5, 77.0, true),
        ];
        let keys = vec!["id".to_string(), "score".to_string(), "active".to_string()];
        let rows: Vec<RecordRow> = specs
            .iter()
            .map(|&(id, score, active)| RecordRow {
                cells: vec![Cell::I64(id), Cell::F64(score), Cell::Bool(active)],
            })
            .collect();
        finish_columnar(&keys, &rows).unwrap()
    }

    #[test]
    fn columnar_where_pred_iterates_correctly() {
        let bytes = make_columnar_nxb();
        let reader = open_reader(&bytes);
        assert_eq!(reader.layout(), Layout::Columnar);
        assert_eq!(reader.record_count(), 5);

        // Full scan visits every record.
        assert_eq!(reader.all().count(), 5);

        // Boolean equality filter.
        let active_ids: Vec<i64> = reader
            .where_pred(eq("active", true))
            .filter_map(|rec| rec.get_i64("id"))
            .collect();
        assert_eq!(active_ids, vec![1, 3, 5]);

        // Floating-point comparison filter.
        let high_score_ids: Vec<i64> = reader
            .where_pred(gt("score", 80.0f64))
            .filter_map(|rec| rec.get_i64("id"))
            .collect();
        assert_eq!(high_score_ids, vec![1, 3]);

        // Random access by record index.
        let third = reader.record(2).unwrap();
        assert_eq!(third.get_i64("id"), Some(3));
        assert!((third.get_f64("score").unwrap() - 88.0).abs() < 1e-9);
        assert_eq!(third.get_bool("active"), Some(true));
    }

    #[test]
    fn walk_path_deep_segments_returns_none_not_wrong_key() {
        let bytes = make_nxb();
        let reader = open_reader(&bytes);
        let first = reader.record(0).unwrap();

        // Nine dotted segments, none of which exist in the fixture.
        let nine_segments = "a.b.c.d.e.f.g.h.i";
        assert_eq!(first.get_str_path(nine_segments), None);
        assert_eq!(first.get_i64_path(nine_segments), None);

        // Ten dotted segments.
        let ten_segments = "a.b.c.d.e.f.g.h.i.j";
        assert_eq!(first.get_str_path(ten_segments), None);

        // A plain key still resolves after the failed deep lookups.
        assert_eq!(first.get_str_path("username"), Some("alice"));
    }

    #[test]
    fn columnar_conformance_vector_col_sum() {
        let fixture = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../conformance/columnar_flat8_dense_100.nxb");
        // The test returns early (and therefore passes) when the fixture is unreadable.
        let Ok(bytes) = std::fs::read(&fixture) else {
            return;
        };
        let reader = open_reader(&bytes);
        assert_eq!(reader.layout(), Layout::Columnar);
        assert_eq!(reader.record_count(), 100);

        let sum = reader.col_sum_f64("score").unwrap();
        let want: f64 = (0..100).map(|i| i as f64 * 0.5).sum();
        assert!((sum - want).abs() < 1e-9, "sum {sum} want {want}");

        let score_buf = reader.col_buffer("score").unwrap();
        assert_eq!(score_buf.len(), 100 * 8);
    }

    #[test]
    fn columnar_strings_roundtrip() {
        use crate::layout::{finish_columnar, null_bitmap_bytes, var_str_at, Cell, RecordRow};

        let keys = vec!["id".into(), "name".into(), "score".into()];
        let rows: Vec<RecordRow> = (0..100usize)
            .map(|i| RecordRow {
                cells: vec![
                    Cell::I64(i as i64),
                    Cell::Str(format!("user_{i}")),
                    Cell::F64(i as f64 * 1.25),
                ],
            })
            .collect();
        let bytes = finish_columnar(&keys, &rows).unwrap();
        let reader = open_reader(&bytes);
        assert_eq!(reader.record_count(), 100);

        // Every record reads back what was written.
        for i in 0..100 {
            let rec = reader.record(i).unwrap();
            let expected_name = format!("user_{i}");
            assert_eq!(rec.get_i64("id"), Some(i as i64));
            assert_eq!(rec.get_str("name"), Some(expected_name.as_str()));
            assert!((rec.get_f64("score").unwrap() - i as f64 * 1.25).abs() < 1e-9);
        }

        // Raw parts of the string column (slot 1).
        let (bitmap, offsets, values) = reader.col_field_var_parts(1).unwrap();
        assert_eq!(bitmap.len(), null_bitmap_bytes(100));
        assert_eq!(offsets.len(), 101 * 4);
        assert!(!values.is_empty());
        assert_eq!(var_str_at(offsets, values, 42), Some("user_42"));
    }

    #[test]
    fn pax_strings_roundtrip_across_pages() {
        use crate::layout::{finish_pax, Cell, RecordRow};

        let keys = vec!["id".into(), "name".into(), "score".into()];
        let mut rows: Vec<RecordRow> = Vec::with_capacity(300);
        for i in 0..300usize {
            rows.push(RecordRow {
                cells: vec![
                    Cell::I64(i as i64),
                    Cell::Str(format!("user_{i}")),
                    Cell::F64(i as f64),
                ],
            });
        }
        let bytes = finish_pax(&keys, &rows, 128).unwrap();
        let reader = open_reader(&bytes);
        assert_eq!(reader.record_count(), 300);

        // Sample indices on both sides of multiples of 128 (the third `finish_pax` argument).
        for i in [0usize, 127, 128, 257, 299] {
            let rec = reader.record(i).unwrap();
            let expected_name = format!("user_{i}");
            assert_eq!(rec.get_str("name"), Some(expected_name.as_str()));
            assert_eq!(rec.get_i64("id"), Some(i as i64));
        }
    }
}