Skip to main content

samkhya_core/
puffin.rs

1//! Iceberg Puffin sidecar format — reader and writer.
2//!
3//! Spec: <https://iceberg.apache.org/puffin-spec/>
4//!
5//! Layout:
6//! ```text
7//!  +-----------+
8//!  | Magic     |  4 bytes "PFA1"
9//!  +-----------+
10//!  | Blob 1    |  variable
11//!  +-----------+
12//!  | Blob 2    |  variable
13//!  +-----------+
14//!  | ...       |
15//!  +-----------+
16//!  | Footer    |  Magic + JSON payload + payload-len(LE u32)
17//!  |           |  + flags(LE u32) + Magic
18//!  +-----------+
19//! ```
20
21use std::collections::BTreeMap;
22use std::io::{Read, Seek, SeekFrom, Write};
23
24use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
25use serde::{Deserialize, Serialize};
26
27use crate::{Error, Result};
28
/// Four-byte marker that opens the file and brackets the footer.
const MAGIC: &[u8; 4] = b"PFA1";

/// Upper bound, in bytes, on a single blob payload (2 GiB).
///
/// The untrusted `length` recorded in the JSON footer is compared against
/// this before a blob is read, and the same value caps the size of
/// decompressed output. Per the original sizing rationale, a Bloom filter
/// for 1B keys at 1% FPR (~1.2 GiB) fits under it, while HLL/CMS/histogram
/// payloads are sub-MiB. Sidecars declaring anything larger are rejected
/// rather than trusted.
const MAX_BLOB_LEN: u64 = 2 << 30;

/// Upper bound, in bytes, on the JSON footer payload (16 MiB).
///
/// The footer is a `serde_json` document listing one metadata record per
/// blob. The declared `payload_len` is compared against this limit before
/// the footer buffer is allocated, so a forged length cannot force a
/// multi-GiB allocation for the footer alone.
const MAX_FOOTER_LEN: u64 = 16 << 20;

/// Upper bound on the number of blob records a single footer may list.
///
/// Applied to the `blobs` array once the JSON has been parsed, so a footer
/// cannot smuggle in millions of tiny records that each pass the per-blob
/// length cap. Legitimate samkhya sidecars carry one blob per stat per
/// column, which stays well below this limit even for wide tables.
const MAX_BLOB_COUNT: usize = 1 << 16;
58
/// Compression scheme a blob payload may be stored under.
///
/// When a blob is written compressed, the codec's name is recorded in the
/// blob's `compression-codec` metadata; the reader consults that name to
/// decide how to decode the blob.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionCodec {
    /// Bytes are stored exactly as supplied.
    None,
    /// Bytes are zstd-compressed (default level).
    Zstd,
}

