Skip to main content

snapdir_stores/
gcs_store.rs

1//! `GcsStore`: the `gs://` storage backend, backed by the native
2//! `google-cloud-storage` SDK.
3//!
4//! A [`GcsStore`] targets a `gs://bucket/prefix` location and holds the frozen
5//! content-addressable `.objects`/`.manifests` sharded layout, so a
6//! bucket/prefix is interchangeable across conforming implementations:
7//!
8//! ```text
9//! gs://<bucket>/<prefix>/.objects/<sharded checksum>     raw object bytes
10//! gs://<bucket>/<prefix>/.manifests/<sharded snapshot id> manifest text
11//! ```
12//!
13//! Sharding and the relative keys come straight from [`snapdir_core::store`]
14//! ([`object_path`] / [`manifest_path`]); this module never reimplements them.
15//!
16//! # `gs://` parsing
17//!
18//! The oracle derives the bucket as `cut -d'/' -f3` of the URL
19//! (`_snapdir_export_store_vars`) and the prefix with
20//! `sed -E 's|^gs:/*[^/]*/?||'` then a trailing-slash strip
21//! (`_snapdir_gcs_store_get_remote_prefix`). [`GcsLocation::parse`] reproduces
22//! that exactly: bucket = first segment after the scheme, prefix = the
23//! remainder with leading/trailing slashes removed.
24//!
25//! # Credentials
26//!
27//! Authentication is delegated entirely to the SDK's own credential chain
28//! (Application Default Credentials): `GOOGLE_APPLICATION_CREDENTIALS`,
29//! `GOOGLE_APPLICATION_CREDENTIALS_JSON`, `gcloud` user creds, and the GCE/GKE
30//! metadata server. No bespoke snapdir credential variables are introduced.
31//!
32//! # TLS provider (project-load-bearing)
33//!
34//! The shipped binary must statically link on musl, so the workspace
35//! standardizes on the **`ring`** rustls provider; `aws-lc-rs` is banned. The
36//! `google-cloud-storage` default features pull `aws-lc-rs` in via
37//! `google-cloud-auth` (both its id-token backend and its rustls provider), so
38//! we depend on the crate with `default-features = false` and instead install
39//! the **ring** [`CryptoProvider`](rustls_ring::crypto::CryptoProvider) as the
40//! rustls *process default* (the SDK's `reqwest` is built with
41//! `rustls-no-provider`, i.e. it consumes that process default). See the crate
42//! `Cargo.toml` for the full rationale.
43//!
44//! # Sync trait, async SDK
45//!
46//! The SDK is async. [`GcsStore`] owns a private multi-thread `tokio` runtime
47//! and bridges each [`Store`] method with `runtime.block_on(...)`, exactly like
48//! [`S3Store`](crate::S3Store), so no `async` leaks into `snapdir-core`.
49
50use std::path::Path;
51use std::sync::Arc;
52
53use google_cloud_gax::error::rpc::Code;
54use google_cloud_gax::error::Error as GcsError;
55use google_cloud_gax::retry_policy::NeverRetry;
56use google_cloud_storage::client::{Storage, StorageControl};
57use snapdir_core::manifest::Manifest;
58use snapdir_core::merkle::{Blake3Hasher, Hasher};
59use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
60use snapdir_core::Meter;
61
62use crate::fetch::fetch_files_concurrent;
63use crate::push::{push_objects_concurrent, upload_object};
64use crate::retry::{parse_retry_after, retry_network, Attempt, DefaultJitter, TokioSleeper};
65use crate::stream::StreamStore;
66use crate::transfer::{classify_error, RateLimiter, TransferConfig};
67use tokio::runtime::Runtime;
68
69/// Number of times a fetch is retried when the downloaded bytes fail their
70/// checksum, mirroring the oracle's `_SNAPDIR_GCS_STORE_RETRIES` default of 5.
71const MAX_FETCH_RETRIES: u32 = 5;
72
73/// The parsed location a [`GcsStore`] targets: a GCS bucket plus a key prefix.
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub struct GcsLocation {
76    /// The bucket name (first path segment of the `gs://` URL).
77    pub bucket: String,
78    /// The object-name prefix (remaining segments), with no leading or trailing
79    /// slash. Empty when the store points at the bucket root.
80    pub prefix: String,
81}
82
83impl GcsLocation {
84    /// Parses a `gs://bucket/prefix` URL into its bucket and prefix.
85    ///
86    /// Matches the oracle exactly: the bucket is `cut -d'/' -f3` of the store
87    /// URL (the segment immediately after `gs://`), and the prefix is
88    /// `sed -E 's|^gs:/*[^/]*/?||'` with a trailing slash stripped
89    /// (`_snapdir_gcs_store_get_remote_prefix`).
90    ///
91    /// The `gs://` scheme is optional; a bare `bucket/prefix` is accepted too.
92    #[must_use]
93    pub fn parse(store_url: &str) -> Self {
94        // Drop the scheme (`gs://`, or any `<proto>://`) if present. The oracle
95        // splits the full URL on `/` and takes field 3 as the bucket, which for
96        // `gs://bucket/...` is exactly the segment after the `//`.
97        let without_scheme = match store_url.find("://") {
98            Some(idx) => &store_url[idx + 3..],
99            None => store_url,
100        };
101        let mut parts = without_scheme.splitn(2, '/');
102        let bucket = parts.next().unwrap_or("").to_owned();
103        let prefix = parts.next().unwrap_or("");
104        // Strip a leading slash (empty-first-segment edge case) and the trailing
105        // slash; the prefix is joined back with a single `/`.
106        let prefix = prefix
107            .trim_end_matches('/')
108            .trim_start_matches('/')
109            .to_owned();
110        Self { bucket, prefix }
111    }
112
113    /// The GCS resource name for this bucket, `projects/_/buckets/<bucket>`, as
114    /// required by the `google-cloud-storage` v1 API.
115    #[must_use]
116    pub fn bucket_resource(&self) -> String {
117        format!("projects/_/buckets/{}", self.bucket)
118    }
119
120    /// Returns the full GCS object name for a content object given its checksum,
121    /// i.e. `<prefix>/.objects/<sharded>` (no leading slash).
122    #[must_use]
123    pub fn object_key(&self, checksum: &str) -> String {
124        self.key_for(&object_path(checksum))
125    }
126
127    /// Returns the full GCS object name for a manifest given its snapshot id,
128    /// i.e. `<prefix>/.manifests/<sharded>` (no leading slash).
129    #[must_use]
130    pub fn manifest_key(&self, id: &str) -> String {
131        self.key_for(&manifest_path(id))
132    }
133
134    /// Joins the store prefix with a store-relative path (`.objects/...` or
135    /// `.manifests/...`), producing a leading-slash-free object name. Mirrors
136    /// the oracle's `${_SNAPDIR_STORE_BASE_DIR%/}/${source_path#/}` with the
137    /// leading slash trimmed.
138    fn key_for(&self, rel: &str) -> String {
139        let rel = rel.trim_start_matches('/');
140        if self.prefix.is_empty() {
141            rel.to_owned()
142        } else {
143            format!("{}/{rel}", self.prefix)
144        }
145    }
146}
147
148/// A content-addressable store backed by a Google Cloud Storage bucket.
149///
150/// Construct one with [`GcsStore::connect`] (resolves Application Default
151/// Credentials via the SDK).
152pub struct GcsStore {
153    /// Data-plane client (object read/write).
154    storage: Storage,
155    /// Control-plane client (object metadata, used for HEAD-like existence
156    /// checks without downloading the body).
157    control: StorageControl,
158    location: GcsLocation,
159    runtime: Arc<Runtime>,
160    config: TransferConfig,
161    /// Per-call request-rate limiter (one token per SDK call), built from
162    /// [`TransferConfig::max_requests_per_sec`]. Unlimited (a no-op) by default.
163    /// Shared (clone) so every call paces against one aggregate request budget.
164    req_limiter: RateLimiter,
165    /// Optional progress meter; recorded into during transfers. `None` (the
166    /// default from every constructor) means zero recording and byte-identical
167    /// behavior. Set by the CLI via [`GcsStore::with_meter`].
168    meter: Option<Arc<Meter>>,
169}
170
171impl GcsStore {
172    /// Connects to the `gs://bucket/prefix` store, resolving credentials via the
173    /// SDK's Application Default Credentials chain.
174    ///
175    /// The `ring` rustls crypto provider is installed as the process default
176    /// (idempotent) before the clients are built, keeping `aws-lc-rs` out of the
177    /// graph (see the module docs).
178    ///
179    /// # Errors
180    ///
181    /// [`StoreError::Backend`] if the tokio runtime cannot be created or the SDK
182    /// clients cannot be built (e.g. credentials cannot be resolved).
183    pub fn connect(store_url: &str) -> Result<Self, StoreError> {
184        Self::connect_with(store_url, TransferConfig::default())
185    }
186
187    /// Like [`connect`](Self::connect), but carries a [`TransferConfig`] for
188    /// concurrency / bandwidth control. The existing [`connect`](Self::connect)
189    /// delegates here with [`TransferConfig::default`].
190    ///
191    /// # Errors
192    ///
193    /// [`StoreError::Backend`] if the tokio runtime cannot be created or the SDK
194    /// clients cannot be built (e.g. credentials cannot be resolved).
195    pub fn connect_with(store_url: &str, config: TransferConfig) -> Result<Self, StoreError> {
196        let location = GcsLocation::parse(store_url);
197        let runtime = build_runtime()?;
198        install_ring_provider();
199
200        let (storage, control) = runtime.block_on(async {
201            // Disable each SDK client's built-in retry loop (`NeverRetry`) so
202            // snapdir's RetryPolicy is the single backoff authority — no
203            // SDK-retries × ours compounding.
204            let storage = Storage::builder()
205                .with_retry_policy(NeverRetry)
206                .build()
207                .await
208                .map_err(|e| backend("building GCS Storage client", e))?;
209            let control = StorageControl::builder()
210                .with_retry_policy(NeverRetry)
211                .build()
212                .await
213                .map_err(|e| backend("building GCS StorageControl client", e))?;
214            Ok::<_, StoreError>((storage, control))
215        })?;
216
217        let req_limiter = RateLimiter::new(config.max_requests_per_sec);
218        Ok(Self {
219            storage,
220            control,
221            location,
222            runtime: Arc::new(runtime),
223            config,
224            req_limiter,
225            meter: None,
226        })
227    }
228
229    /// Builds a store from already-configured SDK clients and a parsed location,
230    /// owning a fresh tokio runtime for the sync bridge. Intended for tests.
231    ///
232    /// # Errors
233    ///
234    /// [`StoreError::Backend`] if the tokio runtime cannot be created.
235    pub fn from_clients(
236        storage: Storage,
237        control: StorageControl,
238        location: GcsLocation,
239    ) -> Result<Self, StoreError> {
240        let config = TransferConfig::default();
241        let req_limiter = RateLimiter::new(config.max_requests_per_sec);
242        Ok(Self {
243            storage,
244            control,
245            location,
246            runtime: Arc::new(build_runtime()?),
247            config,
248            req_limiter,
249            meter: None,
250        })
251    }
252
253    /// Attaches (or clears) an optional progress [`Meter`], rides alongside
254    /// [`config`](Self::transfer_config). The transfer paths record bytes-in /
255    /// bytes-out + per-object progress into it; `None` (the constructor default)
256    /// means zero recording and byte-identical behavior. The CLI sets this after
257    /// construction.
258    #[must_use]
259    pub fn with_meter(mut self, meter: Option<Arc<Meter>>) -> Self {
260        self.meter = meter;
261        self
262    }
263
264    /// The parsed bucket/prefix this store targets.
265    #[must_use]
266    pub fn location(&self) -> &GcsLocation {
267        &self.location
268    }
269
270    /// The [`TransferConfig`] (concurrency / bandwidth) this store was built
271    /// with. Consumed by the transfer loops in later gates.
272    #[must_use]
273    pub fn transfer_config(&self) -> &TransferConfig {
274        &self.config
275    }
276
277    /// Metadata HEAD on an object key; `Ok(true)` if it exists, `Ok(false)` if
278    /// absent (see [`is_not_found`] for what counts as absent).
279    ///
280    /// The single SDK call is wrapped in [`retry_network`]: it acquires one
281    /// request-rate token, then retries a TRANSIENT failure under the store's
282    /// [`RetryPolicy`](crate::retry::RetryPolicy). A not-found is a normal
283    /// `Ok(false)` outcome (not a retry); other errors are classified via
284    /// [`gcs_attempt_from_err`].
285    async fn key_exists(&self, key: &str) -> Result<bool, StoreError> {
286        retry_network(
287            &self.config.retry,
288            &self.req_limiter,
289            &TokioSleeper,
290            &DefaultJitter::new(),
291            || async {
292                match self
293                    .control
294                    .get_object()
295                    .set_bucket(self.location.bucket_resource())
296                    .set_object(key)
297                    .send()
298                    .await
299                {
300                    Ok(_) => Ok(true),
301                    Err(err) => {
302                        if is_not_found(&err) {
303                            return Ok(false);
304                        }
305                        Err(gcs_attempt_from_err("GCS get_object metadata failed", err))
306                    }
307                }
308            },
309        )
310        .await
311    }
312
313    /// GET an object key's full body, draining the read stream, or `None` if it
314    /// is absent (see [`is_not_found`] for what counts as absent).
315    ///
316    /// Wrapped in [`retry_network`] like [`key_exists`](Self::key_exists): a
317    /// not-found is a normal `Ok(None)`; transient failures retry under the
318    /// store's [`RetryPolicy`](crate::retry::RetryPolicy).
319    async fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
320        retry_network(
321            &self.config.retry,
322            &self.req_limiter,
323            &TokioSleeper,
324            &DefaultJitter::new(),
325            || async {
326                let mut resp = match self
327                    .storage
328                    .read_object(self.location.bucket_resource(), key)
329                    .send()
330                    .await
331                {
332                    Ok(resp) => resp,
333                    Err(err) => {
334                        if is_not_found(&err) {
335                            return Ok(None);
336                        }
337                        return Err(gcs_attempt_from_err("GCS read_object failed", err));
338                    }
339                };
340
341                let mut buf = Vec::new();
342                while let Some(chunk) = resp.next().await {
343                    // A mid-stream read failure can be transient (connection
344                    // reset); classify it the same way as the initial call.
345                    let chunk =
346                        chunk.map_err(|e| gcs_attempt_from_err("reading GCS object body", e))?;
347                    buf.extend_from_slice(&chunk);
348                }
349                Ok(Some(buf))
350            },
351        )
352        .await
353    }
354
355    /// PUT `bytes` at `key`. GCS object writes are atomic (the new object is only
356    /// visible once fully uploaded), so no temp-key dance is needed — matching
357    /// the oracle's reliance on `gcloud storage cp` atomicity.
358    ///
359    /// Wrapped in [`retry_network`]: each (re)try re-sends the full body, so the
360    /// content-addressed bytes that land are unchanged — only transient failures
361    /// retry under the store's [`RetryPolicy`](crate::retry::RetryPolicy).
362    async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
363        retry_network(
364            &self.config.retry,
365            &self.req_limiter,
366            &TokioSleeper,
367            &DefaultJitter::new(),
368            || {
369                let bytes = bytes.clone();
370                async move {
371                    // The SDK's upload future is large (>30 KiB); box it so it
372                    // does not bloat the enclosing future (clippy::large_futures).
373                    let upload = self
374                        .storage
375                        .write_object(
376                            self.location.bucket_resource(),
377                            key,
378                            bytes::Bytes::from(bytes),
379                        )
380                        .send_buffered();
381                    Box::pin(upload)
382                        .await
383                        .map(|_| ())
384                        .map_err(|e| gcs_attempt_from_err("GCS write_object failed", e))
385                }
386            },
387        )
388        .await
389    }
390
391    /// Downloads `key`, verifying its BLAKE3 against `expected`, retrying up to
392    /// [`MAX_FETCH_RETRIES`] times. Mirrors `_snapdir_gcs_fetch_to_cache`.
393    async fn fetch_verified(&self, key: &str, expected: &str) -> Result<Vec<u8>, StoreError> {
394        let hasher = Blake3Hasher::new();
395        let mut attempts_left = MAX_FETCH_RETRIES;
396        loop {
397            match self.get_bytes(key).await? {
398                Some(bytes) => {
399                    let actual = hasher.hash_hex(&bytes);
400                    if actual == expected {
401                        return Ok(bytes);
402                    }
403                    // Mismatched checksum after fetching: retry (the oracle
404                    // decrements its retry budget on the same condition).
405                    attempts_left = attempts_left.saturating_sub(1);
406                    if attempts_left == 0 {
407                        return Err(StoreError::Integrity {
408                            address: format!("gs://{}/{key}", self.location.bucket),
409                            expected: expected.to_owned(),
410                            actual,
411                        });
412                    }
413                }
414                None => {
415                    // Treat a missing key as not-found rather than spinning.
416                    return Err(StoreError::ObjectNotFound {
417                        checksum: expected.to_owned(),
418                    });
419                }
420            }
421        }
422    }
423}
424
425impl Store for GcsStore {
426    fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
427        let key = self.location.manifest_key(id);
428        let bytes = self.runtime.block_on(async {
429            match self.get_bytes(&key).await? {
430                Some(b) => Ok(b),
431                None => Err(StoreError::ManifestNotFound { id: id.to_owned() }),
432            }
433        })?;
434
435        let text = String::from_utf8(bytes).map_err(|err| StoreError::Backend {
436            message: format!("manifest {id} is not valid UTF-8"),
437            source: Some(Box::new(err)),
438        })?;
439        let manifest = Manifest::parse(&text)?;
440
441        // Verify the stored manifest hashes back to its snapshot id before
442        // trusting it (oracle: the id check on fetch).
443        let actual = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
444        if actual != id {
445            return Err(StoreError::Integrity {
446                address: self.location.manifest_key(id),
447                expected: id.to_owned(),
448                actual,
449            });
450        }
451        Ok(manifest)
452    }
453
454    fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
455        // Concurrent download via the shared orchestrator: it owns the
456        // skip-if-present-and-verified short-circuit, directory creation, the
457        // bounded-concurrency pass, the per-object rate limit, and the atomic
458        // write. GCS only injects the per-object download, preserving the
459        // BLAKE3-verify + retry discipline of `fetch_verified`.
460        let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
461        let meter = self.meter.as_deref();
462        let meter_arc = self.meter.clone();
463        self.runtime.block_on(async {
464            fetch_files_concurrent(
465                manifest,
466                dest,
467                &self.config,
468                &limiter,
469                meter,
470                meter_arc,
471                |entry| async {
472                    let key = self.location.object_key(&entry.checksum);
473                    self.fetch_verified(&key, &entry.checksum).await
474                },
475            )
476            .await
477        })
478    }
479
480    fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
481        let hasher = Blake3Hasher::new();
482        let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);
483
484        // Concurrent upload via the shared orchestrator: it owns the bounded
485        // per-object pass and the manifest-last / all-or-nothing ordering. GCS
486        // injects the per-object skip-present + upload (via `upload_object`,
487        // which also owns the shared read+verify) and the manifest-write
488        // closure. A failed push writes NO manifest.
489        let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
490        let meter = self.meter.as_deref();
491        let meter_arc = self.meter.clone();
492        self.runtime.block_on(async {
493            // Skip-if-manifest-present pre-check: a present manifest implies all
494            // its objects are present (we always write the manifest last).
495            let manifest_key = self.location.manifest_key(&id);
496            if self.key_exists(&manifest_key).await? {
497                return Ok(());
498            }
499
500            push_objects_concurrent(
501                manifest,
502                &self.config,
503                &limiter,
504                meter,
505                meter_arc,
506                |entry| {
507                    let object_key = self.location.object_key(&entry.checksum);
508                    upload_object(
509                        entry,
510                        object_key,
511                        source,
512                        &limiter,
513                        meter,
514                        |key| async move { self.key_exists(&key).await },
515                        |key, bytes| async move { self.put_bytes(&key, bytes).await },
516                    )
517                },
518                || async {
519                    // Write the manifest last (verified to hash back to its id),
520                    // exactly as the oracle stores the manifest text.
521                    let mut text = manifest.to_string();
522                    text.push('\n');
523                    let manifest_actual = hasher.hash_hex(text.as_bytes());
524                    if manifest_actual != id {
525                        return Err(StoreError::Integrity {
526                            address: manifest_key.clone(),
527                            expected: id.clone(),
528                            actual: manifest_actual,
529                        });
530                    }
531                    self.put_bytes(&manifest_key, text.into_bytes()).await
532                },
533            )
534            .await
535        })
536    }
537}
538
539impl StreamStore for GcsStore {
540    fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
541        let key = self.location.object_key(checksum);
542        self.runtime.block_on(async { self.key_exists(&key).await })
543    }
544
545    fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
546        let key = self.location.object_key(checksum);
547        let bytes = self.runtime.block_on(async {
548            self.get_bytes(&key)
549                .await?
550                .ok_or_else(|| StoreError::ObjectNotFound {
551                    checksum: checksum.to_owned(),
552                })
553        })?;
554
555        // Verify the downloaded blob hashes back to its content-address before
556        // returning it (corruption surfaces as `Integrity`, never bad bytes).
557        let actual = Blake3Hasher::new().hash_hex(&bytes);
558        if actual != checksum {
559            return Err(StoreError::Integrity {
560                address: format!("gs://{}/{key}", self.location.bucket),
561                expected: checksum.to_owned(),
562                actual,
563            });
564        }
565        Ok(bytes)
566    }
567
568    fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
569        // Verify BEFORE uploading: a blob whose bytes do not hash to `checksum`
570        // must never land at that content-address (nothing is stored).
571        let actual = Blake3Hasher::new().hash_hex(&bytes);
572        if actual != checksum {
573            return Err(StoreError::Integrity {
574                address: self.location.object_key(checksum),
575                expected: checksum.to_owned(),
576                actual,
577            });
578        }
579        let key = self.location.object_key(checksum);
580        self.runtime
581            .block_on(async { self.put_bytes(&key, bytes).await })
582    }
583
584    fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
585        let key = self.location.manifest_key(id);
586        // Mirror the manifest-write tail of `push`: render the oracle's
587        // `echo "${manifest}"` bytes, verify they hash back to `id`, then PUT.
588        let mut text = manifest.to_string();
589        text.push('\n');
590        let actual = Blake3Hasher::new().hash_hex(text.as_bytes());
591        if actual != id {
592            return Err(StoreError::Integrity {
593                address: key,
594                expected: id.to_owned(),
595                actual,
596            });
597        }
598        self.runtime
599            .block_on(async { self.put_bytes(&key, text.into_bytes()).await })
600    }
601}
602
603/// Builds the multi-thread tokio runtime that backs the sync bridge.
604fn build_runtime() -> Result<Runtime, StoreError> {
605    tokio::runtime::Builder::new_multi_thread()
606        .enable_all()
607        .build()
608        .map_err(|e| backend("creating tokio runtime for GcsStore", e))
609}
610
611/// Installs the **`ring`** rustls [`CryptoProvider`] as the process default if
612/// none is set yet. Idempotent: a second call (or a default installed by another
613/// store) is a harmless no-op. This is the load-bearing piece that keeps
614/// `aws-lc-rs` out of the dependency graph — the SDK's `reqwest`
615/// (`rustls-no-provider`) consumes whatever process-default provider is set.
616fn install_ring_provider() {
617    // Ignore the error: it only means a provider was already installed, which is
618    // exactly the state we want.
619    let _ = rustls_ring::crypto::ring::default_provider().install_default();
620}
621
622/// Wraps any backend error into [`StoreError::Backend`] with a message.
623fn backend<E>(message: &str, source: E) -> StoreError
624where
625    E: std::error::Error + Send + Sync + 'static,
626{
627    StoreError::Backend {
628        message: message.to_owned(),
629        source: Some(Box::new(source)),
630    }
631}
632
633/// Builds a retry [`Attempt`] from a concrete `google-cloud-storage`
634/// [`GcsError`], at the boundary where the SDK error still carries its HTTP
635/// status / headers / gRPC status.
636///
637/// - `transient`: true for the retryable signal set — an HTTP `429`/`503`
638///   status, the gRPC `RESOURCE_EXHAUSTED` / `UNAVAILABLE` / `DEADLINE_EXCEEDED`
639///   / `ABORTED` / `INTERNAL` codes, and the transport/timeout classes folded
640///   through the shared [`classify_error`](crate::classify_error) on the mapped
641///   [`StoreError`]. Conservative: unknown / `NOT_FOUND` / `PERMISSION_DENIED`
642///   are NOT transient.
643/// - `retry_after`: extracted from the error's HTTP `Retry-After` header (the
644///   delta-seconds form) when present, else `None`.
645/// - `err`: the mapped [`StoreError::Backend`] (the same value the non-retry
646///   path used to surface).
647fn gcs_attempt_from_err(message: &str, err: GcsError) -> Attempt {
648    let http_status = err.http_status_code();
649    let retry_after = err
650        .http_headers()
651        .and_then(|h| h.get("retry-after"))
652        .and_then(|v| v.to_str().ok())
653        .and_then(parse_retry_after);
654    let code = err.status().map(|s| s.code);
655
656    let store_err = backend(message, err);
657
658    let transient = http_status.is_some_and(|s| s == 429 || s == 503)
659        || matches!(
660            code,
661            Some(
662                Code::ResourceExhausted
663                    | Code::Unavailable
664                    | Code::DeadlineExceeded
665                    | Code::Aborted
666                    | Code::Internal
667            )
668        )
669        || matches!(
670            classify_error(&store_err),
671            crate::adaptive::OpResult::Throttle
672        );
673
674    Attempt {
675        transient,
676        retry_after,
677        err: store_err,
678    }
679}
680
681/// Classifies a `google-cloud-storage` SDK error as "object is absent".
682///
683/// The SDK reports a missing object two different ways, and the absent-object
684/// paths (`key_exists` -> `Ok(false)`, `get_bytes` -> `Ok(None)`) must treat
685/// BOTH as not-found:
686///
687/// 1. A plain **HTTP 404** (`http_status_code() == Some(404)`), e.g. from a
688///    proxy/load balancer ahead of the service.
689/// 2. A **service-level gRPC-style error** whose `status().code` is
690///    [`Code::NotFound`] but whose `http_status_code()` is `None`. This is what
691///    the v1.x SDK actually returns for `get_object`/`read_object` on a missing
692///    object, and the form the original `== Some(404)`-only check misclassified
693///    as a fatal backend error (it aborted `push` before the first upload).
694///
695/// This mirrors the SDK's own internal classification
696/// (`e.status().is_some_and(|s| s.code == Code::NotFound)`), the GCS analogue of
697/// the aws-sdk's `is_not_found()` used by [`S3Store`](crate::S3Store).
698fn is_not_found(err: &GcsError) -> bool {
699    err.http_status_code() == Some(404)
700        || err
701            .status()
702            .is_some_and(|status| status.code == Code::NotFound)
703}
704
#[cfg(test)]
mod tests {
    use super::*;
    use snapdir_core::manifest::PathType;
    use std::time::Duration;

