use crate::delete_bitmap::DeleteBitmap;
use crate::error::ColumnarError;
use crate::format::{HEADER_SIZE, SegmentFooter, SegmentHeader};
use crate::predicate::ScanPredicate;
use super::block_decode::{
append_null_fill, decode_block, empty_decoded, infer_column_type, result_valid_len,
result_valid_slice_mut,
};
use super::types::DecodedColumn;
/// Reader over a plaintext columnar segment held in a borrowed byte buffer.
///
/// Build one with [`SegmentReader::open`], or borrow one from an
/// [`OwnedSegmentReader`] via [`OwnedSegmentReader::reader`].
pub struct SegmentReader<'a> {
    /// Footer parsed from the tail of `data`.
    pub(super) footer: SegmentFooter,
    /// The whole segment, starting with its header; column offsets in the
    /// footer are measured from the end of that header.
    pub(super) data: &'a [u8],
}
/// Segment reader that owns its plaintext bytes (decrypted first when the
/// source blob was an encrypted segment).
///
/// Create it with [`OwnedSegmentReader::open_with_kek`] and borrow a
/// [`SegmentReader`] over the bytes with [`OwnedSegmentReader::reader`].
pub struct OwnedSegmentReader {
    /// Plaintext segment bytes, header included.
    plaintext: Vec<u8>,
    /// Footer parsed from the tail of `plaintext`.
    footer: SegmentFooter,
}

// `Debug` is written by hand instead of derived. A derived impl would print
// every byte of `plaintext`, which can be decrypted segment data, into
// whatever log or panic message formats the reader. Only the length is shown.
impl std::fmt::Debug for OwnedSegmentReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OwnedSegmentReader")
            .field("plaintext_len", &self.plaintext.len())
            .field("footer", &self.footer)
            .finish()
    }
}
impl OwnedSegmentReader {
    /// Takes ownership of plaintext segment bytes after validating them.
    ///
    /// The header is checked before the footer is parsed from the tail. The
    /// parsed header itself is not kept.
    fn from_plaintext(plaintext: Vec<u8>) -> Result<Self, ColumnarError> {
        SegmentHeader::from_bytes(&plaintext)?;
        let footer = SegmentFooter::from_segment_tail(&plaintext)?;
        Ok(Self { plaintext, footer })
    }

    /// Opens a segment blob that may or may not be encrypted.
    ///
    /// A blob is treated as encrypted when it starts with
    /// `crate::encrypt::SEGC_MAGIC`. Outcomes:
    ///
    /// * encrypted, key given: decrypted with `decrypt_segment`, then validated;
    /// * encrypted, no key: [`ColumnarError::MissingKek`];
    /// * plaintext, key given: [`ColumnarError::KekRequired`], so a caller that
    ///   supplies a key never silently accepts an unencrypted segment;
    /// * plaintext, no key: the bytes are copied and validated.
    ///
    /// # Errors
    ///
    /// The two key-mismatch cases above, plus any decryption, header or
    /// footer error.
    pub fn open_with_kek(
        blob: &[u8],
        kek: Option<&nodedb_wal::crypto::WalEncryptionKey>,
    ) -> Result<Self, ColumnarError> {
        let encrypted =
            matches!(blob.get(..4), Some(magic) if *magic == crate::encrypt::SEGC_MAGIC);
        match (encrypted, kek) {
            (true, Some(key)) => {
                let decrypted = crate::encrypt::decrypt_segment(key, blob)?;
                Self::from_plaintext(decrypted)
            }
            (true, None) => Err(ColumnarError::MissingKek),
            (false, Some(_)) => Err(ColumnarError::KekRequired),
            (false, None) => Self::from_plaintext(blob.to_vec()),
        }
    }

