1use std::future::Future;
2use std::io;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use futures::{FutureExt, TryStreamExt};
9use tempfile::TempDir;
10use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};
11use tokio::sync::Semaphore;
12use tokio_util::compat::FuturesAsyncReadCompatExt;
13use tracing::{Instrument, info_span, instrument, warn};
14use url::Url;
15
16use uv_cache::{ArchiveId, CacheBucket, CacheEntry, WheelCache};
17use uv_cache_info::{CacheInfo, Timestamp};
18use uv_client::{
19 CacheControl, CachedClientError, Connectivity, DataWithCachePolicy, RegistryClient,
20};
21use uv_distribution_filename::{SourceDistExtension, WheelFilename};
22use uv_distribution_types::{
23 BuildInfo, BuildableSource, BuiltDist, Dist, File, HashPolicy, Hashed, IndexUrl, InstalledDist,
24 Name, SourceDist, ToUrlError,
25};
26use uv_extract::hash::Hasher;
27use uv_fs::write_atomic;
28use uv_platform_tags::Tags;
29use uv_pypi_types::{HashDigest, HashDigests, PyProjectToml};
30use uv_redacted::DisplaySafeUrl;
31use uv_types::{BuildContext, BuildStack};
32use uv_warnings::warn_user_once;
33
34use crate::archive::Archive;
35use uv_python::PythonVariant;
36
37use crate::error::PythonVersion;
38use crate::metadata::{ArchiveMetadata, Metadata};
39use crate::source::SourceDistributionBuilder;
40use crate::{Error, LocalWheel, Reporter, RequiresDist};
41
/// A database of distributions, capable of fetching remote wheels and building source
/// distributions, returning unpacked archives ([`LocalWheel`]) or their metadata.
pub struct DistributionDatabase<'a, Context: BuildContext> {
    /// The build context: cache, capabilities, index locations, and metadata overrides.
    build_context: &'a Context,
    /// Builder used to fetch and build source distributions into wheels.
    builder: SourceDistributionBuilder<'a, Context>,
    /// Registry client whose downloads are limited by a concurrency semaphore.
    client: ManagedClient<'a>,
    /// Optional reporter for download start/progress/complete events.
    reporter: Option<Arc<dyn Reporter>>,
}
60
61impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
62 pub fn new(
63 client: &'a RegistryClient,
64 build_context: &'a Context,
65 downloads_semaphore: Arc<Semaphore>,
66 ) -> Self {
67 Self {
68 build_context,
69 builder: SourceDistributionBuilder::new(build_context),
70 client: ManagedClient::new(client, downloads_semaphore),
71 reporter: None,
72 }
73 }
74
75 #[must_use]
77 pub fn with_build_stack(self, build_stack: &'a BuildStack) -> Self {
78 Self {
79 builder: self.builder.with_build_stack(build_stack),
80 ..self
81 }
82 }
83
84 #[must_use]
86 pub fn with_reporter(self, reporter: Arc<dyn Reporter>) -> Self {
87 Self {
88 builder: self.builder.with_reporter(reporter.clone()),
89 reporter: Some(reporter),
90 ..self
91 }
92 }
93
94 fn handle_response_errors(&self, err: reqwest::Error) -> io::Error {
96 if err.is_timeout() {
97 io::Error::new(
99 io::ErrorKind::TimedOut,
100 format!(
101 "Failed to download distribution due to network timeout. Try increasing UV_HTTP_TIMEOUT (current value: {}s).",
102 self.client.unmanaged.read_timeout().as_secs()
103 ),
104 )
105 } else {
106 io::Error::other(err)
107 }
108 }
109
110 #[instrument(skip_all, fields(%dist))]
117 pub async fn get_or_build_wheel(
118 &self,
119 dist: &Dist,
120 tags: &Tags,
121 hashes: HashPolicy<'_>,
122 ) -> Result<LocalWheel, Error> {
123 match dist {
124 Dist::Built(built) => self.get_wheel(built, hashes).await,
125 Dist::Source(source) => self.build_wheel(source, tags, hashes).await,
126 }
127 }
128
129 #[instrument(skip_all, fields(%dist))]
135 pub async fn get_installed_metadata(
136 &self,
137 dist: &InstalledDist,
138 ) -> Result<ArchiveMetadata, Error> {
139 if let Some(metadata) = self
141 .build_context
142 .dependency_metadata()
143 .get(dist.name(), Some(dist.version()))
144 {
145 return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
146 }
147
148 let metadata = dist
149 .read_metadata()
150 .map_err(|err| Error::ReadInstalled(Box::new(dist.clone()), err))?;
151
152 Ok(ArchiveMetadata::from_metadata23(metadata.clone()))
153 }
154
155 #[instrument(skip_all, fields(%dist))]
161 pub async fn get_or_build_wheel_metadata(
162 &self,
163 dist: &Dist,
164 hashes: HashPolicy<'_>,
165 ) -> Result<ArchiveMetadata, Error> {
166 match dist {
167 Dist::Built(built) => self.get_wheel_metadata(built, hashes).await,
168 Dist::Source(source) => {
169 self.build_wheel_metadata(&BuildableSource::Dist(source), hashes)
170 .await
171 }
172 }
173 }
174
    /// Fetch a built wheel, either from the cache or by downloading it.
    ///
    /// Registry wheels are streamed and unpacked into the wheel cache; if streaming fails or
    /// is unsupported, the wheel is downloaded to disk and unpacked from there. `file://`
    /// registry URLs and path-based wheels are loaded from the local filesystem instead.
    async fn get_wheel(
        &self,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        match dist {
            BuiltDist::Registry(wheels) => {
                // Select the preferred wheel among the registry's candidates.
                let wheel = wheels.best_wheel();
                let WheelTarget {
                    url,
                    extension,
                    size,
                } = WheelTarget::try_from(&*wheel.file)?;

                let wheel_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Index(&wheel.index).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                // A local (`file://`) index: load the wheel from disk rather than over HTTP.
                if url.scheme() == "file" {
                    let path = url
                        .to_file_path()
                        .map_err(|()| Error::NonFileUrl(url.clone()))?;
                    return self
                        .load_wheel(
                            &path,
                            &wheel.filename,
                            WheelExtension::Whl,
                            wheel_entry,
                            dist,
                            hashes,
                        )
                        .await;
                }

                // Stream the wheel, unpacking it into the cache as it downloads.
                match self
                    .stream_wheel(
                        url.clone(),
                        dist.index(),
                        &wheel.filename,
                        extension,
                        size,
                        &wheel_entry,
                        dist,
                        hashes,
                    )
                    .await
                {
                    Ok(archive) => Ok(LocalWheel {
                        dist: Dist::Built(dist.clone()),
                        archive: self
                            .build_context
                            .cache()
                            .archive(&archive.id)
                            .into_boxed_path(),
                        hashes: archive.hashes,
                        filename: wheel.filename.clone(),
                        cache: CacheInfo::default(),
                        build: None,
                    }),
                    Err(Error::Extract(name, err)) => {
                        // Streaming extraction failed: fall back to a full on-disk download
                        // for the known-recoverable streaming errors only.
                        if err.is_http_streaming_unsupported() {
                            warn!(
                                "Streaming unsupported for {dist}; downloading wheel to disk ({err})"
                            );
                        } else if err.is_http_streaming_failed() {
                            warn!("Streaming failed for {dist}; downloading wheel to disk ({err})");
                        } else {
                            return Err(Error::Extract(name, err));
                        }

                        let archive = self
                            .download_wheel(
                                url,
                                dist.index(),
                                &wheel.filename,
                                extension,
                                size,
                                &wheel_entry,
                                dist,
                                hashes,
                            )
                            .await?;

                        Ok(LocalWheel {
                            dist: Dist::Built(dist.clone()),
                            archive: self
                                .build_context
                                .cache()
                                .archive(&archive.id)
                                .into_boxed_path(),
                            hashes: archive.hashes,
                            filename: wheel.filename.clone(),
                            cache: CacheInfo::default(),
                            build: None,
                        })
                    }
                    Err(err) => Err(err),
                }
            }

            BuiltDist::DirectUrl(wheel) => {
                let wheel_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                // Direct URLs are always plain `.whl` downloads with no index or known size.
                match self
                    .stream_wheel(
                        wheel.url.raw().clone(),
                        None,
                        &wheel.filename,
                        WheelExtension::Whl,
                        None,
                        &wheel_entry,
                        dist,
                        hashes,
                    )
                    .await
                {
                    Ok(archive) => Ok(LocalWheel {
                        dist: Dist::Built(dist.clone()),
                        archive: self
                            .build_context
                            .cache()
                            .archive(&archive.id)
                            .into_boxed_path(),
                        hashes: archive.hashes,
                        filename: wheel.filename.clone(),
                        cache: CacheInfo::default(),
                        build: None,
                    }),
                    Err(Error::Extract(name, err)) => {
                        // Same streaming-failure fallback as the registry case above.
                        if err.is_http_streaming_unsupported() {
                            warn!(
                                "Streaming unsupported for {dist}; downloading wheel to disk ({err})"
                            );
                        } else if err.is_http_streaming_failed() {
                            warn!("Streaming failed for {dist}; downloading wheel to disk ({err})");
                        } else {
                            return Err(Error::Extract(name, err));
                        }

                        let archive = self
                            .download_wheel(
                                wheel.url.raw().clone(),
                                None,
                                &wheel.filename,
                                WheelExtension::Whl,
                                None,
                                &wheel_entry,
                                dist,
                                hashes,
                            )
                            .await?;
                        Ok(LocalWheel {
                            dist: Dist::Built(dist.clone()),
                            archive: self
                                .build_context
                                .cache()
                                .archive(&archive.id)
                                .into_boxed_path(),
                            hashes: archive.hashes,
                            filename: wheel.filename.clone(),
                            cache: CacheInfo::default(),
                            build: None,
                        })
                    }
                    Err(err) => Err(err),
                }
            }

            BuiltDist::Path(wheel) => {
                let cache_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                // A wheel already on the local filesystem: unzip it into the cache.
                self.load_wheel(
                    &wheel.install_path,
                    &wheel.filename,
                    WheelExtension::Whl,
                    cache_entry,
                    dist,
                    hashes,
                )
                .await
            }
        }
    }
