/// Size in bytes of the fixed header written by `SlotRow::to_bytes`:
/// row id (8) + null bitmap (2) + slot count (1) + flags (1) + txn start (8) + txn end (8).
pub const SLOT_ROW_HEADER_SIZE: usize = 28;
/// Maximum number of columns a `SlotRow` may hold; equals the number of bits
/// in the row's `u16` null bitmap.
pub const MAX_SLOT_COLUMNS: usize = 16;
/// Flag values for a row header; every non-zero variant occupies its own bit
/// so the values can be OR-ed together into a single `u8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SlotRowFlags {
    /// No flag bit set.
    Normal = 0,
    /// Bit 0.
    Deleted = 1,
    /// Bit 1.
    Uncommitted = 2,
    /// Bit 2.
    Compressed = 4,
}

impl SlotRowFlags {
    /// Returns `true` when this value has the `Deleted` bit set.
    pub fn is_deleted(&self) -> bool {
        let mask = SlotRowFlags::Deleted as u8;
        (*self as u8) & mask != 0
    }

    /// Returns `true` when this value has the `Uncommitted` bit set.
    pub fn is_uncommitted(&self) -> bool {
        let mask = SlotRowFlags::Uncommitted as u8;
        (*self as u8) & mask != 0
    }

    /// Returns `true` when this value has the `Compressed` bit set.
    pub fn is_compressed(&self) -> bool {
        let mask = SlotRowFlags::Compressed as u8;
        (*self as u8) & mask != 0
    }
}
/// One slot-table entry: where a column's bytes start and how long they are.
///
/// The pair `(offset = 0xFFFF, len = 0)` is reserved as the NULL marker.
#[derive(Debug, Clone, Copy)]
#[repr(C, packed)]
pub struct Slot {
    /// Start of the value's bytes.
    pub offset: u16,
    /// Length of the value in bytes.
    pub len: u16,
}

impl Slot {
    /// Number of bytes one slot entry occupies (two `u16` fields).
    pub const SIZE: usize = 4;

    /// Builds a slot from an offset and a length.
    #[inline]
    pub fn new(offset: u16, len: u16) -> Self {
        Slot { offset, len }
    }

    /// Returns `true` when this slot is the NULL marker.
    #[inline]
    pub fn is_null(&self) -> bool {
        // Fields are copied into a tuple; no reference to a packed field is taken.
        matches!((self.offset, self.len), (0xFFFF, 0))
    }

    /// Returns the NULL marker slot.
    #[inline]
    pub fn null() -> Self {
        Self::new(0xFFFF, 0)
    }
}
/// A row made of a fixed header plus one byte buffer.
///
/// `storage` starts with `slot_count` slot entries of `Slot::SIZE` bytes each
/// (little-endian `offset: u16`, then `len: u16`), followed by the column
/// payloads back to back. A slot's `offset` is relative to the end of the
/// slot table.
#[repr(C)]
pub struct SlotRow {
    /// Caller-supplied row identifier.
    row_id: u64,
    /// Bit `i` is set when column `i` is NULL.
    null_bitmap: u16,
    /// Number of slot entries at the start of `storage`.
    slot_count: u8,
    /// Bitwise OR of `SlotRowFlags` values.
    flags: u8,
    /// Lower bound (exclusive) of the visibility window, see `is_visible_at`.
    txn_start: u64,
    /// Upper bound (inclusive) of the visibility window, see `is_visible_at`.
    txn_end: u64,
    /// Slot table followed by the payload bytes.
    storage: Vec<u8>,
}

