#![deny(missing_docs)]
#![warn(rust_2018_idioms)]
#![cfg_attr(feature = "cargo-clippy", allow(doc_markdown))]
use std::convert::TryInto;
use std::ffi::CStr;
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::os::raw::c_void;
use std::ptr;
/// The physical port this wrapper always uses. Every query (port
/// attributes, GID) and every queue-pair transition below is hard-wired to
/// this port.
const PORT_NUM: u8 = 1;
pub use ffi::ibv_qp_type;
pub use ffi::ibv_wc;
pub use ffi::ibv_wc_opcode;
pub use ffi::ibv_wc_status;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
pub use ffi::ibv_access_flags;
mod sliceindex;
/// Enumerate the RDMA devices currently available on this host.
///
/// # Errors
///
/// If `ibv_get_device_list` returns a null list, the calling thread's last
/// OS error is returned.
pub fn devices() -> io::Result<DeviceList> {
    let mut count = 0i32;
    let list = unsafe { ffi::ibv_get_device_list(&mut count as *mut _) };
    if list.is_null() {
        return Err(io::Error::last_os_error());
    }
    // SAFETY: on success the library returns an array of `count` device
    // pointers that remains valid until `ibv_free_device_list` is called —
    // which `DeviceList::drop` takes care of.
    let slice = unsafe { std::slice::from_raw_parts_mut(list, count as usize) };
    Ok(DeviceList(slice))
}
/// The list of available RDMA devices, as returned by
/// `ibv_get_device_list`. Freed via `ibv_free_device_list` on drop.
pub struct DeviceList(&'static mut [*mut ffi::ibv_device]);

// NOTE(review): the raw device pointers are only ever read through shared
// methods below, and the list is freed exactly once in `Drop` — presumed
// sound to share/send across threads; confirm against libibverbs docs.
unsafe impl Sync for DeviceList {}
unsafe impl Send for DeviceList {}
impl Drop for DeviceList {
    fn drop(&mut self) {
        // Release the array allocated by `ibv_get_device_list` in
        // `devices()`.
        unsafe { ffi::ibv_free_device_list(self.0.as_mut_ptr()) };
    }
}
impl DeviceList {
pub fn iter(&self) -> DeviceListIter<'_> {
DeviceListIter { list: self, i: 0 }
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn get(&self, index: usize) -> Option<Device<'_>> {
self.0.get(index).map(|d| d.into())
}
}
impl<'a> IntoIterator for &'a DeviceList {
type Item = <DeviceListIter<'a> as Iterator>::Item;
type IntoIter = DeviceListIter<'a>;
fn into_iter(self) -> Self::IntoIter {
DeviceListIter { list: self, i: 0 }
}
}
/// Iterator over the devices of a [`DeviceList`].
pub struct DeviceListIter<'iter> {
    list: &'iter DeviceList,
    // Index of the next device to yield.
    i: usize,
}
impl<'iter> Iterator for DeviceListIter<'iter> {
    type Item = Device<'iter>;

    fn next(&mut self) -> Option<Self::Item> {
        // Exhausted — `?` returns `None` without touching the cursor.
        let dev = self.list.0.get(self.i)?;
        self.i += 1;
        Some(dev.into())
    }
}
/// A single RDMA device, borrowed from a [`DeviceList`].
pub struct Device<'devlist>(&'devlist *mut ffi::ibv_device);

// NOTE(review): the raw device pointer is only read through this wrapper —
// presumed sound to share/send; confirm against libibverbs threading docs.
unsafe impl<'devlist> Sync for Device<'devlist> {}
unsafe impl<'devlist> Send for Device<'devlist> {}

impl<'d> From<&'d *mut ffi::ibv_device> for Device<'d> {
    fn from(d: &'d *mut ffi::ibv_device) -> Self {
        Device(d)
    }
}
impl<'devlist> Device<'devlist> {
    /// Open this device, returning a [`Context`] bound to it.
    ///
    /// # Errors
    ///
    /// See [`Context::with_device`]'s failure cases (open, port query,
    /// port state, GID query).
    pub fn open(&self) -> io::Result<Context> {
        Context::with_device(*self.0)
    }

