use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
#[cfg(feature = "gpu")]
use crate::gpu::{GpuBackend, GpuBuffer, GpuContext, GpuDataType};
use ::ndarray::{Array, ArrayBase, Dimension, IxDyn, RawData};
use std::any::TypeId;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
/// Compute device on which a [`DeviceArray`] may reside.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DeviceType {
/// Host CPU; always available.
Cpu,
/// A GPU identified by its backend (see `GpuBackend`).
Gpu(GpuBackend),
/// Tensor processing unit; declared but not supported anywhere in this module.
Tpu,
}
impl DeviceType {
    /// Reports whether this device can actually be used at runtime.
    ///
    /// The CPU is always usable; a GPU delegates to its backend; TPU has no
    /// backing implementation and therefore always reports `false`.
    pub fn is_available(&self) -> bool {
        match self {
            DeviceType::Cpu => true,
            DeviceType::Gpu(backend) => backend.is_available(),
            DeviceType::Tpu => false,
        }
    }

    /// Human-readable device name, e.g. `"GPU (cuda)"`.
    pub fn name(&self) -> String {
        match self {
            DeviceType::Cpu => String::from("CPU"),
            DeviceType::Gpu(backend) => format!("GPU ({backend})"),
            DeviceType::Tpu => String::from("TPU"),
        }
    }
}
impl std::fmt::Display for DeviceType {
    /// Formats the device exactly like [`DeviceType::name`] would.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        if let DeviceType::Gpu(backend) = self {
            write!(f, "GPU ({backend})")
        } else if matches!(self, DeviceType::Cpu) {
            f.write_str("CPU")
        } else {
            f.write_str("TPU")
        }
    }
}
/// Direction of a memory transfer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferDirection {
/// Upload: host (CPU) memory to device memory.
HostToDevice,
/// Download: device memory to host memory.
DeviceToHost,
/// Copy between two devices.
DeviceToDevice,
}
/// Whether a transfer blocks the caller until completion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferMode {
/// Block until the copy has finished.
Synchronous,
/// Enqueue the copy and return immediately.
Asynchronous,
}
/// Memory layout expected for transferred data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryLayout {
/// C-order: last index varies fastest.
RowMajor,
/// Fortran-order: first index varies fastest.
ColumnMajor,
/// Arbitrary strides per axis.
Strided,
}
/// Options controlling how a host/device transfer is performed.
///
/// NOTE(review): these options are accepted by the transfer APIs below but do
/// not appear to be consulted by the current implementations — confirm before
/// relying on them.
#[derive(Debug, Clone)]
pub struct TransferOptions {
/// Synchronous vs. asynchronous execution.
pub mode: TransferMode,
/// Expected memory layout of the data.
pub layout: MemoryLayout,
/// Request page-locked (pinned) host memory for faster DMA.
pub use_pinned_memory: bool,
/// Allow splitting large transfers into streamed chunks.
pub enable_streaming: bool,
/// Specific device stream to enqueue on, if any.
pub stream_id: Option<usize>,
}
impl Default for TransferOptions {
fn default() -> Self {
Self {
mode: TransferMode::Synchronous,
layout: MemoryLayout::RowMajor,
use_pinned_memory: true,
enable_streaming: true,
stream_id: None,
}
}
}
/// Fluent builder for [`TransferOptions`].
#[derive(Debug, Clone)]
pub struct TransferOptionsBuilder {
// Options accumulated so far; starts from `TransferOptions::default()`.
options: TransferOptions,
}
impl TransferOptionsBuilder {
pub fn new() -> Self {
Self {
options: TransferOptions::default(),
}
}
pub const fn mode(mut self, mode: TransferMode) -> Self {
self.options.mode = mode;
self
}
pub const fn layout(mut self, layout: MemoryLayout) -> Self {
self.options.layout = layout;
self
}
pub const fn memory(mut self, use_pinnedmemory: bool) -> Self {
self.options.use_pinned_memory = use_pinnedmemory;
self
}
pub const fn streaming(mut self, enablestreaming: bool) -> Self {
self.options.enable_streaming = enablestreaming;
self
}
pub const fn with_stream_id(mut self, streamid: Option<usize>) -> Self {
self.options.stream_id = streamid;
self
}
pub fn build(self) -> TransferOptions {
self.options
}
}
impl Default for TransferOptionsBuilder {
fn default() -> Self {
Self::new()
}
}
/// Identity of a cached device upload: the host buffer's address, the target
/// device, the element type, and the element count. Two uploads with equal
/// keys are assumed to describe the same host data.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CacheKey {
// Host pointer of the source slice, used as a cheap identity.
// NOTE(review): pointer reuse after free/realloc can alias a stale entry.
data_id: usize,
device: DeviceType,
// Element type of the uploaded slice.
type_id: TypeId,
// Number of elements (not bytes).
size: usize,
}
impl Hash for CacheKey {
    /// Hashes every field that participates in `PartialEq`, keeping the hash
    /// aligned with equality and avoiding needless collisions.
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.data_id.hash(state);
        self.device.hash(state);
        // BUGFIX: this previously hashed the constant `TypeId::of::<i32>()`,
        // so keys differing only in element type always landed in the same
        // bucket. Hash the actual stored type id instead.
        self.type_id.hash(state);
        self.size.hash(state);
    }
}
/// Handle to an in-flight (asynchronous) transfer.
#[derive(Debug)]
pub struct TransferEvent {
// Device the transfer targets.
#[allow(dead_code)]
device: DeviceType,
// Opaque backend-specific handle for the underlying operation.
#[allow(dead_code)]
handle: Arc<Mutex<Box<dyn std::any::Any + Send + Sync>>>,
// Completion flag shared with whoever drives the transfer.
completed: Arc<std::sync::atomic::AtomicBool>,
}
impl TransferEvent {
/// Creates an event for `devicetype` wrapping an opaque backend handle.
/// The event starts in the not-completed state.
#[allow(dead_code)]
fn device(devicetype: DeviceType, handle: Box<dyn std::any::Any + Send + Sync>) -> Self {
Self {
device: devicetype,
handle: Arc::new(Mutex::new(handle)),
completed: Arc::new(std::sync::atomic::AtomicBool::new(false)),
}
}
/// NOTE(review): stub — this does not block on any device work; it simply
/// flips the completion flag so `is_complete` returns true afterwards.
/// Confirm against the real async backend before relying on ordering.
pub fn wait(&self) {
self.completed
.store(true, std::sync::atomic::Ordering::SeqCst);
}
/// Whether the transfer has been marked complete.
pub fn is_complete(&self) -> bool {
self.completed.load(std::sync::atomic::Ordering::SeqCst)
}
}
/// One cached device upload, stored type-erased in the manager's cache.
struct CacheEntry<T: GpuDataType> {
// The uploaded device buffer (cheap to clone: Arc-backed).
buffer: DeviceBuffer<T>,
// Element count of the original host slice.
size: usize,
// Used for LRU-style eviction ordering.
last_access: std::time::Instant,
// Reserved for write-back tracking; never set to true currently.
#[allow(dead_code)]
dirty: bool,
}
/// Manages host<->device transfers for a single (optional) GPU context,
/// with an LRU-ish byte-bounded upload cache.
pub struct DeviceMemoryManager {
// Present only when the preferred GPU backend is available.
gpu_context: Option<GpuContext>,
// Type-erased `CacheEntry<T>` values keyed by host pointer/type/size.
cache: Mutex<HashMap<CacheKey, Box<dyn std::any::Any + Send + Sync>>>,
// Byte budget for the cache before eviction kicks in.
max_cache_size: usize,
// Approximate bytes currently cached.
current_cache_size: std::sync::atomic::AtomicUsize,
// When false, every transfer uploads fresh.
enable_caching: bool,
}
impl std::fmt::Debug for DeviceMemoryManager {
    /// Manual `Debug`: the GPU context and cache contents are not `Debug`,
    /// so they are rendered as placeholders.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let cached_bytes = self
            .current_cache_size
            .load(std::sync::atomic::Ordering::Relaxed);
        f.debug_struct("DeviceMemoryManager")
            .field("gpu_context", &"<gpu_context>")
            .field("cache", &"<cache>")
            .field("max_cache_size", &self.max_cache_size)
            .field("current_cache_size", &cached_bytes)
            .field("enable_caching", &self.enable_caching)
            .finish()
    }
}
impl DeviceMemoryManager {
    /// Creates a manager with a cache byte budget of `max_cachesize`.
    ///
    /// A GPU context is created eagerly when the preferred backend reports
    /// itself available; otherwise only CPU arrays are supported.
    pub fn new(max_cachesize: usize) -> Result<Self, CoreError> {
        let gpu_context = match GpuBackend::preferred() {
            backend if backend.is_available() => GpuContext::new(backend).ok(),
            _ => None,
        };
        Ok(Self {
            gpu_context,
            cache: Mutex::new(HashMap::new()),
            max_cache_size: max_cachesize,
            current_cache_size: std::sync::atomic::AtomicUsize::new(0),
            enable_caching: true,
        })
    }

