use super::*;
use crate::error::Result;
use crate::memory::SecureMemoryPool;
use std::sync::atomic::{AtomicU64, AtomicU32, AtomicU16, AtomicU8, Ordering};
use std::sync::{Arc, RwLock, Mutex};
use std::ptr::NonNull;
/// Per-page cache metadata node, aligned to a 64-byte cache line to avoid
/// false sharing between neighbouring nodes in the node array.
/// All fields are atomics; link-field stores use `Relaxed` ordering, so
/// NOTE(review): list/hash surgery appears to rely on external
/// synchronization — confirm at the call sites.
#[repr(align(64))] #[derive(Debug)]
pub struct CacheNode {
/// Packed key: `(file_id << 32) | page_id`; `u64::MAX` marks a free node.
pub fi_offset: AtomicU64,
/// Backward link (node index) — presumably the per-file page chain; TODO confirm.
pub fi_prev: AtomicU32,
/// Forward link (node index) — presumably the per-file page chain; TODO confirm.
pub fi_next: AtomicU32,
/// Backward link in the LRU list maintained by `LruList` (INVALID_NODE = none).
pub lru_prev: AtomicU32,
/// Forward link in the LRU list maintained by `LruList` (INVALID_NODE = none).
pub lru_next: AtomicU32,
/// Pin count: number of active references holding this page resident.
pub ref_count: AtomicU16,
/// 0 = page bytes not yet read in; non-zero once `mark_loaded` runs.
pub is_loaded: AtomicU8,
/// Next node index in the hash bucket chain used by `HashTable`.
pub hash_link: AtomicU32,
/// Raw page buffer pointer stored as an integer (set by `initialize`).
pub page_data: AtomicU64,
/// Last-access timestamp, nanoseconds since UNIX epoch.
pub last_access: AtomicU64,
/// Total number of accesses recorded via `update_last_access`.
pub access_count: AtomicU32,
// Manual padding toward the 64-byte line; exact size not asserted here.
_padding: [u8; 8],
}
impl CacheNode {
    /// Creates an empty node: invalid key, unlinked, unpinned, no page buffer.
    pub fn new() -> Self {
        Self {
            fi_offset: AtomicU64::new(u64::MAX),
            fi_prev: AtomicU32::new(INVALID_NODE),
            fi_next: AtomicU32::new(INVALID_NODE),
            lru_prev: AtomicU32::new(INVALID_NODE),
            lru_next: AtomicU32::new(INVALID_NODE),
            ref_count: AtomicU16::new(0),
            is_loaded: AtomicU8::new(0),
            hash_link: AtomicU32::new(INVALID_NODE),
            page_data: AtomicU64::new(0),
            last_access: AtomicU64::new(0),
            access_count: AtomicU32::new(0),
            _padding: [0; 8],
        }
    }

    /// Binds this node to `(file_id, page_id)` with its page buffer at
    /// `page_ptr`. The page is marked *not loaded*; callers flip the flag
    /// with [`mark_loaded`](Self::mark_loaded) after reading the bytes in.
    pub fn initialize(&self, file_id: FileId, page_id: PageId, page_ptr: *mut u8) {
        // Pack both ids into one word so lookups compare a single u64.
        let fi_offset = ((file_id as u64) << 32) | (page_id as u64);
        self.fi_offset.store(fi_offset, Ordering::Relaxed);
        self.page_data.store(page_ptr as u64, Ordering::Relaxed);
        self.is_loaded.store(0, Ordering::Relaxed);
        self.ref_count.store(0, Ordering::Relaxed);
        self.access_count.store(0, Ordering::Relaxed);
        self.update_last_access();
    }

    /// Returns the node to its pristine (free) state, clearing key, links,
    /// counters, and the page pointer.
    pub fn reset(&self) {
        self.fi_offset.store(u64::MAX, Ordering::Relaxed);
        self.fi_prev.store(INVALID_NODE, Ordering::Relaxed);
        self.fi_next.store(INVALID_NODE, Ordering::Relaxed);
        self.lru_prev.store(INVALID_NODE, Ordering::Relaxed);
        self.lru_next.store(INVALID_NODE, Ordering::Relaxed);
        self.ref_count.store(0, Ordering::Relaxed);
        self.is_loaded.store(0, Ordering::Relaxed);
        self.hash_link.store(INVALID_NODE, Ordering::Relaxed);
        self.page_data.store(0, Ordering::Relaxed);
        self.last_access.store(0, Ordering::Relaxed);
        self.access_count.store(0, Ordering::Relaxed);
    }

    /// File id half of the packed key (upper 32 bits).
    pub fn file_id(&self) -> FileId {
        let fi_offset = self.fi_offset.load(Ordering::Relaxed);
        (fi_offset >> 32) as FileId
    }

