use crate::blob_store::traits::{
BatchBlobStore, BlobStore, BlobStoreStats, CompressedBlobStore, CompressionStats,
};
use crate::compression::dict_zip::{
DictionaryBuilder, DictionaryBuilderConfig, PaZipCompressor, PaZipCompressorConfig,
SuffixArrayDictionary, CompressionStats as PaZipCompressionStats,
};
use crate::containers::LruMap;
use crate::entropy::huffman::{ContextualHuffmanEncoder, ContextualHuffmanDecoder};
use crate::entropy::fse::{FseEncoder, FseDecoder, FseConfig};
use crate::error::{Result, ZiporaError};
use crate::memory::{SecureMemoryPool, SecurePoolConfig};
use crate::RecordId;
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::cell::RefCell;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
/// Optional entropy-coding stage applied on top of PA-Zip dictionary
/// compression before a blob is stored.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum EntropyAlgorithm {
    /// Store the dictionary-compressed bytes without a second coding pass.
    None,
    /// Order-1 contextual Huffman coding (see `ContextualHuffmanEncoder`).
    HuffmanO1,
    /// Finite State Entropy coding (see `FseEncoder`/`FseDecoder`).
    Fse,
}
impl Default for EntropyAlgorithm {
fn default() -> Self {
Self::None
}
}
/// Configuration for building and operating a [`DictZipBlobStore`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct DictZipConfig {
    /// Settings for suffix-array dictionary construction.
    pub dict_builder_config: DictionaryBuilderConfig,
    /// Settings for the PA-Zip compressor that uses the dictionary.
    pub compressor_config: PaZipCompressorConfig,
    /// Budget for the decompressed-blob LRU cache, in bytes (capped at 1 GiB by `validate`).
    pub cache_size_bytes: usize,
    /// When true, the dictionary is persisted at `dict_path` (requires the `serde` feature).
    pub external_dictionary: bool,
    /// Filesystem path used with `external_dictionary`.
    pub dict_path: Option<String>,
    /// Optional secure memory pool configuration; a default pool is created when `None`.
    pub memory_pool_config: Option<SecurePoolConfig>,
    /// Whether to record detailed statistics.
    pub track_stats: bool,
    /// Whether to validate the dictionary after building.
    pub validate_dictionary: bool,
    /// Blobs smaller than this are stored uncompressed.
    pub min_compression_size: usize,
    /// Entropy-coding stage applied after dictionary compression.
    pub entropy_algorithm: EntropyAlgorithm,
    /// Interleaving factor for entropy coding; must be 0, 1, 2, 4, or 8.
    pub entropy_interleaved: u8,
    // NOTE(review): the three flags below are carried through configuration but
    // are not read anywhere in this file — presumably consumed elsewhere; verify.
    pub enable_lake: bool,
    pub embedded_dict: bool,
    pub input_is_perm: bool,
    /// Entropy coding is kept only if (entropy size / input size) <= this ratio (0.0..=1.0).
    pub entropy_zip_ratio_require: f32,
}
impl Default for DictZipConfig {
fn default() -> Self {
Self {
dict_builder_config: DictionaryBuilderConfig::default(),
compressor_config: PaZipCompressorConfig::default(),
cache_size_bytes: 16 * 1024 * 1024, external_dictionary: false,
dict_path: None,
memory_pool_config: None,
track_stats: true,
validate_dictionary: true,
min_compression_size: 64,
entropy_algorithm: EntropyAlgorithm::None,
entropy_interleaved: 0, enable_lake: false,
embedded_dict: false,
input_is_perm: false,
entropy_zip_ratio_require: 0.8, }
}
}
impl DictZipConfig {
    /// Preset tuned for natural-language text: large dictionary, deeper BFS,
    /// high sample ratio, low compression threshold.
    ///
    /// (Fix: the presets previously bound `let mut config = …; config` with an
    /// unused `mut`; they now return the value directly.)
    pub fn text_compression() -> Self {
        Self {
            dict_builder_config: DictionaryBuilderConfig {
                sample_sort_policy: crate::compression::dict_zip::SampleSortPolicy::SortBoth,
                target_dict_size: 32 * 1024 * 1024,
                max_dict_size: 40 * 1024 * 1024,
                min_frequency: 3,
                max_bfs_depth: 6,
                min_pattern_length: 4,
                max_pattern_length: 128,
                sample_ratio: 0.8,
                validate_result: true,
                ..Default::default()
            },
            compressor_config: PaZipCompressorConfig::default(),
            cache_size_bytes: 32 * 1024 * 1024,
            min_compression_size: 32,
            ..Default::default()
        }
    }

    /// Preset tuned for binary data: smaller dictionary, longer minimum
    /// patterns, higher frequency threshold.
    pub fn binary_compression() -> Self {
        Self {
            dict_builder_config: DictionaryBuilderConfig {
                sample_sort_policy: crate::compression::dict_zip::SampleSortPolicy::SortRight,
                target_dict_size: 16 * 1024 * 1024,
                max_dict_size: 20 * 1024 * 1024,
                min_frequency: 8,
                max_bfs_depth: 4,
                min_pattern_length: 8,
                max_pattern_length: 64,
                sample_ratio: 0.5,
                validate_result: true,
                ..Default::default()
            },
            compressor_config: PaZipCompressorConfig::default(),
            cache_size_bytes: 16 * 1024 * 1024,
            min_compression_size: 128,
            ..Default::default()
        }
    }

    /// Preset tuned for log data: very large dictionary and long patterns to
    /// exploit highly repetitive content.
    pub fn log_compression() -> Self {
        Self {
            dict_builder_config: DictionaryBuilderConfig {
                sample_sort_policy: crate::compression::dict_zip::SampleSortPolicy::SortLeft,
                target_dict_size: 64 * 1024 * 1024,
                max_dict_size: 80 * 1024 * 1024,
                min_frequency: 2,
                max_bfs_depth: 8,
                min_pattern_length: 10,
                max_pattern_length: 256,
                sample_ratio: 0.3,
                validate_result: true,
                ..Default::default()
            },
            compressor_config: PaZipCompressorConfig::default(),
            cache_size_bytes: 64 * 1024 * 1024,
            min_compression_size: 16,
            ..Default::default()
        }
    }

    /// Preset tuned for latency-sensitive use: small dictionary, shallow BFS,
    /// post-build validation disabled.
    pub fn realtime_compression() -> Self {
        Self {
            dict_builder_config: DictionaryBuilderConfig {
                target_dict_size: 8 * 1024 * 1024,
                max_dict_size: 10 * 1024 * 1024,
                min_frequency: 10,
                max_bfs_depth: 3,
                min_pattern_length: 6,
                max_pattern_length: 32,
                sample_ratio: 0.2,
                validate_result: false,
                ..Default::default()
            },
            compressor_config: PaZipCompressorConfig::default(),
            cache_size_bytes: 8 * 1024 * 1024,
            min_compression_size: 256,
            ..Default::default()
        }
    }

    /// Checks internal consistency of the configuration.
    ///
    /// # Errors
    /// Returns `ZiporaError::invalid_data` when sizes are zero/inverted, the
    /// cache budget exceeds 1 GiB, an external dictionary lacks a path, the
    /// interleaving factor is not one of {0, 1, 2, 4, 8}, or the entropy
    /// ratio requirement falls outside `0.0..=1.0`.
    pub fn validate(&self) -> Result<()> {
        if self.dict_builder_config.target_dict_size == 0 {
            return Err(ZiporaError::invalid_data("Target dictionary size must be > 0"));
        }
        if self.dict_builder_config.max_dict_size < self.dict_builder_config.target_dict_size {
            return Err(ZiporaError::invalid_data("Max dictionary size must be >= target size"));
        }
        if self.cache_size_bytes == 0 {
            return Err(ZiporaError::invalid_data("Cache size must be > 0"));
        }
        if self.cache_size_bytes > 1024 * 1024 * 1024 {
            return Err(ZiporaError::invalid_data("Cache size must be <= 1GB"));
        }
        if self.external_dictionary && self.dict_path.is_none() {
            return Err(ZiporaError::invalid_data("External dictionary requires dict_path"));
        }
        if ![0, 1, 2, 4, 8].contains(&self.entropy_interleaved) {
            return Err(ZiporaError::invalid_data(
                "Entropy interleaved must be 0, 1, 2, 4, or 8"
            ));
        }
        // Idiomatic inclusive-range check (also rejects NaN, which fails `contains`).
        if !(0.0..=1.0).contains(&self.entropy_zip_ratio_require) {
            return Err(ZiporaError::invalid_data(
                "Entropy zip ratio requirement must be between 0.0 and 1.0"
            ));
        }
        Ok(())
    }

