use serde::{Deserialize, Serialize};
use std::fs;
use std::path::PathBuf;
/// Snapshot of an operation's progress that can be written to and read back
/// from disk as JSON (via the `Serialize`/`Deserialize` derives).
///
/// The triple (`operation`, `dataset`, `file_path`) is the lookup key: it is
/// what `CheckpointManager` hashes to derive the checkpoint's file name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Checkpoint {
/// Name of the operation being checkpointed (the tests use `"import"`).
/// First component of the lookup key.
pub operation: String,
/// Dataset the operation targets. Second component of the lookup key.
pub dataset: String,
/// Path of the file being processed, stored as a plain string.
/// Third component of the lookup key.
pub file_path: String,
/// Number of items handled so far.
/// NOTE(review): presumably records/triples — not interpreted anywhere in this file; confirm with callers.
pub processed_count: usize,
/// Position reached in the input; the numerator of the progress percentage.
/// NOTE(review): presumably a byte offset, in the same unit as `total_size` — confirm.
pub last_offset: u64,
/// When the checkpoint was taken, as a string (the tests store an RFC 3339 local time).
pub timestamp: String,
/// Input format label (the tests use `"turtle"`).
pub format: String,
/// Optional graph identifier.
/// NOTE(review): presumably the target named graph, `None` meaning the default graph — confirm.
pub graph: Option<String>,
/// Total size of the input; the denominator of the progress percentage.
/// A value of `0` makes the reported progress `0.0`.
pub total_size: u64,
}
/// Stores, retrieves and removes [`Checkpoint`] files inside a single directory.
///
/// Each checkpoint lives in its own `<hash>.checkpoint` JSON file, where the
/// hash is derived from the checkpoint's operation, dataset and file path.
pub struct CheckpointManager {
/// Directory that holds every `*.checkpoint` file managed by this instance.
checkpoint_dir: PathBuf,
}
impl CheckpointManager {
    /// Creates a manager rooted at `<config dir>/oxirs/checkpoints`, creating
    /// the directory (and any missing parents) if it does not exist yet.
    ///
    /// # Errors
    ///
    /// Returns an error if the platform configuration directory cannot be
    /// determined or the checkpoint directory cannot be created.
    pub fn new() -> Result<Self, Box<dyn std::error::Error>> {
        let checkpoint_dir = Self::get_checkpoint_dir()?;
        fs::create_dir_all(&checkpoint_dir)?;
        Ok(Self { checkpoint_dir })
    }

    /// Resolves the checkpoint directory: `<config dir>/oxirs/checkpoints`.
    ///
    /// # Errors
    ///
    /// Returns an error if `dirs::config_dir()` yields no directory.
    fn get_checkpoint_dir() -> Result<PathBuf, Box<dyn std::error::Error>> {
        let base = dirs::config_dir().ok_or("Could not determine config directory")?;
        Ok(base.join("oxirs").join("checkpoints"))
    }

    /// Maps a lookup key to the path of its checkpoint file.
    ///
    /// The file name is the lowercase hex MD5 digest of the three key parts
    /// concatenated, followed by `.checkpoint`.
    fn checkpoint_filename(&self, operation: &str, dataset: &str, file_path: &str) -> PathBuf {
        // NOTE(review): the parts are concatenated without a separator, so
        // distinct keys such as ("ab", "c", ..) and ("a", "bc", ..) map to the
        // same file. Left unchanged on purpose: altering the hashed string
        // would orphan every checkpoint already written to disk.
        let digest = md5::compute(format!("{}{}{}", operation, dataset, file_path));
        self.checkpoint_dir.join(format!("{:x}.checkpoint", digest))
    }

