use alloc::collections::BTreeMap;
use alloc::vec;
use alloc::vec::Vec;
use lazy_static::lazy_static;
use spin::Mutex;
#[cfg(feature = "qat")]
use crate::hw::qat_ffi::*;
/// Category of work a QAT command requests.
///
/// Carried by [`QatCommand`] and [`QatResult`], and used by
/// `QatManager::submit` to pick the per-service counter in [`QatStats`].
/// The latencies quoted below are the fixed values returned by
/// `QatCapabilities::expected_latency_us`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum QatService {
/// Symmetric cryptography (modelled latency: 10 us).
SymmetricCrypto,
/// Asymmetric cryptography (modelled latency: 100 us).
AsymmetricCrypto,
/// Compression (modelled latency: 20 us).
Compression,
/// Decompression (modelled latency: 15 us).
Decompression,
/// Chained operations (modelled latency: 30 us).
ChainedOps,
}
/// Compression algorithm selector accepted by `compress` / `decompress`.
///
/// NOTE(review): both functions currently bind this argument as `_algo` and
/// never read it, so the choice has no effect yet.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QatCompressAlgo {
/// DEFLATE; `QatCapabilities::new` always marks it as supported.
Deflate,
/// LZ4; `QatCapabilities::new` always marks it as supported.
Lz4,
/// Zstandard; `QatCapabilities::new` marks it supported only for generation "2.0".
Zstd,
}
/// Cipher algorithm selector.
///
/// NOTE(review): within the visible file this enum is only exercised by the
/// unit tests; no manager method takes it as a parameter yet.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QatCipherAlgo {
/// AES-GCM, 128-bit key (per the variant name).
AesGcm128,
/// AES-GCM, 256-bit key (per the variant name).
AesGcm256,
/// AES-XTS, 256-bit key (per the variant name).
AesXts256,
/// ChaCha20-Poly1305 (per the variant name).
ChaCha20Poly1305,
}
/// Errors reported by the QAT manager.
///
/// Variants marked "not produced in this file" are declared here but no code
/// path visible in this file constructs them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QatError {
/// `compress` / `decompress` was called before `init_hardware` succeeded.
NotInitialized,
/// `init_hardware`: `icp_sal_userIsQatRunning` reported that QAT is not running.
HardwareNotAvailable,
/// `init_hardware`: `icp_sal_userStartMultiProcess` did not return success.
InitFailed,
/// No DC/CY instance is available (from `init_hardware`, `compress`, `decompress`).
NoInstances,
/// Not produced in this file.
StartFailed,
/// Not produced in this file.
SessionFailed,
/// `compress`: `QatBuffer::new` returned `None`.
MemoryError,
/// Not produced in this file.
OperationFailed,
/// Not produced in this file.
ResourceBusy,
/// Not produced in this file.
InvalidParam,
/// `compress`: the estimated output size exceeds the caller's output buffer.
BufferTooSmall,
/// Not produced in this file.
CompressionFailed,
/// `decompress`: currently returned unconditionally once an instance is selected.
DecompressionFailed,
/// Not produced in this file.
CryptoFailed,
}
/// Static description of a QAT device: identity, sizing and feature flags.
///
/// Values are whatever `QatCapabilities::new` derives from its arguments;
/// nothing here is measured from hardware.
#[derive(Debug, Clone)]
pub struct QatCapabilities {
/// Caller-supplied device identifier.
pub device_id: u32,
/// Human-readable device name.
pub name: &'static str,
/// Generation string; `new` treats exactly "2.0" as the feature-rich generation.
pub generation: &'static str,
/// Number of acceleration units; scales the ops/sec estimates below.
pub accel_units: u32,
/// Caller-supplied maximum throughput figure.
pub max_throughput_gbps: u32,
/// Estimated symmetric crypto ops/sec: `1_000_000 * accel_units`.
pub sym_crypto_ops_per_sec: u64,
/// Estimated asymmetric crypto ops/sec: `100_000 * accel_units`.
pub asym_crypto_ops_per_sec: u64,
/// Compression throughput estimate: `max_throughput_gbps / 2` (integer division).
pub compression_gbps: u32,
/// Always `true` when built by `new`.
pub supports_deflate: bool,
/// Always `true` when built by `new`.
pub supports_lz4: bool,
/// `true` only when `generation == "2.0"`.
pub supports_zstd: bool,
/// Always `true` when built by `new`.
pub supports_aes_gcm: bool,
/// Always `true` when built by `new`.
pub supports_aes_xts: bool,
/// `true` only when `generation == "2.0"`.
pub supports_chacha: bool,
}
impl QatCapabilities {
    /// Builds a capability profile from a device's identity and sizing.
    ///
    /// Derived fields:
    /// * symmetric / asymmetric ops per second scale linearly with
    ///   `accel_units` (1,000,000 and 100,000 per unit respectively);
    /// * `compression_gbps` is half of `max_throughput_gbps` (integer division);
    /// * Zstd and ChaCha support are enabled only when `generation` is exactly
    ///   `"2.0"`; every other feature flag is always enabled.
    pub fn new(
        device_id: u32,
        name: &'static str,
        generation: &'static str,
        accel_units: u32,
        max_throughput_gbps: u32,
    ) -> Self {
        let second_gen = matches!(generation, "2.0");
        let units = u64::from(accel_units);

        Self {
            device_id,
            name,
            generation,
            accel_units,
            max_throughput_gbps,
            sym_crypto_ops_per_sec: units * 1_000_000,
            asym_crypto_ops_per_sec: units * 100_000,
            compression_gbps: max_throughput_gbps / 2,
            supports_deflate: true,
            supports_lz4: true,
            supports_zstd: second_gen,
            supports_aes_gcm: true,
            supports_aes_xts: true,
            supports_chacha: second_gen,
        }
    }

