// uv_distribution/distribution_database.rs
1use std::future::Future;
2use std::io;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use futures::{FutureExt, TryStreamExt};
9use tempfile::TempDir;
10use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};
11use tokio::sync::Semaphore;
12use tokio_util::compat::FuturesAsyncReadCompatExt;
13use tracing::{Instrument, info_span, instrument, warn};
14use url::Url;
15
16use uv_cache::{ArchiveId, CacheBucket, CacheEntry, WheelCache};
17use uv_cache_info::{CacheInfo, Timestamp};
18use uv_client::{
19    CacheControl, CachedClientError, Connectivity, DataWithCachePolicy, RegistryClient,
20};
21use uv_distribution_filename::{SourceDistExtension, WheelFilename};
22use uv_distribution_types::{
23    BuildInfo, BuildableSource, BuiltDist, Dist, File, HashPolicy, Hashed, IndexUrl, InstalledDist,
24    Name, SourceDist, ToUrlError,
25};
26use uv_extract::hash::Hasher;
27use uv_fs::write_atomic;
28use uv_platform_tags::Tags;
29use uv_pypi_types::{HashDigest, HashDigests, PyProjectToml};
30use uv_redacted::DisplaySafeUrl;
31use uv_types::{BuildContext, BuildStack};
32use uv_warnings::warn_user_once;
33
34use crate::archive::Archive;
35use crate::metadata::{ArchiveMetadata, Metadata};
36use crate::source::SourceDistributionBuilder;
37use crate::{Error, LocalWheel, Reporter, RequiresDist};
38
/// A cached high-level interface to convert distributions (a requirement resolved to a location)
/// to a wheel or wheel metadata.
///
/// For wheel metadata, this happens by either fetching the metadata from the remote wheel or by
/// building the source distribution. For wheel files, either the wheel is downloaded or a source
/// distribution is downloaded, built and the new wheel gets returned.
///
/// All kinds of wheel sources (index, URL, path) and source distribution source (index, URL, path,
/// Git) are supported.
///
/// This struct also has the task of acquiring locks around source dist builds in general and git
/// operation especially, as well as respecting concurrency limits.
pub struct DistributionDatabase<'a, Context: BuildContext> {
    // Provides access to the cache, user-supplied dependency metadata, index locations, and
    // client capabilities.
    build_context: &'a Context,
    // Downloads and builds source distributions into wheels (and wheel metadata).
    builder: SourceDistributionBuilder<'a, Context>,
    // Registry client wrapped with a semaphore that bounds concurrent downloads
    // (see `ManagedClient::new` in `new` below).
    client: ManagedClient<'a>,
    // Optional progress reporter; used for `on_download_start`/`on_download_complete`
    // callbacks during wheel downloads.
    reporter: Option<Arc<dyn Reporter>>,
}
57
58impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
59    pub fn new(
60        client: &'a RegistryClient,
61        build_context: &'a Context,
62        downloads_semaphore: Arc<Semaphore>,
63    ) -> Self {
64        Self {
65            build_context,
66            builder: SourceDistributionBuilder::new(build_context),
67            client: ManagedClient::new(client, downloads_semaphore),
68            reporter: None,
69        }
70    }
71
72    /// Set the build stack to use for the [`DistributionDatabase`].
73    #[must_use]
74    pub fn with_build_stack(self, build_stack: &'a BuildStack) -> Self {
75        Self {
76            builder: self.builder.with_build_stack(build_stack),
77            ..self
78        }
79    }
80
81    /// Set the [`Reporter`] to use for the [`DistributionDatabase`].
82    #[must_use]
83    pub fn with_reporter(self, reporter: Arc<dyn Reporter>) -> Self {
84        Self {
85            builder: self.builder.with_reporter(reporter.clone()),
86            reporter: Some(reporter),
87            ..self
88        }
89    }
90
91    /// Handle a specific `reqwest` error, and convert it to [`io::Error`].
92    fn handle_response_errors(&self, err: reqwest::Error) -> io::Error {
93        if err.is_timeout() {
94            // Assumption: The connect timeout with the 10s default is not the culprit.
95            io::Error::new(
96                io::ErrorKind::TimedOut,
97                format!(
98                    "Failed to download distribution due to network timeout. Try increasing UV_HTTP_TIMEOUT (current value: {}s).",
99                    self.client.unmanaged.read_timeout().as_secs()
100                ),
101            )
102        } else {
103            io::Error::other(err)
104        }
105    }
106
107    /// Either fetch the wheel or fetch and build the source distribution
108    ///
109    /// Returns a wheel that's compliant with the given platform tags.
110    ///
111    /// While hashes will be generated in some cases, hash-checking is only enforced for source
112    /// distributions, and should be enforced by the caller for wheels.
113    #[instrument(skip_all, fields(%dist))]
114    pub async fn get_or_build_wheel(
115        &self,
116        dist: &Dist,
117        tags: &Tags,
118        hashes: HashPolicy<'_>,
119    ) -> Result<LocalWheel, Error> {
120        match dist {
121            Dist::Built(built) => self.get_wheel(built, hashes).await,
122            Dist::Source(source) => self.build_wheel(source, tags, hashes).await,
123        }
124    }
125
126    /// Either fetch the only wheel metadata (directly from the index or with range requests) or
127    /// fetch and build the source distribution.
128    ///
129    /// While hashes will be generated in some cases, hash-checking is only enforced for source
130    /// distributions, and should be enforced by the caller for wheels.
131    #[instrument(skip_all, fields(%dist))]
132    pub async fn get_installed_metadata(
133        &self,
134        dist: &InstalledDist,
135    ) -> Result<ArchiveMetadata, Error> {
136        // If the metadata was provided by the user directly, prefer it.
137        if let Some(metadata) = self
138            .build_context
139            .dependency_metadata()
140            .get(dist.name(), Some(dist.version()))
141        {
142            return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
143        }
144
145        let metadata = dist
146            .read_metadata()
147            .map_err(|err| Error::ReadInstalled(Box::new(dist.clone()), err))?;
148
149        Ok(ArchiveMetadata::from_metadata23(metadata.clone()))
150    }
151
152    /// Either fetch the only wheel metadata (directly from the index or with range requests) or
153    /// fetch and build the source distribution.
154    ///
155    /// While hashes will be generated in some cases, hash-checking is only enforced for source
156    /// distributions, and should be enforced by the caller for wheels.
157    #[instrument(skip_all, fields(%dist))]
158    pub async fn get_or_build_wheel_metadata(
159        &self,
160        dist: &Dist,
161        hashes: HashPolicy<'_>,
162    ) -> Result<ArchiveMetadata, Error> {
163        match dist {
164            Dist::Built(built) => self.get_wheel_metadata(built, hashes).await,
165            Dist::Source(source) => {
166                self.build_wheel_metadata(&BuildableSource::Dist(source), hashes)
167                    .await
168            }
169        }
170    }
171
    /// Fetch a wheel from the cache or download it from the index.
    ///
    /// While hashes will be generated in all cases, hash-checking is _not_ enforced and should
    /// instead be enforced by the caller.
    ///
    /// For remote wheels, the wheel is streamed and unzipped into the cache as it downloads;
    /// if the server doesn't support streaming, it falls back to downloading the wheel to
    /// disk first (`download_wheel`). Local (`file://` and path) wheels are loaded directly.
    async fn get_wheel(
        &self,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        match dist {
            BuiltDist::Registry(wheels) => {
                let wheel = wheels.best_wheel();
                // Resolve the registry file entry into a concrete download target: the URL,
                // the wheel extension, and the size (if known).
                let WheelTarget {
                    url,
                    extension,
                    size,
                } = WheelTarget::try_from(&*wheel.file)?;

                // Create a cache entry for the wheel.
                let wheel_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Index(&wheel.index).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                // If the URL is a file URL, load the wheel directly.
                if url.scheme() == "file" {
                    let path = url
                        .to_file_path()
                        .map_err(|()| Error::NonFileUrl(url.clone()))?;
                    return self
                        .load_wheel(
                            &path,
                            &wheel.filename,
                            WheelExtension::Whl,
                            wheel_entry,
                            dist,
                            hashes,
                        )
                        .await;
                }

                // Download and unzip.
                match self
                    .stream_wheel(
                        url.clone(),
                        dist.index(),
                        &wheel.filename,
                        extension,
                        size,
                        &wheel_entry,
                        dist,
                        hashes,
                    )
                    .await
                {
                    Ok(archive) => Ok(LocalWheel {
                        dist: Dist::Built(dist.clone()),
                        archive: self
                            .build_context
                            .cache()
                            .archive(&archive.id)
                            .into_boxed_path(),
                        hashes: archive.hashes,
                        filename: wheel.filename.clone(),
                        // Downloaded wheels carry no local cache-freshness metadata.
                        cache: CacheInfo::default(),
                        build: None,
                    }),
                    // Streaming failures are recoverable: retry by downloading the wheel to
                    // disk. Any other extraction error is propagated as-is.
                    Err(Error::Extract(name, err)) => {
                        if err.is_http_streaming_unsupported() {
                            warn!(
                                "Streaming unsupported for {dist}; downloading wheel to disk ({err})"
                            );
                        } else if err.is_http_streaming_failed() {
                            warn!("Streaming failed for {dist}; downloading wheel to disk ({err})");
                        } else {
                            return Err(Error::Extract(name, err));
                        }

                        // If the request failed because streaming is unsupported, download the
                        // wheel directly.
                        let archive = self
                            .download_wheel(
                                url,
                                dist.index(),
                                &wheel.filename,
                                extension,
                                size,
                                &wheel_entry,
                                dist,
                                hashes,
                            )
                            .await?;

                        Ok(LocalWheel {
                            dist: Dist::Built(dist.clone()),
                            archive: self
                                .build_context
                                .cache()
                                .archive(&archive.id)
                                .into_boxed_path(),
                            hashes: archive.hashes,
                            filename: wheel.filename.clone(),
                            cache: CacheInfo::default(),
                            build: None,
                        })
                    }
                    Err(err) => Err(err),
                }
            }

            BuiltDist::DirectUrl(wheel) => {
                // Create a cache entry for the wheel.
                let wheel_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                // Download and unzip. Direct-URL wheels are always `.whl` and have no known
                // size or index, hence the `None` arguments.
                match self
                    .stream_wheel(
                        wheel.url.raw().clone(),
                        None,
                        &wheel.filename,
                        WheelExtension::Whl,
                        None,
                        &wheel_entry,
                        dist,
                        hashes,
                    )
                    .await
                {
                    Ok(archive) => Ok(LocalWheel {
                        dist: Dist::Built(dist.clone()),
                        archive: self
                            .build_context
                            .cache()
                            .archive(&archive.id)
                            .into_boxed_path(),
                        hashes: archive.hashes,
                        filename: wheel.filename.clone(),
                        cache: CacheInfo::default(),
                        build: None,
                    }),
                    // NOTE(review): unlike the registry arm above, only the
                    // streaming-unsupported case falls back to a disk download here (the
                    // streaming-failed case does not) — confirm whether that's intentional.
                    Err(Error::Client(err)) if err.is_http_streaming_unsupported() => {
                        warn!(
                            "Streaming unsupported for {dist}; downloading wheel to disk ({err})"
                        );

                        // If the request failed because streaming is unsupported, download the
                        // wheel directly.
                        let archive = self
                            .download_wheel(
                                wheel.url.raw().clone(),
                                None,
                                &wheel.filename,
                                WheelExtension::Whl,
                                None,
                                &wheel_entry,
                                dist,
                                hashes,
                            )
                            .await?;
                        Ok(LocalWheel {
                            dist: Dist::Built(dist.clone()),
                            archive: self
                                .build_context
                                .cache()
                                .archive(&archive.id)
                                .into_boxed_path(),
                            hashes: archive.hashes,
                            filename: wheel.filename.clone(),
                            cache: CacheInfo::default(),
                            build: None,
                        })
                    }
                    Err(err) => Err(err),
                }
            }

            BuiltDist::Path(wheel) => {
                // Local wheels are loaded from their install path on disk; no download.
                let cache_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                self.load_wheel(
                    &wheel.install_path,
                    &wheel.filename,
                    WheelExtension::Whl,
                    cache_entry,
                    dist,
                    hashes,
                )
                .await
            }
        }
    }
