Skip to main content

git_remote_object_store/object_store/
mod.rs

//! Backend-neutral object-store trait shared by the S3 and Azure Blob
//! implementations.
//!
//! The trait and its value types live here (an in-memory mock lives in
//! the `mock` submodule, compiled only for `test` / `test-util`); the
//! concrete S3 and Azure backends are in the sibling modules.
//!
//! Trait dispatch is intended for `Arc<dyn ObjectStore>` so the
//! protocol REPL can drive either backend without monomorphisation.
//! Async methods are routed through [`async_trait`] because a trait
//! with native `async fn` methods is not dyn-compatible (it cannot be
//! used as `dyn ObjectStore` at all), and its futures carry no `Send`
//! bound that a `dyn ObjectStore + Send + Sync` caller could rely on.
13
14pub mod azure;
15pub mod error;
16pub(crate) mod multipart;
17pub mod s3;
18
19#[cfg(any(test, feature = "test-util"))]
20pub mod mock;
21
22#[cfg(test)]
23pub(crate) mod test_support;
24
25use std::path::Path;
26use std::sync::Arc;
27
28use bytes::Bytes;
29use tempfile::NamedTempFile;
30use time::OffsetDateTime;
31use tracing::warn;
32
33use self::error::other_boxed;
34pub use self::error::{BoxError, ObjectStoreError};
35
/// Progress callback handed to streaming put/get operations.
///
/// Each `report(n)` call announces `n` bytes that were *just*
/// transferred. The sink never sees a running total, so callers that
/// want `bytes_so_far` must accumulate it themselves. Events fire at
/// chunk boundaries:
///
/// - multipart uploads (both S3 and Azure above
///   [`multipart::MULTIPART_PUT_THRESHOLD`]; issue #53): one event per
///   completed part / block;
/// - multipart downloads: one event per ranged GET;
/// - small-object reads: one event per body chunk;
/// - single-PUT uploads: a single event once the transfer has finished.
///
/// Backends may invoke the callback from a spawned worker task, so it
/// has to be cheap and must not block. The LFS agent, for example,
/// pushes each `report` into an `mpsc` channel that it later drains
/// into protocol `progress` events.
///
/// Cloning is cheap: every clone shares the same underlying callback.
#[derive(Clone)]
pub struct ProgressSink(Arc<dyn Fn(u64) + Send + Sync>);

impl ProgressSink {
    /// Wrap `callback` in a sink. The callback must be cheap,
    /// non-blocking, and callable from any thread.
    pub fn new<F>(callback: F) -> Self
    where
        F: Fn(u64) + Send + Sync + 'static,
    {
        let shared: Arc<dyn Fn(u64) + Send + Sync> = Arc::new(callback);
        Self(shared)
    }

    /// Announce `bytes_amount` freshly transferred bytes to the callback.
    pub fn report(&self, bytes_amount: u64) {
        let Self(callback) = self;
        callback(bytes_amount);
    }
}

