use std::collections::BTreeMap;
use std::io::{Read, Seek, SeekFrom, Write};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use serde::{Deserialize, Serialize};
use crate::{Error, Result};
/// Four-byte magic written at the start of the file and at both ends of the footer.
const MAGIC: &[u8; 4] = b"PFA1";
/// Largest stored blob length (2 GiB) that `read_blob` accepts; also used as the cap on
/// zstd-decompressed output.
const MAX_BLOB_LEN: u64 = 2 * 1024 * 1024 * 1024;
/// Largest footer payload (16 MiB) that `PuffinReader::open` is willing to buffer.
const MAX_FOOTER_LEN: u64 = 16 * 1024 * 1024;
/// Maximum number of blob entries a footer may list before `PuffinReader::open` rejects it.
const MAX_BLOB_COUNT: usize = 65_536;
/// Compression applied to a blob's bytes as they are stored in the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionCodec {
    /// The payload is stored verbatim.
    None,
    /// The payload is stored zstd-compressed.
    Zstd,
}

impl CompressionCodec {
    /// Returns the lowercase textual name of the codec (`"none"` or `"zstd"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Zstd => "zstd",
            Self::None => "none",
        }
    }

    /// Maps the optional codec name found in blob metadata to a codec.
    ///
    /// Only the exact string `"zstd"` selects [`CompressionCodec::Zstd`]; a missing value and
    /// every other string map to [`CompressionCodec::None`].
    fn from_meta(meta: Option<&str>) -> Self {
        if meta == Some("zstd") {
            Self::Zstd
        } else {
            Self::None
        }
    }
}
/// JSON document stored in the file footer: the list of blobs plus file-level properties.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FooterPayload {
/// Metadata for every blob in the file, in the order the entries appear in the footer.
pub blobs: Vec<BlobMetadata>,
/// Free-form file-level key/value properties. Omitted from the JSON when empty and
/// defaulted to empty when the key is absent on read.
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub properties: BTreeMap<String, String>,
}
/// Footer entry describing one blob: what it is and where its bytes live in the file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobMetadata {
/// Blob type identifier; serialized under the JSON key `type`.
#[serde(rename = "type")]
pub kind: String,
/// Integer ids associated with the blob. They are stored and returned verbatim here;
/// presumably column/field ids — confirm against the producers of these blobs.
pub fields: Vec<i32>,
/// Optional snapshot id (JSON key `snapshot-id`); omitted from the JSON when `None`.
#[serde(
rename = "snapshot-id",
default,
skip_serializing_if = "Option::is_none"
)]
pub snapshot_id: Option<i64>,
/// Optional sequence number (JSON key `sequence-number`); omitted from the JSON when `None`.
#[serde(
rename = "sequence-number",
default,
skip_serializing_if = "Option::is_none"
)]
pub sequence_number: Option<i64>,
/// Byte offset of the blob, measured from the start of the file.
pub offset: u64,
/// Number of bytes stored in the file for this blob (the compressed size when a codec is set).
pub length: u64,
/// Name of the codec the stored bytes are compressed with (JSON key `compression-codec`);
/// `None` means the bytes are stored as-is.
#[serde(
rename = "compression-codec",
default,
skip_serializing_if = "Option::is_none"
)]
pub compression_codec: Option<String>,
/// Free-form per-blob key/value properties. Omitted from the JSON when empty and
/// defaulted to empty when the key is absent on read.
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub properties: BTreeMap<String, String>,
}
/// A blob handed to the writer: its type, associated ids, borrowed payload and properties.
///
/// The payload is borrowed, so building or cloning a `Blob` never copies the payload bytes.
#[derive(Debug, Clone)]
pub struct Blob<'a> {
    /// Blob type identifier recorded in the footer.
    pub kind: String,
    /// Integer ids recorded alongside the blob in the footer.
    pub fields: Vec<i32>,
    /// Uncompressed payload bytes.
    pub payload: &'a [u8],
    /// Per-blob key/value properties recorded in the footer.
    pub properties: BTreeMap<String, String>,
}