    /// Returns the fixed latency estimate, in microseconds, for `service`.
    ///
    /// The value depends only on the service kind; no field of `self` is read.
    pub fn expected_latency_us(&self, service: QatService) -> u64 {
        // Arms are listed from fastest to slowest.
        match service {
            QatService::SymmetricCrypto => 10,
            QatService::Decompression => 15,
            QatService::Compression => 20,
            QatService::ChainedOps => 30,
            QatService::AsymmetricCrypto => 100,
        }
    }
}
/// A command that has been submitted and is waiting in the manager's
/// `pending` map until `complete` is called for it.
#[derive(Debug, Clone)]
pub struct QatCommand {
/// Identifier handed out by `QatManager::submit` (sequential, starting at 1).
pub cmd_id: u64,
/// Kind of work requested.
pub service: QatService,
/// Input size supplied by the caller; added to `QatStats::total_bytes` on completion.
pub input_size: u64,
/// Timestamp supplied by the caller at submit time (unit is not defined in this file).
pub submitted: u64,
}
/// Outcome of a completed command, as produced by `QatManager::complete`.
#[derive(Debug, Clone)]
pub struct QatResult {
/// Identifier of the command this result belongs to.
pub cmd_id: u64,
/// Kind of work that was performed.
pub service: QatService,
/// Output size reported by the caller of `complete`.
pub output_size: u64,
/// Device time in microseconds; `complete` fills it from `expected_latency_us`.
pub qat_time_us: u64,
/// Total time in microseconds; `complete` sets it to `qat_time_us + 2`.
pub total_time_us: u64,
/// Copy of the manager's `hardware_available` flag at completion time.
pub hw_accelerated: bool,
}
impl QatResult {
    /// Throughput for `input_size` units of input over `total_time_us`.
    ///
    /// Computed as `input_size / seconds / 1e9`; if `input_size` is in bytes
    /// the result is gigabytes per second (note the `gbps` name).
    /// Returns `0.0` when no time was recorded.
    pub fn throughput_gbps(&self, input_size: u64) -> f32 {
        if self.total_time_us == 0 {
            return 0.0;
        }
        (input_size as f64 / (self.total_time_us as f64 / 1_000_000.0) / 1e9) as f32
    }

    /// Operations per second implied by this single result's total time.
    ///
    /// Returns `0.0` when no time was recorded.
    pub fn ops_per_second(&self) -> f32 {
        if self.total_time_us == 0 {
            return 0.0;
        }
        1_000_000.0 / self.total_time_us as f32
    }

    /// Ratio of a fixed per-service CPU baseline to the recorded device time.
    ///
    /// The baseline is chosen from the `service` argument, not from
    /// `self.service`. Returns `0.0` when `qat_time_us` is zero, matching the
    /// other metrics on this type instead of yielding an infinite ratio.
    pub fn speedup_vs_cpu(&self, service: QatService) -> f32 {
        if self.qat_time_us == 0 {
            return 0.0;
        }
        // Fixed CPU baselines in microseconds.
        let cpu_time_us: u32 = match service {
            QatService::SymmetricCrypto => 100,
            QatService::AsymmetricCrypto => 5000,
            QatService::Compression => 200,
            QatService::Decompression => 150,
            QatService::ChainedOps => 300,
        };
        cpu_time_us as f32 / self.qat_time_us as f32
    }
}
/// Running counters maintained by `QatManager`.
#[derive(Debug, Clone, Default)]
pub struct QatStats {
/// Every call to `submit`, whether queued or rejected.
pub total_ops: u64,
/// `submit` calls accepted because a device profile was registered.
pub hw_accel_ops: u64,
/// `submit` calls rejected because no device profile was registered.
pub cpu_fallback_ops: u64,
/// Sum of input sizes, added by `complete`, `compress` and `decompress`.
pub total_bytes: u64,
/// Sum of `qat_time_us` over completed commands.
pub total_qat_time_us: u64,
/// Sum of `total_time_us` over completed commands.
pub total_time_us: u64,
/// Accepted submissions with `QatService::SymmetricCrypto`.
pub sym_crypto_ops: u64,
/// Accepted submissions with `QatService::AsymmetricCrypto`.
pub asym_crypto_ops: u64,
/// Accepted `Compression` submissions, plus successful direct `compress` calls.
pub compression_ops: u64,
/// Accepted `Decompression` submissions, plus direct `decompress` calls that reached an instance.
pub decompression_ops: u64,
/// Accepted submissions with `QatService::ChainedOps`.
pub chained_ops: u64,
}
impl QatStats {
    /// Fraction of all submissions that were accepted for acceleration.
    ///
    /// Returns `0.0` when nothing has been submitted.
    pub fn hw_accel_ratio(&self) -> f32 {
        match self.total_ops {
            0 => 0.0,
            total => self.hw_accel_ops as f32 / total as f32,
        }
    }

    /// Average throughput: `total_bytes` per second of `total_time_us`,
    /// divided by 1e9. Returns `0.0` when no time has been accumulated.
    pub fn avg_throughput_gbps(&self) -> f32 {
        if self.total_time_us == 0 {
            return 0.0;
        }
        let elapsed_seconds = self.total_time_us as f64 / 1_000_000.0;
        let per_second = self.total_bytes as f64 / elapsed_seconds;
        (per_second / 1e9) as f32
    }