    /// The device's kernel name (e.g. `mlx5_0`), or `None` if
    /// `ibv_get_device_name` returns null.
    pub fn name(&self) -> Option<&'devlist CStr> {
        let name_ptr = unsafe { ffi::ibv_get_device_name(*self.0) };
        if name_ptr.is_null() {
            None
        } else {
            Some(unsafe { CStr::from_ptr(name_ptr) })
        }
    }

    /// The device's globally unique identifier.
    ///
    /// A zero GUID is treated as failure and reported through the last OS
    /// error.
    pub fn guid(&self) -> io::Result<u64> {
        let guid = unsafe { ffi::ibv_get_device_guid(*self.0) };
        if guid == 0 {
            Err(io::Error::last_os_error())
        } else {
            Ok(guid)
        }
    }

    /// The device's stable verbs index, or an `Unsupported` error when
    /// `ibv_get_device_index` reports -1 (index not known).
    pub fn index(&self) -> io::Result<i32> {
        let idx = unsafe { ffi::ibv_get_device_index(*self.0) };
        if idx == -1 {
            Err(io::Error::new(
                io::ErrorKind::Unsupported,
                "device index not known",
            ))
        } else {
            Ok(idx)
        }
    }
}
/// An open RDMA device context.
///
/// Holds the attributes of port `PORT_NUM` and its GID (index 0), both
/// queried once at open time.
pub struct Context {
    ctx: *mut ffi::ibv_context,
    // Attributes of PORT_NUM, captured in `with_device`.
    port_attr: ffi::ibv_port_attr,
    // GID index 0 of PORT_NUM, captured in `with_device`.
    gid: Gid,
}

// NOTE(review): the context pointer is shared read-only here; libibverbs
// is presumed thread-safe for these calls — confirm against its docs.
unsafe impl Sync for Context {}
unsafe impl Send for Context {}
impl Context {
    /// Open a verbs context on `dev` and sanity-check port `PORT_NUM`.
    ///
    /// The port attributes and GID (index 0) are queried here once and
    /// cached on the `Context` for later use during queue-pair setup.
    ///
    /// # Errors
    ///
    /// Fails if the device cannot be opened, the port query fails, the
    /// port is in neither the ACTIVE nor the ARMED state, or the GID query
    /// fails.
    ///
    /// # Panics
    ///
    /// Panics if `dev` is null.
    fn with_device(dev: *mut ffi::ibv_device) -> io::Result<Context> {
        assert!(!dev.is_null());
        let ctx = unsafe { ffi::ibv_open_device(dev) };
        if ctx.is_null() {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "failed to open device".to_string(),
            ));
        }
        let mut port_attr = ffi::ibv_port_attr::default();
        // ibv_query_port returns an errno-style code (0 on success).
        let errno = unsafe {
            ffi::ibv_query_port(
                ctx,
                PORT_NUM,
                &mut port_attr as *mut ffi::ibv_port_attr as *mut _,
            )
        };
        if errno != 0 {
            return Err(io::Error::from_raw_os_error(errno));
        }
        // Only accept ports that can actually carry traffic.
        match port_attr.state {
            ffi::ibv_port_state::IBV_PORT_ACTIVE | ffi::ibv_port_state::IBV_PORT_ARMED => {}
            _ => {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "port is not ACTIVE or ARMED".to_string(),
                ));
            }
        }
        let mut gid = Gid::default();
        let ok = unsafe { ffi::ibv_query_gid(ctx, PORT_NUM, 0, gid.as_mut()) };
        if ok != 0 {
            return Err(io::Error::last_os_error());
        }
        Ok(Context {
            ctx,
            port_attr,
            gid,
        })
    }

    /// Create a completion queue with at least `min_cq_entries` entries.
    ///
    /// `id` is an opaque caller-chosen tag stored as the CQ's user context
    /// pointer; it is not interpreted by this library.
    ///
    /// # Errors
    ///
    /// Returns the last OS error if `ibv_create_cq` fails.
    pub fn create_cq(&self, min_cq_entries: i32, id: isize) -> io::Result<CompletionQueue<'_>> {
        let cq = unsafe {
            ffi::ibv_create_cq(
                self.ctx,
                min_cq_entries,
                // Smuggle the integer tag through the context pointer with a
                // plain int-to-pointer cast. The previous
                // `ptr::null::<c_void>().offset(id)` form was undefined
                // behavior: `offset` may never be called on a null pointer.
                id as *mut c_void,
                // No completion channel, completion vector 0.
                ptr::null_mut(),
                0,
            )
        };
        if cq.is_null() {
            Err(io::Error::last_os_error())
        } else {
            Ok(CompletionQueue {
                _phantom: PhantomData,
                cq,
            })
        }
    }

    /// Allocate a protection domain on this context.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` if `ibv_alloc_pd` fails (it does not set errno).
    pub fn alloc_pd(&self) -> Result<ProtectionDomain<'_>, ()> {
        let pd = unsafe { ffi::ibv_alloc_pd(self.ctx) };
        if pd.is_null() {
            Err(())
        } else {
            Ok(ProtectionDomain { ctx: self, pd })
        }
    }
}
impl Drop for Context {
    fn drop(&mut self) {
        let ok = unsafe { ffi::ibv_close_device(self.ctx) };
        // A failed close indicates a bug (e.g. resources still open);
        // panicking in drop is deliberate here.
        assert_eq!(ok, 0);
    }
}
/// A completion queue created on a [`Context`]; destroyed on drop.
pub struct CompletionQueue<'ctx> {
    // Ties the CQ's lifetime to the owning `Context` without storing a
    // reference.
    _phantom: PhantomData<&'ctx ()>,
    cq: *mut ffi::ibv_cq,
}

