1use crate::config::{GcsFileFormat, GcsSourceConfig};
4use async_trait::async_trait;
5use faucet_common_gcs::{build_storage, build_storage_control};
6use faucet_core::{FaucetError, Stream, StreamPage};
7use futures::stream::{self, StreamExt, TryStreamExt};
8use google_cloud_gax::paginator::ItemPaginator;
9use google_cloud_storage::client::{Storage, StorageControl};
10use serde_json::Value;
11use std::pin::Pin;
12use tokio::io::AsyncBufReadExt;
13
/// Source connector that reads objects from a single Google Cloud Storage bucket.
pub struct GcsSource {
    /// Connector configuration (bucket, prefix / explicit keys, file format, limits).
    config: GcsSourceConfig,
    /// Client used to download object bodies (`read_object`).
    storage: Storage,
    /// Client used to enumerate objects in the bucket (`list_objects`).
    control: StorageControl,
}
20
21impl GcsSource {
22 pub async fn new(config: GcsSourceConfig) -> Result<Self, FaucetError> {
25 let storage = build_storage(&config.auth, config.storage_host.as_deref()).await?;
26 let control = build_storage_control(&config.auth, config.storage_host.as_deref()).await?;
27 Ok(Self {
28 config,
29 storage,
30 control,
31 })
32 }
33
34 fn bucket_path(&self) -> String {
36 format!("projects/_/buckets/{}", self.config.bucket)
37 }
38
39 async fn list_object_names(
42 &self,
43 prefix_override: Option<&str>,
44 ) -> Result<Vec<String>, FaucetError> {
45 if let Some(ref keys) = self.config.object_keys {
46 return Ok(cap_keys(keys.clone(), self.config.max_objects));
47 }
48
49 let effective_prefix = prefix_override.or(self.config.prefix.as_deref());
50 let mut req = self.control.list_objects().set_parent(self.bucket_path());
51 if let Some(p) = effective_prefix {
52 req = req.set_prefix(p.to_string());
53 }
54 req = req.set_page_size(1000_i32);
55
56 let mut paginator = req.by_item();
57 let mut names: Vec<String> = Vec::new();
58 while let Some(item) = paginator.next().await {
59 let object = item.map_err(|e| {
60 FaucetError::Source(format!(
61 "GCS list error for bucket '{}': {e}",
62 self.config.bucket
63 ))
64 })?;
65 if object.name.is_empty() {
66 continue;
67 }
68 names.push(object.name);
69 if let Some(max) = self.config.max_objects
70 && names.len() >= max
71 {
72 break;
73 }
74 }
75 Ok(names)
76 }
77
78 async fn read_object_text(&self, key: &str) -> Result<String, FaucetError> {
80 use tokio::io::AsyncReadExt as _;
86 let mut reader = self.open_object_reader(key).await?;
87 let mut text = String::new();
88 reader.read_to_string(&mut text).await.map_err(|e| {
89 FaucetError::Source(format!(
90 "GCS read/decode error for key '{key}' (not valid UTF-8?): {e}"
91 ))
92 })?;
93 Ok(text)
94 }
95
96 async fn open_object_reader(
101 &self,
102 key: &str,
103 ) -> Result<std::pin::Pin<Box<dyn tokio::io::AsyncBufRead + Send + Unpin>>, FaucetError> {
104 let resp = self
105 .storage
106 .read_object(self.bucket_path(), key.to_string())
107 .send()
108 .await
109 .map_err(|e| {
110 FaucetError::Source(format!(
111 "GCS get error for bucket '{}' key '{key}': {e}",
112 self.config.bucket
113 ))
114 })?;
115 let bytes_stream = resp
116 .into_stream()
117 .map_err(|e| std::io::Error::other(e.to_string()));
118 let buffered = tokio::io::BufReader::new(tokio_util::io::StreamReader::new(bytes_stream));
119 #[cfg(feature = "compression")]
120 {
121 let codec = self.config.compression.resolve(key);
122 faucet_core::compression::warn_mismatch(key, codec);
123 Ok(faucet_core::compression::wrap_async_reader(buffered, codec))
124 }
125 #[cfg(not(feature = "compression"))]
126 {
127 Ok(Box::pin(buffered))
128 }
129 }
130
131 fn parse_content(&self, key: &str, text: &str) -> Result<Vec<Value>, FaucetError> {
133 match self.config.file_format {
134 GcsFileFormat::JsonLines => {
135 let mut records = Vec::new();
136 for (line_num, line) in text.lines().enumerate() {
137 let trimmed = line.trim();
138 if trimmed.is_empty() {
139 continue;
140 }
141 let value: Value = serde_json::from_str(trimmed).map_err(|e| {
142 FaucetError::Source(format!(
143 "GCS JSON parse error in '{key}' at line {}: {e}",
144 line_num + 1
145 ))
146 })?;
147 records.push(value);
148 }
149 Ok(records)
150 }
151 GcsFileFormat::JsonArray => {
152 let value: Value = serde_json::from_str(text).map_err(|e| {
153 FaucetError::Source(format!("GCS JSON parse error in '{key}': {e}"))
154 })?;
155 match value {
156 Value::Array(arr) => Ok(arr),
157 other => Err(FaucetError::Source(format!(
158 "GCS expected JSON array in '{key}', got {}",
159 value_type_name(&other)
160 ))),
161 }
162 }
163 GcsFileFormat::RawText => Ok(vec![serde_json::json!({
164 "key": key,
165 "content": text,
166 })]),
167 }
168 }
169}
170
171#[async_trait]
172impl faucet_core::Source for GcsSource {
173 async fn fetch_with_context(
174 &self,
175 context: &std::collections::HashMap<String, Value>,
176 ) -> Result<Vec<Value>, FaucetError> {
177 let substituted_prefix: Option<String> = if !context.is_empty() {
178 self.config
179 .prefix
180 .as_ref()
181 .map(|p| faucet_core::util::substitute_context(p, context))
182 } else {
183 None
184 };
185
186 let keys = self
187 .list_object_names(substituted_prefix.as_deref())
188 .await?;
189 tracing::info!(
190 bucket = %self.config.bucket,
191 objects = keys.len(),
192 "Listed GCS objects",
193 );
194
195 let concurrency = self.config.concurrency.max(1);
196 let results: Vec<Vec<Value>> = stream::iter(keys)
197 .map(|key| async move {
198 let text = self.read_object_text(&key).await?;
199 let records = self.parse_content(&key, &text)?;
200 tracing::debug!(key = %key, records = records.len(), "Read GCS object");
201 Ok::<Vec<Value>, FaucetError>(records)
202 })
203 .buffer_unordered(concurrency)
204 .try_collect()
205 .await?;
206
207 let all_records: Vec<Value> = results.into_iter().flatten().collect();
208 tracing::info!(total_records = all_records.len(), "GCS fetch complete");
209 Ok(all_records)
210 }
211
212 fn stream_pages<'a>(
216 &'a self,
217 context: &'a std::collections::HashMap<String, Value>,
218 _batch_size: usize,
219 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
220 let batch_size = self.config.batch_size;
221
222 Box::pin(async_stream::try_stream! {
223 let substituted_prefix: Option<String> = if !context.is_empty() {
224 self.config
225 .prefix
226 .as_ref()
227 .map(|p| faucet_core::util::substitute_context(p, context))
228 } else {
229 None
230 };
231
232 let keys = self.list_object_names(substituted_prefix.as_deref()).await?;
233 tracing::info!(
234 bucket = %self.config.bucket,
235 objects = keys.len(),
236 "Listed GCS objects (stream)",
237 );
238
239 let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
240 let initial_capacity = if batch_size == 0 { 1024 } else { batch_size };
241 let mut buffer: Vec<Value> = Vec::with_capacity(initial_capacity);
242 let mut total = 0usize;
243
244 for key in &keys {
245 match self.config.file_format {
246 GcsFileFormat::JsonLines => {
247 let reader = self.open_object_reader(key).await?;
248 let mut lines = reader.lines();
249 let mut line_num: usize = 0;
250 while let Some(line) = lines
251 .next_line()
252 .await
253 .map_err(|e| FaucetError::Source(format!(
254 "GCS read body error for key '{key}': {e}"
255 )))?
256 {
257 line_num += 1;
258 let trimmed = line.trim();
259 if trimmed.is_empty() { continue; }
260 let value: Value = serde_json::from_str(trimmed).map_err(|e| {
261 FaucetError::Source(format!(
262 "GCS JSON parse error in '{key}' at line {line_num}: {e}",
263 ))
264 })?;
265 buffer.push(value);
266 if batch_size != 0 && buffer.len() >= chunk {
267 let page = std::mem::replace(
268 &mut buffer,
269 Vec::with_capacity(initial_capacity),
270 );
271 total += page.len();
272 yield StreamPage { records: page, bookmark: None };
273 }
274 }
275 if batch_size == 0 && !buffer.is_empty() {
276 let page = std::mem::take(&mut buffer);
277 total += page.len();
278 yield StreamPage { records: page, bookmark: None };
279 }
280 }
281 GcsFileFormat::RawText => {
282 let text = self.read_object_text(key).await?;
283 let record = serde_json::json!({ "key": key, "content": text });
284 buffer.push(record);
285 if batch_size == 0 {
286 let page = std::mem::take(&mut buffer);
287 total += page.len();
288 yield StreamPage { records: page, bookmark: None };
289 } else if buffer.len() >= chunk {
290 let page = std::mem::replace(
291 &mut buffer,
292 Vec::with_capacity(initial_capacity),
293 );
294 total += page.len();
295 yield StreamPage { records: page, bookmark: None };
296 }
297 }
298 GcsFileFormat::JsonArray => {
299 let text = self.read_object_text(key).await?;
300 let value: Value = serde_json::from_str(&text).map_err(|e| {
301 FaucetError::Source(format!("GCS JSON parse error in '{key}': {e}"))
302 })?;
303 let array = match value {
304 Value::Array(arr) => arr,
305 other => Err(FaucetError::Source(format!(
306 "GCS expected JSON array in '{key}', got {}",
307 value_type_name(&other)
308 )))?,
309 };
310 if batch_size == 0 {
311 if !buffer.is_empty() {
312 let page = std::mem::take(&mut buffer);
313 total += page.len();
314 yield StreamPage { records: page, bookmark: None };
315 }
316 total += array.len();
317 yield StreamPage { records: array, bookmark: None };
318 } else {
319 for record in array {
320 buffer.push(record);
321 if buffer.len() >= chunk {
322 let page = std::mem::replace(
323 &mut buffer,
324 Vec::with_capacity(initial_capacity),
325 );
326 total += page.len();
327 yield StreamPage { records: page, bookmark: None };
328 }
329 }
330 }
331 }
332 }
333 }
334
335 if !buffer.is_empty() {
336 let page = std::mem::take(&mut buffer);
337 total += page.len();
338 yield StreamPage { records: page, bookmark: None };
339 }
340
341 tracing::info!(
342 total_records = total,
343 batch_size,
344 objects = keys.len(),
345 "GCS source stream complete",
346 );
347 })
348 }
349
350 fn config_schema(&self) -> Value {
351 serde_json::to_value(faucet_core::schema_for!(GcsSourceConfig))
352 .expect("schema serialization")
353 }
354
355 fn connector_name(&self) -> &'static str {
356 "gcs"
357 }
358}
359
/// Limits an explicit key list to at most `max` entries, keeping the leading ones.
///
/// `None` means "no limit" and returns `keys` untouched.
fn cap_keys(mut keys: Vec<String>, max: Option<usize>) -> Vec<String> {
    // Truncating to a length at or beyond the current one is a no-op, so `None` can be
    // expressed as the largest possible cap.
    keys.truncate(max.unwrap_or(usize::MAX));
    keys
}
372
373fn value_type_name(v: &Value) -> &'static str {
374 match v {
375 Value::Null => "null",
376 Value::Bool(_) => "boolean",
377 Value::Number(_) => "number",
378 Value::String(_) => "string",
379 Value::Array(_) => "array",
380 Value::Object(_) => "object",
381 }
382}
383
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[cfg(feature = "compression")]
    #[test]
    fn compression_default_is_auto() {
        let config = GcsSourceConfig::new("bucket");
        assert_eq!(config.compression, faucet_core::CompressionConfig::Auto);
    }

    /// Stand-alone copy of the per-format parsing rules in `GcsSource::parse_content`,
    /// so they can be exercised without constructing a `GcsSource` and its clients.
    fn parse(format: GcsFileFormat, key: &str, text: &str) -> Result<Vec<Value>, FaucetError> {
        match format {
            GcsFileFormat::JsonLines => text
                .lines()
                .enumerate()
                .map(|(index, line)| (index + 1, line.trim()))
                .filter(|(_, line)| !line.is_empty())
                .map(|(line_no, line)| {
                    serde_json::from_str::<Value>(line).map_err(|e| {
                        FaucetError::Source(format!(
                            "GCS JSON parse error in '{key}' at line {line_no}: {e}"
                        ))
                    })
                })
                .collect(),
            GcsFileFormat::JsonArray => {
                let parsed: Value = serde_json::from_str(text).map_err(|e| {
                    FaucetError::Source(format!("GCS JSON parse error in '{key}': {e}"))
                })?;
                match parsed {
                    Value::Array(items) => Ok(items),
                    other => Err(FaucetError::Source(format!(
                        "GCS expected JSON array in '{key}', got {}",
                        value_type_name(&other)
                    ))),
                }
            }
            GcsFileFormat::RawText => Ok(vec![json!({
                "key": key,
                "content": text,
            })]),
        }
    }

    #[test]
    fn parse_json_lines() {
        let records = parse(GcsFileFormat::JsonLines, "t", "{\"id\":1}\n{\"id\":2}\n").unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["id"], 1);
    }

    #[test]
    fn parse_json_lines_skips_blanks() {
        let input = "{\"id\":1}\n\n{\"id\":2}\n\n";
        let records = parse(GcsFileFormat::JsonLines, "t", input).unwrap();
        assert_eq!(records.len(), 2);
    }

    #[test]
    fn parse_json_lines_reports_line_number() {
        let input = "{\"id\":1}\nbad-line\n";
        let msg = parse(GcsFileFormat::JsonLines, "t", input)
            .unwrap_err()
            .to_string();
        assert!(msg.contains("line 2"), "unexpected: {msg}");
    }

    #[test]
    fn parse_json_array() {
        let input = "[{\"id\":1},{\"id\":2}]";
        let records = parse(GcsFileFormat::JsonArray, "t.json", input).unwrap();
        assert_eq!(records.len(), 2);
    }

    #[test]
    fn parse_json_array_rejects_non_array() {
        let failure = parse(GcsFileFormat::JsonArray, "t.json", "{\"id\":1}").unwrap_err();
        assert!(failure.to_string().contains("expected JSON array"));
    }

    #[test]
    fn parse_raw_text_yields_single_record() {
        let records = parse(GcsFileFormat::RawText, "p/f.txt", "hello").unwrap();
        assert_eq!(records, vec![json!({"key": "p/f.txt", "content": "hello"})]);
    }

    /// Three sample keys shared by the `cap_keys` tests.
    fn abc() -> Vec<String> {
        ["a", "b", "c"].iter().map(|s| s.to_string()).collect()
    }

    #[test]
    fn cap_keys_truncates_explicit_list_to_max_objects() {
        let capped = cap_keys(abc(), Some(2));
        assert_eq!(capped, vec!["a".to_string(), "b".to_string()]);
    }

    #[test]
    fn cap_keys_passes_through_when_no_max() {
        let keys = abc();
        assert_eq!(cap_keys(keys.clone(), None), keys);
    }

    #[test]
    fn cap_keys_noop_when_max_exceeds_len() {
        let keys = vec!["a".to_string(), "b".to_string()];
        assert_eq!(cap_keys(keys.clone(), Some(10)), keys);
    }
}