Skip to main content

// alopex_server/ops/restore.rs

1use std::collections::HashMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Mutex};
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use serde::{Deserialize, Serialize};
8use tokio::task;
9use uuid::Uuid;
10
11use crate::error::{Result, ServerError};
12use crate::ops::backup::copy_dir_filtered;
13use crate::ops::state::{LifecycleStateManager, Mode, OperationState, Progress, RestoreMetadata};
14
15#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
16pub struct RestoreHandle {
17    pub id: Uuid,
18}
19
/// Location of the snapshot a restore should read from.
#[derive(Clone, Debug)]
pub struct RestoreSource {
    /// Directory holding the snapshot contents to copy into the data directory.
    pub path: PathBuf,
}
24
25#[derive(Debug, Default)]
26struct RestoreRuntime {
27    active: Option<RestoreHandle>,
28    history: HashMap<RestoreHandle, RestoreRecord>,
29}
30
31#[derive(Debug, Clone)]
32pub struct RestoreCoordinator {
33    data_dir: PathBuf,
34    state: Arc<LifecycleStateManager>,
35    runtime: Arc<Mutex<RestoreRuntime>>,
36}
37
38impl RestoreCoordinator {
39    pub fn new(data_dir: PathBuf, state: Arc<LifecycleStateManager>) -> Self {
40        Self {
41            data_dir,
42            state,
43            runtime: Arc::new(Mutex::new(RestoreRuntime::default())),
44        }
45    }
46
47    pub async fn start_restore(&self, source: RestoreSource) -> Result<RestoreHandle> {
48        let mut runtime = self.runtime.lock().expect("restore runtime lock poisoned");
49        if runtime.active.is_some() {
50            return Err(ServerError::Conflict("restore already running".to_string()));
51        }
52
53        let handle = RestoreHandle { id: Uuid::new_v4() };
54        let mut running = OperationState::running();
55        running.set_progress(Progress::percent(0))?;
56        runtime.active = Some(handle.clone());
57        runtime.history.insert(
58            handle.clone(),
59            RestoreRecord {
60                state: running.clone(),
61                metadata: None,
62            },
63        );
64        self.state.set_restore_state(running);
65        self.state.set_mode(Mode::Maintenance);
66
67        let state = self.state.clone();
68        let data_dir = self.data_dir.clone();
69        let runtime = self.runtime.clone();
70        let handle_for_task = handle.clone();
71        task::spawn(async move {
72            let result = task::spawn_blocking(move || run_restore(&data_dir, &source.path))
73                .await
74                .map_err(|err| ServerError::Internal(err.to_string()))
75                .and_then(|res| res);
76
77            let mut runtime = runtime.lock().expect("restore runtime lock poisoned");
78            runtime.active = None;
79
80            match result {
81                Ok(metadata) => {
82                    state.set_mode(Mode::Normal);
83                    state.set_restore_metadata(Some(metadata.clone()));
84                    let completed = OperationState::completed(Some(Progress::percent(100)))
85                        .unwrap_or_else(|err| OperationState::failed(err.to_string()));
86                    if let Some(record) = runtime.history.get_mut(&handle_for_task) {
87                        record.state = completed.clone();
88                        record.metadata = Some(metadata);
89                    }
90                    state.set_restore_state(completed);
91                }
92                Err(err) => {
93                    state.set_mode(Mode::ReadOnly);
94                    let failed = OperationState::failed(err.to_string());
95                    if let Some(record) = runtime.history.get_mut(&handle_for_task) {
96                        record.state = failed.clone();
97                    }
98                    state.set_restore_state(failed);
99                }
100            }
101        });
102
103        Ok(handle)
104    }
105
106    pub fn status(&self, handle: &RestoreHandle) -> Result<OperationState> {
107        let runtime = self.runtime.lock().expect("restore runtime lock poisoned");
108        runtime
109            .history
110            .get(handle)
111            .map(|record| record.state.clone())
112            .ok_or_else(|| ServerError::NotFound("restore handle not found".to_string()))
113    }
114
115    pub fn metadata(&self, handle: &RestoreHandle) -> Result<Option<RestoreMetadata>> {
116        let runtime = self.runtime.lock().expect("restore runtime lock poisoned");
117        runtime
118            .history
119            .get(handle)
120            .map(|record| record.metadata.clone())
121            .ok_or_else(|| ServerError::NotFound("restore handle not found".to_string()))
122    }
123}
124
125fn run_restore(data_dir: &Path, source: &Path) -> Result<RestoreMetadata> {
126    validate_source(source)?;
127    let backup_dir = restore_backup_dir(data_dir);
128    fs::create_dir_all(&backup_dir)?;
129    copy_dir_filtered(data_dir, &backup_dir)?;
130    clear_data_dir(data_dir)?;
131    copy_dir_filtered(source, data_dir)?;
132    Ok(RestoreMetadata {
133        backup_id: backup_id_from_path(source),
134        location: source.display().to_string(),
135        restored_at_ms: now_ms(),
136        size_bytes: dir_size_bytes(source)?,
137    })
138}
139
140pub fn resolve_default_source(data_dir: &Path) -> Result<PathBuf> {
141    let lifecycle_root = data_dir.join(".lifecycle");
142    let backup_root = lifecycle_root.join("backup");
143    let archive_root = lifecycle_root.join("archive");
144    if let Ok(path) = read_latest_marker(&backup_root) {
145        return Ok(path);
146    }
147    read_latest_marker(&archive_root)
148}
149
150fn validate_source(source: &Path) -> Result<()> {
151    if !source.exists() {
152        return Err(ServerError::NotFound(format!(
153            "restore source does not exist: {}",
154            source.display()
155        )));
156    }
157    if !source.is_dir() {
158        return Err(ServerError::BadRequest(format!(
159            "restore source is not a directory: {}",
160            source.display()
161        )));
162    }
163    Ok(())
164}
165
166fn restore_backup_dir(data_dir: &Path) -> PathBuf {
167    data_dir
168        .join(".lifecycle")
169        .join("restore-backup")
170        .join(timestamp_dir())
171}
172
/// Returns a directory name of the form `ts-<seconds since the Unix epoch>`.
///
/// A system clock set before the epoch yields `ts-0`.
fn timestamp_dir() -> String {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    format!("ts-{}", since_epoch.as_secs())
}
180
181fn clear_data_dir(dir: &Path) -> Result<()> {
182    for entry in fs::read_dir(dir)? {
183        let entry = entry?;
184        let name = entry.file_name();
185        if name == ".lifecycle" {
186            continue;
187        }
188        let path = entry.path();
189        if path.is_dir() {
190            fs::remove_dir_all(&path)?;
191        } else {
192            fs::remove_file(&path)?;
193        }
194    }
195    Ok(())
196}
197
/// Derives a backup identifier from the snapshot path.
///
/// Uses the final path component (lossily converted to UTF-8). Paths without
/// one, such as `/` or anything ending in `..`, fall back to the whole path
/// as displayed.
fn backup_id_from_path(source: &Path) -> String {
    match source.file_name() {
        Some(leaf) => leaf.to_string_lossy().into_owned(),
        None => source.display().to_string(),
    }
}
204
205fn read_latest_marker(root: &Path) -> Result<PathBuf> {
206    let marker = root.join("latest");
207    if !marker.exists() {
208        return Err(ServerError::NotFound(format!(
209            "no snapshots found in {}",
210            root.display()
211        )));
212    }
213    let path = fs::read_to_string(&marker)?;
214    let path = PathBuf::from(path.trim());
215    if !path.exists() {
216        return Err(ServerError::NotFound(format!(
217            "latest snapshot path does not exist: {}",
218            path.display()
219        )));
220    }
221    Ok(path)
222}
223
224fn dir_size_bytes(dir: &Path) -> Result<u64> {
225    let mut size = 0u64;
226    for entry in fs::read_dir(dir)? {
227        let entry = entry?;
228        if entry.file_name() == ".lifecycle" {
229            continue;
230        }
231        let path = entry.path();
232        let metadata = entry.metadata()?;
233        if metadata.is_dir() {
234            size = size.saturating_add(dir_size_bytes(&path)?);
235        } else {
236            size = size.saturating_add(metadata.len());
237        }
238    }
239    Ok(size)
240}
241
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`. The `u128` millisecond count is
/// clamped to `u64::MAX` before narrowing, so the conversion saturates
/// instead of silently wrapping.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // After the clamp the value always fits, so the cast cannot truncate.
    since_epoch.as_millis().min(u128::from(u64::MAX)) as u64
}
248
249#[derive(Debug, Clone)]
250struct RestoreRecord {
251    state: OperationState,
252    metadata: Option<RestoreMetadata>,
253}