use crate::core::error::{Error, Result};
use crate::storage::zero_copy::{ZeroCopyView, ZeroCopyManager, CacheLevel, MemoryMappedView};
use std::sync::{Arc, RwLock, Mutex};
use std::collections::HashMap;
use std::str;
use std::ops::Range;
/// Append-only string pool that stores all string bytes back-to-back in a
/// single shared zero-copy buffer and hands out lightweight id-based views.
/// All heavy state lives behind `Arc`s, so the pool itself clones cheaply.
#[derive(Debug)]
pub struct UnifiedStringPool {
    /// Factory/owner for zero-copy views (also used to allocate replacement buffers).
    manager: Arc<ZeroCopyManager>,
    /// Contiguous byte buffer holding every interned string.
    buffer: Arc<RwLock<ZeroCopyView<u8>>>,
    /// Per-string metadata, indexed by string id (the `u32` returned by `add_string`).
    strings: Arc<RwLock<Vec<StringMetadata>>>,
    /// hash -> string id map used for deduplication.
    /// NOTE(review): keyed on the 64-bit hash alone — two distinct strings that
    /// collide would be conflated; confirm whether byte comparison on hit is needed.
    dedup_index: Arc<RwLock<HashMap<u64, u32>>>,
    /// Allocation cursor and capacity bookkeeping for `buffer`.
    buffer_info: Arc<Mutex<BufferInfo>>,
    /// Immutable configuration captured at construction time.
    config: UnifiedStringPoolConfig,
}
/// Location and bookkeeping record for one interned string.
#[derive(Debug, Clone, Copy)]
pub struct StringMetadata {
    /// Byte offset of the string's first byte within the pool buffer.
    pub offset: u32,
    /// Length of the string in bytes.
    pub length: u32,
    /// Hash of the full string (as computed by the pool's hasher); substring
    /// views carry 0 here.
    pub hash: u64,
    /// Number of `add_string` calls that resolved to this entry.
    pub ref_count: u32,
}
/// Mutable bookkeeping for the pool's backing buffer, guarded by a `Mutex`.
#[derive(Debug)]
struct BufferInfo {
    /// Next free byte offset (allocation cursor).
    current_pos: usize,
    /// Total size of the backing buffer in bytes.
    capacity: usize,
    /// Number of distinct strings stored.
    string_count: usize,
    /// Bytes occupied by string payloads.
    bytes_used: usize,
    /// Fraction of wasted space that triggers compaction.
    /// NOTE(review): initialized to 0.0 and never updated anywhere in this
    /// file, so `compact()` currently always takes the early-return path.
    fragmentation_ratio: f64,
}
/// Tunables for `UnifiedStringPool` construction and growth behavior.
#[derive(Debug, Clone)]
pub struct UnifiedStringPoolConfig {
    /// Size in bytes of the buffer allocated at pool creation.
    pub initial_buffer_size: usize,
    /// Growth ceiling; beyond this the pool tries memory mapping (if enabled).
    pub max_buffer_size: usize,
    /// When true, identical strings share one entry (matched by hash).
    pub enable_deduplication: bool,
    /// Cache placement hint forwarded to the zero-copy layer.
    pub cache_level: CacheLevel,
    /// Fragmentation ratio at or above which `compact()` rebuilds the buffer.
    pub compaction_threshold: f64,
    /// Allow falling back to memory-mapped storage for very large pools.
    pub enable_memory_mapping: bool,
}
impl Default for UnifiedStringPoolConfig {
fn default() -> Self {
Self {
initial_buffer_size: 1024 * 1024, max_buffer_size: 64 * 1024 * 1024, enable_deduplication: true,
cache_level: CacheLevel::L3,
compaction_threshold: 0.3,
enable_memory_mapping: true,
}
}
}
/// Lightweight handle to one string (or substring) inside a pool.
///
/// Holds only metadata plus a shared reference to the pool; the actual bytes
/// are read out of the pool buffer on demand.
#[derive(Debug, Clone)]
pub struct UnifiedStringView {
    /// Offset/length/hash describing the viewed byte range.
    metadata: StringMetadata,
    /// Shared handle to the owning pool (keeps the buffer alive).
    pool_ref: Arc<UnifiedStringPool>,
}
impl UnifiedStringView {
    /// Resolve this view's byte range inside the pool buffer and apply `f` to it.
    ///
    /// Centralizes the lock acquisition and bounds check that `as_str` and
    /// `as_bytes` previously duplicated verbatim.
    fn with_bytes<T>(&self, f: impl FnOnce(&[u8]) -> Result<T>) -> Result<T> {
        let buffer = self.pool_ref.buffer.read()
            .map_err(|_| Error::InvalidOperation("Failed to acquire buffer lock".to_string()))?;
        let start = self.metadata.offset as usize;
        let end = start + self.metadata.length as usize;
        // Guard against stale metadata (e.g. after a future compaction).
        if end > buffer.len() {
            return Err(Error::InvalidOperation("String extends beyond buffer".to_string()));
        }
        f(&buffer.as_slice()[start..end])
    }

