1use std::future::Future;
2use std::io;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use futures::{FutureExt, TryStreamExt};
9use tempfile::TempDir;
10use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};
11use tokio::sync::Semaphore;
12use tokio_util::compat::FuturesAsyncReadCompatExt;
13use tracing::{Instrument, info_span, instrument, warn};
14use url::Url;
15
16use uv_cache::{ArchiveId, CacheBucket, CacheEntry, WheelCache};
17use uv_cache_info::{CacheInfo, Timestamp};
18use uv_client::{
19 CacheControl, CachedClientError, Connectivity, DataWithCachePolicy, RegistryClient,
20};
21use uv_distribution_filename::{SourceDistExtension, WheelFilename};
22use uv_distribution_types::{
23 BuildInfo, BuildableSource, BuiltDist, Dist, File, HashPolicy, Hashed, IndexUrl, InstalledDist,
24 Name, SourceDist, ToUrlError,
25};
26use uv_extract::hash::Hasher;
27use uv_fs::write_atomic;
28use uv_platform_tags::Tags;
29use uv_pypi_types::{HashDigest, HashDigests, PyProjectToml};
30use uv_redacted::DisplaySafeUrl;
31use uv_types::{BuildContext, BuildStack};
32use uv_warnings::warn_user_once;
33
34use crate::archive::Archive;
35use crate::metadata::{ArchiveMetadata, Metadata};
36use crate::source::SourceDistributionBuilder;
37use crate::{Error, LocalWheel, Reporter, RequiresDist};
38
/// A database for fetching built wheels and building source distributions,
/// backed by the shared cache and a rate-limited registry client.
pub struct DistributionDatabase<'a, Context: BuildContext> {
    /// Build configuration and cache access shared across operations.
    build_context: &'a Context,
    /// Builder used to produce wheels from source distributions.
    builder: SourceDistributionBuilder<'a, Context>,
    /// Registry client guarded by a download-concurrency semaphore.
    client: ManagedClient<'a>,
    /// Optional progress reporter for download events.
    reporter: Option<Arc<dyn Reporter>>,
}
57
58impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
59 pub fn new(
60 client: &'a RegistryClient,
61 build_context: &'a Context,
62 downloads_semaphore: Arc<Semaphore>,
63 ) -> Self {
64 Self {
65 build_context,
66 builder: SourceDistributionBuilder::new(build_context),
67 client: ManagedClient::new(client, downloads_semaphore),
68 reporter: None,
69 }
70 }
71
72 #[must_use]
74 pub fn with_build_stack(self, build_stack: &'a BuildStack) -> Self {
75 Self {
76 builder: self.builder.with_build_stack(build_stack),
77 ..self
78 }
79 }
80
81 #[must_use]
83 pub fn with_reporter(self, reporter: Arc<dyn Reporter>) -> Self {
84 Self {
85 builder: self.builder.with_reporter(reporter.clone()),
86 reporter: Some(reporter),
87 ..self
88 }
89 }
90
91 fn handle_response_errors(&self, err: reqwest::Error) -> io::Error {
93 if err.is_timeout() {
94 io::Error::new(
96 io::ErrorKind::TimedOut,
97 format!(
98 "Failed to download distribution due to network timeout. Try increasing UV_HTTP_TIMEOUT (current value: {}s).",
99 self.client.unmanaged.read_timeout().as_secs()
100 ),
101 )
102 } else {
103 io::Error::other(err)
104 }
105 }
106
    /// Either fetch an already-built wheel or build the source distribution
    /// into a wheel, returning the unzipped result from the cache.
    #[instrument(skip_all, fields(%dist))]
    pub async fn get_or_build_wheel(
        &self,
        dist: &Dist,
        tags: &Tags,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        match dist {
            Dist::Built(built) => self.get_wheel(built, hashes).await,
            Dist::Source(source) => self.build_wheel(source, tags, hashes).await,
        }
    }
125
126 #[instrument(skip_all, fields(%dist))]
132 pub async fn get_installed_metadata(
133 &self,
134 dist: &InstalledDist,
135 ) -> Result<ArchiveMetadata, Error> {
136 if let Some(metadata) = self
138 .build_context
139 .dependency_metadata()
140 .get(dist.name(), Some(dist.version()))
141 {
142 return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
143 }
144
145 let metadata = dist
146 .read_metadata()
147 .map_err(|err| Error::ReadInstalled(Box::new(dist.clone()), err))?;
148
149 Ok(ArchiveMetadata::from_metadata23(metadata.clone()))
150 }
151
    /// Either fetch the metadata for a built wheel or resolve metadata for a
    /// source distribution (building it if necessary).
    #[instrument(skip_all, fields(%dist))]
    pub async fn get_or_build_wheel_metadata(
        &self,
        dist: &Dist,
        hashes: HashPolicy<'_>,
    ) -> Result<ArchiveMetadata, Error> {
        match dist {
            Dist::Built(built) => self.get_wheel_metadata(built, hashes).await,
            Dist::Source(source) => {
                self.build_wheel_metadata(&BuildableSource::Dist(source), hashes)
                    .await
            }
        }
    }
