1use std::fmt::Formatter;
69use {
70 crate::{
71 binary_package_control::BinaryPackageControlFile,
72 binary_package_list::BinaryPackageList,
73 control::ControlParagraphAsyncReader,
74 deb::reader::BinaryPackageReader,
75 debian_source_control::{DebianSourceControlFile, DebianSourceControlFileFetch},
76 debian_source_package_list::DebianSourcePackageList,
77 error::{DebianError, Result},
78 io::{drain_reader, Compression, ContentDigest, DataResolver},
79 repository::{
80 contents::{ContentsFile, ContentsFileAsyncReader},
81 release::{
82 ChecksumType, ClassifiedReleaseFileEntry, ContentsFileEntry, PackagesFileEntry,
83 ReleaseFile, SourcesFileEntry,
84 },
85 },
86 },
87 async_trait::async_trait,
88 futures::{AsyncRead, AsyncReadExt, StreamExt, TryStreamExt},
89 std::{borrow::Cow, collections::HashMap, ops::Deref, pin::Pin, str::FromStr},
90};
91
92pub mod builder;
93pub mod contents;
94pub mod copier;
95pub mod filesystem;
96#[cfg(feature = "http")]
97pub mod http;
98pub mod proxy_writer;
99pub mod release;
100#[cfg(feature = "s3")]
101pub mod s3;
102pub mod sink_writer;
103
/// Describes how to fetch a binary package (`.deb`) from a repository.
#[derive(Clone, Debug)]
pub struct BinaryPackageFetch<'a> {
    /// The control paragraph describing this binary package
    /// (as read from a `Packages` index file).
    pub control_file: BinaryPackageControlFile<'a>,
    /// The repository-root-relative path of the file to fetch.
    pub path: String,
    /// The expected size of the fetched file, in bytes.
    pub size: u64,
    /// The expected content digest of the fetched file.
    pub digest: ContentDigest,
}
118
/// Describes how to fetch a file belonging to a source package from a repository.
pub struct SourcePackageFetch<'a> {
    /// The control paragraph describing the source package
    /// (as read from a `Sources` index file).
    pub control_file: DebianSourceControlFile<'a>,
    // Fetch instructions for a single file of the source package. Kept
    // private; its fields are reachable via the `Deref` impl on this type.
    fetch: DebianSourceControlFileFetch,
}
126
impl<'a> Deref for SourcePackageFetch<'a> {
    type Target = DebianSourceControlFileFetch;

    /// Expose the inner fetch instructions directly on `SourcePackageFetch`.
    fn deref(&self) -> &Self::Target {
        &self.fetch
    }
}
134
/// An interface for reading from the root of a Debian repository.
///
/// Implementations resolve paths relative to the repository root and can
/// construct [`ReleaseReader`] instances bound to a particular distribution.
#[async_trait]
pub trait RepositoryRootReader: DataResolver + Sync {
    /// Obtain the URL to which this reader is bound.
    fn url(&self) -> Result<url::Url>;

    /// Obtain a [`ReleaseReader`] for the given distribution.
    ///
    /// The distribution name is normalized (surrounding `/` stripped) and
    /// resolved under `dists/`.
    async fn release_reader(&self, distribution: &str) -> Result<Box<dyn ReleaseReader>> {
        self.release_reader_with_distribution_path(&format!(
            "dists/{}",
            distribution.trim_matches('/')
        ))
        .await
    }

    /// Obtain a [`ReleaseReader`] given a root-relative distribution path
    /// (typically of the form `dists/<distribution>`).
    async fn release_reader_with_distribution_path(
        &self,
        path: &str,
    ) -> Result<Box<dyn ReleaseReader>>;

    /// Fetch and parse an `InRelease` file (a PGP-armored `Release` file) at
    /// the given root-relative path.
    async fn fetch_inrelease(&self, path: &str) -> Result<ReleaseFile<'static>> {
        let mut reader = self.get_path(path).await?;

        let mut data = vec![];
        reader.read_to_end(&mut data).await?;

