Skip to main content

faucet_source_gcs/
stream.rs

1//! GCS source stream executor.
2
3use crate::config::{GcsFileFormat, GcsSourceConfig};
4use async_trait::async_trait;
5use faucet_common_gcs::{build_storage, build_storage_control};
6use faucet_core::{FaucetError, Stream, StreamPage};
7use futures::stream::{self, StreamExt, TryStreamExt};
8use google_cloud_gax::paginator::ItemPaginator;
9use google_cloud_storage::client::{Storage, StorageControl};
10use serde_json::Value;
11use std::pin::Pin;
12use tokio::io::AsyncBufReadExt;
13
/// A GCS source that lists and reads objects from a bucket.
///
/// Both clients are built once in [`GcsSource::new`] and reused for every
/// fetch / stream call made on this source.
pub struct GcsSource {
    /// Connector configuration: bucket, prefix or explicit object keys, file
    /// format, limits (`max_objects`, `concurrency`, `batch_size`) and auth.
    config: GcsSourceConfig,
    /// Client used to download object bodies (`read_object`).
    storage: Storage,
    /// Client used to enumerate objects in the bucket (`list_objects`).
    control: StorageControl,
}
20
21impl GcsSource {
22    /// Construct the source. Builds both clients eagerly so they are
23    /// reused across calls.
24    pub async fn new(config: GcsSourceConfig) -> Result<Self, FaucetError> {
25        let storage = build_storage(&config.auth, config.storage_host.as_deref()).await?;
26        let control = build_storage_control(&config.auth, config.storage_host.as_deref()).await?;
27        Ok(Self {
28            config,
29            storage,
30            control,
31        })
32    }
33
34    /// Bucket as a GCS resource path: `projects/_/buckets/{bucket}`.
35    fn bucket_path(&self) -> String {
36        format!("projects/_/buckets/{}", self.config.bucket)
37    }
38
39    /// List object names under the configured (or override) prefix,
40    /// capped at `max_objects` if set.
41    async fn list_object_names(
42        &self,
43        prefix_override: Option<&str>,
44    ) -> Result<Vec<String>, FaucetError> {
45        if let Some(ref keys) = self.config.object_keys {
46            return Ok(cap_keys(keys.clone(), self.config.max_objects));
47        }
48
49        let effective_prefix = prefix_override.or(self.config.prefix.as_deref());
50        let mut req = self.control.list_objects().set_parent(self.bucket_path());
51        if let Some(p) = effective_prefix {
52            req = req.set_prefix(p.to_string());
53        }
54        req = req.set_page_size(1000_i32);
55
56        let mut paginator = req.by_item();
57        let mut names: Vec<String> = Vec::new();
58        while let Some(item) = paginator.next().await {
59            let object = item.map_err(|e| {
60                FaucetError::Source(format!(
61                    "GCS list error for bucket '{}': {e}",
62                    self.config.bucket
63                ))
64            })?;
65            if object.name.is_empty() {
66                continue;
67            }
68            names.push(object.name);
69            if let Some(max) = self.config.max_objects
70                && names.len() >= max
71            {
72                break;
73            }
74        }
75        Ok(names)
76    }
77
78    /// Read the full body of a single GCS object into a UTF-8 `String`.
79    async fn read_object_text(&self, key: &str) -> Result<String, FaucetError> {
80        // Stream the (optionally decompressed) body straight into one String
81        // via the same reader the line-streaming path uses, instead of holding
82        // the raw bytes AND the decompressed bytes AND the String at once
83        // (#78/#25). For JsonArray / RawText the whole object is still one
84        // unit, but peak memory is now ~1× the decoded size rather than ~3×.
85        use tokio::io::AsyncReadExt as _;
86        let mut reader = self.open_object_reader(key).await?;
87        let mut text = String::new();
88        reader.read_to_string(&mut text).await.map_err(|e| {
89            FaucetError::Source(format!(
90                "GCS read/decode error for key '{key}' (not valid UTF-8?): {e}"
91            ))
92        })?;
93        Ok(text)
94    }
95
96    /// Open a GCS object as an `AsyncBufRead` over its body so callers can
97    /// decode line-by-line without buffering the entire object.
98    ///
99    /// Requires the `unstable-stream` feature on `google-cloud-storage`.
100    async fn open_object_reader(
101        &self,
102        key: &str,
103    ) -> Result<std::pin::Pin<Box<dyn tokio::io::AsyncBufRead + Send + Unpin>>, FaucetError> {
104        let resp = self
105            .storage
106            .read_object(self.bucket_path(), key.to_string())
107            .send()
108            .await
109            .map_err(|e| {
110                FaucetError::Source(format!(
111                    "GCS get error for bucket '{}' key '{key}': {e}",
112                    self.config.bucket
113                ))
114            })?;
115        let bytes_stream = resp
116            .into_stream()
117            .map_err(|e| std::io::Error::other(e.to_string()));
118        let buffered = tokio::io::BufReader::new(tokio_util::io::StreamReader::new(bytes_stream));
119        #[cfg(feature = "compression")]
120        {
121            let codec = self.config.compression.resolve(key);
122            faucet_core::compression::warn_mismatch(key, codec);
123            Ok(faucet_core::compression::wrap_async_reader(buffered, codec))
124        }
125        #[cfg(not(feature = "compression"))]
126        {
127            Ok(Box::pin(buffered))
128        }
129    }
130
131    /// Parse file content into records based on the configured file format.
132    fn parse_content(&self, key: &str, text: &str) -> Result<Vec<Value>, FaucetError> {
133        match self.config.file_format {
134            GcsFileFormat::JsonLines => {
135                let mut records = Vec::new();
136                for (line_num, line) in text.lines().enumerate() {
137                    let trimmed = line.trim();
138                    if trimmed.is_empty() {
139                        continue;
140                    }
141                    let value: Value = serde_json::from_str(trimmed).map_err(|e| {
142                        FaucetError::Source(format!(
143                            "GCS JSON parse error in '{key}' at line {}: {e}",
144                            line_num + 1
145                        ))
146                    })?;
147                    records.push(value);
148                }
149                Ok(records)
150            }
151            GcsFileFormat::JsonArray => {
152                let value: Value = serde_json::from_str(text).map_err(|e| {
153                    FaucetError::Source(format!("GCS JSON parse error in '{key}': {e}"))
154                })?;
155                match value {
156                    Value::Array(arr) => Ok(arr),
157                    other => Err(FaucetError::Source(format!(
158                        "GCS expected JSON array in '{key}', got {}",
159                        value_type_name(&other)
160                    ))),
161                }
162            }
163            GcsFileFormat::RawText => Ok(vec![serde_json::json!({
164                "key": key,
165                "content": text,
166            })]),
167        }
168    }
169}
170
171#[async_trait]
172impl faucet_core::Source for GcsSource {
173    async fn fetch_with_context(
174        &self,
175        context: &std::collections::HashMap<String, Value>,
176    ) -> Result<Vec<Value>, FaucetError> {
177        let substituted_prefix: Option<String> = if !context.is_empty() {
178            self.config
179                .prefix
180                .as_ref()
181                .map(|p| faucet_core::util::substitute_context(p, context))
182        } else {
183            None
184        };
185
186        let keys = self
187            .list_object_names(substituted_prefix.as_deref())
188            .await?;
189        tracing::info!(
190            bucket = %self.config.bucket,
191            objects = keys.len(),
192            "Listed GCS objects",
193        );
194
195        let concurrency = self.config.concurrency.max(1);
196        let results: Vec<Vec<Value>> = stream::iter(keys)
197            .map(|key| async move {
198                let text = self.read_object_text(&key).await?;
199                let records = self.parse_content(&key, &text)?;
200                tracing::debug!(key = %key, records = records.len(), "Read GCS object");
201                Ok::<Vec<Value>, FaucetError>(records)
202            })
203            .buffer_unordered(concurrency)
204            .try_collect()
205            .await?;
206
207        let all_records: Vec<Value> = results.into_iter().flatten().collect();
208        tracing::info!(total_records = all_records.len(), "GCS fetch complete");
209        Ok(all_records)
210    }
211
212    /// Stream records from listed GCS objects without buffering the full
213    /// scan. Mirrors `S3Source::stream_pages` — see that implementation
214    /// for the per-format reasoning.
215    fn stream_pages<'a>(
216        &'a self,
217        context: &'a std::collections::HashMap<String, Value>,
218        _batch_size: usize,
219    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
220        let batch_size = self.config.batch_size;
221
222        Box::pin(async_stream::try_stream! {
223            let substituted_prefix: Option<String> = if !context.is_empty() {
224                self.config
225                    .prefix
226                    .as_ref()
227                    .map(|p| faucet_core::util::substitute_context(p, context))
228            } else {
229                None
230            };
231
232            let keys = self.list_object_names(substituted_prefix.as_deref()).await?;
233            tracing::info!(
234                bucket = %self.config.bucket,
235                objects = keys.len(),
236                "Listed GCS objects (stream)",
237            );
238
239            let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
240            let initial_capacity = if batch_size == 0 { 1024 } else { batch_size };
241            let mut buffer: Vec<Value> = Vec::with_capacity(initial_capacity);
242            let mut total = 0usize;
243
244            for key in &keys {
245                match self.config.file_format {
246                    GcsFileFormat::JsonLines => {
247                        let reader = self.open_object_reader(key).await?;
248                        let mut lines = reader.lines();
249                        let mut line_num: usize = 0;
250                        while let Some(line) = lines
251                            .next_line()
252                            .await
253                            .map_err(|e| FaucetError::Source(format!(
254                                "GCS read body error for key '{key}': {e}"
255                            )))?
256                        {
257                            line_num += 1;
258                            let trimmed = line.trim();
259                            if trimmed.is_empty() { continue; }
260                            let value: Value = serde_json::from_str(trimmed).map_err(|e| {
261                                FaucetError::Source(format!(
262                                    "GCS JSON parse error in '{key}' at line {line_num}: {e}",
263                                ))
264                            })?;
265                            buffer.push(value);
266                            if batch_size != 0 && buffer.len() >= chunk {
267                                let page = std::mem::replace(
268                                    &mut buffer,
269                                    Vec::with_capacity(initial_capacity),
270                                );
271                                total += page.len();
272                                yield StreamPage { records: page, bookmark: None };
273                            }
274                        }
275                        if batch_size == 0 && !buffer.is_empty() {
276                            let page = std::mem::take(&mut buffer);
277                            total += page.len();
278                            yield StreamPage { records: page, bookmark: None };
279                        }
280                    }
281                    GcsFileFormat::RawText => {
282                        let text = self.read_object_text(key).await?;
283                        let record = serde_json::json!({ "key": key, "content": text });
284                        buffer.push(record);
285                        if batch_size == 0 {
286                            let page = std::mem::take(&mut buffer);
287                            total += page.len();
288                            yield StreamPage { records: page, bookmark: None };
289                        } else if buffer.len() >= chunk {
290                            let page = std::mem::replace(
291                                &mut buffer,
292                                Vec::with_capacity(initial_capacity),
293                            );
294                            total += page.len();
295                            yield StreamPage { records: page, bookmark: None };
296                        }
297                    }
298                    GcsFileFormat::JsonArray => {
299                        let text = self.read_object_text(key).await?;
300                        let value: Value = serde_json::from_str(&text).map_err(|e| {
301                            FaucetError::Source(format!("GCS JSON parse error in '{key}': {e}"))
302                        })?;
303                        let array = match value {
304                            Value::Array(arr) => arr,
305                            other => Err(FaucetError::Source(format!(
306                                "GCS expected JSON array in '{key}', got {}",
307                                value_type_name(&other)
308                            )))?,
309                        };
310                        if batch_size == 0 {
311                            if !buffer.is_empty() {
312                                let page = std::mem::take(&mut buffer);
313                                total += page.len();
314                                yield StreamPage { records: page, bookmark: None };
315                            }
316                            total += array.len();
317                            yield StreamPage { records: array, bookmark: None };
318                        } else {
319                            for record in array {
320                                buffer.push(record);
321                                if buffer.len() >= chunk {
322                                    let page = std::mem::replace(
323                                        &mut buffer,
324                                        Vec::with_capacity(initial_capacity),
325                                    );
326                                    total += page.len();
327                                    yield StreamPage { records: page, bookmark: None };
328                                }
329                            }
330                        }
331                    }
332                }
333            }
334
335            if !buffer.is_empty() {
336                let page = std::mem::take(&mut buffer);
337                total += page.len();
338                yield StreamPage { records: page, bookmark: None };
339            }
340
341            tracing::info!(
342                total_records = total,
343                batch_size,
344                objects = keys.len(),
345                "GCS source stream complete",
346            );
347        })
348    }
349
350    fn config_schema(&self) -> Value {
351        serde_json::to_value(faucet_core::schema_for!(GcsSourceConfig))
352            .expect("schema serialization")
353    }
354
355    fn connector_name(&self) -> &'static str {
356        "gcs"
357    }
358}
359
/// Apply the optional `max_objects` cap to an explicit object-key list.
///
/// With `max = None` the list comes back unchanged. With `Some(n)` only the
/// first `n` keys, in their given order, are kept. The listing path applies
/// the same `max_objects` cap while paginating, so the limit is intended to
/// hold whether keys come from `object_keys` or from a live listing.
fn cap_keys(keys: Vec<String>, max: Option<usize>) -> Vec<String> {
    match max {
        None => keys,
        Some(limit) => keys.into_iter().take(limit).collect(),
    }
}
372
373fn value_type_name(v: &Value) -> &'static str {
374    match v {
375        Value::Null => "null",
376        Value::Bool(_) => "boolean",
377        Value::Number(_) => "number",
378        Value::String(_) => "string",
379        Value::Array(_) => "array",
380        Value::Object(_) => "object",
381    }
382}
383
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[cfg(feature = "compression")]
    #[test]
    fn compression_default_is_auto() {
        let cfg = GcsSourceConfig::new("bucket");
        assert_eq!(cfg.compression, faucet_core::CompressionConfig::Auto);
    }

    /// Parse `text` as `format` without constructing a `GcsSource`.
    ///
    /// `GcsSource::parse_content` needs `&self` only to read `file_format`, but
    /// building a real `GcsSource` requires live GCS clients. These
    /// network-free tests therefore exercise a copy of that logic with the
    /// format passed explicitly.
    ///
    /// NOTE(review): this is a hand-maintained mirror of `parse_content`. Any
    /// change there must be repeated here, or these tests stop covering
    /// production behaviour. Moving the logic into a shared free function
    /// would remove the duplication.
    fn parse(format: GcsFileFormat, key: &str, text: &str) -> Result<Vec<Value>, FaucetError> {
        match format {
            GcsFileFormat::JsonLines => {
                let mut records = Vec::new();
                for (line_num, line) in text.lines().enumerate() {
                    let trimmed = line.trim();
                    if trimmed.is_empty() {
                        continue;
                    }
                    let value: Value = serde_json::from_str(trimmed).map_err(|e| {
                        FaucetError::Source(format!(
                            "GCS JSON parse error in '{key}' at line {}: {e}",
                            line_num + 1
                        ))
                    })?;
                    records.push(value);
                }
                Ok(records)
            }
            GcsFileFormat::JsonArray => {
                let value: Value = serde_json::from_str(text).map_err(|e| {
                    FaucetError::Source(format!("GCS JSON parse error in '{key}': {e}"))
                })?;
                match value {
                    Value::Array(arr) => Ok(arr),
                    other => Err(FaucetError::Source(format!(
                        "GCS expected JSON array in '{key}', got {}",
                        value_type_name(&other)
                    ))),
                }
            }
            GcsFileFormat::RawText => Ok(vec![json!({
                "key": key,
                "content": text,
            })]),
        }
    }

    #[test]
    fn parse_json_lines() {
        let r = parse(GcsFileFormat::JsonLines, "t", "{\"id\":1}\n{\"id\":2}\n").unwrap();
        assert_eq!(r.len(), 2);
        assert_eq!(r[0]["id"], 1);
    }

    #[test]
    fn parse_json_lines_skips_blanks() {
        let r = parse(
            GcsFileFormat::JsonLines,
            "t",
            "{\"id\":1}\n\n{\"id\":2}\n\n",
        )
        .unwrap();
        assert_eq!(r.len(), 2);
    }

    #[test]
    fn parse_json_lines_accepts_crlf() {
        let r = parse(
            GcsFileFormat::JsonLines,
            "t",
            "{\"id\":1}\r\n{\"id\":2}\r\n",
        )
        .unwrap();
        assert_eq!(r.len(), 2);
        assert_eq!(r[1]["id"], 2);
    }

    #[test]
    fn parse_json_lines_reports_line_number() {
        let err = parse(GcsFileFormat::JsonLines, "t", "{\"id\":1}\nbad-line\n").unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("line 2"), "unexpected: {msg}");
    }

    #[test]
    fn parse_json_array() {
        let r = parse(
            GcsFileFormat::JsonArray,
            "t.json",
            "[{\"id\":1},{\"id\":2}]",
        )
        .unwrap();
        assert_eq!(r.len(), 2);
    }

    #[test]
    fn parse_json_array_empty_yields_no_records() {
        let r = parse(GcsFileFormat::JsonArray, "t.json", "[]").unwrap();
        assert!(r.is_empty());
    }

    #[test]
    fn parse_json_array_rejects_non_array() {
        let err = parse(GcsFileFormat::JsonArray, "t.json", "{\"id\":1}").unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("expected JSON array"), "unexpected: {msg}");
        assert!(msg.contains("got object"), "unexpected: {msg}");
    }

    #[test]
    fn parse_raw_text_yields_single_record() {
        let r = parse(GcsFileFormat::RawText, "p/f.txt", "hello").unwrap();
        assert_eq!(r, vec![json!({"key": "p/f.txt", "content": "hello"})]);
    }

    #[test]
    fn value_type_name_covers_every_variant() {
        assert_eq!(value_type_name(&Value::Null), "null");
        assert_eq!(value_type_name(&json!(true)), "boolean");
        assert_eq!(value_type_name(&json!(1.5)), "number");
        assert_eq!(value_type_name(&json!("s")), "string");
        assert_eq!(value_type_name(&json!([])), "array");
        assert_eq!(value_type_name(&json!({})), "object");
    }

    #[test]
    fn cap_keys_truncates_explicit_list_to_max_objects() {
        let keys = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        let capped = cap_keys(keys, Some(2));
        assert_eq!(capped, vec!["a".to_string(), "b".to_string()]);
    }

    #[test]
    fn cap_keys_passes_through_when_no_max() {
        let keys = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        let capped = cap_keys(keys.clone(), None);
        assert_eq!(capped, keys);
    }

    #[test]
    fn cap_keys_noop_when_max_exceeds_len() {
        let keys = vec!["a".to_string(), "b".to_string()];
        let capped = cap_keys(keys.clone(), Some(10));
        assert_eq!(capped, keys);
    }

    #[test]
    fn cap_keys_zero_max_yields_no_keys() {
        let keys = vec!["a".to_string(), "b".to_string()];
        assert!(cap_keys(keys, Some(0)).is_empty());
    }
}