use std::fmt;
use super::super::admission::BufferStateSnapshot;
/// Captured values of the committed and reserved byte/operation counters.
///
/// Plain `Copy` data; the derived `Default` value has every counter at zero.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct CounterCheckpoint {
/// Committed byte counter at capture time.
pub committed_bytes: usize,
/// Committed operation counter at capture time.
pub committed_ops: usize,
/// Reserved byte counter at capture time.
pub reserved_bytes: usize,
/// Reserved operation counter at capture time.
pub reserved_ops: usize,
}
impl CounterCheckpoint {
    /// Builds a checkpoint from the four raw counter values.
    #[must_use]
    pub fn new(
        committed_bytes: usize,
        committed_ops: usize,
        reserved_bytes: usize,
        reserved_ops: usize,
    ) -> Self {
        Self {
            committed_bytes,
            committed_ops,
            reserved_bytes,
            reserved_ops,
        }
    }

    /// Copies the committed/reserved byte and op counters out of `snapshot`.
    ///
    /// Only those four fields are read; anything else carried by the snapshot
    /// is not recorded in the checkpoint.
    #[must_use]
    pub fn from_snapshot(snapshot: &BufferStateSnapshot) -> Self {
        Self {
            committed_bytes: snapshot.committed_bytes,
            committed_ops: snapshot.committed_ops,
            reserved_bytes: snapshot.reserved_bytes,
            reserved_ops: snapshot.reserved_ops,
        }
    }

    /// Returns committed plus reserved bytes.
    ///
    /// The sum saturates at `usize::MAX` so the result is the same in debug
    /// and release builds (a plain `+` would panic in one and wrap in the
    /// other).
    #[must_use]
    #[inline]
    pub const fn total_bytes(&self) -> usize {
        self.committed_bytes.saturating_add(self.reserved_bytes)
    }

    /// Returns committed plus reserved operations.
    ///
    /// The sum saturates at `usize::MAX` instead of overflowing.
    #[must_use]
    #[inline]
    pub const fn total_ops(&self) -> usize {
        self.committed_ops.saturating_add(self.reserved_ops)
    }
}
impl fmt::Display for CounterCheckpoint {
    /// Writes the byte counters first, then the op counters, each as
    /// "N committed + M reserved".
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            committed_bytes,
            committed_ops,
            reserved_bytes,
            reserved_ops,
        } = self;
        f.write_fmt(format_args!(
            "bytes: {} committed + {} reserved, ops: {} committed + {} reserved",
            committed_bytes, reserved_bytes, committed_ops, reserved_ops
        ))
    }
}
/// Recorded state of a single edge store: its CSR version, delta size,
/// sequence counter and tombstone count.
///
/// Plain `Copy` data; the derived `Default` value has every field at zero.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EdgeStoreCheckpoint {
/// CSR version recorded in the checkpoint.
pub csr_version: u64,
/// Number of delta edges recorded in the checkpoint.
pub delta_edge_count: usize,
/// Size of the deltas in bytes, as supplied by whoever built the checkpoint.
pub delta_byte_size: usize,
/// Sequence counter value recorded in the checkpoint.
pub seq_counter: u64,
/// Number of tombstones recorded in the checkpoint.
pub tombstone_count: usize,
}
impl EdgeStoreCheckpoint {
    /// Builds a checkpoint from the five raw values, stored verbatim.
    #[must_use]
    pub fn new(
        csr_version: u64,
        delta_edge_count: usize,
        delta_byte_size: usize,
        seq_counter: u64,
        tombstone_count: usize,
    ) -> Self {
        Self { csr_version, delta_edge_count, delta_byte_size, seq_counter, tombstone_count }
    }