171
    /// Fetch a built wheel, either from the cache or by downloading it.
    ///
    /// Registry wheels served from `file://` URLs and path wheels are loaded
    /// from disk; remote wheels are streamed and unzipped on the fly, falling
    /// back to a download-to-disk path when streaming is unsupported.
    async fn get_wheel(
        &self,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        match dist {
            BuiltDist::Registry(wheels) => {
                let wheel = wheels.best_wheel();
                let WheelTarget {
                    url,
                    extension,
                    size,
                } = WheelTarget::try_from(&*wheel.file)?;

                let wheel_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Index(&wheel.index).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                // A `file://` registry URL can be read directly from disk.
                if url.scheme() == "file" {
                    let path = url
                        .to_file_path()
                        .map_err(|()| Error::NonFileUrl(url.clone()))?;
                    return self
                        .load_wheel(
                            &path,
                            &wheel.filename,
                            WheelExtension::Whl,
                            wheel_entry,
                            dist,
                            hashes,
                        )
                        .await;
                }

                // Prefer streaming the wheel straight into the cache.
                match self
                    .stream_wheel(
                        url.clone(),
                        dist.index(),
                        &wheel.filename,
                        extension,
                        size,
                        &wheel_entry,
                        dist,
                        hashes,
                    )
                    .await
                {
                    Ok(archive) => Ok(LocalWheel {
                        dist: Dist::Built(dist.clone()),
                        archive: self
                            .build_context
                            .cache()
                            .archive(&archive.id)
                            .into_boxed_path(),
                        hashes: archive.hashes,
                        filename: wheel.filename.clone(),
                        cache: CacheInfo::default(),
                        build: None,
                    }),
                    Err(Error::Extract(name, err)) => {
                        // Fall back to a full download when streaming is
                        // unsupported or failed; other extract errors are
                        // fatal.
                        if err.is_http_streaming_unsupported() {
                            warn!(
                                "Streaming unsupported for {dist}; downloading wheel to disk ({err})"
                            );
                        } else if err.is_http_streaming_failed() {
                            warn!("Streaming failed for {dist}; downloading wheel to disk ({err})");
                        } else {
                            return Err(Error::Extract(name, err));
                        }

                        let archive = self
                            .download_wheel(
                                url,
                                dist.index(),
                                &wheel.filename,
                                extension,
                                size,
                                &wheel_entry,
                                dist,
                                hashes,
                            )
                            .await?;

                        Ok(LocalWheel {
                            dist: Dist::Built(dist.clone()),
                            archive: self
                                .build_context
                                .cache()
                                .archive(&archive.id)
                                .into_boxed_path(),
                            hashes: archive.hashes,
                            filename: wheel.filename.clone(),
                            cache: CacheInfo::default(),
                            build: None,
                        })
                    }
                    Err(err) => Err(err),
                }
            }

            BuiltDist::DirectUrl(wheel) => {
                let wheel_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                // Direct URLs carry no index metadata, so no `.tar.zst`
                // variant or known size is available.
                match self
                    .stream_wheel(
                        wheel.url.raw().clone(),
                        None,
                        &wheel.filename,
                        WheelExtension::Whl,
                        None,
                        &wheel_entry,
                        dist,
                        hashes,
                    )
                    .await
                {
                    Ok(archive) => Ok(LocalWheel {
                        dist: Dist::Built(dist.clone()),
                        archive: self
                            .build_context
                            .cache()
                            .archive(&archive.id)
                            .into_boxed_path(),
                        hashes: archive.hashes,
                        filename: wheel.filename.clone(),
                        cache: CacheInfo::default(),
                        build: None,
                    }),
                    Err(Error::Extract(name, err)) => {
                        // Same streaming fallback as the registry branch.
                        if err.is_http_streaming_unsupported() {
                            warn!(
                                "Streaming unsupported for {dist}; downloading wheel to disk ({err})"
                            );
                        } else if err.is_http_streaming_failed() {
                            warn!("Streaming failed for {dist}; downloading wheel to disk ({err})");
                        } else {
                            return Err(Error::Extract(name, err));
                        }

                        let archive = self
                            .download_wheel(
                                wheel.url.raw().clone(),
                                None,
                                &wheel.filename,
                                WheelExtension::Whl,
                                None,
                                &wheel_entry,
                                dist,
                                hashes,
                            )
                            .await?;
                        Ok(LocalWheel {
                            dist: Dist::Built(dist.clone()),
                            archive: self
                                .build_context
                                .cache()
                                .archive(&archive.id)
                                .into_boxed_path(),
                            hashes: archive.hashes,
                            filename: wheel.filename.clone(),
                            cache: CacheInfo::default(),
                            build: None,
                        })
                    }
                    Err(err) => Err(err),
                }
            }

            BuiltDist::Path(wheel) => {
                let cache_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                self.load_wheel(
                    &wheel.install_path,
                    &wheel.filename,
                    WheelExtension::Whl,
                    cache_entry,
                    dist,
                    hashes,
                )
                .await
            }
        }
    }
