1use std::collections::BTreeMap;
22use std::io::{Read, Seek, SeekFrom, Write};
23
24use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
25use serde::{Deserialize, Serialize};
26
27use crate::{Error, Result};
28
/// Four-byte magic written at the start of the file and at both ends of the footer.
const MAGIC: &[u8; 4] = b"PFA1";

/// Largest blob the reader accepts, both as stored and after decompression: 2 GiB.
const MAX_BLOB_LEN: u64 = 1 << 31;

/// Largest footer JSON payload the reader is willing to buffer: 16 MiB.
const MAX_FOOTER_LEN: u64 = 1 << 24;

/// Largest number of blob entries a footer may enumerate before the reader rejects it.
const MAX_BLOB_COUNT: usize = 1 << 16;
58
/// Compression applied to an individual blob payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionCodec {
    /// Payload bytes are stored exactly as supplied.
    None,
    /// Payload bytes are a zstd stream.
    Zstd,
}

impl CompressionCodec {
    /// Label used for this codec in the footer's `compression-codec` field.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Zstd => "zstd",
            Self::None => "none",
        }
    }

    /// Maps a footer `compression-codec` label back to a codec.
    ///
    /// Only `"zstd"` is recognised; a missing label and any other label both map to
    /// [`CompressionCodec::None`].
    fn from_meta(meta: Option<&str>) -> Self {
        if meta == Some("zstd") {
            Self::Zstd
        } else {
            Self::None
        }
    }
}
96
97#[derive(Debug, Clone, Default, Serialize, Deserialize)]
99pub struct FooterPayload {
100 pub blobs: Vec<BlobMetadata>,
101 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
102 pub properties: BTreeMap<String, String>,
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct BlobMetadata {
108 #[serde(rename = "type")]
109 pub kind: String,
110 pub fields: Vec<i32>,
111 #[serde(
112 rename = "snapshot-id",
113 default,
114 skip_serializing_if = "Option::is_none"
115 )]
116 pub snapshot_id: Option<i64>,
117 #[serde(
118 rename = "sequence-number",
119 default,
120 skip_serializing_if = "Option::is_none"
121 )]
122 pub sequence_number: Option<i64>,
123 pub offset: u64,
124 pub length: u64,
125 #[serde(
126 rename = "compression-codec",
127 default,
128 skip_serializing_if = "Option::is_none"
129 )]
130 pub compression_codec: Option<String>,
131 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
132 pub properties: BTreeMap<String, String>,
133}
134
/// A blob queued for writing: footer metadata plus a borrowed payload.
///
/// The payload is borrowed, not copied. It is written verbatim by `PuffinWriter::add_blob`, or
/// compressed first when a codec is requested via `PuffinWriter::add_blob_compressed`.
#[derive(Debug, Clone)]
pub struct Blob<'a> {
    /// Blob type identifier, recorded as the footer entry's `"type"`.
    pub kind: String,
    /// Field identifiers recorded in the footer entry.
    pub fields: Vec<i32>,
    /// Payload bytes exactly as supplied by the caller.
    pub payload: &'a [u8],
    /// Per-blob key/value properties recorded in the footer entry.
    pub properties: BTreeMap<String, String>,
}

