use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use anyhow::{Result, anyhow};
use rayon::prelude::*;
use ronn_core::tensor::Tensor;
use ronn_core::{CompiledKernel, DataType, KernelStats, MemoryUsage, SubGraph, TensorLayout};
use tracing::{debug, warn};
use crate::cpu::simd::{SimdCapabilities, simd_add_f32, simd_matmul_f32, simd_mul_f32};
/// A CPU kernel compiled from a `SubGraph`.
///
/// Holds the source subgraph, the execution plan derived from it by `CpuKernel::compile`,
/// the SIMD capabilities passed to the `simd_*` helpers, and execution/memory statistics
/// kept behind `Arc<Mutex<..>>`.
#[derive(Debug)]
pub struct CpuKernel {
/// The subgraph this kernel was compiled from.
subgraph: SubGraph,
/// Operations and tensor metadata produced at compile time.
execution_plan: ExecutionPlan,
/// SIMD features forwarded to `simd_add_f32` / `simd_mul_f32` / `simd_matmul_f32`.
simd_capabilities: SimdCapabilities,
/// Execution-count and timing counters, updated on every `execute` call.
stats: Arc<Mutex<KernelStatsInternal>>,
/// Memory counters reported by `get_memory_usage`.
memory_stats: Arc<Mutex<MemoryStatsInternal>>,
}
/// The compiled form of a subgraph: the operations to run, in order, plus metadata for the
/// intermediate tensors those operations produce.
#[derive(Debug, Clone)]
struct ExecutionPlan {
/// Operations executed one after another in this order.
operations: Vec<FusedOperation>,
/// Per-tensor metadata used to pre-allocate the intermediate tensor slots.
memory_plan: MemoryPlan,
}
/// One step of an `ExecutionPlan`.
#[derive(Debug, Clone)]
struct FusedOperation {
/// Which computation this step performs.
op_type: FusedOpType,
/// Indices into the runtime tensor table that this step reads.
inputs: Vec<usize>,
/// Indices into the runtime tensor table that this step writes.
outputs: Vec<usize>,
/// Extra numeric parameters. NOTE(review): the planner always leaves this empty and
/// `execute_operation` does not read it.
params: HashMap<String, f64>,
}
/// Kinds of computation a `FusedOperation` can perform.
#[derive(Debug, Clone)]
enum FusedOpType {
/// Element-wise addition of two tensors with the same element count.
Add,
/// Element-wise multiplication of two tensors with the same element count.
Multiply,
/// Matrix multiplication of two 2-D tensors.
MatMul,
/// Element-wise addition followed by ReLU (`max(x, 0.0)`).
AddRelu,
/// Fused matmul + add. NOTE(review): `execute_operation` has no implementation for this
/// and reports it as an unsupported fused operation.
MatMulAdd,
/// Fused convolution + batch-norm + ReLU. NOTE(review): not implemented by
/// `execute_operation`; executing it returns an error.
ConvBnRelu,
/// Clones the single input tensor into the output slot.
Copy,
/// Reshape. NOTE(review): currently clones the input unchanged; no target shape is applied.
Reshape,
}
/// Metadata for the intermediate tensor slots allocated before a plan runs.
///
/// The three vectors are parallel: entry `i` describes intermediate tensor `i`.
#[derive(Debug, Clone)]
struct MemoryPlan {
/// Number of intermediate tensors (one per planned operation).
tensor_count: usize,
/// Size of each intermediate tensor. The planner records 1024 for every tensor; the
/// executor divides by 4 to get an element count, so this is presumably bytes of f32 —
/// TODO confirm.
tensor_sizes: Vec<usize>,
/// Element type of each intermediate tensor.
tensor_dtypes: Vec<DataType>,
/// Memory layout of each intermediate tensor.
tensor_layouts: Vec<TensorLayout>,
}
/// Mutable execution counters behind `CpuKernel::stats`.
///
/// All times are whole microseconds. `min_time_us`/`max_time_us` are seeded from the first
/// execution (their `Default` value of 0 is not a real measurement).
#[derive(Debug, Default)]
struct KernelStatsInternal {
/// Number of `execute` calls recorded.
execution_count: u64,
/// Sum of all recorded execution times.
total_time_us: u64,
/// Fastest recorded execution.
min_time_us: u64,
/// Slowest recorded execution.
max_time_us: u64,
}
/// Mutable memory counters behind `CpuKernel::memory_stats`.
#[derive(Debug, Default)]
struct MemoryStatsInternal {
/// NOTE(review): not written anywhere in the visible code, so it stays at 0.
peak_bytes: usize,
/// NOTE(review): not written anywhere in the visible code, so it stays at 0.
current_bytes: usize,
/// Incremented once per `execute` call rather than per actual allocation.
allocation_count: usize,
}
impl CpuKernel {
pub fn compile(subgraph: SubGraph, simd_capabilities: SimdCapabilities) -> Result<Self> {
let execution_plan = Self::analyze_and_plan(&subgraph)?;
Ok(Self {
subgraph,
execution_plan,
simd_capabilities,
stats: Arc::new(Mutex::new(KernelStatsInternal::default())),
memory_stats: Arc::new(Mutex::new(MemoryStatsInternal::default())),
})
}
fn analyze_and_plan(subgraph: &SubGraph) -> Result<ExecutionPlan> {
let mut operations = Vec::new();
let mut tensor_count = 0;
let mut tensor_sizes = Vec::new();
let mut tensor_dtypes = Vec::new();
let mut tensor_layouts = Vec::new();
for (i, node) in subgraph.nodes.iter().enumerate() {
let fused_op = match node.op_type.as_str() {
"Add" => FusedOperation {
op_type: FusedOpType::Add,
inputs: vec![i * 2, i * 2 + 1], outputs: vec![tensor_count],
params: HashMap::new(),
},
"Mul" => FusedOperation {
op_type: FusedOpType::Multiply,
inputs: vec![i * 2, i * 2 + 1],
outputs: vec![tensor_count],
params: HashMap::new(),
},
"MatMul" => FusedOperation {
op_type: FusedOpType::MatMul,
inputs: vec![i * 2, i * 2 + 1],
outputs: vec![tensor_count],
params: HashMap::new(),
},
"Reshape" => FusedOperation {
op_type: FusedOpType::Reshape,
inputs: vec![i],
outputs: vec![tensor_count],
params: HashMap::new(),
},
_ => {
warn!("Unsupported operation type: {}", node.op_type);
continue;
}
};
operations.push(fused_op);
tensor_sizes.push(1024); tensor_dtypes.push(DataType::F32); tensor_layouts.push(TensorLayout::RowMajor);
tensor_count += 1;
}
let memory_plan = MemoryPlan {
tensor_count,
tensor_sizes,
tensor_dtypes,
tensor_layouts,
};
Ok(ExecutionPlan {
operations,
memory_plan,
})
}
fn execute_operation(
&self,
operation: &FusedOperation,
tensors: &mut [ronn_core::tensor::Tensor],
) -> Result<()> {
match operation.op_type {
FusedOpType::Add => {
if operation.inputs.len() != 2 || operation.outputs.len() != 1 {
return Err(anyhow!("Add operation requires 2 inputs and 1 output"));
}
let a = &tensors[operation.inputs[0]];
let b = &tensors[operation.inputs[1]];
let output_idx = operation.outputs[0];
let a_data = a.to_vec()?;
let b_data = b.to_vec()?;
if a_data.len() != b_data.len() {
return Err(anyhow!("Input tensors must have the same size for Add"));
}
let mut result_data = vec![0.0; a_data.len()];
simd_add_f32(&a_data, &b_data, &mut result_data, &self.simd_capabilities);
let result_tensor =
Tensor::from_data(result_data, a.shape(), a.dtype(), a.layout())?;
tensors[output_idx] = result_tensor;
}
FusedOpType::Multiply => {
if operation.inputs.len() != 2 || operation.outputs.len() != 1 {
return Err(anyhow!("Multiply operation requires 2 inputs and 1 output"));
}
let a = &tensors[operation.inputs[0]];
let b = &tensors[operation.inputs[1]];
let output_idx = operation.outputs[0];
let a_data = a.to_vec()?;
let b_data = b.to_vec()?;
if a_data.len() != b_data.len() {
return Err(anyhow!(
"Input tensors must have the same size for Multiply"
));
}
let mut result_data = vec![0.0; a_data.len()];
simd_mul_f32(&a_data, &b_data, &mut result_data, &self.simd_capabilities);
let result_tensor =
Tensor::from_data(result_data, a.shape(), a.dtype(), a.layout())?;
tensors[output_idx] = result_tensor;
}
FusedOpType::MatMul => {
if operation.inputs.len() != 2 || operation.outputs.len() != 1 {
return Err(anyhow!("MatMul operation requires 2 inputs and 1 output"));
}
let a = &tensors[operation.inputs[0]];
let b = &tensors[operation.inputs[1]];
let output_idx = operation.outputs[0];
let a_shape = a.shape();
let b_shape = b.shape();
if a_shape.len() != 2 || b_shape.len() != 2 {
return Err(anyhow!("MatMul currently only supports 2D tensors"));
}
if a_shape[1] != b_shape[0] {
return Err(anyhow!(
"Matrix dimensions incompatible: {}x{} and {}x{}",
a_shape[0],
a_shape[1],
b_shape[0],
b_shape[1]
));
}
let a_data = a.to_vec()?;
let b_data = b.to_vec()?;
let mut result_data = vec![0.0; a_shape[0] * b_shape[1]];
simd_matmul_f32(
&a_data,
a_shape[0],
a_shape[1],
&b_data,
b_shape[0],
b_shape[1],
&mut result_data,
&self.simd_capabilities,
);
let result_tensor = Tensor::from_data(
result_data,
vec![a_shape[0], b_shape[1]],
a.dtype(),
a.layout(),
)?;
tensors[output_idx] = result_tensor;
}
FusedOpType::AddRelu => {
if operation.inputs.len() != 2 || operation.outputs.len() != 1 {
return Err(anyhow!("AddRelu operation requires 2 inputs and 1 output"));
}
let a = &tensors[operation.inputs[0]];
let b = &tensors[operation.inputs[1]];
let output_idx = operation.outputs[0];
let a_data = a.to_vec()?;
let b_data = b.to_vec()?;
if a_data.len() != b_data.len() {
return Err(anyhow!("Input tensors must have the same size for AddRelu"));
}
let mut result_data = vec![0.0; a_data.len()];
simd_add_f32(&a_data, &b_data, &mut result_data, &self.simd_capabilities);
for value in &mut result_data {
*value = value.max(0.0);
}
let result_tensor =
Tensor::from_data(result_data, a.shape(), a.dtype(), a.layout())?;
tensors[output_idx] = result_tensor;
}
FusedOpType::Copy => {
if operation.inputs.len() != 1 || operation.outputs.len() != 1 {
return Err(anyhow!("Copy operation requires 1 input and 1 output"));
}
let input_tensor = &tensors[operation.inputs[0]];
let output_idx = operation.outputs[0];
tensors[output_idx] = input_tensor.clone();
}
FusedOpType::Reshape => {
if operation.inputs.len() != 1 || operation.outputs.len() != 1 {
return Err(anyhow!("Reshape operation requires 1 input and 1 output"));
}
let input_tensor = &tensors[operation.inputs[0]];
let output_idx = operation.outputs[0];
tensors[output_idx] = input_tensor.clone();
}
_ => {
return Err(anyhow!(
"Unsupported fused operation: {:?}",
operation.op_type
));
}
}
Ok(())
}
fn execute_parallel(
&self,
inputs: &[ronn_core::tensor::Tensor],
) -> Result<Vec<ronn_core::tensor::Tensor>> {
let plan = &self.execution_plan;
let mut tensors = Vec::with_capacity(plan.memory_plan.tensor_count + inputs.len());
tensors.extend_from_slice(inputs);
for i in 0..plan.memory_plan.tensor_count {
let size = plan.memory_plan.tensor_sizes[i];
let dtype = plan.memory_plan.tensor_dtypes[i];
let layout = plan.memory_plan.tensor_layouts[i];
let elements = size / 4; let shape = vec![elements];
let tensor = Tensor::zeros(shape, dtype, layout)?;
tensors.push(tensor);
}
for operation in &plan.operations {
self.execute_operation(operation, &mut tensors)?;
}
let output_start = tensors.len() - plan.operations.len();
Ok(tensors[output_start..].to_vec())
}
}
impl CompiledKernel for CpuKernel {
    /// Runs the compiled plan on `inputs` and records timing statistics.
    ///
    /// An empty `inputs` slice short-circuits to an empty result without running the plan.
    /// Timing and the call counter are recorded whether or not execution succeeds.
    ///
    /// # Errors
    ///
    /// Propagates any error from executing the plan.
    fn execute(
        &self,
        inputs: &[ronn_core::tensor::Tensor],
    ) -> Result<Vec<ronn_core::tensor::Tensor>> {
        let start_time = Instant::now();

        // The stats are plain counters, so data behind a poisoned lock is still usable;
        // recover it rather than turning one panic into a panic on every later call.
        self.memory_stats
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .allocation_count += 1;

        let result = if inputs.is_empty() {
            Ok(vec![])
        } else {
            self.execute_parallel(inputs)
        };

        let execution_time = start_time.elapsed();
        // `as_micros` returns u128; saturate instead of silently truncating.
        let elapsed_us = u64::try_from(execution_time.as_micros()).unwrap_or(u64::MAX);
        {
            let mut stats = self
                .stats
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            stats.execution_count += 1;
            stats.total_time_us = stats.total_time_us.saturating_add(elapsed_us);
            if stats.execution_count == 1 {
                // The `Default` zeros are not measurements, so seed min/max from the first run.
                stats.min_time_us = elapsed_us;
                stats.max_time_us = elapsed_us;
            } else {
                stats.min_time_us = stats.min_time_us.min(elapsed_us);
                stats.max_time_us = stats.max_time_us.max(elapsed_us);
            }
        }

        debug!("CPU kernel executed in {:?}", execution_time);
        result
    }

