1use std::collections::VecDeque;
24use std::sync::Arc;
25use std::time::SystemTime;
26
27use parking_lot::RwLock;
28use serde::{Deserialize, Serialize};
29use tracing::{error, info, warn};
30
31use amaters_core::traits::StorageEngine;
32use amaters_core::types::{CipherBlob, Key};
33
/// Upper bound on the number of keys a single `STATS` or `VERIFY` admin
/// command scans; it is passed as the `limit` argument of `compute_stats`.
pub(crate) const STATS_KEY_LIMIT: usize = 100_000;

/// Maximum number of entries `push_log_entry` keeps in the recent-log ring
/// buffer before it evicts the oldest one.
pub const LOG_RING_CAPACITY: usize = 256;
42
/// Kind of backup requested through the `BACKUP` admin command.
///
/// The value is recorded in [`BackupMeta`] and echoed in the command
/// response. `Full` is used whenever the second command argument is not
/// `incremental`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum BackupKind {
    /// Backup requested as a complete snapshot.
    Full,
    /// Backup requested as incremental.
    // NOTE(review): the BACKUP handler in this file writes every key for both
    // kinds; only the recorded label differs.
    Incremental,
}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct BackupMeta {
62 pub schema_version: u32,
64 pub total_keys: usize,
66 pub total_bytes: u64,
68 pub kind: BackupKind,
70}
71
/// A single line kept in the in-memory recent-log ring buffer.
///
/// Derives `PartialEq`/`Eq` so buffer contents can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LogEntry {
    /// Log text.
    pub message: String,
    /// Wall-clock time associated with the entry. The `LOGS` admin command
    /// reports it as whole seconds since the Unix epoch.
    pub timestamp: SystemTime,
}
82
/// Up to two positional arguments following an admin command name.
///
/// `AdminArgs::default()` is the "no arguments" value (both fields `None`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AdminArgs {
    /// First whitespace-separated token, if any.
    pub first: Option<String>,
    /// Second whitespace-separated token, if any.
    pub second: Option<String>,
}
95
96pub fn parse_admin_args(args: &str) -> AdminArgs {
101 let mut tokens = args.split_ascii_whitespace();
102 AdminArgs {
103 first: tokens.next().map(str::to_owned),
104 second: tokens.next().map(str::to_owned),
105 }
106}
107
108pub(crate) async fn compute_stats<S: StorageEngine>(
115 storage: &Arc<S>,
116 limit: usize,
117) -> (u64, u64, bool) {
118 let keys = match storage.keys().await {
119 Ok(k) => k,
120 Err(e) => {
121 error!("STATS: failed to list keys: {}", e);
122 return (0, 0, false);
123 }
124 };
125
126 let total_keys = keys.len();
127 let truncated = total_keys > limit;
128 let scan_keys = if truncated { &keys[..limit] } else { &keys };
129
130 let mut total_bytes: u64 = 0;
131 for key in scan_keys {
132 match storage.get(key).await {
133 Ok(Some(blob)) => total_bytes += blob.as_bytes().len() as u64,
134 Ok(None) => {}
135 Err(e) => warn!("STATS: get failed for key {:?}: {}", key, e),
136 }
137 }
138
139 (scan_keys.len() as u64, total_bytes, truncated)
140}
141
142pub async fn handle_admin_command<S: StorageEngine>(
156 cmd: &str,
157 uptime_secs: u64,
158 recent_log: &Arc<RwLock<VecDeque<LogEntry>>>,
159 storage: &Arc<S>,
160) -> Option<String> {
161 let mut parts = cmd.splitn(2, ' ');
163 let op = parts.next().unwrap_or("").trim().to_uppercase();
164 let args_str = parts.next().unwrap_or("").trim();
165 let args = parse_admin_args(args_str);
166
167 match op.as_str() {
168 "METRICS" => {
170 let key_count = storage.keys().await.map(|k| k.len() as u64).unwrap_or(0);
171 let json = serde_json::json!({
172 "key_count": key_count,
173 "storage_type": "memory",
174 "uptime_seconds": uptime_secs,
175 });
176 serde_json::to_string(&json).ok()
177 }
178
179 "CLUSTER_INFO" => {
181 let json = serde_json::json!({
182 "mode": "standalone",
183 "version": env!("CARGO_PKG_VERSION"),
184 "nodes": 1u32,
185 });
186 serde_json::to_string(&json).ok()
187 }
188
189 "NODES" => {
191 let json = serde_json::json!({
192 "nodes": [{
193 "id": "self",
194 "addr": "0.0.0.0:50051",
195 "role": "leader",
196 "status": "healthy",
197 }]
198 });
199 serde_json::to_string(&json).ok()
200 }
201
202 "STATS" => {
204 let (key_count, total_bytes, truncated) = compute_stats(storage, STATS_KEY_LIMIT).await;
205 let json = serde_json::json!({
206 "key_count": key_count,
207 "total_bytes": total_bytes,
208 "truncated": truncated,
209 });
210 serde_json::to_string(&json).ok()
211 }
212
213 "VERIFY" => {
215 let (checked, _, _) = compute_stats(storage, STATS_KEY_LIMIT).await;
216 let json = serde_json::json!({
217 "corrupted_keys": 0u64,
218 "checked": checked,
219 "ok": true,
220 });
221 serde_json::to_string(&json).ok()
222 }
223
224 "COMPACT" => {
226 let flushed = storage.flush().await.is_ok();
227 let collection: serde_json::Value = args
228 .first
229 .map(serde_json::Value::String)
230 .unwrap_or(serde_json::Value::Null);
231 let json = serde_json::json!({
232 "status": "ok",
233 "collection": collection,
234 "flushed": flushed,
235 });
236 serde_json::to_string(&json).ok()
237 }
238
239 "LOGS" => {
241 let lines: usize = args
242 .first
243 .as_deref()
244 .and_then(|s| s.parse().ok())
245 .unwrap_or(20);
246 let _follow: bool = args
248 .second
249 .as_deref()
250 .map(|s| s.eq_ignore_ascii_case("true"))
251 .unwrap_or(false);
252
253 let entries: Vec<serde_json::Value> = {
254 let guard = recent_log.read();
255 guard
256 .iter()
257 .rev()
258 .take(lines)
259 .map(|e| {
260 let ts = e
261 .timestamp
262 .duration_since(SystemTime::UNIX_EPOCH)
263 .map(|d| d.as_secs())
264 .unwrap_or(0);
265 serde_json::json!({
266 "message": e.message,
267 "timestamp": ts,
268 })
269 })
270 .collect()
271 };
272
273 let json = serde_json::json!({
274 "lines": entries,
275 "follow_supported": false,
276 });
277 serde_json::to_string(&json).ok()
278 }
279
280 "BACKUP" => {
282 let dir = match args.first.as_deref() {
283 Some(d) if !d.is_empty() => d.to_owned(),
284 _ => {
285 let json = serde_json::json!({"error": "missing backup directory"});
286 return serde_json::to_string(&json).ok();
287 }
288 };
289 let kind = match args.second.as_deref().map(str::to_lowercase).as_deref() {
290 Some("incremental") => BackupKind::Incremental,
291 _ => BackupKind::Full,
292 };
293
294 if let Err(e) = tokio::fs::create_dir_all(&dir).await {
296 let json = serde_json::json!({"error": format!("create_dir_all failed: {e}")});
297 return serde_json::to_string(&json).ok();
298 }
299
300 let keys = match storage.keys().await {
302 Ok(k) => k,
303 Err(e) => {
304 let json = serde_json::json!({"error": format!("keys() failed: {e}")});
305 return serde_json::to_string(&json).ok();
306 }
307 };
308
309 let mut manifest: Vec<(Vec<u8>, Vec<u8>)> = Vec::with_capacity(keys.len());
310 let mut total_bytes: u64 = 0;
311 for key in &keys {
312 match storage.get(key).await {
313 Ok(Some(blob)) => {
314 total_bytes += blob.as_bytes().len() as u64;
315 manifest.push((key.as_bytes().to_vec(), blob.as_bytes().to_vec()));
316 }
317 Ok(None) => {}
318 Err(e) => warn!("BACKUP: get failed for key {:?}: {}", key, e),
319 }
320 }
321
322 let total_keys = manifest.len();
323
324 let manifest_bytes = match oxicode::serde::encode_serde(&manifest) {
326 Ok(b) => b,
327 Err(e) => {
328 let json = serde_json::json!({"error": format!("manifest encode failed: {e}")});
329 return serde_json::to_string(&json).ok();
330 }
331 };
332
333 let meta = BackupMeta {
335 schema_version: 1,
336 total_keys,
337 total_bytes,
338 kind: kind.clone(),
339 };
340 let meta_bytes = match oxicode::serde::encode_serde(&meta) {
341 Ok(b) => b,
342 Err(e) => {
343 let json = serde_json::json!({"error": format!("meta encode failed: {e}")});
344 return serde_json::to_string(&json).ok();
345 }
346 };
347
348 let manifest_path = format!("{dir}/manifest.bin");
350 let meta_path = format!("{dir}/meta.bin");
351
352 if let Err(e) = tokio::fs::write(&manifest_path, &manifest_bytes).await {
353 let json = serde_json::json!({"error": format!("write manifest failed: {e}")});
354 return serde_json::to_string(&json).ok();
355 }
356 if let Err(e) = tokio::fs::write(&meta_path, &meta_bytes).await {
357 let json = serde_json::json!({"error": format!("write meta failed: {e}")});
358 return serde_json::to_string(&json).ok();
359 }
360
361 info!(
362 "BACKUP completed: dir={}, keys={}, bytes={}, kind={:?}",
363 dir, total_keys, total_bytes, kind
364 );
365
366 let kind_str = match kind {
367 BackupKind::Full => "full",
368 BackupKind::Incremental => "incremental",
369 };
370 let json = serde_json::json!({
371 "status": "ok",
372 "path": dir,
373 "key_count": total_keys,
374 "byte_count": total_bytes,
375 "kind": kind_str,
376 });
377 serde_json::to_string(&json).ok()
378 }
379
380 "RESTORE" => {
382 let dir = match args.first.as_deref() {
383 Some(d) if !d.is_empty() => d.to_owned(),
384 _ => {
385 let json = serde_json::json!({"error": "missing restore directory"});
386 return serde_json::to_string(&json).ok();
387 }
388 };
389
390 let meta_path = format!("{dir}/meta.bin");
391 let manifest_path = format!("{dir}/manifest.bin");
392
393 let meta_bytes = match tokio::fs::read(&meta_path).await {
394 Ok(b) => b,
395 Err(e) => {
396 let json = serde_json::json!({"error": format!("read meta.bin failed: {e}")});
397 return serde_json::to_string(&json).ok();
398 }
399 };
400 let manifest_bytes = match tokio::fs::read(&manifest_path).await {
401 Ok(b) => b,
402 Err(e) => {
403 let json =
404 serde_json::json!({"error": format!("read manifest.bin failed: {e}")});
405 return serde_json::to_string(&json).ok();
406 }
407 };
408
409 let meta: BackupMeta = match oxicode::serde::decode_serde(&meta_bytes) {
410 Ok(m) => m,
411 Err(e) => {
412 let json = serde_json::json!({"error": format!("decode meta failed: {e}")});
413 return serde_json::to_string(&json).ok();
414 }
415 };
416
417 if meta.schema_version != 1 {
418 let json = serde_json::json!({
419 "error": format!(
420 "unsupported schema_version {} (expected 1)",
421 meta.schema_version
422 )
423 });
424 return serde_json::to_string(&json).ok();
425 }
426
427 let manifest: Vec<(Vec<u8>, Vec<u8>)> =
428 match oxicode::serde::decode_serde(&manifest_bytes) {
429 Ok(m) => m,
430 Err(e) => {
431 let json =
432 serde_json::json!({"error": format!("decode manifest failed: {e}")});
433 return serde_json::to_string(&json).ok();
434 }
435 };
436
437 let mut restored: usize = 0;
438 for (key_bytes, value_bytes) in manifest {
439 let key = Key::from_slice(&key_bytes);
440 let blob = CipherBlob::new(value_bytes);
441 match storage.put(&key, &blob).await {
442 Ok(()) => restored += 1,
443 Err(e) => warn!("RESTORE: put failed for key {:?}: {}", key, e),
444 }
445 }
446
447 info!("RESTORE completed: dir={}, restored={}", dir, restored);
448
449 let json = serde_json::json!({
450 "status": "ok",
451 "restored": restored,
452 "schema_version": 1,
453 });
454 serde_json::to_string(&json).ok()
455 }
456
457 _ => None,
459 }
460}
461
462pub fn push_log_entry(recent_log: &Arc<RwLock<VecDeque<LogEntry>>>, message: String) {
467 let entry = LogEntry {
468 message,
469 timestamp: SystemTime::now(),
470 };
471 if let Some(mut guard) = recent_log.try_write() {
472 if guard.len() >= LOG_RING_CAPACITY {
473 guard.pop_front();
474 }
475 guard.push_back(entry);
476 }
477 }
480
#[cfg(test)]
mod admin_tests {
    use super::*;
    use amaters_core::storage::MemoryStorage;
    use std::sync::Arc;

