Skip to main content

alopex_server/ops/
backup.rs

1use std::collections::HashMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Mutex};
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use alopex_core::lsm::checkpoint::load_checkpoint_meta;
8use alopex_core::lsm::sstable::SSTableReader;
9use alopex_core::lsm::wal::WalReader;
10use alopex_core::lsm::LsmKVConfig;
11use crc32fast::Hasher;
12use serde::{Deserialize, Serialize};
13use tokio::task;
14use uuid::Uuid;
15
16use crate::error::{Result, ServerError};
17use crate::ops::state::{LifecycleStateManager, OperationState, Progress};
18
/// File name of the JSON manifest written into the root of each snapshot directory.
const SNAPSHOT_MANIFEST_NAME: &str = "snapshot.manifest";
/// Manifest format version; `read_snapshot_manifest` rejects any other value.
const SNAPSHOT_MANIFEST_VERSION: u32 = 1;
21
/// Identifier returned by `BackupCoordinator::start_backup` and used to query that
/// backup's status and location.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BackupHandle {
    /// Random (v4) UUID assigned when the backup is started.
    pub id: Uuid,
}

/// Facts about a backup that are fixed as soon as it is started.
#[derive(Debug, Clone)]
pub struct BackupMetadata {
    /// Handle the backup was registered under.
    pub handle: BackupHandle,
    /// Snapshot directory the backup writes into.
    pub location: PathBuf,
}
32
/// Per-backup bookkeeping stored in `BackupRuntime::history`.
#[derive(Debug, Clone)]
struct BackupRecord {
    metadata: BackupMetadata,
    /// Most recent state; overwritten by the worker task when the backup finishes.
    state: OperationState,
}

/// Mutable coordinator state, shared by all clones behind `Arc<Mutex<_>>`.
#[derive(Debug, Default)]
struct BackupRuntime {
    /// Handle of the backup currently running, if any; `start_backup` refuses to
    /// start another while this is `Some`.
    active: Option<BackupHandle>,
    /// Every backup started through this coordinator, keyed by handle. Nothing in this
    /// module removes entries.
    history: HashMap<BackupHandle, BackupRecord>,
    /// Snapshot directory reported by `BackupCoordinator::latest_location`.
    last_location: Option<PathBuf>,
}
45
/// Listing of every file captured in a snapshot, stored as JSON in
/// `SNAPSHOT_MANIFEST_NAME` and re-checked by `verify_snapshot`.
/// Field order here is the field order of the serialized JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SnapshotManifest {
    /// Format version; must equal `SNAPSHOT_MANIFEST_VERSION` to be read back.
    version: u32,
    entries: Vec<SnapshotEntry>,
}

