use alloc::collections::BTreeMap;
use alloc::vec;
use alloc::vec::Vec;
use lazy_static::lazy_static;
use spin::Mutex;
/// Kinds of work that can be submitted for DPU offload.
///
/// The derived `PartialOrd`/`Ord` follow the declaration order below, so
/// reordering variants changes comparison results.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum DpuOp {
/// Checksum computation.
Checksum,
/// Data compression.
Compress,
/// Data decompression.
Decompress,
/// Data encryption.
Encrypt,
/// Data decryption.
Decrypt,
/// RAID parity generation.
RaidParity,
/// RAID data reconstruction.
RaidReconstruct,
/// NOTE(review): presumably the combined multi-stage pipeline of the
/// operations above — confirm against the submitting code.
FullPipeline,
}
/// Static description of a DPU device and the operations it accepts.
#[derive(Debug, Clone)]
pub struct DpuCapabilities {
/// Caller-chosen identifier of the device.
pub device_id: u32,
/// Human-readable product name.
pub name: &'static str,
/// Vendor name.
pub vendor: &'static str,
/// Core count; the throughput model scales linearly with this value.
pub cores: u32,
/// On-device memory in bytes (`new` converts from GiB).
pub memory_bytes: u64,
/// Network bandwidth figure. NOTE(review): presumably Gbit/s — confirm.
pub network_gbps: u32,
/// Operations this device can take; consulted by `supports_op`.
pub supported_ops: Vec<DpuOp>,
}
impl DpuCapabilities {
    /// Creates a capability record for a device.
    ///
    /// `memory_gb` is given in GiB and stored as bytes in `memory_bytes`.
    /// Every [`DpuOp`] variant is registered as supported.
    pub fn new(
        device_id: u32,
        name: &'static str,
        vendor: &'static str,
        cores: u32,
        memory_gb: u32,
        network_gbps: u32,
    ) -> Self {
        // 1 GiB = 2^30 bytes. Widening to u64 first means the shift cannot
        // overflow for any u32 input.
        let memory_bytes = u64::from(memory_gb) << 30;

        let supported_ops = vec![
            DpuOp::Checksum,
            DpuOp::Compress,
            DpuOp::Decompress,
            DpuOp::Encrypt,
            DpuOp::Decrypt,
            DpuOp::RaidParity,
            DpuOp::RaidReconstruct,
            DpuOp::FullPipeline,
        ];

        Self {
            device_id,
            name,
            vendor,
            cores,
            memory_bytes,
            network_gbps,
            supported_ops,
        }
    }

    /// Returns `true` when `op` is listed in `supported_ops`.
    pub fn supports_op(&self, op: DpuOp) -> bool {
        self.supported_ops.iter().any(|candidate| *candidate == op)
    }

    /// Modelled throughput for `op`.
    ///
    /// The model is `cores * 0.5` scaled by a fixed per-operation factor.
    /// `DpuManager::complete` divides gigabytes by this value to get seconds,
    /// so the unit is effectively GB/s.
    pub fn expected_throughput(&self, op: DpuOp) -> f32 {
        let per_device = self.cores as f32 * 0.5;

        // Relative speed of each operation against the compression baseline.
        let factor: f32 = match op {
            DpuOp::Checksum => 2.0,
            DpuOp::Compress => 1.0,
            DpuOp::Decompress => 1.5,
            DpuOp::Encrypt | DpuOp::Decrypt => 1.2,
            DpuOp::RaidParity => 0.8,
            DpuOp::RaidReconstruct => 0.7,
            DpuOp::FullPipeline => 0.5,
        };

        per_device * factor
    }
}
/// A command that `DpuManager::submit` accepted and that has not completed yet.
#[derive(Debug, Clone)]
pub struct DpuCommand {
/// Identifier handed out by `DpuManager::submit`; key in the pending map.
pub cmd_id: u64,
/// Operation requested.
pub op: DpuOp,
/// Input size in bytes (added to `DpuStats::total_bytes` on completion).
pub input_size: u64,
/// Timestamp supplied by the caller at submission. The unit is not fixed
/// by this file, and `complete` does not currently read it.
pub submitted: u64,
}
/// Outcome of a completed DPU command, as produced by `DpuManager::complete`.
#[derive(Debug, Clone)]
pub struct DpuResult {
/// Identifier of the command this result belongs to.
pub cmd_id: u64,
/// Operation that was executed.
pub op: DpuOp,
/// Output size as reported by the caller of `complete`.
pub output_size: u64,
/// Modelled processing time on the DPU, in microseconds.
pub dpu_time_us: u64,
/// `dpu_time_us` plus the fixed transfer overhead, in microseconds.
pub total_time_us: u64,
/// Estimated host CPU cycles saved by offloading this command.
pub cpu_cycles_saved: u64,
}
impl DpuResult {
    /// Throughput achieved for `input_size` over `total_time_us`.
    ///
    /// Computed as `input_size / seconds / 1e9`. Returns `0.0` when no time
    /// was recorded.
    ///
    /// NOTE(review): if `input_size` is in bytes this is gigabytes per
    /// second, despite the `gbps` suffix — confirm the intended unit.
    pub fn throughput_gbps(&self, input_size: u64) -> f32 {
        match self.total_time_us {
            0 => 0.0,
            elapsed_us => {
                let elapsed_s = elapsed_us as f64 / 1_000_000.0;
                let per_second = input_size as f64 / elapsed_s;
                (per_second / 1e9) as f32
            }
        }
    }

