use crate::containers::UintVecMin0;
use crate::blob_store::traits::{BlobStore, BatchBlobStore, IterableBlobStore, BlobStoreStats};
use crate::error::{Result, ZiporaError};
use crate::RecordId;
use std::collections::HashMap;
/// Tuning knobs that control how records are cut into fragments before
/// deduplication.
#[derive(Debug, Clone)]
pub struct SimpleZipConfig {
    /// Number of leading bytes of each fragment that are never scanned for a
    /// delimiter; the delimiter search starts at this distance from the
    /// fragment start.
    pub min_frag_len: usize,
    /// Upper bound on the length of a single fragment, in bytes.
    pub max_frag_len: usize,
    /// Bytes after which a fragment is allowed to end.
    pub delimiters: Vec<u8>,
}

impl Default for SimpleZipConfig {
    /// Fragments of 8 to 256 bytes, split after newline, carriage return,
    /// tab or space.
    fn default() -> Self {
        let delimiters = b"\n\r\t ".to_vec();
        SimpleZipConfig {
            min_frag_len: 8,
            max_frag_len: 256,
            delimiters,
        }
    }
}
impl SimpleZipConfig {
    /// Starts a builder chain from the default configuration.
    pub fn builder() -> Self {
        Self::default()
    }

    /// Replaces `min_frag_len` and returns the updated configuration.
    pub fn min_frag_len(mut self, v: usize) -> Self {
        self.min_frag_len = v;
        self
    }

    /// Replaces `max_frag_len` and returns the updated configuration.
    pub fn max_frag_len(mut self, v: usize) -> Self {
        self.max_frag_len = v;
        self
    }

    /// Replaces the delimiter set and returns the updated configuration.
    pub fn delimiters(mut self, v: Vec<u8>) -> Self {
        self.delimiters = v;
        self
    }

    /// Finishes the builder chain: returns the configuration if it passes
    /// [`validate`](Self::validate), otherwise the validation error.
    pub fn build(self) -> Result<Self> {
        self.validate().map(|()| self)
    }

