git-remote-object-store 0.2.4

Git remote helper backed by cloud object stores (S3, Azure Blob Storage)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
//! Backend-neutral multipart-upload constants and part planner.
//!
//! Both the S3 and Azure backends use the same threshold and part-size
//! defaults so the decision to switch from a single-shot put to a
//! multipart upload is identical regardless of which backend the user
//! has configured. S3 *requires* multipart above a 5 GiB single-PUT
//! ceiling; Azure does not technically require it, but a single
//! `BlockBlobClient::upload` call for a multi-GiB body is opaque and
//! error-prone — explicit `stage_block` + `commit_block_list` gives us
//! per-block retries, predictable concurrency, and per-block progress
//! events. Issue #53.
//!
//! Below [`MULTIPART_PUT_THRESHOLD`] both backends keep their existing
//! single-call paths so small bundles, lock files, and HEAD writes do
//! not pay the `CreateMultipartUpload` round-trip cost.

use std::fs::File;
use std::io;
use std::sync::Arc;

use bytes::Bytes;

use super::ObjectStoreError;
use super::error::other_boxed;

/// Body size, in bytes, from which an upload stops using a single
/// PUT/upload call and switches to explicit multipart. S3 and Azure
/// share the value: on S3 it keeps large bodies clear of the 5 GiB
/// single-PUT ceiling, and on Azure it buys per-block control over
/// large transfers (issue #53).
///
/// Deliberately modest so integration tests can drive the multipart
/// path with small synthetic bodies, yet large enough that everyday
/// bundle / lock / HEAD writes never pay for a
/// `CreateMultipartUpload` round trip.
pub(crate) const MULTIPART_PUT_THRESHOLD: u64 = 64 << 20; // 64 MiB

/// Default size of each uploaded part. At 16 MiB it clears S3's
/// 5 MiB floor for every part except the last, and 10 000 parts of
/// this size cover objects up to ~156 GiB; past that point
/// [`plan_upload_parts`] grows the part size.
pub(crate) const MULTIPART_PUT_PART_SIZE: u64 = 16 << 20; // 16 MiB

/// Upper bound on part uploads in flight at the same time. Kept equal
/// to the download-side `s3::MULTIPART_MAX_CONCURRENCY` so that peak
/// file-descriptor, socket, and memory use look the same in both
/// directions.
pub(crate) const MULTIPART_PUT_MAX_CONCURRENCY: usize = 8;

/// Largest number of parts S3 accepts in one multipart upload; AWS
/// rejects a `CompleteMultipartUpload` listing more than 10 000 parts.
pub(crate) const S3_MAX_PARTS: u64 = 10_000;

/// Largest number of committed blocks an Azure block blob may hold;
/// a `CommitBlockList` that would exceed 50 000 blocks is rejected.
pub(crate) const AZURE_MAX_BLOCKS: u64 = 50_000;

/// A single contiguous byte range of an upload body, as produced by
/// the part planner.
///
/// `offset` is the zero-based position of the range's first byte in
/// the source body, and `length` is its size in bytes. Planned parts
/// never have a zero `length`: a zero-sized body is planned as no
/// parts at all rather than as one empty part.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct UploadPart {
    /// Zero-based byte offset into the source body.
    pub offset: u64,
    /// Number of bytes in this part (non-zero for planned parts).
    pub length: u64,
}

/// Decide whether a body of `size` bytes takes the multipart path.
///
/// The cut-over is inclusive: a body of exactly
/// [`MULTIPART_PUT_THRESHOLD`] bytes already counts as multipart.
/// Each backend pins this in a tripwire test, so a regression that
/// brings back a bare single PUT above the threshold is caught at
/// compile/test time (issue #53).
pub(crate) fn should_use_multipart(size: u64) -> bool {
    MULTIPART_PUT_THRESHOLD <= size
}

