1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3use std::path::{Path, PathBuf};
4
5use google_cloud_storage::client::{Client, ClientConfig};
6use google_cloud_storage::http::objects::delete::DeleteObjectRequest;
7use google_cloud_storage::http::objects::download::Range;
8use google_cloud_storage::http::objects::get::GetObjectRequest;
9use google_cloud_storage::http::objects::list::ListObjectsRequest;
10use google_cloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};
11use tokio::runtime::Runtime;
12
13use crate::errors::{RunError, StorageError};
14use crate::{ConfigError, FloeResult};
15
16use super::{planner, ObjectRef, StorageClient};
17
/// [`StorageClient`] implementation backed by Google Cloud Storage.
///
/// The client owns a current-thread tokio runtime and drives every async GCS call to
/// completion with `block_on`, so the trait methods are synchronous for callers.
pub struct GcsClient {
    /// Bucket targeted by `list` and `resolve_uri`.
    bucket: String,
    /// Authenticated GCS API client created in `GcsClient::new`.
    client: Client,
    /// Runtime used to `block_on` the async GCS calls.
    // NOTE(review): struct fields drop in declaration order, so `client` is dropped before
    // `runtime`; keep this order unless that is confirmed not to matter.
    runtime: Runtime,
}
23
24impl GcsClient {
25 pub fn new(bucket: String) -> FloeResult<Self> {
26 let runtime = tokio::runtime::Builder::new_current_thread()
27 .enable_all()
28 .build()
29 .map_err(|err| Box::new(StorageError(format!("gcs runtime init failed: {err}"))))?;
30 let client = runtime.block_on(async {
31 let config = ClientConfig::default()
32 .with_auth()
33 .await
34 .map_err(|err| Box::new(StorageError(format!("gcs auth init failed: {err}"))))?;
35 Ok::<_, Box<dyn std::error::Error + Send + Sync>>(Client::new(config))
36 })?;
37 Ok(Self {
38 bucket,
39 client,
40 runtime,
41 })
42 }
43
44 fn bucket(&self) -> &str {
45 self.bucket.as_str()
46 }
47}
48
49impl StorageClient for GcsClient {
50 fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>> {
51 let bucket = self.bucket().to_string();
52 let prefix = prefix_or_path.trim_start_matches('/').to_string();
53 let client = self.client.clone();
54 self.runtime.block_on(async move {
55 let mut refs = Vec::new();
56 let mut page_token = None;
57 loop {
58 let request = ListObjectsRequest {
59 bucket: bucket.clone(),
60 prefix: if prefix.is_empty() {
61 None
62 } else {
63 Some(prefix.clone())
64 },
65 page_token,
66 ..Default::default()
67 };
68 let response = client.list_objects(&request).await.map_err(|err| {
69 Box::new(StorageError(format!(
70 "gcs list objects failed for bucket {}: {err}",
71 bucket
72 ))) as Box<dyn std::error::Error + Send + Sync>
73 })?;
74 if let Some(items) = response.items {
75 for object in items {
76 let key = object.name.clone();
77 let uri = format_gcs_uri(&bucket, &key);
78 refs.push(ObjectRef {
79 uri,
80 key,
81 last_modified: object.updated.map(|value| value.to_string()),
82 size: Some(object.size as u64),
83 });
84 }
85 }
86 match response.next_page_token {
87 Some(token) if !token.is_empty() => {
88 page_token = Some(token);
89 }
90 _ => break,
91 }
92 }
93 Ok(planner::stable_sort_refs(refs))
94 })
95 }
96
97 fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
98 let location = parse_gcs_uri(uri)?;
99 let bucket = location.bucket;
100 let key = location.key;
101 let dest = temp_path_for_key(temp_dir, &key);
102 let dest_clone = dest.clone();
103 let client = self.client.clone();
104 self.runtime.block_on(async move {
105 let data = client
106 .download_object(
107 &GetObjectRequest {
108 bucket,
109 object: key,
110 ..Default::default()
111 },
112 &Range::default(),
113 )
114 .await
115 .map_err(|err| {
116 Box::new(StorageError(format!("gcs download failed: {err}")))
117 as Box<dyn std::error::Error + Send + Sync>
118 })?;
119 if let Some(parent) = dest_clone.parent() {
120 tokio::fs::create_dir_all(parent).await?;
121 }
122 tokio::fs::write(&dest_clone, data).await?;
123 Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
124 })?;
125 Ok(dest)
126 }
127
128 fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
129 let location = parse_gcs_uri(uri)?;
130 let path = local_path.to_path_buf();
131 let client = self.client.clone();
132 self.runtime.block_on(async move {
133 let data = tokio::fs::read(path).await?;
134 let upload_type = UploadType::Simple(Media::new(location.key.clone()));
135 let request = UploadObjectRequest {
136 bucket: location.bucket,
137 ..Default::default()
138 };
139 client
140 .upload_object(&request, data, &upload_type)
141 .await
142 .map_err(|err| {
143 Box::new(StorageError(format!("gcs upload failed: {err}")))
144 as Box<dyn std::error::Error + Send + Sync>
145 })?;
146 Ok(())
147 })
148 }
149
150 fn resolve_uri(&self, path: &str) -> FloeResult<String> {
151 Ok(format_gcs_uri(self.bucket(), path.trim_start_matches('/')))
152 }
153
154 fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
155 let temp_dir = tempfile::TempDir::new().map_err(|err| {
156 Box::new(StorageError(format!("gcs tempdir failed: {err}")))
157 as Box<dyn std::error::Error + Send + Sync>
158 })?;
159 let temp_path = self.download_to_temp(src_uri, temp_dir.path())?;
160 self.upload_from_path(&temp_path, dst_uri)?;
161 Ok(())
162 }
163
164 fn delete_object(&self, uri: &str) -> FloeResult<()> {
165 let location = parse_gcs_uri(uri)?;
166 let client = self.client.clone();
167 self.runtime.block_on(async move {
168 client
169 .delete_object(&DeleteObjectRequest {
170 bucket: location.bucket,
171 object: location.key,
172 ..Default::default()
173 })
174 .await
175 .map_err(|err| {
176 Box::new(StorageError(format!("gcs delete failed: {err}")))
177 as Box<dyn std::error::Error + Send + Sync>
178 })?;
179 Ok(())
180 })
181 }
182
183 fn exists(&self, uri: &str) -> FloeResult<bool> {
184 let location = parse_gcs_uri(uri)?;
185 if location.key.is_empty() {
186 return Ok(false);
187 }
188 let refs = self.list(&location.key)?;
189 Ok(refs.iter().any(|object| object.key == location.key))
190 }
191}
192
/// A `gs://` URI split into its bucket name and object key, as returned by `parse_gcs_uri`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GcsLocation {
    /// Bucket name: the text between `gs://` and the next `/` (non-empty when produced by
    /// `parse_gcs_uri`).
    pub bucket: String,
    /// Object key: everything after the first `/` following the bucket; may be empty.
    pub key: String,
}
198
199pub fn parse_gcs_uri(uri: &str) -> FloeResult<GcsLocation> {
200 let stripped = uri.strip_prefix("gs://").ok_or_else(|| {
201 Box::new(ConfigError(format!("expected gcs uri, got {}", uri)))
202 as Box<dyn std::error::Error + Send + Sync>
203 })?;
204 let mut parts = stripped.splitn(2, '/');
205 let bucket = parts.next().unwrap_or("").to_string();
206 if bucket.is_empty() {
207 return Err(Box::new(ConfigError(format!(
208 "missing bucket in gcs uri: {}",
209 uri
210 ))));
211 }
212 let key = parts.next().unwrap_or("").to_string();
213 Ok(GcsLocation { bucket, key })
214}
215
/// Builds a `gs://` URI from a bucket and an object key.
///
/// An empty key produces the bare bucket URI (`gs://bucket`) with no trailing slash.
pub fn format_gcs_uri(bucket: &str, key: &str) -> String {
    let mut uri = String::with_capacity("gs://".len() + bucket.len() + 1 + key.len());
    uri.push_str("gs://");
    uri.push_str(bucket);
    if !key.is_empty() {
        uri.push('/');
        uri.push_str(key);
    }
    uri
}
223
224pub fn temp_path_for_key(temp_dir: &Path, key: &str) -> PathBuf {
225 let mut hasher = DefaultHasher::new();
226 key.hash(&mut hasher);
227 let hash = hasher.finish();
228 let name = super::s3::file_name_from_key(key).unwrap_or_else(|| "object".to_string());
229 let sanitized = sanitize_filename(&name);
230 temp_dir.join(format!("{hash:016x}_{sanitized}"))
231}
232
/// Replaces every character other than ASCII letters, digits, `.`, `-` and `_` with `_`.
///
/// Works per `char`, so each non-ASCII character becomes a single `_`.
fn sanitize_filename(name: &str) -> String {
    let mut cleaned = String::with_capacity(name.len());
    for ch in name.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '.' || ch == '-' || ch == '_';
        cleaned.push(if allowed { ch } else { '_' });
    }
    cleaned
}
244
245pub fn build_input_files(
246 client: &dyn StorageClient,
247 bucket: &str,
248 prefix: &str,
249 adapter: &dyn crate::io::format::InputAdapter,
250 temp_dir: &Path,
251 entity: &crate::config::EntityConfig,
252 storage: &str,
253) -> FloeResult<Vec<crate::io::format::InputFile>> {
254 let suffixes = adapter.suffixes()?;
255 let list_refs = client.list(prefix)?;
256 let filtered = planner::filter_by_suffixes(list_refs, &suffixes);
257 let filtered = planner::stable_sort_refs(filtered);
258 if filtered.is_empty() {
259 return Err(Box::new(RunError(format!(
260 "entity.name={} source.storage={} no input objects matched (bucket={}, prefix={}, suffixes={})",
261 entity.name,
262 storage,
263 bucket,
264 prefix,
265 suffixes.join(",")
266 ))));
267 }
268 let mut inputs = Vec::with_capacity(filtered.len());
269 for object in filtered {
270 let local_path = client.download_to_temp(&object.uri, temp_dir)?;
271 let source_name =
272 super::s3::file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
273 let source_stem =
274 super::s3::file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
275 let source_uri = object.uri;
276 inputs.push(crate::io::format::InputFile {
277 source_uri,
278 source_local_path: local_path,
279 source_name,
280 source_stem,
281 });
282 }
283 Ok(inputs)
284}