use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
/// Identifier of a tree node: the type of `NodeLogEntry::InsertChild::child_id`
/// and the element type of the dirty-node set.
pub type NodeId = u64;
/// Identifier of a page. Not referenced elsewhere in this chunk — presumably
/// addresses a storage page (e.g. overflow log space); confirm at call sites.
pub type PageId = u64;
/// Tunables for the per-node logging subsystem.
///
/// Only the default values are established in this file. The code that reads
/// each knob lives elsewhere, so per-field notes that go beyond the default
/// are marked as assumptions to confirm at the call sites.
#[derive(Debug, Clone)]
pub struct PerNodeLogConfig {
    /// Inline-log byte budget (default 64). Presumably the capacity handed to
    /// `InlineLog::new` — confirm.
    pub max_inline_log_size: usize,
    /// Overall per-node log size limit (default 4096) — TODO confirm units/scope.
    pub max_log_size: usize,
    /// Compaction trigger (default 1.0). The quantity it is compared against is
    /// not visible here — confirm.
    pub compaction_threshold: f64,
    /// Per-node entry limit (default 100) — presumably forces compaction; confirm.
    pub max_log_entries: usize,
    /// Background-compaction toggle (default `true`).
    pub background_compaction: bool,
    /// Compaction batch size (default 16) — confirm the unit being batched.
    pub compaction_batch_size: usize,
    /// Parallel-recovery toggle (default `true`).
    pub parallel_recovery: bool,
    /// Recovery worker count. Defaults to the available parallelism capped at
    /// 16, or 4 when that cannot be determined.
    pub recovery_threads: usize,
    /// Dirty-node tracking toggle (default `true`).
    pub track_dirty_nodes: bool,
    /// Master switch for per-node logging (default `false`, i.e. off).
    pub enabled: bool,
}

impl Default for PerNodeLogConfig {
    /// Builds the stock configuration: logging disabled, every optional feature
    /// toggled on, and recovery sized to the host.
    fn default() -> Self {
        // One recovery worker per available core, never more than 16. Fall back
        // to 4 when the platform cannot report its parallelism.
        let recovery_threads = match std::thread::available_parallelism() {
            Ok(cores) => cores.get().min(16),
            Err(_) => 4,
        };

        PerNodeLogConfig {
            enabled: false,
            max_inline_log_size: 64,
            max_log_size: 4096,
            max_log_entries: 100,
            compaction_threshold: 1.0,
            compaction_batch_size: 16,
            background_compaction: true,
            parallel_recovery: true,
            recovery_threads,
            track_dirty_nodes: true,
        }
    }
}
/// A single mutation recorded in a node's log.
///
/// Wire encoding (multi-byte integers little-endian), produced by
/// [`NodeLogEntry::serialize`] and parsed by [`NodeLogEntry::deserialize`]:
///
/// | variant       | layout                               | size      |
/// |---------------|--------------------------------------|-----------|
/// | `InsertChild` | tag, key, child_id (u64)             | 10        |
/// | `RemoveChild` | tag, key                             | 2         |
/// | `SetValue`    | tag, len (u16), value bytes          | 3 + len   |
/// | `ClearValue`  | tag                                  | 1         |
/// | `SetPrefix`   | tag, len (u8), prefix bytes          | 2 + len   |
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeLogEntry {
    /// Child-insert record: partial-key byte `key` and the child's id.
    InsertChild {
        key: u8,
        child_id: NodeId,
    },
    /// Child-remove record for partial-key byte `key`.
    RemoveChild {
        key: u8,
    },
    /// Value-set record; at most [`NodeLogEntry::MAX_VALUE_LEN`] bytes are encodable.
    SetValue {
        value: Vec<u8>,
    },
    /// Value-clear record (no payload).
    ClearValue,
    /// Prefix-set record; at most [`NodeLogEntry::MAX_PREFIX_LEN`] bytes are encodable.
    SetPrefix {
        prefix: Vec<u8>,
    },
}

/// One-byte tags that open every serialized `NodeLogEntry`.
mod log_entry_type {
    pub const INSERT_CHILD: u8 = 0x01;
    pub const REMOVE_CHILD: u8 = 0x02;
    pub const SET_VALUE: u8 = 0x03;
    pub const CLEAR_VALUE: u8 = 0x04;
    pub const SET_PREFIX: u8 = 0x05;
}

