Skip to main content

nxs/
layout.rs

1//! Columnar and PAX `.nxb` layout writers (OLAP.md v0.1).
2//!
3//! Phase 1: dense numeric columnar (`FLAG_COLUMNAR`).
4//! Phase 2: PAX pages with per-page column groups (`FLAG_PAX`).
5//! Phase 3: variable-length string/binary columns (u32 offsets + values tail).
6
7// Re-export shared constants so callers that `use crate::layout::…` still compile.
8pub use crate::consts::{
9    FLAG_COLUMNAR, FLAG_PAX, FLAG_SCHEMA_EMBEDDED, MAGIC_FILE, MAGIC_FOOTER, MAGIC_PAGE, VERSION,
10};
11use crate::error::{NxsError, Result};
12use crate::parser::{Field, Value};
13use crate::writer::{build_schema, murmur3_64, NxsWriter};
14use std::collections::HashMap;
15
// Trailing footer sizes in bytes; `footer_size` picks one from the preamble flags.
// Row-layout footer (its field breakdown is not visible in this module).
const FOOTER_ROW: usize = 12;
// Columnar footer: tail-index offset (u64) + record count (u64) + footer magic (u32).
const FOOTER_COLUMNAR: usize = 20;
// PAX footer: tail-index offset (u64) + record count (u64) + page count (u32)
// + page size (u32) + footer magic (u32).
const FOOTER_PAX: usize = 28;
19
20/// Layout selection for compile / writer finish.
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
22pub enum Layout {
23    #[default]
24    Row,
25    Columnar,
26    Pax,
27}
28
29impl Layout {
30    pub fn parse_name(s: &str) -> Option<Self> {
31        match s {
32            "row" => Some(Layout::Row),
33            "columnar" => Some(Layout::Columnar),
34            "pax" => Some(Layout::Pax),
35            _ => None,
36        }
37    }
38
39    pub fn flag(self) -> u16 {
40        match self {
41            Layout::Row => 0,
42            Layout::Columnar => FLAG_COLUMNAR,
43            Layout::Pax => FLAG_PAX,
44        }
45    }
46}
47
48/// Parsed file directives (`@layout`, `@page-size`).
49#[derive(Debug, Clone, Default)]
50pub struct CompileOptions {
51    pub layout: Layout,
52    pub page_size: u32,
53}
54
55impl CompileOptions {
56    pub fn validate_flags(&self, tail_ptr_zero: bool) -> Result<()> {
57        if self.layout == Layout::Columnar && tail_ptr_zero {
58            return Err(NxsError::IncompatibleFlags);
59        }
60        Ok(())
61    }
62}
63
64/// Apply pragma from `@name` macro token (value follows as next token).
65pub fn apply_pragma(opts: &mut CompileOptions, name: &str, value: &str) -> Result<()> {
66    match name {
67        "layout" => {
68            opts.layout = Layout::parse_name(value)
69                .ok_or_else(|| NxsError::ParseError(format!("unknown layout: {value}")))?;
70        }
71        "page-size" => {
72            opts.page_size = value
73                .parse()
74                .map_err(|_| NxsError::ParseError(format!("bad page-size: {value}")))?;
75            if opts.page_size == 0 {
76                return Err(NxsError::ParseError("page-size must be > 0".into()));
77            }
78        }
79        other => {
80            return Err(NxsError::ParseError(format!("unknown pragma: @{other}")));
81        }
82    }
83    Ok(())
84}
85
86/// Validate preamble flag combinations.
87pub fn validate_preamble_flags(flags: u16) -> Result<()> {
88    let col = flags & FLAG_COLUMNAR != 0;
89    let pax = flags & FLAG_PAX != 0;
90    if col && pax {
91        return Err(NxsError::InvalidFlags);
92    }
93    if (col || pax) && flags & FLAG_SCHEMA_EMBEDDED == 0 {
94        return Err(NxsError::ParseError(
95            "columnar/PAX requires FLAG_SCHEMA_EMBEDDED".into(),
96        ));
97    }
98    Ok(())
99}
100
101// ── Record model for layout emitters ─────────────────────────────────────────
102
103#[derive(Clone, Debug, PartialEq)]
104pub enum Cell {
105    Absent,
106    Null,
107    I64(i64),
108    F64(f64),
109    Bool(bool),
110    Time(i64),
111    Str(String),
112    Binary(Vec<u8>),
113}
114
115impl Cell {
116    fn from_value(v: &Value) -> Result<Self> {
117        match v {
118            Value::Int(n) => Ok(Cell::I64(*n)),
119            Value::Float(f) => Ok(Cell::F64(*f)),
120            Value::Bool(b) => Ok(Cell::Bool(*b)),
121            Value::Time(ns) => Ok(Cell::Time(*ns)),
122            Value::Null => Ok(Cell::Null),
123            Value::Str(s) => Ok(Cell::Str(s.clone())),
124            Value::Binary(b) => Ok(Cell::Binary(b.clone())),
125            Value::Keyword(_) => Err(NxsError::UnsupportedFieldType),
126            Value::Object(_) | Value::List(_) | Value::Macro(_) | Value::Link(_) => Err(
127                NxsError::ParseError("nested values not supported in columnar/PAX records".into()),
128            ),
129        }
130    }
131
132    fn sigil(self) -> u8 {
133        match self {
134            Cell::I64(_) => b'=',
135            Cell::F64(_) => b'~',
136            Cell::Bool(_) => b'?',
137            Cell::Time(_) => b'@',
138            Cell::Str(_) => b'"',
139            Cell::Binary(_) => b'<',
140            Cell::Null => b'^',
141            Cell::Absent => 0,
142        }
143    }
144}
145
/// Whether `sigil` marks a variable-length column: `"` (string) or `<` (binary).
pub fn is_var_sigil(sigil: u8) -> bool {
    sigil == b'"' || sigil == b'<'
}
150
151/// Byte length of one encoded field column (null bitmap + value buffer(s)).
152pub fn column_sector_len(sector: &[u8], record_count: usize, sigil: u8) -> Result<usize> {
153    let bm_len = null_bitmap_bytes(record_count);
154    if sector.len() < bm_len {
155        return Err(NxsError::OutOfBounds);
156    }
157    if is_var_sigil(sigil) {
158        let off_bytes = record_count
159            .checked_add(1)
160            .and_then(|n| n.checked_mul(4))
161            .ok_or(NxsError::OutOfBounds)?;
162        if sector.len() < bm_len.checked_add(off_bytes).ok_or(NxsError::OutOfBounds)? {
163            return Err(NxsError::OutOfBounds);
164        }
165        let end_off = bm_len
166            .checked_add(record_count.checked_mul(4).ok_or(NxsError::OutOfBounds)?)
167            .ok_or(NxsError::OutOfBounds)?;
168        let last = u32::from_le_bytes(
169            sector[end_off..end_off + 4]
170                .try_into()
171                .map_err(|_| NxsError::OutOfBounds)?,
172        ) as usize;
173        bm_len
174            .checked_add(off_bytes)
175            .and_then(|x| x.checked_add(last))
176            .ok_or(NxsError::OutOfBounds)
177    } else {
178        let cells = record_count.checked_mul(8).ok_or(NxsError::OutOfBounds)?;
179        bm_len.checked_add(cells).ok_or(NxsError::OutOfBounds)
180    }
181}
182
183/// Column tail after the null bitmap: `(N+1)` little-endian u32 offsets, then UTF-8/raw bytes.
184pub fn col_var_parts(sector: &[u8], record_count: usize) -> Result<(&[u8], &[u8], &[u8])> {
185    let bm_len = null_bitmap_bytes(record_count);
186    let off_bytes = record_count
187        .checked_add(1)
188        .and_then(|n| n.checked_mul(4))
189        .ok_or(NxsError::OutOfBounds)?;
190    if sector.len() < bm_len.saturating_add(off_bytes) {
191        return Err(NxsError::OutOfBounds);
192    }
193    let bm = &sector[..bm_len];
194    let offsets = &sector[bm_len..bm_len + off_bytes];
195    let values = &sector[bm_len + off_bytes..];
196    Ok((bm, offsets, values))
197}
198
/// Fetch the string cell at `record_index` from a variable-length column.
///
/// `offsets` is the little-endian `u32` offset table and `values` the byte
/// tail it indexes. Returns `None` when the table is too short for the
/// index, the offset pair is reversed or points past `values`, or the bytes
/// are not valid UTF-8.
pub fn var_str_at<'a>(offsets: &'a [u8], values: &'a [u8], record_index: usize) -> Option<&'a str> {
    // Entries `record_index` and `record_index + 1` end at byte `(index + 2) * 4`.
    let pair_end = record_index.checked_add(2)?.checked_mul(4)?;
    let pair = offsets.get(pair_end - 8..pair_end)?;
    let start = u32::from_le_bytes([pair[0], pair[1], pair[2], pair[3]]) as usize;
    let end = u32::from_le_bytes([pair[4], pair[5], pair[6], pair[7]]) as usize;
    let raw = values.get(start..end)?;
    std::str::from_utf8(raw).ok()
}
220
/// Fetch the binary cell at `record_index` from a variable-length column.
///
/// `offsets` is the little-endian `u32` offset table and `values` the byte
/// tail it indexes. Returns `None` when the table is too short for the
/// index or the offset pair is reversed or points past `values`.
pub fn var_binary_at<'a>(
    offsets: &'a [u8],
    values: &'a [u8],
    record_index: usize,
) -> Option<&'a [u8]> {
    // Entries `record_index` and `record_index + 1` end at byte `(index + 2) * 4`.
    let pair_end = record_index.checked_add(2)?.checked_mul(4)?;
    let pair = offsets.get(pair_end - 8..pair_end)?;
    let start = u32::from_le_bytes([pair[0], pair[1], pair[2], pair[3]]) as usize;
    let end = u32::from_le_bytes([pair[4], pair[5], pair[6], pair[7]]) as usize;
    values.get(start..end)
}
246
/// One record as an ordered list of cells, indexed by column position.
#[derive(Clone)]
pub struct RecordRow {
    /// Cell per column; the emitters treat a missing trailing entry as `Cell::Absent`.
    pub cells: Vec<Cell>,
}
251
252/// Extract top-level records from parsed fields (each `key { ... }` object).
253pub fn records_from_fields(fields: &[Field]) -> Result<(Vec<String>, Vec<RecordRow>)> {
254    let mut key_order: Vec<String> = Vec::new();
255    let mut key_index: HashMap<String, usize> = HashMap::new();
256    let mut rows: Vec<RecordRow> = Vec::new();
257
258    for field in fields {
259        let Value::Object(inner) = &field.value else {
260            return Err(NxsError::ParseError(
261                "columnar/PAX compile expects top-level `name { ... }` record blocks".into(),
262            ));
263        };
264        let mut cells = Vec::new();
265        for f in inner {
266            let cell = Cell::from_value(&f.value)?;
267            let idx = if let Some(&i) = key_index.get(&f.key) {
268                i
269            } else {
270                let i = key_order.len();
271                key_order.push(f.key.clone());
272                key_index.insert(f.key.clone(), i);
273                cells.resize(i, Cell::Absent);
274                i
275            };
276            if cells.len() <= idx {
277                cells.resize(idx + 1, Cell::Absent);
278            }
279            cells[idx] = cell;
280        }
281        rows.push(RecordRow { cells });
282    }
283
284    if rows.is_empty() {
285        return Err(NxsError::ParseError("no records to compile".into()));
286    }
287
288    let width = key_order.len();
289    for row in &mut rows {
290        row.cells.resize(width, Cell::Absent);
291    }
292    Ok((key_order, rows))
293}
294
/// Size in bytes of a null bitmap holding `n` bits, rounded up to a whole
/// number of 8-byte (64-bit) words.
///
/// Computed as `ceil(n / 64) * 8` without any intermediate addition on `n`,
/// so it cannot overflow even for `n == usize::MAX` (record counts may come
/// from untrusted files).
pub(crate) fn null_bitmap_bytes(n: usize) -> usize {
    let words = n / 64 + usize::from(n % 64 != 0);
    words * 8
}
300
301fn encode_null_bitmap(n: usize, present: impl Fn(usize) -> bool) -> Vec<u8> {
302    let len = null_bitmap_bytes(n);
303    let mut b = vec![0u8; len];
304    for i in 0..n {
305        if present(i) {
306            b[i / 8] |= 1 << (i % 8);
307        }
308    }
309    b
310}
311
312fn cell_populated(c: &Cell) -> bool {
313    !matches!(c, Cell::Absent)
314}
315
316fn write_fixed_buffer(n: usize, cells: &[&Cell], encode: impl Fn(&Cell) -> [u8; 8]) -> Vec<u8> {
317    let mut buf = vec![0u8; n * 8];
318    for (i, c) in cells.iter().enumerate().take(n) {
319        if cell_populated(c) {
320            buf[i * 8..(i + 1) * 8].copy_from_slice(&encode(c));
321        }
322    }
323    buf
324}
325
326fn encode_var_column(n: usize, col: &[&Cell]) -> Result<Vec<u8>> {
327    let present = |i: usize| cell_populated(col[i]);
328    let bitmap = encode_null_bitmap(n, present);
329    let mut offsets: Vec<u32> = Vec::with_capacity(n + 1);
330    let mut values: Vec<u8> = Vec::new();
331    offsets.push(0);
332    for cell in col.iter().take(n) {
333        if !cell_populated(cell) {
334            offsets.push(*offsets.last().unwrap_or(&0));
335            continue;
336        }
337        match cell {
338            Cell::Str(s) => values.extend_from_slice(s.as_bytes()),
339            Cell::Binary(b) => values.extend_from_slice(b),
340            _ => {}
341        }
342        let end = values.len();
343        if end > u32::MAX as usize {
344            return Err(NxsError::Overflow);
345        }
346        offsets.push(end as u32);
347    }
348    let mut out = bitmap;
349    for o in offsets {
350        out.extend_from_slice(&o.to_le_bytes());
351    }
352    out.extend_from_slice(&values);
353    Ok(out)
354}
355
356fn encode_field_column(n: usize, col: &[&Cell], sigil: u8) -> Result<Vec<u8>> {
357    if is_var_sigil(sigil) {
358        return encode_var_column(n, col);
359    }
360    let present = |i: usize| cell_populated(col[i]);
361    let bitmap = encode_null_bitmap(n, present);
362    let values = match sigil {
363        b'=' => write_fixed_buffer(n, col, |c| match c {
364            Cell::I64(v) => v.to_le_bytes(),
365            Cell::Null | Cell::Absent => 0i64.to_le_bytes(),
366            _ => [0u8; 8],
367        }),
368        b'~' => write_fixed_buffer(n, col, |c| match c {
369            Cell::F64(v) => v.to_le_bytes(),
370            Cell::Null | Cell::Absent => 0f64.to_le_bytes(),
371            _ => [0u8; 8],
372        }),
373        b'?' => write_fixed_buffer(n, col, |c| match c {
374            Cell::Bool(v) => {
375                let mut b = [0u8; 8];
376                b[0] = if *v { 1 } else { 0 };
377                b
378            }
379            Cell::Null | Cell::Absent => [0u8; 8],
380            _ => [0u8; 8],
381        }),
382        b'@' => write_fixed_buffer(n, col, |c| match c {
383            Cell::Time(v) => v.to_le_bytes(),
384            Cell::Null | Cell::Absent => 0i64.to_le_bytes(),
385            _ => [0u8; 8],
386        }),
387        b'$' => return Err(NxsError::UnsupportedFieldType),
388        _ => write_fixed_buffer(n, col, |c| match c {
389            Cell::I64(v) => v.to_le_bytes(),
390            Cell::Null | Cell::Absent => 0i64.to_le_bytes(),
391            _ => [0u8; 8],
392        }),
393    };
394    let mut out = bitmap;
395    out.extend_from_slice(&values);
396    Ok(out)
397}
398
399pub(crate) fn sigils_for_keys(keys: &[String], rows: &[RecordRow]) -> Vec<u8> {
400    keys.iter()
401        .enumerate()
402        .map(|(fi, _)| {
403            for row in rows {
404                let c = row.cells.get(fi).cloned().unwrap_or(Cell::Absent);
405                if c != Cell::Absent {
406                    return c.sigil();
407                }
408            }
409            b'='
410        })
411        .collect()
412}
413
414/// Build a sealed columnar `.nxb` from record rows.
415pub fn finish_columnar(keys: &[String], rows: &[RecordRow]) -> Result<Vec<u8>> {
416    let n = rows.len();
417    let sigils = sigils_for_keys(keys, rows);
418    let schema_bytes = build_schema(
419        &keys.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
420        &sigils,
421    );
422    let dict_hash = murmur3_64(&schema_bytes);
423
424    let mut data = Vec::new();
425    let mut tail_entries: Vec<(u16, u64, u64)> = Vec::new();
426    for (fi, sigil) in sigils.iter().enumerate() {
427        let col: Vec<&Cell> = rows
428            .iter()
429            .map(|r| r.cells.get(fi).unwrap_or(&Cell::Absent))
430            .collect();
431        let field_buf = encode_field_column(n, &col, *sigil)?;
432        let offset = 32 + schema_bytes.len() as u64 + data.len() as u64;
433        let length = field_buf.len() as u64;
434        tail_entries.push((fi as u16, offset, length));
435        data.extend_from_slice(&field_buf);
436    }
437
438    let tail_index_offset = 32 + schema_bytes.len() as u64 + data.len() as u64;
439    let mut tail = Vec::new();
440    for (fid, off, len) in &tail_entries {
441        tail.extend_from_slice(&fid.to_le_bytes());
442        tail.extend_from_slice(&0u16.to_le_bytes());
443        tail.extend_from_slice(&off.to_le_bytes());
444        tail.extend_from_slice(&len.to_le_bytes());
445    }
446    tail.extend_from_slice(&tail_index_offset.to_le_bytes());
447    tail.extend_from_slice(&(n as u64).to_le_bytes());
448    tail.extend_from_slice(&MAGIC_FOOTER.to_le_bytes());
449
450    let flags = FLAG_SCHEMA_EMBEDDED | FLAG_COLUMNAR;
451    let mut out = Vec::with_capacity(32 + schema_bytes.len() + data.len() + tail.len());
452    out.extend_from_slice(&MAGIC_FILE.to_le_bytes());
453    out.extend_from_slice(&VERSION.to_le_bytes());
454    out.extend_from_slice(&flags.to_le_bytes());
455    out.extend_from_slice(&dict_hash.to_le_bytes());
456    out.extend_from_slice(&tail_index_offset.to_le_bytes());
457    out.extend_from_slice(&0u64.to_le_bytes());
458    out.extend_from_slice(&schema_bytes);
459    out.extend_from_slice(&data);
460    out.extend_from_slice(&tail);
461    Ok(out)
462}
463
464/// Build a sealed PAX `.nxb`.
465pub fn finish_pax(keys: &[String], rows: &[RecordRow], page_size: u32) -> Result<Vec<u8>> {
466    if page_size == 0 {
467        return Err(NxsError::ParseError("page_size must be > 0".into()));
468    }
469    let n = rows.len();
470    let sigils = sigils_for_keys(keys, rows);
471    let schema_bytes = build_schema(
472        &keys.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
473        &sigils,
474    );
475    let dict_hash = murmur3_64(&schema_bytes);
476
477    let mut data = Vec::new();
478    let mut pages: Vec<(u32, u64, u32, u64, u32)> = Vec::new();
479    let mut page_idx = 0u32;
480    let mut rec_start = 0u64;
481    let mut i = 0usize;
482    while i < n {
483        let count = ((n - i) as u32).min(page_size);
484        let page_records = &rows[i..i + count as usize];
485        let page_off = 32 + schema_bytes.len() as u64 + data.len() as u64;
486        let page_bytes = encode_page(
487            page_idx,
488            rec_start,
489            count,
490            keys.len(),
491            &sigils,
492            page_records,
493        )?;
494        let page_len = page_bytes.len() as u32;
495        pages.push((page_idx, rec_start, count, page_off, page_len));
496        data.extend_from_slice(&page_bytes);
497        page_idx += 1;
498        rec_start += count as u64;
499        i += count as usize;
500    }
501
502    let tail_index_offset = 32 + schema_bytes.len() as u64 + data.len() as u64;
503    let mut tail = Vec::new();
504    for (pidx, rstart, rc, poff, plen) in &pages {
505        tail.extend_from_slice(&pidx.to_le_bytes());
506        tail.extend_from_slice(&rstart.to_le_bytes());
507        tail.extend_from_slice(&rc.to_le_bytes());
508        tail.extend_from_slice(&poff.to_le_bytes());
509        tail.extend_from_slice(&plen.to_le_bytes());
510    }
511    tail.extend_from_slice(&tail_index_offset.to_le_bytes());
512    tail.extend_from_slice(&(n as u64).to_le_bytes());
513    tail.extend_from_slice(&(pages.len() as u32).to_le_bytes());
514    tail.extend_from_slice(&page_size.to_le_bytes());
515    tail.extend_from_slice(&MAGIC_FOOTER.to_le_bytes());
516
517    let flags = FLAG_SCHEMA_EMBEDDED | FLAG_PAX;
518    let mut out = Vec::with_capacity(32 + schema_bytes.len() + data.len() + tail.len());
519    out.extend_from_slice(&MAGIC_FILE.to_le_bytes());
520    out.extend_from_slice(&VERSION.to_le_bytes());
521    out.extend_from_slice(&flags.to_le_bytes());
522    out.extend_from_slice(&dict_hash.to_le_bytes());
523    out.extend_from_slice(&tail_index_offset.to_le_bytes());
524    out.extend_from_slice(&0u64.to_le_bytes());
525    out.extend_from_slice(&schema_bytes);
526    out.extend_from_slice(&data);
527    out.extend_from_slice(&tail);
528    Ok(out)
529}
530
531pub(crate) fn encode_page(
532    page_index: u32,
533    record_start: u64,
534    record_count: u32,
535    field_count: usize,
536    sigils: &[u8],
537    rows: &[RecordRow],
538) -> Result<Vec<u8>> {
539    let n = rows.len();
540    let mut body = Vec::new();
541    for fi in 0..field_count {
542        let col: Vec<&Cell> = rows
543            .iter()
544            .map(|r| r.cells.get(fi).unwrap_or(&Cell::Absent))
545            .collect();
546        let sig = sigils.get(fi).copied().unwrap_or(b'=');
547        body.extend_from_slice(&encode_field_column(n, &col, sig)?);
548    }
549    let header_len = 4 + 4 + 8 + 4 + 2 + 2; // 24
550    let page_len = header_len + body.len() + 4;
551    let mut page = Vec::with_capacity(page_len);
552    page.extend_from_slice(&MAGIC_PAGE.to_le_bytes());
553    page.extend_from_slice(&page_index.to_le_bytes());
554    page.extend_from_slice(&record_start.to_le_bytes());
555    page.extend_from_slice(&record_count.to_le_bytes());
556    page.extend_from_slice(&(field_count as u16).to_le_bytes());
557    page.extend_from_slice(&0u16.to_le_bytes());
558    page.extend_from_slice(&body);
559    page.extend_from_slice(&(page_len as u32).to_le_bytes());
560    while page.len() % 8 != 0 {
561        page.push(0);
562    }
563    Ok(page)
564}
565
566/// Compile parsed fields with the selected layout.
567pub fn compile_fields(fields: &[Field], opts: &CompileOptions) -> Result<Vec<u8>> {
568    match opts.layout {
569        Layout::Row => {
570            let mut compiler = crate::compiler::Compiler::new();
571            compiler.compile(fields)
572        }
573        Layout::Columnar | Layout::Pax => {
574            let (keys, rows) = records_from_fields(fields)?;
575            if opts.layout == Layout::Columnar {
576                finish_columnar(&keys, &rows)
577            } else {
578                let ps = if opts.page_size == 0 {
579                    4096
580                } else {
581                    opts.page_size
582                };
583                finish_pax(&keys, &rows, ps)
584            }
585        }
586    }
587}
588
589/// Build columnar file from row-oriented writer buffers (conformance generator).
590pub fn columnar_from_writer(w: &NxsWriter<'_>) -> Result<Vec<u8>> {
591    let keys: Vec<String> = w.schema_keys().to_vec();
592    let n = w.record_offsets().len();
593    let width = keys.len();
594    let mut rows: Vec<RecordRow> = vec![
595        RecordRow {
596            cells: vec![Cell::Absent; width]
597        };
598        n
599    ];
600
601    for (ri, &rel_off) in w.record_offsets().iter().enumerate() {
602        let obj_off = rel_off as usize;
603        let cells = decode_row_object(w.data_buf(), obj_off, width, w.slot_sigils())?;
604        rows[ri] = RecordRow { cells };
605    }
606    finish_columnar(&keys, &rows)
607}
608
609fn decode_row_object(buf: &[u8], obj_off: usize, width: usize, sigils: &[u8]) -> Result<Vec<Cell>> {
610    const MAGIC_OBJ: u32 = 0x4E59_584F;
611    if obj_off + 8 > buf.len() {
612        return Err(NxsError::OutOfBounds);
613    }
614    if u32::from_le_bytes(buf[obj_off..obj_off + 4].try_into().unwrap()) != MAGIC_OBJ {
615        return Err(NxsError::BadMagic);
616    }
617    let mut cells = vec![Cell::Absent; width];
618    let mut p = obj_off + 8;
619    let mut slot = 0usize;
620    let mut present = vec![false; width];
621    while slot < width {
622        if p >= buf.len() {
623            return Err(NxsError::OutOfBounds);
624        }
625        let b = buf[p];
626        p += 1;
627        let bits = b & 0x7F;
628        for bit in 0..7 {
629            if slot >= width {
630                break;
631            }
632            present[slot] = (bits >> bit) & 1 != 0;
633            slot += 1;
634        }
635        if b & 0x80 == 0 {
636            break;
637        }
638    }
639    let table_start = p;
640    let mut rank = 0u16;
641    for s in 0..width {
642        if !present[s] {
643            continue;
644        }
645        let ot = table_start + (rank as usize) * 2;
646        if ot + 2 > buf.len() {
647            return Err(NxsError::OutOfBounds);
648        }
649        let rel = u16::from_le_bytes(
650            buf[ot..ot + 2]
651                .try_into()
652                .map_err(|_| NxsError::OutOfBounds)?,
653        );
654        let off = obj_off + rel as usize;
655        let sig = sigils.get(s).copied().unwrap_or(b'=');
656        cells[s] = read_cell_at(buf, off, sig)?;
657        rank += 1;
658    }
659    Ok(cells)
660}
661
662fn read_cell_at(buf: &[u8], off: usize, sigil: u8) -> Result<Cell> {
663    match sigil {
664        b'=' => Ok(Cell::I64(i64::from_le_bytes(
665            buf[off..off + 8]
666                .try_into()
667                .map_err(|_| NxsError::OutOfBounds)?,
668        ))),
669        b'~' => Ok(Cell::F64(f64::from_le_bytes(
670            buf[off..off + 8]
671                .try_into()
672                .map_err(|_| NxsError::OutOfBounds)?,
673        ))),
674        b'?' => Ok(Cell::Bool(buf[off] != 0)),
675        b'@' => Ok(Cell::Time(i64::from_le_bytes(
676            buf[off..off + 8]
677                .try_into()
678                .map_err(|_| NxsError::OutOfBounds)?,
679        ))),
680        b'^' => Ok(Cell::Null),
681        b'"' => {
682            if off + 4 > buf.len() {
683                return Err(NxsError::OutOfBounds);
684            }
685            let len = u32::from_le_bytes(
686                buf[off..off + 4]
687                    .try_into()
688                    .map_err(|_| NxsError::OutOfBounds)?,
689            ) as usize;
690            if off + 4 + len > buf.len() {
691                return Err(NxsError::OutOfBounds);
692            }
693            let s = std::str::from_utf8(&buf[off + 4..off + 4 + len])
694                .map_err(|_| NxsError::ParseError("invalid UTF-8 in string field".into()))?;
695            Ok(Cell::Str(s.to_string()))
696        }
697        b'<' => {
698            if off + 4 > buf.len() {
699                return Err(NxsError::OutOfBounds);
700            }
701            let len = u32::from_le_bytes(
702                buf[off..off + 4]
703                    .try_into()
704                    .map_err(|_| NxsError::OutOfBounds)?,
705            ) as usize;
706            if off + 4 + len > buf.len() {
707                return Err(NxsError::OutOfBounds);
708            }
709            Ok(Cell::Binary(buf[off + 4..off + 4 + len].to_vec()))
710        }
711        b'$' => Err(NxsError::UnsupportedFieldType),
712        _ => Err(NxsError::OutOfBounds),
713    }
714}
715
716pub fn footer_size(flags: u16) -> usize {
717    if flags & FLAG_PAX != 0 {
718        FOOTER_PAX
719    } else if flags & FLAG_COLUMNAR != 0 {
720        FOOTER_COLUMNAR
721    } else {
722        FOOTER_ROW
723    }
724}
725
#[cfg(test)]
mod tests {
    use super::*;