    /// Mean accumulated time per submitted operation, in microseconds.
    ///
    /// Returns `0.0` when nothing has been submitted.
    pub fn avg_latency_us(&self) -> f64 {
        match self.total_ops {
            0 => 0.0,
            total => self.total_time_us as f64 / total as f64,
        }
    }

    /// Submitted operations per second of accumulated time.
    ///
    /// Returns `0.0` when no time has been accumulated.
    pub fn ops_per_second(&self) -> f64 {
        if self.total_time_us == 0 {
            return 0.0;
        }
        let elapsed_seconds = self.total_time_us as f64 / 1_000_000.0;
        self.total_ops as f64 / elapsed_seconds
    }
}
/// Copyable wrapper around a raw QAT instance pointer so it can be stored in
/// `QatHardwareState` and, through it, inside the global `Mutex<QatManager>`.
#[cfg(feature = "qat")]
#[derive(Clone, Copy)]
struct QatInstanceHandle(*mut core::ffi::c_void);
// SAFETY (NOTE(review)): these impls assert that the wrapped pointer may be
// moved to and shared between threads. Nothing in this file proves that; it
// presumably relies on the QAT library's instance-handle threading rules and
// on all access going through the `QAT_ENGINE` mutex -- confirm.
#[cfg(feature = "qat")]
unsafe impl Send for QatInstanceHandle {}
#[cfg(feature = "qat")]
unsafe impl Sync for QatInstanceHandle {}
#[cfg(feature = "qat")]
impl QatInstanceHandle {
/// Returns the wrapped pointer in the form the FFI functions take.
fn as_raw(&self) -> CpaInstanceHandle {
self.0
}
/// Wraps a raw instance pointer obtained from the QAT library.
fn from_raw(ptr: CpaInstanceHandle) -> Self {
Self(ptr)
}
}
/// Hardware-side bookkeeping: the discovered instances and round-robin cursors.
#[cfg(feature = "qat")]
struct QatHardwareState {
/// Data-compression (DC) instances filled in by `init_hardware`.
dc_instances: Vec<QatInstanceHandle>,
/// Crypto (CY) instances filled in by `init_hardware`.
cy_instances: Vec<QatInstanceHandle>,
/// Index of the DC instance that `next_dc_instance` hands out next.
dc_index: usize,
/// Index of the CY instance that `next_cy_instance` hands out next.
cy_index: usize,
/// Set once `init_hardware` has succeeded; cleared by `shutdown_hardware`.
initialized: bool,
}
#[cfg(feature = "qat")]
impl QatHardwareState {
    /// Creates an empty, uninitialized state with both cursors at zero.
    fn new() -> Self {
        Self {
            dc_instances: Vec::new(),
            cy_instances: Vec::new(),
            dc_index: 0,
            cy_index: 0,
            initialized: false,
        }
    }

    /// Returns the next data-compression instance in round-robin order, or
    /// `None` when no DC instance is registered.
    fn next_dc_instance(&mut self) -> Option<QatInstanceHandle> {
        Self::round_robin(&self.dc_instances, &mut self.dc_index)
    }

    /// Returns the next crypto instance in round-robin order, or `None` when
    /// no CY instance is registered.
    fn next_cy_instance(&mut self) -> Option<QatInstanceHandle> {
        Self::round_robin(&self.cy_instances, &mut self.cy_index)
    }

