use crate::error::{FusekiError, FusekiResult};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, Semaphore};
use tracing::{debug, info, instrument, warn};
use scirs2_core::memory::{
create_optimized_pool, global_buffer_pool, track_allocation, track_deallocation,
AdvancedBufferPool, GlobalBufferPool,
};
#[cfg(feature = "memory_management")]
use scirs2_core::memory::{LeakDetectionConfig, LeakDetector};
/// Configuration for the server's pooled-memory subsystem.
///
/// Sizes are in bytes unless the field name says otherwise. A
/// `max_memory_bytes` of 0 disables pressure checks entirely (see
/// `MemoryManager::is_under_pressure`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryPoolConfig {
    /// Master switch for pooling.
    /// NOTE(review): not consulted anywhere in this file — confirm callers honor it.
    pub enabled: bool,
    /// Hard memory ceiling used as the denominator of the pressure ratio.
    pub max_memory_bytes: u64,
    /// Fraction of `max_memory_bytes` above which allocations trigger GC
    /// and may be rejected (e.g. 0.85 = 85%).
    pub pressure_threshold: f64,
    /// Maximum number of `QueryContext` objects retained in the pool.
    pub query_context_pool_size: usize,
    /// Maximum number of `PooledBuffer` objects retained in the pool.
    pub result_buffer_pool_size: usize,
    /// Small buffer tier size.
    /// NOTE(review): only `medium_buffer_size` is used for pre-warming in
    /// this file — confirm the small/large tiers are consumed elsewhere.
    pub small_buffer_size: usize,
    /// Medium buffer tier size; used to pre-warm the buffer pool.
    pub medium_buffer_size: usize,
    /// Large buffer tier size.
    pub large_buffer_size: usize,
    /// Slice size used by `MemoryManager::process_in_chunks`.
    pub chunk_size_bytes: usize,
    /// Enables memory profiling.
    /// NOTE(review): not read anywhere in this file — verify against callers.
    pub enable_profiling: bool,
    /// Period of the background GC task, in seconds.
    pub gc_interval_secs: u64,
}
impl Default for MemoryPoolConfig {
    /// Production-oriented defaults: 8 GiB ceiling, 85% pressure threshold,
    /// buffer tiers of 4 KiB / 64 KiB / 1 MiB, and a 60-second GC interval.
    fn default() -> Self {
        MemoryPoolConfig {
            enabled: true,
            max_memory_bytes: 8_589_934_592, // 8 GiB
            pressure_threshold: 0.85,
            query_context_pool_size: 1000,
            result_buffer_pool_size: 500,
            small_buffer_size: 4 * 1024,      // 4 KiB
            medium_buffer_size: 64 * 1024,    // 64 KiB
            large_buffer_size: 1024 * 1024,   // 1 MiB
            chunk_size_bytes: 1024 * 1024,    // 1 MiB chunks for streaming
            enable_profiling: true,
            gc_interval_secs: 60,
        }
    }
}
/// Point-in-time snapshot of the memory manager's counters, produced by
/// `MemoryManager::get_stats` for monitoring endpoints.
#[derive(Debug, Clone, Serialize)]
pub struct MemoryStats {
    /// Cumulative bytes recorded as allocated.
    pub total_allocated: u64,
    /// Cumulative bytes recorded as deallocated.
    pub total_deallocated: u64,
    /// Currently tracked usage in bytes.
    pub current_usage: u64,
    /// High-water mark of `current_usage` in bytes.
    pub peak_usage: u64,
    /// Query contexts currently checked out of the pool.
    pub active_objects: usize,
    /// Query contexts currently sitting in the pool.
    pub pooled_objects: usize,
    /// hits / (hits + misses) across pool lookups; 0.0 when no requests yet.
    pub pool_hit_ratio: f64,
    /// current_usage / max_memory_bytes; 0.0 when the limit is disabled.
    pub memory_pressure: f64,
    /// Number of completed GC passes.
    pub gc_runs: u64,
    /// Wall-clock duration of the most recent GC pass, in milliseconds.
    pub last_gc_duration_ms: u64,
}
/// A reusable byte buffer handed out by the buffer pool.
///
/// `capacity` records the size the buffer was requested with; callers may
/// grow the backing `Vec` past it through [`PooledBuffer::data_mut`].
pub struct PooledBuffer {
    data: Vec<u8>,
    capacity: usize,
}

