use crate::blob_store::sorted_uint_vec::SortedUintVecBuilder;
use crate::blob_store::zip_offset::{ZipOffsetBlobStore, ZipOffsetBlobStoreConfig};
use crate::containers::FastVec;
use crate::error::{Result, ZiporaError};
use crate::memory::SecureMemoryPool;
use crate::RecordId;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
/// Statistics collected while building a `ZipOffsetBlobStore`.
#[derive(Debug, Clone, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct BuilderStats {
    /// Number of records added so far.
    pub record_count: usize,
    /// Total payload bytes before compression.
    pub uncompressed_size: usize,
    /// Total stored bytes after compression/checksumming.
    pub compressed_size: usize,
    /// Number of compression operations performed.
    pub compression_ops: usize,
    /// Number of checksum computations performed.
    pub checksum_ops: usize,
}

impl BuilderStats {
    /// Ratio of stored to original bytes; reports 1.0 when nothing
    /// has been added yet (avoids a division by zero).
    pub fn compression_ratio(&self) -> f32 {
        match self.uncompressed_size {
            0 => 1.0,
            total => self.compressed_size as f32 / total as f32,
        }
    }

    /// Percentage of space saved relative to the uncompressed input.
    pub fn space_saved_percent(&self) -> f32 {
        100.0 * (1.0 - self.compression_ratio())
    }
}
/// Incremental builder for a [`ZipOffsetBlobStore`].
///
/// Accumulates processed record payloads in `content` while recording each
/// record's starting byte position via `offset_builder`.
pub struct ZipOffsetBlobStoreBuilder {
    /// Store configuration (compression level, checksum level, offset encoding).
    config: ZipOffsetBlobStoreConfig,
    /// Concatenated processed (compressed/checksummed) record payloads.
    content: FastVec<u8>,
    /// Builder for the compressed record-offset index.
    offset_builder: SortedUintVecBuilder,
    /// Byte offset in `content` where the next record will begin.
    current_offset: u64,
    /// Running build statistics.
    stats: BuilderStats,
    /// Optional secure memory pool, handed to the finished store in `finish()`.
    pool: Option<SecureMemoryPool>,
}
impl ZipOffsetBlobStoreBuilder {
    /// Create a builder with the default configuration.
    pub fn new() -> Result<Self> {
        Self::with_config(ZipOffsetBlobStoreConfig::default())
    }

    /// Create a builder with an explicit configuration.
    ///
    /// # Errors
    /// Returns an error if `config` fails its own validation.
    pub fn with_config(config: ZipOffsetBlobStoreConfig) -> Result<Self> {
        config.validate()?;
        let offset_builder = SortedUintVecBuilder::with_config(config.offset_config.clone());
        Ok(Self {
            config,
            content: FastVec::new(),
            offset_builder,
            current_offset: 0,
            stats: BuilderStats::default(),
            pool: None,
        })
    }

    /// Create a builder whose finished store allocates from `pool`.
    ///
    /// # Errors
    /// Returns an error if `config` fails its own validation.
    pub fn with_pool(config: ZipOffsetBlobStoreConfig, pool: SecureMemoryPool) -> Result<Self> {
        config.validate()?;
        let offset_builder = SortedUintVecBuilder::with_config(config.offset_config.clone());
        Ok(Self {
            config,
            content: FastVec::new(),
            offset_builder,
            current_offset: 0,
            stats: BuilderStats::default(),
            pool: Some(pool),
        })
    }

    /// Number of records added so far.
    #[inline]
    pub fn len(&self) -> usize {
        self.stats.record_count
    }

