use std::collections::{HashMap, HashSet};
use futures_util::StreamExt;
use serde::Deserialize;
use crate::compose::types::ComposeFile;
use crate::error::{ComposeError, Result};
use crate::libpod::{parse_json_lines, urlencoded, API_PREFIX};
use super::Engine;
/// Builds the `&containers=…` query-string fragment for the stats endpoint.
///
/// The names in `wanted` are ordered lexicographically, each one is passed
/// through `urlencoded`, and the results are joined with commas. An empty set
/// produces an empty string so that nothing is appended to the request URL.
fn containers_query(wanted: &HashSet<String>) -> String {
    if wanted.is_empty() {
        String::new()
    } else {
        // Set iteration order is arbitrary; sorting keeps the URL stable.
        let mut ordered: Vec<&String> = Vec::with_capacity(wanted.len());
        ordered.extend(wanted);
        ordered.sort_unstable();

        let encoded: Vec<_> = ordered.into_iter().map(|name| urlencoded(name)).collect();
        format!("&containers={}", encoded.join(","))
    }
}
/// Serde `deserialize_with` helper that treats an explicit `null` like a
/// missing value.
///
/// The input is first read as `Option<T>`; `None` (i.e. `null`) is replaced by
/// `T::default()`, any other value is returned unchanged. Deserialization
/// errors from the underlying deserializer are propagated as-is.
fn null_default<'de, D, T>(d: D) -> std::result::Result<T, D::Error>
where
    D: serde::Deserializer<'de>,
    T: Default + Deserialize<'de>,
{
    let maybe = Option::<T>::deserialize(d)?;
    Ok(maybe.unwrap_or_default())
}
/// One decoded stats payload, as returned by the `containers/stats` endpoint
/// queried in `Engine::stats` (a single object when not streaming, one object
/// per frame when streaming).
#[derive(Deserialize)]
struct StatsReport {
/// Per-container entries read from the `Stats` key; a missing key
/// deserializes to an empty vector.
#[serde(rename = "Stats", default)]
stats: Vec<ContainerStat>,
}
/// Statistics for a single container, decoded from one element of the
/// report's `Stats` array. Every field falls back to its type's default when
/// the corresponding JSON key is absent.
#[derive(Deserialize, Default)]
struct ContainerStat {
/// Container name (`Name`); used to filter and sort rows in `print_frame`.
#[serde(rename = "Name", default)]
name: String,
/// `CPU` value; printed with a `%` suffix by `format_row`.
#[serde(rename = "CPU", default)]
cpu: f64,
/// `MemUsage` value; rendered through `format_bytes`.
#[serde(rename = "MemUsage", default)]
mem_usage: u64,
/// `MemLimit` value; rendered through `format_bytes`.
#[serde(rename = "MemLimit", default)]
mem_limit: u64,
/// `MemPerc` value; printed with a `%` suffix by `format_row`.
#[serde(rename = "MemPerc", default)]
mem_perc: f64,
/// `BlockInput` value; rendered through `format_bytes`.
#[serde(rename = "BlockInput", default)]
block_in: u64,
/// `BlockOutput` value; rendered through `format_bytes`.
#[serde(rename = "BlockOutput", default)]
block_out: u64,
/// `PIDs` value; printed as a plain integer.
#[serde(rename = "PIDs", default)]
pids: u64,
/// `Network` map keyed by a string (presumably the interface name — confirm
/// against the API). `null_default` makes an explicit `null` decode to an
/// empty map, in addition to `default` covering a missing key.
#[serde(rename = "Network", default, deserialize_with = "null_default")]
network: HashMap<String, NetStat>,
}
/// Counters for one entry of `ContainerStat::network`. Both fields default to
/// zero when their JSON key is absent.
#[derive(Deserialize, Default)]
struct NetStat {
/// `RxBytes` counter; summed across all entries by `format_row`.
#[serde(rename = "RxBytes", default)]
rx: u64,
/// `TxBytes` counter; summed across all entries by `format_row`.
#[serde(rename = "TxBytes", default)]
tx: u64,
}
/// Renders a byte count using binary units (`B`, `KiB`, `MiB`, `GiB`, `TiB`).
///
/// Counts below 1024 are printed exactly (`"512B"`); larger counts are scaled
/// by powers of 1024 and shown with one decimal place (`"1.5KiB"`). `TiB` is
/// the largest unit, so very large counts keep growing numerically in `TiB`.
///
/// A value that sits just under a unit boundary (for example one byte short of
/// 1 MiB) would round to `1024.0` at one decimal place; such values are shown
/// as `1.0` of the next unit instead of `"1024.0KiB"`.
fn format_bytes(bytes: u64) -> String {
    const UNITS: [&str; 5] = ["B", "KiB", "MiB", "GiB", "TiB"];
    const STEP: f64 = 1024.0;

    // Precision loss in the cast is acceptable: the result is for display only.
    let mut value = bytes as f64;
    let mut unit = 0;
    while value >= STEP && unit < UNITS.len() - 1 {
        value /= STEP;
        unit += 1;
    }

    if unit == 0 {
        // Whole bytes are printed from the integer to avoid a spurious ".0".
        return format!("{bytes}B");
    }

    // Compare the rendered text rather than the float so the promotion agrees
    // exactly with what the formatter would have printed.
    let rendered = format!("{value:.1}");
    if rendered == "1024.0" && unit + 1 < UNITS.len() {
        return format!("1.0{}", UNITS[unit + 1]);
    }
    format!("{rendered}{}", UNITS[unit])
}
/// Formats one container's statistics as a fixed-width table row.
///
/// Receive and transmit counters are summed over every entry in
/// `s.network` before being rendered, so the NET I/O column reflects the
/// total across all entries. Byte quantities go through `format_bytes`; the
/// CPU and memory percentages are printed with two decimals.
fn format_row(s: &ContainerStat) -> String {
    let mut rx_total = 0u64;
    let mut tx_total = 0u64;
    for iface in s.network.values() {
        rx_total += iface.rx;
        tx_total += iface.tx;
    }

    let mem_used = format_bytes(s.mem_usage);
    let mem_cap = format_bytes(s.mem_limit);
    let net_in = format_bytes(rx_total);
    let net_out = format_bytes(tx_total);
    let blk_in = format_bytes(s.block_in);
    let blk_out = format_bytes(s.block_out);

    format!(
        "{:<32} {:>7.2}% {:>10} / {:<10} {:>6.2}% {:>9} / {:<9} {:>9} / {:<9} {:>5}",
        s.name, s.cpu, mem_used, mem_cap, s.mem_perc, net_in, net_out, blk_in, blk_out, s.pids,
    )
}
/// Column titles printed by `print_frame` ahead of the rows; the titles follow
/// the same left-to-right order as the values emitted by `format_row`.
// NOTE(review): the titles are separated by single spaces while `format_row`
// pads every column to a fixed width — confirm the header alignment is intended.
const HEADER: &str = "NAME CPU % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O PIDS";
impl Engine {
    /// Prints resource statistics for the containers of the selected services.
    ///
    /// `target_services` narrows the selection to the named services; an empty
    /// slice selects every service in `file`. With `no_stream` set, a single
    /// report is fetched and printed. Otherwise the streaming endpoint is
    /// consumed and one table is printed per decoded frame until the stream
    /// finishes.
    ///
    /// # Errors
    ///
    /// Returns `ComposeError::Podman` when the request itself fails. A frame
    /// that cannot be decoded while streaming is not reported as an error: it
    /// is logged at debug level and ends the loop.
    pub async fn stats(
        &self,
        file: &ComposeFile,
        target_services: &[String],
        no_stream: bool,
    ) -> Result<()> {
        let wanted = self.target_container_names(file, target_services);
        let containers = containers_query(&wanted);

        if !no_stream {
            let url = format!("{API_PREFIX}/containers/stats?stream=true{containers}");
            let resp = self
                .client
                .get_stream(&url)
                .await
                .map_err(ComposeError::Podman)?;
            let mut frames = parse_json_lines::<StatsReport>(resp.into_body());
            loop {
                match frames.next().await {
                    Some(Ok(report)) => print_frame(&report, &wanted),
                    Some(Err(e)) => {
                        // Best effort: a broken frame simply terminates the view.
                        tracing::debug!("stats stream ended: {e}");
                        break;
                    }
                    None => break,
                }
            }
            return Ok(());
        }

        let url = format!("{API_PREFIX}/containers/stats?stream=false{containers}");
        let report: StatsReport = self
            .client
            .get_json(&url)
            .await
            .map_err(ComposeError::Podman)?;
        print_frame(&report, &wanted);
        Ok(())
    }