        Ok(ReleaseFile::from_armored_reader(std::io::Cursor::new(
            data,
        ))?)
    }

    /// Fetch and parse a plain (non-armored) `Release` file at the given
    /// root-relative path.
    async fn fetch_release(&self, path: &str) -> Result<ReleaseFile<'static>> {
        let mut reader = self.get_path(path).await?;

        let mut data = vec![];
        reader.read_to_end(&mut data).await?;

        Ok(ReleaseFile::from_reader(std::io::Cursor::new(data))?)
    }
    /// Fetch the `InRelease` file, falling back to the `Release` file only when
    /// the `InRelease` path does not exist; any other error is propagated.
    async fn fetch_inrelease_or_release(
        &self,
        inrelease_path: &str,
        release_path: &str,
    ) -> Result<ReleaseFile<'static>> {
        match self.fetch_inrelease(inrelease_path).await {
            Ok(release) => Ok(release),
            Err(DebianError::RepositoryIoPath(_, e))
                if e.kind() == std::io::ErrorKind::NotFound =>
            {
                self.fetch_release(release_path).await
            }
            Err(e) => Err(e),
        }
    }

    /// Obtain a digest-verified reader over a binary package's `.deb` content.
    async fn fetch_binary_package_generic<'fetch>(
        &self,
        fetch: BinaryPackageFetch<'fetch>,
    ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
        self.get_path_with_digest_verification(&fetch.path, fetch.size, fetch.digest)
            .await
    }

    /// Fetch a binary package in full and wrap it in a [`BinaryPackageReader`].
    ///
    /// Note: the whole `.deb` is buffered in memory.
    async fn fetch_binary_package_deb_reader<'fetch>(
        &self,
        fetch: BinaryPackageFetch<'fetch>,
    ) -> Result<BinaryPackageReader<std::io::Cursor<Vec<u8>>>> {
        let mut reader = self.fetch_binary_package_generic(fetch).await?;
        let mut buf = vec![];
        reader.read_to_end(&mut buf).await?;

        Ok(BinaryPackageReader::new(std::io::Cursor::new(buf))?)
    }

    /// Obtain a digest-verified reader over a source package file's content.
    ///
    /// `path`, `size`, and `digest` come from the inner fetch instructions via
    /// `SourcePackageFetch`'s `Deref` impl.
    async fn fetch_source_package_generic<'fetch>(
        &self,
        fetch: SourcePackageFetch<'fetch>,
    ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
        self.get_path_with_digest_verification(&fetch.path, fetch.size, fetch.digest.clone())
            .await
    }
}
258
259#[async_trait]
261pub trait ReleaseReader: DataResolver + Sync {
262 fn url(&self) -> Result<url::Url>;
264
265 fn root_relative_path(&self) -> &str;
271
272 fn release_file(&self) -> &ReleaseFile<'_>;
274
275 fn retrieve_checksum(&self) -> Result<ChecksumType> {
280 let release = self.release_file();
281
282 let checksum = &[ChecksumType::Sha256, ChecksumType::Sha1, ChecksumType::Md5]
283 .iter()
284 .find(|variant| release.field(variant.field_name()).is_some())
285 .ok_or(DebianError::RepositoryReadReleaseNoKnownChecksum)?;
286
287 Ok(**checksum)
288 }
289
290 fn preferred_compression(&self) -> Compression;
292
293 fn set_preferred_compression(&mut self, compression: Compression);
299
300 fn classified_indices_entries(&self) -> Result<Vec<ClassifiedReleaseFileEntry<'_>>> {
302 self.release_file()
303 .iter_classified_index_files(self.retrieve_checksum()?)
304 .ok_or(DebianError::ReleaseNoIndicesFiles)?
305 .collect::<Result<Vec<_>>>()
306 }
307
308 fn packages_indices_entries(&self) -> Result<Vec<PackagesFileEntry<'_>>> {
316 Ok(
317 if let Some(entries) = self
318 .release_file()
319 .iter_packages_indices(self.retrieve_checksum()?)
320 {
321 entries.collect::<Result<Vec<_>>>()?
322 } else {
323 vec![]
324 },
325 )
326 }
327
328 fn packages_indices_entries_preferred_compression(&self) -> Result<Vec<PackagesFileEntry<'_>>> {
333 let mut entries = HashMap::new();
334
335 for entry in self.packages_indices_entries()? {
336 entries
337 .entry((
338 entry.component.clone(),
339 entry.architecture.clone(),
340 entry.is_installer,
341 ))
342 .or_insert_with(Vec::new)
343 .push(entry);
344 }
345
346 entries
347 .into_values()
348 .map(|candidates| {
349 if let Some(entry) = candidates
350 .iter()
351 .find(|entry| entry.compression == self.preferred_compression())
352 {
353 Ok(entry.clone())
354 } else {
355 for compression in Compression::default_preferred_order() {
356 if let Some(entry) = candidates
357 .iter()
358 .find(|entry| entry.compression == compression)
359 {
360 return Ok(entry.clone());
361 }
362 }
363
364 Err(DebianError::RepositoryReadPackagesIndicesEntryNotFound)
365 }
366 })
367 .collect::<Result<Vec<_>>>()
368 }
369
370 fn contents_indices_entries(&self) -> Result<Vec<ContentsFileEntry<'_>>> {
377 Ok(
378 if let Some(entries) = self
379 .release_file()
380 .iter_contents_indices(self.retrieve_checksum()?)
381 {
382 entries.collect::<Result<Vec<_>>>()?
383 } else {
384 vec![]
385 },
386 )
387 }
388
389 fn sources_indices_entries(&self) -> Result<Vec<SourcesFileEntry<'_>>> {
396 Ok(
397 if let Some(entries) = self
398 .release_file()
399 .iter_sources_indices(self.retrieve_checksum()?)
400 {
401 entries.collect::<Result<Vec<_>>>()?
402 } else {
403 vec![]
404 },
405 )
406 }
407
408 fn sources_indices_entries_preferred_compression(&self) -> Result<Vec<SourcesFileEntry<'_>>> {
413 let mut entries = HashMap::new();
414
415 for entry in self.sources_indices_entries()? {
416 entries
417 .entry(entry.component.clone())
418 .or_insert_with(Vec::new)
419 .push(entry);
420 }
421
422 entries
423 .into_values()
424 .map(|candidates| {
425 if let Some(entry) = candidates
426 .iter()
427 .find(|entry| entry.compression == self.preferred_compression())
428 {
429 Ok(entry.clone())
430 } else {
431 for compression in Compression::default_preferred_order() {
432 if let Some(entry) = candidates
433 .iter()
434 .find(|entry| entry.compression == compression)
435 {
436 return Ok(entry.clone());
437 }
438 }
439
440 Err(DebianError::RepositoryReadPackagesIndicesEntryNotFound)
441 }
442 })
443 .collect::<Result<Vec<_>>>()
444 }
445
446 fn packages_entry(
452 &self,
453 component: &str,
454 architecture: &str,
455 is_installer: bool,
456 ) -> Result<PackagesFileEntry<'_>> {
457 self.packages_indices_entries_preferred_compression()?
458 .into_iter()
459 .find(|entry| {
460 entry.component == component
461 && entry.architecture == architecture
462 && entry.is_installer == is_installer
463 })
464 .ok_or(DebianError::RepositoryReadPackagesIndicesEntryNotFound)
465 }
466
467 async fn resolve_packages_from_entry<'entry, 'slf: 'entry>(
469 &'slf self,
470 entry: &'entry PackagesFileEntry<'slf>,
471 ) -> Result<BinaryPackageList<'static>> {
472 let release = self.release_file();
473
474 let path = if release.acquire_by_hash().unwrap_or_default() {
475 entry.by_hash_path()
476 } else {
477 entry.path.to_string()
478 };
479
480 let mut reader = ControlParagraphAsyncReader::new(futures::io::BufReader::new(
481 self.get_path_decoded_with_digest_verification(
482 &path,
483 entry.compression,
484 entry.size,
485 entry.digest.clone(),
486 )
487 .await?,
488 ));
489
490 let mut res = BinaryPackageList::default();
491
492 while let Some(paragraph) = reader.read_paragraph().await? {
493 res.push(BinaryPackageControlFile::from(paragraph));
494 }
495
496 Ok(res)
497 }
498
499 async fn resolve_packages(
501 &self,
502 component: &str,
503 arch: &str,
504 is_installer: bool,
505 ) -> Result<BinaryPackageList<'static>> {
506 let entry = self.packages_entry(component, arch, is_installer)?;
507
508 self.resolve_packages_from_entry(&entry).await
509 }
510
511 async fn resolve_package_fetches(
528 &self,
529 packages_file_filter: Box<dyn (Fn(PackagesFileEntry) -> bool) + Send>,
530 binary_package_filter: Box<dyn (Fn(BinaryPackageControlFile) -> bool) + Send>,
531 threads: usize,
532 ) -> Result<Vec<BinaryPackageFetch<'_>>> {
533 let packages_entries = self.packages_indices_entries_preferred_compression()?;
534
535 let fs = packages_entries
536 .iter()
537 .filter(|entry| packages_file_filter((*entry).clone()))
538 .map(|entry| self.resolve_packages_from_entry(entry))
539 .collect::<Vec<_>>();
540
541 let mut packages_fs = futures::stream::iter(fs).buffer_unordered(threads);
542
543 let mut fetches = vec![];
544
545 while let Some(pl) = packages_fs.try_next().await? {
546 for cf in pl.into_iter() {
547 let cf: BinaryPackageControlFile = cf;
549
550 if binary_package_filter(cf.clone()) {
551 let path = cf.required_field_str("Filename")?.to_string();
552
553 let size = cf.field_u64("Size").ok_or_else(|| {
554 DebianError::ControlRequiredFieldMissing("Size".to_string())
555 })??;
556
557 let digest = ChecksumType::preferred_order()
558 .find_map(|checksum| {
559 cf.field_str(checksum.field_name()).map(|hex_digest| {
560 ContentDigest::from_hex_digest(checksum, hex_digest)
561 })
562 })
563 .ok_or(DebianError::RepositoryReadCouldNotDeterminePackageDigest)??;
564
565 fetches.push(BinaryPackageFetch {
566 control_file: cf,
567 path,
568 size,
569 digest,
570 });
571 }
572 }
573 }
574
575 Ok(fetches)
576 }
577
578 fn sources_entry(&self, component: &str) -> Result<SourcesFileEntry<'_>> {
584 self.sources_indices_entries_preferred_compression()?
585 .into_iter()
586 .find(|entry| entry.component == component)
587 .ok_or(DebianError::RepositoryReadSourcesIndicesEntryNotFound)
588 }
589
590 async fn resolve_sources_from_entry<'entry, 'slf: 'entry>(
594 &'slf self,
595 entry: &'entry SourcesFileEntry<'slf>,
596 ) -> Result<DebianSourcePackageList<'static>> {
597 let release = self.release_file();
598
599 let path = if release.acquire_by_hash().unwrap_or_default() {
600 entry.by_hash_path()
601 } else {
602 entry.path.to_string()
603 };
604
605 let mut reader = ControlParagraphAsyncReader::new(futures::io::BufReader::new(
606 self.get_path_decoded_with_digest_verification(
607 &path,
608 entry.compression,
609 entry.size,
610 entry.digest.clone(),
611 )
612 .await?,
613 ));
614
615 let mut res = DebianSourcePackageList::default();
616
617 while let Some(paragraph) = reader.read_paragraph().await? {
618 res.push(paragraph.into());
619 }
620
621 Ok(res)
622 }
623
624 async fn resolve_sources(&self, component: &str) -> Result<DebianSourcePackageList<'static>> {
629 let entry = self.sources_entry(component)?;
630
631 self.resolve_sources_from_entry(&entry).await
632 }
633
634 async fn resolve_source_fetches(
650 &self,
651 sources_file_filter: Box<dyn (Fn(SourcesFileEntry) -> bool) + Send>,
652 source_package_filter: Box<dyn (Fn(DebianSourceControlFile) -> bool) + Send>,
653 threads: usize,
654 ) -> Result<Vec<SourcePackageFetch<'_>>> {
655 let sources_entries = self.sources_indices_entries_preferred_compression()?;
656
657 let fs = sources_entries
658 .iter()
659 .filter(|entry| sources_file_filter((*entry).clone()))
660 .map(|entry| self.resolve_sources_from_entry(entry))
661 .collect::<Vec<_>>();
662
663 let mut sources_fs = futures::stream::iter(fs).buffer_unordered(threads);
664
665 let mut fetches = vec![];
666
667 while let Some(pl) = sources_fs.try_next().await? {
668 for cf in pl.into_iter() {
669 if source_package_filter(cf.clone_no_signatures()) {
670 for fetch in cf.file_fetches(self.retrieve_checksum()?)? {
671 let fetch = fetch?;
672
673 fetches.push(SourcePackageFetch {
674 control_file: cf.clone_no_signatures(),
675 fetch,
676 });
677 }
678 }
679 }
680 }
681
682 Ok(fetches)
683 }
684
685 fn contents_entry(
689 &self,
690 component: Option<&str>,
691 architecture: &str,
692 is_installer: bool,
693 ) -> Result<ContentsFileEntry> {
694 let component = component.map(Cow::from);
695
696 let entries = self
697 .contents_indices_entries()?
698 .into_iter()
699 .filter(|entry| {
700 entry.component == component
701 && entry.architecture == architecture
702 && entry.is_installer == is_installer
703 })
704 .collect::<Vec<_>>();
705
706 if let Some(entry) = entries
707 .iter()
708 .find(|entry| entry.compression == self.preferred_compression())
709 {
710 Ok(entry.clone())
711 } else {
712 for compression in Compression::default_preferred_order() {
713 if let Some(entry) = entries
714 .iter()
715 .find(|entry| entry.compression == compression)
716 {
717 return Ok(entry.clone());
718 }
719 }
720
721 Err(DebianError::RepositoryReadContentsIndicesEntryNotFound)
722 }
723 }
724
725 async fn resolve_contents(
726 &self,
727 component: Option<&str>,
728 architecture: &str,
729 is_installer: bool,
730 ) -> Result<ContentsFile> {
731 let release = self.release_file();
732 let entry = self.contents_entry(component, architecture, is_installer)?;
733
734 let path = if release.acquire_by_hash().unwrap_or_default() {
735 entry.by_hash_path()
736 } else {
737 entry.path.to_string()
738 };
739
740 let reader = self
741 .get_path_decoded_with_digest_verification(
742 &path,
743 entry.compression,
744 entry.size,
745 entry.digest.clone(),
746 )
747 .await?;
748
749 let mut reader = ContentsFileAsyncReader::new(futures::io::BufReader::new(reader));
750 reader.read_all().await?;
751
752 let (contents, reader) = reader.consume();
753
754 drain_reader(reader)
755 .await
756 .map_err(|e| DebianError::RepositoryIoPath(path, e))?;
757
758 Ok(contents)
759 }
760}
761
/// The outcome of verifying a path in a repository.
#[derive(Clone, Copy, Debug)]
pub enum RepositoryPathVerificationState {
    /// The path exists; no integrity check was performed.
    ExistsNoIntegrityCheck,
    /// The path exists and its integrity was verified.
    ExistsIntegrityVerified,
    /// The path exists but its content did not match the expected size/digest.
    ExistsIntegrityMismatch,
    /// The path does not exist.
    Missing,
}
774
/// The result of verifying a repository path.
#[derive(Clone, Debug)]
pub struct RepositoryPathVerification<'a> {
    /// The path that was verified.
    pub path: &'a str,
    /// The outcome of the verification.
    pub state: RepositoryPathVerificationState,
}
783
784impl<'a> std::fmt::Display for RepositoryPathVerification<'a> {
785 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
786 match self.state {
787 RepositoryPathVerificationState::ExistsNoIntegrityCheck => {
788 write!(f, "{} exists (no integrity check performed)", self.path)
789 }
790 RepositoryPathVerificationState::ExistsIntegrityVerified => {
791 write!(f, "{} exists (integrity verified)", self.path)
792 }
793 RepositoryPathVerificationState::ExistsIntegrityMismatch => {
794 write!(f, "{} exists (integrity mismatch!)", self.path)
795 }
796 RepositoryPathVerificationState::Missing => {
797 write!(f, "{} missing", self.path)
798 }
799 }
800 }
801}
802
/// A phase during a repository copy operation.
#[derive(Clone, Copy, Debug)]
pub enum CopyPhase {
    /// Copying binary packages.
    BinaryPackages,
    /// Copying installer binary packages.
    InstallerBinaryPackages,
    /// Copying sources.
    Sources,
    /// Copying installers.
    Installers,
    /// Copying release indices files.
    ReleaseIndices,
    /// Copying release files.
    ReleaseFiles,
}
813
814impl std::fmt::Display for CopyPhase {
815 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
816 write!(
817 f,
818 "{}",
819 match self {
820 Self::BinaryPackages => "binary packages",
821 Self::InstallerBinaryPackages => "installer binary packages",
822 Self::Sources => "sources",
823 Self::Installers => "installers",
824 Self::ReleaseIndices => "release indices",
825 Self::ReleaseFiles => "release files",
826 }
827 )
828 }
829}
830
/// An event emitted while publishing or copying repository content.
pub enum PublishEvent {
    /// The given number of needed pool artifacts was resolved.
    ResolvedPoolArtifacts(usize),

