use arc_swap::ArcSwap;
use parking_lot::Mutex;
use rusqlite::{Connection, OpenFlags, Result as SqliteResult};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::SystemTime;
/// Identifier of a graph node; used as the key and element type of the
/// adjacency maps in this file.
pub type NodeId = i64;
/// History capacity used by `SnapshotManager::new` and
/// `SnapshotManager::with_state` when no explicit limit is given.
pub const DEFAULT_MAX_HISTORY: usize = 64;
/// Owned copy of a graph's adjacency maps, stamped with the time it was taken.
///
/// Node-level queries (`node_count`, `contains_node`) consult only the
/// `outgoing` map: a node that appears solely as an edge target, or solely as
/// a key of `incoming`, is not reported as present.
#[derive(Debug, Clone)]
pub struct SnapshotState {
    /// Adjacency lists keyed by source node.
    pub outgoing: HashMap<NodeId, Vec<NodeId>>,
    /// Adjacency lists keyed by target node.
    pub incoming: HashMap<NodeId, Vec<NodeId>>,
    /// Wall-clock time at which this state was constructed.
    pub created_at: std::time::SystemTime,
}

impl SnapshotState {
    /// Builds a snapshot by deep-copying both adjacency maps.
    ///
    /// Later changes to the caller's maps do not affect the snapshot.
    pub fn new(
        outgoing: &HashMap<NodeId, Vec<NodeId>>,
        incoming: &HashMap<NodeId, Vec<NodeId>>,
    ) -> Self {
        let outgoing = outgoing.clone();
        let incoming = incoming.clone();
        // Stamp after copying, so the time reflects the finished snapshot.
        let created_at = std::time::SystemTime::now();
        Self {
            outgoing,
            incoming,
            created_at,
        }
    }

    /// Number of keys in the `outgoing` map.
    pub fn node_count(&self) -> usize {
        self.outgoing.keys().len()
    }

    /// Total length of all adjacency lists in the `outgoing` map.
    pub fn edge_count(&self) -> usize {
        self.outgoing
            .values()
            .fold(0, |total, targets| total + targets.len())
    }

    /// Returns `true` when `node_id` is a key of the `outgoing` map.
    pub fn contains_node(&self, node_id: NodeId) -> bool {
        self.get_outgoing(node_id).is_some()
    }

    /// Adjacency list stored for `node_id` in `outgoing`, if any.
    pub fn get_outgoing(&self, node_id: NodeId) -> Option<&Vec<NodeId>> {
        let Self { outgoing, .. } = self;
        outgoing.get(&node_id)
    }