impl CompressionCodec {
    /// Name of the codec as it appears in blob metadata (Puffin spec
    /// convention).
    ///
    /// # Examples
    ///
    /// ```
    /// use samkhya_core::puffin::CompressionCodec;
    ///
    /// assert_eq!(CompressionCodec::None.as_str(), "none");
    /// assert_eq!(CompressionCodec::Zstd.as_str(), "zstd");
    /// ```
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Zstd => "zstd",
            Self::None => "none",
        }
    }

    /// Map a recorded codec name back to a codec.
    ///
    /// Only the exact string `"zstd"` selects [`CompressionCodec::Zstd`]; a
    /// missing name and every other string resolve to
    /// [`CompressionCodec::None`].
    fn from_meta(meta: Option<&str>) -> Self {
        if meta == Some("zstd") {
            Self::Zstd
        } else {
            Self::None
        }
    }
}
96
97/// Footer payload (JSON-encoded inside the file).
98#[derive(Debug, Clone, Default, Serialize, Deserialize)]
99pub struct FooterPayload {
100    pub blobs: Vec<BlobMetadata>,
101    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
102    pub properties: BTreeMap<String, String>,
103}
104
105/// Per-blob metadata stored in the footer.
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct BlobMetadata {
108    #[serde(rename = "type")]
109    pub kind: String,
110    pub fields: Vec<i32>,
111    #[serde(
112        rename = "snapshot-id",
113        default,
114        skip_serializing_if = "Option::is_none"
115    )]
116    pub snapshot_id: Option<i64>,
117    #[serde(
118        rename = "sequence-number",
119        default,
120        skip_serializing_if = "Option::is_none"
121    )]
122    pub sequence_number: Option<i64>,
123    pub offset: u64,
124    pub length: u64,
125    #[serde(
126        rename = "compression-codec",
127        default,
128        skip_serializing_if = "Option::is_none"
129    )]
130    pub compression_codec: Option<String>,
131    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
132    pub properties: BTreeMap<String, String>,
133}
134
/// Description of one blob that is about to be written to a Puffin file.
///
/// The payload is borrowed, so the caller keeps ownership of the bytes.
///
/// # Examples
///
/// ```
/// use samkhya_core::puffin::Blob;
///
/// let payload = b"sketch bytes";
/// let blob = Blob::new("samkhya.hll-v1", vec![7], payload);
/// assert_eq!(blob.kind, "samkhya.hll-v1");
/// assert_eq!(blob.fields, vec![7]);
/// assert_eq!(blob.payload, payload);
/// ```
pub struct Blob<'a> {
    /// Blob type identifier, copied into the footer record.
    pub kind: String,
    /// Integer field ids, copied into the footer record.
    pub fields: Vec<i32>,
    /// Bytes to store.
    pub payload: &'a [u8],
    /// Key/value properties, copied into the footer record.
    pub properties: BTreeMap<String, String>,
}

