1use std::future::Future;
2use std::io;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use futures::{FutureExt, TryStreamExt};
9use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};
10use tokio::sync::Semaphore;
11use tokio_util::compat::FuturesAsyncReadCompatExt;
12use tracing::{Instrument, info_span, instrument, warn};
13use url::Url;
14
15use uv_cache::{ArchiveId, CacheBucket, CacheEntry, WheelCache};
16use uv_cache_info::{CacheInfo, Timestamp};
17use uv_client::{
18 CacheControl, CachedClientError, Connectivity, DataWithCachePolicy, RegistryClient,
19};
20use uv_distribution_filename::{SourceDistExtension, WheelFilename};
21use uv_distribution_types::{
22 BuildInfo, BuildableSource, BuiltDist, Dist, DistRef, File, HashPolicy, Hashed, IndexUrl,
23 InstalledDist, Name, SourceDist, ToUrlError,
24};
25use uv_extract::hash::Hasher;
26use uv_fs::write_atomic;
27use uv_git::{GIT_LFS, GitError};
28use uv_install_wheel::validate_and_heal_record;
29use uv_platform_tags::Tags;
30use uv_pypi_types::{HashDigest, HashDigests, PyProjectToml};
31use uv_redacted::DisplaySafeUrl;
32use uv_types::{BuildContext, BuildStack};
33use uv_warnings::warn_user_once;
34
35use crate::archive::Archive;
36use uv_python::PythonVariant;
37
38use crate::error::PythonVersion;
39use crate::metadata::{ArchiveMetadata, Metadata};
40use crate::source::SourceDistributionBuilder;
41use crate::{Error, LocalWheel, Reporter, RequiresDist};
42
/// A database of distributions, bundling the handles needed to fetch pre-built wheels and to
/// build wheels from source distributions.
///
/// `Context` is the [`BuildContext`] implementation; the methods on this type use it to reach
/// the cache, the Git resolver, and any user-provided dependency metadata.
pub struct DistributionDatabase<'a, Context: BuildContext> {
    /// The build context, also shared with the source distribution builder.
    build_context: &'a Context,
    /// Builder used to download and build source distributions.
    builder: SourceDistributionBuilder<'a, Context>,
    /// The registry client, paired with the semaphore that gates `managed` requests.
    client: ManagedClient<'a>,
    /// Optional progress reporter; `None` until `with_reporter` is called.
    reporter: Option<Arc<dyn Reporter>>,
}
61
62impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
63 pub fn new(
64 client: &'a RegistryClient,
65 build_context: &'a Context,
66 downloads_semaphore: Arc<Semaphore>,
67 ) -> Self {
68 Self {
69 build_context,
70 builder: SourceDistributionBuilder::new(build_context),
71 client: ManagedClient::new(client, downloads_semaphore),
72 reporter: None,
73 }
74 }
75
76 #[must_use]
78 pub fn with_build_stack(self, build_stack: &'a BuildStack) -> Self {
79 Self {
80 builder: self.builder.with_build_stack(build_stack),
81 ..self
82 }
83 }
84
85 #[must_use]
87 pub fn with_reporter(self, reporter: Arc<dyn Reporter>) -> Self {
88 Self {
89 builder: self.builder.with_reporter(reporter.clone()),
90 reporter: Some(reporter),
91 ..self
92 }
93 }
94
95 fn handle_response_errors(&self, err: reqwest::Error) -> io::Error {
97 if err.is_timeout() {
98 io::Error::new(
100 io::ErrorKind::TimedOut,
101 format!(
102 "Failed to download distribution due to network timeout. Try increasing UV_HTTP_TIMEOUT (current value: {}s).",
103 self.client.unmanaged.read_timeout().as_secs()
104 ),
105 )
106 } else {
107 io::Error::other(err)
108 }
109 }
110
111 #[instrument(skip_all, fields(%dist))]
118 pub async fn get_or_build_wheel(
119 &self,
120 dist: &Dist,
121 tags: &Tags,
122 hashes: HashPolicy<'_>,
123 ) -> Result<LocalWheel, Error> {
124 match dist {
125 Dist::Built(built) => self.get_wheel(built, hashes).await,
126 Dist::Source(source) => self.build_wheel(source, tags, hashes).await,
127 }
128 }
129
130 #[instrument(skip_all, fields(%dist))]
136 pub async fn get_installed_metadata(
137 &self,
138 dist: &InstalledDist,
139 ) -> Result<ArchiveMetadata, Error> {
140 if let Some(metadata) = self
142 .build_context
143 .dependency_metadata()
144 .get(dist.name(), Some(dist.version()))
145 {
146 return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
147 }
148
149 let metadata = dist
150 .read_metadata()
151 .map_err(|err| Error::ReadInstalled(Box::new(dist.clone()), err))?;
152
153 Ok(ArchiveMetadata::from_metadata23(metadata.clone()))
154 }
155
156 #[instrument(skip_all, fields(%dist))]
162 pub async fn get_or_build_wheel_metadata(
163 &self,
164 dist: &Dist,
165 hashes: HashPolicy<'_>,
166 ) -> Result<ArchiveMetadata, Error> {
167 match dist {
168 Dist::Built(built) => self.get_wheel_metadata(built, hashes).await,
169 Dist::Source(source) => {
170 self.build_wheel_metadata(&BuildableSource::Dist(source), hashes)
171 .await
172 }
173 }
174 }
175
176 async fn get_wheel(
181 &self,
182 dist: &BuiltDist,
183 hashes: HashPolicy<'_>,
184 ) -> Result<LocalWheel, Error> {
185 match dist {
186 BuiltDist::Registry(wheels) => {
187 let wheel = wheels.best_wheel();
188 let WheelTarget {
189 url,
190 extension,
191 size,
192 } = WheelTarget::try_from(&*wheel.file)?;
193
194 let wheel_entry = self.build_context.cache().entry(
196 CacheBucket::Wheels,
197 WheelCache::Index(&wheel.index).wheel_dir(wheel.name().as_ref()),
198 wheel.filename.cache_key(),
199 );
200
201 if url.scheme() == "file" {
203 let path = url
204 .to_file_path()
205 .map_err(|()| Error::NonFileUrl(url.clone()))?;
206 return self
207 .load_wheel(
208 &path,
209 &wheel.filename,
210 WheelExtension::Whl,
211 wheel_entry,
212 dist,
213 hashes,
214 )
215 .await;
216 }
217
218 match self
220 .stream_wheel(
221 url.clone(),
222 dist.index(),
223 &wheel.filename,
224 extension,
225 size,
226 &wheel_entry,
227 dist,
228 hashes,
229 )
230 .await
231 {
232 Ok(archive) => Ok(LocalWheel {
233 dist: Dist::Built(dist.clone()),
234 archive: self
235 .build_context
236 .cache()
237 .archive(&archive.id)
238 .into_boxed_path(),
239 hashes: archive.hashes,
240 filename: wheel.filename.clone(),
241 cache: CacheInfo::default(),
242 build: None,
243 }),
244 Err(Error::Extract(name, err)) => {
245 if err.is_http_streaming_unsupported() {
246 warn!(
247 "Streaming unsupported for {dist}; downloading wheel to disk ({err})"
248 );
249 } else if err.is_http_streaming_failed() {
250 warn!("Streaming failed for {dist}; downloading wheel to disk ({err})");
251 } else {
252 return Err(Error::Extract(name, err));
253 }
254
255 let archive = self
258 .download_wheel(
259 url,
260 dist.index(),
261 &wheel.filename,
262 extension,
263 size,
264 &wheel_entry,
265 dist,
266 hashes,
267 )
268 .await?;
269
270 Ok(LocalWheel {
271 dist: Dist::Built(dist.clone()),
272 archive: self
273 .build_context
274 .cache()
275 .archive(&archive.id)
276 .into_boxed_path(),
277 hashes: archive.hashes,
278 filename: wheel.filename.clone(),
279 cache: CacheInfo::default(),
280 build: None,
281 })
282 }
283 Err(err) => Err(err),
284 }
285 }
286
287 BuiltDist::DirectUrl(wheel) => {
288 let wheel_entry = self.build_context.cache().entry(
290 CacheBucket::Wheels,
291 WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
292 wheel.filename.cache_key(),
293 );
294
295 match self
297 .stream_wheel(
298 wheel.url.raw().clone(),
299 None,
300 &wheel.filename,
301 WheelExtension::Whl,
302 None,
303 &wheel_entry,
304 dist,
305 hashes,
306 )
307 .await
308 {
309 Ok(archive) => Ok(LocalWheel {
310 dist: Dist::Built(dist.clone()),
311 archive: self
312 .build_context
313 .cache()
314 .archive(&archive.id)
315 .into_boxed_path(),
316 hashes: archive.hashes,
317 filename: wheel.filename.clone(),
318 cache: CacheInfo::default(),
319 build: None,
320 }),
321 Err(Error::Extract(name, err)) => {
322 if err.is_http_streaming_unsupported() {
323 warn!(
324 "Streaming unsupported for {dist}; downloading wheel to disk ({err})"
325 );
326 } else if err.is_http_streaming_failed() {
327 warn!("Streaming failed for {dist}; downloading wheel to disk ({err})");
328 } else {
329 return Err(Error::Extract(name, err));
330 }
331
332 let archive = self
335 .download_wheel(
336 wheel.url.raw().clone(),
337 None,
338 &wheel.filename,
339 WheelExtension::Whl,
340 None,
341 &wheel_entry,
342 dist,
343 hashes,
344 )
345 .await?;
346 Ok(LocalWheel {
347 dist: Dist::Built(dist.clone()),
348 archive: self
349 .build_context
350 .cache()
351 .archive(&archive.id)
352 .into_boxed_path(),
353 hashes: archive.hashes,
354 filename: wheel.filename.clone(),
355 cache: CacheInfo::default(),
356 build: None,
357 })
358 }
359 Err(err) => Err(err),
360 }
361 }
362
363 BuiltDist::GitPath(wheel) => {
364 let fetch = self
366 .build_context
367 .git()
368 .fetch(
369 &wheel.git,
370 self.client.unmanaged.git_http_settings(wheel.git.url()),
371 self.build_context.cache().bucket(CacheBucket::Git),
372 self.reporter.clone().map(<dyn Reporter>::into_git_reporter),
373 )
374 .await?;
375
376 if wheel.git.lfs().enabled() && !fetch.lfs_ready() {
377 if GIT_LFS.is_err() {
378 return Err(Error::MissingWheelGitLfsArtifacts(
379 wheel.url.to_url(),
380 GitError::GitLfsNotFound,
381 ));
382 }
383 return Err(Error::MissingWheelGitLfsArtifacts(
384 wheel.url.to_url(),
385 GitError::GitLfsNotConfigured,
386 ));
387 }
388
389 let git_sha = fetch.git().precise().expect("Exact commit after checkout");
390 let cache_entry = self.build_context.cache().entry(
391 CacheBucket::Wheels,
392 WheelCache::Git(&wheel.url, git_sha.as_short_str()).root(),
393 wheel.filename.stem(),
394 );
395
396 let install_path = fetch.path().join(&wheel.install_path);
397
398 self.load_wheel(
399 &install_path,
400 &wheel.filename,
401 WheelExtension::Whl,
402 cache_entry,
403 dist,
404 hashes,
405 )
406 .await
407 }
408
409 BuiltDist::Path(wheel) => {
410 let cache_entry = self.build_context.cache().entry(
411 CacheBucket::Wheels,
412 WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
413 wheel.filename.cache_key(),
414 );
415
416 self.load_wheel(
417 &wheel.install_path,
418 &wheel.filename,
419 WheelExtension::Whl,
420 cache_entry,
421 dist,
422 hashes,
423 )
424 .await
425 }
426 }
427 }
428
429 async fn build_wheel(
435 &self,
436 dist: &SourceDist,
437 tags: &Tags,
438 hashes: HashPolicy<'_>,
439 ) -> Result<LocalWheel, Error> {
440 if let Some(extension) = dist.extension()
448 && !matches!(
449 extension,
450 SourceDistExtension::TarGz | SourceDistExtension::Zip
451 )
452 {
453 if matches!(dist, SourceDist::Registry(_)) {
454 warn_user_once!(
458 "{dist} uses a legacy source distribution format ('.{extension}') that is not compliant with PEP 625. A future version of uv will reject this source distribution. Consider upgrading to a newer version of {package}",
459 package = dist.name(),
460 );
461 } else {
462 warn_user_once!(
463 "{dist} is not a standards-compliant source distribution: expected '.tar.gz' but found '.{extension}'. A future version of uv will reject source distributions that do not meet the requirements specified in PEP 625",
464 );
465 }
466 }
467
468 let built_wheel = self
469 .builder
470 .download_and_build(&BuildableSource::Dist(dist), tags, hashes, &self.client)
471 .boxed_local()
472 .await?;
473
474 if !built_wheel.filename.is_compatible(tags) {
480 return if tags.is_cross() {
481 Err(Error::BuiltWheelIncompatibleTargetPlatform {
482 filename: built_wheel.filename,
483 python_platform: tags.python_platform().clone(),
484 python_version: PythonVersion {
485 version: tags.python_version(),
486 variant: if tags.is_freethreaded() {
487 PythonVariant::Freethreaded
488 } else {
489 PythonVariant::Default
490 },
491 },
492 })
493 } else {
494 Err(Error::BuiltWheelIncompatibleHostPlatform {
495 filename: built_wheel.filename,
496 python_platform: tags.python_platform().clone(),
497 python_version: PythonVersion {
498 version: tags.python_version(),
499 variant: if tags.is_freethreaded() {
500 PythonVariant::Freethreaded
501 } else {
502 PythonVariant::Default
503 },
504 },
505 })
506 };
507 }
508
509 #[cfg(windows)]
511 let _lock = {
512 let lock_entry = CacheEntry::new(
513 built_wheel.target.parent().unwrap(),
514 format!(
515 "{}.lock",
516 built_wheel.target.file_name().unwrap().to_str().unwrap()
517 ),
518 );
519 lock_entry.lock().await.map_err(Error::CacheLock)?
520 };
521
522 match self.build_context.cache().resolve_link(&built_wheel.target) {
525 Ok(archive) => {
526 return Ok(LocalWheel {
527 dist: Dist::Source(dist.clone()),
528 archive: archive.into_boxed_path(),
529 filename: built_wheel.filename,
530 hashes: built_wheel.hashes,
531 cache: built_wheel.cache_info,
532 build: Some(built_wheel.build_info),
533 });
534 }
535 Err(err) if err.kind() == io::ErrorKind::NotFound => {}
536 Err(err) => return Err(Error::CacheRead(err)),
537 }
538
539 let id = self
541 .unzip_wheel(
542 &built_wheel.path,
543 &built_wheel.target,
544 DistRef::Source(dist),
545 )
546 .await?;
547
548 Ok(LocalWheel {
549 dist: Dist::Source(dist.clone()),
550 archive: self.build_context.cache().archive(&id).into_boxed_path(),
551 hashes: built_wheel.hashes,
552 filename: built_wheel.filename,
553 cache: built_wheel.cache_info,
554 build: Some(built_wheel.build_info),
555 })
556 }
557
558 async fn get_wheel_metadata(
563 &self,
564 dist: &BuiltDist,
565 hashes: HashPolicy<'_>,
566 ) -> Result<ArchiveMetadata, Error> {
567 if hashes.is_generate(dist) {
582 let wheel = self.get_wheel(dist, hashes).await?;
583 let metadata = if let Some(metadata) = self
585 .build_context
586 .dependency_metadata()
587 .get(dist.name(), Some(dist.version()))
588 {
589 metadata.clone()
590 } else {
591 wheel.metadata()?
592 };
593 let hashes = wheel.hashes;
594 return Ok(ArchiveMetadata {
595 metadata: Metadata::from_metadata23(metadata),
596 hashes,
597 });
598 }
599
600 if let Some(metadata) = self
602 .build_context
603 .dependency_metadata()
604 .get(dist.name(), Some(dist.version()))
605 {
606 return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
607 }
608
609 let result = self
610 .client
611 .managed(|client| {
612 client
613 .wheel_metadata(
614 dist,
615 self.build_context.git(),
616 self.build_context.capabilities(),
617 self.reporter.clone().map(<dyn Reporter>::into_git_reporter),
618 )
619 .boxed_local()
620 })
621 .await;
622
623 match result {
624 Ok(metadata) => {
625 Ok(ArchiveMetadata::from_metadata23(metadata))
627 }
628 Err(err) if err.is_http_streaming_unsupported() => {
629 warn!(
630 "Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})"
631 );
632
633 let wheel = self.get_wheel(dist, hashes).await?;
636 let metadata = wheel.metadata()?;
637 let hashes = wheel.hashes;
638 Ok(ArchiveMetadata {
639 metadata: Metadata::from_metadata23(metadata),
640 hashes,
641 })
642 }
643 Err(err) => Err(err.into()),
644 }
645 }
646
647 pub async fn build_wheel_metadata(
652 &self,
653 source: &BuildableSource<'_>,
654 hashes: HashPolicy<'_>,
655 ) -> Result<ArchiveMetadata, Error> {
656 if let Some(dist) = source.as_dist() {
658 if let Some(metadata) = self
659 .build_context
660 .dependency_metadata()
661 .get(dist.name(), dist.version())
662 {
663 self.builder.resolve_revision(source, &self.client).await?;
666
667 return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
668 }
669 }
670
671 let metadata = self
672 .builder
673 .download_and_build_metadata(source, hashes, &self.client)
674 .boxed_local()
675 .await?;
676
677 Ok(metadata)
678 }
679
680 pub async fn requires_dist(
682 &self,
683 path: &Path,
684 pyproject_toml: &PyProjectToml,
685 ) -> Result<Option<RequiresDist>, Error> {
686 self.builder
687 .source_tree_requires_dist(
688 path,
689 pyproject_toml,
690 self.client.unmanaged.credentials_cache(),
691 )
692 .await
693 }
694
695 async fn stream_wheel(
697 &self,
698 url: DisplaySafeUrl,
699 index: Option<&IndexUrl>,
700 filename: &WheelFilename,
701 extension: WheelExtension,
702 size: Option<u64>,
703 wheel_entry: &CacheEntry,
704 dist: &BuiltDist,
705 hashes: HashPolicy<'_>,
706 ) -> Result<Archive, Error> {
707 #[cfg(windows)]
709 let _lock = {
710 let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
711 lock_entry.lock().await.map_err(Error::CacheLock)?
712 };
713
714 let http_entry = wheel_entry.with_file(format!("{}.http", filename.cache_key()));
716
717 let query_url = &url.clone();
718
719 let download = |response: reqwest::Response| {
720 async {
721 let size = size.or_else(|| content_length(&response));
722
723 let progress = self
724 .reporter
725 .as_ref()
726 .map(|reporter| (reporter, reporter.on_download_start(dist.name(), size)));
727
728 let reader = response
729 .bytes_stream()
730 .map_err(|err| self.handle_response_errors(err))
731 .into_async_read();
732
733 let algorithms = hashes.algorithms();
735 let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
736 let mut hasher = uv_extract::hash::HashReader::new(reader.compat(), &mut hashers);
737
738 let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
740 .map_err(Error::CacheWrite)?;
741
742 let files = match progress {
743 Some((reporter, progress)) => {
744 let mut reader = ProgressReader::new(&mut hasher, progress, &**reporter);
745 match extension {
746 WheelExtension::Whl => {
747 uv_extract::stream::unzip(query_url, &mut reader, temp_dir.path())
748 .await
749 .map_err(|err| Error::Extract(filename.to_string(), err))?
750 }
751 WheelExtension::WhlZst => {
752 uv_extract::stream::untar_zst(&mut reader, temp_dir.path())
753 .await
754 .map_err(|err| Error::Extract(filename.to_string(), err))?
755 }
756 }
757 }
758 None => match extension {
759 WheelExtension::Whl => {
760 uv_extract::stream::unzip(query_url, &mut hasher, temp_dir.path())
761 .await
762 .map_err(|err| Error::Extract(filename.to_string(), err))?
763 }
764 WheelExtension::WhlZst => {
765 uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
766 .await
767 .map_err(|err| Error::Extract(filename.to_string(), err))?
768 }
769 },
770 };
771 if !hashes.is_none() {
773 hasher.finish().await.map_err(Error::HashExhaustion)?;
774 }
775
776 validate_and_heal_record(temp_dir.path(), files.iter(), dist)
779 .map_err(Error::InstallWheelError)?;
780
781 let id = self
783 .build_context
784 .cache()
785 .persist(temp_dir.keep(), wheel_entry.path())
786 .await
787 .map_err(Error::CacheRead)?;
788
789 if let Some((reporter, progress)) = progress {
790 reporter.on_download_complete(dist.name(), progress);
791 }
792
793 Ok(Archive::new(
794 id,
795 hashers.into_iter().map(HashDigest::from).collect(),
796 filename.clone(),
797 ))
798 }
799 .instrument(info_span!("wheel", wheel = %dist))
800 };
801
802 let req = self.request(url.clone())?;
804
805 let cache_control = match self.client.unmanaged.connectivity() {
807 Connectivity::Online => {
808 if let Some(header) = index.and_then(|index| {
809 self.build_context
810 .locations()
811 .artifact_cache_control_for(index)
812 }) {
813 CacheControl::Override(header)
814 } else {
815 CacheControl::from(
816 self.build_context
817 .cache()
818 .freshness(&http_entry, Some(&filename.name), None)
819 .map_err(Error::CacheRead)?,
820 )
821 }
822 }
823 Connectivity::Offline => CacheControl::AllowStale,
824 };
825
826 let archive = self
827 .client
828 .managed(|client| {
829 client.cached_client().get_serde_with_retry(
830 req,
831 &http_entry,
832 cache_control.clone(),
833 download,
834 )
835 })
836 .await
837 .map_err(|err| match err {
838 CachedClientError::Callback { err, .. } => err,
839 CachedClientError::Client(err) => Error::Client(err),
840 })?;
841
842 let archive = Some(archive)
844 .filter(|archive| archive.has_digests(hashes))
845 .filter(|archive| archive.exists(self.build_context.cache()));
846
847 let archive = if let Some(archive) = archive {
848 archive
849 } else {
850 self.client
851 .managed(async |client| {
852 client
853 .cached_client()
854 .skip_cache_with_retry(
855 self.request(url)?,
856 &http_entry,
857 cache_control,
858 download,
859 )
860 .await
861 .map_err(|err| match err {
862 CachedClientError::Callback { err, .. } => err,
863 CachedClientError::Client(err) => Error::Client(err),
864 })
865 })
866 .await?
867 };
868
869 Ok(archive)
870 }
871
872 async fn download_wheel(
874 &self,
875 url: DisplaySafeUrl,
876 index: Option<&IndexUrl>,
877 filename: &WheelFilename,
878 extension: WheelExtension,
879 size: Option<u64>,
880 wheel_entry: &CacheEntry,
881 dist: &BuiltDist,
882 hashes: HashPolicy<'_>,
883 ) -> Result<Archive, Error> {
884 #[cfg(windows)]
886 let _lock = {
887 let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
888 lock_entry.lock().await.map_err(Error::CacheLock)?
889 };
890
891 let http_entry = wheel_entry.with_file(format!("{}.http", filename.cache_key()));
893
894 let query_url = &url.clone();
895
896 let download = |response: reqwest::Response| {
897 async {
898 let size = size.or_else(|| content_length(&response));
899
900 let progress = self
901 .reporter
902 .as_ref()
903 .map(|reporter| (reporter, reporter.on_download_start(dist.name(), size)));
904
905 let reader = response
906 .bytes_stream()
907 .map_err(|err| self.handle_response_errors(err))
908 .into_async_read();
909
910 let temp_file = tempfile::tempfile_in(self.build_context.cache().root())
912 .map_err(Error::CacheWrite)?;
913 let mut writer = tokio::io::BufWriter::new(fs_err::tokio::File::from_std(
914 fs_err::File::from_parts(temp_file, self.build_context.cache().root()),
916 ));
917
918 match progress {
919 Some((reporter, progress)) => {
920 let mut reader =
924 ProgressReader::new(reader.compat(), progress, &**reporter);
925
926 tokio::io::copy(&mut reader, &mut writer)
927 .await
928 .map_err(Error::CacheWrite)?;
929 }
930 None => {
931 tokio::io::copy(&mut reader.compat(), &mut writer)
932 .await
933 .map_err(Error::CacheWrite)?;
934 }
935 }
936
937 let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
939 .map_err(Error::CacheWrite)?;
940 let mut file = writer.into_inner();
941 file.seek(io::SeekFrom::Start(0))
942 .await
943 .map_err(Error::CacheWrite)?;
944
945 let (files, hashes) = if hashes.is_none() {
947 let target = temp_dir.path().to_owned();
948 let files = match extension {
949 WheelExtension::Whl => {
950 let file = file.into_std().await;
951 tokio::task::spawn_blocking(move || uv_extract::unzip(file, &target))
952 .await?
953 }
954 WheelExtension::WhlZst => {
955 uv_extract::stream::untar_zst(file, &target).await
956 }
957 }
958 .map_err(|err| Error::Extract(filename.to_string(), err))?;
959
960 (files, HashDigests::empty())
961 } else {
962 let algorithms = hashes.algorithms();
964 let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
965 let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);
966
967 let files = match extension {
968 WheelExtension::Whl => {
969 uv_extract::stream::unzip(query_url, &mut hasher, temp_dir.path())
970 .await
971 .map_err(|err| Error::Extract(filename.to_string(), err))?
972 }
973 WheelExtension::WhlZst => {
974 uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
975 .await
976 .map_err(|err| Error::Extract(filename.to_string(), err))?
977 }
978 };
979
980 hasher.finish().await.map_err(Error::HashExhaustion)?;
982 let hashes = hashers.into_iter().map(HashDigest::from).collect();
983
984 (files, hashes)
985 };
986
987 validate_and_heal_record(temp_dir.path(), files.iter(), dist)
990 .map_err(Error::InstallWheelError)?;
991
992 let id = self
994 .build_context
995 .cache()
996 .persist(temp_dir.keep(), wheel_entry.path())
997 .await
998 .map_err(Error::CacheRead)?;
999
1000 if let Some((reporter, progress)) = progress {
1001 reporter.on_download_complete(dist.name(), progress);
1002 }
1003
1004 Ok(Archive::new(id, hashes, filename.clone()))
1005 }
1006 .instrument(info_span!("wheel", wheel = %dist))
1007 };
1008
1009 let req = self.request(url.clone())?;
1011
1012 let cache_control = match self.client.unmanaged.connectivity() {
1014 Connectivity::Online => {
1015 if let Some(header) = index.and_then(|index| {
1016 self.build_context
1017 .locations()
1018 .artifact_cache_control_for(index)
1019 }) {
1020 CacheControl::Override(header)
1021 } else {
1022 CacheControl::from(
1023 self.build_context
1024 .cache()
1025 .freshness(&http_entry, Some(&filename.name), None)
1026 .map_err(Error::CacheRead)?,
1027 )
1028 }
1029 }
1030 Connectivity::Offline => CacheControl::AllowStale,
1031 };
1032
1033 let archive = self
1034 .client
1035 .managed(|client| {
1036 client.cached_client().get_serde_with_retry(
1037 req,
1038 &http_entry,
1039 cache_control.clone(),
1040 download,
1041 )
1042 })
1043 .await
1044 .map_err(|err| match err {
1045 CachedClientError::Callback { err, .. } => err,
1046 CachedClientError::Client(err) => Error::Client(err),
1047 })?;
1048
1049 let archive = Some(archive)
1051 .filter(|archive| archive.has_digests(hashes))
1052 .filter(|archive| archive.exists(self.build_context.cache()));
1053
1054 let archive = if let Some(archive) = archive {
1055 archive
1056 } else {
1057 self.client
1058 .managed(async |client| {
1059 client
1060 .cached_client()
1061 .skip_cache_with_retry(
1062 self.request(url)?,
1063 &http_entry,
1064 cache_control,
1065 download,
1066 )
1067 .await
1068 .map_err(|err| match err {
1069 CachedClientError::Callback { err, .. } => err,
1070 CachedClientError::Client(err) => Error::Client(err),
1071 })
1072 })
1073 .await?
1074 };
1075
1076 Ok(archive)
1077 }
1078
1079 async fn load_wheel(
1081 &self,
1082 path: &Path,
1083 filename: &WheelFilename,
1084 extension: WheelExtension,
1085 wheel_entry: CacheEntry,
1086 dist: &BuiltDist,
1087 hashes: HashPolicy<'_>,
1088 ) -> Result<LocalWheel, Error> {
1089 #[cfg(windows)]
1090 let _lock = {
1091 let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
1092 lock_entry.lock().await.map_err(Error::CacheLock)?
1093 };
1094
1095 let modified = Timestamp::from_path(path).map_err(Error::CacheRead)?;
1097
1098 let pointer_entry = wheel_entry.with_file(format!("{}.rev", filename.cache_key()));
1100 let pointer = PathArchivePointer::read_from(&pointer_entry)?;
1101
1102 let archive = pointer
1104 .filter(|pointer| pointer.is_up_to_date(modified))
1105 .map(PathArchivePointer::into_archive)
1106 .filter(|archive| archive.has_digests(hashes));
1107
1108 if let Some(archive) = archive {
1110 Ok(LocalWheel {
1111 dist: Dist::Built(dist.clone()),
1112 archive: self
1113 .build_context
1114 .cache()
1115 .archive(&archive.id)
1116 .into_boxed_path(),
1117 hashes: archive.hashes,
1118 filename: filename.clone(),
1119 cache: CacheInfo::from_timestamp(modified),
1120 build: None,
1121 })
1122 } else if hashes.is_none() {
1123 let archive = Archive::new(
1125 self.unzip_wheel(path, wheel_entry.path(), DistRef::Built(dist))
1126 .await?,
1127 HashDigests::empty(),
1128 filename.clone(),
1129 );
1130
1131 let pointer = PathArchivePointer {
1133 timestamp: modified,
1134 archive: archive.clone(),
1135 };
1136 pointer.write_to(&pointer_entry).await?;
1137
1138 Ok(LocalWheel {
1139 dist: Dist::Built(dist.clone()),
1140 archive: self
1141 .build_context
1142 .cache()
1143 .archive(&archive.id)
1144 .into_boxed_path(),
1145 hashes: archive.hashes,
1146 filename: filename.clone(),
1147 cache: CacheInfo::from_timestamp(modified),
1148 build: None,
1149 })
1150 } else {
1151 let file = fs_err::tokio::File::open(path)
1153 .await
1154 .map_err(Error::CacheRead)?;
1155 let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
1156 .map_err(Error::CacheWrite)?;
1157
1158 let algorithms = hashes.algorithms();
1160 let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
1161 let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);
1162
1163 let files = match extension {
1165 WheelExtension::Whl => {
1166 uv_extract::stream::unzip(path.display(), &mut hasher, temp_dir.path())
1167 .await
1168 .map_err(|err| Error::Extract(filename.to_string(), err))?
1169 }
1170 WheelExtension::WhlZst => {
1171 uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
1172 .await
1173 .map_err(|err| Error::Extract(filename.to_string(), err))?
1174 }
1175 };
1176
1177 hasher.finish().await.map_err(Error::HashExhaustion)?;
1179
1180 let hashes = hashers.into_iter().map(HashDigest::from).collect();
1181
1182 validate_and_heal_record(temp_dir.path(), files.iter(), dist)
1185 .map_err(Error::InstallWheelError)?;
1186
1187 let id = self
1189 .build_context
1190 .cache()
1191 .persist(temp_dir.keep(), wheel_entry.path())
1192 .await
1193 .map_err(Error::CacheWrite)?;
1194
1195 let archive = Archive::new(id, hashes, filename.clone());
1197
1198 let pointer = PathArchivePointer {
1200 timestamp: modified,
1201 archive: archive.clone(),
1202 };
1203 pointer.write_to(&pointer_entry).await?;
1204
1205 Ok(LocalWheel {
1206 dist: Dist::Built(dist.clone()),
1207 archive: self
1208 .build_context
1209 .cache()
1210 .archive(&archive.id)
1211 .into_boxed_path(),
1212 hashes: archive.hashes,
1213 filename: filename.clone(),
1214 cache: CacheInfo::from_timestamp(modified),
1215 build: None,
1216 })
1217 }
1218 }
1219
1220 async fn unzip_wheel(
1222 &self,
1223 path: &Path,
1224 target: &Path,
1225 dist: DistRef<'_>,
1226 ) -> Result<ArchiveId, Error> {
1227 let (temp_dir, files) = tokio::task::spawn_blocking({
1228 let path = path.to_owned();
1229 let root = self.build_context.cache().root().to_path_buf();
1230 move || -> Result<_, Error> {
1231 let temp_dir = tempfile::tempdir_in(root).map_err(Error::CacheWrite)?;
1233 let reader = fs_err::File::open(&path).map_err(Error::CacheWrite)?;
1234 let files = uv_extract::unzip(reader, temp_dir.path())
1235 .map_err(|err| Error::Extract(path.to_string_lossy().into_owned(), err))?;
1236 Ok((temp_dir, files))
1237 }
1238 })
1239 .await??;
1240
1241 validate_and_heal_record(temp_dir.path(), files.iter(), dist)
1243 .map_err(Error::InstallWheelError)?;
1244
1245 let id = self
1247 .build_context
1248 .cache()
1249 .persist(temp_dir.keep(), target)
1250 .await
1251 .map_err(Error::CacheWrite)?;
1252
1253 Ok(id)
1254 }
1255
1256 fn request(&self, url: DisplaySafeUrl) -> Result<reqwest::Request, reqwest::Error> {
1258 self.client
1259 .unmanaged
1260 .uncached_client(&url)
1261 .get(Url::from(url))
1262 .header(
1263 "accept-encoding",
1267 reqwest::header::HeaderValue::from_static("identity"),
1268 )
1269 .build()
1270 }
1271
/// Return a reference to the [`ManagedClient`] held by this database.
pub fn client(&self) -> &ManagedClient<'a> {
    &self.client
}
1276}
1277
1278pub struct ManagedClient<'a> {
1280 pub unmanaged: &'a RegistryClient,
1281 control: Arc<Semaphore>,
1282}
1283
1284impl<'a> ManagedClient<'a> {
1285 fn new(client: &'a RegistryClient, control: Arc<Semaphore>) -> Self {
1287 ManagedClient {
1288 unmanaged: client,
1289 control,
1290 }
1291 }
1292
1293 pub async fn managed<F, T>(&self, f: impl FnOnce(&'a RegistryClient) -> F) -> T
1298 where
1299 F: Future<Output = T>,
1300 {
1301 let _permit = self.control.acquire().await.unwrap();
1302 f(self.unmanaged).await
1303 }
1304
1305 pub async fn manual<F, T>(&'a self, f: impl FnOnce(&'a RegistryClient, &'a Semaphore) -> F) -> T
1313 where
1314 F: Future<Output = T>,
1315 {
1316 f(self.unmanaged, &self.control).await
1317 }
1318}
1319
1320fn content_length(response: &reqwest::Response) -> Option<u64> {
1322 response
1323 .headers()
1324 .get(reqwest::header::CONTENT_LENGTH)
1325 .and_then(|val| val.to_str().ok())
1326 .and_then(|val| val.parse::<u64>().ok())
1327}
1328
1329struct ProgressReader<'a, R> {
1331 reader: R,
1332 index: usize,
1333 reporter: &'a dyn Reporter,
1334}
1335
1336impl<'a, R> ProgressReader<'a, R> {
1337 fn new(reader: R, index: usize, reporter: &'a dyn Reporter) -> Self {
1339 Self {
1340 reader,
1341 index,
1342 reporter,
1343 }
1344 }
1345}
1346
1347impl<R> AsyncRead for ProgressReader<'_, R>
1348where
1349 R: AsyncRead + Unpin,
1350{
1351 fn poll_read(
1352 mut self: Pin<&mut Self>,
1353 cx: &mut Context<'_>,
1354 buf: &mut ReadBuf<'_>,
1355 ) -> Poll<io::Result<()>> {
1356 Pin::new(&mut self.as_mut().reader)
1357 .poll_read(cx, buf)
1358 .map_ok(|()| {
1359 self.reporter
1360 .on_download_progress(self.index, buf.filled().len() as u64);
1361 })
1362 }
1363}
1364
1365#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1369pub struct HttpArchivePointer {
1370 archive: Archive,
1371}
1372
1373impl HttpArchivePointer {
1374 pub fn read_from(path: impl AsRef<Path>) -> Result<Option<Self>, Error> {
1376 match fs_err::File::open(path.as_ref()) {
1377 Ok(file) => {
1378 let data = DataWithCachePolicy::from_reader(file)?.data;
1379 let archive = rmp_serde::from_slice::<Archive>(&data)?;
1380 Ok(Some(Self { archive }))
1381 }
1382 Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
1383 Err(err) => Err(Error::CacheRead(err)),
1384 }
1385 }
1386
1387 pub fn into_archive(self) -> Archive {
1389 self.archive
1390 }
1391
1392 pub fn to_cache_info(&self) -> CacheInfo {
1394 CacheInfo::default()
1395 }
1396
1397 pub fn to_build_info(&self) -> Option<BuildInfo> {
1399 None
1400 }
1401}
1402
1403#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1407pub struct PathArchivePointer {
1408 timestamp: Timestamp,
1409 archive: Archive,
1410}
1411
1412impl PathArchivePointer {
1413 pub fn read_from(path: impl AsRef<Path>) -> Result<Option<Self>, Error> {
1415 match fs_err::read(path) {
1416 Ok(cached) => Ok(Some(rmp_serde::from_slice::<Self>(&cached)?)),
1417 Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
1418 Err(err) => Err(Error::CacheRead(err)),
1419 }
1420 }
1421
1422 pub(crate) async fn write_to(&self, entry: &CacheEntry) -> Result<(), Error> {
1424 write_atomic(entry.path(), rmp_serde::to_vec(&self)?)
1425 .await
1426 .map_err(Error::CacheWrite)
1427 }
1428
1429 pub fn is_up_to_date(&self, modified: Timestamp) -> bool {
1431 self.timestamp == modified
1432 }
1433
1434 pub fn into_archive(self) -> Archive {
1436 self.archive
1437 }
1438
1439 pub fn to_cache_info(&self) -> CacheInfo {
1441 CacheInfo::from_timestamp(self.timestamp)
1442 }
1443
1444 pub fn to_build_info(&self) -> Option<BuildInfo> {
1446 None
1447 }
1448}
1449
1450#[derive(Debug, Clone)]
1451struct WheelTarget {
1452 url: DisplaySafeUrl,
1454 extension: WheelExtension,
1456 size: Option<u64>,
1458}
1459
1460impl TryFrom<&File> for WheelTarget {
1461 type Error = ToUrlError;
1462
1463 fn try_from(file: &File) -> Result<Self, Self::Error> {
1465 let url = file.url.to_url()?;
1466 if let Some(zstd) = file.zstd.as_ref() {
1467 Ok(Self {
1468 url: add_tar_zst_extension(url),
1469 extension: WheelExtension::WhlZst,
1470 size: zstd.size,
1471 })
1472 } else {
1473 Ok(Self {
1474 url,
1475 extension: WheelExtension::Whl,
1476 size: file.size,
1477 })
1478 }
1479 }
1480}
1481
/// The format in which a wheel artifact is fetched, which selects how it is unpacked.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum WheelExtension {
    /// A plain wheel, unpacked as a zip archive.
    Whl,
    /// A wheel fetched as a `.tar.zst` payload, unpacked with `untar_zst`.
    WhlZst,
}
1489
1490#[must_use]
1492fn add_tar_zst_extension(mut url: DisplaySafeUrl) -> DisplaySafeUrl {
1493 let mut path = url.path().to_string();
1494
1495 if !path.ends_with(".tar.zst") {
1496 path.push_str(".tar.zst");
1497 }
1498
1499 url.set_path(&path);
1500 url
1501}
1502
#[cfg(test)]
mod tests {
    use super::*;

    /// `.tar.zst` is appended once, is not duplicated, and percent-encoding is preserved.
    #[test]
    fn test_add_tar_zst_extension() {
        // Each case is `(input URL, expected URL)`.
        let cases = [
            (
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl",
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
            ),
            (
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
            ),
            (
                "https://files.pythonhosted.org/flask-3.1.0%2Bcu124-py3-none-any.whl",
                "https://files.pythonhosted.org/flask-3.1.0%2Bcu124-py3-none-any.whl.tar.zst",
            ),
        ];

        for (input, expected) in cases {
            let url = DisplaySafeUrl::parse(input).unwrap();
            assert_eq!(add_tar_zst_extension(url).as_str(), expected);
        }
    }
}