vectx_storage/persistence.rs

use anyhow::Result;
use nix::sys::wait::{waitpid, WaitStatus};
use nix::unistd::{fork, ForkResult};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::process;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;

static BGSAVE_IN_PROGRESS: AtomicBool = AtomicBool::new(false);
static LAST_SAVE_TIME: AtomicU64 = AtomicU64::new(0);

#[derive(Debug, Serialize, Deserialize)]
pub struct SnapshotData {
    pub collections: Vec<CollectionSnapshot>,
    pub timestamp: u64,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CollectionSnapshot {
    pub name: String,
    pub config: CollectionConfigSnapshot,
    pub points: Vec<PointSnapshot>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CollectionConfigSnapshot {
    pub vector_dim: usize,
    pub distance: String,
    pub use_hnsw: bool,
    pub enable_bm25: bool,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct PointSnapshot {
    pub id: String,
    pub vector: Vec<f32>,
    pub payload: Option<serde_json::Value>,
}

/// Fork-based snapshot persistence: background saves via fork() (Redis BGSAVE
/// style), plus a synchronous save and startup loading with corruption recovery.
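///
/// # Example
///
/// A minimal lifecycle sketch; it assumes the caller already owns a
/// `HashMap<String, Arc<vectx_core::Collection>>` (construction elided), so the
/// block is marked `ignore` rather than compiled as a doc-test.
///
/// ```ignore
/// let persistence = ForkBasedPersistence::new("./data");
///
/// // On startup: restore the last snapshot (None if missing or corrupt).
/// let restored = persistence.load_snapshot()?;
///
/// // During operation: non-blocking background save.
/// persistence.bgsave(&collections)?;
///
/// // On shutdown: synchronous save that blocks until the file is on disk.
/// persistence.save(&collections)?;
/// ```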
pub struct ForkBasedPersistence {
    #[allow(dead_code)]
    data_dir: PathBuf, // Stored for potential future use
    rdb_filename: PathBuf,
}

impl ForkBasedPersistence {
    pub fn new<P: AsRef<Path>>(data_dir: P) -> Self {
        let data_dir = data_dir.as_ref().to_path_buf();
        let rdb_filename = data_dir.join("dump.rdb");

        Self {
            data_dir,
            rdb_filename,
        }
    }

    /// Start a background save (BGSAVE) by forking a child process.
    /// Returns `Ok(false)` without forking if a save is already in progress.
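    ///
    /// # Example
    ///
    /// A sketch of the intended call site; the `collections` map comes from the
    /// caller and is not constructed here:
    ///
    /// ```ignore
    /// if persistence.bgsave(&collections)? {
    ///     eprintln!("background save started");
    /// } else {
    ///     eprintln!("a background save is already running; skipped");
    /// }
    /// ```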
    pub fn bgsave(&self, collections: &std::collections::HashMap<String, Arc<vectx_core::Collection>>) -> Result<bool> {
        // Check if already in progress
        if BGSAVE_IN_PROGRESS.swap(true, Ordering::Acquire) {
            return Ok(false); // Already in progress
        }

        // Fork the process
        match unsafe { fork() } {
            Ok(ForkResult::Parent { child, .. }) => {
                // Parent process - continue serving requests;
                // the child handles the snapshot.
                eprintln!("Background save started in child pid {}", child);

                // Spawn a thread to wait for the child and reset the flag
                std::thread::spawn(move || {
                    match waitpid(child, None) {
                        Ok(WaitStatus::Exited(_, code)) => {
                            if code == 0 {
                                eprintln!("Background save completed successfully");
                                LAST_SAVE_TIME.store(
                                    std::time::SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .unwrap()
                                        .as_secs(),
                                    Ordering::Release,
                                );
                            } else {
                                eprintln!("Background save failed with exit code {}", code);
                            }
                        }
                        Ok(status) => {
                            eprintln!("Background save child process: {:?}", status);
                        }
                        Err(e) => {
                            eprintln!("Error waiting for background save: {}", e);
                        }
                    }
                    BGSAVE_IN_PROGRESS.store(false, Ordering::Release);
                });

                Ok(true)
            }
            Ok(ForkResult::Child) => {
                // Child process - perform the snapshot. Errors must not propagate
                // back into the forked copy of the server, so they are reported
                // and turned into a nonzero exit code instead of returned.
                eprintln!("Child process: Starting snapshot...");

                let result = (|| -> Result<()> {
                    // Create snapshot data from the copy-on-write memory image
                    let snapshot = self.create_snapshot(collections)?;

                    // Write to a temporary file first (atomic write)
                    let temp_file = self.rdb_filename.with_extension("tmp");
                    let data = bincode::serialize(&snapshot)
                        .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?;
                    std::fs::write(&temp_file, &data)?;

                    // Atomic rename
                    std::fs::rename(&temp_file, &self.rdb_filename)?;

                    // Write the version marker so load_snapshot() treats this save as complete
                    let version_file = self.rdb_filename.with_extension("version");
                    std::fs::write(&version_file, format!("vectx:0.1.0:{}", data.len()))?;

                    eprintln!("Child process: Snapshot saved to {:?}", self.rdb_filename);
                    Ok(())
                })();

                // Exit the child; the parent reads the exit code via waitpid
                match result {
                    Ok(()) => process::exit(0),
                    Err(e) => {
                        eprintln!("Child process: Snapshot failed: {}", e);
                        process::exit(1);
                    }
                }
            }
            Err(e) => {
                BGSAVE_IN_PROGRESS.store(false, Ordering::Release);
                Err(anyhow::anyhow!("Failed to fork: {}", e))
            }
        }
    }

    /// Build a snapshot of all collections (used by both bgsave's child process and save())
    fn create_snapshot(
        &self,
        collections: &std::collections::HashMap<String, Arc<vectx_core::Collection>>,
    ) -> Result<SnapshotData> {
        let mut collection_snapshots = Vec::new();

        for (name, collection) in collections {
            let mut points = Vec::new();

            // Iterate through all points
            for point in collection.iter() {
                points.push(PointSnapshot {
                    id: point.id.to_string(),
                    vector: point.vector.as_slice().to_vec(),
                    payload: point.payload.clone(),
                });
            }

            collection_snapshots.push(CollectionSnapshot {
                name: name.clone(),
                config: CollectionConfigSnapshot {
                    vector_dim: collection.vector_dim(),
                    distance: format!("{:?}", collection.distance()),
                    use_hnsw: true,     // TODO: get from config
                    enable_bm25: false, // TODO: get from config
                },
                points,
            });
        }

        Ok(SnapshotData {
            collections: collection_snapshots,
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)?
                .as_secs(),
        })
    }

    /// Load snapshot from disk (on startup)
    /// Handles corruption gracefully:
    /// - Validates data integrity
    /// - Backs up corrupt files
    /// - Returns None instead of crashing
    /// - Logs detailed warnings
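    ///
    /// # Example
    ///
    /// A sketch of startup handling; `restore_collections` is a hypothetical
    /// caller-side helper, not part of this module:
    ///
    /// ```ignore
    /// match persistence.load_snapshot()? {
    ///     Some(snapshot) => restore_collections(snapshot),
    ///     None => eprintln!("[vectX] starting with an empty database"),
    /// }
    /// ```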
    pub fn load_snapshot(&self) -> Result<Option<SnapshotData>> {
        if !self.rdb_filename.exists() {
            eprintln!("[vectX] No snapshot file found, starting with empty database");
            return Ok(None);
        }

        // The snapshot file exists; check for the version/marker file that
        // indicates a complete save.
        let version_file = self.rdb_filename.with_extension("version");
        if !version_file.exists() {
            // Snapshot exists but no version file - incomplete save (crash recovery)
            eprintln!("[vectX] Warning: Snapshot file exists but version marker missing.");
            eprintln!("[vectX] This indicates an incomplete save. Starting fresh.");
            self.backup_and_remove_corrupt_file("incomplete");
            return Ok(None);
        }

        // Read the file
        let data = match std::fs::read(&self.rdb_filename) {
            Ok(d) => d,
            Err(e) => {
                eprintln!("[vectX] Warning: Could not read snapshot file: {}", e);
                eprintln!("[vectX] Starting with empty database.");
                return Ok(None);
            }
        };

        // Check minimum size (basic integrity check, as Redis does)
        if data.len() < 16 {
            eprintln!("[vectX] Warning: Snapshot file too small ({} bytes), likely corrupt", data.len());
            self.backup_and_remove_corrupt_file("too_small");
            return Ok(None);
        }

        // Deserialize with error handling (Redis-style: recover gracefully instead of crashing)
        match bincode::deserialize(&data) {
            Ok(snapshot) => {
                eprintln!("[vectX] Successfully loaded snapshot ({} bytes)", data.len());
                Ok(Some(snapshot))
            }
            Err(e) => {
                // Data is corrupted - back it up and start fresh
                eprintln!("[vectX] Warning: Snapshot data is corrupted: {}", e);
                eprintln!("[vectX] Starting with empty database.");
                self.backup_and_remove_corrupt_file("corrupt");
                Ok(None)
            }
        }
    }

    /// Back up the corrupt file and remove the original (Redis pattern: preserve for debugging)
    fn backup_and_remove_corrupt_file(&self, reason: &str) {
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);

        let backup_name = format!("dump.{}.{}.bak", reason, timestamp);
        let backup_path = self.rdb_filename.with_file_name(backup_name);

        if let Err(e) = std::fs::rename(&self.rdb_filename, &backup_path) {
            eprintln!("[vectX] Could not backup corrupt file: {}", e);
            // Try to delete it instead
            if let Err(del_err) = std::fs::remove_file(&self.rdb_filename) {
                eprintln!("[vectX] Could not delete corrupt file: {}", del_err);
            }
        } else {
            eprintln!("[vectX] Corrupt snapshot backed up to: {:?}", backup_path);
        }

        // Also remove the version file if it exists
        let version_file = self.rdb_filename.with_extension("version");
        let _ = std::fs::remove_file(&version_file);
    }

    /// Check if a background save is in progress
    pub fn is_bgsave_in_progress() -> bool {
        BGSAVE_IN_PROGRESS.load(Ordering::Acquire)
    }

    /// Get the time of the last successful background save (seconds since the Unix epoch)
    pub fn last_save_time() -> u64 {
        LAST_SAVE_TIME.load(Ordering::Acquire)
    }

    /// Force save (synchronous, blocks until complete)
    /// Uses the atomic rename pattern and a version marker for data integrity
    pub fn save(&self, collections: &std::collections::HashMap<String, Arc<vectx_core::Collection>>) -> Result<()> {
        let snapshot = self.create_snapshot(collections)?;
        let temp_file = self.rdb_filename.with_extension("tmp");
        let version_file = self.rdb_filename.with_extension("version");

        // Serialize data
        let data = bincode::serialize(&snapshot)
            .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?;

        // Write to a temp file first (atomic write pattern from Redis)
        std::fs::write(&temp_file, &data)?;

        // Atomic rename (Redis pattern - prevents partial writes)
        std::fs::rename(&temp_file, &self.rdb_filename)?;

        // Write the version marker (indicates a complete save)
        let version_data = format!("vectx:0.1.0:{}", data.len());
        std::fs::write(&version_file, version_data)?;

        eprintln!("[vectX] Snapshot saved ({} bytes)", data.len());
        Ok(())
    }
}
286