use std::fmt;
use super::super::admission::SharedBufferState;
use super::super::edge::{BidirectionalEdgeStore, EdgeStore};
use super::super::storage::CsrGraph;
use super::checkpoint::{CheckpointStats, CompactionCheckpoint, EdgeStoreCheckpoint};
use super::errors::{CompactionError, Direction};
/// Everything `swap_bidirectional_csr` needs to install a compacted graph:
/// the two replacement CSRs and the checkpoint they were built from.
#[derive(Debug)]
pub struct SwapInput {
/// Replacement CSR that is swapped into the store's forward side.
pub forward_csr: CsrGraph,
/// Replacement CSR that is swapped into the store's reverse side.
pub reverse_csr: CsrGraph,
/// Checkpoint compared against the live store state before swapping to
/// detect concurrent modification; also the source of the reported
/// pre-swap statistics.
pub checkpoint: CompactionCheckpoint,
}
impl SwapInput {
#[must_use]
pub fn new(
forward_csr: CsrGraph,
reverse_csr: CsrGraph,
checkpoint: CompactionCheckpoint,
) -> Self {
Self {
forward_csr,
reverse_csr,
checkpoint,
}
}
}
/// Summary returned by `swap_bidirectional_csr` after both CSRs were installed.
#[derive(Debug, Clone)]
pub struct SwapResult {
/// Statistics taken from the input checkpoint (`checkpoint.stats()`)
/// before the swap was attempted.
pub pre_swap_stats: CheckpointStats,
/// Edge count of the newly installed forward CSR.
pub forward_edge_count: usize,
/// Node count of the newly installed forward CSR.
pub forward_node_count: usize,
/// Edge count of the newly installed reverse CSR.
pub reverse_edge_count: usize,
/// Node count of the newly installed reverse CSR.
pub reverse_node_count: usize,
/// CSR version reported by the forward side's swap call.
pub forward_csr_version: u64,
/// CSR version read from the reverse side after its swap.
pub reverse_csr_version: u64,
}
impl fmt::Display for SwapResult {
    /// Writes a one-line summary of the edge count, node count and CSR
    /// version for the forward side followed by the reverse side.
    ///
    /// `pre_swap_stats` is not part of the rendered text.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            forward_edge_count: fwd_edges,
            forward_node_count: fwd_nodes,
            forward_csr_version: fwd_version,
            reverse_edge_count: rev_edges,
            reverse_node_count: rev_nodes,
            reverse_csr_version: rev_version,
            ..
        } = self;

        write!(
            f,
            "SwapResult {{ forward: {} edges/{} nodes (v{}), reverse: {} edges/{} nodes (v{}) }}",
            fwd_edges, fwd_nodes, fwd_version, rev_edges, rev_nodes, rev_version
        )
    }
}
/// Installs freshly built forward and reverse CSRs into `store`, then resets
/// the shared buffer counters.
///
/// Steps, in order:
/// 1. Read the checkpoint statistics and the edge/node counts of both new
///    CSRs (they are reported in the returned [`SwapResult`]).
/// 2. Obtain mutable access to the forward side, then to the reverse side.
/// 3. Compare the checkpoint against each side's current CSR version, delta
///    count and sequence counter; bail out before touching either CSR if the
///    checkpoint reports a concurrent modification.
/// 4. Swap the forward CSR and clear its delta, then do the same for reverse.
/// 5. Release both handles and call `buffer_state.try_reset_to_zero()`.
///
/// # Errors
///
/// * [`CompactionError::ConcurrentModification`] when the checkpoint no longer
///   matches the store. The reported direction is `Forward` if the forward
///   checkpoint reports a change, otherwise `Reverse`. Nothing has been
///   swapped in this case.
/// * [`CompactionError::CounterReconcileFailed`] when the counter reset fails.
///   Both CSRs have already been swapped at that point, which the error
///   records via `forward_swapped` / `reverse_swapped`.
pub fn swap_bidirectional_csr(
    store: &BidirectionalEdgeStore,
    buffer_state: &SharedBufferState,
    input: SwapInput,
) -> Result<SwapResult, CompactionError> {
    let snapshot = &input.checkpoint;

    // Gather everything the result reports while `input` is still intact;
    // the CSRs are moved into the store further down.
    let pre_swap_stats = snapshot.stats();
    let (fwd_edges, fwd_nodes) = (
        input.forward_csr.edge_count(),
        input.forward_csr.node_count(),
    );
    let (rev_edges, rev_nodes) = (
        input.reverse_csr.edge_count(),
        input.reverse_csr.node_count(),
    );

    // Forward is always acquired before reverse.
    let mut fwd = store.forward_mut();
    let mut rev = store.reverse_mut();

    let modified = snapshot.has_concurrent_modification(
        fwd.csr_version(),
        fwd.delta_count(),
        fwd.seq_counter(),
        rev.csr_version(),
        rev.delta_count(),
        rev.seq_counter(),
    );
    if modified {
        // Attribute the conflict to the forward side when its checkpoint
        // reports a change; otherwise blame the reverse side.
        let forward_changed =
            snapshot
                .forward
                .has_changed(fwd.csr_version(), fwd.delta_count(), fwd.seq_counter());
        let (direction, expected_seq, actual_seq) = if forward_changed {
            (
                Direction::Forward,
                snapshot.forward.seq_counter,
                fwd.seq_counter(),
            )
        } else {
            (
                Direction::Reverse,
                snapshot.reverse.seq_counter,
                rev.seq_counter(),
            )
        };
        return Err(CompactionError::ConcurrentModification {
            expected_seq,
            actual_seq,
            direction,
        });
    }

    // The previous forward CSR and tombstones are kept alive in locals so
    // they are only dropped after both handles have been released.
    // NOTE(review): the reverse side uses plain `swap_csr`, so only the
    // forward side's old data is deferred — confirm the asymmetry is intended.
    let (stale_forward_csr, stale_forward_tombstones, forward_csr_version) =
        fwd.swap_csr_returning_old(input.forward_csr);
    fwd.clear_delta();

    rev.swap_csr(input.reverse_csr);
    rev.clear_delta();
    let reverse_csr_version = rev.csr_version();

    // Release in acquisition order before touching the buffer counters.
    drop(fwd);
    drop(rev);

    buffer_state.try_reset_to_zero().map_err(|active_guards| {
        log::warn!(
            "Counter reset failed with {active_guards} active guards after successful CSR swap"
        );
        CompactionError::CounterReconcileFailed {
            active_guards,
            forward_swapped: true,
            reverse_swapped: true,
        }
    })?;

    drop(stale_forward_csr);
    drop(stale_forward_tombstones);

    Ok(SwapResult {
        pre_swap_stats,
        forward_edge_count: fwd_edges,
        forward_node_count: fwd_nodes,
        reverse_edge_count: rev_edges,
        reverse_node_count: rev_nodes,
        forward_csr_version,
        reverse_csr_version,
    })
}
/// Replaces the CSR of a single [`EdgeStore`] and clears its delta, provided
/// the store still matches `checkpoint`.
///
/// `direction` is not used to select anything; it is only copied into the
/// error so the caller can tell which side the store represents.
///
/// # Errors
///
/// Returns [`CompactionError::ConcurrentModification`] — carrying the
/// checkpoint's sequence counter and the store's current one — when
/// `checkpoint.has_changed` reports a difference against the store's current
/// CSR version, delta count and sequence counter. The store is not swapped
/// in that case.
pub fn swap_single_csr(
    store: &mut EdgeStore,
    new_csr: CsrGraph,
    checkpoint: &EdgeStoreCheckpoint,
    direction: Direction,
) -> Result<(), CompactionError> {
    let store_moved_on = checkpoint.has_changed(
        store.csr_version(),
        store.delta_count(),
        store.seq_counter(),
    );

    if !store_moved_on {
        store.swap_csr(new_csr);
        store.clear_delta();
        return Ok(());
    }

    Err(CompactionError::ConcurrentModification {
        expected_seq: checkpoint.seq_counter,
        actual_seq: store.seq_counter(),
        direction,
    })
}
#[cfg(test)]
mod tests {
    use super::super::super::edge::EdgeKind;
    use super::super::super::file::FileId;
    use super::super::super::node::NodeId;
    use super::super::checkpoint::CounterCheckpoint;
    use super::*;

