alopex_server/ops/
restore.rs1use std::collections::HashMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Mutex};
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use serde::{Deserialize, Serialize};
8use tokio::task;
9use uuid::Uuid;
10
11use crate::error::{Result, ServerError};
12use crate::ops::backup::copy_dir_filtered;
13use crate::ops::state::{LifecycleStateManager, Mode, OperationState, Progress, RestoreMetadata};
14
15#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
16pub struct RestoreHandle {
17 pub id: Uuid,
18}
19
20#[derive(Debug, Clone)]
21pub struct RestoreSource {
22 pub path: PathBuf,
23}
24
25#[derive(Debug, Default)]
26struct RestoreRuntime {
27 active: Option<RestoreHandle>,
28 history: HashMap<RestoreHandle, RestoreRecord>,
29}
30
31#[derive(Debug, Clone)]
32pub struct RestoreCoordinator {
33 data_dir: PathBuf,
34 state: Arc<LifecycleStateManager>,
35 runtime: Arc<Mutex<RestoreRuntime>>,
36}
37
38impl RestoreCoordinator {
39 pub fn new(data_dir: PathBuf, state: Arc<LifecycleStateManager>) -> Self {
40 Self {
41 data_dir,
42 state,
43 runtime: Arc::new(Mutex::new(RestoreRuntime::default())),
44 }
45 }
46
47 pub async fn start_restore(&self, source: RestoreSource) -> Result<RestoreHandle> {
48 let mut runtime = self.runtime.lock().expect("restore runtime lock poisoned");
49 if runtime.active.is_some() {
50 return Err(ServerError::Conflict("restore already running".to_string()));
51 }
52
53 let handle = RestoreHandle { id: Uuid::new_v4() };
54 let mut running = OperationState::running();
55 running.set_progress(Progress::percent(0))?;
56 runtime.active = Some(handle.clone());
57 runtime.history.insert(
58 handle.clone(),
59 RestoreRecord {
60 state: running.clone(),
61 metadata: None,
62 },
63 );
64 self.state.set_restore_state(running);
65 self.state.set_mode(Mode::Maintenance);
66
67 let state = self.state.clone();
68 let data_dir = self.data_dir.clone();
69 let runtime = self.runtime.clone();
70 let handle_for_task = handle.clone();
71 task::spawn(async move {
72 let result = task::spawn_blocking(move || run_restore(&data_dir, &source.path))
73 .await
74 .map_err(|err| ServerError::Internal(err.to_string()))
75 .and_then(|res| res);
76
77 let mut runtime = runtime.lock().expect("restore runtime lock poisoned");
78 runtime.active = None;
79
80 match result {
81 Ok(metadata) => {
82 state.set_mode(Mode::Normal);
83 state.set_restore_metadata(Some(metadata.clone()));
84 let completed = OperationState::completed(Some(Progress::percent(100)))
85 .unwrap_or_else(|err| OperationState::failed(err.to_string()));
86 if let Some(record) = runtime.history.get_mut(&handle_for_task) {
87 record.state = completed.clone();
88 record.metadata = Some(metadata);
89 }
90 state.set_restore_state(completed);
91 }
92 Err(err) => {
93 state.set_mode(Mode::ReadOnly);
94 let failed = OperationState::failed(err.to_string());
95 if let Some(record) = runtime.history.get_mut(&handle_for_task) {
96 record.state = failed.clone();
97 }
98 state.set_restore_state(failed);
99 }
100 }
101 });
102
103 Ok(handle)
104 }
105
106 pub fn status(&self, handle: &RestoreHandle) -> Result<OperationState> {
107 let runtime = self.runtime.lock().expect("restore runtime lock poisoned");
108 runtime
109 .history
110 .get(handle)
111 .map(|record| record.state.clone())
112 .ok_or_else(|| ServerError::NotFound("restore handle not found".to_string()))
113 }
114
115 pub fn metadata(&self, handle: &RestoreHandle) -> Result<Option<RestoreMetadata>> {
116 let runtime = self.runtime.lock().expect("restore runtime lock poisoned");
117 runtime
118 .history
119 .get(handle)
120 .map(|record| record.metadata.clone())
121 .ok_or_else(|| ServerError::NotFound("restore handle not found".to_string()))
122 }
123}
124
125fn run_restore(data_dir: &Path, source: &Path) -> Result<RestoreMetadata> {
126 validate_source(source)?;
127 let backup_dir = restore_backup_dir(data_dir);
128 fs::create_dir_all(&backup_dir)?;
129 copy_dir_filtered(data_dir, &backup_dir)?;
130 clear_data_dir(data_dir)?;
131 copy_dir_filtered(source, data_dir)?;
132 Ok(RestoreMetadata {
133 backup_id: backup_id_from_path(source),
134 location: source.display().to_string(),
135 restored_at_ms: now_ms(),
136 size_bytes: dir_size_bytes(source)?,
137 })
138}
139
140pub fn resolve_default_source(data_dir: &Path) -> Result<PathBuf> {
141 let lifecycle_root = data_dir.join(".lifecycle");
142 let backup_root = lifecycle_root.join("backup");
143 let archive_root = lifecycle_root.join("archive");
144 if let Ok(path) = read_latest_marker(&backup_root) {
145 return Ok(path);
146 }
147 read_latest_marker(&archive_root)
148}
149
150fn validate_source(source: &Path) -> Result<()> {
151 if !source.exists() {
152 return Err(ServerError::NotFound(format!(
153 "restore source does not exist: {}",
154 source.display()
155 )));
156 }
157 if !source.is_dir() {
158 return Err(ServerError::BadRequest(format!(
159 "restore source is not a directory: {}",
160 source.display()
161 )));
162 }
163 Ok(())
164}
165
166fn restore_backup_dir(data_dir: &Path) -> PathBuf {
167 data_dir
168 .join(".lifecycle")
169 .join("restore-backup")
170 .join(timestamp_dir())
171}
172
173fn timestamp_dir() -> String {
174 let seconds = SystemTime::now()
175 .duration_since(UNIX_EPOCH)
176 .unwrap_or_default()
177 .as_secs();
178 format!("ts-{seconds}")
179}
180
181fn clear_data_dir(dir: &Path) -> Result<()> {
182 for entry in fs::read_dir(dir)? {
183 let entry = entry?;
184 let name = entry.file_name();
185 if name == ".lifecycle" {
186 continue;
187 }
188 let path = entry.path();
189 if path.is_dir() {
190 fs::remove_dir_all(&path)?;
191 } else {
192 fs::remove_file(&path)?;
193 }
194 }
195 Ok(())
196}
197
198fn backup_id_from_path(source: &Path) -> String {
199 source
200 .file_name()
201 .map(|name| name.to_string_lossy().into_owned())
202 .unwrap_or_else(|| source.display().to_string())
203}
204
205fn read_latest_marker(root: &Path) -> Result<PathBuf> {
206 let marker = root.join("latest");
207 if !marker.exists() {
208 return Err(ServerError::NotFound(format!(
209 "no snapshots found in {}",
210 root.display()
211 )));
212 }
213 let path = fs::read_to_string(&marker)?;
214 let path = PathBuf::from(path.trim());
215 if !path.exists() {
216 return Err(ServerError::NotFound(format!(
217 "latest snapshot path does not exist: {}",
218 path.display()
219 )));
220 }
221 Ok(path)
222}
223
224fn dir_size_bytes(dir: &Path) -> Result<u64> {
225 let mut size = 0u64;
226 for entry in fs::read_dir(dir)? {
227 let entry = entry?;
228 if entry.file_name() == ".lifecycle" {
229 continue;
230 }
231 let path = entry.path();
232 let metadata = entry.metadata()?;
233 if metadata.is_dir() {
234 size = size.saturating_add(dir_size_bytes(&path)?);
235 } else {
236 size = size.saturating_add(metadata.len());
237 }
238 }
239 Ok(size)
240}
241
242fn now_ms() -> u64 {
243 SystemTime::now()
244 .duration_since(UNIX_EPOCH)
245 .unwrap_or_default()
246 .as_millis() as u64
247}
248
249#[derive(Debug, Clone)]
250struct RestoreRecord {
251 state: OperationState,
252 metadata: Option<RestoreMetadata>,
253}