1use std::collections::HashMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Mutex};
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use alopex_core::lsm::checkpoint::load_checkpoint_meta;
8use alopex_core::lsm::sstable::SSTableReader;
9use alopex_core::lsm::wal::WalReader;
10use alopex_core::lsm::LsmKVConfig;
11use crc32fast::Hasher;
12use serde::{Deserialize, Serialize};
13use tokio::task;
14use uuid::Uuid;
15
16use crate::error::{Result, ServerError};
17use crate::ops::state::{LifecycleStateManager, OperationState, Progress};
18
/// File name of the JSON manifest stored at the root of a snapshot directory.
const SNAPSHOT_MANIFEST_NAME: &str = "snapshot.manifest";
/// Manifest format version written by this module; a manifest carrying any
/// other version is rejected when it is read back.
const SNAPSHOT_MANIFEST_VERSION: u32 = 1;
21
/// Identifier for a single backup run.
///
/// Returned by `BackupCoordinator::start_backup` and used as the lookup key
/// for `status` and `location`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BackupHandle {
    /// Random (v4) UUID assigned when the backup is started.
    pub id: Uuid,
}
26
/// Static facts about a backup that are known as soon as it is started.
#[derive(Debug, Clone)]
pub struct BackupMetadata {
    /// Handle identifying the backup this metadata belongs to.
    pub handle: BackupHandle,
    /// Destination directory the backup is written into.
    pub location: PathBuf,
}
32
/// History entry for one backup: its metadata plus its latest known state.
#[derive(Debug, Clone)]
struct BackupRecord {
    /// Handle and destination recorded when the backup was started.
    metadata: BackupMetadata,
    /// Most recent operation state (running, then completed or failed).
    state: OperationState,
}
38
/// Mutable bookkeeping shared between the coordinator and its background task.
#[derive(Debug, Default)]
struct BackupRuntime {
    /// Handle of the backup currently in progress; `None` when idle.
    /// Set when a backup starts and cleared once its background task finishes.
    active: Option<BackupHandle>,
    /// Every backup started through this coordinator, keyed by handle.
    history: HashMap<BackupHandle, BackupRecord>,
    /// Destination of the most recently started backup. It is recorded at
    /// start time, so it is set even if that backup later fails.
    last_location: Option<PathBuf>,
}
45
/// Inventory of the files in a snapshot, persisted as JSON in the snapshot
/// directory under `SNAPSHOT_MANIFEST_NAME`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SnapshotManifest {
    /// Manifest format version; compared against `SNAPSHOT_MANIFEST_VERSION`
    /// when the manifest is read back.
    version: u32,
    /// One entry per regular file captured in the snapshot.
    entries: Vec<SnapshotEntry>,
}
51
/// Size and checksum of a single file recorded in a `SnapshotManifest`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SnapshotEntry {
    /// Path relative to the snapshot root, written with `/` separators.
    path: String,
    /// File length in bytes at the time the manifest was built.
    size: u64,
    /// CRC-32 of the file contents at the time the manifest was built.
    crc32: u32,
}
58
/// Starts backups of a data directory and tracks their progress.
///
/// Cloning is cheap: clones share the same runtime bookkeeping, lifecycle
/// state manager and checkpoint callback through `Arc`s.
#[derive(Clone)]
pub struct BackupCoordinator {
    /// Directory being backed up; backups are written beneath
    /// `<data_dir>/.lifecycle/backup`.
    data_dir: PathBuf,
    /// Lifecycle state manager that receives backup state updates.
    state: Arc<LifecycleStateManager>,
    /// Callback invoked by the backup job before any files are copied.
    checkpoint: Arc<dyn Fn() -> Result<()> + Send + Sync>,
    /// Active-backup marker and history, shared with the background task.
    runtime: Arc<Mutex<BackupRuntime>>,
}
66
67impl BackupCoordinator {
68 pub fn new(
69 data_dir: PathBuf,
70 state: Arc<LifecycleStateManager>,
71 checkpoint: Arc<dyn Fn() -> Result<()> + Send + Sync>,
72 ) -> Self {
73 Self {
74 data_dir,
75 state,
76 checkpoint,
77 runtime: Arc::new(Mutex::new(BackupRuntime::default())),
78 }
79 }
80
81 pub async fn start_backup(&self) -> Result<BackupHandle> {
82 let mut runtime = self.runtime.lock().expect("backup runtime lock poisoned");
83 if runtime.active.is_some() {
84 return Err(ServerError::Conflict("backup already running".to_string()));
85 }
86
87 let handle = BackupHandle { id: Uuid::new_v4() };
88 let dest = backup_destination(&self.data_dir);
89 fs::create_dir_all(&dest)?;
90 let metadata = BackupMetadata {
91 handle: handle.clone(),
92 location: dest.clone(),
93 };
94 let mut running = OperationState::running();
95 running.set_progress(Progress::percent(0))?;
96 runtime.active = Some(handle.clone());
97 runtime.last_location = Some(dest.clone());
98 runtime.history.insert(
99 handle.clone(),
100 BackupRecord {
101 metadata: metadata.clone(),
102 state: running.clone(),
103 },
104 );
105 self.state.set_backup_state(running);
106
107 let state = self.state.clone();
108 let data_dir = self.data_dir.clone();
109 let runtime = self.runtime.clone();
110 let checkpoint = self.checkpoint.clone();
111 let handle_for_task = handle.clone();
112 task::spawn(async move {
113 let result = task::spawn_blocking(move || run_backup(&data_dir, &dest, checkpoint))
114 .await
115 .map_err(|err| ServerError::Internal(err.to_string()))
116 .and_then(|res| res);
117
118 let mut runtime = runtime.lock().expect("backup runtime lock poisoned");
119 runtime.active = None;
120
121 match result {
122 Ok(()) => {
123 let completed = OperationState::completed(Some(Progress::percent(100)))
124 .unwrap_or_else(|err| OperationState::failed(err.to_string()));
125 if let Some(record) = runtime.history.get_mut(&handle_for_task) {
126 record.state = completed.clone();
127 }
128 state.set_backup_state(completed);
129 }
130 Err(err) => {
131 let failed = OperationState::failed(err.to_string());
132 if let Some(record) = runtime.history.get_mut(&handle_for_task) {
133 record.state = failed.clone();
134 }
135 state.set_backup_state(failed);
136 }
137 }
138 });
139
140 Ok(handle)
141 }
142
143 pub fn status(&self, handle: &BackupHandle) -> Result<OperationState> {
144 let runtime = self.runtime.lock().expect("backup runtime lock poisoned");
145 runtime
146 .history
147 .get(handle)
148 .map(|record| record.state.clone())
149 .ok_or_else(|| ServerError::NotFound("backup handle not found".to_string()))
150 }
151
152 pub fn location(&self, handle: &BackupHandle) -> Result<PathBuf> {
153 let runtime = self.runtime.lock().expect("backup runtime lock poisoned");
154 runtime
155 .history
156 .get(handle)
157 .map(|record| record.metadata.location.clone())
158 .ok_or_else(|| ServerError::NotFound("backup handle not found".to_string()))
159 }
160
161 pub fn latest_location(&self) -> Option<PathBuf> {
162 let runtime = self.runtime.lock().expect("backup runtime lock poisoned");
163 runtime.last_location.clone()
164 }
165}
166
167fn run_backup(
168 data_dir: &Path,
169 dest: &Path,
170 checkpoint: Arc<dyn Fn() -> Result<()> + Send + Sync>,
171) -> Result<()> {
172 if !data_dir.exists() {
173 return Err(ServerError::NotFound(format!(
174 "data directory does not exist: {}",
175 data_dir.display()
176 )));
177 }
178 if !data_dir.is_dir() {
179 return Err(ServerError::BadRequest(format!(
180 "data directory is not a directory: {}",
181 data_dir.display()
182 )));
183 }
184
185 checkpoint().map_err(|err| ServerError::Internal(format!("checkpoint failed: {err}")))?;
186 fs::create_dir_all(dest)?;
187 let manifest = build_snapshot_manifest(data_dir)?;
188 copy_dir_filtered(data_dir, dest)?;
189 write_snapshot_manifest(dest, &manifest)?;
190 verify_snapshot(dest)?;
191 write_latest_marker(&backup_root(data_dir), dest)?;
192 Ok(())
193}
194
195fn backup_destination(data_dir: &Path) -> PathBuf {
196 backup_root(data_dir).join(timestamp_dir())
197}
198
/// Returns `<data_dir>/.lifecycle/backup`, the directory holding all backups.
fn backup_root(data_dir: &Path) -> PathBuf {
    let mut root = data_dir.to_path_buf();
    root.push(".lifecycle");
    root.push("backup");
    root
}
202
/// Returns `ts-<seconds>` where `<seconds>` is the current Unix time.
///
/// A system clock set before the Unix epoch yields `ts-0`.
fn timestamp_dir() -> String {
    let seconds = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());
    format!("ts-{seconds}")
}
210
211fn write_latest_marker(root: &Path, latest: &Path) -> Result<()> {
212 fs::create_dir_all(root)?;
213 let marker = root.join("latest");
214 fs::write(marker, latest.display().to_string().as_bytes())?;
215 Ok(())
216}
217
218pub(crate) fn copy_dir_filtered(src: &Path, dest: &Path) -> Result<()> {
219 for entry in fs::read_dir(src)? {
220 let entry = entry?;
221 let file_type = entry.file_type()?;
222 let name = entry.file_name();
223 if name == ".lifecycle" {
224 continue;
225 }
226 let dest_path = dest.join(name);
227 if file_type.is_dir() {
228 fs::create_dir_all(&dest_path)?;
229 copy_dir_filtered(&entry.path(), &dest_path)?;
230 } else {
231 fs::copy(entry.path(), &dest_path)?;
232 }
233 }
234 Ok(())
235}
236
237fn verify_snapshot(dest: &Path) -> Result<()> {
238 let manifest = read_snapshot_manifest(dest)?;
239 validate_manifest(dest, &manifest)?;
240
241 let checkpoint_path = dest.join("checkpoint.meta");
242 let meta = load_checkpoint_meta(&checkpoint_path)?;
243 if meta.is_none() {
244 return Err(ServerError::Internal(
245 "checkpoint metadata missing or corrupted".to_string(),
246 ));
247 }
248 let wal_path = dest.join("lsm.wal");
249 if !wal_path.exists() {
250 return Err(ServerError::Internal(
251 "snapshot missing lsm.wal".to_string(),
252 ));
253 }
254 let sst_dir = dest.join("sst");
255 if !sst_dir.exists() {
256 return Err(ServerError::Internal(
257 "snapshot missing sst directory".to_string(),
258 ));
259 }
260
261 let wal_config = LsmKVConfig::default().wal;
262 let mut reader = WalReader::open(&wal_path, wal_config)?;
263 let _ = reader.replay()?;
264
265 for entry in fs::read_dir(&sst_dir)? {
266 let entry = entry?;
267 let path = entry.path();
268 if path.is_file()
269 && path
270 .extension()
271 .and_then(|ext| ext.to_str())
272 .is_some_and(|ext| ext.eq_ignore_ascii_case("sst"))
273 {
274 let _ = SSTableReader::open(&path)?;
275 }
276 }
277 Ok(())
278}
279
280fn build_snapshot_manifest(source: &Path) -> Result<SnapshotManifest> {
281 let mut entries = Vec::new();
282 collect_manifest_entries(source, source, &mut entries, true)?;
283 Ok(SnapshotManifest {
284 version: SNAPSHOT_MANIFEST_VERSION,
285 entries,
286 })
287}
288
289fn write_snapshot_manifest(dest: &Path, manifest: &SnapshotManifest) -> Result<()> {
290 let manifest_path = dest.join(SNAPSHOT_MANIFEST_NAME);
291 let payload = serde_json::to_vec_pretty(&manifest)
292 .map_err(|err| ServerError::Internal(format!("manifest encode failed: {err}")))?;
293 fs::write(&manifest_path, payload)?;
294 Ok(())
295}
296
297fn read_snapshot_manifest(dest: &Path) -> Result<SnapshotManifest> {
298 let manifest_path = dest.join(SNAPSHOT_MANIFEST_NAME);
299 let payload = fs::read(&manifest_path)?;
300 let manifest: SnapshotManifest = serde_json::from_slice(&payload)
301 .map_err(|err| ServerError::Internal(format!("manifest decode failed: {err}")))?;
302 if manifest.version != SNAPSHOT_MANIFEST_VERSION {
303 return Err(ServerError::Internal(format!(
304 "unsupported manifest version: {}",
305 manifest.version
306 )));
307 }
308 Ok(manifest)
309}
310
311fn validate_manifest(dest: &Path, manifest: &SnapshotManifest) -> Result<()> {
312 for entry in &manifest.entries {
313 let path = dest.join(&entry.path);
314 let metadata = path.metadata().map_err(|err| {
315 ServerError::Internal(format!("snapshot entry missing {}: {err}", entry.path))
316 })?;
317 if metadata.len() != entry.size {
318 return Err(ServerError::Internal(format!(
319 "snapshot entry size mismatch {}",
320 entry.path
321 )));
322 }
323 let crc = crc32_file(&path)?;
324 if crc != entry.crc32 {
325 return Err(ServerError::Internal(format!(
326 "snapshot entry crc mismatch {}",
327 entry.path
328 )));
329 }
330 }
331 Ok(())
332}
333
334fn collect_manifest_entries(
335 root: &Path,
336 current: &Path,
337 entries: &mut Vec<SnapshotEntry>,
338 skip_lifecycle: bool,
339) -> Result<()> {
340 for entry in fs::read_dir(current)? {
341 let entry = entry?;
342 let path = entry.path();
343 let name = entry.file_name();
344 if skip_lifecycle && name == ".lifecycle" {
345 continue;
346 }
347 if name == SNAPSHOT_MANIFEST_NAME {
348 continue;
349 }
350 let metadata = entry.metadata()?;
351 if metadata.is_dir() {
352 collect_manifest_entries(root, &path, entries, skip_lifecycle)?;
353 } else if metadata.is_file() {
354 let relative = path
355 .strip_prefix(root)
356 .map_err(|err| ServerError::Internal(format!("manifest path error: {err}")))?;
357 let crc32 = crc32_file(&path)?;
358 entries.push(SnapshotEntry {
359 path: relative.to_string_lossy().replace('\\', "/"),
360 size: metadata.len(),
361 crc32,
362 });
363 }
364 }
365 Ok(())
366}
367
368fn crc32_file(path: &Path) -> Result<u32> {
369 let mut file = fs::File::open(path)?;
370 let mut buf = [0u8; 8192];
371 let mut hasher = Hasher::new();
372 loop {
373 let read = std::io::Read::read(&mut file, &mut buf)?;
374 if read == 0 {
375 break;
376 }
377 hasher.update(&buf[..read]);
378 }
379 Ok(hasher.finalize())
380}