1use std::future::Future;
2use std::io;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use futures::{FutureExt, TryStreamExt};
9use tempfile::TempDir;
10use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};
11use tokio::sync::Semaphore;
12use tokio_util::compat::FuturesAsyncReadCompatExt;
13use tracing::{Instrument, info_span, instrument, warn};
14use url::Url;
15
16use uv_cache::{ArchiveId, CacheBucket, CacheEntry, WheelCache};
17use uv_cache_info::{CacheInfo, Timestamp};
18use uv_client::{
19 CacheControl, CachedClientError, Connectivity, DataWithCachePolicy, RegistryClient,
20};
21use uv_distribution_filename::{SourceDistExtension, WheelFilename};
22use uv_distribution_types::{
23 BuildInfo, BuildableSource, BuiltDist, Dist, File, HashPolicy, Hashed, IndexUrl, InstalledDist,
24 Name, SourceDist, ToUrlError,
25};
26use uv_extract::hash::Hasher;
27use uv_fs::write_atomic;
28use uv_platform_tags::Tags;
29use uv_pypi_types::{HashDigest, HashDigests, PyProjectToml};
30use uv_redacted::DisplaySafeUrl;
31use uv_types::{BuildContext, BuildStack};
32use uv_warnings::warn_user_once;
33
34use crate::archive::Archive;
35use crate::metadata::{ArchiveMetadata, Metadata};
36use crate::source::SourceDistributionBuilder;
37use crate::{Error, LocalWheel, Reporter, RequiresDist};
38
/// A database of distributions, responsible for fetching remote wheels, loading local
/// wheels, and building source distributions on demand.
pub struct DistributionDatabase<'a, Context: BuildContext> {
    /// Build-related context: cache, index locations, capabilities, and any
    /// user-provided dependency metadata.
    build_context: &'a Context,
    /// Builder used to turn source distributions into wheels.
    builder: SourceDistributionBuilder<'a, Context>,
    /// Registry client wrapped with a semaphore bounding concurrent downloads.
    client: ManagedClient<'a>,
    /// Optional reporter for download start/progress/complete events.
    reporter: Option<Arc<dyn Reporter>>,
}
57
58impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
59 pub fn new(
60 client: &'a RegistryClient,
61 build_context: &'a Context,
62 downloads_semaphore: Arc<Semaphore>,
63 ) -> Self {
64 Self {
65 build_context,
66 builder: SourceDistributionBuilder::new(build_context),
67 client: ManagedClient::new(client, downloads_semaphore),
68 reporter: None,
69 }
70 }
71
72 #[must_use]
74 pub fn with_build_stack(self, build_stack: &'a BuildStack) -> Self {
75 Self {
76 builder: self.builder.with_build_stack(build_stack),
77 ..self
78 }
79 }
80
81 #[must_use]
83 pub fn with_reporter(self, reporter: Arc<dyn Reporter>) -> Self {
84 Self {
85 builder: self.builder.with_reporter(reporter.clone()),
86 reporter: Some(reporter),
87 ..self
88 }
89 }
90
91 fn handle_response_errors(&self, err: reqwest::Error) -> io::Error {
93 if err.is_timeout() {
94 io::Error::new(
96 io::ErrorKind::TimedOut,
97 format!(
98 "Failed to download distribution due to network timeout. Try increasing UV_HTTP_TIMEOUT (current value: {}s).",
99 self.client.unmanaged.read_timeout().as_secs()
100 ),
101 )
102 } else {
103 io::Error::other(err)
104 }
105 }
106
107 #[instrument(skip_all, fields(%dist))]
114 pub async fn get_or_build_wheel(
115 &self,
116 dist: &Dist,
117 tags: &Tags,
118 hashes: HashPolicy<'_>,
119 ) -> Result<LocalWheel, Error> {
120 match dist {
121 Dist::Built(built) => self.get_wheel(built, hashes).await,
122 Dist::Source(source) => self.build_wheel(source, tags, hashes).await,
123 }
124 }
125
126 #[instrument(skip_all, fields(%dist))]
132 pub async fn get_installed_metadata(
133 &self,
134 dist: &InstalledDist,
135 ) -> Result<ArchiveMetadata, Error> {
136 if let Some(metadata) = self
138 .build_context
139 .dependency_metadata()
140 .get(dist.name(), Some(dist.version()))
141 {
142 return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
143 }
144
145 let metadata = dist
146 .read_metadata()
147 .map_err(|err| Error::ReadInstalled(Box::new(dist.clone()), err))?;
148
149 Ok(ArchiveMetadata::from_metadata23(metadata.clone()))
150 }
151
152 #[instrument(skip_all, fields(%dist))]
158 pub async fn get_or_build_wheel_metadata(
159 &self,
160 dist: &Dist,
161 hashes: HashPolicy<'_>,
162 ) -> Result<ArchiveMetadata, Error> {
163 match dist {
164 Dist::Built(built) => self.get_wheel_metadata(built, hashes).await,
165 Dist::Source(source) => {
166 self.build_wheel_metadata(&BuildableSource::Dist(source), hashes)
167 .await
168 }
169 }
170 }
171
    /// Fetch a built wheel, returning a `LocalWheel` backed by an unzipped archive in
    /// the cache.
    ///
    /// Registry and direct-URL wheels are streamed-and-unzipped over the network; if
    /// streaming is unsupported (or fails), they're downloaded to disk and unzipped
    /// from there. `file://` registry URLs and path wheels are loaded from the local
    /// filesystem.
    async fn get_wheel(
        &self,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        match dist {
            BuiltDist::Registry(wheels) => {
                let wheel = wheels.best_wheel();
                // Resolve the concrete download target (URL, wire format, size),
                // preferring a zstd-compressed variant when the registry advertises one.
                let WheelTarget {
                    url,
                    extension,
                    size,
                } = WheelTarget::try_from(&*wheel.file)?;

                let wheel_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Index(&wheel.index).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                // A `file://` registry serves wheels from the local filesystem; no
                // network involved.
                if url.scheme() == "file" {
                    let path = url
                        .to_file_path()
                        .map_err(|()| Error::NonFileUrl(url.clone()))?;
                    return self
                        .load_wheel(
                            &path,
                            &wheel.filename,
                            WheelExtension::Whl,
                            wheel_entry,
                            dist,
                            hashes,
                        )
                        .await;
                }

                // Prefer streaming the wheel and unzipping it on the fly.
                match self
                    .stream_wheel(
                        url.clone(),
                        dist.index(),
                        &wheel.filename,
                        extension,
                        size,
                        &wheel_entry,
                        dist,
                        hashes,
                    )
                    .await
                {
                    Ok(archive) => Ok(LocalWheel {
                        dist: Dist::Built(dist.clone()),
                        archive: self
                            .build_context
                            .cache()
                            .archive(&archive.id)
                            .into_boxed_path(),
                        hashes: archive.hashes,
                        filename: wheel.filename.clone(),
                        cache: CacheInfo::default(),
                        build: None,
                    }),
                    Err(Error::Extract(name, err)) => {
                        // Only fall back to the download-to-disk path for the two
                        // streaming-specific failure modes; everything else is fatal.
                        if err.is_http_streaming_unsupported() {
                            warn!(
                                "Streaming unsupported for {dist}; downloading wheel to disk ({err})"
                            );
                        } else if err.is_http_streaming_failed() {
                            warn!("Streaming failed for {dist}; downloading wheel to disk ({err})");
                        } else {
                            return Err(Error::Extract(name, err));
                        }

                        // Fall back: download the full wheel to disk, then unzip it.
                        let archive = self
                            .download_wheel(
                                url,
                                dist.index(),
                                &wheel.filename,
                                extension,
                                size,
                                &wheel_entry,
                                dist,
                                hashes,
                            )
                            .await?;

                        Ok(LocalWheel {
                            dist: Dist::Built(dist.clone()),
                            archive: self
                                .build_context
                                .cache()
                                .archive(&archive.id)
                                .into_boxed_path(),
                            hashes: archive.hashes,
                            filename: wheel.filename.clone(),
                            cache: CacheInfo::default(),
                            build: None,
                        })
                    }
                    Err(err) => Err(err),
                }
            }

            BuiltDist::DirectUrl(wheel) => {
                let wheel_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                // Direct URLs never advertise a zstd variant or a size; stream the
                // plain `.whl`.
                match self
                    .stream_wheel(
                        wheel.url.raw().clone(),
                        None,
                        &wheel.filename,
                        WheelExtension::Whl,
                        None,
                        &wheel_entry,
                        dist,
                        hashes,
                    )
                    .await
                {
                    Ok(archive) => Ok(LocalWheel {
                        dist: Dist::Built(dist.clone()),
                        archive: self
                            .build_context
                            .cache()
                            .archive(&archive.id)
                            .into_boxed_path(),
                        hashes: archive.hashes,
                        filename: wheel.filename.clone(),
                        cache: CacheInfo::default(),
                        build: None,
                    }),
                    // NOTE(review): this branch matches `Error::Client` while the
                    // registry branch above matches `Error::Extract` — presumably
                    // intentional, but worth confirming the two fallbacks cover the
                    // same failure modes.
                    Err(Error::Client(err)) if err.is_http_streaming_unsupported() => {
                        warn!(
                            "Streaming unsupported for {dist}; downloading wheel to disk ({err})"
                        );

                        // Fall back: download the full wheel to disk, then unzip it.
                        let archive = self
                            .download_wheel(
                                wheel.url.raw().clone(),
                                None,
                                &wheel.filename,
                                WheelExtension::Whl,
                                None,
                                &wheel_entry,
                                dist,
                                hashes,
                            )
                            .await?;
                        Ok(LocalWheel {
                            dist: Dist::Built(dist.clone()),
                            archive: self
                                .build_context
                                .cache()
                                .archive(&archive.id)
                                .into_boxed_path(),
                            hashes: archive.hashes,
                            filename: wheel.filename.clone(),
                            cache: CacheInfo::default(),
                            build: None,
                        })
                    }
                    Err(err) => Err(err),
                }
            }

            BuiltDist::Path(wheel) => {
                // Local wheels are unzipped straight from disk.
                let cache_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                self.load_wheel(
                    &wheel.install_path,
                    &wheel.filename,
                    WheelExtension::Whl,
                    cache_entry,
                    dist,
                    hashes,
                )
                .await
            }
        }
    }