    /// Enables persisting the dictionary to `path` (builder-style).
    pub fn with_external_dictionary<P: AsRef<Path>>(mut self, path: P) -> Self {
        self.external_dictionary = true;
        self.dict_path = Some(path.as_ref().to_string_lossy().to_string());
        self
    }

    /// Sets the cache budget in mebibytes (builder-style).
    pub fn with_cache_size_mb(mut self, mb: usize) -> Self {
        self.cache_size_bytes = mb * 1024 * 1024;
        self
    }

    /// Sets the minimum blob size eligible for compression (builder-style).
    pub fn with_min_compression_size(mut self, size: usize) -> Self {
        self.min_compression_size = size;
        self
    }
}
/// Aggregated statistics for a [`DictZipBlobStore`].
#[derive(Debug, Clone, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct DictZipBlobStoreStats {
    /// Generic blob-store counters (gets/puts/removes).
    pub blob_stats: BlobStoreStats,
    /// Aggregate compression sizes and ratio across compressed blobs.
    pub compression_stats: CompressionStats,
    /// Statistics reported by the PA-Zip compressor.
    pub pa_zip_stats: PaZipCompressionStats,
    /// Reads served from the LRU cache.
    pub cache_hits: u64,
    /// Reads that missed the cache and required decompression.
    pub cache_misses: u64,
    /// Number of blobs stored in compressed form.
    pub compressed_blobs: usize,
    /// Number of blobs stored verbatim (too small or incompressible).
    pub uncompressed_blobs: usize,
    /// Size of the suffix-array dictionary in bytes.
    pub dictionary_size: usize,
    /// Wall-clock time spent building the dictionary, in milliseconds.
    pub build_time_ms: u64,
}
impl DictZipBlobStoreStats {
    /// Fraction of `get` calls served from cache; 0.0 when no reads were recorded.
    pub fn cache_hit_ratio(&self) -> f64 {
        match self.cache_hits + self.cache_misses {
            0 => 0.0,
            total => self.cache_hits as f64 / total as f64,
        }
    }

    /// Aggregate compression ratio over compressed blobs, or 1.0 when none exist.
    pub fn avg_compression_ratio(&self) -> f32 {
        if self.compressed_blobs == 0 {
            1.0
        } else {
            self.compression_stats.compression_ratio
        }
    }

    /// Rough memory-usage estimate: dictionary + compressed payloads plus a
    /// fixed 8 bytes per recorded cache access.
    #[inline]
    pub fn memory_usage(&self) -> usize {
        let cache_accesses = (self.cache_hits + self.cache_misses) as usize;
        self.dictionary_size + self.compression_stats.compressed_size + cache_accesses * 8
    }
}
/// Accumulates training samples and configuration, then builds a
/// [`DictZipBlobStore`] via [`DictZipBlobStoreBuilder::finish`].
pub struct DictZipBlobStoreBuilder {
    /// Validated configuration the store will be built with.
    config: DictZipConfig,
    /// Raw training samples; concatenated during `finish`.
    training_samples: Vec<Vec<u8>>,
    /// Running total of sample bytes (used to pre-size the combined buffer).
    training_size: usize,
    /// Optional pool created from `config.memory_pool_config`.
    memory_pool: Option<Arc<SecureMemoryPool>>,
    /// Optional progress callback invoked with values in 0.0..=1.0 during `finish`.
    progress_callback: Option<Box<dyn Fn(f64) + Send + Sync>>,
}
impl DictZipBlobStoreBuilder {
    /// Creates a builder with the default configuration.
    pub fn new() -> Result<Self> {
        Self::with_config(DictZipConfig::default())
    }

    /// Creates a builder from a configuration, validating it first and
    /// allocating the secure memory pool when one is configured.
    ///
    /// # Errors
    /// Fails if the configuration is invalid or pool creation fails.
    pub fn with_config(config: DictZipConfig) -> Result<Self> {
        config.validate()?;
        let memory_pool = if let Some(pool_config) = &config.memory_pool_config {
            Some(SecureMemoryPool::new(pool_config.clone())?)
        } else {
            None
        };
        Ok(Self {
            config,
            training_samples: Vec::new(),
            training_size: 0,
            memory_pool,
            progress_callback: None,
        })
    }

    /// Adds one training sample (copied). Empty samples are rejected.
    pub fn add_training_sample(&mut self, data: &[u8]) -> Result<()> {
        if data.is_empty() {
            return Err(ZiporaError::invalid_data("Training sample cannot be empty"));
        }
        self.training_samples.push(data.to_vec());
        self.training_size += data.len();
        Ok(())
    }