impl NodeLogEntry {
    /// Largest `SetValue` payload its 16-bit length prefix can describe.
    pub const MAX_VALUE_LEN: usize = u16::MAX as usize;
    /// Largest `SetPrefix` payload its 8-bit length prefix can describe.
    pub const MAX_PREFIX_LEN: usize = u8::MAX as usize;

    /// Encodes the entry into its wire format (see the type-level table).
    ///
    /// # Panics
    ///
    /// Panics if a `SetValue` payload exceeds [`Self::MAX_VALUE_LEN`] bytes or a
    /// `SetPrefix` payload exceeds [`Self::MAX_PREFIX_LEN`] bytes. Such an entry
    /// cannot be encoded faithfully: a truncated length prefix would make it,
    /// and every entry after it, decode as garbage. Use
    /// [`Self::try_serialize`] to handle oversized payloads without panicking.
    pub fn serialize(&self) -> Vec<u8> {
        self.try_serialize()
            .expect("NodeLogEntry payload too long for its length prefix")
    }

    /// Non-panicking form of [`Self::serialize`].
    ///
    /// Returns `None` when a `SetValue`/`SetPrefix` payload is longer than its
    /// length prefix can represent; otherwise returns the encoded bytes, whose
    /// length equals [`Self::serialized_size`].
    pub fn try_serialize(&self) -> Option<Vec<u8>> {
        // Reject unrepresentable payload lengths before allocating anything.
        match self {
            Self::SetValue { value } if value.len() > Self::MAX_VALUE_LEN => return None,
            Self::SetPrefix { prefix } if prefix.len() > Self::MAX_PREFIX_LEN => return None,
            _ => {}
        }

        let mut buf = Vec::with_capacity(self.serialized_size());
        match self {
            Self::InsertChild { key, child_id } => {
                buf.push(log_entry_type::INSERT_CHILD);
                buf.push(*key);
                buf.extend_from_slice(&child_id.to_le_bytes());
            }
            Self::RemoveChild { key } => {
                buf.push(log_entry_type::REMOVE_CHILD);
                buf.push(*key);
            }
            Self::SetValue { value } => {
                buf.push(log_entry_type::SET_VALUE);
                // Lossless: length was checked against MAX_VALUE_LEN above.
                buf.extend_from_slice(&(value.len() as u16).to_le_bytes());
                buf.extend_from_slice(value);
            }
            Self::ClearValue => {
                buf.push(log_entry_type::CLEAR_VALUE);
            }
            Self::SetPrefix { prefix } => {
                buf.push(log_entry_type::SET_PREFIX);
                // Lossless: length was checked against MAX_PREFIX_LEN above.
                buf.push(prefix.len() as u8);
                buf.extend_from_slice(prefix);
            }
        }
        Some(buf)
    }

    /// Decodes one entry from the front of `data`.
    ///
    /// Returns the entry and the number of bytes it occupied. Bytes after the
    /// entry are ignored. Returns `None` if `data` is empty, starts with an
    /// unknown tag, or is too short for the encoded entry.
    pub fn deserialize(data: &[u8]) -> Option<(Self, usize)> {
        if data.is_empty() {
            return None;
        }
        match data[0] {
            log_entry_type::INSERT_CHILD => {
                if data.len() < 10 {
                    return None;
                }
                let key = data[1];
                let child_id = u64::from_le_bytes(data[2..10].try_into().ok()?);
                Some((Self::InsertChild { key, child_id }, 10))
            }
            log_entry_type::REMOVE_CHILD => {
                if data.len() < 2 {
                    return None;
                }
                let key = data[1];
                Some((Self::RemoveChild { key }, 2))
            }
            log_entry_type::SET_VALUE => {
                if data.len() < 3 {
                    return None;
                }
                let len = u16::from_le_bytes(data[1..3].try_into().ok()?) as usize;
                if data.len() < 3 + len {
                    return None;
                }
                let value = data[3..3 + len].to_vec();
                Some((Self::SetValue { value }, 3 + len))
            }
            log_entry_type::CLEAR_VALUE => Some((Self::ClearValue, 1)),
            log_entry_type::SET_PREFIX => {
                if data.len() < 2 {
                    return None;
                }
                let len = data[1] as usize;
                if data.len() < 2 + len {
                    return None;
                }
                let prefix = data[2..2 + len].to_vec();
                Some((Self::SetPrefix { prefix }, 2 + len))
            }
            _ => None,
        }
    }

