1use std::fmt;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
41
42use super::build::CompactionSnapshot;
43use super::errors::{CompactionError, InterruptReason};
44use super::merge::{MergeStats, MergedEdge, merge_delta_edges};
45use crate::graph::unified::edge::DeltaEdge;
46
/// Default number of delta edges per chunk; this is the `chunk_size` that
/// `InterruptibleConfig::default()` uses.
pub const DEFAULT_CHUNK_SIZE: usize = 10_000;
49
/// A cloneable cancellation flag backed by a shared atomic boolean.
///
/// Cloning the token clones the `Arc`, so every clone observes and controls
/// the same underlying flag.
#[derive(Debug, Clone)]
pub struct CancellationToken {
    /// Shared flag; `true` once cancellation has been requested.
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    /// Creates a token whose flag is initially cleared (not cancelled).
    #[must_use]
    pub fn new() -> Self {
        let cancelled = Arc::new(AtomicBool::new(false));
        Self { cancelled }
    }

    /// Requests cancellation by setting the shared flag.
    pub fn cancel(&self) {
        self.set(true);
    }

    /// Returns `true` if the shared flag is currently set.
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }

    /// Clears the shared flag so the token can be reused.
    pub fn reset(&self) {
        self.set(false);
    }

    /// Writes `value` to the shared flag with `Release` ordering, pairing with
    /// the `Acquire` load in [`CancellationToken::is_cancelled`].
    fn set(&self, value: bool) {
        self.cancelled.store(value, Ordering::Release);
    }
}

impl Default for CancellationToken {
    /// Equivalent to [`CancellationToken::new`].
    fn default() -> Self {
        Self::new()
    }
}
90
91#[derive(Debug, Clone, Copy)]
93pub struct InterruptibleConfig {
94 pub chunk_size: usize,
97
98 pub check_cancellation: bool,
101}
102
103impl Default for InterruptibleConfig {
104 fn default() -> Self {
105 Self {
106 chunk_size: DEFAULT_CHUNK_SIZE,
107 check_cancellation: true,
108 }
109 }
110}
111
112impl InterruptibleConfig {
113 #[must_use]
115 pub fn with_chunk_size(chunk_size: usize) -> Self {
116 Self {
117 chunk_size: chunk_size.max(1), ..Default::default()
119 }
120 }
121
122 #[must_use]
124 pub fn without_cancellation_check(mut self) -> Self {
125 self.check_cancellation = false;
126 self
127 }
128}
129
/// Progress report handed to the `on_yield` callback of
/// `compact_interruptible`.
#[derive(Debug, Clone, Copy)]
pub struct CompactionProgress {
    /// Total number of delta edges in the snapshot being compacted.
    pub total_edges: usize,
    /// Number of delta edges processed so far.
    pub edges_processed: usize,
    /// One-based index of the chunk that was just processed.
    pub current_chunk: usize,
    /// Total number of chunks for this run.
    pub total_chunks: usize,
}

impl CompactionProgress {
    /// Returns the completed percentage, clamped to `0..=100`.
    ///
    /// An empty workload (`total_edges == 0`) is reported as 100%.
    #[must_use]
    pub fn percent_complete(&self) -> u8 {
        if self.total_edges == 0 {
            return 100;
        }
        // Widen before multiplying: `edges_processed * 100` can exceed
        // `usize::MAX`, and saturating there would under-report progress.
        // `usize` -> `u128` is a lossless widening.
        let processed = self.edges_processed as u128;
        let total = self.total_edges as u128;
        let pct = (processed * 100 / total).min(100);
        u8::try_from(pct).unwrap_or(100)
    }

    /// Returns `true` once at least `total_edges` edges have been processed.
    #[must_use]
    pub fn is_complete(&self) -> bool {
        self.edges_processed >= self.total_edges
    }
}