372
    /// Build a wheel from a source distribution, returning it as a `LocalWheel` backed
    /// by the cache.
    ///
    /// Warns on non-PEP 625 archive extensions, rejects wheels that are incompatible
    /// with the target tags, and reuses a previously-unzipped archive when the cache
    /// link already resolves.
    async fn build_wheel(
        &self,
        dist: &SourceDist,
        tags: &Tags,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        // Warn about legacy archive formats (anything other than `.tar.gz`/`.zip`),
        // with a registry-specific message suggesting an upgrade.
        if let Some(extension) = dist.extension()
            && !matches!(
                extension,
                SourceDistExtension::TarGz | SourceDistExtension::Zip
            )
        {
            if matches!(dist, SourceDist::Registry(_)) {
                warn_user_once!(
                    "{dist} uses a legacy source distribution format ('.{extension}') that is not compliant with PEP 625. A future version of uv will reject this source distribution. Consider upgrading to a newer version of {package}",
                    package = dist.name(),
                );
            } else {
                warn_user_once!(
                    "{dist} is not a standards-compliant source distribution: expected '.tar.gz' but found '.{extension}'. A future version of uv will reject source distributions that do not meet the requirements specified in PEP 625",
                );
            }
        }

        let built_wheel = self
            .builder
            .download_and_build(&BuildableSource::Dist(dist), tags, hashes, &self.client)
            .boxed_local()
            .await?;

        // The build backend may produce a wheel that doesn't match the requested tags;
        // surface a platform-specific error rather than installing it.
        if !built_wheel.filename.is_compatible(tags) {
            return if tags.is_cross() {
                Err(Error::BuiltWheelIncompatibleTargetPlatform {
                    filename: built_wheel.filename,
                    python_platform: tags.python_platform().clone(),
                    python_version: tags.python_version(),
                })
            } else {
                Err(Error::BuiltWheelIncompatibleHostPlatform {
                    filename: built_wheel.filename,
                    python_platform: tags.python_platform().clone(),
                    python_version: tags.python_version(),
                })
            };
        }

        // On Windows, take a file lock next to the target to serialize concurrent
        // unzips of the same wheel.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = CacheEntry::new(
                built_wheel.target.parent().unwrap(),
                format!(
                    "{}.lock",
                    built_wheel.target.file_name().unwrap().to_str().unwrap()
                ),
            );
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // If the cache link already resolves, the wheel was unzipped previously; reuse
        // that archive instead of unzipping again.
        match self.build_context.cache().resolve_link(&built_wheel.target) {
            Ok(archive) => {
                return Ok(LocalWheel {
                    dist: Dist::Source(dist.clone()),
                    archive: archive.into_boxed_path(),
                    filename: built_wheel.filename,
                    hashes: built_wheel.hashes,
                    cache: built_wheel.cache_info,
                    build: Some(built_wheel.build_info),
                });
            }
            // A missing link simply means the wheel hasn't been unzipped yet.
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(Error::CacheRead(err)),
        }

        let id = self
            .unzip_wheel(&built_wheel.path, &built_wheel.target)
            .await?;

        Ok(LocalWheel {
            dist: Dist::Source(dist.clone()),
            archive: self.build_context.cache().archive(&id).into_boxed_path(),
            hashes: built_wheel.hashes,
            filename: built_wheel.filename,
            cache: built_wheel.cache_info,
            build: Some(built_wheel.build_info),
        })
    }