    /// Number of bytes [`Self::serialize`] produces for this entry, computed
    /// without encoding it.
    pub fn serialized_size(&self) -> usize {
        match self {
            Self::InsertChild { .. } => 10,
            Self::RemoveChild { .. } => 2,
            Self::SetValue { value } => 3 + value.len(),
            Self::ClearValue => 1,
            Self::SetPrefix { prefix } => 2 + prefix.len(),
        }
    }
}
/// Append-only byte buffer holding a node's serialized `NodeLogEntry` records
/// back to back.
///
/// `data` may be longer than `len`; only `data[..len]` holds live entries.
#[derive(Debug, Clone)]
pub struct InlineLog {
    /// Backing storage. Bytes past `len` are scratch (zeroed, or stale after `clear`).
    data: Vec<u8>,
    /// Logical byte limit enforced by `try_append`.
    capacity: usize,
    /// Number of live bytes at the front of `data`.
    len: usize,
    /// Entries appended since construction/`clear`, or the count given to `from_data`.
    entry_count: usize,
}

impl InlineLog {
    /// Creates an empty log that accepts up to `capacity` bytes of entries.
    /// The backing buffer is allocated (zero-filled) up front.
    pub fn new(capacity: usize) -> Self {
        Self {
            data: vec![0u8; capacity],
            capacity,
            len: 0,
            entry_count: 0,
        }
    }

    /// Wraps already-serialized entry bytes, e.g. an inline log read back from storage.
    ///
    /// `entry_count` is trusted as given; it is not recomputed from `data`.
    ///
    /// The logical capacity is `data.capacity()`. The remaining space therefore
    /// depends on how the caller allocated `data`, and `Vec` may allocate more
    /// than was requested. NOTE(review): callers that need a strict bound (e.g.
    /// `max_inline_log_size`) should size `data` with that in mind — confirm.
    pub fn from_data(data: Vec<u8>, entry_count: usize) -> Self {
        let len = data.len();
        // `Vec::capacity()` is always >= `len`, so no extra clamp is needed.
        let capacity = data.capacity();
        Self {
            data,
            capacity,
            len,
            entry_count,
        }
    }

    /// Bytes still available before `capacity` is reached.
    pub fn available_space(&self) -> usize {
        self.capacity.saturating_sub(self.len)
    }

    /// Bytes occupied by live entries.
    pub fn used_space(&self) -> usize {
        self.len
    }

    /// Number of entries currently recorded.
    pub fn entry_count(&self) -> usize {
        self.entry_count
    }

    /// `true` when no entry bytes are stored.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Appends `entry` if its encoding fits in the remaining space.
    ///
    /// Returns `false` and leaves the log untouched when it does not fit.
    pub fn try_append(&mut self, entry: &NodeLogEntry) -> bool {
        // Size check first: a rejected append should neither allocate nor encode.
        if entry.serialized_size() > self.available_space() {
            return false;
        }
        let serialized = entry.serialize();
        let start = self.len;
        let end = start + serialized.len();
        // Only logs built via `from_data` can have `data` shorter than `capacity`.
        if end > self.data.len() {
            self.data.resize(end.max(self.capacity), 0);
        }
        self.data[start..end].copy_from_slice(&serialized);
        self.len = end;
        self.entry_count += 1;
        true
    }

    /// The live entry bytes, in append order.
    pub fn as_slice(&self) -> &[u8] {
        &self.data[..self.len]
    }

    /// Forgets all entries. The buffer is kept for reuse and is not zeroed.
    pub fn clear(&mut self) {
        self.len = 0;
        self.entry_count = 0;
    }

    /// Iterates over the decoded entries in append order.
    pub fn iter(&self) -> InlineLogIter<'_> {
        InlineLogIter {
            data: self.as_slice(),
            offset: 0,
        }
    }
}
/// Borrowing iterator over the entries stored in an `InlineLog`.
///
/// Decoding stops at the first bytes `NodeLogEntry::deserialize` rejects
/// (unknown tag or truncated entry). Nothing after that point is visited, and
/// later calls keep returning `None`.
pub struct InlineLogIter<'a> {
    /// Live bytes of the log being walked.
    data: &'a [u8],
    /// Offset of the next undecoded byte in `data`.
    offset: usize,
}