impl<'a> Blob<'a> {
    /// Create a blob whose property map starts out empty. Populate
    /// `blob.properties` afterwards to attach metadata.
    ///
    /// # Examples
    ///
    /// ```
    /// use samkhya_core::puffin::Blob;
    ///
    /// let blob = Blob::new("samkhya.bloom-v1", vec![1, 2], &[0u8, 1, 2, 3]);
    /// assert!(blob.properties.is_empty());
    /// ```
    pub fn new(kind: impl Into<String>, fields: Vec<i32>, payload: &'a [u8]) -> Self {
        let kind = kind.into();
        let properties = BTreeMap::default();
        Blob {
            kind,
            fields,
            payload,
            properties,
        }
    }
}
176
177/// Streaming writer for Puffin files.
178///
179/// Wraps any `Write + Seek` sink. The magic header is written lazily on the
180/// first blob (or finish); call [`PuffinWriter::finish`] to flush the footer
181/// and recover the inner writer.
182///
183/// # Examples
184///
185/// ```
186/// use std::io::Cursor;
187/// use samkhya_core::puffin::{Blob, PuffinReader, PuffinWriter};
188///
189/// let mut writer = PuffinWriter::new(Cursor::new(Vec::<u8>::new()));
190/// writer.add_blob(Blob::new("samkhya.test-v1", vec![0], b"hello")).unwrap();
191/// let cursor = writer.finish().unwrap();
192///
193/// // Round-trip: open the bytes back as a reader.
194/// let reader = PuffinReader::open(Cursor::new(cursor.into_inner())).unwrap();
195/// assert_eq!(reader.blobs().len(), 1);
196/// ```
197pub struct PuffinWriter<W: Write + Seek> {
198    inner: W,
199    blobs: Vec<BlobMetadata>,
200    pos: u64,
201    wrote_head: bool,
202}
203
204impl<W: Write + Seek> PuffinWriter<W> {
205    pub fn new(inner: W) -> Self {
206        Self {
207            inner,
208            blobs: Vec::new(),
209            pos: 0,
210            wrote_head: false,
211        }
212    }
213
214    fn ensure_head(&mut self) -> Result<()> {
215        if !self.wrote_head {
216            self.inner.write_all(MAGIC)?;
217            self.pos += MAGIC.len() as u64;
218            self.wrote_head = true;
219        }
220        Ok(())
221    }
222
223    /// Append a blob to the file.
224    ///
225    /// # Examples
226    ///
227    /// ```
228    /// use std::io::Cursor;
229    /// use samkhya_core::puffin::{Blob, PuffinWriter};
230    ///
231    /// let mut writer = PuffinWriter::new(Cursor::new(Vec::<u8>::new()));
232    /// writer.add_blob(Blob::new("samkhya.test-v1", vec![0], b"payload")).unwrap();
233    /// let _bytes = writer.finish().unwrap().into_inner();
234    /// ```
235    pub fn add_blob(&mut self, blob: Blob<'_>) -> Result<()> {
236        self.ensure_head()?;
237        let offset = self.pos;
238        self.inner.write_all(blob.payload)?;
239        let length = blob.payload.len() as u64;
240        self.pos += length;
241        self.blobs.push(BlobMetadata {
242            kind: blob.kind,
243            fields: blob.fields,
244            snapshot_id: None,
245            sequence_number: None,
246            offset,
247            length,
248            compression_codec: None,
249            properties: blob.properties,
250        });
251        Ok(())
252    }
253
254    /// Append a blob with payload compressed under `codec`.
255    ///
256    /// `CompressionCodec::None` is equivalent to [`Self::add_blob`].
257    /// `CompressionCodec::Zstd` requires the `zstd` feature; otherwise this
258    /// returns an [`Error::InvalidPuffin`] explaining the missing feature.
259    pub fn add_blob_compressed(&mut self, blob: Blob<'_>, codec: CompressionCodec) -> Result<()> {
260        match codec {
261            CompressionCodec::None => self.add_blob(blob),
262            CompressionCodec::Zstd => self.add_blob_zstd(blob),
263        }
264    }
265
266    #[cfg(feature = "zstd")]
267    fn add_blob_zstd(&mut self, blob: Blob<'_>) -> Result<()> {
268        self.ensure_head()?;
269        let compressed = zstd::encode_all(blob.payload, 0)
270            .map_err(|e| Error::InvalidPuffin(format!("zstd encode: {e}")))?;
271        let offset = self.pos;
272        self.inner.write_all(&compressed)?;
273        let length = compressed.len() as u64;
274        self.pos += length;
275        self.blobs.push(BlobMetadata {
276            kind: blob.kind,
277            fields: blob.fields,
278            snapshot_id: None,
279            sequence_number: None,
280            offset,
281            length,
282            compression_codec: Some(CompressionCodec::Zstd.as_str().to_string()),
283            properties: blob.properties,
284        });
285        Ok(())
286    }
287
288    #[cfg(not(feature = "zstd"))]
289    fn add_blob_zstd(&mut self, _blob: Blob<'_>) -> Result<()> {
290        Err(Error::InvalidPuffin(
291            "zstd codec requested but the `zstd` cargo feature is disabled".into(),
292        ))
293    }
294
295    /// Finalize the file: write the footer and return the inner writer.
296    pub fn finish(mut self) -> Result<W> {
297        self.ensure_head()?;
298        let footer = FooterPayload {
299            blobs: self.blobs,
300            properties: BTreeMap::new(),
301        };
302        let payload = serde_json::to_vec(&footer)
303            .map_err(|e| Error::InvalidPuffin(format!("footer JSON encode: {e}")))?;
304        let payload_len = u32::try_from(payload.len())
305            .map_err(|_| Error::InvalidPuffin("footer payload exceeds u32".into()))?;
306
307        self.inner.write_all(MAGIC)?;
308        self.inner.write_all(&payload)?;
309        self.inner.write_u32::<LittleEndian>(payload_len)?;
310        self.inner.write_u32::<LittleEndian>(0)?; // flags
311        self.inner.write_all(MAGIC)?;
312        Ok(self.inner)
313    }
314}
315
316/// Reader for Puffin files — parses the footer once, lazily loads blob payloads.
317///
318/// # Examples
319///
320/// ```
321/// use std::io::Cursor;
322/// use samkhya_core::puffin::{Blob, PuffinReader, PuffinWriter};
323///
324/// // Write a file in memory, then read it back.
325/// let mut w = PuffinWriter::new(Cursor::new(Vec::<u8>::new()));
326/// w.add_blob(Blob::new("samkhya.hll-v1", vec![1], b"sketch bytes")).unwrap();
327/// let bytes = w.finish().unwrap().into_inner();
328///
329/// let mut reader = PuffinReader::open(Cursor::new(bytes)).unwrap();
330/// let (idx, meta) = reader.find_blob("samkhya.hll-v1").unwrap();
331/// assert_eq!(meta.kind, "samkhya.hll-v1");
332/// assert_eq!(reader.read_blob(idx).unwrap(), b"sketch bytes");
333/// ```
334pub struct PuffinReader<R: Read + Seek> {
335    inner: R,
336    footer: FooterPayload,
337}
338
339impl<R: Read + Seek> PuffinReader<R> {
340    pub fn open(mut inner: R) -> Result<Self> {
341        let file_len = inner.seek(SeekFrom::End(0))?;
342        // 4 (head magic) + 4 (footer head magic) + 4 (payload len) + 4 (flags) + 4 (trailing magic)
343        if file_len < 20 {
344            return Err(Error::InvalidPuffin(format!(
345                "file too short: {file_len} bytes"
346            )));
347        }
348
349        // Trailing magic
350        inner.seek(SeekFrom::End(-4))?;
351        let mut trailing = [0u8; 4];
352        inner.read_exact(&mut trailing)?;
353        if &trailing != MAGIC {
354            return Err(Error::InvalidPuffin("trailing magic missing".into()));
355        }
356
357        // Flags (4 bytes before trailing magic) — read but ignored for now.
358        inner.seek(SeekFrom::End(-8))?;
359        let _flags = inner.read_u32::<LittleEndian>()?;
360
361        // Payload length (4 bytes before flags)
362        inner.seek(SeekFrom::End(-12))?;
363        let payload_len = inner.read_u32::<LittleEndian>()? as u64;
364
365        if payload_len > MAX_FOOTER_LEN {
366            return Err(Error::InvalidPuffin(format!(
367                "footer payload length {payload_len} exceeds MAX_FOOTER_LEN {MAX_FOOTER_LEN}"
368            )));
369        }
370
371        let footer_total = 16u64 + payload_len; // head magic + payload + len + flags + trailing magic
372        if file_len < footer_total {
373            return Err(Error::InvalidPuffin(
374                "payload length exceeds file size".into(),
375            ));
376        }
377
378        // Payload bytes
379        let payload_start = file_len - 12 - payload_len;
380        inner.seek(SeekFrom::Start(payload_start))?;
381        let mut payload = vec![0u8; payload_len as usize];
382        inner.read_exact(&mut payload)?;
383
384        // Footer head magic (4 bytes before payload)
385        inner.seek(SeekFrom::Start(payload_start - 4))?;
386        let mut footer_head = [0u8; 4];
387        inner.read_exact(&mut footer_head)?;
388        if &footer_head != MAGIC {
389            return Err(Error::InvalidPuffin("footer head magic missing".into()));
390        }
391
392        // File head magic
393        inner.seek(SeekFrom::Start(0))?;
394        let mut head = [0u8; 4];
395        inner.read_exact(&mut head)?;
396        if &head != MAGIC {
397            return Err(Error::InvalidPuffin("file head magic missing".into()));
398        }
399
400        let footer: FooterPayload = serde_json::from_slice(&payload)
401            .map_err(|e| Error::InvalidPuffin(format!("footer JSON decode: {e}")))?;
402
403        if footer.blobs.len() > MAX_BLOB_COUNT {
404            return Err(Error::InvalidPuffin(format!(
405                "footer enumerates {} blobs; exceeds MAX_BLOB_COUNT {MAX_BLOB_COUNT}",
406                footer.blobs.len()
407            )));
408        }
409
410        Ok(Self { inner, footer })
411    }
412
413    pub fn footer(&self) -> &FooterPayload {
414        &self.footer
415    }
416
417    pub fn blobs(&self) -> &[BlobMetadata] {
418        &self.footer.blobs
419    }
420
421    /// Read a blob's payload by index.
422    pub fn read_blob(&mut self, idx: usize) -> Result<Vec<u8>> {
423        let meta = self
424            .footer
425            .blobs
426            .get(idx)
427            .ok_or_else(|| Error::InvalidPuffin(format!("blob index {idx} out of range")))?;
428        if meta.length > MAX_BLOB_LEN {
429            return Err(Error::InvalidPuffin(format!(
430                "blob {idx} length {} exceeds MAX_BLOB_LEN {MAX_BLOB_LEN}",
431                meta.length
432            )));
433        }
434        self.inner.seek(SeekFrom::Start(meta.offset))?;
435        let mut buf = vec![0u8; meta.length as usize];
436        self.inner.read_exact(&mut buf)?;
437        Ok(buf)
438    }
439
440    /// Read a blob's payload by index and decompress it according to the
441    /// blob's `compression_codec` metadata.
442    ///
443    /// Falls back to the raw bytes when the codec is `None` / unset. Returns
444    /// an error if the recorded codec is unsupported in the current build
445    /// (e.g. `zstd` without the `zstd` feature).
446    pub fn read_blob_decompressed(&mut self, idx: usize) -> Result<Vec<u8>> {
447        let codec = {
448            let meta =
449                self.footer.blobs.get(idx).ok_or_else(|| {
450                    Error::InvalidPuffin(format!("blob index {idx} out of range"))
451                })?;
452            CompressionCodec::from_meta(meta.compression_codec.as_deref())
453        };
454        let raw = self.read_blob(idx)?;
455        match codec {
456            CompressionCodec::None => Ok(raw),
457            CompressionCodec::Zstd => decode_zstd(&raw),
458        }
459    }
460
461    /// Find the first blob whose `type` (kind) matches `kind`.
462    ///
463    /// # Examples
464    ///
465    /// ```
466    /// use std::io::Cursor;
467    /// use samkhya_core::puffin::{Blob, PuffinReader, PuffinWriter};
468    ///
469    /// let mut w = PuffinWriter::new(Cursor::new(Vec::<u8>::new()));
470    /// w.add_blob(Blob::new("samkhya.hll-v1", vec![0], b"x")).unwrap();
471    /// let bytes = w.finish().unwrap().into_inner();
472    /// let reader = PuffinReader::open(Cursor::new(bytes)).unwrap();
473    /// assert!(reader.find_blob("samkhya.hll-v1").is_some());
474    /// assert!(reader.find_blob("absent.kind").is_none());
475    /// ```
476    pub fn find_blob(&self, kind: &str) -> Option<(usize, &BlobMetadata)> {
477        self.footer
478            .blobs
479            .iter()
480            .enumerate()
481            .find(|(_, b)| b.kind == kind)
482    }
483}
484
/// Bounded streaming decompression helper shared across compression
/// codecs.
///
/// Reads from `reader` into a fresh `Vec<u8>` whose final length is at
/// most `max_bytes`. One sentinel byte beyond the cap is requested so that
/// "decompressed exactly max_bytes" (legitimate boundary) can be told apart
/// from "would have produced more than max_bytes" (attacker-controlled).
/// `codec_label` appears in the error message so the caller can identify
/// which codec tripped.
///
/// Centralising this here means a future LZ4 / Brotli / Snappy codec
/// only needs to construct a `Read` and call this function; the cap is
/// not re-derived per codec. See `documents/SECURITY-REVIEW-2026-05-17.md`
/// item M6. Currently used only by the zstd path; broaden the cfg gate
/// when a new codec lands.
///
/// # Errors
///
/// Returns [`Error::InvalidPuffin`] when the reader fails or when it yields
/// more than `max_bytes` bytes.
#[cfg(feature = "zstd")]
fn decompress_bounded<R: std::io::Read>(
    reader: R,
    max_bytes: usize,
    codec_label: &'static str,
) -> Result<Vec<u8>> {
    use std::io::Read as _;

    // Saturate instead of adding blindly: with `max_bytes == usize::MAX` on
    // a 64-bit target, `+ 1` would overflow (panic in debug builds, wrap to
    // a zero-byte limit in release builds and silently return nothing).
    let read_limit = (max_bytes as u64).saturating_add(1);

    // Start small; the buffer grows only as far as the data actually goes.
    let mut out = Vec::with_capacity(max_bytes.min(256 * 1024));
    reader
        .take(read_limit)
        .read_to_end(&mut out)
        .map_err(|e| Error::InvalidPuffin(format!("{codec_label} decode: {e}")))?;
    if out.len() > max_bytes {
        return Err(Error::InvalidPuffin(format!(
            "{codec_label} decompressed size exceeds {max_bytes}"
        )));
    }
    Ok(out)
}
520
/// Decompress a zstd-encoded blob, refusing output larger than
/// `MAX_BLOB_LEN`.
///
/// Decoding is streamed through `decompress_bounded`, which owns the size
/// cap and the sentinel logic, so a high-ratio compression bomb cannot force
/// a multi-GiB allocation.
#[cfg(feature = "zstd")]
fn decode_zstd(raw: &[u8]) -> Result<Vec<u8>> {
    match zstd::stream::Decoder::new(raw) {
        Ok(stream) => decompress_bounded(stream, MAX_BLOB_LEN as usize, "zstd"),
        Err(e) => Err(Error::InvalidPuffin(format!("zstd decoder init: {e}"))),
    }
}
530
531#[cfg(not(feature = "zstd"))]
532fn decode_zstd(_raw: &[u8]) -> Result<Vec<u8>> {
533    Err(Error::InvalidPuffin(
534        "blob is zstd-compressed but the `zstd` cargo feature is disabled".into(),
535    ))
536}
537
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;
    use crate::sketches::{HllSketch, Sketch};

    /// Fresh writer over an empty in-memory buffer.
    fn mem_writer() -> PuffinWriter<Cursor<Vec<u8>>> {
        PuffinWriter::new(Cursor::new(Vec::<u8>::new()))
    }

    /// Finish `writer` and return the bytes it produced.
    fn finish_bytes(writer: PuffinWriter<Cursor<Vec<u8>>>) -> Vec<u8> {
        writer.finish().unwrap().into_inner()
    }

    /// Finish `writer` and reopen its output as a reader.
    fn reopen(writer: PuffinWriter<Cursor<Vec<u8>>>) -> PuffinReader<Cursor<Vec<u8>>> {
        PuffinReader::open(Cursor::new(finish_bytes(writer))).unwrap()
    }

    #[test]
    fn round_trip_empty() {
        let reader = reopen(mem_writer());
        assert!(reader.blobs().is_empty());
    }

    #[test]
    fn round_trip_single_blob() {
        let mut writer = mem_writer();
        let blob = Blob::new("samkhya.test-v1", vec![0], b"hello puffin");
        writer.add_blob(blob).unwrap();

        let mut reader = reopen(writer);
        assert_eq!(reader.blobs().len(), 1);
        assert_eq!(reader.blobs()[0].kind, "samkhya.test-v1");
        assert_eq!(reader.read_blob(0).unwrap(), b"hello puffin");
    }

    #[test]
    fn round_trip_multiple_blobs() {
        let mut writer = mem_writer();
        let first = Blob::new("samkhya.hll-v1", vec![1], &[1, 2, 3, 4, 5]);
        writer.add_blob(first).unwrap();
        let second = Blob::new("samkhya.bloom-v1", vec![2], &[10, 20, 30]);
        writer.add_blob(second).unwrap();

        let mut reader = reopen(writer);
        assert_eq!(reader.blobs().len(), 2);
        assert_eq!(reader.read_blob(0).unwrap(), vec![1, 2, 3, 4, 5]);
        assert_eq!(reader.read_blob(1).unwrap(), vec![10, 20, 30]);

        let bloom_idx = reader.find_blob("samkhya.bloom-v1").map(|(i, _)| i);
        assert_eq!(bloom_idx, Some(1));
        let absent_idx = reader.find_blob("absent.kind").map(|(i, _)| i);
        assert_eq!(absent_idx, None);
    }

    #[test]
    fn round_trip_hll_sketch_through_puffin() {
        let mut hll = HllSketch::new(12).unwrap();
        (0..1000u32).for_each(|i| hll.add(&i.to_le_bytes()));
        let payload = hll.to_bytes().unwrap();

        let mut writer = mem_writer();
        writer
            .add_blob(Blob::new(HllSketch::KIND, vec![7], &payload))
            .unwrap();

        let mut reader = reopen(writer);
        let (idx, meta) = reader.find_blob(HllSketch::KIND).unwrap();
        assert_eq!(meta.fields, vec![7]);
        let blob_bytes = reader.read_blob(idx).unwrap();
        let hll2 = HllSketch::from_bytes(&blob_bytes).unwrap();
        let err = (hll2.estimate() as f64 - 1000.0).abs() / 1000.0;
        assert!(err < 0.1, "HLL estimate via Puffin off by {err}");
    }

    #[test]
    fn rejects_too_short_file() {
        let five_bytes = vec![0u8; 5];
        assert!(PuffinReader::open(Cursor::new(five_bytes)).is_err());
    }

    #[test]
    fn rejects_bad_trailing_magic() {
        // Valid head magic, but the final four bytes stay zeroed.
        let mut buf = vec![0u8; 20];
        buf[..4].copy_from_slice(MAGIC);
        assert!(PuffinReader::open(Cursor::new(buf)).is_err());
    }

    #[test]
    fn read_blob_decompressed_no_codec_is_passthrough() {
        let mut writer = mem_writer();
        let blob = Blob::new("samkhya.test-v1", vec![0], b"plain payload");
        writer.add_blob(blob).unwrap();

        let mut reader = reopen(writer);
        assert_eq!(reader.read_blob_decompressed(0).unwrap(), b"plain payload");
    }

    #[test]
    fn read_blob_rejects_oversized_length() {
        // Start from a minimal valid file, then forge the JSON footer so
        // the blob claims MAX_BLOB_LEN + 1 bytes. `read_blob` must refuse
        // before allocating.
        let mut writer = mem_writer();
        writer
            .add_blob(Blob::new("samkhya.test-v1", vec![0], b"x"))
            .unwrap();
        let bytes = finish_bytes(writer);

        // The file ends with payload-len(4) + flags(4) + trailing-magic(4);
        // the JSON payload sits directly in front of those 12 bytes.
        let file_len = bytes.len();
        let trailer_at = file_len - 12;
        let len_field: [u8; 4] = bytes[trailer_at..trailer_at + 4].try_into().unwrap();
        let payload_len = u32::from_le_bytes(len_field) as usize;
        let payload_start = trailer_at - payload_len;
        let json = std::str::from_utf8(&bytes[payload_start..trailer_at])
            .unwrap()
            .to_string();

        // The payload was `b"x"`, so the record reads `"length":1`.
        let bogus_len = MAX_BLOB_LEN + 1;
        let tampered_json = json.replacen("\"length\":1", &format!("\"length\":{bogus_len}"), 1);
        assert_ne!(tampered_json, json, "tamper failed — payload schema drift?");

        // Reassemble the file around the forged footer, with a payload-len
        // field that matches the new JSON size.
        let mut tampered = Vec::with_capacity(file_len + 32);
        tampered.extend_from_slice(&bytes[..payload_start]);
        tampered.extend_from_slice(tampered_json.as_bytes());
        tampered.extend_from_slice(&(tampered_json.len() as u32).to_le_bytes());
        tampered.extend_from_slice(&0u32.to_le_bytes());
        tampered.extend_from_slice(MAGIC);

        let mut reader = PuffinReader::open(Cursor::new(tampered)).unwrap();
        match reader.read_blob(0).unwrap_err() {
            Error::InvalidPuffin(msg) => assert!(
                msg.contains("MAX_BLOB_LEN"),
                "expected MAX_BLOB_LEN rejection, got: {msg}"
            ),
            other => panic!("expected InvalidPuffin, got {other:?}"),
        }
    }

    #[test]
    fn open_rejects_oversized_footer_payload_len() {
        // Overwrite the payload-len field of a minimal file with a value
        // above MAX_FOOTER_LEN. The reader must refuse before allocating a
        // footer-sized buffer.
        let mut tampered = finish_bytes(mem_writer());
        let oversized = (MAX_FOOTER_LEN as u32).saturating_add(1);
        let len_offset = tampered.len() - 12;
        tampered[len_offset..len_offset + 4].copy_from_slice(&oversized.to_le_bytes());

        match PuffinReader::open(Cursor::new(tampered)) {
            Err(Error::InvalidPuffin(msg)) => assert!(
                msg.contains("MAX_FOOTER_LEN"),
                "expected MAX_FOOTER_LEN rejection, got: {msg}"
            ),
            Err(other) => panic!("expected InvalidPuffin, got {other:?}"),
            Ok(_) => panic!("expected MAX_FOOTER_LEN rejection, got Ok"),
        }
    }

    #[cfg(not(feature = "zstd"))]
    #[test]
    fn requesting_zstd_without_feature_errors() {
        let mut writer = mem_writer();
        let blob = Blob::new("samkhya.test-v1", vec![0], b"x");
        let err = writer
            .add_blob_compressed(blob, CompressionCodec::Zstd)
            .unwrap_err();
        assert!(matches!(err, Error::InvalidPuffin(_)));
    }
}
722
#[cfg(all(test, feature = "zstd"))]
mod zstd_tests {
    use std::io::Cursor;