// NOTE(review): presumed sound because libibverbs CQ operations are
// documented as thread-safe — confirm before relying on concurrent polls.
unsafe impl<'a> Send for CompletionQueue<'a> {}
unsafe impl<'a> Sync for CompletionQueue<'a> {}
impl<'ctx> CompletionQueue<'ctx> {
    /// Poll for completed work requests (non-blocking).
    ///
    /// Fills `completions` with up to `completions.len()` entries and
    /// returns the prefix that was actually written; an empty result just
    /// means nothing has completed yet.
    ///
    /// Returns `Err(())` if the driver's `poll_cq` reports an error (a
    /// negative return value).
    #[inline]
    pub fn poll<'c>(
        &self,
        completions: &'c mut [ffi::ibv_wc],
    ) -> Result<&'c mut [ffi::ibv_wc], ()> {
        // Call the driver's poll_cq directly through the context ops table
        // rather than via a wrapper function.
        let ctx: *mut ffi::ibv_context = unsafe { &*self.cq }.context;
        let ops = &mut unsafe { &mut *ctx }.ops;
        let n = unsafe {
            ops.poll_cq.as_mut().unwrap()(
                self.cq,
                completions.len() as i32,
                completions.as_mut_ptr(),
            )
        };
        if n < 0 {
            Err(())
        } else {
            // poll_cq wrote exactly `n` entries at the front of the slice.
            Ok(&mut completions[0..n as usize])
        }
    }
}
impl<'a> Drop for CompletionQueue<'a> {
    fn drop(&mut self) {
        let errno = unsafe { ffi::ibv_destroy_cq(self.cq) };
        // Destruction failure (e.g. QPs still attached) is a bug; surface
        // it loudly rather than leak the CQ.
        if errno != 0 {
            let e = io::Error::from_raw_os_error(errno);
            panic!("{}", e);
        }
    }
}
/// Builder for a [`QueuePair`]; obtain one via `ProtectionDomain::create_qp`.
pub struct QueuePairBuilder<'res> {
    // Opaque user tag stored in the QP's qp_context field.
    ctx: isize,
    pd: &'res ProtectionDomain<'res>,
    send: &'res CompletionQueue<'res>,
    max_send_wr: u32,
    recv: &'res CompletionQueue<'res>,
    max_recv_wr: u32,
    max_send_sge: u32,
    max_recv_sge: u32,
    max_inline_data: u32,
    qp_type: ffi::ibv_qp_type::Type,
    access: ffi::ibv_access_flags,
    // Timeout exponent applied during the RTS transition.
    timeout: u8,
    // Transport retry count (0-7), enforced by `set_retry_count`.
    retry_count: u8,
    // RNR retry count (0-7), enforced by `set_rnr_retry`.
    rnr_retry: u8,
    // Minimum RNR NAK timer applied during the RTR transition.
    min_rnr_timer: u8,
}
impl<'res> QueuePairBuilder<'res> {
    /// Start a builder for a queue pair on `pd` using `send`/`recv`
    /// completion queues.
    ///
    /// Defaults: 1 scatter/gather entry each way, no inline data,
    /// local-write access only, `min_rnr_timer` 16, `retry_count` 6,
    /// `rnr_retry` 6, `timeout` 4.
    fn new<'scq, 'rcq, 'pd>(
        pd: &'pd ProtectionDomain<'_>,
        send: &'scq CompletionQueue<'_>,
        max_send_wr: u32,
        recv: &'rcq CompletionQueue<'_>,
        max_recv_wr: u32,
        qp_type: ffi::ibv_qp_type::Type,
    ) -> QueuePairBuilder<'res>
    where
        'scq: 'res,
        'rcq: 'res,
        'pd: 'res,
    {
        QueuePairBuilder {
            ctx: 0,
            pd,
            send,
            max_send_wr,
            recv,
            max_recv_wr,
            max_send_sge: 1,
            max_recv_sge: 1,
            max_inline_data: 0,
            qp_type,
            access: ffi::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE,
            min_rnr_timer: 16,
            retry_count: 6,
            rnr_retry: 6,
            timeout: 4,
        }
    }

    /// Replace the access flags applied when the QP reaches INIT.
    pub fn set_access(&mut self, access: ffi::ibv_access_flags) -> &mut Self {
        self.access = access;
        self
    }

    /// Additionally allow remote reads and writes on this QP.
    pub fn allow_remote_rw(&mut self) -> &mut Self {
        self.access = self.access
            | ffi::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE
            | ffi::ibv_access_flags::IBV_ACCESS_REMOTE_READ;
        self
    }

    /// Set the minimum RNR NAK timer used in the RTR transition.
    pub fn set_min_rnr_timer(&mut self, timer: u8) -> &mut Self {
        self.min_rnr_timer = timer;
        self
    }

    /// Set the QP timeout exponent used in the RTS transition.
    pub fn set_timeout(&mut self, timeout: u8) -> &mut Self {
        self.timeout = timeout;
        self
    }

    /// Set the transport retry count.
    ///
    /// # Panics
    ///
    /// Panics if `count > 7` (the field is 3 bits wide).
    pub fn set_retry_count(&mut self, count: u8) -> &mut Self {
        assert!(count <= 7);
        self.retry_count = count;
        self
    }

    /// Set the RNR retry count.
    ///
    /// # Panics
    ///
    /// Panics if `n > 7` (the field is 3 bits wide; 7 means "retry forever").
    pub fn set_rnr_retry(&mut self, n: u8) -> &mut Self {
        assert!(n <= 7);
        self.rnr_retry = n;
        self
    }

    /// Set the opaque user tag stored in the QP's `qp_context` field.
    pub fn set_context(&mut self, ctx: isize) -> &mut Self {
        self.ctx = ctx;
        self
    }

    /// Create the queue pair (in the RESET state), ready for `handshake`.
    ///
    /// # Errors
    ///
    /// Returns the last OS error if `ibv_create_qp` fails.
    pub fn build(&self) -> io::Result<PreparedQueuePair<'res>> {
        let mut attr = ffi::ibv_qp_init_attr {
            // Plain int-to-pointer cast for the user tag. The previous
            // `ptr::null::<c_void>().offset(self.ctx)` form was undefined
            // behavior: `offset` may never be called on a null pointer.
            qp_context: self.ctx as *mut c_void,
            send_cq: self.send.cq as *const _ as *mut _,
            recv_cq: self.recv.cq as *const _ as *mut _,
            srq: ptr::null::<ffi::ibv_srq>() as *mut _,
            cap: ffi::ibv_qp_cap {
                max_send_wr: self.max_send_wr,
                max_recv_wr: self.max_recv_wr,
                max_send_sge: self.max_send_sge,
                max_recv_sge: self.max_recv_sge,
                max_inline_data: self.max_inline_data,
            },
            qp_type: self.qp_type,
            sq_sig_all: 0,
        };
        let qp = unsafe { ffi::ibv_create_qp(self.pd.pd, &mut attr as *mut _) };
        if qp.is_null() {
            Err(io::Error::last_os_error())
        } else {
            Ok(PreparedQueuePair {
                ctx: self.pd.ctx,
                qp: QueuePair {
                    _phantom: PhantomData,
                    qp,
                },
                access: self.access,
                timeout: self.timeout,
                retry_count: self.retry_count,
                rnr_retry: self.rnr_retry,
                min_rnr_timer: self.min_rnr_timer,
            })
        }
    }
}
/// A created-but-unconnected queue pair.
///
/// Exchange `endpoint()`s with the remote side out-of-band, then call
/// `handshake` to obtain a connected [`QueuePair`].
pub struct PreparedQueuePair<'res> {
    ctx: &'res Context,
    qp: QueuePair<'res>,
    // Parameters captured from the builder; applied during `handshake`.
    access: ffi::ibv_access_flags,
    min_rnr_timer: u8,
    timeout: u8,
    retry_count: u8,
    rnr_retry: u8,
}
/// A 16-byte InfiniBand Global Identifier.
///
/// `#[repr(transparent)]` over the raw byte array so the value can be
/// reinterpreted as `ffi::ibv_gid` (see the `AsRef`/`AsMut` impls).
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Default, Copy, Clone, Debug, Eq, PartialEq, Hash)]
#[repr(transparent)]
struct Gid {
    raw: [u8; 16],
}
impl Gid {
    /// The upper 64 bits of the GID, read big-endian (the subnet prefix).
    #[allow(dead_code)]
    fn subnet_prefix(&self) -> u64 {
        let mut bytes = [0u8; 8];
        bytes.copy_from_slice(&self.raw[..8]);
        u64::from_be_bytes(bytes)
    }

