Skip to main content

congestion/
measurement.rs

1//! Hot-path measurement primitives.
2//!
3//! A [`Probe`] brackets a single filesystem or network operation and emits a
4//! [`Sample`] to the currently installed [`SampleSink`] on completion. When
5//! no sink is installed, probes are effectively free: the only cost is two
6//! `Instant::now()` calls plus a single `RwLock::read` that returns `None`.
7//!
8//! The sink is a process-wide singleton installed by the enforcement layer
9//! (typically in tool main functions). Tests can install a
10//! [`testing::CollectingSink`](crate::testing::CollectingSink) to assert that
11//! probes fire as expected.
12
13use crate::controller::{Outcome, Sample};
14
/// The half of an operation that a probe is attributed to.
///
/// Copy-style tools such as `rcp` and `rcmp` touch two filesystems whose
/// service times differ, so each side gets its own controller: a saturated
/// source must not pull the destination's `cwnd` down, and vice versa.
/// Tools that only touch one path (`rrm`, `filegen`) still split reads
/// (`Source`) from writes/mutations (`Destination`), because those carry
/// different latency profiles even on a single filesystem.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Side {
    /// Source-filesystem reads: directory walks and source-side stats.
    Source = 0,
    /// Destination-filesystem writes and mutations: `create_dir`,
    /// `hard_link`, `unlink`, `open(O_CREAT)`, and similar.
    Destination = 1,
}
32
/// The metadata syscall a probe is timing.
///
/// Each op drives its own controller because service-time distributions
/// differ between ops: `stat` is a pure lookup, while `unlink` is a mutation
/// plus a parent-directory write. The two take different paths through the
/// metadata server and settle on very different baselines. Feeding both
/// into one controller would corrupt the per-op latency signal. The
/// resulting ratio would then move with shifts in the operation mix that
/// have nothing to do with congestion: the long-window baseline percentile
/// drifts with the mix, and in cross mode the inter-quantile spread tracks
/// the mix instead of the load.
///
/// Discriminants are dense and start at zero so that, combined with a
/// [`Side`], they index a fixed-size array; see [`N_META_OPS`].
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MetadataOp {
    /// `stat` / `lstat` / `symlink_metadata`, plus `canonicalize` and
    /// read-only `File::open`, all of which are dominated by lookup work on
    /// the metadata path.
    Stat = 0,
    /// `readlink`. Kept apart from `Stat` because it fetches the symlink's
    /// target body rather than only the inode header.
    ReadLink = 1,
    /// `mkdir` / `create_dir`: allocates a directory inode and links it
    /// into its parent.
    MkDir = 2,
    /// `rmdir` / `remove_dir`: checks that the directory is empty, then
    /// removes it.
    RmDir = 3,
    /// `unlink` / `remove_file`: drops the link count and frees the inode
    /// once it reaches zero.
    Unlink = 4,
    /// `link` / `hard_link`: increments an existing inode's link count.
    HardLink = 5,
    /// `symlink` creation: allocates an inode whose body holds the target
    /// path.
    Symlink = 6,
    /// Permission, ownership, and timestamp updates (`chmod` /
    /// `set_permissions`, `chown` / `fchownat`, `utimes` / `utimensat`),
    /// grouped because each is a single inode write.
    Chmod = 7,
    /// `open(O_CREAT)` / `File::create`: allocates a regular-file inode and
    /// links it into its parent.
    OpenCreate = 8,
}
78
/// Count of [`MetadataOp`] variants. Must be bumped by hand whenever a
/// variant is added.
pub const N_META_OPS: usize = 9;

/// Count of [`Side`] variants.
pub const N_SIDES: usize = 2;

