use alloc::collections::BTreeMap;
use alloc::string::String;
use alloc::vec::Vec;
use lazy_static::lazy_static;
use spin::Mutex;
/// Fabric transport over which an NVMe-oF target is reached.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NvmeofTransport {
    /// RDMA transport; labelled `"RDMA"` by [`NvmeofTransport::name`].
    Rdma,
    /// TCP transport; labelled `"TCP"` by [`NvmeofTransport::name`].
    Tcp,
    /// FC transport (presumably Fibre Channel); labelled `"FC"`.
    Fc,
}

impl NvmeofTransport {
    /// Returns the upper-case label used for this transport in log output.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Fc => "FC",
            Self::Tcp => "TCP",
            Self::Rdma => "RDMA",
        }
    }

    /// Returns the fixed latency figure associated with this transport.
    ///
    /// The `_us` suffix indicates microseconds. The values are constants
    /// baked into the code (RDMA 5, FC 20, TCP 50), not measurements.
    pub fn latency_us(&self) -> u64 {
        match *self {
            Self::Fc => 20,
            Self::Tcp => 50,
            Self::Rdma => 5,
        }
    }

    /// Returns the fixed bandwidth figure associated with this transport.
    ///
    /// The `_gbps` suffix indicates gigabits per second. The values are
    /// constants baked into the code (RDMA 200, TCP 100, FC 32).
    pub fn bandwidth_gbps(&self) -> u32 {
        match *self {
            Self::Fc => 32,
            Self::Tcp => 100,
            Self::Rdma => 200,
        }
    }
}
/// Kind of operation carried by an [`RdmaCommand`].
///
/// `NvmeofManager::complete` uses this to decide which statistics counters
/// a finished command contributes to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RdmaOp {
/// Read operation; accounted under `rdma_reads` / `bytes_read`.
Read,
/// Write operation; accounted under `rdma_writes` / `bytes_written`.
Write,
/// Compare-and-swap; accounted under `rdma_atomics`.
CompareSwap,
/// Fetch-and-add; accounted under `rdma_atomics`.
FetchAdd,
}
/// Description and connection state of a single NVMe-oF target.
#[derive(Debug, Clone)]
pub struct NvmeofTarget {
/// Identifier; `NvmeofManager` uses it as the key of its target map.
pub id: u64,
/// Address string, printed verbatim in log messages.
pub address: String,
/// Transport used to reach the target.
pub transport: NvmeofTransport,
/// Namespace identifier (presumably the NVMe NSID); `new` sets it to 1.
pub nsid: u32,
/// Capacity; `NvmeofManager::register_target` logs it divided by 1024^3 as
/// "GB", so this is presumably a byte count (TODO confirm).
pub capacity: u64,
/// Queue depth; `new` sets it to 128. Not read elsewhere in the visible code.
pub queue_depth: u16,
/// `true` between a successful `connect` and the next `disconnect`.
pub connected: bool,
/// Queue-pair number stored by `connect`; reset to 0 by `disconnect`.
pub qp_num: u32,
}
impl NvmeofTarget {
    /// Builds a target in the disconnected state.
    ///
    /// `id`, `address`, `transport` and `capacity` are stored as given. The
    /// remaining fields get fixed defaults: namespace 1, queue depth 128,
    /// not connected, queue-pair number 0.
    pub fn new(id: u64, address: String, transport: NvmeofTransport, capacity: u64) -> Self {
        Self {
            connected: false,
            qp_num: 0,
            queue_depth: 128,
            nsid: 1,
            capacity,
            transport,
            address,
            id,
        }
    }

    /// Marks the target as connected using queue pair `qp_num`.
    ///
    /// Logs the connection attempt, then records the queue-pair number.
    ///
    /// # Errors
    ///
    /// Returns `"Already connected"` (leaving the target untouched) when the
    /// target is already in the connected state.
    pub fn connect(&mut self, qp_num: u32) -> Result<(), &'static str> {
        if !self.connected {
            crate::lcpfs_println!(
                "[ NVMEOF] Connecting to {} via {} (QP: {})",
                self.address,
                self.transport.name(),
                qp_num
            );
            self.connected = true;
            self.qp_num = qp_num;
            Ok(())
        } else {
            Err("Already connected")
        }
    }

    /// Returns the target to the disconnected state and clears its queue-pair
    /// number. Calling this on a target that is not connected is harmless.
    pub fn disconnect(&mut self) {
        self.qp_num = 0;
        self.connected = false;
    }
}
/// A memory region recorded by `NvmeofManager::register_memory`.
#[derive(Debug, Clone)]
pub struct RdmaMemoryRegion {
/// Local address of the region (logged as `local=0x…` on registration).
pub laddr: u64,
/// Remote address of the region (logged as `remote=0x…` on registration).
pub raddr: u64,
/// Size of the region; units are not shown in this file (presumably bytes).
pub size: u64,
/// Access permissions attached to the region.
pub access: RdmaAccess,
}
/// Set of permission flags attached to an RDMA memory region.
#[derive(Debug, Clone, Copy)]
pub struct RdmaAccess {
    /// Local reads are permitted.
    pub local_read: bool,
    /// Local writes are permitted.
    pub local_write: bool,
    /// Remote reads are permitted.
    pub remote_read: bool,
    /// Remote writes are permitted.
    pub remote_write: bool,
    /// Atomic operations are permitted.
    pub atomic: bool,
}

