nornir 0.4.29

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
//! Low-overhead CPU/memory telemetry for a single bench unit.
//!
//! A [`Sampler`] spawns one background thread that, ~once a second, reads three
//! cheap `/proc` files — `/proc/stat` (aggregate + per-core jiffy counters),
//! `/proc/meminfo` (system memory), and `/proc/self/status` (this process's
//! peak RSS) — and folds the deltas into running min/avg/max accumulators. When
//! the bench unit finishes the caller calls [`Sampler::stop`], which joins the
//! thread and returns a [`Telemetry`] summary.
//!
//! Why `/proc` and not a perf tool: the whole point is **non-perturbation**.
//! Each sample is a handful of small synchronous file reads on the *sampler*
//! thread (not the bench thread) and a couple of integer subtractions — well
//! under a millisecond, once a second. No `perf`, no `ptrace`, no extra crate.
//!
//! ## What is computed
//! - **CPU% aggregate** — `100 * (1 - idle_delta/total_delta)` from
//!   `/proc/stat`'s first (`cpu`) line across a sampling interval. avg + max.
//! - **cores-busy** — of the N per-core (`cpu0`, `cpu1`, …) lines, how many had
//!   per-core utilisation **> 50%** over the interval. avg (fractional mean,
//!   rounded to two decimals only when exported as a metric) + max.
//!   Also published live (see [`Sampler::cores_busy_now`]) so a long-running
//!   bench can name `cores-busy/N` in its progress line.
//! - **peak RSS** — the max `VmHWM` seen in `/proc/self/status` (MB).
//! - **mem%** — peak `(MemTotal-MemAvailable)/MemTotal` from `/proc/meminfo`.
//! - **elapsed_ms** — wall time from `start` to `stop`.
//!
//! Non-Linux (no `/proc`) degrades gracefully: the thread can't read the files,
//! so the summary is all-zero except `elapsed_ms`. The sampler never panics and
//! never affects the bench result.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::time::{Duration, Instant};

/// Fraction of an interval (0..1) a single core must spend non-idle before it
/// is tallied as "busy" in the cores-busy metric.
const BUSY_THRESHOLD: f64 = 0.5;

/// Time between two samples. One sample per second keeps the sampler's own
/// cost negligible while still tracing the load profile of a multi-second bench.
const SAMPLE_PERIOD: Duration = Duration::from_secs(1);

/// Summary telemetry for one bench unit. All values are best-effort; on a host
/// without `/proc` everything except `elapsed_ms` is zero.
///
/// `Default` yields the all-zero summary, which is also what a sampler whose
/// thread could not be spawned or joined reports (plus `elapsed_ms`).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Telemetry {
    /// Number of logical cores the sampler observed in `/proc/stat`
    /// (the count of per-core `cpuN` lines in the baseline snapshot).
    pub n_cores: u32,
    /// Aggregate CPU utilisation across all cores, percent (0..100), averaged
    /// over the contributing intervals.
    pub cpu_pct_avg: f64,
    /// Highest aggregate CPU utilisation seen in any single interval, percent
    /// (0..100).
    pub cpu_pct_max: f64,
    /// How many cores were >50% busy, averaged over the samples. Fractional:
    /// it is the mean of the per-interval integer counts.
    pub cores_busy_avg: f64,
    /// Largest number of cores that were >50% busy in any single interval.
    pub cores_busy_max: u32,
    /// Peak resident set size of *this* process, in MB.
    pub mem_peak_mb: f64,
    /// Peak system memory utilisation, percent (0..100).
    pub mem_pct_max: f64,
    /// Wall-clock duration of the sampled unit, in milliseconds.
    pub elapsed_ms: f64,
    /// How many interval samples contributed to the averages. Not exported by
    /// `inject_into_metrics`, so a `Telemetry` rebuilt by `from_metrics`
    /// always carries `0` here.
    pub samples: u32,
}