372
    /// Convert a source distribution into a wheel, fetching it from the cache or building it if
    /// necessary.
    ///
    /// The returned wheel is guaranteed to come from a distribution with a matching hash, and
    /// no build processes will be executed for distributions with mismatched hashes.
    async fn build_wheel(
        &self,
        dist: &SourceDist,
        tags: &Tags,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        // Warn if the source distribution isn't PEP 625 compliant.
        // We do this here instead of in `SourceDistExtension::from_path` to minimize log volume:
        // a non-compliant distribution isn't a huge problem if it's not actually being
        // materialized into a wheel. Observe that we also allow no extension, since we expect that
        // for directory and Git installs.
        // NOTE: Observe that we also allow `.zip` sdists here, which are not PEP 625 compliant.
        // This is because they were allowed on PyPI until relatively recently (2020).
        if let Some(extension) = dist.extension()
            && !matches!(
                extension,
                SourceDistExtension::TarGz | SourceDistExtension::Zip
            )
        {
            if matches!(dist, SourceDist::Registry(_)) {
                // Observe that we display a slightly different warning when the sdist comes
                // from a registry, since that suggests that the user has inadvertently
                // (rather than explicitly) depended on a non-compliant sdist.
                warn_user_once!(
                    "{dist} uses a legacy source distribution format ('.{extension}') that is not compliant with PEP 625. A future version of uv will reject this source distribution. Consider upgrading to a newer version of {package}",
                    package = dist.name(),
                );
            } else {
                warn_user_once!(
                    "{dist} is not a standards-compliant source distribution: expected '.tar.gz' but found '.{extension}'. A future version of uv will reject source distributions that do not meet the requirements specified in PEP 625",
                );
            }
        }

        // Download (if necessary) and build the source distribution into a wheel.
        let built_wheel = self
            .builder
            .download_and_build(&BuildableSource::Dist(dist), tags, hashes, &self.client)
            .boxed_local()
            .await?;

        // Check that the wheel is compatible with its install target.
        //
        // When building a build dependency for a cross-install, the build dependency needs
        // to install and run on the host instead of the target. In this case the `tags` are already
        // for the host instead of the target, so this check passes.
        if !built_wheel.filename.is_compatible(tags) {
            return if tags.is_cross() {
                Err(Error::BuiltWheelIncompatibleTargetPlatform {
                    filename: built_wheel.filename,
                    python_platform: tags.python_platform().clone(),
                    python_version: tags.python_version(),
                })
            } else {
                Err(Error::BuiltWheelIncompatibleHostPlatform {
                    filename: built_wheel.filename,
                    python_platform: tags.python_platform().clone(),
                    python_version: tags.python_version(),
                })
            };
        }

        // Acquire the advisory lock, to guard against concurrent unzips of the same target.
        // NOTE(review): only taken on Windows; presumably other platforms tolerate
        // concurrent extraction — confirm.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = CacheEntry::new(
                built_wheel.target.parent().unwrap(),
                format!(
                    "{}.lock",
                    built_wheel.target.file_name().unwrap().to_str().unwrap()
                ),
            );
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // If the wheel was unzipped previously, respect it. Source distributions are
        // cached under a unique revision ID, so unzipped directories are never stale.
        match self.build_context.cache().resolve_link(&built_wheel.target) {
            Ok(archive) => {
                return Ok(LocalWheel {
                    dist: Dist::Source(dist.clone()),
                    archive: archive.into_boxed_path(),
                    filename: built_wheel.filename,
                    hashes: built_wheel.hashes,
                    cache: built_wheel.cache_info,
                    build: Some(built_wheel.build_info),
                });
            }
            // `NotFound` means the wheel hasn't been unzipped yet: fall through and unzip it.
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(Error::CacheRead(err)),
        }

        // Otherwise, unzip the wheel.
        let id = self
            .unzip_wheel(&built_wheel.path, &built_wheel.target)
            .await?;

        Ok(LocalWheel {
            dist: Dist::Source(dist.clone()),
            archive: self.build_context.cache().archive(&id).into_boxed_path(),
            hashes: built_wheel.hashes,
            filename: built_wheel.filename,
            cache: built_wheel.cache_info,
            build: Some(built_wheel.build_info),
        })
    }