impl RdmaAccess {
    /// Returns a permission set with every flag enabled.
    pub fn full() -> Self {
        // Start from the read-only set and switch on the mutating permissions.
        Self {
            local_write: true,
            remote_write: true,
            atomic: true,
            ..Self::read_only()
        }
    }

    /// Returns a permission set that allows local and remote reads only;
    /// writes and atomics are disabled.
    pub fn read_only() -> Self {
        Self {
            atomic: false,
            remote_write: false,
            local_write: false,
            remote_read: true,
            local_read: true,
        }
    }
}
/// A submitted RDMA command that is tracked until it is completed.
#[derive(Debug, Clone)]
pub struct RdmaCommand {
/// Identifier assigned at submission; key of the manager's pending map.
pub cmd_id: u64,
/// Kind of operation requested.
pub op: RdmaOp,
/// Identifier of the target the command was issued against.
pub target_id: u64,
/// Local address supplied by the submitter.
pub laddr: u64,
/// Remote address supplied by the submitter.
pub raddr: u64,
/// Transfer size supplied by the submitter (presumably bytes).
pub size: u64,
/// Submission time supplied by the caller; `NvmeofManager::complete`
/// subtracts it from the completion time to obtain `latency_us`, so it is
/// presumably expressed in microseconds (TODO confirm with callers).
pub timestamp: u64,
}
/// Result record returned by `NvmeofManager::complete` for one command.
#[derive(Debug, Clone)]
pub struct RdmaCompletion {
/// Identifier of the command that completed.
pub cmd_id: u64,
/// Success flag exactly as passed to `complete` by the caller.
pub success: bool,
/// Byte count reported for the command, as filled in by `complete`.
pub bytes: u64,
/// Completion time minus submission timestamp, saturating at zero.
pub latency_us: u64,
}
/// Aggregate counters maintained by `NvmeofManager::complete`.
///
/// All fields start at zero (`Default`).
#[derive(Debug, Clone, Default)]
pub struct NvmeofStats {
/// Number of completed `RdmaOp::Read` commands.
pub rdma_reads: u64,
/// Number of completed `RdmaOp::Write` commands.
pub rdma_writes: u64,
/// Number of completed `RdmaOp::CompareSwap` / `RdmaOp::FetchAdd` commands.
pub rdma_atomics: u64,
/// Accumulated byte count of read commands, as recorded by `complete`.
pub bytes_read: u64,
/// Accumulated byte count of write commands, as recorded by `complete`.
pub bytes_written: u64,
/// Accumulated latency divided (integer division) by the number of
/// completed reads + writes + atomics.
pub avg_latency_us: u64,
/// Incremented once for every completed command, regardless of kind.
pub zero_copy_ops: u64,
}
// Process-wide manager instance used by the `Nvmeof` facade functions.
// `lazy_static!` constructs it on first access; the `spin::Mutex` serialises
// every access, so each facade call holds the lock for its whole duration.
lazy_static! {
static ref NVMEOF_MANAGER: Mutex<NvmeofManager> = Mutex::new(NvmeofManager::new());
}
/// Bookkeeping for registered targets, memory regions, in-flight commands
/// and completion statistics.
pub struct NvmeofManager {
// Registered targets keyed by `NvmeofTarget::id`.
targets: BTreeMap<u64, NvmeofTarget>,
// Memory regions in registration order; only ever appended to here.
memory_regions: Vec<RdmaMemoryRegion>,
// Commands submitted but not yet completed, keyed by command id.
pending: BTreeMap<u64, RdmaCommand>,
// Id handed to the next submitted command; starts at 1.
next_cmd_id: u64,
// Counters updated by `complete`.
stats: NvmeofStats,
// Sum of per-command latencies; numerator of `stats.avg_latency_us`.
total_latency: u64,
}
impl Default for NvmeofManager {
fn default() -> Self {
Self::new()
}
}
impl NvmeofManager {
pub fn new() -> Self {
Self {
targets: BTreeMap::new(),
memory_regions: Vec::new(),
pending: BTreeMap::new(),
next_cmd_id: 1,
stats: NvmeofStats::default(),
total_latency: 0,
}
}
pub fn register_target(&mut self, target: NvmeofTarget) {
crate::lcpfs_println!(
"[ NVMEOF] Registered target {} ({} GB via {})",
target.address,
target.capacity / 1024 / 1024 / 1024,
target.transport.name()
);
self.targets.insert(target.id, target);
}
pub fn connect(&mut self, target_id: u64) -> Result<(), &'static str> {
let target = self.targets.get_mut(&target_id).ok_or("Target not found")?;
let qp_num = (target_id as u32) * 100 + 1;
target.connect(qp_num)?;
Ok(())
}
pub fn register_memory(
&mut self,
laddr: u64,
raddr: u64,
size: u64,
access: RdmaAccess,
) -> Result<(), &'static str> {
let region = RdmaMemoryRegion {
laddr,
raddr,
size,
access,
};
self.memory_regions.push(region);
crate::lcpfs_println!(
"[ NVMEOF] Registered RDMA MR: local=0x{:x}, remote=0x{:x}, size={}",
laddr,
raddr,
size
);
Ok(())
}
pub fn rdma_read(
&mut self,
target_id: u64,
laddr: u64,
raddr: u64,
size: u64,
timestamp: u64,
) -> Result<u64, &'static str> {
let target = self.targets.get(&target_id).ok_or("Target not found")?;
if !target.connected {
return Err("Target not connected");
}
let cmd_id = self.next_cmd_id;
self.next_cmd_id += 1;
let cmd = RdmaCommand {
cmd_id,
op: RdmaOp::Read,
target_id,
laddr,
raddr,
size,
timestamp,
};
self.pending.insert(cmd_id, cmd);
Ok(cmd_id)
}
pub fn rdma_write(
&mut self,
target_id: u64,
laddr: u64,
raddr: u64,
size: u64,
timestamp: u64,
) -> Result<u64, &'static str> {
let target = self.targets.get(&target_id).ok_or("Target not found")?;
if !target.connected {
return Err("Target not connected");
}
let cmd_id = self.next_cmd_id;
self.next_cmd_id += 1;
let cmd = RdmaCommand {
cmd_id,
op: RdmaOp::Write,
target_id,
laddr,
raddr,
size,
timestamp,
};
self.pending.insert(cmd_id, cmd);
Ok(cmd_id)
}
pub fn complete(
&mut self,
cmd_id: u64,
success: bool,
completion_time: u64,
) -> Result<RdmaCompletion, &'static str> {
let cmd = self.pending.remove(&cmd_id).ok_or("Command not found")?;
let latency_us = completion_time.saturating_sub(cmd.timestamp);
let completion = RdmaCompletion {
cmd_id,
success,
bytes: cmd.size,
latency_us,
};
match cmd.op {
RdmaOp::Read => {
self.stats.rdma_reads += 1;
self.stats.bytes_read += cmd.size;
}
RdmaOp::Write => {
self.stats.rdma_writes += 1;
self.stats.bytes_written += cmd.size;
}
RdmaOp::CompareSwap | RdmaOp::FetchAdd => {
self.stats.rdma_atomics += 1;
}
}
self.stats.zero_copy_ops += 1;
self.total_latency += latency_us;
let total_ops = self.stats.rdma_reads + self.stats.rdma_writes + self.stats.rdma_atomics;
if total_ops > 0 {
self.stats.avg_latency_us = self.total_latency / total_ops;
}
Ok(completion)
}
pub fn stats(&self) -> NvmeofStats {
self.stats.clone()
}
pub fn target_count(&self) -> usize {
self.targets.len()
}
pub fn connected_count(&self) -> usize {
self.targets.values().filter(|t| t.connected).count()
}
}
/// Zero-sized facade whose associated functions forward to the global
/// `NVMEOF_MANAGER`, taking its lock for the duration of each call.
pub struct Nvmeof;