impl SlotRow {
    /// Creates a row with `slot_count` zero-initialised slots and no payload.
    ///
    /// No NULL bit is set, the flags are `SlotRowFlags::Normal`, `txn_start`
    /// is `0` and `txn_end` is `u64::MAX`.
    ///
    /// # Panics
    ///
    /// Panics if `slot_count` is greater than `MAX_SLOT_COLUMNS`.
    pub fn new(row_id: u64, slot_count: u8) -> Self {
        assert!((slot_count as usize) <= MAX_SLOT_COLUMNS);
        Self {
            row_id,
            null_bitmap: 0,
            slot_count,
            flags: SlotRowFlags::Normal as u8,
            txn_start: 0,
            txn_end: u64::MAX,
            storage: vec![0u8; (slot_count as usize) * Slot::SIZE],
        }
    }

    /// Builds a row from column values; `None` marks a NULL column.
    ///
    /// Only the first `MAX_SLOT_COLUMNS` values are stored; any further
    /// values are ignored.
    ///
    /// # Panics
    ///
    /// Panics if the stored values add up to more than `u16::MAX` bytes,
    /// because slot offsets and lengths are 16-bit.
    pub fn from_values(row_id: u64, values: &[Option<&[u8]>]) -> Self {
        // Restrict every later step to the columns that are actually kept, so
        // neither the slot table nor the null bitmap can be overrun.
        let column_total = values.len().min(MAX_SLOT_COLUMNS);
        let values = &values[..column_total];
        let slots_size = column_total * Slot::SIZE;
        let data_size: usize = values
            .iter()
            .map(|value| value.map(|bytes| bytes.len()).unwrap_or(0))
            .sum();
        assert!(
            data_size <= usize::from(u16::MAX),
            "SlotRow payload is {} bytes but slot offsets are limited to {} bytes",
            data_size,
            u16::MAX
        );

        let mut storage = vec![0u8; slots_size + data_size];
        let mut null_bitmap = 0u16;
        let mut data_offset = 0u16;
        for (i, value) in values.iter().enumerate() {
            let slot = match value {
                Some(data) => {
                    // Lossless: each length is bounded by `data_size <= u16::MAX`.
                    let len = data.len() as u16;
                    let data_start = slots_size + data_offset as usize;
                    storage[data_start..data_start + data.len()].copy_from_slice(data);
                    let slot = Slot::new(data_offset, len);
                    data_offset += len;
                    slot
                }
                None => {
                    null_bitmap |= 1 << i;
                    Slot::null()
                }
            };
            let entry = i * Slot::SIZE;
            storage[entry..entry + 2].copy_from_slice(&slot.offset.to_le_bytes());
            storage[entry + 2..entry + Slot::SIZE].copy_from_slice(&slot.len.to_le_bytes());
        }

        Self {
            row_id,
            null_bitmap,
            slot_count: column_total as u8,
            flags: SlotRowFlags::Normal as u8,
            txn_start: 0,
            txn_end: u64::MAX,
            storage,
        }
    }

    /// Returns the row identifier.
    #[inline]
    pub fn row_id(&self) -> u64 {
        self.row_id
    }

    /// Returns the number of columns (slots) in this row.
    #[inline]
    pub fn column_count(&self) -> usize {
        self.slot_count as usize
    }

    /// Returns `true` when the column is NULL or `column_idx` is out of range.
    #[inline]
    pub fn is_null(&self, column_idx: usize) -> bool {
        if column_idx >= self.slot_count as usize {
            return true;
        }
        (self.null_bitmap & (1 << column_idx)) != 0
    }

    /// Decodes the slot entry of a column, or `None` when the index is out of
    /// range or the slot table is shorter than expected.
    #[inline]
    fn get_slot(&self, column_idx: usize) -> Option<Slot> {
        if column_idx >= self.slot_count as usize {
            return None;
        }
        let start = column_idx * Slot::SIZE;
        let raw = self.storage.get(start..start + Slot::SIZE)?;
        Some(Slot {
            offset: u16::from_le_bytes([raw[0], raw[1]]),
            len: u16::from_le_bytes([raw[2], raw[3]]),
        })
    }

