use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::Duration;
use chrono::Utc;
use fs2::FileExt;
use serde_json;
use thiserror::Error;
use super::graph_config_schema::{GraphConfigFile, SCHEMA_VERSION};
use super::graph_config_store::{GraphConfigPaths, GraphConfigStore};
/// Errors reported by the config persistence layer (load, save, lock, repair).
#[derive(Debug, Error)]
pub enum PersistenceError {
/// A filesystem operation on `path` failed; `source` carries the underlying
/// `std::io::Error`.
#[error("IO error at {path}: {source}")]
IoError {
path: PathBuf,
#[source]
source: std::io::Error,
},
/// Serializing a config to JSON failed; the payload is the serializer's message.
#[error("Failed to serialize config: {0}")]
SerializationError(String),
/// Parsing a config file's JSON failed; the payload is the parser's message.
#[error("Failed to deserialize config: {0}")]
DeserializationError(String),
/// The exclusive lock on `path` could not be obtained within `timeout_ms`
/// milliseconds.
#[error("Failed to acquire lock at {path} within {timeout_ms}ms")]
LockTimeout {
path: PathBuf,
timeout_ms: u64,
},
/// A stale lock was found at `path`.
// NOTE(review): nothing in this file constructs this variant — presumably
// produced by callers elsewhere; confirm.
#[error("Stale lock detected at {path}: {details}")]
StaleLock {
path: PathBuf,
details: String,
},
/// The file at `path` parsed but is unusable; in this file that means its
/// schema version differs from `SCHEMA_VERSION`.
#[error("Corrupt config file at {path}: {reason}")]
CorruptConfig {
path: PathBuf,
reason: String,
},
/// Neither the primary config nor its previous-version backup could be loaded.
#[error("No usable config found: {reason}")]
NoUsableConfig {
reason: String,
},
/// Stored and recomputed integrity hashes differ.
// NOTE(review): loading reports hash mismatches as warnings rather than
// through this variant; it is not constructed in this file.
#[error("Integrity mismatch: expected {expected}, found {found}")]
IntegrityMismatch {
expected: String,
found: String,
},
}
/// Result alias whose error type is [`PersistenceError`].
pub type PersistenceResult<T> = Result<T, PersistenceError>;
/// Outcome of comparing a loaded file's stored integrity hash with the hash
/// recomputed from its config section.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IntegrityStatus {
/// The stored hash equals the recomputed hash.
Ok,
/// The stored hash is non-empty and differs from the recomputed hash.
Mismatch,
/// The stored hash is empty, so no comparison was possible. Also the initial
/// value of a default `LoadReport`.
Unavailable,
}
/// Schema-version verdict recorded in a `LoadReport`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SchemaStatus {
/// No schema problem was recorded (the default).
Ok,
/// The file's `schema_version` did not equal `SCHEMA_VERSION`.
Invalid,
}
/// Diagnostics gathered while loading a config file.
#[derive(Debug, Clone)]
pub struct LoadReport {
/// Human-readable problems met during the load (failed reads, hash mismatches).
pub warnings: Vec<String>,
/// Steps taken to recover, e.g. falling back to the previous-version file.
pub recovery_actions: Vec<String>,
/// Result of the integrity-hash comparison for the file that was loaded.
pub integrity_status: IntegrityStatus,
/// Schema-version verdict for the file that was loaded.
pub schema_status: SchemaStatus,
}
impl Default for LoadReport {
fn default() -> Self {
Self {
warnings: Vec::new(),
recovery_actions: Vec::new(),
integrity_status: IntegrityStatus::Unavailable,
schema_status: SchemaStatus::Ok,
}
}
}
/// Description of the current lock holder; serialized as JSON into the lock
/// file when the lock is acquired.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LockInfo {
/// Process id of the lock holder.
pub pid: u32,
/// Host name of the lock holder (`"unknown"` when it cannot be determined).
pub hostname: String,
/// RFC 3339 timestamp taken when this record was built.
pub acquired_at_utc: String,
/// Name of the tool that took the lock (defaults to `"cli"`).
pub tool: String,
/// Optional command description; left out of the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub command: Option<String>,
}
impl Default for LockInfo {
fn default() -> Self {
Self {
pid: std::process::id(),
hostname: hostname::get().map_or_else(
|_| "unknown".to_string(),
|h| h.to_string_lossy().to_string(),
),
acquired_at_utc: Utc::now().to_rfc3339(),
tool: "cli".to_string(),
command: None,
}
}
}
/// Loads, saves, locks and repairs the on-disk graph config files.
pub struct ConfigPersistence {
/// Supplies the config directory and the paths of the config, previous,
/// lock and quarantine files.
paths: GraphConfigPaths,
}
impl ConfigPersistence {
/// Creates a persistence handle that works on a clone of `store`'s paths.
#[must_use]
pub fn new(store: &GraphConfigStore) -> Self {
    let paths = store.paths().clone();
    Self { paths }
}
#[must_use]
pub fn from_paths(paths: GraphConfigPaths) -> Self {
Self { paths }
}
/// Loads the config, trying the primary file first and the previous-version
/// file second.
///
/// On success the returned [`LoadReport`] carries the warnings, integrity
/// status and schema status of the file that was actually loaded, plus a
/// warning and recovery actions for any fallback that took place.
///
/// # Errors
///
/// Returns [`PersistenceError::NoUsableConfig`] when neither file could be
/// loaded. The warnings collected along the way are dropped with the report
/// in that case.
pub fn load(&self) -> PersistenceResult<(GraphConfigFile, LoadReport)> {
    /// Copies the per-file findings into the report handed back to the caller.
    fn absorb(target: &mut LoadReport, loaded: LoadReport) {
        target.warnings.extend(loaded.warnings);
        target.integrity_status = loaded.integrity_status;
        target.schema_status = loaded.schema_status;
    }

    let mut report = LoadReport::default();

    // First choice: the primary config file.
    let primary = self.paths.config_file();
    match Self::try_load_file(&primary) {
        Ok((config, loaded)) => {
            absorb(&mut report, loaded);
            return Ok((config, report));
        }
        Err(e) => report
            .warnings
            .push(format!("Failed to load config.json: {e}")),
    }

    // Second choice: the copy kept from before the last save, if there is one.
    let backup = self.paths.previous_file();
    if backup.exists() {
        report
            .recovery_actions
            .push("Attempting to load config.json.previous".to_string());
        match Self::try_load_file(&backup) {
            Ok((config, loaded)) => {
                absorb(&mut report, loaded);
                report
                    .recovery_actions
                    .push("Recovered from config.json.previous".to_string());
                return Ok((config, report));
            }
            Err(e) => report
                .warnings
                .push(format!("Failed to load config.json.previous: {e}")),
        }
    }

    Err(PersistenceError::NoUsableConfig {
        reason: "Neither config.json nor config.json.previous could be loaded. \
                 Run `sqry config init` to create a new config file."
            .to_string(),
    })
}
fn try_load_file(path: &Path) -> PersistenceResult<(GraphConfigFile, LoadReport)> {
let mut report = LoadReport::default();
if !path.exists() {
return Err(PersistenceError::IoError {
path: path.to_path_buf(),
source: std::io::Error::new(std::io::ErrorKind::NotFound, "File not found"),
});
}
let content = fs::read_to_string(path).map_err(|e| PersistenceError::IoError {
path: path.to_path_buf(),
source: e,
})?;
let config: GraphConfigFile = serde_json::from_str(&content)
.map_err(|e| PersistenceError::DeserializationError(e.to_string()))?;
if config.schema_version != SCHEMA_VERSION {
report.schema_status = SchemaStatus::Invalid;
return Err(PersistenceError::CorruptConfig {
path: path.to_path_buf(),
reason: format!(
"Incompatible schema version: expected {}, found {}",
SCHEMA_VERSION, config.schema_version
),
});
}
let computed_hash = Self::compute_integrity_hash(&config.config)?;
if config.integrity.normalized_hash.is_empty() {
report.integrity_status = IntegrityStatus::Unavailable;
} else if config.integrity.normalized_hash != computed_hash {
report.integrity_status = IntegrityStatus::Mismatch;
report.warnings.push(format!(
"Integrity hash mismatch (possibly manual edit). \
Expected: {}, Found: {}",
config.integrity.normalized_hash, computed_hash
));
} else {
report.integrity_status = IntegrityStatus::Ok;
}
Ok((config, report))
}
/// Reports whether the primary config file or its previous-version copy is
/// present on disk.
#[must_use]
pub fn exists(&self) -> bool {
    if self.paths.config_file().exists() {
        return true;
    }
    self.paths.previous_file().exists()
}
/// Stamps `config` with a fresh update time and integrity hash, then writes
/// it to disk while holding the exclusive config lock.
///
/// The config directory is created when missing. `lock_timeout_ms` bounds
/// the wait for the lock; `tool` is recorded in the lock file.
///
/// # Errors
///
/// Returns an error when the directory cannot be created, the lock cannot
/// be obtained in time, serialization fails, or the write fails. `config`
/// may already carry the new timestamp and hash when a later step fails.
pub fn save(
    &self,
    config: &mut GraphConfigFile,
    lock_timeout_ms: u64,
    tool: &str,
) -> PersistenceResult<()> {
    let dir = self.paths.config_dir();
    if !dir.exists() {
        if let Err(source) = fs::create_dir_all(&dir) {
            return Err(PersistenceError::IoError { path: dir, source });
        }
    }

    // The guard is held until the function returns, i.e. across the whole
    // stamp / serialize / write sequence.
    let _lock_guard = self.acquire_lock(lock_timeout_ms, tool)?;

    config.metadata.updated_at = Utc::now().to_rfc3339();
    config.integrity.normalized_hash = Self::compute_integrity_hash(&config.config)?;
    config.integrity.last_verified_at = Utc::now().to_rfc3339();

    let serialized = match serde_json::to_string_pretty(config) {
        Ok(text) => text,
        Err(e) => return Err(PersistenceError::SerializationError(e.to_string())),
    };
    self.atomic_write(&serialized)
}
/// Saves a default [`GraphConfigFile`] and returns it, including the
/// timestamp and integrity hash filled in by [`Self::save`].
///
/// # Errors
///
/// Propagates any error from [`Self::save`].
pub fn init(&self, lock_timeout_ms: u64, tool: &str) -> PersistenceResult<GraphConfigFile> {
    let mut fresh = GraphConfigFile::default();
    self.save(&mut fresh, lock_timeout_ms, tool).map(|()| fresh)
}
fn atomic_write(&self, content: &str) -> PersistenceResult<()> {
let config_path = self.paths.config_file();
let config_dir = self.paths.config_dir();
let temp_name = format!(
"config.json.tmp.{}.{}",
std::process::id(),
uuid::Uuid::new_v4()
);
let temp_path = config_dir.join(&temp_name);
let mut temp_file = File::create(&temp_path).map_err(|e| PersistenceError::IoError {
path: temp_path.clone(),
source: e,
})?;
temp_file
.write_all(content.as_bytes())
.map_err(|e| PersistenceError::IoError {
path: temp_path.clone(),
source: e,
})?;
temp_file
.sync_all()
.map_err(|e| PersistenceError::IoError {
path: temp_path.clone(),
source: e,
})?;
drop(temp_file);
if config_path.exists() {
let previous_path = self.paths.previous_file();
fs::rename(&config_path, &previous_path).map_err(|e| PersistenceError::IoError {
path: config_path.clone(),
source: e,
})?;
}
fs::rename(&temp_path, &config_path).map_err(|e| PersistenceError::IoError {
path: temp_path.clone(),
source: e,
})?;
Self::fsync_dir(&config_dir)?;
Ok(())
}
/// Opens `dir` and calls `sync_all` on the handle (Unix only).
///
/// # Errors
///
/// Returns [`PersistenceError::IoError`] for `dir` when opening or syncing fails.
#[cfg(unix)]
fn fsync_dir(dir: &Path) -> PersistenceResult<()> {
    File::open(dir)
        .and_then(|handle| handle.sync_all())
        .map_err(|source| PersistenceError::IoError {
            path: dir.to_path_buf(),
            source,
        })
}
/// No-op on non-Unix targets; always succeeds.
#[cfg(not(unix))]
fn fsync_dir(_dir: &Path) -> PersistenceResult<()> {
    let done = ();
    Ok(done)
}
/// Returns the hex-encoded BLAKE3 hash of `config` serialized with
/// `serde_json::to_string`.
///
/// # Errors
///
/// Returns [`PersistenceError::SerializationError`] when serialization fails.
fn compute_integrity_hash(
    config: &super::graph_config_schema::GraphConfig,
) -> PersistenceResult<String> {
    match serde_json::to_string(config) {
        Ok(serialized) => {
            let digest = blake3::hash(serialized.as_bytes());
            Ok(digest.to_hex().to_string())
        }
        Err(e) => Err(PersistenceError::SerializationError(e.to_string())),
    }
}
/// Takes the exclusive advisory lock on the lock file, polling until it is
/// obtained or `timeout_ms` milliseconds have passed.
///
/// The config directory and the lock file are created when missing. Once
/// the lock is held, a JSON [`LockInfo`] record (with `tool` filled in) is
/// written into the lock file on a best-effort basis.
///
/// At least one attempt is always made, even with `timeout_ms == 0`. The
/// pause between attempts is at most 50 ms and never longer than the time
/// left, so the call does not overshoot the requested timeout by a full
/// polling interval.
///
/// # Errors
///
/// * [`PersistenceError::IoError`] when the directory or lock file cannot be
///   created or opened.
/// * [`PersistenceError::LockTimeout`] when the lock is not obtained in time.
fn acquire_lock(&self, timeout_ms: u64, tool: &str) -> PersistenceResult<LockGuard> {
    // Longest pause between two lock attempts.
    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    let lock_path = self.paths.lock_file();
    let config_dir = self.paths.config_dir();
    if !config_dir.exists() {
        fs::create_dir_all(&config_dir).map_err(|e| PersistenceError::IoError {
            path: config_dir.clone(),
            source: e,
        })?;
    }

    // `truncate(false)`: the file may hold another holder's info until we
    // actually own the lock.
    let lock_file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(&lock_path)
        .map_err(|e| PersistenceError::IoError {
            path: lock_path.clone(),
            source: e,
        })?;

    let timeout = Duration::from_millis(timeout_ms);
    let start = std::time::Instant::now();
    loop {
        if lock_file.try_lock_exclusive().is_ok() {
            let lock_info = LockInfo {
                tool: tool.to_string(),
                ..Default::default()
            };
            let info_json =
                serde_json::to_string_pretty(&lock_info).unwrap_or_else(|_| "{}".to_string());
            // The holder record is purely informational, so failures to
            // write it are deliberately ignored: the lock itself is held.
            let _ = lock_file.set_len(0);
            let _ = (&lock_file).write_all(info_json.as_bytes());
            let _ = lock_file.sync_all();
            return Ok(LockGuard {
                file: lock_file,
                path: lock_path,
            });
        }

        let elapsed = start.elapsed();
        if elapsed >= timeout {
            return Err(PersistenceError::LockTimeout {
                path: lock_path,
                timeout_ms,
            });
        }
        // Never sleep past the deadline.
        std::thread::sleep(POLL_INTERVAL.min(timeout.saturating_sub(elapsed)));
    }
}
/// Moves the file at `path` to a timestamped quarantine location and returns
/// that location.
///
/// The timestamp has one-second resolution. When the quarantine path for the
/// current second is already taken (for example when several files are
/// quarantined in quick succession), a numeric suffix is appended so an
/// earlier quarantined file is not overwritten by the rename.
///
/// # Errors
///
/// Returns [`PersistenceError::IoError`] for `path` when the rename fails.
pub fn quarantine_corrupt(&self, path: &Path) -> PersistenceResult<PathBuf> {
    // Upper bound on the suffix search so the loop always terminates.
    const MAX_SUFFIX: u32 = 10_000;

    let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
    let mut corrupt_path = self.paths.corrupt_file(&timestamp);

    // NOTE(review): assumes `corrupt_file` embeds its argument in the file
    // name, so a different argument yields a different path — confirm.
    let mut suffix = 0_u32;
    while corrupt_path.exists() && suffix < MAX_SUFFIX {
        suffix += 1;
        corrupt_path = self.paths.corrupt_file(&format!("{timestamp}.{suffix}"));
    }

    fs::rename(path, &corrupt_path).map_err(|e| PersistenceError::IoError {
        path: path.to_path_buf(),
        source: e,
    })?;
    Ok(corrupt_path)
}
/// Cleans up the config directory while holding the config lock.
///
/// 1. Every `config.json.tmp.*` entry in the config directory is quarantined.
///    A directory that cannot be listed is skipped silently.
/// 2. If the config file exists and loads, it is reported as `"valid"`.
/// 3. If it exists but fails to load, it is quarantined and, when a
///    previous-version file exists, that file is renamed into its place.
/// 4. If it does not exist but a previous-version file does, that file is
///    renamed into its place. `config_status` stays empty in this case.
///
/// The lock is taken with the tool name `"cli"`.
///
/// # Errors
///
/// Returns an error when the lock cannot be obtained or when a quarantine or
/// restore rename fails.
pub fn repair(&self, lock_timeout_ms: u64) -> PersistenceResult<RepairReport> {
    let mut report = RepairReport::default();
    let _lock_guard = self.acquire_lock(lock_timeout_ms, "cli")?;

    let config_path = self.paths.config_file();
    let previous_path = self.paths.previous_file();
    let config_dir = self.paths.config_dir();

    // Renames the previous-version file onto the config path.
    let promote_previous = || {
        fs::rename(&previous_path, &config_path).map_err(|source| PersistenceError::IoError {
            path: previous_path.clone(),
            source,
        })
    };

    // Leftover temp files from earlier writes.
    if let Ok(entries) = fs::read_dir(&config_dir) {
        let leftovers = entries.flatten().filter(|entry| {
            entry
                .file_name()
                .to_string_lossy()
                .starts_with("config.json.tmp.")
        });
        for entry in leftovers {
            let artifact_path = entry.path();
            let quarantine_path = self.quarantine_corrupt(&artifact_path)?;
            report.quarantined.push((artifact_path, quarantine_path));
        }
    }

    if !config_path.exists() {
        if previous_path.exists() {
            promote_previous()?;
            report.restored_from_previous = true;
        }
        return Ok(report);
    }

    match Self::try_load_file(&config_path) {
        Ok(_) => report.config_status = "valid".to_string(),
        Err(e) => {
            report.config_status = format!("corrupt: {e}");
            let quarantine_path = self.quarantine_corrupt(&config_path)?;
            report
                .quarantined
                .push((config_path.clone(), quarantine_path));
            if previous_path.exists() {
                promote_previous()?;
                report.restored_from_previous = true;
            }
        }
    }
    Ok(report)
}
}
/// Holds the locked lock file; the lock is released when the guard is dropped.
struct LockGuard {
/// Open handle on which the exclusive lock was taken.
file: File,
/// Path of the lock file; stored but not read in this file.
#[allow(dead_code)] path: PathBuf,
}
impl Drop for LockGuard {
    /// Releases the file lock; an unlock failure is ignored.
    fn drop(&mut self) {
        self.file.unlock().ok();
    }
}
/// Summary of what a repair run found and did.
#[derive(Debug, Default)]
pub struct RepairReport {
/// `"valid"` when the config file loaded, `"corrupt: <error>"` when it did
/// not, and empty when no config file existed.
pub config_status: String,
/// `(original path, quarantine path)` for every file moved to quarantine.
pub quarantined: Vec<(PathBuf, PathBuf)>,
/// `true` when the previous-version file was renamed into the config path.
pub restored_from_previous: bool,
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Lock timeout (ms) used by tests that do not expect contention.
    const TIMEOUT_MS: u64 = 5000;

