1use anyhow::{anyhow, Result};
3use chrono::prelude::*;
4use log::trace;
5use serde::{Deserialize, Serialize};
6use std::collections::BTreeMap;
7use std::ops;
8use std::time::UNIX_EPOCH;
9
10use super::RunnerState;
11use rd_util::*;
12
/// Field reference returned as the `JsonSave::preamble()` of `Report`.
/// Every line is a `//` comment describing one report field.
const REPORT_DOC: &str = "\
//
// rd-agent summary report
//
// svc.name is an empty string if the service doesn't exist. svc.state
// is either Running, Exited, Failed or Other.
//
// timestamp: When this report was generated
// seq: Incremented on each execution, used for temporary settings
// state: Idle, Running, BenchHashd or BenchIoCost
// oomd.svc.name: OOMD systemd service name
// oomd.svc.state: OOMD systemd service state
// oomd.work_mem_pressure: Memory pressure based kill enabled in workload.slice
// oomd.work_senpai: Senpai enabled on workload.slice
// oomd.sys_mem_pressure: Memory pressure based kill enabled in system.slice
// oomd.sys_senpai: Senpai enabled on system.slice
// sideloader.svc.name: sideloader systemd service name
// sideloader.svc.state: sideloader systemd service state
// sideloader.sysconf_warnings: sideloader system configuration warnings
// sideloader.overload: sideloader is in overloaded state
// sideloader.overload_why: the reason for overloaded state
// sideloader.critical: sideloader is in critical state
// sideloader.critical_why: the reason for critical state
// bench.hashd.svc.name: rd-hashd benchmark systemd service name
// bench.hashd.svc.state: rd-hashd benchmark systemd service state
// bench.hashd.phase: rd-hashd benchmark phase
// bench.hashd.mem_probe_size: memory size rd-hashd benchmark is probing
// bench.hashd.mem_probe_at: the timestamp this memory probing started at
// bench.iocost.svc.name: iocost benchmark systemd service name
// bench.iocost.svc.state: iocost benchmark systemd service state
// hashd[].svc.name: rd-hashd systemd service name
// hashd[].svc.state: rd-hashd systemd service state
// hashd[].load: Current rps / rps_max
// hashd[].rps: Current rps
// hashd[].lat_pct: Current control percentile
// hashd[].lat: Current control percentile latency
// sysloads{}.svc.name: Sysload systemd service name
// sysloads{}.svc.state: Sysload systemd service state
// sideloads{}.svc.name: Sideload systemd service name
// sideloads{}.svc.state: Sideload systemd service state
// iocost.model: iocost model parameters currently in effect
// iocost.qos: iocost QoS parameters currently in effect
// iolat.{read|write|discard|flush}.p*: IO latency distributions
// iolat_cum.{read|write|discard|flush}.p*: Cumulative IO latency distributions
// swappiness: vm.swappiness
// zswap_enabled: zswap enabled
//
//
";
62
63#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
64pub enum SvcStateReport {
65 Running,
66 Exited,
67 Failed,
68 Other,
69}
70
71impl Default for SvcStateReport {
72 fn default() -> Self {
73 Self::Other
74 }
75}
76
77#[derive(Clone, Serialize, Deserialize, Default)]
78pub struct SvcReport {
79 pub name: String,
80 pub state: SvcStateReport,
81}
82
83#[derive(Clone, Serialize, Deserialize, Default)]
84pub struct ResCtlReport {
85 pub cpu: bool,
86 pub mem: bool,
87 pub io: bool,
88}
89
90#[derive(Clone, Serialize, Deserialize, Default)]
91pub struct OomdReport {
92 pub svc: SvcReport,
93 pub work_mem_pressure: bool,
94 pub work_senpai: bool,
95 pub sys_mem_pressure: bool,
96 pub sys_senpai: bool,
97}
98
99#[derive(Clone, Serialize, Deserialize)]
100pub struct BenchHashdReport {
101 pub svc: SvcReport,
102 pub phase: rd_hashd_intf::Phase,
103 pub mem_probe_size: usize,
104 pub mem_probe_at: DateTime<Local>,
105}
106
107impl Default for BenchHashdReport {
108 fn default() -> Self {
109 Self {
110 svc: Default::default(),
111 phase: Default::default(),
112 mem_probe_size: 0,
113 mem_probe_at: DateTime::from(UNIX_EPOCH),
114 }
115 }
116}
117
118#[derive(Clone, Serialize, Deserialize, Default)]
119pub struct BenchIoCostReport {
120 pub svc: SvcReport,
121}
122
123#[derive(Clone, Serialize, Deserialize, Default)]
124pub struct SideloaderReport {
125 pub svc: SvcReport,
126 pub sysconf_warnings: Vec<String>,
127 pub overload: bool,
128 pub overload_why: String,
129 pub critical: bool,
130 pub critical_why: String,
131}
132
133#[derive(Clone, Serialize, Deserialize)]
134pub struct HashdReport {
135 pub svc: SvcReport,
136 pub phase: rd_hashd_intf::Phase,
137 pub load: f64,
138 pub rps: f64,
139 pub lat_pct: f64,
140 pub lat: rd_hashd_intf::Latencies,
141 pub nr_in_flight: u32,
142 pub nr_done: u64,
143 pub nr_workers: usize,
144 pub nr_idle_workers: usize,
145 pub mem_probe_size: usize,
146 pub mem_probe_at: DateTime<Local>,
147}
148
149impl Default for HashdReport {
150 fn default() -> Self {
151 Self {
152 svc: Default::default(),
153 phase: Default::default(),
154 load: 0.0,
155 rps: 0.0,
156 lat_pct: 0.0,
157 lat: Default::default(),
158 nr_in_flight: 0,
159 nr_done: 0,
160 nr_workers: 0,
161 nr_idle_workers: 0,
162 mem_probe_size: 0,
163 mem_probe_at: DateTime::from(UNIX_EPOCH),
164 }
165 }
166}
167
168impl ops::AddAssign<&HashdReport> for HashdReport {
169 fn add_assign(&mut self, rhs: &HashdReport) {
170 self.load += rhs.load;
171 self.rps += rhs.rps;
172 self.lat_pct += rhs.lat_pct;
173 self.lat += &rhs.lat;
174 self.nr_in_flight += rhs.nr_in_flight;
175 self.nr_done += rhs.nr_done;
176 self.nr_workers += rhs.nr_workers;
177 self.nr_idle_workers += rhs.nr_idle_workers;
178 }
179}
180
181impl<T: Into<f64>> ops::DivAssign<T> for HashdReport {
182 fn div_assign(&mut self, rhs: T) {
183 let div = rhs.into();
184 self.load /= div;
185 self.rps /= div;
186 self.lat_pct /= div;
187 self.lat /= div;
188 self.nr_in_flight = ((self.nr_in_flight as f64) / div).round() as u32;
189 self.nr_done = ((self.nr_done as f64) / div).round() as u64;
190 self.nr_workers = ((self.nr_workers as f64) / div).round() as usize;
191 self.nr_idle_workers = ((self.nr_idle_workers as f64) / div).round() as usize;
192 }
193}
194
195#[derive(Clone, Serialize, Deserialize)]
196pub struct SysloadReport {
197 pub svc: SvcReport,
198 pub scr_path: String,
199}
200
201#[derive(Clone, Serialize, Deserialize)]
202pub struct SideloadReport {
203 pub svc: SvcReport,
204 pub scr_path: String,
205}
206
207#[derive(Debug, Default, Clone, Serialize, Deserialize)]
208pub struct UsageReport {
209 pub cpu_util: f64,
210 pub cpu_sys: f64,
211 pub cpu_usage: f64,
212 pub cpu_usage_sys: f64,
213 pub cpu_usage_base: f64,
214 pub mem_bytes: u64,
215 pub swap_bytes: u64,
216 pub swap_free: u64,
217 pub io_rbytes: u64,
218 pub io_wbytes: u64,
219 pub io_rbps: u64,
220 pub io_wbps: u64,
221 pub io_usage: f64,
222 pub io_util: f64,
223 pub cpu_stalls: (f64, f64),
224 pub mem_stalls: (f64, f64),
225 pub io_stalls: (f64, f64),
226 pub cpu_pressures: (f64, f64),
227 pub mem_pressures: (f64, f64),
228 pub io_pressures: (f64, f64),
229}
230
231impl ops::AddAssign<&UsageReport> for UsageReport {
232 fn add_assign(&mut self, rhs: &UsageReport) {
233 self.cpu_util += rhs.cpu_util;
234 self.cpu_sys += rhs.cpu_sys;
235 self.cpu_usage += rhs.cpu_usage;
236 self.cpu_usage_sys += rhs.cpu_usage_sys;
237 self.mem_bytes += rhs.mem_bytes;
238 self.swap_bytes += rhs.swap_bytes;
239 self.swap_free += rhs.swap_free;
240 self.io_rbytes += rhs.io_rbytes;
241 self.io_wbytes += rhs.io_wbytes;
242 self.io_rbps += rhs.io_rbps;
243 self.io_wbps += rhs.io_wbps;
244 self.io_usage += rhs.io_usage;
245 self.io_util += rhs.io_util;
246 self.cpu_stalls.0 += rhs.cpu_stalls.0;
247 self.cpu_stalls.1 += rhs.cpu_stalls.1;
248 self.mem_stalls.0 += rhs.mem_stalls.0;
249 self.mem_stalls.1 += rhs.mem_stalls.1;
250 self.io_stalls.0 += rhs.io_stalls.0;
251 self.io_stalls.1 += rhs.io_stalls.1;
252 self.cpu_pressures.0 += rhs.cpu_pressures.0;
253 self.cpu_pressures.1 += rhs.cpu_pressures.1;
254 self.mem_pressures.0 += rhs.mem_pressures.0;
255 self.mem_pressures.1 += rhs.mem_pressures.1;
256 self.io_pressures.0 += rhs.io_pressures.0;
257 self.io_pressures.1 += rhs.io_pressures.1;
258 }
259}
260
261impl<T: Into<f64>> ops::DivAssign<T> for UsageReport {
262 fn div_assign(&mut self, rhs: T) {
263 let div = rhs.into();
264 let div_u64 = |v: &mut u64| *v = (*v as f64 / div).round() as u64;
265 self.cpu_util /= div;
266 self.cpu_sys /= div;
267 self.cpu_usage /= div;
268 self.cpu_usage_sys /= div;
269 div_u64(&mut self.mem_bytes);
270 div_u64(&mut self.swap_bytes);
271 div_u64(&mut self.swap_free);
272 div_u64(&mut self.io_rbytes);
273 div_u64(&mut self.io_wbytes);
274 div_u64(&mut self.io_rbps);
275 div_u64(&mut self.io_wbps);
276 self.io_usage /= div;
277 self.io_util /= div;
278 self.cpu_stalls.0 /= div;
279 self.cpu_stalls.1 /= div;
280 self.mem_stalls.0 /= div;
281 self.mem_stalls.1 /= div;
282 self.io_stalls.0 /= div;
283 self.io_stalls.1 /= div;
284 self.cpu_pressures.0 /= div;
285 self.cpu_pressures.1 /= div;
286 self.mem_pressures.0 /= div;
287 self.mem_pressures.1 /= div;
288 self.io_pressures.0 /= div;
289 self.io_pressures.1 /= div;
290 }
291}
292
293#[derive(Clone, Serialize, Deserialize)]
294pub struct IoLatReport {
295 #[serde(flatten)]
296 pub map: BTreeMap<String, BTreeMap<String, f64>>,
297}
298
299impl IoLatReport {
300 pub const PCTS: &'static [&'static str] = &[
301 "00", "01", "05", "10", "25", "50", "75", "90", "95", "99", "99.9", "99.99", "99.999",
302 "100",
303 ];
304}
305
306impl IoLatReport {
307 pub fn accumulate(&mut self, rhs: &IoLatReport) {
308 for key in &["read", "write", "discard", "flush"] {
309 let key = key.to_string();
310 let lpcts = self.map.get_mut(&key).unwrap();
311 let rpcts = &rhs.map[&key];
312 for pct in Self::PCTS.iter() {
313 let pct = pct.to_string();
314 let lv = lpcts.get_mut(&pct).unwrap();
315 *lv = lv.max(rpcts[&pct]);
316 }
317 }
318 }
319}
320
321impl Default for IoLatReport {
322 fn default() -> Self {
323 let mut map = BTreeMap::new();
324 for key in &["read", "write", "discard", "flush"] {
325 let mut pcts = BTreeMap::new();
326 for pct in Self::PCTS.iter() {
327 pcts.insert(pct.to_string(), 0.0);
328 }
329 map.insert(key.to_string(), pcts);
330 }
331 Self { map }
332 }
333}
334
335#[derive(Clone, Debug, Serialize, Deserialize)]
336pub struct IoCostModelReport {
337 pub ctrl: String,
338 pub model: String,
339 #[serde(flatten)]
340 pub knobs: IoCostModelParams,
341}
342
343impl Default for IoCostModelReport {
344 fn default() -> Self {
345 Self {
346 ctrl: "".into(),
347 model: "".into(),
348 knobs: Default::default(),
349 }
350 }
351}
352
353impl IoCostModelReport {
354 pub fn read(devnr: (u32, u32)) -> Result<Self> {
355 let kf = read_cgroup_nested_keyed_file("/sys/fs/cgroup/io.cost.model")?;
356 let map = match kf.get(&format!("{}:{}", devnr.0, devnr.1)) {
357 Some(v) => v,
358 None => return Ok(Default::default()),
359 };
360 let kerr = "missing key in io.cost.model";
361 Ok(Self {
362 ctrl: map.get("ctrl").ok_or(anyhow!(kerr))?.clone(),
363 model: map.get("model").ok_or(anyhow!(kerr))?.clone(),
364 knobs: IoCostModelParams {
365 rbps: map.get("rbps").ok_or(anyhow!(kerr))?.parse::<u64>()?,
366 rseqiops: map.get("rseqiops").ok_or(anyhow!(kerr))?.parse::<u64>()?,
367 rrandiops: map.get("rrandiops").ok_or(anyhow!(kerr))?.parse::<u64>()?,
368 wbps: map.get("wbps").ok_or(anyhow!(kerr))?.parse::<u64>()?,
369 wseqiops: map.get("wseqiops").ok_or(anyhow!(kerr))?.parse::<u64>()?,
370 wrandiops: map.get("wrandiops").ok_or(anyhow!(kerr))?.parse::<u64>()?,
371 },
372 })
373 }
374}
375
376#[derive(Clone, Debug, Serialize, Deserialize)]
377pub struct IoCostQoSReport {
378 pub enable: u32,
379 pub ctrl: String,
380 #[serde(flatten)]
381 pub knobs: IoCostQoSParams,
382}
383
384impl IoCostQoSReport {
385 pub fn read(devnr: (u32, u32)) -> Result<Self> {
386 let kf = read_cgroup_nested_keyed_file("/sys/fs/cgroup/io.cost.qos")?;
387 let map = match kf.get(&format!("{}:{}", devnr.0, devnr.1)) {
388 Some(v) => v,
389 None => return Ok(Default::default()),
390 };
391 let kerr = "missing key in io.cost.qos";
392 Ok(Self {
393 enable: map.get("enable").ok_or(anyhow!(kerr))?.parse::<u32>()?,
394 ctrl: map.get("ctrl").ok_or(anyhow!(kerr))?.clone(),
395 knobs: IoCostQoSParams {
396 rpct: map.get("rpct").ok_or(anyhow!(kerr))?.parse::<f64>()?,
397 rlat: map.get("rlat").ok_or(anyhow!(kerr))?.parse::<u64>()?,
398 wpct: map.get("wpct").ok_or(anyhow!(kerr))?.parse::<f64>()?,
399 wlat: map.get("wlat").ok_or(anyhow!(kerr))?.parse::<u64>()?,
400 min: map.get("min").ok_or(anyhow!(kerr))?.parse::<f64>()?,
401 max: map.get("max").ok_or(anyhow!(kerr))?.parse::<f64>()?,
402 },
403 })
404 }
405}
406
407impl Default for IoCostQoSReport {
408 fn default() -> Self {
409 Self {
410 enable: 0,
411 ctrl: "".into(),
412 knobs: Default::default(),
413 }
414 }
415}
416
417#[derive(Clone, Debug, Default, Serialize, Deserialize)]
418pub struct IoCostReport {
419 pub vrate: f64,
420 pub model: IoCostModelReport,
421 pub qos: IoCostQoSReport,
422}
423
424impl ops::AddAssign<&IoCostReport> for IoCostReport {
425 fn add_assign(&mut self, rhs: &IoCostReport) {
426 let base_vrate = self.vrate;
427 *self = rhs.clone();
428 self.vrate += base_vrate;
429 }
430}
431
432impl<T: Into<f64>> ops::DivAssign<T> for IoCostReport {
433 fn div_assign(&mut self, rhs: T) {
434 let div = rhs.into();
435 self.vrate /= div;
436 }
437}
438
439impl IoCostReport {
440 pub fn read(devnr: (u32, u32)) -> Result<Self> {
441 let kf = read_cgroup_nested_keyed_file("/sys/fs/cgroup/io.stat")?;
442 let vrate = match kf.get(&format!("{}:{}", devnr.0, devnr.1)) {
443 Some(map) => map
444 .get("cost.vrate")
445 .map(String::as_str)
446 .unwrap_or("0.0")
447 .parse::<f64>()?,
448 None => 0.0,
449 };
450 Ok(Self {
451 vrate: vrate,
452 model: IoCostModelReport::read(devnr)?,
453 qos: IoCostQoSReport::read(devnr)?,
454 })
455 }
456}
457
/// Flat map from stat name to value. Used for the `mem_stat`, `io_stat` and
/// `vmstat` sections of `Report`.
pub type StatMap = std::collections::BTreeMap<String, f64>;
459
460#[derive(Clone, Serialize, Deserialize)]
461pub struct Report {
462 pub timestamp: DateTime<Local>,
463 pub seq: u64,
464 pub state: RunnerState,
465 pub resctl: ResCtlReport,
466 pub oomd: OomdReport,
467 pub sideloader: SideloaderReport,
468 pub bench_hashd: BenchHashdReport,
469 pub bench_iocost: BenchIoCostReport,
470 pub hashd: [HashdReport; 2],
471 pub sysloads: BTreeMap<String, SysloadReport>,
472 pub sideloads: BTreeMap<String, SideloadReport>,
473 pub usages: BTreeMap<String, UsageReport>,
474 pub mem_stat: BTreeMap<String, StatMap>,
475 pub io_stat: BTreeMap<String, StatMap>,
476 pub vmstat: StatMap,
477 pub iolat: IoLatReport,
478 pub iolat_cum: IoLatReport,
479 pub iocost: IoCostReport,
480 pub swappiness: u32,
481 pub zswap_enabled: bool,
482}
483
484impl Default for Report {
485 fn default() -> Self {
486 Self {
487 timestamp: DateTime::from(UNIX_EPOCH),
488 seq: 1,
489 state: RunnerState::Idle,
490 resctl: Default::default(),
491 oomd: Default::default(),
492 sideloader: Default::default(),
493 bench_hashd: Default::default(),
494 bench_iocost: Default::default(),
495 hashd: Default::default(),
496 sysloads: Default::default(),
497 sideloads: Default::default(),
498 usages: Default::default(),
499 mem_stat: Default::default(),
500 io_stat: Default::default(),
501 vmstat: Default::default(),
502 iolat: Default::default(),
503 iolat_cum: Default::default(),
504 iocost: Default::default(),
505 swappiness: 60,
506 zswap_enabled: false,
507 }
508 }
509}
510
511impl JsonLoad for Report {}
512
513impl JsonSave for Report {
514 fn preamble() -> Option<String> {
515 Some(REPORT_DOC.to_string())
516 }
517}
518
/// Iterator over `({dir}/{seq}.json, seq)` pairs. Forward iteration yields
/// every `seq` in `period.0..period.1`.
pub struct ReportPathIter {
    dir: String,
    front: u64,
    back: u64,
}