483
    /// Fetch the metadata for a built wheel.
    ///
    /// If hashes must be generated, the full wheel is fetched first (so its digests are
    /// available). Otherwise, user-provided metadata is preferred, then the remote
    /// metadata endpoint, and finally — if the server can't serve metadata over a
    /// streamed range request — the whole wheel is downloaded.
    async fn get_wheel_metadata(
        &self,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<ArchiveMetadata, Error> {
        // When hashes need to be generated, fetch the wheel itself so the digests come
        // from the actual artifact.
        if hashes.is_generate(dist) {
            let wheel = self.get_wheel(dist, hashes).await?;
            // User-provided metadata still takes precedence over what's in the wheel.
            let metadata = if let Some(metadata) = self
                .build_context
                .dependency_metadata()
                .get(dist.name(), Some(dist.version()))
            {
                metadata.clone()
            } else {
                wheel.metadata()?
            };
            let hashes = wheel.hashes;
            return Ok(ArchiveMetadata {
                metadata: Metadata::from_metadata23(metadata),
                hashes,
            });
        }

        // If the metadata was provided up-front, skip the network entirely.
        if let Some(metadata) = self
            .build_context
            .dependency_metadata()
            .get(dist.name(), Some(dist.version()))
        {
            return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
        }

        // Fetch just the metadata (e.g., via PEP 658 or a ranged read of the wheel),
        // within the download-concurrency limit.
        let result = self
            .client
            .managed(|client| {
                client
                    .wheel_metadata(dist, self.build_context.capabilities())
                    .boxed_local()
            })
            .await;

        match result {
            Ok(metadata) => {
                Ok(ArchiveMetadata::from_metadata23(metadata))
            }
            Err(err) if err.is_http_streaming_unsupported() => {
                warn!(
                    "Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})"
                );

                // Last resort: download the entire wheel and read metadata from it.
                let wheel = self.get_wheel(dist, hashes).await?;
                let metadata = wheel.metadata()?;
                let hashes = wheel.hashes;
                Ok(ArchiveMetadata {
                    metadata: Metadata::from_metadata23(metadata),
                    hashes,
                })
            }
            Err(err) => Err(err.into()),
        }
    }
567
    /// Resolve the metadata for a buildable source, building the source distribution
    /// only when no user-provided metadata is available.
    pub async fn build_wheel_metadata(
        &self,
        source: &BuildableSource<'_>,
        hashes: HashPolicy<'_>,
    ) -> Result<ArchiveMetadata, Error> {
        if let Some(dist) = source.as_dist() {
            if let Some(metadata) = self
                .build_context
                .dependency_metadata()
                .get(dist.name(), dist.version())
            {
                // Even with user-provided metadata, resolve the revision so the source
                // is pinned consistently in the cache.
                self.builder.resolve_revision(source, &self.client).await?;

                return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
            }
        }

        // Otherwise, download and (if necessary) build the source to obtain metadata.
        let metadata = self
            .builder
            .download_and_build_metadata(source, hashes, &self.client)
            .boxed_local()
            .await?;

        Ok(metadata)
    }
600
601 pub async fn requires_dist(
603 &self,
604 path: &Path,
605 pyproject_toml: &PyProjectToml,
606 ) -> Result<Option<RequiresDist>, Error> {
607 self.builder
608 .source_tree_requires_dist(
609 path,
610 pyproject_toml,
611 self.client.unmanaged.credentials_cache(),
612 )
613 .await
614 }
615
    /// Stream a wheel over HTTP, unzipping (and hashing) it on the fly into the cache.
    ///
    /// The result is memoized via an `.http` cache entry; if the cached archive lacks
    /// the required digests or its files are gone, the download is re-run bypassing the
    /// cache.
    async fn stream_wheel(
        &self,
        url: DisplaySafeUrl,
        index: Option<&IndexUrl>,
        filename: &WheelFilename,
        extension: WheelExtension,
        size: Option<u64>,
        wheel_entry: &CacheEntry,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<Archive, Error> {
        // On Windows, serialize concurrent downloads of the same wheel via a lock file.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // The `.http` entry stores the cache policy plus the serialized `Archive`.
        let http_entry = wheel_entry.with_file(format!("{}.http", filename.cache_key()));

        let query_url = &url.clone();

        // Closure invoked by the cached client on a cache miss (or revalidation).
        let download = |response: reqwest::Response| {
            async {
                // Prefer the registry-advertised size; fall back to `Content-Length`.
                let size = size.or_else(|| content_length(&response));

                let progress = self
                    .reporter
                    .as_ref()
                    .map(|reporter| (reporter, reporter.on_download_start(dist.name(), size)));

                let reader = response
                    .bytes_stream()
                    .map_err(|err| self.handle_response_errors(err))
                    .into_async_read();

                // Hash the raw byte stream while it's being extracted.
                let algorithms = hashes.algorithms();
                let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
                let mut hasher = uv_extract::hash::HashReader::new(reader.compat(), &mut hashers);

                // Extract into a temp dir inside the cache so the final persist is a
                // cheap rename.
                let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
                    .map_err(Error::CacheWrite)?;

                match progress {
                    Some((reporter, progress)) => {
                        // Wrap the hashing reader so progress is reported as bytes flow.
                        let mut reader = ProgressReader::new(&mut hasher, progress, &**reporter);
                        match extension {
                            WheelExtension::Whl => {
                                uv_extract::stream::unzip(query_url, &mut reader, temp_dir.path())
                                    .await
                                    .map_err(|err| Error::Extract(filename.to_string(), err))?;
                            }
                            WheelExtension::WhlZst => {
                                uv_extract::stream::untar_zst(&mut reader, temp_dir.path())
                                    .await
                                    .map_err(|err| Error::Extract(filename.to_string(), err))?;
                            }
                        }
                    }
                    None => match extension {
                        WheelExtension::Whl => {
                            uv_extract::stream::unzip(query_url, &mut hasher, temp_dir.path())
                                .await
                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
                        }
                        WheelExtension::WhlZst => {
                            uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
                                .await
                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
                        }
                    },
                }

                // Drain any trailing bytes so the hashers see the complete payload.
                if !hashes.is_none() {
                    hasher.finish().await.map_err(Error::HashExhaustion)?;
                }

                // NOTE(review): persisting maps I/O errors to `CacheRead` here, but to
                // `CacheWrite` in `load_wheel`/`unzip_wheel` — confirm which is intended.
                let id = self
                    .build_context
                    .cache()
                    .persist(temp_dir.keep(), wheel_entry.path())
                    .await
                    .map_err(Error::CacheRead)?;

                if let Some((reporter, progress)) = progress {
                    reporter.on_download_complete(dist.name(), progress);
                }

                Ok(Archive::new(
                    id,
                    hashers.into_iter().map(HashDigest::from).collect(),
                    filename.clone(),
                ))
            }
            .instrument(info_span!("wheel", wheel = %dist))
        };

        let req = self.request(url.clone())?;

        // Respect per-index cache-control overrides; otherwise derive freshness from
        // the cache. Offline mode always allows stale entries.
        let cache_control = match self.client.unmanaged.connectivity() {
            Connectivity::Online => {
                if let Some(header) = index.and_then(|index| {
                    self.build_context
                        .locations()
                        .artifact_cache_control_for(index)
                }) {
                    CacheControl::Override(header)
                } else {
                    CacheControl::from(
                        self.build_context
                            .cache()
                            .freshness(&http_entry, Some(&filename.name), None)
                            .map_err(Error::CacheRead)?,
                    )
                }
            }
            Connectivity::Offline => CacheControl::AllowStale,
        };

        let archive = self
            .client
            .managed(|client| {
                client.cached_client().get_serde_with_retry(
                    req,
                    &http_entry,
                    cache_control.clone(),
                    download,
                )
            })
            .await
            .map_err(|err| match err {
                CachedClientError::Callback { err, .. } => err,
                CachedClientError::Client(err) => Error::Client(err),
            })?;

        // Discard the cached archive if it lacks the required digests or its files
        // were evicted from the cache.
        let archive = Some(archive)
            .filter(|archive| archive.has_digests(hashes))
            .filter(|archive| archive.exists(self.build_context.cache()));

        // If the cached archive was unusable, re-download bypassing the HTTP cache.
        let archive = if let Some(archive) = archive {
            archive
        } else {
            self.client
                .managed(async |client| {
                    client
                        .cached_client()
                        .skip_cache_with_retry(
                            self.request(url)?,
                            &http_entry,
                            cache_control,
                            download,
                        )
                        .await
                        .map_err(|err| match err {
                            CachedClientError::Callback { err, .. } => err,
                            CachedClientError::Client(err) => Error::Client(err),
                        })
                })
                .await?
        };

        Ok(archive)
    }
788
    /// Download a wheel to a temporary file on disk, then unzip it into the cache.
    ///
    /// Used as the fallback when streaming extraction is unsupported by the server.
    /// Mirrors `stream_wheel`'s caching and revalidation behavior.
    async fn download_wheel(
        &self,
        url: DisplaySafeUrl,
        index: Option<&IndexUrl>,
        filename: &WheelFilename,
        extension: WheelExtension,
        size: Option<u64>,
        wheel_entry: &CacheEntry,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<Archive, Error> {
        // On Windows, serialize concurrent downloads of the same wheel via a lock file.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        let http_entry = wheel_entry.with_file(format!("{}.http", filename.cache_key()));

        let query_url = &url.clone();

        // Closure invoked by the cached client on a cache miss (or revalidation).
        let download = |response: reqwest::Response| {
            async {
                // Prefer the registry-advertised size; fall back to `Content-Length`.
                let size = size.or_else(|| content_length(&response));

                let progress = self
                    .reporter
                    .as_ref()
                    .map(|reporter| (reporter, reporter.on_download_start(dist.name(), size)));

                let reader = response
                    .bytes_stream()
                    .map_err(|err| self.handle_response_errors(err))
                    .into_async_read();

                // Write the raw wheel bytes to an anonymous temp file in the cache.
                let temp_file = tempfile::tempfile_in(self.build_context.cache().root())
                    .map_err(Error::CacheWrite)?;
                let mut writer = tokio::io::BufWriter::new(fs_err::tokio::File::from_std(
                    fs_err::File::from_parts(temp_file, self.build_context.cache().root()),
                ));

                match progress {
                    Some((reporter, progress)) => {
                        let mut reader =
                            ProgressReader::new(reader.compat(), progress, &**reporter);

                        tokio::io::copy(&mut reader, &mut writer)
                            .await
                            .map_err(Error::CacheWrite)?;
                    }
                    None => {
                        tokio::io::copy(&mut reader.compat(), &mut writer)
                            .await
                            .map_err(Error::CacheWrite)?;
                    }
                }

                // Rewind to the start of the downloaded file before extraction.
                let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
                    .map_err(Error::CacheWrite)?;
                let mut file = writer.into_inner();
                file.seek(io::SeekFrom::Start(0))
                    .await
                    .map_err(Error::CacheWrite)?;

                let hashes = if hashes.is_none() {
                    // No hashes required: extract synchronously on a blocking thread.
                    let file = file.into_std().await;
                    tokio::task::spawn_blocking({
                        let target = temp_dir.path().to_owned();
                        move || -> Result<(), uv_extract::Error> {
                            match extension {
                                WheelExtension::Whl => {
                                    uv_extract::unzip(file, &target)?;
                                }
                                WheelExtension::WhlZst => {
                                    uv_extract::stream::untar_zst_file(file, &target)?;
                                }
                            }
                            Ok(())
                        }
                    })
                    .await?
                    .map_err(|err| Error::Extract(filename.to_string(), err))?;

                    HashDigests::empty()
                } else {
                    // Hashes required: re-read the file through hashing readers while
                    // extracting.
                    let algorithms = hashes.algorithms();
                    let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
                    let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);

                    match extension {
                        WheelExtension::Whl => {
                            uv_extract::stream::unzip(query_url, &mut hasher, temp_dir.path())
                                .await
                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
                        }
                        WheelExtension::WhlZst => {
                            uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
                                .await
                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
                        }
                    }

                    // Drain trailing bytes so the hashers see the complete payload.
                    hasher.finish().await.map_err(Error::HashExhaustion)?;

                    hashers.into_iter().map(HashDigest::from).collect()
                };

                // NOTE(review): persisting maps I/O errors to `CacheRead` here, but to
                // `CacheWrite` in `load_wheel`/`unzip_wheel` — confirm which is intended.
                let id = self
                    .build_context
                    .cache()
                    .persist(temp_dir.keep(), wheel_entry.path())
                    .await
                    .map_err(Error::CacheRead)?;

                if let Some((reporter, progress)) = progress {
                    reporter.on_download_complete(dist.name(), progress);
                }

                Ok(Archive::new(id, hashes, filename.clone()))
            }
            .instrument(info_span!("wheel", wheel = %dist))
        };

        let req = self.request(url.clone())?;

        // Respect per-index cache-control overrides; otherwise derive freshness from
        // the cache. Offline mode always allows stale entries.
        let cache_control = match self.client.unmanaged.connectivity() {
            Connectivity::Online => {
                if let Some(header) = index.and_then(|index| {
                    self.build_context
                        .locations()
                        .artifact_cache_control_for(index)
                }) {
                    CacheControl::Override(header)
                } else {
                    CacheControl::from(
                        self.build_context
                            .cache()
                            .freshness(&http_entry, Some(&filename.name), None)
                            .map_err(Error::CacheRead)?,
                    )
                }
            }
            Connectivity::Offline => CacheControl::AllowStale,
        };

        let archive = self
            .client
            .managed(|client| {
                client.cached_client().get_serde_with_retry(
                    req,
                    &http_entry,
                    cache_control.clone(),
                    download,
                )
            })
            .await
            .map_err(|err| match err {
                CachedClientError::Callback { err, .. } => err,
                CachedClientError::Client(err) => Error::Client(err),
            })?;

        // Discard the cached archive if it lacks the required digests or its files
        // were evicted from the cache.
        let archive = Some(archive)
            .filter(|archive| archive.has_digests(hashes))
            .filter(|archive| archive.exists(self.build_context.cache()));

        // If the cached archive was unusable, re-download bypassing the HTTP cache.
        let archive = if let Some(archive) = archive {
            archive
        } else {
            self.client
                .managed(async |client| {
                    client
                        .cached_client()
                        .skip_cache_with_retry(
                            self.request(url)?,
                            &http_entry,
                            cache_control,
                            download,
                        )
                        .await
                        .map_err(|err| match err {
                            CachedClientError::Callback { err, .. } => err,
                            CachedClientError::Client(err) => Error::Client(err),
                        })
                })
                .await?
        };

        Ok(archive)
    }
995
    /// Load a wheel from a local path, unzipping it into the cache.
    ///
    /// A `.rev` pointer file memoizes the unzipped archive, keyed by the wheel's
    /// modification time; it's reused when it's up-to-date and carries the required
    /// digests.
    async fn load_wheel(
        &self,
        path: &Path,
        filename: &WheelFilename,
        extension: WheelExtension,
        wheel_entry: CacheEntry,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        // On Windows, serialize concurrent unzips of the same wheel via a lock file.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // The source file's mtime determines whether the cached archive is stale.
        let modified = Timestamp::from_path(path).map_err(Error::CacheRead)?;

        let pointer_entry = wheel_entry.with_file(format!("{}.rev", filename.cache_key()));
        let pointer = LocalArchivePointer::read_from(&pointer_entry)?;

        // Reuse the cached archive only if it's fresh and satisfies the hash policy.
        let archive = pointer
            .filter(|pointer| pointer.is_up_to_date(modified))
            .map(LocalArchivePointer::into_archive)
            .filter(|archive| archive.has_digests(hashes));

        if let Some(archive) = archive {
            // Cache hit: no extraction needed.
            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        } else if hashes.is_none() {
            // No hashes required: unzip on a blocking thread without hashing.
            let archive = Archive::new(
                self.unzip_wheel(path, wheel_entry.path()).await?,
                HashDigests::empty(),
                filename.clone(),
            );

            // Record the new archive in the pointer for future reuse.
            let pointer = LocalArchivePointer {
                timestamp: modified,
                archive: archive.clone(),
            };
            pointer.write_to(&pointer_entry).await?;

            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        } else {
            // Hashes required: stream the file through hashing readers while
            // extracting.
            let file = fs_err::tokio::File::open(path)
                .await
                .map_err(Error::CacheRead)?;
            let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
                .map_err(Error::CacheWrite)?;

            let algorithms = hashes.algorithms();
            let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
            let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);

            match extension {
                WheelExtension::Whl => {
                    uv_extract::stream::unzip(path.display(), &mut hasher, temp_dir.path())
                        .await
                        .map_err(|err| Error::Extract(filename.to_string(), err))?;
                }
                WheelExtension::WhlZst => {
                    uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
                        .await
                        .map_err(|err| Error::Extract(filename.to_string(), err))?;
                }
            }

            // Drain trailing bytes so the hashers see the complete payload.
            hasher.finish().await.map_err(Error::HashExhaustion)?;

            let hashes = hashers.into_iter().map(HashDigest::from).collect();

            let id = self
                .build_context
                .cache()
                .persist(temp_dir.keep(), wheel_entry.path())
                .await
                .map_err(Error::CacheWrite)?;

            let archive = Archive::new(id, hashes, filename.clone());

            // Record the new archive (with digests) in the pointer for future reuse.
            let pointer = LocalArchivePointer {
                timestamp: modified,
                archive: archive.clone(),
            };
            pointer.write_to(&pointer_entry).await?;

            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        }
    }
1130
    /// Unzip a wheel file into the cache, returning the id of the persisted archive.
    ///
    /// Extraction happens on a blocking thread; the temp dir is then atomically
    /// persisted to `target`.
    async fn unzip_wheel(&self, path: &Path, target: &Path) -> Result<ArchiveId, Error> {
        let temp_dir = tokio::task::spawn_blocking({
            let path = path.to_owned();
            let root = self.build_context.cache().root().to_path_buf();
            move || -> Result<TempDir, Error> {
                let temp_dir = tempfile::tempdir_in(root).map_err(Error::CacheWrite)?;
                // NOTE(review): opening the wheel for reading maps failures to
                // `CacheWrite` — `CacheRead` looks more appropriate; confirm intent.
                let reader = fs_err::File::open(&path).map_err(Error::CacheWrite)?;
                uv_extract::unzip(reader, temp_dir.path())
                    .map_err(|err| Error::Extract(path.to_string_lossy().into_owned(), err))?;
                Ok(temp_dir)
            }
        })
        .await??;

        let id = self
            .build_context
            .cache()
            .persist(temp_dir.keep(), target)
            .await
            .map_err(Error::CacheWrite)?;

        Ok(id)
    }
1157
1158 fn request(&self, url: DisplaySafeUrl) -> Result<reqwest::Request, reqwest::Error> {
1160 self.client
1161 .unmanaged
1162 .uncached_client(&url)
1163 .get(Url::from(url))
1164 .header(
1165 "accept-encoding",
1169 reqwest::header::HeaderValue::from_static("identity"),
1170 )
1171 .build()
1172 }
1173
    /// Return a reference to the managed (download-concurrency-limited) client.
    pub fn client(&self) -> &ManagedClient<'a> {
        &self.client
    }
