Skip to main content

amaters_net/
server_admin.rs

1//! Admin command handler for AmateRS network layer.
2//!
3//! This module provides the admin command infrastructure that backs the
4//! `__admin__:<CMD>` key-intercept protocol in the gRPC server.  Admin
5//! commands arrive as ordinary GET queries with a specially-prefixed key, which
6//! lets the CLI reach server-side admin functionality without a dedicated RPC
7//! method.
8//!
9//! # Commands
10//!
11//! | Command | Args | Description |
12//! |---------|------|-------------|
13//! | METRICS | — | Key count and uptime JSON |
14//! | CLUSTER_INFO | — | Standalone cluster descriptor |
15//! | NODES | — | Self-only node list |
16//! | STATS | — | Byte-accurate size scan (capped at 100 000 keys) |
17//! | VERIFY | — | Integrity check (always reports 0 corruption for MemoryStorage) |
18//! | COMPACT | `[<collection>]` | Flush storage and return status |
19//! | LOGS | `<lines=20> <follow=false>` | Return ring-buffered log entries |
20//! | BACKUP | `<dir> <full\|incremental>` | Serialize all keys to `<dir>/` |
21//! | RESTORE | `<dir>` | Replay keys from a previous backup |
22
23use std::collections::VecDeque;
24use std::sync::Arc;
25use std::time::SystemTime;
26
27use parking_lot::RwLock;
28use serde::{Deserialize, Serialize};
29use tracing::{error, info, warn};
30
31use amaters_core::traits::StorageEngine;
32use amaters_core::types::{CipherBlob, Key};
33
34// ─── Constants ────────────────────────────────────────────────────────────────
35
/// Upper bound on how many keys a single STATS or VERIFY scan will visit.
///
/// Stores holding more keys than this are only partially scanned, which keeps
/// the latency of an admin query bounded; STATS reports the cut-off through
/// its `"truncated"` field.
pub(crate) const STATS_KEY_LIMIT: usize = 100_000;

/// Maximum number of entries retained in the recent-log ring buffer.
pub const LOG_RING_CAPACITY: usize = 256;
42
43// ─── BackupKind ───────────────────────────────────────────────────────────────
44
45/// Whether a backup should capture the full dataset or only incremental changes.
46///
47/// At the MVP tier there is no real incremental logic; the flag is recorded in
48/// the backup manifest so future tooling can act on it.
49#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
50pub enum BackupKind {
51    /// Capture the complete current dataset.
52    Full,
53    /// Mark as incremental (behaviour identical to `Full` for now).
54    Incremental,
55}
56
57// ─── BackupMeta ───────────────────────────────────────────────────────────────
58
59/// Metadata written to `<dir>/meta.bin` alongside a backup manifest.
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct BackupMeta {
62    /// Bump this whenever the manifest format changes.
63    pub schema_version: u32,
64    /// Number of key-value pairs in the manifest.
65    pub total_keys: usize,
66    /// Total byte count of all values in the manifest.
67    pub total_bytes: u64,
68    /// Whether this backup is full or incremental.
69    pub kind: BackupKind,
70}
71
72// ─── LogEntry ─────────────────────────────────────────────────────────────────
73
/// One record held by the recent-log ring buffer.
#[derive(Clone, Debug)]
pub struct LogEntry {
    /// Free-form, human-readable text (method name + elapsed time, or an
    /// error description).
    pub message: String,
    /// Wall-clock time at which the entry was created.
    pub timestamp: SystemTime,
}
82
83// ─── AdminArgs ────────────────────────────────────────────────────────────────
84
/// Positional arguments that follow an admin command name.
#[derive(Clone, Debug)]
pub struct AdminArgs {
    /// First token, if any (directory for BACKUP/RESTORE, line count for
    /// LOGS, collection for COMPACT).
    pub first: Option<String>,
    /// Second token, if any ("full"/"incremental" for BACKUP, the
    /// "true"/"false" follow flag for LOGS).
    pub second: Option<String>,
}

