#![allow(dead_code)]
use anyhow::Result;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use super::helpers::{check_model_health, skip_slow_tests, test_config};
pub struct ExtendedTestHarness {
pub config: selfware::config::Config,
pub metrics: Arc<TestMetrics>,
pub checkpoint_dir: PathBuf,
pub start_time: Instant,
}
#[derive(Debug, Default)]
pub struct TestMetrics {
pub requests: AtomicU64,
pub successes: AtomicU64,
pub failures: AtomicU64,
pub total_tokens: AtomicU64,
pub latencies: RwLock<Vec<Duration>>,
}
impl TestMetrics {
pub fn new() -> Self {
Self::default()
}
pub fn record_request(&self, success: bool, duration: Duration, tokens: u64) {
self.requests.fetch_add(1, Ordering::SeqCst);
if success {
self.successes.fetch_add(1, Ordering::SeqCst);
} else {
self.failures.fetch_add(1, Ordering::SeqCst);
}
self.total_tokens.fetch_add(tokens, Ordering::SeqCst);
if let Ok(mut latencies) = self.latencies.try_write() {
latencies.push(duration);
}
}
pub fn request_count(&self) -> u64 {
self.requests.load(Ordering::SeqCst)
}
pub fn success_count(&self) -> u64 {
self.successes.load(Ordering::SeqCst)
}
pub fn failure_count(&self) -> u64 {
self.failures.load(Ordering::SeqCst)
}
pub fn success_rate(&self) -> f64 {
let total = self.request_count();
if total == 0 {
return 0.0;
}
self.success_count() as f64 / total as f64
}
}
#[derive(Debug)]
pub struct TestReport {
pub test_name: String,
pub duration: Duration,
pub requests: u64,
pub successes: u64,
pub failures: u64,
pub success_rate: f64,
pub avg_latency_ms: u64,
pub p50_latency_ms: u64,
pub p95_latency_ms: u64,
pub p99_latency_ms: u64,
pub total_tokens: u64,
pub checkpoints: Vec<CheckpointInfo>,
}
#[derive(Debug, Clone)]
pub struct CheckpointInfo {
pub timestamp: u64,
pub requests_at_checkpoint: u64,
pub state_size_bytes: u64,
}
impl ExtendedTestHarness {
pub fn new() -> Result<Self> {
let config = test_config();
let checkpoint_dir = std::env::temp_dir().join("selfware-extended-tests");
std::fs::create_dir_all(&checkpoint_dir)?;
Ok(Self {
config,
metrics: Arc::new(TestMetrics::new()),
checkpoint_dir,
start_time: Instant::now(),
})
}
pub async fn run_with_checkpoints<F, Fut>(
&self,
test_name: &str,
checkpoint_interval: Duration,
max_duration: Duration,
mut task: F,
) -> Result<TestReport>
where
F: FnMut(Arc<TestMetrics>) -> Fut,
Fut: std::future::Future<Output = Result<()>>,
{
let mut checkpoints = Vec::new();
let start = Instant::now();
let mut iteration = 0u64;
while start.elapsed() < max_duration {
if let Err(e) = task(Arc::clone(&self.metrics)).await {
eprintln!("Task iteration {} failed: {}", iteration, e);
}
iteration += 1;
if start.elapsed() > checkpoint_interval * (checkpoints.len() as u32 + 1) {
let checkpoint = CheckpointInfo {
timestamp: start.elapsed().as_secs(),
requests_at_checkpoint: self.metrics.request_count(),
state_size_bytes: 0, };
checkpoints.push(checkpoint);
println!(
"Checkpoint {}: {} requests completed",
checkpoints.len(),
self.metrics.request_count()
);
}
}
self.generate_report(test_name, start.elapsed(), checkpoints)
.await
}
async fn generate_report(
&self,
test_name: &str,
duration: Duration,
checkpoints: Vec<CheckpointInfo>,
) -> Result<TestReport> {
let latencies = self.metrics.latencies.read().await;
let mut sorted_latencies: Vec<u64> =
latencies.iter().map(|d| d.as_millis() as u64).collect();
sorted_latencies.sort();
let avg_latency_ms = if !sorted_latencies.is_empty() {
sorted_latencies.iter().sum::<u64>() / sorted_latencies.len() as u64
} else {
0
};
let percentile = |p: f64| -> u64 {
if sorted_latencies.is_empty() {
return 0;
}
let idx =
((sorted_latencies.len() as f64 * p) as usize).min(sorted_latencies.len() - 1);
sorted_latencies[idx]
};
Ok(TestReport {
test_name: test_name.to_string(),
duration,
requests: self.metrics.request_count(),
successes: self.metrics.success_count(),
failures: self.metrics.failure_count(),
success_rate: self.metrics.success_rate(),
avg_latency_ms,
p50_latency_ms: percentile(0.50),
p95_latency_ms: percentile(0.95),
p99_latency_ms: percentile(0.99),
total_tokens: self.metrics.total_tokens.load(Ordering::SeqCst),
checkpoints,
})
}
}
impl Default for ExtendedTestHarness {
fn default() -> Self {
Self::new().expect("Failed to create test harness")
}
}
#[tokio::test]
#[ignore = "Simulation-only; run explicitly with: cargo test extended_coding_session --ignored"]
async fn extended_coding_session_2h() {
if skip_slow_tests() {
println!("Skipping: extended test (SELFWARE_SKIP_SLOW=1)");
eprintln!("Skipping extended test (SELFWARE_SKIP_SLOW=1)");
return;
}
let harness = match ExtendedTestHarness::new() {
Ok(h) => h,
Err(e) => {
println!("Skipping: failed to create harness: {}", e);
eprintln!("Failed to create harness: {}", e);
return;
}
};
if !check_model_health(&harness.config).await.unwrap_or(false) {
println!("Skipping: LLM endpoint not available");
eprintln!("Model not available, skipping extended test");
return;
}
let duration_secs: u64 = std::env::var("EXTENDED_TEST_DURATION")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(7200);
let max_duration = Duration::from_secs(duration_secs);
let checkpoint_interval = Duration::from_secs(900);
let report = harness
.run_with_checkpoints(
"extended_coding_session_2h",
checkpoint_interval,
max_duration,
|metrics| async move {
let start = Instant::now();
let success = true;
let tokens = 100u64;
metrics.record_request(success, start.elapsed(), tokens);
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(())
},
)
.await;
match report {
Ok(report) => {
println!("\n=== Extended Coding Session Report ===");
println!("Duration: {:?}", report.duration);
println!("Requests: {}", report.requests);
println!("Success Rate: {:.2}%", report.success_rate * 100.0);
println!("Avg Latency: {}ms", report.avg_latency_ms);
println!("P95 Latency: {}ms", report.p95_latency_ms);
println!("Total Tokens: {}", report.total_tokens);
println!("Checkpoints: {}", report.checkpoints.len());
assert!(report.success_rate > 0.9, "Success rate too low");
}
Err(e) => {
eprintln!("Test failed: {}", e);
}
}
}
#[tokio::test]
#[ignore = "Simulation-only; run explicitly with: cargo test extended_multi_agent --ignored"]
async fn extended_multi_agent_collaboration_1h() {
if skip_slow_tests() {
println!("Skipping: extended test (SELFWARE_SKIP_SLOW=1)");
eprintln!("Skipping extended test (SELFWARE_SKIP_SLOW=1)");
return;
}
let harness = match ExtendedTestHarness::new() {
Ok(h) => h,
Err(e) => {
println!("Skipping: failed to create harness: {}", e);
eprintln!("Failed to create harness: {}", e);
return;
}
};
if !check_model_health(&harness.config).await.unwrap_or(false) {
println!("Skipping: LLM endpoint not available");
eprintln!("Model not available, skipping extended test");
return;
}
let duration_secs: u64 = std::env::var("EXTENDED_TEST_DURATION")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(3600);
let max_duration = Duration::from_secs(duration_secs);
let checkpoint_interval = Duration::from_secs(600);
let report = harness
.run_with_checkpoints(
"extended_multi_agent_1h",
checkpoint_interval,
max_duration,
|metrics| async move {
let agent_count = 4;
let mut handles = Vec::new();
for agent_id in 0..agent_count {
let m = Arc::clone(&metrics);
let handle = tokio::spawn(async move {
let start = Instant::now();
tokio::time::sleep(Duration::from_millis(100 * agent_id as u64)).await;
m.record_request(true, start.elapsed(), 50);
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
Ok(())
},
)
.await;
match report {
Ok(report) => {
println!("\n=== Multi-Agent Collaboration Report ===");
println!("Duration: {:?}", report.duration);
println!("Requests: {}", report.requests);
println!("Success Rate: {:.2}%", report.success_rate * 100.0);
println!("Avg Latency: {}ms", report.avg_latency_ms);
assert!(report.success_rate > 0.9);
}
Err(e) => {
eprintln!("Test failed: {}", e);
}
}
}
#[tokio::test]
#[ignore = "Simulation-only; exercises serde round-trip, not real agent checkpointing"]
async fn extended_checkpoint_resume_cycle() {
if skip_slow_tests() {
return;
}
let harness = match ExtendedTestHarness::new() {
Ok(h) => h,
Err(e) => {
eprintln!("Failed to create harness: {}", e);
return;
}
};
let checkpoint_path = harness.checkpoint_dir.join("test_checkpoint.json");
let state_before = HashMap::from([
("task_id".to_string(), "test-123".to_string()),
("step".to_string(), "5".to_string()),
("status".to_string(), "in_progress".to_string()),
]);
let json = serde_json::to_string_pretty(&state_before).expect("Serialize failed");
std::fs::write(&checkpoint_path, &json).expect("Write checkpoint failed");
let restored_json = std::fs::read_to_string(&checkpoint_path).expect("Read failed");
let state_after: HashMap<String, String> =
serde_json::from_str(&restored_json).expect("Deserialize failed");
assert_eq!(state_before, state_after);
assert_eq!(state_after.get("task_id"), Some(&"test-123".to_string()));
assert_eq!(state_after.get("step"), Some(&"5".to_string()));
println!("Checkpoint/resume cycle test passed");
let _ = std::fs::remove_file(&checkpoint_path);
}
#[tokio::test]
#[ignore = "Simulation-only; run explicitly with: cargo test extended_stress --ignored"]
async fn extended_stress_100_requests() {
if skip_slow_tests() {
return;
}
let harness = match ExtendedTestHarness::new() {
Ok(h) => h,
Err(e) => {
eprintln!("Failed to create harness: {}", e);
return;
}
};
if !check_model_health(&harness.config).await.unwrap_or(false) {
eprintln!("Model not available");
return;
}
let request_count = 100;
let start = Instant::now();
for i in 0..request_count {
let req_start = Instant::now();
tokio::time::sleep(Duration::from_millis(10)).await;
let duration = req_start.elapsed();
harness.metrics.record_request(true, duration, 50);
if (i + 1) % 25 == 0 {
println!("Completed {}/{} requests", i + 1, request_count);
}
}
let total_duration = start.elapsed();
println!("\n=== Stress Test Report ===");
println!("Total Duration: {:?}", total_duration);
println!("Requests: {}", harness.metrics.request_count());
println!(
"Throughput: {:.2} req/s",
request_count as f64 / total_duration.as_secs_f64()
);
let latencies = harness.metrics.latencies.read().await;
if !latencies.is_empty() {
let mut sorted: Vec<u64> = latencies.iter().map(|d| d.as_millis() as u64).collect();
sorted.sort();
println!("P50 Latency: {}ms", sorted[sorted.len() / 2]);
println!(
"P95 Latency: {}ms",
sorted[(sorted.len() as f64 * 0.95) as usize]
);
println!(
"P99 Latency: {}ms",
sorted[(sorted.len() as f64 * 0.99) as usize]
);
}
assert_eq!(harness.metrics.request_count(), request_count);
}
#[tokio::test]
#[ignore = "Simulation-only; run explicitly with: cargo test extended_long_context --ignored"]
async fn extended_long_context_conversation() {
if skip_slow_tests() {
return;
}
let harness = match ExtendedTestHarness::new() {
Ok(h) => h,
Err(e) => {
eprintln!("Failed to create harness: {}", e);
return;
}
};
if !check_model_health(&harness.config).await.unwrap_or(false) {
eprintln!("Model not available");
return;
}
let mut context_size = 0usize;
let turn_count = 50;
for turn in 0..turn_count {
let start = Instant::now();
let message_len = 100 + turn * 50; context_size += message_len;
let processing_time = (context_size / 1000) as u64 + 10;
tokio::time::sleep(Duration::from_millis(processing_time)).await;
harness
.metrics
.record_request(true, start.elapsed(), message_len as u64);
if (turn + 1) % 10 == 0 {
println!(
"Turn {}/{}: Context size {} chars",
turn + 1,
turn_count,
context_size
);
}
}
println!("\n=== Long Context Test Report ===");
println!("Turns: {}", turn_count);
println!("Final Context Size: {} chars", context_size);
println!("Total Requests: {}", harness.metrics.request_count());
assert_eq!(harness.metrics.request_count(), turn_count as u64);
}
#[tokio::test]
#[ignore = "Simulation-only; run explicitly with: cargo test extended_concurrent --ignored"]
async fn extended_concurrent_requests() {
if skip_slow_tests() {
return;
}
let harness = ExtendedTestHarness::new().expect("Harness creation failed");
let metrics = harness.metrics.clone();
let concurrent_count = 10;
let requests_per_worker = 20;
let start = Instant::now();
let mut handles = Vec::new();
for worker_id in 0..concurrent_count {
let m = Arc::clone(&metrics);
let handle = tokio::spawn(async move {
for req_id in 0..requests_per_worker {
let req_start = Instant::now();
tokio::time::sleep(Duration::from_millis(5 + (worker_id * 2) as u64)).await;
m.record_request(true, req_start.elapsed(), 25);
if req_id % 5 == 0 {
println!(
"Worker {} completed request {}/{}",
worker_id,
req_id + 1,
requests_per_worker
);
}
}
});
handles.push(handle);
}
for handle in handles {
handle.await.expect("Worker panicked");
}
let total_duration = start.elapsed();
let total_requests = concurrent_count * requests_per_worker;
println!("\n=== Concurrent Requests Report ===");
println!("Workers: {}", concurrent_count);
println!("Requests per Worker: {}", requests_per_worker);
println!("Total Requests: {}", total_requests);
println!("Duration: {:?}", total_duration);
println!(
"Throughput: {:.2} req/s",
total_requests as f64 / total_duration.as_secs_f64()
);
assert_eq!(metrics.request_count(), total_requests as u64);
}
#[tokio::test]
#[ignore = "Simulation-only; run explicitly with: cargo test extended_memory --ignored"]
async fn extended_memory_tracking() {
if skip_slow_tests() {
return;
}
#[derive(Debug)]
struct MemorySnapshot {
iteration: u64,
allocated_bytes: usize,
}
let mut snapshots = Vec::new();
let mut accumulated_data = Vec::new();
for i in 0..100 {
accumulated_data.push(vec![0u8; 10_000]);
if i % 10 == 0 {
let snapshot = MemorySnapshot {
iteration: i,
allocated_bytes: accumulated_data.len() * 10_000,
};
snapshots.push(snapshot);
}
}
println!("\n=== Memory Tracking Report ===");
for snapshot in &snapshots {
println!(
"Iteration {}: {} KB allocated",
snapshot.iteration,
snapshot.allocated_bytes / 1024
);
}
assert!(snapshots.len() >= 10);
assert!(snapshots.last().unwrap().allocated_bytes > snapshots.first().unwrap().allocated_bytes);
}
#[tokio::test]
#[ignore = "Simulation-only; run explicitly with: cargo test extended_error_recovery --ignored"]
async fn extended_error_recovery() {
if skip_slow_tests() {
return;
}
let harness = ExtendedTestHarness::new().expect("Harness creation failed");
let total_requests = 50;
let failure_rate = 0.2;
for i in 0..total_requests {
let start = Instant::now();
let should_fail = (i as f64 / total_requests as f64) < failure_rate && i > 0;
tokio::time::sleep(Duration::from_millis(10)).await;
if should_fail {
harness.metrics.record_request(false, start.elapsed(), 0);
} else {
harness.metrics.record_request(true, start.elapsed(), 50);
}
}
println!("\n=== Error Recovery Report ===");
println!("Total Requests: {}", harness.metrics.request_count());
println!("Successes: {}", harness.metrics.success_count());
println!("Failures: {}", harness.metrics.failure_count());
println!(
"Success Rate: {:.2}%",
harness.metrics.success_rate() * 100.0
);
assert!(harness.metrics.failure_count() > 0);
assert!(harness.metrics.success_count() > harness.metrics.failure_count());
}
#[tokio::test]
#[ignore = "Simulation-only; run explicitly with: cargo test extended_timeout --ignored"]
async fn extended_timeout_handling() {
if skip_slow_tests() {
return;
}
let harness = ExtendedTestHarness::new().expect("Harness creation failed");
let timeout = Duration::from_millis(100);
for i in 0..20 {
let start = Instant::now();
let delay = if i % 5 == 0 {
Duration::from_millis(150) } else {
Duration::from_millis(50) };
let result = tokio::time::timeout(timeout, async {
tokio::time::sleep(delay).await;
true
})
.await;
match result {
Ok(_) => harness.metrics.record_request(true, start.elapsed(), 50),
Err(_) => harness.metrics.record_request(false, start.elapsed(), 0),
}
}
println!("\n=== Timeout Handling Report ===");
println!("Total: {}", harness.metrics.request_count());
println!("Timeouts: {}", harness.metrics.failure_count());
println!(
"Success Rate: {:.2}%",
harness.metrics.success_rate() * 100.0
);
assert!(harness.metrics.failure_count() > 0);
}
pub fn print_report(report: &TestReport) {
println!("\n╔══════════════════════════════════════════════════╗");
println!("║ TEST REPORT: {} ", report.test_name);
println!("╠══════════════════════════════════════════════════╣");
println!(
"║ Duration: {:>10?} ",
report.duration
);
println!(
"║ Requests: {:>10} ",
report.requests
);
println!(
"║ Successes: {:>10} ",
report.successes
);
println!(
"║ Failures: {:>10} ",
report.failures
);
println!(
"║ Success Rate: {:>9.2}% ",
report.success_rate * 100.0
);
println!("╠══════════════════════════════════════════════════╣");
println!("║ Latency (ms): ║");
println!(
"║ Average: {:>10} ",
report.avg_latency_ms
);
println!(
"║ P50: {:>10} ",
report.p50_latency_ms
);
println!(
"║ P95: {:>10} ",
report.p95_latency_ms
);
println!(
"║ P99: {:>10} ",
report.p99_latency_ms
);
println!("╠══════════════════════════════════════════════════╣");
println!(
"║ Total Tokens: {:>10} ",
report.total_tokens
);
println!(
"║ Checkpoints: {:>10} ",
report.checkpoints.len()
);
println!("╚══════════════════════════════════════════════════╝");
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_metrics_creation() {
let metrics = TestMetrics::new();
assert_eq!(metrics.request_count(), 0);
assert_eq!(metrics.success_rate(), 0.0);
}
#[test]
fn test_metrics_record_success() {
let metrics = TestMetrics::new();
metrics.record_request(true, Duration::from_millis(100), 50);
assert_eq!(metrics.request_count(), 1);
assert_eq!(metrics.success_count(), 1);
assert_eq!(metrics.failure_count(), 0);
assert_eq!(metrics.success_rate(), 1.0);
}
#[test]
fn test_metrics_record_failure() {
let metrics = TestMetrics::new();
metrics.record_request(false, Duration::from_millis(100), 0);
assert_eq!(metrics.request_count(), 1);
assert_eq!(metrics.success_count(), 0);
assert_eq!(metrics.failure_count(), 1);
assert_eq!(metrics.success_rate(), 0.0);
}
#[test]
fn test_metrics_success_rate() {
let metrics = TestMetrics::new();
metrics.record_request(true, Duration::from_millis(10), 10);
metrics.record_request(true, Duration::from_millis(10), 10);
metrics.record_request(false, Duration::from_millis(10), 0);
metrics.record_request(true, Duration::from_millis(10), 10);
assert_eq!(metrics.request_count(), 4);
assert_eq!(metrics.success_count(), 3);
assert_eq!(metrics.failure_count(), 1);
assert_eq!(metrics.success_rate(), 0.75);
}
#[tokio::test]
async fn test_harness_creation() {
let harness = ExtendedTestHarness::new();
assert!(harness.is_ok());
}
#[test]
fn test_checkpoint_info() {
let checkpoint = CheckpointInfo {
timestamp: 1000,
requests_at_checkpoint: 50,
state_size_bytes: 1024,
};
assert_eq!(checkpoint.timestamp, 1000);
assert_eq!(checkpoint.requests_at_checkpoint, 50);
}
#[test]
fn test_report_creation() {
let report = TestReport {
test_name: "test".to_string(),
duration: Duration::from_secs(60),
requests: 100,
successes: 95,
failures: 5,
success_rate: 0.95,
avg_latency_ms: 50,
p50_latency_ms: 45,
p95_latency_ms: 80,
p99_latency_ms: 120,
total_tokens: 5000,
checkpoints: vec![],
};
assert_eq!(report.requests, 100);
assert!(report.success_rate > 0.9);
}
}