use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use super::build::CompactionSnapshot;
use super::errors::{CompactionError, InterruptReason};
use super::merge::{MergeStats, MergedEdge, merge_delta_edges};
use crate::graph::unified::edge::DeltaEdge;
/// Default number of delta edges processed per chunk between cancellation
/// checks and `on_yield` callbacks.
pub const DEFAULT_CHUNK_SIZE: usize = 10_000;
/// Thread-safe cancellation flag shared between a compaction worker and
/// whoever wants to interrupt it.
///
/// Cloning is cheap: every clone shares the same underlying `AtomicBool`,
/// so cancelling through one handle is visible through all of them.
#[derive(Debug, Clone, Default)]
pub struct CancellationToken {
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    /// Creates a fresh token in the "not cancelled" state.
    #[must_use]
    pub fn new() -> Self {
        // `AtomicBool::default()` is `false`, so the derived Default is
        // exactly the "not cancelled" state.
        Self::default()
    }

    /// Requests cancellation; observed by all clones of this token.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::Release);
    }

    /// Returns `true` once `cancel` has been called (and published).
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }

    /// Clears the flag so the token can be reused for another run.
    pub fn reset(&self) {
        self.cancelled.store(false, Ordering::Release);
    }
}
/// Tuning knobs for interruptible compaction.
#[derive(Debug, Clone, Copy)]
pub struct InterruptibleConfig {
    /// Number of delta edges handled between cancellation checks / yields.
    pub chunk_size: usize,
    /// When `false`, the cancellation token is never consulted.
    pub check_cancellation: bool,
}

impl Default for InterruptibleConfig {
    fn default() -> Self {
        Self {
            chunk_size: DEFAULT_CHUNK_SIZE,
            check_cancellation: true,
        }
    }
}

impl InterruptibleConfig {
    /// Builds a config with a custom chunk size, clamped to at least 1.
    #[must_use]
    pub fn with_chunk_size(chunk_size: usize) -> Self {
        // A chunk size of 0 would make chunking meaningless (and panic
        // downstream), so clamp before storing.
        let chunk_size = chunk_size.max(1);
        Self {
            chunk_size,
            ..Self::default()
        }
    }

    /// Disables cancellation-token polling for this run.
    #[must_use]
    pub fn without_cancellation_check(self) -> Self {
        Self {
            check_cancellation: false,
            ..self
        }
    }
}
/// Point-in-time progress report passed to the `on_yield` callback.
#[derive(Debug, Clone, Copy)]
pub struct CompactionProgress {
    pub total_edges: usize,
    pub edges_processed: usize,
    pub current_chunk: usize,
    pub total_chunks: usize,
}

impl CompactionProgress {
    /// Percentage of edges processed so far, clamped to `0..=100`.
    /// An empty workload reports 100% (vacuously done).
    #[must_use]
    pub fn percent_complete(&self) -> u8 {
        match self.total_edges {
            0 => 100,
            total => {
                let pct = self.edges_processed.saturating_mul(100) / total;
                // `pct.min(100)` always fits in a u8; the fallback is
                // unreachable but keeps the conversion panic-free.
                u8::try_from(pct.min(100)).unwrap_or(u8::MAX)
            }
        }
    }

    /// True once every edge has been processed.
    #[must_use]
    pub fn is_complete(&self) -> bool {
        self.edges_processed >= self.total_edges
    }
}

impl fmt::Display for CompactionProgress {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let pct = self.percent_complete();
        write!(
            f,
            "chunk {}/{}: {}/{} edges ({}%)",
            self.current_chunk, self.total_chunks, self.edges_processed, self.total_edges, pct
        )
    }
}
/// Outcome of an interruptible compaction run.
#[derive(Debug)]
pub struct InterruptibleResult {
    /// Edges surviving the merge.
    pub merged_edges: Vec<MergedEdge>,
    /// Aggregate merge statistics for the whole run.
    pub merge_stats: MergeStats,
    /// Number of chunks that were fully processed.
    pub chunks_processed: usize,
    /// Whether the run was cut short by cancellation.
    pub was_cancelled: bool,
}