    /// Page id half of the packed key (lower 32 bits).
    pub fn page_id(&self) -> PageId {
        let fi_offset = self.fi_offset.load(Ordering::Relaxed);
        fi_offset as PageId
    }

    /// The raw packed `(file_id, page_id)` key.
    pub fn fi_offset_key(&self) -> u64 {
        self.fi_offset.load(Ordering::Relaxed)
    }

    /// True if this node currently caches exactly `(file_id, page_id)`.
    pub fn matches(&self, file_id: FileId, page_id: PageId) -> bool {
        let expected = ((file_id as u64) << 32) | (page_id as u64);
        self.fi_offset.load(Ordering::Relaxed) == expected
    }

    /// Raw pointer to the page buffer (null if none was assigned).
    pub fn page_data_ptr(&self) -> *mut u8 {
        self.page_data.load(Ordering::Relaxed) as *mut u8
    }

    /// True once the page bytes are resident in memory.
    pub fn is_page_loaded(&self) -> bool {
        // BUGFIX: must be Acquire to pair with the Release store in
        // `mark_loaded`; a Relaxed load could observe the flag set without
        // the page bytes written before it being visible.
        self.is_loaded.load(Ordering::Acquire) != 0
    }

    /// Publishes the page as loaded. Release ordering makes the page-buffer
    /// writes that preceded this call visible to any Acquire reader of the
    /// flag (see `is_page_loaded`).
    pub fn mark_loaded(&self) {
        self.is_loaded.store(1, Ordering::Release);
    }

    /// Pins the page; returns the reference count *before* the increment.
    pub fn inc_ref(&self) -> u16 {
        // Acquire pairs with the Release in `dec_ref`, so effects of the
        // previous holder are visible to the new one (standard refcount
        // protocol).
        self.ref_count.fetch_add(1, Ordering::Acquire)
    }

    /// Unpins the page; returns the reference count *after* the decrement.
    /// Must not be called on a node whose count is already zero.
    pub fn dec_ref(&self) -> u16 {
        let prev = self.ref_count.fetch_sub(1, Ordering::Release);
        // BUGFIX: the original `fetch_sub(..) - 1` panicked in debug builds
        // on underflow and silently wrapped in release; surface misuse with
        // an explicit debug assertion instead.
        debug_assert!(prev > 0, "dec_ref called on unreferenced CacheNode");
        prev.wrapping_sub(1)
    }

    /// Current pin count.
    pub fn ref_count(&self) -> u16 {
        self.ref_count.load(Ordering::Relaxed)
    }

    /// Stamps the node with "now" (ns since UNIX epoch; 0 if the clock is
    /// before the epoch) and bumps the access counter.
    pub fn update_last_access(&self) {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos() as u64;
        self.last_access.store(now, Ordering::Relaxed);
        self.access_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Last access timestamp (ns since UNIX epoch).
    pub fn last_access_time(&self) -> u64 {
        self.last_access.load(Ordering::Relaxed)
    }

    /// Total accesses recorded since the last `initialize`/`reset`.
    pub fn access_frequency(&self) -> u32 {
        self.access_count.load(Ordering::Relaxed)
    }
}
impl Default for CacheNode {
fn default() -> Self {
Self::new()
}
}
/// Intrusive doubly-linked LRU order over the shared node array.
/// Head is the most-recently-used node, tail the least; links live in each
/// `CacheNode`'s `lru_prev`/`lru_next` fields and hold node *indices*, with
/// `INVALID_NODE` meaning "none".
/// NOTE(review): all operations use Relaxed atomics, implying the list is
/// mutated under an external lock — confirm at the call sites.
pub struct LruList {
// Index of the MRU node, or INVALID_NODE when empty.
head: AtomicU32,
// Index of the LRU node (eviction candidate), or INVALID_NODE when empty.
tail: AtomicU32,
// Number of nodes currently linked.
count: AtomicU32,
}
impl LruList {
    /// Returns an empty list: no head, no tail, zero nodes.
    pub fn new() -> Self {
        Self {
            head: AtomicU32::new(INVALID_NODE),
            tail: AtomicU32::new(INVALID_NODE),
            count: AtomicU32::new(0),
        }
    }