impl Iterator for InlineLogIter<'_> {
    type Item = NodeLogEntry;

    fn next(&mut self) -> Option<NodeLogEntry> {
        // `get` yields an empty slice exactly at the end; treat that as exhaustion.
        let remaining = match self.data.get(self.offset..) {
            Some(rest) if !rest.is_empty() => rest,
            _ => return None,
        };
        let (decoded, width) = NodeLogEntry::deserialize(remaining)?;
        self.offset += width;
        Some(decoded)
    }
}
/// Plain-data snapshot of the per-node log counters.
///
/// Produced by [`PerNodeLogStatsAtomic::snapshot`]. Each field is loaded
/// independently, so a snapshot is not a consistent cut across counters.
#[derive(Debug, Clone, Default)]
pub struct PerNodeLogStats {
    /// Number of `record_entry_written` calls.
    pub entries_written: u64,
    /// Number of `record_entry_read` calls.
    pub entries_read: u64,
    /// Sum of `bytes` passed to `record_entry_written(_, false)`.
    pub inline_bytes_written: u64,
    /// Sum of `bytes` passed to `record_entry_written(_, true)`.
    pub overflow_bytes_written: u64,
    /// Current value of the dirty-node gauge.
    pub dirty_node_count: usize,
    /// Number of `record_compaction` calls.
    pub compactions: u64,
    /// Number of `record_overflow_alloc` calls.
    pub overflow_allocations: u64,
    /// Number of `record_overflow_free` calls.
    pub overflow_frees: u64,
    /// `PerNodeLogStatsAtomic::snapshot` always reports 0 here because the value
    /// is not tracked there; presumably filled in by recovery code elsewhere.
    pub recovery_time_us: u64,
    /// Always 0 from `PerNodeLogStatsAtomic::snapshot`, for the same reason as
    /// `recovery_time_us`.
    pub parallel_recoveries: u64,
}

/// Lock-free counters for the per-node log, shareable across threads (e.g. via `Arc`).
///
/// All operations use `Ordering::Relaxed`. The counters are independent and
/// only read for reporting, so no cross-field ordering is promised.
#[derive(Debug)]
pub struct PerNodeLogStatsAtomic {
    entries_written: AtomicU64,
    entries_read: AtomicU64,
    inline_bytes_written: AtomicU64,
    overflow_bytes_written: AtomicU64,
    dirty_node_count: AtomicUsize,
    compactions: AtomicU64,
    overflow_allocations: AtomicU64,
    overflow_frees: AtomicU64,
}

impl PerNodeLogStatsAtomic {
    /// Creates a set of counters, all zero.
    pub fn new() -> Self {
        Self {
            entries_written: AtomicU64::new(0),
            entries_read: AtomicU64::new(0),
            inline_bytes_written: AtomicU64::new(0),
            overflow_bytes_written: AtomicU64::new(0),
            dirty_node_count: AtomicUsize::new(0),
            compactions: AtomicU64::new(0),
            overflow_allocations: AtomicU64::new(0),
            overflow_frees: AtomicU64::new(0),
        }
    }

    /// Counts one written entry and adds `bytes` to the overflow or inline byte total.
    pub fn record_entry_written(&self, bytes: usize, is_overflow: bool) {
        self.entries_written.fetch_add(1, Ordering::Relaxed);
        let bucket = if is_overflow {
            &self.overflow_bytes_written
        } else {
            &self.inline_bytes_written
        };
        bucket.fetch_add(bytes as u64, Ordering::Relaxed);
    }

    /// Counts one read entry.
    pub fn record_entry_read(&self) {
        self.entries_read.fetch_add(1, Ordering::Relaxed);
    }

    /// Overwrites the dirty-node gauge.
    pub fn set_dirty_count(&self, count: usize) {
        self.dirty_node_count.store(count, Ordering::Relaxed);
    }

    /// Increments the dirty-node gauge.
    pub fn increment_dirty_count(&self) {
        self.dirty_node_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Decrements the dirty-node gauge, saturating at zero.
    ///
    /// `set_dirty_count` is public, and the gauge can be reset to 0 (e.g. by a
    /// tracker `clear` on shared stats) while other nodes are still being
    /// cleaned. A plain `fetch_sub` would then wrap the gauge to `usize::MAX`.
    pub fn decrement_dirty_count(&self) {
        // `Err` means the gauge was already 0; leaving it there is intentional.
        let _ = self
            .dirty_node_count
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| n.checked_sub(1));
    }