    /// Persists `checkpoint` as pretty-printed JSON, replacing any previous
    /// checkpoint stored under the same key.
    ///
    /// The JSON is first written to a sibling `*.checkpoint.tmp` file and then
    /// renamed over the final path, so a process interrupted mid-write does
    /// not leave a truncated checkpoint under the final name.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization, the write, or the rename fails. On
    /// failure the temporary file is removed on a best-effort basis and any
    /// previously saved checkpoint is left untouched.
    pub fn save(&self, checkpoint: &Checkpoint) -> Result<(), Box<dyn std::error::Error>> {
        let checkpoint_path = self.checkpoint_filename(
            &checkpoint.operation,
            &checkpoint.dataset,
            &checkpoint.file_path,
        );
        let json = serde_json::to_string_pretty(checkpoint)?;

        // The staging file's extension is "tmp", so `list_all` never picks it up.
        let staging_path = checkpoint_path.with_extension("checkpoint.tmp");
        let published = fs::write(&staging_path, json)
            .and_then(|()| fs::rename(&staging_path, &checkpoint_path));
        if let Err(err) = published {
            // Cleanup failure is irrelevant next to the error being reported.
            let _ = fs::remove_file(&staging_path);
            return Err(err.into());
        }
        Ok(())
    }

    /// Loads the checkpoint stored under the given key.
    ///
    /// Returns `Ok(None)` when no checkpoint file exists for the key.
    ///
    /// # Errors
    ///
    /// Returns an error if the file exists but cannot be read (for example
    /// because of missing permissions) or does not contain a valid
    /// [`Checkpoint`] JSON document.
    pub fn load(
        &self,
        operation: &str,
        dataset: &str,
        file_path: &str,
    ) -> Result<Option<Checkpoint>, Box<dyn std::error::Error>> {
        let checkpoint_path = self.checkpoint_filename(operation, dataset, file_path);
        // Read directly instead of probing with `exists()` first: this avoids a
        // check-then-use race and stops unreadable files from being mistaken
        // for absent ones.
        let content = match fs::read_to_string(&checkpoint_path) {
            Ok(content) => content,
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
            Err(err) => return Err(err.into()),
        };
        let checkpoint: Checkpoint = serde_json::from_str(&content)?;
        Ok(Some(checkpoint))
    }

    /// Removes the checkpoint stored under the given key.
    ///
    /// Deleting a checkpoint that does not exist is not an error.
    ///
    /// # Errors
    ///
    /// Returns an error if the file exists but cannot be removed.
    pub fn delete(
        &self,
        operation: &str,
        dataset: &str,
        file_path: &str,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let checkpoint_path = self.checkpoint_filename(operation, dataset, file_path);
        match fs::remove_file(checkpoint_path) {
            Ok(()) => Ok(()),
            // Already gone (possibly removed concurrently): the goal is met.
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(err) => Err(err.into()),
        }
    }

    /// Returns every readable checkpoint found in the checkpoint directory.
    ///
    /// Only files with the `checkpoint` extension are considered. Files that
    /// cannot be read or parsed are skipped silently so that one damaged
    /// checkpoint does not hide the others. A missing checkpoint directory
    /// yields an empty list.
    ///
    /// # Errors
    ///
    /// Returns an error if the directory exists but cannot be enumerated.
    pub fn list_all(&self) -> Result<Vec<Checkpoint>, Box<dyn std::error::Error>> {
        let entries = match fs::read_dir(&self.checkpoint_dir) {
            Ok(entries) => entries,
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(err) => return Err(err.into()),
        };

        let mut checkpoints = Vec::new();
        for entry in entries {
            let path = entry?.path();
            if path.extension().and_then(|ext| ext.to_str()) != Some("checkpoint") {
                continue;
            }
            // Best-effort by design: unreadable or malformed files are ignored.
            let parsed = fs::read_to_string(&path)
                .ok()
                .and_then(|content| serde_json::from_str::<Checkpoint>(&content).ok());
            if let Some(checkpoint) = parsed {
                checkpoints.push(checkpoint);
            }
        }
        Ok(checkpoints)
    }

    /// Reports whether a checkpoint file exists for the given key.
    pub fn exists(&self, operation: &str, dataset: &str, file_path: &str) -> bool {
        self.checkpoint_filename(operation, dataset, file_path)
            .exists()
    }

