use crate::error::{NumRs2Error, Result};
#[allow(unused_imports)]
use std::alloc::{alloc, dealloc, Layout};
use std::collections::{HashMap, VecDeque};
use std::fs::{File, OpenOptions};
#[allow(unused_imports)]
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
#[allow(unused_imports)]
use std::ptr::NonNull;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::{SystemTime, UNIX_EPOCH};
/// Tunable settings for large-scale memory management and disk spilling.
#[derive(Debug, Clone)]
pub struct LargeScaleConfig {
    /// Memory budget in bytes. A value of `0` disables spilling decisions.
    pub max_memory_usage: usize,
    /// Fraction of `max_memory_usage` above which spilling is recommended.
    pub spill_threshold: f64,
    /// Default chunk size in bytes used when iterating over data in chunks.
    pub chunk_size: usize,
    /// Directory that receives spill files; created when a manager is built.
    pub temp_dir: PathBuf,
    /// Whether a manager starts a background thread that discards stale spills.
    pub background_cleanup: bool,
    /// Base monitoring interval in milliseconds; the background cleanup
    /// thread runs one sweep every ten such intervals.
    pub monitor_interval_ms: u64,
    /// Statistics toggle.
    // NOTE(review): nothing in this file reads this flag — confirm its consumer.
    pub enable_stats: bool,
}

impl Default for LargeScaleConfig {
    /// Returns the default configuration: an 8 GiB budget, spilling above 80 %
    /// of it, 64 MiB chunks, a `numrs_temp` directory under the system temp
    /// directory, background cleanup on, and a one second monitor interval.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        const GIB: usize = 1024 * MIB;

        Self {
            // 8 GiB does not fit in a 32-bit `usize`; saturate instead of
            // overflowing so the default also builds on 32-bit targets.
            max_memory_usage: 8usize.saturating_mul(GIB),
            spill_threshold: 0.8,
            chunk_size: 64 * MIB,
            temp_dir: std::env::temp_dir().join("numrs_temp"),
            background_cleanup: true,
            monitor_interval_ms: 1000,
            enable_stats: true,
        }
    }
}
/// Atomic counters describing memory that callers have reported as allocated.
///
/// The tracker does not allocate anything itself; it only aggregates the
/// sizes passed to [`MemoryTracker::record_allocation`] and
/// [`MemoryTracker::record_deallocation`].
#[derive(Debug)]
pub struct MemoryTracker {
    /// Bytes currently reported as in use.
    current_usage: AtomicUsize,
    /// Highest value `current_usage` has reached since the last reset.
    peak_usage: AtomicUsize,
    /// Allocations recorded and not yet released.
    active_allocations: AtomicUsize,
    /// Number of allocation events recorded since the last reset.
    total_allocations: AtomicUsize,
    /// Number of deallocation events recorded since the last reset.
    total_deallocations: AtomicUsize,
    /// When `false`, allocation and deallocation events are ignored.
    enabled: AtomicBool,
}

impl Default for MemoryTracker {
    /// Equivalent to [`MemoryTracker::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl MemoryTracker {
    /// Creates an enabled tracker with every counter at zero.
    pub fn new() -> Self {
        Self {
            current_usage: AtomicUsize::new(0),
            peak_usage: AtomicUsize::new(0),
            active_allocations: AtomicUsize::new(0),
            total_allocations: AtomicUsize::new(0),
            total_deallocations: AtomicUsize::new(0),
            enabled: AtomicBool::new(true),
        }
    }