378
    /// Build a wheel from a source distribution, then unzip it into the
    /// cache, reusing an already-unzipped archive when one exists.
    async fn build_wheel(
        &self,
        dist: &SourceDist,
        tags: &Tags,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        // Warn on legacy (non-PEP 625) source distribution formats.
        if let Some(extension) = dist.extension()
            && !matches!(
                extension,
                SourceDistExtension::TarGz | SourceDistExtension::Zip
            )
        {
            if matches!(dist, SourceDist::Registry(_)) {
                warn_user_once!(
                    "{dist} uses a legacy source distribution format ('.{extension}') that is not compliant with PEP 625. A future version of uv will reject this source distribution. Consider upgrading to a newer version of {package}",
                    package = dist.name(),
                );
            } else {
                warn_user_once!(
                    "{dist} is not a standards-compliant source distribution: expected '.tar.gz' but found '.{extension}'. A future version of uv will reject source distributions that do not meet the requirements specified in PEP 625",
                );
            }
        }

        let built_wheel = self
            .builder
            .download_and_build(&BuildableSource::Dist(dist), tags, hashes, &self.client)
            .boxed_local()
            .await?;

        // Reject wheels whose tags don't match the target platform.
        if !built_wheel.filename.is_compatible(tags) {
            return if tags.is_cross() {
                Err(Error::BuiltWheelIncompatibleTargetPlatform {
                    filename: built_wheel.filename,
                    python_platform: tags.python_platform().clone(),
                    python_version: tags.python_version(),
                })
            } else {
                Err(Error::BuiltWheelIncompatibleHostPlatform {
                    filename: built_wheel.filename,
                    python_platform: tags.python_platform().clone(),
                    python_version: tags.python_version(),
                })
            };
        }

        // On Windows, guard the unzip target against concurrent writers.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = CacheEntry::new(
                built_wheel.target.parent().unwrap(),
                format!(
                    "{}.lock",
                    built_wheel.target.file_name().unwrap().to_str().unwrap()
                ),
            );
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // If the wheel was already unzipped into the cache, reuse it.
        match self.build_context.cache().resolve_link(&built_wheel.target) {
            Ok(archive) => {
                return Ok(LocalWheel {
                    dist: Dist::Source(dist.clone()),
                    archive: archive.into_boxed_path(),
                    filename: built_wheel.filename,
                    hashes: built_wheel.hashes,
                    cache: built_wheel.cache_info,
                    build: Some(built_wheel.build_info),
                });
            }
            // Missing link: fall through and unzip the freshly built wheel.
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(Error::CacheRead(err)),
        }

        let id = self
            .unzip_wheel(&built_wheel.path, &built_wheel.target)
            .await?;

        Ok(LocalWheel {
            dist: Dist::Source(dist.clone()),
            archive: self.build_context.cache().archive(&id).into_boxed_path(),
            hashes: built_wheel.hashes,
            filename: built_wheel.filename,
            cache: built_wheel.cache_info,
            build: Some(built_wheel.build_info),
        })
    }