    /// Computes `last_offset / total_size` as a percentage.
    ///
    /// Returns `0.0` when `total_size` is zero. The result is not clamped, so
    /// it exceeds `100.0` if `last_offset` is larger than `total_size`.
    pub fn progress_percentage(&self, checkpoint: &Checkpoint) -> f64 {
        if checkpoint.total_size == 0 {
            return 0.0;
        }
        let done = checkpoint.last_offset as f64;
        let total = checkpoint.total_size as f64;
        (done / total) * 100.0
    }
}
impl Default for CheckpointManager {
    /// Builds a manager through [`CheckpointManager::new`].
    ///
    /// # Panics
    ///
    /// Panics if `new` fails. Call `new` directly to handle that failure
    /// instead of panicking.
    fn default() -> Self {
        match Self::new() {
            Ok(manager) => manager,
            Err(err) => panic!("Failed to create checkpoint manager: {:?}", err),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// File path used as the third key component in every test. The file
    /// itself is never created; only the string matters.
    fn test_file_key() -> String {
        std::env::temp_dir()
            .join("oxirs_checkpoint_test.ttl")
            .to_string_lossy()
            .into_owned()
    }

    /// Builds a manager that stores checkpoints in a fresh temporary directory.
    ///
    /// The manager is constructed directly rather than through
    /// `CheckpointManager::new()`, so the tests never touch (or require) the
    /// real user configuration directory. The returned `TempDir` must be kept
    /// alive for as long as the manager is used.
    fn create_test_manager() -> (CheckpointManager, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let manager = CheckpointManager {
            checkpoint_dir: temp_dir.path().to_path_buf(),
        };
        (manager, temp_dir)
    }

    /// Checkpoint fixture shared by the tests: half-way through an import.
    fn sample_checkpoint() -> Checkpoint {
        Checkpoint {
            operation: "import".to_string(),
            dataset: "testdb".to_string(),
            file_path: test_file_key(),
            processed_count: 1000,
            last_offset: 5000,
            timestamp: chrono::Local::now().to_rfc3339(),
            format: "turtle".to_string(),
            graph: None,
            total_size: 10000,
        }
    }

    #[test]
    fn test_save_and_load_checkpoint() {
        let (manager, _temp_dir) = create_test_manager();
        manager.save(&sample_checkpoint()).unwrap();

        let loaded = manager
            .load("import", "testdb", &test_file_key())
            .unwrap()
            .expect("a checkpoint was just saved under this key");
        assert_eq!(loaded.processed_count, 1000);
        assert_eq!(loaded.last_offset, 5000);
    }

    #[test]
    fn test_load_missing_checkpoint_returns_none() {
        let (manager, _temp_dir) = create_test_manager();
        let loaded = manager.load("import", "testdb", &test_file_key()).unwrap();
        assert!(loaded.is_none());
    }

    #[test]
    fn test_checkpoint_exists() {
        let (manager, _temp_dir) = create_test_manager();
        assert!(!manager.exists("import", "testdb", &test_file_key()));

        manager.save(&sample_checkpoint()).unwrap();
        assert!(manager.exists("import", "testdb", &test_file_key()));
    }

    #[test]
    fn test_delete_checkpoint() {
        let (manager, _temp_dir) = create_test_manager();
        manager.save(&sample_checkpoint()).unwrap();
        assert!(manager.exists("import", "testdb", &test_file_key()));

        manager
            .delete("import", "testdb", &test_file_key())
            .unwrap();
        assert!(!manager.exists("import", "testdb", &test_file_key()));
    }

    #[test]
    fn test_delete_missing_checkpoint_is_ok() {
        let (manager, _temp_dir) = create_test_manager();
        manager
            .delete("import", "testdb", &test_file_key())
            .unwrap();
    }

    #[test]
    fn test_list_all_returns_saved_checkpoints() {
        let (manager, _temp_dir) = create_test_manager();
        assert!(manager.list_all().unwrap().is_empty());

        manager.save(&sample_checkpoint()).unwrap();
        let listed = manager.list_all().unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].dataset, "testdb");
    }

    #[test]
    fn test_progress_percentage() {
        let (manager, _temp_dir) = create_test_manager();
        let progress = manager.progress_percentage(&sample_checkpoint());
        assert_eq!(progress, 50.0);
    }

    #[test]
    fn test_progress_percentage_zero_total() {
        let (manager, _temp_dir) = create_test_manager();
        let checkpoint = Checkpoint {
            total_size: 0,
            ..sample_checkpoint()
        };
        assert_eq!(manager.progress_percentage(&checkpoint), 0.0);
    }
}