use std::process::{Command, Stdio};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use xlink::channels::memory::MemoryChannel;
use xlink::core::types::{DeviceId, MessagePayload};
use xlink::storage::memory_store::MemoryStorage;
use xlink::XLink;
use crate::common::{test_device_id, NetworkSimulator, TestSdkBuilder};
mod common;
async fn create_memory_sdk(device_id: DeviceId) -> Result<XLink, Box<dyn std::error::Error>> {
use std::collections::HashSet;
use xlink::core::traits::MessageHandler;
use xlink::core::types::{DeviceCapabilities, DeviceType};
struct NoOpHandler;
#[async_trait::async_trait]
impl MessageHandler for NoOpHandler {
async fn handle_message(
&self,
_message: xlink::core::types::Message,
) -> xlink::core::error::Result<()> {
Ok(())
}
}
let capabilities = DeviceCapabilities {
device_id,
device_type: DeviceType::Smartphone,
device_name: "Test Device".to_string(),
supported_channels: HashSet::from([xlink::core::types::ChannelType::Lan]),
battery_level: Some(80),
is_charging: false,
data_cost_sensitive: false,
};
let handler = std::sync::Arc::new(NoOpHandler);
let channel = std::sync::Arc::new(MemoryChannel::new(handler, 10));
let storage = std::sync::Arc::new(MemoryStorage::new());
let sdk = XLink::with_storage(capabilities, vec![channel], storage).await?;
Ok(sdk)
}
async fn get_memory_usage() -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
let output = Command::new("ps")
.args(["-o", "rss=", "-p", &std::process::id().to_string()])
.stdout(Stdio::piped())
.output()?;
let memory_kb = String::from_utf8(output.stdout)?.trim().parse::<u64>()?;
Ok(memory_kb)
}
async fn simulate_message_traffic(
sdk: Arc<XLink>,
device_id: DeviceId,
message_count: Arc<AtomicU64>,
running: Arc<AtomicBool>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut local_count = 0;
while running.load(Ordering::Relaxed) {
let message = format!("Long-running test message {}", local_count);
let result = sdk.send(device_id, MessagePayload::Text(message)).await;
if result.is_ok() {
message_count.fetch_add(1, Ordering::Relaxed);
local_count += 1;
}
let delay = Duration::from_millis(rand::random::<u64>() % 1000 + 100);
sleep(delay).await;
}
Ok(())
}
async fn monitor_memory_usage(
running: Arc<AtomicBool>,
memory_samples: Arc<std::sync::Mutex<Vec<(Instant, u64)>>>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
while running.load(Ordering::Relaxed) {
match get_memory_usage().await {
Ok(memory_kb) => {
let mut samples = memory_samples.lock().unwrap();
samples.push((Instant::now(), memory_kb));
if samples.len() > 1 {
let current_memory = memory_kb;
let initial_memory = samples[0].1;
let memory_increase = current_memory.saturating_sub(initial_memory);
println!(
"Memory usage: {} KB (increase: {} KB)",
current_memory, memory_increase
);
if memory_increase > 100 * 1024 {
eprintln!("WARNING: Memory usage increased by more than 100MB!");
}
}
}
Err(e) => {
eprintln!("Failed to get memory usage: {}", e);
}
}
sleep(Duration::from_secs(5)).await;
}
Ok(())
}
#[tokio::test]
async fn test_long_running_7_days_no_memory_leak() {
println!("Starting 7-day long-running test for memory leak detection...");
let test_duration = Duration::from_secs(30); let start_time = Instant::now();
let sdk = Arc::new(
TestSdkBuilder::new()
.with_network_simulator(NetworkSimulator::wifi())
.build()
.await
.unwrap(),
);
let device_id = test_device_id();
let message_count = Arc::new(AtomicU64::new(0));
let running = Arc::new(AtomicBool::new(true));
let memory_samples = Arc::new(std::sync::Mutex::new(Vec::new()));
let message_task = tokio::spawn(simulate_message_traffic(
sdk.clone(),
device_id,
message_count.clone(),
running.clone(),
));
let memory_task = tokio::spawn(monitor_memory_usage(
running.clone(),
memory_samples.clone(),
));
println!("Test running for {:?}...", test_duration);
sleep(test_duration).await;
running.store(false, Ordering::Relaxed);
let _ = timeout(Duration::from_secs(10), message_task).await;
let _ = timeout(Duration::from_secs(10), memory_task).await;
sdk.stop().await;
let actual_duration = start_time.elapsed();
let total_messages = message_count.load(Ordering::Relaxed);
println!("Test completed in {:?}", actual_duration);
println!("Total messages processed: {}", total_messages);
let samples = memory_samples.lock().unwrap();
if samples.len() >= 2 {
let initial_memory = samples[0].1;
let final_memory = samples[samples.len() - 1].1;
let memory_increase = final_memory.saturating_sub(initial_memory);
println!("Memory usage analysis:");
println!(" Initial memory: {} KB", initial_memory);
println!(" Final memory: {} KB", final_memory);
println!(
" Memory increase: {} KB ({:.2} MB)",
memory_increase,
memory_increase as f64 / 1024.0
);
assert!(
memory_increase < 5 * 1024,
"Memory leak detected: increased by {} KB ({} MB) in {:?}",
memory_increase,
memory_increase as f64 / 1024.0,
actual_duration
);
let test_minutes = actual_duration.as_secs() as f64 / 60.0;
let growth_rate = memory_increase as f64 / test_minutes;
println!(" Memory growth rate: {:.2} KB/minute", growth_rate);
if growth_rate > 100.0 {
eprintln!("WARNING: High memory growth rate detected!");
}
}
println!("Long-running test completed successfully!");
println!("Note: This is a shortened version. In production, run the full 7-day test.");
}
#[tokio::test]
async fn test_memory_stability_under_load() {
println!("Starting memory stability test under high load...");
let sdk = Arc::new(
TestSdkBuilder::new()
.with_network_simulator(NetworkSimulator::wifi())
.build()
.await
.unwrap(),
);
let device_id = test_device_id();
let running = Arc::new(AtomicBool::new(true));
let mut handles: Vec<JoinHandle<()>> = Vec::new();
for i in 0..10 {
let sdk_clone = sdk.clone();
let device_id_clone = device_id;
let running_clone = running.clone();
let handle = tokio::spawn(async move {
let mut local_count = 0;
while running_clone.load(Ordering::Relaxed) {
let message = format!("High load test message {} from task {}", local_count, i);
let _ = sdk_clone
.send(device_id_clone, MessagePayload::Text(message))
.await;
local_count += 1;
sleep(Duration::from_millis(10)).await;
}
});
handles.push(handle);
}
let initial_memory = get_memory_usage().await.unwrap();
println!("Initial memory usage: {} KB", initial_memory);
sleep(Duration::from_secs(10)).await;
running.store(false, Ordering::Relaxed);
for handle in handles {
let _ = handle.await;
}
sdk.stop().await;
let final_memory = get_memory_usage().await.unwrap();
let memory_increase = final_memory.saturating_sub(initial_memory);
println!("Final memory usage: {} KB", final_memory);
println!(
"Memory increase: {} KB ({:.2} MB)",
memory_increase,
memory_increase as f64 / 1024.0
);
assert!(
memory_increase < 5 * 1024,
"Memory instability detected under load: increased by {} KB",
memory_increase
);
println!("Memory stability test completed successfully!");
}
#[tokio::test]
async fn test_resource_cleanup() {
println!("Starting resource cleanup test...");
let initial_memory = get_memory_usage().await.unwrap();
for _i in 0..100 {
let device_id = test_device_id();
let sdk = create_memory_sdk(device_id).await.unwrap();
drop(sdk);
sleep(Duration::from_millis(100)).await;
}
sleep(Duration::from_secs(5)).await;
{
let _force_gc: Vec<u8> = vec![0; 1024 * 1024]; }
sleep(Duration::from_millis(500)).await;
{
let _force_gc2: Vec<u8> = vec![0; 2 * 1024 * 1024]; }
sleep(Duration::from_millis(500)).await;
let final_memory = get_memory_usage().await.unwrap();
let memory_increase = final_memory.saturating_sub(initial_memory);
println!("Initial memory: {} KB", initial_memory);
println!("Final memory: {} KB", final_memory);
println!("Memory increase: {} KB", memory_increase);
assert!(
memory_increase < 3072, "Resource cleanup issue detected: memory increased by {} KB after 100 SDK instances",
memory_increase
);
println!("Resource cleanup test completed successfully!");
}