1use std::collections::HashMap;
8use std::sync::Arc;
9
10use serde::{Deserialize, Serialize};
11
12use crate::config::ServerConfig;
13use crate::server::{ServerError, ServerResult};
14use crate::snapshot::SnapshotManager;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct ClusterStatusResponse {
21 pub node_id: u64,
23 pub is_leader: bool,
25 pub num_shards: usize,
27 pub num_nodes: usize,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct SnapshotInfo {
34 pub id: u64,
36 pub timestamp_ms: u64,
38 pub size_bytes: u64,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct SnapshotListResponse {
45 pub snapshots: Vec<SnapshotInfo>,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct HealthStatus {
52 pub status: String,
54 pub details: HashMap<String, String>,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct ShardSummary {
61 pub shard_id: u64,
63 pub node_id: u64,
65 pub state: String,
67 pub key_count: u64,
69 pub size_bytes: u64,
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct ShardListResponse {
76 pub shards: Vec<ShardSummary>,
78 pub total: usize,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct ShardOpResponse {
85 pub log_index: u64,
87 pub status: String,
89}
90
91pub struct AdminApi {
95 config: Arc<ServerConfig>,
96 snapshot_manager: Arc<SnapshotManager>,
97}
98
99impl AdminApi {
100 pub fn new(config: Arc<ServerConfig>, snapshot_manager: Arc<SnapshotManager>) -> Self {
102 Self {
103 config,
104 snapshot_manager,
105 }
106 }
107
108 pub async fn get_cluster_status(&self) -> ServerResult<ClusterStatusResponse> {
113 let node_id = self.config.cluster.as_ref().map(|c| c.node_id).unwrap_or(1);
114
115 let num_nodes = self
116 .config
117 .cluster
118 .as_ref()
119 .map(|c| {
120 let peer_count = c.peers.len();
122 if peer_count == 0 { 1 } else { peer_count }
123 })
124 .unwrap_or(1);
125
126 Ok(ClusterStatusResponse {
127 node_id,
128 is_leader: true,
129 num_shards: 0,
130 num_nodes,
131 })
132 }
133
134 pub async fn create_snapshot(&self) -> ServerResult<SnapshotInfo> {
139 let status = self.get_cluster_status().await?;
141 let payload = serde_json::to_vec(&status)
142 .map_err(|e| ServerError::Storage(format!("Failed to serialise snapshot: {}", e)))?;
143
144 let id = std::time::SystemTime::now()
146 .duration_since(std::time::UNIX_EPOCH)
147 .map(|d| d.as_nanos() as u64)
148 .unwrap_or(0);
149
150 let meta = self.snapshot_manager.write_snapshot(id, &payload)?;
151
152 Ok(SnapshotInfo {
153 id: meta.id,
154 timestamp_ms: meta.timestamp_ms,
155 size_bytes: meta.size_bytes,
156 })
157 }
158
159 pub async fn list_snapshots(&self) -> ServerResult<SnapshotListResponse> {
161 let metas = self.snapshot_manager.list_snapshots()?;
162 let snapshots = metas
163 .into_iter()
164 .map(|m| SnapshotInfo {
165 id: m.id,
166 timestamp_ms: m.timestamp_ms,
167 size_bytes: m.size_bytes,
168 })
169 .collect();
170 Ok(SnapshotListResponse { snapshots })
171 }
172
173 pub async fn restore_snapshot(&self, snapshot_id: u64) -> ServerResult<()> {
179 let _data = self.snapshot_manager.read_snapshot(snapshot_id)?;
180 Ok(())
181 }
182
183 pub async fn get_health(&self) -> ServerResult<HealthStatus> {
185 let mut details = HashMap::new();
186
187 let snap_health = match self.snapshot_manager.list_snapshots() {
189 Ok(snaps) => {
190 details.insert("snapshot_count".to_string(), snaps.len().to_string());
191 "ok".to_string()
192 }
193 Err(e) => {
194 details.insert("snapshot_error".to_string(), e.to_string());
195 "error".to_string()
196 }
197 };
198
199 details.insert(
201 "bind_address".to_string(),
202 self.config.server.bind_address.clone(),
203 );
204 details.insert(
205 "cluster_enabled".to_string(),
206 self.config
207 .cluster
208 .as_ref()
209 .map(|c| c.enabled.to_string())
210 .unwrap_or_else(|| "false".to_string()),
211 );
212
213 let status = if snap_health == "ok" {
214 "healthy".to_string()
215 } else {
216 "degraded".to_string()
217 };
218
219 Ok(HealthStatus { status, details })
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    use std::path::PathBuf;

    /// Scratch directory for a single test, removed when the guard is dropped.
    ///
    /// Cleanup lives in `Drop` so the directory is deleted even when an
    /// assertion or `expect` panics part-way through a test.
    struct TempDirGuard(PathBuf);

    impl Drop for TempDirGuard {
        fn drop(&mut self) {
            // Best-effort: a failed cleanup must not mask the test's own outcome.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Builds an `AdminApi` with a default config and a snapshot manager rooted
    /// in a fresh temp directory. Keep the returned guard alive for the whole
    /// test; dropping it deletes the directory.
    fn make_api(suffix: &str) -> (AdminApi, TempDirGuard) {
        let ts = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        // Process id plus full nanosecond timestamp keeps concurrent or
        // repeated runs from sharing a directory.
        let dir = TempDirGuard(std::env::temp_dir().join(format!(
            "amaters_admin_test_{}_{}_{}",
            suffix,
            std::process::id(),
            ts
        )));
        std::fs::create_dir_all(&dir.0).expect("temp dir");
        let config = Arc::new(ServerConfig::default());
        let sm = Arc::new(SnapshotManager::new(&dir.0).expect("snapshot manager"));
        (AdminApi::new(config, sm), dir)
    }

    #[tokio::test]
    async fn test_admin_api_cluster_status() {
        let (api, _dir) = make_api("cluster_status");
        let resp = api.get_cluster_status().await.expect("cluster status");
        assert!(resp.node_id >= 1);
        assert!(resp.num_nodes >= 1);
    }

    #[tokio::test]
    async fn test_admin_api_create_snapshot() {
        let (api, _dir) = make_api("create_snap");
        let info = api.create_snapshot().await.expect("create snapshot");
        assert!(info.id > 0);
        assert!(info.size_bytes > 0);
    }

    #[tokio::test]
    async fn test_admin_api_list_snapshots() {
        let (api, _dir) = make_api("list_snaps");
        let info = api.create_snapshot().await.expect("create");
        let list = api.list_snapshots().await.expect("list");
        assert!(!list.snapshots.is_empty());
        assert!(list.snapshots.iter().any(|s| s.id == info.id));
    }

    #[tokio::test]
    async fn test_admin_api_health_check() {
        let (api, _dir) = make_api("health");
        let health = api.get_health().await.expect("health");
        assert_eq!(health.status, "healthy");
        assert!(health.details.contains_key("bind_address"));
    }

    #[tokio::test]
    async fn test_admin_api_restore_snapshot() {
        let (api, _dir) = make_api("restore");
        let info = api.create_snapshot().await.expect("create");
        api.restore_snapshot(info.id)
            .await
            .expect("restore should succeed");
    }

    #[tokio::test]
    async fn test_admin_api_restore_missing_snapshot() {
        let (api, _dir) = make_api("restore_missing");
        let result = api.restore_snapshot(u64::MAX).await;
        assert!(
            result.is_err(),
            "Restoring a non-existent snapshot must fail"
        );
    }
}