snapdir_stores/gcs_store.rs
1//! `GcsStore`: the `gs://` storage backend, backed by the native
2//! `google-cloud-storage` SDK.
3//!
//! A [`GcsStore`] targets a `gs://bucket/prefix` location and stores data in
//! the frozen content-addressable `.objects`/`.manifests` sharded layout, so a
//! bucket/prefix is interchangeable across conforming implementations:
7//!
8//! ```text
9//! gs://<bucket>/<prefix>/.objects/<sharded checksum> raw object bytes
10//! gs://<bucket>/<prefix>/.manifests/<sharded snapshot id> manifest text
11//! ```
12//!
13//! Sharding and the relative keys come straight from [`snapdir_core::store`]
14//! ([`object_path`] / [`manifest_path`]); this module never reimplements them.
15//!
16//! # `gs://` parsing
17//!
//! The oracle (the reference shell implementation) derives the bucket as
//! `cut -d'/' -f3` of the URL (`_snapdir_export_store_vars`) and the prefix
//! with `sed -E 's|^gs:/*[^/]*/?||'` followed by a trailing-slash strip
//! (`_snapdir_gcs_store_get_remote_prefix`). [`GcsLocation::parse`] reproduces
//! that for well-formed URLs: bucket = first segment after the scheme, prefix =
//! the remainder with leading/trailing slashes removed.
24//!
25//! # Credentials
26//!
27//! Authentication is delegated entirely to the SDK's own credential chain
28//! (Application Default Credentials): `GOOGLE_APPLICATION_CREDENTIALS`,
29//! `GOOGLE_APPLICATION_CREDENTIALS_JSON`, `gcloud` user creds, and the GCE/GKE
30//! metadata server. No bespoke snapdir credential variables are introduced.
31//!
32//! # TLS provider (project-load-bearing)
33//!
34//! The shipped binary must statically link on musl, so the workspace
35//! standardizes on the **`ring`** rustls provider; `aws-lc-rs` is banned. The
36//! `google-cloud-storage` default features pull `aws-lc-rs` in via
37//! `google-cloud-auth` (both its id-token backend and its rustls provider), so
38//! we depend on the crate with `default-features = false` and instead install
39//! the **ring** [`CryptoProvider`](rustls_ring::crypto::CryptoProvider) as the
40//! rustls *process default* (the SDK's `reqwest` is built with
41//! `rustls-no-provider`, i.e. it consumes that process default). See the crate
42//! `Cargo.toml` for the full rationale.
43//!
44//! # Sync trait, async SDK
45//!
46//! The SDK is async. [`GcsStore`] owns a private multi-thread `tokio` runtime
47//! and bridges each [`Store`] method with `runtime.block_on(...)`, exactly like
48//! [`S3Store`](crate::S3Store), so no `async` leaks into `snapdir-core`.
49
50use std::path::Path;
51use std::sync::Arc;
52
53use google_cloud_gax::error::rpc::Code;
54use google_cloud_gax::error::Error as GcsError;
55use google_cloud_storage::client::{Storage, StorageControl};
56use snapdir_core::manifest::Manifest;
57use snapdir_core::merkle::{Blake3Hasher, Hasher};
58use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
59use snapdir_core::Meter;
60
61use crate::fetch::fetch_files_concurrent;
62use crate::push::{push_objects_concurrent, upload_object};
63use crate::stream::StreamStore;
64use crate::transfer::{RateLimiter, TransferConfig};
65use tokio::runtime::Runtime;
66
67/// Number of times a fetch is retried when the downloaded bytes fail their
68/// checksum, mirroring the oracle's `_SNAPDIR_GCS_STORE_RETRIES` default of 5.
69const MAX_FETCH_RETRIES: u32 = 5;
70
71/// The parsed location a [`GcsStore`] targets: a GCS bucket plus a key prefix.
72#[derive(Debug, Clone, PartialEq, Eq)]
73pub struct GcsLocation {
74 /// The bucket name (first path segment of the `gs://` URL).
75 pub bucket: String,
76 /// The object-name prefix (remaining segments), with no leading or trailing
77 /// slash. Empty when the store points at the bucket root.
78 pub prefix: String,
79}
80
81impl GcsLocation {
82 /// Parses a `gs://bucket/prefix` URL into its bucket and prefix.
83 ///
84 /// Matches the oracle exactly: the bucket is `cut -d'/' -f3` of the store
85 /// URL (the segment immediately after `gs://`), and the prefix is
86 /// `sed -E 's|^gs:/*[^/]*/?||'` with a trailing slash stripped
87 /// (`_snapdir_gcs_store_get_remote_prefix`).
88 ///
89 /// The `gs://` scheme is optional; a bare `bucket/prefix` is accepted too.
90 #[must_use]
91 pub fn parse(store_url: &str) -> Self {
92 // Drop the scheme (`gs://`, or any `<proto>://`) if present. The oracle
93 // splits the full URL on `/` and takes field 3 as the bucket, which for
94 // `gs://bucket/...` is exactly the segment after the `//`.
95 let without_scheme = match store_url.find("://") {
96 Some(idx) => &store_url[idx + 3..],
97 None => store_url,
98 };
99 let mut parts = without_scheme.splitn(2, '/');
100 let bucket = parts.next().unwrap_or("").to_owned();
101 let prefix = parts.next().unwrap_or("");
102 // Strip a leading slash (empty-first-segment edge case) and the trailing
103 // slash; the prefix is joined back with a single `/`.
104 let prefix = prefix
105 .trim_end_matches('/')
106 .trim_start_matches('/')
107 .to_owned();
108 Self { bucket, prefix }
109 }
110
111 /// The GCS resource name for this bucket, `projects/_/buckets/<bucket>`, as
112 /// required by the `google-cloud-storage` v1 API.
113 #[must_use]
114 pub fn bucket_resource(&self) -> String {
115 format!("projects/_/buckets/{}", self.bucket)
116 }
117
118 /// Returns the full GCS object name for a content object given its checksum,
119 /// i.e. `<prefix>/.objects/<sharded>` (no leading slash).
120 #[must_use]
121 pub fn object_key(&self, checksum: &str) -> String {
122 self.key_for(&object_path(checksum))
123 }
124
125 /// Returns the full GCS object name for a manifest given its snapshot id,
126 /// i.e. `<prefix>/.manifests/<sharded>` (no leading slash).
127 #[must_use]
128 pub fn manifest_key(&self, id: &str) -> String {
129 self.key_for(&manifest_path(id))
130 }
131
132 /// Joins the store prefix with a store-relative path (`.objects/...` or
133 /// `.manifests/...`), producing a leading-slash-free object name. Mirrors
134 /// the oracle's `${_SNAPDIR_STORE_BASE_DIR%/}/${source_path#/}` with the
135 /// leading slash trimmed.
136 fn key_for(&self, rel: &str) -> String {
137 let rel = rel.trim_start_matches('/');
138 if self.prefix.is_empty() {
139 rel.to_owned()
140 } else {
141 format!("{}/{rel}", self.prefix)
142 }
143 }
144}
145
146/// A content-addressable store backed by a Google Cloud Storage bucket.
147///
148/// Construct one with [`GcsStore::connect`] (resolves Application Default
149/// Credentials via the SDK).
150pub struct GcsStore {
151 /// Data-plane client (object read/write).
152 storage: Storage,
153 /// Control-plane client (object metadata, used for HEAD-like existence
154 /// checks without downloading the body).
155 control: StorageControl,
156 location: GcsLocation,
157 runtime: Arc<Runtime>,
158 config: TransferConfig,
159 /// Optional progress meter; recorded into during transfers. `None` (the
160 /// default from every constructor) means zero recording and byte-identical
161 /// behavior. Set by the CLI via [`GcsStore::with_meter`].
162 meter: Option<Arc<Meter>>,
163}
164
165impl GcsStore {
166 /// Connects to the `gs://bucket/prefix` store, resolving credentials via the
167 /// SDK's Application Default Credentials chain.
168 ///
169 /// The `ring` rustls crypto provider is installed as the process default
170 /// (idempotent) before the clients are built, keeping `aws-lc-rs` out of the
171 /// graph (see the module docs).
172 ///
173 /// # Errors
174 ///
175 /// [`StoreError::Backend`] if the tokio runtime cannot be created or the SDK
176 /// clients cannot be built (e.g. credentials cannot be resolved).
177 pub fn connect(store_url: &str) -> Result<Self, StoreError> {
178 Self::connect_with(store_url, TransferConfig::default())
179 }
180
181 /// Like [`connect`](Self::connect), but carries a [`TransferConfig`] for
182 /// concurrency / bandwidth control. The existing [`connect`](Self::connect)
183 /// delegates here with [`TransferConfig::default`].
184 ///
185 /// # Errors
186 ///
187 /// [`StoreError::Backend`] if the tokio runtime cannot be created or the SDK
188 /// clients cannot be built (e.g. credentials cannot be resolved).
189 pub fn connect_with(store_url: &str, config: TransferConfig) -> Result<Self, StoreError> {
190 let location = GcsLocation::parse(store_url);
191 let runtime = build_runtime()?;
192 install_ring_provider();
193
194 let (storage, control) = runtime.block_on(async {
195 let storage = Storage::builder()
196 .build()
197 .await
198 .map_err(|e| backend("building GCS Storage client", e))?;
199 let control = StorageControl::builder()
200 .build()
201 .await
202 .map_err(|e| backend("building GCS StorageControl client", e))?;
203 Ok::<_, StoreError>((storage, control))
204 })?;
205
206 Ok(Self {
207 storage,
208 control,
209 location,
210 runtime: Arc::new(runtime),
211 config,
212 meter: None,
213 })
214 }
215
216 /// Builds a store from already-configured SDK clients and a parsed location,
217 /// owning a fresh tokio runtime for the sync bridge. Intended for tests.
218 ///
219 /// # Errors
220 ///
221 /// [`StoreError::Backend`] if the tokio runtime cannot be created.
222 pub fn from_clients(
223 storage: Storage,
224 control: StorageControl,
225 location: GcsLocation,
226 ) -> Result<Self, StoreError> {
227 Ok(Self {
228 storage,
229 control,
230 location,
231 runtime: Arc::new(build_runtime()?),
232 config: TransferConfig::default(),
233 meter: None,
234 })
235 }
236
237 /// Attaches (or clears) an optional progress [`Meter`], rides alongside
238 /// [`config`](Self::transfer_config). The transfer paths record bytes-in /
239 /// bytes-out + per-object progress into it; `None` (the constructor default)
240 /// means zero recording and byte-identical behavior. The CLI sets this after
241 /// construction.
242 #[must_use]
243 pub fn with_meter(mut self, meter: Option<Arc<Meter>>) -> Self {
244 self.meter = meter;
245 self
246 }
247
248 /// The parsed bucket/prefix this store targets.
249 #[must_use]
250 pub fn location(&self) -> &GcsLocation {
251 &self.location
252 }
253
254 /// The [`TransferConfig`] (concurrency / bandwidth) this store was built
255 /// with. Consumed by the transfer loops in later gates.
256 #[must_use]
257 pub fn transfer_config(&self) -> &TransferConfig {
258 &self.config
259 }
260
261 /// Metadata HEAD on an object key; `Ok(true)` if it exists, `Ok(false)` if
262 /// absent (see [`is_not_found`] for what counts as absent).
263 async fn key_exists(&self, key: &str) -> Result<bool, StoreError> {
264 match self
265 .control
266 .get_object()
267 .set_bucket(self.location.bucket_resource())
268 .set_object(key)
269 .send()
270 .await
271 {
272 Ok(_) => Ok(true),
273 Err(err) => {
274 if is_not_found(&err) {
275 Ok(false)
276 } else {
277 Err(backend("GCS get_object metadata failed", err))
278 }
279 }
280 }
281 }
282
283 /// GET an object key's full body, draining the read stream, or `None` if it
284 /// is absent (see [`is_not_found`] for what counts as absent).
285 async fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
286 let mut resp = match self
287 .storage
288 .read_object(self.location.bucket_resource(), key)
289 .send()
290 .await
291 {
292 Ok(resp) => resp,
293 Err(err) => {
294 if is_not_found(&err) {
295 return Ok(None);
296 }
297 return Err(backend("GCS read_object failed", err));
298 }
299 };
300
301 let mut buf = Vec::new();
302 while let Some(chunk) = resp.next().await {
303 let chunk = chunk.map_err(|e| backend("reading GCS object body", e))?;
304 buf.extend_from_slice(&chunk);
305 }
306 Ok(Some(buf))
307 }
308
309 /// PUT `bytes` at `key`. GCS object writes are atomic (the new object is only
310 /// visible once fully uploaded), so no temp-key dance is needed — matching
311 /// the oracle's reliance on `gcloud storage cp` atomicity.
312 async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
313 // The SDK's upload future is large (>30 KiB); box it so it does not
314 // bloat the enclosing `push` future (clippy::large_futures).
315 let upload = self
316 .storage
317 .write_object(
318 self.location.bucket_resource(),
319 key,
320 bytes::Bytes::from(bytes),
321 )
322 .send_buffered();
323 Box::pin(upload)
324 .await
325 .map_err(|e| backend("GCS write_object failed", e))?;
326 Ok(())
327 }
328
329 /// Downloads `key`, verifying its BLAKE3 against `expected`, retrying up to
330 /// [`MAX_FETCH_RETRIES`] times. Mirrors `_snapdir_gcs_fetch_to_cache`.
331 async fn fetch_verified(&self, key: &str, expected: &str) -> Result<Vec<u8>, StoreError> {
332 let hasher = Blake3Hasher::new();
333 let mut attempts_left = MAX_FETCH_RETRIES;
334 loop {
335 match self.get_bytes(key).await? {
336 Some(bytes) => {
337 let actual = hasher.hash_hex(&bytes);
338 if actual == expected {
339 return Ok(bytes);
340 }
341 // Mismatched checksum after fetching: retry (the oracle
342 // decrements its retry budget on the same condition).
343 attempts_left = attempts_left.saturating_sub(1);
344 if attempts_left == 0 {
345 return Err(StoreError::Integrity {
346 address: format!("gs://{}/{key}", self.location.bucket),
347 expected: expected.to_owned(),
348 actual,
349 });
350 }
351 }
352 None => {
353 // Treat a missing key as not-found rather than spinning.
354 return Err(StoreError::ObjectNotFound {
355 checksum: expected.to_owned(),
356 });
357 }
358 }
359 }
360 }
361}
362
363impl Store for GcsStore {
364 fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
365 let key = self.location.manifest_key(id);
366 let bytes = self.runtime.block_on(async {
367 match self.get_bytes(&key).await? {
368 Some(b) => Ok(b),
369 None => Err(StoreError::ManifestNotFound { id: id.to_owned() }),
370 }
371 })?;
372
373 let text = String::from_utf8(bytes).map_err(|err| StoreError::Backend {
374 message: format!("manifest {id} is not valid UTF-8"),
375 source: Some(Box::new(err)),
376 })?;
377 let manifest = Manifest::parse(&text)?;
378
379 // Verify the stored manifest hashes back to its snapshot id before
380 // trusting it (oracle: the id check on fetch).
381 let actual = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
382 if actual != id {
383 return Err(StoreError::Integrity {
384 address: self.location.manifest_key(id),
385 expected: id.to_owned(),
386 actual,
387 });
388 }
389 Ok(manifest)
390 }
391
392 fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
393 // Concurrent download via the shared orchestrator: it owns the
394 // skip-if-present-and-verified short-circuit, directory creation, the
395 // bounded-concurrency pass, the per-object rate limit, and the atomic
396 // write. GCS only injects the per-object download, preserving the
397 // BLAKE3-verify + retry discipline of `fetch_verified`.
398 let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
399 let meter = self.meter.as_deref();
400 self.runtime.block_on(async {
401 fetch_files_concurrent(
402 manifest,
403 dest,
404 &self.config,
405 &limiter,
406 meter,
407 |entry| async {
408 let key = self.location.object_key(&entry.checksum);
409 self.fetch_verified(&key, &entry.checksum).await
410 },
411 )
412 .await
413 })
414 }
415
416 fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
417 let hasher = Blake3Hasher::new();
418 let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);
419
420 // Concurrent upload via the shared orchestrator: it owns the bounded
421 // per-object pass and the manifest-last / all-or-nothing ordering. GCS
422 // injects the per-object skip-present + upload (via `upload_object`,
423 // which also owns the shared read+verify) and the manifest-write
424 // closure. A failed push writes NO manifest.
425 let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
426 let meter = self.meter.as_deref();
427 self.runtime.block_on(async {
428 // Skip-if-manifest-present pre-check: a present manifest implies all
429 // its objects are present (we always write the manifest last).
430 let manifest_key = self.location.manifest_key(&id);
431 if self.key_exists(&manifest_key).await? {
432 return Ok(());
433 }
434
435 push_objects_concurrent(
436 manifest,
437 &self.config,
438 meter,
439 |entry| {
440 let object_key = self.location.object_key(&entry.checksum);
441 upload_object(
442 entry,
443 object_key,
444 source,
445 &limiter,
446 meter,
447 |key| async move { self.key_exists(&key).await },
448 |key, bytes| async move { self.put_bytes(&key, bytes).await },
449 )
450 },
451 || async {
452 // Write the manifest last (verified to hash back to its id),
453 // exactly as the oracle stores the manifest text.
454 let mut text = manifest.to_string();
455 text.push('\n');
456 let manifest_actual = hasher.hash_hex(text.as_bytes());
457 if manifest_actual != id {
458 return Err(StoreError::Integrity {
459 address: manifest_key.clone(),
460 expected: id.clone(),
461 actual: manifest_actual,
462 });
463 }
464 self.put_bytes(&manifest_key, text.into_bytes()).await
465 },
466 )
467 .await
468 })
469 }
470}
471
472impl StreamStore for GcsStore {
473 fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
474 let key = self.location.object_key(checksum);
475 self.runtime.block_on(async { self.key_exists(&key).await })
476 }
477
478 fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
479 let key = self.location.object_key(checksum);
480 let bytes = self.runtime.block_on(async {
481 self.get_bytes(&key)
482 .await?
483 .ok_or_else(|| StoreError::ObjectNotFound {
484 checksum: checksum.to_owned(),
485 })
486 })?;
487
488 // Verify the downloaded blob hashes back to its content-address before
489 // returning it (corruption surfaces as `Integrity`, never bad bytes).
490 let actual = Blake3Hasher::new().hash_hex(&bytes);
491 if actual != checksum {
492 return Err(StoreError::Integrity {
493 address: format!("gs://{}/{key}", self.location.bucket),
494 expected: checksum.to_owned(),
495 actual,
496 });
497 }
498 Ok(bytes)
499 }
500
501 fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
502 // Verify BEFORE uploading: a blob whose bytes do not hash to `checksum`
503 // must never land at that content-address (nothing is stored).
504 let actual = Blake3Hasher::new().hash_hex(&bytes);
505 if actual != checksum {
506 return Err(StoreError::Integrity {
507 address: self.location.object_key(checksum),
508 expected: checksum.to_owned(),
509 actual,
510 });
511 }
512 let key = self.location.object_key(checksum);
513 self.runtime
514 .block_on(async { self.put_bytes(&key, bytes).await })
515 }
516
517 fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
518 let key = self.location.manifest_key(id);
519 // Mirror the manifest-write tail of `push`: render the oracle's
520 // `echo "${manifest}"` bytes, verify they hash back to `id`, then PUT.
521 let mut text = manifest.to_string();
522 text.push('\n');
523 let actual = Blake3Hasher::new().hash_hex(text.as_bytes());
524 if actual != id {
525 return Err(StoreError::Integrity {
526 address: key,
527 expected: id.to_owned(),
528 actual,
529 });
530 }
531 self.runtime
532 .block_on(async { self.put_bytes(&key, text.into_bytes()).await })
533 }
534}
535
536/// Builds the multi-thread tokio runtime that backs the sync bridge.
537fn build_runtime() -> Result<Runtime, StoreError> {
538 tokio::runtime::Builder::new_multi_thread()
539 .enable_all()
540 .build()
541 .map_err(|e| backend("creating tokio runtime for GcsStore", e))
542}
543
544/// Installs the **`ring`** rustls [`CryptoProvider`] as the process default if
545/// none is set yet. Idempotent: a second call (or a default installed by another
546/// store) is a harmless no-op. This is the load-bearing piece that keeps
547/// `aws-lc-rs` out of the dependency graph — the SDK's `reqwest`
548/// (`rustls-no-provider`) consumes whatever process-default provider is set.
549fn install_ring_provider() {
550 // Ignore the error: it only means a provider was already installed, which is
551 // exactly the state we want.
552 let _ = rustls_ring::crypto::ring::default_provider().install_default();
553}
554
555/// Wraps any backend error into [`StoreError::Backend`] with a message.
556fn backend<E>(message: &str, source: E) -> StoreError
557where
558 E: std::error::Error + Send + Sync + 'static,
559{
560 StoreError::Backend {
561 message: message.to_owned(),
562 source: Some(Box::new(source)),
563 }
564}
565
566/// Classifies a `google-cloud-storage` SDK error as "object is absent".
567///
568/// The SDK reports a missing object two different ways, and the absent-object
569/// paths (`key_exists` -> `Ok(false)`, `get_bytes` -> `Ok(None)`) must treat
570/// BOTH as not-found:
571///
572/// 1. A plain **HTTP 404** (`http_status_code() == Some(404)`), e.g. from a
573/// proxy/load balancer ahead of the service.
574/// 2. A **service-level gRPC-style error** whose `status().code` is
575/// [`Code::NotFound`] but whose `http_status_code()` is `None`. This is what
576/// the v1.x SDK actually returns for `get_object`/`read_object` on a missing
577/// object, and the form the original `== Some(404)`-only check misclassified
578/// as a fatal backend error (it aborted `push` before the first upload).
579///
580/// This mirrors the SDK's own internal classification
581/// (`e.status().is_some_and(|s| s.code == Code::NotFound)`), the GCS analogue of
582/// the aws-sdk's `is_not_found()` used by [`S3Store`](crate::S3Store).
583fn is_not_found(err: &GcsError) -> bool {
584 err.http_status_code() == Some(404)
585 || err
586 .status()
587 .is_some_and(|status| status.code == Code::NotFound)
588}
589
590#[cfg(test)]
591mod tests {
592 use super::*;
593 use snapdir_core::manifest::PathType;
594
595 /// Strips a leading `./` and a trailing `/` from a manifest path. Kept as a
596 /// test-only assertion of the path normalization the orchestrator
597 /// (`crate::push`) performs.
598 fn strip_leading_dot_slash(path: &str) -> &str {
599 let trimmed = path.strip_prefix("./").unwrap_or(path);
600 trimmed.strip_suffix('/').unwrap_or(trimmed)
601 }
602
603 // The canonical content-addressable fixtures (shared across the s3/gcs
604 // store test suites).
605 const FOO_CHECKSUM: &str = "49dc870df1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
606 const FOO_SHARDED: &str = "49d/c87/0df/1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
607 const MANIFEST_ID: &str = "aa91e498f401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";
608 const MANIFEST_SHARDED: &str =
609 "aa9/1e4/98f/401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";
610
611 #[test]
612 fn gcs_store_parses_bucket_and_prefix() {
613 let loc = GcsLocation::parse("gs://my-bucket/long/term/storage");
614 assert_eq!(loc.bucket, "my-bucket");
615 assert_eq!(loc.prefix, "long/term/storage");
616 }
617
618 #[test]
619 fn gcs_store_parse_matches_oracle_cut_and_sed() {
620 // Oracle: bucket = `cut -d'/' -f3`; prefix = `sed -E 's|^gs:/*[^/]*/?||'`
621 // then trailing-slash strip. For "gs://bucket/a/b/c": bucket=bucket,
622 // prefix=a/b/c.
623 let loc = GcsLocation::parse("gs://bucket/a/b/c");
624 assert_eq!(loc.bucket, "bucket");
625 assert_eq!(loc.prefix, "a/b/c");
626 }
627
628 #[test]
629 fn gcs_store_parse_strips_trailing_slash() {
630 // `_snapdir_gcs_store_get_remote_prefix` strips the trailing slash.
631 let loc = GcsLocation::parse("gs://bucket/prefix/");
632 assert_eq!(loc.bucket, "bucket");
633 assert_eq!(loc.prefix, "prefix");
634 }
635
636 #[test]
637 fn gcs_store_parse_bucket_root_has_empty_prefix() {
638 let loc = GcsLocation::parse("gs://bucket");
639 assert_eq!(loc.bucket, "bucket");
640 assert_eq!(loc.prefix, "");
641
642 let loc_slash = GcsLocation::parse("gs://bucket/");
643 assert_eq!(loc_slash.bucket, "bucket");
644 assert_eq!(loc_slash.prefix, "");
645 }
646
647 #[test]
648 fn gcs_store_parse_accepts_bare_bucket_prefix_without_scheme() {
649 let loc = GcsLocation::parse("bucket/some/prefix");
650 assert_eq!(loc.bucket, "bucket");
651 assert_eq!(loc.prefix, "some/prefix");
652 }
653
654 #[test]
655 fn gcs_store_bucket_resource_uses_projects_underscore_form() {
656 let loc = GcsLocation::parse("gs://my-bucket/x");
657 assert_eq!(loc.bucket_resource(), "projects/_/buckets/my-bucket");
658 }
659
660 #[test]
661 fn gcs_store_object_key_matches_sharded_scheme() {
662 let loc = GcsLocation::parse("gs://b/long/term/storage");
663 assert_eq!(
664 loc.object_key(FOO_CHECKSUM),
665 format!("long/term/storage/.objects/{FOO_SHARDED}")
666 );
667 }
668
669 #[test]
670 fn gcs_store_manifest_key_matches_sharded_scheme() {
671 let loc = GcsLocation::parse("gs://b/long/term/storage");
672 assert_eq!(
673 loc.manifest_key(MANIFEST_ID),
674 format!("long/term/storage/.manifests/{MANIFEST_SHARDED}")
675 );
676 }
677
678 #[test]
679 fn gcs_store_keys_have_no_leading_slash_at_bucket_root() {
680 // With an empty prefix the keys are just `.objects/...` / `.manifests/...`.
681 let loc = GcsLocation::parse("gs://bucket");
682 assert_eq!(
683 loc.object_key(FOO_CHECKSUM),
684 format!(".objects/{FOO_SHARDED}")
685 );
686 assert_eq!(
687 loc.manifest_key(MANIFEST_ID),
688 format!(".manifests/{MANIFEST_SHARDED}")
689 );
690 }
691
692 #[test]
693 fn gcs_store_object_key_uses_core_object_path() {
694 // Cross-check that we delegate to the frozen core sharding helper rather
695 // than reimplementing it: at the bucket root the key equals the core
696 // `object_path` output verbatim.
697 let loc = GcsLocation::parse("gs://b");
698 assert_eq!(loc.object_key(FOO_CHECKSUM), object_path(FOO_CHECKSUM));
699 assert_eq!(loc.manifest_key(MANIFEST_ID), manifest_path(MANIFEST_ID));
700 }
701
702 #[test]
703 fn gcs_store_strip_leading_dot_slash() {
704 assert_eq!(strip_leading_dot_slash("./foo"), "foo");
705 assert_eq!(strip_leading_dot_slash("./a/b/c"), "a/b/c");
706 assert_eq!(strip_leading_dot_slash("./a/"), "a");
707 assert_eq!(strip_leading_dot_slash("./"), "");
708 }
709
710 #[test]
711 fn gcs_store_is_not_found_classifies_service_level_not_found_as_absent() {
712 // Regression guard for the push-abort bug: the v1.x SDK reports a
713 // missing object as a *service-level* gRPC error with code NOT_FOUND and
714 // NO HTTP status code (`http_status_code() == None`). The original
715 // `== Some(404)`-only check misclassified this as a fatal backend error,
716 // so `key_exists` errored and `push` aborted before uploading anything.
717 use google_cloud_gax::error::rpc::Status;
718
719 let status = Status::default()
720 .set_code(Code::NotFound)
721 .set_message("No such object: bucket/.manifests/...");
722 let err = GcsError::service(status);
723 // This is the load-bearing assertion: the real-world shape carries no
724 // HTTP code, so a 404-only check would (and did) miss it.
725 assert_eq!(err.http_status_code(), None);
726 assert!(
727 is_not_found(&err),
728 "service-level NOT_FOUND must be classified as object-absent"
729 );
730 }
731
732 #[test]
733 fn gcs_store_is_not_found_classifies_http_404_as_absent() {
734 // The other absent shape: a plain HTTP 404 (e.g. from a proxy/LB ahead
735 // of the service). Must also count as not-found.
736 let err = GcsError::http(404, http::HeaderMap::new(), bytes::Bytes::new());
737 assert!(is_not_found(&err), "HTTP 404 must be classified as absent");
738 }
739
740 #[test]
741 fn gcs_store_is_not_found_does_not_swallow_other_errors() {
742 // Guard the inverse: a non-not-found service error (e.g. PERMISSION
743 // DENIED) and a non-404 HTTP error must NOT be treated as absent, so
744 // real failures still surface instead of being silently skipped.
745 use google_cloud_gax::error::rpc::Status;
746
747 let denied = GcsError::service(Status::default().set_code(Code::PermissionDenied));
748 assert!(!is_not_found(&denied), "PERMISSION_DENIED is not absence");
749
750 let server_err = GcsError::http(503, http::HeaderMap::new(), bytes::Bytes::new());
751 assert!(!is_not_found(&server_err), "HTTP 503 is not absence");
752 }
753
754 #[test]
755 fn gcs_store_install_ring_provider_is_idempotent() {
756 // Installing the ring provider twice must not panic; the second call is a
757 // harmless no-op (a provider is already the process default).
758 install_ring_provider();
759 install_ring_provider();
760 }
761
762 // --- Live round-trip, skipped by default --------------------------------
763 //
764 // Requires real Google Cloud credentials (ADC) plus a writable bucket.
765 // Gated behind `SNAPDIR_GCS_TEST_STORE` (a `gs://bucket/prefix` URL) so it is
766 // skipped unless explicitly configured. Real round-trips are exercised by
767 // the later `remote-interop` gate.
768 #[test]
769 fn gcs_store_live_round_trip_when_configured() {
770 use snapdir_core::manifest::ManifestEntry;
771
772 let Ok(store) = std::env::var("SNAPDIR_GCS_TEST_STORE") else {
773 eprintln!(
774 "skipping gcs_store live round-trip: set SNAPDIR_GCS_TEST_STORE \
775 (gs://bucket/prefix) plus ADC credentials to run it"
776 );
777 return;
778 };
779
780 let hasher = Blake3Hasher::new();
781
782 // Build a tiny source tree + matching manifest.
783 let src = std::env::temp_dir().join(format!("snapdir-gcs-live-{}", std::process::id()));
784 std::fs::create_dir_all(&src).unwrap();
785 std::fs::write(src.join("foo"), b"foo\n").unwrap();
786 let foo_sum = hasher.hash_hex(b"foo\n");
787 let root_sum = snapdir_core::merkle::directory_checksum([foo_sum.as_str()], &hasher);
788 let mut manifest = Manifest::new();
789 manifest.push(ManifestEntry::new(
790 PathType::Directory,
791 "700",
792 root_sum,
793 4,
794 "./",
795 ));
796 manifest.push(ManifestEntry::new(
797 PathType::File,
798 "600",
799 foo_sum,
800 4,
801 "./foo",
802 ));
803 let manifest = Manifest::from_entries(manifest.entries().to_vec());
804 let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
805
806 let gcs = GcsStore::connect(&store).expect("connect");
807 gcs.push(&manifest, &src).expect("push");
808 let read_back = gcs.get_manifest(&id).expect("get_manifest");
809 assert_eq!(read_back, manifest);
810
811 let dest = std::env::temp_dir().join(format!("snapdir-gcs-dest-{}", std::process::id()));
812 std::fs::create_dir_all(&dest).unwrap();
813 gcs.fetch_files(&read_back, &dest).expect("fetch_files");
814 assert_eq!(std::fs::read(dest.join("foo")).unwrap(), b"foo\n");
815
816 let _ = std::fs::remove_dir_all(&src);
817 let _ = std::fs::remove_dir_all(&dest);
818 }
819}