/// One regular file captured in a snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SnapshotEntry {
    /// Path relative to the snapshot root, with `\` rewritten to `/`.
    path: String,
    /// File length in bytes when the manifest was built.
    size: u64,
    /// CRC-32 of the file contents (computed with `crc32fast`).
    crc32: u32,
}
58
/// Runs at most one background backup at a time and records the outcome of each.
///
/// `state`, `checkpoint` and `runtime` are reference-counted, so clones of a
/// coordinator share the same running backup and history.
#[derive(Clone)]
pub struct BackupCoordinator {
    /// Live data directory that gets snapshotted.
    data_dir: PathBuf,
    /// Receives the running/completed/failed state of each backup.
    state: Arc<LifecycleStateManager>,
    /// Invoked before any file is copied; presumably flushes pending writes to disk —
    /// confirm with whoever constructs the coordinator.
    checkpoint: Arc<dyn Fn() -> Result<()> + Send + Sync>,
    runtime: Arc<Mutex<BackupRuntime>>,
}
66
67impl BackupCoordinator {
68    pub fn new(
69        data_dir: PathBuf,
70        state: Arc<LifecycleStateManager>,
71        checkpoint: Arc<dyn Fn() -> Result<()> + Send + Sync>,
72    ) -> Self {
73        Self {
74            data_dir,
75            state,
76            checkpoint,
77            runtime: Arc::new(Mutex::new(BackupRuntime::default())),
78        }
79    }
80
81    pub async fn start_backup(&self) -> Result<BackupHandle> {
82        let mut runtime = self.runtime.lock().expect("backup runtime lock poisoned");
83        if runtime.active.is_some() {
84            return Err(ServerError::Conflict("backup already running".to_string()));
85        }
86
87        let handle = BackupHandle { id: Uuid::new_v4() };
88        let dest = backup_destination(&self.data_dir);
89        fs::create_dir_all(&dest)?;
90        let metadata = BackupMetadata {
91            handle: handle.clone(),
92            location: dest.clone(),
93        };
94        let mut running = OperationState::running();
95        running.set_progress(Progress::percent(0))?;
96        runtime.active = Some(handle.clone());
97        runtime.last_location = Some(dest.clone());
98        runtime.history.insert(
99            handle.clone(),
100            BackupRecord {
101                metadata: metadata.clone(),
102                state: running.clone(),
103            },
104        );
105        self.state.set_backup_state(running);
106
107        let state = self.state.clone();
108        let data_dir = self.data_dir.clone();
109        let runtime = self.runtime.clone();
110        let checkpoint = self.checkpoint.clone();
111        let handle_for_task = handle.clone();
112        task::spawn(async move {
113            let result = task::spawn_blocking(move || run_backup(&data_dir, &dest, checkpoint))
114                .await
115                .map_err(|err| ServerError::Internal(err.to_string()))
116                .and_then(|res| res);
117
118            let mut runtime = runtime.lock().expect("backup runtime lock poisoned");
119            runtime.active = None;
120
121            match result {
122                Ok(()) => {
123                    let completed = OperationState::completed(Some(Progress::percent(100)))
124                        .unwrap_or_else(|err| OperationState::failed(err.to_string()));
125                    if let Some(record) = runtime.history.get_mut(&handle_for_task) {
126                        record.state = completed.clone();
127                    }
128                    state.set_backup_state(completed);
129                }
130                Err(err) => {
131                    let failed = OperationState::failed(err.to_string());
132                    if let Some(record) = runtime.history.get_mut(&handle_for_task) {
133                        record.state = failed.clone();
134                    }
135                    state.set_backup_state(failed);
136                }
137            }
138        });
139
140        Ok(handle)
141    }
142
143    pub fn status(&self, handle: &BackupHandle) -> Result<OperationState> {
144        let runtime = self.runtime.lock().expect("backup runtime lock poisoned");
145        runtime
146            .history
147            .get(handle)
148            .map(|record| record.state.clone())
149            .ok_or_else(|| ServerError::NotFound("backup handle not found".to_string()))
150    }
151
152    pub fn location(&self, handle: &BackupHandle) -> Result<PathBuf> {
153        let runtime = self.runtime.lock().expect("backup runtime lock poisoned");
154        runtime
155            .history
156            .get(handle)
157            .map(|record| record.metadata.location.clone())
158            .ok_or_else(|| ServerError::NotFound("backup handle not found".to_string()))
159    }
160
161    pub fn latest_location(&self) -> Option<PathBuf> {
162        let runtime = self.runtime.lock().expect("backup runtime lock poisoned");
163        runtime.last_location.clone()
164    }
165}
166
167fn run_backup(
168    data_dir: &Path,
169    dest: &Path,
170    checkpoint: Arc<dyn Fn() -> Result<()> + Send + Sync>,
171) -> Result<()> {
172    if !data_dir.exists() {
173        return Err(ServerError::NotFound(format!(
174            "data directory does not exist: {}",
175            data_dir.display()
176        )));
177    }
178    if !data_dir.is_dir() {
179        return Err(ServerError::BadRequest(format!(
180            "data directory is not a directory: {}",
181            data_dir.display()
182        )));
183    }
184
185    checkpoint().map_err(|err| ServerError::Internal(format!("checkpoint failed: {err}")))?;
186    fs::create_dir_all(dest)?;
187    let manifest = build_snapshot_manifest(data_dir)?;
188    copy_dir_filtered(data_dir, dest)?;
189    write_snapshot_manifest(dest, &manifest)?;
190    verify_snapshot(dest)?;
191    write_latest_marker(&backup_root(data_dir), dest)?;
192    Ok(())
193}
194
195fn backup_destination(data_dir: &Path) -> PathBuf {
196    backup_root(data_dir).join(timestamp_dir())
197}
198
/// Directory inside `data_dir` that holds every snapshot directory and the `latest`
/// marker: `<data_dir>/.lifecycle/backup`.
fn backup_root(data_dir: &Path) -> PathBuf {
    let mut root = data_dir.to_path_buf();
    root.push(".lifecycle");
    root.push("backup");
    root
}
202
/// Directory name for a new snapshot: `ts-` followed by the current Unix time in whole
/// seconds, or `ts-0` if the system clock reads earlier than the Unix epoch.
fn timestamp_dir() -> String {
    let seconds = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    };
    format!("ts-{seconds}")
}
210
211fn write_latest_marker(root: &Path, latest: &Path) -> Result<()> {
212    fs::create_dir_all(root)?;
213    let marker = root.join("latest");
214    fs::write(marker, latest.display().to_string().as_bytes())?;
215    Ok(())
216}
217
218pub(crate) fn copy_dir_filtered(src: &Path, dest: &Path) -> Result<()> {
219    for entry in fs::read_dir(src)? {
220        let entry = entry?;
221        let file_type = entry.file_type()?;
222        let name = entry.file_name();
223        if name == ".lifecycle" {
224            continue;
225        }
226        let dest_path = dest.join(name);
227        if file_type.is_dir() {
228            fs::create_dir_all(&dest_path)?;
229            copy_dir_filtered(&entry.path(), &dest_path)?;
230        } else {
231            fs::copy(entry.path(), &dest_path)?;
232        }
233    }
234    Ok(())
235}
236
237fn verify_snapshot(dest: &Path) -> Result<()> {
238    let manifest = read_snapshot_manifest(dest)?;
239    validate_manifest(dest, &manifest)?;
240
241    let checkpoint_path = dest.join("checkpoint.meta");
242    let meta = load_checkpoint_meta(&checkpoint_path)?;
243    if meta.is_none() {
244        return Err(ServerError::Internal(
245            "checkpoint metadata missing or corrupted".to_string(),
246        ));
247    }
248    let wal_path = dest.join("lsm.wal");
249    if !wal_path.exists() {
250        return Err(ServerError::Internal(
251            "snapshot missing lsm.wal".to_string(),
252        ));
253    }
254    let sst_dir = dest.join("sst");
255    if !sst_dir.exists() {
256        return Err(ServerError::Internal(
257            "snapshot missing sst directory".to_string(),
258        ));
259    }
260
261    let wal_config = LsmKVConfig::default().wal;
262    let mut reader = WalReader::open(&wal_path, wal_config)?;
263    let _ = reader.replay()?;
264
265    for entry in fs::read_dir(&sst_dir)? {
266        let entry = entry?;
267        let path = entry.path();
268        if path.is_file()
269            && path
270                .extension()
271                .and_then(|ext| ext.to_str())
272                .is_some_and(|ext| ext.eq_ignore_ascii_case("sst"))
273        {
274            let _ = SSTableReader::open(&path)?;
275        }
276    }
277    Ok(())
278}
279
280fn build_snapshot_manifest(source: &Path) -> Result<SnapshotManifest> {
281    let mut entries = Vec::new();
282    collect_manifest_entries(source, source, &mut entries, true)?;
283    Ok(SnapshotManifest {
284        version: SNAPSHOT_MANIFEST_VERSION,
285        entries,
286    })
287}
288
289fn write_snapshot_manifest(dest: &Path, manifest: &SnapshotManifest) -> Result<()> {
290    let manifest_path = dest.join(SNAPSHOT_MANIFEST_NAME);
291    let payload = serde_json::to_vec_pretty(&manifest)
292        .map_err(|err| ServerError::Internal(format!("manifest encode failed: {err}")))?;
293    fs::write(&manifest_path, payload)?;
294    Ok(())
295}
296
297fn read_snapshot_manifest(dest: &Path) -> Result<SnapshotManifest> {
298    let manifest_path = dest.join(SNAPSHOT_MANIFEST_NAME);
299    let payload = fs::read(&manifest_path)?;
300    let manifest: SnapshotManifest = serde_json::from_slice(&payload)
301        .map_err(|err| ServerError::Internal(format!("manifest decode failed: {err}")))?;
302    if manifest.version != SNAPSHOT_MANIFEST_VERSION {
303        return Err(ServerError::Internal(format!(
304            "unsupported manifest version: {}",
305            manifest.version
306        )));
307    }
308    Ok(manifest)
309}
310
311fn validate_manifest(dest: &Path, manifest: &SnapshotManifest) -> Result<()> {
312    for entry in &manifest.entries {
313        let path = dest.join(&entry.path);
314        let metadata = path.metadata().map_err(|err| {
315            ServerError::Internal(format!("snapshot entry missing {}: {err}", entry.path))
316        })?;
317        if metadata.len() != entry.size {
318            return Err(ServerError::Internal(format!(
319                "snapshot entry size mismatch {}",
320                entry.path
321            )));
322        }
323        let crc = crc32_file(&path)?;
324        if crc != entry.crc32 {
325            return Err(ServerError::Internal(format!(
326                "snapshot entry crc mismatch {}",
327                entry.path
328            )));
329        }
330    }
331    Ok(())
332}
333
334fn collect_manifest_entries(
335    root: &Path,
336    current: &Path,
337    entries: &mut Vec<SnapshotEntry>,
338    skip_lifecycle: bool,
339) -> Result<()> {
340    for entry in fs::read_dir(current)? {
341        let entry = entry?;
342        let path = entry.path();
343        let name = entry.file_name();
344        if skip_lifecycle && name == ".lifecycle" {
345            continue;
346        }
347        if name == SNAPSHOT_MANIFEST_NAME {
348            continue;
349        }
350        let metadata = entry.metadata()?;
351        if metadata.is_dir() {
352            collect_manifest_entries(root, &path, entries, skip_lifecycle)?;
353        } else if metadata.is_file() {
354            let relative = path
355                .strip_prefix(root)
356                .map_err(|err| ServerError::Internal(format!("manifest path error: {err}")))?;
357            let crc32 = crc32_file(&path)?;
358            entries.push(SnapshotEntry {
359                path: relative.to_string_lossy().replace('\\', "/"),
360                size: metadata.len(),
361                crc32,
362            });
363        }
364    }
365    Ok(())
366}
367
368fn crc32_file(path: &Path) -> Result<u32> {
369    let mut file = fs::File::open(path)?;
370    let mut buf = [0u8; 8192];
371    let mut hasher = Hasher::new();
372    loop {
373        let read = std::io::Read::read(&mut file, &mut buf)?;
374        if read == 0 {
375            break;
376        }
377        hasher.update(&buf[..read]);
378    }
379    Ok(hasher.finalize())
380}