impl PooledBuffer {
    /// Allocates an empty buffer with `capacity` bytes reserved up front.
    fn new(capacity: usize) -> Self {
        Self {
            data: Vec::with_capacity(capacity),
            capacity,
        }
    }

    /// The capacity this buffer was created with (the requested size, not
    /// necessarily `Vec::capacity`).
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Empties the contents while keeping the allocation for reuse.
    pub fn clear(&mut self) {
        self.data.clear();
    }

    /// Read-only view of the current contents.
    pub fn data(&self) -> &[u8] {
        self.data.as_slice()
    }

    /// Mutable access to the backing vector for writing results.
    pub fn data_mut(&mut self) -> &mut Vec<u8> {
        &mut self.data
    }
}
/// A generic async object pool for recyclable per-query state.
///
/// Objects are handed out FIFO; when the pool is empty, `factory` builds a
/// fresh one. At most `max_size` objects are retained on release.
pub struct QueryContextPool<T> {
    pool: Arc<RwLock<VecDeque<T>>>,
    factory: Arc<dyn Fn() -> T + Send + Sync>,
    max_size: usize,
    total_created: AtomicU64,
    total_reused: AtomicU64,
}

impl<T: Send + 'static> QueryContextPool<T> {
    /// Builds an empty pool capped at `max_size` retained objects; `factory`
    /// is invoked on demand whenever the pool runs dry.
    pub fn new<F>(max_size: usize, factory: F) -> Arc<Self>
    where
        F: Fn() -> T + Send + Sync + 'static,
    {
        Arc::new(Self {
            pool: Arc::new(RwLock::new(VecDeque::with_capacity(max_size))),
            factory: Arc::new(factory),
            max_size,
            total_created: AtomicU64::new(0),
            total_reused: AtomicU64::new(0),
        })
    }

    /// Checks an object out of the pool, or builds a new one via the
    /// factory. The write lock is released before the factory runs.
    pub async fn acquire(&self) -> T {
        let recycled = self.pool.write().await.pop_front();
        match recycled {
            Some(obj) => {
                self.total_reused.fetch_add(1, Ordering::Relaxed);
                obj
            }
            None => {
                self.total_created.fetch_add(1, Ordering::Relaxed);
                (self.factory)()
            }
        }
    }

    /// Returns an object to the pool; silently drops it when the pool is
    /// already at `max_size`.
    pub async fn release(&self, obj: T) {
        let mut slots = self.pool.write().await;
        if slots.len() < self.max_size {
            slots.push_back(obj);
        }
    }

    /// Snapshot of `(total_created, total_reused, currently_pooled)`.
    pub async fn stats(&self) -> (u64, u64, usize) {
        (
            self.total_created.load(Ordering::Relaxed),
            self.total_reused.load(Ordering::Relaxed),
            self.pool.read().await.len(),
        )
    }
}
/// Central memory manager: object/buffer pooling, usage accounting,
/// pressure detection, and a periodic background GC task.
pub struct MemoryManager {
    /// Pool sizing, pressure thresholds, and GC timing.
    config: MemoryPoolConfig,
    /// Recyclable per-query scratch state (FIFO reuse).
    query_context_pool: Arc<RwLock<VecDeque<QueryContext>>>,
    /// Recyclable result buffers, matched by exact capacity on allocation.
    buffer_pool: Arc<RwLock<VecDeque<PooledBuffer>>>,
    /// SciRS2 typed buffer pool.
    /// NOTE(review): constructed but not used by any method in this file.
    scirs2_buffer_pool: Arc<AdvancedBufferPool<u8>>,
    /// Handle to the process-wide SciRS2 buffer pool.
    /// NOTE(review): also unused by methods here — confirm intended wiring.
    scirs2_global_pool: &'static GlobalBufferPool,
    /// SciRS2 leak detector (feature-gated).
    #[cfg(feature = "memory_management")]
    leak_detector: Arc<LeakDetector>,
    /// Currently tracked usage in bytes.
    current_usage: Arc<AtomicU64>,
    /// High-water mark of `current_usage`.
    peak_usage: Arc<AtomicU64>,
    /// Cumulative bytes recorded as allocated.
    total_allocated: Arc<AtomicU64>,
    /// Cumulative bytes recorded as deallocated.
    total_deallocated: Arc<AtomicU64>,
    /// Pool lookups satisfied from the pool.
    pool_hits: Arc<AtomicU64>,
    /// Pool lookups that fell through to fresh construction.
    pool_misses: Arc<AtomicU64>,
    /// Query contexts currently checked out.
    active_objects: Arc<AtomicUsize>,
    /// Completed GC passes.
    gc_runs: Arc<AtomicU64>,
    /// Wall-clock time of the most recent GC pass.
    last_gc_time: Arc<RwLock<Instant>>,
    /// Duration of the most recent GC pass, in milliseconds.
    last_gc_duration: Arc<AtomicU64>,
}
/// Per-query scratch state handed out by the memory manager's context pool.
#[derive(Clone)]
pub struct QueryContext {
    /// Unique identifier (UUID v4 string); regenerated on `reset`.
    pub id: String,
    /// Scratch byte buffer, pre-allocated to 4 KiB by `new`.
    pub buffer: Vec<u8>,
    /// Free-form key/value metadata pairs attached to the query.
    pub metadata: Vec<(String, String)>,
}
impl QueryContext {
    /// Creates a fresh context with a random UUID id and a 4 KiB
    /// pre-reserved scratch buffer.
    pub fn new() -> Self {
        let id = uuid::Uuid::new_v4().to_string();
        Self {
            id,
            buffer: Vec::with_capacity(4096),
            metadata: Vec::new(),
        }
    }

