#![allow(unexpected_cfgs)]
#[path = "common/mod.rs"]
mod common;
use common::is_redis_available;
use oxcache::backend::l1::L1Backend;
use oxcache::backend::l2::L2Backend;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
#[tokio::test]
async fn test_l1_cache_memory_leak() {
    // Hammer a bounded (1000-entry) L1 cache with 10k set/get cycles over a
    // rotating 100-key space, with periodic full purges, to surface leaks in
    // the backend's internal bookkeeping.
    let cache = Arc::new(L1Backend::new(1000));
    for i in 0..10000 {
        let key = format!("key_{}", i % 100);
        let value = vec![i as u8; 100];
        // Pass `value` by move: the original `value.clone()` was a redundant
        // allocation+copy, since the buffer is never used again.
        cache.set_bytes(&key, value, Some(60)).await.unwrap();
        cache.get_bytes(&key).await.unwrap();
        // Every 1000 iterations, purge the whole key space and yield briefly
        // so any background eviction/cleanup work gets a chance to run.
        if i % 1000 == 0 {
            for j in 0..100 {
                let key = format!("key_{}", j);
                let _ = cache.delete(&key).await;
            }
            sleep(Duration::from_millis(10)).await;
        }
    }
    // Final cleanup pass, then drop the cache and give async teardown a
    // moment to settle before the test ends.
    for j in 0..100 {
        let key = format!("key_{}", j);
        let _ = cache.delete(&key).await;
    }
    drop(cache);
    sleep(Duration::from_millis(100)).await;
}
#[tokio::test]
async fn test_l2_cache_memory_leak() {
    // Stress the Redis-backed L2 tier with 5000 set/get cycles over a
    // rotating 50-key space (1 KiB payloads) to check for leaked state.
    if !is_redis_available().await {
        println!("跳过test_l2_cache_memory_leak:Redis不可用");
        return;
    }
    use oxcache::config::L2Config;
    use oxcache::config::RedisMode;
    // Standalone Redis on DB 15, reserved for this leak test.
    let config = L2Config {
        mode: RedisMode::Standalone,
        connection_string: secrecy::SecretString::from("redis://127.0.0.1:6379/15".to_string()),
        connection_timeout_ms: 5000,
        command_timeout_ms: 1000,
        password: None,
        enable_tls: false,
        sentinel: None,
        cluster: None,
        default_ttl: Some(3600),
        ..Default::default()
    };
    let l2_backend = L2Backend::new(&config)
        .await
        .expect("Failed to connect to Redis");
    for i in 0..5000 {
        let key = format!("l2_leak_test_{}", i % 50);
        let value = vec![i as u8; 1024];
        // Moved instead of cloned: the original `value.clone()` was a
        // needless allocation+copy (the buffer is unused afterwards).
        l2_backend
            .set_with_version(&key, value, Some(300))
            .await
            .unwrap();
        l2_backend.get_bytes(&key).await.unwrap();
        // Periodically delete the current key and back off briefly.
        if i % 500 == 0 {
            l2_backend.delete(&key).await.unwrap();
            sleep(Duration::from_millis(50)).await;
        }
    }
    // Remove every key touched by the test so the Redis DB stays clean.
    for i in 0..50 {
        let key = format!("l2_leak_test_{}", i);
        l2_backend.delete(&key).await.unwrap();
    }
    drop(l2_backend);
}
#[tokio::test]
async fn test_two_level_cache_memory_leak() {
    // Exercise the L1 and L2 tiers independently with rotating key spaces
    // and periodic purges, to check that neither tier accumulates memory
    // across churn.
    if !is_redis_available().await {
        println!("跳过test_two_level_cache_memory_leak:Redis不可用");
        return;
    }
    use oxcache::config::L2Config;
    use oxcache::config::RedisMode;
    let l1 = Arc::new(L1Backend::new(100));
    // Standalone Redis on DB 14, reserved for this test.
    let config = L2Config {
        mode: RedisMode::Standalone,
        connection_string: secrecy::SecretString::from("redis://127.0.0.1:6379/14".to_string()),
        connection_timeout_ms: 5000,
        command_timeout_ms: 1000,
        password: None,
        enable_tls: false,
        sentinel: None,
        cluster: None,
        default_ttl: Some(3600),
        ..Default::default()
    };
    let l2 = L2Backend::new(&config)
        .await
        .expect("Failed to connect to Redis");
    // L1 phase: 1500 set/get cycles over 100 keys, purging every 150.
    for i in 0..1500 {
        let key = format!("two_level_l1_{}", i % 100);
        let value = format!("value_{}", i).into_bytes();
        // `value` is moved here; the original cloned it needlessly.
        l1.set_bytes(&key, value, Some(120)).await.unwrap();
        let _ = l1.get_bytes(&key).await;
        if i % 150 == 0 {
            for j in 0..100 {
                let key = format!("two_level_l1_{}", j);
                let _ = l1.delete(&key).await;
            }
            sleep(Duration::from_millis(20)).await;
        }
    }
    for j in 0..100 {
        let key = format!("two_level_l1_{}", j);
        let _ = l1.delete(&key).await;
    }
    // L2 phase: same access pattern against Redis.
    for i in 0..1500 {
        let key = format!("two_level_l2_{}", i % 100);
        let value = format!("value_{}", i).into_bytes();
        // Same fix as above: pass `value` by move instead of cloning.
        l2.set_with_version(&key, value, Some(120))
            .await
            .unwrap();
        let _ = l2.get_bytes(&key).await;
        if i % 150 == 0 {
            for j in 0..100 {
                let key = format!("two_level_l2_{}", j);
                l2.delete(&key).await.unwrap();
            }
            sleep(Duration::from_millis(20)).await;
        }
    }
    // Final cleanup, then drop both tiers and let async teardown settle.
    for j in 0..100 {
        let key = format!("two_level_l2_{}", j);
        l2.delete(&key).await.unwrap();
    }
    drop(l1);
    drop(l2);
    sleep(Duration::from_millis(100)).await;
}
#[tokio::test]
async fn test_batch_operation_memory_leak() {
    // Run 50 rounds of build-batch / set / get / delete against both the L1
    // and L2 tiers to check that batch-style churn leaves no residual state.
    if !is_redis_available().await {
        println!("跳过test_batch_operation_memory_leak:Redis不可用");
        return;
    }
    let l1 = Arc::new(L1Backend::new(500));
    use oxcache::config::L2Config;
    use oxcache::config::RedisMode;
    // Standalone Redis on DB 13, reserved for this test.
    let config = L2Config {
        mode: RedisMode::Standalone,
        connection_string: secrecy::SecretString::from("redis://127.0.0.1:6379/13".to_string()),
        connection_timeout_ms: 5000,
        command_timeout_ms: 1000,
        password: None,
        enable_tls: false,
        sentinel: None,
        cluster: None,
        default_ttl: Some(3600),
        ..Default::default()
    };
    let l2 = L2Backend::new(&config)
        .await
        .expect("Failed to connect to Redis");
    for batch_id in 0..50 {
        // L1 batch: 50 keys with 256-byte payloads.
        let mut batch = Vec::with_capacity(50);
        for i in 0..50 {
            let key = format!("batch_l1_{}_{}", batch_id, i);
            let value = vec![batch_id as u8; 256];
            batch.push((key, value));
        }
        for (key, value) in &batch {
            // This clone IS required: `value` is borrowed from `batch`,
            // which is reused for the get/delete passes below.
            l1.set_bytes(key, value.clone(), Some(60)).await.unwrap();
        }
        for (key, _) in &batch {
            let _ = l1.get_bytes(key).await;
        }
        for (key, _) in &batch {
            l1.delete(key).await.unwrap();
        }
        // L2 batch: same shape, against Redis.
        let mut l2_batch = Vec::with_capacity(50);
        for i in 0..50 {
            let key = format!("batch_l2_{}_{}", batch_id, i);
            let value = vec![batch_id as u8; 256];
            l2_batch.push((key, value));
        }
        for (key, value) in &l2_batch {
            l2.set_with_version(key, value.clone(), Some(60))
                .await
                .unwrap();
        }
        for (key, _) in &l2_batch {
            let _ = l2.get_bytes(key).await;
        }
        for (key, _) in &l2_batch {
            l2.delete(key).await.unwrap();
        }
        sleep(Duration::from_millis(10)).await;
    }
    // NOTE: the original trailing loop deleted "batch_l1_0_0".."batch_l1_0_99";
    // keys 0..50 were already removed in the per-batch delete pass and keys
    // 50..99 never existed, so that loop was dead code and has been removed.
    drop(l1);
    drop(l2);
}
#[tokio::test]
async fn test_concurrent_memory_leak() {
    // 10 concurrent tasks each perform 1000 set/get cycles over their own
    // 50-key space, with periodic purges, to check leak-freedom and
    // soundness of the L1 backend under concurrent access.
    let cache = Arc::new(L1Backend::new(1000));
    let mut handles = vec![];
    for thread_id in 0..10 {
        let cache_clone = Arc::clone(&cache);
        let handle = tokio::spawn(async move {
            for i in 0..1000 {
                let key = format!("thread_{}_key_{}", thread_id, i % 50);
                let value = format!("thread_{}_value_{}", thread_id, i).into_bytes();
                // `value` is moved; the original `value.clone()` was a
                // redundant allocation on every loop iteration.
                cache_clone
                    .set_bytes(&key, value, Some(60))
                    .await
                    .unwrap();
                let _ = cache_clone.get_bytes(&key).await;
                if i % 100 == 0 {
                    for j in 0..50 {
                        let key = format!("thread_{}_key_{}", thread_id, j);
                        let _ = cache_clone.delete(&key).await;
                    }
                }
            }
        });
        handles.push(handle);
    }
    // Join every worker; a panic inside a task propagates through `unwrap`.
    for handle in handles {
        handle.await.unwrap();
    }
    // Final cleanup across all per-task key spaces, then teardown.
    for thread_id in 0..10 {
        for i in 0..50 {
            let key = format!("thread_{}_key_{}", thread_id, i);
            let _ = cache.delete(&key).await;
        }
    }
    drop(cache);
    sleep(Duration::from_millis(200)).await;
}
#[tokio::test]
async fn test_circular_reference_memory_leak() {
    use std::cell::RefCell;
    use std::rc::Rc;
    // Two nodes referencing each other through strong `Rc` handles form a
    // reference cycle; the cache itself only stores a plain byte payload
    // derived from the cycle's strong count.
    struct Node {
        _value: Vec<u8>,
        next: Option<Rc<RefCell<Node>>>,
    }
    let node1 = Rc::new(RefCell::new(Node {
        _value: vec![1; 1024],
        next: None,
    }));
    let node2 = Rc::new(RefCell::new(Node {
        _value: vec![2; 1024],
        next: Some(Rc::clone(&node1)),
    }));
    node1.borrow_mut().next = Some(Rc::clone(&node2));
    let cache = Arc::new(L1Backend::new(100));
    let serialized = format!("circular_ref_data_{}", Rc::strong_count(&node1)).into_bytes();
    // `serialized` is moved; the original cloned it needlessly.
    cache
        .set_bytes("circular_ref", serialized, Some(10))
        .await
        .unwrap();
    cache.delete("circular_ref").await.unwrap();
    drop(cache);
    // FIX: break the cycle before dropping the handles. With the strong
    // cycle intact, dropping node1/node2 never brings either strong count to
    // zero, so both 1 KiB nodes would be leaked by the test itself — exactly
    // the kind of leak this suite exists to catch.
    node1.borrow_mut().next = None;
    drop(node1);
    drop(node2);
    sleep(Duration::from_millis(100)).await;
}
#[cfg(feature = "memory_profiling_never_12345")]
// NOTE(review): this module is gated behind a feature whose name suggests it
// is intentionally never enabled, so nothing below is normally compiled or
// run. When enabled, it additionally requires the external `jemalloc_ctl`
// crate (and presumably jemalloc as the global allocator — TODO confirm).
mod memory_profiling {
use super::*;
use jemalloc_ctl::{epoch, stats};
use std::fmt;
// Adapter that gives `jemalloc_ctl::Error` a `Display`/`std::error::Error`
// impl so it can be boxed into `Box<dyn std::error::Error>` below.
#[derive(Debug)]
struct JemallocWrapper(jemalloc_ctl::Error);
impl fmt::Display for JemallocWrapper {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "jemalloc error: {:?}", self.0)
}
}
impl std::error::Error for JemallocWrapper {}
// Reads jemalloc's (allocated, active) byte counters. `epoch::advance()` is
// called first to refresh jemalloc's cached statistics before reading.
pub async fn get_memory_usage() -> Result<(usize, usize), Box<dyn std::error::Error>> {
epoch::advance().map_err(|e| Box::new(JemallocWrapper(e)) as Box<dyn std::error::Error>)?;
let allocated = stats::allocated::read()
.map_err(|e| Box::new(JemallocWrapper(e)) as Box<dyn std::error::Error>)?;
let active = stats::active::read()
.map_err(|e| Box::new(JemallocWrapper(e)) as Box<dyn std::error::Error>)?;
Ok((allocated, active))
}
// Fills a 10k-entry cache with 1 KiB values, deletes everything, and then
// asserts that allocator-reported memory returned close to the baseline.
#[tokio::test]
async fn test_memory_usage_tracking() {
let (initial_allocated, initial_active) = get_memory_usage().await.unwrap();
let cache = Arc::new(L1Backend::new(10000));
for i in 0..10000 {
let key = format!("mem_test_{}", i);
let value = vec![i as u8; 1024];
cache.set_bytes(&key, value, Some(60)).await.unwrap();
}
let (peak_allocated, peak_active) = get_memory_usage().await.unwrap();
println!(
"Memory usage - Initial: {} bytes allocated, {} bytes active",
initial_allocated, initial_active
);
println!(
"Memory usage - Peak: {} bytes allocated, {} bytes active",
peak_allocated, peak_active
);
for i in 0..10000 {
let key = format!("mem_test_{}", i);
let _ = cache.delete(&key).await;
}
drop(cache);
// Give the allocator/background tasks time to release memory before the
// final measurement.
sleep(Duration::from_millis(500)).await;
let (final_allocated, final_active) = get_memory_usage().await.unwrap();
println!(
"Memory usage - Final: {} bytes allocated, {} bytes active",
final_allocated, final_active
);
// Allow up to 10 MiB of growth over the initial baseline; saturating_add
// guards against overflow on pathological baselines.
let max_reasonable_allocation = initial_allocated.saturating_add(10 * 1024 * 1024);
assert!(
final_allocated < max_reasonable_allocation,
"Potential memory leak: allocated {} bytes (initial: {}, max reasonable: {})",
final_allocated,
initial_allocated,
max_reasonable_allocation
);
// Sanity check: memory after cleanup should not exceed twice the peak.
assert!(
final_allocated <= peak_allocated * 2,
"Memory allocation increased significantly after cleanup: {} vs peak {}",
final_allocated,
peak_allocated
);
}
// Runs 10 rounds of set/get churn, sampling allocator usage after each
// round, and asserts peak usage stayed within 3x the initial baseline.
#[tokio::test]
async fn test_long_running_memory_stability() {
let (initial_allocated, _) = get_memory_usage().await.unwrap();
let cache = Arc::new(L1Backend::new(5000));
let mut memory_samples = Vec::new();
memory_samples.push(initial_allocated);
for round in 0..10 {
println!("Running memory stability test round {}/10", round + 1);
for i in 0..2000 {
let key = format!("longrun_{}_{}", round, i % 500);
let value = vec![round as u8; 512];
cache.set_bytes(&key, value, Some(120)).await.unwrap();
}
for i in 0..2000 {
let key = format!("longrun_{}_{}", round, i % 500);
let _ = cache.get_bytes(&key).await;
}
// On even rounds, delete keys from round `(round + 1) % 2` — i.e. the
// keyspace of rounds 0/1 only. NOTE(review): this looks like it was
// meant to purge the *previous* round's keys; verify the intent.
if round % 2 == 0 {
for i in 0..500 {
let key = format!("longrun_{}_{}", (round + 1) % 2, i);
let _ = cache.delete(&key).await;
}
}
let (current_allocated, _) = get_memory_usage().await.unwrap();
memory_samples.push(current_allocated);
println!(
" Memory usage after round {}: {} bytes",
round + 1,
current_allocated
);
sleep(Duration::from_millis(200)).await;
}
// Full cleanup of every round's keyspace before teardown.
for round in 0..10 {
for i in 0..500 {
let key = format!("longrun_{}_{}", round, i);
let _ = cache.delete(&key).await;
}
}
drop(cache);
sleep(Duration::from_millis(500)).await;
let max_memory = memory_samples.iter().max().unwrap();
let min_memory = memory_samples.iter().min().unwrap();
println!("Long running memory stability test results:");
println!(" Minimum memory usage: {} bytes", min_memory);
println!(" Maximum memory usage: {} bytes", max_memory);
println!(" Memory usage range: {} bytes", max_memory - min_memory);
assert!(
*max_memory < initial_allocated * 3,
"Memory usage exceeded expected limit: {} vs {}",
max_memory,
initial_allocated * 3
);
}
}