pub mod backend;
pub mod btree;
pub mod btree_v6;
pub mod cache;
pub mod index;
pub mod packed_pages;
pub mod persistent_facts;
use anyhow::Result;
/// Size of one storage page, in bytes.
/// NOTE(review): presumably the unit handled by `StorageBackend::read_page` / `write_page` — confirm against the backends.
pub const PAGE_SIZE: usize = 4096;
/// Four-byte signature stored at the start of the file header; `FileHeader::from_bytes` rejects any other value.
pub const MAGIC_NUMBER: [u8; 4] = *b"MGRF";
/// Format version written by `FileHeader::new` and the highest version `FileHeader::validate` accepts.
pub const FORMAT_VERSION: u32 = 7;
/// Value for `FileHeader::fact_page_format`.
/// NOTE(review): judging by the name, one fact is stored per page — confirm in the fact-page code.
pub const FACT_PAGE_FORMAT_ONE_PER_PAGE: u8 = 0x01;
/// Value for `FileHeader::fact_page_format`; this is the format `FileHeader::new` selects.
/// NOTE(review): judging by the name, several facts share a page — confirm in `packed_pages`.
pub const FACT_PAGE_FORMAT_PACKED: u8 = 0x02;
/// Page-addressed storage abstraction; implementors must be `Send + Sync`.
///
/// NOTE(review): the per-method descriptions below are derived from the
/// signatures only; confirm exact semantics against the concrete backends.
pub trait StorageBackend: Send + Sync {
/// Writes `data` as the page identified by `page_id`.
fn write_page(&mut self, page_id: u64, data: &[u8]) -> Result<()>;
/// Returns the bytes of the page identified by `page_id`.
fn read_page(&self, page_id: u64) -> Result<Vec<u8>>;
/// Synchronizes the backend's state.
/// NOTE(review): presumably flushes pending writes to durable storage — confirm per backend.
fn sync(&mut self) -> Result<()>;
/// Returns the number of pages the backend currently reports.
fn page_count(&self) -> Result<u64>;
/// Closes the backend.
// NOTE(review): `dead_code` is allowed presumably because not every build calls this method — confirm.
#[allow(dead_code)]
fn close(&mut self) -> Result<()>;
/// Returns a static name identifying the backend implementation.
// NOTE(review): `dead_code` is allowed presumably because not every build calls this method — confirm.
#[allow(dead_code)]
fn backend_name(&self) -> &'static str;
/// Reports whether the backend is "new".
/// NOTE(review): presumably true when the underlying store was just created and holds no data yet — confirm.
fn is_new(&self) -> bool;
}
/// In-memory form of the file header.
///
/// `FileHeader::to_bytes` serializes the fields in declaration order, all
/// integers little-endian, for a total of 84 bytes. The byte ranges quoted on
/// each field are the offsets used by `to_bytes` / `from_bytes`.
#[derive(Debug, Clone, Copy)]
pub struct FileHeader {
/// File signature (bytes 0..4); must equal `MAGIC_NUMBER`.
pub magic: [u8; 4],
/// Format version (bytes 4..8); `validate` accepts 1 through `FORMAT_VERSION`.
pub version: u32,
/// Page count (bytes 8..16); `validate` requires it to be non-zero.
pub page_count: u64,
/// Node count (bytes 16..24).
pub node_count: u64,
/// Bytes 24..32.
/// NOTE(review): by its name, the transaction count at the last checkpoint — confirm with the checkpoint code.
pub last_checkpointed_tx_count: u64,
/// EAVT index root page (bytes 32..40); `from_bytes` reports 0 for versions <= 3,
/// and `validate` exempts 0 from its bounds check.
pub eavt_root_page: u64,
/// AEVT index root page (bytes 40..48); `from_bytes` reports 0 for versions <= 3.
pub aevt_root_page: u64,
/// AVET index root page (bytes 48..56); `from_bytes` reports 0 for versions <= 3.
pub avet_root_page: u64,
/// VAET index root page (bytes 56..64); `from_bytes` reports 0 for versions <= 3.
pub vaet_root_page: u64,
/// Index checksum (bytes 64..68); `from_bytes` reports 0 for versions <= 3.
/// NOTE(review): the checksum algorithm and coverage are not defined in this file.
pub index_checksum: u32,
/// Fact page format tag (byte 68), e.g. `FACT_PAGE_FORMAT_PACKED`.
pub fact_page_format: u8,
/// Padding bytes (69..72); written and read back verbatim.
pub(crate) _padding: [u8; 3],
/// Fact page count (bytes 72..80); only read for versions >= 6, otherwise 0.
/// `validate` requires it not to exceed `page_count`.
pub fact_page_count: u64,
/// Header checksum (bytes 80..84); only read for versions >= 7, otherwise 0.
/// NOTE(review): this file stores the value but never computes or verifies it.
pub header_checksum: u32,
}
impl FileHeader {
    /// Builds the header for a freshly created file.
    ///
    /// The result carries [`MAGIC_NUMBER`], the current [`FORMAT_VERSION`], the
    /// packed fact-page format, a `page_count` of 1, and zero for every other
    /// counter, index root and checksum.
    pub fn new() -> Self {
        FileHeader {
            magic: MAGIC_NUMBER,
            version: FORMAT_VERSION,
            // NOTE(review): presumably accounts for the header page itself — confirm.
            page_count: 1,
            node_count: 0,
            last_checkpointed_tx_count: 0,
            eavt_root_page: 0,
            aevt_root_page: 0,
            avet_root_page: 0,
            vaet_root_page: 0,
            index_checksum: 0,
            fact_page_format: FACT_PAGE_FORMAT_PACKED,
            _padding: [0; 3],
            fact_page_count: 0,
            header_checksum: 0,
        }
    }

