// aegis_server/admin.rs
1//! Aegis Admin API
2//!
3//! Administrative endpoints for the web dashboard and management operations.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::RwLock;
11use sysinfo::{Disks, System};
12
13// =============================================================================
14// Cluster Info
15// =============================================================================
16
/// Information about the cluster, as reported by `AdminService::get_cluster_info`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterInfo {
    /// Human-readable cluster name.
    pub name: String,
    /// Server version (from `CARGO_PKG_VERSION` at build time).
    pub version: String,
    /// Total number of nodes, including this one.
    pub node_count: usize,
    /// ID of the current leader, if known.
    pub leader_id: Option<String>,
    /// Aggregate health state of the cluster.
    pub state: ClusterState,
    /// Seconds since this server process started.
    pub uptime_seconds: u64,
}
27
/// Aggregate health state of the cluster.
///
/// Derivation (see `get_cluster_info`): `Healthy` when every known node is
/// online, `Degraded` while a strict majority is still online,
/// `Unavailable` once the majority is lost.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ClusterState {
    Healthy,
    Degraded,
    Unavailable,
    /// Startup default before any state has been computed.
    Initializing,
}
36
37impl Default for ClusterInfo {
38    fn default() -> Self {
39        Self {
40            name: "aegis-cluster".to_string(),
41            version: env!("CARGO_PKG_VERSION").to_string(),
42            node_count: 0,
43            leader_id: None,
44            state: ClusterState::Initializing,
45            uptime_seconds: 0,
46        }
47    }
48}
49
50// =============================================================================
51// Node Info
52// =============================================================================
53
/// Information about a cluster node, as returned by `AdminService::get_nodes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeInfo {
    /// Node ID; the display name, when set, is appended as "id (name)".
    pub id: String,
    /// Network address of the node.
    pub address: String,
    /// Consensus role of the node.
    pub role: NodeRole,
    /// Liveness status of the node.
    pub status: NodeStatus,
    /// Server version running on the node.
    pub version: String,
    /// Seconds since the node started.
    pub uptime_seconds: u64,
    /// Epoch-millisecond timestamp of the last heartbeat seen (see `AdminService::now`).
    pub last_heartbeat: u64,
    /// Latest resource/performance metrics for the node.
    pub metrics: NodeMetrics,
}
66
/// Consensus role of a node (Raft-style; election is not wired in yet,
/// so this node currently always reports `Leader`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeRole {
    Leader,
    Follower,
    Candidate,
    /// Non-voting member.
    Learner,
}
75
/// Liveness status of a node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeStatus {
    Online,
    Offline,
    /// In the process of joining the cluster.
    Joining,
    /// In the process of leaving the cluster.
    Leaving,
    Unknown,
}
85
/// Resource and performance metrics for a node.
///
/// Byte fields are absolute counts; latency fields are milliseconds
/// (derived from microsecond samples in `AdminService`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NodeMetrics {
    /// Average CPU usage across all cores, 0.0–100.0.
    pub cpu_usage_percent: f64,
    pub memory_usage_bytes: u64,
    pub memory_total_bytes: u64,
    pub disk_usage_bytes: u64,
    pub disk_total_bytes: u64,
    /// Currently open client connections.
    pub connections_active: u64,
    /// Lifetime average queries per second.
    pub queries_per_second: f64,
    // Network I/O metrics (cumulative since process start)
    pub network_bytes_in: u64,
    pub network_bytes_out: u64,
    /// Not tracked yet — this server always reports 0.
    pub network_packets_in: u64,
    /// Not tracked yet — this server always reports 0.
    pub network_packets_out: u64,
    // Latency histogram metrics (nearest-rank percentiles, in ms)
    pub latency_p50_ms: f64,
    pub latency_p90_ms: f64,
    pub latency_p95_ms: f64,
    pub latency_p99_ms: f64,
    pub latency_max_ms: f64,
}
108
109// =============================================================================
110// Database Info
111// =============================================================================
112
/// Information about a database.
///
/// NOTE(review): not yet produced anywhere in this module; timestamp
/// units (seconds vs millis) depend on the eventual producer — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseInfo {
    pub name: String,
    /// Total on-disk size in bytes.
    pub size_bytes: u64,
    pub table_count: usize,
    pub index_count: usize,
    /// Creation timestamp (epoch-based).
    pub created_at: u64,
    /// Last-modification timestamp (epoch-based).
    pub last_modified: u64,
}
123
/// Information about a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableInfo {
    pub name: String,
    /// Name of the database the table belongs to.
    pub database: String,
    pub row_count: u64,
    pub size_bytes: u64,
    pub columns: Vec<ColumnInfo>,
    pub indexes: Vec<IndexInfo>,
    /// Creation timestamp (epoch-based; producer not shown in this module).
    pub created_at: u64,
}
135
/// Information about a column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnInfo {
    pub name: String,
    /// SQL type name as a string (e.g. as rendered by the catalog).
    pub data_type: String,
    pub nullable: bool,
    /// True when the column is part of the primary key.
    pub primary_key: bool,
    /// Default-value expression, if any, rendered as a string.
    pub default_value: Option<String>,
}
145
/// Information about an index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexInfo {
    pub name: String,
    /// Indexed columns, in key order.
    pub columns: Vec<String>,
    /// Index implementation name (e.g. btree/hash), as a string.
    pub index_type: String,
    pub unique: bool,
    pub size_bytes: u64,
}
155
156// =============================================================================
157// Query Info
158// =============================================================================
159
/// Information about a running (or recently finished) query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryInfo {
    /// Unique query ID.
    pub id: String,
    /// SQL text being executed.
    pub sql: String,
    pub database: String,
    /// User who issued the query.
    pub user: String,
    pub state: QueryState,
    /// Start timestamp (epoch-based; producer not shown in this module).
    pub started_at: u64,
    pub duration_ms: u64,
    pub rows_examined: u64,
    pub rows_returned: u64,
}
173
/// Lifecycle state of a query.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum QueryState {
    Running,
    Finished,
    Cancelled,
    Failed,
}
182
/// Aggregate query statistics (see `AdminService::get_query_stats`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueryStats {
    /// Total queries recorded since process start.
    pub total_queries: u64,
    /// Lifetime average queries per second.
    pub queries_per_second: f64,
    pub avg_duration_ms: f64,
    pub p50_duration_ms: f64,
    pub p95_duration_ms: f64,
    pub p99_duration_ms: f64,
    /// Queries slower than 100 ms (threshold set in `record_query`).
    pub slow_queries: u64,
    pub failed_queries: u64,
}
195
196// =============================================================================
197// Replication Info
198// =============================================================================
199
/// Information about replication status.
///
/// NOTE(review): not yet produced in this module; field semantics follow
/// the names — confirm against the eventual producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationInfo {
    pub enabled: bool,
    pub mode: ReplicationMode,
    /// Replication lag in milliseconds.
    pub lag_ms: u64,
    pub last_applied_index: u64,
    pub commit_index: u64,
    pub replicas: Vec<ReplicaInfo>,
}
210
/// Replication mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicationMode {
    Synchronous,
    Asynchronous,
    SemiSynchronous,
}
218
/// Information about a single replica.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaInfo {
    /// ID of the node hosting the replica.
    pub node_id: String,
    pub status: ReplicaStatus,
    /// Replication lag in milliseconds.
    pub lag_ms: u64,
    pub last_applied_index: u64,
}
227
/// Status of a replica relative to its primary.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicaStatus {
    InSync,
    Lagging,
    CatchingUp,
    Offline,
}
236
237// =============================================================================
238// Shard Info
239// =============================================================================
240
/// Information about sharding configuration and layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardingInfo {
    pub enabled: bool,
    pub shard_count: usize,
    /// Copies kept of each shard.
    pub replication_factor: usize,
    pub shards: Vec<ShardInfo>,
}
249
/// Information about a shard.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardInfo {
    pub id: String,
    pub state: ShardState,
    /// Start of the key range this shard owns (bound inclusivity not
    /// specified here — confirm against the shard manager).
    pub key_range_start: String,
    /// End of the key range this shard owns.
    pub key_range_end: String,
    /// Node ID of the shard's primary.
    pub primary_node: String,
    /// Node IDs holding replicas of this shard.
    pub replica_nodes: Vec<String>,
    pub size_bytes: u64,
    pub row_count: u64,
}
262
/// Lifecycle state of a shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ShardState {
    Active,
    /// Moving to a different node.
    Migrating,
    /// Being split into smaller shards.
    Splitting,
    /// Being merged with a sibling shard.
    Merging,
    Inactive,
}
272
273// =============================================================================
274// Connection Info
275// =============================================================================
276
/// Aggregate information about client connections.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionInfo {
    pub active: u64,
    pub idle: u64,
    pub total: u64,
    /// Configured connection limit.
    pub max: u64,
    /// Per-connection details.
    pub connections: Vec<ConnectionDetails>,
}
286
/// Details about a single client connection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionDetails {
    pub id: String,
    pub user: String,
    pub database: String,
    /// Remote address of the client.
    pub client_address: String,
    pub state: ConnectionState,
    /// Connect timestamp (epoch-based; producer not shown in this module).
    pub connected_at: u64,
    /// Timestamp of the most recent activity.
    pub last_activity: u64,
    /// SQL currently executing, if any.
    pub current_query: Option<String>,
}
299
/// State of a connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConnectionState {
    Active,
    Idle,
    /// Idle but holding an open transaction.
    IdleInTransaction,
    Waiting,
}
308
309// =============================================================================
310// Storage Info
311// =============================================================================
312
/// Information about storage, in bytes.
///
/// The data/index/WAL/temp breakdown is a fixed-ratio estimate of
/// `used_bytes` (see `get_storage_info`), not a measured figure.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageInfo {
    pub total_bytes: u64,
    pub used_bytes: u64,
    pub available_bytes: u64,
    pub data_bytes: u64,
    pub index_bytes: u64,
    /// Write-ahead-log footprint (estimated).
    pub wal_bytes: u64,
    pub temp_bytes: u64,
}
324
325// =============================================================================
326// Alert Info
327// =============================================================================
328
/// Information about an alert.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertInfo {
    pub id: String,
    pub severity: AlertSeverity,
    /// Component or subsystem that raised the alert.
    pub source: String,
    pub message: String,
    /// Raise timestamp (epoch-based; producer not shown in this module).
    pub timestamp: u64,
    /// True once an operator has acknowledged the alert.
    pub acknowledged: bool,
    pub resolved: bool,
}
340
/// Severity of an alert, in increasing order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AlertSeverity {
    Info,
    Warning,
    Error,
    Critical,
}
349
350// =============================================================================
351// User Info
352// =============================================================================
353
/// Information about a user account.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserInfo {
    pub username: String,
    /// Role names assigned to the user.
    pub roles: Vec<String>,
    /// Creation timestamp (epoch-based; producer not shown in this module).
    pub created_at: u64,
    /// Last login timestamp; `None` if the user has never logged in.
    pub last_login: Option<u64>,
    pub enabled: bool,
}
363
364// =============================================================================
365// Backup Info
366// =============================================================================
367
/// Information about a backup run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupInfo {
    pub id: String,
    pub backup_type: BackupType,
    pub state: BackupState,
    pub size_bytes: u64,
    /// Start timestamp (epoch-based; producer not shown in this module).
    pub started_at: u64,
    /// Completion timestamp; `None` while still running or if it never finished.
    pub completed_at: Option<u64>,
    pub duration_seconds: Option<u64>,
    /// Target database; `None` presumably means a whole-instance backup — confirm.
    pub database: Option<String>,
}
380
/// Type of backup.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackupType {
    Full,
    Incremental,
    Snapshot,
}
388
/// State of a backup run.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackupState {
    InProgress,
    Completed,
    Failed,
    Cancelled,
}
397
398// =============================================================================
399// Dashboard Summary
400// =============================================================================
401
/// Summary for the dashboard home page (see `get_dashboard_summary`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DashboardSummary {
    pub cluster: ClusterSummary,
    pub performance: PerformanceSummary,
    pub storage: StorageSummary,
    pub alerts: AlertSummary,
}
410
/// Cluster section of the dashboard summary.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ClusterSummary {
    /// Cluster state rendered as a string (e.g. "Healthy").
    pub state: String,
    pub node_count: usize,
    pub healthy_nodes: usize,
    pub leader_id: Option<String>,
    pub version: String,
}
420
/// Performance section of the dashboard summary.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PerformanceSummary {
    /// Lifetime average queries per second.
    pub queries_per_second: f64,
    /// Lifetime average query latency in milliseconds.
    pub avg_latency_ms: f64,
    pub active_connections: u64,
    /// 0.0–100.0
    pub cpu_usage_percent: f64,
    /// 0.0–100.0
    pub memory_usage_percent: f64,
}
430
/// Storage section of the dashboard summary.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StorageSummary {
    pub total_bytes: u64,
    pub used_bytes: u64,
    /// used / total, 0.0–100.0.
    pub usage_percent: f64,
    pub database_count: usize,
    pub table_count: usize,
}
440
/// Alert section of the dashboard summary (counts only).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AlertSummary {
    pub total: usize,
    pub critical: usize,
    pub warning: usize,
    pub unacknowledged: usize,
}
449
450// =============================================================================
451// Admin API Service
452// =============================================================================
453
/// Admin API service for dashboard operations with real system metrics.
///
/// Counters are lock-free atomics so request handlers can record events
/// concurrently; the latency sample buffer and peer lists sit behind
/// `RwLock`s.
pub struct AdminService {
    /// Process start, used for uptime and lifetime-QPS calculations.
    start_time: std::time::Instant,
    /// Unique ID of this node.
    node_id: String,
    /// Optional human-readable display name.
    node_name: Option<String>,
    /// Address this server listens on.
    bind_address: String,
    /// Cluster name reported in cluster info.
    cluster_name: String,
    // Query statistics tracking
    total_queries: AtomicU64,
    failed_queries: AtomicU64,
    slow_queries: AtomicU64,
    /// Sum of all recorded query durations, in nanoseconds.
    total_query_time_ns: AtomicU64,
    active_connections: AtomicU64,
    // Latency tracking (stored as microseconds for precision);
    // bounded to 10_000 samples by record_query.
    latencies: RwLock<Vec<u64>>,
    // Network tracking (cumulative byte counters)
    bytes_in: AtomicU64,
    bytes_out: AtomicU64,
    // Cluster peer tracking
    /// Full records for peers that have registered.
    peers: RwLock<Vec<PeerNode>>,
    /// Configured/advertised peer addresses (strings only).
    peer_addresses: RwLock<Vec<String>>,
}
476
/// Information about a peer node in the cluster, exchanged during peer
/// registration (see `register_peer` / `get_self_info`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerNode {
    pub id: String,
    /// Optional display name.
    pub name: Option<String>,
    pub address: String,
    pub status: NodeStatus,
    pub role: NodeRole,
    /// Epoch-millisecond timestamp when this peer was last seen.
    pub last_seen: u64,
    pub version: String,
    pub uptime_seconds: u64,
    /// Last reported metrics; `None` until the peer has sent them.
    pub metrics: Option<NodeMetrics>,
}
490
491impl AdminService {
492    /// Create a new admin service.
493    pub fn new() -> Self {
494        Self::with_config("node-0", None, "127.0.0.1:9090", "aegis-cluster", vec![])
495    }
496
    /// Create admin service with custom node config.
    ///
    /// # Arguments
    /// * `node_id` - unique ID for this node.
    /// * `node_name` - optional human-readable display name.
    /// * `bind_address` - address this server listens on.
    /// * `cluster_name` - name reported in cluster info.
    /// * `peers` - initial peer *addresses*; full `PeerNode` records are
    ///   added later via `register_peer`.
    pub fn with_config(
        node_id: &str,
        node_name: Option<String>,
        bind_address: &str,
        cluster_name: &str,
        peers: Vec<String>,
    ) -> Self {
        Self {
            start_time: std::time::Instant::now(),
            node_id: node_id.to_string(),
            node_name,
            bind_address: bind_address.to_string(),
            cluster_name: cluster_name.to_string(),
            total_queries: AtomicU64::new(0),
            failed_queries: AtomicU64::new(0),
            slow_queries: AtomicU64::new(0),
            total_query_time_ns: AtomicU64::new(0),
            active_connections: AtomicU64::new(0),
            // Pre-sized sample buffer; capped at 10_000 by record_query.
            latencies: RwLock::new(Vec::with_capacity(1000)),
            bytes_in: AtomicU64::new(0),
            bytes_out: AtomicU64::new(0),
            peers: RwLock::new(Vec::new()),
            peer_addresses: RwLock::new(peers),
        }
    }
523
524    /// Get this node's ID.
525    pub fn node_id(&self) -> &str {
526        &self.node_id
527    }
528
529    /// Get this node's name.
530    pub fn node_name(&self) -> Option<&str> {
531        self.node_name.as_deref()
532    }
533
534    /// Get this node's address.
535    pub fn address(&self) -> &str {
536        &self.bind_address
537    }
538
539    /// Get configured peer addresses.
540    pub fn peer_addresses(&self) -> Vec<String> {
541        self.peer_addresses
542            .read()
543            .expect("peer_addresses lock poisoned")
544            .clone()
545    }
546
547    /// Add a peer address.
548    pub fn add_peer_address(&self, address: String) {
549        let mut addrs = self
550            .peer_addresses
551            .write()
552            .expect("peer_addresses lock poisoned");
553        if !addrs.contains(&address) && address != self.bind_address {
554            addrs.push(address);
555        }
556    }
557
558    /// Register or update a peer node.
559    pub fn register_peer(&self, peer: PeerNode) {
560        let mut peers = self.peers.write().expect("peers lock poisoned");
561        // Update if exists, otherwise add
562        if let Some(existing) = peers
563            .iter_mut()
564            .find(|p| p.id == peer.id || p.address == peer.address)
565        {
566            *existing = peer;
567        } else {
568            peers.push(peer);
569        }
570    }
571
572    /// Remove a peer node.
573    pub fn remove_peer(&self, node_id: &str) {
574        let mut peers = self.peers.write().expect("peers lock poisoned");
575        peers.retain(|p| p.id != node_id);
576    }
577
578    /// Get all known peer nodes.
579    pub fn get_peers(&self) -> Vec<PeerNode> {
580        self.peers.read().expect("peers lock poisoned").clone()
581    }
582
583    /// Mark a peer as offline.
584    pub fn mark_peer_offline(&self, node_id: &str) {
585        let mut peers = self.peers.write().expect("peers lock poisoned");
586        if let Some(peer) = peers.iter_mut().find(|p| p.id == node_id) {
587            peer.status = NodeStatus::Offline;
588        }
589    }
590
    /// Get node info for this node (for peer registration).
    ///
    /// Builds a fresh `PeerNode` snapshot: live system metrics, lifetime
    /// average QPS, and current latency percentiles.
    pub fn get_self_info(&self) -> PeerNode {
        let uptime = self.start_time.elapsed().as_secs();
        let (cpu_usage, memory_used, memory_total, disk_used, disk_total) =
            self.get_system_metrics();
        let total_queries = self.total_queries.load(Ordering::Relaxed);
        // Lifetime average; guard against division by zero right after start.
        let qps = if uptime > 0 {
            total_queries as f64 / uptime as f64
        } else {
            0.0
        };
        let (p50, p90, p95, p99, max) = self.calculate_latency_percentiles();

        PeerNode {
            id: self.node_id.clone(),
            name: self.node_name.clone(),
            address: self.bind_address.clone(),
            status: NodeStatus::Online,
            role: NodeRole::Leader, // Will be determined by Raft later
            last_seen: Self::now(),
            version: env!("CARGO_PKG_VERSION").to_string(),
            uptime_seconds: uptime,
            metrics: Some(NodeMetrics {
                cpu_usage_percent: cpu_usage,
                memory_usage_bytes: memory_used,
                memory_total_bytes: memory_total,
                disk_usage_bytes: disk_used,
                disk_total_bytes: disk_total,
                connections_active: self.active_connections.load(Ordering::Relaxed),
                queries_per_second: qps,
                network_bytes_in: self.bytes_in.load(Ordering::Relaxed),
                network_bytes_out: self.bytes_out.load(Ordering::Relaxed),
                // Packet counters are not tracked yet.
                network_packets_in: 0,
                network_packets_out: 0,
                latency_p50_ms: p50,
                latency_p90_ms: p90,
                latency_p95_ms: p95,
                latency_p99_ms: p99,
                latency_max_ms: max,
            }),
        }
    }
633
634    /// Record a query execution for statistics.
635    pub fn record_query(&self, duration_ms: f64, success: bool) {
636        self.total_queries.fetch_add(1, Ordering::Relaxed);
637        self.total_query_time_ns
638            .fetch_add((duration_ms * 1_000_000.0) as u64, Ordering::Relaxed);
639
640        if !success {
641            self.failed_queries.fetch_add(1, Ordering::Relaxed);
642        }
643
644        // Track slow queries (> 100ms)
645        if duration_ms > 100.0 {
646            self.slow_queries.fetch_add(1, Ordering::Relaxed);
647        }
648
649        // Track latency for percentile calculations
650        let latency_us = (duration_ms * 1000.0) as u64;
651        if let Ok(mut latencies) = self.latencies.write() {
652            if latencies.len() >= 10000 {
653                latencies.remove(0);
654            }
655            latencies.push(latency_us);
656        }
657    }
658
659    /// Record network bytes.
660    pub fn record_network(&self, bytes_in: u64, bytes_out: u64) {
661        self.bytes_in.fetch_add(bytes_in, Ordering::Relaxed);
662        self.bytes_out.fetch_add(bytes_out, Ordering::Relaxed);
663    }
664
665    /// Increment active connections.
666    pub fn connection_opened(&self) {
667        self.active_connections.fetch_add(1, Ordering::Relaxed);
668    }
669
670    /// Decrement active connections.
671    pub fn connection_closed(&self) {
672        self.active_connections.fetch_sub(1, Ordering::Relaxed);
673    }
674
    /// Sample system-wide metrics via `sysinfo`.
    ///
    /// Returns `(cpu_usage_percent, memory_used, memory_total, disk_used,
    /// disk_total)`, with memory and disk figures in bytes.
    ///
    /// NOTE(review): CPU usage is averaged from a single `refresh_all()`
    /// on a brand-new `System`; sysinfo generally needs two refreshes
    /// separated by a short interval to produce a meaningful CPU delta,
    /// so this reading may be 0 or inaccurate on some platforms —
    /// confirm against the sysinfo docs for the pinned version.
    fn get_system_metrics(&self) -> (f64, u64, u64, u64, u64) {
        let mut sys = System::new();
        sys.refresh_all();

        // Get CPU usage (average across all CPUs); `.max(1)` guards the
        // division when no CPUs are reported.
        let cpu_usage = sys
            .cpus()
            .iter()
            .map(|cpu| cpu.cpu_usage() as f64)
            .sum::<f64>()
            / sys.cpus().len().max(1) as f64;

        let memory_used = sys.used_memory();
        let memory_total = sys.total_memory();

        // Get disk metrics, summed over every mounted disk.
        let disks = Disks::new_with_refreshed_list();
        let (disk_used, disk_total) =
            disks
                .list()
                .iter()
                .fold((0u64, 0u64), |(used, total), disk| {
                    (
                        used + (disk.total_space() - disk.available_space()),
                        total + disk.total_space(),
                    )
                });

        (cpu_usage, memory_used, memory_total, disk_used, disk_total)
    }
706
    /// Get cluster information.
    ///
    /// Node counts are this node plus every registered peer; health is
    /// derived from how many of those peers are currently `Online`.
    pub fn get_cluster_info(&self) -> ClusterInfo {
        let peers = self.peers.read().expect("peers lock poisoned");
        let online_peers = peers
            .iter()
            .filter(|p| p.status == NodeStatus::Online)
            .count();
        let total_nodes = 1 + peers.len(); // self + peers
        let healthy_nodes = 1 + online_peers; // self is always healthy if running

        // Healthy: everyone online. Degraded: strict majority online.
        // Unavailable: majority lost.
        let state = if healthy_nodes == total_nodes {
            ClusterState::Healthy
        } else if healthy_nodes > total_nodes / 2 {
            ClusterState::Degraded
        } else {
            ClusterState::Unavailable
        };

        ClusterInfo {
            name: self.cluster_name.clone(),
            version: env!("CARGO_PKG_VERSION").to_string(),
            node_count: total_nodes,
            // NOTE(review): leader is reported as self until real leader
            // election is wired in (same placeholder as get_self_info).
            leader_id: Some(self.node_id.clone()),
            state,
            uptime_seconds: self.start_time.elapsed().as_secs(),
        }
    }
734
735    /// Get dashboard summary with real metrics.
736    pub fn get_dashboard_summary(&self) -> DashboardSummary {
737        let (cpu_usage, memory_used, memory_total, disk_used, disk_total) =
738            self.get_system_metrics();
739        let memory_percent = if memory_total > 0 {
740            (memory_used as f64 / memory_total as f64) * 100.0
741        } else {
742            0.0
743        };
744        let storage_percent = if disk_total > 0 {
745            (disk_used as f64 / disk_total as f64) * 100.0
746        } else {
747            0.0
748        };
749
750        let uptime = self.start_time.elapsed().as_secs();
751        let total_queries = self.total_queries.load(Ordering::Relaxed);
752        let qps = if uptime > 0 {
753            total_queries as f64 / uptime as f64
754        } else {
755            0.0
756        };
757        let total_time_ns = self.total_query_time_ns.load(Ordering::Relaxed);
758        let avg_latency = if total_queries > 0 {
759            (total_time_ns as f64 / total_queries as f64) / 1_000_000.0
760        } else {
761            0.0
762        };
763
764        DashboardSummary {
765            cluster: ClusterSummary {
766                state: "Healthy".to_string(),
767                node_count: 1,
768                healthy_nodes: 1,
769                leader_id: Some(self.node_id.clone()),
770                version: env!("CARGO_PKG_VERSION").to_string(),
771            },
772            performance: PerformanceSummary {
773                queries_per_second: qps,
774                avg_latency_ms: avg_latency,
775                active_connections: self.active_connections.load(Ordering::Relaxed),
776                cpu_usage_percent: cpu_usage,
777                memory_usage_percent: memory_percent,
778            },
779            storage: StorageSummary {
780                total_bytes: disk_total,
781                used_bytes: disk_used,
782                usage_percent: storage_percent,
783                database_count: 1, // Single default database
784                table_count: 0,    // Would need schema tracking
785            },
786            alerts: AlertSummary {
787                total: 0,
788                critical: 0,
789                warning: 0,
790                unacknowledged: 0,
791            },
792        }
793    }
794
    /// Get list of all nodes (self + peers).
    ///
    /// The local node is listed first with live metrics; peers are
    /// appended with whatever metrics they last reported (all-zero
    /// defaults when absent).
    pub fn get_nodes(&self) -> Vec<NodeInfo> {
        let uptime = self.start_time.elapsed().as_secs();
        let (cpu_usage, memory_used, memory_total, disk_used, disk_total) =
            self.get_system_metrics();

        let total_queries = self.total_queries.load(Ordering::Relaxed);
        // Lifetime average; guard against division by zero at startup.
        let qps = if uptime > 0 {
            total_queries as f64 / uptime as f64
        } else {
            0.0
        };

        // Calculate latency percentiles
        let (p50, p90, p95, p99, max) = self.calculate_latency_percentiles();

        // Start with self. The display name, when set, is folded into the
        // id as "id (name)".
        let mut nodes = vec![NodeInfo {
            id: format!(
                "{}{}",
                self.node_id,
                self.node_name
                    .as_ref()
                    .map(|n| format!(" ({})", n))
                    .unwrap_or_default()
            ),
            address: self.bind_address.clone(),
            // NOTE(review): role is hard-coded until real leader election exists.
            role: NodeRole::Leader,
            status: NodeStatus::Online,
            version: env!("CARGO_PKG_VERSION").to_string(),
            uptime_seconds: uptime,
            last_heartbeat: Self::now(),
            metrics: NodeMetrics {
                cpu_usage_percent: cpu_usage,
                memory_usage_bytes: memory_used,
                memory_total_bytes: memory_total,
                disk_usage_bytes: disk_used,
                disk_total_bytes: disk_total,
                connections_active: self.active_connections.load(Ordering::Relaxed),
                queries_per_second: qps,
                network_bytes_in: self.bytes_in.load(Ordering::Relaxed),
                network_bytes_out: self.bytes_out.load(Ordering::Relaxed),
                // Packet counters are not tracked yet.
                network_packets_in: 0,
                network_packets_out: 0,
                latency_p50_ms: p50,
                latency_p90_ms: p90,
                latency_p95_ms: p95,
                latency_p99_ms: p99,
                latency_max_ms: max,
            },
        }];

        // Add peer nodes
        let peers = self.peers.read().expect("peers lock poisoned");
        for peer in peers.iter() {
            nodes.push(NodeInfo {
                id: format!(
                    "{}{}",
                    peer.id,
                    peer.name
                        .as_ref()
                        .map(|n| format!(" ({})", n))
                        .unwrap_or_default()
                ),
                address: peer.address.clone(),
                role: peer.role,
                status: peer.status,
                version: peer.version.clone(),
                uptime_seconds: peer.uptime_seconds,
                last_heartbeat: peer.last_seen,
                // Peers that never reported metrics show all-zero metrics.
                metrics: peer.metrics.clone().unwrap_or_default(),
            });
        }

        nodes
    }
871
872    /// Calculate latency percentiles from recorded data.
873    fn calculate_latency_percentiles(&self) -> (f64, f64, f64, f64, f64) {
874        let latencies = match self.latencies.read() {
875            Ok(l) => l.clone(),
876            Err(_) => return (0.0, 0.0, 0.0, 0.0, 0.0),
877        };
878
879        if latencies.is_empty() {
880            return (0.0, 0.0, 0.0, 0.0, 0.0);
881        }
882
883        let mut sorted = latencies.clone();
884        sorted.sort_unstable();
885
886        let len = sorted.len();
887        let p50_idx = (len as f64 * 0.50) as usize;
888        let p90_idx = (len as f64 * 0.90) as usize;
889        let p95_idx = (len as f64 * 0.95) as usize;
890        let p99_idx = (len as f64 * 0.99) as usize;
891
892        let to_ms = |us: u64| us as f64 / 1000.0;
893
894        (
895            to_ms(sorted.get(p50_idx).copied().unwrap_or(0)),
896            to_ms(sorted.get(p90_idx).copied().unwrap_or(0)),
897            to_ms(sorted.get(p95_idx).copied().unwrap_or(0)),
898            to_ms(sorted.get(p99_idx.min(len - 1)).copied().unwrap_or(0)),
899            to_ms(sorted.last().copied().unwrap_or(0)),
900        )
901    }
902
903    /// Get storage information with real disk metrics.
904    pub fn get_storage_info(&self) -> StorageInfo {
905        let disks = Disks::new_with_refreshed_list();
906
907        let (total, available) = disks
908            .list()
909            .iter()
910            .fold((0u64, 0u64), |(total, avail), disk| {
911                (total + disk.total_space(), avail + disk.available_space())
912            });
913
914        let used = total.saturating_sub(available);
915
916        // Estimate breakdown (would need actual tracking for precise values)
917        let data_bytes = (used as f64 * 0.75) as u64;
918        let index_bytes = (used as f64 * 0.15) as u64;
919        let wal_bytes = (used as f64 * 0.08) as u64;
920        let temp_bytes = (used as f64 * 0.02) as u64;
921
922        StorageInfo {
923            total_bytes: total,
924            used_bytes: used,
925            available_bytes: available,
926            data_bytes,
927            index_bytes,
928            wal_bytes,
929            temp_bytes,
930        }
931    }
932
    /// Get query statistics with real data.
    ///
    /// QPS and average duration are lifetime aggregates (totals divided by
    /// uptime / query count); p50/p95/p99 come from the bounded in-memory
    /// latency sample buffer.
    pub fn get_query_stats(&self) -> QueryStats {
        let uptime = self.start_time.elapsed().as_secs();
        let total_queries = self.total_queries.load(Ordering::Relaxed);
        let qps = if uptime > 0 {
            total_queries as f64 / uptime as f64
        } else {
            0.0
        };

        // Durations are accumulated in nanoseconds; report milliseconds.
        let total_time_ns = self.total_query_time_ns.load(Ordering::Relaxed);
        let avg_duration = if total_queries > 0 {
            (total_time_ns as f64 / total_queries as f64) / 1_000_000.0
        } else {
            0.0
        };

        let (p50, _, p95, p99, _) = self.calculate_latency_percentiles();

        QueryStats {
            total_queries,
            queries_per_second: qps,
            avg_duration_ms: avg_duration,
            p50_duration_ms: p50,
            p95_duration_ms: p95,
            p99_duration_ms: p99,
            slow_queries: self.slow_queries.load(Ordering::Relaxed),
            failed_queries: self.failed_queries.load(Ordering::Relaxed),
        }
    }
963
964    /// Get current timestamp.
965    fn now() -> u64 {
966        std::time::SystemTime::now()
967            .duration_since(std::time::UNIX_EPOCH)
968            .unwrap_or_default()
969            .as_millis() as u64
970    }
971}
972
/// `Default` delegates to [`AdminService::new`]: the standalone
/// single-node configuration.
impl Default for AdminService {
    fn default() -> Self {
        Self::new()
    }
}
978
979// =============================================================================
980// Tests
981// =============================================================================
982
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_cluster_info() {
        let service = AdminService::new();
        let info = service.get_cluster_info();

        // With no registered peers the cluster is just this node, and a
        // running node always counts as healthy.
        assert_eq!(info.name, "aegis-cluster");
        assert_eq!(info.state, ClusterState::Healthy);
    }

    #[test]
    fn test_dashboard_summary() {
        let service = AdminService::new();
        let summary = service.get_dashboard_summary();

        // Single node mode
        assert_eq!(summary.cluster.node_count, 1);
        assert_eq!(summary.cluster.healthy_nodes, 1);
        // CPU and memory should be valid percentages
        assert!(summary.performance.cpu_usage_percent >= 0.0);
        assert!(summary.performance.cpu_usage_percent <= 100.0);
        assert!(summary.performance.memory_usage_percent >= 0.0);
        assert!(summary.performance.memory_usage_percent <= 100.0);
    }

    #[test]
    fn test_get_nodes() {
        let service = AdminService::new();
        let nodes = service.get_nodes();

        // Single node in standalone mode
        assert_eq!(nodes.len(), 1);
        assert_eq!(nodes[0].role, NodeRole::Leader);
        assert_eq!(nodes[0].status, NodeStatus::Online);
    }

    #[test]
    fn test_storage_info() {
        let service = AdminService::new();
        let storage = service.get_storage_info();

        // Real disk metrics - total should be positive
        // NOTE(review): depends on the host reporting at least one mounted
        // disk; may fail in minimal containers.
        assert!(storage.total_bytes > 0);
        assert!(storage.available_bytes <= storage.total_bytes);
        assert_eq!(
            storage.available_bytes,
            storage.total_bytes - storage.used_bytes
        );
    }

    #[test]
    fn test_query_stats() {
        let service = AdminService::new();

        // Record some queries to have data
        service.record_query(1.5, true);
        service.record_query(2.0, true);
        service.record_query(3.5, true);
        service.record_query(150.0, false); // slow + failed

        let stats = service.get_query_stats();

        assert_eq!(stats.total_queries, 4);
        assert_eq!(stats.failed_queries, 1);
        assert_eq!(stats.slow_queries, 1);
    }

    #[test]
    fn test_node_metrics() {
        let service = AdminService::new();
        let nodes = service.get_nodes();

        // Reported usage figures must stay within their physical bounds.
        for node in nodes {
            assert!(node.metrics.cpu_usage_percent <= 100.0);
            assert!(node.metrics.memory_usage_bytes <= node.metrics.memory_total_bytes);
        }
    }
}