489
    /// Fetch metadata for a built wheel: prefer overrides, then remote
    /// metadata, finally downloading the wheel itself when necessary.
    async fn get_wheel_metadata(
        &self,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<ArchiveMetadata, Error> {
        // When hashes must be generated, the wheel has to be downloaded
        // anyway, so take its metadata (or the override) from the download.
        if hashes.is_generate(dist) {
            let wheel = self.get_wheel(dist, hashes).await?;
            let metadata = if let Some(metadata) = self
                .build_context
                .dependency_metadata()
                .get(dist.name(), Some(dist.version()))
            {
                metadata.clone()
            } else {
                wheel.metadata()?
            };
            let hashes = wheel.hashes;
            return Ok(ArchiveMetadata {
                metadata: Metadata::from_metadata23(metadata),
                hashes,
            });
        }

        // Prefer user-provided dependency metadata overrides, if any.
        if let Some(metadata) = self
            .build_context
            .dependency_metadata()
            .get(dist.name(), Some(dist.version()))
        {
            return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
        }

        let result = self
            .client
            .managed(|client| {
                client
                    .wheel_metadata(dist, self.build_context.capabilities())
                    .boxed_local()
            })
            .await;

        match result {
            Ok(metadata) => {
                Ok(ArchiveMetadata::from_metadata23(metadata))
            }
            Err(err) if err.is_http_streaming_unsupported() => {
                warn!(
                    "Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})"
                );

                // Fall back to fetching the entire wheel.
                let wheel = self.get_wheel(dist, hashes).await?;
                let metadata = wheel.metadata()?;
                let hashes = wheel.hashes;
                Ok(ArchiveMetadata {
                    metadata: Metadata::from_metadata23(metadata),
                    hashes,
                })
            }
            Err(err) => Err(err.into()),
        }
    }
573
574 pub async fn build_wheel_metadata(
579 &self,
580 source: &BuildableSource<'_>,
581 hashes: HashPolicy<'_>,
582 ) -> Result<ArchiveMetadata, Error> {
583 if let Some(dist) = source.as_dist() {
585 if let Some(metadata) = self
586 .build_context
587 .dependency_metadata()
588 .get(dist.name(), dist.version())
589 {
590 self.builder.resolve_revision(source, &self.client).await?;
593
594 return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
595 }
596 }
597
598 let metadata = self
599 .builder
600 .download_and_build_metadata(source, hashes, &self.client)
601 .boxed_local()
602 .await?;
603
604 Ok(metadata)
605 }
606
    /// Resolve the `Requires-Dist` for a source tree's `pyproject.toml`, if
    /// it can be determined; delegates to the source-distribution builder.
    pub async fn requires_dist(
        &self,
        path: &Path,
        pyproject_toml: &PyProjectToml,
    ) -> Result<Option<RequiresDist>, Error> {
        self.builder
            .source_tree_requires_dist(
                path,
                pyproject_toml,
                self.client.unmanaged.credentials_cache(),
            )
            .await
    }
621
    /// Stream a wheel from `url`, unzipping it into the cache as the bytes
    /// arrive and computing digests when `hashes` requires them.
    async fn stream_wheel(
        &self,
        url: DisplaySafeUrl,
        index: Option<&IndexUrl>,
        filename: &WheelFilename,
        extension: WheelExtension,
        size: Option<u64>,
        wheel_entry: &CacheEntry,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<Archive, Error> {
        // On Windows, guard the cache entry against concurrent writers.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        let http_entry = wheel_entry.with_file(format!("{}.http", filename.cache_key()));

        let query_url = &url.clone();

        let download = |response: reqwest::Response| {
            async {
                // Prefer the caller-provided size; else use Content-Length.
                let size = size.or_else(|| content_length(&response));

                let progress = self
                    .reporter
                    .as_ref()
                    .map(|reporter| (reporter, reporter.on_download_start(dist.name(), size)));

                let reader = response
                    .bytes_stream()
                    .map_err(|err| self.handle_response_errors(err))
                    .into_async_read();

                // Hash the response body while unzipping it.
                let algorithms = hashes.algorithms();
                let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
                let mut hasher = uv_extract::hash::HashReader::new(reader.compat(), &mut hashers);

                let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
                    .map_err(Error::CacheWrite)?;

                match progress {
                    Some((reporter, progress)) => {
                        let mut reader = ProgressReader::new(&mut hasher, progress, &**reporter);
                        match extension {
                            WheelExtension::Whl => {
                                uv_extract::stream::unzip(query_url, &mut reader, temp_dir.path())
                                    .await
                                    .map_err(|err| Error::Extract(filename.to_string(), err))?;
                            }
                            WheelExtension::WhlZst => {
                                uv_extract::stream::untar_zst(&mut reader, temp_dir.path())
                                    .await
                                    .map_err(|err| Error::Extract(filename.to_string(), err))?;
                            }
                        }
                    }
                    None => match extension {
                        WheelExtension::Whl => {
                            uv_extract::stream::unzip(query_url, &mut hasher, temp_dir.path())
                                .await
                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
                        }
                        WheelExtension::WhlZst => {
                            uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
                                .await
                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
                        }
                    },
                }

                // Drain remaining bytes so digests cover the full body.
                if !hashes.is_none() {
                    hasher.finish().await.map_err(Error::HashExhaustion)?;
                }

                // NOTE(review): `persist` is a cache write; mapping its error
                // to `CacheRead` looks inconsistent with `load_wheel`, which
                // uses `CacheWrite` — confirm which is intended.
                let id = self
                    .build_context
                    .cache()
                    .persist(temp_dir.keep(), wheel_entry.path())
                    .await
                    .map_err(Error::CacheRead)?;

                if let Some((reporter, progress)) = progress {
                    reporter.on_download_complete(dist.name(), progress);
                }

                Ok(Archive::new(
                    id,
                    hashers.into_iter().map(HashDigest::from).collect(),
                    filename.clone(),
                ))
            }
            .instrument(info_span!("wheel", wheel = %dist))
        };

        let req = self.request(url.clone())?;

        // Honor per-index cache-control overrides; otherwise derive cache
        // freshness from the entry. Offline mode allows stale entries.
        let cache_control = match self.client.unmanaged.connectivity() {
            Connectivity::Online => {
                if let Some(header) = index.and_then(|index| {
                    self.build_context
                        .locations()
                        .artifact_cache_control_for(index)
                }) {
                    CacheControl::Override(header)
                } else {
                    CacheControl::from(
                        self.build_context
                            .cache()
                            .freshness(&http_entry, Some(&filename.name), None)
                            .map_err(Error::CacheRead)?,
                    )
                }
            }
            Connectivity::Offline => CacheControl::AllowStale,
        };

        let archive = self
            .client
            .managed(|client| {
                client.cached_client().get_serde_with_retry(
                    req,
                    &http_entry,
                    cache_control.clone(),
                    download,
                )
            })
            .await
            .map_err(|err| match err {
                CachedClientError::Callback { err, .. } => err,
                CachedClientError::Client(err) => Error::Client(err),
            })?;

        // Reject the cached archive if digests are missing or it was evicted
        // from disk; in that case, re-download bypassing the HTTP cache.
        let archive = Some(archive)
            .filter(|archive| archive.has_digests(hashes))
            .filter(|archive| archive.exists(self.build_context.cache()));

        let archive = if let Some(archive) = archive {
            archive
        } else {
            self.client
                .managed(async |client| {
                    client
                        .cached_client()
                        .skip_cache_with_retry(
                            self.request(url)?,
                            &http_entry,
                            cache_control,
                            download,
                        )
                        .await
                        .map_err(|err| match err {
                            CachedClientError::Callback { err, .. } => err,
                            CachedClientError::Client(err) => Error::Client(err),
                        })
                })
                .await?
        };

        Ok(archive)
    }
794
    /// Download a wheel to a temporary file on disk, then unzip it into the
    /// cache. Fallback path for servers where streamed extraction fails.
    async fn download_wheel(
        &self,
        url: DisplaySafeUrl,
        index: Option<&IndexUrl>,
        filename: &WheelFilename,
        extension: WheelExtension,
        size: Option<u64>,
        wheel_entry: &CacheEntry,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<Archive, Error> {
        // On Windows, guard the cache entry against concurrent writers.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        let http_entry = wheel_entry.with_file(format!("{}.http", filename.cache_key()));

        let query_url = &url.clone();

        let download = |response: reqwest::Response| {
            async {
                let size = size.or_else(|| content_length(&response));

                let progress = self
                    .reporter
                    .as_ref()
                    .map(|reporter| (reporter, reporter.on_download_start(dist.name(), size)));

                let reader = response
                    .bytes_stream()
                    .map_err(|err| self.handle_response_errors(err))
                    .into_async_read();

                // Buffer the full response body to a temporary file first.
                let temp_file = tempfile::tempfile_in(self.build_context.cache().root())
                    .map_err(Error::CacheWrite)?;
                let mut writer = tokio::io::BufWriter::new(fs_err::tokio::File::from_std(
                    fs_err::File::from_parts(temp_file, self.build_context.cache().root()),
                ));

                match progress {
                    Some((reporter, progress)) => {
                        let mut reader =
                            ProgressReader::new(reader.compat(), progress, &**reporter);

                        tokio::io::copy(&mut reader, &mut writer)
                            .await
                            .map_err(Error::CacheWrite)?;
                    }
                    None => {
                        tokio::io::copy(&mut reader.compat(), &mut writer)
                            .await
                            .map_err(Error::CacheWrite)?;
                    }
                }

                // Rewind the temporary file before extracting from it.
                let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
                    .map_err(Error::CacheWrite)?;
                let mut file = writer.into_inner();
                file.seek(io::SeekFrom::Start(0))
                    .await
                    .map_err(Error::CacheWrite)?;

                let hashes = if hashes.is_none() {
                    // No digests required: extract on a blocking thread.
                    let file = file.into_std().await;
                    tokio::task::spawn_blocking({
                        let target = temp_dir.path().to_owned();
                        move || -> Result<(), uv_extract::Error> {
                            match extension {
                                WheelExtension::Whl => {
                                    uv_extract::unzip(file, &target)?;
                                }
                                WheelExtension::WhlZst => {
                                    uv_extract::stream::untar_zst_file(file, &target)?;
                                }
                            }
                            Ok(())
                        }
                    })
                    .await?
                    .map_err(|err| Error::Extract(filename.to_string(), err))?;

                    HashDigests::empty()
                } else {
                    // Digests required: hash the file while extracting it.
                    let algorithms = hashes.algorithms();
                    let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
                    let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);

                    match extension {
                        WheelExtension::Whl => {
                            uv_extract::stream::unzip(query_url, &mut hasher, temp_dir.path())
                                .await
                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
                        }
                        WheelExtension::WhlZst => {
                            uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
                                .await
                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
                        }
                    }

                    // Drain remaining bytes so digests cover the full file.
                    hasher.finish().await.map_err(Error::HashExhaustion)?;

                    hashers.into_iter().map(HashDigest::from).collect()
                };

                // NOTE(review): `persist` is a cache write; mapping its error
                // to `CacheRead` looks inconsistent with `load_wheel`, which
                // uses `CacheWrite` — confirm which is intended.
                let id = self
                    .build_context
                    .cache()
                    .persist(temp_dir.keep(), wheel_entry.path())
                    .await
                    .map_err(Error::CacheRead)?;

                if let Some((reporter, progress)) = progress {
                    reporter.on_download_complete(dist.name(), progress);
                }

                Ok(Archive::new(id, hashes, filename.clone()))
            }
            .instrument(info_span!("wheel", wheel = %dist))
        };

        let req = self.request(url.clone())?;

        // Honor per-index cache-control overrides; offline allows stale.
        let cache_control = match self.client.unmanaged.connectivity() {
            Connectivity::Online => {
                if let Some(header) = index.and_then(|index| {
                    self.build_context
                        .locations()
                        .artifact_cache_control_for(index)
                }) {
                    CacheControl::Override(header)
                } else {
                    CacheControl::from(
                        self.build_context
                            .cache()
                            .freshness(&http_entry, Some(&filename.name), None)
                            .map_err(Error::CacheRead)?,
                    )
                }
            }
            Connectivity::Offline => CacheControl::AllowStale,
        };

        let archive = self
            .client
            .managed(|client| {
                client.cached_client().get_serde_with_retry(
                    req,
                    &http_entry,
                    cache_control.clone(),
                    download,
                )
            })
            .await
            .map_err(|err| match err {
                CachedClientError::Callback { err, .. } => err,
                CachedClientError::Client(err) => Error::Client(err),
            })?;

        // Reject the cached archive if digests are missing or it was evicted
        // from disk; in that case, re-download bypassing the HTTP cache.
        let archive = Some(archive)
            .filter(|archive| archive.has_digests(hashes))
            .filter(|archive| archive.exists(self.build_context.cache()));

        let archive = if let Some(archive) = archive {
            archive
        } else {
            self.client
                .managed(async |client| {
                    client
                        .cached_client()
                        .skip_cache_with_retry(
                            self.request(url)?,
                            &http_entry,
                            cache_control,
                            download,
                        )
                        .await
                        .map_err(|err| match err {
                            CachedClientError::Callback { err, .. } => err,
                            CachedClientError::Client(err) => Error::Client(err),
                        })
                })
                .await?
        };

        Ok(archive)
    }
1001
    /// Load a wheel from a local path, unzipping it into the cache unless an
    /// up-to-date cached archive (with the required digests) already exists.
    async fn load_wheel(
        &self,
        path: &Path,
        filename: &WheelFilename,
        extension: WheelExtension,
        wheel_entry: CacheEntry,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        // On Windows, guard the cache entry against concurrent writers.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // The source file's mtime invalidates stale cache pointers.
        let modified = Timestamp::from_path(path).map_err(Error::CacheRead)?;

        let pointer_entry = wheel_entry.with_file(format!("{}.rev", filename.cache_key()));
        let pointer = LocalArchivePointer::read_from(&pointer_entry)?;

        // Reuse the cached archive only if it's fresh and carries the
        // required digests.
        let archive = pointer
            .filter(|pointer| pointer.is_up_to_date(modified))
            .map(LocalArchivePointer::into_archive)
            .filter(|archive| archive.has_digests(hashes));

        if let Some(archive) = archive {
            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        } else if hashes.is_none() {
            // No digests required: unzip directly.
            let archive = Archive::new(
                self.unzip_wheel(path, wheel_entry.path()).await?,
                HashDigests::empty(),
                filename.clone(),
            );

            // Record a pointer so subsequent loads can reuse the archive.
            let pointer = LocalArchivePointer {
                timestamp: modified,
                archive: archive.clone(),
            };
            pointer.write_to(&pointer_entry).await?;

            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        } else {
            // Digests required: hash the file while unzipping it.
            let file = fs_err::tokio::File::open(path)
                .await
                .map_err(Error::CacheRead)?;
            let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
                .map_err(Error::CacheWrite)?;

            let algorithms = hashes.algorithms();
            let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
            let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);

            match extension {
                WheelExtension::Whl => {
                    uv_extract::stream::unzip(path.display(), &mut hasher, temp_dir.path())
                        .await
                        .map_err(|err| Error::Extract(filename.to_string(), err))?;
                }
                WheelExtension::WhlZst => {
                    uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
                        .await
                        .map_err(|err| Error::Extract(filename.to_string(), err))?;
                }
            }

            // Drain remaining bytes so digests cover the full file.
            hasher.finish().await.map_err(Error::HashExhaustion)?;

            let hashes = hashers.into_iter().map(HashDigest::from).collect();

            let id = self
                .build_context
                .cache()
                .persist(temp_dir.keep(), wheel_entry.path())
                .await
                .map_err(Error::CacheWrite)?;

            let archive = Archive::new(id, hashes, filename.clone());

            // Record a pointer so subsequent loads can reuse the archive.
            let pointer = LocalArchivePointer {
                timestamp: modified,
                archive: archive.clone(),
            };
            pointer.write_to(&pointer_entry).await?;

            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        }
    }
1136
1137 async fn unzip_wheel(&self, path: &Path, target: &Path) -> Result<ArchiveId, Error> {
1139 let temp_dir = tokio::task::spawn_blocking({
1140 let path = path.to_owned();
1141 let root = self.build_context.cache().root().to_path_buf();
1142 move || -> Result<TempDir, Error> {
1143 let temp_dir = tempfile::tempdir_in(root).map_err(Error::CacheWrite)?;
1145 let reader = fs_err::File::open(&path).map_err(Error::CacheWrite)?;
1146 uv_extract::unzip(reader, temp_dir.path())
1147 .map_err(|err| Error::Extract(path.to_string_lossy().into_owned(), err))?;
1148 Ok(temp_dir)
1149 }
1150 })
1151 .await??;
1152
1153 let id = self
1155 .build_context
1156 .cache()
1157 .persist(temp_dir.keep(), target)
1158 .await
1159 .map_err(Error::CacheWrite)?;
1160
1161 Ok(id)
1162 }
1163
    /// Build an uncached GET request for `url`, requesting an unencoded
    /// (`identity`) body so archives can be extracted as they stream.
    fn request(&self, url: DisplaySafeUrl) -> Result<reqwest::Request, reqwest::Error> {
        self.client
            .unmanaged
            .uncached_client(&url)
            .get(Url::from(url))
            .header(
                "accept-encoding",
                reqwest::header::HeaderValue::from_static("identity"),
            )
            .build()
    }
1179
    /// Return the rate-limited client wrapper used for downloads.
    pub fn client(&self) -> &ManagedClient<'a> {
        &self.client
    }