/// Tokenize the text that follows an admin command name.
///
/// The input is split on ASCII whitespace; the first two tokens are kept and
/// any further tokens are ignored.  A missing token is reported as `None` so
/// that each command can apply its own default.
pub fn parse_admin_args(args: &str) -> AdminArgs {
    let mut words = args.split_ascii_whitespace().map(String::from);
    let first = words.next();
    let second = words.next();
    AdminArgs { first, second }
}
107
108// ─── Stats helper ─────────────────────────────────────────────────────────────
109
110/// Compute `(key_count, total_bytes, truncated)` by scanning up to `limit` keys.
111///
112/// Splitting this out makes it injectable for unit tests without spinning up a
113/// full gRPC server.
114pub(crate) async fn compute_stats<S: StorageEngine>(
115    storage: &Arc<S>,
116    limit: usize,
117) -> (u64, u64, bool) {
118    let keys = match storage.keys().await {
119        Ok(k) => k,
120        Err(e) => {
121            error!("STATS: failed to list keys: {}", e);
122            return (0, 0, false);
123        }
124    };
125
126    let total_keys = keys.len();
127    let truncated = total_keys > limit;
128    let scan_keys = if truncated { &keys[..limit] } else { &keys };
129
130    let mut total_bytes: u64 = 0;
131    for key in scan_keys {
132        match storage.get(key).await {
133            Ok(Some(blob)) => total_bytes += blob.as_bytes().len() as u64,
134            Ok(None) => {}
135            Err(e) => warn!("STATS: get failed for key {:?}: {}", key, e),
136        }
137    }
138
139    (scan_keys.len() as u64, total_bytes, truncated)
140}
141
142// ─── handle_admin_command ─────────────────────────────────────────────────────
143
144/// Execute an admin command and return a JSON string, or `None` for unknown
145/// commands.
146///
147/// The `cmd` parameter is everything *after* the `__admin__:` prefix.
148/// Commands may carry space-separated arguments, e.g. `"LOGS 50 false"`.
149///
150/// # Arguments
151/// * `cmd` - Full command string including arguments.
152/// * `uptime_secs` - Server uptime at call time (seconds).
153/// * `recent_log` - The server's recent-log ring buffer.
154/// * `storage` - Reference to the storage engine.
155pub async fn handle_admin_command<S: StorageEngine>(
156    cmd: &str,
157    uptime_secs: u64,
158    recent_log: &Arc<RwLock<VecDeque<LogEntry>>>,
159    storage: &Arc<S>,
160) -> Option<String> {
161    // Split command name from arguments.
162    let mut parts = cmd.splitn(2, ' ');
163    let op = parts.next().unwrap_or("").trim().to_uppercase();
164    let args_str = parts.next().unwrap_or("").trim();
165    let args = parse_admin_args(args_str);
166
167    match op.as_str() {
168        // ── METRICS ──────────────────────────────────────────────────────────
169        "METRICS" => {
170            let key_count = storage.keys().await.map(|k| k.len() as u64).unwrap_or(0);
171            let json = serde_json::json!({
172                "key_count": key_count,
173                "storage_type": "memory",
174                "uptime_seconds": uptime_secs,
175            });
176            serde_json::to_string(&json).ok()
177        }
178
179        // ── CLUSTER_INFO ─────────────────────────────────────────────────────
180        "CLUSTER_INFO" => {
181            let json = serde_json::json!({
182                "mode": "standalone",
183                "version": env!("CARGO_PKG_VERSION"),
184                "nodes": 1u32,
185            });
186            serde_json::to_string(&json).ok()
187        }
188
189        // ── NODES ─────────────────────────────────────────────────────────────
190        "NODES" => {
191            let json = serde_json::json!({
192                "nodes": [{
193                    "id": "self",
194                    "addr": "0.0.0.0:50051",
195                    "role": "leader",
196                    "status": "healthy",
197                }]
198            });
199            serde_json::to_string(&json).ok()
200        }
201
202        // ── STATS ─────────────────────────────────────────────────────────────
203        "STATS" => {
204            let (key_count, total_bytes, truncated) = compute_stats(storage, STATS_KEY_LIMIT).await;
205            let json = serde_json::json!({
206                "key_count": key_count,
207                "total_bytes": total_bytes,
208                "truncated": truncated,
209            });
210            serde_json::to_string(&json).ok()
211        }
212
213        // ── VERIFY ───────────────────────────────────────────────────────────
214        "VERIFY" => {
215            let (checked, _, _) = compute_stats(storage, STATS_KEY_LIMIT).await;
216            let json = serde_json::json!({
217                "corrupted_keys": 0u64,
218                "checked": checked,
219                "ok": true,
220            });
221            serde_json::to_string(&json).ok()
222        }
223
224        // ── COMPACT ───────────────────────────────────────────────────────────
225        "COMPACT" => {
226            let flushed = storage.flush().await.is_ok();
227            let collection: serde_json::Value = args
228                .first
229                .map(serde_json::Value::String)
230                .unwrap_or(serde_json::Value::Null);
231            let json = serde_json::json!({
232                "status": "ok",
233                "collection": collection,
234                "flushed": flushed,
235            });
236            serde_json::to_string(&json).ok()
237        }
238
239        // ── LOGS ──────────────────────────────────────────────────────────────
240        "LOGS" => {
241            let lines: usize = args
242                .first
243                .as_deref()
244                .and_then(|s| s.parse().ok())
245                .unwrap_or(20);
246            // follow flag is acknowledged but not implemented (MVP).
247            let _follow: bool = args
248                .second
249                .as_deref()
250                .map(|s| s.eq_ignore_ascii_case("true"))
251                .unwrap_or(false);
252
253            let entries: Vec<serde_json::Value> = {
254                let guard = recent_log.read();
255                guard
256                    .iter()
257                    .rev()
258                    .take(lines)
259                    .map(|e| {
260                        let ts = e
261                            .timestamp
262                            .duration_since(SystemTime::UNIX_EPOCH)
263                            .map(|d| d.as_secs())
264                            .unwrap_or(0);
265                        serde_json::json!({
266                            "message": e.message,
267                            "timestamp": ts,
268                        })
269                    })
270                    .collect()
271            };
272
273            let json = serde_json::json!({
274                "lines": entries,
275                "follow_supported": false,
276            });
277            serde_json::to_string(&json).ok()
278        }
279
280        // ── BACKUP ────────────────────────────────────────────────────────────
281        "BACKUP" => {
282            let dir = match args.first.as_deref() {
283                Some(d) if !d.is_empty() => d.to_owned(),
284                _ => {
285                    let json = serde_json::json!({"error": "missing backup directory"});
286                    return serde_json::to_string(&json).ok();
287                }
288            };
289            let kind = match args.second.as_deref().map(str::to_lowercase).as_deref() {
290                Some("incremental") => BackupKind::Incremental,
291                _ => BackupKind::Full,
292            };
293
294            // Create the backup directory.
295            if let Err(e) = tokio::fs::create_dir_all(&dir).await {
296                let json = serde_json::json!({"error": format!("create_dir_all failed: {e}")});
297                return serde_json::to_string(&json).ok();
298            }
299
300            // Collect all key-value pairs.
301            let keys = match storage.keys().await {
302                Ok(k) => k,
303                Err(e) => {
304                    let json = serde_json::json!({"error": format!("keys() failed: {e}")});
305                    return serde_json::to_string(&json).ok();
306                }
307            };
308
309            let mut manifest: Vec<(Vec<u8>, Vec<u8>)> = Vec::with_capacity(keys.len());
310            let mut total_bytes: u64 = 0;
311            for key in &keys {
312                match storage.get(key).await {
313                    Ok(Some(blob)) => {
314                        total_bytes += blob.as_bytes().len() as u64;
315                        manifest.push((key.as_bytes().to_vec(), blob.as_bytes().to_vec()));
316                    }
317                    Ok(None) => {}
318                    Err(e) => warn!("BACKUP: get failed for key {:?}: {}", key, e),
319                }
320            }
321
322            let total_keys = manifest.len();
323
324            // Serialize manifest.
325            let manifest_bytes = match oxicode::serde::encode_serde(&manifest) {
326                Ok(b) => b,
327                Err(e) => {
328                    let json = serde_json::json!({"error": format!("manifest encode failed: {e}")});
329                    return serde_json::to_string(&json).ok();
330                }
331            };
332
333            // Serialize metadata.
334            let meta = BackupMeta {
335                schema_version: 1,
336                total_keys,
337                total_bytes,
338                kind: kind.clone(),
339            };
340            let meta_bytes = match oxicode::serde::encode_serde(&meta) {
341                Ok(b) => b,
342                Err(e) => {
343                    let json = serde_json::json!({"error": format!("meta encode failed: {e}")});
344                    return serde_json::to_string(&json).ok();
345                }
346            };
347
348            // Write files.
349            let manifest_path = format!("{dir}/manifest.bin");
350            let meta_path = format!("{dir}/meta.bin");
351
352            if let Err(e) = tokio::fs::write(&manifest_path, &manifest_bytes).await {
353                let json = serde_json::json!({"error": format!("write manifest failed: {e}")});
354                return serde_json::to_string(&json).ok();
355            }
356            if let Err(e) = tokio::fs::write(&meta_path, &meta_bytes).await {
357                let json = serde_json::json!({"error": format!("write meta failed: {e}")});
358                return serde_json::to_string(&json).ok();
359            }
360
361            info!(
362                "BACKUP completed: dir={}, keys={}, bytes={}, kind={:?}",
363                dir, total_keys, total_bytes, kind
364            );
365
366            let kind_str = match kind {
367                BackupKind::Full => "full",
368                BackupKind::Incremental => "incremental",
369            };
370            let json = serde_json::json!({
371                "status": "ok",
372                "path": dir,
373                "key_count": total_keys,
374                "byte_count": total_bytes,
375                "kind": kind_str,
376            });
377            serde_json::to_string(&json).ok()
378        }
379
380        // ── RESTORE ───────────────────────────────────────────────────────────
381        "RESTORE" => {
382            let dir = match args.first.as_deref() {
383                Some(d) if !d.is_empty() => d.to_owned(),
384                _ => {
385                    let json = serde_json::json!({"error": "missing restore directory"});
386                    return serde_json::to_string(&json).ok();
387                }
388            };
389
390            let meta_path = format!("{dir}/meta.bin");
391            let manifest_path = format!("{dir}/manifest.bin");
392
393            let meta_bytes = match tokio::fs::read(&meta_path).await {
394                Ok(b) => b,
395                Err(e) => {
396                    let json = serde_json::json!({"error": format!("read meta.bin failed: {e}")});
397                    return serde_json::to_string(&json).ok();
398                }
399            };
400            let manifest_bytes = match tokio::fs::read(&manifest_path).await {
401                Ok(b) => b,
402                Err(e) => {
403                    let json =
404                        serde_json::json!({"error": format!("read manifest.bin failed: {e}")});
405                    return serde_json::to_string(&json).ok();
406                }
407            };
408
409            let meta: BackupMeta = match oxicode::serde::decode_serde(&meta_bytes) {
410                Ok(m) => m,
411                Err(e) => {
412                    let json = serde_json::json!({"error": format!("decode meta failed: {e}")});
413                    return serde_json::to_string(&json).ok();
414                }
415            };
416
417            if meta.schema_version != 1 {
418                let json = serde_json::json!({
419                    "error": format!(
420                        "unsupported schema_version {} (expected 1)",
421                        meta.schema_version
422                    )
423                });
424                return serde_json::to_string(&json).ok();
425            }
426
427            let manifest: Vec<(Vec<u8>, Vec<u8>)> =
428                match oxicode::serde::decode_serde(&manifest_bytes) {
429                    Ok(m) => m,
430                    Err(e) => {
431                        let json =
432                            serde_json::json!({"error": format!("decode manifest failed: {e}")});
433                        return serde_json::to_string(&json).ok();
434                    }
435                };
436
437            let mut restored: usize = 0;
438            for (key_bytes, value_bytes) in manifest {
439                let key = Key::from_slice(&key_bytes);
440                let blob = CipherBlob::new(value_bytes);
441                match storage.put(&key, &blob).await {
442                    Ok(()) => restored += 1,
443                    Err(e) => warn!("RESTORE: put failed for key {:?}: {}", key, e),
444                }
445            }
446
447            info!("RESTORE completed: dir={}, restored={}", dir, restored);
448
449            let json = serde_json::json!({
450                "status": "ok",
451                "restored": restored,
452                "schema_version": 1,
453            });
454            serde_json::to_string(&json).ok()
455        }
456
457        // ── Unknown ───────────────────────────────────────────────────────────
458        _ => None,
459    }
460}
461
462/// Push a log entry to the ring buffer, enforcing the 256-entry capacity bound.
463///
464/// Uses `try_write()` with a silent drop on contention to avoid deadlocks
465/// during error-handling paths that may already hold the lock.
466pub fn push_log_entry(recent_log: &Arc<RwLock<VecDeque<LogEntry>>>, message: String) {
467    let entry = LogEntry {
468        message,
469        timestamp: SystemTime::now(),
470    };
471    if let Some(mut guard) = recent_log.try_write() {
472        if guard.len() >= LOG_RING_CAPACITY {
473            guard.pop_front();
474        }
475        guard.push_back(entry);
476    }
477    // If try_write() fails (lock held by reader/writer), we silently drop the
478    // entry rather than block or deadlock.
479}
480
481// ─── Tests ────────────────────────────────────────────────────────────────────
482
#[cfg(test)]
mod admin_tests {
    use super::*;
    use amaters_core::storage::MemoryStorage;
    use std::sync::Arc;

