use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
/// Bookkeeping for persistence of four mutation categories: vector, graph,
/// temporal and string.
///
/// For each category the tracker keeps a pending-mutation counter, the Unix
/// timestamp (whole seconds) at which that counter was last reset, and an LSN
/// high-water mark. It also keeps the last persisted node/edge/string counts
/// and a shutdown flag.
///
/// Every field is an atomic, so all methods take `&self` and the tracker can be
/// updated through a shared reference.
// NOTE(review): "LSN" is presumably a log sequence number — confirm with callers.
#[derive(Debug)]
pub(crate) struct PersistenceTracker {
    /// Vector mutations recorded since the last `reset_vector_mutations`.
    vector_mutations: AtomicU64,
    /// Graph mutations recorded since the last `reset_graph_mutations`.
    graph_mutations: AtomicU64,
    /// Temporal mutations recorded since the last `reset_temporal_mutations`.
    temporal_mutations: AtomicU64,
    /// String mutations recorded since the last `reset_string_mutations`.
    string_mutations: AtomicU64,
    /// Unix seconds at construction or at the last `reset_vector_mutations`.
    last_vector_persist: AtomicU64,
    /// Unix seconds at construction or at the last `reset_graph_mutations`.
    last_graph_persist: AtomicU64,
    /// Unix seconds at construction or at the last `reset_temporal_mutations`.
    last_temporal_persist: AtomicU64,
    /// Unix seconds at construction or at the last `reset_string_mutations`.
    last_string_persist: AtomicU64,
    /// Vector LSN: set by `set_start_lsn`, raised by `update_vector_lsn`.
    last_vector_lsn: AtomicU64,
    /// Graph LSN: set by `set_start_lsn`, raised by `update_graph_lsn`.
    last_graph_lsn: AtomicU64,
    /// Temporal LSN: set by `set_start_lsn`, raised by `update_temporal_lsn`.
    last_temporal_lsn: AtomicU64,
    /// String LSN: set by `set_start_lsn`, raised by `update_string_lsn`.
    last_string_lsn: AtomicU64,
    /// Node count most recently passed to `update_last_persisted_counts`.
    last_persisted_node_count: AtomicU64,
    /// Edge count most recently passed to `update_last_persisted_counts`.
    last_persisted_edge_count: AtomicU64,
    /// Count most recently passed to `update_last_persisted_string_count`.
    last_persisted_string_count: AtomicU64,
    /// Set to `true` by `signal_shutdown`; never cleared by this type.
    shutdown: AtomicBool,
}

impl PersistenceTracker {
    /// Creates a tracker with all counters, LSNs and persisted counts at zero,
    /// the shutdown flag cleared, and every last-persist timestamp set to the
    /// current wall-clock time.
    pub fn new() -> Self {
        let now = Self::unix_now_secs();
        Self {
            vector_mutations: AtomicU64::new(0),
            graph_mutations: AtomicU64::new(0),
            temporal_mutations: AtomicU64::new(0),
            string_mutations: AtomicU64::new(0),
            last_vector_persist: AtomicU64::new(now),
            last_graph_persist: AtomicU64::new(now),
            last_temporal_persist: AtomicU64::new(now),
            last_string_persist: AtomicU64::new(now),
            last_vector_lsn: AtomicU64::new(0),
            last_graph_lsn: AtomicU64::new(0),
            last_temporal_lsn: AtomicU64::new(0),
            last_string_lsn: AtomicU64::new(0),
            last_persisted_node_count: AtomicU64::new(0),
            last_persisted_edge_count: AtomicU64::new(0),
            last_persisted_string_count: AtomicU64::new(0),
            shutdown: AtomicBool::new(false),
        }
    }