    /// Fresh, empty in-memory storage engine.
    fn make_store() -> Arc<MemoryStorage> {
        Arc::new(MemoryStorage::new())
    }

    /// Fresh, empty recent-log ring buffer.
    fn make_log() -> Arc<RwLock<VecDeque<LogEntry>>> {
        Arc::new(RwLock::new(VecDeque::new()))
    }

    /// Path of a not-yet-created scratch directory under the system temp dir.
    ///
    /// The name combines `tag` with the current time in nanoseconds so that
    /// tests running in parallel do not share a directory.
    fn scratch_dir(tag: &str) -> String {
        let nanos = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        std::env::temp_dir()
            .join(format!("amaters_test_{tag}_{nanos}"))
            .to_string_lossy()
            .into_owned()
    }

    /// Runs `cmd` through the admin handler with an uptime of zero.
    async fn run_cmd<S: StorageEngine>(
        cmd: &str,
        storage: &Arc<S>,
        log: &Arc<RwLock<VecDeque<LogEntry>>>,
    ) -> Option<String> {
        handle_admin_command(cmd, 0, log, storage).await
    }

    #[tokio::test]
    async fn test_admin_metrics_returns_real_data() {
        let storage = make_store();
        let log = make_log();

        for i in 0u8..2 {
            let k = Key::from_str(&format!("k{}", i));
            let v = CipherBlob::new(vec![i; 4]);
            storage.put(&k, &v).await.expect("put failed");
        }

        let json_str = run_cmd("METRICS", &storage, &log)
            .await
            .expect("METRICS returned None");
        let v: serde_json::Value = serde_json::from_str(&json_str).expect("invalid JSON");

        assert_eq!(v["key_count"], 2, "key_count should be 2");
        assert!(v["storage_type"].is_string());
        assert!(v["uptime_seconds"].is_number());
    }