    /// Adjacency list stored for `node_id` in `incoming`, if any.
    pub fn get_incoming(&self, node_id: NodeId) -> Option<&Vec<NodeId>> {
        let Self { incoming, .. } = self;
        incoming.get(&node_id)
    }
}
/// One entry of a `SnapshotManager`'s checkpoint history: a snapshot state
/// tagged with the version number and timestamp it was recorded under.
///
/// Cloning is cheap with respect to the graph data: `state` is an `Arc`, so
/// only the reference count is bumped.
#[derive(Debug, Clone)]
pub struct VersionedSnapshot {
/// Version number assigned by `SnapshotManager::checkpoint`; looked up by
/// `SnapshotManager::as_of`.
pub version: u64,
/// Timestamp recorded for this checkpoint; searched by
/// `SnapshotManager::as_of_at`.
pub created_at: SystemTime,
/// Shared handle to the snapshot state captured by the checkpoint.
pub state: Arc<SnapshotState>,
}
#[derive(Debug)]
pub struct SnapshotManager {
current: ArcSwap<SnapshotState>,
history: Mutex<VecDeque<VersionedSnapshot>>,
max_history: usize,
next_version: AtomicU64,
}
impl SnapshotManager {
pub fn new() -> Self {
let initial_state = SnapshotState::new(&HashMap::new(), &HashMap::new());
Self {
current: ArcSwap::new(Arc::new(initial_state)),
history: Mutex::new(VecDeque::new()),
max_history: DEFAULT_MAX_HISTORY,
next_version: AtomicU64::new(1),
}
}
pub fn with_state(
outgoing: &HashMap<NodeId, Vec<NodeId>>,
incoming: &HashMap<NodeId, Vec<NodeId>>,
) -> Self {
let initial_state = SnapshotState::new(outgoing, incoming);
Self {
current: ArcSwap::new(Arc::new(initial_state)),
history: Mutex::new(VecDeque::new()),
max_history: DEFAULT_MAX_HISTORY,
next_version: AtomicU64::new(1),
}
}
pub fn with_max_history(max_history: usize) -> Self {
let initial_state = SnapshotState::new(&HashMap::new(), &HashMap::new());
Self {
current: ArcSwap::new(Arc::new(initial_state)),
history: Mutex::new(VecDeque::new()),
max_history: max_history.max(1),
next_version: AtomicU64::new(1),
}
}
pub fn update_snapshot(
&self,
outgoing: &HashMap<NodeId, Vec<NodeId>>,
incoming: &HashMap<NodeId, Vec<NodeId>>,
) {
let new_state = SnapshotState::new(outgoing, incoming);
#[cfg(debug_assertions)]
{
assert_eq!(
new_state.node_count(),
outgoing.len(),
"Snapshot state node count mismatch"
);
assert_eq!(
new_state.edge_count(),
outgoing.values().map(|v| v.len()).sum::<usize>(),
"Snapshot state edge count mismatch"
);
}
self.current.store(Arc::new(new_state));
}
pub fn acquire_snapshot(&self) -> Arc<SnapshotState> {
let state = self.current.load();
let snapshot = Arc::clone(&state);
#[cfg(debug_assertions)]
{
let node_count = snapshot.node_count();
let edge_count = snapshot.edge_count();
assert!(node_count <= 10_000_000, "Suspiciously large node count");
assert!(edge_count <= 100_000_000, "Suspiciously large edge count");
}
snapshot
}
pub fn current_snapshot(&self) -> Arc<SnapshotState> {
self.current.load().clone()
}
pub fn checkpoint(&self) -> u64 {
let state = self.current.load();
let version = self.next_version.fetch_add(1, Ordering::SeqCst);
let snapshot = VersionedSnapshot {
version,
created_at: SystemTime::now(),
state: Arc::clone(&state),
};
let mut history = self.history.lock();
if history.len() >= self.max_history {
history.pop_front();
}
history.push_back(snapshot);
version
}
pub fn as_of(&self, version: u64) -> Option<VersionedSnapshot> {
let history = self.history.lock();
history
.binary_search_by_key(&version, |v| v.version)
.ok()
.map(|i| history[i].clone())
}
pub fn as_of_at(&self, timestamp: SystemTime) -> Option<VersionedSnapshot> {
let history = self.history.lock();
if history.is_empty() {
return None;
}
let idx = history.partition_point(|v| v.created_at <= timestamp);
if idx == 0 {
return None;
}
Some(history[idx - 1].clone())
}
pub fn version_count(&self) -> usize {
self.history.lock().len()
}
pub fn oldest_version(&self) -> Option<u64> {
self.history.lock().front().map(|v| v.version)
}
pub fn newest_version(&self) -> Option<u64> {
self.history.lock().back().map(|v| v.version)
}
pub fn versions(&self) -> Vec<VersionedSnapshot> {
self.history.lock().iter().cloned().collect()
}
pub fn clear_history(&self) {
self.history.lock().clear();
}
}
impl Default for SnapshotManager {
fn default() -> Self {
Self::new()
}
}
/// A snapshot state bundled with its own SQLite connection.
///
/// The graph-topology queries are answered from the in-memory state; the
/// connection is exposed for callers that need to read from the database.
pub struct GraphSnapshot {
    /// In-memory adjacency data this snapshot reads from.
    state: Arc<SnapshotState>,
    /// Connection opened by `new` with read-only / no-mutex flags.
    conn: Connection,
}

impl GraphSnapshot {
    /// Opens the database at `db_path` with `SQLITE_OPEN_READ_ONLY` and
    /// `SQLITE_OPEN_NO_MUTEX`, and pairs the connection with `state`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `Connection::open_with_flags` reports when the
    /// database cannot be opened.
    pub fn new(state: Arc<SnapshotState>, db_path: &str) -> SqliteResult<Self> {
        let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX;
        Connection::open_with_flags(db_path, flags).map(|conn| Self { state, conn })
    }

    /// The shared snapshot state backing this view.
    pub fn state(&self) -> &Arc<SnapshotState> {
        &self.state
    }

