use std::collections::HashSet;
use std::fs;
use std::sync::Arc;
use crate::constants::{parse_snapshot_gen, SNAPSHOTS_DIR, TRASH_DIR};
use crate::error::{RayError, Result};
use super::db::GraphDB;
/// Lifecycle phase of the checkpoint machinery for a database.
///
/// A checkpoint may only be started from [`CheckpointStatus::Idle`]; the other
/// two variants both count as "a checkpoint is in progress".
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CheckpointStatus {
    /// No checkpoint is in progress; a new one may be started.
    Idle,
    /// A checkpoint has been claimed and its main work is pending or executing.
    Running,
    /// The main work finished successfully and the result is being finalized.
    Completing,
}
/// Summary returned by a completed checkpoint.
///
/// The first three fields are copied from the header of the snapshot attached
/// to the database once the checkpoint finished; all three are `0` when the
/// database has no snapshot at that point.
#[derive(Clone, Debug)]
pub struct CheckpointStats {
    /// Node count recorded in the snapshot header.
    pub num_nodes: u64,
    /// Edge count recorded in the snapshot header.
    pub num_edges: u64,
    /// Generation number recorded in the snapshot header.
    pub snapshot_gen: u64,
    /// Elapsed wall-clock time of the checkpoint, in milliseconds.
    pub duration_ms: u64,
}
/// Reports whether enough uncheckpointed changes have piled up to justify a checkpoint.
///
/// Reads `db.delta` and adds up created nodes, deleted nodes, added edges and
/// deleted edges. Returns `true` only when that total is strictly greater
/// than 10 000.
pub fn should_checkpoint(db: &GraphDB) -> bool {
    let pending = db.delta.read();
    let node_changes = pending.created_nodes.len() + pending.deleted_nodes.len();
    let edge_changes = pending.total_edges_added() + pending.total_edges_deleted();
    // Strict comparison: exactly 10 000 pending changes does not trigger yet.
    node_changes + edge_changes > 10_000
}
/// Returns `true` while a checkpoint is in progress.
///
/// Both [`CheckpointStatus::Running`] and [`CheckpointStatus::Completing`]
/// count as in progress; only [`CheckpointStatus::Idle`] yields `false`.
pub fn is_checkpoint_running(db: &GraphDB) -> bool {
    // Copy the status out so the lock is released before it is inspected.
    let current = *db.checkpoint_status.lock();
    match current {
        CheckpointStatus::Running | CheckpointStatus::Completing => true,
        CheckpointStatus::Idle => false,
    }
}
/// Runs one checkpoint cycle and reports statistics about the result.
///
/// The real work is delegated to `db.optimize()`; this function wraps it with
/// status bookkeeping (`Running` -> `Completing` -> `Idle`) and timing.
///
/// * `manage_status == true`: the function itself claims the checkpoint,
///   failing if the status is not `Idle`.
/// * `manage_status == false`: the status is not inspected on entry; the
///   caller is expected to have claimed the checkpoint already.
///
/// # Errors
///
/// * `RayError::Internal("Checkpoint already running")` when `manage_status`
///   is set and the status is not `Idle`. The status is left untouched.
/// * Any error returned by `db.optimize()`. The status is reset to `Idle`
///   before the error is propagated.
fn checkpoint_impl(db: &mut GraphDB, manage_status: bool) -> Result<CheckpointStats> {
    let started = std::time::Instant::now();

    if manage_status {
        // Check and claim under a single lock acquisition so two callers
        // cannot both observe `Idle`.
        let mut status = db.checkpoint_status.lock();
        if *status != CheckpointStatus::Idle {
            return Err(RayError::Internal("Checkpoint already running".to_string()));
        }
        *status = CheckpointStatus::Running;
    }

    match db.optimize() {
        Ok(_) => {}
        Err(err) => {
            // Release the claim so a later checkpoint attempt is not blocked.
            *db.checkpoint_status.lock() = CheckpointStatus::Idle;
            return Err(err);
        }
    }

    *db.checkpoint_status.lock() = CheckpointStatus::Completing;

    // The timer is stopped right after `optimize` returns.
    let duration_ms = started.elapsed().as_millis() as u64;

    // Without a snapshot all three counters are reported as zero.
    let (num_nodes, num_edges, snapshot_gen) = match &db.snapshot {
        Some(snapshot) => {
            let header = &snapshot.header;
            (header.num_nodes, header.num_edges, header.generation)
        }
        None => (0, 0, 0),
    };

    *db.checkpoint_status.lock() = CheckpointStatus::Idle;

    Ok(CheckpointStats {
        num_nodes,
        num_edges,
        snapshot_gen,
        duration_ms,
    })
}
/// Runs a checkpoint synchronously on the calling thread.
///
/// # Errors
///
/// Fails with `RayError::Internal` if a checkpoint is already in progress, or
/// with whatever error the underlying checkpoint work reports.
pub fn checkpoint(db: &mut GraphDB) -> Result<CheckpointStats> {
    // This entry point owns the status transitions itself.
    let manage_status = true;
    checkpoint_impl(db, manage_status)
}
/// Runs a checkpoint unless one is already in progress.
///
/// Despite the name, the work happens on the calling thread. When a
/// checkpoint is already running the call is a no-op and returns `Ok(())`.
///
/// # Errors
///
/// Propagates any error from [`checkpoint`].
pub fn trigger_background_checkpoint(db: &mut GraphDB) -> Result<()> {
    if !is_checkpoint_running(db) {
        // Only success/failure matters here; the statistics are discarded.
        checkpoint(db)?;
    }
    Ok(())
}
#[cfg(not(target_arch = "wasm32"))]
pub fn trigger_background_checkpoint_async(db: Arc<parking_lot::Mutex<GraphDB>>) -> Result<()> {
let db_guard = db.lock();
{
let mut status = db_guard.checkpoint_status.lock();
if !matches!(*status, CheckpointStatus::Idle) {
return Ok(());
}
*status = CheckpointStatus::Running;
}
drop(db_guard);
std::thread::spawn(move || {
let mut db = db.lock();
let result = checkpoint_impl(&mut db, false);
if result.is_err() {
*db.checkpoint_status.lock() = CheckpointStatus::Idle;
}
});
Ok(())
}
/// wasm32 variant of the background checkpoint trigger.
///
/// No worker thread is spawned on this target: the checkpoint runs inline
/// while the database lock is held. To match the native variant, the call is
/// a no-op returning `Ok(())` when the status is not `Idle`, and the status is
/// set to `Running` before the checkpoint work starts.
///
/// # Errors
///
/// Propagates any error reported by the checkpoint work.
#[cfg(target_arch = "wasm32")]
pub fn trigger_background_checkpoint_async(db: Arc<parking_lot::Mutex<GraphDB>>) -> Result<()> {
    let mut db = db.lock();
    {
        // Claim the checkpoint exactly like the native variant does, because
        // `checkpoint_impl(.., false)` does not inspect or claim the status.
        let mut status = db.checkpoint_status.lock();
        if !matches!(*status, CheckpointStatus::Idle) {
            return Ok(());
        }
        *status = CheckpointStatus::Running;
    }
    // `checkpoint_impl` returns the status to `Idle` on success and on failure.
    checkpoint_impl(&mut db, false).map(|_| ())
}
/// Compacts the database; currently identical to running a [`checkpoint`].
///
/// # Errors
///
/// Same as [`checkpoint`].
pub fn compact(db: &mut GraphDB) -> Result<CheckpointStats> {
    let stats = checkpoint(db)?;
    Ok(stats)
}
/// Runs a checkpoint and returns the generation number it reports.
///
/// The value is `0` when the database has no snapshot after the checkpoint.
///
/// # Errors
///
/// Same as [`checkpoint`].
pub fn create_snapshot(db: &mut GraphDB) -> Result<u64> {
    checkpoint(db).map(|stats| stats.snapshot_gen)
}
/// Deletes old snapshot files, keeping the newest `keep_count` generations.
///
/// The generations named by the manifest as active and previous are always
/// kept (a value of `0` in either field is ignored), regardless of
/// `keep_count`. A file that cannot be removed is moved into the trash
/// directory instead; that also counts as deleted.
///
/// The operation is best-effort: a missing manifest, a missing or unreadable
/// snapshot directory, unreadable directory entries and files that can be
/// neither removed nor moved are all skipped silently.
///
/// Returns the number of snapshot files removed or moved to the trash.
pub fn prune_snapshots(db: &GraphDB, keep_count: usize) -> Result<usize> {
    let manifest = if let Some(manifest) = db.manifest.as_ref() {
        manifest
    } else {
        return Ok(0);
    };

    let snapshots_dir = db.path.join(SNAPSHOTS_DIR);
    if !snapshots_dir.exists() {
        return Ok(0);
    }
    let dir_entries = match fs::read_dir(&snapshots_dir) {
        Ok(dir_entries) => dir_entries,
        Err(_) => return Ok(0),
    };

    // (generation, full path, bare file name) for every file whose name
    // parses as a snapshot; everything else in the directory is ignored.
    let mut found: Vec<(u64, std::path::PathBuf, std::ffi::OsString)> = dir_entries
        .flatten()
        .filter_map(|entry| {
            let name = entry.file_name();
            let generation = parse_snapshot_gen(&name.to_string_lossy())?;
            Some((generation, entry.path(), name))
        })
        .collect();
    if found.is_empty() {
        return Ok(0);
    }

    // Newest generation first, so `take(keep_count)` selects the most recent.
    found.sort_by_key(|entry| std::cmp::Reverse(entry.0));

    let mut retained: HashSet<u64> = found
        .iter()
        .take(keep_count)
        .map(|entry| entry.0)
        .collect();
    // Generations referenced by the manifest are kept; 0 means "none".
    for pinned in [manifest.active_snapshot_gen, manifest.prev_snapshot_gen] {
        if pinned != 0 {
            retained.insert(pinned);
        }
    }

    let trash_dir = db.path.join(TRASH_DIR);
    let mut removed = 0usize;
    for (generation, path, name) in found {
        if retained.contains(&generation) {
            continue;
        }
        // Try a plain delete first; if that fails, fall back to moving the
        // file into the trash directory.
        let gone = fs::remove_file(&path).is_ok() || {
            let _ = fs::create_dir_all(&trash_dir);
            fs::rename(&path, trash_dir.join(&name)).is_ok()
        };
        if gone {
            removed += 1;
        }
    }

    Ok(removed)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::db::{close_graph_db, open_graph_db, OpenOptions};
    use tempfile::tempdir;

    /// Opens a database in a fresh temporary directory, runs `check` against
    /// it, then closes the database.
    fn with_fresh_db(check: impl FnOnce(&GraphDB)) {
        let dir = tempdir().unwrap();
        let db = open_graph_db(dir.path(), OpenOptions::new()).unwrap();
        check(&db);
        close_graph_db(db).unwrap();
    }

    /// A freshly opened database must not request a checkpoint.
    #[test]
    fn test_should_checkpoint_empty() {
        with_fresh_db(|db| assert!(!should_checkpoint(db)));
    }

    /// A freshly opened database must not report a running checkpoint.
    #[test]
    fn test_is_checkpoint_running() {
        with_fresh_db(|db| assert!(!is_checkpoint_running(db)));
    }
}