    /// Reads an entire file and adds its contents as one training sample.
    pub fn add_training_file<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
        let data = fs::read(path.as_ref())
            .map_err(|e| ZiporaError::io_error(format!("Failed to read training file: {}", e)))?;
        self.add_training_sample(&data)
    }

    /// Adds multiple training samples; stops at the first invalid one.
    pub fn add_training_samples<I>(&mut self, samples: I) -> Result<()>
    where
        I: IntoIterator<Item = Vec<u8>>,
    {
        for sample in samples {
            self.add_training_sample(&sample)?;
        }
        Ok(())
    }

    /// Sets the target dictionary size in MiB; the max size is set 25% higher.
    pub fn set_dict_size_mb(&mut self, mb: usize) -> Result<()> {
        let size_bytes = mb * 1024 * 1024;
        self.config.dict_builder_config.target_dict_size = size_bytes;
        self.config.dict_builder_config.max_dict_size = size_bytes + (size_bytes / 4);
        Ok(())
    }

    /// Sets the minimum pattern frequency for dictionary inclusion (must be > 0).
    pub fn set_min_frequency(&mut self, frequency: u32) -> Result<()> {
        if frequency == 0 {
            return Err(ZiporaError::invalid_data("Minimum frequency must be > 0"));
        }
        self.config.dict_builder_config.min_frequency = frequency;
        Ok(())
    }

    /// Raises the BFS depth to 8.
    // NOTE(review): despite the name, this only tunes `max_bfs_depth`; no
    // cache-specific option is touched here — confirm intent.
    pub fn enable_advanced_caching(&mut self) -> Result<()> {
        self.config.dict_builder_config.max_bfs_depth = 8;
        Ok(())
    }

    /// Installs a progress callback invoked with fractions in 0.0..=1.0 during `finish`.
    pub fn set_progress_callback<F>(&mut self, callback: F)
    where
        F: Fn(f64) + Send + Sync + 'static,
    {
        self.progress_callback = Some(Box::new(callback));
    }

    /// Returns `(sample_count, total_bytes)` collected so far.
    pub fn training_stats(&self) -> (usize, usize) {
        (self.training_samples.len(), self.training_size)
    }

    /// Builds the dictionary from all collected samples and assembles the store.
    ///
    /// Progress milestones reported via the callback: 0.1 after concatenation,
    /// 0.7 after dictionary construction, 0.8 after validation, 1.0 at the end.
    ///
    /// # Errors
    /// Fails when no samples were added, or when dictionary construction,
    /// validation, pool/compressor/cache creation, or persistence fails.
    pub fn finish(self) -> Result<DictZipBlobStore> {
        if self.training_samples.is_empty() {
            return Err(ZiporaError::invalid_data("No training samples provided"));
        }
        let start_time = std::time::Instant::now();
        // Concatenate all samples into one training buffer, pre-sized exactly.
        let mut combined_training = Vec::with_capacity(self.training_size);
        for sample in &self.training_samples {
            combined_training.extend_from_slice(sample);
        }
        if let Some(callback) = &self.progress_callback {
            callback(0.1);
        }
        let builder = DictionaryBuilder::with_config(self.config.dict_builder_config.clone());
        let dictionary = builder.build(&combined_training)?;
        if let Some(callback) = &self.progress_callback {
            callback(0.7);
        }
        if self.config.validate_dictionary {
            dictionary.validate()?;
        }
        if let Some(callback) = &self.progress_callback {
            callback(0.8);
        }
        // Reuse the configured pool or fall back to a small default pool.
        let memory_pool = if let Some(pool) = &self.memory_pool {
            pool.clone()
        } else {
            SecureMemoryPool::new(SecurePoolConfig::new(4096, 1024, 8))?
        };
        if self.config.external_dictionary {
            if let Some(dict_path) = &self.config.dict_path {
                // NOTE(review): without the `serde` feature this save is silently
                // skipped even though external_dictionary was requested — confirm.
                #[cfg(feature = "serde")]
                dictionary.save_to_file(dict_path)?;
            }
        }
        let dictionary_size = dictionary.size_in_bytes();
        let compressor = PaZipCompressor::new(
            dictionary.clone(),
            self.config.compressor_config.clone(),
            memory_pool.clone(),
        )?;
        // Cache capacity is an entry count estimated at ~1 KiB per entry.
        let cache_capacity = self.config.cache_size_bytes / 1024;
        let cache = LruMap::new(cache_capacity)?;
        let build_time = start_time.elapsed();
        if let Some(callback) = &self.progress_callback {
            callback(1.0);
        }
        let mut stats = DictZipBlobStoreStats::default();
        stats.dictionary_size = dictionary_size;
        stats.build_time_ms = build_time.as_millis() as u64;
        Ok(DictZipBlobStore {
            config: self.config,
            dictionary: Arc::new(RwLock::new(dictionary)),
            compressor: Arc::new(compressor),
            storage: HashMap::new(),
            cache: Arc::new(RwLock::new(cache)),
            stats: Arc::new(RwLock::new(stats)),
            memory_pool: Some(memory_pool),
            next_id: 0,
            huffman_encoder: RefCell::new(None),
            huffman_decoder: RefCell::new(None),
            fse_encoder: RefCell::new(None),
            fse_decoder: RefCell::new(None),
        })
    }
}
/// A stored blob: the (possibly compressed) payload plus the metadata needed
/// to reconstruct the original bytes.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
struct CompressedBlob {
    /// Payload bytes; verbatim input when `is_compressed` is false.
    compressed_data: Vec<u8>,
    /// Length of the original, uncompressed blob.
    original_size: usize,
    /// True when `compressed_data` went through PA-Zip compression.
    is_compressed: bool,
    /// compressed / original size; 1.0 for uncompressed blobs.
    compression_ratio: f32,
    /// Entropy stage applied after dictionary compression (None when skipped).
    entropy_algorithm: EntropyAlgorithm,
}
/// Blob store backed by PA-Zip dictionary compression with an optional
/// entropy-coding stage and an LRU cache of decompressed blobs.
pub struct DictZipBlobStore {
    /// Validated configuration this store was built with.
    config: DictZipConfig,
    /// Shared suffix-array dictionary (replaceable via `load_dictionary`).
    dictionary: Arc<RwLock<SuffixArrayDictionary>>,
    /// Shared compressor; cloned per operation where `&mut` access is needed.
    compressor: Arc<PaZipCompressor>,
    /// id -> stored blob map.
    storage: HashMap<RecordId, CompressedBlob>,
    /// LRU cache of decompressed blob bytes.
    cache: Arc<RwLock<LruMap<RecordId, Vec<u8>>>>,
    /// Shared statistics, updated best-effort under the lock.
    stats: Arc<RwLock<DictZipBlobStoreStats>>,
    /// Memory pool used by the compressor.
    memory_pool: Option<Arc<SecureMemoryPool>>,
    /// Monotonic id counter; first issued id is 1.
    next_id: u64,
    // Lazily-initialized entropy codecs (interior mutability because the
    // encode/decode paths take &self).
    huffman_encoder: RefCell<Option<ContextualHuffmanEncoder>>,
    huffman_decoder: RefCell<Option<ContextualHuffmanDecoder>>,
    fse_encoder: RefCell<Option<FseEncoder>>,
    fse_decoder: RefCell<Option<FseDecoder>>,
}
impl DictZipBlobStore {
/// Builds a store by training a dictionary on the given samples, deriving the
/// DictZip configuration from a NestLoudsTrie configuration.
pub fn build_from_training_samples(
    training_samples: &[Vec<u8>],
    config: &crate::config::nest_louds_trie::NestLoudsTrieConfig,
) -> Result<Self> {
    if training_samples.is_empty() {
        return Err(ZiporaError::invalid_data("No training samples provided"));
    }
    let dict_zip_config = Self::convert_config_from_nest_louds_trie(config)?;
    let mut builder = DictZipBlobStoreBuilder::with_config(dict_zip_config)?;
    training_samples
        .iter()
        .try_for_each(|sample| builder.add_training_sample(sample))?;
    builder.finish()
}
/// Builds a store using the strings of a `SortableStrVec` as training samples.
pub fn build_from_sortable_str_vec(
    keys: &crate::containers::specialized::SortableStrVec,
    config: &crate::config::nest_louds_trie::NestLoudsTrieConfig,
) -> Result<Self> {
    let mut training_samples = Vec::new();
    for i in 0..keys.len() {
        if let Some(s) = keys.get(i) {
            training_samples.push(s.as_bytes().to_vec());
        }
    }
    if training_samples.is_empty() {
        return Err(ZiporaError::invalid_data("No valid strings in SortableStrVec"));
    }
    Self::build_from_training_samples(&training_samples, config)
}
/// Builds a store using the strings of a `ZoSortedStrVec` as training samples.
pub fn build_from_zo_sorted_str_vec(
    keys: &crate::containers::specialized::ZoSortedStrVec,
    config: &crate::config::nest_louds_trie::NestLoudsTrieConfig,
) -> Result<Self> {
    let mut training_samples = Vec::new();
    for i in 0..keys.len() {
        if let Some(s) = keys.get(i) {
            training_samples.push(s.as_bytes().to_vec());
        }
    }
    if training_samples.is_empty() {
        return Err(ZiporaError::invalid_data("No valid strings in ZoSortedStrVec"));
    }
    Self::build_from_training_samples(&training_samples, config)
}
/// Builds a store using the strings of a `FixedLenStrVec` as training samples.
pub fn build_from_fixed_len_str_vec<const N: usize>(
    keys: &crate::containers::specialized::FixedLenStrVec<N>,
    config: &crate::config::nest_louds_trie::NestLoudsTrieConfig,
) -> Result<Self> {
    let mut training_samples = Vec::new();
    for i in 0..keys.len() {
        if let Some(s) = keys.get(i) {
            training_samples.push(s.as_bytes().to_vec());
        }
    }
    if training_samples.is_empty() {
        return Err(ZiporaError::invalid_data("No valid strings in FixedLenStrVec"));
    }
    Self::build_from_training_samples(&training_samples, config)
}
/// Builds a store treating the whole byte slice as a single training sample.
pub fn build_from_vec_u8(
    data: &[u8],
    config: &crate::config::nest_louds_trie::NestLoudsTrieConfig,
) -> Result<Self> {
    if data.is_empty() {
        return Err(ZiporaError::invalid_data("Empty data provided"));
    }
    let sample = data.to_vec();
    Self::build_from_training_samples(std::slice::from_ref(&sample), config)
}
/// Derives a `DictZipConfig` from a NestLoudsTrie configuration using coarse
/// heuristics on its flags and tunables.
fn convert_config_from_nest_louds_trie(
    config: &crate::config::nest_louds_trie::NestLoudsTrieConfig,
) -> Result<DictZipConfig> {
    use crate::compression::dict_zip::PaZipCompressorConfig;
    // Pick a base preset: fast-search flag -> realtime, queue compression ->
    // binary, many delimiters -> text, otherwise log preset.
    let base_config = if config.optimization_flags.contains(
        crate::config::nest_louds_trie::OptimizationFlags::ENABLE_FAST_SEARCH
    ) {
        DictZipConfig::realtime_compression()
    } else if config.enable_queue_compression {
        DictZipConfig::binary_compression()
    } else if config.best_delimiters.len() > 10 {
        DictZipConfig::text_compression()
    } else {
        DictZipConfig::log_compression()
    };
    // Override preset fields with the trie's own fragment/frequency tunables.
    let mut dict_builder_config = base_config.dict_builder_config;
    dict_builder_config.min_pattern_length = config.min_fragment_length as usize;
    dict_builder_config.max_pattern_length = config.max_fragment_length as usize;
    // NOTE(review): sample_ratio is forced to 0.8 regardless of the chosen
    // preset — confirm this override is intended.
    dict_builder_config.sample_ratio = 0.8;
    dict_builder_config.min_frequency = config.sa_fragment_min_freq as u32;
    dict_builder_config.max_bfs_depth = config.max_bfs_depth.min(8);
    if config.initial_pool_size > 0 {
        // A configured pool implies bounded memory: clamp the dictionary sizes.
        dict_builder_config.target_dict_size = 16 * 1024 * 1024;
        dict_builder_config.max_dict_size = 20 * 1024 * 1024;
    }
    dict_builder_config.validate_result = config.optimization_flags.contains(
        crate::config::nest_louds_trie::OptimizationFlags::ENABLE_STATISTICS
    );
    let validate_result = dict_builder_config.validate_result;
    Ok(DictZipConfig {
        dict_builder_config,
        compressor_config: PaZipCompressorConfig::default(),
        cache_size_bytes: if config.node_cache_size > 0 { config.node_cache_size } else { base_config.cache_size_bytes },
        external_dictionary: false,
        dict_path: None,
        memory_pool_config: None,
        track_stats: true,
        validate_dictionary: validate_result,
        min_compression_size: if config.min_fragment_length > 0 {
            config.min_fragment_length as usize
        } else {
            base_config.min_compression_size
        },
        entropy_algorithm: EntropyAlgorithm::None,
        entropy_interleaved: 0,
        enable_lake: false,
        embedded_dict: false,
        input_is_perm: false,
        entropy_zip_ratio_require: 0.8,
    })
}
/// Creates an empty store from a previously saved dictionary file, skipping
/// training entirely. Requires the `serde` feature; otherwise returns
/// `not_supported`.
pub fn from_dictionary_file<P: AsRef<Path>>(
    dict_path: P,
    config: DictZipConfig,
) -> Result<Self> {
    config.validate()?;
    #[cfg(feature = "serde")]
    {
        let dictionary = SuffixArrayDictionary::load_from_file(dict_path)?;
        // Use the configured pool if any, else a small default pool.
        let memory_pool = if let Some(pool_config) = &config.memory_pool_config {
            SecureMemoryPool::new(pool_config.clone())?
        } else {
            SecureMemoryPool::new(SecurePoolConfig::new(4096, 1024, 8))?
        };
        let compressor = PaZipCompressor::new(dictionary.clone(), config.compressor_config.clone(), memory_pool.clone())?;
        // Cache capacity is an entry count estimated at ~1 KiB per entry.
        let cache_capacity = config.cache_size_bytes / 1024;
        let cache = LruMap::new(cache_capacity)?;
        let stats = DictZipBlobStoreStats {
            dictionary_size: dictionary.size_in_bytes(),
            build_time_ms: 0, // no build happened; the dictionary was loaded
            ..Default::default()
        };
        Ok(DictZipBlobStore {
            config,
            dictionary: Arc::new(RwLock::new(dictionary)),
            compressor: Arc::new(compressor),
            storage: HashMap::new(),
            cache: Arc::new(RwLock::new(cache)),
            stats: Arc::new(RwLock::new(stats)),
            memory_pool: Some(memory_pool),
            next_id: 0,
            huffman_encoder: RefCell::new(None),
            huffman_decoder: RefCell::new(None),
            fse_encoder: RefCell::new(None),
            fse_decoder: RefCell::new(None),
        })
    }
    #[cfg(not(feature = "serde"))]
    Err(ZiporaError::not_supported("Dictionary loading requires 'serde' feature"))
}
/// Persists the current dictionary to `path`. Requires the `serde` feature;
/// otherwise returns `not_supported`.
pub fn save_dictionary<P: AsRef<Path>>(&self, path: P) -> Result<()> {
    #[cfg(feature = "serde")]
    {
        let dictionary = self.dictionary.read()
            .map_err(|_| ZiporaError::resource_busy("Dictionary read lock"))?;
        dictionary.save_to_file(path)
    }
    #[cfg(not(feature = "serde"))]
    Err(ZiporaError::not_supported("Dictionary saving requires 'serde' feature"))
}
/// Replaces the dictionary (and compressor) with one loaded from `path`,
/// clearing all stored blobs and the cache, since existing payloads were
/// compressed against the old dictionary. Requires the `serde` feature.
pub fn load_dictionary<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
    #[cfg(feature = "serde")]
    {
        let new_dictionary = SuffixArrayDictionary::load_from_file(path)?;
        let memory_pool = self.memory_pool.as_ref()
            .ok_or_else(|| ZiporaError::invalid_data("Memory pool not initialized"))?;
        // Build the new compressor before swapping anything, so a failure
        // leaves the store untouched.
        let new_compressor = PaZipCompressor::new(new_dictionary.clone(), self.config.compressor_config.clone(), Arc::clone(memory_pool))?;
        *self.dictionary.write()
            .map_err(|_| ZiporaError::resource_busy("Dictionary write lock"))? = new_dictionary;
        self.compressor = Arc::new(new_compressor);
        // Old blobs are undecodable under the new dictionary: drop them.
        self.storage.clear();
        self.cache.write()
            .map_err(|_| ZiporaError::resource_busy("Cache write lock"))?
            .clear();
        let mut stats = self.stats.write()
            .map_err(|_| ZiporaError::resource_busy("Stats write lock"))?;
        stats.dictionary_size = self.dictionary.read()
            .map_err(|_| ZiporaError::resource_busy("Dictionary read lock for stats"))?
            .size_in_bytes();
        Ok(())
    }
    #[cfg(not(feature = "serde"))]
    Err(ZiporaError::not_supported("Dictionary loading requires 'serde' feature"))
}
/// Returns a snapshot of the dictionary's pattern-match statistics.
pub fn dictionary_stats(&self) -> Result<crate::compression::dict_zip::MatchStats> {
    self.dictionary
        .read()
        .map_err(|_| ZiporaError::resource_busy("Dictionary read lock for stats"))
        .map(|dictionary| dictionary.match_stats().clone())
}
/// Validates the dictionary, the configuration, and per-blob invariants.
///
/// # Errors
/// Returns `invalid_data` when a blob records a zero original size, or when a
/// blob marked compressed is not actually smaller than its original — `put`
/// only sets `is_compressed` when compression shrank the data, so a violation
/// indicates corruption. (Fix: this check previously had an empty body and
/// never reported anything.)
pub fn validate(&self) -> Result<()> {
    let dict = self.dictionary.read()
        .map_err(|_| ZiporaError::resource_busy("Dictionary read lock"))?;
    dict.validate()?;
    self.config.validate()?;
    for (id, blob) in &self.storage {
        if blob.original_size == 0 {
            return Err(ZiporaError::invalid_data(format!("Blob {} has zero original size", id)));
        }
        if blob.is_compressed && blob.compressed_data.len() >= blob.original_size {
            return Err(ZiporaError::invalid_data(format!(
                "Blob {} is marked compressed but its payload is not smaller than the original",
                id
            )));
        }
    }
    Ok(())
}
/// Drops all cached decompressed blobs, then re-checks store invariants.
pub fn optimize(&mut self) -> Result<()> {
    self.cache
        .write()
        .map_err(|_| ZiporaError::resource_busy("Cache write lock"))?
        .clear();
    self.validate()
}
/// Returns a snapshot of the full statistics structure.
pub fn detailed_stats(&self) -> Result<DictZipBlobStoreStats> {
    self.stats
        .read()
        .map_err(|_| ZiporaError::resource_busy("Stats read lock"))
        .map(|stats| stats.clone())
}
/// Issues the next record id. Pre-increment means the first id is 1, so 0 is
/// never a valid record.
fn next_record_id(&mut self) -> RecordId {
    let id = self.next_id + 1;
    self.next_id = id;
    id as RecordId
}
/// Looks up a decompressed blob in the cache. A poisoned lock is treated as a
/// cache miss rather than an error.
fn try_get_from_cache(&self, id: RecordId) -> Option<Vec<u8>> {
    let cache = self.cache.read().ok()?;
    cache.get(&id).map(|v| v.clone())
}
/// Inserts a decompressed blob into the cache, best-effort: lock poisoning and
/// insertion failures are ignored.
fn store_in_cache(&self, id: RecordId, data: Vec<u8>) {
    match self.cache.write() {
        Ok(mut cache) => {
            let _ = cache.put(id, data);
        }
        Err(_) => {}
    }
}
/// Records one read in the statistics; skipped if the stats lock is poisoned.
fn update_get_stats(&self, cache_hit: bool) {
    if let Ok(mut stats) = self.stats.write() {
        let counter = if cache_hit {
            &mut stats.cache_hits
        } else {
            &mut stats.cache_misses
        };
        *counter += 1;
        stats.blob_stats.record_get(cache_hit);
    }
}
/// Records one write in the statistics, folding compressed blobs into the
/// running compression totals; skipped if the stats lock is poisoned.
fn update_put_stats(&self, original_size: usize, compressed_size: usize, is_compressed: bool) {
    if let Ok(mut stats) = self.stats.write() {
        stats.blob_stats.record_put(original_size);
        if is_compressed {
            stats.compressed_blobs += 1;
            let cs = &mut stats.compression_stats;
            cs.uncompressed_size += original_size;
            cs.compressed_size += compressed_size;
            cs.compressed_count += 1;
            cs.compression_ratio = cs.compressed_size as f32 / cs.uncompressed_size as f32;
        } else {
            stats.uncompressed_blobs += 1;
        }
    }
}
/// Records one removal in the statistics; skipped if the stats lock is poisoned.
fn update_remove_stats(&self, original_size: usize) {
    match self.stats.write() {
        Ok(mut stats) => stats.blob_stats.record_remove(original_size),
        Err(_) => {}
    }
}
/// Dispatches to the configured entropy encoder; `None` copies the input.
fn apply_entropy_encoding(&self, data: &[u8]) -> Result<Vec<u8>> {
    match self.config.entropy_algorithm {
        EntropyAlgorithm::None => Ok(data.to_vec()),
        EntropyAlgorithm::HuffmanO1 => self.apply_huffman_o1_encoding(data),
        EntropyAlgorithm::Fse => self.apply_fse_encoding(data),
    }
}
/// Encodes `data` with order-1 Huffman, lazily building the encoder from the
/// dictionary's raw bytes on first use. The interleaving factor selects the
/// encode variant.
fn apply_huffman_o1_encoding(&self, data: &[u8]) -> Result<Vec<u8>> {
    if self.huffman_encoder.borrow().is_none() {
        let dict = self.dictionary.read()
            .map_err(|_| ZiporaError::resource_busy("Dictionary read lock"))?;
        let training_data = dict.data();
        let new_encoder = ContextualHuffmanEncoder::new(training_data, crate::entropy::huffman::HuffmanOrder::Order1)?;
        *self.huffman_encoder.borrow_mut() = Some(new_encoder);
    }
    // NOTE(review): the cached encoder is cloned on every call — presumably
    // because encode_x* consumes or mutates it; confirm, this may be costly.
    let binding = self.huffman_encoder.borrow();
    let encoder = binding.as_ref().expect("encoder must be initialized").clone();
    match self.config.entropy_interleaved {
        0 | 1 => encoder.encode_x1(data),
        2 => encoder.encode_x2(data),
        4 => encoder.encode_x4(data),
        8 => encoder.encode_x8(data),
        _ => {
            // Unreachable in practice: DictZipConfig::validate restricts the
            // factor to {0, 1, 2, 4, 8}.
            Err(ZiporaError::Configuration {
                message: format!("Invalid interleaving factor: {}",
                    self.config.entropy_interleaved)
            })
        }
    }
}
/// Encodes `data` with FSE, lazily constructing the encoder on first use.
/// An interleaving factor > 1 is mapped to the encoder's parallel block count.
fn apply_fse_encoding(&self, data: &[u8]) -> Result<Vec<u8>> {
    if self.fse_encoder.borrow().is_none() {
        let config = FseConfig {
            parallel_blocks: if self.config.entropy_interleaved > 1 {
                Some(self.config.entropy_interleaved as usize)
            } else {
                None
            },
            ..Default::default()
        };
        let new_encoder = FseEncoder::new(config)?;
        *self.fse_encoder.borrow_mut() = Some(new_encoder);
    }
    // compress takes &mut self on the encoder, hence borrow_mut on the cell.
    self.fse_encoder.borrow_mut().as_mut().expect("FSE encoder must be initialized").compress(data)
}
/// Returns true when the entropy stage achieved the configured ratio
/// (compressed / original <= requirement). Empty input never qualifies.
fn check_compression_ratio(&self, original: &[u8], compressed: &[u8]) -> bool {
    match original.len() {
        0 => false,
        len => compressed.len() as f32 / len as f32 <= self.config.entropy_zip_ratio_require,
    }
}
/// Reverses the entropy stage recorded on a stored blob; `None` copies the
/// input unchanged.
fn decode_entropy(&self, data: &[u8], original_size: usize, entropy_algo: EntropyAlgorithm) -> Result<Vec<u8>> {
    match entropy_algo {
        EntropyAlgorithm::None => Ok(data.to_vec()),
        EntropyAlgorithm::HuffmanO1 => self.decode_huffman_o1(data, original_size),
        EntropyAlgorithm::Fse => self.decode_fse(data, original_size),
    }
}
/// Decodes order-1 Huffman data. The decoder is lazily rebuilt from the same
/// dictionary bytes used for encoding, so both sides derive identical tables.
fn decode_huffman_o1(&self, data: &[u8], original_size: usize) -> Result<Vec<u8>> {
    if self.huffman_decoder.borrow().is_none() {
        let dict = self.dictionary.read()
            .map_err(|_| ZiporaError::resource_busy("Dictionary read lock"))?;
        let training_data = dict.data();
        let encoder = ContextualHuffmanEncoder::new(training_data, crate::entropy::huffman::HuffmanOrder::Order1)?;
        let new_decoder = ContextualHuffmanDecoder::new(encoder);
        *self.huffman_decoder.borrow_mut() = Some(new_decoder);
    }
    // NOTE(review): the cached decoder is cloned per call — presumably because
    // decode needs ownership or &mut; confirm whether this copy is necessary.
    let binding = self.huffman_decoder.borrow();
    let decoder = binding.as_ref().expect("decoder must be initialized").clone();
    decoder.decode(data, original_size)
}
/// Decodes FSE data, lazily constructing the decoder on first use. The
/// original-size hint is unused by this path.
fn decode_fse(&self, data: &[u8], _original_size: usize) -> Result<Vec<u8>> {
    if self.fse_decoder.borrow().is_none() {
        *self.fse_decoder.borrow_mut() = Some(FseDecoder::new());
    }
    let mut guard = self.fse_decoder.borrow_mut();
    guard
        .as_mut()
        .expect("FSE decoder must be initialized")
        .decompress(data)
}
}
impl BlobStore for DictZipBlobStore {
    /// Retrieves a blob, consulting the LRU cache first, then reversing the
    /// entropy stage and (if applied) PA-Zip decompression.
    fn get(&self, id: RecordId) -> Result<Vec<u8>> {
        if let Some(cached_data) = self.try_get_from_cache(id) {
            self.update_get_stats(true);
            return Ok(cached_data);
        }
        let blob = self.storage.get(&id)
            .ok_or_else(|| ZiporaError::invalid_data(format!("Blob {} not found", id)))?;
        // Undo the entropy stage first; a `None` algorithm is a plain copy.
        let dict_compressed = self.decode_entropy(
            &blob.compressed_data,
            blob.original_size,
            blob.entropy_algorithm
        )?;
        let decompressed_data = if blob.is_compressed {
            let mut decompressed = Vec::new();
            // NOTE(review): the shared compressor is cloned per call,
            // presumably because decompress needs &mut — potentially costly;
            // confirm whether interior mutability would be preferable.
            let mut compressor_copy = (*self.compressor).clone();
            compressor_copy.decompress(&dict_compressed, &mut decompressed)
                .map_err(|e| ZiporaError::invalid_data(&format!("Decompression failed: {}", e)))?;
            decompressed
        } else {
            dict_compressed
        };
        self.store_in_cache(id, decompressed_data.clone());
        self.update_get_stats(false);
        Ok(decompressed_data)
    }

