Skip to main content

cargo_ff/
lib.rs

1pub mod types;
2
3mod cache;
4mod coalesce;
5mod discover;
6mod dispatch;
7mod exec;
8mod report;
9mod size;
10mod style;
11
12#[cfg(feature = "cli")]
13pub mod cli;
14
15pub use types::{Config, Edition, Error, FileFailure, Report, Result};
16
17/// Internal test surface — **not** part of the public API despite being `pub`.
18///
19/// Rust integration tests compile as a separate crate, so exercising a private
20/// module like [`discover`](crate::discover) from `tests/` needs a `pub`
21/// escape hatch. The `__` prefix plus `#[doc(hidden)]` keep it out of the docs
22/// and signal "do not use": it carries no stability guarantee and may change
23/// or disappear in any release. This is the conventional idiom for
24/// integration-testing crate internals.
25#[doc(hidden)]
26pub mod __test_only {
27    use crate::types::{Config, CrateUnit, Result};
28    use crossbeam_channel::Sender;
29
30    pub fn discover_run(cfg: &Config, tx: &Sender<CrateUnit>) -> Result<()> {
31        crate::discover::run(cfg, tx).map(drop)
32    }
33}
34
35use crossbeam_channel::bounded;
36use std::sync::Arc;
37use std::thread::{self, JoinHandle};
38
39/// Format every selected crate in the workspace, dispatching rustfmt across a
40/// pool of worker threads, and return the aggregated [`Report`].
41///
42/// # Errors
43///
44/// Returns [`Error`] when `cargo metadata` fails, a requested package or
45/// edition is unknown, `cfg.workers` is `Some(0)`, or a producer/worker thread
46/// panics.
47pub fn run(cfg: &Config) -> Result<Report> {
48    if matches!(cfg.workers, Some(0)) {
49        return Err(Error::InvalidWorkers(0));
50    }
51
52    if cfg.warnings {
53        warn_if_stable_rustfmt();
54    }
55
56    let n = cfg
57        .workers
58        .or_else(|| thread::available_parallelism().ok().map(std::num::NonZeroUsize::get))
59        .unwrap_or(1);
60    let cap = cfg.channel_capacity.unwrap_or(512);
61    let batch_size = cfg.batch_size.unwrap_or(3);
62    // Same cutoff used by the size proxy: any crate the proxy clamped
63    // to `HUGE_CUTOFF_BYTES` is by definition the threshold or above,
64    // so the comparison `>= HUGE_CUTOFF_BYTES` exactly catches them.
65    let solo_threshold = size::HUGE_CUTOFF_BYTES;
66
67    let (unit_tx, unit_rx) = bounded::<types::CrateUnit>(cap);
68    let queue = Arc::new(dispatch::PriorityQueue::new());
69    let (result_tx, result_rx) = bounded::<types::BatchResult>(cap);
70
71    let cfg_d = cfg.clone();
72    // Each thread owns its channel endpoint; dropping it when the closure
73    // exits is what propagates "no more work" down the pipeline.
74    let producer = thread::spawn(move || discover::run(&cfg_d, &unit_tx));
75
76    // Coalescer pushes batches into `queue`; closes it on exit so
77    // workers' `pop()` returns `None` once everything is drained.
78    let coalescer_q = queue.clone();
79    let coalescer = thread::spawn(move || {
80        coalesce::run(
81            &unit_rx,
82            &coalescer_q,
83            batch_size,
84            coalesce::DEFAULT_PACK_MULTIPLIER,
85            solo_threshold,
86        );
87        coalescer_q.close();
88    });
89
90    let mut workers = Vec::with_capacity(n);
91    for _ in 0..n {
92        let q = queue.clone();
93        let tx = result_tx.clone();
94        let cfg_w = cfg.clone();
95        workers.push(thread::spawn(move || exec::worker(&q, &tx, &cfg_w)));
96    }
97    drop(result_tx);
98
99    let report = report::aggregate(result_rx);
100
101    let cache_opt = join_fallible(producer, "producer")?;
102    join_void(coalescer, "coalescer")?;
103    for w in workers {
104        join_void(w, "worker")?;
105    }
106
107    // Commit the skip cache. Drop pending entries for crates that had
108    // any --check failure so future runs re-fingerprint and surface them.
109    if let Some(mut cache) = cache_opt {
110        for f in &report.failures {
111            cache.invalidate(&f.manifest_dir);
112        }
113        let _ = cache.commit_and_save();
114    }
115
116    Ok(report)
117}
118
119fn join_fallible<T>(h: JoinHandle<Result<T>>, name: &'static str) -> Result<T> {
120    h.join().map_err(|_| Error::ThreadPanicked(name))?
121}
122
123fn join_void(h: JoinHandle<()>, name: &'static str) -> Result<()> {
124    h.join().map_err(|_| Error::ThreadPanicked(name))
125}
126
127/// `rustfmt --version` prints e.g. `rustfmt 1.8.0-nightly (…)`. If the
128/// `nightly` marker is absent, unstable `rustfmt.toml` options are silently
129/// dropped and output diverges from `cargo +nightly fmt`.
130fn warn_if_stable_rustfmt() {
131    use std::fmt::Write as _;
132    use std::io::Write as _;
133    let Ok(out) = std::process::Command::new("rustfmt")
134        .arg("--version")
135        .output()
136    else {
137        return;
138    };
139    if !out.status.success() {
140        return;
141    }
142    let version = String::from_utf8_lossy(&out.stdout);
143    if version.contains("nightly") {
144        return;
145    }
146    let palette = style::palette();
147    let (w, wr) = (palette.warning.render(), palette.warning.render_reset());
148    let (n, nr) = (palette.note.render(), palette.note.render_reset());
149    let (h, hr) = (palette.help.render(), palette.help.render_reset());
150    let mut buf = String::new();
151    let _ = writeln!(buf, "{w}warning{wr}: rustfmt on PATH is the stable channel");
152    let _ = writeln!(buf, "   {n}note{nr}: `{}`", version.trim());
153    let _ = writeln!(
154        buf,
155        "   {n}note{nr}: unstable rustfmt.toml options will be silently ignored"
156    );
157    let _ = writeln!(
158        buf,
159        "   {h}help{hr}: run via `cargo +nightly ff` for parity with `cargo +nightly fmt`"
160    );
161    buf.push('\n');
162    let _ = std::io::stderr().write_all(buf.as_bytes());
163}