use std::path::Path;
use std::sync::Arc;
use google_cloud_gax::error::rpc::Code;
use google_cloud_gax::error::Error as GcsError;
use google_cloud_storage::client::{Storage, StorageControl};
use snapdir_core::manifest::{Manifest, PathType};
use snapdir_core::merkle::{Blake3Hasher, Hasher};
use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
use tokio::runtime::Runtime;
/// Total number of downloads `GcsStore::fetch_verified` performs for an object whose
/// bytes do not hash to the expected checksum before giving up with an integrity error.
///
/// Despite the name this counts the first attempt too: `5` means one initial download
/// plus at most four re-downloads. Transport errors and missing objects are not retried.
const MAX_FETCH_RETRIES: u32 = 5;
/// Bucket and key prefix addressed by a store URL such as `gs://bucket/some/prefix`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GcsLocation {
    /// Bucket name: the first path segment after the (optional) scheme.
    pub bucket: String,
    /// Key prefix as produced by [`GcsLocation::parse`]: no leading or trailing `/`,
    /// empty when the store lives at the bucket root.
    pub prefix: String,
}

impl GcsLocation {
    /// Splits `store_url` into a bucket and a key prefix.
    ///
    /// Everything up to and including the first `://` is discarded, so a bare
    /// `bucket/prefix` is accepted as well. The segment before the first remaining `/`
    /// is the bucket; the rest, with every leading and trailing `/` removed, is the
    /// prefix. No validation is performed: an empty input yields an empty bucket.
    #[must_use]
    pub fn parse(store_url: &str) -> Self {
        let rest = store_url
            .split_once("://")
            .map_or(store_url, |(_scheme, after)| after);
        let (bucket, prefix) = rest.split_once('/').unwrap_or((rest, ""));
        Self {
            bucket: bucket.to_owned(),
            prefix: prefix.trim_matches('/').to_owned(),
        }
    }

    /// Resource name of the bucket in the `projects/_/buckets/{bucket}` form used
    /// by the GCS client APIs.
    #[must_use]
    pub fn bucket_resource(&self) -> String {
        let bucket = &self.bucket;
        format!("projects/_/buckets/{bucket}")
    }

    /// Object key holding the content-addressed blob for `checksum`.
    #[must_use]
    pub fn object_key(&self, checksum: &str) -> String {
        let relative = object_path(checksum);
        self.key_for(&relative)
    }

    /// Object key holding the manifest for snapshot `id`.
    #[must_use]
    pub fn manifest_key(&self, id: &str) -> String {
        let relative = manifest_path(id);
        self.key_for(&relative)
    }

