use std::ptr::NonNull;
use std::sync::Arc;
use std::{mem, ptr};
use crate::compute::{print_model_stats, print_tensorgraph_stats};
use crate::gpu::{
pool::GpuPool,
vk_gpu::{Gpu, HostAccessMode},
};
use crate::instruction;
use crate::onnx_parser::parse_onnx_model;
use crate::scheduler::{ExecutionPlan, create_execution_plan, execute_plan};
use crate::tensor::TensorCell;
use crate::tensor::{DeviceId, Tensor};
use crate::utils::error::VKMLError;
use crate::weight_initialiser::Initialiser;
use onnx_extractor::OnnxModel;
use zero_pool::global_pool;
use crate::instruction::Instruction;
use crate::tensor_graph::{DependencyGraph, OperationId, TensorGraph, TensorId};
use crate::{
model::{graph_model::GraphModel, layer_connection::LayerId},
tensor::TensorDesc,
};
use super::cpu_compute::CPUCompute;
use super::optimisations::Optimisations;
pub struct ComputeManager {
pub tensors: Vec<TensorCell>,
pub model: GraphModel,
pub tensor_graph: TensorGraph,
gpus: GpuPool,
cpu: CPUCompute,
cached_plan: Option<Arc<ExecutionPlan>>,
cached_dependency_graph: Option<DependencyGraph>,
optimisations: Optimisations,
}
impl ComputeManager {
pub fn new_from_graph(model: GraphModel) -> Result<Self, VKMLError> {
Self::new_from_graph_with(model, None, None, Optimisations::default())
}
pub fn new_from_graph_with(
mut model: GraphModel,
explicit_gpus: Option<Vec<usize>>,
cpu_memory_limit_bytes: Option<u64>,
optimisations: Optimisations,
) -> Result<Self, VKMLError> {
if model.verified.is_none() {
model.verify()?;
}
let cpu = CPUCompute::new(cpu_memory_limit_bytes);
let tensor_graph = TensorGraph::from_graph_model(&model)?;
let mut manager = Self {
tensors: Vec::new(),
model,
tensor_graph,
gpus: GpuPool::new(explicit_gpus)?,
cpu,
cached_plan: None,
cached_dependency_graph: None,
optimisations,
};
let total_memory = manager.tensor_graph.memory_requirements as u64;
let total_available: u64 = manager
.gpus
.gpus()
.iter()
.map(|gpu| gpu.memory_available())
.sum::<u64>()
+ manager.cpu.memory_tracking.get_available();
if total_memory > total_available {
return Err(VKMLError::ComputeManager(format!(
"Model requires {} bytes but only {} available",
total_memory, total_available
)));
}
manager.allocate_tensor_graph(Vec::new())?;
Ok(manager)
}
pub fn new_from_onnx_path(onnx_path: &str) -> Result<Self, VKMLError> {
Self::new_from_onnx_path_with(onnx_path, None, None, 1, Optimisations::default())
}
pub fn new_from_onnx_path_with(
onnx_path: &str,
explicit_gpus: Option<Vec<usize>>,
cpu_memory_limit_bytes: Option<u64>,
batch_size: usize,
optimisations: Optimisations,
) -> Result<Self, VKMLError> {
assert!(batch_size > 0, "batch_size must be greater than 0");
let onnx_model = OnnxModel::load_from_file(onnx_path).map_err(|e| {
VKMLError::OnnxImporter(format!(
"Failed to load ONNX model from '{}': {}",
onnx_path, e
))
})?;
let (tensor_graph, tensor_bytes) = parse_onnx_model(onnx_model, batch_size as i64)?;
Self::new_from_tensor_graph(
tensor_graph,
tensor_bytes,
GpuPool::new(explicit_gpus)?,
cpu_memory_limit_bytes,
optimisations,
)
}
fn new_from_tensor_graph(
tensor_graph: TensorGraph,
initialisers: Vec<Initialiser>,
gpus: GpuPool,
cpu_memory_limit_bytes: Option<u64>,
optimisations: Optimisations,
) -> Result<Self, VKMLError> {
let cpu = CPUCompute::new(cpu_memory_limit_bytes);
let model = GraphModel::new(1);
let mut manager = Self {
gpus,
cpu,
tensors: Vec::new(),
model,
tensor_graph,
cached_plan: None,
cached_dependency_graph: None,
optimisations,
};
let total_memory = manager.tensor_graph.memory_requirements as u64;
let total_available: u64 = manager
.gpus
.gpus()
.iter()
.map(|gpu| gpu.memory_available())
.sum::<u64>()
+ manager.cpu.memory_tracking.get_available();
if total_memory > total_available {
return Err(VKMLError::ComputeManager(format!(
"Model requires {} bytes but only {} available",
total_memory, total_available
)));
}
manager.allocate_tensor_graph(initialisers)?;
Ok(manager)
}
fn allocate_tensor_graph(&mut self, initialisers: Vec<Initialiser>) -> Result<(), VKMLError> {
let dep_graph = self.tensor_graph.dependency_graph();
let flattened_ops = &dep_graph.topological_order;
let mut tensor_locations: Vec<Option<DeviceId>> =
vec![None; self.tensor_graph.tensor_descs.len()];
let mut tensor_remappings: Vec<Vec<(DeviceId, usize)>> =
vec![Vec::new(); self.tensor_graph.tensor_descs.len()];
let mut operation_remappings: Vec<Option<(Vec<TensorId>, Vec<TensorId>)>> =
vec![None; self.tensor_graph.operations.len()];
let mut new_tensors: Vec<(TensorDesc, DeviceId, Option<LayerId>)> = Vec::new();
let mut transfer_operations: Vec<(OperationId, Box<dyn Instruction>)> = Vec::new();
let mut available_memory: Vec<(DeviceId, u64)> = self
.gpus
.gpus()
.iter()
.enumerate()
.map(|(i, g)| (DeviceId::Gpu(i), g.memory_available()))
.collect();
available_memory.push((DeviceId::Cpu, self.cpu.memory_tracking.get_available()));
let tensor_size = |tid: usize| self.tensor_graph.tensor_descs[tid].size_in_bytes() as u64;
for &op_id in flattened_ops {
let instruction = &self.tensor_graph.operations[op_id];
let input_tensors = instruction.get_input_tensor_ids();
let output_tensors = instruction.get_output_tensor_ids();
let dev_idx = available_memory
.iter()
.position(|(cand_device, available)| {
let mut needed = 0u64;
for &tid in input_tensors.iter().chain(output_tensors.iter()) {
match &tensor_locations[tid] {
None => needed = needed.saturating_add(tensor_size(tid)),
Some(loc)
if loc != cand_device
&& !tensor_remappings[tid]
.iter()
.any(|(d, _)| d == cand_device) =>
{
needed = needed.saturating_add(tensor_size(tid));
}
_ => {}
}
}
needed <= *available
})
.ok_or_else(|| {
VKMLError::ComputeManager(format!(
"Operation {:?} cannot fit on any device",
op_id
))
})?;
let current_device = available_memory[dev_idx].0;
let mut remapping_needed = false;
let mut process_tensors = |tensors: &[TensorId], is_input: bool| -> Vec<TensorId> {
let mut result = Vec::with_capacity(tensors.len());
for &tid in tensors {
match &tensor_locations[tid] {
None => {
tensor_locations[tid] = Some(current_device);
available_memory[dev_idx].1 =
available_memory[dev_idx].1.saturating_sub(tensor_size(tid));
result.push(tid);
}
Some(loc) if loc != ¤t_device => {
if let Some(&(_, mapped_id)) = tensor_remappings[tid]
.iter()
.find(|(dev, _)| dev == ¤t_device)
{
result.push(mapped_id);
remapping_needed = true;
} else {
let new_tensor_id =
self.tensor_graph.tensor_descs.len() + new_tensors.len();
let original_desc = &self.tensor_graph.tensor_descs[tid];
let original_layer_id = self.tensor_graph.tensor_to_layer[tid];
let sz = original_desc.size_in_bytes() as u64;
available_memory[dev_idx].1 =
available_memory[dev_idx].1.saturating_sub(sz);
new_tensors.push((
original_desc.clone(),
current_device,
original_layer_id,
));
if is_input {
let src_device = tensor_locations[tid].unwrap();
let transfer_instr = instruction::transfer(
tid,
new_tensor_id,
src_device,
current_device,
);
transfer_operations.push((op_id, transfer_instr));
}
tensor_remappings[tid].push((current_device, new_tensor_id));
result.push(new_tensor_id);
remapping_needed = true;
}
}
_ => result.push(tid), }
}
result
};
let new_inputs = process_tensors(&input_tensors, true);
let new_outputs = process_tensors(&output_tensors, false);
if remapping_needed {
operation_remappings[op_id] = Some((new_inputs, new_outputs));
}
}
for (tensor_desc, device_location, layer_id) in new_tensors {
self.tensor_graph.tensor_descs.push(tensor_desc);
self.tensor_graph.tensor_to_layer.push(layer_id);
tensor_locations.push(Some(device_location));
}
for out_id in self.tensor_graph.output_tensor_ids.iter_mut() {
if *out_id < tensor_remappings.len() {
let remaps = &tensor_remappings[*out_id];
if let Some((_, new_id)) = remaps.last() {
*out_id = *new_id;
}
}
}
let original_ops = std::mem::take(&mut self.tensor_graph.operations);
let original_op_layers = std::mem::take(&mut self.tensor_graph.operation_to_layer);
let mut transfers_for_op: Vec<Vec<(Box<dyn Instruction>, Option<LayerId>)>> =
(0..original_ops.len()).map(|_| Vec::new()).collect();
transfer_operations.sort_by_key(|(op_idx, _)| *op_idx);
for (op_idx, transfer_instr) in transfer_operations.drain(..) {
let layer_id = original_op_layers[op_idx];
transfers_for_op[op_idx].push((transfer_instr, Some(layer_id)));
}
let mut new_ops = Vec::with_capacity(
original_ops.len() + transfers_for_op.iter().map(|v| v.len()).sum::<usize>(),
);
let mut new_op_layers = Vec::with_capacity(new_ops.capacity());
for (i, mut orig_op) in original_ops.into_iter().enumerate() {
for (transfer_instr, layer_id) in transfers_for_op[i].drain(..) {
new_op_layers.push(layer_id);
new_ops.push(transfer_instr);
}
if let Some((new_inputs, new_outputs)) =
operation_remappings.get(i).and_then(|o| o.clone())
{
orig_op.remap_tensor_ids(&new_inputs, &new_outputs);
}
new_op_layers.push(Some(original_op_layers[i]));
new_ops.push(orig_op);
}
self.tensor_graph.operations = new_ops;
self.tensor_graph.operation_to_layer = new_op_layers
.into_iter()
.map(|opt| opt.expect("operation layer missing"))
.collect();
let mut host_visible_plan = vec![false; self.tensor_graph.tensor_descs.len()];
let gpus = self.gpus.gpus();
let gpu_count = gpus.len();
if gpu_count > 0 {
let mut tensors_by_gpu = vec![Vec::<usize>::new(); gpu_count];
let mut total_gpu_bytes = vec![0u64; gpu_count];
for (tensor_id, location) in tensor_locations.iter().enumerate() {
if let Some(DeviceId::Gpu(idx)) = location {
let bytes = self.tensor_graph.tensor_descs[tensor_id].size_in_bytes() as u64;
tensors_by_gpu[*idx].push(tensor_id);
total_gpu_bytes[*idx] = total_gpu_bytes[*idx].saturating_add(bytes);
}
}
let mut reserved_host_visible = vec![0u64; gpu_count];
for (idx, gpu) in gpus.iter().enumerate() {
let total_bytes = total_gpu_bytes[idx];
if total_bytes > 0 && gpu.host_visible_device_local_bytes() >= total_bytes {
for &tensor_id in &tensors_by_gpu[idx] {
host_visible_plan[tensor_id] = true;
}
reserved_host_visible[idx] = total_bytes;
gpu.set_host_access_mode(HostAccessMode::DirectAllHostVisible);
} else {
gpu.set_host_access_mode(HostAccessMode::DeviceLocalWithStaging);
}
}
for (idx, gpu) in gpus.iter().enumerate() {
gpu.set_host_visible_reserved(reserved_host_visible[idx]);
}
}
self.allocate_tensors(tensor_locations, initialisers, &host_visible_plan);
let new_dep_graph = self.tensor_graph.dependency_graph();
self.cached_dependency_graph = Some(new_dep_graph);
Ok(())
}
fn allocate_tensors(
&mut self,
tensor_locations: Vec<Option<DeviceId>>,
mut initialisers: Vec<Initialiser>,
host_visible_plan: &[bool],
) {
let count = self.tensor_graph.tensor_descs.len();
self.tensors.reserve(count);
let out_ptr: *mut TensorCell = self.tensors.as_mut_ptr();
let manager_ptr = NonNull::from(&*self);
let tasks: Box<[SingleAllocParams]> = (0..count)
.map(|i| SingleAllocParams {
index: i,
initialisers_ptr: initialisers.as_mut_ptr(),
initialisers_len: initialisers.len(),
manager_ptr,
out_ptrs: out_ptr,
tensor_locations_ptr: tensor_locations.as_ptr(),
host_visible_plan_ptr: host_visible_plan.as_ptr(),
})
.collect();
global_pool()
.submit_batch(single_allocate_task, &tasks)
.wait();
unsafe { self.tensors.set_len(count) };
}
pub fn allocate_tensor(
&self,
desc: &TensorDesc,
target_device: &DeviceId,
initialiser: Initialiser,
host_visible: bool,
) -> Result<Tensor, VKMLError> {
let expected_size = desc.size_in_bytes();
match target_device {
DeviceId::Cpu => {
self.cpu.memory_tracking.allocate(expected_size as u64);
let buffer = match initialiser {
Initialiser::None => vec![0u8; expected_size].into(),
init => init.into_cpu_buffer(),
};
if buffer.len() != expected_size {
return Err(VKMLError::ComputeManager(format!(
"Initialiser size mismatch: expected {} got {}",
expected_size,
buffer.len()
)));
}
Ok(Tensor::new_cpu(desc.clone(), buffer))
}
DeviceId::Gpu(idx) => {
let gpu = &self.gpus.get_gpu(*idx);
match initialiser {
Initialiser::None => {
let gpu_mem =
gpu.allocate_uninitialised_gpu_memory(expected_size, host_visible)?;
Ok(Tensor::new_gpu(desc.clone(), *idx, gpu_mem))
}
_ => {
let slice = initialiser.as_slice();
if slice.len() != expected_size {
return Err(VKMLError::ComputeManager(format!(
"Initialiser size mismatch: expected {} got {}",
expected_size,
slice.len()
)));
}
let gpu_mem = if host_visible {
gpu.move_to_gpu_host_visible(slice)?
} else {
gpu.move_to_gpu_host_not_visible(slice)?
};
Ok(Tensor::new_gpu(desc.clone(), *idx, gpu_mem))
}
}
}
}
}
pub fn forward(&mut self, batches: Vec<Tensor>) -> Result<Vec<TensorId>, VKMLError> {
let input_tensor_ids = self.tensor_graph.get_input_tensor_ids();
if batches.len() != input_tensor_ids.len() {
return Err(VKMLError::ComputeManager(format!(
"Expected {} input batches, got {}",
input_tensor_ids.len(),
batches.len()
)));
}
for (batch_idx, batch) in batches.iter().enumerate() {
let expected_bytes = self
.tensor_read(input_tensor_ids[batch_idx])
.desc()
.size_in_bytes();
if batch.len_bytes() != expected_bytes {
return Err(VKMLError::ComputeManager(format!(
"Input batch {} size mismatch: got {} bytes, expected {} bytes",
batch_idx,
batch.len_bytes(),
expected_bytes
)));
}
}
if batches.len() == 1 {
let dest = self.tensor_write(input_tensor_ids[0]);
let bytes = batches[0].read();
dest.write(bytes.as_ref());
} else {
let load_params: Box<_> = batches
.into_iter()
.enumerate()
.map(|(batch_idx, batch)| BatchLoadParams {
tensor_id: input_tensor_ids[batch_idx],
batch,
compute_manager: self,
})
.collect();
global_pool()
.submit_batch(batch_load_task, &load_params)
.wait();
}
self.execute()?;
Ok(self.tensor_graph.get_output_tensor_ids().to_vec())
}
pub fn execute(&mut self) -> Result<(), VKMLError> {
match &self.cached_plan {
Some(existing) => {
let arc_plan = Arc::clone(existing);
execute_plan(self, arc_plan)
}
None => {
let plan = create_execution_plan(self)?;
let arc_plan = Arc::new(plan);
self.cached_plan = Some(Arc::clone(&arc_plan));
execute_plan(self, arc_plan)
}
}
}
pub(crate) fn gpu_count(&self) -> usize {
self.gpus.gpus().len()
}
pub(crate) fn gpu_ref(&self, idx: usize) -> Arc<Gpu> {
self.gpus.get_gpu(idx)
}
pub(crate) fn dependency_graph(&self) -> &DependencyGraph {
self.cached_dependency_graph
.as_ref()
.expect("Dependency graph missing")
}
pub fn format_memory_mb(&self, bytes: u64) -> String {
format!("{:.2} MiB", bytes as f64 / (1024.0 * 1024.0))
}
pub fn get_memory_usage_summary(&self) -> Vec<(String, String, String)> {
let mut result = Vec::new();
result.push((
"CPU".to_string(),
self.format_memory_mb(self.cpu.memory_tracking.get_current()),
self.format_memory_mb(self.cpu.memory_tracking.get_available()),
));
for (i, gpu) in self.gpus.gpus().iter().enumerate() {
result.push((
format!("GPU {}", i),
self.format_memory_mb(gpu.memory_total() - gpu.memory_available()),
self.format_memory_mb(gpu.memory_available()),
));
}
result
}
pub fn print_model_stats(&self) {
print_model_stats::print_model_stats(self);
}
pub fn print_layer_values(&self, layer_id: LayerId) -> Result<(), VKMLError> {
print_model_stats::print_layer_values(self, layer_id)
}
pub fn print_tensor_flow(&self) {
print_tensorgraph_stats::print_tensor_flow(self);
}
pub fn print_gpu_pool(&self) {
println!("{:?}", self.gpus)
}
pub fn chosen_optimisations(&self) -> &Optimisations {
&self.optimisations
}
pub fn tensor_read(&self, tensor_id: usize) -> &Tensor {
unsafe { self.tensors[tensor_id].as_ref() }
}
pub fn tensor_read_vec(&self, tensor_ids: &[TensorId]) -> Vec<Tensor> {
let output_count = tensor_ids.len();
let mut output_batches: Vec<Tensor> = Vec::with_capacity(output_count);
let out_ptr: *mut Tensor = output_batches.as_mut_ptr();
let copy_params: Box<_> = tensor_ids
.iter()
.enumerate()
.map(|(idx, &tensor_id)| BatchCopyParams {
tensor_id,
output_index: idx,
compute_manager: self,
out_ptr,
})
.collect();
global_pool()
.submit_batch(batch_copy_task, ©_params)
.wait();
unsafe { output_batches.set_len(output_count) };
output_batches
}
#[allow(clippy::mut_from_ref)]
pub fn tensor_write(&self, tensor_id: usize) -> &mut Tensor {
unsafe { self.tensors[tensor_id].as_mut() }
}
}
struct SingleAllocParams {
index: usize,
initialisers_ptr: *mut Initialiser,
initialisers_len: usize,
manager_ptr: NonNull<ComputeManager>,
out_ptrs: *mut TensorCell,
tensor_locations_ptr: *const Option<DeviceId>,
host_visible_plan_ptr: *const bool,
}
fn single_allocate_task(params: &SingleAllocParams) {
let manager: &ComputeManager = unsafe { params.manager_ptr.as_ref() };
let desc: &TensorDesc = &manager.tensor_graph.tensor_descs[params.index];
let target =
unsafe { (*params.tensor_locations_ptr.add(params.index)).unwrap_or(DeviceId::Cpu) };
let host_visible = unsafe { *params.host_visible_plan_ptr.add(params.index) };
let initialiser = if params.index < params.initialisers_len {
unsafe { mem::take(&mut *params.initialisers_ptr.add(params.index)) }
} else {
Initialiser::None
};
let tensor = manager
.allocate_tensor(desc, &target, initialiser, host_visible)
.unwrap();
unsafe {
let slot = params.out_ptrs.add(params.index);
ptr::write(slot, TensorCell::new(tensor));
}
}
struct BatchLoadParams<'a> {
tensor_id: usize,
batch: Tensor,
compute_manager: &'a ComputeManager,
}
fn batch_load_task(params: &BatchLoadParams) {
let dest = params.compute_manager.tensor_write(params.tensor_id);
let bytes = params.batch.read();
dest.write(bytes.as_ref());
}
struct BatchCopyParams<'a> {
tensor_id: usize,
output_index: usize,
compute_manager: &'a ComputeManager,
out_ptr: *mut Tensor,
}
fn batch_copy_task(params: &BatchCopyParams) {
let tensor = params.compute_manager.tensor_read(params.tensor_id);
let batch = Tensor::new_cpu(tensor.desc().clone(), tensor.read().into());
unsafe {
let slot = params.out_ptr.add(params.output_index);
ptr::write(slot, batch);
}
}