    /// The lower 64 bits of the GID, read big-endian (the interface id).
    #[allow(dead_code)]
    fn interface_id(&self) -> u64 {
        let mut bytes = [0u8; 8];
        bytes.copy_from_slice(&self.raw[8..]);
        u64::from_be_bytes(bytes)
    }
}
impl From<ffi::ibv_gid> for Gid {
    fn from(gid: ffi::ibv_gid) -> Self {
        Self {
            // SAFETY: reading the `raw` variant of the union is always
            // valid — every variant is 16 plain bytes.
            raw: unsafe { gid.raw },
        }
    }
}
impl From<Gid> for ffi::ibv_gid {
    /// Convert back to the raw FFI union by reinterpreting the bytes.
    fn from(gid: Gid) -> Self {
        *gid.as_ref()
    }
}
impl AsRef<ffi::ibv_gid> for Gid {
    fn as_ref(&self) -> &ffi::ibv_gid {
        // SAFETY: `Gid` is `#[repr(transparent)]` over `[u8; 16]`, which
        // matches `ffi::ibv_gid`'s size — TODO confirm the union's
        // alignment requirement is 1 as well.
        unsafe { &*self.raw.as_ptr().cast::<ffi::ibv_gid>() }
    }
}

impl AsMut<ffi::ibv_gid> for Gid {
    fn as_mut(&mut self) -> &mut ffi::ibv_gid {
        // SAFETY: same layout argument as `as_ref` above.
        unsafe { &mut *self.raw.as_mut_ptr().cast::<ffi::ibv_gid>() }
    }
}
/// Everything a remote host needs to reach a queue pair: QP number, port
/// LID, and GID. With the `serde` feature it can be serialized for
/// out-of-band exchange.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct QueuePairEndpoint {
    num: u32,
    lid: u16,
    gid: Gid,
}
impl<'res> PreparedQueuePair<'res> {
    /// The local endpoint to send to the remote side before `handshake`.
    pub fn endpoint(&self) -> QueuePairEndpoint {
        let num = unsafe { &*self.qp.qp }.qp_num;
        QueuePairEndpoint {
            num,
            lid: self.ctx.port_attr.lid,
            gid: self.ctx.gid,
        }
    }

