snapdir_core/progress.rs
1//! A pure, lock-free progress [`Meter`] for the filesystem walk and (later) the
2//! transfer path.
3//!
4//! Per the library-purity principle this module does **no** terminal I/O and
5//! reads **no** `$HOME`/config/environment for behavior. The [`Meter`] is a bag
6//! of [`std::sync::atomic`] counters updated with [`Ordering::Relaxed`]: the
7//! recording side ([`walk_with_meter`](crate::walk_with_meter)) bumps the
8//! counters as it hashes files, and a (separately-laned) CLI renderer takes a
9//! cheap [`MeterSnapshot`] to draw a progress bar. All methods take `&self`, so
10//! the meter is shared across threads behind an [`Arc`](std::sync::Arc) without
11//! a lock.
12//!
13//! The meter is intentionally *advisory*: recording into it never changes the
14//! walk's output. A walk with a meter and the same walk without one produce
15//! byte-identical manifests.
16
17use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
18
19/// The coarse phase a [`Meter`] is currently recording.
20///
21/// Maps to/from a `u8` for lock-free storage in an [`AtomicU8`]. The default is
22/// [`Phase::Idle`] so a freshly-constructed [`MeterSnapshot`] (and an untouched
23/// meter) reports an idle phase.
24#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
25pub enum Phase {
26 /// No work is being recorded yet.
27 #[default]
28 Idle,
29 /// Files are being read and hashed (the walk's content pass).
30 Hashing,
31 /// Objects are being transferred to/from a store.
32 Transfer,
33}
34
35impl Phase {
36 /// Encodes the phase as a `u8` for atomic storage.
37 const fn as_u8(self) -> u8 {
38 match self {
39 Phase::Idle => 0,
40 Phase::Hashing => 1,
41 Phase::Transfer => 2,
42 }
43 }
44
45 /// Decodes a `u8` back into a [`Phase`]. Any out-of-range value (which the
46 /// meter never stores) decodes to [`Phase::Idle`].
47 const fn from_u8(value: u8) -> Self {
48 match value {
49 1 => Phase::Hashing,
50 2 => Phase::Transfer,
51 _ => Phase::Idle,
52 }
53 }
54}
55
56/// A point-in-time copy of a [`Meter`]'s counters.
57///
58/// Cheap to copy ([`Copy`]) and free of any atomics, so a renderer can read a
59/// consistent-enough view without holding a reference to the live meter.
60#[derive(Clone, Copy, Debug, Default)]
61pub struct MeterSnapshot {
62 /// Total bytes read in (e.g. file content hashed during the walk).
63 pub bytes_in: u64,
64 /// Total bytes written/sent out (e.g. uploaded to a store).
65 pub bytes_out: u64,
66 /// Objects finished (e.g. files hashed).
67 pub objects_done: u64,
68 /// Expected total objects, when known (`0` means unknown).
69 pub objects_total: u64,
70 /// Objects skipped (e.g. already present, deduplicated).
71 pub objects_skipped: u64,
72 /// Objects currently in flight (a gauge: started minus finished).
73 pub in_flight: u64,
74 /// The current coarse [`Phase`].
75 pub phase: Phase,
76}
77
78/// A lock-free progress meter shared across threads behind an
79/// [`Arc`](std::sync::Arc).
80///
81/// Every method takes `&self` and uses [`Ordering::Relaxed`] — the counters are
82/// advisory progress, never a synchronization primitive, so relaxed ordering is
83/// both correct and cheap (a couple of atomic ops per file). [`Meter`] is
84/// [`Send`] + [`Sync`] because all of its fields are atomics.
85#[derive(Debug, Default)]
86pub struct Meter {
87 bytes_in: AtomicU64,
88 bytes_out: AtomicU64,
89 objects_done: AtomicU64,
90 objects_total: AtomicU64,
91 objects_skipped: AtomicU64,
92 in_flight: AtomicU64,
93 phase: AtomicU8,
94}
95
96impl Meter {
97 /// Creates a fresh meter with all counters at zero and [`Phase::Idle`].
98 #[must_use]
99 pub fn new() -> Self {
100 Self::default()
101 }
102
103 /// Adds `n` to the bytes-in counter (content read/hashed).
104 pub fn add_in(&self, n: u64) {
105 self.bytes_in.fetch_add(n, Ordering::Relaxed);
106 }
107
108 /// Adds `n` to the bytes-out counter (content written/sent).
109 pub fn add_out(&self, n: u64) {
110 self.bytes_out.fetch_add(n, Ordering::Relaxed);
111 }
112
113 /// Records that an object started: bumps the in-flight gauge by one.
114 pub fn object_started(&self) {
115 self.in_flight.fetch_add(1, Ordering::Relaxed);
116 }
117
118 /// Records that an object finished: drops the in-flight gauge by one and
119 /// bumps the done counter by one. Saturates the gauge at zero so a stray
120 /// finish never underflows.
121 pub fn object_finished(&self) {
122 // Decrement the gauge without underflowing past zero.
123 let mut current = self.in_flight.load(Ordering::Relaxed);
124 while current > 0 {
125 match self.in_flight.compare_exchange_weak(
126 current,
127 current - 1,
128 Ordering::Relaxed,
129 Ordering::Relaxed,
130 ) {
131 Ok(_) => break,
132 Err(observed) => current = observed,
133 }
134 }
135 self.objects_done.fetch_add(1, Ordering::Relaxed);
136 }
137
138 /// Sets the expected total object count.
139 pub fn set_total(&self, n: u64) {
140 self.objects_total.store(n, Ordering::Relaxed);
141 }
142
143 /// Adds `n` to the skipped-objects counter.
144 pub fn add_skipped(&self, n: u64) {
145 self.objects_skipped.fetch_add(n, Ordering::Relaxed);
146 }
147
148 /// Sets the current coarse [`Phase`].
149 pub fn set_phase(&self, p: Phase) {
150 self.phase.store(p.as_u8(), Ordering::Relaxed);
151 }
152
153 /// Reads the current coarse [`Phase`].
154 #[must_use]
155 pub fn phase(&self) -> Phase {
156 Phase::from_u8(self.phase.load(Ordering::Relaxed))
157 }
158
159 /// Takes a point-in-time [`MeterSnapshot`] of every counter.
160 ///
161 /// The loads are independently relaxed, so the snapshot is *eventually*
162 /// consistent rather than a single atomic view — that is fine for an
163 /// advisory progress display.
164 #[must_use]
165 pub fn snapshot(&self) -> MeterSnapshot {
166 MeterSnapshot {
167 bytes_in: self.bytes_in.load(Ordering::Relaxed),
168 bytes_out: self.bytes_out.load(Ordering::Relaxed),
169 objects_done: self.objects_done.load(Ordering::Relaxed),
170 objects_total: self.objects_total.load(Ordering::Relaxed),
171 objects_skipped: self.objects_skipped.load(Ordering::Relaxed),
172 in_flight: self.in_flight.load(Ordering::Relaxed),
173 phase: self.phase(),
174 }
175 }
176}
177
178#[cfg(test)]
179mod tests {
180 use super::*;
181
182 #[test]
183 fn progress_meter_counters_and_snapshot() {
184 let meter = Meter::new();
185 // Default snapshot is all-zero and Idle.
186 let initial = meter.snapshot();
187 assert_eq!(initial.bytes_in, 0);
188 assert_eq!(initial.bytes_out, 0);
189 assert_eq!(initial.objects_done, 0);
190 assert_eq!(initial.objects_total, 0);
191 assert_eq!(initial.objects_skipped, 0);
192 assert_eq!(initial.in_flight, 0);
193 assert_eq!(initial.phase, Phase::Idle);
194
195 meter.add_in(100);
196 meter.add_in(23);
197 meter.add_out(7);
198 meter.set_total(10);
199 meter.add_skipped(2);
200 meter.add_skipped(1);
201 meter.set_phase(Phase::Hashing);
202
203 // One object in flight after a started/finished pair leaves a second
204 // still in flight.
205 meter.object_started();
206 meter.object_started();
207 meter.object_finished();
208
209 let snap = meter.snapshot();
210 assert_eq!(snap.bytes_in, 123);
211 assert_eq!(snap.bytes_out, 7);
212 assert_eq!(snap.objects_done, 1);
213 assert_eq!(snap.objects_total, 10);
214 assert_eq!(snap.objects_skipped, 3);
215 assert_eq!(snap.in_flight, 1);
216 assert_eq!(snap.phase, Phase::Hashing);
217
218 // Phase round-trips through the atomic for every variant.
219 for p in [Phase::Idle, Phase::Hashing, Phase::Transfer] {
220 meter.set_phase(p);
221 assert_eq!(meter.phase(), p);
222 assert_eq!(meter.snapshot().phase, p);
223 }
224 }
225
226 #[test]
227 fn progress_meter_in_flight_gauge() {
228 let meter = Meter::new();
229 meter.object_started();
230 meter.object_started();
231 meter.object_started();
232 meter.object_finished();
233 meter.object_finished();
234
235 let snap = meter.snapshot();
236 assert_eq!(snap.in_flight, 1, "3 started - 2 finished == 1 in flight");
237 assert_eq!(snap.objects_done, 2, "2 finished == 2 done");
238
239 // A finish past zero saturates the gauge instead of underflowing.
240 meter.object_finished();
241 meter.object_finished();
242 let snap = meter.snapshot();
243 assert_eq!(snap.in_flight, 0, "gauge saturates at 0, no underflow");
244 assert_eq!(snap.objects_done, 4);
245 }
246}