    /// Build `n` four-column records (`id`, `score`, `active`, `ts`).
    /// When `dense` is false only every tenth record is populated; the rest
    /// are entirely absent.
    fn flat8_records(n: usize, dense: bool) -> (Vec<String>, Vec<RecordRow>) {
        let keys: Vec<String> = ["id", "score", "active", "ts"]
            .iter()
            .map(|k| k.to_string())
            .collect();
        let rows = (0..n)
            .map(|i| {
                let cells = if dense || i % 10 == 0 {
                    vec![
                        Cell::I64(i as i64),
                        Cell::F64(i as f64 * 0.5),
                        Cell::Bool(i % 2 == 0),
                        Cell::Time(i as i64 * 1_000_000),
                    ]
                } else {
                    vec![Cell::Absent; 4]
                };
                RecordRow { cells }
            })
            .collect();
        (keys, rows)
    }

    /// Columnar output starts with the file magic, carries the columnar
    /// flag, and ends with the footer magic.
    #[test]
    fn columnar_roundtrip_magic() {
        let (keys, rows) = flat8_records(100, true);
        let image = finish_columnar(&keys, &rows).unwrap();
        assert_eq!(&image[0..4], &MAGIC_FILE.to_le_bytes());
        let flags = u16::from_le_bytes([image[6], image[7]]);
        assert!(flags & FLAG_COLUMNAR != 0);
        let tail = &image[image.len() - 4..];
        assert_eq!(
            u32::from_le_bytes([tail[0], tail[1], tail[2], tail[3]]),
            MAGIC_FOOTER
        );
    }

    /// PAX output carries the PAX flag and selects the PAX footer size.
    #[test]
    fn pax_roundtrip_footer() {
        let (keys, rows) = flat8_records(1000, true);
        let image = finish_pax(&keys, &rows, 256).unwrap();
        let flags = u16::from_le_bytes([image[6], image[7]]);
        assert!(flags & FLAG_PAX != 0);
        assert_eq!(footer_size(flags), FOOTER_PAX);
    }

    /// Setting both layout bits at once must be rejected.
    #[test]
    fn invalid_flags_rejected() {
        let both = FLAG_COLUMNAR | FLAG_PAX;
        assert!(validate_preamble_flags(both).is_err());
    }
}