use anyhow::{anyhow, Result};
use memmap2::{Mmap, MmapMut, MmapOptions};
use oxirs_core::model::Triple;
use scirs2_core::memory::{BufferPool, GlobalBufferPool, LeakDetectionConfig, LeakDetector};
use serde::{Deserialize, Serialize};
use std::fs::{File, OpenOptions};
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
/// Tunable knobs for the memory-optimization subsystem: mmap-backed storage,
/// adaptive result chunking, buffer pooling, lazy snapshot loading, and leak
/// detection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryOptimizationConfig {
/// Soft overall memory budget in bytes; consulted by
/// `AdaptiveQueryResultChunker::adapt_to_memory_pressure`.
pub memory_limit: usize,
/// Target chunk size in bytes for `AdaptiveQueryResultChunker`.
pub chunk_size: usize,
/// Whether memory-mapped files should be used. Not read within this file;
/// presumably consumed by callers elsewhere.
pub use_mmap: bool,
/// Capacity hint for network buffer pools, in bytes. Not read within this
/// file.
pub buffer_pool_size: usize,
/// Snapshots smaller than this many bytes are cached in memory by
/// `LazySnapshotLoader::load`.
pub lazy_loading_threshold: usize,
/// When true, `MemoryOptimizationManager::new` constructs a `LeakDetector`.
pub enable_leak_detection: bool,
}
impl Default for MemoryOptimizationConfig {
/// Defaults: 1 GiB memory budget, 64 KiB chunks, mmap enabled, 128 MiB
/// buffer pool, 10 MiB lazy-loading threshold, and leak detection only in
/// debug builds.
fn default() -> Self {
Self {
memory_limit: 1024 * 1024 * 1024,
chunk_size: 65536,
use_mmap: true,
buffer_pool_size: 134_217_728,
lazy_loading_threshold: 10_485_760,
enable_leak_detection: cfg!(debug_assertions),
}
}
}
/// Triple store backed by a memory-mapped file.
///
/// `open_read` establishes a read-only view; `open_write` establishes a
/// writable view. Neither clears the other, so use one store instance per
/// access mode (as the tests in this file do).
pub struct MmapTripleStore {
/// Path of the backing file (created empty by `new` if missing).
file_path: PathBuf,
/// Read-only mapping set by `open_read`; `None` for an empty file.
mmap: Option<Mmap>,
/// Writable mapping set by `open_write`.
mmap_mut: Option<MmapMut>,
/// Triple count cached by the last `open_read` or `write_triples`.
triple_count: usize,
#[allow(dead_code)]
config: MemoryOptimizationConfig,
}
impl MmapTripleStore {
/// Creates a store rooted at `file_path`, creating an empty backing file
/// if one does not exist. No mapping is made until `open_read`/`open_write`.
pub fn new<P: AsRef<Path>>(file_path: P, config: MemoryOptimizationConfig) -> Result<Self> {
let file_path = file_path.as_ref().to_path_buf();
if !file_path.exists() {
File::create(&file_path)?;
}
Ok(Self {
file_path,
mmap: None,
mmap_mut: None,
triple_count: 0,
config,
})
}
/// Maps the backing file read-only and decodes the stored triple count.
/// An empty file is treated as an empty store (no mapping, count 0).
pub fn open_read(&mut self) -> Result<()> {
let file = File::open(&self.file_path)?;
if file.metadata()?.len() == 0 {
// Nothing to map or decode for a zero-length file.
self.mmap = None;
self.triple_count = 0;
return Ok(());
}
// SAFETY: memory-mapping a file is undefined behavior if the file is
// truncated or modified by another process while mapped (memmap2
// contract); callers are assumed to hold exclusive access. TODO confirm.
let mmap = unsafe { MmapOptions::new().map(&file)? };
self.triple_count = self.deserialize_triple_count(&mmap)?;
self.mmap = Some(mmap);
Ok(())
}
/// Sizes the backing file to `capacity` bytes (`set_len` extends or
/// truncates as needed) and maps it writable.
pub fn open_write(&mut self, capacity: usize) -> Result<()> {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&self.file_path)?;
file.set_len(capacity as u64)?;
// SAFETY: same caveat as in `open_read` — the mapping assumes no other
// process resizes or writes the file concurrently. TODO confirm.
let mmap_mut = unsafe { MmapOptions::new().map_mut(&file)? };
self.mmap_mut = Some(mmap_mut);
Ok(())
}
/// Encodes `triples` with oxicode into the start of the writable mapping
/// and flushes to disk.
///
/// Errors if `open_write` was not called first, or if the encoded payload
/// exceeds the mapped capacity. Bytes beyond the payload are left
/// untouched; `read_triples` relies on the decoder reporting how many
/// bytes it consumed and ignoring the trailing remainder.
pub fn write_triples(&mut self, triples: &[Triple]) -> Result<()> {
if let Some(ref mut mmap) = self.mmap_mut {
let serialized = oxicode::serde::encode_to_vec(&triples, oxicode::config::standard())?;
if serialized.len() > mmap.len() {
return Err(anyhow!(
"Serialized data size {} exceeds mmap capacity {}",
serialized.len(),
mmap.len()
));
}
mmap[..serialized.len()].copy_from_slice(&serialized);
mmap.flush()?;
self.triple_count = triples.len();
Ok(())
} else {
Err(anyhow!("Mmap not opened for writing"))
}
}
/// Decodes and returns all triples from the read-only mapping, or an
/// empty vector when `open_read` has not established a mapping.
pub fn read_triples(&self) -> Result<Vec<Triple>> {
if let Some(ref mmap) = self.mmap {
let triples: Vec<Triple> =
oxicode::serde::decode_from_slice(mmap, oxicode::config::standard())
.map(|(v, _)| v)?;
Ok(triples)
} else {
Ok(Vec::new())
}
}
/// Returns the count cached by the last `open_read`/`write_triples`.
pub fn get_triple_count(&self) -> usize {
self.triple_count
}
/// Best-effort count: decodes the mapping and returns the vector length.
/// Decode failures are swallowed and reported as 0 — presumably to
/// tolerate corrupt or foreign file contents; note this makes "empty"
/// indistinguishable from "undecodable".
fn deserialize_triple_count(&self, mmap: &Mmap) -> Result<usize> {
if mmap.is_empty() {
return Ok(0);
}
match oxicode::serde::decode_from_slice::<Vec<Triple>, _>(mmap, oxicode::config::standard())
{
Ok((triples, _)) => Ok(triples.len()),
Err(_) => Ok(0),
}
}
/// Flushes the writable mapping to disk, if one is open; no-op otherwise.
pub fn flush(&mut self) -> Result<()> {
if let Some(ref mut mmap) = self.mmap_mut {
mmap.flush()?;
}
Ok(())
}
}
/// Accumulates query-result triples into memory-bounded chunks, adapting the
/// target chunk size to observed memory pressure.
pub struct AdaptiveQueryResultChunker {
/// Triples buffered since the last emitted chunk.
current_chunk: Vec<Triple>,
/// `chunk_size` is the emission threshold and is mutated in place by
/// `adapt_to_memory_pressure`; `memory_limit` sets its adaptation bands.
config: MemoryOptimizationConfig,
/// Running counters; see `ChunkingStats`.
stats: ChunkingStats,
}
/// Counters describing chunking activity.
#[derive(Debug, Default, Clone)]
pub struct ChunkingStats {
/// Chunks emitted, including the final partial chunk when non-empty.
pub chunks_created: usize,
/// Total triples placed into emitted chunks.
pub triples_processed: usize,
/// Mean triples per emitted chunk (`triples_processed / chunks_created`).
pub avg_chunk_size: f64,
/// Number of times `adapt_to_memory_pressure` changed the chunk size.
pub adaptations: usize,
}
impl AdaptiveQueryResultChunker {
/// Rough per-triple heap footprint, in bytes, used for chunk sizing.
/// This is a heuristic constant, not a measured size.
const ESTIMATED_TRIPLE_BYTES: usize = 200;
/// Upper bound the chunk size may grow to under low memory pressure.
const MAX_CHUNK_SIZE: usize = 256 * 1024;
/// Creates a chunker that accumulates triples until the estimated memory
/// of the pending chunk reaches `config.chunk_size` bytes.
pub fn new(config: MemoryOptimizationConfig) -> Result<Self> {
Ok(Self {
current_chunk: Vec::new(),
config,
stats: ChunkingStats::default(),
})
}
/// Appends `triples` to the pending chunk. If the estimated size of the
/// pending chunk now reaches `config.chunk_size`, the whole pending
/// buffer (which may exceed the target) is emitted as one chunk.
/// Returns zero or one chunks.
pub fn add_triples(&mut self, triples: Vec<Triple>) -> Result<Vec<Vec<Triple>>> {
let mut chunks = Vec::new();
self.current_chunk.extend(triples);
let estimated_size = self.estimate_memory_usage(&self.current_chunk);
if estimated_size >= self.config.chunk_size {
let chunk = std::mem::take(&mut self.current_chunk);
self.stats.chunks_created += 1;
self.stats.triples_processed += chunk.len();
chunks.push(chunk);
}
if self.stats.chunks_created > 0 {
self.stats.avg_chunk_size =
self.stats.triples_processed as f64 / self.stats.chunks_created as f64;
}
Ok(chunks)
}
/// Flushes and returns whatever remains in the pending chunk (possibly
/// empty); stats count it as a chunk only when non-empty.
pub fn finalize(&mut self) -> Result<Vec<Triple>> {
let final_chunk = std::mem::take(&mut self.current_chunk);
if !final_chunk.is_empty() {
self.stats.chunks_created += 1;
self.stats.triples_processed += final_chunk.len();
}
Ok(final_chunk)
}
/// Returns a copy of the accumulated chunking statistics.
pub fn get_stats(&self) -> ChunkingStats {
self.stats.clone()
}
/// Heuristic estimate: triple count times a fixed per-triple byte size.
fn estimate_memory_usage(&self, triples: &[Triple]) -> usize {
triples.len() * Self::ESTIMATED_TRIPLE_BYTES
}
/// Halves the chunk size when available memory drops below half of the
/// configured limit; grows it by 3/2 (capped at `MAX_CHUNK_SIZE`) when
/// more than three quarters of the limit is available.
///
/// The chunk size is floored at 1 byte so repeated shrinking can never
/// reach 0, which would make `add_triples` emit a (possibly empty) chunk
/// on every call.
pub fn adapt_to_memory_pressure(&mut self, available_memory: usize) {
if available_memory < self.config.memory_limit / 2 {
// Halve, but never reach zero.
self.config.chunk_size = (self.config.chunk_size / 2).max(1);
self.stats.adaptations += 1;
} else if available_memory > self.config.memory_limit * 3 / 4 {
self.config.chunk_size = (self.config.chunk_size * 3 / 2).min(Self::MAX_CHUNK_SIZE);
self.stats.adaptations += 1;
}
}
}
/// Buffer pool for network I/O: a shared global pool handle plus a
/// mutex-guarded local `BufferPool<u8>` that serves acquisitions.
pub struct NetworkBufferPool {
/// Handle to the process-global pool, exposed via `get_pool`; it is not
/// used for acquisitions in this file.
pool: Arc<GlobalBufferPool>,
/// Local pool that actually serves `acquire` calls.
local_pool: Arc<std::sync::Mutex<BufferPool<u8>>>,
/// Running counters; see `BufferPoolStats`.
stats: BufferPoolStats,
#[allow(dead_code)]
config: MemoryOptimizationConfig,
}
/// Counters describing buffer-pool usage.
#[derive(Debug, Default, Clone)]
pub struct BufferPoolStats {
/// Total `acquire` calls.
pub acquisitions: usize,
/// Total `release` calls.
pub releases: usize,
/// Buffers acquired but not yet released.
pub active_buffers: usize,
/// Approximate reuse ratio; see `NetworkBufferPool::acquire` for how it
/// is derived.
pub hit_ratio: f64,
}
impl NetworkBufferPool {
/// Creates a pool pairing a process-global `GlobalBufferPool` handle with
/// a locally locked `BufferPool<u8>` used for actual acquisitions.
pub fn new(config: MemoryOptimizationConfig) -> Result<Self> {
let pool = GlobalBufferPool::new();
let local_pool = Arc::new(std::sync::Mutex::new(BufferPool::<u8>::new()));
Ok(Self {
pool: Arc::new(pool),
local_pool,
stats: BufferPoolStats::default(),
config,
})
}
/// Acquires a buffer of `size` bytes from the local pool and updates
/// stats.
///
/// `hit_ratio` is an approximation: "hits" are computed as acquisitions
/// minus currently-active buffers (effectively the release count), not
/// true pool-reuse hits.
pub fn acquire(&mut self, size: usize) -> Result<Vec<u8>> {
// Hold the mutex only for the duration of the pool call.
let buffer = {
let mut pool = self
.local_pool
.lock()
.expect("local_pool lock should not be poisoned");
pool.acquire_vec(size)
};
self.stats.acquisitions += 1;
self.stats.active_buffers += 1;
// `acquisitions` was incremented above, so it is >= 1 here and the
// division cannot be by zero (the previous `> 0` guard was redundant).
let hits = self.stats.acquisitions - self.stats.active_buffers;
self.stats.hit_ratio = hits as f64 / self.stats.acquisitions as f64;
Ok(buffer)
}
/// Records a release. The buffer is dropped rather than returned to the
/// pool. TODO(review): consider handing it back to `local_pool` so
/// future acquisitions can reuse the allocation.
pub fn release(&mut self, _buffer: Vec<u8>) {
self.stats.releases += 1;
self.stats.active_buffers = self.stats.active_buffers.saturating_sub(1);
}
/// Returns a copy of the pool statistics.
pub fn get_stats(&self) -> BufferPoolStats {
self.stats.clone()
}
/// Returns a handle to the shared global buffer pool.
pub fn get_pool(&self) -> Arc<GlobalBufferPool> {
Arc::clone(&self.pool)
}
}
/// Loads snapshot files from disk, caching small ones
/// (below `config.lazy_loading_threshold`) in memory so repeated full loads
/// skip the filesystem.
pub struct LazySnapshotLoader {
/// Path of the snapshot file to read.
snapshot_path: PathBuf,
/// In-memory copy of the snapshot; present only for small files.
lazy_data: Option<Arc<RwLock<Vec<u8>>>>,
/// True when `lazy_data` holds a cached copy.
is_loaded: bool,
config: MemoryOptimizationConfig,
/// Running counters; see `LazyLoadingStats`.
stats: LazyLoadingStats,
}
/// Counters describing snapshot loading activity.
#[derive(Debug, Default, Clone)]
pub struct LazyLoadingStats {
/// Full-file loads that hit the filesystem.
pub loads: usize,
/// Total bytes read from disk (full and partial loads combined).
pub bytes_loaded: usize,
/// Milliseconds spent loading; see `load`/`load_partial` for how it is
/// updated.
pub load_time_ms: u64,
/// Partial (offset/length) loads performed.
pub partial_loads: usize,
}
impl LazySnapshotLoader {
pub fn new<P: AsRef<Path>>(snapshot_path: P, config: MemoryOptimizationConfig) -> Result<Self> {
Ok(Self {
snapshot_path: snapshot_path.as_ref().to_path_buf(),
lazy_data: None,
is_loaded: false,
config,
stats: LazyLoadingStats::default(),
})
}
pub async fn load(&mut self) -> Result<Vec<u8>> {
let start = std::time::Instant::now();
if self.is_loaded {
if let Some(ref data) = self.lazy_data {
return Ok(data.read().await.clone());
}
}
let mut file = File::open(&self.snapshot_path)?;
let metadata = file.metadata()?;
let file_size = metadata.len() as usize;
let mut buffer = Vec::with_capacity(file_size);
file.read_to_end(&mut buffer)?;
self.stats.loads += 1;
self.stats.bytes_loaded += buffer.len();
self.stats.load_time_ms = start.elapsed().as_millis() as u64;
if buffer.len() < self.config.lazy_loading_threshold {
self.lazy_data = Some(Arc::new(RwLock::new(buffer.clone())));
self.is_loaded = true;
}
Ok(buffer)
}
pub async fn load_partial(&mut self, offset: usize, length: usize) -> Result<Vec<u8>> {
let start = std::time::Instant::now();
let file = File::open(&self.snapshot_path)?;
let mut reader = BufReader::new(file);
std::io::copy(
&mut reader.by_ref().take(offset as u64),
&mut std::io::sink(),
)?;
let mut buffer = vec![0u8; length];
reader.read_exact(&mut buffer)?;
self.stats.partial_loads += 1;
self.stats.bytes_loaded += buffer.len();
self.stats.load_time_ms += start.elapsed().as_millis() as u64;
Ok(buffer)
}
pub fn get_stats(&self) -> LazyLoadingStats {
self.stats.clone()
}
pub fn is_loaded(&self) -> bool {
self.is_loaded
}
pub fn unload(&mut self) {
self.lazy_data = None;
self.is_loaded = false;
}
}
/// Central registry tying together mmap stores, buffer pools, and an
/// optional leak detector.
pub struct MemoryOptimizationManager {
#[allow(dead_code)]
config: MemoryOptimizationConfig,
/// Present only when `config.enable_leak_detection` was set at build time.
leak_detector: Option<LeakDetector>,
/// Registered mmap-backed triple stores (currently only counted/tracked).
mmap_stores: Vec<Arc<RwLock<MmapTripleStore>>>,
/// Registered network buffer pools (currently only counted/tracked).
buffer_pools: Vec<Arc<RwLock<NetworkBufferPool>>>,
/// Running counters; see `MemoryOptimizationStats`.
stats: MemoryOptimizationStats,
}
/// Aggregate counters reported by `MemoryOptimizationManager::get_stats`.
#[derive(Debug, Default, Clone)]
pub struct MemoryOptimizationStats {
/// Bytes saved by optimizations. Never incremented in this file.
pub memory_saved: usize,
/// Number of registered `MmapTripleStore`s.
pub active_mmap_stores: usize,
/// Number of registered `NetworkBufferPool`s.
pub active_buffer_pools: usize,
/// Leaks found by `check_leaks`. Never incremented in this file.
pub leaks_detected: usize,
}
impl MemoryOptimizationManager {
/// Builds the manager, constructing a `LeakDetector` when
/// `config.enable_leak_detection` is set (defaults to debug builds only).
pub fn new(config: MemoryOptimizationConfig) -> Result<Self> {
let leak_detector = if config.enable_leak_detection {
let leak_config = LeakDetectionConfig {
enabled: true,
// Flag growth above 10 MiB observed within a 60 s window.
growth_threshold_bytes: 10 * 1024 * 1024, detection_window: Duration::from_secs(60),
// Sample 10% of allocations (field name is scirs2's spelling).
samplingrate: 0.1,
collect_call_stacks: true,
max_tracked_allocations: 10000,
enable_external_profilers: false,
profiler_tools: Vec::new(),
enable_periodic_checks: true,
check_interval: Duration::from_secs(10),
// Quieter behavior in release builds; detailed in debug builds.
production_mode: !cfg!(debug_assertions),
};
Some(
LeakDetector::new(leak_config)
.map_err(|e| anyhow!("Failed to create leak detector: {}", e))?,
)
} else {
None
};
Ok(Self {
config,
leak_detector,
mmap_stores: Vec::new(),
buffer_pools: Vec::new(),
stats: MemoryOptimizationStats::default(),
})
}
/// Tracks an mmap store and bumps the corresponding stat counter.
pub fn register_mmap_store(&mut self, store: Arc<RwLock<MmapTripleStore>>) {
self.mmap_stores.push(store);
self.stats.active_mmap_stores += 1;
}
/// Tracks a buffer pool and bumps the corresponding stat counter.
pub fn register_buffer_pool(&mut self, pool: Arc<RwLock<NetworkBufferPool>>) {
self.buffer_pools.push(pool);
self.stats.active_buffer_pools += 1;
}
/// Placeholder leak check: currently a no-op even when a detector exists.
/// TODO(review): wire this up to the detector's reporting API and update
/// `stats.leaks_detected`.
pub fn check_leaks(&mut self) -> Result<()> {
if let Some(ref _detector) = self.leak_detector {
}
Ok(())
}
/// Returns a copy of the aggregate statistics.
pub fn get_stats(&self) -> MemoryOptimizationStats {
self.stats.clone()
}
/// Returns the tracked memory savings. Currently just echoes the counter;
/// nothing in this file ever increments `stats.memory_saved`.
pub fn calculate_memory_saved(&mut self) -> usize {
self.stats.memory_saved
}
}
#[cfg(test)]
mod tests {
use super::*;
use oxirs_core::model::{Literal, NamedNode};
use std::env;
/// Builds the single subject/predicate/object triple used as a fixture.
fn create_test_triple() -> Triple {
Triple::new(
NamedNode::new("http://example.org/subject").unwrap(),
NamedNode::new("http://example.org/predicate").unwrap(),
Literal::new_simple_literal("object"),
)
}
// Round-trip: write one triple via a writable mapping, reopen the file
// read-only with a fresh store, and verify both the data and the count.
#[test]
fn test_mmap_triple_store_basic() -> Result<()> {
let temp_dir = env::temp_dir();
let file_path = temp_dir.join("test_mmap_triples.bin");
let _ = std::fs::remove_file(&file_path);
let config = MemoryOptimizationConfig::default();
let mut store = MmapTripleStore::new(&file_path, config)?;
let triples = vec![create_test_triple()];
store.open_write(10 * 1024 * 1024)?; store.write_triples(&triples)?;
store.flush()?;
drop(store);
let mut store = MmapTripleStore::new(&file_path, MemoryOptimizationConfig::default())?;
store.open_read()?;
let loaded_triples = store.read_triples()?;
assert_eq!(loaded_triples.len(), 1);
assert_eq!(store.get_triple_count(), 1);
std::fs::remove_file(&file_path)?;
Ok(())
}
// Two triples are estimated at ~400 bytes, which meets the 400-byte
// chunk target, so exactly one chunk must be emitted immediately.
#[test]
fn test_adaptive_chunking() -> Result<()> {
let config = MemoryOptimizationConfig {
chunk_size: 400, ..Default::default()
};
let mut chunker = AdaptiveQueryResultChunker::new(config)?;
let triples = vec![create_test_triple(), create_test_triple()];
let chunks = chunker.add_triples(triples)?;
assert_eq!(chunks.len(), 1);
let stats = chunker.get_stats();
assert_eq!(stats.chunks_created, 1);
assert_eq!(stats.triples_processed, 2);
Ok(())
}
// Acquire/release bookkeeping: one acquisition then one release.
#[test]
fn test_buffer_pool_basic() -> Result<()> {
let config = MemoryOptimizationConfig::default();
let mut pool = NetworkBufferPool::new(config)?;
let buffer1 = pool.acquire(1024)?;
assert_eq!(buffer1.len(), 1024);
let stats = pool.get_stats();
assert_eq!(stats.acquisitions, 1);
assert_eq!(stats.active_buffers, 1);
pool.release(buffer1);
assert_eq!(pool.get_stats().releases, 1);
Ok(())
}
// A snapshot smaller than the lazy-loading threshold must be cached in
// memory after the first load, and unload must drop that cache.
#[tokio::test]
async fn test_lazy_snapshot_loader() -> Result<()> {
let temp_dir = env::temp_dir();
let snapshot_path = temp_dir.join("test_snapshot.bin");
let test_data = b"test snapshot data";
std::fs::write(&snapshot_path, test_data)?;
let config = MemoryOptimizationConfig::default();
let mut loader = LazySnapshotLoader::new(&snapshot_path, config)?;
let data = loader.load().await?;
assert_eq!(&data, test_data);
assert!(loader.is_loaded());
let stats = loader.get_stats();
assert_eq!(stats.loads, 1);
assert_eq!(stats.bytes_loaded, test_data.len());
loader.unload();
assert!(!loader.is_loaded());
std::fs::remove_file(&snapshot_path)?;
Ok(())
}
// Partial load must return exactly `length` bytes starting at `offset`.
#[tokio::test]
async fn test_lazy_snapshot_partial_load() -> Result<()> {
let temp_dir = env::temp_dir();
let snapshot_path = temp_dir.join("test_snapshot_partial.bin");
let test_data = b"0123456789abcdefghijklmnopqrstuvwxyz";
std::fs::write(&snapshot_path, test_data)?;
let config = MemoryOptimizationConfig::default();
let mut loader = LazySnapshotLoader::new(&snapshot_path, config)?;
let partial = loader.load_partial(5, 10).await?;
assert_eq!(&partial, b"56789abcde");
let stats = loader.get_stats();
assert_eq!(stats.partial_loads, 1);
assert_eq!(stats.bytes_loaded, 10);
std::fs::remove_file(&snapshot_path)?;
Ok(())
}
// Registering one store and one pool must be reflected in the stats, and
// the (currently no-op) leak check must succeed.
#[test]
fn test_memory_optimization_manager() -> Result<()> {
let config = MemoryOptimizationConfig::default();
let mut manager = MemoryOptimizationManager::new(config.clone())?;
let temp_dir = env::temp_dir();
let file_path = temp_dir.join("test_manager_mmap.bin");
let _ = std::fs::remove_file(&file_path);
let store = Arc::new(RwLock::new(MmapTripleStore::new(
&file_path,
config.clone(),
)?));
manager.register_mmap_store(store);
let pool = Arc::new(RwLock::new(NetworkBufferPool::new(config)?));
manager.register_buffer_pool(pool);
let stats = manager.get_stats();
assert_eq!(stats.active_mmap_stores, 1);
assert_eq!(stats.active_buffer_pools, 1);
manager.check_leaks()?;
let _ = std::fs::remove_file(&file_path);
Ok(())
}
// Memory pressure below limit/2 halves the chunk size (1000 -> 500);
// headroom above 3/4 of the limit grows it by 3/2 (500 -> 750).
#[test]
fn test_chunking_adaptation() -> Result<()> {
let config = MemoryOptimizationConfig {
chunk_size: 1000,
memory_limit: 10000,
..Default::default()
};
let mut chunker = AdaptiveQueryResultChunker::new(config.clone())?;
chunker.adapt_to_memory_pressure(3000);
assert_eq!(chunker.config.chunk_size, 500);
assert_eq!(chunker.get_stats().adaptations, 1);
chunker.adapt_to_memory_pressure(9000);
assert_eq!(chunker.config.chunk_size, 750);
assert_eq!(chunker.get_stats().adaptations, 2);
Ok(())
}
// The derived hit ratio must always stay within [0, 1].
#[test]
fn test_buffer_pool_hit_ratio() -> Result<()> {
let config = MemoryOptimizationConfig::default();
let mut pool = NetworkBufferPool::new(config)?;
for _ in 0..10 {
let _buffer = pool.acquire(1024)?;
}
let stats = pool.get_stats();
assert_eq!(stats.acquisitions, 10);
assert!(stats.hit_ratio >= 0.0 && stats.hit_ratio <= 1.0);
Ok(())
}
// Opening a brand-new (zero-length) file for reading must behave as an
// empty store rather than an error.
#[test]
fn test_mmap_store_empty_file() -> Result<()> {
let temp_dir = env::temp_dir();
let file_path = temp_dir.join("test_empty_mmap.bin");
let _ = std::fs::remove_file(&file_path);
let config = MemoryOptimizationConfig::default();
let mut store = MmapTripleStore::new(&file_path, config)?;
store.open_read()?;
let triples = store.read_triples()?;
assert!(triples.is_empty());
assert_eq!(store.get_triple_count(), 0);
std::fs::remove_file(&file_path)?;
Ok(())
}
}