use crate::backend::native::NativeBackendError;
use crate::backend::native::NativeResult;
use crate::backend::native::types::NodeFlags;
use crate::backend::native::v3::allocator::PageAllocator;
use crate::backend::native::v3::btree::BTreeManager;
use crate::backend::native::v3::constants::{DEFAULT_PAGE_SIZE, V3_HEADER_SIZE};
use crate::backend::native::v3::file_coordinator::FileCoordinator;
#[cfg(feature = "v3-forensics")]
use crate::backend::native::v3::forensics::{FORENSIC_COUNTERS, PageType, Subsystem};
use crate::backend::native::v3::header::PersistentHeaderV3;
use crate::backend::native::v3::index::IndexPage;
use crate::backend::native::v3::node::{NodePage, NodeRecordV3};
use crate::backend::native::v3::wal::WALWriter;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::PathBuf;
use std::sync::Arc;
const MAX_TREE_HEIGHT: u32 = 10;
const PAGE_CACHE_SIZE: usize = 64;
pub const DEFAULT_CACHE_CAPACITY: usize = 64;
pub const MAX_CACHE_CAPACITY: usize = 256;
pub const MIN_CACHE_CAPACITY: usize = 1;
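/// A small LRU cache mapping page ids to decoded [`NodePage`]s, used to keep
/// hot pages around during graph traversals. Capacity must lie in
/// `MIN_CACHE_CAPACITY..=MAX_CACHE_CAPACITY`; `access_order` tracks recency
/// (front = least recently used) and the hit/miss counters feed `hit_ratio`.
///
/// Illustrative sketch (not a doctest; page construction is simplified):
///
/// ```ignore
/// let mut cache = TraversalCache::new(16);
/// cache.insert(7, Arc::new(NodePage::new(7)));
/// assert!(cache.get(7).is_some());  // a hit; 7 becomes most recently used
/// assert!(cache.get(8).is_none());  // a miss
/// assert_eq!(cache.hit_ratio(), 0.5);
/// ```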
#[derive(Debug, Clone)]
pub struct TraversalCache {
cache: HashMap<u64, Arc<NodePage>>,
access_order: Vec<u64>,
capacity: usize,
hits: u64,
misses: u64,
}
impl TraversalCache {
pub fn new(capacity: usize) -> Self {
        assert!(
            (MIN_CACHE_CAPACITY..=MAX_CACHE_CAPACITY).contains(&capacity),
            "TraversalCache capacity {} must be within {}..={}",
            capacity,
            MIN_CACHE_CAPACITY,
            MAX_CACHE_CAPACITY
        );
Self {
cache: HashMap::with_capacity(capacity),
access_order: Vec::with_capacity(capacity),
capacity,
hits: 0,
misses: 0,
}
}
pub fn with_default_capacity() -> Self {
Self::new(DEFAULT_CACHE_CAPACITY)
}
pub fn get(&mut self, page_id: u64) -> Option<Arc<NodePage>> {
if let Some(page) = self.cache.remove(&page_id) {
self.access_order.retain(|&id| id != page_id);
self.access_order.push(page_id);
self.cache.insert(page_id, page.clone());
self.hits += 1;
Some(page)
} else {
self.misses += 1;
None
}
}
pub fn insert(&mut self, page_id: u64, page: Arc<NodePage>) {
        // Remove any existing entry first so the eviction loop below does not
        // evict an unrelated page when we are merely refreshing this key.
        if self.cache.remove(&page_id).is_some() {
            self.access_order.retain(|&id| id != page_id);
        }
while self.cache.len() >= self.capacity {
if let Some(oldest_id) = self.access_order.first() {
self.cache.remove(oldest_id);
self.access_order.remove(0);
} else {
break;
}
}
self.access_order.push(page_id);
self.cache.insert(page_id, page);
}
pub fn invalidate(&mut self, page_id: u64) -> bool {
let was_present = self.cache.remove(&page_id).is_some();
self.access_order.retain(|&id| id != page_id);
was_present
}
pub fn clear(&mut self) {
self.cache.clear();
self.access_order.clear();
}
pub fn len(&self) -> usize {
self.cache.len()
}
pub fn is_empty(&self) -> bool {
self.cache.is_empty()
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn contains(&self, page_id: &u64) -> bool {
self.cache.contains_key(page_id)
}
pub fn hits(&self) -> u64 {
self.hits
}
pub fn misses(&self) -> u64 {
self.misses
}
pub fn hit_ratio(&self) -> f64 {
let total = self.hits + self.misses;
if total == 0 {
return 0.0;
}
self.hits as f64 / total as f64
}
pub fn reset_stats(&mut self) {
self.hits = 0;
self.misses = 0;
}
}
impl Default for TraversalCache {
fn default() -> Self {
Self::with_default_capacity()
}
}
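/// Stores node records in fixed-size pages behind a B-tree index.
///
/// Reads go through a packed-bytes page cache (`page_cache`) and a decoded
/// cache (`unpacked_page_cache`); writes go either through an optional
/// [`FileCoordinator`] or directly to `db_path`. When `batch_mode` is on,
/// written pages accumulate in `dirty_pages` until `commit_batch` flushes
/// them in one pass. `block_preferred_pages` biases placement so nodes from
/// the same id block land on the same few pages.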
pub struct NodeStore {
db_path: PathBuf,
file_coordinator: Option<Arc<FileCoordinator>>,
root_page_id: u64,
tree_height: u32,
page_cache: Arc<RwLock<HashMap<u64, Vec<u8>>>>,
unpacked_page_cache: Arc<RwLock<HashMap<u64, Arc<NodePage>>>>,
cache_capacity: usize,
current_access_block: std::sync::atomic::AtomicI64,
block_preferred_pages: HashMap<i64, Vec<u64>>,
max_preferred_pages_per_block: usize,
index_cache: HashMap<u64, IndexPage>,
btree_manager: Option<BTreeManager>,
page_allocator: Option<Arc<RwLock<PageAllocator>>>,
wal_writer: Option<WALWriter>,
next_node_id: i64,
dirty_pages: HashMap<u64, NodePage>,
batch_mode: bool,
}
impl NodeStore {
pub fn new(header: &PersistentHeaderV3, db_path: PathBuf) -> Self {
NodeStore {
db_path,
file_coordinator: None,
root_page_id: header.root_index_page,
tree_height: header.btree_height,
page_cache: Arc::new(RwLock::new(HashMap::with_capacity(PAGE_CACHE_SIZE))),
unpacked_page_cache: Arc::new(RwLock::new(HashMap::with_capacity(PAGE_CACHE_SIZE))),
cache_capacity: PAGE_CACHE_SIZE,
current_access_block: std::sync::atomic::AtomicI64::new(-1),
block_preferred_pages: HashMap::new(),
max_preferred_pages_per_block: 3,
index_cache: HashMap::with_capacity(PAGE_CACHE_SIZE),
btree_manager: None,
page_allocator: None,
wal_writer: None,
            next_node_id: 1,
            dirty_pages: HashMap::new(),
batch_mode: false,
}
}
pub fn with_capacity(
header: &PersistentHeaderV3,
db_path: PathBuf,
cache_capacity: usize,
) -> Self {
NodeStore {
db_path,
file_coordinator: None,
root_page_id: header.root_index_page,
tree_height: header.btree_height,
page_cache: Arc::new(RwLock::new(HashMap::with_capacity(cache_capacity))),
unpacked_page_cache: Arc::new(RwLock::new(HashMap::with_capacity(cache_capacity))),
cache_capacity,
current_access_block: std::sync::atomic::AtomicI64::new(-1),
block_preferred_pages: HashMap::new(),
max_preferred_pages_per_block: 3,
index_cache: HashMap::with_capacity(cache_capacity),
btree_manager: None,
page_allocator: None,
wal_writer: None,
next_node_id: 1,
dirty_pages: HashMap::new(),
batch_mode: false,
}
}
pub fn initialize(
&mut self,
btree_manager: BTreeManager,
page_allocator: Arc<RwLock<PageAllocator>>,
wal_writer: Option<WALWriter>,
) {
self.btree_manager = Some(btree_manager);
self.page_allocator = Some(page_allocator);
self.wal_writer = wal_writer;
}
pub fn set_wal_writer(&mut self, wal: WALWriter) {
self.wal_writer = Some(wal);
}
pub fn set_file_coordinator(&mut self, coordinator: Arc<FileCoordinator>) {
self.file_coordinator = Some(coordinator);
}
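    /// Enters batch mode: subsequent page writes are buffered in `dirty_pages`
    /// instead of being flushed to disk. Note this clears any already-buffered
    /// pages, so calling it mid-batch silently discards pending writes.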
pub fn begin_batch(&mut self) {
self.batch_mode = true;
self.dirty_pages.clear();
}
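    /// Flushes every page buffered since `begin_batch` and leaves batch mode,
    /// returning how many pages were written. With a `FileCoordinator` the
    /// pages are handed off one by one; otherwise the file is extended once to
    /// the maximum required length, each page is written at its offset, and a
    /// single `sync_all` commits the batch.
    ///
    /// A minimal usage sketch (assuming a configured `store: NodeStore` and
    /// some `nodes` to insert):
    ///
    /// ```ignore
    /// store.begin_batch();
    /// for node in nodes {
    ///     store.insert_node(node)?; // pages pile up in `dirty_pages`
    /// }
    /// let flushed = store.commit_batch()?; // or store.rollback_batch() to discard
    /// ```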
pub fn commit_batch(&mut self) -> NativeResult<usize> {
if !self.batch_mode {
return Ok(0);
}
let page_count = self.dirty_pages.len();
if page_count > 0 {
if let Some(coordinator) = &self.file_coordinator {
for (page_id, page) in &self.dirty_pages {
let page_bytes = page.pack()?;
coordinator.write_page(*page_id, &page_bytes)?;
self.page_cache_insert(*page_id, page_bytes.to_vec());
}
self.dirty_pages.clear();
} else {
                // `create(true)` opens the file if it exists and creates it
                // otherwise, avoiding the race of checking existence first.
                let mut file = OpenOptions::new()
                    .write(true)
                    .create(true)
                    .open(&self.db_path)
.map_err(|e| NativeBackendError::IoError {
context: format!(
"Failed to open database file for batch commit: {}",
self.db_path.display()
),
source: e,
})?;
                let current_len = file.metadata().map(|m| m.len()).unwrap_or(0);
                let mut required_len = current_len;
for (page_id, page) in &self.dirty_pages {
let page_bytes = page.pack()?;
let offset = Self::page_offset(*page_id);
let page_end = offset + page_bytes.len() as u64;
if page_end > required_len {
required_len = page_end;
}
}
if required_len > current_len {
file.set_len(required_len)
.map_err(|e| NativeBackendError::IoError {
context: format!(
"Failed to extend file to {} bytes for batch commit",
required_len
),
source: e,
})?;
}
for (page_id, page) in &self.dirty_pages {
let page_bytes = page.pack()?;
let offset = Self::page_offset(*page_id);
file.seek(SeekFrom::Start(offset)).map_err(|e| {
NativeBackendError::IoError {
context: format!(
"Failed to seek to page {} offset {}",
page_id, offset
),
source: e,
}
})?;
file.write_all(&page_bytes)
.map_err(|e| NativeBackendError::IoError {
context: format!(
"Failed to write page {} during batch commit",
page_id
),
source: e,
})?;
self.page_cache_insert(*page_id, page_bytes.to_vec());
}
file.sync_all().map_err(|e| NativeBackendError::IoError {
context: "Failed to sync batch commit to disk".to_string(),
source: e,
})?;
self.dirty_pages.clear();
}
}
self.batch_mode = false;
Ok(page_count)
}
pub fn rollback_batch(&mut self) {
self.dirty_pages.clear();
self.batch_mode = false;
}
pub fn is_batch_mode(&self) -> bool {
self.batch_mode
}
pub fn dirty_page_count(&self) -> usize {
self.dirty_pages.len()
}
fn btree_manager_mut(&mut self) -> NativeResult<&mut BTreeManager> {
self.btree_manager
.as_mut()
.ok_or_else(|| NativeBackendError::InvalidHeader {
field: "btree_manager".to_string(),
reason: "BTreeManager not initialized".to_string(),
})
}
fn page_allocator_mut(&self) -> NativeResult<parking_lot::RwLockWriteGuard<'_, PageAllocator>> {
self.page_allocator
.as_ref()
.ok_or_else(|| NativeBackendError::InvalidHeader {
field: "page_allocator".to_string(),
reason: "PageAllocator not initialized".to_string(),
})
.map(|arc| arc.write())
}
pub fn allocate_node_id(&mut self) -> i64 {
let id = self.next_node_id;
self.next_node_id += 1;
id
}
pub fn set_next_node_id(&mut self, next_id: i64) {
self.next_node_id = next_id;
}
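    /// Inserts a node: assigns the next node id, finds (or allocates) a page
    /// with enough free capacity, appends the record, and registers the
    /// `node_id -> page_id` mapping in the B-tree. If the chosen page turns
    /// out to be full (`add_node` reports a "node_page" error), a fresh page
    /// is allocated and the insert is retried, up to `MAX_ATTEMPTS` times.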
pub fn insert_node(&mut self, mut node: NodeRecordV3) -> NativeResult<i64> {
#[cfg(feature = "v3-forensics")]
FORENSIC_COUNTERS
.node_encode_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let node_id = self.allocate_node_id();
node.id = node_id;
let mut attempts = 0;
const MAX_ATTEMPTS: usize = 3;
loop {
attempts += 1;
let page_id = if attempts == 1 {
self.find_or_create_page_for_node(&node)?
} else {
let mut allocator = self.page_allocator_mut()?;
let new_page_id = allocator.allocate()?;
let new_page = NodePage::new(new_page_id);
let page_bytes = new_page.pack()?;
if let Some(coordinator) = &self.file_coordinator {
coordinator.write_page(new_page_id, &page_bytes)?;
}
new_page_id
};
let mut page = self.load_node_page(page_id)?;
match page.add_node(node.clone()) {
Ok(()) => {
self.write_node_page(&page)?;
let btree = self.btree_manager_mut()?;
btree.insert(node_id, page_id)?;
let new_root = btree.root_page_id();
let new_height = btree.tree_height();
self.root_page_id = new_root;
self.tree_height = new_height;
if !self.batch_mode {
if let Some(ref mut wal) = self.wal_writer {
let page_bytes = page.pack()?;
wal.page_write(page_id, 0, page_bytes.to_vec())?;
}
self.page_cache_insert(page_id, page.pack()?.to_vec());
}
return Ok(node_id);
}
Err(NativeBackendError::InvalidHeader { ref field, .. })
if field == "node_page" && attempts < MAX_ATTEMPTS =>
{
continue;
}
Err(e) => {
return Err(e);
}
}
}
}
pub fn update_node(&mut self, node_id: i64, node: NodeRecordV3) -> NativeResult<()> {
let page_id = match self.lookup_page(node_id)? {
Some(pid) => pid,
None => {
return Err(NativeBackendError::InvalidHeader {
field: "update_node".to_string(),
reason: format!("Node {} not found", node_id),
});
}
};
let mut page = self.load_node_page(page_id)?;
let mut found = false;
for (i, existing_node) in page.nodes.iter().enumerate() {
if existing_node.id() == node_id {
page.nodes[i] = node;
found = true;
break;
}
}
if !found {
return Err(NativeBackendError::InvalidHeader {
field: "update_node".to_string(),
reason: format!("Node {} not found in page {}", node_id, page_id),
});
}
page.used_bytes = page
.nodes
.iter()
.map(|n| self.estimate_node_size(n))
.sum::<NativeResult<u16>>()?;
self.write_node_page(&page)?;
if let Some(ref mut wal) = self.wal_writer {
let page_bytes = page.pack()?;
wal.page_write(page_id, 0, page_bytes.to_vec())?;
}
self.page_cache_insert(page_id, page.pack()?.to_vec());
Ok(())
}
pub fn delete_node(&mut self, node_id: i64) -> NativeResult<bool> {
let page_id = match self.lookup_page(node_id)? {
Some(pid) => pid,
None => return Ok(false),
};
let mut page = self.load_node_page(page_id)?;
let mut found = false;
for node in page.nodes.iter_mut() {
if node.id() == node_id {
                node.flags = NodeFlags::DELETED; // tombstone: the record stays on the page
found = true;
break;
}
}
if !found {
return Ok(false);
}
self.write_node_page(&page)?;
let btree = self.btree_manager_mut()?;
btree.delete(node_id)?;
if let Some(ref mut wal) = self.wal_writer {
let page_bytes = page.pack()?;
wal.page_write(page_id, 0, page_bytes.to_vec())?;
}
self.page_cache_insert(page_id, page.pack()?.to_vec());
Ok(true)
}
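    /// Picks a destination page for a new node, cheapest candidates first:
    /// pages already preferred for the node's id block, then dirty pages from
    /// the current batch, then any cached page with room (this last pass
    /// unpacks each cached page, so it is the most expensive check). If
    /// nothing fits, a fresh page is allocated, persisted, and associated
    /// with the block for future inserts.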
fn find_or_create_page_for_node(&mut self, node: &NodeRecordV3) -> NativeResult<u64> {
let node_size = self.estimate_node_size(node)?;
use super::page::node_id_to_block;
let block_id = node_id_to_block(node.id);
if let Some(preferred_pages) = self.block_preferred_pages.get(&block_id) {
for &page_id in preferred_pages.iter().rev() {
if let Some(page) = self.dirty_pages.get(&page_id) {
if page.capacity() >= node_size {
return Ok(page_id);
}
}
if let Some(page_bytes) = self.page_cache_get(page_id) {
if let Ok(page) = NodePage::unpack(&page_bytes) {
if page.capacity() >= node_size {
return Ok(page_id);
}
}
}
}
}
for (&page_id, page) in &self.dirty_pages {
let cap = page.capacity();
if cap >= node_size {
return Ok(page_id);
}
}
{
let cache = self.page_cache.read();
for (&page_id, page_bytes) in cache.iter() {
if self.dirty_pages.contains_key(&page_id) {
continue;
}
if let Ok(page) = NodePage::unpack(page_bytes) {
let cap = page.capacity();
if cap >= node_size {
return Ok(page_id);
}
}
}
}
let new_page_id = {
let mut allocator = self.page_allocator_mut()?;
allocator.allocate()?
};
self.associate_page_with_block(new_page_id, block_id);
let new_page = NodePage::new(new_page_id);
let page_bytes = new_page.pack()?;
if let Some(coordinator) = &self.file_coordinator {
coordinator.write_page(new_page_id, &page_bytes)?;
} else {
            let offset = Self::page_offset(new_page_id);
            // Seeking past EOF and writing extends the file, so no explicit
            // set_len is needed for a brand-new page.
            let mut file = OpenOptions::new()
                .write(true)
                .create(true)
                .open(&self.db_path)
.map_err(|e| NativeBackendError::IoError {
context: format!(
"Failed to open db file for new page write: {}",
self.db_path.display()
),
source: e,
})?;
file.seek(SeekFrom::Start(offset))
.map_err(|e| NativeBackendError::IoError {
context: format!(
"Failed to seek to offset {} for new page {}",
offset, new_page_id
),
source: e,
})?;
file.write_all(&page_bytes)
.map_err(|e| NativeBackendError::IoError {
context: format!("Failed to write new page {} to disk", new_page_id),
source: e,
})?;
file.sync_all().map_err(|e| NativeBackendError::IoError {
context: format!("Failed to sync new page {}", new_page_id),
source: e,
})?;
}
self.page_cache_insert(new_page_id, page_bytes.to_vec());
Ok(new_page_id)
}
fn associate_page_with_block(&mut self, page_id: u64, block_id: i64) {
let pages = self.block_preferred_pages.entry(block_id).or_default();
if !pages.contains(&page_id) {
pages.push(page_id);
while pages.len() > self.max_preferred_pages_per_block {
pages.remove(0);
}
}
}
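    /// Loads a node page, checking the batch's dirty pages first, then the
    /// packed-bytes cache, and finally disk; forensics counters record which
    /// tier answered.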
fn load_node_page(&mut self, page_id: u64) -> NativeResult<NodePage> {
if let Some(page) = self.dirty_pages.get(&page_id) {
#[cfg(feature = "v3-forensics")]
FORENSIC_COUNTERS
.dirty_page_hit_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return Ok(page.clone());
}
if let Some(cached) = self.page_cache_get(page_id) {
#[cfg(feature = "v3-forensics")]
FORENSIC_COUNTERS
.node_page_cache_hit_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return NodePage::unpack(&cached);
}
let page_bytes = self.load_page_from_disk(page_id)?;
#[cfg(feature = "v3-forensics")]
FORENSIC_COUNTERS
.node_page_cache_miss_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
NodePage::unpack(&page_bytes)
}
fn write_node_page(&mut self, page: &NodePage) -> NativeResult<()> {
let page_id = page.page_id;
#[cfg(feature = "v3-forensics")]
{
FORENSIC_COUNTERS
.page_write_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let offset = if self.file_coordinator.is_some() {
V3_HEADER_SIZE + (page_id.saturating_sub(1)) * DEFAULT_PAGE_SIZE
} else {
Self::page_offset(page_id)
};
crate::track_page_alloc!(page_id, Subsystem::NodeStore, PageType::Node);
crate::track_page_write!(
page_id,
Subsystem::NodeStore,
PageType::Node,
offset,
"NodeStore::write_node_page"
);
}
if self.batch_mode {
self.dirty_pages.insert(page_id, page.clone());
return Ok(());
}
let page_bytes = page.pack()?;
if let Some(coordinator) = &self.file_coordinator {
coordinator.write_page(page_id, &page_bytes)?;
return Ok(());
}
let offset = Self::page_offset(page_id);
let required_len = offset + page_bytes.len() as u64;
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .open(&self.db_path)
.map_err(|e| NativeBackendError::IoError {
context: format!(
"Failed to open database file for writing: {}",
self.db_path.display()
),
source: e,
})?;
let current_len = file.metadata().map(|m| m.len()).unwrap_or(0);
if required_len > current_len {
file.set_len(required_len)
.map_err(|e| NativeBackendError::IoError {
context: format!(
"Failed to extend file to {} bytes for page {}",
required_len, page_id
),
source: e,
})?;
file.sync_all().map_err(|e| NativeBackendError::IoError {
context: format!("Failed to sync after extending file for page {}", page_id),
source: e,
})?;
}
file.seek(SeekFrom::Start(offset))
.map_err(|e| NativeBackendError::IoError {
context: format!("Failed to seek to page {} offset {}", page_id, offset),
source: e,
})?;
file.write_all(&page_bytes)
.map_err(|e| NativeBackendError::IoError {
context: format!("Failed to write page {}", page_id),
source: e,
})?;
file.sync_all().map_err(|e| NativeBackendError::IoError {
context: format!("Failed to sync page {}", page_id),
source: e,
})?;
Ok(())
}
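    /// Estimates the packed size of a node by mirroring the encoder's layout:
    /// a delta-encoded id varint, a fixed 4-byte field, varints for the
    /// offsets and counts, and either the inline payload length or a fixed
    /// 8-byte external offset. This is assumed to track the real
    /// `NodeRecordV3` encoding; if the encoder changes, this must change too.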
fn estimate_node_size(&self, node: &NodeRecordV3) -> NativeResult<u16> {
use crate::backend::native::v3::compression::delta::encode_id_delta;
use crate::backend::native::v3::compression::varint::varint_size;
let mut size: usize = 0;
let base_id = 0;
let delta = encode_id_delta(node.id(), base_id);
size += varint_size(delta as u64);
size += 4;
size += varint_size(node.kind_offset as u64);
size += varint_size(node.name_offset as u64);
size += varint_size(node.data_len() as u64);
size += varint_size(node.outgoing_cluster_offset);
size += varint_size(node.outgoing_edge_count as u64);
size += varint_size(node.incoming_cluster_offset);
size += varint_size(node.incoming_edge_count as u64);
        if let Some(ref data) = node.data_inline {
            size += data.len();
        } else if node.data_external_offset.is_some() {
            // External payloads contribute a fixed 8-byte offset instead of inline bytes.
            size += 8;
        }
if size > u16::MAX as usize {
return Err(NativeBackendError::InvalidHeader {
field: "compressed_size".to_string(),
reason: format!("compressed size {} exceeds u16::MAX", size),
});
}
Ok(size as u16)
}
pub fn has_index(&self) -> bool {
self.root_page_id != 0
}
pub fn root_page_id_pub(&self) -> u64 {
self.root_page_id
}
pub fn tree_height_pub(&self) -> u32 {
self.tree_height
}
pub fn btree_root_page_id(&self) -> Option<u64> {
self.btree_manager.as_ref().and_then(|btree| {
let root = btree.root_page_id();
if root != 0 && root != u64::MAX {
Some(root)
} else {
None
}
})
}
pub fn btree_height(&self) -> Option<u32> {
self.btree_manager.as_ref().and_then(|btree| {
let height = btree.tree_height();
if height > 0 { Some(height) } else { None }
})
}
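    /// Resolves a node id to the page holding it. With a `BTreeManager` the
    /// lookup is delegated outright; otherwise the index is walked manually
    /// from `root_page_id`, caching visited index pages and following
    /// `next_leaf` links, with a depth bound of tree height plus
    /// `MAX_TREE_HEIGHT` to guard against runaway loops.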
pub fn lookup_page(&mut self, node_id: i64) -> NativeResult<Option<u64>> {
if let Some(ref btree) = self.btree_manager {
return btree.lookup(node_id);
}
if self.root_page_id == 0 {
return Ok(None);
}
let search_key = node_id as u64;
let mut current_page_id = self.root_page_id;
let mut depth = 0;
while depth < self.tree_height as usize + MAX_TREE_HEIGHT as usize {
            let index_page = if let Some(cached) = self.index_cache.get(&current_page_id) {
cached.clone()
} else {
let page_bytes = self.load_page_from_disk(current_page_id)?;
let index = IndexPage::unpack(&page_bytes)?;
self.evict_index_cache_if_needed();
self.index_cache.insert(current_page_id, index.clone());
index
};
match index_page {
IndexPage::Leaf {
entries, next_leaf, ..
} => {
let result = IndexPage::binary_search_leaf(&entries, search_key);
return match result {
Ok(idx) => {
if let Some((_, page_id)) = entries.get(idx) {
Ok(Some(*page_id))
} else {
Err(NativeBackendError::InvalidHeader {
field: "btree_leaf".to_string(),
reason: "entry index out of bounds".to_string(),
})
}
}
Err(_idx) => {
if next_leaf == 0 {
Ok(None)
} else {
current_page_id = next_leaf;
continue;
}
}
};
}
IndexPage::Internal { keys, children, .. } => {
let child_idx = IndexPage::find_child_index(&keys, search_key);
if child_idx < children.len() {
current_page_id = children[child_idx];
} else {
return Err(NativeBackendError::InvalidHeader {
field: "btree_internal".to_string(),
reason: format!("child index {} out of bounds", child_idx),
});
}
}
}
depth += 1;
}
        Err(NativeBackendError::InvalidHeader {
            field: "btree_depth".to_string(),
            reason: format!(
                "exceeded maximum traversal depth {} (tree height {} + slack {})",
                self.tree_height as usize + MAX_TREE_HEIGHT as usize,
                self.tree_height,
                MAX_TREE_HEIGHT
            ),
        })
}
pub fn lookup_node(&mut self, node_id: i64) -> NativeResult<Option<NodeRecordV3>> {
let page_id = match self.lookup_page(node_id)? {
Some(pid) => pid,
None => return Ok(None),
};
let node_page = self.load_node_page(page_id)?;
for node in &node_page.nodes {
if node.id() == node_id {
return Ok(Some(node.clone()));
}
}
Ok(None)
}
fn load_page_from_disk(&mut self, page_id: u64) -> NativeResult<Vec<u8>> {
if let Some(cached) = self.page_cache_get(page_id) {
#[cfg(feature = "v3-forensics")]
FORENSIC_COUNTERS
.node_cache_hit_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return Ok(cached);
}
#[cfg(feature = "v3-forensics")]
{
FORENSIC_COUNTERS
.page_read_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
FORENSIC_COUNTERS
.node_cache_miss_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
let mut buffer = vec![0u8; DEFAULT_PAGE_SIZE as usize];
if let Some(coordinator) = &self.file_coordinator {
coordinator.read_page(page_id, &mut buffer)?;
} else {
let page_offset = Self::page_offset(page_id);
let mut file = File::open(&self.db_path).map_err(|e| NativeBackendError::IoError {
context: format!("Failed to open database file: {}", self.db_path.display()),
source: e,
})?;
file.seek(SeekFrom::Start(page_offset))
.map_err(|e| NativeBackendError::IoError {
context: format!("Failed to seek to page {}", page_id),
source: e,
})?;
file.read_exact(&mut buffer)
.map_err(|e| NativeBackendError::IoError {
context: format!("Failed to read page {}", page_id),
source: e,
})?;
}
self.page_cache_insert(page_id, buffer.clone());
Ok(buffer)
}
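    /// Byte offset of a data page: page 0 is the header, and data pages start
    /// at `V3_HEADER_SIZE`, so page N (N >= 1) lives at
    /// `V3_HEADER_SIZE + (N - 1) * DEFAULT_PAGE_SIZE`. For example, page 1
    /// sits at `V3_HEADER_SIZE` and page 2 one page size after it.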
fn page_offset(page_id: u64) -> u64 {
if page_id == 0 {
return 0;
}
let data_page_index = page_id.saturating_sub(1);
V3_HEADER_SIZE + data_page_index * DEFAULT_PAGE_SIZE
}
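    /// Reads the big-endian base node id stored at bytes 20..28 of a packed
    /// page header and maps it to its block: `(base_id - 1) / BLOCK_SIZE`,
    /// with ids below 1 collapsing to block 0. Returns `None` when the buffer
    /// is too short to contain the field.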
#[inline]
fn extract_block_id_from_page_bytes(page_bytes: &[u8]) -> Option<i64> {
use super::page::BLOCK_SIZE;
if page_bytes.len() < 28 {
return None;
}
let base_id = i64::from_be_bytes(page_bytes[20..28].try_into().ok()?);
let block_id = if base_id < 1 {
0
} else {
(base_id - 1) / BLOCK_SIZE
};
Some(block_id)
}
fn evict_index_cache_if_needed(&mut self) {
if self.index_cache.len() >= self.cache_capacity {
if let Some(key) = self.index_cache.keys().next().copied() {
self.index_cache.remove(&key);
}
}
}
pub fn clear_cache(&mut self) {
self.page_cache.write().clear();
self.index_cache.clear();
}
pub fn cache_stats(&self) -> (usize, usize) {
(self.page_cache.read().len(), self.index_cache.len())
}
fn page_cache_get(&self, page_id: u64) -> Option<Vec<u8>> {
self.page_cache.read().get(&page_id).cloned()
}
fn page_cache_insert(&self, page_id: u64, data: Vec<u8>) {
if let Some(block_id) = Self::extract_block_id_from_page_bytes(&data) {
self.current_access_block
.store(block_id, std::sync::atomic::Ordering::Relaxed);
}
self.unpacked_page_cache_invalidate(page_id);
let mut cache = self.page_cache.write();
cache.insert(page_id, data);
if cache.len() > self.cache_capacity {
if let Some(key) = cache.keys().next().copied() {
cache.remove(&key);
}
}
}
fn page_cache_insert_if_absent(&self, page_id: u64, data: Vec<u8>) {
if let Some(block_id) = Self::extract_block_id_from_page_bytes(&data) {
self.current_access_block
.store(block_id, std::sync::atomic::Ordering::Relaxed);
}
{
let cache_read = self.page_cache.read();
if cache_read.contains_key(&page_id) {
return;
}
}
let mut cache = self.page_cache.write();
if cache.contains_key(&page_id) {
return;
}
cache.insert(page_id, data);
if cache.len() > self.cache_capacity {
if let Some(key) = cache.keys().next().copied() {
cache.remove(&key);
}
}
}
fn unpacked_page_cache_get(&self, page_id: u64) -> Option<Arc<NodePage>> {
let cache = self.unpacked_page_cache.read();
cache.get(&page_id).cloned()
}
fn unpacked_page_cache_insert(&self, page_id: u64, page: Arc<NodePage>) {
let mut cache = self.unpacked_page_cache.write();
cache.insert(page_id, page);
if cache.len() > self.cache_capacity {
if let Some(key) = cache.keys().next().copied() {
cache.remove(&key);
}
}
}
fn unpacked_page_cache_invalidate(&self, page_id: u64) {
let mut cache = self.unpacked_page_cache.write();
cache.remove(&page_id);
}
pub fn update_root(&mut self, new_root: u64) {
self.root_page_id = new_root;
self.index_cache.clear();
}
pub fn update_tree_height(&mut self, new_height: u32) {
self.tree_height = new_height;
}
    pub fn is_valid_node_id(&self, node_id: i64) -> bool {
        node_id > 0 && self.has_index()
    }
pub fn lookup_node_ro(&self, node_id: i64) -> NativeResult<Option<NodeRecordV3>> {
#[cfg(feature = "v3-forensics")]
FORENSIC_COUNTERS
.node_decode_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let page_id = match self.lookup_page_ro(node_id)? {
Some(pid) => pid,
None => return Ok(None),
};
if let Some(cached_page) = self.unpacked_page_cache_get(page_id) {
#[cfg(feature = "v3-forensics")]
{
FORENSIC_COUNTERS
.node_page_cache_hit_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                FORENSIC_COUNTERS.node_linear_scan_steps.fetch_add(
                    // checked_ilog2 avoids panicking on an empty page.
                    (cached_page.nodes.len().checked_ilog2().unwrap_or(0).max(1)) as u64,
                    std::sync::atomic::Ordering::Relaxed,
                );
}
return match cached_page.find_node(node_id) {
Some(node_ref) => Ok(Some(node_ref.clone())),
None => Ok(None),
};
}
#[cfg(feature = "v3-forensics")]
FORENSIC_COUNTERS
.node_page_cache_miss_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let page_data = self.load_page_cache_ro(page_id)?;
let page = NodePage::unpack(&page_data)?;
let result = match page.find_node(node_id) {
Some(node_ref) => Ok(Some(node_ref.clone())),
None => Ok(None),
};
self.unpacked_page_cache_insert(page_id, Arc::new(page));
result
}
fn load_page_cache_ro(&self, page_id: u64) -> NativeResult<Vec<u8>> {
        if let Some(cached) = self.page_cache_get(page_id) {
            #[cfg(feature = "v3-forensics")]
            FORENSIC_COUNTERS
                .node_page_cache_hit_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            return Ok(cached);
        }
        // Count a page read only when the request misses the packed cache,
        // mirroring `load_page_from_disk`.
        #[cfg(feature = "v3-forensics")]
        FORENSIC_COUNTERS
            .page_read_count
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
#[cfg(feature = "v3-forensics")]
FORENSIC_COUNTERS
.node_page_cache_miss_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut buffer = vec![0u8; DEFAULT_PAGE_SIZE as usize];
if let Some(coordinator) = &self.file_coordinator {
coordinator.read_page(page_id, &mut buffer)?;
} else {
let page_offset = Self::page_offset(page_id);
let mut file = File::open(&self.db_path).map_err(|e| NativeBackendError::IoError {
context: format!("Failed to open database file: {}", self.db_path.display()),
source: e,
})?;
file.seek(SeekFrom::Start(page_offset))
.map_err(|e| NativeBackendError::IoError {
context: format!("Failed to seek to page {}", page_id),
source: e,
})?;
file.read_exact(&mut buffer)
.map_err(|e| NativeBackendError::IoError {
context: format!("Failed to read page {}", page_id),
source: e,
})?;
}
self.page_cache_insert_if_absent(page_id, buffer.clone());
Ok(buffer)
}
fn lookup_page_ro(&self, node_id: i64) -> NativeResult<Option<u64>> {
if let Some(ref btree) = self.btree_manager {
return btree.lookup(node_id);
}
Ok(None)
}
}
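/// A minimal read-only page loader over a shared file handle, independent of
/// the `NodeStore` caches. Useful for scans and verification passes. Note
/// that `page_offset` is an associated function and always uses
/// `DEFAULT_PAGE_SIZE`, regardless of a custom `page_size`.
///
/// Illustrative sketch (assuming an existing v3 database file):
///
/// ```ignore
/// let file = Arc::new(File::open("graph.v3db")?);
/// let loader = PageLoader::with_default_page_size(file);
/// let bytes = loader.load_page_bytes(1)?;
/// loader.validate_page_checksum(&bytes)?;
/// let page = loader.load_page(1)?;
/// ```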
pub struct PageLoader {
file: Arc<File>,
page_size: usize,
header_size: u64,
}
impl PageLoader {
pub fn new(file: Arc<File>, page_size: usize) -> Self {
PageLoader {
file,
page_size,
header_size: V3_HEADER_SIZE,
}
}
pub fn with_default_page_size(file: Arc<File>) -> Self {
Self::new(file, DEFAULT_PAGE_SIZE as usize)
}
pub fn load_page(&self, page_id: u64) -> NativeResult<NodePage> {
if page_id == 0 {
return Err(NativeBackendError::InvalidHeader {
field: "page_id".to_string(),
reason: "Cannot load header page (page 0) as NodePage".to_string(),
});
}
let page_bytes = self.load_page_bytes(page_id)?;
let page = NodePage::unpack(&page_bytes)?;
Ok(page)
}
pub fn load_page_bytes(&self, page_id: u64) -> NativeResult<Vec<u8>> {
if page_id == 0 {
return Err(NativeBackendError::InvalidHeader {
field: "page_id".to_string(),
reason: "Cannot load header page (page 0) bytes".to_string(),
});
}
let offset = Self::page_offset(page_id);
        let mut file = self
            .file
            .try_clone()
            .map_err(|e| NativeBackendError::IoError {
                context: "Failed to clone file handle for page read".to_string(),
                source: e,
            })?;
file.seek(SeekFrom::Start(offset))
.map_err(|e| NativeBackendError::IoError {
context: format!("Failed to seek to page {} offset {}", page_id, offset),
source: e,
})?;
        let mut buffer = vec![0u8; self.page_size];
        // read_exact retries on partial reads and reports a truncated page as
        // UnexpectedEof, unlike a bare `read`, which may legally return short.
        file.read_exact(&mut buffer)
            .map_err(|e| NativeBackendError::IoError {
                context: format!(
                    "Failed to read {} bytes for page {} at offset {}",
                    self.page_size, page_id, offset
                ),
                source: e,
            })?;
Ok(buffer)
}
pub fn page_offset(page_id: u64) -> u64 {
if page_id == 0 {
return 0;
}
let data_page_index = page_id.saturating_sub(1);
V3_HEADER_SIZE + data_page_index * DEFAULT_PAGE_SIZE
}
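    /// Verifies a page's stored checksum (a big-endian u32 at
    /// `constants::CHECKSUM_OFFSET`) against the value recomputed by
    /// `checksum::xor_checksum`. This assumes `xor_checksum` skips or zeroes
    /// the checksum field itself when recomputing; otherwise the stored and
    /// calculated values could never match.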
pub fn validate_page_checksum(&self, page_bytes: &[u8]) -> NativeResult<()> {
use crate::backend::native::v3::node::page::constants;
if page_bytes.len() < constants::PAGE_HEADER_SIZE {
return Err(NativeBackendError::InvalidHeader {
field: "page_checksum".to_string(),
reason: format!(
"Insufficient bytes for checksum: need at least {}, got {}",
constants::PAGE_HEADER_SIZE,
page_bytes.len()
),
});
}
let checksum_offset = constants::CHECKSUM_OFFSET;
let stored_checksum = u32::from_be_bytes(
page_bytes[checksum_offset..checksum_offset + 4]
.try_into()
.map_err(|_| NativeBackendError::InvalidHeader {
field: "checksum".to_string(),
reason: "Failed to read checksum bytes".to_string(),
})?,
);
let calculated_checksum =
crate::backend::native::v3::constants::checksum::xor_checksum(page_bytes) as u32;
if calculated_checksum != stored_checksum {
return Err(NativeBackendError::InvalidChecksum {
expected: stored_checksum as u64,
found: calculated_checksum as u64,
});
}
Ok(())
}
pub fn page_size(&self) -> usize {
self.page_size
}
pub fn header_size(&self) -> u64 {
self.header_size
}
pub fn load_pages<'a, I>(&self, page_ids: I) -> Vec<NativeResult<NodePage>>
where
I: IntoIterator<Item = &'a u64>,
{
page_ids
.into_iter()
.map(|&page_id| self.load_page(page_id))
.collect()
}
}
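/// Builder for [`TraversalCache`] that validates capacity and returns an
/// error instead of panicking like `TraversalCache::new` does.
///
/// ```ignore
/// let cache = TraversalCacheBuilder::new().with_capacity(32).build()?;
/// assert_eq!(cache.capacity(), 32);
/// ```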
pub struct TraversalCacheBuilder {
capacity: Option<usize>,
}
impl TraversalCacheBuilder {
pub fn new() -> Self {
Self { capacity: None }
}
pub fn with_capacity(mut self, capacity: usize) -> Self {
self.capacity = Some(capacity);
self
}
pub fn build(self) -> NativeResult<TraversalCache> {
let capacity = self.capacity.unwrap_or(DEFAULT_CACHE_CAPACITY);
if !(MIN_CACHE_CAPACITY..=MAX_CACHE_CAPACITY).contains(&capacity) {
            return Err(NativeBackendError::InvalidParameter {
                context: format!(
                    "TraversalCache capacity {} outside {}..={}",
                    capacity, MIN_CACHE_CAPACITY, MAX_CACHE_CAPACITY
                ),
                source: None,
            });
}
Ok(TraversalCache::new(capacity))
}
}
impl Default for TraversalCacheBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_page(page_id: u64) -> Arc<NodePage> {
Arc::new(NodePage::new(page_id))
}
#[test]
fn test_cache_creation() {
let cache = TraversalCache::new(16);
assert_eq!(cache.capacity(), 16);
}
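    // Sketch of the LRU semantics implemented by `TraversalCache::insert`/`get`:
    // the least recently used entry is evicted first, and stats track hits/misses.
    #[test]
    fn test_cache_lru_eviction_and_stats() {
        let mut cache = TraversalCache::new(2);
        cache.insert(1, test_page(1));
        cache.insert(2, test_page(2));
        assert!(cache.get(1).is_some()); // hit: 1 becomes most recently used
        cache.insert(3, test_page(3)); // evicts 2, the least recently used
        assert!(!cache.contains(&2));
        assert!(cache.contains(&1));
        assert!(cache.get(2).is_none()); // miss
        assert_eq!(cache.hits(), 1);
        assert_eq!(cache.misses(), 1);
        assert!((cache.hit_ratio() - 0.5).abs() < f64::EPSILON);
    }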
#[test]
fn test_node_store_new() {
let header = PersistentHeaderV3::new_v3();
let db_path = PathBuf::from("/tmp/test.db");
let store = NodeStore::new(&header, db_path);
assert_eq!(store.root_page_id_pub(), 0);
}
#[test]
fn test_page_offset_calculation() {
assert_eq!(NodeStore::page_offset(1), V3_HEADER_SIZE);
assert_eq!(
NodeStore::page_offset(2),
V3_HEADER_SIZE + DEFAULT_PAGE_SIZE
);
}
#[test]
fn test_constants() {
assert_eq!(MAX_TREE_HEIGHT, 10);
assert_eq!(PAGE_CACHE_SIZE, 64);
}
#[test]
fn test_page_loader_creation() {
let _header = PersistentHeaderV3::new_v3();
let db_path = PathBuf::from("/tmp/test.db");
let _ = std::fs::File::create(&db_path).unwrap();
let file = Arc::new(File::open(&db_path).unwrap());
let page_size = 4096;
let loader = PageLoader::new(file.clone(), page_size);
assert_eq!(loader.page_size(), 4096);
assert_eq!(loader.header_size(), V3_HEADER_SIZE);
let loader_default = PageLoader::with_default_page_size(file);
assert_eq!(loader_default.page_size(), 4096);
}
#[test]
fn test_page_loader_offset_calculation() {
assert_eq!(PageLoader::page_offset(1), V3_HEADER_SIZE);
assert_eq!(
PageLoader::page_offset(2),
V3_HEADER_SIZE + DEFAULT_PAGE_SIZE
);
assert_eq!(PageLoader::page_offset(0), 0);
}
#[test]
fn test_traversal_cache_builder() {
let builder = TraversalCacheBuilder::new();
assert!(builder.capacity.is_none());
let cache = builder.with_capacity(32).build().unwrap();
assert_eq!(cache.capacity(), 32);
}
#[test]
fn test_traversal_cache_builder_invalid_capacity() {
let builder = TraversalCacheBuilder::new();
let result = builder.with_capacity(MAX_CACHE_CAPACITY + 1).build();
assert!(result.is_err());
}
#[test]
fn test_traversal_cache_builder_default() {
let cache = TraversalCacheBuilder::default().build().unwrap();
assert_eq!(cache.capacity(), DEFAULT_CACHE_CAPACITY);
}
}