    /// The one edge kind used by every edge created in these tests.
    fn plain_call() -> EdgeKind {
        EdgeKind::Calls {
            argument_count: 0,
            is_async: false,
        }
    }

    /// Builds a CSR with `node_count` nodes and one call edge per
    /// `(source, target)` pair.
    fn create_test_csr(node_count: usize, edges: &[(u32, u32)]) -> CsrGraph {
        use super::super::super::storage::CsrBuilder;

        let mut builder = CsrBuilder::new(node_count);
        for &(src, tgt) in edges {
            builder
                .add_edge(src, NodeId::new(tgt, 0), plain_call(), 1, vec![])
                .unwrap();
        }
        builder.build().unwrap()
    }

    /// Checkpoint whose every component is zero, matching a brand-new store.
    fn create_empty_checkpoint() -> CompactionCheckpoint {
        CompactionCheckpoint::new(
            EdgeStoreCheckpoint::new(0, 0, 0, 0, 0),
            EdgeStoreCheckpoint::new(0, 0, 0, 0, 0),
            CounterCheckpoint::new(0, 0, 0, 0),
        )
    }

    /// Builds a `SwapInput` from edge lists; the forward CSR is built first,
    /// then the reverse CSR, both with `node_count` nodes.
    fn swap_input(
        node_count: usize,
        forward_edges: &[(u32, u32)],
        reverse_edges: &[(u32, u32)],
        checkpoint: CompactionCheckpoint,
    ) -> SwapInput {
        let forward_csr = create_test_csr(node_count, forward_edges);
        let reverse_csr = create_test_csr(node_count, reverse_edges);
        SwapInput::new(forward_csr, reverse_csr, checkpoint)
    }