    /// Atomically adds `amount` to `counter`, clamping at `usize::MAX`, and
    /// returns the resulting value.
    fn add_saturating(counter: &AtomicUsize, amount: usize) -> usize {
        let previous = counter
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
                Some(value.saturating_add(amount))
            })
            // The closure never returns `None`, so this arm only unifies types.
            .unwrap_or_else(|unchanged| unchanged);
        previous.saturating_add(amount)
    }

    /// Atomically subtracts `amount` from `counter`, clamping at zero.
    fn sub_saturating(counter: &AtomicUsize, amount: usize) {
        let _ = counter.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
            Some(value.saturating_sub(amount))
        });
    }

    /// Records an allocation of `size` bytes and updates the peak.
    ///
    /// Does nothing while the tracker is disabled.
    pub fn record_allocation(&self, size: usize) {
        if !self.enabled.load(Ordering::Relaxed) {
            return;
        }
        let new_usage = Self::add_saturating(&self.current_usage, size);
        self.peak_usage.fetch_max(new_usage, Ordering::SeqCst);
        self.active_allocations.fetch_add(1, Ordering::SeqCst);
        self.total_allocations.fetch_add(1, Ordering::SeqCst);
    }

    /// Records the release of `size` bytes.
    ///
    /// Does nothing while the tracker is disabled. The usage and active
    /// counters clamp at zero: a deallocation whose allocation was never
    /// recorded (tracker disabled at the time, or counters reset in between)
    /// must not wrap the counters around to a huge value.
    pub fn record_deallocation(&self, size: usize) {
        if !self.enabled.load(Ordering::Relaxed) {
            return;
        }
        Self::sub_saturating(&self.current_usage, size);
        Self::sub_saturating(&self.active_allocations, 1);
        self.total_deallocations.fetch_add(1, Ordering::SeqCst);
    }

    /// Bytes currently reported as in use.
    pub fn current_usage(&self) -> usize {
        self.current_usage.load(Ordering::SeqCst)
    }

    /// Highest usage observed since creation or the last reset.
    pub fn peak_usage(&self) -> usize {
        self.peak_usage.load(Ordering::SeqCst)
    }

    /// Number of allocations recorded and not yet released.
    pub fn active_allocations(&self) -> usize {
        self.active_allocations.load(Ordering::SeqCst)
    }

    /// Returns a snapshot of all counters.
    ///
    /// Each counter is loaded separately, so the snapshot is not one atomic
    /// view when other threads are recording events concurrently.
    pub fn get_stats(&self) -> MemoryStats {
        MemoryStats {
            current_usage: self.current_usage(),
            peak_usage: self.peak_usage(),
            active_allocations: self.active_allocations(),
            total_allocations: self.total_allocations.load(Ordering::SeqCst),
            total_deallocations: self.total_deallocations.load(Ordering::SeqCst),
        }
    }

    /// Sets every counter back to zero; the enabled flag is left untouched.
    pub fn reset_stats(&self) {
        let counters = [
            &self.current_usage,
            &self.peak_usage,
            &self.active_allocations,
            &self.total_allocations,
            &self.total_deallocations,
        ];
        for counter in counters.iter() {
            counter.store(0, Ordering::SeqCst);
        }
    }

    /// Turns event recording on or off.
    pub fn set_enabled(&self, enabled: bool) {
        self.enabled.store(enabled, Ordering::SeqCst);
    }
}

/// Point-in-time copy of the counters held by a [`MemoryTracker`].
#[derive(Debug, Clone)]
pub struct MemoryStats {
    /// Bytes reported as in use when the snapshot was taken.
    pub current_usage: usize,
    /// Highest usage observed up to the snapshot.
    pub peak_usage: usize,
    /// Allocations recorded and not yet released.
    pub active_allocations: usize,
    /// Allocation events recorded.
    pub total_allocations: usize,
    /// Deallocation events recorded.
    pub total_deallocations: usize,
}
/// Record of one block of data that has been written to a file on disk.
///
/// The record owns its file: dropping the record removes the file
/// (best effort, errors are ignored).
#[derive(Debug)]
pub struct SpilledData {
    /// Location of the backing file.
    path: PathBuf,
    /// Size in bytes supplied when the record was created.
    size: usize,
    /// Time at which the record was created.
    #[allow(dead_code)]
    created_at: SystemTime,
    /// Time of creation or of the most recent `touch`/`load`.
    last_accessed: SystemTime,
    /// Set once the record has been explicitly marked for removal.
    marked_for_cleanup: bool,
}

impl SpilledData {
    /// Creates a record for the file at `path` holding `size` bytes.
    ///
    /// The file itself is not created or checked here.
    pub fn new(path: PathBuf, size: usize) -> Self {
        let now = SystemTime::now();
        Self {
            path,
            size,
            created_at: now,
            last_accessed: now,
            marked_for_cleanup: false,
        }
    }

    /// Path of the backing file.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Size in bytes recorded at creation.
    pub fn size(&self) -> usize {
        self.size
    }

    /// Updates the last-access time to now.
    pub fn touch(&mut self) {
        self.last_accessed = SystemTime::now();
    }

    /// Reports whether this record may be discarded.
    ///
    /// Returns `true` when the record was marked for cleanup, or when more
    /// than `max_age_seconds` whole seconds have passed since the last access.
    /// A last-access time that lies before the Unix epoch yields `false`.
    pub fn is_eligible_for_cleanup(&self, max_age_seconds: u64) -> bool {
        if self.marked_for_cleanup {
            return true;
        }
        let last_access_secs = match self.last_accessed.duration_since(UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_secs(),
            Err(_) => return false,
        };
        let now_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        // The wall clock can step backwards, leaving `last_accessed` in the
        // future; treat that as age zero instead of underflowing.
        now_secs.saturating_sub(last_access_secs) > max_age_seconds
    }

