Skip to main content

snapdir_core/
progress.rs

1//! A pure, lock-free progress [`Meter`] for the filesystem walk and (later) the
2//! transfer path.
3//!
4//! Per the library-purity principle this module does **no** terminal I/O and
5//! reads **no** `$HOME`/config/environment for behavior. The [`Meter`] is a bag
6//! of [`std::sync::atomic`] counters updated with [`Ordering::Relaxed`]: the
7//! recording side ([`walk_with_meter`](crate::walk_with_meter)) bumps the
8//! counters as it hashes files, and a (separately-laned) CLI renderer takes a
9//! cheap [`MeterSnapshot`] to draw a progress bar. All methods take `&self`, so
10//! the meter is shared across threads behind an [`Arc`](std::sync::Arc) without
11//! a lock.
12//!
13//! The meter is intentionally *advisory*: recording into it never changes the
14//! walk's output. A walk with a meter and the same walk without one produce
15//! byte-identical manifests.
16
17use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
18
19/// The coarse phase a [`Meter`] is currently recording.
20///
21/// Maps to/from a `u8` for lock-free storage in an [`AtomicU8`]. The default is
22/// [`Phase::Idle`] so a freshly-constructed [`MeterSnapshot`] (and an untouched
23/// meter) reports an idle phase.
24#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
25pub enum Phase {
26    /// No work is being recorded yet.
27    #[default]
28    Idle,
29    /// Files are being read and hashed (the walk's content pass).
30    Hashing,
31    /// Objects are being transferred to/from a store.
32    Transfer,
33}
34
35impl Phase {
36    /// Encodes the phase as a `u8` for atomic storage.
37    const fn as_u8(self) -> u8 {
38        match self {
39            Phase::Idle => 0,
40            Phase::Hashing => 1,
41            Phase::Transfer => 2,
42        }
43    }
44
45    /// Decodes a `u8` back into a [`Phase`]. Any out-of-range value (which the
46    /// meter never stores) decodes to [`Phase::Idle`].
47    const fn from_u8(value: u8) -> Self {
48        match value {
49            1 => Phase::Hashing,
50            2 => Phase::Transfer,
51            _ => Phase::Idle,
52        }
53    }
54}
55
56/// A point-in-time copy of a [`Meter`]'s counters.
57///
58/// Cheap to copy ([`Copy`]) and free of any atomics, so a renderer can read a
59/// consistent-enough view without holding a reference to the live meter.
60#[derive(Clone, Copy, Debug, Default)]
61pub struct MeterSnapshot {
62    /// Total bytes read in (e.g. file content hashed during the walk).
63    pub bytes_in: u64,
64    /// Total bytes written/sent out (e.g. uploaded to a store).
65    pub bytes_out: u64,
66    /// Objects finished (e.g. files hashed).
67    pub objects_done: u64,
68    /// Expected total objects, when known (`0` means unknown).
69    pub objects_total: u64,
70    /// Objects skipped (e.g. already present, deduplicated).
71    pub objects_skipped: u64,
72    /// Objects currently in flight (a gauge: started minus finished).
73    pub in_flight: u64,
74    /// The current coarse [`Phase`].
75    pub phase: Phase,
76    /// Advisory: the current adaptive throughput limit in bytes/sec, or `0`
77    /// when not adaptive / unset. Display-only; never throttles the walk.
78    pub current_limit: u64,
79    /// Advisory: the adaptive controller's target throughput in bytes/sec, or
80    /// `0` when not adaptive / unset. Display-only; never throttles the walk.
81    pub target_rate: u64,
82}
83
84/// A lock-free progress meter shared across threads behind an
85/// [`Arc`](std::sync::Arc).
86///
87/// Every method takes `&self` and uses [`Ordering::Relaxed`] — the counters are
88/// advisory progress, never a synchronization primitive, so relaxed ordering is
89/// both correct and cheap (a couple of atomic ops per file). [`Meter`] is
90/// [`Send`] + [`Sync`] because all of its fields are atomics.
91#[derive(Debug, Default)]
92pub struct Meter {
93    bytes_in: AtomicU64,
94    bytes_out: AtomicU64,
95    objects_done: AtomicU64,
96    objects_total: AtomicU64,
97    objects_skipped: AtomicU64,
98    in_flight: AtomicU64,
99    phase: AtomicU8,
100    /// Advisory adaptive throughput limit in bytes/sec (`0` = unset). Display
101    /// only — set by the adaptive controller for the renderer to show; reading
102    /// or writing it never affects the walk's output.
103    current_limit: AtomicU64,
104    /// Advisory adaptive target throughput in bytes/sec (`0` = unset). Display
105    /// only, with the same advisory semantics as `current_limit`.
106    target_rate: AtomicU64,
107}
108
109impl Meter {
110    /// Creates a fresh meter with all counters at zero and [`Phase::Idle`].
111    #[must_use]
112    pub fn new() -> Self {
113        Self::default()
114    }
115
116    /// Adds `n` to the bytes-in counter (content read/hashed).
117    pub fn add_in(&self, n: u64) {
118        self.bytes_in.fetch_add(n, Ordering::Relaxed);
119    }
120
121    /// Adds `n` to the bytes-out counter (content written/sent).
122    pub fn add_out(&self, n: u64) {
123        self.bytes_out.fetch_add(n, Ordering::Relaxed);
124    }
125
126    /// Records that an object started: bumps the in-flight gauge by one.
127    pub fn object_started(&self) {
128        self.in_flight.fetch_add(1, Ordering::Relaxed);
129    }
130
131    /// Records that an object finished: drops the in-flight gauge by one and
132    /// bumps the done counter by one. Saturates the gauge at zero so a stray
133    /// finish never underflows.
134    pub fn object_finished(&self) {
135        // Decrement the gauge without underflowing past zero.
136        let mut current = self.in_flight.load(Ordering::Relaxed);
137        while current > 0 {
138            match self.in_flight.compare_exchange_weak(
139                current,
140                current - 1,
141                Ordering::Relaxed,
142                Ordering::Relaxed,
143            ) {
144                Ok(_) => break,
145                Err(observed) => current = observed,
146            }
147        }
148        self.objects_done.fetch_add(1, Ordering::Relaxed);
149    }
150
151    /// Sets the expected total object count.
152    pub fn set_total(&self, n: u64) {
153        self.objects_total.store(n, Ordering::Relaxed);
154    }
155
156    /// Adds `n` to the skipped-objects counter.
157    pub fn add_skipped(&self, n: u64) {
158        self.objects_skipped.fetch_add(n, Ordering::Relaxed);
159    }
160
161    /// Sets the advisory adaptive throughput limit (bytes/sec; `0` = unset).
162    ///
163    /// Display-only: the renderer reads this to show the live adaptive value.
164    /// It does not throttle or otherwise change the walk's behavior or output.
165    pub fn set_current_limit(&self, n: u64) {
166        self.current_limit.store(n, Ordering::Relaxed);
167    }
168
169    /// Sets the advisory adaptive target throughput (bytes/sec; `0` = unset).
170    ///
171    /// Display-only, with the same advisory semantics as
172    /// [`set_current_limit`](Meter::set_current_limit).
173    pub fn set_target_rate(&self, n: u64) {
174        self.target_rate.store(n, Ordering::Relaxed);
175    }
176
177    /// Sets the current coarse [`Phase`].
178    pub fn set_phase(&self, p: Phase) {
179        self.phase.store(p.as_u8(), Ordering::Relaxed);
180    }
181
182    /// Reads the current coarse [`Phase`].
183    #[must_use]
184    pub fn phase(&self) -> Phase {
185        Phase::from_u8(self.phase.load(Ordering::Relaxed))
186    }
187
188    /// Takes a point-in-time [`MeterSnapshot`] of every counter.
189    ///
190    /// The loads are independently relaxed, so the snapshot is *eventually*
191    /// consistent rather than a single atomic view — that is fine for an
192    /// advisory progress display.
193    #[must_use]
194    pub fn snapshot(&self) -> MeterSnapshot {
195        MeterSnapshot {
196            bytes_in: self.bytes_in.load(Ordering::Relaxed),
197            bytes_out: self.bytes_out.load(Ordering::Relaxed),
198            objects_done: self.objects_done.load(Ordering::Relaxed),
199            objects_total: self.objects_total.load(Ordering::Relaxed),
200            objects_skipped: self.objects_skipped.load(Ordering::Relaxed),
201            in_flight: self.in_flight.load(Ordering::Relaxed),
202            phase: self.phase(),
203            current_limit: self.current_limit.load(Ordering::Relaxed),
204            target_rate: self.target_rate.load(Ordering::Relaxed),
205        }
206    }
207}
208
209#[cfg(test)]
210mod tests {
211    use super::*;
212
213    #[test]
214    fn progress_meter_counters_and_snapshot() {
215        let meter = Meter::new();
216        // Default snapshot is all-zero and Idle.
217        let initial = meter.snapshot();
218        assert_eq!(initial.bytes_in, 0);
219        assert_eq!(initial.bytes_out, 0);
220        assert_eq!(initial.objects_done, 0);
221        assert_eq!(initial.objects_total, 0);
222        assert_eq!(initial.objects_skipped, 0);
223        assert_eq!(initial.in_flight, 0);
224        assert_eq!(initial.phase, Phase::Idle);
225
226        meter.add_in(100);
227        meter.add_in(23);
228        meter.add_out(7);
229        meter.set_total(10);
230        meter.add_skipped(2);
231        meter.add_skipped(1);
232        meter.set_phase(Phase::Hashing);
233
234        // One object in flight after a started/finished pair leaves a second
235        // still in flight.
236        meter.object_started();
237        meter.object_started();
238        meter.object_finished();
239
240        let snap = meter.snapshot();
241        assert_eq!(snap.bytes_in, 123);
242        assert_eq!(snap.bytes_out, 7);
243        assert_eq!(snap.objects_done, 1);
244        assert_eq!(snap.objects_total, 10);
245        assert_eq!(snap.objects_skipped, 3);
246        assert_eq!(snap.in_flight, 1);
247        assert_eq!(snap.phase, Phase::Hashing);
248
249        // Phase round-trips through the atomic for every variant.
250        for p in [Phase::Idle, Phase::Hashing, Phase::Transfer] {
251            meter.set_phase(p);
252            assert_eq!(meter.phase(), p);
253            assert_eq!(meter.snapshot().phase, p);
254        }
255    }
256
257    #[test]
258    fn progress_meter_in_flight_gauge() {
259        let meter = Meter::new();
260        meter.object_started();
261        meter.object_started();
262        meter.object_started();
263        meter.object_finished();
264        meter.object_finished();
265
266        let snap = meter.snapshot();
267        assert_eq!(snap.in_flight, 1, "3 started - 2 finished == 1 in flight");
268        assert_eq!(snap.objects_done, 2, "2 finished == 2 done");
269
270        // A finish past zero saturates the gauge instead of underflowing.
271        meter.object_finished();
272        meter.object_finished();
273        let snap = meter.snapshot();
274        assert_eq!(snap.in_flight, 0, "gauge saturates at 0, no underflow");
275        assert_eq!(snap.objects_done, 4);
276    }
277
278    #[test]
279    fn resources_meter_adaptive_fields_round_trip() {
280        let meter = Meter::new();
281        // Default advisory atoms are 0 (not adaptive / unset).
282        let initial = meter.snapshot();
283        assert_eq!(initial.current_limit, 0);
284        assert_eq!(initial.target_rate, 0);
285
286        meter.set_current_limit(5_000_000);
287        meter.set_target_rate(8_000_000);
288        let snap = meter.snapshot();
289        assert_eq!(snap.current_limit, 5_000_000);
290        assert_eq!(snap.target_rate, 8_000_000);
291
292        // Setters overwrite (store, not add) and 0 clears back to unset.
293        meter.set_current_limit(0);
294        let snap = meter.snapshot();
295        assert_eq!(snap.current_limit, 0);
296        assert_eq!(snap.target_rate, 8_000_000, "target unchanged");
297    }
298}