1pub mod types;
2
3mod cache;
4mod coalesce;
5mod discover;
6mod dispatch;
7mod exec;
8mod report;
9mod size;
10mod style;
11
12#[cfg(feature = "cli")]
13pub mod cli;
14
15pub use types::{Config, Edition, Error, FileFailure, Report, Result};
16
17#[doc(hidden)]
26pub mod __test_only {
27 use crate::types::{Config, CrateUnit, Result};
28 use crossbeam_channel::Sender;
29
30 pub fn discover_run(cfg: &Config, tx: &Sender<CrateUnit>) -> Result<()> {
31 crate::discover::run(cfg, tx).map(drop)
32 }
33}
34
35use crossbeam_channel::bounded;
36use std::sync::Arc;
37use std::thread::{self, JoinHandle};
38
39pub fn run(cfg: &Config) -> Result<Report> {
48 if matches!(cfg.workers, Some(0)) {
49 return Err(Error::InvalidWorkers(0));
50 }
51
52 if cfg.warnings {
53 warn_if_stable_rustfmt();
54 }
55
56 let n = cfg
57 .workers
58 .or_else(|| thread::available_parallelism().ok().map(std::num::NonZeroUsize::get))
59 .unwrap_or(1);
60 let cap = cfg.channel_capacity.unwrap_or(512);
61 let batch_size = cfg.batch_size.unwrap_or(3);
62 let solo_threshold = size::HUGE_CUTOFF_BYTES;
66
67 let (unit_tx, unit_rx) = bounded::<types::CrateUnit>(cap);
68 let queue = Arc::new(dispatch::PriorityQueue::new());
69 let (result_tx, result_rx) = bounded::<types::BatchResult>(cap);
70
71 let cfg_d = cfg.clone();
72 let producer = thread::spawn(move || discover::run(&cfg_d, &unit_tx));
75
76 let coalescer_q = queue.clone();
79 let coalescer = thread::spawn(move || {
80 coalesce::run(
81 &unit_rx,
82 &coalescer_q,
83 batch_size,
84 coalesce::DEFAULT_PACK_MULTIPLIER,
85 solo_threshold,
86 );
87 coalescer_q.close();
88 });
89
90 let mut workers = Vec::with_capacity(n);
91 for _ in 0..n {
92 let q = queue.clone();
93 let tx = result_tx.clone();
94 let cfg_w = cfg.clone();
95 workers.push(thread::spawn(move || exec::worker(&q, &tx, &cfg_w)));
96 }
97 drop(result_tx);
98
99 let report = report::aggregate(result_rx);
100
101 let cache_opt = join_fallible(producer, "producer")?;
102 join_void(coalescer, "coalescer")?;
103 for w in workers {
104 join_void(w, "worker")?;
105 }
106
107 if let Some(mut cache) = cache_opt {
110 for f in &report.failures {
111 cache.invalidate(&f.manifest_dir);
112 }
113 let _ = cache.commit_and_save();
114 }
115
116 Ok(report)
117}
118
119fn join_fallible<T>(h: JoinHandle<Result<T>>, name: &'static str) -> Result<T> {
120 h.join().map_err(|_| Error::ThreadPanicked(name))?
121}
122
123fn join_void(h: JoinHandle<()>, name: &'static str) -> Result<()> {
124 h.join().map_err(|_| Error::ThreadPanicked(name))
125}
126
127fn warn_if_stable_rustfmt() {
131 use std::fmt::Write as _;
132 use std::io::Write as _;
133 let Ok(out) = std::process::Command::new("rustfmt")
134 .arg("--version")
135 .output()
136 else {
137 return;
138 };
139 if !out.status.success() {
140 return;
141 }
142 let version = String::from_utf8_lossy(&out.stdout);
143 if version.contains("nightly") {
144 return;
145 }
146 let palette = style::palette();
147 let (w, wr) = (palette.warning.render(), palette.warning.render_reset());
148 let (n, nr) = (palette.note.render(), palette.note.render_reset());
149 let (h, hr) = (palette.help.render(), palette.help.render_reset());
150 let mut buf = String::new();
151 let _ = writeln!(buf, "{w}warning{wr}: rustfmt on PATH is the stable channel");
152 let _ = writeln!(buf, " {n}note{nr}: `{}`", version.trim());
153 let _ = writeln!(
154 buf,
155 " {n}note{nr}: unstable rustfmt.toml options will be silently ignored"
156 );
157 let _ = writeln!(
158 buf,
159 " {h}help{hr}: run via `cargo +nightly ff` for parity with `cargo +nightly fmt`"
160 );
161 buf.push('\n');
162 let _ = std::io::stderr().write_all(buf.as_bytes());
163}