    /// Links `node_idx` in at the MRU (head) end of the list.
    pub fn insert_head(&self, nodes: &[CacheNode], node_idx: NodeIndex) {
        let incoming = &nodes[node_idx as usize];
        let former_head = self.head.load(Ordering::Relaxed);
        // The new node points forward at the old head and backward at nothing.
        incoming.lru_next.store(former_head, Ordering::Relaxed);
        incoming.lru_prev.store(INVALID_NODE, Ordering::Relaxed);
        if former_head == INVALID_NODE {
            // List was empty: the new node is also the tail.
            self.tail.store(node_idx, Ordering::Relaxed);
        } else {
            nodes[former_head as usize].lru_prev.store(node_idx, Ordering::Relaxed);
        }
        self.head.store(node_idx, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Unlinks `node_idx`: splices its neighbours together (updating head or
    /// tail at the ends) and clears the node's own link fields.
    pub fn remove(&self, nodes: &[CacheNode], node_idx: NodeIndex) {
        let victim = &nodes[node_idx as usize];
        let before = victim.lru_prev.load(Ordering::Relaxed);
        let after = victim.lru_next.load(Ordering::Relaxed);
        if before == INVALID_NODE {
            self.head.store(after, Ordering::Relaxed);
        } else {
            nodes[before as usize].lru_next.store(after, Ordering::Relaxed);
        }
        if after == INVALID_NODE {
            self.tail.store(before, Ordering::Relaxed);
        } else {
            nodes[after as usize].lru_prev.store(before, Ordering::Relaxed);
        }
        victim.lru_prev.store(INVALID_NODE, Ordering::Relaxed);
        victim.lru_next.store(INVALID_NODE, Ordering::Relaxed);
        self.count.fetch_sub(1, Ordering::Relaxed);
    }

    /// Promotes `node_idx` to MRU by unlinking and re-inserting at the head.
    pub fn move_to_head(&self, nodes: &[CacheNode], node_idx: NodeIndex) {
        self.remove(nodes, node_idx);
        self.insert_head(nodes, node_idx);
    }

    /// Index of the least-recently-used node (tail), or INVALID_NODE.
    pub fn get_lru_node(&self) -> NodeIndex {
        self.tail.load(Ordering::Relaxed)
    }

    /// Index of the most-recently-used node (head), or INVALID_NODE.
    pub fn get_mru_node(&self) -> NodeIndex {
        self.head.load(Ordering::Relaxed)
    }

    /// Number of nodes currently linked into the list.
    pub fn count(&self) -> u32 {
        self.count.load(Ordering::Relaxed)
    }

    /// True when no nodes are linked.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.count() == 0
    }
}
impl Default for LruList {
fn default() -> Self {
Self::new()
}
}
/// Hash index mapping `(file_id, page_id)` to a node index. Collisions are
/// handled by intrusive chaining: each bucket holds the head node index of a
/// chain threaded through `CacheNode::hash_link`.
pub struct HashTable {
// One chain head per bucket; INVALID_NODE marks an empty bucket.
buckets: Vec<AtomicU32>,
// Bucket count; invariant: a power of two (asserted in `new`).
size: usize,
// size - 1, so `hash & mask` replaces the modulo.
mask: u64,
// Inserts that landed in an already-occupied bucket (metric only).
collisions: AtomicU64,
// Longest chain walk observed by `find` (metric only).
max_probe_distance: AtomicU32,
}
impl HashTable {
    /// Builds a table with `size` empty buckets.
    ///
    /// # Panics
    /// Panics unless `size` is a power of two: lookups use `hash & mask`
    /// instead of `%`, which is only correct for power-of-two sizes.
    pub fn new(size: usize) -> Self {
        assert!(size.is_power_of_two(), "Hash table size must be power of 2");
        let mut buckets = Vec::with_capacity(size);
        buckets.resize_with(size, || AtomicU32::new(INVALID_NODE));
        Self {
            buckets,
            size,
            mask: (size - 1) as u64,
            collisions: AtomicU64::new(0),
            max_probe_distance: AtomicU32::new(0),
        }
    }

    /// Bucket index for a `(file, page)` pair.
    #[inline]
    pub fn hash_index(&self, file_id: FileId, page_id: PageId) -> usize {
        let hash = hash_file_page(file_id, page_id);
        (hash & self.mask) as usize
    }

    /// Looks up the node caching `(file_id, page_id)` by walking the
    /// bucket's intrusive chain; returns `None` when absent.
    pub fn find(&self, nodes: &[CacheNode], file_id: FileId, page_id: PageId) -> Option<NodeIndex> {
        let bucket_idx = self.hash_index(file_id, page_id);
        let mut current = self.buckets[bucket_idx].load(Ordering::Relaxed);
        let mut probe_distance: u32 = 0;
        let mut found = None;
        while current != INVALID_NODE {
            // BUGFIX: the original bounds-checked `current` only for the
            // prefetch, then indexed `nodes[current]` unconditionally — a
            // corrupted hash link would panic. Use checked indexing and
            // treat an out-of-range link as end of chain.
            let node = match nodes.get(current as usize) {
                Some(n) => n,
                None => break,
            };
            prefetch_hint(node as *const _ as *const u8);
            if node.matches(file_id, page_id) {
                found = Some(current);
                break;
            }
            current = node.hash_link.load(Ordering::Relaxed);
            probe_distance += 1;
        }
        // Record the longest chain walked. `fetch_max` replaces the original
        // per-hop compare-exchange: a single RMW with no lost updates.
        self.max_probe_distance.fetch_max(probe_distance, Ordering::Relaxed);
        found
    }

    /// Pushes `node_idx` onto the head of its bucket's chain. The caller
    /// guarantees the node's key already encodes `(file_id, page_id)`.
    pub fn insert(&self, nodes: &[CacheNode], node_idx: NodeIndex, file_id: FileId, page_id: PageId) {
        let bucket_idx = self.hash_index(file_id, page_id);
        let old_head = self.buckets[bucket_idx].load(Ordering::Relaxed);
        nodes[node_idx as usize].hash_link.store(old_head, Ordering::Relaxed);
        self.buckets[bucket_idx].store(node_idx, Ordering::Relaxed);
        // A non-empty bucket means this insert chains behind another entry.
        if old_head != INVALID_NODE {
            self.collisions.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Unlinks `node_idx` from its bucket's chain (silently a no-op when the
    /// node is not in the chain); always clears the node's own hash link.
    pub fn remove(&self, nodes: &[CacheNode], node_idx: NodeIndex, file_id: FileId, page_id: PageId) {
        let bucket_idx = self.hash_index(file_id, page_id);
        let current = self.buckets[bucket_idx].load(Ordering::Relaxed);
        if current == node_idx {
            // Node is the chain head: point the bucket at its successor.
            let next = nodes[node_idx as usize].hash_link.load(Ordering::Relaxed);
            self.buckets[bucket_idx].store(next, Ordering::Relaxed);
        } else {
            // Walk to the predecessor whose link is `node_idx` and splice.
            let mut prev = current;
            while prev != INVALID_NODE {
                let next = nodes[prev as usize].hash_link.load(Ordering::Relaxed);
                if next == node_idx {
                    let next_next = nodes[node_idx as usize].hash_link.load(Ordering::Relaxed);
                    nodes[prev as usize].hash_link.store(next_next, Ordering::Relaxed);
                    break;
                }
                prev = next;
            }
        }
        nodes[node_idx as usize].hash_link.store(INVALID_NODE, Ordering::Relaxed);
    }

    /// Total inserts that landed in an already-occupied bucket.
    pub fn collision_count(&self) -> u64 {
        self.collisions.load(Ordering::Relaxed)
    }

    /// Longest chain walk observed by `find` so far.
    pub fn max_probe_distance(&self) -> u32 {
        self.max_probe_distance.load(Ordering::Relaxed)
    }

    /// Ratio of cached nodes to buckets.
    pub fn load_factor(&self, total_nodes: usize) -> f64 {
        total_nodes as f64 / self.size as f64
    }

    /// Number of buckets.
    #[inline]
    pub fn size(&self) -> usize {
        self.size
    }
}
/// Per-open-file bookkeeping for the cache.
#[derive(Debug)]
pub struct FileInfo {
/// OS file descriptor; negative means the file failed to open.
pub fd: i32,
/// Head node index — presumably the start of this file's cached-page
/// chain (via `CacheNode::fi_next`); TODO confirm against the cache core.
pub head_page: AtomicU32,
/// File size in bytes — assumed; not read or written in this chunk.
pub file_size: AtomicU64,
/// Lifecycle flag: 0 = open, non-zero = closed (see `mark_closed`).
pub status: AtomicU32,
/// Number of this file's pages currently resident in the cache.
pub cached_pages: AtomicU32,
}
impl FileInfo {
    /// Creates tracking state for the file behind descriptor `fd`,
    /// with no cached pages and an "open" status.
    pub fn new(fd: i32) -> Self {
        Self {
            fd,
            head_page: AtomicU32::new(INVALID_NODE),
            file_size: AtomicU64::new(0),
            status: AtomicU32::new(0),
            cached_pages: AtomicU32::new(0),
        }
    }

    /// A non-negative descriptor means the file was opened successfully.
    pub fn is_open(&self) -> bool {
        !self.fd.is_negative()
    }

    /// Flags the file as closed (sets the status word to 1).
    pub fn mark_closed(&self) {
        self.status.store(1, Ordering::Relaxed);
    }

    /// True once `mark_closed` has been called (any non-zero status).
    pub fn is_closed(&self) -> bool {
        self.status.load(Ordering::Relaxed) != 0
    }
}