git_remote_object_store/object_store/mod.rs
1//! Backend-neutral object-store trait shared by the S3 and Azure Blob
2//! implementations.
3//!
4//! The trait, value types, and in-memory mock live here; the concrete
5//! S3 and Azure backends are in the sibling modules.
6//!
7//! Trait dispatch is intended for `Arc<dyn ObjectStore>` so the
8//! protocol REPL can drive either backend without monomorphisation.
9//! Async methods are routed through [`async_trait`] so
10//! `dyn ObjectStore + Send + Sync` composes cleanly — native
11//! `async fn`-in-trait would require per-method `Send` bounds that
12//! don't survive `dyn`.
13
14pub mod azure;
15pub mod error;
16pub(crate) mod multipart;
17pub mod s3;
18
19#[cfg(any(test, feature = "test-util"))]
20pub mod mock;
21
22#[cfg(test)]
23pub(crate) mod test_support;
24
25use std::path::Path;
26use std::sync::Arc;
27
28use bytes::Bytes;
29use tempfile::NamedTempFile;
30use time::OffsetDateTime;
31use tracing::warn;
32
33use self::error::other_boxed;
34pub use self::error::{BoxError, ObjectStoreError};
35
/// Callback handle through which streaming put/get operations announce
/// transfer progress.
///
/// Each `report(bytes_just_transferred)` call carries a delta, not a
/// running total; callers accumulate `bytes_so_far` themselves. Events
/// fire at chunk boundaries:
///
/// - multipart uploads (both S3 and Azure above
///   [`multipart::MULTIPART_PUT_THRESHOLD`]; issue #53): one event per
///   completed part / block;
/// - multipart downloads: one event per ranged GET;
/// - small-object reads: one event per body chunk;
/// - single-PUT uploads: a single end-of-transfer event.
///
/// The callback runs on the backend's task and may be invoked from a
/// spawned worker, so it must be cheap and non-blocking. The LFS agent
/// forwards `report` calls into an `mpsc` channel that the agent drains
/// into protocol `progress` events.
#[derive(Clone)]
pub struct ProgressSink(Arc<dyn Fn(u64) + Send + Sync>);

impl ProgressSink {
    /// Wrap a cheap, thread-safe closure in a shareable sink.
    pub fn new<F>(f: F) -> Self
    where
        F: Fn(u64) + Send + Sync + 'static,
    {
        ProgressSink(Arc::new(f))
    }

    /// Tell the wrapped callback that `bytes_amount` more bytes were
    /// transferred.
    pub fn report(&self, bytes_amount: u64) {
        let callback: &(dyn Fn(u64) + Send + Sync) = &*self.0;
        callback(bytes_amount);
    }
}