impl<'a> Blob<'a> {
    /// Builds a blob with the given type, ids and payload and an empty property map.
    ///
    /// Set [`Blob::properties`] afterwards to attach properties.
    pub fn new(kind: impl Into<String>, fields: Vec<i32>, payload: &'a [u8]) -> Self {
        Self {
            kind: kind.into(),
            fields,
            payload,
            properties: BTreeMap::new(),
        }
    }
}
/// Sequential writer: emits the head magic, then blob bytes, then the footer on `finish`.
pub struct PuffinWriter<W: Write + Seek> {
// Destination the file bytes are written to.
inner: W,
// Footer entries for the blobs written so far, in write order.
blobs: Vec<BlobMetadata>,
// Count of bytes written so far; used as the offset recorded for the next blob.
pos: u64,
// Whether the leading magic has already been written.
wrote_head: bool,
}
impl<W: Write + Seek> PuffinWriter<W> {
    /// Creates a writer over `inner`.
    ///
    /// Nothing is written until a blob is added or [`finish`](Self::finish) is called. Blob
    /// offsets are counted from zero starting at the first byte this writer emits.
    pub fn new(inner: W) -> Self {
        Self {
            inner,
            blobs: Vec::new(),
            pos: 0,
            wrote_head: false,
        }
    }

    /// Writes the leading magic exactly once, on first use.
    fn ensure_head(&mut self) -> Result<()> {
        if !self.wrote_head {
            self.inner.write_all(MAGIC)?;
            self.pos += MAGIC.len() as u64;
            self.wrote_head = true;
        }
        Ok(())
    }

    /// Writes `stored` (the bytes as they appear on disk) and records the footer entry.
    ///
    /// Shared by the plain and compressed paths so offset/length bookkeeping lives in one place.
    fn append_stored(
        &mut self,
        blob: Blob<'_>,
        stored: &[u8],
        codec: CompressionCodec,
    ) -> Result<()> {
        self.ensure_head()?;
        let offset = self.pos;
        self.inner.write_all(stored)?;
        let length = stored.len() as u64;
        self.pos += length;
        // Uncompressed blobs carry no codec entry at all rather than the string "none".
        let compression_codec = match codec {
            CompressionCodec::None => None,
            CompressionCodec::Zstd => Some(codec.as_str().to_string()),
        };
        self.blobs.push(BlobMetadata {
            kind: blob.kind,
            fields: blob.fields,
            snapshot_id: None,
            sequence_number: None,
            offset,
            length,
            compression_codec,
            properties: blob.properties,
        });
        Ok(())
    }

    /// Appends `blob` uncompressed and records its footer entry.
    ///
    /// # Errors
    /// Returns an error if writing to the underlying sink fails.
    pub fn add_blob(&mut self, blob: Blob<'_>) -> Result<()> {
        let stored = blob.payload;
        self.append_stored(blob, stored, CompressionCodec::None)
    }

    /// Appends `blob`, compressing its payload with `codec` first.
    ///
    /// [`CompressionCodec::None`] behaves exactly like [`add_blob`](Self::add_blob).
    ///
    /// # Errors
    /// Returns an error if compression or the write fails, or if zstd is requested while the
    /// `zstd` cargo feature is disabled.
    pub fn add_blob_compressed(&mut self, blob: Blob<'_>, codec: CompressionCodec) -> Result<()> {
        match codec {
            CompressionCodec::None => self.add_blob(blob),
            CompressionCodec::Zstd => self.add_blob_zstd(blob),
        }
    }

    /// Compresses the payload with zstd (level 0, i.e. the library default) and appends it.
    #[cfg(feature = "zstd")]
    fn add_blob_zstd(&mut self, blob: Blob<'_>) -> Result<()> {
        // The head is written before compressing, as before, so a compression failure leaves
        // the sink in the same state it always did.
        self.ensure_head()?;
        let compressed = zstd::encode_all(blob.payload, 0)
            .map_err(|e| Error::InvalidPuffin(format!("zstd encode: {e}")))?;
        self.append_stored(blob, &compressed, CompressionCodec::Zstd)
    }

    /// Fallback used when the `zstd` cargo feature is disabled: always fails.
    #[cfg(not(feature = "zstd"))]
    fn add_blob_zstd(&mut self, _blob: Blob<'_>) -> Result<()> {
        Err(Error::InvalidPuffin(
            "zstd codec requested but the `zstd` cargo feature is disabled".into(),
        ))
    }