    /// The pool path is already present.
    PoolArtifactCurrent(String),

    /// The pool path is missing and will be written.
    PoolArtifactMissing(String),

    /// Total number of pool artifacts that will be written.
    PoolArtifactsToPublish(usize),

    /// A pool artifact (path, size in bytes) was written.
    PoolArtifactCreated(String, u64),

    /// The index file at this path will be written.
    IndexFileToWrite(String),

    /// An index file (path, size in bytes) was written.
    IndexFileWritten(String, u64),

    /// The given destination path is being verified.
    VerifyingDestinationPath(String),

    /// A copy phase has begun.
    CopyPhaseBegin(CopyPhase),

    /// A copy phase has finished.
    CopyPhaseEnd(CopyPhase),

    /// A (source, destination) path copy is in progress.
    CopyingPath(String, String),

    /// Copying an indices file failed because it wasn't found.
    CopyIndicesPathNotFound(String),

    /// A path copy (path, size in bytes) completed.
    PathCopied(String, u64),

    /// A path copy was a no-op.
    PathCopyNoop(String),

    /// A write sequence started; carries the total number of bytes expected.
    /// Progress-only: rendered as empty by `Display`.
    WriteSequenceBeginWithTotalBytes(u64),

    /// Progress within a write sequence, in bytes.
    /// Progress-only: rendered as empty by `Display`.
    WriteSequenceProgressBytes(u64),

