use crate::device::async_executor::{AsyncExecutor, AsyncExecutorConfig};
use crate::device::kernel_fusion::{FusionConfig, KernelFusionEngine};
use crate::device::memory_pool::{GpuMemoryPool, PoolConfig};
use crate::device::{Backend, DeviceInfo, DeviceMemory, DeviceType, Kernel};
use crate::error::{NnlError, Result};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use vulkano::{
VulkanLibrary,
buffer::{Buffer, BufferCreateInfo, BufferUsage, Subbuffer},
command_buffer::{
AutoCommandBufferBuilder, CommandBufferUsage, CopyBufferInfo,
allocator::StandardCommandBufferAllocator,
},
descriptor_set::{
PersistentDescriptorSet, WriteDescriptorSet, allocator::StandardDescriptorSetAllocator,
},
device::{
Device as VkDevice, DeviceCreateInfo, DeviceExtensions, Features, Queue, QueueCreateInfo,
QueueFlags, physical::PhysicalDeviceType,
},
instance::{Instance, InstanceCreateInfo},
memory::allocator::{AllocationCreateInfo, MemoryTypeFilter, StandardMemoryAllocator},
pipeline::{
ComputePipeline, Pipeline, PipelineBindPoint, PipelineLayout,
PipelineShaderStageCreateInfo, compute::ComputePipelineCreateInfo,
layout::PipelineDescriptorSetLayoutCreateInfo,
},
shader::{ShaderModule, ShaderModuleCreateInfo},
sync::{self, GpuFuture},
};
use std::sync::LazyLock;
/// Process-wide cache of the Vulkan backend so that repeated
/// `VulkanBackend::new()` calls share a single device, queue, allocators
/// and pools instead of re-initializing Vulkan each time.
static GLOBAL_VULKAN_BACKEND: LazyLock<Mutex<Option<Arc<VulkanBackendImpl>>>> =
    LazyLock::new(|| Mutex::new(None));
