use bytes::Bytes;
use google_cloud_auth::credentials::service_account::Builder as CredentialsBuilder;
use google_cloud_storage::client::{Storage, StorageControl};
use crate::error::Error;
use crate::traits::StorageBackend;
use crate::types::{PutOptions, RawDownloadResult};
/// Storage backend bound to a single Google Cloud Storage bucket.
///
/// Two clients are kept side by side: `storage` carries object payloads
/// (uploads and downloads), while `control` serves metadata lookups,
/// deletes and the bucket-level connection check.
pub struct GcsBackend {
/// Client used to write and read object contents.
storage: Storage,
/// Client used for object metadata, deletion and bucket lookups.
control: StorageControl,
/// Bucket resource name in the `projects/_/buckets/{bucket}` form.
bucket_path: String,
/// Prefix for public object URLs; `public_url` appends `/{key}` to it.
public_base_url: String,
}
/// Selects how the GCS clients obtain their credentials.
pub enum GcsCredentials {
/// Build the clients without explicit credentials and let the client
/// library discover them (presumably Application Default Credentials,
/// going by the variant name; confirm against the library docs).
Adc,
/// Service-account key, stored as the raw JSON text. It is parsed only
/// when the backend is built.
ServiceAccountJson(String),
}
/// Collects the settings needed to construct a [`GcsBackend`].
///
/// Obtain one through `GcsBackend::builder()`, chain the setters, then call
/// `build().await`.
pub struct GcsBackendBuilder {
/// Plain bucket name; `build` returns a configuration error if it was
/// never set.
bucket: Option<String>,
/// Optional override for the public URL prefix. When absent, `build`
/// falls back to `https://storage.googleapis.com/{bucket}`.
public_base_url: Option<String>,
/// Credential source handed to both GCS clients.
credentials: GcsCredentials,
}
impl GcsBackend {
    /// Starts a fresh [`GcsBackendBuilder`].
    ///
    /// The returned builder has no bucket and no public base URL set, and
    /// uses [`GcsCredentials::Adc`] as its credential source until
    /// overridden.
    pub fn builder() -> GcsBackendBuilder {
        let credentials = GcsCredentials::Adc;
        GcsBackendBuilder {
            credentials,
            public_base_url: None,
            bucket: None,
        }
    }
}
impl GcsBackendBuilder {
    /// Sets the name of the bucket the backend will operate on.
    ///
    /// Pass the plain bucket name; [`build`](Self::build) expands it to the
    /// `projects/_/buckets/{bucket}` resource path.
    pub fn bucket(mut self, bucket: &str) -> Self {
        self.bucket = Some(bucket.to_string());
        self
    }

    /// Overrides the base URL that public object URLs are derived from.
    ///
    /// Trailing slashes are stripped so that appending `/{key}` later never
    /// produces a double slash.
    pub fn public_base_url(mut self, url: &str) -> Self {
        self.public_base_url = Some(url.trim_end_matches('/').to_string());
        self
    }

    /// Switches the credential source to a service-account key given as raw
    /// JSON text.
    ///
    /// The text is stored as-is and only parsed by [`build`](Self::build).
    pub fn credentials_json(mut self, json: impl Into<String>) -> Self {
        self.credentials = GcsCredentials::ServiceAccountJson(json.into());
        self
    }

    /// Creates the GCS clients and assembles the [`GcsBackend`].
    ///
    /// When no public base URL was configured it defaults to
    /// `https://storage.googleapis.com/{bucket}`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::ConfigError`] when:
    /// - the bucket name was never set or is empty,
    /// - the service-account JSON cannot be parsed or turned into
    ///   credentials,
    /// - either client fails to build.
    pub async fn build(self) -> Result<GcsBackend, Error> {
        let bucket = self
            .bucket
            // An empty name would only produce the malformed resource path
            // `projects/_/buckets/`; reject it up front like a missing one.
            .filter(|name| !name.is_empty())
            .ok_or_else(|| Error::ConfigError("bucket is required".into()))?;
        let bucket_path = format!("projects/_/buckets/{bucket}");

        let (storage, control) = match self.credentials {
            GcsCredentials::Adc => {
                let storage = Storage::builder().build().await.map_err(|e| {
                    Error::ConfigError(format!("GCS Storage client build failed: {e}"))
                })?;
                let control = StorageControl::builder().build().await.map_err(|e| {
                    Error::ConfigError(format!("GCS StorageControl client build failed: {e}"))
                })?;
                (storage, control)
            }
            GcsCredentials::ServiceAccountJson(json_str) => {
                let json_value: serde_json::Value =
                    serde_json::from_str(&json_str).map_err(|e| {
                        Error::ConfigError(format!("Invalid GCS credentials JSON: {e}"))
                    })?;
                // Build the credentials once and hand a clone to each client
                // instead of cloning the JSON and constructing them twice.
                let creds = CredentialsBuilder::new(json_value)
                    .build()
                    .map_err(|e| {
                        Error::ConfigError(format!("GCS credentials build failed: {e}"))
                    })?;
                let storage = Storage::builder()
                    .with_credentials(creds.clone())
                    .build()
                    .await
                    .map_err(|e| {
                        Error::ConfigError(format!("GCS Storage client build failed: {e}"))
                    })?;
                let control = StorageControl::builder()
                    .with_credentials(creds)
                    .build()
                    .await
                    .map_err(|e| {
                        Error::ConfigError(format!("GCS StorageControl client build failed: {e}"))
                    })?;
                (storage, control)
            }
        };

        let public_base_url = self
            .public_base_url
            .unwrap_or_else(|| format!("https://storage.googleapis.com/{bucket}"));

        Ok(GcsBackend {
            storage,
            control,
            bucket_path,
            public_base_url,
        })
    }
}
impl StorageBackend for GcsBackend {
    /// Uploads `data` under `key` with the given content type.
    ///
    /// `cache_control` and `content_disposition` from `options` are applied
    /// only when they are set.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Backend`] if the upload request fails.
    async fn put_object(
        &self,
        key: &str,
        data: Bytes,
        content_type: &str,
        options: &PutOptions,
    ) -> Result<(), Error> {
        let mut builder = self
            .storage
            .write_object(&self.bucket_path, key, data)
            .set_content_type(content_type);
        if let Some(cc) = &options.cache_control {
            builder = builder.set_cache_control(cc.clone());
        }
        if let Some(cd) = &options.content_disposition {
            builder = builder.set_content_disposition(cd.clone());
        }
        builder
            .send_buffered()
            .await
            .map_err(|e| Error::Backend(format!("GCS upload failed: {e}")))?;
        Ok(())
    }