    /// Percentage of host CPU overhead removed by offloading.
    ///
    /// Currently a fixed model constant of 95 %.
    pub fn cpu_overhead_reduction(&self) -> f32 {
        const REDUCTION_PERCENT: f32 = 95.0;
        REDUCTION_PERCENT
    }
}
/// Running counters maintained by `DpuManager`.
#[derive(Debug, Clone, Default)]
pub struct DpuStats {
/// Every call to `submit`, whether offloaded or not.
pub total_ops: u64,
/// Submissions accepted for the DPU.
pub dpu_ops: u64,
/// Submissions rejected (no device, or unsupported operation).
pub cpu_fallback: u64,
/// Sum of input sizes of completed commands, in bytes.
pub total_bytes: u64,
/// Sum of modelled DPU processing times of completed commands, in microseconds.
pub total_dpu_time_us: u64,
/// Sum of total times (DPU time plus transfer overhead) of completed commands, in microseconds.
pub total_time_us: u64,
/// Sum of estimated host CPU cycles saved by completed commands.
pub cpu_cycles_saved: u64,
}
impl DpuStats {
    /// Fraction of submitted operations that were offloaded to the DPU.
    ///
    /// Returns `0.0` when nothing has been submitted.
    pub fn offload_ratio(&self) -> f32 {
        match self.total_ops {
            0 => 0.0,
            submitted => self.dpu_ops as f32 / submitted as f32,
        }
    }

    /// Aggregate throughput over all completed commands
    /// (`total_bytes / seconds / 1e9`), or `0.0` when no time is recorded.
    pub fn avg_throughput_gbps(&self) -> f32 {
        match self.total_time_us {
            0 => 0.0,
            elapsed_us => {
                let elapsed_s = elapsed_us as f64 / 1_000_000.0;
                let per_second = self.total_bytes as f64 / elapsed_s;
                (per_second / 1e9) as f32
            }
        }
    }

    /// Average CPU overhead reduction in percent: the fixed 95 % per
    /// offloaded operation, weighted by the offload ratio.
    pub fn avg_cpu_overhead_reduction(&self) -> f32 {
        if self.total_ops > 0 {
            95.0 * self.offload_ratio()
        } else {
            0.0
        }
    }

    /// Total estimated host CPU cycles saved so far.
    pub fn total_cpu_savings(&self) -> u64 {
        self.cpu_cycles_saved
    }
}
/// Tracks the registered DPU device, in-flight commands and offload statistics.
pub struct DpuManager {
// Registered device, if any; `None` until `register_device` is called.
capabilities: Option<DpuCapabilities>,
// Commands accepted by `submit` and not yet passed to `complete`, keyed by command id.
pending: BTreeMap<u64, DpuCommand>,
// Id that the next accepted command will receive; starts at 1.
next_cmd_id: u64,
// Running counters, exposed as a copy through `stats()`.
stats: DpuStats,
}
/// `Default` yields the same empty manager as `DpuManager::new`.
impl Default for DpuManager {
fn default() -> Self {
Self::new()
}
}
impl DpuManager {
    /// Creates a manager with no device registered, no pending commands and
    /// zeroed statistics. Command ids start at 1.
    pub fn new() -> Self {
        Self {
            capabilities: None,
            pending: BTreeMap::new(),
            next_cmd_id: 1,
            stats: DpuStats::default(),
        }
    }