    /// Prepares a recycled context for reuse: wipes the buffer and metadata
    /// (keeping their allocations) and assigns a brand-new id.
    pub fn reset(&mut self) {
        self.buffer.clear();
        self.metadata.clear();
        self.id = uuid::Uuid::new_v4().to_string();
    }
}
impl Default for QueryContext {
    /// Equivalent to [`QueryContext::new`]: fresh UUID and empty buffers.
    fn default() -> Self {
        Self::new()
    }
}
impl MemoryManager {
    /// Constructs the manager, pre-warming both pools to half capacity,
    /// initializing the SciRS2 components, and spawning the background GC
    /// task.
    ///
    /// # Errors
    /// Returns an internal error if the SciRS2 `LeakDetector` cannot be
    /// created (only compiled with the `memory_management` feature).
    ///
    /// NOTE(review): `start_gc_loop` calls `tokio::spawn`, so this function
    /// must run inside a Tokio runtime or it will panic.
    pub fn new(config: MemoryPoolConfig) -> FusekiResult<Arc<Self>> {
        // Pre-warm each pool to half its configured size so the first wave
        // of queries hits the pool instead of the allocator.
        let mut query_pool = VecDeque::with_capacity(config.query_context_pool_size);
        for _ in 0..config.query_context_pool_size / 2 {
            query_pool.push_back(QueryContext::new());
        }
        let mut buffer_pool = VecDeque::with_capacity(config.result_buffer_pool_size);
        for _ in 0..config.result_buffer_pool_size / 2 {
            buffer_pool.push_back(PooledBuffer::new(config.medium_buffer_size));
        }
        let scirs2_buffer_pool = Arc::new(create_optimized_pool::<u8>());
        let scirs2_global_pool = global_buffer_pool();
        #[cfg(feature = "memory_management")]
        let leak_detector = Arc::new(
            LeakDetector::new(LeakDetectionConfig::default()).map_err(|e| {
                FusekiError::internal(format!("Failed to create LeakDetector: {}", e))
            })?,
        );
        info!(
            "SciRS2-Core memory components initialized: AdvancedBufferPool (limit: {}MB)",
            config.max_memory_bytes / 1_048_576
        );
        let manager = Arc::new(MemoryManager {
            config,
            query_context_pool: Arc::new(RwLock::new(query_pool)),
            buffer_pool: Arc::new(RwLock::new(buffer_pool)),
            scirs2_buffer_pool,
            scirs2_global_pool,
            #[cfg(feature = "memory_management")]
            leak_detector,
            current_usage: Arc::new(AtomicU64::new(0)),
            peak_usage: Arc::new(AtomicU64::new(0)),
            total_allocated: Arc::new(AtomicU64::new(0)),
            total_deallocated: Arc::new(AtomicU64::new(0)),
            pool_hits: Arc::new(AtomicU64::new(0)),
            pool_misses: Arc::new(AtomicU64::new(0)),
            active_objects: Arc::new(AtomicUsize::new(0)),
            gc_runs: Arc::new(AtomicU64::new(0)),
            last_gc_time: Arc::new(RwLock::new(Instant::now())),
            last_gc_duration: Arc::new(AtomicU64::new(0)),
        });
        // The spawned GC task holds an Arc<Self>, so the manager stays alive
        // for the lifetime of the runtime.
        manager.clone().start_gc_loop();
        info!(
            "Memory manager initialized with {}MB max memory",
            manager.config.max_memory_bytes / 1_048_576
        );
        Ok(manager)
    }