    /// Downloads the whole object stored under `key` into memory.
    ///
    /// The response stream is drained chunk by chunk, then the object's
    /// metadata is fetched through the control client to learn its content
    /// type. That lookup is best-effort: if it fails, or reports an empty
    /// content type, `application/octet-stream` is used instead.
    /// `content_length` is the number of bytes actually received.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Backend`] if the download request or any chunk read
    /// fails.
    async fn get_object(&self, key: &str) -> Result<RawDownloadResult, Error> {
        let mut resp = self
            .storage
            .read_object(&self.bucket_path, key)
            .send()
            .await
            .map_err(|e| Error::Backend(format!("GCS download failed: {e}")))?;

        let mut contents = Vec::new();
        while let Some(chunk) = resp
            .next()
            .await
            .transpose()
            .map_err(|e| Error::Backend(format!("GCS read stream failed: {e}")))?
        {
            contents.extend_from_slice(&chunk);
        }

        // A metadata failure must not fail a download that already
        // succeeded, so the error is deliberately dropped here.
        let metadata = self
            .control
            .get_object()
            .set_bucket(&self.bucket_path)
            .set_object(key)
            .send()
            .await
            .ok();
        let content_type = metadata
            .as_ref()
            .map(|m| m.content_type.clone())
            // `content_type` is a plain `String`, so an object stored without
            // one would otherwise leak an empty string past the fallback.
            .filter(|ct| !ct.is_empty())
            .unwrap_or_else(|| "application/octet-stream".to_string());

        // Checked conversion instead of an `as` cast.
        let content_length = i64::try_from(contents.len()).ok();
        Ok(RawDownloadResult {
            data: Bytes::from(contents),
            content_type,
            content_length,
        })
    }

    /// Deletes the object stored under `key`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Backend`] if the delete request fails.
    async fn delete_object(&self, key: &str) -> Result<(), Error> {
        self.control
            .delete_object()
            .set_bucket(&self.bucket_path)
            .set_object(key)
            .send()
            .await
            .map_err(|e| Error::Backend(format!("GCS delete failed: {e}")))?;
        Ok(())
    }

    /// Reports whether an object exists under `key`, using a metadata
    /// lookup.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Backend`] for any lookup failure that is not
    /// recognised as "not found".
    async fn exists(&self, key: &str) -> Result<bool, Error> {
        match self
            .control
            .get_object()
            .set_bucket(&self.bucket_path)
            .set_object(key)
            .send()
            .await
        {
            Ok(_) => Ok(true),
            Err(e) => {
                // NOTE(review): "not found" is detected by searching the
                // error's debug text, so an unrelated error whose text
                // happens to contain one of these markers is also reported
                // as "missing". A structured status-code check would be more
                // robust.
                let msg = format!("{e:?}");
                if msg.contains("404") || msg.contains("NotFound") || msg.contains("not found") {
                    Ok(false)
                } else {
                    Err(Error::Backend(format!("GCS exists check failed: {e}")))
                }
            }
        }
    }

    /// Presigned download URLs are not implemented for this backend; always
    /// returns [`Error::PresignNotSupported`].
    async fn presigned_get_url(&self, _key: &str, _expires_in_secs: u64) -> Result<String, Error> {
        Err(Error::PresignNotSupported)
    }

    /// Presigned upload URLs are not implemented for this backend; always
    /// returns [`Error::PresignNotSupported`].
    async fn presigned_put_url(
        &self,
        _key: &str,
        _content_type: &str,
        _expires_in_secs: u64,
    ) -> Result<String, Error> {
        Err(Error::PresignNotSupported)
    }

    /// Joins the configured public base URL and `key` with a single `/`.
    ///
    /// The key is inserted verbatim; no percent-encoding is applied.
    fn public_url(&self, key: &str) -> String {
        format!("{}/{key}", self.public_base_url)
    }

    /// Checks connectivity by fetching the configured bucket's metadata.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Backend`] if the bucket lookup fails.
    async fn test_connection(&self) -> Result<(), Error> {
        self.control
            .get_bucket()
            .set_name(&self.bucket_path)
            .send()
            .await
            .map_err(|e| Error::Backend(format!("GCS connection test failed: {e}")))?;
        Ok(())
    }
}