impl InterruptibleResult {
    /// True when the run finished without being cancelled.
    #[must_use]
    pub fn is_complete(&self) -> bool {
        !self.was_cancelled
    }
}
/// Shared counters for interruptible compaction runs.
///
/// All counters use `Relaxed` atomics: they are monotonic tallies for
/// reporting, not synchronization points.
#[derive(Debug, Default)]
pub struct InterruptibleStats {
    pub started: AtomicU64,
    pub completed: AtomicU64,
    pub cancelled: AtomicU64,
    pub total_chunks: AtomicU64,
}

impl InterruptibleStats {
    /// Creates zeroed counters.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Counts one compaction attempt.
    pub fn record_start(&self) {
        self.started.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a successful run and the chunks it processed.
    pub fn record_complete(&self, chunks: usize) {
        self.completed.fetch_add(1, Ordering::Relaxed);
        self.total_chunks.fetch_add(chunks as u64, Ordering::Relaxed);
    }

    /// Counts a cancelled run and the chunks it managed to process.
    pub fn record_cancel(&self, chunks: usize) {
        self.cancelled.fetch_add(1, Ordering::Relaxed);
        self.total_chunks.fetch_add(chunks as u64, Ordering::Relaxed);
    }

    /// Copies the counters into a plain-value snapshot for reporting.
    /// Each field is loaded independently, so the snapshot is only
    /// approximately consistent under concurrent updates.
    #[must_use]
    pub fn snapshot(&self) -> InterruptibleStatsSnapshot {
        InterruptibleStatsSnapshot {
            started: self.started.load(Ordering::Relaxed),
            completed: self.completed.load(Ordering::Relaxed),
            cancelled: self.cancelled.load(Ordering::Relaxed),
            total_chunks: self.total_chunks.load(Ordering::Relaxed),
        }
    }
}

/// Plain-value copy of `InterruptibleStats` at one point in time.
#[derive(Debug, Clone, Copy, Default)]
pub struct InterruptibleStatsSnapshot {
    pub started: u64,
    pub completed: u64,
    pub cancelled: u64,
    pub total_chunks: u64,
}

impl InterruptibleStatsSnapshot {
    /// Integer percentage of `part` over `whole`, clamped to 100;
    /// `when_no_runs` is reported when nothing has started yet.
    fn ratio_percent(part: u64, whole: u64, when_no_runs: u8) -> u8 {
        if whole == 0 {
            return when_no_runs;
        }
        let rate = part.saturating_mul(100) / whole;
        // Clamped to <= 100, so the narrowing cast is lossless.
        rate.min(100) as u8
    }

    /// Percentage of started runs that completed (100 when none started).
    #[must_use]
    pub fn completion_rate(&self) -> u8 {
        Self::ratio_percent(self.completed, self.started, 100)
    }

