Skip to main content

floe_core/io/storage/
gcs.rs

1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3use std::path::{Path, PathBuf};
4
5use google_cloud_storage::client::{Client, ClientConfig};
6use google_cloud_storage::http::objects::delete::DeleteObjectRequest;
7use google_cloud_storage::http::objects::download::Range;
8use google_cloud_storage::http::objects::get::GetObjectRequest;
9use google_cloud_storage::http::objects::list::ListObjectsRequest;
10use google_cloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};
11use tokio::runtime::Runtime;
12
13use crate::errors::{RunError, StorageError};
14use crate::{ConfigError, FloeResult};
15
16use super::{planner, ObjectRef, StorageClient};
17
18pub struct GcsClient {
19    bucket: String,
20    client: Client,
21    runtime: Runtime,
22}
23
24impl GcsClient {
25    pub fn new(bucket: String) -> FloeResult<Self> {
26        let runtime = tokio::runtime::Builder::new_current_thread()
27            .enable_all()
28            .build()
29            .map_err(|err| Box::new(StorageError(format!("gcs runtime init failed: {err}"))))?;
30        let client = runtime.block_on(async {
31            let config = ClientConfig::default()
32                .with_auth()
33                .await
34                .map_err(|err| Box::new(StorageError(format!("gcs auth init failed: {err}"))))?;
35            Ok::<_, Box<dyn std::error::Error + Send + Sync>>(Client::new(config))
36        })?;
37        Ok(Self {
38            bucket,
39            client,
40            runtime,
41        })
42    }
43
44    fn bucket(&self) -> &str {
45        self.bucket.as_str()
46    }
47}
48
49impl StorageClient for GcsClient {
50    fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>> {
51        let bucket = self.bucket().to_string();
52        let prefix = prefix_or_path.trim_start_matches('/').to_string();
53        let client = self.client.clone();
54        self.runtime.block_on(async move {
55            let mut refs = Vec::new();
56            let mut page_token = None;
57            loop {
58                let request = ListObjectsRequest {
59                    bucket: bucket.clone(),
60                    prefix: if prefix.is_empty() {
61                        None
62                    } else {
63                        Some(prefix.clone())
64                    },
65                    page_token,
66                    ..Default::default()
67                };
68                let response = client.list_objects(&request).await.map_err(|err| {
69                    Box::new(StorageError(format!(
70                        "gcs list objects failed for bucket {}: {err}",
71                        bucket
72                    ))) as Box<dyn std::error::Error + Send + Sync>
73                })?;
74                if let Some(items) = response.items {
75                    for object in items {
76                        let key = object.name.clone();
77                        let uri = format_gcs_uri(&bucket, &key);
78                        refs.push(ObjectRef {
79                            uri,
80                            key,
81                            last_modified: object.updated.map(|value| value.to_string()),
82                            size: Some(object.size as u64),
83                        });
84                    }
85                }
86                match response.next_page_token {
87                    Some(token) if !token.is_empty() => {
88                        page_token = Some(token);
89                    }
90                    _ => break,
91                }
92            }
93            Ok(planner::stable_sort_refs(refs))
94        })
95    }
96
97    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
98        let location = parse_gcs_uri(uri)?;
99        let bucket = location.bucket;
100        let key = location.key;
101        let dest = temp_path_for_key(temp_dir, &key);
102        let dest_clone = dest.clone();
103        let client = self.client.clone();
104        self.runtime.block_on(async move {
105            let data = client
106                .download_object(
107                    &GetObjectRequest {
108                        bucket,
109                        object: key,
110                        ..Default::default()
111                    },
112                    &Range::default(),
113                )
114                .await
115                .map_err(|err| {
116                    Box::new(StorageError(format!("gcs download failed: {err}")))
117                        as Box<dyn std::error::Error + Send + Sync>
118                })?;
119            if let Some(parent) = dest_clone.parent() {
120                tokio::fs::create_dir_all(parent).await?;
121            }
122            tokio::fs::write(&dest_clone, data).await?;
123            Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
124        })?;
125        Ok(dest)
126    }
127
128    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
129        let location = parse_gcs_uri(uri)?;
130        let path = local_path.to_path_buf();
131        let client = self.client.clone();
132        self.runtime.block_on(async move {
133            let data = tokio::fs::read(path).await?;
134            let upload_type = UploadType::Simple(Media::new(location.key.clone()));
135            let request = UploadObjectRequest {
136                bucket: location.bucket,
137                ..Default::default()
138            };
139            client
140                .upload_object(&request, data, &upload_type)
141                .await
142                .map_err(|err| {
143                    Box::new(StorageError(format!("gcs upload failed: {err}")))
144                        as Box<dyn std::error::Error + Send + Sync>
145                })?;
146            Ok(())
147        })
148    }
149
150    fn resolve_uri(&self, path: &str) -> FloeResult<String> {
151        Ok(format_gcs_uri(self.bucket(), path.trim_start_matches('/')))
152    }
153
154    fn delete(&self, uri: &str) -> FloeResult<()> {
155        let location = parse_gcs_uri(uri)?;
156        let client = self.client.clone();
157        self.runtime.block_on(async move {
158            client
159                .delete_object(&DeleteObjectRequest {
160                    bucket: location.bucket,
161                    object: location.key,
162                    ..Default::default()
163                })
164                .await
165                .map_err(|err| {
166                    Box::new(StorageError(format!("gcs delete failed: {err}")))
167                        as Box<dyn std::error::Error + Send + Sync>
168                })?;
169            Ok(())
170        })
171    }
172}
173
/// A `gs://` URI split into its bucket and object key.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GcsLocation {
    /// Bucket name: the text between `gs://` and the first `/`.
    pub bucket: String,
    /// Object key: everything after the first `/`; empty when the URI has none.
    pub key: String,
}
179
180pub fn parse_gcs_uri(uri: &str) -> FloeResult<GcsLocation> {
181    let stripped = uri.strip_prefix("gs://").ok_or_else(|| {
182        Box::new(ConfigError(format!("expected gcs uri, got {}", uri)))
183            as Box<dyn std::error::Error + Send + Sync>
184    })?;
185    let mut parts = stripped.splitn(2, '/');
186    let bucket = parts.next().unwrap_or("").to_string();
187    if bucket.is_empty() {
188        return Err(Box::new(ConfigError(format!(
189            "missing bucket in gcs uri: {}",
190            uri
191        ))));
192    }
193    let key = parts.next().unwrap_or("").to_string();
194    Ok(GcsLocation { bucket, key })
195}
196
/// Renders a `gs://` URI for `bucket` and `key`.
///
/// An empty `key` produces `gs://bucket` with no trailing slash; otherwise the
/// result is `gs://bucket/key`.
pub fn format_gcs_uri(bucket: &str, key: &str) -> String {
    const SCHEME: &str = "gs://";
    let mut uri = String::with_capacity(SCHEME.len() + bucket.len() + 1 + key.len());
    uri.push_str(SCHEME);
    uri.push_str(bucket);
    if !key.is_empty() {
        uri.push('/');
        uri.push_str(key);
    }
    uri
}
204
205pub fn temp_path_for_key(temp_dir: &Path, key: &str) -> PathBuf {
206    let mut hasher = DefaultHasher::new();
207    key.hash(&mut hasher);
208    let hash = hasher.finish();
209    let name = super::s3::file_name_from_key(key).unwrap_or_else(|| "object".to_string());
210    let sanitized = sanitize_filename(&name);
211    temp_dir.join(format!("{hash:016x}_{sanitized}"))
212}
213
/// Replaces every character of `name` that is not an ASCII letter, ASCII digit,
/// `.`, `-` or `_` with `_`.
///
/// The result has the same number of characters as the input.
fn sanitize_filename(name: &str) -> String {
    let mut cleaned = String::with_capacity(name.len());
    for ch in name.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '.' || ch == '-' || ch == '_';
        cleaned.push(if allowed { ch } else { '_' });
    }
    cleaned
}
225
226pub fn build_input_files(
227    client: &dyn StorageClient,
228    bucket: &str,
229    prefix: &str,
230    adapter: &dyn crate::io::format::InputAdapter,
231    temp_dir: &Path,
232    entity: &crate::config::EntityConfig,
233    storage: &str,
234) -> FloeResult<Vec<crate::io::format::InputFile>> {
235    let suffixes = adapter.suffixes()?;
236    let list_refs = client.list(prefix)?;
237    let filtered = planner::filter_by_suffixes(list_refs, &suffixes);
238    let filtered = planner::stable_sort_refs(filtered);
239    if filtered.is_empty() {
240        return Err(Box::new(RunError(format!(
241            "entity.name={} source.storage={} no input objects matched (bucket={}, prefix={}, suffixes={})",
242            entity.name,
243            storage,
244            bucket,
245            prefix,
246            suffixes.join(",")
247        ))));
248    }
249    let mut inputs = Vec::with_capacity(filtered.len());
250    for object in filtered {
251        let local_path = client.download_to_temp(&object.uri, temp_dir)?;
252        let source_name =
253            super::s3::file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
254        let source_stem =
255            super::s3::file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
256        let source_uri = object.uri;
257        inputs.push(crate::io::format::InputFile {
258            source_uri,
259            source_local_path: local_path,
260            source_name,
261            source_stem,
262        });
263    }
264    Ok(inputs)
265}