    /// Stores a blob: PA-Zip compresses inputs at or above the configured
    /// threshold, optionally applies entropy coding when it meets the ratio
    /// requirement, and falls back to verbatim storage when compression did
    /// not shrink the data.
    fn put(&mut self, data: &[u8]) -> Result<RecordId> {
        if data.is_empty() {
            return Err(ZiporaError::invalid_data("Cannot store empty blob"));
        }
        let id = self.next_record_id();
        let original_size = data.len();
        let should_compress = original_size >= self.config.min_compression_size;
        let blob = if should_compress {
            let mut dict_compressed = Vec::new();
            // See NOTE in `get` about cloning the compressor per call.
            let mut compressor_copy = (*self.compressor).clone();
            let _compression_stats = compressor_copy.compress(data, &mut dict_compressed)
                .map_err(|e| ZiporaError::invalid_data(&format!("Compression failed: {}", e)))?;
            // Keep the entropy-coded form only when it meets the configured
            // ratio against the dictionary-compressed bytes.
            let (final_compressed, entropy_algorithm) = if self.config.entropy_algorithm != EntropyAlgorithm::None {
                let entropy_encoded = self.apply_entropy_encoding(&dict_compressed)?;
                if self.check_compression_ratio(&dict_compressed, &entropy_encoded) {
                    (entropy_encoded, self.config.entropy_algorithm)
                } else {
                    (dict_compressed, EntropyAlgorithm::None)
                }
            } else {
                (dict_compressed, EntropyAlgorithm::None)
            };
            let compression_ratio = if final_compressed.len() > 0 {
                final_compressed.len() as f32 / original_size as f32
            } else {
                1.0
            };
            // `is_compressed` is only set when compression actually shrank the
            // payload; otherwise the original bytes are stored verbatim.
            if final_compressed.len() < original_size {
                CompressedBlob {
                    compressed_data: final_compressed,
                    original_size,
                    is_compressed: true,
                    compression_ratio,
                    entropy_algorithm,
                }
            } else {
                CompressedBlob {
                    compressed_data: data.to_vec(),
                    original_size,
                    is_compressed: false,
                    compression_ratio: 1.0,
                    entropy_algorithm: EntropyAlgorithm::None,
                }
            }
        } else {
            CompressedBlob {
                compressed_data: data.to_vec(),
                original_size,
                is_compressed: false,
                compression_ratio: 1.0,
                entropy_algorithm: EntropyAlgorithm::None,
            }
        };
        let compressed_size = blob.compressed_data.len();
        let is_compressed = blob.is_compressed;
        self.storage.insert(id, blob);
        self.update_put_stats(original_size, compressed_size, is_compressed);
        Ok(id)
    }

