use std::collections::HashMap;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::config::ServerConfig;
use crate::server::{ServerError, ServerResult};
use crate::snapshot::SnapshotManager;
/// Cluster status payload returned by `AdminApi::get_cluster_status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStatusResponse {
/// Id of the reporting node; taken from the cluster config, or 1 when no
/// cluster section is configured.
pub node_id: u64,
/// Whether the reporting node is the leader.
/// NOTE(review): `get_cluster_status` currently always sets this to `true`.
pub is_leader: bool,
/// Number of shards.
/// NOTE(review): `get_cluster_status` currently always sets this to 0.
pub num_shards: usize,
/// Number of nodes; `get_cluster_status` uses the configured peer count,
/// treating an empty or missing peer list as 1.
pub num_nodes: usize,
}
/// Metadata for a single snapshot, as returned by `AdminApi::create_snapshot`
/// and listed by `AdminApi::list_snapshots`.
///
/// All three fields are copied verbatim from the metadata records produced by
/// the snapshot manager.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotInfo {
/// Snapshot identifier; the same value is accepted by
/// `AdminApi::restore_snapshot`.
pub id: u64,
/// Snapshot timestamp.
/// NOTE(review): presumably milliseconds since the Unix epoch at creation
/// time — confirm against `SnapshotManager`.
pub timestamp_ms: u64,
/// Snapshot size.
/// NOTE(review): presumably the stored size in bytes — confirm against
/// `SnapshotManager`.
pub size_bytes: u64,
}
/// Response payload of `AdminApi::list_snapshots`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotListResponse {
/// One entry per metadata record reported by the snapshot manager, in the
/// order the manager returned them.
pub snapshots: Vec<SnapshotInfo>,
}
/// Health report returned by `AdminApi::get_health`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthStatus {
/// Overall verdict: `get_health` sets `"healthy"` when the snapshot listing
/// succeeds and `"degraded"` when it fails.
pub status: String,
/// Free-form diagnostics. `get_health` fills `bind_address`,
/// `cluster_enabled`, and either `snapshot_count` or `snapshot_error`.
pub details: HashMap<String, String>,
}
/// Per-shard summary record.
///
/// NOTE(review): nothing in this file constructs this type; the field notes
/// below are inferred from the names only — confirm against the producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardSummary {
/// Identifier of the shard.
pub shard_id: u64,
/// Presumably the node the shard is associated with — TODO confirm.
pub node_id: u64,
/// Shard state as free-form text; the set of valid values is not defined
/// in this file.
pub state: String,
/// Presumably the number of keys held by the shard — TODO confirm.
pub key_count: u64,
/// Presumably the shard's size in bytes — TODO confirm.
pub size_bytes: u64,
}
/// Response payload carrying a list of shard summaries.
///
/// NOTE(review): nothing in this file constructs this type — confirm the
/// field semantics against the producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardListResponse {
/// The shard summaries.
pub shards: Vec<ShardSummary>,
/// Presumably the total shard count; whether it always equals
/// `shards.len()` cannot be told from this file — TODO confirm.
pub total: usize,
}
/// Response payload for a shard operation.
///
/// NOTE(review): nothing in this file constructs this type — confirm the
/// field semantics against the producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardOpResponse {
/// Presumably the log index at which the operation was recorded — TODO
/// confirm.
pub log_index: u64,
/// Operation outcome as free-form text; the set of valid values is not
/// defined in this file.
pub status: String,
}
/// Administrative operations (cluster status, snapshots, health) backed by
/// the server configuration and a snapshot manager.
pub struct AdminApi {
// Shared server configuration; read for cluster settings and the bind address.
config: Arc<ServerConfig>,
// Shared snapshot manager used to write, list and read snapshots.
snapshot_manager: Arc<SnapshotManager>,
}
impl AdminApi {
    /// Creates an admin API over the shared server configuration and snapshot
    /// manager.
    pub fn new(config: Arc<ServerConfig>, snapshot_manager: Arc<SnapshotManager>) -> Self {
        Self {
            config,
            snapshot_manager,
        }
    }