    /// Returns the raw bytes of a column.
    ///
    /// Yields `None` when the column is NULL, out of range, or its slot
    /// points outside the stored payload.
    #[inline]
    pub fn get_bytes(&self, column_idx: usize) -> Option<&[u8]> {
        if self.is_null(column_idx) {
            return None;
        }
        let slot = self.get_slot(column_idx)?;
        if slot.is_null() {
            return None;
        }
        let data_start = (self.slot_count as usize) * Slot::SIZE + slot.offset as usize;
        let data_end = data_start + slot.len as usize;
        self.storage.get(data_start..data_end)
    }

    /// Returns the column as an 8-byte array, or `None` when it is missing or
    /// not exactly 8 bytes long.
    #[inline]
    fn get_fixed8(&self, column_idx: usize) -> Option<[u8; 8]> {
        self.get_bytes(column_idx)?.try_into().ok()
    }

    /// Reads a column as a little-endian `i64`; `None` unless it is exactly 8 bytes.
    #[inline]
    pub fn get_i64(&self, column_idx: usize) -> Option<i64> {
        self.get_fixed8(column_idx).map(i64::from_le_bytes)
    }

    /// Reads a column as a little-endian `u64`; `None` unless it is exactly 8 bytes.
    #[inline]
    pub fn get_u64(&self, column_idx: usize) -> Option<u64> {
        self.get_fixed8(column_idx).map(u64::from_le_bytes)
    }

    /// Reads a column as a little-endian `f64`; `None` unless it is exactly 8 bytes.
    #[inline]
    pub fn get_f64(&self, column_idx: usize) -> Option<f64> {
        self.get_fixed8(column_idx).map(f64::from_le_bytes)
    }

    /// Reads a column as a boolean: `true` when its first byte is non-zero.
    /// Returns `None` for a missing or empty column.
    #[inline]
    pub fn get_bool(&self, column_idx: usize) -> Option<bool> {
        self.get_bytes(column_idx)?.first().map(|&byte| byte != 0)
    }

    /// Reads a column as UTF-8 text; `None` when missing or not valid UTF-8.
    #[inline]
    pub fn get_str(&self, column_idx: usize) -> Option<&str> {
        let bytes = self.get_bytes(column_idx)?;
        std::str::from_utf8(bytes).ok()
    }

    /// Sets both bounds of the visibility window.
    pub fn set_mvcc(&mut self, txn_start: u64, txn_end: u64) {
        self.txn_start = txn_start;
        self.txn_end = txn_end;
    }

    /// Returns the lower bound of the visibility window.
    #[inline]
    pub fn txn_start(&self) -> u64 {
        self.txn_start
    }

    /// Returns the upper bound of the visibility window.
    #[inline]
    pub fn txn_end(&self) -> u64 {
        self.txn_end
    }

    /// Returns `true` when `txn_start < snapshot_ts <= txn_end`.
    #[inline]
    pub fn is_visible_at(&self, snapshot_ts: u64) -> bool {
        self.txn_start < snapshot_ts && snapshot_ts <= self.txn_end
    }

    /// Sets the `Deleted` flag bit; other flag bits are left untouched.
    pub fn set_deleted(&mut self) {
        self.flags |= SlotRowFlags::Deleted as u8;
    }

    /// Returns `true` when the `Deleted` flag bit is set.
    #[inline]
    pub fn is_deleted(&self) -> bool {
        (self.flags & SlotRowFlags::Deleted as u8) != 0
    }

    /// Returns the serialized size of the row: header plus storage bytes.
    pub fn memory_size(&self) -> usize {
        SLOT_ROW_HEADER_SIZE + self.storage.len()
    }

