use ailake_core::{AilakeError, AilakeResult};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
/// Footer `"type"` tag identifying the blob that holds the bincode-encoded
/// [`VectorStatEntry`] list.
pub const BLOB_TYPE_VECTOR_STATS: &str = "ailake-vector-stats-v1";
/// Footer `"type"` tag identifying the blob that holds the bincode-encoded
/// [`BM25BloomEntry`] list.
pub const BLOB_TYPE_BM25_BLOOM: &str = "ailake-bm25-bloom-v1";
/// One vector-statistics record, stored (as part of a bincode-encoded list) in the
/// blob tagged [`BLOB_TYPE_VECTOR_STATS`].
///
/// bincode encodes struct fields positionally, so field order and types are part of
/// the on-disk format. Do not reorder or retype fields without also changing the
/// `-v1` blob type tag.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VectorStatEntry {
    /// File the statistics belong to (test fixtures use `data/part-*.parquet` paths).
    pub path: String,
    /// Centroid of the file's vectors. NOTE(review): presumably the mean embedding;
    /// confirm against the producer.
    pub centroid: Vec<f32>,
    /// Radius around `centroid`. NOTE(review): presumably the maximum distance from
    /// the centroid to any vector in the file; confirm.
    pub radius: f32,
}
/// One bloom-filter record, stored (as part of a bincode-encoded list) in the blob
/// tagged [`BLOB_TYPE_BM25_BLOOM`].
///
/// Field order is part of the bincode wire format; see [`VectorStatEntry`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BM25BloomEntry {
    /// File the bloom filter belongs to.
    pub path: String,
    /// Serialized filter. This module treats the bytes as opaque. The test fixture
    /// lays them out as an 8-byte little-endian bit count followed by little-endian
    /// `u64` words. NOTE(review): confirm this matches the production encoder.
    pub bloom_bytes: Vec<u8>,
}
/// Four-byte marker written at both the very start and the very end of every file
/// produced by [`AilakePuffinWriter`].
/// NOTE(review): this differs from the Iceberg Puffin magic (`PFA1`), so these files
/// appear to be a private variant rather than spec Puffin files; confirm.
const PUFFIN_MAGIC: &[u8] = b"PFAc";
/// Output of [`AilakePuffinWriter::write_stats`]: the encoded file plus the byte
/// locations of its parts.
#[derive(Debug, Clone)]
pub struct PuffinStatsResult {
    /// The complete encoded file, from the leading to the trailing magic marker.
    pub bytes: Bytes,
    /// Length in bytes of the JSON footer payload only. This is the value stored in
    /// the 4-byte length field; it excludes that field and the trailing magic.
    pub footer_size: usize,
    /// `(offset, length)` of the vector-stats blob within `bytes`.
    pub vector_stats_blob: (u64, u64),
    /// `(offset, length)` of the BM25 bloom blob within `bytes`, or `None` when no
    /// bloom entries were written.
    pub bm25_bloom_blob: Option<(u64, u64)>,
}
pub struct AilakePuffinWriter;
impl AilakePuffinWriter {
pub fn write_stats(
vector_stats: &[VectorStatEntry],
bm25_blooms: &[BM25BloomEntry],
snap_id: i64,
) -> AilakeResult<PuffinStatsResult> {
let vec_blob =
bincode::serialize(vector_stats).map_err(|e| AilakeError::Bincode(e.to_string()))?;
let bm25_blob: Option<Vec<u8>> = if !bm25_blooms.is_empty() {
Some(bincode::serialize(bm25_blooms).map_err(|e| AilakeError::Bincode(e.to_string()))?)
} else {
None
};
let vec_offset = PUFFIN_MAGIC.len() as u64;
let vec_len = vec_blob.len() as u64;
let bm25_offset = vec_offset + vec_len;
let bm25_len = bm25_blob.as_ref().map_or(0, |b| b.len() as u64);
let mut blobs_json = vec![serde_json::json!({
"type": BLOB_TYPE_VECTOR_STATS,
"snapshot-id": snap_id,
"sequence-number": 0,
"offset": vec_offset,
"length": vec_len,
"fields": []
})];
if bm25_blob.is_some() {
blobs_json.push(serde_json::json!({
"type": BLOB_TYPE_BM25_BLOOM,
"snapshot-id": snap_id,
"sequence-number": 0,
"offset": bm25_offset,
"length": bm25_len,
"fields": []
}));
}
let footer_json = serde_json::json!({
"blobs": blobs_json,
"properties": { "ailake.stats-version": "1" }
})
.to_string();
let footer_bytes = footer_json.as_bytes();
let footer_size = footer_bytes.len();
let footer_len_le = (footer_size as u32).to_le_bytes();
let mut out = Vec::with_capacity(
PUFFIN_MAGIC.len() * 2 + vec_blob.len() + bm25_len as usize + footer_size + 4,
);
out.extend_from_slice(PUFFIN_MAGIC);
out.extend_from_slice(&vec_blob);
if let Some(ref b) = bm25_blob {
out.extend_from_slice(b);
}
out.extend_from_slice(footer_bytes);
out.extend_from_slice(&footer_len_le);
out.extend_from_slice(PUFFIN_MAGIC);
Ok(PuffinStatsResult {
bytes: Bytes::from(out),
footer_size,
vector_stats_blob: (vec_offset, vec_len),
bm25_bloom_blob: bm25_blob.map(|_| (bm25_offset, bm25_len)),
})
}
}
pub struct AilakePuffinReader<'a> {
data: &'a [u8],
}
impl<'a> AilakePuffinReader<'a> {
pub fn new(data: &'a [u8]) -> Self {
Self { data }
}
fn footer(&self) -> AilakeResult<serde_json::Value> {
let n = self.data.len();
if n < 12 {
return Err(AilakeError::Catalog("Puffin file too short".into()));
}
let footer_len = u32::from_le_bytes(self.data[n - 8..n - 4].try_into().unwrap()) as usize;
let footer_start = n - 8 - footer_len;
serde_json::from_slice(&self.data[footer_start..footer_start + footer_len])
.map_err(|e| AilakeError::Catalog(format!("Puffin footer parse: {e}")))
}
fn blob_slice(&self, blob: &serde_json::Value) -> AilakeResult<&[u8]> {
let offset = blob["offset"].as_u64().unwrap_or(0) as usize;
let length = blob["length"].as_u64().unwrap_or(0) as usize;
if offset + length > self.data.len() {
return Err(AilakeError::Catalog(
"Puffin blob offset out of range".into(),
));
}
Ok(&self.data[offset..offset + length])
}
pub fn read_vector_stats(&self) -> AilakeResult<Vec<VectorStatEntry>> {
let footer = self.footer()?;
let blobs = match footer["blobs"].as_array() {
Some(b) => b,
None => return Ok(vec![]),
};
for blob in blobs {
if blob["type"].as_str() == Some(BLOB_TYPE_VECTOR_STATS) {
let slice = self.blob_slice(blob)?;
return bincode::deserialize(slice)
.map_err(|e| AilakeError::Bincode(e.to_string()));
}
}
Ok(vec![])
}
pub fn read_bm25_blooms(&self) -> AilakeResult<Vec<BM25BloomEntry>> {
let footer = self.footer()?;
let blobs = match footer["blobs"].as_array() {
Some(b) => b,
None => return Ok(vec![]),
};
for blob in blobs {
if blob["type"].as_str() == Some(BLOB_TYPE_BM25_BLOOM) {
let slice = self.blob_slice(blob)?;
return bincode::deserialize(slice)
.map_err(|e| AilakeError::Bincode(e.to_string()));
}
}
Ok(vec![])
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// 64-bit FNV-1a with `seed` mixed into the offset basis. It supplies the two base
    /// hashes for the double-hashing probes of the synthetic bloom filter.
    fn fnv64a(data: &[u8], seed: u64) -> u64 {
        const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
        const FNV_PRIME: u64 = 1099511628211;
        data.iter().fold(seed ^ FNV_OFFSET_BASIS, |acc, &byte| {
            (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        })
    }

    /// Two files with 3-dimensional centroids.
    fn sample_vector_stats() -> Vec<VectorStatEntry> {
        let entry = |path: &str, centroid: [f32; 3], radius: f32| VectorStatEntry {
            path: path.to_string(),
            centroid: centroid.to_vec(),
            radius,
        };
        vec![
            entry("data/part-00001.parquet", [0.1, 0.2, 0.3], 0.5),
            entry("data/part-00002.parquet", [0.9, 0.8, 0.7], 0.3),
        ]
    }

    /// One bloom entry: an 8-byte little-endian bit count followed by the
    /// little-endian `u64` words of a 1024-bit filter containing two terms.
    fn sample_bloom() -> Vec<BM25BloomEntry> {
        const NUM_BITS: usize = 1024;
        const NUM_PROBES: u64 = 4;
        let mut words = vec![0u64; NUM_BITS / 64];
        for term in ["rust", "iceberg"].iter() {
            let first = fnv64a(term.as_bytes(), 0);
            let second = fnv64a(term.as_bytes(), first);
            (0..NUM_PROBES)
                .map(|probe| (first.wrapping_add(probe.wrapping_mul(second)) as usize) % NUM_BITS)
                .for_each(|bit| words[bit / 64] |= 1u64 << (bit % 64));
        }
        let mut bloom_bytes = Vec::with_capacity(8 * (1 + words.len()));
        bloom_bytes.extend_from_slice(&(NUM_BITS as u64).to_le_bytes());
        for word in &words {
            bloom_bytes.extend_from_slice(&word.to_le_bytes());
        }
        vec![BM25BloomEntry {
            path: "data/part-00001.parquet".to_string(),
            bloom_bytes,
        }]
    }

    #[test]
    fn vector_stats_roundtrip() {
        let written = AilakePuffinWriter::write_stats(&sample_vector_stats(), &[], 42).unwrap();
        let bytes: &[u8] = &written.bytes;
        assert!(bytes.starts_with(PUFFIN_MAGIC));
        assert!(bytes.ends_with(PUFFIN_MAGIC));
        assert!(written.bm25_bloom_blob.is_none());

        let recovered = AilakePuffinReader::new(bytes).read_vector_stats().unwrap();
        assert_eq!(recovered.len(), 2);
        assert_eq!(recovered[0].path, "data/part-00001.parquet");
        assert!((recovered[0].radius - 0.5).abs() < 1e-6);
        assert_eq!(recovered[1].centroid, [0.9f32, 0.8, 0.7]);
    }

    #[test]
    fn bm25_bloom_roundtrip() {
        let written =
            AilakePuffinWriter::write_stats(&sample_vector_stats(), &sample_bloom(), 99).unwrap();
        assert!(written.bm25_bloom_blob.is_some());

        let blooms = AilakePuffinReader::new(&written.bytes).read_bm25_blooms().unwrap();
        assert_eq!(blooms.len(), 1);
        let bloom = &blooms[0];
        assert_eq!(bloom.path, "data/part-00001.parquet");
        assert!(bloom.bloom_bytes.len() >= 8);
        let header: [u8; 8] = bloom.bloom_bytes[..8].try_into().unwrap();
        assert_eq!(u64::from_le_bytes(header), 1024);
    }

    #[test]
    fn empty_bloom_produces_no_bloom_blob() {
        let written = AilakePuffinWriter::write_stats(&sample_vector_stats(), &[], 1).unwrap();
        let blooms = AilakePuffinReader::new(&written.bytes).read_bm25_blooms().unwrap();
        assert!(blooms.is_empty());
    }

    #[test]
    fn footer_size_matches_actual() {
        let written = AilakePuffinWriter::write_stats(&sample_vector_stats(), &[], 7).unwrap();
        // Tail is: footer length (u32 LE) followed by the 4-byte magic.
        let tail = &written.bytes[written.bytes.len() - 8..];
        let recorded = u32::from_le_bytes(tail[..4].try_into().unwrap()) as usize;
        assert_eq!(recorded, written.footer_size);
    }
}