483
    /// Fetch the wheel metadata from the index, or from the cache if possible.
    ///
    /// While hashes will be generated in some cases, hash-checking is _not_ enforced and should
    /// instead be enforced by the caller.
    ///
    /// User-provided dependency metadata (from the build context) always takes precedence
    /// over metadata fetched from the index or the wheel itself.
    async fn get_wheel_metadata(
        &self,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<ArchiveMetadata, Error> {
        // If hash generation is enabled, and the distribution isn't hosted on a registry, get the
        // entire wheel to ensure that the hashes are included in the response. If the distribution
        // is hosted on an index, the hashes will be included in the simple metadata response.
        // For hash _validation_, callers are expected to enforce the policy when retrieving the
        // wheel.
        //
        // Historically, for `uv pip compile --universal`, we also generate hashes for
        // registry-based distributions when the relevant registry doesn't provide them. This was
        // motivated by `--find-links`. We continue that behavior (under `HashGeneration::All`) for
        // backwards compatibility, but it's a little dubious, since we're only hashing _one_
        // distribution here (as opposed to hashing all distributions for the version), and it may
        // not even be a compatible distribution!
        //
        // TODO(charlie): Request the hashes via a separate method, to reduce the coupling in this API.
        if hashes.is_generate(dist) {
            // Fetch the full wheel; its digests are computed as a side effect of the download.
            let wheel = self.get_wheel(dist, hashes).await?;
            // If the metadata was provided by the user directly, prefer it.
            let metadata = if let Some(metadata) = self
                .build_context
                .dependency_metadata()
                .get(dist.name(), Some(dist.version()))
            {
                metadata.clone()
            } else {
                wheel.metadata()?
            };
            let hashes = wheel.hashes;
            return Ok(ArchiveMetadata {
                metadata: Metadata::from_metadata23(metadata),
                hashes,
            });
        }

        // If the metadata was provided by the user directly, prefer it.
        if let Some(metadata) = self
            .build_context
            .dependency_metadata()
            .get(dist.name(), Some(dist.version()))
        {
            return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
        }

        // Otherwise, fetch just the metadata from the index (per the method docs, the client
        // may use range requests to avoid downloading the entire wheel).
        let result = self
            .client
            .managed(|client| {
                client
                    .wheel_metadata(dist, self.build_context.capabilities())
                    .boxed_local()
            })
            .await;

        match result {
            Ok(metadata) => {
                // Validate that the metadata is consistent with the distribution.
                Ok(ArchiveMetadata::from_metadata23(metadata))
            }
            Err(err) if err.is_http_streaming_unsupported() => {
                warn!(
                    "Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})"
                );

                // If the request failed due to an error that could be resolved by
                // downloading the wheel directly, try that.
                let wheel = self.get_wheel(dist, hashes).await?;
                let metadata = wheel.metadata()?;
                let hashes = wheel.hashes;
                Ok(ArchiveMetadata {
                    metadata: Metadata::from_metadata23(metadata),
                    hashes,
                })
            }
            Err(err) => Err(err.into()),
        }
    }
567
568    /// Build the wheel metadata for a source distribution, or fetch it from the cache if possible.
569    ///
570    /// The returned metadata is guaranteed to come from a distribution with a matching hash, and
571    /// no build processes will be executed for distributions with mismatched hashes.
572    pub async fn build_wheel_metadata(
573        &self,
574        source: &BuildableSource<'_>,
575        hashes: HashPolicy<'_>,
576    ) -> Result<ArchiveMetadata, Error> {
577        // If the metadata was provided by the user directly, prefer it.
578        if let Some(dist) = source.as_dist() {
579            if let Some(metadata) = self
580                .build_context
581                .dependency_metadata()
582                .get(dist.name(), dist.version())
583            {
584                // If we skipped the build, we should still resolve any Git dependencies to precise
585                // commits.
586                self.builder.resolve_revision(source, &self.client).await?;
587
588                return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
589            }
590        }
591
592        let metadata = self
593            .builder
594            .download_and_build_metadata(source, hashes, &self.client)
595            .boxed_local()
596            .await?;
597
598        Ok(metadata)
599    }
600
601    /// Return the [`RequiresDist`] from a `pyproject.toml`, if it can be statically extracted.
602    pub async fn requires_dist(
603        &self,
604        path: &Path,
605        pyproject_toml: &PyProjectToml,
606    ) -> Result<Option<RequiresDist>, Error> {
607        self.builder
608            .source_tree_requires_dist(
609                path,
610                pyproject_toml,
611                self.client.unmanaged.credentials_cache(),
612            )
613            .await
614    }
615
616    /// Stream a wheel from a URL, unzipping it into the cache as it's downloaded.
617    async fn stream_wheel(
618        &self,
619        url: DisplaySafeUrl,
620        index: Option<&IndexUrl>,
621        filename: &WheelFilename,
622        extension: WheelExtension,
623        size: Option<u64>,
624        wheel_entry: &CacheEntry,
625        dist: &BuiltDist,
626        hashes: HashPolicy<'_>,
627    ) -> Result<Archive, Error> {
628        // Acquire an advisory lock, to guard against concurrent writes.
629        #[cfg(windows)]
630        let _lock = {
631            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
632            lock_entry.lock().await.map_err(Error::CacheLock)?
633        };
634
635        // Create an entry for the HTTP cache.
636        let http_entry = wheel_entry.with_file(format!("{}.http", filename.cache_key()));
637
638        let query_url = &url.clone();
639
640        let download = |response: reqwest::Response| {
641            async {
642                let size = size.or_else(|| content_length(&response));
643
644                let progress = self
645                    .reporter
646                    .as_ref()
647                    .map(|reporter| (reporter, reporter.on_download_start(dist.name(), size)));
648
649                let reader = response
650                    .bytes_stream()
651                    .map_err(|err| self.handle_response_errors(err))
652                    .into_async_read();
653
654                // Create a hasher for each hash algorithm.
655                let algorithms = hashes.algorithms();
656                let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
657                let mut hasher = uv_extract::hash::HashReader::new(reader.compat(), &mut hashers);
658
659                // Download and unzip the wheel to a temporary directory.
660                let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
661                    .map_err(Error::CacheWrite)?;
662
663                match progress {
664                    Some((reporter, progress)) => {
665                        let mut reader = ProgressReader::new(&mut hasher, progress, &**reporter);
666                        match extension {
667                            WheelExtension::Whl => {
668                                uv_extract::stream::unzip(query_url, &mut reader, temp_dir.path())
669                                    .await
670                                    .map_err(|err| Error::Extract(filename.to_string(), err))?;
671                            }
672                            WheelExtension::WhlZst => {
673                                uv_extract::stream::untar_zst(&mut reader, temp_dir.path())
674                                    .await
675                                    .map_err(|err| Error::Extract(filename.to_string(), err))?;
676                            }
677                        }
678                    }
679                    None => match extension {
680                        WheelExtension::Whl => {
681                            uv_extract::stream::unzip(query_url, &mut hasher, temp_dir.path())
682                                .await
683                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
684                        }
685                        WheelExtension::WhlZst => {
686                            uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
687                                .await
688                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
689                        }
690                    },
691                }
692
693                // If necessary, exhaust the reader to compute the hash.
694                if !hashes.is_none() {
695                    hasher.finish().await.map_err(Error::HashExhaustion)?;
696                }
697
698                // Persist the temporary directory to the directory store.
699                let id = self
700                    .build_context
701                    .cache()
702                    .persist(temp_dir.keep(), wheel_entry.path())
703                    .await
704                    .map_err(Error::CacheRead)?;
705
706                if let Some((reporter, progress)) = progress {
707                    reporter.on_download_complete(dist.name(), progress);
708                }
709
710                Ok(Archive::new(
711                    id,
712                    hashers.into_iter().map(HashDigest::from).collect(),
713                    filename.clone(),
714                ))
715            }
716            .instrument(info_span!("wheel", wheel = %dist))
717        };
718
719        // Fetch the archive from the cache, or download it if necessary.
720        let req = self.request(url.clone())?;
721
722        // Determine the cache control policy for the URL.
723        let cache_control = match self.client.unmanaged.connectivity() {
724            Connectivity::Online => {
725                if let Some(header) = index.and_then(|index| {
726                    self.build_context
727                        .locations()
728                        .artifact_cache_control_for(index)
729                }) {
730                    CacheControl::Override(header)
731                } else {
732                    CacheControl::from(
733                        self.build_context
734                            .cache()
735                            .freshness(&http_entry, Some(&filename.name), None)
736                            .map_err(Error::CacheRead)?,
737                    )
738                }
739            }
740            Connectivity::Offline => CacheControl::AllowStale,
741        };
742
743        let archive = self
744            .client
745            .managed(|client| {
746                client.cached_client().get_serde_with_retry(
747                    req,
748                    &http_entry,
749                    cache_control.clone(),
750                    download,
751                )
752            })
753            .await
754            .map_err(|err| match err {
755                CachedClientError::Callback { err, .. } => err,
756                CachedClientError::Client(err) => Error::Client(err),
757            })?;
758
759        // If the archive is missing the required hashes, or has since been removed, force a refresh.
760        let archive = Some(archive)
761            .filter(|archive| archive.has_digests(hashes))
762            .filter(|archive| archive.exists(self.build_context.cache()));
763
764        let archive = if let Some(archive) = archive {
765            archive
766        } else {
767            self.client
768                .managed(async |client| {
769                    client
770                        .cached_client()
771                        .skip_cache_with_retry(
772                            self.request(url)?,
773                            &http_entry,
774                            cache_control,
775                            download,
776                        )
777                        .await
778                        .map_err(|err| match err {
779                            CachedClientError::Callback { err, .. } => err,
780                            CachedClientError::Client(err) => Error::Client(err),
781                        })
782                })
783                .await?
784        };
785
786        Ok(archive)
787    }
788
789    /// Download a wheel from a URL, then unzip it into the cache.
790    async fn download_wheel(
791        &self,
792        url: DisplaySafeUrl,
793        index: Option<&IndexUrl>,
794        filename: &WheelFilename,
795        extension: WheelExtension,
796        size: Option<u64>,
797        wheel_entry: &CacheEntry,
798        dist: &BuiltDist,
799        hashes: HashPolicy<'_>,
800    ) -> Result<Archive, Error> {
801        // Acquire an advisory lock, to guard against concurrent writes.
802        #[cfg(windows)]
803        let _lock = {
804            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
805            lock_entry.lock().await.map_err(Error::CacheLock)?
806        };
807
808        // Create an entry for the HTTP cache.
809        let http_entry = wheel_entry.with_file(format!("{}.http", filename.cache_key()));
810
811        let query_url = &url.clone();
812
813        let download = |response: reqwest::Response| {
814            async {
815                let size = size.or_else(|| content_length(&response));
816
817                let progress = self
818                    .reporter
819                    .as_ref()
820                    .map(|reporter| (reporter, reporter.on_download_start(dist.name(), size)));
821
822                let reader = response
823                    .bytes_stream()
824                    .map_err(|err| self.handle_response_errors(err))
825                    .into_async_read();
826
827                // Download the wheel to a temporary file.
828                let temp_file = tempfile::tempfile_in(self.build_context.cache().root())
829                    .map_err(Error::CacheWrite)?;
830                let mut writer = tokio::io::BufWriter::new(fs_err::tokio::File::from_std(
831                    // It's an unnamed file on Linux so that's the best approximation.
832                    fs_err::File::from_parts(temp_file, self.build_context.cache().root()),
833                ));
834
835                match progress {
836                    Some((reporter, progress)) => {
837                        // Wrap the reader in a progress reporter. This will report 100% progress
838                        // after the download is complete, even if we still have to unzip and hash
839                        // part of the file.
840                        let mut reader =
841                            ProgressReader::new(reader.compat(), progress, &**reporter);
842
843                        tokio::io::copy(&mut reader, &mut writer)
844                            .await
845                            .map_err(Error::CacheWrite)?;
846                    }
847                    None => {
848                        tokio::io::copy(&mut reader.compat(), &mut writer)
849                            .await
850                            .map_err(Error::CacheWrite)?;
851                    }
852                }
853
854                // Unzip the wheel to a temporary directory.
855                let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
856                    .map_err(Error::CacheWrite)?;
857                let mut file = writer.into_inner();
858                file.seek(io::SeekFrom::Start(0))
859                    .await
860                    .map_err(Error::CacheWrite)?;
861
862                // If no hashes are required, parallelize the unzip operation.
863                let hashes = if hashes.is_none() {
864                    let file = file.into_std().await;
865                    tokio::task::spawn_blocking({
866                        let target = temp_dir.path().to_owned();
867                        move || -> Result<(), uv_extract::Error> {
868                            // Unzip the wheel into a temporary directory.
869                            match extension {
870                                WheelExtension::Whl => {
871                                    uv_extract::unzip(file, &target)?;
872                                }
873                                WheelExtension::WhlZst => {
874                                    uv_extract::stream::untar_zst_file(file, &target)?;
875                                }
876                            }
877                            Ok(())
878                        }
879                    })
880                    .await?
881                    .map_err(|err| Error::Extract(filename.to_string(), err))?;
882
883                    HashDigests::empty()
884                } else {
885                    // Create a hasher for each hash algorithm.
886                    let algorithms = hashes.algorithms();
887                    let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
888                    let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);
889
890                    match extension {
891                        WheelExtension::Whl => {
892                            uv_extract::stream::unzip(query_url, &mut hasher, temp_dir.path())
893                                .await
894                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
895                        }
896                        WheelExtension::WhlZst => {
897                            uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
898                                .await
899                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
900                        }
901                    }
902
903                    // If necessary, exhaust the reader to compute the hash.
904                    hasher.finish().await.map_err(Error::HashExhaustion)?;
905
906                    hashers.into_iter().map(HashDigest::from).collect()
907                };
908
909                // Persist the temporary directory to the directory store.
910                let id = self
911                    .build_context
912                    .cache()
913                    .persist(temp_dir.keep(), wheel_entry.path())
914                    .await
915                    .map_err(Error::CacheRead)?;
916
917                if let Some((reporter, progress)) = progress {
918                    reporter.on_download_complete(dist.name(), progress);
919                }
920
921                Ok(Archive::new(id, hashes, filename.clone()))
922            }
923            .instrument(info_span!("wheel", wheel = %dist))
924        };
925
926        // Fetch the archive from the cache, or download it if necessary.
927        let req = self.request(url.clone())?;
928
929        // Determine the cache control policy for the URL.
930        let cache_control = match self.client.unmanaged.connectivity() {
931            Connectivity::Online => {
932                if let Some(header) = index.and_then(|index| {
933                    self.build_context
934                        .locations()
935                        .artifact_cache_control_for(index)
936                }) {
937                    CacheControl::Override(header)
938                } else {
939                    CacheControl::from(
940                        self.build_context
941                            .cache()
942                            .freshness(&http_entry, Some(&filename.name), None)
943                            .map_err(Error::CacheRead)?,
944                    )
945                }
946            }
947            Connectivity::Offline => CacheControl::AllowStale,
948        };
949
950        let archive = self
951            .client
952            .managed(|client| {
953                client.cached_client().get_serde_with_retry(
954                    req,
955                    &http_entry,
956                    cache_control.clone(),
957                    download,
958                )
959            })
960            .await
961            .map_err(|err| match err {
962                CachedClientError::Callback { err, .. } => err,
963                CachedClientError::Client(err) => Error::Client(err),
964            })?;
965
966        // If the archive is missing the required hashes, or has since been removed, force a refresh.
967        let archive = Some(archive)
968            .filter(|archive| archive.has_digests(hashes))
969            .filter(|archive| archive.exists(self.build_context.cache()));
970
971        let archive = if let Some(archive) = archive {
972            archive
973        } else {
974            self.client
975                .managed(async |client| {
976                    client
977                        .cached_client()
978                        .skip_cache_with_retry(
979                            self.request(url)?,
980                            &http_entry,
981                            cache_control,
982                            download,
983                        )
984                        .await
985                        .map_err(|err| match err {
986                            CachedClientError::Callback { err, .. } => err,
987                            CachedClientError::Client(err) => Error::Client(err),
988                        })
989                })
990                .await?
991        };
992
993        Ok(archive)
994    }
995
    /// Load a wheel from a local path.
    ///
    /// If the wheel has already been unzipped into the cache — and the cached `.rev`
    /// pointer matches the file's modification time and carries all required hashes —
    /// the cached archive is returned directly. Otherwise, the wheel is unzipped
    /// (computing hashes if requested) and a fresh pointer is written for future calls.
    async fn load_wheel(
        &self,
        path: &Path,
        filename: &WheelFilename,
        extension: WheelExtension,
        wheel_entry: CacheEntry,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        // Acquire an advisory lock, to guard against concurrent writes (Windows only).
        #[cfg(windows)]
        let _lock = {
            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // Determine the last-modified time of the wheel.
        let modified = Timestamp::from_path(path).map_err(Error::CacheRead)?;

        // Attempt to read the archive pointer from the cache.
        let pointer_entry = wheel_entry.with_file(format!("{}.rev", filename.cache_key()));
        let pointer = LocalArchivePointer::read_from(&pointer_entry)?;

        // Extract the archive from the pointer, discarding it if the wheel has been
        // modified since the pointer was written, or if required hashes are missing.
        let archive = pointer
            .filter(|pointer| pointer.is_up_to_date(modified))
            .map(LocalArchivePointer::into_archive)
            .filter(|archive| archive.has_digests(hashes));

        // If the file is already unzipped, and the cache is up-to-date, return it.
        if let Some(archive) = archive {
            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        } else if hashes.is_none() {
            // Otherwise, unzip the wheel.
            // NOTE(review): `unzip_wheel` always treats the file as a `.whl` (zip) payload,
            // ignoring `extension` — presumably local wheels are never `.whl.tar.zst` on
            // this path; confirm against callers.
            let archive = Archive::new(
                self.unzip_wheel(path, wheel_entry.path()).await?,
                HashDigests::empty(),
                filename.clone(),
            );

            // Write the archive pointer to the cache.
            let pointer = LocalArchivePointer {
                timestamp: modified,
                archive: archive.clone(),
            };
            pointer.write_to(&pointer_entry).await?;

            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        } else {
            // If necessary, compute the hashes of the wheel.
            let file = fs_err::tokio::File::open(path)
                .await
                .map_err(Error::CacheRead)?;
            let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
                .map_err(Error::CacheWrite)?;

            // Create a hasher for each hash algorithm.
            let algorithms = hashes.algorithms();
            let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
            let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);

            // Unzip the wheel to a temporary directory, streaming through the hasher.
            match extension {
                WheelExtension::Whl => {
                    uv_extract::stream::unzip(path.display(), &mut hasher, temp_dir.path())
                        .await
                        .map_err(|err| Error::Extract(filename.to_string(), err))?;
                }
                WheelExtension::WhlZst => {
                    uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
                        .await
                        .map_err(|err| Error::Extract(filename.to_string(), err))?;
                }
            }

            // Exhaust the reader to compute the hash.
            hasher.finish().await.map_err(Error::HashExhaustion)?;

            let hashes = hashers.into_iter().map(HashDigest::from).collect();

            // Persist the temporary directory to the directory store.
            let id = self
                .build_context
                .cache()
                .persist(temp_dir.keep(), wheel_entry.path())
                .await
                .map_err(Error::CacheWrite)?;

            // Create an archive.
            let archive = Archive::new(id, hashes, filename.clone());

            // Write the archive pointer to the cache.
            let pointer = LocalArchivePointer {
                timestamp: modified,
                archive: archive.clone(),
            };
            pointer.write_to(&pointer_entry).await?;

            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        }
    }