    /// Connect this queue pair to `remote` by driving it through the
    /// RESET -> INIT -> RTR -> RTS state transitions.
    ///
    /// # Errors
    ///
    /// Any failing `ibv_modify_qp` call is returned as the corresponding
    /// OS error.
    pub fn handshake(self, remote: QueuePairEndpoint) -> io::Result<QueuePair<'res>> {
        // Stage 1: RESET -> INIT. Bind the QP to the port and set the
        // access flags chosen in the builder.
        let mut attr = ffi::ibv_qp_attr::default();
        attr.qp_state = ffi::ibv_qp_state::IBV_QPS_INIT;
        attr.qp_access_flags = self.access.0;
        attr.pkey_index = 0;
        attr.port_num = PORT_NUM;
        let mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE
            | ffi::ibv_qp_attr_mask::IBV_QP_PKEY_INDEX
            | ffi::ibv_qp_attr_mask::IBV_QP_PORT
            | ffi::ibv_qp_attr_mask::IBV_QP_ACCESS_FLAGS;
        let errno = unsafe { ffi::ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) };
        if errno != 0 {
            return Err(io::Error::from_raw_os_error(errno));
        }
        // Stage 2: INIT -> RTR (ready to receive). Point the QP at the
        // remote endpoint via a global route (GRH with the remote GID).
        let mut attr = ffi::ibv_qp_attr::default();
        attr.qp_state = ffi::ibv_qp_state::IBV_QPS_RTR;
        attr.path_mtu = self.ctx.port_attr.active_mtu;
        attr.dest_qp_num = remote.num;
        attr.rq_psn = 0;
        attr.max_dest_rd_atomic = 1;
        attr.min_rnr_timer = self.min_rnr_timer;
        attr.ah_attr.is_global = 1;
        attr.ah_attr.dlid = remote.lid;
        attr.ah_attr.sl = 0;
        attr.ah_attr.src_path_bits = 0;
        attr.ah_attr.port_num = PORT_NUM;
        attr.ah_attr.grh.dgid = remote.gid.into();
        attr.ah_attr.grh.hop_limit = 0xff;
        let mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE
            | ffi::ibv_qp_attr_mask::IBV_QP_AV
            | ffi::ibv_qp_attr_mask::IBV_QP_PATH_MTU
            | ffi::ibv_qp_attr_mask::IBV_QP_DEST_QPN
            | ffi::ibv_qp_attr_mask::IBV_QP_RQ_PSN
            | ffi::ibv_qp_attr_mask::IBV_QP_MAX_DEST_RD_ATOMIC
            | ffi::ibv_qp_attr_mask::IBV_QP_MIN_RNR_TIMER;
        let errno = unsafe { ffi::ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) };
        if errno != 0 {
            return Err(io::Error::from_raw_os_error(errno));
        }
        // Stage 3: RTR -> RTS (ready to send). Apply the retry/timeout
        // parameters chosen in the builder.
        let mut attr = ffi::ibv_qp_attr::default();
        attr.qp_state = ffi::ibv_qp_state::IBV_QPS_RTS;
        attr.timeout = self.timeout;
        attr.retry_cnt = self.retry_count;
        attr.sq_psn = 0;
        attr.rnr_retry = self.rnr_retry;
        attr.max_rd_atomic = 1;
        let mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE
            | ffi::ibv_qp_attr_mask::IBV_QP_TIMEOUT
            | ffi::ibv_qp_attr_mask::IBV_QP_RETRY_CNT
            | ffi::ibv_qp_attr_mask::IBV_QP_SQ_PSN
            | ffi::ibv_qp_attr_mask::IBV_QP_RNR_RETRY
            | ffi::ibv_qp_attr_mask::IBV_QP_MAX_QP_RD_ATOMIC;
        let errno = unsafe { ffi::ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) };
        if errno != 0 {
            return Err(io::Error::from_raw_os_error(errno));
        }
        Ok(self.qp)
    }
}
/// A registered memory region backed by an owned `Vec<T>`.
///
/// Derefs to `[T]` for direct data access; deregistered on drop.
pub struct MemoryRegion<T> {
    mr: *mut ffi::ibv_mr,
    // The backing buffer; must outlive the registration, which holding it
    // here guarantees.
    data: Vec<T>,
}