    /// Returns true when `device` can be used by this manager.
    pub fn is_device_available(&self, device: DeviceType) -> bool {
        match device {
            DeviceType::Cpu => true,
            DeviceType::Gpu(_) => self.gpu_context.is_some(),
            // TPU support is not implemented.
            DeviceType::Tpu => false,
        }
    }

    /// Lists the devices this manager can transfer to: the CPU, plus the GPU
    /// backend its context was created with (if any).
    pub fn available_devices(&self) -> Vec<DeviceType> {
        let mut devices = vec![DeviceType::Cpu];
        if let Some(ref context) = self.gpu_context {
            devices.push(DeviceType::Gpu(context.backend()));
        }
        devices
    }

    /// Copies `array` to `device`, returning a [`DeviceArray`] wrapper.
    ///
    /// CPU targets clone the host data. GPU targets require a contiguous
    /// array; the flattened data is uploaded, or served from the transfer
    /// cache when caching is enabled.
    ///
    /// # Errors
    /// `CoreError::DeviceError` when the device is unavailable, the requested
    /// GPU backend does not match the context, or the array is not contiguous.
    pub fn transfer_to_device<T, S, D>(
        &self,
        array: &ArrayBase<S, D>,
        device: DeviceType,
        options: Option<TransferOptions>,
    ) -> CoreResult<DeviceArray<T, D>>
    where
        T: GpuDataType,
        S: RawData<Elem = T> + crate::ndarray::Data,
        D: Dimension,
    {
        // TODO(review): transfer options (async mode, pinned memory, streams)
        // are accepted but not yet honored by this implementation.
        let _options = options.unwrap_or_default();
        if !self.is_device_available(device) {
            return Err(CoreError::DeviceError(
                ErrorContext::new(format!("Device {device} is not available"))
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }
        if device == DeviceType::Cpu {
            return Ok(DeviceArray::new_cpu(array.to_owned()));
        }
        if let DeviceType::Gpu(backend) = device {
            if let Some(ref context) = self.gpu_context {
                if context.backend() != backend {
                    return Err(CoreError::DeviceError(
                        ErrorContext::new(format!(
                            "GPU backend mismatch: requested {}, available {}",
                            backend,
                            context.backend()
                        ))
                        .with_location(ErrorLocation::new(file!(), line!())),
                    ));
                }
                let flat_data = array.as_slice().ok_or_else(|| {
                    CoreError::DeviceError(
                        ErrorContext::new("Array is not contiguous".to_string())
                            .with_location(ErrorLocation::new(file!(), line!())),
                    )
                })?;
                // Cache identity: host pointer + element type + length.
                let key = CacheKey {
                    data_id: flat_data.as_ptr() as usize,
                    device,
                    type_id: TypeId::of::<T>(),
                    size: flat_data.len(),
                };
                let buffer = if self.enable_caching {
                    // Scope the lock so it is released before eviction runs.
                    let (buffer, inserted_bytes) = {
                        let mut cache = self.cache.lock().expect("Operation failed");
                        if let Some(entry) = cache.get_mut(&key) {
                            let entry = entry.downcast_mut::<CacheEntry<T>>().ok_or_else(|| {
                                CoreError::DeviceError(
                                    ErrorContext::new("Cache entry type mismatch".to_string())
                                        .with_location(ErrorLocation::new(file!(), line!())),
                                )
                            })?;
                            entry.last_access = std::time::Instant::now();
                            (entry.buffer.clone(), 0)
                        } else {
                            let gpubuffer = context.create_buffer_from_slice(flat_data);
                            let buffer = DeviceBuffer::new_gpu(gpubuffer);
                            let entry = CacheEntry {
                                buffer: buffer.clone(),
                                size: flat_data.len(),
                                last_access: std::time::Instant::now(),
                                dirty: false,
                            };
                            cache.insert(key, Box::new(entry));
                            (buffer, std::mem::size_of_val(flat_data))
                        }
                    };
                    if inserted_bytes > 0 {
                        self.current_cache_size
                            .fetch_add(inserted_bytes, std::sync::atomic::Ordering::SeqCst);
                        // BUGFIX: eviction must run after the cache guard is
                        // dropped. The previous code called it while still
                        // holding the lock, which deadlocks because
                        // `evict_cache_entries_if_needed` locks the same
                        // (non-reentrant) `std::sync::Mutex`.
                        self.evict_cache_entries_if_needed();
                    }
                    buffer
                } else {
                    let gpubuffer = context.create_buffer_from_slice(flat_data);
                    DeviceBuffer::new_gpu(gpubuffer)
                };
                return Ok(DeviceArray {
                    buffer,
                    shape: array.raw_dim(),
                    // BUGFIX: record the device that was actually requested and
                    // validated, instead of re-deriving the preferred backend.
                    device,
                    phantom: PhantomData,
                });
            }
        }
        Err(CoreError::DeviceError(
            ErrorContext::new(format!("Unsupported device type for transfer: {device}"))
                .with_location(ErrorLocation::new(file!(), line!())),
        ))
    }

    /// Copies a device array back into a host ndarray of its original shape.
    ///
    /// # Errors
    /// `CoreError::ShapeError` when the stored shape no longer matches the
    /// buffer, `CoreError::DeviceError` for unsupported device types.
    pub fn transfer_to_host<T, D>(
        &self,
        devicearray: &DeviceArray<T, D>,
        options: Option<TransferOptions>,
    ) -> CoreResult<Array<T, D>>
    where
        T: GpuDataType,
        D: Dimension,
    {
        let _options = options.unwrap_or_default();
        if devicearray.device == DeviceType::Cpu {
            if let Some(cpuarray) = devicearray.buffer.get_cpuarray() {
                let reshaped = cpuarray
                    .clone()
                    .to_shape(devicearray.shape.clone())
                    .map_err(|e| CoreError::ShapeError(ErrorContext::new(e.to_string())))?
                    .to_owned();
                return Ok(reshaped);
            }
        }
        if let DeviceType::Gpu(_) = devicearray.device {
            if let Some(gpubuffer) = devicearray.buffer.get_gpubuffer() {
                let size = devicearray.size();
                // NOTE(review): zero-initializing `T` is assumed valid for all
                // `GpuDataType` element types (plain numeric data) — confirm.
                let mut data = vec![unsafe { std::mem::zeroed() }; size];
                // NOTE(review): a failed device->host copy is silently ignored
                // here and yields zeroed data; consider propagating the error.
                let _ = gpubuffer.copy_to_host(&mut data);
                return Array::from_shape_vec(devicearray.shape.clone(), data).map_err(|e| {
                    CoreError::DeviceError(
                        ErrorContext::new(format!("{e}"))
                            .with_location(ErrorLocation::new(file!(), line!())),
                    )
                });
            }
        }
        Err(CoreError::DeviceError(
            ErrorContext::new(format!(
                "Unsupported device type for transfer to host: {}",
                devicearray.device
            ))
            .with_location(ErrorLocation::new(file!(), line!())),
        ))
    }

    /// Moves `devicearray` to `target_device`.
    ///
    /// Same-device transfers clone the wrapper; everything else is routed
    /// through host memory (no peer-to-peer copies are implemented).
    pub fn transfer_between_devices<T, D>(
        &self,
        devicearray: &DeviceArray<T, D>,
        target_device: DeviceType,
        options: Option<TransferOptions>,
    ) -> CoreResult<DeviceArray<T, D>>
    where
        T: GpuDataType,
        D: Dimension,
    {
        let options = options.unwrap_or_default();
        if devicearray.device == target_device {
            return Ok(devicearray.clone());
        }
        if target_device == DeviceType::Cpu {
            let hostarray = self.transfer_to_host(devicearray, Some(options))?;
            return Ok(DeviceArray::new_cpu(hostarray));
        }
        if devicearray.device == DeviceType::Cpu {
            if let Some(cpuarray) = devicearray.buffer.get_cpuarray() {
                let cpu_clone = cpuarray.clone();
                let reshaped = cpu_clone
                    .to_shape(devicearray.shape.clone())
                    .map_err(|e| CoreError::ShapeError(ErrorContext::new(e.to_string())))?;
                return self.transfer_to_device(&reshaped.to_owned(), target_device, Some(options));
            }
        }
        // Device -> device: stage through the host.
        let hostarray = self.transfer_to_host(devicearray, Some(options.clone()))?;
        self.transfer_to_device(&hostarray, target_device, Some(options))
    }

    /// Evicts least-recently-used cache entries until roughly half the budget
    /// is free. No-op while the cache is within `max_cache_size`.
    ///
    /// Must be called WITHOUT the cache lock held (it locks internally).
    fn evict_cache_entries_if_needed(&self) {
        let current_size = self
            .current_cache_size
            .load(std::sync::atomic::Ordering::SeqCst);
        if current_size <= self.max_cache_size {
            return;
        }
        let mut cache = self.cache.lock().expect("Operation failed");
        // Entries are type-erased, so recover `last_access` by probing the
        // element types this module caches.
        // NOTE(review): entries of any other element type fall back to `now()`
        // (evicted last) and contribute 0 bytes to the accounting below.
        let mut key_times: Vec<_> = cache
            .iter()
            .map(|(key, value)| {
                let access_time = match value.downcast_ref::<CacheEntry<f32>>() {
                    Some(entry) => entry.last_access,
                    None => match value.downcast_ref::<CacheEntry<f64>>() {
                        Some(entry) => entry.last_access,
                        None => match value.downcast_ref::<CacheEntry<i32>>() {
                            Some(entry) => entry.last_access,
                            None => match value.downcast_ref::<CacheEntry<u32>>() {
                                Some(entry) => entry.last_access,
                                None => std::time::Instant::now(),
                            },
                        },
                    },
                };
                (key.clone(), access_time)
            })
            .collect();
        // Oldest first.
        key_times.sort_by_key(|a| a.1);
        let mut removed_size = 0;
        // Free enough to land at half the budget.
        let target_size = current_size - self.max_cache_size / 2;
        for key_ in key_times {
            let entry = cache.remove(&key_.0).expect("Operation failed");
            let entry_size = match entry.downcast_ref::<CacheEntry<f32>>() {
                Some(entry) => entry.size * std::mem::size_of::<f32>(),
                None => match entry.downcast_ref::<CacheEntry<f64>>() {
                    Some(entry) => entry.size * std::mem::size_of::<f64>(),
                    None => match entry.downcast_ref::<CacheEntry<i32>>() {
                        Some(entry) => entry.size * std::mem::size_of::<i32>(),
                        None => match entry.downcast_ref::<CacheEntry<u32>>() {
                            Some(entry) => entry.size * std::mem::size_of::<u32>(),
                            None => 0,
                        },
                    },
                },
            };
            removed_size += entry_size;
            if removed_size >= target_size {
                break;
            }
        }
        self.current_cache_size
            .fetch_sub(removed_size, std::sync::atomic::Ordering::SeqCst);
    }

    /// Drops every cached upload and resets the byte counter.
    pub fn clear_cache(&self) {
        let mut cache = self.cache.lock().expect("Operation failed");
        cache.clear();
        self.current_cache_size
            .store(0, std::sync::atomic::Ordering::SeqCst);
    }

    /// Runs the named GPU kernel against `devicearray` (bound as "input"),
    /// binding `params` by name and dispatching one work-group per 256
    /// elements.
    ///
    /// # Errors
    /// `CoreError::ComputationError` when the kernel cannot be resolved;
    /// `CoreError::DeviceError` for non-GPU arrays.
    pub fn execute_kernel<T, D>(
        &self,
        devicearray: &DeviceArray<T, D>,
        kernel_name: &str,
        params: HashMap<String, KernelParam>,
    ) -> CoreResult<()>
    where
        T: GpuDataType,
        D: Dimension,
    {
        if let DeviceType::Gpu(_) = devicearray.device {
            if let Some(ref context) = self.gpu_context {
                let kernel = context
                    .get_kernel(kernel_name)
                    .map_err(|e| CoreError::ComputationError(ErrorContext::new(e.to_string())))?;
                if let Some(gpubuffer) = devicearray.buffer.get_gpubuffer() {
                    kernel.set_buffer("input", gpubuffer);
                }
                for (name, param) in params {
                    match param {
                        KernelParam::Buffer(buffer) => {
                            if let Some(gpubuffer) = buffer.get_gpubuffer() {
                                kernel.set_buffer(&name, gpubuffer);
                            }
                        }
                        KernelParam::U32(value) => kernel.set_u32(&name, value),
                        KernelParam::I32(value) => kernel.set_i32(&name, value),
                        KernelParam::F32(value) => kernel.set_f32(&name, value),
                        KernelParam::F64(value) => kernel.set_f64(&name, value),
                    }
                }
                let total_elements = devicearray.size();
                let work_group_size = 256;
                let num_groups = total_elements.div_ceil(work_group_size);
                // NOTE(review): `as u32` truncates for astronomically large
                // arrays; acceptable for realistic sizes.
                kernel.dispatch([num_groups as u32, 1, 1]);
                return Ok(());
            }
        }
        Err(CoreError::DeviceError(
            ErrorContext::new(format!(
                "Unsupported device type for kernel execution: {}",
                devicearray.device
            ))
            .with_location(ErrorLocation::new(file!(), line!())),
        ))
    }
}
/// Parameter value passed to a GPU kernel launch.
///
/// NOTE(review): buffer parameters are fixed to `f32` elements here; other
/// dtypes cannot be passed as kernel buffers through this enum.
#[derive(Debug, Clone)]
pub enum KernelParam {
Buffer(DeviceBuffer<f32>), U32(u32),
I32(i32),
F32(f32),
F64(f64),
}
/// Where a `DeviceBuffer`'s data actually lives. Arc-backed so cloning the
/// buffer shares the underlying storage.
#[derive(Clone)]
enum BufferLocation<T: GpuDataType> {
/// Host-resident ndarray (dynamic dimensionality).
Cpu(Arc<Array<T, IxDyn>>),
/// Device-resident GPU buffer.
Gpu(Arc<GpuBuffer<T>>),
}
impl<T> std::fmt::Debug for BufferLocation<T>
where
    T: GpuDataType + std::fmt::Debug,
{
    /// Prints only the variant name; contents may be large or device-resident.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            BufferLocation::Cpu(_) => "Cpu(Array)",
            BufferLocation::Gpu(_) => "Gpu(GpuBuffer)",
        };
        f.write_str(label)
    }
}
/// Reference-counted handle to data living either on the host or on a GPU.
#[derive(Debug, Clone)]
pub struct DeviceBuffer<T: GpuDataType> {
// Shared storage; clones alias the same underlying data.
location: BufferLocation<T>,
}
impl<T: GpuDataType> DeviceBuffer<T> {
    /// Wraps a host ndarray, erasing its dimensionality to `IxDyn`.
    fn new_cpu<D: Dimension>(array: Array<T, D>) -> Self {
        let location = BufferLocation::Cpu(Arc::new(array.into_dyn()));
        Self { location }
    }

