Skip to main content

amaters_server/
admin.rs

//! Admin API for cluster and shard management.
//!
//! Provides an in-process API surface (not a network listener) that the server
//! binary and integration tests can call to inspect cluster state, trigger
//! snapshots, and query health.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use serde::{Deserialize, Serialize};
11
12use crate::config::ServerConfig;
13use crate::server::{ServerError, ServerResult};
14use crate::snapshot::SnapshotManager;
15
// ─── Response types ───────────────────────────────────────────────────────────
17
18/// Overall cluster status as returned by [`AdminApi::get_cluster_status`].
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct ClusterStatusResponse {
21    /// This node's numeric identifier.
22    pub node_id: u64,
23    /// Whether this node currently believes it is the Raft leader.
24    pub is_leader: bool,
25    /// Number of shards tracked in the placement registry.
26    pub num_shards: usize,
27    /// Number of peer nodes this node knows about (including itself).
28    pub num_nodes: usize,
29}
30
31/// A single snapshot entry as returned in [`SnapshotListResponse`].
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct SnapshotInfo {
34    /// Snapshot identifier.
35    pub id: u64,
36    /// Unix timestamp in milliseconds when the snapshot was created.
37    pub timestamp_ms: u64,
38    /// Size of the compressed snapshot bytes on disk.
39    pub size_bytes: u64,
40}
41
42/// Response returned by [`AdminApi::list_snapshots`].
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct SnapshotListResponse {
45    /// All available snapshots, sorted newest-first.
46    pub snapshots: Vec<SnapshotInfo>,
47}
48
49/// Health check response returned by [`AdminApi::get_health`].
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct HealthStatus {
52    /// Coarse status string: `"healthy"`, `"degraded"`, or `"unhealthy"`.
53    pub status: String,
54    /// Finer-grained per-component details.
55    pub details: HashMap<String, String>,
56}
57
58/// A summary of a single shard's state for admin responses.
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct ShardSummary {
61    /// Shard identifier.
62    pub shard_id: u64,
63    /// Node currently hosting this shard.
64    pub node_id: u64,
65    /// Human-readable state string (e.g. "Active", "Splitting").
66    pub state: String,
67    /// Estimated number of keys in this shard.
68    pub key_count: u64,
69    /// Estimated byte size of this shard.
70    pub size_bytes: u64,
71}
72
73/// Response for shard list admin API.
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct ShardListResponse {
76    /// All known shards.
77    pub shards: Vec<ShardSummary>,
78    /// Total number of shards (same as `shards.len()`).
79    pub total: usize,
80}
81
82/// Response for shard operation (split/merge/transfer) admin API.
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct ShardOpResponse {
85    /// Raft log index at which the operation was committed.
86    pub log_index: u64,
87    /// Human-readable status string.
88    pub status: String,
89}
90
// ─── AdminApi ─────────────────────────────────────────────────────────────────
92
93/// In-process admin API that exposes cluster management operations.
94pub struct AdminApi {
95    config: Arc<ServerConfig>,
96    snapshot_manager: Arc<SnapshotManager>,
97}
98
99impl AdminApi {
100    /// Create a new [`AdminApi`] instance.
101    pub fn new(config: Arc<ServerConfig>, snapshot_manager: Arc<SnapshotManager>) -> Self {
102        Self {
103            config,
104            snapshot_manager,
105        }
106    }
107
108    /// Return the current cluster status.
109    ///
110    /// When the `cluster` feature is disabled this always reports
111    /// `is_leader = true` (standalone mode) and zero shards / one node.
112    pub async fn get_cluster_status(&self) -> ServerResult<ClusterStatusResponse> {
113        let node_id = self.config.cluster.as_ref().map(|c| c.node_id).unwrap_or(1);
114
115        let num_nodes = self
116            .config
117            .cluster
118            .as_ref()
119            .map(|c| {
120                // +1 for self if self is not already counted
121                let peer_count = c.peers.len();
122                if peer_count == 0 { 1 } else { peer_count }
123            })
124            .unwrap_or(1);
125
126        Ok(ClusterStatusResponse {
127            node_id,
128            is_leader: true,
129            num_shards: 0,
130            num_nodes,
131        })
132    }
133
134    /// Create a new snapshot of the current server state.
135    ///
136    /// The snapshot payload is a minimal status blob.  In a full cluster
137    /// implementation this would be the serialised state-machine snapshot.
138    pub async fn create_snapshot(&self) -> ServerResult<SnapshotInfo> {
139        // Build a trivial payload: JSON-encoded cluster status.
140        let status = self.get_cluster_status().await?;
141        let payload = serde_json::to_vec(&status)
142            .map_err(|e| ServerError::Storage(format!("Failed to serialise snapshot: {}", e)))?;
143
144        // Use nanosecond wall time as snapshot id for uniqueness.
145        let id = std::time::SystemTime::now()
146            .duration_since(std::time::UNIX_EPOCH)
147            .map(|d| d.as_nanos() as u64)
148            .unwrap_or(0);
149
150        let meta = self.snapshot_manager.write_snapshot(id, &payload)?;
151
152        Ok(SnapshotInfo {
153            id: meta.id,
154            timestamp_ms: meta.timestamp_ms,
155            size_bytes: meta.size_bytes,
156        })
157    }
158
159    /// Return the list of snapshots available on disk.
160    pub async fn list_snapshots(&self) -> ServerResult<SnapshotListResponse> {
161        let metas = self.snapshot_manager.list_snapshots()?;
162        let snapshots = metas
163            .into_iter()
164            .map(|m| SnapshotInfo {
165                id: m.id,
166                timestamp_ms: m.timestamp_ms,
167                size_bytes: m.size_bytes,
168            })
169            .collect();
170        Ok(SnapshotListResponse { snapshots })
171    }
172
173    /// Restore state from the snapshot with the given `snapshot_id`.
174    ///
175    /// Currently validates that the snapshot can be read and decompressed
176    /// without error.  Full state-machine restore is a Phase 8 task once the
177    /// Raft leader drives apply.
178    pub async fn restore_snapshot(&self, snapshot_id: u64) -> ServerResult<()> {
179        let _data = self.snapshot_manager.read_snapshot(snapshot_id)?;
180        Ok(())
181    }
182
183    /// Return overall health status for this node.
184    pub async fn get_health(&self) -> ServerResult<HealthStatus> {
185        let mut details = HashMap::new();
186
187        // Storage health: check the snapshot directory is writable
188        let snap_health = match self.snapshot_manager.list_snapshots() {
189            Ok(snaps) => {
190                details.insert("snapshot_count".to_string(), snaps.len().to_string());
191                "ok".to_string()
192            }
193            Err(e) => {
194                details.insert("snapshot_error".to_string(), e.to_string());
195                "error".to_string()
196            }
197        };
198
199        // Config health
200        details.insert(
201            "bind_address".to_string(),
202            self.config.server.bind_address.clone(),
203        );
204        details.insert(
205            "cluster_enabled".to_string(),
206            self.config
207                .cluster
208                .as_ref()
209                .map(|c| c.enabled.to_string())
210                .unwrap_or_else(|| "false".to_string()),
211        );
212
213        let status = if snap_health == "ok" {
214            "healthy".to_string()
215        } else {
216            "degraded".to_string()
217        };
218
219        Ok(HealthStatus { status, details })
220    }
221}
222
// ─── Unit tests ──────────────────────────────────────────────────────────────
224
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Temporary directory removed on drop, so cleanup also happens when an
    /// assertion panics part-way through a test.
    struct TempDir(PathBuf);

    impl Drop for TempDir {
        fn drop(&mut self) {
            // Best-effort cleanup: a leftover temp dir must not fail a test.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Build an [`AdminApi`] backed by a fresh temporary snapshot directory.
    ///
    /// The directory name combines `suffix`, the process id and the full
    /// nanosecond timestamp, so tests within one run and overlapping runs in
    /// other processes do not share a directory. Bind the returned guard to a
    /// named variable (e.g. `_dir`, not `_`) so it lives until the test ends.
    fn make_api(suffix: &str) -> (AdminApi, TempDir) {
        let ts = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        let dir = std::env::temp_dir().join(format!(
            "amaters_admin_test_{}_{}_{}",
            suffix,
            std::process::id(),
            ts
        ));
        std::fs::create_dir_all(&dir).expect("temp dir");
        // Take ownership before anything else can panic, so the dir is cleaned up.
        let guard = TempDir(dir);
        let config = Arc::new(ServerConfig::default());
        let sm = Arc::new(SnapshotManager::new(&guard.0).expect("snapshot manager"));
        (AdminApi::new(config, sm), guard)
    }

    #[tokio::test]
    async fn test_admin_api_cluster_status() {
        let (api, _dir) = make_api("cluster_status");
        let resp = api.get_cluster_status().await.expect("cluster status");
        assert!(resp.node_id >= 1);
        assert!(resp.num_nodes >= 1);
    }

    #[tokio::test]
    async fn test_admin_api_create_snapshot() {
        let (api, _dir) = make_api("create_snap");
        let info = api.create_snapshot().await.expect("create snapshot");
        assert!(info.id > 0);
        assert!(info.size_bytes > 0);
    }

    #[tokio::test]
    async fn test_admin_api_list_snapshots() {
        let (api, _dir) = make_api("list_snaps");
        let info = api.create_snapshot().await.expect("create");
        let list = api.list_snapshots().await.expect("list");
        assert!(!list.snapshots.is_empty());
        assert!(list.snapshots.iter().any(|s| s.id == info.id));
    }

    #[tokio::test]
    async fn test_admin_api_health_check() {
        let (api, _dir) = make_api("health");
        let health = api.get_health().await.expect("health");
        assert_eq!(health.status, "healthy");
        assert!(health.details.contains_key("bind_address"));
        assert!(health.details.contains_key("cluster_enabled"));
        assert!(health.details.contains_key("snapshot_count"));
    }

    #[tokio::test]
    async fn test_admin_api_restore_snapshot() {
        let (api, _dir) = make_api("restore");
        let info = api.create_snapshot().await.expect("create");
        api.restore_snapshot(info.id)
            .await
            .expect("restore should succeed");
    }

    #[tokio::test]
    async fn test_admin_api_restore_missing_snapshot() {
        let (api, _dir) = make_api("restore_missing");
        let result = api.restore_snapshot(u64::MAX).await;
        assert!(
            result.is_err(),
            "Restoring a non-existent snapshot must fail"
        );
    }
}