// uv_distribution/distribution_database.rs
1use std::future::Future;
2use std::io;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use futures::{FutureExt, TryStreamExt};
9use tempfile::TempDir;
10use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};
11use tokio::sync::Semaphore;
12use tokio_util::compat::FuturesAsyncReadCompatExt;
13use tracing::{Instrument, info_span, instrument, warn};
14use url::Url;
15
16use uv_cache::{ArchiveId, CacheBucket, CacheEntry, WheelCache};
17use uv_cache_info::{CacheInfo, Timestamp};
18use uv_client::{
19    CacheControl, CachedClientError, Connectivity, DataWithCachePolicy, RegistryClient,
20};
21use uv_distribution_filename::{SourceDistExtension, WheelFilename};
22use uv_distribution_types::{
23    BuildInfo, BuildableSource, BuiltDist, Dist, File, HashPolicy, Hashed, IndexUrl, InstalledDist,
24    Name, SourceDist, ToUrlError,
25};
26use uv_extract::hash::Hasher;
27use uv_fs::write_atomic;
28use uv_platform_tags::Tags;
29use uv_pypi_types::{HashDigest, HashDigests, PyProjectToml};
30use uv_redacted::DisplaySafeUrl;
31use uv_types::{BuildContext, BuildStack};
32use uv_warnings::warn_user_once;
33
34use crate::archive::Archive;
35use crate::metadata::{ArchiveMetadata, Metadata};
36use crate::source::SourceDistributionBuilder;
37use crate::{Error, LocalWheel, Reporter, RequiresDist};
38
/// A cached high-level interface to convert distributions (a requirement resolved to a location)
/// to a wheel or wheel metadata.
///
/// For wheel metadata, this happens by either fetching the metadata from the remote wheel or by
/// building the source distribution. For wheel files, either the wheel is downloaded or a source
/// distribution is downloaded, built and the new wheel gets returned.
///
/// All kinds of wheel sources (index, URL, path) and source distribution source (index, URL, path,
/// Git) are supported.
///
/// This struct also has the task of acquiring locks around source dist builds in general and git
/// operation especially, as well as respecting concurrency limits.
pub struct DistributionDatabase<'a, Context: BuildContext> {
    /// Shared build configuration: cache, index locations, capabilities, dependency metadata.
    build_context: &'a Context,
    /// Fetches and builds source distributions into wheels.
    builder: SourceDistributionBuilder<'a, Context>,
    /// Registry client wrapped with a download-concurrency semaphore (see [`Self::new`]).
    client: ManagedClient<'a>,
    /// Optional reporter for download progress events.
    reporter: Option<Arc<dyn Reporter>>,
}
57
58impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
59    pub fn new(
60        client: &'a RegistryClient,
61        build_context: &'a Context,
62        downloads_semaphore: Arc<Semaphore>,
63    ) -> Self {
64        Self {
65            build_context,
66            builder: SourceDistributionBuilder::new(build_context),
67            client: ManagedClient::new(client, downloads_semaphore),
68            reporter: None,
69        }
70    }
71
72    /// Set the build stack to use for the [`DistributionDatabase`].
73    #[must_use]
74    pub fn with_build_stack(self, build_stack: &'a BuildStack) -> Self {
75        Self {
76            builder: self.builder.with_build_stack(build_stack),
77            ..self
78        }
79    }
80
81    /// Set the [`Reporter`] to use for the [`DistributionDatabase`].
82    #[must_use]
83    pub fn with_reporter(self, reporter: Arc<dyn Reporter>) -> Self {
84        Self {
85            builder: self.builder.with_reporter(reporter.clone()),
86            reporter: Some(reporter),
87            ..self
88        }
89    }
90
91    /// Handle a specific `reqwest` error, and convert it to [`io::Error`].
92    fn handle_response_errors(&self, err: reqwest::Error) -> io::Error {
93        if err.is_timeout() {
94            // Assumption: The connect timeout with the 10s default is not the culprit.
95            io::Error::new(
96                io::ErrorKind::TimedOut,
97                format!(
98                    "Failed to download distribution due to network timeout. Try increasing UV_HTTP_TIMEOUT (current value: {}s).",
99                    self.client.unmanaged.read_timeout().as_secs()
100                ),
101            )
102        } else {
103            io::Error::other(err)
104        }
105    }
106
107    /// Either fetch the wheel or fetch and build the source distribution
108    ///
109    /// Returns a wheel that's compliant with the given platform tags.
110    ///
111    /// While hashes will be generated in some cases, hash-checking is only enforced for source
112    /// distributions, and should be enforced by the caller for wheels.
113    #[instrument(skip_all, fields(%dist))]
114    pub async fn get_or_build_wheel(
115        &self,
116        dist: &Dist,
117        tags: &Tags,
118        hashes: HashPolicy<'_>,
119    ) -> Result<LocalWheel, Error> {
120        match dist {
121            Dist::Built(built) => self.get_wheel(built, hashes).await,
122            Dist::Source(source) => self.build_wheel(source, tags, hashes).await,
123        }
124    }
125
126    /// Either fetch the only wheel metadata (directly from the index or with range requests) or
127    /// fetch and build the source distribution.
128    ///
129    /// While hashes will be generated in some cases, hash-checking is only enforced for source
130    /// distributions, and should be enforced by the caller for wheels.
131    #[instrument(skip_all, fields(%dist))]
132    pub async fn get_installed_metadata(
133        &self,
134        dist: &InstalledDist,
135    ) -> Result<ArchiveMetadata, Error> {
136        // If the metadata was provided by the user directly, prefer it.
137        if let Some(metadata) = self
138            .build_context
139            .dependency_metadata()
140            .get(dist.name(), Some(dist.version()))
141        {
142            return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
143        }
144
145        let metadata = dist
146            .read_metadata()
147            .map_err(|err| Error::ReadInstalled(Box::new(dist.clone()), err))?;
148
149        Ok(ArchiveMetadata::from_metadata23(metadata.clone()))
150    }
151
152    /// Either fetch the only wheel metadata (directly from the index or with range requests) or
153    /// fetch and build the source distribution.
154    ///
155    /// While hashes will be generated in some cases, hash-checking is only enforced for source
156    /// distributions, and should be enforced by the caller for wheels.
157    #[instrument(skip_all, fields(%dist))]
158    pub async fn get_or_build_wheel_metadata(
159        &self,
160        dist: &Dist,
161        hashes: HashPolicy<'_>,
162    ) -> Result<ArchiveMetadata, Error> {
163        match dist {
164            Dist::Built(built) => self.get_wheel_metadata(built, hashes).await,
165            Dist::Source(source) => {
166                self.build_wheel_metadata(&BuildableSource::Dist(source), hashes)
167                    .await
168            }
169        }
170    }
171
    /// Fetch a wheel from the cache or download it from the index.
    ///
    /// While hashes will be generated in all cases, hash-checking is _not_ enforced and should
    /// instead be enforced by the caller.
    async fn get_wheel(
        &self,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        match dist {
            // A wheel hosted on a registry: pick the best candidate, then stream it into
            // the cache (falling back to a full download if streaming fails).
            BuiltDist::Registry(wheels) => {
                let wheel = wheels.best_wheel();
                let WheelTarget {
                    url,
                    extension,
                    size,
                } = WheelTarget::try_from(&*wheel.file)?;

                // Create a cache entry for the wheel.
                let wheel_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Index(&wheel.index).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                // If the URL is a file URL, load the wheel directly.
                if url.scheme() == "file" {
                    let path = url
                        .to_file_path()
                        .map_err(|()| Error::NonFileUrl(url.clone()))?;
                    return self
                        .load_wheel(
                            &path,
                            &wheel.filename,
                            WheelExtension::Whl,
                            wheel_entry,
                            dist,
                            hashes,
                        )
                        .await;
                }

                // Download and unzip.
                match self
                    .stream_wheel(
                        url.clone(),
                        dist.index(),
                        &wheel.filename,
                        extension,
                        size,
                        &wheel_entry,
                        dist,
                        hashes,
                    )
                    .await
                {
                    Ok(archive) => Ok(LocalWheel {
                        dist: Dist::Built(dist.clone()),
                        archive: self
                            .build_context
                            .cache()
                            .archive(&archive.id)
                            .into_boxed_path(),
                        hashes: archive.hashes,
                        filename: wheel.filename.clone(),
                        cache: CacheInfo::default(),
                        build: None,
                    }),
                    // Only streaming-specific extraction failures trigger the fallback;
                    // any other extraction error is propagated as-is.
                    Err(Error::Extract(name, err)) => {
                        if err.is_http_streaming_unsupported() {
                            warn!(
                                "Streaming unsupported for {dist}; downloading wheel to disk ({err})"
                            );
                        } else if err.is_http_streaming_failed() {
                            warn!("Streaming failed for {dist}; downloading wheel to disk ({err})");
                        } else {
                            return Err(Error::Extract(name, err));
                        }

                        // If the request failed because streaming was unsupported or failed,
                        // download the wheel directly.
                        let archive = self
                            .download_wheel(
                                url,
                                dist.index(),
                                &wheel.filename,
                                extension,
                                size,
                                &wheel_entry,
                                dist,
                                hashes,
                            )
                            .await?;

                        Ok(LocalWheel {
                            dist: Dist::Built(dist.clone()),
                            archive: self
                                .build_context
                                .cache()
                                .archive(&archive.id)
                                .into_boxed_path(),
                            hashes: archive.hashes,
                            filename: wheel.filename.clone(),
                            cache: CacheInfo::default(),
                            build: None,
                        })
                    }
                    Err(err) => Err(err),
                }
            }

            // A wheel referenced by a direct URL: same stream-then-fallback strategy as the
            // registry case, but cached under the URL rather than an index.
            BuiltDist::DirectUrl(wheel) => {
                // Create a cache entry for the wheel.
                let wheel_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                // Download and unzip.
                match self
                    .stream_wheel(
                        wheel.url.raw().clone(),
                        None,
                        &wheel.filename,
                        WheelExtension::Whl,
                        None,
                        &wheel_entry,
                        dist,
                        hashes,
                    )
                    .await
                {
                    Ok(archive) => Ok(LocalWheel {
                        dist: Dist::Built(dist.clone()),
                        archive: self
                            .build_context
                            .cache()
                            .archive(&archive.id)
                            .into_boxed_path(),
                        hashes: archive.hashes,
                        filename: wheel.filename.clone(),
                        cache: CacheInfo::default(),
                        build: None,
                    }),
                    // As above: fall back to a full download only for streaming failures.
                    Err(Error::Extract(name, err)) => {
                        if err.is_http_streaming_unsupported() {
                            warn!(
                                "Streaming unsupported for {dist}; downloading wheel to disk ({err})"
                            );
                        } else if err.is_http_streaming_failed() {
                            warn!("Streaming failed for {dist}; downloading wheel to disk ({err})");
                        } else {
                            return Err(Error::Extract(name, err));
                        }

                        // If the request failed because streaming was unsupported or failed,
                        // download the wheel directly.
                        let archive = self
                            .download_wheel(
                                wheel.url.raw().clone(),
                                None,
                                &wheel.filename,
                                WheelExtension::Whl,
                                None,
                                &wheel_entry,
                                dist,
                                hashes,
                            )
                            .await?;
                        Ok(LocalWheel {
                            dist: Dist::Built(dist.clone()),
                            archive: self
                                .build_context
                                .cache()
                                .archive(&archive.id)
                                .into_boxed_path(),
                            hashes: archive.hashes,
                            filename: wheel.filename.clone(),
                            cache: CacheInfo::default(),
                            build: None,
                        })
                    }
                    Err(err) => Err(err),
                }
            }

            // A wheel on the local filesystem: no download, just load it into the cache.
            BuiltDist::Path(wheel) => {
                let cache_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                self.load_wheel(
                    &wheel.install_path,
                    &wheel.filename,
                    WheelExtension::Whl,
                    cache_entry,
                    dist,
                    hashes,
                )
                .await
            }
        }
    }