impl std::fmt::Debug for ProgressSink {
    /// Renders as `ProgressSink { .. }`: the wrapped closure is opaque and
    /// has no `Debug` representation of its own.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.debug_struct("ProgressSink").finish_non_exhaustive()
    }
}
72
73/// Atomically rename a [`NamedTempFile`] to `dest`, mapping the
74/// [`tempfile::PersistError`] into [`ObjectStoreError::Other`].
75///
76/// Shared between the S3 and Azure backends — both write `get_to_file`
77/// results to a sibling tempfile and persist on success so a partial
78/// download cannot leave a corrupt destination for the unbundle step.
79pub(crate) fn persist_temp(temp: NamedTempFile, dest: &Path) -> Result<(), ObjectStoreError> {
80    temp.persist(dest)
81        .map_err(|e| ObjectStoreError::Other(Box::new(e.error)))?;
82    Ok(())
83}
84
85/// Metadata returned by `list` and `head`.
86///
87/// `key` is the full backend key (the prefix passed to `list` is included);
88/// `last_modified` is the server-side wall clock, used by stale-lock
89/// recovery in the push path.
90#[derive(Debug, Clone)]
91pub struct ObjectMeta {
92    /// Full key of the stored object.
93    pub key: String,
94    /// Body length in bytes.
95    pub size: u64,
96    /// Server-side last-modified timestamp.
97    pub last_modified: OffsetDateTime,
98    /// Opaque entity-tag returned by `HEAD` / `GET`. S3 returns a
99    /// quoted MD5 (e.g. `"d41d8…"`); Azure returns a similar `ETag`.
100    /// `None` when the backend does not expose one (e.g. `list` results
101    /// on some backends omit it).
102    pub etag: Option<String>,
103}
104
105/// Optional `put_bytes` / `put_path` knobs.
106///
107/// `content_disposition` and `user_metadata` are populated only by the
108/// zip-archive push path, which supplies `Content-Disposition` and the
109/// `codepipeline-artifact-revision-summary` user metadata. `progress`
110/// is populated by the LFS agent so long uploads can drive the
111/// `git-lfs` progress bar; left `None` for bundle / lock / HEAD writes
112/// where progress reporting is not useful. Defaults to "no extras",
113/// which covers every other write.
114#[derive(Debug, Clone, Default)]
115pub struct PutOpts {
116    /// HTTP `Content-Disposition` header to associate with the object.
117    pub content_disposition: Option<String>,
118    /// Backend user-defined metadata (key/value pairs). Backends should
119    /// preserve insertion order; key case-folding is backend-defined.
120    pub user_metadata: Vec<(String, String)>,
121    /// Optional progress sink invoked at chunk boundaries during the
122    /// upload. Backends that do single-shot uploads (small bodies)
123    /// emit one `report(size)` call after the transfer completes.
124    pub progress: Option<ProgressSink>,
125}
126
127/// Optional `get_to_file` knobs.
128///
129/// `progress` is populated by the LFS agent (the only consumer that
130/// needs live download progress); bundle fetches leave it `None`.
131#[derive(Debug, Clone, Default)]
132pub struct GetOpts {
133    /// Optional progress sink invoked at chunk boundaries during the
134    /// download. Multipart download paths emit one `report(chunk_size)`
135    /// call per completed range; the small-object path emits one
136    /// `report(chunk.len())` per body chunk read off the wire.
137    pub progress: Option<ProgressSink>,
138}
139
140/// Caller-side preflight for [`ObjectStore::get_bytes_range`].
141///
142/// Centralises the trait contract for degenerate inputs so every
143/// backend impl renders the same wire-line and the same short-circuit:
144///
145/// - `start == end` → `Ok(Some(Bytes::new()))`; the caller returns the
146///   empty body without issuing a network request.
147/// - `start > end` → `Err(RangeNotSatisfiable)`; the caller propagates.
148/// - otherwise → `Ok(None)`; the caller proceeds with the SDK call.
149pub(crate) fn precheck_range(
150    key: &str,
151    range: &std::ops::Range<u64>,
152) -> Result<Option<Bytes>, ObjectStoreError> {
153    if range.start == range.end {
154        return Ok(Some(Bytes::new()));
155    }
156    if range.start > range.end {
157        return Err(ObjectStoreError::RangeNotSatisfiable {
158            key: key.to_owned(),
159            requested: range.clone(),
160        });
161    }
162    Ok(None)
163}
164
165/// Post-flight check for [`ObjectStore::get_bytes_range`] body responses.
166///
167/// Real S3 and Azure backends silently truncate a ranged GET when
168/// `range.start < body.len() <= range.end` — the wire response carries
169/// `range.start..body.len()` bytes with HTTP 206 and no error. The
170/// packchain reader treats the returned slice as the exact entry it
171/// asked for, so a truncated pack file (or a stale `chain.json` whose
172/// recorded offsets outrun the on-bucket pack) would propagate as
173/// downstream pack-decode garbage rather than as the data-integrity
174/// failure it is.
175///
176/// This helper asserts the SDK gave back exactly `range.end - range.start`
177/// bytes and surfaces a mismatch as [`ObjectStoreError::RangeNotSatisfiable`]
178/// with the originally requested range. It is the symmetric companion to
179/// [`precheck_range`] — the preflight guards against degenerate inputs
180/// before the SDK call; this guards against degenerate output after.
181///
182/// `precheck_range` short-circuits empty and inverted ranges, so by the
183/// time this runs `range.start < range.end` is guaranteed. Subtraction
184/// therefore cannot underflow.
185pub(crate) fn verify_range_response_length(
186    key: &str,
187    range: &std::ops::Range<u64>,
188    body: Bytes,
189) -> Result<Bytes, ObjectStoreError> {
190    let expected = range.end - range.start;
191    let actual = body.len() as u64;
192    if actual == expected {
193        return Ok(body);
194    }
195    Err(ObjectStoreError::RangeNotSatisfiable {
196        key: key.to_owned(),
197        requested: range.clone(),
198    })
199}
200
201/// Backend-neutral cloud object-store surface.
202///
203/// Method semantics — every implementation must satisfy these contracts so
204/// higher layers can target the trait without backend-specific branching.
205///
206/// - **`list(prefix)`** — byte-prefix match (matches S3 `Prefix=`
207///   semantics; `list("a")` returns `a`, `a/1`, and `aaa`). Returns full
208///   keys; ordering is backend-defined.
209/// - **`get_to_file(key, dest, opts)`** — caller must ensure `dest`'s
210///   parent directory exists. `opts.progress`, if set, fires at chunk
211///   boundaries so callers (notably the LFS agent) can render a live
212///   progress bar.
213/// - **`put_bytes`** — overwrites if the key already exists.
214/// - **`put_path`** — streams a local file to the key, overwriting if
215///   present. Default reads the file into memory; backends should
216///   override for large-file streaming.
217/// - **`put_if_absent`** — returns `Ok(true)` on creation, `Ok(false)` if
218///   the key already existed. Backends collapse both 412
219///   (`PreconditionFailed`) and 409 (`Conflict`) into `Ok(false)`;
220///   transport-level failures still surface as `Err`.
221/// - **`copy(src, dst)`** — overwrites `dst`; returns `Err(NotFound)` when
222///   `src` is absent.
223/// - **`delete`** — returns `Err(NotFound)` on missing key. `release_lock`
224///   maps `NotFound` to `Ok(())` and propagates other errors.
225/// - **`get_bytes_range`** — half-open `[start, end)` range. `start == end`
226///   returns `Ok(Bytes::new())` with no network call. `start > end` and
227///   server-side 416 both surface as
228///   [`ObjectStoreError::RangeNotSatisfiable`]. When the object's body
229///   ends inside the requested range
230///   (`start < body.len() <= end`) real S3/Azure backends return a
231///   silently truncated body (HTTP 206, fewer bytes than requested);
232///   this trait elevates that mismatch to
233///   [`ObjectStoreError::RangeNotSatisfiable`] so callers never see a
234///   short slice masquerading as the full requested range. The mock
235///   backend matches this contract.
236#[async_trait::async_trait]
237pub trait ObjectStore: Send + Sync {
238    /// Enumerate every object whose key has `prefix` as a byte prefix.
239    async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError>;
240
241    /// Stream the object body to `dest`. The destination's parent
242    /// directory must already exist. `opts.progress`, when set, fires
243    /// at chunk boundaries with the count of bytes just received.
244    async fn get_to_file(
245        &self,
246        key: &str,
247        dest: &Path,
248        opts: GetOpts,
249    ) -> Result<(), ObjectStoreError>;
250
251    /// Read the entire object body into memory.
252    async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError>;
253
254    /// Read a half-open byte range `[start, end)` of the object body.
255    ///
256    /// `start == end` returns `Ok(Bytes::new())` without issuing a
257    /// network request. `start > end`, or a server-side 416, surfaces
258    /// as [`ObjectStoreError::RangeNotSatisfiable`].
259    ///
260    /// **Truncation contract**: real S3 and Azure backends return a
261    /// silently truncated body (HTTP 206 with fewer bytes than asked)
262    /// when the requested range overruns the object's end —
263    /// `start < body.len() <= end`. Backends here elevate that mismatch
264    /// to [`ObjectStoreError::RangeNotSatisfiable`] so a successful
265    /// `Ok(bytes)` always carries exactly `end - start` bytes. The
266    /// packchain reader (issue #52) relies on this to surface stale
267    /// `chain.json` offsets and truncated pack files as data-integrity
268    /// errors instead of pack-decode garbage.
269    ///
270    /// Used by the packchain engine (issue #52) to read a single blob
271    /// out of a larger pack file without downloading the whole pack.
272    async fn get_bytes_range(
273        &self,
274        key: &str,
275        range: std::ops::Range<u64>,
276    ) -> Result<Bytes, ObjectStoreError>;
277
278    /// Write `body` to `key`, overwriting any existing object.
279    async fn put_bytes(
280        &self,
281        key: &str,
282        body: Bytes,
283        opts: PutOpts,
284    ) -> Result<(), ObjectStoreError>;
285
286    /// Stream a local file to `key`, overwriting any existing object.
287    ///
288    /// Backends should override this to stream from disk without buffering
289    /// the entire file in process memory. The default implementation reads
290    /// the file into memory and delegates to [`put_bytes`](Self::put_bytes);
291    /// this is correct but defeats the streaming intent for large files.
292    async fn put_path(&self, key: &str, src: &Path, opts: PutOpts) -> Result<(), ObjectStoreError> {
293        warn!(
294            key,
295            path = %src.display(),
296            "put_path: falling back to read-then-put_bytes; override this method to avoid \
297             buffering the entire file in memory"
298        );
299        let body = tokio::fs::read(src).await.map_err(other_boxed)?;
300        // `usize` is at most 64 bits wide, so this cast never truncates.
301        let len = body.len() as u64;
302        let progress = opts.progress.clone();
303        // Strip progress from the inner `put_bytes` call so the sink
304        // doesn't fire twice — once during put_bytes' own reporting and
305        // again on our final end-of-transfer event below.
306        let inner_opts = PutOpts {
307            progress: None,
308            ..opts
309        };
310        self.put_bytes(key, Bytes::from(body), inner_opts).await?;
311        // Single-shot fallback emits a final progress event with the
312        // full body size. Zero-byte bodies produce no progress event.
313        if let Some(sink) = progress
314            && len > 0
315        {
316            sink.report(len);
317        }
318        Ok(())
319    }
320
321    /// Create `key` if and only if it does not exist. Returns `Ok(true)`
322    /// when the object was created, `Ok(false)` when the key was already
323    /// present.
324    async fn put_if_absent(&self, key: &str, body: Bytes) -> Result<bool, ObjectStoreError>;
325
326    /// Fetch metadata for an exact key.
327    async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError>;
328
329    /// Copy `src` to `dst`. The body is preserved on every backend;
330    /// user metadata is **not** guaranteed to survive — callers must not
331    /// rely on metadata round-tripping through `copy`.
332    ///
333    /// The trait's only in-tree consumer is `Doctor::evict_losing_bundle`,
334    /// which carries no user metadata on bundle objects.
335    async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError>;
336
337    /// Delete `key`. Returns `Err(ObjectStoreError::NotFound)` if the key was
338    /// not present.
339    async fn delete(&self, key: &str) -> Result<(), ObjectStoreError>;
340
341    /// Build a presigned (short-lived, signed) URL granting GET access
342    /// to `key` for `ttl`. Used by the `bundle-uri` capability
343    /// ([`crate::protocol::bundle_uri`]) to advertise time-limited
344    /// download URLs against private buckets.
345    ///
346    /// The default impl returns
347    /// [`ObjectStoreError::Unsupported`] so backends that have no
348    /// presigning model (e.g. `MockStore` in tests, or
349    /// `AzureStore` configured with a `TokenCredential` rather than
350    /// a shared account key) inherit a clean "not supported" error
351    /// without needing a stub.
352    ///
353    /// # Errors
354    ///
355    /// Returns [`ObjectStoreError::Unsupported`] when the backend
356    /// cannot produce signed URLs (default).
357    /// Returns [`ObjectStoreError::Other`] when the backend supports
358    /// presigning but the SDK rejects the TTL (e.g. AWS's 7-day
359    /// ceiling).
360    async fn presigned_get_url(
361        &self,
362        key: &str,
363        ttl: std::time::Duration,
364    ) -> Result<String, ObjectStoreError> {
365        let _ = (key, ttl);
366        Err(ObjectStoreError::Unsupported(
367            "presigned URLs are not supported by this backend".to_owned(),
368        ))
369    }
370}
371
372/// Blanket impl so `&Arc<T>` coerces to `&dyn ObjectStore` and so
373/// `Arc<T>` is itself usable as an `ObjectStore` value.
374///
375/// `T: ObjectStore + ?Sized` covers both concrete types (`Arc<S3Store>`,
376/// `Arc<MockStore>`) and erased trait objects (`Arc<dyn ObjectStore>`).
377/// Every method forwards to the inner `T` through the `Deref` impl.
378///
379/// Verified non-removable: dropping this impl breaks call sites that
380/// pass `&Arc<dyn ObjectStore>` to a function taking `&dyn ObjectStore`.
381/// Plain `Arc::deref` lets `arc.list()` work via Rust's auto-deref on
382/// method calls, but the *coercion* `&Arc<T>` → `&dyn ObjectStore`
383/// requires `Arc<T>` itself to implement the trait — and that is what
384/// this impl provides.
385#[async_trait::async_trait]
386impl<T: ObjectStore + ?Sized> ObjectStore for Arc<T> {
387    async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
388        (**self).list(prefix).await
389    }
390
391    async fn get_to_file(
392        &self,
393        key: &str,
394        dest: &Path,
395        opts: GetOpts,
396    ) -> Result<(), ObjectStoreError> {
397        (**self).get_to_file(key, dest, opts).await
398    }
399
400    async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
401        (**self).get_bytes(key).await
402    }
403
404    async fn get_bytes_range(
405        &self,
406        key: &str,
407        range: std::ops::Range<u64>,
408    ) -> Result<Bytes, ObjectStoreError> {
409        (**self).get_bytes_range(key, range).await
410    }
411
412    async fn put_bytes(
413        &self,
414        key: &str,
415        body: Bytes,
416        opts: PutOpts,
417    ) -> Result<(), ObjectStoreError> {
418        (**self).put_bytes(key, body, opts).await
419    }
420
421    async fn put_path(&self, key: &str, src: &Path, opts: PutOpts) -> Result<(), ObjectStoreError> {
422        (**self).put_path(key, src, opts).await
423    }
424
425    async fn put_if_absent(&self, key: &str, body: Bytes) -> Result<bool, ObjectStoreError> {
426        (**self).put_if_absent(key, body).await
427    }
428
429    async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError> {
430        (**self).head(key).await
431    }
432
433    async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError> {
434        (**self).copy(src, dst).await
435    }
436
437    async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
438        (**self).delete(key).await
439    }
440
441    async fn presigned_get_url(
442        &self,
443        key: &str,
444        ttl: std::time::Duration,
445    ) -> Result<String, ObjectStoreError> {
446        (**self).presigned_get_url(key, ttl).await
447    }
448}
449
#[cfg(test)]
mod tests {
    //! Unit tests for the range preflight / post-flight helpers.

