Skip to main content

floe_core/io/storage/
gcs.rs

1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3use std::path::{Path, PathBuf};
4
5use google_cloud_storage::client::{Client, ClientConfig};
6use google_cloud_storage::http::objects::delete::DeleteObjectRequest;
7use google_cloud_storage::http::objects::download::Range;
8use google_cloud_storage::http::objects::get::GetObjectRequest;
9use google_cloud_storage::http::objects::list::ListObjectsRequest;
10use google_cloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};
11use tokio::runtime::Runtime;
12
13use crate::errors::{RunError, StorageError};
14use crate::{ConfigError, FloeResult};
15
16use super::{planner, ObjectRef, StorageClient};
17
18pub struct GcsClient {
19    bucket: String,
20    client: Client,
21    runtime: Runtime,
22}
23
24impl GcsClient {
25    pub fn new(bucket: String) -> FloeResult<Self> {
26        let runtime = tokio::runtime::Builder::new_current_thread()
27            .enable_all()
28            .build()
29            .map_err(|err| Box::new(StorageError(format!("gcs runtime init failed: {err}"))))?;
30        let client = runtime.block_on(async {
31            let config = ClientConfig::default()
32                .with_auth()
33                .await
34                .map_err(|err| Box::new(StorageError(format!("gcs auth init failed: {err}"))))?;
35            Ok::<_, Box<dyn std::error::Error + Send + Sync>>(Client::new(config))
36        })?;
37        Ok(Self {
38            bucket,
39            client,
40            runtime,
41        })
42    }
43
44    fn bucket(&self) -> &str {
45        self.bucket.as_str()
46    }
47}
48
49impl StorageClient for GcsClient {
50    fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>> {
51        let bucket = self.bucket().to_string();
52        let prefix = prefix_or_path.trim_start_matches('/').to_string();
53        let client = self.client.clone();
54        self.runtime.block_on(async move {
55            let mut refs = Vec::new();
56            let mut page_token = None;
57            loop {
58                let request = ListObjectsRequest {
59                    bucket: bucket.clone(),
60                    prefix: if prefix.is_empty() {
61                        None
62                    } else {
63                        Some(prefix.clone())
64                    },
65                    page_token,
66                    ..Default::default()
67                };
68                let response = client.list_objects(&request).await.map_err(|err| {
69                    Box::new(StorageError(format!(
70                        "gcs list objects failed for bucket {}: {err}",
71                        bucket
72                    ))) as Box<dyn std::error::Error + Send + Sync>
73                })?;
74                if let Some(items) = response.items {
75                    for object in items {
76                        let key = object.name.clone();
77                        let uri = format_gcs_uri(&bucket, &key);
78                        refs.push(ObjectRef {
79                            uri,
80                            key,
81                            last_modified: object.updated.map(|value| value.to_string()),
82                            size: Some(object.size as u64),
83                        });
84                    }
85                }
86                match response.next_page_token {
87                    Some(token) if !token.is_empty() => {
88                        page_token = Some(token);
89                    }
90                    _ => break,
91                }
92            }
93            Ok(planner::stable_sort_refs(refs))
94        })
95    }
96
97    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
98        let location = parse_gcs_uri(uri)?;
99        let bucket = location.bucket;
100        let key = location.key;
101        let dest = temp_path_for_key(temp_dir, &key);
102        let dest_clone = dest.clone();
103        let client = self.client.clone();
104        self.runtime.block_on(async move {
105            let data = client
106                .download_object(
107                    &GetObjectRequest {
108                        bucket,
109                        object: key,
110                        ..Default::default()
111                    },
112                    &Range::default(),
113                )
114                .await
115                .map_err(|err| {
116                    Box::new(StorageError(format!("gcs download failed: {err}")))
117                        as Box<dyn std::error::Error + Send + Sync>
118                })?;
119            if let Some(parent) = dest_clone.parent() {
120                tokio::fs::create_dir_all(parent).await?;
121            }
122            tokio::fs::write(&dest_clone, data).await?;
123            Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
124        })?;
125        Ok(dest)
126    }
127
128    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
129        let location = parse_gcs_uri(uri)?;
130        let path = local_path.to_path_buf();
131        let client = self.client.clone();
132        self.runtime.block_on(async move {
133            let data = tokio::fs::read(path).await?;
134            let upload_type = UploadType::Simple(Media::new(location.key.clone()));
135            let request = UploadObjectRequest {
136                bucket: location.bucket,
137                ..Default::default()
138            };
139            client
140                .upload_object(&request, data, &upload_type)
141                .await
142                .map_err(|err| {
143                    Box::new(StorageError(format!("gcs upload failed: {err}")))
144                        as Box<dyn std::error::Error + Send + Sync>
145                })?;
146            Ok(())
147        })
148    }
149
150    fn resolve_uri(&self, path: &str) -> FloeResult<String> {
151        Ok(format_gcs_uri(self.bucket(), path.trim_start_matches('/')))
152    }
153
154    fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
155        let temp_dir = tempfile::TempDir::new().map_err(|err| {
156            Box::new(StorageError(format!("gcs tempdir failed: {err}")))
157                as Box<dyn std::error::Error + Send + Sync>
158        })?;
159        let temp_path = self.download_to_temp(src_uri, temp_dir.path())?;
160        self.upload_from_path(&temp_path, dst_uri)?;
161        Ok(())
162    }
163
164    fn delete_object(&self, uri: &str) -> FloeResult<()> {
165        let location = parse_gcs_uri(uri)?;
166        let client = self.client.clone();
167        self.runtime.block_on(async move {
168            client
169                .delete_object(&DeleteObjectRequest {
170                    bucket: location.bucket,
171                    object: location.key,
172                    ..Default::default()
173                })
174                .await
175                .map_err(|err| {
176                    Box::new(StorageError(format!("gcs delete failed: {err}")))
177                        as Box<dyn std::error::Error + Send + Sync>
178                })?;
179            Ok(())
180        })
181    }
182
183    fn exists(&self, uri: &str) -> FloeResult<bool> {
184        let location = parse_gcs_uri(uri)?;
185        if location.key.is_empty() {
186            return Ok(false);
187        }
188        let refs = self.list(&location.key)?;
189        Ok(refs.iter().any(|object| object.key == location.key))
190    }
191}
192
193#[derive(Debug, Clone, PartialEq, Eq)]
194pub struct GcsLocation {
195    pub bucket: String,
196    pub key: String,
197}
198
199pub fn parse_gcs_uri(uri: &str) -> FloeResult<GcsLocation> {
200    let stripped = uri.strip_prefix("gs://").ok_or_else(|| {
201        Box::new(ConfigError(format!("expected gcs uri, got {}", uri)))
202            as Box<dyn std::error::Error + Send + Sync>
203    })?;
204    let mut parts = stripped.splitn(2, '/');
205    let bucket = parts.next().unwrap_or("").to_string();
206    if bucket.is_empty() {
207        return Err(Box::new(ConfigError(format!(
208            "missing bucket in gcs uri: {}",
209            uri
210        ))));
211    }
212    let key = parts.next().unwrap_or("").to_string();
213    Ok(GcsLocation { bucket, key })
214}
215
/// Renders `bucket` and `key` as a `gs://` URI; an empty key yields the bare bucket URI.
pub fn format_gcs_uri(bucket: &str, key: &str) -> String {
    match key {
        "" => format!("gs://{bucket}"),
        _ => format!("gs://{bucket}/{key}"),
    }
}

