s3-unspool 0.1.0-beta.6

Fast streaming extraction of large ZIP archives from S3 into S3 prefixes with conditional writes.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
use std::collections::BTreeMap;

use serde::{Deserialize, Serialize};

use crate::options::RetryJitter;
use crate::s3_uri::{S3Object, S3Prefix};

/// Summary returned by [`crate::upload_directory_zip_to_s3`].
///
/// `directories` and `include_catalog` carry serde defaults, so serialized
/// reports that lack those two fields still deserialize; every other field
/// must be present.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UploadReport {
    /// Local directory that was uploaded.
    pub source_dir: String,
    /// Destination ZIP object.
    pub destination: S3Object,
    /// Number of regular files included in the ZIP.
    pub files: usize,
    /// Number of preserved directory entries included in the ZIP.
    ///
    /// Deserializes as `0` when the field is absent.
    #[serde(default)]
    pub directories: usize,
    /// Whether the embedded update catalog was included.
    ///
    /// Deserializes as `true` when the field is absent.
    #[serde(default = "default_include_catalog")]
    pub include_catalog: bool,
    /// Total uncompressed payload bytes.
    pub uncompressed_bytes: u64,
    /// Total uploaded ZIP object bytes.
    pub zip_bytes: u64,
}

/// Summary returned by [`crate::zip_s3_prefix_to_s3`].
///
/// Only `include_catalog` has a serde default; every other field must be
/// present when deserializing.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct S3PrefixUploadReport {
    /// Source prefix that was uploaded.
    pub source: S3Prefix,
    /// Destination ZIP object.
    pub destination: S3Object,
    /// Number of regular source objects included as ZIP file entries.
    pub files: usize,
    /// Number of zero-byte trailing-slash source objects included as ZIP directories.
    pub directories: usize,
    /// Whether the embedded update catalog was included.
    ///
    /// Deserializes as `true` when the field is absent.
    #[serde(default = "default_include_catalog")]
    pub include_catalog: bool,
    /// Total number of ZIP entries written, excluding the embedded catalog.
    pub entries: usize,
    /// Total uncompressed payload bytes across regular file entries.
    pub uncompressed_bytes: u64,
    /// Total uploaded ZIP object bytes.
    pub zip_bytes: u64,
}

/// Summary returned by local ZIP creation helpers.
///
/// Both the source and the destination are plain strings here. Only
/// `include_catalog` has a serde default; every other field must be present
/// when deserializing.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LocalZipReport {
    /// Source tree that was zipped.
    pub source: String,
    /// Destination ZIP file path.
    pub destination_zip: String,
    /// Number of regular file entries included in the ZIP.
    pub files: usize,
    /// Number of preserved directory entries included in the ZIP.
    pub directories: usize,
    /// Whether the embedded update catalog was included.
    ///
    /// Deserializes as `true` when the field is absent.
    #[serde(default = "default_include_catalog")]
    pub include_catalog: bool,
    /// Total number of ZIP entries written, excluding the embedded catalog.
    pub entries: usize,
    /// Total uncompressed payload bytes across regular file entries.
    pub uncompressed_bytes: u64,
    /// Size of the generated ZIP file.
    pub zip_bytes: u64,
}

/// Summary returned by ZIP dry-run helpers.
///
/// Mirrors the counters of the ZIP creation reports but has no `zip_bytes`
/// field. No field declares a serde default, so all of them must be present
/// when deserializing.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ZipDryRunReport {
    /// Source tree that would be zipped.
    pub source: String,
    /// Destination ZIP file path or S3 object URI.
    pub destination: String,
    /// Number of regular file entries that would be included.
    pub files: usize,
    /// Number of directory entries that would be included.
    pub directories: usize,
    /// Total number of ZIP entries that would be written, excluding the embedded catalog.
    pub entries: usize,
    /// Total uncompressed payload bytes across regular file entries.
    pub uncompressed_bytes: u64,
    /// Whether the embedded update catalog would be included.
    ///
    /// Unlike the `include_catalog` field of the ZIP creation reports, this
    /// one has no serde default.
    pub include_catalog: bool,
}

/// Serde default for the `include_catalog` report fields.
///
/// Referenced by name from `#[serde(default = "default_include_catalog")]`,
/// so a serialized report without that field deserializes with the catalog
/// marked as included.
fn default_include_catalog() -> bool {
    true
}