    /// Serializes the header into its 84-byte on-disk form.
    ///
    /// Fields are written in declaration order with little-endian integers.
    /// The full 84-byte layout is always produced, whatever `self.version` is.
    pub fn to_bytes(self) -> Vec<u8> {
        let mut b = Vec::with_capacity(84);
        b.extend_from_slice(&self.magic);
        b.extend_from_slice(&self.version.to_le_bytes());
        b.extend_from_slice(&self.page_count.to_le_bytes());
        b.extend_from_slice(&self.node_count.to_le_bytes());
        b.extend_from_slice(&self.last_checkpointed_tx_count.to_le_bytes());
        b.extend_from_slice(&self.eavt_root_page.to_le_bytes());
        b.extend_from_slice(&self.aevt_root_page.to_le_bytes());
        b.extend_from_slice(&self.avet_root_page.to_le_bytes());
        b.extend_from_slice(&self.vaet_root_page.to_le_bytes());
        b.extend_from_slice(&self.index_checksum.to_le_bytes());
        b.push(self.fact_page_format);
        b.extend_from_slice(&self._padding);
        b.extend_from_slice(&self.fact_page_count.to_le_bytes());
        b.extend_from_slice(&self.header_checksum.to_le_bytes());
        b
    }

    /// Parses a header from `bytes`, accepting every historical layout.
    ///
    /// * versions <= 3: only the first 32 bytes are interpreted; index roots,
    ///   checksums, fact-page format and fact-page count are reported as 0.
    /// * versions >= 4: index roots, index checksum, fact-page format and
    ///   padding are read (72 bytes required).
    /// * versions >= 6: `fact_page_count` is read as well (80 bytes required).
    /// * versions >= 7: `header_checksum` is read as well (84 bytes required).
    ///
    /// This function only decodes; call [`FileHeader::validate`] to check the
    /// decoded values.
    ///
    /// # Errors
    ///
    /// Returns an error when `bytes` is shorter than 64 bytes, when the magic
    /// number does not match [`MAGIC_NUMBER`], or when the buffer is too short
    /// for the layout implied by the stored version.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
        if bytes.len() < 64 {
            anyhow::bail!(
                "Invalid header: too short (got {} bytes, need 64)",
                bytes.len()
            );
        }
        let mut magic = [0u8; 4];
        magic.copy_from_slice(&bytes[0..4]);
        if magic != MAGIC_NUMBER {
            anyhow::bail!("Invalid magic number: not a .graph file");
        }

