floe_core/io/storage/providers/
gcs.rs1use std::path::{Path, PathBuf};
2
3use google_cloud_storage::client::{Client, ClientConfig};
4use google_cloud_storage::http::objects::delete::DeleteObjectRequest;
5use google_cloud_storage::http::objects::download::Range;
6use google_cloud_storage::http::objects::get::GetObjectRequest;
7use google_cloud_storage::http::objects::list::ListObjectsRequest;
8use google_cloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};
9use tokio::runtime::Runtime;
10
11use crate::errors::StorageError;
12use crate::io::storage::uri::{format_bucket_uri, parse_bucket_uri, BucketLocation};
13use crate::io::storage::{planner, ConditionalWrite, ObjectRef, StorageClient, StoredObject};
14use crate::FloeResult;
15
16pub struct GcsClient {
17 bucket: String,
18 client: Client,
19 runtime: Runtime,
20}
21
22impl GcsClient {
23 pub fn new(bucket: String) -> FloeResult<Self> {
24 let runtime = tokio::runtime::Builder::new_current_thread()
25 .enable_all()
26 .build()
27 .map_err(|err| Box::new(StorageError(format!("gcs runtime init failed: {err}"))))?;
28 let client = runtime.block_on(async {
29 let config = ClientConfig::default()
30 .with_auth()
31 .await
32 .map_err(|err| Box::new(StorageError(format!("gcs auth init failed: {err}"))))?;
33 Ok::<_, Box<dyn std::error::Error + Send + Sync>>(Client::new(config))
34 })?;
35 Ok(Self {
36 bucket,
37 client,
38 runtime,
39 })
40 }
41
42 fn bucket(&self) -> &str {
43 self.bucket.as_str()
44 }
45}
46
47impl StorageClient for GcsClient {
48 fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>> {
49 let bucket = self.bucket().to_string();
50 let prefix = prefix_or_path.trim_start_matches('/').to_string();
51 let client = self.client.clone();
52 self.runtime.block_on(async move {
53 let mut refs = Vec::new();
54 let mut page_token = None;
55 loop {
56 let request = ListObjectsRequest {
57 bucket: bucket.clone(),
58 prefix: if prefix.is_empty() {
59 None
60 } else {
61 Some(prefix.clone())
62 },
63 page_token,
64 ..Default::default()
65 };
66 let response = client.list_objects(&request).await.map_err(|err| {
67 Box::new(StorageError(format!(
68 "gcs list objects failed for bucket {}: {err}",
69 bucket
70 ))) as Box<dyn std::error::Error + Send + Sync>
71 })?;
72 if let Some(items) = response.items {
73 for object in items {
74 let key = object.name.clone();
75 let uri = format_gcs_uri(&bucket, &key);
76 refs.push(planner::object_ref(
77 uri,
78 key,
79 object.updated.map(|value| value.to_string()),
80 Some(object.size as u64),
81 ));
82 }
83 }
84 match response.next_page_token {
85 Some(token) if !token.is_empty() => {
86 page_token = Some(token);
87 }
88 _ => break,
89 }
90 }
91 Ok(planner::stable_sort_refs(refs))
92 })
93 }
94
95 fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
96 let location = parse_gcs_uri(uri)?;
97 let bucket = location.bucket;
98 let key = location.key;
99 let dest = planner::temp_path_for_key(temp_dir, &key);
100 let dest_clone = dest.clone();
101 let client = self.client.clone();
102 self.runtime.block_on(async move {
103 let data = client
104 .download_object(
105 &GetObjectRequest {
106 bucket,
107 object: key,
108 ..Default::default()
109 },
110 &Range::default(),
111 )
112 .await
113 .map_err(|err| {
114 Box::new(StorageError(format!("gcs download failed: {err}")))
115 as Box<dyn std::error::Error + Send + Sync>
116 })?;
117 if let Some(parent) = dest_clone.parent() {
118 tokio::fs::create_dir_all(parent).await?;
119 }
120 tokio::fs::write(&dest_clone, data).await?;
121 Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
122 })?;
123 Ok(dest)
124 }
125
126 fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
127 let location = parse_gcs_uri(uri)?;
128 let path = local_path.to_path_buf();
129 let client = self.client.clone();
130 self.runtime.block_on(async move {
131 let data = tokio::fs::read(path).await?;
132 let upload_type = UploadType::Simple(Media::new(location.key.clone()));
133 let request = UploadObjectRequest {
134 bucket: location.bucket,
135 ..Default::default()
136 };
137 client
138 .upload_object(&request, data, &upload_type)
139 .await
140 .map_err(|err| {
141 Box::new(StorageError(format!("gcs upload failed: {err}")))
142 as Box<dyn std::error::Error + Send + Sync>
143 })?;
144 Ok(())
145 })
146 }
147
148 fn resolve_uri(&self, path: &str) -> FloeResult<String> {
149 Ok(format_gcs_uri(self.bucket(), path.trim_start_matches('/')))
150 }
151
152 fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
153 planner::copy_via_temp(self, src_uri, dst_uri)
154 }
155
156 fn delete_object(&self, uri: &str) -> FloeResult<()> {
157 let location = parse_gcs_uri(uri)?;
158 let client = self.client.clone();
159 self.runtime.block_on(async move {
160 client
161 .delete_object(&DeleteObjectRequest {
162 bucket: location.bucket,
163 object: location.key,
164 ..Default::default()
165 })
166 .await
167 .map_err(|err| {
168 Box::new(StorageError(format!("gcs delete failed: {err}")))
169 as Box<dyn std::error::Error + Send + Sync>
170 })?;
171 Ok(())
172 })
173 }
174
175 fn exists(&self, uri: &str) -> FloeResult<bool> {
176 let location = parse_gcs_uri(uri)?;
177 planner::exists_by_key(self, &location.key)
178 }
179
180 fn read_object(&self, uri: &str) -> FloeResult<Option<StoredObject>> {
181 let location = parse_gcs_uri(uri)?;
182 let client = self.client.clone();
183 self.runtime.block_on(async move {
184 let object = client
185 .get_object(&GetObjectRequest {
186 bucket: location.bucket.clone(),
187 object: location.key.clone(),
188 ..Default::default()
189 })
190 .await;
191 let object = match object {
192 Ok(object) => object,
193 Err(err) if is_not_found(&err) => return Ok(None),
194 Err(err) => {
195 return Err(
196 Box::new(StorageError(format!("gcs get object failed: {err}")))
197 as Box<dyn std::error::Error + Send + Sync>,
198 )
199 }
200 };
201 let data = client
202 .download_object(
203 &GetObjectRequest {
204 bucket: location.bucket,
205 object: location.key,
206 ..Default::default()
207 },
208 &Range::default(),
209 )
210 .await
211 .map_err(|err| {
212 Box::new(StorageError(format!("gcs download failed: {err}")))
213 as Box<dyn std::error::Error + Send + Sync>
214 })?;
215 Ok(Some(StoredObject {
216 body: data,
217 version: object.generation.to_string(),
218 }))
219 })
220 }
221
222 fn write_object_conditional(
223 &self,
224 uri: &str,
225 expected_version: Option<&str>,
226 body: &[u8],
227 ) -> FloeResult<ConditionalWrite> {
228 let location = parse_gcs_uri(uri)?;
229 let client = self.client.clone();
230 let data = body.to_vec();
231 let generation = expected_version
232 .map(str::parse::<i64>)
233 .transpose()
234 .map_err(|err| Box::new(StorageError(format!("invalid gcs generation: {err}"))))?;
235 self.runtime.block_on(async move {
236 let upload_type = UploadType::Simple(Media::new(location.key.clone()));
237 let request = UploadObjectRequest {
238 bucket: location.bucket,
239 if_generation_match: Some(generation.unwrap_or(0)),
240 ..Default::default()
241 };
242 match client.upload_object(&request, data, &upload_type).await {
243 Ok(object) => Ok(ConditionalWrite::Written {
244 version: object.generation.to_string(),
245 }),
246 Err(err) if is_precondition(&err) => Ok(ConditionalWrite::Conflict),
247 Err(err) => Err(Box::new(StorageError(format!("gcs upload failed: {err}")))
248 as Box<dyn std::error::Error + Send + Sync>),
249 }
250 })
251 }
252
253 fn delete_object_conditional(
254 &self,
255 uri: &str,
256 expected_version: Option<&str>,
257 ) -> FloeResult<ConditionalWrite> {
258 let Some(expected_version) = expected_version else {
259 return Ok(ConditionalWrite::Written {
260 version: "deleted".to_string(),
261 });
262 };
263 let location = parse_gcs_uri(uri)?;
264 let client = self.client.clone();
265 let generation = expected_version
266 .parse::<i64>()
267 .map(Some)
268 .map_err(|err| Box::new(StorageError(format!("invalid gcs generation: {err}"))))?;
269 self.runtime.block_on(async move {
270 match client
271 .delete_object(&DeleteObjectRequest {
272 bucket: location.bucket,
273 object: location.key,
274 if_generation_match: generation,
275 ..Default::default()
276 })
277 .await
278 {
279 Ok(_) => Ok(ConditionalWrite::Written {
280 version: "deleted".to_string(),
281 }),
282 Err(err) if is_precondition(&err) => Ok(ConditionalWrite::Conflict),
283 Err(err) if is_not_found(&err) => Ok(ConditionalWrite::Written {
284 version: "deleted".to_string(),
285 }),
286 Err(err) => Err(Box::new(StorageError(format!("gcs delete failed: {err}")))
287 as Box<dyn std::error::Error + Send + Sync>),
288 }
289 })
290 }
291}
292
293fn is_not_found<E: std::fmt::Display>(err: &E) -> bool {
294 let text = err.to_string();
295 text.contains("404") || text.contains("NotFound")
296}
297
298fn is_precondition<E: std::fmt::Display>(err: &E) -> bool {
299 let text = err.to_string();
300 text.contains("412") || text.contains("condition") || text.contains("Precondition")
301}
302
303pub fn parse_gcs_uri(uri: &str) -> FloeResult<GcsLocation> {
304 parse_bucket_uri("gs", uri)
305}
306
307pub fn format_gcs_uri(bucket: &str, key: &str) -> String {
308 format_bucket_uri("gs", bucket, key)
309}
310
311pub type GcsLocation = BucketLocation;