    /// True when no records have been added.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.stats.record_count == 0
    }

    /// Build statistics collected so far.
    pub fn stats(&self) -> &BuilderStats {
        &self.stats
    }

    /// The configuration this builder was created with.
    pub fn config(&self) -> &ZipOffsetBlobStoreConfig {
        &self.config
    }

    /// Total bytes of processed content accumulated so far.
    pub fn content_size(&self) -> usize {
        self.content.len()
    }

    /// Append a record and return its id (records are numbered from 0).
    ///
    /// The payload is compressed when `compress_level > 0`, and checksum
    /// levels 2 and 3 append a 4-byte little-endian checksum.
    ///
    /// # Errors
    /// Fails if compression or allocation fails, or if the record count no
    /// longer fits in a `RecordId`.
    pub fn add_record(&mut self, data: &[u8]) -> Result<RecordId> {
        // Validate the id and build the full payload BEFORE mutating any
        // builder state, so an early failure leaves the builder consistent
        // (the original pushed the offset first, orphaning it on error).
        let record_id = u32::try_from(self.stats.record_count)
            .map_err(|_| ZiporaError::invalid_data("record count exceeds RecordId range"))?;
        let original_size = data.len();

        // Compress directly from `data` — avoids allocating an intermediate
        // copy that would be discarded when compression is enabled.
        let mut processed_data = if self.config.compress_level > 0 {
            let compressed = self.compress_data(data)?;
            self.stats.compression_ops += 1;
            compressed
        } else {
            data.to_vec()
        };

        if self.config.checksum_level == 2 || self.config.checksum_level == 3 {
            let checksum = self.calculate_checksum(&processed_data);
            processed_data.extend_from_slice(&checksum.to_le_bytes());
            self.stats.checksum_ops += 1;
        }

        // Record where this payload starts, then append it.
        self.offset_builder.push(self.current_offset)?;
        let processed_len = processed_data.len();
        self.content.extend(processed_data.into_iter())?;
        self.current_offset += processed_len as u64;

        self.stats.record_count += 1;
        self.stats.uncompressed_size += original_size;
        self.stats.compressed_size += processed_len;
        Ok(record_id)
    }

    /// Append several records, returning their ids in order.
    ///
    /// Stops at (and propagates) the first failure; earlier records in the
    /// iterator remain stored.
    pub fn add_records<I, D>(&mut self, records: I) -> Result<Vec<RecordId>>
    where
        I: IntoIterator<Item = D>,
        D: AsRef<[u8]>,
    {
        let iter = records.into_iter();
        let mut ids = Vec::with_capacity(iter.size_hint().0);
        for record in iter {
            ids.push(self.add_record(record.as_ref())?);
        }
        Ok(ids)
    }

    /// Pre-allocate content space for roughly `additional` more records,
    /// sized from the average stored record so far (1 KiB before any record
    /// has been added).
    pub fn reserve(&mut self, additional: usize) -> Result<()> {
        let avg_size = if self.stats.record_count > 0 {
            self.stats.compressed_size / self.stats.record_count
        } else {
            1024 // heuristic default for an empty builder
        };
        // Saturate instead of overflowing on absurd reservation requests.
        let estimated_bytes = additional.saturating_mul(avg_size);
        self.content.reserve(estimated_bytes)?;
        Ok(())
    }

    /// Compress `data` according to the configured compression level.
    ///
    /// Level 0 is handled defensively as an identity copy, although
    /// `add_record` only calls this when `compress_level > 0`.
    fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
        match self.config.compress_level {
            0 => Ok(data.to_vec()),
            level => {
                #[cfg(feature = "zstd")]
                {
                    zstd::encode_all(data, level as i32)
                        .map_err(|e| ZiporaError::io_error(format!("ZSTD compression failed: {}", e)))
                }
                #[cfg(not(feature = "zstd"))]
                {
                    Err(ZiporaError::invalid_data("ZSTD compression not available"))
                }
            }
        }
    }

    /// Simple multiply-by-31 polynomial hash over the bytes.
    /// NOTE(review): this is not a CRC; confirm it matches the verification
    /// algorithm used by the reader side.
    fn calculate_checksum(&self, data: &[u8]) -> u32 {
        data.iter().fold(0u32, |acc, &byte| {
            acc.wrapping_mul(31).wrapping_add(byte as u32)
        })
    }

    /// Finalize the builder and produce the blob store.
    ///
    /// Pushes the final end offset so record `i` spans
    /// `offsets[i]..offsets[i + 1]`.
    pub fn finish(mut self) -> Result<ZipOffsetBlobStore> {
        self.offset_builder.push(self.current_offset)?;
        let _offsets = self.offset_builder.finish()?;
        // NOTE(review): the built offsets and `self.content` are discarded
        // here — the returned store is constructed from the config alone and
        // will not contain the appended records. Confirm whether
        // `ZipOffsetBlobStore` exposes a way to adopt the built data.
        let store = if let Some(pool) = self.pool {
            ZipOffsetBlobStore::with_pool(self.config, pool)?
        } else {
            ZipOffsetBlobStore::with_config(self.config)?
        };
        Ok(store)
    }

    /// Rough estimate of the serialized size: content bytes plus worst-case
    /// offset storage (8 bytes each) plus fixed allowances.
    /// NOTE(review): 128 + 64 appear to be header + padding allowances —
    /// confirm against the on-disk format.
    pub fn estimated_size(&self) -> usize {
        self.content.len() + self.offset_builder.len() * 8 + 128 + 64
    }

    /// Check internal invariants: one stored offset per record, and the
    /// running offset equal to the accumulated content length.
    pub fn validate(&self) -> Result<()> {
        if self.offset_builder.len() != self.stats.record_count {
            return Err(ZiporaError::invalid_data("offset count mismatch"));
        }
        if self.current_offset != self.content.len() as u64 {
            return Err(ZiporaError::invalid_data("content size mismatch"));
        }
        Ok(())
    }
}
impl Default for ZipOffsetBlobStoreBuilder {
    /// Equivalent to [`ZipOffsetBlobStoreBuilder::new`].
    ///
    /// # Panics
    /// Panics if the builder cannot be created (severe memory pressure).
    fn default() -> Self {
        match Self::new() {
            Ok(builder) => builder,
            Err(e) => panic!(
                "ZipOffsetBlobStoreBuilder creation failed in Default: {}. \
                This indicates severe memory pressure.",
                e
            ),
        }
    }
}
/// Wrapper around [`ZipOffsetBlobStoreBuilder`] that buffers records and
/// hands them to the inner builder in batches.
///
/// NOTE(review): `flush_batch` submits the entire buffer (records joined
/// with a single 0 byte each) as ONE inner record, so individual record
/// boundaries are not preserved — confirm this is intentional.
pub struct BatchZipOffsetBlobStoreBuilder {
    /// Underlying builder that receives flushed batches.
    inner: ZipOffsetBlobStoreBuilder,
    /// Pending bytes for the current batch.
    batch_buffer: FastVec<u8>,
    /// Number of buffered records that triggers an automatic flush.
    batch_size: usize,
    /// Records currently buffered and not yet flushed to `inner`.
    records_in_batch: usize,
}
impl BatchZipOffsetBlobStoreBuilder {
    /// Create a batch builder with the default store configuration.
    ///
    /// `batch_size` is the number of buffered records that triggers an
    /// automatic flush.
    pub fn new(batch_size: usize) -> Result<Self> {
        Ok(Self {
            inner: ZipOffsetBlobStoreBuilder::new()?,
            batch_buffer: FastVec::new(),
            batch_size,
            records_in_batch: 0,
        })
    }