378
    /// Convert a source distribution into a wheel, fetching it from the cache or building it if
    /// necessary.
    ///
    /// The returned wheel is guaranteed to come from a distribution with a matching hash, and
    /// no build processes will be executed for distributions with mismatched hashes.
    async fn build_wheel(
        &self,
        dist: &SourceDist,
        tags: &Tags,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        // Warn if the source distribution isn't PEP 625 compliant.
        // We do this here instead of in `SourceDistExtension::from_path` to minimize log volume:
        // a non-compliant distribution isn't a huge problem if it's not actually being
        // materialized into a wheel. Observe that we also allow no extension, since we expect that
        // for directory and Git installs.
        // NOTE: Observe that we also allow `.zip` sdists here, which are not PEP 625 compliant.
        // This is because they were allowed on PyPI until relatively recently (2020).
        if let Some(extension) = dist.extension()
            && !matches!(
                extension,
                SourceDistExtension::TarGz | SourceDistExtension::Zip
            )
        {
            if matches!(dist, SourceDist::Registry(_)) {
                // Observe that we display a slightly different warning when the sdist comes
                // from a registry, since that suggests that the user has inadvertently
                // (rather than explicitly) depended on a non-compliant sdist.
                warn_user_once!(
                    "{dist} uses a legacy source distribution format ('.{extension}') that is not compliant with PEP 625. A future version of uv will reject this source distribution. Consider upgrading to a newer version of {package}",
                    package = dist.name(),
                );
            } else {
                warn_user_once!(
                    "{dist} is not a standards-compliant source distribution: expected '.tar.gz' but found '.{extension}'. A future version of uv will reject source distributions that do not meet the requirements specified in PEP 625",
                );
            }
        }

        // Fetch (or reuse) the source distribution and build it into a wheel.
        let built_wheel = self
            .builder
            .download_and_build(&BuildableSource::Dist(dist), tags, hashes, &self.client)
            .boxed_local()
            .await?;

        // Check that the wheel is compatible with its install target.
        //
        // When building a build dependency for a cross-install, the build dependency needs
        // to install and run on the host instead of the target. In this case the `tags` are already
        // for the host instead of the target, so this check passes.
        if !built_wheel.filename.is_compatible(tags) {
            return if tags.is_cross() {
                Err(Error::BuiltWheelIncompatibleTargetPlatform {
                    filename: built_wheel.filename,
                    python_platform: tags.python_platform().clone(),
                    python_version: tags.python_version(),
                })
            } else {
                Err(Error::BuiltWheelIncompatibleHostPlatform {
                    filename: built_wheel.filename,
                    python_platform: tags.python_platform().clone(),
                    python_version: tags.python_version(),
                })
            };
        }

        // Acquire the advisory lock.
        // NOTE(review): the lock is Windows-only here; presumably other platforms rely on
        // atomic renames elsewhere — confirm before relying on cross-process exclusion.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = CacheEntry::new(
                built_wheel.target.parent().unwrap(),
                format!(
                    "{}.lock",
                    built_wheel.target.file_name().unwrap().to_str().unwrap()
                ),
            );
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // If the wheel was unzipped previously, respect it. Source distributions are
        // cached under a unique revision ID, so unzipped directories are never stale.
        match self.build_context.cache().resolve_link(&built_wheel.target) {
            Ok(archive) => {
                return Ok(LocalWheel {
                    dist: Dist::Source(dist.clone()),
                    archive: archive.into_boxed_path(),
                    filename: built_wheel.filename,
                    hashes: built_wheel.hashes,
                    cache: built_wheel.cache_info,
                    build: Some(built_wheel.build_info),
                });
            }
            // A missing link just means the wheel hasn't been unzipped yet; fall through.
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(Error::CacheRead(err)),
        }

        // Otherwise, unzip the wheel.
        let id = self
            .unzip_wheel(&built_wheel.path, &built_wheel.target)
            .await?;

        Ok(LocalWheel {
            dist: Dist::Source(dist.clone()),
            archive: self.build_context.cache().archive(&id).into_boxed_path(),
            hashes: built_wheel.hashes,
            filename: built_wheel.filename,
            cache: built_wheel.cache_info,
            build: Some(built_wheel.build_info),
        })
    }
