use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use bytes::Bytes;
use object_store::{ObjectStore, ObjectStoreExt, PutPayload, path::Path};
use url::Url;
use super::{StorageError, StorageService, checked_key};
#[derive(Clone)]
pub struct GcsService {
name: String,
bucket: String,
store: Arc<dyn ObjectStore>,
base_url: Url,
}
impl std::fmt::Debug for GcsService {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter
.debug_struct("GcsService")
.field("name", &self.name)
.field("bucket", &self.bucket)
.finish_non_exhaustive()
}
}
impl GcsService {
pub fn new(
name: impl Into<String>,
bucket: impl Into<String>,
store: Arc<dyn ObjectStore>,
) -> Result<Self, StorageError> {
let bucket = bucket.into();
let base_url = Url::parse(&format!("https://gcs.local/{bucket}/"))
.map_err(|error| StorageError::InvalidUrl(error.to_string()))?;
Ok(Self {
name: name.into(),
bucket,
store,
base_url,
})
}
fn path_for(&self, key: &str) -> Result<Path, StorageError> {
let key = checked_key(key)?;
Ok(Path::from(key))
}
}
#[async_trait]
impl StorageService for GcsService {
fn name(&self) -> &str {
&self.name
}
async fn upload(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
let path = self.path_for(key)?;
if self.exists(key).await? {
return Err(StorageError::DuplicateKey(key.to_owned()));
}
self.store
.put(&path, PutPayload::from(data))
.await
.map(|_| ())
.map_err(|error| StorageError::ObjectStore {
path: key.to_owned(),
message: error.to_string(),
})
}
async fn download(&self, key: &str) -> Result<Bytes, StorageError> {
let path = self.path_for(key)?;
match self.store.get(&path).await {
Ok(result) => result
.bytes()
.await
.map_err(|error| StorageError::ObjectStore {
path: key.to_owned(),
message: error.to_string(),
}),
Err(object_store::Error::NotFound { .. }) => {
Err(StorageError::NotFound(key.to_owned()))
}
Err(error) => Err(StorageError::ObjectStore {
path: key.to_owned(),
message: error.to_string(),
}),
}
}
async fn delete(&self, key: &str) -> Result<(), StorageError> {
let path = self.path_for(key)?;
match self.store.delete(&path).await {
Ok(()) => Ok(()),
Err(object_store::Error::NotFound { .. }) => Ok(()),
Err(error) => Err(StorageError::ObjectStore {
path: key.to_owned(),
message: error.to_string(),
}),
}
}
async fn exists(&self, key: &str) -> Result<bool, StorageError> {
let path = self.path_for(key)?;
match self.store.head(&path).await {
Ok(_) => Ok(true),
Err(object_store::Error::NotFound { .. }) => Ok(false),
Err(error) => Err(StorageError::ObjectStore {
path: key.to_owned(),
message: error.to_string(),
}),
}
}
async fn url(&self, key: &str, expires_in: Duration) -> Result<Url, StorageError> {
let key = checked_key(key)?;
let mut url = self.base_url.clone();
url.set_path(&format!("{}/{}", self.bucket, key));
url.query_pairs_mut()
.append_pair("service", &self.name)
.append_pair("expires_in", &expires_in.as_secs().to_string());
Ok(url)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_gcs_service_round_trip_with_in_memory_store() {
let service = GcsService::new(
"gcs",
"bucket",
Arc::new(object_store::memory::InMemory::new()),
)
.expect("service should build");
service
.upload("a.txt", Bytes::from_static(b"hello"))
.await
.expect("upload should succeed");
assert_eq!(
service
.download("a.txt")
.await
.expect("download should succeed"),
Bytes::from_static(b"hello")
);
}
#[tokio::test]
async fn test_gcs_service_url_includes_bucket() {
let service = GcsService::new(
"gcs",
"bucket",
Arc::new(object_store::memory::InMemory::new()),
)
.expect("service should build");
let url = service
.url("a.txt", Duration::from_secs(60))
.await
.expect("url should build");
assert_eq!(
url.as_str(),
"https://gcs.local/bucket/a.txt?service=gcs&expires_in=60"
);
}
}