Skip to main content

sqry_core/graph/unified/compaction/
checkpoint.rs

//! `CompactionCheckpoint`: State snapshot for rollback during compaction.
//!
//! This module implements checkpoint/restore functionality for atomic
//! bidirectional compaction operations.
//!
//! # Design
//!
//! - Checkpoint captures complete state for rollback
//! - Rollback restores both committed and reserved counters
//!
//! # Usage
//!
//! Checkpoints are created before attempting compaction, and restored
//! if the compaction fails mid-way (e.g., forward succeeds but reverse fails).
//!
//! ```rust,ignore
//! use sqry_core::graph::unified::compaction::CompactionCheckpoint;
//!
//! // Create checkpoint before compaction
//! let checkpoint = CompactionCheckpoint::capture(&edge_store, &buffer_state);
//!
//! // Attempt compaction...
//! if let Err(e) = compact_operation() {
//!     // Rollback on failure
//!     checkpoint.restore(&mut edge_store, &buffer_state);
//! }
//! ```
//!
//! # Thread Safety
//!
//! Checkpoints capture a consistent snapshot at creation time.
//! The restore operation requires exclusive access to the edge store
//! and asserts that no reservation guards are active.
34
35use std::fmt;
36
37use super::super::admission::BufferStateSnapshot;
38
39/// Counter state snapshot for checkpoint/restore.
40///
41/// Captures the committed and reserved counter values at checkpoint time.
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
43pub struct CounterCheckpoint {
44    /// Committed bytes at checkpoint time
45    pub committed_bytes: usize,
46    /// Committed operations at checkpoint time
47    pub committed_ops: usize,
48    /// Reserved bytes at checkpoint time
49    pub reserved_bytes: usize,
50    /// Reserved operations at checkpoint time
51    pub reserved_ops: usize,
52}
53
54impl CounterCheckpoint {
55    /// Creates a new counter checkpoint.
56    #[must_use]
57    pub fn new(
58        committed_bytes: usize,
59        committed_ops: usize,
60        reserved_bytes: usize,
61        reserved_ops: usize,
62    ) -> Self {
63        Self {
64            committed_bytes,
65            committed_ops,
66            reserved_bytes,
67            reserved_ops,
68        }
69    }
70
71    /// Creates a checkpoint from a buffer state snapshot.
72    #[must_use]
73    pub fn from_snapshot(snapshot: &BufferStateSnapshot) -> Self {
74        Self {
75            committed_bytes: snapshot.committed_bytes,
76            committed_ops: snapshot.committed_ops,
77            reserved_bytes: snapshot.reserved_bytes,
78            reserved_ops: snapshot.reserved_ops,
79        }
80    }
81
82    /// Returns total bytes (committed + reserved).
83    #[must_use]
84    #[inline]
85    pub const fn total_bytes(&self) -> usize {
86        self.committed_bytes + self.reserved_bytes
87    }
88
89    /// Returns total operations (committed + reserved).
90    #[must_use]
91    #[inline]
92    pub const fn total_ops(&self) -> usize {
93        self.committed_ops + self.reserved_ops
94    }
95}
96
97impl fmt::Display for CounterCheckpoint {
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        write!(
100            f,
101            "bytes: {} committed + {} reserved, ops: {} committed + {} reserved",
102            self.committed_bytes, self.reserved_bytes, self.committed_ops, self.reserved_ops
103        )
104    }
105}
106
/// Lightweight metadata snapshot of a single edge store.
///
/// Records the CSR version, the delta buffer's edge count and byte size,
/// the sequence counter, and the tombstone count as they were at checkpoint
/// time. Only these numbers are kept; no CSR or delta data is cloned, so the
/// checkpoint is suitable for validation rather than data restoration.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EdgeStoreCheckpoint {
    /// CSR version number at checkpoint time
    pub csr_version: u64,
    /// Number of delta edges at checkpoint time
    pub delta_edge_count: usize,
    /// Delta buffer byte size at checkpoint time
    pub delta_byte_size: usize,
    /// Sequence counter value at checkpoint time
    pub seq_counter: u64,
    /// Number of tombstones at checkpoint time
    pub tombstone_count: usize,
}

impl EdgeStoreCheckpoint {
    /// Builds an edge store checkpoint from the given metadata values.
    #[must_use]
    pub fn new(
        csr_version: u64,
        delta_edge_count: usize,
        delta_byte_size: usize,
        seq_counter: u64,
        tombstone_count: usize,
    ) -> Self {
        Self {
            csr_version,
            delta_edge_count,
            delta_byte_size,
            seq_counter,
            tombstone_count,
        }
    }