/// One `/proc/stat` snapshot: aggregate (idle, total) jiffies plus the same per
/// core. The counters are cumulative, so a single snapshot says nothing about
/// load on its own; deltas between two snapshots give per-interval utilisation.
struct StatSnapshot {
    /// (idle_jiffies, total_jiffies) for the aggregate `cpu` line.
    agg: (u64, u64),
    /// (idle, total) per logical core, in `cpu0..cpuN` order (the order the
    /// `cpuN` lines appear in the file). Two snapshots are compared
    /// position-by-position, so this order must be stable between reads.
    per_core: Vec<(u64, u64)>,
}

/// Live-updated, lock-free handle the sampler thread writes and the progress
/// emitter reads, so `[BENCH]` lines can show the current `cores-busy/N`.
///
/// Both fields start at 0 (`Default`) and are accessed with relaxed atomics:
/// each is an independent "latest value" gauge, not a synchronisation point.
#[derive(Default)]
struct Live {
    /// Cores counted busy in the most recently completed sampling interval.
    cores_busy_now: AtomicU32,
    /// Logical core count, published once a `/proc/stat` baseline is read.
    n_cores: AtomicU32,
}

/// A running telemetry sampler. Construct with [`Sampler::start`], then
/// [`Sampler::stop`] to join and collect the [`Telemetry`].
pub struct Sampler {
    /// Shutdown flag shared with the sampler thread: `stop()` sets it, the
    /// thread polls it between sleep slices.
    stop: Arc<AtomicBool>,
    /// Live gauges the thread publishes for the progress line.
    live: Arc<Live>,
    /// Join handle of the sampler thread; `None` if the thread could not be
    /// spawned (the sampler then reports an all-zero summary) or once it has
    /// been taken by `stop()`.
    handle: Option<std::thread::JoinHandle<Telemetry>>,
    /// Instant captured in `start()`, used by `stop()` to compute `elapsed_ms`
    /// from the wall clock.
    start: Instant,
    // The CPU/memory figures are accumulated in the thread; `start` above is
    // just the wall-clock fallback that works even without a single sample.
}

impl Sampler {
    /// Start sampling on a background thread named `bench-telemetry`.
    ///
    /// Cheap: spawns one thread that sleeps most of the time. If the thread
    /// cannot be spawned the sampler still works as a stopwatch: `stop()`
    /// returns an all-zero [`Telemetry`] with only `elapsed_ms` filled in.
    pub fn start() -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let live = Arc::new(Live::default());
        let start = Instant::now();
        let stop_t = Arc::clone(&stop);
        let live_t = Arc::clone(&live);
        let handle = std::thread::Builder::new()
            .name("bench-telemetry".into())
            .spawn(move || sample_loop(stop_t, live_t, start))
            // Telemetry is best-effort: a spawn failure must not fail the bench.
            .ok();
        Sampler { stop, live, handle, start }
    }

    /// Current count of cores observed >50% busy in the most recent interval.
    /// Lock-free read for the live progress line. 0 until the first interval
    /// completes (~1s in).
    pub fn cores_busy_now(&self) -> u32 {
        self.live.cores_busy_now.load(Ordering::Relaxed)
    }

    /// Number of logical cores the sampler has seen (0 until first sample).
    pub fn n_cores(&self) -> u32 {
        self.live.n_cores.load(Ordering::Relaxed)
    }

    /// Stop sampling, join the thread, and return the summary.
    ///
    /// `elapsed_ms` is always taken from the wall clock, so it is meaningful
    /// even if the thread never produced a full interval (sub-second bench),
    /// was never spawned, or panicked (in which case every other field is
    /// zero). The clock is read *before* joining so the sampler's own
    /// shutdown latency is not billed to the bench unit.
    pub fn stop(mut self) -> Telemetry {
        // Relaxed is enough: the flag carries no data, and `join` below is the
        // synchronisation point for the thread's result.
        self.stop.store(true, Ordering::Relaxed);
        let elapsed_ms = self.start.elapsed().as_secs_f64() * 1000.0;
        let mut t = self
            .handle
            .take()
            .and_then(|h| h.join().ok())
            .unwrap_or_default();
        t.elapsed_ms = elapsed_ms;
        t
    }
}