/// Plan part offsets/lengths for a multipart upload.
///
/// Returns at most `max_parts` non-empty parts whose lengths sum to
/// `size`. `target_part_size` is scaled up by powers of two until the
/// part count fits under `max_parts`; this matches the AWS S3
/// transfer manager's planner shape and satisfies S3's "no part
/// smaller than 5 MiB except the last" constraint as long as the
/// caller supplies `target_part_size >= 5 MiB`.
///
/// Parts are contiguous and in ascending offset order. Every part
/// except possibly the last is exactly the scaled part size, and the
/// last one carries any remainder.
///
/// Returns an empty Vec for `size == 0` — caller must short-circuit
/// (a zero-byte multipart upload would create a useless `upload_id`
/// with no parts, and S3 rejects `CompleteMultipartUpload` with no
/// parts). A zero `target_part_size` or `max_parts` also yields an
/// empty Vec, since no valid plan exists for either.
pub(crate) fn plan_upload_parts(
    size: u64,
    target_part_size: u64,
    max_parts: u64,
) -> Vec<UploadPart> {
    if size == 0 || target_part_size == 0 || max_parts == 0 {
        return Vec::new();
    }

    let part_size = scale_part_size(size, target_part_size, max_parts);
    let full_parts = size / part_size;
    let last_part = size % part_size;

    // Pre-size the Vec when the part count fits in `usize`. If it does
    // not (only conceivable on a 32-bit target with a pathological
    // `max_parts`), use a zero hint and let the Vec grow as needed.
    // Saturating to `usize::MAX` and then adding the tail part would
    // overflow in debug builds, and `Vec::with_capacity(usize::MAX)`
    // panics with "capacity overflow" rather than growing lazily.
    let capacity = usize::try_from(full_parts)
        .ok()
        .and_then(|n| n.checked_add(usize::from(last_part > 0)))
        .unwrap_or(0);
    let mut parts = Vec::with_capacity(capacity);
    for i in 0..full_parts {
        // `i * part_size < full_parts * part_size <= size`: no overflow.
        parts.push(UploadPart {
            offset: i * part_size,
            length: part_size,
        });
    }
    if last_part > 0 {
        parts.push(UploadPart {
            offset: full_parts * part_size,
            length: last_part,
        });
    }
    parts
}

/// Read `part.length` bytes starting at `part.offset` from `file`
/// into a freshly-allocated `Bytes`, using a positional read so
/// concurrent tasks sharing the same file handle do not trample
/// each other's offsets.
///
/// Per-task `try_clone` would *not* work: stdlib's `File::try_clone`
/// (and therefore `tokio::fs::File::try_clone`) returns a new
/// `File` that references the same kernel open file description —
/// which holds the seek offset, so concurrent seeks via different
/// `File` handles trample. `read_exact_at` (`pread64`) bypasses
/// the offset entirely; it's thread-safe and kernel-defined to be
/// a no-op on the file's seek offset.
///
/// Sharing one open file description across all tasks instead of
/// re-opening by path closes the metadata/upload race: every task
/// sees the same inode regardless of concurrent rename or unlink
/// at the original path.
pub(crate) async fn read_file_part(
    file: Arc<std::fs::File>,
    part: UploadPart,
) -> Result<Bytes, ObjectStoreError> {
    let length = usize::try_from(part.length).map_err(other_boxed)?;
    // Positional read is blocking; offload to the blocking pool so
    // we don't stall the runtime for 16 MiB syscalls.
    let buf = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
        let mut buf = vec![0u8; length];
        pread_exact(&file, &mut buf, part.offset)?;
        Ok(buf)
    })
    .await
    .map_err(other_boxed)?
    .map_err(other_boxed)?;
    Ok(Bytes::from(buf))
}