    /// Registers `caps` as the active device, replacing any previous one.
    pub fn register_device(&mut self, caps: DpuCapabilities) {
        self.capabilities = Some(caps);
    }

    /// Returns `true` once a device has been registered.
    pub fn is_available(&self) -> bool {
        self.capabilities.is_some()
    }

    /// Submits an operation for offload.
    ///
    /// Returns the new command id when a device is registered and supports
    /// `op`. Otherwise the request is counted as a CPU fallback and `None`
    /// is returned. Every call increments `total_ops`.
    pub fn submit(&mut self, op: DpuOp, input_size: u64, timestamp: u64) -> Option<u64> {
        // Every request is counted, whether it ends up offloaded or not.
        self.stats.total_ops += 1;

        let offloadable = matches!(&self.capabilities, Some(caps) if caps.supports_op(op));
        if !offloadable {
            self.stats.cpu_fallback += 1;
            return None;
        }

        let cmd_id = self.next_cmd_id;
        self.next_cmd_id += 1;
        self.pending.insert(
            cmd_id,
            DpuCommand {
                cmd_id,
                op,
                input_size,
                submitted: timestamp,
            },
        );
        self.stats.dpu_ops += 1;
        Some(cmd_id)
    }

    /// Completes a pending command and folds its modelled cost into the stats.
    ///
    /// Returns `None` when `cmd_id` is not pending or no device is registered.
    ///
    /// Timing is derived purely from the throughput model:
    /// `input_size / 1e9 / expected_throughput` seconds plus a fixed 10 µs
    /// transfer overhead. `current_time` is accepted for API compatibility
    /// but does not influence the result.
    ///
    /// All time and counter arithmetic saturates. A device whose modelled
    /// throughput is zero (for example `cores == 0`) yields an infinite time
    /// estimate, which the float-to-integer cast clamps to `u64::MAX`; adding
    /// the overhead to that must not overflow.
    pub fn complete(
        &mut self,
        cmd_id: u64,
        output_size: u64,
        current_time: u64,
    ) -> Option<DpuResult> {
        /// Fixed per-command host<->DPU transfer cost, in microseconds.
        const TRANSFER_OVERHEAD_US: u64 = 10;
        /// Modelled host CPU cost of doing the work locally.
        const CPU_CYCLES_PER_BYTE: f64 = 10.0;
        /// Share of that host cost removed by offloading.
        const OFFLOADED_FRACTION: f64 = 0.95;

        // Not used by the current cost model; bound here to keep the
        // signature stable without an unused-variable warning.
        let _ = current_time;

        // Check for a device before touching `pending`, so a command is never
        // removed without producing a result.
        let caps = self.capabilities.as_ref()?;
        let cmd = self.pending.remove(&cmd_id)?;

        let throughput = f64::from(caps.expected_throughput(cmd.op));
        // `as u64` saturates on infinity and maps NaN to 0.
        let dpu_time_us = ((cmd.input_size as f64 / 1e9) / throughput * 1_000_000.0) as u64;
        let total_time_us = dpu_time_us.saturating_add(TRANSFER_OVERHEAD_US);

        let cpu_cycles_for_op = (cmd.input_size as f64 * CPU_CYCLES_PER_BYTE) as u64;
        let cpu_cycles_saved = (cpu_cycles_for_op as f64 * OFFLOADED_FRACTION) as u64;

        let stats = &mut self.stats;
        stats.total_bytes = stats.total_bytes.saturating_add(cmd.input_size);
        stats.total_dpu_time_us = stats.total_dpu_time_us.saturating_add(dpu_time_us);
        stats.total_time_us = stats.total_time_us.saturating_add(total_time_us);
        stats.cpu_cycles_saved = stats.cpu_cycles_saved.saturating_add(cpu_cycles_saved);

        Some(DpuResult {
            cmd_id,
            op: cmd.op,
            output_size,
            dpu_time_us,
            total_time_us,
            cpu_cycles_saved,
        })
    }