/// Signals the sampler thread to exit when a `Sampler` goes out of scope
/// without `stop()` having been called (early return, `?`, or a panic unwinding
/// through the bench). Without this the detached thread would keep polling
/// `/proc` once a second for the rest of the process lifetime.
///
/// The thread is only signalled, not joined, so dropping never blocks; the
/// thread notices the flag within one sleep slice and exits on its own. After
/// a regular `stop()` this runs too and is a harmless repeat of the store.
impl Drop for Sampler {
    fn drop(&mut self) {
        self.stop.store(true, Ordering::Relaxed);
    }
}

/// The background loop. Reads an initial `/proc/stat` baseline, then every
/// `SAMPLE_PERIOD` reads again, folds the interval's utilisation into the
/// accumulators, and publishes the live cores-busy count. Exits promptly when
/// `stop` is set (checked at sub-period granularity so `stop()` doesn't block a
/// whole second).
fn sample_loop(stop: Arc<AtomicBool>, live: Arc<Live>, _start: Instant) -> Telemetry {
    let mut acc = Telemetry::default();

    let mut prev = read_proc_stat();
    if let Some(p) = &prev {
        let n = p.per_core.len() as u32;
        acc.n_cores = n;
        live.n_cores.store(n, Ordering::Relaxed);
    }

    let mut cpu_sum = 0.0_f64;
    let mut busy_sum = 0.0_f64;

    loop {
        // Sleep in small slices so a stop request is honoured quickly.
        let mut waited = Duration::ZERO;
        while waited < SAMPLE_PERIOD {
            if stop.load(Ordering::Relaxed) {
                break;
            }
            let slice = Duration::from_millis(50);
            std::thread::sleep(slice);
            waited += slice;
        }
        let stopping = stop.load(Ordering::Relaxed);

        if let (Some(p), Some(cur)) = (prev.as_ref(), read_proc_stat()) {
            if let Some((cpu_pct, cores_busy)) = interval_utilisation(p, &cur) {
                acc.samples += 1;
                cpu_sum += cpu_pct;
                busy_sum += cores_busy as f64;
                acc.cpu_pct_max = acc.cpu_pct_max.max(cpu_pct);
                acc.cores_busy_max = acc.cores_busy_max.max(cores_busy);
                live.cores_busy_now.store(cores_busy, Ordering::Relaxed);
            }
            prev = Some(cur);
        }

        // Memory: peak RSS of this process + peak system mem%.
        if let Some(rss_mb) = read_self_peak_rss_mb() {
            acc.mem_peak_mb = acc.mem_peak_mb.max(rss_mb);
        }
        if let Some(mem_pct) = read_system_mem_pct() {
            acc.mem_pct_max = acc.mem_pct_max.max(mem_pct);
        }

        if stopping {
            break;
        }
    }

    if acc.samples > 0 {
        acc.cpu_pct_avg = cpu_sum / acc.samples as f64;
        acc.cores_busy_avg = busy_sum / acc.samples as f64;
    }
    acc
}

/// Derive `(aggregate CPU %, busy-core count)` for the interval spanned by two
/// `/proc/stat` snapshots.
///
/// The aggregate percentage is clamped to 0..=100. A core counts as busy when
/// its non-idle fraction over the interval is strictly above `BUSY_THRESHOLD`;
/// cores are paired by position, only as many as both snapshots contain, and a
/// core whose total did not advance is skipped.
///
/// Returns `None` when the aggregate total did not advance (no time passed or
/// a read glitch), so a degenerate interval never reaches the averages.
fn interval_utilisation(prev: &StatSnapshot, cur: &StatSnapshot) -> Option<(f64, u32)> {
    /// Non-idle fraction between two `(idle, total)` readings, or `None` when
    /// the total counter did not advance. Counters that appear to run
    /// backwards saturate to a zero delta instead of wrapping.
    fn busy_fraction(before: (u64, u64), after: (u64, u64)) -> Option<f64> {
        let total_delta = after.1.saturating_sub(before.1);
        if total_delta == 0 {
            return None;
        }
        let idle_delta = after.0.saturating_sub(before.0);
        Some(1.0 - (idle_delta as f64 / total_delta as f64))
    }

    let cpu_pct = (busy_fraction(prev.agg, cur.agg)? * 100.0).clamp(0.0, 100.0);

    let busy_cores = prev
        .per_core
        .iter()
        .zip(cur.per_core.iter())
        .filter_map(|(&before, &after)| busy_fraction(before, after))
        .map(|util| u32::from(util > BUSY_THRESHOLD))
        .sum::<u32>();

    Some((cpu_pct, busy_cores))
}