    /// Builds a `ConfigPersistence` rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned as well so the directory stays alive for as
    /// long as the test holds on to it.
    fn create_test_persistence() -> (TempDir, ConfigPersistence) {
        let temp = TempDir::new().unwrap();
        let persistence =
            ConfigPersistence::from_paths(GraphConfigPaths::new(temp.path()).unwrap());
        (temp, persistence)
    }

    /// `init` writes a config file carrying the current schema version.
    #[test]
    fn test_init_creates_config() {
        let (_temp, persistence) = create_test_persistence();
        let created = persistence.init(TIMEOUT_MS, "test").unwrap();
        assert_eq!(created.schema_version, SCHEMA_VERSION);
        assert!(persistence.paths.config_file().exists());
    }

    /// A saved value is read back and the integrity check passes.
    #[test]
    fn test_save_load_roundtrip() {
        let (_temp, persistence) = create_test_persistence();
        let mut original = GraphConfigFile::default();
        original.config.limits.max_results = 12345;
        persistence.save(&mut original, TIMEOUT_MS, "test").unwrap();

        let (reloaded, load_report) = persistence.load().unwrap();
        assert_eq!(reloaded.config.limits.max_results, 12345);
        assert_eq!(load_report.integrity_status, IntegrityStatus::Ok);
    }

