snapdir_core/progress.rs
1//! A pure, lock-free progress [`Meter`] for the filesystem walk and (later) the
2//! transfer path.
3//!
4//! Per the library-purity principle this module does **no** terminal I/O and
5//! reads **no** `$HOME`/config/environment for behavior. The [`Meter`] is a bag
6//! of [`std::sync::atomic`] counters updated with [`Ordering::Relaxed`]: the
7//! recording side ([`walk_with_meter`](crate::walk_with_meter)) bumps the
8//! counters as it hashes files, and a (separately-laned) CLI renderer takes a
9//! cheap [`MeterSnapshot`] to draw a progress bar. All methods take `&self`, so
10//! the meter is shared across threads behind an [`Arc`](std::sync::Arc) without
11//! a lock.
12//!
13//! The meter is intentionally *advisory*: recording into it never changes the
14//! walk's output. A walk with a meter and the same walk without one produce
15//! byte-identical manifests.
16
17use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
18
/// Coarse stage of work that a [`Meter`] is currently recording.
///
/// Every variant has a fixed `u8` code so the phase can be kept in an
/// [`AtomicU8`] without a lock. [`Phase::Idle`] is the default, which means an
/// untouched meter and a default-constructed [`MeterSnapshot`] both report an
/// idle phase.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum Phase {
    /// Nothing is being recorded yet.
    #[default]
    Idle = 0,
    /// File contents are being read and hashed (the walk's content pass).
    Hashing = 1,
    /// Objects are moving to or from a store.
    Transfer = 2,
}

impl Phase {
    /// Returns the `u8` code used to keep this phase in an atomic.
    const fn as_u8(self) -> u8 {
        // The discriminants are pinned on the enum, so the cast *is* the encoding.
        self as u8
    }