    /// Reports whether the supplied current values differ from the recorded
    /// ones.
    ///
    /// Only the CSR version, the delta edge count and the sequence counter
    /// take part in the comparison; `delta_byte_size` and `tombstone_count`
    /// are not consulted.
    #[must_use]
    pub fn has_changed(
        &self,
        current_csr_version: u64,
        current_delta_count: usize,
        current_seq: u64,
    ) -> bool {
        let recorded = (self.csr_version, self.delta_edge_count, self.seq_counter);
        let observed = (current_csr_version, current_delta_count, current_seq);
        recorded != observed
    }
}
impl fmt::Display for EdgeStoreCheckpoint {
    /// Writes a one-line summary: CSR version, delta count and byte size,
    /// sequence counter, and tombstone count.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            csr_version: version,
            delta_edge_count: deltas,
            delta_byte_size: bytes,
            seq_counter: seq,
            tombstone_count: tombstones,
        } = self;
        write!(
            f,
            "csr_v{}, {} deltas ({} bytes), seq={}, {} tombstones",
            version, deltas, bytes, seq, tombstones
        )
    }
}
/// Bundles a forward and a reverse [`EdgeStoreCheckpoint`], a
/// [`CounterCheckpoint`], and a creation timestamp.
///
/// NOTE(review): presumably taken before a compaction run so later state can
/// be compared against it — confirm against callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactionCheckpoint {
/// Recorded state of the forward edge store.
pub forward: EdgeStoreCheckpoint,
/// Recorded state of the reverse edge store.
pub reverse: EdgeStoreCheckpoint,
/// Recorded committed/reserved counters.
pub counters: CounterCheckpoint,
/// Creation time in milliseconds since the Unix epoch; `new` and `default`
/// fill it from `current_epoch_ms()`.
pub created_at_epoch_ms: u64,
}
impl CompactionCheckpoint {
    /// Assembles a checkpoint from its three parts and stamps it with the
    /// current wall-clock time.
    #[must_use]
    pub fn new(
        forward: EdgeStoreCheckpoint,
        reverse: EdgeStoreCheckpoint,
        counters: CounterCheckpoint,
    ) -> Self {
        let created_at_epoch_ms = current_epoch_ms();
        Self { forward, reverse, counters, created_at_epoch_ms }
    }

    /// Assembles a checkpoint from raw scalar values.
    ///
    /// The arguments come in three groups: five values for the forward edge
    /// store, five for the reverse edge store, then the four counters. The
    /// creation timestamp is taken by [`CompactionCheckpoint::new`].
    #[must_use]
    // Flat scalar constructor by design: one argument per checkpoint field.
    #[allow(clippy::too_many_arguments)]
    pub fn from_components(
        forward_csr_version: u64,
        forward_delta_count: usize,
        forward_delta_bytes: usize,
        forward_seq: u64,
        forward_tombstones: usize,
        reverse_csr_version: u64,
        reverse_delta_count: usize,
        reverse_delta_bytes: usize,
        reverse_seq: u64,
        reverse_tombstones: usize,
        committed_bytes: usize,
        committed_ops: usize,
        reserved_bytes: usize,
        reserved_ops: usize,
    ) -> Self {
        let forward = EdgeStoreCheckpoint::new(
            forward_csr_version,
            forward_delta_count,
            forward_delta_bytes,
            forward_seq,
            forward_tombstones,
        );
        let reverse = EdgeStoreCheckpoint::new(
            reverse_csr_version,
            reverse_delta_count,
            reverse_delta_bytes,
            reverse_seq,
            reverse_tombstones,
        );
        let counters =
            CounterCheckpoint::new(committed_bytes, committed_ops, reserved_bytes, reserved_ops);
        Self::new(forward, reverse, counters)
    }

    /// Returns `true` when either edge store reports a change relative to
    /// this checkpoint, as decided by `EdgeStoreCheckpoint::has_changed`.
    #[must_use]
    pub fn has_concurrent_modification(
        &self,
        forward_csr_version: u64,
        forward_delta_count: usize,
        forward_seq: u64,
        reverse_csr_version: u64,
        reverse_delta_count: usize,
        reverse_seq: u64,
    ) -> bool {
        if self
            .forward
            .has_changed(forward_csr_version, forward_delta_count, forward_seq)
        {
            return true;
        }
        self.reverse
            .has_changed(reverse_csr_version, reverse_delta_count, reverse_seq)
    }

    /// Milliseconds elapsed between the creation timestamp and now.
    ///
    /// Yields 0 when the stored timestamp is later than the current time.
    #[must_use]
    pub fn age_ms(&self) -> u64 {
        let now_ms = current_epoch_ms();
        now_ms.saturating_sub(self.created_at_epoch_ms)
    }

