Skip to main content

faucet_sink_jsonl/
sink.rs

1//! JSON Lines file sink.
2
3use crate::config::JsonlSinkConfig;
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use serde_json::Value;
7use tokio::fs::OpenOptions;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11/// A sink that writes JSON records to a file in JSON Lines format.
12///
13/// Each record is written as a single line of JSON followed by a newline.
14/// The file is opened lazily on the first `write_batch` call.
15///
16/// With the `compression` feature, the writer transparently wraps the file
17/// with a gzip / zstd encoder based on the `compression` config field.
18/// [`Sink::flush`](faucet_core::Sink::flush) finalises the encoder (writes the trailer) and clears the
19/// writer slot — a subsequent `write_batch` reopens the file in append mode
20/// (independent of `config.append`) and starts a fresh encoder, producing a
21/// multi-member compressed file that decoders read back correctly. This makes
22/// the per-page `flush` the pipeline emits for bookmarked pages safe for CDC
23/// sources — every transaction appends rather than truncates.
24pub struct JsonlSink {
25    config: JsonlSinkConfig,
26    /// Mutex-protected writer for thread-safe concurrent writes.
27    writer: Mutex<Option<std::pin::Pin<Box<dyn tokio::io::AsyncWrite + Send + Unpin>>>>,
28    /// Tracks whether `ensure_open` has opened the file at least once.
29    /// On re-opens (after `flush()` clears the writer), we always use
30    /// append mode regardless of `config.append` so the new gzip / zstd
31    /// member appends instead of truncating the file. Without this, the
32    /// pipeline's per-bookmark flush would silently lose data when
33    /// `config.append = false` (the default).
34    opened_once: std::sync::atomic::AtomicBool,
35}
36
37impl JsonlSink {
38    /// Create a new JSON Lines sink. The file is opened on first write.
39    pub fn new(config: JsonlSinkConfig) -> Self {
40        Self {
41            config,
42            writer: Mutex::new(None),
43            opened_once: std::sync::atomic::AtomicBool::new(false),
44        }
45    }
46
47    /// Ensure the file is open and return a mutable reference to the writer.
48    async fn ensure_open(
49        &self,
50    ) -> Result<
51        tokio::sync::MutexGuard<
52            '_,
53            Option<std::pin::Pin<Box<dyn tokio::io::AsyncWrite + Send + Unpin>>>,
54        >,
55        FaucetError,
56    > {
57        let mut guard = self.writer.lock().await;
58        if guard.is_none() {
59            let opened_before = self.opened_once.load(std::sync::atomic::Ordering::Relaxed);
60            // First open obeys `config.append`. Re-opens (after flush()
61            // cleared the writer) always append, so flush-then-write
62            // sequences do not truncate previously-written data.
63            let (append, truncate) = if opened_before {
64                (true, false)
65            } else {
66                (self.config.append, !self.config.append)
67            };
68            if let Some(parent) = self.config.path.parent()
69                && !parent.as_os_str().is_empty()
70            {
71                tokio::fs::create_dir_all(parent).await.map_err(|e| {
72                    FaucetError::Sink(format!(
73                        "failed to create parent directory '{}': {e}",
74                        parent.display()
75                    ))
76                })?;
77            }
78            let file = OpenOptions::new()
79                .create(true)
80                .write(true)
81                .append(append)
82                .truncate(truncate)
83                .open(&self.config.path)
84                .await
85                .map_err(|e| {
86                    FaucetError::Sink(format!(
87                        "failed to open {}: {e}",
88                        self.config.path.display()
89                    ))
90                })?;
91            self.opened_once
92                .store(true, std::sync::atomic::Ordering::Relaxed);
93            let buffered = tokio::io::BufWriter::new(file);
94            #[cfg(feature = "compression")]
95            let writer: std::pin::Pin<Box<dyn tokio::io::AsyncWrite + Send + Unpin>> = {
96                let path_str = self.config.path.to_string_lossy();
97                let codec = self.config.compression.resolve(&path_str);
98                faucet_core::compression::warn_mismatch(&path_str, codec);
99                faucet_core::compression::wrap_async_writer(buffered, codec)
100            };
101            #[cfg(not(feature = "compression"))]
102            let writer: std::pin::Pin<Box<dyn tokio::io::AsyncWrite + Send + Unpin>> =
103                Box::pin(buffered);
104            *guard = Some(writer);
105        }
106        Ok(guard)
107    }
108}
109
110#[async_trait]
111impl faucet_core::Sink for JsonlSink {
112    fn connector_name(&self) -> &'static str {
113        "jsonl"
114    }
115
116    fn config_schema(&self) -> serde_json::Value {
117        serde_json::to_value(faucet_core::schema_for!(JsonlSinkConfig))
118            .expect("schema serialization")
119    }
120
121    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
122        if records.is_empty() {
123            return Ok(0);
124        }
125
126        let mut guard = self.ensure_open().await?;
127        let writer = guard.as_mut().expect("writer opened in ensure_open");
128
129        for record in records {
130            let line = if self.config.pretty {
131                serde_json::to_string_pretty(record)
132            } else {
133                serde_json::to_string(record)
134            }
135            .map_err(|e| FaucetError::Sink(format!("JSON serialization failed: {e}")))?;
136
137            writer
138                .write_all(line.as_bytes())
139                .await
140                .map_err(|e| FaucetError::Sink(format!("write failed: {e}")))?;
141            writer
142                .write_all(b"\n")
143                .await
144                .map_err(|e| FaucetError::Sink(format!("write failed: {e}")))?;
145        }
146
147        tracing::debug!(records = records.len(), "JSONL batch written");
148        Ok(records.len())
149    }
150
151    async fn flush(&self) -> Result<(), FaucetError> {
152        let mut guard = self.writer.lock().await;
153        if let Some(mut writer) = guard.take() {
154            use tokio::io::AsyncWriteExt;
155            writer
156                .shutdown()
157                .await
158                .map_err(|e| FaucetError::Sink(format!("flush failed: {e}")))?;
159        }
160        Ok(())
161    }
162
163    /// Preflight probe for `faucet doctor`. Verifies the configured output
164    /// path's parent directory exists and is writable by creating, then
165    /// immediately removing, a uniquely-named temp file there. Never touches
166    /// the user's actual output file, so it is fully idempotent.
167    async fn check(
168        &self,
169        _ctx: &faucet_core::check::CheckContext,
170    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
171        use faucet_core::check::CheckReport;
172        let start = std::time::Instant::now();
173        let probe = crate::probe::probe_parent_writable(&self.config.path, start).await;
174        Ok(CheckReport::single(probe))
175    }
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use faucet_core::Sink;
    use serde_json::json;
    use tempfile::NamedTempFile;

    /// Split sink output into lines: trim the trailing newline, then split on `\n`.
    fn lines_of(text: &str) -> Vec<&str> {
        text.trim().split('\n').collect()
    }

    /// Read the file at `path` and decode it with `codec` through
    /// faucet_core's async reader wrapper.
    #[cfg(feature = "compression")]
    async fn read_decompressed(
        path: &std::path::Path,
        codec: faucet_core::Compression,
    ) -> String {
        use tokio::io::AsyncReadExt;
        let bytes = tokio::fs::read(path).await.unwrap();
        let mut decoded = Vec::new();
        let mut reader = faucet_core::compression::wrap_async_reader(
            tokio::io::BufReader::new(&bytes[..]),
            codec,
        );
        reader.read_to_end(&mut decoded).await.unwrap();
        String::from_utf8(decoded).unwrap()
    }

    #[tokio::test]
    async fn writes_jsonl_records() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path));

        let records = vec![
            json!({"id": 1, "name": "Alice"}),
            json!({"id": 2, "name": "Bob"}),
        ];
        let count = sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();

        assert_eq!(count, 2);
        let content = tokio::fs::read_to_string(&path).await.unwrap();
        let lines = lines_of(&content);
        assert_eq!(lines.len(), 2);

        let first: Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first["id"], 1);
    }

    #[tokio::test]
    async fn append_mode() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();

        // Write first batch.
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path));
        sink.write_batch(&[json!({"id": 1})]).await.unwrap();
        sink.flush().await.unwrap();
        drop(sink);

        // Write second batch in append mode.
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path).append(true));
        sink.write_batch(&[json!({"id": 2})]).await.unwrap();
        sink.flush().await.unwrap();

        let content = tokio::fs::read_to_string(&path).await.unwrap();
        assert_eq!(lines_of(&content).len(), 2);
    }

    #[tokio::test]
    async fn first_open_truncates_when_append_is_false() {
        // With the default `append = false`, the first open must replace
        // whatever the file held before this sink existed.
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        tokio::fs::write(&path, b"{\"stale\":true}\n").await.unwrap();

        let sink = JsonlSink::new(JsonlSinkConfig::new(&path));
        sink.write_batch(&[json!({"fresh": 1})]).await.unwrap();
        sink.flush().await.unwrap();

        let content = tokio::fs::read_to_string(&path).await.unwrap();
        let lines = lines_of(&content);
        assert_eq!(lines.len(), 1, "stale content must be truncated");
        let only: Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(only["fresh"], 1);
    }

    #[tokio::test]
    async fn empty_batch_returns_zero() {
        let tmp = NamedTempFile::new().unwrap();
        let sink = JsonlSink::new(JsonlSinkConfig::new(tmp.path()));
        let count = sink.write_batch(&[]).await.unwrap();
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn empty_batch_does_not_create_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.jsonl");
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path));
        assert_eq!(sink.write_batch(&[]).await.unwrap(), 0);
        assert!(!path.exists(), "an empty batch must not open the output file");
    }

    #[tokio::test]
    async fn flush_without_write_is_noop() {
        let tmp = NamedTempFile::new().unwrap();
        let sink = JsonlSink::new(JsonlSinkConfig::new(tmp.path()));
        assert!(sink.flush().await.is_ok());
    }

    #[tokio::test]
    async fn multiple_batches_accumulate() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path));

        sink.write_batch(&[json!({"a": 1})]).await.unwrap();
        sink.write_batch(&[json!({"b": 2}), json!({"c": 3})])
            .await
            .unwrap();
        sink.flush().await.unwrap();

        let content = tokio::fs::read_to_string(&path).await.unwrap();
        assert_eq!(lines_of(&content).len(), 3);
    }

    #[test]
    fn jsonl_sink_connector_name_is_jsonl() {
        let tmp = NamedTempFile::new().unwrap();
        let sink = JsonlSink::new(JsonlSinkConfig::new(tmp.path()));
        assert_eq!(sink.connector_name(), "jsonl");
    }

    #[tokio::test]
    async fn check_passes_when_parent_dir_exists() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.jsonl");
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path));
        let report = sink
            .check(&faucet_core::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 0);
        assert_eq!(report.probes[0].name, "io");
        // The probe must not have created the user's output file.
        assert!(!path.exists(), "check() must not create the output file");
    }

    #[tokio::test]
    async fn check_fails_when_parent_dir_missing() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("nope").join("out.jsonl");
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path));
        let report = sink
            .check(&faucet_core::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 1);
        assert_eq!(report.probes[0].name, "io");
    }

    #[tokio::test]
    async fn creates_missing_parent_directories() {
        let dir = tempfile::tempdir().unwrap();
        let nested = dir.path().join("a").join("b").join("out.jsonl");
        let sink = JsonlSink::new(JsonlSinkConfig::new(&nested));

        let records = vec![json!({"id": 1})];
        let count = sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();

        assert_eq!(count, 1);
        assert!(nested.exists(), "output file must exist after write");
        let content = tokio::fs::read_to_string(&nested).await.unwrap();
        let first: Value = serde_json::from_str(content.trim()).unwrap();
        assert_eq!(first["id"], 1);
    }

    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn roundtrip_gzip() {
        use faucet_core::CompressionConfig;
        let tmp = NamedTempFile::with_suffix(".jsonl.gz").unwrap();
        let path = tmp.path().to_path_buf();
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path).compression(CompressionConfig::Auto));

        let records = vec![
            json!({"id": 1, "name": "Alice"}),
            json!({"id": 2, "name": "Bob"}),
        ];
        sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();

        // Decompress via faucet_core, then parse JSONL.
        let text = read_decompressed(&path, faucet_core::Compression::Gzip).await;
        let lines = lines_of(&text);
        assert_eq!(lines.len(), 2);
        let first: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first["id"], 1);
    }

    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn roundtrip_zstd() {
        use faucet_core::CompressionConfig;
        let tmp = NamedTempFile::with_suffix(".jsonl.zst").unwrap();
        let path = tmp.path().to_path_buf();
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path).compression(CompressionConfig::Auto));
        sink.write_batch(&[json!({"x": 42})]).await.unwrap();
        sink.flush().await.unwrap();

        let text = read_decompressed(&path, faucet_core::Compression::Zstd).await;
        let v: serde_json::Value = serde_json::from_str(text.trim()).unwrap();
        assert_eq!(v["x"], 42);
    }

    #[tokio::test]
    async fn write_flush_write_does_not_truncate() {
        // Regression: flush() clears the writer; the next write_batch
        // must reopen in append mode regardless of config.append (which
        // defaults to false). Without the opened_once guard, the second
        // open would truncate and lose the first batch's records.
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path));

        sink.write_batch(&[json!({"first": 1})]).await.unwrap();
        sink.flush().await.unwrap();
        sink.write_batch(&[json!({"second": 2})]).await.unwrap();
        sink.flush().await.unwrap();

        let content = tokio::fs::read_to_string(&path).await.unwrap();
        let lines = lines_of(&content);
        assert_eq!(
            lines.len(),
            2,
            "both batches must survive the mid-stream flush"
        );
        let first: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first["first"], 1);
        let second: serde_json::Value = serde_json::from_str(lines[1]).unwrap();
        assert_eq!(second["second"], 2);
    }

    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn write_flush_write_produces_multi_member_gzip() {
        // With compression, flush() finalises one gzip member; the
        // next write_batch starts a fresh member appended after it.
        // The decoder reads both members back correctly and in order.
        use faucet_core::CompressionConfig;
        let tmp = NamedTempFile::with_suffix(".jsonl.gz").unwrap();
        let path = tmp.path().to_path_buf();
        let sink = JsonlSink::new(JsonlSinkConfig::new(&path).compression(CompressionConfig::Auto));
        sink.write_batch(&[json!({"first": 1})]).await.unwrap();
        sink.flush().await.unwrap();
        sink.write_batch(&[json!({"second": 2})]).await.unwrap();
        sink.flush().await.unwrap();

        let text = read_decompressed(&path, faucet_core::Compression::Gzip).await;
        let lines = lines_of(&text);
        assert_eq!(lines.len(), 2);
        let first: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first["first"], 1);
        let second: serde_json::Value = serde_json::from_str(lines[1]).unwrap();
        assert_eq!(second["second"], 2);
    }
}