Skip to main content

common/
walk.rs

//! Shared primitives for directory-walking operations (copy, link, rm).
//!
//! [`EntryKind`] classifies a directory entry by file type and exposes the
//! per-type bits (dry-run label, skipped-counter increment) so callers don't
//! re-implement the dispatch.
//!
//! [`next_entry_probed`] wraps `tokio::fs::ReadDir::next_entry` (plus the
//! follow-up `file_type()` lookup) with the static ops rate gate so
//! copy/link/rm share a single source of truth for walk iteration. Despite
//! its name, the walk path is deliberately not congestion-probed — see the
//! function's own docs for why. The per-file metadata syscalls that follow
//! the walk are instead bracketed with [`run_metadata_probed`], which adds
//! the cwnd permit and a congestion probe on top of the rate gate.
12
13use crate::filter::{FilterResult, FilterSettings};
14use crate::progress::Progress;
15use anyhow::Context;
16
/// What kind of filesystem object a walked entry is.
///
/// Regular files, directories and symlinks each get their own variant;
/// everything else (sockets, FIFOs, block and character devices) is grouped
/// under `Special`. A caller holding only a best-effort `Option<FileType>`
/// (for example `entry.file_type().await.ok()`) classifies an unknown type
/// as `File`, preserving historical behavior.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EntryKind {
    /// A regular file, or an entry whose type could not be determined.
    File,
    /// A directory.
    Dir,
    /// A symbolic link.
    Symlink,
    /// Any other type: socket, FIFO, block or character device.
    Special,
}
30
31impl EntryKind {
32    /// Classify from a `Metadata` (root-level entries, where we always have full metadata).
33    #[must_use]
34    pub fn from_metadata(metadata: &std::fs::Metadata) -> Self {
35        if metadata.is_dir() {
36            Self::Dir
37        } else if metadata.is_symlink() {
38            Self::Symlink
39        } else if metadata.is_file() {
40            Self::File
41        } else {
42            Self::Special
43        }
44    }
45    /// Classify from an `Option<FileType>`. Unknown types (`None`) are treated
46    /// as `File` to match historical behavior across copy/link/rm: when
47    /// `entry.file_type()` fails, callers proceed as if the entry were a
48    /// regular file.
49    #[must_use]
50    pub fn from_file_type(file_type: Option<&std::fs::FileType>) -> Self {
51        match file_type {
52            Some(ft) if ft.is_dir() => Self::Dir,
53            Some(ft) if ft.is_symlink() => Self::Symlink,
54            Some(ft) if ft.is_file() => Self::File,
55            Some(_) => Self::Special,
56            None => Self::File,
57        }
58    }
59    /// Short dry-run label used during directory iteration (`"dir"`, `"symlink"`, `"file"`).
60    /// `Special` maps to `"file"` to match historical behavior — the old bool-triplet
61    /// dispatch in copy/link/rm fell through `is_dir`/`is_symlink` to "file" for any
62    /// other type. The explicit `--skip-specials` path uses its own literal "special"
63    /// string and does not call this helper.
64    #[must_use]
65    pub fn label(self) -> &'static str {
66        match self {
67            Self::Dir => "dir",
68            Self::Symlink => "symlink",
69            Self::File | Self::Special => "file",
70        }
71    }
72    /// Long dry-run label used at the root level (`"directory"` instead of `"dir"`).
73    /// `Special` maps to `"file"` for the same reason as [`Self::label`].
74    #[must_use]
75    pub fn label_long(self) -> &'static str {
76        match self {
77            Self::Dir => "directory",
78            Self::Symlink => "symlink",
79            Self::File | Self::Special => "file",
80        }
81    }
82    /// Increment the skipped counter that matches this entry kind. Special
83    /// files count as `files_skipped` — `specials_skipped` is reserved for
84    /// the explicit `--skip-specials` path, not filter skips.
85    pub fn inc_skipped(self, prog: &Progress) {
86        match self {
87            Self::Dir => prog.directories_skipped.inc(),
88            Self::Symlink => prog.symlinks_skipped.inc(),
89            Self::File | Self::Special => prog.files_skipped.inc(),
90        }
91    }
92}
93
94/// Resolve the `throttle::Side` from the matching `congestion::Side`.
95///
96/// The two crates carry independent enum definitions to keep `throttle`
97/// free of any congestion dependency; this is the canonical bridge
98/// (paired with [`throttle_op`]) reused everywhere a congestion-side
99/// signal needs to address a throttle resource.
100pub(crate) fn throttle_side(side: congestion::Side) -> throttle::Side {
101    match side {
102        congestion::Side::Source => throttle::Side::Source,
103        congestion::Side::Destination => throttle::Side::Destination,
104    }
105}
106
107/// Resolve the `throttle::MetadataOp` from the matching `congestion::MetadataOp`.
108pub(crate) fn throttle_op(op: congestion::MetadataOp) -> throttle::MetadataOp {
109    match op {
110        congestion::MetadataOp::Stat => throttle::MetadataOp::Stat,
111        congestion::MetadataOp::ReadLink => throttle::MetadataOp::ReadLink,
112        congestion::MetadataOp::MkDir => throttle::MetadataOp::MkDir,
113        congestion::MetadataOp::RmDir => throttle::MetadataOp::RmDir,
114        congestion::MetadataOp::Unlink => throttle::MetadataOp::Unlink,
115        congestion::MetadataOp::HardLink => throttle::MetadataOp::HardLink,
116        congestion::MetadataOp::Symlink => throttle::MetadataOp::Symlink,
117        congestion::MetadataOp::Chmod => throttle::MetadataOp::Chmod,
118        congestion::MetadataOp::OpenCreate => throttle::MetadataOp::OpenCreate,
119    }
120}
121
122/// Resolve the [`throttle::Resource`] for a single per-file metadata
123/// syscall on the given side.
124pub(crate) fn meta_resource(
125    side: congestion::Side,
126    op: congestion::MetadataOp,
127) -> throttle::Resource {
128    throttle::Resource::meta(throttle_side(side), throttle_op(op))
129}
130
131/// Pull the next directory entry, gated only by the static ops rate
132/// gate.
133///
134/// Walks are deliberately not probed: `tokio::fs::ReadDir::next_entry`
135/// returns buffered entries from a prior `getdents` batch without
136/// entering the kernel, so most "walk probes" don't measure filesystem
137/// service time at all. The resulting bimodal latency distribution
138/// (cache hit vs. real `getdents`) collapses any baseline a controller
139/// could derive from it. The fix is to probe only the per-file
140/// metadata syscalls that follow the walk, where each sample reflects
141/// real filesystem work.
142///
143/// The prologue is therefore just:
144///
145/// 1. Await the static ops rate gate.
146/// 2. Call `next_entry()` and, on success, classify via `file_type()`.
147///
148/// `side` is currently unused at runtime but kept on the signature so
149/// callers stay self-documenting and future per-side gating can be
150/// reintroduced without touching every call site.
151///
152/// The error is left as `anyhow::Error` so each caller can wrap it in the
153/// site-specific error type (`copy::Error`, `link::Error`, `rm::Error`)
154/// without this helper needing to be generic over the summary payload.
155pub async fn next_entry_probed<F>(
156    entries: &mut tokio::fs::ReadDir,
157    _side: congestion::Side,
158    context: F,
159) -> anyhow::Result<Option<(tokio::fs::DirEntry, Option<std::fs::FileType>)>>
160where
161    F: FnOnce() -> String,
162{
163    throttle::get_ops_token().await;
164    let maybe_entry = entries.next_entry().await.with_context(context)?;
165    let Some(entry) = maybe_entry else {
166        return Ok(None);
167    };
168    let entry_file_type = entry.file_type().await.ok();
169    Ok(Some((entry, entry_file_type)))
170}
171
172/// Bracket a single metadata-producing future with the full per-op
173/// gating prologue: the static ops rate gate, the cwnd permit for the
174/// matching `(side, op_kind)` resource, and a congestion probe. The
175/// probe completes successfully when `fut` returns `Ok`, and is
176/// discarded on error so error paths don't skew the controller's
177/// latency baseline.
178///
179/// `op_kind` selects which per-syscall controller this call is
180/// reported to and gated by — `Stat`, `MkDir`, `Unlink`, etc. Pick the
181/// variant that matches the underlying syscall (`metadata` /
182/// `symlink_metadata` / `File::open(read)` / `canonicalize` all map to
183/// `Stat`; `create_dir` to `MkDir`; `remove_file` to `Unlink`; and so
184/// on — see [`congestion::MetadataOp`] for the full mapping).
185///
186/// `--ops-throttle` is the shared metadata rate gate, so this helper
187/// acquires it on every call — same as [`next_entry_probed`]. Callers
188/// that already rate-gate upstream (such as filegen, which gates at
189/// per-task spawn time so we don't fan out an unbounded task queue
190/// before any token is consumed) must use
191/// [`run_metadata_probed_no_rate`] instead to avoid double-counting.
192pub async fn run_metadata_probed<F, T, E>(
193    side: congestion::Side,
194    op_kind: congestion::MetadataOp,
195    fut: F,
196) -> Result<T, E>
197where
198    F: std::future::Future<Output = Result<T, E>>,
199{
200    throttle::get_ops_token().await;
201    run_metadata_probed_no_rate(side, op_kind, fut).await
202}
203
204/// Variant of [`run_metadata_probed`] that skips the static ops rate
205/// gate — for callers that already rate-limit at a coarser granularity
206/// upstream and would otherwise consume two tokens per metadata op.
207///
208/// Concretely: `filegen` gates the rate at task-spawn time so the
209/// number of in-flight `write_file` futures stays bounded by the rate.
210/// The `OpenOptions::open(O_CREAT)` inside the spawned task is the only
211/// metadata syscall in that path; rate-gating it again would halve the
212/// effective rate.
213pub async fn run_metadata_probed_no_rate<F, T, E>(
214    side: congestion::Side,
215    op_kind: congestion::MetadataOp,
216    fut: F,
217) -> Result<T, E>
218where
219    F: std::future::Future<Output = Result<T, E>>,
220{
221    let ops_permit = throttle::ops_in_flight_permit(meta_resource(side, op_kind)).await;
222    let probe = congestion::Probe::start_metadata(side, op_kind);
223    let result = fut.await;
224    match &result {
225        Ok(_) => probe.complete_ok(0),
226        Err(_) => probe.discard(),
227    }
228    drop(ops_permit);
229    result
230}
231
232/// Decide whether an entry should be skipped by the filter, returning the
233/// `FilterResult` that caused the skip. Returns `None` if there is no filter
234/// or the entry is included.
235#[must_use]
236pub fn should_skip_entry(
237    filter: &Option<FilterSettings>,
238    relative_path: &std::path::Path,
239    is_dir: bool,
240) -> Option<FilterResult> {
241    if let Some(f) = filter {
242        let result = f.should_include(relative_path, is_dir);
243        match result {
244            FilterResult::Included => None,
245            _ => Some(result),
246        }
247    } else {
248        None
249    }
250}
251
/// Return `entry`'s path inside the tree rooted at `root`.
///
/// `root` is typically `source_root` or `dest_root` at the call site. When
/// `entry` is not actually under `root` (prefixes are compared component by
/// component, via `Path::strip_prefix`), `entry` is returned unchanged. This
/// is rcp's defensive fallback. Naming the pattern lets call sites say "the
/// entry's path inside the tree" instead of spelling out the `strip_prefix`,
/// and removes the risk of getting its arguments the wrong way round.
///
/// Combine with `filter_base.join(...)` to get a logical filter path inside a
/// delegated subtree (the non-empty `filter_base` case of
/// `copy_with_filter_base`). Use it on its own when `filter_base` is empty.
#[must_use]
pub fn relative_to_root<'a>(
    entry: &'a std::path::Path,
    root: &std::path::Path,
) -> &'a std::path::Path {
    match entry.strip_prefix(root) {
        Ok(inside) => inside,
        Err(_) => entry,
    }
}