/// Fill all of `buf` with bytes read from `file`, starting at the
/// absolute byte position `offset`, without depending on the file's
/// seek cursor.
///
/// * Unix: delegates to `std::os::unix::fs::FileExt::read_exact_at`,
///   i.e. `pread(2)`. That call neither uses nor moves the file's seek
///   pointer. It fails with `ErrorKind::UnexpectedEof` if the file ends
///   before `buf` is full.
/// * Windows: a separate `#[cfg(windows)]` definition loops over
///   `std::os::windows::fs::FileExt::seek_read`. `seek_read` reads from
///   an absolute offset, independent of the current cursor, but leaves
///   the cursor at the end of the bytes it read. That side effect is
///   harmless here, because every read carries its own absolute offset
///   and no caller of `read_file_part` looks at the seek position.
///   Unlike `read_exact_at`, `seek_read` may return fewer bytes than
///   requested, or `0` at end-of-file. The loop therefore keeps reading
///   until `buf` is full and turns a zero-byte read into
///   `UnexpectedEof`.
///
/// The Windows path has been checked against the stdlib docs, but CI
/// does not run it; issue #176 tracks adding a Windows runner.
#[cfg(unix)]
fn pread_exact(file: &File, buf: &mut [u8], offset: u64) -> io::Result<()> {
    std::os::unix::fs::FileExt::read_exact_at(file, buf, offset)
}

/// Windows counterpart of the Unix `pread_exact`: fill all of `buf`
/// from `file`, starting at the absolute byte position `offset`.
///
/// `FileExt::seek_read` reads from an absolute offset but may return
/// fewer bytes than requested, and `Ok(0)` at end-of-file, so this
/// loops until `buf` is full. The error handling matches the contract
/// of std's Unix `FileExt::read_exact_at`:
/// * a zero-byte read before `buf` is full becomes an
///   `ErrorKind::UnexpectedEof` error, and
/// * `ErrorKind::Interrupted` is retried rather than surfaced, so a
///   spurious interruption does not fail the part read.
///
/// As a side effect, `seek_read` moves the handle's cursor to the end
/// of the bytes it read. Every read here is addressed by its own
/// absolute offset, so the cursor position is never relied upon.
#[cfg(windows)]
fn pread_exact(file: &File, buf: &mut [u8], offset: u64) -> io::Result<()> {
    use std::os::windows::fs::FileExt;
    let mut filled = 0;
    while filled < buf.len() {
        // `usize` is at most 64 bits on every Rust target, so widening
        // `filled` to `u64` is lossless.
        match file.seek_read(&mut buf[filled..], offset + filled as u64) {
            Ok(0) => {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "failed to fill whole buffer",
                ));
            }
            Ok(n) => filled += n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    Ok(())
}

/// Zero-copy slice of an in-memory body for a single part. The
/// part's offset/length are bounded by the body size at the call
/// site, so the `usize::try_from` conversions cannot fail on a
/// target where the body itself fit.
pub(crate) fn slice_bytes_part(body: &Bytes, part: UploadPart) -> Result<Bytes, ObjectStoreError> {
    let offset = usize::try_from(part.offset).map_err(other_boxed)?;
    let length = usize::try_from(part.length).map_err(other_boxed)?;
    Ok(body.slice(offset..offset + length))
}

/// Pick the part size for a plan over `size` bytes.
///
/// Returns the smallest power-of-two multiple of `target_part_size`
/// (`target_part_size * 2^k`, `k >= 0`) for which
/// `size.div_ceil(part_size) <= max_parts`.
///
/// If doubling would overflow `u64` before that bound is met, the
/// function falls back to `size.div_ceil(max_parts)`, the smallest part
/// size that still honours `max_parts`. This fallback is not a
/// power-of-two multiple. It can only happen when `max_parts` is 0 or
/// 1, because once the candidate exceeds `u64::MAX / 2` every `size`
/// fits in two parts. Returning the last power-of-two candidate there
/// would plan more parts than `max_parts` allows. For `max_parts == 0`
/// no plan can satisfy the bound, and the last candidate is returned
/// unchanged.
///
/// In practice S3_MAX_PARTS=10_000 with target_part_size=16 MiB needs
/// no doubling up to ~156 GiB, and the 5 TiB multipart ceiling is
/// reached after a handful of doublings.
///
/// # Panics
///
/// Panics (division by zero) if `target_part_size == 0`;
/// `plan_upload_parts` rejects that case before calling this.
fn scale_part_size(size: u64, target_part_size: u64, max_parts: u64) -> u64 {
    let mut part_size = target_part_size;
    while size.div_ceil(part_size) > max_parts {
        match part_size.checked_mul(2) {
            Some(doubled) => part_size = doubled,
            // No power-of-two multiple fits in `u64`: use the exact
            // minimum. `max(1)` avoids dividing by zero for
            // `max_parts == 0`, and `.max(part_size)` then keeps the
            // last candidate for that unsatisfiable case. For
            // `max_parts >= 1` the quotient is already larger than
            // `part_size`.
            None => return size.div_ceil(max_parts.max(1)).max(part_size),
        }
    }
    part_size
}

