use crate::backend::native::{
constants::node::NODE_SLOT_SIZE, transaction_state::TransactionState, types::NativeResult,
};
use std::io::{Read, Seek, SeekFrom, Write};
pub struct TransactionManager;
impl TransactionManager {
pub fn write_commit_marker_value(file: &mut std::fs::File, value: u64) -> NativeResult<()> {
use crate::backend::native::graph_file::validation::GraphFileValidator;
file.seek(SeekFrom::Start(
GraphFileValidator::commit_marker_offset() as u64
))?;
file.write_all(&value.to_be_bytes())?;
file.sync_all()?;
Ok(())
}
pub fn read_commit_marker_value(file: &mut std::fs::File) -> NativeResult<u64> {
use crate::backend::native::graph_file::validation::GraphFileValidator;
file.seek(SeekFrom::Start(
GraphFileValidator::commit_marker_offset() as u64
))?;
let mut marker_bytes = [0u8; 8];
file.read_exact(&mut marker_bytes)?;
Ok(u64::from_be_bytes(marker_bytes))
}
pub fn begin_cluster_commit(file: &mut std::fs::File) -> NativeResult<()> {
Self::write_commit_marker_value(file, 0)
}
pub fn finish_cluster_commit(file: &mut std::fs::File) -> NativeResult<()> {
use super::validation::GraphFileValidator;
Self::write_commit_marker_value(file, GraphFileValidator::clean_commit_marker())
}
pub fn begin_transaction(
_file: &mut std::fs::File,
tx_state: &mut TransactionState,
_file_path: &str,
node_data_offset: u64,
_file_size_fn: &dyn Fn() -> NativeResult<u64>,
read_bytes_fn: &mut dyn FnMut(u64, &mut [u8]) -> NativeResult<()>,
) -> NativeResult<()> {
if std::env::var("TX_BEGIN_AUDIT").is_ok() {
let slot_offset = node_data_offset + ((257 - 1) as u64 * 4096);
let mut buffer = vec![0u8; 32];
if read_bytes_fn(slot_offset, &mut buffer).is_ok() {
println!(
"[TX_BEGIN_AUDIT] {} node_id={} slot_offset=0x{:x} first_32={:02x?} version={}",
"BEFORE_TX_BEGIN", 257, slot_offset, &buffer, buffer[0]
);
} else {
println!(
"[TX_BEGIN_AUDIT] {} node_id={} slot_offset=0x{:x} READ_FAILED",
"BEFORE_TX_BEGIN", 257, slot_offset
);
}
}
tx_state.begin_tx(1);
if std::env::var("TX_BEGIN_AUDIT").is_ok() {
let slot_offset = node_data_offset + ((257 - 1) as u64 * 4096);
let mut buffer = vec![0u8; 32];
if read_bytes_fn(slot_offset, &mut buffer).is_ok() {
println!(
"[TX_BEGIN_AUDIT] {} node_id={} slot_offset=0x{:x} first_32={:02x?} version={}",
"AFTER_TX_STATE_MODIFY", 257, slot_offset, &buffer, buffer[0]
);
} else {
println!(
"[TX_BEGIN_AUDIT] {} node_id={} slot_offset=0x{:x} READ_FAILED",
"AFTER_TX_STATE_MODIFY", 257, slot_offset
);
}
}
use super::debug::DebugInstrumentation;
DebugInstrumentation::log_transaction_phase("begun", 1);
Ok(())
}
pub fn commit_transaction(
file: &mut std::fs::File,
tx_state: &mut TransactionState,
) -> NativeResult<()> {
tx_state.commit();
file.sync_all()?;
use super::debug::DebugInstrumentation;
DebugInstrumentation::log_transaction_phase("committed", tx_state.tx_id);
Ok(())
}
pub fn rollback_transaction(
file: &mut std::fs::File,
tx_state: &mut TransactionState,
current_size: u64,
node_data_offset: u64,
node_count: u64,
) -> NativeResult<()> {
let node_region_end = node_data_offset + (node_count as u64 * NODE_SLOT_SIZE);
tx_state.rollback();
let intended_rollback_size = 0; let rollback_floor = std::cmp::max(node_region_end, node_data_offset);
let enhanced_rollback_floor = current_size; let final_rollback_size = std::cmp::max(intended_rollback_size, enhanced_rollback_floor);
use super::debug::DebugInstrumentation;
DebugInstrumentation::log_rollback_info(
rollback_floor,
enhanced_rollback_floor,
final_rollback_size,
);
if current_size > final_rollback_size {
DebugInstrumentation::log_slot_corruption_check(
"FILE_TRUNCATE",
current_size,
final_rollback_size,
current_size - final_rollback_size,
);
file.set_len(final_rollback_size)?;
file.sync_all()?;
DebugInstrumentation::log_post_truncate_slot_check(
257,
node_data_offset + ((257 - 1) as u64 * 4096),
2, );
}
DebugInstrumentation::log_rollback_completion(final_rollback_size);
Ok(())
}
pub fn clear_v2_cluster_metadata_on_rollback(
tx_modified_nodes: &mut Vec<u64>,
) -> NativeResult<()> {
#[cfg(feature = "trace_v2_io")]
println!("[phase75] ROLLBACK_CLEANUP: SKIPPING V2 node slot rewrite to prevent corruption");
tx_modified_nodes.clear();
#[cfg(feature = "trace_v2_io")]
println!("[phase75] ROLLBACK_CLEANUP: Completed without V2 slot corruption");
Ok(())
}
pub fn get_transaction_statistics(tx_state: &TransactionState) -> TransactionStatistics {
TransactionStatistics {
tx_id: tx_state.tx_id,
is_active: tx_state.is_in_progress(),
state: if tx_state.is_in_progress() {
"InProgress".to_string()
} else {
"Inactive".to_string()
},
node_count: 0,
edge_count: 0,
free_space_offset: 0,
}
}
}
#[derive(Debug, Clone)]
pub struct TransactionStatistics {
pub tx_id: u64,
pub is_active: bool,
pub state: String,
pub node_count: u64,
pub edge_count: u64,
pub free_space_offset: u64,
}
impl TransactionStatistics {
pub fn is_transaction_in_progress(&self) -> bool {
self.is_active
}
pub fn get_state_description(&self) -> &str {
&self.state
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::backend::native::transaction_state::TransactionState;
use std::fs::OpenOptions;
use tempfile::tempfile;
#[test]
fn test_write_read_commit_marker() {
let mut temp_file = tempfile().unwrap();
let test_value = 0x123456789ABCDEF0u64;
TransactionManager::write_commit_marker_value(&mut temp_file, test_value).unwrap();
let read_value = TransactionManager::read_commit_marker_value(&mut temp_file).unwrap();
assert_eq!(read_value, test_value);
}
#[test]
fn test_cluster_commit_operations() {
let mut temp_file = tempfile().unwrap();
TransactionManager::begin_cluster_commit(&mut temp_file).unwrap();
let marker = TransactionManager::read_commit_marker_value(&mut temp_file).unwrap();
assert_eq!(marker, 0);
TransactionManager::finish_cluster_commit(&mut temp_file).unwrap();
let marker = TransactionManager::read_commit_marker_value(&mut temp_file).unwrap();
assert_ne!(marker, 0); }
#[test]
fn test_transaction_statistics() {
let mut tx_state = TransactionState::new();
tx_state.begin_tx(42);
let stats = TransactionManager::get_transaction_statistics(&tx_state);
assert_eq!(stats.tx_id, 42);
assert!(stats.is_transaction_in_progress());
assert!(!stats.get_state_description().is_empty());
assert_eq!(stats.get_state_description(), "InProgress");
}
#[test]
fn test_clear_v2_cluster_metadata() {
let mut tx_modified_nodes = vec![1u64, 2u64, 3u64];
assert_eq!(tx_modified_nodes.len(), 3);
TransactionManager::clear_v2_cluster_metadata_on_rollback(&mut tx_modified_nodes).unwrap();
assert_eq!(tx_modified_nodes.len(), 0);
}
#[test]
fn test_rollback_with_no_truncation() {
let mut temp_file = tempfile().unwrap();
let mut tx_state = TransactionState::new();
temp_file.set_len(2048).unwrap();
tx_state.begin_tx(1);
let result = TransactionManager::rollback_transaction(
&mut temp_file,
&mut tx_state,
2048, 1024, 10, );
assert!(result.is_ok());
assert_eq!(temp_file.metadata().unwrap().len(), 2048);
}
}