        // These fields sit at the same offsets in every format version.
        let version = Self::read_u32_le(bytes, 4);
        let page_count = Self::read_u64_le(bytes, 8);
        let node_count = Self::read_u64_le(bytes, 16);
        let last_checkpointed_tx_count = Self::read_u64_le(bytes, 24);

        if version <= 3 {
            // Legacy layout: nothing beyond the common prefix is interpreted.
            return Ok(FileHeader {
                magic,
                version,
                page_count,
                node_count,
                last_checkpointed_tx_count,
                eavt_root_page: 0,
                aevt_root_page: 0,
                avet_root_page: 0,
                vaet_root_page: 0,
                index_checksum: 0,
                fact_page_format: 0,
                _padding: [0; 3],
                fact_page_count: 0,
                header_checksum: 0,
            });
        }

        // The length errors below report the version actually stored in the
        // header, so a truncated v7 header is not described as "v4/v5/v6".
        if bytes.len() < 72 {
            anyhow::bail!(
                "Invalid v{} header: expected at least 72 bytes, got {}",
                version,
                bytes.len()
            );
        }
        let fact_page_count = if version >= 6 {
            if bytes.len() < 80 {
                anyhow::bail!(
                    "Invalid v{} header: expected 80 bytes, got {}",
                    version,
                    bytes.len()
                );
            }
            Self::read_u64_le(bytes, 72)
        } else {
            0
        };
        let header_checksum = if version >= 7 {
            if bytes.len() < 84 {
                anyhow::bail!(
                    "Invalid v{} header: expected 84 bytes, got {}",
                    version,
                    bytes.len()
                );
            }
            Self::read_u32_le(bytes, 80)
        } else {
            0
        };