    /// Materialize the view as an owned `String`.
    ///
    /// # Errors
    /// Fails if the buffer lock is poisoned, the view extends beyond the
    /// buffer, or the bytes are not valid UTF-8.
    pub fn as_str(&self) -> Result<String> {
        self.with_bytes(|data| {
            let s = str::from_utf8(data)
                .map_err(|e| Error::InvalidOperation(format!("Invalid UTF-8: {}", e)))?;
            Ok(s.to_string())
        })
    }

    /// Copy the viewed bytes into an owned `Vec<u8>` (no UTF-8 validation).
    pub fn as_bytes(&self) -> Result<Vec<u8>> {
        self.with_bytes(|data| Ok(data.to_vec()))
    }

    /// Length of the view in bytes.
    pub fn len(&self) -> usize {
        self.metadata.length as usize
    }

    /// True when the view covers zero bytes.
    pub fn is_empty(&self) -> bool {
        self.metadata.length == 0
    }

    /// Copy of the underlying metadata record.
    pub fn metadata(&self) -> StringMetadata {
        self.metadata
    }

    /// Zero-copy sub-view over `range` (byte indices relative to this view).
    ///
    /// The range is validated against this view's byte length only; slicing in
    /// the middle of a multi-byte UTF-8 sequence is not caught here — `as_str`
    /// on the sub-view will report it. Sub-views carry `hash: 0` and do not
    /// participate in deduplication.
    pub fn substring(&self, range: Range<usize>) -> Result<UnifiedStringView> {
        if range.start > self.len() || range.end > self.len() || range.start > range.end {
            return Err(Error::InvalidOperation("Invalid substring range".to_string()));
        }
        let sub_metadata = StringMetadata {
            offset: self.metadata.offset + range.start as u32,
            length: (range.end - range.start) as u32,
            hash: 0,
            ref_count: 1,
        };
        Ok(UnifiedStringView {
            metadata: sub_metadata,
            pool_ref: Arc::clone(&self.pool_ref),
        })
    }
}
impl std::fmt::Display for UnifiedStringView {
    /// Render the string contents; any retrieval/decoding failure is shown as
    /// a fixed placeholder rather than propagated.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self.as_str() {
            Ok(text) => write!(f, "{}", text),
            Err(_) => write!(f, "<invalid UTF-8>"),
        }
    }
}
impl UnifiedStringPool {
pub fn new(config: UnifiedStringPoolConfig) -> Result<Self> {
let manager = Arc::new(ZeroCopyManager::new()?);
let buffer = manager.create_view(vec![0u8; config.initial_buffer_size])?;
let buffer_info = BufferInfo {
current_pos: 0,
capacity: config.initial_buffer_size,
string_count: 0,
bytes_used: 0,
fragmentation_ratio: 0.0,
};
Ok(Self {
manager,
buffer: Arc::new(RwLock::new(buffer)),
strings: Arc::new(RwLock::new(Vec::new())),
dedup_index: Arc::new(RwLock::new(HashMap::new())),
buffer_info: Arc::new(Mutex::new(buffer_info)),
config,
})
}
pub fn with_default_config() -> Result<Self> {
Self::new(UnifiedStringPoolConfig::default())
}
pub fn add_string(&self, s: &str) -> Result<u32> {
let bytes = s.as_bytes();
let hash = self.hash_string(s);
if self.config.enable_deduplication {
let dedup_index = self.dedup_index.read()
.map_err(|_| Error::InvalidOperation("Failed to acquire dedup index lock".to_string()))?;
if let Some(&existing_id) = dedup_index.get(&hash) {
self.increment_ref_count(existing_id)?;
return Ok(existing_id);
}
}
self.add_new_string(bytes, hash)
}
pub fn add_strings(&self, strings: &[String]) -> Result<Vec<u32>> {
let mut result = Vec::with_capacity(strings.len());
for s in strings {
result.push(self.add_string(s)?);
}
Ok(result)
}
pub fn get_string(&self, string_id: u32) -> Result<UnifiedStringView> {
let strings = self.strings.read()
.map_err(|_| Error::InvalidOperation("Failed to acquire strings lock".to_string()))?;
let metadata = strings.get(string_id as usize)
.ok_or_else(|| Error::InvalidOperation(format!("String ID {} not found", string_id)))?
.clone();
Ok(UnifiedStringView {
metadata,
pool_ref: Arc::new(self.clone()),
})
}
pub fn get_strings(&self, string_ids: &[u32]) -> Result<Vec<UnifiedStringView>> {
let mut result = Vec::with_capacity(string_ids.len());
for &id in string_ids {
result.push(self.get_string(id)?);
}
Ok(result)
}
pub fn get_string_owned(&self, string_id: u32) -> Result<String> {
let view = self.get_string(string_id)?;
Ok(view.as_str()?.to_string())
}
pub fn compact(&self) -> Result<()> {
let buffer_info = self.buffer_info.lock()
.map_err(|_| Error::InvalidOperation("Failed to acquire buffer info lock".to_string()))?;
if buffer_info.fragmentation_ratio < self.config.compaction_threshold {
return Ok(()); }
drop(buffer_info);
self.create_compacted_buffer()
}
pub fn stats(&self) -> Result<UnifiedStringPoolStats> {
let buffer_info = self.buffer_info.lock()
.map_err(|_| Error::InvalidOperation("Failed to acquire buffer info lock".to_string()))?;
let strings = self.strings.read()
.map_err(|_| Error::InvalidOperation("Failed to acquire strings lock".to_string()))?;
let dedup_index = self.dedup_index.read()
.map_err(|_| Error::InvalidOperation("Failed to acquire dedup index lock".to_string()))?;
Ok(UnifiedStringPoolStats {
total_strings: strings.len(),
unique_strings: dedup_index.len(),
total_bytes: buffer_info.bytes_used,
buffer_capacity: buffer_info.capacity,
fragmentation_ratio: buffer_info.fragmentation_ratio,
deduplication_ratio: if strings.len() > 0 {
1.0 - (dedup_index.len() as f64 / strings.len() as f64)
} else {
0.0
},
memory_efficiency: if buffer_info.capacity > 0 {
buffer_info.bytes_used as f64 / buffer_info.capacity as f64
} else {
0.0
},
})
}
fn add_new_string(&self, bytes: &[u8], hash: u64) -> Result<u32> {
let mut buffer_info = self.buffer_info.lock()
.map_err(|_| Error::InvalidOperation("Failed to acquire buffer info lock".to_string()))?;
if buffer_info.current_pos + bytes.len() > buffer_info.capacity {
drop(buffer_info);
self.expand_buffer(bytes.len())?;
buffer_info = self.buffer_info.lock()
.map_err(|_| Error::InvalidOperation("Failed to acquire buffer info lock".to_string()))?;
}
let offset = buffer_info.current_pos as u32;
{
let mut buffer = self.buffer.write()
.map_err(|_| Error::InvalidOperation("Failed to acquire buffer write lock".to_string()))?;
unsafe {
let dest = &mut buffer.as_mut_slice()[buffer_info.current_pos..buffer_info.current_pos + bytes.len()];
dest.copy_from_slice(bytes);
}
}
let metadata = StringMetadata {
offset,
length: bytes.len() as u32,
hash,
ref_count: 1,
};
let string_id = {
let mut strings = self.strings.write()
.map_err(|_| Error::InvalidOperation("Failed to acquire strings write lock".to_string()))?;
let id = strings.len() as u32;
strings.push(metadata);
id
};
if self.config.enable_deduplication {
let mut dedup_index = self.dedup_index.write()
.map_err(|_| Error::InvalidOperation("Failed to acquire dedup index write lock".to_string()))?;
dedup_index.insert(hash, string_id);
}
buffer_info.current_pos += bytes.len();
buffer_info.bytes_used += bytes.len();
buffer_info.string_count += 1;
Ok(string_id)
}
fn expand_buffer(&self, min_additional_size: usize) -> Result<()> {
let new_capacity = {
let buffer_info = self.buffer_info.lock()
.map_err(|_| Error::InvalidOperation("Failed to acquire buffer info lock".to_string()))?;
let current_capacity = buffer_info.capacity;
let required_capacity = buffer_info.current_pos + min_additional_size;
(current_capacity * 2).max(required_capacity)
};
if new_capacity > self.config.max_buffer_size && self.config.enable_memory_mapping {
return self.switch_to_memory_mapping(new_capacity);
}
let new_buffer = self.manager.create_view(vec![0u8; new_capacity])?;
{
let old_buffer = self.buffer.read()
.map_err(|_| Error::InvalidOperation("Failed to acquire old buffer lock".to_string()))?;
let buffer_info = self.buffer_info.lock()
.map_err(|_| Error::InvalidOperation("Failed to acquire buffer info lock".to_string()))?;
unsafe {
let src = old_buffer.as_slice();
let dest = &new_buffer.as_slice()[..buffer_info.current_pos];
}
}
{
let mut buffer = self.buffer.write()
.map_err(|_| Error::InvalidOperation("Failed to acquire buffer write lock".to_string()))?;
let buffer_info = self.buffer_info.lock()
.map_err(|_| Error::InvalidOperation("Failed to acquire buffer info lock".to_string()))?;
unsafe {
let src = buffer.as_slice();
std::ptr::copy_nonoverlapping(
src.as_ptr(),
new_buffer.as_slice().as_ptr() as *mut u8,
buffer_info.current_pos
);
}
*buffer = new_buffer;
}
{
let mut buffer_info = self.buffer_info.lock()
.map_err(|_| Error::InvalidOperation("Failed to acquire buffer info lock".to_string()))?;
buffer_info.capacity = new_capacity;
}
Ok(())
}
fn switch_to_memory_mapping(&self, _required_capacity: usize) -> Result<()> {
Err(Error::NotImplemented("Memory mapping for large string pools".to_string()))
}
fn create_compacted_buffer(&self) -> Result<()> {
Err(Error::NotImplemented("Buffer compaction".to_string()))
}
fn increment_ref_count(&self, string_id: u32) -> Result<()> {
let mut strings = self.strings.write()
.map_err(|_| Error::InvalidOperation("Failed to acquire strings write lock".to_string()))?;
if let Some(metadata) = strings.get_mut(string_id as usize) {
metadata.ref_count += 1;
}
Ok(())
}
fn hash_string(&self, s: &str) -> u64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
s.hash(&mut hasher);
hasher.finish()
}
}
impl Clone for UnifiedStringPool {
fn clone(&self) -> Self {
Self {
manager: Arc::clone(&self.manager),
buffer: Arc::clone(&self.buffer),
strings: Arc::clone(&self.strings),
dedup_index: Arc::clone(&self.dedup_index),
buffer_info: Arc::clone(&self.buffer_info),
config: self.config.clone(),
}
}
}
/// Point-in-time statistics snapshot returned by `UnifiedStringPool::stats`.
#[derive(Debug, Clone)]
pub struct UnifiedStringPoolStats {
    /// Number of stored (distinct) string entries.
    pub total_strings: usize,
    /// Number of entries in the deduplication index.
    pub unique_strings: usize,
    /// Bytes occupied by string payloads.
    pub total_bytes: usize,
    /// Current capacity of the backing buffer in bytes.
    pub buffer_capacity: usize,
    /// Wasted-space fraction reported by the buffer bookkeeping.
    pub fragmentation_ratio: f64,
    /// Fraction of add requests served by deduplication (0.0 = none shared).
    pub deduplication_ratio: f64,
    /// bytes_used / capacity — how full the backing buffer is.
    pub memory_efficiency: f64,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh pool is empty but has a usable buffer.
    #[test]
    fn test_unified_string_pool_creation() {
        let pool = UnifiedStringPool::with_default_config().expect("operation should succeed");
        let snapshot = pool.stats().expect("operation should succeed");
        assert_eq!(snapshot.total_strings, 0);
        assert_eq!(snapshot.unique_strings, 0);
        assert!(snapshot.buffer_capacity > 0);
    }

