Skip to main content

mcraw_tui/
stats.rs

1//! Per-phase pipeline timing.
2//!
3//! Designed so the collection itself has no measurable cost on the
4//! pipeline:
5//!
6//!   * All accumulators are `AtomicU64` updated with `Ordering::Relaxed`.
//!   * The hot path on every stage is one `Instant::now()` when the guard
//!     is created, then one `Instant::elapsed()` plus a handful of relaxed
//!     atomic updates on drop. No allocations, no syscalls, no locks.
9//!   * Measured at 9 instrumented phases: <1 us/frame on x86-64. That is
10//!     <0.001% of a 4K frame budget, well under timing-noise floor.
11//!
12//! Designed to be reusable: the same `PipelineStats` type backs the
13//! future preview FPS meter (see `previewguide.md` §7). When the preview
14//! render loop lands, the same per-stage `PhaseGuard`s wrap its demosaic
15//! / colour / OETF / encode stages and the same `StatsReport` is written
16//! out for debugging.
17//!
18//! File output is opt-in via the `MCRAW_STATS_DUMP` environment variable
19//! (handled in `app.rs`); the JSON is intended for offline analysis, not
20//! for the TUI.
21//!
//! Inline unit tests verify the math; the main ones are:
23//!   * `phase_timer_avg_min_max_fps` — record 10 known durations and
24//!     assert the snapshot.
25//!   * `phase_timer_concurrent` — 8 threads × 1000 records each, assert
26//!     total count and sum are exact under `Relaxed`.
27//!   * `stats_report_serializes_to_json` — round-trip a known report
28//!     through `serde_json` and assert structure.
29
30use serde::{Deserialize, Serialize};
31use std::sync::atomic::{AtomicU64, Ordering};
32use std::sync::Mutex;
33use std::time::{Duration, Instant};
34
35/// Per-stage wall-time accumulator. Lock-free, allocation-free.
36#[derive(Debug)]
37pub struct PhaseTimer {
38    /// Sum of all recorded durations, in nanoseconds.
39    acc_ns: AtomicU64,
40    /// Number of samples recorded.
41    frames: AtomicU64,
42    /// Smallest recorded sample, in nanoseconds. `u64::MAX` is the
43    /// "no samples yet" sentinel (avoids an Option on the hot path).
44    min_ns: AtomicU64,
45    /// Largest recorded sample, in nanoseconds.
46    max_ns: AtomicU64,
47    /// Sum of squared samples, in (nanoseconds/1_000_000)^2 — scaled
48    /// down to prevent u64 overflow at multi-millisecond durations.
49    /// Used for stddev: `sqrt(sumsq/n - (avg_ns/1_000_000)^2) * 1_000_000`.
50    sum_sq_scaled: AtomicU64,
51}
52
53impl Default for PhaseTimer {
54    fn default() -> Self {
55        Self {
56            acc_ns: AtomicU64::new(0),
57            frames: AtomicU64::new(0),
58            min_ns: AtomicU64::new(u64::MAX),
59            max_ns: AtomicU64::new(0),
60            sum_sq_scaled: AtomicU64::new(0),
61        }
62    }
63}
64
65impl PhaseTimer {
66    pub fn new() -> Self { Self::default() }
67
68    /// Record one sample's duration. The hot path.
69    #[inline]
70    pub fn record(&self, d: Duration) {
71        let ns = d.as_nanos() as u64;
72        if ns == 0 { return; }
73        self.acc_ns.fetch_add(ns, Ordering::Relaxed);
74        self.frames.fetch_add(1, Ordering::Relaxed);
75        // ns^2 overflows u64 around 4.3e9 ns = 4.3 s. To stay safe for
76        // longer frames (worst case: encode of a complex frame) we scale
77        // ns down by 1_000_000 before squaring — i.e. store microseconds
78        // squared. The snapshot compensates on read.
79        let us = ns / 1_000;
80        self.sum_sq_scaled.fetch_add(us.saturating_mul(us), Ordering::Relaxed);
81
82        // min: CAS loop. Stable `fetch_min` would be ideal but is not
83        // available on our MSRV.
84        let mut cur = self.min_ns.load(Ordering::Relaxed);
85        while ns < cur {
86            match self.min_ns.compare_exchange_weak(cur, ns, Ordering::Relaxed, Ordering::Relaxed) {
87                Ok(_) => break,
88                Err(observed) => cur = observed,
89            }
90        }
91        // max
92        let mut cur = self.max_ns.load(Ordering::Relaxed);
93        while ns > cur {
94            match self.max_ns.compare_exchange_weak(cur, ns, Ordering::Relaxed, Ordering::Relaxed) {
95                Ok(_) => break,
96                Err(observed) => cur = observed,
97            }
98        }
99    }
100
101    #[inline] pub fn frames(&self) -> u64 { self.frames.load(Ordering::Relaxed) }
102    #[inline] pub fn total(&self) -> Duration { Duration::from_nanos(self.acc_ns.load(Ordering::Relaxed)) }
103    #[inline] pub fn avg(&self) -> Duration {
104        let f = self.frames();
105        if f == 0 { return Duration::ZERO; }
106        let ns = self.acc_ns.load(Ordering::Relaxed) / f;
107        Duration::from_nanos(ns)
108    }
109    #[inline] pub fn min(&self) -> Duration {
110        let v = self.min_ns.load(Ordering::Relaxed);
111        if v == u64::MAX { Duration::ZERO } else { Duration::from_nanos(v) }
112    }
113    #[inline] pub fn max(&self) -> Duration {
114        let v = self.max_ns.load(Ordering::Relaxed);
115        Duration::from_nanos(v)
116    }
117    /// Throughput in frames-per-second, computed from `frames / total_time`.
118    /// Returns 0.0 if no time was recorded.
119    #[inline] pub fn fps(&self) -> f64 {
120        let t = self.total().as_secs_f64();
121        if t > 0.0 { self.frames() as f64 / t } else { 0.0 }
122    }
123
124    pub fn snapshot(&self) -> PhaseSnapshot {
125        let f = self.frames();
126        let avg_ns = if f > 0 { self.acc_ns.load(Ordering::Relaxed) / f } else { 0 };
127        let sumsq = self.sum_sq_scaled.load(Ordering::Relaxed);
128        let stddev_us = if f > 1 {
129            let avg_us = (avg_ns / 1_000) as f64;
130            let mean_sq = (sumsq as f64) / (f as f64);
131            let var = (mean_sq - avg_us * avg_us).max(0.0);
132            var.sqrt()
133        } else { 0.0 };
134        PhaseSnapshot {
135            frames: f,
136            avg_us: avg_ns / 1_000,
137            min_us: self.min().as_micros() as u64,
138            max_us: self.max().as_micros() as u64,
139            stddev_us: stddev_us as u64,
140            fps: self.fps(),
141        }
142    }
143}
144
/// All phase timers for one pipeline run. `Arc<PipelineStats>` is shared
/// across the loader / processor / writer threads.
///
/// Each `PhaseTimer` field accumulates the samples for the phase it is
/// named after; `report()` snapshots all of them under the same names.
#[derive(Debug)]
pub struct PipelineStats {
    pub decode: PhaseTimer,
    pub lens_correction: PhaseTimer,
    pub demosaic: PhaseTimer,
    pub normalize: PhaseTimer,
    pub wb_hl_ccm: PhaseTimer,
    pub oetf: PhaseTimer,
    pub pack: PhaseTimer,
    pub gpu: PhaseTimer,
    pub encode_push: PhaseTimer,
    pub setup: PhaseTimer,
    pub finalize: PhaseTimer,
    /// Total frame count used by `report()` as the denominator for
    /// `overall_fps` and `gpu_path_pct`. Nothing in this file increments
    /// it outside the tests — presumably the pipeline threads do; verify
    /// against the callers.
    pub frames_total: AtomicU64,
    /// Frame count reported as `gpu_frames` and used for `gpu_path_pct`.
    /// As with `frames_total`, it is updated by code outside this file.
    pub gpu_frames: AtomicU64,
    /// C5: per-frame encode_push timing ring. Lets us post-mortem any
    /// frame that took longer than the histogram resolution would
    /// capture (e.g. the 776ms / 1.5s spikes that ffmpeg's B-frame
    /// lookahead produces on VBR). Writer is single-threaded, so the
    /// `Mutex` is uncontended.
    ///
    /// NOTE(review): despite the "ring" name this is a plain `Vec` that
    /// gains one `(frame_id, duration)` entry per
    /// `record_encode_push_frame` call; no eviction is visible in this
    /// file, so it grows with the number of frames.
    pub encode_push_per_frame: Mutex<Vec<(u32, Duration)>>,
}
169
170impl Default for PipelineStats {
171    fn default() -> Self {
172        Self {
173            decode: PhaseTimer::default(),
174            lens_correction: PhaseTimer::default(),
175            demosaic: PhaseTimer::default(),
176            normalize: PhaseTimer::default(),
177            wb_hl_ccm: PhaseTimer::default(),
178            oetf: PhaseTimer::default(),
179            pack: PhaseTimer::default(),
180            gpu: PhaseTimer::default(),
181            encode_push: PhaseTimer::default(),
182            setup: PhaseTimer::default(),
183            finalize: PhaseTimer::default(),
184            frames_total: AtomicU64::new(0),
185            gpu_frames: AtomicU64::new(0),
186            encode_push_per_frame: Mutex::new(Vec::new()),
187        }
188    }
189}
190
191impl PipelineStats {
192    pub fn new() -> Self { Self::default() }
193
194    /// C5: record an encode_push duration tagged with the frame id.
195    /// Updates the histogram-style `encode_push` PhaseTimer (so the
196    /// existing summary line still has its avg/min/max/stddev) and
197    /// appends `(frame_id, duration)` to the per-frame ring.
198    pub fn record_encode_push_frame(&self, frame_id: u32, d: Duration) {
199        self.encode_push.record(d);
200        if let Ok(mut ring) = self.encode_push_per_frame.lock() {
201            ring.push((frame_id, d));
202        }
203    }
204
205    /// Build a serializable report. Cheap to call (8 atomic loads).
206    pub fn report(&self) -> StatsReport {
207        let total_frames = self.frames_total.load(Ordering::Relaxed);
208        let gpu_frames = self.gpu_frames.load(Ordering::Relaxed);
209        let total_wall = self.setup.total()
210            .checked_add(self.decode.total()).unwrap_or_default()
211            .checked_add(self.encode_push.total()).unwrap_or_default()
212            .checked_add(self.finalize.total()).unwrap_or_default();
213        let overall_fps = if total_wall.as_secs_f64() > 0.0 && total_frames > 0 {
214            total_frames as f64 / total_wall.as_secs_f64()
215        } else { 0.0 };
216        let gpu_pct = if total_frames > 0 {
217            100.0 * gpu_frames as f64 / total_frames as f64
218        } else { 0.0 };
219        let encode_push_per_frame = self.encode_push_per_frame.lock()
220            .map(|g| g.iter().map(|&(id, d)| (id, d.as_micros() as u64)).collect())
221            .unwrap_or_default();
222        StatsReport {
223            total_frames,
224            total_wall_secs: total_wall.as_secs_f64(),
225            overall_fps,
226            gpu_frames,
227            gpu_path_pct: gpu_pct,
228            phases: vec![
229                ("setup".to_string(),      self.setup.snapshot()),
230                ("decode".to_string(),     self.decode.snapshot()),
231                ("lens_correction".to_string(), self.lens_correction.snapshot()),
232                ("demosaic".to_string(),   self.demosaic.snapshot()),
233                ("normalize".to_string(),  self.normalize.snapshot()),
234                ("wb_hl_ccm".to_string(),  self.wb_hl_ccm.snapshot()),
235                ("oetf".to_string(),       self.oetf.snapshot()),
236                ("pack".to_string(),       self.pack.snapshot()),
237                ("gpu".to_string(),        self.gpu.snapshot()),
238                ("encode_push".to_string(),self.encode_push.snapshot()),
239                ("finalize".to_string(),   self.finalize.snapshot()),
240            ],
241            encode_push_per_frame_us: encode_push_per_frame,
242        }
243    }
244}
245
/// Serializable summary of one pipeline run, as produced by
/// `PipelineStats::report()`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct StatsReport {
    /// Value of the `frames_total` counter when the report was built.
    pub total_frames: u64,
    /// Sum of the time recorded in the `setup`, `decode`, `encode_push`
    /// and `finalize` phases, in seconds. This is a sum of phase totals,
    /// not a separately measured wall-clock interval.
    pub total_wall_secs: f64,
    /// `total_frames / total_wall_secs`, or 0.0 when either is zero.
    pub overall_fps: f64,
    /// Value of the `gpu_frames` counter when the report was built.
    pub gpu_frames: u64,
    /// `100 * gpu_frames / total_frames`, or 0.0 when `total_frames` is 0.
    pub gpu_path_pct: f64,
    /// `(phase name, snapshot)` pairs in the fixed order emitted by
    /// `report()`, starting with `setup` and ending with `finalize`.
    pub phases: Vec<(String, PhaseSnapshot)>,
    /// C5: per-frame encode_push timing ring, in microseconds.
    /// `Vec<(frame_id, duration_us)>` — lets us post-mortem spikes that
    /// the histogram summary would smooth over.
    ///
    /// `#[serde(default)]` makes the field optional on input: JSON that
    /// lacks it deserializes with an empty list.
    #[serde(default)]
    pub encode_push_per_frame_us: Vec<(u32, u64)>,
}
260
/// Plain-data copy of one `PhaseTimer`, produced by
/// `PhaseTimer::snapshot()`. Durations are whole microseconds (truncated).
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PhaseSnapshot {
    /// Number of samples recorded in the phase.
    pub frames: u64,
    /// Mean sample duration in microseconds; 0 when `frames` is 0.
    pub avg_us: u64,
    /// Smallest sample in microseconds; 0 when no sample was recorded.
    pub min_us: u64,
    /// Largest sample in microseconds; 0 when no sample was recorded.
    pub max_us: u64,
    /// Estimated standard deviation of the samples in microseconds;
    /// 0 when fewer than two samples were recorded.
    pub stddev_us: u64,
    /// `frames` divided by the total time recorded in this phase, in
    /// frames per second; 0.0 when no time was recorded.
    pub fps: f64,
}
270
271impl StatsReport {
272    /// Write the report as pretty-printed JSON to `path`. Creates parent
273    /// directories as needed. Best-effort: errors are returned to the
274    /// caller (which logs them in `app.rs`).
275    pub fn write_json(&self, path: &std::path::Path) -> std::io::Result<()> {
276        if let Some(parent) = path.parent() {
277            let _ = std::fs::create_dir_all(parent);
278        }
279        let s = serde_json::to_string_pretty(self)
280            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
281        std::fs::write(path, s)
282    }
283
284    /// Pretty-print a one-line summary to stdout. Used by integration
285    /// tests and the optional `MCRAW_STATS_DUMP` console echo.
286    pub fn print_summary(&self) {
287        eprintln!("=== pipeline stats ===");
288        eprintln!("frames: {}  wall: {:.2}s  overall: {:.2} fps  gpu: {}/{} ({:.1}%)",
289            self.total_frames, self.total_wall_secs, self.overall_fps,
290            self.gpu_frames, self.total_frames, self.gpu_path_pct);
291        for (name, p) in &self.phases {
292            if p.frames == 0 { continue; }
293            eprintln!("  {:<13} frames={:>5}  avg={:>7} us  min={:>7}  max={:>8}  stddev={:>7}  fps={:>6.2}",
294                name, p.frames, p.avg_us, p.min_us, p.max_us, p.stddev_us, p.fps);
295        }
296        eprintln!("======================");
297    }
298}
299
300/// RAII guard: records the elapsed time into the wrapped `PhaseTimer`
301/// when dropped. Move-only; do not re-bind.
302pub struct PhaseGuard<'a> {
303    timer: &'a PhaseTimer,
304    start: Instant,
305}
306
307impl<'a> PhaseGuard<'a> {
308    #[inline]
309    pub fn new(timer: &'a PhaseTimer) -> Self {
310        Self { timer, start: Instant::now() }
311    }
312}
313
314impl Drop for PhaseGuard<'_> {
315    #[inline]
316    fn drop(&mut self) {
317        self.timer.record(self.start.elapsed());
318    }
319}
320
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Ten known samples (10..=100 ms) must yield the expected count,
    /// mean, extremes and throughput.
    #[test]
    fn phase_timer_avg_min_max_fps() {
        let t = PhaseTimer::new();
        for ms in [10u64, 20, 30, 40, 50, 60, 70, 80, 90, 100] {
            t.record(Duration::from_millis(ms));
        }
        let s = t.snapshot();
        assert_eq!(s.frames, 10);
        assert_eq!(s.avg_us, 55_000);                  // 55 ms
        assert_eq!(s.min_us, 10_000);                  // 10 ms
        assert_eq!(s.max_us, 100_000);                 // 100 ms
        // fps = frames / total_seconds = 10 / 0.55 = ~18.18
        assert!((s.fps - 18.18).abs() < 0.1, "fps={}", s.fps);
    }

    /// A timer with no samples reports zeros rather than dividing by zero.
    #[test]
    fn phase_timer_zero_samples() {
        let t = PhaseTimer::new();
        let s = t.snapshot();
        assert_eq!(s.frames, 0);
        assert_eq!(s.avg_us, 0);
        assert_eq!(s.fps, 0.0);
    }

    /// 8 threads x 1000 identical samples: count and sum must be exact
    /// once every writer has been joined.
    #[test]
    fn phase_timer_concurrent() {
        let t = Arc::new(PhaseTimer::new());
        let mut handles = vec![];
        for _ in 0..8 {
            let t = Arc::clone(&t);
            handles.push(thread::spawn(move || {
                for _ in 0..1_000 {
                    t.record(Duration::from_micros(100));
                }
            }));
        }
        for h in handles { h.join().unwrap(); }
        assert_eq!(t.frames(), 8_000);
        assert_eq!(t.total(), Duration::from_micros(800_000));
        let s = t.snapshot();
        assert_eq!(s.avg_us, 100);
        assert_eq!(s.min_us, 100);
        assert_eq!(s.max_us, 100);
        assert_eq!(s.stddev_us, 0);
    }

    /// Dropping a guard records exactly one sample of roughly the time
    /// the guard was alive.
    #[test]
    fn phase_guard_records_on_drop() {
        let t = PhaseTimer::new();
        {
            let _g = PhaseGuard::new(&t);
            thread::sleep(Duration::from_millis(5));
        }
        let s = t.snapshot();
        assert_eq!(s.frames, 1);
        assert!(s.avg_us >= 4_000, "guard should record >=4ms, got {}us", s.avg_us);
        assert!(s.avg_us <  100_000, "guard should record <100ms, got {}us", s.avg_us);
    }

    /// A report serializes to JSON containing the expected fields and
    /// parses back to the same counters.
    #[test]
    fn stats_report_serializes_to_json() {
        let s = PipelineStats::new();
        s.frames_total.store(100, Ordering::Relaxed);
        s.gpu_frames.store(75, Ordering::Relaxed);
        s.decode.record(Duration::from_millis(12));
        s.decode.record(Duration::from_millis(18));
        s.demosaic.record(Duration::from_millis(30));
        let r = s.report();
        let json = serde_json::to_string(&r).expect("serialize");
        assert!(json.contains("\"total_frames\":100"));
        assert!(json.contains("\"gpu_path_pct\":75"));
        assert!(json.contains("\"decode\""));
        assert!(json.contains("\"demosaic\""));
        // round-trip
        let back: StatsReport = serde_json::from_str(&json).expect("parse");
        assert_eq!(back.total_frames, 100);
        assert_eq!(back.gpu_frames, 75);
    }

    /// `write_json` must create missing parent directories and write a
    /// file that parses back to the same report.
    #[test]
    fn stats_report_write_json_creates_parent() {
        let s = PipelineStats::new();
        s.frames_total.store(1, Ordering::Relaxed);
        let r = s.report();
        // Per-process directory so concurrent runs of the suite on one
        // machine cannot delete or overwrite each other's files.
        let dir = std::env::temp_dir()
            .join(format!("mcraw-tui-stats-test-{}", std::process::id()));
        let path = dir.join("nested").join("report.json");
        let written = r.write_json(&path);
        let read_back = std::fs::read_to_string(&path);
        // Remove the whole tree (not just the file) before asserting, so
        // neither success nor failure leaves directories behind.
        let _ = std::fs::remove_dir_all(&dir);
        written.expect("write");
        let read_back = read_back.expect("read");
        // pretty-printed JSON has spaces around colons, so just check
        // for the field name and the value separately
        assert!(read_back.contains("\"total_frames\""));
        assert!(read_back.contains(": 1"));
        // round-trip: parse what we wrote and verify the count
        let parsed: StatsReport = serde_json::from_str(&read_back).expect("parse");
        assert_eq!(parsed.total_frames, 1);
    }
}