#[cfg(test)]
mod cpu_gpu_overlap_tests {
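//! Integration tests for overlapped CPU/GPU async execution: basic async
//! binary ops, priority scheduling, batched and concurrent submission,
//! hybrid-scheduler integration, performance monitoring, error handling,
//! and streaming. Result checks pattern-match CPU-backed storage only and
//! are skipped for other backends.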
use super::*;
use crate::device::async_execution::AsyncExecutor;
use crate::ops::async_binary::{
add_async, add_async_priority, batch_add_async, global_async_executor, mul_async,
};
use crate::ops::hybrid_scheduler::WorkPriority;
use crate::Tensor;
use futures::StreamExt;
use std::time::{Duration, Instant};
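/// Verifies that `add_async` and `mul_async` return correct element-wise
/// results on small CPU tensors, printing wall-clock timing for each call.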
#[tokio::test]
#[ignore = "Slow test - run with --ignored if needed"]
async fn test_basic_async_operations() {
let a = Tensor::<f32>::from_vec(vec![1.0, 2.0, 3.0, 4.0], &[4])
.expect("test: from_vec should succeed");
let b = Tensor::<f32>::from_vec(vec![5.0, 6.0, 7.0, 8.0], &[4])
.expect("test: from_vec should succeed");
let start = Instant::now();
let result = add_async(&a, &b)
.await
.expect("test: operation should succeed");
let add_time = start.elapsed();
println!("Async add took: {:?}", add_time);
if let crate::tensor::TensorStorage::Cpu(arr) = &result.storage {
assert_eq!(
arr.as_slice().expect("tensor should be contiguous"),
&[6.0, 8.0, 10.0, 12.0]
);
}
let start = Instant::now();
let result = mul_async(&a, &b)
.await
.expect("test: operation should succeed");
let mul_time = start.elapsed();
println!("Async mul took: {:?}", mul_time);
if let crate::tensor::TensorStorage::Cpu(arr) = &result.storage {
assert_eq!(
arr.as_slice().expect("tensor should be contiguous"),
&[5.0, 12.0, 21.0, 32.0]
);
}
}
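/// Runs the same addition at `WorkPriority::High` and `WorkPriority::Normal`
/// and checks that both produce identical, correct results; timings are
/// printed for manual comparison only, with no ordering asserted.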
#[tokio::test]
#[ignore = "Slow test - run with --ignored if needed"]
async fn test_priority_based_execution() {
let a = Tensor::<f32>::from_vec(vec![1.0, 2.0, 3.0], &[3])
.expect("test: from_vec should succeed");
let b = Tensor::<f32>::from_vec(vec![4.0, 5.0, 6.0], &[3])
.expect("test: from_vec should succeed");
let start = Instant::now();
let result = add_async_priority(&a, &b, WorkPriority::High)
.await
.expect("test: operation should succeed");
let high_priority_time = start.elapsed();
println!("High priority async add took: {:?}", high_priority_time);
if let crate::tensor::TensorStorage::Cpu(arr) = &result.storage {
assert_eq!(
arr.as_slice().expect("tensor should be contiguous"),
&[5.0, 7.0, 9.0]
);
}
let start = Instant::now();
let result = add_async_priority(&a, &b, WorkPriority::Normal)
.await
.expect("test: operation should succeed");
let normal_priority_time = start.elapsed();
println!("Normal priority async add took: {:?}", normal_priority_time);
if let crate::tensor::TensorStorage::Cpu(arr) = &result.storage {
assert_eq!(
arr.as_slice().expect("tensor should be contiguous"),
&[5.0, 7.0, 9.0]
);
}
}
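/// Submits four paired additions through `batch_add_async` and checks every
/// result in the returned batch against its expected element-wise sum.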
#[tokio::test]
#[ignore = "Slow test - run with --ignored if needed"]
async fn test_batch_processing() {
let tensors_a = [
Tensor::<f32>::from_vec(vec![1.0, 2.0], &[2]).expect("test: from_vec should succeed"),
Tensor::<f32>::from_vec(vec![3.0, 4.0], &[2]).expect("test: from_vec should succeed"),
Tensor::<f32>::from_vec(vec![5.0, 6.0], &[2]).expect("test: from_vec should succeed"),
Tensor::<f32>::from_vec(vec![7.0, 8.0], &[2]).expect("test: from_vec should succeed"),
];
let tensors_b = [
Tensor::<f32>::from_vec(vec![2.0, 3.0], &[2]).expect("test: from_vec should succeed"),
Tensor::<f32>::from_vec(vec![4.0, 5.0], &[2]).expect("test: from_vec should succeed"),
Tensor::<f32>::from_vec(vec![6.0, 7.0], &[2]).expect("test: from_vec should succeed"),
Tensor::<f32>::from_vec(vec![8.0, 9.0], &[2]).expect("test: from_vec should succeed"),
];
let operations: Vec<_> = tensors_a.iter().zip(tensors_b.iter()).collect();
let start = Instant::now();
let results = batch_add_async(operations)
.await
.expect("test: operation should succeed");
let batch_time = start.elapsed();
println!(
"Batch processing {} operations took: {:?}",
results.len(),
batch_time
);
assert_eq!(results.len(), 4);
let expected_results = [
vec![3.0, 5.0],
vec![7.0, 9.0],
vec![11.0, 13.0],
vec![15.0, 17.0],
];
for (i, result) in results.iter().enumerate() {
if let crate::tensor::TensorStorage::Cpu(arr) = &result.storage {
assert_eq!(
arr.as_slice().expect("tensor should be contiguous"),
&expected_results[i]
);
}
}
}
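/// Launches ten identical `add_async` calls concurrently via `join_all` and
/// spot-checks the first ten elements of each result (a[i] + b[i] = 2i + 1).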
#[tokio::test]
#[ignore = "Slow test - run with --ignored if needed"]
async fn test_concurrent_operations() {
let size = 1000;
let data_a: Vec<f32> = (0..size).map(|i| i as f32).collect();
let data_b: Vec<f32> = (0..size).map(|i| (i + 1) as f32).collect();
let a = Tensor::<f32>::from_vec(data_a, &[size]).expect("test: from_vec should succeed");
let b = Tensor::<f32>::from_vec(data_b, &[size]).expect("test: from_vec should succeed");
let num_operations = 10usize;
let mut tasks = Vec::new();
let start = Instant::now();
for i in 0..num_operations {
let a_ref = &a;
let b_ref = &b;
let future = async move {
let result = add_async(a_ref, b_ref)
.await
.expect("test: operation should succeed");
println!("Operation {} completed", i);
result
};
tasks.push(future);
}
let results = futures::future::join_all(tasks).await;
let total_time = start.elapsed();
println!(
"Completed {} concurrent operations in {:?}",
num_operations, total_time
);
println!(
"Average time per operation: {:?}",
total_time / num_operations as u32
);
assert_eq!(results.len(), num_operations);
for result in results {
if let crate::tensor::TensorStorage::Cpu(arr) = &result.storage {
let slice = arr.as_slice().expect("tensor should be contiguous");
for (i, &value) in slice.iter().enumerate().take(size.min(10)) {
assert_eq!(value, (i as f32) + (i + 1) as f32);
}
}
}
}
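/// Drives a single `AddOp` through the global async executor, asserting the
/// executor is idle before the call and again after `synchronize()`.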
#[tokio::test]
#[ignore = "Slow test - run with --ignored if needed"]
async fn test_hybrid_scheduler_integration() {
let executor = global_async_executor();
let a = Tensor::<f32>::from_vec(vec![1.0, 2.0, 3.0, 4.0, 5.0], &[5])
.expect("test: from_vec should succeed");
let b = Tensor::<f32>::from_vec(vec![5.0, 4.0, 3.0, 2.0, 1.0], &[5])
.expect("test: from_vec should succeed");
assert!(executor.is_idle());
let start = Instant::now();
let result = executor
.execute_async(&a, &b, crate::ops::binary::AddOp)
.await
.expect("test: operation should succeed");
let execution_time = start.elapsed();
println!("Hybrid scheduler execution took: {:?}", execution_time);
if let crate::tensor::TensorStorage::Cpu(arr) = &result.storage {
assert_eq!(
arr.as_slice().expect("tensor should be contiguous"),
&[6.0, 6.0, 6.0, 6.0, 6.0]
);
}
executor.synchronize();
assert!(executor.is_idle());
}
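/// Times five async additions with `OperationTimer`, then inspects the
/// recorded average for the first operation, the memory counters, and the
/// generated report.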
#[tokio::test]
#[ignore = "Slow test - run with --ignored if needed"]
async fn test_performance_metrics() {
use crate::memory::{global_monitor, OperationTimer};
let monitor = global_monitor();
monitor.clear();
let a = Tensor::<f32>::from_vec(vec![1.0; 1000], &[1000])
.expect("test: from_vec should succeed");
let b = Tensor::<f32>::from_vec(vec![2.0; 1000], &[1000])
.expect("test: from_vec should succeed");
let num_ops = 5;
for i in 0..num_ops {
let _timer = OperationTimer::new(
format!("async_add_{}", i),
crate::memory::global_monitor_arc(),
);
let _result = add_async(&a, &b)
.await
.expect("test: operation should succeed");
}
let avg_time = monitor.get_average_time("async_add_0");
if let Some(time) = avg_time {
println!("Average async add time: {:?}", time);
assert!(time < Duration::from_secs(1));
}
let current_memory = monitor.get_current_memory();
let peak_memory = monitor.get_peak_memory();
println!("Current memory usage: {} bytes", current_memory);
println!("Peak memory usage: {} bytes", peak_memory);
let report = monitor.generate_report();
println!("Performance Report:\n{}", report);
assert!(report.contains("Performance Monitor Report"));
}
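/// Compares sequential vs. `tokio::join!` execution of add/mul on
/// 10_000-element tensors and asserts only a loose speedup lower bound so
/// the test stays robust on loaded machines.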
#[tokio::test]
#[ignore = "Slow test - run with --ignored if needed"]
async fn test_large_tensor_operations() {
let size = 10000;
let data_a: Vec<f32> = (0..size).map(|i| (i % 100) as f32).collect();
let data_b: Vec<f32> = (0..size).map(|i| ((i + 50) % 100) as f32).collect();
let a = Tensor::<f32>::from_vec(data_a, &[size]).expect("test: from_vec should succeed");
let b = Tensor::<f32>::from_vec(data_b, &[size]).expect("test: from_vec should succeed");
let start = Instant::now();
let result1 = add_async(&a, &b)
.await
.expect("test: operation should succeed");
let result2 = mul_async(&a, &b)
.await
.expect("test: operation should succeed");
let sequential_time = start.elapsed();
println!("Sequential execution took: {:?}", sequential_time);
let start = Instant::now();
let (result3, result4) = tokio::join!(add_async(&a, &b), mul_async(&a, &b));
let concurrent_time = start.elapsed();
println!("Concurrent execution took: {:?}", concurrent_time);
assert_eq!(
result1.shape(),
result3
.as_ref()
.expect("test: value should be present")
.shape()
);
assert_eq!(
result2.shape(),
result4
.as_ref()
.expect("test: value should be present")
.shape()
);
let speedup_ratio = sequential_time.as_secs_f64() / concurrent_time.as_secs_f64();
println!("Speedup ratio: {:.2}", speedup_ratio);
// Loose lower bound: concurrent execution must not be dramatically slower
// than sequential; a strict > 1.0 speedup would be flaky under load.
assert!(speedup_ratio > 0.8);
}
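/// Asserts that adding tensors with mismatched shapes (`[3]` vs. `[2]`)
/// fails with a shape error.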
#[tokio::test]
#[ignore = "Slow test - run with --ignored if needed"]
async fn test_error_handling() {
let a = Tensor::<f32>::from_vec(vec![1.0, 2.0, 3.0], &[3])
.expect("test: from_vec should succeed");
let b =
Tensor::<f32>::from_vec(vec![4.0, 5.0], &[2]).expect("test: from_vec should succeed");
let result = add_async(&a, &b).await;
assert!(result.is_err());
if let Err(e) = result {
println!("Expected error: {:?}", e);
assert!(e.to_string().contains("Shape"));
}
}
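/// Feeds ten additions through a buffered stream (at most three in flight)
/// and verifies each result in order; `buffered` preserves input order, so
/// indexing by `i` is sound.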
#[tokio::test]
#[ignore = "Slow test - run with --ignored if needed"]
async fn test_streaming_operations() {
use tokio_stream as stream;
let base_data = vec![1.0, 2.0, 3.0, 4.0, 5.0];
let base_tensor =
Tensor::<f32>::from_vec(base_data, &[5]).expect("test: from_vec should succeed");
let operation_stream = stream::iter(0..10)
.map(|i| {
let scalar_data = vec![i as f32; 5];
let scalar_tensor = Tensor::<f32>::from_vec(scalar_data, &[5])
.expect("test: from_vec should succeed");
(base_tensor.clone(), scalar_tensor)
})
.map(|(a, b)| async move { add_async(&a, &b).await })
.buffered(3);
let results: Vec<_> = operation_stream.collect().await;
assert_eq!(results.len(), 10);
for (i, result) in results.iter().enumerate() {
let tensor = result
.as_ref()
.expect("test: streaming operation should succeed");
if let crate::tensor::TensorStorage::Cpu(arr) = &tensor.storage {
let slice = arr.as_slice().expect("tensor should be contiguous");
assert_eq!(slice[0], 1.0 + i as f32);
assert_eq!(slice[4], 5.0 + i as f32);
}
}
println!(
"Successfully processed {} streaming operations",
results.len()
);
}
}