Skip to main content

s3_unspool/
options.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3use std::time::Duration;
4
5use async_zip::Compression;
6use serde::{Deserialize, Serialize};
7
8use crate::constants::*;
9use crate::s3_uri::{S3Object, S3Prefix};
10
11/// Compression method used for regular file entries when creating ZIP archives.
12#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
13pub enum ZipCompression {
14    /// Use Deflate, the default ZIP compression method supported by common tools.
15    #[default]
16    Deflate,
17    /// Use Zstandard method 93 for regular file entries.
18    #[cfg(feature = "zstd")]
19    Zstd,
20}
21
22impl ZipCompression {
23    pub(crate) fn to_async_zip(self) -> Compression {
24        match self {
25            ZipCompression::Deflate => Compression::Deflate,
26            #[cfg(feature = "zstd")]
27            ZipCompression::Zstd => Compression::Zstd,
28        }
29    }
30
31    /// Returns a stable lowercase name for display or configuration.
32    pub fn as_str(self) -> &'static str {
33        match self {
34            ZipCompression::Deflate => "deflate",
35            #[cfg(feature = "zstd")]
36            ZipCompression::Zstd => "zstd",
37        }
38    }
39}
40
/// Ordered ZIP entry selection patterns used by the unzip APIs.
///
/// An empty selection means every supported ZIP entry is processed. Patterns
/// are evaluated against normalized ZIP entry paths, never against local
/// filesystem paths or destination S3 keys. Build a selection incrementally
/// with [`UnzipSelection::new`] followed by [`UnzipSelection::include`] /
/// [`UnzipSelection::exclude`], or hand `with_selection` an array such as
/// `["docs/**", "!docs/drafts/**"]`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct UnzipSelection {
    patterns: Vec<String>,
}

impl UnzipSelection {
    /// Returns an empty selection, which extracts every supported ZIP entry.
    pub fn new() -> Self {
        Self {
            patterns: Vec::new(),
        }
    }

    /// Builds a selection from raw, ordered include/exclude lines.
    ///
    /// Lines follow gitignore-style semantics: a later line overrides an
    /// earlier one, and a line that starts with `!` excludes the ZIP paths it
    /// matches. A selection made up only of exclusions selects every ZIP path
    /// that none of those exclusions match.
    pub fn patterns(patterns: impl IntoIterator<Item = impl Into<String>>) -> Self {
        let lines: Vec<String> = patterns.into_iter().map(|line| line.into()).collect();
        Self { patterns: lines }
    }

    /// Appends a pattern that selects matching ZIP paths.
    ///
    /// A leading `!` or `#` is escaped, so the builder treats it as a literal
    /// path character. Use [`Self::patterns`] to supply raw selection lines.
    pub fn include(mut self, pattern: impl Into<String>) -> Self {
        let literal = escape_leading_gitignore_marker(pattern.into());
        self.patterns.push(literal);
        self
    }

    /// Appends a pattern that deselects matching ZIP paths.
    ///
    /// A leading `!` or `#` in `pattern` is escaped, so the builder treats it
    /// as a literal path character. Use [`Self::patterns`] to supply raw
    /// selection lines.
    pub fn exclude(mut self, pattern: impl Into<String>) -> Self {
        let literal = escape_leading_gitignore_marker(pattern.into());
        // Negate the (already escaped) pattern with gitignore's `!` prefix.
        let mut negated = String::with_capacity(literal.len() + 1);
        negated.push('!');
        negated.push_str(&literal);
        self.patterns.push(negated);
        self
    }

    /// Reports whether the selection contains no patterns at all.
    pub fn is_empty(&self) -> bool {
        self.patterns.is_empty()
    }

    /// Borrows the selection lines in evaluation order.
    pub fn as_patterns(&self) -> &[String] {
        self.patterns.as_slice()
    }
}

impl<const N: usize> From<[&str; N]> for UnzipSelection {
    /// Treats each array element as a raw selection line (no escaping).
    fn from(patterns: [&str; N]) -> Self {
        Self::patterns(patterns.iter().copied())
    }
}

impl From<Vec<String>> for UnzipSelection {
    /// Takes ownership of raw selection lines as-is (no escaping).
    fn from(patterns: Vec<String>) -> Self {
        Self { patterns }
    }
}

/// Escapes a leading `!` (negation) or `#` (comment) with a backslash so that
/// gitignore-style matching reads it as a literal first path character.
/// Any other pattern, including the empty string, is returned unchanged.
fn escape_leading_gitignore_marker(pattern: String) -> String {
    let has_marker = matches!(pattern.chars().next(), Some('!' | '#'));
    if has_marker {
        format!("\\{pattern}")
    } else {
        pattern
    }
}
124
/// Policy for destination objects that an unzip-to-S3 run finds but the ZIP does not contain.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum DestinationCleanup {
    /// Leave destination objects that match no selected ZIP entry untouched (the default).
    #[default]
    KeepExtra,
    /// Remove destination objects under the destination prefix that the ZIP does not contain.
    DeleteExtra,
}

impl DestinationCleanup {
    /// Reports whether this policy removes destination objects that are absent from the ZIP.
    pub(crate) fn deletes_extra(self) -> bool {
        self == Self::DeleteExtra
    }
}
140
/// Strategy unzip operations use to compare ZIP entries with existing destination objects.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ComparisonMode {
    /// Prefer the embedded catalog when the ZIP has one; otherwise hash entries (the default).
    #[default]
    CatalogThenHash,
    /// Always hash ZIP entries, ignoring any embedded catalog.
    HashEntries,
}

impl ComparisonMode {
    /// Reports whether the embedded catalog should be skipped in favour of hashing.
    pub(crate) fn ignores_embedded_catalog(self) -> bool {
        match self {
            Self::HashEntries => true,
            Self::CatalogThenHash => false,
        }
    }
}
156
/// Reaction of unzip-to-S3 operations to conditional write conflicts at the destination.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ConflictPolicy {
    /// Note each conditional conflict in the report and keep going (the default).
    #[default]
    ReportAndContinue,
    /// Stop with an error as soon as the first conditional conflict is seen.
    FailFast,
}

