//! Google Cloud Storage provider (`floe_core/io/storage/providers/gcs.rs`).
1use std::path::{Path, PathBuf};
2
3use google_cloud_storage::client::{Client, ClientConfig};
4use google_cloud_storage::http::objects::delete::DeleteObjectRequest;
5use google_cloud_storage::http::objects::download::Range;
6use google_cloud_storage::http::objects::get::GetObjectRequest;
7use google_cloud_storage::http::objects::list::ListObjectsRequest;
8use google_cloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};
9use tokio::runtime::Runtime;
10
11use crate::errors::StorageError;
12use crate::io::storage::uri::{format_bucket_uri, parse_bucket_uri, BucketLocation};
13use crate::io::storage::{planner, ConditionalWrite, ObjectRef, StorageClient, StoredObject};
14use crate::FloeResult;
15
/// [`StorageClient`] implementation backed by Google Cloud Storage.
///
/// Each instance is bound to one bucket (used by `list` and `resolve_uri`) and owns a
/// current-thread Tokio runtime on which every async GCS call is driven with `block_on`.
pub struct GcsClient {
    /// Bucket used for listing and for resolving relative paths into `gs` URIs.
    bucket: String,
    /// Authenticated GCS client; cloned into each request future.
    client: Client,
    /// Runtime executing the client's futures. Fields drop in declaration order, so this is
    /// dropped after `client`. NOTE(review): presumably intentional; confirm before reordering.
    runtime: Runtime,
}
21
22impl GcsClient {
23    pub fn new(bucket: String) -> FloeResult<Self> {
24        let runtime = tokio::runtime::Builder::new_current_thread()
25            .enable_all()
26            .build()
27            .map_err(|err| Box::new(StorageError(format!("gcs runtime init failed: {err}"))))?;
28        let client = runtime.block_on(async {
29            let config = ClientConfig::default()
30                .with_auth()
31                .await
32                .map_err(|err| Box::new(StorageError(format!("gcs auth init failed: {err}"))))?;
33            Ok::<_, Box<dyn std::error::Error + Send + Sync>>(Client::new(config))
34        })?;
35        Ok(Self {
36            bucket,
37            client,
38            runtime,
39        })
40    }
41
42    fn bucket(&self) -> &str {
43        self.bucket.as_str()
44    }
45}
46
47impl StorageClient for GcsClient {
48    fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>> {
49        let bucket = self.bucket().to_string();
50        let prefix = prefix_or_path.trim_start_matches('/').to_string();
51        let client = self.client.clone();
52        self.runtime.block_on(async move {
53            let mut refs = Vec::new();
54            let mut page_token = None;
55            loop {
56                let request = ListObjectsRequest {
57                    bucket: bucket.clone(),
58                    prefix: if prefix.is_empty() {
59                        None
60                    } else {
61                        Some(prefix.clone())
62                    },
63                    page_token,
64                    ..Default::default()
65                };
66                let response = client.list_objects(&request).await.map_err(|err| {
67                    Box::new(StorageError(format!(
68                        "gcs list objects failed for bucket {}: {err}",
69                        bucket
70                    ))) as Box<dyn std::error::Error + Send + Sync>
71                })?;
72                if let Some(items) = response.items {
73                    for object in items {
74                        let key = object.name.clone();
75                        let uri = format_gcs_uri(&bucket, &key);
76                        refs.push(planner::object_ref(
77                            uri,
78                            key,
79                            object.updated.map(|value| value.to_string()),
80                            Some(object.size as u64),
81                        ));
82                    }
83                }
84                match response.next_page_token {
85                    Some(token) if !token.is_empty() => {
86                        page_token = Some(token);
87                    }
88                    _ => break,
89                }
90            }
91            Ok(planner::stable_sort_refs(refs))
92        })
93    }
94
95    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
96        let location = parse_gcs_uri(uri)?;
97        let bucket = location.bucket;
98        let key = location.key;
99        let dest = planner::temp_path_for_key(temp_dir, &key);
100        let dest_clone = dest.clone();
101        let client = self.client.clone();
102        self.runtime.block_on(async move {
103            let data = client
104                .download_object(
105                    &GetObjectRequest {
106                        bucket,
107                        object: key,
108                        ..Default::default()
109                    },
110                    &Range::default(),
111                )
112                .await
113                .map_err(|err| {
114                    Box::new(StorageError(format!("gcs download failed: {err}")))
115                        as Box<dyn std::error::Error + Send + Sync>
116                })?;
117            if let Some(parent) = dest_clone.parent() {
118                tokio::fs::create_dir_all(parent).await?;
119            }
120            tokio::fs::write(&dest_clone, data).await?;
121            Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
122        })?;
123        Ok(dest)
124    }
125
126    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
127        let location = parse_gcs_uri(uri)?;
128        let path = local_path.to_path_buf();
129        let client = self.client.clone();
130        self.runtime.block_on(async move {
131            let data = tokio::fs::read(path).await?;
132            let upload_type = UploadType::Simple(Media::new(location.key.clone()));
133            let request = UploadObjectRequest {
134                bucket: location.bucket,
135                ..Default::default()
136            };
137            client
138                .upload_object(&request, data, &upload_type)
139                .await
140                .map_err(|err| {
141                    Box::new(StorageError(format!("gcs upload failed: {err}")))
142                        as Box<dyn std::error::Error + Send + Sync>
143                })?;
144            Ok(())
145        })
146    }
147
148    fn resolve_uri(&self, path: &str) -> FloeResult<String> {
149        Ok(format_gcs_uri(self.bucket(), path.trim_start_matches('/')))
150    }
151
152    fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
153        planner::copy_via_temp(self, src_uri, dst_uri)
154    }
155
156    fn delete_object(&self, uri: &str) -> FloeResult<()> {
157        let location = parse_gcs_uri(uri)?;
158        let client = self.client.clone();
159        self.runtime.block_on(async move {
160            client
161                .delete_object(&DeleteObjectRequest {
162                    bucket: location.bucket,
163                    object: location.key,
164                    ..Default::default()
165                })
166                .await
167                .map_err(|err| {
168                    Box::new(StorageError(format!("gcs delete failed: {err}")))
169                        as Box<dyn std::error::Error + Send + Sync>
170                })?;
171            Ok(())
172        })
173    }
174
175    fn exists(&self, uri: &str) -> FloeResult<bool> {
176        let location = parse_gcs_uri(uri)?;
177        planner::exists_by_key(self, &location.key)
178    }
179
180    fn read_object(&self, uri: &str) -> FloeResult<Option<StoredObject>> {
181        let location = parse_gcs_uri(uri)?;
182        let client = self.client.clone();
183        self.runtime.block_on(async move {
184            let object = client
185                .get_object(&GetObjectRequest {
186                    bucket: location.bucket.clone(),
187                    object: location.key.clone(),
188                    ..Default::default()
189                })
190                .await;
191            let object = match object {
192                Ok(object) => object,
193                Err(err) if is_not_found(&err) => return Ok(None),
194                Err(err) => {
195                    return Err(
196                        Box::new(StorageError(format!("gcs get object failed: {err}")))
197                            as Box<dyn std::error::Error + Send + Sync>,
198                    )
199                }
200            };
201            let data = client
202                .download_object(
203                    &GetObjectRequest {
204                        bucket: location.bucket,
205                        object: location.key,
206                        ..Default::default()
207                    },
208                    &Range::default(),
209                )
210                .await
211                .map_err(|err| {
212                    Box::new(StorageError(format!("gcs download failed: {err}")))
213                        as Box<dyn std::error::Error + Send + Sync>
214                })?;
215            Ok(Some(StoredObject {
216                body: data,
217                version: object.generation.to_string(),
218            }))
219        })
220    }
221
222    fn write_object_conditional(
223        &self,
224        uri: &str,
225        expected_version: Option<&str>,
226        body: &[u8],
227    ) -> FloeResult<ConditionalWrite> {
228        let location = parse_gcs_uri(uri)?;
229        let client = self.client.clone();
230        let data = body.to_vec();
231        let generation = expected_version
232            .map(str::parse::<i64>)
233            .transpose()
234            .map_err(|err| Box::new(StorageError(format!("invalid gcs generation: {err}"))))?;
235        self.runtime.block_on(async move {
236            let upload_type = UploadType::Simple(Media::new(location.key.clone()));
237            let request = UploadObjectRequest {
238                bucket: location.bucket,
239                if_generation_match: Some(generation.unwrap_or(0)),
240                ..Default::default()
241            };
242            match client.upload_object(&request, data, &upload_type).await {
243                Ok(object) => Ok(ConditionalWrite::Written {
244                    version: object.generation.to_string(),
245                }),
246                Err(err) if is_precondition(&err) => Ok(ConditionalWrite::Conflict),
247                Err(err) => Err(Box::new(StorageError(format!("gcs upload failed: {err}")))
248                    as Box<dyn std::error::Error + Send + Sync>),
249            }
250        })
251    }
252
253    fn delete_object_conditional(
254        &self,
255        uri: &str,
256        expected_version: Option<&str>,
257    ) -> FloeResult<ConditionalWrite> {
258        let Some(expected_version) = expected_version else {
259            return Ok(ConditionalWrite::Written {
260                version: "deleted".to_string(),
261            });
262        };
263        let location = parse_gcs_uri(uri)?;
264        let client = self.client.clone();
265        let generation = expected_version
266            .parse::<i64>()
267            .map(Some)
268            .map_err(|err| Box::new(StorageError(format!("invalid gcs generation: {err}"))))?;
269        self.runtime.block_on(async move {
270            match client
271                .delete_object(&DeleteObjectRequest {
272                    bucket: location.bucket,
273                    object: location.key,
274                    if_generation_match: generation,
275                    ..Default::default()
276                })
277                .await
278            {
279                Ok(_) => Ok(ConditionalWrite::Written {
280                    version: "deleted".to_string(),
281                }),
282                Err(err) if is_precondition(&err) => Ok(ConditionalWrite::Conflict),
283                Err(err) if is_not_found(&err) => Ok(ConditionalWrite::Written {
284                    version: "deleted".to_string(),
285                }),
286                Err(err) => Err(Box::new(StorageError(format!("gcs delete failed: {err}")))
287                    as Box<dyn std::error::Error + Send + Sync>),
288            }
289        })
290    }
291}
292
/// Heuristically classifies a GCS error as "not found".
///
/// Only the error's `Display` text is inspected. It looks for:
/// - the HTTP status `404`;
/// - the message the GCS JSON API returns for a missing object (`No such object: ...`),
///   which need not contain the status code;
/// - `NotFound`/`notFound` in any letter case, e.g. the JSON API error reason.
///
/// NOTE(review): substring matching can misfire when the text embeds user data, such as an
/// object key containing `404` inside a request URL. Prefer the client's structured status
/// code if its error type exposes one.
fn is_not_found<E: std::fmt::Display>(err: &E) -> bool {
    let text = err.to_string();
    text.contains("404")
        || text.contains("No such object")
        || text.to_ascii_lowercase().contains("notfound")
}
297
/// Heuristically classifies a GCS error as a failed request precondition (HTTP 412).
///
/// Only the `Display` text is inspected. The needle `"condition"` also covers
/// `"Precondition"` and GCS's "pre-conditions you specified did not hold" message.
fn is_precondition<E: std::fmt::Display>(err: &E) -> bool {
    let message = err.to_string();
    ["412", "condition"]
        .iter()
        .any(|needle| message.contains(needle))
}
302
303pub fn parse_gcs_uri(uri: &str) -> FloeResult<GcsLocation> {
304    parse_bucket_uri("gs", uri)
305}
306
307pub fn format_gcs_uri(bucket: &str, key: &str) -> String {
308    format_bucket_uri("gs", bucket, key)
309}
310
311pub type GcsLocation = BucketLocation;