    /// Serializes the row: little-endian header fields in declaration order,
    /// followed by the storage bytes.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(SLOT_ROW_HEADER_SIZE + self.storage.len());
        buf.extend_from_slice(&self.row_id.to_le_bytes());
        buf.extend_from_slice(&self.null_bitmap.to_le_bytes());
        buf.push(self.slot_count);
        buf.push(self.flags);
        buf.extend_from_slice(&self.txn_start.to_le_bytes());
        buf.extend_from_slice(&self.txn_end.to_le_bytes());
        buf.extend_from_slice(&self.storage);
        buf
    }

    /// Parses a row previously produced by `to_bytes`.
    ///
    /// Returns `None` when `data` is shorter than the header, declares more
    /// than `MAX_SLOT_COLUMNS` columns, or is too short to hold the declared
    /// slot table. Rejecting such input keeps the null-bitmap shift and the
    /// slot lookups in range for every row this type hands out.
    pub fn from_bytes(data: &[u8]) -> Option<Self> {
        if data.len() < SLOT_ROW_HEADER_SIZE {
            return None;
        }
        let (header, storage) = data.split_at(SLOT_ROW_HEADER_SIZE);
        let slot_count = header[10];
        let slot_table_len = (slot_count as usize) * Slot::SIZE;
        if (slot_count as usize) > MAX_SLOT_COLUMNS || storage.len() < slot_table_len {
            return None;
        }
        Some(Self {
            row_id: u64::from_le_bytes(header[0..8].try_into().ok()?),
            null_bitmap: u16::from_le_bytes(header[8..10].try_into().ok()?),
            slot_count,
            flags: header[11],
            txn_start: u64::from_le_bytes(header[12..20].try_into().ok()?),
            txn_end: u64::from_le_bytes(header[20..28].try_into().ok()?),
            storage: storage.to_vec(),
        })
    }
}

/// Prints the header fields and the storage length instead of dumping the
/// raw storage bytes; the null bitmap is shown as 16 binary digits.
impl std::fmt::Debug for SlotRow {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SlotRow")
            .field("row_id", &self.row_id)
            .field("slot_count", &self.slot_count)
            .field("null_bitmap", &format!("{:016b}", self.null_bitmap))
            .field("flags", &self.flags)
            .field("txn_start", &self.txn_start)
            .field("txn_end", &self.txn_end)
            .field("storage_len", &self.storage.len())
            .finish()
    }
}
/// Bump allocator that hands out byte ranges from a list of heap blocks and
/// stores serialized `SlotRow`s in them.
pub struct SlotRowArena {
    /// Backing blocks; each is at least `block_size` bytes long.
    blocks: Vec<Vec<u8>>,
    /// Index of the block new allocations are carved from.
    current_block: usize,
    /// First free byte inside the current block.
    current_offset: usize,
    /// Size used for newly created blocks (larger for oversized requests).
    block_size: usize,
    /// Sum of all sizes requested since creation or the last `reset`.
    total_allocated: usize,
}

impl SlotRowArena {
    /// Block size used by `new`: 64 KiB.
    pub const DEFAULT_BLOCK_SIZE: usize = 64 * 1024;

    /// Creates an arena with `DEFAULT_BLOCK_SIZE` blocks.
    pub fn new() -> Self {
        Self::with_block_size(Self::DEFAULT_BLOCK_SIZE)
    }

    /// Creates an arena whose blocks are `block_size` bytes; the first block
    /// is allocated immediately.
    pub fn with_block_size(block_size: usize) -> Self {
        Self {
            blocks: vec![vec![0u8; block_size]],
            current_block: 0,
            current_offset: 0,
            block_size,
            total_allocated: 0,
        }
    }

    /// Reserves `size` bytes and returns them as a mutable slice.
    ///
    /// When the current block cannot hold the request, the arena moves on to
    /// the next block. A block retained from before a `reset` is reused if it
    /// is large enough; otherwise a new block of `max(block_size, size)`
    /// bytes is created. Bytes from reused blocks keep their old contents.
    pub fn allocate(&mut self, size: usize) -> &mut [u8] {
        // Measure against the real block length: an oversized block that is
        // being reused has more room than `block_size`.
        let capacity = self.blocks[self.current_block].len();
        if self.current_offset + size > capacity {
            let next = self.current_block + 1;
            // Without a prior `reset`, `next` is past the end, the search is
            // over an empty slice, and the insert below is a plain push.
            match self.blocks[next..]
                .iter()
                .position(|block| block.len() >= size)
            {
                Some(found) => self.blocks.swap(next, next + found),
                None => self
                    .blocks
                    .insert(next, vec![0u8; self.block_size.max(size)]),
            }
            self.current_block = next;
            self.current_offset = 0;
        }
        let start = self.current_offset;
        self.current_offset += size;
        self.total_allocated += size;
        &mut self.blocks[self.current_block][start..start + size]
    }