    #[test]
    fn test_swap_input_new() {
        let forward_csr = create_test_csr(3, &[(0, 1), (1, 2)]);
        let reverse_csr = create_test_csr(3, &[(1, 0), (2, 1)]);
        let checkpoint = create_empty_checkpoint();

        let input = SwapInput::new(forward_csr, reverse_csr, checkpoint);

        assert_eq!(input.forward_csr.edge_count(), 2);
        assert_eq!(input.reverse_csr.edge_count(), 2);
    }

    #[test]
    fn test_swap_result_display() {
        let rendered = SwapResult {
            pre_swap_stats: CheckpointStats::default(),
            forward_edge_count: 10,
            forward_node_count: 5,
            reverse_edge_count: 10,
            reverse_node_count: 5,
            forward_csr_version: 1,
            reverse_csr_version: 1,
        }
        .to_string();

        assert!(rendered.contains("10 edges"));
        assert!(rendered.contains("5 nodes"));
    }

    #[test]
    fn test_swap_bidirectional_success() {
        let store = BidirectionalEdgeStore::new();
        let buffer_state = SharedBufferState::new();
        let input = swap_input(
            3,
            &[(0, 1), (1, 2)],
            &[(1, 0), (2, 1)],
            create_empty_checkpoint(),
        );

        let outcome = swap_bidirectional_csr(&store, &buffer_state, input).unwrap();

        assert_eq!(outcome.forward_edge_count, 2);
        assert_eq!(outcome.reverse_edge_count, 2);
        assert_eq!(outcome.forward_csr_version, 1);
        assert_eq!(outcome.reverse_csr_version, 1);
    }

    #[test]
    fn test_swap_bidirectional_concurrent_modification() {
        let store = BidirectionalEdgeStore::new();
        let buffer_state = SharedBufferState::new();

        // Mutate the store first so the all-zero checkpoint is already stale.
        store.add_edge(
            NodeId::new(0, 0),
            NodeId::new(1, 0),
            plain_call(),
            FileId::new(1),
        );

        let input = swap_input(3, &[(0, 1)], &[(1, 0)], create_empty_checkpoint());
        let outcome = swap_bidirectional_csr(&store, &buffer_state, input);

        assert!(matches!(
            outcome,
            Err(CompactionError::ConcurrentModification { .. })
        ));
    }

