1use std::fmt;
36
37use super::super::admission::BufferStateSnapshot;
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
43pub struct CounterCheckpoint {
44 pub committed_bytes: usize,
46 pub committed_ops: usize,
48 pub reserved_bytes: usize,
50 pub reserved_ops: usize,
52}
53
54impl CounterCheckpoint {
55 #[must_use]
57 pub fn new(
58 committed_bytes: usize,
59 committed_ops: usize,
60 reserved_bytes: usize,
61 reserved_ops: usize,
62 ) -> Self {
63 Self {
64 committed_bytes,
65 committed_ops,
66 reserved_bytes,
67 reserved_ops,
68 }
69 }
70
71 #[must_use]
73 pub fn from_snapshot(snapshot: &BufferStateSnapshot) -> Self {
74 Self {
75 committed_bytes: snapshot.committed_bytes,
76 committed_ops: snapshot.committed_ops,
77 reserved_bytes: snapshot.reserved_bytes,
78 reserved_ops: snapshot.reserved_ops,
79 }
80 }
81
82 #[must_use]
84 #[inline]
85 pub const fn total_bytes(&self) -> usize {
86 self.committed_bytes + self.reserved_bytes
87 }
88
89 #[must_use]
91 #[inline]
92 pub const fn total_ops(&self) -> usize {
93 self.committed_ops + self.reserved_ops
94 }
95}
96
97impl fmt::Display for CounterCheckpoint {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 write!(
100 f,
101 "bytes: {} committed + {} reserved, ops: {} committed + {} reserved",
102 self.committed_bytes, self.reserved_bytes, self.committed_ops, self.reserved_ops
103 )
104 }
105}
106
/// Recorded counters for a single edge store: CSR version, delta size,
/// sequence counter, and tombstone count.
///
/// `Default` yields the all-zero checkpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EdgeStoreCheckpoint {
    /// CSR version recorded in this checkpoint.
    pub csr_version: u64,
    /// Number of delta edges recorded in this checkpoint.
    pub delta_edge_count: usize,
    /// Size of the delta in bytes recorded in this checkpoint.
    pub delta_byte_size: usize,
    /// Sequence counter value recorded in this checkpoint.
    pub seq_counter: u64,
    /// Number of tombstones recorded in this checkpoint.
    pub tombstone_count: usize,
}

impl EdgeStoreCheckpoint {
    /// Assembles a checkpoint from the five individual values.
    #[must_use]
    pub fn new(
        csr_version: u64,
        delta_edge_count: usize,
        delta_byte_size: usize,
        seq_counter: u64,
        tombstone_count: usize,
    ) -> Self {
        Self {
            tombstone_count,
            seq_counter,
            delta_byte_size,
            delta_edge_count,
            csr_version,
        }
    }

    /// Reports whether the supplied current values differ from the recorded ones.
    ///
    /// Only the CSR version, delta edge count, and sequence counter take part
    /// in the comparison; the byte size and tombstone count are not consulted.
    #[must_use]
    pub fn has_changed(
        &self,
        current_csr_version: u64,
        current_delta_count: usize,
        current_seq: u64,
    ) -> bool {
        let recorded = (self.csr_version, self.delta_edge_count, self.seq_counter);
        let observed = (current_csr_version, current_delta_count, current_seq);
        recorded != observed
    }
}