    /// Reports whether the store's current state differs from this checkpoint.
    ///
    /// Compares the CSR version, delta edge count, and sequence counter.
    /// The recorded delta byte size and tombstone count are not consulted.
    #[must_use]
    pub fn has_changed(
        &self,
        current_csr_version: u64,
        current_delta_count: usize,
        current_seq: u64,
    ) -> bool {
        let recorded = (self.csr_version, self.delta_edge_count, self.seq_counter);
        let observed = (current_csr_version, current_delta_count, current_seq);
        recorded != observed
    }
}

impl fmt::Display for EdgeStoreCheckpoint {
    /// Writes a one-line summary of every recorded field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            csr_version,
            delta_edge_count,
            delta_byte_size,
            seq_counter,
            tombstone_count,
        } = self;
        write!(
            f,
            "csr_v{}, {} deltas ({} bytes), seq={}, {} tombstones",
            csr_version, delta_edge_count, delta_byte_size, seq_counter, tombstone_count
        )
    }
}
172
173/// Complete compaction checkpoint for bidirectional edge store.
174///
175/// Captures enough state to validate and restore after a failed
176/// compaction operation.
177///
178/// # State Captured
179///
180/// - Forward edge store: CSR version, delta size, seq counter, tombstones
181/// - Reverse edge store: CSR version, delta size, seq counter, tombstones
182/// - Counter state: committed and reserved bytes/ops
183///
184/// # Usage Notes
185///
186/// For full rollback with data restoration, the caller must retain
187/// cloned copies of the actual CSR and delta buffer data. This checkpoint
188/// provides the lightweight metadata needed for validation and counter
189/// restoration.
190#[derive(Debug, Clone, PartialEq, Eq)]
191pub struct CompactionCheckpoint {
192    /// Forward edge store state
193    pub forward: EdgeStoreCheckpoint,
194    /// Reverse edge store state
195    pub reverse: EdgeStoreCheckpoint,
196    /// Counter state
197    pub counters: CounterCheckpoint,
198    /// Timestamp when checkpoint was created (for diagnostics)
199    pub created_at_epoch_ms: u64,
200}
201
202impl CompactionCheckpoint {
203    /// Creates a new compaction checkpoint.
204    #[must_use]
205    pub fn new(
206        forward: EdgeStoreCheckpoint,
207        reverse: EdgeStoreCheckpoint,
208        counters: CounterCheckpoint,
209    ) -> Self {
210        Self {
211            forward,
212            reverse,
213            counters,
214            created_at_epoch_ms: current_epoch_ms(),
215        }
216    }
217
218    /// Creates a checkpoint from individual components.
219    #[must_use]
220    #[allow(clippy::too_many_arguments)]
221    pub fn from_components(
222        forward_csr_version: u64,
223        forward_delta_count: usize,
224        forward_delta_bytes: usize,
225        forward_seq: u64,
226        forward_tombstones: usize,
227        reverse_csr_version: u64,
228        reverse_delta_count: usize,
229        reverse_delta_bytes: usize,
230        reverse_seq: u64,
231        reverse_tombstones: usize,
232        committed_bytes: usize,
233        committed_ops: usize,
234        reserved_bytes: usize,
235        reserved_ops: usize,
236    ) -> Self {
237        Self::new(
238            EdgeStoreCheckpoint::new(
239                forward_csr_version,
240                forward_delta_count,
241                forward_delta_bytes,
242                forward_seq,
243                forward_tombstones,
244            ),
245            EdgeStoreCheckpoint::new(
246                reverse_csr_version,
247                reverse_delta_count,
248                reverse_delta_bytes,
249                reverse_seq,
250                reverse_tombstones,
251            ),
252            CounterCheckpoint::new(committed_bytes, committed_ops, reserved_bytes, reserved_ops),
253        )
254    }
255
256    /// Returns true if either store has changed since checkpoint.
257    #[must_use]
258    pub fn has_concurrent_modification(
259        &self,
260        forward_csr_version: u64,
261        forward_delta_count: usize,
262        forward_seq: u64,
263        reverse_csr_version: u64,
264        reverse_delta_count: usize,
265        reverse_seq: u64,
266    ) -> bool {
267        self.forward
268            .has_changed(forward_csr_version, forward_delta_count, forward_seq)
269            || self
270                .reverse
271                .has_changed(reverse_csr_version, reverse_delta_count, reverse_seq)
272    }
273
274    /// Returns the age of this checkpoint in milliseconds.
275    #[must_use]
276    pub fn age_ms(&self) -> u64 {
277        current_epoch_ms().saturating_sub(self.created_at_epoch_ms)
278    }
279
280    /// Returns statistics about this checkpoint.
281    #[must_use]
282    pub fn stats(&self) -> CheckpointStats {
283        CheckpointStats {
284            forward_delta_count: self.forward.delta_edge_count,
285            forward_delta_bytes: self.forward.delta_byte_size,
286            forward_tombstones: self.forward.tombstone_count,
287            reverse_delta_count: self.reverse.delta_edge_count,
288            reverse_delta_bytes: self.reverse.delta_byte_size,
289            reverse_tombstones: self.reverse.tombstone_count,
290            total_committed_bytes: self.counters.committed_bytes,
291            total_committed_ops: self.counters.committed_ops,
292            total_reserved_bytes: self.counters.reserved_bytes,
293            total_reserved_ops: self.counters.reserved_ops,
294        }
295    }
296}
297
298impl Default for CompactionCheckpoint {
299    fn default() -> Self {
300        Self {
301            forward: EdgeStoreCheckpoint::default(),
302            reverse: EdgeStoreCheckpoint::default(),
303            counters: CounterCheckpoint::default(),
304            created_at_epoch_ms: current_epoch_ms(),
305        }
306    }
307}
308
309impl fmt::Display for CompactionCheckpoint {
310    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311        write!(
312            f,
313            "CompactionCheckpoint {{ forward: [{}], reverse: [{}], counters: [{}] }}",
314            self.forward, self.reverse, self.counters
315        )
316    }
317}
318
/// Statistics about a checkpoint.
///
/// A flat copy of the per-store delta and tombstone figures plus the
/// committed/reserved counters, with helpers that sum the forward and
/// reverse values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct CheckpointStats {
    /// Forward delta edge count
    pub forward_delta_count: usize,
    /// Forward delta byte size
    pub forward_delta_bytes: usize,
    /// Forward tombstone count
    pub forward_tombstones: usize,
    /// Reverse delta edge count
    pub reverse_delta_count: usize,
    /// Reverse delta byte size
    pub reverse_delta_bytes: usize,
    /// Reverse tombstone count
    pub reverse_tombstones: usize,
    /// Total committed bytes
    pub total_committed_bytes: usize,
    /// Total committed operations
    pub total_committed_ops: usize,
    /// Total reserved bytes
    pub total_reserved_bytes: usize,
    /// Total reserved operations
    pub total_reserved_ops: usize,
}