    /// Adding, deduplicating, and reading back individual strings.
    #[test]
    fn test_string_addition_and_retrieval() {
        let pool = UnifiedStringPool::with_default_config().expect("operation should succeed");
        let hello_id = pool.add_string("hello").expect("operation should succeed");
        let world_id = pool.add_string("world").expect("operation should succeed");
        let hello_again = pool.add_string("hello").expect("operation should succeed");
        assert_ne!(hello_id, world_id);
        assert_eq!(hello_id, hello_again); // duplicate resolves to the same id
        let hello_view = pool.get_string(hello_id).expect("operation should succeed");
        let world_view = pool.get_string(world_id).expect("operation should succeed");
        assert_eq!(hello_view.as_str().expect("operation should succeed"), "hello");
        assert_eq!(world_view.as_str().expect("operation should succeed"), "world");
        let snapshot = pool.stats().expect("operation should succeed");
        assert_eq!(snapshot.total_strings, 2);
    }

    /// Batch add/get round-trips, including a duplicate in the batch.
    #[test]
    fn test_multiple_string_operations() {
        let pool = UnifiedStringPool::with_default_config().expect("operation should succeed");
        let inputs = vec![
            "apple".to_string(),
            "banana".to_string(),
            "cherry".to_string(),
            "apple".to_string(),
        ];
        let ids = pool.add_strings(&inputs).expect("operation should succeed");
        assert_eq!(ids.len(), 4);
        assert_eq!(ids[0], ids[3]); // the repeated "apple" deduplicates
        let views = pool.get_strings(&ids).expect("operation should succeed");
        assert_eq!(views.len(), 4);
        let expected = ["apple", "banana", "cherry", "apple"];
        for (view, want) in views.iter().zip(expected) {
            assert_eq!(view.as_str().expect("operation should succeed"), want);
        }
    }