    /// Counts one compaction.
    pub fn record_compaction(&self) {
        self.compactions.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one overflow allocation.
    pub fn record_overflow_alloc(&self) {
        self.overflow_allocations.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one overflow free.
    pub fn record_overflow_free(&self) {
        self.overflow_frees.fetch_add(1, Ordering::Relaxed);
    }

    /// Copies the current counter values into a [`PerNodeLogStats`].
    ///
    /// `recovery_time_us` and `parallel_recoveries` are not tracked here and are
    /// always 0 in the result.
    pub fn snapshot(&self) -> PerNodeLogStats {
        PerNodeLogStats {
            entries_written: self.entries_written.load(Ordering::Relaxed),
            entries_read: self.entries_read.load(Ordering::Relaxed),
            inline_bytes_written: self.inline_bytes_written.load(Ordering::Relaxed),
            overflow_bytes_written: self.overflow_bytes_written.load(Ordering::Relaxed),
            dirty_node_count: self.dirty_node_count.load(Ordering::Relaxed),
            compactions: self.compactions.load(Ordering::Relaxed),
            overflow_allocations: self.overflow_allocations.load(Ordering::Relaxed),
            overflow_frees: self.overflow_frees.load(Ordering::Relaxed),
            recovery_time_us: 0,
            parallel_recoveries: 0,
        }
    }
}

impl Default for PerNodeLogStatsAtomic {
    /// Same as [`PerNodeLogStatsAtomic::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Thread-safe set of node ids that have been marked dirty, mirrored into the
/// dirty-node gauge of a shared [`PerNodeLogStatsAtomic`].
///
/// Every set mutation and its matching gauge update happen while the write
/// lock is held, so concurrent mark/clean calls apply both in the same order.
///
/// All methods panic if the internal lock has been poisoned.
#[derive(Debug)]
pub struct DirtyNodeTracker {
    /// Ids currently marked dirty.
    dirty_nodes: RwLock<HashSet<NodeId>>,
    /// Stats sink whose dirty-node gauge follows `dirty_nodes`.
    stats: Arc<PerNodeLogStatsAtomic>,
}

impl DirtyNodeTracker {
    /// Creates a tracker with no dirty nodes that reports into `stats`.
    pub fn new(stats: Arc<PerNodeLogStatsAtomic>) -> Self {
        let dirty_nodes = RwLock::new(HashSet::new());
        DirtyNodeTracker { dirty_nodes, stats }
    }

    /// Marks `node_id` dirty. The gauge is bumped only if the id was not
    /// already present.
    pub fn mark_dirty(&self, node_id: NodeId) {
        let mut set = self.dirty_nodes.write().expect("lock poisoned");
        let newly_dirty = set.insert(node_id);
        if newly_dirty {
            // Still inside the critical section: `set` is locked here.
            self.stats.increment_dirty_count();
        }
    }

    /// Marks `node_id` clean. The gauge is lowered only if the id was present.
    pub fn mark_clean(&self, node_id: NodeId) {
        let mut set = self.dirty_nodes.write().expect("lock poisoned");
        if !set.remove(&node_id) {
            return;
        }
        self.stats.decrement_dirty_count();
    }

    /// Reports whether `node_id` is currently marked dirty.
    pub fn is_dirty(&self, node_id: NodeId) -> bool {
        self.dirty_nodes
            .read()
            .expect("lock poisoned")
            .contains(&node_id)
    }

    /// Returns a copy of the dirty ids in unspecified (hash-set) order.
    pub fn get_dirty_nodes(&self) -> Vec<NodeId> {
        let set = self.dirty_nodes.read().expect("lock poisoned");
        set.iter().cloned().collect::<Vec<NodeId>>()
    }

    /// Number of ids currently marked dirty, read from the set itself.
    pub fn dirty_count(&self) -> usize {
        self.dirty_nodes.read().expect("lock poisoned").len()
    }

    /// Marks every node clean and resets the shared gauge to 0.
    pub fn clear(&self) {
        let mut set = self.dirty_nodes.write().expect("lock poisoned");
        set.clear();
        // Reset (not decrement) the gauge so it matches the now-empty set.
        self.stats.set_dirty_count(0);
    }
}
/// Per-node recovery report. Populated by recovery code outside this chunk —
/// field meanings below are read from their names; confirm at the producer.
#[derive(Clone, Debug)]
pub struct NodeRecoveryResult {
    /// Node the report is about.
    pub node_id: NodeId,
    /// Count of log entries replayed for this node.
    pub entries_replayed: usize,
    /// Elapsed time, presumably in microseconds (per the `_us` suffix).
    pub time_us: u64,
    /// Whether recovery of this node succeeded.
    pub success: bool,
    /// Failure description, if any.
    pub error: Option<String>,
}

/// Aggregate recovery report across nodes. `Default` yields an all-zero report
/// with no per-node entries.
#[derive(Clone, Debug, Default)]
pub struct RecoveryResult {
    /// Count of nodes processed.
    pub nodes_recovered: usize,
    /// Total entries replayed across all nodes.
    pub entries_replayed: usize,
    /// Total elapsed time, presumably in microseconds.
    pub total_time_us: u64,
    /// Count of nodes whose recovery failed.
    pub failures: usize,
    /// Individual per-node reports.
    pub node_results: Vec<NodeRecoveryResult>,
}
/// Unit tests for the wire format, inline log, dirty tracking and stats.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_config_defaults() {
        let config = PerNodeLogConfig::default();
        assert_eq!(config.max_inline_log_size, 64);
        assert_eq!(config.max_log_size, 4096);
        assert!((config.compaction_threshold - 1.0).abs() < f64::EPSILON);
        assert_eq!(config.max_log_entries, 100);
        assert!(config.background_compaction);
        assert_eq!(config.compaction_batch_size, 16);
        assert!(config.parallel_recovery);
        // Host parallelism capped at 16, or the fallback of 4.
        assert!((1..=16).contains(&config.recovery_threads));
        assert!(config.track_dirty_nodes);
        assert!(!config.enabled);
    }

    #[test]
    fn test_insert_child_serialization() {
        let entry = NodeLogEntry::InsertChild {
            key: 0x42,
            child_id: 12345678901234567890,
        };
        let serialized = entry.serialize();
        assert_eq!(serialized.len(), 10);
        assert_eq!(serialized[0], log_entry_type::INSERT_CHILD);
        assert_eq!(serialized[1], 0x42);
        let (deserialized, consumed) = NodeLogEntry::deserialize(&serialized).unwrap();
        assert_eq!(consumed, 10);
        assert_eq!(entry, deserialized);
    }

    #[test]
    fn test_remove_child_serialization() {
        let entry = NodeLogEntry::RemoveChild { key: 0xFF };
        let serialized = entry.serialize();
        assert_eq!(serialized.len(), 2);
        let (deserialized, consumed) = NodeLogEntry::deserialize(&serialized).unwrap();
        assert_eq!(consumed, 2);
        assert_eq!(entry, deserialized);
    }

    #[test]
    fn test_set_value_serialization() {
        let entry = NodeLogEntry::SetValue {
            value: vec![1, 2, 3, 4, 5],
        };
        let serialized = entry.serialize();
        assert_eq!(serialized.len(), 8);
        let (deserialized, consumed) = NodeLogEntry::deserialize(&serialized).unwrap();
        assert_eq!(consumed, 8);
        assert_eq!(entry, deserialized);
    }

    #[test]
    fn test_clear_value_serialization() {
        let entry = NodeLogEntry::ClearValue;
        let serialized = entry.serialize();
        assert_eq!(serialized.len(), 1);
        let (deserialized, consumed) = NodeLogEntry::deserialize(&serialized).unwrap();
        assert_eq!(consumed, 1);
        assert_eq!(entry, deserialized);
    }

    #[test]
    fn test_set_prefix_serialization() {
        let entry = NodeLogEntry::SetPrefix {
            prefix: b"hello".to_vec(),
        };
        let serialized = entry.serialize();
        assert_eq!(serialized.len(), 7);
        let (deserialized, consumed) = NodeLogEntry::deserialize(&serialized).unwrap();
        assert_eq!(consumed, 7);
        assert_eq!(entry, deserialized);
    }

    #[test]
    fn test_empty_payload_roundtrip() {
        // Zero-length payloads still carry their tag and length prefix.
        let value = NodeLogEntry::SetValue { value: Vec::new() };
        let bytes = value.serialize();
        assert_eq!(bytes, vec![log_entry_type::SET_VALUE, 0, 0]);
        assert_eq!(NodeLogEntry::deserialize(&bytes), Some((value, 3)));

        let prefix = NodeLogEntry::SetPrefix { prefix: Vec::new() };
        let bytes = prefix.serialize();
        assert_eq!(bytes, vec![log_entry_type::SET_PREFIX, 0]);
        assert_eq!(NodeLogEntry::deserialize(&bytes), Some((prefix, 2)));
    }

    #[test]
    fn test_inline_log_basic_operations() {
        let mut log = InlineLog::new(64);
        assert!(log.is_empty());
        assert_eq!(log.available_space(), 64);
        let entry1 = NodeLogEntry::InsertChild {
            key: 0x01,
            child_id: 100,
        };
        assert!(log.try_append(&entry1));
        assert_eq!(log.entry_count(), 1);
        assert_eq!(log.used_space(), 10);
        let entry2 = NodeLogEntry::RemoveChild { key: 0x02 };
        assert!(log.try_append(&entry2));
        assert_eq!(log.entry_count(), 2);
        assert_eq!(log.used_space(), 12);
    }

    #[test]
    fn test_inline_log_overflow() {
        let mut log = InlineLog::new(16);
        let entry1 = NodeLogEntry::InsertChild {
            key: 0x01,
            child_id: 100,
        };
        assert!(log.try_append(&entry1));
        let entry2 = NodeLogEntry::InsertChild {
            key: 0x02,
            child_id: 200,
        };
        // 10 + 10 bytes would exceed the 16-byte capacity.
        assert!(!log.try_append(&entry2));
        // A rejected append must leave the log untouched.
        assert_eq!(log.entry_count(), 1);
        assert_eq!(log.used_space(), 10);
        let entry3 = NodeLogEntry::RemoveChild { key: 0x03 };
        assert!(log.try_append(&entry3));
        assert_eq!(log.entry_count(), 2);
        assert_eq!(log.used_space(), 12);
        assert_eq!(log.available_space(), 4);
    }

    #[test]
    fn test_inline_log_iteration() {
        let mut log = InlineLog::new(64);
        log.try_append(&NodeLogEntry::InsertChild {
            key: 0x01,
            child_id: 100,
        });
        log.try_append(&NodeLogEntry::RemoveChild { key: 0x02 });
        log.try_append(&NodeLogEntry::ClearValue);
        let entries: Vec<_> = log.iter().collect();
        assert_eq!(entries.len(), 3);
        assert_eq!(
            entries[0],
            NodeLogEntry::InsertChild {
                key: 0x01,
                child_id: 100
            }
        );
        assert_eq!(entries[1], NodeLogEntry::RemoveChild { key: 0x02 });
        assert_eq!(entries[2], NodeLogEntry::ClearValue);
    }

    #[test]
    fn test_inline_log_iter_stops_at_unknown_tag() {
        // 0xFF is not a valid tag; decoding must stop there rather than skip it.
        let data = vec![
            log_entry_type::REMOVE_CHILD,
            0x01,
            0xFF,
            log_entry_type::CLEAR_VALUE,
        ];
        let log = InlineLog::from_data(data, 2);
        let mut iter = log.iter();
        assert_eq!(iter.next(), Some(NodeLogEntry::RemoveChild { key: 0x01 }));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);
    }