/// Read `/proc/stat` and parse its `cpu` (aggregate) and `cpuN` (per-core)
/// lines into a [`StatSnapshot`].
///
/// Returns `None` on non-Linux / read error, or when the file has no usable
/// aggregate `cpu` line.
fn read_proc_stat() -> Option<StatSnapshot> {
    let text = std::fs::read_to_string("/proc/stat").ok()?;
    parse_proc_stat(&text)
}

/// Parse the text of `/proc/stat` into a [`StatSnapshot`].
///
/// Each CPU line is
/// `cpu  user nice system idle iowait irq softirq steal guest guest_nice`.
/// * idle  = field 4 (`idle`) + field 5 (`iowait`);
/// * total = sum of the first eight fields (`user` .. `steal`).
///
/// `guest` and `guest_nice` are deliberately left out of the total: the kernel
/// already accounts guest time inside `user` and `nice`, so adding the last two
/// columns would count it twice and under-report utilisation on hosts running
/// virtual machines.
///
/// A line with fewer than five counters, or with any counter that is not an
/// unsigned integer, is skipped as a whole rather than parsed with shifted
/// columns. Returns `None` if no aggregate `cpu` line was parsed.
fn parse_proc_stat(text: &str) -> Option<StatSnapshot> {
    // user, nice, system, idle, iowait, irq, softirq, steal.
    const TIME_FIELDS: usize = 8;

    let mut agg = None;
    let mut per_core = Vec::new();
    for line in text.lines() {
        if !line.starts_with("cpu") {
            // /proc/stat lists all cpu lines first; once we leave them we're done.
            if agg.is_some() {
                break;
            }
            continue;
        }
        let mut fields = line.split_whitespace();
        let label = match fields.next() {
            Some(label) => label, // "cpu" or "cpuN"
            None => continue,
        };
        let nums = match fields.map(str::parse::<u64>).collect::<Result<Vec<u64>, _>>() {
            Ok(nums) => nums,
            Err(_) => continue,
        };
        if nums.len() < 5 {
            continue;
        }
        // Saturating adds: these are cumulative counters read from outside the
        // process, and telemetry must never panic on an absurd value.
        let total = nums
            .iter()
            .take(TIME_FIELDS)
            .fold(0u64, |sum, &v| sum.saturating_add(v));
        // idle + iowait
        let idle = nums[3].saturating_add(nums[4]);
        if label == "cpu" {
            agg = Some((idle, total));
        } else {
            per_core.push((idle, total));
        }
    }
    let agg = agg?;
    Some(StatSnapshot { agg, per_core })
}

/// Peak resident set size of the current process in MB, taken from the `VmHWM`
/// line of `/proc/self/status` (reported there in kB).
///
/// `VmHWM` is a high-water mark maintained by the kernel, so sampling it at
/// 1 Hz still captures a transient peak that happened between two reads.
/// Returns `None` if the file cannot be read, has no `VmHWM` line, or the
/// value on that line does not parse.
fn read_self_peak_rss_mb() -> Option<f64> {
    let status = std::fs::read_to_string("/proc/self/status").ok()?;
    // The line looks like `VmHWM:   123456 kB`; only the first one is considered.
    let value = status.lines().find_map(|line| line.strip_prefix("VmHWM:"))?;
    let kib = value.split_whitespace().next()?.parse::<f64>().ok()?;
    Some(kib / 1024.0)
}

