1use std::panic::AssertUnwindSafe;
2use std::path::{Path, PathBuf};
3use std::process::Stdio;
4use std::time::Duration;
5use std::{env, io, panic};
6
7use async_channel::{Receiver, SendError};
8use tempfile::tempdir_in;
9use thiserror::Error;
10use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
11use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
12use tokio::sync::oneshot;
13use tracing::{debug, instrument};
14use walkdir::WalkDir;
15
16use uv_configuration::Concurrency;
17use uv_fs::Simplified;
18use uv_static::EnvVars;
19use uv_warnings::warn_user;
20
/// Source of the Python compile script, embedded from `pip_compileall.py` at build time.
const COMPILEALL_SCRIPT: &str = include_str!("pip_compileall.py");
/// Timeout of one minute, used when the timeout environment variable cannot be read.
const DEFAULT_COMPILE_TIMEOUT: Duration = Duration::from_mins(1);
24
25#[derive(Debug, Error)]
26pub enum CompileError {
27 #[error("Failed to list files in `site-packages`")]
28 Walkdir(#[from] walkdir::Error),
29 #[error("Failed to send task to worker")]
30 WorkerDisappeared(SendError<PathBuf>),
31 #[error("The task executor is broken, did some other task panic?")]
32 Join,
33 #[error("Failed to start Python interpreter to run compile script")]
34 PythonSubcommand(#[source] io::Error),
35 #[error("Failed to create temporary script file")]
36 TempFile(#[source] io::Error),
37 #[error(r#"Bytecode compilation failed, expected "{0}", received: "{1}""#)]
38 WrongPath(String, String),
39 #[error("Failed to write to Python {device}")]
40 ChildStdio {
41 device: &'static str,
42 #[source]
43 err: io::Error,
44 },
45 #[error("Python process stderr:\n{stderr}")]
46 ErrorWithStderr {
47 stderr: String,
48 #[source]
49 err: Box<Self>,
50 },
51 #[error("Bytecode timed out ({}s) compiling file: `{}`", elapsed.as_secs_f32(), source_file)]
52 CompileTimeout {
53 elapsed: Duration,
54 source_file: String,
55 },
56 #[error("Python startup timed out ({}s)", _0.as_secs_f32())]
57 StartupTimeout(Duration),
58 #[error("Got invalid value from environment for {var}: {message}.")]
59 EnvironmentError { var: &'static str, message: String },
60}
61
62#[instrument(skip(python_executable))]
73pub async fn compile_tree(
74 dir: &Path,
75 python_executable: &Path,
76 concurrency: &Concurrency,
77 cache: &Path,
78) -> Result<usize, CompileError> {
79 debug_assert!(
80 dir.is_absolute(),
81 "compileall doesn't work with relative paths: `{}`",
82 dir.display()
83 );
84 let worker_count = concurrency.installs;
85
86 let (sender, receiver) = async_channel::bounded::<PathBuf>(worker_count * 10);
88
89 let tempdir = tempdir_in(cache).map_err(CompileError::TempFile)?;
91 let pip_compileall_py = tempdir.path().join("pip_compileall.py");
92
93 let timeout: Option<Duration> = match env::var(EnvVars::UV_COMPILE_BYTECODE_TIMEOUT) {
94 Ok(value) => match value.as_str() {
95 "0" => None,
96 _ => match value.parse::<u64>().map(Duration::from_secs) {
97 Ok(duration) => Some(duration),
98 Err(_) => {
99 return Err(CompileError::EnvironmentError {
100 var: EnvVars::UV_COMPILE_BYTECODE_TIMEOUT,
101 message: format!("Expected an integer number of seconds, got \"{value}\""),
102 });
103 }
104 },
105 },
106 Err(_) => Some(DEFAULT_COMPILE_TIMEOUT),
107 };
108 if let Some(duration) = timeout {
109 debug!(
110 "Using bytecode compilation timeout of {}s",
111 duration.as_secs()
112 );
113 } else {
114 debug!("Disabling bytecode compilation timeout");
115 }
116
117 debug!("Starting {} bytecode compilation workers", worker_count);
118 let mut worker_handles = Vec::new();
119 for _ in 0..worker_count {
120 let (tx, rx) = oneshot::channel();
121
122 let worker = worker(
123 dir.to_path_buf(),
124 python_executable.to_path_buf(),
125 pip_compileall_py.clone(),
126 receiver.clone(),
127 timeout,
128 );
129
130 std::thread::Builder::new()
132 .name("uv-compile".to_owned())
133 .spawn(move || {
134 let result = panic::catch_unwind(AssertUnwindSafe(|| {
136 tokio::runtime::Builder::new_current_thread()
137 .enable_all()
138 .build()
139 .expect("Failed to build runtime")
140 .block_on(worker)
141 }));
142
143 let _ = tx.send(result);
145 })
146 .expect("Failed to start compilation worker");
147
148 worker_handles.push(async { rx.await.unwrap() });
149 }
150 drop(receiver);
152
153 let mut source_files = 0;
155 let mut send_error = None;
156 let walker = WalkDir::new(dir)
157 .into_iter()
158 .filter_entry(|dir| dir.file_name() != "__pycache__");
160 for entry in walker {
161 let (entry, metadata) =
163 match entry.and_then(|entry| entry.metadata().map(|metadata| (entry, metadata))) {
164 Ok((entry, metadata)) => (entry, metadata),
165 Err(err) => {
166 if err
167 .io_error()
168 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
169 {
170 continue;
172 }
173 return Err(err.into());
174 }
175 };
176 if metadata.is_file() && entry.path().extension().is_some_and(|ext| ext == "py") {
178 source_files += 1;
179 if let Err(err) = sender.send(entry.path().to_owned()).await {
180 send_error = Some(err);
185 break;
186 }
187 }
188 }
189
190 drop(sender);
193
194 for result in futures::future::join_all(worker_handles).await {
196 match result {
197 Err(_) => return Err(CompileError::Join),
199 Ok(Err(compile_error)) => return Err(compile_error),
201 Ok(Ok(())) => {}
202 }
203 }
204
205 if let Some(send_error) = send_error {
206 return Err(CompileError::WorkerDisappeared(send_error));
209 }
210
211 Ok(source_files)
212}
213
214async fn worker(
215 dir: PathBuf,
216 interpreter: PathBuf,
217 pip_compileall_py: PathBuf,
218 receiver: Receiver<PathBuf>,
219 timeout: Option<Duration>,
220) -> Result<(), CompileError> {
221 fs_err::tokio::write(&pip_compileall_py, COMPILEALL_SCRIPT)
222 .await
223 .map_err(CompileError::TempFile)?;
224
225 let wait_until_ready = async {
230 loop {
231 if let Some(child) =
233 launch_bytecode_compiler(&dir, &interpreter, &pip_compileall_py).await?
234 {
235 break Ok::<_, CompileError>(child);
236 }
237 }
238 };
239
240 let (mut bytecode_compiler, child_stdin, mut child_stdout, mut child_stderr) =
243 if let Some(duration) = timeout {
244 tokio::time::timeout(duration, wait_until_ready)
245 .await
246 .map_err(|_| CompileError::StartupTimeout(timeout.unwrap()))??
247 } else {
248 wait_until_ready.await?
249 };
250
251 let stderr_reader = tokio::task::spawn(async move {
252 let mut child_stderr_collected: Vec<u8> = Vec::new();
253 child_stderr
254 .read_to_end(&mut child_stderr_collected)
255 .await?;
256 Ok(child_stderr_collected)
257 });
258
259 let result = worker_main_loop(receiver, child_stdin, &mut child_stdout, timeout).await;
260 let _ = bytecode_compiler.kill().await;
262
263 let child_stderr_collected = stderr_reader
266 .await
267 .map_err(|_| CompileError::Join)?
268 .map_err(|err| CompileError::ChildStdio {
269 device: "stderr",
270 err,
271 })?;
272 let result = if child_stderr_collected.is_empty() {
273 result
274 } else {
275 let stderr = String::from_utf8_lossy(&child_stderr_collected);
276 match result {
277 Ok(()) => {
278 debug!(
279 "Bytecode compilation `python` at {} stderr:\n{}\n---",
280 interpreter.user_display(),
281 stderr
282 );
283 Ok(())
284 }
285 Err(err) => Err(CompileError::ErrorWithStderr {
286 stderr: stderr.trim().to_string(),
287 err: Box::new(err),
288 }),
289 }
290 };
291
292 debug!("Bytecode compilation worker exiting: {:?}", result);
293
294 result
295}
296
297async fn launch_bytecode_compiler(
299 dir: &Path,
300 interpreter: &Path,
301 pip_compileall_py: &Path,
302) -> Result<
303 Option<(
304 Child,
305 ChildStdin,
306 BufReader<ChildStdout>,
307 BufReader<ChildStderr>,
308 )>,
309 CompileError,
310> {
311 let mut bytecode_compiler = Command::new(interpreter)
313 .arg(pip_compileall_py)
314 .stdin(Stdio::piped())
315 .stdout(Stdio::piped())
316 .stderr(Stdio::piped())
317 .current_dir(dir)
318 .env(EnvVars::PYTHONUNBUFFERED, "1")
320 .spawn()
321 .map_err(CompileError::PythonSubcommand)?;
322
323 let child_stdin = bytecode_compiler
326 .stdin
327 .take()
328 .expect("Child must have stdin");
329 let mut child_stdout = BufReader::new(
330 bytecode_compiler
331 .stdout
332 .take()
333 .expect("Child must have stdout"),
334 );
335 let child_stderr = BufReader::new(
336 bytecode_compiler
337 .stderr
338 .take()
339 .expect("Child must have stderr"),
340 );
341
342 let mut out_line = String::new();
344 child_stdout
345 .read_line(&mut out_line)
346 .await
347 .map_err(|err| CompileError::ChildStdio {
348 device: "stdout",
349 err,
350 })?;
351
352 if out_line.trim_end() == "Ready" {
353 Ok(Some((
355 bytecode_compiler,
356 child_stdin,
357 child_stdout,
358 child_stderr,
359 )))
360 } else if out_line.is_empty() {
361 Ok(None)
363 } else {
364 Err(CompileError::WrongPath("Ready".to_string(), out_line))
366 }
367}
368
369async fn worker_main_loop(
373 receiver: Receiver<PathBuf>,
374 mut child_stdin: ChildStdin,
375 child_stdout: &mut BufReader<ChildStdout>,
376 timeout: Option<Duration>,
377) -> Result<(), CompileError> {
378 let mut out_line = String::new();
379 while let Ok(source_file) = receiver.recv().await {
380 let source_file = source_file.display().to_string();
381 if source_file.contains(['\r', '\n']) {
382 warn_user!("Path contains newline, skipping: {source_file:?}");
383 continue;
384 }
385 let bytes = format!("{source_file}\n").into_bytes();
387
388 let python_handle = async {
389 child_stdin
390 .write_all(&bytes)
391 .await
392 .map_err(|err| CompileError::ChildStdio {
393 device: "stdin",
394 err,
395 })?;
396
397 out_line.clear();
398 child_stdout.read_line(&mut out_line).await.map_err(|err| {
399 CompileError::ChildStdio {
400 device: "stdout",
401 err,
402 }
403 })?;
404 Ok::<(), CompileError>(())
405 };
406
407 if let Some(duration) = timeout {
410 tokio::time::timeout(duration, python_handle)
411 .await
412 .map_err(|_| CompileError::CompileTimeout {
413 elapsed: duration,
414 source_file: source_file.clone(),
415 })??;
416 } else {
417 python_handle.await?;
418 }
419
420 let actual = out_line.trim_end_matches(['\n', '\r']);
423 if actual != source_file {
424 return Err(CompileError::WrongPath(source_file, actual.to_string()));
425 }
426 }
427 Ok(())
428}