/// Renders as `csr_vV, N deltas (B bytes), seq=S, T tombstones`.
impl fmt::Display for EdgeStoreCheckpoint {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!(
            "csr_v{}, {} deltas ({} bytes), seq={}, {} tombstones",
            self.csr_version,
            self.delta_edge_count,
            self.delta_byte_size,
            self.seq_counter,
            self.tombstone_count
        ))
    }
}
172
173#[derive(Debug, Clone, PartialEq, Eq)]
191pub struct CompactionCheckpoint {
192 pub forward: EdgeStoreCheckpoint,
194 pub reverse: EdgeStoreCheckpoint,
196 pub counters: CounterCheckpoint,
198 pub created_at_epoch_ms: u64,
200}
201
202impl CompactionCheckpoint {
203 #[must_use]
205 pub fn new(
206 forward: EdgeStoreCheckpoint,
207 reverse: EdgeStoreCheckpoint,
208 counters: CounterCheckpoint,
209 ) -> Self {
210 Self {
211 forward,
212 reverse,
213 counters,
214 created_at_epoch_ms: current_epoch_ms(),
215 }
216 }
217
218 #[must_use]
220 #[allow(clippy::too_many_arguments)]
221 pub fn from_components(
222 forward_csr_version: u64,
223 forward_delta_count: usize,
224 forward_delta_bytes: usize,
225 forward_seq: u64,
226 forward_tombstones: usize,
227 reverse_csr_version: u64,
228 reverse_delta_count: usize,
229 reverse_delta_bytes: usize,
230 reverse_seq: u64,
231 reverse_tombstones: usize,
232 committed_bytes: usize,
233 committed_ops: usize,
234 reserved_bytes: usize,
235 reserved_ops: usize,
236 ) -> Self {
237 Self::new(
238 EdgeStoreCheckpoint::new(
239 forward_csr_version,
240 forward_delta_count,
241 forward_delta_bytes,
242 forward_seq,
243 forward_tombstones,
244 ),
245 EdgeStoreCheckpoint::new(
246 reverse_csr_version,
247 reverse_delta_count,
248 reverse_delta_bytes,
249 reverse_seq,
250 reverse_tombstones,
251 ),
252 CounterCheckpoint::new(committed_bytes, committed_ops, reserved_bytes, reserved_ops),
253 )
254 }
255
256 #[must_use]
258 pub fn has_concurrent_modification(
259 &self,
260 forward_csr_version: u64,
261 forward_delta_count: usize,
262 forward_seq: u64,
263 reverse_csr_version: u64,
264 reverse_delta_count: usize,
265 reverse_seq: u64,
266 ) -> bool {
267 self.forward
268 .has_changed(forward_csr_version, forward_delta_count, forward_seq)
269 || self
270 .reverse
271 .has_changed(reverse_csr_version, reverse_delta_count, reverse_seq)
272 }
273
274 #[must_use]
276 pub fn age_ms(&self) -> u64 {
277 current_epoch_ms().saturating_sub(self.created_at_epoch_ms)
278 }
279
280 #[must_use]
282 pub fn stats(&self) -> CheckpointStats {
283 CheckpointStats {
284 forward_delta_count: self.forward.delta_edge_count,
285 forward_delta_bytes: self.forward.delta_byte_size,
286 forward_tombstones: self.forward.tombstone_count,
287 reverse_delta_count: self.reverse.delta_edge_count,
288 reverse_delta_bytes: self.reverse.delta_byte_size,
289 reverse_tombstones: self.reverse.tombstone_count,
290 total_committed_bytes: self.counters.committed_bytes,
291 total_committed_ops: self.counters.committed_ops,
292 total_reserved_bytes: self.counters.reserved_bytes,
293 total_reserved_ops: self.counters.reserved_ops,
294 }
295 }
296}
297
298impl Default for CompactionCheckpoint {
299 fn default() -> Self {
300 Self {
301 forward: EdgeStoreCheckpoint::default(),
302 reverse: EdgeStoreCheckpoint::default(),
303 counters: CounterCheckpoint::default(),
304 created_at_epoch_ms: current_epoch_ms(),
305 }
306 }
307}
308
309impl fmt::Display for CompactionCheckpoint {
310 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311 write!(
312 f,
313 "CompactionCheckpoint {{ forward: [{}], reverse: [{}], counters: [{}] }}",
314 self.forward, self.reverse, self.counters
315 )
316 }
317}
318
/// Flat set of per-direction delta/tombstone figures plus the counter values.
///
/// `Default` yields all zeros.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct CheckpointStats {
    /// Delta edge count of the forward store.
    pub forward_delta_count: usize,
    /// Delta size in bytes of the forward store.
    pub forward_delta_bytes: usize,
    /// Tombstone count of the forward store.
    pub forward_tombstones: usize,
    /// Delta edge count of the reverse store.
    pub reverse_delta_count: usize,
    /// Delta size in bytes of the reverse store.
    pub reverse_delta_bytes: usize,
    /// Tombstone count of the reverse store.
    pub reverse_tombstones: usize,
    /// Committed byte counter.
    pub total_committed_bytes: usize,
    /// Committed operation counter.
    pub total_committed_ops: usize,
    /// Reserved byte counter.
    pub total_reserved_bytes: usize,
    /// Reserved operation counter.
    pub total_reserved_ops: usize,
}