    /// Shorthand for the shared recent-log ring buffer type.
    type SharedLog = Arc<RwLock<VecDeque<LogEntry>>>;

    // ── Fixtures ──────────────────────────────────────────────────────────────

    /// Fresh, empty in-memory storage.
    fn make_store() -> Arc<MemoryStorage> {
        Arc::new(MemoryStorage::new())
    }

    /// Fresh, empty recent-log ring buffer.
    fn make_log() -> SharedLog {
        Arc::new(RwLock::new(VecDeque::new()))
    }

    /// Run an admin command with an uptime of 0 seconds.
    async fn run_cmd<S: StorageEngine>(
        cmd: &str,
        storage: &Arc<S>,
        log: &SharedLog,
    ) -> Option<String> {
        handle_admin_command(cmd, 0, log, storage).await
    }

    /// Decode a command response, failing the test on malformed JSON.
    fn parse_json(raw: &str) -> serde_json::Value {
        serde_json::from_str(raw).expect("invalid JSON")
    }

    /// Store `value` under `name` and hand back the key that was used.
    async fn seed(storage: &Arc<MemoryStorage>, name: &str, value: Vec<u8>) -> Key {
        let key = Key::from_str(name);
        storage
            .put(&key, &CipherBlob::new(value))
            .await
            .expect("put failed");
        key
    }