    /// A write sequence finished.
    /// Progress-only: rendered as empty by `Display`.
    WriteSequenceFinished,
}
885
impl std::fmt::Display for PublishEvent {
    /// Renders a human-readable message for each event.
    ///
    /// Progress-tracking variants (the `WriteSequence*` family) intentionally
    /// render as the empty string: they are consumed by progress bars, not logs.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ResolvedPoolArtifacts(count) => {
                write!(f, "resolved {} needed pool artifacts", count)
            }
            Self::PoolArtifactCurrent(path) => {
                write!(f, "pool path {} is present", path)
            }
            Self::PoolArtifactMissing(path) => {
                write!(f, "pool path {} will be written", path)
            }
            Self::PoolArtifactsToPublish(count) => {
                write!(f, "{} pool artifacts will be written", count)
            }
            Self::PoolArtifactCreated(path, size) => {
                write!(f, "wrote {} bytes to {}", size, path)
            }
            Self::IndexFileToWrite(path) => {
                write!(f, "index file {} will be written", path)
            }
            Self::IndexFileWritten(path, size) => {
                write!(f, "wrote {} bytes to {}", size, path)
            }
            Self::VerifyingDestinationPath(path) => {
                write!(f, "verifying destination path {}", path)
            }
            Self::CopyPhaseBegin(phase) => {
                write!(f, "beginning copying of {}", phase)
            }
            Self::CopyPhaseEnd(phase) => {
                write!(f, "finished copying of {}", phase)
            }
            Self::CopyingPath(source, dest) => {
                write!(f, "copying {} to {}", source, dest)
            }
            Self::CopyIndicesPathNotFound(path) => {
                write!(
                    f,
                    "copying indices file {} failed because it wasn't found",
                    path
                )
            }
            Self::PathCopied(path, size) => {
                write!(f, "copied {} bytes to {}", size, path)
            }
            Self::PathCopyNoop(path) => {
                write!(f, "copy of {} was a no-op", path)
            }
            Self::WriteSequenceBeginWithTotalBytes(_)
            | Self::WriteSequenceProgressBytes(_)
            | Self::WriteSequenceFinished => Ok(()),
        }
    }
}
941
942impl PublishEvent {
943 pub fn is_loggable(&self) -> bool {
945 !self.is_progress()
946 }
947
948 pub fn is_progress(&self) -> bool {
950 matches!(
951 self,
952 Self::WriteSequenceBeginWithTotalBytes(_)
953 | Self::WriteSequenceProgressBytes(_)
954 | Self::WriteSequenceFinished
955 )
956 }
957}
958
/// Describes a completed write of a path to a repository.
#[derive(Clone, Debug)]
pub struct RepositoryWrite<'a> {
    /// The path that was written.
    pub path: Cow<'a, str>,
    /// The number of bytes written.
    pub bytes_written: u64,
}
966
/// The outcome of an attempted repository write operation.
pub enum RepositoryWriteOperation<'a> {
    /// The path was written.
    PathWritten(RepositoryWrite<'a>),
    /// The write was skipped (content already present); holds the path and
    /// the size in bytes attributed to the no-op.
    Noop(Cow<'a, str>, u64),
}
974
975impl<'a> RepositoryWriteOperation<'a> {
976 pub fn bytes_written(&self) -> u64 {
977 match self {
978 Self::PathWritten(write) => write.bytes_written,
979 Self::Noop(_, size) => *size,
980 }
981 }
982}
983
/// An interface for writing to a repository.
#[async_trait]
pub trait RepositoryWriter: Sync {
    /// Verify the existence (and, when `expected_content` is given, the size
    /// and digest) of `path` in the destination.
    async fn verify_path<'path>(
        &self,
        path: &'path str,
        expected_content: Option<(u64, ContentDigest)>,
    ) -> Result<RepositoryPathVerification<'path>>;

    /// Write the contents of `reader` to `path`.
    async fn write_path<'path, 'reader>(
        &self,
        path: Cow<'path, str>,
        reader: Pin<Box<dyn AsyncRead + Send + 'reader>>,
    ) -> Result<RepositoryWrite<'path>>;

    /// Copy `source_path` from `reader` to `dest_path` on this writer.
    ///
    /// The destination is verified first; if it already holds integrity-verified
    /// content, the copy is skipped and a `Noop` (with the expected size, or 0
    /// when unknown) is returned. When `expected_content` is provided, the
    /// source read is digest-verified. `progress_cb`, when set, receives
    /// `PublishEvent`s describing verification and copy progress.
    async fn copy_from<'path>(
        &self,
        reader: &dyn RepositoryRootReader,
        source_path: Cow<'path, str>,
        expected_content: Option<(u64, ContentDigest)>,
        dest_path: Cow<'path, str>,
        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
    ) -> Result<RepositoryWriteOperation<'path>> {
        if let Some(cb) = progress_cb {
            cb(PublishEvent::VerifyingDestinationPath(
                dest_path.to_string(),
            ));
        }

        // Clone so `expected_content` remains usable for the source fetch below.
        let verification = self
            .verify_path(dest_path.as_ref(), expected_content.clone())
            .await?;

        if matches!(
            verification.state,
            RepositoryPathVerificationState::ExistsIntegrityVerified
        ) {
            return Ok(RepositoryWriteOperation::Noop(
                dest_path,
                if let Some((size, _)) = expected_content {
                    size
                } else {
                    0
                },
            ));
        }

        if let Some(cb) = progress_cb {
            cb(PublishEvent::CopyingPath(
                source_path.to_string(),
                dest_path.to_string(),
            ));
        }

        let reader = if let Some((size, digest)) = expected_content {
            reader
                .get_path_with_digest_verification(source_path.as_ref(), size, digest)
                .await?
        } else {
            reader.get_path(source_path.as_ref()).await?
        };

        let write = self.write_path(dest_path, reader).await?;

        Ok(RepositoryWriteOperation::PathWritten(write))
    }
}
1075
1076pub fn reader_from_str(s: impl ToString) -> Result<Box<dyn RepositoryRootReader>> {
1084 let s = s.to_string();
1085
1086 if s.contains("://") {
1087 let url = url::Url::parse(&s)?;
1088
1089 match url.scheme() {
1090 "file" => Ok(Box::new(filesystem::FilesystemRepositoryReader::new(
1091 url.to_file_path()
1092 .expect("path conversion should always work for file://"),
1093 ))),
1094 #[cfg(feature = "http")]
1095 "http" | "https" => Ok(Box::new(http::HttpRepositoryClient::new(url)?)),
1096 _ => Err(DebianError::RepositoryReaderUnrecognizedUrl(s)),
1097 }
1098 } else {
1099 Ok(Box::new(filesystem::FilesystemRepositoryReader::new(s)))
1101 }
1102}
1103
1104pub async fn writer_from_str(s: impl ToString) -> Result<Box<dyn RepositoryWriter>> {
1112 let s = s.to_string();
1113
1114 if s.contains("://") {
1115 let url = url::Url::parse(&s)?;
1116
1117 match url.scheme() {
1118 "file" => Ok(Box::new(filesystem::FilesystemRepositoryWriter::new(
1119 url.to_file_path()
1120 .expect("path conversion should always work for file://"),
1121 ))),
1122 "null" => {
1123 let mut writer = sink_writer::SinkWriter::default();
1124
1125 let behavior = match url.host_str() {
1126 Some(s) => sink_writer::SinkWriterVerifyBehavior::from_str(s)?,
1127 None => sink_writer::SinkWriterVerifyBehavior::Missing,
1128 };
1129
1130 writer.set_verify_behavior(behavior);
1131
1132 Ok(Box::new(writer))
1133 }
1134 #[cfg(feature = "s3")]
1135 "s3" => {
1136 let path = url.path();
1137
1138 if let Some((bucket, prefix)) = path.trim_matches('/').split_once('/') {
1139 let region = s3::get_bucket_region(bucket).await?;
1140
1141 Ok(Box::new(s3::S3Writer::new(region, bucket, Some(prefix))))
1142 } else {
1143 let region = s3::get_bucket_region(path).await?;
1144
1145 Ok(Box::new(s3::S3Writer::new(region, path, None)))
1146 }
1147 }
1148 _ => Err(DebianError::RepositoryWriterUnrecognizedUrl(s)),
1149 }
1150 } else {
1151 Ok(Box::new(filesystem::FilesystemRepositoryWriter::new(s)))
1152 }
1153}