use std::path::{Path, PathBuf};
use google_cloud_storage::client::{Client, ClientConfig};
use google_cloud_storage::http::objects::delete::DeleteObjectRequest;
use google_cloud_storage::http::objects::download::Range;
use google_cloud_storage::http::objects::get::GetObjectRequest;
use google_cloud_storage::http::objects::list::ListObjectsRequest;
use google_cloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};
use tokio::runtime::Runtime;
use crate::errors::StorageError;
use crate::io::storage::uri::{format_bucket_uri, parse_bucket_uri, BucketLocation};
use crate::io::storage::{planner, ConditionalWrite, ObjectRef, StorageClient, StoredObject};
use crate::FloeResult;
/// Google Cloud Storage backend: a bucket name, the GCS API client, and the
/// Tokio runtime on which the client's async calls are driven to completion.
pub struct GcsClient {
/// Bucket name used by `list` and `resolve_uri`.
bucket: String,
/// GCS API client; cloned before each request is issued.
client: Client,
/// Runtime whose `block_on` runs every request future synchronously.
runtime: Runtime,
}
impl GcsClient {
pub fn new(bucket: String) -> FloeResult<Self> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|err| Box::new(StorageError(format!("gcs runtime init failed: {err}"))))?;
let client = runtime.block_on(async {
let config = ClientConfig::default()
.with_auth()
.await
.map_err(|err| Box::new(StorageError(format!("gcs auth init failed: {err}"))))?;
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(Client::new(config))
})?;
Ok(Self {
bucket,
client,
runtime,
})
}
fn bucket(&self) -> &str {
self.bucket.as_str()
}
}
impl StorageClient for GcsClient {
/// Lists the objects in this client's bucket whose names start with
/// `prefix_or_path`.
///
/// Leading `/` characters are stripped from the prefix; an empty prefix sends
/// no prefix filter at all. Pages are requested until the service stops
/// returning a non-empty page token, and the collected refs are passed
/// through `planner::stable_sort_refs` before being returned.
///
/// # Errors
///
/// Returns a `StorageError` if any list request fails.
fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>> {
    let bucket = self.bucket().to_string();
    let trimmed = prefix_or_path.trim_start_matches('/');
    let prefix = (!trimmed.is_empty()).then(|| trimmed.to_string());
    let client = self.client.clone();
    self.runtime.block_on(async move {
        let mut refs = Vec::new();
        let mut page_token: Option<String> = None;
        loop {
            let request = ListObjectsRequest {
                bucket: bucket.clone(),
                prefix: prefix.clone(),
                page_token: page_token.take(),
                ..Default::default()
            };
            let response = client.list_objects(&request).await.map_err(|err| {
                Box::new(StorageError(format!(
                    "gcs list objects failed for bucket {}: {err}",
                    bucket
                ))) as Box<dyn std::error::Error + Send + Sync>
            })?;
            refs.extend(response.items.into_iter().flatten().map(|object| {
                let uri = format_gcs_uri(&bucket, &object.name);
                planner::object_ref(
                    uri,
                    object.name,
                    object.updated.map(|value| value.to_string()),
                    // Checked conversion: a size that does not fit in `u64`
                    // (e.g. a negative value) is reported as unknown instead
                    // of wrapping around to a huge number.
                    u64::try_from(object.size).ok(),
                )
            }));
            // An absent or empty token means the last page has been read.
            match response.next_page_token.filter(|token| !token.is_empty()) {
                Some(token) => page_token = Some(token),
                None => break,
            }
        }
        Ok(planner::stable_sort_refs(refs))
    })
}
/// Downloads the object named by `uri` into a file under `temp_dir` and
/// returns that file's path.
///
/// The destination is chosen by `planner::temp_path_for_key`. The object is
/// fetched with the default `Range`, held in memory, and written in one call
/// after any missing parent directories have been created.
///
/// # Errors
///
/// Fails if `uri` cannot be parsed, if the download fails
/// ("gcs download failed"), or if creating directories / writing the file
/// returns an I/O error.
fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
    let location = parse_gcs_uri(uri)?;
    let target = planner::temp_path_for_key(temp_dir, &location.key);
    let request = GetObjectRequest {
        bucket: location.bucket,
        object: location.key,
        ..Default::default()
    };
    let client = self.client.clone();
    // The async block takes ownership of its own copy so `target` can still
    // be returned afterwards.
    let write_to = target.clone();
    let outcome: Result<(), Box<dyn std::error::Error + Send + Sync>> =
        self.runtime.block_on(async move {
            let bytes = match client.download_object(&request, &Range::default()).await {
                Ok(bytes) => bytes,
                Err(err) => {
                    return Err(Box::new(StorageError(format!("gcs download failed: {err}")))
                        as Box<dyn std::error::Error + Send + Sync>)
                }
            };
            if let Some(dir) = write_to.parent() {
                tokio::fs::create_dir_all(dir).await?;
            }
            tokio::fs::write(&write_to, bytes).await?;
            Ok(())
        });
    outcome?;
    Ok(target)
}
/// Uploads the file at `local_path` to the object named by `uri`.
///
/// The file is read fully into memory and sent as a single
/// `UploadType::Simple` upload whose media name is the URI's key.
///
/// # Errors
///
/// Fails if `uri` cannot be parsed, if the file cannot be read, or if the
/// upload request fails ("gcs upload failed").
fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
    let location = parse_gcs_uri(uri)?;
    let source = local_path.to_owned();
    let client = self.client.clone();
    self.runtime.block_on(async move {
        let bytes = tokio::fs::read(source).await?;
        let media = UploadType::Simple(Media::new(location.key));
        let request = UploadObjectRequest {
            bucket: location.bucket,
            ..Default::default()
        };
        match client.upload_object(&request, bytes, &media).await {
            Ok(_) => Ok(()),
            Err(err) => Err(Box::new(StorageError(format!("gcs upload failed: {err}")))
                as Box<dyn std::error::Error + Send + Sync>),
        }
    })
}
/// Builds the `gs` URI for `path` inside this client's bucket.
///
/// Leading `/` characters are removed from `path` before it is used as the
/// object key. This implementation always returns `Ok`.
fn resolve_uri(&self, path: &str) -> FloeResult<String> {
    let key = path.trim_start_matches('/');
    let uri = format_gcs_uri(self.bucket(), key);
    Ok(uri)
}
/// Copies the object at `src_uri` to `dst_uri` by delegating to
/// `planner::copy_via_temp` with this client.
///
/// NOTE(review): presumably a download-then-upload through a temporary file
/// rather than a server-side copy — confirm in `planner`.
fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
planner::copy_via_temp(self, src_uri, dst_uri)
}
/// Deletes the object named by `uri` without any precondition.
///
/// # Errors
///
/// Fails if `uri` cannot be parsed or if the delete request fails
/// ("gcs delete failed"). No error kind is special-cased here.
fn delete_object(&self, uri: &str) -> FloeResult<()> {
    let location = parse_gcs_uri(uri)?;
    let request = DeleteObjectRequest {
        bucket: location.bucket,
        object: location.key,
        ..Default::default()
    };
    let client = self.client.clone();
    self.runtime.block_on(async move {
        match client.delete_object(&request).await {
            Ok(_) => Ok(()),
            Err(err) => Err(Box::new(StorageError(format!("gcs delete failed: {err}")))
                as Box<dyn std::error::Error + Send + Sync>),
        }
    })
}
/// Reports whether the object named by `uri` exists, as decided by
/// `planner::exists_by_key` for the URI's key.
///
/// NOTE(review): only the key parsed from `uri` is forwarded; the bucket
/// parsed from `uri` is not used here — confirm `planner::exists_by_key`
/// checks against the intended bucket.
fn exists(&self, uri: &str) -> FloeResult<bool> {
    parse_gcs_uri(uri).and_then(|location| planner::exists_by_key(self, &location.key))
}
/// Reads the object named by `uri` together with its generation.
///
/// Returns `Ok(None)` when the metadata lookup reports the object as missing
/// (as judged by `is_not_found`). Otherwise returns the object's bytes and
/// its generation rendered as a string in `StoredObject::version`.
///
/// Metadata and body are fetched with two separate requests. The body
/// request is pinned to the generation seen in the metadata, so the returned
/// `version` always describes the returned `body`, even if the object is
/// overwritten between the two calls. If that generation is no longer
/// available when the body is requested, the download error is returned.
///
/// # Errors
///
/// Fails if `uri` cannot be parsed, if the metadata request fails for a
/// reason other than "not found" ("gcs get object failed"), or if the
/// download fails ("gcs download failed").
fn read_object(&self, uri: &str) -> FloeResult<Option<StoredObject>> {
    let location = parse_gcs_uri(uri)?;
    let client = self.client.clone();
    self.runtime.block_on(async move {
        let metadata_request = GetObjectRequest {
            bucket: location.bucket.clone(),
            object: location.key.clone(),
            ..Default::default()
        };
        let metadata = match client.get_object(&metadata_request).await {
            Ok(object) => object,
            Err(err) if is_not_found(&err) => return Ok(None),
            Err(err) => {
                return Err(
                    Box::new(StorageError(format!("gcs get object failed: {err}")))
                        as Box<dyn std::error::Error + Send + Sync>,
                )
            }
        };
        let generation = metadata.generation;
        // Pin the download to the generation just observed; without this a
        // concurrent overwrite could pair new bytes with the old version.
        let body_request = GetObjectRequest {
            bucket: location.bucket,
            object: location.key,
            generation: Some(generation),
            ..Default::default()
        };
        let body = client
            .download_object(&body_request, &Range::default())
            .await
            .map_err(|err| {
                Box::new(StorageError(format!("gcs download failed: {err}")))
                    as Box<dyn std::error::Error + Send + Sync>
            })?;
        Ok(Some(StoredObject {
            body,
            version: generation.to_string(),
        }))
    })
}
/// Uploads `body` to `uri` only if the object's generation matches
/// `expected_version`.
///
/// `expected_version` is parsed as an `i64` generation and sent as
/// `if_generation_match`. When it is `None`, `0` is sent instead; per the GCS
/// precondition rules a generation match of `0` succeeds only when no live
/// object exists.
///
/// Returns `ConditionalWrite::Written` with the new generation on success and
/// `ConditionalWrite::Conflict` when the upload error is classified as a
/// failed precondition by `is_precondition`.
///
/// # Errors
///
/// Fails if `uri` cannot be parsed, if `expected_version` is not a valid
/// `i64` ("invalid gcs generation"), or if the upload fails for any other
/// reason ("gcs upload failed").
fn write_object_conditional(
    &self,
    uri: &str,
    expected_version: Option<&str>,
    body: &[u8],
) -> FloeResult<ConditionalWrite> {
    let location = parse_gcs_uri(uri)?;
    let expected_generation = match expected_version {
        Some(raw) => raw
            .parse::<i64>()
            .map_err(|err| Box::new(StorageError(format!("invalid gcs generation: {err}"))))?,
        None => 0,
    };
    let payload = body.to_vec();
    let client = self.client.clone();
    self.runtime.block_on(async move {
        let media = UploadType::Simple(Media::new(location.key));
        let request = UploadObjectRequest {
            bucket: location.bucket,
            if_generation_match: Some(expected_generation),
            ..Default::default()
        };
        let uploaded = client.upload_object(&request, payload, &media).await;
        match uploaded {
            Ok(object) => Ok(ConditionalWrite::Written {
                version: object.generation.to_string(),
            }),
            Err(err) if is_precondition(&err) => Ok(ConditionalWrite::Conflict),
            Err(err) => Err(Box::new(StorageError(format!("gcs upload failed: {err}")))
                as Box<dyn std::error::Error + Send + Sync>),
        }
    })
}
/// Deletes the object at `uri` only if its generation matches
/// `expected_version`.
///
/// Outcomes:
/// - `expected_version` is `None`: no request is sent and
///   `Written { version: "deleted" }` is returned immediately.
///   NOTE(review): confirm callers rely on this being a no-op.
/// - delete succeeds, or the error is classified as "not found":
///   `Written { version: "deleted" }`.
/// - the error is classified as a failed precondition (checked before the
///   "not found" test): `Conflict`.
///
/// # Errors
///
/// Fails if `uri` cannot be parsed, if `expected_version` is not a valid
/// `i64` ("invalid gcs generation"), or if the delete fails for any other
/// reason ("gcs delete failed").
fn delete_object_conditional(
    &self,
    uri: &str,
    expected_version: Option<&str>,
) -> FloeResult<ConditionalWrite> {
    // Shared result for every path that ends with the object gone.
    fn deleted() -> ConditionalWrite {
        ConditionalWrite::Written {
            version: "deleted".to_string(),
        }
    }

    let raw_generation = match expected_version {
        Some(value) => value,
        None => return Ok(deleted()),
    };
    let location = parse_gcs_uri(uri)?;
    let generation = raw_generation
        .parse::<i64>()
        .map_err(|err| Box::new(StorageError(format!("invalid gcs generation: {err}"))))?;
    let request = DeleteObjectRequest {
        bucket: location.bucket,
        object: location.key,
        if_generation_match: Some(generation),
        ..Default::default()
    };
    let client = self.client.clone();
    self.runtime.block_on(async move {
        let outcome = client.delete_object(&request).await;
        match outcome {
            Ok(_) => Ok(deleted()),
            Err(err) if is_precondition(&err) => Ok(ConditionalWrite::Conflict),
            Err(err) if is_not_found(&err) => Ok(deleted()),
            Err(err) => Err(Box::new(StorageError(format!("gcs delete failed: {err}")))
                as Box<dyn std::error::Error + Send + Sync>),
        }
    })
}
}
/// Heuristically decides whether `err` describes a missing object.
///
/// The check is purely textual: the error's `Display` output is lower-cased
/// and searched for the status code `404` and for the common "not found"
/// spellings. Matching is case-insensitive so that the JSON reason
/// `notFound`, the HTTP reason phrase `Not Found`, and the
/// `No such object: ...` message are all recognised, in addition to the
/// `NotFound` spelling.
fn is_not_found<E: std::fmt::Display>(err: &E) -> bool {
    const MARKERS: [&str; 4] = ["404", "notfound", "not found", "no such object"];
    let text = err.to_string().to_ascii_lowercase();
    MARKERS.iter().any(|marker| text.contains(*marker))
}
/// Heuristically decides whether `err` describes a failed precondition.
///
/// The error's `Display` output is searched, case-sensitively, for `412`,
/// `condition`, or `Precondition`; any hit counts as a match.
fn is_precondition<E: std::fmt::Display>(err: &E) -> bool {
    const MARKERS: [&str; 3] = ["412", "condition", "Precondition"];
    let rendered = err.to_string();
    MARKERS.iter().any(|marker| rendered.contains(*marker))
}
/// Parses `uri` as a `gs`-scheme bucket URI by delegating to
/// `parse_bucket_uri` with the scheme `"gs"`.
///
/// # Errors
///
/// Returns whatever error `parse_bucket_uri` reports for `uri`.
pub fn parse_gcs_uri(uri: &str) -> FloeResult<GcsLocation> {
parse_bucket_uri("gs", uri)
}
/// Formats a URI for `key` in `bucket` by delegating to `format_bucket_uri`
/// with the scheme `"gs"`.
pub fn format_gcs_uri(bucket: &str, key: &str) -> String {
format_bucket_uri("gs", bucket, key)
}
/// GCS-specific name for the shared `BucketLocation`, the type returned by
/// `parse_gcs_uri`.
pub type GcsLocation = BucketLocation;