#[cfg(test)]
mod tests {
    // Unit tests for the multipart threshold predicate, the part
    // planner, and the positional file-read helper in this module.
    use super::*;

    // Binary size units used to express test sizes.
    const KIB: u64 = 1024;
    const MIB: u64 = 1024 * KIB;

    // Everything strictly below the threshold stays on the single-call path.
    #[test]
    fn should_use_multipart_below_threshold_is_false() {
        assert!(!should_use_multipart(0));
        assert!(!should_use_multipart(MULTIPART_PUT_THRESHOLD - 1));
    }

    // The threshold itself is inclusive: exactly MULTIPART_PUT_THRESHOLD
    // bytes already goes multipart.
    #[test]
    fn should_use_multipart_at_or_above_threshold_is_true() {
        assert!(should_use_multipart(MULTIPART_PUT_THRESHOLD));
        assert!(should_use_multipart(MULTIPART_PUT_THRESHOLD + 1));
    }

    // A zero-byte body plans no parts (callers must short-circuit).
    #[test]
    fn plan_upload_parts_zero_size_is_empty() {
        let parts = plan_upload_parts(0, MULTIPART_PUT_PART_SIZE, S3_MAX_PARTS);
        assert!(parts.is_empty());
    }

    // A zero target part size is rejected with an empty plan rather
    // than dividing by zero.
    #[test]
    fn plan_upload_parts_zero_target_part_size_is_empty() {
        let parts = plan_upload_parts(MULTIPART_PUT_PART_SIZE, 0, S3_MAX_PARTS);
        assert!(parts.is_empty());
    }

    // A zero part cap is unsatisfiable and yields an empty plan.
    #[test]
    fn plan_upload_parts_zero_max_parts_is_empty() {
        let parts = plan_upload_parts(MULTIPART_PUT_PART_SIZE, MULTIPART_PUT_PART_SIZE, 0);
        assert!(parts.is_empty());
    }

    // Exact multiple of the part size: no trailing short part.
    #[test]
    fn plan_upload_parts_one_part_when_size_eq_part_size() {
        let parts = plan_upload_parts(16 * MIB, 16 * MIB, S3_MAX_PARTS);
        assert_eq!(
            parts,
            vec![UploadPart {
                offset: 0,
                length: 16 * MIB
            }]
        );
    }

    // One byte past a full part produces a 1-byte trailing part.
    #[test]
    fn plan_upload_parts_last_part_short() {
        let parts = plan_upload_parts(16 * MIB + 1, 16 * MIB, S3_MAX_PARTS);
        assert_eq!(
            parts,
            vec![
                UploadPart {
                    offset: 0,
                    length: 16 * MIB,
                },
                UploadPart {
                    offset: 16 * MIB,
                    length: 1,
                },
            ]
        );
    }

    #[test]
    fn plan_upload_parts_threshold_boundary_yields_expected_part_count() {
        // 64 MiB at 16 MiB part size = exactly 4 parts.
        let parts = plan_upload_parts(
            MULTIPART_PUT_THRESHOLD,
            MULTIPART_PUT_PART_SIZE,
            S3_MAX_PARTS,
        );
        assert_eq!(parts.len(), 4);
        let total: u64 = parts.iter().map(|p| p.length).sum();
        assert_eq!(total, MULTIPART_PUT_THRESHOLD);
        for (i, p) in parts.iter().enumerate() {
            assert_eq!(p.offset, (i as u64) * MULTIPART_PUT_PART_SIZE);
            assert_eq!(p.length, MULTIPART_PUT_PART_SIZE);
        }
    }

