congestion/measurement.rs
1//! Hot-path measurement primitives.
2//!
3//! A [`Probe`] brackets a single filesystem or network operation and emits a
4//! [`Sample`] to the currently installed [`SampleSink`] on completion. When
//! no sink is installed, probes are effectively free: the only cost is two
//! `Instant::now()` calls plus a single `RwLock::read` that finds the sink
//! slot empty (`None`).
7//!
8//! The sink is a process-wide singleton installed by the enforcement layer
9//! (typically in tool main functions). Tests can install a
10//! [`testing::CollectingSink`](crate::testing::CollectingSink) to assert that
11//! probes fire as expected.
12
13use crate::controller::{Outcome, Sample};
14
/// Identifies which end of an operation a probe is attached to.
///
/// Two-filesystem tools such as `rcp` and `rcmp` see a different
/// service-time profile on each end, so every side gets its own controller:
/// a saturated source must not pull the destination's `cwnd` down, nor the
/// other way round. Single-path tools (`rrm`, `filegen`) use the same split
/// to keep reads (`Source`) apart from writes/mutations (`Destination`),
/// because those differ in latency even on a single filesystem.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Side {
    /// Source-filesystem reads: directory walks and source-side stats.
    Source = 0,
    /// Destination-filesystem writes/mutations: `create_dir`, `hard_link`,
    /// `unlink`, `open(O_CREAT)`, etc.
    Destination = 1,
}
32
/// Identifies the metadata syscall a probe is timing.
///
/// Every op feeds its own controller because service-time distributions
/// differ between ops: `stat` (a pure lookup) and `unlink` (a mutation plus
/// a parent-directory write) take different code paths on the metadata
/// server and settle on very different baselines. Sharing one controller
/// would pollute the per-op latency signal — the ratio would drift with
/// operation-mix changes unrelated to congestion (the long-window baseline
/// percentile moves with the mix, and in cross mode the inter-quantile
/// spread becomes a function of the mix rather than of the load).
///
/// Discriminants are dense and start at zero so that, paired with a
/// [`Side`], a variant can index a fixed-size array; see [`N_META_OPS`].
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum MetadataOp {
    /// `stat` / `lstat` / `symlink_metadata`. `canonicalize` and read-only
    /// `File::open` are counted here too, since both are dominated by
    /// lookup work on the metadata path.
    Stat = 0,
    /// `readlink`. Kept apart from `Stat` because it fetches the symlink's
    /// target body rather than only the inode header.
    ReadLink = 1,
    /// `mkdir` / `create_dir`: allocates a directory inode and wires it
    /// into the parent.
    MkDir = 2,
    /// `rmdir` / `remove_dir`: verifies emptiness, then removes the
    /// directory.
    RmDir = 3,
    /// `unlink` / `remove_file`: decrements the link count and frees the
    /// inode once it reaches zero.
    Unlink = 4,
    /// `link` / `hard_link`: bumps the link count of an existing inode.
    HardLink = 5,
    /// `symlink` (creation): allocates an inode whose body is the target
    /// path.
    Symlink = 6,
    /// Permission, ownership and timestamp updates: `chmod` /
    /// `set_permissions`, `chown` / `fchownat`, `utimes` / `utimensat`.
    /// Grouped into one bucket because each is a single inode write.
    Chmod = 7,
    /// `open(O_CREAT)` / `File::create`: allocates a regular-file inode and
    /// wires it into the parent.
    OpenCreate = 8,
}
78
/// Number of [`Side`] variants.
pub const N_SIDES: usize = 2;

/// Number of [`MetadataOp`] variants. Must be bumped by hand whenever a
/// variant is added.
pub const N_META_OPS: usize = 9;

