faucet_sink_jsonl/
sink.rs1use crate::config::JsonlSinkConfig;
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use serde_json::Value;
7use tokio::fs::OpenOptions;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11pub struct JsonlSink {
25 config: JsonlSinkConfig,
26 writer: Mutex<Option<std::pin::Pin<Box<dyn tokio::io::AsyncWrite + Send + Unpin>>>>,
28 opened_once: std::sync::atomic::AtomicBool,
35}
36
37impl JsonlSink {
38 pub fn new(config: JsonlSinkConfig) -> Self {
40 Self {
41 config,
42 writer: Mutex::new(None),
43 opened_once: std::sync::atomic::AtomicBool::new(false),
44 }
45 }
46
47 async fn ensure_open(
49 &self,
50 ) -> Result<
51 tokio::sync::MutexGuard<
52 '_,
53 Option<std::pin::Pin<Box<dyn tokio::io::AsyncWrite + Send + Unpin>>>,
54 >,
55 FaucetError,
56 > {
57 let mut guard = self.writer.lock().await;
58 if guard.is_none() {
59 let opened_before = self.opened_once.load(std::sync::atomic::Ordering::Relaxed);
60 let (append, truncate) = if opened_before {
64 (true, false)
65 } else {
66 (self.config.append, !self.config.append)
67 };
68 if let Some(parent) = self.config.path.parent()
69 && !parent.as_os_str().is_empty()
70 {
71 tokio::fs::create_dir_all(parent).await.map_err(|e| {
72 FaucetError::Sink(format!(
73 "failed to create parent directory '{}': {e}",
74 parent.display()
75 ))
76 })?;
77 }
78 let file = OpenOptions::new()
79 .create(true)
80 .write(true)
81 .append(append)
82 .truncate(truncate)
83 .open(&self.config.path)
84 .await
85 .map_err(|e| {
86 FaucetError::Sink(format!(
87 "failed to open {}: {e}",
88 self.config.path.display()
89 ))
90 })?;
91 self.opened_once
92 .store(true, std::sync::atomic::Ordering::Relaxed);
93 let buffered = tokio::io::BufWriter::new(file);
94 #[cfg(feature = "compression")]
95 let writer: std::pin::Pin<Box<dyn tokio::io::AsyncWrite + Send + Unpin>> = {
96 let path_str = self.config.path.to_string_lossy();
97 let codec = self.config.compression.resolve(&path_str);
98 faucet_core::compression::warn_mismatch(&path_str, codec);
99 faucet_core::compression::wrap_async_writer(buffered, codec)
100 };
101 #[cfg(not(feature = "compression"))]
102 let writer: std::pin::Pin<Box<dyn tokio::io::AsyncWrite + Send + Unpin>> =
103 Box::pin(buffered);
104 *guard = Some(writer);
105 }
106 Ok(guard)
107 }
108}
109
110#[async_trait]
111impl faucet_core::Sink for JsonlSink {
112 fn connector_name(&self) -> &'static str {
113 "jsonl"
114 }
115
116 fn config_schema(&self) -> serde_json::Value {
117 serde_json::to_value(faucet_core::schema_for!(JsonlSinkConfig))
118 .expect("schema serialization")
119 }
120
121 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
122 if records.is_empty() {
123 return Ok(0);
124 }
125
126 let mut guard = self.ensure_open().await?;
127 let writer = guard.as_mut().expect("writer opened in ensure_open");
128
129 for record in records {
130 let line = if self.config.pretty {
131 serde_json::to_string_pretty(record)
132 } else {
133 serde_json::to_string(record)
134 }
135 .map_err(|e| FaucetError::Sink(format!("JSON serialization failed: {e}")))?;
136
137 writer
138 .write_all(line.as_bytes())
139 .await
140 .map_err(|e| FaucetError::Sink(format!("write failed: {e}")))?;
141 writer
142 .write_all(b"\n")
143 .await
144 .map_err(|e| FaucetError::Sink(format!("write failed: {e}")))?;
145 }
146
147 tracing::debug!(records = records.len(), "JSONL batch written");
148 Ok(records.len())
149 }
150
151 async fn flush(&self) -> Result<(), FaucetError> {
152 let mut guard = self.writer.lock().await;
153 if let Some(mut writer) = guard.take() {
154 use tokio::io::AsyncWriteExt;
155 writer
156 .shutdown()
157 .await
158 .map_err(|e| FaucetError::Sink(format!("flush failed: {e}")))?;
159 }
160 Ok(())
161 }
162
163 async fn check(
168 &self,
169 _ctx: &faucet_core::check::CheckContext,
170 ) -> Result<faucet_core::check::CheckReport, FaucetError> {
171 use faucet_core::check::CheckReport;
172 let start = std::time::Instant::now();
173 let probe = crate::probe::probe_parent_writable(&self.config.path, start).await;
174 Ok(CheckReport::single(probe))
175 }
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use faucet_core::Sink;
    use serde_json::json;
    use tempfile::NamedTempFile;

    /// Reads `path` as UTF-8 text and splits the trimmed content on newlines.
    async fn read_lines(path: &std::path::Path) -> Vec<String> {
        let content = tokio::fs::read_to_string(path).await.unwrap();
        content.trim().split('\n').map(str::to_owned).collect()
    }

    /// Reads `path`, decodes it with `codec` and returns the result as text.
    #[cfg(feature = "compression")]
    async fn read_decompressed(path: &std::path::Path, codec: faucet_core::Compression) -> String {
        use tokio::io::AsyncReadExt;

        let raw = tokio::fs::read(path).await.unwrap();
        let mut reader = faucet_core::compression::wrap_async_reader(
            tokio::io::BufReader::new(&raw[..]),
            codec,
        );
        let mut plain = Vec::new();
        reader.read_to_end(&mut plain).await.unwrap();
        String::from_utf8(plain).unwrap()
    }

    #[tokio::test]
    async fn writes_jsonl_records() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path));

        let batch = vec![
            json!({"id": 1, "name": "Alice"}),
            json!({"id": 2, "name": "Bob"}),
        ];
        let written = sink.write_batch(&batch).await.unwrap();
        sink.flush().await.unwrap();
        assert_eq!(written, 2);

        let lines = read_lines(&path).await;
        assert_eq!(lines.len(), 2);

        let first: Value = serde_json::from_str(&lines[0]).unwrap();
        assert_eq!(first["id"], 1);
    }

    #[tokio::test]
    async fn append_mode() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();

        // First sink uses the default configuration and is dropped afterwards.
        {
            let sink = JsonlSink::new(JsonlSinkConfig::new(&path));
            sink.write_batch(&[json!({"id": 1})]).await.unwrap();
            sink.flush().await.unwrap();
        }

        // Second sink is a fresh instance configured to append.
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path).append(true));
        sink.write_batch(&[json!({"id": 2})]).await.unwrap();
        sink.flush().await.unwrap();

        assert_eq!(read_lines(&path).await.len(), 2);
    }

    #[tokio::test]
    async fn empty_batch_returns_zero() {
        let tmp = NamedTempFile::new().unwrap();
        let sink = JsonlSink::new(JsonlSinkConfig::new(tmp.path()));
        let written = sink.write_batch(&[]).await.unwrap();
        assert_eq!(written, 0);
    }

    #[tokio::test]
    async fn flush_without_write_is_noop() {
        let tmp = NamedTempFile::new().unwrap();
        let sink = JsonlSink::new(JsonlSinkConfig::new(tmp.path()));
        assert!(sink.flush().await.is_ok());
    }

    #[tokio::test]
    async fn multiple_batches_accumulate() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path));

        sink.write_batch(&[json!({"a": 1})]).await.unwrap();
        sink.write_batch(&[json!({"b": 2}), json!({"c": 3})])
            .await
            .unwrap();
        sink.flush().await.unwrap();

        assert_eq!(read_lines(&path).await.len(), 3);
    }

    #[tokio::test]
    async fn jsonl_sink_connector_name_is_jsonl() {
        let tmp = NamedTempFile::new().unwrap();
        let sink = JsonlSink::new(JsonlSinkConfig::new(tmp.path()));
        assert_eq!(sink.connector_name(), "jsonl");
    }

    #[tokio::test]
    async fn check_passes_when_parent_dir_exists() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.jsonl");
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path));

        let ctx = faucet_core::check::CheckContext::default();
        let report = sink.check(&ctx).await.unwrap();

        assert_eq!(report.failed_count(), 0);
        assert_eq!(report.probes[0].name, "io");
        assert!(!path.exists(), "check() must not create the output file");
    }

    #[tokio::test]
    async fn check_fails_when_parent_dir_missing() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("nope").join("out.jsonl");
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path));

        let ctx = faucet_core::check::CheckContext::default();
        let report = sink.check(&ctx).await.unwrap();

        assert_eq!(report.failed_count(), 1);
        assert_eq!(report.probes[0].name, "io");
    }

    #[tokio::test]
    async fn creates_missing_parent_directories() {
        let dir = tempfile::tempdir().unwrap();
        let nested = dir.path().join("a").join("b").join("out.jsonl");
        let sink = JsonlSink::new(JsonlSinkConfig::new(&nested));

        let batch = vec![json!({"id": 1})];
        let written = sink.write_batch(&batch).await.unwrap();
        sink.flush().await.unwrap();

        assert_eq!(written, 1);
        assert!(nested.exists(), "output file must exist after write");
        let content = tokio::fs::read_to_string(&nested).await.unwrap();
        let first: Value = serde_json::from_str(content.trim()).unwrap();
        assert_eq!(first["id"], 1);
    }

    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn roundtrip_gzip() {
        use faucet_core::CompressionConfig;

        let tmp = NamedTempFile::with_suffix(".jsonl.gz").unwrap();
        let path = tmp.path().to_path_buf();
        let config = JsonlSinkConfig::new(&path).compression(CompressionConfig::Auto);
        let sink = JsonlSink::new(config);

        let batch = vec![
            json!({"id": 1, "name": "Alice"}),
            json!({"id": 2, "name": "Bob"}),
        ];
        sink.write_batch(&batch).await.unwrap();
        sink.flush().await.unwrap();

        let text = read_decompressed(&path, faucet_core::Compression::Gzip).await;
        let lines: Vec<&str> = text.trim().split('\n').collect();
        assert_eq!(lines.len(), 2);
        let first: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first["id"], 1);
    }

    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn roundtrip_zstd() {
        use faucet_core::CompressionConfig;

        let tmp = NamedTempFile::with_suffix(".jsonl.zst").unwrap();
        let path = tmp.path().to_path_buf();
        let config = JsonlSinkConfig::new(&path).compression(CompressionConfig::Auto);
        let sink = JsonlSink::new(config);
        sink.write_batch(&[json!({"x": 42})]).await.unwrap();
        sink.flush().await.unwrap();

        let text = read_decompressed(&path, faucet_core::Compression::Zstd).await;
        let v: serde_json::Value = serde_json::from_str(text.trim()).unwrap();
        assert_eq!(v["x"], 42);
    }

    #[tokio::test]
    async fn write_flush_write_does_not_truncate() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path));

        // Same sink instance, flushed between the two batches.
        sink.write_batch(&[json!({"first": 1})]).await.unwrap();
        sink.flush().await.unwrap();
        sink.write_batch(&[json!({"second": 2})]).await.unwrap();
        sink.flush().await.unwrap();

        let lines = read_lines(&path).await;
        assert_eq!(
            lines.len(),
            2,
            "both batches must survive the mid-stream flush"
        );
        let first: serde_json::Value = serde_json::from_str(&lines[0]).unwrap();
        assert_eq!(first["first"], 1);
        let second: serde_json::Value = serde_json::from_str(&lines[1]).unwrap();
        assert_eq!(second["second"], 2);
    }

    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn write_flush_write_produces_multi_member_gzip() {
        use faucet_core::CompressionConfig;

        let tmp = NamedTempFile::with_suffix(".jsonl.gz").unwrap();
        let path = tmp.path().to_path_buf();
        let config = JsonlSinkConfig::new(&path).compression(CompressionConfig::Auto);
        let sink = JsonlSink::new(config);
        sink.write_batch(&[json!({"first": 1})]).await.unwrap();
        sink.flush().await.unwrap();
        sink.write_batch(&[json!({"second": 2})]).await.unwrap();
        sink.flush().await.unwrap();

        let text = read_decompressed(&path, faucet_core::Compression::Gzip).await;
        let lines: Vec<&str> = text.trim().split('\n').collect();
        assert_eq!(lines.len(), 2);
    }
}