    #[test]
    fn backoff_wire_gcs_extract_503_retry_after_is_transient_with_hint() {
        // A 503 carrying `Retry-After: 9` => transient, hint = 9s.
        let mut headers = http::HeaderMap::new();
        headers.insert("retry-after", http::HeaderValue::from_static("9"));
        let err = GcsError::http(503, headers, bytes::Bytes::new());
        let attempt = gcs_attempt_from_err("GCS op failed", err);
        assert!(attempt.transient, "HTTP 503 must be transient");
        assert_eq!(
            attempt.retry_after,
            Some(Duration::from_secs(9)),
            "the Retry-After delta-seconds header must be extracted"
        );
    }

    #[test]
    fn backoff_wire_gcs_extract_resource_exhausted_is_transient_no_hint() {
        // A service-level RESOURCE_EXHAUSTED (no HTTP status / header) =>
        // transient with no hint.
        use google_cloud_gax::error::rpc::Status;
        let status = Status::default()
            .set_code(Code::ResourceExhausted)
            .set_message("quota exceeded");
        let err = GcsError::service(status);
        let attempt = gcs_attempt_from_err("GCS op failed", err);
        assert!(
            attempt.transient,
            "RESOURCE_EXHAUSTED must be classified transient"
        );
        assert_eq!(attempt.retry_after, None, "no Retry-After header => None");
    }