    /// Flattens the delta, tombstone and counter values into a
    /// [`CheckpointStats`].
    #[must_use]
    pub fn stats(&self) -> CheckpointStats {
        let Self { forward, reverse, counters, .. } = self;
        CheckpointStats {
            forward_delta_count: forward.delta_edge_count,
            forward_delta_bytes: forward.delta_byte_size,
            forward_tombstones: forward.tombstone_count,
            reverse_delta_count: reverse.delta_edge_count,
            reverse_delta_bytes: reverse.delta_byte_size,
            reverse_tombstones: reverse.tombstone_count,
            total_committed_bytes: counters.committed_bytes,
            total_committed_ops: counters.committed_ops,
            total_reserved_bytes: counters.reserved_bytes,
            total_reserved_ops: counters.reserved_ops,
        }
    }
}
impl Default for CompactionCheckpoint {
    /// Produces a checkpoint whose edge-store and counter parts are all
    /// default-valued and whose timestamp is the current time.
    fn default() -> Self {
        // `new` stamps the checkpoint with `current_epoch_ms()`.
        Self::new(
            EdgeStoreCheckpoint::default(),
            EdgeStoreCheckpoint::default(),
            CounterCheckpoint::default(),
        )
    }
}
impl fmt::Display for CompactionCheckpoint {
    /// Writes the forward, reverse and counter parts, each rendered through
    /// its own `Display` impl inside square brackets. The creation timestamp
    /// is not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { forward, reverse, counters, .. } = self;
        f.write_fmt(format_args!(
            "CompactionCheckpoint {{ forward: [{}], reverse: [{}], counters: [{}] }}",
            forward, reverse, counters
        ))
    }
}
/// Flat summary of the sizes and counters held by a `CompactionCheckpoint`,
/// as produced by `CompactionCheckpoint::stats`.
///
/// Plain `Copy` data; the derived `Default` value has every field at zero.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct CheckpointStats {
/// Delta edge count of the forward edge store.
pub forward_delta_count: usize,
/// Delta byte size of the forward edge store.
pub forward_delta_bytes: usize,
/// Tombstone count of the forward edge store.
pub forward_tombstones: usize,
/// Delta edge count of the reverse edge store.
pub reverse_delta_count: usize,
/// Delta byte size of the reverse edge store.
pub reverse_delta_bytes: usize,
/// Tombstone count of the reverse edge store.
pub reverse_tombstones: usize,
/// Committed byte counter.
pub total_committed_bytes: usize,
/// Committed operation counter.
pub total_committed_ops: usize,
/// Reserved byte counter.
pub total_reserved_bytes: usize,
/// Reserved operation counter.
pub total_reserved_ops: usize,
}
impl CheckpointStats {
    /// Forward plus reverse delta edge count.
    ///
    /// The sum saturates at `usize::MAX` so the result is the same in debug
    /// and release builds (a plain `+` would panic in one and wrap in the
    /// other).
    #[must_use]
    #[inline]
    pub const fn total_delta_edges(&self) -> usize {
        self.forward_delta_count
            .saturating_add(self.reverse_delta_count)
    }

    /// Forward plus reverse delta byte size, saturating at `usize::MAX`.
    #[must_use]
    #[inline]
    pub const fn total_delta_bytes(&self) -> usize {
        self.forward_delta_bytes
            .saturating_add(self.reverse_delta_bytes)
    }