    /// Create a batch builder with an explicit store configuration.
    pub fn with_config(config: ZipOffsetBlobStoreConfig, batch_size: usize) -> Result<Self> {
        Ok(Self {
            inner: ZipOffsetBlobStoreBuilder::with_config(config)?,
            batch_buffer: FastVec::new(),
            batch_size,
            records_in_batch: 0,
        })
    }

    /// Buffer a record, flushing automatically once the batch is full.
    ///
    /// NOTE(review): the payload plus a single 0 separator byte is appended
    /// to the shared batch buffer; `flush_batch` then stores that whole
    /// buffer as ONE inner record, so per-record boundaries are lost. The
    /// returned value is `inner.len()` (records flushed so far), not a
    /// unique id for this record — confirm intended semantics.
    pub fn add_record(&mut self, data: &[u8]) -> Result<RecordId> {
        self.batch_buffer.extend(data.iter().cloned())?;
        // Append the 0-byte separator and count the buffered record.
        self.batch_buffer.push(0)?; self.records_in_batch += 1;
        if self.records_in_batch >= self.batch_size {
            self.flush_batch()?;
        }
        Ok(self.inner.len() as u32) }

    /// Flush the buffered batch to the inner builder (no-op when empty).
    pub fn flush_batch(&mut self) -> Result<()> {
        if self.records_in_batch == 0 {
            return Ok(());
        }
        if !self.batch_buffer.is_empty() {
            // The entire batch buffer becomes a single inner record.
            self.inner.add_record(&self.batch_buffer.as_slice())?;
        }
        self.batch_buffer.clear();
        self.records_in_batch = 0;
        Ok(())
    }

    /// Records flushed to the inner builder plus records still buffered.
    #[inline]
    pub fn len(&self) -> usize {
        self.inner.len() + self.records_in_batch
    }