    use super::*;

    #[test]
    fn precheck_range_empty_short_circuits_with_empty_bytes() {
        // An empty range must short-circuit with `Some(empty)` so no
        // request is ever issued.
        let empty = 5..5;
        let short_circuit = precheck_range("k", &empty)
            .expect("empty range is valid")
            .expect("empty range short-circuits with Some");
        assert!(short_circuit.is_empty());
    }

    #[test]
    fn precheck_range_inverted_returns_range_not_satisfiable() {
        // A `7..3` literal would trip clippy's `reversed_empty_ranges`
        // lint, so the inverted range is built field by field.
        let inverted = std::ops::Range { start: 7, end: 3 };
        let err = precheck_range("k", &inverted).expect_err("inverted range must error");
        assert!(matches!(
            &err,
            ObjectStoreError::RangeNotSatisfiable { key, requested }
                if key == "k" && requested.start == 7 && requested.end == 3
        ));
    }

    #[test]
    fn precheck_range_well_formed_returns_none() {
        let verdict = precheck_range("k", &(2..6)).expect("valid range");
        assert!(verdict.is_none(), "well-formed range proceeds to SDK call");
    }

    #[test]
    fn verify_range_response_length_passes_exact_length() {
        // Four bytes for a four-byte range.
        let body = Bytes::from_static(b"abcd");
        let out = verify_range_response_length("k", &(2..6), body.clone())
            .expect("exact-length body must pass");
        assert_eq!(out, body);
    }