    /// Flags the record so that it is always eligible for cleanup.
    pub fn mark_for_cleanup(&mut self) {
        self.marked_for_cleanup = true;
    }

    /// Reads the whole backing file into memory and refreshes the access time.
    ///
    /// # Errors
    ///
    /// Returns an error when the file cannot be opened or read.
    pub fn load(&mut self) -> Result<Vec<u8>> {
        self.touch();
        let mut file = File::open(&self.path)?;
        // The recorded size is only a capacity hint; `read_to_end` grows the
        // buffer if the file turns out to be larger.
        let mut contents = Vec::with_capacity(self.size);
        file.read_to_end(&mut contents)?;
        Ok(contents)
    }
}

impl Drop for SpilledData {
    /// Deletes the backing file; a failure (e.g. file already gone) is ignored.
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.path);
    }
}
pub struct LargeScaleManager {
config: LargeScaleConfig,
tracker: Arc<MemoryTracker>,
spilled_data: Arc<RwLock<HashMap<String, SpilledData>>>,
cleanup_queue: Arc<Mutex<VecDeque<String>>>,
cleanup_thread: Option<thread::JoinHandle<()>>,
shutdown_signal: Arc<AtomicBool>,
next_spill_id: AtomicUsize,
}
impl LargeScaleManager {
pub fn new(config: LargeScaleConfig) -> Result<Self> {
std::fs::create_dir_all(&config.temp_dir)?;
let tracker = Arc::new(MemoryTracker::new());
let spilled_data = Arc::new(RwLock::new(HashMap::new()));
let cleanup_queue = Arc::new(Mutex::new(VecDeque::new()));
let shutdown_signal = Arc::new(AtomicBool::new(false));
let mut manager = Self {
config,
tracker,
spilled_data,
cleanup_queue,
cleanup_thread: None,
shutdown_signal,
next_spill_id: AtomicUsize::new(0),
};
if manager.config.background_cleanup {
manager.start_background_cleanup();
}
Ok(manager)
}
pub fn should_spill(&self) -> bool {
if self.config.max_memory_usage == 0 {
return false; }
let current_usage = self.tracker.current_usage();
let threshold =
(self.config.max_memory_usage as f64 * self.config.spill_threshold) as usize;
current_usage > threshold
}
pub fn spill_data(&self, data: &[u8], id: Option<String>) -> Result<String> {
let spill_id = id.unwrap_or_else(|| {
format!(
"spill_{}",
self.next_spill_id.fetch_add(1, Ordering::SeqCst)
)
});
let spill_path = self.config.temp_dir.join(format!("{}.tmp", spill_id));
let mut file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&spill_path)?;
file.write_all(data)?;
file.sync_all()?;
let spilled = SpilledData::new(spill_path, data.len());
{
let mut registry = self
.spilled_data
.write()
.expect("spilled_data RwLock should not be poisoned");
registry.insert(spill_id.clone(), spilled);
}
{
let mut queue = self
.cleanup_queue
.lock()
.expect("cleanup_queue mutex should not be poisoned");
queue.push_back(spill_id.clone());
}
Ok(spill_id)
}
pub fn load_spilled_data(&self, spill_id: &str) -> Result<Vec<u8>> {
let mut registry = self
.spilled_data
.write()
.expect("spilled_data RwLock should not be poisoned");
if let Some(spilled) = registry.get_mut(spill_id) {
spilled.load()
} else {
Err(NumRs2Error::InvalidOperation(format!(
"Spilled data '{}' not found",
spill_id
)))
}
}
pub fn remove_spilled_data(&self, spill_id: &str) -> Result<()> {
let mut registry = self
.spilled_data
.write()
.expect("spilled_data RwLock should not be poisoned");
if registry.remove(spill_id).is_some() {
let mut queue = self
.cleanup_queue
.lock()
.expect("cleanup_queue mutex should not be poisoned");
queue.retain(|id| id != spill_id);
Ok(())
} else {
Err(NumRs2Error::InvalidOperation(format!(
"Spilled data '{}' not found",
spill_id
)))
}
}
pub fn list_spilled_data(&self) -> Vec<String> {
let registry = self
.spilled_data
.read()
.expect("spilled_data RwLock should not be poisoned");
registry.keys().cloned().collect()
}
pub fn get_memory_stats(&self) -> MemoryStats {
self.tracker.get_stats()
}
pub fn get_spill_stats(&self) -> SpillStats {
let registry = self
.spilled_data
.read()
.expect("spilled_data RwLock should not be poisoned");
let total_spilled_size: usize = registry.values().map(|s| s.size()).sum();
let spilled_count = registry.len();
SpillStats {
total_spilled_size,
spilled_count,
temp_dir: self.config.temp_dir.clone(),
}
}
pub fn cleanup_spilled_data(&self, max_age_seconds: u64) -> usize {
let mut registry = self
.spilled_data
.write()
.expect("spilled_data RwLock should not be poisoned");
let mut queue = self
.cleanup_queue
.lock()
.expect("cleanup_queue mutex should not be poisoned");
let mut cleaned_up = 0;
let mut to_remove = Vec::new();
for (id, spilled) in registry.iter_mut() {
if spilled.is_eligible_for_cleanup(max_age_seconds) {
spilled.mark_for_cleanup();
to_remove.push(id.clone());
cleaned_up += 1;
}
}
for id in &to_remove {
registry.remove(id);
queue.retain(|queue_id| queue_id != id);
}
cleaned_up
}
pub fn force_cleanup_all(&self) {
let mut registry = self
.spilled_data
.write()
.expect("spilled_data RwLock should not be poisoned");
let mut queue = self
.cleanup_queue
.lock()
.expect("cleanup_queue mutex should not be poisoned");
registry.clear();
queue.clear();
}
fn start_background_cleanup(&mut self) {
let spilled_data = Arc::clone(&self.spilled_data);
let cleanup_queue = Arc::clone(&self.cleanup_queue);
let shutdown_signal = Arc::clone(&self.shutdown_signal);
let monitor_interval = self.config.monitor_interval_ms;
let handle = thread::spawn(move || {
let cleanup_interval = std::time::Duration::from_millis(monitor_interval * 10); let max_age_seconds = 3600;
while !shutdown_signal.load(Ordering::Relaxed) {
thread::sleep(cleanup_interval);
let mut registry = spilled_data
.write()
.expect("spilled_data RwLock should not be poisoned");
let mut queue = cleanup_queue
.lock()
.expect("cleanup_queue mutex should not be poisoned");
let mut to_remove = Vec::new();
for (id, spilled) in registry.iter_mut() {
if spilled.is_eligible_for_cleanup(max_age_seconds) {
spilled.mark_for_cleanup();
to_remove.push(id.clone());
}
}
for id in &to_remove {
registry.remove(id);
queue.retain(|queue_id| queue_id != id);
}
drop(registry);
drop(queue);
}
});
self.cleanup_thread = Some(handle);
}
pub fn tracker(&self) -> &Arc<MemoryTracker> {
&self.tracker
}
pub fn config(&self) -> &LargeScaleConfig {
&self.config
}
pub fn chunk_iterator<'a, T>(
&self,
data: &'a [T],
chunk_size: Option<usize>,
) -> ChunkIterator<'a, T> {
let chunk_size = chunk_size.unwrap_or(self.config.chunk_size / std::mem::size_of::<T>());
ChunkIterator::new(data, chunk_size)
}
}
impl Drop for LargeScaleManager {
fn drop(&mut self) {
self.shutdown_signal.store(true, Ordering::SeqCst);
if let Some(handle) = self.cleanup_thread.take() {
let _ = handle.join();
}
self.force_cleanup_all();
}
}
/// Summary of the spills registered with a manager at one point in time.
#[derive(Debug, Clone)]
pub struct SpillStats {
/// Sum of the recorded sizes, in bytes, of all registered spills.
pub total_spilled_size: usize,
/// Number of registered spills.
pub spilled_count: usize,
/// Directory configured for spill files.
pub temp_dir: PathBuf,
}
/// Iterator over consecutive, non-overlapping sub-slices of a slice.
///
/// Every chunk has `chunk_size` elements except possibly the last, which
/// holds whatever remains.
pub struct ChunkIterator<'a, T> {
    /// The slice being split.
    data: &'a [T],
    /// Maximum number of elements per chunk (at least 1 when built via `new`).
    chunk_size: usize,
    /// Index of the first element of the next chunk.
    current_index: usize,
}

