Skip to main content

sochdb_storage/
hybrid_store.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Adaptive Hybrid Storage (AHS) - PAX Block Layout
19//!
20//! From mm.md Task 4.1: Hybrid Row-Column Storage with PAX Blocks
21//!
22//! ## Problem
23//!
24//! Current storage uses pure row format requiring full row materialization
25//! even for single-column queries. LLMs often need specific columns
26//! (e.g., just `summary` field for context building).
27//!
28//! ## Solution
29//!
30//! PAX (Partition Attributes Across) provides the best of both worlds:
31//! - Row-oriented at block level (good for point queries)
32//! - Column-oriented within blocks (good for scans)
33//!
34//! ## Layout
35//!
36//! ```text
37//! Block Size = 64KB (L2 cache friendly)
38//!
39//! ┌─────────────────────────────────────────┐
40//! │ Block Header (64 bytes)                 │
41//! │  - row_count, column_count              │
42//! │  - minipage_offsets: [u32; col_count]   │
43//! ├─────────────────────────────────────────┤
44//! │ Null Bitmap (packed)                    │
45//! ├─────────────────────────────────────────┤
46//! │ Minipage 0 (Column 0 values)            │
47//! │ Minipage 1 (Column 1 values)            │
48//! │ ...                                     │
49//! └─────────────────────────────────────────┘
50//! ```
51//!
52//! ## Cache-Oblivious Analysis
53//!
54//! ```text
55//! Cache line = 64 bytes, i64 column: 8 values/line
56//!
57//! Row-store scan (all columns):
58//!   Cache misses = O(N × cols / B) where B = block transfer
59//!
60//! PAX with column pruning (k columns):
61//!   Cache misses = O(N × k / B)
62//!
63//! For 10-column table, selecting 2 columns:
64//!   Bandwidth reduction = 10/2 = 5×
65//! ```
66
67use std::io::{self, Read, Write};
68
69use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
70
/// Target size of one PAX block in bytes (64 KiB, chosen to be L2-cache friendly).
pub const PAX_BLOCK_SIZE: usize = 1 << 16;

/// Maximum number of columns per block.
///
/// NOTE(review): nothing in the visible part of this file enforces this limit - confirm
/// where it is checked.
pub const MAX_COLUMNS: usize = 256;

/// Nominal PAX block header size in bytes.
///
/// NOTE(review): not referenced in the visible part of this file - confirm intended use.
pub const PAX_HEADER_SIZE: usize = 64;

/// Magic number identifying a PAX block: the ASCII bytes `"PAX!"` read as a big-endian
/// `u32`, i.e. `0x5041_5821`.
pub const PAX_MAGIC: u32 = u32::from_be_bytes(*b"PAX!");
82
/// Physical value type of a PAX column.
///
/// The `u8` discriminant doubles as the type tag accepted by [`PaxColumnType::from_byte`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PaxColumnType {
    /// Boolean; [`PaxColumnType::fixed_size`] reports one byte per value.
    Bool = 0,
    /// 8-bit integer
    Int8 = 1,
    /// 16-bit integer
    Int16 = 2,
    /// 32-bit integer
    Int32 = 3,
    /// 64-bit integer
    Int64 = 4,
    /// 32-bit float
    Float32 = 5,
    /// 64-bit float
    Float64 = 6,
    /// Variable-length binary
    VarBinary = 7,
    /// Fixed-size binary; the width is configured per column, not per type
    FixedBinary = 8,
}

impl PaxColumnType {
    /// All variants, positioned so that the index equals the `u8` discriminant.
    const BY_TAG: [PaxColumnType; 9] = [
        PaxColumnType::Bool,
        PaxColumnType::Int8,
        PaxColumnType::Int16,
        PaxColumnType::Int32,
        PaxColumnType::Int64,
        PaxColumnType::Float32,
        PaxColumnType::Float64,
        PaxColumnType::VarBinary,
        PaxColumnType::FixedBinary,
    ];

