use std::future::Future;
use std::io;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::{FutureExt, TryStreamExt};
use tempfile::TempDir;
use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};
use tokio::sync::Semaphore;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{Instrument, info_span, instrument, warn};
use url::Url;

use uv_cache::{ArchiveId, CacheBucket, CacheEntry, WheelCache};
use uv_cache_info::{CacheInfo, Timestamp};
use uv_client::{
    CacheControl, CachedClientError, Connectivity, DataWithCachePolicy, RegistryClient,
};
use uv_distribution_filename::WheelFilename;
use uv_distribution_types::{
    BuildInfo, BuildableSource, BuiltDist, Dist, File, HashPolicy, Hashed, IndexUrl, InstalledDist,
    Name, SourceDist, ToUrlError,
};
use uv_extract::hash::Hasher;
use uv_fs::write_atomic;
use uv_platform_tags::Tags;
use uv_pypi_types::{HashDigest, HashDigests, PyProjectToml};
use uv_redacted::DisplaySafeUrl;
use uv_types::{BuildContext, BuildStack};

use crate::archive::Archive;
use crate::metadata::{ArchiveMetadata, Metadata};
use crate::source::SourceDistributionBuilder;
use crate::{Error, LocalWheel, Reporter, RequiresDist};

/// A high-level, cache-backed interface for converting a distribution (a requirement resolved
/// to a concrete artifact) into a local wheel or its metadata.
///
/// Wheels are streamed or downloaded from an index, URL, or local path; source distributions
/// are fetched and built into wheels. Metadata is read from the remote wheel where possible,
/// and otherwise obtained by building the source distribution.
pub struct DistributionDatabase<'a, Context: BuildContext> {
    build_context: &'a Context,
    builder: SourceDistributionBuilder<'a, Context>,
    client: ManagedClient<'a>,
    reporter: Option<Arc<dyn Reporter>>,
}

impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
    /// Create a new distribution database backed by the given client and build context, with a
    /// limit on the number of concurrent downloads.
    pub fn new(
        client: &'a RegistryClient,
        build_context: &'a Context,
        concurrent_downloads: usize,
    ) -> Self {
        Self {
            build_context,
            builder: SourceDistributionBuilder::new(build_context),
            client: ManagedClient::new(client, concurrent_downloads),
            reporter: None,
        }
    }

    /// Set the [`BuildStack`] to use for the [`DistributionDatabase`].
    #[must_use]
    pub fn with_build_stack(self, build_stack: &'a BuildStack) -> Self {
        Self {
            builder: self.builder.with_build_stack(build_stack),
            ..self
        }
    }

    /// Set the [`Reporter`] to use for the [`DistributionDatabase`].
    #[must_use]
    pub fn with_reporter(self, reporter: Arc<dyn Reporter>) -> Self {
        Self {
            builder: self.builder.with_reporter(reporter.clone()),
            reporter: Some(reporter),
            ..self
        }
    }

    /// Convert a [`reqwest::Error`] into an [`io::Error`], surfacing timeouts with a hint to
    /// increase `UV_HTTP_TIMEOUT`.
    fn handle_response_errors(&self, err: reqwest::Error) -> io::Error {
        if err.is_timeout() {
            io::Error::new(
                io::ErrorKind::TimedOut,
                format!(
                    "Failed to download distribution due to network timeout. Try increasing UV_HTTP_TIMEOUT (current value: {}s).",
                    self.client.unmanaged.timeout().as_secs()
                ),
            )
        } else {
            io::Error::other(err)
        }
    }

    /// Either fetch the wheel or fetch and build the source distribution into a wheel.
    #[instrument(skip_all, fields(%dist))]
    pub async fn get_or_build_wheel(
        &self,
        dist: &Dist,
        tags: &Tags,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        match dist {
            Dist::Built(built) => self.get_wheel(built, hashes).await,
            Dist::Source(source) => self.build_wheel(source, tags, hashes).await,
        }
    }

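    // A minimal usage sketch (hypothetical call site; `client`, `build_context`, `dist`, and
    // `tags` are assumed to exist in the caller, and the error type is elided):
    //
    //     let database = DistributionDatabase::new(&client, &build_context, 4);
    //     let wheel = database.get_or_build_wheel(&dist, &tags, HashPolicy::None).await?;
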
    /// Fetch the metadata for an installed distribution.
    #[instrument(skip_all, fields(%dist))]
    pub async fn get_installed_metadata(
        &self,
        dist: &InstalledDist,
    ) -> Result<ArchiveMetadata, Error> {
        // If the metadata was provided by the user, prefer it.
        if let Some(metadata) = self
            .build_context
            .dependency_metadata()
            .get(dist.name(), Some(dist.version()))
        {
            return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
        }

        let metadata = dist
            .read_metadata()
            .map_err(|err| Error::ReadInstalled(Box::new(dist.clone()), err))?;

        Ok(ArchiveMetadata::from_metadata23(metadata))
    }

    /// Either fetch the wheel metadata or fetch and build the source distribution to produce it.
    #[instrument(skip_all, fields(%dist))]
    pub async fn get_or_build_wheel_metadata(
        &self,
        dist: &Dist,
        hashes: HashPolicy<'_>,
    ) -> Result<ArchiveMetadata, Error> {
        match dist {
            Dist::Built(built) => self.get_wheel_metadata(built, hashes).await,
            Dist::Source(source) => {
                self.build_wheel_metadata(&BuildableSource::Dist(source), hashes)
                    .await
            }
        }
    }

    /// Fetch a wheel from the cache, streaming or downloading it from the index or URL if
    /// necessary.
    async fn get_wheel(
        &self,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        match dist {
            BuiltDist::Registry(wheels) => {
                let wheel = wheels.best_wheel();
                let WheelTarget {
                    url,
                    extension,
                    size,
                } = WheelTarget::try_from(&*wheel.file)?;

                let wheel_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Index(&wheel.index).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                // If the URL is a file URL, load the wheel directly from disk.
                if url.scheme() == "file" {
                    let path = url
                        .to_file_path()
                        .map_err(|()| Error::NonFileUrl(url.clone()))?;
                    return self
                        .load_wheel(
                            &path,
                            &wheel.filename,
                            WheelExtension::Whl,
                            wheel_entry,
                            dist,
                            hashes,
                        )
                        .await;
                }

                // Otherwise, stream the wheel, unzipping it into the cache as it downloads.
                match self
                    .stream_wheel(
                        url.clone(),
                        dist.index(),
                        &wheel.filename,
                        extension,
                        size,
                        &wheel_entry,
                        dist,
                        hashes,
                    )
                    .await
                {
                    Ok(archive) => Ok(LocalWheel {
                        dist: Dist::Built(dist.clone()),
                        archive: self
                            .build_context
                            .cache()
                            .archive(&archive.id)
                            .into_boxed_path(),
                        hashes: archive.hashes,
                        filename: wheel.filename.clone(),
                        cache: CacheInfo::default(),
                        build: None,
                    }),
                    Err(Error::Extract(name, err)) => {
                        if err.is_http_streaming_unsupported() {
                            warn!(
                                "Streaming unsupported for {dist}; downloading wheel to disk ({err})"
                            );
                        } else if err.is_http_streaming_failed() {
                            warn!("Streaming failed for {dist}; downloading wheel to disk ({err})");
                        } else {
                            return Err(Error::Extract(name, err));
                        }

                        // If the request failed because streaming is unsupported, download the
                        // wheel to disk instead, then unzip it into the cache.
                        let archive = self
                            .download_wheel(
                                url,
                                dist.index(),
                                &wheel.filename,
                                extension,
                                size,
                                &wheel_entry,
                                dist,
                                hashes,
                            )
                            .await?;

                        Ok(LocalWheel {
                            dist: Dist::Built(dist.clone()),
                            archive: self
                                .build_context
                                .cache()
                                .archive(&archive.id)
                                .into_boxed_path(),
                            hashes: archive.hashes,
                            filename: wheel.filename.clone(),
                            cache: CacheInfo::default(),
                            build: None,
                        })
                    }
                    Err(err) => Err(err),
                }
            }

            BuiltDist::DirectUrl(wheel) => {
                let wheel_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                // Stream the wheel, falling back to a download-to-disk if the server doesn't
                // support streaming.
                match self
                    .stream_wheel(
                        wheel.url.raw().clone(),
                        None,
                        &wheel.filename,
                        WheelExtension::Whl,
                        None,
                        &wheel_entry,
                        dist,
                        hashes,
                    )
                    .await
                {
                    Ok(archive) => Ok(LocalWheel {
                        dist: Dist::Built(dist.clone()),
                        archive: self
                            .build_context
                            .cache()
                            .archive(&archive.id)
                            .into_boxed_path(),
                        hashes: archive.hashes,
                        filename: wheel.filename.clone(),
                        cache: CacheInfo::default(),
                        build: None,
                    }),
                    Err(Error::Client(err)) if err.is_http_streaming_unsupported() => {
                        warn!(
                            "Streaming unsupported for {dist}; downloading wheel to disk ({err})"
                        );

                        let archive = self
                            .download_wheel(
                                wheel.url.raw().clone(),
                                None,
                                &wheel.filename,
                                WheelExtension::Whl,
                                None,
                                &wheel_entry,
                                dist,
                                hashes,
                            )
                            .await?;
                        Ok(LocalWheel {
                            dist: Dist::Built(dist.clone()),
                            archive: self
                                .build_context
                                .cache()
                                .archive(&archive.id)
                                .into_boxed_path(),
                            hashes: archive.hashes,
                            filename: wheel.filename.clone(),
                            cache: CacheInfo::default(),
                            build: None,
                        })
                    }
                    Err(err) => Err(err),
                }
            }

            BuiltDist::Path(wheel) => {
                let cache_entry = self.build_context.cache().entry(
                    CacheBucket::Wheels,
                    WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
                    wheel.filename.cache_key(),
                );

                self.load_wheel(
                    &wheel.install_path,
                    &wheel.filename,
                    WheelExtension::Whl,
                    cache_entry,
                    dist,
                    hashes,
                )
                .await
            }
        }
    }

    /// Convert a source distribution into a wheel, fetching it from the cache or building it if
    /// necessary, and verifying that the result is compatible with the given platform tags.
    async fn build_wheel(
        &self,
        dist: &SourceDist,
        tags: &Tags,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        let built_wheel = self
            .builder
            .download_and_build(&BuildableSource::Dist(dist), tags, hashes, &self.client)
            .boxed_local()
            .await?;

        // If the built wheel is incompatible with the target platform, error, distinguishing
        // cross builds from host builds for a clearer message.
        if !built_wheel.filename.is_compatible(tags) {
            return if tags.is_cross() {
                Err(Error::BuiltWheelIncompatibleTargetPlatform {
                    filename: built_wheel.filename,
                    python_platform: tags.python_platform().clone(),
                    python_version: tags.python_version(),
                })
            } else {
                Err(Error::BuiltWheelIncompatibleHostPlatform {
                    filename: built_wheel.filename,
                    python_platform: tags.python_platform().clone(),
                    python_version: tags.python_version(),
                })
            };
        }

        // On Windows, take an advisory lock on the target entry to avoid concurrent extraction
        // across processes.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = CacheEntry::new(
                built_wheel.target.parent().unwrap(),
                format!(
                    "{}.lock",
                    built_wheel.target.file_name().unwrap().to_str().unwrap()
                ),
            );
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // If the wheel was already unzipped into the cache, reuse it.
        match self.build_context.cache().resolve_link(&built_wheel.target) {
            Ok(archive) => {
                return Ok(LocalWheel {
                    dist: Dist::Source(dist.clone()),
                    archive: archive.into_boxed_path(),
                    filename: built_wheel.filename,
                    hashes: built_wheel.hashes,
                    cache: built_wheel.cache_info,
                    build: Some(built_wheel.build_info),
                });
            }
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(Error::CacheRead(err)),
        }

        // Otherwise, unzip the built wheel into the cache.
        let id = self
            .unzip_wheel(&built_wheel.path, &built_wheel.target)
            .await?;

        Ok(LocalWheel {
            dist: Dist::Source(dist.clone()),
            archive: self.build_context.cache().archive(&id).into_boxed_path(),
            hashes: built_wheel.hashes,
            filename: built_wheel.filename,
            cache: built_wheel.cache_info,
            build: Some(built_wheel.build_info),
        })
    }

    /// Fetch the wheel metadata from the index, or from the wheel itself if unavailable.
    async fn get_wheel_metadata(
        &self,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<ArchiveMetadata, Error> {
        // If hash generation is required, fetch the entire wheel so the digests can be computed
        // alongside the metadata.
        if hashes.is_generate(dist) {
            let wheel = self.get_wheel(dist, hashes).await?;
            let metadata = if let Some(metadata) = self
                .build_context
                .dependency_metadata()
                .get(dist.name(), Some(dist.version()))
            {
                metadata.clone()
            } else {
                wheel.metadata()?
            };
            let hashes = wheel.hashes;
            return Ok(ArchiveMetadata {
                metadata: Metadata::from_metadata23(metadata),
                hashes,
            });
        }

        // If the metadata was provided by the user, prefer it.
        if let Some(metadata) = self
            .build_context
            .dependency_metadata()
            .get(dist.name(), Some(dist.version()))
        {
            return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
        }

        let result = self
            .client
            .managed(|client| {
                client
                    .wheel_metadata(dist, self.build_context.capabilities())
                    .boxed_local()
            })
            .await;

        match result {
            Ok(metadata) => Ok(ArchiveMetadata::from_metadata23(metadata)),
            Err(err) if err.is_http_streaming_unsupported() => {
                warn!(
                    "Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})"
                );

                // If the request failed due to lack of streaming support, fall back to fetching
                // the wheel and reading the metadata from it.
                let wheel = self.get_wheel(dist, hashes).await?;
                let metadata = wheel.metadata()?;
                let hashes = wheel.hashes;
                Ok(ArchiveMetadata {
                    metadata: Metadata::from_metadata23(metadata),
                    hashes,
                })
            }
            Err(err) => Err(err.into()),
        }
    }

    /// Build the wheel metadata for a source distribution, or fetch it from the cache if
    /// possible.
    pub async fn build_wheel_metadata(
        &self,
        source: &BuildableSource<'_>,
        hashes: HashPolicy<'_>,
    ) -> Result<ArchiveMetadata, Error> {
        if let Some(dist) = source.as_dist() {
            // If the metadata was provided by the user, prefer it, but still resolve the
            // revision so the source remains pinned.
            if let Some(metadata) = self
                .build_context
                .dependency_metadata()
                .get(dist.name(), dist.version())
            {
                self.builder.resolve_revision(source, &self.client).await?;

                return Ok(ArchiveMetadata::from_metadata23(metadata.clone()));
            }
        }

        let metadata = self
            .builder
            .download_and_build_metadata(source, hashes, &self.client)
            .boxed_local()
            .await?;

        Ok(metadata)
    }

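    // A minimal metadata-fetch sketch (hypothetical call site; `database` and `source_dist`
    // are assumed to exist in the caller):
    //
    //     let metadata = database
    //         .build_wheel_metadata(&BuildableSource::Dist(&source_dist), HashPolicy::None)
    //         .await?;
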
    /// Extract the [`RequiresDist`] from a source tree's `pyproject.toml`, if available.
    pub async fn requires_dist(
        &self,
        path: &Path,
        pyproject_toml: &PyProjectToml,
    ) -> Result<Option<RequiresDist>, Error> {
        self.builder
            .source_tree_requires_dist(
                path,
                pyproject_toml,
                self.client.unmanaged.credentials_cache(),
            )
            .await
    }

    /// Stream a wheel from a URL, unzipping it into the cache as it's downloaded.
    async fn stream_wheel(
        &self,
        url: DisplaySafeUrl,
        index: Option<&IndexUrl>,
        filename: &WheelFilename,
        extension: WheelExtension,
        size: Option<u64>,
        wheel_entry: &CacheEntry,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<Archive, Error> {
        // On Windows, take an advisory lock on the wheel entry to avoid concurrent extraction
        // across processes.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // Create an entry for the HTTP cache.
        let http_entry = wheel_entry.with_file(format!("{}.http", filename.cache_key()));

        let download = |response: reqwest::Response| {
            async {
                let size = size.or_else(|| content_length(&response));

                let progress = self
                    .reporter
                    .as_ref()
                    .map(|reporter| (reporter, reporter.on_download_start(dist.name(), size)));

                let reader = response
                    .bytes_stream()
                    .map_err(|err| self.handle_response_errors(err))
                    .into_async_read();

                // Create a hasher for each hash algorithm we're expected to produce.
                let algorithms = hashes.algorithms();
                let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
                let mut hasher = uv_extract::hash::HashReader::new(reader.compat(), &mut hashers);

                // Download and unzip the wheel into a temporary directory.
                let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
                    .map_err(Error::CacheWrite)?;

                match progress {
                    Some((reporter, progress)) => {
                        let mut reader = ProgressReader::new(&mut hasher, progress, &**reporter);
                        match extension {
                            WheelExtension::Whl => {
                                uv_extract::stream::unzip(&mut reader, temp_dir.path())
                                    .await
                                    .map_err(|err| Error::Extract(filename.to_string(), err))?;
                            }
                            WheelExtension::WhlZst => {
                                uv_extract::stream::untar_zst(&mut reader, temp_dir.path())
                                    .await
                                    .map_err(|err| Error::Extract(filename.to_string(), err))?;
                            }
                        }
                    }
                    None => match extension {
                        WheelExtension::Whl => {
                            uv_extract::stream::unzip(&mut hasher, temp_dir.path())
                                .await
                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
                        }
                        WheelExtension::WhlZst => {
                            uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
                                .await
                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
                        }
                    },
                }

                // If necessary, exhaust the reader to compute the hashes.
                if !hashes.is_none() {
                    hasher.finish().await.map_err(Error::HashExhaustion)?;
                }

                // Persist the temporary directory to the directory store.
                let id = self
                    .build_context
                    .cache()
                    .persist(temp_dir.keep(), wheel_entry.path())
                    .await
                    .map_err(Error::CacheRead)?;

                if let Some((reporter, progress)) = progress {
                    reporter.on_download_complete(dist.name(), progress);
                }

                Ok(Archive::new(
                    id,
                    hashers.into_iter().map(HashDigest::from).collect(),
                    filename.clone(),
                ))
            }
            .instrument(info_span!("wheel", wheel = %dist))
        };

        let req = self.request(url.clone())?;

        // Determine the cache control policy, preferring any index-specific override.
        let cache_control = match self.client.unmanaged.connectivity() {
            Connectivity::Online => {
                if let Some(header) = index.and_then(|index| {
                    self.build_context
                        .locations()
                        .artifact_cache_control_for(index)
                }) {
                    CacheControl::Override(header)
                } else {
                    CacheControl::from(
                        self.build_context
                            .cache()
                            .freshness(&http_entry, Some(&filename.name), None)
                            .map_err(Error::CacheRead)?,
                    )
                }
            }
            Connectivity::Offline => CacheControl::AllowStale,
        };

        let archive = self
            .client
            .managed(|client| {
                client.cached_client().get_serde_with_retry(
                    req,
                    &http_entry,
                    cache_control,
                    download,
                )
            })
            .await
            .map_err(|err| match err {
                CachedClientError::Callback { err, .. } => err,
                CachedClientError::Client { err, .. } => Error::Client(err),
            })?;

        // If the archive is missing the required hashes, or has been removed from the cache,
        // redownload it, bypassing the HTTP cache.
        let archive = Some(archive)
            .filter(|archive| archive.has_digests(hashes))
            .filter(|archive| archive.exists(self.build_context.cache()));

        let archive = if let Some(archive) = archive {
            archive
        } else {
            self.client
                .managed(async |client| {
                    client
                        .cached_client()
                        .skip_cache_with_retry(
                            self.request(url)?,
                            &http_entry,
                            cache_control,
                            download,
                        )
                        .await
                        .map_err(|err| match err {
                            CachedClientError::Callback { err, .. } => err,
                            CachedClientError::Client { err, .. } => Error::Client(err),
                        })
                })
                .await?
        };

        Ok(archive)
    }

    /// Download a wheel to disk, then unzip it into the cache. Used as a fallback when the
    /// server doesn't support streaming.
    async fn download_wheel(
        &self,
        url: DisplaySafeUrl,
        index: Option<&IndexUrl>,
        filename: &WheelFilename,
        extension: WheelExtension,
        size: Option<u64>,
        wheel_entry: &CacheEntry,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<Archive, Error> {
        // On Windows, take an advisory lock on the wheel entry to avoid concurrent extraction
        // across processes.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // Create an entry for the HTTP cache.
        let http_entry = wheel_entry.with_file(format!("{}.http", filename.cache_key()));

        let download = |response: reqwest::Response| {
            async {
                let size = size.or_else(|| content_length(&response));

                let progress = self
                    .reporter
                    .as_ref()
                    .map(|reporter| (reporter, reporter.on_download_start(dist.name(), size)));

                let reader = response
                    .bytes_stream()
                    .map_err(|err| self.handle_response_errors(err))
                    .into_async_read();

                // Download the wheel to a temporary file.
                let temp_file = tempfile::tempfile_in(self.build_context.cache().root())
                    .map_err(Error::CacheWrite)?;
                let mut writer = tokio::io::BufWriter::new(fs_err::tokio::File::from_std(
                    fs_err::File::from_parts(temp_file, self.build_context.cache().root()),
                ));

                match progress {
                    Some((reporter, progress)) => {
                        let mut reader =
                            ProgressReader::new(reader.compat(), progress, &**reporter);

                        tokio::io::copy(&mut reader, &mut writer)
                            .await
                            .map_err(Error::CacheWrite)?;
                    }
                    None => {
                        tokio::io::copy(&mut reader.compat(), &mut writer)
                            .await
                            .map_err(Error::CacheWrite)?;
                    }
                }

                // Unzip the wheel into a temporary directory, rewinding the file first.
                let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
                    .map_err(Error::CacheWrite)?;
                let mut file = writer.into_inner();
                file.seek(io::SeekFrom::Start(0))
                    .await
                    .map_err(Error::CacheWrite)?;

                let hashes = if hashes.is_none() {
                    // If no hashes are required, unzip on a blocking thread without hashing.
                    let file = file.into_std().await;
                    tokio::task::spawn_blocking({
                        let target = temp_dir.path().to_owned();
                        move || -> Result<(), uv_extract::Error> {
                            match extension {
                                WheelExtension::Whl => {
                                    uv_extract::unzip(file, &target)?;
                                }
                                WheelExtension::WhlZst => {
                                    uv_extract::stream::untar_zst_file(file, &target)?;
                                }
                            }
                            Ok(())
                        }
                    })
                    .await?
                    .map_err(|err| Error::Extract(filename.to_string(), err))?;

                    HashDigests::empty()
                } else {
                    // Otherwise, hash the file while unzipping it.
                    let algorithms = hashes.algorithms();
                    let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
                    let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);

                    match extension {
                        WheelExtension::Whl => {
                            uv_extract::stream::unzip(&mut hasher, temp_dir.path())
                                .await
                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
                        }
                        WheelExtension::WhlZst => {
                            uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
                                .await
                                .map_err(|err| Error::Extract(filename.to_string(), err))?;
                        }
                    }

                    // Exhaust the reader to compute the hashes.
                    hasher.finish().await.map_err(Error::HashExhaustion)?;

                    hashers.into_iter().map(HashDigest::from).collect()
                };

                // Persist the temporary directory to the directory store.
                let id = self
                    .build_context
                    .cache()
                    .persist(temp_dir.keep(), wheel_entry.path())
                    .await
                    .map_err(Error::CacheRead)?;

                if let Some((reporter, progress)) = progress {
                    reporter.on_download_complete(dist.name(), progress);
                }

                Ok(Archive::new(id, hashes, filename.clone()))
            }
            .instrument(info_span!("wheel", wheel = %dist))
        };

        let req = self.request(url.clone())?;

        // Determine the cache control policy, preferring any index-specific override.
        let cache_control = match self.client.unmanaged.connectivity() {
            Connectivity::Online => {
                if let Some(header) = index.and_then(|index| {
                    self.build_context
                        .locations()
                        .artifact_cache_control_for(index)
                }) {
                    CacheControl::Override(header)
                } else {
                    CacheControl::from(
                        self.build_context
                            .cache()
                            .freshness(&http_entry, Some(&filename.name), None)
                            .map_err(Error::CacheRead)?,
                    )
                }
            }
            Connectivity::Offline => CacheControl::AllowStale,
        };

        let archive = self
            .client
            .managed(|client| {
                client.cached_client().get_serde_with_retry(
                    req,
                    &http_entry,
                    cache_control,
                    download,
                )
            })
            .await
            .map_err(|err| match err {
                CachedClientError::Callback { err, .. } => err,
                CachedClientError::Client { err, .. } => Error::Client(err),
            })?;

        // If the archive is missing the required hashes, or has been removed from the cache,
        // redownload it, bypassing the HTTP cache.
        let archive = Some(archive)
            .filter(|archive| archive.has_digests(hashes))
            .filter(|archive| archive.exists(self.build_context.cache()));

        let archive = if let Some(archive) = archive {
            archive
        } else {
            self.client
                .managed(async |client| {
                    client
                        .cached_client()
                        .skip_cache_with_retry(
                            self.request(url)?,
                            &http_entry,
                            cache_control,
                            download,
                        )
                        .await
                        .map_err(|err| match err {
                            CachedClientError::Callback { err, .. } => err,
                            CachedClientError::Client { err, .. } => Error::Client(err),
                        })
                })
                .await?
        };

        Ok(archive)
    }

    /// Load a wheel from a local path, unzipping it into the cache.
    async fn load_wheel(
        &self,
        path: &Path,
        filename: &WheelFilename,
        extension: WheelExtension,
        wheel_entry: CacheEntry,
        dist: &BuiltDist,
        hashes: HashPolicy<'_>,
    ) -> Result<LocalWheel, Error> {
        // On Windows, take an advisory lock on the wheel entry to avoid concurrent extraction
        // across processes.
        #[cfg(windows)]
        let _lock = {
            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
            lock_entry.lock().await.map_err(Error::CacheLock)?
        };

        // Determine the last-modified time of the wheel.
        let modified = Timestamp::from_path(path).map_err(Error::CacheRead)?;

        // Attempt to read the archive pointer from the cache.
        let pointer_entry = wheel_entry.with_file(format!("{}.rev", filename.cache_key()));
        let pointer = LocalArchivePointer::read_from(&pointer_entry)?;

        // Reuse the cached archive if it's up-to-date and has the required digests.
        let archive = pointer
            .filter(|pointer| pointer.is_up_to_date(modified))
            .map(LocalArchivePointer::into_archive)
            .filter(|archive| archive.has_digests(hashes));

        if let Some(archive) = archive {
            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        } else if hashes.is_none() {
            // If no hashes are required, unzip the wheel without hashing.
            let archive = Archive::new(
                self.unzip_wheel(path, wheel_entry.path()).await?,
                HashDigests::empty(),
                filename.clone(),
            );

            // Write the archive pointer to the cache.
            let pointer = LocalArchivePointer {
                timestamp: modified,
                archive: archive.clone(),
            };
            pointer.write_to(&pointer_entry).await?;

            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        } else {
            // Otherwise, hash the wheel while unzipping it.
            let file = fs_err::tokio::File::open(path)
                .await
                .map_err(Error::CacheRead)?;
            let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
                .map_err(Error::CacheWrite)?;

            // Create a hasher for each hash algorithm we're expected to produce.
            let algorithms = hashes.algorithms();
            let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
            let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);

            // Unzip the wheel into a temporary directory.
            match extension {
                WheelExtension::Whl => {
                    uv_extract::stream::unzip(&mut hasher, temp_dir.path())
                        .await
                        .map_err(|err| Error::Extract(filename.to_string(), err))?;
                }
                WheelExtension::WhlZst => {
                    uv_extract::stream::untar_zst(&mut hasher, temp_dir.path())
                        .await
                        .map_err(|err| Error::Extract(filename.to_string(), err))?;
                }
            }

            // Exhaust the reader to compute the hashes.
            hasher.finish().await.map_err(Error::HashExhaustion)?;

            let hashes = hashers.into_iter().map(HashDigest::from).collect();

            // Persist the temporary directory to the directory store.
            let id = self
                .build_context
                .cache()
                .persist(temp_dir.keep(), wheel_entry.path())
                .await
                .map_err(Error::CacheWrite)?;

            let archive = Archive::new(id, hashes, filename.clone());

            // Write the archive pointer to the cache.
            let pointer = LocalArchivePointer {
                timestamp: modified,
                archive: archive.clone(),
            };
            pointer.write_to(&pointer_entry).await?;

            Ok(LocalWheel {
                dist: Dist::Built(dist.clone()),
                archive: self
                    .build_context
                    .cache()
                    .archive(&archive.id)
                    .into_boxed_path(),
                hashes: archive.hashes,
                filename: filename.clone(),
                cache: CacheInfo::from_timestamp(modified),
                build: None,
            })
        }
    }

    /// Unzip a wheel into the cache, returning the ID of the persisted archive.
    async fn unzip_wheel(&self, path: &Path, target: &Path) -> Result<ArchiveId, Error> {
        // Unzip the wheel into a temporary directory on a blocking thread.
        let temp_dir = tokio::task::spawn_blocking({
            let path = path.to_owned();
            let root = self.build_context.cache().root().to_path_buf();
            move || -> Result<TempDir, Error> {
                let temp_dir = tempfile::tempdir_in(root).map_err(Error::CacheWrite)?;
                let reader = fs_err::File::open(&path).map_err(Error::CacheWrite)?;
                uv_extract::unzip(reader, temp_dir.path())
                    .map_err(|err| Error::Extract(path.to_string_lossy().into_owned(), err))?;
                Ok(temp_dir)
            }
        })
        .await??;

        // Persist the temporary directory to the directory store.
        let id = self
            .build_context
            .cache()
            .persist(temp_dir.keep(), target)
            .await
            .map_err(Error::CacheWrite)?;

        Ok(id)
    }

    /// Build a `GET` [`reqwest::Request`] for the given URL.
    fn request(&self, url: DisplaySafeUrl) -> Result<reqwest::Request, reqwest::Error> {
        self.client
            .unmanaged
            .uncached_client(&url)
            .get(Url::from(url))
            .header(
                // `reqwest` defaults to accepting compressed responses. Specify identity
                // encoding to get consistent `.whl` downloading behavior from servers.
                // ref: https://github.com/pypa/pip/pull/1688
                "accept-encoding",
                reqwest::header::HeaderValue::from_static("identity"),
            )
            .build()
    }

    /// Return the [`ManagedClient`] used by this [`DistributionDatabase`].
    pub fn client(&self) -> &ManagedClient<'a> {
        &self.client
    }
}

/// A wrapper around [`RegistryClient`] that enforces a concurrency limit on requests.
pub struct ManagedClient<'a> {
    /// The underlying client, unguarded by the concurrency limit.
    pub unmanaged: &'a RegistryClient,
    control: Semaphore,
}

impl<'a> ManagedClient<'a> {
    /// Create a new [`ManagedClient`] with the given concurrency limit.
    fn new(client: &'a RegistryClient, concurrency: usize) -> Self {
        ManagedClient {
            unmanaged: client,
            control: Semaphore::new(concurrency),
        }
    }

    /// Perform a request with the client, respecting the concurrency limit.
    ///
    /// If the maximum number of concurrent requests is already in flight, waits for a permit
    /// before invoking the closure.
    pub async fn managed<F, T>(&self, f: impl FnOnce(&'a RegistryClient) -> F) -> T
    where
        F: Future<Output = T>,
    {
        let _permit = self.control.acquire().await.unwrap();
        f(self.unmanaged).await
    }

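    // A minimal usage sketch (hypothetical call site, mirroring `get_wheel_metadata` above;
    // `managed_client`, `dist`, and `capabilities` are assumed to exist in the caller):
    //
    //     let metadata = managed_client
    //         .managed(|client| client.wheel_metadata(&dist, capabilities).boxed_local())
    //         .await?;
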
    /// Perform a request with direct access to the underlying client and the semaphore, leaving
    /// the caller responsible for acquiring permits.
    pub async fn manual<F, T>(&'a self, f: impl FnOnce(&'a RegistryClient, &'a Semaphore) -> F) -> T
    where
        F: Future<Output = T>,
    {
        f(self.unmanaged, &self.control).await
    }
}

/// Extract the `Content-Length` header from a response, if present and parseable.
fn content_length(response: &reqwest::Response) -> Option<u64> {
    response
        .headers()
        .get(reqwest::header::CONTENT_LENGTH)
        .and_then(|val| val.to_str().ok())
        .and_then(|val| val.parse::<u64>().ok())
}

/// An asynchronous reader that reports progress to a [`Reporter`] as bytes are read.
struct ProgressReader<'a, R> {
    reader: R,
    index: usize,
    reporter: &'a dyn Reporter,
}

impl<'a, R> ProgressReader<'a, R> {
    /// Create a new [`ProgressReader`] for the given reader and download index.
    fn new(reader: R, index: usize, reporter: &'a dyn Reporter) -> Self {
        Self {
            reader,
            index,
            reporter,
        }
    }
}

impl<R> AsyncRead for ProgressReader<'_, R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        Pin::new(&mut self.as_mut().reader)
            .poll_read(cx, buf)
            .map_ok(|()| {
                self.reporter
                    .on_download_progress(self.index, buf.filled().len() as u64);
            })
    }
}

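// A minimal wiring sketch for `ProgressReader` (hypothetical call site, mirroring its use in
// `download_wheel` above; `reporter`, `dist`, `size`, `reader`, and `writer` are assumed):
//
//     let progress = reporter.on_download_start(dist.name(), size);
//     let mut reader = ProgressReader::new(reader, progress, &*reporter);
//     tokio::io::copy(&mut reader, &mut writer).await?;
//     reporter.on_download_complete(dist.name(), progress);
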
/// A pointer to an archive in the cache, fetched from an HTTP archive.
///
/// Encoded with `MsgPack`, and represented on disk by a `.http` file.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HttpArchivePointer {
    archive: Archive,
}

impl HttpArchivePointer {
    /// Read an [`HttpArchivePointer`] from the cache.
    pub fn read_from(path: impl AsRef<Path>) -> Result<Option<Self>, Error> {
        match fs_err::File::open(path.as_ref()) {
            Ok(file) => {
                let data = DataWithCachePolicy::from_reader(file)?.data;
                let archive = rmp_serde::from_slice::<Archive>(&data)?;
                Ok(Some(Self { archive }))
            }
            Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
            Err(err) => Err(Error::CacheRead(err)),
        }
    }

    /// Return the [`Archive`] from the pointer.
    pub fn into_archive(self) -> Archive {
        self.archive
    }

    /// Return the [`CacheInfo`] for the pointer.
    pub fn to_cache_info(&self) -> CacheInfo {
        CacheInfo::default()
    }

    /// Return the [`BuildInfo`] for the pointer.
    pub fn to_build_info(&self) -> Option<BuildInfo> {
        None
    }
}

/// A pointer to an archive in the cache, fetched from a local path.
///
/// Encoded with `MsgPack`, and represented on disk by a `.rev` file.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LocalArchivePointer {
    timestamp: Timestamp,
    archive: Archive,
}

impl LocalArchivePointer {
    /// Read a [`LocalArchivePointer`] from the cache.
    pub fn read_from(path: impl AsRef<Path>) -> Result<Option<Self>, Error> {
        match fs_err::read(path) {
            Ok(cached) => Ok(Some(rmp_serde::from_slice::<Self>(&cached)?)),
            Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
            Err(err) => Err(Error::CacheRead(err)),
        }
    }

    /// Write a [`LocalArchivePointer`] to the given cache entry.
    pub async fn write_to(&self, entry: &CacheEntry) -> Result<(), Error> {
        write_atomic(entry.path(), rmp_serde::to_vec(&self)?)
            .await
            .map_err(Error::CacheWrite)
    }

    /// Return `true` if the pointer is up-to-date with the given timestamp.
    pub fn is_up_to_date(&self, modified: Timestamp) -> bool {
        self.timestamp == modified
    }

    /// Return the [`Archive`] from the pointer.
    pub fn into_archive(self) -> Archive {
        self.archive
    }

    /// Return the [`CacheInfo`] for the pointer.
    pub fn to_cache_info(&self) -> CacheInfo {
        CacheInfo::from_timestamp(self.timestamp)
    }

    /// Return the [`BuildInfo`] for the pointer.
    pub fn to_build_info(&self) -> Option<BuildInfo> {
        None
    }
}

/// The resolved target for a wheel download.
#[derive(Debug, Clone)]
struct WheelTarget {
    /// The URL from which to download the artifact.
    url: DisplaySafeUrl,
    /// The file extension of the artifact at the URL.
    extension: WheelExtension,
    /// The reported size of the artifact, if known.
    size: Option<u64>,
}

impl TryFrom<&File> for WheelTarget {
    type Error = ToUrlError;

    fn try_from(file: &File) -> Result<Self, Self::Error> {
        let url = file.url.to_url()?;
        // Prefer the zstandard-compressed variant, if the index advertises one.
        if let Some(zstd) = file.zstd.as_ref() {
            Ok(Self {
                url: add_tar_zst_extension(url),
                extension: WheelExtension::WhlZst,
                size: zstd.size,
            })
        } else {
            Ok(Self {
                url,
                extension: WheelExtension::Whl,
                size: file.size,
            })
        }
    }
}

/// The file extension of a wheel artifact.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum WheelExtension {
    /// A standard `.whl` file.
    Whl,
    /// A zstandard-compressed wheel archive (`.whl.tar.zst`).
    WhlZst,
}

/// Append a `.tar.zst` extension to the URL path, if it's not already present.
#[must_use]
fn add_tar_zst_extension(mut url: DisplaySafeUrl) -> DisplaySafeUrl {
    let mut path = url.path().to_string();

    if !path.ends_with(".tar.zst") {
        path.push_str(".tar.zst");
    }

    url.set_path(&path);
    url
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_add_tar_zst_extension() {
        let url =
            DisplaySafeUrl::parse("https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl")
                .unwrap();
        assert_eq!(
            add_tar_zst_extension(url).as_str(),
            "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst"
        );

        let url = DisplaySafeUrl::parse(
            "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst",
        )
        .unwrap();
        assert_eq!(
            add_tar_zst_extension(url).as_str(),
            "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst"
        );

        let url = DisplaySafeUrl::parse(
            "https://files.pythonhosted.org/flask-3.1.0%2Bcu124-py3-none-any.whl",
        )
        .unwrap();
        assert_eq!(
            add_tar_zst_extension(url).as_str(),
            "https://files.pythonhosted.org/flask-3.1.0%2Bcu124-py3-none-any.whl.tar.zst"
        );
    }
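
    // A hedged addition: `url::Url::set_path` only rewrites the path component, so any query
    // string should be preserved. This assumes `DisplaySafeUrl` keeps `Url` semantics, as the
    // implementation above suggests.
    #[test]
    fn test_add_tar_zst_extension_preserves_query() {
        let url = DisplaySafeUrl::parse(
            "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl?download=1",
        )
        .unwrap();
        assert_eq!(
            add_tar_zst_extension(url).as_str(),
            "https://files.pythonhosted.org/flask-3.1.0-py3-none-any.whl.tar.zst?download=1"
        );
    }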
}