        Ok(FileHeader {
            magic,
            version,
            page_count,
            node_count,
            last_checkpointed_tx_count,
            eavt_root_page: Self::read_u64_le(bytes, 32),
            aevt_root_page: Self::read_u64_le(bytes, 40),
            avet_root_page: Self::read_u64_le(bytes, 48),
            vaet_root_page: Self::read_u64_le(bytes, 56),
            index_checksum: Self::read_u32_le(bytes, 64),
            fact_page_format: bytes[68],
            _padding: [bytes[69], bytes[70], bytes[71]],
            fact_page_count,
            header_checksum,
        })
    }

    /// Checks the header for internal consistency.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// * the magic number differs from [`MAGIC_NUMBER`];
    /// * the version is outside `1..=FORMAT_VERSION`;
    /// * `page_count` is zero;
    /// * any index root page is non-zero and not less than `page_count`
    ///   (a root of 0 is exempt from the check);
    /// * `fact_page_count` exceeds `page_count`.
    pub fn validate(&self) -> Result<()> {
        if self.magic != MAGIC_NUMBER {
            anyhow::bail!("Invalid magic number");
        }
        if self.version < 1 || self.version > FORMAT_VERSION {
            anyhow::bail!(
                "Unsupported format version: {} (supported: 1-{})",
                self.version,
                FORMAT_VERSION
            );
        }
        if self.page_count == 0 {
            anyhow::bail!("page_count must be greater than 0");
        }

        // Every index root gets the same bounds check; a root pointing past the
        // end of the file is corrupt no matter which index it belongs to.
        let index_roots = [
            ("eavt_root_page", self.eavt_root_page),
            ("aevt_root_page", self.aevt_root_page),
            ("avet_root_page", self.avet_root_page),
            ("vaet_root_page", self.vaet_root_page),
        ];
        for (field, root) in index_roots {
            if root != 0 && root >= self.page_count {
                anyhow::bail!(
                    "{} ({}) must be less than page_count ({})",
                    field,
                    root,
                    self.page_count
                );
            }
        }

        if self.fact_page_count > self.page_count {
            anyhow::bail!(
                "fact_page_count ({}) cannot exceed page_count ({})",
                self.fact_page_count,
                self.page_count
            );
        }
        Ok(())
    }

    /// Decodes a little-endian `u32` from `bytes[offset..offset + 4]`.
    ///
    /// Panics if the range is out of bounds; callers check the length first.
    fn read_u32_le(bytes: &[u8], offset: usize) -> u32 {
        let mut raw = [0u8; 4];
        raw.copy_from_slice(&bytes[offset..offset + 4]);
        u32::from_le_bytes(raw)
    }

    /// Decodes a little-endian `u64` from `bytes[offset..offset + 8]`.
    ///
    /// Panics if the range is out of bounds; callers check the length first.
    fn read_u64_le(bytes: &[u8], offset: usize) -> u64 {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&bytes[offset..offset + 8]);
        u64::from_le_bytes(raw)
    }
}
impl Default for FileHeader {
fn default() -> Self {
Self::new()
}
}
/// Read access to committed facts; implementors must be `Send + Sync`.
///
/// NOTE(review): the descriptions below are derived from the signatures only;
/// confirm exact semantics against the implementors.
pub trait CommittedFactReader: Send + Sync {
/// Resolves `fact_ref` to the `Fact` it refers to.
// NOTE(review): `dead_code` is allowed presumably because not every build calls this method — confirm.
#[allow(dead_code)]
fn resolve(
&self,
fact_ref: crate::storage::index::FactRef,
) -> Result<crate::graph::types::Fact>;
/// Returns the facts as a single `Vec`.
/// NOTE(review): presumably every committed fact; despite the name the result is fully materialized, not lazy.
fn stream_all(&self) -> Result<Vec<crate::graph::types::Fact>>;
/// Returns a page count for the committed data.
/// NOTE(review): presumably the number of committed fact pages — confirm.
// NOTE(review): `dead_code` is allowed presumably because not every build calls this method — confirm.
#[allow(dead_code)]
fn committed_page_count(&self) -> u64;
}
/// Range-scan access to the four committed indexes (EAVT, AEVT, AVET, VAET);
/// implementors must be `Send + Sync`.
///
/// Each method takes a `start` key and an optional `end` key of the matching
/// index key type and returns the `FactRef`s found.
/// NOTE(review): whether the bounds are inclusive or exclusive, and what a
/// `None` end means (presumably "unbounded"), is not visible here — confirm
/// against the implementors.
// NOTE(review): `dead_code` is allowed on every method presumably because not every build calls them — confirm.
pub trait CommittedIndexReader: Send + Sync {
/// Scans the EAVT index between `start` and `end`.
#[allow(dead_code)]
fn range_scan_eavt(
&self,
start: &crate::storage::index::EavtKey,
end: Option<&crate::storage::index::EavtKey>,
) -> anyhow::Result<Vec<crate::storage::index::FactRef>>;
/// Scans the AEVT index between `start` and `end`.
#[allow(dead_code)]
fn range_scan_aevt(
&self,
start: &crate::storage::index::AevtKey,
end: Option<&crate::storage::index::AevtKey>,
) -> anyhow::Result<Vec<crate::storage::index::FactRef>>;
/// Scans the AVET index between `start` and `end`.
#[allow(dead_code)]
fn range_scan_avet(
&self,
start: &crate::storage::index::AvetKey,
end: Option<&crate::storage::index::AvetKey>,
) -> anyhow::Result<Vec<crate::storage::index::FactRef>>;
/// Scans the VAET index between `start` and `end`.
#[allow(dead_code)]
fn range_scan_vaet(
&self,
start: &crate::storage::index::VaetKey,
end: Option<&crate::storage::index::VaetKey>,
) -> anyhow::Result<Vec<crate::storage::index::FactRef>>;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a zero-filled raw header of `len` bytes carrying the magic
    /// number, the given `version` and the given `page_count`.
    fn raw_header(len: usize, version: u32, page_count: u64) -> Vec<u8> {
        let mut raw = vec![0u8; len];
        raw[0..4].copy_from_slice(b"MGRF");
        raw[4..8].copy_from_slice(&version.to_le_bytes());
        raw[8..16].copy_from_slice(&page_count.to_le_bytes());
        raw
    }

