1pub use crate::consts::{
9 FLAG_COLUMNAR, FLAG_PAX, FLAG_SCHEMA_EMBEDDED, MAGIC_FILE, MAGIC_FOOTER, MAGIC_PAGE, VERSION,
10};
11use crate::error::{NxsError, Result};
12use crate::parser::{Field, Value};
13use crate::writer::{build_schema, murmur3_64, NxsWriter};
14use std::collections::HashMap;
15
/// Footer length in bytes for the row layout.
// NOTE(review): the row footer's field breakdown is not visible in this module — confirm.
const FOOTER_ROW: usize = 12;
/// Columnar footer: tail-index offset (u64) + record count (u64) + footer magic (u32).
const FOOTER_COLUMNAR: usize = 8 + 8 + 4;
/// PAX footer: tail-index offset (u64) + record count (u64) + page count (u32)
/// + page size (u32) + footer magic (u32).
const FOOTER_PAX: usize = 8 + 8 + 4 + 4 + 4;
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
22pub enum Layout {
23 #[default]
24 Row,
25 Columnar,
26 Pax,
27}
28
29impl Layout {
30 pub fn parse_name(s: &str) -> Option<Self> {
31 match s {
32 "row" => Some(Layout::Row),
33 "columnar" => Some(Layout::Columnar),
34 "pax" => Some(Layout::Pax),
35 _ => None,
36 }
37 }
38
39 pub fn flag(self) -> u16 {
40 match self {
41 Layout::Row => 0,
42 Layout::Columnar => FLAG_COLUMNAR,
43 Layout::Pax => FLAG_PAX,
44 }
45 }
46}
47
48#[derive(Debug, Clone, Default)]
50pub struct CompileOptions {
51 pub layout: Layout,
52 pub page_size: u32,
53}
54
55impl CompileOptions {
56 pub fn validate_flags(&self, tail_ptr_zero: bool) -> Result<()> {
57 if self.layout == Layout::Columnar && tail_ptr_zero {
58 return Err(NxsError::IncompatibleFlags);
59 }
60 Ok(())
61 }
62}
63
64pub fn apply_pragma(opts: &mut CompileOptions, name: &str, value: &str) -> Result<()> {
66 match name {
67 "layout" => {
68 opts.layout = Layout::parse_name(value)
69 .ok_or_else(|| NxsError::ParseError(format!("unknown layout: {value}")))?;
70 }
71 "page-size" => {
72 opts.page_size = value
73 .parse()
74 .map_err(|_| NxsError::ParseError(format!("bad page-size: {value}")))?;
75 if opts.page_size == 0 {
76 return Err(NxsError::ParseError("page-size must be > 0".into()));
77 }
78 }
79 other => {
80 return Err(NxsError::ParseError(format!("unknown pragma: @{other}")));
81 }
82 }
83 Ok(())
84}
85
86pub fn validate_preamble_flags(flags: u16) -> Result<()> {
88 let col = flags & FLAG_COLUMNAR != 0;
89 let pax = flags & FLAG_PAX != 0;
90 if col && pax {
91 return Err(NxsError::InvalidFlags);
92 }
93 if (col || pax) && flags & FLAG_SCHEMA_EMBEDDED == 0 {
94 return Err(NxsError::ParseError(
95 "columnar/PAX requires FLAG_SCHEMA_EMBEDDED".into(),
96 ));
97 }
98 Ok(())
99}
100
101#[derive(Clone, Debug, PartialEq)]
104pub enum Cell {
105 Absent,
106 Null,
107 I64(i64),
108 F64(f64),
109 Bool(bool),
110 Time(i64),
111 Str(String),
112 Binary(Vec<u8>),
113}
114
115impl Cell {
116 fn from_value(v: &Value) -> Result<Self> {
117 match v {
118 Value::Int(n) => Ok(Cell::I64(*n)),
119 Value::Float(f) => Ok(Cell::F64(*f)),
120 Value::Bool(b) => Ok(Cell::Bool(*b)),
121 Value::Time(ns) => Ok(Cell::Time(*ns)),
122 Value::Null => Ok(Cell::Null),
123 Value::Str(s) => Ok(Cell::Str(s.clone())),
124 Value::Binary(b) => Ok(Cell::Binary(b.clone())),
125 Value::Keyword(_) => Err(NxsError::UnsupportedFieldType),
126 Value::Object(_) | Value::List(_) | Value::Macro(_) | Value::Link(_) => Err(
127 NxsError::ParseError("nested values not supported in columnar/PAX records".into()),
128 ),
129 }
130 }
131
132 fn sigil(self) -> u8 {
133 match self {
134 Cell::I64(_) => b'=',
135 Cell::F64(_) => b'~',
136 Cell::Bool(_) => b'?',
137 Cell::Time(_) => b'@',
138 Cell::Str(_) => b'"',
139 Cell::Binary(_) => b'<',
140 Cell::Null => b'^',
141 Cell::Absent => 0,
142 }
143 }
144}
145
/// Reports whether `sigil` denotes a variable-length column: string (`"`)
/// or binary (`<`).
pub fn is_var_sigil(sigil: u8) -> bool {
    sigil == b'"' || sigil == b'<'
}
150
151pub fn column_sector_len(sector: &[u8], record_count: usize, sigil: u8) -> Result<usize> {
153 let bm_len = null_bitmap_bytes(record_count);
154 if sector.len() < bm_len {
155 return Err(NxsError::OutOfBounds);
156 }
157 if is_var_sigil(sigil) {
158 let off_bytes = record_count
159 .checked_add(1)
160 .and_then(|n| n.checked_mul(4))
161 .ok_or(NxsError::OutOfBounds)?;
162 if sector.len() < bm_len.checked_add(off_bytes).ok_or(NxsError::OutOfBounds)? {
163 return Err(NxsError::OutOfBounds);
164 }
165 let end_off = bm_len
166 .checked_add(record_count.checked_mul(4).ok_or(NxsError::OutOfBounds)?)
167 .ok_or(NxsError::OutOfBounds)?;
168 let last = u32::from_le_bytes(
169 sector[end_off..end_off + 4]
170 .try_into()
171 .map_err(|_| NxsError::OutOfBounds)?,
172 ) as usize;
173 bm_len
174 .checked_add(off_bytes)
175 .and_then(|x| x.checked_add(last))
176 .ok_or(NxsError::OutOfBounds)
177 } else {
178 let cells = record_count.checked_mul(8).ok_or(NxsError::OutOfBounds)?;
179 bm_len.checked_add(cells).ok_or(NxsError::OutOfBounds)
180 }
181}
182
183pub fn col_var_parts(sector: &[u8], record_count: usize) -> Result<(&[u8], &[u8], &[u8])> {
185 let bm_len = null_bitmap_bytes(record_count);
186 let off_bytes = record_count
187 .checked_add(1)
188 .and_then(|n| n.checked_mul(4))
189 .ok_or(NxsError::OutOfBounds)?;
190 if sector.len() < bm_len.saturating_add(off_bytes) {
191 return Err(NxsError::OutOfBounds);
192 }
193 let bm = §or[..bm_len];
194 let offsets = §or[bm_len..bm_len + off_bytes];
195 let values = §or[bm_len + off_bytes..];
196 Ok((bm, offsets, values))
197}
198
/// Returns the UTF-8 string stored for `record_index` in a variable-width
/// column.
///
/// `offsets` is the little-endian `u32` offset table and `values` the value
/// bytes; entry `i` spans `offsets[i]..offsets[i + 1]`. Yields `None` when the
/// table is too short for the index, the range is reversed or runs past
/// `values`, or the bytes are not valid UTF-8.
pub fn var_str_at<'a>(offsets: &'a [u8], values: &'a [u8], record_index: usize) -> Option<&'a str> {
    let base = record_index.checked_mul(4)?;
    // One 8-byte window covers both the start and the end offset.
    let window = offsets.get(base..base.checked_add(8)?)?;
    let start = u32::from_le_bytes([window[0], window[1], window[2], window[3]]) as usize;
    let end = u32::from_le_bytes([window[4], window[5], window[6], window[7]]) as usize;
    let bytes = values.get(start..end)?;
    std::str::from_utf8(bytes).ok()
}
220
/// Returns the raw bytes stored for `record_index` in a variable-width column.
///
/// `offsets` is the little-endian `u32` offset table and `values` the value
/// bytes; entry `i` spans `offsets[i]..offsets[i + 1]`. Yields `None` when the
/// table is too short for the index or the range is reversed or runs past
/// `values`.
pub fn var_binary_at<'a>(
    offsets: &'a [u8],
    values: &'a [u8],
    record_index: usize,
) -> Option<&'a [u8]> {
    // Reads the little-endian u32 table entry that starts at byte `at`.
    let entry = |at: usize| -> Option<usize> {
        let raw = offsets.get(at..at.checked_add(4)?)?;
        Some(u32::from_le_bytes([raw[0], raw[1], raw[2], raw[3]]) as usize)
    };

    let base = record_index.checked_mul(4)?;
    let start = entry(base)?;
    let end = entry(base.checked_add(4)?)?;
    values.get(start..end)
}
246
247#[derive(Clone)]
248pub struct RecordRow {
249 pub cells: Vec<Cell>,
250}
251
252pub fn records_from_fields(fields: &[Field]) -> Result<(Vec<String>, Vec<RecordRow>)> {
254 let mut key_order: Vec<String> = Vec::new();
255 let mut key_index: HashMap<String, usize> = HashMap::new();
256 let mut rows: Vec<RecordRow> = Vec::new();
257
258 for field in fields {
259 let Value::Object(inner) = &field.value else {
260 return Err(NxsError::ParseError(
261 "columnar/PAX compile expects top-level `name { ... }` record blocks".into(),
262 ));
263 };
264 let mut cells = Vec::new();
265 for f in inner {
266 let cell = Cell::from_value(&f.value)?;
267 let idx = if let Some(&i) = key_index.get(&f.key) {
268 i
269 } else {
270 let i = key_order.len();
271 key_order.push(f.key.clone());
272 key_index.insert(f.key.clone(), i);
273 cells.resize(i, Cell::Absent);
274 i
275 };
276 if cells.len() <= idx {
277 cells.resize(idx + 1, Cell::Absent);
278 }
279 cells[idx] = cell;
280 }
281 rows.push(RecordRow { cells });
282 }
283
284 if rows.is_empty() {
285 return Err(NxsError::ParseError("no records to compile".into()));
286 }
287
288 let width = key_order.len();
289 for row in &mut rows {
290 row.cells.resize(width, Cell::Absent);
291 }
292 Ok((key_order, rows))
293}
294
/// Returns the byte length of a presence bitmap for `n` records: one bit per
/// record, rounded up to whole bytes and then to a multiple of 8 bytes.
///
/// The computation cannot overflow for any `n`. Callers feed it record counts
/// from file data, so it must not panic or wrap on hostile values.
pub(crate) fn null_bitmap_bytes(n: usize) -> usize {
    // ceil(n / 8) without forming `n + 7`, which overflows near usize::MAX.
    let raw = n / 8 + usize::from(n % 8 != 0);
    // `raw` is at most usize::MAX / 8 + 1, so padding by up to 7 is safe.
    match raw % 8 {
        0 => raw,
        rem => raw + (8 - rem),
    }
}
300
301fn encode_null_bitmap(n: usize, present: impl Fn(usize) -> bool) -> Vec<u8> {
302 let len = null_bitmap_bytes(n);
303 let mut b = vec![0u8; len];
304 for i in 0..n {
305 if present(i) {
306 b[i / 8] |= 1 << (i % 8);
307 }
308 }
309 b
310}
311
312fn cell_populated(c: &Cell) -> bool {
313 !matches!(c, Cell::Absent)
314}
315
316fn write_fixed_buffer(n: usize, cells: &[&Cell], encode: impl Fn(&Cell) -> [u8; 8]) -> Vec<u8> {
317 let mut buf = vec![0u8; n * 8];
318 for (i, c) in cells.iter().enumerate().take(n) {
319 if cell_populated(c) {
320 buf[i * 8..(i + 1) * 8].copy_from_slice(&encode(c));
321 }
322 }
323 buf
324}
325
326fn encode_var_column(n: usize, col: &[&Cell]) -> Result<Vec<u8>> {
327 let present = |i: usize| cell_populated(col[i]);
328 let bitmap = encode_null_bitmap(n, present);
329 let mut offsets: Vec<u32> = Vec::with_capacity(n + 1);
330 let mut values: Vec<u8> = Vec::new();
331 offsets.push(0);
332 for cell in col.iter().take(n) {
333 if !cell_populated(cell) {
334 offsets.push(*offsets.last().unwrap_or(&0));
335 continue;
336 }
337 match cell {
338 Cell::Str(s) => values.extend_from_slice(s.as_bytes()),
339 Cell::Binary(b) => values.extend_from_slice(b),
340 _ => {}
341 }
342 let end = values.len();
343 if end > u32::MAX as usize {
344 return Err(NxsError::Overflow);
345 }
346 offsets.push(end as u32);
347 }
348 let mut out = bitmap;
349 for o in offsets {
350 out.extend_from_slice(&o.to_le_bytes());
351 }
352 out.extend_from_slice(&values);
353 Ok(out)
354}
355
356fn encode_field_column(n: usize, col: &[&Cell], sigil: u8) -> Result<Vec<u8>> {
357 if is_var_sigil(sigil) {
358 return encode_var_column(n, col);
359 }
360 let present = |i: usize| cell_populated(col[i]);
361 let bitmap = encode_null_bitmap(n, present);
362 let values = match sigil {
363 b'=' => write_fixed_buffer(n, col, |c| match c {
364 Cell::I64(v) => v.to_le_bytes(),
365 Cell::Null | Cell::Absent => 0i64.to_le_bytes(),
366 _ => [0u8; 8],
367 }),
368 b'~' => write_fixed_buffer(n, col, |c| match c {
369 Cell::F64(v) => v.to_le_bytes(),
370 Cell::Null | Cell::Absent => 0f64.to_le_bytes(),
371 _ => [0u8; 8],
372 }),
373 b'?' => write_fixed_buffer(n, col, |c| match c {
374 Cell::Bool(v) => {
375 let mut b = [0u8; 8];
376 b[0] = if *v { 1 } else { 0 };
377 b
378 }
379 Cell::Null | Cell::Absent => [0u8; 8],
380 _ => [0u8; 8],
381 }),
382 b'@' => write_fixed_buffer(n, col, |c| match c {
383 Cell::Time(v) => v.to_le_bytes(),
384 Cell::Null | Cell::Absent => 0i64.to_le_bytes(),
385 _ => [0u8; 8],
386 }),
387 b'$' => return Err(NxsError::UnsupportedFieldType),
388 _ => write_fixed_buffer(n, col, |c| match c {
389 Cell::I64(v) => v.to_le_bytes(),
390 Cell::Null | Cell::Absent => 0i64.to_le_bytes(),
391 _ => [0u8; 8],
392 }),
393 };
394 let mut out = bitmap;
395 out.extend_from_slice(&values);
396 Ok(out)
397}
398
399pub(crate) fn sigils_for_keys(keys: &[String], rows: &[RecordRow]) -> Vec<u8> {
400 keys.iter()
401 .enumerate()
402 .map(|(fi, _)| {
403 for row in rows {
404 let c = row.cells.get(fi).cloned().unwrap_or(Cell::Absent);
405 if c != Cell::Absent {
406 return c.sigil();
407 }
408 }
409 b'='
410 })
411 .collect()
412}
413
414pub fn finish_columnar(keys: &[String], rows: &[RecordRow]) -> Result<Vec<u8>> {
416 let n = rows.len();
417 let sigils = sigils_for_keys(keys, rows);
418 let schema_bytes = build_schema(
419 &keys.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
420 &sigils,
421 );
422 let dict_hash = murmur3_64(&schema_bytes);
423
424 let mut data = Vec::new();
425 let mut tail_entries: Vec<(u16, u64, u64)> = Vec::new();
426 for (fi, sigil) in sigils.iter().enumerate() {
427 let col: Vec<&Cell> = rows
428 .iter()
429 .map(|r| r.cells.get(fi).unwrap_or(&Cell::Absent))
430 .collect();
431 let field_buf = encode_field_column(n, &col, *sigil)?;
432 let offset = 32 + schema_bytes.len() as u64 + data.len() as u64;
433 let length = field_buf.len() as u64;
434 tail_entries.push((fi as u16, offset, length));
435 data.extend_from_slice(&field_buf);
436 }
437
438 let tail_index_offset = 32 + schema_bytes.len() as u64 + data.len() as u64;
439 let mut tail = Vec::new();
440 for (fid, off, len) in &tail_entries {
441 tail.extend_from_slice(&fid.to_le_bytes());
442 tail.extend_from_slice(&0u16.to_le_bytes());
443 tail.extend_from_slice(&off.to_le_bytes());
444 tail.extend_from_slice(&len.to_le_bytes());
445 }
446 tail.extend_from_slice(&tail_index_offset.to_le_bytes());
447 tail.extend_from_slice(&(n as u64).to_le_bytes());
448 tail.extend_from_slice(&MAGIC_FOOTER.to_le_bytes());
449
450 let flags = FLAG_SCHEMA_EMBEDDED | FLAG_COLUMNAR;
451 let mut out = Vec::with_capacity(32 + schema_bytes.len() + data.len() + tail.len());
452 out.extend_from_slice(&MAGIC_FILE.to_le_bytes());
453 out.extend_from_slice(&VERSION.to_le_bytes());
454 out.extend_from_slice(&flags.to_le_bytes());
455 out.extend_from_slice(&dict_hash.to_le_bytes());
456 out.extend_from_slice(&tail_index_offset.to_le_bytes());
457 out.extend_from_slice(&0u64.to_le_bytes());
458 out.extend_from_slice(&schema_bytes);
459 out.extend_from_slice(&data);
460 out.extend_from_slice(&tail);
461 Ok(out)
462}
463
464pub fn finish_pax(keys: &[String], rows: &[RecordRow], page_size: u32) -> Result<Vec<u8>> {
466 if page_size == 0 {
467 return Err(NxsError::ParseError("page_size must be > 0".into()));
468 }
469 let n = rows.len();
470 let sigils = sigils_for_keys(keys, rows);
471 let schema_bytes = build_schema(
472 &keys.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
473 &sigils,
474 );
475 let dict_hash = murmur3_64(&schema_bytes);
476
477 let mut data = Vec::new();
478 let mut pages: Vec<(u32, u64, u32, u64, u32)> = Vec::new();
479 let mut page_idx = 0u32;
480 let mut rec_start = 0u64;
481 let mut i = 0usize;
482 while i < n {
483 let count = ((n - i) as u32).min(page_size);
484 let page_records = &rows[i..i + count as usize];
485 let page_off = 32 + schema_bytes.len() as u64 + data.len() as u64;
486 let page_bytes = encode_page(
487 page_idx,
488 rec_start,
489 count,
490 keys.len(),
491 &sigils,
492 page_records,
493 )?;
494 let page_len = page_bytes.len() as u32;
495 pages.push((page_idx, rec_start, count, page_off, page_len));
496 data.extend_from_slice(&page_bytes);
497 page_idx += 1;
498 rec_start += count as u64;
499 i += count as usize;
500 }
501
502 let tail_index_offset = 32 + schema_bytes.len() as u64 + data.len() as u64;
503 let mut tail = Vec::new();
504 for (pidx, rstart, rc, poff, plen) in &pages {
505 tail.extend_from_slice(&pidx.to_le_bytes());
506 tail.extend_from_slice(&rstart.to_le_bytes());
507 tail.extend_from_slice(&rc.to_le_bytes());
508 tail.extend_from_slice(&poff.to_le_bytes());
509 tail.extend_from_slice(&plen.to_le_bytes());
510 }
511 tail.extend_from_slice(&tail_index_offset.to_le_bytes());
512 tail.extend_from_slice(&(n as u64).to_le_bytes());
513 tail.extend_from_slice(&(pages.len() as u32).to_le_bytes());
514 tail.extend_from_slice(&page_size.to_le_bytes());
515 tail.extend_from_slice(&MAGIC_FOOTER.to_le_bytes());
516
517 let flags = FLAG_SCHEMA_EMBEDDED | FLAG_PAX;
518 let mut out = Vec::with_capacity(32 + schema_bytes.len() + data.len() + tail.len());
519 out.extend_from_slice(&MAGIC_FILE.to_le_bytes());
520 out.extend_from_slice(&VERSION.to_le_bytes());
521 out.extend_from_slice(&flags.to_le_bytes());
522 out.extend_from_slice(&dict_hash.to_le_bytes());
523 out.extend_from_slice(&tail_index_offset.to_le_bytes());
524 out.extend_from_slice(&0u64.to_le_bytes());
525 out.extend_from_slice(&schema_bytes);
526 out.extend_from_slice(&data);
527 out.extend_from_slice(&tail);
528 Ok(out)
529}
530
531pub(crate) fn encode_page(
532 page_index: u32,
533 record_start: u64,
534 record_count: u32,
535 field_count: usize,
536 sigils: &[u8],
537 rows: &[RecordRow],
538) -> Result<Vec<u8>> {
539 let n = rows.len();
540 let mut body = Vec::new();
541 for fi in 0..field_count {
542 let col: Vec<&Cell> = rows
543 .iter()
544 .map(|r| r.cells.get(fi).unwrap_or(&Cell::Absent))
545 .collect();
546 let sig = sigils.get(fi).copied().unwrap_or(b'=');
547 body.extend_from_slice(&encode_field_column(n, &col, sig)?);
548 }
549 let header_len = 4 + 4 + 8 + 4 + 2 + 2; let page_len = header_len + body.len() + 4;
551 let mut page = Vec::with_capacity(page_len);
552 page.extend_from_slice(&MAGIC_PAGE.to_le_bytes());
553 page.extend_from_slice(&page_index.to_le_bytes());
554 page.extend_from_slice(&record_start.to_le_bytes());
555 page.extend_from_slice(&record_count.to_le_bytes());
556 page.extend_from_slice(&(field_count as u16).to_le_bytes());
557 page.extend_from_slice(&0u16.to_le_bytes());
558 page.extend_from_slice(&body);
559 page.extend_from_slice(&(page_len as u32).to_le_bytes());
560 while page.len() % 8 != 0 {
561 page.push(0);
562 }
563 Ok(page)
564}
565
566pub fn compile_fields(fields: &[Field], opts: &CompileOptions) -> Result<Vec<u8>> {
568 match opts.layout {
569 Layout::Row => {
570 let mut compiler = crate::compiler::Compiler::new();
571 compiler.compile(fields)
572 }
573 Layout::Columnar | Layout::Pax => {
574 let (keys, rows) = records_from_fields(fields)?;
575 if opts.layout == Layout::Columnar {
576 finish_columnar(&keys, &rows)
577 } else {
578 let ps = if opts.page_size == 0 {
579 4096
580 } else {
581 opts.page_size
582 };
583 finish_pax(&keys, &rows, ps)
584 }
585 }
586 }
587}
588
589pub fn columnar_from_writer(w: &NxsWriter<'_>) -> Result<Vec<u8>> {
591 let keys: Vec<String> = w.schema_keys().to_vec();
592 let n = w.record_offsets().len();
593 let width = keys.len();
594 let mut rows: Vec<RecordRow> = vec![
595 RecordRow {
596 cells: vec![Cell::Absent; width]
597 };
598 n
599 ];
600
601 for (ri, &rel_off) in w.record_offsets().iter().enumerate() {
602 let obj_off = rel_off as usize;
603 let cells = decode_row_object(w.data_buf(), obj_off, width, w.slot_sigils())?;
604 rows[ri] = RecordRow { cells };
605 }
606 finish_columnar(&keys, &rows)
607}
608
609fn decode_row_object(buf: &[u8], obj_off: usize, width: usize, sigils: &[u8]) -> Result<Vec<Cell>> {
610 const MAGIC_OBJ: u32 = 0x4E59_584F;
611 if obj_off + 8 > buf.len() {
612 return Err(NxsError::OutOfBounds);
613 }
614 if u32::from_le_bytes(buf[obj_off..obj_off + 4].try_into().unwrap()) != MAGIC_OBJ {
615 return Err(NxsError::BadMagic);
616 }
617 let mut cells = vec![Cell::Absent; width];
618 let mut p = obj_off + 8;
619 let mut slot = 0usize;
620 let mut present = vec![false; width];
621 while slot < width {
622 if p >= buf.len() {
623 return Err(NxsError::OutOfBounds);
624 }
625 let b = buf[p];
626 p += 1;
627 let bits = b & 0x7F;
628 for bit in 0..7 {
629 if slot >= width {
630 break;
631 }
632 present[slot] = (bits >> bit) & 1 != 0;
633 slot += 1;
634 }
635 if b & 0x80 == 0 {
636 break;
637 }
638 }
639 let table_start = p;
640 let mut rank = 0u16;
641 for s in 0..width {
642 if !present[s] {
643 continue;
644 }
645 let ot = table_start + (rank as usize) * 2;
646 if ot + 2 > buf.len() {
647 return Err(NxsError::OutOfBounds);
648 }
649 let rel = u16::from_le_bytes(
650 buf[ot..ot + 2]
651 .try_into()
652 .map_err(|_| NxsError::OutOfBounds)?,
653 );
654 let off = obj_off + rel as usize;
655 let sig = sigils.get(s).copied().unwrap_or(b'=');
656 cells[s] = read_cell_at(buf, off, sig)?;
657 rank += 1;
658 }
659 Ok(cells)
660}
661
662fn read_cell_at(buf: &[u8], off: usize, sigil: u8) -> Result<Cell> {
663 match sigil {
664 b'=' => Ok(Cell::I64(i64::from_le_bytes(
665 buf[off..off + 8]
666 .try_into()
667 .map_err(|_| NxsError::OutOfBounds)?,
668 ))),
669 b'~' => Ok(Cell::F64(f64::from_le_bytes(
670 buf[off..off + 8]
671 .try_into()
672 .map_err(|_| NxsError::OutOfBounds)?,
673 ))),
674 b'?' => Ok(Cell::Bool(buf[off] != 0)),
675 b'@' => Ok(Cell::Time(i64::from_le_bytes(
676 buf[off..off + 8]
677 .try_into()
678 .map_err(|_| NxsError::OutOfBounds)?,
679 ))),
680 b'^' => Ok(Cell::Null),
681 b'"' => {
682 if off + 4 > buf.len() {
683 return Err(NxsError::OutOfBounds);
684 }
685 let len = u32::from_le_bytes(
686 buf[off..off + 4]
687 .try_into()
688 .map_err(|_| NxsError::OutOfBounds)?,
689 ) as usize;
690 if off + 4 + len > buf.len() {
691 return Err(NxsError::OutOfBounds);
692 }
693 let s = std::str::from_utf8(&buf[off + 4..off + 4 + len])
694 .map_err(|_| NxsError::ParseError("invalid UTF-8 in string field".into()))?;
695 Ok(Cell::Str(s.to_string()))
696 }
697 b'<' => {
698 if off + 4 > buf.len() {
699 return Err(NxsError::OutOfBounds);
700 }
701 let len = u32::from_le_bytes(
702 buf[off..off + 4]
703 .try_into()
704 .map_err(|_| NxsError::OutOfBounds)?,
705 ) as usize;
706 if off + 4 + len > buf.len() {
707 return Err(NxsError::OutOfBounds);
708 }
709 Ok(Cell::Binary(buf[off + 4..off + 4 + len].to_vec()))
710 }
711 b'$' => Err(NxsError::UnsupportedFieldType),
712 _ => Err(NxsError::OutOfBounds),
713 }
714}
715
716pub fn footer_size(flags: u16) -> usize {
717 if flags & FLAG_PAX != 0 {
718 FOOTER_PAX
719 } else if flags & FLAG_COLUMNAR != 0 {
720 FOOTER_COLUMNAR
721 } else {
722 FOOTER_ROW
723 }
724}
725
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `n` records over the columns `id`, `score`, `active`, `ts`.
    ///
    /// With `dense` every record is fully populated; otherwise only every
    /// tenth record is, and the rest consist of absent cells.
    fn flat8_records(n: usize, dense: bool) -> (Vec<String>, Vec<RecordRow>) {
        let keys: Vec<String> = ["id", "score", "active", "ts"]
            .iter()
            .map(|k| k.to_string())
            .collect();
        let rows = (0..n)
            .map(|i| {
                let cells = if dense || i % 10 == 0 {
                    vec![
                        Cell::I64(i as i64),
                        Cell::F64(i as f64 * 0.5),
                        Cell::Bool(i % 2 == 0),
                        Cell::Time(i as i64 * 1_000_000),
                    ]
                } else {
                    vec![Cell::Absent; 4]
                };
                RecordRow { cells }
            })
            .collect();
        (keys, rows)
    }

    /// Reads the little-endian flag word stored at bytes 6..8 of the preamble.
    fn preamble_flags(bytes: &[u8]) -> u16 {
        u16::from_le_bytes([bytes[6], bytes[7]])
    }

    /// A columnar file starts with the file magic, carries the columnar flag
    /// and ends with the footer magic.
    #[test]
    fn columnar_roundtrip_magic() {
        let (keys, rows) = flat8_records(100, true);
        let bytes = finish_columnar(&keys, &rows).unwrap();
        assert_eq!(&bytes[0..4], &MAGIC_FILE.to_le_bytes());
        assert!(preamble_flags(&bytes) & FLAG_COLUMNAR != 0);
        let tail = &bytes[bytes.len() - 4..];
        assert_eq!(
            u32::from_le_bytes([tail[0], tail[1], tail[2], tail[3]]),
            MAGIC_FOOTER
        );
    }

    /// A PAX file carries the PAX flag, which selects the PAX footer size.
    #[test]
    fn pax_roundtrip_footer() {
        let (keys, rows) = flat8_records(1000, true);
        let bytes = finish_pax(&keys, &rows, 256).unwrap();
        let flags = preamble_flags(&bytes);
        assert!(flags & FLAG_PAX != 0);
        assert_eq!(footer_size(flags), FOOTER_PAX);
    }

    /// Setting both layout bits at once is rejected.
    #[test]
    fn invalid_flags_rejected() {
        assert!(validate_preamble_flags(FLAG_COLUMNAR | FLAG_PAX).is_err());
    }
}