    /// Checks the fragment length limits.
    ///
    /// # Errors
    ///
    /// Returns an invalid-parameter error when `min_frag_len` is zero, when
    /// `max_frag_len` is smaller than `min_frag_len`, or when `max_frag_len`
    /// exceeds 1 MiB. The checks run in that order and the first failure wins.
    pub fn validate(&self) -> Result<()> {
        const MAX_FRAG_LEN_LIMIT: usize = 1024 * 1024;

        let problem = if self.min_frag_len == 0 {
            Some("min_frag_len must be > 0")
        } else if self.max_frag_len < self.min_frag_len {
            Some("max_frag_len must be >= min_frag_len")
        } else if self.max_frag_len > MAX_FRAG_LEN_LIMIT {
            Some("max_frag_len must be <= 1MB (fragmentation efficiency)")
        } else {
            None
        };

        match problem {
            Some(msg) => Err(ZiporaError::invalid_parameter(msg)),
            None => Ok(()),
        }
    }
}
/// Read-only blob store that splits records into fragments and stores each
/// distinct fragment only once in a shared byte pool.
pub struct SimpleZipBlobStore {
/// Concatenated bytes of every distinct fragment, in first-seen order.
strpool: Vec<u8>,
/// One packed entry per fragment occurrence: `(offset << len_bits) | length`,
/// where `offset` is the fragment's position in `strpool`.
off_len: Vec<u64>,
/// Number of low bits of each `off_len` entry that hold the fragment length.
len_bits: u32,
/// Record boundaries: entries `i` and `i + 1` delimit the range of `off_len`
/// entries that make up record `i`.
records: UintVecMin0,
/// Number of records the store was built from.
num_records: usize,
/// Sum of the lengths of all original records, in bytes.
unzip_size: usize,
/// Statistics snapshot filled in at build time and returned by `stats()`.
stats: BlobStoreStats,
}
impl SimpleZipBlobStore {
/// Builds a store from `data`, fragmenting each record according to `config`
/// and storing every distinct fragment once.
///
/// An empty `data` slice yields the default (empty) store.
///
/// # Errors
///
/// Returns the validation error when `config` is invalid, or any error
/// reported while building the fragment pool.
pub fn build_from(data: &[Vec<u8>], config: &SimpleZipConfig) -> Result<Self> {
    config.validate()?;
    if data.is_empty() {
        return Ok(Self::default());
    }

    // Cut every record into fragments, remembering where each record's
    // fragments start in the flat fragment list.
    let mut fragments: Vec<Vec<u8>> = Vec::new();
    let mut boundaries: Vec<usize> = Vec::with_capacity(data.len() + 1);
    boundaries.push(0);
    let mut total_bytes = 0;
    for rec in data {
        total_bytes += rec.len();
        for piece in Self::fragment_record(rec, config) {
            fragments.push(piece.to_vec());
        }
        boundaries.push(fragments.len());
    }

    let (strpool, offsets, lengths) = Self::build_strpool(&fragments)?;

    // Reserve just enough low bits to hold the longest fragment length.
    let len_bits = match lengths.iter().max() {
        Some(&longest) if longest != 0 => usize::BITS - longest.leading_zeros(),
        _ => 0,
    };

    // Pack each (offset, length) pair into a single u64.
    let mut off_len = Vec::with_capacity(offsets.len());
    for (&off, &len) in offsets.iter().zip(&lengths) {
        off_len.push(((off as u64) << len_bits) | (len as u64));
    }

    let records = UintVecMin0::build_from_usize(&boundaries).0;

    let mut stats = BlobStoreStats::default();
    stats.blob_count = data.len();
    stats.total_size = total_bytes;
    // `data` is known to be non-empty here, so the division is well defined.
    stats.average_size = total_bytes as f64 / data.len() as f64;

    Ok(Self {
        strpool,
        off_len,
        len_bits,
        records,
        num_records: data.len(),
        unzip_size: total_bytes,
        stats,
    })
}
/// Cuts `record` into consecutive, non-overlapping slices that together
/// cover the whole record.
///
/// Each fragment ends right after the first delimiter found at or beyond
/// `min_frag_len` bytes from its start, or at `max_frag_len` bytes (or the
/// end of the record) when no such delimiter exists. An empty record yields
/// no fragments.
fn fragment_record<'a>(record: &'a [u8], config: &SimpleZipConfig) -> Vec<&'a [u8]> {
    let total = record.len();
    let mut pieces = Vec::new();
    let mut start = 0;
    while start < total {
        let scan_from = total.min(start + config.min_frag_len);
        let hard_stop = total.min(start + config.max_frag_len);
        // A delimiter is included in the fragment it terminates.
        let stop = (scan_from..hard_stop)
            .find(|&i| config.delimiters.contains(&record[i]))
            .map_or(hard_stop, |i| i + 1);
        pieces.push(&record[start..stop]);
        start = stop;
    }
    pieces
}
/// Deduplicates `fragments` into a single byte pool.
///
/// Returns `(strpool, offsets, lengths)`: `strpool` holds the bytes of each
/// distinct fragment once, in first-seen order; `offsets[i]` and `lengths[i]`
/// locate `fragments[i]` inside `strpool`. Both vectors have one entry per
/// input fragment.
fn build_strpool(fragments: &[Vec<u8>]) -> Result<(Vec<u8>, Vec<usize>, Vec<usize>)> {
    let mut strpool = Vec::new();
    // Keys borrow from `fragments`, so probing the map never allocates or
    // copies fragment bytes; only first occurrences are copied into the pool.
    let mut first_offset: HashMap<&[u8], usize> = HashMap::new();
    let mut offsets = Vec::with_capacity(fragments.len());
    let mut lengths = Vec::with_capacity(fragments.len());
    for frag in fragments {
        let offset = *first_offset.entry(frag.as_slice()).or_insert_with(|| {
            let offset = strpool.len();
            strpool.extend_from_slice(frag);
            offset
        });
        offsets.push(offset);
        lengths.push(frag.len());
    }
    Ok((strpool, offsets, lengths))
}
/// Appends the bytes of record `rec_id` to `rec_data`.
///
/// The record is reassembled by concatenating, in order, the pool slices
/// referenced by its packed `off_len` entries.
///
/// # Errors
///
/// Returns a not-found error when `rec_id` is not smaller than the number
/// of records; `rec_data` is left untouched in that case.
///
/// # Panics
///
/// Panics if an index entry points outside the fragment pool, which would
/// mean the store's internal tables are inconsistent.
fn get_record_append_imp(&self, rec_id: usize, rec_data: &mut Vec<u8>) -> Result<()> {
    if rec_id >= self.num_records {
        // `num_records - 1` would underflow for an empty store, so that
        // case gets its own message instead of a bogus maximum id.
        let message = match self.num_records.checked_sub(1) {
            Some(max_id) => format!("Record {} not found (max {})", rec_id, max_id),
            None => format!("Record {} not found (store is empty)", rec_id),
        };
        return Err(ZiporaError::not_found(message));
    }

    let beg = self.records.get(rec_id);
    let end = self.records.get(rec_id + 1);
    let len_bits = self.len_bits;
    // Low `len_bits` bits carry the length, the remaining high bits the offset.
    let len_mask = (1u64 << len_bits) - 1;

    for &packed in &self.off_len[beg..end] {
        let offset = (packed >> len_bits) as usize;
        let length = (packed & len_mask) as usize;
        rec_data.extend_from_slice(&self.strpool[offset..offset + length]);
    }
    Ok(())
}
/// Reports how many bytes the store occupies and how that compares to the
/// original data.
///
/// `total_size` is the fragment pool plus both index structures;
/// `compression_ratio` is `total_size / uncompressed_size`, or `1.0` when
/// the store holds no data.
pub fn memory_stats(&self) -> MemoryStats {
    let strpool_size = self.strpool.len();
    let off_len_size = std::mem::size_of::<u64>() * self.off_len.len();
    let records_size = self.records.mem_size();
    let total_size = strpool_size + (off_len_size + records_size);
    let uncompressed_size = self.unzip_size;
    let compression_ratio = match uncompressed_size {
        0 => 1.0,
        original => total_size as f64 / original as f64,
    };

    MemoryStats {
        strpool_size,
        off_len_size,
        records_size,
        total_size,
        uncompressed_size,
        compression_ratio,
    }
}
/// Returns the number of distinct fragments stored in the pool.
///
/// `off_len` holds one packed `(offset, length)` entry per fragment
/// *occurrence*, so its length over-counts whenever a fragment is shared.
/// Every distinct fragment has its own pool offset, so counting the
/// distinct packed entries gives the number of unique fragments.
pub fn num_unique_fragments(&self) -> usize {
    self.off_len
        .iter()
        .collect::<std::collections::HashSet<_>>()
        .len()
}
}
impl Default for SimpleZipBlobStore {
fn default() -> Self {
Self {
strpool: Vec::new(),
off_len: Vec::new(),
len_bits: 0,
records: UintVecMin0::new_empty(),
num_records: 0,
unzip_size: 0,
stats: BlobStoreStats::default(),
}
}
}
impl BlobStore for SimpleZipBlobStore {
    /// Reassembles and returns the record stored under `id`.
    ///
    /// # Errors
    ///
    /// Returns a not-found error when `id` is out of range.
    fn get(&self, id: RecordId) -> Result<Vec<u8>> {
        let mut data = Vec::new();
        self.get_record_append_imp(id as usize, &mut data)?;
        Ok(data)
    }

