#![allow(dead_code)]
use crate::error::{BackendError, BackendResult};
use crate::{Device, MemoryManager};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use torsh_core::device::DeviceType;
#[cfg(feature = "cuda")]
use crate::cuda::CudaDevice as SciRs2CudaDevice;
#[cfg(feature = "cuda")]
/// Mock of the `scirs2_cuda` API surface this file depends on.
/// NOTE(review): although gated on the `cuda` feature (attribute above),
/// every helper fails with "CUDA not available" — these look like stubs
/// awaiting real scirs2_cuda bindings; confirm before relying on them.
mod scirs2_cuda {
/// Minimal mock CUDA device handle.
#[derive(Debug)]
pub struct MockCudaDevice {
// Device ordinal; currently never read.
id: usize,
}
/// Mock memory-transfer entry points; each returns an error string.
pub mod memory {
/// Hints mirroring CUDA's `cudaMemAdvise` options.
pub enum MemoryAdvice {
SetPreferredLocation(u32),
SetAccessedBy(u32),
SetReadMostly,
UnsetReadMostly,
}
/// Prefetch a unified-memory range toward `_device`; always errors here.
pub async fn prefetch_async(
_device: &crate::cuda::CudaDevice,
_ptr: *const u8,
_size: usize,
) -> Result<(), String> {
Err("CUDA not available".to_string())
}
/// Apply a memory-usage hint to a pointer range; always errors here.
pub async fn set_advice(
_ptr: *const u8,
_size: usize,
_advice: MemoryAdvice,
) -> Result<(), String> {
Err("CUDA not available".to_string())
}
/// Device-to-device peer copy; always errors here.
pub async fn copy_peer_to_peer(
_src: *const u8,
_dst: *mut u8,
_size: usize,
) -> Result<(), String> {
Err("CUDA not available".to_string())
}
/// Async host-to-device copy; always errors here.
pub async fn copy_host_to_device_async(
_src: *const u8,
_dst: *mut u8,
_size: usize,
) -> Result<(), String> {
Err("CUDA not available".to_string())
}
/// Async device-to-host copy; always errors here.
pub async fn copy_device_to_host_async(
_src: *const u8,
_dst: *mut u8,
_size: usize,
) -> Result<(), String> {
Err("CUDA not available".to_string())
}
/// Blocking host-to-device copy; always errors here.
pub fn copy_host_to_device(
_src: *const u8,
_dst: *mut u8,
_size: usize,
) -> Result<(), String> {
Err("CUDA not available".to_string())
}
/// Blocking device-to-host copy; always errors here.
pub fn copy_device_to_host(
_src: *const u8,
_dst: *mut u8,
_size: usize,
) -> Result<(), String> {
Err("CUDA not available".to_string())
}
}
/// Wait for the device's queued work to finish; always errors here.
pub fn synchronize(_device: &crate::cuda::CudaDevice) -> Result<(), String> {
Err("CUDA not available".to_string())
}
}
#[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
use crate::metal::MetalDevice as SciRs2MetalDevice;
#[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
/// Mock of the `scirs2_metal` API surface this file depends on.
/// NOTE(review): unlike the CUDA mock, these helpers succeed as no-ops —
/// Metal "transfers" through this module copy nothing. Confirm real
/// scirs2_metal bindings are intended to replace this.
mod scirs2_metal {
pub mod memory {
use crate::metal::device::MetalDevice;
/// CPU cache behavior for shared buffers.
pub enum CpuCacheMode {
WriteCombined,
}
/// Set the CPU cache mode for a buffer; no-op stub.
pub fn set_cpu_cache_mode(
_device: &MetalDevice,
_ptr: *mut u8,
_mode: CpuCacheMode,
) -> Result<(), String> {
Ok(())
}
/// Async host-to-device copy; no-op stub.
pub async fn copy_host_to_device_async(
_device: &MetalDevice,
_src_ptr: *const u8,
_dst_ptr: *mut u8,
_size: usize,
) -> Result<(), String> {
Ok(())
}
/// Async device-to-host copy; no-op stub.
pub async fn copy_device_to_host_async(
_device: &MetalDevice,
_src_ptr: *const u8,
_dst_ptr: *mut u8,
_size: usize,
) -> Result<(), String> {
Ok(())
}
/// Blocking host-to-device copy; no-op stub.
pub fn copy_host_to_device(
_device: &MetalDevice,
_src_ptr: *const u8,
_dst_ptr: *mut u8,
_size: usize,
) -> Result<(), String> {
Ok(())
}
/// Blocking device-to-host copy; no-op stub.
pub fn copy_device_to_host(
_device: &MetalDevice,
_src_ptr: *const u8,
_dst_ptr: *mut u8,
_size: usize,
) -> Result<(), String> {
Ok(())
}
}
/// Wait for the device's queued work to finish; no-op stub.
pub fn synchronize(_device: &crate::metal::device::MetalDevice) -> Result<(), String> {
Ok(())
}
}
/// Strategy used to move bytes between endpoints.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferMode {
/// Blocking copy; completes before returning.
Synchronous,
/// Non-blocking copy submitted to the backend.
Asynchronous,
/// Large transfer split into pipelined chunks.
Streaming,
/// Direct device-to-device copy over a peer link.
PeerToPeer,
}
/// Endpoint relationship of a transfer, derived from the two devices in
/// `ZeroCopyTransfer::new`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferDirection {
/// CPU source, non-CPU destination.
HostToDevice,
/// Non-CPU source, CPU destination.
DeviceToHost,
/// Both endpoints are the same device.
DeviceToDevice,
/// Two distinct (non-CPU) devices.
CrossDevice,
}
/// Zero-copy feature flags advertised for a device.
///
/// `Default` is all-false (no zero-copy support). The previous hand-written
/// `impl Default` set every field to `false`, which is exactly what the
/// derived impl produces for `bool` fields, so it is replaced by
/// `#[derive(Default)]` (clippy: `derivable_impls`).
#[derive(Debug, Clone, Copy, Default)]
pub struct ZeroCopyCapabilities {
    /// Host and device share one address space.
    pub unified_memory: bool,
    /// Direct device-to-device copies over a peer link.
    pub peer_to_peer: bool,
    /// Device memory can be mapped for direct access.
    pub memory_mapping: bool,
    /// The GPU can access host memory without an explicit copy.
    pub direct_gpu_access: bool,
    /// Page-locked host allocations are available for fast DMA.
    pub pinned_memory: bool,
    /// Placement/usage hints (e.g. cudaMemAdvise-style) are supported.
    pub memory_advice: bool,
    /// Non-blocking transfer APIs are available.
    pub async_transfers: bool,
    /// Chunked, pipelined transfers are available for large buffers.
    pub streaming_transfers: bool,
}
impl ZeroCopyCapabilities {
    /// True if at least one capability flag is set.
    ///
    /// Fix: the previous version omitted `memory_advice` and
    /// `streaming_transfers`, so a device advertising only those reported
    /// "no capabilities" even though `capability_score()` counted them.
    /// All eight flags are now considered.
    pub fn has_any_capabilities(&self) -> bool {
        self.unified_memory
            || self.peer_to_peer
            || self.memory_mapping
            || self.direct_gpu_access
            || self.pinned_memory
            || self.memory_advice
            || self.async_transfers
            || self.streaming_transfers
    }
    /// Fraction of the eight capability flags that are enabled (0.0..=1.0).
    pub fn capability_score(&self) -> f32 {
        let flags = [
            self.unified_memory,
            self.peer_to_peer,
            self.memory_mapping,
            self.direct_gpu_access,
            self.pinned_memory,
            self.memory_advice,
            self.async_transfers,
            self.streaming_transfers,
        ];
        let enabled = flags.iter().filter(|&&f| f).count();
        enabled as f32 / flags.len() as f32
    }
    /// Best transfer mode this capability set supports, preferring
    /// streaming, then async, then peer-to-peer, else synchronous.
    pub fn recommended_transfer_mode(&self) -> TransferMode {
        if self.streaming_transfers {
            TransferMode::Streaming
        } else if self.async_transfers {
            TransferMode::Asynchronous
        } else if self.peer_to_peer {
            TransferMode::PeerToPeer
        } else {
            TransferMode::Synchronous
        }
    }
}
/// A single planned transfer between two devices: raw endpoint pointers plus
/// scheduling hints (mode, alignment, priority, optional stream).
#[derive(Debug, Clone)]
pub struct ZeroCopyTransfer {
pub source_device: Device,
pub destination_device: Device,
/// Derived from the two devices in `new()`.
pub direction: TransferDirection,
pub mode: TransferMode,
/// Raw source pointer; validity and lifetime are the caller's responsibility.
pub source_ptr: *mut u8,
/// Raw destination pointer; validity and lifetime are the caller's responsibility.
pub destination_ptr: *mut u8,
/// Transfer size in bytes.
pub size: usize,
/// Required pointer alignment in bytes (1 = no alignment requirement).
pub alignment: usize,
/// Scheduling priority; `optimize_transfer` assigns 0 to large transfers
/// and 1 otherwise — presumably lower means more urgent (TODO confirm).
pub priority: u32,
/// Optional stream/queue identifier for async submission.
pub stream_id: Option<u64>,
}
// SAFETY(review): the raw pointers make this type !Send/!Sync by default.
// These impls assert the pointed-to buffers are safe to reference from other
// threads — that only holds if callers guarantee stable, exclusive access for
// the transfer's duration; nothing here enforces it. TODO confirm invariant.
unsafe impl Send for ZeroCopyTransfer {}
unsafe impl Sync for ZeroCopyTransfer {}
impl ZeroCopyTransfer {
    /// Build a transfer and classify its direction from the endpoints.
    ///
    /// Fix: two *different* devices that happen to share a numeric id (e.g.
    /// CUDA device 0 and a Metal device with id 0) were previously classified
    /// as `DeviceToDevice` because only ids were compared. The device type is
    /// now compared as well, so only a genuinely identical endpoint pair is
    /// `DeviceToDevice`; all other device pairs are `CrossDevice`.
    pub fn new(
        source_device: Device,
        destination_device: Device,
        source_ptr: *mut u8,
        destination_ptr: *mut u8,
        size: usize,
    ) -> Self {
        let src_is_cpu = source_device.device_type() == DeviceType::Cpu;
        let dst_is_cpu = destination_device.device_type() == DeviceType::Cpu;
        let direction = if src_is_cpu && !dst_is_cpu {
            TransferDirection::HostToDevice
        } else if !src_is_cpu && dst_is_cpu {
            TransferDirection::DeviceToHost
        } else if source_device.device_type() == destination_device.device_type()
            && source_device.id() == destination_device.id()
        {
            TransferDirection::DeviceToDevice
        } else {
            TransferDirection::CrossDevice
        };
        Self {
            source_device,
            destination_device,
            direction,
            mode: TransferMode::Synchronous,
            source_ptr,
            destination_ptr,
            size,
            alignment: 1,
            priority: 1,
            stream_id: None,
        }
    }
    /// Builder: set the transfer mode.
    pub fn with_mode(mut self, mode: TransferMode) -> Self {
        self.mode = mode;
        self
    }
    /// Builder: set the required pointer alignment in bytes.
    pub fn with_alignment(mut self, alignment: usize) -> Self {
        self.alignment = alignment;
        self
    }
    /// Builder: set the scheduling priority.
    pub fn with_priority(mut self, priority: u32) -> Self {
        self.priority = priority;
        self
    }
    /// Builder: attach a stream/queue id for async submission.
    pub fn with_stream(mut self, stream_id: u64) -> Self {
        self.stream_id = Some(stream_id);
        self
    }
    /// Whether `capabilities` offers a zero-copy path for this direction.
    pub fn is_zero_copy_possible(&self, capabilities: &ZeroCopyCapabilities) -> bool {
        match self.direction {
            TransferDirection::HostToDevice | TransferDirection::DeviceToHost => {
                capabilities.unified_memory || capabilities.pinned_memory
            }
            TransferDirection::DeviceToDevice => capabilities.memory_mapping,
            TransferDirection::CrossDevice => capabilities.peer_to_peer,
        }
    }
    /// Rough sustained-bandwidth estimate in bytes/sec for this transfer on
    /// the given device type.
    ///
    /// NOTE(review): the constants are ballpark figures, not measured values;
    /// tune them against real hardware before depending on the estimates.
    pub fn estimate_bandwidth(&self, device_type: DeviceType) -> u64 {
        match (device_type, self.direction) {
            (DeviceType::Cuda(_), TransferDirection::HostToDevice) => {
                // 256-byte-aligned buffers are assumed to hit the fast DMA path.
                if self.alignment >= 256 {
                    25_000_000_000
                } else {
                    12_000_000_000
                }
            }
            (DeviceType::Cuda(_), TransferDirection::DeviceToHost) => {
                if self.alignment >= 256 {
                    20_000_000_000
                } else {
                    10_000_000_000
                }
            }
            (DeviceType::Cuda(_), TransferDirection::CrossDevice) => 50_000_000_000,
            (DeviceType::Metal(_), TransferDirection::HostToDevice) => 40_000_000_000,
            (DeviceType::Metal(_), TransferDirection::DeviceToHost) => 40_000_000_000,
            (DeviceType::Wgpu(_), TransferDirection::HostToDevice) => 8_000_000_000,
            (DeviceType::Wgpu(_), TransferDirection::DeviceToHost) => 6_000_000_000,
            (DeviceType::Cpu, _) => 50_000_000_000,
            // Conservative default for unknown combinations.
            _ => 1_000_000_000,
        }
    }
    /// Estimated duration in microseconds; `u64::MAX` if bandwidth is zero.
    pub fn estimate_transfer_time_us(&self, device_type: DeviceType) -> u64 {
        let bandwidth = self.estimate_bandwidth(device_type);
        if bandwidth == 0 {
            u64::MAX
        } else {
            (self.size as u64 * 1_000_000) / bandwidth
        }
    }
}
/// Running counters describing zero-copy transfer activity.
#[derive(Debug, Default, Clone)]
pub struct ZeroCopyStats {
    pub total_transfers: u64,
    pub zero_copy_transfers: u64,
    pub fallback_transfers: u64,
    pub zero_copy_bytes: u64,
    pub fallback_bytes: u64,
    pub total_transfer_time_us: u64,
    pub average_bandwidth: f64,
    pub error_count: u64,
}
impl ZeroCopyStats {
    /// Fraction of all attempted transfers that went through a zero-copy path.
    pub fn zero_copy_success_rate(&self) -> f64 {
        match self.total_transfers {
            0 => 0.0,
            n => self.zero_copy_transfers as f64 / n as f64,
        }
    }
    /// Observed average bandwidth as a fraction of `theoretical_bandwidth`.
    pub fn bandwidth_efficiency(&self, theoretical_bandwidth: u64) -> f64 {
        match theoretical_bandwidth {
            0 => 0.0,
            max => self.average_bandwidth / max as f64,
        }
    }
    /// Fraction of all attempted transfers that ended in an error.
    pub fn error_rate(&self) -> f64 {
        match self.total_transfers {
            0 => 0.0,
            n => self.error_count as f64 / n as f64,
        }
    }
    /// Fold one finished transfer into the counters.
    ///
    /// Errored transfers bump `total_transfers` and `error_count` only;
    /// successful ones also update the byte counters, the accumulated time,
    /// and the running average bandwidth (bytes per second).
    pub fn update_transfer(
        &mut self,
        bytes: u64,
        time_us: u64,
        was_zero_copy: bool,
        was_error: bool,
    ) {
        self.total_transfers += 1;
        if was_error {
            self.error_count += 1;
            return;
        }
        // Route the byte/count bookkeeping to the matching pair of counters.
        let (count, byte_total) = if was_zero_copy {
            (&mut self.zero_copy_transfers, &mut self.zero_copy_bytes)
        } else {
            (&mut self.fallback_transfers, &mut self.fallback_bytes)
        };
        *count += 1;
        *byte_total += bytes;
        self.total_transfer_time_us += time_us;
        if self.total_transfer_time_us > 0 {
            let all_bytes = (self.zero_copy_bytes + self.fallback_bytes) as f64;
            let seconds = self.total_transfer_time_us as f64 / 1_000_000.0;
            self.average_bandwidth = all_bytes / seconds;
        }
    }
}
/// Coordinates zero-copy transfers across registered devices: per-device
/// capability flags and memory managers, aggregate statistics, and
/// feature-gated backend device handles. All maps are keyed "device_type:id".
pub struct ZeroCopyManager {
/// Capability flags per registered device.
capabilities: Arc<RwLock<HashMap<String, ZeroCopyCapabilities>>>,
/// Aggregate transfer statistics.
stats: Arc<RwLock<ZeroCopyStats>>,
/// Memory manager per registered device.
memory_managers: HashMap<String, Arc<dyn MemoryManager>>,
/// CUDA backend handles (only with the `cuda` feature).
#[cfg(feature = "cuda")]
cuda_devices: HashMap<String, Arc<SciRs2CudaDevice>>,
/// Metal backend handles (only on Apple silicon with the `metal` feature).
#[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
metal_devices: HashMap<String, Arc<SciRs2MetalDevice>>,
}
impl ZeroCopyManager {
/// Create an empty manager with no devices registered.
pub fn new() -> Self {
Self {
capabilities: Arc::new(RwLock::new(HashMap::new())),
stats: Arc::new(RwLock::new(ZeroCopyStats::default())),
memory_managers: HashMap::new(),
#[cfg(feature = "cuda")]
cuda_devices: HashMap::new(),
#[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
metal_devices: HashMap::new(),
}
}
/// Register a device's capability flags and memory manager under the key
/// "device_type:id". Re-registering the same device overwrites the entry.
pub fn register_device(
&mut self,
device: &Device,
capabilities: ZeroCopyCapabilities,
memory_manager: Arc<dyn MemoryManager>,
) -> BackendResult<()> {
let device_key = format!("{}:{}", device.device_type(), device.id());
{
// Scoped so the write lock is dropped before the second insert.
let mut caps = self
.capabilities
.write()
.expect("lock should not be poisoned");
caps.insert(device_key.clone(), capabilities);
}
self.memory_managers.insert(device_key, memory_manager);
Ok(())
}
/// CUDA registration: standard `register_device` plus storing the backend
/// handle used by the CUDA-specific transfer paths.
#[cfg(feature = "cuda")]
pub fn register_cuda_device(
&mut self,
device: &Device,
scirs2_device: Arc<SciRs2CudaDevice>,
capabilities: ZeroCopyCapabilities,
memory_manager: Arc<dyn MemoryManager>,
) -> BackendResult<()> {
let device_key = format!("{}:{}", device.device_type(), device.id());
self.register_device(device, capabilities, memory_manager)?;
self.cuda_devices.insert(device_key, scirs2_device);
Ok(())
}
/// Metal registration: standard `register_device` plus storing the backend
/// handle used by the Metal-specific transfer paths.
#[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
pub fn register_metal_device(
&mut self,
device: &Device,
scirs2_device: Arc<SciRs2MetalDevice>,
capabilities: ZeroCopyCapabilities,
memory_manager: Arc<dyn MemoryManager>,
) -> BackendResult<()> {
let device_key = format!("{}:{}", device.device_type(), device.id());
self.register_device(device, capabilities, memory_manager)?;
self.metal_devices.insert(device_key, scirs2_device);
Ok(())
}
/// Look up the capability flags registered for `device`, if any.
pub fn get_capabilities(&self, device: &Device) -> Option<ZeroCopyCapabilities> {
    let device_key = format!("{}:{}", device.device_type(), device.id());
    self.capabilities
        .read()
        .expect("lock should not be poisoned")
        .get(&device_key)
        .copied()
}
/// Whether a zero-copy path exists between `source` and `destination`.
/// Both devices must be registered; otherwise this returns `false`.
pub fn can_zero_copy(&self, source: &Device, destination: &Device) -> bool {
    let caps = (
        self.get_capabilities(source),
        self.get_capabilities(destination),
    );
    let (src, dst) = match caps {
        (Some(s), Some(d)) => (s, d),
        _ => return false,
    };
    if source.id() == destination.id() {
        // Same device: both sides must support memory mapping.
        src.memory_mapping && dst.memory_mapping
    } else if source.device_type() == DeviceType::Cpu {
        // Host -> device: the destination decides.
        dst.unified_memory || dst.pinned_memory
    } else if destination.device_type() == DeviceType::Cpu {
        // Device -> host: the source decides.
        src.unified_memory || src.pinned_memory
    } else {
        // Device <-> device: both ends need peer-to-peer support.
        src.peer_to_peer && dst.peer_to_peer
    }
}
/// Execute `transfer`, preferring a zero-copy path and falling back to a
/// plain memcpy when none is available. Returns `Ok(true)` if a zero-copy
/// path was used, `Ok(false)` for the fallback path.
///
/// Statistics are updated for every attempt, including errors.
pub async fn transfer(&mut self, transfer: &ZeroCopyTransfer) -> BackendResult<bool> {
    let start_time = std::time::Instant::now();
    if !self.can_zero_copy(&transfer.source_device, &transfer.destination_device) {
        // fallback_transfer records its own stats before returning.
        return self.fallback_transfer(transfer, start_time).await;
    }
    let result = match transfer.direction {
        TransferDirection::HostToDevice => self.host_to_device_transfer(transfer).await,
        TransferDirection::DeviceToHost => self.device_to_host_transfer(transfer).await,
        TransferDirection::DeviceToDevice => self.device_to_device_transfer(transfer).await,
        TransferDirection::CrossDevice => self.cross_device_transfer(transfer).await,
    };
    let elapsed_us = start_time.elapsed().as_micros() as u64;
    // Idiom fix: `matches!` replaces the previous
    // `result.is_ok() && result.as_ref().unwrap_or(&false) == &true` dance.
    let was_zero_copy = matches!(result, Ok(true));
    let was_error = result.is_err();
    {
        let mut stats = self.stats.write().expect("lock should not be poisoned");
        stats.update_transfer(transfer.size as u64, elapsed_us, was_zero_copy, was_error);
    }
    result
}
/// Pick a zero-copy host-to-device strategy from the destination's flags:
/// unified memory first, then pinned memory, otherwise an error.
async fn host_to_device_transfer(&self, transfer: &ZeroCopyTransfer) -> BackendResult<bool> {
    let dest_caps = self
        .get_capabilities(&transfer.destination_device)
        .ok_or_else(|| {
            BackendError::BackendError("Destination device not registered".to_string())
        })?;
    match (dest_caps.unified_memory, dest_caps.pinned_memory) {
        (true, _) => self.unified_memory_transfer(transfer).await,
        (false, true) => self.pinned_memory_transfer(transfer).await,
        (false, false) => Err(BackendError::BackendError(
            "No zero-copy method available for host to device transfer".to_string(),
        )),
    }
}
/// Pick a zero-copy device-to-host strategy from the source's flags:
/// unified memory first, then pinned memory, otherwise an error.
async fn device_to_host_transfer(&self, transfer: &ZeroCopyTransfer) -> BackendResult<bool> {
    let source_caps = self
        .get_capabilities(&transfer.source_device)
        .ok_or_else(|| {
            BackendError::BackendError("Source device not registered".to_string())
        })?;
    match (source_caps.unified_memory, source_caps.pinned_memory) {
        (true, _) => self.unified_memory_transfer(transfer).await,
        (false, true) => self.pinned_memory_transfer(transfer).await,
        (false, false) => Err(BackendError::BackendError(
            "No zero-copy method available for device to host transfer".to_string(),
        )),
    }
}
/// Same-device transfer; requires memory-mapping support.
async fn device_to_device_transfer(&self, transfer: &ZeroCopyTransfer) -> BackendResult<bool> {
    let device_caps = self
        .get_capabilities(&transfer.source_device)
        .ok_or_else(|| BackendError::BackendError("Device not registered".to_string()))?;
    if !device_caps.memory_mapping {
        return Err(BackendError::BackendError(
            "No zero-copy method available for device to device transfer".to_string(),
        ));
    }
    self.memory_mapped_transfer(transfer).await
}
/// Transfer between two distinct devices; requires peer-to-peer support on
/// both ends, otherwise fails with a descriptive error.
#[allow(unused_unsafe)]
async fn cross_device_transfer(&self, transfer: &ZeroCopyTransfer) -> BackendResult<bool> {
let source_caps = self
.get_capabilities(&transfer.source_device)
.ok_or_else(|| {
BackendError::BackendError("Source device not registered".to_string())
})?;
let dest_caps = self
.get_capabilities(&transfer.destination_device)
.ok_or_else(|| {
BackendError::BackendError("Destination device not registered".to_string())
})?;
if source_caps.peer_to_peer && dest_caps.peer_to_peer {
self.peer_to_peer_transfer(transfer).await
} else {
Err(BackendError::BackendError(
"No zero-copy method available for cross-device transfer".to_string(),
))
}
}
/// Use unified/shared memory so no explicit copy is required. CUDA would
/// prefetch the range (currently unimplemented), Metal adjusts the CPU
/// cache mode, and other backends issue best-effort hints through the
/// registered memory manager.
async fn unified_memory_transfer(&self, transfer: &ZeroCopyTransfer) -> BackendResult<bool> {
let device_key = format!(
"{}:{}",
transfer.destination_device.device_type(),
transfer.destination_device.id()
);
match transfer.destination_device.device_type() {
#[cfg(feature = "cuda")]
DeviceType::Cuda(_) => {
// Both branches error today: real prefetch needs scirs2_cuda bindings.
if self.cuda_devices.get(&device_key).is_some() {
Err(BackendError::BackendError(
"CUDA unified memory prefetch not yet implemented - requires scirs2_cuda"
.to_string(),
))
} else {
Err(BackendError::BackendError(
"CUDA device not registered for unified memory".to_string(),
))
}
}
#[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
DeviceType::Metal(_) => {
if let Some(metal_device) = self.metal_devices.get(&device_key) {
#[allow(unused_unsafe)]
unsafe {
scirs2_metal::memory::set_cpu_cache_mode(
metal_device,
transfer.source_ptr,
scirs2_metal::memory::CpuCacheMode::WriteCombined,
)
.map_err(|e| {
BackendError::BackendError(format!("Metal cache mode failed: {}", e))
})?;
}
Ok(true)
} else {
Err(BackendError::BackendError(
"Metal device not registered for unified memory".to_string(),
))
}
}
_ => {
// Generic path: advice/prefetch are hints, so failures are ignored.
if let Some(memory_manager) = self.memory_managers.get(&device_key) {
let _ = memory_manager.set_memory_advice(
transfer.source_ptr,
transfer.size,
crate::memory::MemoryAdvice::SetPreferredLocation,
);
let _ = memory_manager.prefetch_to_device(transfer.source_ptr, transfer.size);
Ok(true)
} else {
Err(BackendError::BackendError(
"Memory manager not found for device".to_string(),
))
}
}
}
}
/// Transfer via pinned (page-locked) host memory, dispatching to the
/// backend-specific launcher that matches the requested mode.
async fn pinned_memory_transfer(&self, transfer: &ZeroCopyTransfer) -> BackendResult<bool> {
#[allow(unused_variables)]
let device_key = format!(
"{}:{}",
transfer.destination_device.device_type(),
transfer.destination_device.id()
);
match transfer.destination_device.device_type() {
#[cfg(feature = "cuda")]
DeviceType::Cuda(_) => {
if let Some(cuda_device) = self.cuda_devices.get(&device_key) {
match transfer.mode {
TransferMode::Asynchronous => {
self.launch_cuda_async_transfer(cuda_device, transfer).await
}
TransferMode::Streaming => {
self.launch_cuda_streaming_transfer(cuda_device, transfer)
.await
}
// Synchronous and PeerToPeer both use the sync launcher.
_ => self.launch_cuda_sync_transfer(cuda_device, transfer).await,
}
} else {
Err(BackendError::BackendError(
"CUDA device not registered for pinned memory".to_string(),
))
}
}
#[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
DeviceType::Metal(_) => {
if let Some(metal_device) = self.metal_devices.get(&device_key) {
match transfer.mode {
TransferMode::Asynchronous => {
self.launch_metal_async_transfer(metal_device, transfer)
.await
}
TransferMode::Streaming => {
self.launch_metal_streaming_transfer(metal_device, transfer)
.await
}
_ => {
self.launch_metal_sync_transfer(metal_device, transfer)
.await
}
}
} else {
Err(BackendError::BackendError(
"Metal device not registered for pinned memory".to_string(),
))
}
}
// All other backends use the generic (simulated) DMA launchers.
_ => {
match transfer.mode {
TransferMode::Asynchronous => self.launch_async_dma(transfer).await,
TransferMode::Streaming => self.launch_streaming_transfer(transfer).await,
_ => self.launch_sync_dma(transfer).await,
}
}
}
}
/// Same-device copy through mapped memory; rejects null endpoints.
async fn memory_mapped_transfer(&self, transfer: &ZeroCopyTransfer) -> BackendResult<bool> {
    let endpoints_valid =
        !transfer.source_ptr.is_null() && !transfer.destination_ptr.is_null();
    if endpoints_valid {
        self.launch_device_copy(transfer).await
    } else {
        Err(BackendError::InvalidArgument(
            "Null pointer in memory mapped transfer".to_string(),
        ))
    }
}
/// Direct device-to-device copy over a peer link.
///
/// Robustness fix: the capability lookups previously used `.expect(..)`,
/// panicking if this was reached for an unregistered device. They now
/// return a `BackendError` like every other path in this module, keeping
/// the failure recoverable.
async fn peer_to_peer_transfer(&self, transfer: &ZeroCopyTransfer) -> BackendResult<bool> {
    let source_caps = self
        .get_capabilities(&transfer.source_device)
        .ok_or_else(|| {
            BackendError::BackendError("Source device not registered".to_string())
        })?;
    let dest_caps = self
        .get_capabilities(&transfer.destination_device)
        .ok_or_else(|| {
            BackendError::BackendError("Destination device not registered".to_string())
        })?;
    if !source_caps.peer_to_peer || !dest_caps.peer_to_peer {
        return Err(BackendError::BackendError(
            "Peer-to-peer not supported on one or both devices".to_string(),
        ));
    }
    self.launch_p2p_transfer(transfer).await
}
/// Plain blocking memcpy used when no zero-copy path exists. Records its
/// own statistics (the caller returns early on this path) and yields
/// `Ok(false)` to signal the transfer was NOT zero-copy.
async fn fallback_transfer(
&mut self,
transfer: &ZeroCopyTransfer,
start_time: std::time::Instant,
) -> BackendResult<bool> {
if transfer.source_ptr.is_null() || transfer.destination_ptr.is_null() {
return Err(BackendError::InvalidArgument(
"Null pointer in fallback transfer".to_string(),
));
}
// SAFETY(review): assumes both pointers address at least `size` valid bytes
// and that the ranges do not overlap — `copy_nonoverlapping` is UB
// otherwise. Nothing here enforces that; callers must guarantee it.
unsafe {
std::ptr::copy_nonoverlapping(
transfer.source_ptr,
transfer.destination_ptr,
transfer.size,
);
}
let elapsed_us = start_time.elapsed().as_micros() as u64;
{
let mut stats = self.stats.write().expect("lock should not be poisoned");
stats.update_transfer(transfer.size as u64, elapsed_us, false, false);
}
Ok(false) }
async fn launch_async_dma(&self, transfer: &ZeroCopyTransfer) -> BackendResult<bool> {
#[cfg(feature = "async")]
tokio::task::yield_now().await;
if !transfer.source_ptr.is_null() && !transfer.destination_ptr.is_null() {
unsafe {
std::ptr::copy_nonoverlapping(
transfer.source_ptr,
transfer.destination_ptr,
transfer.size,
);
}
}
Ok(true)
}
/// Copy in 64 MiB chunks, yielding between chunks so other tasks can run.
///
/// Cleanup: the chunk count now uses `usize::div_ceil` instead of the
/// manual `(a + b - 1) / b` (which can overflow near `usize::MAX`); the
/// unreachable `chunk_size == 0` break was removed — `chunk_offset` is
/// always `< transfer.size` for `chunk < num_chunks`.
async fn launch_streaming_transfer(&self, transfer: &ZeroCopyTransfer) -> BackendResult<bool> {
    const CHUNK_SIZE: usize = 64 * 1024 * 1024;
    let num_chunks = transfer.size.div_ceil(CHUNK_SIZE);
    for chunk in 0..num_chunks {
        let chunk_offset = chunk * CHUNK_SIZE;
        // The final chunk may be shorter than CHUNK_SIZE.
        let chunk_size = CHUNK_SIZE.min(transfer.size - chunk_offset);
        // SAFETY: chunk_offset < transfer.size, so the offset pointers stay
        // within the caller-provided `size`-byte buffers.
        let chunk_transfer = ZeroCopyTransfer {
            source_ptr: unsafe { transfer.source_ptr.add(chunk_offset) },
            destination_ptr: unsafe { transfer.destination_ptr.add(chunk_offset) },
            size: chunk_size,
            mode: TransferMode::Asynchronous,
            ..transfer.clone()
        };
        self.launch_async_dma(&chunk_transfer).await?;
        #[cfg(feature = "async")]
        tokio::task::yield_now().await;
    }
    Ok(true)
}
/// Blocking copy between two raw buffers; errors on null endpoints.
async fn launch_sync_dma(&self, transfer: &ZeroCopyTransfer) -> BackendResult<bool> {
    if transfer.source_ptr.is_null() || transfer.destination_ptr.is_null() {
        return Err(BackendError::InvalidArgument(
            "Null pointer in sync DMA transfer".to_string(),
        ));
    }
    // SAFETY(review): assumes valid, non-overlapping `size`-byte ranges.
    unsafe {
        std::ptr::copy_nonoverlapping(
            transfer.source_ptr,
            transfer.destination_ptr,
            transfer.size,
        );
    }
    Ok(true)
}
/// Same-device copy used by the memory-mapped path; errors on null endpoints.
async fn launch_device_copy(&self, transfer: &ZeroCopyTransfer) -> BackendResult<bool> {
    if transfer.source_ptr.is_null() || transfer.destination_ptr.is_null() {
        return Err(BackendError::InvalidArgument(
            "Null pointer in device copy".to_string(),
        ));
    }
    // SAFETY(review): assumes valid, non-overlapping `size`-byte ranges.
    unsafe {
        std::ptr::copy_nonoverlapping(
            transfer.source_ptr,
            transfer.destination_ptr,
            transfer.size,
        );
    }
    Ok(true)
}
/// Low-level peer-to-peer launch. CUDA/CUDA pairs are recognized but still
/// unimplemented; every other pairing falls back to a plain memcpy.
#[allow(unused_unsafe)]
async fn launch_p2p_transfer(&self, transfer: &ZeroCopyTransfer) -> BackendResult<bool> {
#[allow(unused_variables)]
let source_key = format!(
"{}:{}",
transfer.source_device.device_type(),
transfer.source_device.id()
);
#[allow(unused_variables)]
let dest_key = format!(
"{}:{}",
transfer.destination_device.device_type(),
transfer.destination_device.id()
);
match (
transfer.source_device.device_type(),
transfer.destination_device.device_type(),
) {
#[cfg(feature = "cuda")]
(DeviceType::Cuda(_), DeviceType::Cuda(_)) => {
// Both branches error today: real P2P needs scirs2_cuda bindings.
if self.cuda_devices.get(&source_key).is_some()
&& self.cuda_devices.get(&dest_key).is_some()
{
Err(BackendError::BackendError(
"CUDA P2P transfer not yet implemented - requires scirs2_cuda".to_string(),
))
} else {
Err(BackendError::BackendError(
"CUDA devices not registered for P2P transfer".to_string(),
))
}
}
_ => {
if !transfer.source_ptr.is_null() && !transfer.destination_ptr.is_null() {
// SAFETY(review): assumes valid, non-overlapping `size`-byte ranges.
unsafe {
std::ptr::copy_nonoverlapping(
transfer.source_ptr,
transfer.destination_ptr,
transfer.size,
);
}
Ok(true)
} else {
Err(BackendError::InvalidArgument(
"Null pointer in P2P transfer".to_string(),
))
}
}
}
}
#[cfg(feature = "cuda")]
#[allow(unused_unsafe)]
async fn launch_cuda_async_transfer(
&self,
_cuda_device: &SciRs2CudaDevice,
_transfer: &ZeroCopyTransfer,
) -> BackendResult<bool> {
Err(BackendError::BackendError(
"CUDA async transfer not yet implemented - requires scirs2_cuda".to_string(),
))
}
#[cfg(feature = "cuda")]
async fn launch_cuda_sync_transfer(
&self,
_cuda_device: &SciRs2CudaDevice,
_transfer: &ZeroCopyTransfer,
) -> BackendResult<bool> {
Err(BackendError::BackendError(
"CUDA sync transfer not yet implemented - requires scirs2_cuda".to_string(),
))
}
#[cfg(feature = "cuda")]
async fn launch_cuda_streaming_transfer(
&self,
cuda_device: &SciRs2CudaDevice,
transfer: &ZeroCopyTransfer,
) -> BackendResult<bool> {
const CHUNK_SIZE: usize = 64 * 1024 * 1024; let num_chunks = (transfer.size + CHUNK_SIZE - 1) / CHUNK_SIZE;
for chunk in 0..num_chunks {
let chunk_offset = chunk * CHUNK_SIZE;
let chunk_size = std::cmp::min(CHUNK_SIZE, transfer.size - chunk_offset);
if chunk_size == 0 {
break;
}
let chunk_transfer = ZeroCopyTransfer {
source_ptr: unsafe { transfer.source_ptr.add(chunk_offset) },
destination_ptr: unsafe { transfer.destination_ptr.add(chunk_offset) },
size: chunk_size,
..transfer.clone()
};
self.launch_cuda_async_transfer(cuda_device, &chunk_transfer)
.await?;
#[cfg(feature = "async")]
tokio::task::yield_now().await;
}
scirs2_cuda::synchronize(cuda_device).map_err(|e| {
BackendError::BackendError(format!("CUDA streaming sync failed: {}", e))
})?;
Ok(true)
}
#[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
async fn launch_metal_async_transfer(
&self,
metal_device: &SciRs2MetalDevice,
transfer: &ZeroCopyTransfer,
) -> BackendResult<bool> {
#[allow(unused_unsafe)]
unsafe {
match transfer.direction {
TransferDirection::HostToDevice => {
scirs2_metal::memory::copy_host_to_device_async(
metal_device,
transfer.source_ptr,
transfer.destination_ptr,
transfer.size,
)
.await
.map_err(|e| {
BackendError::BackendError(format!(
"Metal H2D async transfer failed: {}",
e
))
})?;
}
TransferDirection::DeviceToHost => {
scirs2_metal::memory::copy_device_to_host_async(
metal_device,
transfer.source_ptr,
transfer.destination_ptr,
transfer.size,
)
.await
.map_err(|e| {
BackendError::BackendError(format!(
"Metal D2H async transfer failed: {}",
e
))
})?;
}
_ => {
return Err(BackendError::InvalidArgument(
"Invalid transfer direction for Metal async transfer".to_string(),
));
}
}
}
Ok(true)
}
#[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
async fn launch_metal_sync_transfer(
&self,
metal_device: &SciRs2MetalDevice,
transfer: &ZeroCopyTransfer,
) -> BackendResult<bool> {
#[allow(unused_unsafe)]
unsafe {
match transfer.direction {
TransferDirection::HostToDevice => {
scirs2_metal::memory::copy_host_to_device(
metal_device,
transfer.source_ptr,
transfer.destination_ptr,
transfer.size,
)
.map_err(|e| {
BackendError::BackendError(format!("Metal H2D sync transfer failed: {}", e))
})?;
}
TransferDirection::DeviceToHost => {
scirs2_metal::memory::copy_device_to_host(
metal_device,
transfer.source_ptr,
transfer.destination_ptr,
transfer.size,
)
.map_err(|e| {
BackendError::BackendError(format!("Metal D2H sync transfer failed: {}", e))
})?;
}
_ => {
return Err(BackendError::InvalidArgument(
"Invalid transfer direction for Metal sync transfer".to_string(),
));
}
}
}
Ok(true)
}
#[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
async fn launch_metal_streaming_transfer(
&self,
metal_device: &SciRs2MetalDevice,
transfer: &ZeroCopyTransfer,
) -> BackendResult<bool> {
const CHUNK_SIZE: usize = 32 * 1024 * 1024; let num_chunks = (transfer.size + CHUNK_SIZE - 1) / CHUNK_SIZE;
for chunk in 0..num_chunks {
let chunk_offset = chunk * CHUNK_SIZE;
let chunk_size = std::cmp::min(CHUNK_SIZE, transfer.size - chunk_offset);
if chunk_size == 0 {
break;
}
let chunk_transfer = ZeroCopyTransfer {
source_ptr: unsafe { transfer.source_ptr.add(chunk_offset) },
destination_ptr: unsafe { transfer.destination_ptr.add(chunk_offset) },
size: chunk_size,
..transfer.clone()
};
self.launch_metal_async_transfer(metal_device, &chunk_transfer)
.await?;
#[cfg(feature = "async")]
tokio::task::yield_now().await;
}
scirs2_metal::synchronize(metal_device).map_err(|e| {
BackendError::BackendError(format!("Metal streaming sync failed: {}", e))
})?;
Ok(true)
}
/// Snapshot of the aggregate transfer statistics.
pub fn get_stats(&self) -> ZeroCopyStats {
self.stats
.read()
.expect("lock should not be poisoned")
.clone()
}
/// Reset all statistics counters to their defaults.
pub fn reset_stats(&self) {
let mut stats = self.stats.write().expect("lock should not be poisoned");
*stats = ZeroCopyStats::default();
}
/// Choose the best mode both endpoints support: streaming for transfers
/// over 100 MiB, then async, then P2P for cross-device pairs, otherwise
/// synchronous. Unregistered devices default to synchronous.
pub fn get_optimal_transfer_mode(&self, transfer: &ZeroCopyTransfer) -> TransferMode {
let source_caps = self.get_capabilities(&transfer.source_device);
let dest_caps = self.get_capabilities(&transfer.destination_device);
match (source_caps, dest_caps) {
(Some(src), Some(dst)) => {
if transfer.size > 100 * 1024 * 1024
&& src.streaming_transfers
&& dst.streaming_transfers
{
TransferMode::Streaming
} else if src.async_transfers && dst.async_transfers {
TransferMode::Asynchronous
} else if transfer.direction == TransferDirection::CrossDevice
&& src.peer_to_peer
&& dst.peer_to_peer
{
TransferMode::PeerToPeer
} else {
TransferMode::Synchronous
}
}
_ => TransferMode::Synchronous,
}
}
/// Return `transfer` with its mode, alignment, and priority tuned from the
/// registered capabilities and the transfer size.
pub fn optimize_transfer(&self, mut transfer: ZeroCopyTransfer) -> ZeroCopyTransfer {
    transfer.mode = self.get_optimal_transfer_mode(&transfer);
    // Promote weak alignments on megabyte-plus transfers for better DMA.
    if transfer.alignment < 256 && transfer.size > 1024 * 1024 {
        transfer.alignment = 256;
    }
    // Transfers above 100 MiB get priority 0; everything else priority 1.
    let is_large = transfer.size > 100 * 1024 * 1024;
    transfer.priority = if is_large { 0 } else { 1 };
    transfer
}
}
impl Default for ZeroCopyManager {
/// Equivalent to [`ZeroCopyManager::new`].
fn default() -> Self {
Self::new()
}
}
/// Helper utilities for probing and tuning zero-copy transfers.
pub mod utils {
    use super::*;
    /// Default capability profile for each backend type.
    pub fn detect_capabilities(device_type: DeviceType) -> ZeroCopyCapabilities {
        match device_type {
            // CUDA: full feature set.
            DeviceType::Cuda(_) => ZeroCopyCapabilities {
                unified_memory: true,
                peer_to_peer: true,
                memory_mapping: true,
                direct_gpu_access: true,
                pinned_memory: true,
                memory_advice: true,
                async_transfers: true,
                streaming_transfers: true,
            },
            // Metal: everything except peer-to-peer.
            DeviceType::Metal(_) => ZeroCopyCapabilities {
                unified_memory: true,
                peer_to_peer: false,
                memory_mapping: true,
                direct_gpu_access: true,
                pinned_memory: true,
                memory_advice: true,
                async_transfers: true,
                streaming_transfers: true,
            },
            // WGPU: mapping and async only.
            DeviceType::Wgpu(_) => ZeroCopyCapabilities {
                memory_mapping: true,
                async_transfers: true,
                ..Default::default()
            },
            // CPU: shared address space, mapping, and "pinned" host memory.
            DeviceType::Cpu => ZeroCopyCapabilities {
                unified_memory: true,
                memory_mapping: true,
                pinned_memory: true,
                ..Default::default()
            },
        }
    }
    /// True when `ptr` is aligned to `alignment`, which must be a non-zero
    /// power of two (anything else yields `false`).
    pub fn check_alignment(ptr: *const u8, alignment: usize) -> bool {
        // `is_power_of_two` is false for zero, covering both original guards.
        alignment.is_power_of_two() && (ptr as usize) % alignment == 0
    }
    /// Chunk size for streaming on the given backend, capped at one eighth
    /// of the total so moderate transfers are not over-chunked.
    pub fn optimal_chunk_size(total_size: usize, device_type: DeviceType) -> usize {
        let base = match device_type {
            DeviceType::Cuda(_) => 64 * 1024 * 1024,
            DeviceType::Metal(_) => 32 * 1024 * 1024,
            DeviceType::Wgpu(_) => 16 * 1024 * 1024,
            DeviceType::Cpu => 128 * 1024 * 1024,
        };
        if total_size < base {
            return total_size;
        }
        base.min(total_size / 8)
    }
    /// Heuristic efficiency score in [0, 1] for running `transfer` on a
    /// device with `capabilities`; 0.0 when zero-copy is impossible.
    pub fn estimate_efficiency(
        transfer: &ZeroCopyTransfer,
        capabilities: &ZeroCopyCapabilities,
    ) -> f32 {
        if !transfer.is_zero_copy_possible(capabilities) {
            return 0.0;
        }
        let mut efficiency: f32 = 1.0;
        // Penalties: weak alignment and tiny transfers.
        if transfer.alignment < 256 {
            efficiency *= 0.8;
        }
        if transfer.size < 4096 {
            efficiency *= 0.5;
        }
        // Bonuses: a mode well matched to the transfer's shape.
        if transfer.mode == TransferMode::Streaming && transfer.size > 100 * 1024 * 1024 {
            efficiency *= 1.2;
        } else if transfer.mode == TransferMode::PeerToPeer
            && transfer.direction == TransferDirection::CrossDevice
        {
            efficiency *= 1.3;
        } else if transfer.mode == TransferMode::Asynchronous {
            efficiency *= 1.1;
        }
        efficiency.min(1.0)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::device::{Device, DeviceInfo};
use std::ptr::null_mut;
fn create_test_device(device_type: DeviceType, id: usize) -> Device {
let info = DeviceInfo::default();
Device::new(
id,
device_type,
format!("Test {:?} {}", device_type, id),
info,
)
}
#[test]
fn test_zero_copy_capabilities_default() {
let caps = ZeroCopyCapabilities::default();
assert!(!caps.has_any_capabilities());
assert_eq!(caps.capability_score(), 0.0);
assert_eq!(caps.recommended_transfer_mode(), TransferMode::Synchronous);
}
#[test]
fn test_zero_copy_capabilities_scoring() {
let mut caps = ZeroCopyCapabilities::default();
caps.unified_memory = true;
caps.async_transfers = true;
assert!(caps.has_any_capabilities());
assert_eq!(caps.capability_score(), 0.25); assert_eq!(caps.recommended_transfer_mode(), TransferMode::Asynchronous);
}
#[test]
fn test_zero_copy_transfer_creation() {
let cpu_device = create_test_device(DeviceType::Cpu, 0);
let gpu_device = create_test_device(DeviceType::Cuda(1), 1);
let transfer = ZeroCopyTransfer::new(cpu_device, gpu_device, null_mut(), null_mut(), 1024);
assert_eq!(transfer.direction, TransferDirection::HostToDevice);
assert_eq!(transfer.mode, TransferMode::Synchronous);
assert_eq!(transfer.size, 1024);
assert_eq!(transfer.alignment, 1);
assert_eq!(transfer.priority, 1);
assert!(transfer.stream_id.is_none());
}
#[test]
fn test_zero_copy_transfer_direction_detection() {
    // The transfer direction is inferred from the source/destination
    // device kinds; exercise all four combinations.
    let cpu = create_test_device(DeviceType::Cpu, 0);
    let gpu_a = create_test_device(DeviceType::Cuda(1), 1);
    let gpu_b = create_test_device(DeviceType::Cuda(2), 2);
    // Small helper: build a 1 KiB transfer and return only its direction.
    let direction = |src: Device, dst: Device| {
        ZeroCopyTransfer::new(src, dst, null_mut(), null_mut(), 1024).direction
    };
    assert_eq!(
        direction(cpu.clone(), gpu_a.clone()),
        TransferDirection::HostToDevice
    );
    assert_eq!(direction(gpu_a.clone(), cpu), TransferDirection::DeviceToHost);
    assert_eq!(
        direction(gpu_a.clone(), gpu_a.clone()),
        TransferDirection::DeviceToDevice
    );
    assert_eq!(direction(gpu_a, gpu_b), TransferDirection::CrossDevice);
}
#[test]
fn test_zero_copy_transfer_builder() {
    // Each builder method should override the corresponding default field.
    let host = create_test_device(DeviceType::Cpu, 0);
    let gpu = create_test_device(DeviceType::Cuda(1), 1);
    let configured = ZeroCopyTransfer::new(host, gpu, null_mut(), null_mut(), 1024)
        .with_mode(TransferMode::Asynchronous)
        .with_alignment(256)
        .with_priority(0)
        .with_stream(42);
    assert_eq!(configured.mode, TransferMode::Asynchronous);
    assert_eq!(configured.alignment, 256);
    assert_eq!(configured.priority, 0);
    assert_eq!(configured.stream_id, Some(42));
}
#[test]
fn test_zero_copy_transfer_zero_copy_possible() {
    // Zero-copy is possible with either unified memory or pinned memory,
    // and impossible when neither capability is present.
    let transfer = ZeroCopyTransfer::new(
        create_test_device(DeviceType::Cpu, 0),
        create_test_device(DeviceType::Cuda(1), 1),
        null_mut(),
        null_mut(),
        1024,
    );
    let unified_only = ZeroCopyCapabilities {
        unified_memory: true,
        ..Default::default()
    };
    let pinned_only = ZeroCopyCapabilities {
        pinned_memory: true,
        ..Default::default()
    };
    assert!(transfer.is_zero_copy_possible(&unified_only));
    assert!(transfer.is_zero_copy_possible(&pinned_only));
    assert!(!transfer.is_zero_copy_possible(&ZeroCopyCapabilities::default()));
}
#[test]
fn test_zero_copy_transfer_bandwidth_estimation() {
    // 256-byte alignment should yield the full estimated CUDA bandwidth.
    let aligned = ZeroCopyTransfer::new(
        create_test_device(DeviceType::Cpu, 0),
        create_test_device(DeviceType::Cuda(1), 1),
        null_mut(),
        null_mut(),
        1024,
    )
    .with_alignment(256);
    assert_eq!(
        aligned.estimate_bandwidth(DeviceType::Cuda(1)),
        25_000_000_000
    );
    // Byte alignment falls back to a reduced bandwidth estimate.
    let unaligned = ZeroCopyTransfer::new(
        create_test_device(DeviceType::Cpu, 0),
        create_test_device(DeviceType::Cuda(1), 1),
        null_mut(),
        null_mut(),
        1024,
    )
    .with_alignment(1);
    assert_eq!(
        unaligned.estimate_bandwidth(DeviceType::Cuda(1)),
        12_000_000_000
    );
}
#[test]
fn test_zero_copy_stats() {
    // Rates start at zero before any transfer is recorded.
    let mut stats = ZeroCopyStats::default();
    assert_eq!(stats.zero_copy_success_rate(), 0.0);
    assert_eq!(stats.error_rate(), 0.0);
    // One successful zero-copy transfer: 100% success rate.
    stats.update_transfer(1024, 100, true, false);
    assert_eq!(stats.total_transfers, 1);
    assert_eq!(stats.zero_copy_transfers, 1);
    assert_eq!(stats.zero_copy_success_rate(), 1.0);
    // A fallback transfer halves the success rate.
    stats.update_transfer(512, 200, false, false);
    assert_eq!(stats.total_transfers, 2);
    assert_eq!(stats.fallback_transfers, 1);
    assert_eq!(stats.zero_copy_success_rate(), 0.5);
    // A failed transfer bumps the error counter; rate becomes 1/3.
    stats.update_transfer(256, 50, false, true);
    assert_eq!(stats.total_transfers, 3);
    assert_eq!(stats.error_count, 1);
    assert!((stats.error_rate() - (1.0 / 3.0)).abs() < 0.001);
}
#[test]
fn test_zero_copy_manager_creation() {
    // A brand-new manager starts with no cached capabilities and zeroed stats.
    let manager = ZeroCopyManager::new();
    let caps_map = manager
        .capabilities
        .read()
        .expect("lock should not be poisoned");
    assert!(caps_map.is_empty());
    // Release the read guard before touching the manager again.
    drop(caps_map);
    assert_eq!(manager.get_stats().total_transfers, 0);
}
#[test]
fn test_utils_detect_capabilities() {
    // CUDA devices advertise the full zero-copy feature set.
    let cuda = utils::detect_capabilities(DeviceType::Cuda(0));
    assert!(cuda.unified_memory);
    assert!(cuda.peer_to_peer);
    assert!(cuda.streaming_transfers);
    // WebGPU devices advertise none of these capabilities.
    let wgpu = utils::detect_capabilities(DeviceType::Wgpu(0));
    assert!(!wgpu.unified_memory);
    assert!(!wgpu.peer_to_peer);
    assert!(!wgpu.streaming_transfers);
}
#[test]
fn test_utils_check_alignment() {
    // 0x1000 is 4 KiB-aligned, so power-of-two alignments up to 4096 pass.
    let ptr = 0x1000 as *const u8;
    for good in [16, 256, 4096] {
        assert!(utils::check_alignment(ptr, good));
    }
    // Larger, zero, and non-power-of-two alignments are all rejected.
    for bad in [8192, 0, 3] {
        assert!(!utils::check_alignment(ptr, bad));
    }
}
#[test]
fn test_utils_optimal_chunk_size() {
    // A 1 GiB CUDA transfer is capped at the 64 MiB chunk size.
    assert_eq!(
        utils::optimal_chunk_size(1024 * 1024 * 1024, DeviceType::Cuda(0)),
        64 * 1024 * 1024
    );
    // Transfers smaller than the cap are left whole.
    assert_eq!(utils::optimal_chunk_size(1024, DeviceType::Cuda(0)), 1024);
}
#[test]
fn test_utils_estimate_efficiency() {
    // A well-aligned async transfer with real capabilities should produce
    // an efficiency in the half-open range (0, 1].
    let transfer = ZeroCopyTransfer::new(
        create_test_device(DeviceType::Cpu, 0),
        create_test_device(DeviceType::Cuda(1), 1),
        null_mut(),
        null_mut(),
        1024 * 1024,
    )
    .with_alignment(256)
    .with_mode(TransferMode::Asynchronous);
    let caps = ZeroCopyCapabilities {
        unified_memory: true,
        async_transfers: true,
        ..Default::default()
    };
    let efficiency = utils::estimate_efficiency(&transfer, &caps);
    assert!(efficiency > 0.0);
    assert!(efficiency <= 1.0);
}
}