impl fmt::Display for CompactionProgress {
    /// Formats as `chunk C/T: P/N edges (X%)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "chunk {}/{}: {}/{} edges ({}%)",
            self.current_chunk,
            self.total_chunks,
            self.edges_processed,
            self.total_edges,
            self.percent_complete()
        )
    }
}
174
175#[derive(Debug)]
177pub struct InterruptibleResult {
178 pub merged_edges: Vec<MergedEdge>,
180 pub merge_stats: MergeStats,
182 pub chunks_processed: usize,
184 pub was_cancelled: bool,
186}
187
188impl InterruptibleResult {
189 #[must_use]
191 pub fn is_complete(&self) -> bool {
192 !self.was_cancelled
193 }
194}
195
196#[derive(Debug, Default)]
198pub struct InterruptibleStats {
199 pub started: AtomicU64,
201 pub completed: AtomicU64,
203 pub cancelled: AtomicU64,
205 pub total_chunks: AtomicU64,
207}
208
209impl InterruptibleStats {
210 #[must_use]
212 pub fn new() -> Self {
213 Self::default()
214 }
215
216 pub fn record_start(&self) {
218 self.started.fetch_add(1, Ordering::Relaxed);
219 }
220
221 pub fn record_complete(&self, chunks: usize) {
223 self.completed.fetch_add(1, Ordering::Relaxed);
224 self.total_chunks
225 .fetch_add(chunks as u64, Ordering::Relaxed);
226 }
227
228 pub fn record_cancel(&self, chunks: usize) {
230 self.cancelled.fetch_add(1, Ordering::Relaxed);
231 self.total_chunks
232 .fetch_add(chunks as u64, Ordering::Relaxed);
233 }
234
235 #[must_use]
237 pub fn snapshot(&self) -> InterruptibleStatsSnapshot {
238 InterruptibleStatsSnapshot {
239 started: self.started.load(Ordering::Relaxed),
240 completed: self.completed.load(Ordering::Relaxed),
241 cancelled: self.cancelled.load(Ordering::Relaxed),
242 total_chunks: self.total_chunks.load(Ordering::Relaxed),
243 }
244 }
245}
246
/// Plain-value copy of the `InterruptibleStats` counters.
#[derive(Debug, Clone, Copy, Default)]
pub struct InterruptibleStatsSnapshot {
    /// Number of runs started.
    pub started: u64,
    /// Number of runs completed.
    pub completed: u64,
    /// Number of runs cancelled.
    pub cancelled: u64,
    /// Total chunks recorded across completed and cancelled runs.
    pub total_chunks: u64,
}

impl InterruptibleStatsSnapshot {
    /// Percentage of started runs that completed, clamped to `0..=100`.
    ///
    /// Returns 100 when no run has started.
    #[must_use]
    pub fn completion_rate(&self) -> u8 {
        if self.started == 0 {
            return 100;
        }
        self.percent_of_started(self.completed)
    }

    /// Percentage of started runs that were cancelled, clamped to `0..=100`.
    ///
    /// Returns 0 when no run has started.
    #[must_use]
    pub fn cancellation_rate(&self) -> u8 {
        if self.started == 0 {
            return 0;
        }
        self.percent_of_started(self.cancelled)
    }