/// Number of distinct `(Side, MetadataOp)` controllers: one per pairing.
pub const N_META_RESOURCES: usize = N_SIDES * N_META_OPS;
87
88impl MetadataOp {
89 /// Short identifier suitable for the progress-bar label and tracing
90 /// `unit` field. Kept as kebab-case so it composes cleanly with side
91 /// prefixes (`src-stat`, `dst-mkdir`, etc.).
92 pub const fn label(self) -> &'static str {
93 match self {
94 Self::Stat => "stat",
95 Self::ReadLink => "read-link",
96 Self::MkDir => "mkdir",
97 Self::RmDir => "rmdir",
98 Self::Unlink => "unlink",
99 Self::HardLink => "hard-link",
100 Self::Symlink => "symlink",
101 Self::Chmod => "chmod",
102 Self::OpenCreate => "open-create",
103 }
104 }
105 /// All op variants, in discriminant order. Useful when wiring up a
106 /// controller for every op kind without having to spell out each.
107 pub const ALL: [Self; N_META_OPS] = [
108 Self::Stat,
109 Self::ReadLink,
110 Self::MkDir,
111 Self::RmDir,
112 Self::Unlink,
113 Self::HardLink,
114 Self::Symlink,
115 Self::Chmod,
116 Self::OpenCreate,
117 ];
118}
119
120impl Side {
121 /// All side variants, in discriminant order.
122 pub const ALL: [Self; N_SIDES] = [Self::Source, Self::Destination];
123}
124
125/// Which resource a probe is measuring.
126///
127/// Separate kinds feed independent controllers in the control loop.
128/// Metadata kinds are split by [`Side`] (filesystems with different
129/// service-time profiles) and by [`MetadataOp`] (syscalls within a
130/// single filesystem that hit different code paths).
131#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
132pub enum ResourceKind {
133 /// Single per-file metadata syscall on the given side.
134 Metadata(Side, MetadataOp),
135 /// Individual read chunks in a copy pipeline. Reserved for future
136 /// data-path controllers; not currently routed to any sink channel.
137 DataRead,
138 /// Individual write chunks in a copy or filegen pipeline. Reserved
139 /// for future data-path controllers.
140 DataWrite,
141}
142
/// Consumer of completed operation samples.
///
/// Implementations must be cheap under a high sample rate since `record` is
/// called on the hot path. The sink trait is intentionally minimal; richer
/// behavior (windowing, sharding, routing to multiple controllers) belongs
/// in the control-loop layer that hosts the sink.
///
/// The `Send + Sync` bound is required because the installed sink lives in a
/// process-wide static behind an `Arc`.
pub trait SampleSink: Send + Sync {
    /// Receive one finished measurement.
    ///
    /// `kind` is the resource the originating probe was started with and
    /// `sample` carries its start/completion instants, byte count and
    /// outcome. The dispatcher clones the sink handle out of the global lock
    /// before calling this, so `record` runs without that lock held.
    fn record(&self, kind: ResourceKind, sample: &Sample);
}
152
/// Process-wide slot for the currently installed sink.
///
/// `None` means no sink is installed and completed probes are discarded.
/// Written by `install_sample_sink` / `clear_sample_sink`; read by `emit`
/// each time a probe completes.
static SINK: std::sync::RwLock<Option<std::sync::Arc<dyn SampleSink>>> =
    std::sync::RwLock::new(None);