    /// Writes the footer, flushes the sink and returns it.
    ///
    /// The footer is: magic, JSON payload, payload length (`u32`, little-endian), flags
    /// (`u32`, always zero), magic.
    ///
    /// # Errors
    /// Returns an error if the footer cannot be encoded as JSON, if the encoded payload does not
    /// fit in a `u32`, or if writing or flushing the sink fails.
    pub fn finish(mut self) -> Result<W> {
        self.ensure_head()?;
        let footer = FooterPayload {
            blobs: self.blobs,
            properties: BTreeMap::new(),
        };
        let payload = serde_json::to_vec(&footer)
            .map_err(|e| Error::InvalidPuffin(format!("footer JSON encode: {e}")))?;
        let payload_len = u32::try_from(payload.len())
            .map_err(|_| Error::InvalidPuffin("footer payload exceeds u32".into()))?;
        self.inner.write_all(MAGIC)?;
        self.inner.write_all(&payload)?;
        self.inner.write_u32::<LittleEndian>(payload_len)?;
        self.inner.write_u32::<LittleEndian>(0)?;
        self.inner.write_all(MAGIC)?;
        // Flush here so errors from a buffering sink are reported to the caller instead of
        // being lost when the sink is later dropped.
        self.inner.flush()?;
        Ok(self.inner)
    }
}
/// Random-access reader: parses the footer on [`open`](PuffinReader::open) and serves blobs
/// by index.
pub struct PuffinReader<R: Read + Seek> {
    // Source the file bytes are read from.
    inner: R,
    // Decoded footer.
    footer: FooterPayload,
    // Offset of the footer's leading magic; every blob must end at or before this byte.
    footer_start: u64,
}

impl<R: Read + Seek> PuffinReader<R> {
    /// Validates the file framing and decodes the footer.
    ///
    /// Blob offsets and lengths listed in the footer are not validated here; they are checked
    /// when a blob is read.
    ///
    /// # Errors
    /// Returns [`Error::InvalidPuffin`] when the file is too short, any of the three magic
    /// markers is missing, the footer is flagged as compressed, the footer payload length
    /// exceeds `MAX_FOOTER_LEN` or the file size, the footer is not valid JSON, or it lists more
    /// than `MAX_BLOB_COUNT` blobs. I/O failures are propagated.
    pub fn open(mut inner: R) -> Result<Self> {
        // Bit 0 of the flags word marks a compressed footer payload.
        const FOOTER_PAYLOAD_COMPRESSED: u32 = 1;

        let file_len = inner.seek(SeekFrom::End(0))?;
        if file_len < 20 {
            return Err(Error::InvalidPuffin(format!(
                "file too short: {file_len} bytes"
            )));
        }

        // The last 12 bytes are: payload length, flags, trailing magic.
        inner.seek(SeekFrom::End(-12))?;
        let payload_len = u64::from(inner.read_u32::<LittleEndian>()?);
        let flags = inner.read_u32::<LittleEndian>()?;
        let mut trailing = [0u8; 4];
        inner.read_exact(&mut trailing)?;
        if &trailing != MAGIC {
            return Err(Error::InvalidPuffin("trailing magic missing".into()));
        }
        // A compressed footer cannot be decoded here; say so instead of letting the JSON
        // decoder fail on compressed bytes. Other flag bits are tolerated as before.
        if flags & FOOTER_PAYLOAD_COMPRESSED != 0 {
            return Err(Error::InvalidPuffin(format!(
                "footer payload is compressed (flags {flags:#010x}); compressed footers are not supported"
            )));
        }
        if payload_len > MAX_FOOTER_LEN {
            return Err(Error::InvalidPuffin(format!(
                "footer payload length {payload_len} exceeds MAX_FOOTER_LEN {MAX_FOOTER_LEN}"
            )));
        }
        // head magic (4) + footer magic (4) + payload + length (4) + flags (4) + magic (4).
        // Requiring all 20 framing bytes keeps the head magic and the footer magic distinct.
        let min_len = 20u64 + payload_len;
        if file_len < min_len {
            return Err(Error::InvalidPuffin(
                "payload length exceeds file size".into(),
            ));
        }
        let payload_start = file_len - 12 - payload_len;
        let footer_start = payload_start - 4;

        // Footer magic and payload are contiguous, so one seek covers both.
        inner.seek(SeekFrom::Start(footer_start))?;
        let mut footer_head = [0u8; 4];
        inner.read_exact(&mut footer_head)?;
        if &footer_head != MAGIC {
            return Err(Error::InvalidPuffin("footer head magic missing".into()));
        }
        let mut payload = vec![0u8; payload_len as usize];
        inner.read_exact(&mut payload)?;

        inner.seek(SeekFrom::Start(0))?;
        let mut head = [0u8; 4];
        inner.read_exact(&mut head)?;
        if &head != MAGIC {
            return Err(Error::InvalidPuffin("file head magic missing".into()));
        }

        let footer: FooterPayload = serde_json::from_slice(&payload)
            .map_err(|e| Error::InvalidPuffin(format!("footer JSON decode: {e}")))?;
        if footer.blobs.len() > MAX_BLOB_COUNT {
            return Err(Error::InvalidPuffin(format!(
                "footer enumerates {} blobs; exceeds MAX_BLOB_COUNT {MAX_BLOB_COUNT}",
                footer.blobs.len()
            )));
        }
        Ok(Self {
            inner,
            footer,
            footer_start,
        })
    }

