snapdir_stores/gcs_store.rs
1//! `GcsStore`: the `gs://` storage backend, backed by the native
2//! `google-cloud-storage` SDK.
3//!
4//! A [`GcsStore`] targets a `gs://bucket/prefix` location and holds the frozen
5//! content-addressable `.objects`/`.manifests` sharded layout, so a
6//! bucket/prefix is interchangeable across conforming implementations:
7//!
8//! ```text
9//! gs://<bucket>/<prefix>/.objects/<sharded checksum> raw object bytes
10//! gs://<bucket>/<prefix>/.manifests/<sharded snapshot id> manifest text
11//! ```
12//!
13//! Sharding and the relative keys come straight from [`snapdir_core::store`]
14//! ([`object_path`] / [`manifest_path`]); this module never reimplements them.
15//!
16//! # `gs://` parsing
17//!
18//! The oracle derives the bucket as `cut -d'/' -f3` of the URL
19//! (`_snapdir_export_store_vars`) and the prefix with
20//! `sed -E 's|^gs:/*[^/]*/?||'` then a trailing-slash strip
21//! (`_snapdir_gcs_store_get_remote_prefix`). [`GcsLocation::parse`] reproduces
22//! that exactly: bucket = first segment after the scheme, prefix = the
23//! remainder with leading/trailing slashes removed.
24//!
25//! # Credentials
26//!
27//! Authentication is delegated entirely to the SDK's own credential chain
28//! (Application Default Credentials): `GOOGLE_APPLICATION_CREDENTIALS`,
29//! `GOOGLE_APPLICATION_CREDENTIALS_JSON`, `gcloud` user creds, and the GCE/GKE
30//! metadata server. No bespoke snapdir credential variables are introduced.
31//!
32//! # TLS provider (project-load-bearing)
33//!
34//! The shipped binary must statically link on musl, so the workspace
35//! standardizes on the **`ring`** rustls provider; `aws-lc-rs` is banned. The
36//! `google-cloud-storage` default features pull `aws-lc-rs` in via
37//! `google-cloud-auth` (both its id-token backend and its rustls provider), so
38//! we depend on the crate with `default-features = false` and instead install
39//! the **ring** [`CryptoProvider`](rustls_ring::crypto::CryptoProvider) as the
40//! rustls *process default* (the SDK's `reqwest` is built with
41//! `rustls-no-provider`, i.e. it consumes that process default). See the crate
42//! `Cargo.toml` for the full rationale.
43//!
44//! # Sync trait, async SDK
45//!
46//! The SDK is async. [`GcsStore`] owns a private multi-thread `tokio` runtime
47//! and bridges each [`Store`] method with `runtime.block_on(...)`, exactly like
48//! [`S3Store`](crate::S3Store), so no `async` leaks into `snapdir-core`.
49
50use std::path::Path;
51use std::sync::Arc;
52
53use google_cloud_gax::error::rpc::Code;
54use google_cloud_gax::error::Error as GcsError;
55use google_cloud_gax::retry_policy::NeverRetry;
56use google_cloud_storage::client::{Storage, StorageControl};
57use snapdir_core::manifest::Manifest;
58use snapdir_core::merkle::{Blake3Hasher, Hasher};
59use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
60use snapdir_core::Meter;
61
62use crate::fetch::fetch_files_concurrent;
63use crate::push::{push_objects_concurrent, upload_object};
64use crate::retry::{parse_retry_after, retry_network, Attempt, DefaultJitter, TokioSleeper};
65use crate::stream::StreamStore;
66use crate::transfer::{classify_error, RateLimiter, TransferConfig};
67use tokio::runtime::Runtime;
68
/// Attempt budget for a checksum-verified fetch: `fetch_verified` downloads an
/// object at most this many times (the first download counts against the
/// budget) before giving up with an integrity error. The value mirrors the
/// oracle's `_SNAPDIR_GCS_STORE_RETRIES` default of 5.
const MAX_FETCH_RETRIES: u32 = 5;
72
73/// The parsed location a [`GcsStore`] targets: a GCS bucket plus a key prefix.
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub struct GcsLocation {
76 /// The bucket name (first path segment of the `gs://` URL).
77 pub bucket: String,
78 /// The object-name prefix (remaining segments), with no leading or trailing
79 /// slash. Empty when the store points at the bucket root.
80 pub prefix: String,
81}
82
83impl GcsLocation {
84 /// Parses a `gs://bucket/prefix` URL into its bucket and prefix.
85 ///
86 /// Matches the oracle exactly: the bucket is `cut -d'/' -f3` of the store
87 /// URL (the segment immediately after `gs://`), and the prefix is
88 /// `sed -E 's|^gs:/*[^/]*/?||'` with a trailing slash stripped
89 /// (`_snapdir_gcs_store_get_remote_prefix`).
90 ///
91 /// The `gs://` scheme is optional; a bare `bucket/prefix` is accepted too.
92 #[must_use]
93 pub fn parse(store_url: &str) -> Self {
94 // Drop the scheme (`gs://`, or any `<proto>://`) if present. The oracle
95 // splits the full URL on `/` and takes field 3 as the bucket, which for
96 // `gs://bucket/...` is exactly the segment after the `//`.
97 let without_scheme = match store_url.find("://") {
98 Some(idx) => &store_url[idx + 3..],
99 None => store_url,
100 };
101 let mut parts = without_scheme.splitn(2, '/');
102 let bucket = parts.next().unwrap_or("").to_owned();
103 let prefix = parts.next().unwrap_or("");
104 // Strip a leading slash (empty-first-segment edge case) and the trailing
105 // slash; the prefix is joined back with a single `/`.
106 let prefix = prefix
107 .trim_end_matches('/')
108 .trim_start_matches('/')
109 .to_owned();
110 Self { bucket, prefix }
111 }
112
113 /// The GCS resource name for this bucket, `projects/_/buckets/<bucket>`, as
114 /// required by the `google-cloud-storage` v1 API.
115 #[must_use]
116 pub fn bucket_resource(&self) -> String {
117 format!("projects/_/buckets/{}", self.bucket)
118 }
119
120 /// Returns the full GCS object name for a content object given its checksum,
121 /// i.e. `<prefix>/.objects/<sharded>` (no leading slash).
122 #[must_use]
123 pub fn object_key(&self, checksum: &str) -> String {
124 self.key_for(&object_path(checksum))
125 }
126
127 /// Returns the full GCS object name for a manifest given its snapshot id,
128 /// i.e. `<prefix>/.manifests/<sharded>` (no leading slash).
129 #[must_use]
130 pub fn manifest_key(&self, id: &str) -> String {
131 self.key_for(&manifest_path(id))
132 }
133
134 /// Joins the store prefix with a store-relative path (`.objects/...` or
135 /// `.manifests/...`), producing a leading-slash-free object name. Mirrors
136 /// the oracle's `${_SNAPDIR_STORE_BASE_DIR%/}/${source_path#/}` with the
137 /// leading slash trimmed.
138 fn key_for(&self, rel: &str) -> String {
139 let rel = rel.trim_start_matches('/');
140 if self.prefix.is_empty() {
141 rel.to_owned()
142 } else {
143 format!("{}/{rel}", self.prefix)
144 }
145 }
146}
147
/// A content-addressable store backed by a Google Cloud Storage bucket.
///
/// Construct one with [`GcsStore::connect`] (resolves Application Default
/// Credentials via the SDK).
pub struct GcsStore {
    /// Data-plane client (object read/write).
    storage: Storage,
    /// Control-plane client (object metadata, used for HEAD-like existence
    /// checks without downloading the body).
    control: StorageControl,
    /// Parsed bucket + key prefix from which every object and manifest key is
    /// derived.
    location: GcsLocation,
    /// Tokio runtime the synchronous trait methods `block_on` to drive the
    /// async SDK calls.
    runtime: Arc<Runtime>,
    /// Transfer settings: supplies the retry policy for each SDK call and the
    /// request-rate / byte-rate caps the limiters are built from.
    config: TransferConfig,
    /// Per-call request-rate limiter (one token per SDK call), built from
    /// [`TransferConfig::max_requests_per_sec`]. Unlimited (a no-op) by default.
    /// Shared (clone) so every call paces against one aggregate request budget.
    req_limiter: RateLimiter,
    /// Optional progress meter; recorded into during transfers. `None` (the
    /// default from every constructor) means zero recording and byte-identical
    /// behavior. Set by the CLI via [`GcsStore::with_meter`].
    meter: Option<Arc<Meter>>,
}
170
171impl GcsStore {
172 /// Connects to the `gs://bucket/prefix` store, resolving credentials via the
173 /// SDK's Application Default Credentials chain.
174 ///
175 /// The `ring` rustls crypto provider is installed as the process default
176 /// (idempotent) before the clients are built, keeping `aws-lc-rs` out of the
177 /// graph (see the module docs).
178 ///
179 /// # Errors
180 ///
181 /// [`StoreError::Backend`] if the tokio runtime cannot be created or the SDK
182 /// clients cannot be built (e.g. credentials cannot be resolved).
183 pub fn connect(store_url: &str) -> Result<Self, StoreError> {
184 Self::connect_with(store_url, TransferConfig::default())
185 }
186
187 /// Like [`connect`](Self::connect), but carries a [`TransferConfig`] for
188 /// concurrency / bandwidth control. The existing [`connect`](Self::connect)
189 /// delegates here with [`TransferConfig::default`].
190 ///
191 /// # Errors
192 ///
193 /// [`StoreError::Backend`] if the tokio runtime cannot be created or the SDK
194 /// clients cannot be built (e.g. credentials cannot be resolved).
195 pub fn connect_with(store_url: &str, config: TransferConfig) -> Result<Self, StoreError> {
196 let location = GcsLocation::parse(store_url);
197 let runtime = build_runtime()?;
198 install_ring_provider();
199
200 let (storage, control) = runtime.block_on(async {
201 // Disable each SDK client's built-in retry loop (`NeverRetry`) so
202 // snapdir's RetryPolicy is the single backoff authority — no
203 // SDK-retries × ours compounding.
204 let storage = Storage::builder()
205 .with_retry_policy(NeverRetry)
206 .build()
207 .await
208 .map_err(|e| backend("building GCS Storage client", e))?;
209 let control = StorageControl::builder()
210 .with_retry_policy(NeverRetry)
211 .build()
212 .await
213 .map_err(|e| backend("building GCS StorageControl client", e))?;
214 Ok::<_, StoreError>((storage, control))
215 })?;
216
217 let req_limiter = RateLimiter::new(config.max_requests_per_sec);
218 Ok(Self {
219 storage,
220 control,
221 location,
222 runtime: Arc::new(runtime),
223 config,
224 req_limiter,
225 meter: None,
226 })
227 }
228
229 /// Builds a store from already-configured SDK clients and a parsed location,
230 /// owning a fresh tokio runtime for the sync bridge. Intended for tests.
231 ///
232 /// # Errors
233 ///
234 /// [`StoreError::Backend`] if the tokio runtime cannot be created.
235 pub fn from_clients(
236 storage: Storage,
237 control: StorageControl,
238 location: GcsLocation,
239 ) -> Result<Self, StoreError> {
240 let config = TransferConfig::default();
241 let req_limiter = RateLimiter::new(config.max_requests_per_sec);
242 Ok(Self {
243 storage,
244 control,
245 location,
246 runtime: Arc::new(build_runtime()?),
247 config,
248 req_limiter,
249 meter: None,
250 })
251 }
252
253 /// Attaches (or clears) an optional progress [`Meter`], rides alongside
254 /// [`config`](Self::transfer_config). The transfer paths record bytes-in /
255 /// bytes-out + per-object progress into it; `None` (the constructor default)
256 /// means zero recording and byte-identical behavior. The CLI sets this after
257 /// construction.
258 #[must_use]
259 pub fn with_meter(mut self, meter: Option<Arc<Meter>>) -> Self {
260 self.meter = meter;
261 self
262 }
263
264 /// The parsed bucket/prefix this store targets.
265 #[must_use]
266 pub fn location(&self) -> &GcsLocation {
267 &self.location
268 }
269
270 /// The [`TransferConfig`] (concurrency / bandwidth) this store was built
271 /// with. Consumed by the transfer loops in later gates.
272 #[must_use]
273 pub fn transfer_config(&self) -> &TransferConfig {
274 &self.config
275 }
276
277 /// Metadata HEAD on an object key; `Ok(true)` if it exists, `Ok(false)` if
278 /// absent (see [`is_not_found`] for what counts as absent).
279 ///
280 /// The single SDK call is wrapped in [`retry_network`]: it acquires one
281 /// request-rate token, then retries a TRANSIENT failure under the store's
282 /// [`RetryPolicy`](crate::retry::RetryPolicy). A not-found is a normal
283 /// `Ok(false)` outcome (not a retry); other errors are classified via
284 /// [`gcs_attempt_from_err`].
285 async fn key_exists(&self, key: &str) -> Result<bool, StoreError> {
286 retry_network(
287 &self.config.retry,
288 &self.req_limiter,
289 &TokioSleeper,
290 &DefaultJitter::new(),
291 || async {
292 match self
293 .control
294 .get_object()
295 .set_bucket(self.location.bucket_resource())
296 .set_object(key)
297 .send()
298 .await
299 {
300 Ok(_) => Ok(true),
301 Err(err) => {
302 if is_not_found(&err) {
303 return Ok(false);
304 }
305 Err(gcs_attempt_from_err("GCS get_object metadata failed", err))
306 }
307 }
308 },
309 )
310 .await
311 }
312
313 /// GET an object key's full body, draining the read stream, or `None` if it
314 /// is absent (see [`is_not_found`] for what counts as absent).
315 ///
316 /// Wrapped in [`retry_network`] like [`key_exists`](Self::key_exists): a
317 /// not-found is a normal `Ok(None)`; transient failures retry under the
318 /// store's [`RetryPolicy`](crate::retry::RetryPolicy).
319 async fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
320 retry_network(
321 &self.config.retry,
322 &self.req_limiter,
323 &TokioSleeper,
324 &DefaultJitter::new(),
325 || async {
326 let mut resp = match self
327 .storage
328 .read_object(self.location.bucket_resource(), key)
329 .send()
330 .await
331 {
332 Ok(resp) => resp,
333 Err(err) => {
334 if is_not_found(&err) {
335 return Ok(None);
336 }
337 return Err(gcs_attempt_from_err("GCS read_object failed", err));
338 }
339 };
340
341 let mut buf = Vec::new();
342 while let Some(chunk) = resp.next().await {
343 // A mid-stream read failure can be transient (connection
344 // reset); classify it the same way as the initial call.
345 let chunk =
346 chunk.map_err(|e| gcs_attempt_from_err("reading GCS object body", e))?;
347 buf.extend_from_slice(&chunk);
348 }
349 Ok(Some(buf))
350 },
351 )
352 .await
353 }
354
355 /// PUT `bytes` at `key`. GCS object writes are atomic (the new object is only
356 /// visible once fully uploaded), so no temp-key dance is needed — matching
357 /// the oracle's reliance on `gcloud storage cp` atomicity.
358 ///
359 /// Wrapped in [`retry_network`]: each (re)try re-sends the full body, so the
360 /// content-addressed bytes that land are unchanged — only transient failures
361 /// retry under the store's [`RetryPolicy`](crate::retry::RetryPolicy).
362 async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
363 retry_network(
364 &self.config.retry,
365 &self.req_limiter,
366 &TokioSleeper,
367 &DefaultJitter::new(),
368 || {
369 let bytes = bytes.clone();
370 async move {
371 // The SDK's upload future is large (>30 KiB); box it so it
372 // does not bloat the enclosing future (clippy::large_futures).
373 let upload = self
374 .storage
375 .write_object(
376 self.location.bucket_resource(),
377 key,
378 bytes::Bytes::from(bytes),
379 )
380 .send_buffered();
381 Box::pin(upload)
382 .await
383 .map(|_| ())
384 .map_err(|e| gcs_attempt_from_err("GCS write_object failed", e))
385 }
386 },
387 )
388 .await
389 }
390
391 /// Downloads `key`, verifying its BLAKE3 against `expected`, retrying up to
392 /// [`MAX_FETCH_RETRIES`] times. Mirrors `_snapdir_gcs_fetch_to_cache`.
393 async fn fetch_verified(&self, key: &str, expected: &str) -> Result<Vec<u8>, StoreError> {
394 let hasher = Blake3Hasher::new();
395 let mut attempts_left = MAX_FETCH_RETRIES;
396 loop {
397 match self.get_bytes(key).await? {
398 Some(bytes) => {
399 let actual = hasher.hash_hex(&bytes);
400 if actual == expected {
401 return Ok(bytes);
402 }
403 // Mismatched checksum after fetching: retry (the oracle
404 // decrements its retry budget on the same condition).
405 attempts_left = attempts_left.saturating_sub(1);
406 if attempts_left == 0 {
407 return Err(StoreError::Integrity {
408 address: format!("gs://{}/{key}", self.location.bucket),
409 expected: expected.to_owned(),
410 actual,
411 });
412 }
413 }
414 None => {
415 // Treat a missing key as not-found rather than spinning.
416 return Err(StoreError::ObjectNotFound {
417 checksum: expected.to_owned(),
418 });
419 }
420 }
421 }
422 }
423}
424
425impl Store for GcsStore {
426 fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
427 let key = self.location.manifest_key(id);
428 let bytes = self.runtime.block_on(async {
429 match self.get_bytes(&key).await? {
430 Some(b) => Ok(b),
431 None => Err(StoreError::ManifestNotFound { id: id.to_owned() }),
432 }
433 })?;
434
435 let text = String::from_utf8(bytes).map_err(|err| StoreError::Backend {
436 message: format!("manifest {id} is not valid UTF-8"),
437 source: Some(Box::new(err)),
438 })?;
439 let manifest = Manifest::parse(&text)?;
440
441 // Verify the stored manifest hashes back to its snapshot id before
442 // trusting it (oracle: the id check on fetch).
443 let actual = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
444 if actual != id {
445 return Err(StoreError::Integrity {
446 address: self.location.manifest_key(id),
447 expected: id.to_owned(),
448 actual,
449 });
450 }
451 Ok(manifest)
452 }
453
454 fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
455 // Concurrent download via the shared orchestrator: it owns the
456 // skip-if-present-and-verified short-circuit, directory creation, the
457 // bounded-concurrency pass, the per-object rate limit, and the atomic
458 // write. GCS only injects the per-object download, preserving the
459 // BLAKE3-verify + retry discipline of `fetch_verified`.
460 let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
461 let meter = self.meter.as_deref();
462 let meter_arc = self.meter.clone();
463 self.runtime.block_on(async {
464 fetch_files_concurrent(
465 manifest,
466 dest,
467 &self.config,
468 &limiter,
469 meter,
470 meter_arc,
471 |entry| async {
472 let key = self.location.object_key(&entry.checksum);
473 self.fetch_verified(&key, &entry.checksum).await
474 },
475 )
476 .await
477 })
478 }
479
480 fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
481 let hasher = Blake3Hasher::new();
482 let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);
483
484 // Concurrent upload via the shared orchestrator: it owns the bounded
485 // per-object pass and the manifest-last / all-or-nothing ordering. GCS
486 // injects the per-object skip-present + upload (via `upload_object`,
487 // which also owns the shared read+verify) and the manifest-write
488 // closure. A failed push writes NO manifest.
489 let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
490 let meter = self.meter.as_deref();
491 let meter_arc = self.meter.clone();
492 self.runtime.block_on(async {
493 // Skip-if-manifest-present pre-check: a present manifest implies all
494 // its objects are present (we always write the manifest last).
495 let manifest_key = self.location.manifest_key(&id);
496 if self.key_exists(&manifest_key).await? {
497 return Ok(());
498 }
499
500 push_objects_concurrent(
501 manifest,
502 &self.config,
503 &limiter,
504 meter,
505 meter_arc,
506 |entry| {
507 let object_key = self.location.object_key(&entry.checksum);
508 upload_object(
509 entry,
510 object_key,
511 source,
512 &limiter,
513 meter,
514 |key| async move { self.key_exists(&key).await },
515 |key, bytes| async move { self.put_bytes(&key, bytes).await },
516 )
517 },
518 || async {
519 // Write the manifest last (verified to hash back to its id),
520 // exactly as the oracle stores the manifest text.
521 let mut text = manifest.to_string();
522 text.push('\n');
523 let manifest_actual = hasher.hash_hex(text.as_bytes());
524 if manifest_actual != id {
525 return Err(StoreError::Integrity {
526 address: manifest_key.clone(),
527 expected: id.clone(),
528 actual: manifest_actual,
529 });
530 }
531 self.put_bytes(&manifest_key, text.into_bytes()).await
532 },
533 )
534 .await
535 })
536 }
537}
538
539impl StreamStore for GcsStore {
540 fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
541 let key = self.location.object_key(checksum);
542 self.runtime.block_on(async { self.key_exists(&key).await })
543 }
544
545 fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
546 let key = self.location.object_key(checksum);
547 let bytes = self.runtime.block_on(async {
548 self.get_bytes(&key)
549 .await?
550 .ok_or_else(|| StoreError::ObjectNotFound {
551 checksum: checksum.to_owned(),
552 })
553 })?;
554
555 // Verify the downloaded blob hashes back to its content-address before
556 // returning it (corruption surfaces as `Integrity`, never bad bytes).
557 let actual = Blake3Hasher::new().hash_hex(&bytes);
558 if actual != checksum {
559 return Err(StoreError::Integrity {
560 address: format!("gs://{}/{key}", self.location.bucket),
561 expected: checksum.to_owned(),
562 actual,
563 });
564 }
565 Ok(bytes)
566 }
567
568 fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
569 // Verify BEFORE uploading: a blob whose bytes do not hash to `checksum`
570 // must never land at that content-address (nothing is stored).
571 let actual = Blake3Hasher::new().hash_hex(&bytes);
572 if actual != checksum {
573 return Err(StoreError::Integrity {
574 address: self.location.object_key(checksum),
575 expected: checksum.to_owned(),
576 actual,
577 });
578 }
579 let key = self.location.object_key(checksum);
580 self.runtime
581 .block_on(async { self.put_bytes(&key, bytes).await })
582 }
583
584 fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
585 let key = self.location.manifest_key(id);
586 // Mirror the manifest-write tail of `push`: render the oracle's
587 // `echo "${manifest}"` bytes, verify they hash back to `id`, then PUT.
588 let mut text = manifest.to_string();
589 text.push('\n');
590 let actual = Blake3Hasher::new().hash_hex(text.as_bytes());
591 if actual != id {
592 return Err(StoreError::Integrity {
593 address: key,
594 expected: id.to_owned(),
595 actual,
596 });
597 }
598 self.runtime
599 .block_on(async { self.put_bytes(&key, text.into_bytes()).await })
600 }
601}
602
603/// Builds the multi-thread tokio runtime that backs the sync bridge.
604fn build_runtime() -> Result<Runtime, StoreError> {
605 tokio::runtime::Builder::new_multi_thread()
606 .enable_all()
607 .build()
608 .map_err(|e| backend("creating tokio runtime for GcsStore", e))
609}
610
611/// Installs the **`ring`** rustls [`CryptoProvider`] as the process default if
612/// none is set yet. Idempotent: a second call (or a default installed by another
613/// store) is a harmless no-op. This is the load-bearing piece that keeps
614/// `aws-lc-rs` out of the dependency graph — the SDK's `reqwest`
615/// (`rustls-no-provider`) consumes whatever process-default provider is set.
616fn install_ring_provider() {
617 // Ignore the error: it only means a provider was already installed, which is
618 // exactly the state we want.
619 let _ = rustls_ring::crypto::ring::default_provider().install_default();
620}
621
622/// Wraps any backend error into [`StoreError::Backend`] with a message.
623fn backend<E>(message: &str, source: E) -> StoreError
624where
625 E: std::error::Error + Send + Sync + 'static,
626{
627 StoreError::Backend {
628 message: message.to_owned(),
629 source: Some(Box::new(source)),
630 }
631}
632
633/// Builds a retry [`Attempt`] from a concrete `google-cloud-storage`
634/// [`GcsError`], at the boundary where the SDK error still carries its HTTP
635/// status / headers / gRPC status.
636///
637/// - `transient`: true for the retryable signal set — an HTTP `429`/`503`
638/// status, the gRPC `RESOURCE_EXHAUSTED` / `UNAVAILABLE` / `DEADLINE_EXCEEDED`
639/// / `ABORTED` / `INTERNAL` codes, and the transport/timeout classes folded
640/// through the shared [`classify_error`](crate::classify_error) on the mapped
641/// [`StoreError`]. Conservative: unknown / `NOT_FOUND` / `PERMISSION_DENIED`
642/// are NOT transient.
643/// - `retry_after`: extracted from the error's HTTP `Retry-After` header (the
644/// delta-seconds form) when present, else `None`.
645/// - `err`: the mapped [`StoreError::Backend`] (the same value the non-retry
646/// path used to surface).
647fn gcs_attempt_from_err(message: &str, err: GcsError) -> Attempt {
648 let http_status = err.http_status_code();
649 let retry_after = err
650 .http_headers()
651 .and_then(|h| h.get("retry-after"))
652 .and_then(|v| v.to_str().ok())
653 .and_then(parse_retry_after);
654 let code = err.status().map(|s| s.code);
655
656 let store_err = backend(message, err);
657
658 let transient = http_status.is_some_and(|s| s == 429 || s == 503)
659 || matches!(
660 code,
661 Some(
662 Code::ResourceExhausted
663 | Code::Unavailable
664 | Code::DeadlineExceeded
665 | Code::Aborted
666 | Code::Internal
667 )
668 )
669 || matches!(
670 classify_error(&store_err),
671 crate::adaptive::OpResult::Throttle
672 );
673
674 Attempt {
675 transient,
676 retry_after,
677 err: store_err,
678 }
679}
680
681/// Classifies a `google-cloud-storage` SDK error as "object is absent".
682///
683/// The SDK reports a missing object two different ways, and the absent-object
684/// paths (`key_exists` -> `Ok(false)`, `get_bytes` -> `Ok(None)`) must treat
685/// BOTH as not-found:
686///
687/// 1. A plain **HTTP 404** (`http_status_code() == Some(404)`), e.g. from a
688/// proxy/load balancer ahead of the service.
689/// 2. A **service-level gRPC-style error** whose `status().code` is
690/// [`Code::NotFound`] but whose `http_status_code()` is `None`. This is what
691/// the v1.x SDK actually returns for `get_object`/`read_object` on a missing
692/// object, and the form the original `== Some(404)`-only check misclassified
693/// as a fatal backend error (it aborted `push` before the first upload).
694///
695/// This mirrors the SDK's own internal classification
696/// (`e.status().is_some_and(|s| s.code == Code::NotFound)`), the GCS analogue of
697/// the aws-sdk's `is_not_found()` used by [`S3Store`](crate::S3Store).
698fn is_not_found(err: &GcsError) -> bool {
699 err.http_status_code() == Some(404)
700 || err
701 .status()
702 .is_some_and(|status| status.code == Code::NotFound)
703}
704
705#[cfg(test)]
706mod tests {
707 use super::*;
708 use snapdir_core::manifest::PathType;
709 use std::time::Duration;
710
711 #[test]
712 fn backoff_wire_gcs_extract_503_retry_after_is_transient_with_hint() {
713 // A 503 carrying `Retry-After: 9` => transient, hint = 9s.
714 let mut headers = http::HeaderMap::new();
715 headers.insert("retry-after", http::HeaderValue::from_static("9"));
716 let err = GcsError::http(503, headers, bytes::Bytes::new());
717 let attempt = gcs_attempt_from_err("GCS op failed", err);
718 assert!(attempt.transient, "HTTP 503 must be transient");
719 assert_eq!(
720 attempt.retry_after,
721 Some(Duration::from_secs(9)),
722 "the Retry-After delta-seconds header must be extracted"
723 );
724 }
725
726 #[test]
727 fn backoff_wire_gcs_extract_resource_exhausted_is_transient_no_hint() {
728 // A service-level RESOURCE_EXHAUSTED (no HTTP status / header) =>
729 // transient with no hint.
730 use google_cloud_gax::error::rpc::Status;
731 let status = Status::default()
732 .set_code(Code::ResourceExhausted)
733 .set_message("quota exceeded");
734 let err = GcsError::service(status);
735 let attempt = gcs_attempt_from_err("GCS op failed", err);
736 assert!(
737 attempt.transient,
738 "RESOURCE_EXHAUSTED must be classified transient"
739 );
740 assert_eq!(attempt.retry_after, None, "no Retry-After header => None");
741 }
742
743 #[test]
744 fn backoff_wire_gcs_extract_not_found_is_not_transient() {
745 use google_cloud_gax::error::rpc::Status;
746
747 // A 404 / NOT_FOUND hard error => NOT transient (conservative).
748 let err = GcsError::http(404, http::HeaderMap::new(), bytes::Bytes::new());
749 let attempt = gcs_attempt_from_err("GCS op failed", err);
750 assert!(
751 !attempt.transient,
752 "a 404/not-found must never be classified transient"
753 );
754
755 let denied = GcsError::service(Status::default().set_code(Code::PermissionDenied));
756 let attempt = gcs_attempt_from_err("GCS op failed", denied);
757 assert!(
758 !attempt.transient,
759 "PERMISSION_DENIED must never be classified transient"
760 );
761 }
762
763 /// Strips a leading `./` and a trailing `/` from a manifest path. Kept as a
764 /// test-only assertion of the path normalization the orchestrator
765 /// (`crate::push`) performs.
766 fn strip_leading_dot_slash(path: &str) -> &str {
767 let trimmed = path.strip_prefix("./").unwrap_or(path);
768 trimmed.strip_suffix('/').unwrap_or(trimmed)
769 }
770
    // The canonical content-addressable fixtures (shared across the s3/gcs
    // store test suites). Each `*_SHARDED` constant is the matching 64-char
    // hex value cut into three 3-character directory segments followed by the
    // remaining characters, i.e. the on-store layout the key tests expect.
    const FOO_CHECKSUM: &str = "49dc870df1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
    const FOO_SHARDED: &str = "49d/c87/0df/1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
    const MANIFEST_ID: &str = "aa91e498f401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";
    const MANIFEST_SHARDED: &str =
        "aa9/1e4/98f/401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";
778
779 #[test]
780 fn gcs_store_parses_bucket_and_prefix() {
781 let loc = GcsLocation::parse("gs://my-bucket/long/term/storage");
782 assert_eq!(loc.bucket, "my-bucket");
783 assert_eq!(loc.prefix, "long/term/storage");
784 }
785
786 #[test]
787 fn gcs_store_parse_matches_oracle_cut_and_sed() {
788 // Oracle: bucket = `cut -d'/' -f3`; prefix = `sed -E 's|^gs:/*[^/]*/?||'`
789 // then trailing-slash strip. For "gs://bucket/a/b/c": bucket=bucket,
790 // prefix=a/b/c.
791 let loc = GcsLocation::parse("gs://bucket/a/b/c");
792 assert_eq!(loc.bucket, "bucket");
793 assert_eq!(loc.prefix, "a/b/c");
794 }
795
796 #[test]
797 fn gcs_store_parse_strips_trailing_slash() {
798 // `_snapdir_gcs_store_get_remote_prefix` strips the trailing slash.
799 let loc = GcsLocation::parse("gs://bucket/prefix/");
800 assert_eq!(loc.bucket, "bucket");
801 assert_eq!(loc.prefix, "prefix");
802 }
803
804 #[test]
805 fn gcs_store_parse_bucket_root_has_empty_prefix() {
806 let loc = GcsLocation::parse("gs://bucket");
807 assert_eq!(loc.bucket, "bucket");
808 assert_eq!(loc.prefix, "");
809
810 let loc_slash = GcsLocation::parse("gs://bucket/");
811 assert_eq!(loc_slash.bucket, "bucket");
812 assert_eq!(loc_slash.prefix, "");
813 }
814
815 #[test]
816 fn gcs_store_parse_accepts_bare_bucket_prefix_without_scheme() {
817 let loc = GcsLocation::parse("bucket/some/prefix");
818 assert_eq!(loc.bucket, "bucket");
819 assert_eq!(loc.prefix, "some/prefix");
820 }
821
822 #[test]
823 fn gcs_store_bucket_resource_uses_projects_underscore_form() {
824 let loc = GcsLocation::parse("gs://my-bucket/x");
825 assert_eq!(loc.bucket_resource(), "projects/_/buckets/my-bucket");
826 }
827
828 #[test]
829 fn gcs_store_object_key_matches_sharded_scheme() {
830 let loc = GcsLocation::parse("gs://b/long/term/storage");
831 assert_eq!(
832 loc.object_key(FOO_CHECKSUM),
833 format!("long/term/storage/.objects/{FOO_SHARDED}")
834 );
835 }
836
837 #[test]
838 fn gcs_store_manifest_key_matches_sharded_scheme() {
839 let loc = GcsLocation::parse("gs://b/long/term/storage");
840 assert_eq!(
841 loc.manifest_key(MANIFEST_ID),
842 format!("long/term/storage/.manifests/{MANIFEST_SHARDED}")
843 );
844 }
845
846 #[test]
847 fn gcs_store_keys_have_no_leading_slash_at_bucket_root() {
848 // With an empty prefix the keys are just `.objects/...` / `.manifests/...`.
849 let loc = GcsLocation::parse("gs://bucket");
850 assert_eq!(
851 loc.object_key(FOO_CHECKSUM),
852 format!(".objects/{FOO_SHARDED}")
853 );
854 assert_eq!(
855 loc.manifest_key(MANIFEST_ID),
856 format!(".manifests/{MANIFEST_SHARDED}")
857 );
858 }
859
860 #[test]
861 fn gcs_store_object_key_uses_core_object_path() {
862 // Cross-check that we delegate to the frozen core sharding helper rather
863 // than reimplementing it: at the bucket root the key equals the core
864 // `object_path` output verbatim.
865 let loc = GcsLocation::parse("gs://b");
866 assert_eq!(loc.object_key(FOO_CHECKSUM), object_path(FOO_CHECKSUM));
867 assert_eq!(loc.manifest_key(MANIFEST_ID), manifest_path(MANIFEST_ID));
868 }
869
#[test]
fn gcs_store_strip_leading_dot_slash() {
    // (input, expected) pairs: the leading `./` is dropped, and so is a
    // trailing `/`; a bare `./` collapses to the empty string.
    let cases = [
        ("./foo", "foo"),
        ("./a/b/c", "a/b/c"),
        ("./a/", "a"),
        ("./", ""),
    ];
    for (input, expected) in cases {
        assert_eq!(strip_leading_dot_slash(input), expected);
    }
}
877
#[test]
fn gcs_store_is_not_found_classifies_service_level_not_found_as_absent() {
    use google_cloud_gax::error::rpc::Status;

    // Regression guard for the push-abort bug. The v1.x SDK surfaces a missing
    // object as a *service-level* gRPC error: code NOT_FOUND, with no HTTP
    // status attached (`http_status_code() == None`). A check that only looked
    // for `Some(404)` treated that as a fatal backend error, which made
    // `key_exists` fail and `push` abort before anything was uploaded.
    let not_found_status = Status::default()
        .set_code(Code::NotFound)
        .set_message("No such object: bucket/.manifests/...");
    let err = GcsError::service(not_found_status);

    // Load-bearing: the real-world error shape has no HTTP code, which is
    // exactly why a 404-only check would (and did) miss it.
    let http_code = err.http_status_code();
    assert_eq!(http_code, None);

    let classified_absent = is_not_found(&err);
    assert!(
        classified_absent,
        "service-level NOT_FOUND must be classified as object-absent"
    );
}
899
#[test]
fn gcs_store_is_not_found_classifies_http_404_as_absent() {
    // The second "absent" shape: a bare HTTP 404 (for instance one produced by
    // a proxy or load balancer in front of the service). It must be treated as
    // not-found too.
    let headers = http::HeaderMap::new();
    let body = bytes::Bytes::new();
    let err = GcsError::http(404, headers, body);
    assert!(is_not_found(&err), "HTTP 404 must be classified as absent");
}
907
#[test]
fn gcs_store_is_not_found_does_not_swallow_other_errors() {
    use google_cloud_gax::error::rpc::Status;

    // The inverse guard: errors that are NOT "object absent" must keep
    // surfacing as real failures rather than being silently skipped.

    // Service-level error with a code other than NOT_FOUND.
    let denied_status = Status::default().set_code(Code::PermissionDenied);
    let denied = GcsError::service(denied_status);
    assert!(!is_not_found(&denied), "PERMISSION_DENIED is not absence");

    // HTTP-level error with a status other than 404.
    let headers = http::HeaderMap::new();
    let body = bytes::Bytes::new();
    let server_err = GcsError::http(503, headers, body);
    assert!(!is_not_found(&server_err), "HTTP 503 is not absence");
}
921
#[test]
fn gcs_store_install_ring_provider_is_idempotent() {
    // Two back-to-back installs must not panic: once a provider is the process
    // default, a repeated call is expected to be a harmless no-op.
    for _ in 0..2 {
        install_ring_provider();
    }
}
929
930 // --- Live round-trip, skipped by default --------------------------------
931 //
932 // Requires real Google Cloud credentials (ADC) plus a writable bucket.
933 // Gated behind `SNAPDIR_GCS_TEST_STORE` (a `gs://bucket/prefix` URL) so it is
934 // skipped unless explicitly configured. Real round-trips are exercised by
935 // the later `remote-interop` gate.
#[test]
fn gcs_store_live_round_trip_when_configured() {
    use snapdir_core::manifest::ManifestEntry;

    /// Scratch directory under the system temp dir that is removed again when
    /// the guard is dropped. Because `Drop` also runs during unwinding, the
    /// directory is cleaned up even when an `expect`/`assert_eq!` below fails.
    struct ScratchDir(std::path::PathBuf);

    impl ScratchDir {
        /// Creates `<tmp>/snapdir-gcs-<label>-<pid>` as a fresh, empty directory.
        fn create(label: &str) -> Self {
            let path = std::env::temp_dir()
                .join(format!("snapdir-gcs-{label}-{}", std::process::id()));
            // Build the guard first so a failed `create_dir_all` is cleaned up too.
            let guard = Self(path);
            // Clear leftovers from an earlier run that reused this PID; a stale
            // `foo` in the destination could otherwise mask a broken fetch.
            let _ = std::fs::remove_dir_all(&guard.0);
            std::fs::create_dir_all(&guard.0).expect("create scratch directory");
            guard
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best-effort cleanup: a failure to remove must not turn into a
            // (possibly double) panic.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    // Skip (and pass) unless a live store has been configured explicitly.
    let Ok(store) = std::env::var("SNAPDIR_GCS_TEST_STORE") else {
        eprintln!(
            "skipping gcs_store live round-trip: set SNAPDIR_GCS_TEST_STORE \
             (gs://bucket/prefix) plus ADC credentials to run it"
        );
        return;
    };

    let hasher = Blake3Hasher::new();

    // Build a tiny source tree (a single file `foo`) plus the matching manifest.
    let src_guard = ScratchDir::create("live");
    let src = &src_guard.0;
    std::fs::write(src.join("foo"), b"foo\n").expect("write source file");
    let foo_sum = hasher.hash_hex(b"foo\n");
    let root_sum = snapdir_core::merkle::directory_checksum([foo_sum.as_str()], &hasher);
    let mut manifest = Manifest::new();
    manifest.push(ManifestEntry::new(
        PathType::Directory,
        "700",
        root_sum,
        4,
        "./",
    ));
    manifest.push(ManifestEntry::new(
        PathType::File,
        "600",
        foo_sum,
        4,
        "./foo",
    ));
    // Rebuild through `from_entries` so the manifest compared against the
    // read-back copy has gone through that constructor.
    let manifest = Manifest::from_entries(manifest.entries().to_vec());
    let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);

    // Push, then read the manifest back by snapshot id.
    let gcs = GcsStore::connect(&store).expect("connect");
    gcs.push(&manifest, src).expect("push");
    let read_back = gcs.get_manifest(&id).expect("get_manifest");
    assert_eq!(read_back, manifest);

    // Fetch the files into a fresh directory and verify the bytes round-tripped.
    let dest_guard = ScratchDir::create("dest");
    let dest = &dest_guard.0;
    gcs.fetch_files(&read_back, dest).expect("fetch_files");
    assert_eq!(std::fs::read(dest.join("foo")).unwrap(), b"foo\n");

    // `src_guard` / `dest_guard` remove both scratch directories when dropped.
}
987}