Skip to main content

faucet_sink_gcs/
sink.rs

1//! GCS sink executor.
2
3use crate::config::GcsSinkConfig;
4use async_trait::async_trait;
5use faucet_common_gcs::{build_storage, build_storage_control};
6use faucet_core::FaucetError;
7use futures::stream::{self, StreamExt, TryStreamExt};
8use google_cloud_storage::client::Storage;
9use serde_json::Value;
10
11/// A sink that writes JSON records to GCS as JSON Lines files.
12pub struct GcsSink {
13    config: GcsSinkConfig,
14    storage: Storage,
15}
16
17impl GcsSink {
18    pub async fn new(config: GcsSinkConfig) -> Result<Self, FaucetError> {
19        faucet_core::validate_batch_size(config.batch_size)?;
20        let storage = build_storage(&config.auth, config.storage_host.as_deref()).await?;
21        Ok(Self { config, storage })
22    }
23
24    /// Bucket as a GCS resource path: `projects/_/buckets/{bucket}`.
25    fn bucket_path(&self) -> String {
26        format!("projects/_/buckets/{}", self.config.bucket)
27    }
28
29    /// Serialize a slice of records as a JSON Lines byte buffer.
30    fn serialize_jsonl(records: &[Value]) -> Result<Vec<u8>, FaucetError> {
31        let mut buf: Vec<u8> = Vec::new();
32        for record in records {
33            let line = serde_json::to_vec(record)
34                .map_err(|e| FaucetError::Sink(format!("JSON serialization failed: {e}")))?;
35            buf.extend_from_slice(&line);
36            buf.push(b'\n');
37        }
38        Ok(buf)
39    }
40
41    /// Generate a time-sortable UUIDv7 object name.
42    fn generate_key(&self) -> String {
43        generate_object_key(&self.config.prefix, &self.config.file_extension)
44    }
45
46    /// Upload a single JSONL file to GCS.
47    async fn upload_file(&self, key: &str, body: Vec<u8>) -> Result<(), FaucetError> {
48        #[cfg(feature = "compression")]
49        let body = {
50            let codec = self.config.compression.resolve(&self.config.file_extension);
51            faucet_core::compression::warn_mismatch(&self.config.file_extension, codec);
52            faucet_core::compression::compress_buf(&body, codec)?
53        };
54
55        let payload = bytes::Bytes::from(body);
56        self.storage
57            .write_object(self.bucket_path(), key.to_string(), payload)
58            .set_content_type("application/x-ndjson")
59            .send_unbuffered()
60            .await
61            .map_err(|e| FaucetError::Sink(format!("GCS put object error for key '{key}': {e}")))?;
62        tracing::debug!(key = %key, "Uploaded GCS object");
63        Ok(())
64    }
65
66    /// Compute the effective chunk size combining `batch_size` and
67    /// `max_records_per_file`. `batch_size = 0` removes the batch-size
68    /// limit; `max_records_per_file = None` removes the file-rollover
69    /// limit. When both are unlimited, returns `usize::MAX` (single chunk).
70    fn effective_chunk_size(&self) -> usize {
71        resolve_effective_chunk_size(&self.config)
72    }
73}
74
75#[async_trait]
76impl faucet_core::Sink for GcsSink {
77    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
78        if records.is_empty() {
79            return Ok(0);
80        }
81        let chunk = self.effective_chunk_size();
82        let concurrency = self.config.concurrency.max(1);
83
84        let uploads: Vec<(String, Vec<u8>)> = records
85            .chunks(chunk)
86            .map(|slice| {
87                let body = Self::serialize_jsonl(slice)?;
88                Ok::<(String, Vec<u8>), FaucetError>((self.generate_key(), body))
89            })
90            .collect::<Result<_, _>>()?;
91
92        let written = records.len();
93        stream::iter(uploads)
94            .map(|(key, body)| async move { self.upload_file(&key, body).await })
95            .buffer_unordered(concurrency)
96            .try_collect::<Vec<()>>()
97            .await?;
98
99        Ok(written)
100    }
101
102    fn config_schema(&self) -> Value {
103        serde_json::to_value(faucet_core::schema_for!(GcsSinkConfig)).expect("schema serialization")
104    }
105
106    fn connector_name(&self) -> &'static str {
107        "gcs"
108    }
109
110    /// Preflight probe: confirm the configured bucket is reachable and the
111    /// credentials work via a non-mutating `list_objects` call capped at a
112    /// single result. Writes nothing.
113    ///
114    /// The sink only holds a data-plane [`Storage`] client (which exposes no
115    /// list/get-bucket call), so the probe builds a control-plane
116    /// `StorageControl` client on demand using the same credentials.
117    async fn check(
118        &self,
119        ctx: &faucet_core::check::CheckContext,
120    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
121        use faucet_core::check::{CheckReport, Probe};
122
123        let started = std::time::Instant::now();
124
125        // Build a control-plane client (the data-plane Storage client has no
126        // read-only list/get-bucket call). Credential/client-build failures
127        // surface as a failed probe rather than an Err.
128        let control =
129            match build_storage_control(&self.config.auth, self.config.storage_host.as_deref())
130                .await
131            {
132                Ok(c) => c,
133                Err(e) => {
134                    return Ok(CheckReport::single(Probe::fail_hint(
135                        "auth",
136                        started.elapsed(),
137                        e.to_string(),
138                        "check bucket name, credentials, and network",
139                    )));
140                }
141            };
142
143        let probe = match tokio::time::timeout(
144            ctx.timeout,
145            control
146                .list_objects()
147                .set_parent(self.bucket_path())
148                .set_page_size(1_i32)
149                .send(),
150        )
151        .await
152        {
153            Ok(Ok(_)) => Probe::pass("auth", started.elapsed()),
154            Ok(Err(e)) => Probe::fail_hint(
155                "auth",
156                started.elapsed(),
157                e.to_string(),
158                "check bucket name, credentials, and network",
159            ),
160            Err(_) => Probe::fail("network", started.elapsed(), "timed out"),
161        };
162        Ok(CheckReport::single(probe))
163    }
164}
165
166/// Pure helper for chunk-size resolution — used by `write_batch` and unit
167/// tested directly so the test surface doesn't need a `Storage` stub.
168fn resolve_effective_chunk_size(config: &GcsSinkConfig) -> usize {
169    let bs = if config.batch_size == 0 {
170        usize::MAX
171    } else {
172        config.batch_size
173    };
174    let mr = config.max_records_per_file.unwrap_or(usize::MAX);
175    bs.min(mr)
176}
177
178/// Pure helper for object-key generation — used by `write_batch` and unit
179/// tested directly. UUIDv7 makes keys time-sortable so a listing of the
180/// destination bucket returns objects in write order.
181fn generate_object_key(prefix: &str, file_extension: &str) -> String {
182    format!("{prefix}{}{file_extension}", uuid::Uuid::now_v7())
183}
184
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// `GcsSink::new` validates `batch_size` before building any GCS client,
    /// so this test needs no backend.
    #[tokio::test]
    async fn new_rejects_out_of_range_batch_size() {
        let mut config = GcsSinkConfig::new("bucket");
        config.batch_size = faucet_core::MAX_BATCH_SIZE + 1;
        let outcome = GcsSink::new(config).await;
        if let Err(FaucetError::Config(msg)) = outcome {
            assert!(msg.contains("batch_size"), "got: {msg}");
        } else {
            panic!("expected a batch_size Config error");
        }
    }

    #[test]
    fn serialize_jsonl_two_records() {
        let records = [json!({"a": 1}), json!({"b": 2})];
        let bytes = GcsSink::serialize_jsonl(&records).unwrap();
        let text = std::str::from_utf8(&bytes).unwrap();
        assert_eq!(text, "{\"a\":1}\n{\"b\":2}\n");
    }

    #[test]
    fn serialize_jsonl_empty_is_empty() {
        let bytes = GcsSink::serialize_jsonl(&[]).unwrap();
        assert!(bytes.is_empty());
    }

    #[test]
    fn effective_chunk_size_unlimited_when_both_unset() {
        let config = GcsSinkConfig::new("b").with_batch_size(0);
        let size = resolve_effective_chunk_size(&config);
        assert_eq!(size, usize::MAX);
    }

    #[test]
    fn effective_chunk_size_takes_smaller_limit() {
        let config = GcsSinkConfig::new("b")
            .with_batch_size(500)
            .max_records_per_file(100);
        let size = resolve_effective_chunk_size(&config);
        assert_eq!(size, 100);
    }

    #[test]
    fn effective_chunk_size_uses_batch_size_when_smaller() {
        let config = GcsSinkConfig::new("b")
            .with_batch_size(50)
            .max_records_per_file(500);
        let size = resolve_effective_chunk_size(&config);
        assert_eq!(size, 50);
    }

    #[test]
    fn generate_key_uses_prefix_and_extension() {
        let key = generate_object_key("out/", ".ndjson");
        assert!(key.starts_with("out/"));
        assert!(key.ends_with(".ndjson"));
    }

    #[test]
    fn generate_key_yields_distinct_time_ordered_keys() {
        let first = generate_object_key("p/", ".jsonl");
        let second = generate_object_key("p/", ".jsonl");
        assert_ne!(first, second);
        // Within one process, UUIDv7 keys compare lexically in generation
        // order, so the later key must sort after the earlier one.
        assert!(first < second, "expected UUIDv7 keys to sort by generation order");
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compress_buf_used_for_zstd_extension() {
        let config = GcsSinkConfig::new("bucket").file_extension(".jsonl.zst");
        let codec = config.compression.resolve(&config.file_extension);
        assert_eq!(codec, faucet_core::Compression::Zstd);
        let packed = faucet_core::compression::compress_buf(b"hello\n", codec).unwrap();
        // A zstd frame starts with the magic bytes 0x28 B5 2F FD.
        assert_eq!(&packed[..4], b"\x28\xb5\x2f\xfd");
    }
}