    /// Percentage of started runs that were cancelled (0 when none started).
    #[must_use]
    pub fn cancellation_rate(&self) -> u8 {
        Self::ratio_percent(self.cancelled, self.started, 0)
    }
}
/// Compacts `snapshot.delta_edges` in chunks of `config.chunk_size`,
/// invoking `on_yield` with a progress report after each chunk and
/// honouring `token` before and between chunks.
///
/// Small inputs (at most one chunk) take a fast path: a single global merge
/// and a single yield. Larger inputs are deduplicated chunk-locally and then
/// merged globally, so Add/Remove operations that span chunk boundaries
/// still resolve against each other.
///
/// # Errors
///
/// Returns [`CompactionError::Interrupted`] when `token` is cancelled and
/// `config.check_cancellation` is set, reporting how many edges had been
/// processed at that point.
pub fn compact_interruptible<F>(
    snapshot: &CompactionSnapshot,
    token: &CancellationToken,
    config: &InterruptibleConfig,
    mut on_yield: F,
) -> Result<InterruptibleResult, CompactionError>
where
    F: FnMut(&CompactionProgress),
{
    let total_edges = snapshot.delta_edges.len();
    // Robustness fix: `chunk_size` is a public field, so callers can bypass
    // `with_chunk_size`'s clamp and hand us 0 — which would panic in
    // `div_ceil(0)` and `chunks(0)`. Clamp to at least 1 here.
    let chunk_size = config.chunk_size.max(1);
    let total_chunks = if total_edges == 0 {
        0
    } else {
        total_edges.div_ceil(chunk_size)
    };
    // Shared constructor for the interruption error at the current position.
    let interrupted = |edges_processed: usize| CompactionError::Interrupted {
        reason: InterruptReason::CancellationRequested,
        edges_processed,
        edges_total: total_edges,
    };
    if config.check_cancellation && token.is_cancelled() {
        return Err(interrupted(0));
    }
    // Fast path: the whole input fits in one chunk; merge directly and
    // report a single completed chunk (even for an empty input).
    if total_edges <= chunk_size {
        let (merged_edges, merge_stats) = merge_delta_edges(snapshot.delta_edges.clone());
        let progress = CompactionProgress {
            total_edges,
            edges_processed: total_edges,
            current_chunk: 1,
            total_chunks: 1.max(total_chunks),
        };
        on_yield(&progress);
        return Ok(InterruptibleResult {
            merged_edges,
            merge_stats,
            chunks_processed: 1,
            was_cancelled: false,
        });
    }
    // Chunked path: dedupe each chunk locally, collecting the per-key
    // winners, then merge all winners globally at the end.
    let mut all_winners: Vec<DeltaEdge> = Vec::new();
    let mut edges_processed = 0;
    let mut chunk_dedup_count = 0;
    for (chunk_idx, chunk) in snapshot.delta_edges.chunks(chunk_size).enumerate() {
        if config.check_cancellation && token.is_cancelled() {
            return Err(interrupted(edges_processed));
        }
        let chunk_input_count = chunk.len();
        let chunk_winners = dedupe_chunk_preserving_ops(chunk.to_vec());
        chunk_dedup_count += chunk_input_count - chunk_winners.len();
        all_winners.extend(chunk_winners);
        edges_processed += chunk_input_count;
        let progress = CompactionProgress {
            total_edges,
            edges_processed,
            current_chunk: chunk_idx + 1,
            total_chunks,
        };
        on_yield(&progress);
    }
    // Last check so a cancellation requested during the final yield is
    // honoured before the (potentially expensive) global merge.
    if config.check_cancellation && token.is_cancelled() {
        return Err(interrupted(edges_processed));
    }
    let (final_merged, final_stats) = merge_winners_global(all_winners);
    Ok(InterruptibleResult {
        merged_edges: final_merged,
        // Stats are reported against the original input: chunk-local dedup
        // and the global merge both contribute to the deduplicated count.
        merge_stats: MergeStats {
            input_count: total_edges,
            output_count: final_stats.output_count,
            deduplicated_count: chunk_dedup_count + final_stats.deduplicated_count,
            removed_count: final_stats.removed_count,
        },
        chunks_processed: total_chunks,
        was_cancelled: false,
    })
}
/// Deduplicates one chunk of delta edges, keeping for each edge key only the
/// entry with the highest `seq` (so a later Remove/Add "wins" within the
/// chunk) while preserving its op for the later global merge.
///
/// Entries are sorted by (source, target, kind) with `seq` descending, so
/// equal-key entries are adjacent with the winner first; `dedup_by` then
/// keeps that first entry.
fn dedupe_chunk_preserving_ops(mut edges: Vec<DeltaEdge>) -> Vec<DeltaEdge> {
    use std::cmp::Reverse;
    if edges.is_empty() {
        return vec![];
    }
    // Perf fix: the previous comparator rebuilt both edge keys and allocated
    // two `format!("{:?}", kind)` strings on *every* comparison — O(n log n)
    // allocations. `sort_by_cached_key` computes each key exactly once.
    // The tuple reproduces the old comparison order exactly, with
    // `Reverse(seq)` giving the same descending-seq tie-break; both sorts
    // are stable, so equal-key/equal-seq order is also unchanged.
    edges.sort_by_cached_key(|e| {
        (
            e.source.index(),
            e.source.generation(),
            e.target.index(),
            e.target.generation(),
            // NOTE(review): kind ordering via its Debug rendering is kept
            // from the original; assumes distinct kinds have distinct Debug
            // output — confirm if EdgeKind ever derives identical text.
            format!("{:?}", e.edge_key().kind),
            Reverse(e.seq),
        )
    });
    // Equal keys are adjacent; keep the first (highest seq) of each run.
    edges.dedup_by(|a, b| a.edge_key() == b.edge_key());
    edges
}
/// Final global merge over the per-chunk winners; resolves cross-chunk
/// Add/Remove pairs via the shared merge routine.
fn merge_winners_global(all_winners: Vec<DeltaEdge>) -> (Vec<MergedEdge>, MergeStats) {
    merge_delta_edges(all_winners)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::unified::edge::{DeltaEdge, DeltaOp, EdgeKind};
    use crate::graph::unified::file::FileId;
    use crate::graph::unified::node::NodeId;

    // Builds a `Calls` delta edge src -> tgt with the given sequence number;
    // `is_remove` selects a Remove op over an Add.
    fn make_delta_edge(src: u32, tgt: u32, seq: u64, is_remove: bool) -> DeltaEdge {
        DeltaEdge {
            source: NodeId::new(src, 0),
            target: NodeId::new(tgt, 0),
            kind: EdgeKind::Calls {
                argument_count: 0,
                is_async: false,
            },
            seq,
            op: if is_remove {
                DeltaOp::Remove
            } else {
                DeltaOp::Add
            },
            file: FileId::new(0),
            spans: vec![],
        }
    }

    // Wraps delta edges in a snapshot with no CSR edges.
    fn make_snapshot(delta_edges: Vec<DeltaEdge>) -> CompactionSnapshot {
        CompactionSnapshot {
            csr_edges: Vec::new(),
            delta_edges,
            node_count: 100,
            csr_version: 0,
        }
    }

    // --- CancellationToken ---

    #[test]
    fn test_cancellation_token_default() {
        let token = CancellationToken::default();
        assert!(!token.is_cancelled());
    }

    #[test]
    fn test_cancellation_token_cancel() {
        let token = CancellationToken::new();
        assert!(!token.is_cancelled());
        token.cancel();
        assert!(token.is_cancelled());
    }

    #[test]
    fn test_cancellation_token_reset() {
        let token = CancellationToken::new();
        token.cancel();
        assert!(token.is_cancelled());
        token.reset();
        assert!(!token.is_cancelled());
    }

    // Clones share the same underlying flag.
    #[test]
    fn test_cancellation_token_clone() {
        let token1 = CancellationToken::new();
        let token2 = token1.clone();
        token1.cancel();
        assert!(token2.is_cancelled());
    }

    // --- InterruptibleConfig ---

    #[test]
    fn test_config_default() {
        let config = InterruptibleConfig::default();
        assert_eq!(config.chunk_size, DEFAULT_CHUNK_SIZE);
        assert!(config.check_cancellation);
    }

    #[test]
    fn test_config_custom_chunk_size() {
        let config = InterruptibleConfig::with_chunk_size(5000);
        assert_eq!(config.chunk_size, 5000);
    }

    // A zero chunk size is clamped to 1 by the builder.
    #[test]
    fn test_config_min_chunk_size() {
        let config = InterruptibleConfig::with_chunk_size(0);
        assert_eq!(config.chunk_size, 1);
    }

    #[test]
    fn test_config_without_cancellation() {
        let config = InterruptibleConfig::default().without_cancellation_check();
        assert!(!config.check_cancellation);
    }

    // --- CompactionProgress ---

    // Empty workloads report 100% and are vacuously complete.
    #[test]
    fn test_progress_percent_empty() {
        let progress = CompactionProgress {
            total_edges: 0,
            edges_processed: 0,
            current_chunk: 1,
            total_chunks: 0,
        };
        assert_eq!(progress.percent_complete(), 100);
        assert!(progress.is_complete());
    }

    #[test]
    fn test_progress_percent_partial() {
        let progress = CompactionProgress {
            total_edges: 100,
            edges_processed: 50,
            current_chunk: 1,
            total_chunks: 2,
        };
        assert_eq!(progress.percent_complete(), 50);
        assert!(!progress.is_complete());
    }

    #[test]
    fn test_progress_display() {
        let progress = CompactionProgress {
            total_edges: 100,
            edges_processed: 50,
            current_chunk: 1,
            total_chunks: 2,
        };
        let display = format!("{progress}");
        assert!(display.contains("1/2"));
        assert!(display.contains("50/100"));
        assert!(display.contains("50%"));
    }

    // --- compact_interruptible ---

    // The fast path yields exactly once, even for an empty snapshot.
    #[test]
    fn test_compact_empty_snapshot() {
        let snapshot = make_snapshot(vec![]);
        let token = CancellationToken::new();
        let config = InterruptibleConfig::default();
        let mut yields = 0;
        let result = compact_interruptible(&snapshot, &token, &config, |_| {
            yields += 1;
        })
        .unwrap();
        assert!(result.is_complete());
        assert!(result.merged_edges.is_empty());
        assert_eq!(yields, 1);
    }

    #[test]
    fn test_compact_small_snapshot() {
        let snapshot = make_snapshot(vec![
            make_delta_edge(0, 1, 1, false),
            make_delta_edge(1, 2, 2, false),
        ]);
        let token = CancellationToken::new();
        let config = InterruptibleConfig::default();
        let mut yields = 0;
        let result = compact_interruptible(&snapshot, &token, &config, |_| {
            yields += 1;
        })
        .unwrap();
        assert!(result.is_complete());
        assert_eq!(result.merged_edges.len(), 2);
        assert_eq!(yields, 1);
    }

    // 100 edges with chunk size 30 -> 4 chunks -> at least 3 yields.
    #[test]
    fn test_compact_with_chunks() {
        let mut edges = Vec::new();
        for i in 0..100 {
            edges.push(make_delta_edge(i, i + 1, u64::from(i), false));
        }
        let snapshot = make_snapshot(edges);
        let token = CancellationToken::new();
        let config = InterruptibleConfig::with_chunk_size(30);
        let mut yield_count = 0;
        let mut last_progress = None;
        let result = compact_interruptible(&snapshot, &token, &config, |progress| {
            yield_count += 1;
            last_progress = Some(*progress);
        })
        .unwrap();
        assert!(result.is_complete());
        assert_eq!(result.merged_edges.len(), 100);
        assert!(yield_count >= 3);
        assert!(last_progress.unwrap().is_complete());
    }

    // Cancelling before the run starts reports zero edges processed.
    #[test]
    fn test_compact_early_cancellation() {
        let snapshot = make_snapshot(vec![make_delta_edge(0, 1, 1, false)]);
        let token = CancellationToken::new();
        token.cancel();
        let config = InterruptibleConfig::default();
        let result = compact_interruptible(&snapshot, &token, &config, |_| {});
        match result {
            Err(CompactionError::Interrupted {
                reason: InterruptReason::CancellationRequested,
                edges_processed: 0,
                ..
            }) => {}
            _ => panic!("expected Interrupted error"),
        }
    }

    // Cancelling from the yield callback stops the run mid-way and reports
    // a partial (non-zero, non-total) processed count.
    #[test]
    fn test_compact_mid_cancellation() {
        let mut edges = Vec::new();
        for i in 0..100 {
            edges.push(make_delta_edge(i, i + 1, u64::from(i), false));
        }
        let snapshot = make_snapshot(edges);
        let token = CancellationToken::new();
        let config = InterruptibleConfig::with_chunk_size(10);
        let result = compact_interruptible(&snapshot, &token, &config, |progress| {
            if progress.current_chunk == 2 {
                token.cancel();
            }
        });
        match result {
            Err(CompactionError::Interrupted {
                reason: InterruptReason::CancellationRequested,
                edges_processed,
                ..
            }) => {
                assert!(edges_processed > 0);
                assert!(edges_processed < 100);
            }
            _ => panic!("expected Interrupted error"),
        }
    }

    // A later Remove (higher seq) cancels the earlier Add of the same edge.
    #[test]
    fn test_compact_with_removes() {
        let snapshot = make_snapshot(vec![
            make_delta_edge(0, 1, 1, false),
            make_delta_edge(0, 1, 2, true),
            make_delta_edge(1, 2, 3, false),
        ]);
        let token = CancellationToken::new();
        let config = InterruptibleConfig::default();
        let result = compact_interruptible(&snapshot, &token, &config, |_| {}).unwrap();
        assert_eq!(result.merged_edges.len(), 1);
        assert_eq!(result.merged_edges[0].source.index(), 1);
        assert_eq!(result.merged_edges[0].target.index(), 2);
    }

    // --- InterruptibleStats ---

    #[test]
    fn test_stats_tracking() {
        let stats = InterruptibleStats::new();
        assert_eq!(stats.snapshot().started, 0);
        stats.record_start();
        stats.record_start();
        assert_eq!(stats.snapshot().started, 2);
        stats.record_complete(5);
        assert_eq!(stats.snapshot().completed, 1);
        assert_eq!(stats.snapshot().total_chunks, 5);
        stats.record_cancel(3);
        assert_eq!(stats.snapshot().cancelled, 1);
        assert_eq!(stats.snapshot().total_chunks, 8);
    }

    #[test]
    fn test_stats_rates() {
        let mut snapshot = InterruptibleStatsSnapshot::default();
        assert_eq!(snapshot.completion_rate(), 100);
        assert_eq!(snapshot.cancellation_rate(), 0);
        snapshot.started = 10;
        snapshot.completed = 7;
        snapshot.cancelled = 3;
        assert_eq!(snapshot.completion_rate(), 70);
        assert_eq!(snapshot.cancellation_rate(), 30);
    }

    #[test]
    fn test_interruptible_result_is_complete() {
        let result = InterruptibleResult {
            merged_edges: vec![],
            merge_stats: MergeStats::default(),
            chunks_processed: 1,
            was_cancelled: false,
        };
        assert!(result.is_complete());
        let result = InterruptibleResult {
            merged_edges: vec![],
            merge_stats: MergeStats::default(),
            chunks_processed: 1,
            was_cancelled: true,
        };
        assert!(!result.is_complete());
    }

    // With the check disabled, a cancelled token is ignored entirely.
    #[test]
    fn test_no_cancellation_check() {
        let snapshot = make_snapshot(vec![make_delta_edge(0, 1, 1, false)]);
        let token = CancellationToken::new();
        token.cancel();
        let config = InterruptibleConfig::default().without_cancellation_check();
        let result = compact_interruptible(&snapshot, &token, &config, |_| {}).unwrap();
        assert!(result.is_complete());
    }

    // --- cross-chunk semantics ---

    // A Remove in a later chunk must cancel an Add from an earlier chunk.
    #[test]
    fn test_cross_chunk_remove_cancels_add() {
        let mut edges = Vec::new();
        for i in 0..5 {
            edges.push(make_delta_edge(i, i + 1, u64::from(i + 1), false));
        }
        edges.push(make_delta_edge(0, 1, 100, true));
        for i in 10..14 {
            edges.push(make_delta_edge(i, i + 1, u64::from(i + 1), false));
        }
        let snapshot = make_snapshot(edges);
        let token = CancellationToken::new();
        let config = InterruptibleConfig::with_chunk_size(5);
        let result = compact_interruptible(&snapshot, &token, &config, |_| {}).unwrap();
        assert!(result.is_complete());
        let has_edge_0_1 = result
            .merged_edges
            .iter()
            .any(|e| e.source.index() == 0 && e.target.index() == 1);
        assert!(
            !has_edge_0_1,
            "Edge 0->1 should be removed by cross-chunk Remove operation"
        );
        assert_eq!(result.merged_edges.len(), 8);
        assert_eq!(result.merge_stats.removed_count, 1);
    }

    // Conversely, a higher-seq Add in a later chunk beats an earlier Remove.
    #[test]
    fn test_cross_chunk_add_wins_over_remove() {
        let mut edges = Vec::new();
        edges.push(make_delta_edge(0, 1, 1, true));
        for i in 1..5 {
            edges.push(make_delta_edge(i, i + 1, u64::from(i + 1), false));
        }
        edges.push(make_delta_edge(0, 1, 100, false));
        for i in 10..14 {
            edges.push(make_delta_edge(i, i + 1, u64::from(i + 1), false));
        }
        let snapshot = make_snapshot(edges);
        let token = CancellationToken::new();
        let config = InterruptibleConfig::with_chunk_size(5);
        let result = compact_interruptible(&snapshot, &token, &config, |_| {}).unwrap();
        let edge_0_1 = result
            .merged_edges
            .iter()
            .find(|e| e.source.index() == 0 && e.target.index() == 1);
        assert!(
            edge_0_1.is_some(),
            "Edge 0->1 should be present (Add with higher seq won)"
        );
        assert_eq!(edge_0_1.unwrap().seq, 100);
    }
}