/// Aggregate counters for an extract run.
///
/// Derives `Default`, so a fresh summary starts with every counter at zero.
/// `summarize_operation` bumps exactly one of the outcome counters
/// (`uploaded_new`, `uploaded_changed`, `skipped_unchanged`,
/// `conditional_conflicts`, `deleted_extra`, `errors`) per `ObjectReport`;
/// it never touches `zip_files` or `destination_objects`.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct SyncSummary {
    /// Number of source ZIP entries found, excluding the embedded catalog.
    pub zip_files: usize,
    /// Number of destination objects listed before extraction.
    pub destination_objects: usize,
    /// Number of missing destination objects uploaded.
    pub uploaded_new: usize,
    /// Number of changed destination objects uploaded.
    pub uploaded_changed: usize,
    /// Number of unchanged destination objects skipped.
    pub skipped_unchanged: usize,
    /// Number of conditional write conflicts.
    pub conditional_conflicts: usize,
    /// Number of extra destination objects deleted.
    pub deleted_extra: usize,
    /// Number of per-object errors.
    ///
    /// The `has_errors` helpers on the report types test this counter.
    pub errors: usize,
}

/// Aggregate counters for an extract dry run.
///
/// Derives `Default`, so a fresh summary starts with every counter at zero.
/// `summarize_dry_run_operation` bumps exactly one of `would_upload_new`,
/// `would_upload_changed`, `skipped_unchanged`, `would_delete_extra` or
/// `errors` per `DryRunObjectReport`; it never touches `zip_files` or
/// `destination_objects`.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct UnzipDryRunSummary {
    /// Number of source ZIP entries found, excluding the embedded catalog.
    pub zip_files: usize,
    /// Number of destination objects listed before extraction, for S3 destinations.
    pub destination_objects: usize,
    /// Number of missing destination entries that would be uploaded or created.
    pub would_upload_new: usize,
    /// Number of existing destination entries that would be replaced.
    pub would_upload_changed: usize,
    /// Number of destination entries that already match the source entry.
    pub skipped_unchanged: usize,
    /// Number of extra destination objects that would be deleted.
    pub would_delete_extra: usize,
    /// Number of per-entry errors found while planning.
    ///
    /// `UnzipDryRunReport::has_errors` tests this counter.
    pub errors: usize,
}

/// Full report returned by [`crate::sync_zip_to_s3`].
///
/// Combines the aggregate `summary` with the individual `operations` that
/// produced it, plus optional run diagnostics.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SyncReport {
    /// Source ZIP object.
    pub source: S3Object,
    /// Destination prefix.
    pub destination: S3Prefix,
    /// Aggregate extract counters.
    pub summary: SyncSummary,
    /// Optional source scheduler and destination `PutObject` diagnostics.
    ///
    /// Left out of the serialized output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub diagnostics: Option<SyncDiagnostics>,
    /// Per-object operation records.
    pub operations: Vec<ObjectReport>,
}

impl SyncReport {
    /// Reports whether the run recorded at least one failed object operation.
    ///
    /// Only the aggregate `summary.errors` counter is consulted; the
    /// individual `operations` records are not scanned.
    pub fn has_errors(&self) -> bool {
        self.summary.errors != 0
    }
}

/// Full report returned when extracting a local ZIP into an S3 prefix.
///
/// Shares `SyncSummary` and `ObjectReport` with `SyncReport`, but identifies
/// the source by file path and has no diagnostics field.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LocalZipToS3Report {
    /// Source ZIP file path.
    pub source_zip: String,
    /// Destination prefix.
    pub destination: S3Prefix,
    /// Aggregate extract counters.
    pub summary: SyncSummary,
    /// Per-entry operation records.
    pub operations: Vec<ObjectReport>,
}

impl LocalZipToS3Report {
    /// Reports whether the run recorded at least one failed entry operation.
    ///
    /// Only the aggregate `summary.errors` counter is consulted; the
    /// individual `operations` records are not scanned.
    pub fn has_errors(&self) -> bool {
        self.summary.errors != 0
    }
}