    #[test]
    fn test_inline_log_clear() {
        let mut log = InlineLog::new(64);
        log.try_append(&NodeLogEntry::InsertChild {
            key: 0x01,
            child_id: 100,
        });
        assert!(!log.is_empty());
        log.clear();
        assert!(log.is_empty());
        assert_eq!(log.entry_count(), 0);
        assert_eq!(log.used_space(), 0);
        // The buffer is reusable after clearing, and stale bytes are not re-read.
        assert!(log.try_append(&NodeLogEntry::ClearValue));
        let entries: Vec<_> = log.iter().collect();
        assert_eq!(entries, vec![NodeLogEntry::ClearValue]);
    }

    #[test]
    fn test_dirty_node_tracker() {
        let stats = Arc::new(PerNodeLogStatsAtomic::new());
        let tracker = DirtyNodeTracker::new(Arc::clone(&stats));
        assert_eq!(tracker.dirty_count(), 0);
        tracker.mark_dirty(1);
        tracker.mark_dirty(2);
        tracker.mark_dirty(3);
        assert_eq!(tracker.dirty_count(), 3);
        assert_eq!(stats.snapshot().dirty_node_count, 3);
        assert!(tracker.is_dirty(1));
        assert!(!tracker.is_dirty(4));
        // Re-marking an already-dirty node changes neither the set nor the gauge.
        tracker.mark_dirty(1);
        assert_eq!(tracker.dirty_count(), 3);
        assert_eq!(stats.snapshot().dirty_node_count, 3);
        tracker.mark_clean(2);
        assert_eq!(tracker.dirty_count(), 2);
        assert_eq!(stats.snapshot().dirty_node_count, 2);
        assert!(!tracker.is_dirty(2));
        // Cleaning an unknown node is a no-op for the gauge.
        tracker.mark_clean(42);
        assert_eq!(stats.snapshot().dirty_node_count, 2);
        let dirty = tracker.get_dirty_nodes();
        assert_eq!(dirty.len(), 2);
        assert!(dirty.contains(&1));
        assert!(dirty.contains(&3));
        tracker.clear();
        assert_eq!(tracker.dirty_count(), 0);
        assert_eq!(stats.snapshot().dirty_node_count, 0);
    }

