Skip to main content

uv_installer/
preparer.rs

1use std::cmp::Reverse;
2use std::sync::Arc;
3
4use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, stream::FuturesUnordered};
5use tracing::{debug, instrument};
6
7use uv_cache::Cache;
8use uv_configuration::BuildOptions;
9use uv_distribution::{DistributionDatabase, LocalWheel};
10use uv_distribution_types::{
11    BuildableSource, CachedDist, DerivationChain, Dist, DistErrorKind, Hashed, Identifier, Name,
12    RemoteSource, Resolution,
13};
14use uv_normalize::PackageName;
15use uv_platform_tags::Tags;
16use uv_redacted::DisplaySafeUrl;
17use uv_types::{BuildContext, HashStrategy, InFlight};
18
19/// Prepare distributions for installation.
20///
21/// Downloads, builds, and unzips a set of distributions.
22pub struct Preparer<'a, Context: BuildContext> {
23    tags: &'a Tags,
24    cache: &'a Cache,
25    hashes: &'a HashStrategy,
26    build_options: &'a BuildOptions,
27    database: DistributionDatabase<'a, Context>,
28    reporter: Option<Arc<dyn Reporter>>,
29}
30
31impl<'a, Context: BuildContext> Preparer<'a, Context> {
32    pub fn new(
33        cache: &'a Cache,
34        tags: &'a Tags,
35        hashes: &'a HashStrategy,
36        build_options: &'a BuildOptions,
37        database: DistributionDatabase<'a, Context>,
38    ) -> Self {
39        Self {
40            tags,
41            cache,
42            hashes,
43            build_options,
44            database,
45            reporter: None,
46        }
47    }
48
49    /// Set the [`Reporter`] to use for operations.
50    #[must_use]
51    pub fn with_reporter(self, reporter: Arc<dyn Reporter>) -> Self {
52        Self {
53            tags: self.tags,
54            cache: self.cache,
55            hashes: self.hashes,
56            build_options: self.build_options,
57            database: self
58                .database
59                .with_reporter(reporter.clone().into_distribution_reporter()),
60            reporter: Some(reporter),
61        }
62    }
63
64    /// Fetch, build, and unzip the distributions in parallel.
65    pub fn prepare_stream<'stream>(
66        &'stream self,
67        distributions: Vec<Arc<Dist>>,
68        in_flight: &'stream InFlight,
69        resolution: &'stream Resolution,
70    ) -> impl Stream<Item = Result<CachedDist, Error>> + 'stream {
71        distributions
72            .into_iter()
73            .map(async |dist| {
74                let wheel = self
75                    .get_wheel((*dist).clone(), in_flight, resolution)
76                    .boxed_local()
77                    .await?;
78                if let Some(reporter) = self.reporter.as_ref() {
79                    reporter.on_progress(&wheel);
80                }
81                Ok::<CachedDist, Error>(wheel)
82            })
83            .collect::<FuturesUnordered<_>>()
84    }
85
86    /// Download, build, and unzip a set of distributions.
87    #[instrument(skip_all, fields(total = distributions.len()))]
88    pub async fn prepare(
89        &self,
90        mut distributions: Vec<Arc<Dist>>,
91        in_flight: &InFlight,
92        resolution: &Resolution,
93    ) -> Result<Vec<CachedDist>, Error> {
94        // Sort the distributions by size.
95        distributions
96            .sort_unstable_by_key(|distribution| Reverse(distribution.size().unwrap_or(u64::MAX)));
97
98        let wheels = self
99            .prepare_stream(distributions, in_flight, resolution)
100            .try_collect()
101            .await?;
102
103        if let Some(reporter) = self.reporter.as_ref() {
104            reporter.on_complete();
105        }
106
107        Ok(wheels)
108    }
109    /// Download, build, and unzip a single wheel.
110    #[instrument(skip_all, fields(name = % dist, size = ? dist.size(), url = dist.file().map(| file | file.url.to_string()).unwrap_or_default()))]
111    pub async fn get_wheel(
112        &self,
113        dist: Dist,
114        in_flight: &InFlight,
115        resolution: &Resolution,
116    ) -> Result<CachedDist, Error> {
117        // Validate that the distribution is compatible with the build options.
118        match dist {
119            Dist::Built(ref dist) => {
120                if self.build_options.no_binary_package(dist.name()) {
121                    return Err(Error::NoBinary(dist.name().clone()));
122                }
123            }
124            Dist::Source(ref dist) => {
125                if self.build_options.no_build_package(dist.name()) {
126                    if dist.is_editable() {
127                        debug!("Allowing build for editable source distribution: {dist}");
128                    } else {
129                        return Err(Error::NoBuild(dist.name().clone()));
130                    }
131                }
132            }
133        }
134
135        let id = dist.distribution_id();
136        if in_flight.downloads.register(id.clone()) {
137            let policy = self.hashes.get(&dist);
138
139            let result = self
140                .database
141                .get_or_build_wheel(&dist, self.tags, policy)
142                .boxed_local()
143                .map_err(|err| Error::from_dist(dist.clone(), err, resolution))
144                .await
145                .and_then(|wheel: LocalWheel| {
146                    if wheel.satisfies(policy) {
147                        Ok(wheel)
148                    } else {
149                        let err = uv_distribution::Error::hash_mismatch(
150                            dist.to_string(),
151                            policy.digests(),
152                            wheel.hashes(),
153                        );
154                        Err(Error::from_dist(dist, err, resolution))
155                    }
156                })
157                .map(CachedDist::from);
158            match result {
159                Ok(cached) => {
160                    in_flight.downloads.done(id, Ok(cached.clone()));
161                    Ok(cached)
162                }
163                Err(err) => {
164                    in_flight.downloads.done(id, Err(err.to_string()));
165                    Err(err)
166                }
167            }
168        } else {
169            let result = in_flight
170                .downloads
171                .wait(&id)
172                .await
173                .expect("missing value for registered task");
174
175            match result.as_ref() {
176                Ok(cached) => {
177                    // Validate that the wheel is compatible with the distribution.
178                    //
179                    // `get_or_build_wheel` is guaranteed to return a wheel that matches the
180                    // distribution. But there could be multiple requested distributions that share
181                    // a cache entry in `in_flight`, so we need to double-check here.
182                    //
183                    // For example, if two requirements are based on the same local path, but use
184                    // different names, then they'll share an `in_flight` entry, but one of the two
185                    // should be rejected (since at least one of the names will not match the
186                    // package name).
187                    if *dist.name() != cached.filename().name {
188                        let err = uv_distribution::Error::WheelMetadataNameMismatch {
189                            given: dist.name().clone(),
190                            metadata: cached.filename().name.clone(),
191                        };
192                        return Err(Error::from_dist(dist, err, resolution));
193                    }
194                    if let Some(version) = dist.version() {
195                        if *version != cached.filename().version
196                            && *version != cached.filename().version.clone().without_local()
197                        {
198                            let err = uv_distribution::Error::WheelMetadataVersionMismatch {
199                                given: version.clone(),
200                                metadata: cached.filename().version.clone(),
201                            };
202                            return Err(Error::from_dist(dist, err, resolution));
203                        }
204                    }
205                    Ok(cached.clone())
206                }
207                Err(err) => Err(Error::Thread(err.to_owned())),
208            }
209        }
210    }
211}
212
213#[derive(thiserror::Error, Debug)]
214pub enum Error {
215    #[error("Building source distributions is disabled, but attempted to build `{0}`")]
216    NoBuild(PackageName),
217    #[error("Using pre-built wheels is disabled, but attempted to use `{0}`")]
218    NoBinary(PackageName),
219    #[error("{0} `{1}`")]
220    Dist(
221        DistErrorKind,
222        Box<Dist>,
223        DerivationChain,
224        #[source] uv_distribution::Error,
225    ),
226    #[error("Cyclic build dependency detected for `{0}`")]
227    CyclicBuildDependency(PackageName),
228    #[error("Unzip failed in another thread: {0}")]
229    Thread(String),
230}
231
232impl Error {
233    /// Create an [`Error`] from a distribution error.
234    fn from_dist(dist: Dist, err: uv_distribution::Error, resolution: &Resolution) -> Self {
235        let chain =
236            DerivationChain::from_resolution(resolution, (&dist).into()).unwrap_or_default();
237        Self::Dist(
238            DistErrorKind::from_dist(&dist, &err),
239            Box::new(dist),
240            chain,
241            err,
242        )
243    }
244}
245
246pub trait Reporter: Send + Sync {
247    /// Callback to invoke when a wheel is unzipped. This implies that the wheel was downloaded and,
248    /// if necessary, built.
249    fn on_progress(&self, dist: &CachedDist);
250
251    /// Callback to invoke when the operation is complete.
252    fn on_complete(&self);
253
254    /// Callback to invoke when a download is kicked off.
255    fn on_download_start(&self, name: &PackageName, size: Option<u64>) -> usize;
256
257    /// Callback to invoke when a download makes progress (i.e. some number of bytes are
258    /// downloaded).
259    fn on_download_progress(&self, index: usize, bytes: u64);
260
261    /// Callback to invoke when a download is complete.
262    fn on_download_complete(&self, name: &PackageName, index: usize);
263
264    /// Callback to invoke when a source distribution build is kicked off.
265    fn on_build_start(&self, source: &BuildableSource) -> usize;
266
267    /// Callback to invoke when a source distribution build is complete.
268    fn on_build_complete(&self, source: &BuildableSource, id: usize);
269
270    /// Callback to invoke when a repository checkout begins.
271    fn on_checkout_start(&self, url: &DisplaySafeUrl, rev: &str) -> usize;
272
273    /// Callback to invoke when a repository checkout completes.
274    fn on_checkout_complete(&self, url: &DisplaySafeUrl, rev: &str, index: usize);
275}
276
277impl dyn Reporter {
278    /// Converts this reporter to a [`uv_distribution::Reporter`].
279    pub(crate) fn into_distribution_reporter(
280        self: Arc<dyn Reporter>,
281    ) -> Arc<dyn uv_distribution::Reporter> {
282        Arc::new(Facade {
283            reporter: self.clone(),
284        })
285    }
286}
287
288/// A facade for converting from [`Reporter`] to [`uv_distribution::Reporter`].
289struct Facade {
290    reporter: Arc<dyn Reporter>,
291}
292
293impl uv_distribution::Reporter for Facade {
294    fn on_build_start(&self, source: &BuildableSource) -> usize {
295        self.reporter.on_build_start(source)
296    }
297
298    fn on_build_complete(&self, source: &BuildableSource, id: usize) {
299        self.reporter.on_build_complete(source, id);
300    }
301
302    fn on_checkout_start(&self, url: &DisplaySafeUrl, rev: &str) -> usize {
303        self.reporter.on_checkout_start(url, rev)
304    }
305
306    fn on_checkout_complete(&self, url: &DisplaySafeUrl, rev: &str, index: usize) {
307        self.reporter.on_checkout_complete(url, rev, index);
308    }
309
310    fn on_download_start(&self, name: &PackageName, size: Option<u64>) -> usize {
311        self.reporter.on_download_start(name, size)
312    }
313
314    fn on_download_progress(&self, index: usize, inc: u64) {
315        self.reporter.on_download_progress(index, inc);
316    }
317
318    fn on_download_complete(&self, name: &PackageName, index: usize) {
319        self.reporter.on_download_complete(name, index);
320    }
321}