    /// Path of a per-test scratch directory under the system temp dir; the
    /// nanosecond timestamp keeps concurrent runs apart.
    fn scratch_dir(tag: &str) -> String {
        let nanos = std::time::SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        std::env::temp_dir()
            .join(format!("amaters_test_{tag}_{nanos}"))
            .to_string_lossy()
            .to_string()
    }

    // ── test_admin_metrics_returns_real_data ──────────────────────────────────

    #[tokio::test]
    async fn test_admin_metrics_returns_real_data() {
        let storage = make_store();
        let log = make_log();

        // Two keys, four bytes each.
        for i in 0u8..2 {
            seed(&storage, &format!("k{}", i), vec![i; 4]).await;
        }

        let raw = run_cmd("METRICS", &storage, &log)
            .await
            .expect("METRICS returned None");
        let v = parse_json(&raw);

        assert_eq!(v["key_count"], 2, "key_count should be 2");
        assert!(v["storage_type"].is_string());
        assert!(v["uptime_seconds"].is_number());
    }

    // ── test_admin_stats_returns_byte_accurate_size_under_threshold ──────────

    #[tokio::test]
    async fn test_admin_stats_returns_byte_accurate_size_under_threshold() {
        let storage = make_store();
        let log = make_log();

        // Three keys × 10 bytes → 30 bytes in total.
        for i in 0u8..3 {
            seed(&storage, &format!("key_{}", i), vec![i; 10]).await;
        }

        let raw = run_cmd("STATS", &storage, &log)
            .await
            .expect("STATS returned None");
        let v = parse_json(&raw);

        assert_eq!(v["key_count"], 3u64, "key_count should be 3");
        assert_eq!(v["total_bytes"], 30u64, "total_bytes should be 30");
        assert_eq!(v["truncated"], false, "truncated should be false");
    }