impl<'a> Blob<'a> {
    /// Creates a blob with the given type, fields and payload, and no properties.
    pub fn new(kind: impl Into<String>, fields: Vec<i32>, payload: &'a [u8]) -> Self {
        Self {
            kind: kind.into(),
            fields,
            payload,
            properties: BTreeMap::new(),
        }
    }
}
176
177pub struct PuffinWriter<W: Write + Seek> {
198 inner: W,
199 blobs: Vec<BlobMetadata>,
200 pos: u64,
201 wrote_head: bool,
202}
203
204impl<W: Write + Seek> PuffinWriter<W> {
205 pub fn new(inner: W) -> Self {
206 Self {
207 inner,
208 blobs: Vec::new(),
209 pos: 0,
210 wrote_head: false,
211 }
212 }
213
214 fn ensure_head(&mut self) -> Result<()> {
215 if !self.wrote_head {
216 self.inner.write_all(MAGIC)?;
217 self.pos += MAGIC.len() as u64;
218 self.wrote_head = true;
219 }
220 Ok(())
221 }
222
223 pub fn add_blob(&mut self, blob: Blob<'_>) -> Result<()> {
236 self.ensure_head()?;
237 let offset = self.pos;
238 self.inner.write_all(blob.payload)?;
239 let length = blob.payload.len() as u64;
240 self.pos += length;
241 self.blobs.push(BlobMetadata {
242 kind: blob.kind,
243 fields: blob.fields,
244 snapshot_id: None,
245 sequence_number: None,
246 offset,
247 length,
248 compression_codec: None,
249 properties: blob.properties,
250 });
251 Ok(())
252 }
253
254 pub fn add_blob_compressed(&mut self, blob: Blob<'_>, codec: CompressionCodec) -> Result<()> {
260 match codec {
261 CompressionCodec::None => self.add_blob(blob),
262 CompressionCodec::Zstd => self.add_blob_zstd(blob),
263 }
264 }
265
266 #[cfg(feature = "zstd")]
267 fn add_blob_zstd(&mut self, blob: Blob<'_>) -> Result<()> {
268 self.ensure_head()?;
269 let compressed = zstd::encode_all(blob.payload, 0)
270 .map_err(|e| Error::InvalidPuffin(format!("zstd encode: {e}")))?;
271 let offset = self.pos;
272 self.inner.write_all(&compressed)?;
273 let length = compressed.len() as u64;
274 self.pos += length;
275 self.blobs.push(BlobMetadata {
276 kind: blob.kind,
277 fields: blob.fields,
278 snapshot_id: None,
279 sequence_number: None,
280 offset,
281 length,
282 compression_codec: Some(CompressionCodec::Zstd.as_str().to_string()),
283 properties: blob.properties,
284 });
285 Ok(())
286 }
287
288 #[cfg(not(feature = "zstd"))]
289 fn add_blob_zstd(&mut self, _blob: Blob<'_>) -> Result<()> {
290 Err(Error::InvalidPuffin(
291 "zstd codec requested but the `zstd` cargo feature is disabled".into(),
292 ))
293 }
294
295 pub fn finish(mut self) -> Result<W> {
297 self.ensure_head()?;
298 let footer = FooterPayload {
299 blobs: self.blobs,
300 properties: BTreeMap::new(),
301 };
302 let payload = serde_json::to_vec(&footer)
303 .map_err(|e| Error::InvalidPuffin(format!("footer JSON encode: {e}")))?;
304 let payload_len = u32::try_from(payload.len())
305 .map_err(|_| Error::InvalidPuffin("footer payload exceeds u32".into()))?;
306
307 self.inner.write_all(MAGIC)?;
308 self.inner.write_all(&payload)?;
309 self.inner.write_u32::<LittleEndian>(payload_len)?;
310 self.inner.write_u32::<LittleEndian>(0)?; self.inner.write_all(MAGIC)?;
312 Ok(self.inner)
313 }
314}
315
316pub struct PuffinReader<R: Read + Seek> {
335 inner: R,
336 footer: FooterPayload,
337}
338
339impl<R: Read + Seek> PuffinReader<R> {
340 pub fn open(mut inner: R) -> Result<Self> {
341 let file_len = inner.seek(SeekFrom::End(0))?;
342 if file_len < 20 {
344 return Err(Error::InvalidPuffin(format!(
345 "file too short: {file_len} bytes"
346 )));
347 }
348
349 inner.seek(SeekFrom::End(-4))?;
351 let mut trailing = [0u8; 4];
352 inner.read_exact(&mut trailing)?;
353 if &trailing != MAGIC {
354 return Err(Error::InvalidPuffin("trailing magic missing".into()));
355 }
356
357 inner.seek(SeekFrom::End(-8))?;
359 let _flags = inner.read_u32::<LittleEndian>()?;
360
361 inner.seek(SeekFrom::End(-12))?;
363 let payload_len = inner.read_u32::<LittleEndian>()? as u64;
364
365 if payload_len > MAX_FOOTER_LEN {
366 return Err(Error::InvalidPuffin(format!(
367 "footer payload length {payload_len} exceeds MAX_FOOTER_LEN {MAX_FOOTER_LEN}"
368 )));
369 }
370
371 let footer_total = 16u64 + payload_len; if file_len < footer_total {
373 return Err(Error::InvalidPuffin(
374 "payload length exceeds file size".into(),
375 ));
376 }
377
378 let payload_start = file_len - 12 - payload_len;
380 inner.seek(SeekFrom::Start(payload_start))?;
381 let mut payload = vec![0u8; payload_len as usize];
382 inner.read_exact(&mut payload)?;
383
384 inner.seek(SeekFrom::Start(payload_start - 4))?;
386 let mut footer_head = [0u8; 4];
387 inner.read_exact(&mut footer_head)?;
388 if &footer_head != MAGIC {
389 return Err(Error::InvalidPuffin("footer head magic missing".into()));
390 }
391
392 inner.seek(SeekFrom::Start(0))?;
394 let mut head = [0u8; 4];
395 inner.read_exact(&mut head)?;
396 if &head != MAGIC {
397 return Err(Error::InvalidPuffin("file head magic missing".into()));
398 }
399
400 let footer: FooterPayload = serde_json::from_slice(&payload)
401 .map_err(|e| Error::InvalidPuffin(format!("footer JSON decode: {e}")))?;
402
403 if footer.blobs.len() > MAX_BLOB_COUNT {
404 return Err(Error::InvalidPuffin(format!(
405 "footer enumerates {} blobs; exceeds MAX_BLOB_COUNT {MAX_BLOB_COUNT}",
406 footer.blobs.len()
407 )));
408 }
409
410 Ok(Self { inner, footer })
411 }
412
413 pub fn footer(&self) -> &FooterPayload {
414 &self.footer
415 }
416
417 pub fn blobs(&self) -> &[BlobMetadata] {
418 &self.footer.blobs
419 }
420
421 pub fn read_blob(&mut self, idx: usize) -> Result<Vec<u8>> {
423 let meta = self
424 .footer
425 .blobs
426 .get(idx)
427 .ok_or_else(|| Error::InvalidPuffin(format!("blob index {idx} out of range")))?;
428 if meta.length > MAX_BLOB_LEN {
429 return Err(Error::InvalidPuffin(format!(
430 "blob {idx} length {} exceeds MAX_BLOB_LEN {MAX_BLOB_LEN}",
431 meta.length
432 )));
433 }
434 self.inner.seek(SeekFrom::Start(meta.offset))?;
435 let mut buf = vec![0u8; meta.length as usize];
436 self.inner.read_exact(&mut buf)?;
437 Ok(buf)
438 }
439
440 pub fn read_blob_decompressed(&mut self, idx: usize) -> Result<Vec<u8>> {
447 let codec = {
448 let meta =
449 self.footer.blobs.get(idx).ok_or_else(|| {
450 Error::InvalidPuffin(format!("blob index {idx} out of range"))
451 })?;
452 CompressionCodec::from_meta(meta.compression_codec.as_deref())
453 };
454 let raw = self.read_blob(idx)?;
455 match codec {
456 CompressionCodec::None => Ok(raw),
457 CompressionCodec::Zstd => decode_zstd(&raw),
458 }
459 }
460
461 pub fn find_blob(&self, kind: &str) -> Option<(usize, &BlobMetadata)> {
477 self.footer
478 .blobs
479 .iter()
480 .enumerate()
481 .find(|(_, b)| b.kind == kind)
482 }
483}
484
/// Drains `reader` into memory, failing if the output would exceed `max_bytes`.
///
/// At most `max_bytes + 1` bytes are pulled from `reader`. An oversized stream is therefore
/// detected without decoding it in full. `codec_label` prefixes error messages.
///
/// # Errors
///
/// [`Error::InvalidPuffin`] if reading fails or the decoded size exceeds `max_bytes`.
#[cfg(feature = "zstd")]
fn decompress_bounded<R: Read>(
    reader: R,
    max_bytes: usize,
    codec_label: &'static str,
) -> Result<Vec<u8>> {
    // `max_bytes` is a ceiling, not an expected size, so start with a modest buffer.
    let mut out = Vec::with_capacity(max_bytes.min(256 * 1024));
    // One byte past the limit is enough to tell "exactly at limit" from "over limit".
    // Saturate so a `usize::MAX` limit cannot overflow.
    let read_cap = (max_bytes as u64).saturating_add(1);
    reader
        .take(read_cap)
        .read_to_end(&mut out)
        .map_err(|e| Error::InvalidPuffin(format!("{codec_label} decode: {e}")))?;
    if out.len() > max_bytes {
        return Err(Error::InvalidPuffin(format!(
            "{codec_label} decompressed size exceeds {max_bytes}"
        )));
    }
    Ok(out)
}
520
/// Decompresses a zstd-encoded blob.
///
/// Output is capped at `MAX_BLOB_LEN`, the same ceiling applied to stored blobs.
#[cfg(feature = "zstd")]
fn decode_zstd(raw: &[u8]) -> Result<Vec<u8>> {
    let limit = MAX_BLOB_LEN as usize;
    zstd::stream::Decoder::new(raw)
        .map_err(|e| Error::InvalidPuffin(format!("zstd decoder init: {e}")))
        .and_then(|decoder| decompress_bounded(decoder, limit, "zstd"))
}
530
531#[cfg(not(feature = "zstd"))]
532fn decode_zstd(_raw: &[u8]) -> Result<Vec<u8>> {
533 Err(Error::InvalidPuffin(
534 "blob is zstd-compressed but the `zstd` cargo feature is disabled".into(),
535 ))
536}
537
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;
    use crate::sketches::{HllSketch, Sketch};

    /// In-memory writer type used throughout these tests.
    type MemWriter = PuffinWriter<Cursor<Vec<u8>>>;

    /// Runs `fill` against a fresh in-memory writer and returns the finished file bytes.
    fn build(fill: impl FnOnce(&mut MemWriter)) -> Vec<u8> {
        let mut writer = PuffinWriter::new(Cursor::new(Vec::new()));
        fill(&mut writer);
        writer.finish().unwrap().into_inner()
    }

    /// Opens `bytes` as a Puffin file, panicking if it is rejected.
    fn open_ok(bytes: Vec<u8>) -> PuffinReader<Cursor<Vec<u8>>> {
        PuffinReader::open(Cursor::new(bytes)).unwrap()
    }

    #[test]
    fn round_trip_empty() {
        let reader = open_ok(build(|_| {}));
        assert!(reader.blobs().is_empty());
    }

    #[test]
    fn round_trip_single_blob() {
        let bytes = build(|w| {
            w.add_blob(Blob::new("samkhya.test-v1", vec![0], b"hello puffin"))
                .unwrap();
        });

        let mut reader = open_ok(bytes);
        let listed = reader.blobs();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].kind, "samkhya.test-v1");
        assert_eq!(reader.read_blob(0).unwrap(), b"hello puffin");
    }

    #[test]
    fn round_trip_multiple_blobs() {
        let bytes = build(|w| {
            w.add_blob(Blob::new("samkhya.hll-v1", vec![1], &[1, 2, 3, 4, 5]))
                .unwrap();
            w.add_blob(Blob::new("samkhya.bloom-v1", vec![2], &[10, 20, 30]))
                .unwrap();
        });

        let mut reader = open_ok(bytes);
        assert_eq!(reader.blobs().len(), 2);
        assert_eq!(reader.read_blob(0).unwrap(), vec![1, 2, 3, 4, 5]);
        assert_eq!(reader.read_blob(1).unwrap(), vec![10, 20, 30]);

        let position_of = |kind: &str| reader.find_blob(kind).map(|(i, _)| i);
        assert_eq!(position_of("samkhya.bloom-v1"), Some(1));
        assert_eq!(position_of("absent.kind"), None);
    }

    #[test]
    fn round_trip_hll_sketch_through_puffin() {
        let mut hll = HllSketch::new(12).unwrap();
        for value in 0..1000u32 {
            hll.add(&value.to_le_bytes());
        }
        let encoded = hll.to_bytes().unwrap();

        let bytes = build(|w| {
            w.add_blob(Blob::new(HllSketch::KIND, vec![7], &encoded))
                .unwrap();
        });

        let mut reader = open_ok(bytes);
        let (idx, meta) = reader.find_blob(HllSketch::KIND).unwrap();
        assert_eq!(meta.fields, vec![7]);
        let blob_bytes = reader.read_blob(idx).unwrap();
        let restored = HllSketch::from_bytes(&blob_bytes).unwrap();
        let err = (restored.estimate() as f64 - 1000.0).abs() / 1000.0;
        assert!(err < 0.1, "HLL estimate via Puffin off by {err}");
    }

    #[test]
    fn rejects_too_short_file() {
        let five_zero_bytes = vec![0u8; 5];
        assert!(PuffinReader::open(Cursor::new(five_zero_bytes)).is_err());
    }

    #[test]
    fn rejects_bad_trailing_magic() {
        // Valid head magic followed by zeros: the trailing magic is wrong.
        let mut buf = MAGIC.to_vec();
        buf.resize(20, 0);
        assert!(PuffinReader::open(Cursor::new(buf)).is_err());
    }

    #[test]
    fn read_blob_decompressed_no_codec_is_passthrough() {
        let bytes = build(|w| {
            w.add_blob(Blob::new("samkhya.test-v1", vec![0], b"plain payload"))
                .unwrap();
        });

        let decoded = open_ok(bytes).read_blob_decompressed(0).unwrap();
        assert_eq!(decoded, b"plain payload");
    }

    #[test]
    fn read_blob_rejects_oversized_length() {
        let original = build(|w| {
            w.add_blob(Blob::new("samkhya.test-v1", vec![0], b"x"))
                .unwrap();
        });

        // Locate the footer JSON via the trailer's length field.
        let total = original.len();
        let len_field: [u8; 4] = original[total - 12..total - 8].try_into().unwrap();
        let json_len = u32::from_le_bytes(len_field) as usize;
        let json_start = total - 12 - json_len;
        let json = std::str::from_utf8(&original[json_start..total - 12]).unwrap();

        let bogus_len = MAX_BLOB_LEN + 1;
        let tampered_json = json.replacen("\"length\":1", &format!("\"length\":{bogus_len}"), 1);
        assert_ne!(tampered_json, json, "tamper failed — payload schema drift?");

        // Keep everything up to (and including) the footer magic, then re-emit the footer.
        let mut tampered = original[..json_start].to_vec();
        tampered.extend_from_slice(tampered_json.as_bytes());
        tampered
            .write_u32::<LittleEndian>(tampered_json.len() as u32)
            .unwrap();
        tampered.write_u32::<LittleEndian>(0).unwrap();
        tampered.extend_from_slice(MAGIC);

        let mut reader = open_ok(tampered);
        match reader.read_blob(0).unwrap_err() {
            Error::InvalidPuffin(msg) => assert!(
                msg.contains("MAX_BLOB_LEN"),
                "expected MAX_BLOB_LEN rejection, got: {msg}"
            ),
            other => panic!("expected InvalidPuffin, got {other:?}"),
        }
    }

    #[test]
    fn open_rejects_oversized_footer_payload_len() {
        let mut tampered = build(|_| {});
        let oversized = (MAX_FOOTER_LEN as u32).saturating_add(1);
        let len_at = tampered.len() - 12;
        tampered[len_at..len_at + 4].copy_from_slice(&oversized.to_le_bytes());

        match PuffinReader::open(Cursor::new(tampered)) {
            Ok(_) => panic!("expected MAX_FOOTER_LEN rejection, got Ok"),
            Err(Error::InvalidPuffin(msg)) => assert!(
                msg.contains("MAX_FOOTER_LEN"),
                "expected MAX_FOOTER_LEN rejection, got: {msg}"
            ),
            Err(other) => panic!("expected InvalidPuffin, got {other:?}"),
        }
    }

    #[cfg(not(feature = "zstd"))]
    #[test]
    fn requesting_zstd_without_feature_errors() {
        let mut writer = PuffinWriter::new(Cursor::new(Vec::new()));
        let blob = Blob::new("samkhya.test-v1", vec![0], b"x");
        let outcome = writer.add_blob_compressed(blob, CompressionCodec::Zstd);
        assert!(matches!(outcome, Err(Error::InvalidPuffin(_))));
    }
}
722
#[cfg(all(test, feature = "zstd"))]
mod zstd_tests {
    use std::io::Cursor;