    #[tokio::test]
    async fn test_admin_stats_returns_byte_accurate_size_under_threshold() {
        let storage = make_store();
        let log = make_log();

        // Three values of ten bytes each.
        for i in 0u8..3 {
            let k = Key::from_str(&format!("key_{}", i));
            let v = CipherBlob::new(vec![i; 10]);
            storage.put(&k, &v).await.expect("put failed");
        }

        let json_str = run_cmd("STATS", &storage, &log)
            .await
            .expect("STATS returned None");
        let v: serde_json::Value = serde_json::from_str(&json_str).expect("invalid JSON");

        assert_eq!(v["key_count"], 3u64, "key_count should be 3");
        assert_eq!(v["total_bytes"], 30u64, "total_bytes should be 30");
        assert_eq!(v["truncated"], false, "truncated should be false");
    }

    #[tokio::test]
    async fn test_admin_stats_returns_truncated_flag_over_threshold() {
        let storage = make_store();
        for i in 0u8..3 {
            let k = Key::from_str(&format!("t_{}", i));
            let v = CipherBlob::new(vec![1u8; 5]);
            storage.put(&k, &v).await.expect("put failed");
        }

        // Limit below the number of stored keys forces truncation.
        let (key_count, total_bytes, truncated) = compute_stats(&storage, 2).await;
        assert_eq!(key_count, 2, "should scan only 2 keys");
        assert_eq!(total_bytes, 10, "2 keys × 5 bytes = 10");
        assert!(truncated, "truncated should be true when limit exceeded");
    }