1130
1131    /// Unzip a wheel into the cache, returning the path to the unzipped directory.
1132    async fn unzip_wheel(&self, path: &Path, target: &Path) -> Result<ArchiveId, Error> {
1133        let temp_dir = tokio::task::spawn_blocking({
1134            let path = path.to_owned();
1135            let root = self.build_context.cache().root().to_path_buf();
1136            move || -> Result<TempDir, Error> {
1137                // Unzip the wheel into a temporary directory.
1138                let temp_dir = tempfile::tempdir_in(root).map_err(Error::CacheWrite)?;
1139                let reader = fs_err::File::open(&path).map_err(Error::CacheWrite)?;
1140                uv_extract::unzip(reader, temp_dir.path())
1141                    .map_err(|err| Error::Extract(path.to_string_lossy().into_owned(), err))?;
1142                Ok(temp_dir)
1143            }
1144        })
1145        .await??;
1146
1147        // Persist the temporary directory to the directory store.
1148        let id = self
1149            .build_context
1150            .cache()
1151            .persist(temp_dir.keep(), target)
1152            .await
1153            .map_err(Error::CacheWrite)?;
1154
1155        Ok(id)
1156    }
1157
1158    /// Returns a GET [`reqwest::Request`] for the given URL.
1159    fn request(&self, url: DisplaySafeUrl) -> Result<reqwest::Request, reqwest::Error> {
1160        self.client
1161            .unmanaged
1162            .uncached_client(&url)
1163            .get(Url::from(url))
1164            .header(
1165                // `reqwest` defaults to accepting compressed responses.
1166                // Specify identity encoding to get consistent .whl downloading
1167                // behavior from servers. ref: https://github.com/pypa/pip/pull/1688
1168                "accept-encoding",
1169                reqwest::header::HeaderValue::from_static("identity"),
1170            )
1171            .build()
1172    }
1173
    /// Return the [`ManagedClient`] used by this resolver.
    ///
    /// The returned client enforces the resolver's concurrency limit on requests.
    pub fn client(&self) -> &ManagedClient<'a> {
        &self.client
    }