    /// Wraps a device-resident GPU buffer.
    fn new_gpu(buffer: GpuBuffer<T>) -> Self {
        let location = BufferLocation::Gpu(Arc::new(buffer));
        Self { location }
    }

    /// Borrows the host array, if the data lives on the CPU.
    fn get_cpuarray(&self) -> Option<&Array<T, IxDyn>> {
        if let BufferLocation::Cpu(array) = &self.location {
            Some(array)
        } else {
            None
        }
    }

    /// Borrows the GPU buffer, if the data lives on a device.
    fn get_gpubuffer(&self) -> Option<&GpuBuffer<T>> {
        if let BufferLocation::Gpu(buffer) = &self.location {
            Some(buffer)
        } else {
            None
        }
    }

    /// Number of elements stored, regardless of location.
    fn size(&self) -> usize {
        match &self.location {
            BufferLocation::Cpu(array) => array.len(),
            BufferLocation::Gpu(buffer) => buffer.len(),
        }
    }
}
/// An n-dimensional array pinned to a particular device, remembering its
/// original static dimensionality `D` even though storage is dynamic.
#[derive(Debug, Clone)]
pub struct DeviceArray<T: GpuDataType, D: Dimension> {
// Shared host/device storage.
buffer: DeviceBuffer<T>,
// Original shape, used to restore `D` on transfer back to host.
shape: D,
// Where `buffer` currently lives.
device: DeviceType,
phantom: PhantomData<T>,
}
impl<T: GpuDataType, D: Dimension> DeviceArray<T, D> {
    /// Wraps a host ndarray as a CPU-resident device array.
    fn new_cpu<S: RawData<Elem = T> + crate::ndarray::Data>(array: ArrayBase<S, D>) -> Self {
        let shape = array.raw_dim();
        let buffer = DeviceBuffer::new_cpu(array.to_owned());
        Self {
            buffer,
            shape,
            device: DeviceType::Cpu,
            phantom: PhantomData,
        }
    }