    /// Saving fills in the integrity hash on the in-memory config.
    #[test]
    fn test_integrity_hash_computed() {
        let (_temp, persistence) = create_test_persistence();
        let mut cfg = GraphConfigFile::default();
        persistence.save(&mut cfg, TIMEOUT_MS, "test").unwrap();
        assert!(!cfg.integrity.normalized_hash.is_empty());
    }

    /// The second save keeps the first version as the previous file.
    #[test]
    fn test_previous_file_created_on_update() {
        let (_temp, persistence) = create_test_persistence();
        let mut cfg = GraphConfigFile::default();
        for limit in [100, 200] {
            cfg.config.limits.max_results = limit;
            persistence.save(&mut cfg, TIMEOUT_MS, "test").unwrap();
        }
        assert!(persistence.paths.previous_file().exists());
    }

    /// Loading from an empty directory fails.
    #[test]
    fn test_load_nonexistent_returns_error() {
        let (_temp, persistence) = create_test_persistence();
        assert!(persistence.load().is_err());
    }

    /// Nothing on disk means `exists` is false.
    #[test]
    fn test_exists_false_when_no_config() {
        let (_temp, persistence) = create_test_persistence();
        let present = persistence.exists();
        assert!(!present);
    }

    /// After `init`, `exists` is true.
    #[test]
    fn test_exists_true_after_init() {
        let (_temp, persistence) = create_test_persistence();
        persistence.init(TIMEOUT_MS, "test").unwrap();
        let present = persistence.exists();
        assert!(present);
    }