/// System-wide memory utilisation in percent, computed from `/proc/meminfo` as
/// `(MemTotal - MemAvailable) / MemTotal * 100` and clamped to 0..=100.
///
/// Returns `None` if the file cannot be read, either field is missing or
/// unparsable, or `MemTotal` is not positive.
fn read_system_mem_pct() -> Option<f64> {
    let meminfo = std::fs::read_to_string("/proc/meminfo").ok()?;

    // First whitespace-separated token after the colon, e.g. the `16384256`
    // in `MemTotal:       16384256 kB`.
    let leading_number =
        |rest: &str| -> Option<f64> { rest.split_whitespace().next()?.parse::<f64>().ok() };

    let mut mem_total = None;
    let mut mem_available = None;
    for line in meminfo.lines() {
        match line.split_once(':') {
            Some(("MemTotal", rest)) => mem_total = leading_number(rest),
            Some(("MemAvailable", rest)) => mem_available = leading_number(rest),
            _ => {}
        }
        // Both fields sit near the top of the file; no need to scan the rest.
        if let (Some(_), Some(_)) = (mem_total, mem_available) {
            break;
        }
    }

    let total = mem_total?;
    let available = mem_available?;
    if total <= 0.0 {
        None
    } else {
        Some(((total - available) / total * 100.0).clamp(0.0, 100.0))
    }
}

/// Reserved metric-key prefix under which a unit's telemetry rides inside
/// [`crate::bench::BenchResult::metrics`]. The harness writes these so the
/// numbers survive the bench-binary → nornir stdout-JSON hop; the warehouse
/// writer then lifts them out into the dedicated `bench_telemetry` table.
///
/// Every telemetry key is this prefix followed by the [`Telemetry`] field name
/// (e.g. `telem_cpu_pct_avg`). The presence of *any* key starting with the
/// prefix is what [`from_metrics`] takes as "this result carries telemetry",
/// so ordinary bench metrics must not use it.
pub const TELEM_PREFIX: &str = "telem_";

/// Write a [`Telemetry`] summary into a result's metric map, one entry per
/// field under a [`TELEM_PREFIX`]-prefixed key, so it travels with the
/// [`crate::bench::BenchResult`] through JSON and into the warehouse.
///
/// Floating-point fields are rounded to two decimals; the two integer fields
/// (`cores_busy_max`, `n_cores`) are stored as JSON integers. `samples` is not
/// exported. Existing entries with the same keys are overwritten.
pub fn inject_into_metrics(
    metrics: &mut serde_json::Map<String, serde_json::Value>,
    t: &Telemetry,
) {
    let rounded = |value: f64| serde_json::json!(round2(value));

    // Listed in insertion order, which is what a key-order-preserving map keeps.
    let entries = [
        ("cpu_pct_avg", rounded(t.cpu_pct_avg)),
        ("cpu_pct_max", rounded(t.cpu_pct_max)),
        ("cores_busy_avg", rounded(t.cores_busy_avg)),
        ("cores_busy_max", serde_json::json!(t.cores_busy_max)),
        ("n_cores", serde_json::json!(t.n_cores)),
        ("mem_peak_mb", rounded(t.mem_peak_mb)),
        ("mem_pct_max", rounded(t.mem_pct_max)),
        ("elapsed_ms", rounded(t.elapsed_ms)),
    ];
    for (suffix, value) in entries {
        metrics.insert(format!("{TELEM_PREFIX}{suffix}"), value);
    }
}