    /// Width of one value in bytes.
    ///
    /// Returns `None` for `VarBinary` (no fixed width) and for `FixedBinary` (the width is
    /// a property of the column definition rather than of the type).
    pub fn fixed_size(&self) -> Option<usize> {
        let width = match *self {
            Self::Bool | Self::Int8 => 1,
            Self::Int16 => 2,
            Self::Int32 | Self::Float32 => 4,
            Self::Int64 | Self::Float64 => 8,
            Self::VarBinary | Self::FixedBinary => return None,
        };
        Some(width)
    }

    /// Decode a type tag; returns `None` when `b` is not a known discriminant.
    pub fn from_byte(b: u8) -> Option<Self> {
        Self::BY_TAG.get(usize::from(b)).copied()
    }
}
138
139/// Column definition for PAX
140#[derive(Debug, Clone)]
141pub struct PaxColumnDef {
142    /// Column name
143    pub name: String,
144    /// Column type
145    pub col_type: PaxColumnType,
146    /// Fixed size for FixedBinary columns
147    pub fixed_size: Option<u16>,
148    /// Whether column is nullable
149    pub nullable: bool,
150}
151
152impl PaxColumnDef {
153    pub fn new(name: impl Into<String>, col_type: PaxColumnType) -> Self {
154        Self {
155            name: name.into(),
156            col_type,
157            fixed_size: None,
158            nullable: true,
159        }
160    }
161
162    pub fn with_fixed_size(mut self, size: u16) -> Self {
163        self.fixed_size = Some(size);
164        self
165    }
166
167    pub fn not_null(mut self) -> Self {
168        self.nullable = false;
169        self
170    }
171}
172
173/// PAX schema
174#[derive(Debug, Clone)]
175pub struct PaxSchema {
176    pub columns: Vec<PaxColumnDef>,
177}
178
179impl PaxSchema {
180    pub fn new(columns: Vec<PaxColumnDef>) -> Self {
181        Self { columns }
182    }
183
184    pub fn column_count(&self) -> usize {
185        self.columns.len()
186    }
187
188    /// Check if schema has any nullable columns
189    pub fn has_nullable(&self) -> bool {
190        self.columns.iter().any(|c| c.nullable)
191    }
192
193    /// Check if schema has any variable-length columns
194    pub fn has_variable(&self) -> bool {
195        self.columns
196            .iter()
197            .any(|c| c.col_type == PaxColumnType::VarBinary)
198    }
199}
200
201/// PAX block header (64 bytes)
202#[derive(Debug, Clone)]
203pub struct PaxBlockHeader {
204    /// Magic number
205    pub magic: u32,
206    /// Version
207    pub version: u16,
208    /// Flags
209    pub flags: u16,
210    /// Number of rows in this block
211    pub row_count: u32,
212    /// Number of columns
213    pub column_count: u16,
214    /// Reserved
215    pub reserved: u16,
216    /// Offset to null bitmap (from block start)
217    pub null_bitmap_offset: u32,
218    /// Size of null bitmap in bytes
219    pub null_bitmap_size: u32,
220    /// Offsets to each minipage (from block start)
221    /// Stored after header, [u32; column_count]
222    pub minipage_offsets: Vec<u32>,
223    /// Sizes of each minipage
224    pub minipage_sizes: Vec<u32>,
225}
226
227impl PaxBlockHeader {
228    /// Header base size (without variable arrays)
229    const BASE_SIZE: usize = 24;
230
231    pub fn new(row_count: u32, column_count: usize) -> Self {
232        Self {
233            magic: PAX_MAGIC,
234            version: 1,
235            flags: 0,
236            row_count,
237            column_count: column_count as u16,
238            reserved: 0,
239            null_bitmap_offset: 0,
240            null_bitmap_size: 0,
241            minipage_offsets: vec![0; column_count],
242            minipage_sizes: vec![0; column_count],
243        }
244    }
245
246    /// Compute total header size including offset arrays
247    pub fn total_size(&self) -> usize {
248        Self::BASE_SIZE + (self.column_count as usize) * 8 // offsets + sizes
249    }
250
251    /// Write header to buffer
252    pub fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
253        w.write_u32::<LittleEndian>(self.magic)?;
254        w.write_u16::<LittleEndian>(self.version)?;
255        w.write_u16::<LittleEndian>(self.flags)?;
256        w.write_u32::<LittleEndian>(self.row_count)?;
257        w.write_u16::<LittleEndian>(self.column_count)?;
258        w.write_u16::<LittleEndian>(self.reserved)?;
259        w.write_u32::<LittleEndian>(self.null_bitmap_offset)?;
260        w.write_u32::<LittleEndian>(self.null_bitmap_size)?;
261
262        for &offset in &self.minipage_offsets {
263            w.write_u32::<LittleEndian>(offset)?;
264        }
265        for &size in &self.minipage_sizes {
266            w.write_u32::<LittleEndian>(size)?;
267        }
268
269        Ok(())
270    }
271
272    /// Read header from buffer
273    pub fn read<R: Read>(r: &mut R, _column_count: usize) -> io::Result<Self> {
274        let magic = r.read_u32::<LittleEndian>()?;
275        if magic != PAX_MAGIC {
276            return Err(io::Error::new(
277                io::ErrorKind::InvalidData,
278                "Invalid PAX magic",
279            ));
280        }
281
282        let version = r.read_u16::<LittleEndian>()?;
283        let flags = r.read_u16::<LittleEndian>()?;
284        let row_count = r.read_u32::<LittleEndian>()?;
285        let col_count = r.read_u16::<LittleEndian>()?;
286        let reserved = r.read_u16::<LittleEndian>()?;
287        let null_bitmap_offset = r.read_u32::<LittleEndian>()?;
288        let null_bitmap_size = r.read_u32::<LittleEndian>()?;
289
290        let mut minipage_offsets = vec![0u32; col_count as usize];
291        for offset in &mut minipage_offsets {
292            *offset = r.read_u32::<LittleEndian>()?;
293        }
294
295        let mut minipage_sizes = vec![0u32; col_count as usize];
296        for size in &mut minipage_sizes {
297            *size = r.read_u32::<LittleEndian>()?;
298        }
299
300        Ok(Self {
301            magic,
302            version,
303            flags,
304            row_count,
305            column_count: col_count,
306            reserved,
307            null_bitmap_offset,
308            null_bitmap_size,
309            minipage_offsets,
310            minipage_sizes,
311        })
312    }
313}
314
315/// Minipage - columnar data for a single column within a block
316#[derive(Debug)]
317pub struct Minipage {
318    /// Column index
319    pub column_idx: usize,
320    /// Raw data
321    pub data: Vec<u8>,
322    /// Column type
323    pub col_type: PaxColumnType,
324    /// Number of values
325    pub value_count: usize,
326}
327
328impl Minipage {
329    pub fn new(column_idx: usize, col_type: PaxColumnType, capacity: usize) -> Self {
330        let data_capacity = match col_type.fixed_size() {
331            Some(size) => capacity * size,
332            None => capacity * 16, // Estimate for variable-length
333        };
334
335        Self {
336            column_idx,
337            data: Vec::with_capacity(data_capacity),
338            col_type,
339            value_count: 0,
340        }
341    }
342
343    /// Write an i64 value
344    pub fn write_i64(&mut self, value: i64) {
345        self.data.extend_from_slice(&value.to_le_bytes());
346        self.value_count += 1;
347    }
348
349    /// Write an i32 value
350    pub fn write_i32(&mut self, value: i32) {
351        self.data.extend_from_slice(&value.to_le_bytes());
352        self.value_count += 1;
353    }
354
355    /// Write an f64 value
356    pub fn write_f64(&mut self, value: f64) {
357        self.data.extend_from_slice(&value.to_le_bytes());
358        self.value_count += 1;
359    }
360
361    /// Write an f32 value
362    pub fn write_f32(&mut self, value: f32) {
363        self.data.extend_from_slice(&value.to_le_bytes());
364        self.value_count += 1;
365    }
366
367    /// Write a bool value
368    pub fn write_bool(&mut self, value: bool) {
369        self.data.push(if value { 1 } else { 0 });
370        self.value_count += 1;
371    }
372
373    /// Write variable-length binary
374    pub fn write_var_binary(&mut self, value: &[u8]) {
375        self.data
376            .write_u32::<LittleEndian>(value.len() as u32)
377            .unwrap();
378        self.data.extend_from_slice(value);
379        self.value_count += 1;
380    }
381
382    /// Read i64 at index
383    pub fn read_i64(&self, idx: usize) -> Option<i64> {
384        let offset = idx * 8;
385        if offset + 8 > self.data.len() {
386            return None;
387        }
388        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
389        Some(i64::from_le_bytes(bytes))
390    }
391
392    /// Read i32 at index
393    pub fn read_i32(&self, idx: usize) -> Option<i32> {
394        let offset = idx * 4;
395        if offset + 4 > self.data.len() {
396            return None;
397        }
398        let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
399        Some(i32::from_le_bytes(bytes))
400    }
401
402    /// Read f64 at index
403    pub fn read_f64(&self, idx: usize) -> Option<f64> {
404        let offset = idx * 8;
405        if offset + 8 > self.data.len() {
406            return None;
407        }
408        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().ok()?;
409        Some(f64::from_le_bytes(bytes))
410    }
411
412    /// Read f32 at index
413    pub fn read_f32(&self, idx: usize) -> Option<f32> {
414        let offset = idx * 4;
415        if offset + 4 > self.data.len() {
416            return None;
417        }
418        let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().ok()?;
419        Some(f32::from_le_bytes(bytes))
420    }
421
422    /// Read bool at index
423    pub fn read_bool(&self, idx: usize) -> Option<bool> {
424        self.data.get(idx).map(|&v| v != 0)
425    }
426
427    /// Get raw slice for SIMD operations
428    pub fn as_i64_slice(&self) -> &[i64] {
429        // Safety: Data is aligned and written as i64
430        let ptr = self.data.as_ptr() as *const i64;
431        let len = self.data.len() / 8;
432        unsafe { std::slice::from_raw_parts(ptr, len) }
433    }
434
435    /// Get raw slice for SIMD operations
436    pub fn as_f64_slice(&self) -> &[f64] {
437        let ptr = self.data.as_ptr() as *const f64;
438        let len = self.data.len() / 8;
439        unsafe { std::slice::from_raw_parts(ptr, len) }
440    }
441
442    /// Get raw slice for SIMD operations
443    pub fn as_i32_slice(&self) -> &[i32] {
444        let ptr = self.data.as_ptr() as *const i32;
445        let len = self.data.len() / 4;
446        unsafe { std::slice::from_raw_parts(ptr, len) }
447    }
448
449    /// Get raw slice for SIMD operations
450    pub fn as_f32_slice(&self) -> &[f32] {
451        let ptr = self.data.as_ptr() as *const f32;
452        let len = self.data.len() / 4;
453        unsafe { std::slice::from_raw_parts(ptr, len) }
454    }
455}
456
457/// PAX block writer
458pub struct PaxBlockWriter {
459    schema: PaxSchema,
460    null_bitmap: Vec<u8>,
461    minipages: Vec<Minipage>,
462    row_count: usize,
463    max_rows: usize,
464}
465
466impl PaxBlockWriter {
467    pub fn new(schema: PaxSchema, max_rows: usize) -> Self {
468        let col_count = schema.column_count();
469        let null_bitmap_size = (max_rows * col_count + 7) / 8;
470
471        let minipages = schema
472            .columns
473            .iter()
474            .enumerate()
475            .map(|(i, col)| Minipage::new(i, col.col_type, max_rows))
476            .collect();
477
478        Self {
479            schema,
480            null_bitmap: vec![0; null_bitmap_size],
481            minipages,
482            row_count: 0,
483            max_rows,
484        }
485    }
486
487    /// Check if block is full
488    pub fn is_full(&self) -> bool {
489        self.row_count >= self.max_rows
490    }
491
492    /// Get current row count
493    pub fn row_count(&self) -> usize {
494        self.row_count
495    }
496
497    /// Mark a cell as null
498    fn set_null(&mut self, row: usize, col: usize) {
499        let bit_idx = row * self.schema.column_count() + col;
500        let byte_idx = bit_idx / 8;
501        let bit_pos = bit_idx % 8;
502        if byte_idx < self.null_bitmap.len() {
503            self.null_bitmap[byte_idx] |= 1 << bit_pos;
504        }
505    }
506
507    /// Start writing a new row
508    pub fn start_row(&mut self) -> PaxRowWriter<'_> {
509        PaxRowWriter {
510            block: self,
511            col_idx: 0,
512        }
513    }
514
515    /// Finish and build the block
516    pub fn finish(self) -> PaxBlock {
517        let mut header = PaxBlockHeader::new(self.row_count as u32, self.schema.column_count());
518
519        // Calculate offsets
520        let header_size = header.total_size();
521        let null_bitmap_size = (self.row_count * self.schema.column_count() + 7) / 8;
522
523        header.null_bitmap_offset = header_size as u32;
524        header.null_bitmap_size = null_bitmap_size as u32;
525
526        let mut offset = header_size + null_bitmap_size;
527        for (i, mp) in self.minipages.iter().enumerate() {
528            header.minipage_offsets[i] = offset as u32;
529            header.minipage_sizes[i] = mp.data.len() as u32;
530            offset += mp.data.len();
531        }
532
533        PaxBlock {
534            header,
535            null_bitmap: self.null_bitmap[..null_bitmap_size].to_vec(),
536            minipages: self.minipages,
537            schema: self.schema,
538        }
539    }
540}
541
542/// Row writer for PAX blocks
543pub struct PaxRowWriter<'a> {
544    block: &'a mut PaxBlockWriter,
545    col_idx: usize,
546}
547
548impl<'a> PaxRowWriter<'a> {
549    /// Write null value
550    pub fn write_null(mut self) -> Self {
551        self.block.set_null(self.block.row_count, self.col_idx);
552        // Write a zero placeholder
553        match self.block.schema.columns[self.col_idx].col_type {
554            PaxColumnType::Bool => self.block.minipages[self.col_idx].write_bool(false),
555            PaxColumnType::Int8 => self.block.minipages[self.col_idx].data.push(0),
556            PaxColumnType::Int16 => self.block.minipages[self.col_idx]
557                .data
558                .extend_from_slice(&[0; 2]),
559            PaxColumnType::Int32 | PaxColumnType::Float32 => {
560                self.block.minipages[self.col_idx]
561                    .data
562                    .extend_from_slice(&[0; 4]);
563            }
564            PaxColumnType::Int64 | PaxColumnType::Float64 => {
565                self.block.minipages[self.col_idx]
566                    .data
567                    .extend_from_slice(&[0; 8]);
568            }
569            PaxColumnType::VarBinary => {
570                self.block.minipages[self.col_idx].write_var_binary(&[]);
571            }
572            PaxColumnType::FixedBinary => {
573                let size = self.block.schema.columns[self.col_idx]
574                    .fixed_size
575                    .unwrap_or(0) as usize;
576                self.block.minipages[self.col_idx]
577                    .data
578                    .extend(std::iter::repeat(0).take(size));
579            }
580        }
581        self.block.minipages[self.col_idx].value_count += 1;
582        self.col_idx += 1;
583        self
584    }
585
586    /// Write i64 value
587    pub fn write_i64(mut self, value: i64) -> Self {
588        self.block.minipages[self.col_idx].write_i64(value);
589        self.col_idx += 1;
590        self
591    }
592
593    /// Write i32 value
594    pub fn write_i32(mut self, value: i32) -> Self {
595        self.block.minipages[self.col_idx].write_i32(value);
596        self.col_idx += 1;
597        self
598    }
599
600    /// Write f64 value
601    pub fn write_f64(mut self, value: f64) -> Self {
602        self.block.minipages[self.col_idx].write_f64(value);
603        self.col_idx += 1;
604        self
605    }
606
607    /// Write f32 value
608    pub fn write_f32(mut self, value: f32) -> Self {
609        self.block.minipages[self.col_idx].write_f32(value);
610        self.col_idx += 1;
611        self
612    }
613
614    /// Write bool value
615    pub fn write_bool(mut self, value: bool) -> Self {
616        self.block.minipages[self.col_idx].write_bool(value);
617        self.col_idx += 1;
618        self
619    }
620
621    /// Write variable-length binary
622    pub fn write_bytes(mut self, value: &[u8]) -> Self {
623        self.block.minipages[self.col_idx].write_var_binary(value);
624        self.col_idx += 1;
625        self
626    }
627
628    /// Write string
629    pub fn write_string(self, value: &str) -> Self {
630        self.write_bytes(value.as_bytes())
631    }
632
633    /// Finish the row
634    pub fn finish(self) {
635        self.block.row_count += 1;
636    }
637}
638
639/// Complete PAX block
640#[derive(Debug)]
641pub struct PaxBlock {
642    pub header: PaxBlockHeader,
643    pub null_bitmap: Vec<u8>,
644    pub minipages: Vec<Minipage>,
645    pub schema: PaxSchema,
646}
647
648impl PaxBlock {
649    /// Get row count
650    pub fn row_count(&self) -> usize {
651        self.header.row_count as usize
652    }
653
654    /// Check if a cell is null
655    pub fn is_null(&self, row: usize, col: usize) -> bool {
656        let bit_idx = row * self.schema.column_count() + col;
657        let byte_idx = bit_idx / 8;
658        let bit_pos = bit_idx % 8;
659        if byte_idx >= self.null_bitmap.len() {
660            return false;
661        }
662        self.null_bitmap[byte_idx] & (1 << bit_pos) != 0
663    }
664
665    /// Get a column minipage for columnar access
666    pub fn get_column(&self, col: usize) -> Option<&Minipage> {
667        self.minipages.get(col)
668    }
669
670    /// Read i64 from specific row/column
671    pub fn read_i64(&self, row: usize, col: usize) -> Option<i64> {
672        if self.is_null(row, col) {
673            return None;
674        }
675        self.minipages.get(col)?.read_i64(row)
676    }
677
678    /// Read f64 from specific row/column
679    pub fn read_f64(&self, row: usize, col: usize) -> Option<f64> {
680        if self.is_null(row, col) {
681            return None;
682        }
683        self.minipages.get(col)?.read_f64(row)
684    }
685
686    /// Read i32 from specific row/column
687    pub fn read_i32(&self, row: usize, col: usize) -> Option<i32> {
688        if self.is_null(row, col) {
689            return None;
690        }
691        self.minipages.get(col)?.read_i32(row)
692    }
693
694    /// Read bool from specific row/column
695    pub fn read_bool(&self, row: usize, col: usize) -> Option<bool> {
696        if self.is_null(row, col) {
697            return None;
698        }
699        self.minipages.get(col)?.read_bool(row)
700    }
701
702    /// Serialize block to bytes
703    pub fn to_bytes(&self) -> Vec<u8> {
704        let mut buffer = Vec::new();
705        self.header.write(&mut buffer).unwrap();
706        buffer.extend_from_slice(&self.null_bitmap);
707        for mp in &self.minipages {
708            buffer.extend_from_slice(&mp.data);
709        }
710        buffer
711    }
712
713    /// Get total size in bytes
714    pub fn size_bytes(&self) -> usize {
715        self.header.total_size()
716            + self.null_bitmap.len()
717            + self.minipages.iter().map(|m| m.data.len()).sum::<usize>()
718    }
719}
720
/// Subset of column indices to read from a block.
#[derive(Debug, Clone)]
pub struct ColumnProjection {
    /// Selected column indices, in the order they were supplied
    columns: Vec<usize>,
}