/// Full report returned when extracting a ZIP into a local directory.
///
/// Both endpoints are plain strings. The counters and per-entry records
/// reuse `SyncSummary` and `ObjectReport`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LocalUnzipReport {
    /// Source ZIP object URI or local file path.
    pub source_zip: String,
    /// Destination local directory.
    pub destination_dir: String,
    /// Aggregate extract counters.
    pub summary: SyncSummary,
    /// Optional source scheduler diagnostics for S3 ZIP sources.
    ///
    /// Left out of the serialized output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub diagnostics: Option<LocalUnzipDiagnostics>,
    /// Per-entry operation records.
    pub operations: Vec<ObjectReport>,
}

impl LocalUnzipReport {
    /// Reports whether the run recorded at least one failed entry operation.
    ///
    /// Only the aggregate `summary.errors` counter is consulted; the
    /// individual `operations` records are not scanned.
    pub fn has_errors(&self) -> bool {
        self.summary.errors != 0
    }
}

/// Full report returned by extract dry-run helpers.
///
/// Uses the dry-run specific `UnzipDryRunSummary` and `DryRunObjectReport`
/// types instead of the ones used by real extract runs.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UnzipDryRunReport {
    /// Source ZIP object URI or local file path.
    pub source_zip: String,
    /// Destination prefix URI or local directory.
    pub destination: String,
    /// Aggregate dry-run counters.
    pub summary: UnzipDryRunSummary,
    /// Optional source scheduler diagnostics for S3 ZIP sources.
    ///
    /// Left out of the serialized output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub diagnostics: Option<DryRunDiagnostics>,
    /// Per-entry dry-run operation records.
    pub operations: Vec<DryRunObjectReport>,
}

impl UnzipDryRunReport {
    /// Reports whether planning recorded at least one failed operation.
    ///
    /// Only the aggregate `summary.errors` counter is consulted; the
    /// individual `operations` records are not scanned.
    pub fn has_errors(&self) -> bool {
        self.summary.errors != 0
    }
}

/// Effective extract settings and aggregate diagnostics.
///
/// Carried by `SyncReport::diagnostics`. Contains the same source-side
/// settings as `LocalUnzipDiagnostics` plus the destination `PutObject`
/// settings and counters (`put_concurrency`, `put_retry`, `put`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SyncDiagnostics {
    /// Effective entry concurrency.
    pub concurrency: usize,
    /// Effective destination `PutObject` concurrency.
    pub put_concurrency: usize,
    /// Effective destination `PutObject` retry policy.
    pub put_retry: PutRetryDiagnostics,
    /// Effective source block size in bytes.
    pub source_block_size: usize,
    /// Effective source block merge gap in bytes.
    pub source_block_merge_gap: usize,
    /// Effective source ranged `GetObject` concurrency.
    pub source_get_concurrency: usize,
    /// Effective source block window capacity in bytes.
    pub source_window_capacity: usize,
    /// Aggregate source scheduler counters.
    pub source: SourceDiagnostics,
    /// Aggregate destination `PutObject` counters.
    pub put: PutDiagnostics,
}

/// Effective local unzip settings and aggregate source diagnostics.
///
/// Carried by `LocalUnzipReport::diagnostics`. Its fields have the same names
/// and types as those of `DryRunDiagnostics`; the two are kept as distinct
/// types.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LocalUnzipDiagnostics {
    /// Effective entry concurrency.
    pub concurrency: usize,
    /// Effective source block size in bytes.
    pub source_block_size: usize,
    /// Effective source block merge gap in bytes.
    pub source_block_merge_gap: usize,
    /// Effective source ranged `GetObject` concurrency.
    pub source_get_concurrency: usize,
    /// Effective source block window capacity in bytes.
    pub source_window_capacity: usize,
    /// Aggregate source scheduler counters.
    pub source: SourceDiagnostics,
}

/// Effective dry-run source settings and aggregate diagnostics.
///
/// Carried by `UnzipDryRunReport::diagnostics`. Its fields have the same
/// names and types as those of `LocalUnzipDiagnostics`; the two are kept as
/// distinct types.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DryRunDiagnostics {
    /// Effective entry concurrency.
    pub concurrency: usize,
    /// Effective source block size in bytes.
    pub source_block_size: usize,
    /// Effective source block merge gap in bytes.
    pub source_block_merge_gap: usize,
    /// Effective source ranged `GetObject` concurrency.
    pub source_get_concurrency: usize,
    /// Effective source block window capacity in bytes.
    pub source_window_capacity: usize,
    /// Aggregate source scheduler counters.
    pub source: SourceDiagnostics,
}