/// Rebuild a [`Telemetry`] from a result's metric map — the inverse of
/// [`inject_into_metrics`]. The warehouse writer uses this to populate
/// `bench_telemetry`.
///
/// Returns `None` when the map holds no key starting with [`TELEM_PREFIX`]
/// (e.g. a legacy run, or a `_st` row we chose not to sample). Once at least
/// one such key exists, every field that is missing or not numeric falls back
/// to zero. `samples` is never exported, so it is always `0` in the result.
pub fn from_metrics(metrics: &serde_json::Map<String, serde_json::Value>) -> Option<Telemetry> {
    let carries_telemetry = metrics.keys().any(|key| key.starts_with(TELEM_PREFIX));
    if !carries_telemetry {
        return None;
    }

    // Numeric lookup by field name; absent / non-numeric entries read as 0.
    let float = |field: &str| -> f64 {
        let key = format!("{TELEM_PREFIX}{field}");
        metrics.get(&key).and_then(|value| value.as_f64()).unwrap_or(0.0)
    };
    // Integer fields go through f64 as well; the cast saturates out-of-range
    // values and maps NaN to 0.
    let count = |field: &str| -> u32 { float(field) as u32 };

    Some(Telemetry {
        n_cores: count("n_cores"),
        cpu_pct_avg: float("cpu_pct_avg"),
        cpu_pct_max: float("cpu_pct_max"),
        cores_busy_avg: float("cores_busy_avg"),
        cores_busy_max: count("cores_busy_max"),
        mem_peak_mb: float("mem_peak_mb"),
        mem_pct_max: float("mem_pct_max"),
        elapsed_ms: float("elapsed_ms"),
        samples: 0,
    })
}