    /// Computes `count * 100 / started`, clamped to 100.
    ///
    /// The arithmetic is done in `u128` so the multiplication cannot overflow
    /// (saturating in `u64` would under-report for very large counters).
    /// Callers must ensure `self.started != 0`.
    fn percent_of_started(&self, count: u64) -> u8 {
        let pct = (u128::from(count) * 100 / u128::from(self.started)).min(100);
        u8::try_from(pct).unwrap_or(100)
    }
}
281
282pub fn compact_interruptible<F>(
303 snapshot: &CompactionSnapshot,
304 token: &CancellationToken,
305 config: &InterruptibleConfig,
306 mut on_yield: F,
307) -> Result<InterruptibleResult, CompactionError>
308where
309 F: FnMut(&CompactionProgress),
310{
311 let total_edges = snapshot.delta_edges.len();
312 let chunk_size = config.chunk_size;
313 let total_chunks = if total_edges == 0 {
314 0
315 } else {
316 total_edges.div_ceil(chunk_size)
317 };
318
319 if config.check_cancellation && token.is_cancelled() {
321 return Err(CompactionError::Interrupted {
322 reason: InterruptReason::CancellationRequested,
323 edges_processed: 0,
324 edges_total: total_edges,
325 });
326 }
327
328 if total_edges <= chunk_size {
330 let (merged_edges, merge_stats) = merge_delta_edges(snapshot.delta_edges.clone());
331
332 let progress = CompactionProgress {
334 total_edges,
335 edges_processed: total_edges,
336 current_chunk: 1,
337 total_chunks: 1.max(total_chunks),
338 };
339 on_yield(&progress);
340
341 return Ok(InterruptibleResult {
342 merged_edges,
343 merge_stats,
344 chunks_processed: 1,
345 was_cancelled: false,
346 });
347 }
348
349 let mut all_winners: Vec<DeltaEdge> = Vec::new();
353 let mut edges_processed = 0;
354 let mut chunk_dedup_count = 0;
355
356 for (chunk_idx, chunk) in snapshot.delta_edges.chunks(chunk_size).enumerate() {
357 if config.check_cancellation && token.is_cancelled() {
359 return Err(CompactionError::Interrupted {
360 reason: InterruptReason::CancellationRequested,
361 edges_processed,
362 edges_total: total_edges,
363 });
364 }
365
366 let chunk_input_count = chunk.len();
368 let chunk_winners = dedupe_chunk_preserving_ops(chunk.to_vec());
369 chunk_dedup_count += chunk_input_count - chunk_winners.len();
370
371 all_winners.extend(chunk_winners);
372 edges_processed += chunk.len();
373
374 let progress = CompactionProgress {
376 total_edges,
377 edges_processed,
378 current_chunk: chunk_idx + 1,
379 total_chunks,
380 };
381 on_yield(&progress);
382 }
383
384 if config.check_cancellation && token.is_cancelled() {
386 return Err(CompactionError::Interrupted {
387 reason: InterruptReason::CancellationRequested,
388 edges_processed,
389 edges_total: total_edges,
390 });
391 }
392
393 let (final_merged, final_stats) = merge_winners_global(all_winners);
396
397 Ok(InterruptibleResult {
398 merged_edges: final_merged,
399 merge_stats: MergeStats {
400 input_count: total_edges,
401 output_count: final_stats.output_count,
402 deduplicated_count: chunk_dedup_count + final_stats.deduplicated_count,
403 removed_count: final_stats.removed_count,
404 },
405 chunks_processed: total_chunks,
406 was_cancelled: false,
407 })
408}
409
410fn dedupe_chunk_preserving_ops(mut edges: Vec<DeltaEdge>) -> Vec<DeltaEdge> {
424 use std::cmp::Ordering;
425
426 if edges.is_empty() {
427 return vec![];
428 }
429
430 edges.sort_by(|a, b| {
432 let key_a = a.edge_key();
433 let key_b = b.edge_key();
434
435 match a.source.index().cmp(&b.source.index()) {
437 Ordering::Equal => {} other => return other,
439 }
440 match a.source.generation().cmp(&b.source.generation()) {
441 Ordering::Equal => {} other => return other,
443 }
444
445 match a.target.index().cmp(&b.target.index()) {
447 Ordering::Equal => {} other => return other,
449 }
450 match a.target.generation().cmp(&b.target.generation()) {
451 Ordering::Equal => {} other => return other,
453 }
454
455 match format!("{:?}", key_a.kind).cmp(&format!("{:?}", key_b.kind)) {
457 Ordering::Equal => {} other => return other,
459 }
460
461 b.seq.cmp(&a.seq)
463 });
464
465 edges.dedup_by(|a, b| a.edge_key() == b.edge_key());
467
468 edges
469}
470
471fn merge_winners_global(all_winners: Vec<DeltaEdge>) -> (Vec<MergedEdge>, MergeStats) {
490 merge_delta_edges(all_winners)
493}
494
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::unified::edge::{DeltaEdge, DeltaOp, EdgeKind};
    use crate::graph::unified::file::FileId;
    use crate::graph::unified::node::NodeId;

    /// Builds a `Calls` delta edge `src -> tgt` with the given sequence number;
    /// `is_remove` selects `DeltaOp::Remove` instead of `DeltaOp::Add`.
    fn make_delta_edge(src: u32, tgt: u32, seq: u64, is_remove: bool) -> DeltaEdge {
        let op = if is_remove {
            DeltaOp::Remove
        } else {
            DeltaOp::Add
        };
        DeltaEdge {
            source: NodeId::new(src, 0),
            target: NodeId::new(tgt, 0),
            kind: EdgeKind::Calls {
                argument_count: 0,
                is_async: false,
            },
            seq,
            op,
            file: FileId::new(0),
            spans: vec![],
        }
    }

    /// Wraps delta edges in a snapshot with no CSR edges.
    fn make_snapshot(delta_edges: Vec<DeltaEdge>) -> CompactionSnapshot {
        CompactionSnapshot {
            csr_edges: Vec::new(),
            delta_edges,
            node_count: 100,
            csr_version: 0,
        }
    }

    /// A default token starts out not cancelled.
    #[test]
    fn test_cancellation_token_default() {
        let token = CancellationToken::default();
        assert!(!token.is_cancelled());
    }

    /// `cancel` flips the flag.
    #[test]
    fn test_cancellation_token_cancel() {
        let token = CancellationToken::new();
        assert!(!token.is_cancelled());

        token.cancel();
        assert!(token.is_cancelled());
    }

    /// `reset` clears a previously set flag.
    #[test]
    fn test_cancellation_token_reset() {
        let token = CancellationToken::new();
        token.cancel();
        assert!(token.is_cancelled());

        token.reset();
        assert!(!token.is_cancelled());
    }

    /// Clones share the same flag.
    #[test]
    fn test_cancellation_token_clone() {
        let token1 = CancellationToken::new();
        let token2 = token1.clone();

        token1.cancel();
        assert!(token2.is_cancelled());
    }

    /// The default config uses the default chunk size and checks cancellation.
    #[test]
    fn test_config_default() {
        let config = InterruptibleConfig::default();
        assert_eq!(config.chunk_size, DEFAULT_CHUNK_SIZE);
        assert!(config.check_cancellation);
    }

    /// A custom chunk size is stored as given.
    #[test]
    fn test_config_custom_chunk_size() {
        let config = InterruptibleConfig::with_chunk_size(5000);
        assert_eq!(config.chunk_size, 5000);
    }

    /// A zero chunk size is raised to one.
    #[test]
    fn test_config_min_chunk_size() {
        let config = InterruptibleConfig::with_chunk_size(0);
        assert_eq!(config.chunk_size, 1);
    }

    /// `without_cancellation_check` disables the flag.
    #[test]
    fn test_config_without_cancellation() {
        let config = InterruptibleConfig::default().without_cancellation_check();
        assert!(!config.check_cancellation);
    }

    /// An empty workload counts as 100% complete.
    #[test]
    fn test_progress_percent_empty() {
        let progress = CompactionProgress {
            total_edges: 0,
            edges_processed: 0,
            current_chunk: 1,
            total_chunks: 0,
        };
        assert_eq!(progress.percent_complete(), 100);
        assert!(progress.is_complete());
    }

    /// Half the edges processed is 50% and not complete.
    #[test]
    fn test_progress_percent_partial() {
        let progress = CompactionProgress {
            total_edges: 100,
            edges_processed: 50,
            current_chunk: 1,
            total_chunks: 2,
        };
        assert_eq!(progress.percent_complete(), 50);
        assert!(!progress.is_complete());
    }

    /// The `Display` output includes chunk, edge and percent figures.
    #[test]
    fn test_progress_display() {
        let progress = CompactionProgress {
            total_edges: 100,
            edges_processed: 50,
            current_chunk: 1,
            total_chunks: 2,
        };
        let display = format!("{progress}");
        assert!(display.contains("1/2"));
        assert!(display.contains("50/100"));
        assert!(display.contains("50%"));
    }

    /// An empty snapshot completes with no edges and a single yield.
    #[test]
    fn test_compact_empty_snapshot() {
        let snapshot = make_snapshot(vec![]);
        let token = CancellationToken::new();
        let config = InterruptibleConfig::default();
        let mut yields = 0;

        let result = compact_interruptible(&snapshot, &token, &config, |_| {
            yields += 1;
        })
        .unwrap();

        assert!(result.is_complete());
        assert!(result.merged_edges.is_empty());
        assert_eq!(yields, 1);
    }

    /// A snapshot smaller than one chunk is merged in one step.
    #[test]
    fn test_compact_small_snapshot() {
        let snapshot = make_snapshot(vec![
            make_delta_edge(0, 1, 1, false),
            make_delta_edge(1, 2, 2, false),
        ]);
        let token = CancellationToken::new();
        let config = InterruptibleConfig::default();
        let mut yields = 0;

        let result = compact_interruptible(&snapshot, &token, &config, |_| {
            yields += 1;
        })
        .unwrap();

        assert!(result.is_complete());
        assert_eq!(result.merged_edges.len(), 2);
        assert_eq!(yields, 1);
    }

    /// 100 distinct edges with a chunk size of 30 are processed in 4 chunks.
    #[test]
    fn test_compact_with_chunks() {
        let edges: Vec<DeltaEdge> = (0..100)
            .map(|i| make_delta_edge(i, i + 1, u64::from(i), false))
            .collect();

        let snapshot = make_snapshot(edges);
        let token = CancellationToken::new();
        let config = InterruptibleConfig::with_chunk_size(30);
        let mut yield_count = 0;
        let mut last_progress = None;

        let result = compact_interruptible(&snapshot, &token, &config, |progress| {
            yield_count += 1;
            last_progress = Some(*progress);
        })
        .unwrap();

        assert!(result.is_complete());
        assert_eq!(result.merged_edges.len(), 100);
        // ceil(100 / 30) == 4, one yield per chunk.
        assert_eq!(yield_count, 4);
        assert_eq!(result.chunks_processed, 4);
        assert!(
            last_progress
                .expect("on_yield must have been called")
                .is_complete()
        );
    }

    /// A token cancelled up front stops the run before any edge is processed.
    #[test]
    fn test_compact_early_cancellation() {
        let snapshot = make_snapshot(vec![make_delta_edge(0, 1, 1, false)]);
        let token = CancellationToken::new();
        token.cancel();
        let config = InterruptibleConfig::default();

        let result = compact_interruptible(&snapshot, &token, &config, |_| {});

        assert!(
            matches!(
                result,
                Err(CompactionError::Interrupted {
                    reason: InterruptReason::CancellationRequested,
                    edges_processed: 0,
                    ..
                })
            ),
            "expected Interrupted error"
        );
    }

    /// Cancelling from the callback after chunk 2 stops the run before chunk 3.
    #[test]
    fn test_compact_mid_cancellation() {
        let edges: Vec<DeltaEdge> = (0..100)
            .map(|i| make_delta_edge(i, i + 1, u64::from(i), false))
            .collect();

        let snapshot = make_snapshot(edges);
        let token = CancellationToken::new();
        let config = InterruptibleConfig::with_chunk_size(10);

        let result = compact_interruptible(&snapshot, &token, &config, |progress| {
            if progress.current_chunk == 2 {
                token.cancel();
            }
        });

        match result {
            Err(CompactionError::Interrupted {
                reason: InterruptReason::CancellationRequested,
                edges_processed,
                edges_total,
            }) => {
                // Two chunks of ten edges were finished before the check fired.
                assert_eq!(edges_processed, 20);
                assert_eq!(edges_total, 100);
            }
            _ => panic!("expected Interrupted error"),
        }
    }

    /// A later Remove for the same edge drops the earlier Add.
    #[test]
    fn test_compact_with_removes() {
        let snapshot = make_snapshot(vec![
            make_delta_edge(0, 1, 1, false),
            make_delta_edge(0, 1, 2, true),
            make_delta_edge(1, 2, 3, false),
        ]);
        let token = CancellationToken::new();
        let config = InterruptibleConfig::default();

        let result = compact_interruptible(&snapshot, &token, &config, |_| {}).unwrap();

        assert_eq!(result.merged_edges.len(), 1);
        assert_eq!(result.merged_edges[0].source.index(), 1);
        assert_eq!(result.merged_edges[0].target.index(), 2);
    }

    /// Counters accumulate across start/complete/cancel records.
    #[test]
    fn test_stats_tracking() {
        let stats = InterruptibleStats::new();
        assert_eq!(stats.snapshot().started, 0);

        stats.record_start();
        stats.record_start();
        assert_eq!(stats.snapshot().started, 2);

        stats.record_complete(5);
        assert_eq!(stats.snapshot().completed, 1);
        assert_eq!(stats.snapshot().total_chunks, 5);

        stats.record_cancel(3);
        assert_eq!(stats.snapshot().cancelled, 1);
        assert_eq!(stats.snapshot().total_chunks, 8);
    }

    /// Rates default to 100% / 0% with no runs and are percentages otherwise.
    #[test]
    fn test_stats_rates() {
        let mut snapshot = InterruptibleStatsSnapshot::default();
        assert_eq!(snapshot.completion_rate(), 100);
        assert_eq!(snapshot.cancellation_rate(), 0);

        snapshot.started = 10;
        snapshot.completed = 7;
        snapshot.cancelled = 3;

        assert_eq!(snapshot.completion_rate(), 70);
        assert_eq!(snapshot.cancellation_rate(), 30);
    }

    /// `is_complete` is the negation of `was_cancelled`.
    #[test]
    fn test_interruptible_result_is_complete() {
        let result = InterruptibleResult {
            merged_edges: vec![],
            merge_stats: MergeStats::default(),
            chunks_processed: 1,
            was_cancelled: false,
        };
        assert!(result.is_complete());

        let result = InterruptibleResult {
            merged_edges: vec![],
            merge_stats: MergeStats::default(),
            chunks_processed: 1,
            was_cancelled: true,
        };
        assert!(!result.is_complete());
    }

    /// With cancellation checks disabled, a cancelled token is ignored.
    #[test]
    fn test_no_cancellation_check() {
        let snapshot = make_snapshot(vec![make_delta_edge(0, 1, 1, false)]);
        let token = CancellationToken::new();
        token.cancel();
        let config = InterruptibleConfig::default().without_cancellation_check();

        let result = compact_interruptible(&snapshot, &token, &config, |_| {}).unwrap();
        assert!(result.is_complete());
    }

    /// A Remove in the second chunk cancels an Add from the first chunk.
    #[test]
    fn test_cross_chunk_remove_cancels_add() {
        let mut edges = Vec::new();

        // Chunk 1: adds 0->1 .. 4->5.
        edges.extend((0..5).map(|i| make_delta_edge(i, i + 1, u64::from(i + 1), false)));

        // Chunk 2: a newer Remove for 0->1, followed by unrelated adds.
        edges.push(make_delta_edge(0, 1, 100, true));
        edges.extend((10..14).map(|i| make_delta_edge(i, i + 1, u64::from(i + 1), false)));

        let snapshot = make_snapshot(edges);
        let token = CancellationToken::new();
        let config = InterruptibleConfig::with_chunk_size(5);

        let result = compact_interruptible(&snapshot, &token, &config, |_| {}).unwrap();

        assert!(result.is_complete());

        let has_edge_0_1 = result
            .merged_edges
            .iter()
            .any(|e| e.source.index() == 0 && e.target.index() == 1);
        assert!(
            !has_edge_0_1,
            "Edge 0->1 should be removed by cross-chunk Remove operation"
        );

        // Four remaining adds from chunk 1 plus four adds from chunk 2.
        assert_eq!(result.merged_edges.len(), 8);

        assert_eq!(result.merge_stats.removed_count, 1);
    }

    /// An Add in the second chunk with a higher seq beats an earlier Remove.
    #[test]
    fn test_cross_chunk_add_wins_over_remove() {
        let mut edges = Vec::new();

        // Chunk 1: an old Remove for 0->1, followed by unrelated adds.
        edges.push(make_delta_edge(0, 1, 1, true));
        edges.extend((1..5).map(|i| make_delta_edge(i, i + 1, u64::from(i + 1), false)));

        // Chunk 2: a newer Add for 0->1, followed by unrelated adds.
        edges.push(make_delta_edge(0, 1, 100, false));
        edges.extend((10..14).map(|i| make_delta_edge(i, i + 1, u64::from(i + 1), false)));

        let snapshot = make_snapshot(edges);
        let token = CancellationToken::new();
        let config = InterruptibleConfig::with_chunk_size(5);

        let result = compact_interruptible(&snapshot, &token, &config, |_| {}).unwrap();

        let edge_0_1 = result
            .merged_edges
            .iter()
            .find(|e| e.source.index() == 0 && e.target.index() == 1);
        assert!(
            edge_0_1.is_some(),
            "Edge 0->1 should be present (Add with higher seq won)"
        );
        assert_eq!(edge_0_1.unwrap().seq, 100);
    }
}
907}