489
    /// Fetch the wheel metadata from the index, or from the cache if possible.
    ///
    /// While hashes will be generated in some cases, hash-checking is _not_ enforced and should
    /// instead be enforced by the caller.
    async fn get_wheel_metadata(
        &self,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<ArchiveMetadata, Error> {
        // If hash generation is enabled, and the distribution isn't hosted on a registry, get the
        // entire wheel to ensure that the hashes are included in the response. If the distribution
        // is hosted on an index, the hashes will be included in the simple metadata response.
        // For hash _validation_, callers are expected to enforce the policy when retrieving the
        // wheel.
        //
        // Historically, for `uv pip compile --universal`, we also generate hashes for
        // registry-based distributions when the relevant registry doesn't provide them. This was
        // motivated by `--find-links`. We continue that behavior (under `HashGeneration::All`) for
        // backwards compatibility, but it's a little dubious, since we're only hashing _one_
        // distribution here (as opposed to hashing all distributions for the version), and it may
        // not even be a compatible distribution!
        //
        // TODO(charlie): Request the hashes via a separate method, to reduce the coupling in this API.
        if hashes.is_generate(dist) {
            // Fetching the full wheel both yields the hashes and gives us metadata to read.
            let wheel = self.get_wheel(dist, hashes).await?;
            // If the metadata was provided by the user directly, prefer it.
            let metadata = if let Some(metadata) = self
                .build_context
                .dependency_metadata()
                .get(dist.name(), Some(dist.version()))
            {
                metadata.clone()
            } else {
                wheel.metadata()?
            };
            let hashes = wheel.hashes;
            return Ok(ArchiveMetadata {
                metadata: Metadata::from_metadata23(metadata),
                hashes,
            });
        }

        // If the metadata was provided by the user directly, prefer it.
        if let Some(metadata) = self
            .build_context
            .dependency_metadata()
            .get(dist.name(), Some(dist.version()))
        {
            return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
        }

        // Otherwise, ask the client for the metadata (e.g. via the index or range requests).
        let result = self
            .client
            .managed(|client| {
                client
                    .wheel_metadata(dist, self.build_context.capabilities())
                    .boxed_local()
            })
            .await;

        match result {
            Ok(metadata) => {
                // Validate that the metadata is consistent with the distribution.
                Ok(ArchiveMetadata::from_metadata23(metadata))
            }
            Err(err) if err.is_http_streaming_unsupported() => {
                warn!(
                    "Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})"
                );

                // If the request failed due to an error that could be resolved by
                // downloading the wheel directly, try that.
                let wheel = self.get_wheel(dist, hashes).await?;
                let metadata = wheel.metadata()?;
                let hashes = wheel.hashes;
                Ok(ArchiveMetadata {
                    metadata: Metadata::from_metadata23(metadata),
                    hashes,
                })
            }
            Err(err) => Err(err.into()),
        }
    }
573
574    /// Build the wheel metadata for a source distribution, or fetch it from the cache if possible.
575    ///
576    /// The returned metadata is guaranteed to come from a distribution with a matching hash, and
577    /// no build processes will be executed for distributions with mismatched hashes.
578    pub async fn build_wheel_metadata(
579        &self,
580        source: &BuildableSource<'_>,
581        hashes: HashPolicy<'_>,
582    ) -> Result<ArchiveMetadata, Error> {
583        // If the metadata was provided by the user directly, prefer it.
584        if let Some(dist) = source.as_dist() {
585            if let Some(metadata) = self
586                .build_context
587                .dependency_metadata()
588                .get(dist.name(), dist.version())
589            {
590                // If we skipped the build, we should still resolve any Git dependencies to precise
591                // commits.
592                self.builder.resolve_revision(source, &self.client).await?;
593
594                return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
595            }
596        }
597
598        let metadata = self
599            .builder
600            .download_and_build_metadata(source, hashes, &self.client)
601            .boxed_local()
602            .await?;
603
604        Ok(metadata)
605    }
606
607    /// Return the [`RequiresDist`] from a `pyproject.toml`, if it can be statically extracted.
608    pub async fn requires_dist(
609        &self,
610        path: &Path,
611        pyproject_toml: &PyProjectToml,
612    ) -> Result<Option<RequiresDist>, Error> {
613        self.builder
614            .source_tree_requires_dist(
615                path,
616                pyproject_toml,
617                self.client.unmanaged.credentials_cache(),
618            )
619            .await
620    }
621
622    /// Stream a wheel from a URL, unzipping it into the cache as it's downloaded.
623    async fn stream_wheel(
624        &self,
625        url: DisplaySafeUrl,
626        index: Option<&IndexUrl>,
627        filename: &WheelFilename,
628        extension: WheelExtension,
629        size: Option<u64>,
630        wheel_entry: &CacheEntry,
631        dist: &BuiltDist,
632        hashes: HashPolicy<'_>,
633    ) -> Result<Archive, Error> {
634        // Acquire an advisory lock, to guard against concurrent writes.
635        #[cfg(windows)]
636        let _lock = {
637            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
638            lock_entry.lock().await.map_err(Error::CacheLock)?
639        };
640
641        // Create an entry for the HTTP cache.
642        let http_entry = wheel_entry.with_file(format!("{}.http", filename.cache_key()));
643
644        let query_url = &url.clone();
645
646        let download = |response: reqwest::Response| {
647            async {
648                let size = size.or_else(|| content_length(&response));
649
650                let progress = self
651                    .reporter
652                    .as_ref()
653                    .map(|reporter| (reporter, reporter.on_download_start(dist.name(), size)));
654
655                let reader = response
656                    .bytes_stream()
657                    .map_err(|err| self.handle_response_errors(err))
658                    .into_async_read();
659
660                // Create a hasher for each hash algorithm.
661                let algorithms = hashes.algorithms();
662                let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
663                let mut hasher = uv_extract::hash::HashReader::new(reader.compat(), &mut hashers);
664
665                // Download and unzip the wheel to a temporary directory.
666                let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
667                    .map_err(Error::CacheWrite)?;
668
669                match progress {
670                    Some((reporter, progress)) => {
671                        let mut reader = ProgressReader::new(&mut hasher, progress, &**reporter);
672                        match extension {
673                            WheelExtension::Whl => {
674                                uv_extract::stream::unzip(query_url, &mut reader, temp_dir.path())
675                                    .await
676                                    .map_err(|err| Error::Extract(filename.to_string(), err))?;
677                            }
678                            WheelExtension::WhlZst => {
679                                uv_extract::stream::untar_zst(&mut reader, temp_dir.path())
680                                    .await
681                                    .map_err(|err| Error::Extract(filename.to_string(), err))?;
682                            }
683                        }
684                    }
685                    None => match extension {
686                        WheelExtension::Whl => {
687                            uv_extract::stream::unzip(query_url, &mut hasher, temp_dir.path())
688                                .await
689                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
690                        }
691                        WheelExtension::WhlZst => {
692                            uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
693                                .await
694                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
695                        }
696                    },
697                }
698
699                // If necessary, exhaust the reader to compute the hash.
700                if !hashes.is_none() {
701                    hasher.finish().await.map_err(Error::HashExhaustion)?;
702                }
703
704                // Persist the temporary directory to the directory store.
705                let id = self
706                    .build_context
707                    .cache()
708                    .persist(temp_dir.keep(), wheel_entry.path())
709                    .await
710                    .map_err(Error::CacheRead)?;
711
712                if let Some((reporter, progress)) = progress {
713                    reporter.on_download_complete(dist.name(), progress);
714                }
715
716                Ok(Archive::new(
717                    id,
718                    hashers.into_iter().map(HashDigest::from).collect(),
719                    filename.clone(),
720                ))
721            }
722            .instrument(info_span!("wheel", wheel = %dist))
723        };
724
725        // Fetch the archive from the cache, or download it if necessary.
726        let req = self.request(url.clone())?;
727
728        // Determine the cache control policy for the URL.
729        let cache_control = match self.client.unmanaged.connectivity() {
730            Connectivity::Online => {
731                if let Some(header) = index.and_then(|index| {
732                    self.build_context
733                        .locations()
734                        .artifact_cache_control_for(index)
735                }) {
736                    CacheControl::Override(header)
737                } else {
738                    CacheControl::from(
739                        self.build_context
740                            .cache()
741                            .freshness(&http_entry, Some(&filename.name), None)
742                            .map_err(Error::CacheRead)?,
743                    )
744                }
745            }
746            Connectivity::Offline => CacheControl::AllowStale,
747        };
748
749        let archive = self
750            .client
751            .managed(|client| {
752                client.cached_client().get_serde_with_retry(
753                    req,
754                    &http_entry,
755                    cache_control.clone(),
756                    download,
757                )
758            })
759            .await
760            .map_err(|err| match err {
761                CachedClientError::Callback { err, .. } => err,
762                CachedClientError::Client(err) => Error::Client(err),
763            })?;
764
765        // If the archive is missing the required hashes, or has since been removed, force a refresh.
766        let archive = Some(archive)
767            .filter(|archive| archive.has_digests(hashes))
768            .filter(|archive| archive.exists(self.build_context.cache()));
769
770        let archive = if let Some(archive) = archive {
771            archive
772        } else {
773            self.client
774                .managed(async |client| {
775                    client
776                        .cached_client()
777                        .skip_cache_with_retry(
778                            self.request(url)?,
779                            &http_entry,
780                            cache_control,
781                            download,
782                        )
783                        .await
784                        .map_err(|err| match err {
785                            CachedClientError::Callback { err, .. } => err,
786                            CachedClientError::Client(err) => Error::Client(err),
787                        })
788                })
789                .await?
790        };
791
792        Ok(archive)
793    }
794
795    /// Download a wheel from a URL, then unzip it into the cache.
796    async fn download_wheel(
797        &self,
798        url: DisplaySafeUrl,
799        index: Option<&IndexUrl>,
800        filename: &WheelFilename,
801        extension: WheelExtension,
802        size: Option<u64>,
803        wheel_entry: &CacheEntry,
804        dist: &BuiltDist,
805        hashes: HashPolicy<'_>,
806    ) -> Result<Archive, Error> {
807        // Acquire an advisory lock, to guard against concurrent writes.
808        #[cfg(windows)]
809        let _lock = {
810            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
811            lock_entry.lock().await.map_err(Error::CacheLock)?
812        };
813
814        // Create an entry for the HTTP cache.
815        let http_entry = wheel_entry.with_file(format!("{}.http", filename.cache_key()));
816
817        let query_url = &url.clone();
818
819        let download = |response: reqwest::Response| {
820            async {
821                let size = size.or_else(|| content_length(&response));
822
823                let progress = self
824                    .reporter
825                    .as_ref()
826                    .map(|reporter| (reporter, reporter.on_download_start(dist.name(), size)));
827
828                let reader = response
829                    .bytes_stream()
830                    .map_err(|err| self.handle_response_errors(err))
831                    .into_async_read();
832
833                // Download the wheel to a temporary file.
834                let temp_file = tempfile::tempfile_in(self.build_context.cache().root())
835                    .map_err(Error::CacheWrite)?;
836                let mut writer = tokio::io::BufWriter::new(fs_err::tokio::File::from_std(
837                    // It's an unnamed file on Linux so that's the best approximation.
838                    fs_err::File::from_parts(temp_file, self.build_context.cache().root()),
839                ));
840
841                match progress {
842                    Some((reporter, progress)) => {
843                        // Wrap the reader in a progress reporter. This will report 100% progress
844                        // after the download is complete, even if we still have to unzip and hash
845                        // part of the file.
846                        let mut reader =
847                            ProgressReader::new(reader.compat(), progress, &**reporter);
848
849                        tokio::io::copy(&mut reader, &mut writer)
850                            .await
851                            .map_err(Error::CacheWrite)?;
852                    }
853                    None => {
854                        tokio::io::copy(&mut reader.compat(), &mut writer)
855                            .await
856                            .map_err(Error::CacheWrite)?;
857                    }
858                }
859
860                // Unzip the wheel to a temporary directory.
861                let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
862                    .map_err(Error::CacheWrite)?;
863                let mut file = writer.into_inner();
864                file.seek(io::SeekFrom::Start(0))
865                    .await
866                    .map_err(Error::CacheWrite)?;
867
868                // If no hashes are required, parallelize the unzip operation.
869                let hashes = if hashes.is_none() {
870                    let file = file.into_std().await;
871                    tokio::task::spawn_blocking({
872                        let target = temp_dir.path().to_owned();
873                        move || -> Result<(), uv_extract::Error> {
874                            // Unzip the wheel into a temporary directory.
875                            match extension {
876                                WheelExtension::Whl => {
877                                    uv_extract::unzip(file, &target)?;
878                                }
879                                WheelExtension::WhlZst => {
880                                    uv_extract::stream::untar_zst_file(file, &target)?;
881                                }
882                            }
883                            Ok(())
884                        }
885                    })
886                    .await?
887                    .map_err(|err| Error::Extract(filename.to_string(), err))?;
888
889                    HashDigests::empty()
890                } else {
891                    // Create a hasher for each hash algorithm.
892                    let algorithms = hashes.algorithms();
893                    let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
894                    let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);
895
896                    match extension {
897                        WheelExtension::Whl => {
898                            uv_extract::stream::unzip(query_url, &mut hasher, temp_dir.path())
899                                .await
900                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
901                        }
902                        WheelExtension::WhlZst => {
903                            uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
904                                .await
905                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
906                        }
907                    }
908
909                    // If necessary, exhaust the reader to compute the hash.
910                    hasher.finish().await.map_err(Error::HashExhaustion)?;
911
912                    hashers.into_iter().map(HashDigest::from).collect()
913                };
914
915                // Persist the temporary directory to the directory store.
916                let id = self
917                    .build_context
918                    .cache()
919                    .persist(temp_dir.keep(), wheel_entry.path())
920                    .await
921                    .map_err(Error::CacheRead)?;
922
923                if let Some((reporter, progress)) = progress {
924                    reporter.on_download_complete(dist.name(), progress);
925                }
926
927                Ok(Archive::new(id, hashes, filename.clone()))
928            }
929            .instrument(info_span!("wheel", wheel = %dist))
930        };
931
932        // Fetch the archive from the cache, or download it if necessary.
933        let req = self.request(url.clone())?;
934
935        // Determine the cache control policy for the URL.
936        let cache_control = match self.client.unmanaged.connectivity() {
937            Connectivity::Online => {
938                if let Some(header) = index.and_then(|index| {
939                    self.build_context
940                        .locations()
941                        .artifact_cache_control_for(index)
942                }) {
943                    CacheControl::Override(header)
944                } else {
945                    CacheControl::from(
946                        self.build_context
947                            .cache()
948                            .freshness(&http_entry, Some(&filename.name), None)
949                            .map_err(Error::CacheRead)?,
950                    )
951                }
952            }
953            Connectivity::Offline => CacheControl::AllowStale,
954        };
955
956        let archive = self
957            .client
958            .managed(|client| {
959                client.cached_client().get_serde_with_retry(
960                    req,
961                    &http_entry,
962                    cache_control.clone(),
963                    download,
964                )
965            })
966            .await
967            .map_err(|err| match err {
968                CachedClientError::Callback { err, .. } => err,
969                CachedClientError::Client(err) => Error::Client(err),
970            })?;
971
972        // If the archive is missing the required hashes, or has since been removed, force a refresh.
973        let archive = Some(archive)
974            .filter(|archive| archive.has_digests(hashes))
975            .filter(|archive| archive.exists(self.build_context.cache()));
976
977        let archive = if let Some(archive) = archive {
978            archive
979        } else {
980            self.client
981                .managed(async |client| {
982                    client
983                        .cached_client()
984                        .skip_cache_with_retry(
985                            self.request(url)?,
986                            &http_entry,
987                            cache_control,
988                            download,
989                        )
990                        .await
991                        .map_err(|err| match err {
992                            CachedClientError::Callback { err, .. } => err,
993                            CachedClientError::Client(err) => Error::Client(err),
994                        })
995                })
996                .await?
997        };
998
999        Ok(archive)
1000    }
1001
    /// Load a wheel from a local path.
    ///
    /// Reuses the cached, unpacked archive when its recorded timestamp matches
    /// the file on disk and it carries all digests required by `hashes`.
    /// Otherwise, unpacks the wheel anew — in parallel when no digests are
    /// required, or through a hashing reader when they are — and records a
    /// `.rev` pointer so the result can be reused on subsequent calls.
    async fn load_wheel(
        &self,
        path: &Path,
        filename: &WheelFilename,
        extension: WheelExtension,
        wheel_entry: CacheEntry,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        // Acquire an advisory lock, to guard against concurrent writes.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // Determine the last-modified time of the wheel.
        let modified = Timestamp::from_path(path).map_err(Error::CacheRead)?;

        // Attempt to read the archive pointer from the cache.
        let pointer_entry = wheel_entry.with_file(format!("{}.rev", filename.cache_key()));
        let pointer = LocalArchivePointer::read_from(&pointer_entry)?;

        // Extract the archive from the pointer.
        // The pointer is only usable if it matches the file's current timestamp
        // and the cached archive already carries every required digest.
        let archive = pointer
            .filter(|pointer| pointer.is_up_to_date(modified))
            .map(LocalArchivePointer::into_archive)
            .filter(|archive| archive.has_digests(hashes));

        // If the file is already unzipped, and the cache is up-to-date, return it.
        if let Some(archive) = archive {
            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        } else if hashes.is_none() {
            // Otherwise, unzip the wheel.
            // No digests are required, so take the fast path: unzip without hashing.
            let archive = Archive::new(
                self.unzip_wheel(path, wheel_entry.path()).await?,
                HashDigests::empty(),
                filename.clone(),
            );

            // Write the archive pointer to the cache.
            let pointer = LocalArchivePointer {
                timestamp: modified,
                archive: archive.clone(),
            };
            pointer.write_to(&pointer_entry).await?;

            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        } else {
            // If necessary, compute the hashes of the wheel.
            let file = fs_err::tokio::File::open(path)
                .await
                .map_err(Error::CacheRead)?;
            let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
                .map_err(Error::CacheWrite)?;

            // Create a hasher for each hash algorithm.
            let algorithms = hashes.algorithms();
            let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
            let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);

            // Unzip the wheel to a temporary directory.
            // Streaming through the hashing reader computes the digests in the
            // same pass as the extraction.
            match extension {
                WheelExtension::Whl => {
                    uv_extract::stream::unzip(path.display(), &mut hasher, temp_dir.path())
                        .await
                        .map_err(|err| Error::Extract(filename.to_string(), err))?;
                }
                WheelExtension::WhlZst => {
                    uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
                        .await
                        .map_err(|err| Error::Extract(filename.to_string(), err))?;
                }
            }

            // Exhaust the reader to compute the hash.
            hasher.finish().await.map_err(Error::HashExhaustion)?;

            let hashes = hashers.into_iter().map(HashDigest::from).collect();

            // Persist the temporary directory to the directory store.
            let id = self
                .build_context
                .cache()
                .persist(temp_dir.keep(), wheel_entry.path())
                .await
                .map_err(Error::CacheWrite)?;

            // Create an archive.
            let archive = Archive::new(id, hashes, filename.clone());

            // Write the archive pointer to the cache.
            let pointer = LocalArchivePointer {
                timestamp: modified,
                archive: archive.clone(),
            };
            pointer.write_to(&pointer_entry).await?;

            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        }
    }
