compose_rs/command/
stats.rs

1use std::{
2    collections::HashMap,
3    io::{BufRead, BufReader},
4    sync::mpsc,
5    thread,
6    time::Duration,
7};
8
9use parse_size::parse_size;
10use serde::{Deserialize, Serialize};
11
12use crate::{parser, ComposeCommand, ComposeError};
13
14use super::CatchOutput;
15
16//{"BlockIO":"0B / 0B","CPUPerc":"0.03%","Container":"9ca40acb565a","ID":"9ca40acb565a","MemPerc":"0.13%","MemUsage":"10MiB / 7.685GiB","Name":"examples-rqlite-1","NetIO":"1.39kB / 0B","PIDs":"10"}
17
/// A "used / limit" pair of sizes, e.g. the `MemUsage` column (`"10MiB / 7.685GiB"`).
///
/// Both values are the `u64` results of `parse_size` (presumably byte counts — confirm against
/// the `parse_size` crate documentation).
#[derive(Serialize, Debug, Clone)]
pub struct StatsUsage {
    // Amount currently in use (the part before the `/`).
    usage: u64,
    // Upper bound for the resource (the part after the `/`).
    limit: u64,
}
23
/// An "input / output" pair of sizes, used for the `BlockIO` and `NetIO` columns
/// (e.g. `"1.39kB / 0B"`).
///
/// Both values are the `u64` results of `parse_size` (presumably byte counts — confirm against
/// the `parse_size` crate documentation).
#[derive(Serialize, Debug, Clone)]
pub struct StatsIO {
    // Size parsed from the part before the `/`.
    input: u64,
    // Size parsed from the part after the `/`.
    output: u64,
}
29
30// Implement the Deserialize trait for StatsIO, same as StatsUsage
31impl<'de> serde::Deserialize<'de> for StatsIO {
32    fn deserialize<D>(deserializer: D) -> Result<StatsIO, D::Error>
33    where
34        D: serde::Deserializer<'de>,
35    {
36        struct StatsIOVisitor;
37
38        impl<'de> serde::de::Visitor<'de> for StatsIOVisitor {
39            type Value = StatsIO;
40
41            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
42                formatter.write_str("a string in the format 'input/output'")
43            }
44
45            fn visit_str<E>(self, value: &str) -> Result<StatsIO, E>
46            where
47                E: serde::de::Error,
48            {
49                let parts = value.split('/').map(str::trim).collect::<Vec<&str>>();
50
51                let input = parse_size(parts[0]).map_err(|_| E::custom("Failed to parse input"))?;
52                let output =
53                    parse_size(parts[1]).map_err(|_| E::custom("Failed to parse output"))?;
54                Ok(StatsIO { input, output })
55            }
56        }
57
58        deserializer.deserialize_str(StatsIOVisitor)
59    }
60}
61
62impl<'de> serde::Deserialize<'de> for StatsUsage {
63    fn deserialize<D>(deserializer: D) -> Result<StatsUsage, D::Error>
64    where
65        D: serde::Deserializer<'de>,
66    {
67        struct StatsUsageVisitor;
68
69        impl<'de> serde::de::Visitor<'de> for StatsUsageVisitor {
70            type Value = StatsUsage;
71
72            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
73                formatter.write_str("a string in the format 'current/total'")
74            }
75
76            fn visit_str<E>(self, value: &str) -> Result<StatsUsage, E>
77            where
78                E: serde::de::Error,
79            {
80                let parts = value.split('/').map(str::trim).collect::<Vec<&str>>();
81
82                let current =
83                    parse_size(parts[0]).map_err(|_| E::custom("Failed to parse current"))?;
84                let total = parse_size(parts[1]).map_err(|_| E::custom("Failed to parse total"))?;
85                Ok(StatsUsage {
86                    usage: current,
87                    limit: total,
88                })
89            }
90        }
91
92        deserializer.deserialize_str(StatsUsageVisitor)
93    }
94}
95
/// A percentage column such as `CPUPerc` or `MemPerc`, stored as the numeric value with the
/// trailing `%` removed (e.g. `"0.03%"` becomes `0.03`).
#[derive(Serialize, Debug, Clone)]
pub struct StatsPercentage(f64);
98
99impl<'de> serde::Deserialize<'de> for StatsPercentage {
100    fn deserialize<D>(deserializer: D) -> Result<StatsPercentage, D::Error>
101    where
102        D: serde::Deserializer<'de>,
103    {
104        struct StatsPercentageVisitor;
105
106        impl<'de> serde::de::Visitor<'de> for StatsPercentageVisitor {
107            type Value = StatsPercentage;
108
109            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
110                formatter.write_str("a string in the format '0.00%'")
111            }
112
113            fn visit_str<E>(self, value: &str) -> Result<StatsPercentage, E>
114            where
115                E: serde::de::Error,
116            {
117                let value = value.trim_end_matches('%');
118                let value = value.parse::<f64>().map_err(E::custom)?;
119                Ok(StatsPercentage(value))
120            }
121        }
122
123        deserializer.deserialize_str(StatsPercentageVisitor)
124    }
125}
126
/// One container's row of the JSON stats output.
///
/// Field names are mapped from PascalCase JSON keys; keys that do not follow plain PascalCase
/// (`BlockIO`, `CPUPerc`, `ID`, `NetIO`, `PIDs`) are renamed explicitly.
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "PascalCase")]
pub struct Stats {
    /// `BlockIO` column, parsed from an "input / output" string.
    #[serde(rename = "BlockIO")]
    pub block_io: StatsIO,
    /// `CPUPerc` column, parsed from a string like "0.03%".
    #[serde(rename = "CPUPerc")]
    pub cpu_perc: StatsPercentage,
    /// `Container` column; also the key used to group rows when streaming.
    pub container: String,
    /// `ID` column.
    #[serde(rename = "ID")]
    pub id: String,
    /// `MemPerc` column, parsed from a string like "0.13%".
    pub mem_perc: StatsPercentage,
    /// `MemUsage` column, parsed from a "usage / limit" string.
    pub mem_usage: StatsUsage,
    /// `Name` column.
    pub name: String,
    /// `NetIO` column, parsed from an "input / output" string.
    #[serde(rename = "NetIO")]
    pub net_io: StatsIO,
    /// `PIDs` column, kept as the raw string from the JSON.
    #[serde(rename = "PIDs")]
    pub pids: String,
}
145
/// Builder/runner for the `stats` subcommand.
pub struct StatsCommand {
    // Base process command; the `stats` arguments are appended to it when it is run.
    command: std::process::Command,
    // Length of the batching window used by `stream`; one second is used when `None`.
    poll_interval: Option<Duration>,
}
150
/// Boxed, sendable iterator returned by `StatsCommand::stream`: each item is either one batch
/// of per-container stats or an error.
type StatsIterator = Box<dyn Iterator<Item = Result<Vec<Stats>, ComposeError>> + Send>;
152
153impl StatsCommand {
154    pub fn new(cmd: std::process::Command) -> Self {
155        Self {
156            command: cmd,
157            poll_interval: None,
158        }
159    }
160
161    pub fn stream(self) -> Result<StatsIterator, ComposeError> {
162        let mut command = self.command;
163
164        command.arg("stats").arg("--format").arg("json");
165
166        let stdout = command
167            .stdout(std::process::Stdio::piped())
168            .spawn()?
169            .stdout
170            .ok_or(ComposeError::IoError(std::io::Error::other(
171                "Failed to open stdout",
172            )))?;
173
174        let (tx, rx) = mpsc::channel();
175
176        thread::spawn(move || {
177            let mut reader = BufReader::new(stdout);
178            let interval = self.poll_interval.unwrap_or(Duration::from_secs(1));
179
180            loop {
181                let mut lines = Vec::new();
182                let start_time = std::time::Instant::now();
183
184                while start_time.elapsed() < interval {
185                    let mut line = String::new();
186                    match reader.read_line(&mut line) {
187                        Ok(0) => break,
188                        Ok(_) => {
189                            let line = parser::remove_ansi_codes(&line);
190
191                            if let Ok(line) = line {
192                                lines.push(line.trim().to_string());
193                            } else {
194                                tx.send(Err(ComposeError::ParseError(
195                                    "Failed to parse line".to_string(),
196                                )))
197                                .expect("Failed to send error");
198                                return;
199                            }
200                        }
201                        Err(err) => {
202                            tx.send(Err(ComposeError::IoError(err)))
203                                .expect("Failed to send error");
204                            return;
205                        }
206                    }
207                }
208
209                if lines.is_empty() {
210                    break;
211                }
212
213                let stats = lines
214                    .iter()
215                    .map(|line| serde_json::from_str(line))
216                    .collect::<Result<Vec<Stats>, _>>();
217
218                match stats {
219                    Ok(stats) => {
220                        // if container occurs multiple times in the output, we calculate the average
221                        let mut dedupe_stats_map: HashMap<String, Vec<Stats>> = HashMap::new();
222
223                        for stat in stats {
224                            let entry = dedupe_stats_map.entry(stat.container.clone()).or_default();
225                            entry.push(stat);
226                        }
227
228                        let stats = dedupe_stats_map
229                            .values()
230                            .map(|stats| {
231                                let mut avg_stats = stats[0].clone();
232                                let len = stats.len() as u64;
233
234                                for stat in stats.iter().skip(1) {
235                                    avg_stats.block_io.input += stat.block_io.input;
236                                    avg_stats.block_io.output += stat.block_io.output;
237                                    avg_stats.cpu_perc.0 += stat.cpu_perc.0;
238                                    avg_stats.mem_perc.0 += stat.mem_perc.0;
239                                    avg_stats.mem_usage.usage += stat.mem_usage.usage;
240                                    avg_stats.mem_usage.limit += stat.mem_usage.limit;
241                                    avg_stats.net_io.input += stat.net_io.input;
242                                    avg_stats.net_io.output += stat.net_io.output;
243                                }
244
245                                avg_stats.block_io.input /= len;
246                                avg_stats.block_io.output /= len;
247                                avg_stats.cpu_perc.0 /= len as f64;
248                                avg_stats.mem_perc.0 /= len as f64;
249                                avg_stats.mem_usage.usage /= len;
250                                avg_stats.mem_usage.limit /= len;
251                                avg_stats.net_io.input /= len;
252                                avg_stats.net_io.output /= len;
253
254                                avg_stats
255                            })
256                            .collect();
257
258                        tx.send(Ok(stats)).expect("Failed to send stats");
259                    }
260                    Err(err) => {
261                        tx.send(Err(ComposeError::ParseError(err.to_string())))
262                            .expect("Failed to send error");
263                        return;
264                    }
265                }
266            }
267        });
268
269        Ok(Box::new(rx.into_iter()))
270    }
271}
272
273impl ComposeCommand<Vec<Stats>, ()> for StatsCommand {
274    const COMMAND: &'static str = "stats";
275
276    fn exec(self) -> Result<Vec<Stats>, ComposeError> {
277        let mut command = self.command;
278
279        command
280            .arg(Self::COMMAND)
281            .arg("--format")
282            .arg("json")
283            .arg("--no-stream");
284
285        let output = command.output().catch_output()?;
286
287        let output = String::from_utf8_lossy(&output.stdout);
288
289        let stats = output
290            .lines()
291            .map(serde_json::from_str)
292            .collect::<Result<Vec<Stats>, _>>()?;
293
294        Ok(stats)
295    }
296}