1use std::path::Path;
2use std::sync::Arc;
3
4use axum::extract::{Extension, Path as AxumPath};
5use axum::http::StatusCode;
6use axum::response::{IntoResponse, Response};
7use axum::Json;
8use serde::{Deserialize, Serialize};
9use uuid::Uuid;
10
11use crate::auth::AuthMode;
12use crate::http::{error_response, RequestContext};
13use crate::ops::backup::BackupHandle;
14use crate::ops::restore::{RestoreHandle, RestoreSource};
15use crate::ops::state::{OperationState, RestoreMetadata};
16use crate::ops::status::StatusReporter;
17use crate::ops::status::StatusView;
18use crate::server::ServerState;
19
20#[derive(Serialize)]
21struct AdminCapabilitiesResponse {
22 scope: &'static str,
23 allowed_actions: Vec<&'static str>,
24}
25
26#[derive(Serialize)]
27struct AdminStatusResponse {
28 version: Option<String>,
29 uptime_secs: Option<u64>,
30 connections: Option<u64>,
31 queries_per_second: Option<f64>,
32 #[serde(flatten)]
33 status: StatusView,
34}
35
36#[derive(Serialize)]
37struct AdminMetricsResponse {
38 qps: Option<f64>,
39 avg_latency_ms: Option<f64>,
40 p99_latency_ms: Option<f64>,
41 memory_usage_mb: Option<u64>,
42 active_connections: Option<u64>,
43}
44
45#[derive(Serialize)]
46struct AdminHealthResponse {
47 status: &'static str,
48 message: &'static str,
49}
50
51#[derive(Deserialize)]
52pub struct AdminLifecycleRequest {
53 action: String,
54}
55
56#[derive(Deserialize)]
57pub struct AdminRestoreRequest {
58 #[serde(default)]
59 source: Option<String>,
60}
61
62#[derive(Serialize)]
63struct AdminLifecycleResponse {
64 status: &'static str,
65 message: String,
66}
67
68#[derive(Serialize)]
69struct AdminCompactionResponse {
70 success: bool,
71 message: String,
72}
73
74#[derive(Serialize)]
75struct AdminBackupResponse {
76 handle: String,
77 location: String,
78 state: OperationState,
79}
80
81#[derive(Serialize)]
82struct AdminRestoreResponse {
83 handle: String,
84 state: OperationState,
85 metadata: Option<RestoreMetadata>,
86}
87
88pub async fn capabilities(Extension(state): Extension<Arc<ServerState>>) -> impl IntoResponse {
89 let (scope, allowed_actions) = capabilities_for_auth(&state.auth);
90 Json(AdminCapabilitiesResponse {
91 scope,
92 allowed_actions,
93 })
94}
95
96pub async fn status(Extension(state): Extension<Arc<ServerState>>) -> impl IntoResponse {
97 let uptime = state.start_time.elapsed().as_secs();
98 let reporter = StatusReporter::new(state.lifecycle_state.clone(), state.recovery_info.clone());
99 let status = reporter.status_view();
100 Json(AdminStatusResponse {
101 version: Some(env!("CARGO_PKG_VERSION").to_string()),
102 uptime_secs: Some(uptime),
103 connections: None,
104 queries_per_second: None,
105 status,
106 })
107}
108
109pub async fn metrics(Extension(_state): Extension<Arc<ServerState>>) -> impl IntoResponse {
110 Json(AdminMetricsResponse {
111 qps: None,
112 avg_latency_ms: None,
113 p99_latency_ms: None,
114 memory_usage_mb: None,
115 active_connections: None,
116 })
117}
118
119pub async fn health() -> impl IntoResponse {
120 Json(AdminHealthResponse {
121 status: "ok",
122 message: "ready",
123 })
124}
125
126pub async fn compaction() -> impl IntoResponse {
127 Json(AdminCompactionResponse {
128 success: false,
129 message: "Compaction is not available on this server build.".to_string(),
130 })
131}
132
133pub async fn start_backup(
134 Extension(state): Extension<Arc<ServerState>>,
135 Extension(ctx): Extension<RequestContext>,
136) -> Response {
137 match state.backup_coordinator.start_backup().await {
138 Ok(handle) => match backup_response(&state, &handle) {
139 Ok(response) => Json(response).into_response(),
140 Err(err) => error_response(err, &ctx),
141 },
142 Err(err) => error_response(err, &ctx),
143 }
144}
145
146pub async fn backup_status(
147 AxumPath(id): AxumPath<String>,
148 Extension(state): Extension<Arc<ServerState>>,
149 Extension(ctx): Extension<RequestContext>,
150) -> Response {
151 let handle = match parse_backup_handle(&id) {
152 Ok(handle) => handle,
153 Err(err) => return error_response(err, &ctx),
154 };
155 match backup_response(&state, &handle) {
156 Ok(response) => Json(response).into_response(),
157 Err(err) => error_response(err, &ctx),
158 }
159}
160
161pub async fn start_restore(
162 Extension(state): Extension<Arc<ServerState>>,
163 Extension(ctx): Extension<RequestContext>,
164 Json(request): Json<AdminRestoreRequest>,
165) -> Response {
166 let source_path = match request.source {
167 Some(source) => source.into(),
168 None => match crate::ops::restore::resolve_default_source(&state.config.data_dir) {
169 Ok(path) => path,
170 Err(crate::error::ServerError::NotFound(_)) => {
171 match state.backup_coordinator.latest_location() {
172 Some(path) => path,
173 None => {
174 let data_dir = state.config.data_dir.clone();
175 let archive_result = tokio::task::spawn_blocking(move || {
176 perform_lifecycle_action("archive", Path::new(&data_dir))
177 })
178 .await
179 .map_err(|err| crate::error::ServerError::Internal(err.to_string()))
180 .and_then(|res| res.map_err(crate::error::ServerError::BadRequest));
181 if let Err(err) = archive_result {
182 return error_response(err, &ctx);
183 }
184 match crate::ops::restore::resolve_default_source(&state.config.data_dir) {
185 Ok(path) => path,
186 Err(err) => return error_response(err, &ctx),
187 }
188 }
189 }
190 }
191 Err(err) => return error_response(err, &ctx),
192 },
193 };
194 let source = RestoreSource { path: source_path };
195 match state.restore_coordinator.start_restore(source).await {
196 Ok(handle) => match restore_response(&state, &handle) {
197 Ok(response) => Json(response).into_response(),
198 Err(err) => error_response(err, &ctx),
199 },
200 Err(err) => error_response(err, &ctx),
201 }
202}
203
204pub async fn restore_status(
205 AxumPath(id): AxumPath<String>,
206 Extension(state): Extension<Arc<ServerState>>,
207 Extension(ctx): Extension<RequestContext>,
208) -> Response {
209 let handle = match parse_restore_handle(&id) {
210 Ok(handle) => handle,
211 Err(err) => return error_response(err, &ctx),
212 };
213 match restore_response(&state, &handle) {
214 Ok(response) => Json(response).into_response(),
215 Err(err) => error_response(err, &ctx),
216 }
217}
218
219pub async fn lifecycle(
220 Extension(state): Extension<Arc<ServerState>>,
221 Json(request): Json<AdminLifecycleRequest>,
222) -> impl IntoResponse {
223 let data_dir = state.config.data_dir.clone();
224 let action = request.action;
225 let result = tokio::task::spawn_blocking(move || {
226 perform_lifecycle_action(action.as_str(), Path::new(&data_dir))
227 })
228 .await
229 .map_err(|err| err.to_string())
230 .and_then(|res| res.map_err(|err| err.to_string()));
231
232 match result {
233 Ok(message) => (
234 StatusCode::OK,
235 Json(AdminLifecycleResponse {
236 status: "OK",
237 message,
238 }),
239 )
240 .into_response(),
241 Err(err) => (
242 StatusCode::BAD_REQUEST,
243 Json(AdminLifecycleResponse {
244 status: "Error",
245 message: err,
246 }),
247 )
248 .into_response(),
249 }
250}
251
252fn parse_backup_handle(id: &str) -> crate::error::Result<BackupHandle> {
253 let id = Uuid::parse_str(id)
254 .map_err(|_| crate::error::ServerError::BadRequest("invalid backup handle".into()))?;
255 Ok(BackupHandle { id })
256}
257
258fn parse_restore_handle(id: &str) -> crate::error::Result<RestoreHandle> {
259 let id = Uuid::parse_str(id)
260 .map_err(|_| crate::error::ServerError::BadRequest("invalid restore handle".into()))?;
261 Ok(RestoreHandle { id })
262}
263
264fn backup_response(
265 state: &ServerState,
266 handle: &BackupHandle,
267) -> crate::error::Result<AdminBackupResponse> {
268 let location = state.backup_coordinator.location(handle)?;
269 let status = state.backup_coordinator.status(handle)?;
270 Ok(AdminBackupResponse {
271 handle: handle.id.to_string(),
272 location: location.display().to_string(),
273 state: status,
274 })
275}
276
277fn restore_response(
278 state: &ServerState,
279 handle: &RestoreHandle,
280) -> crate::error::Result<AdminRestoreResponse> {
281 let status = state.restore_coordinator.status(handle)?;
282 let metadata = state.restore_coordinator.metadata(handle)?;
283 Ok(AdminRestoreResponse {
284 handle: handle.id.to_string(),
285 state: status,
286 metadata,
287 })
288}
289
290fn capabilities_for_auth(auth: &crate::auth::AuthMiddleware) -> (&'static str, Vec<&'static str>) {
291 match auth.mode() {
292 AuthMode::None => ("full", Vec::new()),
293 AuthMode::Dev { .. } => ("restricted", all_actions()),
294 }
295}
296
/// Returns every action name the admin API can advertise, in a fixed order.
fn all_actions() -> Vec<&'static str> {
    const ACTIONS: [&str; 8] = [
        "read", "create", "update", "delete", "archive", "restore", "backup", "export",
    ];
    ACTIONS.to_vec()
}
302
303fn perform_lifecycle_action(action: &str, data_dir: &Path) -> Result<String, String> {
304 if !data_dir.exists() {
305 return Err(format!(
306 "Data directory does not exist: {}",
307 data_dir.display()
308 ));
309 }
310 if !data_dir.is_dir() {
311 return Err(format!(
312 "Data directory is not a directory: {}",
313 data_dir.display()
314 ));
315 }
316
317 let lifecycle_root = data_dir.join(".lifecycle");
318 std::fs::create_dir_all(&lifecycle_root).map_err(|err| err.to_string())?;
319
320 match action {
321 "archive" => {
322 let dest = lifecycle_root.join("archive").join(timestamp_dir());
323 copy_data_dir(data_dir, &dest)?;
324 write_latest_marker(&lifecycle_root.join("archive"), &dest)?;
325 Ok(format!("Archived data to {}", dest.display()))
326 }
327 "export" => {
328 let dest = lifecycle_root.join("export").join(timestamp_dir());
329 copy_data_dir(data_dir, &dest)?;
330 write_latest_marker(&lifecycle_root.join("export"), &dest)?;
331 Ok(format!("Exported data to {}", dest.display()))
332 }
333 _ => Err("Unknown lifecycle action.".to_string()),
334 }
335}
336
/// Produces a directory name of the form `ts-<seconds since the Unix epoch>`.
///
/// If the system clock reads earlier than the epoch, the seconds value is `0`.
fn timestamp_dir() -> String {
    let now = std::time::SystemTime::now();
    let since_epoch = match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::default(),
    };
    format!("ts-{}", since_epoch.as_secs())
}
344
/// Creates `dest` (including missing parents) and copies the contents of `src` into it.
///
/// # Errors
/// Returns the I/O error text of the first step that fails.
fn copy_data_dir(src: &Path, dest: &Path) -> Result<(), String> {
    if let Err(err) = std::fs::create_dir_all(dest) {
        return Err(err.to_string());
    }
    copy_dir_filtered(src, dest)
}