impl<'a, T> ChunkIterator<'a, T> {
    /// Creates an iterator over `data` in chunks of `chunk_size` elements.
    ///
    /// A `chunk_size` of `0` is treated as `1` so iteration always advances.
    pub fn new(data: &'a [T], chunk_size: usize) -> Self {
        Self {
            data,
            chunk_size: chunk_size.max(1),
            current_index: 0,
        }
    }
}

impl<'a, T> Iterator for ChunkIterator<'a, T> {
    type Item = &'a [T];

    /// Returns the next chunk, or `None` once the slice is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        let data: &'a [T] = self.data;
        if self.current_index >= data.len() {
            return None;
        }
        // Saturating: a very large `chunk_size` must clamp to the slice end
        // rather than overflow the index arithmetic.
        let end_index = self
            .current_index
            .saturating_add(self.chunk_size)
            .min(data.len());
        let chunk = &data[self.current_index..end_index];
        self.current_index = end_index;
        Some(chunk)
    }

    /// Exact number of chunks still to be yielded, so that `collect()` can
    /// allocate once.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.data.len().saturating_sub(self.current_index);
        let step = self.chunk_size.max(1);
        let chunks = remaining / step + usize::from(remaining % step != 0);
        (chunks, Some(chunks))
    }
}
// Process-wide slot for the optional global manager. It starts out as `None`
// and is filled by `init_global_manager`; the `RwLock` guards access to it.
lazy_static::lazy_static! {
static ref GLOBAL_MANAGER: std::sync::RwLock<Option<LargeScaleManager>> = std::sync::RwLock::new(None);
}
/// Installs the global manager, built from `config`.
///
/// # Errors
///
/// Returns `InvalidOperation` when a global manager is already installed, or
/// whatever error building the manager produces.
///
/// # Panics
///
/// Panics if the global lock is poisoned.
pub fn init_global_manager(config: LargeScaleConfig) -> Result<()> {
    let mut slot = GLOBAL_MANAGER
        .write()
        .expect("GLOBAL_MANAGER RwLock should not be poisoned");
    if slot.is_none() {
        // Built while holding the lock so two racing initializers cannot
        // both succeed.
        let manager = LargeScaleManager::new(config)?;
        *slot = Some(manager);
        Ok(())
    } else {
        Err(NumRs2Error::InvalidOperation(
            "Global large-scale manager already initialized".to_string(),
        ))
    }
}
/// Runs `f` with shared access to the global manager and returns its result.
///
/// The global read lock is held for the duration of `f`.
///
/// # Errors
///
/// Returns `InvalidOperation` when no global manager has been installed.
///
/// # Panics
///
/// Panics if the global lock is poisoned.
pub fn with_global_manager<F, R>(f: F) -> Result<R>
where
    F: FnOnce(&LargeScaleManager) -> R,
{
    let guard = GLOBAL_MANAGER
        .read()
        .expect("GLOBAL_MANAGER RwLock should not be poisoned");
    match guard.as_ref() {
        Some(manager) => Ok(f(manager)),
        None => Err(NumRs2Error::InvalidOperation(
            "Global large-scale manager not initialized. Call init_global_manager() first."
                .to_string(),
        )),
    }
}
/// Reports whether the global manager recommends spilling.
///
/// Returns `false` when no global manager has been installed.
pub fn should_spill_globally() -> bool {
    match with_global_manager(|manager| manager.should_spill()) {
        Ok(recommended) => recommended,
        Err(_) => false,
    }
}
/// Runs the fallible closure `f` with exclusive access to the global manager
/// and returns its result unchanged.
///
/// The global write lock is held for the duration of `f`.
///
/// # Errors
///
/// Returns `InvalidOperation` when no global manager has been installed;
/// otherwise returns whatever `f` returns.
///
/// # Panics
///
/// Panics if the global lock is poisoned.
pub fn with_global_manager_mut<F, R>(f: F) -> Result<R>
where
    F: FnOnce(&mut LargeScaleManager) -> Result<R>,
{
    let mut guard = GLOBAL_MANAGER
        .write()
        .expect("GLOBAL_MANAGER RwLock should not be poisoned");
    match guard.as_mut() {
        Some(manager) => f(manager),
        None => Err(NumRs2Error::InvalidOperation(
            "Global large-scale manager not initialized. Call init_global_manager() first."
                .to_string(),
        )),
    }
}
pub fn spill_data_globally(data: &[u8], id: Option<String>) -> Result<String> {
with_global_manager_mut(|manager| manager.spill_data(data, id))
}
pub fn load_spilled_data_globally(spill_id: &str) -> Result<Vec<u8>> {
with_global_manager_mut(|manager| manager.load_spilled_data(spill_id))
}
pub fn get_global_memory_stats() -> Result<MemoryStats> {
with_global_manager(|manager| manager.get_memory_stats())
}
pub fn get_global_spill_stats() -> Result<SpillStats> {
with_global_manager(|manager| manager.get_spill_stats())
}
// Unit tests for the tracker, the manager, the chunk iterator and spill records.
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
// Checks that allocation/deallocation events update usage, peak and
// event counters, and that a reset zeroes usage and peak.
#[test]
fn test_memory_tracker() {
let tracker = MemoryTracker::new();
tracker.record_allocation(1000);
assert_eq!(tracker.current_usage(), 1000);
assert_eq!(tracker.active_allocations(), 1);
tracker.record_allocation(500);
assert_eq!(tracker.current_usage(), 1500);
assert_eq!(tracker.active_allocations(), 2);
assert_eq!(tracker.peak_usage(), 1500);
// The peak must survive a deallocation.
tracker.record_deallocation(500);
assert_eq!(tracker.current_usage(), 1000);
assert_eq!(tracker.active_allocations(), 1);
assert_eq!(tracker.peak_usage(), 1500);
let stats = tracker.get_stats();
assert_eq!(stats.current_usage, 1000);
assert_eq!(stats.peak_usage, 1500);
assert_eq!(stats.active_allocations, 1);
assert_eq!(stats.total_allocations, 2);
assert_eq!(stats.total_deallocations, 1);
tracker.reset_stats();
assert_eq!(tracker.current_usage(), 0);
assert_eq!(tracker.peak_usage(), 0);
}
// Exercises the spill threshold and a spill / load / remove round trip,
// with background cleanup disabled so no thread is started.
#[test]
fn test_large_scale_manager() -> Result<()> {
let temp_dir = std::env::temp_dir().join("test_large_scale");
let config = LargeScaleConfig {
max_memory_usage: 1000,
spill_threshold: 0.5,
temp_dir: temp_dir.clone(),
background_cleanup: false,
..Default::default()
};
let manager = LargeScaleManager::new(config)?;
// Threshold is 1000 * 0.5 = 500: 600 is above it, 400 is below.
manager.tracker().record_allocation(600); assert!(manager.should_spill());
manager.tracker().record_deallocation(200); assert!(!manager.should_spill());
let test_data = b"Hello, world! This is test data for spilling.";
let spill_id = manager.spill_data(test_data, Some("test_spill".to_string()))?;
assert_eq!(spill_id, "test_spill");
let loaded_data = manager.load_spilled_data(&spill_id)?;
assert_eq!(loaded_data, test_data);
let spill_stats = manager.get_spill_stats();
assert_eq!(spill_stats.spilled_count, 1);
assert_eq!(spill_stats.total_spilled_size, test_data.len());
manager.remove_spilled_data(&spill_id)?;
let spill_stats = manager.get_spill_stats();
assert_eq!(spill_stats.spilled_count, 0);
// Best-effort removal of the test directory.
let _ = fs::remove_dir_all(&temp_dir);
Ok(())
}
// Ten elements in chunks of three: three full chunks plus a remainder of one.
#[test]
fn test_chunk_iterator() {
let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let chunk_iter = ChunkIterator::new(&data, 3);
let chunks: Vec<&[i32]> = chunk_iter.collect();
assert_eq!(chunks.len(), 4);
assert_eq!(chunks[0], &[1, 2, 3]);
assert_eq!(chunks[1], &[4, 5, 6]);
assert_eq!(chunks[2], &[7, 8, 9]);
assert_eq!(chunks[3], &[10]);
}
// Checks loading, cleanup eligibility and that dropping a record deletes its file.
#[test]
fn test_spilled_data() {
let temp_dir = std::env::temp_dir();
let spill_path = temp_dir.join("test_spill.tmp");
let test_data = b"Test spilled data";
std::fs::write(&spill_path, test_data).expect("writing test data should succeed");
let mut spilled = SpilledData::new(spill_path.clone(), test_data.len());
let loaded = spilled.load().expect("loading spilled data should succeed");
assert_eq!(loaded, test_data);
// Freshly accessed, so not yet older than an hour.
assert!(!spilled.is_eligible_for_cleanup(3600));
spilled.mark_for_cleanup();
assert!(spilled.is_eligible_for_cleanup(0));
// Dropping the record must remove the backing file.
drop(spilled);
assert!(!spill_path.exists());
}
}