impl Nvmeof {
    /// Registers `target` with the global manager.
    pub fn register_target(target: NvmeofTarget) {
        NVMEOF_MANAGER.lock().register_target(target);
    }

    /// Connects the target `target_id` through the global manager.
    ///
    /// Errors are those of `NvmeofManager::connect`.
    pub fn connect(target_id: u64) -> Result<(), &'static str> {
        NVMEOF_MANAGER.lock().connect(target_id)
    }

    /// Records a memory region with the global manager.
    pub fn register_memory(
        laddr: u64,
        raddr: u64,
        size: u64,
        access: RdmaAccess,
    ) -> Result<(), &'static str> {
        NVMEOF_MANAGER
            .lock()
            .register_memory(laddr, raddr, size, access)
    }

    /// Queues a read on the global manager and returns the command id.
    ///
    /// Errors are those of `NvmeofManager::rdma_read`.
    pub fn rdma_read(
        target_id: u64,
        laddr: u64,
        raddr: u64,
        size: u64,
        timestamp: u64,
    ) -> Result<u64, &'static str> {
        NVMEOF_MANAGER
            .lock()
            .rdma_read(target_id, laddr, raddr, size, timestamp)
    }

    /// Queues a write on the global manager and returns the command id.
    ///
    /// Errors are those of `NvmeofManager::rdma_write`.
    pub fn rdma_write(
        target_id: u64,
        laddr: u64,
        raddr: u64,
        size: u64,
        timestamp: u64,
    ) -> Result<u64, &'static str> {
        NVMEOF_MANAGER
            .lock()
            .rdma_write(target_id, laddr, raddr, size, timestamp)
    }

    /// Completes a pending command on the global manager.
    ///
    /// Errors are those of `NvmeofManager::complete`.
    pub fn complete(
        cmd_id: u64,
        success: bool,
        completion_time: u64,
    ) -> Result<RdmaCompletion, &'static str> {
        NVMEOF_MANAGER
            .lock()
            .complete(cmd_id, success, completion_time)
    }

    /// Returns a snapshot of the global manager's statistics.
    pub fn stats() -> NvmeofStats {
        NVMEOF_MANAGER.lock().stats()
    }
}
// Unit tests. Every manager test builds its own `NvmeofManager`, so none of
// them touches the global `NVMEOF_MANAGER` instance.
#[cfg(test)]
mod tests {
use super::*;
// The hard-coded transport figures keep their relative ordering.
#[test]
fn test_transport_characteristics() {
assert!(NvmeofTransport::Rdma.latency_us() < NvmeofTransport::Tcp.latency_us());
assert!(NvmeofTransport::Rdma.bandwidth_gbps() > NvmeofTransport::Fc.bandwidth_gbps());
}
// A freshly built target stores its arguments and starts disconnected.
#[test]
fn test_target_creation() {
let target = NvmeofTarget::new(
1,
"192.168.1.100".into(),
NvmeofTransport::Rdma,
1_000_000_000_000,
);
assert_eq!(target.id, 1);
assert_eq!(target.transport, NvmeofTransport::Rdma);
assert!(!target.connected);
}
// The first connect succeeds and records the QP; a second connect is rejected.
#[test]
fn test_target_connection() {
let mut target = NvmeofTarget::new(
1,
"192.168.1.100".into(),
NvmeofTransport::Rdma,
1_000_000_000_000,
);
assert!(target.connect(100).is_ok());
assert!(target.connected);
assert_eq!(target.qp_num, 100);
assert!(target.connect(101).is_err());
}
// Spot-checks the two permission presets.
#[test]
fn test_rdma_access_rights() {
let full = RdmaAccess::full();
assert!(full.local_read && full.remote_write && full.atomic);
let ro = RdmaAccess::read_only();
assert!(ro.local_read && !ro.remote_write && !ro.atomic);
}
// Registering a target does not connect it.
#[test]
fn test_manager_registration() {
let mut mgr = NvmeofManager::new();
let target =
NvmeofTarget::new(1, "10.0.0.1".into(), NvmeofTransport::Rdma, 500_000_000_000);
mgr.register_target(target);
assert_eq!(mgr.target_count(), 1);
assert_eq!(mgr.connected_count(), 0);
}
// Memory registration appends exactly one region with the given size.
#[test]
fn test_memory_registration() {
let mut mgr = NvmeofManager::new();
let result = mgr.register_memory(0x1000, 0x2000, 4096, RdmaAccess::full());
assert!(result.is_ok());
assert_eq!(mgr.memory_regions.len(), 1);
assert_eq!(mgr.memory_regions[0].size, 4096);
}
// Command ids are sequential from 1; completing the read reports its size
// and a latency of 105 - 100 = 5.
#[test]
fn test_rdma_read_write() {
let mut mgr = NvmeofManager::new();
let mut target =
NvmeofTarget::new(1, "10.0.0.1".into(), NvmeofTransport::Rdma, 1_000_000_000);
// The target is connected directly before registration, so the manager's
// own `connect` is not exercised here.
target.connect(100).expect("test: operation should succeed");
mgr.register_target(target);
let cmd_id = mgr
.rdma_read(1, 0x1000, 0x2000, 4096, 100)
.expect("test: operation should succeed");
assert_eq!(cmd_id, 1);
let cmd_id2 = mgr
.rdma_write(1, 0x3000, 0x4000, 8192, 200)
.expect("test: operation should succeed");
assert_eq!(cmd_id2, 2);
let completion = mgr
.complete(cmd_id, true, 105)
.expect("test: operation should succeed");
assert_eq!(completion.bytes, 4096);
assert_eq!(completion.latency_us, 5);
assert_eq!(mgr.stats.rdma_reads, 1);
assert_eq!(mgr.stats.bytes_read, 4096);
}
// One read and one write are completed; latencies are 5 and 10, so the
// integer average is non-zero.
#[test]
fn test_completion_tracking() {
let mut mgr = NvmeofManager::new();
let mut target =
NvmeofTarget::new(1, "10.0.0.1".into(), NvmeofTransport::Tcp, 1_000_000_000);
target.connect(100).expect("test: operation should succeed");
mgr.register_target(target);
mgr.rdma_read(1, 0x1000, 0x2000, 4096, 100)
.expect("test: operation should succeed");
mgr.rdma_write(1, 0x3000, 0x4000, 8192, 110)
.expect("test: operation should succeed");
mgr.complete(1, true, 105)
.expect("test: operation should succeed");
mgr.complete(2, true, 120)
.expect("test: operation should succeed");
let stats = mgr.stats();
assert_eq!(stats.rdma_reads, 1);
assert_eq!(stats.rdma_writes, 1);
assert_eq!(stats.zero_copy_ops, 2);
assert!(stats.avg_latency_us > 0);
}
// Submitting against a registered but unconnected target is an error.
#[test]
fn test_disconnected_target_error() {
let mut mgr = NvmeofManager::new();
let target = NvmeofTarget::new(1, "10.0.0.1".into(), NvmeofTransport::Rdma, 1_000_000_000);
mgr.register_target(target);
let result = mgr.rdma_read(1, 0x1000, 0x2000, 4096, 100);
assert!(result.is_err());
}
// Two reads with latencies 110 - 100 = 10 and 220 - 200 = 20 average to 15.
#[test]
fn test_latency_averaging() {
let mut mgr = NvmeofManager::new();
let mut target =
NvmeofTarget::new(1, "10.0.0.1".into(), NvmeofTransport::Rdma, 1_000_000_000);
target.connect(100).expect("test: operation should succeed");
mgr.register_target(target);
let cmd1 = mgr
.rdma_read(1, 0x1000, 0x2000, 4096, 100)
.expect("test: operation should succeed");
let cmd2 = mgr
.rdma_read(1, 0x3000, 0x4000, 4096, 200)
.expect("test: operation should succeed");
mgr.complete(cmd1, true, 110)
.expect("test: operation should succeed"); mgr.complete(cmd2, true, 220)
.expect("test: operation should succeed");
let stats = mgr.stats();
assert_eq!(stats.avg_latency_us, 15); }
// Byte and operation counters are tracked separately for reads and writes.
#[test]
fn test_statistics() {
let mut mgr = NvmeofManager::new();
let mut target =
NvmeofTarget::new(1, "10.0.0.1".into(), NvmeofTransport::Rdma, 1_000_000_000);
target.connect(100).expect("test: operation should succeed");
mgr.register_target(target);
let cmd1 = mgr
.rdma_read(1, 0x1000, 0x2000, 1024, 100)
.expect("test: operation should succeed");
let cmd2 = mgr
.rdma_write(1, 0x3000, 0x4000, 2048, 110)
.expect("test: operation should succeed");
mgr.complete(cmd1, true, 105)
.expect("test: operation should succeed");
mgr.complete(cmd2, true, 120)
.expect("test: operation should succeed");
let stats = mgr.stats();
assert_eq!(stats.rdma_reads, 1);
assert_eq!(stats.rdma_writes, 1);
assert_eq!(stats.bytes_read, 1024);
assert_eq!(stats.bytes_written, 2048);
assert_eq!(stats.zero_copy_ops, 2);
}
}