    /// A 64-byte v3 header parses, with the index fields reported as zero.
    #[test]
    fn test_file_header_from_bytes_v3_accepted() {
        let raw = raw_header(64, 3, 1);
        let parsed = FileHeader::from_bytes(&raw).unwrap();
        assert_eq!(parsed.version, 3);
        assert_eq!(parsed.eavt_root_page, 0);
        assert_eq!(parsed.index_checksum, 0);
    }

    /// A fresh header validates; one with a wrong magic number does not.
    #[test]
    fn test_file_header_validation() {
        let fresh = FileHeader::new();
        assert!(fresh.validate().is_ok());

        let tampered = FileHeader {
            magic: *b"XXXX",
            ..fresh
        };
        assert!(tampered.validate().is_err());
    }

    /// Pins the current format version.
    #[test]
    fn test_format_version_is_7() {
        assert_eq!(FORMAT_VERSION, 7);
    }

    /// Version 7 is within the supported range.
    #[test]
    fn test_validate_accepts_version_7() {
        let header = FileHeader {
            version: 7,
            ..FileHeader::new()
        };
        assert!(header.validate().is_ok());
    }

    /// A zero `page_count` is rejected with a message naming the field.
    #[test]
    fn test_validate_page_count_must_be_positive() {
        let header = FileHeader {
            page_count: 0,
            ..FileHeader::new()
        };
        let outcome = header.validate();
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("page_count"));
    }

    /// `eavt_root_page` must be 0 (unset) or strictly below `page_count`.
    #[test]
    fn test_validate_eavt_root_page_bounds() {
        let mut header = FileHeader::new();
        header.page_count = 10;

        header.eavt_root_page = 10;
        let outcome = header.validate();
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("eavt_root_page"));

        for in_bounds in [0u64, 5] {
            header.eavt_root_page = in_bounds;
            assert!(header.validate().is_ok());
        }
    }

    /// `fact_page_count` may not exceed `page_count`.
    #[test]
    fn test_validate_fact_page_count_bounds() {
        let mut header = FileHeader::new();
        header.page_count = 10;

        header.fact_page_count = 11;
        let outcome = header.validate();
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("fact_page_count"));

        header.fact_page_count = 5;
        assert!(header.validate().is_ok());
    }

    /// New headers are stamped with the current format version.
    #[test]
    fn test_new_header_has_version_7() {
        let fresh = FileHeader::new();
        assert_eq!(fresh.version, FORMAT_VERSION);
        assert_eq!(fresh.version, 7);
    }

    /// Serialization always yields 84 bytes.
    #[test]
    fn test_file_header_serialization_v7() {
        let encoded = FileHeader::new().to_bytes();
        assert_eq!(encoded.len(), 84);
    }

    /// `header_checksum` survives a serialize/parse round trip.
    #[test]
    fn test_file_header_roundtrip_v7() {
        let original = FileHeader {
            header_checksum: 0xDEAD_BEEF,
            ..FileHeader::new()
        };
        let decoded = FileHeader::from_bytes(&original.to_bytes()).unwrap();
        assert_eq!(decoded.header_checksum, 0xDEAD_BEEF);
    }

    /// Every field lands at its documented offset in the v7 layout.
    #[test]
    fn test_file_header_v7_byte_layout_all_fields() {
        let header = FileHeader {
            page_count: 0x0102_0304_0506_0708_u64,
            node_count: 0x1112_1314_1516_1718_u64,
            last_checkpointed_tx_count: 0x2122_2324_2526_2728_u64,
            eavt_root_page: 0x3132_3334_3536_3738_u64,
            aevt_root_page: 0x4142_4344_4546_4748_u64,
            avet_root_page: 0x5152_5354_5556_5758_u64,
            vaet_root_page: 0x6162_6364_6566_6768_u64,
            index_checksum: 0x7172_7374_u32,
            fact_page_format: 0x02,
            _padding: [0x00; 3],
            fact_page_count: 0xA1A2_A3A4_A5A6_A7A8_u64,
            header_checksum: 0xC1C2_C3C4_u32,
            ..FileHeader::new()
        };
        let encoded = header.to_bytes();
        assert_eq!(encoded.len(), 84, "v7 header must be exactly 84 bytes");

        assert_eq!(&encoded[0..4], b"MGRF");
        assert_eq!(&encoded[4..8], &7u32.to_le_bytes());

        // (offset, expected value) for each 8-byte little-endian field.
        let wide_fields: [(usize, u64); 8] = [
            (8, 0x0102_0304_0506_0708),
            (16, 0x1112_1314_1516_1718),
            (24, 0x2122_2324_2526_2728),
            (32, 0x3132_3334_3536_3738),
            (40, 0x4142_4344_4546_4748),
            (48, 0x5152_5354_5556_5758),
            (56, 0x6162_6364_6566_6768),
            (72, 0xA1A2_A3A4_A5A6_A7A8),
        ];
        for (offset, expected) in wide_fields {
            assert_eq!(&encoded[offset..offset + 8], &expected.to_le_bytes());
        }

        assert_eq!(&encoded[64..68], &0x7172_7374_u32.to_le_bytes());
        assert_eq!(encoded[68], 0x02);
        assert_eq!(&encoded[69..72], &[0x00u8; 3]);
        assert_eq!(&encoded[80..84], &0xC1C2_C3C4_u32.to_le_bytes());
    }

    /// An 80-byte v6 header parses and reports a zero `header_checksum`.
    #[test]
    fn test_file_header_v6_reads_header_checksum_zero() {
        let raw = raw_header(80, 6, 2);
        let parsed = FileHeader::from_bytes(&raw).unwrap();
        assert_eq!(parsed.version, 6);
        assert_eq!(parsed.header_checksum, 0);
    }

    /// A v7 header needs 84 bytes; 80 is not enough.
    #[test]
    fn test_file_header_v7_truncated_rejected() {
        let raw = raw_header(80, 7, 0);
        assert!(FileHeader::from_bytes(&raw).is_err());
    }

    /// Every version from 1 through 7 passes validation.
    #[test]
    fn test_validate_accepts_versions_1_to_7() {
        let mut header = FileHeader::new();
        for version in 1u32..=7 {
            header.version = version;
            assert!(
                header.validate().is_ok(),
                "version {} should be accepted",
                version
            );
        }
    }

    /// A small checksum value also round-trips through the 84-byte form.
    #[test]
    fn test_file_header_v7_header_checksum_roundtrip() {
        let original = FileHeader {
            header_checksum: 42,
            ..FileHeader::new()
        };
        let encoded = original.to_bytes();
        assert_eq!(encoded.len(), 84);
        let decoded = FileHeader::from_bytes(&encoded).unwrap();
        assert_eq!(decoded.header_checksum, 42);
    }

    /// A v4 header shorter than 72 bytes is rejected.
    #[test]
    fn test_file_header_from_bytes_truncated_v4_rejected() {
        let raw = raw_header(68, 4, 0);
        let outcome = FileHeader::from_bytes(&raw);
        assert!(outcome.is_err(), "truncated v4 header must be rejected");
    }

    /// `fact_page_format` survives a serialize/parse round trip.
    #[test]
    fn test_file_header_v5_fact_page_format_roundtrip() {
        let original = FileHeader {
            fact_page_format: FACT_PAGE_FORMAT_PACKED,
            ..FileHeader::new()
        };
        let decoded = FileHeader::from_bytes(&original.to_bytes()).unwrap();
        assert_eq!(decoded.fact_page_format, FACT_PAGE_FORMAT_PACKED);
    }

    /// A zero-filled 72-byte v4 header yields a zero `fact_page_format`.
    #[test]
    fn test_v4_header_reads_fact_page_format_zero() {
        let raw = raw_header(72, 4, 2);
        let parsed = FileHeader::from_bytes(&raw).unwrap();
        assert_eq!(parsed.fact_page_format, 0);
    }

    /// Version 5 is within the supported range.
    #[test]
    fn test_validate_accepts_version_5() {
        let header = FileHeader {
            version: 5,
            ..FileHeader::new()
        };
        assert!(header.validate().is_ok());
    }
}