1136
1137    /// Unzip a wheel into the cache, returning the path to the unzipped directory.
1138    async fn unzip_wheel(&self, path: &Path, target: &Path) -> Result<ArchiveId, Error> {
1139        let temp_dir = tokio::task::spawn_blocking({
1140            let path = path.to_owned();
1141            let root = self.build_context.cache().root().to_path_buf();
1142            move || -> Result<TempDir, Error> {
1143                // Unzip the wheel into a temporary directory.
1144                let temp_dir = tempfile::tempdir_in(root).map_err(Error::CacheWrite)?;
1145                let reader = fs_err::File::open(&path).map_err(Error::CacheWrite)?;
1146                uv_extract::unzip(reader, temp_dir.path())
1147                    .map_err(|err| Error::Extract(path.to_string_lossy().into_owned(), err))?;
1148                Ok(temp_dir)
1149            }
1150        })
1151        .await??;
1152
1153        // Persist the temporary directory to the directory store.
1154        let id = self
1155            .build_context
1156            .cache()
1157            .persist(temp_dir.keep(), target)
1158            .await
1159            .map_err(Error::CacheWrite)?;
1160
1161        Ok(id)
1162    }
1163
1164    /// Returns a GET [`reqwest::Request`] for the given URL.
1165    fn request(&self, url: DisplaySafeUrl) -> Result<reqwest::Request, reqwest::Error> {
1166        self.client
1167            .unmanaged
1168            .uncached_client(&url)
1169            .get(Url::from(url))
1170            .header(
1171                // `reqwest` defaults to accepting compressed responses.
1172                // Specify identity encoding to get consistent .whl downloading
1173                // behavior from servers. ref: https://github.com/pypa/pip/pull/1688
1174                "accept-encoding",
1175                reqwest::header::HeaderValue::from_static("identity"),
1176            )
1177            .build()
1178    }
1179
    /// Return the [`ManagedClient`] used by this resolver.
    ///
    /// The managed client wraps the underlying [`RegistryClient`] with a
    /// semaphore that bounds the number of concurrent requests.
    pub fn client(&self) -> &ManagedClient<'a> {
        &self.client
    }