1178}
1179
/// A `RegistryClient` paired with a semaphore that bounds concurrent downloads.
pub struct ManagedClient<'a> {
    /// The underlying client; using it directly bypasses the concurrency limit.
    pub unmanaged: &'a RegistryClient,
    /// Semaphore limiting the number of concurrent managed operations.
    control: Arc<Semaphore>,
}
1185
impl<'a> ManagedClient<'a> {
    /// Create a managed client, bounding concurrency with `control`.
    fn new(client: &'a RegistryClient, control: Arc<Semaphore>) -> Self {
        ManagedClient {
            unmanaged: client,
            control,
        }
    }

    /// Run `f` while holding a permit from the download semaphore.
    pub async fn managed<F, T>(&self, f: impl FnOnce(&'a RegistryClient) -> F) -> T
    where
        F: Future<Output = T>,
    {
        // `acquire` only errs if the semaphore is closed, which never happens here, so
        // the `unwrap` cannot panic in practice.
        let _permit = self.control.acquire().await.unwrap();
        f(self.unmanaged).await
    }

    /// Run `f` with direct access to the client and the semaphore, leaving permit
    /// acquisition (and thus concurrency control) to the caller.
    pub async fn manual<F, T>(&'a self, f: impl FnOnce(&'a RegistryClient, &'a Semaphore) -> F) -> T
    where
        F: Future<Output = T>,
    {
        f(self.unmanaged, &self.control).await
    }
}
1221
1222fn content_length(response: &reqwest::Response) -> Option<u64> {
1224 response
1225 .headers()
1226 .get(reqwest::header::CONTENT_LENGTH)
1227 .and_then(|val| val.to_str().ok())
1228 .and_then(|val| val.parse::<u64>().ok())
1229}
1230
/// An `AsyncRead` adapter that reports bytes read to a `Reporter`.
struct ProgressReader<'a, R> {
    /// The wrapped reader.
    reader: R,
    /// Identifier returned by `Reporter::on_download_start`, used to address the
    /// corresponding progress bar.
    index: usize,
    /// Sink for progress events.
    reporter: &'a dyn Reporter,
}
1237
1238impl<'a, R> ProgressReader<'a, R> {
1239 fn new(reader: R, index: usize, reporter: &'a dyn Reporter) -> Self {
1241 Self {
1242 reader,
1243 index,
1244 reporter,
1245 }
1246 }
1247}
1248
impl<R> AsyncRead for ProgressReader<'_, R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // Delegate to the inner reader, then report progress on success.
        // NOTE(review): `buf.filled().len()` is the buffer's total filled length, which
        // equals the bytes read in this call only when the caller passes an empty
        // `ReadBuf` per poll — TODO confirm all callers do.
        Pin::new(&mut self.as_mut().reader)
            .poll_read(cx, buf)
            .map_ok(|()| {
                self.reporter
                    .on_download_progress(self.index, buf.filled().len() as u64);
            })
    }
}
1266
/// A pointer to an unzipped archive in the cache, stored alongside an HTTP cache policy
/// in an `.http` entry.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HttpArchivePointer {
    /// The archive the pointer references.
    archive: Archive,
}
1274
1275impl HttpArchivePointer {
1276 pub fn read_from(path: impl AsRef<Path>) -> Result<Option<Self>, Error> {
1278 match fs_err::File::open(path.as_ref()) {
1279 Ok(file) => {
1280 let data = DataWithCachePolicy::from_reader(file)?.data;
1281 let archive = rmp_serde::from_slice::<Archive>(&data)?;
1282 Ok(Some(Self { archive }))
1283 }
1284 Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
1285 Err(err) => Err(Error::CacheRead(err)),
1286 }
1287 }
1288
1289 pub fn into_archive(self) -> Archive {
1291 self.archive
1292 }
1293
1294 pub fn to_cache_info(&self) -> CacheInfo {
1296 CacheInfo::default()
1297 }
1298
1299 pub fn to_build_info(&self) -> Option<BuildInfo> {
1301 None
1302 }
1303}
1304
/// A pointer to an unzipped archive in the cache, for a wheel that lives on the local
/// filesystem; stored in a `.rev` entry.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LocalArchivePointer {
    /// Modification time of the source wheel when it was unzipped; used for freshness
    /// checks.
    timestamp: Timestamp,
    /// The archive the pointer references.
    archive: Archive,
}
1313
1314impl LocalArchivePointer {
1315 pub fn read_from(path: impl AsRef<Path>) -> Result<Option<Self>, Error> {
1317 match fs_err::read(path) {
1318 Ok(cached) => Ok(Some(rmp_serde::from_slice::<Self>(&cached)?)),
1319 Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
1320 Err(err) => Err(Error::CacheRead(err)),
1321 }
1322 }
1323
1324 pub async fn write_to(&self, entry: &CacheEntry) -> Result<(), Error> {
1326 write_atomic(entry.path(), rmp_serde::to_vec(&self)?)
1327 .await
1328 .map_err(Error::CacheWrite)
1329 }
1330
1331 pub fn is_up_to_date(&self, modified: Timestamp) -> bool {
1333 self.timestamp == modified
1334 }
1335
1336 pub fn into_archive(self) -> Archive {
1338 self.archive
1339 }
1340
1341 pub fn to_cache_info(&self) -> CacheInfo {
1343 CacheInfo::from_timestamp(self.timestamp)
1344 }
1345
1346 pub fn to_build_info(&self) -> Option<BuildInfo> {
1348 None
1349 }
1350}
1351
/// The resolved download target for a registry wheel: the URL to fetch, the on-the-wire
/// format, and (if advertised) the payload size.
#[derive(Debug, Clone)]
struct WheelTarget {
    /// URL to download the artifact from.
    url: DisplaySafeUrl,
    /// Whether the artifact is a plain `.whl` or a zstd-compressed variant.
    extension: WheelExtension,
    /// Expected download size in bytes, if known.
    size: Option<u64>,
}
1361
1362impl TryFrom<&File> for WheelTarget {
1363 type Error = ToUrlError;
1364
1365 fn try_from(file: &File) -> Result<Self, Self::Error> {
1367 let url = file.url.to_url()?;
1368 if let Some(zstd) = file.zstd.as_ref() {
1369 Ok(Self {
1370 url: add_tar_zst_extension(url),
1371 extension: WheelExtension::WhlZst,
1372 size: zstd.size,
1373 })
1374 } else {
1375 Ok(Self {
1376 url,
1377 extension: WheelExtension::Whl,
1378 size: file.size,
1379 })
1380 }
1381 }
1382}
1383
/// The on-the-wire format of a wheel artifact.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum WheelExtension {
    /// A plain `.whl` (zip) archive.
    Whl,
    /// A zstandard-compressed tarball variant (`.whl.tar.zst`).
    WhlZst,
}
1391
1392#[must_use]
1394fn add_tar_zst_extension(mut url: DisplaySafeUrl) -> DisplaySafeUrl {
1395 let mut path = url.path().to_string();
1396
1397 if !path.ends_with(".tar.zst") {
1398 path.push_str(".tar.zst");
1399 }
1400
1401 url.set_path(&path);
1402 url
1403}
1404
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_add_tar_zst_extension() {
        // (input, expected): the suffix is appended exactly once, and percent-encoded
        // characters in the path are preserved.
        let cases = [
            (
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl",
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
            ),
            (
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
                "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
            ),
            (
                "https://files.pythonhosted.org/flask-3.1.0%2Bcu124-py3-none-any.whl",
                "https://files.pythonhosted.org/flask-3.1.0%2Bcu124-py3-none-any.whl.tar.zst",
            ),
        ];
        for (input, expected) in cases {
            let url = DisplaySafeUrl::parse(input).unwrap();
            assert_eq!(add_tar_zst_extension(url).as_str(), expected);
        }
    }
}