    /// Places a store-relative path under the prefix; at the bucket root the path is
    /// returned as-is. Leading `/` are dropped so keys never start with a slash.
    fn key_for(&self, rel: &str) -> String {
        let relative = rel.trim_start_matches('/');
        match self.prefix.as_str() {
            "" => relative.to_owned(),
            prefix => format!("{prefix}/{relative}"),
        }
    }
}
/// A snapdir `Store` backed by a Google Cloud Storage bucket.
///
/// Object bodies are read and written through `storage`; existence checks use
/// metadata lookups through `control`. The async client calls are driven from the
/// synchronous `Store` methods by `block_on` on the owned `runtime`.
pub struct GcsStore {
    /// Client used for `read_object` and `write_object`.
    storage: Storage,
    /// Client used for `get_object` metadata lookups (existence checks).
    control: StorageControl,
    /// Bucket and key prefix every object/manifest key is resolved against.
    location: GcsLocation,
    /// Runtime on which the async client futures are executed via `block_on`.
    runtime: Arc<Runtime>,
}
impl GcsStore {
pub fn connect(store_url: &str) -> Result<Self, StoreError> {
let location = GcsLocation::parse(store_url);
let runtime = build_runtime()?;
install_ring_provider();
let (storage, control) = runtime.block_on(async {
let storage = Storage::builder()
.build()
.await
.map_err(|e| backend("building GCS Storage client", e))?;
let control = StorageControl::builder()
.build()
.await
.map_err(|e| backend("building GCS StorageControl client", e))?;
Ok::<_, StoreError>((storage, control))
})?;
Ok(Self {
storage,
control,
location,
runtime: Arc::new(runtime),
})
}
pub fn from_clients(
storage: Storage,
control: StorageControl,
location: GcsLocation,
) -> Result<Self, StoreError> {
Ok(Self {
storage,
control,
location,
runtime: Arc::new(build_runtime()?),
})
}
#[must_use]
pub fn location(&self) -> &GcsLocation {
&self.location
}
async fn key_exists(&self, key: &str) -> Result<bool, StoreError> {
match self
.control
.get_object()
.set_bucket(self.location.bucket_resource())
.set_object(key)
.send()
.await
{
Ok(_) => Ok(true),
Err(err) => {
if is_not_found(&err) {
Ok(false)
} else {
Err(backend("GCS get_object metadata failed", err))
}
}
}
}
async fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
let mut resp = match self
.storage
.read_object(self.location.bucket_resource(), key)
.send()
.await
{
Ok(resp) => resp,
Err(err) => {
if is_not_found(&err) {
return Ok(None);
}
return Err(backend("GCS read_object failed", err));
}
};
let mut buf = Vec::new();
while let Some(chunk) = resp.next().await {
let chunk = chunk.map_err(|e| backend("reading GCS object body", e))?;
buf.extend_from_slice(&chunk);
}
Ok(Some(buf))
}
async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
let upload = self
.storage
.write_object(
self.location.bucket_resource(),
key,
bytes::Bytes::from(bytes),
)
.send_buffered();
Box::pin(upload)
.await
.map_err(|e| backend("GCS write_object failed", e))?;
Ok(())
}
async fn fetch_verified(&self, key: &str, expected: &str) -> Result<Vec<u8>, StoreError> {
let hasher = Blake3Hasher::new();
let mut attempts_left = MAX_FETCH_RETRIES;
loop {
match self.get_bytes(key).await? {
Some(bytes) => {
let actual = hasher.hash_hex(&bytes);
if actual == expected {
return Ok(bytes);
}
attempts_left = attempts_left.saturating_sub(1);
if attempts_left == 0 {
return Err(StoreError::Integrity {
address: format!("gs://{}/{key}", self.location.bucket),
expected: expected.to_owned(),
actual,
});
}
}
None => {
return Err(StoreError::ObjectNotFound {
checksum: expected.to_owned(),
});
}
}
}
}
}
impl Store for GcsStore {
    /// Downloads and verifies the manifest stored for snapshot `id`.
    ///
    /// The object body must be valid UTF-8 and parse as a [`Manifest`] whose recomputed
    /// snapshot id (using BLAKE3) equals `id`.
    ///
    /// # Errors
    ///
    /// - `StoreError::ManifestNotFound` if nothing is stored under the manifest key.
    /// - `StoreError::Backend` on transport failure or a non-UTF-8 body.
    /// - `StoreError::Integrity` if the parsed manifest hashes to a different id.
    /// - Any error propagated from `Manifest::parse`.
    fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
        let key = self.location.manifest_key(id);
        let bytes = self.runtime.block_on(async {
            match self.get_bytes(&key).await? {
                Some(b) => Ok(b),
                None => Err(StoreError::ManifestNotFound { id: id.to_owned() }),
            }
        })?;
        let text = String::from_utf8(bytes).map_err(|err| StoreError::Backend {
            message: format!("manifest {id} is not valid UTF-8"),
            source: Some(Box::new(err)),
        })?;
        let manifest = Manifest::parse(&text)?;
        let actual = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
        if actual != id {
            return Err(StoreError::Integrity {
                address: key,
                expected: id.to_owned(),
                actual,
            });
        }
        Ok(manifest)
    }

    /// Materializes every entry of `manifest` under `dest`.
    ///
    /// Directory entries are created; file entries are downloaded by checksum, verified
    /// against that checksum, and written atomically. Entries are processed in manifest
    /// order and the first failure aborts the fetch (files already written are left in
    /// place).
    ///
    /// Manifest paths come from the remote store and are treated as untrusted: a path
    /// that is absolute, carries a drive/UNC prefix, or contains `..` is rejected with
    /// `StoreError::Backend` instead of being allowed to write outside `dest`.
    fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
        /// Joins a manifest path onto `dest`, refusing any path that could resolve
        /// outside of it. `Path::join` with an absolute path would replace `dest`
        /// entirely, and `..` components could climb above it.
        fn contained_target(
            dest: &Path,
            manifest_path: &str,
        ) -> Result<std::path::PathBuf, StoreError> {
            let rel = Path::new(strip_leading_dot_slash(manifest_path));
            let escapes = rel.components().any(|component| {
                !matches!(
                    component,
                    std::path::Component::Normal(_) | std::path::Component::CurDir
                )
            });
            if escapes {
                return Err(StoreError::Backend {
                    message: format!(
                        "manifest path {manifest_path:?} escapes the destination directory"
                    ),
                    source: None,
                });
            }
            Ok(dest.join(rel))
        }

        self.runtime.block_on(async {
            for entry in manifest.entries() {
                let target = contained_target(dest, &entry.path)?;
                match entry.path_type {
                    PathType::Directory => {
                        std::fs::create_dir_all(&target)?;
                    }
                    PathType::File => {
                        if let Some(parent) = target.parent() {
                            std::fs::create_dir_all(parent)?;
                        }
                        let key = self.location.object_key(&entry.checksum);
                        let bytes = self.fetch_verified(&key, &entry.checksum).await?;
                        write_atomic(&target, &bytes)?;
                    }
                }
            }
            Ok(())
        })
    }

    /// Uploads the snapshot described by `manifest`, reading file contents from `source`.
    ///
    /// If the manifest object already exists the call returns immediately without
    /// checking its objects. Otherwise each file object not yet present is read
    /// locally, verified against its manifest checksum, and uploaded. The manifest text
    /// (plus a trailing newline) is uploaded last. Its hash must equal the snapshot id.
    /// Objects go up before the manifest, so a manifest written by this method is
    /// never visible before its objects.
    ///
    /// # Errors
    ///
    /// - `StoreError::Integrity` if a local file or the serialized manifest does not
    ///   hash to the expected value.
    /// - I/O errors reading local files, and `StoreError::Backend` for GCS failures.
    fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
        let hasher = Blake3Hasher::new();
        let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);
        self.runtime.block_on(async {
            let manifest_key = self.location.manifest_key(&id);
            if self.key_exists(&manifest_key).await? {
                return Ok(());
            }
            for entry in manifest.entries() {
                if entry.path_type != PathType::File {
                    continue;
                }
                let object_key = self.location.object_key(&entry.checksum);
                // Content-addressed: an existing object already has these bytes.
                if self.key_exists(&object_key).await? {
                    continue;
                }
                let rel = strip_leading_dot_slash(&entry.path);
                let object_source = source.join(rel);
                let bytes = std::fs::read(&object_source)?;
                let actual = hasher.hash_hex(&bytes);
                if actual != entry.checksum {
                    return Err(StoreError::Integrity {
                        address: object_source.display().to_string(),
                        expected: entry.checksum.clone(),
                        actual,
                    });
                }
                self.put_bytes(&object_key, bytes).await?;
            }
            let mut text = manifest.to_string();
            text.push('\n');
            let manifest_actual = hasher.hash_hex(text.as_bytes());
            if manifest_actual != id {
                return Err(StoreError::Integrity {
                    address: manifest_key.clone(),
                    expected: id.clone(),
                    actual: manifest_actual,
                });
            }
            self.put_bytes(&manifest_key, text.into_bytes()).await?;
            Ok(())
        })
    }
}
/// Creates the multi-threaded tokio runtime (all drivers enabled) that a `GcsStore`
/// uses to drive its async GCS clients.
///
/// # Errors
///
/// Returns `StoreError::Backend` if the runtime cannot be built.
fn build_runtime() -> Result<Runtime, StoreError> {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.enable_all();
    builder
        .build()
        .map_err(|err| backend("creating tokio runtime for GcsStore", err))
}
/// Installs the `ring`-based rustls crypto provider as the process-wide default.
///
/// Safe to call repeatedly: the result of `install_default` is deliberately
/// discarded, so a failure (presumably because a default provider is already
/// installed) leaves the existing provider in place.
fn install_ring_provider() {
    let provider = rustls_ring::crypto::ring::default_provider();
    let _ = provider.install_default();
}
/// Wraps `source` in `StoreError::Backend`, attaching `message` as human-readable context.
fn backend<E>(message: &str, source: E) -> StoreError
where
    E: std::error::Error + Send + Sync + 'static,
{
    StoreError::Backend {
        source: Some(Box::new(source)),
        message: String::from(message),
    }
}
/// Classifies a GCS client error as "object absent".
///
/// Both transports are covered: an HTTP 404 and an RPC status with code `NotFound`.
/// Every other error (permission denied, 5xx, …) returns `false` and must be surfaced.
fn is_not_found(err: &GcsError) -> bool {
    if err.http_status_code() == Some(404) {
        return true;
    }
    matches!(err.status(), Some(status) if status.code == Code::NotFound)
}
/// Writes `bytes` to `target` through a temporary sibling file followed by a rename,
/// so `target` is only ever replaced by a fully written file.
///
/// Missing parent directories are created first. The temporary file is named
/// `{file_name}.{pid}.{counter}.tmp`, where the counter is process-wide, so concurrent
/// writers do not share a temp path. If writing or renaming fails, the temporary file
/// is removed (best effort) and the original error is returned.
///
/// # Errors
///
/// Propagates (as `StoreError`) any I/O error from creating the parent directory,
/// writing the temporary file, or renaming it over `target`.
fn write_atomic(target: &Path, bytes: &[u8]) -> Result<(), StoreError> {
    use std::sync::atomic::{AtomicU64, Ordering};
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let parent = target.parent();
    if let Some(parent) = parent {
        std::fs::create_dir_all(parent)?;
    }
    // Only uniqueness matters here, not ordering with other memory operations.
    let n = COUNTER.fetch_add(1, Ordering::Relaxed);
    let pid = std::process::id();
    let file_name = target
        .file_name()
        .map(|s| s.to_string_lossy().into_owned())
        .unwrap_or_default();
    let tmp_name = format!("{file_name}.{pid}.{n}.tmp");
    let tmp = match parent {
        Some(parent) => parent.join(tmp_name),
        None => std::path::PathBuf::from(tmp_name),
    };
    let result = std::fs::write(&tmp, bytes).and_then(|()| std::fs::rename(&tmp, target));
    if result.is_err() {
        // Don't leak the temp file. Removal is best effort: the write/rename error is
        // the one worth reporting, and the temp file may not even exist.
        let _ = std::fs::remove_file(&tmp);
    }
    result?;
    Ok(())
}
/// Turns a manifest path into a path relative to the snapshot root.
///
/// Removes one leading `./` and then one trailing `/`, so `./a/` becomes `a` and the
/// root entry `./` becomes the empty string. Other input is returned unchanged.
fn strip_leading_dot_slash(path: &str) -> &str {
    let mut relative = path;
    if let Some(rest) = relative.strip_prefix("./") {
        relative = rest;
    }
    if let Some(rest) = relative.strip_suffix('/') {
        relative = rest;
    }
    relative
}
// Unit tests for URL parsing, key layout, path helpers and GCS error classification,
// plus an opt-in live round trip against a real bucket.
#[cfg(test)]
mod tests {
    use super::*;
    // Fixed checksum/id values whose sharded forms (three 3-character directories, then
    // the remainder) pin the object and manifest key layout.
    const FOO_CHECKSUM: &str = "49dc870df1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
    const FOO_SHARDED: &str = "49d/c87/0df/1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
    const MANIFEST_ID: &str = "aa91e498f401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";
    const MANIFEST_SHARDED: &str =
        "aa9/1e4/98f/401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";

