compose_rs/command/
stats.rs1use std::{
2 collections::HashMap,
3 io::{BufRead, BufReader},
4 sync::mpsc,
5 thread,
6 time::Duration,
7};
8
9use parse_size::parse_size;
10use serde::{Deserialize, Serialize};
11
12use crate::{parser, ComposeCommand, ComposeError};
13
14use super::CatchOutput;
15
16#[derive(Serialize, Debug, Clone)]
19pub struct StatsUsage {
20 usage: u64,
21 limit: u64,
22}
23
24#[derive(Serialize, Debug, Clone)]
25pub struct StatsIO {
26 input: u64,
27 output: u64,
28}
29
30impl<'de> serde::Deserialize<'de> for StatsIO {
32 fn deserialize<D>(deserializer: D) -> Result<StatsIO, D::Error>
33 where
34 D: serde::Deserializer<'de>,
35 {
36 struct StatsIOVisitor;
37
38 impl<'de> serde::de::Visitor<'de> for StatsIOVisitor {
39 type Value = StatsIO;
40
41 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
42 formatter.write_str("a string in the format 'input/output'")
43 }
44
45 fn visit_str<E>(self, value: &str) -> Result<StatsIO, E>
46 where
47 E: serde::de::Error,
48 {
49 let parts = value.split('/').map(str::trim).collect::<Vec<&str>>();
50
51 let input = parse_size(parts[0]).map_err(|_| E::custom("Failed to parse input"))?;
52 let output =
53 parse_size(parts[1]).map_err(|_| E::custom("Failed to parse output"))?;
54 Ok(StatsIO { input, output })
55 }
56 }
57
58 deserializer.deserialize_str(StatsIOVisitor)
59 }
60}
61
62impl<'de> serde::Deserialize<'de> for StatsUsage {
63 fn deserialize<D>(deserializer: D) -> Result<StatsUsage, D::Error>
64 where
65 D: serde::Deserializer<'de>,
66 {
67 struct StatsUsageVisitor;
68
69 impl<'de> serde::de::Visitor<'de> for StatsUsageVisitor {
70 type Value = StatsUsage;
71
72 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
73 formatter.write_str("a string in the format 'current/total'")
74 }
75
76 fn visit_str<E>(self, value: &str) -> Result<StatsUsage, E>
77 where
78 E: serde::de::Error,
79 {
80 let parts = value.split('/').map(str::trim).collect::<Vec<&str>>();
81
82 let current =
83 parse_size(parts[0]).map_err(|_| E::custom("Failed to parse current"))?;
84 let total = parse_size(parts[1]).map_err(|_| E::custom("Failed to parse total"))?;
85 Ok(StatsUsage {
86 usage: current,
87 limit: total,
88 })
89 }
90 }
91
92 deserializer.deserialize_str(StatsUsageVisitor)
93 }
94}
95
96#[derive(Serialize, Debug, Clone)]
97pub struct StatsPercentage(f64);
98
99impl<'de> serde::Deserialize<'de> for StatsPercentage {
100 fn deserialize<D>(deserializer: D) -> Result<StatsPercentage, D::Error>
101 where
102 D: serde::Deserializer<'de>,
103 {
104 struct StatsPercentageVisitor;
105
106 impl<'de> serde::de::Visitor<'de> for StatsPercentageVisitor {
107 type Value = StatsPercentage;
108
109 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
110 formatter.write_str("a string in the format '0.00%'")
111 }
112
113 fn visit_str<E>(self, value: &str) -> Result<StatsPercentage, E>
114 where
115 E: serde::de::Error,
116 {
117 let value = value.trim_end_matches('%');
118 let value = value.parse::<f64>().map_err(E::custom)?;
119 Ok(StatsPercentage(value))
120 }
121 }
122
123 deserializer.deserialize_str(StatsPercentageVisitor)
124 }
125}
126
127#[derive(Deserialize, Serialize, Debug, Clone)]
128#[serde(rename_all = "PascalCase")]
129pub struct Stats {
130 #[serde(rename = "BlockIO")]
131 pub block_io: StatsIO,
132 #[serde(rename = "CPUPerc")]
133 pub cpu_perc: StatsPercentage,
134 pub container: String,
135 #[serde(rename = "ID")]
136 pub id: String,
137 pub mem_perc: StatsPercentage,
138 pub mem_usage: StatsUsage,
139 pub name: String,
140 #[serde(rename = "NetIO")]
141 pub net_io: StatsIO,
142 #[serde(rename = "PIDs")]
143 pub pids: String,
144}
145
/// Builder/runner for the `stats` subcommand.
///
/// Holds the base process command to which the `stats` arguments are appended, plus an
/// optional batching interval used by the streaming mode.
pub struct StatsCommand {
    // Base command; `stats` and its flags are appended when the command is run.
    command: std::process::Command,
    // Length of one batching window in `stream`; `None` means the one-second default.
    poll_interval: Option<Duration>,
}
150
// Boxed, sendable iterator returned by `StatsCommand::stream`: each item is either one
// batch of per-container stats or the error that ended the stream.
type StatsIterator = Box<dyn Iterator<Item = Result<Vec<Stats>, ComposeError>> + Send>;
152
153impl StatsCommand {
154 pub fn new(cmd: std::process::Command) -> Self {
155 Self {
156 command: cmd,
157 poll_interval: None,
158 }
159 }
160
161 pub fn stream(self) -> Result<StatsIterator, ComposeError> {
162 let mut command = self.command;
163
164 command.arg("stats").arg("--format").arg("json");
165
166 let stdout = command
167 .stdout(std::process::Stdio::piped())
168 .spawn()?
169 .stdout
170 .ok_or(ComposeError::IoError(std::io::Error::other(
171 "Failed to open stdout",
172 )))?;
173
174 let (tx, rx) = mpsc::channel();
175
176 thread::spawn(move || {
177 let mut reader = BufReader::new(stdout);
178 let interval = self.poll_interval.unwrap_or(Duration::from_secs(1));
179
180 loop {
181 let mut lines = Vec::new();
182 let start_time = std::time::Instant::now();
183
184 while start_time.elapsed() < interval {
185 let mut line = String::new();
186 match reader.read_line(&mut line) {
187 Ok(0) => break,
188 Ok(_) => {
189 let line = parser::remove_ansi_codes(&line);
190
191 if let Ok(line) = line {
192 lines.push(line.trim().to_string());
193 } else {
194 tx.send(Err(ComposeError::ParseError(
195 "Failed to parse line".to_string(),
196 )))
197 .expect("Failed to send error");
198 return;
199 }
200 }
201 Err(err) => {
202 tx.send(Err(ComposeError::IoError(err)))
203 .expect("Failed to send error");
204 return;
205 }
206 }
207 }
208
209 if lines.is_empty() {
210 break;
211 }
212
213 let stats = lines
214 .iter()
215 .map(|line| serde_json::from_str(line))
216 .collect::<Result<Vec<Stats>, _>>();
217
218 match stats {
219 Ok(stats) => {
220 let mut dedupe_stats_map: HashMap<String, Vec<Stats>> = HashMap::new();
222
223 for stat in stats {
224 let entry = dedupe_stats_map.entry(stat.container.clone()).or_default();
225 entry.push(stat);
226 }
227
228 let stats = dedupe_stats_map
229 .values()
230 .map(|stats| {
231 let mut avg_stats = stats[0].clone();
232 let len = stats.len() as u64;
233
234 for stat in stats.iter().skip(1) {
235 avg_stats.block_io.input += stat.block_io.input;
236 avg_stats.block_io.output += stat.block_io.output;
237 avg_stats.cpu_perc.0 += stat.cpu_perc.0;
238 avg_stats.mem_perc.0 += stat.mem_perc.0;
239 avg_stats.mem_usage.usage += stat.mem_usage.usage;
240 avg_stats.mem_usage.limit += stat.mem_usage.limit;
241 avg_stats.net_io.input += stat.net_io.input;
242 avg_stats.net_io.output += stat.net_io.output;
243 }
244
245 avg_stats.block_io.input /= len;
246 avg_stats.block_io.output /= len;
247 avg_stats.cpu_perc.0 /= len as f64;
248 avg_stats.mem_perc.0 /= len as f64;
249 avg_stats.mem_usage.usage /= len;
250 avg_stats.mem_usage.limit /= len;
251 avg_stats.net_io.input /= len;
252 avg_stats.net_io.output /= len;
253
254 avg_stats
255 })
256 .collect();
257
258 tx.send(Ok(stats)).expect("Failed to send stats");
259 }
260 Err(err) => {
261 tx.send(Err(ComposeError::ParseError(err.to_string())))
262 .expect("Failed to send error");
263 return;
264 }
265 }
266 }
267 });
268
269 Ok(Box::new(rx.into_iter()))
270 }
271}
272
273impl ComposeCommand<Vec<Stats>, ()> for StatsCommand {
274 const COMMAND: &'static str = "stats";
275
276 fn exec(self) -> Result<Vec<Stats>, ComposeError> {
277 let mut command = self.command;
278
279 command
280 .arg(Self::COMMAND)
281 .arg("--format")
282 .arg("json")
283 .arg("--no-stream");
284
285 let output = command.output().catch_output()?;
286
287 let output = String::from_utf8_lossy(&output.stdout);
288
289 let stats = output
290 .lines()
291 .map(serde_json::from_str)
292 .collect::<Result<Vec<Stats>, _>>()?;
293
294 Ok(stats)
295 }
296}