use crate::codecs::{read_u32_le, read_u64_le, write_u32_le, write_u64_le};
use crate::store::indexing::IndexKind;
use llkv_result::{Error, Result};
use llkv_storage::{
pager::{BatchGet, BatchPut, GetResult, Pager},
types::PhysicalKey,
};
use llkv_types::ids::LogicalFieldId;
use std::mem;
use std::sync::Arc;
/// Fixed-size entry describing one stored chunk, packed after a descriptor page header.
///
/// Encoded by [`ChunkMetadata::to_le_bytes`] as six consecutive little-endian `u64`
/// values, in field-declaration order.
#[derive(Clone, Copy, Debug, Default)]
#[repr(C)]
pub(crate) struct ChunkMetadata {
    /// Physical key of the chunk blob (presumably the chunk's data).
    pub(crate) chunk_pk: PhysicalKey,
    /// Physical key of the chunk's value-order permutation (per the field name).
    pub(crate) value_order_perm_pk: PhysicalKey,
    /// Row count of this chunk; summed into the column descriptor's total row count.
    pub(crate) row_count: u64,
    /// Serialized size of the chunk (presumably in bytes).
    pub(crate) serialized_bytes: u64,
    /// Minimum value summary for the chunk, stored as a `u64`.
    pub(crate) min_val_u64: u64,
    /// Maximum value summary for the chunk, stored as a `u64`.
    pub(crate) max_val_u64: u64,
}

impl ChunkMetadata {
    /// Encoded size in bytes: the in-memory size of this `#[repr(C)]` struct.
    pub(crate) const DISK_SIZE: usize = mem::size_of::<Self>();

    /// Encodes all six fields, in declaration order, as little-endian `u64`s.
    pub(crate) fn to_le_bytes(self) -> Vec<u8> {
        let fields = [
            self.chunk_pk,
            self.value_order_perm_pk,
            self.row_count,
            self.serialized_bytes,
            self.min_val_u64,
            self.max_val_u64,
        ];
        let mut out = Vec::with_capacity(Self::DISK_SIZE);
        for field in fields {
            write_u64_le(&mut out, field);
        }
        out
    }

