Skip to main content

mcraw_tui/
stats.rs

1//! Per-phase pipeline timing.
2//!
3//! Designed so the collection itself has no measurable cost on the
4//! pipeline:
5//!
6//!   * All accumulators are `AtomicU64` updated with `Ordering::Relaxed`.
//!   * The hot path on every stage is one `Instant::now()` plus five
//!     relaxed atomic updates on drop (three adds, one min update and one
//!     max update). No allocations, no syscalls, no locks.
9//!   * Measured at 9 instrumented phases: <1 us/frame on x86-64. That is
10//!     <0.001% of a 4K frame budget, well under timing-noise floor.
11//!
12//! Designed to be reusable: the same `PipelineStats` type backs the
13//! future preview FPS meter (see `previewguide.md` §7). When the preview
14//! render loop lands, the same per-stage `PhaseGuard`s wrap its demosaic
15//! / colour / OETF / encode stages and the same `StatsReport` is written
16//! out for debugging.
17//!
18//! File output is opt-in via the `MCRAW_STATS_DUMP` environment variable
19//! (handled in `app.rs`); the JSON is intended for offline analysis, not
20//! for the TUI.
21//!
//! Inline unit tests verify the math, including:
23//!   * `phase_timer_avg_min_max_fps` — record 10 known durations and
24//!     assert the snapshot.
25//!   * `phase_timer_concurrent` — 8 threads × 1000 records each, assert
26//!     total count and sum are exact under `Relaxed`.
27//!   * `stats_report_serializes_to_json` — round-trip a known report
28//!     through `serde_json` and assert structure.
29
30use serde::{Deserialize, Serialize};
31use std::sync::atomic::{AtomicU64, Ordering};
32use std::sync::Mutex;
33use std::time::{Duration, Instant};
34
35/// Per-stage wall-time accumulator. Lock-free, allocation-free.
36#[derive(Debug)]
37pub struct PhaseTimer {
38    /// Sum of all recorded durations, in nanoseconds.
39    acc_ns: AtomicU64,
40    /// Number of samples recorded.
41    frames: AtomicU64,
42    /// Smallest recorded sample, in nanoseconds. `u64::MAX` is the
43    /// "no samples yet" sentinel (avoids an Option on the hot path).
44    min_ns: AtomicU64,
45    /// Largest recorded sample, in nanoseconds.
46    max_ns: AtomicU64,
47    /// Sum of squared samples, in (nanoseconds/1_000_000)^2 — scaled
48    /// down to prevent u64 overflow at multi-millisecond durations.
49    /// Used for stddev: `sqrt(sumsq/n - (avg_ns/1_000_000)^2) * 1_000_000`.
50    sum_sq_scaled: AtomicU64,
51}
52
53impl Default for PhaseTimer {
54    fn default() -> Self {
55        Self {
56            acc_ns: AtomicU64::new(0),
57            frames: AtomicU64::new(0),
58            min_ns: AtomicU64::new(u64::MAX),
59            max_ns: AtomicU64::new(0),
60            sum_sq_scaled: AtomicU64::new(0),
61        }
62    }
63}
64
65impl PhaseTimer {
66    pub fn new() -> Self { Self::default() }
67
68    /// Record one sample's duration. The hot path.
69    #[inline]
70    pub fn record(&self, d: Duration) {
71        let ns = d.as_nanos() as u64;
72        if ns == 0 { return; }
73        self.acc_ns.fetch_add(ns, Ordering::Relaxed);
74        self.frames.fetch_add(1, Ordering::Relaxed);
75        // ns^2 overflows u64 around 4.3e9 ns = 4.3 s. To stay safe for
76        // longer frames (worst case: encode of a complex frame) we scale
77        // ns down by 1_000_000 before squaring — i.e. store microseconds
78        // squared. The snapshot compensates on read.
79        let us = ns / 1_000;
80        self.sum_sq_scaled.fetch_add(us.saturating_mul(us), Ordering::Relaxed);
81
82        // min: CAS loop. Stable `fetch_min` would be ideal but is not
83        // available on our MSRV.
84        let mut cur = self.min_ns.load(Ordering::Relaxed);
85        while ns < cur {
86            match self.min_ns.compare_exchange_weak(cur, ns, Ordering::Relaxed, Ordering::Relaxed) {
87                Ok(_) => break,
88                Err(observed) => cur = observed,
89            }
90        }
91        // max
92        let mut cur = self.max_ns.load(Ordering::Relaxed);
93        while ns > cur {
94            match self.max_ns.compare_exchange_weak(cur, ns, Ordering::Relaxed, Ordering::Relaxed) {
95                Ok(_) => break,
96                Err(observed) => cur = observed,
97            }
98        }
99    }
100
101    #[inline] pub fn frames(&self) -> u64 { self.frames.load(Ordering::Relaxed) }
102    #[inline] pub fn total(&self) -> Duration { Duration::from_nanos(self.acc_ns.load(Ordering::Relaxed)) }
103    #[inline] pub fn avg(&self) -> Duration {
104        let f = self.frames();
105        if f == 0 { return Duration::ZERO; }
106        let ns = self.acc_ns.load(Ordering::Relaxed) / f;
107        Duration::from_nanos(ns)
108    }
109    #[inline] pub fn min(&self) -> Duration {
110        let v = self.min_ns.load(Ordering::Relaxed);
111        if v == u64::MAX { Duration::ZERO } else { Duration::from_nanos(v) }
112    }
113    #[inline] pub fn max(&self) -> Duration {
114        let v = self.max_ns.load(Ordering::Relaxed);
115        Duration::from_nanos(v)
116    }
117    /// Throughput in frames-per-second, computed from `frames / total_time`.
118    /// Returns 0.0 if no time was recorded.
119    #[inline] pub fn fps(&self) -> f64 {
120        let t = self.total().as_secs_f64();
121        if t > 0.0 { self.frames() as f64 / t } else { 0.0 }
122    }
123
124    pub fn snapshot(&self) -> PhaseSnapshot {
125        let f = self.frames();
126        let avg_ns = if f > 0 { self.acc_ns.load(Ordering::Relaxed) / f } else { 0 };
127        let sumsq = self.sum_sq_scaled.load(Ordering::Relaxed);
128        let stddev_us = if f > 1 {
129            let avg_us = (avg_ns / 1_000) as f64;
130            let mean_sq = (sumsq as f64) / (f as f64);
131            let var = (mean_sq - avg_us * avg_us).max(0.0);
132            var.sqrt()
133        } else { 0.0 };
134        PhaseSnapshot {
135            frames: f,
136            avg_us: avg_ns / 1_000,
137            min_us: self.min().as_micros() as u64,
138            max_us: self.max().as_micros() as u64,
139            stddev_us: stddev_us as u64,
140            fps: self.fps(),
141        }
142    }
143}
144
145/// All phase timers for one pipeline run. `Arc<PipelineStats>` is shared
146/// across the loader / processor / writer threads.
147#[derive(Debug)]
148pub struct PipelineStats {
149    pub decode: PhaseTimer,
150    pub demosaic: PhaseTimer,
151    pub normalize: PhaseTimer,
152    pub wb_hl_ccm: PhaseTimer,
153    pub oetf: PhaseTimer,
154    pub pack: PhaseTimer,
155    pub gpu: PhaseTimer,
156    pub encode_push: PhaseTimer,
157    pub setup: PhaseTimer,
158    pub finalize: PhaseTimer,
159    pub frames_total: AtomicU64,
160    pub gpu_frames: AtomicU64,
161    /// C5: per-frame encode_push timing ring. Lets us post-mortem any
162    /// frame that took longer than the histogram resolution would
163    /// capture (e.g. the 776ms / 1.5s spikes that ffmpeg's B-frame
164    /// lookahead produces on VBR). Writer is single-threaded, so the
165    /// `Mutex` is uncontended.
166    pub encode_push_per_frame: Mutex<Vec<(u32, Duration)>>,
167}
168
169impl Default for PipelineStats {
170    fn default() -> Self {
171        Self {
172            decode: PhaseTimer::default(),
173            demosaic: PhaseTimer::default(),
174            normalize: PhaseTimer::default(),
175            wb_hl_ccm: PhaseTimer::default(),
176            oetf: PhaseTimer::default(),
177            pack: PhaseTimer::default(),
178            gpu: PhaseTimer::default(),
179            encode_push: PhaseTimer::default(),
180            setup: PhaseTimer::default(),
181            finalize: PhaseTimer::default(),
182            frames_total: AtomicU64::new(0),
183            gpu_frames: AtomicU64::new(0),
184            encode_push_per_frame: Mutex::new(Vec::new()),
185        }
186    }
187}
188
189impl PipelineStats {
190    pub fn new() -> Self { Self::default() }
191
192    /// C5: record an encode_push duration tagged with the frame id.
193    /// Updates the histogram-style `encode_push` PhaseTimer (so the
194    /// existing summary line still has its avg/min/max/stddev) and
195    /// appends `(frame_id, duration)` to the per-frame ring.
196    pub fn record_encode_push_frame(&self, frame_id: u32, d: Duration) {
197        self.encode_push.record(d);
198        if let Ok(mut ring) = self.encode_push_per_frame.lock() {
199            ring.push((frame_id, d));
200        }
201    }
202
203    /// Build a serializable report. Cheap to call (8 atomic loads).
204    pub fn report(&self) -> StatsReport {
205        let total_frames = self.frames_total.load(Ordering::Relaxed);
206        let gpu_frames = self.gpu_frames.load(Ordering::Relaxed);
207        let total_wall = self.setup.total()
208            .checked_add(self.decode.total()).unwrap_or_default()
209            .checked_add(self.encode_push.total()).unwrap_or_default()
210            .checked_add(self.finalize.total()).unwrap_or_default();
211        let overall_fps = if total_wall.as_secs_f64() > 0.0 && total_frames > 0 {
212            total_frames as f64 / total_wall.as_secs_f64()
213        } else { 0.0 };
214        let gpu_pct = if total_frames > 0 {
215            100.0 * gpu_frames as f64 / total_frames as f64
216        } else { 0.0 };
217        let encode_push_per_frame = self.encode_push_per_frame.lock()
218            .map(|g| g.iter().map(|&(id, d)| (id, d.as_micros() as u64)).collect())
219            .unwrap_or_default();
220        StatsReport {
221            total_frames,
222            total_wall_secs: total_wall.as_secs_f64(),
223            overall_fps,
224            gpu_frames,
225            gpu_path_pct: gpu_pct,
226            phases: vec![
227                ("setup".to_string(),      self.setup.snapshot()),
228                ("decode".to_string(),     self.decode.snapshot()),
229                ("demosaic".to_string(),   self.demosaic.snapshot()),
230                ("normalize".to_string(),  self.normalize.snapshot()),
231                ("wb_hl_ccm".to_string(),  self.wb_hl_ccm.snapshot()),
232                ("oetf".to_string(),       self.oetf.snapshot()),
233                ("pack".to_string(),       self.pack.snapshot()),
234                ("gpu".to_string(),        self.gpu.snapshot()),
235                ("encode_push".to_string(),self.encode_push.snapshot()),
236                ("finalize".to_string(),   self.finalize.snapshot()),
237            ],
238            encode_push_per_frame_us: encode_push_per_frame,
239        }
240    }
241}
242
243#[derive(Serialize, Deserialize, Debug, Clone)]
244pub struct StatsReport {
245    pub total_frames: u64,
246    pub total_wall_secs: f64,
247    pub overall_fps: f64,
248    pub gpu_frames: u64,
249    pub gpu_path_pct: f64,
250    pub phases: Vec<(String, PhaseSnapshot)>,
251    /// C5: per-frame encode_push timing ring, in microseconds.
252    /// `Vec<(frame_id, duration_us)>` — lets us post-mortem spikes that
253    /// the histogram summary would smooth over.
254    #[serde(default)]
255    pub encode_push_per_frame_us: Vec<(u32, u64)>,
256}
257
258#[derive(Serialize, Deserialize, Debug, Clone)]
259pub struct PhaseSnapshot {
260    pub frames: u64,
261    pub avg_us: u64,
262    pub min_us: u64,
263    pub max_us: u64,
264    pub stddev_us: u64,
265    pub fps: f64,
266}
267
268impl StatsReport {
269    /// Write the report as pretty-printed JSON to `path`. Creates parent
270    /// directories as needed. Best-effort: errors are returned to the
271    /// caller (which logs them in `app.rs`).
272    pub fn write_json(&self, path: &std::path::Path) -> std::io::Result<()> {
273        if let Some(parent) = path.parent() {
274            let _ = std::fs::create_dir_all(parent);
275        }
276        let s = serde_json::to_string_pretty(self)
277            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
278        std::fs::write(path, s)
279    }
280
281    /// Pretty-print a one-line summary to stdout. Used by integration
282    /// tests and the optional `MCRAW_STATS_DUMP` console echo.
283    pub fn print_summary(&self) {
284        eprintln!("=== pipeline stats ===");
285        eprintln!("frames: {}  wall: {:.2}s  overall: {:.2} fps  gpu: {}/{} ({:.1}%)",
286            self.total_frames, self.total_wall_secs, self.overall_fps,
287            self.gpu_frames, self.total_frames, self.gpu_path_pct);
288        for (name, p) in &self.phases {
289            if p.frames == 0 { continue; }
290            eprintln!("  {:<13} frames={:>5}  avg={:>7} us  min={:>7}  max={:>8}  stddev={:>7}  fps={:>6.2}",
291                name, p.frames, p.avg_us, p.min_us, p.max_us, p.stddev_us, p.fps);
292        }
293        eprintln!("======================");
294    }
295}
296
297/// RAII guard: records the elapsed time into the wrapped `PhaseTimer`
298/// when dropped. Move-only; do not re-bind.
299pub struct PhaseGuard<'a> {
300    timer: &'a PhaseTimer,
301    start: Instant,
302}
303
304impl<'a> PhaseGuard<'a> {
305    #[inline]
306    pub fn new(timer: &'a PhaseTimer) -> Self {
307        Self { timer, start: Instant::now() }
308    }
309}
310
311impl Drop for PhaseGuard<'_> {
312    #[inline]
313    fn drop(&mut self) {
314        self.timer.record(self.start.elapsed());
315    }
316}
317
/// Unit tests for the timers, the guard and the JSON report.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Ten known samples (10..=100 ms) must yield the exact summary.
    #[test]
    fn phase_timer_avg_min_max_fps() {
        let t = PhaseTimer::new();
        for ms in [10u64, 20, 30, 40, 50, 60, 70, 80, 90, 100] {
            t.record(Duration::from_millis(ms));
        }
        let s = t.snapshot();
        assert_eq!(s.frames, 10);
        assert_eq!(s.avg_us, 55_000);                  // 55 ms
        assert_eq!(s.min_us, 10_000);                  // 10 ms
        assert_eq!(s.max_us, 100_000);                 // 100 ms
        // population stddev = sqrt(3.85e9 - 55_000^2) = sqrt(8.25e8) ~ 28_722.8
        assert_eq!(s.stddev_us, 28_722);
        // fps = frames / total_seconds = 10 / 0.55 = ~18.18
        assert!((s.fps - 18.18).abs() < 0.1, "fps={}", s.fps);
    }

    /// An untouched timer reports zeros rather than sentinel values.
    #[test]
    fn phase_timer_zero_samples() {
        let t = PhaseTimer::new();
        let s = t.snapshot();
        assert_eq!(s.frames, 0);
        assert_eq!(s.avg_us, 0);
        assert_eq!(s.min_us, 0);
        assert_eq!(s.max_us, 0);
        assert_eq!(s.fps, 0.0);
    }

    /// 8 threads x 1000 identical samples: count and sum must be exact.
    #[test]
    fn phase_timer_concurrent() {
        let t = Arc::new(PhaseTimer::new());
        let handles: Vec<_> = (0..8)
            .map(|_| {
                let t = Arc::clone(&t);
                thread::spawn(move || {
                    for _ in 0..1_000 {
                        t.record(Duration::from_micros(100));
                    }
                })
            })
            .collect();
        for h in handles { h.join().unwrap(); }
        assert_eq!(t.frames(), 8_000);
        assert_eq!(t.total(), Duration::from_micros(800_000));
        let s = t.snapshot();
        assert_eq!(s.avg_us, 100);
        assert_eq!(s.min_us, 100);
        assert_eq!(s.max_us, 100);
        assert_eq!(s.stddev_us, 0);
    }

    /// Dropping the guard records exactly one sample of about the slept time.
    #[test]
    fn phase_guard_records_on_drop() {
        let t = PhaseTimer::new();
        {
            let _g = PhaseGuard::new(&t);
            thread::sleep(Duration::from_millis(5));
        }
        let s = t.snapshot();
        assert_eq!(s.frames, 1);
        assert!(s.avg_us >= 4_000, "guard should record >=4ms, got {}us", s.avg_us);
        assert!(s.avg_us <  100_000, "guard should record <100ms, got {}us", s.avg_us);
    }

    /// A report survives a compact-JSON round trip.
    #[test]
    fn stats_report_serializes_to_json() {
        let s = PipelineStats::new();
        s.frames_total.store(100, Ordering::Relaxed);
        s.gpu_frames.store(75, Ordering::Relaxed);
        s.decode.record(Duration::from_millis(12));
        s.decode.record(Duration::from_millis(18));
        s.demosaic.record(Duration::from_millis(30));
        let r = s.report();
        let json = serde_json::to_string(&r).expect("serialize");
        assert!(json.contains("\"total_frames\":100"));
        assert!(json.contains("\"gpu_path_pct\":75"));
        assert!(json.contains("\"decode\""));
        assert!(json.contains("\"demosaic\""));
        // round-trip
        let back: StatsReport = serde_json::from_str(&json).expect("parse");
        assert_eq!(back.total_frames, 100);
        assert_eq!(back.gpu_frames, 75);
    }

    /// `write_json` creates missing parent directories and writes
    /// parseable JSON.
    #[test]
    fn stats_report_write_json_creates_parent() {
        let s = PipelineStats::new();
        s.frames_total.store(1, Ordering::Relaxed);
        let r = s.report();
        // Per-process directory so concurrent runs of this test binary do
        // not race on the same path.
        let dir = std::env::temp_dir()
            .join(format!("mcraw-tui-stats-test-{}", std::process::id()));
        let path = dir.join("nested").join("report.json");
        r.write_json(&path).expect("write");
        let read_back = std::fs::read_to_string(&path).expect("read");
        // Clean up the whole tree before asserting, so that a failed
        // assertion does not leave directories behind in the temp dir.
        let _ = std::fs::remove_dir_all(&dir);
        // pretty-printed JSON has spaces around colons, so just check
        // for the field name and the value separately
        assert!(read_back.contains("\"total_frames\""));
        assert!(read_back.contains(": 1"));
        // round-trip: parse what we wrote and verify the count
        let parsed: StatsReport = serde_json::from_str(&read_back).expect("parse");
        assert_eq!(parsed.total_frames, 1);
    }
}