    use super::*;
    use crate::sketches::{HllSketch, Sketch};

    /// Writes one blob through `add_blob_compressed` with `codec` and reopens the file.
    fn single_blob_file(
        kind: &str,
        fields: Vec<i32>,
        payload: &[u8],
        codec: CompressionCodec,
    ) -> PuffinReader<Cursor<Vec<u8>>> {
        let mut writer = PuffinWriter::new(Cursor::new(Vec::new()));
        writer
            .add_blob_compressed(Blob::new(kind, fields, payload), codec)
            .unwrap();
        let bytes = writer.finish().unwrap().into_inner();
        PuffinReader::open(Cursor::new(bytes)).unwrap()
    }

    #[test]
    fn round_trip_compressed_blob() {
        // Highly repetitive input, so zstd is guaranteed to shrink it.
        let payload = vec![0xABu8; 8192];

        let mut reader =
            single_blob_file("samkhya.test-v1", vec![0], &payload, CompressionCodec::Zstd);
        let meta = &reader.blobs()[0];
        assert_eq!(meta.compression_codec.as_deref(), Some("zstd"));
        assert!((meta.length as usize) < payload.len());

        assert_eq!(reader.read_blob_decompressed(0).unwrap(), payload);
    }

    #[test]
    fn round_trip_compressed_hll_sketch() {
        let mut hll = HllSketch::new(14).unwrap();
        for value in 0..5_000u32 {
            hll.add(&value.to_le_bytes());
        }
        let encoded = hll.to_bytes().unwrap();

        let mut reader =
            single_blob_file(HllSketch::KIND, vec![1], &encoded, CompressionCodec::Zstd);
        let (idx, meta) = reader.find_blob(HllSketch::KIND).unwrap();
        assert_eq!(meta.compression_codec.as_deref(), Some("zstd"));
        let decoded = reader.read_blob_decompressed(idx).unwrap();
        let restored = HllSketch::from_bytes(&decoded).unwrap();
        let err = (restored.estimate() as f64 - 5_000.0).abs() / 5_000.0;
        assert!(err < 0.05, "HLL estimate off by {err}");
    }

    #[test]
    fn none_codec_via_compressed_api_matches_plain() {
        let mut reader =
            single_blob_file("samkhya.test-v1", vec![0], b"identity", CompressionCodec::None);
        assert!(reader.blobs()[0].compression_codec.is_none());
        assert_eq!(reader.read_blob_decompressed(0).unwrap(), b"identity");
    }
}