    /// Decodes an entry written by [`ChunkMetadata::to_le_bytes`], reading six
    /// little-endian `u64`s from the start of `bytes`.
    pub(crate) fn from_le_bytes(bytes: &[u8]) -> Self {
        let mut cursor = 0usize;
        let mut next_u64 = || read_u64_le(bytes, &mut cursor);
        // Struct-expression fields are evaluated in the order written, which is exactly
        // the on-disk field order.
        Self {
            chunk_pk: next_u64(),
            value_order_perm_pk: next_u64(),
            row_count: next_u64(),
            serialized_bytes: next_u64(),
            min_val_u64: next_u64(),
            max_val_u64: next_u64(),
        }
    }
}
#[derive(Clone, Debug, Default)]
pub(crate) struct ColumnDescriptor {
pub(crate) field_id: LogicalFieldId,
pub(crate) head_page_pk: PhysicalKey,
pub(crate) tail_page_pk: PhysicalKey,
pub(crate) total_row_count: u64,
pub(crate) total_chunk_count: u64,
pub(crate) data_type_code: u32,
pub(crate) _padding: u32,
pub(crate) index_metadata: Vec<u8>,
}
impl ColumnDescriptor {
pub(crate) const FIXED_DISK_SIZE_WITHOUT_INDEX_META: usize = 48;
pub(crate) const FIXED_DISK_SIZE: usize = Self::FIXED_DISK_SIZE_WITHOUT_INDEX_META + 4;
pub(crate) fn load_or_create<P: Pager>(
pager: Arc<P>,
descriptor_pk: PhysicalKey,
field_id: LogicalFieldId,
) -> Result<(ColumnDescriptor, Vec<u8>)> {
match pager
.batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
.pop()
{
Some(GetResult::Raw { bytes, .. }) => {
let descriptor = ColumnDescriptor::from_le_bytes(bytes.as_ref());
if descriptor.head_page_pk == 0 {
tracing::debug!(
?field_id,
descriptor_pk,
"load_or_create: descriptor exists but is empty (head_page_pk == 0), reinitializing"
);
let first_page_pk = pager.alloc_many(1)?[0];
let descriptor = ColumnDescriptor {
field_id,
head_page_pk: first_page_pk,
tail_page_pk: first_page_pk,
..descriptor };
let header = DescriptorPageHeader {
next_page_pk: 0,
entry_count: 0,
_padding: [0; 4],
};
let tail_page_bytes = header.to_le_bytes().to_vec();
return Ok((descriptor, tail_page_bytes));
}
let tail_page_bytes = pager
.batch_get(&[BatchGet::Raw {
key: descriptor.tail_page_pk,
}])?
.pop()
.and_then(|r| match r {
GetResult::Raw { bytes, .. } => Some(bytes),
_ => None,
})
.ok_or_else(|| {
tracing::error!(
descriptor_pk,
tail_page_pk = descriptor.tail_page_pk,
"load_or_create: missing tail page"
);
Error::NotFound
})?
.as_ref()
.to_vec();
Ok((descriptor, tail_page_bytes))
}
_ => {
let first_page_pk = pager.alloc_many(1)?[0];
let descriptor = ColumnDescriptor {
field_id,
head_page_pk: first_page_pk,
tail_page_pk: first_page_pk,
..Default::default()
};
let header = DescriptorPageHeader {
next_page_pk: 0,
entry_count: 0,
_padding: [0; 4],
};
let tail_page_bytes = header.to_le_bytes().to_vec();
Ok((descriptor, tail_page_bytes))
}
}
}
pub(crate) fn rewrite_pages<P: Pager>(
&mut self,
pager: Arc<P>,
descriptor_pk: PhysicalKey,
metas: &mut [ChunkMetadata],
puts: &mut Vec<BatchPut>,
) -> Result<()> {
let mut current_page_pk = self.head_page_pk;
let mut page_start_idx = 0usize;
while current_page_pk != 0 {
let page_blob = pager
.batch_get(&[BatchGet::Raw {
key: current_page_pk,
}])?
.pop()
.and_then(|res| match res {
GetResult::Raw { bytes, .. } => Some(bytes),
_ => None,
})
.ok_or(Error::NotFound)?
.as_ref()
.to_vec();
let header =
DescriptorPageHeader::from_le_bytes(&page_blob[..DescriptorPageHeader::DISK_SIZE]);
let n_on_page = header.entry_count as usize;
let end_idx = page_start_idx + n_on_page;
let mut new_page_data = Vec::new();
for m in &metas[page_start_idx..end_idx] {
new_page_data.extend_from_slice(&m.to_le_bytes());
}
let mut final_page_bytes =
Vec::with_capacity(DescriptorPageHeader::DISK_SIZE + new_page_data.len());
final_page_bytes.extend_from_slice(&header.to_le_bytes());
final_page_bytes.extend_from_slice(&new_page_data);
puts.push(BatchPut::Raw {
key: current_page_pk,
bytes: final_page_bytes,
});
current_page_pk = header.next_page_pk;
page_start_idx = end_idx;
}
let mut total_rows = 0u64;
for m in metas.iter() {
total_rows += m.row_count;
}
self.total_row_count = total_rows;
puts.push(BatchPut::Raw {
key: descriptor_pk,
bytes: self.to_le_bytes(),
});
Ok(())
}
pub(crate) fn to_le_bytes(&self) -> Vec<u8> {
let index_meta_len = self.index_metadata.len() as u32;
let mut buf = Vec::with_capacity(Self::FIXED_DISK_SIZE + self.index_metadata.len());
let field_id_u64: u64 = self.field_id.into();
write_u64_le(&mut buf, field_id_u64);
write_u64_le(&mut buf, self.head_page_pk);
write_u64_le(&mut buf, self.tail_page_pk);
write_u64_le(&mut buf, self.total_row_count);
write_u64_le(&mut buf, self.total_chunk_count);
write_u32_le(&mut buf, self.data_type_code);
write_u32_le(&mut buf, self._padding);
write_u32_le(&mut buf, index_meta_len);
buf.extend_from_slice(&self.index_metadata);
buf
}
pub(crate) fn from_le_bytes(bytes: &[u8]) -> Self {
let mut o = 0usize;
let field_id = LogicalFieldId::from(read_u64_le(bytes, &mut o));
let head_page_pk = read_u64_le(bytes, &mut o);
let tail_page_pk = read_u64_le(bytes, &mut o);
let total_row_count = read_u64_le(bytes, &mut o);
let total_chunk_count = read_u64_le(bytes, &mut o);
let (data_type_code, padding, index_meta_len) = if bytes.len() >= o + 12 {
let dtc = read_u32_le(bytes, &mut o);
let pad = read_u32_le(bytes, &mut o);
let iml = read_u32_le(bytes, &mut o) as usize;
(dtc, pad, iml)
} else {
(0, 0, 0)
};
let index_metadata = if index_meta_len > 0 && bytes.len() >= o + index_meta_len {
bytes[o..o + index_meta_len].to_vec()
} else {
Vec::new()
};
Self {
field_id,
head_page_pk,
tail_page_pk,
total_row_count,
total_chunk_count,
data_type_code,
_padding: padding,
index_metadata,
}
}
pub(crate) fn get_indexes(&self) -> Result<Vec<IndexKind>> {
if self.index_metadata.is_empty() {
return Ok(Vec::new());
}
let mut o = 0usize;
let data = &self.index_metadata;
if data.len() < 4 {
return Err(Error::Internal("Invalid index metadata: too short".into()));
}
let num_indexes = read_u32_le(data, &mut o) as usize;
let expected_len = 4 + num_indexes;
if data.len() < expected_len {
return Err(Error::Internal(
"Invalid index metadata: unexpected eof".into(),
));
}
let mut indexes = Vec::with_capacity(num_indexes);
for _ in 0..num_indexes {
let kind_u8 = data[o];
indexes.push(IndexKind::try_from(kind_u8)?);
o += 1;
}
Ok(indexes)
}
pub(crate) fn set_indexes(&mut self, indexes: &[IndexKind]) -> Result<()> {
let mut buf = Vec::with_capacity(4 + indexes.len());
write_u32_le(&mut buf, indexes.len() as u32);
for index in indexes {
buf.push(u8::from(*index));
}
self.index_metadata = buf;
Ok(())
}
}
/// Fixed-size header at the start of every descriptor page.
///
/// Encoded as `next_page_pk` (`u64`, little-endian), then `entry_count` (`u32`,
/// little-endian), then zero bytes up to [`DescriptorPageHeader::DISK_SIZE`]. The page's
/// `ChunkMetadata` entries follow the header.
#[derive(Clone, Copy, Debug)]
#[repr(C)]
pub struct DescriptorPageHeader {
    /// Key of the following page in the chain, or `0` if this is the last page.
    pub(crate) next_page_pk: PhysicalKey,
    /// Number of `ChunkMetadata` entries stored on this page after the header.
    pub(crate) entry_count: u32,
    /// Layout padding; always encoded as zeros and reset to zeros on decode.
    pub(crate) _padding: [u8; 4],
}