    /// Removes a blob from storage and from the cache, updating statistics.
    fn remove(&mut self, id: RecordId) -> Result<()> {
        let blob = self.storage.remove(&id)
            .ok_or_else(|| ZiporaError::invalid_data(format!("Blob {} not found", id)))?;
        if let Ok(mut cache) = self.cache.write() {
            cache.remove(&id);
        }
        self.update_remove_stats(blob.original_size);
        Ok(())
    }

    /// Returns true when a blob with this id is stored.
    fn contains(&self, id: RecordId) -> bool {
        self.storage.contains_key(&id)
    }

    /// Returns the original (uncompressed) size of a blob, if present.
    fn size(&self, id: RecordId) -> Result<Option<usize>> {
        Ok(self.storage.get(&id).map(|blob| blob.original_size))
    }

    /// Number of blobs currently stored.
    fn len(&self) -> usize {
        self.storage.len()
    }

    /// Snapshot of the generic blob-store counters; defaults on a poisoned lock.
    fn stats(&self) -> BlobStoreStats {
        if let Ok(stats) = self.stats.read() {
            stats.blob_stats.clone()
        } else {
            BlobStoreStats::default()
        }
    }
}
impl CompressedBlobStore for DictZipBlobStore {
    /// Per-blob compression ratio (compressed / original), if the blob exists.
    fn compression_ratio(&self, id: RecordId) -> Result<Option<f32>> {
        let ratio = self.storage.get(&id).map(|blob| blob.compression_ratio);
        Ok(ratio)
    }