    /// Always fails: the store cannot be modified after it is built.
    fn put(&mut self, _data: &[u8]) -> Result<RecordId> {
        Err(ZiporaError::not_supported(
            "SimpleZipBlobStore is read-only after build",
        ))
    }

    /// Always fails: the store cannot be modified after it is built.
    fn remove(&mut self, _id: RecordId) -> Result<()> {
        Err(ZiporaError::not_supported(
            "SimpleZipBlobStore is read-only",
        ))
    }

    /// Returns `true` when `id` refers to a stored record.
    fn contains(&self, id: RecordId) -> bool {
        (id as usize) < self.num_records
    }

    /// Returns the length in bytes of record `id`, or `None` when `id` is
    /// out of range.
    ///
    /// The length is the sum of the fragment lengths packed in the record's
    /// `off_len` entries, so no record bytes are copied.
    fn size(&self, id: RecordId) -> Result<Option<usize>> {
        let rec_id = id as usize;
        if rec_id >= self.num_records {
            return Ok(None);
        }
        let beg = self.records.get(rec_id);
        let end = self.records.get(rec_id + 1);
        // The low `len_bits` bits of each packed entry hold the fragment length.
        let len_mask = (1u64 << self.len_bits) - 1;
        let total: usize = self.off_len[beg..end]
            .iter()
            .map(|&packed| (packed & len_mask) as usize)
            .sum();
        Ok(Some(total))
    }