    /// Substring views slice the shared buffer without copying.
    #[test]
    fn test_zero_copy_substring() {
        let pool = UnifiedStringPool::with_default_config().expect("operation should succeed");
        let id = pool.add_string("hello world").expect("operation should succeed");
        let view = pool.get_string(id).expect("operation should succeed");
        let head = view.substring(0..5).expect("operation should succeed");
        assert_eq!(head.as_str().expect("operation should succeed"), "hello");
        let tail = view.substring(6..11).expect("operation should succeed");
        assert_eq!(tail.as_str().expect("operation should succeed"), "world");
    }

    /// Statistics reflect deduplication and byte usage.
    #[test]
    fn test_pool_statistics() {
        let pool = UnifiedStringPool::with_default_config().expect("operation should succeed");
        pool.add_string("test").expect("operation should succeed");
        pool.add_string("data").expect("operation should succeed");
        pool.add_string("test").expect("operation should succeed");
        let snapshot = pool.stats().expect("operation should succeed");
        assert_eq!(snapshot.total_strings, 2);
        assert_eq!(snapshot.unique_strings, 2);
        assert!(snapshot.total_bytes > 0);
        assert!(snapshot.deduplication_ratio > 0.0);
    }

    /// A tiny initial buffer must grow to accommodate many long strings.
    #[test]
    fn test_buffer_expansion() {
        let mut config = UnifiedStringPoolConfig::default();
        config.initial_buffer_size = 16;
        let pool = UnifiedStringPool::new(config).expect("operation should succeed");
        for i in 0..10 {
            let s = format!("this is a longer string {}", i);
            pool.add_string(&s).expect("operation should succeed");
        }
        let snapshot = pool.stats().expect("operation should succeed");
        assert!(snapshot.buffer_capacity > 16);
        assert_eq!(snapshot.total_strings, 10);
    }
}