Skip to main content

faucet_sink_s3/
sink.rs

1//! S3 sink executor.
2
3use crate::config::S3SinkConfig;
4use async_trait::async_trait;
5use aws_sdk_s3::Client;
6use faucet_core::FaucetError;
7use futures::stream::{self, StreamExt, TryStreamExt};
8use serde_json::Value;
9
10/// A sink that writes JSON records to S3 as JSON Lines files.
11pub struct S3Sink {
12    config: S3SinkConfig,
13    client: Client,
14}
15
16impl S3Sink {
17    /// Create a new S3 sink from the given configuration.
18    ///
19    /// Builds the S3 client eagerly so it is reused across calls.
20    pub async fn new(config: S3SinkConfig) -> Result<Self, FaucetError> {
21        faucet_core::validate_batch_size(config.batch_size)?;
22        let client = Self::build_client(&config).await?;
23        Ok(Self { config, client })
24    }
25
26    /// Build an S3 client from the configuration.
27    async fn build_client(config: &S3SinkConfig) -> Result<Client, FaucetError> {
28        let mut config_loader = aws_config::defaults(aws_config::BehaviorVersion::latest());
29
30        if let Some(ref region) = config.region {
31            config_loader = config_loader.region(aws_config::Region::new(region.clone()));
32        }
33
34        if let Some(ref endpoint) = config.endpoint_url {
35            config_loader = config_loader.endpoint_url(endpoint);
36        }
37
38        let sdk_config = config_loader.load().await;
39        let client = Client::new(&sdk_config);
40        Ok(client)
41    }
42
43    /// Serialize a slice of records as JSON Lines bytes.
44    fn serialize_jsonl(records: &[Value]) -> Result<Vec<u8>, FaucetError> {
45        let mut buf: Vec<u8> = Vec::new();
46        for record in records {
47            let line = serde_json::to_vec(record)
48                .map_err(|e| FaucetError::Sink(format!("JSON serialization failed: {e}")))?;
49            buf.extend_from_slice(&line);
50            buf.push(b'\n');
51        }
52        Ok(buf)
53    }
54
55    /// Generate a unique S3 key for a file.
56    fn generate_key(&self) -> String {
57        let id = uuid::Uuid::new_v4();
58        format!("{}{}{}", self.config.prefix, id, self.config.file_extension)
59    }
60
61    /// Upload a single JSONL file to S3.
62    async fn upload_file(&self, key: &str, body: Vec<u8>) -> Result<(), FaucetError> {
63        #[cfg(feature = "compression")]
64        let body = {
65            let codec = self.config.compression.resolve(&self.config.file_extension);
66            faucet_core::compression::warn_mismatch(&self.config.file_extension, codec);
67            faucet_core::compression::compress_buf(&body, codec)?
68        };
69
70        self.client
71            .put_object()
72            .bucket(&self.config.bucket)
73            .key(key)
74            .body(body.into())
75            .content_type("application/x-ndjson")
76            .send()
77            .await
78            .map_err(|e| FaucetError::Sink(format!("S3 put object error for key '{key}': {e}")))?;
79
80        tracing::debug!(key = %key, "Uploaded S3 object");
81        Ok(())
82    }
83}
84
85#[async_trait]
86impl faucet_core::Sink for S3Sink {
87    fn config_schema(&self) -> serde_json::Value {
88        serde_json::to_value(faucet_core::schema_for!(S3SinkConfig)).expect("schema serialization")
89    }
90
91    /// Preflight probe: confirm the configured bucket is reachable and the
92    /// credentials work via a non-mutating `HeadBucket` call. Uploads nothing.
93    async fn check(
94        &self,
95        ctx: &faucet_core::check::CheckContext,
96    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
97        use faucet_core::check::{CheckReport, Probe};
98
99        let started = std::time::Instant::now();
100        let probe = match tokio::time::timeout(
101            ctx.timeout,
102            self.client.head_bucket().bucket(&self.config.bucket).send(),
103        )
104        .await
105        {
106            Ok(Ok(_)) => Probe::pass("auth", started.elapsed()),
107            Ok(Err(e)) => Probe::fail_hint(
108                "auth",
109                started.elapsed(),
110                e.to_string(),
111                "check bucket name, credentials, and network",
112            ),
113            Err(_) => Probe::fail("network", started.elapsed(), "timed out"),
114        };
115        Ok(CheckReport::single(probe))
116    }
117
118    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
119        if records.is_empty() {
120            return Ok(0);
121        }
122
123        // Effective per-object cap is the smaller of `batch_size` (when set)
124        // and `max_records_per_file` (when set). When neither caps the chunk,
125        // the whole slice is written as a single object.
126        let chunk_cap: Option<usize> =
127            match (self.config.batch_size, self.config.max_records_per_file) {
128                (0, None) => None,
129                (0, Some(0)) => None,
130                (0, Some(max)) => Some(max),
131                (bs, None) => Some(bs),
132                (bs, Some(0)) => Some(bs),
133                (bs, Some(max)) => Some(bs.min(max)),
134            };
135
136        let chunks: Vec<&[Value]> = match chunk_cap {
137            Some(cap) => records.chunks(cap).collect(),
138            None => vec![records],
139        };
140
141        let total_files = chunks.len();
142        let concurrency = self.config.concurrency.max(1);
143
144        // Pre-serialize each chunk and generate keys before uploading.
145        let prepared: Vec<(String, Vec<u8>)> = chunks
146            .iter()
147            .map(|chunk| {
148                let body = Self::serialize_jsonl(chunk)?;
149                let key = self.generate_key();
150                Ok((key, body))
151            })
152            .collect::<Result<Vec<_>, FaucetError>>()?;
153
154        stream::iter(prepared)
155            .map(|(key, body)| async move { self.upload_file(&key, body).await })
156            .buffer_unordered(concurrency)
157            .try_collect::<Vec<()>>()
158            .await?;
159
160        tracing::info!(
161            records = records.len(),
162            files = total_files,
163            "S3 batch write complete"
164        );
165        Ok(records.len())
166    }
167}
168
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::S3SinkConfig;
    use serde_json::json;

    /// Construct an `S3Sink` synchronously from a bare SDK config, for tests
    /// that never issue network calls.
    fn test_sink(config: S3SinkConfig) -> S3Sink {
        let behavior = aws_config::BehaviorVersion::latest();
        let sdk_config = aws_config::SdkConfig::builder()
            .behavior_version(behavior)
            .build();
        S3Sink {
            client: Client::new(&sdk_config),
            config,
        }
    }

    #[test]
    fn serialize_jsonl_produces_newline_delimited() {
        let alice = json!({"id": 1, "name": "Alice"});
        let bob = json!({"id": 2, "name": "Bob"});
        let records = vec![alice, bob];

        let bytes = S3Sink::serialize_jsonl(&records).unwrap();
        let text = String::from_utf8(bytes).unwrap();
        let lines: Vec<&str> = text.trim().split('\n').collect();
        assert_eq!(lines.len(), 2);

        // The first line must parse back into the first record.
        let first: Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first["id"], 1);
    }

    #[test]
    fn serialize_jsonl_empty() {
        let bytes = S3Sink::serialize_jsonl(&[]).unwrap();
        assert!(bytes.is_empty());
    }

    #[test]
    fn generate_key_uses_prefix_and_extension() {
        let config = S3SinkConfig::new("bucket")
            .prefix("data/")
            .file_extension(".jsonl");
        let key = test_sink(config).generate_key();

        assert!(key.starts_with("data/"));
        assert!(key.ends_with(".jsonl"));
        // The UUID in the middle makes the key longer than prefix + extension.
        assert!(key.len() > "data/".len() + ".jsonl".len());
    }

    #[test]
    fn generate_key_no_prefix() {
        let key = test_sink(S3SinkConfig::new("bucket")).generate_key();

        assert!(key.ends_with(".jsonl"));
        // Without a prefix the key begins directly with the UUID.
        assert!(!key.starts_with('/'));
    }

    #[tokio::test]
    async fn new_rejects_out_of_range_batch_size() {
        let mut config = S3SinkConfig::new("bucket");
        config.batch_size = faucet_core::MAX_BATCH_SIZE + 1;

        let outcome = S3Sink::new(config).await;
        if let Err(faucet_core::FaucetError::Config(m)) = outcome {
            assert!(m.contains("batch_size"), "got: {m}");
        } else {
            panic!("expected a batch_size Config error");
        }
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compress_buf_used_for_gzip_extension() {
        // White-box: the codec resolved from a `.gz` extension must be Gzip,
        // and `compress_buf` must emit a buffer starting with the gzip magic.
        let cfg = S3SinkConfig::new("bucket").file_extension(".jsonl.gz");
        let codec = cfg.compression.resolve(&cfg.file_extension);
        assert_eq!(codec, faucet_core::Compression::Gzip);

        let compressed = faucet_core::compression::compress_buf(b"hello\n", codec).unwrap();
        // gzip magic bytes.
        assert_eq!(&compressed[..2], b"\x1f\x8b");
    }
}