    /// Collects the names produced by `replica_names` for every selected
    /// service.
    ///
    /// A service is selected when `target_services` is empty or contains its
    /// name; requested names that match no service in `file` contribute
    /// nothing.
    fn target_container_names(
        &self,
        file: &ComposeFile,
        target_services: &[String],
    ) -> HashSet<String> {
        let select_all = target_services.is_empty();
        let mut names = HashSet::new();
        for (name, service) in file.services.iter() {
            if select_all || target_services.iter().any(|t| t == name) {
                names.extend(self.replica_names(name, service));
            }
        }
        names
    }
}
/// Writes one statistics table to standard output.
///
/// Only entries whose name is in `wanted` are shown. Rows are ordered by
/// container name, preceded by `HEADER` and followed by an empty line that
/// separates consecutive frames.
fn print_frame(report: &StatsReport, wanted: &HashSet<String>) {
    let mut selected = Vec::new();
    for stat in &report.stats {
        if wanted.contains(&stat.name) {
            selected.push(stat);
        }
    }
    selected.sort_by(|left, right| left.name.as_str().cmp(right.name.as_str()));

    println!("{HEADER}");
    selected
        .iter()
        .for_each(|stat| println!("{}", format_row(stat)));
    println!();
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Byte counts are printed exactly below 1 KiB and scaled with one decimal
    /// place above it.
    #[test]
    fn format_bytes_scales_units() {
        assert_eq!(format_bytes(512), "512B");
        assert_eq!(format_bytes(1024), "1.0KiB");
        assert_eq!(format_bytes(1536), "1.5KiB");
        assert_eq!(format_bytes(1024 * 1024), "1.0MiB");
        assert_eq!(format_bytes(3 * 1024 * 1024 * 1024), "3.0GiB");
    }

    /// Two interfaces are used so that the NET I/O column can only be correct
    /// if the per-interface counters are actually added together.
    #[test]
    fn format_row_sums_network_and_shows_name() {
        let mut network = HashMap::new();
        network.insert("eth0".to_string(), NetStat { rx: 1024, tx: 2048 });
        network.insert("eth1".to_string(), NetStat { rx: 512, tx: 1024 });
        let s = ContainerStat {
            name: "proj-web".into(),
            cpu: 12.5,
            mem_usage: 1024 * 1024,
            mem_limit: 1024 * 1024 * 1024,
            mem_perc: 0.1,
            block_in: 0,
            block_out: 0,
            pids: 3,
            network,
        };
        let row = format_row(&s);
        assert!(row.starts_with("proj-web "));
        assert!(row.contains("12.50%"));
        assert!(row.contains("1.0MiB / 1.0GiB"));
        assert!(row.contains("0.10%"));
        // rx: 1024 + 512 = 1536 bytes, tx: 2048 + 1024 = 3072 bytes.
        assert!(row.contains("1.5KiB / 3.0KiB"));
        assert!(row.contains("0B / 0B"));
        assert!(row.ends_with(" 3"));
    }

    /// An explicit `null` for `Network` decodes to an empty map.
    #[test]
    fn stat_tolerates_null_network() {
        let json = r#"{"Name":"proj-web","CPU":1.0,"Network":null}"#;
        let stat: ContainerStat = serde_json::from_str(json).unwrap();
        assert_eq!(stat.name, "proj-web");
        assert!(stat.network.is_empty());
    }

    /// A payload without a `Network` key decodes to an empty map.
    #[test]
    fn stat_tolerates_missing_network() {
        let json = r#"{"Name":"proj-web"}"#;
        let stat: ContainerStat = serde_json::from_str(json).unwrap();
        assert!(stat.network.is_empty());
    }

    /// Names are emitted in sorted order regardless of set iteration order.
    #[test]
    fn containers_query_is_sorted_and_scoped() {
        let mut wanted = HashSet::new();
        wanted.insert("proj-web-1".to_string());
        wanted.insert("proj-db-1".to_string());
        assert_eq!(
            containers_query(&wanted),
            "&containers=proj-db-1,proj-web-1"
        );
    }

    /// No fragment is produced when there is nothing to scope the query to.
    #[test]
    fn containers_query_empty_when_none_wanted() {
        assert_eq!(containers_query(&HashSet::new()), "");
    }
}