impl CheckpointStats {
    /// Total delta edges across both stores.
    ///
    /// Saturates at `usize::MAX` instead of overflowing.
    #[must_use]
    #[inline]
    pub const fn total_delta_edges(&self) -> usize {
        self.forward_delta_count
            .saturating_add(self.reverse_delta_count)
    }

    /// Total delta bytes across both stores.
    ///
    /// Saturates at `usize::MAX` instead of overflowing.
    #[must_use]
    #[inline]
    pub const fn total_delta_bytes(&self) -> usize {
        self.forward_delta_bytes
            .saturating_add(self.reverse_delta_bytes)
    }

    /// Total tombstones across both stores.
    ///
    /// Saturates at `usize::MAX` instead of overflowing.
    #[must_use]
    #[inline]
    pub const fn total_tombstones(&self) -> usize {
        self.forward_tombstones
            .saturating_add(self.reverse_tombstones)
    }
}

impl fmt::Display for CheckpointStats {
    /// Writes the combined (forward + reverse) delta and tombstone totals,
    /// followed by the committed and reserved counters.
    ///
    /// Because the totals saturate, formatting cannot panic on overflow.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "deltas: {} edges ({} bytes), tombstones: {}, committed: {} bytes/{} ops, reserved: {} bytes/{} ops",
            self.total_delta_edges(),
            self.total_delta_bytes(),
            self.total_tombstones(),
            self.total_committed_bytes,
            self.total_committed_ops,
            self.total_reserved_bytes,
            self.total_reserved_ops
        )
    }
}
382
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch, and
/// `u64::MAX` when the millisecond count does not fit in a `u64`.
fn current_epoch_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
        // The clock is set before the epoch; report zero rather than fail.
        Err(_) => 0,
    }
}
392
// Unit tests for the checkpoint value types: construction, derived totals,
// change detection, `Display` output, defaults, and equality.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_counter_checkpoint_new() {
        let cp = CounterCheckpoint::new(100, 10, 50, 5);
        assert_eq!(cp.committed_bytes, 100);
        assert_eq!(cp.committed_ops, 10);
        assert_eq!(cp.reserved_bytes, 50);
        assert_eq!(cp.reserved_ops, 5);
    }

    #[test]
    fn test_counter_checkpoint_totals() {
        let cp = CounterCheckpoint::new(100, 10, 50, 5);
        assert_eq!(cp.total_bytes(), 150);
        assert_eq!(cp.total_ops(), 15);
    }

    #[test]
    fn test_counter_checkpoint_from_snapshot() {
        let snapshot = BufferStateSnapshot {
            committed_bytes: 200,
            committed_ops: 20,
            reserved_bytes: 100,
            reserved_ops: 10,
            active_guards: 2,
        };

        let cp = CounterCheckpoint::from_snapshot(&snapshot);
        assert_eq!(cp.committed_bytes, 200);
        assert_eq!(cp.committed_ops, 20);
        assert_eq!(cp.reserved_bytes, 100);
        assert_eq!(cp.reserved_ops, 10);
    }

    #[test]
    fn test_counter_checkpoint_display() {
        let cp = CounterCheckpoint::new(100, 10, 50, 5);
        let display = format!("{cp}");
        assert!(display.contains("100 committed"));
        assert!(display.contains("50 reserved"));
    }

    #[test]
    fn test_edge_store_checkpoint_new() {
        let cp = EdgeStoreCheckpoint::new(1, 100, 5000, 42, 10);
        assert_eq!(cp.csr_version, 1);
        assert_eq!(cp.delta_edge_count, 100);
        assert_eq!(cp.delta_byte_size, 5000);
        assert_eq!(cp.seq_counter, 42);
        assert_eq!(cp.tombstone_count, 10);
    }

    #[test]
    fn test_edge_store_checkpoint_has_changed() {
        let cp = EdgeStoreCheckpoint::new(1, 100, 5000, 42, 10);

        // No change
        assert!(!cp.has_changed(1, 100, 42));

        // CSR version changed
        assert!(cp.has_changed(2, 100, 42));

        // Delta count changed
        assert!(cp.has_changed(1, 101, 42));

        // Seq changed
        assert!(cp.has_changed(1, 100, 43));
    }

    #[test]
    fn test_edge_store_checkpoint_display() {
        let cp = EdgeStoreCheckpoint::new(1, 100, 5000, 42, 10);
        let display = format!("{cp}");
        assert!(display.contains("csr_v1"));
        assert!(display.contains("100 deltas"));
        assert!(display.contains("seq=42"));
    }

    #[test]
    fn test_compaction_checkpoint_new() {
        let forward = EdgeStoreCheckpoint::new(1, 50, 2500, 20, 5);
        let reverse = EdgeStoreCheckpoint::new(1, 50, 2500, 20, 5);
        let counters = CounterCheckpoint::new(100, 10, 50, 5);

        let cp = CompactionCheckpoint::new(forward, reverse, counters);
        assert_eq!(cp.forward.delta_edge_count, 50);
        assert_eq!(cp.reverse.delta_edge_count, 50);
        assert_eq!(cp.counters.committed_bytes, 100);
        assert!(cp.created_at_epoch_ms > 0);
    }

    #[test]
    fn test_compaction_checkpoint_from_components() {
        let cp = CompactionCheckpoint::from_components(
            1, 50, 2500, 20, 5, // forward
            2, 60, 3000, 25, 8, // reverse
            100, 10, 50, 5, // counters
        );

        assert_eq!(cp.forward.csr_version, 1);
        assert_eq!(cp.forward.delta_edge_count, 50);
        assert_eq!(cp.reverse.csr_version, 2);
        assert_eq!(cp.reverse.delta_edge_count, 60);
        assert_eq!(cp.counters.committed_bytes, 100);
    }

    #[test]
    fn test_compaction_checkpoint_has_concurrent_modification() {
        let cp = CompactionCheckpoint::from_components(
            1, 50, 2500, 20, 5, // forward
            1, 60, 3000, 25, 8, // reverse
            100, 10, 50, 5, // counters
        );

        // No change
        assert!(!cp.has_concurrent_modification(1, 50, 20, 1, 60, 25));

        // Forward CSR version changed
        assert!(cp.has_concurrent_modification(2, 50, 20, 1, 60, 25));

        // Reverse CSR version changed
        assert!(cp.has_concurrent_modification(1, 50, 20, 2, 60, 25));

        // Forward delta count changed
        assert!(cp.has_concurrent_modification(1, 51, 20, 1, 60, 25));

        // Reverse seq changed
        assert!(cp.has_concurrent_modification(1, 50, 20, 1, 60, 26));
    }

    #[test]
    fn test_compaction_checkpoint_stats() {
        let cp = CompactionCheckpoint::from_components(
            1, 50, 2500, 20, 5, // forward
            1, 60, 3000, 25, 8, // reverse
            100, 10, 50, 5, // counters
        );

        let stats = cp.stats();
        assert_eq!(stats.total_delta_edges(), 110);
        assert_eq!(stats.total_delta_bytes(), 5500);
        assert_eq!(stats.total_tombstones(), 13);
        assert_eq!(stats.total_committed_bytes, 100);
    }

    #[test]
    fn test_compaction_checkpoint_age() {
        // Backdate the creation timestamp instead of sleeping: the test stays
        // fast and does not depend on scheduler timing.
        let fresh = CompactionCheckpoint::default();
        let cp = CompactionCheckpoint {
            created_at_epoch_ms: fresh.created_at_epoch_ms.saturating_sub(50),
            ..fresh
        };

        assert!(cp.age_ms() >= 50);
    }

    #[test]
    fn test_compaction_checkpoint_age_saturates_for_future_timestamp() {
        // A creation time later than "now" must yield an age of zero rather
        // than underflowing.
        let cp = CompactionCheckpoint {
            created_at_epoch_ms: u64::MAX,
            ..CompactionCheckpoint::default()
        };

        assert_eq!(cp.age_ms(), 0);
    }

    #[test]
    fn test_checkpoint_stats_display() {
        let stats = CheckpointStats {
            forward_delta_count: 50,
            forward_delta_bytes: 2500,
            forward_tombstones: 5,
            reverse_delta_count: 60,
            reverse_delta_bytes: 3000,
            reverse_tombstones: 8,
            total_committed_bytes: 100,
            total_committed_ops: 10,
            total_reserved_bytes: 50,
            total_reserved_ops: 5,
        };

        let display = format!("{stats}");
        assert!(display.contains("110 edges"));
        assert!(display.contains("5500 bytes"));
        assert!(display.contains("tombstones: 13"));
    }

    #[test]
    fn test_compaction_checkpoint_display() {
        let cp = CompactionCheckpoint::from_components(
            1, 50, 2500, 20, 5, // forward
            1, 60, 3000, 25, 8, // reverse
            100, 10, 50, 5, // counters
        );

        let display = format!("{cp}");
        assert!(display.contains("forward:"));
        assert!(display.contains("reverse:"));
        assert!(display.contains("counters:"));
    }

    #[test]
    fn test_default_values() {
        let counter_cp = CounterCheckpoint::default();
        assert_eq!(counter_cp.committed_bytes, 0);
        assert_eq!(counter_cp.total_bytes(), 0);

        let edge_cp = EdgeStoreCheckpoint::default();
        assert_eq!(edge_cp.csr_version, 0);
        assert_eq!(edge_cp.delta_edge_count, 0);

        let comp_cp = CompactionCheckpoint::default();
        assert_eq!(comp_cp.forward.csr_version, 0);
        assert_eq!(comp_cp.counters.committed_bytes, 0);
    }

    #[test]
    fn test_counter_checkpoint_equality() {
        let cp1 = CounterCheckpoint::new(100, 10, 50, 5);
        let cp2 = CounterCheckpoint::new(100, 10, 50, 5);
        let cp3 = CounterCheckpoint::new(100, 10, 50, 6);

        assert_eq!(cp1, cp2);
        assert_ne!(cp1, cp3);
    }

    #[test]
    fn test_edge_store_checkpoint_equality() {
        let cp1 = EdgeStoreCheckpoint::new(1, 100, 5000, 42, 10);
        let cp2 = EdgeStoreCheckpoint::new(1, 100, 5000, 42, 10);
        let cp3 = EdgeStoreCheckpoint::new(2, 100, 5000, 42, 10);

        assert_eq!(cp1, cp2);
        assert_ne!(cp1, cp3);
    }
}