    // ── test_admin_stats_returns_truncated_flag_over_threshold ───────────────

    #[tokio::test]
    async fn test_admin_stats_returns_truncated_flag_over_threshold() {
        // A limit of 2 against 3 keys exercises the cap without having to
        // insert 100 000 keys.
        let storage = make_store();
        for i in 0u8..3 {
            seed(&storage, &format!("t_{}", i), vec![1u8; 5]).await;
        }

        let (key_count, total_bytes, truncated) = compute_stats(&storage, 2).await;
        assert_eq!(key_count, 2, "should scan only 2 keys");
        assert_eq!(total_bytes, 10, "2 keys × 5 bytes = 10");
        assert!(truncated, "truncated should be true when limit exceeded");
    }

    // ── test_admin_backup_creates_manifest ────────────────────────────────────

    #[tokio::test]
    async fn test_admin_backup_creates_manifest() {
        let storage = make_store();
        let log = make_log();
        seed(&storage, "bk_key", b"hello".to_vec()).await;

        let dir_str = scratch_dir("backup");

        let raw = run_cmd(&format!("BACKUP {dir_str} full"), &storage, &log)
            .await
            .expect("BACKUP returned None");
        let v = parse_json(&raw);
        assert_eq!(v["status"], "ok", "status should be ok");
        assert_eq!(v["key_count"], 1u64);

        // Both backup files must be on disk.
        assert!(
            std::path::Path::new(&format!("{dir_str}/manifest.bin")).exists(),
            "manifest.bin should exist"
        );
        assert!(
            std::path::Path::new(&format!("{dir_str}/meta.bin")).exists(),
            "meta.bin should exist"
        );

        // Best-effort cleanup.
        let _ = tokio::fs::remove_dir_all(&dir_str).await;
    }