    /// Returns a snapshot (clone) of the current statistics.
    pub fn stats(&self) -> DpuStats {
        self.stats.clone()
    }

    /// Returns the registered device's capabilities, if any.
    pub fn capabilities(&self) -> Option<&DpuCapabilities> {
        self.capabilities.as_ref()
    }
}
// Process-wide `DpuManager` behind a `spin::Mutex`, created lazily on first
// access. All `DpuEngine` associated functions operate on this instance.
lazy_static! {
static ref DPU_ENGINE: Mutex<DpuManager> = Mutex::new(DpuManager::new());
}
pub struct DpuEngine;
impl DpuEngine {
pub fn register_device(caps: DpuCapabilities) {
let mut engine = DPU_ENGINE.lock();
engine.register_device(caps);
}
pub fn is_available() -> bool {
let engine = DPU_ENGINE.lock();
engine.is_available()
}
pub fn submit(op: DpuOp, input_size: u64, timestamp: u64) -> Option<u64> {
let mut engine = DPU_ENGINE.lock();
engine.submit(op, input_size, timestamp)
}
pub fn complete(cmd_id: u64, output_size: u64, current_time: u64) -> Option<DpuResult> {
let mut engine = DPU_ENGINE.lock();
engine.complete(cmd_id, output_size, current_time)
}
pub fn stats() -> DpuStats {
let engine = DPU_ENGINE.lock();
engine.stats()
}
pub fn capabilities() -> Option<DpuCapabilities> {
let engine = DPU_ENGINE.lock();
engine.capabilities().cloned()
}
}
/// Capability preset for an NVIDIA BlueField-3 device:
/// device id 0, 16 cores, 32 GiB of memory and a network figure of 400.
pub fn create_bluefield3() -> DpuCapabilities {
    const DEVICE_ID: u32 = 0;
    const CORES: u32 = 16;
    const MEMORY_GB: u32 = 32;
    const NETWORK_GBPS: u32 = 400;

    DpuCapabilities::new(
        DEVICE_ID,
        "NVIDIA BlueField-3",
        "NVIDIA",
        CORES,
        MEMORY_GB,
        NETWORK_GBPS,
    )
}
/// Capability preset for an Intel Mount Evans IPU:
/// device id 1, 8 cores, 16 GiB of memory and a network figure of 200.
pub fn create_intel_ipu() -> DpuCapabilities {
    const DEVICE_ID: u32 = 1;
    const CORES: u32 = 8;
    const MEMORY_GB: u32 = 16;
    const NETWORK_GBPS: u32 = 200;

    DpuCapabilities::new(
        DEVICE_ID,
        "Intel Mount Evans IPU",
        "Intel",
        CORES,
        MEMORY_GB,
        NETWORK_GBPS,
    )
}
// Unit tests. Most use a local `DpuManager` rather than the global
// `DpuEngine`, so they do not share state with each other.
#[cfg(test)]
mod tests {
use super::*;
// The BlueField-3 preset exposes the expected identity and supports all ops.
#[test]
fn test_dpu_capabilities() {
let caps = create_bluefield3();
assert_eq!(caps.name, "NVIDIA BlueField-3");
assert_eq!(caps.vendor, "NVIDIA");
assert_eq!(caps.cores, 16);
assert!(caps.supports_op(DpuOp::Checksum));
assert!(caps.supports_op(DpuOp::FullPipeline));
}
// A manager only reports availability after a device is registered.
#[test]
fn test_device_registration() {
let mut mgr = DpuManager::new();
assert!(!mgr.is_available());
mgr.register_device(create_bluefield3());
assert!(mgr.is_available());
}
// An accepted submission returns an id and counts as a DPU operation.
#[test]
fn test_submit_command() {
let mut mgr = DpuManager::new();
mgr.register_device(create_bluefield3());
let cmd_id = mgr.submit(DpuOp::Compress, 1_000_000, 0);
assert!(cmd_id.is_some());
let stats = mgr.stats();
assert_eq!(stats.dpu_ops, 1);
assert_eq!(stats.total_ops, 1);
}
// Without a registered device, submissions fall back to the CPU.
#[test]
fn test_cpu_fallback() {
let mut mgr = DpuManager::new();
let cmd_id = mgr.submit(DpuOp::Compress, 1_000_000, 0);
assert!(cmd_id.is_none());
let stats = mgr.stats();
assert_eq!(stats.cpu_fallback, 1);
assert_eq!(stats.dpu_ops, 0);
}
// Completing a pending command echoes its id/op/output size and reports savings.
#[test]
fn test_complete_command() {
let mut mgr = DpuManager::new();
mgr.register_device(create_bluefield3());
let cmd_id = mgr
.submit(DpuOp::Compress, 10_000_000, 0)
.expect("test: operation should succeed");
let result = mgr
.complete(cmd_id, 5_000_000, 100)
.expect("test: operation should succeed");
assert_eq!(result.cmd_id, cmd_id);
assert_eq!(result.op, DpuOp::Compress);
assert_eq!(result.output_size, 5_000_000);
assert!(result.cpu_cycles_saved > 0);
}
// Checksum on 16 cores: 16 * 0.5 * 2.0 = 16.0.
#[test]
fn test_throughput_calculation() {
let caps = create_bluefield3();
let throughput = caps.expected_throughput(DpuOp::Checksum);
assert!((throughput - 16.0).abs() < 1.0);
}
// The per-result overhead reduction is the fixed 95 % constant.
#[test]
fn test_cpu_overhead_reduction() {
let result = DpuResult {
cmd_id: 1,
op: DpuOp::FullPipeline,
output_size: 5_000_000,
dpu_time_us: 1000,
total_time_us: 1010,
cpu_cycles_saved: 95_000_000,
};
assert_eq!(result.cpu_overhead_reduction(), 95.0);
}
// Ten submit/complete round trips accumulate into the counters.
#[test]
fn test_statistics() {
let mut mgr = DpuManager::new();
mgr.register_device(create_bluefield3());
for i in 0..10 {
let cmd_id = mgr
.submit(DpuOp::FullPipeline, 1_000_000, i)
.expect("test: operation should succeed");
mgr.complete(cmd_id, 500_000, i + 100);
}
let stats = mgr.stats();
assert_eq!(stats.total_ops, 10);
assert_eq!(stats.dpu_ops, 10);
assert_eq!(stats.total_bytes, 10_000_000);
assert!(stats.cpu_cycles_saved > 0);
}
// 5 fallbacks before registration + 15 offloads after it => ratio 15/20.
#[test]
fn test_offload_ratio() {
let mut mgr = DpuManager::new();
for _ in 0..5 {
mgr.submit(DpuOp::Compress, 1_000_000, 0);
}
mgr.register_device(create_bluefield3());
for _ in 0..15 {
mgr.submit(DpuOp::Compress, 1_000_000, 0);
}
let stats = mgr.stats();
assert_eq!(stats.total_ops, 20);
assert_eq!(stats.dpu_ops, 15);
assert_eq!(stats.cpu_fallback, 5);
assert_eq!(stats.offload_ratio(), 0.75);
}
// Different operation kinds round-trip through submit/complete unchanged.
#[test]
fn test_multiple_op_types() {
let mut mgr = DpuManager::new();
mgr.register_device(create_bluefield3());
let ops = vec![
DpuOp::Checksum,
DpuOp::Compress,
DpuOp::Encrypt,
DpuOp::RaidParity,
DpuOp::FullPipeline,
];
for op in ops {
let cmd_id = mgr
.submit(op, 1_000_000, 0)
.expect("test: operation should succeed");
let result = mgr
.complete(cmd_id, 900_000, 100)
.expect("test: operation should succeed");
assert_eq!(result.op, op);
}
let stats = mgr.stats();
assert_eq!(stats.total_ops, 5);
}
// The Intel IPU preset exposes the expected identity and supports checksums.
#[test]
fn test_intel_ipu() {
let caps = create_intel_ipu();
assert_eq!(caps.vendor, "Intel");
assert_eq!(caps.cores, 8);
assert!(caps.supports_op(DpuOp::Checksum));
}
}