//! Fork-based snapshot persistence, modeled on Redis' BGSAVE: the parent
//! process forks a child that serializes every collection to `dump.rdb`
//! while the parent keeps serving requests.

use anyhow::Result;
use nix::sys::wait::{waitpid, WaitStatus};
use nix::unistd::{fork, ForkResult};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::process;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;

// Process-wide state: whether a background save is currently running, and the
// Unix timestamp of the last successful save.
static BGSAVE_IN_PROGRESS: AtomicBool = AtomicBool::new(false);
static LAST_SAVE_TIME: AtomicU64 = AtomicU64::new(0);

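/// A complete point-in-time snapshot of the database: every collection, its
/// configuration, and all of its points, plus the time the snapshot was taken.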
#[derive(Debug, Serialize, Deserialize)]
pub struct SnapshotData {
    pub collections: Vec<CollectionSnapshot>,
    pub timestamp: u64,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CollectionSnapshot {
    pub name: String,
    pub config: CollectionConfigSnapshot,
    pub points: Vec<PointSnapshot>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CollectionConfigSnapshot {
    pub vector_dim: usize,
    pub distance: String,
    pub use_hnsw: bool,
    pub enable_bm25: bool,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct PointSnapshot {
    pub id: String,
    pub vector: Vec<f32>,
    pub payload: Option<serde_json::Value>,
}

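/// Snapshot persistence rooted at `data_dir`: snapshots are written to
/// `dump.rdb` with a companion `dump.version` marker, either synchronously
/// (`save`) or from a forked child process (`bgsave`).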
pub struct ForkBasedPersistence {
    #[allow(dead_code)]
    data_dir: PathBuf,
    rdb_filename: PathBuf,
}

impl ForkBasedPersistence {
    pub fn new<P: AsRef<Path>>(data_dir: P) -> Self {
        let data_dir = data_dir.as_ref().to_path_buf();
        let rdb_filename = data_dir.join("dump.rdb");

        Self {
            data_dir,
            rdb_filename,
        }
    }

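    /// Start a background save by forking a child process that serializes all
    /// collections to `dump.rdb`. Returns `Ok(true)` if a child was forked and
    /// `Ok(false)` if another background save is still in progress.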
    pub fn bgsave(&self, collections: &std::collections::HashMap<String, Arc<vectx_core::Collection>>) -> Result<bool> {
        // Allow only one background save at a time.
        if BGSAVE_IN_PROGRESS.swap(true, Ordering::Acquire) {
            return Ok(false);
        }

        match unsafe { fork() } {
            Ok(ForkResult::Parent { child, .. }) => {
                eprintln!("Background save started by pid {}", child);

                // Reap the child from a helper thread so the parent never
                // blocks, then clear the in-progress flag.
                std::thread::spawn(move || {
                    match waitpid(child, None) {
                        Ok(WaitStatus::Exited(_, code)) => {
                            if code == 0 {
                                eprintln!("Background save completed successfully");
                                LAST_SAVE_TIME.store(
                                    std::time::SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .unwrap()
                                        .as_secs(),
                                    Ordering::Release,
                                );
                            } else {
                                eprintln!("Background save failed with exit code {}", code);
                            }
                        }
                        Ok(status) => {
                            eprintln!("Background save child process: {:?}", status);
                        }
                        Err(e) => {
                            eprintln!("Error waiting for background save: {}", e);
                        }
                    }
                    BGSAVE_IN_PROGRESS.store(false, Ordering::Release);
                });

                Ok(true)
            }
            Ok(ForkResult::Child) => {
                eprintln!("Child process: Starting snapshot...");

                // The child must never return into the server's normal control
                // flow, so handle errors locally and exit instead of using `?`.
                let result = (|| -> Result<()> {
                    let snapshot = self.create_snapshot(collections)?;

                    let temp_file = self.rdb_filename.with_extension("tmp");
                    let data = bincode::serialize(&snapshot)
                        .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?;
                    std::fs::write(&temp_file, &data)?;

                    std::fs::rename(&temp_file, &self.rdb_filename)?;

                    // Write the version marker so load_snapshot() does not
                    // treat this snapshot as an incomplete save.
                    let version_file = self.rdb_filename.with_extension("version");
                    std::fs::write(&version_file, format!("vectx:0.1.0:{}", data.len()))?;

                    eprintln!("Child process: Snapshot saved to {:?}", self.rdb_filename);
                    Ok(())
                })();

                match result {
                    Ok(()) => process::exit(0),
                    Err(e) => {
                        eprintln!("Child process: Snapshot failed: {}", e);
                        process::exit(1)
                    }
                }
            }
            Err(e) => {
                BGSAVE_IN_PROGRESS.store(false, Ordering::Release);
                Err(anyhow::anyhow!("Failed to fork: {}", e))
            }
        }
    }

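    /// Build an in-memory `SnapshotData` from every collection and its points.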
    fn create_snapshot(
        &self,
        collections: &std::collections::HashMap<String, Arc<vectx_core::Collection>>,
    ) -> Result<SnapshotData> {
        let mut collection_snapshots = Vec::new();

        for (name, collection) in collections {
            let mut points = Vec::new();

            for point in collection.iter() {
                points.push(PointSnapshot {
                    id: point.id.to_string(),
                    vector: point.vector.as_slice().to_vec(),
                    payload: point.payload.clone(),
                });
            }

            collection_snapshots.push(CollectionSnapshot {
                name: name.clone(),
                config: CollectionConfigSnapshot {
                    vector_dim: collection.vector_dim(),
                    distance: format!("{:?}", collection.distance()),
                    // Fixed defaults: these flags are not read back from the collection.
                    use_hnsw: true,
                    enable_bm25: false,
                },
                points,
            });
        }

        Ok(SnapshotData {
            collections: collection_snapshots,
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)?
                .as_secs(),
        })
    }

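    /// Load the on-disk snapshot, if any. Missing, incomplete, or corrupt
    /// files are backed up (or removed) and `Ok(None)` is returned so the
    /// server can start with an empty database.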
    pub fn load_snapshot(&self) -> Result<Option<SnapshotData>> {
        if !self.rdb_filename.exists() {
            eprintln!("[vectX] No snapshot file found, starting with empty database");
            return Ok(None);
        }

        let version_file = self.rdb_filename.with_extension("version");
        if !version_file.exists() {
            eprintln!("[vectX] Warning: Snapshot file exists but version marker missing.");
            eprintln!("[vectX] This indicates an incomplete save. Starting fresh.");
            self.backup_and_remove_corrupt_file("incomplete");
            return Ok(None);
        }

        let data = match std::fs::read(&self.rdb_filename) {
            Ok(d) => d,
            Err(e) => {
                eprintln!("[vectX] Warning: Could not read snapshot file: {}", e);
                eprintln!("[vectX] Starting with empty database.");
                return Ok(None);
            }
        };

        if data.len() < 16 {
            eprintln!("[vectX] Warning: Snapshot file too small ({} bytes), likely corrupt", data.len());
            self.backup_and_remove_corrupt_file("too_small");
            return Ok(None);
        }

        match bincode::deserialize(&data) {
            Ok(snapshot) => {
                eprintln!("[vectX] Successfully loaded snapshot ({} bytes)", data.len());
                Ok(Some(snapshot))
            }
            Err(e) => {
                eprintln!("[vectX] Warning: Snapshot data is corrupted: {}", e);
                eprintln!("[vectX] Starting with empty database.");
                self.backup_and_remove_corrupt_file("corrupt");
                Ok(None)
            }
        }
    }

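    /// Move a damaged snapshot aside as `dump.<reason>.<timestamp>.bak`
    /// (falling back to deleting it) and remove its version marker.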
    fn backup_and_remove_corrupt_file(&self, reason: &str) {
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);

        let backup_name = format!("dump.{}.{}.bak", reason, timestamp);
        let backup_path = self.rdb_filename.with_file_name(backup_name);

        if let Err(e) = std::fs::rename(&self.rdb_filename, &backup_path) {
            eprintln!("[vectX] Could not backup corrupt file: {}", e);
            if let Err(del_err) = std::fs::remove_file(&self.rdb_filename) {
                eprintln!("[vectX] Could not delete corrupt file: {}", del_err);
            }
        } else {
            eprintln!("[vectX] Corrupt snapshot backed up to: {:?}", backup_path);
        }

        let version_file = self.rdb_filename.with_extension("version");
        let _ = std::fs::remove_file(&version_file);
    }

    pub fn is_bgsave_in_progress() -> bool {
        BGSAVE_IN_PROGRESS.load(Ordering::Acquire)
    }

    pub fn last_save_time() -> u64 {
        LAST_SAVE_TIME.load(Ordering::Acquire)
    }

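    /// Synchronous (foreground) save: serialize, write to a temp file, rename
    /// it into place, then write the version marker.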
    pub fn save(&self, collections: &std::collections::HashMap<String, Arc<vectx_core::Collection>>) -> Result<()> {
        let snapshot = self.create_snapshot(collections)?;
        let temp_file = self.rdb_filename.with_extension("tmp");
        let version_file = self.rdb_filename.with_extension("version");

        let data = bincode::serialize(&snapshot)
            .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?;

        std::fs::write(&temp_file, &data)?;

        std::fs::rename(&temp_file, &self.rdb_filename)?;

        let version_data = format!("vectx:0.1.0:{}", data.len());
        std::fs::write(&version_file, version_data)?;

        eprintln!("[vectX] Snapshot saved ({} bytes)", data.len());
        Ok(())
    }
}
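
// A minimal usage sketch (collection setup and error handling elided; the
// `collections` map below stands in for the server's real collection registry):
//
//     let persistence = ForkBasedPersistence::new("./data");
//     if let Some(snapshot) = persistence.load_snapshot()? {
//         // rebuild in-memory collections from `snapshot.collections`
//     }
//     // later, on a schedule or after enough writes:
//     persistence.bgsave(&collections)?;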