taskchampion/server/cloud/
gcp.rs

1use super::service::{validate_object_name, ObjectInfo, Service};
2use crate::errors::Result;
3use google_cloud_storage::client::google_cloud_auth::credentials::CredentialsFile;
4use google_cloud_storage::client::{Client, ClientConfig};
5use google_cloud_storage::http::error::ErrorResponse;
6use google_cloud_storage::http::Error as GcsError;
7use google_cloud_storage::http::{self, objects};
8use tokio::runtime::Runtime;
9
/// A [`Service`] implementation based on the Google Cloud Storage service.
///
/// The storage client is asynchronous; each operation is driven to completion on the
/// embedded Tokio runtime with `block_on`, so callers see a blocking interface.
pub(in crate::server) struct GcpService {
    /// Google Cloud Storage client used for every object request.
    client: Client,
    /// Runtime on which the client's futures are run to completion.
    rt: Runtime,
    /// Name of the bucket that every request is addressed to.
    bucket: String,
}
16
17/// Determine whether the given result contains an HTTP error with the given code.
18fn is_http_error<T>(query: u16, res: &std::result::Result<T, http::Error>) -> bool {
19    match res {
20        // Errors from RPC's.
21        Err(GcsError::Response(ErrorResponse { code, .. })) => *code == query,
22        // Errors from reqwest (downloads, uploads).
23        Err(GcsError::HttpClient(e)) => e.status().map(|s| s.as_u16()) == Some(query),
24        _ => false,
25    }
26}
27
28impl GcpService {
29    pub(in crate::server) fn new(bucket: String, credential_path: Option<String>) -> Result<Self> {
30        let rt = Runtime::new()?;
31
32        let config: ClientConfig = if let Some(credentials) = credential_path {
33            let credentials = rt.block_on(CredentialsFile::new_from_file(credentials))?;
34            rt.block_on(ClientConfig::default().with_credentials(credentials))?
35        } else {
36            rt.block_on(ClientConfig::default().with_auth())?
37        };
38
39        Ok(Self {
40            client: Client::new(config),
41            rt,
42            bucket,
43        })
44    }
45}
46
47impl Service for GcpService {
48    fn put(&mut self, name: &str, value: &[u8]) -> Result<()> {
49        validate_object_name(name);
50        let upload_type =
51            objects::upload::UploadType::Simple(objects::upload::Media::new(name.to_string()));
52        self.rt.block_on(self.client.upload_object(
53            &objects::upload::UploadObjectRequest {
54                bucket: self.bucket.clone(),
55                ..Default::default()
56            },
57            value.to_vec(),
58            &upload_type,
59        ))?;
60        Ok(())
61    }
62
63    fn get(&mut self, name: &str) -> Result<Option<Vec<u8>>> {
64        validate_object_name(name);
65        let download_res = self.rt.block_on(self.client.download_object(
66            &objects::get::GetObjectRequest {
67                bucket: self.bucket.clone(),
68                object: name.to_string(),
69                ..Default::default()
70            },
71            &objects::download::Range::default(),
72        ));
73        if is_http_error(404, &download_res) {
74            Ok(None)
75        } else {
76            Ok(Some(download_res?))
77        }
78    }
79
80    fn del(&mut self, name: &str) -> Result<()> {
81        validate_object_name(name);
82        let del_res = self.rt.block_on(self.client.delete_object(
83            &objects::delete::DeleteObjectRequest {
84                bucket: self.bucket.clone(),
85                object: name.to_string(),
86                ..Default::default()
87            },
88        ));
89        if !is_http_error(404, &del_res) {
90            del_res?;
91        }
92        Ok(())
93    }
94
95    fn list<'a>(&'a mut self, prefix: &str) -> Box<dyn Iterator<Item = Result<ObjectInfo>> + 'a> {
96        validate_object_name(prefix);
97        Box::new(ObjectIterator {
98            service: self,
99            prefix: prefix.to_string(),
100            last_response: None,
101            next_index: 0,
102        })
103    }
104
105    fn compare_and_swap(
106        &mut self,
107        name: &str,
108        existing_value: Option<Vec<u8>>,
109        new_value: Vec<u8>,
110    ) -> Result<bool> {
111        validate_object_name(name);
112        let get_res = self
113            .rt
114            .block_on(self.client.get_object(&objects::get::GetObjectRequest {
115                bucket: self.bucket.clone(),
116                object: name.to_string(),
117                ..Default::default()
118            }));
119        // Determine the object's generation. See https://cloud.google.com/storage/docs/metadata#generation-number
120        let generation = if is_http_error(404, &get_res) {
121            // If a value was expected, that expectation has not been met.
122            if existing_value.is_some() {
123                return Ok(false);
124            }
125            // Generation 0 indicates that the object does not yet exist.
126            0
127        } else {
128            get_res?.generation
129        };
130
131        // If the file existed, then verify its contents.
132        if generation > 0 {
133            let data = self.rt.block_on(self.client.download_object(
134                &objects::get::GetObjectRequest {
135                    bucket: self.bucket.clone(),
136                    object: name.to_string(),
137                    // Fetch the same generation.
138                    generation: Some(generation),
139                    ..Default::default()
140                },
141                &objects::download::Range::default(),
142            ))?;
143            if Some(data) != existing_value {
144                return Ok(false);
145            }
146        }
147
148        // When testing, an object named "$pfx-racing-delete" is deleted between get_object and
149        // put_object.
150        #[cfg(test)]
151        if name.ends_with("-racing-delete") {
152            println!("deleting object {name}");
153            let del_res = self.rt.block_on(self.client.delete_object(
154                &objects::delete::DeleteObjectRequest {
155                    bucket: self.bucket.clone(),
156                    object: name.to_string(),
157                    ..Default::default()
158                },
159            ));
160            if !is_http_error(404, &del_res) {
161                del_res?;
162            }
163        }
164
165        // When testing, if the object is named "$pfx-racing-put" then the value "CHANGED" is
166        // written to it between get_object and put_object.
167        #[cfg(test)]
168        if name.ends_with("-racing-put") {
169            println!("changing object {name}");
170            let upload_type =
171                objects::upload::UploadType::Simple(objects::upload::Media::new(name.to_string()));
172            self.rt.block_on(self.client.upload_object(
173                &objects::upload::UploadObjectRequest {
174                    bucket: self.bucket.clone(),
175                    ..Default::default()
176                },
177                b"CHANGED".to_vec(),
178                &upload_type,
179            ))?;
180        }
181
182        // Finally, put the new value with a condition that the generation hasn't changed.
183        let upload_type =
184            objects::upload::UploadType::Simple(objects::upload::Media::new(name.to_string()));
185        let upload_res = self.rt.block_on(self.client.upload_object(
186            &objects::upload::UploadObjectRequest {
187                bucket: self.bucket.clone(),
188                if_generation_match: Some(generation),
189                ..Default::default()
190            },
191            new_value.to_vec(),
192            &upload_type,
193        ));
194        if is_http_error(412, &upload_res) {
195            // A 412 indicates the precondition was not satisfied: the given generation
196            // is no longer the latest.
197            Ok(false)
198        } else {
199            upload_res?;
200            Ok(true)
201        }
202    }
203}
204
/// An Iterator returning names of objects from `list_objects`.
///
/// This handles response pagination by fetching one page at a time.
struct ObjectIterator<'a> {
    /// Service whose client, runtime and bucket are used to fetch pages.
    service: &'a mut GcpService,
    /// Name prefix sent with every list request.
    prefix: String,
    /// Most recently fetched page, or `None` before the first fetch.
    last_response: Option<objects::list::ListObjectsResponse>,
    /// Index into `last_response`'s items of the next object to return.
    next_index: usize,
}
214
215impl ObjectIterator<'_> {
216    fn fetch_batch(&mut self) -> Result<()> {
217        let mut page_token = None;
218        if let Some(ref resp) = self.last_response {
219            page_token.clone_from(&resp.next_page_token);
220        }
221        self.last_response = Some(self.service.rt.block_on(self.service.client.list_objects(
222            &objects::list::ListObjectsRequest {
223                bucket: self.service.bucket.clone(),
224                prefix: Some(self.prefix.clone()),
225                page_token,
226                #[cfg(test)] // For testing, use a small page size.
227                max_results: Some(6),
228                ..Default::default()
229            },
230        ))?);
231        self.next_index = 0;
232        Ok(())
233    }
234}
235
236impl Iterator for ObjectIterator<'_> {
237    type Item = Result<ObjectInfo>;
238    fn next(&mut self) -> Option<Self::Item> {
239        // If the iterator is just starting, fetch the first response.
240        if self.last_response.is_none() {
241            if let Err(e) = self.fetch_batch() {
242                return Some(Err(e));
243            }
244        }
245        if let Some(ref result) = self.last_response {
246            if let Some(ref items) = result.items {
247                if self.next_index < items.len() {
248                    // Return a result from the existing response.
249                    let obj = &items[self.next_index];
250                    self.next_index += 1;
251                    // It's unclear when `time_created` would be None, so default to 0 in that case
252                    // or when the timestamp is not a valid u64 (before 1970).
253                    let creation = obj.time_created.map(|t| t.unix_timestamp()).unwrap_or(0);
254                    let creation: u64 = creation.try_into().unwrap_or(0);
255                    return Some(Ok(ObjectInfo {
256                        name: obj.name.clone(),
257                        creation,
258                    }));
259                } else if result.next_page_token.is_some() {
260                    // Fetch the next page and try again.
261                    if let Err(e) = self.fetch_batch() {
262                        return Some(Err(e));
263                    }
264                    return self.next();
265                }
266            }
267        }
268        None
269    }
270}
271
#[cfg(test)]
mod tests {
    use super::*;

    /// Make a service if both `GCP_TEST_BUCKET` and `GCP_TEST_CREDENTIAL_PATH` are set.
    ///
    /// Set up this bucket with a lifecycle policy to delete objects with age > 1 day. While
    /// passing tests should correctly clean up after themselves, failing tests may leave
    /// objects in the bucket.
    ///
    /// When either environment variable is not set, this returns `None` and the test does not
    /// run. Note that the Rust test runner will still show "ok" for the test, as there is no
    /// way to indicate anything else.
    fn make_service() -> Option<GcpService> {
        let bucket = std::env::var("GCP_TEST_BUCKET").ok()?;
        let credential_path = std::env::var("GCP_TEST_CREDENTIAL_PATH").ok()?;
        let service = GcpService::new(bucket, Some(credential_path)).unwrap();
        Some(service)
    }

    crate::server::cloud::test::service_tests!(make_service());
}