    /// Returns the number of records in the store.
    fn len(&self) -> usize {
        self.num_records
    }

    /// Returns a copy of the statistics gathered at build time.
    fn stats(&self) -> BlobStoreStats {
        self.stats.clone()
    }
}
impl BatchBlobStore for SimpleZipBlobStore {
    /// Always fails: the store cannot be modified after it is built.
    fn put_batch<I>(&mut self, _blobs: I) -> Result<Vec<RecordId>>
    where
        I: IntoIterator<Item = Vec<u8>>,
    {
        let reason = "SimpleZipBlobStore is read-only";
        Err(ZiporaError::not_supported(reason))
    }

    /// Looks up each id in order, yielding `Some(bytes)` for ids present in
    /// the store and `None` for ids that are not.
    ///
    /// # Errors
    ///
    /// Stops at and returns the first error produced while reading a record.
    fn get_batch<I>(&self, ids: I) -> Result<Vec<Option<Vec<u8>>>>
    where
        I: IntoIterator<Item = RecordId>,
    {
        let mut found = Vec::new();
        for id in ids {
            let blob = if self.contains(id) {
                Some(self.get(id)?)
            } else {
                None
            };
            found.push(blob);
        }
        Ok(found)
    }

    /// Always fails: the store cannot be modified after it is built.
    fn remove_batch<I>(&mut self, _ids: I) -> Result<usize>
    where
        I: IntoIterator<Item = RecordId>,
    {
        let reason = "SimpleZipBlobStore is read-only";
        Err(ZiporaError::not_supported(reason))
    }
}
impl IterableBlobStore for SimpleZipBlobStore {
    type IdIter = std::ops::Range<RecordId>;

    /// Yields every record id in ascending order, from `0` up to (but not
    /// including) the number of records.
    fn iter_ids(&self) -> Self::IdIter {
        let end = self.num_records as RecordId;
        std::ops::Range { start: 0, end }
    }
}
/// Byte-level breakdown of a store's memory footprint.
#[derive(Debug, Clone)]
pub struct MemoryStats {
    /// Bytes used by the deduplicated fragment pool.
    pub strpool_size: usize,
    /// Bytes used by the packed offset/length table.
    pub off_len_size: usize,
    /// Bytes used by the record boundary index.
    pub records_size: usize,
    /// Sum of the three sizes above.
    pub total_size: usize,
    /// Combined length of the original, unfragmented records.
    pub uncompressed_size: usize,
    /// `total_size / uncompressed_size`; values below `1.0` mean the store
    /// is smaller than the original data.
    pub compression_ratio: f64,
}