    #[tokio::test]
    async fn test_admin_backup_creates_manifest() {
        let storage = make_store();
        let log = make_log();

        let k = Key::from_str("bk_key");
        let v = CipherBlob::new(b"hello".to_vec());
        storage.put(&k, &v).await.expect("put failed");

        let dir_str = scratch_dir("backup");

        let json_str = run_cmd(&format!("BACKUP {dir_str} full"), &storage, &log)
            .await
            .expect("BACKUP returned None");
        let v: serde_json::Value = serde_json::from_str(&json_str).expect("invalid JSON");
        assert_eq!(v["status"], "ok", "status should be ok");
        assert_eq!(v["key_count"], 1u64);

        assert!(
            std::path::Path::new(&format!("{dir_str}/manifest.bin")).exists(),
            "manifest.bin should exist"
        );
        assert!(
            std::path::Path::new(&format!("{dir_str}/meta.bin")).exists(),
            "meta.bin should exist"
        );

        let _ = tokio::fs::remove_dir_all(&dir_str).await;
    }

    #[tokio::test]
    async fn test_admin_backup_incremental_flag_recorded() {
        let storage = make_store();
        let log = make_log();

        let k = Key::from_str("inc_key");
        let v = CipherBlob::new(vec![42u8; 3]);
        storage.put(&k, &v).await.expect("put failed");

        let dir_str = scratch_dir("inc");

        let json_str = run_cmd(&format!("BACKUP {dir_str} incremental"), &storage, &log)
            .await
            .expect("BACKUP incremental returned None");

        let resp: serde_json::Value = serde_json::from_str(&json_str).expect("invalid JSON");
        assert_eq!(resp["kind"], "incremental");

        // The kind must also be persisted in the on-disk metadata.
        let meta_bytes = tokio::fs::read(format!("{dir_str}/meta.bin"))
            .await
            .expect("meta.bin not found");
        let meta: BackupMeta =
            oxicode::serde::decode_serde(&meta_bytes).expect("decode meta failed");
        assert_eq!(meta.kind, BackupKind::Incremental);

        let _ = tokio::fs::remove_dir_all(&dir_str).await;
    }