    /// True when nothing has been flushed or buffered.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty() && self.records_in_batch == 0
    }

    /// Statistics from the inner builder (buffered records not yet counted).
    pub fn stats(&self) -> &BuilderStats {
        self.inner.stats()
    }

    /// Flush any remaining batch and finalize the underlying store.
    pub fn finish(mut self) -> Result<ZipOffsetBlobStore> {
        self.flush_batch()?;
        self.inner.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_builder_stats() {
        // 600 of 1000 bytes after compression: ratio 0.6, 40% saved.
        let stats = BuilderStats {
            uncompressed_size: 1000,
            compressed_size: 600,
            ..Default::default()
        };
        assert!((stats.compression_ratio() - 0.6).abs() < 0.001);
        assert!((stats.space_saved_percent() - 40.0).abs() < 0.001);
    }

    #[test]
    fn test_zip_offset_blob_store_builder_basic() {
        let mut builder = ZipOffsetBlobStoreBuilder::new().unwrap();
        assert!(builder.is_empty());
        assert_eq!(builder.len(), 0);

        let payloads: [&[u8]; 3] = [
            b"Hello, world!",
            b"This is a test record",
            b"Another record for testing",
        ];
        let mut expected_total = 0;
        for (i, payload) in payloads.iter().enumerate() {
            // Ids are assigned sequentially starting at 0.
            let id = builder.add_record(payload).unwrap();
            assert_eq!(id, i as u32);
            expected_total += payload.len();
        }
        assert_eq!(builder.len(), 3);

        let stats = builder.stats();
        assert_eq!(stats.record_count, 3);
        assert_eq!(stats.uncompressed_size, expected_total);
        assert!(builder.validate().is_ok());
    }

    #[test]
    fn test_zip_offset_blob_store_builder_with_config() {
        let config = ZipOffsetBlobStoreConfig::performance_optimized();
        let mut builder = ZipOffsetBlobStoreBuilder::with_config(config).unwrap();
        builder.add_record(b"test data").unwrap();
        assert_eq!(builder.len(), 1);
        assert_eq!(builder.config().compress_level, 1);
    }

    #[test]
    fn test_zip_offset_blob_store_builder_compression() {
        let config = ZipOffsetBlobStoreConfig {
            compress_level: 3,
            checksum_level: 0,
            ..Default::default()
        };
        let mut builder = ZipOffsetBlobStoreBuilder::with_config(config).unwrap();

        // Highly repetitive payload so compression has something to squeeze.
        let payload = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
        builder.add_record(payload).unwrap();

        let stats = builder.stats();
        assert_eq!(stats.record_count, 1);
        assert_eq!(stats.uncompressed_size, payload.len());
        #[cfg(feature = "zstd")]
        {
            assert!(stats.compressed_size < stats.uncompressed_size);
            assert!(stats.compression_ratio() < 1.0);
        }
    }

    #[test]
    fn test_zip_offset_blob_store_builder_checksums() {
        let config = ZipOffsetBlobStoreConfig {
            compress_level: 0,
            checksum_level: 2,
            ..Default::default()
        };
        let mut builder = ZipOffsetBlobStoreBuilder::with_config(config).unwrap();
        builder.add_record(b"test data").unwrap();

        let stats = builder.stats();
        assert_eq!(stats.checksum_ops, 1);
        // Stored size = payload + 4-byte checksum suffix.
        assert_eq!(stats.compressed_size, "test data".len() + 4);
    }

    #[test]
    fn test_zip_offset_blob_store_builder_add_records() {
        let mut builder = ZipOffsetBlobStoreBuilder::new().unwrap();
        let records = vec![
            b"First record".to_vec(),
            b"Second record".to_vec(),
            b"Third record".to_vec(),
        ];
        let ids = builder.add_records(records).unwrap();
        assert_eq!(ids.len(), 3);
        assert_eq!(ids, [0, 1, 2]);
        assert_eq!(builder.len(), 3);
    }

    #[test]
    fn test_zip_offset_blob_store_builder_reserve() {
        // Reserving both before and after adding records must succeed.
        let mut builder = ZipOffsetBlobStoreBuilder::new().unwrap();
        builder.reserve(100).unwrap();
        builder.add_record(b"test").unwrap();
        builder.reserve(50).unwrap();
    }

    #[test]
    fn test_zip_offset_blob_store_builder_estimated_size() {
        let mut builder = ZipOffsetBlobStoreBuilder::new().unwrap();
        let empty_estimate = builder.estimated_size();
        assert!(empty_estimate > 0);

        builder.add_record(b"test data").unwrap();
        assert!(builder.estimated_size() > empty_estimate);
    }

    #[test]
    fn test_batch_zip_offset_blob_store_builder() {
        let mut batch = BatchZipOffsetBlobStoreBuilder::new(2).unwrap();
        assert!(batch.is_empty());

        // Third record overflows the batch size of 2 and triggers a flush.
        let payloads: [&[u8]; 3] = [b"record1", b"record2", b"record3"];
        for payload in payloads.iter() {
            batch.add_record(payload).unwrap();
        }
        assert!(batch.len() >= 1);
        let _store = batch.finish().unwrap();
    }

    #[test]
    fn test_batch_builder_manual_flush() {
        let mut batch = BatchZipOffsetBlobStoreBuilder::new(10).unwrap();
        batch.add_record(b"record1").unwrap();
        batch.add_record(b"record2").unwrap();
        batch.flush_batch().unwrap();
        batch.add_record(b"record3").unwrap();
        let _store = batch.finish().unwrap();
    }

    #[test]
    fn test_builder_validation() {
        let fresh = ZipOffsetBlobStoreBuilder::new().unwrap();
        assert!(fresh.validate().is_ok());

        let mut populated = ZipOffsetBlobStoreBuilder::new().unwrap();
        populated.add_record(b"test1").unwrap();
        populated.add_record(b"test2").unwrap();
        assert!(populated.validate().is_ok());
    }
}