    #[test]
    fn verify_range_response_length_rejects_truncated_body() {
        // S3/Azure return start..body.len() when start < body.len() <= end:
        // the caller asked for 4 bytes and the SDK handed back 2.
        let short = Bytes::from_static(b"ab");
        let err = verify_range_response_length("pack-key", &(2..6), short)
            .expect_err("short body must be rejected");
        assert!(
            matches!(
                &err,
                ObjectStoreError::RangeNotSatisfiable { key, requested }
                    if key == "pack-key" && requested.start == 2 && requested.end == 6,
            ),
            "expected RangeNotSatisfiable(pack-key, 2..6), got {err:?}"
        );
    }

    #[test]
    fn verify_range_response_length_rejects_overlong_body() {
        // Defensive case. A hypothetical SDK bug that returned MORE bytes
        // than requested would let callers consume corrupt data just as a
        // short body would, so it counts as the same mismatch.
        let long = Bytes::from_static(b"abcdef");
        let err = verify_range_response_length("k", &(0..4), long)
            .expect_err("overlong body must be rejected");
        assert!(matches!(err, ObjectStoreError::RangeNotSatisfiable { .. }));
    }

    #[test]
    fn verify_range_response_length_single_byte_round_trip() {
        // Smallest non-degenerate range; precheck already rejects empty
        // and inverted ranges. Pins the single-byte path against
        // off-by-one errors.
        let body = Bytes::from_static(b"x");
        let out = verify_range_response_length("k", &(7..8), body.clone())
            .expect("single-byte body must pass");
        assert_eq!(out, body);
    }
}