    #[tokio::test]
    async fn test_admin_restore_replays_keys() {
        let source = make_store();
        let log = make_log();

        let k1 = Key::from_str("restore_a");
        let k2 = Key::from_str("restore_b");
        source
            .put(&k1, &CipherBlob::new(b"alpha".to_vec()))
            .await
            .expect("put failed");
        source
            .put(&k2, &CipherBlob::new(b"beta".to_vec()))
            .await
            .expect("put failed");

        let dir_str = scratch_dir("restore");

        run_cmd(&format!("BACKUP {dir_str} full"), &source, &log)
            .await
            .expect("BACKUP returned None");

        // Restore into a second, empty store and compare contents.
        let target = make_store();
        let json_str = run_cmd(&format!("RESTORE {dir_str}"), &target, &log)
            .await
            .expect("RESTORE returned None");
        let resp: serde_json::Value = serde_json::from_str(&json_str).expect("invalid JSON");
        assert_eq!(resp["status"], "ok");
        assert_eq!(resp["restored"], 2u64);

        let got_a = target.get(&k1).await.expect("get failed");
        assert_eq!(
            got_a.as_ref().map(|b| b.as_bytes()),
            Some(b"alpha".as_ref())
        );
        let got_b = target.get(&k2).await.expect("get failed");
        assert_eq!(got_b.as_ref().map(|b| b.as_bytes()), Some(b"beta".as_ref()));

        let _ = tokio::fs::remove_dir_all(&dir_str).await;
    }