381
    /// Build a wheel from a source distribution, fetching and building it as necessary.
    ///
    /// Warns on non-PEP 625 archive extensions, validates that the built wheel is compatible
    /// with the requested tags, and unzips the result into the cache — reusing a previously
    /// unzipped archive when one exists.
    async fn build_wheel(
        &self,
        dist: &SourceDist,
        tags: &Tags,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        // Warn (once) when the source distribution uses a legacy extension outside the
        // PEP 625-sanctioned `.tar.gz` / `.zip` set.
        if let Some(extension) = dist.extension()
            && !matches!(
                extension,
                SourceDistExtension::TarGz | SourceDistExtension::Zip
            )
        {
            if matches!(dist, SourceDist::Registry(_)) {
                warn_user_once!(
                    "{dist} uses a legacy source distribution format ('.{extension}') that is not compliant with PEP 625. A future version of uv will reject this source distribution. Consider upgrading to a newer version of {package}",
                    package = dist.name(),
                );
            } else {
                warn_user_once!(
                    "{dist} is not a standards-compliant source distribution: expected '.tar.gz' but found '.{extension}'. A future version of uv will reject source distributions that do not meet the requirements specified in PEP 625",
                );
            }
        }

        let built_wheel = self
            .builder
            .download_and_build(&BuildableSource::Dist(dist), tags, hashes, &self.client)
            .boxed_local()
            .await?;

        // The build backend may produce a wheel that isn't usable on the requested platform;
        // report cross-compilation targets distinctly from the host platform.
        if !built_wheel.filename.is_compatible(tags) {
            return if tags.is_cross() {
                Err(Error::BuiltWheelIncompatibleTargetPlatform {
                    filename: built_wheel.filename,
                    python_platform: tags.python_platform().clone(),
                    python_version: PythonVersion {
                        version: tags.python_version(),
                        variant: if tags.is_freethreaded() {
                            PythonVariant::Freethreaded
                        } else {
                            PythonVariant::Default
                        },
                    },
                })
            } else {
                Err(Error::BuiltWheelIncompatibleHostPlatform {
                    filename: built_wheel.filename,
                    python_platform: tags.python_platform().clone(),
                    python_version: PythonVersion {
                        version: tags.python_version(),
                        variant: if tags.is_freethreaded() {
                            PythonVariant::Freethreaded
                        } else {
                            PythonVariant::Default
                        },
                    },
                })
            };
        }

        // On Windows, take a file lock next to the target to avoid concurrent writers.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = CacheEntry::new(
                built_wheel.target.parent().unwrap(),
                format!(
                    "{}.lock",
                    built_wheel.target.file_name().unwrap().to_str().unwrap()
                ),
            );
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // If the wheel was already unzipped into the cache, reuse the existing archive.
        match self.build_context.cache().resolve_link(&built_wheel.target) {
            Ok(archive) => {
                return Ok(LocalWheel {
                    dist: Dist::Source(dist.clone()),
                    archive: archive.into_boxed_path(),
                    filename: built_wheel.filename,
                    hashes: built_wheel.hashes,
                    cache: built_wheel.cache_info,
                    build: Some(built_wheel.build_info),
                });
            }
            // No existing link: fall through and unzip below.
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(Error::CacheRead(err)),
        }

        let id = self
            .unzip_wheel(&built_wheel.path, &built_wheel.target)
            .await?;

        Ok(LocalWheel {
            dist: Dist::Source(dist.clone()),
            archive: self.build_context.cache().archive(&id).into_boxed_path(),
            hashes: built_wheel.hashes,
            filename: built_wheel.filename,
            cache: built_wheel.cache_info,
            build: Some(built_wheel.build_info),
        })
    }
