use crate::backend::native::types::NativeNodeId;
use std::collections::HashSet;
pub struct TransactionAuditor {
tx_modified_nodes: HashSet<NativeNodeId>,
tx_begin_audit_enabled: bool,
phase75_instrumentation_enabled: bool,
edge_cluster_debug_enabled: bool,
}
impl TransactionAuditor {
pub fn new() -> Self {
Self {
tx_modified_nodes: HashSet::new(),
tx_begin_audit_enabled: std::env::var("TX_BEGIN_AUDIT").is_ok(),
phase75_instrumentation_enabled: std::env::var("PHASE75_INSTRUMENTATION").is_ok(),
edge_cluster_debug_enabled: std::env::var("EDGE_CLUSTER_DEBUG").is_ok(),
}
}
pub fn record_node_v2_cluster_modified(&mut self, node_id: NativeNodeId) {
self.tx_modified_nodes.insert(node_id);
#[cfg(feature = "trace_v2_io")]
if self.phase75_instrumentation_enabled {
println!(
"[phase75] WRITESET_RECORD: node_id={} marked for rollback cleanup",
node_id
);
}
}
pub fn is_node_modified(&self, node_id: NativeNodeId) -> bool {
self.tx_modified_nodes.contains(&node_id)
}
pub fn get_modified_nodes(&self) -> Vec<NativeNodeId> {
self.tx_modified_nodes.iter().copied().collect()
}
pub fn modified_node_count(&self) -> usize {
self.tx_modified_nodes.len()
}
pub fn clear_modified_nodes(&mut self) {
#[cfg(feature = "trace_v2_io")]
if self.phase75_instrumentation_enabled {
println!("[phase75] ROLLBACK_CLEANUP: Clearing transaction modification tracking");
}
self.tx_modified_nodes.clear();
}
pub fn audit_transaction_begin<F>(
&self,
node_data_offset: u64,
read_bytes_fn: F,
) -> NativeResult<()>
where
F: FnOnce(u64, &mut [u8]) -> NativeResult<()>,
{
if !self.tx_begin_audit_enabled {
return Ok(());
}
const AUDIT_NODE_ID: NativeNodeId = 257;
let slot_offset = node_data_offset + ((AUDIT_NODE_ID - 1) as u64 * 4096);
let mut buffer = vec![0u8; 32];
match read_bytes_fn(slot_offset, &mut buffer) {
Ok(_) => {
println!(
"[TX_BEGIN_AUDIT] {} node_id={} slot_offset=0x{:x} first_32={:02x?} version={}",
"BEFORE_TX_BEGIN", AUDIT_NODE_ID, slot_offset, &buffer, buffer[0]
);
}
Err(_) => {
println!(
"[TX_BEGIN_AUDIT] {} node_id={} slot_offset=0x{:x} READ_FAILED",
"BEFORE_TX_BEGIN", AUDIT_NODE_ID, slot_offset
);
}
}
Ok(())
}
pub fn debug_edge_cluster_before_transaction<F>(
&self,
file_path: &std::path::Path,
file_size_fn: F,
) -> NativeResult<()>
where
F: FnOnce() -> NativeResult<u64>,
{
if !self.edge_cluster_debug_enabled {
return Ok(());
}
const NODE1_SLOT_OFFSET: u64 = 0x400;
let mut disk_file = std::fs::File::open(file_path)?;
let mut node1_bytes = vec![0u8; 32];
use std::io::{Read, Seek};
disk_file.seek(std::io::SeekFrom::Start(NODE1_SLOT_OFFSET))?;
disk_file.read_exact(&mut node1_bytes)?;
let version_before_tx_ops = node1_bytes[0];
let file_size_before_tx_ops = file_size_fn().unwrap_or(0);
println!(
"[EDGE_CLUSTER_DEBUG] BEFORE_TX_OPS: node1_version={}, file_size={}, node1_bytes={:02x?}",
version_before_tx_ops, file_size_before_tx_ops, &node1_bytes
);
Ok(())
}
pub fn clear_v2_cluster_metadata_on_rollback(&mut self) -> NativeResult<()> {
#[cfg(feature = "trace_v2_io")]
if self.phase75_instrumentation_enabled {
println!(
"[phase75] ROLLBACK_CLEANUP: SKIPPING V2 node slot rewrite to prevent corruption"
);
}
self.clear_modified_nodes();
#[cfg(feature = "trace_v2_io")]
if self.phase75_instrumentation_enabled {
println!("[phase75] ROLLBACK_CLEANUP: Completed without V2 slot corruption");
}
Ok(())
}
pub fn generate_audit_report(&self) -> String {
let mut report = String::new();
report.push_str("=== Transaction Audit Report ===\n");
report.push_str(&format!("Modified nodes: {}\n", self.modified_node_count()));
if !self.tx_modified_nodes.is_empty() {
report.push_str("Modified node IDs: ");
let mut node_ids: Vec<_> = self.tx_modified_nodes.iter().copied().collect();
node_ids.sort();
for (i, node_id) in node_ids.iter().enumerate() {
if i > 0 {
report.push_str(", ");
}
report.push_str(&node_id.to_string());
}
report.push('\n');
}
report.push_str(&format!(
"TX_BEGIN_AUDIT enabled: {}\n",
self.tx_begin_audit_enabled
));
report.push_str(&format!(
"PHASE75_INSTRUMENTATION enabled: {}\n",
self.phase75_instrumentation_enabled
));
report.push_str(&format!(
"EDGE_CLUSTER_DEBUG enabled: {}\n",
self.edge_cluster_debug_enabled
));
report
}
pub fn has_debugging_enabled(&self) -> bool {
self.tx_begin_audit_enabled
|| self.phase75_instrumentation_enabled
|| self.edge_cluster_debug_enabled
}
pub fn get_statistics(&self) -> TransactionAuditorStatistics {
TransactionAuditorStatistics {
modified_node_count: self.modified_node_count(),
tx_begin_audit_enabled: self.tx_begin_audit_enabled,
phase75_instrumentation_enabled: self.phase75_instrumentation_enabled,
edge_cluster_debug_enabled: self.edge_cluster_debug_enabled,
has_debugging_enabled: self.has_debugging_enabled(),
}
}
}
#[derive(Debug, Clone)]
pub struct TransactionAuditorStatistics {
pub modified_node_count: usize,
pub tx_begin_audit_enabled: bool,
pub phase75_instrumentation_enabled: bool,
pub edge_cluster_debug_enabled: bool,
pub has_debugging_enabled: bool,
}
impl Default for TransactionAuditor {
fn default() -> Self {
Self::new()
}
}
pub use crate::backend::native::types::NativeResult;
#[cfg(test)]
mod tests {
use super::*;
use tempfile::NamedTempFile;
#[test]
fn test_transaction_auditor_creation() {
let auditor = TransactionAuditor::new();
assert_eq!(auditor.modified_node_count(), 0);
assert!(!auditor.has_debugging_enabled()); }
#[test]
fn test_node_modification_tracking() {
let mut auditor = TransactionAuditor::new();
assert_eq!(auditor.modified_node_count(), 0);
assert!(!auditor.is_node_modified(100));
auditor.record_node_v2_cluster_modified(100);
auditor.record_node_v2_cluster_modified(200);
assert_eq!(auditor.modified_node_count(), 2);
assert!(auditor.is_node_modified(100));
assert!(auditor.is_node_modified(200));
assert!(!auditor.is_node_modified(300));
let modified_nodes = auditor.get_modified_nodes();
assert_eq!(modified_nodes.len(), 2);
assert!(modified_nodes.contains(&100));
assert!(modified_nodes.contains(&200));
}
#[test]
fn test_clear_modified_nodes() {
let mut auditor = TransactionAuditor::new();
auditor.record_node_v2_cluster_modified(100);
auditor.record_node_v2_cluster_modified(200);
assert_eq!(auditor.modified_node_count(), 2);
auditor.clear_modified_nodes();
assert_eq!(auditor.modified_node_count(), 0);
assert!(!auditor.is_node_modified(100));
assert!(!auditor.is_node_modified(200));
}
#[test]
fn test_audit_report_generation() {
let mut auditor = TransactionAuditor::new();
let empty_report = auditor.generate_audit_report();
assert!(empty_report.contains("Modified nodes: 0"));
auditor.record_node_v2_cluster_modified(100);
auditor.record_node_v2_cluster_modified(50);
auditor.record_node_v2_cluster_modified(200);
let report = auditor.generate_audit_report();
assert!(report.contains("Modified nodes: 3"));
assert!(report.contains("50, 100, 200")); }
#[test]
fn test_statistics() {
let mut auditor = TransactionAuditor::new();
auditor.record_node_v2_cluster_modified(100);
auditor.record_node_v2_cluster_modified(200);
let stats = auditor.get_statistics();
assert_eq!(stats.modified_node_count, 2);
assert_eq!(stats.tx_begin_audit_enabled, auditor.tx_begin_audit_enabled);
assert_eq!(
stats.phase75_instrumentation_enabled,
auditor.phase75_instrumentation_enabled
);
assert_eq!(
stats.edge_cluster_debug_enabled,
auditor.edge_cluster_debug_enabled
);
}
#[test]
fn test_audit_transaction_begin_disabled() {
let auditor = TransactionAuditor::new();
let result = auditor.audit_transaction_begin(1024, |_, _| Ok(()));
assert!(result.is_ok());
}
#[test]
fn test_debug_edge_cluster_disabled() {
let auditor = TransactionAuditor::new();
let temp_file = NamedTempFile::new().unwrap();
let result = auditor.debug_edge_cluster_before_transaction(temp_file.path(), || Ok(1024));
assert!(result.is_ok());
}
#[test]
fn test_clear_v2_cluster_metadata_on_rollback() {
let mut auditor = TransactionAuditor::new();
auditor.record_node_v2_cluster_modified(100);
auditor.record_node_v2_cluster_modified(200);
assert_eq!(auditor.modified_node_count(), 2);
let result = auditor.clear_v2_cluster_metadata_on_rollback();
assert!(result.is_ok());
assert_eq!(auditor.modified_node_count(), 0);
}
}