use crate::backend::native::v2::pubsub::Publisher;
use crate::backend::native::v2::storage::SharedDeltaIndex;
use crate::backend::native::v2::wal::{
V2WALCheckpointManager, V2WALConfig, V2WALHeader, V2WALReader, V2WALRecord, V2WALWriter,
transaction_coordinator::IsolationLevel,
};
use crate::backend::native::{NativeBackendError, NativeResult};
use parking_lot::{Mutex, RwLock};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone)]
struct ActiveTransaction {
tx_id: u64,
start_time: Instant,
start_lsn: u64,
records: Vec<V2WALRecord>,
isolation_level: IsolationLevel,
read_only: bool,
}
#[derive(Debug, Clone)]
pub struct WALManagerMetrics {
pub total_transactions: u64,
pub committed_transactions: u64,
pub rolled_back_transactions: u64,
pub avg_transaction_duration_us: u64,
pub total_records_written: u64,
pub wal_size_bytes: u64,
pub checkpoint_count: u64,
pub recovery_count: u64,
pub group_commit_batches: u64,
pub avg_group_commit_size: f64,
pub compression_ratio: f64,
pub transactions_since_checkpoint: u64,
}
pub struct V2WALManager {
config: V2WALConfig,
writer: Arc<V2WALWriter>,
reader: Arc<Mutex<Option<V2WALReader>>>,
checkpoint_manager: Arc<V2WALCheckpointManager>,
header: Arc<RwLock<V2WALHeader>>,
active_transactions: Arc<RwLock<HashMap<u64, ActiveTransaction>>>,
transaction_coordinator: Arc<Mutex<TransactionCoordinator>>,
cluster_organizer: Arc<Mutex<ClusterAffinityOrganizer>>,
delta_index: SharedDeltaIndex,
metrics: Arc<RwLock<WALManagerMetrics>>,
shutdown_signal: Arc<Mutex<bool>>,
coordinator_handle: Arc<Mutex<Option<std::thread::JoinHandle<()>>>>,
publisher: Arc<Publisher>,
}
#[derive(Debug)]
struct TransactionCoordinator {
pending_transactions: VecDeque<ActiveTransaction>,
max_group_size: usize,
group_timeout: Duration,
last_group_commit: Instant,
group_commit_count: u64,
total_grouped_transactions: u64,
}
#[derive(Debug)]
struct ClusterAffinityOrganizer {
cluster_groups: HashMap<i64, Vec<V2WALRecord>>,
max_cluster_group_size: usize,
cluster_flush_timeout: Duration,
last_cluster_flush: Instant,
}
impl V2WALManager {
pub fn create(config: V2WALConfig) -> NativeResult<Self> {
config.validate()?;
let writer = Arc::new(V2WALWriter::create(config.clone())?);
let reader = Arc::new(Mutex::new(None));
let checkpoint_strategy =
crate::backend::native::v2::wal::checkpoint::CheckpointStrategy::SizeThreshold(
config.max_wal_size / 4,
);
let checkpoint_manager = Arc::new(V2WALCheckpointManager::create(
config.clone(),
checkpoint_strategy,
)?);
let header = Arc::new(RwLock::new(writer.get_header()));
let transaction_coordinator = Arc::new(Mutex::new(TransactionCoordinator {
pending_transactions: VecDeque::new(),
max_group_size: config.max_group_commit_size,
group_timeout: Duration::from_millis(config.group_commit_timeout_ms),
last_group_commit: Instant::now(),
group_commit_count: 0,
total_grouped_transactions: 0,
}));
let cluster_organizer = Arc::new(Mutex::new(ClusterAffinityOrganizer {
cluster_groups: HashMap::new(),
max_cluster_group_size: 100,
cluster_flush_timeout: Duration::from_millis(50),
last_cluster_flush: Instant::now(),
}));
let publisher = Arc::new(Publisher::new());
let manager = Self {
config,
writer,
reader,
checkpoint_manager,
header,
active_transactions: Arc::new(RwLock::new(HashMap::new())),
transaction_coordinator,
cluster_organizer,
metrics: Arc::new(RwLock::new(WALManagerMetrics::default())),
shutdown_signal: Arc::new(Mutex::new(false)),
coordinator_handle: Arc::new(Mutex::new(None)),
delta_index: Arc::new(parking_lot::RwLock::new(
crate::backend::native::v2::storage::DeltaIndex::new(),
)),
publisher,
};
manager.start_background_coordinator()?;
Ok(manager)
}
pub fn open(config: V2WALConfig) -> NativeResult<Self> {
config.validate()?;
let writer = Arc::new(V2WALWriter::open(config.clone())?);
let reader = Arc::new(Mutex::new(None));
let checkpoint_strategy =
crate::backend::native::v2::wal::checkpoint::CheckpointStrategy::SizeThreshold(
config.max_wal_size / 4,
);
let checkpoint_manager = Arc::new(V2WALCheckpointManager::create(
config.clone(),
checkpoint_strategy,
)?);
let header = Arc::new(RwLock::new(writer.get_header()));
let transaction_coordinator = Arc::new(Mutex::new(TransactionCoordinator {
pending_transactions: VecDeque::new(),
max_group_size: config.max_group_commit_size,
group_timeout: Duration::from_millis(config.group_commit_timeout_ms),
last_group_commit: Instant::now(),
group_commit_count: 0,
total_grouped_transactions: 0,
}));
let cluster_organizer = Arc::new(Mutex::new(ClusterAffinityOrganizer {
cluster_groups: HashMap::new(),
max_cluster_group_size: 100,
cluster_flush_timeout: Duration::from_millis(50),
last_cluster_flush: Instant::now(),
}));
let publisher = Arc::new(Publisher::new());
let manager = Self {
config,
writer,
reader,
checkpoint_manager,
header,
active_transactions: Arc::new(RwLock::new(HashMap::new())),
transaction_coordinator,
cluster_organizer,
metrics: Arc::new(RwLock::new(WALManagerMetrics::default())),
shutdown_signal: Arc::new(Mutex::new(false)),
coordinator_handle: Arc::new(Mutex::new(None)),
delta_index: Arc::new(parking_lot::RwLock::new(
crate::backend::native::v2::storage::DeltaIndex::new(),
)),
publisher,
};
manager.start_background_coordinator()?;
Ok(manager)
}
fn ensure_reader_initialized(&self) -> NativeResult<()> {
let mut reader_guard = self.reader.lock();
if reader_guard.is_none() {
let reader = V2WALReader::open(&self.config.wal_path)?;
*reader_guard = Some(reader);
}
Ok(())
}
fn get_reader(&self) -> NativeResult<parking_lot::MutexGuard<'_, Option<V2WALReader>>> {
self.ensure_reader_initialized()?;
Ok(self.reader.lock())
}
pub fn begin_transaction(&self, isolation_level: IsolationLevel) -> NativeResult<u64> {
let start_time = Instant::now();
let tx_id = self.generate_transaction_id();
let start_lsn = {
let header = self.header.read();
header.current_lsn
};
let transaction = ActiveTransaction {
tx_id,
start_time,
start_lsn,
records: Vec::new(),
isolation_level,
read_only: false, };
{
let mut active = self.active_transactions.write();
active.insert(tx_id, transaction);
}
let begin_record = V2WALRecord::TransactionBegin {
tx_id,
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
};
self.writer.write_record(begin_record)?;
{
let mut metrics = self.metrics.write();
metrics.total_transactions += 1;
}
Ok(tx_id)
}
pub fn write_transaction_record(&self, tx_id: u64, record: V2WALRecord) -> NativeResult<u64> {
{
let active = self.active_transactions.read();
if !active.contains_key(&tx_id) {
return Err(NativeBackendError::InvalidTransaction {
tx_id,
reason: "Transaction not found or not active".to_string(),
});
}
}
let cluster_key = record.cluster_key();
let record_for_tx = record.clone();
let record_for_cluster = record.clone();
let lsn = self.writer.write_record(record)?;
{
let mut active = self.active_transactions.write();
if let Some(tx) = active.get_mut(&tx_id) {
tx.records.push(record_for_tx);
tx.read_only = false; }
}
if let Some(key) = cluster_key {
let mut organizer = self.cluster_organizer.lock();
organizer
.cluster_groups
.entry(key)
.or_insert_with(Vec::new)
.push(record_for_cluster);
}
{
let writer_metrics = self.writer.get_metrics();
let mut manager_metrics = self.metrics.write();
manager_metrics.total_records_written = writer_metrics.records_written;
}
Ok(lsn)
}
pub fn commit_transaction(&self, tx_id: u64) -> NativeResult<()> {
let start_time = Instant::now();
let transaction = {
let mut active = self.active_transactions.write();
active.remove(&tx_id)
};
let transaction = transaction.ok_or_else(|| NativeBackendError::InvalidTransaction {
tx_id,
reason: "Transaction not found".to_string(),
})?;
let records = transaction.records.clone();
let commit_record = V2WALRecord::TransactionCommit {
tx_id,
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
};
let commit_lsn = self.writer.write_record(commit_record)?;
{
let mut header = self.header.write();
header.committed_lsn = commit_lsn;
}
{
use crate::backend::native::v2::pubsub::emit;
let events = emit::records_to_events(&records, commit_lsn);
for event in events {
self.publisher.emit(event);
}
}
{
let mut delta_index = self.delta_index.write();
if let Err(e) = delta_index.apply_commit(records, commit_lsn) {
eprintln!("Failed to populate delta index: {}", e);
}
}
{
let mut coordinator = self.transaction_coordinator.lock();
coordinator.pending_transactions.push_back(transaction);
}
{
let mut metrics = self.metrics.write();
metrics.committed_transactions += 1;
metrics.transactions_since_checkpoint += 1;
let duration_us = start_time.elapsed().as_micros() as u64;
let total_tx = metrics.committed_transactions;
metrics.avg_transaction_duration_us =
((metrics.avg_transaction_duration_us * (total_tx - 1) as u64) + duration_us)
/ total_tx;
}
self.check_group_commit();
if self.config.auto_checkpoint && self.requires_checkpoint() {
let checkpoint_manager = self.checkpoint_manager.clone();
std::thread::spawn(move || {
if let Err(e) = checkpoint_manager.force_checkpoint() {
eprintln!("Background checkpoint failed: {}", e);
}
});
}
Ok(())
}
pub fn rollback_transaction(&self, tx_id: u64) -> NativeResult<()> {
let transaction = {
let mut active = self.active_transactions.write();
active.remove(&tx_id)
};
let _transaction = transaction.ok_or_else(|| NativeBackendError::InvalidTransaction {
tx_id,
reason: "Transaction not found".to_string(),
})?;
let rollback_record = V2WALRecord::TransactionRollback {
tx_id,
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
};
self.writer.write_record(rollback_record)?;
{
let mut metrics = self.metrics.write();
metrics.rolled_back_transactions += 1;
}
Ok(())
}
pub fn write_record(&self, record: V2WALRecord) -> NativeResult<u64> {
let result = self.writer.write_record(record)?;
{
let writer_metrics = self.writer.get_metrics();
let mut manager_metrics = self.metrics.write();
manager_metrics.total_records_written = writer_metrics.records_written;
}
Ok(result)
}
pub fn write_records_batch(&self, records: Vec<V2WALRecord>) -> NativeResult<Vec<u64>> {
let result = self.writer.write_records_batch(records)?;
{
let writer_metrics = self.writer.get_metrics();
let mut manager_metrics = self.metrics.write();
manager_metrics.total_records_written = writer_metrics.records_written;
}
Ok(result)
}
pub fn flush(&self) -> NativeResult<()> {
self.writer.flush_buffer()
}
pub fn force_checkpoint(&self) -> NativeResult<()> {
let checkpoint_lsn = {
let header = self.header.read();
header.committed_lsn
};
self.checkpoint_manager.force_checkpoint()?;
self.on_checkpoint_completed(checkpoint_lsn)?;
Ok(())
}
pub fn get_header(&self) -> V2WALHeader {
self.header.read().clone()
}
pub fn get_metrics(&self) -> WALManagerMetrics {
self.metrics.read().clone()
}
pub fn get_publisher(&self) -> &Arc<Publisher> {
&self.publisher
}
pub fn get_active_transaction_count(&self) -> usize {
self.active_transactions.read().len()
}
pub fn get_transactions_since_checkpoint(&self) -> u64 {
self.metrics.read().transactions_since_checkpoint
}
pub fn get_delta_index(&self) -> &SharedDeltaIndex {
&self.delta_index
}
pub fn max_committed_lsn(&self) -> u64 {
let header = self.header.read();
header.committed_lsn
}
pub fn on_checkpoint_completed(&self, checkpointed_lsn: u64) -> NativeResult<()> {
{
let mut metrics = self.metrics.write();
metrics.transactions_since_checkpoint = 0;
}
{
let mut header = self.header.write();
header.checkpointed_lsn = checkpointed_lsn;
}
{
let mut delta_index = self.delta_index.write();
delta_index.checkpoint_completed(checkpointed_lsn);
}
{
let mut metrics = self.metrics.write();
metrics.checkpoint_count += 1;
}
Ok(())
}
pub fn requires_checkpoint(&self) -> bool {
let header = self.header.read();
let wal_size = self.estimate_wal_size();
wal_size > self.config.max_wal_size
|| (header.current_lsn - header.checkpointed_lsn) > self.config.checkpoint_interval
}
fn generate_transaction_id(&self) -> u64 {
static NEXT_TX_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
NEXT_TX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
}
fn start_background_coordinator(&self) -> NativeResult<()> {
let transaction_coordinator = self.transaction_coordinator.clone();
let cluster_organizer = self.cluster_organizer.clone();
let writer = self.writer.clone();
let shutdown_signal = self.shutdown_signal.clone();
let handle = std::thread::spawn(move || {
let mut last_check = Instant::now();
loop {
{
let shutdown = shutdown_signal.lock();
if *shutdown {
break;
}
}
if last_check.elapsed() >= Duration::from_millis(10) {
Self::process_group_commits(&transaction_coordinator, &writer);
Self::process_cluster_groups(&cluster_organizer, &writer);
last_check = Instant::now();
}
std::thread::sleep(Duration::from_millis(5));
}
});
let mut coordinator_handle = self.coordinator_handle.lock();
*coordinator_handle = Some(handle);
Ok(())
}
fn process_group_commits(
coordinator: &Arc<Mutex<TransactionCoordinator>>,
writer: &Arc<V2WALWriter>,
) {
let mut coord = coordinator.lock();
if coord.pending_transactions.len() >= coord.max_group_size
|| coord.last_group_commit.elapsed() >= coord.group_timeout
{
let batch_size = coord.pending_transactions.len().min(coord.max_group_size);
let batch: Vec<_> = coord.pending_transactions.drain(..batch_size).collect();
if !batch.is_empty() {
let _ = writer.flush_buffer();
coord.group_commit_count += 1;
coord.total_grouped_transactions += batch.len() as u64;
coord.last_group_commit = Instant::now();
}
}
}
fn process_cluster_groups(
organizer: &Arc<Mutex<ClusterAffinityOrganizer>>,
writer: &Arc<V2WALWriter>,
) {
let mut org = organizer.lock();
if org.last_cluster_flush.elapsed() >= org.cluster_flush_timeout {
for (_cluster_key, records) in org.cluster_groups.drain() {
if !records.is_empty() {
let _ = writer.flush_buffer(); }
}
org.last_cluster_flush = Instant::now();
}
}
fn check_group_commit(&self) {
Self::process_group_commits(&self.transaction_coordinator, &self.writer);
}
fn estimate_wal_size(&self) -> u64 {
if let Ok(metadata) = std::fs::metadata(&self.config.wal_path) {
return metadata.len();
}
let metrics = self.writer.get_metrics();
metrics.bytes_written + std::mem::size_of::<V2WALHeader>() as u64
}
pub fn enable_bulk_mode(
&self,
config: &super::bulk_ingest::BulkIngestConfig,
) -> NativeResult<()> {
self.writer.enable_bulk_mode(config)
}
pub fn disable_bulk_mode(&self) -> NativeResult<()> {
self.writer.disable_bulk_mode()
}
pub fn is_bulk_mode_active(&self) -> bool {
self.writer.is_bulk_mode_active()
}
pub fn shutdown(self) -> NativeResult<()> {
{
let mut shutdown = self.shutdown_signal.lock();
*shutdown = true;
}
{
let mut handle = self.coordinator_handle.lock();
if let Some(handle) = handle.take() {
let _ = handle.join();
}
}
self.check_group_commit();
self.flush()?;
self.writer.shutdown()?;
Ok(())
}
pub fn soft_shutdown(&self) -> NativeResult<()> {
{
let mut shutdown = self.shutdown_signal.lock();
*shutdown = true;
}
self.check_group_commit();
self.flush()?;
Ok(())
}
}
impl Default for WALManagerMetrics {
fn default() -> Self {
Self {
total_transactions: 0,
committed_transactions: 0,
rolled_back_transactions: 0,
avg_transaction_duration_us: 0,
total_records_written: 0,
wal_size_bytes: 0,
checkpoint_count: 0,
recovery_count: 0,
group_commit_batches: 0,
avg_group_commit_size: 0.0,
compression_ratio: 1.0,
transactions_since_checkpoint: 0,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::backend::native::GraphFile;
use tempfile::tempdir;
#[test]
fn test_enhanced_wal_manager_create() {
let temp_dir = tempdir().unwrap();
let v2_graph_path = temp_dir.path().join("test.v2");
let _graph_file =
GraphFile::create(&v2_graph_path).expect("Failed to create V2 graph file for test");
let config = V2WALConfig {
graph_path: v2_graph_path.clone(),
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let manager = V2WALManager::create(config);
assert!(manager.is_ok());
let manager = manager.unwrap();
assert_eq!(manager.get_active_transaction_count(), 0);
let metrics = manager.get_metrics();
assert_eq!(metrics.total_transactions, 0);
assert_eq!(metrics.committed_transactions, 0);
}
#[test]
fn test_transaction_lifecycle() {
let temp_dir = tempdir().unwrap();
let v2_graph_path = temp_dir.path().join("test.v2");
let _graph_file =
GraphFile::create(&v2_graph_path).expect("Failed to create V2 graph file for test");
let config = V2WALConfig {
graph_path: v2_graph_path.clone(),
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let manager = V2WALManager::create(config).unwrap();
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
assert!(tx_id > 0);
assert_eq!(manager.get_active_transaction_count(), 1);
let record = V2WALRecord::NodeInsert {
node_id: 42,
slot_offset: 1024,
node_data: vec![1, 2, 3],
};
let lsn = manager.write_transaction_record(tx_id, record).unwrap();
assert!(lsn > 0);
manager.commit_transaction(tx_id).unwrap();
assert_eq!(manager.get_active_transaction_count(), 0);
let metrics = manager.get_metrics();
assert_eq!(metrics.total_transactions, 1);
assert_eq!(metrics.committed_transactions, 1);
}
#[test]
fn test_transaction_rollback() {
let temp_dir = tempdir().unwrap();
let v2_graph_path = temp_dir.path().join("test.v2");
let _graph_file =
GraphFile::create(&v2_graph_path).expect("Failed to create V2 graph file for test");
let config = V2WALConfig {
graph_path: v2_graph_path.clone(),
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let manager = V2WALManager::create(config).unwrap();
let tx_id = manager
.begin_transaction(IsolationLevel::Serializable)
.unwrap();
assert_eq!(manager.get_active_transaction_count(), 1);
let record = V2WALRecord::NodeInsert {
node_id: 43,
slot_offset: 2048,
node_data: vec![4, 5, 6],
};
manager.write_transaction_record(tx_id, record).unwrap();
manager.rollback_transaction(tx_id).unwrap();
assert_eq!(manager.get_active_transaction_count(), 0);
let metrics = manager.get_metrics();
assert_eq!(metrics.total_transactions, 1);
assert_eq!(metrics.committed_transactions, 0);
assert_eq!(metrics.rolled_back_transactions, 1);
}
#[test]
fn test_isolation_levels() {
assert_eq!(IsolationLevel::ReadCommitted, IsolationLevel::ReadCommitted);
assert_ne!(IsolationLevel::ReadCommitted, IsolationLevel::Serializable);
assert_ne!(IsolationLevel::Serializable, IsolationLevel::Snapshot);
}
#[test]
fn test_transaction_coordinator() {
let coordinator = TransactionCoordinator {
pending_transactions: VecDeque::new(),
max_group_size: 10,
group_timeout: Duration::from_millis(100),
last_group_commit: Instant::now(),
group_commit_count: 0,
total_grouped_transactions: 0,
};
assert_eq!(coordinator.pending_transactions.len(), 0);
assert_eq!(coordinator.max_group_size, 10);
assert_eq!(coordinator.group_commit_count, 0);
}
#[test]
fn test_cluster_organizer() {
let organizer = ClusterAffinityOrganizer {
cluster_groups: HashMap::new(),
max_cluster_group_size: 50,
cluster_flush_timeout: Duration::from_millis(25),
last_cluster_flush: Instant::now(),
};
assert_eq!(organizer.cluster_groups.len(), 0);
assert_eq!(organizer.max_cluster_group_size, 50);
}
#[test]
fn test_wal_manager_metrics() {
let mut metrics = WALManagerMetrics::default();
assert_eq!(metrics.total_transactions, 0);
assert_eq!(metrics.committed_transactions, 0);
assert_eq!(metrics.rolled_back_transactions, 0);
assert_eq!(metrics.avg_transaction_duration_us, 0);
metrics.total_transactions = 5;
metrics.committed_transactions = 4;
metrics.rolled_back_transactions = 1;
metrics.avg_transaction_duration_us = 1500;
assert_eq!(metrics.total_transactions, 5);
assert_eq!(metrics.committed_transactions, 4);
assert_eq!(metrics.rolled_back_transactions, 1);
assert_eq!(metrics.avg_transaction_duration_us, 1500);
}
#[test]
fn test_wal_manager_shutdown() {
let temp_dir = tempdir().unwrap();
let v2_graph_path = temp_dir.path().join("test.v2");
let _graph_file =
GraphFile::create(&v2_graph_path).expect("Failed to create V2 graph file for test");
let config = V2WALConfig {
graph_path: v2_graph_path.clone(),
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let manager = V2WALManager::create(config).unwrap();
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: 44,
slot_offset: 3072,
node_data: vec![7, 8, 9],
},
)
.unwrap();
let shutdown_result = manager.shutdown();
assert!(shutdown_result.is_ok());
}
#[test]
fn test_auto_checkpoint_enabled() {
let temp_dir = tempdir().unwrap();
let v2_graph_path = temp_dir.path().join("test.v2");
let _graph_file =
GraphFile::create(&v2_graph_path).expect("Failed to create V2 graph file for test");
let mut config = V2WALConfig {
graph_path: v2_graph_path.clone(),
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
max_wal_size: 1024 * 1024, checkpoint_interval: 2, auto_checkpoint: true,
..Default::default()
};
let manager = V2WALManager::create(config).unwrap();
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: 1,
slot_offset: 1024,
node_data: vec![1, 2, 3],
},
)
.unwrap();
manager.commit_transaction(tx_id).unwrap();
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: 2,
slot_offset: 2048,
node_data: vec![4, 5, 6],
},
)
.unwrap();
manager.commit_transaction(tx_id).unwrap();
std::thread::sleep(Duration::from_millis(100));
let metrics = manager.get_metrics();
assert_eq!(metrics.committed_transactions, 2);
}
#[test]
fn test_auto_checkpoint_disabled() {
let temp_dir = tempdir().unwrap();
let v2_graph_path = temp_dir.path().join("test.v2");
let _graph_file =
GraphFile::create(&v2_graph_path).expect("Failed to create V2 graph file for test");
let mut config = V2WALConfig {
graph_path: v2_graph_path.clone(),
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
max_wal_size: 1024 * 1024, checkpoint_interval: 2,
auto_checkpoint: false, ..Default::default()
};
let manager = V2WALManager::create(config).unwrap();
for i in 0..5 {
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: i,
slot_offset: ((i + 1) * 1024) as u64,
node_data: vec![i as u8],
},
)
.unwrap();
manager.commit_transaction(tx_id).unwrap();
}
std::thread::sleep(Duration::from_millis(100));
let metrics = manager.get_metrics();
assert_eq!(metrics.committed_transactions, 5);
assert_eq!(metrics.checkpoint_count, 0);
}
#[test]
fn test_checkpoint_does_not_block_commit() {
let temp_dir = tempdir().unwrap();
let v2_graph_path = temp_dir.path().join("test.v2");
let _graph_file =
GraphFile::create(&v2_graph_path).expect("Failed to create V2 graph file for test");
let config = V2WALConfig {
graph_path: v2_graph_path.clone(),
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
max_wal_size: 1024 * 1024, checkpoint_interval: 1,
auto_checkpoint: true,
..Default::default()
};
let manager = V2WALManager::create(config).unwrap();
let start = std::time::Instant::now();
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: 1,
slot_offset: 1024,
node_data: vec![1, 2, 3],
},
)
.unwrap();
manager.commit_transaction(tx_id).unwrap();
let commit_duration = start.elapsed();
assert!(commit_duration < Duration::from_millis(100));
}
#[test]
fn test_wal_size_estimation_uses_actual_file() {
let temp_dir = tempdir().unwrap();
let v2_graph_path = temp_dir.path().join("test.v2");
let _graph_file =
GraphFile::create(&v2_graph_path).expect("Failed to create V2 graph file for test");
let config = V2WALConfig {
graph_path: v2_graph_path.clone(),
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let manager = V2WALManager::create(config).unwrap();
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: 1,
slot_offset: 1024,
node_data: vec![1, 2, 3],
},
)
.unwrap();
manager.commit_transaction(tx_id).unwrap();
std::thread::sleep(Duration::from_millis(50));
assert!(temp_dir.path().join("test.wal").exists());
let wal_size = std::fs::metadata(temp_dir.path().join("test.wal"))
.unwrap()
.len();
assert!(wal_size > 0);
}
#[test]
fn test_transaction_count_checkpoint_trigger() {
let temp_dir = tempdir().unwrap();
let v2_graph_path = temp_dir.path().join("test.v2");
let _graph_file =
GraphFile::create(&v2_graph_path).expect("Failed to create V2 graph file for test");
let config = V2WALConfig {
graph_path: v2_graph_path.clone(),
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
max_wal_size: 1024 * 1024 * 1024, checkpoint_interval: 3, auto_checkpoint: false, ..Default::default()
};
let manager = V2WALManager::create(config).unwrap();
for i in 0..2 {
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: i,
slot_offset: ((i + 1) * 1024) as u64,
node_data: vec![i as u8],
},
)
.unwrap();
manager.commit_transaction(tx_id).unwrap();
}
let metrics = manager.get_metrics();
assert_eq!(metrics.transactions_since_checkpoint, 2);
assert_eq!(metrics.checkpoint_count, 0);
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: 3,
slot_offset: 4096,
node_data: vec![3],
},
)
.unwrap();
manager.commit_transaction(tx_id).unwrap();
let metrics = manager.get_metrics();
assert_eq!(metrics.transactions_since_checkpoint, 3);
let checkpointed_lsn = manager.get_header().committed_lsn;
manager.on_checkpoint_completed(checkpointed_lsn).unwrap();
let metrics = manager.get_metrics();
assert_eq!(
metrics.transactions_since_checkpoint, 0,
"Counter should reset after checkpoint"
);
assert_eq!(
metrics.checkpoint_count, 1,
"Checkpoint count should increment"
);
}
#[test]
fn test_size_checkpoint_trigger() {
let temp_dir = tempdir().unwrap();
let v2_graph_path = temp_dir.path().join("test.v2");
let _graph_file =
GraphFile::create(&v2_graph_path).expect("Failed to create V2 graph file for test");
let config = V2WALConfig {
graph_path: v2_graph_path.clone(),
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
max_wal_size: 1024 * 1024, auto_checkpoint: false,
..Default::default()
};
let manager = V2WALManager::create(config).unwrap();
let large_data = vec![0u8; 256 * 1024]; for i in 0..5 {
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: i,
slot_offset: ((i + 1) * 1024) as u64,
node_data: large_data.clone(),
},
)
.unwrap();
manager.commit_transaction(tx_id).unwrap();
}
manager.flush().unwrap();
std::thread::sleep(Duration::from_millis(50));
let wal_size = std::fs::metadata(temp_dir.path().join("test.wal"))
.unwrap()
.len();
assert!(
wal_size > 1024 * 1024,
"WAL should exceed 1MB threshold, got {}",
wal_size
);
assert!(
manager.requires_checkpoint(),
"Should require checkpoint when WAL exceeds threshold"
);
}
#[test]
fn test_checkpoint_resets_transaction_counter() {
let temp_dir = tempdir().unwrap();
let v2_graph_path = temp_dir.path().join("test.v2");
let _graph_file =
GraphFile::create(&v2_graph_path).expect("Failed to create V2 graph file for test");
let config = V2WALConfig {
graph_path: v2_graph_path.clone(),
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
max_wal_size: 1024 * 1024 * 1024,
checkpoint_interval: 1000,
auto_checkpoint: false,
..Default::default()
};
let manager = V2WALManager::create(config).unwrap();
for i in 0..5 {
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: i,
slot_offset: ((i + 1) * 1024) as u64,
node_data: vec![i as u8],
},
)
.unwrap();
manager.commit_transaction(tx_id).unwrap();
}
let metrics = manager.get_metrics();
assert_eq!(metrics.transactions_since_checkpoint, 5);
let checkpointed_lsn = manager.get_header().committed_lsn;
manager.on_checkpoint_completed(checkpointed_lsn).unwrap();
let metrics_after = manager.get_metrics();
assert_eq!(
metrics_after.transactions_since_checkpoint, 0,
"Counter should be reset to 0 after checkpoint"
);
assert_eq!(metrics_after.checkpoint_count, 1);
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write_transaction_record(
tx_id,
V2WALRecord::NodeInsert {
node_id: 10,
slot_offset: 10240,
node_data: vec![10],
},
)
.unwrap();
manager.commit_transaction(tx_id).unwrap();
let metrics_final = manager.get_metrics();
assert_eq!(
metrics_final.transactions_since_checkpoint, 1,
"Counter should increment from 0 after checkpoint"
);
}
#[test]
fn test_delta_index_lifecycle() {
let temp_dir = tempdir().unwrap();
let v2_graph_path = temp_dir.path().join("test.v2");
let _graph_file =
GraphFile::create(&v2_graph_path).expect("Failed to create V2 graph file for test");
let config = V2WALConfig {
graph_path: v2_graph_path.clone(),
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
max_wal_size: 1024 * 1024 * 1024,
checkpoint_interval: 1000,
auto_checkpoint: false,
..Default::default()
};
let manager = V2WALManager::create(config).unwrap();
let tx1 = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write_transaction_record(
tx1,
V2WALRecord::NodeInsert {
node_id: 1i64,
slot_offset: 1024,
node_data: vec![1, 2, 3],
},
)
.unwrap();
manager.commit_transaction(tx1).unwrap();
let delta_index = manager.get_delta_index().read();
assert_eq!(
delta_index.delta_count(),
1,
"Should have 1 delta after commit"
);
drop(delta_index);
let high_checkpoint_lsn = u64::MAX;
manager
.on_checkpoint_completed(high_checkpoint_lsn)
.unwrap();
let delta_index = manager.get_delta_index().read();
assert_eq!(
delta_index.delta_count(),
0,
"All deltas should be dropped after checkpoint"
);
}
}