    /// Device on which the data currently resides.
    pub fn device(&self) -> DeviceType {
        self.device
    }

    /// Borrow of the original n-dimensional shape.
    pub const fn shape(&self) -> &D {
        &self.shape
    }

    /// Total number of elements.
    pub fn size(&self) -> usize {
        self.buffer.size()
    }

    /// Number of axes in the stored shape.
    pub fn ndim(&self) -> usize {
        self.shape.ndim()
    }

    /// True when the data lives in host memory.
    pub fn is_on_cpu(&self) -> bool {
        matches!(self.device, DeviceType::Cpu)
    }

    /// True when the data lives on any GPU backend.
    pub fn is_on_gpu(&self) -> bool {
        matches!(self.device, DeviceType::Gpu(_))
    }

    /// Host view of the data, if CPU-resident.
    pub fn as_cpuarray(&self) -> Option<&Array<T, IxDyn>> {
        self.buffer.get_cpuarray()
    }

    /// GPU buffer view of the data, if device-resident.
    pub fn as_gpubuffer(&self) -> Option<&GpuBuffer<T>> {
        self.buffer.get_gpubuffer()
    }
}
/// Handle to a device command stream.
pub struct DeviceStream {
// Device the stream belongs to.
#[allow(dead_code)]
device: DeviceType,
// Opaque backend handle; currently always a placeholder `()`.
#[allow(dead_code)]
handle: Arc<Mutex<Box<dyn std::any::Any + Send + Sync>>>,
}
impl DeviceStream {
/// Creates a stream for `device`.
/// NOTE(review): no backend stream is actually allocated; the handle is a
/// placeholder `()`.
pub fn new(device: DeviceType) -> CoreResult<Self> {
Ok(Self {
device,
handle: Arc::new(Mutex::new(Box::new(()))),
})
}
/// NOTE(review): stub — performs no synchronization currently.
pub fn synchronize(&self) {
}
}
/// Size-bucketed free list of reusable device buffers for one device.
pub struct DeviceMemoryPool {
// Device this pool serves.
device: DeviceType,
// Free buffers keyed by element count; values are type-erased
// `DeviceBuffer<T>` boxes (possibly of mixed element types per bucket).
freebuffers: Mutex<HashMap<usize, Vec<Box<dyn std::any::Any + Send + Sync>>>>,
// Byte cap; `free` drops buffers instead of pooling beyond this.
max_poolsize: usize,
// Approximate bytes currently held in the pool.
current_poolsize: std::sync::atomic::AtomicUsize,
}
impl std::fmt::Debug for DeviceMemoryPool {
    /// Manual `Debug`: the free lists hold type-erased buffers and are
    /// rendered as a placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let pooled_bytes = self
            .current_poolsize
            .load(std::sync::atomic::Ordering::Relaxed);
        f.debug_struct("DeviceMemoryPool")
            .field("device", &self.device)
            .field("freebuffers", &"<freebuffers>")
            .field("max_poolsize", &self.max_poolsize)
            .field("current_poolsize", &pooled_bytes)
            .finish()
    }
}
impl DeviceMemoryPool {
    /// Creates an empty pool for `device`, capped at `max_poolsize` bytes.
    pub fn new(device: DeviceType, max_poolsize: usize) -> Self {
        Self {
            device,
            freebuffers: Mutex::new(HashMap::new()),
            max_poolsize,
            current_poolsize: std::sync::atomic::AtomicUsize::new(0),
        }
    }