    /// Stored payload size in bytes, if the blob exists.
    fn compressed_size(&self, id: RecordId) -> Result<Option<usize>> {
        let size = self.storage.get(&id).map(|blob| blob.compressed_data.len());
        Ok(size)
    }

    /// Aggregate compression statistics; defaults on a poisoned lock.
    fn compression_stats(&self) -> CompressionStats {
        self.stats
            .read()
            .map(|stats| stats.compression_stats.clone())
            .unwrap_or_default()
    }
}
impl BatchBlobStore for DictZipBlobStore {
    /// Stores each blob in order, failing fast on the first error.
    fn put_batch<I>(&mut self, blobs: I) -> Result<Vec<RecordId>>
    where
        I: IntoIterator<Item = Vec<u8>>,
    {
        blobs.into_iter().map(|blob| self.put(&blob)).collect()
    }

    /// Fetches each id; a missing blob yields `None`, any other error aborts.
    fn get_batch<I>(&self, ids: I) -> Result<Vec<Option<Vec<u8>>>>
    where
        I: IntoIterator<Item = RecordId>,
    {
        let mut results = Vec::new();
        for id in ids {
            match self.get(id) {
                Ok(data) => results.push(Some(data)),
                Err(ZiporaError::InvalidData { .. }) => results.push(None),
                Err(e) => return Err(e),
            }
        }
        Ok(results)
    }

    /// Removes each id, counting successes; missing blobs are skipped, any
    /// other error aborts.
    fn remove_batch<I>(&mut self, ids: I) -> Result<usize>
    where
        I: IntoIterator<Item = RecordId>,
    {
        let mut removed_count = 0;
        for id in ids {
            match self.remove(id) {
                Ok(()) => removed_count += 1,
                Err(ZiporaError::InvalidData { .. }) => {}
                Err(e) => return Err(e),
            }
        }
        Ok(removed_count)
    }
}
impl DictZipBlobStore {
    /// Snapshot of all stored record ids (unordered).
    pub fn iter_ids_vec(&self) -> Vec<RecordId> {
        let mut ids = Vec::with_capacity(self.storage.len());
        ids.extend(self.storage.keys().copied());
        ids
    }