/// Count of distinct per-(Side, MetadataOp) controllers: one for every
/// pairing of a side with a metadata op.
pub const N_META_RESOURCES: usize = N_SIDES * N_META_OPS;
87
88impl MetadataOp {
89    /// Short identifier suitable for the progress-bar label and tracing
90    /// `unit` field. Kept as kebab-case so it composes cleanly with side
91    /// prefixes (`src-stat`, `dst-mkdir`, etc.).
92    pub const fn label(self) -> &'static str {
93        match self {
94            Self::Stat => "stat",
95            Self::ReadLink => "read-link",
96            Self::MkDir => "mkdir",
97            Self::RmDir => "rmdir",
98            Self::Unlink => "unlink",
99            Self::HardLink => "hard-link",
100            Self::Symlink => "symlink",
101            Self::Chmod => "chmod",
102            Self::OpenCreate => "open-create",
103        }
104    }
105    /// All op variants, in discriminant order. Useful when wiring up a
106    /// controller for every op kind without having to spell out each.
107    pub const ALL: [Self; N_META_OPS] = [
108        Self::Stat,
109        Self::ReadLink,
110        Self::MkDir,
111        Self::RmDir,
112        Self::Unlink,
113        Self::HardLink,
114        Self::Symlink,
115        Self::Chmod,
116        Self::OpenCreate,
117    ];
118}
119
120impl Side {
121    /// All side variants, in discriminant order.
122    pub const ALL: [Self; N_SIDES] = [Self::Source, Self::Destination];
123}
124
125/// Which resource a probe is measuring.
126///
127/// Separate kinds feed independent controllers in the control loop.
128/// Metadata kinds are split by [`Side`] (filesystems with different
129/// service-time profiles) and by [`MetadataOp`] (syscalls within a
130/// single filesystem that hit different code paths).
131#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
132pub enum ResourceKind {
133    /// Single per-file metadata syscall on the given side.
134    Metadata(Side, MetadataOp),
135    /// Individual read chunks in a copy pipeline. Reserved for future
136    /// data-path controllers; not currently routed to any sink channel.
137    DataRead,
138    /// Individual write chunks in a copy or filegen pipeline. Reserved
139    /// for future data-path controllers.
140    DataWrite,
141}
142
143/// Consumer of completed operation samples.
144///
145/// Implementations must be cheap under a high sample rate since `record` is
146/// called on the hot path. The sink trait is intentionally minimal; richer
147/// behavior (windowing, sharding, routing to multiple controllers) belongs
148/// in the control-loop layer that hosts the sink.
149pub trait SampleSink: Send + Sync {
150    fn record(&self, kind: ResourceKind, sample: &Sample);
151}
152
153static SINK: std::sync::RwLock<Option<std::sync::Arc<dyn SampleSink>>> =
154    std::sync::RwLock::new(None);
155
156/// Install the process-wide [`SampleSink`]. Replaces any prior sink.
157pub fn install_sample_sink(sink: std::sync::Arc<dyn SampleSink>) {
158    *SINK.write().expect("sample sink lock poisoned") = Some(sink);
159}
160
161/// Remove the current sink, if any. After this call, probes are no-ops again.
162pub fn clear_sample_sink() {
163    *SINK.write().expect("sample sink lock poisoned") = None;
164}
165
166fn emit(kind: ResourceKind, sample: &Sample) {
167    // clone the Arc out of the lock before dispatching so a slow sink
168    // does not block `install_sample_sink`/`clear_sample_sink` (which
169    // need the write lock) and so a sink implementation that re-enters
170    // the sink API from within `record` cannot deadlock.
171    let sink = SINK
172        .read()
173        .expect("sample sink lock poisoned")
174        .as_ref()
175        .cloned();
176    if let Some(sink) = sink {
177        sink.record(kind, sample);
178    }
179}
180
181/// A measurement-in-progress for a single operation.
182///
183/// # Lifecycle
184///
185/// ```no_run
186/// use congestion::{MetadataOp, Probe, Side};
187///
188/// # async fn example() {
189/// let probe = Probe::start_metadata(Side::Source, MetadataOp::Stat);
190/// // ... perform the syscall or operation ...
191/// probe.complete_ok(0);
192/// # }
193/// ```
194///
195/// Forgetting to call [`Probe::complete`] / [`Probe::complete_ok`] drops the
196/// probe without recording anything. This is intentional: error paths that
197/// bail out early should not produce misleading latency samples.
198#[must_use = "call Probe::complete_ok or Probe::complete to record the measurement"]
199pub struct Probe {
200    kind: ResourceKind,
201    started_at: std::time::Instant,
202}
203
204impl Probe {
205    /// Begin measuring an operation of the given kind.
206    pub fn start(kind: ResourceKind) -> Self {
207        Self {
208            kind,
209            started_at: std::time::Instant::now(),
210        }
211    }
212    /// Shorthand for `Probe::start(ResourceKind::Metadata(side, op))`.
213    /// Use this to bracket a single per-file metadata syscall.
214    pub fn start_metadata(side: Side, op: MetadataOp) -> Self {
215        Self::start(ResourceKind::Metadata(side, op))
216    }
217    /// Shorthand for `Probe::start(ResourceKind::DataRead)`.
218    pub fn start_read() -> Self {
219        Self::start(ResourceKind::DataRead)
220    }
221    /// Shorthand for `Probe::start(ResourceKind::DataWrite)`.
222    pub fn start_write() -> Self {
223        Self::start(ResourceKind::DataWrite)
224    }
225    /// Complete the probe with outcome [`Outcome::Ok`] and the given byte
226    /// count (use `0` for metadata ops).
227    pub fn complete_ok(self, bytes: u64) {
228        self.complete(bytes, Outcome::Ok);
229    }
230    /// Complete the probe with an explicit outcome.
231    pub fn complete(self, bytes: u64, outcome: Outcome) {
232        emit(
233            self.kind,
234            &Sample {
235                started_at: self.started_at,
236                completed_at: std::time::Instant::now(),
237                bytes,
238                outcome,
239            },
240        );
241    }
242    /// Drop the probe without recording a sample.
243    pub fn discard(self) {}
244}
245
/// Test-only mutex that serializes access to the process-wide `SampleSink`
/// global across every unit test in this crate that touches it, including
/// tests in sibling modules (see `control_loop::tests`).
///
/// Because it is `#[cfg(test)]` and `pub(crate)`, it does not exist in the
/// library build that integration tests (`tests/*.rs`) link against, and it
/// is not visible outside this crate. Integration tests that install their
/// own sinks therefore need their own serialization. Nextest runs each test
/// in its own process, so races are only observable under `cargo test`'s
/// threaded runner, but we guard regardless to keep both runners reliable.
///
/// Uses [`tokio::sync::Mutex`] so `#[tokio::test]` bodies can hold the guard
/// across await points without tripping clippy's `await_holding_lock`. That
/// mutex also does not poison, so a failing test does not wedge the tests
/// that run after it.
#[cfg(test)]
pub(crate) static SINK_GUARD: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::CollectingSink;
    use std::sync::Arc;

    /// Clears the global sink when dropped.
    ///
    /// Declare it *after* the `SINK_GUARD` lock in each test. Locals drop in
    /// reverse order, so the sink is cleared while the guard is still held,
    /// even when an assertion panics. Without it, a panicking assertion would
    /// skip a trailing `clear_sample_sink()` and leave a stale sink installed
    /// for the next test.
    struct ClearOnDrop;

    impl Drop for ClearOnDrop {
        fn drop(&mut self) {
            clear_sample_sink();
        }
    }

    /// Install a fresh [`CollectingSink`] and return a handle for assertions.
    fn install_collector() -> Arc<CollectingSink> {
        let sink = Arc::new(CollectingSink::new());
        install_sample_sink(sink.clone());
        sink
    }

    #[test]
    fn probe_without_sink_is_a_no_op() {
        let _guard = SINK_GUARD.blocking_lock();
        let _clear = ClearOnDrop;
        clear_sample_sink();
        Probe::start_metadata(Side::Source, MetadataOp::Stat).complete_ok(0);
    }

    #[test]
    fn probe_records_metadata_samples_to_installed_sink() {
        let _guard = SINK_GUARD.blocking_lock();
        let _clear = ClearOnDrop;
        let sink = install_collector();
        Probe::start_metadata(Side::Source, MetadataOp::Stat).complete_ok(0);
        Probe::start_metadata(Side::Source, MetadataOp::Stat).complete_ok(0);
        assert_eq!(sink.metadata_count(), 2);
    }

    #[test]
    fn probe_separates_resource_kinds() {
        let _guard = SINK_GUARD.blocking_lock();
        let _clear = ClearOnDrop;
        let sink = install_collector();
        Probe::start_metadata(Side::Source, MetadataOp::Stat).complete_ok(0);
        Probe::start_read().complete_ok(4096);
        Probe::start_write().complete_ok(8192);
        assert_eq!(sink.metadata_count(), 1);
        assert_eq!(sink.read_count(), 1);
        assert_eq!(sink.write_count(), 1);
    }

    #[test]
    fn sample_latency_reflects_wall_time() {
        let _guard = SINK_GUARD.blocking_lock();
        let _clear = ClearOnDrop;
        let sink = install_collector();
        let probe = Probe::start_metadata(Side::Source, MetadataOp::Stat);
        std::thread::sleep(std::time::Duration::from_millis(5));
        probe.complete_ok(0);
        let samples = sink.metadata_samples();
        assert_eq!(samples.len(), 1);
        assert!(samples[0].latency() >= std::time::Duration::from_millis(5));
    }

    #[test]
    fn discarded_probe_produces_no_sample() {
        let _guard = SINK_GUARD.blocking_lock();
        let _clear = ClearOnDrop;
        let sink = install_collector();
        Probe::start_metadata(Side::Source, MetadataOp::Stat).discard();
        assert_eq!(sink.metadata_count(), 0);
    }

    #[test]
    fn probe_dropped_without_complete_produces_no_sample() {
        // matches the behavior of an early-return in an error path: the
        // probe is simply dropped and no sample is emitted, so a failed
        // syscall doesn't pollute the controller's latency signal.
        let _guard = SINK_GUARD.blocking_lock();
        let _clear = ClearOnDrop;
        let sink = install_collector();
        {
            let _probe = Probe::start_metadata(Side::Source, MetadataOp::Stat);
            // _probe falls out of scope here without complete or discard
        }
        assert_eq!(sink.metadata_count(), 0);
    }

    #[test]
    fn installing_a_new_sink_replaces_the_old_one() {
        let _guard = SINK_GUARD.blocking_lock();
        let _clear = ClearOnDrop;
        let first = install_collector();
        Probe::start_metadata(Side::Source, MetadataOp::Stat).complete_ok(0);
        let second = install_collector();
        Probe::start_metadata(Side::Source, MetadataOp::Stat).complete_ok(0);
        Probe::start_metadata(Side::Source, MetadataOp::Stat).complete_ok(0);
        assert_eq!(first.metadata_count(), 1);
        assert_eq!(second.metadata_count(), 2);
    }
}
350}