Skip to main content

alopex_server/http/
admin_api.rs

1use std::path::Path;
2use std::sync::Arc;
3
4use axum::extract::{Extension, Path as AxumPath};
5use axum::http::StatusCode;
6use axum::response::{IntoResponse, Response};
7use axum::Json;
8use serde::{Deserialize, Serialize};
9use uuid::Uuid;
10
11use crate::auth::AuthMode;
12use crate::http::{error_response, RequestContext};
13use crate::ops::backup::BackupHandle;
14use crate::ops::restore::{RestoreHandle, RestoreSource};
15use crate::ops::state::{OperationState, RestoreMetadata};
16use crate::ops::status::StatusReporter;
17use crate::ops::status::StatusView;
18use crate::server::ServerState;
19
20#[derive(Serialize)]
21struct AdminCapabilitiesResponse {
22    scope: &'static str,
23    allowed_actions: Vec<&'static str>,
24}
25
26#[derive(Serialize)]
27struct AdminStatusResponse {
28    version: Option<String>,
29    uptime_secs: Option<u64>,
30    connections: Option<u64>,
31    queries_per_second: Option<f64>,
32    #[serde(flatten)]
33    status: StatusView,
34}
35
36#[derive(Serialize)]
37struct AdminMetricsResponse {
38    qps: Option<f64>,
39    avg_latency_ms: Option<f64>,
40    p99_latency_ms: Option<f64>,
41    memory_usage_mb: Option<u64>,
42    active_connections: Option<u64>,
43}
44
45#[derive(Serialize)]
46struct AdminHealthResponse {
47    status: &'static str,
48    message: &'static str,
49}
50
51#[derive(Deserialize)]
52pub struct AdminLifecycleRequest {
53    action: String,
54}
55
56#[derive(Deserialize)]
57pub struct AdminRestoreRequest {
58    #[serde(default)]
59    source: Option<String>,
60}
61
62#[derive(Serialize)]
63struct AdminLifecycleResponse {
64    status: &'static str,
65    message: String,
66}
67
68#[derive(Serialize)]
69struct AdminCompactionResponse {
70    success: bool,
71    message: String,
72}
73
74#[derive(Serialize)]
75struct AdminBackupResponse {
76    handle: String,
77    location: String,
78    state: OperationState,
79}
80
81#[derive(Serialize)]
82struct AdminRestoreResponse {
83    handle: String,
84    state: OperationState,
85    metadata: Option<RestoreMetadata>,
86}
87
88pub async fn capabilities(Extension(state): Extension<Arc<ServerState>>) -> impl IntoResponse {
89    let (scope, allowed_actions) = capabilities_for_auth(&state.auth);
90    Json(AdminCapabilitiesResponse {
91        scope,
92        allowed_actions,
93    })
94}
95
96pub async fn status(Extension(state): Extension<Arc<ServerState>>) -> impl IntoResponse {
97    let uptime = state.start_time.elapsed().as_secs();
98    let reporter = StatusReporter::new(state.lifecycle_state.clone(), state.recovery_info.clone());
99    let status = reporter.status_view();
100    Json(AdminStatusResponse {
101        version: Some(env!("CARGO_PKG_VERSION").to_string()),
102        uptime_secs: Some(uptime),
103        connections: None,
104        queries_per_second: None,
105        status,
106    })
107}
108
109pub async fn metrics(Extension(_state): Extension<Arc<ServerState>>) -> impl IntoResponse {
110    Json(AdminMetricsResponse {
111        qps: None,
112        avg_latency_ms: None,
113        p99_latency_ms: None,
114        memory_usage_mb: None,
115        active_connections: None,
116    })
117}
118
119pub async fn health() -> impl IntoResponse {
120    Json(AdminHealthResponse {
121        status: "ok",
122        message: "ready",
123    })
124}
125
126pub async fn compaction() -> impl IntoResponse {
127    Json(AdminCompactionResponse {
128        success: false,
129        message: "Compaction is not available on this server build.".to_string(),
130    })
131}
132
133pub async fn start_backup(
134    Extension(state): Extension<Arc<ServerState>>,
135    Extension(ctx): Extension<RequestContext>,
136) -> Response {
137    match state.backup_coordinator.start_backup().await {
138        Ok(handle) => match backup_response(&state, &handle) {
139            Ok(response) => Json(response).into_response(),
140            Err(err) => error_response(err, &ctx),
141        },
142        Err(err) => error_response(err, &ctx),
143    }
144}
145
146pub async fn backup_status(
147    AxumPath(id): AxumPath<String>,
148    Extension(state): Extension<Arc<ServerState>>,
149    Extension(ctx): Extension<RequestContext>,
150) -> Response {
151    let handle = match parse_backup_handle(&id) {
152        Ok(handle) => handle,
153        Err(err) => return error_response(err, &ctx),
154    };
155    match backup_response(&state, &handle) {
156        Ok(response) => Json(response).into_response(),
157        Err(err) => error_response(err, &ctx),
158    }
159}
160
161pub async fn start_restore(
162    Extension(state): Extension<Arc<ServerState>>,
163    Extension(ctx): Extension<RequestContext>,
164    Json(request): Json<AdminRestoreRequest>,
165) -> Response {
166    let source_path = match request.source {
167        Some(source) => source.into(),
168        None => match crate::ops::restore::resolve_default_source(&state.config.data_dir) {
169            Ok(path) => path,
170            Err(crate::error::ServerError::NotFound(_)) => {
171                match state.backup_coordinator.latest_location() {
172                    Some(path) => path,
173                    None => {
174                        let data_dir = state.config.data_dir.clone();
175                        let archive_result = tokio::task::spawn_blocking(move || {
176                            perform_lifecycle_action("archive", Path::new(&data_dir))
177                        })
178                        .await
179                        .map_err(|err| crate::error::ServerError::Internal(err.to_string()))
180                        .and_then(|res| res.map_err(crate::error::ServerError::BadRequest));
181                        if let Err(err) = archive_result {
182                            return error_response(err, &ctx);
183                        }
184                        match crate::ops::restore::resolve_default_source(&state.config.data_dir) {
185                            Ok(path) => path,
186                            Err(err) => return error_response(err, &ctx),
187                        }
188                    }
189                }
190            }
191            Err(err) => return error_response(err, &ctx),
192        },
193    };
194    let source = RestoreSource { path: source_path };
195    match state.restore_coordinator.start_restore(source).await {
196        Ok(handle) => match restore_response(&state, &handle) {
197            Ok(response) => Json(response).into_response(),
198            Err(err) => error_response(err, &ctx),
199        },
200        Err(err) => error_response(err, &ctx),
201    }
202}
203
204pub async fn restore_status(
205    AxumPath(id): AxumPath<String>,
206    Extension(state): Extension<Arc<ServerState>>,
207    Extension(ctx): Extension<RequestContext>,
208) -> Response {
209    let handle = match parse_restore_handle(&id) {
210        Ok(handle) => handle,
211        Err(err) => return error_response(err, &ctx),
212    };
213    match restore_response(&state, &handle) {
214        Ok(response) => Json(response).into_response(),
215        Err(err) => error_response(err, &ctx),
216    }
217}
218
219pub async fn lifecycle(
220    Extension(state): Extension<Arc<ServerState>>,
221    Json(request): Json<AdminLifecycleRequest>,
222) -> impl IntoResponse {
223    let data_dir = state.config.data_dir.clone();
224    let action = request.action;
225    let result = tokio::task::spawn_blocking(move || {
226        perform_lifecycle_action(action.as_str(), Path::new(&data_dir))
227    })
228    .await
229    .map_err(|err| err.to_string())
230    .and_then(|res| res.map_err(|err| err.to_string()));
231
232    match result {
233        Ok(message) => (
234            StatusCode::OK,
235            Json(AdminLifecycleResponse {
236                status: "OK",
237                message,
238            }),
239        )
240            .into_response(),
241        Err(err) => (
242            StatusCode::BAD_REQUEST,
243            Json(AdminLifecycleResponse {
244                status: "Error",
245                message: err,
246            }),
247        )
248            .into_response(),
249    }
250}
251
252fn parse_backup_handle(id: &str) -> crate::error::Result<BackupHandle> {
253    let id = Uuid::parse_str(id)
254        .map_err(|_| crate::error::ServerError::BadRequest("invalid backup handle".into()))?;
255    Ok(BackupHandle { id })
256}
257
258fn parse_restore_handle(id: &str) -> crate::error::Result<RestoreHandle> {
259    let id = Uuid::parse_str(id)
260        .map_err(|_| crate::error::ServerError::BadRequest("invalid restore handle".into()))?;
261    Ok(RestoreHandle { id })
262}
263
264fn backup_response(
265    state: &ServerState,
266    handle: &BackupHandle,
267) -> crate::error::Result<AdminBackupResponse> {
268    let location = state.backup_coordinator.location(handle)?;
269    let status = state.backup_coordinator.status(handle)?;
270    Ok(AdminBackupResponse {
271        handle: handle.id.to_string(),
272        location: location.display().to_string(),
273        state: status,
274    })
275}
276
277fn restore_response(
278    state: &ServerState,
279    handle: &RestoreHandle,
280) -> crate::error::Result<AdminRestoreResponse> {
281    let status = state.restore_coordinator.status(handle)?;
282    let metadata = state.restore_coordinator.metadata(handle)?;
283    Ok(AdminRestoreResponse {
284        handle: handle.id.to_string(),
285        state: status,
286        metadata,
287    })
288}
289
290fn capabilities_for_auth(auth: &crate::auth::AuthMiddleware) -> (&'static str, Vec<&'static str>) {
291    match auth.mode() {
292        AuthMode::None => ("full", Vec::new()),
293        AuthMode::Dev { .. } => ("restricted", all_actions()),
294    }
295}
296
/// Returns the complete list of action names, in a fixed order.
fn all_actions() -> Vec<&'static str> {
    const ACTIONS: [&str; 8] = [
        "read", "create", "update", "delete", "archive", "restore", "backup", "export",
    ];
    ACTIONS.to_vec()
}
302
303fn perform_lifecycle_action(action: &str, data_dir: &Path) -> Result<String, String> {
304    if !data_dir.exists() {
305        return Err(format!(
306            "Data directory does not exist: {}",
307            data_dir.display()
308        ));
309    }
310    if !data_dir.is_dir() {
311        return Err(format!(
312            "Data directory is not a directory: {}",
313            data_dir.display()
314        ));
315    }
316
317    let lifecycle_root = data_dir.join(".lifecycle");
318    std::fs::create_dir_all(&lifecycle_root).map_err(|err| err.to_string())?;
319
320    match action {
321        "archive" => {
322            let dest = lifecycle_root.join("archive").join(timestamp_dir());
323            copy_data_dir(data_dir, &dest)?;
324            write_latest_marker(&lifecycle_root.join("archive"), &dest)?;
325            Ok(format!("Archived data to {}", dest.display()))
326        }
327        "export" => {
328            let dest = lifecycle_root.join("export").join(timestamp_dir());
329            copy_data_dir(data_dir, &dest)?;
330            write_latest_marker(&lifecycle_root.join("export"), &dest)?;
331            Ok(format!("Exported data to {}", dest.display()))
332        }
333        _ => Err("Unknown lifecycle action.".to_string()),
334    }
335}
336
/// Builds a directory name of the form `ts-<seconds since the Unix epoch>`.
///
/// Falls back to `ts-0` if the system clock reads earlier than the epoch.
/// The resolution is one second, so two calls within the same second yield
/// the same name.
fn timestamp_dir() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let seconds = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    format!("ts-{}", seconds)
}
344
/// Creates `dest` (including missing parents) and copies the contents of
/// `src` into it, skipping entries named `.lifecycle`.
///
/// # Errors
/// Returns the I/O error text of the first filesystem operation that fails.
fn copy_data_dir(src: &Path, dest: &Path) -> Result<(), String> {
    match std::fs::create_dir_all(dest) {
        Ok(()) => copy_dir_filtered(src, dest),
        Err(err) => Err(err.to_string()),
    }
}