impl CheckpointStats {
    /// Forward plus reverse delta edge counts.
    ///
    /// Saturates at `usize::MAX`: a bare `+` would panic in debug builds and
    /// wrap in release builds, and this helper is also reached from `Display`.
    #[must_use]
    #[inline]
    pub const fn total_delta_edges(&self) -> usize {
        self.forward_delta_count
            .saturating_add(self.reverse_delta_count)
    }

    /// Forward plus reverse delta byte sizes, saturating at `usize::MAX`.
    #[must_use]
    #[inline]
    pub const fn total_delta_bytes(&self) -> usize {
        self.forward_delta_bytes
            .saturating_add(self.reverse_delta_bytes)
    }

    /// Forward plus reverse tombstone counts, saturating at `usize::MAX`.
    #[must_use]
    #[inline]
    pub const fn total_tombstones(&self) -> usize {
        self.forward_tombstones
            .saturating_add(self.reverse_tombstones)
    }
}

/// Renders the combined delta/tombstone totals followed by the committed and
/// reserved counters.
impl fmt::Display for CheckpointStats {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "deltas: {} edges ({} bytes), tombstones: {}, committed: {} bytes/{} ops, reserved: {} bytes/{} ops",
            self.total_delta_edges(),
            self.total_delta_bytes(),
            self.total_tombstones(),
            self.total_committed_bytes,
            self.total_committed_ops,
            self.total_reserved_bytes,
            self.total_reserved_ops
        )
    }
}
382
/// Reads the system wall clock as whole milliseconds since the Unix epoch.
///
/// Returns `0` when the clock reports a time before the epoch, and
/// `u64::MAX` when the millisecond count does not fit in a `u64`.
fn current_epoch_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX),
        // The clock is set earlier than the epoch.
        Err(_) => 0,
    }
}
392
#[cfg(test)]
mod tests {
    use super::*;

    /// Shared fixture built through `from_components`.
    ///
    /// forward:  csr v1, 50 deltas, 2500 bytes, seq 20, 5 tombstones
    /// reverse:  csr v1, 60 deltas, 3000 bytes, seq 25, 8 tombstones
    /// counters: 100 bytes / 10 ops committed, 50 bytes / 5 ops reserved
    fn sample_checkpoint() -> CompactionCheckpoint {
        CompactionCheckpoint::from_components(
            // forward: csr_version, delta_count, delta_bytes, seq, tombstones
            1, 50, 2500, 20, 5,
            // reverse: csr_version, delta_count, delta_bytes, seq, tombstones
            1, 60, 3000, 25, 8,
            // counters: committed_bytes, committed_ops, reserved_bytes, reserved_ops
            100, 10, 50, 5,
        )
    }

    #[test]
    fn test_counter_checkpoint_new() {
        let cp = CounterCheckpoint::new(100, 10, 50, 5);
        assert_eq!(cp.committed_bytes, 100);
        assert_eq!(cp.committed_ops, 10);
        assert_eq!(cp.reserved_bytes, 50);
        assert_eq!(cp.reserved_ops, 5);
    }

    #[test]
    fn test_counter_checkpoint_totals() {
        let cp = CounterCheckpoint::new(100, 10, 50, 5);
        assert_eq!(cp.total_bytes(), 150);
        assert_eq!(cp.total_ops(), 15);
    }