    /// Reports this node's view of the cluster.
    ///
    /// `node_id` and `num_nodes` come from the optional cluster section of the
    /// configuration. Without one, a single node with id 1 is reported; a
    /// configured but empty peer list also counts as one node.
    ///
    /// NOTE(review): `is_leader` and `num_shards` are fixed placeholder values
    /// (`true` and `0`); nothing here queries live cluster state.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`.
    pub async fn get_cluster_status(&self) -> ServerResult<ClusterStatusResponse> {
        // Read the optional cluster section once instead of traversing it per field.
        let (node_id, num_nodes) = match self.config.cluster.as_ref() {
            Some(cluster) => (cluster.node_id, cluster.peers.len().max(1)),
            None => (1, 1),
        };
        Ok(ClusterStatusResponse {
            node_id,
            is_leader: true,
            num_shards: 0,
            num_nodes,
        })
    }

    /// Writes a new snapshot and returns its metadata.
    ///
    /// The payload is the JSON-serialised result of
    /// [`get_cluster_status`](Self::get_cluster_status). The snapshot id is
    /// the number of nanoseconds elapsed since the Unix epoch.
    ///
    /// # Errors
    ///
    /// Returns `ServerError::Storage` when the status cannot be serialised or
    /// when no id can be derived from the system clock (clock set before the
    /// Unix epoch, or the nanosecond count does not fit in `u64`). Errors from
    /// the snapshot manager's `write_snapshot` are propagated.
    pub async fn create_snapshot(&self) -> ServerResult<SnapshotInfo> {
        let status = self.get_cluster_status().await?;
        let payload = serde_json::to_vec(&status)
            .map_err(|e| ServerError::Storage(format!("Failed to serialise snapshot: {}", e)))?;

        // Fail loudly rather than fall back to a fixed id: a constant fallback
        // would hand the same id to every snapshot taken while the clock is
        // broken. Checked arithmetic replaces the lossy `u128 as u64` cast.
        let id = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .ok()
            .and_then(|elapsed| {
                elapsed
                    .as_secs()
                    .checked_mul(1_000_000_000)?
                    .checked_add(u64::from(elapsed.subsec_nanos()))
            })
            .ok_or_else(|| {
                ServerError::Storage(
                    "Failed to derive snapshot id from system clock".to_string(),
                )
            })?;

        let meta = self.snapshot_manager.write_snapshot(id, &payload)?;
        Ok(SnapshotInfo {
            id: meta.id,
            timestamp_ms: meta.timestamp_ms,
            size_bytes: meta.size_bytes,
        })
    }

    /// Lists the snapshots known to the snapshot manager.
    ///
    /// # Errors
    ///
    /// Propagates any error from the snapshot manager's `list_snapshots`.
    pub async fn list_snapshots(&self) -> ServerResult<SnapshotListResponse> {
        let metas = self.snapshot_manager.list_snapshots()?;
        let snapshots = metas
            .into_iter()
            .map(|m| SnapshotInfo {
                id: m.id,
                timestamp_ms: m.timestamp_ms,
                size_bytes: m.size_bytes,
            })
            .collect();
        Ok(SnapshotListResponse { snapshots })
    }

    /// Checks that the snapshot with `snapshot_id` can be read.
    ///
    /// NOTE(review): the data that is read is discarded; no state is applied
    /// here, so this only validates that the snapshot is readable.
    ///
    /// # Errors
    ///
    /// Propagates any error from the snapshot manager's `read_snapshot`.
    pub async fn restore_snapshot(&self, snapshot_id: u64) -> ServerResult<()> {
        let _data = self.snapshot_manager.read_snapshot(snapshot_id)?;
        Ok(())
    }