/// Source scheduler and ranged `GetObject` counters.
///
/// Embedded as the `source` field of `SyncDiagnostics`,
/// `LocalUnzipDiagnostics` and `DryRunDiagnostics`. Unlike `PutDiagnostics`
/// this type does not derive `Default`, so every field has to be supplied
/// when a value is constructed.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SourceDiagnostics {
    /// Source ZIP object size in bytes.
    pub source_zip_bytes: u64,
    /// Number of ZIP entries included in source plans.
    pub planned_entries: u64,
    /// Number of source blocks included in source plans.
    pub planned_blocks: u64,
    /// Number of source blocks fetched successfully.
    pub fetched_blocks: u64,
    /// Total ranged `GetObject` attempts, including retries.
    pub source_get_attempts: u64,
    /// Total ranged `GetObject` retries.
    pub source_get_retries: u64,
    /// Ranged `GetObject` request errors.
    pub source_get_request_errors: u64,
    /// Ranged `GetObject` response body errors.
    pub source_get_body_errors: u64,
    /// Ranged `GetObject` responses that ended before the requested bytes were read.
    pub source_get_short_body_errors: u64,
    /// Source block fetches that failed after all retry attempts.
    pub source_get_errors: u64,
    /// Sum of planned source block sizes.
    pub planned_source_bytes: u64,
    /// Sum of fetched source block sizes.
    pub fetched_source_bytes: u64,
    /// Unique source bytes covered by fetched ranges.
    pub unique_source_bytes: u64,
    /// Ratio of fetched source bytes to unique fetched source bytes.
    // NOTE(review): presumably `fetched_source_bytes / unique_source_bytes`;
    // the computation is not part of this module, so the value when
    // `unique_source_bytes` is zero should be confirmed at the call site.
    pub source_amplification: f64,
    /// Number of block read requests served from ready blocks.
    pub block_hits: u64,
    /// Number of block read requests that waited for scheduled data.
    pub block_waits: u64,
    /// Number of ready source blocks released from the resident window after all
    /// planned claims consumed them.
    pub block_releases: u64,
    /// Number of reader cache misses. This should remain zero for the planned
    /// source scheduler.
    pub block_misses: u64,
    /// Number of explicit replay fetches for blocks that had already been
    /// released from the resident window.
    pub block_refetches: u64,
    /// Highest number of concurrent ranged `GetObject` requests.
    pub active_gets_high_water: u64,
}

/// Effective destination `PutObject` retry settings.
///
/// Embedded as `SyncDiagnostics::put_retry`. All delays are expressed in
/// whole milliseconds; throttling failures have their own base/max pair,
/// separate from other retryable failures.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PutRetryDiagnostics {
    /// Maximum application-level `PutObject` attempts per object.
    pub max_attempts: usize,
    /// Base delay for retryable non-throttling failures, in milliseconds.
    pub base_delay_ms: u64,
    /// Maximum delay for retryable non-throttling failures, in milliseconds.
    pub max_delay_ms: u64,
    /// Base delay for throttling failures such as S3 `SlowDown`, in milliseconds.
    pub slowdown_base_delay_ms: u64,
    /// Maximum delay for throttling failures such as S3 `SlowDown`, in milliseconds.
    pub slowdown_max_delay_ms: u64,
    /// Jitter mode applied to computed retry delays.
    ///
    /// The type comes from `crate::options`.
    pub jitter: RetryJitter,
}

/// Destination `PutObject` failure counters.
///
/// Embedded as `SyncDiagnostics::put`. Derives `Default`, so a fresh value
/// has every counter at zero and an empty failure map.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PutDiagnostics {
    /// Number of failed `PutObject` attempts, including retryable attempts that
    /// later succeeded.
    pub failed_attempts: u64,
    /// Failed `PutObject` attempts grouped by AWS error code or SDK failure kind.
    ///
    /// Being a `BTreeMap`, the codes are visited in sorted key order when the
    /// map is iterated or serialized.
    pub failures_by_error_code: BTreeMap<String, u64>,
    /// Application-level retry attempts scheduled after failed `PutObject` attempts.
    pub retry_attempts: u64,
    /// Failed `PutObject` attempts classified as throttling.
    pub throttled_attempts: u64,
    /// Number of waits on the shared destination PUT throttle.
    pub throttle_waits: u64,
    /// Total milliseconds spent waiting on the shared destination PUT throttle.
    pub throttle_wait_millis: u64,
}