/// Copies every entry of `src` into the existing directory `dest`, skipping
/// any entry named `.lifecycle`.
///
/// Sub-directories are copied recursively through `copy_data_dir`, so the
/// `.lifecycle` filter applies at every depth. Everything that is not a
/// directory is copied with `std::fs::copy`.
///
/// # Errors
/// Returns the I/O error text of the first step that fails.
fn copy_dir_filtered(src: &Path, dest: &Path) -> Result<(), String> {
    let describe = |err: std::io::Error| err.to_string();

    let entries = std::fs::read_dir(src).map_err(describe)?;
    for entry in entries {
        let entry = entry.map_err(describe)?;
        let kind = entry.file_type().map_err(describe)?;
        let name = entry.file_name();
        if name == ".lifecycle" {
            continue;
        }

        let from = entry.path();
        let to = dest.join(name);
        if kind.is_dir() {
            copy_data_dir(&from, &to)?;
        } else {
            std::fs::copy(from, &to).map_err(describe)?;
        }
    }
    Ok(())
}
367
/// Records `dest` as the most recent entry under `root`.
///
/// Ensures `root` exists, then writes the lossy UTF-8 rendering of `dest` to
/// the file `<root>/latest`, replacing any previous contents.
///
/// # Errors
/// Returns the I/O error text if creating `root` or writing the marker fails.
fn write_latest_marker(root: &Path, dest: &Path) -> Result<(), String> {
    std::fs::create_dir_all(root).map_err(|err| err.to_string())?;
    let contents = dest.to_string_lossy();
    std::fs::write(root.join("latest"), contents.as_bytes()).map_err(|err| err.to_string())
}