    /// Picks `pool[cursor]` and advances `cursor`, wrapping at the pool length.
    ///
    /// The cursor is reduced modulo the current pool length *before* it is
    /// used as an index: the pools can be cleared and refilled with fewer
    /// entries (shutdown followed by re-initialization) while the cursor keeps
    /// its old value, which would otherwise index out of bounds.
    fn round_robin(pool: &[QatInstanceHandle], cursor: &mut usize) -> Option<QatInstanceHandle> {
        if pool.is_empty() {
            return None;
        }
        let slot = *cursor % pool.len();
        *cursor = (slot + 1) % pool.len();
        Some(pool[slot])
    }
}
/// Tracks the registered device profile, in-flight commands and statistics,
/// plus (with the `qat` feature) the real hardware instance state.
pub struct QatManager {
/// Active device profile; `submit` accepts commands only while this is `Some`.
capabilities: Option<QatCapabilities>,
/// Commands accepted by `submit` and not yet passed to `complete`, keyed by id.
pending: BTreeMap<u64, QatCommand>,
/// Id the next accepted command will receive (starts at 1).
next_cmd_id: u64,
/// Running counters.
stats: QatStats,
/// `true` only after `init_hardware` succeeded and before `shutdown_hardware`.
hardware_available: bool,
/// Instance handles and cursors for the real device.
#[cfg(feature = "qat")]
hw_state: QatHardwareState,
}
/// `Default` delegates to [`QatManager::new`], so both construct the same
/// empty manager.
impl Default for QatManager {
fn default() -> Self {
Self::new()
}
}
impl QatManager {
pub fn new() -> Self {
Self {
capabilities: None,
pending: BTreeMap::new(),
next_cmd_id: 1,
stats: QatStats::default(),
hardware_available: false,
#[cfg(feature = "qat")]
hw_state: QatHardwareState::new(),
}
}
/// Stores `caps` as the active device profile, replacing any previous one.
///
/// This only records a profile; it does not touch hardware and does not
/// change `hardware_available`.
pub fn register_device(&mut self, caps: QatCapabilities) {
self.capabilities = Some(caps);
}
/// Returns `true` when a device profile is registered (via `register_device`
/// or a successful `init_hardware`). This is not a statement about real
/// hardware; see `is_hardware_available` for that.
pub fn is_available(&self) -> bool {
self.capabilities.is_some()
}
/// Returns the `hardware_available` flag, which is set only by a successful
/// `init_hardware` and cleared by `shutdown_hardware`.
pub fn is_hardware_available(&self) -> bool {
self.hardware_available
}
/// Brings up the QAT user-space session and starts every DC and CY instance
/// the library reports.
///
/// Idempotent: returns `Ok(())` immediately if already initialized.
///
/// Only instances whose start call succeeded are registered, so later
/// round-robin selection never hands out an instance that failed to start.
/// On every error path after the user-space session was opened, the session
/// is closed again with `icp_sal_userStop` so a failed attempt leaves nothing
/// behind and can be retried.
///
/// # Errors
/// * `QatError::InitFailed` - `icp_sal_userStartMultiProcess` failed.
/// * `QatError::HardwareNotAvailable` - the QAT service is not running.
/// * `QatError::NoInstances` - no DC or CY instance could be started.
#[cfg(feature = "qat")]
pub fn init_hardware(&mut self) -> Result<(), QatError> {
    use core::ptr;

    if self.hw_state.initialized {
        return Ok(());
    }

    // NUL-terminated process name handed to the C API.
    let process_name = b"LCPFS\0";
    // SAFETY: `process_name` is a NUL-terminated byte string that outlives the call.
    let status = unsafe {
        icp_sal_userStartMultiProcess(
            process_name.as_ptr() as *const core::ffi::c_char,
            CPA_FALSE,
        )
    };
    if status != CPA_STATUS_SUCCESS {
        crate::lcpfs_println!("[ QAT] Failed to initialize QAT userspace: {}", status);
        return Err(QatError::InitFailed);
    }

    // SAFETY: plain FFI query with no arguments.
    let running = unsafe { icp_sal_userIsQatRunning() };
    if running == CPA_FALSE {
        crate::lcpfs_println!("[ QAT] QAT service is not running");
        // Undo the successful user-space start before bailing out.
        unsafe { icp_sal_userStop() };
        return Err(QatError::HardwareNotAvailable);
    }

    // --- Data-compression instances -------------------------------------
    let mut dc_started: Vec<QatInstanceHandle> = Vec::new();
    let mut num_dc: u16 = 0;
    // SAFETY: `num_dc` is a valid, writable u16 for the duration of the call.
    let status = unsafe { cpaDcGetNumInstances(&mut num_dc) };
    if status == CPA_STATUS_SUCCESS && num_dc > 0 {
        let mut dc_raw: Vec<CpaInstanceHandle> = vec![ptr::null_mut(); num_dc as usize];
        // SAFETY: `dc_raw` has exactly `num_dc` writable slots.
        let status = unsafe { cpaDcGetInstances(num_dc, dc_raw.as_mut_ptr()) };
        if status == CPA_STATUS_SUCCESS {
            for (i, instance) in dc_raw.into_iter().enumerate() {
                // SAFETY: `instance` was just returned by `cpaDcGetInstances`.
                let start_status = unsafe { cpaDcStartInstance(instance, 32) };
                if start_status == CPA_STATUS_SUCCESS {
                    dc_started.push(QatInstanceHandle::from_raw(instance));
                } else {
                    crate::lcpfs_println!("[ QAT] Failed to start DC instance {}", i);
                }
            }
            crate::lcpfs_println!("[ QAT] Initialized {} DC instances", dc_started.len());
        }
    }

    // --- Crypto instances -----------------------------------------------
    let mut cy_started: Vec<QatInstanceHandle> = Vec::new();
    let mut num_cy: u16 = 0;
    // SAFETY: `num_cy` is a valid, writable u16 for the duration of the call.
    let status = unsafe { cpaCyGetNumInstances(&mut num_cy) };
    if status == CPA_STATUS_SUCCESS && num_cy > 0 {
        let mut cy_raw: Vec<CpaInstanceHandle> = vec![ptr::null_mut(); num_cy as usize];
        // SAFETY: `cy_raw` has exactly `num_cy` writable slots.
        let status = unsafe { cpaCyGetInstances(num_cy, cy_raw.as_mut_ptr()) };
        if status == CPA_STATUS_SUCCESS {
            for (i, instance) in cy_raw.into_iter().enumerate() {
                // SAFETY: `instance` was just returned by `cpaCyGetInstances`.
                let start_status = unsafe { cpaCyStartInstance(instance) };
                if start_status == CPA_STATUS_SUCCESS {
                    cy_started.push(QatInstanceHandle::from_raw(instance));
                } else {
                    crate::lcpfs_println!("[ QAT] Failed to start CY instance {}", i);
                }
            }
            crate::lcpfs_println!("[ QAT] Initialized {} CY instances", cy_started.len());
        }
    }

    if dc_started.is_empty() && cy_started.is_empty() {
        // Nothing usable came up: close the session so a retry starts clean.
        unsafe { icp_sal_userStop() };
        return Err(QatError::NoInstances);
    }

    self.hw_state.dc_instances = dc_started;
    self.hw_state.cy_instances = cy_started;
    // Fresh pools: restart round-robin from the first instance so a cursor
    // left over from an earlier session can never exceed the new pool size.
    self.hw_state.dc_index = 0;
    self.hw_state.cy_index = 0;
    self.hw_state.initialized = true;
    self.hardware_available = true;

    let caps = self.detect_capabilities();
    self.capabilities = Some(caps);

    crate::lcpfs_println!(
        "[ QAT] Hardware initialized: {} DC + {} CY instances",
        self.hw_state.dc_instances.len(),
        self.hw_state.cy_instances.len()
    );
    Ok(())
}
/// Builds the capability profile installed by `init_hardware`.
///
/// The profile is a fixed one ("Intel QAT", generation "2.0", 16 units,
/// throughput 100). If a DC instance exists, its capabilities are queried.
///
/// NOTE(review): the query result is effectively unused. On success it only
/// sets `supports_deflate` / `supports_lz4` to `true`, which
/// `QatCapabilities::new` has already done, and none of the `dc_caps` fields
/// are inspected. The returned profile is therefore the same whether or not
/// the query succeeds.
#[cfg(feature = "qat")]
fn detect_capabilities(&self) -> QatCapabilities {
let mut caps = QatCapabilities::new(0, "Intel QAT", "2.0", 16, 100);
if let Some(&instance) = self.hw_state.dc_instances.first() {
// Start from an all-false structure for the library to fill in.
let mut dc_caps = CpaDcInstanceCapabilities {
compressAndVerify: CPA_FALSE,
compressAndVerifyAndRecover: CPA_FALSE,
batchAndPack: CPA_FALSE,
integrityCrcs64b: CPA_FALSE,
checksumCRC32: CPA_FALSE,
checksumCRC64: CPA_FALSE,
checksumAdler32: CPA_FALSE,
checksumXXHash32: CPA_FALSE,
dynamicHuffman: CPA_FALSE,
precompiledHuffman: CPA_FALSE,
autoSelectBestHuffmanTree: CPA_FALSE,
validWindowSizes: [CPA_FALSE; 32],
};
// SAFETY (NOTE(review)): assumes `instance` is a handle stored by
// `init_hardware`; `dc_caps` is a valid, writable structure.
let status = unsafe { cpaDcQueryCapabilities(instance.as_raw(), &mut dc_caps) };
if status == CPA_STATUS_SUCCESS {
caps.supports_deflate = true;
caps.supports_lz4 = true;
}
}
caps
}
/// Stops every registered DC and CY instance, closes the user-space session
/// and marks the hardware as unavailable.
///
/// Does nothing when the hardware was never initialized. The registered
/// device profile (`capabilities`) is left in place.
#[cfg(feature = "qat")]
pub fn shutdown_hardware(&mut self) {
    if !self.hw_state.initialized {
        return;
    }

    // Draining stops each instance and leaves the pool empty afterwards.
    for handle in self.hw_state.dc_instances.drain(..) {
        unsafe { cpaDcStopInstance(handle.as_raw()) };
    }
    for handle in self.hw_state.cy_instances.drain(..) {
        unsafe { cpaCyStopInstance(handle.as_raw()) };
    }

    unsafe { icp_sal_userStop() };

    self.hw_state.initialized = false;
    self.hardware_available = false;
    crate::lcpfs_println!("[ QAT] Hardware shutdown complete");
}
/// Queues a command for `service` and returns its id, or records a CPU
/// fallback and returns `None` when no device profile is registered.
///
/// Every call increments `total_ops`. Accepted commands also increment
/// `hw_accel_ops` and the counter for their service; rejected ones increment
/// only `cpu_fallback_ops`.
pub fn submit(&mut self, service: QatService, input_size: u64, timestamp: u64) -> Option<u64> {
    // No profile registered: account for the fallback and bail out.
    if self.capabilities.is_none() {
        self.stats.cpu_fallback_ops += 1;
        self.stats.total_ops += 1;
        return None;
    }

    let cmd_id = self.next_cmd_id;
    self.next_cmd_id += 1;
    self.pending.insert(
        cmd_id,
        QatCommand {
            cmd_id,
            service,
            input_size,
            submitted: timestamp,
        },
    );

    self.stats.hw_accel_ops += 1;
    self.stats.total_ops += 1;

    let per_service_counter = match service {
        QatService::SymmetricCrypto => &mut self.stats.sym_crypto_ops,
        QatService::AsymmetricCrypto => &mut self.stats.asym_crypto_ops,
        QatService::Compression => &mut self.stats.compression_ops,
        QatService::Decompression => &mut self.stats.decompression_ops,
        QatService::ChainedOps => &mut self.stats.chained_ops,
    };
    *per_service_counter += 1;

    Some(cmd_id)
}
/// Finishes the pending command `cmd_id` and returns its result.
///
/// Returns `None` when the id is not pending (or no profile is registered).
/// Timing is modelled, not measured: `qat_time_us` comes from
/// `expected_latency_us` and `total_time_us` is that value plus 2;
/// `_current_time` is not used. `hw_accelerated` mirrors the manager's
/// `hardware_available` flag. The command's input size and both times are
/// added to the statistics.
pub fn complete(
    &mut self,
    cmd_id: u64,
    output_size: u64,
    _current_time: u64,
) -> Option<QatResult> {
    let finished = self.pending.remove(&cmd_id)?;
    let qat_time_us = self
        .capabilities
        .as_ref()?
        .expected_latency_us(finished.service);
    let total_time_us = qat_time_us + 2;

    self.stats.total_bytes += finished.input_size;
    self.stats.total_qat_time_us += qat_time_us;
    self.stats.total_time_us += total_time_us;

    Some(QatResult {
        cmd_id,
        service: finished.service,
        output_size,
        qat_time_us,
        total_time_us,
        hw_accelerated: self.hardware_available,
    })
}
/// Direct compression entry point.
///
/// NOTE(review): this is a placeholder, not a working compressor. It selects
/// an instance and allocates two `QatBuffer`s but never submits work and
/// never writes to `output`. The returned length is the fixed estimate
/// `input.len() * 3 / 4`, so a caller that trusts `Ok(n)` will read `n` bytes
/// of whatever `output` already contained. `_algo` is ignored.
///
/// Only `compression_ops` and `total_bytes` are updated; `total_ops` is not.
///
/// # Errors
/// * `QatError::NotInitialized` - `init_hardware` has not succeeded.
/// * `QatError::NoInstances` - no DC instance is registered.
/// * `QatError::MemoryError` - a `QatBuffer` could not be allocated.
/// * `QatError::BufferTooSmall` - the estimate exceeds `output.len()`.
#[cfg(feature = "qat")]
pub fn compress(
&mut self,
input: &[u8],
output: &mut [u8],
_algo: QatCompressAlgo,
) -> Result<usize, QatError> {
if !self.hw_state.initialized {
return Err(QatError::NotInitialized);
}
// Advances the DC round-robin cursor even though the instance is unused.
let _instance = self
.hw_state
.next_dc_instance()
.ok_or(QatError::NoInstances)?;
// Buffers are allocated and then dropped at the end of the function without use.
let _src_buffer = QatBuffer::new(input.len(), 0).ok_or(QatError::MemoryError)?;
let _dst_buffer = QatBuffer::new(output.len(), 0).ok_or(QatError::MemoryError)?;
// Fixed 75% size estimate; no data is actually produced.
let produced = (input.len() * 3) / 4;
if produced > output.len() {
return Err(QatError::BufferTooSmall);
}
self.stats.compression_ops += 1;
self.stats.total_bytes += input.len() as u64;
Ok(produced)
}
/// Direct decompression entry point.
///
/// NOTE(review): not implemented. Once an instance has been selected the
/// function updates `decompression_ops` / `total_bytes` and then always
/// returns `Err(QatError::DecompressionFailed)`, so failed calls are counted
/// as operations. `_output` and `_algo` are ignored.
///
/// # Errors
/// * `QatError::NotInitialized` - `init_hardware` has not succeeded.
/// * `QatError::NoInstances` - no DC instance is registered.
/// * `QatError::DecompressionFailed` - in every other case.
#[cfg(feature = "qat")]
pub fn decompress(
&mut self,
input: &[u8],
_output: &mut [u8],
_algo: QatCompressAlgo,
) -> Result<usize, QatError> {
if !self.hw_state.initialized {
return Err(QatError::NotInitialized);
}
// Advances the DC round-robin cursor even though the instance is unused.
let _instance = self
.hw_state
.next_dc_instance()
.ok_or(QatError::NoInstances)?;
self.stats.decompression_ops += 1;
self.stats.total_bytes += input.len() as u64;
Err(QatError::DecompressionFailed)
}
/// Polls every registered DC instance, then every CY instance, with a quota
/// of 16 per call.
///
/// The return value is the number of *instances* whose poll call returned
/// `CPA_STATUS_SUCCESS`; it is not a count of completed operations.
#[cfg(feature = "qat")]
pub fn poll(&mut self) -> usize {
    let dc_ok = self
        .hw_state
        .dc_instances
        .iter()
        .filter(|handle| {
            let status = unsafe { icp_sal_DcPollInstance(handle.as_raw(), 16) };
            status == CPA_STATUS_SUCCESS
        })
        .count();

    let cy_ok = self
        .hw_state
        .cy_instances
        .iter()
        .filter(|handle| {
            let status = unsafe { icp_sal_CyPollInstance(handle.as_raw(), 16) };
            status == CPA_STATUS_SUCCESS
        })
        .count();

    dc_ok + cy_ok
}
/// Returns a snapshot (clone) of the current statistics.
pub fn stats(&self) -> QatStats {
self.stats.clone()
}
/// Borrows the registered device profile, if any.
pub fn capabilities(&self) -> Option<&QatCapabilities> {
self.capabilities.as_ref()
}
}
/// Dropping a manager shuts the hardware down; `shutdown_hardware` is a no-op
/// when the hardware was never initialized.
#[cfg(feature = "qat")]
impl Drop for QatManager {
fn drop(&mut self) {
self.shutdown_hardware();
}
}
// Process-wide manager instance behind a spin lock; every `QatEngine`
// associated function locks it for the duration of one call.
lazy_static! {
static ref QAT_ENGINE: Mutex<QatManager> = Mutex::new(QatManager::new());
}
/// Facade over the global `QAT_ENGINE` manager.
///
/// Each associated function takes the global lock, forwards to the
/// `QatManager` method of the same name and releases the lock on return.
pub struct QatEngine;