    #[test]
    fn backoff_wire_gcs_extract_not_found_is_not_transient() {
        use google_cloud_gax::error::rpc::Status;

        // A 404 / NOT_FOUND hard error => NOT transient (conservative).
        let err = GcsError::http(404, http::HeaderMap::new(), bytes::Bytes::new());
        let attempt = gcs_attempt_from_err("GCS op failed", err);
        assert!(
            !attempt.transient,
            "a 404/not-found must never be classified transient"
        );

        // The service-level NOT_FOUND shape (no HTTP status code), which is
        // what the SDK actually returns for a missing object, must not be
        // retried either.
        let not_found = GcsError::service(Status::default().set_code(Code::NotFound));
        let attempt = gcs_attempt_from_err("GCS op failed", not_found);
        assert!(
            !attempt.transient,
            "a service-level NOT_FOUND must never be classified transient"
        );

        let denied = GcsError::service(Status::default().set_code(Code::PermissionDenied));
        let attempt = gcs_attempt_from_err("GCS op failed", denied);
        assert!(
            !attempt.transient,
            "PERMISSION_DENIED must never be classified transient"
        );
    }

    /// Strips a leading `./` and a trailing `/` from a manifest path.
    ///
    /// This is a test-local mirror of the normalization the orchestrator
    /// (`crate::push`) is described as performing. NOTE(review): it does not
    /// call into `crate::push`, so the test below pins only this mirror, not
    /// the production code path.
    fn strip_leading_dot_slash(path: &str) -> &str {
        let trimmed = path.strip_prefix("./").unwrap_or(path);
        trimmed.strip_suffix('/').unwrap_or(trimmed)
    }