/// Copies every entry of `src` into the existing directory `dest`.
///
/// Directories are copied recursively through [`copy_data_dir`]; everything
/// else goes through `std::fs::copy`. Any entry named `.lifecycle` is skipped,
/// at every depth of the tree.
///
/// # Errors
/// Returns the I/O error text of the first filesystem operation that fails.
fn copy_dir_filtered(src: &Path, dest: &Path) -> Result<(), String> {
    fn io_msg(err: std::io::Error) -> String {
        err.to_string()
    }

    let listing = std::fs::read_dir(src).map_err(io_msg)?;
    for item in listing {
        let item = item.map_err(io_msg)?;
        let kind = item.file_type().map_err(io_msg)?;
        let name = item.file_name();
        if name == ".lifecycle" {
            continue;
        }

        let from = item.path();
        let to = dest.join(name);
        if kind.is_dir() {
            copy_data_dir(&from, &to)?;
        } else {
            std::fs::copy(&from, &to).map_err(io_msg)?;
        }
    }
    Ok(())
}
367
/// Writes a `latest` file under `root` containing the (lossily UTF-8
/// converted) path `dest`, creating `root` first if needed.
///
/// # Errors
/// Returns the I/O error text if creating `root` or writing the file fails.
fn write_latest_marker(root: &Path, dest: &Path) -> Result<(), String> {
    std::fs::create_dir_all(root).map_err(|err| err.to_string())?;
    let contents = dest.to_string_lossy();
    std::fs::write(root.join("latest"), contents.as_bytes()).map_err(|err| err.to_string())
}