    #[test]
    fn test_stats_atomic() {
        let stats = PerNodeLogStatsAtomic::new();
        stats.record_entry_written(10, false);
        stats.record_entry_written(100, true);
        stats.record_entry_read();
        stats.record_entry_read();
        stats.record_compaction();
        stats.record_overflow_alloc();
        stats.record_overflow_free();
        let snapshot = stats.snapshot();
        assert_eq!(snapshot.entries_written, 2);
        assert_eq!(snapshot.entries_read, 2);
        assert_eq!(snapshot.inline_bytes_written, 10);
        assert_eq!(snapshot.overflow_bytes_written, 100);
        assert_eq!(snapshot.compactions, 1);
        assert_eq!(snapshot.overflow_allocations, 1);
        assert_eq!(snapshot.overflow_frees, 1);
    }

    #[test]
    fn test_serialized_size() {
        let entry1 = NodeLogEntry::InsertChild {
            key: 0,
            child_id: 0,
        };
        assert_eq!(entry1.serialized_size(), entry1.serialize().len());
        let entry2 = NodeLogEntry::RemoveChild { key: 0 };
        assert_eq!(entry2.serialized_size(), entry2.serialize().len());
        let entry3 = NodeLogEntry::SetValue {
            value: vec![1, 2, 3],
        };
        assert_eq!(entry3.serialized_size(), entry3.serialize().len());
        let entry4 = NodeLogEntry::ClearValue;
        assert_eq!(entry4.serialized_size(), entry4.serialize().len());
        let entry5 = NodeLogEntry::SetPrefix { prefix: vec![1, 2] };
        assert_eq!(entry5.serialized_size(), entry5.serialize().len());
    }