    /// Serializes `row` into the arena and returns a handle locating its bytes.
    pub fn store(&mut self, row: &SlotRow) -> SlotRowHandle {
        let bytes = row.to_bytes();
        let len = bytes.len();
        self.allocate(len).copy_from_slice(&bytes);
        SlotRowHandle {
            block_idx: self.current_block,
            // `allocate` has already advanced the offset past this row.
            offset: self.current_offset - len,
            len,
        }
    }

    /// Deserializes the row a handle points at.
    ///
    /// Returns `None` when the handle does not lie inside an existing block
    /// or the bytes do not parse as a row.
    pub fn get(&self, handle: &SlotRowHandle) -> Option<SlotRow> {
        let end = handle.offset.checked_add(handle.len)?;
        let data = self.blocks.get(handle.block_idx)?.get(handle.offset..end)?;
        SlotRow::from_bytes(data)
    }

    /// Returns the number of bytes handed out since creation or the last `reset`.
    pub fn total_allocated(&self) -> usize {
        self.total_allocated
    }

    /// Returns the number of blocks currently owned by the arena.
    pub fn block_count(&self) -> usize {
        self.blocks.len()
    }

    /// Rewinds the arena to the start of its first block.
    ///
    /// All blocks are kept and reused by later allocations, so handles issued
    /// before the reset may afterwards refer to overwritten bytes.
    pub fn reset(&mut self) {
        self.current_block = 0;
        self.current_offset = 0;
        self.total_allocated = 0;
    }
}

impl Default for SlotRowArena {
    /// Equivalent to `SlotRowArena::new()`.
    fn default() -> Self {
        Self::new()
    }
}
/// Location of a serialized row inside a `SlotRowArena`: block index, byte
/// offset within that block, and byte length.
#[derive(Debug, Clone, Copy)]
pub struct SlotRowHandle {
    block_idx: usize,
    offset: usize,
    len: usize,
}

impl SlotRowHandle {
    /// Returns the number of bytes the handle covers.
    pub fn len(&self) -> usize {
        let SlotRowHandle { len, .. } = *self;
        len
    }

    /// Returns `true` when the handle covers zero bytes.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Row id, column count and per-column NULL state mirror the input values.
    #[test]
    fn test_slot_row_basic() {
        let number = 42i64.to_le_bytes();
        let row = SlotRow::from_values(
            1,
            &[
                Some(&b"hello"[..]),
                Some(&number[..]),
                None,
                Some(&b"world"[..]),
            ],
        );

        assert_eq!(row.row_id(), 1);
        assert_eq!(row.column_count(), 4);
        let expected_nulls = [false, false, true, false];
        for (column, expected) in expected_nulls.iter().enumerate() {
            assert_eq!(row.is_null(column), *expected);
        }
    }

    /// Stored byte strings come back unchanged; an out-of-range column is `None`.
    #[test]
    fn test_slot_row_get_bytes() {
        let row = SlotRow::from_values(1, &[Some(&b"hello"[..]), Some(&b"world"[..])]);

        let first = row.get_bytes(0);
        let second = row.get_bytes(1);
        let beyond = row.get_bytes(2);

        assert_eq!(first, Some(b"hello".as_slice()));
        assert_eq!(second, Some(b"world".as_slice()));
        assert_eq!(beyond, None);
    }

