use super::service::{validate_object_name, ObjectInfo, Service};
use crate::errors::Result;
use google_cloud_storage::client::google_cloud_auth::credentials::CredentialsFile;
use google_cloud_storage::client::{Client, ClientConfig};
use google_cloud_storage::http::error::ErrorResponse;
use google_cloud_storage::http::Error as GcsError;
use google_cloud_storage::http::{self, objects};
use tokio::runtime::Runtime;
pub(in crate::server) struct GcpService {
client: Client,
rt: Runtime,
bucket: String,
}
fn is_http_error<T>(query: u16, res: &std::result::Result<T, http::Error>) -> bool {
match res {
Err(GcsError::Response(ErrorResponse { code, .. })) => *code == query,
Err(GcsError::HttpClient(e)) => e.status().map(|s| s.as_u16()) == Some(query),
_ => false,
}
}
impl GcpService {
pub(in crate::server) fn new(bucket: String, credential_path: Option<String>) -> Result<Self> {
let rt = Runtime::new()?;
let config: ClientConfig = if let Some(credentials) = credential_path {
let credentials = rt.block_on(CredentialsFile::new_from_file(credentials))?;
rt.block_on(ClientConfig::default().with_credentials(credentials))?
} else {
rt.block_on(ClientConfig::default().with_auth())?
};
Ok(Self {
client: Client::new(config),
rt,
bucket,
})
}
}
impl Service for GcpService {
fn put(&mut self, name: &str, value: &[u8]) -> Result<()> {
validate_object_name(name);
let upload_type =
objects::upload::UploadType::Simple(objects::upload::Media::new(name.to_string()));
self.rt.block_on(self.client.upload_object(
&objects::upload::UploadObjectRequest {
bucket: self.bucket.clone(),
..Default::default()
},
value.to_vec(),
&upload_type,
))?;
Ok(())
}
fn get(&mut self, name: &str) -> Result<Option<Vec<u8>>> {
validate_object_name(name);
let download_res = self.rt.block_on(self.client.download_object(
&objects::get::GetObjectRequest {
bucket: self.bucket.clone(),
object: name.to_string(),
..Default::default()
},
&objects::download::Range::default(),
));
if is_http_error(404, &download_res) {
Ok(None)
} else {
Ok(Some(download_res?))
}
}
fn del(&mut self, name: &str) -> Result<()> {
validate_object_name(name);
let del_res = self.rt.block_on(self.client.delete_object(
&objects::delete::DeleteObjectRequest {
bucket: self.bucket.clone(),
object: name.to_string(),
..Default::default()
},
));
if !is_http_error(404, &del_res) {
del_res?;
}
Ok(())
}
fn list<'a>(&'a mut self, prefix: &str) -> Box<dyn Iterator<Item = Result<ObjectInfo>> + 'a> {
validate_object_name(prefix);
Box::new(ObjectIterator {
service: self,
prefix: prefix.to_string(),
last_response: None,
next_index: 0,
})
}
fn compare_and_swap(
&mut self,
name: &str,
existing_value: Option<Vec<u8>>,
new_value: Vec<u8>,
) -> Result<bool> {
validate_object_name(name);
let get_res = self
.rt
.block_on(self.client.get_object(&objects::get::GetObjectRequest {
bucket: self.bucket.clone(),
object: name.to_string(),
..Default::default()
}));
let generation = if is_http_error(404, &get_res) {
if existing_value.is_some() {
return Ok(false);
}
0
} else {
get_res?.generation
};
if generation > 0 {
let data = self.rt.block_on(self.client.download_object(
&objects::get::GetObjectRequest {
bucket: self.bucket.clone(),
object: name.to_string(),
generation: Some(generation),
..Default::default()
},
&objects::download::Range::default(),
))?;
if Some(data) != existing_value {
return Ok(false);
}
}
#[cfg(test)]
if name.ends_with("-racing-delete") {
println!("deleting object {name}");
let del_res = self.rt.block_on(self.client.delete_object(
&objects::delete::DeleteObjectRequest {
bucket: self.bucket.clone(),
object: name.to_string(),
..Default::default()
},
));
if !is_http_error(404, &del_res) {
del_res?;
}
}
#[cfg(test)]
if name.ends_with("-racing-put") {
println!("changing object {name}");
let upload_type =
objects::upload::UploadType::Simple(objects::upload::Media::new(name.to_string()));
self.rt.block_on(self.client.upload_object(
&objects::upload::UploadObjectRequest {
bucket: self.bucket.clone(),
..Default::default()
},
b"CHANGED".to_vec(),
&upload_type,
))?;
}
let upload_type =
objects::upload::UploadType::Simple(objects::upload::Media::new(name.to_string()));
let upload_res = self.rt.block_on(self.client.upload_object(
&objects::upload::UploadObjectRequest {
bucket: self.bucket.clone(),
if_generation_match: Some(generation),
..Default::default()
},
new_value.to_vec(),
&upload_type,
));
if is_http_error(412, &upload_res) {
Ok(false)
} else {
upload_res?;
Ok(true)
}
}
}
struct ObjectIterator<'a> {
service: &'a mut GcpService,
prefix: String,
last_response: Option<objects::list::ListObjectsResponse>,
next_index: usize,
}
impl ObjectIterator<'_> {
fn fetch_batch(&mut self) -> Result<()> {
let mut page_token = None;
if let Some(ref resp) = self.last_response {
page_token.clone_from(&resp.next_page_token);
}
self.last_response = Some(self.service.rt.block_on(self.service.client.list_objects(
&objects::list::ListObjectsRequest {
bucket: self.service.bucket.clone(),
prefix: Some(self.prefix.clone()),
page_token,
#[cfg(test)] max_results: Some(6),
..Default::default()
},
))?);
self.next_index = 0;
Ok(())
}
}
impl Iterator for ObjectIterator<'_> {
type Item = Result<ObjectInfo>;
fn next(&mut self) -> Option<Self::Item> {
if self.last_response.is_none() {
if let Err(e) = self.fetch_batch() {
return Some(Err(e));
}
}
if let Some(ref result) = self.last_response {
if let Some(ref items) = result.items {
if self.next_index < items.len() {
let obj = &items[self.next_index];
self.next_index += 1;
let creation = obj.time_created.map(|t| t.unix_timestamp()).unwrap_or(0);
let creation: u64 = creation.try_into().unwrap_or(0);
return Some(Ok(ObjectInfo {
name: obj.name.clone(),
creation,
}));
} else if result.next_page_token.is_some() {
if let Err(e) = self.fetch_batch() {
return Some(Err(e));
}
return self.next();
}
}
}
None
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_service() -> Option<GcpService> {
let Ok(bucket) = std::env::var("GCP_TEST_BUCKET") else {
return None;
};
let Ok(credential_path) = std::env::var("GCP_TEST_CREDENTIAL_PATH") else {
return None;
};
Some(GcpService::new(bucket, Some(credential_path)).unwrap())
}
crate::server::cloud::test::service_tests!(make_service());
}