    // A multi-segment prefix stays intact after the bucket.
    #[test]
    fn gcs_store_parses_bucket_and_prefix() {
        let loc = GcsLocation::parse("gs://my-bucket/long/term/storage");
        assert_eq!(loc.bucket, "my-bucket");
        assert_eq!(loc.prefix, "long/term/storage");
    }

    // Per its name, mirrors a shell `cut`/`sed` based parser used as a reference.
    #[test]
    fn gcs_store_parse_matches_oracle_cut_and_sed() {
        let loc = GcsLocation::parse("gs://bucket/a/b/c");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "a/b/c");
    }

    #[test]
    fn gcs_store_parse_strips_trailing_slash() {
        let loc = GcsLocation::parse("gs://bucket/prefix/");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "prefix");
    }

    // With and without a trailing slash, a bare bucket means "store at bucket root".
    #[test]
    fn gcs_store_parse_bucket_root_has_empty_prefix() {
        let loc = GcsLocation::parse("gs://bucket");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "");
        let loc_slash = GcsLocation::parse("gs://bucket/");
        assert_eq!(loc_slash.bucket, "bucket");
        assert_eq!(loc_slash.prefix, "");
    }

    #[test]
    fn gcs_store_parse_accepts_bare_bucket_prefix_without_scheme() {
        let loc = GcsLocation::parse("bucket/some/prefix");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "some/prefix");
    }

    #[test]
    fn gcs_store_bucket_resource_uses_projects_underscore_form() {
        let loc = GcsLocation::parse("gs://my-bucket/x");
        assert_eq!(loc.bucket_resource(), "projects/_/buckets/my-bucket");
    }

    #[test]
    fn gcs_store_object_key_matches_sharded_scheme() {
        let loc = GcsLocation::parse("gs://b/long/term/storage");
        assert_eq!(
            loc.object_key(FOO_CHECKSUM),
            format!("long/term/storage/.objects/{FOO_SHARDED}")
        );
    }

    #[test]
    fn gcs_store_manifest_key_matches_sharded_scheme() {
        let loc = GcsLocation::parse("gs://b/long/term/storage");
        assert_eq!(
            loc.manifest_key(MANIFEST_ID),
            format!("long/term/storage/.manifests/{MANIFEST_SHARDED}")
        );
    }

    // At the bucket root, keys must not start with `/`.
    #[test]
    fn gcs_store_keys_have_no_leading_slash_at_bucket_root() {
        let loc = GcsLocation::parse("gs://bucket");
        assert_eq!(
            loc.object_key(FOO_CHECKSUM),
            format!(".objects/{FOO_SHARDED}")
        );
        assert_eq!(
            loc.manifest_key(MANIFEST_ID),
            format!(".manifests/{MANIFEST_SHARDED}")
        );
    }

    // Guards against the GCS layout drifting from the shared core path helpers.
    #[test]
    fn gcs_store_object_key_uses_core_object_path() {
        let loc = GcsLocation::parse("gs://b");
        assert_eq!(loc.object_key(FOO_CHECKSUM), object_path(FOO_CHECKSUM));
        assert_eq!(loc.manifest_key(MANIFEST_ID), manifest_path(MANIFEST_ID));
    }

    #[test]
    fn gcs_store_strip_leading_dot_slash() {
        assert_eq!(strip_leading_dot_slash("./foo"), "foo");
        assert_eq!(strip_leading_dot_slash("./a/b/c"), "a/b/c");
        assert_eq!(strip_leading_dot_slash("./a/"), "a");
        assert_eq!(strip_leading_dot_slash("./"), "");
    }

    // An RPC-level NOT_FOUND carries no HTTP status but must still mean "absent".
    #[test]
    fn gcs_store_is_not_found_classifies_service_level_not_found_as_absent() {
        use google_cloud_gax::error::rpc::Status;
        let status = Status::default()
            .set_code(Code::NotFound)
            .set_message("No such object: bucket/.manifests/...");
        let err = GcsError::service(status);
        assert_eq!(err.http_status_code(), None);
        assert!(
            is_not_found(&err),
            "service-level NOT_FOUND must be classified as object-absent"
        );
    }

    #[test]
    fn gcs_store_is_not_found_classifies_http_404_as_absent() {
        let err = GcsError::http(404, http::HeaderMap::new(), bytes::Bytes::new());
        assert!(is_not_found(&err), "HTTP 404 must be classified as absent");
    }

    // Non-absence failures must propagate rather than be treated as a missing object.
    #[test]
    fn gcs_store_is_not_found_does_not_swallow_other_errors() {
        use google_cloud_gax::error::rpc::Status;
        let denied = GcsError::service(Status::default().set_code(Code::PermissionDenied));
        assert!(!is_not_found(&denied), "PERMISSION_DENIED is not absence");
        let server_err = GcsError::http(503, http::HeaderMap::new(), bytes::Bytes::new());
        assert!(!is_not_found(&server_err), "HTTP 503 is not absence");
    }

    // Calling twice must not panic even though the second install is rejected.
    #[test]
    fn gcs_store_install_ring_provider_is_idempotent() {
        install_ring_provider();
        install_ring_provider();
    }

    // Opt-in: runs only when SNAPDIR_GCS_TEST_STORE is set. It pushes a one-file
    // snapshot, reads the manifest back, and fetches the files into a fresh directory.
    // Temp directories are cleaned up only if every step succeeds.
    #[test]
    fn gcs_store_live_round_trip_when_configured() {
        use snapdir_core::manifest::ManifestEntry;
        let Ok(store) = std::env::var("SNAPDIR_GCS_TEST_STORE") else {
            eprintln!(
                "skipping gcs_store live round-trip: set SNAPDIR_GCS_TEST_STORE \
                 (gs://bucket/prefix) plus ADC credentials to run it"
            );
            return;
        };
        let hasher = Blake3Hasher::new();
        let src = std::env::temp_dir().join(format!("snapdir-gcs-live-{}", std::process::id()));
        std::fs::create_dir_all(&src).unwrap();
        std::fs::write(src.join("foo"), b"foo\n").unwrap();
        let foo_sum = hasher.hash_hex(b"foo\n");
        let root_sum = snapdir_core::merkle::directory_checksum([foo_sum.as_str()], &hasher);
        let mut manifest = Manifest::new();
        manifest.push(ManifestEntry::new(
            PathType::Directory,
            "700",
            root_sum,
            4,
            "./",
        ));
        manifest.push(ManifestEntry::new(
            PathType::File,
            "600",
            foo_sum,
            4,
            "./foo",
        ));
        // Rebuilt through `from_entries` so the manifest matches what a reader gets
        // back; presumably this normalizes entry order — TODO confirm in snapdir_core.
        let manifest = Manifest::from_entries(manifest.entries().to_vec());
        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
        let gcs = GcsStore::connect(&store).expect("connect");
        gcs.push(&manifest, &src).expect("push");
        let read_back = gcs.get_manifest(&id).expect("get_manifest");
        assert_eq!(read_back, manifest);
        let dest = std::env::temp_dir().join(format!("snapdir-gcs-dest-{}", std::process::id()));
        std::fs::create_dir_all(&dest).unwrap();
        gcs.fetch_files(&read_back, &dest).expect("fetch_files");
        assert_eq!(std::fs::read(dest.join("foo")).unwrap(), b"foo\n");
        let _ = std::fs::remove_dir_all(&src);
        let _ = std::fs::remove_dir_all(&dest);
    }
}