    /// Decompresses and returns every stored blob as `(id, data)` pairs.
    pub fn iter_blobs_vec(&self) -> Result<Vec<(RecordId, Vec<u8>)>> {
        self.storage
            .keys()
            .map(|&id| self.get(id).map(|data| (id, data)))
            .collect()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::compression::dict_zip::DictionaryBuilderConfig;
/// Five overlapping English sentences with shared vocabulary, so the
/// dictionary builder has repeated patterns to learn from.
fn create_test_training_data() -> Vec<Vec<u8>> {
    [
        &b"The quick brown fox jumps over the lazy dog"[..],
        b"The lazy dog was jumped over by the quick brown fox",
        b"Quick brown foxes are faster than lazy dogs",
        b"Dogs and foxes are both animals",
        b"Animals like dogs and foxes live in nature",
    ]
    .iter()
    .map(|s| s.to_vec())
    .collect()
}
// Presets and the default configuration must pass their own validation, and
// the text/binary presets must carry their documented compression thresholds.
#[test]
fn test_dict_zip_config() {
    let config = DictZipConfig::default();
    assert!(config.validate().is_ok());
    let text_config = DictZipConfig::text_compression();
    assert!(text_config.validate().is_ok());
    assert_eq!(text_config.min_compression_size, 32);
    let binary_config = DictZipConfig::binary_compression();
    assert!(binary_config.validate().is_ok());
    assert_eq!(binary_config.min_compression_size, 128);
}
// Builder accepts samples, reports accurate training stats, and produces an
// empty store from a small validated dictionary.
#[test]
fn test_builder_basic() -> Result<()> {
    let config = DictZipConfig {
        dict_builder_config: DictionaryBuilderConfig {
            target_dict_size: 1024,
            max_dict_size: 8192,
            validate_result: true,
            ..Default::default()
        },
        validate_dictionary: true,
        ..Default::default()
    };
    let mut builder = DictZipBlobStoreBuilder::with_config(config)?;
    let training_data = create_test_training_data();
    for sample in training_data {
        builder.add_training_sample(&sample)?;
    }
    let (sample_count, total_size) = builder.training_stats();
    assert_eq!(sample_count, 5);
    assert!(total_size > 0);
    let store = builder.finish()?;
    // A freshly built store contains no blobs yet.
    assert_eq!(store.len(), 0);
    Ok(())
}
// Round-trips a blob through put/get and exercises contains/size/remove,
// including lookups of a nonexistent id (999).
#[test]
fn test_blob_store_operations() -> Result<()> {
    let config = DictZipConfig {
        dict_builder_config: DictionaryBuilderConfig {
            target_dict_size: 1024,
            max_dict_size: 8192,
            validate_result: false,
            ..Default::default()
        },
        // Low threshold so the small test blob is eligible for compression.
        min_compression_size: 10,
        ..Default::default()
    };
    let mut builder = DictZipBlobStoreBuilder::with_config(config)?;
    let training_data = create_test_training_data();
    for sample in training_data {
        builder.add_training_sample(&sample)?;
    }
    let mut store = builder.finish()?;
    let test_data = b"The quick brown fox";
    let id = store.put(test_data)?;
    let retrieved = store.get(id)?;
    assert_eq!(test_data, retrieved.as_slice());
    assert!(store.contains(id));
    assert!(!store.contains(999));
    assert_eq!(store.size(id)?, Some(test_data.len()));
    assert_eq!(store.size(999)?, None);
    store.remove(id)?;
    assert!(!store.contains(id));
    assert!(store.get(id).is_err());
    Ok(())
}
// Exercises put_batch/get_batch/remove_batch round-tripping three blobs.
#[test]
fn test_batch_operations() -> Result<()> {
    let config = DictZipConfig {
        dict_builder_config: DictionaryBuilderConfig {
            target_dict_size: 1024,
            max_dict_size: 8192,
            validate_result: false,
            ..Default::default()
        },
        min_compression_size: 10,
        ..Default::default()
    };
    let mut builder = DictZipBlobStoreBuilder::with_config(config)?;
    let training_data = create_test_training_data();
    for sample in training_data {
        builder.add_training_sample(&sample)?;
    }
    let mut store = builder.finish()?;
    let test_blobs = vec![
        b"First blob".to_vec(),
        b"Second blob".to_vec(),
        b"Third blob".to_vec(),
    ];
    let ids = store.put_batch(test_blobs.clone())?;
    assert_eq!(ids.len(), 3);
    let retrieved = store.get_batch(ids.clone())?;
    assert_eq!(retrieved.len(), 3);
    for (i, data) in retrieved.iter().enumerate() {
        assert_eq!(data.as_ref().unwrap(), &test_blobs[i]);
    }
    let removed_count = store.remove_batch(ids)?;
    assert_eq!(removed_count, 3);
    assert_eq!(store.len(), 0);
    Ok(())
}
#[test]
fn test_compression_stats() -> Result<()> {
    // Verify that compression statistics are tracked when `track_stats`
    // is enabled and that detailed stats report a built dictionary.
    let config = DictZipConfig {
        dict_builder_config: DictionaryBuilderConfig {
            target_dict_size: 2048,
            max_dict_size: 8192,
            validate_result: false,
            ..Default::default()
        },
        min_compression_size: 5,
        track_stats: true,
        ..Default::default()
    };

    let mut builder = DictZipBlobStoreBuilder::with_config(config)?;
    for sample in create_test_training_data() {
        builder.add_training_sample(&sample)?;
    }
    let mut store = builder.finish()?;

    let test_data = b"The quick brown fox jumps over the lazy dog";
    let _id1 = store.put(test_data)?;
    let _id2 = store.put(b"Short")?;
    let _id3 = store.put(test_data)?;

    // BUG FIX: the previous assertion
    // `stats.compressed_count > 0 || stats.compressed_count == 0`
    // was a tautology that could never fail. Whether each blob ends up
    // compressed is data-dependent, so instead assert the invariants we
    // can actually rely on: all three puts are reflected in the store.
    assert_eq!(store.len(), 3);
    let _stats = store.compression_stats();

    let detailed_stats = store.detailed_stats()?;
    assert!(detailed_stats.dictionary_size > 0);
    // NOTE(review): on a fast machine a sub-millisecond build could make
    // this flaky if `build_time_ms` truncates to 0 — confirm the timer's
    // granularity.
    assert!(detailed_stats.build_time_ms > 0);
    Ok(())
}
#[test]
fn test_caching() -> Result<()> {
    // Read the same blob twice so the decompression cache records
    // activity, then confirm the detailed stats saw hits or misses.
    let cfg = DictZipConfig {
        dict_builder_config: DictionaryBuilderConfig {
            target_dict_size: 1024,
            max_dict_size: 8192,
            validate_result: false,
            ..Default::default()
        },
        cache_size_bytes: 1024,
        ..Default::default()
    };

    let mut builder = DictZipBlobStoreBuilder::with_config(cfg)?;
    for sample in create_test_training_data() {
        builder.add_training_sample(&sample)?;
    }
    let mut store = builder.finish()?;

    let id = store.put(b"Test data for caching")?;

    // Two reads of the same id: the second may be served from cache.
    let _first = store.get(id)?;
    let _second = store.get(id)?;

    let stats = store.detailed_stats()?;
    assert!(stats.cache_hits > 0 || stats.cache_misses > 0);
    Ok(())
}
#[test]
fn test_validation() -> Result<()> {
    // With dictionary validation enabled both at build time and on the
    // store, a freshly built store must pass its own validation.
    let cfg = DictZipConfig {
        dict_builder_config: DictionaryBuilderConfig {
            target_dict_size: 1024,
            max_dict_size: 8192,
            validate_result: true,
            ..Default::default()
        },
        validate_dictionary: true,
        ..Default::default()
    };

    let mut builder = DictZipBlobStoreBuilder::with_config(cfg)?;
    for sample in create_test_training_data() {
        builder.add_training_sample(&sample)?;
    }

    let store = builder.finish()?;
    assert!(store.validate().is_ok());
    Ok(())
}
#[test]
fn test_builder_configuration_methods() -> Result<()> {
    // Each fluent setter must be reflected in the builder's stored config.
    let mut builder = DictZipBlobStoreBuilder::new()?;

    builder.set_dict_size_mb(16)?;
    builder.set_min_frequency(8)?;
    builder.enable_advanced_caching()?;

    let dict_cfg = &builder.config.dict_builder_config;
    assert_eq!(dict_cfg.target_dict_size, 16 * 1024 * 1024);
    assert_eq!(dict_cfg.min_frequency, 8);
    assert_eq!(dict_cfg.max_bfs_depth, 8);
    Ok(())
}
#[test]
fn test_config_presets() {
    // Each preset carries its own minimum-compression threshold.
    let text_config = DictZipConfig::text_compression();
    let binary_config = DictZipConfig::binary_compression();
    let log_config = DictZipConfig::log_compression();
    let realtime_config = DictZipConfig::realtime_compression();

    let cases = [
        (&text_config, 32usize),
        (&binary_config, 128),
        (&log_config, 16),
        (&realtime_config, 256),
    ];
    for (config, expected) in &cases {
        assert_eq!(config.min_compression_size, *expected);
    }

    // The realtime preset trades dictionary validation for speed.
    assert!(!realtime_config.dict_builder_config.validate_result);
}
#[test]
fn test_error_handling() {
    // Finishing without any training data must fail.
    let mut untrained = DictZipBlobStoreBuilder::new().unwrap();
    assert!(untrained.finish().is_err());

    // An empty training sample must be rejected.
    let mut builder = DictZipBlobStoreBuilder::new().unwrap();
    assert!(builder.add_training_sample(b"").is_err());

    // A zero-byte cache is an invalid configuration.
    let bad_config = DictZipConfig {
        cache_size_bytes: 0,
        ..Default::default()
    };
    assert!(bad_config.validate().is_err());
}
#[test]
fn test_entropy_algorithm_none() -> Result<()> {
    // With entropy coding disabled the store must still round-trip data
    // through the dictionary compressor alone.
    let dict_builder_config = DictionaryBuilderConfig {
        target_dict_size: 1024,
        max_dict_size: 8192,
        validate_result: false,
        ..Default::default()
    };
    let config = DictZipConfig {
        dict_builder_config,
        min_compression_size: 10,
        entropy_algorithm: EntropyAlgorithm::None,
        entropy_interleaved: 0,
        ..Default::default()
    };

    let mut builder = DictZipBlobStoreBuilder::with_config(config)?;
    for sample in create_test_training_data() {
        builder.add_training_sample(&sample)?;
    }
    let mut store = builder.finish()?;

    let payload = b"The quick brown fox jumps over the lazy dog";
    let id = store.put(payload)?;
    assert_eq!(payload, store.get(id)?.as_slice());
    Ok(())
}
#[test]
fn test_entropy_huffman_o1_x1() -> Result<()> {
    // Round-trip with order-1 Huffman entropy coding, single stream
    // (interleave factor 1).
    let config = DictZipConfig {
        dict_builder_config: DictionaryBuilderConfig {
            target_dict_size: 1024,
            max_dict_size: 8192,
            validate_result: false,
            ..Default::default()
        },
        min_compression_size: 10,
        entropy_algorithm: EntropyAlgorithm::HuffmanO1,
        entropy_interleaved: 1,
        entropy_zip_ratio_require: 0.95,
        ..Default::default()
    };

    let mut builder = DictZipBlobStoreBuilder::with_config(config)?;
    for sample in create_test_training_data() {
        builder.add_training_sample(&sample)?;
    }
    let mut store = builder.finish()?;

    let original =
        b"The quick brown fox jumps over the lazy dog and then the quick brown fox runs away";
    let id = store.put(original)?;
    let decoded = store.get(id)?;
    assert_eq!(original, decoded.as_slice());
    Ok(())
}
#[test]
fn test_entropy_huffman_o1_x2() -> Result<()> {
    // Round-trip with order-1 Huffman entropy coding, two interleaved
    // streams.
    let dict_cfg = DictionaryBuilderConfig {
        target_dict_size: 1024,
        max_dict_size: 8192,
        validate_result: false,
        ..Default::default()
    };
    let config = DictZipConfig {
        dict_builder_config: dict_cfg,
        min_compression_size: 10,
        entropy_algorithm: EntropyAlgorithm::HuffmanO1,
        entropy_interleaved: 2,
        entropy_zip_ratio_require: 0.95,
        ..Default::default()
    };

    let mut builder = DictZipBlobStoreBuilder::with_config(config)?;
    for sample in create_test_training_data() {
        builder.add_training_sample(&sample)?;
    }
    let mut store = builder.finish()?;

    let original = b"Dogs and foxes are both animals that run in nature";
    let id = store.put(original)?;
    assert_eq!(original, store.get(id)?.as_slice());
    Ok(())
}
#[test]
fn test_entropy_huffman_o1_x4() -> Result<()> {
    // Round-trip with order-1 Huffman entropy coding, four interleaved
    // streams.
    let config = DictZipConfig {
        dict_builder_config: DictionaryBuilderConfig {
            target_dict_size: 1024,
            max_dict_size: 8192,
            validate_result: false,
            ..Default::default()
        },
        min_compression_size: 10,
        entropy_algorithm: EntropyAlgorithm::HuffmanO1,
        entropy_interleaved: 4,
        entropy_zip_ratio_require: 0.95,
        ..Default::default()
    };

    let mut builder = DictZipBlobStoreBuilder::with_config(config)?;
    for sample in create_test_training_data() {
        builder.add_training_sample(&sample)?;
    }

    let mut store = builder.finish()?;
    let payload = b"Animals like dogs and foxes live in nature and run around";
    let stored_id = store.put(payload)?;
    let round_tripped = store.get(stored_id)?;
    assert_eq!(payload, round_tripped.as_slice());
    Ok(())
}
#[test]
fn test_entropy_huffman_o1_x8() -> Result<()> {
    // Round-trip with order-1 Huffman entropy coding at the maximum
    // interleave factor of eight streams.
    let config = DictZipConfig {
        dict_builder_config: DictionaryBuilderConfig {
            target_dict_size: 1024,
            max_dict_size: 8192,
            validate_result: false,
            ..Default::default()
        },
        min_compression_size: 10,
        entropy_algorithm: EntropyAlgorithm::HuffmanO1,
        entropy_interleaved: 8,
        entropy_zip_ratio_require: 0.95,
        ..Default::default()
    };

    let mut builder = DictZipBlobStoreBuilder::with_config(config)?;
    for sample in create_test_training_data() {
        builder.add_training_sample(&sample)?;
    }
    let mut store = builder.finish()?;

    let data = b"The lazy dog was jumped over by the quick brown fox multiple times";
    let id = store.put(data)?;
    let decoded = store.get(id)?;
    assert_eq!(data, decoded.as_slice());
    Ok(())
}
#[test]
fn test_entropy_fse() -> Result<()> {
    // Round-trip with FSE entropy coding selected (no interleaving).
    let dict_builder_config = DictionaryBuilderConfig {
        target_dict_size: 1024,
        max_dict_size: 8192,
        validate_result: false,
        ..Default::default()
    };
    let config = DictZipConfig {
        dict_builder_config,
        min_compression_size: 10,
        entropy_algorithm: EntropyAlgorithm::Fse,
        entropy_interleaved: 0,
        entropy_zip_ratio_require: 0.95,
        ..Default::default()
    };

    let mut builder = DictZipBlobStoreBuilder::with_config(config)?;
    for sample in create_test_training_data() {
        builder.add_training_sample(&sample)?;
    }
    let mut store = builder.finish()?;

    let original = b"Quick brown foxes are faster than lazy dogs in every way";
    let id = store.put(original)?;
    assert_eq!(original, store.get(id)?.as_slice());
    Ok(())
}
#[test]
fn test_entropy_compression_ratio_fallback() -> Result<()> {
    // With a very strict required entropy ratio (0.1), entropy coding is
    // unlikely to qualify; the store must still round-trip correctly via
    // its fallback path.
    let config = DictZipConfig {
        dict_builder_config: DictionaryBuilderConfig {
            target_dict_size: 1024,
            max_dict_size: 8192,
            validate_result: false,
            ..Default::default()
        },
        min_compression_size: 10,
        entropy_algorithm: EntropyAlgorithm::HuffmanO1,
        entropy_interleaved: 1,
        entropy_zip_ratio_require: 0.1,
        ..Default::default()
    };

    let mut builder = DictZipBlobStoreBuilder::with_config(config)?;
    for sample in create_test_training_data() {
        builder.add_training_sample(&sample)?;
    }
    let mut store = builder.finish()?;

    let payload = b"Short text";
    let id = store.put(payload)?;
    let decoded = store.get(id)?;
    assert_eq!(payload, decoded.as_slice());
    Ok(())
}
#[test]
fn test_entropy_config_validation() {
    // Interleave factor 4 is accepted.
    let ok_interleave = DictZipConfig {
        entropy_interleaved: 4,
        ..Default::default()
    };
    assert!(ok_interleave.validate().is_ok());

    // Interleave factor 3 is rejected.
    let bad_interleave = DictZipConfig {
        entropy_interleaved: 3,
        ..Default::default()
    };
    assert!(bad_interleave.validate().is_err());

    // A required entropy ratio above 1.0 is rejected.
    let bad_ratio = DictZipConfig {
        entropy_zip_ratio_require: 1.5,
        ..Default::default()
    };
    assert!(bad_ratio.validate().is_err());
}
#[test]
fn test_multiple_entropy_roundtrips() -> Result<()> {
    // Store several blobs under Huffman-O1 x2 entropy coding and verify
    // each one round-trips independently.
    let config = DictZipConfig {
        dict_builder_config: DictionaryBuilderConfig {
            target_dict_size: 1024,
            max_dict_size: 8192,
            validate_result: false,
            ..Default::default()
        },
        min_compression_size: 10,
        entropy_algorithm: EntropyAlgorithm::HuffmanO1,
        entropy_interleaved: 2,
        entropy_zip_ratio_require: 0.95,
        ..Default::default()
    };

    let mut builder = DictZipBlobStoreBuilder::with_config(config)?;
    for sample in create_test_training_data() {
        builder.add_training_sample(&sample)?;
    }
    let mut store = builder.finish()?;

    let test_blobs = vec![
        b"First test blob with dogs and foxes".to_vec(),
        b"Second test blob with quick brown animals".to_vec(),
        b"Third test blob describing lazy behavior".to_vec(),
    ];

    // Store each blob individually, recording its id.
    let mut ids = Vec::with_capacity(test_blobs.len());
    for blob in &test_blobs {
        ids.push(store.put(blob)?);
    }

    // Every id must decode back to exactly the blob that produced it.
    for (id, expected) in ids.iter().zip(test_blobs.iter()) {
        let decoded = store.get(*id)?;
        assert_eq!(expected.as_slice(), decoded.as_slice());
    }
    Ok(())
}
}