/// Status for a single destination object operation.
///
/// Variants are serialized under their snake_case names (for example
/// `UploadedNew` becomes `uploaded_new`). `summarize_operation` maps each
/// variant onto the `SyncSummary` counter of the same name.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationStatus {
    /// The destination key was absent and was uploaded.
    UploadedNew,
    /// The destination key existed and was overwritten.
    UploadedChanged,
    /// The destination key existed and already matched the source entry.
    SkippedUnchanged,
    /// A conditional write failed because the destination changed after listing.
    ConditionalConflict,
    /// The destination key was extra and was deleted.
    DeletedExtra,
    /// The object operation failed.
    Error,
}

/// Status for a single dry-run destination operation.
///
/// Variants are serialized under their snake_case names (for example
/// `WouldUploadNew` becomes `would_upload_new`).
/// `summarize_dry_run_operation` maps each variant onto the
/// `UnzipDryRunSummary` counter of the same name.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DryRunOperationStatus {
    /// The destination entry is absent and would be uploaded or created.
    WouldUploadNew,
    /// The destination entry exists and would be replaced.
    WouldUploadChanged,
    /// The destination entry already matches the source entry.
    SkippedUnchanged,
    /// The destination object is extra and would be deleted.
    WouldDeleteExtra,
    /// The object operation failed while planning.
    Error,
}

/// Per-object operation result.
///
/// Only `status` and `key` are always serialized. Each optional field is
/// left out of the serialized output when it is `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ObjectReport {
    /// Operation status.
    pub status: OperationStatus,
    /// Destination object key or local path.
    pub key: String,
    /// Source ZIP path when the operation corresponds to a ZIP entry.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub zip_path: Option<String>,
    /// Source entry size in bytes when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub size: Option<u64>,
    /// Source MD5 digest when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub md5: Option<String>,
    /// Destination ETag observed during the initial listing when available.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub destination_etag: Option<String>,
    /// Error or conflict message when present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
}

/// Per-object dry-run operation result.
///
/// Has the same fields as `ObjectReport` except that `status` is a
/// `DryRunOperationStatus`. Only `status` and `key` are always serialized;
/// each optional field is left out of the serialized output when it is
/// `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DryRunObjectReport {
    /// Planned operation status.
    pub status: DryRunOperationStatus,
    /// Destination object key or local path.
    pub key: String,
    /// Source ZIP path when the operation corresponds to a ZIP entry.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub zip_path: Option<String>,
    /// Source entry size in bytes when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub size: Option<u64>,
    /// Source MD5 digest when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub md5: Option<String>,
    /// Destination ETag observed during the initial listing when available.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub destination_etag: Option<String>,
    /// Error message when present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
}

/// Test-only helper that folds every recorded operation into the report's
/// summary counters.
///
/// The counters are added to whatever `report.summary` already holds; the
/// summary is not reset first.
#[cfg(test)]
pub(crate) fn summarize(report: &mut SyncReport) {
    // Split the borrow so the summary can be mutated while the operations
    // are read.
    let SyncReport {
        summary,
        operations,
        ..
    } = report;
    operations
        .iter()
        .for_each(|entry| summarize_operation(summary, entry));
}

pub(crate) fn summarize_dry_run_operation(
    summary: &mut UnzipDryRunSummary,
    operation: &DryRunObjectReport,
) {
    match operation.status {
        DryRunOperationStatus::WouldUploadNew => summary.would_upload_new += 1,
        DryRunOperationStatus::WouldUploadChanged => summary.would_upload_changed += 1,
        DryRunOperationStatus::SkippedUnchanged => summary.skipped_unchanged += 1,
        DryRunOperationStatus::WouldDeleteExtra => summary.would_delete_extra += 1,
        DryRunOperationStatus::Error => summary.errors += 1,
    }
}

pub(crate) fn summarize_operation(summary: &mut SyncSummary, operation: &ObjectReport) {
    match operation.status {
        OperationStatus::UploadedNew => summary.uploaded_new += 1,
        OperationStatus::UploadedChanged => summary.uploaded_changed += 1,
        OperationStatus::SkippedUnchanged => summary.skipped_unchanged += 1,
        OperationStatus::ConditionalConflict => summary.conditional_conflicts += 1,
        OperationStatus::DeletedExtra => summary.deleted_extra += 1,
        OperationStatus::Error => summary.errors += 1,
    }
}