1184}
1185
/// A wrapper around `RegistryClient` that manages a concurrency limit.
pub struct ManagedClient<'a> {
    /// The underlying client. Using it directly bypasses the concurrency limit.
    pub unmanaged: &'a RegistryClient,
    /// Semaphore bounding the number of in-flight requests made through
    /// [`ManagedClient::managed`] (or manually via [`ManagedClient::manual`]).
    control: Arc<Semaphore>,
}
1191
1192impl<'a> ManagedClient<'a> {
1193    /// Create a new `ManagedClient` using the given client and concurrency semaphore.
1194    fn new(client: &'a RegistryClient, control: Arc<Semaphore>) -> Self {
1195        ManagedClient {
1196            unmanaged: client,
1197            control,
1198        }
1199    }
1200
1201    /// Perform a request using the client, respecting the concurrency limit.
1202    ///
1203    /// If the concurrency limit has been reached, this method will wait until a pending
1204    /// operation completes before executing the closure.
1205    pub async fn managed<F, T>(&self, f: impl FnOnce(&'a RegistryClient) -> F) -> T
1206    where
1207        F: Future<Output = T>,
1208    {
1209        let _permit = self.control.acquire().await.unwrap();
1210        f(self.unmanaged).await
1211    }
1212
1213    /// Perform a request using a client that internally manages the concurrency limit.
1214    ///
1215    /// The callback is passed the client and a semaphore. It must acquire the semaphore before
1216    /// any request through the client and drop it after.
1217    ///
1218    /// This method serves as an escape hatch for functions that may want to send multiple requests
1219    /// in parallel.
1220    pub async fn manual<F, T>(&'a self, f: impl FnOnce(&'a RegistryClient, &'a Semaphore) -> F) -> T
1221    where
1222        F: Future<Output = T>,
1223    {
1224        f(self.unmanaged, &self.control).await
1225    }
1226}
1227
1228/// Returns the value of the `Content-Length` header from the [`reqwest::Response`], if present.
1229fn content_length(response: &reqwest::Response) -> Option<u64> {
1230    response
1231        .headers()
1232        .get(reqwest::header::CONTENT_LENGTH)
1233        .and_then(|val| val.to_str().ok())
1234        .and_then(|val| val.parse::<u64>().ok())
1235}
1236
/// An asynchronous reader that reports progress as bytes are read.
struct ProgressReader<'a, R> {
    /// The underlying reader being wrapped.
    reader: R,
    /// The progress handle returned by `Reporter::on_download_start`.
    index: usize,
    /// The reporter to notify as bytes are read.
    reporter: &'a dyn Reporter,
}
1243
1244impl<'a, R> ProgressReader<'a, R> {
1245    /// Create a new [`ProgressReader`] that wraps another reader.
1246    fn new(reader: R, index: usize, reporter: &'a dyn Reporter) -> Self {
1247        Self {
1248            reader,
1249            index,
1250            reporter,
1251        }
1252    }
1253}
1254
impl<R> AsyncRead for ProgressReader<'_, R>
where
    R: AsyncRead + Unpin,
{
    // Delegate the read to the inner reader, then report the bytes now present
    // in `buf` to the reporter on success.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        Pin::new(&mut self.as_mut().reader)
            .poll_read(cx, buf)
            .map_ok(|()| {
                // NOTE(review): this reports `buf.filled().len()` as the per-call
                // delta, which assumes the caller supplies an empty `ReadBuf` on
                // each call (as `tokio::io::copy` does) — confirm before reusing
                // this reader elsewhere.
                self.reporter
                    .on_download_progress(self.index, buf.filled().len() as u64);
            })
    }
}
1272
/// A pointer to an archive in the cache, fetched from an HTTP archive.
///
/// Encoded with `MsgPack`, and represented on disk by a `.http` file.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HttpArchivePointer {
    /// The cached archive the pointer refers to.
    archive: Archive,
}
1280
1281impl HttpArchivePointer {
1282    /// Read an [`HttpArchivePointer`] from the cache.
1283    pub fn read_from(path: impl AsRef<Path>) -> Result<Option<Self>, Error> {
1284        match fs_err::File::open(path.as_ref()) {
1285            Ok(file) => {
1286                let data = DataWithCachePolicy::from_reader(file)?.data;
1287                let archive = rmp_serde::from_slice::<Archive>(&data)?;
1288                Ok(Some(Self { archive }))
1289            }
1290            Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
1291            Err(err) => Err(Error::CacheRead(err)),
1292        }
1293    }
1294
1295    /// Return the [`Archive`] from the pointer.
1296    pub fn into_archive(self) -> Archive {
1297        self.archive
1298    }
1299
1300    /// Return the [`CacheInfo`] from the pointer.
1301    pub fn to_cache_info(&self) -> CacheInfo {
1302        CacheInfo::default()
1303    }
1304
1305    /// Return the [`BuildInfo`] from the pointer.
1306    pub fn to_build_info(&self) -> Option<BuildInfo> {
1307        None
1308    }
1309}
1310
/// A pointer to an archive in the cache, fetched from a local path.
///
/// Encoded with `MsgPack`, and represented on disk by a `.rev` file.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LocalArchivePointer {
    /// The last-modified time of the local file when the archive was created;
    /// used to detect staleness via `is_up_to_date`.
    timestamp: Timestamp,
    /// The cached archive the pointer refers to.
    archive: Archive,
}
1319
1320impl LocalArchivePointer {
1321    /// Read an [`LocalArchivePointer`] from the cache.
1322    pub fn read_from(path: impl AsRef<Path>) -> Result<Option<Self>, Error> {
1323        match fs_err::read(path) {
1324            Ok(cached) => Ok(Some(rmp_serde::from_slice::<Self>(&cached)?)),
1325            Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
1326            Err(err) => Err(Error::CacheRead(err)),
1327        }
1328    }
1329
1330    /// Write an [`LocalArchivePointer`] to the cache.
1331    pub async fn write_to(&self, entry: &CacheEntry) -> Result<(), Error> {
1332        write_atomic(entry.path(), rmp_serde::to_vec(&self)?)
1333            .await
1334            .map_err(Error::CacheWrite)
1335    }
1336
1337    /// Returns `true` if the archive is up-to-date with the given modified timestamp.
1338    pub fn is_up_to_date(&self, modified: Timestamp) -> bool {
1339        self.timestamp == modified
1340    }
1341
1342    /// Return the [`Archive`] from the pointer.
1343    pub fn into_archive(self) -> Archive {
1344        self.archive
1345    }
1346
1347    /// Return the [`CacheInfo`] from the pointer.
1348    pub fn to_cache_info(&self) -> CacheInfo {
1349        CacheInfo::from_timestamp(self.timestamp)
1350    }
1351
1352    /// Return the [`BuildInfo`] from the pointer.
1353    pub fn to_build_info(&self) -> Option<BuildInfo> {
1354        None
1355    }
1356}
1357
/// The resolved download target for a wheel: its URL, on-disk format, and size.
#[derive(Debug, Clone)]
struct WheelTarget {
    /// The URL from which the wheel can be downloaded.
    url: DisplaySafeUrl,
    /// The expected extension of the wheel file.
    extension: WheelExtension,
    /// The expected size of the wheel file, if known.
    size: Option<u64>,
}
1367
1368impl TryFrom<&File> for WheelTarget {
1369    type Error = ToUrlError;
1370
1371    /// Determine the [`WheelTarget`] from a [`File`].
1372    fn try_from(file: &File) -> Result<Self, Self::Error> {
1373        let url = file.url.to_url()?;
1374        if let Some(zstd) = file.zstd.as_ref() {
1375            Ok(Self {
1376                url: add_tar_zst_extension(url),
1377                extension: WheelExtension::WhlZst,
1378                size: zstd.size,
1379            })
1380        } else {
1381            Ok(Self {
1382                url,
1383                extension: WheelExtension::Whl,
1384                size: file.size,
1385            })
1386        }
1387    }
1388}
1389
/// The on-disk format of a wheel artifact to download.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum WheelExtension {
    /// A `.whl` file.
    Whl,
    /// A `.whl.tar.zst` file.
    WhlZst,
}
1397
1398/// Add `.tar.zst` to the end of the URL path, if it doesn't already exist.
1399#[must_use]
1400fn add_tar_zst_extension(mut url: DisplaySafeUrl) -> DisplaySafeUrl {
1401    let mut path = url.path().to_string();
1402
1403    if !path.ends_with(".tar.zst") {
1404        path.push_str(".tar.zst");
1405    }
1406
1407    url.set_path(&path);
1408    url
1409}
1410
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_add_tar_zst_extension() {
        // (input, expected) pairs: the suffix is appended exactly once, and
        // percent-encoded characters in the path are preserved.
        let cases = [
            (
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl",
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
            ),
            (
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
            ),
            (
                "https://files.pythonhosted.org/flask-3.1.0%2Bcu124-py3-none-any.whl",
                "https://files.pythonhosted.org/flask-3.1.0%2Bcu124-py3-none-any.whl.tar.zst",
            ),
        ];
        for (input, expected) in cases {
            let url = DisplaySafeUrl::parse(input).unwrap();
            assert_eq!(add_tar_zst_extension(url).as_str(), expected);
        }
    }
}