Skip to main content

snapdir_stores/
gcs_store.rs

1//! `GcsStore`: the `gs://` storage backend, backed by the native
2//! `google-cloud-storage` SDK.
3//!
4//! A [`GcsStore`] targets a `gs://bucket/prefix` location and holds the frozen
5//! content-addressable `.objects`/`.manifests` sharded layout, so a
6//! bucket/prefix is interchangeable across conforming implementations:
7//!
8//! ```text
9//! gs://<bucket>/<prefix>/.objects/<sharded checksum>     raw object bytes
10//! gs://<bucket>/<prefix>/.manifests/<sharded snapshot id> manifest text
11//! ```
12//!
13//! Sharding and the relative keys come straight from [`snapdir_core::store`]
14//! ([`object_path`] / [`manifest_path`]); this module never reimplements them.
15//!
16//! # `gs://` parsing
17//!
18//! The oracle derives the bucket as `cut -d'/' -f3` of the URL
19//! (`_snapdir_export_store_vars`) and the prefix with
20//! `sed -E 's|^gs:/*[^/]*/?||'` then a trailing-slash strip
21//! (`_snapdir_gcs_store_get_remote_prefix`). [`GcsLocation::parse`] reproduces
22//! that exactly: bucket = first segment after the scheme, prefix = the
23//! remainder with leading/trailing slashes removed.
24//!
25//! # Credentials
26//!
27//! Authentication is delegated entirely to the SDK's own credential chain
28//! (Application Default Credentials): `GOOGLE_APPLICATION_CREDENTIALS`,
29//! `GOOGLE_APPLICATION_CREDENTIALS_JSON`, `gcloud` user creds, and the GCE/GKE
30//! metadata server. No bespoke snapdir credential variables are introduced.
31//!
32//! # TLS provider (project-load-bearing)
33//!
34//! The shipped binary must statically link on musl, so the workspace
35//! standardizes on the **`ring`** rustls provider; `aws-lc-rs` is banned. The
36//! `google-cloud-storage` default features pull `aws-lc-rs` in via
37//! `google-cloud-auth` (both its id-token backend and its rustls provider), so
38//! we depend on the crate with `default-features = false` and instead install
39//! the **ring** [`CryptoProvider`](rustls_ring::crypto::CryptoProvider) as the
40//! rustls *process default* (the SDK's `reqwest` is built with
41//! `rustls-no-provider`, i.e. it consumes that process default). See the crate
42//! `Cargo.toml` for the full rationale.
43//!
44//! # Sync trait, async SDK
45//!
46//! The SDK is async. [`GcsStore`] owns a private multi-thread `tokio` runtime
47//! and bridges each [`Store`] method with `runtime.block_on(...)`, exactly like
48//! [`S3Store`](crate::S3Store), so no `async` leaks into `snapdir-core`.
49
50use std::path::Path;
51use std::sync::Arc;
52
53use google_cloud_gax::error::rpc::Code;
54use google_cloud_gax::error::Error as GcsError;
55use google_cloud_storage::client::{Storage, StorageControl};
56use snapdir_core::manifest::Manifest;
57use snapdir_core::merkle::{Blake3Hasher, Hasher};
58use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
59use snapdir_core::Meter;
60
61use crate::fetch::fetch_files_concurrent;
62use crate::push::{push_objects_concurrent, upload_object};
63use crate::stream::StreamStore;
64use crate::transfer::{RateLimiter, TransferConfig};
65use tokio::runtime::Runtime;
66
67/// Number of times a fetch is retried when the downloaded bytes fail their
68/// checksum, mirroring the oracle's `_SNAPDIR_GCS_STORE_RETRIES` default of 5.
69const MAX_FETCH_RETRIES: u32 = 5;
70
71/// The parsed location a [`GcsStore`] targets: a GCS bucket plus a key prefix.
72#[derive(Debug, Clone, PartialEq, Eq)]
73pub struct GcsLocation {
74    /// The bucket name (first path segment of the `gs://` URL).
75    pub bucket: String,
76    /// The object-name prefix (remaining segments), with no leading or trailing
77    /// slash. Empty when the store points at the bucket root.
78    pub prefix: String,
79}
80
81impl GcsLocation {
82    /// Parses a `gs://bucket/prefix` URL into its bucket and prefix.
83    ///
84    /// Matches the oracle exactly: the bucket is `cut -d'/' -f3` of the store
85    /// URL (the segment immediately after `gs://`), and the prefix is
86    /// `sed -E 's|^gs:/*[^/]*/?||'` with a trailing slash stripped
87    /// (`_snapdir_gcs_store_get_remote_prefix`).
88    ///
89    /// The `gs://` scheme is optional; a bare `bucket/prefix` is accepted too.
90    #[must_use]
91    pub fn parse(store_url: &str) -> Self {
92        // Drop the scheme (`gs://`, or any `<proto>://`) if present. The oracle
93        // splits the full URL on `/` and takes field 3 as the bucket, which for
94        // `gs://bucket/...` is exactly the segment after the `//`.
95        let without_scheme = match store_url.find("://") {
96            Some(idx) => &store_url[idx + 3..],
97            None => store_url,
98        };
99        let mut parts = without_scheme.splitn(2, '/');
100        let bucket = parts.next().unwrap_or("").to_owned();
101        let prefix = parts.next().unwrap_or("");
102        // Strip a leading slash (empty-first-segment edge case) and the trailing
103        // slash; the prefix is joined back with a single `/`.
104        let prefix = prefix
105            .trim_end_matches('/')
106            .trim_start_matches('/')
107            .to_owned();
108        Self { bucket, prefix }
109    }
110
111    /// The GCS resource name for this bucket, `projects/_/buckets/<bucket>`, as
112    /// required by the `google-cloud-storage` v1 API.
113    #[must_use]
114    pub fn bucket_resource(&self) -> String {
115        format!("projects/_/buckets/{}", self.bucket)
116    }
117
118    /// Returns the full GCS object name for a content object given its checksum,
119    /// i.e. `<prefix>/.objects/<sharded>` (no leading slash).
120    #[must_use]
121    pub fn object_key(&self, checksum: &str) -> String {
122        self.key_for(&object_path(checksum))
123    }
124
125    /// Returns the full GCS object name for a manifest given its snapshot id,
126    /// i.e. `<prefix>/.manifests/<sharded>` (no leading slash).
127    #[must_use]
128    pub fn manifest_key(&self, id: &str) -> String {
129        self.key_for(&manifest_path(id))
130    }
131
132    /// Joins the store prefix with a store-relative path (`.objects/...` or
133    /// `.manifests/...`), producing a leading-slash-free object name. Mirrors
134    /// the oracle's `${_SNAPDIR_STORE_BASE_DIR%/}/${source_path#/}` with the
135    /// leading slash trimmed.
136    fn key_for(&self, rel: &str) -> String {
137        let rel = rel.trim_start_matches('/');
138        if self.prefix.is_empty() {
139            rel.to_owned()
140        } else {
141            format!("{}/{rel}", self.prefix)
142        }
143    }
144}
145
146/// A content-addressable store backed by a Google Cloud Storage bucket.
147///
148/// Construct one with [`GcsStore::connect`] (resolves Application Default
149/// Credentials via the SDK).
150pub struct GcsStore {
151    /// Data-plane client (object read/write).
152    storage: Storage,
153    /// Control-plane client (object metadata, used for HEAD-like existence
154    /// checks without downloading the body).
155    control: StorageControl,
156    location: GcsLocation,
157    runtime: Arc<Runtime>,
158    config: TransferConfig,
159    /// Optional progress meter; recorded into during transfers. `None` (the
160    /// default from every constructor) means zero recording and byte-identical
161    /// behavior. Set by the CLI via [`GcsStore::with_meter`].
162    meter: Option<Arc<Meter>>,
163}
164
165impl GcsStore {
166    /// Connects to the `gs://bucket/prefix` store, resolving credentials via the
167    /// SDK's Application Default Credentials chain.
168    ///
169    /// The `ring` rustls crypto provider is installed as the process default
170    /// (idempotent) before the clients are built, keeping `aws-lc-rs` out of the
171    /// graph (see the module docs).
172    ///
173    /// # Errors
174    ///
175    /// [`StoreError::Backend`] if the tokio runtime cannot be created or the SDK
176    /// clients cannot be built (e.g. credentials cannot be resolved).
177    pub fn connect(store_url: &str) -> Result<Self, StoreError> {
178        Self::connect_with(store_url, TransferConfig::default())
179    }
180
181    /// Like [`connect`](Self::connect), but carries a [`TransferConfig`] for
182    /// concurrency / bandwidth control. The existing [`connect`](Self::connect)
183    /// delegates here with [`TransferConfig::default`].
184    ///
185    /// # Errors
186    ///
187    /// [`StoreError::Backend`] if the tokio runtime cannot be created or the SDK
188    /// clients cannot be built (e.g. credentials cannot be resolved).
189    pub fn connect_with(store_url: &str, config: TransferConfig) -> Result<Self, StoreError> {
190        let location = GcsLocation::parse(store_url);
191        let runtime = build_runtime()?;
192        install_ring_provider();
193
194        let (storage, control) = runtime.block_on(async {
195            let storage = Storage::builder()
196                .build()
197                .await
198                .map_err(|e| backend("building GCS Storage client", e))?;
199            let control = StorageControl::builder()
200                .build()
201                .await
202                .map_err(|e| backend("building GCS StorageControl client", e))?;
203            Ok::<_, StoreError>((storage, control))
204        })?;
205
206        Ok(Self {
207            storage,
208            control,
209            location,
210            runtime: Arc::new(runtime),
211            config,
212            meter: None,
213        })
214    }
215
216    /// Builds a store from already-configured SDK clients and a parsed location,
217    /// owning a fresh tokio runtime for the sync bridge. Intended for tests.
218    ///
219    /// # Errors
220    ///
221    /// [`StoreError::Backend`] if the tokio runtime cannot be created.
222    pub fn from_clients(
223        storage: Storage,
224        control: StorageControl,
225        location: GcsLocation,
226    ) -> Result<Self, StoreError> {
227        Ok(Self {
228            storage,
229            control,
230            location,
231            runtime: Arc::new(build_runtime()?),
232            config: TransferConfig::default(),
233            meter: None,
234        })
235    }
236
237    /// Attaches (or clears) an optional progress [`Meter`], rides alongside
238    /// [`config`](Self::transfer_config). The transfer paths record bytes-in /
239    /// bytes-out + per-object progress into it; `None` (the constructor default)
240    /// means zero recording and byte-identical behavior. The CLI sets this after
241    /// construction.
242    #[must_use]
243    pub fn with_meter(mut self, meter: Option<Arc<Meter>>) -> Self {
244        self.meter = meter;
245        self
246    }
247
248    /// The parsed bucket/prefix this store targets.
249    #[must_use]
250    pub fn location(&self) -> &GcsLocation {
251        &self.location
252    }
253
254    /// The [`TransferConfig`] (concurrency / bandwidth) this store was built
255    /// with. Consumed by the transfer loops in later gates.
256    #[must_use]
257    pub fn transfer_config(&self) -> &TransferConfig {
258        &self.config
259    }
260
261    /// Metadata HEAD on an object key; `Ok(true)` if it exists, `Ok(false)` if
262    /// absent (see [`is_not_found`] for what counts as absent).
263    async fn key_exists(&self, key: &str) -> Result<bool, StoreError> {
264        match self
265            .control
266            .get_object()
267            .set_bucket(self.location.bucket_resource())
268            .set_object(key)
269            .send()
270            .await
271        {
272            Ok(_) => Ok(true),
273            Err(err) => {
274                if is_not_found(&err) {
275                    Ok(false)
276                } else {
277                    Err(backend("GCS get_object metadata failed", err))
278                }
279            }
280        }
281    }
282
283    /// GET an object key's full body, draining the read stream, or `None` if it
284    /// is absent (see [`is_not_found`] for what counts as absent).
285    async fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
286        let mut resp = match self
287            .storage
288            .read_object(self.location.bucket_resource(), key)
289            .send()
290            .await
291        {
292            Ok(resp) => resp,
293            Err(err) => {
294                if is_not_found(&err) {
295                    return Ok(None);
296                }
297                return Err(backend("GCS read_object failed", err));
298            }
299        };
300
301        let mut buf = Vec::new();
302        while let Some(chunk) = resp.next().await {
303            let chunk = chunk.map_err(|e| backend("reading GCS object body", e))?;
304            buf.extend_from_slice(&chunk);
305        }
306        Ok(Some(buf))
307    }
308
309    /// PUT `bytes` at `key`. GCS object writes are atomic (the new object is only
310    /// visible once fully uploaded), so no temp-key dance is needed — matching
311    /// the oracle's reliance on `gcloud storage cp` atomicity.
312    async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
313        // The SDK's upload future is large (>30 KiB); box it so it does not
314        // bloat the enclosing `push` future (clippy::large_futures).
315        let upload = self
316            .storage
317            .write_object(
318                self.location.bucket_resource(),
319                key,
320                bytes::Bytes::from(bytes),
321            )
322            .send_buffered();
323        Box::pin(upload)
324            .await
325            .map_err(|e| backend("GCS write_object failed", e))?;
326        Ok(())
327    }
328
329    /// Downloads `key`, verifying its BLAKE3 against `expected`, retrying up to
330    /// [`MAX_FETCH_RETRIES`] times. Mirrors `_snapdir_gcs_fetch_to_cache`.
331    async fn fetch_verified(&self, key: &str, expected: &str) -> Result<Vec<u8>, StoreError> {
332        let hasher = Blake3Hasher::new();
333        let mut attempts_left = MAX_FETCH_RETRIES;
334        loop {
335            match self.get_bytes(key).await? {
336                Some(bytes) => {
337                    let actual = hasher.hash_hex(&bytes);
338                    if actual == expected {
339                        return Ok(bytes);
340                    }
341                    // Mismatched checksum after fetching: retry (the oracle
342                    // decrements its retry budget on the same condition).
343                    attempts_left = attempts_left.saturating_sub(1);
344                    if attempts_left == 0 {
345                        return Err(StoreError::Integrity {
346                            address: format!("gs://{}/{key}", self.location.bucket),
347                            expected: expected.to_owned(),
348                            actual,
349                        });
350                    }
351                }
352                None => {
353                    // Treat a missing key as not-found rather than spinning.
354                    return Err(StoreError::ObjectNotFound {
355                        checksum: expected.to_owned(),
356                    });
357                }
358            }
359        }
360    }
361}
362
363impl Store for GcsStore {
364    fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
365        let key = self.location.manifest_key(id);
366        let bytes = self.runtime.block_on(async {
367            match self.get_bytes(&key).await? {
368                Some(b) => Ok(b),
369                None => Err(StoreError::ManifestNotFound { id: id.to_owned() }),
370            }
371        })?;
372
373        let text = String::from_utf8(bytes).map_err(|err| StoreError::Backend {
374            message: format!("manifest {id} is not valid UTF-8"),
375            source: Some(Box::new(err)),
376        })?;
377        let manifest = Manifest::parse(&text)?;
378
379        // Verify the stored manifest hashes back to its snapshot id before
380        // trusting it (oracle: the id check on fetch).
381        let actual = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
382        if actual != id {
383            return Err(StoreError::Integrity {
384                address: self.location.manifest_key(id),
385                expected: id.to_owned(),
386                actual,
387            });
388        }
389        Ok(manifest)
390    }
391
392    fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
393        // Concurrent download via the shared orchestrator: it owns the
394        // skip-if-present-and-verified short-circuit, directory creation, the
395        // bounded-concurrency pass, the per-object rate limit, and the atomic
396        // write. GCS only injects the per-object download, preserving the
397        // BLAKE3-verify + retry discipline of `fetch_verified`.
398        let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
399        let meter = self.meter.as_deref();
400        let meter_arc = self.meter.clone();
401        self.runtime.block_on(async {
402            fetch_files_concurrent(
403                manifest,
404                dest,
405                &self.config,
406                &limiter,
407                meter,
408                meter_arc,
409                |entry| async {
410                    let key = self.location.object_key(&entry.checksum);
411                    self.fetch_verified(&key, &entry.checksum).await
412                },
413            )
414            .await
415        })
416    }
417
418    fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
419        let hasher = Blake3Hasher::new();
420        let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);
421
422        // Concurrent upload via the shared orchestrator: it owns the bounded
423        // per-object pass and the manifest-last / all-or-nothing ordering. GCS
424        // injects the per-object skip-present + upload (via `upload_object`,
425        // which also owns the shared read+verify) and the manifest-write
426        // closure. A failed push writes NO manifest.
427        let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
428        let meter = self.meter.as_deref();
429        let meter_arc = self.meter.clone();
430        self.runtime.block_on(async {
431            // Skip-if-manifest-present pre-check: a present manifest implies all
432            // its objects are present (we always write the manifest last).
433            let manifest_key = self.location.manifest_key(&id);
434            if self.key_exists(&manifest_key).await? {
435                return Ok(());
436            }
437
438            push_objects_concurrent(
439                manifest,
440                &self.config,
441                &limiter,
442                meter,
443                meter_arc,
444                |entry| {
445                    let object_key = self.location.object_key(&entry.checksum);
446                    upload_object(
447                        entry,
448                        object_key,
449                        source,
450                        &limiter,
451                        meter,
452                        |key| async move { self.key_exists(&key).await },
453                        |key, bytes| async move { self.put_bytes(&key, bytes).await },
454                    )
455                },
456                || async {
457                    // Write the manifest last (verified to hash back to its id),
458                    // exactly as the oracle stores the manifest text.
459                    let mut text = manifest.to_string();
460                    text.push('\n');
461                    let manifest_actual = hasher.hash_hex(text.as_bytes());
462                    if manifest_actual != id {
463                        return Err(StoreError::Integrity {
464                            address: manifest_key.clone(),
465                            expected: id.clone(),
466                            actual: manifest_actual,
467                        });
468                    }
469                    self.put_bytes(&manifest_key, text.into_bytes()).await
470                },
471            )
472            .await
473        })
474    }
475}
476
477impl StreamStore for GcsStore {
478    fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
479        let key = self.location.object_key(checksum);
480        self.runtime.block_on(async { self.key_exists(&key).await })
481    }
482
483    fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
484        let key = self.location.object_key(checksum);
485        let bytes = self.runtime.block_on(async {
486            self.get_bytes(&key)
487                .await?
488                .ok_or_else(|| StoreError::ObjectNotFound {
489                    checksum: checksum.to_owned(),
490                })
491        })?;
492
493        // Verify the downloaded blob hashes back to its content-address before
494        // returning it (corruption surfaces as `Integrity`, never bad bytes).
495        let actual = Blake3Hasher::new().hash_hex(&bytes);
496        if actual != checksum {
497            return Err(StoreError::Integrity {
498                address: format!("gs://{}/{key}", self.location.bucket),
499                expected: checksum.to_owned(),
500                actual,
501            });
502        }
503        Ok(bytes)
504    }
505
506    fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
507        // Verify BEFORE uploading: a blob whose bytes do not hash to `checksum`
508        // must never land at that content-address (nothing is stored).
509        let actual = Blake3Hasher::new().hash_hex(&bytes);
510        if actual != checksum {
511            return Err(StoreError::Integrity {
512                address: self.location.object_key(checksum),
513                expected: checksum.to_owned(),
514                actual,
515            });
516        }
517        let key = self.location.object_key(checksum);
518        self.runtime
519            .block_on(async { self.put_bytes(&key, bytes).await })
520    }
521
522    fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
523        let key = self.location.manifest_key(id);
524        // Mirror the manifest-write tail of `push`: render the oracle's
525        // `echo "${manifest}"` bytes, verify they hash back to `id`, then PUT.
526        let mut text = manifest.to_string();
527        text.push('\n');
528        let actual = Blake3Hasher::new().hash_hex(text.as_bytes());
529        if actual != id {
530            return Err(StoreError::Integrity {
531                address: key,
532                expected: id.to_owned(),
533                actual,
534            });
535        }
536        self.runtime
537            .block_on(async { self.put_bytes(&key, text.into_bytes()).await })
538    }
539}
540
541/// Builds the multi-thread tokio runtime that backs the sync bridge.
542fn build_runtime() -> Result<Runtime, StoreError> {
543    tokio::runtime::Builder::new_multi_thread()
544        .enable_all()
545        .build()
546        .map_err(|e| backend("creating tokio runtime for GcsStore", e))
547}
548
549/// Installs the **`ring`** rustls [`CryptoProvider`] as the process default if
550/// none is set yet. Idempotent: a second call (or a default installed by another
551/// store) is a harmless no-op. This is the load-bearing piece that keeps
552/// `aws-lc-rs` out of the dependency graph — the SDK's `reqwest`
553/// (`rustls-no-provider`) consumes whatever process-default provider is set.
554fn install_ring_provider() {
555    // Ignore the error: it only means a provider was already installed, which is
556    // exactly the state we want.
557    let _ = rustls_ring::crypto::ring::default_provider().install_default();
558}
559
560/// Wraps any backend error into [`StoreError::Backend`] with a message.
561fn backend<E>(message: &str, source: E) -> StoreError
562where
563    E: std::error::Error + Send + Sync + 'static,
564{
565    StoreError::Backend {
566        message: message.to_owned(),
567        source: Some(Box::new(source)),
568    }
569}
570
571/// Classifies a `google-cloud-storage` SDK error as "object is absent".
572///
573/// The SDK reports a missing object two different ways, and the absent-object
574/// paths (`key_exists` -> `Ok(false)`, `get_bytes` -> `Ok(None)`) must treat
575/// BOTH as not-found:
576///
577/// 1. A plain **HTTP 404** (`http_status_code() == Some(404)`), e.g. from a
578///    proxy/load balancer ahead of the service.
579/// 2. A **service-level gRPC-style error** whose `status().code` is
580///    [`Code::NotFound`] but whose `http_status_code()` is `None`. This is what
581///    the v1.x SDK actually returns for `get_object`/`read_object` on a missing
582///    object, and the form the original `== Some(404)`-only check misclassified
583///    as a fatal backend error (it aborted `push` before the first upload).
584///
585/// This mirrors the SDK's own internal classification
586/// (`e.status().is_some_and(|s| s.code == Code::NotFound)`), the GCS analogue of
587/// the aws-sdk's `is_not_found()` used by [`S3Store`](crate::S3Store).
588fn is_not_found(err: &GcsError) -> bool {
589    err.http_status_code() == Some(404)
590        || err
591            .status()
592            .is_some_and(|status| status.code == Code::NotFound)
593}
594
595#[cfg(test)]
596mod tests {
597    use super::*;
598    use snapdir_core::manifest::PathType;
599
600    /// Strips a leading `./` and a trailing `/` from a manifest path. Kept as a
601    /// test-only assertion of the path normalization the orchestrator
602    /// (`crate::push`) performs.
603    fn strip_leading_dot_slash(path: &str) -> &str {
604        let trimmed = path.strip_prefix("./").unwrap_or(path);
605        trimmed.strip_suffix('/').unwrap_or(trimmed)
606    }
607
608    // The canonical content-addressable fixtures (shared across the s3/gcs
609    // store test suites).
610    const FOO_CHECKSUM: &str = "49dc870df1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
611    const FOO_SHARDED: &str = "49d/c87/0df/1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
612    const MANIFEST_ID: &str = "aa91e498f401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";
613    const MANIFEST_SHARDED: &str =
614        "aa9/1e4/98f/401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";
615
616    #[test]
617    fn gcs_store_parses_bucket_and_prefix() {
618        let loc = GcsLocation::parse("gs://my-bucket/long/term/storage");
619        assert_eq!(loc.bucket, "my-bucket");
620        assert_eq!(loc.prefix, "long/term/storage");
621    }
622
623    #[test]
624    fn gcs_store_parse_matches_oracle_cut_and_sed() {
625        // Oracle: bucket = `cut -d'/' -f3`; prefix = `sed -E 's|^gs:/*[^/]*/?||'`
626        // then trailing-slash strip. For "gs://bucket/a/b/c": bucket=bucket,
627        // prefix=a/b/c.
628        let loc = GcsLocation::parse("gs://bucket/a/b/c");
629        assert_eq!(loc.bucket, "bucket");
630        assert_eq!(loc.prefix, "a/b/c");
631    }
632
633    #[test]
634    fn gcs_store_parse_strips_trailing_slash() {
635        // `_snapdir_gcs_store_get_remote_prefix` strips the trailing slash.
636        let loc = GcsLocation::parse("gs://bucket/prefix/");
637        assert_eq!(loc.bucket, "bucket");
638        assert_eq!(loc.prefix, "prefix");
639    }
640
641    #[test]
642    fn gcs_store_parse_bucket_root_has_empty_prefix() {
643        let loc = GcsLocation::parse("gs://bucket");
644        assert_eq!(loc.bucket, "bucket");
645        assert_eq!(loc.prefix, "");
646
647        let loc_slash = GcsLocation::parse("gs://bucket/");
648        assert_eq!(loc_slash.bucket, "bucket");
649        assert_eq!(loc_slash.prefix, "");
650    }
651
652    #[test]
653    fn gcs_store_parse_accepts_bare_bucket_prefix_without_scheme() {
654        let loc = GcsLocation::parse("bucket/some/prefix");
655        assert_eq!(loc.bucket, "bucket");
656        assert_eq!(loc.prefix, "some/prefix");
657    }
658
659    #[test]
660    fn gcs_store_bucket_resource_uses_projects_underscore_form() {
661        let loc = GcsLocation::parse("gs://my-bucket/x");
662        assert_eq!(loc.bucket_resource(), "projects/_/buckets/my-bucket");
663    }
664
665    #[test]
666    fn gcs_store_object_key_matches_sharded_scheme() {
667        let loc = GcsLocation::parse("gs://b/long/term/storage");
668        assert_eq!(
669            loc.object_key(FOO_CHECKSUM),
670            format!("long/term/storage/.objects/{FOO_SHARDED}")
671        );
672    }
673
674    #[test]
675    fn gcs_store_manifest_key_matches_sharded_scheme() {
676        let loc = GcsLocation::parse("gs://b/long/term/storage");
677        assert_eq!(
678            loc.manifest_key(MANIFEST_ID),
679            format!("long/term/storage/.manifests/{MANIFEST_SHARDED}")
680        );
681    }
682
683    #[test]
684    fn gcs_store_keys_have_no_leading_slash_at_bucket_root() {
685        // With an empty prefix the keys are just `.objects/...` / `.manifests/...`.
686        let loc = GcsLocation::parse("gs://bucket");
687        assert_eq!(
688            loc.object_key(FOO_CHECKSUM),
689            format!(".objects/{FOO_SHARDED}")
690        );
691        assert_eq!(
692            loc.manifest_key(MANIFEST_ID),
693            format!(".manifests/{MANIFEST_SHARDED}")
694        );
695    }
696
697    #[test]
698    fn gcs_store_object_key_uses_core_object_path() {
699        // Cross-check that we delegate to the frozen core sharding helper rather
700        // than reimplementing it: at the bucket root the key equals the core
701        // `object_path` output verbatim.
702        let loc = GcsLocation::parse("gs://b");
703        assert_eq!(loc.object_key(FOO_CHECKSUM), object_path(FOO_CHECKSUM));
704        assert_eq!(loc.manifest_key(MANIFEST_ID), manifest_path(MANIFEST_ID));
705    }
706
707    #[test]
708    fn gcs_store_strip_leading_dot_slash() {
709        assert_eq!(strip_leading_dot_slash("./foo"), "foo");
710        assert_eq!(strip_leading_dot_slash("./a/b/c"), "a/b/c");
711        assert_eq!(strip_leading_dot_slash("./a/"), "a");
712        assert_eq!(strip_leading_dot_slash("./"), "");
713    }
714
715    #[test]
716    fn gcs_store_is_not_found_classifies_service_level_not_found_as_absent() {
717        // Regression guard for the push-abort bug: the v1.x SDK reports a
718        // missing object as a *service-level* gRPC error with code NOT_FOUND and
719        // NO HTTP status code (`http_status_code() == None`). The original
720        // `== Some(404)`-only check misclassified this as a fatal backend error,
721        // so `key_exists` errored and `push` aborted before uploading anything.
722        use google_cloud_gax::error::rpc::Status;
723
724        let status = Status::default()
725            .set_code(Code::NotFound)
726            .set_message("No such object: bucket/.manifests/...");
727        let err = GcsError::service(status);
728        // This is the load-bearing assertion: the real-world shape carries no
729        // HTTP code, so a 404-only check would (and did) miss it.
730        assert_eq!(err.http_status_code(), None);
731        assert!(
732            is_not_found(&err),
733            "service-level NOT_FOUND must be classified as object-absent"
734        );
735    }
736
737    #[test]
738    fn gcs_store_is_not_found_classifies_http_404_as_absent() {
739        // The other absent shape: a plain HTTP 404 (e.g. from a proxy/LB ahead
740        // of the service). Must also count as not-found.
741        let err = GcsError::http(404, http::HeaderMap::new(), bytes::Bytes::new());
742        assert!(is_not_found(&err), "HTTP 404 must be classified as absent");
743    }
744
745    #[test]
746    fn gcs_store_is_not_found_does_not_swallow_other_errors() {
747        // Guard the inverse: a non-not-found service error (e.g. PERMISSION
748        // DENIED) and a non-404 HTTP error must NOT be treated as absent, so
749        // real failures still surface instead of being silently skipped.
750        use google_cloud_gax::error::rpc::Status;
751
752        let denied = GcsError::service(Status::default().set_code(Code::PermissionDenied));
753        assert!(!is_not_found(&denied), "PERMISSION_DENIED is not absence");
754
755        let server_err = GcsError::http(503, http::HeaderMap::new(), bytes::Bytes::new());
756        assert!(!is_not_found(&server_err), "HTTP 503 is not absence");
757    }
758
759    #[test]
760    fn gcs_store_install_ring_provider_is_idempotent() {
761        // Installing the ring provider twice must not panic; the second call is a
762        // harmless no-op (a provider is already the process default).
763        install_ring_provider();
764        install_ring_provider();
765    }
766
767    // --- Live round-trip, skipped by default --------------------------------
768    //
769    // Requires real Google Cloud credentials (ADC) plus a writable bucket.
770    // Gated behind `SNAPDIR_GCS_TEST_STORE` (a `gs://bucket/prefix` URL) so it is
771    // skipped unless explicitly configured. Real round-trips are exercised by
772    // the later `remote-interop` gate.
773    #[test]
774    fn gcs_store_live_round_trip_when_configured() {
775        use snapdir_core::manifest::ManifestEntry;
776
777        let Ok(store) = std::env::var("SNAPDIR_GCS_TEST_STORE") else {
778            eprintln!(
779                "skipping gcs_store live round-trip: set SNAPDIR_GCS_TEST_STORE \
780                 (gs://bucket/prefix) plus ADC credentials to run it"
781            );
782            return;
783        };
784
785        let hasher = Blake3Hasher::new();
786
787        // Build a tiny source tree + matching manifest.
788        let src = std::env::temp_dir().join(format!("snapdir-gcs-live-{}", std::process::id()));
789        std::fs::create_dir_all(&src).unwrap();
790        std::fs::write(src.join("foo"), b"foo\n").unwrap();
791        let foo_sum = hasher.hash_hex(b"foo\n");
792        let root_sum = snapdir_core::merkle::directory_checksum([foo_sum.as_str()], &hasher);
793        let mut manifest = Manifest::new();
794        manifest.push(ManifestEntry::new(
795            PathType::Directory,
796            "700",
797            root_sum,
798            4,
799            "./",
800        ));
801        manifest.push(ManifestEntry::new(
802            PathType::File,
803            "600",
804            foo_sum,
805            4,
806            "./foo",
807        ));
808        let manifest = Manifest::from_entries(manifest.entries().to_vec());
809        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
810
811        let gcs = GcsStore::connect(&store).expect("connect");
812        gcs.push(&manifest, &src).expect("push");
813        let read_back = gcs.get_manifest(&id).expect("get_manifest");
814        assert_eq!(read_back, manifest);
815
816        let dest = std::env::temp_dir().join(format!("snapdir-gcs-dest-{}", std::process::id()));
817        std::fs::create_dir_all(&dest).unwrap();
818        gcs.fetch_files(&read_back, &dest).expect("fetch_files");
819        assert_eq!(std::fs::read(dest.join("foo")).unwrap(), b"foo\n");
820
821        let _ = std::fs::remove_dir_all(&src);
822        let _ = std::fs::remove_dir_all(&dest);
823    }
824}