// NOTE(review): sharing is presumed sound because the raw `mr` pointer is
// only read; confirm against ibv_reg_mr threading guarantees.
unsafe impl<T> Send for MemoryRegion<T> {}
unsafe impl<T> Sync for MemoryRegion<T> {}
use std::ops::{Deref, DerefMut};
impl<T> Deref for MemoryRegion<T> {
    type Target = [T];

    /// Borrow the registered buffer as a slice.
    fn deref(&self) -> &Self::Target {
        self.data.as_slice()
    }
}

impl<T> DerefMut for MemoryRegion<T> {
    /// Mutably borrow the registered buffer as a slice.
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.data.as_mut_slice()
    }
}
impl<T> MemoryRegion<T> {
    /// The remote key of this region, for remote RDMA access.
    pub fn rkey(&self) -> RemoteKey {
        RemoteKey(unsafe { &*self.mr }.rkey)
    }
}

/// A memory region's remote key (`rkey`), handed to peers so they can
/// address the region in RDMA operations.
pub struct RemoteKey(u32);
impl<T> Drop for MemoryRegion<T> {
    fn drop(&mut self) {
        let errno = unsafe { ffi::ibv_dereg_mr(self.mr) };
        // Failed deregistration is a bug; surface it rather than leak the
        // registration.
        if errno != 0 {
            let e = io::Error::from_raw_os_error(errno);
            panic!("{}", e);
        }
    }
}
/// A protection domain allocated on a [`Context`]; deallocated on drop.
pub struct ProtectionDomain<'ctx> {
    ctx: &'ctx Context,
    pd: *mut ffi::ibv_pd,
}