    /// Returns the decoded footer.
    pub fn footer(&self) -> &FooterPayload {
        &self.footer
    }

    /// Returns the footer's blob entries.
    pub fn blobs(&self) -> &[BlobMetadata] {
        &self.footer.blobs
    }

    /// Reads the stored bytes of blob `idx` without decompressing them.
    ///
    /// # Errors
    /// Returns [`Error::InvalidPuffin`] when `idx` is out of range, the recorded length exceeds
    /// `MAX_BLOB_LEN`, or the recorded byte range does not fit before the footer. I/O failures
    /// are propagated.
    pub fn read_blob(&mut self, idx: usize) -> Result<Vec<u8>> {
        let meta = self
            .footer
            .blobs
            .get(idx)
            .ok_or_else(|| Error::InvalidPuffin(format!("blob index {idx} out of range")))?;
        let offset = meta.offset;
        let length = meta.length;
        if length > MAX_BLOB_LEN {
            return Err(Error::InvalidPuffin(format!(
                "blob {idx} length {length} exceeds MAX_BLOB_LEN {MAX_BLOB_LEN}"
            )));
        }
        // The footer is untrusted input: check the range before allocating, so a tiny file
        // cannot request a multi-gigabyte buffer or overflow the offset arithmetic.
        let region_end = self.footer_start;
        let in_bounds = offset
            .checked_add(length)
            .map_or(false, |end| end <= region_end);
        if !in_bounds {
            return Err(Error::InvalidPuffin(format!(
                "blob {idx} (offset {offset}, length {length}) extends past the blob region ending at byte {region_end}"
            )));
        }
        self.inner.seek(SeekFrom::Start(offset))?;
        let mut buf = vec![0u8; length as usize];
        self.inner.read_exact(&mut buf)?;
        Ok(buf)
    }

    /// Reads blob `idx` and decompresses it according to its recorded codec.
    ///
    /// Blobs without a codec are returned as stored.
    ///
    /// # Errors
    /// Returns [`Error::InvalidPuffin`] when `idx` is out of range, the blob names a codec this
    /// reader does not support, or decompression fails; also fails in every way
    /// [`read_blob`](Self::read_blob) can.
    pub fn read_blob_decompressed(&mut self, idx: usize) -> Result<Vec<u8>> {
        let codec = {
            let meta = self.footer.blobs.get(idx).ok_or_else(|| {
                Error::InvalidPuffin(format!("blob index {idx} out of range"))
            })?;
            let declared = meta.compression_codec.as_deref();
            let codec = CompressionCodec::from_meta(declared);
            // `from_meta` folds unknown names into `None`. Handing back still-compressed bytes
            // as if they were the payload would be silent corruption, so reject any declared
            // name that does not round-trip through the codec it resolved to.
            if let Some(name) = declared {
                if name != codec.as_str() {
                    return Err(Error::InvalidPuffin(format!(
                        "blob {idx} uses unsupported compression codec {name:?}"
                    )));
                }
            }
            codec
        };
        let raw = self.read_blob(idx)?;
        match codec {
            CompressionCodec::None => Ok(raw),
            CompressionCodec::Zstd => decode_zstd(&raw),
        }
    }