    #[test]
    fn test_deserialize_truncated_data() {
        assert!(NodeLogEntry::deserialize(&[]).is_none());
        assert!(NodeLogEntry::deserialize(&[log_entry_type::INSERT_CHILD]).is_none());
        assert!(NodeLogEntry::deserialize(&[log_entry_type::INSERT_CHILD, 0x42]).is_none());
        // One byte short of a full InsertChild.
        assert!(NodeLogEntry::deserialize(&[
            log_entry_type::INSERT_CHILD,
            0x42,
            0,
            0,
            0,
            0,
            0,
            0,
            0
        ])
        .is_none());
        assert!(NodeLogEntry::deserialize(&[log_entry_type::REMOVE_CHILD]).is_none());
        assert!(NodeLogEntry::deserialize(&[log_entry_type::SET_VALUE, 0x00]).is_none());
        assert!(NodeLogEntry::deserialize(&[log_entry_type::SET_VALUE, 0x05, 0x00]).is_none());
        assert!(NodeLogEntry::deserialize(&[log_entry_type::SET_PREFIX]).is_none());
        // Declares a 3-byte prefix but only carries 1 byte.
        assert!(NodeLogEntry::deserialize(&[log_entry_type::SET_PREFIX, 3, b'a']).is_none());
        assert!(NodeLogEntry::deserialize(&[0xFF]).is_none());
    }

    #[test]
    fn test_deserialize_ignores_trailing_bytes() {
        let data = [log_entry_type::REMOVE_CHILD, 0x07, 0xAA, 0xBB];
        let (entry, consumed) = NodeLogEntry::deserialize(&data).unwrap();
        assert_eq!(entry, NodeLogEntry::RemoveChild { key: 0x07 });
        assert_eq!(consumed, 2);
    }

    #[test]
    fn test_inline_log_from_data() {
        let data = vec![
            log_entry_type::REMOVE_CHILD,
            0x42,
            log_entry_type::CLEAR_VALUE,
        ];
        let log = InlineLog::from_data(data.clone(), 2);
        assert_eq!(log.used_space(), 3);
        assert_eq!(log.entry_count(), 2);
        let entries: Vec<_> = log.iter().collect();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0], NodeLogEntry::RemoveChild { key: 0x42 });
        assert_eq!(entries[1], NodeLogEntry::ClearValue);
    }
}