use crate::graph::types::Fact;
use crate::storage::index::FactRef;
use crate::storage::{PAGE_SIZE, StorageBackend};
use anyhow::Result;
/// Page-type tag stored in byte 0 of every packed page; `read_slot` rejects
/// pages whose first byte differs from it.
pub const PAGE_TYPE_PACKED: u8 = 0x02;
/// Page-type tag `0x03`. Nothing visible in this file reads or writes it
/// (hence the `dead_code` allowance).
// NOTE(review): presumably reserved for overflow pages that hold facts larger
// than `MAX_FACT_BYTES` — confirm against the rest of the storage layer.
#[allow(dead_code)]
pub const PAGE_TYPE_OVERFLOW: u8 = 0x03;
/// Size in bytes of the packed-page header. The record directory starts at this
/// offset. Within the header this file uses byte 0 (page type) and bytes 2..4
/// (little-endian `u16` record count); the remaining header bytes are left zero.
pub const PACKED_HEADER_SIZE: usize = 12;
/// Largest serialised fact `pack_facts` accepts: the page size minus the header
/// and minus one 4-byte directory entry, i.e. what fits on an otherwise empty page.
pub const MAX_FACT_BYTES: usize = PAGE_SIZE - PACKED_HEADER_SIZE - 4;
/// Packs `facts` into fixed-size slotted pages.
///
/// Page layout as written by this function:
/// * byte 0 — page type (`PAGE_TYPE_PACKED`);
/// * bytes 2..4 — record count, little-endian `u16`;
/// * from `PACKED_HEADER_SIZE` upward — the record directory, one 4-byte entry
///   per record (`u16` LE data offset, then `u16` LE data length);
/// * from the end of the page downward — the postcard-serialised records.
///
/// A new page is started when the next record plus its directory entry no
/// longer fits in the gap between directory and data, or when the current page
/// already holds `u16::MAX` records.
///
/// # Returns
///
/// The page images in order, plus one `FactRef` per input fact (same order as
/// `facts`). The n-th page (0-based) is assigned id `start_page_id + n`. At
/// least one page is always returned: an empty `facts` slice yields a single
/// page with a record count of zero.
///
/// # Errors
///
/// * a fact fails to serialise;
/// * a serialised fact is larger than `MAX_FACT_BYTES`;
/// * a page id would overflow `u64` (previously this saturated silently and
///   produced duplicate page ids);
/// * an offset or length does not fit the `u16` directory fields, or an
///   internal bounds check fails.
pub fn pack_facts(facts: &[Fact], start_page_id: u64) -> Result<(Vec<Vec<u8>>, Vec<FactRef>)> {
    // One directory entry: u16 data offset followed by u16 data length.
    const DIR_ENTRY_SIZE: usize = 4;
    // Width of each of the two fields inside a directory entry.
    const DIR_FIELD_SIZE: usize = 2;

    let mut pages: Vec<Vec<u8>> = Vec::new();
    let mut fact_refs: Vec<FactRef> = Vec::with_capacity(facts.len());

    let mut current_page: Vec<u8> = new_packed_page();
    let mut current_record_count: u16 = 0;
    // The directory grows upward from the header; record data grows downward
    // from the end of the page. The page is full when the two would meet.
    let mut dir_offset: usize = PACKED_HEADER_SIZE;
    let mut data_offset: usize = PAGE_SIZE;

    for fact in facts {
        let serialised = postcard::to_allocvec(fact)?;
        let len = serialised.len();
        if len > MAX_FACT_BYTES {
            anyhow::bail!(
                "Fact serialised size {} bytes exceeds maximum slot size {} bytes",
                len,
                MAX_FACT_BYTES
            );
        }

        // Space left once this record's own directory entry is accounted for.
        let free = data_offset.saturating_sub(dir_offset.saturating_add(DIR_ENTRY_SIZE));
        if len > free || current_record_count == u16::MAX {
            // Seal the current page and continue on a fresh one. A fresh page
            // always has room because `len <= MAX_FACT_BYTES`.
            write_record_count(&mut current_page, current_record_count);
            pages.push(current_page);
            current_page = new_packed_page();
            current_record_count = 0;
            dir_offset = PACKED_HEADER_SIZE;
            data_offset = PAGE_SIZE;
        }

        // Copy the record into the top of the remaining free region.
        let record_start = data_offset.checked_sub(len).ok_or_else(|| {
            anyhow::anyhow!(
                "record of {} bytes does not fit below data offset {}",
                len,
                data_offset
            )
        })?;
        current_page
            .get_mut(record_start..data_offset)
            .ok_or_else(|| {
                anyhow::anyhow!(
                    "packed page too short: data region {}..{} out of bounds",
                    record_start,
                    data_offset
                )
            })?
            .copy_from_slice(&serialised);
        data_offset = record_start;

        // Append the matching directory entry.
        let offset_u16 = u16::try_from(data_offset)
            .map_err(|_| anyhow::anyhow!("data_offset {} overflows u16", data_offset))?;
        let len_u16 = u16::try_from(len)
            .map_err(|_| anyhow::anyhow!("serialised fact too large: {} bytes", len))?;
        let dir_end = dir_offset.saturating_add(DIR_ENTRY_SIZE);
        let entry = current_page.get_mut(dir_offset..dir_end).ok_or_else(|| {
            anyhow::anyhow!(
                "packed page dir out of bounds at {}..{}",
                dir_offset,
                dir_end
            )
        })?;
        // `entry` is exactly DIR_ENTRY_SIZE bytes long, so the split cannot panic.
        let (offset_field, len_field) = entry.split_at_mut(DIR_FIELD_SIZE);
        offset_field.copy_from_slice(&offset_u16.to_le_bytes());
        len_field.copy_from_slice(&len_u16.to_le_bytes());
        dir_offset = dir_end;

        // The page being filled has not been pushed yet, so its index is the
        // number of pages sealed so far.
        let page_index = u64::try_from(pages.len())
            .map_err(|_| anyhow::anyhow!("too many pages: overflows u64"))?;
        let page_id = start_page_id.checked_add(page_index).ok_or_else(|| {
            anyhow::anyhow!(
                "page id overflows u64: start_page_id {} + page index {}",
                start_page_id,
                page_index
            )
        })?;
        fact_refs.push(FactRef {
            page_id,
            slot_index: current_record_count,
        });
        current_record_count = current_record_count.saturating_add(1);
    }

    // Seal the page in progress (possibly empty when `facts` is empty).
    write_record_count(&mut current_page, current_record_count);
    pages.push(current_page);
    Ok((pages, fact_refs))
}
/// Decodes the fact stored in directory slot `slot` of a packed page.
///
/// `page` must be at least `PAGE_SIZE` bytes and start with `PAGE_TYPE_PACKED`.
/// The record count is read from bytes 2..4 (little-endian `u16`); the slot's
/// 4-byte directory entry (`u16` LE offset, `u16` LE length) is read from
/// `PACKED_HEADER_SIZE + slot * 4`.
///
/// # Errors
///
/// * the page is shorter than `PAGE_SIZE` or is not a packed page;
/// * `slot` is not below the page's record count;
/// * the directory entry lies outside the page;
/// * the record extends past `PAGE_SIZE`, or starts inside the header or
///   directory region (a corrupt entry — valid records always live past the
///   directory);
/// * postcard fails to deserialise the record bytes.
pub fn read_slot(page: &[u8], slot: u16) -> Result<Fact> {
    // One directory entry: u16 data offset followed by u16 data length.
    const DIR_ENTRY_SIZE: usize = 4;

    if page.len() < PAGE_SIZE {
        anyhow::bail!(
            "Page too short: {} bytes (expected {})",
            page.len(),
            PAGE_SIZE
        );
    }
    let page_type = *page
        .first()
        .ok_or_else(|| anyhow::anyhow!("packed page empty"))?;
    if page_type != PAGE_TYPE_PACKED {
        anyhow::bail!("Expected packed page (0x02), got 0x{:02x}", page_type);
    }

    let record_count = match page.get(2..4) {
        Some(&[lo, hi]) => u16::from_le_bytes([lo, hi]),
        _ => anyhow::bail!("packed page too short for record_count"),
    };
    if slot >= record_count {
        anyhow::bail!(
            "Slot {} out of bounds (page has {} records)",
            slot,
            record_count
        );
    }

    // Locate and decode this slot's directory entry in one bounds-checked read.
    let dir_base =
        PACKED_HEADER_SIZE.saturating_add(usize::from(slot).saturating_mul(DIR_ENTRY_SIZE));
    let (offset, length) = match page.get(dir_base..dir_base.saturating_add(DIR_ENTRY_SIZE)) {
        Some(&[o0, o1, l0, l1]) => (
            usize::from(u16::from_le_bytes([o0, o1])),
            usize::from(u16::from_le_bytes([l0, l1])),
        ),
        _ => anyhow::bail!("packed page dir entry {} out of bounds", slot),
    };

    let record_end = offset.saturating_add(length);
    if record_end > PAGE_SIZE {
        anyhow::bail!("Record at slot {} extends beyond page boundary", slot);
    }
    // Record data must sit past the header and the whole directory; an entry
    // pointing below that would make us decode header/directory bytes as a fact.
    let dir_end = PACKED_HEADER_SIZE
        .saturating_add(usize::from(record_count).saturating_mul(DIR_ENTRY_SIZE));
    if offset < dir_end {
        anyhow::bail!(
            "Record at slot {} overlaps page header or directory (offset {}, directory ends at {})",
            slot,
            offset,
            dir_end
        );
    }

    let record = page.get(offset..record_end).ok_or_else(|| {
        anyhow::anyhow!(
            "packed page record {}..{} out of bounds",
            offset,
            record_end
        )
    })?;
    let fact: Fact = postcard::from_bytes(record)?;
    Ok(fact)
}
pub fn read_all_from_pages(
backend: &dyn StorageBackend,
first_page_id: u64,
num_pages: u64,
) -> Result<Vec<Fact>> {
let mut facts = Vec::new();
for i in 0..num_pages {
let page = backend.read_page(first_page_id.saturating_add(i))?;
let page_type = page.first().copied().unwrap_or(0);
if page.len() < PAGE_SIZE || page_type != PAGE_TYPE_PACKED {
continue;
}
let b2 = page.get(2).copied().unwrap_or(0);
let b3 = page.get(3).copied().unwrap_or(0);
let record_count = u16::from_le_bytes([b2, b3]);
for slot in 0..record_count {
facts.push(read_slot(&page, slot)?);
}
}
Ok(facts)
}
/// Allocates a blank packed page: `PAGE_SIZE` zero bytes with the leading
/// byte set to the `PAGE_TYPE_PACKED` tag.
fn new_packed_page() -> Vec<u8> {
    let mut blank = vec![0u8; PAGE_SIZE];
    // `first_mut` is `None` only for a zero-length page, in which case there
    // is nowhere to put the tag.
    if let Some(type_tag) = blank.first_mut() {
        *type_tag = PAGE_TYPE_PACKED;
    }
    blank
}
/// Stores `count` as a little-endian `u16` in bytes 2 and 3 of `page`.
///
/// A page shorter than four bytes is left untouched.
fn write_record_count(page: &mut [u8], count: u16) {
    let [low, high] = count.to_le_bytes();
    if let Some([low_dst, high_dst]) = page.get_mut(2..4) {
        *low_dst = low;
        *high_dst = high;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::types::{Fact, VALID_TIME_FOREVER, Value};
    use uuid::Uuid;

    /// Builds a small deterministic fact whose entity id and integer value are
    /// both derived from `n`.
    fn make_fact(n: u64) -> Fact {
        let entity = Uuid::from_u128(n as u128);
        let value = Value::Integer(n as i64);
        Fact::with_valid_time(
            entity,
            ":attr".to_string(),
            value,
            n,
            n,
            0,
            VALID_TIME_FOREVER,
        )
    }

    /// One fact packs into one page and decodes back from slot 0.
    #[test]
    fn test_single_fact_roundtrip() {
        let input = vec![make_fact(1)];
        let (pages, refs) = pack_facts(&input, 1).unwrap();

        assert_eq!(pages.len(), 1);
        assert_eq!(refs.len(), 1);
        assert_eq!(refs[0].page_id, 1);
        assert_eq!(refs[0].slot_index, 0);

        let decoded = read_slot(&pages[0], 0).unwrap();
        assert_eq!(decoded.entity, input[0].entity);
        assert_eq!(decoded.tx_count, input[0].tx_count);
    }

    /// Packing must use fewer pages than facts while still yielding one ref per fact.
    #[test]
    fn test_multiple_facts_pack_fewer_pages() {
        let input: Vec<Fact> = (0..50).map(make_fact).collect();
        let (pages, refs) = pack_facts(&input, 1).unwrap();

        let page_count = pages.len();
        assert!(
            page_count < 50,
            "packed pages ({}) should be < 50",
            page_count
        );
        assert_eq!(refs.len(), 50);
    }

    /// Every returned ref must lead back to the fact it was issued for.
    #[test]
    fn test_slot_index_roundtrip() {
        let input: Vec<Fact> = (0..30).map(make_fact).collect();
        let (pages, refs) = pack_facts(&input, 1).unwrap();

        for (i, expected) in input.iter().enumerate() {
            let fact_ref = &refs[i];
            // Page ids start at 1 here, so subtract the base to index `pages`.
            let page_index = (fact_ref.page_id - 1) as usize;
            let decoded = read_slot(&pages[page_index], fact_ref.slot_index).unwrap();
            assert_eq!(decoded.entity, expected.entity, "fact {} mismatched", i);
        }
    }

    /// The first byte of a packed page carries the packed-page tag.
    #[test]
    fn test_page_type_byte_is_0x02() {
        let input = vec![make_fact(1)];
        let (pages, _) = pack_facts(&input, 1).unwrap();
        assert_eq!(pages[0][0], PAGE_TYPE_PACKED);
    }

    /// Pins the on-disk header layout byte by byte.
    #[test]
    fn test_packed_page_header_byte_layout() {
        let input: Vec<Fact> = (0..3).map(make_fact).collect();
        let (pages, _) = pack_facts(&input, 1).unwrap();
        let header_page = &pages[0];

        assert_eq!(
            header_page[0], 0x02,
            "byte 0 must be PAGE_TYPE_PACKED (0x02)"
        );
        assert_eq!(header_page[1], 0x00, "byte 1 must be reserved zero");

        let record_count = u16::from_le_bytes([header_page[2], header_page[3]]);
        assert_eq!(record_count, 3, "record_count at bytes 2-3 must be 3");
        assert_eq!(header_page[2], 3, "record_count low byte at offset 2");
        assert_eq!(header_page[3], 0, "record_count high byte at offset 3");

        let next_page = u64::from_le_bytes(header_page[4..12].try_into().unwrap());
        assert_eq!(next_page, 0, "next_page at bytes 4-11 must be 0");
        assert_eq!(
            &header_page[4..12],
            &0u64.to_le_bytes(),
            "next_page raw LE bytes"
        );
    }

    /// Pins the layout of the first directory entry (bytes 12..16).
    #[test]
    fn test_packed_page_record_directory_layout() {
        let input = vec![make_fact(1)];
        let (pages, _) = pack_facts(&input, 1).unwrap();
        let first_page = &pages[0];

        let record_count = u16::from_le_bytes([first_page[2], first_page[3]]);
        assert_eq!(record_count, 1);

        let offset = u16::from_le_bytes([first_page[12], first_page[13]]) as usize;
        let length = u16::from_le_bytes([first_page[14], first_page[15]]) as usize;
        assert!(
            offset >= PACKED_HEADER_SIZE + 4,
            "data offset must be past header+directory"
        );
        assert!(offset < PAGE_SIZE, "data offset must be within page");
        assert!(length > 0, "record length must be nonzero");
        assert!(offset + length <= PAGE_SIZE, "record must fit within page");

        assert_eq!(
            &first_page[12..14],
            &(offset as u16).to_le_bytes(),
            "directory offset LE"
        );
        assert_eq!(
            &first_page[14..16],
            &(length as u16).to_le_bytes(),
            "directory length LE"
        );
    }

    /// Pages written to a backend are read back in full and in order.
    #[test]
    fn test_read_all_from_pages_roundtrip() {
        use crate::storage::backend::MemoryBackend;

        let input: Vec<Fact> = (0..60).map(make_fact).collect();
        let (pages, _refs) = pack_facts(&input, 1).unwrap();

        let mut backend = MemoryBackend::new();
        for (index, page) in pages.iter().enumerate() {
            // Page ids are 1-based to match the `start_page_id` used above.
            let page_id = (index + 1) as u64;
            backend.write_page(page_id, page).unwrap();
        }

        let decoded = read_all_from_pages(&backend, 1, pages.len() as u64).unwrap();
        assert_eq!(decoded.len(), 60);
        for (expected, actual) in input.iter().zip(decoded.iter()) {
            assert_eq!(expected.entity, actual.entity);
        }
    }

    /// A fact too large for any page must be reported as an error.
    #[test]
    fn test_oversized_fact_returns_error() {
        let payload = "x".repeat(5000);
        let oversized = Fact::with_valid_time(
            Uuid::from_u128(999),
            ":big".to_string(),
            Value::String(payload),
            1,
            1,
            0,
            VALID_TIME_FOREVER,
        );

        let outcome = pack_facts(&[oversized], 1);
        assert!(outcome.is_err(), "oversized fact must return Err, not panic");
    }
}