Skip to main content

uv_distribution/
distribution_database.rs

1use std::future::Future;
2use std::io;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use futures::{FutureExt, TryStreamExt};
9use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};
10use tokio::sync::Semaphore;
11use tokio_util::compat::FuturesAsyncReadCompatExt;
12use tracing::{Instrument, info_span, instrument, warn};
13use url::Url;
14
15use uv_cache::{ArchiveId, CacheBucket, CacheEntry, WheelCache};
16use uv_cache_info::{CacheInfo, Timestamp};
17use uv_client::{
18    CacheControl, CachedClientError, Connectivity, DataWithCachePolicy, RegistryClient,
19};
20use uv_distribution_filename::{SourceDistExtension, WheelFilename};
21use uv_distribution_types::{
22    BuildInfo, BuildableSource, BuiltDist, Dist, DistRef, File, HashPolicy, Hashed, IndexUrl,
23    InstalledDist, Name, SourceDist, ToUrlError,
24};
25use uv_extract::hash::Hasher;
26use uv_fs::write_atomic;
27use uv_git::{GIT_LFS, GitError};
28use uv_install_wheel::validate_and_heal_record;
29use uv_platform_tags::Tags;
30use uv_pypi_types::{HashDigest, HashDigests, PyProjectToml};
31use uv_redacted::DisplaySafeUrl;
32use uv_types::{BuildContext, BuildStack};
33use uv_warnings::warn_user_once;
34
35use crate::archive::Archive;
36use uv_python::PythonVariant;
37
38use crate::error::PythonVersion;
39use crate::metadata::{ArchiveMetadata, Metadata};
40use crate::source::SourceDistributionBuilder;
41use crate::{Error, LocalWheel, Reporter, RequiresDist};
42
/// A cached high-level interface to convert distributions (a requirement resolved to a location)
/// to a wheel or wheel metadata.
///
/// For wheel metadata, this happens by either fetching the metadata from the remote wheel or by
/// building the source distribution. For wheel files, either the wheel is downloaded, or a source
/// distribution is downloaded and built, and the resulting wheel is returned.
///
/// All kinds of wheel sources (index, URL, Git, path) and source distribution sources (index,
/// URL, path, Git) are supported.
///
/// This struct is also responsible for acquiring locks around source distribution builds in
/// general, and Git operations in particular, as well as for respecting concurrency limits.
pub struct DistributionDatabase<'a, Context: BuildContext> {
    /// The build context; used here to reach the cache, the Git resolver, and any user-provided
    /// dependency metadata.
    build_context: &'a Context,
    /// Downloads and builds source distributions.
    builder: SourceDistributionBuilder<'a, Context>,
    /// The registry client, wrapped together with the downloads semaphore passed to
    /// [`DistributionDatabase::new`].
    client: ManagedClient<'a>,
    /// Optional reporter, notified when downloads start and forwarded to Git fetches.
    reporter: Option<Arc<dyn Reporter>>,
}
61
62impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
63    pub fn new(
64        client: &'a RegistryClient,
65        build_context: &'a Context,
66        downloads_semaphore: Arc<Semaphore>,
67    ) -> Self {
68        Self {
69            build_context,
70            builder: SourceDistributionBuilder::new(build_context),
71            client: ManagedClient::new(client, downloads_semaphore),
72            reporter: None,
73        }
74    }
75
76    /// Set the build stack to use for the [`DistributionDatabase`].
77    #[must_use]
78    pub fn with_build_stack(self, build_stack: &'a BuildStack) -> Self {
79        Self {
80            builder: self.builder.with_build_stack(build_stack),
81            ..self
82        }
83    }
84
85    /// Set the [`Reporter`] to use for the [`DistributionDatabase`].
86    #[must_use]
87    pub fn with_reporter(self, reporter: Arc<dyn Reporter>) -> Self {
88        Self {
89            builder: self.builder.with_reporter(reporter.clone()),
90            reporter: Some(reporter),
91            ..self
92        }
93    }
94
95    /// Handle a specific `reqwest` error, and convert it to [`io::Error`].
96    fn handle_response_errors(&self, err: reqwest::Error) -> io::Error {
97        if err.is_timeout() {
98            // Assumption: The connect timeout with the 10s default is not the culprit.
99            io::Error::new(
100                io::ErrorKind::TimedOut,
101                format!(
102                    "Failed to download distribution due to network timeout. Try increasing UV_HTTP_TIMEOUT (current value: {}s).",
103                    self.client.unmanaged.read_timeout().as_secs()
104                ),
105            )
106        } else {
107            io::Error::other(err)
108        }
109    }
110
111    /// Either fetch the wheel or fetch and build the source distribution
112    ///
113    /// Returns a wheel that's compliant with the given platform tags.
114    ///
115    /// While hashes will be generated in some cases, hash-checking is only enforced for source
116    /// distributions, and should be enforced by the caller for wheels.
117    #[instrument(skip_all, fields(%dist))]
118    pub async fn get_or_build_wheel(
119        &self,
120        dist: &Dist,
121        tags: &Tags,
122        hashes: HashPolicy<'_>,
123    ) -> Result<LocalWheel, Error> {
124        match dist {
125            Dist::Built(built) => self.get_wheel(built, hashes).await,
126            Dist::Source(source) => self.build_wheel(source, tags, hashes).await,
127        }
128    }
129
130    /// Either fetch the only wheel metadata (directly from the index or with range requests) or
131    /// fetch and build the source distribution.
132    ///
133    /// While hashes will be generated in some cases, hash-checking is only enforced for source
134    /// distributions, and should be enforced by the caller for wheels.
135    #[instrument(skip_all, fields(%dist))]
136    pub async fn get_installed_metadata(
137        &self,
138        dist: &InstalledDist,
139    ) -> Result<ArchiveMetadata, Error> {
140        // If the metadata was provided by the user directly, prefer it.
141        if let Some(metadata) = self
142            .build_context
143            .dependency_metadata()
144            .get(dist.name(), Some(dist.version()))
145        {
146            return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
147        }
148
149        let metadata = dist
150            .read_metadata()
151            .map_err(|err| Error::ReadInstalled(Box::new(dist.clone()), err))?;
152
153        Ok(ArchiveMetadata::from_metadata23(metadata.clone()))
154    }
155
156    /// Either fetch the only wheel metadata (directly from the index or with range requests) or
157    /// fetch and build the source distribution.
158    ///
159    /// While hashes will be generated in some cases, hash-checking is only enforced for source
160    /// distributions, and should be enforced by the caller for wheels.
161    #[instrument(skip_all, fields(%dist))]
162    pub async fn get_or_build_wheel_metadata(
163        &self,
164        dist: &Dist,
165        hashes: HashPolicy<'_>,
166    ) -> Result<ArchiveMetadata, Error> {
167        match dist {
168            Dist::Built(built) => self.get_wheel_metadata(built, hashes).await,
169            Dist::Source(source) => {
170                self.build_wheel_metadata(&BuildableSource::Dist(source), hashes)
171                    .await
172            }
173        }
174    }
175
176    /// Fetch a wheel from the cache or download it from the index.
177    ///
178    /// While hashes will be generated in all cases, hash-checking is _not_ enforced and should
179    /// instead be enforced by the caller.
180    async fn get_wheel(
181        &self,
182        dist: &BuiltDist,
183        hashes: HashPolicy<'_>,
184    ) -> Result<LocalWheel, Error> {
185        match dist {
186            BuiltDist::Registry(wheels) => {
187                let wheel = wheels.best_wheel();
188                let WheelTarget {
189                    url,
190                    extension,
191                    size,
192                } = WheelTarget::try_from(&*wheel.file)?;
193
194                // Create a cache entry for the wheel.
195                let wheel_entry = self.build_context.cache().entry(
196                    CacheBucket::Wheels,
197                    WheelCache::Index(&wheel.index).wheel_dir(wheel.name().as_ref()),
198                    wheel.filename.cache_key(),
199                );
200
201                // If the URL is a file URL, load the wheel directly.
202                if url.scheme() == "file" {
203                    let path = url
204                        .to_file_path()
205                        .map_err(|()| Error::NonFileUrl(url.clone()))?;
206                    return self
207                        .load_wheel(
208                            &path,
209                            &wheel.filename,
210                            WheelExtension::Whl,
211                            wheel_entry,
212                            dist,
213                            hashes,
214                        )
215                        .await;
216                }
217
218                // Download and unzip.
219                match self
220                    .stream_wheel(
221                        url.clone(),
222                        dist.index(),
223                        &wheel.filename,
224                        extension,
225                        size,
226                        &wheel_entry,
227                        dist,
228                        hashes,
229                    )
230                    .await
231                {
232                    Ok(archive) => Ok(LocalWheel {
233                        dist: Dist::Built(dist.clone()),
234                        archive: self
235                            .build_context
236                            .cache()
237                            .archive(&archive.id)
238                            .into_boxed_path(),
239                        hashes: archive.hashes,
240                        filename: wheel.filename.clone(),
241                        cache: CacheInfo::default(),
242                        build: None,
243                    }),
244                    Err(Error::Extract(name, err)) => {
245                        if err.is_http_streaming_unsupported() {
246                            warn!(
247                                "Streaming unsupported for {dist}; downloading wheel to disk ({err})"
248                            );
249                        } else if err.is_http_streaming_failed() {
250                            warn!("Streaming failed for {dist}; downloading wheel to disk ({err})");
251                        } else {
252                            return Err(Error::Extract(name, err));
253                        }
254
255                        // If the request failed because streaming was unsupported or failed,
256                        // download the wheel directly.
257                        let archive = self
258                            .download_wheel(
259                                url,
260                                dist.index(),
261                                &wheel.filename,
262                                extension,
263                                size,
264                                &wheel_entry,
265                                dist,
266                                hashes,
267                            )
268                            .await?;
269
270                        Ok(LocalWheel {
271                            dist: Dist::Built(dist.clone()),
272                            archive: self
273                                .build_context
274                                .cache()
275                                .archive(&archive.id)
276                                .into_boxed_path(),
277                            hashes: archive.hashes,
278                            filename: wheel.filename.clone(),
279                            cache: CacheInfo::default(),
280                            build: None,
281                        })
282                    }
283                    Err(err) => Err(err),
284                }
285            }
286
287            BuiltDist::DirectUrl(wheel) => {
288                // Create a cache entry for the wheel.
289                let wheel_entry = self.build_context.cache().entry(
290                    CacheBucket::Wheels,
291                    WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
292                    wheel.filename.cache_key(),
293                );
294
295                // Download and unzip.
296                match self
297                    .stream_wheel(
298                        wheel.url.raw().clone(),
299                        None,
300                        &wheel.filename,
301                        WheelExtension::Whl,
302                        None,
303                        &wheel_entry,
304                        dist,
305                        hashes,
306                    )
307                    .await
308                {
309                    Ok(archive) => Ok(LocalWheel {
310                        dist: Dist::Built(dist.clone()),
311                        archive: self
312                            .build_context
313                            .cache()
314                            .archive(&archive.id)
315                            .into_boxed_path(),
316                        hashes: archive.hashes,
317                        filename: wheel.filename.clone(),
318                        cache: CacheInfo::default(),
319                        build: None,
320                    }),
321                    Err(Error::Extract(name, err)) => {
322                        if err.is_http_streaming_unsupported() {
323                            warn!(
324                                "Streaming unsupported for {dist}; downloading wheel to disk ({err})"
325                            );
326                        } else if err.is_http_streaming_failed() {
327                            warn!("Streaming failed for {dist}; downloading wheel to disk ({err})");
328                        } else {
329                            return Err(Error::Extract(name, err));
330                        }
331
332                        // If the request failed because streaming was unsupported or failed,
333                        // download the wheel directly.
334                        let archive = self
335                            .download_wheel(
336                                wheel.url.raw().clone(),
337                                None,
338                                &wheel.filename,
339                                WheelExtension::Whl,
340                                None,
341                                &wheel_entry,
342                                dist,
343                                hashes,
344                            )
345                            .await?;
346                        Ok(LocalWheel {
347                            dist: Dist::Built(dist.clone()),
348                            archive: self
349                                .build_context
350                                .cache()
351                                .archive(&archive.id)
352                                .into_boxed_path(),
353                            hashes: archive.hashes,
354                            filename: wheel.filename.clone(),
355                            cache: CacheInfo::default(),
356                            build: None,
357                        })
358                    }
359                    Err(err) => Err(err),
360                }
361            }
362
363            BuiltDist::GitPath(wheel) => {
364                // Fetch the Git repository.
365                let fetch = self
366                    .build_context
367                    .git()
368                    .fetch(
369                        &wheel.git,
370                        self.client.unmanaged.git_http_settings(wheel.git.url()),
371                        self.build_context.cache().bucket(CacheBucket::Git),
372                        self.reporter.clone().map(<dyn Reporter>::into_git_reporter),
373                    )
374                    .await?;
375
376                if wheel.git.lfs().enabled() && !fetch.lfs_ready() {
377                    if GIT_LFS.is_err() {
378                        return Err(Error::MissingWheelGitLfsArtifacts(
379                            wheel.url.to_url(),
380                            GitError::GitLfsNotFound,
381                        ));
382                    }
383                    return Err(Error::MissingWheelGitLfsArtifacts(
384                        wheel.url.to_url(),
385                        GitError::GitLfsNotConfigured,
386                    ));
387                }
388
389                let git_sha = fetch.git().precise().expect("Exact commit after checkout");
390                let cache_entry = self.build_context.cache().entry(
391                    CacheBucket::Wheels,
392                    WheelCache::Git(&wheel.url, git_sha.as_short_str()).root(),
393                    wheel.filename.stem(),
394                );
395
396                let install_path = fetch.path().join(&wheel.install_path);
397
398                self.load_wheel(
399                    &install_path,
400                    &wheel.filename,
401                    WheelExtension::Whl,
402                    cache_entry,
403                    dist,
404                    hashes,
405                )
406                .await
407            }
408
409            BuiltDist::Path(wheel) => {
410                let cache_entry = self.build_context.cache().entry(
411                    CacheBucket::Wheels,
412                    WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
413                    wheel.filename.cache_key(),
414                );
415
416                self.load_wheel(
417                    &wheel.install_path,
418                    &wheel.filename,
419                    WheelExtension::Whl,
420                    cache_entry,
421                    dist,
422                    hashes,
423                )
424                .await
425            }
426        }
427    }
428
429    /// Convert a source distribution into a wheel, fetching it from the cache or building it if
430    /// necessary.
431    ///
432    /// The returned wheel is guaranteed to come from a distribution with a matching hash, and
433    /// no build processes will be executed for distributions with mismatched hashes.
434    async fn build_wheel(
435        &self,
436        dist: &SourceDist,
437        tags: &Tags,
438        hashes: HashPolicy<'_>,
439    ) -> Result<LocalWheel, Error> {
440        // Warn if the source distribution isn't PEP 625 compliant.
441        // We do this here instead of in `SourceDistExtension::from_path` to minimize log volume:
442        // a non-compliant distribution isn't a huge problem if it's not actually being
443        // materialized into a wheel. Observe that we also allow no extension, since we expect that
444        // for directory and Git installs.
445        // NOTE: Observe that we also allow `.zip` sdists here, which are not PEP 625 compliant.
446        // This is because they were allowed on PyPI until relatively recently (2020).
447        if let Some(extension) = dist.extension()
448            && !matches!(
449                extension,
450                SourceDistExtension::TarGz | SourceDistExtension::Zip
451            )
452        {
453            if matches!(dist, SourceDist::Registry(_)) {
454                // Observe that we display a slightly different warning when the sdist comes
455                // from a registry, since that suggests that the user has inadvertently
456                // (rather than explicitly) depended on a non-compliant sdist.
457                warn_user_once!(
458                    "{dist} uses a legacy source distribution format ('.{extension}') that is not compliant with PEP 625. A future version of uv will reject this source distribution. Consider upgrading to a newer version of {package}",
459                    package = dist.name(),
460                );
461            } else {
462                warn_user_once!(
463                    "{dist} is not a standards-compliant source distribution: expected '.tar.gz' but found '.{extension}'. A future version of uv will reject source distributions that do not meet the requirements specified in PEP 625",
464                );
465            }
466        }
467
468        let built_wheel = self
469            .builder
470            .download_and_build(&BuildableSource::Dist(dist), tags, hashes, &self.client)
471            .boxed_local()
472            .await?;
473
474        // Check that the wheel is compatible with its install target.
475        //
476        // When building a build dependency for a cross-install, the build dependency needs
477        // to install and run on the host instead of the target. In this case the `tags` are already
478        // for the host instead of the target, so this check passes.
479        if !built_wheel.filename.is_compatible(tags) {
480            return if tags.is_cross() {
481                Err(Error::BuiltWheelIncompatibleTargetPlatform {
482                    filename: built_wheel.filename,
483                    python_platform: tags.python_platform().clone(),
484                    python_version: PythonVersion {
485                        version: tags.python_version(),
486                        variant: if tags.is_freethreaded() {
487                            PythonVariant::Freethreaded
488                        } else {
489                            PythonVariant::Default
490                        },
491                    },
492                })
493            } else {
494                Err(Error::BuiltWheelIncompatibleHostPlatform {
495                    filename: built_wheel.filename,
496                    python_platform: tags.python_platform().clone(),
497                    python_version: PythonVersion {
498                        version: tags.python_version(),
499                        variant: if tags.is_freethreaded() {
500                            PythonVariant::Freethreaded
501                        } else {
502                            PythonVariant::Default
503                        },
504                    },
505                })
506            };
507        }
508
509        // Acquire the advisory lock.
510        #[cfg(windows)]
511        let _lock = {
512            let lock_entry = CacheEntry::new(
513                built_wheel.target.parent().unwrap(),
514                format!(
515                    "{}.lock",
516                    built_wheel.target.file_name().unwrap().to_str().unwrap()
517                ),
518            );
519            lock_entry.lock().await.map_err(Error::CacheLock)?
520        };
521
522        // If the wheel was unzipped previously, respect it. Source distributions are
523        // cached under a unique revision ID, so unzipped directories are never stale.
524        match self.build_context.cache().resolve_link(&built_wheel.target) {
525            Ok(archive) => {
526                return Ok(LocalWheel {
527                    dist: Dist::Source(dist.clone()),
528                    archive: archive.into_boxed_path(),
529                    filename: built_wheel.filename,
530                    hashes: built_wheel.hashes,
531                    cache: built_wheel.cache_info,
532                    build: Some(built_wheel.build_info),
533                });
534            }
535            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
536            Err(err) => return Err(Error::CacheRead(err)),
537        }
538
539        // Otherwise, unzip the wheel.
540        let id = self
541            .unzip_wheel(
542                &built_wheel.path,
543                &built_wheel.target,
544                DistRef::Source(dist),
545            )
546            .await?;
547
548        Ok(LocalWheel {
549            dist: Dist::Source(dist.clone()),
550            archive: self.build_context.cache().archive(&id).into_boxed_path(),
551            hashes: built_wheel.hashes,
552            filename: built_wheel.filename,
553            cache: built_wheel.cache_info,
554            build: Some(built_wheel.build_info),
555        })
556    }
557
558    /// Fetch the wheel metadata from the index, or from the cache if possible.
559    ///
560    /// While hashes will be generated in some cases, hash-checking is _not_ enforced and should
561    /// instead be enforced by the caller.
562    async fn get_wheel_metadata(
563        &self,
564        dist: &BuiltDist,
565        hashes: HashPolicy<'_>,
566    ) -> Result<ArchiveMetadata, Error> {
567        // If hash generation is enabled, and the distribution isn't hosted on a registry, get the
568        // entire wheel to ensure that the hashes are included in the response. If the distribution
569        // is hosted on an index, the hashes will be included in the simple metadata response.
570        // For hash _validation_, callers are expected to enforce the policy when retrieving the
571        // wheel.
572        //
573        // Historically, for `uv pip compile --universal`, we also generate hashes for
574        // registry-based distributions when the relevant registry doesn't provide them. This was
575        // motivated by `--find-links`. We continue that behavior (under `HashGeneration::All`) for
576        // backwards compatibility, but it's a little dubious, since we're only hashing _one_
577        // distribution here (as opposed to hashing all distributions for the version), and it may
578        // not even be a compatible distribution!
579        //
580        // TODO(charlie): Request the hashes via a separate method, to reduce the coupling in this API.
581        if hashes.is_generate(dist) {
582            let wheel = self.get_wheel(dist, hashes).await?;
583            // If the metadata was provided by the user directly, prefer it.
584            let metadata = if let Some(metadata) = self
585                .build_context
586                .dependency_metadata()
587                .get(dist.name(), Some(dist.version()))
588            {
589                metadata.clone()
590            } else {
591                wheel.metadata()?
592            };
593            let hashes = wheel.hashes;
594            return Ok(ArchiveMetadata {
595                metadata: Metadata::from_metadata23(metadata),
596                hashes,
597            });
598        }
599
600        // If the metadata was provided by the user directly, prefer it.
601        if let Some(metadata) = self
602            .build_context
603            .dependency_metadata()
604            .get(dist.name(), Some(dist.version()))
605        {
606            return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
607        }
608
609        let result = self
610            .client
611            .managed(|client| {
612                client
613                    .wheel_metadata(
614                        dist,
615                        self.build_context.git(),
616                        self.build_context.capabilities(),
617                        self.reporter.clone().map(<dyn Reporter>::into_git_reporter),
618                    )
619                    .boxed_local()
620            })
621            .await;
622
623        match result {
624            Ok(metadata) => {
625                // Validate that the metadata is consistent with the distribution.
626                Ok(ArchiveMetadata::from_metadata23(metadata))
627            }
628            Err(err) if err.is_http_streaming_unsupported() => {
629                warn!(
630                    "Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})"
631                );
632
633                // If the request failed due to an error that could be resolved by
634                // downloading the wheel directly, try that.
635                let wheel = self.get_wheel(dist, hashes).await?;
636                let metadata = wheel.metadata()?;
637                let hashes = wheel.hashes;
638                Ok(ArchiveMetadata {
639                    metadata: Metadata::from_metadata23(metadata),
640                    hashes,
641                })
642            }
643            Err(err) => Err(err.into()),
644        }
645    }
646
647    /// Build the wheel metadata for a source distribution, or fetch it from the cache if possible.
648    ///
649    /// The returned metadata is guaranteed to come from a distribution with a matching hash, and
650    /// no build processes will be executed for distributions with mismatched hashes.
651    pub async fn build_wheel_metadata(
652        &self,
653        source: &BuildableSource<'_>,
654        hashes: HashPolicy<'_>,
655    ) -> Result<ArchiveMetadata, Error> {
656        // If the metadata was provided by the user directly, prefer it.
657        if let Some(dist) = source.as_dist() {
658            if let Some(metadata) = self
659                .build_context
660                .dependency_metadata()
661                .get(dist.name(), dist.version())
662            {
663                // If we skipped the build, we should still resolve any Git dependencies to precise
664                // commits.
665                self.builder.resolve_revision(source, &self.client).await?;
666
667                return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
668            }
669        }
670
671        let metadata = self
672            .builder
673            .download_and_build_metadata(source, hashes, &self.client)
674            .boxed_local()
675            .await?;
676
677        Ok(metadata)
678    }
679
680    /// Return the [`RequiresDist`] from a `pyproject.toml`, if it can be statically extracted.
681    pub async fn requires_dist(
682        &self,
683        path: &Path,
684        pyproject_toml: &PyProjectToml,
685    ) -> Result<Option<RequiresDist>, Error> {
686        self.builder
687            .source_tree_requires_dist(
688                path,
689                pyproject_toml,
690                self.client.unmanaged.credentials_cache(),
691            )
692            .await
693    }
694
695    /// Stream a wheel from a URL, unzipping it into the cache as it's downloaded.
696    async fn stream_wheel(
697        &self,
698        url: DisplaySafeUrl,
699        index: Option<&IndexUrl>,
700        filename: &WheelFilename,
701        extension: WheelExtension,
702        size: Option<u64>,
703        wheel_entry: &CacheEntry,
704        dist: &BuiltDist,
705        hashes: HashPolicy<'_>,
706    ) -> Result<Archive, Error> {
707        // Acquire an advisory lock, to guard against concurrent writes.
708        #[cfg(windows)]
709        let _lock = {
710            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
711            lock_entry.lock().await.map_err(Error::CacheLock)?
712        };
713
714        // Create an entry for the HTTP cache.
715        let http_entry = wheel_entry.with_file(format!("{}.http", filename.cache_key()));
716
717        let query_url = &url.clone();
718
719        let download = |response: reqwest::Response| {
720            async {
721                let size = size.or_else(|| content_length(&response));
722
723                let progress = self
724                    .reporter
725                    .as_ref()
726                    .map(|reporter| (reporter, reporter.on_download_start(dist.name(), size)));
727
728                let reader = response
729                    .bytes_stream()
730                    .map_err(|err| self.handle_response_errors(err))
731                    .into_async_read();
732
733                // Create a hasher for each hash algorithm.
734                let algorithms = hashes.algorithms();
735                let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
736                let mut hasher = uv_extract::hash::HashReader::new(reader.compat(), &mut hashers);
737
738                // Download and unzip the wheel to a temporary directory.
739                let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
740                    .map_err(Error::CacheWrite)?;
741
742                let files = match progress {
743                    Some((reporter, progress)) => {
744                        let mut reader = ProgressReader::new(&mut hasher, progress, &**reporter);
745                        match extension {
746                            WheelExtension::Whl => {
747                                uv_extract::stream::unzip(query_url, &mut reader, temp_dir.path())
748                                    .await
749                                    .map_err(|err| Error::Extract(filename.to_string(), err))?
750                            }
751                            WheelExtension::WhlZst => {
752                                uv_extract::stream::untar_zst(&mut reader, temp_dir.path())
753                                    .await
754                                    .map_err(|err| Error::Extract(filename.to_string(), err))?
755                            }
756                        }
757                    }
758                    None => match extension {
759                        WheelExtension::Whl => {
760                            uv_extract::stream::unzip(query_url, &mut hasher, temp_dir.path())
761                                .await
762                                .map_err(|err| Error::Extract(filename.to_string(), err))?
763                        }
764                        WheelExtension::WhlZst => {
765                            uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
766                                .await
767                                .map_err(|err| Error::Extract(filename.to_string(), err))?
768                        }
769                    },
770                };
771                // If necessary, exhaust the reader to compute the hash.
772                if !hashes.is_none() {
773                    hasher.finish().await.map_err(Error::HashExhaustion)?;
774                }
775
776                // Before we make the wheel accessible by persisting it, ensure that the RECORD is
777                // valid.
778                validate_and_heal_record(temp_dir.path(), files.iter(), dist)
779                    .map_err(Error::InstallWheelError)?;
780
781                // Persist the temporary directory to the directory store.
782                let id = self
783                    .build_context
784                    .cache()
785                    .persist(temp_dir.keep(), wheel_entry.path())
786                    .await
787                    .map_err(Error::CacheRead)?;
788
789                if let Some((reporter, progress)) = progress {
790                    reporter.on_download_complete(dist.name(), progress);
791                }
792
793                Ok(Archive::new(
794                    id,
795                    hashers.into_iter().map(HashDigest::from).collect(),
796                    filename.clone(),
797                ))
798            }
799            .instrument(info_span!("wheel", wheel = %dist))
800        };
801
802        // Fetch the archive from the cache, or download it if necessary.
803        let req = self.request(url.clone())?;
804
805        // Determine the cache control policy for the URL.
806        let cache_control = match self.client.unmanaged.connectivity() {
807            Connectivity::Online => {
808                if let Some(header) = index.and_then(|index| {
809                    self.build_context
810                        .locations()
811                        .artifact_cache_control_for(index)
812                }) {
813                    CacheControl::Override(header)
814                } else {
815                    CacheControl::from(
816                        self.build_context
817                            .cache()
818                            .freshness(&http_entry, Some(&filename.name), None)
819                            .map_err(Error::CacheRead)?,
820                    )
821                }
822            }
823            Connectivity::Offline => CacheControl::AllowStale,
824        };
825
826        let archive = self
827            .client
828            .managed(|client| {
829                client.cached_client().get_serde_with_retry(
830                    req,
831                    &http_entry,
832                    cache_control.clone(),
833                    download,
834                )
835            })
836            .await
837            .map_err(|err| match err {
838                CachedClientError::Callback { err, .. } => err,
839                CachedClientError::Client(err) => Error::Client(err),
840            })?;
841
842        // If the archive is missing the required hashes, or has since been removed, force a refresh.
843        let archive = Some(archive)
844            .filter(|archive| archive.has_digests(hashes))
845            .filter(|archive| archive.exists(self.build_context.cache()));
846
847        let archive = if let Some(archive) = archive {
848            archive
849        } else {
850            self.client
851                .managed(async |client| {
852                    client
853                        .cached_client()
854                        .skip_cache_with_retry(
855                            self.request(url)?,
856                            &http_entry,
857                            cache_control,
858                            download,
859                        )
860                        .await
861                        .map_err(|err| match err {
862                            CachedClientError::Callback { err, .. } => err,
863                            CachedClientError::Client(err) => Error::Client(err),
864                        })
865                })
866                .await?
867        };
868
869        Ok(archive)
870    }
871
872    /// Download a wheel from a URL, then unzip it into the cache.
873    async fn download_wheel(
874        &self,
875        url: DisplaySafeUrl,
876        index: Option<&IndexUrl>,
877        filename: &WheelFilename,
878        extension: WheelExtension,
879        size: Option<u64>,
880        wheel_entry: &CacheEntry,
881        dist: &BuiltDist,
882        hashes: HashPolicy<'_>,
883    ) -> Result<Archive, Error> {
884        // Acquire an advisory lock, to guard against concurrent writes.
885        #[cfg(windows)]
886        let _lock = {
887            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
888            lock_entry.lock().await.map_err(Error::CacheLock)?
889        };
890
891        // Create an entry for the HTTP cache.
892        let http_entry = wheel_entry.with_file(format!("{}.http", filename.cache_key()));
893
894        let query_url = &url.clone();
895
896        let download = |response: reqwest::Response| {
897            async {
898                let size = size.or_else(|| content_length(&response));
899
900                let progress = self
901                    .reporter
902                    .as_ref()
903                    .map(|reporter| (reporter, reporter.on_download_start(dist.name(), size)));
904
905                let reader = response
906                    .bytes_stream()
907                    .map_err(|err| self.handle_response_errors(err))
908                    .into_async_read();
909
910                // Download the wheel to a temporary file.
911                let temp_file = tempfile::tempfile_in(self.build_context.cache().root())
912                    .map_err(Error::CacheWrite)?;
913                let mut writer = tokio::io::BufWriter::new(fs_err::tokio::File::from_std(
914                    // It's an unnamed file on Linux so that's the best approximation.
915                    fs_err::File::from_parts(temp_file, self.build_context.cache().root()),
916                ));
917
918                match progress {
919                    Some((reporter, progress)) => {
920                        // Wrap the reader in a progress reporter. This will report 100% progress
921                        // after the download is complete, even if we still have to unzip and hash
922                        // part of the file.
923                        let mut reader =
924                            ProgressReader::new(reader.compat(), progress, &**reporter);
925
926                        tokio::io::copy(&mut reader, &mut writer)
927                            .await
928                            .map_err(Error::CacheWrite)?;
929                    }
930                    None => {
931                        tokio::io::copy(&mut reader.compat(), &mut writer)
932                            .await
933                            .map_err(Error::CacheWrite)?;
934                    }
935                }
936
937                // Unzip the wheel to a temporary directory.
938                let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
939                    .map_err(Error::CacheWrite)?;
940                let mut file = writer.into_inner();
941                file.seek(io::SeekFrom::Start(0))
942                    .await
943                    .map_err(Error::CacheWrite)?;
944
945                // If no hashes are required, extract the wheel without hashing.
946                let (files, hashes) = if hashes.is_none() {
947                    let target = temp_dir.path().to_owned();
948                    let files = match extension {
949                        WheelExtension::Whl => {
950                            let file = file.into_std().await;
951                            tokio::task::spawn_blocking(move || uv_extract::unzip(file, &target))
952                                .await?
953                        }
954                        WheelExtension::WhlZst => {
955                            uv_extract::stream::untar_zst(file, &target).await
956                        }
957                    }
958                    .map_err(|err| Error::Extract(filename.to_string(), err))?;
959
960                    (files, HashDigests::empty())
961                } else {
962                    // Create a hasher for each hash algorithm.
963                    let algorithms = hashes.algorithms();
964                    let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
965                    let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);
966
967                    let files = match extension {
968                        WheelExtension::Whl => {
969                            uv_extract::stream::unzip(query_url, &mut hasher, temp_dir.path())
970                                .await
971                                .map_err(|err| Error::Extract(filename.to_string(), err))?
972                        }
973                        WheelExtension::WhlZst => {
974                            uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
975                                .await
976                                .map_err(|err| Error::Extract(filename.to_string(), err))?
977                        }
978                    };
979
980                    // If necessary, exhaust the reader to compute the hash.
981                    hasher.finish().await.map_err(Error::HashExhaustion)?;
982                    let hashes = hashers.into_iter().map(HashDigest::from).collect();
983
984                    (files, hashes)
985                };
986
987                // Before we make the wheel accessible by persisting it, ensure that the RECORD is
988                // valid.
989                validate_and_heal_record(temp_dir.path(), files.iter(), dist)
990                    .map_err(Error::InstallWheelError)?;
991
992                // Persist the temporary directory to the directory store.
993                let id = self
994                    .build_context
995                    .cache()
996                    .persist(temp_dir.keep(), wheel_entry.path())
997                    .await
998                    .map_err(Error::CacheRead)?;
999
1000                if let Some((reporter, progress)) = progress {
1001                    reporter.on_download_complete(dist.name(), progress);
1002                }
1003
1004                Ok(Archive::new(id, hashes, filename.clone()))
1005            }
1006            .instrument(info_span!("wheel", wheel = %dist))
1007        };
1008
1009        // Fetch the archive from the cache, or download it if necessary.
1010        let req = self.request(url.clone())?;
1011
1012        // Determine the cache control policy for the URL.
1013        let cache_control = match self.client.unmanaged.connectivity() {
1014            Connectivity::Online => {
1015                if let Some(header) = index.and_then(|index| {
1016                    self.build_context
1017                        .locations()
1018                        .artifact_cache_control_for(index)
1019                }) {
1020                    CacheControl::Override(header)
1021                } else {
1022                    CacheControl::from(
1023                        self.build_context
1024                            .cache()
1025                            .freshness(&http_entry, Some(&filename.name), None)
1026                            .map_err(Error::CacheRead)?,
1027                    )
1028                }
1029            }
1030            Connectivity::Offline => CacheControl::AllowStale,
1031        };
1032
1033        let archive = self
1034            .client
1035            .managed(|client| {
1036                client.cached_client().get_serde_with_retry(
1037                    req,
1038                    &http_entry,
1039                    cache_control.clone(),
1040                    download,
1041                )
1042            })
1043            .await
1044            .map_err(|err| match err {
1045                CachedClientError::Callback { err, .. } => err,
1046                CachedClientError::Client(err) => Error::Client(err),
1047            })?;
1048
1049        // If the archive is missing the required hashes, or has since been removed, force a refresh.
1050        let archive = Some(archive)
1051            .filter(|archive| archive.has_digests(hashes))
1052            .filter(|archive| archive.exists(self.build_context.cache()));
1053
1054        let archive = if let Some(archive) = archive {
1055            archive
1056        } else {
1057            self.client
1058                .managed(async |client| {
1059                    client
1060                        .cached_client()
1061                        .skip_cache_with_retry(
1062                            self.request(url)?,
1063                            &http_entry,
1064                            cache_control,
1065                            download,
1066                        )
1067                        .await
1068                        .map_err(|err| match err {
1069                            CachedClientError::Callback { err, .. } => err,
1070                            CachedClientError::Client(err) => Error::Client(err),
1071                        })
1072                })
1073                .await?
1074        };
1075
1076        Ok(archive)
1077    }
1078
1079    /// Load a wheel from a local path.
1080    async fn load_wheel(
1081        &self,
1082        path: &Path,
1083        filename: &WheelFilename,
1084        extension: WheelExtension,
1085        wheel_entry: CacheEntry,
1086        dist: &BuiltDist,
1087        hashes: HashPolicy<'_>,
1088    ) -> Result<LocalWheel, Error> {
1089        #[cfg(windows)]
1090        let _lock = {
1091            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
1092            lock_entry.lock().await.map_err(Error::CacheLock)?
1093        };
1094
1095        // Determine the last-modified time of the wheel.
1096        let modified = Timestamp::from_path(path).map_err(Error::CacheRead)?;
1097
1098        // Attempt to read the archive pointer from the cache.
1099        let pointer_entry = wheel_entry.with_file(format!("{}.rev", filename.cache_key()));
1100        let pointer = PathArchivePointer::read_from(&pointer_entry)?;
1101
1102        // Extract the archive from the pointer.
1103        let archive = pointer
1104            .filter(|pointer| pointer.is_up_to_date(modified))
1105            .map(PathArchivePointer::into_archive)
1106            .filter(|archive| archive.has_digests(hashes));
1107
1108        // If the file is already unzipped, and the cache is up-to-date, return it.
1109        if let Some(archive) = archive {
1110            Ok(LocalWheel {
1111                dist: Dist::Built(dist.clone()),
1112                archive: self
1113                    .build_context
1114                    .cache()
1115                    .archive(&archive.id)
1116                    .into_boxed_path(),
1117                hashes: archive.hashes,
1118                filename: filename.clone(),
1119                cache: CacheInfo::from_timestamp(modified),
1120                build: None,
1121            })
1122        } else if hashes.is_none() {
1123            // Otherwise, unzip the wheel.
1124            let archive = Archive::new(
1125                self.unzip_wheel(path, wheel_entry.path(), DistRef::Built(dist))
1126                    .await?,
1127                HashDigests::empty(),
1128                filename.clone(),
1129            );
1130
1131            // Write the archive pointer to the cache.
1132            let pointer = PathArchivePointer {
1133                timestamp: modified,
1134                archive: archive.clone(),
1135            };
1136            pointer.write_to(&pointer_entry).await?;
1137
1138            Ok(LocalWheel {
1139                dist: Dist::Built(dist.clone()),
1140                archive: self
1141                    .build_context
1142                    .cache()
1143                    .archive(&archive.id)
1144                    .into_boxed_path(),
1145                hashes: archive.hashes,
1146                filename: filename.clone(),
1147                cache: CacheInfo::from_timestamp(modified),
1148                build: None,
1149            })
1150        } else {
1151            // If necessary, compute the hashes of the wheel.
1152            let file = fs_err::tokio::File::open(path)
1153                .await
1154                .map_err(Error::CacheRead)?;
1155            let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
1156                .map_err(Error::CacheWrite)?;
1157
1158            // Create a hasher for each hash algorithm.
1159            let algorithms = hashes.algorithms();
1160            let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
1161            let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);
1162
1163            // Unzip the wheel to a temporary directory.
1164            let files = match extension {
1165                WheelExtension::Whl => {
1166                    uv_extract::stream::unzip(path.display(), &mut hasher, temp_dir.path())
1167                        .await
1168                        .map_err(|err| Error::Extract(filename.to_string(), err))?
1169                }
1170                WheelExtension::WhlZst => {
1171                    uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
1172                        .await
1173                        .map_err(|err| Error::Extract(filename.to_string(), err))?
1174                }
1175            };
1176
1177            // Exhaust the reader to compute the hash.
1178            hasher.finish().await.map_err(Error::HashExhaustion)?;
1179
1180            let hashes = hashers.into_iter().map(HashDigest::from).collect();
1181
1182            // Before we make the wheel accessible by persisting it, ensure that the RECORD is
1183            // valid.
1184            validate_and_heal_record(temp_dir.path(), files.iter(), dist)
1185                .map_err(Error::InstallWheelError)?;
1186
1187            // Persist the temporary directory to the directory store.
1188            let id = self
1189                .build_context
1190                .cache()
1191                .persist(temp_dir.keep(), wheel_entry.path())
1192                .await
1193                .map_err(Error::CacheWrite)?;
1194
1195            // Create an archive.
1196            let archive = Archive::new(id, hashes, filename.clone());
1197
1198            // Write the archive pointer to the cache.
1199            let pointer = PathArchivePointer {
1200                timestamp: modified,
1201                archive: archive.clone(),
1202            };
1203            pointer.write_to(&pointer_entry).await?;
1204
1205            Ok(LocalWheel {
1206                dist: Dist::Built(dist.clone()),
1207                archive: self
1208                    .build_context
1209                    .cache()
1210                    .archive(&archive.id)
1211                    .into_boxed_path(),
1212                hashes: archive.hashes,
1213                filename: filename.clone(),
1214                cache: CacheInfo::from_timestamp(modified),
1215                build: None,
1216            })
1217        }
1218    }
1219
1220    /// Unzip a wheel into the cache, returning the path to the unzipped directory.
1221    async fn unzip_wheel(
1222        &self,
1223        path: &Path,
1224        target: &Path,
1225        dist: DistRef<'_>,
1226    ) -> Result<ArchiveId, Error> {
1227        let (temp_dir, files) = tokio::task::spawn_blocking({
1228            let path = path.to_owned();
1229            let root = self.build_context.cache().root().to_path_buf();
1230            move || -> Result<_, Error> {
1231                // Unzip the wheel into a temporary directory.
1232                let temp_dir = tempfile::tempdir_in(root).map_err(Error::CacheWrite)?;
1233                let reader = fs_err::File::open(&path).map_err(Error::CacheWrite)?;
1234                let files = uv_extract::unzip(reader, temp_dir.path())
1235                    .map_err(|err| Error::Extract(path.to_string_lossy().into_owned(), err))?;
1236                Ok((temp_dir, files))
1237            }
1238        })
1239        .await??;
1240
1241        // Before we make the wheel accessible by persisting it, ensure that the RECORD is valid.
1242        validate_and_heal_record(temp_dir.path(), files.iter(), dist)
1243            .map_err(Error::InstallWheelError)?;
1244
1245        // Persist the temporary directory to the directory store.
1246        let id = self
1247            .build_context
1248            .cache()
1249            .persist(temp_dir.keep(), target)
1250            .await
1251            .map_err(Error::CacheWrite)?;
1252
1253        Ok(id)
1254    }
1255
1256    /// Returns a GET [`reqwest::Request`] for the given URL.
1257    fn request(&self, url: DisplaySafeUrl) -> Result<reqwest::Request, reqwest::Error> {
1258        self.client
1259            .unmanaged
1260            .uncached_client(&url)
1261            .get(Url::from(url))
1262            .header(
1263                // `reqwest` defaults to accepting compressed responses.
1264                // Specify identity encoding to get consistent .whl downloading
1265                // behavior from servers. ref: https://github.com/pypa/pip/pull/1688
1266                "accept-encoding",
1267                reqwest::header::HeaderValue::from_static("identity"),
1268            )
1269            .build()
1270    }
1271
1272    /// Return the [`ManagedClient`] used by this resolver.
1273    pub fn client(&self) -> &ManagedClient<'a> {
1274        &self.client
1275    }
1276}
1277
1278/// A wrapper around `RegistryClient` that manages a concurrency limit.
1279pub struct ManagedClient<'a> {
1280    pub unmanaged: &'a RegistryClient,
1281    control: Arc<Semaphore>,
1282}
1283
1284impl<'a> ManagedClient<'a> {
1285    /// Create a new `ManagedClient` using the given client and concurrency semaphore.
1286    fn new(client: &'a RegistryClient, control: Arc<Semaphore>) -> Self {
1287        ManagedClient {
1288            unmanaged: client,
1289            control,
1290        }
1291    }
1292
1293    /// Perform a request using the client, respecting the concurrency limit.
1294    ///
1295    /// If the concurrency limit has been reached, this method will wait until a pending
1296    /// operation completes before executing the closure.
1297    pub async fn managed<F, T>(&self, f: impl FnOnce(&'a RegistryClient) -> F) -> T
1298    where
1299        F: Future<Output = T>,
1300    {
1301        let _permit = self.control.acquire().await.unwrap();
1302        f(self.unmanaged).await
1303    }
1304
1305    /// Perform a request using a client that internally manages the concurrency limit.
1306    ///
1307    /// The callback is passed the client and a semaphore. It must acquire the semaphore before
1308    /// any request through the client and drop it after.
1309    ///
1310    /// This method serves as an escape hatch for functions that may want to send multiple requests
1311    /// in parallel.
1312    pub async fn manual<F, T>(&'a self, f: impl FnOnce(&'a RegistryClient, &'a Semaphore) -> F) -> T
1313    where
1314        F: Future<Output = T>,
1315    {
1316        f(self.unmanaged, &self.control).await
1317    }
1318}
1319
1320/// Returns the value of the `Content-Length` header from the [`reqwest::Response`], if present.
1321fn content_length(response: &reqwest::Response) -> Option<u64> {
1322    response
1323        .headers()
1324        .get(reqwest::header::CONTENT_LENGTH)
1325        .and_then(|val| val.to_str().ok())
1326        .and_then(|val| val.parse::<u64>().ok())
1327}
1328
1329/// An asynchronous reader that reports progress as bytes are read.
1330struct ProgressReader<'a, R> {
1331    reader: R,
1332    index: usize,
1333    reporter: &'a dyn Reporter,
1334}
1335
1336impl<'a, R> ProgressReader<'a, R> {
1337    /// Create a new [`ProgressReader`] that wraps another reader.
1338    fn new(reader: R, index: usize, reporter: &'a dyn Reporter) -> Self {
1339        Self {
1340            reader,
1341            index,
1342            reporter,
1343        }
1344    }
1345}
1346
1347impl<R> AsyncRead for ProgressReader<'_, R>
1348where
1349    R: AsyncRead + Unpin,
1350{
1351    fn poll_read(
1352        mut self: Pin<&mut Self>,
1353        cx: &mut Context<'_>,
1354        buf: &mut ReadBuf<'_>,
1355    ) -> Poll<io::Result<()>> {
1356        Pin::new(&mut self.as_mut().reader)
1357            .poll_read(cx, buf)
1358            .map_ok(|()| {
1359                self.reporter
1360                    .on_download_progress(self.index, buf.filled().len() as u64);
1361            })
1362    }
1363}
1364
1365/// A pointer to an archive in the cache, fetched from an HTTP archive.
1366///
1367/// Encoded with `MsgPack`, and represented on disk by a `.http` file.
1368#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1369pub struct HttpArchivePointer {
1370    archive: Archive,
1371}
1372
1373impl HttpArchivePointer {
1374    /// Read an [`HttpArchivePointer`] from the cache.
1375    pub fn read_from(path: impl AsRef<Path>) -> Result<Option<Self>, Error> {
1376        match fs_err::File::open(path.as_ref()) {
1377            Ok(file) => {
1378                let data = DataWithCachePolicy::from_reader(file)?.data;
1379                let archive = rmp_serde::from_slice::<Archive>(&data)?;
1380                Ok(Some(Self { archive }))
1381            }
1382            Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
1383            Err(err) => Err(Error::CacheRead(err)),
1384        }
1385    }
1386
1387    /// Return the [`Archive`] from the pointer.
1388    pub fn into_archive(self) -> Archive {
1389        self.archive
1390    }
1391
1392    /// Return the [`CacheInfo`] from the pointer.
1393    pub fn to_cache_info(&self) -> CacheInfo {
1394        CacheInfo::default()
1395    }
1396
1397    /// Return the [`BuildInfo`] from the pointer.
1398    pub fn to_build_info(&self) -> Option<BuildInfo> {
1399        None
1400    }
1401}
1402
1403/// A pointer to an archive in the cache, fetched from a local path.
1404///
1405/// Encoded with `MsgPack`, and represented on disk by a `.rev` file.
1406#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1407pub struct PathArchivePointer {
1408    timestamp: Timestamp,
1409    archive: Archive,
1410}
1411
1412impl PathArchivePointer {
1413    /// Read an [`PathArchivePointer`] from the cache.
1414    pub fn read_from(path: impl AsRef<Path>) -> Result<Option<Self>, Error> {
1415        match fs_err::read(path) {
1416            Ok(cached) => Ok(Some(rmp_serde::from_slice::<Self>(&cached)?)),
1417            Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
1418            Err(err) => Err(Error::CacheRead(err)),
1419        }
1420    }
1421
1422    /// Write an [`PathArchivePointer`] to the cache.
1423    pub(crate) async fn write_to(&self, entry: &CacheEntry) -> Result<(), Error> {
1424        write_atomic(entry.path(), rmp_serde::to_vec(&self)?)
1425            .await
1426            .map_err(Error::CacheWrite)
1427    }
1428
1429    /// Returns `true` if the archive is up-to-date with the given modified timestamp.
1430    pub fn is_up_to_date(&self, modified: Timestamp) -> bool {
1431        self.timestamp == modified
1432    }
1433
1434    /// Return the [`Archive`] from the pointer.
1435    pub fn into_archive(self) -> Archive {
1436        self.archive
1437    }
1438
1439    /// Return the [`CacheInfo`] from the pointer.
1440    pub fn to_cache_info(&self) -> CacheInfo {
1441        CacheInfo::from_timestamp(self.timestamp)
1442    }
1443
1444    /// Return the [`BuildInfo`] from the pointer.
1445    pub fn to_build_info(&self) -> Option<BuildInfo> {
1446        None
1447    }
1448}
1449
1450#[derive(Debug, Clone)]
1451struct WheelTarget {
1452    /// The URL from which the wheel can be downloaded.
1453    url: DisplaySafeUrl,
1454    /// The expected extension of the wheel file.
1455    extension: WheelExtension,
1456    /// The expected size of the wheel file, if known.
1457    size: Option<u64>,
1458}
1459
1460impl TryFrom<&File> for WheelTarget {
1461    type Error = ToUrlError;
1462
1463    /// Determine the [`WheelTarget`] from a [`File`].
1464    fn try_from(file: &File) -> Result<Self, Self::Error> {
1465        let url = file.url.to_url()?;
1466        if let Some(zstd) = file.zstd.as_ref() {
1467            Ok(Self {
1468                url: add_tar_zst_extension(url),
1469                extension: WheelExtension::WhlZst,
1470                size: zstd.size,
1471            })
1472        } else {
1473            Ok(Self {
1474                url,
1475                extension: WheelExtension::Whl,
1476                size: file.size,
1477            })
1478        }
1479    }
1480}
1481
/// The on-the-wire format of a wheel, which selects how it is extracted.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum WheelExtension {
    /// A `.whl` file, extracted as a zip archive.
    Whl,
    /// A `.whl.tar.zst` file, extracted as a zstd-compressed tarball.
    WhlZst,
}
1489
1490/// Add `.tar.zst` to the end of the URL path, if it doesn't already exist.
1491#[must_use]
1492fn add_tar_zst_extension(mut url: DisplaySafeUrl) -> DisplaySafeUrl {
1493    let mut path = url.path().to_string();
1494
1495    if !path.ends_with(".tar.zst") {
1496        path.push_str(".tar.zst");
1497    }
1498
1499    url.set_path(&path);
1500    url
1501}
1502
#[cfg(test)]
mod tests {
    use super::*;

    /// `.tar.zst` is appended when missing, left alone when present, and percent-encoded
    /// characters in the path are preserved.
    #[test]
    fn test_add_tar_zst_extension() {
        let cases = [
            (
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl",
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
            ),
            (
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
            ),
            (
                "https://files.pythonhosted.org/flask-3.1.0%2Bcu124-py3-none-any.whl",
                "https://files.pythonhosted.org/flask-3.1.0%2Bcu124-py3-none-any.whl.tar.zst",
            ),
        ];

        for (input, expected) in cases {
            let url = DisplaySafeUrl::parse(input).unwrap();
            assert_eq!(add_tar_zst_extension(url).as_str(), expected);
        }
    }
}