#[cfg(feature = "v3-forensics")]
use crate::backend::native::v3::forensics::{
FORENSIC_COUNTERS, PAGE_OWNERSHIP, PageType as ForensicPageType, Subsystem,
};
use crate::backend::native::v3::{
allocator::PageAllocator, btree::BTreeManager, constants::DEFAULT_PAGE_SIZE,
file_coordinator::FileCoordinator, header::PersistentHeaderV3, wal::WALWriter,
};
use crate::backend::native::{
types::{NativeBackendError, NativeResult},
v3::compact_edge_record::{CompactEdgeRecord, Direction as V2Direction},
};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
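/// On-disk page type discriminant for V3 pages. Distinct from the
/// forensics `PageType` alias used when the `v3-forensics` feature is on.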
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PageType {
Free = 0,
BTreeIndex = 1,
NodeData = 2,
EdgeCluster = 3,
Wal = 4,
Checkpoint = 5,
}
impl PageType {
pub fn from_u8(value: u8) -> Option<Self> {
match value {
0 => Some(PageType::Free),
1 => Some(PageType::BTreeIndex),
2 => Some(PageType::NodeData),
3 => Some(PageType::EdgeCluster),
4 => Some(PageType::Wal),
5 => Some(PageType::Checkpoint),
_ => None,
}
}
}
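/// Edge traversal direction, convertible to the compact-edge-record
/// `Direction` (imported here as `V2Direction`) via `to_v2`.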
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Direction {
Outgoing,
Incoming,
}
impl Direction {
pub fn to_v2(&self) -> V2Direction {
match self {
Direction::Outgoing => V2Direction::Outgoing,
Direction::Incoming => V2Direction::Incoming,
}
}
}
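/// In-memory form of one edge page: every edge for a single
/// (src, direction) pair plus the page id it is (or will be) written to.
/// Format version 2 adds the src/direction header to the serialized bytes.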
#[derive(Debug, Clone)]
pub struct V3EdgeCluster {
pub src: i64,
pub edges: Vec<CompactEdgeRecord>,
pub direction: Direction,
pub format_version: u8,
pub page_id: u64,
}
impl V3EdgeCluster {
pub fn new(src: i64, direction: Direction, page_id: u64) -> Self {
Self {
src,
edges: Vec::new(),
direction,
format_version: 2,
page_id,
}
}
pub fn add_edge(&mut self, dst: i64, edge_type: Option<String>) {
let edge_data = if let Some(et) = edge_type {
let et_bytes = et.as_bytes();
let mut data = Vec::with_capacity(1 + et_bytes.len());
data.push(et_bytes.len() as u8);
data.extend_from_slice(et_bytes);
data
} else {
Vec::new()
};
let edge = CompactEdgeRecord::new(dst, 0, edge_data);
self.edges.push(edge);
}
fn extract_edge_type(edge_data: &[u8]) -> Option<String> {
if edge_data.is_empty() {
return None;
}
let type_len = edge_data[0] as usize;
if edge_data.len() < 1 + type_len {
return None;
}
Some(String::from_utf8_lossy(&edge_data[1..1 + type_len]).to_string())
}
pub fn dsts(&self) -> Vec<i64> {
self.edges.iter().map(|e| e.neighbor_id).collect()
}
pub fn edges_with_types(&self) -> Vec<(i64, Option<String>)> {
self.edges
.iter()
.map(|e| {
let edge_type = Self::extract_edge_type(&e.edge_data);
(e.neighbor_id, edge_type)
})
.collect()
}
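/// Serializes the cluster. The v2 wire layout is, in big-endian order:
/// `[version: u8][src: i64][dir: u8 (0 = outgoing, 1 = incoming)][count: u32]`
/// followed by `count` edge records, each read back by `deserialize` as
/// `[neighbor_id: i64][type_offset: u16][data_len: u16][edge_data: data_len bytes]`;
/// `CompactEdgeRecord::serialize` is assumed to emit that same per-edge layout.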
pub fn serialize(&self) -> NativeResult<Vec<u8>> {
let mut result = Vec::new();
result.push(self.format_version);
if self.format_version >= 2 {
result.extend_from_slice(&self.src.to_be_bytes());
result.push(if self.direction == Direction::Outgoing { 0 } else { 1 });
}
let count = self.edges.len() as u32;
result.extend_from_slice(&count.to_be_bytes());
for edge in &self.edges {
let edge_bytes = edge.serialize();
result.extend_from_slice(&edge_bytes);
}
Ok(result)
}
pub fn deserialize(bytes: &[u8], page_id: u64) -> NativeResult<Self> {
if bytes.len() < 5 {
return Err(NativeBackendError::DeserializationError {
context: "Edge cluster bytes too short".to_string(),
});
}
let format_version = bytes[0];
if format_version > 2 {
return Err(NativeBackendError::DeserializationError {
context: format!("Unknown edge cluster format version: {}", format_version),
});
}
let mut pos = 1;
let (src, direction) = if format_version >= 2 {
if bytes.len() < 1 + 8 + 1 {
return Err(NativeBackendError::DeserializationError {
context: "Edge cluster v2 header too short".to_string(),
});
}
let src = i64::from_be_bytes(
bytes[pos..pos + 8].try_into().expect("bounds checked above"),
);
pos += 8;
let dir_byte = bytes[pos];
pos += 1;
let direction = if dir_byte == 1 { Direction::Incoming } else { Direction::Outgoing };
(src, direction)
} else {
(0, Direction::Outgoing)
};
if pos + 4 > bytes.len() {
return Err(NativeBackendError::DeserializationError {
context: "Edge cluster truncated at edge count".to_string(),
});
}
let count = u32::from_be_bytes(
bytes[pos..pos + 4].try_into().expect("bounds checked above"),
) as usize;
pos += 4;
// Cap the preallocation: each edge record needs at least 12 bytes, so a
// corrupt count cannot trigger an oversized allocation here.
let mut edges = Vec::with_capacity(count.min(bytes.len().saturating_sub(pos) / 12));
for _ in 0..count {
if pos + 12 > bytes.len() {
return Err(NativeBackendError::DeserializationError {
context: "Edge data truncated".to_string(),
});
}
let neighbor_id = i64::from_be_bytes(
bytes[pos..pos + 8].try_into().expect("bounds checked above"),
);
pos += 8;
let type_offset = u16::from_be_bytes(
bytes[pos..pos + 2].try_into().expect("bounds checked above"),
);
pos += 2;
let data_len = u16::from_be_bytes(
bytes[pos..pos + 2].try_into().expect("bounds checked above"),
) as usize;
pos += 2;
let edge_data = if data_len > 0 {
if pos + data_len > bytes.len() {
return Err(NativeBackendError::DeserializationError {
context: "Edge data truncated".to_string(),
});
}
bytes[pos..pos + data_len].to_vec()
} else {
Vec::new()
};
pos += data_len;
edges.push(CompactEdgeRecord::new(neighbor_id, type_offset, edge_data));
}
Ok(Self {
src,
edges,
direction,
format_version,
page_id,
})
}
}
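/// Edge storage engine for the V3 backend: a write-through neighbor cache
/// over per-(src, direction) edge pages, indexed by a B+Tree and journaled
/// through an optional WAL.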
pub struct V3EdgeStore {
#[cfg(test)]
pub btree: parking_lot::RwLock<BTreeManager>,
#[cfg(not(test))]
btree: parking_lot::RwLock<BTreeManager>,
#[cfg(test)]
pub wal: Option<Arc<RwLock<WALWriter>>>,
#[cfg(not(test))]
wal: Option<Arc<RwLock<WALWriter>>>,
cache: RwLock<HashMap<(i64, Direction), Arc<[i64]>>>,
edge_types: RwLock<HashMap<(i64, i64, Direction), String>>,
cache_hits: AtomicU64,
cache_misses: AtomicU64,
hit_time_ns: AtomicU64,
miss_time_ns: AtomicU64,
#[cfg(test)]
pub dirty_clusters: RwLock<HashMap<(i64, Direction), V3EdgeCluster>>,
#[cfg(not(test))]
dirty_clusters: RwLock<HashMap<(i64, Direction), V3EdgeCluster>>,
db_path: Option<PathBuf>,
allocator: Arc<RwLock<PageAllocator>>,
file_coordinator: Option<Arc<FileCoordinator>>,
}
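/// Packs a (src, direction) pair into a single B+Tree key: the direction
/// occupies bit 62 and the zigzag-encoded src the low 62 bits, leaving the
/// sign bit clear so keys are always non-negative. Src values whose zigzag
/// encoding needs more than 62 bits are truncated by the mask and can
/// therefore collide.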
fn edge_key(src: i64, dir: Direction) -> i64 {
let dir_bit = if dir == Direction::Outgoing { 0i64 } else { 1i64 };
let zigzag_src = (src << 1) ^ (src >> 63);
(dir_bit << 62) | (zigzag_src & 0x3FFF_FFFF_FFFF_FFFF)
}
impl V3EdgeStore {
pub fn new(
btree: BTreeManager,
wal: Option<WALWriter>,
allocator: Arc<RwLock<PageAllocator>>,
) -> Self {
Self {
btree: parking_lot::RwLock::new(btree),
wal: wal.map(|w| Arc::new(RwLock::new(w))),
cache: RwLock::new(HashMap::new()),
edge_types: RwLock::new(HashMap::new()),
cache_hits: AtomicU64::new(0),
cache_misses: AtomicU64::new(0),
hit_time_ns: AtomicU64::new(0),
miss_time_ns: AtomicU64::new(0),
dirty_clusters: RwLock::new(HashMap::new()),
db_path: None,
allocator,
file_coordinator: None,
}
}
pub fn with_path_and_allocator(
btree: BTreeManager,
wal: Option<WALWriter>,
db_path: PathBuf,
allocator: Arc<RwLock<PageAllocator>>,
) -> Self {
Self {
btree: parking_lot::RwLock::new(btree),
wal: wal.map(|w| Arc::new(RwLock::new(w))),
cache: RwLock::new(HashMap::new()),
edge_types: RwLock::new(HashMap::new()),
cache_hits: AtomicU64::new(0),
cache_misses: AtomicU64::new(0),
hit_time_ns: AtomicU64::new(0),
miss_time_ns: AtomicU64::new(0),
dirty_clusters: RwLock::new(HashMap::new()),
db_path: Some(db_path),
allocator,
file_coordinator: None,
}
}
pub fn with_path(btree: BTreeManager, wal: Option<WALWriter>, db_path: PathBuf) -> Self {
let header = PersistentHeaderV3::new_v3();
Self {
btree: parking_lot::RwLock::new(btree),
wal: wal.map(|w| Arc::new(RwLock::new(w))),
cache: RwLock::new(HashMap::new()),
edge_types: RwLock::new(HashMap::new()),
cache_hits: AtomicU64::new(0),
cache_misses: AtomicU64::new(0),
hit_time_ns: AtomicU64::new(0),
miss_time_ns: AtomicU64::new(0),
dirty_clusters: RwLock::new(HashMap::new()),
db_path: Some(db_path),
allocator: Arc::new(RwLock::new(PageAllocator::new(&header))),
file_coordinator: None,
}
}
pub fn set_file_coordinator(&mut self, coordinator: Arc<FileCoordinator>) {
self.file_coordinator = Some(coordinator);
}
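/// Returns the neighbor ids of `src` in direction `dir`, serving from the
/// in-memory cache when possible and otherwise falling back to a page read
/// through the B+Tree index. Missing or unreadable entries yield an empty
/// slice rather than an error.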
pub fn neighbors(&self, src: i64, dir: Direction) -> NativeResult<Arc<[i64]>> {
let key = (src, dir);
#[cfg(feature = "v3-forensics")]
FORENSIC_COUNTERS
.logical_neighbors_calls
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
{
let cache = self.cache.read();
if let Some(neighbors) = cache.get(&key) {
self.cache_hits.fetch_add(1, Ordering::Relaxed);
#[cfg(feature = "v3-forensics")]
FORENSIC_COUNTERS
.edge_cache_hit_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return Ok(neighbors.clone());
}
}
self.cache_misses.fetch_add(1, Ordering::Relaxed);
#[cfg(feature = "v3-forensics")]
FORENSIC_COUNTERS
.edge_cache_miss_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if let Some(ref db_path) = self.db_path {
if let Ok(neighbors) = self.load_neighbors_from_disk(src, dir, db_path) {
if !neighbors.is_empty() {
#[cfg(feature = "v3-forensics")]
FORENSIC_COUNTERS
.edge_page_read_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut cache = self.cache.write();
cache.insert(key, neighbors.clone());
return Ok(neighbors);
}
}
}
Ok(Arc::from([]))
}
fn load_neighbors_from_disk(
&self,
src: i64,
dir: Direction,
db_path: &Path,
) -> NativeResult<Arc<[i64]>> {
use crate::backend::native::v3::constants::V3_HEADER_SIZE;
use std::fs::File;
use std::io::Read;
let key = edge_key(src, dir);
let btree = self.btree.read();
let page_id = match btree.lookup(key) {
Ok(Some(pid)) => pid,
Ok(None) => {
return Ok(Arc::from([]));
}
Err(_) => {
return Ok(Arc::from([]));
}
};
drop(btree);
if page_id == 0 {
// Defensive: a zero page id (the allocation-failure sentinel used by
// insert_edge) would underflow the offset computation below.
return Ok(Arc::from([]));
}
let offset = V3_HEADER_SIZE as u64 + (page_id - 1) * DEFAULT_PAGE_SIZE;
let mut file = match File::open(db_path) {
Ok(f) => f,
Err(_) => return Ok(Arc::from([])),
};
if file.seek(SeekFrom::Start(offset)).is_err() {
return Ok(Arc::from([]));
}
// Read one full page; the cluster parser ignores trailing zero padding
// via its explicit edge count.
let mut buffer = vec![0u8; DEFAULT_PAGE_SIZE as usize];
match file.read(&mut buffer) {
Ok(n) if n > 0 => {
match V3EdgeCluster::deserialize(&buffer, page_id) {
Ok(cluster) => {
let edges_with_types = cluster.edges_with_types();
let mut edge_types = self.edge_types.write();
for (dst, edge_type) in edges_with_types {
if let Some(et) = edge_type {
edge_types.insert((src, dst, dir), et);
}
}
let neighbors: Vec<i64> = cluster.dsts();
Ok(Arc::from(neighbors.into_boxed_slice()))
}
Err(_) => Ok(Arc::from([])),
}
}
_ => Ok(Arc::from([])),
}
}
pub fn outgoing(&self, src: i64) -> NativeResult<Arc<[i64]>> {
self.neighbors(src, Direction::Outgoing)
}
pub fn incoming(&self, src: i64) -> NativeResult<Arc<[i64]>> {
self.neighbors(src, Direction::Incoming)
}
pub fn neighbors_filtered(
&self,
src: i64,
dir: Direction,
edge_type: &str,
) -> NativeResult<Arc<[i64]>> {
let all_neighbors = self.neighbors(src, dir)?;
let edge_types = self.edge_types.read();
let filtered: Vec<i64> = all_neighbors
.iter()
.filter(|&&dst| {
edge_types
.get(&(src, dst, dir))
.map(|stored_type| stored_type == edge_type)
.unwrap_or(false)
})
.copied()
.collect();
Ok(Arc::from(filtered.into_boxed_slice()))
}
pub fn get_edge_type(&self, src: i64, dst: i64, dir: Direction) -> Option<String> {
let edge_types = self.edge_types.read();
edge_types.get(&(src, dst, dir)).cloned()
}
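/// Inserts an edge into the neighbor cache, the edge-type map, and the
/// dirty-cluster set, and appends a WAL record when a WAL is configured.
/// Note: if the cluster is not already dirty, a fresh empty cluster is
/// created even when a page for this (src, dir) exists on disk, so the next
/// flush rewrites that page with only the edges inserted since; previously
/// flushed edges for the same key are not merged back in.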
pub fn insert_edge(
&self,
src: i64,
dst: i64,
dir: Direction,
edge_type: Option<String>,
) -> NativeResult<()> {
let cache_key = (src, dir);
let mut cache = self.cache.write();
if let Some(neighbors) = cache.get_mut(&cache_key) {
let mut vec: Vec<i64> = neighbors.to_vec();
if !vec.contains(&dst) {
vec.push(dst);
*neighbors = Arc::from(vec);
}
} else {
cache.insert(cache_key, Arc::from(vec![dst]));
}
if let Some(ref edge_type_str) = edge_type {
let mut edge_types = self.edge_types.write();
let key = (src, dst, dir);
if let Some(existing_type) = edge_types.get(&key) {
if existing_type != edge_type_str {
eprintln!(
"WARNING: V3EdgeStore inserting edge_type '{}' for ({}, {}, {:?}), overwriting existing type '{}'. This is a known limitation of tuple-key model.",
edge_type_str, src, dst, dir, existing_type
);
}
}
edge_types.insert(key, edge_type_str.clone());
} else {
let mut edge_types = self.edge_types.write();
edge_types.remove(&(src, dst, dir));
}
let page_id = {
let mut dirty = self.dirty_clusters.write();
if !dirty.contains_key(&cache_key) {
let key = edge_key(src, dir);
let btree = self.btree.read();
let page_id_to_use = match btree.lookup(key) {
Ok(Some(pid)) => pid,
Ok(None) | Err(_) => {
drop(btree);
let mut allocator = self.allocator.write();
match allocator.allocate() {
Ok(pid) => pid,
Err(e) => {
eprintln!("WARNING: Failed to allocate edge page: {:?}", e);
0 // Sentinel; flush() allocates a real page for id 0.
}
}
}
};
dirty.insert(cache_key, V3EdgeCluster::new(src, dir, page_id_to_use));
}
let cluster = dirty.get_mut(&cache_key).expect("cluster must exist after insert");
let cluster_page_id = cluster.page_id;
cluster.add_edge(dst, edge_type);
cluster_page_id
};
if let Some(ref wal) = self.wal {
let mut wal_guard = wal.write();
let _ = wal_guard.edge_insert(src, dst, dir as u8, page_id);
}
Ok(())
}
pub fn clear_cache(&self) {
self.cache.write().clear();
self.edge_types.write().clear();
self.cache_hits.store(0, Ordering::Relaxed);
self.cache_misses.store(0, Ordering::Relaxed);
self.hit_time_ns.store(0, Ordering::Relaxed);
self.miss_time_ns.store(0, Ordering::Relaxed);
}
pub fn print_stats(&self) {
let hits = self.cache_hits.load(Ordering::Relaxed);
let misses = self.cache_misses.load(Ordering::Relaxed);
let cache_size = self.cache.read().len();
let hit_ns = self.hit_time_ns.load(Ordering::Relaxed);
let miss_ns = self.miss_time_ns.load(Ordering::Relaxed);
let total = hits + misses;
let hit_rate = if total > 0 {
(hits as f64 / total as f64) * 100.0
} else {
0.0
};
let avg_hit_ns = if hits > 0 { hit_ns / hits } else { 0 };
let avg_miss_ns = if misses > 0 { miss_ns / misses } else { 0 };
println!("Cache stats:");
println!(" Entries: {}", cache_size);
println!(" Hits: {} ({:.1}%)", hits, hit_rate);
println!(" Misses: {}", misses);
println!(" Avg hit time: {} ns", avg_hit_ns);
println!(" Avg miss time: {} ns", avg_miss_ns);
}
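/// Flushes all dirty edge clusters: serializes each cluster, allocates a
/// page where needed, writes the page image, and records the
/// key -> page id mapping in the B+Tree. On success the WAL is
/// checkpointed and truncated and the B+Tree root/height metadata sidecar
/// is rewritten.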
pub fn flush(
&self,
_kv_store: Option<
&parking_lot::RwLock<Option<crate::backend::native::v3::kv_store::store::KvStore>>,
>,
) -> NativeResult<()> {
let db_path = match &self.db_path {
Some(path) => path.clone(),
None => {
self.dirty_clusters.write().clear();
return Ok(());
}
};
let dirty = self.dirty_clusters.write();
if dirty.is_empty() {
return Ok(());
}
let clusters_to_flush: Vec<((i64, Direction), V3EdgeCluster)> =
dirty.iter().map(|(k, v)| (*k, v.clone())).collect();
drop(dirty);
for ((src, dir), cluster) in &clusters_to_flush {
let cluster_bytes = cluster.serialize()?;
let page_id = if cluster.page_id == 0 {
let mut allocator = self.allocator.write();
match allocator.allocate() {
Ok(pid) => pid,
Err(e) => {
return Err(NativeBackendError::IoError {
context: format!(
"Failed to allocate edge page for ({}, {:?})",
src, dir
),
source: std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
});
}
}
} else {
cluster.page_id
};
self.write_page_to_disk(&db_path, page_id, &cluster_bytes)?;
{
let mut btree = self.btree.write();
let key = edge_key(*src, *dir);
let _ = btree.insert(key, page_id);
}
}
{
// Remove only the clusters that were flushed; clusters dirtied by
// concurrent inserts during the flush survive until the next flush
// instead of being cleared unwritten.
let mut dirty = self.dirty_clusters.write();
for (key, _) in &clusters_to_flush {
dirty.remove(key);
}
}
if let Some(ref wal) = self.wal {
let mut wal_guard = wal.write();
let btree = self.btree.read();
let root_page_id = btree.root_page_id();
let tree_height = btree.tree_height();
let _ = wal_guard.checkpoint(
root_page_id,
0,
tree_height,
0,
&PersistentHeaderV3::new_v3(),
);
let _ = wal_guard.flush();
let _ = wal_guard.truncate();
}
self.persist_btree_metadata()?;
Ok(())
}
fn metadata_path(&self) -> Option<PathBuf> {
self.db_path.as_ref().map(|p| p.with_extension("v3edgemeta"))
}
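/// Writes the B+Tree metadata sidecar (`.v3edgemeta`), 24 bytes total:
/// the 8-byte magic `V3EDGE\0\0`, the root page id (u64 LE), the tree
/// height (u32 LE), and a u32 LE checksum computed as the wrapping byte
/// sum of the preceding 20 bytes.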
fn persist_btree_metadata(&self) -> NativeResult<()> {
let meta_path = match self.metadata_path() {
Some(p) => p,
None => return Ok(()),
};
let btree = self.btree.read();
let root_page_id = btree.root_page_id();
let tree_height = btree.tree_height();
let mut data = Vec::with_capacity(24);
data.extend_from_slice(b"V3EDGE\x00\x00"); data.extend_from_slice(&root_page_id.to_le_bytes()); data.extend_from_slice(&(tree_height as u32).to_le_bytes());
let checksum: u32 = data.iter().fold(0u32, |acc, &b| acc.wrapping_add(b as u32));
data.extend_from_slice(&checksum.to_le_bytes());
std::fs::write(&meta_path, &data).map_err(|e| NativeBackendError::IoError {
context: format!("Failed to write edge metadata: {}", meta_path.display()),
source: e,
})?;
Ok(())
}
fn recover_btree_metadata(&self) -> NativeResult<Option<(u64, u32)>> {
let meta_path = match self.metadata_path() {
Some(p) => p,
None => return Ok(None),
};
if !meta_path.exists() {
return Ok(None);
}
let data = std::fs::read(&meta_path).map_err(|e| NativeBackendError::IoError {
context: format!("Failed to read edge metadata: {}", meta_path.display()),
source: e,
})?;
if data.len() < 24 {
return Ok(None);
}
if &data[0..8] != b"V3EDGE\x00\x00" {
return Ok(None);
}
let root_page_id = u64::from_le_bytes(data[8..16].try_into().expect("length checked above"));
let tree_height = u32::from_le_bytes(data[16..20].try_into().expect("length checked above"));
let stored_checksum = u32::from_le_bytes(data[20..24].try_into().expect("length checked above"));
let computed_checksum: u32 =
data[..20].iter().fold(0u32, |acc, &b| acc.wrapping_add(b as u32));
if stored_checksum != computed_checksum {
return Ok(None);
}
Ok(Some((root_page_id, tree_height)))
}
pub fn restore_btree_from_metadata(&self) -> NativeResult<bool> {
if let Some((root_page_id, tree_height)) = self.recover_btree_metadata()? {
let mut btree = self.btree.write();
btree.set_root_page_id(root_page_id);
btree.set_tree_height(tree_height);
Ok(true)
} else {
Ok(false)
}
}
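/// Writes one page image at its file offset: page ids are 1-based past the
/// V3 header region (`V3_HEADER_SIZE`), with page id 0 mapping to offset 0.
/// When a `FileCoordinator` is configured the write is routed through it,
/// zero-padded to a full page.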
fn write_page_to_disk(&self, db_path: &Path, page_id: u64, data: &[u8]) -> NativeResult<()> {
#[cfg(feature = "v3-forensics")]
{
use crate::backend::native::v3::constants::V3_HEADER_SIZE;
let offset: u64 = if page_id == 0 {
0
} else {
(V3_HEADER_SIZE as u64) + (page_id - 1) * DEFAULT_PAGE_SIZE
};
crate::track_page_alloc!(page_id, Subsystem::EdgeStore, ForensicPageType::Edge);
crate::track_page_write!(
page_id,
Subsystem::EdgeStore,
ForensicPageType::Edge,
offset,
"EdgeStore::write_page_to_disk"
);
}
if let Some(ref coordinator) = self.file_coordinator {
let page_data = if data.len() < DEFAULT_PAGE_SIZE as usize {
let mut padded = data.to_vec();
padded.resize(DEFAULT_PAGE_SIZE as usize, 0);
padded
} else {
data.to_vec()
};
return coordinator.write_page(page_id, &page_data);
}
use crate::backend::native::v3::constants::V3_HEADER_SIZE;
let offset: u64 = if page_id == 0 {
0
} else {
(V3_HEADER_SIZE as u64) + (page_id - 1) * DEFAULT_PAGE_SIZE
};
let mut file = OpenOptions::new()
.write(true)
.create(true)
.open(db_path)
.map_err(|e| NativeBackendError::IoError {
context: format!("Failed to open db file for page write: {}", page_id),
source: e,
})?;
let required_len = offset + data.len() as u64;
let current_len = file.metadata().map(|m| m.len()).unwrap_or(0);
if required_len > current_len {
file.set_len(required_len)
.map_err(|e| NativeBackendError::IoError {
context: format!(
"Failed to extend file to {} bytes for page {}",
required_len, page_id
),
source: e,
})?;
}
file.seek(SeekFrom::Start(offset))
.map_err(|e| NativeBackendError::IoError {
context: format!("Failed to seek to page {} offset {}", page_id, offset),
source: e,
})?;
file.write_all(data)
.map_err(|e| NativeBackendError::IoError {
context: format!("Failed to write page {} data", page_id),
source: e,
})?;
file.sync_data().map_err(|e| NativeBackendError::IoError {
context: format!("Failed to sync page {} write", page_id),
source: e,
})?;
Ok(())
}
#[cfg(test)]
pub fn flush_wal(&self) -> NativeResult<()> {
if let Some(ref wal) = self.wal {
let mut wal_guard = wal.write();
wal_guard.flush()
} else {
Ok(())
}
}
pub fn btree_root_page_id(&self) -> Option<u64> {
let root = self.btree.read().root_page_id();
if root != 0 && root != u64::MAX {
Some(root)
} else {
None
}
}
pub fn btree_height(&self) -> u32 {
self.btree.read().tree_height()
}
pub fn set_wal(&mut self, wal: Arc<RwLock<WALWriter>>) {
self.wal = Some(wal);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::backend::native::v3::{
allocator::PageAllocator, btree::BTreeManager, header::PersistentHeaderV3,
};
use parking_lot::RwLock;
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::TempDir;
#[test]
fn test_page_type_from_u8() {
assert_eq!(PageType::from_u8(0), Some(PageType::Free));
assert_eq!(PageType::from_u8(1), Some(PageType::BTreeIndex));
assert_eq!(PageType::from_u8(2), Some(PageType::NodeData));
assert_eq!(PageType::from_u8(3), Some(PageType::EdgeCluster));
assert_eq!(PageType::from_u8(4), Some(PageType::Wal));
assert_eq!(PageType::from_u8(5), Some(PageType::Checkpoint));
assert_eq!(PageType::from_u8(255), None);
}
#[test]
fn test_direction_conversion() {
assert_eq!(Direction::Outgoing.to_v2(), V2Direction::Outgoing);
assert_eq!(Direction::Incoming.to_v2(), V2Direction::Incoming);
}
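// A sketch of a sanity check for the private `edge_key` helper: the
// direction bit and zigzag encoding should keep distinct (src, dir)
// pairs on distinct, non-negative keys (within the 62-bit mask).
#[test]
fn test_edge_key_packing() {
// Same src, opposite directions must not collide.
assert_ne!(
edge_key(1, Direction::Outgoing),
edge_key(1, Direction::Incoming)
);
// Zigzag encoding keeps negative and positive src values apart.
assert_ne!(
edge_key(-1, Direction::Outgoing),
edge_key(1, Direction::Outgoing)
);
// Bit 63 stays clear, so keys are always non-negative.
assert!(edge_key(-12345, Direction::Incoming) >= 0);
}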
#[test]
fn test_v3_edge_cluster_new() {
let cluster = V3EdgeCluster::new(42, Direction::Outgoing, 100);
assert_eq!(cluster.src, 42);
assert!(cluster.edges.is_empty());
assert_eq!(cluster.direction, Direction::Outgoing);
assert_eq!(cluster.page_id, 100);
assert_eq!(cluster.format_version, 2);
}
#[test]
fn test_v3_edge_cluster_add_edge() {
let mut cluster = V3EdgeCluster::new(1, Direction::Outgoing, 1);
cluster.add_edge(2, None);
cluster.add_edge(3, None);
assert_eq!(cluster.dsts(), vec![2, 3]);
}
#[test]
fn test_v3_edge_cluster_serialize_roundtrip() {
let mut cluster = V3EdgeCluster::new(42, Direction::Outgoing, 100);
cluster.add_edge(100, None);
cluster.add_edge(200, None);
let bytes = cluster.serialize().unwrap();
let deserialized = V3EdgeCluster::deserialize(&bytes, 100).unwrap();
assert_eq!(deserialized.format_version, 2);
assert_eq!(deserialized.src, 42, "src should survive roundtrip");
assert_eq!(deserialized.direction, Direction::Outgoing, "direction should survive roundtrip");
assert_eq!(deserialized.dsts(), vec![100, 200]);
assert_eq!(deserialized.page_id, 100);
}
#[test]
fn test_v3_edge_cluster_roundtrip_incoming() {
let mut cluster = V3EdgeCluster::new(99, Direction::Incoming, 50);
cluster.add_edge(10, None);
let bytes = cluster.serialize().unwrap();
let deserialized = V3EdgeCluster::deserialize(&bytes, 50).unwrap();
assert_eq!(deserialized.src, 99);
assert_eq!(deserialized.direction, Direction::Incoming,
"Incoming direction must survive serialization roundtrip");
}
fn create_test_edge_store(
db_path: Option<PathBuf>,
) -> (V3EdgeStore, Arc<RwLock<PageAllocator>>) {
let header = PersistentHeaderV3::new_v3();
let allocator = Arc::new(RwLock::new(PageAllocator::new(&header)));
let btree = if let Some(ref path) = db_path {
BTreeManager::new(allocator.clone(), None, path.clone())
} else {
BTreeManager::new(allocator.clone(), None, None::<PathBuf>)
};
let edge_store = if let Some(ref path) = db_path {
let wal_path = path.with_extension("v3wal");
let writer = WALWriter::new(wal_path, 1).expect("Failed to create WAL writer");
writer.write_header().expect("Failed to write WAL header");
V3EdgeStore::with_path_and_allocator(btree, Some(writer), path.clone(), allocator.clone())
} else {
V3EdgeStore::new(btree, None, allocator.clone())
};
let _ = edge_store.restore_btree_from_metadata();
(edge_store, allocator)
}
#[test]
fn test_edge_insert_creates_wal_record() {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.graph");
let wal_path = db_path.with_extension("v3wal");
let (edge_store, _allocator) = create_test_edge_store(Some(db_path.clone()));
edge_store
.insert_edge(1, 2, Direction::Outgoing, None)
.expect("Insert failed");
edge_store.flush_wal().expect("WAL flush failed");
assert!(
wal_path.exists(),
"CRITICAL TODO: WAL file should exist after edge insert with WAL enabled"
);
let wal_content = std::fs::read(&wal_path).expect("Failed to read WAL file");
assert!(
wal_content.len() > 64,
"CRITICAL TODO: WAL should contain edge insert record beyond header"
);
}
#[test]
fn test_flush_writes_dirty_clusters_to_pages() {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.graph");
std::fs::write(&db_path, vec![0u8; 4096]).expect("Failed to create db file");
let (edge_store, _allocator) = create_test_edge_store(Some(db_path.clone()));
edge_store
.insert_edge(1, 2, Direction::Outgoing, None)
.expect("Insert 1->2 failed");
edge_store
.insert_edge(1, 3, Direction::Outgoing, None)
.expect("Insert 1->3 failed");
edge_store
.insert_edge(2, 4, Direction::Outgoing, None)
.expect("Insert 2->4 failed");
let result = edge_store.flush(None);
assert!(result.is_ok(), "Flush should succeed");
let file_size = std::fs::metadata(&db_path)
.expect("Failed to read file metadata")
.len();
assert!(
file_size > 4096,
"CRITICAL TODO: Database file should grow after flush writes dirty clusters"
);
}
#[test]
fn test_flush_updates_btree_index() {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.graph");
std::fs::write(&db_path, vec![0u8; 4096]).expect("Failed to create db file");
let (edge_store, _allocator) = create_test_edge_store(Some(db_path.clone()));
edge_store
.insert_edge(1, 2, Direction::Outgoing, None)
.expect("Insert failed");
edge_store
.insert_edge(1, 3, Direction::Outgoing, None)
.expect("Insert failed");
edge_store.flush(None).expect("Flush failed");
let btree = edge_store.btree.read();
let lookup_key = edge_key(1, Direction::Outgoing);
let lookup_result = btree.lookup(lookup_key);
assert!(
lookup_result.is_ok(),
"B+Tree lookup should succeed"
);
assert!(
lookup_result.unwrap().is_some(),
"B+Tree should contain edge page mapping for node 1 after flush"
);
}
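// A minimal sketch of a metadata-sidecar roundtrip under the same test
// scaffolding: after a flush persists the `.v3edgemeta` file, a fresh
// store over the same path should be able to restore the B+Tree metadata.
#[test]
fn test_btree_metadata_roundtrip() {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.graph");
std::fs::write(&db_path, vec![0u8; 4096]).expect("Failed to create db file");
let (edge_store, _allocator) = create_test_edge_store(Some(db_path.clone()));
edge_store
.insert_edge(1, 2, Direction::Outgoing, None)
.expect("Insert failed");
edge_store.flush(None).expect("Flush failed");
let (restored, _alloc) = create_test_edge_store(Some(db_path.clone()));
assert!(
restored
.restore_btree_from_metadata()
.expect("Metadata recovery failed"),
"metadata sidecar written by flush should be recoverable"
);
}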
#[test]
fn test_wal_checkpoint_after_flush() {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.graph");
let wal_path = db_path.with_extension("v3wal");
std::fs::write(&db_path, vec![0u8; 4096]).expect("Failed to create db file");
let (edge_store, _allocator) = create_test_edge_store(Some(db_path.clone()));
for i in 0..5 {
edge_store
.insert_edge(1, i as i64 + 10, Direction::Outgoing, None)
.unwrap_or_else(|e| panic!("Insert iteration {} failed: {:?}", i, e));
edge_store.flush(None).expect("Flush failed");
}
assert!(
!wal_path.exists(),
"WAL file should be truncated (removed) after flush"
);
}
#[test]
fn test_edge_recovery_after_crash() {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.graph");
let wal_path = db_path.with_extension("v3wal");
std::fs::write(&db_path, vec![0u8; 4096]).expect("Failed to create db file");
{
let (edge_store, _allocator) = create_test_edge_store(Some(db_path.clone()));
edge_store
.insert_edge(1, 2, Direction::Outgoing, None)
.expect("Insert failed");
edge_store
.insert_edge(1, 3, Direction::Outgoing, None)
.expect("Insert failed");
edge_store
.insert_edge(2, 4, Direction::Outgoing, None)
.expect("Insert failed");
edge_store.flush(None).expect("Flush failed");
assert!(
!wal_path.exists(),
"WAL file should be truncated (removed) after flush with checkpoint"
);
}
{
let (recovered_store, _allocator) = create_test_edge_store(Some(db_path.clone()));
let neighbors = recovered_store
.outgoing(1)
.expect("Failed to get neighbors");
assert!(
neighbors.len() >= 2,
"After recovery, node 1 should have at least 2 outgoing neighbors"
);
let neighbor_vec: Vec<i64> = neighbors.iter().copied().collect();
assert!(
neighbor_vec.contains(&2),
"Node 1 should have edge to node 2"
);
assert!(
neighbor_vec.contains(&3),
"Node 1 should have edge to node 3"
);
}
}
#[test]
fn test_data_persists_after_multiple_wal_truncations() {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.graph");
let wal_path = db_path.with_extension("v3wal");
std::fs::write(&db_path, vec![0u8; 4096]).expect("Failed to create db file");
{
let (edge_store, _allocator) = create_test_edge_store(Some(db_path.clone()));
for i in 0..5 {
edge_store
.insert_edge(1, i + 10, Direction::Outgoing, None)
.expect("Insert failed");
}
edge_store.flush(None).expect("Flush failed");
assert!(
!wal_path.exists(),
"WAL should be truncated after first flush"
);
for i in 0..5 {
edge_store
.insert_edge(2, i + 20, Direction::Outgoing, None)
.expect("Insert failed");
}
edge_store.flush(None).expect("Flush failed");
assert!(
!wal_path.exists(),
"WAL should be truncated after second flush"
);
}
let (recovered_store, _allocator) = create_test_edge_store(Some(db_path.clone()));
let neighbors1 = recovered_store
.outgoing(1)
.expect("Failed to get node 1 neighbors");
assert_eq!(
neighbors1.len(),
5,
"Node 1 should have 5 outgoing neighbors"
);
let neighbors2 = recovered_store
.outgoing(2)
.expect("Failed to get node 2 neighbors");
assert_eq!(
neighbors2.len(),
5,
"Node 2 should have 5 outgoing neighbors"
);
}
#[test]
fn test_flush_with_no_dirty_clusters() {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.graph");
let (edge_store, _allocator) = create_test_edge_store(Some(db_path));
let result = edge_store.flush(None);
assert!(result.is_ok(), "Flush with empty cache should succeed");
}
#[test]
fn test_multiple_flushes_idempotent() {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.graph");
std::fs::write(&db_path, vec![0u8; 4096]).expect("Failed to create db file");
let (edge_store, _allocator) = create_test_edge_store(Some(db_path.clone()));
edge_store
.insert_edge(1, 2, Direction::Outgoing, None)
.expect("Insert failed");
for _ in 0..3 {
edge_store.flush(None).expect("Flush failed");
}
}
#[test]
fn test_wal_edge_insert_record_format() {
use crate::backend::native::v3::wal::{V3_WAL_HEADER_SIZE, V3WALRecord, V3WALRecordType};
use std::fs;
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.graph");
let wal_path = db_path.with_extension("v3wal");
let (edge_store, _allocator) = create_test_edge_store(Some(db_path.clone()));
edge_store
.insert_edge(1, 2, Direction::Outgoing, None)
.expect("Insert failed");
edge_store.flush_wal().expect("WAL flush failed");
let wal_content = fs::read(&wal_path).expect("Failed to read WAL");
assert!(
wal_content.len() > V3_WAL_HEADER_SIZE,
"WAL should have records beyond header"
);
let mut pos = V3_WAL_HEADER_SIZE;
let mut found_edge_insert = false;
while pos < wal_content.len() - 8 {
if pos + 4 > wal_content.len() {
break;
}
let size = u32::from_le_bytes([
wal_content[pos],
wal_content[pos + 1],
wal_content[pos + 2],
wal_content[pos + 3],
]) as usize;
pos += 4;
if pos + size > wal_content.len() || size == 0 {
break;
}
let record_bytes = &wal_content[pos..pos + size];
if let Ok(record) = V3WALRecord::from_bytes(record_bytes) {
if record.record_type() == V3WALRecordType::EdgeInsert {
found_edge_insert = true;
break;
}
}
pos += size;
}
assert!(
found_edge_insert,
"WAL should contain EdgeInsert record (type 12)"
);
}
#[test]
fn test_edge_type_serialization_roundtrip() {
let mut cluster = V3EdgeCluster::new(1, Direction::Outgoing, 100);
cluster.add_edge(2, Some("TEST_TYPE".to_string()));
assert_eq!(cluster.edges.len(), 1);
let edge = &cluster.edges[0];
assert!(
!edge.edge_data.is_empty(),
"edge_data should not be empty when edge_type is set"
);
let extracted = V3EdgeCluster::extract_edge_type(&edge.edge_data);
assert_eq!(extracted, Some("TEST_TYPE".to_string()));
let serialized = cluster.serialize().unwrap();
let deserialized = V3EdgeCluster::deserialize(&serialized, 100).unwrap();
assert_eq!(deserialized.edges.len(), 1);
let deser_edge = &deserialized.edges[0];
let deser_type = V3EdgeCluster::extract_edge_type(&deser_edge.edge_data);
assert_eq!(
deser_type,
Some("TEST_TYPE".to_string()),
"edge_type should survive serialization roundtrip"
);
}
#[test]
fn test_edges_with_types_extraction() {
let mut cluster = V3EdgeCluster::new(1, Direction::Outgoing, 100);
cluster.add_edge(2, Some("CALLS".to_string()));
cluster.add_edge(3, Some("USES".to_string()));
cluster.add_edge(4, None);
let edges_with_types = cluster.edges_with_types();
assert_eq!(edges_with_types.len(), 3);
assert_eq!(edges_with_types[0].0, 2);
assert_eq!(edges_with_types[0].1, Some("CALLS".to_string()));
assert_eq!(edges_with_types[1].0, 3);
assert_eq!(edges_with_types[1].1, Some("USES".to_string()));
assert_eq!(edges_with_types[2].0, 4);
assert_eq!(edges_with_types[2].1, None);
}
}