    /// Current wall-clock time as whole seconds since the Unix epoch.
    ///
    /// Yields `0` when the system clock reports a time before the epoch, so
    /// callers never have to handle an error.
    fn unix_now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs())
    }

    /// Zeroes `counter`, stamps `persisted_at` with the current time, and
    /// returns the value `counter` held before it was zeroed.
    fn take_count_and_stamp(counter: &AtomicU64, persisted_at: &AtomicU64) -> u64 {
        // Swap first so the returned count is taken before the clock is read.
        let taken = counter.swap(0, Ordering::Relaxed);
        persisted_at.store(Self::unix_now_secs(), Ordering::Relaxed);
        taken
    }

    /// Whole seconds between the timestamp in `persisted_at` and now.
    ///
    /// Saturates at `0` if the stored timestamp is ahead of the current clock
    /// reading (for example after the wall clock was stepped backwards).
    fn seconds_since(persisted_at: &AtomicU64) -> u64 {
        // The clock is read before the stored timestamp is loaded.
        Self::unix_now_secs().saturating_sub(persisted_at.load(Ordering::Relaxed))
    }

    /// Overwrites all four per-category LSNs with `lsn`.
    ///
    /// Unlike the `update_*_lsn` methods this is an unconditional store, so it
    /// can also lower an LSN. The stores use `Release` so that they pair with
    /// the `Acquire` loads in [`Self::get_safe_manifest_lsn`], matching the
    /// ordering used by the `update_*_lsn` methods.
    pub fn set_start_lsn(&self, lsn: u64) {
        for slot in [
            &self.last_vector_lsn,
            &self.last_graph_lsn,
            &self.last_temporal_lsn,
            &self.last_string_lsn,
        ] {
            slot.store(lsn, Ordering::Release);
        }
    }

    /// Raises the vector LSN to `lsn` if `lsn` is larger than the stored value.
    pub fn update_vector_lsn(&self, lsn: u64) {
        self.last_vector_lsn.fetch_max(lsn, Ordering::Release);
    }

    /// Raises the graph LSN to `lsn` if `lsn` is larger than the stored value.
    pub fn update_graph_lsn(&self, lsn: u64) {
        self.last_graph_lsn.fetch_max(lsn, Ordering::Release);
    }

    /// Raises the temporal LSN to `lsn` if `lsn` is larger than the stored value.
    pub fn update_temporal_lsn(&self, lsn: u64) {
        self.last_temporal_lsn.fetch_max(lsn, Ordering::Release);
    }

    /// Raises the string LSN to `lsn` if `lsn` is larger than the stored value.
    pub fn update_string_lsn(&self, lsn: u64) {
        self.last_string_lsn.fetch_max(lsn, Ordering::Release);
    }

    /// Records the last persisted node and edge counts.
    ///
    /// The two values are written with separate stores, so a concurrent reader
    /// may briefly observe the new node count alongside the old edge count.
    pub fn update_last_persisted_counts(&self, node_count: u64, edge_count: u64) {
        self.last_persisted_node_count
            .store(node_count, Ordering::Release);
        self.last_persisted_edge_count
            .store(edge_count, Ordering::Release);
    }

    /// Returns the node count most recently recorded (initially `0`).
    pub fn get_last_persisted_node_count(&self) -> u64 {
        self.last_persisted_node_count.load(Ordering::Acquire)
    }

    /// Returns the edge count most recently recorded (initially `0`).
    pub fn get_last_persisted_edge_count(&self) -> u64 {
        self.last_persisted_edge_count.load(Ordering::Acquire)
    }

    /// Records the last persisted string count.
    pub fn update_last_persisted_string_count(&self, count: u64) {
        self.last_persisted_string_count
            .store(count, Ordering::Release);
    }

    /// Returns the string count most recently recorded (initially `0`).
    pub fn get_last_persisted_string_count(&self) -> u64 {
        self.last_persisted_string_count.load(Ordering::Acquire)
    }

    /// Returns the smallest of the four per-category LSNs.
    ///
    /// The four values are loaded one after another, so the result is not an
    /// atomic snapshot if other threads update LSNs concurrently.
    pub fn get_safe_manifest_lsn(&self) -> u64 {
        let vector = self.last_vector_lsn.load(Ordering::Acquire);
        let graph = self.last_graph_lsn.load(Ordering::Acquire);
        let temporal = self.last_temporal_lsn.load(Ordering::Acquire);
        let string = self.last_string_lsn.load(Ordering::Acquire);
        vector.min(graph).min(temporal).min(string)
    }

    /// Increments the pending vector-mutation counter by one.
    pub fn record_vector_mutation(&self) {
        self.vector_mutations.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the pending graph-mutation counter by one.
    pub fn record_graph_mutation(&self) {
        self.graph_mutations.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the pending temporal-mutation counter by one.
    pub fn record_temporal_mutation(&self) {
        self.temporal_mutations.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the pending string-mutation counter by one.
    pub fn record_string_mutation(&self) {
        self.string_mutations.fetch_add(1, Ordering::Relaxed);
    }

    /// Zeroes the vector-mutation counter, stamps the vector last-persist time
    /// with the current time, and returns the count that was pending.
    pub fn reset_vector_mutations(&self) -> u64 {
        Self::take_count_and_stamp(&self.vector_mutations, &self.last_vector_persist)
    }

    /// Zeroes the graph-mutation counter, stamps the graph last-persist time
    /// with the current time, and returns the count that was pending.
    pub fn reset_graph_mutations(&self) -> u64 {
        Self::take_count_and_stamp(&self.graph_mutations, &self.last_graph_persist)
    }

    /// Zeroes the temporal-mutation counter, stamps the temporal last-persist
    /// time with the current time, and returns the count that was pending.
    pub fn reset_temporal_mutations(&self) -> u64 {
        Self::take_count_and_stamp(&self.temporal_mutations, &self.last_temporal_persist)
    }

    /// Zeroes the string-mutation counter, stamps the string last-persist time
    /// with the current time, and returns the count that was pending.
    pub fn reset_string_mutations(&self) -> u64 {
        Self::take_count_and_stamp(&self.string_mutations, &self.last_string_persist)
    }

    /// Returns the number of vector mutations currently pending.
    pub fn get_vector_mutations(&self) -> u64 {
        self.vector_mutations.load(Ordering::Relaxed)
    }

    /// Returns the number of graph mutations currently pending.
    pub fn get_graph_mutations(&self) -> u64 {
        self.graph_mutations.load(Ordering::Relaxed)
    }

    /// Returns the number of temporal mutations currently pending.
    pub fn get_temporal_mutations(&self) -> u64 {
        self.temporal_mutations.load(Ordering::Relaxed)
    }

    /// Returns the number of string mutations currently pending.
    pub fn get_string_mutations(&self) -> u64 {
        self.string_mutations.load(Ordering::Relaxed)
    }

    /// Whole seconds since construction or the last `reset_vector_mutations`,
    /// saturating at `0`.
    pub fn seconds_since_vector_persist(&self) -> u64 {
        Self::seconds_since(&self.last_vector_persist)
    }

    /// Whole seconds since construction or the last `reset_graph_mutations`,
    /// saturating at `0`.
    pub fn seconds_since_graph_persist(&self) -> u64 {
        Self::seconds_since(&self.last_graph_persist)
    }

    /// Whole seconds since construction or the last `reset_temporal_mutations`,
    /// saturating at `0`.
    pub fn seconds_since_temporal_persist(&self) -> u64 {
        Self::seconds_since(&self.last_temporal_persist)
    }

    /// Whole seconds since construction or the last `reset_string_mutations`,
    /// saturating at `0`.
    pub fn seconds_since_string_persist(&self) -> u64 {
        Self::seconds_since(&self.last_string_persist)
    }

    /// Sets the shutdown flag.
    pub fn signal_shutdown(&self) {
        self.shutdown.store(true, Ordering::Release);
    }

    /// Returns `true` once `signal_shutdown` has been called.
    pub fn is_shutdown(&self) -> bool {
        self.shutdown.load(Ordering::Acquire)
    }
}
// Unit tests are compiled only under `cfg(test)` and are loaded from the file
// `tracker_tests.rs` named by the `#[path]` attribute below.
#[cfg(test)]
#[path = "tracker_tests.rs"]
mod tests;