    #[test]
    fn test_counter_checkpoint_from_snapshot() {
        let snapshot = BufferStateSnapshot {
            committed_bytes: 200,
            committed_ops: 20,
            reserved_bytes: 100,
            reserved_ops: 10,
            active_guards: 2,
        };

        let cp = CounterCheckpoint::from_snapshot(&snapshot);
        assert_eq!(cp.committed_bytes, 200);
        assert_eq!(cp.committed_ops, 20);
        assert_eq!(cp.reserved_bytes, 100);
        assert_eq!(cp.reserved_ops, 10);
    }

    #[test]
    fn test_counter_checkpoint_display() {
        let cp = CounterCheckpoint::new(100, 10, 50, 5);
        let display = cp.to_string();
        assert!(display.contains("100 committed"));
        assert!(display.contains("50 reserved"));
        assert!(display.contains("10 committed"));
        assert!(display.contains("5 reserved"));
    }

    #[test]
    fn test_edge_store_checkpoint_new() {
        let cp = EdgeStoreCheckpoint::new(1, 100, 5000, 42, 10);
        assert_eq!(cp.csr_version, 1);
        assert_eq!(cp.delta_edge_count, 100);
        assert_eq!(cp.delta_byte_size, 5000);
        assert_eq!(cp.seq_counter, 42);
        assert_eq!(cp.tombstone_count, 10);
    }

    #[test]
    fn test_edge_store_checkpoint_has_changed() {
        let cp = EdgeStoreCheckpoint::new(1, 100, 5000, 42, 10);

        // Identical values: no change.
        assert!(!cp.has_changed(1, 100, 42));

        // CSR version differs.
        assert!(cp.has_changed(2, 100, 42));

        // Delta edge count differs.
        assert!(cp.has_changed(1, 101, 42));

        // Sequence counter differs.
        assert!(cp.has_changed(1, 100, 43));
    }

    #[test]
    fn test_edge_store_checkpoint_display() {
        let cp = EdgeStoreCheckpoint::new(1, 100, 5000, 42, 10);
        let display = cp.to_string();
        assert!(display.contains("csr_v1"));
        assert!(display.contains("100 deltas"));
        assert!(display.contains("(5000 bytes)"));
        assert!(display.contains("seq=42"));
        assert!(display.contains("10 tombstones"));
    }

    #[test]
    fn test_compaction_checkpoint_new() {
        let forward = EdgeStoreCheckpoint::new(1, 50, 2500, 20, 5);
        let reverse = EdgeStoreCheckpoint::new(1, 50, 2500, 20, 5);
        let counters = CounterCheckpoint::new(100, 10, 50, 5);

        let cp = CompactionCheckpoint::new(forward, reverse, counters);
        assert_eq!(cp.forward, forward);
        assert_eq!(cp.reverse, reverse);
        assert_eq!(cp.counters, counters);
        assert!(cp.created_at_epoch_ms > 0);
    }

    #[test]
    fn test_compaction_checkpoint_from_components() {
        let cp = CompactionCheckpoint::from_components(
            // forward: csr_version, delta_count, delta_bytes, seq, tombstones
            1, 50, 2500, 20, 5,
            // reverse: csr_version, delta_count, delta_bytes, seq, tombstones
            2, 60, 3000, 25, 8,
            // counters: committed_bytes, committed_ops, reserved_bytes, reserved_ops
            100, 10, 50, 5,
        );

        // Every positional argument must land in its own field; comparing the
        // whole sub-checkpoints catches any transposed pair.
        assert_eq!(cp.forward, EdgeStoreCheckpoint::new(1, 50, 2500, 20, 5));
        assert_eq!(cp.reverse, EdgeStoreCheckpoint::new(2, 60, 3000, 25, 8));
        assert_eq!(cp.counters, CounterCheckpoint::new(100, 10, 50, 5));
    }

    #[test]
    fn test_compaction_checkpoint_has_concurrent_modification() {
        let cp = sample_checkpoint();

        // Nothing moved.
        assert!(!cp.has_concurrent_modification(1, 50, 20, 1, 60, 25));

        // Forward store: version, delta count, sequence.
        assert!(cp.has_concurrent_modification(2, 50, 20, 1, 60, 25));
        assert!(cp.has_concurrent_modification(1, 51, 20, 1, 60, 25));
        assert!(cp.has_concurrent_modification(1, 50, 21, 1, 60, 25));

        // Reverse store: version, delta count, sequence.
        assert!(cp.has_concurrent_modification(1, 50, 20, 2, 60, 25));
        assert!(cp.has_concurrent_modification(1, 50, 20, 1, 61, 25));
        assert!(cp.has_concurrent_modification(1, 50, 20, 1, 60, 26));
    }