impl DescriptorPageHeader {
    /// Encoded size in bytes: the in-memory size of this `#[repr(C)]` struct.
    pub(crate) const DISK_SIZE: usize = mem::size_of::<Self>();

    /// Encodes the header into a fixed-size array, zero-filling the bytes after
    /// `entry_count`.
    pub(crate) fn to_le_bytes(self) -> [u8; Self::DISK_SIZE] {
        let mut scratch = Vec::with_capacity(Self::DISK_SIZE);
        write_u64_le(&mut scratch, self.next_page_pk);
        write_u32_le(&mut scratch, self.entry_count);
        // `_padding` is never serialized; the tail is filled with zero bytes instead.
        if scratch.len() < Self::DISK_SIZE {
            scratch.resize(Self::DISK_SIZE, 0);
        }
        let mut out = [0u8; Self::DISK_SIZE];
        out.copy_from_slice(&scratch);
        out
    }

    /// Decodes a header by reading `next_page_pk` then `entry_count` from the start of
    /// `bytes`. Padding is not read and is set to zeros.
    pub(crate) fn from_le_bytes(bytes: &[u8]) -> Self {
        let mut pos = 0usize;
        let next_page_pk = read_u64_le(bytes, &mut pos);
        let entry_count = read_u32_le(bytes, &mut pos);
        DescriptorPageHeader {
            _padding: [0; 4],
            entry_count,
            next_page_pk,
        }
    }
}
/// Walks a descriptor page chain from a head page and yields every stored
/// [`ChunkMetadata`] entry. It follows `next_page_pk` links until it reaches a `0` key.
///
/// Pages are fetched from the pager one at a time, as iteration reaches them.
pub(crate) struct DescriptorIterator<'a, P: Pager> {
    /// Source of page blobs.
    pager: &'a P,
    /// Key of the page being iterated or about to be fetched; `0` ends iteration.
    current_page_pk: PhysicalKey,
    /// Blob of the current page, once fetched.
    current_blob: Option<P::Blob>,
    /// Index of the next entry to yield from the current page.
    cursor_in_page: usize,
}

impl<'a, P: Pager> DescriptorIterator<'a, P> {
    /// Creates an iterator that starts reading at `head_page_pk`. A head of `0` yields
    /// nothing.
    pub(crate) fn new(pager: &'a P, head_page_pk: PhysicalKey) -> Self {
        DescriptorIterator {
            current_blob: None,
            cursor_in_page: 0,
            current_page_pk: head_page_pk,
            pager,
        }
    }
}
impl<'a, P: Pager> Iterator for DescriptorIterator<'a, P> {
type Item = Result<ChunkMetadata>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if self.current_blob.is_none() {
if self.current_page_pk == 0 {
return None; }
match self.pager.batch_get(&[BatchGet::Raw {
key: self.current_page_pk,
}]) {
Ok(mut results) => match results.pop() {
Some(GetResult::Raw { bytes, .. }) => {
self.current_blob = Some(bytes);
self.cursor_in_page = 0;
}
Some(GetResult::Missing { .. }) => {
return Some(Err(Error::NotFound));
}
None => return Some(Err(Error::Internal("batch_get empty result".into()))),
},
Err(e) => return Some(Err(e)),
}
}
let blob_bytes = self.current_blob.as_ref().unwrap().as_ref();
let hdr_sz = DescriptorPageHeader::DISK_SIZE;
let header = DescriptorPageHeader::from_le_bytes(&blob_bytes[..hdr_sz]);
if self.cursor_in_page < header.entry_count as usize {
let off = hdr_sz + self.cursor_in_page * ChunkMetadata::DISK_SIZE;
let end = off + ChunkMetadata::DISK_SIZE;
let entry_bytes = &blob_bytes[off..end];
let metadata = ChunkMetadata::from_le_bytes(entry_bytes);
self.cursor_in_page += 1;
return Some(Ok(metadata));
} else {
self.current_page_pk = header.next_page_pk;
self.current_blob = None;
}
}
}
}