506
    /// Fetch the metadata for a built wheel.
    ///
    /// Dependency-metadata overrides from the build context take precedence. When hashes must
    /// be generated, or when the server can't serve the metadata via streaming, the full
    /// wheel is fetched and the metadata read from the local archive instead.
    async fn get_wheel_metadata(
        &self,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<ArchiveMetadata, Error> {
        // If hashes need to be generated, fetch the entire wheel (hashing happens during the
        // fetch), then read the metadata out of the local archive.
        if hashes.is_generate(dist) {
            let wheel = self.get_wheel(dist, hashes).await?;
            // Even on this path, a user-provided metadata override wins over the wheel's own.
            let metadata = if let Some(metadata) = self
                .build_context
                .dependency_metadata()
                .get(dist.name(), Some(dist.version()))
            {
                metadata.clone()
            } else {
                wheel.metadata()?
            };
            let hashes = wheel.hashes;
            return Ok(ArchiveMetadata {
                metadata: Metadata::from_metadata23(metadata),
                hashes,
            });
        }

        // Apply any metadata override without touching the network.
        if let Some(metadata) = self
            .build_context
            .dependency_metadata()
            .get(dist.name(), Some(dist.version()))
        {
            return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
        }

        // Fetch the metadata through the registry client, under a download permit.
        let result = self
            .client
            .managed(|client| {
                client
                    .wheel_metadata(dist, self.build_context.capabilities())
                    .boxed_local()
            })
            .await;

        match result {
            Ok(metadata) => {
                Ok(ArchiveMetadata::from_metadata23(metadata))
            }
            // The server can't serve the metadata via streaming: fetch the whole wheel and
            // read the metadata from it.
            Err(err) if err.is_http_streaming_unsupported() => {
                warn!(
                    "Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})"
                );

                let wheel = self.get_wheel(dist, hashes).await?;
                let metadata = wheel.metadata()?;
                let hashes = wheel.hashes;
                Ok(ArchiveMetadata {
                    metadata: Metadata::from_metadata23(metadata),
                    hashes,
                })
            }
            Err(err) => Err(err.into()),
        }
    }
590
591 pub async fn build_wheel_metadata(
596 &self,
597 source: &BuildableSource<'_>,
598 hashes: HashPolicy<'_>,
599 ) -> Result<ArchiveMetadata, Error> {
600 if let Some(dist) = source.as_dist() {
602 if let Some(metadata) = self
603 .build_context
604 .dependency_metadata()
605 .get(dist.name(), dist.version())
606 {
607 self.builder.resolve_revision(source, &self.client).await?;
610
611 return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
612 }
613 }
614
615 let metadata = self
616 .builder
617 .download_and_build_metadata(source, hashes, &self.client)
618 .boxed_local()
619 .await?;
620
621 Ok(metadata)
622 }
623
624 pub async fn requires_dist(
626 &self,
627 path: &Path,
628 pyproject_toml: &PyProjectToml,
629 ) -> Result<Option<RequiresDist>, Error> {
630 self.builder
631 .source_tree_requires_dist(
632 path,
633 pyproject_toml,
634 self.client.unmanaged.credentials_cache(),
635 )
636 .await
637 }
638
    /// Stream a wheel from a URL, unzipping it into the cache while it downloads and hashing
    /// the bytes in transit.
    ///
    /// The result is recorded behind an HTTP cache entry; a cached [`Archive`] is reused only
    /// if it carries the required digests and its unpacked contents still exist on disk —
    /// otherwise the download is retried with the cache bypassed.
    async fn stream_wheel(
        &self,
        url: DisplaySafeUrl,
        index: Option<&IndexUrl>,
        filename: &WheelFilename,
        extension: WheelExtension,
        size: Option<u64>,
        wheel_entry: &CacheEntry,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<Archive, Error> {
        // On Windows, lock the cache entry to avoid concurrent writers.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // The HTTP cache entry holding the response's cache policy and archive pointer.
        let http_entry = wheel_entry.with_file(format!("{}.http", filename.cache_key()));

        // Clone the URL for use inside the download closure; `url` itself is moved into the
        // request below. The `&` binds the cloned temporary so the closure can borrow it.
        let query_url = &url.clone();

        let download = |response: reqwest::Response| {
            async {
                // Prefer the caller-provided size; fall back to the `Content-Length` header.
                let size = size.or_else(|| content_length(&response));

                let progress = self
                    .reporter
                    .as_ref()
                    .map(|reporter| (reporter, reporter.on_download_start(dist.name(), size)));

                let reader = response
                    .bytes_stream()
                    .map_err(|err| self.handle_response_errors(err))
                    .into_async_read();

                // Hash the byte stream as it's unpacked, so digests come for free.
                let algorithms = hashes.algorithms();
                let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
                let mut hasher = uv_extract::hash::HashReader::new(reader.compat(), &mut hashers);

                // Unpack into a temporary directory inside the cache root, then persist.
                let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
                    .map_err(Error::CacheWrite)?;

                match progress {
                    Some((reporter, progress)) => {
                        // Wrap the hashing reader so progress is reported as bytes flow.
                        let mut reader = ProgressReader::new(&mut hasher, progress, &**reporter);
                        match extension {
                            WheelExtension::Whl => {
                                uv_extract::stream::unzip(query_url, &mut reader, temp_dir.path())
                                    .await
                                    .map_err(|err| Error::Extract(filename.to_string(), err))?;
                            }
                            WheelExtension::WhlZst => {
                                uv_extract::stream::untar_zst(&mut reader, temp_dir.path())
                                    .await
                                    .map_err(|err| Error::Extract(filename.to_string(), err))?;
                            }
                        }
                    }
                    None => match extension {
                        WheelExtension::Whl => {
                            uv_extract::stream::unzip(query_url, &mut hasher, temp_dir.path())
                                .await
                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
                        }
                        WheelExtension::WhlZst => {
                            uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
                                .await
                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
                        }
                    },
                }

                // Exhaust the reader so the hashers see the complete payload.
                if !hashes.is_none() {
                    hasher.finish().await.map_err(Error::HashExhaustion)?;
                }

                // NOTE(review): `persist` failures are mapped to `CacheRead` here, but to
                // `CacheWrite` in `load_wheel`/`unzip_wheel` — confirm which is intended.
                let id = self
                    .build_context
                    .cache()
                    .persist(temp_dir.keep(), wheel_entry.path())
                    .await
                    .map_err(Error::CacheRead)?;

                if let Some((reporter, progress)) = progress {
                    reporter.on_download_complete(dist.name(), progress);
                }

                Ok(Archive::new(
                    id,
                    hashers.into_iter().map(HashDigest::from).collect(),
                    filename.clone(),
                ))
            }
            .instrument(info_span!("wheel", wheel = %dist))
        };

        let req = self.request(url.clone())?;

        // Respect a per-index cache-control override; otherwise derive freshness from the
        // cache. Offline mode always accepts stale entries.
        let cache_control = match self.client.unmanaged.connectivity() {
            Connectivity::Online => {
                if let Some(header) = index.and_then(|index| {
                    self.build_context
                        .locations()
                        .artifact_cache_control_for(index)
                }) {
                    CacheControl::Override(header)
                } else {
                    CacheControl::from(
                        self.build_context
                            .cache()
                            .freshness(&http_entry, Some(&filename.name), None)
                            .map_err(Error::CacheRead)?,
                    )
                }
            }
            Connectivity::Offline => CacheControl::AllowStale,
        };

        let archive = self
            .client
            .managed(|client| {
                client.cached_client().get_serde_with_retry(
                    req,
                    &http_entry,
                    cache_control.clone(),
                    download,
                )
            })
            .await
            .map_err(|err| match err {
                CachedClientError::Callback { err, .. } => err,
                CachedClientError::Client(err) => Error::Client(err),
            })?;

        // Treat the cached archive as valid only if it has the required digests and its
        // unpacked contents still exist on disk.
        let archive = Some(archive)
            .filter(|archive| archive.has_digests(hashes))
            .filter(|archive| archive.exists(self.build_context.cache()));

        // Otherwise, re-download with the cache bypassed.
        let archive = if let Some(archive) = archive {
            archive
        } else {
            self.client
                .managed(async |client| {
                    client
                        .cached_client()
                        .skip_cache_with_retry(
                            self.request(url)?,
                            &http_entry,
                            cache_control,
                            download,
                        )
                        .await
                        .map_err(|err| match err {
                            CachedClientError::Callback { err, .. } => err,
                            CachedClientError::Client(err) => Error::Client(err),
                        })
                })
                .await?
        };

        Ok(archive)
    }
811
    /// Download a wheel to a temporary file on disk, then unpack it into the cache.
    ///
    /// This is the fallback for servers that can't support streaming extraction (see
    /// `stream_wheel`). The caching and validation protocol mirrors `stream_wheel`.
    async fn download_wheel(
        &self,
        url: DisplaySafeUrl,
        index: Option<&IndexUrl>,
        filename: &WheelFilename,
        extension: WheelExtension,
        size: Option<u64>,
        wheel_entry: &CacheEntry,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<Archive, Error> {
        // On Windows, lock the cache entry to avoid concurrent writers.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // The HTTP cache entry holding the response's cache policy and archive pointer.
        let http_entry = wheel_entry.with_file(format!("{}.http", filename.cache_key()));

        // Clone the URL for use inside the download closure; `url` itself is moved into the
        // request below.
        let query_url = &url.clone();

        let download = |response: reqwest::Response| {
            async {
                // Prefer the caller-provided size; fall back to the `Content-Length` header.
                let size = size.or_else(|| content_length(&response));

                let progress = self
                    .reporter
                    .as_ref()
                    .map(|reporter| (reporter, reporter.on_download_start(dist.name(), size)));

                let reader = response
                    .bytes_stream()
                    .map_err(|err| self.handle_response_errors(err))
                    .into_async_read();

                // Buffer the full response body into an anonymous temp file in the cache root.
                let temp_file = tempfile::tempfile_in(self.build_context.cache().root())
                    .map_err(Error::CacheWrite)?;
                let mut writer = tokio::io::BufWriter::new(fs_err::tokio::File::from_std(
                    fs_err::File::from_parts(temp_file, self.build_context.cache().root()),
                ));

                match progress {
                    Some((reporter, progress)) => {
                        let mut reader =
                            ProgressReader::new(reader.compat(), progress, &**reporter);

                        tokio::io::copy(&mut reader, &mut writer)
                            .await
                            .map_err(Error::CacheWrite)?;
                    }
                    None => {
                        tokio::io::copy(&mut reader.compat(), &mut writer)
                            .await
                            .map_err(Error::CacheWrite)?;
                    }
                }

                // Create the extraction target, then rewind the temp file for re-reading.
                let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
                    .map_err(Error::CacheWrite)?;
                let mut file = writer.into_inner();
                file.seek(io::SeekFrom::Start(0))
                    .await
                    .map_err(Error::CacheWrite)?;

                let hashes = if hashes.is_none() {
                    // No digests required: extract on a blocking thread.
                    let file = file.into_std().await;
                    tokio::task::spawn_blocking({
                        let target = temp_dir.path().to_owned();
                        move || -> Result<(), uv_extract::Error> {
                            match extension {
                                WheelExtension::Whl => {
                                    uv_extract::unzip(file, &target)?;
                                }
                                WheelExtension::WhlZst => {
                                    uv_extract::stream::untar_zst_file(file, &target)?;
                                }
                            }
                            Ok(())
                        }
                    })
                    .await?
                    .map_err(|err| Error::Extract(filename.to_string(), err))?;

                    HashDigests::empty()
                } else {
                    // Digests required: re-read the file through hashing readers while
                    // extracting.
                    let algorithms = hashes.algorithms();
                    let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
                    let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);

                    match extension {
                        WheelExtension::Whl => {
                            uv_extract::stream::unzip(query_url, &mut hasher, temp_dir.path())
                                .await
                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
                        }
                        WheelExtension::WhlZst => {
                            uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
                                .await
                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
                        }
                    }

                    // Exhaust the reader so the hashers see the complete payload.
                    hasher.finish().await.map_err(Error::HashExhaustion)?;

                    hashers.into_iter().map(HashDigest::from).collect()
                };

                let id = self
                    .build_context
                    .cache()
                    .persist(temp_dir.keep(), wheel_entry.path())
                    .await
                    .map_err(Error::CacheRead)?;

                if let Some((reporter, progress)) = progress {
                    reporter.on_download_complete(dist.name(), progress);
                }

                Ok(Archive::new(id, hashes, filename.clone()))
            }
            .instrument(info_span!("wheel", wheel = %dist))
        };

        let req = self.request(url.clone())?;

        // Respect a per-index cache-control override; otherwise derive freshness from the
        // cache. Offline mode always accepts stale entries.
        let cache_control = match self.client.unmanaged.connectivity() {
            Connectivity::Online => {
                if let Some(header) = index.and_then(|index| {
                    self.build_context
                        .locations()
                        .artifact_cache_control_for(index)
                }) {
                    CacheControl::Override(header)
                } else {
                    CacheControl::from(
                        self.build_context
                            .cache()
                            .freshness(&http_entry, Some(&filename.name), None)
                            .map_err(Error::CacheRead)?,
                    )
                }
            }
            Connectivity::Offline => CacheControl::AllowStale,
        };

        let archive = self
            .client
            .managed(|client| {
                client.cached_client().get_serde_with_retry(
                    req,
                    &http_entry,
                    cache_control.clone(),
                    download,
                )
            })
            .await
            .map_err(|err| match err {
                CachedClientError::Callback { err, .. } => err,
                CachedClientError::Client(err) => Error::Client(err),
            })?;

        // Treat the cached archive as valid only if it has the required digests and its
        // unpacked contents still exist on disk.
        let archive = Some(archive)
            .filter(|archive| archive.has_digests(hashes))
            .filter(|archive| archive.exists(self.build_context.cache()));

        // Otherwise, re-download with the cache bypassed.
        let archive = if let Some(archive) = archive {
            archive
        } else {
            self.client
                .managed(async |client| {
                    client
                        .cached_client()
                        .skip_cache_with_retry(
                            self.request(url)?,
                            &http_entry,
                            cache_control,
                            download,
                        )
                        .await
                        .map_err(|err| match err {
                            CachedClientError::Callback { err, .. } => err,
                            CachedClientError::Client(err) => Error::Client(err),
                        })
                })
                .await?
        };

        Ok(archive)
    }
1018
    /// Load a wheel from a local path, unpacking it into the cache.
    ///
    /// A `.rev` pointer file records the source file's modification time and the computed
    /// digests; the cached archive is reused when the pointer is up-to-date and satisfies the
    /// hash policy.
    async fn load_wheel(
        &self,
        path: &Path,
        filename: &WheelFilename,
        extension: WheelExtension,
        wheel_entry: CacheEntry,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        // On Windows, lock the cache entry to avoid concurrent writers.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // Freshness is keyed off the wheel file's modification time.
        let modified = Timestamp::from_path(path).map_err(Error::CacheRead)?;

        // Read the revision pointer from the cache, if any.
        let pointer_entry = wheel_entry.with_file(format!("{}.rev", filename.cache_key()));
        let pointer = LocalArchivePointer::read_from(&pointer_entry)?;

        // Reuse the cached archive only if it's fresh and carries the required digests.
        let archive = pointer
            .filter(|pointer| pointer.is_up_to_date(modified))
            .map(LocalArchivePointer::into_archive)
            .filter(|archive| archive.has_digests(hashes));

        if let Some(archive) = archive {
            // Cache hit: reuse the existing unpacked archive.
            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        } else if hashes.is_none() {
            // No digests required: unzip without hashing and record an empty digest set.
            // NOTE(review): this branch always unzips (ignoring `extension`); both current
            // callers pass `WheelExtension::Whl` — confirm if `.whl.tar.zst` ever reaches here.
            let archive = Archive::new(
                self.unzip_wheel(path, wheel_entry.path()).await?,
                HashDigests::empty(),
                filename.clone(),
            );

            // Record the new revision so subsequent loads can reuse the archive.
            let pointer = LocalArchivePointer {
                timestamp: modified,
                archive: archive.clone(),
            };
            pointer.write_to(&pointer_entry).await?;

            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        } else {
            // Digests required: stream the file through hashing readers while unpacking.
            let file = fs_err::tokio::File::open(path)
                .await
                .map_err(Error::CacheRead)?;
            let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
                .map_err(Error::CacheWrite)?;

            let algorithms = hashes.algorithms();
            let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
            let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);

            match extension {
                WheelExtension::Whl => {
                    uv_extract::stream::unzip(path.display(), &mut hasher, temp_dir.path())
                        .await
                        .map_err(|err| Error::Extract(filename.to_string(), err))?;
                }
                WheelExtension::WhlZst => {
                    uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
                        .await
                        .map_err(|err| Error::Extract(filename.to_string(), err))?;
                }
            }

            // Exhaust the reader so the hashers see the complete payload.
            hasher.finish().await.map_err(Error::HashExhaustion)?;

            let hashes = hashers.into_iter().map(HashDigest::from).collect();

            let id = self
                .build_context
                .cache()
                .persist(temp_dir.keep(), wheel_entry.path())
                .await
                .map_err(Error::CacheWrite)?;

            let archive = Archive::new(id, hashes, filename.clone());

            // Record the new revision so subsequent loads can reuse the archive.
            let pointer = LocalArchivePointer {
                timestamp: modified,
                archive: archive.clone(),
            };
            pointer.write_to(&pointer_entry).await?;

            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        }
    }
1153
    /// Unzip a wheel into the cache, returning the ID of the unpacked archive.
    ///
    /// Extraction happens on a blocking thread into a temporary directory under the cache
    /// root, which is then persisted to the target link.
    async fn unzip_wheel(&self, path: &Path, target: &Path) -> Result<ArchiveId, Error> {
        let temp_dir = tokio::task::spawn_blocking({
            let path = path.to_owned();
            let root = self.build_context.cache().root().to_path_buf();
            move || -> Result<TempDir, Error> {
                let temp_dir = tempfile::tempdir_in(root).map_err(Error::CacheWrite)?;
                // NOTE(review): the open failure is mapped to `CacheWrite` even though it's a
                // read — confirm whether `CacheRead` was intended.
                let reader = fs_err::File::open(&path).map_err(Error::CacheWrite)?;
                uv_extract::unzip(reader, temp_dir.path())
                    .map_err(|err| Error::Extract(path.to_string_lossy().into_owned(), err))?;
                Ok(temp_dir)
            }
        })
        // First `?` propagates the join error, the second the extraction result.
        .await??;

        let id = self
            .build_context
            .cache()
            .persist(temp_dir.keep(), target)
            .await
            .map_err(Error::CacheWrite)?;

        Ok(id)
    }
1180
1181 fn request(&self, url: DisplaySafeUrl) -> Result<reqwest::Request, reqwest::Error> {
1183 self.client
1184 .unmanaged
1185 .uncached_client(&url)
1186 .get(Url::from(url))
1187 .header(
1188 "accept-encoding",
1192 reqwest::header::HeaderValue::from_static("identity"),
1193 )
1194 .build()
1195 }
1196
    /// Return a reference to the semaphore-managed registry client.
    pub fn client(&self) -> &ManagedClient<'a> {
        &self.client
    }