    /// Returns the index and metadata of the first blob whose type equals `kind`.
    pub fn find_blob(&self, kind: &str) -> Option<(usize, &BlobMetadata)> {
        self.footer
            .blobs
            .iter()
            .enumerate()
            .find(|(_, b)| b.kind == kind)
    }
}
/// Drains `reader` into memory, failing if it yields more than `max_bytes` bytes.
///
/// `codec_label` is only used to prefix error messages.
#[cfg(feature = "zstd")]
fn decompress_bounded<R: std::io::Read>(
    reader: R,
    max_bytes: usize,
    codec_label: &'static str,
) -> Result<Vec<u8>> {
    use std::io::Read as _;

    let initial_capacity = std::cmp::min(max_bytes, 256 * 1024);
    let mut decoded = Vec::with_capacity(initial_capacity);
    // Read at most one byte beyond the limit: that extra byte is what distinguishes
    // "exactly at the limit" from "over the limit".
    let mut limited = reader.take(max_bytes as u64 + 1);
    if let Err(e) = limited.read_to_end(&mut decoded) {
        return Err(Error::InvalidPuffin(format!("{codec_label} decode: {e}")));
    }
    if decoded.len() <= max_bytes {
        Ok(decoded)
    } else {
        Err(Error::InvalidPuffin(format!(
            "{codec_label} decompressed size exceeds {max_bytes}"
        )))
    }
}
/// Decompresses a zstd stream, capping the output at `MAX_BLOB_LEN` bytes.
#[cfg(feature = "zstd")]
fn decode_zstd(raw: &[u8]) -> Result<Vec<u8>> {
    match zstd::stream::Decoder::new(raw) {
        Ok(stream) => decompress_bounded(stream, MAX_BLOB_LEN as usize, "zstd"),
        Err(e) => Err(Error::InvalidPuffin(format!("zstd decoder init: {e}"))),
    }
}
#[cfg(not(feature = "zstd"))]
fn decode_zstd(_raw: &[u8]) -> Result<Vec<u8>> {
Err(Error::InvalidPuffin(
"blob is zstd-compressed but the `zstd` cargo feature is disabled".into(),
))
}
// Round-trip and rejection tests that run regardless of the `zstd` feature.
#[cfg(test)]
mod tests {
use std::io::Cursor;
use super::*;
use crate::sketches::{HllSketch, Sketch};
// A file with no blobs still opens and reports an empty blob list.
#[test]
fn round_trip_empty() {
let writer = PuffinWriter::new(Cursor::new(Vec::new()));
let cursor = writer.finish().unwrap();
let reader = PuffinReader::open(Cursor::new(cursor.into_inner())).unwrap();
assert!(reader.blobs().is_empty());
}
// One blob written uncompressed comes back with the same type and bytes.
#[test]
fn round_trip_single_blob() {
let mut writer = PuffinWriter::new(Cursor::new(Vec::new()));
writer
.add_blob(Blob::new("samkhya.test-v1", vec![0], b"hello puffin"))
.unwrap();
let cursor = writer.finish().unwrap();
let mut reader = PuffinReader::open(Cursor::new(cursor.into_inner())).unwrap();
assert_eq!(reader.blobs().len(), 1);
assert_eq!(reader.blobs()[0].kind, "samkhya.test-v1");
assert_eq!(reader.read_blob(0).unwrap(), b"hello puffin");
}
// Several blobs keep their order and contents; `find_blob` locates them by type.
#[test]
fn round_trip_multiple_blobs() {
let mut writer = PuffinWriter::new(Cursor::new(Vec::new()));
writer
.add_blob(Blob::new("samkhya.hll-v1", vec![1], &[1, 2, 3, 4, 5]))
.unwrap();
writer
.add_blob(Blob::new("samkhya.bloom-v1", vec![2], &[10, 20, 30]))
.unwrap();
let cursor = writer.finish().unwrap();
let mut reader = PuffinReader::open(Cursor::new(cursor.into_inner())).unwrap();
assert_eq!(reader.blobs().len(), 2);
assert_eq!(reader.read_blob(0).unwrap(), vec![1, 2, 3, 4, 5]);
assert_eq!(reader.read_blob(1).unwrap(), vec![10, 20, 30]);
assert_eq!(
reader.find_blob("samkhya.bloom-v1").map(|(i, _)| i),
Some(1)
);
assert_eq!(reader.find_blob("absent.kind").map(|(i, _)| i), None);
}
// A serialized HLL sketch survives the trip through the container and still estimates
// the 1000 inserted values to within 10%.
#[test]
fn round_trip_hll_sketch_through_puffin() {
let mut hll = HllSketch::new(12).unwrap();
for i in 0..1000u32 {
hll.add(&i.to_le_bytes());
}
let payload = hll.to_bytes().unwrap();
let mut writer = PuffinWriter::new(Cursor::new(Vec::new()));
writer
.add_blob(Blob::new(HllSketch::KIND, vec![7], &payload))
.unwrap();
let cursor = writer.finish().unwrap();
let mut reader = PuffinReader::open(Cursor::new(cursor.into_inner())).unwrap();
let (idx, meta) = reader.find_blob(HllSketch::KIND).unwrap();
assert_eq!(meta.fields, vec![7]);
let blob_bytes = reader.read_blob(idx).unwrap();
let hll2 = HllSketch::from_bytes(&blob_bytes).unwrap();
let err = (hll2.estimate() as f64 - 1000.0).abs() / 1000.0;
assert!(err < 0.1, "HLL estimate via Puffin off by {err}");
}
// Five bytes is below the minimum file size, so `open` must fail.
#[test]
fn rejects_too_short_file() {
let result = PuffinReader::open(Cursor::new(vec![0u8; 5]));
assert!(result.is_err());
}
// Long enough and with a valid head magic, but the last four bytes are zeros.
#[test]
fn rejects_bad_trailing_magic() {
let mut buf = vec![0u8; 20];
buf[0..4].copy_from_slice(MAGIC);
let result = PuffinReader::open(Cursor::new(buf));
assert!(result.is_err());
}
// A blob with no recorded codec is returned unchanged by the decompressing read path.
#[test]
fn read_blob_decompressed_no_codec_is_passthrough() {
let mut writer = PuffinWriter::new(Cursor::new(Vec::new()));
writer
.add_blob(Blob::new("samkhya.test-v1", vec![0], b"plain payload"))
.unwrap();
let cursor = writer.finish().unwrap();
let mut reader = PuffinReader::open(Cursor::new(cursor.into_inner())).unwrap();
assert_eq!(reader.read_blob_decompressed(0).unwrap(), b"plain payload");
}
// Rewrites the footer JSON so the blob claims MAX_BLOB_LEN + 1 bytes, then checks that
// `open` still succeeds and `read_blob` rejects the entry with a MAX_BLOB_LEN message.
#[test]
fn read_blob_rejects_oversized_length() {
let mut writer = PuffinWriter::new(Cursor::new(Vec::new()));
writer
.add_blob(Blob::new("samkhya.test-v1", vec![0], b"x"))
.unwrap();
let bytes = writer.finish().unwrap().into_inner();
let file_len = bytes.len();
// The payload length is the little-endian u32 that starts 12 bytes before the end.
let payload_len =
u32::from_le_bytes(bytes[file_len - 12..file_len - 8].try_into().unwrap()) as usize;
let payload_start = file_len - 12 - payload_len;
let payload_end = file_len - 12;
let json = std::str::from_utf8(&bytes[payload_start..payload_end])
.unwrap()
.to_string();
let bogus_len = MAX_BLOB_LEN + 1;
let tampered_json = json.replacen("\"length\":1", &format!("\"length\":{bogus_len}"), 1);
// Guard against the replacement silently matching nothing.
assert_ne!(tampered_json, json, "tamper failed — payload schema drift?");
// Reassemble: everything before the payload, the new payload, then a fresh 12-byte trailer.
let mut tampered = Vec::with_capacity(file_len + 32);
tampered.extend_from_slice(&bytes[..payload_start]);
tampered.extend_from_slice(tampered_json.as_bytes());
let new_payload_len = tampered_json.len() as u32;
tampered.write_u32::<LittleEndian>(new_payload_len).unwrap();
tampered.write_u32::<LittleEndian>(0).unwrap();
tampered.extend_from_slice(MAGIC);
let mut reader = PuffinReader::open(Cursor::new(tampered)).unwrap();
let err = reader.read_blob(0).unwrap_err();
match err {
Error::InvalidPuffin(msg) => assert!(
msg.contains("MAX_BLOB_LEN"),
"expected MAX_BLOB_LEN rejection, got: {msg}"
),
other => panic!("expected InvalidPuffin, got {other:?}"),
}
}
// Overwrites the footer payload length field with MAX_FOOTER_LEN + 1 and expects `open`
// to reject the file with a MAX_FOOTER_LEN message.
#[test]
fn open_rejects_oversized_footer_payload_len() {
let writer = PuffinWriter::new(Cursor::new(Vec::new()));
let bytes = writer.finish().unwrap().into_inner();
let mut tampered = bytes.clone();
let oversized = (MAX_FOOTER_LEN as u32).saturating_add(1);
let len_offset = tampered.len() - 12;
tampered[len_offset..len_offset + 4].copy_from_slice(&oversized.to_le_bytes());
match PuffinReader::open(Cursor::new(tampered)) {
Ok(_) => panic!("expected MAX_FOOTER_LEN rejection, got Ok"),
Err(Error::InvalidPuffin(msg)) => assert!(
msg.contains("MAX_FOOTER_LEN"),
"expected MAX_FOOTER_LEN rejection, got: {msg}"
),
Err(other) => panic!("expected InvalidPuffin, got {other:?}"),
}
}
// Without the `zstd` feature, asking for zstd compression is an error rather than a
// silent fallback.
#[cfg(not(feature = "zstd"))]
#[test]
fn requesting_zstd_without_feature_errors() {
let mut writer = PuffinWriter::new(Cursor::new(Vec::new()));
let err = writer
.add_blob_compressed(
Blob::new("samkhya.test-v1", vec![0], b"x"),
CompressionCodec::Zstd,
)
.unwrap_err();
assert!(matches!(err, Error::InvalidPuffin(_)));
}
}
// Tests that need real zstd support; compiled only with the `zstd` feature enabled.
#[cfg(all(test, feature = "zstd"))]
mod zstd_tests {
use std::io::Cursor;
use super::*;
use crate::sketches::{HllSketch, Sketch};
// A highly repetitive payload is stored smaller than its input, tagged "zstd", and
// decompresses back to the original bytes.
#[test]
fn round_trip_compressed_blob() {
let payload = vec![0xABu8; 8192];
let mut writer = PuffinWriter::new(Cursor::new(Vec::new()));
writer
.add_blob_compressed(
Blob::new("samkhya.test-v1", vec![0], &payload),
CompressionCodec::Zstd,
)
.unwrap();
let cursor = writer.finish().unwrap();
let mut reader = PuffinReader::open(Cursor::new(cursor.into_inner())).unwrap();
let meta = &reader.blobs()[0];
assert_eq!(meta.compression_codec.as_deref(), Some("zstd"));
assert!((meta.length as usize) < payload.len());
let decoded = reader.read_blob_decompressed(0).unwrap();
assert_eq!(decoded, payload);
}
// A zstd-compressed HLL sketch deserializes after decompression and still estimates the
// 5000 inserted values to within 5%.
#[test]
fn round_trip_compressed_hll_sketch() {
let mut hll = HllSketch::new(14).unwrap();
for i in 0..5_000u32 {
hll.add(&i.to_le_bytes());
}
let bytes = hll.to_bytes().unwrap();
let mut writer = PuffinWriter::new(Cursor::new(Vec::new()));
writer
.add_blob_compressed(
Blob::new(HllSketch::KIND, vec![1], &bytes),
CompressionCodec::Zstd,
)
.unwrap();
let cursor = writer.finish().unwrap();
let mut reader = PuffinReader::open(Cursor::new(cursor.into_inner())).unwrap();
let (idx, meta) = reader.find_blob(HllSketch::KIND).unwrap();
assert_eq!(meta.compression_codec.as_deref(), Some("zstd"));
let decoded = reader.read_blob_decompressed(idx).unwrap();
let hll2 = HllSketch::from_bytes(&decoded).unwrap();
let err = (hll2.estimate() as f64 - 5_000.0).abs() / 5_000.0;
assert!(err < 0.05, "HLL estimate off by {err}");
}
// Passing `CompressionCodec::None` to the compressed API records no codec and stores the
// bytes as-is.
#[test]
fn none_codec_via_compressed_api_matches_plain() {
let mut writer = PuffinWriter::new(Cursor::new(Vec::new()));
writer
.add_blob_compressed(
Blob::new("samkhya.test-v1", vec![0], b"identity"),
CompressionCodec::None,
)
.unwrap();
let cursor = writer.finish().unwrap();
let mut reader = PuffinReader::open(Cursor::new(cursor.into_inner())).unwrap();
assert!(reader.blobs()[0].compression_codec.is_none());
assert_eq!(reader.read_blob_decompressed(0).unwrap(), b"identity");
}
}