// NOTE(review): presumed sound, the raw pd pointer is only read through
// shared methods — confirm against libibverbs threading docs.
unsafe impl<'a> Sync for ProtectionDomain<'a> {}
unsafe impl<'a> Send for ProtectionDomain<'a> {}
impl<'ctx> ProtectionDomain<'ctx> {
    /// Start building a queue pair on this protection domain.
    ///
    /// `send`/`recv` are the completion queues for the two directions; the
    /// builder defaults to one outstanding work request each way.
    pub fn create_qp<'pd, 'scq, 'rcq, 'res>(
        &'pd self,
        send: &'scq CompletionQueue<'_>,
        recv: &'rcq CompletionQueue<'_>,
        qp_type: ffi::ibv_qp_type::Type,
    ) -> QueuePairBuilder<'res>
    where
        'scq: 'res,
        'rcq: 'res,
        'pd: 'res,
    {
        QueuePairBuilder::new(self, send, 1, recv, 1, qp_type)
    }

    /// Allocate and register a memory region of `n` default-initialized
    /// `T`s.
    ///
    /// The region is registered with local-write, remote-write,
    /// remote-read, and remote-atomic access.
    ///
    /// # Panics
    ///
    /// Panics if `n == 0` or if `T` is zero-sized.
    ///
    /// # Errors
    ///
    /// Returns the last OS error if `ibv_reg_mr` fails.
    pub fn allocate<T: Sized + Copy + Default>(&self, n: usize) -> io::Result<MemoryRegion<T>> {
        assert!(n > 0);
        assert!(mem::size_of::<T>() > 0);
        // `T: Copy + Default`, so `vec![]` builds the default-filled buffer
        // directly (same allocation as with_capacity + resize, idiomatic).
        let mut data = vec![T::default(); n];
        let access = ffi::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE
            | ffi::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE
            | ffi::ibv_access_flags::IBV_ACCESS_REMOTE_READ
            | ffi::ibv_access_flags::IBV_ACCESS_REMOTE_ATOMIC;
        let mr = unsafe {
            ffi::ibv_reg_mr(
                self.pd,
                data.as_mut_ptr() as *mut _,
                n * mem::size_of::<T>(),
                access.0 as i32,
            )
        };
        if mr.is_null() {
            Err(io::Error::last_os_error())
        } else {
            Ok(MemoryRegion { mr, data })
        }
    }
}
impl<'a> Drop for ProtectionDomain<'a> {
    fn drop(&mut self) {
        let errno = unsafe { ffi::ibv_dealloc_pd(self.pd) };
        // Deallocation fails if resources (QPs, MRs) are still attached;
        // that is a bug, so panic rather than leak.
        if errno != 0 {
            let e = io::Error::from_raw_os_error(errno);
            panic!("{}", e);
        }
    }
}
/// A fully connected queue pair, ready for `post_send`/`post_receive`;
/// destroyed on drop.
pub struct QueuePair<'res> {
    // Ties this QP's lifetime to the resources it was built from.
    _phantom: PhantomData<&'res ()>,
    qp: *mut ffi::ibv_qp,
}

// NOTE(review): presumed sound given libibverbs' documented thread-safety
// of QP operations — confirm before concurrent posting.
unsafe impl<'a> Send for QueuePair<'a> {}
unsafe impl<'a> Sync for QueuePair<'a> {}
impl<'res> QueuePair<'res> {
    /// Post a single signaled SEND work request covering `mr[range]`.
    ///
    /// `wr_id` is returned in the corresponding work completion.
    ///
    /// # Safety
    ///
    /// The caller must keep the posted part of `mr` alive and must not
    /// invalidate it until the matching completion has been polled from
    /// the send completion queue.
    ///
    /// # Errors
    ///
    /// Returns the errno reported by the driver's `post_send`.
    #[inline]
    pub unsafe fn post_send<T, R>(
        &mut self,
        mr: &mut MemoryRegion<T>,
        range: R,
        wr_id: u64,
    ) -> io::Result<()>
    where
        R: sliceindex::SliceIndex<[T], Output = [T]>,
    {
        let range = range.index(mr);
        let mut sge = ffi::ibv_sge {
            addr: range.as_ptr() as u64,
            length: (mem::size_of::<T>() * range.len()) as u32,
            lkey: (&*mr.mr).lkey,
        };
        let mut wr = ffi::ibv_send_wr {
            wr_id,
            // Single request: no chaining. (Previously spelled as a typed
            // null const-pointer cast; `null_mut` says the same thing.)
            next: ptr::null_mut(),
            sg_list: &mut sge as *mut _,
            num_sge: 1,
            opcode: ffi::ibv_wr_opcode::IBV_WR_SEND,
            send_flags: ffi::ibv_send_flags::IBV_SEND_SIGNALED.0,
            wr: Default::default(),
            qp_type: Default::default(),
            __bindgen_anon_1: Default::default(),
            __bindgen_anon_2: Default::default(),
        };
        let mut bad_wr: *mut ffi::ibv_send_wr = ptr::null_mut();
        // Call the driver's post_send directly through the ops table.
        let ctx = (&*self.qp).context;
        let ops = &mut (&mut *ctx).ops;
        let errno =
            ops.post_send.as_mut().unwrap()(self.qp, &mut wr as *mut _, &mut bad_wr as *mut _);
        if errno != 0 {
            Err(io::Error::from_raw_os_error(errno))
        } else {
            Ok(())
        }
    }