    // ── test_admin_backup_incremental_flag_recorded ───────────────────────────

    #[tokio::test]
    async fn test_admin_backup_incremental_flag_recorded() {
        let storage = make_store();
        let log = make_log();
        seed(&storage, "inc_key", vec![42u8; 3]).await;

        let dir_str = scratch_dir("inc");

        let raw = run_cmd(&format!("BACKUP {dir_str} incremental"), &storage, &log)
            .await
            .expect("BACKUP incremental returned None");
        let resp = parse_json(&raw);
        assert_eq!(resp["kind"], "incremental");

        // The kind must also be persisted in meta.bin.
        let meta_bytes = tokio::fs::read(format!("{dir_str}/meta.bin"))
            .await
            .expect("meta.bin not found");
        let meta: BackupMeta =
            oxicode::serde::decode_serde(&meta_bytes).expect("decode meta failed");
        assert_eq!(meta.kind, BackupKind::Incremental);

        let _ = tokio::fs::remove_dir_all(&dir_str).await;
    }

    // ── test_admin_restore_replays_keys ───────────────────────────────────────

    #[tokio::test]
    async fn test_admin_restore_replays_keys() {
        let source = make_store();
        let log = make_log();

        // Populate the source store.
        let k1 = seed(&source, "restore_a", b"alpha".to_vec()).await;
        let k2 = seed(&source, "restore_b", b"beta".to_vec()).await;

        let dir_str = scratch_dir("restore");

        // Back up the source …
        run_cmd(&format!("BACKUP {dir_str} full"), &source, &log)
            .await
            .expect("BACKUP returned None");

        // … and replay it into an empty store.
        let target = make_store();
        let raw = run_cmd(&format!("RESTORE {dir_str}"), &target, &log)
            .await
            .expect("RESTORE returned None");
        let resp = parse_json(&raw);
        assert_eq!(resp["status"], "ok");
        assert_eq!(resp["restored"], 2u64);

        // Both values must have arrived intact.
        let got_a = target.get(&k1).await.expect("get failed");
        assert_eq!(
            got_a.as_ref().map(|b| b.as_bytes()),
            Some(b"alpha".as_ref())
        );
        let got_b = target.get(&k2).await.expect("get failed");
        assert_eq!(got_b.as_ref().map(|b| b.as_bytes()), Some(b"beta".as_ref()));

        let _ = tokio::fs::remove_dir_all(&dir_str).await;
    }