    // The canonical content-addressable fixtures (shared across the s3/gcs
    // store test suites).
    const FOO_CHECKSUM: &str = "49dc870df1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
    const FOO_SHARDED: &str = "49d/c87/0df/1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
    const MANIFEST_ID: &str = "aa91e498f401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";
    const MANIFEST_SHARDED: &str =
        "aa9/1e4/98f/401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";

    #[test]
    fn gcs_store_parses_bucket_and_prefix() {
        let loc = GcsLocation::parse("gs://my-bucket/long/term/storage");
        assert_eq!(loc.bucket, "my-bucket");
        assert_eq!(loc.prefix, "long/term/storage");
    }

    #[test]
    fn gcs_store_parse_matches_oracle_cut_and_sed() {
        // Oracle: bucket = `cut -d'/' -f3`; prefix = `sed -E 's|^gs:/*[^/]*/?||'`
        // then trailing-slash strip. For "gs://bucket/a/b/c": bucket=bucket,
        // prefix=a/b/c.
        let loc = GcsLocation::parse("gs://bucket/a/b/c");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "a/b/c");
    }

    #[test]
    fn gcs_store_parse_strips_trailing_slash() {
        // `_snapdir_gcs_store_get_remote_prefix` strips the trailing slash.
        let loc = GcsLocation::parse("gs://bucket/prefix/");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "prefix");
    }

    #[test]
    fn gcs_store_parse_bucket_root_has_empty_prefix() {
        let loc = GcsLocation::parse("gs://bucket");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "");

        let loc_slash = GcsLocation::parse("gs://bucket/");
        assert_eq!(loc_slash.bucket, "bucket");
        assert_eq!(loc_slash.prefix, "");
    }

    #[test]
    fn gcs_store_parse_accepts_bare_bucket_prefix_without_scheme() {
        let loc = GcsLocation::parse("bucket/some/prefix");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "some/prefix");
    }

    #[test]
    fn gcs_store_bucket_resource_uses_projects_underscore_form() {
        let loc = GcsLocation::parse("gs://my-bucket/x");
        assert_eq!(loc.bucket_resource(), "projects/_/buckets/my-bucket");
    }

    #[test]
    fn gcs_store_object_key_matches_sharded_scheme() {
        let loc = GcsLocation::parse("gs://b/long/term/storage");
        assert_eq!(
            loc.object_key(FOO_CHECKSUM),
            format!("long/term/storage/.objects/{FOO_SHARDED}")
        );
    }

    #[test]
    fn gcs_store_manifest_key_matches_sharded_scheme() {
        let loc = GcsLocation::parse("gs://b/long/term/storage");
        assert_eq!(
            loc.manifest_key(MANIFEST_ID),
            format!("long/term/storage/.manifests/{MANIFEST_SHARDED}")
        );
    }

    #[test]
    fn gcs_store_keys_have_no_leading_slash_at_bucket_root() {
        // With an empty prefix the keys are just `.objects/...` / `.manifests/...`.
        let loc = GcsLocation::parse("gs://bucket");
        assert_eq!(
            loc.object_key(FOO_CHECKSUM),
            format!(".objects/{FOO_SHARDED}")
        );
        assert_eq!(
            loc.manifest_key(MANIFEST_ID),
            format!(".manifests/{MANIFEST_SHARDED}")
        );
    }

    #[test]
    fn gcs_store_object_key_uses_core_object_path() {
        // Cross-check that we delegate to the frozen core sharding helper rather
        // than reimplementing it: at the bucket root the key equals the core
        // `object_path` output verbatim.
        let loc = GcsLocation::parse("gs://b");
        assert_eq!(loc.object_key(FOO_CHECKSUM), object_path(FOO_CHECKSUM));
        assert_eq!(loc.manifest_key(MANIFEST_ID), manifest_path(MANIFEST_ID));
    }

    #[test]
    fn gcs_store_strip_leading_dot_slash() {
        assert_eq!(strip_leading_dot_slash("./foo"), "foo");
        assert_eq!(strip_leading_dot_slash("./a/b/c"), "a/b/c");
        assert_eq!(strip_leading_dot_slash("./a/"), "a");
        assert_eq!(strip_leading_dot_slash("./"), "");
    }

    #[test]
    fn gcs_store_is_not_found_classifies_service_level_not_found_as_absent() {
        // Regression guard for the push-abort bug: the v1.x SDK reports a
        // missing object as a *service-level* gRPC error with code NOT_FOUND and
        // NO HTTP status code (`http_status_code() == None`). The original
        // `== Some(404)`-only check misclassified this as a fatal backend error,
        // so `key_exists` errored and `push` aborted before uploading anything.
        use google_cloud_gax::error::rpc::Status;

        let status = Status::default()
            .set_code(Code::NotFound)
            .set_message("No such object: bucket/.manifests/...");
        let err = GcsError::service(status);
        // This is the load-bearing assertion: the real-world shape carries no
        // HTTP code, so a 404-only check would (and did) miss it.
        assert_eq!(err.http_status_code(), None);
        assert!(
            is_not_found(&err),
            "service-level NOT_FOUND must be classified as object-absent"
        );
    }

    #[test]
    fn gcs_store_is_not_found_classifies_http_404_as_absent() {
        // The other absent shape: a plain HTTP 404 (e.g. from a proxy/LB ahead
        // of the service). Must also count as not-found.
        let err = GcsError::http(404, http::HeaderMap::new(), bytes::Bytes::new());
        assert!(is_not_found(&err), "HTTP 404 must be classified as absent");
    }

    #[test]
    fn gcs_store_is_not_found_does_not_swallow_other_errors() {
        // Guard the inverse: a non-not-found service error (e.g. PERMISSION
        // DENIED) and a non-404 HTTP error must NOT be treated as absent, so
        // real failures still surface instead of being silently skipped.
        use google_cloud_gax::error::rpc::Status;

        let denied = GcsError::service(Status::default().set_code(Code::PermissionDenied));
        assert!(!is_not_found(&denied), "PERMISSION_DENIED is not absence");

        let server_err = GcsError::http(503, http::HeaderMap::new(), bytes::Bytes::new());
        assert!(!is_not_found(&server_err), "HTTP 503 is not absence");
    }

    #[test]
    fn gcs_store_install_ring_provider_is_idempotent() {
        // Installing the ring provider twice must not panic; the second call is a
        // harmless no-op (a provider is already the process default).
        install_ring_provider();
        install_ring_provider();
    }

    // --- Live round-trip, skipped by default --------------------------------
    //
    // Requires real Google Cloud credentials (ADC) plus a writable bucket.
    // Gated behind `SNAPDIR_GCS_TEST_STORE` (a `gs://bucket/prefix` URL) so it is
    // skipped unless explicitly configured. Real round-trips are exercised by
    // the later `remote-interop` gate.
    #[test]
    fn gcs_store_live_round_trip_when_configured() {
        use snapdir_core::manifest::ManifestEntry;

        /// A scratch directory under the system temp dir that is emptied on
        /// creation and removed on drop, so the local trees are cleaned up even
        /// when an `expect`/assertion below panics, and a leftover from an
        /// earlier run can never satisfy the post-fetch assertion by accident.
        struct ScratchDir {
            path: std::path::PathBuf,
        }

        impl ScratchDir {
            fn new(name: &str) -> Self {
                let path = std::env::temp_dir().join(name);
                // Best-effort: the directory usually does not exist yet.
                let _ = std::fs::remove_dir_all(&path);
                std::fs::create_dir_all(&path).unwrap();
                Self { path }
            }
        }

        impl Drop for ScratchDir {
            fn drop(&mut self) {
                // Best-effort cleanup; a failure here must not mask a test panic.
                let _ = std::fs::remove_dir_all(&self.path);
            }
        }

        let Ok(store) = std::env::var("SNAPDIR_GCS_TEST_STORE") else {
            eprintln!(
                "skipping gcs_store live round-trip: set SNAPDIR_GCS_TEST_STORE \
                 (gs://bucket/prefix) plus ADC credentials to run it"
            );
            return;
        };

        let hasher = Blake3Hasher::new();

        // Build a tiny source tree + matching manifest.
        let src = ScratchDir::new(&format!("snapdir-gcs-live-{}", std::process::id()));
        std::fs::write(src.path.join("foo"), b"foo\n").unwrap();
        let foo_sum = hasher.hash_hex(b"foo\n");
        let root_sum = snapdir_core::merkle::directory_checksum([foo_sum.as_str()], &hasher);
        let mut manifest = Manifest::new();
        manifest.push(ManifestEntry::new(
            PathType::Directory,
            "700",
            root_sum,
            4,
            "./",
        ));
        manifest.push(ManifestEntry::new(
            PathType::File,
            "600",
            foo_sum,
            4,
            "./foo",
        ));
        let manifest = Manifest::from_entries(manifest.entries().to_vec());
        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);

        let gcs = GcsStore::connect(&store).expect("connect");
        gcs.push(&manifest, &src.path).expect("push");
        let read_back = gcs.get_manifest(&id).expect("get_manifest");
        assert_eq!(read_back, manifest);

        // `dest` starts empty, so the assertion below proves `fetch_files`
        // actually materialized `foo`.
        let dest = ScratchDir::new(&format!("snapdir-gcs-dest-{}", std::process::id()));
        gcs.fetch_files(&read_back, &dest.path).expect("fetch_files");
        assert_eq!(std::fs::read(dest.path.join("foo")).unwrap(), b"foo\n");
        // `dest` and `src` are removed when they go out of scope here.
    }
}