    /// Forward plus reverse tombstone count, saturating at `usize::MAX`.
    #[must_use]
    #[inline]
    pub const fn total_tombstones(&self) -> usize {
        self.forward_tombstones
            .saturating_add(self.reverse_tombstones)
    }
}
impl fmt::Display for CheckpointStats {
    /// Writes the combined (forward + reverse) delta and tombstone totals,
    /// followed by the committed and reserved counters.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let delta_edges = self.total_delta_edges();
        let delta_bytes = self.total_delta_bytes();
        let tombstones = self.total_tombstones();
        let Self {
            total_committed_bytes: committed_bytes,
            total_committed_ops: committed_ops,
            total_reserved_bytes: reserved_bytes,
            total_reserved_ops: reserved_ops,
            ..
        } = self;
        write!(
            f,
            "deltas: {} edges ({} bytes), tombstones: {}, committed: {} bytes/{} ops, reserved: {} bytes/{} ops",
            delta_edges,
            delta_bytes,
            tombstones,
            committed_bytes,
            committed_ops,
            reserved_bytes,
            reserved_ops
        )
    }
}
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch, and
/// `u64::MAX` when the millisecond count does not fit in a `u64`.
fn current_epoch_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX),
        // Clock is set before the epoch: report zero rather than fail.
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for the checkpoint value types defined in this file.
    use super::*;

    /// Checkpoint fixture shared by the modification, stats and display tests.
    fn sample_checkpoint() -> CompactionCheckpoint {
        CompactionCheckpoint::from_components(
            1, 50, 2500, 20, 5, // forward store
            1, 60, 3000, 25, 8, // reverse store
            100, 10, 50, 5, // counters
        )
    }

    #[test]
    fn test_counter_checkpoint_new() {
        let cp = CounterCheckpoint::new(100, 10, 50, 5);
        assert_eq!(cp.committed_bytes, 100);
        assert_eq!(cp.committed_ops, 10);
        assert_eq!(cp.reserved_bytes, 50);
        assert_eq!(cp.reserved_ops, 5);
    }

    #[test]
    fn test_counter_checkpoint_totals() {
        let cp = CounterCheckpoint::new(100, 10, 50, 5);
        assert_eq!(cp.total_bytes(), 150);
        assert_eq!(cp.total_ops(), 15);
    }

    #[test]
    fn test_counter_checkpoint_from_snapshot() {
        let snapshot = BufferStateSnapshot {
            committed_bytes: 200,
            committed_ops: 20,
            reserved_bytes: 100,
            reserved_ops: 10,
            active_guards: 2,
        };
        let cp = CounterCheckpoint::from_snapshot(&snapshot);
        assert_eq!(cp.committed_bytes, 200);
        assert_eq!(cp.committed_ops, 20);
        assert_eq!(cp.reserved_bytes, 100);
        assert_eq!(cp.reserved_ops, 10);
    }

    #[test]
    fn test_counter_checkpoint_display() {
        let cp = CounterCheckpoint::new(100, 10, 50, 5);
        let display = format!("{cp}");
        assert!(display.contains("100 committed"));
        assert!(display.contains("50 reserved"));
    }

    #[test]
    fn test_edge_store_checkpoint_new() {
        let cp = EdgeStoreCheckpoint::new(1, 100, 5000, 42, 10);
        assert_eq!(cp.csr_version, 1);
        assert_eq!(cp.delta_edge_count, 100);
        assert_eq!(cp.delta_byte_size, 5000);
        assert_eq!(cp.seq_counter, 42);
        assert_eq!(cp.tombstone_count, 10);
    }

    #[test]
    fn test_edge_store_checkpoint_has_changed() {
        let cp = EdgeStoreCheckpoint::new(1, 100, 5000, 42, 10);
        assert!(!cp.has_changed(1, 100, 42));
        // Each compared field triggers a change on its own.
        assert!(cp.has_changed(2, 100, 42));
        assert!(cp.has_changed(1, 101, 42));
        assert!(cp.has_changed(1, 100, 43));
    }

    #[test]
    fn test_edge_store_checkpoint_display() {
        let cp = EdgeStoreCheckpoint::new(1, 100, 5000, 42, 10);
        let display = format!("{cp}");
        assert!(display.contains("csr_v1"));
        assert!(display.contains("100 deltas"));
        assert!(display.contains("seq=42"));
    }

    #[test]
    fn test_compaction_checkpoint_new() {
        let forward = EdgeStoreCheckpoint::new(1, 50, 2500, 20, 5);
        let reverse = EdgeStoreCheckpoint::new(1, 50, 2500, 20, 5);
        let counters = CounterCheckpoint::new(100, 10, 50, 5);
        let cp = CompactionCheckpoint::new(forward, reverse, counters);
        assert_eq!(cp.forward.delta_edge_count, 50);
        assert_eq!(cp.reverse.delta_edge_count, 50);
        assert_eq!(cp.counters.committed_bytes, 100);
        assert!(cp.created_at_epoch_ms > 0);
    }

    #[test]
    fn test_compaction_checkpoint_from_components() {
        let cp = CompactionCheckpoint::from_components(
            1, 50, 2500, 20, 5, // forward store
            2, 60, 3000, 25, 8, // reverse store
            100, 10, 50, 5, // counters
        );
        assert_eq!(cp.forward.csr_version, 1);
        assert_eq!(cp.forward.delta_edge_count, 50);
        assert_eq!(cp.reverse.csr_version, 2);
        assert_eq!(cp.reverse.delta_edge_count, 60);
        assert_eq!(cp.counters.committed_bytes, 100);
    }

    #[test]
    fn test_compaction_checkpoint_has_concurrent_modification() {
        let cp = sample_checkpoint();
        assert!(!cp.has_concurrent_modification(1, 50, 20, 1, 60, 25));
        // A change on either side alone is enough.
        assert!(cp.has_concurrent_modification(2, 50, 20, 1, 60, 25));
        assert!(cp.has_concurrent_modification(1, 50, 20, 2, 60, 25));
    }

    #[test]
    fn test_compaction_checkpoint_stats() {
        let stats = sample_checkpoint().stats();
        assert_eq!(stats.total_delta_edges(), 110);
        assert_eq!(stats.total_delta_bytes(), 5500);
        assert_eq!(stats.total_tombstones(), 13);
        assert_eq!(stats.total_committed_bytes, 100);
    }

    #[test]
    fn test_compaction_checkpoint_age() {
        // Pin the creation timestamp instead of sleeping: the test stays fast
        // and does not depend on how far the wall clock advances.
        let oldest = CompactionCheckpoint {
            created_at_epoch_ms: 0,
            ..CompactionCheckpoint::default()
        };
        assert!(oldest.age_ms() > 0);

        // A timestamp later than "now" must saturate to zero, not underflow.
        let from_future = CompactionCheckpoint {
            created_at_epoch_ms: u64::MAX,
            ..CompactionCheckpoint::default()
        };
        assert_eq!(from_future.age_ms(), 0);
    }

    #[test]
    fn test_checkpoint_stats_display() {
        let stats = CheckpointStats {
            forward_delta_count: 50,
            forward_delta_bytes: 2500,
            forward_tombstones: 5,
            reverse_delta_count: 60,
            reverse_delta_bytes: 3000,
            reverse_tombstones: 8,
            total_committed_bytes: 100,
            total_committed_ops: 10,
            total_reserved_bytes: 50,
            total_reserved_ops: 5,
        };
        let display = format!("{stats}");
        assert!(display.contains("110 edges"));
        assert!(display.contains("5500 bytes"));
        // Match the label too; a bare "13" could match unrelated digits.
        assert!(display.contains("tombstones: 13"));
    }

    #[test]
    fn test_compaction_checkpoint_display() {
        let display = format!("{}", sample_checkpoint());
        assert!(display.contains("forward:"));
        assert!(display.contains("reverse:"));
        assert!(display.contains("counters:"));
    }

    #[test]
    fn test_default_values() {
        let counter_cp = CounterCheckpoint::default();
        assert_eq!(counter_cp.committed_bytes, 0);
        assert_eq!(counter_cp.total_bytes(), 0);

        let edge_cp = EdgeStoreCheckpoint::default();
        assert_eq!(edge_cp.csr_version, 0);
        assert_eq!(edge_cp.delta_edge_count, 0);

        let comp_cp = CompactionCheckpoint::default();
        assert_eq!(comp_cp.forward.csr_version, 0);
        assert_eq!(comp_cp.counters.committed_bytes, 0);
    }

    #[test]
    fn test_counter_checkpoint_equality() {
        let cp1 = CounterCheckpoint::new(100, 10, 50, 5);
        let cp2 = CounterCheckpoint::new(100, 10, 50, 5);
        let cp3 = CounterCheckpoint::new(100, 10, 50, 6);
        assert_eq!(cp1, cp2);
        assert_ne!(cp1, cp3);
    }

    #[test]
    fn test_edge_store_checkpoint_equality() {
        let cp1 = EdgeStoreCheckpoint::new(1, 100, 5000, 42, 10);
        let cp2 = EdgeStoreCheckpoint::new(1, 100, 5000, 42, 10);
        let cp3 = EdgeStoreCheckpoint::new(2, 100, 5000, 42, 10);
        assert_eq!(cp1, cp2);
        assert_ne!(cp1, cp3);
    }
}