    /// The SQLite connection owned by this snapshot.
    pub fn connection(&self) -> &Connection {
        &self.conn
    }

    /// Node count of the underlying state.
    pub fn node_count(&self) -> usize {
        self.state().node_count()
    }

    /// Edge count of the underlying state.
    pub fn edge_count(&self) -> usize {
        self.state().edge_count()
    }

    /// Whether the underlying state reports `node_id` as present.
    pub fn contains_node(&self, node_id: NodeId) -> bool {
        self.state().contains_node(node_id)
    }

    /// Outgoing adjacency list of `node_id` in the underlying state, if any.
    pub fn get_outgoing(&self, node_id: NodeId) -> Option<&Vec<NodeId>> {
        self.state().get_outgoing(node_id)
    }

    /// Incoming adjacency list of `node_id` in the underlying state, if any.
    pub fn get_incoming(&self, node_id: NodeId) -> Option<&Vec<NodeId>> {
        self.state().get_incoming(node_id)
    }

    /// Creation time of the underlying state.
    pub fn created_at(&self) -> std::time::SystemTime {
        self.state().created_at
    }
}
// Unit tests for `SnapshotState` and `SnapshotManager`. `GraphSnapshot` is not
// exercised here.
#[cfg(test)]
mod tests {
use super::*;
// Counts and membership are derived from the `outgoing` map only: node 2 is
// an edge target but not a key, so it is not "contained".
#[test]
fn test_snapshot_state_creation() {
let mut outgoing = HashMap::new();
let mut incoming = HashMap::new();
outgoing.insert(1, vec![2, 3]);
incoming.insert(1, vec![]);
let state = SnapshotState::new(&outgoing, &incoming);
assert_eq!(state.node_count(), 1);
assert_eq!(state.edge_count(), 2);
assert!(state.contains_node(1));
assert!(!state.contains_node(2));
}
// A snapshot acquired before `update_snapshot` keeps seeing the old state,
// while a snapshot acquired afterwards sees the new one.
#[test]
fn test_snapshot_manager() {
let mut outgoing = HashMap::new();
let mut incoming = HashMap::new();
outgoing.insert(1, vec![2]);
incoming.insert(1, vec![]);
let manager = SnapshotManager::with_state(&outgoing, &incoming);
let snapshot = manager.acquire_snapshot();
assert_eq!(snapshot.node_count(), 1);
assert!(snapshot.contains_node(1));
outgoing.insert(2, vec![]);
incoming.insert(2, vec![1]);
manager.update_snapshot(&outgoing, &incoming);
let new_snapshot = manager.acquire_snapshot();
assert_eq!(new_snapshot.node_count(), 2);
assert_eq!(snapshot.node_count(), 1);
}
// Versions start at 1 and increase by one per checkpoint.
#[test]
fn test_checkpoint_assigns_monotonic_versions() {
let manager = SnapshotManager::new();
assert_eq!(manager.version_count(), 0);
let v1 = manager.checkpoint();
let v2 = manager.checkpoint();
let v3 = manager.checkpoint();
assert_eq!(v1, 1);
assert_eq!(v2, 2);
assert_eq!(v3, 3);
assert_eq!(manager.version_count(), 3);
assert_eq!(manager.oldest_version(), Some(1));
assert_eq!(manager.newest_version(), Some(3));
}
// Each checkpoint captures the live state at the time it was taken; later
// updates do not alter earlier checkpoints.
#[test]
fn test_as_of_finds_correct_version() {
let manager = SnapshotManager::with_state(&HashMap::from([(1, vec![2])]), &HashMap::new());
let v1 = manager.checkpoint();
manager.update_snapshot(&HashMap::from([(1, vec![2, 3])]), &HashMap::new());
let v2 = manager.checkpoint();
let snap1 = manager.as_of(v1).expect("version 1 should exist");
assert_eq!(snap1.version, v1);
assert_eq!(snap1.state.get_outgoing(1), Some(&vec![2]));
let snap2 = manager.as_of(v2).expect("version 2 should exist");
assert_eq!(snap2.version, v2);
assert_eq!(snap2.state.get_outgoing(1), Some(&vec![2, 3]));
let snap1_again = manager.as_of(v1).expect("version 1 still exists");
assert_eq!(snap1_again.state.get_outgoing(1), Some(&vec![2]));
}
// `as_of` is an exact-match lookup: versions above or below the recorded
// range yield `None`.
#[test]
fn test_as_of_returns_none_for_unknown_version() {
let manager = SnapshotManager::new();
manager.checkpoint();
manager.checkpoint();
assert!(manager.as_of(999).is_none());
assert!(manager.as_of(0).is_none());
}
// With a capacity of 3, the fourth checkpoint evicts the oldest one.
#[test]
fn test_bounded_eviction_keeps_max_history() {
let manager = SnapshotManager::with_max_history(3);
let v1 = manager.checkpoint();
let v2 = manager.checkpoint();
let v3 = manager.checkpoint();
assert_eq!(manager.version_count(), 3);
assert_eq!(manager.oldest_version(), Some(v1));
assert_eq!(manager.newest_version(), Some(v3));
assert!(manager.as_of(v3).is_some());
let v4 = manager.checkpoint();
assert_eq!(manager.version_count(), 3);
assert_eq!(manager.oldest_version(), Some(v2));
assert_eq!(manager.newest_version(), Some(v4));
assert!(manager.as_of(v1).is_none(), "v1 should have been evicted");
assert!(manager.as_of(v4).is_some());
}
// `versions()` returns the history in insertion (oldest-first) order.
#[test]
fn test_versions_iteration_is_oldest_first() {
let manager = SnapshotManager::new();
manager.checkpoint();
manager.checkpoint();
manager.checkpoint();
let versions = manager.versions();
assert_eq!(versions.len(), 3);
assert_eq!(versions[0].version, 1);
assert_eq!(versions[1].version, 2);
assert_eq!(versions[2].version, 3);
}
// Clearing the history must not disturb the live state.
#[test]
fn test_clear_history_keeps_live_state() {
let manager = SnapshotManager::with_state(&HashMap::from([(1, vec![2])]), &HashMap::new());
manager.checkpoint();
manager.checkpoint();
assert_eq!(manager.version_count(), 2);
manager.clear_history();
assert_eq!(manager.version_count(), 0);
assert!(manager.oldest_version().is_none());
let snap = manager.acquire_snapshot();
assert_eq!(snap.node_count(), 1);
}
// Timestamp lookup. The history is filled by hand (bypassing `checkpoint`)
// so the timestamps are deterministic: version k is stamped at epoch + 100*k
// seconds.
#[test]
fn test_as_of_at_finds_nearest_timestamp() {
use std::time::{Duration, UNIX_EPOCH};
let manager = SnapshotManager::new();
let mut history = manager.history.lock();
for i in 0u64..5 {
let state = SnapshotState::new(
&HashMap::from([(i as i64, vec![(i + 1) as i64])]),
&HashMap::new(),
);
history.push_back(VersionedSnapshot {
version: i + 1,
created_at: UNIX_EPOCH + Duration::from_secs(100 * (i + 1)),
state: Arc::new(state),
});
}
// Release the guard before calling methods that lock the history again.
drop(history);
// Keep the version counter consistent with the hand-inserted entries.
manager.next_version.store(6, Ordering::SeqCst);
// Between two checkpoints: the earlier one wins.
let t250 = UNIX_EPOCH + Duration::from_secs(250);
let snap = manager.as_of_at(t250).expect("should find v2");
assert_eq!(snap.version, 2);
// Exactly on a checkpoint's timestamp: that checkpoint is returned.
let t300 = UNIX_EPOCH + Duration::from_secs(300);
let snap = manager.as_of_at(t300).expect("should find v3");
assert_eq!(snap.version, 3);
// Before the first checkpoint: nothing qualifies.
let t50 = UNIX_EPOCH + Duration::from_secs(50);
assert!(manager.as_of_at(t50).is_none());
// After the last checkpoint: the newest one is returned.
let t999 = UNIX_EPOCH + Duration::from_secs(999);
let snap = manager.as_of_at(t999).expect("should find v5");
assert_eq!(snap.version, 5);
}
// Every history query must cope with an empty history.
#[test]
fn test_empty_history_queries() {
let manager = SnapshotManager::new();
assert_eq!(manager.version_count(), 0);
assert!(manager.oldest_version().is_none());
assert!(manager.newest_version().is_none());
assert!(manager.as_of(1).is_none());
assert!(manager.as_of_at(SystemTime::now()).is_none());
assert!(manager.versions().is_empty());
}
}