impl ConflictPolicy {
    /// Reports whether the first conflict should abort the operation.
    pub(crate) fn fails_fast(self) -> bool {
        match self {
            Self::FailFast => true,
            Self::ReportAndContinue => false,
        }
    }
}
172
173/// Options for extracting a ZIP object from S3 into an S3 prefix.
174#[derive(Clone, Debug)]
175pub struct SyncOptions {
176    /// Source ZIP object.
177    pub(crate) source: S3Object,
178    /// Destination prefix that receives ZIP entries.
179    pub(crate) destination: S3Prefix,
180    /// How destination objects outside the ZIP are handled.
181    ///
182    /// Deleting extra objects requires a non-empty destination prefix so a
183    /// bucket root is never swept accidentally.
184    pub(crate) cleanup: DestinationCleanup,
185    /// ZIP entry selection. Empty selection extracts every supported entry.
186    ///
187    /// Selection cannot be combined with [`DestinationCleanup::DeleteExtra`].
188    pub(crate) selection: UnzipSelection,
189    /// Collect source scheduler diagnostics in the returned report.
190    pub(crate) collect_diagnostics: bool,
191    /// Comparison policy for embedded catalogs and entry hashing.
192    pub(crate) comparison: ComparisonMode,
193    /// Conditional write conflict handling policy.
194    pub(crate) conflict_policy: ConflictPolicy,
195    /// Collect one operation record per processed object in the returned report.
196    pub(crate) collect_operations: bool,
197    /// Maximum number of ZIP entries processed concurrently.
198    ///
199    /// Must be greater than zero.
200    pub(crate) concurrency: usize,
201    /// Maximum number of destination `PutObject` requests in flight.
202    ///
203    /// Must be greater than zero.
204    pub(crate) put_concurrency: usize,
205    /// Retry and backoff policy for destination `PutObject` attempts.
206    pub(crate) put_retry_policy: PutRetryPolicy,
207    /// Maximum size for planned source ZIP blocks.
208    ///
209    /// Must be greater than zero.
210    pub(crate) source_block_size: usize,
211    /// Maximum gap that can be read while coalescing adjacent source spans.
212    pub(crate) source_block_merge_gap: usize,
213    /// Maximum number of ranged source `GetObject` requests in flight.
214    ///
215    /// Must be greater than zero.
216    pub(crate) source_get_concurrency: usize,
217    /// Maximum bytes held by the planned source block window.
218    ///
219    /// When nonzero, this must be large enough to hold the effective source
220    /// block size after clamping that block size to the source ZIP size.
221    pub(crate) source_window_capacity: usize,
222    /// Available memory budget, in MiB, used to derive the source block window.
223    ///
224    /// When set, extraction computes [`Self::source_window_capacity`] after the
225    /// ZIP manifest is loaded, using the real source ZIP size and file count.
226    /// This is useful for memory-bounded runtimes that want to assign otherwise
227    /// idle memory to source block buffering while reserving space for catalog
228    /// metadata and worker overhead.
229    pub(crate) source_window_memory_budget_mb: Option<u64>,
230    /// Buffer size used when streaming entry bodies to S3.
231    ///
232    /// Must be greater than zero and no larger than 16 MiB.
233    pub(crate) body_chunk_size: usize,
234    /// Capacity of the in-memory pipe between decompression and S3 upload.
235    ///
236    /// Must be greater than zero and no larger than 64 MiB.
237    pub(crate) pipe_capacity: usize,
238}
239
240impl SyncOptions {
241    /// Creates extract options for a source ZIP object and destination prefix.
242    pub fn new(source: S3Object, destination: S3Prefix) -> Self {
243        Self {
244            source,
245            destination,
246            cleanup: DestinationCleanup::default(),
247            selection: UnzipSelection::default(),
248            collect_diagnostics: false,
249            comparison: ComparisonMode::default(),
250            conflict_policy: ConflictPolicy::default(),
251            collect_operations: true,
252            concurrency: DEFAULT_CONCURRENCY,
253            put_concurrency: DEFAULT_PUT_CONCURRENCY,
254            put_retry_policy: PutRetryPolicy::default(),
255            source_block_size: DEFAULT_SOURCE_BLOCK_SIZE,
256            source_block_merge_gap: DEFAULT_SOURCE_BLOCK_MERGE_GAP,
257            source_get_concurrency: DEFAULT_SOURCE_GET_CONCURRENCY,
258            source_window_capacity: DEFAULT_SOURCE_WINDOW_CAPACITY,
259            source_window_memory_budget_mb: None,
260            body_chunk_size: DEFAULT_BODY_CHUNK_SIZE,
261            pipe_capacity: DEFAULT_PIPE_CAPACITY,
262        }
263    }
264
265    /// Returns the source ZIP object.
266    pub fn source(&self) -> &S3Object {
267        &self.source
268    }
269
270    /// Returns the destination prefix.
271    pub fn destination(&self) -> &S3Prefix {
272        &self.destination
273    }
274
275    /// Returns the destination cleanup policy.
276    pub fn cleanup(&self) -> DestinationCleanup {
277        self.cleanup
278    }
279
280    /// Returns the ZIP entry selection patterns.
281    pub fn selection(&self) -> &UnzipSelection {
282        &self.selection
283    }
284
285    /// Returns whether source scheduler diagnostics are collected.
286    pub fn collects_diagnostics(&self) -> bool {
287        self.collect_diagnostics
288    }
289
290    /// Returns the ZIP entry comparison policy.
291    pub fn comparison_mode(&self) -> ComparisonMode {
292        self.comparison
293    }
294
295    /// Returns the conditional write conflict handling policy.
296    pub fn conflict_policy(&self) -> ConflictPolicy {
297        self.conflict_policy
298    }
299
300    /// Returns whether per-object operation records are collected.
301    pub fn collects_operations(&self) -> bool {
302        self.collect_operations
303    }
304
305    /// Returns the maximum number of ZIP entries processed concurrently.
306    pub fn concurrency(&self) -> usize {
307        self.concurrency
308    }
309
310    /// Returns the maximum number of destination `PutObject` requests in flight.
311    pub fn put_concurrency(&self) -> usize {
312        self.put_concurrency
313    }
314
315    /// Returns the retry and backoff policy for destination `PutObject` attempts.
316    pub fn put_retry_policy(&self) -> &PutRetryPolicy {
317        &self.put_retry_policy
318    }
319
320    /// Returns the maximum size for planned source ZIP blocks.
321    pub fn source_block_size(&self) -> usize {
322        self.source_block_size
323    }
324
325    /// Returns the maximum gap that can be read while coalescing adjacent source spans.
326    pub fn source_block_merge_gap(&self) -> usize {
327        self.source_block_merge_gap
328    }
329
330    /// Returns the maximum number of ranged source `GetObject` requests in flight.
331    pub fn source_get_concurrency(&self) -> usize {
332        self.source_get_concurrency
333    }
334
335    /// Returns the configured source block window capacity.
336    ///
337    /// When [`Self::with_source_window_memory_budget_mb`] is used, extraction
338    /// derives the effective post-manifest value at runtime and reports it in
339    /// [`crate::SyncDiagnostics::source_window_capacity`] when diagnostics are collected.
340    pub fn source_window_capacity(&self) -> usize {
341        self.source_window_capacity
342    }
343
344    /// Returns the available memory budget, in MiB, used to derive the source block window.
345    pub fn source_window_memory_budget_mb(&self) -> Option<u64> {
346        self.source_window_memory_budget_mb
347    }
348
349    /// Returns the buffer size used when streaming entry bodies to S3.
350    pub fn body_chunk_size(&self) -> usize {
351        self.body_chunk_size
352    }
353
354    /// Returns the in-memory pipe capacity between decompression and S3 upload.
355    pub fn pipe_capacity(&self) -> usize {
356        self.pipe_capacity
357    }
358
359    /// Sets ZIP entry selection patterns.
360    pub fn with_selection(mut self, selection: impl Into<UnzipSelection>) -> Self {
361        self.selection = selection.into();
362        self
363    }
364
365    /// Deletes destination objects under the prefix that are not present in the ZIP.
366    ///
367    /// This requires a non-empty destination prefix and cannot be combined with
368    /// a non-empty selection.
369    pub fn delete_extra_objects(mut self) -> Self {
370        self.cleanup = DestinationCleanup::DeleteExtra;
371        self
372    }
373
374    /// Sets the destination cleanup policy.
375    pub fn with_cleanup(mut self, cleanup: DestinationCleanup) -> Self {
376        self.cleanup = cleanup;
377        self
378    }
379
380    /// Collects source scheduler diagnostics in the returned report.
381    pub fn collect_diagnostics(mut self) -> Self {
382        self.collect_diagnostics = true;
383        self
384    }
385
386    /// Ignores any embedded catalog and hashes ZIP entries for comparison.
387    pub fn force_hash_comparison(mut self) -> Self {
388        self.comparison = ComparisonMode::HashEntries;
389        self
390    }
391
392    /// Sets the ZIP entry comparison policy.
393    pub fn with_comparison_mode(mut self, comparison: ComparisonMode) -> Self {
394        self.comparison = comparison;
395        self
396    }
397
398    /// Returns an error after the first conditional write conflict is observed.
399    pub fn fail_on_conflict(mut self) -> Self {
400        self.conflict_policy = ConflictPolicy::FailFast;
401        self
402    }
403
404    /// Sets the conditional write conflict handling policy.
405    pub fn with_conflict_policy(mut self, conflict_policy: ConflictPolicy) -> Self {
406        self.conflict_policy = conflict_policy;
407        self
408    }
409
410    /// Omits per-object operation records from the returned report.
411    pub fn without_operations(mut self) -> Self {
412        self.collect_operations = false;
413        self
414    }
415
416    /// Sets the maximum number of ZIP entries processed concurrently.
417    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
418        self.concurrency = concurrency;
419        self
420    }
421
422    /// Sets the maximum number of destination `PutObject` requests in flight.
423    pub fn with_put_concurrency(mut self, put_concurrency: usize) -> Self {
424        self.put_concurrency = put_concurrency;
425        self
426    }
427
428    /// Sets the retry and backoff policy for destination `PutObject` attempts.
429    pub fn with_put_retry_policy(mut self, put_retry_policy: PutRetryPolicy) -> Self {
430        self.put_retry_policy = put_retry_policy;
431        self
432    }
433
434    /// Sets the maximum size for planned source ZIP blocks.
435    pub fn with_source_block_size(mut self, source_block_size: usize) -> Self {
436        self.source_block_size = source_block_size;
437        self
438    }
439
440    /// Sets the maximum gap that can be read while coalescing adjacent source spans.
441    pub fn with_source_block_merge_gap(mut self, source_block_merge_gap: usize) -> Self {
442        self.source_block_merge_gap = source_block_merge_gap;
443        self
444    }
445
446    /// Sets the maximum number of ranged source `GetObject` requests in flight.
447    pub fn with_source_get_concurrency(mut self, source_get_concurrency: usize) -> Self {
448        self.source_get_concurrency = source_get_concurrency;
449        self
450    }
451
452    /// Sets the maximum bytes held by the planned source block window.
453    pub fn with_source_window_capacity(mut self, source_window_capacity: usize) -> Self {
454        self.source_window_capacity = source_window_capacity;
455        self
456    }
457
458    /// Sets the available memory budget, in MiB, used to derive the source block window.
459    pub fn with_source_window_memory_budget_mb(
460        mut self,
461        source_window_memory_budget_mb: u64,
462    ) -> Self {
463        self.source_window_memory_budget_mb = Some(source_window_memory_budget_mb);
464        self
465    }
466
467    /// Sets the buffer size used when streaming entry bodies to S3.
468    pub fn with_body_chunk_size(mut self, body_chunk_size: usize) -> Self {
469        self.body_chunk_size = body_chunk_size;
470        self
471    }
472
473    /// Sets the in-memory pipe capacity between decompression and S3 upload.
474    pub fn with_pipe_capacity(mut self, pipe_capacity: usize) -> Self {
475        self.pipe_capacity = pipe_capacity;
476        self
477    }
478}
479
480/// Retry and backoff policy for destination `PutObject` attempts.
481#[derive(Clone, Debug, Eq, PartialEq)]
482pub struct PutRetryPolicy {
483    /// Maximum number of application-level `PutObject` attempts per object.
484    ///
485    /// Must be greater than zero.
486    pub(crate) max_attempts: usize,
487    /// Base delay for retryable non-throttling failures.
488    pub(crate) base_delay: Duration,
489    /// Maximum delay for retryable non-throttling failures.
490    ///
491    /// Must be greater than or equal to [`Self::base_delay`].
492    pub(crate) max_delay: Duration,
493    /// Base delay for throttling failures such as S3 `SlowDown`.
494    pub(crate) slowdown_base_delay: Duration,
495    /// Maximum delay for throttling failures such as S3 `SlowDown`.
496    ///
497    /// Must be greater than or equal to [`Self::slowdown_base_delay`].
498    pub(crate) slowdown_max_delay: Duration,
499    /// Jitter mode applied to computed retry delays.
500    pub(crate) jitter: RetryJitter,
501}
502
503impl Default for PutRetryPolicy {
504    fn default() -> Self {
505        Self {
506            max_attempts: PUT_OBJECT_MAX_ATTEMPTS,
507            base_delay: Duration::from_millis(PUT_OBJECT_RETRY_BASE_DELAY_MS),
508            max_delay: Duration::from_millis(PUT_OBJECT_RETRY_MAX_DELAY_MS),
509            slowdown_base_delay: Duration::from_millis(PUT_OBJECT_SLOWDOWN_RETRY_BASE_DELAY_MS),
510            slowdown_max_delay: Duration::from_millis(PUT_OBJECT_SLOWDOWN_RETRY_MAX_DELAY_MS),
511            jitter: RetryJitter::Full,
512        }
513    }
514}
515
516impl PutRetryPolicy {
517    /// Returns the maximum number of application-level `PutObject` attempts.
518    pub fn max_attempts(&self) -> usize {
519        self.max_attempts
520    }
521
522    /// Returns the base delay for retryable non-throttling failures.
523    pub fn base_delay(&self) -> Duration {
524        self.base_delay
525    }
526
527    /// Returns the maximum delay for retryable non-throttling failures.
528    pub fn max_delay(&self) -> Duration {
529        self.max_delay
530    }
531
532    /// Returns the base delay for throttling failures such as S3 `SlowDown`.
533    pub fn slowdown_base_delay(&self) -> Duration {
534        self.slowdown_base_delay
535    }
536
537    /// Returns the maximum delay for throttling failures such as S3 `SlowDown`.
538    pub fn slowdown_max_delay(&self) -> Duration {
539        self.slowdown_max_delay
540    }
541
542    /// Returns the jitter mode applied to computed retry delays.
543    pub fn jitter(&self) -> RetryJitter {
544        self.jitter
545    }
546
547    /// Sets the maximum number of application-level `PutObject` attempts.
548    pub fn with_max_attempts(mut self, max_attempts: usize) -> Self {
549        self.max_attempts = max_attempts;
550        self
551    }
552
553    /// Sets the base delay for retryable non-throttling failures.
554    pub fn with_base_delay(mut self, base_delay: Duration) -> Self {
555        self.base_delay = base_delay;
556        self
557    }
558
559    /// Sets the maximum delay for retryable non-throttling failures.
560    pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
561        self.max_delay = max_delay;
562        self
563    }
564
565    /// Sets the base delay for throttling failures such as S3 `SlowDown`.
566    pub fn with_slowdown_base_delay(mut self, slowdown_base_delay: Duration) -> Self {
567        self.slowdown_base_delay = slowdown_base_delay;
568        self
569    }
570
571    /// Sets the maximum delay for throttling failures such as S3 `SlowDown`.
572    pub fn with_slowdown_max_delay(mut self, slowdown_max_delay: Duration) -> Self {
573        self.slowdown_max_delay = slowdown_max_delay;
574        self
575    }
576
577    /// Sets the jitter mode applied to computed retry delays.
578    pub fn with_jitter(mut self, jitter: RetryJitter) -> Self {
579        self.jitter = jitter;
580        self
581    }
582}
583
584/// Jitter mode used for application-level destination `PutObject` retries.
585#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
586#[serde(rename_all = "snake_case")]
587pub enum RetryJitter {
588    /// Use full jitter, selecting a random delay from zero to the computed cap.
589    Full,
590    /// Use deterministic exponential delays without jitter.
591    None,
592}
593
594/// Computes adaptive source `GetObject` concurrency for a fixed memory envelope.
595///
596/// The policy scales source reads in the same direction as Lambda CPU: one
597/// source request per 256 MiB of configured memory, capped at eight.
598pub fn adaptive_source_get_concurrency(available_memory_mb: u64) -> usize {
599    let slots = available_memory_mb / ADAPTIVE_SOURCE_GET_MEMORY_STEP_MB;
600    usize::try_from(slots)
601        .unwrap_or(usize::MAX)
602        .clamp(1, ADAPTIVE_SOURCE_MAX_GET_CONCURRENCY)
603}
604
/// Inputs used to derive an adaptive source block window capacity.
///
/// Before any memory goes to the source ZIP block window, the calculation sets
/// aside several reservations:
/// - a fixed runtime baseline;
/// - a fixed amount per extraction worker;
/// - a fixed amount per ZIP file entry;
/// - room for the ranged reads in flight.
///
/// See [`AdaptiveSourceWindow::capacity`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AdaptiveSourceWindow {
    /// Runtime memory available, in MiB.
    pub(crate) available_memory_mb: u64,
    /// Size of the source ZIP object, in bytes.
    pub(crate) source_zip_bytes: u64,
    /// Count of regular file entries in the ZIP.
    pub(crate) zip_file_count: usize,
    /// Upper bound on ZIP entries processed at the same time.
    pub(crate) concurrency: usize,
    /// Largest planned source ZIP block.
    pub(crate) source_block_size: usize,
    /// Upper bound on concurrent ranged source `GetObject` requests.
    pub(crate) source_get_concurrency: usize,
}
625
626impl AdaptiveSourceWindow {
627    /// Creates adaptive source window inputs with the crate defaults for scheduler knobs.
628    pub fn new(available_memory_mb: u64, source_zip_bytes: u64, zip_file_count: usize) -> Self {
629        Self {
630            available_memory_mb,
631            source_zip_bytes,
632            zip_file_count,
633            concurrency: DEFAULT_CONCURRENCY,
634            source_block_size: DEFAULT_SOURCE_BLOCK_SIZE,
635            source_get_concurrency: DEFAULT_SOURCE_GET_CONCURRENCY,
636        }
637    }
638
639    /// Sets the maximum number of ZIP entries processed concurrently.
640    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
641        self.concurrency = concurrency;
642        self
643    }
644
645    /// Sets the maximum size for planned source ZIP blocks.
646    pub fn with_source_block_size(mut self, source_block_size: usize) -> Self {
647        self.source_block_size = source_block_size;
648        self
649    }
650
651    /// Sets the maximum number of ranged source `GetObject` requests in flight.
652    pub fn with_source_get_concurrency(mut self, source_get_concurrency: usize) -> Self {
653        self.source_get_concurrency = source_get_concurrency;
654        self
655    }
656
657    /// Computes the adaptive source block window capacity.
658    pub fn capacity(self) -> usize {
659        let Some(available_memory_bytes) = self.available_memory_mb.checked_mul(1024 * 1024) else {
660            return usize::try_from(self.source_zip_bytes).unwrap_or(usize::MAX);
661        };
662        let concurrency = u64::try_from(self.concurrency.max(1)).unwrap_or(u64::MAX);
663        let zip_file_count = u64::try_from(self.zip_file_count).unwrap_or(u64::MAX);
664        let worker_budget = concurrency.saturating_mul(ADAPTIVE_CACHE_WORKER_OVERHEAD);
665        let file_budget = zip_file_count.saturating_mul(ADAPTIVE_CACHE_FILE_OVERHEAD);
666        let in_flight_budget = u64::try_from(self.source_get_concurrency.max(1))
667            .unwrap_or(u64::MAX)
668            .saturating_mul(u64::try_from(self.source_block_size).unwrap_or(u64::MAX));
669        let reserved = ADAPTIVE_CACHE_BASE_OVERHEAD
670            .saturating_add(worker_budget)
671            .saturating_add(file_budget)
672            .saturating_add(in_flight_budget);
673        let capacity = available_memory_bytes
674            .saturating_sub(reserved)
675            .min(self.source_zip_bytes);
676        let capacity = if capacity > ADAPTIVE_CACHE_LARGE_THRESHOLD {
677            capacity.saturating_sub(ADAPTIVE_CACHE_LARGE_RSS_SLACK)
678        } else {
679            capacity
680        }
681        .min(ADAPTIVE_CACHE_MAX_WINDOW_CAPACITY);
682
683        let minimum_block_capacity = u64::try_from(self.source_block_size.max(1))
684            .unwrap_or(u64::MAX)
685            .min(self.source_zip_bytes);
686        let capacity = capacity.max(minimum_block_capacity);
687
688        usize::try_from(capacity).unwrap_or(usize::MAX)
689    }
690}
691
692/// Options for zipping a local directory and uploading it as an S3 object.
693#[derive(Clone)]
694pub struct UploadOptions {
695    /// Local directory whose regular files and empty directories should be included recursively.
696    pub(crate) source_dir: PathBuf,
697    /// Destination ZIP object.
698    pub(crate) destination: S3Object,
699    /// Include the embedded update catalog at [`crate::EMBEDDED_CATALOG_PATH`].
700    pub(crate) include_catalog: bool,
701    /// Compression method for regular file entries.
702    pub(crate) compression: ZipCompression,
703    /// Buffer size used when streaming the ZIP body to S3.
704    ///
705    /// Must be greater than zero and no larger than 16 MiB.
706    pub(crate) body_chunk_size: usize,
707    /// Capacity of the in-memory pipe between ZIP production and S3 upload.
708    ///
709    /// Must be greater than zero and no larger than 64 MiB.
710    pub(crate) pipe_capacity: usize,
711    /// Optional progress callback invoked during upload preparation and ZIP streaming.
712    pub(crate) progress: Option<UploadProgressHandler>,
713}
714
715/// Options for zipping a local directory to a local ZIP file.
716#[derive(Clone)]
717pub struct LocalZipOptions {
718    /// Local directory whose regular files and empty directories should be included recursively.
719    pub(crate) source_dir: PathBuf,
720    /// Destination ZIP file path.
721    pub(crate) destination_zip: PathBuf,
722    /// Include the embedded update catalog at [`crate::EMBEDDED_CATALOG_PATH`].
723    pub(crate) include_catalog: bool,
724    /// Compression method for regular file entries.
725    pub(crate) compression: ZipCompression,
726    /// Optional progress callback invoked during upload preparation and ZIP streaming.
727    pub(crate) progress: Option<UploadProgressHandler>,
728}
729
730/// Options for zipping an S3 prefix and uploading it as an S3 ZIP object.
731#[derive(Clone)]
732pub struct S3PrefixUploadOptions {
733    /// Source prefix whose objects should be included recursively.
734    pub(crate) source: S3Prefix,
735    /// Destination ZIP object.
736    pub(crate) destination: S3Object,
737    /// Include the embedded update catalog at [`crate::EMBEDDED_CATALOG_PATH`].
738    pub(crate) include_catalog: bool,
739    /// Compression method for regular file entries.
740    pub(crate) compression: ZipCompression,
741    /// Buffer size used when streaming the ZIP body to S3.
742    ///
743    /// Must be greater than zero and no larger than 16 MiB.
744    pub(crate) body_chunk_size: usize,
745    /// Capacity of the in-memory pipe between ZIP production and S3 upload.
746    ///
747    /// Must be greater than zero and no larger than 64 MiB.
748    pub(crate) pipe_capacity: usize,
749    /// Optional progress callback invoked during source listing and ZIP streaming.
750    pub(crate) progress: Option<UploadProgressHandler>,
751}
752
753/// Options for zipping an S3 prefix to a local ZIP file.
754#[derive(Clone)]
755pub struct S3PrefixLocalZipOptions {
756    /// Source prefix whose objects should be included recursively.
757    pub(crate) source: S3Prefix,
758    /// Destination ZIP file path.
759    pub(crate) destination_zip: PathBuf,
760    /// Include the embedded update catalog at [`crate::EMBEDDED_CATALOG_PATH`].
761    pub(crate) include_catalog: bool,
762    /// Compression method for regular file entries.
763    pub(crate) compression: ZipCompression,
764    /// Optional progress callback invoked during source listing and ZIP streaming.
765    pub(crate) progress: Option<UploadProgressHandler>,
766}
767
768/// Options for extracting a local ZIP file into an S3 prefix.
769#[derive(Clone)]
770pub struct LocalZipSyncOptions {
771    /// Source ZIP file path.
772    pub(crate) source_zip: PathBuf,
773    /// Destination prefix that receives ZIP entries.
774    pub(crate) destination: S3Prefix,
775    /// How destination objects outside the ZIP are handled.
776    ///
777    /// Deleting extra objects requires a non-empty destination prefix so a
778    /// bucket root is never treated as a sync deletion scope.
779    pub(crate) cleanup: DestinationCleanup,
780    /// ZIP entry selection. Empty selection extracts every supported entry.
781    ///
782    /// Selection cannot be combined with [`DestinationCleanup::DeleteExtra`].
783    pub(crate) selection: UnzipSelection,
784    /// Comparison policy for embedded catalogs and entry hashing.
785    pub(crate) comparison: ComparisonMode,
786    /// Conditional write conflict handling policy.
787    pub(crate) conflict_policy: ConflictPolicy,
788    /// Collect one operation record per processed object in the returned report.
789    pub(crate) collect_operations: bool,
790    /// Maximum number of ZIP entries processed concurrently.
791    pub(crate) concurrency: usize,
792    /// Buffer size used when streaming entry bodies to S3.
793    pub(crate) body_chunk_size: usize,
794    /// Capacity of the in-memory pipe between decompression and S3 upload.
795    pub(crate) pipe_capacity: usize,
796}
797
798/// Options for extracting an S3 ZIP object into a local directory.
799#[derive(Clone)]
800pub struct S3ZipLocalUnzipOptions {
801    /// Source ZIP object.
802    pub(crate) source: S3Object,
803    /// Destination local directory.
804    pub(crate) destination_dir: PathBuf,
805    /// ZIP entry selection. Empty selection extracts every supported entry.
806    pub(crate) selection: UnzipSelection,
807    /// Collect source scheduler diagnostics in the returned report.
808    pub(crate) collect_diagnostics: bool,
809    /// Comparison policy for embedded catalogs and entry hashing.
810    pub(crate) comparison: ComparisonMode,
811    /// Collect one operation record per processed entry in the returned report.
812    pub(crate) collect_operations: bool,
813    /// Maximum number of ZIP entries processed concurrently.
814    pub(crate) concurrency: usize,
815    /// Maximum size for planned source ZIP blocks.
816    pub(crate) source_block_size: usize,
817    /// Maximum gap that can be read while coalescing adjacent source spans.
818    pub(crate) source_block_merge_gap: usize,
819    /// Maximum number of ranged source `GetObject` requests in flight.
820    pub(crate) source_get_concurrency: usize,
821    /// Maximum bytes held by the planned source block window.
822    pub(crate) source_window_capacity: usize,
823    /// Available memory budget, in MiB, used to derive the source block window.
824    pub(crate) source_window_memory_budget_mb: Option<u64>,
825}
826
827/// Options for extracting a local ZIP file into a local directory.
828#[derive(Clone)]
829pub struct LocalUnzipOptions {
830    /// Source ZIP file path.
831    pub(crate) source_zip: PathBuf,
832    /// Destination local directory.
833    pub(crate) destination_dir: PathBuf,
834    /// ZIP entry selection. Empty selection extracts every supported entry.
835    pub(crate) selection: UnzipSelection,
836    /// Comparison policy for embedded catalogs and entry hashing.
837    pub(crate) comparison: ComparisonMode,
838    /// Collect one operation record per processed entry in the returned report.
839    pub(crate) collect_operations: bool,
840    /// Maximum number of ZIP entries processed concurrently.
841    pub(crate) concurrency: usize,
842}
843
844impl S3PrefixUploadOptions {
845    /// Creates upload options for an S3 source prefix and destination object.
846    pub fn new(source: S3Prefix, destination: S3Object) -> Self {
847        Self {
848            source,
849            destination,
850            include_catalog: true,
851            compression: ZipCompression::Deflate,
852            body_chunk_size: DEFAULT_BODY_CHUNK_SIZE,
853            pipe_capacity: DEFAULT_PIPE_CAPACITY,
854            progress: None,
855        }
856    }
857
858    /// Omits the embedded update catalog from the ZIP.
859    pub fn without_catalog(mut self) -> Self {
860        self.include_catalog = false;
861        self
862    }
863
864    /// Sets the compression method used for regular file entries.
865    pub fn with_compression(mut self, compression: ZipCompression) -> Self {
866        self.compression = compression;
867        self
868    }
869
870    /// Sets the buffer size used when streaming the ZIP body to S3.
871    pub fn with_body_chunk_size(mut self, body_chunk_size: usize) -> Self {
872        self.body_chunk_size = body_chunk_size;
873        self
874    }
875
876    /// Sets the in-memory pipe capacity between ZIP production and S3 upload.
877    pub fn with_pipe_capacity(mut self, pipe_capacity: usize) -> Self {
878        self.pipe_capacity = pipe_capacity;
879        self
880    }
881
882    /// Sets the progress callback invoked during source listing and ZIP streaming.
883    pub fn with_progress(self, callback: impl Fn(UploadProgress) + Send + Sync + 'static) -> Self {
884        self.with_progress_handler(UploadProgressHandler::new(callback))
885    }
886
887    /// Sets the progress handler invoked during source listing and ZIP streaming.
888    pub fn with_progress_handler(mut self, progress: UploadProgressHandler) -> Self {
889        self.progress = Some(progress);
890        self
891    }
892}
893
894impl LocalZipOptions {
895    /// Creates options for a local source directory and local destination ZIP.
896    pub fn new(source_dir: impl Into<PathBuf>, destination_zip: impl Into<PathBuf>) -> Self {
897        Self {
898            source_dir: source_dir.into(),
899            destination_zip: destination_zip.into(),
900            include_catalog: true,
901            compression: ZipCompression::Deflate,
902            progress: None,
903        }
904    }
905
906    /// Omits the embedded update catalog from the ZIP.
907    pub fn without_catalog(mut self) -> Self {
908        self.include_catalog = false;
909        self
910    }
911
912    /// Sets the compression method used for regular file entries.
913    pub fn with_compression(mut self, compression: ZipCompression) -> Self {
914        self.compression = compression;
915        self
916    }
917
918    /// Sets the progress callback invoked during upload preparation and ZIP streaming.
919    pub fn with_progress(self, callback: impl Fn(UploadProgress) + Send + Sync + 'static) -> Self {
920        self.with_progress_handler(UploadProgressHandler::new(callback))
921    }
922
923    /// Sets the progress handler invoked during upload preparation and ZIP streaming.
924    pub fn with_progress_handler(mut self, progress: UploadProgressHandler) -> Self {
925        self.progress = Some(progress);
926        self
927    }
928}
929
930impl std::fmt::Debug for LocalZipOptions {
931    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
932        formatter
933            .debug_struct("LocalZipOptions")
934            .field("source_dir", &self.source_dir)
935            .field("destination_zip", &self.destination_zip)
936            .field("include_catalog", &self.include_catalog)
937            .field("compression", &self.compression)
938            .field(
939                "progress",
940                &self.progress.as_ref().map(|_| "UploadProgressHandler"),
941            )
942            .finish()
943    }
944}
945
946impl std::fmt::Debug for S3PrefixUploadOptions {
947    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
948        formatter
949            .debug_struct("S3PrefixUploadOptions")
950            .field("source", &self.source)
951            .field("destination", &self.destination)
952            .field("include_catalog", &self.include_catalog)
953            .field("compression", &self.compression)
954            .field("body_chunk_size", &self.body_chunk_size)
955            .field("pipe_capacity", &self.pipe_capacity)
956            .field(
957                "progress",
958                &self.progress.as_ref().map(|_| "UploadProgressHandler"),
959            )
960            .finish()
961    }
962}
963
964impl S3PrefixLocalZipOptions {
965    /// Creates options for an S3 source prefix and local destination ZIP.
966    pub fn new(source: S3Prefix, destination_zip: impl Into<PathBuf>) -> Self {
967        Self {
968            source,
969            destination_zip: destination_zip.into(),
970            include_catalog: true,
971            compression: ZipCompression::Deflate,
972            progress: None,
973        }
974    }
975
976    /// Omits the embedded update catalog from the ZIP.
977    pub fn without_catalog(mut self) -> Self {
978        self.include_catalog = false;
979        self
980    }
981
982    /// Sets the compression method used for regular file entries.
983    pub fn with_compression(mut self, compression: ZipCompression) -> Self {
984        self.compression = compression;
985        self
986    }
987
988    /// Sets the progress callback invoked during source listing and ZIP streaming.
989    pub fn with_progress(self, callback: impl Fn(UploadProgress) + Send + Sync + 'static) -> Self {
990        self.with_progress_handler(UploadProgressHandler::new(callback))
991    }
992
993    /// Sets the progress handler invoked during source listing and ZIP streaming.
994    pub fn with_progress_handler(mut self, progress: UploadProgressHandler) -> Self {
995        self.progress = Some(progress);
996        self
997    }
998}
999
1000impl std::fmt::Debug for S3PrefixLocalZipOptions {
1001    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1002        formatter
1003            .debug_struct("S3PrefixLocalZipOptions")
1004            .field("source", &self.source)
1005            .field("destination_zip", &self.destination_zip)
1006            .field("include_catalog", &self.include_catalog)
1007            .field("compression", &self.compression)
1008            .field(
1009                "progress",
1010                &self.progress.as_ref().map(|_| "UploadProgressHandler"),
1011            )
1012            .finish()
1013    }
1014}
1015
1016impl UploadOptions {
1017    /// Creates upload options for a local source directory and destination object.
1018    pub fn new(source_dir: impl Into<PathBuf>, destination: S3Object) -> Self {
1019        Self {
1020            source_dir: source_dir.into(),
1021            destination,
1022            include_catalog: true,
1023            compression: ZipCompression::Deflate,
1024            body_chunk_size: DEFAULT_BODY_CHUNK_SIZE,
1025            pipe_capacity: DEFAULT_PIPE_CAPACITY,
1026            progress: None,
1027        }
1028    }
1029
1030    /// Omits the embedded update catalog from the ZIP.
1031    pub fn without_catalog(mut self) -> Self {
1032        self.include_catalog = false;
1033        self
1034    }
1035
1036    /// Sets the compression method used for regular file entries.
1037    pub fn with_compression(mut self, compression: ZipCompression) -> Self {
1038        self.compression = compression;
1039        self
1040    }
1041
1042    /// Sets the buffer size used when streaming the ZIP body to S3.
1043    pub fn with_body_chunk_size(mut self, body_chunk_size: usize) -> Self {
1044        self.body_chunk_size = body_chunk_size;
1045        self
1046    }
1047
1048    /// Sets the in-memory pipe capacity between ZIP production and S3 upload.
1049    pub fn with_pipe_capacity(mut self, pipe_capacity: usize) -> Self {
1050        self.pipe_capacity = pipe_capacity;
1051        self
1052    }
1053
1054    /// Sets the progress callback invoked during upload preparation and ZIP streaming.
1055    pub fn with_progress(self, callback: impl Fn(UploadProgress) + Send + Sync + 'static) -> Self {
1056        self.with_progress_handler(UploadProgressHandler::new(callback))
1057    }
1058
1059    /// Sets the progress handler invoked during upload preparation and ZIP streaming.
1060    pub fn with_progress_handler(mut self, progress: UploadProgressHandler) -> Self {
1061        self.progress = Some(progress);
1062        self
1063    }
1064}
1065
1066impl LocalZipSyncOptions {
1067    /// Creates extract options for a local source ZIP file and destination prefix.
1068    pub fn new(source_zip: impl Into<PathBuf>, destination: S3Prefix) -> Self {
1069        Self {
1070            source_zip: source_zip.into(),
1071            destination,
1072            cleanup: DestinationCleanup::default(),
1073            selection: UnzipSelection::default(),
1074            comparison: ComparisonMode::default(),
1075            conflict_policy: ConflictPolicy::default(),
1076            collect_operations: true,
1077            concurrency: DEFAULT_CONCURRENCY,
1078            body_chunk_size: DEFAULT_BODY_CHUNK_SIZE,
1079            pipe_capacity: DEFAULT_PIPE_CAPACITY,
1080        }
1081    }
1082
1083    /// Sets ZIP entry selection patterns.
1084    pub fn with_selection(mut self, selection: impl Into<UnzipSelection>) -> Self {
1085        self.selection = selection.into();
1086        self
1087    }
1088
1089    /// Deletes destination objects under the prefix that are not present in the ZIP.
1090    ///
1091    /// This requires a non-empty destination prefix and cannot be combined with
1092    /// a non-empty selection.
1093    pub fn delete_extra_objects(mut self) -> Self {
1094        self.cleanup = DestinationCleanup::DeleteExtra;
1095        self
1096    }
1097
1098    /// Sets the destination cleanup policy.
1099    pub fn with_cleanup(mut self, cleanup: DestinationCleanup) -> Self {
1100        self.cleanup = cleanup;
1101        self
1102    }
1103
1104    /// Ignores any embedded catalog and hashes ZIP entries for comparison.
1105    pub fn force_hash_comparison(mut self) -> Self {
1106        self.comparison = ComparisonMode::HashEntries;
1107        self
1108    }
1109
1110    /// Sets the ZIP entry comparison policy.
1111    pub fn with_comparison_mode(mut self, comparison: ComparisonMode) -> Self {
1112        self.comparison = comparison;
1113        self
1114    }
1115
1116    /// Returns the conditional write conflict handling policy.
1117    pub fn conflict_policy(&self) -> ConflictPolicy {
1118        self.conflict_policy
1119    }
1120
1121    /// Returns an error after the first conditional write conflict is observed.
1122    pub fn fail_on_conflict(mut self) -> Self {
1123        self.conflict_policy = ConflictPolicy::FailFast;
1124        self
1125    }
1126
1127    /// Sets the conditional write conflict handling policy.
1128    pub fn with_conflict_policy(mut self, conflict_policy: ConflictPolicy) -> Self {
1129        self.conflict_policy = conflict_policy;
1130        self
1131    }
1132
1133    /// Omits per-object operation records from the returned report.
1134    pub fn without_operations(mut self) -> Self {
1135        self.collect_operations = false;
1136        self
1137    }
1138
1139    /// Sets the maximum number of ZIP entries processed concurrently.
1140    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
1141        self.concurrency = concurrency;
1142        self
1143    }
1144
1145    /// Sets the buffer size used when streaming entry bodies to S3.
1146    pub fn with_body_chunk_size(mut self, body_chunk_size: usize) -> Self {
1147        self.body_chunk_size = body_chunk_size;
1148        self
1149    }
1150
1151    /// Sets the in-memory pipe capacity between decompression and S3 upload.
1152    pub fn with_pipe_capacity(mut self, pipe_capacity: usize) -> Self {
1153        self.pipe_capacity = pipe_capacity;
1154        self
1155    }
1156}
1157
1158impl std::fmt::Debug for LocalZipSyncOptions {
1159    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1160        formatter
1161            .debug_struct("LocalZipSyncOptions")
1162            .field("source_zip", &self.source_zip)
1163            .field("destination", &self.destination)
1164            .field("cleanup", &self.cleanup)
1165            .field("selection", &self.selection)
1166            .field("comparison", &self.comparison)
1167            .field("conflict_policy", &self.conflict_policy)
1168            .field("collect_operations", &self.collect_operations)
1169            .field("concurrency", &self.concurrency)
1170            .field("body_chunk_size", &self.body_chunk_size)
1171            .field("pipe_capacity", &self.pipe_capacity)
1172            .finish()
1173    }
1174}
1175
1176impl S3ZipLocalUnzipOptions {
1177    /// Creates extract options for a source ZIP object and local destination directory.
1178    pub fn new(source: S3Object, destination_dir: impl Into<PathBuf>) -> Self {
1179        Self {
1180            source,
1181            destination_dir: destination_dir.into(),
1182            selection: UnzipSelection::default(),
1183            collect_diagnostics: false,
1184            comparison: ComparisonMode::default(),
1185            collect_operations: true,
1186            concurrency: DEFAULT_CONCURRENCY,
1187            source_block_size: DEFAULT_SOURCE_BLOCK_SIZE,
1188            source_block_merge_gap: DEFAULT_SOURCE_BLOCK_MERGE_GAP,
1189            source_get_concurrency: DEFAULT_SOURCE_GET_CONCURRENCY,
1190            source_window_capacity: DEFAULT_SOURCE_WINDOW_CAPACITY,
1191            source_window_memory_budget_mb: None,
1192        }
1193    }
1194
1195    /// Sets ZIP entry selection patterns.
1196    pub fn with_selection(mut self, selection: impl Into<UnzipSelection>) -> Self {
1197        self.selection = selection.into();
1198        self
1199    }
1200
1201    /// Collects source scheduler diagnostics in the returned report.
1202    pub fn collect_diagnostics(mut self) -> Self {
1203        self.collect_diagnostics = true;
1204        self
1205    }
1206
1207    /// Ignores any embedded catalog and hashes ZIP entries for comparison.
1208    pub fn force_hash_comparison(mut self) -> Self {
1209        self.comparison = ComparisonMode::HashEntries;
1210        self
1211    }
1212
1213    /// Sets the ZIP entry comparison policy.
1214    pub fn with_comparison_mode(mut self, comparison: ComparisonMode) -> Self {
1215        self.comparison = comparison;
1216        self
1217    }
1218
1219    /// Omits per-entry operation records from the returned report.
1220    pub fn without_operations(mut self) -> Self {
1221        self.collect_operations = false;
1222        self
1223    }
1224
1225    /// Sets the maximum number of ZIP entries processed concurrently.
1226    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
1227        self.concurrency = concurrency;
1228        self
1229    }
1230
1231    /// Sets the maximum size for planned source ZIP blocks.
1232    pub fn with_source_block_size(mut self, source_block_size: usize) -> Self {
1233        self.source_block_size = source_block_size;
1234        self
1235    }
1236
1237    /// Sets the maximum gap that can be read while coalescing adjacent source spans.
1238    pub fn with_source_block_merge_gap(mut self, source_block_merge_gap: usize) -> Self {
1239        self.source_block_merge_gap = source_block_merge_gap;
1240        self
1241    }
1242
1243    /// Sets the maximum number of ranged source `GetObject` requests in flight.
1244    pub fn with_source_get_concurrency(mut self, source_get_concurrency: usize) -> Self {
1245        self.source_get_concurrency = source_get_concurrency;
1246        self
1247    }
1248
1249    /// Sets the maximum bytes held by the planned source block window.
1250    pub fn with_source_window_capacity(mut self, source_window_capacity: usize) -> Self {
1251        self.source_window_capacity = source_window_capacity;
1252        self
1253    }
1254
1255    /// Sets the available memory budget, in MiB, used to derive the source block window.
1256    pub fn with_source_window_memory_budget_mb(
1257        mut self,
1258        source_window_memory_budget_mb: u64,
1259    ) -> Self {
1260        self.source_window_memory_budget_mb = Some(source_window_memory_budget_mb);
1261        self
1262    }
1263}
1264
1265impl std::fmt::Debug for S3ZipLocalUnzipOptions {
1266    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1267        formatter
1268            .debug_struct("S3ZipLocalUnzipOptions")
1269            .field("source", &self.source)
1270            .field("destination_dir", &self.destination_dir)
1271            .field("selection", &self.selection)
1272            .field("collect_diagnostics", &self.collect_diagnostics)
1273            .field("comparison", &self.comparison)
1274            .field("collect_operations", &self.collect_operations)
1275            .field("concurrency", &self.concurrency)
1276            .field("source_block_size", &self.source_block_size)
1277            .field("source_block_merge_gap", &self.source_block_merge_gap)
1278            .field("source_get_concurrency", &self.source_get_concurrency)
1279            .field("source_window_capacity", &self.source_window_capacity)
1280            .field(
1281                "source_window_memory_budget_mb",
1282                &self.source_window_memory_budget_mb,
1283            )
1284            .finish()
1285    }
1286}
1287
1288impl LocalUnzipOptions {
1289    /// Creates extract options for a source ZIP file and local destination directory.
1290    pub fn new(source_zip: impl Into<PathBuf>, destination_dir: impl Into<PathBuf>) -> Self {
1291        Self {
1292            source_zip: source_zip.into(),
1293            destination_dir: destination_dir.into(),
1294            selection: UnzipSelection::default(),
1295            comparison: ComparisonMode::default(),
1296            collect_operations: true,
1297            concurrency: DEFAULT_CONCURRENCY,
1298        }
1299    }
1300
1301    /// Sets ZIP entry selection patterns.
1302    pub fn with_selection(mut self, selection: impl Into<UnzipSelection>) -> Self {
1303        self.selection = selection.into();
1304        self
1305    }
1306
1307    /// Ignores any embedded catalog and hashes ZIP entries for comparison.
1308    pub fn force_hash_comparison(mut self) -> Self {
1309        self.comparison = ComparisonMode::HashEntries;
1310        self
1311    }
1312
1313    /// Sets the ZIP entry comparison policy.
1314    pub fn with_comparison_mode(mut self, comparison: ComparisonMode) -> Self {
1315        self.comparison = comparison;
1316        self
1317    }
1318
1319    /// Omits per-entry operation records from the returned report.
1320    pub fn without_operations(mut self) -> Self {
1321        self.collect_operations = false;
1322        self
1323    }
1324
1325    /// Sets the maximum number of ZIP entries processed concurrently.
1326    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
1327        self.concurrency = concurrency;
1328        self
1329    }
1330}
1331
1332impl std::fmt::Debug for LocalUnzipOptions {
1333    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1334        formatter
1335            .debug_struct("LocalUnzipOptions")
1336            .field("source_zip", &self.source_zip)
1337            .field("destination_dir", &self.destination_dir)
1338            .field("selection", &self.selection)
1339            .field("comparison", &self.comparison)
1340            .field("collect_operations", &self.collect_operations)
1341            .field("concurrency", &self.concurrency)
1342            .finish()
1343    }
1344}
1345
1346impl std::fmt::Debug for UploadOptions {
1347    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1348        formatter
1349            .debug_struct("UploadOptions")
1350            .field("source_dir", &self.source_dir)
1351            .field("destination", &self.destination)
1352            .field("include_catalog", &self.include_catalog)
1353            .field("compression", &self.compression)
1354            .field("body_chunk_size", &self.body_chunk_size)
1355            .field("pipe_capacity", &self.pipe_capacity)
1356            .field(
1357                "progress",
1358                &self.progress.as_ref().map(|_| "UploadProgressHandler"),
1359            )
1360            .finish()
1361    }
1362}
1363
1364/// Upload progress callback wrapper.
1365///
1366/// The callback is invoked synchronously from the upload task whenever progress
1367/// state changes. Keep the callback lightweight; hand work off to another task
1368/// if it needs to perform I/O.
1369#[derive(Clone)]
1370pub struct UploadProgressHandler {
1371    callback: Arc<dyn Fn(UploadProgress) + Send + Sync + 'static>,
1372}
1373
1374impl UploadProgressHandler {
1375    /// Creates an upload progress handler from a callback.
1376    pub fn new(callback: impl Fn(UploadProgress) + Send + Sync + 'static) -> Self {
1377        Self {
1378            callback: Arc::new(callback),
1379        }
1380    }
1381
1382    pub(crate) fn emit(&self, progress: UploadProgress) {
1383        (self.callback)(progress);
1384    }
1385}
1386
1387impl std::fmt::Debug for UploadProgressHandler {
1388    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1389        formatter.write_str("UploadProgressHandler")
1390    }
1391}
1392
/// Progress event emitted while building and streaming a ZIP archive.
///
/// The same event type is reported whether the archive is uploaded to an S3
/// object or written to a local ZIP file.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum UploadProgress {
    /// The source has been scanned or listed and the total entry count is known.
    Planned {
        /// Total number of files and preserved directory entries included in the ZIP.
        total_files: usize,
        /// Total uncompressed bytes across all files.
        total_bytes: u64,
    },
    /// A file or preserved directory entry has started streaming into the ZIP writer.
    FileStarted {
        /// One-based index of the entry currently being streamed.
        current_file: usize,
        /// Total number of files and preserved directory entries included in the ZIP.
        total_files: usize,
        /// Number of entries that have finished streaming into the ZIP.
        processed_files: usize,
        /// Uncompressed bytes that have finished streaming into the ZIP.
        processed_bytes: u64,
        /// Total uncompressed bytes across all files.
        total_bytes: u64,
        /// ZIP path of the entry that just started.
        path: String,
    },
    /// A file is still streaming and byte progress has advanced.
    FileProgress {
        /// One-based index of the entry currently being streamed.
        current_file: usize,
        /// Total number of files and preserved directory entries included in the ZIP.
        total_files: usize,
        /// Number of entries that have finished streaming into the ZIP.
        processed_files: usize,
        /// Uncompressed bytes that have streamed into the ZIP producer so far.
        processed_bytes: u64,
        /// Total uncompressed bytes across all files.
        total_bytes: u64,
        /// ZIP path of the file currently being streamed.
        path: String,
    },
    /// One file or preserved directory entry has finished streaming into the ZIP writer.
    FileFinished {
        /// Number of entries that have finished streaming into the ZIP.
        processed_files: usize,
        /// Total number of files and preserved directory entries included in the ZIP.
        total_files: usize,
        /// Uncompressed bytes that have finished streaming into the ZIP.
        processed_bytes: u64,
        /// Total uncompressed bytes across all files.
        total_bytes: u64,
        /// ZIP path of the entry that just finished.
        path: String,
    },
    /// ZIP production has finished writing the archive.
    ///
    /// When the destination is an S3 object, the archive has only been written
    /// into the upload pipe at this point. The S3 multipart upload may still be
    /// completing when this event is emitted.
    Finished {
        /// Total number of files and preserved directory entries included in the ZIP.
        total_files: usize,
        /// Total uncompressed bytes across all files.
        total_bytes: u64,
    },
}