// sqry_core/graph/unified/compaction/interruptible.rs

//! Interruptible Compaction: Chunk-based compaction with yield points.
//!
//! This module implements interruptible compaction to ensure LSP
//! responsiveness during compaction operations.
//!
//! # Design
//!
//! - **Chunk-based processing**: Process edges in chunks of 10K (configurable)
//! - **Yield points**: Callback invoked between chunks for cooperative scheduling
//! - **Cancellation support**: Atomic flag to request compaction abort
//! - **Progress tracking**: Monitor compaction progress
//!
//! # Example
//!
//! ```rust,ignore
//! use sqry_core::graph::unified::compaction::interruptible::{
//!     CancellationToken, InterruptibleConfig, compact_interruptible,
//! };
//!
//! let token = CancellationToken::new();
//! let config = InterruptibleConfig::default();
//!
//! // In another thread, can cancel:
//! // token.cancel();
//!
//! let result = compact_interruptible(
//!     &snapshot,
//!     &token,
//!     &config,
//!     |progress| {
//!         // Yield point - release locks, check interrupts
//!         println!("Progress: {}%", progress.percent_complete());
//!     },
//! );
//! ```

use std::cmp::Reverse;
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

use super::build::CompactionSnapshot;
use super::errors::{CompactionError, InterruptReason};
use super::merge::{MergeStats, MergedEdge, merge_delta_edges};
use crate::graph::unified::edge::DeltaEdge;
46
47/// Default chunk size: 10,000 edges per chunk.
48pub const DEFAULT_CHUNK_SIZE: usize = 10_000;
49
50/// Cancellation token for interruptible operations.
51///
52/// Thread-safe flag that signals compaction should be cancelled.
53/// Can be cloned and shared across threads.
54#[derive(Debug, Clone)]
55pub struct CancellationToken {
56    cancelled: Arc<AtomicBool>,
57}
58
59impl Default for CancellationToken {
60    fn default() -> Self {
61        Self::new()
62    }
63}
64
65impl CancellationToken {
66    /// Creates a new cancellation token (not cancelled).
67    #[must_use]
68    pub fn new() -> Self {
69        Self {
70            cancelled: Arc::new(AtomicBool::new(false)),
71        }
72    }
73
74    /// Requests cancellation.
75    pub fn cancel(&self) {
76        self.cancelled.store(true, Ordering::Release);
77    }
78
79    /// Checks if cancellation was requested.
80    #[must_use]
81    pub fn is_cancelled(&self) -> bool {
82        self.cancelled.load(Ordering::Acquire)
83    }
84
85    /// Resets the token to non-cancelled state.
86    pub fn reset(&self) {
87        self.cancelled.store(false, Ordering::Release);
88    }
89}
90
91/// Configuration for interruptible compaction.
92#[derive(Debug, Clone, Copy)]
93pub struct InterruptibleConfig {
94    /// Number of edges to process per chunk.
95    /// Default: 10,000 (10K)
96    pub chunk_size: usize,
97
98    /// Whether to check cancellation between chunks.
99    /// Default: true
100    pub check_cancellation: bool,
101}
102
103impl Default for InterruptibleConfig {
104    fn default() -> Self {
105        Self {
106            chunk_size: DEFAULT_CHUNK_SIZE,
107            check_cancellation: true,
108        }
109    }
110}
111
112impl InterruptibleConfig {
113    /// Creates config with custom chunk size.
114    #[must_use]
115    pub fn with_chunk_size(chunk_size: usize) -> Self {
116        Self {
117            chunk_size: chunk_size.max(1), // At least 1
118            ..Default::default()
119        }
120    }
121
122    /// Disables cancellation checking (for benchmarking).
123    #[must_use]
124    pub fn without_cancellation_check(mut self) -> Self {
125        self.check_cancellation = false;
126        self
127    }
128}
129
130/// Progress information passed to yield callback.
131#[derive(Debug, Clone, Copy)]
132pub struct CompactionProgress {
133    /// Total number of edges to process.
134    pub total_edges: usize,
135    /// Number of edges processed so far.
136    pub edges_processed: usize,
137    /// Current chunk number (1-indexed).
138    pub current_chunk: usize,
139    /// Total number of chunks.
140    pub total_chunks: usize,
141}
142
143impl CompactionProgress {
144    /// Returns completion percentage (0-100).
145    #[must_use]
146    pub fn percent_complete(&self) -> u8 {
147        if self.total_edges == 0 {
148            return 100;
149        }
150        let pct = (self.edges_processed.saturating_mul(100)) / self.total_edges;
151        u8::try_from(pct.min(100)).unwrap_or(u8::MAX)
152    }
153
154    /// Returns true if compaction is complete.
155    #[must_use]
156    pub fn is_complete(&self) -> bool {
157        self.edges_processed >= self.total_edges
158    }
159}
160
161impl fmt::Display for CompactionProgress {
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163        write!(
164            f,
165            "chunk {}/{}: {}/{} edges ({}%)",
166            self.current_chunk,
167            self.total_chunks,
168            self.edges_processed,
169            self.total_edges,
170            self.percent_complete()
171        )
172    }
173}
174
175/// Result of interruptible compaction.
176#[derive(Debug)]
177pub struct InterruptibleResult {
178    /// Merged edges (surviving Add operations).
179    pub merged_edges: Vec<MergedEdge>,
180    /// Statistics about the merge process.
181    pub merge_stats: MergeStats,
182    /// Number of chunks processed.
183    pub chunks_processed: usize,
184    /// Whether compaction was cancelled.
185    pub was_cancelled: bool,
186}
187
188impl InterruptibleResult {
189    /// Returns true if compaction completed fully.
190    #[must_use]
191    pub fn is_complete(&self) -> bool {
192        !self.was_cancelled
193    }
194}
195
196/// Statistics tracker for interruptible operations.
197#[derive(Debug, Default)]
198pub struct InterruptibleStats {
199    /// Number of compactions started.
200    pub started: AtomicU64,
201    /// Number of compactions completed.
202    pub completed: AtomicU64,
203    /// Number of compactions cancelled.
204    pub cancelled: AtomicU64,
205    /// Total chunks processed across all compactions.
206    pub total_chunks: AtomicU64,
207}
208
209impl InterruptibleStats {
210    /// Creates new stats tracker.
211    #[must_use]
212    pub fn new() -> Self {
213        Self::default()
214    }
215
216    /// Records a compaction start.
217    pub fn record_start(&self) {
218        self.started.fetch_add(1, Ordering::Relaxed);
219    }
220
221    /// Records a compaction completion.
222    pub fn record_complete(&self, chunks: usize) {
223        self.completed.fetch_add(1, Ordering::Relaxed);
224        self.total_chunks
225            .fetch_add(chunks as u64, Ordering::Relaxed);
226    }
227
228    /// Records a compaction cancellation.
229    pub fn record_cancel(&self, chunks: usize) {
230        self.cancelled.fetch_add(1, Ordering::Relaxed);
231        self.total_chunks
232            .fetch_add(chunks as u64, Ordering::Relaxed);
233    }
234
235    /// Returns snapshot of current stats.
236    #[must_use]
237    pub fn snapshot(&self) -> InterruptibleStatsSnapshot {
238        InterruptibleStatsSnapshot {
239            started: self.started.load(Ordering::Relaxed),
240            completed: self.completed.load(Ordering::Relaxed),
241            cancelled: self.cancelled.load(Ordering::Relaxed),
242            total_chunks: self.total_chunks.load(Ordering::Relaxed),
243        }
244    }
245}
246
247/// Snapshot of interruptible stats.
248#[derive(Debug, Clone, Copy, Default)]
249pub struct InterruptibleStatsSnapshot {
250    /// Number of compactions started.
251    pub started: u64,
252    /// Number of compactions completed.
253    pub completed: u64,
254    /// Number of compactions cancelled.
255    pub cancelled: u64,
256    /// Total chunks processed.
257    pub total_chunks: u64,
258}
259
260impl InterruptibleStatsSnapshot {
261    /// Returns completion rate as percentage.
262    #[must_use]
263    pub fn completion_rate(&self) -> u8 {
264        if self.started == 0 {
265            return 100;
266        }
267        let rate = (self.completed.saturating_mul(100)) / self.started;
268        rate.min(100) as u8
269    }
270
271    /// Returns cancellation rate as percentage.
272    #[must_use]
273    pub fn cancellation_rate(&self) -> u8 {
274        if self.started == 0 {
275            return 0;
276        }
277        let rate = (self.cancelled.saturating_mul(100)) / self.started;
278        rate.min(100) as u8
279    }
280}
281
282/// Performs interruptible compaction with yield points.
283///
284/// Processes delta edges in chunks, calling the yield callback between chunks.
285/// This allows the caller to release locks, check for cancellation, or perform
286/// other cooperative scheduling tasks.
287///
288/// # Arguments
289///
290/// * `snapshot` - Snapshot of delta edges to compact
291/// * `token` - Cancellation token to check between chunks
292/// * `config` - Interruptible compaction configuration
293/// * `on_yield` - Callback invoked after each chunk with progress info
294///
295/// # Returns
296///
297/// Returns merged edges and stats, or error if compaction failed.
298///
299/// # Errors
300///
301/// Returns `CompactionError::Interrupted` if cancellation was requested.
302pub fn compact_interruptible<F>(
303    snapshot: &CompactionSnapshot,
304    token: &CancellationToken,
305    config: &InterruptibleConfig,
306    mut on_yield: F,
307) -> Result<InterruptibleResult, CompactionError>
308where
309    F: FnMut(&CompactionProgress),
310{
311    let total_edges = snapshot.delta_edges.len();
312    let chunk_size = config.chunk_size;
313    let total_chunks = if total_edges == 0 {
314        0
315    } else {
316        total_edges.div_ceil(chunk_size)
317    };
318
319    // Check for early cancellation
320    if config.check_cancellation && token.is_cancelled() {
321        return Err(CompactionError::Interrupted {
322            reason: InterruptReason::CancellationRequested,
323            edges_processed: 0,
324            edges_total: total_edges,
325        });
326    }
327
328    // For small snapshots, process in one go
329    if total_edges <= chunk_size {
330        let (merged_edges, merge_stats) = merge_delta_edges(snapshot.delta_edges.clone());
331
332        // Final yield with complete status
333        let progress = CompactionProgress {
334            total_edges,
335            edges_processed: total_edges,
336            current_chunk: 1,
337            total_chunks: 1.max(total_chunks),
338        };
339        on_yield(&progress);
340
341        return Ok(InterruptibleResult {
342            merged_edges,
343            merge_stats,
344            chunks_processed: 1,
345            was_cancelled: false,
346        });
347    }
348
349    // Process in chunks - collect winning operations (preserving removes)
350    // This fixes the cross-chunk remove bug: removes in later chunks
351    // can now correctly cancel adds from earlier chunks.
352    let mut all_winners: Vec<DeltaEdge> = Vec::new();
353    let mut edges_processed = 0;
354    let mut chunk_dedup_count = 0;
355
356    for (chunk_idx, chunk) in snapshot.delta_edges.chunks(chunk_size).enumerate() {
357        // Check cancellation before processing chunk
358        if config.check_cancellation && token.is_cancelled() {
359            return Err(CompactionError::Interrupted {
360                reason: InterruptReason::CancellationRequested,
361                edges_processed,
362                edges_total: total_edges,
363            });
364        }
365
366        // Deduplicate within chunk, preserving op types (including removes)
367        let chunk_input_count = chunk.len();
368        let chunk_winners = dedupe_chunk_preserving_ops(chunk.to_vec());
369        chunk_dedup_count += chunk_input_count - chunk_winners.len();
370
371        all_winners.extend(chunk_winners);
372        edges_processed += chunk.len();
373
374        // Yield point
375        let progress = CompactionProgress {
376            total_edges,
377            edges_processed,
378            current_chunk: chunk_idx + 1,
379            total_chunks,
380        };
381        on_yield(&progress);
382    }
383
384    // Check cancellation before final global merge (which can be expensive)
385    if config.check_cancellation && token.is_cancelled() {
386        return Err(CompactionError::Interrupted {
387            reason: InterruptReason::CancellationRequested,
388            edges_processed,
389            edges_total: total_edges,
390        });
391    }
392
393    // Final global merge: apply LWW across all chunks, then filter removes
394    // This correctly handles cross-chunk removes canceling adds
395    let (final_merged, final_stats) = merge_winners_global(all_winners);
396
397    Ok(InterruptibleResult {
398        merged_edges: final_merged,
399        merge_stats: MergeStats {
400            input_count: total_edges,
401            output_count: final_stats.output_count,
402            deduplicated_count: chunk_dedup_count + final_stats.deduplicated_count,
403            removed_count: final_stats.removed_count,
404        },
405        chunks_processed: total_chunks,
406        was_cancelled: false,
407    })
408}
409
410/// Deduplicates delta edges within a chunk, preserving operation types.
411///
412/// This function applies LWW within a chunk but does NOT filter removes.
413/// The winning operation (Add or Remove) for each edge key is preserved
414/// for later cross-chunk reconciliation.
415///
416/// # Arguments
417///
418/// * `edges` - Delta edges to deduplicate
419///
420/// # Returns
421///
422/// Vector of winning delta operations (including removes)
423fn dedupe_chunk_preserving_ops(mut edges: Vec<DeltaEdge>) -> Vec<DeltaEdge> {
424    use std::cmp::Ordering;
425
426    if edges.is_empty() {
427        return vec![];
428    }
429
430    // Sort by edge key, then by DESCENDING seq (highest seq first)
431    edges.sort_by(|a, b| {
432        let key_a = a.edge_key();
433        let key_b = b.edge_key();
434
435        // Compare by source
436        match a.source.index().cmp(&b.source.index()) {
437            Ordering::Equal => {} // Continue to next comparison
438            other => return other,
439        }
440        match a.source.generation().cmp(&b.source.generation()) {
441            Ordering::Equal => {} // Continue to next comparison
442            other => return other,
443        }
444
445        // Compare by target
446        match a.target.index().cmp(&b.target.index()) {
447            Ordering::Equal => {} // Continue to next comparison
448            other => return other,
449        }
450        match a.target.generation().cmp(&b.target.generation()) {
451            Ordering::Equal => {} // Continue to next comparison
452            other => return other,
453        }
454
455        // Compare by kind (using debug format for stability)
456        match format!("{:?}", key_a.kind).cmp(&format!("{:?}", key_b.kind)) {
457            Ordering::Equal => {} // Continue to final seq comparison
458            other => return other,
459        }
460
461        // DESCENDING seq (highest first)
462        b.seq.cmp(&a.seq)
463    });
464
465    // Dedup by edge key (keeps first = highest seq due to descending sort)
466    edges.dedup_by(|a, b| a.edge_key() == b.edge_key());
467
468    edges
469}
470
471/// Merges winning delta operations from all chunks with global LWW semantics.
472///
473/// This function performs the final cross-chunk merge, correctly handling
474/// the case where a Remove in one chunk should cancel an Add from another chunk.
475///
476/// # Algorithm
477///
478/// 1. Deduplicate all winning ops by edge key (LWW - highest seq wins)
479/// 2. Filter out removes (only Add operations survive)
480/// 3. Convert to `MergedEdge` for output
481///
482/// # Arguments
483///
484/// * `all_winners` - Winning delta operations from all chunks
485///
486/// # Returns
487///
488/// Tuple of (merged edges, statistics)
489fn merge_winners_global(all_winners: Vec<DeltaEdge>) -> (Vec<MergedEdge>, MergeStats) {
490    // Just use merge_delta_edges which handles this correctly
491    // It does: sort by key+desc_seq, dedup, filter removes
492    merge_delta_edges(all_winners)
493}
494
495#[cfg(test)]
496mod tests {
497    use super::*;
498    use crate::graph::unified::edge::{DeltaEdge, DeltaOp, EdgeKind};
499    use crate::graph::unified::file::FileId;
500    use crate::graph::unified::node::NodeId;
501
502    fn make_delta_edge(src: u32, tgt: u32, seq: u64, is_remove: bool) -> DeltaEdge {
503        DeltaEdge {
504            source: NodeId::new(src, 0),
505            target: NodeId::new(tgt, 0),
506            kind: EdgeKind::Calls {
507                argument_count: 0,
508                is_async: false,
509            },
510            seq,
511            op: if is_remove {
512                DeltaOp::Remove
513            } else {
514                DeltaOp::Add
515            },
516            file: FileId::new(0),
517            spans: vec![],
518        }
519    }
520
521    fn make_snapshot(delta_edges: Vec<DeltaEdge>) -> CompactionSnapshot {
522        CompactionSnapshot {
523            csr_edges: Vec::new(),
524            delta_edges,
525            node_count: 100,
526            csr_version: 0,
527        }
528    }
529
530    #[test]
531    fn test_cancellation_token_default() {
532        let token = CancellationToken::default();
533        assert!(!token.is_cancelled());
534    }
535
536    #[test]
537    fn test_cancellation_token_cancel() {
538        let token = CancellationToken::new();
539        assert!(!token.is_cancelled());
540
541        token.cancel();
542        assert!(token.is_cancelled());
543    }
544
545    #[test]
546    fn test_cancellation_token_reset() {
547        let token = CancellationToken::new();
548        token.cancel();
549        assert!(token.is_cancelled());
550
551        token.reset();
552        assert!(!token.is_cancelled());
553    }
554
555    #[test]
556    fn test_cancellation_token_clone() {
557        let token1 = CancellationToken::new();
558        let token2 = token1.clone();
559
560        token1.cancel();
561        assert!(token2.is_cancelled());
562    }
563
564    #[test]
565    fn test_config_default() {
566        let config = InterruptibleConfig::default();
567        assert_eq!(config.chunk_size, DEFAULT_CHUNK_SIZE);
568        assert!(config.check_cancellation);
569    }
570
571    #[test]
572    fn test_config_custom_chunk_size() {
573        let config = InterruptibleConfig::with_chunk_size(5000);
574        assert_eq!(config.chunk_size, 5000);
575    }
576
577    #[test]
578    fn test_config_min_chunk_size() {
579        let config = InterruptibleConfig::with_chunk_size(0);
580        assert_eq!(config.chunk_size, 1); // Minimum is 1
581    }
582
583    #[test]
584    fn test_config_without_cancellation() {
585        let config = InterruptibleConfig::default().without_cancellation_check();
586        assert!(!config.check_cancellation);
587    }
588
589    #[test]
590    fn test_progress_percent_empty() {
591        let progress = CompactionProgress {
592            total_edges: 0,
593            edges_processed: 0,
594            current_chunk: 1,
595            total_chunks: 0,
596        };
597        assert_eq!(progress.percent_complete(), 100);
598        assert!(progress.is_complete());
599    }
600
601    #[test]
602    fn test_progress_percent_partial() {
603        let progress = CompactionProgress {
604            total_edges: 100,
605            edges_processed: 50,
606            current_chunk: 1,
607            total_chunks: 2,
608        };
609        assert_eq!(progress.percent_complete(), 50);
610        assert!(!progress.is_complete());
611    }
612
613    #[test]
614    fn test_progress_display() {
615        let progress = CompactionProgress {
616            total_edges: 100,
617            edges_processed: 50,
618            current_chunk: 1,
619            total_chunks: 2,
620        };
621        let display = format!("{progress}");
622        assert!(display.contains("1/2"));
623        assert!(display.contains("50/100"));
624        assert!(display.contains("50%"));
625    }
626
627    #[test]
628    fn test_compact_empty_snapshot() {
629        let snapshot = make_snapshot(vec![]);
630        let token = CancellationToken::new();
631        let config = InterruptibleConfig::default();
632        let mut yields = 0;
633
634        let result = compact_interruptible(&snapshot, &token, &config, |_| {
635            yields += 1;
636        })
637        .unwrap();
638
639        assert!(result.is_complete());
640        assert!(result.merged_edges.is_empty());
641        assert_eq!(yields, 1); // One yield for empty
642    }
643
644    #[test]
645    fn test_compact_small_snapshot() {
646        let snapshot = make_snapshot(vec![
647            make_delta_edge(0, 1, 1, false),
648            make_delta_edge(1, 2, 2, false),
649        ]);
650        let token = CancellationToken::new();
651        let config = InterruptibleConfig::default();
652        let mut yields = 0;
653
654        let result = compact_interruptible(&snapshot, &token, &config, |_| {
655            yields += 1;
656        })
657        .unwrap();
658
659        assert!(result.is_complete());
660        assert_eq!(result.merged_edges.len(), 2);
661        assert_eq!(yields, 1);
662    }
663
664    #[test]
665    fn test_compact_with_chunks() {
666        // Create more edges than default chunk size
667        let mut edges = Vec::new();
668        for i in 0..100 {
669            edges.push(make_delta_edge(i, i + 1, u64::from(i), false));
670        }
671
672        let snapshot = make_snapshot(edges);
673        let token = CancellationToken::new();
674        let config = InterruptibleConfig::with_chunk_size(30); // 4 chunks of ~30
675        let mut yield_count = 0;
676        let mut last_progress = None;
677
678        let result = compact_interruptible(&snapshot, &token, &config, |progress| {
679            yield_count += 1;
680            last_progress = Some(*progress);
681        })
682        .unwrap();
683
684        assert!(result.is_complete());
685        assert_eq!(result.merged_edges.len(), 100);
686        assert!(yield_count >= 3); // At least 3 yields for 4 chunks
687        assert!(last_progress.unwrap().is_complete());
688    }
689
690    #[test]
691    fn test_compact_early_cancellation() {
692        let snapshot = make_snapshot(vec![make_delta_edge(0, 1, 1, false)]);
693        let token = CancellationToken::new();
694        token.cancel(); // Cancel before start
695        let config = InterruptibleConfig::default();
696
697        let result = compact_interruptible(&snapshot, &token, &config, |_| {});
698
699        match result {
700            Err(CompactionError::Interrupted {
701                reason: InterruptReason::CancellationRequested,
702                edges_processed: 0,
703                ..
704            }) => {}
705            _ => panic!("expected Interrupted error"),
706        }
707    }
708
709    #[test]
710    fn test_compact_mid_cancellation() {
711        let mut edges = Vec::new();
712        for i in 0..100 {
713            edges.push(make_delta_edge(i, i + 1, u64::from(i), false));
714        }
715
716        let snapshot = make_snapshot(edges);
717        let token = CancellationToken::new();
718        let config = InterruptibleConfig::with_chunk_size(10);
719
720        let result = compact_interruptible(&snapshot, &token, &config, |progress| {
721            if progress.current_chunk == 2 {
722                token.cancel();
723            }
724        });
725
726        match result {
727            Err(CompactionError::Interrupted {
728                reason: InterruptReason::CancellationRequested,
729                edges_processed,
730                ..
731            }) => {
732                assert!(edges_processed > 0); // Some processed
733                assert!(edges_processed < 100); // Not all
734            }
735            _ => panic!("expected Interrupted error"),
736        }
737    }
738
739    #[test]
740    fn test_compact_with_removes() {
741        let snapshot = make_snapshot(vec![
742            make_delta_edge(0, 1, 1, false), // Add
743            make_delta_edge(0, 1, 2, true),  // Remove (wins)
744            make_delta_edge(1, 2, 3, false), // Add
745        ]);
746        let token = CancellationToken::new();
747        let config = InterruptibleConfig::default();
748
749        let result = compact_interruptible(&snapshot, &token, &config, |_| {}).unwrap();
750
751        // Only edge 1->2 should survive
752        assert_eq!(result.merged_edges.len(), 1);
753        assert_eq!(result.merged_edges[0].source.index(), 1);
754        assert_eq!(result.merged_edges[0].target.index(), 2);
755    }
756
757    #[test]
758    fn test_stats_tracking() {
759        let stats = InterruptibleStats::new();
760        assert_eq!(stats.snapshot().started, 0);
761
762        stats.record_start();
763        stats.record_start();
764        assert_eq!(stats.snapshot().started, 2);
765
766        stats.record_complete(5);
767        assert_eq!(stats.snapshot().completed, 1);
768        assert_eq!(stats.snapshot().total_chunks, 5);
769
770        stats.record_cancel(3);
771        assert_eq!(stats.snapshot().cancelled, 1);
772        assert_eq!(stats.snapshot().total_chunks, 8);
773    }
774
775    #[test]
776    fn test_stats_rates() {
777        let mut snapshot = InterruptibleStatsSnapshot::default();
778        assert_eq!(snapshot.completion_rate(), 100);
779        assert_eq!(snapshot.cancellation_rate(), 0);
780
781        snapshot.started = 10;
782        snapshot.completed = 7;
783        snapshot.cancelled = 3;
784
785        assert_eq!(snapshot.completion_rate(), 70);
786        assert_eq!(snapshot.cancellation_rate(), 30);
787    }
788
789    #[test]
790    fn test_interruptible_result_is_complete() {
791        let result = InterruptibleResult {
792            merged_edges: vec![],
793            merge_stats: MergeStats::default(),
794            chunks_processed: 1,
795            was_cancelled: false,
796        };
797        assert!(result.is_complete());
798
799        let result = InterruptibleResult {
800            merged_edges: vec![],
801            merge_stats: MergeStats::default(),
802            chunks_processed: 1,
803            was_cancelled: true,
804        };
805        assert!(!result.is_complete());
806    }
807
808    #[test]
809    fn test_no_cancellation_check() {
810        let snapshot = make_snapshot(vec![make_delta_edge(0, 1, 1, false)]);
811        let token = CancellationToken::new();
812        token.cancel(); // Cancel, but won't be checked
813        let config = InterruptibleConfig::default().without_cancellation_check();
814
815        // Should complete despite cancellation
816        let result = compact_interruptible(&snapshot, &token, &config, |_| {}).unwrap();
817        assert!(result.is_complete());
818    }
819
820    /// Test that Remove operations in later chunks correctly cancel
821    /// Add operations from earlier chunks (cross-chunk LWW semantics).
822    ///
823    /// This is a regression test for the critical bug where cross-chunk
824    /// removes were dropped, causing deleted edges to be resurrected.
825    #[test]
826    fn test_cross_chunk_remove_cancels_add() {
827        // Create edges that will be split across chunks:
828        // - First 5 edges are regular adds (will go in chunk 1)
829        // - Edge 0->1 appears again as Remove with higher seq (will go in chunk 2)
830        let mut edges = Vec::new();
831
832        // Chunk 1: Add edges 0->1, 1->2, 2->3, 3->4, 4->5
833        for i in 0..5 {
834            edges.push(make_delta_edge(i, i + 1, u64::from(i + 1), false));
835        }
836
837        // Chunk 2: Remove edge 0->1 with higher seq (should cancel the add)
838        // Plus some more adds to fill the chunk
839        edges.push(make_delta_edge(0, 1, 100, true)); // Remove with seq=100, higher than add seq=1
840        for i in 10..14 {
841            edges.push(make_delta_edge(i, i + 1, u64::from(i + 1), false));
842        }
843
844        let snapshot = make_snapshot(edges);
845        let token = CancellationToken::new();
846        // Use chunk size of 5 to ensure edges are split across chunks
847        let config = InterruptibleConfig::with_chunk_size(5);
848
849        let result = compact_interruptible(&snapshot, &token, &config, |_| {}).unwrap();
850
851        assert!(result.is_complete());
852
853        // Edge 0->1 should NOT be in the result (Remove won)
854        let has_edge_0_1 = result
855            .merged_edges
856            .iter()
857            .any(|e| e.source.index() == 0 && e.target.index() == 1);
858        assert!(
859            !has_edge_0_1,
860            "Edge 0->1 should be removed by cross-chunk Remove operation"
861        );
862
863        // Other edges should survive
864        // Edges 1->2, 2->3, 3->4, 4->5 from chunk 1
865        // Edges 10->11, 11->12, 12->13, 13->14 from chunk 2
866        assert_eq!(result.merged_edges.len(), 8);
867
868        // Verify the removed_count reflects the Remove that won
869        assert_eq!(result.merge_stats.removed_count, 1);
870    }
871
872    /// Test that Add operations in later chunks correctly win over
873    /// Remove operations from earlier chunks (cross-chunk LWW semantics).
874    #[test]
875    fn test_cross_chunk_add_wins_over_remove() {
876        let mut edges = Vec::new();
877
878        // Chunk 1: Remove edge 0->1 with seq=1
879        edges.push(make_delta_edge(0, 1, 1, true)); // Remove
880        for i in 1..5 {
881            edges.push(make_delta_edge(i, i + 1, u64::from(i + 1), false));
882        }
883
884        // Chunk 2: Add edge 0->1 with higher seq (should resurrect the edge)
885        edges.push(make_delta_edge(0, 1, 100, false)); // Add with seq=100
886        for i in 10..14 {
887            edges.push(make_delta_edge(i, i + 1, u64::from(i + 1), false));
888        }
889
890        let snapshot = make_snapshot(edges);
891        let token = CancellationToken::new();
892        let config = InterruptibleConfig::with_chunk_size(5);
893
894        let result = compact_interruptible(&snapshot, &token, &config, |_| {}).unwrap();
895
896        // Edge 0->1 SHOULD be in the result (Add with higher seq won)
897        let edge_0_1 = result
898            .merged_edges
899            .iter()
900            .find(|e| e.source.index() == 0 && e.target.index() == 1);
901        assert!(
902            edge_0_1.is_some(),
903            "Edge 0->1 should be present (Add with higher seq won)"
904        );
905        assert_eq!(edge_0_1.unwrap().seq, 100);
906    }
907}