1use std::cmp::Reverse;
2use std::sync::Arc;
3
4use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, stream::FuturesUnordered};
5use tracing::{debug, instrument};
6
7use uv_cache::Cache;
8use uv_configuration::BuildOptions;
9use uv_distribution::{DistributionDatabase, LocalWheel};
10use uv_distribution_types::{
11 BuildableSource, CachedDist, DerivationChain, Dist, DistErrorKind, Hashed, Identifier, Name,
12 RemoteSource, Resolution,
13};
14use uv_normalize::PackageName;
15use uv_platform_tags::Tags;
16use uv_redacted::DisplaySafeUrl;
17use uv_types::{BuildContext, HashStrategy, InFlight};
18
19pub struct Preparer<'a, Context: BuildContext> {
23 tags: &'a Tags,
24 cache: &'a Cache,
25 hashes: &'a HashStrategy,
26 build_options: &'a BuildOptions,
27 database: DistributionDatabase<'a, Context>,
28 reporter: Option<Arc<dyn Reporter>>,
29}
30
31impl<'a, Context: BuildContext> Preparer<'a, Context> {
32 pub fn new(
33 cache: &'a Cache,
34 tags: &'a Tags,
35 hashes: &'a HashStrategy,
36 build_options: &'a BuildOptions,
37 database: DistributionDatabase<'a, Context>,
38 ) -> Self {
39 Self {
40 tags,
41 cache,
42 hashes,
43 build_options,
44 database,
45 reporter: None,
46 }
47 }
48
49 #[must_use]
51 pub fn with_reporter(self, reporter: Arc<dyn Reporter>) -> Self {
52 Self {
53 tags: self.tags,
54 cache: self.cache,
55 hashes: self.hashes,
56 build_options: self.build_options,
57 database: self
58 .database
59 .with_reporter(reporter.clone().into_distribution_reporter()),
60 reporter: Some(reporter),
61 }
62 }
63
64 pub fn prepare_stream<'stream>(
66 &'stream self,
67 distributions: Vec<Arc<Dist>>,
68 in_flight: &'stream InFlight,
69 resolution: &'stream Resolution,
70 ) -> impl Stream<Item = Result<CachedDist, Error>> + 'stream {
71 distributions
72 .into_iter()
73 .map(async |dist| {
74 let wheel = self
75 .get_wheel((*dist).clone(), in_flight, resolution)
76 .boxed_local()
77 .await?;
78 if let Some(reporter) = self.reporter.as_ref() {
79 reporter.on_progress(&wheel);
80 }
81 Ok::<CachedDist, Error>(wheel)
82 })
83 .collect::<FuturesUnordered<_>>()
84 }
85
86 #[instrument(skip_all, fields(total = distributions.len()))]
88 pub async fn prepare(
89 &self,
90 mut distributions: Vec<Arc<Dist>>,
91 in_flight: &InFlight,
92 resolution: &Resolution,
93 ) -> Result<Vec<CachedDist>, Error> {
94 distributions
96 .sort_unstable_by_key(|distribution| Reverse(distribution.size().unwrap_or(u64::MAX)));
97
98 let wheels = self
99 .prepare_stream(distributions, in_flight, resolution)
100 .try_collect()
101 .await?;
102
103 if let Some(reporter) = self.reporter.as_ref() {
104 reporter.on_complete();
105 }
106
107 Ok(wheels)
108 }
109 #[instrument(skip_all, fields(name = % dist, size = ? dist.size(), url = dist.file().map(| file | file.url.to_string()).unwrap_or_default()))]
111 pub async fn get_wheel(
112 &self,
113 dist: Dist,
114 in_flight: &InFlight,
115 resolution: &Resolution,
116 ) -> Result<CachedDist, Error> {
117 match dist {
119 Dist::Built(ref dist) => {
120 if self.build_options.no_binary_package(dist.name()) {
121 return Err(Error::NoBinary(dist.name().clone()));
122 }
123 }
124 Dist::Source(ref dist) => {
125 if self.build_options.no_build_package(dist.name()) {
126 if dist.is_editable() {
127 debug!("Allowing build for editable source distribution: {dist}");
128 } else {
129 return Err(Error::NoBuild(dist.name().clone()));
130 }
131 }
132 }
133 }
134
135 let id = dist.distribution_id();
136 if in_flight.downloads.register(id.clone()) {
137 let policy = self.hashes.get(&dist);
138
139 let result = self
140 .database
141 .get_or_build_wheel(&dist, self.tags, policy)
142 .boxed_local()
143 .map_err(|err| Error::from_dist(dist.clone(), err, resolution))
144 .await
145 .and_then(|wheel: LocalWheel| {
146 if wheel.satisfies(policy) {
147 Ok(wheel)
148 } else {
149 let err = uv_distribution::Error::hash_mismatch(
150 dist.to_string(),
151 policy.digests(),
152 wheel.hashes(),
153 );
154 Err(Error::from_dist(dist, err, resolution))
155 }
156 })
157 .map(CachedDist::from);
158 match result {
159 Ok(cached) => {
160 in_flight.downloads.done(id, Ok(cached.clone()));
161 Ok(cached)
162 }
163 Err(err) => {
164 in_flight.downloads.done(id, Err(err.to_string()));
165 Err(err)
166 }
167 }
168 } else {
169 let result = in_flight
170 .downloads
171 .wait(&id)
172 .await
173 .expect("missing value for registered task");
174
175 match result.as_ref() {
176 Ok(cached) => {
177 if *dist.name() != cached.filename().name {
188 let err = uv_distribution::Error::WheelMetadataNameMismatch {
189 given: dist.name().clone(),
190 metadata: cached.filename().name.clone(),
191 };
192 return Err(Error::from_dist(dist, err, resolution));
193 }
194 if let Some(version) = dist.version() {
195 if *version != cached.filename().version
196 && *version != cached.filename().version.clone().without_local()
197 {
198 let err = uv_distribution::Error::WheelMetadataVersionMismatch {
199 given: version.clone(),
200 metadata: cached.filename().version.clone(),
201 };
202 return Err(Error::from_dist(dist, err, resolution));
203 }
204 }
205 Ok(cached.clone())
206 }
207 Err(err) => Err(Error::Thread(err.to_owned())),
208 }
209 }
210 }
211}
212
213#[derive(thiserror::Error, Debug)]
214pub enum Error {
215 #[error("Building source distributions is disabled, but attempted to build `{0}`")]
216 NoBuild(PackageName),
217 #[error("Using pre-built wheels is disabled, but attempted to use `{0}`")]
218 NoBinary(PackageName),
219 #[error("{0} `{1}`")]
220 Dist(
221 DistErrorKind,
222 Box<Dist>,
223 DerivationChain,
224 #[source] uv_distribution::Error,
225 ),
226 #[error("Cyclic build dependency detected for `{0}`")]
227 CyclicBuildDependency(PackageName),
228 #[error("Unzip failed in another thread: {0}")]
229 Thread(String),
230}
231
232impl Error {
233 fn from_dist(dist: Dist, err: uv_distribution::Error, resolution: &Resolution) -> Self {
235 let chain =
236 DerivationChain::from_resolution(resolution, (&dist).into()).unwrap_or_default();
237 Self::Dist(
238 DistErrorKind::from_dist(&dist, &err),
239 Box::new(dist),
240 chain,
241 err,
242 )
243 }
244}
245
246pub trait Reporter: Send + Sync {
247 fn on_progress(&self, dist: &CachedDist);
250
251 fn on_complete(&self);
253
254 fn on_download_start(&self, name: &PackageName, size: Option<u64>) -> usize;
256
257 fn on_download_progress(&self, index: usize, bytes: u64);
260
261 fn on_download_complete(&self, name: &PackageName, index: usize);
263
264 fn on_build_start(&self, source: &BuildableSource) -> usize;
266
267 fn on_build_complete(&self, source: &BuildableSource, id: usize);
269
270 fn on_checkout_start(&self, url: &DisplaySafeUrl, rev: &str) -> usize;
272
273 fn on_checkout_complete(&self, url: &DisplaySafeUrl, rev: &str, index: usize);
275}
276
277impl dyn Reporter {
278 pub(crate) fn into_distribution_reporter(
280 self: Arc<dyn Reporter>,
281 ) -> Arc<dyn uv_distribution::Reporter> {
282 Arc::new(Facade {
283 reporter: self.clone(),
284 })
285 }
286}
287
288struct Facade {
290 reporter: Arc<dyn Reporter>,
291}
292
293impl uv_distribution::Reporter for Facade {
294 fn on_build_start(&self, source: &BuildableSource) -> usize {
295 self.reporter.on_build_start(source)
296 }
297
298 fn on_build_complete(&self, source: &BuildableSource, id: usize) {
299 self.reporter.on_build_complete(source, id);
300 }
301
302 fn on_checkout_start(&self, url: &DisplaySafeUrl, rev: &str) -> usize {
303 self.reporter.on_checkout_start(url, rev)
304 }
305
306 fn on_checkout_complete(&self, url: &DisplaySafeUrl, rev: &str, index: usize) {
307 self.reporter.on_checkout_complete(url, rev, index);
308 }
309
310 fn on_download_start(&self, name: &PackageName, size: Option<u64>) -> usize {
311 self.reporter.on_download_start(name, size)
312 }
313
314 fn on_download_progress(&self, index: usize, inc: u64) {
315 self.reporter.on_download_progress(index, inc);
316 }
317
318 fn on_download_complete(&self, name: &PackageName, index: usize) {
319 self.reporter.on_download_complete(name, index);
320 }
321}