    /// Borrows a [`SegmentReader`] over the owned plaintext.
    ///
    /// `SegmentReader` stores its footer by value, so every call clones the
    /// parsed footer.
    pub fn reader(&self) -> SegmentReader<'_> {
        let footer = self.footer.clone();
        SegmentReader {
            footer,
            data: self.plaintext.as_slice(),
        }
    }

    /// Returns the footer parsed when the segment was opened.
    pub fn footer(&self) -> &SegmentFooter {
        &self.footer
    }

    /// Total row count recorded in the segment footer.
    pub fn row_count(&self) -> u64 {
        self.footer().row_count
    }
}
impl<'a> SegmentReader<'a> {
    /// Opens a plaintext segment borrowed from `data`.
    ///
    /// Validates the header and parses the footer from the segment tail.
    ///
    /// # Errors
    ///
    /// Returns [`ColumnarError::MissingKek`] if `data` starts with the
    /// encrypted-segment magic; such blobs must go through
    /// [`OwnedSegmentReader::open_with_kek`]. Header and footer parse errors
    /// are propagated.
    pub fn open(data: &'a [u8]) -> Result<Self, ColumnarError> {
        if data.len() >= 4 && data[0..4] == crate::encrypt::SEGC_MAGIC {
            return Err(ColumnarError::MissingKek);
        }
        SegmentHeader::from_bytes(data)?;
        let footer = SegmentFooter::from_segment_tail(data)?;
        Ok(Self { data, footer })
    }

    /// Returns the parsed segment footer.
    pub fn footer(&self) -> &SegmentFooter {
        &self.footer
    }

    /// Total row count recorded in the footer.
    pub fn row_count(&self) -> u64 {
        self.footer.row_count
    }

    /// Column count recorded in the footer (`footer.column_count`).
    pub fn column_count(&self) -> usize {
        self.footer.column_count as usize
    }

    /// Decodes every block of column `col_idx`, with no predicates and no deletes.
    pub fn read_column(&self, col_idx: usize) -> Result<DecodedColumn, ColumnarError> {
        self.read_column_filtered(col_idx, &[])
    }

    /// Decodes column `col_idx`, pruning blocks with `predicates`.
    ///
    /// Only predicates whose `col_idx` matches are consulted. A block any of
    /// them can skip (`ScanPredicate::can_skip_block`) is null-filled instead
    /// of decoded.
    pub fn read_column_filtered(
        &self,
        col_idx: usize,
        predicates: &[ScanPredicate],
    ) -> Result<DecodedColumn, ColumnarError> {
        self.read_column_impl(col_idx, predicates, &DeleteBitmap::new())
    }

    /// Calls [`Self::read_column_filtered`] for each index in `col_indices`,
    /// stopping at the first error.
    pub fn read_columns(
        &self,
        col_indices: &[usize],
        predicates: &[ScanPredicate],
    ) -> Result<Vec<DecodedColumn>, ColumnarError> {
        col_indices
            .iter()
            .map(|&idx| self.read_column_filtered(idx, predicates))
            .collect()
    }

    /// Like [`Self::read_column_filtered`], but also applies `deletes`.
    ///
    /// Blocks the bitmap reports as fully deleted are null-filled without
    /// decoding. For other decoded blocks, the bitmap is applied to the new
    /// validity entries.
    pub fn read_column_with_deletes(
        &self,
        col_idx: usize,
        predicates: &[ScanPredicate],
        deletes: &DeleteBitmap,
    ) -> Result<DecodedColumn, ColumnarError> {
        self.read_column_impl(col_idx, predicates, deletes)
    }

    /// Calls [`Self::read_column_with_deletes`] for each index in
    /// `col_indices`, stopping at the first error.
    pub fn read_columns_with_deletes(
        &self,
        col_indices: &[usize],
        predicates: &[ScanPredicate],
        deletes: &DeleteBitmap,
    ) -> Result<Vec<DecodedColumn>, ColumnarError> {
        col_indices
            .iter()
            .map(|&idx| self.read_column_with_deletes(idx, predicates, deletes))
            .collect()
    }

