snapdir_stores/gcs_store.rs
1//! `GcsStore`: the `gs://` storage backend, backed by the native
2//! `google-cloud-storage` SDK.
3//!
4//! A [`GcsStore`] targets a `gs://bucket/prefix` location and holds the frozen
5//! content-addressable `.objects`/`.manifests` sharded layout, so a
6//! bucket/prefix is interchangeable across conforming implementations:
7//!
8//! ```text
9//! gs://<bucket>/<prefix>/.objects/<sharded checksum> raw object bytes
10//! gs://<bucket>/<prefix>/.manifests/<sharded snapshot id> manifest text
11//! ```
12//!
13//! Sharding and the relative keys come straight from [`snapdir_core::store`]
14//! ([`object_path`] / [`manifest_path`]); this module never reimplements them.
15//!
16//! # `gs://` parsing
17//!
18//! The oracle derives the bucket as `cut -d'/' -f3` of the URL
19//! (`_snapdir_export_store_vars`) and the prefix with
20//! `sed -E 's|^gs:/*[^/]*/?||'` then a trailing-slash strip
21//! (`_snapdir_gcs_store_get_remote_prefix`). [`GcsLocation::parse`] reproduces
22//! that exactly: bucket = first segment after the scheme, prefix = the
23//! remainder with leading/trailing slashes removed.
24//!
25//! # Credentials
26//!
27//! Authentication is delegated entirely to the SDK's own credential chain
28//! (Application Default Credentials): `GOOGLE_APPLICATION_CREDENTIALS`,
29//! `GOOGLE_APPLICATION_CREDENTIALS_JSON`, `gcloud` user creds, and the GCE/GKE
30//! metadata server. No bespoke snapdir credential variables are introduced.
31//!
32//! # TLS provider (project-load-bearing)
33//!
34//! The shipped binary must statically link on musl, so the workspace
35//! standardizes on the **`ring`** rustls provider; `aws-lc-rs` is banned. The
36//! `google-cloud-storage` default features pull `aws-lc-rs` in via
37//! `google-cloud-auth` (both its id-token backend and its rustls provider), so
38//! we depend on the crate with `default-features = false` and instead install
39//! the **ring** [`CryptoProvider`](rustls_ring::crypto::CryptoProvider) as the
40//! rustls *process default* (the SDK's `reqwest` is built with
41//! `rustls-no-provider`, i.e. it consumes that process default). See the crate
42//! `Cargo.toml` for the full rationale.
43//!
44//! # Sync trait, async SDK
45//!
46//! The SDK is async. [`GcsStore`] owns a private multi-thread `tokio` runtime
47//! and bridges each [`Store`] method with `runtime.block_on(...)`, exactly like
48//! [`S3Store`](crate::S3Store), so no `async` leaks into `snapdir-core`.
49
50use std::path::Path;
51use std::sync::Arc;
52
53use google_cloud_gax::error::rpc::Code;
54use google_cloud_gax::error::Error as GcsError;
55use google_cloud_storage::client::{Storage, StorageControl};
56use snapdir_core::manifest::Manifest;
57use snapdir_core::merkle::{Blake3Hasher, Hasher};
58use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
59use snapdir_core::Meter;
60
61use crate::fetch::fetch_files_concurrent;
62use crate::push::{push_objects_concurrent, upload_object};
63use crate::stream::StreamStore;
64use crate::transfer::{RateLimiter, TransferConfig};
65use tokio::runtime::Runtime;
66
67/// Number of times a fetch is retried when the downloaded bytes fail their
68/// checksum, mirroring the oracle's `_SNAPDIR_GCS_STORE_RETRIES` default of 5.
69const MAX_FETCH_RETRIES: u32 = 5;
70
71/// The parsed location a [`GcsStore`] targets: a GCS bucket plus a key prefix.
72#[derive(Debug, Clone, PartialEq, Eq)]
73pub struct GcsLocation {
74 /// The bucket name (first path segment of the `gs://` URL).
75 pub bucket: String,
76 /// The object-name prefix (remaining segments), with no leading or trailing
77 /// slash. Empty when the store points at the bucket root.
78 pub prefix: String,
79}
80
81impl GcsLocation {
82 /// Parses a `gs://bucket/prefix` URL into its bucket and prefix.
83 ///
84 /// Matches the oracle exactly: the bucket is `cut -d'/' -f3` of the store
85 /// URL (the segment immediately after `gs://`), and the prefix is
86 /// `sed -E 's|^gs:/*[^/]*/?||'` with a trailing slash stripped
87 /// (`_snapdir_gcs_store_get_remote_prefix`).
88 ///
89 /// The `gs://` scheme is optional; a bare `bucket/prefix` is accepted too.
90 #[must_use]
91 pub fn parse(store_url: &str) -> Self {
92 // Drop the scheme (`gs://`, or any `<proto>://`) if present. The oracle
93 // splits the full URL on `/` and takes field 3 as the bucket, which for
94 // `gs://bucket/...` is exactly the segment after the `//`.
95 let without_scheme = match store_url.find("://") {
96 Some(idx) => &store_url[idx + 3..],
97 None => store_url,
98 };
99 let mut parts = without_scheme.splitn(2, '/');
100 let bucket = parts.next().unwrap_or("").to_owned();
101 let prefix = parts.next().unwrap_or("");
102 // Strip a leading slash (empty-first-segment edge case) and the trailing
103 // slash; the prefix is joined back with a single `/`.
104 let prefix = prefix
105 .trim_end_matches('/')
106 .trim_start_matches('/')
107 .to_owned();
108 Self { bucket, prefix }
109 }
110
111 /// The GCS resource name for this bucket, `projects/_/buckets/<bucket>`, as
112 /// required by the `google-cloud-storage` v1 API.
113 #[must_use]
114 pub fn bucket_resource(&self) -> String {
115 format!("projects/_/buckets/{}", self.bucket)
116 }
117
118 /// Returns the full GCS object name for a content object given its checksum,
119 /// i.e. `<prefix>/.objects/<sharded>` (no leading slash).
120 #[must_use]
121 pub fn object_key(&self, checksum: &str) -> String {
122 self.key_for(&object_path(checksum))
123 }
124
125 /// Returns the full GCS object name for a manifest given its snapshot id,
126 /// i.e. `<prefix>/.manifests/<sharded>` (no leading slash).
127 #[must_use]
128 pub fn manifest_key(&self, id: &str) -> String {
129 self.key_for(&manifest_path(id))
130 }
131
132 /// Joins the store prefix with a store-relative path (`.objects/...` or
133 /// `.manifests/...`), producing a leading-slash-free object name. Mirrors
134 /// the oracle's `${_SNAPDIR_STORE_BASE_DIR%/}/${source_path#/}` with the
135 /// leading slash trimmed.
136 fn key_for(&self, rel: &str) -> String {
137 let rel = rel.trim_start_matches('/');
138 if self.prefix.is_empty() {
139 rel.to_owned()
140 } else {
141 format!("{}/{rel}", self.prefix)
142 }
143 }
144}
145
146/// A content-addressable store backed by a Google Cloud Storage bucket.
147///
148/// Construct one with [`GcsStore::connect`] (resolves Application Default
149/// Credentials via the SDK).
150pub struct GcsStore {
151 /// Data-plane client (object read/write).
152 storage: Storage,
153 /// Control-plane client (object metadata, used for HEAD-like existence
154 /// checks without downloading the body).
155 control: StorageControl,
156 location: GcsLocation,
157 runtime: Arc<Runtime>,
158 config: TransferConfig,
159 /// Optional progress meter; recorded into during transfers. `None` (the
160 /// default from every constructor) means zero recording and byte-identical
161 /// behavior. Set by the CLI via [`GcsStore::with_meter`].
162 meter: Option<Arc<Meter>>,
163}
164
165impl GcsStore {
166 /// Connects to the `gs://bucket/prefix` store, resolving credentials via the
167 /// SDK's Application Default Credentials chain.
168 ///
169 /// The `ring` rustls crypto provider is installed as the process default
170 /// (idempotent) before the clients are built, keeping `aws-lc-rs` out of the
171 /// graph (see the module docs).
172 ///
173 /// # Errors
174 ///
175 /// [`StoreError::Backend`] if the tokio runtime cannot be created or the SDK
176 /// clients cannot be built (e.g. credentials cannot be resolved).
177 pub fn connect(store_url: &str) -> Result<Self, StoreError> {
178 Self::connect_with(store_url, TransferConfig::default())
179 }
180
181 /// Like [`connect`](Self::connect), but carries a [`TransferConfig`] for
182 /// concurrency / bandwidth control. The existing [`connect`](Self::connect)
183 /// delegates here with [`TransferConfig::default`].
184 ///
185 /// # Errors
186 ///
187 /// [`StoreError::Backend`] if the tokio runtime cannot be created or the SDK
188 /// clients cannot be built (e.g. credentials cannot be resolved).
189 pub fn connect_with(store_url: &str, config: TransferConfig) -> Result<Self, StoreError> {
190 let location = GcsLocation::parse(store_url);
191 let runtime = build_runtime()?;
192 install_ring_provider();
193
194 let (storage, control) = runtime.block_on(async {
195 let storage = Storage::builder()
196 .build()
197 .await
198 .map_err(|e| backend("building GCS Storage client", e))?;
199 let control = StorageControl::builder()
200 .build()
201 .await
202 .map_err(|e| backend("building GCS StorageControl client", e))?;
203 Ok::<_, StoreError>((storage, control))
204 })?;
205
206 Ok(Self {
207 storage,
208 control,
209 location,
210 runtime: Arc::new(runtime),
211 config,
212 meter: None,
213 })
214 }
215
216 /// Builds a store from already-configured SDK clients and a parsed location,
217 /// owning a fresh tokio runtime for the sync bridge. Intended for tests.
218 ///
219 /// # Errors
220 ///
221 /// [`StoreError::Backend`] if the tokio runtime cannot be created.
222 pub fn from_clients(
223 storage: Storage,
224 control: StorageControl,
225 location: GcsLocation,
226 ) -> Result<Self, StoreError> {
227 Ok(Self {
228 storage,
229 control,
230 location,
231 runtime: Arc::new(build_runtime()?),
232 config: TransferConfig::default(),
233 meter: None,
234 })
235 }
236
237 /// Attaches (or clears) an optional progress [`Meter`], rides alongside
238 /// [`config`](Self::transfer_config). The transfer paths record bytes-in /
239 /// bytes-out + per-object progress into it; `None` (the constructor default)
240 /// means zero recording and byte-identical behavior. The CLI sets this after
241 /// construction.
242 #[must_use]
243 pub fn with_meter(mut self, meter: Option<Arc<Meter>>) -> Self {
244 self.meter = meter;
245 self
246 }
247
248 /// The parsed bucket/prefix this store targets.
249 #[must_use]
250 pub fn location(&self) -> &GcsLocation {
251 &self.location
252 }
253
254 /// The [`TransferConfig`] (concurrency / bandwidth) this store was built
255 /// with. Consumed by the transfer loops in later gates.
256 #[must_use]
257 pub fn transfer_config(&self) -> &TransferConfig {
258 &self.config
259 }
260
261 /// Metadata HEAD on an object key; `Ok(true)` if it exists, `Ok(false)` if
262 /// absent (see [`is_not_found`] for what counts as absent).
263 async fn key_exists(&self, key: &str) -> Result<bool, StoreError> {
264 match self
265 .control
266 .get_object()
267 .set_bucket(self.location.bucket_resource())
268 .set_object(key)
269 .send()
270 .await
271 {
272 Ok(_) => Ok(true),
273 Err(err) => {
274 if is_not_found(&err) {
275 Ok(false)
276 } else {
277 Err(backend("GCS get_object metadata failed", err))
278 }
279 }
280 }
281 }
282
283 /// GET an object key's full body, draining the read stream, or `None` if it
284 /// is absent (see [`is_not_found`] for what counts as absent).
285 async fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
286 let mut resp = match self
287 .storage
288 .read_object(self.location.bucket_resource(), key)
289 .send()
290 .await
291 {
292 Ok(resp) => resp,
293 Err(err) => {
294 if is_not_found(&err) {
295 return Ok(None);
296 }
297 return Err(backend("GCS read_object failed", err));
298 }
299 };
300
301 let mut buf = Vec::new();
302 while let Some(chunk) = resp.next().await {
303 let chunk = chunk.map_err(|e| backend("reading GCS object body", e))?;
304 buf.extend_from_slice(&chunk);
305 }
306 Ok(Some(buf))
307 }
308
309 /// PUT `bytes` at `key`. GCS object writes are atomic (the new object is only
310 /// visible once fully uploaded), so no temp-key dance is needed — matching
311 /// the oracle's reliance on `gcloud storage cp` atomicity.
312 async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
313 // The SDK's upload future is large (>30 KiB); box it so it does not
314 // bloat the enclosing `push` future (clippy::large_futures).
315 let upload = self
316 .storage
317 .write_object(
318 self.location.bucket_resource(),
319 key,
320 bytes::Bytes::from(bytes),
321 )
322 .send_buffered();
323 Box::pin(upload)
324 .await
325 .map_err(|e| backend("GCS write_object failed", e))?;
326 Ok(())
327 }
328
329 /// Downloads `key`, verifying its BLAKE3 against `expected`, retrying up to
330 /// [`MAX_FETCH_RETRIES`] times. Mirrors `_snapdir_gcs_fetch_to_cache`.
331 async fn fetch_verified(&self, key: &str, expected: &str) -> Result<Vec<u8>, StoreError> {
332 let hasher = Blake3Hasher::new();
333 let mut attempts_left = MAX_FETCH_RETRIES;
334 loop {
335 match self.get_bytes(key).await? {
336 Some(bytes) => {
337 let actual = hasher.hash_hex(&bytes);
338 if actual == expected {
339 return Ok(bytes);
340 }
341 // Mismatched checksum after fetching: retry (the oracle
342 // decrements its retry budget on the same condition).
343 attempts_left = attempts_left.saturating_sub(1);
344 if attempts_left == 0 {
345 return Err(StoreError::Integrity {
346 address: format!("gs://{}/{key}", self.location.bucket),
347 expected: expected.to_owned(),
348 actual,
349 });
350 }
351 }
352 None => {
353 // Treat a missing key as not-found rather than spinning.
354 return Err(StoreError::ObjectNotFound {
355 checksum: expected.to_owned(),
356 });
357 }
358 }
359 }
360 }
361}
362
363impl Store for GcsStore {
364 fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
365 let key = self.location.manifest_key(id);
366 let bytes = self.runtime.block_on(async {
367 match self.get_bytes(&key).await? {
368 Some(b) => Ok(b),
369 None => Err(StoreError::ManifestNotFound { id: id.to_owned() }),
370 }
371 })?;
372
373 let text = String::from_utf8(bytes).map_err(|err| StoreError::Backend {
374 message: format!("manifest {id} is not valid UTF-8"),
375 source: Some(Box::new(err)),
376 })?;
377 let manifest = Manifest::parse(&text)?;
378
379 // Verify the stored manifest hashes back to its snapshot id before
380 // trusting it (oracle: the id check on fetch).
381 let actual = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
382 if actual != id {
383 return Err(StoreError::Integrity {
384 address: self.location.manifest_key(id),
385 expected: id.to_owned(),
386 actual,
387 });
388 }
389 Ok(manifest)
390 }
391
392 fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
393 // Concurrent download via the shared orchestrator: it owns the
394 // skip-if-present-and-verified short-circuit, directory creation, the
395 // bounded-concurrency pass, the per-object rate limit, and the atomic
396 // write. GCS only injects the per-object download, preserving the
397 // BLAKE3-verify + retry discipline of `fetch_verified`.
398 let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
399 let meter = self.meter.as_deref();
400 let meter_arc = self.meter.clone();
401 self.runtime.block_on(async {
402 fetch_files_concurrent(
403 manifest,
404 dest,
405 &self.config,
406 &limiter,
407 meter,
408 meter_arc,
409 |entry| async {
410 let key = self.location.object_key(&entry.checksum);
411 self.fetch_verified(&key, &entry.checksum).await
412 },
413 )
414 .await
415 })
416 }
417
418 fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
419 let hasher = Blake3Hasher::new();
420 let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);
421
422 // Concurrent upload via the shared orchestrator: it owns the bounded
423 // per-object pass and the manifest-last / all-or-nothing ordering. GCS
424 // injects the per-object skip-present + upload (via `upload_object`,
425 // which also owns the shared read+verify) and the manifest-write
426 // closure. A failed push writes NO manifest.
427 let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
428 let meter = self.meter.as_deref();
429 let meter_arc = self.meter.clone();
430 self.runtime.block_on(async {
431 // Skip-if-manifest-present pre-check: a present manifest implies all
432 // its objects are present (we always write the manifest last).
433 let manifest_key = self.location.manifest_key(&id);
434 if self.key_exists(&manifest_key).await? {
435 return Ok(());
436 }
437
438 push_objects_concurrent(
439 manifest,
440 &self.config,
441 &limiter,
442 meter,
443 meter_arc,
444 |entry| {
445 let object_key = self.location.object_key(&entry.checksum);
446 upload_object(
447 entry,
448 object_key,
449 source,
450 &limiter,
451 meter,
452 |key| async move { self.key_exists(&key).await },
453 |key, bytes| async move { self.put_bytes(&key, bytes).await },
454 )
455 },
456 || async {
457 // Write the manifest last (verified to hash back to its id),
458 // exactly as the oracle stores the manifest text.
459 let mut text = manifest.to_string();
460 text.push('\n');
461 let manifest_actual = hasher.hash_hex(text.as_bytes());
462 if manifest_actual != id {
463 return Err(StoreError::Integrity {
464 address: manifest_key.clone(),
465 expected: id.clone(),
466 actual: manifest_actual,
467 });
468 }
469 self.put_bytes(&manifest_key, text.into_bytes()).await
470 },
471 )
472 .await
473 })
474 }
475}
476
477impl StreamStore for GcsStore {
478 fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
479 let key = self.location.object_key(checksum);
480 self.runtime.block_on(async { self.key_exists(&key).await })
481 }
482
483 fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
484 let key = self.location.object_key(checksum);
485 let bytes = self.runtime.block_on(async {
486 self.get_bytes(&key)
487 .await?
488 .ok_or_else(|| StoreError::ObjectNotFound {
489 checksum: checksum.to_owned(),
490 })
491 })?;
492
493 // Verify the downloaded blob hashes back to its content-address before
494 // returning it (corruption surfaces as `Integrity`, never bad bytes).
495 let actual = Blake3Hasher::new().hash_hex(&bytes);
496 if actual != checksum {
497 return Err(StoreError::Integrity {
498 address: format!("gs://{}/{key}", self.location.bucket),
499 expected: checksum.to_owned(),
500 actual,
501 });
502 }
503 Ok(bytes)
504 }
505
506 fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
507 // Verify BEFORE uploading: a blob whose bytes do not hash to `checksum`
508 // must never land at that content-address (nothing is stored).
509 let actual = Blake3Hasher::new().hash_hex(&bytes);
510 if actual != checksum {
511 return Err(StoreError::Integrity {
512 address: self.location.object_key(checksum),
513 expected: checksum.to_owned(),
514 actual,
515 });
516 }
517 let key = self.location.object_key(checksum);
518 self.runtime
519 .block_on(async { self.put_bytes(&key, bytes).await })
520 }
521
522 fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
523 let key = self.location.manifest_key(id);
524 // Mirror the manifest-write tail of `push`: render the oracle's
525 // `echo "${manifest}"` bytes, verify they hash back to `id`, then PUT.
526 let mut text = manifest.to_string();
527 text.push('\n');
528 let actual = Blake3Hasher::new().hash_hex(text.as_bytes());
529 if actual != id {
530 return Err(StoreError::Integrity {
531 address: key,
532 expected: id.to_owned(),
533 actual,
534 });
535 }
536 self.runtime
537 .block_on(async { self.put_bytes(&key, text.into_bytes()).await })
538 }
539}
540
541/// Builds the multi-thread tokio runtime that backs the sync bridge.
542fn build_runtime() -> Result<Runtime, StoreError> {
543 tokio::runtime::Builder::new_multi_thread()
544 .enable_all()
545 .build()
546 .map_err(|e| backend("creating tokio runtime for GcsStore", e))
547}
548
549/// Installs the **`ring`** rustls [`CryptoProvider`] as the process default if
550/// none is set yet. Idempotent: a second call (or a default installed by another
551/// store) is a harmless no-op. This is the load-bearing piece that keeps
552/// `aws-lc-rs` out of the dependency graph — the SDK's `reqwest`
553/// (`rustls-no-provider`) consumes whatever process-default provider is set.
554fn install_ring_provider() {
555 // Ignore the error: it only means a provider was already installed, which is
556 // exactly the state we want.
557 let _ = rustls_ring::crypto::ring::default_provider().install_default();
558}
559
560/// Wraps any backend error into [`StoreError::Backend`] with a message.
561fn backend<E>(message: &str, source: E) -> StoreError
562where
563 E: std::error::Error + Send + Sync + 'static,
564{
565 StoreError::Backend {
566 message: message.to_owned(),
567 source: Some(Box::new(source)),
568 }
569}
570
571/// Classifies a `google-cloud-storage` SDK error as "object is absent".
572///
573/// The SDK reports a missing object two different ways, and the absent-object
574/// paths (`key_exists` -> `Ok(false)`, `get_bytes` -> `Ok(None)`) must treat
575/// BOTH as not-found:
576///
577/// 1. A plain **HTTP 404** (`http_status_code() == Some(404)`), e.g. from a
578/// proxy/load balancer ahead of the service.
579/// 2. A **service-level gRPC-style error** whose `status().code` is
580/// [`Code::NotFound`] but whose `http_status_code()` is `None`. This is what
581/// the v1.x SDK actually returns for `get_object`/`read_object` on a missing
582/// object, and the form the original `== Some(404)`-only check misclassified
583/// as a fatal backend error (it aborted `push` before the first upload).
584///
585/// This mirrors the SDK's own internal classification
586/// (`e.status().is_some_and(|s| s.code == Code::NotFound)`), the GCS analogue of
587/// the aws-sdk's `is_not_found()` used by [`S3Store`](crate::S3Store).
588fn is_not_found(err: &GcsError) -> bool {
589 err.http_status_code() == Some(404)
590 || err
591 .status()
592 .is_some_and(|status| status.code == Code::NotFound)
593}
594
595#[cfg(test)]
596mod tests {
597 use super::*;
598 use snapdir_core::manifest::PathType;
599
600 /// Strips a leading `./` and a trailing `/` from a manifest path. Kept as a
601 /// test-only assertion of the path normalization the orchestrator
602 /// (`crate::push`) performs.
603 fn strip_leading_dot_slash(path: &str) -> &str {
604 let trimmed = path.strip_prefix("./").unwrap_or(path);
605 trimmed.strip_suffix('/').unwrap_or(trimmed)
606 }
607
608 // The canonical content-addressable fixtures (shared across the s3/gcs
609 // store test suites).
610 const FOO_CHECKSUM: &str = "49dc870df1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
611 const FOO_SHARDED: &str = "49d/c87/0df/1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
612 const MANIFEST_ID: &str = "aa91e498f401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";
613 const MANIFEST_SHARDED: &str =
614 "aa9/1e4/98f/401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";
615
616 #[test]
617 fn gcs_store_parses_bucket_and_prefix() {
618 let loc = GcsLocation::parse("gs://my-bucket/long/term/storage");
619 assert_eq!(loc.bucket, "my-bucket");
620 assert_eq!(loc.prefix, "long/term/storage");
621 }
622
623 #[test]
624 fn gcs_store_parse_matches_oracle_cut_and_sed() {
625 // Oracle: bucket = `cut -d'/' -f3`; prefix = `sed -E 's|^gs:/*[^/]*/?||'`
626 // then trailing-slash strip. For "gs://bucket/a/b/c": bucket=bucket,
627 // prefix=a/b/c.
628 let loc = GcsLocation::parse("gs://bucket/a/b/c");
629 assert_eq!(loc.bucket, "bucket");
630 assert_eq!(loc.prefix, "a/b/c");
631 }
632
633 #[test]
634 fn gcs_store_parse_strips_trailing_slash() {
635 // `_snapdir_gcs_store_get_remote_prefix` strips the trailing slash.
636 let loc = GcsLocation::parse("gs://bucket/prefix/");
637 assert_eq!(loc.bucket, "bucket");
638 assert_eq!(loc.prefix, "prefix");
639 }
640
641 #[test]
642 fn gcs_store_parse_bucket_root_has_empty_prefix() {
643 let loc = GcsLocation::parse("gs://bucket");
644 assert_eq!(loc.bucket, "bucket");
645 assert_eq!(loc.prefix, "");
646
647 let loc_slash = GcsLocation::parse("gs://bucket/");
648 assert_eq!(loc_slash.bucket, "bucket");
649 assert_eq!(loc_slash.prefix, "");
650 }
651
652 #[test]
653 fn gcs_store_parse_accepts_bare_bucket_prefix_without_scheme() {
654 let loc = GcsLocation::parse("bucket/some/prefix");
655 assert_eq!(loc.bucket, "bucket");
656 assert_eq!(loc.prefix, "some/prefix");
657 }
658
659 #[test]
660 fn gcs_store_bucket_resource_uses_projects_underscore_form() {
661 let loc = GcsLocation::parse("gs://my-bucket/x");
662 assert_eq!(loc.bucket_resource(), "projects/_/buckets/my-bucket");
663 }
664
665 #[test]
666 fn gcs_store_object_key_matches_sharded_scheme() {
667 let loc = GcsLocation::parse("gs://b/long/term/storage");
668 assert_eq!(
669 loc.object_key(FOO_CHECKSUM),
670 format!("long/term/storage/.objects/{FOO_SHARDED}")
671 );
672 }
673
674 #[test]
675 fn gcs_store_manifest_key_matches_sharded_scheme() {
676 let loc = GcsLocation::parse("gs://b/long/term/storage");
677 assert_eq!(
678 loc.manifest_key(MANIFEST_ID),
679 format!("long/term/storage/.manifests/{MANIFEST_SHARDED}")
680 );
681 }
682
683 #[test]
684 fn gcs_store_keys_have_no_leading_slash_at_bucket_root() {
685 // With an empty prefix the keys are just `.objects/...` / `.manifests/...`.
686 let loc = GcsLocation::parse("gs://bucket");
687 assert_eq!(
688 loc.object_key(FOO_CHECKSUM),
689 format!(".objects/{FOO_SHARDED}")
690 );
691 assert_eq!(
692 loc.manifest_key(MANIFEST_ID),
693 format!(".manifests/{MANIFEST_SHARDED}")
694 );
695 }
696
697 #[test]
698 fn gcs_store_object_key_uses_core_object_path() {
699 // Cross-check that we delegate to the frozen core sharding helper rather
700 // than reimplementing it: at the bucket root the key equals the core
701 // `object_path` output verbatim.
702 let loc = GcsLocation::parse("gs://b");
703 assert_eq!(loc.object_key(FOO_CHECKSUM), object_path(FOO_CHECKSUM));
704 assert_eq!(loc.manifest_key(MANIFEST_ID), manifest_path(MANIFEST_ID));
705 }
706
707 #[test]
708 fn gcs_store_strip_leading_dot_slash() {
709 assert_eq!(strip_leading_dot_slash("./foo"), "foo");
710 assert_eq!(strip_leading_dot_slash("./a/b/c"), "a/b/c");
711 assert_eq!(strip_leading_dot_slash("./a/"), "a");
712 assert_eq!(strip_leading_dot_slash("./"), "");
713 }
714
715 #[test]
716 fn gcs_store_is_not_found_classifies_service_level_not_found_as_absent() {
717 // Regression guard for the push-abort bug: the v1.x SDK reports a
718 // missing object as a *service-level* gRPC error with code NOT_FOUND and
719 // NO HTTP status code (`http_status_code() == None`). The original
720 // `== Some(404)`-only check misclassified this as a fatal backend error,
721 // so `key_exists` errored and `push` aborted before uploading anything.
722 use google_cloud_gax::error::rpc::Status;
723
724 let status = Status::default()
725 .set_code(Code::NotFound)
726 .set_message("No such object: bucket/.manifests/...");
727 let err = GcsError::service(status);
728 // This is the load-bearing assertion: the real-world shape carries no
729 // HTTP code, so a 404-only check would (and did) miss it.
730 assert_eq!(err.http_status_code(), None);
731 assert!(
732 is_not_found(&err),
733 "service-level NOT_FOUND must be classified as object-absent"
734 );
735 }
736
737 #[test]
738 fn gcs_store_is_not_found_classifies_http_404_as_absent() {
739 // The other absent shape: a plain HTTP 404 (e.g. from a proxy/LB ahead
740 // of the service). Must also count as not-found.
741 let err = GcsError::http(404, http::HeaderMap::new(), bytes::Bytes::new());
742 assert!(is_not_found(&err), "HTTP 404 must be classified as absent");
743 }
744
745 #[test]
746 fn gcs_store_is_not_found_does_not_swallow_other_errors() {
747 // Guard the inverse: a non-not-found service error (e.g. PERMISSION
748 // DENIED) and a non-404 HTTP error must NOT be treated as absent, so
749 // real failures still surface instead of being silently skipped.
750 use google_cloud_gax::error::rpc::Status;
751
752 let denied = GcsError::service(Status::default().set_code(Code::PermissionDenied));
753 assert!(!is_not_found(&denied), "PERMISSION_DENIED is not absence");
754
755 let server_err = GcsError::http(503, http::HeaderMap::new(), bytes::Bytes::new());
756 assert!(!is_not_found(&server_err), "HTTP 503 is not absence");
757 }
758
759 #[test]
760 fn gcs_store_install_ring_provider_is_idempotent() {
761 // Installing the ring provider twice must not panic; the second call is a
762 // harmless no-op (a provider is already the process default).
763 install_ring_provider();
764 install_ring_provider();
765 }
766
767 // --- Live round-trip, skipped by default --------------------------------
768 //
769 // Requires real Google Cloud credentials (ADC) plus a writable bucket.
770 // Gated behind `SNAPDIR_GCS_TEST_STORE` (a `gs://bucket/prefix` URL) so it is
771 // skipped unless explicitly configured. Real round-trips are exercised by
772 // the later `remote-interop` gate.
773 #[test]
774 fn gcs_store_live_round_trip_when_configured() {
775 use snapdir_core::manifest::ManifestEntry;
776
777 let Ok(store) = std::env::var("SNAPDIR_GCS_TEST_STORE") else {
778 eprintln!(
779 "skipping gcs_store live round-trip: set SNAPDIR_GCS_TEST_STORE \
780 (gs://bucket/prefix) plus ADC credentials to run it"
781 );
782 return;
783 };
784
785 let hasher = Blake3Hasher::new();
786
787 // Build a tiny source tree + matching manifest.
788 let src = std::env::temp_dir().join(format!("snapdir-gcs-live-{}", std::process::id()));
789 std::fs::create_dir_all(&src).unwrap();
790 std::fs::write(src.join("foo"), b"foo\n").unwrap();
791 let foo_sum = hasher.hash_hex(b"foo\n");
792 let root_sum = snapdir_core::merkle::directory_checksum([foo_sum.as_str()], &hasher);
793 let mut manifest = Manifest::new();
794 manifest.push(ManifestEntry::new(
795 PathType::Directory,
796 "700",
797 root_sum,
798 4,
799 "./",
800 ));
801 manifest.push(ManifestEntry::new(
802 PathType::File,
803 "600",
804 foo_sum,
805 4,
806 "./foo",
807 ));
808 let manifest = Manifest::from_entries(manifest.entries().to_vec());
809 let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
810
811 let gcs = GcsStore::connect(&store).expect("connect");
812 gcs.push(&manifest, &src).expect("push");
813 let read_back = gcs.get_manifest(&id).expect("get_manifest");
814 assert_eq!(read_back, manifest);
815
816 let dest = std::env::temp_dir().join(format!("snapdir-gcs-dest-{}", std::process::id()));
817 std::fs::create_dir_all(&dest).unwrap();
818 gcs.fetch_files(&read_back, &dest).expect("fetch_files");
819 assert_eq!(std::fs::read(dest.join("foo")).unwrap(), b"foo\n");
820
821 let _ = std::fs::remove_dir_all(&src);
822 let _ = std::fs::remove_dir_all(&dest);
823 }
824}