    /// Hands out a buffer with exactly `size` bytes of requested capacity,
    /// preferring a pooled one. Pair each successful call with
    /// [`Self::release_buffer`] so usage accounting stays balanced.
    ///
    /// # Errors
    /// `service_unavailable` when memory pressure remains above the
    /// threshold even after a GC pass.
    #[instrument(skip(self))]
    pub async fn allocate_buffer(&self, size: usize) -> FusekiResult<PooledBuffer> {
        if self.is_under_pressure().await {
            self.run_gc().await?;
            if self.is_under_pressure().await {
                return Err(FusekiError::service_unavailable(
                    "Memory pressure too high, allocation rejected",
                ));
            }
        }
        // Only an exact capacity match is reused; larger pooled buffers are
        // deliberately left in place for callers of their own size class.
        let buffer = {
            let mut pool = self.buffer_pool.write().await;
            pool.iter()
                .position(|b| b.capacity() == size)
                .and_then(|idx| pool.remove(idx))
        };
        let buffer = if let Some(mut buf) = buffer {
            self.pool_hits.fetch_add(1, Ordering::Relaxed);
            buf.clear();
            buf
        } else {
            self.pool_misses.fetch_add(1, Ordering::Relaxed);
            PooledBuffer::new(size)
        };
        self.track_allocation(size as u64);
        Ok(buffer)
    }

    /// Returns a buffer obtained from [`Self::allocate_buffer`], recycling
    /// it into the pool when there is room and recording the deallocation.
    ///
    /// Without this counterpart nothing ever decremented `current_usage` or
    /// incremented `total_deallocated`, so measured pressure ratcheted
    /// upward monotonically until every allocation was rejected.
    #[instrument(skip(self, buffer))]
    pub async fn release_buffer(&self, mut buffer: PooledBuffer) {
        let size = buffer.capacity() as u64;
        {
            let mut pool = self.buffer_pool.write().await;
            if pool.len() < self.config.result_buffer_pool_size {
                buffer.clear();
                pool.push_back(buffer);
            }
            // Pool full: buffer is dropped here, freeing its allocation.
        }
        self.track_deallocation(size);
    }