    /// Editing the file by hand makes the load report an integrity mismatch.
    #[test]
    fn test_integrity_mismatch_warning() {
        let (_temp, persistence) = create_test_persistence();
        let mut cfg = GraphConfigFile::default();
        persistence.save(&mut cfg, TIMEOUT_MS, "test").unwrap();

        // Tamper with the stored text without updating the stored hash.
        let target = persistence.paths.config_file();
        let tampered = fs::read_to_string(&target)
            .unwrap()
            .replace("5000", "9999");
        fs::write(&target, tampered).unwrap();

        let (_, load_report) = persistence.load().unwrap();
        assert_eq!(load_report.integrity_status, IntegrityStatus::Mismatch);
        assert!(!load_report.warnings.is_empty());
    }

    /// With the config file gone, `repair` renames the previous file into place.
    #[test]
    fn test_repair_promotes_previous_when_config_missing() {
        let (_temp, persistence) = create_test_persistence();
        let mut cfg = GraphConfigFile::default();
        cfg.config.limits.max_results = 42;
        persistence.save(&mut cfg, TIMEOUT_MS, "test").unwrap();
        cfg.config.limits.max_results = 43;
        persistence.save(&mut cfg, TIMEOUT_MS, "test").unwrap();
        assert!(persistence.paths.previous_file().exists());

        fs::remove_file(persistence.paths.config_file()).unwrap();
        assert!(!persistence.paths.config_file().exists());
        assert!(persistence.paths.previous_file().exists());

        let outcome = persistence.repair(TIMEOUT_MS).unwrap();
        assert!(outcome.restored_from_previous);
        assert!(persistence.paths.config_file().exists());
    }