1201}
1202
/// A [`RegistryClient`] paired with a semaphore that bounds concurrent downloads.
pub struct ManagedClient<'a> {
    /// The underlying client; using it directly bypasses the download limit.
    pub unmanaged: &'a RegistryClient,
    /// Semaphore limiting the number of concurrent managed operations.
    control: Arc<Semaphore>,
}
1208
1209impl<'a> ManagedClient<'a> {
1210 fn new(client: &'a RegistryClient, control: Arc<Semaphore>) -> Self {
1212 ManagedClient {
1213 unmanaged: client,
1214 control,
1215 }
1216 }
1217
1218 pub async fn managed<F, T>(&self, f: impl FnOnce(&'a RegistryClient) -> F) -> T
1223 where
1224 F: Future<Output = T>,
1225 {
1226 let _permit = self.control.acquire().await.unwrap();
1227 f(self.unmanaged).await
1228 }
1229
1230 pub async fn manual<F, T>(&'a self, f: impl FnOnce(&'a RegistryClient, &'a Semaphore) -> F) -> T
1238 where
1239 F: Future<Output = T>,
1240 {
1241 f(self.unmanaged, &self.control).await
1242 }
1243}
1244
1245fn content_length(response: &reqwest::Response) -> Option<u64> {
1247 response
1248 .headers()
1249 .get(reqwest::header::CONTENT_LENGTH)
1250 .and_then(|val| val.to_str().ok())
1251 .and_then(|val| val.parse::<u64>().ok())
1252}
1253
/// An [`AsyncRead`] adapter that reports bytes read to a [`Reporter`].
struct ProgressReader<'a, R> {
    /// The wrapped reader.
    reader: R,
    /// The reporter-assigned identifier for this download (from `on_download_start`).
    index: usize,
    /// The reporter receiving progress updates.
    reporter: &'a dyn Reporter,
}
1260
1261impl<'a, R> ProgressReader<'a, R> {
1262 fn new(reader: R, index: usize, reporter: &'a dyn Reporter) -> Self {
1264 Self {
1265 reader,
1266 index,
1267 reporter,
1268 }
1269 }
1270}
1271
1272impl<R> AsyncRead for ProgressReader<'_, R>
1273where
1274 R: AsyncRead + Unpin,
1275{
1276 fn poll_read(
1277 mut self: Pin<&mut Self>,
1278 cx: &mut Context<'_>,
1279 buf: &mut ReadBuf<'_>,
1280 ) -> Poll<io::Result<()>> {
1281 Pin::new(&mut self.as_mut().reader)
1282 .poll_read(cx, buf)
1283 .map_ok(|()| {
1284 self.reporter
1285 .on_download_progress(self.index, buf.filled().len() as u64);
1286 })
1287 }
1288}
1289
/// A pointer from an HTTP cache entry to an unpacked [`Archive`] in the cache.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HttpArchivePointer {
    archive: Archive,
}
1297
1298impl HttpArchivePointer {
1299 pub fn read_from(path: impl AsRef<Path>) -> Result<Option<Self>, Error> {
1301 match fs_err::File::open(path.as_ref()) {
1302 Ok(file) => {
1303 let data = DataWithCachePolicy::from_reader(file)?.data;
1304 let archive = rmp_serde::from_slice::<Archive>(&data)?;
1305 Ok(Some(Self { archive }))
1306 }
1307 Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
1308 Err(err) => Err(Error::CacheRead(err)),
1309 }
1310 }
1311
1312 pub fn into_archive(self) -> Archive {
1314 self.archive
1315 }
1316
1317 pub fn to_cache_info(&self) -> CacheInfo {
1319 CacheInfo::default()
1320 }
1321
1322 pub fn to_build_info(&self) -> Option<BuildInfo> {
1324 None
1325 }
1326}
1327
/// A pointer from a local wheel file to an unpacked [`Archive`] in the cache, keyed by the
/// source file's modification time.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LocalArchivePointer {
    timestamp: Timestamp,
    archive: Archive,
}
1336
1337impl LocalArchivePointer {
1338 pub fn read_from(path: impl AsRef<Path>) -> Result<Option<Self>, Error> {
1340 match fs_err::read(path) {
1341 Ok(cached) => Ok(Some(rmp_serde::from_slice::<Self>(&cached)?)),
1342 Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
1343 Err(err) => Err(Error::CacheRead(err)),
1344 }
1345 }
1346
1347 pub async fn write_to(&self, entry: &CacheEntry) -> Result<(), Error> {
1349 write_atomic(entry.path(), rmp_serde::to_vec(&self)?)
1350 .await
1351 .map_err(Error::CacheWrite)
1352 }
1353
1354 pub fn is_up_to_date(&self, modified: Timestamp) -> bool {
1356 self.timestamp == modified
1357 }
1358
1359 pub fn into_archive(self) -> Archive {
1361 self.archive
1362 }
1363
1364 pub fn to_cache_info(&self) -> CacheInfo {
1366 CacheInfo::from_timestamp(self.timestamp)
1367 }
1368
1369 pub fn to_build_info(&self) -> Option<BuildInfo> {
1371 None
1372 }
1373}
1374
/// The resolved download target for a registry wheel: either the wheel itself, or a
/// zstandard-compressed variant when the index advertises one.
#[derive(Debug, Clone)]
struct WheelTarget {
    /// The URL to download from.
    url: DisplaySafeUrl,
    /// Whether the target is a plain `.whl` or a `.whl.tar.zst`.
    extension: WheelExtension,
    /// The expected download size, if known.
    size: Option<u64>,
}
1384
1385impl TryFrom<&File> for WheelTarget {
1386 type Error = ToUrlError;
1387
1388 fn try_from(file: &File) -> Result<Self, Self::Error> {
1390 let url = file.url.to_url()?;
1391 if let Some(zstd) = file.zstd.as_ref() {
1392 Ok(Self {
1393 url: add_tar_zst_extension(url),
1394 extension: WheelExtension::WhlZst,
1395 size: zstd.size,
1396 })
1397 } else {
1398 Ok(Self {
1399 url,
1400 extension: WheelExtension::Whl,
1401 size: file.size,
1402 })
1403 }
1404 }
1405}
1406
/// The on-the-wire format of a wheel download.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum WheelExtension {
    /// A plain `.whl` (zip) archive.
    Whl,
    /// A zstandard-compressed tarball variant (`.whl.tar.zst`).
    WhlZst,
}
1414
1415#[must_use]
1417fn add_tar_zst_extension(mut url: DisplaySafeUrl) -> DisplaySafeUrl {
1418 let mut path = url.path().to_string();
1419
1420 if !path.ends_with(".tar.zst") {
1421 path.push_str(".tar.zst");
1422 }
1423
1424 url.set_path(&path);
1425 url
1426}
1427
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_add_tar_zst_extension() {
        // (input, expected) pairs: the extension is appended exactly once, and
        // percent-encoded characters in the path are preserved.
        let cases = [
            (
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl",
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
            ),
            (
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
            ),
            (
                "https://files.pythonhosted.org/flask-3.1.0%2Bcu124-py3-none-any.whl",
                "https://files.pythonhosted.org/flask-3.1.0%2Bcu124-py3-none-any.whl.tar.zst",
            ),
        ];
        for (input, expected) in cases {
            let url = DisplaySafeUrl::parse(input).unwrap();
            assert_eq!(add_tar_zst_extension(url).as_str(), expected);
        }
    }
}