impl ReportPathIter {
    /// Creates an iterator rooted at `dir` over the `(start, end)` `period`.
    pub fn new(dir: &str, period: (u64, u64)) -> Self {
        let (front, back) = period;
        ReportPathIter {
            dir: dir.to_string(),
            front,
            back,
        }
    }
}
534
535impl Iterator for ReportPathIter {
536 type Item = (std::path::PathBuf, u64);
537 fn next(&mut self) -> Option<Self::Item> {
538 if self.front >= self.back {
539 return None;
540 }
541 let front = self.front;
542 self.front += 1;
543
544 let path = format!("{}/{}.json", &self.dir, front);
545 trace!("ReportPathIter: {}, {}", &path, front);
546 Some((path.into(), front))
547 }
548}
549
550impl DoubleEndedIterator for ReportPathIter {
551 fn next_back(&mut self) -> Option<Self::Item> {
552 if self.front >= self.back {
553 return None;
554 }
555 let back = self.back;
556 self.back -= 1;
557
558 Some((format!("{}/{}.json", &self.dir, back).into(), back))
559 }
560}
561
562pub struct ReportIter {
563 piter: ReportPathIter,
564}
565
566impl ReportIter {
567 pub fn new(dir: &str, period: (u64, u64)) -> Self {
568 Self {
569 piter: ReportPathIter::new(dir, period),
570 }
571 }
572}
573
574impl Iterator for ReportIter {
575 type Item = (Result<Report>, u64);
576 fn next(&mut self) -> Option<Self::Item> {
577 self.piter
578 .next()
579 .map(|(path, at)| (Report::load(&path), at))
580 }
581}
582
583impl DoubleEndedIterator for ReportIter {
584 fn next_back(&mut self) -> Option<Self::Item> {
585 self.piter
586 .next_back()
587 .map(|(path, at)| (Report::load(&path), at))
588 }
589}