/// Public handle to the Vulkan compute backend.
///
/// All instances constructed via `VulkanBackend::new` share one
/// `VulkanBackendImpl` through the global cache, so constructing a handle
/// after the first is cheap.
pub struct VulkanBackend {
    // Shared backend state (device, queue, allocators, caches, pools).
    inner: Arc<VulkanBackendImpl>,
}
/// Shared state behind every `VulkanBackend` handle.
struct VulkanBackendImpl {
    // Logical Vulkan device.
    device: Arc<VkDevice>,
    // The single compute queue created at startup.
    queue: Arc<Queue>,
    // Allocators for buffers, command buffers and descriptor sets.
    memory_allocator: Arc<StandardMemoryAllocator>,
    command_buffer_allocator: Arc<StandardCommandBufferAllocator>,
    descriptor_set_allocator: Arc<StandardDescriptorSetAllocator>,
    // Compute pipelines cached by shader name (see `get_pipeline`).
    pipelines: Arc<Mutex<HashMap<String, Arc<ComputePipeline>>>>,
    // Device description captured at startup.
    device_info: DeviceInfo,
    // Pooled GPU buffer allocator used by `Backend::allocate`.
    memory_pool: Arc<GpuMemoryPool>,
    // Kernel-fusion engine used by the `execute_kernel` fused path.
    fusion_engine: Arc<KernelFusionEngine>,
    // Async executor drained by `Backend::synchronize`.
    async_executor: Arc<AsyncExecutor>,
}
impl VulkanBackend {
pub fn new() -> Result<Self> {
let mut global = GLOBAL_VULKAN_BACKEND.lock().unwrap();
if let Some(ref inner) = *global {
return Ok(Self {
inner: inner.clone(),
});
}
let inner = Self::create_backend_impl()?;
*global = Some(inner.clone());
Ok(Self { inner })
}
/// Builds the process-wide backend state: Vulkan instance and logical
/// device, allocators, pooled GPU memory, the kernel-fusion engine and
/// the async executor.
///
/// Device selection ranks physical devices discrete > integrated >
/// virtual > other. A single compute-capable queue is created, so the
/// async executor is configured with one compute stream.
///
/// # Errors
/// Returns `NnlError::gpu` if the Vulkan library/instance/device cannot
/// be created, no suitable device exists, or no compute queue family is
/// available.
fn create_backend_impl() -> Result<Arc<VulkanBackendImpl>> {
    let library = VulkanLibrary::new()
        .map_err(|e| NnlError::gpu(format!("Failed to load Vulkan library: {}", e)))?;
    let instance = Instance::new(
        library,
        InstanceCreateInfo {
            application_name: Some("NNL Neural Network Library".into()),
            application_version: vulkano::Version::V1_0,
            ..Default::default()
        },
    )
    .map_err(|e| NnlError::gpu(format!("Failed to create Vulkan instance: {}", e)))?;
    // Prefer a discrete GPU when several devices are present.
    let physical_device = instance
        .enumerate_physical_devices()
        .map_err(|e| NnlError::gpu(format!("Failed to enumerate devices: {}", e)))?
        .max_by_key(|device| match device.properties().device_type {
            PhysicalDeviceType::DiscreteGpu => 3,
            PhysicalDeviceType::IntegratedGpu => 2,
            PhysicalDeviceType::VirtualGpu => 1,
            _ => 0,
        })
        .ok_or_else(|| NnlError::gpu("No suitable Vulkan device found"))?;
    // First queue family that advertises COMPUTE support.
    let queue_family_index = physical_device
        .queue_family_properties()
        .iter()
        .position(|q| q.queue_flags.intersects(QueueFlags::COMPUTE))
        .map(|i| i as u32)
        .ok_or_else(|| NnlError::gpu("No compute queue family found"))?;
    let (device, mut queues) = VkDevice::new(
        physical_device.clone(),
        DeviceCreateInfo {
            queue_create_infos: vec![QueueCreateInfo {
                queue_family_index,
                ..Default::default()
            }],
            enabled_extensions: DeviceExtensions::empty(),
            enabled_features: Features::empty(),
            ..Default::default()
        },
    )
    .map_err(|e| NnlError::gpu(format!("Failed to create device: {}", e)))?;
    // Exactly one queue was requested above, so the iterator is non-empty.
    let queue = queues.next().expect("device created with one queue");
    let memory_allocator = Arc::new(StandardMemoryAllocator::new_default(device.clone()));
    let command_buffer_allocator = Arc::new(StandardCommandBufferAllocator::new(
        device.clone(),
        Default::default(),
    ));
    let descriptor_set_allocator = Arc::new(StandardDescriptorSetAllocator::new(
        device.clone(),
        Default::default(),
    ));
    let properties = physical_device.properties();
    let memory_properties = physical_device.memory_properties();
    // Reported memory size is the largest single heap (typically the
    // device-local heap), not the sum of all heaps.
    let total_memory = memory_properties
        .memory_heaps
        .iter()
        .map(|heap| heap.size)
        .max()
        .unwrap_or(0);
    let device_info = DeviceInfo {
        name: properties.device_name.clone(),
        device_type: DeviceType::Vulkan,
        memory_size: Some(total_memory),
        // NOTE(review): this is the max workgroup count on X, not the
        // number of compute units — confirm intended semantics.
        compute_units: Some(properties.max_compute_work_group_count[0]),
        supports_f16: false,
        supports_f64: false,
    };
    let memory_pool = Arc::new(GpuMemoryPool::with_config(
        memory_allocator.clone(),
        PoolConfig {
            max_buffers_per_bucket: 64,
            min_buffer_size: 1024,
            // Cap pooled buffers at 512 MiB.
            max_buffer_size: 512 * 1024 * 1024,
            enable_background_cleanup: true,
            cleanup_interval_secs: 30,
            buffer_idle_timeout_secs: 300,
            track_memory_usage: true,
        },
    ));
    let fusion_engine = Arc::new(KernelFusionEngine::with_config(FusionConfig {
        max_ops_per_kernel: 12,
        max_intermediate_buffers: 6,
        aggressive_fusion: true,
        min_ops_for_fusion: 2,
        enable_matmul_fusion: true,
        enable_elementwise_fusion: true,
    }));
    // Only the single queue created above is available. (A previous dead
    // loop that pretended to gather extra queues — it broke out on its
    // first iteration — has been removed.)
    let all_queues = vec![queue.clone()];
    let num_available_queues = all_queues.len();
    let async_executor = Arc::new(
        AsyncExecutor::with_config(
            device.clone(),
            all_queues,
            AsyncExecutorConfig {
                num_compute_streams: num_available_queues.min(1),
                num_transfer_streams: 0,
                max_operations_per_stream: 512,
                enable_load_balancing: false,
                enable_transfer_overlap: false,
                stream_selection: crate::device::async_executor::StreamSelection::RoundRobin,
                thread_pool_size: 1,
                operation_timeout_secs: 30,
            },
        )
        .map_err(|e| NnlError::gpu(format!("Failed to create async executor: {}", e)))?,
    );
    Ok(Arc::new(VulkanBackendImpl {
        device,
        queue,
        memory_allocator,
        command_buffer_allocator,
        descriptor_set_allocator,
        pipelines: Arc::new(Mutex::new(HashMap::new())),
        device_info,
        memory_pool,
        fusion_engine,
        async_executor,
    }))
}
/// Returns the cached compute pipeline for `shader_name`, compiling and
/// caching it on first request.
///
/// Pipeline-layout and entry-point lookup previously used `unwrap()`;
/// those failures now surface as `NnlError::gpu` instead of panicking.
///
/// # Errors
/// Returns `NnlError::gpu` if the shader is unknown, the SPIR-V is
/// rejected, or pipeline/layout creation fails.
fn get_pipeline(&self, shader_name: &str) -> Result<Arc<ComputePipeline>> {
    let mut pipelines = self.inner.pipelines.lock().unwrap();
    if let Some(pipeline) = pipelines.get(shader_name) {
        return Ok(pipeline.clone());
    }
    let shader_code = Self::get_shader_spirv(shader_name)?;
    // SAFETY: the words come from SPIR-V binaries compiled offline and
    // embedded via include_bytes!, assumed valid for this device.
    let shader = unsafe {
        ShaderModule::new(
            self.inner.device.clone(),
            ShaderModuleCreateInfo::new(&shader_code),
        )
        .map_err(|e| NnlError::gpu(format!("Failed to create shader module: {}", e)))?
    };
    let entry_point = shader.entry_point("main").ok_or_else(|| {
        NnlError::gpu(format!("Shader '{}' has no 'main' entry point", shader_name))
    })?;
    let stage = PipelineShaderStageCreateInfo::new(entry_point);
    let layout_info = PipelineDescriptorSetLayoutCreateInfo::from_stages([&stage])
        .into_pipeline_layout_create_info(self.inner.device.clone())
        .map_err(|e| NnlError::gpu(format!("Failed to build pipeline layout info: {}", e)))?;
    let layout = PipelineLayout::new(self.inner.device.clone(), layout_info)
        .map_err(|e| NnlError::gpu(format!("Failed to create pipeline layout: {}", e)))?;
    let pipeline = ComputePipeline::new(
        self.inner.device.clone(),
        None,
        ComputePipelineCreateInfo::stage_layout(stage, layout),
    )
    .map_err(|e| NnlError::gpu(format!("Failed to create compute pipeline: {}", e)))?;
    pipelines.insert(shader_name.to_string(), pipeline.clone());
    Ok(pipeline)
}
/// Returns the embedded SPIR-V binary for `shader_name` as 32-bit words.
///
/// The `.spv` files are embedded at compile time via `include_bytes!`
/// and decoded little-endian (the SPIR-V on-disk word order used here).
/// Trailing bytes that do not form a complete word are dropped, matching
/// the previous per-arm `chunks_exact(4)` behavior (a valid module is
/// always a whole number of words).
///
/// # Errors
/// Returns `NnlError::gpu` for an unrecognized shader name.
fn get_shader_spirv(shader_name: &str) -> Result<Vec<u32>> {
    // Decode an embedded little-endian byte stream into SPIR-V words.
    // Extracted to remove 18 identical copies of this pipeline.
    fn words(bytes: &[u8]) -> Vec<u32> {
        bytes
            .chunks_exact(4)
            .map(|chunk| u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
            .collect()
    }
    match shader_name {
        "elementwise_add" => Ok(words(include_bytes!("../shaders/elementwise_add.spv"))),
        "elementwise_sub" => Ok(words(include_bytes!("../shaders/elementwise_sub.spv"))),
        "elementwise_mul" => Ok(words(include_bytes!("../shaders/elementwise_mul.spv"))),
        "elementwise_div" => Ok(words(include_bytes!("../shaders/elementwise_div.spv"))),
        "scalar_add" => Ok(words(include_bytes!("../shaders/scalar_add.spv"))),
        "scalar_mul" => Ok(words(include_bytes!("../shaders/scalar_mul.spv"))),
        "matrix_mul" => Ok(words(include_bytes!("../shaders/matrix_mul.spv"))),
        "relu" => Ok(words(include_bytes!("../shaders/relu.spv"))),
        "sigmoid" => Ok(words(include_bytes!("../shaders/sigmoid.spv"))),
        "tanh" => Ok(words(include_bytes!("../shaders/tanh.spv"))),
        "softmax" => Ok(words(include_bytes!("../shaders/softmax.spv"))),
        "transpose" => Ok(words(include_bytes!("../shaders/transpose.spv"))),
        "copy" => Ok(words(include_bytes!("../shaders/copy.spv"))),
        "sqrt" => Ok(words(include_bytes!("../shaders/sqrt.spv"))),
        "gelu" => Ok(words(include_bytes!("../shaders/gelu.spv"))),
        "swish" => Ok(words(include_bytes!("../shaders/swish.spv"))),
        "reduce_sum" => Ok(words(include_bytes!("../shaders/reduce_sum.spv"))),
        "conv2d" => Ok(words(include_bytes!("../shaders/conv2d.spv"))),
        _ => Err(NnlError::gpu(format!("Unknown shader: {}", shader_name))),
    }
}
/// Records and synchronously submits a one-shot compute dispatch for
/// `operation`.
///
/// Descriptor bindings are assigned positionally: inputs at bindings
/// `0..inputs.len()`, outputs immediately after, then an optional uniform
/// buffer built on the fly from `uniform_data` (scalar kernels expect it
/// at fixed slot 2; all others right after the storage buffers).
///
/// Dispatch shape:
/// * `matrix_mul` — 2D over the output (16x16 workgroups); `uniform_data`
///   must contain at least `[m, n, k]`.
/// * `transpose` — square 2D grid derived from the output element count
///   (assumes a roughly square matrix — TODO confirm for non-square).
/// * everything else — 1D over output elements, 64-wide workgroups.
///
/// # Errors
/// Returns `NnlError::gpu` on any Vulkan failure, on a missing output
/// buffer, or on a missing/short `matrix_mul` uniform (previously the
/// short-uniform and missing-layout cases panicked).
pub fn execute_compute_operation(
    &self,
    operation: &str,
    input_buffers: &[Arc<VulkanBuffer>],
    output_buffers: &[Arc<VulkanBuffer>],
    uniform_data: Option<&[u32]>,
) -> Result<()> {
    let pipeline = self.get_pipeline(operation)?;
    let mut builder = AutoCommandBufferBuilder::primary(
        &self.inner.command_buffer_allocator,
        self.inner.queue.queue_family_index(),
        CommandBufferUsage::OneTimeSubmit,
    )
    .map_err(|e| NnlError::gpu(format!("Failed to create command buffer: {}", e)))?;
    let layout = pipeline
        .layout()
        .set_layouts()
        .first()
        .ok_or_else(|| NnlError::gpu("Compute pipeline has no descriptor set layout"))?;
    // Positional bindings: inputs first, then outputs.
    let mut writes = Vec::new();
    for (i, buffer) in input_buffers.iter().enumerate() {
        writes.push(WriteDescriptorSet::buffer(i as u32, buffer.buffer.clone()));
    }
    for (i, buffer) in output_buffers.iter().enumerate() {
        let binding = (input_buffers.len() + i) as u32;
        writes.push(WriteDescriptorSet::buffer(binding, buffer.buffer.clone()));
    }
    if let Some(uniform) = uniform_data {
        // Transient host-writable uniform buffer with the kernel params.
        let uniform_buffer = Buffer::from_iter(
            self.inner.memory_allocator.clone(),
            BufferCreateInfo {
                usage: BufferUsage::UNIFORM_BUFFER,
                ..Default::default()
            },
            AllocationCreateInfo {
                memory_type_filter: MemoryTypeFilter::PREFER_DEVICE
                    | MemoryTypeFilter::HOST_SEQUENTIAL_WRITE,
                ..Default::default()
            },
            uniform.iter().cloned(),
        )
        .map_err(|e| NnlError::gpu(format!("Failed to create uniform buffer: {}", e)))?;
        // Scalar kernels bind the uniform at fixed slot 2; every other
        // kernel binds it after all storage buffers. (The two previously
        // separate non-scalar arms computed the same value and have been
        // collapsed.)
        let binding = match operation {
            "scalar_add" | "scalar_mul" => 2,
            _ => (input_buffers.len() + output_buffers.len()) as u32,
        };
        writes.push(WriteDescriptorSet::buffer(binding, uniform_buffer));
    }
    let descriptor_set = PersistentDescriptorSet::new(
        &self.inner.descriptor_set_allocator,
        layout.clone(),
        writes,
        [],
    )
    .map_err(|e| NnlError::gpu(format!("Failed to create descriptor set: {}", e)))?;
    let (dispatch_x, dispatch_y) = if operation == "matrix_mul" {
        let uniform = uniform_data.ok_or_else(|| {
            NnlError::gpu("Matrix multiplication requires uniform buffer with dimensions")
        })?;
        if uniform.len() < 3 {
            return Err(NnlError::gpu("matrix_mul uniform must contain [m, n, k]"));
        }
        // uniform = [m, n, k]; k is consumed by the shader only.
        let (m, n) = (uniform[0], uniform[1]);
        let local_size = 16u32;
        (n.div_ceil(local_size), m.div_ceil(local_size))
    } else {
        // Remaining kernels size the dispatch from the first output
        // buffer's f32 element count.
        let first_output = output_buffers
            .first()
            .ok_or_else(|| NnlError::gpu("No output buffers provided"))?;
        let total_elements = first_output.size() / std::mem::size_of::<f32>();
        if operation == "transpose" {
            // Square 2D grid: side = ceil(sqrt(elements)) / 16 groups.
            let local_size = 16u32;
            let side = ((total_elements as f32).sqrt() as u32).div_ceil(local_size);
            (side, side)
        } else {
            let local_size = 64u32;
            ((total_elements as u32).div_ceil(local_size), 1)
        }
    };
    builder
        .bind_pipeline_compute(pipeline.clone())
        .map_err(|e| NnlError::gpu(format!("Failed to bind pipeline: {}", e)))?
        .bind_descriptor_sets(
            PipelineBindPoint::Compute,
            pipeline.layout().clone(),
            0,
            descriptor_set,
        )
        .map_err(|e| NnlError::gpu(format!("Failed to bind descriptor sets: {}", e)))?
        .dispatch([dispatch_x, dispatch_y, 1])
        .map_err(|e| NnlError::gpu(format!("Failed to dispatch: {}", e)))?;
    let command_buffer = builder
        .build()
        .map_err(|e| NnlError::gpu(format!("Failed to build command buffer: {}", e)))?;
    // NOTE(review): the fence future is dropped immediately. In vulkano,
    // dropping a FenceSignalFuture blocks until the fence signals, making
    // this an effectively synchronous submit — confirm for the vulkano
    // version in use.
    let _ = sync::now(self.inner.device.clone())
        .then_execute(self.inner.queue.clone(), command_buffer)
        .map_err(|e| NnlError::gpu(format!("Failed to execute command buffer: {}", e)))?
        .then_signal_fence_and_flush()
        .map_err(|e| NnlError::gpu(format!("Failed to signal fence: {}", e)))?;
    Ok(())
}
}
impl Backend for VulkanBackend {
    /// Returns a clone of the device description captured at startup.
    fn device_info(&self) -> Result<DeviceInfo> {
        Ok(self.inner.device_info.clone())
    }
    /// Allocates `size` f32 *elements* (not bytes) from the pooled GPU
    /// allocator.
    fn allocate(&self, size: usize) -> Result<Arc<dyn DeviceMemory>> {
        let buffer_size = size * std::mem::size_of::<f32>();
        let pooled_buffer = self.inner.memory_pool.get_buffer(buffer_size)?;
        Ok(pooled_buffer as Arc<dyn DeviceMemory>)
    }
    /// Allocates a non-pooled uniform buffer of `size` u32 elements.
    fn allocate_uniform(&self, size: usize) -> Result<Arc<dyn DeviceMemory>> {
        let buffer = VulkanBuffer::new(
            self.inner.memory_allocator.clone(),
            size * std::mem::size_of::<u32>(),
            true,
        )?;
        Ok(Arc::new(buffer) as Arc<dyn DeviceMemory>)
    }
    /// Host -> device upload of f32 data; `memory` must be a
    /// `VulkanBuffer` produced by this backend.
    fn copy_to_device(&self, data: &[f32], memory: &dyn DeviceMemory) -> Result<()> {
        let vulkan_buffer = memory
            .as_any()
            .downcast_ref::<VulkanBuffer>()
            .ok_or_else(|| NnlError::device("Invalid buffer type for Vulkan backend"))?;
        vulkan_buffer.write_data(
            data,
            self.inner.memory_allocator.clone(),
            self.inner.command_buffer_allocator.clone(),
            self.inner.queue.clone(),
        )
    }
    /// Host -> device upload of u32 data. Stored as f32 internally, so
    /// values above 2^24 lose precision — see `VulkanBuffer::write_u32_data`.
    fn copy_u32_to_device(&self, data: &[u32], memory: &dyn DeviceMemory) -> Result<()> {
        let vulkan_buffer = memory
            .as_any()
            .downcast_ref::<VulkanBuffer>()
            .ok_or_else(|| NnlError::device("Invalid buffer type for Vulkan backend"))?;
        vulkan_buffer.write_u32_data(
            data,
            self.inner.memory_allocator.clone(),
            self.inner.command_buffer_allocator.clone(),
            self.inner.queue.clone(),
        )
    }
    /// Device -> host download into `data`; blocks until the copy is done.
    fn copy_to_host(&self, memory: &dyn DeviceMemory, data: &mut [f32]) -> Result<()> {
        let vulkan_buffer = memory
            .as_any()
            .downcast_ref::<VulkanBuffer>()
            .ok_or_else(|| NnlError::device("Invalid buffer type for Vulkan backend"))?;
        vulkan_buffer.read_data(
            data,
            self.inner.memory_allocator.clone(),
            self.inner.command_buffer_allocator.clone(),
            self.inner.queue.clone(),
        )
    }
    /// Executes a kernel, first attempting the kernel-fusion path; falls
    /// back to a direct dispatch when no fused kernels were produced.
    fn execute_kernel(
        &self,
        kernel: &dyn Kernel,
        inputs: &[&dyn DeviceMemory],
        outputs: &[&dyn DeviceMemory],
    ) -> Result<()> {
        if let Some(fused_kernels) = self.try_fuse_kernel(kernel, inputs, outputs)? {
            for fused_kernel in fused_kernels {
                self.execute_fused_kernel(&fused_kernel)?;
            }
            Ok(())
        } else {
            self.execute_kernel_with_uniform(kernel, inputs, outputs, None)
        }
    }
    /// Direct dispatch with an optional uniform. The uniform buffer is
    /// read back to host u32s here and re-uploaded as a transient uniform
    /// inside `execute_compute_operation`.
    fn execute_kernel_with_uniform(
        &self,
        kernel: &dyn Kernel,
        inputs: &[&dyn DeviceMemory],
        outputs: &[&dyn DeviceMemory],
        uniform: Option<&dyn DeviceMemory>,
    ) -> Result<()> {
        let vulkan_kernel = kernel
            .as_any()
            .downcast_ref::<VulkanKernel>()
            .ok_or_else(|| NnlError::device("Invalid kernel type for Vulkan backend"))?;
        // Cloning a VulkanBuffer clones the Subbuffer handle, not the
        // underlying GPU allocation.
        let input_buffers: Result<Vec<_>> = inputs
            .iter()
            .map(|mem| {
                mem.as_any()
                    .downcast_ref::<VulkanBuffer>()
                    .ok_or_else(|| NnlError::device("Invalid input buffer type"))
                    .map(|buf| Arc::new(buf.clone()))
            })
            .collect();
        let input_buffers = input_buffers?;
        let output_buffers: Result<Vec<_>> = outputs
            .iter()
            .map(|mem| {
                mem.as_any()
                    .downcast_ref::<VulkanBuffer>()
                    .ok_or_else(|| NnlError::device("Invalid output buffer type"))
                    .map(|buf| Arc::new(buf.clone()))
            })
            .collect();
        let output_buffers = output_buffers?;
        let uniform_data = if let Some(uniform_mem) = uniform {
            let uniform_buffer = uniform_mem
                .as_any()
                .downcast_ref::<VulkanBuffer>()
                .ok_or_else(|| NnlError::device("Invalid uniform buffer type"))?;
            Some(uniform_buffer.read_u32_data(
                self.inner.memory_allocator.clone(),
                self.inner.command_buffer_allocator.clone(),
                self.inner.queue.clone(),
            )?)
        } else {
            None
        };
        self.execute_compute_operation(
            vulkan_kernel.name(),
            &input_buffers,
            &output_buffers,
            uniform_data.as_deref(),
        )
    }
    /// Drains the async executor, then waits for the device to go idle.
    fn synchronize(&self) -> Result<()> {
        self.inner.async_executor.synchronize()?;
        // NOTE(review): `wait_idle` is wrapped in `unsafe` here; depending
        // on the vulkano version this may be a safe call — confirm.
        unsafe {
            self.inner
                .device
                .wait_idle()
                .map_err(|e| NnlError::gpu(format!("Failed to synchronize device: {}", e)))
        }
    }
    /// Always true: a backend instance only exists if construction
    /// (and therefore Vulkan initialization) already succeeded.
    fn is_available(&self) -> bool {
        true
    }
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
/// GPU buffer wrapper. The backing storage is always an f32 slice, even
/// when the buffer carries u32 uniform data (see `write_u32_data` /
/// `read_u32_data` for the lossy round-trip).
#[derive(Debug, Clone)]
pub struct VulkanBuffer {
    // Underlying vulkano subbuffer (handle; cheap to clone).
    buffer: Subbuffer<[f32]>,
    // Logical size in bytes requested by the caller.
    size_in_bytes: usize,
    // Whether this was created with UNIFORM_BUFFER usage.
    #[allow(dead_code)]
    is_uniform: bool,
}
impl VulkanBuffer {
/// Allocates a device-local buffer of `size_in_bytes` bytes, laid out as
/// f32 elements. Uniform buffers get UNIFORM_BUFFER usage, everything
/// else STORAGE_BUFFER; both are transferable in either direction.
pub fn new(
    allocator: Arc<StandardMemoryAllocator>,
    size_in_bytes: usize,
    is_uniform: bool,
) -> Result<Self> {
    let element_count = (size_in_bytes / std::mem::size_of::<f32>()) as u64;
    let base_usage = if is_uniform {
        BufferUsage::UNIFORM_BUFFER
    } else {
        BufferUsage::STORAGE_BUFFER
    };
    let create_info = BufferCreateInfo {
        usage: base_usage | BufferUsage::TRANSFER_SRC | BufferUsage::TRANSFER_DST,
        ..Default::default()
    };
    let alloc_info = AllocationCreateInfo {
        memory_type_filter: MemoryTypeFilter::PREFER_DEVICE,
        ..Default::default()
    };
    let buffer = Buffer::new_slice::<f32>(allocator, create_info, alloc_info, element_count)
        .map_err(|e| NnlError::gpu(format!("Failed to create buffer: {}", e)))?;
    Ok(Self {
        buffer,
        size_in_bytes,
        is_uniform,
    })
}
/// Wraps an existing subbuffer (e.g. one handed out by the memory pool)
/// in a `VulkanBuffer` without allocating anything new.
pub fn from_buffer(
    buffer: Subbuffer<[f32]>,
    size_in_bytes: usize,
    is_uniform: bool,
) -> Result<Self> {
    let wrapped = Self {
        buffer,
        size_in_bytes,
        is_uniform,
    };
    Ok(wrapped)
}
/// Uploads `data` into the buffer via a host-visible staging buffer and
/// a GPU copy command.
///
/// WARNING: the backing buffer stores f32, so each u32 is converted with
/// `x as f32`; values above 2^24 lose precision. This mirrors the inverse
/// conversion in `read_u32_data` and is only suitable for small integers
/// (e.g. tensor dimensions).
pub fn write_u32_data(
    &self,
    data: &[u32],
    allocator: Arc<StandardMemoryAllocator>,
    command_allocator: Arc<StandardCommandBufferAllocator>,
    queue: Arc<Queue>,
) -> Result<()> {
    // Exact size match is required here (unlike write_data, which allows
    // writing a prefix of the buffer).
    if data.len() * std::mem::size_of::<u32>() != self.size_in_bytes {
        return Err(NnlError::device("Data size mismatch"));
    }
    // Lossy u32 -> f32 widening; see the doc comment above.
    let f32_data: Vec<f32> = data.iter().map(|&x| x as f32).collect();
    let staging_buffer = Buffer::from_iter(
        allocator,
        BufferCreateInfo {
            usage: BufferUsage::TRANSFER_SRC,
            ..Default::default()
        },
        AllocationCreateInfo {
            memory_type_filter: MemoryTypeFilter::PREFER_HOST
                | MemoryTypeFilter::HOST_SEQUENTIAL_WRITE,
            ..Default::default()
        },
        f32_data.iter().cloned(),
    )
    .map_err(|e| NnlError::gpu(format!("Failed to create staging buffer: {}", e)))?;
    let mut builder = AutoCommandBufferBuilder::primary(
        &command_allocator,
        queue.queue_family_index(),
        CommandBufferUsage::OneTimeSubmit,
    )
    .map_err(|e| NnlError::gpu(format!("Failed to create command buffer: {}", e)))?;
    builder
        .copy_buffer(CopyBufferInfo::buffers(staging_buffer, self.buffer.clone()))
        .map_err(|e| NnlError::gpu(format!("Failed to copy buffer: {}", e)))?;
    let command_buffer = builder
        .build()
        .map_err(|e| NnlError::gpu(format!("Failed to build command buffer: {}", e)))?;
    // NOTE(review): the fence future is dropped without an explicit wait;
    // vulkano's FenceSignalFuture blocks on drop, making this effectively
    // synchronous — confirm for the vulkano version in use.
    let _ = sync::now(queue.device().clone())
        .then_execute(queue.clone(), command_buffer)
        .map_err(|e| NnlError::gpu(format!("Failed to execute command buffer: {}", e)))?
        .then_signal_fence_and_flush()
        .map_err(|e| NnlError::gpu(format!("Failed to signal fence: {}", e)))?;
    Ok(())
}
/// Uploads `data` (f32) into the buffer via a host-visible staging
/// buffer and a GPU copy command. Writing a prefix of the buffer is
/// allowed; writing more than the buffer holds is an error.
pub fn write_data(
    &self,
    data: &[f32],
    allocator: Arc<StandardMemoryAllocator>,
    command_allocator: Arc<StandardCommandBufferAllocator>,
    queue: Arc<Queue>,
) -> Result<()> {
    let expected_bytes = data.len() * std::mem::size_of::<f32>();
    if expected_bytes > self.size_in_bytes {
        return Err(NnlError::device(&format!(
            "Data too large for buffer: expected {} bytes (data.len()={} * {}), but buffer size is only {} bytes",
            expected_bytes,
            data.len(),
            std::mem::size_of::<f32>(),
            self.size_in_bytes
        )));
    }
    // Host-writable staging buffer holding the upload payload.
    let staging_buffer = Buffer::from_iter(
        allocator,
        BufferCreateInfo {
            usage: BufferUsage::TRANSFER_SRC,
            ..Default::default()
        },
        AllocationCreateInfo {
            memory_type_filter: MemoryTypeFilter::PREFER_HOST
                | MemoryTypeFilter::HOST_SEQUENTIAL_WRITE,
            ..Default::default()
        },
        data.iter().cloned(),
    )
    .map_err(|e| NnlError::gpu(format!("Failed to create staging buffer: {}", e)))?;
    let mut builder = AutoCommandBufferBuilder::primary(
        &command_allocator,
        queue.queue_family_index(),
        CommandBufferUsage::OneTimeSubmit,
    )
    .map_err(|e| NnlError::gpu(format!("Failed to create command buffer: {}", e)))?;
    builder
        .copy_buffer(CopyBufferInfo::buffers(staging_buffer, self.buffer.clone()))
        .map_err(|e| NnlError::gpu(format!("Failed to copy buffer: {}", e)))?;
    let command_buffer = builder
        .build()
        .map_err(|e| NnlError::gpu(format!("Failed to build command buffer: {}", e)))?;
    // NOTE(review): the fence future is dropped without an explicit wait;
    // vulkano's FenceSignalFuture blocks on drop, making this effectively
    // synchronous — confirm for the vulkano version in use.
    let _ = sync::now(queue.device().clone())
        .then_execute(queue.clone(), command_buffer)
        .map_err(|e| NnlError::gpu(format!("Failed to execute command buffer: {}", e)))?
        .then_signal_fence_and_flush()
        .map_err(|e| NnlError::gpu(format!("Failed to signal fence: {}", e)))?;
    Ok(())
}
/// Downloads the first `output.len()` f32 elements of the buffer into
/// `output` via a host-visible staging buffer. Reading a prefix is
/// allowed; asking for more than the buffer holds is an error. Blocks
/// until the GPU copy completes.
pub fn read_data(
    &self,
    output: &mut [f32],
    allocator: Arc<StandardMemoryAllocator>,
    command_allocator: Arc<StandardCommandBufferAllocator>,
    queue: Arc<Queue>,
) -> Result<()> {
    let expected_bytes = output.len() * std::mem::size_of::<f32>();
    if expected_bytes > self.size_in_bytes {
        return Err(NnlError::device(&format!(
            "Output buffer too large: expected {} bytes (output.len()={} * {}), but buffer size is only {} bytes",
            expected_bytes,
            output.len(),
            std::mem::size_of::<f32>(),
            self.size_in_bytes
        )));
    }
    let size_in_f32s = output.len();
    // Host-readable staging buffer sized exactly to the requested output.
    let staging_buffer = Buffer::new_slice::<f32>(
        allocator,
        BufferCreateInfo {
            usage: BufferUsage::TRANSFER_DST,
            ..Default::default()
        },
        AllocationCreateInfo {
            memory_type_filter: MemoryTypeFilter::PREFER_HOST
                | MemoryTypeFilter::HOST_RANDOM_ACCESS,
            ..Default::default()
        },
        size_in_f32s as u64,
    )
    .map_err(|e| NnlError::gpu(format!("Failed to create staging buffer: {}", e)))?;
    let mut builder = AutoCommandBufferBuilder::primary(
        &command_allocator,
        queue.queue_family_index(),
        CommandBufferUsage::OneTimeSubmit,
    )
    .map_err(|e| NnlError::gpu(format!("Failed to create command buffer: {}", e)))?;
    builder
        .copy_buffer(CopyBufferInfo::buffers(
            self.buffer.clone(),
            staging_buffer.clone(),
        ))
        .map_err(|e| NnlError::gpu(format!("Failed to copy buffer: {}", e)))?;
    let command_buffer = builder
        .build()
        .map_err(|e| NnlError::gpu(format!("Failed to build command buffer: {}", e)))?;
    let future = sync::now(queue.device().clone())
        .then_execute(queue.clone(), command_buffer)
        .map_err(|e| NnlError::gpu(format!("Failed to execute command buffer: {}", e)))?
        .then_signal_fence_and_flush()
        .map_err(|e| NnlError::gpu(format!("Failed to signal fence: {}", e)))?;
    // Block until the device-to-host copy has finished before mapping.
    future
        .wait(None)
        .map_err(|e| NnlError::gpu(format!("Failed to wait for transfer: {}", e)))?;
    let staging_read = staging_buffer
        .read()
        .map_err(|e| NnlError::gpu(format!("Failed to read staging buffer: {}", e)))?;
    // The staging buffer has exactly output.len() elements, so the old
    // per-element bounds-checked loop is replaced by one slice copy.
    output.copy_from_slice(&staging_read);
    Ok(())
}
/// Reads back the whole buffer and converts each f32 element to u32.
///
/// This is the inverse of `write_u32_data`'s lossy u32 -> f32 widening:
/// the `f as u32` cast truncates, and saturates NaN/negative values to 0,
/// so round-trips are exact only for integers up to 2^24.
///
/// Previously the final staging-buffer map used `unwrap()`; it now
/// reports an `NnlError::gpu` like `read_data` does.
pub fn read_u32_data(
    &self,
    allocator: Arc<StandardMemoryAllocator>,
    command_allocator: Arc<StandardCommandBufferAllocator>,
    queue: Arc<Queue>,
) -> Result<Vec<u32>> {
    let data_len = self.size_in_bytes / std::mem::size_of::<f32>();
    // Host-readable staging buffer covering the whole logical buffer.
    let staging_buffer = Buffer::new_slice::<f32>(
        allocator,
        BufferCreateInfo {
            usage: BufferUsage::TRANSFER_DST,
            ..Default::default()
        },
        AllocationCreateInfo {
            memory_type_filter: MemoryTypeFilter::PREFER_HOST
                | MemoryTypeFilter::HOST_RANDOM_ACCESS,
            ..Default::default()
        },
        data_len as u64,
    )
    .map_err(|e| NnlError::gpu(format!("Failed to create staging buffer: {}", e)))?;
    let mut builder = AutoCommandBufferBuilder::primary(
        &command_allocator,
        queue.queue_family_index(),
        CommandBufferUsage::OneTimeSubmit,
    )
    .map_err(|e| NnlError::gpu(format!("Failed to create command buffer: {}", e)))?;
    builder
        .copy_buffer(CopyBufferInfo::buffers(
            self.buffer.clone(),
            staging_buffer.clone(),
        ))
        .map_err(|e| NnlError::gpu(format!("Failed to copy buffer: {}", e)))?;
    let command_buffer = builder
        .build()
        .map_err(|e| NnlError::gpu(format!("Failed to build command buffer: {}", e)))?;
    let future = sync::now(queue.device().clone())
        .then_execute(queue.clone(), command_buffer)
        .map_err(|e| NnlError::gpu(format!("Failed to execute command buffer: {}", e)))?
        .then_signal_fence_and_flush()
        .map_err(|e| NnlError::gpu(format!("Failed to signal fence: {}", e)))?;
    // Block until the copy has finished before mapping the staging buffer.
    future
        .wait(None)
        .map_err(|e| NnlError::gpu(format!("Failed to wait for transfer: {}", e)))?;
    let staging_read = staging_buffer
        .read()
        .map_err(|e| NnlError::gpu(format!("Failed to read staging buffer: {}", e)))?;
    Ok(staging_read.iter().map(|&f| f as u32).collect())
}
}
impl DeviceMemory for VulkanBuffer {
    /// Logical size in bytes as requested at allocation time.
    fn size(&self) -> usize {
        self.size_in_bytes
    }
    fn device_type(&self) -> DeviceType {
        DeviceType::Vulkan
    }
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }
}
/// Describes a compute kernel by shader name plus a 3D dispatch
/// (workgroup-count) size.
#[derive(Debug)]
pub struct VulkanKernel {
    // Shader name; must match a `get_shader_spirv` entry to execute.
    name: String,
    // Number of workgroups to launch on each axis.
    dispatch_size: [u32; 3],
}
impl VulkanKernel {
    /// Creates a kernel descriptor with an explicit dispatch size.
    pub fn new(name: String, dispatch_size: [u32; 3]) -> Self {
        Self {
            name,
            dispatch_size,
        }
    }
    /// 1D launch for element-wise kernels (64-wide workgroups).
    pub fn elementwise(name: String, size: u32) -> Self {
        let groups = size.div_ceil(64);
        Self::new(name, [groups, 1, 1])
    }
    /// 2D launch for matrix kernels (16x16 workgroups).
    pub fn matrix(name: String, rows: u32, cols: u32) -> Self {
        let group_cols = cols.div_ceil(16);
        let group_rows = rows.div_ceil(16);
        Self::new(name, [group_cols, group_rows, 1])
    }
    /// 1D launch for reduction kernels (256-wide workgroups).
    pub fn reduction(name: String, size: u32) -> Self {
        let groups = size.div_ceil(256);
        Self::new(name, [groups, 1, 1])
    }
}
impl Kernel for VulkanKernel {
    /// Shader name used to look up the SPIR-V / pipeline.
    fn name(&self) -> &str {
        &self.name
    }
    // NOTE(review): despite the trait method's name, this returns the
    // stored *dispatch* (workgroup-count) size, not the shader's local
    // workgroup size — confirm callers expect that.
    fn local_size(&self) -> Option<[u32; 3]> {
        Some(self.dispatch_size)
    }
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
impl VulkanBackend {
/// Maps a single named kernel onto a fusable-op description and asks the
/// fusion engine whether enough queued ops exist to emit fused kernels.
/// Returns `Ok(None)` when the kernel is not fusable or nothing was
/// generated, in which case the caller dispatches the kernel directly.
///
/// NOTE(review): several placeholders remain — the AddScalar `scalar` is
/// hard-coded to 0.0 and MatMul dims are zeroed; buffer ids assume a
/// positional layout (input 0, input 1). Confirm before relying on the
/// fused path for correctness.
fn try_fuse_kernel(
    &self,
    kernel: &dyn Kernel,
    inputs: &[&dyn DeviceMemory],
    _outputs: &[&dyn DeviceMemory],
) -> Result<Option<Vec<crate::device::kernel_fusion::FusedKernel>>> {
    use crate::device::kernel_fusion::{BufferId, FusableOp, MatMulDims};
    let vulkan_kernel = kernel
        .as_any()
        .downcast_ref::<VulkanKernel>()
        .ok_or_else(|| NnlError::device("Invalid kernel type for Vulkan backend"))?;
    let fusable_op = match vulkan_kernel.name() {
        "elementwise_add" => {
            if inputs.len() >= 2 {
                Some(FusableOp::Add {
                    a_id: BufferId(0),
                    b_id: BufferId(1),
                })
            } else {
                None
            }
        }
        "elementwise_mul" => {
            if inputs.len() >= 2 {
                Some(FusableOp::Mul {
                    a_id: BufferId(0),
                    b_id: BufferId(1),
                })
            } else {
                None
            }
        }
        "scalar_add" => {
            Some(FusableOp::AddScalar {
                input_id: BufferId(0),
                // Placeholder: the actual scalar is not plumbed through.
                scalar: 0.0,
            })
        }
        "relu" => Some(FusableOp::Relu {
            input_id: BufferId(0),
        }),
        "matrix_mul" => {
            if inputs.len() >= 2 {
                Some(FusableOp::MatMul {
                    a_id: BufferId(0),
                    b_id: BufferId(1),
                    // Placeholder: real dimensions are not plumbed through.
                    dims: MatMulDims { m: 0, k: 0, n: 0 },
                })
            } else {
                None
            }
        }
        _ => None,
    };
    if let Some(op) = fusable_op {
        self.inner.fusion_engine.add_operation(op)?;
        let fused_kernels = self.inner.fusion_engine.generate_fused_kernels()?;
        if !fused_kernels.is_empty() {
            return Ok(Some(
                fused_kernels.into_iter().map(|k| (*k).clone()).collect(),
            ));
        }
    }
    Ok(None)
}
/// Compiles and synchronously executes a fused kernel produced by the
/// fusion engine.
///
/// NOTE(review): this path looks incomplete — no descriptor sets are
/// bound before the dispatch, so the fused shader has no access to any
/// input/output buffers; and it depends on `compile_glsl_to_spirv`,
/// which does not perform real compilation. Confirm before enabling
/// fusion in production.
fn execute_fused_kernel(
    &self,
    fused_kernel: &crate::device::kernel_fusion::FusedKernel,
) -> Result<()> {
    // Redundant local imports — these names are already in scope from the
    // top-of-file `use vulkano::{...}` block.
    use vulkano::{
        pipeline::ComputePipeline,
        shader::{ShaderModule, ShaderModuleCreateInfo},
    };
    let spirv_bytes = self.compile_glsl_to_spirv(&fused_kernel.shader_code)?;
    // SAFETY: assumes `spirv_bytes` is valid SPIR-V. It currently comes
    // from `compile_glsl_to_spirv`, which does not guarantee validity —
    // see the note on that method.
    let shader = unsafe {
        ShaderModule::new(
            self.inner.device.clone(),
            ShaderModuleCreateInfo::new(&spirv_bytes),
        )
        .map_err(|e| NnlError::gpu(format!("Failed to create shader module: {}", e)))?
    };
    // NOTE(review): panics if the module lacks a "main" entry point.
    let entry_point = shader.entry_point("main").unwrap();
    let layout_info =
        vulkano::pipeline::layout::PipelineDescriptorSetLayoutCreateInfo::from_stages([
            &vulkano::pipeline::PipelineShaderStageCreateInfo::new(entry_point.clone()),
        ]);
    let pipeline_layout = vulkano::pipeline::PipelineLayout::new(
        self.inner.device.clone(),
        layout_info
            .into_pipeline_layout_create_info(self.inner.device.clone())
            .unwrap(),
    )
    .map_err(|e| NnlError::gpu(format!("Failed to create pipeline layout: {}", e)))?;
    let pipeline = ComputePipeline::new(
        self.inner.device.clone(),
        None,
        vulkano::pipeline::compute::ComputePipelineCreateInfo::stage_layout(
            vulkano::pipeline::PipelineShaderStageCreateInfo::new(entry_point),
            pipeline_layout,
        ),
    )
    .map_err(|e| NnlError::gpu(format!("Failed to create fused pipeline: {}", e)))?;
    let mut builder = vulkano::command_buffer::AutoCommandBufferBuilder::primary(
        &self.inner.command_buffer_allocator,
        self.inner.queue.queue_family_index(),
        vulkano::command_buffer::CommandBufferUsage::OneTimeSubmit,
    )
    .map_err(|e| NnlError::gpu(format!("Failed to create command buffer: {}", e)))?;
    builder
        .bind_pipeline_compute(pipeline.clone())
        .map_err(|e| NnlError::gpu(format!("Failed to bind pipeline: {}", e)))?;
    // NOTE(review): `local_size` is used here as the dispatch (group
    // count), though its name suggests the workgroup size — confirm.
    let (dispatch_x, dispatch_y, dispatch_z) = fused_kernel.local_size;
    builder
        .dispatch([dispatch_x, dispatch_y, dispatch_z])
        .map_err(|e| NnlError::gpu(format!("Failed to dispatch: {}", e)))?;
    let command_buffer = builder
        .build()
        .map_err(|e| NnlError::gpu(format!("Failed to build command buffer: {}", e)))?;
    let future = vulkano::sync::now(self.inner.device.clone())
        .then_execute(self.inner.queue.clone(), command_buffer)
        .map_err(|e| NnlError::gpu(format!("Failed to execute command buffer: {}", e)))?
        .then_signal_fence_and_flush()
        .map_err(|e| NnlError::gpu(format!("Failed to signal fence: {}", e)))?;
    // Block until the fused kernel has finished executing.
    future
        .wait(None)
        .map_err(|e| NnlError::gpu(format!("Failed to wait for execution: {}", e)))?;
    Ok(())
}
fn compile_glsl_to_spirv(&self, glsl_code: &str) -> Result<Vec<u32>> {
log::info!(
"Compiling fused shader with {} bytes of GLSL code",
glsl_code.len()
);
let placeholder_spirv = vec![
0x07230203, 0x00010000, 0x00080001, 0x0000000d, 0x00000000, ];
Ok(placeholder_spirv)
}
/// Returns a snapshot of the pooled GPU allocator's statistics.
pub fn get_memory_pool_stats(&self) -> crate::device::memory_pool::PoolStats {
    self.inner.memory_pool.get_stats()
}
/// Returns a snapshot of the async executor's statistics.
pub fn get_executor_stats(&self) -> crate::device::async_executor::ExecutorStats {
    self.inner.async_executor.get_stats()
}
/// Releases idle pooled buffers; returns how many were cleaned up.
pub fn cleanup_memory_pool(&self) -> usize {
    self.inner.memory_pool.cleanup_idle_buffers()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    // Each test is a silent no-op when Vulkan is unavailable (e.g. CI
    // machines without a GPU/ICD), so the suite stays green everywhere.
    #[test]
    fn test_vulkan_backend_creation() {
        match VulkanBackend::new() {
            Ok(backend) => {
                let info = backend.device_info().unwrap();
                assert_eq!(info.device_type, DeviceType::Vulkan);
                println!("Vulkan device: {}", info.name);
            }
            Err(e) => {
                println!("Vulkan not available: {}", e);
            }
        }
    }
    // Round-trips four f32 values host -> device -> host.
    #[test]
    fn test_vulkan_buffer_operations() {
        if let Ok(backend) = VulkanBackend::new() {
            // allocate() takes an element count; size() reports bytes.
            let memory = backend.allocate(4).unwrap();
            assert_eq!(memory.size(), 4 * std::mem::size_of::<f32>());
            assert_eq!(memory.device_type(), DeviceType::Vulkan);
            let test_data = vec![1.0, 2.0, 3.0, 4.0];
            backend.copy_to_device(&test_data, memory.as_ref()).unwrap();
            let mut result = vec![0.0; 4];
            backend.copy_to_host(memory.as_ref(), &mut result).unwrap();
            for (actual, expected) in result.iter().zip(test_data.iter()) {
                assert!((actual - expected).abs() < 1e-6);
            }
        }
    }
    // GPU element-wise add: c = a + b.
    #[test]
    fn test_elementwise_operations() {
        if let Ok(backend) = VulkanBackend::new() {
            let a = vec![1.0, 2.0, 3.0, 4.0];
            let b = vec![2.0, 3.0, 4.0, 5.0];
            let mem_a = backend.allocate(4).unwrap();
            let mem_b = backend.allocate(4).unwrap();
            let mem_c = backend.allocate(4).unwrap();
            backend.copy_to_device(&a, mem_a.as_ref()).unwrap();
            backend.copy_to_device(&b, mem_b.as_ref()).unwrap();
            let kernel = VulkanKernel::elementwise("elementwise_add".to_string(), 4);
            backend
                .execute_kernel(
                    &kernel,
                    &[mem_a.as_ref(), mem_b.as_ref()],
                    &[mem_c.as_ref()],
                )
                .unwrap();
            let mut result = vec![0.0; 4];
            backend.copy_to_host(mem_c.as_ref(), &mut result).unwrap();
            let expected = vec![3.0, 5.0, 7.0, 9.0];
            for (actual, expected) in result.iter().zip(expected.iter()) {
                assert!((actual - expected).abs() < 1e-6);
            }
        }
    }
    // GPU matrix product of a 2x3 by a 3x2 (both row-major); the uniform
    // buffer carries the dimensions [m, n, k] = [2, 2, 3].
    #[test]
    fn test_matrix_multiplication() {
        if let Ok(backend) = VulkanBackend::new() {
            // A is 2x3, B is 3x2 (row-major).
            let a = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0];
            let b = vec![7.0, 8.0, 9.0, 10.0, 11.0, 12.0];
            let mem_a = backend.allocate(6).unwrap();
            let mem_b = backend.allocate(6).unwrap();
            let mem_c = backend.allocate(4).unwrap();
            backend.copy_to_device(&a, mem_a.as_ref()).unwrap();
            backend.copy_to_device(&b, mem_b.as_ref()).unwrap();
            // dims = [m, n, k]
            let dims = [2u32, 2u32, 3u32];
            let uniform_mem = backend.allocate_uniform(3).unwrap();
            backend
                .copy_u32_to_device(&dims, uniform_mem.as_ref())
                .unwrap();
            let kernel = VulkanKernel::matrix("matrix_mul".to_string(), 2, 2);
            backend
                .execute_kernel_with_uniform(
                    &kernel,
                    &[mem_a.as_ref(), mem_b.as_ref()],
                    &[mem_c.as_ref()],
                    Some(uniform_mem.as_ref()),
                )
                .unwrap();
            let mut result = vec![0.0; 4];
            backend.copy_to_host(mem_c.as_ref(), &mut result).unwrap();
            // Hand-computed A*B for the row-major operands above.
            let expected = vec![58.0, 64.0, 139.0, 154.0];
            println!("Matrix A: {:?}", a);
            println!("Matrix B: {:?}", b);
            println!("GPU Result: {:?}", result);
            println!("Expected: {:?}", expected);
            for (i, (actual, expected)) in result.iter().zip(expected.iter()).enumerate() {
                println!(
                    "Index {}: GPU={}, Expected={}, Diff={}",
                    i,
                    actual,
                    expected,
                    (actual - expected).abs()
                );
                assert!((actual - expected).abs() < 1e-6);
            }
        }
    }
}