    #[test]
    fn test_swap_single_csr_success() {
        let mut store = EdgeStore::new();
        let checkpoint = EdgeStoreCheckpoint::new(0, 0, 0, 0, 0);
        let new_csr = create_test_csr(3, &[(0, 1), (1, 2)]);

        let outcome = swap_single_csr(&mut store, new_csr, &checkpoint, Direction::Forward);

        assert!(outcome.is_ok());
        assert_eq!(store.csr_version(), 1);
    }

    #[test]
    fn test_swap_single_csr_concurrent_modification() {
        let mut store = EdgeStore::new();

        // Mutate the store first so the all-zero checkpoint is already stale.
        store.add_edge(
            NodeId::new(0, 0),
            NodeId::new(1, 0),
            plain_call(),
            FileId::new(1),
        );

        let checkpoint = EdgeStoreCheckpoint::new(0, 0, 0, 0, 0);
        let new_csr = create_test_csr(3, &[(0, 1)]);
        let outcome = swap_single_csr(&mut store, new_csr, &checkpoint, Direction::Forward);

        assert!(matches!(
            outcome,
            Err(CompactionError::ConcurrentModification { .. })
        ));
    }

    #[test]
    fn test_swap_clears_deltas() {
        let store = BidirectionalEdgeStore::new();
        let buffer_state = SharedBufferState::new();
        let input = swap_input(2, &[(0, 1)], &[(1, 0)], create_empty_checkpoint());

        swap_bidirectional_csr(&store, &buffer_state, input).unwrap();

        assert_eq!(store.forward().delta_count(), 0);
        assert_eq!(store.reverse().delta_count(), 0);
    }

    #[test]
    fn test_swap_resets_counters() {
        let store = BidirectionalEdgeStore::new();
        let buffer_state = SharedBufferState::new();
        let input = swap_input(2, &[(0, 1)], &[(1, 0)], create_empty_checkpoint());

        swap_bidirectional_csr(&store, &buffer_state, input).unwrap();

        let counters = buffer_state.snapshot();
        assert_eq!(counters.committed_bytes, 0);
        assert_eq!(counters.committed_ops, 0);
    }

    #[test]
    fn test_swap_with_existing_data() {
        let store = BidirectionalEdgeStore::new();
        let buffer_state = SharedBufferState::new();

        // First swap: takes the store from its initial state to CSR version 1.
        {
            let first = swap_input(3, &[(0, 1)], &[(1, 0)], create_empty_checkpoint());
            swap_bidirectional_csr(&store, &buffer_state, first).unwrap();
        }

        // Second swap: the checkpoint must now carry version 1 on both sides.
        let checkpoint = CompactionCheckpoint::new(
            EdgeStoreCheckpoint::new(1, 0, 0, 0, 0),
            EdgeStoreCheckpoint::new(1, 0, 0, 0, 0),
            CounterCheckpoint::new(0, 0, 0, 0),
        );
        let second = swap_input(
            4,
            &[(0, 1), (1, 2), (2, 3)],
            &[(1, 0), (2, 1), (3, 2)],
            checkpoint,
        );
        let outcome = swap_bidirectional_csr(&store, &buffer_state, second).unwrap();

        assert_eq!(outcome.forward_edge_count, 3);
        assert_eq!(outcome.reverse_edge_count, 3);
        assert_eq!(outcome.forward_csr_version, 2);
    }

    #[test]
    fn test_pre_swap_stats_preserved() {
        let store = BidirectionalEdgeStore::new();
        let buffer_state = SharedBufferState::new();

        // NOTE(review): this populated checkpoint is built but never handed
        // to the swap, so the assertion below only covers the empty
        // checkpoint — confirm whether that is the intended coverage.
        let _unused_populated = CompactionCheckpoint::from_components(
            0, 50, 2500, 20, 5, 0, 60, 3000, 25, 8, 100, 10, 50, 5,
        );

        let input = swap_input(2, &[(0, 1)], &[(1, 0)], create_empty_checkpoint());
        let outcome = swap_bidirectional_csr(&store, &buffer_state, input).unwrap();

        assert_eq!(outcome.pre_swap_stats.total_delta_edges(), 0);
    }
}