1use std::path::PathBuf;
2use std::sync::Arc;
3use std::time::Duration;
4
5use async_zip::Compression;
6use serde::{Deserialize, Serialize};
7
8use crate::constants::*;
9use crate::s3_uri::{S3Object, S3Prefix};
10
11#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
13pub enum ZipCompression {
14 #[default]
16 Deflate,
17 #[cfg(feature = "zstd")]
19 Zstd,
20}
21
22impl ZipCompression {
23 pub(crate) fn to_async_zip(self) -> Compression {
24 match self {
25 ZipCompression::Deflate => Compression::Deflate,
26 #[cfg(feature = "zstd")]
27 ZipCompression::Zstd => Compression::Zstd,
28 }
29 }
30
31 pub fn as_str(self) -> &'static str {
33 match self {
34 ZipCompression::Deflate => "deflate",
35 #[cfg(feature = "zstd")]
36 ZipCompression::Zstd => "zstd",
37 }
38 }
39}
40
/// Ordered list of selection patterns.
///
/// Patterns added through [`UnzipSelection::include`] and [`UnzipSelection::exclude`]
/// have a leading `!` or `#` backslash-escaped; exclusions are then stored with a `!`
/// prefix. Patterns supplied through [`UnzipSelection::patterns`] or the `From`
/// conversions are stored exactly as given.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct UnzipSelection {
    patterns: Vec<String>,
}

impl UnzipSelection {
    /// Creates a selection with no patterns.
    pub fn new() -> Self {
        Self::default()
    }

    /// Builds a selection from raw patterns, stored verbatim and in iteration order.
    pub fn patterns(patterns: impl IntoIterator<Item = impl Into<String>>) -> Self {
        let mut collected = Vec::new();
        for pattern in patterns {
            collected.push(pattern.into());
        }
        Self {
            patterns: collected,
        }
    }

    /// Appends an include pattern; a leading `!` or `#` is escaped so it is not read
    /// as a marker.
    pub fn include(mut self, pattern: impl Into<String>) -> Self {
        let escaped = escape_leading_gitignore_marker(pattern.into());
        self.patterns.push(escaped);
        self
    }

    /// Appends an exclude pattern: the escaped pattern prefixed with `!`.
    pub fn exclude(mut self, pattern: impl Into<String>) -> Self {
        let mut negated = String::from("!");
        negated.push_str(&escape_leading_gitignore_marker(pattern.into()));
        self.patterns.push(negated);
        self
    }

    /// Returns `true` when no pattern has been added.
    pub fn is_empty(&self) -> bool {
        self.patterns.is_empty()
    }

    /// Returns the stored patterns in insertion order.
    pub fn as_patterns(&self) -> &[String] {
        self.patterns.as_slice()
    }
}

impl<const N: usize> From<[&str; N]> for UnzipSelection {
    /// Converts an array of raw patterns; nothing is escaped.
    fn from(patterns: [&str; N]) -> Self {
        Self {
            patterns: Vec::from(patterns.map(|pattern| pattern.to_owned())),
        }
    }
}

impl From<Vec<String>> for UnzipSelection {
    /// Wraps an already-built list of raw patterns without copying it.
    fn from(patterns: Vec<String>) -> Self {
        Self { patterns }
    }
}

/// Prefixes `pattern` with a backslash when it starts with `!` or `#`; any other
/// pattern is returned unchanged.
fn escape_leading_gitignore_marker(pattern: String) -> String {
    match pattern.chars().next() {
        Some('!' | '#') => {
            let mut escaped = String::with_capacity(pattern.len() + 1);
            escaped.push('\\');
            escaped.push_str(&pattern);
            escaped
        }
        _ => pattern,
    }
}
124
/// Selects whether extra objects at the destination are kept or deleted.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum DestinationCleanup {
    /// Leave extra destination objects in place; this is the default.
    #[default]
    KeepExtra,
    /// Delete extra destination objects.
    DeleteExtra,
}

impl DestinationCleanup {
    /// Returns `true` only for [`DestinationCleanup::DeleteExtra`].
    pub(crate) fn deletes_extra(self) -> bool {
        match self {
            Self::DeleteExtra => true,
            Self::KeepExtra => false,
        }
    }
}
140
/// Strategy used to compare entries.
///
/// NOTE(review): the comparison logic itself lives outside this block; the variant
/// descriptions below follow the variant names.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ComparisonMode {
    /// Default mode; per its name, the embedded catalog is consulted before hashing.
    #[default]
    CatalogThenHash,
    /// Hash entries and ignore any embedded catalog.
    HashEntries,
}

impl ComparisonMode {
    /// Returns `true` only for [`ComparisonMode::HashEntries`].
    pub(crate) fn ignores_embedded_catalog(self) -> bool {
        match self {
            Self::HashEntries => true,
            Self::CatalogThenHash => false,
        }
    }
}
156
/// Selects how a run reacts to a conflict.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ConflictPolicy {
    /// Report the conflict and keep going; this is the default.
    #[default]
    ReportAndContinue,
    /// Stop at the first conflict.
    FailFast,
}

