1use bollard::models::ContainerStatsResponse;
2use bollard::query_parameters::StatsOptions;
3use futures_util::stream::StreamExt;
4use std::time::Instant;
5
6use crate::core::types::{AppEvent, ContainerKey, ContainerStats, EventSender};
7use crate::docker::connection::DockerHost;
8
9pub async fn stream_container_stats(host: DockerHost, truncated_id: String, tx: EventSender) {
19 let stats_options = StatsOptions {
20 stream: true,
21 one_shot: false,
22 };
23
24 let mut stats_stream = host.docker.stats(&truncated_id, Some(stats_options));
25
26 const ALPHA: f64 = 0.3;
29
30 let mut smoothed_cpu: Option<f64> = None;
31 let mut smoothed_memory: Option<f64> = None;
32 let mut smoothed_net_tx: Option<f64> = None;
33 let mut smoothed_net_rx: Option<f64> = None;
34
35 let mut prev_net_tx: Option<u64> = None;
37 let mut prev_net_rx: Option<u64> = None;
38 let mut prev_timestamp: Option<Instant> = None;
39
40 while let Some(result) = stats_stream.next().await {
41 match result {
42 Ok(stats) => {
43 let cpu_percent = calculate_cpu_percentage(&stats);
44 let memory_percent = calculate_memory_percentage(&stats);
45 let (net_tx_rate, net_rx_rate) =
46 calculate_network_rates(&stats, prev_net_tx, prev_net_rx, prev_timestamp);
47
48 let (tx_bytes, rx_bytes) = extract_network_bytes(&stats);
50 prev_net_tx = tx_bytes;
51 prev_net_rx = rx_bytes;
52 prev_timestamp = Some(Instant::now());
53
54 let cpu = match smoothed_cpu {
56 Some(prev) => ALPHA * cpu_percent + (1.0 - ALPHA) * prev,
57 None => cpu_percent, };
59
60 let memory = match smoothed_memory {
61 Some(prev) => ALPHA * memory_percent + (1.0 - ALPHA) * prev,
62 None => memory_percent, };
64
65 let network_tx_bytes_per_sec = match smoothed_net_tx {
66 Some(prev) => ALPHA * net_tx_rate + (1.0 - ALPHA) * prev,
67 None => net_tx_rate,
68 };
69
70 let network_rx_bytes_per_sec = match smoothed_net_rx {
71 Some(prev) => ALPHA * net_rx_rate + (1.0 - ALPHA) * prev,
72 None => net_rx_rate,
73 };
74
75 smoothed_cpu = Some(cpu);
77 smoothed_memory = Some(memory);
78 smoothed_net_tx = Some(network_tx_bytes_per_sec);
79 smoothed_net_rx = Some(network_rx_bytes_per_sec);
80
81 let (memory_used_bytes, memory_limit_bytes) = extract_memory_bytes(&stats);
83
84 let stats = ContainerStats {
85 cpu,
86 memory,
87 memory_used_bytes,
88 memory_limit_bytes,
89 network_tx_bytes_per_sec,
90 network_rx_bytes_per_sec,
91 };
92
93 let key = ContainerKey::new(host.host_id.clone(), truncated_id.clone());
94 if tx.send(AppEvent::ContainerStat(key, stats)).await.is_err() {
95 break;
96 }
97 }
98 Err(_) => break,
99 }
100 }
101
102 let key = ContainerKey::new(host.host_id, truncated_id);
104 let _ = tx.send(AppEvent::ContainerDestroyed(key)).await;
105}
106
107pub fn calculate_cpu_percentage(stats: &ContainerStatsResponse) -> f64 {
109 let cpu_stats = match &stats.cpu_stats {
110 Some(cs) => cs,
111 None => return 0.0,
112 };
113 let precpu_stats = match &stats.precpu_stats {
114 Some(pcs) => pcs,
115 None => return 0.0,
116 };
117
118 let cpu_usage = cpu_stats
119 .cpu_usage
120 .as_ref()
121 .and_then(|u| u.total_usage)
122 .unwrap_or(0);
123 let precpu_usage = precpu_stats
124 .cpu_usage
125 .as_ref()
126 .and_then(|u| u.total_usage)
127 .unwrap_or(0);
128 let cpu_delta = cpu_usage as f64 - precpu_usage as f64;
129
130 let system_delta = cpu_stats.system_cpu_usage.unwrap_or(0) as f64
131 - precpu_stats.system_cpu_usage.unwrap_or(0) as f64;
132 let number_cpus = cpu_stats.online_cpus.unwrap_or(1) as f64;
133
134 if system_delta > 0.0 && cpu_delta > 0.0 {
135 (cpu_delta / system_delta) * number_cpus * 100.0
136 } else {
137 0.0
138 }
139}
140
141pub fn calculate_memory_percentage(stats: &ContainerStatsResponse) -> f64 {
143 let memory_stats = match &stats.memory_stats {
144 Some(ms) => ms,
145 None => return 0.0,
146 };
147
148 let memory_usage = memory_stats.usage.unwrap_or(0) as f64;
149 let memory_limit = memory_stats.limit.unwrap_or(1) as f64;
150
151 if memory_limit > 0.0 {
152 (memory_usage / memory_limit) * 100.0
153 } else {
154 0.0
155 }
156}
157
158fn extract_memory_bytes(stats: &ContainerStatsResponse) -> (u64, u64) {
161 let memory_stats = match &stats.memory_stats {
162 Some(ms) => ms,
163 None => return (0, 0),
164 };
165
166 let memory_used = memory_stats.usage.unwrap_or(0);
167 let memory_limit = memory_stats.limit.unwrap_or(0);
168
169 (memory_used, memory_limit)
170}
171
172fn extract_network_bytes(stats: &ContainerStatsResponse) -> (Option<u64>, Option<u64>) {
174 let networks = match &stats.networks {
175 Some(nets) => nets,
176 None => return (None, None),
177 };
178
179 let mut total_tx = 0u64;
180 let mut total_rx = 0u64;
181
182 for interface_stats in networks.values() {
183 total_tx += interface_stats.tx_bytes.unwrap_or(0);
184 total_rx += interface_stats.rx_bytes.unwrap_or(0);
185 }
186
187 (Some(total_tx), Some(total_rx))
188}
189
190fn calculate_network_rates(
192 stats: &ContainerStatsResponse,
193 prev_tx: Option<u64>,
194 prev_rx: Option<u64>,
195 prev_time: Option<Instant>,
196) -> (f64, f64) {
197 let (current_tx, current_rx) = extract_network_bytes(stats);
198
199 let (prev_tx, prev_rx, prev_time) = match (prev_tx, prev_rx, prev_time) {
201 (Some(tx), Some(rx), Some(time)) => (tx, rx, time),
202 _ => return (0.0, 0.0),
203 };
204
205 let (current_tx, current_rx) = match (current_tx, current_rx) {
206 (Some(tx), Some(rx)) => (tx, rx),
207 _ => return (0.0, 0.0),
208 };
209
210 let elapsed = prev_time.elapsed().as_secs_f64();
211 if elapsed <= 0.0 {
212 return (0.0, 0.0);
213 }
214
215 let tx_delta = current_tx.saturating_sub(prev_tx) as f64;
216 let rx_delta = current_rx.saturating_sub(prev_rx) as f64;
217
218 let tx_rate = tx_delta / elapsed;
219 let rx_rate = rx_delta / elapsed;
220
221 (tx_rate, rx_rate)
222}
223
224#[cfg(test)]
225mod tests {
226 use super::*;
227 use bollard::models::{ContainerCpuStats, ContainerCpuUsage, ContainerMemoryStats};
228
229 fn create_cpu_stats(
230 total_usage: u64,
231 system_cpu_usage: u64,
232 online_cpus: u32,
233 ) -> ContainerCpuStats {
234 ContainerCpuStats {
235 cpu_usage: Some(ContainerCpuUsage {
236 total_usage: Some(total_usage),
237 percpu_usage: None,
238 usage_in_kernelmode: None,
239 usage_in_usermode: None,
240 }),
241 system_cpu_usage: Some(system_cpu_usage),
242 online_cpus: Some(online_cpus),
243 throttling_data: None,
244 }
245 }
246
247 #[test]
248 fn test_calculate_cpu_percentage_normal_usage() {
249 let stats = ContainerStatsResponse {
250 cpu_stats: Some(create_cpu_stats(1_000_000_000, 2_000_000_000, 4)),
251 precpu_stats: Some(create_cpu_stats(500_000_000, 1_000_000_000, 4)),
252 ..Default::default()
253 };
254
255 let cpu = calculate_cpu_percentage(&stats);
256
257 assert_eq!(cpu, 200.0);
261 }
262
263 #[test]
264 fn test_calculate_cpu_percentage_single_core() {
265 let stats = ContainerStatsResponse {
266 cpu_stats: Some(create_cpu_stats(800_000_000, 1_000_000_000, 1)),
267 precpu_stats: Some(create_cpu_stats(200_000_000, 500_000_000, 1)),
268 ..Default::default()
269 };
270
271 let cpu = calculate_cpu_percentage(&stats);
272
273 assert_eq!(cpu, 120.0);
277 }
278
279 #[test]
280 fn test_calculate_cpu_percentage_missing_cpu_stats() {
281 let stats = ContainerStatsResponse {
282 cpu_stats: None,
283 precpu_stats: None,
284 ..Default::default()
285 };
286
287 assert_eq!(calculate_cpu_percentage(&stats), 0.0);
288 }
289
290 #[test]
291 fn test_calculate_cpu_percentage_missing_precpu_stats() {
292 let stats = ContainerStatsResponse {
293 cpu_stats: Some(create_cpu_stats(1_000_000_000, 2_000_000_000, 4)),
294 precpu_stats: None,
295 ..Default::default()
296 };
297
298 assert_eq!(calculate_cpu_percentage(&stats), 0.0);
299 }
300
301 #[test]
302 fn test_calculate_cpu_percentage_zero_system_delta() {
303 let stats = ContainerStatsResponse {
304 cpu_stats: Some(create_cpu_stats(1_000_000_000, 2_000_000_000, 4)),
305 precpu_stats: Some(create_cpu_stats(500_000_000, 2_000_000_000, 4)), ..Default::default()
307 };
308
309 assert_eq!(calculate_cpu_percentage(&stats), 0.0);
311 }
312
313 #[test]
314 fn test_calculate_cpu_percentage_zero_cpu_delta() {
315 let stats = ContainerStatsResponse {
316 cpu_stats: Some(create_cpu_stats(1_000_000_000, 2_000_000_000, 4)),
317 precpu_stats: Some(create_cpu_stats(1_000_000_000, 1_000_000_000, 4)), ..Default::default()
319 };
320
321 assert_eq!(calculate_cpu_percentage(&stats), 0.0);
323 }
324
325 #[test]
326 fn test_calculate_memory_percentage_normal_usage() {
327 let stats = ContainerStatsResponse {
328 memory_stats: Some(ContainerMemoryStats {
329 usage: Some(500_000_000), limit: Some(1_000_000_000), max_usage: None,
332 stats: None,
333 failcnt: None,
334 commitbytes: None,
335 commitpeakbytes: None,
336 privateworkingset: None,
337 }),
338 ..Default::default()
339 };
340
341 assert_eq!(calculate_memory_percentage(&stats), 50.0);
342 }
343
344 #[test]
345 fn test_calculate_memory_percentage_full_usage() {
346 let stats = ContainerStatsResponse {
347 memory_stats: Some(ContainerMemoryStats {
348 usage: Some(1_000_000_000),
349 limit: Some(1_000_000_000),
350 max_usage: None,
351 stats: None,
352 failcnt: None,
353 commitbytes: None,
354 commitpeakbytes: None,
355 privateworkingset: None,
356 }),
357 ..Default::default()
358 };
359
360 assert_eq!(calculate_memory_percentage(&stats), 100.0);
361 }
362
363 #[test]
364 fn test_calculate_memory_percentage_low_usage() {
365 let stats = ContainerStatsResponse {
366 memory_stats: Some(ContainerMemoryStats {
367 usage: Some(100_000_000), limit: Some(2_000_000_000), max_usage: None,
370 stats: None,
371 failcnt: None,
372 commitbytes: None,
373 commitpeakbytes: None,
374 privateworkingset: None,
375 }),
376 ..Default::default()
377 };
378
379 assert_eq!(calculate_memory_percentage(&stats), 5.0);
380 }
381
382 #[test]
383 fn test_calculate_memory_percentage_missing_memory_stats() {
384 let stats = ContainerStatsResponse {
385 memory_stats: None,
386 ..Default::default()
387 };
388
389 assert_eq!(calculate_memory_percentage(&stats), 0.0);
390 }
391
392 #[test]
393 fn test_calculate_memory_percentage_missing_usage() {
394 let stats = ContainerStatsResponse {
395 memory_stats: Some(ContainerMemoryStats {
396 usage: None,
397 limit: Some(1_000_000_000),
398 max_usage: None,
399 stats: None,
400 failcnt: None,
401 commitbytes: None,
402 commitpeakbytes: None,
403 privateworkingset: None,
404 }),
405 ..Default::default()
406 };
407
408 assert_eq!(calculate_memory_percentage(&stats), 0.0);
409 }
410
411 #[test]
412 fn test_calculate_memory_percentage_zero_limit() {
413 let stats = ContainerStatsResponse {
414 memory_stats: Some(ContainerMemoryStats {
415 usage: Some(500_000_000),
416 limit: Some(0),
417 max_usage: None,
418 stats: None,
419 failcnt: None,
420 commitbytes: None,
421 commitpeakbytes: None,
422 privateworkingset: None,
423 }),
424 ..Default::default()
425 };
426
427 assert_eq!(calculate_memory_percentage(&stats), 0.0);
429 }
430}