1use crate::config::S3SinkConfig;
4use async_trait::async_trait;
5use aws_sdk_s3::Client;
6use faucet_core::FaucetError;
7use futures::stream::{self, StreamExt, TryStreamExt};
8use serde_json::Value;
9
10pub struct S3Sink {
12 config: S3SinkConfig,
13 client: Client,
14}
15
16impl S3Sink {
17 pub async fn new(config: S3SinkConfig) -> Result<Self, FaucetError> {
21 faucet_core::validate_batch_size(config.batch_size)?;
22 let client = Self::build_client(&config).await?;
23 Ok(Self { config, client })
24 }
25
26 async fn build_client(config: &S3SinkConfig) -> Result<Client, FaucetError> {
28 let mut config_loader = aws_config::defaults(aws_config::BehaviorVersion::latest());
29
30 if let Some(ref region) = config.region {
31 config_loader = config_loader.region(aws_config::Region::new(region.clone()));
32 }
33
34 if let Some(ref endpoint) = config.endpoint_url {
35 config_loader = config_loader.endpoint_url(endpoint);
36 }
37
38 let sdk_config = config_loader.load().await;
39 let client = Client::new(&sdk_config);
40 Ok(client)
41 }
42
43 fn serialize_jsonl(records: &[Value]) -> Result<Vec<u8>, FaucetError> {
45 let mut buf: Vec<u8> = Vec::new();
46 for record in records {
47 let line = serde_json::to_vec(record)
48 .map_err(|e| FaucetError::Sink(format!("JSON serialization failed: {e}")))?;
49 buf.extend_from_slice(&line);
50 buf.push(b'\n');
51 }
52 Ok(buf)
53 }
54
55 fn generate_key(&self) -> String {
57 let id = uuid::Uuid::new_v4();
58 format!("{}{}{}", self.config.prefix, id, self.config.file_extension)
59 }
60
61 async fn upload_file(&self, key: &str, body: Vec<u8>) -> Result<(), FaucetError> {
63 #[cfg(feature = "compression")]
64 let body = {
65 let codec = self.config.compression.resolve(&self.config.file_extension);
66 faucet_core::compression::warn_mismatch(&self.config.file_extension, codec);
67 faucet_core::compression::compress_buf(&body, codec)?
68 };
69
70 self.client
71 .put_object()
72 .bucket(&self.config.bucket)
73 .key(key)
74 .body(body.into())
75 .content_type("application/x-ndjson")
76 .send()
77 .await
78 .map_err(|e| FaucetError::Sink(format!("S3 put object error for key '{key}': {e}")))?;
79
80 tracing::debug!(key = %key, "Uploaded S3 object");
81 Ok(())
82 }
83}
84
85#[async_trait]
86impl faucet_core::Sink for S3Sink {
87 fn config_schema(&self) -> serde_json::Value {
88 serde_json::to_value(faucet_core::schema_for!(S3SinkConfig)).expect("schema serialization")
89 }
90
91 async fn check(
94 &self,
95 ctx: &faucet_core::check::CheckContext,
96 ) -> Result<faucet_core::check::CheckReport, FaucetError> {
97 use faucet_core::check::{CheckReport, Probe};
98
99 let started = std::time::Instant::now();
100 let probe = match tokio::time::timeout(
101 ctx.timeout,
102 self.client.head_bucket().bucket(&self.config.bucket).send(),
103 )
104 .await
105 {
106 Ok(Ok(_)) => Probe::pass("auth", started.elapsed()),
107 Ok(Err(e)) => Probe::fail_hint(
108 "auth",
109 started.elapsed(),
110 e.to_string(),
111 "check bucket name, credentials, and network",
112 ),
113 Err(_) => Probe::fail("network", started.elapsed(), "timed out"),
114 };
115 Ok(CheckReport::single(probe))
116 }
117
118 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
119 if records.is_empty() {
120 return Ok(0);
121 }
122
123 let chunk_cap: Option<usize> =
127 match (self.config.batch_size, self.config.max_records_per_file) {
128 (0, None) => None,
129 (0, Some(0)) => None,
130 (0, Some(max)) => Some(max),
131 (bs, None) => Some(bs),
132 (bs, Some(0)) => Some(bs),
133 (bs, Some(max)) => Some(bs.min(max)),
134 };
135
136 let chunks: Vec<&[Value]> = match chunk_cap {
137 Some(cap) => records.chunks(cap).collect(),
138 None => vec![records],
139 };
140
141 let total_files = chunks.len();
142 let concurrency = self.config.concurrency.max(1);
143
144 let prepared: Vec<(String, Vec<u8>)> = chunks
146 .iter()
147 .map(|chunk| {
148 let body = Self::serialize_jsonl(chunk)?;
149 let key = self.generate_key();
150 Ok((key, body))
151 })
152 .collect::<Result<Vec<_>, FaucetError>>()?;
153
154 stream::iter(prepared)
155 .map(|(key, body)| async move { self.upload_file(&key, body).await })
156 .buffer_unordered(concurrency)
157 .try_collect::<Vec<()>>()
158 .await?;
159
160 tracing::info!(
161 records = records.len(),
162 files = total_files,
163 "S3 batch write complete"
164 );
165 Ok(records.len())
166 }
167}
168
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::S3SinkConfig;
    use serde_json::json;

    /// Builds a sink around a client created from a bare SDK config, so the
    /// pure helper methods can be exercised without loading AWS settings.
    fn test_sink(config: S3SinkConfig) -> S3Sink {
        let client = Client::new(
            &aws_config::SdkConfig::builder()
                .behavior_version(aws_config::BehaviorVersion::latest())
                .build(),
        );
        S3Sink { config, client }
    }

    #[test]
    fn serialize_jsonl_produces_newline_delimited() {
        let records = [
            json!({"id": 1, "name": "Alice"}),
            json!({"id": 2, "name": "Bob"}),
        ];

        let bytes = S3Sink::serialize_jsonl(&records).unwrap();
        let text = String::from_utf8(bytes).unwrap();
        let lines: Vec<&str> = text.trim().split('\n').collect();
        assert_eq!(lines.len(), 2);

        // The first line must round-trip back into the first record.
        let first: Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first["id"], 1);
    }

    #[test]
    fn serialize_jsonl_empty() {
        let bytes = S3Sink::serialize_jsonl(&[]).unwrap();
        assert!(bytes.is_empty());
    }

    #[test]
    fn generate_key_uses_prefix_and_extension() {
        const PREFIX: &str = "data/";
        const EXTENSION: &str = ".jsonl";

        let config = S3SinkConfig::new("bucket")
            .prefix(PREFIX)
            .file_extension(EXTENSION);
        let key = test_sink(config).generate_key();

        assert!(key.starts_with(PREFIX));
        assert!(key.ends_with(EXTENSION));
        // Something (the generated id) must sit between prefix and extension.
        assert!(key.len() > PREFIX.len() + EXTENSION.len());
    }

    #[test]
    fn generate_key_no_prefix() {
        let key = test_sink(S3SinkConfig::new("bucket")).generate_key();
        assert!(key.ends_with(".jsonl"));
        assert!(!key.starts_with('/'));
    }

    #[tokio::test]
    async fn new_rejects_out_of_range_batch_size() {
        let mut config = S3SinkConfig::new("bucket");
        config.batch_size = faucet_core::MAX_BATCH_SIZE + 1;

        let Err(faucet_core::FaucetError::Config(m)) = S3Sink::new(config).await else {
            panic!("expected a batch_size Config error");
        };
        assert!(m.contains("batch_size"), "got: {m}");
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compress_buf_used_for_gzip_extension() {
        let cfg = S3SinkConfig::new("bucket").file_extension(".jsonl.gz");

        let codec = cfg.compression.resolve(&cfg.file_extension);
        assert_eq!(codec, faucet_core::Compression::Gzip);

        // Gzip streams start with the magic bytes 0x1f 0x8b.
        let compressed = faucet_core::compression::compress_buf(b"hello\n", codec).unwrap();
        assert_eq!(&compressed[..2], b"\x1f\x8b");
    }
}