1use crate::config::GcsSinkConfig;
4use async_trait::async_trait;
5use faucet_common_gcs::{build_storage, build_storage_control};
6use faucet_core::FaucetError;
7use futures::stream::{self, StreamExt, TryStreamExt};
8use google_cloud_storage::client::Storage;
9use serde_json::Value;
10
/// Sink that uploads batches of JSON records to a Google Cloud Storage bucket
/// as newline-delimited JSON objects.
pub struct GcsSink {
    /// Sink settings (bucket, key prefix and extension, batching, concurrency,
    /// auth and optional storage host).
    config: GcsSinkConfig,
    /// Storage client used to write objects.
    storage: Storage,
}
16
17impl GcsSink {
18 pub async fn new(config: GcsSinkConfig) -> Result<Self, FaucetError> {
19 faucet_core::validate_batch_size(config.batch_size)?;
20 let storage = build_storage(&config.auth, config.storage_host.as_deref()).await?;
21 Ok(Self { config, storage })
22 }
23
24 fn bucket_path(&self) -> String {
26 format!("projects/_/buckets/{}", self.config.bucket)
27 }
28
29 fn serialize_jsonl(records: &[Value]) -> Result<Vec<u8>, FaucetError> {
31 let mut buf: Vec<u8> = Vec::new();
32 for record in records {
33 let line = serde_json::to_vec(record)
34 .map_err(|e| FaucetError::Sink(format!("JSON serialization failed: {e}")))?;
35 buf.extend_from_slice(&line);
36 buf.push(b'\n');
37 }
38 Ok(buf)
39 }
40
41 fn generate_key(&self) -> String {
43 generate_object_key(&self.config.prefix, &self.config.file_extension)
44 }
45
46 async fn upload_file(&self, key: &str, body: Vec<u8>) -> Result<(), FaucetError> {
48 #[cfg(feature = "compression")]
49 let body = {
50 let codec = self.config.compression.resolve(&self.config.file_extension);
51 faucet_core::compression::warn_mismatch(&self.config.file_extension, codec);
52 faucet_core::compression::compress_buf(&body, codec)?
53 };
54
55 let payload = bytes::Bytes::from(body);
56 self.storage
57 .write_object(self.bucket_path(), key.to_string(), payload)
58 .set_content_type("application/x-ndjson")
59 .send_unbuffered()
60 .await
61 .map_err(|e| FaucetError::Sink(format!("GCS put object error for key '{key}': {e}")))?;
62 tracing::debug!(key = %key, "Uploaded GCS object");
63 Ok(())
64 }
65
66 fn effective_chunk_size(&self) -> usize {
71 resolve_effective_chunk_size(&self.config)
72 }
73}
74
75#[async_trait]
76impl faucet_core::Sink for GcsSink {
77 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
78 if records.is_empty() {
79 return Ok(0);
80 }
81 let chunk = self.effective_chunk_size();
82 let concurrency = self.config.concurrency.max(1);
83
84 let uploads: Vec<(String, Vec<u8>)> = records
85 .chunks(chunk)
86 .map(|slice| {
87 let body = Self::serialize_jsonl(slice)?;
88 Ok::<(String, Vec<u8>), FaucetError>((self.generate_key(), body))
89 })
90 .collect::<Result<_, _>>()?;
91
92 let written = records.len();
93 stream::iter(uploads)
94 .map(|(key, body)| async move { self.upload_file(&key, body).await })
95 .buffer_unordered(concurrency)
96 .try_collect::<Vec<()>>()
97 .await?;
98
99 Ok(written)
100 }
101
102 fn config_schema(&self) -> Value {
103 serde_json::to_value(faucet_core::schema_for!(GcsSinkConfig)).expect("schema serialization")
104 }
105
106 fn connector_name(&self) -> &'static str {
107 "gcs"
108 }
109
110 async fn check(
118 &self,
119 ctx: &faucet_core::check::CheckContext,
120 ) -> Result<faucet_core::check::CheckReport, FaucetError> {
121 use faucet_core::check::{CheckReport, Probe};
122
123 let started = std::time::Instant::now();
124
125 let control =
129 match build_storage_control(&self.config.auth, self.config.storage_host.as_deref())
130 .await
131 {
132 Ok(c) => c,
133 Err(e) => {
134 return Ok(CheckReport::single(Probe::fail_hint(
135 "auth",
136 started.elapsed(),
137 e.to_string(),
138 "check bucket name, credentials, and network",
139 )));
140 }
141 };
142
143 let probe = match tokio::time::timeout(
144 ctx.timeout,
145 control
146 .list_objects()
147 .set_parent(self.bucket_path())
148 .set_page_size(1_i32)
149 .send(),
150 )
151 .await
152 {
153 Ok(Ok(_)) => Probe::pass("auth", started.elapsed()),
154 Ok(Err(e)) => Probe::fail_hint(
155 "auth",
156 started.elapsed(),
157 e.to_string(),
158 "check bucket name, credentials, and network",
159 ),
160 Err(_) => Probe::fail("network", started.elapsed(), "timed out"),
161 };
162 Ok(CheckReport::single(probe))
163 }
164}
165
166fn resolve_effective_chunk_size(config: &GcsSinkConfig) -> usize {
169 let bs = if config.batch_size == 0 {
170 usize::MAX
171 } else {
172 config.batch_size
173 };
174 let mr = config.max_records_per_file.unwrap_or(usize::MAX);
175 bs.min(mr)
176}
177
178fn generate_object_key(prefix: &str, file_extension: &str) -> String {
182 format!("{prefix}{}{file_extension}", uuid::Uuid::now_v7())
183}
184
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A batch size above the allowed maximum is rejected at construction
    /// time with a `Config` error that names the offending field.
    #[tokio::test]
    async fn new_rejects_out_of_range_batch_size() {
        let mut config = GcsSinkConfig::new("bucket");
        config.batch_size = faucet_core::MAX_BATCH_SIZE + 1;

        let message = match GcsSink::new(config).await {
            Err(FaucetError::Config(m)) => m,
            _ => panic!("expected a batch_size Config error"),
        };
        assert!(message.contains("batch_size"), "got: {message}");
    }

    /// Each record becomes one compact JSON line terminated by `\n`.
    #[test]
    fn serialize_jsonl_two_records() {
        let records = [json!({"a": 1}), json!({"b": 2})];
        let body = GcsSink::serialize_jsonl(&records).unwrap();
        let text = std::str::from_utf8(&body).unwrap();
        assert_eq!(text, "{\"a\":1}\n{\"b\":2}\n");
    }

    /// No records means no bytes.
    #[test]
    fn serialize_jsonl_empty_is_empty() {
        let body = GcsSink::serialize_jsonl(&[]).unwrap();
        assert!(body.is_empty());
    }

    /// With a zero batch size and no per-file cap the chunk size is unbounded.
    #[test]
    fn effective_chunk_size_unlimited_when_both_unset() {
        let cfg = GcsSinkConfig::new("b").with_batch_size(0);
        let chunk = resolve_effective_chunk_size(&cfg);
        assert_eq!(chunk, usize::MAX);
    }

    /// The per-file cap wins when it is the smaller limit.
    #[test]
    fn effective_chunk_size_takes_smaller_limit() {
        let cfg = GcsSinkConfig::new("b")
            .with_batch_size(500)
            .max_records_per_file(100);
        let chunk = resolve_effective_chunk_size(&cfg);
        assert_eq!(chunk, 100);
    }

    /// The batch size wins when it is the smaller limit.
    #[test]
    fn effective_chunk_size_uses_batch_size_when_smaller() {
        let cfg = GcsSinkConfig::new("b")
            .with_batch_size(50)
            .max_records_per_file(500);
        let chunk = resolve_effective_chunk_size(&cfg);
        assert_eq!(chunk, 50);
    }

    /// Keys are wrapped by the given prefix and extension.
    #[test]
    fn generate_key_uses_prefix_and_extension() {
        let key = generate_object_key("out/", ".ndjson");
        assert!(key.starts_with("out/"));
        assert!(key.ends_with(".ndjson"));
    }

    /// Two consecutively generated keys differ and sort in generation order.
    #[test]
    fn generate_key_yields_distinct_time_ordered_keys() {
        let first = generate_object_key("p/", ".jsonl");
        let second = generate_object_key("p/", ".jsonl");
        assert_ne!(first, second);
        assert!(
            first < second,
            "expected UUIDv7 keys to sort by generation order"
        );
    }

    /// A `.zst` extension resolves to the Zstd codec, and compressing with it
    /// produces output that starts with the expected four magic bytes.
    #[cfg(feature = "compression")]
    #[test]
    fn compress_buf_used_for_zstd_extension() {
        const EXPECTED_MAGIC: [u8; 4] = [0x28, 0xb5, 0x2f, 0xfd];

        let cfg = GcsSinkConfig::new("bucket").file_extension(".jsonl.zst");
        let codec = cfg.compression.resolve(&cfg.file_extension);
        assert_eq!(codec, faucet_core::Compression::Zstd);

        let compressed = faucet_core::compression::compress_buf(b"hello\n", codec).unwrap();
        assert_eq!(&compressed[..4], &EXPECTED_MAGIC[..]);
    }
}