use std::fs;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use alopex_core::lsm::{LsmKV, LsmKVConfig};
use alopex_server::auth::AuthMode;
use alopex_server::config::ServerConfig;
use alopex_server::http;
use alopex_server::ops::backup::{BackupCoordinator, BackupHandle};
use alopex_server::ops::restore::{RestoreCoordinator, RestoreSource};
use alopex_server::ops::state::{LifecycleStateManager, Mode, OperationStatus};
use alopex_server::server::ServerState;
use alopex_server::Server;
use axum::body::Body;
use axum::http::{Method, Request, StatusCode};
use serde_json::{json, Value};
use tempfile::tempdir;
use tokio::time::{sleep, Duration as TokioDuration};
use tower::ServiceExt;
async fn build_state() -> (Arc<ServerState>, tempfile::TempDir) {
let temp = tempdir().expect("tempdir");
let config = ServerConfig {
data_dir: temp.path().to_path_buf(),
auth_mode: AuthMode::None,
query_timeout: Duration::from_secs(5),
audit_log_enabled: false,
..ServerConfig::default()
};
let server = Server::new(config).expect("server");
(server.state, temp)
}
async fn send_json(
router: axum::Router,
method: Method,
path: &str,
body: Value,
) -> (StatusCode, Vec<u8>) {
let request = Request::builder()
.method(method)
.uri(path)
.header(axum::http::header::CONTENT_TYPE, "application/json")
.body(Body::from(body.to_string()))
.expect("request");
let response = router.oneshot(request).await.expect("response");
let status = response.status();
let body = hyper::body::to_bytes(response.into_body())
.await
.expect("body");
(status, body.to_vec())
}
async fn send_empty(router: axum::Router, method: Method, path: &str) -> (StatusCode, Vec<u8>) {
let request = Request::builder()
.method(method)
.uri(path)
.body(Body::empty())
.expect("request");
let response = router.oneshot(request).await.expect("response");
let status = response.status();
let body = hyper::body::to_bytes(response.into_body())
.await
.expect("body");
(status, body.to_vec())
}
async fn wait_for_backup(router: axum::Router, handle: &str) -> Value {
let path = format!("/api/admin/backup/{handle}");
for _ in 0..200 {
let (status, body) = send_empty(router.clone(), Method::GET, &path).await;
assert_eq!(status, StatusCode::OK);
let value: Value = serde_json::from_slice(&body).expect("backup status json");
let state = value.get("state").cloned().expect("state");
let status_value = state
.get("status")
.and_then(|v| v.as_str())
.unwrap_or_default();
if status_value != "running" && status_value != "queued" {
return state;
}
sleep(TokioDuration::from_millis(100)).await;
}
panic!("backup did not complete in time");
}
async fn wait_for_restore(router: axum::Router, handle: &str) -> Value {
let path = format!("/api/admin/restore/{handle}");
for _ in 0..200 {
let (status, body) = send_empty(router.clone(), Method::GET, &path).await;
assert_eq!(status, StatusCode::OK);
let value: Value = serde_json::from_slice(&body).expect("restore status json");
let state = value.get("state").cloned().expect("state");
let status_value = state
.get("status")
.and_then(|v| v.as_str())
.unwrap_or_default();
if status_value != "running" && status_value != "queued" {
return value;
}
sleep(TokioDuration::from_millis(100)).await;
}
panic!("restore did not complete in time");
}
async fn wait_for_backup_state(
coordinator: &BackupCoordinator,
handle: &BackupHandle,
) -> alopex_server::ops::state::OperationState {
for _ in 0..50 {
let state = coordinator.status(handle).expect("backup status");
if state.status != OperationStatus::Running && state.status != OperationStatus::Queued {
return state;
}
sleep(TokioDuration::from_millis(50)).await;
}
panic!("backup did not complete in time");
}
async fn wait_for_restore_state(
coordinator: &RestoreCoordinator,
handle: &alopex_server::ops::restore::RestoreHandle,
) -> alopex_server::ops::state::OperationState {
for _ in 0..50 {
let state = coordinator.status(handle).expect("restore status");
if state.status != OperationStatus::Running && state.status != OperationStatus::Queued {
return state;
}
sleep(TokioDuration::from_millis(50)).await;
}
panic!("restore did not complete in time");
}
#[tokio::test]
async fn backup_restore_flow_reports_status_and_metadata() {
let (state, _temp) = build_state().await;
let router = http::router(state.clone());
let (status, _) = send_json(
router.clone(),
Method::POST,
"/sql",
json!({
"sql": "CREATE TABLE items (id INT PRIMARY KEY, name TEXT);"
}),
)
.await;
assert_eq!(status, StatusCode::OK);
let (status, _) = send_json(
router.clone(),
Method::POST,
"/sql",
json!({
"sql": "INSERT INTO items (id, name) VALUES (1, 'alpha');"
}),
)
.await;
assert_eq!(status, StatusCode::OK);
let (status, body) = send_empty(router.clone(), Method::POST, "/api/admin/backup").await;
assert_eq!(status, StatusCode::OK);
let value: Value = serde_json::from_slice(&body).expect("backup json");
let handle = value
.get("handle")
.and_then(|v| v.as_str())
.expect("handle");
let location = value
.get("location")
.and_then(|v| v.as_str())
.expect("location");
let state_value = wait_for_backup(router.clone(), handle).await;
assert_eq!(
state_value.get("status").and_then(|v| v.as_str()),
Some("completed")
);
assert!(Path::new(location).exists());
let (status, body) = send_json(
router.clone(),
Method::POST,
"/api/admin/restore",
json!({ "source": location }),
)
.await;
assert_eq!(status, StatusCode::OK);
let value: Value = serde_json::from_slice(&body).expect("restore json");
let restore_handle = value
.get("handle")
.and_then(|v| v.as_str())
.expect("handle");
let restore_value = wait_for_restore(router.clone(), restore_handle).await;
let restore_state = restore_value.get("state").expect("restore state");
assert_eq!(
restore_state.get("status").and_then(|v| v.as_str()),
Some("completed")
);
let metadata = restore_value.get("metadata").expect("metadata");
assert!(metadata.get("backup_id").and_then(|v| v.as_str()).is_some());
assert!(metadata.get("location").and_then(|v| v.as_str()).is_some());
assert!(metadata
.get("restored_at_ms")
.and_then(|v| v.as_u64())
.is_some());
assert!(metadata
.get("size_bytes")
.and_then(|v| v.as_u64())
.is_some());
}
#[tokio::test]
async fn status_payload_includes_operational_fields() {
let (state, _temp) = build_state().await;
let router = http::router(state.clone());
let (status, body) = send_empty(router, Method::GET, "/api/admin/status").await;
assert_eq!(status, StatusCode::OK);
let value: Value = serde_json::from_slice(&body).expect("status json");
assert!(value.get("overall_status").is_some());
assert!(value.get("read_only").is_some());
assert!(value.get("maintenance").is_some());
assert!(value.get("recovery_state").is_some());
assert!(value.get("backup_state").is_some());
assert!(value.get("restore_state").is_some());
}
#[tokio::test]
async fn backup_fails_on_corrupt_wal_sets_reason() {
let temp = tempdir().expect("tempdir");
let (store, _recovery) =
LsmKV::open_with_config(temp.path(), LsmKVConfig::default()).expect("open lsm");
store.checkpoint().expect("checkpoint");
drop(store);
let wal_path = temp.path().join("lsm.wal");
fs::write(&wal_path, b"corrupt").expect("corrupt wal");
let lifecycle = Arc::new(LifecycleStateManager::new(Mode::Normal));
let checkpoint = Arc::new(|| Ok(()));
let coordinator =
BackupCoordinator::new(temp.path().to_path_buf(), lifecycle.clone(), checkpoint);
let handle = coordinator.start_backup().await.expect("start backup");
let state = wait_for_backup_state(&coordinator, &handle).await;
assert_eq!(state.status, OperationStatus::Failed);
assert!(state.reason.is_some());
}
#[tokio::test]
async fn restore_invalid_source_sets_failed_and_read_only() {
let temp = tempdir().expect("tempdir");
let lifecycle = Arc::new(LifecycleStateManager::new(Mode::Normal));
let coordinator = RestoreCoordinator::new(temp.path().to_path_buf(), lifecycle.clone());
let source = RestoreSource {
path: temp.path().join("missing-backup"),
};
let handle = coordinator
.start_restore(source)
.await
.expect("start restore");
let state = wait_for_restore_state(&coordinator, &handle).await;
assert_eq!(state.status, OperationStatus::Failed);
assert!(state.reason.is_some());
assert_eq!(lifecycle.current_mode(), Mode::ReadOnly);
}
#[tokio::test]
async fn maintenance_blocks_writes_after_restore_start() {
let (state, _temp) = build_state().await;
let router = http::router(state.clone());
let (status, _) = send_json(
router.clone(),
Method::POST,
"/sql",
json!({
"sql": "CREATE TABLE items (id INT PRIMARY KEY, embedding VECTOR(2, L2));"
}),
)
.await;
assert_eq!(status, StatusCode::OK);
let source_dir = tempdir().expect("tempdir");
let source_file = source_dir.path().join("bulk.bin");
fs::write(&source_file, vec![0u8; 5_000_000]).expect("write source");
let (status, _) = send_json(
router.clone(),
Method::POST,
"/api/admin/restore",
json!({ "source": source_dir.path().display().to_string() }),
)
.await;
assert_eq!(status, StatusCode::OK);
let mut in_maintenance = false;
for _ in 0..10 {
if state.lifecycle_state.current_mode() == Mode::Maintenance {
in_maintenance = true;
break;
}
sleep(TokioDuration::from_millis(10)).await;
}
assert!(in_maintenance, "restore did not enter maintenance mode");
let (status, _) = send_json(
router.clone(),
Method::POST,
"/sql",
json!({
"sql": "INSERT INTO items (id, embedding) VALUES (1, [0.0, 0.0]);"
}),
)
.await;
assert_eq!(status, StatusCode::CONFLICT);
let (status, _) = send_json(
router.clone(),
Method::POST,
"/kv/put",
json!({
"key": "blocked",
"value": [1, 2, 3]
}),
)
.await;
assert_eq!(status, StatusCode::CONFLICT);
let (status, _) = send_json(
router,
Method::POST,
"/vector/upsert",
json!({
"table": "items",
"id": 1,
"vector": [0.1, 0.2]
}),
)
.await;
assert_eq!(status, StatusCode::CONFLICT);
}