    /// Typed getters decode little-endian integers, floats and booleans.
    #[test]
    fn test_slot_row_get_typed() {
        let int_bytes = 42i64.to_le_bytes();
        let float_bytes = 3.14f64.to_le_bytes();
        let bool_bytes = 1u8.to_le_bytes();
        let row = SlotRow::from_values(
            1,
            &[
                Some(&int_bytes[..]),
                Some(&float_bytes[..]),
                Some(&bool_bytes[..]),
            ],
        );

        assert_eq!(row.get_i64(0), Some(42));
        assert_eq!(row.get_f64(1), Some(3.14));
        assert_eq!(row.get_bool(2), Some(true));
    }

    /// UTF-8 payloads are readable as `&str`.
    #[test]
    fn test_slot_row_get_str() {
        let text: &[u8] = b"hello world";
        let row = SlotRow::from_values(1, &[Some(text)]);
        assert_eq!(row.get_str(0), Some("hello world"));
    }

    /// The visibility window set through `set_mvcc` drives `is_visible_at`.
    #[test]
    fn test_slot_row_mvcc() {
        let mut row = SlotRow::from_values(1, &[Some(&b"test"[..])]);
        row.set_mvcc(100, 200);

        assert_eq!(row.txn_start(), 100);
        assert_eq!(row.txn_end(), 200);

        let inside = row.is_visible_at(150);
        let before = row.is_visible_at(50);
        let after = row.is_visible_at(250);
        assert!(inside);
        assert!(!before);
        assert!(!after);
    }

    /// A row survives a `to_bytes` / `from_bytes` round trip.
    #[test]
    fn test_slot_row_serialize() {
        let number = 123i64.to_le_bytes();
        let original = SlotRow::from_values(42, &[Some(&b"hello"[..]), Some(&number[..]), None]);

        let encoded = original.to_bytes();
        let restored = SlotRow::from_bytes(&encoded).unwrap();

        assert_eq!(restored.row_id(), 42);
        assert_eq!(restored.get_bytes(0), Some(b"hello".as_slice()));
        assert_eq!(restored.get_i64(1), Some(123));
        assert!(restored.is_null(2));
    }

    /// A small two-column row stays under 100 bytes.
    #[test]
    fn test_slot_row_memory_size() {
        let row = SlotRow::from_values(1, &[Some(&b"hello"[..]), Some(&b"world"[..])]);
        let size = row.memory_size();
        assert!(size < 100, "SlotRow size {} should be < 100 bytes", size);
    }

    /// Rows stored in an arena can be fetched back through their handles.
    #[test]
    fn test_slot_row_arena() {
        let mut arena = SlotRowArena::new();
        let first = SlotRow::from_values(1, &[Some(&b"hello"[..])]);
        let second = SlotRow::from_values(2, &[Some(&b"world"[..])]);

        let first_handle = arena.store(&first);
        let second_handle = arena.store(&second);

        let first_back = arena.get(&first_handle).unwrap();
        let second_back = arena.get(&second_handle).unwrap();
        assert_eq!(first_back.row_id(), 1);
        assert_eq!(second_back.row_id(), 2);
        assert_eq!(first_back.get_str(0), Some("hello"));
        assert_eq!(second_back.get_str(0), Some("world"));
    }

    /// Storing many rows spills into additional blocks and every row stays readable.
    #[test]
    fn test_slot_row_arena_many_rows() {
        let mut arena = SlotRowArena::with_block_size(1024);
        let mut handles = Vec::new();
        for id in 0..1000u64 {
            let id_bytes = id.to_le_bytes();
            let label = format!("row_{}", id);
            let row = SlotRow::from_values(id, &[Some(&id_bytes[..]), Some(label.as_bytes())]);
            handles.push(arena.store(&row));
        }

        for (index, handle) in handles.iter().enumerate() {
            let row = arena.get(handle).unwrap();
            assert_eq!(row.row_id(), index as u64);
            assert_eq!(row.get_u64(0), Some(index as u64));
        }
        assert!(arena.block_count() > 1);
    }
}