1184}
1185
/// A wrapper around `RegistryClient` that rate-limits downloads via a
/// semaphore.
pub struct ManagedClient<'a> {
    /// The underlying client; using it directly bypasses the rate limit.
    pub unmanaged: &'a RegistryClient,
    /// Bounds the number of concurrent downloads.
    control: Arc<Semaphore>,
}
1191
1192impl<'a> ManagedClient<'a> {
1193 fn new(client: &'a RegistryClient, control: Arc<Semaphore>) -> Self {
1195 ManagedClient {
1196 unmanaged: client,
1197 control,
1198 }
1199 }
1200
1201 pub async fn managed<F, T>(&self, f: impl FnOnce(&'a RegistryClient) -> F) -> T
1206 where
1207 F: Future<Output = T>,
1208 {
1209 let _permit = self.control.acquire().await.unwrap();
1210 f(self.unmanaged).await
1211 }
1212
1213 pub async fn manual<F, T>(&'a self, f: impl FnOnce(&'a RegistryClient, &'a Semaphore) -> F) -> T
1221 where
1222 F: Future<Output = T>,
1223 {
1224 f(self.unmanaged, &self.control).await
1225 }
1226}
1227
1228fn content_length(response: &reqwest::Response) -> Option<u64> {
1230 response
1231 .headers()
1232 .get(reqwest::header::CONTENT_LENGTH)
1233 .and_then(|val| val.to_str().ok())
1234 .and_then(|val| val.parse::<u64>().ok())
1235}
1236
/// An `AsyncRead` adapter that reports bytes read to a [`Reporter`].
struct ProgressReader<'a, R> {
    /// The wrapped reader.
    reader: R,
    /// The progress identifier issued by `Reporter::on_download_start`.
    index: usize,
    /// Destination for progress callbacks.
    reporter: &'a dyn Reporter,
}
1243
impl<'a, R> ProgressReader<'a, R> {
    /// Wrap `reader`, reporting progress for download `index` to `reporter`.
    fn new(reader: R, index: usize, reporter: &'a dyn Reporter) -> Self {
        Self {
            reader,
            index,
            reporter,
        }
    }
}
1254
impl<R> AsyncRead for ProgressReader<'_, R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // Delegate to the inner reader, then report progress on success.
        // NOTE(review): reports `buf.filled().len()` — the buffer's total
        // filled length — as this read's byte count; confirm callers pass a
        // fresh buffer per call so progress isn't over-counted.
        Pin::new(&mut self.as_mut().reader)
            .poll_read(cx, buf)
            .map_ok(|()| {
                self.reporter
                    .on_download_progress(self.index, buf.filled().len() as u64);
            })
    }
}
1272
/// A pointer stored alongside an HTTP cache entry, referencing the unzipped
/// archive for a downloaded wheel.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HttpArchivePointer {
    // The archive referenced by this cache entry.
    archive: Archive,
}
1280
impl HttpArchivePointer {
    /// Read a pointer from an HTTP cache entry, returning `None` if the
    /// entry does not exist.
    pub fn read_from(path: impl AsRef<Path>) -> Result<Option<Self>, Error> {
        match fs_err::File::open(path.as_ref()) {
            Ok(file) => {
                // The payload is MessagePack wrapped in the cache policy.
                let data = DataWithCachePolicy::from_reader(file)?.data;
                let archive = rmp_serde::from_slice::<Archive>(&data)?;
                Ok(Some(Self { archive }))
            }
            Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
            Err(err) => Err(Error::CacheRead(err)),
        }
    }