1178}
1179
/// A wrapper around `RegistryClient` that manages a concurrency limit.
pub struct ManagedClient<'a> {
    /// The underlying client, which does not itself enforce the concurrency limit.
    pub unmanaged: &'a RegistryClient,
    /// The semaphore bounding the number of concurrent requests.
    control: Arc<Semaphore>,
}
1185
1186impl<'a> ManagedClient<'a> {
1187    /// Create a new `ManagedClient` using the given client and concurrency semaphore.
1188    fn new(client: &'a RegistryClient, control: Arc<Semaphore>) -> Self {
1189        ManagedClient {
1190            unmanaged: client,
1191            control,
1192        }
1193    }
1194
1195    /// Perform a request using the client, respecting the concurrency limit.
1196    ///
1197    /// If the concurrency limit has been reached, this method will wait until a pending
1198    /// operation completes before executing the closure.
1199    pub async fn managed<F, T>(&self, f: impl FnOnce(&'a RegistryClient) -> F) -> T
1200    where
1201        F: Future<Output = T>,
1202    {
1203        let _permit = self.control.acquire().await.unwrap();
1204        f(self.unmanaged).await
1205    }
1206
1207    /// Perform a request using a client that internally manages the concurrency limit.
1208    ///
1209    /// The callback is passed the client and a semaphore. It must acquire the semaphore before
1210    /// any request through the client and drop it after.
1211    ///
1212    /// This method serves as an escape hatch for functions that may want to send multiple requests
1213    /// in parallel.
1214    pub async fn manual<F, T>(&'a self, f: impl FnOnce(&'a RegistryClient, &'a Semaphore) -> F) -> T
1215    where
1216        F: Future<Output = T>,
1217    {
1218        f(self.unmanaged, &self.control).await
1219    }
1220}
1221
1222/// Returns the value of the `Content-Length` header from the [`reqwest::Response`], if present.
1223fn content_length(response: &reqwest::Response) -> Option<u64> {
1224    response
1225        .headers()
1226        .get(reqwest::header::CONTENT_LENGTH)
1227        .and_then(|val| val.to_str().ok())
1228        .and_then(|val| val.parse::<u64>().ok())
1229}
1230
/// An asynchronous reader that reports progress as bytes are read.
struct ProgressReader<'a, R> {
    /// The underlying reader being wrapped.
    reader: R,
    /// The progress identifier (as returned by `Reporter::on_download_start`).
    index: usize,
    /// The reporter to notify as bytes are read.
    reporter: &'a dyn Reporter,
}
1237
1238impl<'a, R> ProgressReader<'a, R> {
1239    /// Create a new [`ProgressReader`] that wraps another reader.
1240    fn new(reader: R, index: usize, reporter: &'a dyn Reporter) -> Self {
1241        Self {
1242            reader,
1243            index,
1244            reporter,
1245        }
1246    }
1247}
1248
impl<R> AsyncRead for ProgressReader<'_, R>
where
    R: AsyncRead + Unpin,
{
    /// Delegate the read to the inner reader, then report progress on success.
    ///
    /// NOTE(review): this reports `buf.filled().len()` — the total bytes in the buffer
    /// after the read — which assumes the caller polls with a fresh `ReadBuf` per call;
    /// confirm whether a reused, partially-filled buffer is possible here.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        Pin::new(&mut self.as_mut().reader)
            .poll_read(cx, buf)
            .map_ok(|()| {
                self.reporter
                    .on_download_progress(self.index, buf.filled().len() as u64);
            })
    }
}
1266
/// A pointer to an archive in the cache, fetched from an HTTP archive.
///
/// Encoded with `MsgPack`, and represented on disk by a `.http` file.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HttpArchivePointer {
    /// The cached archive (unzipped wheel) this pointer refers to.
    archive: Archive,
}
1274
1275impl HttpArchivePointer {
1276    /// Read an [`HttpArchivePointer`] from the cache.
1277    pub fn read_from(path: impl AsRef<Path>) -> Result<Option<Self>, Error> {
1278        match fs_err::File::open(path.as_ref()) {
1279            Ok(file) => {
1280                let data = DataWithCachePolicy::from_reader(file)?.data;
1281                let archive = rmp_serde::from_slice::<Archive>(&data)?;
1282                Ok(Some(Self { archive }))
1283            }
1284            Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
1285            Err(err) => Err(Error::CacheRead(err)),
1286        }
1287    }
1288
1289    /// Return the [`Archive`] from the pointer.
1290    pub fn into_archive(self) -> Archive {
1291        self.archive
1292    }
1293
1294    /// Return the [`CacheInfo`] from the pointer.
1295    pub fn to_cache_info(&self) -> CacheInfo {
1296        CacheInfo::default()
1297    }
1298
1299    /// Return the [`BuildInfo`] from the pointer.
1300    pub fn to_build_info(&self) -> Option<BuildInfo> {
1301        None
1302    }
1303}
1304
/// A pointer to an archive in the cache, fetched from a local path.
///
/// Encoded with `MsgPack`, and represented on disk by a `.rev` file.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LocalArchivePointer {
    /// The modification time of the local file at the time the archive was created.
    timestamp: Timestamp,
    /// The cached archive (unzipped wheel) this pointer refers to.
    archive: Archive,
}
1313
1314impl LocalArchivePointer {
1315    /// Read an [`LocalArchivePointer`] from the cache.
1316    pub fn read_from(path: impl AsRef<Path>) -> Result<Option<Self>, Error> {
1317        match fs_err::read(path) {
1318            Ok(cached) => Ok(Some(rmp_serde::from_slice::<Self>(&cached)?)),
1319            Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
1320            Err(err) => Err(Error::CacheRead(err)),
1321        }
1322    }
1323
1324    /// Write an [`LocalArchivePointer`] to the cache.
1325    pub async fn write_to(&self, entry: &CacheEntry) -> Result<(), Error> {
1326        write_atomic(entry.path(), rmp_serde::to_vec(&self)?)
1327            .await
1328            .map_err(Error::CacheWrite)
1329    }
1330
1331    /// Returns `true` if the archive is up-to-date with the given modified timestamp.
1332    pub fn is_up_to_date(&self, modified: Timestamp) -> bool {
1333        self.timestamp == modified
1334    }
1335
1336    /// Return the [`Archive`] from the pointer.
1337    pub fn into_archive(self) -> Archive {
1338        self.archive
1339    }
1340
1341    /// Return the [`CacheInfo`] from the pointer.
1342    pub fn to_cache_info(&self) -> CacheInfo {
1343        CacheInfo::from_timestamp(self.timestamp)
1344    }
1345
1346    /// Return the [`BuildInfo`] from the pointer.
1347    pub fn to_build_info(&self) -> Option<BuildInfo> {
1348        None
1349    }
1350}
1351
/// The resolved download target for a wheel: its URL, payload format, and expected size.
#[derive(Debug, Clone)]
struct WheelTarget {
    /// The URL from which the wheel can be downloaded.
    url: DisplaySafeUrl,
    /// The expected extension of the wheel file.
    extension: WheelExtension,
    /// The expected size of the wheel file, if known.
    size: Option<u64>,
}
1361
1362impl TryFrom<&File> for WheelTarget {
1363    type Error = ToUrlError;
1364
1365    /// Determine the [`WheelTarget`] from a [`File`].
1366    fn try_from(file: &File) -> Result<Self, Self::Error> {
1367        let url = file.url.to_url()?;
1368        if let Some(zstd) = file.zstd.as_ref() {
1369            Ok(Self {
1370                url: add_tar_zst_extension(url),
1371                extension: WheelExtension::WhlZst,
1372                size: zstd.size,
1373            })
1374        } else {
1375            Ok(Self {
1376                url,
1377                extension: WheelExtension::Whl,
1378                size: file.size,
1379            })
1380        }
1381    }
1382}
1383
/// The payload format of a wheel download.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum WheelExtension {
    /// A `.whl` file.
    Whl,
    /// A `.whl.tar.zst` file.
    WhlZst,
}
1391
1392/// Add `.tar.zst` to the end of the URL path, if it doesn't already exist.
1393#[must_use]
1394fn add_tar_zst_extension(mut url: DisplaySafeUrl) -> DisplaySafeUrl {
1395    let mut path = url.path().to_string();
1396
1397    if !path.ends_with(".tar.zst") {
1398        path.push_str(".tar.zst");
1399    }
1400
1401    url.set_path(&path);
1402    url
1403}
1404
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_add_tar_zst_extension() {
        // `(input, expected)` pairs: a plain wheel gains the suffix, an already-suffixed
        // URL is left untouched, and percent-encoded paths are preserved.
        for (input, expected) in [
            (
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl",
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
            ),
            (
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
            ),
            (
                "https://files.pythonhosted.org/flask-3.1.0%2Bcu124-py3-none-any.whl",
                "https://files.pythonhosted.org/flask-3.1.0%2Bcu124-py3-none-any.whl.tar.zst",
            ),
        ] {
            let url = DisplaySafeUrl::parse(input).unwrap();
            assert_eq!(add_tar_zst_extension(url).as_str(), expected);
        }
    }
}