1use std::fs::{self, File, OpenOptions};
32use std::io::Write;
33use std::path::{Path, PathBuf};
34use std::time::Duration;
35
36use chrono::Utc;
37use fs2::FileExt;
38use serde_json;
39use thiserror::Error;
40
41use super::graph_config_schema::{GraphConfigFile, SCHEMA_VERSION};
42use super::graph_config_store::{GraphConfigPaths, GraphConfigStore};
43
44#[derive(Debug, Error)]
46pub enum PersistenceError {
47 #[error("IO error at {path}: {source}")]
49 IoError {
50 path: PathBuf,
52 #[source]
54 source: std::io::Error,
55 },
56
57 #[error("Failed to serialize config: {0}")]
59 SerializationError(String),
60
61 #[error("Failed to deserialize config: {0}")]
63 DeserializationError(String),
64
65 #[error("Failed to acquire lock at {path} within {timeout_ms}ms")]
67 LockTimeout {
68 path: PathBuf,
70 timeout_ms: u64,
72 },
73
74 #[error("Stale lock detected at {path}: {details}")]
76 StaleLock {
77 path: PathBuf,
79 details: String,
81 },
82
83 #[error("Corrupt config file at {path}: {reason}")]
85 CorruptConfig {
86 path: PathBuf,
88 reason: String,
90 },
91
92 #[error("No usable config found: {reason}")]
94 NoUsableConfig {
95 reason: String,
97 },
98
99 #[error("Integrity mismatch: expected {expected}, found {found}")]
101 IntegrityMismatch {
102 expected: String,
104 found: String,
106 },
107}
108
109pub type PersistenceResult<T> = Result<T, PersistenceError>;
111
/// Result of comparing a file's stored integrity hash with a freshly
/// computed one.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum IntegrityStatus {
    /// The stored hash equals the computed hash.
    Ok,
    /// A hash is stored but differs from the computed hash.
    Mismatch,
    /// No comparison was made (the default; also used when the stored hash
    /// is empty).
    Unavailable,
}
122
/// Whether a loaded file's schema version is acceptable.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SchemaStatus {
    /// The schema version is the supported one (the default for a report).
    Ok,
    /// The schema version does not match the supported one.
    Invalid,
}
131
132#[derive(Debug, Clone)]
134pub struct LoadReport {
135 pub warnings: Vec<String>,
137 pub recovery_actions: Vec<String>,
139 pub integrity_status: IntegrityStatus,
141 pub schema_status: SchemaStatus,
143}
144
145impl Default for LoadReport {
146 fn default() -> Self {
147 Self {
148 warnings: Vec::new(),
149 recovery_actions: Vec::new(),
150 integrity_status: IntegrityStatus::Unavailable,
151 schema_status: SchemaStatus::Ok,
152 }
153 }
154}
155
156#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
158pub struct LockInfo {
159 pub pid: u32,
161 pub hostname: String,
163 pub acquired_at_utc: String,
165 pub tool: String,
167 #[serde(skip_serializing_if = "Option::is_none")]
169 pub command: Option<String>,
170}
171
172impl Default for LockInfo {
173 fn default() -> Self {
174 Self {
175 pid: std::process::id(),
176 hostname: hostname::get().map_or_else(
177 |_| "unknown".to_string(),
178 |h| h.to_string_lossy().to_string(),
179 ),
180 acquired_at_utc: Utc::now().to_rfc3339(),
181 tool: "cli".to_string(),
182 command: None,
183 }
184 }
185}
186
187pub struct ConfigPersistence {
192 paths: GraphConfigPaths,
193}
194
195impl ConfigPersistence {
196 #[must_use]
198 pub fn new(store: &GraphConfigStore) -> Self {
199 Self {
200 paths: store.paths().clone(),
201 }
202 }
203
204 #[must_use]
206 pub fn from_paths(paths: GraphConfigPaths) -> Self {
207 Self { paths }
208 }
209
210 pub fn load(&self) -> PersistenceResult<(GraphConfigFile, LoadReport)> {
222 let mut report = LoadReport::default();
223
224 let config_path = self.paths.config_file();
226 match Self::try_load_file(&config_path) {
227 Ok((config, file_report)) => {
228 report.warnings.extend(file_report.warnings);
229 report.integrity_status = file_report.integrity_status;
230 report.schema_status = file_report.schema_status;
231 return Ok((config, report));
232 }
233 Err(e) => {
234 report
235 .warnings
236 .push(format!("Failed to load config.json: {e}"));
237 }
238 }
239
240 let previous_path = self.paths.previous_file();
242 if previous_path.exists() {
243 report
244 .recovery_actions
245 .push("Attempting to load config.json.previous".to_string());
246
247 match Self::try_load_file(&previous_path) {
248 Ok((config, file_report)) => {
249 report.warnings.extend(file_report.warnings);
250 report.integrity_status = file_report.integrity_status;
251 report.schema_status = file_report.schema_status;
252 report
253 .recovery_actions
254 .push("Recovered from config.json.previous".to_string());
255 return Ok((config, report));
256 }
257 Err(e) => {
258 report
259 .warnings
260 .push(format!("Failed to load config.json.previous: {e}"));
261 }
262 }
263 }
264
265 Err(PersistenceError::NoUsableConfig {
267 reason: "Neither config.json nor config.json.previous could be loaded. \
268 Run `sqry config init` to create a new config file."
269 .to_string(),
270 })
271 }
272
273 fn try_load_file(path: &Path) -> PersistenceResult<(GraphConfigFile, LoadReport)> {
275 let mut report = LoadReport::default();
276
277 if !path.exists() {
278 return Err(PersistenceError::IoError {
279 path: path.to_path_buf(),
280 source: std::io::Error::new(std::io::ErrorKind::NotFound, "File not found"),
281 });
282 }
283
284 let content = fs::read_to_string(path).map_err(|e| PersistenceError::IoError {
286 path: path.to_path_buf(),
287 source: e,
288 })?;
289
290 let config: GraphConfigFile = serde_json::from_str(&content)
292 .map_err(|e| PersistenceError::DeserializationError(e.to_string()))?;
293
294 if config.schema_version != SCHEMA_VERSION {
296 report.schema_status = SchemaStatus::Invalid;
297 return Err(PersistenceError::CorruptConfig {
298 path: path.to_path_buf(),
299 reason: format!(
300 "Incompatible schema version: expected {}, found {}",
301 SCHEMA_VERSION, config.schema_version
302 ),
303 });
304 }
305
306 let computed_hash = Self::compute_integrity_hash(&config.config)?;
308 if config.integrity.normalized_hash.is_empty() {
309 report.integrity_status = IntegrityStatus::Unavailable;
310 } else if config.integrity.normalized_hash != computed_hash {
311 report.integrity_status = IntegrityStatus::Mismatch;
312 report.warnings.push(format!(
313 "Integrity hash mismatch (possibly manual edit). \
314 Expected: {}, Found: {}",
315 config.integrity.normalized_hash, computed_hash
316 ));
317 } else {
318 report.integrity_status = IntegrityStatus::Ok;
319 }
320
321 Ok((config, report))
322 }
323
324 #[must_use]
326 pub fn exists(&self) -> bool {
327 self.paths.config_file().exists() || self.paths.previous_file().exists()
328 }
329
330 pub fn save(
346 &self,
347 config: &mut GraphConfigFile,
348 lock_timeout_ms: u64,
349 tool: &str,
350 ) -> PersistenceResult<()> {
351 let config_dir = self.paths.config_dir();
353 if !config_dir.exists() {
354 fs::create_dir_all(&config_dir).map_err(|e| PersistenceError::IoError {
355 path: config_dir.clone(),
356 source: e,
357 })?;
358 }
359
360 let lock_guard = self.acquire_lock(lock_timeout_ms, tool)?;
362
363 config.metadata.updated_at = Utc::now().to_rfc3339();
365
366 let hash = Self::compute_integrity_hash(&config.config)?;
368 config.integrity.normalized_hash = hash;
369 config.integrity.last_verified_at = Utc::now().to_rfc3339();
370
371 let json = serde_json::to_string_pretty(config)
373 .map_err(|e| PersistenceError::SerializationError(e.to_string()))?;
374
375 self.atomic_write(&json)?;
377
378 drop(lock_guard);
380
381 Ok(())
382 }
383
384 pub fn init(&self, lock_timeout_ms: u64, tool: &str) -> PersistenceResult<GraphConfigFile> {
390 let mut config = GraphConfigFile::default();
391 self.save(&mut config, lock_timeout_ms, tool)?;
392 Ok(config)
393 }
394
395 fn atomic_write(&self, content: &str) -> PersistenceResult<()> {
401 let config_path = self.paths.config_file();
402 let config_dir = self.paths.config_dir();
403
404 let temp_name = format!(
406 "config.json.tmp.{}.{}",
407 std::process::id(),
408 uuid::Uuid::new_v4()
409 );
410 let temp_path = config_dir.join(&temp_name);
411
412 let mut temp_file = File::create(&temp_path).map_err(|e| PersistenceError::IoError {
414 path: temp_path.clone(),
415 source: e,
416 })?;
417
418 temp_file
419 .write_all(content.as_bytes())
420 .map_err(|e| PersistenceError::IoError {
421 path: temp_path.clone(),
422 source: e,
423 })?;
424
425 temp_file
427 .sync_all()
428 .map_err(|e| PersistenceError::IoError {
429 path: temp_path.clone(),
430 source: e,
431 })?;
432
433 drop(temp_file);
434
435 if config_path.exists() {
437 let previous_path = self.paths.previous_file();
438 fs::rename(&config_path, &previous_path).map_err(|e| PersistenceError::IoError {
439 path: config_path.clone(),
440 source: e,
441 })?;
442 }
443
444 fs::rename(&temp_path, &config_path).map_err(|e| PersistenceError::IoError {
446 path: temp_path.clone(),
447 source: e,
448 })?;
449
450 Self::fsync_dir(&config_dir)?;
452
453 Ok(())
454 }
455
456 #[cfg(unix)]
458 fn fsync_dir(dir: &Path) -> PersistenceResult<()> {
459 let dir_file = File::open(dir).map_err(|e| PersistenceError::IoError {
460 path: dir.to_path_buf(),
461 source: e,
462 })?;
463
464 dir_file.sync_all().map_err(|e| PersistenceError::IoError {
465 path: dir.to_path_buf(),
466 source: e,
467 })?;
468
469 Ok(())
470 }
471
472 #[cfg(not(unix))]
473 fn fsync_dir(_dir: &Path) -> PersistenceResult<()> {
474 Ok(())
477 }
478
479 fn compute_integrity_hash(
485 config: &super::graph_config_schema::GraphConfig,
486 ) -> PersistenceResult<String> {
487 let json = serde_json::to_string(config)
489 .map_err(|e| PersistenceError::SerializationError(e.to_string()))?;
490
491 let hash = blake3::hash(json.as_bytes());
493 Ok(hash.to_hex().to_string())
494 }
495
496 fn acquire_lock(&self, timeout_ms: u64, tool: &str) -> PersistenceResult<LockGuard> {
502 let lock_path = self.paths.lock_file();
503
504 let config_dir = self.paths.config_dir();
506 if !config_dir.exists() {
507 fs::create_dir_all(&config_dir).map_err(|e| PersistenceError::IoError {
508 path: config_dir,
509 source: e,
510 })?;
511 }
512
513 let lock_file = OpenOptions::new()
515 .read(true)
516 .write(true)
517 .create(true)
518 .truncate(false)
519 .open(&lock_path)
520 .map_err(|e| PersistenceError::IoError {
521 path: lock_path.clone(),
522 source: e,
523 })?;
524
525 let timeout = Duration::from_millis(timeout_ms);
527 let start = std::time::Instant::now();
528
529 loop {
530 if let Ok(()) = lock_file.try_lock_exclusive() {
531 let lock_info = LockInfo {
533 tool: tool.to_string(),
534 ..Default::default()
535 };
536 let info_json =
537 serde_json::to_string_pretty(&lock_info).unwrap_or_else(|_| "{}".to_string());
538
539 let _ = lock_file.set_len(0);
541 let _ = (&lock_file).write_all(info_json.as_bytes());
542 let _ = lock_file.sync_all();
543
544 return Ok(LockGuard {
545 file: lock_file,
546 path: lock_path,
547 });
548 }
549 if start.elapsed() >= timeout {
550 return Err(PersistenceError::LockTimeout {
551 path: lock_path,
552 timeout_ms,
553 });
554 }
555 std::thread::sleep(Duration::from_millis(50));
556 }
557 }
558
559 pub fn quarantine_corrupt(&self, path: &Path) -> PersistenceResult<PathBuf> {
569 let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
570 let corrupt_path = self.paths.corrupt_file(×tamp);
571
572 fs::rename(path, &corrupt_path).map_err(|e| PersistenceError::IoError {
573 path: path.to_path_buf(),
574 source: e,
575 })?;
576
577 Ok(corrupt_path)
578 }
579
580 pub fn repair(&self, lock_timeout_ms: u64) -> PersistenceResult<RepairReport> {
586 let mut report = RepairReport::default();
587
588 let _lock_guard = self.acquire_lock(lock_timeout_ms, "cli")?;
589
590 let config_path = self.paths.config_file();
591 let previous_path = self.paths.previous_file();
592
593 let config_dir = self.paths.config_dir();
595 if let Ok(entries) = fs::read_dir(&config_dir) {
596 for entry in entries.flatten() {
597 let name = entry.file_name();
598 let name_str = name.to_string_lossy();
599 if name_str.starts_with("config.json.tmp.") {
600 let artifact_path = entry.path();
601 let quarantine_path = self.quarantine_corrupt(&artifact_path)?;
602 report.quarantined.push((artifact_path, quarantine_path));
603 }
604 }
605 }
606
607 if config_path.exists() {
609 match Self::try_load_file(&config_path) {
610 Ok(_) => {
611 report.config_status = "valid".to_string();
612 }
613 Err(e) => {
614 report.config_status = format!("corrupt: {e}");
615 let quarantine_path = self.quarantine_corrupt(&config_path)?;
616 report
617 .quarantined
618 .push((config_path.clone(), quarantine_path));
619
620 if previous_path.exists() {
622 fs::rename(&previous_path, &config_path).map_err(|e| {
623 PersistenceError::IoError {
624 path: previous_path.clone(),
625 source: e,
626 }
627 })?;
628 report.restored_from_previous = true;
629 }
630 }
631 }
632 } else if previous_path.exists() {
633 fs::rename(&previous_path, &config_path).map_err(|e| PersistenceError::IoError {
635 path: previous_path.clone(),
636 source: e,
637 })?;
638 report.restored_from_previous = true;
639 }
640
641 Ok(report)
642 }
643}
644
645struct LockGuard {
647 file: File,
648 #[allow(dead_code)] path: PathBuf,
650}
651
652impl Drop for LockGuard {
653 fn drop(&mut self) {
654 let _ = self.file.unlock();
655 }
657}
658
/// Summary of what a repair run found and did.
#[derive(Default, Debug)]
pub struct RepairReport {
    /// `"valid"`, `"corrupt: <error>"`, or empty when no config file was
    /// examined.
    pub config_status: String,
    /// Pairs of (original path, quarantine path) for every file moved aside.
    pub quarantined: Vec<(PathBuf, PathBuf)>,
    /// Whether the `.previous` file was renamed into place as the config.
    pub restored_from_previous: bool,
}
669
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a persistence layer rooted in a fresh temporary directory. The
    /// `TempDir` is returned so the directory outlives the test body.
    fn create_test_persistence() -> (TempDir, ConfigPersistence) {
        let root = TempDir::new().unwrap();
        let store = ConfigPersistence::from_paths(GraphConfigPaths::new(root.path()).unwrap());
        (root, store)
    }

    /// `init` writes a config file carrying the current schema version.
    #[test]
    fn test_init_creates_config() {
        let (_root, store) = create_test_persistence();

        let created = store.init(5000, "test").unwrap();

        assert_eq!(created.schema_version, SCHEMA_VERSION);
        assert!(store.paths.config_file().exists());
    }

    /// A saved value is read back unchanged and passes the integrity check.
    #[test]
    fn test_save_load_roundtrip() {
        let (_root, store) = create_test_persistence();

        let mut original = GraphConfigFile::default();
        original.config.limits.max_results = 12345;
        store.save(&mut original, 5000, "test").unwrap();

        let (reloaded, load_report) = store.load().unwrap();
        assert_eq!(reloaded.config.limits.max_results, 12345);
        assert_eq!(load_report.integrity_status, IntegrityStatus::Ok);
    }

    /// `save` fills in the integrity hash on the caller's config.
    #[test]
    fn test_integrity_hash_computed() {
        let (_root, store) = create_test_persistence();

        let mut cfg = GraphConfigFile::default();
        store.save(&mut cfg, 5000, "test").unwrap();

        assert!(!cfg.integrity.normalized_hash.is_empty());
    }

    /// The second save rotates the first file to `.previous`.
    #[test]
    fn test_previous_file_created_on_update() {
        let (_root, store) = create_test_persistence();

        let mut cfg = GraphConfigFile::default();
        for limit in [100, 200] {
            cfg.config.limits.max_results = limit;
            store.save(&mut cfg, 5000, "test").unwrap();
        }

        assert!(store.paths.previous_file().exists());
    }

    /// Loading from an empty directory fails.
    #[test]
    fn test_load_nonexistent_returns_error() {
        let (_root, store) = create_test_persistence();

        assert!(store.load().is_err());
    }

    /// Nothing exists before the first save.
    #[test]
    fn test_exists_false_when_no_config() {
        let (_root, store) = create_test_persistence();

        assert!(!store.exists());
    }

    /// `exists` turns true once `init` has run.
    #[test]
    fn test_exists_true_after_init() {
        let (_root, store) = create_test_persistence();

        store.init(5000, "test").unwrap();

        assert!(store.exists());
    }

    /// Editing the file on disk yields a mismatch status plus a warning.
    #[test]
    fn test_integrity_mismatch_warning() {
        let (_root, store) = create_test_persistence();

        let mut cfg = GraphConfigFile::default();
        store.save(&mut cfg, 5000, "test").unwrap();

        // Tamper with the stored file without updating its hash.
        let target = store.paths.config_file();
        let tampered = fs::read_to_string(&target)
            .unwrap()
            .replace("5000", "9999");
        fs::write(&target, tampered).unwrap();

        let (_, load_report) = store.load().unwrap();
        assert_eq!(load_report.integrity_status, IntegrityStatus::Mismatch);
        assert!(!load_report.warnings.is_empty());
    }

    /// With the config file gone, `repair` promotes the `.previous` file.
    #[test]
    fn test_repair_promotes_previous_when_config_missing() {
        let (_root, store) = create_test_persistence();

        let mut cfg = GraphConfigFile::default();
        cfg.config.limits.max_results = 42;
        store.save(&mut cfg, 5000, "test").unwrap();

        cfg.config.limits.max_results = 43;
        store.save(&mut cfg, 5000, "test").unwrap();

        assert!(store.paths.previous_file().exists());

        fs::remove_file(store.paths.config_file()).unwrap();
        assert!(!store.paths.config_file().exists());
        assert!(store.paths.previous_file().exists());

        let outcome = store.repair(5000).unwrap();
        assert!(outcome.restored_from_previous);
        assert!(store.paths.config_file().exists());
    }

    /// Quarantining moves the file to a path whose name mentions "corrupt".
    #[test]
    fn test_quarantine_corrupt_file() {
        let (_root, store) = create_test_persistence();

        fs::create_dir_all(store.paths.config_dir()).unwrap();
        let broken = store.paths.config_file();
        fs::write(&broken, "not valid json").unwrap();

        let moved_to = store.quarantine_corrupt(&broken).unwrap();

        assert!(!broken.exists());
        assert!(moved_to.exists());
        let moved_name = moved_to.file_name().unwrap().to_string_lossy();
        assert!(moved_name.contains("corrupt"));
    }

    /// A second acquisition times out while the first guard is alive.
    #[test]
    fn test_lock_timeout() {
        let (_root, store) = create_test_persistence();

        let held = store.acquire_lock(5000, "test1").unwrap();

        let contender = store.acquire_lock(100, "test2");
        assert!(matches!(
            contender,
            Err(PersistenceError::LockTimeout { .. })
        ));

        drop(held);
    }

    /// Dropping the guard frees the lock for the next taker.
    #[test]
    fn test_lock_released_on_drop() {
        let (_root, store) = create_test_persistence();

        let first = store.acquire_lock(5000, "test1").unwrap();
        drop(first);

        let _second = store.acquire_lock(5000, "test2").unwrap();
    }
}