    /// Consume the pointer, returning the archive it references.
    pub fn into_archive(self) -> Archive {
        self.archive
    }

    /// HTTP-cached entries carry no local timestamp information.
    pub fn to_cache_info(&self) -> CacheInfo {
        CacheInfo::default()
    }

    /// Always `None`: no build metadata is recorded for HTTP entries.
    pub fn to_build_info(&self) -> Option<BuildInfo> {
        None
    }
}
1310
/// A pointer stored alongside a local cache entry, referencing the unzipped
/// archive for a wheel loaded from disk, keyed by the source's mtime.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LocalArchivePointer {
    // Modification time of the source wheel when the archive was created.
    timestamp: Timestamp,
    // The archive referenced by this cache entry.
    archive: Archive,
}
1319
impl LocalArchivePointer {
    /// Read a pointer from a local cache entry, returning `None` if the
    /// entry does not exist.
    pub fn read_from(path: impl AsRef<Path>) -> Result<Option<Self>, Error> {
        match fs_err::read(path) {
            Ok(cached) => Ok(Some(rmp_serde::from_slice::<Self>(&cached)?)),
            Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
            Err(err) => Err(Error::CacheRead(err)),
        }
    }

    /// Serialize the pointer and write it atomically to `entry`.
    pub async fn write_to(&self, entry: &CacheEntry) -> Result<(), Error> {
        write_atomic(entry.path(), rmp_serde::to_vec(&self)?)
            .await
            .map_err(Error::CacheWrite)
    }