impl QatEngine {
    /// Initializes the QAT hardware on the global manager.
    #[cfg(feature = "qat")]
    pub fn init_hardware() -> Result<(), QatError> {
        QAT_ENGINE.lock().init_hardware()
    }

    /// Shuts the QAT hardware down on the global manager.
    #[cfg(feature = "qat")]
    pub fn shutdown_hardware() {
        QAT_ENGINE.lock().shutdown_hardware();
    }

    /// Registers `caps` as the global device profile.
    pub fn register_device(caps: QatCapabilities) {
        QAT_ENGINE.lock().register_device(caps);
    }

    /// Reports whether a device profile is registered globally.
    pub fn is_available() -> bool {
        QAT_ENGINE.lock().is_available()
    }

    /// Reports whether real hardware has been initialized globally.
    pub fn is_hardware_available() -> bool {
        QAT_ENGINE.lock().is_hardware_available()
    }

    /// Submits a command to the global manager; see `QatManager::submit`.
    pub fn submit(service: QatService, input_size: u64, timestamp: u64) -> Option<u64> {
        QAT_ENGINE.lock().submit(service, input_size, timestamp)
    }

    /// Completes a command on the global manager; see `QatManager::complete`.
    pub fn complete(cmd_id: u64, output_size: u64, current_time: u64) -> Option<QatResult> {
        QAT_ENGINE.lock().complete(cmd_id, output_size, current_time)
    }