155
156/// Install the process-wide [`SampleSink`]. Replaces any prior sink.
157pub fn install_sample_sink(sink: std::sync::Arc<dyn SampleSink>) {
158 *SINK.write().expect("sample sink lock poisoned") = Some(sink);
159}
160
161/// Remove the current sink, if any. After this call, probes are no-ops again.
162pub fn clear_sample_sink() {
163 *SINK.write().expect("sample sink lock poisoned") = None;
164}
165
166fn emit(kind: ResourceKind, sample: &Sample) {
167 // clone the Arc out of the lock before dispatching so a slow sink
168 // does not block `install_sample_sink`/`clear_sample_sink` (which
169 // need the write lock) and so a sink implementation that re-enters
170 // the sink API from within `record` cannot deadlock.
171 let sink = SINK
172 .read()
173 .expect("sample sink lock poisoned")
174 .as_ref()
175 .cloned();
176 if let Some(sink) = sink {
177 sink.record(kind, sample);
178 }
179}
180
/// A measurement-in-progress for a single operation.
///
/// A probe remembers which resource it measures and when it was started;
/// completing it emits one sample to the installed sink (if any).
///
/// # Lifecycle
///
/// ```no_run
/// use congestion::{MetadataOp, Probe, Side};
///
/// # async fn example() {
/// let probe = Probe::start_metadata(Side::Source, MetadataOp::Stat);
/// // ... perform the syscall or operation ...
/// probe.complete_ok(0);
/// # }
/// ```
///
/// Forgetting to call [`Probe::complete`] / [`Probe::complete_ok`] drops the
/// probe without recording anything. This is intentional: error paths that
/// bail out early should not produce misleading latency samples.
#[must_use = "call Probe::complete_ok or Probe::complete to record the measurement"]
pub struct Probe {
    /// Resource this probe reports against when it completes.
    kind: ResourceKind,
    /// Instant captured when the probe was started; becomes the sample's
    /// `started_at`.
    started_at: std::time::Instant,
}
203
204impl Probe {
205 /// Begin measuring an operation of the given kind.
206 pub fn start(kind: ResourceKind) -> Self {
207 Self {
208 kind,
209 started_at: std::time::Instant::now(),
210 }
211 }
212 /// Shorthand for `Probe::start(ResourceKind::Metadata(side, op))`.
213 /// Use this to bracket a single per-file metadata syscall.
214 pub fn start_metadata(side: Side, op: MetadataOp) -> Self {
215 Self::start(ResourceKind::Metadata(side, op))
216 }
217 /// Shorthand for `Probe::start(ResourceKind::DataRead)`.
218 pub fn start_read() -> Self {
219 Self::start(ResourceKind::DataRead)
220 }
221 /// Shorthand for `Probe::start(ResourceKind::DataWrite)`.
222 pub fn start_write() -> Self {
223 Self::start(ResourceKind::DataWrite)
224 }
225 /// Complete the probe with outcome [`Outcome::Ok`] and the given byte
226 /// count (use `0` for metadata ops).
227 pub fn complete_ok(self, bytes: u64) {
228 self.complete(bytes, Outcome::Ok);
229 }
230 /// Complete the probe with an explicit outcome.
231 pub fn complete(self, bytes: u64, outcome: Outcome) {
232 emit(
233 self.kind,
234 &Sample {
235 started_at: self.started_at,
236 completed_at: std::time::Instant::now(),
237 bytes,
238 outcome,
239 },
240 );
241 }
242 /// Drop the probe without recording a sample.
243 pub fn discard(self) {}
244}
245
/// Test-only mutex that serializes access to the process-wide `SampleSink`
/// global across every test that touches it — including tests in sibling
/// modules (see `control_loop::tests`) and downstream integration tests
/// that install their own sinks. Nextest isolates per-process so races are
/// only observable under `cargo test`'s threaded runner, but we guard
/// regardless to keep both runners reliable.
///
/// Uses [`tokio::sync::Mutex`] so `#[tokio::test]` bodies can hold the guard
/// across await points without tripping clippy's `await_holding_lock`.
/// Synchronous `#[test]` bodies acquire it with `blocking_lock()`.
#[cfg(test)]
pub(crate) static SINK_GUARD: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::CollectingSink;
    use std::sync::Arc;
    use std::time::Duration;

    /// Install a brand-new collecting sink as the process-wide sink and hand
    /// back a handle for assertions. Callers must already hold `SINK_GUARD`.
    fn install_collecting_sink() -> Arc<CollectingSink> {
        let sink = Arc::new(CollectingSink::new());
        install_sample_sink(sink.clone());
        sink
    }

    /// Fire one successfully completed source-side `stat` probe.
    fn record_source_stat() {
        Probe::start_metadata(Side::Source, MetadataOp::Stat).complete_ok(0);
    }

    #[test]
    fn probe_without_sink_is_a_no_op() {
        let _guard = SINK_GUARD.blocking_lock();
        clear_sample_sink();
        // must neither panic nor block when nothing is installed
        let probe = Probe::start_metadata(Side::Source, MetadataOp::Stat);
        probe.complete_ok(0);
    }

    #[test]
    fn probe_records_metadata_samples_to_installed_sink() {
        let _guard = SINK_GUARD.blocking_lock();
        let sink = install_collecting_sink();
        record_source_stat();
        record_source_stat();
        assert_eq!(sink.metadata_count(), 2);
        clear_sample_sink();
    }

    #[test]
    fn probe_separates_resource_kinds() {
        let _guard = SINK_GUARD.blocking_lock();
        let sink = install_collecting_sink();
        record_source_stat();
        Probe::start_read().complete_ok(4096);
        Probe::start_write().complete_ok(8192);
        assert_eq!(sink.metadata_count(), 1);
        assert_eq!(sink.read_count(), 1);
        assert_eq!(sink.write_count(), 1);
        clear_sample_sink();
    }

    #[test]
    fn sample_latency_reflects_wall_time() {
        const PAUSE: Duration = Duration::from_millis(5);

        let _guard = SINK_GUARD.blocking_lock();
        let sink = install_collecting_sink();
        let probe = Probe::start_metadata(Side::Source, MetadataOp::Stat);
        std::thread::sleep(PAUSE);
        probe.complete_ok(0);
        let samples = sink.metadata_samples();
        assert_eq!(samples.len(), 1);
        assert!(samples[0].latency() >= PAUSE);
        clear_sample_sink();
    }

    #[test]
    fn discarded_probe_produces_no_sample() {
        let _guard = SINK_GUARD.blocking_lock();
        let sink = install_collecting_sink();
        Probe::start_metadata(Side::Source, MetadataOp::Stat).discard();
        assert_eq!(sink.metadata_count(), 0);
        clear_sample_sink();
    }

    #[test]
    fn probe_dropped_without_complete_produces_no_sample() {
        // mirrors an early return on an error path: the probe just goes out
        // of scope and nothing is emitted, so a failed syscall doesn't
        // pollute the controller's latency signal.
        let _guard = SINK_GUARD.blocking_lock();
        let sink = install_collecting_sink();
        {
            let _probe = Probe::start_metadata(Side::Source, MetadataOp::Stat);
            // `_probe` leaves scope here with neither complete nor discard
        }
        assert_eq!(sink.metadata_count(), 0);
        clear_sample_sink();
    }

    #[test]
    fn installing_a_new_sink_replaces_the_old_one() {
        let _guard = SINK_GUARD.blocking_lock();
        let first = install_collecting_sink();
        record_source_stat();
        let second = install_collecting_sink();
        record_source_stat();
        record_source_stat();
        assert_eq!(first.metadata_count(), 1);
        assert_eq!(second.metadata_count(), 2);
        clear_sample_sink();
    }
}