224pub fn temp_path_for_key(temp_dir: &Path, key: &str) -> PathBuf {
225    let mut hasher = DefaultHasher::new();
226    key.hash(&mut hasher);
227    let hash = hasher.finish();
228    let name = super::s3::file_name_from_key(key).unwrap_or_else(|| "object".to_string());
229    let sanitized = sanitize_filename(&name);
230    temp_dir.join(format!("{hash:016x}_{sanitized}"))
231}
232
/// Replaces every char outside `[A-Za-z0-9._-]` with `_`.
///
/// Each non-ASCII char maps to exactly one `_`.
fn sanitize_filename(name: &str) -> String {
    let mut out = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '.' || ch == '-' || ch == '_';
        out.push(if keep { ch } else { '_' });
    }
    out
}

245pub fn build_input_files(
246    client: &dyn StorageClient,
247    bucket: &str,
248    prefix: &str,
249    adapter: &dyn crate::io::format::InputAdapter,
250    temp_dir: &Path,
251    entity: &crate::config::EntityConfig,
252    storage: &str,
253) -> FloeResult<Vec<crate::io::format::InputFile>> {
254    let suffixes = adapter.suffixes()?;
255    let list_refs = client.list(prefix)?;
256    let filtered = planner::filter_by_suffixes(list_refs, &suffixes);
257    let filtered = planner::stable_sort_refs(filtered);
258    if filtered.is_empty() {
259        return Err(Box::new(RunError(format!(
260            "entity.name={} source.storage={} no input objects matched (bucket={}, prefix={}, suffixes={})",
261            entity.name,
262            storage,
263            bucket,
264            prefix,
265            suffixes.join(",")
266        ))));
267    }
268    let mut inputs = Vec::with_capacity(filtered.len());
269    for object in filtered {
270        let local_path = client.download_to_temp(&object.uri, temp_dir)?;
271        let source_name =
272            super::s3::file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
273        let source_stem =
274            super::s3::file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
275        let source_uri = object.uri;
276        inputs.push(crate::io::format::InputFile {
277            source_uri,
278            source_local_path: local_path,
279            source_name,
280            source_stem,
281        });
282    }
283    Ok(inputs)
284}