impl MemoryStats {
    /// Percentage of the original size that was saved; negative when the
    /// store is larger than the original data.
    pub fn space_saved_percent(&self) -> f64 {
        let saved_fraction = 1.0 - self.compression_ratio;
        saved_fraction * 100.0
    }

    /// Size of the index structures expressed as a percentage of the
    /// fragment pool size; `0.0` when the pool is empty.
    pub fn metadata_overhead_percent(&self) -> f64 {
        if self.strpool_size == 0 {
            return 0.0;
        }
        let metadata_bytes = self.off_len_size + self.records_size;
        let fraction = metadata_bytes as f64 / self.strpool_size as f64;
        fraction * 100.0
    }
}
// Unit tests for the configuration builder and the fragment-deduplicating store.
#[cfg(test)]
mod tests {
use super::*;
// The default configuration carries the documented limits and validates cleanly.
#[test]
fn test_config_default() {
let config = SimpleZipConfig::default();
assert_eq!(config.min_frag_len, 8);
assert_eq!(config.max_frag_len, 256);
assert_eq!(config.delimiters, vec![b'\n', b'\r', b'\t', b' ']);
assert!(config.validate().is_ok());
}
// Every builder setter overrides its field and `build` accepts a valid result.
#[test]
fn test_config_builder() {
let config = SimpleZipConfig::builder()
.min_frag_len(16)
.max_frag_len(512)
.delimiters(vec![b',', b';'])
.build()
.unwrap();
assert_eq!(config.min_frag_len, 16);
assert_eq!(config.max_frag_len, 512);
assert_eq!(config.delimiters, vec![b',', b';']);
}
// `build` rejects a zero minimum and a maximum below the minimum.
#[test]
fn test_config_validation() {
let result = SimpleZipConfig::builder()
.min_frag_len(0)
.build();
assert!(result.is_err());
let result = SimpleZipConfig::builder()
.min_frag_len(100)
.max_frag_len(50)
.build();
assert!(result.is_err());
}
// Building from no records yields an empty store.
#[test]
fn test_empty_data() {
let data: Vec<Vec<u8>> = vec![];
let config = SimpleZipConfig::default();
let store = SimpleZipBlobStore::build_from(&data, &config).unwrap();
assert_eq!(store.len(), 0);
assert!(store.is_empty());
assert_eq!(store.unzip_size, 0);
}
// A single record round-trips and only id 0 is reported as present.
#[test]
fn test_single_record() {
let data = vec![b"Hello World\n".to_vec()];
let config = SimpleZipConfig::default();
let store = SimpleZipBlobStore::build_from(&data, &config).unwrap();
assert_eq!(store.len(), 1);
assert_eq!(store.get(0).unwrap(), b"Hello World\n");
assert!(store.contains(0));
assert!(!store.contains(1));
}
// Records with overlapping words round-trip; the sizes are printed for inspection only.
#[test]
fn test_fragment_deduplication() {
let data = vec![
b"Hello World\n".to_vec(),
b"Hello Rust\n".to_vec(),
b"World Peace\n".to_vec(),
];
let config = SimpleZipConfig::default();
let store = SimpleZipBlobStore::build_from(&data, &config).unwrap();
assert_eq!(store.get(0).unwrap(), b"Hello World\n");
assert_eq!(store.get(1).unwrap(), b"Hello Rust\n");
assert_eq!(store.get(2).unwrap(), b"World Peace\n");
let stats = store.memory_stats();
assert!(stats.strpool_size > 0);
println!("Strpool: {} bytes", stats.strpool_size);
println!("Uncompressed: {} bytes", store.unzip_size);
println!("Compression ratio: {:.2}%", stats.compression_ratio * 100.0);
}
// Splitting on '\n' with a 1-byte minimum lets the shared "line1\n" fragment be stored once.
#[test]
fn test_delimiter_fragmentation() {
let data = vec![
b"line1\nline2\nline3\n".to_vec(),
b"line1\nline4\nline5\n".to_vec(),
];
let config = SimpleZipConfig {
min_frag_len: 1,
max_frag_len: 20,
delimiters: vec![b'\n'],
};
let store = SimpleZipBlobStore::build_from(&data, &config).unwrap();
assert_eq!(store.get(0).unwrap(), b"line1\nline2\nline3\n");
assert_eq!(store.get(1).unwrap(), b"line1\nline4\nline5\n");
let stats = store.memory_stats();
assert!(stats.strpool_size < store.unzip_size, "Fragment deduplication should reduce strpool size");
}
// Records without any delimiter are cut at max_frag_len and still round-trip.
#[test]
fn test_no_delimiters_found() {
let data = vec![
b"AAAAAAAAAABBBBBBBBBBCCCCCCCCCC".to_vec(),
b"DDDDDDDDDDEEEEEEEEEEFFFFFFFFFF".to_vec(),
];
let config = SimpleZipConfig {
min_frag_len: 8,
max_frag_len: 10,
delimiters: vec![b'\n'],
};
let store = SimpleZipBlobStore::build_from(&data, &config).unwrap();
assert_eq!(store.get(0).unwrap(), b"AAAAAAAAAABBBBBBBBBBCCCCCCCCCC");
assert_eq!(store.get(1).unwrap(), b"DDDDDDDDDDEEEEEEEEEEFFFFFFFFFF");
}
// A timestamp prefix shared by 100 records is stored once, more than halving the pool size.
#[test]
fn test_shared_substrings() {
let timestamp = b"2024-01-15 12:34:56 ";
let mut data = Vec::new();
for i in 0..100 {
let mut record = timestamp.to_vec();
record.extend_from_slice(format!("Event {}\n", i).as_bytes());
data.push(record);
}
let config = SimpleZipConfig::default();
let store = SimpleZipBlobStore::build_from(&data, &config).unwrap();
for i in 0..100 {
let expected = format!("2024-01-15 12:34:56 Event {}\n", i);
assert_eq!(store.get(i).unwrap(), expected.as_bytes());
}
let stats = store.memory_stats();
assert!(stats.strpool_size < store.unzip_size / 2, "Timestamp deduplication should save space");
println!("Space saved: {:.2}%", stats.space_saved_percent());
}
// `size` reports each record's byte length and `None` for an unknown id.
#[test]
fn test_size_method() {
let data = vec![
b"short".to_vec(),
b"medium length".to_vec(),
b"a much longer string with more content".to_vec(),
];
let config = SimpleZipConfig::default();
let store = SimpleZipBlobStore::build_from(&data, &config).unwrap();
assert_eq!(store.size(0).unwrap(), Some(5));
assert_eq!(store.size(1).unwrap(), Some(13));
assert_eq!(store.size(2).unwrap(), Some(b"a much longer string with more content".len()));
assert_eq!(store.size(999).unwrap(), None);
}
// `get_batch` returns records in request order and `None` for a missing id.
#[test]
fn test_batch_operations() {
let data = vec![
b"record0".to_vec(),
b"record1".to_vec(),
b"record2".to_vec(),
];
let config = SimpleZipConfig::default();
let store = SimpleZipBlobStore::build_from(&data, &config).unwrap();
let results = store.get_batch(vec![0, 2, 999]).unwrap();
assert_eq!(results[0], Some(b"record0".to_vec()));
assert_eq!(results[1], Some(b"record2".to_vec()));
assert_eq!(results[2], None);
}
// Id and blob iteration visit all records in insertion order.
#[test]
fn test_iteration() {
let data = vec![
b"A".to_vec(),
b"B".to_vec(),
b"C".to_vec(),
];
let config = SimpleZipConfig::default();
let store = SimpleZipBlobStore::build_from(&data, &config).unwrap();
let ids: Vec<_> = store.iter_ids().collect();
assert_eq!(ids, vec![0, 1, 2]);
let blobs: Vec<_> = store.iter_blobs().collect::<Result<Vec<_>>>().unwrap();
assert_eq!(blobs.len(), 3);
assert_eq!(blobs[0].1, b"A");
assert_eq!(blobs[1].1, b"B");
assert_eq!(blobs[2].1, b"C");
}
// Every mutating operation is rejected because the store is read-only.
#[test]
fn test_read_only_operations() {
let data = vec![b"test".to_vec()];
let config = SimpleZipConfig::default();
let mut store = SimpleZipBlobStore::build_from(&data, &config).unwrap();
assert!(store.put(b"new").is_err());
assert!(store.remove(0).is_err());
assert!(store.put_batch(vec![b"a".to_vec()]).is_err());
assert!(store.remove_batch(vec![0]).is_err());
}
// `get` on an out-of-range id returns an error.
#[test]
fn test_record_not_found() {
let data = vec![b"test".to_vec()];
let config = SimpleZipConfig::default();
let store = SimpleZipBlobStore::build_from(&data, &config).unwrap();
assert!(store.get(999).is_err());
}
// The memory breakdown is non-zero and its parts add up to the reported total.
#[test]
fn test_memory_stats() {
let data = vec![
b"Hello World\n".to_vec(),
b"Hello Rust\n".to_vec(),
];
let config = SimpleZipConfig::default();
let store = SimpleZipBlobStore::build_from(&data, &config).unwrap();
let stats = store.memory_stats();
assert!(stats.strpool_size > 0);
assert!(stats.off_len_size > 0);
assert!(stats.records_size > 0);
assert_eq!(stats.total_size, stats.strpool_size + stats.off_len_size + stats.records_size);
assert!(stats.compression_ratio > 0.0);
println!("Strpool: {} bytes", stats.strpool_size);
println!("Off-len: {} bytes", stats.off_len_size);
println!("Records: {} bytes", stats.records_size);
println!("Total: {} bytes", stats.total_size);
println!("Uncompressed: {} bytes", stats.uncompressed_size);
println!("Ratio: {:.2}%", stats.compression_ratio * 100.0);
println!("Metadata overhead: {:.2}%", stats.metadata_overhead_percent());
}
// 1000 records sharing a common tail round-trip and the pool is under half the raw size.
#[test]
fn test_large_dataset() {
let mut data = Vec::new();
for i in 0..1000 {
let record = format!("Record {}: Some common prefix data\n", i);
data.push(record.into_bytes());
}
let config = SimpleZipConfig::default();
let store = SimpleZipBlobStore::build_from(&data, &config).unwrap();
assert_eq!(store.get(0).unwrap(), b"Record 0: Some common prefix data\n");
assert_eq!(store.get(500).unwrap(), b"Record 500: Some common prefix data\n");
assert_eq!(store.get(999).unwrap(), b"Record 999: Some common prefix data\n");
let stats = store.memory_stats();
println!("Large dataset compression: {:.2}%", stats.compression_ratio * 100.0);
assert!(stats.strpool_size < store.unzip_size / 2, "Common prefix should be deduplicated");
}
// A custom ',' delimiter splits CSV-like records so repeated fields are shared.
#[test]
fn test_various_delimiters() {
let data = vec![
b"a,b,c,d".to_vec(),
b"a,e,f,g".to_vec(),
b"h,b,i,d".to_vec(),
];
let config = SimpleZipConfig {
min_frag_len: 1,
max_frag_len: 10,
delimiters: vec![b','],
};
let store = SimpleZipBlobStore::build_from(&data, &config).unwrap();
assert_eq!(store.get(0).unwrap(), b"a,b,c,d");
assert_eq!(store.get(1).unwrap(), b"a,e,f,g");
assert_eq!(store.get(2).unwrap(), b"h,b,i,d");
let stats = store.memory_stats();
assert!(stats.strpool_size < store.unzip_size, "Fragment deduplication should work");
}
}