    #[test]
    fn plan_upload_parts_lengths_sum_to_size() {
        // Verifies the planner is total: no bytes lost, none doubled.
        let cases = [
            1_u64,
            MULTIPART_PUT_PART_SIZE - 1,
            MULTIPART_PUT_PART_SIZE,
            MULTIPART_PUT_PART_SIZE + 1,
            7 * MULTIPART_PUT_PART_SIZE + 17,
            123 * MIB + 4567,
        ];
        for size in cases {
            let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, S3_MAX_PARTS);
            let total: u64 = parts.iter().map(|p| p.length).sum();
            assert_eq!(total, size, "size={size}");
            // Offsets are contiguous and sorted.
            let mut expected_offset = 0_u64;
            for p in &parts {
                assert_eq!(p.offset, expected_offset, "size={size}");
                assert!(p.length > 0);
                expected_offset += p.length;
            }
        }
    }

    #[test]
    fn plan_upload_parts_scales_part_size_when_max_parts_exceeded() {
        // 200 GiB at 16 MiB target part size with S3's 10 000 cap:
        // 200 GiB / 16 MiB = 12 800 parts > 10 000, so the planner
        // doubles the part size to 32 MiB → 6 400 parts ≤ 10 000.
        let size = 200 * 1024 * MIB;
        let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, S3_MAX_PARTS);
        assert!(
            (parts.len() as u64) <= S3_MAX_PARTS,
            "parts.len()={} > S3_MAX_PARTS={S3_MAX_PARTS}",
            parts.len(),
        );
        let total: u64 = parts.iter().map(|p| p.length).sum();
        assert_eq!(total, size);
        // Every full part should be 32 MiB (one power-of-two doubling).
        // The last part is skipped because it may carry a remainder.
        let expected_part_size = 32 * MIB;
        for p in parts.iter().take(parts.len() - 1) {
            assert_eq!(p.length, expected_part_size);
        }
    }

    #[test]
    fn plan_upload_parts_azure_block_cap_well_above_s3() {
        // Sanity: Azure's 50 000-block cap is reached only at much
        // larger sizes than S3's 10 000-part cap.
        let size = 200 * 1024 * MIB;
        let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, AZURE_MAX_BLOCKS);
        // 200 GiB / 16 MiB = 12 800 — fits without scaling.
        assert!((parts.len() as u64) <= AZURE_MAX_BLOCKS);
        for p in parts.iter().take(parts.len() - 1) {
            assert_eq!(p.length, MULTIPART_PUT_PART_SIZE);
        }
    }

    /// Deterministic mid-body interruption coverage for issue #56.
    ///
    /// Pin the io-error propagation site of the multipart upload
    /// pipeline: when the source file is truncated underneath the
    /// shared `Arc<File>`, `read_file_part` for an offset past the
    /// new EOF returns `Err`. Both backends thread this error through
    /// `JoinSet::join_next` and abort the upload (S3 explicit
    /// `AbortMultipartUpload`; Azure no `commit_block_list`).
    #[tokio::test]
    async fn read_file_part_propagates_eof_after_truncate() {
        use std::io::Write;

        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("truncated.bin");
        // Create a 4 KiB file so the test exercises the read-error
        // path without paying for a full 16 MiB allocation.
        {
            let mut f = std::fs::File::create(&path).expect("create");
            f.write_all(&[0u8; 4 * 1024]).expect("write");
        }
        let file = Arc::new(std::fs::File::open(&path).expect("open"));

        // Truncate to zero through a separate handle. The
        // already-open `file` still references the same inode, which
        // is now empty, so every `pread` past the new EOF hits EOF.
        // This models the source being truncated in place while
        // `multipart_put_path` is uploading it. (An unlink or
        // rename-and-replace at the path would NOT produce this: the
        // shared handle keeps reading the original inode.)
        std::fs::File::options()
            .write(true)
            .open(&path)
            .expect("reopen for truncate")
            .set_len(0)
            .expect("truncate");

        // Any range inside the old 4 KiB extent is now past EOF.
        let part = UploadPart {
            offset: 1024,
            length: 1024,
        };
        let result = read_file_part(file, part).await;
        assert!(
            result.is_err(),
            "expected read_file_part to error on truncated source, got Ok"
        );
    }
}