1use std::sync::Arc;
7use std::time::Duration;
8
9use tracing::debug;
10
11use orca_core::types::{RuntimeKind, WorkloadStatus};
12
13use crate::state::AppState;
14
/// Pause between collection passes: the collector task sleeps for this long
/// before every call to `collect_all_stats`, so the effective period is this
/// interval plus however long a pass takes.
const STATS_INTERVAL: Duration = Duration::from_secs(2);
18
/// Aggregated resource usage for one service, as stored in the stats cache.
///
/// Serializes with the field names `memory_usage` and `cpu_percent`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ContainerStats {
    /// Human-readable memory figure. The collector fills this with the summed
    /// memory of the service's sampled instances, formatted by `format_bytes`
    /// (for example `"128Mi"`).
    pub memory_usage: String,
    /// CPU usage figure. The collector stores the sum over the service's
    /// sampled instances, rounded to two decimal places.
    // NOTE(review): the scale (e.g. whether 100.0 means one full core) is
    // defined by the runtime's `stats` implementation — confirm there.
    pub cpu_percent: f64,
}
27
28pub fn spawn_stats_collector(state: Arc<AppState>) {
30 tokio::spawn(async move {
31 loop {
32 tokio::time::sleep(STATS_INTERVAL).await;
33 collect_all_stats(&state).await;
34 }
35 });
36}
37
38async fn collect_all_stats(state: &AppState) {
40 let targets: Vec<(String, Vec<StatsTarget>)> = {
42 let services = state.services.read().await;
43 services
44 .values()
45 .filter_map(|svc| {
46 if svc.config.runtime != RuntimeKind::Container {
47 return None;
48 }
49 let running: Vec<StatsTarget> = svc
50 .instances
51 .iter()
52 .filter(|i| i.status == WorkloadStatus::Running)
53 .map(|i| StatsTarget {
54 handle: i.handle.clone(),
55 })
56 .collect();
57 if running.is_empty() {
58 return None;
59 }
60 Some((svc.config.name.clone(), running))
61 })
62 .collect()
63 };
64
65 let runtime = state.container_runtime.as_ref();
66 let mut new_stats = std::collections::HashMap::new();
67
68 for (name, instances) in &targets {
69 let mut total_mem: u64 = 0;
70 let mut total_cpu: f64 = 0.0;
71 let mut count: u32 = 0;
72
73 for target in instances {
74 match runtime.stats(&target.handle).await {
75 Ok(rs) => {
76 total_mem += rs.memory_bytes;
77 total_cpu += rs.cpu_percent;
78 count += 1;
79 }
80 Err(e) => {
81 debug!(service = %name, "Stats unavailable: {e}");
82 }
83 }
84 }
85
86 if count > 0 {
87 new_stats.insert(
88 name.clone(),
89 ContainerStats {
90 memory_usage: format_bytes(total_mem),
91 cpu_percent: (total_cpu * 100.0).round() / 100.0,
92 },
93 );
94 }
95 }
96
97 let mut cache = state.container_stats.write().await;
98 *cache = new_stats;
99}
100
/// Renders a byte count with the largest binary unit it reaches.
///
/// Values of at least 1 GiB, 1 MiB or 1 KiB are printed as a whole number
/// followed by `Gi`, `Mi` or `Ki`; anything smaller is printed as `<n>B`.
/// The division is integer division, so the fractional part is dropped
/// (1536 bytes renders as `"1Ki"`).
fn format_bytes(bytes: u64) -> String {
    // Ordered from largest to smallest so the first match is the right unit.
    const UNITS: [(u64, &str); 3] = [(1 << 30, "Gi"), (1 << 20, "Mi"), (1 << 10, "Ki")];

    UNITS
        .iter()
        .find(|(threshold, _)| bytes >= *threshold)
        .map_or_else(
            || format!("{bytes}B"),
            |(threshold, suffix)| format!("{}{suffix}", bytes / threshold),
        )
}
117
/// One running instance to sample during a collection pass.
///
/// Holds a clone of the instance's runtime handle so the stats call can be
/// made after the guard on the service map has been dropped.
struct StatsTarget {
    /// Handle passed to the container runtime's `stats` call.
    handle: orca_core::runtime::WorkloadHandle,
}
122
#[cfg(test)]
mod tests {
    use super::*;

    // `format_bytes`: one case per unit, each an exact multiple of the unit.

    #[test]
    fn format_bytes_gi() {
        assert_eq!(format_bytes(2 << 30), "2Gi");
    }

    #[test]
    fn format_bytes_mi() {
        assert_eq!(format_bytes(512 << 20), "512Mi");
    }

    #[test]
    fn format_bytes_ki() {
        assert_eq!(format_bytes(64 << 10), "64Ki");
    }

    #[test]
    fn format_bytes_small() {
        assert_eq!(format_bytes(42), "42B");
    }

    /// The JSON form exposes both fields under their Rust field names.
    #[test]
    fn container_stats_serializes() {
        let json = serde_json::to_value(&ContainerStats {
            memory_usage: "128Mi".to_string(),
            cpu_percent: 42.5,
        })
        .unwrap();

        assert_eq!(json["memory_usage"], "128Mi");
        assert_eq!(json["cpu_percent"], 42.5);
    }

    /// A freshly constructed `AppState` has nothing in its stats cache.
    #[tokio::test]
    async fn app_state_starts_with_empty_stats() {
        use std::collections::HashMap;
        use std::sync::Arc;

        use orca_core::config::ClusterConfig;
        use orca_core::testing::MockRuntime;
        use tokio::sync::RwLock;

        let state = crate::state::AppState::new(
            ClusterConfig::default(),
            Arc::new(MockRuntime::new()),
            None,
            Arc::new(RwLock::new(HashMap::new())),
            Arc::new(RwLock::new(Vec::new())),
        );

        assert!(state.container_stats.read().await.is_empty());
    }
}