impl ConflictPolicy {
    /// Returns `true` only for [`ConflictPolicy::FailFast`].
    pub(crate) fn fails_fast(self) -> bool {
        match self {
            Self::FailFast => true,
            Self::ReportAndContinue => false,
        }
    }
}
172
173#[derive(Clone, Debug)]
175pub struct SyncOptions {
176 pub(crate) source: S3Object,
178 pub(crate) destination: S3Prefix,
180 pub(crate) cleanup: DestinationCleanup,
185 pub(crate) selection: UnzipSelection,
189 pub(crate) collect_diagnostics: bool,
191 pub(crate) comparison: ComparisonMode,
193 pub(crate) conflict_policy: ConflictPolicy,
195 pub(crate) collect_operations: bool,
197 pub(crate) concurrency: usize,
201 pub(crate) put_concurrency: usize,
205 pub(crate) put_retry_policy: PutRetryPolicy,
207 pub(crate) source_block_size: usize,
211 pub(crate) source_block_merge_gap: usize,
213 pub(crate) source_get_concurrency: usize,
217 pub(crate) source_window_capacity: usize,
222 pub(crate) source_window_memory_budget_mb: Option<u64>,
230 pub(crate) body_chunk_size: usize,
234 pub(crate) pipe_capacity: usize,
238}
239
240impl SyncOptions {
241 pub fn new(source: S3Object, destination: S3Prefix) -> Self {
243 Self {
244 source,
245 destination,
246 cleanup: DestinationCleanup::default(),
247 selection: UnzipSelection::default(),
248 collect_diagnostics: false,
249 comparison: ComparisonMode::default(),
250 conflict_policy: ConflictPolicy::default(),
251 collect_operations: true,
252 concurrency: DEFAULT_CONCURRENCY,
253 put_concurrency: DEFAULT_PUT_CONCURRENCY,
254 put_retry_policy: PutRetryPolicy::default(),
255 source_block_size: DEFAULT_SOURCE_BLOCK_SIZE,
256 source_block_merge_gap: DEFAULT_SOURCE_BLOCK_MERGE_GAP,
257 source_get_concurrency: DEFAULT_SOURCE_GET_CONCURRENCY,
258 source_window_capacity: DEFAULT_SOURCE_WINDOW_CAPACITY,
259 source_window_memory_budget_mb: None,
260 body_chunk_size: DEFAULT_BODY_CHUNK_SIZE,
261 pipe_capacity: DEFAULT_PIPE_CAPACITY,
262 }
263 }
264
265 pub fn source(&self) -> &S3Object {
267 &self.source
268 }
269
270 pub fn destination(&self) -> &S3Prefix {
272 &self.destination
273 }
274
275 pub fn cleanup(&self) -> DestinationCleanup {
277 self.cleanup
278 }
279
280 pub fn selection(&self) -> &UnzipSelection {
282 &self.selection
283 }
284
285 pub fn collects_diagnostics(&self) -> bool {
287 self.collect_diagnostics
288 }
289
290 pub fn comparison_mode(&self) -> ComparisonMode {
292 self.comparison
293 }
294
295 pub fn conflict_policy(&self) -> ConflictPolicy {
297 self.conflict_policy
298 }
299
300 pub fn collects_operations(&self) -> bool {
302 self.collect_operations
303 }
304
305 pub fn concurrency(&self) -> usize {
307 self.concurrency
308 }
309
310 pub fn put_concurrency(&self) -> usize {
312 self.put_concurrency
313 }
314
315 pub fn put_retry_policy(&self) -> &PutRetryPolicy {
317 &self.put_retry_policy
318 }
319
320 pub fn source_block_size(&self) -> usize {
322 self.source_block_size
323 }
324
325 pub fn source_block_merge_gap(&self) -> usize {
327 self.source_block_merge_gap
328 }
329
330 pub fn source_get_concurrency(&self) -> usize {
332 self.source_get_concurrency
333 }
334
335 pub fn source_window_capacity(&self) -> usize {
341 self.source_window_capacity
342 }
343
344 pub fn source_window_memory_budget_mb(&self) -> Option<u64> {
346 self.source_window_memory_budget_mb
347 }
348
349 pub fn body_chunk_size(&self) -> usize {
351 self.body_chunk_size
352 }
353
354 pub fn pipe_capacity(&self) -> usize {
356 self.pipe_capacity
357 }
358
359 pub fn with_selection(mut self, selection: impl Into<UnzipSelection>) -> Self {
361 self.selection = selection.into();
362 self
363 }
364
365 pub fn delete_extra_objects(mut self) -> Self {
370 self.cleanup = DestinationCleanup::DeleteExtra;
371 self
372 }
373
374 pub fn with_cleanup(mut self, cleanup: DestinationCleanup) -> Self {
376 self.cleanup = cleanup;
377 self
378 }
379
380 pub fn collect_diagnostics(mut self) -> Self {
382 self.collect_diagnostics = true;
383 self
384 }
385
386 pub fn force_hash_comparison(mut self) -> Self {
388 self.comparison = ComparisonMode::HashEntries;
389 self
390 }
391
392 pub fn with_comparison_mode(mut self, comparison: ComparisonMode) -> Self {
394 self.comparison = comparison;
395 self
396 }
397
398 pub fn fail_on_conflict(mut self) -> Self {
400 self.conflict_policy = ConflictPolicy::FailFast;
401 self
402 }
403
404 pub fn with_conflict_policy(mut self, conflict_policy: ConflictPolicy) -> Self {
406 self.conflict_policy = conflict_policy;
407 self
408 }
409
410 pub fn without_operations(mut self) -> Self {
412 self.collect_operations = false;
413 self
414 }
415
416 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
418 self.concurrency = concurrency;
419 self
420 }
421
422 pub fn with_put_concurrency(mut self, put_concurrency: usize) -> Self {
424 self.put_concurrency = put_concurrency;
425 self
426 }
427
428 pub fn with_put_retry_policy(mut self, put_retry_policy: PutRetryPolicy) -> Self {
430 self.put_retry_policy = put_retry_policy;
431 self
432 }
433
434 pub fn with_source_block_size(mut self, source_block_size: usize) -> Self {
436 self.source_block_size = source_block_size;
437 self
438 }
439
440 pub fn with_source_block_merge_gap(mut self, source_block_merge_gap: usize) -> Self {
442 self.source_block_merge_gap = source_block_merge_gap;
443 self
444 }
445
446 pub fn with_source_get_concurrency(mut self, source_get_concurrency: usize) -> Self {
448 self.source_get_concurrency = source_get_concurrency;
449 self
450 }
451
452 pub fn with_source_window_capacity(mut self, source_window_capacity: usize) -> Self {
454 self.source_window_capacity = source_window_capacity;
455 self
456 }
457
458 pub fn with_source_window_memory_budget_mb(
460 mut self,
461 source_window_memory_budget_mb: u64,
462 ) -> Self {
463 self.source_window_memory_budget_mb = Some(source_window_memory_budget_mb);
464 self
465 }
466
467 pub fn with_body_chunk_size(mut self, body_chunk_size: usize) -> Self {
469 self.body_chunk_size = body_chunk_size;
470 self
471 }
472
473 pub fn with_pipe_capacity(mut self, pipe_capacity: usize) -> Self {
475 self.pipe_capacity = pipe_capacity;
476 self
477 }
478}
479
480#[derive(Clone, Debug, Eq, PartialEq)]
482pub struct PutRetryPolicy {
483 pub(crate) max_attempts: usize,
487 pub(crate) base_delay: Duration,
489 pub(crate) max_delay: Duration,
493 pub(crate) slowdown_base_delay: Duration,
495 pub(crate) slowdown_max_delay: Duration,
499 pub(crate) jitter: RetryJitter,
501}
502
503impl Default for PutRetryPolicy {
504 fn default() -> Self {
505 Self {
506 max_attempts: PUT_OBJECT_MAX_ATTEMPTS,
507 base_delay: Duration::from_millis(PUT_OBJECT_RETRY_BASE_DELAY_MS),
508 max_delay: Duration::from_millis(PUT_OBJECT_RETRY_MAX_DELAY_MS),
509 slowdown_base_delay: Duration::from_millis(PUT_OBJECT_SLOWDOWN_RETRY_BASE_DELAY_MS),
510 slowdown_max_delay: Duration::from_millis(PUT_OBJECT_SLOWDOWN_RETRY_MAX_DELAY_MS),
511 jitter: RetryJitter::Full,
512 }
513 }
514}
515
516impl PutRetryPolicy {
517 pub fn max_attempts(&self) -> usize {
519 self.max_attempts
520 }
521
522 pub fn base_delay(&self) -> Duration {
524 self.base_delay
525 }
526
527 pub fn max_delay(&self) -> Duration {
529 self.max_delay
530 }
531
532 pub fn slowdown_base_delay(&self) -> Duration {
534 self.slowdown_base_delay
535 }
536
537 pub fn slowdown_max_delay(&self) -> Duration {
539 self.slowdown_max_delay
540 }
541
542 pub fn jitter(&self) -> RetryJitter {
544 self.jitter
545 }
546
547 pub fn with_max_attempts(mut self, max_attempts: usize) -> Self {
549 self.max_attempts = max_attempts;
550 self
551 }
552
553 pub fn with_base_delay(mut self, base_delay: Duration) -> Self {
555 self.base_delay = base_delay;
556 self
557 }
558
559 pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
561 self.max_delay = max_delay;
562 self
563 }
564
565 pub fn with_slowdown_base_delay(mut self, slowdown_base_delay: Duration) -> Self {
567 self.slowdown_base_delay = slowdown_base_delay;
568 self
569 }
570
571 pub fn with_slowdown_max_delay(mut self, slowdown_max_delay: Duration) -> Self {
573 self.slowdown_max_delay = slowdown_max_delay;
574 self
575 }
576
577 pub fn with_jitter(mut self, jitter: RetryJitter) -> Self {
579 self.jitter = jitter;
580 self
581 }
582}
583
584#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
586#[serde(rename_all = "snake_case")]
587pub enum RetryJitter {
588 Full,
590 None,
592}
593
594pub fn adaptive_source_get_concurrency(available_memory_mb: u64) -> usize {
599 let slots = available_memory_mb / ADAPTIVE_SOURCE_GET_MEMORY_STEP_MB;
600 usize::try_from(slots)
601 .unwrap_or(usize::MAX)
602 .clamp(1, ADAPTIVE_SOURCE_MAX_GET_CONCURRENCY)
603}
604
605#[derive(Clone, Copy, Debug, Eq, PartialEq)]
611pub struct AdaptiveSourceWindow {
612 pub(crate) available_memory_mb: u64,
614 pub(crate) source_zip_bytes: u64,
616 pub(crate) zip_file_count: usize,
618 pub(crate) concurrency: usize,
620 pub(crate) source_block_size: usize,
622 pub(crate) source_get_concurrency: usize,
624}
625
626impl AdaptiveSourceWindow {
627 pub fn new(available_memory_mb: u64, source_zip_bytes: u64, zip_file_count: usize) -> Self {
629 Self {
630 available_memory_mb,
631 source_zip_bytes,
632 zip_file_count,
633 concurrency: DEFAULT_CONCURRENCY,
634 source_block_size: DEFAULT_SOURCE_BLOCK_SIZE,
635 source_get_concurrency: DEFAULT_SOURCE_GET_CONCURRENCY,
636 }
637 }
638
639 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
641 self.concurrency = concurrency;
642 self
643 }
644
645 pub fn with_source_block_size(mut self, source_block_size: usize) -> Self {
647 self.source_block_size = source_block_size;
648 self
649 }
650
651 pub fn with_source_get_concurrency(mut self, source_get_concurrency: usize) -> Self {
653 self.source_get_concurrency = source_get_concurrency;
654 self
655 }
656
657 pub fn capacity(self) -> usize {
659 let Some(available_memory_bytes) = self.available_memory_mb.checked_mul(1024 * 1024) else {
660 return usize::try_from(self.source_zip_bytes).unwrap_or(usize::MAX);
661 };
662 let concurrency = u64::try_from(self.concurrency.max(1)).unwrap_or(u64::MAX);
663 let zip_file_count = u64::try_from(self.zip_file_count).unwrap_or(u64::MAX);
664 let worker_budget = concurrency.saturating_mul(ADAPTIVE_CACHE_WORKER_OVERHEAD);
665 let file_budget = zip_file_count.saturating_mul(ADAPTIVE_CACHE_FILE_OVERHEAD);
666 let in_flight_budget = u64::try_from(self.source_get_concurrency.max(1))
667 .unwrap_or(u64::MAX)
668 .saturating_mul(u64::try_from(self.source_block_size).unwrap_or(u64::MAX));
669 let reserved = ADAPTIVE_CACHE_BASE_OVERHEAD
670 .saturating_add(worker_budget)
671 .saturating_add(file_budget)
672 .saturating_add(in_flight_budget);
673 let capacity = available_memory_bytes
674 .saturating_sub(reserved)
675 .min(self.source_zip_bytes);
676 let capacity = if capacity > ADAPTIVE_CACHE_LARGE_THRESHOLD {
677 capacity.saturating_sub(ADAPTIVE_CACHE_LARGE_RSS_SLACK)
678 } else {
679 capacity
680 }
681 .min(ADAPTIVE_CACHE_MAX_WINDOW_CAPACITY);
682
683 let minimum_block_capacity = u64::try_from(self.source_block_size.max(1))
684 .unwrap_or(u64::MAX)
685 .min(self.source_zip_bytes);
686 let capacity = capacity.max(minimum_block_capacity);
687
688 usize::try_from(capacity).unwrap_or(usize::MAX)
689 }
690}
691
692#[derive(Clone)]
694pub struct UploadOptions {
695 pub(crate) source_dir: PathBuf,
697 pub(crate) destination: S3Object,
699 pub(crate) include_catalog: bool,
701 pub(crate) compression: ZipCompression,
703 pub(crate) body_chunk_size: usize,
707 pub(crate) pipe_capacity: usize,
711 pub(crate) progress: Option<UploadProgressHandler>,
713}
714
715#[derive(Clone)]
717pub struct LocalZipOptions {
718 pub(crate) source_dir: PathBuf,
720 pub(crate) destination_zip: PathBuf,
722 pub(crate) include_catalog: bool,
724 pub(crate) compression: ZipCompression,
726 pub(crate) progress: Option<UploadProgressHandler>,
728}
729
730#[derive(Clone)]
732pub struct S3PrefixUploadOptions {
733 pub(crate) source: S3Prefix,
735 pub(crate) destination: S3Object,
737 pub(crate) include_catalog: bool,
739 pub(crate) compression: ZipCompression,
741 pub(crate) body_chunk_size: usize,
745 pub(crate) pipe_capacity: usize,
749 pub(crate) progress: Option<UploadProgressHandler>,
751}
752
753#[derive(Clone)]
755pub struct S3PrefixLocalZipOptions {
756 pub(crate) source: S3Prefix,
758 pub(crate) destination_zip: PathBuf,
760 pub(crate) include_catalog: bool,
762 pub(crate) compression: ZipCompression,
764 pub(crate) progress: Option<UploadProgressHandler>,
766}
767
768#[derive(Clone)]
770pub struct LocalZipSyncOptions {
771 pub(crate) source_zip: PathBuf,
773 pub(crate) destination: S3Prefix,
775 pub(crate) cleanup: DestinationCleanup,
780 pub(crate) selection: UnzipSelection,
784 pub(crate) comparison: ComparisonMode,
786 pub(crate) conflict_policy: ConflictPolicy,
788 pub(crate) collect_operations: bool,
790 pub(crate) concurrency: usize,
792 pub(crate) body_chunk_size: usize,
794 pub(crate) pipe_capacity: usize,
796}
797
798#[derive(Clone)]
800pub struct S3ZipLocalUnzipOptions {
801 pub(crate) source: S3Object,
803 pub(crate) destination_dir: PathBuf,
805 pub(crate) selection: UnzipSelection,
807 pub(crate) collect_diagnostics: bool,
809 pub(crate) comparison: ComparisonMode,
811 pub(crate) collect_operations: bool,
813 pub(crate) concurrency: usize,
815 pub(crate) source_block_size: usize,
817 pub(crate) source_block_merge_gap: usize,
819 pub(crate) source_get_concurrency: usize,
821 pub(crate) source_window_capacity: usize,
823 pub(crate) source_window_memory_budget_mb: Option<u64>,
825}
826
827#[derive(Clone)]
829pub struct LocalUnzipOptions {
830 pub(crate) source_zip: PathBuf,
832 pub(crate) destination_dir: PathBuf,
834 pub(crate) selection: UnzipSelection,
836 pub(crate) comparison: ComparisonMode,
838 pub(crate) collect_operations: bool,
840 pub(crate) concurrency: usize,
842}
843
844impl S3PrefixUploadOptions {
845 pub fn new(source: S3Prefix, destination: S3Object) -> Self {
847 Self {
848 source,
849 destination,
850 include_catalog: true,
851 compression: ZipCompression::Deflate,
852 body_chunk_size: DEFAULT_BODY_CHUNK_SIZE,
853 pipe_capacity: DEFAULT_PIPE_CAPACITY,
854 progress: None,
855 }
856 }
857
858 pub fn without_catalog(mut self) -> Self {
860 self.include_catalog = false;
861 self
862 }
863
864 pub fn with_compression(mut self, compression: ZipCompression) -> Self {
866 self.compression = compression;
867 self
868 }
869
870 pub fn with_body_chunk_size(mut self, body_chunk_size: usize) -> Self {
872 self.body_chunk_size = body_chunk_size;
873 self
874 }
875
876 pub fn with_pipe_capacity(mut self, pipe_capacity: usize) -> Self {
878 self.pipe_capacity = pipe_capacity;
879 self
880 }
881
882 pub fn with_progress(self, callback: impl Fn(UploadProgress) + Send + Sync + 'static) -> Self {
884 self.with_progress_handler(UploadProgressHandler::new(callback))
885 }
886
887 pub fn with_progress_handler(mut self, progress: UploadProgressHandler) -> Self {
889 self.progress = Some(progress);
890 self
891 }
892}
893
894impl LocalZipOptions {
895 pub fn new(source_dir: impl Into<PathBuf>, destination_zip: impl Into<PathBuf>) -> Self {
897 Self {
898 source_dir: source_dir.into(),
899 destination_zip: destination_zip.into(),
900 include_catalog: true,
901 compression: ZipCompression::Deflate,
902 progress: None,
903 }
904 }
905
906 pub fn without_catalog(mut self) -> Self {
908 self.include_catalog = false;
909 self
910 }
911
912 pub fn with_compression(mut self, compression: ZipCompression) -> Self {
914 self.compression = compression;
915 self
916 }
917
918 pub fn with_progress(self, callback: impl Fn(UploadProgress) + Send + Sync + 'static) -> Self {
920 self.with_progress_handler(UploadProgressHandler::new(callback))
921 }
922
923 pub fn with_progress_handler(mut self, progress: UploadProgressHandler) -> Self {
925 self.progress = Some(progress);
926 self
927 }
928}
929
930impl std::fmt::Debug for LocalZipOptions {
931 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
932 formatter
933 .debug_struct("LocalZipOptions")
934 .field("source_dir", &self.source_dir)
935 .field("destination_zip", &self.destination_zip)
936 .field("include_catalog", &self.include_catalog)
937 .field("compression", &self.compression)
938 .field(
939 "progress",
940 &self.progress.as_ref().map(|_| "UploadProgressHandler"),
941 )
942 .finish()
943 }
944}
945
946impl std::fmt::Debug for S3PrefixUploadOptions {
947 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
948 formatter
949 .debug_struct("S3PrefixUploadOptions")
950 .field("source", &self.source)
951 .field("destination", &self.destination)
952 .field("include_catalog", &self.include_catalog)
953 .field("compression", &self.compression)
954 .field("body_chunk_size", &self.body_chunk_size)
955 .field("pipe_capacity", &self.pipe_capacity)
956 .field(
957 "progress",
958 &self.progress.as_ref().map(|_| "UploadProgressHandler"),
959 )
960 .finish()
961 }
962}
963
964impl S3PrefixLocalZipOptions {
965 pub fn new(source: S3Prefix, destination_zip: impl Into<PathBuf>) -> Self {
967 Self {
968 source,
969 destination_zip: destination_zip.into(),
970 include_catalog: true,
971 compression: ZipCompression::Deflate,
972 progress: None,
973 }
974 }
975
976 pub fn without_catalog(mut self) -> Self {
978 self.include_catalog = false;
979 self
980 }
981
982 pub fn with_compression(mut self, compression: ZipCompression) -> Self {
984 self.compression = compression;
985 self
986 }
987
988 pub fn with_progress(self, callback: impl Fn(UploadProgress) + Send + Sync + 'static) -> Self {
990 self.with_progress_handler(UploadProgressHandler::new(callback))
991 }
992
993 pub fn with_progress_handler(mut self, progress: UploadProgressHandler) -> Self {
995 self.progress = Some(progress);
996 self
997 }
998}
999
1000impl std::fmt::Debug for S3PrefixLocalZipOptions {
1001 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1002 formatter
1003 .debug_struct("S3PrefixLocalZipOptions")
1004 .field("source", &self.source)
1005 .field("destination_zip", &self.destination_zip)
1006 .field("include_catalog", &self.include_catalog)
1007 .field("compression", &self.compression)
1008 .field(
1009 "progress",
1010 &self.progress.as_ref().map(|_| "UploadProgressHandler"),
1011 )
1012 .finish()
1013 }
1014}
1015
1016impl UploadOptions {
1017 pub fn new(source_dir: impl Into<PathBuf>, destination: S3Object) -> Self {
1019 Self {
1020 source_dir: source_dir.into(),
1021 destination,
1022 include_catalog: true,
1023 compression: ZipCompression::Deflate,
1024 body_chunk_size: DEFAULT_BODY_CHUNK_SIZE,
1025 pipe_capacity: DEFAULT_PIPE_CAPACITY,
1026 progress: None,
1027 }
1028 }
1029
1030 pub fn without_catalog(mut self) -> Self {
1032 self.include_catalog = false;
1033 self
1034 }
1035
1036 pub fn with_compression(mut self, compression: ZipCompression) -> Self {
1038 self.compression = compression;
1039 self
1040 }
1041
1042 pub fn with_body_chunk_size(mut self, body_chunk_size: usize) -> Self {
1044 self.body_chunk_size = body_chunk_size;
1045 self
1046 }
1047
1048 pub fn with_pipe_capacity(mut self, pipe_capacity: usize) -> Self {
1050 self.pipe_capacity = pipe_capacity;
1051 self
1052 }
1053
1054 pub fn with_progress(self, callback: impl Fn(UploadProgress) + Send + Sync + 'static) -> Self {
1056 self.with_progress_handler(UploadProgressHandler::new(callback))
1057 }
1058
1059 pub fn with_progress_handler(mut self, progress: UploadProgressHandler) -> Self {
1061 self.progress = Some(progress);
1062 self
1063 }
1064}
1065
1066impl LocalZipSyncOptions {
1067 pub fn new(source_zip: impl Into<PathBuf>, destination: S3Prefix) -> Self {
1069 Self {
1070 source_zip: source_zip.into(),
1071 destination,
1072 cleanup: DestinationCleanup::default(),
1073 selection: UnzipSelection::default(),
1074 comparison: ComparisonMode::default(),
1075 conflict_policy: ConflictPolicy::default(),
1076 collect_operations: true,
1077 concurrency: DEFAULT_CONCURRENCY,
1078 body_chunk_size: DEFAULT_BODY_CHUNK_SIZE,
1079 pipe_capacity: DEFAULT_PIPE_CAPACITY,
1080 }
1081 }
1082
1083 pub fn with_selection(mut self, selection: impl Into<UnzipSelection>) -> Self {
1085 self.selection = selection.into();
1086 self
1087 }
1088
1089 pub fn delete_extra_objects(mut self) -> Self {
1094 self.cleanup = DestinationCleanup::DeleteExtra;
1095 self
1096 }
1097
1098 pub fn with_cleanup(mut self, cleanup: DestinationCleanup) -> Self {
1100 self.cleanup = cleanup;
1101 self
1102 }
1103
1104 pub fn force_hash_comparison(mut self) -> Self {
1106 self.comparison = ComparisonMode::HashEntries;
1107 self
1108 }
1109
1110 pub fn with_comparison_mode(mut self, comparison: ComparisonMode) -> Self {
1112 self.comparison = comparison;
1113 self
1114 }
1115
1116 pub fn conflict_policy(&self) -> ConflictPolicy {
1118 self.conflict_policy
1119 }
1120
1121 pub fn fail_on_conflict(mut self) -> Self {
1123 self.conflict_policy = ConflictPolicy::FailFast;
1124 self
1125 }
1126
1127 pub fn with_conflict_policy(mut self, conflict_policy: ConflictPolicy) -> Self {
1129 self.conflict_policy = conflict_policy;
1130 self
1131 }
1132
1133 pub fn without_operations(mut self) -> Self {
1135 self.collect_operations = false;
1136 self
1137 }
1138
1139 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
1141 self.concurrency = concurrency;
1142 self
1143 }
1144
1145 pub fn with_body_chunk_size(mut self, body_chunk_size: usize) -> Self {
1147 self.body_chunk_size = body_chunk_size;
1148 self
1149 }
1150
1151 pub fn with_pipe_capacity(mut self, pipe_capacity: usize) -> Self {
1153 self.pipe_capacity = pipe_capacity;
1154 self
1155 }
1156}
1157
1158impl std::fmt::Debug for LocalZipSyncOptions {
1159 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1160 formatter
1161 .debug_struct("LocalZipSyncOptions")
1162 .field("source_zip", &self.source_zip)
1163 .field("destination", &self.destination)
1164 .field("cleanup", &self.cleanup)
1165 .field("selection", &self.selection)
1166 .field("comparison", &self.comparison)
1167 .field("conflict_policy", &self.conflict_policy)
1168 .field("collect_operations", &self.collect_operations)
1169 .field("concurrency", &self.concurrency)
1170 .field("body_chunk_size", &self.body_chunk_size)
1171 .field("pipe_capacity", &self.pipe_capacity)
1172 .finish()
1173 }
1174}
1175
1176impl S3ZipLocalUnzipOptions {
1177 pub fn new(source: S3Object, destination_dir: impl Into<PathBuf>) -> Self {
1179 Self {
1180 source,
1181 destination_dir: destination_dir.into(),
1182 selection: UnzipSelection::default(),
1183 collect_diagnostics: false,
1184 comparison: ComparisonMode::default(),
1185 collect_operations: true,
1186 concurrency: DEFAULT_CONCURRENCY,
1187 source_block_size: DEFAULT_SOURCE_BLOCK_SIZE,
1188 source_block_merge_gap: DEFAULT_SOURCE_BLOCK_MERGE_GAP,
1189 source_get_concurrency: DEFAULT_SOURCE_GET_CONCURRENCY,
1190 source_window_capacity: DEFAULT_SOURCE_WINDOW_CAPACITY,
1191 source_window_memory_budget_mb: None,
1192 }
1193 }
1194
1195 pub fn with_selection(mut self, selection: impl Into<UnzipSelection>) -> Self {
1197 self.selection = selection.into();
1198 self
1199 }
1200
1201 pub fn collect_diagnostics(mut self) -> Self {
1203 self.collect_diagnostics = true;
1204 self
1205 }
1206
1207 pub fn force_hash_comparison(mut self) -> Self {
1209 self.comparison = ComparisonMode::HashEntries;
1210 self
1211 }
1212
1213 pub fn with_comparison_mode(mut self, comparison: ComparisonMode) -> Self {
1215 self.comparison = comparison;
1216 self
1217 }
1218
1219 pub fn without_operations(mut self) -> Self {
1221 self.collect_operations = false;
1222 self
1223 }
1224
1225 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
1227 self.concurrency = concurrency;
1228 self
1229 }
1230
1231 pub fn with_source_block_size(mut self, source_block_size: usize) -> Self {
1233 self.source_block_size = source_block_size;
1234 self
1235 }
1236
1237 pub fn with_source_block_merge_gap(mut self, source_block_merge_gap: usize) -> Self {
1239 self.source_block_merge_gap = source_block_merge_gap;
1240 self
1241 }
1242
1243 pub fn with_source_get_concurrency(mut self, source_get_concurrency: usize) -> Self {
1245 self.source_get_concurrency = source_get_concurrency;
1246 self
1247 }
1248
1249 pub fn with_source_window_capacity(mut self, source_window_capacity: usize) -> Self {
1251 self.source_window_capacity = source_window_capacity;
1252 self
1253 }
1254
1255 pub fn with_source_window_memory_budget_mb(
1257 mut self,
1258 source_window_memory_budget_mb: u64,
1259 ) -> Self {
1260 self.source_window_memory_budget_mb = Some(source_window_memory_budget_mb);
1261 self
1262 }
1263}
1264
1265impl std::fmt::Debug for S3ZipLocalUnzipOptions {
1266 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1267 formatter
1268 .debug_struct("S3ZipLocalUnzipOptions")
1269 .field("source", &self.source)
1270 .field("destination_dir", &self.destination_dir)
1271 .field("selection", &self.selection)
1272 .field("collect_diagnostics", &self.collect_diagnostics)
1273 .field("comparison", &self.comparison)
1274 .field("collect_operations", &self.collect_operations)
1275 .field("concurrency", &self.concurrency)
1276 .field("source_block_size", &self.source_block_size)
1277 .field("source_block_merge_gap", &self.source_block_merge_gap)
1278 .field("source_get_concurrency", &self.source_get_concurrency)
1279 .field("source_window_capacity", &self.source_window_capacity)
1280 .field(
1281 "source_window_memory_budget_mb",
1282 &self.source_window_memory_budget_mb,
1283 )
1284 .finish()
1285 }
1286}
1287
1288impl LocalUnzipOptions {
1289 pub fn new(source_zip: impl Into<PathBuf>, destination_dir: impl Into<PathBuf>) -> Self {
1291 Self {
1292 source_zip: source_zip.into(),
1293 destination_dir: destination_dir.into(),
1294 selection: UnzipSelection::default(),
1295 comparison: ComparisonMode::default(),
1296 collect_operations: true,
1297 concurrency: DEFAULT_CONCURRENCY,
1298 }
1299 }
1300
1301 pub fn with_selection(mut self, selection: impl Into<UnzipSelection>) -> Self {
1303 self.selection = selection.into();
1304 self
1305 }
1306
1307 pub fn force_hash_comparison(mut self) -> Self {
1309 self.comparison = ComparisonMode::HashEntries;
1310 self
1311 }
1312
1313 pub fn with_comparison_mode(mut self, comparison: ComparisonMode) -> Self {
1315 self.comparison = comparison;
1316 self
1317 }
1318
1319 pub fn without_operations(mut self) -> Self {
1321 self.collect_operations = false;
1322 self
1323 }
1324
1325 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
1327 self.concurrency = concurrency;
1328 self
1329 }
1330}
1331
1332impl std::fmt::Debug for LocalUnzipOptions {
1333 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1334 formatter
1335 .debug_struct("LocalUnzipOptions")
1336 .field("source_zip", &self.source_zip)
1337 .field("destination_dir", &self.destination_dir)
1338 .field("selection", &self.selection)
1339 .field("comparison", &self.comparison)
1340 .field("collect_operations", &self.collect_operations)
1341 .field("concurrency", &self.concurrency)
1342 .finish()
1343 }
1344}
1345
1346impl std::fmt::Debug for UploadOptions {
1347 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1348 formatter
1349 .debug_struct("UploadOptions")
1350 .field("source_dir", &self.source_dir)
1351 .field("destination", &self.destination)
1352 .field("include_catalog", &self.include_catalog)
1353 .field("compression", &self.compression)
1354 .field("body_chunk_size", &self.body_chunk_size)
1355 .field("pipe_capacity", &self.pipe_capacity)
1356 .field(
1357 "progress",
1358 &self.progress.as_ref().map(|_| "UploadProgressHandler"),
1359 )
1360 .finish()
1361 }
1362}
1363
/// Cloneable handle around a shared progress callback.
///
/// Clones share the same callback through an [`Arc`].
#[derive(Clone)]
pub struct UploadProgressHandler {
    callback: Arc<dyn Fn(UploadProgress) + Send + Sync + 'static>,
}