impl ColumnProjection {
    /// Project the given column indices.
    pub fn new(columns: Vec<usize>) -> Self {
        ColumnProjection { columns }
    }

    /// Project every column `0..column_count`.
    pub fn all(column_count: usize) -> Self {
        Self::new((0..column_count).collect())
    }

    /// Selected column indices.
    pub fn columns(&self) -> &[usize] {
        self.columns.as_slice()
    }

    /// Ratio `total_columns / selected_columns`.
    ///
    /// An empty projection reports `1.0` instead of dividing by zero.
    pub fn bandwidth_savings(&self, total_columns: usize) -> f64 {
        match self.columns.len() {
            0 => 1.0,
            selected => total_columns as f64 / selected as f64,
        }
    }
}
755
756/// Iterator over PAX block rows with column projection
757pub struct PaxBlockIterator<'a> {
758    block: &'a PaxBlock,
759    projection: ColumnProjection,
760    current_row: usize,
761}
762
763impl<'a> PaxBlockIterator<'a> {
764    pub fn new(block: &'a PaxBlock, projection: ColumnProjection) -> Self {
765        Self {
766            block,
767            projection,
768            current_row: 0,
769        }
770    }
771
772    /// Get the next row as a view
773    pub fn next_row(&mut self) -> Option<PaxRowViewOwned> {
774        if self.current_row >= self.block.row_count() {
775            return None;
776        }
777
778        let row = PaxRowViewOwned {
779            row_idx: self.current_row,
780            projection: self.projection.clone(),
781        };
782
783        self.current_row += 1;
784        Some(row)
785    }
786}
787
788impl<'a> Iterator for PaxBlockIterator<'a> {
789    type Item = usize; // Returns row index
790
791    fn next(&mut self) -> Option<Self::Item> {
792        if self.current_row >= self.block.row_count() {
793            return None;
794        }
795
796        let row_idx = self.current_row;
797        self.current_row += 1;
798        Some(row_idx)
799    }
800}
801
802/// Owned row view data (no lifetime issues)
803#[derive(Debug, Clone)]
804pub struct PaxRowViewOwned {
805    pub row_idx: usize,
806    pub projection: ColumnProjection,
807}
808
809/// Zero-allocation row view for PAX block
810pub struct PaxRowView<'a> {
811    block: &'a PaxBlock,
812    row_idx: usize,
813    projection: &'a ColumnProjection,
814}
815
816impl<'a> PaxRowView<'a> {
817    /// Get row index
818    pub fn row_index(&self) -> usize {
819        self.row_idx
820    }
821
822    /// Check if projected column is null
823    pub fn is_null(&self, proj_idx: usize) -> bool {
824        let col = self.projection.columns.get(proj_idx).copied().unwrap_or(0);
825        self.block.is_null(self.row_idx, col)
826    }
827
828    /// Read i64 from projected column
829    pub fn read_i64(&self, proj_idx: usize) -> Option<i64> {
830        let col = *self.projection.columns.get(proj_idx)?;
831        self.block.read_i64(self.row_idx, col)
832    }
833
834    /// Read f64 from projected column
835    pub fn read_f64(&self, proj_idx: usize) -> Option<f64> {
836        let col = *self.projection.columns.get(proj_idx)?;
837        self.block.read_f64(self.row_idx, col)
838    }
839}
840
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a nullable `Int64` column.
    fn int64_column(name: &str) -> PaxColumnDef {
        PaxColumnDef::new(name, PaxColumnType::Int64)
    }

    /// Rows written through the row writer can be read back cell by cell.
    #[test]
    fn test_pax_block_write_read() {
        let columns = vec![
            int64_column("id"),
            PaxColumnDef::new("value", PaxColumnType::Float64),
            PaxColumnDef::new("flag", PaxColumnType::Bool),
        ];
        let mut writer = PaxBlockWriter::new(PaxSchema::new(columns), 100);

        for i in 0..10 {
            let row = writer.start_row();
            let row = row.write_i64(i);
            let row = row.write_f64(i as f64 * 1.5);
            let row = row.write_bool(i % 2 == 0);
            row.finish();
        }

        let block = writer.finish();
        assert_eq!(block.row_count(), 10);

        // (row, expected id, expected value, expected flag)
        let expectations = [(0usize, 0i64, 0.0f64, true), (5, 5, 7.5, false)];
        for &(row, id, value, flag) in expectations.iter() {
            assert_eq!(block.read_i64(row, 0), Some(id));
            assert_eq!(block.read_f64(row, 1), Some(value));
            assert_eq!(block.read_bool(row, 2), Some(flag));
        }
    }

    /// The iterator yields row indices that are then read with original column indices.
    #[test]
    fn test_column_projection() {
        let columns = ["a", "b", "c", "d"]
            .iter()
            .map(|name| int64_column(name))
            .collect();
        let mut writer = PaxBlockWriter::new(PaxSchema::new(columns), 100);

        for i in 0..5 {
            writer
                .start_row()
                .write_i64(i)
                .write_i64(i * 10)
                .write_i64(i * 100)
                .write_i64(i * 1000)
                .finish();
        }
        let block = writer.finish();

        // Project only columns 0 and 2
        let projection = ColumnProjection::new(vec![0, 2]);
        assert_eq!(projection.bandwidth_savings(4), 2.0);

        let mut iter = PaxBlockIterator::new(&block, projection);

        // First row: both projected columns hold 0.
        let first = iter.next().unwrap();
        assert_eq!(block.read_i64(first, 0), Some(0));
        assert_eq!(block.read_i64(first, 2), Some(0));

        // Second row: column 0 holds 1, column 2 holds 100.
        let second = iter.next().unwrap();
        assert_eq!(block.read_i64(second, 0), Some(1));
        assert_eq!(block.read_i64(second, 2), Some(100));
    }

    /// A NULL cell is flagged in the bitmap and reads back as `None`.
    #[test]
    fn test_null_handling() {
        let schema = PaxSchema::new(vec![
            int64_column("id").not_null(),
            PaxColumnDef::new("value", PaxColumnType::Float64),
        ]);
        let mut writer = PaxBlockWriter::new(schema, 100);

        writer.start_row().write_i64(1).write_f64(1.0).finish();
        writer.start_row().write_i64(2).write_null().finish();
        writer.start_row().write_i64(3).write_f64(3.0).finish();

        let block = writer.finish();

        // (row, col, expected null flag)
        let null_flags = [
            (0usize, 0usize, false),
            (0, 1, false),
            (1, 0, false),
            (1, 1, true),
            (2, 1, false),
        ];
        for &(row, col, expected) in null_flags.iter() {
            assert_eq!(block.is_null(row, col), expected);
        }

        let values = [Some(1.0), None, Some(3.0)];
        for (row, expected) in values.iter().enumerate() {
            assert_eq!(block.read_f64(row, 1), *expected);
        }
    }

    /// A whole column can be viewed as a typed slice.
    #[test]
    fn test_columnar_access() {
        let schema = PaxSchema::new(vec![int64_column("id")]);
        let mut writer = PaxBlockWriter::new(schema, 1000);
        for i in 0..100 {
            writer.start_row().write_i64(i).finish();
        }

        let block = writer.finish();

        // Get column minipage for SIMD-friendly access
        let column = block.get_column(0).unwrap();
        let values = column.as_i64_slice();

        assert_eq!(values.len(), 100);
        for &probe in [0usize, 50, 99].iter() {
            assert_eq!(values[probe], probe as i64);
        }
    }
}
963}