floe_core/io/storage/
gcs.rs1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3use std::path::{Path, PathBuf};
4
5use google_cloud_storage::client::{Client, ClientConfig};
6use google_cloud_storage::http::objects::delete::DeleteObjectRequest;
7use google_cloud_storage::http::objects::download::Range;
8use google_cloud_storage::http::objects::get::GetObjectRequest;
9use google_cloud_storage::http::objects::list::ListObjectsRequest;
10use google_cloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};
11use tokio::runtime::Runtime;
12
13use crate::errors::{RunError, StorageError};
14use crate::{ConfigError, FloeResult};
15
16use super::{planner, ObjectRef, StorageClient};
17
18pub struct GcsClient {
19 bucket: String,
20 client: Client,
21 runtime: Runtime,
22}
23
24impl GcsClient {
25 pub fn new(bucket: String) -> FloeResult<Self> {
26 let runtime = tokio::runtime::Builder::new_current_thread()
27 .enable_all()
28 .build()
29 .map_err(|err| Box::new(StorageError(format!("gcs runtime init failed: {err}"))))?;
30 let client = runtime.block_on(async {
31 let config = ClientConfig::default()
32 .with_auth()
33 .await
34 .map_err(|err| Box::new(StorageError(format!("gcs auth init failed: {err}"))))?;
35 Ok::<_, Box<dyn std::error::Error + Send + Sync>>(Client::new(config))
36 })?;
37 Ok(Self {
38 bucket,
39 client,
40 runtime,
41 })
42 }
43
44 fn bucket(&self) -> &str {
45 self.bucket.as_str()
46 }
47}
48
49impl StorageClient for GcsClient {
50 fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>> {
51 let bucket = self.bucket().to_string();
52 let prefix = prefix_or_path.trim_start_matches('/').to_string();
53 let client = self.client.clone();
54 self.runtime.block_on(async move {
55 let mut refs = Vec::new();
56 let mut page_token = None;
57 loop {
58 let request = ListObjectsRequest {
59 bucket: bucket.clone(),
60 prefix: if prefix.is_empty() {
61 None
62 } else {
63 Some(prefix.clone())
64 },
65 page_token,
66 ..Default::default()
67 };
68 let response = client.list_objects(&request).await.map_err(|err| {
69 Box::new(StorageError(format!(
70 "gcs list objects failed for bucket {}: {err}",
71 bucket
72 ))) as Box<dyn std::error::Error + Send + Sync>
73 })?;
74 if let Some(items) = response.items {
75 for object in items {
76 let key = object.name.clone();
77 let uri = format_gcs_uri(&bucket, &key);
78 refs.push(ObjectRef {
79 uri,
80 key,
81 last_modified: object.updated.map(|value| value.to_string()),
82 size: Some(object.size as u64),
83 });
84 }
85 }
86 match response.next_page_token {
87 Some(token) if !token.is_empty() => {
88 page_token = Some(token);
89 }
90 _ => break,
91 }
92 }
93 Ok(planner::stable_sort_refs(refs))
94 })
95 }
96
97 fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
98 let location = parse_gcs_uri(uri)?;
99 let bucket = location.bucket;
100 let key = location.key;
101 let dest = temp_path_for_key(temp_dir, &key);
102 let dest_clone = dest.clone();
103 let client = self.client.clone();
104 self.runtime.block_on(async move {
105 let data = client
106 .download_object(
107 &GetObjectRequest {
108 bucket,
109 object: key,
110 ..Default::default()
111 },
112 &Range::default(),
113 )
114 .await
115 .map_err(|err| {
116 Box::new(StorageError(format!("gcs download failed: {err}")))
117 as Box<dyn std::error::Error + Send + Sync>
118 })?;
119 if let Some(parent) = dest_clone.parent() {
120 tokio::fs::create_dir_all(parent).await?;
121 }
122 tokio::fs::write(&dest_clone, data).await?;
123 Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
124 })?;
125 Ok(dest)
126 }
127
128 fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
129 let location = parse_gcs_uri(uri)?;
130 let path = local_path.to_path_buf();
131 let client = self.client.clone();
132 self.runtime.block_on(async move {
133 let data = tokio::fs::read(path).await?;
134 let upload_type = UploadType::Simple(Media::new(location.key.clone()));
135 let request = UploadObjectRequest {
136 bucket: location.bucket,
137 ..Default::default()
138 };
139 client
140 .upload_object(&request, data, &upload_type)
141 .await
142 .map_err(|err| {
143 Box::new(StorageError(format!("gcs upload failed: {err}")))
144 as Box<dyn std::error::Error + Send + Sync>
145 })?;
146 Ok(())
147 })
148 }
149
150 fn resolve_uri(&self, path: &str) -> FloeResult<String> {
151 Ok(format_gcs_uri(self.bucket(), path.trim_start_matches('/')))
152 }
153
154 fn delete(&self, uri: &str) -> FloeResult<()> {
155 let location = parse_gcs_uri(uri)?;
156 let client = self.client.clone();
157 self.runtime.block_on(async move {
158 client
159 .delete_object(&DeleteObjectRequest {
160 bucket: location.bucket,
161 object: location.key,
162 ..Default::default()
163 })
164 .await
165 .map_err(|err| {
166 Box::new(StorageError(format!("gcs delete failed: {err}")))
167 as Box<dyn std::error::Error + Send + Sync>
168 })?;
169 Ok(())
170 })
171 }
172}
173
174#[derive(Debug, Clone, PartialEq, Eq)]
175pub struct GcsLocation {
176 pub bucket: String,
177 pub key: String,
178}
179
180pub fn parse_gcs_uri(uri: &str) -> FloeResult<GcsLocation> {
181 let stripped = uri.strip_prefix("gs://").ok_or_else(|| {
182 Box::new(ConfigError(format!("expected gcs uri, got {}", uri)))
183 as Box<dyn std::error::Error + Send + Sync>
184 })?;
185 let mut parts = stripped.splitn(2, '/');
186 let bucket = parts.next().unwrap_or("").to_string();
187 if bucket.is_empty() {
188 return Err(Box::new(ConfigError(format!(
189 "missing bucket in gcs uri: {}",
190 uri
191 ))));
192 }
193 let key = parts.next().unwrap_or("").to_string();
194 Ok(GcsLocation { bucket, key })
195}
196
/// Builds a `gs://` URI from a bucket name and an object key.
///
/// An empty `key` yields the bare bucket URI (`gs://bucket`, no trailing slash).
pub fn format_gcs_uri(bucket: &str, key: &str) -> String {
    match key {
        "" => format!("gs://{}", bucket),
        object => format!("gs://{}/{}", bucket, object),
    }
}
204
205pub fn temp_path_for_key(temp_dir: &Path, key: &str) -> PathBuf {
206 let mut hasher = DefaultHasher::new();
207 key.hash(&mut hasher);
208 let hash = hasher.finish();
209 let name = super::s3::file_name_from_key(key).unwrap_or_else(|| "object".to_string());
210 let sanitized = sanitize_filename(&name);
211 temp_dir.join(format!("{hash:016x}_{sanitized}"))
212}
213
/// Replaces every character of `name` with `_`, except ASCII letters, ASCII digits,
/// `.`, `-` and `_`.
///
/// Replacement is one-for-one per `char`, so the result has as many characters as
/// the input.
fn sanitize_filename(name: &str) -> String {
    let allowed = |c: char| c.is_ascii_alphanumeric() || c == '.' || c == '-' || c == '_';
    let mut cleaned = String::with_capacity(name.len());
    for c in name.chars() {
        cleaned.push(if allowed(c) { c } else { '_' });
    }
    cleaned
}
225
226pub fn build_input_files(
227 client: &dyn StorageClient,
228 bucket: &str,
229 prefix: &str,
230 adapter: &dyn crate::io::format::InputAdapter,
231 temp_dir: &Path,
232 entity: &crate::config::EntityConfig,
233 storage: &str,
234) -> FloeResult<Vec<crate::io::format::InputFile>> {
235 let suffixes = adapter.suffixes()?;
236 let list_refs = client.list(prefix)?;
237 let filtered = planner::filter_by_suffixes(list_refs, &suffixes);
238 let filtered = planner::stable_sort_refs(filtered);
239 if filtered.is_empty() {
240 return Err(Box::new(RunError(format!(
241 "entity.name={} source.storage={} no input objects matched (bucket={}, prefix={}, suffixes={})",
242 entity.name,
243 storage,
244 bucket,
245 prefix,
246 suffixes.join(",")
247 ))));
248 }
249 let mut inputs = Vec::with_capacity(filtered.len());
250 for object in filtered {
251 let local_path = client.download_to_temp(&object.uri, temp_dir)?;
252 let source_name =
253 super::s3::file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
254 let source_stem =
255 super::s3::file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
256 let source_uri = object.uri;
257 inputs.push(crate::io::format::InputFile {
258 source_uri,
259 source_local_path: local_path,
260 source_name,
261 source_stem,
262 });
263 }
264 Ok(inputs)
265}