    /// Returns a snapshot of the kernel's memory counters.
    fn get_memory_usage(&self) -> MemoryUsage {
        let memory_stats = self
            .memory_stats
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        MemoryUsage {
            peak_bytes: memory_stats.peak_bytes,
            current_bytes: memory_stats.current_bytes,
            allocation_count: memory_stats.allocation_count,
        }
    }

    /// Returns execution count and min/average/max execution time in microseconds.
    ///
    /// The average is 0.0 before the first execution.
    fn get_performance_stats(&self) -> KernelStats {
        let stats = self
            .stats
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let average_time_us = if stats.execution_count > 0 {
            stats.total_time_us as f64 / stats.execution_count as f64
        } else {
            0.0
        };
        KernelStats {
            execution_count: stats.execution_count,
            average_time_us,
            min_time_us: stats.min_time_us as f64,
            max_time_us: stats.max_time_us as f64,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use ronn_core::GraphNode;
    use std::collections::HashMap;

    /// Builds a one-node subgraph computing `output1 = Add(input1, input2)`.
    fn create_test_subgraph() -> SubGraph {
        let node1 = GraphNode {
            id: 0,
            op_type: "Add".to_string(),
            attributes: HashMap::new(),
            inputs: vec!["input1".to_string(), "input2".to_string()],
            outputs: vec!["output1".to_string()],
            name: Some("add_node".to_string()),
        };
        SubGraph {
            nodes: vec![node1],
            edges: vec![],
            inputs: vec!["input1".to_string(), "input2".to_string()],
            outputs: vec!["output1".to_string()],
        }
    }

    #[test]
    fn test_kernel_compilation() -> Result<()> {
        let subgraph = create_test_subgraph();
        let simd_caps = crate::cpu::simd::detect_simd_capabilities();
        let kernel = CpuKernel::compile(subgraph, simd_caps)?;
        assert!(!kernel.execution_plan.operations.is_empty());
        Ok(())
    }

    #[test]
    fn test_kernel_execution() -> Result<()> {
        let subgraph = create_test_subgraph();
        let simd_caps = crate::cpu::simd::detect_simd_capabilities();
        let kernel = CpuKernel::compile(subgraph, simd_caps)?;
        let input1 = Tensor::from_data(
            vec![1.0, 2.0, 3.0, 4.0],
            vec![4],
            DataType::F32,
            TensorLayout::RowMajor,
        )?;
        let input2 = Tensor::from_data(
            vec![5.0, 6.0, 7.0, 8.0],
            vec![4],
            DataType::F32,
            TensorLayout::RowMajor,
        )?;
        let inputs = vec![input1, input2];
        let outputs = kernel.execute(&inputs)?;
        assert!(!outputs.is_empty());
        let stats = kernel.get_performance_stats();
        assert_eq!(stats.execution_count, 1);
        // Timings are recorded in whole microseconds, so a fast run can legitimately
        // record 0; only check that the aggregates are mutually consistent.
        assert!(stats.min_time_us <= stats.average_time_us);
        assert!(stats.average_time_us <= stats.max_time_us);
        Ok(())
    }

    #[test]
    fn test_multiple_executions() -> Result<()> {
        let subgraph = create_test_subgraph();
        let simd_caps = crate::cpu::simd::detect_simd_capabilities();
        let kernel = CpuKernel::compile(subgraph, simd_caps)?;
        let input1 = Tensor::ones(vec![100], DataType::F32, TensorLayout::RowMajor)?;
        let input2 = Tensor::ones(vec![100], DataType::F32, TensorLayout::RowMajor)?;
        let inputs = vec![input1, input2];
        for _ in 0..5 {
            let _outputs = kernel.execute(&inputs)?;
        }
        let stats = kernel.get_performance_stats();
        assert_eq!(stats.execution_count, 5);
        assert!(stats.min_time_us <= stats.average_time_us);
        assert!(stats.average_time_us <= stats.max_time_us);
        Ok(())
    }

    #[test]
    fn test_memory_statistics() -> Result<()> {
        let subgraph = create_test_subgraph();
        let simd_caps = crate::cpu::simd::detect_simd_capabilities();
        let kernel = CpuKernel::compile(subgraph, simd_caps)?;
        let input1 = Tensor::zeros(vec![1000], DataType::F32, TensorLayout::RowMajor)?;
        let input2 = Tensor::zeros(vec![1000], DataType::F32, TensorLayout::RowMajor)?;
        let inputs = vec![input1, input2];
        let _outputs = kernel.execute(&inputs)?;
        let memory_usage = kernel.get_memory_usage();
        assert!(memory_usage.allocation_count > 0);
        Ok(())
    }
}