    /// Forwards to `QatManager::compress` on the global manager.
    #[cfg(feature = "qat")]
    pub fn compress(
        input: &[u8],
        output: &mut [u8],
        algo: QatCompressAlgo,
    ) -> Result<usize, QatError> {
        QAT_ENGINE.lock().compress(input, output, algo)
    }

    /// Forwards to `QatManager::decompress` on the global manager.
    #[cfg(feature = "qat")]
    pub fn decompress(
        input: &[u8],
        output: &mut [u8],
        algo: QatCompressAlgo,
    ) -> Result<usize, QatError> {
        QAT_ENGINE.lock().decompress(input, output, algo)
    }

    /// Polls all instances of the global manager.
    #[cfg(feature = "qat")]
    pub fn poll() -> usize {
        QAT_ENGINE.lock().poll()
    }

    /// Returns a snapshot of the global statistics.
    pub fn stats() -> QatStats {
        QAT_ENGINE.lock().stats()
    }

    /// Returns a copy of the global device profile, if one is registered.
    pub fn capabilities() -> Option<QatCapabilities> {
        QAT_ENGINE.lock().capabilities().cloned()
    }
}
/// Returns the built-in profile for a generation "2.0" device:
/// id 0, 32 acceleration units, maximum throughput 100.
pub fn create_qat_2_0() -> QatCapabilities {
    const DEVICE_ID: u32 = 0;
    const NAME: &'static str = "Intel QAT 2.0";
    const GENERATION: &'static str = "2.0";
    const ACCEL_UNITS: u32 = 32;
    const MAX_THROUGHPUT_GBPS: u32 = 100;

    QatCapabilities::new(
        DEVICE_ID,
        NAME,
        GENERATION,
        ACCEL_UNITS,
        MAX_THROUGHPUT_GBPS,
    )
}
/// Returns the built-in profile for a generation "1.8" device:
/// id 1, 16 acceleration units, maximum throughput 50.
pub fn create_qat_1_8() -> QatCapabilities {
    const DEVICE_ID: u32 = 1;
    const NAME: &'static str = "Intel QAT 1.8";
    const GENERATION: &'static str = "1.8";
    const ACCEL_UNITS: u32 = 16;
    const MAX_THROUGHPUT_GBPS: u32 = 50;

    QatCapabilities::new(
        DEVICE_ID,
        NAME,
        GENERATION,
        ACCEL_UNITS,
        MAX_THROUGHPUT_GBPS,
    )
}
/// Free-function shorthand for `QatEngine::init_hardware`; returns its result
/// unchanged.
#[cfg(feature = "qat")]
pub fn init_qat_hardware() -> Result<(), QatError> {
QatEngine::init_hardware()
}
/// Free-function shorthand for `QatEngine::is_hardware_available`.
pub fn is_qat_hardware_available() -> bool {
QatEngine::is_hardware_available()
}
// Unit tests for the software model. Except for the last test they use
// locally constructed `QatManager` values rather than the global engine, and
// none of them initializes real hardware.
#[cfg(test)]
mod tests {
use super::*;
// The 2.0 preset carries the expected sizing and enables Zstd/ChaCha.
#[test]
fn test_qat_capabilities() {
let caps = create_qat_2_0();
assert_eq!(caps.name, "Intel QAT 2.0");
assert_eq!(caps.generation, "2.0");
assert_eq!(caps.accel_units, 32);
assert_eq!(caps.max_throughput_gbps, 100);
assert_eq!(caps.sym_crypto_ops_per_sec, 32_000_000);
assert!(caps.supports_zstd);
assert!(caps.supports_chacha);
}
// The 1.8 preset disables the features gated on generation "2.0".
#[test]
fn test_qat_1_8_capabilities() {
let caps = create_qat_1_8();
assert_eq!(caps.generation, "1.8");
assert!(!caps.supports_zstd);
assert!(!caps.supports_chacha);
}
// Registering a profile makes the manager "available" but never flips the
// hardware flag.
#[test]
fn test_device_registration() {
let mut mgr = QatManager::new();
assert!(!mgr.is_available());
assert!(!mgr.is_hardware_available());
mgr.register_device(create_qat_2_0());
assert!(mgr.is_available());
assert!(!mgr.is_hardware_available());
}
// An accepted submission bumps the total, accelerated and per-service counters.
#[test]
fn test_submit_command() {
let mut mgr = QatManager::new();
mgr.register_device(create_qat_2_0());
let cmd_id = mgr.submit(QatService::SymmetricCrypto, 1024, 0);
assert!(cmd_id.is_some());
let stats = mgr.stats();
assert_eq!(stats.hw_accel_ops, 1);
assert_eq!(stats.total_ops, 1);
assert_eq!(stats.sym_crypto_ops, 1);
}
// Without a registered profile, submit returns None and counts a CPU fallback.
#[test]
fn test_cpu_fallback() {
let mut mgr = QatManager::new();
let cmd_id = mgr.submit(QatService::Compression, 1024, 0);
assert!(cmd_id.is_none());
let stats = mgr.stats();
assert_eq!(stats.cpu_fallback_ops, 1);
assert_eq!(stats.hw_accel_ops, 0);
}
// Completing a command echoes its id/service/output size; `hw_accelerated`
// stays false because no hardware was initialized.
#[test]
fn test_complete_command() {
let mut mgr = QatManager::new();
mgr.register_device(create_qat_2_0());
let cmd_id = mgr
.submit(QatService::SymmetricCrypto, 1024, 0)
.expect("should submit");
let result = mgr.complete(cmd_id, 1040, 100).expect("should complete");
assert_eq!(result.cmd_id, cmd_id);
assert_eq!(result.service, QatService::SymmetricCrypto);
assert_eq!(result.output_size, 1040);
assert!(!result.hw_accelerated);
}
// Spot-checks the fixed latency table.
#[test]
fn test_expected_latency() {
let caps = create_qat_2_0();
assert_eq!(caps.expected_latency_us(QatService::SymmetricCrypto), 10);
assert_eq!(caps.expected_latency_us(QatService::AsymmetricCrypto), 100);
assert_eq!(caps.expected_latency_us(QatService::Compression), 20);
}
// Symmetric baseline 100 us over 10 us of device time gives a 10x speedup.
#[test]
fn test_speedup_calculation() {
let result = QatResult {
cmd_id: 1,
service: QatService::SymmetricCrypto,
output_size: 1040,
qat_time_us: 10,
total_time_us: 12,
hw_accelerated: true,
};
let speedup = result.speedup_vs_cpu(QatService::SymmetricCrypto);
assert!((speedup - 10.0).abs() < 0.1);
}
// 10 us per operation corresponds to 100,000 operations per second.
#[test]
fn test_ops_per_second() {
let result = QatResult {
cmd_id: 1,
service: QatService::SymmetricCrypto,
output_size: 1024,
qat_time_us: 10,
total_time_us: 10,
hw_accelerated: true,
};
let ops = result.ops_per_second();
assert!((ops - 100_000.0).abs() < 1000.0);
}
// 100 submit/complete round trips across three services are all counted.
#[test]
fn test_statistics() {
let mut mgr = QatManager::new();
mgr.register_device(create_qat_2_0());
for i in 0..100 {
let service = match i % 3 {
0 => QatService::SymmetricCrypto,
1 => QatService::Compression,
_ => QatService::AsymmetricCrypto,
};
let cmd_id = mgr.submit(service, 1024, i).expect("should submit");
mgr.complete(cmd_id, 1024, i + 100);
}
let stats = mgr.stats();
assert_eq!(stats.total_ops, 100);
assert_eq!(stats.hw_accel_ops, 100);
assert!(stats.sym_crypto_ops > 0);
assert!(stats.compression_ops > 0);
assert!(stats.asym_crypto_ops > 0);
}
// 25 submissions before registration and 75 after give a ratio of 0.75
// (exactly representable in f32, so the equality check is safe).
#[test]
fn test_hw_accel_ratio() {
let mut mgr = QatManager::new();
for _ in 0..25 {
mgr.submit(QatService::Compression, 1024, 0);
}
mgr.register_device(create_qat_2_0());
for _ in 0..75 {
mgr.submit(QatService::Compression, 1024, 0);
}
let stats = mgr.stats();
assert_eq!(stats.total_ops, 100);
assert_eq!(stats.hw_accel_ops, 75);
assert_eq!(stats.cpu_fallback_ops, 25);
assert_eq!(stats.hw_accel_ratio(), 0.75);
}
// Chained operations flow through submit/complete like any other service.
#[test]
fn test_chained_operations() {
let mut mgr = QatManager::new();
mgr.register_device(create_qat_2_0());
let cmd_id = mgr
.submit(QatService::ChainedOps, 1_000_000, 0)
.expect("should submit");
let result = mgr.complete(cmd_id, 500_000, 100).expect("should complete");
assert_eq!(result.service, QatService::ChainedOps);
assert_eq!(mgr.stats().chained_ops, 1);
}
// Equality on the compression algorithm enum behaves as derived.
#[test]
fn test_compress_algo_enum() {
assert_eq!(QatCompressAlgo::Deflate, QatCompressAlgo::Deflate);
assert_ne!(QatCompressAlgo::Lz4, QatCompressAlgo::Zstd);
}
// Equality on the cipher algorithm enum behaves as derived.
#[test]
fn test_cipher_algo_enum() {
assert_eq!(QatCipherAlgo::AesGcm256, QatCipherAlgo::AesGcm256);
assert_ne!(QatCipherAlgo::AesGcm128, QatCipherAlgo::ChaCha20Poly1305);
}
// Equality on the error enum behaves as derived.
#[test]
fn test_qat_error_types() {
let err = QatError::NotInitialized;
assert_eq!(err, QatError::NotInitialized);
assert_ne!(err, QatError::HardwareNotAvailable);
}
// Reads the global engine: hardware is unavailable unless something in the
// same test process has initialized it.
#[test]
fn test_is_qat_hardware_available() {
assert!(!is_qat_hardware_available());
}
}