/// Round to two decimal places (halves round away from zero). NaN and the
/// infinities pass through unchanged.
fn round2(f: f64) -> f64 {
    const HUNDREDTHS: f64 = 100.0;
    let scaled = f * HUNDREDTHS;
    scaled.round() / HUNDREDTHS
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test of the parser against the real `/proc/stat`.
    #[test]
    fn proc_stat_parses_aggregate_and_cores() {
        // The parser runs against the real /proc on Linux; assert it sees the
        // aggregate line and at least one core. On non-Linux this is None and we
        // skip (the sampler degrades gracefully there).
        if let Some(snap) = read_proc_stat() {
            assert!(snap.agg.1 > 0, "total jiffies should be > 0");
            assert!(snap.agg.0 <= snap.agg.1, "idle cannot exceed total");
            assert!(!snap.per_core.is_empty(), "expected at least one cpuN line");
        }
    }

    /// One fully busy core and one fully idle core: 50% aggregate, 1 busy core.
    #[test]
    fn interval_utilisation_counts_busy_cores() {
        // Synthetic two snapshots: core 0 fully busy (no idle delta), core 1
        // fully idle (all delta is idle). Expect cpu% ~50 aggregate, 1 busy core.
        let prev = StatSnapshot {
            agg: (100, 200),
            per_core: vec![(50, 100), (50, 100)],
        };
        let cur = StatSnapshot {
            // aggregate: +100 idle, +200 total → 50% busy
            agg: (200, 400),
            // core0: +0 idle, +100 total → 100% busy
            // core1: +100 idle, +100 total → 0% busy
            per_core: vec![(50, 200), (150, 200)],
        };
        let (cpu_pct, busy) = interval_utilisation(&prev, &cur).unwrap();
        assert!((cpu_pct - 50.0).abs() < 0.01, "cpu_pct={cpu_pct}");
        assert_eq!(busy, 1, "exactly one core should be >50% busy");
    }

    /// The busy threshold is strict, and a core whose counters did not move
    /// is skipped rather than counted.
    #[test]
    fn interval_utilisation_threshold_and_stalled_cores() {
        let prev = StatSnapshot {
            agg: (0, 0),
            per_core: vec![(0, 0), (7, 9)],
        };
        let cur = StatSnapshot {
            agg: (50, 100),
            // core0: +50 idle, +100 total → exactly 50% busy, not above it
            // core1: no delta at all → skipped
            per_core: vec![(50, 100), (7, 9)],
        };
        let (cpu_pct, busy) = interval_utilisation(&prev, &cur).unwrap();
        assert!((cpu_pct - 50.0).abs() < 0.01, "cpu_pct={cpu_pct}");
        assert_eq!(busy, 0, "50% is not *above* the threshold");
    }

    /// Identical snapshots mean no time passed; such an interval is dropped.
    #[test]
    fn zero_interval_is_ignored() {
        let snap = StatSnapshot { agg: (10, 20), per_core: vec![(5, 10)] };
        let same = StatSnapshot { agg: (10, 20), per_core: vec![(5, 10)] };
        assert!(interval_utilisation(&snap, &same).is_none());
    }

    /// Export rounding keeps two decimals.
    #[test]
    fn round2_keeps_two_decimals() {
        assert_eq!(round2(73.25), 73.25);
        assert_eq!(round2(1.234), 1.23);
        assert_eq!(round2(-1.236), -1.24);
    }

    /// Every exported field survives inject → from_metrics; `samples` does not.
    #[test]
    fn metrics_roundtrip() {
        let t = Telemetry {
            n_cores: 8,
            cpu_pct_avg: 73.25,
            cpu_pct_max: 99.5,
            cores_busy_avg: 5.5,
            cores_busy_max: 8,
            mem_peak_mb: 1234.5,
            mem_pct_max: 42.0,
            elapsed_ms: 1500.0,
            samples: 3,
        };
        let mut m = serde_json::Map::new();
        inject_into_metrics(&mut m, &t);
        assert!(
            m.keys().all(|k| k.starts_with(TELEM_PREFIX)),
            "inject must only write prefixed keys"
        );
        let back = from_metrics(&m).expect("telemetry keys present");
        assert_eq!(back.n_cores, 8);
        assert_eq!(back.cores_busy_max, 8);
        assert!((back.cpu_pct_avg - 73.25).abs() < 0.01);
        assert!((back.cpu_pct_max - 99.5).abs() < 0.01);
        assert!((back.cores_busy_avg - 5.5).abs() < 0.01);
        assert!((back.mem_peak_mb - 1234.5).abs() < 0.01);
        assert!((back.mem_pct_max - 42.0).abs() < 0.01);
        assert!((back.elapsed_ms - 1500.0).abs() < 0.01);
        // `samples` is not part of the exported key set.
        assert_eq!(back.samples, 0);
        // No telemetry keys → None.
        assert!(from_metrics(&serde_json::Map::new()).is_none());
    }

    /// End-to-end: the sampler sees real load from spinning threads.
    #[test]
    fn sampler_runs_and_reports_on_a_busy_workload() {
        // INJECT-AND-ASSERT: start the sampler, spin a couple of threads in a
        // tight loop for ~2.2s (long enough for ≥2 sampling intervals), then
        // assert the sampler observed real cores and plausible non-zero load.
        let sampler = Sampler::start();
        let stop = Arc::new(AtomicBool::new(false));
        let mut handles = Vec::new();
        let n = std::thread::available_parallelism().map(|x| x.get()).unwrap_or(2).max(2);
        for _ in 0..n {
            let s = Arc::clone(&stop);
            handles.push(std::thread::spawn(move || {
                let mut x = 0u64;
                while !s.load(Ordering::Relaxed) {
                    // Burn CPU; black-box so it isn't optimised away.
                    x = x.wrapping_mul(2654435761).wrapping_add(1);
                    std::hint::black_box(x);
                }
            }));
        }
        std::thread::sleep(Duration::from_millis(2200));
        stop.store(true, Ordering::Relaxed);
        for h in handles {
            let _ = h.join();
        }
        let t = sampler.stop();

        // On Linux we get real /proc data; assert it's plausible and non-zero.
        if read_proc_stat().is_some() {
            assert!(t.n_cores >= 1, "n_cores={}", t.n_cores);
            assert!(t.samples >= 1, "expected ≥1 interval, got {}", t.samples);
            assert!(t.cpu_pct_max > 0.0, "cpu_pct_max should be >0 under load");
            assert!(
                t.cores_busy_max >= 1,
                "at least one core should be >50% busy under an N-thread spin, got {}",
                t.cores_busy_max
            );
            assert!(t.mem_peak_mb > 0.0, "peak RSS should be >0");
        }
        assert!(t.elapsed_ms >= 2000.0, "elapsed_ms={}", t.elapsed_ms);
    }
}