impl std::fmt::Debug for ProgressSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The closure is opaque, so only the type name is rendered.
        let mut rendered = f.debug_struct("ProgressSink");
        rendered.finish_non_exhaustive()
    }
}
72
73/// Atomically rename a [`NamedTempFile`] to `dest`, mapping the
74/// [`tempfile::PersistError`] into [`ObjectStoreError::Other`].
75///
76/// Shared between the S3 and Azure backends — both write `get_to_file`
77/// results to a sibling tempfile and persist on success so a partial
78/// download cannot leave a corrupt destination for the unbundle step.
79pub(crate) fn persist_temp(temp: NamedTempFile, dest: &Path) -> Result<(), ObjectStoreError> {
80 temp.persist(dest)
81 .map_err(|e| ObjectStoreError::Other(Box::new(e.error)))?;
82 Ok(())
83}
84
85/// Metadata returned by `list` and `head`.
86///
87/// `key` is the full backend key (the prefix passed to `list` is included);
88/// `last_modified` is the server-side wall clock, used by stale-lock
89/// recovery in the push path.
90#[derive(Debug, Clone)]
91pub struct ObjectMeta {
92 /// Full key of the stored object.
93 pub key: String,
94 /// Body length in bytes.
95 pub size: u64,
96 /// Server-side last-modified timestamp.
97 pub last_modified: OffsetDateTime,
98 /// Opaque entity-tag returned by `HEAD` / `GET`. S3 returns a
99 /// quoted MD5 (e.g. `"d41d8…"`); Azure returns a similar `ETag`.
100 /// `None` when the backend does not expose one (e.g. `list` results
101 /// on some backends omit it).
102 pub etag: Option<String>,
103}
104
105/// Optional `put_bytes` / `put_path` knobs.
106///
107/// `content_disposition` and `user_metadata` are populated only by the
108/// zip-archive push path, which supplies `Content-Disposition` and the
109/// `codepipeline-artifact-revision-summary` user metadata. `progress`
110/// is populated by the LFS agent so long uploads can drive the
111/// `git-lfs` progress bar; left `None` for bundle / lock / HEAD writes
112/// where progress reporting is not useful. Defaults to "no extras",
113/// which covers every other write.
114#[derive(Debug, Clone, Default)]
115pub struct PutOpts {
116 /// HTTP `Content-Disposition` header to associate with the object.
117 pub content_disposition: Option<String>,
118 /// Backend user-defined metadata (key/value pairs). Backends should
119 /// preserve insertion order; key case-folding is backend-defined.
120 pub user_metadata: Vec<(String, String)>,
121 /// Optional progress sink invoked at chunk boundaries during the
122 /// upload. Backends that do single-shot uploads (small bodies)
123 /// emit one `report(size)` call after the transfer completes.
124 pub progress: Option<ProgressSink>,
125}
126
127/// Optional `get_to_file` knobs.
128///
129/// `progress` is populated by the LFS agent (the only consumer that
130/// needs live download progress); bundle fetches leave it `None`.
131#[derive(Debug, Clone, Default)]
132pub struct GetOpts {
133 /// Optional progress sink invoked at chunk boundaries during the
134 /// download. Multipart download paths emit one `report(chunk_size)`
135 /// call per completed range; the small-object path emits one
136 /// `report(chunk.len())` per body chunk read off the wire.
137 pub progress: Option<ProgressSink>,
138}
139
140/// Caller-side preflight for [`ObjectStore::get_bytes_range`].
141///
142/// Centralises the trait contract for degenerate inputs so every
143/// backend impl renders the same wire-line and the same short-circuit:
144///
145/// - `start == end` → `Ok(Some(Bytes::new()))`; the caller returns the
146/// empty body without issuing a network request.
147/// - `start > end` → `Err(RangeNotSatisfiable)`; the caller propagates.
148/// - otherwise → `Ok(None)`; the caller proceeds with the SDK call.
149pub(crate) fn precheck_range(
150 key: &str,
151 range: &std::ops::Range<u64>,
152) -> Result<Option<Bytes>, ObjectStoreError> {
153 if range.start == range.end {
154 return Ok(Some(Bytes::new()));
155 }
156 if range.start > range.end {
157 return Err(ObjectStoreError::RangeNotSatisfiable {
158 key: key.to_owned(),
159 requested: range.clone(),
160 });
161 }
162 Ok(None)
163}
164
165/// Post-flight check for [`ObjectStore::get_bytes_range`] body responses.
166///
167/// Real S3 and Azure backends silently truncate a ranged GET when
168/// `range.start < body.len() <= range.end` — the wire response carries
169/// `range.start..body.len()` bytes with HTTP 206 and no error. The
170/// packchain reader treats the returned slice as the exact entry it
171/// asked for, so a truncated pack file (or a stale `chain.json` whose
172/// recorded offsets outrun the on-bucket pack) would propagate as
173/// downstream pack-decode garbage rather than as the data-integrity
174/// failure it is.
175///
176/// This helper asserts the SDK gave back exactly `range.end - range.start`
177/// bytes and surfaces a mismatch as [`ObjectStoreError::RangeNotSatisfiable`]
178/// with the originally requested range. It is the symmetric companion to
179/// [`precheck_range`] — the preflight guards against degenerate inputs
180/// before the SDK call; this guards against degenerate output after.
181///
182/// `precheck_range` short-circuits empty and inverted ranges, so by the
183/// time this runs `range.start < range.end` is guaranteed. Subtraction
184/// therefore cannot underflow.
185pub(crate) fn verify_range_response_length(
186 key: &str,
187 range: &std::ops::Range<u64>,
188 body: Bytes,
189) -> Result<Bytes, ObjectStoreError> {
190 let expected = range.end - range.start;
191 let actual = body.len() as u64;
192 if actual == expected {
193 return Ok(body);
194 }
195 Err(ObjectStoreError::RangeNotSatisfiable {
196 key: key.to_owned(),
197 requested: range.clone(),
198 })
199}
200
201/// Backend-neutral cloud object-store surface.
202///
203/// Method semantics — every implementation must satisfy these contracts so
204/// higher layers can target the trait without backend-specific branching.
205///
206/// - **`list(prefix)`** — byte-prefix match (matches S3 `Prefix=`
207/// semantics; `list("a")` returns `a`, `a/1`, and `aaa`). Returns full
208/// keys; ordering is backend-defined.
209/// - **`get_to_file(key, dest, opts)`** — caller must ensure `dest`'s
210/// parent directory exists. `opts.progress`, if set, fires at chunk
211/// boundaries so callers (notably the LFS agent) can render a live
212/// progress bar.
213/// - **`put_bytes`** — overwrites if the key already exists.
214/// - **`put_path`** — streams a local file to the key, overwriting if
215/// present. Default reads the file into memory; backends should
216/// override for large-file streaming.
217/// - **`put_if_absent`** — returns `Ok(true)` on creation, `Ok(false)` if
218/// the key already existed. Backends collapse both 412
219/// (`PreconditionFailed`) and 409 (`Conflict`) into `Ok(false)`;
220/// transport-level failures still surface as `Err`.
221/// - **`copy(src, dst)`** — overwrites `dst`; returns `Err(NotFound)` when
222/// `src` is absent.
223/// - **`delete`** — returns `Err(NotFound)` on missing key. `release_lock`
224/// maps `NotFound` to `Ok(())` and propagates other errors.
225/// - **`get_bytes_range`** — half-open `[start, end)` range. `start == end`
226/// returns `Ok(Bytes::new())` with no network call. `start > end` and
227/// server-side 416 both surface as
228/// [`ObjectStoreError::RangeNotSatisfiable`]. When the object's body
229/// ends inside the requested range
230/// (`start < body.len() <= end`) real S3/Azure backends return a
231/// silently truncated body (HTTP 206, fewer bytes than requested);
232/// this trait elevates that mismatch to
233/// [`ObjectStoreError::RangeNotSatisfiable`] so callers never see a
234/// short slice masquerading as the full requested range. The mock
235/// backend matches this contract.
236#[async_trait::async_trait]
237pub trait ObjectStore: Send + Sync {
238 /// Enumerate every object whose key has `prefix` as a byte prefix.
239 async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError>;
240
241 /// Stream the object body to `dest`. The destination's parent
242 /// directory must already exist. `opts.progress`, when set, fires
243 /// at chunk boundaries with the count of bytes just received.
244 async fn get_to_file(
245 &self,
246 key: &str,
247 dest: &Path,
248 opts: GetOpts,
249 ) -> Result<(), ObjectStoreError>;
250
251 /// Read the entire object body into memory.
252 async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError>;
253
254 /// Read a half-open byte range `[start, end)` of the object body.
255 ///
256 /// `start == end` returns `Ok(Bytes::new())` without issuing a
257 /// network request. `start > end`, or a server-side 416, surfaces
258 /// as [`ObjectStoreError::RangeNotSatisfiable`].
259 ///
260 /// **Truncation contract**: real S3 and Azure backends return a
261 /// silently truncated body (HTTP 206 with fewer bytes than asked)
262 /// when the requested range overruns the object's end —
263 /// `start < body.len() <= end`. Backends here elevate that mismatch
264 /// to [`ObjectStoreError::RangeNotSatisfiable`] so a successful
265 /// `Ok(bytes)` always carries exactly `end - start` bytes. The
266 /// packchain reader (issue #52) relies on this to surface stale
267 /// `chain.json` offsets and truncated pack files as data-integrity
268 /// errors instead of pack-decode garbage.
269 ///
270 /// Used by the packchain engine (issue #52) to read a single blob
271 /// out of a larger pack file without downloading the whole pack.
272 async fn get_bytes_range(
273 &self,
274 key: &str,
275 range: std::ops::Range<u64>,
276 ) -> Result<Bytes, ObjectStoreError>;
277
278 /// Write `body` to `key`, overwriting any existing object.
279 async fn put_bytes(
280 &self,
281 key: &str,
282 body: Bytes,
283 opts: PutOpts,
284 ) -> Result<(), ObjectStoreError>;
285
286 /// Stream a local file to `key`, overwriting any existing object.
287 ///
288 /// Backends should override this to stream from disk without buffering
289 /// the entire file in process memory. The default implementation reads
290 /// the file into memory and delegates to [`put_bytes`](Self::put_bytes);
291 /// this is correct but defeats the streaming intent for large files.
292 async fn put_path(&self, key: &str, src: &Path, opts: PutOpts) -> Result<(), ObjectStoreError> {
293 warn!(
294 key,
295 path = %src.display(),
296 "put_path: falling back to read-then-put_bytes; override this method to avoid \
297 buffering the entire file in memory"
298 );
299 let body = tokio::fs::read(src).await.map_err(other_boxed)?;
300 // `usize` is at most 64 bits wide, so this cast never truncates.
301 let len = body.len() as u64;
302 let progress = opts.progress.clone();
303 // Strip progress from the inner `put_bytes` call so the sink
304 // doesn't fire twice — once during put_bytes' own reporting and
305 // again on our final end-of-transfer event below.
306 let inner_opts = PutOpts {
307 progress: None,
308 ..opts
309 };
310 self.put_bytes(key, Bytes::from(body), inner_opts).await?;
311 // Single-shot fallback emits a final progress event with the
312 // full body size. Zero-byte bodies produce no progress event.
313 if let Some(sink) = progress
314 && len > 0
315 {
316 sink.report(len);
317 }
318 Ok(())
319 }
320
321 /// Create `key` if and only if it does not exist. Returns `Ok(true)`
322 /// when the object was created, `Ok(false)` when the key was already
323 /// present.
324 async fn put_if_absent(&self, key: &str, body: Bytes) -> Result<bool, ObjectStoreError>;
325
326 /// Fetch metadata for an exact key.
327 async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError>;
328
329 /// Copy `src` to `dst`. The body is preserved on every backend;
330 /// user metadata is **not** guaranteed to survive — callers must not
331 /// rely on metadata round-tripping through `copy`.
332 ///
333 /// The trait's only in-tree consumer is `Doctor::evict_losing_bundle`,
334 /// which carries no user metadata on bundle objects.
335 async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError>;
336
337 /// Delete `key`. Returns `Err(ObjectStoreError::NotFound)` if the key was
338 /// not present.
339 async fn delete(&self, key: &str) -> Result<(), ObjectStoreError>;
340
341 /// Build a presigned (short-lived, signed) URL granting GET access
342 /// to `key` for `ttl`. Used by the `bundle-uri` capability
343 /// ([`crate::protocol::bundle_uri`]) to advertise time-limited
344 /// download URLs against private buckets.
345 ///
346 /// The default impl returns
347 /// [`ObjectStoreError::Unsupported`] so backends that have no
348 /// presigning model (e.g. `MockStore` in tests, or
349 /// `AzureStore` configured with a `TokenCredential` rather than
350 /// a shared account key) inherit a clean "not supported" error
351 /// without needing a stub.
352 ///
353 /// # Errors
354 ///
355 /// Returns [`ObjectStoreError::Unsupported`] when the backend
356 /// cannot produce signed URLs (default).
357 /// Returns [`ObjectStoreError::Other`] when the backend supports
358 /// presigning but the SDK rejects the TTL (e.g. AWS's 7-day
359 /// ceiling).
360 async fn presigned_get_url(
361 &self,
362 key: &str,
363 ttl: std::time::Duration,
364 ) -> Result<String, ObjectStoreError> {
365 let _ = (key, ttl);
366 Err(ObjectStoreError::Unsupported(
367 "presigned URLs are not supported by this backend".to_owned(),
368 ))
369 }
370}
371
372/// Blanket impl so `&Arc<T>` coerces to `&dyn ObjectStore` and so
373/// `Arc<T>` is itself usable as an `ObjectStore` value.
374///
375/// `T: ObjectStore + ?Sized` covers both concrete types (`Arc<S3Store>`,
376/// `Arc<MockStore>`) and erased trait objects (`Arc<dyn ObjectStore>`).
377/// Every method forwards to the inner `T` through the `Deref` impl.
378///
379/// Verified non-removable: dropping this impl breaks call sites that
380/// pass `&Arc<dyn ObjectStore>` to a function taking `&dyn ObjectStore`.
381/// Plain `Arc::deref` lets `arc.list()` work via Rust's auto-deref on
382/// method calls, but the *coercion* `&Arc<T>` → `&dyn ObjectStore`
383/// requires `Arc<T>` itself to implement the trait — and that is what
384/// this impl provides.
385#[async_trait::async_trait]
386impl<T: ObjectStore + ?Sized> ObjectStore for Arc<T> {
387 async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
388 (**self).list(prefix).await
389 }
390
391 async fn get_to_file(
392 &self,
393 key: &str,
394 dest: &Path,
395 opts: GetOpts,
396 ) -> Result<(), ObjectStoreError> {
397 (**self).get_to_file(key, dest, opts).await
398 }
399
400 async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
401 (**self).get_bytes(key).await
402 }
403
404 async fn get_bytes_range(
405 &self,
406 key: &str,
407 range: std::ops::Range<u64>,
408 ) -> Result<Bytes, ObjectStoreError> {
409 (**self).get_bytes_range(key, range).await
410 }
411
412 async fn put_bytes(
413 &self,
414 key: &str,
415 body: Bytes,
416 opts: PutOpts,
417 ) -> Result<(), ObjectStoreError> {
418 (**self).put_bytes(key, body, opts).await
419 }
420
421 async fn put_path(&self, key: &str, src: &Path, opts: PutOpts) -> Result<(), ObjectStoreError> {
422 (**self).put_path(key, src, opts).await
423 }
424
425 async fn put_if_absent(&self, key: &str, body: Bytes) -> Result<bool, ObjectStoreError> {
426 (**self).put_if_absent(key, body).await
427 }
428
429 async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError> {
430 (**self).head(key).await
431 }
432
433 async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError> {
434 (**self).copy(src, dst).await
435 }
436
437 async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
438 (**self).delete(key).await
439 }
440
441 async fn presigned_get_url(
442 &self,
443 key: &str,
444 ttl: std::time::Duration,
445 ) -> Result<String, ObjectStoreError> {
446 (**self).presigned_get_url(key, ttl).await
447 }
448}
449
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty range (`start == end`) is answered locally with an empty body.
    #[test]
    fn precheck_range_empty_short_circuits_with_empty_bytes() {
        let empty = std::ops::Range { start: 5_u64, end: 5 };
        match precheck_range("k", &empty) {
            Ok(Some(bytes)) => assert!(bytes.is_empty()),
            Ok(None) => panic!("empty range short-circuits with Some"),
            Err(e) => panic!("empty range is valid: {e:?}"),
        }
    }

    /// An inverted range is rejected and the error echoes key and range.
    #[test]
    fn precheck_range_inverted_returns_range_not_satisfiable() {
        let inverted = std::ops::Range { start: 7, end: 3 };
        let err = precheck_range("k", &inverted).expect_err("inverted range must error");
        let echoes_request = match &err {
            ObjectStoreError::RangeNotSatisfiable { key, requested } => {
                key == "k" && requested.start == 7 && requested.end == 3
            }
            _ => false,
        };
        assert!(echoes_request);
    }

    /// A well-formed, non-empty range lets the caller go on to the SDK.
    #[test]
    fn precheck_range_well_formed_returns_none() {
        let verdict = precheck_range("k", &(2..6)).expect("valid range");
        assert!(verdict.is_none(), "well-formed range proceeds to SDK call");
    }

    /// A body of exactly `end - start` bytes is handed back untouched.
    #[test]
    fn verify_range_response_length_passes_exact_length() {
        // 4 bytes, matches the 2..6 range.
        let payload: &'static [u8] = b"abcd";
        let returned = verify_range_response_length("k", &(2..6), Bytes::from_static(payload))
            .expect("exact-length body must pass");
        assert_eq!(&returned[..], payload);
    }

    /// A short body is rejected with the originally requested range.
    #[test]
    fn verify_range_response_length_rejects_truncated_body() {
        // S3/Azure return start..body.len() when start < body.len() <= end.
        // Caller asked for 4 bytes; SDK returned 2.
        let requested_range = 2..6;
        let short_body = Bytes::from_static(b"ab");
        let err = verify_range_response_length("pack-key", &requested_range, short_body)
            .expect_err("short body must be rejected");
        let echoes_request = match &err {
            ObjectStoreError::RangeNotSatisfiable { key, requested } => {
                key == "pack-key" && requested.start == 2 && requested.end == 6
            }
            _ => false,
        };
        assert!(
            echoes_request,
            "expected RangeNotSatisfiable(pack-key, 2..6), got {err:?}"
        );
    }

    /// A body longer than the range is rejected the same way.
    #[test]
    fn verify_range_response_length_rejects_overlong_body() {
        // Defensive: a hypothetical SDK bug that returned MORE bytes than
        // requested would equally let downstream callers consume corrupt
        // data. Treat as the same mismatch.
        let outcome = verify_range_response_length("k", &(0..4), Bytes::from_static(b"abcdef"));
        let err = outcome.expect_err("overlong body must be rejected");
        let is_mismatch = matches!(err, ObjectStoreError::RangeNotSatisfiable { .. });
        assert!(is_mismatch);
    }

    /// The smallest non-degenerate range round-trips without an off-by-one.
    #[test]
    fn verify_range_response_length_single_byte_round_trip() {
        // 1-byte range is the smallest non-degenerate case (precheck
        // already rejected 0-byte and inverted). Locks in that the
        // single-byte path has no off-by-one.
        let payload: &'static [u8] = b"x";
        let returned = verify_range_response_length("k", &(7..8), Bytes::from_static(payload))
            .expect("single-byte body must pass");
        assert_eq!(&returned[..], payload);
    }
}