    /// Rebuilds a [`Phase`] from its `u8` code.
    ///
    /// A code that matches no variant (the meter never stores one) falls back
    /// to [`Phase::Idle`].
    const fn from_u8(value: u8) -> Self {
        if value == Self::Hashing as u8 {
            Self::Hashing
        } else if value == Self::Transfer as u8 {
            Self::Transfer
        } else {
            Self::Idle
        }
    }
}
55
56/// A point-in-time copy of a [`Meter`]'s counters.
57///
58/// Cheap to copy ([`Copy`]) and free of any atomics, so a renderer can read a
59/// consistent-enough view without holding a reference to the live meter.
60#[derive(Clone, Copy, Debug, Default)]
61pub struct MeterSnapshot {
62 /// Total bytes read in (e.g. file content hashed during the walk).
63 pub bytes_in: u64,
64 /// Total bytes written/sent out (e.g. uploaded to a store).
65 pub bytes_out: u64,
66 /// Objects finished (e.g. files hashed).
67 pub objects_done: u64,
68 /// Expected total objects, when known (`0` means unknown).
69 pub objects_total: u64,
70 /// Objects skipped (e.g. already present, deduplicated).
71 pub objects_skipped: u64,
72 /// Objects currently in flight (a gauge: started minus finished).
73 pub in_flight: u64,
74 /// The current coarse [`Phase`].
75 pub phase: Phase,
76 /// Advisory: the current adaptive throughput limit in bytes/sec, or `0`
77 /// when not adaptive / unset. Display-only; never throttles the walk.
78 pub current_limit: u64,
79 /// Advisory: the adaptive controller's target throughput in bytes/sec, or
80 /// `0` when not adaptive / unset. Display-only; never throttles the walk.
81 pub target_rate: u64,
82}
83
84/// A lock-free progress meter shared across threads behind an
85/// [`Arc`](std::sync::Arc).
86///
87/// Every method takes `&self` and uses [`Ordering::Relaxed`] — the counters are
88/// advisory progress, never a synchronization primitive, so relaxed ordering is
89/// both correct and cheap (a couple of atomic ops per file). [`Meter`] is
90/// [`Send`] + [`Sync`] because all of its fields are atomics.
91#[derive(Debug, Default)]
92pub struct Meter {
93 bytes_in: AtomicU64,
94 bytes_out: AtomicU64,
95 objects_done: AtomicU64,
96 objects_total: AtomicU64,
97 objects_skipped: AtomicU64,
98 in_flight: AtomicU64,
99 phase: AtomicU8,
100 /// Advisory adaptive throughput limit in bytes/sec (`0` = unset). Display
101 /// only — set by the adaptive controller for the renderer to show; reading
102 /// or writing it never affects the walk's output.
103 current_limit: AtomicU64,
104 /// Advisory adaptive target throughput in bytes/sec (`0` = unset). Display
105 /// only, with the same advisory semantics as `current_limit`.
106 target_rate: AtomicU64,
107}
108
109impl Meter {
110 /// Creates a fresh meter with all counters at zero and [`Phase::Idle`].
111 #[must_use]
112 pub fn new() -> Self {
113 Self::default()
114 }
115
116 /// Adds `n` to the bytes-in counter (content read/hashed).
117 pub fn add_in(&self, n: u64) {
118 self.bytes_in.fetch_add(n, Ordering::Relaxed);
119 }
120
121 /// Adds `n` to the bytes-out counter (content written/sent).
122 pub fn add_out(&self, n: u64) {
123 self.bytes_out.fetch_add(n, Ordering::Relaxed);
124 }
125
126 /// Records that an object started: bumps the in-flight gauge by one.
127 pub fn object_started(&self) {
128 self.in_flight.fetch_add(1, Ordering::Relaxed);
129 }
130
131 /// Records that an object finished: drops the in-flight gauge by one and
132 /// bumps the done counter by one. Saturates the gauge at zero so a stray
133 /// finish never underflows.
134 pub fn object_finished(&self) {
135 // Decrement the gauge without underflowing past zero.
136 let mut current = self.in_flight.load(Ordering::Relaxed);
137 while current > 0 {
138 match self.in_flight.compare_exchange_weak(
139 current,
140 current - 1,
141 Ordering::Relaxed,
142 Ordering::Relaxed,
143 ) {
144 Ok(_) => break,
145 Err(observed) => current = observed,
146 }
147 }
148 self.objects_done.fetch_add(1, Ordering::Relaxed);
149 }
150
151 /// Sets the expected total object count.
152 pub fn set_total(&self, n: u64) {
153 self.objects_total.store(n, Ordering::Relaxed);
154 }
155
156 /// Adds `n` to the skipped-objects counter.
157 pub fn add_skipped(&self, n: u64) {
158 self.objects_skipped.fetch_add(n, Ordering::Relaxed);
159 }
160
161 /// Sets the advisory adaptive throughput limit (bytes/sec; `0` = unset).
162 ///
163 /// Display-only: the renderer reads this to show the live adaptive value.
164 /// It does not throttle or otherwise change the walk's behavior or output.
165 pub fn set_current_limit(&self, n: u64) {
166 self.current_limit.store(n, Ordering::Relaxed);
167 }
168
169 /// Sets the advisory adaptive target throughput (bytes/sec; `0` = unset).
170 ///
171 /// Display-only, with the same advisory semantics as
172 /// [`set_current_limit`](Meter::set_current_limit).
173 pub fn set_target_rate(&self, n: u64) {
174 self.target_rate.store(n, Ordering::Relaxed);
175 }
176
177 /// Sets the current coarse [`Phase`].
178 pub fn set_phase(&self, p: Phase) {
179 self.phase.store(p.as_u8(), Ordering::Relaxed);
180 }
181
182 /// Reads the current coarse [`Phase`].
183 #[must_use]
184 pub fn phase(&self) -> Phase {
185 Phase::from_u8(self.phase.load(Ordering::Relaxed))
186 }
187
188 /// Takes a point-in-time [`MeterSnapshot`] of every counter.
189 ///
190 /// The loads are independently relaxed, so the snapshot is *eventually*
191 /// consistent rather than a single atomic view — that is fine for an
192 /// advisory progress display.
193 #[must_use]
194 pub fn snapshot(&self) -> MeterSnapshot {
195 MeterSnapshot {
196 bytes_in: self.bytes_in.load(Ordering::Relaxed),
197 bytes_out: self.bytes_out.load(Ordering::Relaxed),
198 objects_done: self.objects_done.load(Ordering::Relaxed),
199 objects_total: self.objects_total.load(Ordering::Relaxed),
200 objects_skipped: self.objects_skipped.load(Ordering::Relaxed),
201 in_flight: self.in_flight.load(Ordering::Relaxed),
202 phase: self.phase(),
203 current_limit: self.current_limit.load(Ordering::Relaxed),
204 target_rate: self.target_rate.load(Ordering::Relaxed),
205 }
206 }
207}
208
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises every counter once and checks that a snapshot mirrors them.
    #[test]
    fn progress_meter_counters_and_snapshot() {
        let meter = Meter::new();

        // A fresh meter reports zero everywhere and sits in the idle phase.
        let fresh = meter.snapshot();
        assert_eq!(fresh.bytes_in, 0);
        assert_eq!(fresh.bytes_out, 0);
        assert_eq!(fresh.objects_done, 0);
        assert_eq!(fresh.objects_total, 0);
        assert_eq!(fresh.objects_skipped, 0);
        assert_eq!(fresh.in_flight, 0);
        assert_eq!(fresh.phase, Phase::Idle);

        for chunk in [100, 23] {
            meter.add_in(chunk);
        }
        meter.add_out(7);
        meter.set_total(10);
        for skipped in [2, 1] {
            meter.add_skipped(skipped);
        }
        meter.set_phase(Phase::Hashing);

        // Two objects start and one of them finishes, so exactly one is left
        // in flight and one is counted as done.
        meter.object_started();
        meter.object_started();
        meter.object_finished();

        let after = meter.snapshot();
        assert_eq!(after.bytes_in, 123);
        assert_eq!(after.bytes_out, 7);
        assert_eq!(after.objects_done, 1);
        assert_eq!(after.objects_total, 10);
        assert_eq!(after.objects_skipped, 3);
        assert_eq!(after.in_flight, 1);
        assert_eq!(after.phase, Phase::Hashing);

        // Each variant survives the trip through the atomic, both via the
        // direct getter and via a snapshot.
        for expected in [Phase::Idle, Phase::Hashing, Phase::Transfer] {
            meter.set_phase(expected);
            assert_eq!(meter.phase(), expected);
            assert_eq!(meter.snapshot().phase, expected);
        }
    }

    /// Checks the started/finished bookkeeping, including the floor at zero.
    #[test]
    fn progress_meter_in_flight_gauge() {
        let meter = Meter::new();
        for _ in 0..3 {
            meter.object_started();
        }
        for _ in 0..2 {
            meter.object_finished();
        }

        let snap = meter.snapshot();
        assert_eq!(snap.in_flight, 1, "3 started - 2 finished == 1 in flight");
        assert_eq!(snap.objects_done, 2, "2 finished == 2 done");

        // Two more finishes: the first empties the gauge, the second has
        // nothing left to take and must leave it at zero. Both still count
        // as done.
        for _ in 0..2 {
            meter.object_finished();
        }
        let snap = meter.snapshot();
        assert_eq!(snap.in_flight, 0, "gauge saturates at 0, no underflow");
        assert_eq!(snap.objects_done, 4);
    }

    /// Checks that the advisory adaptive fields are stored and cleared as set.
    #[test]
    fn resources_meter_adaptive_fields_round_trip() {
        let meter = Meter::new();

        // Both advisory values start at 0, meaning not adaptive / unset.
        let fresh = meter.snapshot();
        assert_eq!(fresh.current_limit, 0);
        assert_eq!(fresh.target_rate, 0);

        meter.set_current_limit(5_000_000);
        meter.set_target_rate(8_000_000);
        let configured = meter.snapshot();
        assert_eq!(configured.current_limit, 5_000_000);
        assert_eq!(configured.target_rate, 8_000_000);

        // The setters replace the value rather than adding to it, so storing
        // 0 returns the limit to unset without touching the target.
        meter.set_current_limit(0);
        let cleared = meter.snapshot();
        assert_eq!(cleared.current_limit, 0);
        assert_eq!(cleared.target_rate, 8_000_000, "target unchanged");
    }
}