    /// Post a single RECEIVE work request that delivers into `mr[range]`.
    ///
    /// `wr_id` is returned in the corresponding work completion.
    ///
    /// # Safety
    ///
    /// The caller must keep the posted part of `mr` alive, and must not
    /// read it, until the matching completion has been polled from the
    /// receive completion queue.
    ///
    /// # Errors
    ///
    /// Returns the errno reported by the driver's `post_recv`.
    #[inline]
    pub unsafe fn post_receive<T, R>(
        &mut self,
        mr: &mut MemoryRegion<T>,
        range: R,
        wr_id: u64,
    ) -> io::Result<()>
    where
        R: sliceindex::SliceIndex<[T], Output = [T]>,
    {
        let range = range.index(mr);
        let mut sge = ffi::ibv_sge {
            addr: range.as_ptr() as u64,
            length: (mem::size_of::<T>() * range.len()) as u32,
            lkey: (&*mr.mr).lkey,
        };
        let mut wr = ffi::ibv_recv_wr {
            wr_id,
            // Single request: no chaining. (Fixes the earlier copy-paste
            // that spelled this null as `ptr::null::<ffi::ibv_send_wr>()` —
            // the wrong type, silently papered over by the cast.)
            next: ptr::null_mut(),
            sg_list: &mut sge as *mut _,
            num_sge: 1,
        };
        let mut bad_wr: *mut ffi::ibv_recv_wr = ptr::null_mut();
        // Call the driver's post_recv directly through the ops table.
        let ctx = (&*self.qp).context;
        let ops = &mut (&mut *ctx).ops;
        let errno =
            ops.post_recv.as_mut().unwrap()(self.qp, &mut wr as *mut _, &mut bad_wr as *mut _);
        if errno != 0 {
            Err(io::Error::from_raw_os_error(errno))
        } else {
            Ok(())
        }
    }
}
impl<'a> Drop for QueuePair<'a> {
    fn drop(&mut self) {
        let errno = unsafe { ffi::ibv_destroy_qp(self.qp) };
        // Destruction failure is a bug; surface it rather than leak the QP.
        if errno != 0 {
            let e = io::Error::from_raw_os_error(errno);
            panic!("{}", e);
        }
    }
}
#[cfg(all(test, feature = "serde"))]
mod test_serde {
    use super::*;

    /// Round-trip a `QueuePairEndpoint` through bincode and check that the
    /// GID's big-endian halves survive intact.
    #[test]
    fn encode_decode() {
        let qpe_default = QueuePairEndpoint {
            num: 72,
            lid: 9,
            gid: Default::default(),
        };
        let mut qpe = qpe_default;
        // Build a GID whose subnet prefix is 87 and interface id is 192,
        // stored big-endian as the wire format expects.
        qpe.gid.raw = unsafe { std::mem::transmute([87_u64.to_be(), 192_u64.to_be()]) };
        let encoded = bincode::serialize(&qpe).unwrap();
        let decoded: QueuePairEndpoint = bincode::deserialize(&encoded).unwrap();
        assert_eq!(decoded.gid.subnet_prefix(), 87);
        assert_eq!(decoded.gid.interface_id(), 192);
        assert_eq!(qpe, decoded);
        // Sanity: the transmute actually changed the GID.
        assert_ne!(qpe, qpe_default);
    }
}