    use super::*;
    use crate::sketches::{HllSketch, Sketch};

    /// Write `blob` under `codec` into an in-memory file and reopen it.
    fn write_then_open(blob: Blob<'_>, codec: CompressionCodec) -> PuffinReader<Cursor<Vec<u8>>> {
        let mut writer = PuffinWriter::new(Cursor::new(Vec::<u8>::new()));
        writer.add_blob_compressed(blob, codec).unwrap();
        let bytes = writer.finish().unwrap().into_inner();
        PuffinReader::open(Cursor::new(bytes)).unwrap()
    }

    #[test]
    fn round_trip_compressed_blob() {
        // Highly redundant payload, so zstd visibly shrinks it.
        let payload = vec![0xABu8; 8192];

        let mut reader = write_then_open(
            Blob::new("samkhya.test-v1", vec![0], &payload),
            CompressionCodec::Zstd,
        );
        let meta = &reader.blobs()[0];
        assert_eq!(meta.compression_codec.as_deref(), Some("zstd"));
        // The stored length must be strictly smaller than the original.
        assert!((meta.length as usize) < payload.len());

        let decoded = reader.read_blob_decompressed(0).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn round_trip_compressed_hll_sketch() {
        let mut hll = HllSketch::new(14).unwrap();
        (0..5_000u32).for_each(|i| hll.add(&i.to_le_bytes()));
        let bytes = hll.to_bytes().unwrap();

        let mut reader = write_then_open(
            Blob::new(HllSketch::KIND, vec![1], &bytes),
            CompressionCodec::Zstd,
        );
        let (idx, meta) = reader.find_blob(HllSketch::KIND).unwrap();
        assert_eq!(meta.compression_codec.as_deref(), Some("zstd"));
        let decoded = reader.read_blob_decompressed(idx).unwrap();
        let hll2 = HllSketch::from_bytes(&decoded).unwrap();
        let err = (hll2.estimate() as f64 - 5_000.0).abs() / 5_000.0;
        assert!(err < 0.05, "HLL estimate off by {err}");
    }

    #[test]
    fn none_codec_via_compressed_api_matches_plain() {
        let mut reader = write_then_open(
            Blob::new("samkhya.test-v1", vec![0], b"identity"),
            CompressionCodec::None,
        );
        assert!(reader.blobs()[0].compression_codec.is_none());
        assert_eq!(reader.read_blob_decompressed(0).unwrap(), b"identity");
    }
}