    /// Quarantining moves the file to a path whose name contains "corrupt".
    #[test]
    fn test_quarantine_corrupt_file() {
        let (_temp, persistence) = create_test_persistence();
        fs::create_dir_all(persistence.paths.config_dir()).unwrap();
        let broken = persistence.paths.config_file();
        fs::write(&broken, "not valid json").unwrap();

        let moved_to = persistence.quarantine_corrupt(&broken).unwrap();
        assert!(!broken.exists());
        assert!(moved_to.exists());

        let moved_name = moved_to.file_name().unwrap().to_string_lossy();
        assert!(moved_name.contains("corrupt"));
    }

    /// A second acquisition times out while the first guard is alive.
    #[test]
    fn test_lock_timeout() {
        let (_temp, persistence) = create_test_persistence();
        let held = persistence.acquire_lock(TIMEOUT_MS, "test1").unwrap();
        let contended = persistence.acquire_lock(100, "test2");
        assert!(matches!(
            contended,
            Err(PersistenceError::LockTimeout { .. })
        ));
        drop(held);
    }

    /// Dropping a guard lets the next acquisition succeed.
    #[test]
    fn test_lock_released_on_drop() {
        let (_temp, persistence) = create_test_persistence();
        {
            let _first = persistence.acquire_lock(TIMEOUT_MS, "test1").unwrap();
        }
        let _second = persistence.acquire_lock(TIMEOUT_MS, "test2").unwrap();
    }
}