    /// Shared implementation behind the public `read_column*` methods.
    ///
    /// Walks the column's length-prefixed blocks in footer order. Each block
    /// is either null-filled (predicate-skipped or fully deleted) or decoded
    /// and then masked with `deletes`.
    ///
    /// # Errors
    ///
    /// * [`ColumnarError::ColumnOutOfRange`] if `col_idx` has no metadata.
    /// * [`ColumnarError::TruncatedSegment`] if a block frame runs past the
    ///   end of the segment (corrupt or truncated data).
    /// * Any error from `decode_block`.
    fn read_column_impl(
        &self,
        col_idx: usize,
        predicates: &[ScanPredicate],
        deletes: &DeleteBitmap,
    ) -> Result<DecodedColumn, ColumnarError> {
        let col_meta = self
            .footer
            .columns
            .get(col_idx)
            .ok_or_else(|| ColumnarError::ColumnOutOfRange {
                index: col_idx,
                count: self.footer.columns.len(),
            })?;
        // Only predicates on this column can prune its blocks.
        let my_preds: Vec<&ScanPredicate> =
            predicates.iter().filter(|p| p.col_idx == col_idx).collect();
        // Column offsets in the footer are relative to the end of the header.
        // The addition is checked so a corrupt offset yields an error instead
        // of an overflow.
        let mut cursor = HEADER_SIZE
            .checked_add(col_meta.offset as usize)
            .ok_or(ColumnarError::TruncatedSegment {
                expected: usize::MAX,
                got: self.data.len(),
            })?;
        let col_type = infer_column_type(col_meta);
        let mut result = empty_decoded(&col_type);
        // Index of the current block's first row within this column. It is
        // passed to the delete bitmap as the block's starting row.
        let mut global_row: u32 = 0;
        for block_stat in &col_meta.block_stats {
            let block_row_count = block_stat.row_count;
            // The frame is consumed even for skipped blocks so the cursor
            // stays aligned with the next block.
            let (block_data, next_cursor) = self.block_frame(cursor)?;
            cursor = next_cursor;

            let pred_skip = my_preds.iter().any(|p| p.can_skip_block(block_stat));
            let delete_skip =
                !deletes.is_empty() && deletes.is_block_fully_deleted(global_row, block_row_count);
            if pred_skip || delete_skip {
                append_null_fill(&mut result, block_row_count as usize);
                global_row += block_row_count;
                continue;
            }

            let pre_len = result_valid_len(&result);
            decode_block(
                &mut result,
                block_data,
                &col_type,
                col_meta.codec,
                block_row_count as usize,
                col_meta.dictionary.as_deref(),
            )?;
            if !deletes.is_empty() {
                // Mask only the validity entries this block just appended.
                let valid_slice = result_valid_slice_mut(&mut result, pre_len);
                deletes.apply_to_validity(valid_slice, global_row);
            }
            global_row += block_row_count;
        }
        Ok(result)
    }

    /// Reads the block frame that starts at byte `cursor`.
    ///
    /// A frame is a little-endian `u32` payload length followed by that many
    /// payload bytes. Returns the payload and the offset just past it.
    ///
    /// # Errors
    ///
    /// Returns [`ColumnarError::TruncatedSegment`] instead of panicking when
    /// the length prefix or the payload would extend past the end of the
    /// segment, or when the offset arithmetic would overflow.
    fn block_frame(&self, cursor: usize) -> Result<(&'a [u8], usize), ColumnarError> {
        let data: &'a [u8] = self.data;
        let truncated = |expected: usize| ColumnarError::TruncatedSegment {
            expected,
            got: data.len(),
        };

        let payload_start = cursor
            .checked_add(4)
            .ok_or_else(|| truncated(usize::MAX))?;
        let prefix = data
            .get(cursor..payload_start)
            .ok_or_else(|| truncated(payload_start))?;
        // `prefix` is exactly 4 bytes long here, so these indexes cannot panic.
        let block_len = u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]) as usize;

        let payload_end = payload_start
            .checked_add(block_len)
            .ok_or_else(|| truncated(usize::MAX))?;
        let payload = data
            .get(payload_start..payload_end)
            .ok_or_else(|| truncated(payload_end))?;
        Ok((payload, payload_end))
    }
}