taskchampion/server/cloud/
gcp.rs1use super::service::{validate_object_name, ObjectInfo, Service};
2use crate::errors::Result;
3use google_cloud_storage::client::google_cloud_auth::credentials::CredentialsFile;
4use google_cloud_storage::client::{Client, ClientConfig};
5use google_cloud_storage::http::error::ErrorResponse;
6use google_cloud_storage::http::Error as GcsError;
7use google_cloud_storage::http::{self, objects};
8use tokio::runtime::Runtime;
9
10pub(in crate::server) struct GcpService {
12 client: Client,
13 rt: Runtime,
14 bucket: String,
15}
16
17fn is_http_error<T>(query: u16, res: &std::result::Result<T, http::Error>) -> bool {
19 match res {
20 Err(GcsError::Response(ErrorResponse { code, .. })) => *code == query,
22 Err(GcsError::HttpClient(e)) => e.status().map(|s| s.as_u16()) == Some(query),
24 _ => false,
25 }
26}
27
28impl GcpService {
29 pub(in crate::server) fn new(bucket: String, credential_path: Option<String>) -> Result<Self> {
30 let rt = Runtime::new()?;
31
32 let config: ClientConfig = if let Some(credentials) = credential_path {
33 let credentials = rt.block_on(CredentialsFile::new_from_file(credentials))?;
34 rt.block_on(ClientConfig::default().with_credentials(credentials))?
35 } else {
36 rt.block_on(ClientConfig::default().with_auth())?
37 };
38
39 Ok(Self {
40 client: Client::new(config),
41 rt,
42 bucket,
43 })
44 }
45}
46
47impl Service for GcpService {
48 fn put(&mut self, name: &str, value: &[u8]) -> Result<()> {
49 validate_object_name(name);
50 let upload_type =
51 objects::upload::UploadType::Simple(objects::upload::Media::new(name.to_string()));
52 self.rt.block_on(self.client.upload_object(
53 &objects::upload::UploadObjectRequest {
54 bucket: self.bucket.clone(),
55 ..Default::default()
56 },
57 value.to_vec(),
58 &upload_type,
59 ))?;
60 Ok(())
61 }
62
63 fn get(&mut self, name: &str) -> Result<Option<Vec<u8>>> {
64 validate_object_name(name);
65 let download_res = self.rt.block_on(self.client.download_object(
66 &objects::get::GetObjectRequest {
67 bucket: self.bucket.clone(),
68 object: name.to_string(),
69 ..Default::default()
70 },
71 &objects::download::Range::default(),
72 ));
73 if is_http_error(404, &download_res) {
74 Ok(None)
75 } else {
76 Ok(Some(download_res?))
77 }
78 }
79
80 fn del(&mut self, name: &str) -> Result<()> {
81 validate_object_name(name);
82 let del_res = self.rt.block_on(self.client.delete_object(
83 &objects::delete::DeleteObjectRequest {
84 bucket: self.bucket.clone(),
85 object: name.to_string(),
86 ..Default::default()
87 },
88 ));
89 if !is_http_error(404, &del_res) {
90 del_res?;
91 }
92 Ok(())
93 }
94
95 fn list<'a>(&'a mut self, prefix: &str) -> Box<dyn Iterator<Item = Result<ObjectInfo>> + 'a> {
96 validate_object_name(prefix);
97 Box::new(ObjectIterator {
98 service: self,
99 prefix: prefix.to_string(),
100 last_response: None,
101 next_index: 0,
102 })
103 }
104
105 fn compare_and_swap(
106 &mut self,
107 name: &str,
108 existing_value: Option<Vec<u8>>,
109 new_value: Vec<u8>,
110 ) -> Result<bool> {
111 validate_object_name(name);
112 let get_res = self
113 .rt
114 .block_on(self.client.get_object(&objects::get::GetObjectRequest {
115 bucket: self.bucket.clone(),
116 object: name.to_string(),
117 ..Default::default()
118 }));
119 let generation = if is_http_error(404, &get_res) {
121 if existing_value.is_some() {
123 return Ok(false);
124 }
125 0
127 } else {
128 get_res?.generation
129 };
130
131 if generation > 0 {
133 let data = self.rt.block_on(self.client.download_object(
134 &objects::get::GetObjectRequest {
135 bucket: self.bucket.clone(),
136 object: name.to_string(),
137 generation: Some(generation),
139 ..Default::default()
140 },
141 &objects::download::Range::default(),
142 ))?;
143 if Some(data) != existing_value {
144 return Ok(false);
145 }
146 }
147
148 #[cfg(test)]
151 if name.ends_with("-racing-delete") {
152 println!("deleting object {name}");
153 let del_res = self.rt.block_on(self.client.delete_object(
154 &objects::delete::DeleteObjectRequest {
155 bucket: self.bucket.clone(),
156 object: name.to_string(),
157 ..Default::default()
158 },
159 ));
160 if !is_http_error(404, &del_res) {
161 del_res?;
162 }
163 }
164
165 #[cfg(test)]
168 if name.ends_with("-racing-put") {
169 println!("changing object {name}");
170 let upload_type =
171 objects::upload::UploadType::Simple(objects::upload::Media::new(name.to_string()));
172 self.rt.block_on(self.client.upload_object(
173 &objects::upload::UploadObjectRequest {
174 bucket: self.bucket.clone(),
175 ..Default::default()
176 },
177 b"CHANGED".to_vec(),
178 &upload_type,
179 ))?;
180 }
181
182 let upload_type =
184 objects::upload::UploadType::Simple(objects::upload::Media::new(name.to_string()));
185 let upload_res = self.rt.block_on(self.client.upload_object(
186 &objects::upload::UploadObjectRequest {
187 bucket: self.bucket.clone(),
188 if_generation_match: Some(generation),
189 ..Default::default()
190 },
191 new_value.to_vec(),
192 &upload_type,
193 ));
194 if is_http_error(412, &upload_res) {
195 Ok(false)
198 } else {
199 upload_res?;
200 Ok(true)
201 }
202 }
203}
204
205struct ObjectIterator<'a> {
209 service: &'a mut GcpService,
210 prefix: String,
211 last_response: Option<objects::list::ListObjectsResponse>,
212 next_index: usize,
213}
214
215impl ObjectIterator<'_> {
216 fn fetch_batch(&mut self) -> Result<()> {
217 let mut page_token = None;
218 if let Some(ref resp) = self.last_response {
219 page_token.clone_from(&resp.next_page_token);
220 }
221 self.last_response = Some(self.service.rt.block_on(self.service.client.list_objects(
222 &objects::list::ListObjectsRequest {
223 bucket: self.service.bucket.clone(),
224 prefix: Some(self.prefix.clone()),
225 page_token,
226 #[cfg(test)] max_results: Some(6),
228 ..Default::default()
229 },
230 ))?);
231 self.next_index = 0;
232 Ok(())
233 }
234}
235
236impl Iterator for ObjectIterator<'_> {
237 type Item = Result<ObjectInfo>;
238 fn next(&mut self) -> Option<Self::Item> {
239 if self.last_response.is_none() {
241 if let Err(e) = self.fetch_batch() {
242 return Some(Err(e));
243 }
244 }
245 if let Some(ref result) = self.last_response {
246 if let Some(ref items) = result.items {
247 if self.next_index < items.len() {
248 let obj = &items[self.next_index];
250 self.next_index += 1;
251 let creation = obj.time_created.map(|t| t.unix_timestamp()).unwrap_or(0);
254 let creation: u64 = creation.try_into().unwrap_or(0);
255 return Some(Ok(ObjectInfo {
256 name: obj.name.clone(),
257 creation,
258 }));
259 } else if result.next_page_token.is_some() {
260 if let Err(e) = self.fetch_batch() {
262 return Some(Err(e));
263 }
264 return self.next();
265 }
266 }
267 }
268 None
269 }
270}
271
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a service from the `GCP_TEST_BUCKET` and `GCP_TEST_CREDENTIAL_PATH`
    /// environment variables, or return `None` if either of them is not set.
    ///
    /// Panics if the variables are set but the service cannot be constructed.
    fn make_service() -> Option<GcpService> {
        let bucket = std::env::var("GCP_TEST_BUCKET").ok()?;
        let credential_path = std::env::var("GCP_TEST_CREDENTIAL_PATH").ok()?;
        let service = GcpService::new(bucket, Some(credential_path)).unwrap();
        Some(service)
    }

    crate::server::cloud::test::service_tests!(make_service());
}