    /// Checks out a `QueryContext`, reusing (and resetting) a pooled one
    /// when available; otherwise constructs a fresh one.
    #[instrument(skip(self))]
    pub async fn acquire_query_context(&self) -> QueryContext {
        let mut pool = self.query_context_pool.write().await;
        if let Some(mut context) = pool.pop_front() {
            self.pool_hits.fetch_add(1, Ordering::Relaxed);
            context.reset();
            self.active_objects.fetch_add(1, Ordering::Relaxed);
            debug!("Reused query context from pool");
            return context;
        }
        self.pool_misses.fetch_add(1, Ordering::Relaxed);
        self.active_objects.fetch_add(1, Ordering::Relaxed);
        debug!("Created new query context");
        QueryContext::new()
    }

    /// Returns a context to the pool (or drops it when the pool is full) and
    /// decrements the active-object count either way.
    #[instrument(skip(self, context))]
    pub async fn release_query_context(&self, context: QueryContext) {
        let mut pool = self.query_context_pool.write().await;
        if pool.len() < self.config.query_context_pool_size {
            pool.push_back(context);
            debug!("Returned query context to pool");
        } else {
            debug!("Dropped query context (pool full)");
        }
        self.active_objects.fetch_sub(1, Ordering::Relaxed);
    }

    /// Allocates a plain `Vec` sized for `total_size`; the chunked
    /// processing itself happens in [`Self::process_in_chunks`].
    pub fn create_chunked_buffer(&self, total_size: usize) -> Vec<u8> {
        Vec::with_capacity(total_size)
    }

    /// Configured chunk size used by [`Self::process_in_chunks`].
    pub fn get_chunk_size(&self) -> usize {
        self.config.chunk_size_bytes
    }

    /// Vends a scratch vector of the requested capacity.
    ///
    /// TODO(review): currently bypasses `scirs2_buffer_pool` entirely; wire
    /// this to the SciRS2 pool once its acquire API is confirmed.
    #[instrument(skip(self))]
    pub fn acquire_scirs2_buffer(&self, size: usize) -> Vec<u8> {
        Vec::with_capacity(size)
    }

    /// Drops the buffer.
    ///
    /// TODO(review): intended to hand buffers back to the SciRS2 pool; see
    /// `acquire_scirs2_buffer`.
    #[instrument(skip(self, _buffer))]
    pub fn release_scirs2_buffer(&self, _buffer: Vec<u8>) {}

    /// Leak check for `memory_management` builds.
    ///
    /// TODO(review): always reports "no leaks" even though `leak_detector`
    /// is constructed in `new`; hook it up once the `LeakDetector` API is
    /// confirmed.
    #[cfg(feature = "memory_management")]
    #[instrument(skip(self))]
    pub async fn check_memory_leaks(&self) -> FusekiResult<bool> {
        Ok(false)
    }

    /// Leak check for builds without `memory_management`; always reports no
    /// leaks.
    #[cfg(not(feature = "memory_management"))]
    #[instrument(skip(self))]
    pub async fn check_memory_leaks(&self) -> FusekiResult<bool> {
        Ok(false)
    }

    /// Feeds `data` to `processor` in `chunk_size_bytes`-sized slices,
    /// stopping at the first error.
    ///
    /// # Errors
    /// Propagates the first error returned by `processor`.
    pub async fn process_in_chunks<F>(&self, data: &[u8], mut processor: F) -> FusekiResult<()>
    where
        F: FnMut(&[u8]) -> FusekiResult<()>,
    {
        let chunk_size = self.get_chunk_size();
        for chunk in data.chunks(chunk_size) {
            processor(chunk)?;
        }
        Ok(())
    }