    /// Builds a health report.
    ///
    /// The status is `"healthy"` when the snapshot listing succeeds and
    /// `"degraded"` otherwise. `details` always carries `bind_address` and
    /// `cluster_enabled`, plus `snapshot_count` on success or `snapshot_error`
    /// on failure.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`; a snapshot listing
    /// failure is reported through the status instead.
    pub async fn get_health(&self) -> ServerResult<HealthStatus> {
        let mut details = HashMap::new();

        let snapshots_ok = match self.snapshot_manager.list_snapshots() {
            Ok(snaps) => {
                details.insert("snapshot_count".to_string(), snaps.len().to_string());
                true
            }
            Err(e) => {
                details.insert("snapshot_error".to_string(), e.to_string());
                false
            }
        };

        details.insert(
            "bind_address".to_string(),
            self.config.server.bind_address.clone(),
        );
        details.insert(
            "cluster_enabled".to_string(),
            self.config
                .cluster
                .as_ref()
                .map(|c| c.enabled.to_string())
                .unwrap_or_else(|| "false".to_string()),
        );

        let status = if snapshots_ok { "healthy" } else { "degraded" };
        Ok(HealthStatus {
            status: status.to_string(),
            details,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Scratch directory that is removed when the guard is dropped, so the
    /// directory is cleaned up even when a test assertion panics.
    struct ScratchDir(PathBuf);

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not mask the test's own result.
            std::fs::remove_dir_all(&self.0).ok();
        }
    }

    /// Builds an `AdminApi` with default configuration whose snapshot manager
    /// works in a fresh temp directory. Keep the returned guard alive for the
    /// duration of the test.
    fn make_api(suffix: &str) -> (AdminApi, ScratchDir) {
        let ts = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.subsec_nanos())
            .unwrap_or(0);
        let dir = std::env::temp_dir().join(format!("amaters_admin_test_{}_{}", suffix, ts));
        std::fs::create_dir_all(&dir).expect("temp dir");
        // Arm the guard before anything else can panic.
        let scratch = ScratchDir(dir);
        let config = Arc::new(ServerConfig::default());
        let sm = Arc::new(SnapshotManager::new(&scratch.0).expect("snapshot manager"));
        (AdminApi::new(config, sm), scratch)
    }

    #[tokio::test]
    async fn test_admin_api_cluster_status() {
        let (api, _scratch) = make_api("cluster_status");
        let resp = api.get_cluster_status().await.expect("cluster status");
        assert!(resp.node_id >= 1);
        assert!(resp.num_nodes >= 1);
    }

    #[tokio::test]
    async fn test_admin_api_create_snapshot() {
        let (api, _scratch) = make_api("create_snap");
        let info = api.create_snapshot().await.expect("create snapshot");
        assert!(info.id > 0);
        assert!(info.size_bytes > 0);
    }

    #[tokio::test]
    async fn test_admin_api_list_snapshots() {
        let (api, _scratch) = make_api("list_snaps");
        let info = api.create_snapshot().await.expect("create");
        let list = api.list_snapshots().await.expect("list");
        assert!(!list.snapshots.is_empty());
        assert!(list.snapshots.iter().any(|s| s.id == info.id));
    }

    #[tokio::test]
    async fn test_admin_api_health_check() {
        let (api, _scratch) = make_api("health");
        let health = api.get_health().await.expect("health");
        assert_eq!(health.status, "healthy");
        assert!(health.details.contains_key("bind_address"));
    }

    #[tokio::test]
    async fn test_admin_api_restore_snapshot() {
        let (api, _scratch) = make_api("restore");
        let info = api.create_snapshot().await.expect("create");
        api.restore_snapshot(info.id)
            .await
            .expect("restore should succeed");
    }

    #[tokio::test]
    async fn test_admin_api_restore_missing_snapshot() {
        let (api, _scratch) = make_api("restore_missing");
        let result = api.restore_snapshot(u64::MAX).await;
        assert!(
            result.is_err(),
            "Restoring a non-existent snapshot must fail"
        );
    }
}