impl UploadProgressHandler {
    /// Wraps `callback` so it can be stored in option structs and cloned.
    pub fn new(callback: impl Fn(UploadProgress) + Send + Sync + 'static) -> Self {
        let callback: Arc<dyn Fn(UploadProgress) + Send + Sync + 'static> = Arc::new(callback);
        Self { callback }
    }

    /// Invokes the callback with `progress` on the calling thread.
    pub(crate) fn emit(&self, progress: UploadProgress) {
        let notify = &*self.callback;
        notify(progress);
    }
}

impl std::fmt::Debug for UploadProgressHandler {
    /// Writes the type name only; the callback itself cannot be inspected.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "UploadProgressHandler")
    }
}

/// Progress event passed to an [`UploadProgressHandler`] callback.
///
/// NOTE(review): the code that emits these events is not in this file; the field
/// descriptions follow the field names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UploadProgress {
    /// The work has been planned.
    Planned {
        /// Total number of files.
        total_files: usize,
        /// Total number of bytes.
        total_bytes: u64,
    },
    /// Processing of a file has started.
    FileStarted {
        /// Position of the current file.
        current_file: usize,
        /// Total number of files.
        total_files: usize,
        /// Number of files processed so far.
        processed_files: usize,
        /// Number of bytes processed so far.
        processed_bytes: u64,
        /// Total number of bytes.
        total_bytes: u64,
        /// Path of the file.
        path: String,
    },
    /// Progress within a file.
    FileProgress {
        /// Position of the current file.
        current_file: usize,
        /// Total number of files.
        total_files: usize,
        /// Number of files processed so far.
        processed_files: usize,
        /// Number of bytes processed so far.
        processed_bytes: u64,
        /// Total number of bytes.
        total_bytes: u64,
        /// Path of the file.
        path: String,
    },
    /// Processing of a file has finished.
    FileFinished {
        /// Number of files processed so far.
        processed_files: usize,
        /// Total number of files.
        total_files: usize,
        /// Number of bytes processed so far.
        processed_bytes: u64,
        /// Total number of bytes.
        total_bytes: u64,
        /// Path of the file.
        path: String,
    },
    /// All work has finished.
    Finished {
        /// Total number of files.
        total_files: usize,
        /// Total number of bytes.
        total_bytes: u64,
    },
}