    /// Records an allocation of `size` bytes and maintains the peak-usage
    /// high-water mark.
    fn track_allocation(&self, size: u64) {
        self.total_allocated.fetch_add(size, Ordering::Relaxed);
        let current = self.current_usage.fetch_add(size, Ordering::Relaxed) + size;
        let mut peak = self.peak_usage.load(Ordering::Relaxed);
        // Lock-free max update: retry until we either raise the peak or
        // observe one at least as large as `current`.
        while current > peak {
            match self.peak_usage.compare_exchange_weak(
                peak,
                current,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(new_peak) => peak = new_peak,
            }
        }
    }

    /// Records a deallocation of `size` bytes. Saturates at zero so a
    /// double release cannot wrap `current_usage` around to a huge value.
    fn track_deallocation(&self, size: u64) {
        self.total_deallocated.fetch_add(size, Ordering::Relaxed);
        let mut current = self.current_usage.load(Ordering::Relaxed);
        loop {
            let next = current.saturating_sub(size);
            match self.current_usage.compare_exchange_weak(
                current,
                next,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(observed) => current = observed,
            }
        }
    }

    /// Whether tracked usage exceeds the configured pressure threshold.
    /// A `max_memory_bytes` of 0 disables pressure checks entirely.
    async fn is_under_pressure(&self) -> bool {
        if self.config.max_memory_bytes == 0 {
            return false;
        }
        let current = self.current_usage.load(Ordering::Relaxed);
        let pressure = (current as f64) / (self.config.max_memory_bytes as f64);
        pressure > self.config.pressure_threshold
    }

    /// Current usage as a fraction of `max_memory_bytes` (0.0 when the
    /// limit is disabled).
    pub async fn get_memory_pressure(&self) -> f64 {
        if self.config.max_memory_bytes == 0 {
            return 0.0;
        }
        let current = self.current_usage.load(Ordering::Relaxed);
        (current as f64) / (self.config.max_memory_bytes as f64)
    }

    /// Spawns the periodic GC task. The task owns an `Arc<Self>` and runs
    /// until the runtime shuts down.
    fn start_gc_loop(self: Arc<Self>) {
        tokio::spawn(async move {
            let mut interval =
                tokio::time::interval(Duration::from_secs(self.config.gc_interval_secs));
            loop {
                interval.tick().await;
                if let Err(e) = self.run_gc().await {
                    warn!("GC failed: {}", e);
                }
            }
        });
    }

    /// One GC pass: leak check, shrink both pools back to half capacity,
    /// then record timing and metrics.
    #[instrument(skip(self))]
    async fn run_gc(&self) -> FusekiResult<()> {
        let start = Instant::now();
        debug!("Starting garbage collection with SciRS2 leak detection");
        if let Ok(has_leaks) = self.check_memory_leaks().await {
            if has_leaks {
                warn!("Memory leaks detected during GC - triggering thorough cleanup");
            } else {
                debug!("No memory leaks detected");
            }
        }
        {
            let mut pool = self.query_context_pool.write().await;
            let target_size = self.config.query_context_pool_size / 2;
            let before_size = pool.len();
            while pool.len() > target_size {
                pool.pop_back();
            }
            debug!(
                "Query context pool: {} -> {} objects",
                before_size,
                pool.len()
            );
        }
        {
            // Mirror the context-pool trim for result buffers; previously
            // the buffer pool was never shrunk by GC.
            let mut pool = self.buffer_pool.write().await;
            let target_size = self.config.result_buffer_pool_size / 2;
            let before_size = pool.len();
            while pool.len() > target_size {
                pool.pop_back();
            }
            debug!("Buffer pool: {} -> {} objects", before_size, pool.len());
        }
        debug!("SciRS2 AdvancedBufferPool auto-manages cleanup");
        let current = self.current_usage.load(Ordering::Relaxed);
        let peak = self.peak_usage.load(Ordering::Relaxed);
        debug!(
            "Memory Metrics - Current: {}MB, Peak: {}MB",
            current / 1_048_576,
            peak / 1_048_576
        );
        let duration = start.elapsed();
        self.gc_runs.fetch_add(1, Ordering::Relaxed);
        self.last_gc_duration
            .store(duration.as_millis() as u64, Ordering::Relaxed);
        *self.last_gc_time.write().await = Instant::now();
        // as_secs_f64 keeps the fractional part the "{:.2}" format implies;
        // as_millis() would truncate to a whole number.
        info!(
            "GC completed in {:.2}ms (run #{})",
            duration.as_secs_f64() * 1000.0,
            self.gc_runs.load(Ordering::Relaxed)
        );
        Ok(())
    }

