Skip to main content

snapdir_core/
progress.rs

1//! A pure, lock-free progress [`Meter`] for the filesystem walk and (later) the
2//! transfer path.
3//!
4//! Per the library-purity principle this module does **no** terminal I/O and
5//! reads **no** `$HOME`/config/environment for behavior. The [`Meter`] is a bag
6//! of [`std::sync::atomic`] counters updated with [`Ordering::Relaxed`]: the
7//! recording side ([`walk_with_meter`](crate::walk_with_meter)) bumps the
8//! counters as it hashes files, and a (separately-laned) CLI renderer takes a
9//! cheap [`MeterSnapshot`] to draw a progress bar. All methods take `&self`, so
10//! the meter is shared across threads behind an [`Arc`](std::sync::Arc) without
11//! a lock.
12//!
13//! The meter is intentionally *advisory*: recording into it never changes the
14//! walk's output. A walk with a meter and the same walk without one produce
15//! byte-identical manifests.
16
17use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
18
19/// The coarse phase a [`Meter`] is currently recording.
20///
21/// Maps to/from a `u8` for lock-free storage in an [`AtomicU8`]. The default is
22/// [`Phase::Idle`] so a freshly-constructed [`MeterSnapshot`] (and an untouched
23/// meter) reports an idle phase.
24#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
25pub enum Phase {
26    /// No work is being recorded yet.
27    #[default]
28    Idle,
29    /// Files are being read and hashed (the walk's content pass).
30    Hashing,
31    /// Objects are being transferred to/from a store.
32    Transfer,
33}
34
35impl Phase {
36    /// Encodes the phase as a `u8` for atomic storage.
37    const fn as_u8(self) -> u8 {
38        match self {
39            Phase::Idle => 0,
40            Phase::Hashing => 1,
41            Phase::Transfer => 2,
42        }
43    }
44
45    /// Decodes a `u8` back into a [`Phase`]. Any out-of-range value (which the
46    /// meter never stores) decodes to [`Phase::Idle`].
47    const fn from_u8(value: u8) -> Self {
48        match value {
49            1 => Phase::Hashing,
50            2 => Phase::Transfer,
51            _ => Phase::Idle,
52        }
53    }
54}
55
56/// A point-in-time copy of a [`Meter`]'s counters.
57///
58/// Cheap to copy ([`Copy`]) and free of any atomics, so a renderer can read a
59/// consistent-enough view without holding a reference to the live meter.
60#[derive(Clone, Copy, Debug, Default)]
61pub struct MeterSnapshot {
62    /// Total bytes read in (e.g. file content hashed during the walk).
63    pub bytes_in: u64,
64    /// Total bytes written/sent out (e.g. uploaded to a store).
65    pub bytes_out: u64,
66    /// Objects finished (e.g. files hashed).
67    pub objects_done: u64,
68    /// Expected total objects, when known (`0` means unknown).
69    pub objects_total: u64,
70    /// Objects skipped (e.g. already present, deduplicated).
71    pub objects_skipped: u64,
72    /// Objects currently in flight (a gauge: started minus finished).
73    pub in_flight: u64,
74    /// The current coarse [`Phase`].
75    pub phase: Phase,
76}
77
78/// A lock-free progress meter shared across threads behind an
79/// [`Arc`](std::sync::Arc).
80///
81/// Every method takes `&self` and uses [`Ordering::Relaxed`] — the counters are
82/// advisory progress, never a synchronization primitive, so relaxed ordering is
83/// both correct and cheap (a couple of atomic ops per file). [`Meter`] is
84/// [`Send`] + [`Sync`] because all of its fields are atomics.
85#[derive(Debug, Default)]
86pub struct Meter {
87    bytes_in: AtomicU64,
88    bytes_out: AtomicU64,
89    objects_done: AtomicU64,
90    objects_total: AtomicU64,
91    objects_skipped: AtomicU64,
92    in_flight: AtomicU64,
93    phase: AtomicU8,
94}
95
96impl Meter {
97    /// Creates a fresh meter with all counters at zero and [`Phase::Idle`].
98    #[must_use]
99    pub fn new() -> Self {
100        Self::default()
101    }
102
103    /// Adds `n` to the bytes-in counter (content read/hashed).
104    pub fn add_in(&self, n: u64) {
105        self.bytes_in.fetch_add(n, Ordering::Relaxed);
106    }
107
108    /// Adds `n` to the bytes-out counter (content written/sent).
109    pub fn add_out(&self, n: u64) {
110        self.bytes_out.fetch_add(n, Ordering::Relaxed);
111    }
112
113    /// Records that an object started: bumps the in-flight gauge by one.
114    pub fn object_started(&self) {
115        self.in_flight.fetch_add(1, Ordering::Relaxed);
116    }
117
118    /// Records that an object finished: drops the in-flight gauge by one and
119    /// bumps the done counter by one. Saturates the gauge at zero so a stray
120    /// finish never underflows.
121    pub fn object_finished(&self) {
122        // Decrement the gauge without underflowing past zero.
123        let mut current = self.in_flight.load(Ordering::Relaxed);
124        while current > 0 {
125            match self.in_flight.compare_exchange_weak(
126                current,
127                current - 1,
128                Ordering::Relaxed,
129                Ordering::Relaxed,
130            ) {
131                Ok(_) => break,
132                Err(observed) => current = observed,
133            }
134        }
135        self.objects_done.fetch_add(1, Ordering::Relaxed);
136    }
137
138    /// Sets the expected total object count.
139    pub fn set_total(&self, n: u64) {
140        self.objects_total.store(n, Ordering::Relaxed);
141    }
142
143    /// Adds `n` to the skipped-objects counter.
144    pub fn add_skipped(&self, n: u64) {
145        self.objects_skipped.fetch_add(n, Ordering::Relaxed);
146    }
147
148    /// Sets the current coarse [`Phase`].
149    pub fn set_phase(&self, p: Phase) {
150        self.phase.store(p.as_u8(), Ordering::Relaxed);
151    }
152
153    /// Reads the current coarse [`Phase`].
154    #[must_use]
155    pub fn phase(&self) -> Phase {
156        Phase::from_u8(self.phase.load(Ordering::Relaxed))
157    }
158
159    /// Takes a point-in-time [`MeterSnapshot`] of every counter.
160    ///
161    /// The loads are independently relaxed, so the snapshot is *eventually*
162    /// consistent rather than a single atomic view — that is fine for an
163    /// advisory progress display.
164    #[must_use]
165    pub fn snapshot(&self) -> MeterSnapshot {
166        MeterSnapshot {
167            bytes_in: self.bytes_in.load(Ordering::Relaxed),
168            bytes_out: self.bytes_out.load(Ordering::Relaxed),
169            objects_done: self.objects_done.load(Ordering::Relaxed),
170            objects_total: self.objects_total.load(Ordering::Relaxed),
171            objects_skipped: self.objects_skipped.load(Ordering::Relaxed),
172            in_flight: self.in_flight.load(Ordering::Relaxed),
173            phase: self.phase(),
174        }
175    }
176}
177
178#[cfg(test)]
179mod tests {
180    use super::*;
181
182    #[test]
183    fn progress_meter_counters_and_snapshot() {
184        let meter = Meter::new();
185        // Default snapshot is all-zero and Idle.
186        let initial = meter.snapshot();
187        assert_eq!(initial.bytes_in, 0);
188        assert_eq!(initial.bytes_out, 0);
189        assert_eq!(initial.objects_done, 0);
190        assert_eq!(initial.objects_total, 0);
191        assert_eq!(initial.objects_skipped, 0);
192        assert_eq!(initial.in_flight, 0);
193        assert_eq!(initial.phase, Phase::Idle);
194
195        meter.add_in(100);
196        meter.add_in(23);
197        meter.add_out(7);
198        meter.set_total(10);
199        meter.add_skipped(2);
200        meter.add_skipped(1);
201        meter.set_phase(Phase::Hashing);
202
203        // One object in flight after a started/finished pair leaves a second
204        // still in flight.
205        meter.object_started();
206        meter.object_started();
207        meter.object_finished();
208
209        let snap = meter.snapshot();
210        assert_eq!(snap.bytes_in, 123);
211        assert_eq!(snap.bytes_out, 7);
212        assert_eq!(snap.objects_done, 1);
213        assert_eq!(snap.objects_total, 10);
214        assert_eq!(snap.objects_skipped, 3);
215        assert_eq!(snap.in_flight, 1);
216        assert_eq!(snap.phase, Phase::Hashing);
217
218        // Phase round-trips through the atomic for every variant.
219        for p in [Phase::Idle, Phase::Hashing, Phase::Transfer] {
220            meter.set_phase(p);
221            assert_eq!(meter.phase(), p);
222            assert_eq!(meter.snapshot().phase, p);
223        }
224    }
225
226    #[test]
227    fn progress_meter_in_flight_gauge() {
228        let meter = Meter::new();
229        meter.object_started();
230        meter.object_started();
231        meter.object_started();
232        meter.object_finished();
233        meter.object_finished();
234
235        let snap = meter.snapshot();
236        assert_eq!(snap.in_flight, 1, "3 started - 2 finished == 1 in flight");
237        assert_eq!(snap.objects_done, 2, "2 finished == 2 done");
238
239        // A finish past zero saturates the gauge instead of underflowing.
240        meter.object_finished();
241        meter.object_finished();
242        let snap = meter.snapshot();
243        assert_eq!(snap.in_flight, 0, "gauge saturates at 0, no underflow");
244        assert_eq!(snap.objects_done, 4);
245    }
246}