    #[test]
    fn test_compaction_checkpoint_stats() {
        let stats = sample_checkpoint().stats();

        assert_eq!(stats.total_delta_edges(), 110);
        assert_eq!(stats.total_delta_bytes(), 5500);
        assert_eq!(stats.total_tombstones(), 13);
        assert_eq!(stats.total_committed_bytes, 100);
        assert_eq!(stats.total_committed_ops, 10);
        assert_eq!(stats.total_reserved_bytes, 50);
        assert_eq!(stats.total_reserved_ops, 5);
    }

    #[test]
    fn test_compaction_checkpoint_age() {
        // Back-date the creation time instead of sleeping: the test stays fast
        // and does not depend on scheduler timing.
        let backdated = CompactionCheckpoint {
            created_at_epoch_ms: current_epoch_ms().saturating_sub(10),
            ..CompactionCheckpoint::default()
        };
        assert!(backdated.age_ms() >= 10);

        // A creation time ahead of the clock clamps to zero rather than underflowing.
        let from_the_future = CompactionCheckpoint {
            created_at_epoch_ms: u64::MAX,
            ..CompactionCheckpoint::default()
        };
        assert_eq!(from_the_future.age_ms(), 0);
    }

    #[test]
    fn test_checkpoint_stats_display() {
        let stats = CheckpointStats {
            forward_delta_count: 50,
            forward_delta_bytes: 2500,
            forward_tombstones: 5,
            reverse_delta_count: 60,
            reverse_delta_bytes: 3000,
            reverse_tombstones: 8,
            total_committed_bytes: 100,
            total_committed_ops: 10,
            total_reserved_bytes: 50,
            total_reserved_ops: 5,
        };

        let display = stats.to_string();
        assert!(display.contains("110 edges"));
        assert!(display.contains("5500 bytes"));
        // Anchor on the label: a bare "13" could match digits elsewhere in the output.
        assert!(display.contains("tombstones: 13"));
    }

    #[test]
    fn test_compaction_checkpoint_display() {
        let display = sample_checkpoint().to_string();
        assert!(display.contains("forward:"));
        assert!(display.contains("reverse:"));
        assert!(display.contains("counters:"));
    }

    #[test]
    fn test_default_values() {
        let counter_cp = CounterCheckpoint::default();
        assert_eq!(counter_cp.committed_bytes, 0);
        assert_eq!(counter_cp.total_bytes(), 0);

        let edge_cp = EdgeStoreCheckpoint::default();
        assert_eq!(edge_cp.csr_version, 0);
        assert_eq!(edge_cp.delta_edge_count, 0);

        let comp_cp = CompactionCheckpoint::default();
        assert_eq!(comp_cp.forward.csr_version, 0);
        assert_eq!(comp_cp.counters.committed_bytes, 0);
    }

    #[test]
    fn test_counter_checkpoint_equality() {
        let cp1 = CounterCheckpoint::new(100, 10, 50, 5);
        let cp2 = CounterCheckpoint::new(100, 10, 50, 5);
        let cp3 = CounterCheckpoint::new(100, 10, 50, 6);

        assert_eq!(cp1, cp2);
        assert_ne!(cp1, cp3);
    }

    #[test]
    fn test_edge_store_checkpoint_equality() {
        let cp1 = EdgeStoreCheckpoint::new(1, 100, 5000, 42, 10);
        let cp2 = EdgeStoreCheckpoint::new(1, 100, 5000, 42, 10);
        let cp3 = EdgeStoreCheckpoint::new(2, 100, 5000, 42, 10);

        assert_eq!(cp1, cp2);
        assert_ne!(cp1, cp3);
    }
}