    /// Point-in-time snapshot of all counters for monitoring endpoints.
    pub async fn get_stats(&self) -> MemoryStats {
        let current = self.current_usage.load(Ordering::Relaxed);
        let peak = self.peak_usage.load(Ordering::Relaxed);
        let allocated = self.total_allocated.load(Ordering::Relaxed);
        let deallocated = self.total_deallocated.load(Ordering::Relaxed);
        let hits = self.pool_hits.load(Ordering::Relaxed);
        let misses = self.pool_misses.load(Ordering::Relaxed);
        let total_requests = hits + misses;
        let hit_ratio = if total_requests > 0 {
            (hits as f64) / (total_requests as f64)
        } else {
            0.0
        };
        let pressure = if self.config.max_memory_bytes > 0 {
            (current as f64) / (self.config.max_memory_bytes as f64)
        } else {
            0.0
        };
        let pooled = self.query_context_pool.read().await.len();
        let active = self.active_objects.load(Ordering::Relaxed);
        MemoryStats {
            total_allocated: allocated,
            total_deallocated: deallocated,
            current_usage: current,
            peak_usage: peak,
            active_objects: active,
            pooled_objects: pooled,
            pool_hit_ratio: hit_ratio,
            memory_pressure: pressure,
            gc_runs: self.gc_runs.load(Ordering::Relaxed),
            last_gc_duration_ms: self.last_gc_duration.load(Ordering::Relaxed),
        }
    }

    /// Triggers a GC pass immediately (admin/diagnostics hook).
    pub async fn force_gc(&self) -> FusekiResult<()> {
        info!("Forcing garbage collection");
        self.run_gc().await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Construction should succeed under defaults and start with zero usage.
    #[tokio::test]
    async fn test_memory_manager_creation() {
        let config = MemoryPoolConfig::default();
        let manager = MemoryManager::new(config);
        assert!(manager.is_ok());
        let manager = manager.unwrap();
        let stats = manager.get_stats().await;
        assert_eq!(stats.current_usage, 0);
    }

    // Acquire -> release -> acquire should yield usable contexts with ids.
    #[tokio::test]
    async fn test_query_context_pooling() {
        let config = MemoryPoolConfig::default();
        let manager = MemoryManager::new(config).unwrap();
        let context = manager.acquire_query_context().await;
        assert!(!context.id.is_empty());
        manager.release_query_context(context).await;
        let context2 = manager.acquire_query_context().await;
        assert!(!context2.id.is_empty());
    }

    // With nothing allocated, pressure should be effectively zero even with
    // a small 1 MiB limit.
    #[tokio::test]
    async fn test_memory_pressure() {
        let config = MemoryPoolConfig {
            max_memory_bytes: 1024 * 1024,
            pressure_threshold: 0.8,
            ..Default::default()
        };
        let manager = MemoryManager::new(config).unwrap();
        let pressure = manager.get_memory_pressure().await;
        assert!(pressure < 0.1);
    }

    // Buffer allocation should hand back a buffer of the requested capacity.
    #[tokio::test]
    async fn test_buffer_allocation() {
        let config = MemoryPoolConfig::default();
        let manager = MemoryManager::new(config).unwrap();
        let buffer = manager.allocate_buffer(4096).await;
        assert!(buffer.is_ok());
        let buffer = buffer.unwrap();
        assert_eq!(buffer.capacity(), 4096);
    }
}