    // ── test_admin_logs_default_lines ─────────────────────────────────────────

    #[tokio::test]
    async fn test_admin_logs_default_lines() {
        let storage = make_store();
        let log = make_log();

        (0..5u32).for_each(|i| push_log_entry(&log, format!("entry {}", i)));

        let raw = run_cmd("LOGS", &storage, &log)
            .await
            .expect("LOGS returned None");
        let resp = parse_json(&raw);
        assert!(resp["lines"].is_array());
        // The default of 20 exceeds the 5 entries available.
        assert_eq!(
            resp["lines"].as_array().map(|a| a.len()).unwrap_or(0),
            5,
            "should return all 5 available entries"
        );
        assert_eq!(resp["follow_supported"], false);
    }

    // ── test_admin_args_parser_handles_missing ────────────────────────────────

    #[test]
    fn test_admin_args_parser_handles_missing() {
        let empty = parse_admin_args("");
        assert!(empty.first.is_none(), "first should be None for empty input");
        assert!(
            empty.second.is_none(),
            "second should be None for empty input"
        );

        let single = parse_admin_args("only_one");
        assert_eq!(single.first.as_deref(), Some("only_one"));
        assert!(single.second.is_none());

        let triple = parse_admin_args("a b extra_ignored");
        assert_eq!(triple.first.as_deref(), Some("a"));
        assert_eq!(triple.second.as_deref(), Some("b"));
    }

    // ── test_recent_log_ring_buffer_bounded_at_256 ────────────────────────────

    #[test]
    fn test_recent_log_ring_buffer_bounded_at_256() {
        let log = make_log();

        (0..256u32).for_each(|i| push_log_entry(&log, format!("msg {}", i)));

        let guard = log.read();
        assert_eq!(
            guard.len(),
            256,
            "ring buffer should hold exactly 256 entries"
        );
    }

    // ── test_recent_log_drop_oldest_on_overflow ───────────────────────────────

    #[test]
    fn test_recent_log_drop_oldest_on_overflow() {
        let log = make_log();

        // 257 pushes: one more than the buffer can hold.
        (0..=256u32).for_each(|i| push_log_entry(&log, format!("msg {}", i)));

        let guard = log.read();
        assert_eq!(guard.len(), 256, "capacity should not exceed 256");

        // "msg 0" was the oldest entry and must have been evicted.
        let first = guard.front().expect("ring buffer must not be empty");
        assert_ne!(first.message, "msg 0", "oldest entry should be dropped");
        assert_eq!(
            first.message, "msg 1",
            "second entry should now be the oldest"
        );
    }
}