    #[tokio::test]
    async fn test_admin_logs_default_lines() {
        let storage = make_store();
        let log = make_log();

        for i in 0..5u32 {
            push_log_entry(&log, format!("entry {}", i));
        }

        let json_str = run_cmd("LOGS", &storage, &log)
            .await
            .expect("LOGS returned None");
        let resp: serde_json::Value = serde_json::from_str(&json_str).expect("invalid JSON");
        assert!(resp["lines"].is_array());
        assert_eq!(
            resp["lines"].as_array().map(|a| a.len()).unwrap_or(0),
            5,
            "should return all 5 available entries"
        );
        assert_eq!(resp["follow_supported"], false);
    }

    #[tokio::test]
    async fn test_admin_logs_respects_requested_line_count() {
        let storage = make_store();
        let log = make_log();

        for i in 0..5u32 {
            push_log_entry(&log, format!("entry {}", i));
        }

        let json_str = run_cmd("LOGS 2", &storage, &log)
            .await
            .expect("LOGS returned None");
        let resp: serde_json::Value = serde_json::from_str(&json_str).expect("invalid JSON");
        let lines = resp["lines"].as_array().expect("lines should be an array");

        assert_eq!(lines.len(), 2, "only the requested number of lines");
        // Entries are reported newest first.
        assert_eq!(lines[0]["message"], "entry 4");
        assert_eq!(lines[1]["message"], "entry 3");
    }

    #[test]
    fn test_admin_args_parser_handles_missing() {
        let a = parse_admin_args("");
        assert!(a.first.is_none(), "first should be None for empty input");
        assert!(a.second.is_none(), "second should be None for empty input");

        let b = parse_admin_args("only_one");
        assert_eq!(b.first.as_deref(), Some("only_one"));
        assert!(b.second.is_none());

        let c = parse_admin_args("a b extra_ignored");
        assert_eq!(c.first.as_deref(), Some("a"));
        assert_eq!(c.second.as_deref(), Some("b"));
    }

    #[test]
    fn test_recent_log_ring_buffer_bounded_at_256() {
        let log = make_log();

        for i in 0..LOG_RING_CAPACITY {
            push_log_entry(&log, format!("msg {}", i));
        }

        let guard = log.read();
        assert_eq!(
            guard.len(),
            LOG_RING_CAPACITY,
            "ring buffer should hold exactly LOG_RING_CAPACITY entries"
        );
    }

    #[test]
    fn test_recent_log_drop_oldest_on_overflow() {
        let log = make_log();

        // One entry more than the buffer can hold.
        for i in 0..=LOG_RING_CAPACITY {
            push_log_entry(&log, format!("msg {}", i));
        }

        let guard = log.read();
        assert_eq!(
            guard.len(),
            LOG_RING_CAPACITY,
            "capacity should not exceed LOG_RING_CAPACITY"
        );

        let first = guard.front().expect("ring buffer must not be empty");
        assert_ne!(first.message, "msg 0", "oldest entry should be dropped");
        assert_eq!(
            first.message, "msg 1",
            "second entry should now be the oldest"
        );
    }
}