faucet-core 1.0.1

Shared types, traits, and utilities for the faucet-stream ecosystem
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
//! Transparent gzip / zstd compression wrappers for file-shaped connectors.
//!
//! Behind the `compression` feature. Exposes:
//! - [`CompressionConfig`] — user-facing enum (`None`, `Gzip`, `Zstd`, `Auto`).
//! - [`Compression`] — internal post-resolution enum (no `Auto`).
//! - `wrap_async_reader` / `wrap_async_writer` — for `stream_pages` and async sinks.
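//! - `wrap_sync_reader` / `wrap_sync_writer` — for `spawn_blocking` paths.
//! - [`SyncCompressWriter`] / [`sync_compress_writer`] — sync writer that keeps
//!   the concrete encoder so `finish` can surface the trailer-write error.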
//! - [`compress_buf`] — one-shot in-memory compression for S3/GCS sink uploads.
//! - [`warn_mismatch`] — log-once helper when explicit codec disagrees with the
//!   filename extension.

use crate::FaucetError;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use tokio::io::{AsyncBufRead, AsyncWrite};

/// User-facing compression config. Defaults to [`Auto`](Self::Auto).
// NOTE(review): the `///` text on this type and its variants is deliberately
// left as-is. With the `JsonSchema` derive, doc comments are presumably
// surfaced as schema descriptions, so rewording them could change generated
// schemas — confirm before editing. Plain `//` comments are safe.
//
// `rename_all = "lowercase"` makes the serialised forms `"none"`, `"gzip"`,
// `"zstd"` and `"auto"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema, Default)]
#[serde(rename_all = "lowercase")]
pub enum CompressionConfig {
    // Explicitly no compression, regardless of the filename.
    None,
    // Explicitly gzip, regardless of the filename.
    Gzip,
    // Explicitly zstd, regardless of the filename.
    Zstd,
    /// Detect from filename suffix: `.gz` → Gzip, `.zst` → Zstd, anything else → None.
    #[default]
    Auto,
}

/// Internal post-resolution codec. No `Auto` variant — call [`CompressionConfig::resolve`].
///
/// This is the type every `wrap_*` helper and [`compress_buf`] dispatches on.
/// It derives `Eq` + `Hash` so it can be part of a hash-set key (as done by
/// [`warn_mismatch`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Compression {
    /// No codec: data passes through unchanged.
    None,
    /// gzip framing.
    Gzip,
    /// Zstandard framing.
    Zstd,
}

impl CompressionConfig {
    /// Turn the user-facing setting into a concrete [`Compression`] codec.
    ///
    /// The explicit variants (`None`, `Gzip`, `Zstd`) map straight onto their
    /// counterpart and never look at `path`. Only `Auto` consults the
    /// filename, via [`detect_from_path`].
    pub fn resolve(self, path: &str) -> Compression {
        // `Option::None` here means "nothing was pinned explicitly"; it is
        // unrelated to the `None` codec variant.
        let pinned: Option<Compression> = match self {
            Self::Auto => Option::None,
            Self::Zstd => Some(Compression::Zstd),
            Self::Gzip => Some(Compression::Gzip),
            Self::None => Some(Compression::None),
        };
        pinned.unwrap_or_else(|| detect_from_path(path))
    }
}

/// Codec detection by filename suffix. Case-insensitive (ASCII). Looks at the
/// final extension only: `foo.csv.gz` → Gzip, `foo.gz.zst` → Zstd. Anything
/// else, including the empty string, resolves to [`Compression::None`].
///
/// The comparison is done in place on the path's bytes, so no lowercased copy
/// of the path is allocated per call.
pub fn detect_from_path(path: &str) -> Compression {
    let bytes = path.as_bytes();
    // Compare raw bytes rather than slicing the `&str`: a multi-byte UTF-8
    // character near the end of the path can then never hit a char-boundary
    // panic, and the suffixes themselves are pure ASCII.
    let has_suffix = |suffix: &str| {
        let suffix = suffix.as_bytes();
        bytes.len() >= suffix.len()
            && bytes[bytes.len() - suffix.len()..].eq_ignore_ascii_case(suffix)
    };

    if has_suffix(".gz") {
        Compression::Gzip
    } else if has_suffix(".zst") {
        Compression::Zstd
    } else {
        Compression::None
    }
}

/// Wrap an async buffered reader with a streaming decoder. `None` passes through.
///
/// Both codecs are configured for multi-member input, so a stream made of
/// several concatenated gzip members or zstd frames is decoded end to end.
/// This matches the sync path, where `MultiGzDecoder` and the `zstd` crate's
/// streaming reader also consume every member/frame.
///
/// The decoder output is wrapped in a `tokio::io::BufReader` so the returned
/// value still implements `AsyncBufRead`.
pub fn wrap_async_reader<'a, R>(
    r: R,
    c: Compression,
) -> Pin<Box<dyn AsyncBufRead + Send + Unpin + 'a>>
where
    R: AsyncBufRead + Send + Unpin + 'a,
{
    match c {
        Compression::None => Box::pin(r),
        Compression::Gzip => {
            let mut dec = async_compression::tokio::bufread::GzipDecoder::new(r);
            dec.multiple_members(true);
            Box::pin(tokio::io::BufReader::new(dec))
        }
        Compression::Zstd => {
            let mut dec = async_compression::tokio::bufread::ZstdDecoder::new(r);
            // `async_compression` decoders stop at the end of the first
            // frame unless `multiple_members` is enabled; without it, any
            // frames concatenated after the first would be silently dropped.
            dec.multiple_members(true);
            Box::pin(tokio::io::BufReader::new(dec))
        }
    }
}

/// Put a streaming encoder for `c` in front of an async writer; `None` hands
/// the writer back unchanged (pinned and boxed).
///
/// The trailer is only written on shutdown: callers must call
/// [`tokio::io::AsyncWriteExt::shutdown`] on the returned writer before the
/// underlying writer is dropped, otherwise the compressed stream is left
/// without its trailer.
pub fn wrap_async_writer<'a, W>(
    w: W,
    c: Compression,
) -> Pin<Box<dyn AsyncWrite + Send + Unpin + 'a>>
where
    W: AsyncWrite + Send + Unpin + 'a,
{
    use async_compression::tokio::write::{GzipEncoder, ZstdEncoder};

    match c {
        Compression::Zstd => Box::pin(ZstdEncoder::new(w)),
        Compression::Gzip => Box::pin(GzipEncoder::new(w)),
        Compression::None => Box::pin(w),
    }
}

/// Put a streaming decoder for `c` in front of a blocking reader; `None`
/// returns the reader unchanged (boxed).
///
/// Gzip input is read with `MultiGzDecoder`, so concatenated gzip members are
/// decoded as one stream.
///
/// # Panics
///
/// Panics if the zstd decoder cannot be constructed.
pub fn wrap_sync_reader<'a, R>(r: R, c: Compression) -> Box<dyn std::io::Read + Send + 'a>
where
    R: std::io::Read + Send + 'a,
{
    match c {
        Compression::Zstd => {
            let decoder = zstd::stream::read::Decoder::new(r)
                .expect("zstd decoder construction is infallible for any Read");
            Box::new(decoder)
        }
        Compression::Gzip => {
            let decoder = flate2::read::MultiGzDecoder::new(r);
            Box::new(decoder)
        }
        Compression::None => Box::new(r),
    }
}

/// Put a streaming encoder for `c` in front of a blocking writer; `None`
/// returns the writer unchanged (boxed).
///
/// Finalisation happens when the returned box is dropped: gzip emits its
/// 8-byte trailer and zstd's `auto_finish` adapter emits the frame epilogue.
/// Because the concrete encoder is hidden behind `Box<dyn Write>`, callers
/// cannot call `flate2`'s or `zstd`'s `finish()` to observe a trailer-write
/// `io::Error`; such an error at drop time is silently swallowed. `flush()`
/// still works mid-stream to drain the encoder's internal buffer.
///
/// Connectors should let the box drop inside a `spawn_blocking` task body so
/// the trailer write does not block the async runtime, and rely on the
/// surrounding `write_all` / `flush` calls to surface earlier I/O errors.
/// When the finalisation error matters, use [`sync_compress_writer`] instead.
///
/// # Panics
///
/// Panics if the zstd encoder cannot be constructed.
pub fn wrap_sync_writer<'a, W>(w: W, c: Compression) -> Box<dyn std::io::Write + Send + 'a>
where
    W: std::io::Write + Send + 'a,
{
    match c {
        Compression::Zstd => {
            let encoder = zstd::stream::write::Encoder::new(w, 0)
                .expect("zstd encoder construction is infallible");
            // `auto_finish` is what makes drop write the frame epilogue.
            Box::new(encoder.auto_finish())
        }
        Compression::Gzip => {
            let level = flate2::Compression::default();
            Box::new(flate2::write::GzEncoder::new(w, level))
        }
        Compression::None => Box::new(w),
    }
}

/// A sync compressing writer that **retains the concrete encoder** so its
/// finalisation error can be captured, unlike the `Box<dyn Write>` returned by
/// [`wrap_sync_writer`] (#78/#41).
///
/// Call [`finish`](Self::finish) when done: it writes the codec's trailer
/// (gzip's 8-byte CRC/length footer, zstd's frame epilogue) and returns the
/// inner writer, surfacing any I/O error instead of swallowing it on drop.
/// If the value is dropped without `finish`, the zstd variant does **not**
/// emit its epilogue — always `finish` for a valid stream.
///
/// Values are normally built with [`sync_compress_writer`], which picks the
/// variant from a [`Compression`].
pub enum SyncCompressWriter<W: std::io::Write> {
    /// No compression: writes and flushes go straight to the inner writer.
    Plain(W),
    /// gzip stream produced by `flate2`'s `GzEncoder`.
    Gzip(flate2::write::GzEncoder<W>),
    /// zstd stream produced by the `zstd` crate's streaming `Encoder`. Unlike
    /// the boxed writer from [`wrap_sync_writer`], this one is not wrapped in
    /// `auto_finish`, hence the requirement to call `finish`.
    Zstd(zstd::stream::write::Encoder<'static, W>),
}

impl<W: std::io::Write> SyncCompressWriter<W> {
    /// Finalise the stream and return the inner writer, propagating any
    /// trailer-write error.
    pub fn finish(self) -> std::io::Result<W> {
        match self {
            SyncCompressWriter::Plain(w) => Ok(w),
            SyncCompressWriter::Gzip(e) => e.finish(),
            SyncCompressWriter::Zstd(e) => e.finish(),
        }
    }
}

impl<W: std::io::Write> std::io::Write for SyncCompressWriter<W> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        match self {
            SyncCompressWriter::Plain(w) => w.write(buf),
            SyncCompressWriter::Gzip(e) => e.write(buf),
            SyncCompressWriter::Zstd(e) => e.write(buf),
        }
    }

    fn flush(&mut self) -> std::io::Result<()> {
        match self {
            SyncCompressWriter::Plain(w) => w.flush(),
            SyncCompressWriter::Gzip(e) => e.flush(),
            SyncCompressWriter::Zstd(e) => e.flush(),
        }
    }
}

/// Build a [`SyncCompressWriter`] around `w` for the codec `c`.
///
/// Choose this over [`wrap_sync_writer`] whenever a finalisation failure must
/// be detected: call [`SyncCompressWriter::finish`] before the value is
/// dropped.
///
/// # Panics
///
/// Panics if the zstd encoder cannot be constructed.
pub fn sync_compress_writer<W: std::io::Write>(w: W, c: Compression) -> SyncCompressWriter<W> {
    match c {
        Compression::Zstd => {
            let encoder = zstd::stream::write::Encoder::new(w, 0)
                .expect("zstd encoder construction is infallible");
            SyncCompressWriter::Zstd(encoder)
        }
        Compression::Gzip => {
            let level = flate2::Compression::default();
            SyncCompressWriter::Gzip(flate2::write::GzEncoder::new(w, level))
        }
        Compression::None => SyncCompressWriter::Plain(w),
    }
}

/// One-shot in-memory compression. Used by S3 and GCS sinks that build a full
/// `Vec<u8>` body before upload.
pub fn compress_buf(data: &[u8], c: Compression) -> Result<Vec<u8>, FaucetError> {
    use std::io::Write;
    match c {
        Compression::None => Ok(data.to_vec()),
        Compression::Gzip => {
            let mut enc = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
            enc.write_all(data)
                .map_err(|e| FaucetError::Sink(format!("gzip compress failed: {e}")))?;
            enc.finish()
                .map_err(|e| FaucetError::Sink(format!("gzip finalise failed: {e}")))
        }
        Compression::Zstd => zstd::stream::encode_all(data, 0)
            .map_err(|e| FaucetError::Sink(format!("zstd compress failed: {e}"))),
    }
}

/// Log a one-shot warning when the explicit codec disagrees with the
/// filename's detected codec. Deduplicates per `(path, declared)` pair across
/// the whole process so a million-object scan does not flood logs.
///
/// Does nothing when `declared` equals what [`detect_from_path`] reports for
/// `path`. Never panics: a poisoned dedup lock is recovered rather than
/// propagated, since this is a best-effort logging helper.
pub fn warn_mismatch(path: &str, declared: Compression) {
    use std::collections::HashSet;
    use std::sync::{Mutex, OnceLock, PoisonError};
    /// Hard cap on the dedup set so a run touching an unbounded number of
    /// *distinct* mismatched paths can't leak memory for the process lifetime
    /// (#146 R). Beyond this, *new* mismatches re-warn rather than being
    /// tracked — hitting thousands of distinct mismatched paths is already
    /// pathological. Pairs recorded before the cap was reached stay
    /// deduplicated.
    const MAX_SEEN: usize = 4096;
    static SEEN: OnceLock<Mutex<HashSet<(String, Compression)>>> = OnceLock::new();

    let detected = detect_from_path(path);
    if detected == declared {
        return;
    }

    let key = (path.to_string(), declared);
    let should_warn = {
        // The guard is confined to this block so the lock is released before
        // the log call below. A poisoned lock only means another thread
        // panicked while holding it; the set is still usable for dedup.
        let mut seen = SEEN
            .get_or_init(|| Mutex::new(HashSet::new()))
            .lock()
            .unwrap_or_else(PoisonError::into_inner);
        if seen.contains(&key) {
            // Already reported. This check must come before the cap check,
            // otherwise a full set would make every known pair warn again.
            false
        } else {
            if seen.len() < MAX_SEEN {
                seen.insert(key);
            }
            true
        }
    };

    if should_warn {
        tracing::warn!(
            path = %path,
            declared = ?declared,
            detected = ?detected,
            "compression codec mismatch — explicit config wins, filename extension ignored",
        );
    }
}

// Unit tests for the compression helpers: suffix detection, config
// resolution and serde form, async and sync round-trips for both codecs,
// one-shot buffer compression, and a few edge cases (empty stream, truncated
// stream, mismatch-warning smoke test).
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};

    // Suffix detection is case-insensitive, only the last extension counts,
    // and an empty path has no codec.
    #[test]
    fn detect_extensions() {
        assert_eq!(detect_from_path("foo.jsonl"), Compression::None);
        assert_eq!(detect_from_path("foo.json.gz"), Compression::Gzip);
        assert_eq!(detect_from_path("foo.csv.zst"), Compression::Zstd);
        assert_eq!(detect_from_path("FOO.GZ"), Compression::Gzip);
        assert_eq!(detect_from_path("a.gz.zst"), Compression::Zstd);
        assert_eq!(detect_from_path(""), Compression::None);
    }

    // `Auto` defers to the filename.
    #[test]
    fn resolve_auto_uses_path() {
        assert_eq!(CompressionConfig::Auto.resolve("foo.gz"), Compression::Gzip);
        assert_eq!(
            CompressionConfig::Auto.resolve("foo.zst"),
            Compression::Zstd
        );
        assert_eq!(CompressionConfig::Auto.resolve("foo"), Compression::None);
    }

    // Explicit variants win over whatever the filename suggests.
    #[test]
    fn resolve_explicit_ignores_path() {
        assert_eq!(
            CompressionConfig::Gzip.resolve("foo.txt"),
            Compression::Gzip
        );
        assert_eq!(CompressionConfig::None.resolve("foo.gz"), Compression::None);
    }

    #[test]
    fn config_default_is_auto() {
        assert_eq!(CompressionConfig::default(), CompressionConfig::Auto);
    }

    #[test]
    fn config_serde_lowercase() {
        // All four variants round-trip as lowercase strings.
        for (variant, expected) in [
            (CompressionConfig::None, "\"none\""),
            (CompressionConfig::Gzip, "\"gzip\""),
            (CompressionConfig::Zstd, "\"zstd\""),
            (CompressionConfig::Auto, "\"auto\""),
        ] {
            let serialised = serde_json::to_string(&variant).unwrap();
            assert_eq!(serialised, expected);
            let deserialised: CompressionConfig = serde_json::from_str(expected).unwrap();
            assert_eq!(deserialised, variant);
        }
    }

    // Compress through the async writer, then decompress through the async
    // reader; the bytes must come back unchanged. The inner scope ends the
    // writer's mutable borrow of `buf` before it is read back.
    #[tokio::test]
    async fn async_roundtrip_gzip() {
        let original = b"hello, compressed world!\n".repeat(100);
        let mut buf = Vec::new();
        {
            let mut w = wrap_async_writer(&mut buf, Compression::Gzip);
            w.write_all(&original).await.unwrap();
            w.shutdown().await.unwrap();
        }
        let mut decompressed = Vec::new();
        let mut r = wrap_async_reader(BufReader::new(&buf[..]), Compression::Gzip);
        r.read_to_end(&mut decompressed).await.unwrap();
        assert_eq!(decompressed, original);
    }

    // Same round-trip as above, for zstd.
    #[tokio::test]
    async fn async_roundtrip_zstd() {
        let original = b"zstd payload\n".repeat(50);
        let mut buf = Vec::new();
        {
            let mut w = wrap_async_writer(&mut buf, Compression::Zstd);
            w.write_all(&original).await.unwrap();
            w.shutdown().await.unwrap();
        }
        let mut decompressed = Vec::new();
        let mut r = wrap_async_reader(BufReader::new(&buf[..]), Compression::Zstd);
        r.read_to_end(&mut decompressed).await.unwrap();
        assert_eq!(decompressed, original);
    }

    // With `Compression::None` the writer must not alter the bytes.
    #[tokio::test]
    async fn async_none_passthrough() {
        let original = b"plain text";
        let mut buf = Vec::new();
        {
            let mut w = wrap_async_writer(&mut buf, Compression::None);
            w.write_all(original).await.unwrap();
            w.shutdown().await.unwrap();
        }
        assert_eq!(&buf[..], original);
    }

    #[test]
    fn sync_roundtrip_gzip() {
        use std::io::{Read, Write};
        let original = b"sync gzip data".repeat(20);
        let mut buf = Vec::new();
        {
            let mut w = wrap_sync_writer(&mut buf, Compression::Gzip);
            w.write_all(&original).unwrap();
            // GzEncoder finalises on drop (writes the 8-byte trailer).
            // The explicit flush drains the deflate buffer; the drop at the
            // end of this scope writes the trailer so the reader below
            // sees a complete stream.
            w.flush().unwrap();
        }
        let mut r = wrap_sync_reader(&buf[..], Compression::Gzip);
        let mut decompressed = Vec::new();
        r.read_to_end(&mut decompressed).unwrap();
        assert_eq!(decompressed, original);
    }

    #[test]
    fn sync_roundtrip_zstd() {
        use std::io::{Read, Write};
        let original = b"sync zstd data".repeat(20);
        let mut buf = Vec::new();
        {
            let mut w = wrap_sync_writer(&mut buf, Compression::Zstd);
            w.write_all(&original).unwrap();
            // As with gzip: the `auto_finish` adapter applied by
            // `wrap_sync_writer` writes the frame epilogue when `w` is
            // dropped at the end of this scope.
            w.flush().unwrap();
        }
        let mut r = wrap_sync_reader(&buf[..], Compression::Zstd);
        let mut decompressed = Vec::new();
        r.read_to_end(&mut decompressed).unwrap();
        assert_eq!(decompressed, original);
    }

    // `compress_buf` output must differ from the input and decode back to it.
    #[test]
    fn compress_buf_roundtrip_gzip() {
        use std::io::Read;
        let original = b"buffer compression".repeat(10);
        let compressed = compress_buf(&original, Compression::Gzip).unwrap();
        assert_ne!(compressed, original);
        let mut r = wrap_sync_reader(&compressed[..], Compression::Gzip);
        let mut decompressed = Vec::new();
        r.read_to_end(&mut decompressed).unwrap();
        assert_eq!(decompressed, original);
    }

    #[test]
    fn compress_buf_roundtrip_zstd() {
        use std::io::Read;
        let original = b"buffer zstd".repeat(10);
        let compressed = compress_buf(&original, Compression::Zstd).unwrap();
        assert_ne!(compressed, original);
        let mut r = wrap_sync_reader(&compressed[..], Compression::Zstd);
        let mut decompressed = Vec::new();
        r.read_to_end(&mut decompressed).unwrap();
        assert_eq!(decompressed, original);
    }

    // `Compression::None` yields a byte-for-byte copy.
    #[test]
    fn compress_buf_none_is_clone() {
        let original = b"unchanged";
        let out = compress_buf(original, Compression::None).unwrap();
        assert_eq!(out, original);
    }

    #[tokio::test]
    async fn empty_compressed_stream_yields_zero_bytes() {
        // Produce a valid empty gzip stream.
        let mut buf = Vec::new();
        {
            let mut w = wrap_async_writer(&mut buf, Compression::Gzip);
            w.shutdown().await.unwrap();
        }
        // Decompressing it should yield zero bytes, not an error.
        let mut decompressed = Vec::new();
        let mut r = wrap_async_reader(BufReader::new(&buf[..]), Compression::Gzip);
        r.read_to_end(&mut decompressed).await.unwrap();
        assert!(decompressed.is_empty());
    }

    // A gzip stream cut off mid-way must surface `UnexpectedEof` instead of
    // silently returning partial data.
    #[tokio::test]
    async fn truncated_gzip_stream_errors() {
        let original = b"this will be truncated mid-stream".repeat(50);
        let mut buf = Vec::new();
        {
            let mut w = wrap_async_writer(&mut buf, Compression::Gzip);
            w.write_all(&original).await.unwrap();
            w.shutdown().await.unwrap();
        }
        // Truncate to half.
        buf.truncate(buf.len() / 2);
        let mut decompressed = Vec::new();
        let mut r = wrap_async_reader(BufReader::new(&buf[..]), Compression::Gzip);
        let err = r.read_to_end(&mut decompressed).await.unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    }

    #[test]
    fn warn_mismatch_dedups_per_path_and_codec() {
        // Smoke test for the dedup path. Calling twice with the same
        // (path, declared) pair is expected to produce a single log line,
        // but nothing here inspects the log output or the dedup set (the
        // static is private to `warn_mismatch`), so no assertion is made on
        // it. The static is shared across the whole process, so we use a
        // unique path to avoid collisions with other tests.
        let unique_path = format!("warn_mismatch_dedup_fixture_{}.txt", line!());
        // First call: detected = None (no extension), declared = Gzip → mismatch, logs.
        warn_mismatch(&unique_path, Compression::Gzip);
        // Second call with identical args: must not log a second time.
        warn_mismatch(&unique_path, Compression::Gzip);
        // Third call with different declared: separate dedup key, logs once.
        warn_mismatch(&unique_path, Compression::Zstd);
        // Matching pair: no log (early-exit before touching the HashSet).
        warn_mismatch("file.gz", Compression::Gzip);
        // (Behaviour is verified through log absence in production. Here we
        // only assert the function runs to completion without panicking,
        // which exercises the OnceLock init, the Mutex acquisition, and the
        // HashSet insertion paths.)
    }
}