    /// Hands out a buffer of `size` elements, reusing a pooled buffer of the
    /// same element type and size when available, otherwise allocating fresh.
    ///
    /// # Errors
    /// Fresh GPU allocation is not implemented and TPU is unsupported.
    pub fn allocate<T: GpuDataType + num_traits::Zero>(
        &self,
        size: usize,
    ) -> CoreResult<DeviceBuffer<T>> {
        {
            let mut freebuffers = self.freebuffers.lock().expect("Operation failed");
            if let Some(buffers) = freebuffers.get_mut(&size) {
                if let Some(candidate) = buffers.pop() {
                    match candidate.downcast::<DeviceBuffer<T>>() {
                        Ok(buffer) => {
                            // BUGFIX: the byte counter was never decremented
                            // when a buffer left the pool, so it only grew and
                            // eventually made `free` reject every buffer.
                            self.current_poolsize.fetch_sub(
                                size * std::mem::size_of::<T>(),
                                std::sync::atomic::Ordering::SeqCst,
                            );
                            return Ok(*buffer);
                        }
                        // BUGFIX: a pooled buffer of a different element type
                        // was previously dropped (while its bytes stayed
                        // counted); put it back for a matching caller instead.
                        Err(other) => buffers.push(other),
                    }
                }
            }
            // Lock released here before the (potentially slow) fresh allocation.
        }
        match self.device {
            DeviceType::Cpu => {
                let array = Array::<T, crate::ndarray::IxDyn>::zeros(IxDyn(&[size]));
                Ok(DeviceBuffer::new_cpu(array))
            }
            DeviceType::Gpu(_) => Err(CoreError::ImplementationError(
                ErrorContext::new("GPU memory allocation not implemented".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            )),
            DeviceType::Tpu => Err(CoreError::DeviceError(
                ErrorContext::new("TPU not supported".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            )),
        }
    }

    /// Returns `buffer` to the pool, unless pooling it would exceed the byte
    /// cap — in that case the buffer is simply dropped.
    pub fn free<T: GpuDataType>(&self, buffer: DeviceBuffer<T>) {
        let size = buffer.size();
        let buffersize = size * std::mem::size_of::<T>();
        let current_size = self
            .current_poolsize
            .load(std::sync::atomic::Ordering::SeqCst);
        if current_size + buffersize > self.max_poolsize {
            return;
        }
        let mut freebuffers = self.freebuffers.lock().expect("Operation failed");
        freebuffers.entry(size).or_default().push(Box::new(buffer));
        self.current_poolsize
            .fetch_add(buffersize, std::sync::atomic::Ordering::SeqCst);
    }

    /// Drops every pooled buffer and resets the byte counter.
    pub fn clear(&self) {
        let mut freebuffers = self.freebuffers.lock().expect("Operation failed");
        freebuffers.clear();
        self.current_poolsize
            .store(0, std::sync::atomic::Ordering::SeqCst);
    }
}
impl<T: GpuDataType, D: Dimension> DeviceArray<T, D> {
    /// Applies `f` element-wise and returns the result on the same device.
    ///
    /// CPU-resident arrays are mapped directly on the host; otherwise the
    /// data round-trips through host memory via `manager`.
    pub fn map<F>(&self, f: F, manager: &DeviceMemoryManager) -> CoreResult<DeviceArray<T, D>>
    where
        F: Fn(T) -> T + Send + Sync,
        D: Clone,
    {
        if self.is_on_cpu() {
            if let Some(cpuarray) = self.as_cpuarray() {
                let mapped = cpuarray.map(|&x| f(x));
                return Ok(DeviceArray {
                    buffer: DeviceBuffer::new_cpu(mapped),
                    shape: self.shape.clone(),
                    device: DeviceType::Cpu,
                    phantom: PhantomData,
                });
            }
        }
        let hostarray = manager.transfer_to_host(self, None)?;
        let mapped = hostarray.map(|&x| f(x));
        manager.transfer_to_device(&mapped, self.device, None)
    }

    /// Folds all elements with `f`, seeded by the first element in logical
    /// order.
    ///
    /// # Errors
    /// `CoreError::ValueError` when the array is empty.
    pub fn reduce<F>(&self, f: F, manager: &DeviceMemoryManager) -> CoreResult<T>
    where
        F: Fn(T, T) -> T + Send + Sync,
        T: Copy,
    {
        if self.is_on_cpu() {
            if let Some(cpuarray) = self.as_cpuarray() {
                // BUGFIX: this previously used `cpuarray[0]` — a `usize` index
                // on an `IxDyn` array, which panics for any array that is not
                // 1-dimensional. Iterate in logical order instead, which is
                // rank-agnostic (and matches the host fallback path below).
                let mut iter = cpuarray.iter();
                let first = *iter.next().ok_or_else(|| {
                    CoreError::ValueError(
                        ErrorContext::new("Cannot reduce empty array".to_string())
                            .with_location(ErrorLocation::new(file!(), line!())),
                    )
                })?;
                return Ok(iter.fold(first, |acc, &x| f(acc, x)));
            }
        }
        let hostarray = manager.transfer_to_host(self, None)?;
        let mut iter = hostarray.iter();
        let first = *iter.next().ok_or_else(|| {
            CoreError::ValueError(
                ErrorContext::new("Cannot reduce empty array".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            )
        })?;
        Ok(iter.fold(first, |acc, &x| f(acc, x)))
    }
}
/// Facade coordinating per-device memory managers and buffer pools.
#[derive(Debug)]
pub struct CrossDeviceManager {
// One transfer manager per registered device.
memory_managers: HashMap<DeviceType, DeviceMemoryManager>,
// One buffer pool per registered device.
memory_pools: HashMap<DeviceType, DeviceMemoryPool>,
// Events for transfers still in flight; drained by `synchronize`.
active_transfers: Mutex<Vec<TransferEvent>>,
#[allow(dead_code)]
enable_caching: bool,
#[allow(dead_code)]
max_cache_size: usize,
}
impl CrossDeviceManager {
pub fn new(max_cachesize: usize) -> CoreResult<Self> {
let mut memory_managers = HashMap::new();
let mut memory_pools = HashMap::new();
let cpu_manager = DeviceMemoryManager::new(max_cachesize)?;
memory_managers.insert(DeviceType::Cpu, cpu_manager);
memory_pools.insert(
DeviceType::Cpu,
DeviceMemoryPool::new(DeviceType::Cpu, max_cachesize),
);
let gpu_backend = GpuBackend::preferred();
if gpu_backend.is_available() {
let gpu_device = DeviceType::Gpu(gpu_backend);
let gpu_manager = DeviceMemoryManager::new(max_cachesize)?;
memory_managers.insert(gpu_device, gpu_manager);
memory_pools.insert(gpu_device, DeviceMemoryPool::new(gpu_device, max_cachesize));
}
Ok(Self {
memory_managers,
memory_pools,
active_transfers: Mutex::new(Vec::new()),
enable_caching: true,
max_cache_size: max_cachesize,
})
}
pub fn available_devices(&self) -> Vec<DeviceType> {
self.memory_managers.keys().cloned().collect()
}
pub fn is_device_available(&self, device: DeviceType) -> bool {
self.memory_managers.contains_key(&device)
}
pub fn to_device<T, S, D>(
&self,
array: &ArrayBase<S, D>,
device: DeviceType,
options: Option<TransferOptions>,
) -> CoreResult<DeviceArray<T, D>>
where
T: GpuDataType,
S: RawData<Elem = T> + crate::ndarray::Data,
D: Dimension,
{
if !self.is_device_available(device) {
return Err(CoreError::DeviceError(
ErrorContext::new(format!("Device {device} is not available"))
.with_location(ErrorLocation::new(file!(), line!())),
));
}
let manager = self.memory_managers.get(&device).expect("Operation failed");
manager.transfer_to_device(array, device, options)
}
pub fn to_host<T, D>(
&self,
devicearray: &DeviceArray<T, D>,
options: Option<TransferOptions>,
) -> CoreResult<Array<T, D>>
where
T: GpuDataType,
D: Dimension,
{
let manager = self
.memory_managers
.get(&devicearray.device)
.ok_or_else(|| {
CoreError::DeviceError(
ErrorContext::new(format!("Device {} is not available", devicearray.device))
.with_location(ErrorLocation::new(file!(), line!())),
)
})?;
manager.transfer_to_host(devicearray, options)
}
pub fn transfer<T, D>(
&self,
devicearray: &DeviceArray<T, D>,
target_device: DeviceType,
options: Option<TransferOptions>,
) -> CoreResult<DeviceArray<T, D>>
where
T: GpuDataType,
D: Dimension,
{
if !self.is_device_available(target_device) {
return Err(CoreError::DeviceError(
ErrorContext::new(format!("Device {target_device} is not available"))
.with_location(ErrorLocation::new(file!(), line!())),
));
}
let manager = self
.memory_managers
.get(&devicearray.device)
.ok_or_else(|| {
CoreError::DeviceError(
ErrorContext::new(format!("Device {} is not available", devicearray.device))
.with_location(ErrorLocation::new(file!(), line!())),
)
})?;
manager.transfer_between_devices(devicearray, target_device, options)
}
pub fn execute_kernel<T, D>(
&self,
devicearray: &DeviceArray<T, D>,
kernel_name: &str,
params: HashMap<String, KernelParam>,
) -> CoreResult<()>
where
T: GpuDataType,
D: Dimension,
{
let manager = self
.memory_managers
.get(&devicearray.device)
.ok_or_else(|| {
CoreError::DeviceError(
ErrorContext::new(format!("Device {} is not available", devicearray.device))
.with_location(ErrorLocation::new(file!(), line!())),
)
})?;
manager.execute_kernel(devicearray, kernel_name, params)
}
pub fn allocate<T: GpuDataType + num_traits::Zero>(
&self,
size: usize,
device: DeviceType,
) -> CoreResult<DeviceBuffer<T>> {
if !self.is_device_available(device) {
return Err(CoreError::DeviceError(
ErrorContext::new(format!("Device {device} is not available"))
.with_location(ErrorLocation::new(file!(), line!())),
));
}
let pool = self.memory_pools.get(&device).expect("Operation failed");
pool.allocate(size)
}
pub fn free<T: GpuDataType>(&self, buffer: DeviceBuffer<T>, device: DeviceType) {
if !self.is_device_available(device) {
return;
}
let pool = self.memory_pools.get(&device).expect("Operation failed");
pool.free(buffer);
}
pub fn clear(&self) {
for manager in self.memory_managers.values() {
manager.clear_cache();
}
for pool in self.memory_pools.values() {
pool.clear();
}
let mut active_transfers = self.active_transfers.lock().expect("Operation failed");
active_transfers.clear();
}
pub fn synchronize(&self) {
let mut active_transfers = self.active_transfers.lock().expect("Operation failed");
for event in active_transfers.drain(..) {
event.wait();
}
}
}
/// Convenience constructor for a [`CrossDeviceManager`] with a 1 GiB
/// per-device cache budget.
#[allow(dead_code)]
pub fn create_cross_device_manager() -> CoreResult<CrossDeviceManager> {
    const DEFAULT_CACHE_BYTES: usize = 1024 * 1024 * 1024;
    CrossDeviceManager::new(DEFAULT_CACHE_BYTES)
}
/// Conversion of a host array onto a chosen device via a
/// [`CrossDeviceManager`].
pub trait ToDevice<T, D>
where
T: GpuDataType,
D: Dimension,
{
/// Transfers `self` to `device` using `manager`.
fn to_device(
&self,
device: DeviceType,
manager: &CrossDeviceManager,
) -> CoreResult<DeviceArray<T, D>>;
}
/// Blanket implementation for every ndarray with owned-or-viewed data.
impl<T, S, D> ToDevice<T, D> for ArrayBase<S, D>
where
T: GpuDataType,
S: RawData<Elem = T> + crate::ndarray::Data,
D: Dimension,
{
/// Delegates to [`CrossDeviceManager::to_device`] with default options.
fn to_device(
&self,
device: DeviceType,
manager: &CrossDeviceManager,
) -> CoreResult<DeviceArray<T, D>> {
manager.to_device(self, device, None)
}
}
/// Conversion of device-resident data back into a host ndarray.
pub trait ToHost<T, D>
where
T: GpuDataType,
D: Dimension,
{
/// Copies `self` back to host memory using `manager`.
fn to_host(&self, manager: &CrossDeviceManager) -> CoreResult<Array<T, D>>;
}
impl<T, D> ToHost<T, D> for DeviceArray<T, D>
where
T: GpuDataType,
D: Dimension,
{
/// Delegates to [`CrossDeviceManager::to_host`] with default options.
fn to_host(&self, manager: &CrossDeviceManager) -> CoreResult<Array<T, D>> {
manager.to_host(self, None)
}
}
/// Wraps `array` as a CPU-resident [`DeviceArray`] (clones the host data).
#[allow(dead_code)]
pub fn create_cpuarray<T, S, D>(array: &ArrayBase<S, D>) -> DeviceArray<T, D>
where
T: GpuDataType,
S: RawData<Elem = T> + crate::ndarray::Data,
D: Dimension,
{
DeviceArray::new_cpu(array.to_owned())
}
/// Uploads `array` to the first GPU device the manager knows about.
///
/// # Errors
/// `CoreError::DeviceError` when no GPU device is registered, plus any error
/// from the transfer itself.
#[allow(dead_code)]
pub fn create_gpuarray<T, S, D>(
    array: &ArrayBase<S, D>,
    manager: &CrossDeviceManager,
) -> CoreResult<DeviceArray<T, D>>
where
    T: GpuDataType,
    S: RawData<Elem = T> + crate::ndarray::Data,
    D: Dimension,
{
    let gpu = manager
        .available_devices()
        .into_iter()
        .find(|d| matches!(d, DeviceType::Gpu(_)));
    match gpu {
        Some(device) => manager.to_device(array, device, None),
        None => Err(CoreError::DeviceError(
            ErrorContext::new("No GPU device available".to_string())
                .with_location(ErrorLocation::new(file!(), line!())),
        )),
    }
}
/// Places `array` on the "best" available device: the first GPU the manager
/// knows about, or the CPU when no GPU is registered.
#[allow(dead_code)]
pub fn to_best_device<T, S, D>(
    array: &ArrayBase<S, D>,
    manager: &CrossDeviceManager,
) -> CoreResult<DeviceArray<T, D>>
where
    T: GpuDataType,
    S: RawData<Elem = T> + crate::ndarray::Data,
    D: Dimension,
{
    let gpu = manager
        .available_devices()
        .into_iter()
        .find(|d| matches!(d, DeviceType::Gpu(_)));
    match gpu {
        Some(device) => manager.to_device(array, device, None),
        None => Ok(create_cpuarray(array)),
    }
}