    /// Whether the pointer matches the source file's modification timestamp.
    pub fn is_up_to_date(&self, modified: Timestamp) -> bool {
        self.timestamp == modified
    }

    /// Consume the pointer, returning the archive it references.
    pub fn into_archive(self) -> Archive {
        self.archive
    }

    /// Cache info derived from the recorded modification timestamp.
    pub fn to_cache_info(&self) -> CacheInfo {
        CacheInfo::from_timestamp(self.timestamp)
    }

    /// Always `None`: no build metadata is recorded for local entries.
    pub fn to_build_info(&self) -> Option<BuildInfo> {
        None
    }
}
1357
/// The resolved download target for a registry wheel: either the wheel
/// itself or its zstandard-compressed variant.
#[derive(Debug, Clone)]
struct WheelTarget {
    // The URL to fetch.
    url: DisplaySafeUrl,
    // Whether the target is a plain `.whl` or a `.whl.tar.zst`.
    extension: WheelExtension,
    // The expected size in bytes, if known.
    size: Option<u64>,
}
1367
1368impl TryFrom<&File> for WheelTarget {
1369 type Error = ToUrlError;
1370
1371 fn try_from(file: &File) -> Result<Self, Self::Error> {
1373 let url = file.url.to_url()?;
1374 if let Some(zstd) = file.zstd.as_ref() {
1375 Ok(Self {
1376 url: add_tar_zst_extension(url),
1377 extension: WheelExtension::WhlZst,
1378 size: zstd.size,
1379 })
1380 } else {
1381 Ok(Self {
1382 url,
1383 extension: WheelExtension::Whl,
1384 size: file.size,
1385 })
1386 }
1387 }
1388}
1389
/// The on-the-wire format of a wheel download.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum WheelExtension {
    // A standard `.whl` zip archive.
    Whl,
    // A zstandard-compressed tarball variant (`.whl.tar.zst`).
    WhlZst,
}
1397
1398#[must_use]
1400fn add_tar_zst_extension(mut url: DisplaySafeUrl) -> DisplaySafeUrl {
1401 let mut path = url.path().to_string();
1402
1403 if !path.ends_with(".tar.zst") {
1404 path.push_str(".tar.zst");
1405 }
1406
1407 url.set_path(&path);
1408 url
1409}
1410
#[cfg(test)]
mod tests {
    use super::*;

    /// `add_tar_zst_extension` appends `.tar.zst` exactly once and preserves
    /// percent-encoded path segments.
    #[test]
    fn test_add_tar_zst_extension() {
        let url =
            DisplaySafeUrl::parse("https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl")
                .unwrap();
        assert_eq!(
            add_tar_zst_extension(url).as_str(),
            "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst"
        );

        let url = DisplaySafeUrl::parse(
            "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
        )
        .unwrap();
        assert_eq!(
            add_tar_zst_extension(url).as_str(),
            "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst"
        );

        let url = DisplaySafeUrl::parse(
            "https://files.pythonhosted.org/flask-3.1.0%2Bcu124-py3-none-any.whl",
        )
        .unwrap();
        assert_eq!(
            add_tar_zst_extension(url).as_str(),
            "https://files.pythonhosted.org/flask-3.1.0%2Bcu124-py3-none-any.whl.tar.zst"
        );
    }
}