#![deny(missing_docs)]
#![warn(rust_2018_idioms)]
#![allow(clippy::doc_markdown)]
use std::convert::TryInto;
use std::ffi::CStr;
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::os::raw::c_void;
use std::ptr;
/// The device port this library uses throughout (ibverbs ports are 1-indexed).
const PORT_NUM: u8 = 1;
pub use ffi::ibv_gid_type;
pub use ffi::ibv_qp_type;
pub use ffi::ibv_wc;
pub use ffi::ibv_wc_opcode;
pub use ffi::ibv_wc_status;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
pub use ffi::ibv_access_flags;
mod sliceindex;
/// Lists all RDMA devices currently available on this host
/// (via `ibv_get_device_list`).
///
/// # Errors
///
/// Returns the last OS error if `ibv_get_device_list` returns null.
pub fn devices() -> io::Result<DeviceList> {
    // `n` receives the number of devices in the returned array.
    let mut n = 0i32;
    let devices = unsafe { ffi::ibv_get_device_list(&mut n as *mut _) };
    if devices.is_null() {
        return Err(io::Error::last_os_error());
    }
    // SAFETY: on success `devices` points to an array of at least `n` device
    // pointers. The `'static` lifetime is sound because the array is owned by
    // libibverbs and only released in `DeviceList::drop`.
    let devices = unsafe {
        use std::slice;
        slice::from_raw_parts_mut(devices, n as usize)
    };
    Ok(DeviceList(devices))
}
/// A list of the RDMA devices on this host, as returned by [`devices`].
///
/// Wraps the array produced by `ibv_get_device_list`; the array is released
/// with `ibv_free_device_list` when this value is dropped.
pub struct DeviceList(&'static mut [*mut ffi::ibv_device]);

// SAFETY: the list only hands out device pointers behind shared references.
// NOTE(review): relies on libibverbs device handles being usable from any
// thread — confirm against the vendored ffi bindings' documentation.
unsafe impl Sync for DeviceList {}
unsafe impl Send for DeviceList {}

impl Drop for DeviceList {
    fn drop(&mut self) {
        // SAFETY: `self.0` is the unmodified array obtained from
        // `ibv_get_device_list`, exactly what `ibv_free_device_list` expects;
        // it is freed only once, here.
        unsafe { ffi::ibv_free_device_list(self.0.as_mut_ptr()) };
    }
}

impl DeviceList {
    /// Returns an iterator over all found devices.
    pub fn iter(&self) -> DeviceListIter<'_> {
        DeviceListIter { list: self, i: 0 }
    }

    /// Returns the number of devices in the list.
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Returns `true` if no devices were found.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Returns the device at `index`, or `None` if out of bounds.
    pub fn get(&self, index: usize) -> Option<Device<'_>> {
        self.0.get(index).map(|d| d.into())
    }
}
impl<'a> IntoIterator for &'a DeviceList {
type Item = <DeviceListIter<'a> as Iterator>::Item;
type IntoIter = DeviceListIter<'a>;
fn into_iter(self) -> Self::IntoIter {
DeviceListIter { list: self, i: 0 }
}
}
/// An iterator over the devices in a [`DeviceList`].
pub struct DeviceListIter<'iter> {
    list: &'iter DeviceList,
    i: usize,
}

impl<'iter> Iterator for DeviceListIter<'iter> {
    type Item = Device<'iter>;

    fn next(&mut self) -> Option<Self::Item> {
        // Stop (without advancing the cursor) once we walk off the end.
        let device = self.list.0.get(self.i)?;
        self.i += 1;
        Some(device.into())
    }
}
/// A handle to a single RDMA device inside a [`DeviceList`].
pub struct Device<'devlist>(&'devlist *mut ffi::ibv_device);

// SAFETY: a `Device` is only a shared view of a device pointer in the list.
// NOTE(review): relies on libibverbs device handles being usable from any
// thread — confirm against the vendored ffi bindings' documentation.
unsafe impl<'devlist> Sync for Device<'devlist> {}
unsafe impl<'devlist> Send for Device<'devlist> {}

impl<'d> From<&'d *mut ffi::ibv_device> for Device<'d> {
    fn from(d: &'d *mut ffi::ibv_device) -> Self {
        Device(d)
    }
}
/// A 64-bit globally unique identifier (GUID) for an RDMA device, stored in
/// network (big-endian) byte order.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Default, Copy, Clone, Debug, Eq, PartialEq, Hash)]
#[repr(transparent)]
pub struct Guid {
    raw: [u8; 8],
}

impl Guid {
    /// Returns the 24-bit Organizationally Unique Identifier (OUI) part of
    /// this GUID, i.e. its three most significant bytes.
    pub fn oui(&self) -> u32 {
        let [a, b, c, ..] = self.raw;
        u32::from_be_bytes([0, a, b, c])
    }

    /// Returns `true` if this is the all-zero, reserved GUID.
    pub fn is_reserved(&self) -> bool {
        self.raw.iter().all(|&byte| byte == 0)
    }
}

impl From<u64> for Guid {
    fn from(guid: u64) -> Self {
        Guid {
            raw: guid.to_be_bytes(),
        }
    }
}

impl From<Guid> for u64 {
    fn from(guid: Guid) -> Self {
        Self::from_be_bytes(guid.raw)
    }
}
impl AsRef<ffi::__be64> for Guid {
    /// Views this GUID as the big-endian integer type used by the ffi layer.
    fn as_ref(&self) -> &ffi::__be64 {
        // SAFETY: `Guid` is `repr(transparent)` over `[u8; 8]`, which has the
        // same size as `__be64`.
        // NOTE(review): `[u8; 8]` only guarantees 1-byte alignment, while
        // `__be64` is an integer type that may require stricter alignment —
        // confirm a misaligned reference cannot be produced here.
        unsafe { &*self.raw.as_ptr().cast::<ffi::__be64>() }
    }
}
impl<'devlist> Device<'devlist> {
    /// Opens this device (via `ibv_open_device`), returning a [`Context`].
    ///
    /// # Errors
    ///
    /// See [`Context::with_device`] for the failure modes.
    pub fn open(&self) -> io::Result<Context> {
        Context::with_device(*self.0)
    }

    /// Returns the kernel device name, or `None` if `ibv_get_device_name`
    /// returns null.
    pub fn name(&self) -> Option<&'devlist CStr> {
        let name_ptr = unsafe { ffi::ibv_get_device_name(*self.0) };
        if name_ptr.is_null() {
            None
        } else {
            // SAFETY: a non-null pointer from `ibv_get_device_name` is a
            // NUL-terminated C string tied to the device list's lifetime.
            Some(unsafe { CStr::from_ptr(name_ptr) })
        }
    }

    /// Returns this device's globally unique identifier.
    ///
    /// # Errors
    ///
    /// Returns the last OS error when `ibv_get_device_guid` yields the
    /// all-zero (reserved) GUID, which signals failure.
    pub fn guid(&self) -> io::Result<Guid> {
        let guid_int = unsafe { ffi::ibv_get_device_guid(*self.0) };
        let guid: Guid = guid_int.into();
        if guid.is_reserved() {
            Err(io::Error::last_os_error())
        } else {
            Ok(guid)
        }
    }

    /// Returns the stable kernel index of this device
    /// (via `ibv_get_device_index`).
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::Unsupported` when the ffi call reports -1,
    /// i.e. no index is known for this device.
    pub fn index(&self) -> io::Result<i32> {
        let idx = unsafe { ffi::ibv_get_device_index(*self.0) };
        if idx == -1 {
            Err(io::Error::new(
                io::ErrorKind::Unsupported,
                "device index not known",
            ))
        } else {
            Ok(idx)
        }
    }
}
/// An open RDMA device context, together with the attributes of port
/// [`PORT_NUM`] and that port's GID table, both queried at open time.
pub struct Context {
    ctx: *mut ffi::ibv_context,
    port_attr: ffi::ibv_port_attr,
    gid_table: Vec<GidEntry>,
}

// SAFETY: the raw context pointer is never exposed, and the cached attributes
// are immutable after construction.
// NOTE(review): relies on libibverbs contexts being usable from any thread —
// confirm against the vendored ffi bindings' documentation.
unsafe impl Sync for Context {}
unsafe impl Send for Context {}
impl Context {
fn with_device(dev: *mut ffi::ibv_device) -> io::Result<Context> {
assert!(!dev.is_null());
let ctx = unsafe { ffi::ibv_open_device(dev) };
if ctx.is_null() {
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to open device".to_string(),
));
}
let mut port_attr = ffi::ibv_port_attr::default();
let errno = unsafe {
ffi::ibv_query_port(
ctx,
PORT_NUM,
&mut port_attr as *mut ffi::ibv_port_attr as *mut _,
)
};
if errno != 0 {
return Err(io::Error::from_raw_os_error(errno));
}
match port_attr.state {
ffi::ibv_port_state::IBV_PORT_ACTIVE | ffi::ibv_port_state::IBV_PORT_ARMED => {}
_ => {
return Err(io::Error::new(
io::ErrorKind::Other,
"port is not ACTIVE or ARMED".to_string(),
));
}
}
let mut gid_table = vec![ffi::ibv_gid_entry::default(); port_attr.gid_tbl_len as usize];
let num_entries = unsafe {
ffi::_ibv_query_gid_table(
ctx,
gid_table.as_mut_ptr(),
gid_table.len(),
0,
size_of::<ffi::ibv_gid_entry>(),
)
};
if num_entries < 0 {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("failed to query gid table, error={}", -num_entries),
));
}
gid_table.truncate(num_entries as usize);
let gid_table = gid_table.into_iter().map(GidEntry::from).collect();
Ok(Context {
ctx,
port_attr,
gid_table,
})
}
pub fn create_cq(&self, min_cq_entries: i32, id: isize) -> io::Result<CompletionQueue<'_>> {
let cq = unsafe {
ffi::ibv_create_cq(
self.ctx,
min_cq_entries,
ptr::null::<c_void>().offset(id) as *mut _,
ptr::null::<c_void>() as *mut _,
0,
)
};
if cq.is_null() {
Err(io::Error::last_os_error())
} else {
Ok(CompletionQueue {
_phantom: PhantomData,
cq,
})
}
}
pub fn alloc_pd(&self) -> io::Result<ProtectionDomain<'_>> {
let pd = unsafe { ffi::ibv_alloc_pd(self.ctx) };
if pd.is_null() {
Err(io::Error::new(
io::ErrorKind::Other,
"obv_alloc_pd returned null",
))
} else {
Ok(ProtectionDomain { ctx: self, pd })
}
}
pub fn gid_table(&self) -> &[GidEntry] {
&self.gid_table
}
}
impl Drop for Context {
    fn drop(&mut self) {
        // SAFETY: `self.ctx` came from `ibv_open_device` and is closed exactly
        // once, here.
        let ok = unsafe { ffi::ibv_close_device(self.ctx) };
        // Closing only fails if dependent resources are still alive, which the
        // borrow lifetimes on CQs/PDs/QPs should prevent — treat as a bug.
        assert_eq!(ok, 0);
    }
}
/// A completion queue (CQ) used to learn when posted work requests complete.
///
/// Borrows the [`Context`] it was created from for `'ctx`.
pub struct CompletionQueue<'ctx> {
    _phantom: PhantomData<&'ctx ()>,
    cq: *mut ffi::ibv_cq,
}

// SAFETY: the raw cq pointer is never exposed.
// NOTE(review): relies on libibverbs CQ operations being thread-safe —
// confirm against the vendored ffi bindings' documentation.
unsafe impl<'a> Send for CompletionQueue<'a> {}
unsafe impl<'a> Sync for CompletionQueue<'a> {}

impl<'ctx> CompletionQueue<'ctx> {
    /// Polls the CQ for completed work requests without blocking.
    ///
    /// Fills in up to `completions.len()` entries and returns the prefix of
    /// `completions` that was actually written (possibly empty).
    ///
    /// # Errors
    ///
    /// Returns a generic error when the driver's `poll_cq` reports a negative
    /// count; no OS errno is available in that case.
    #[inline]
    pub fn poll<'c>(
        &self,
        completions: &'c mut [ffi::ibv_wc],
    ) -> io::Result<&'c mut [ffi::ibv_wc]> {
        // Invoke the driver's poll_cq through the context's ops table,
        // mirroring what the C `ibv_poll_cq` inline wrapper does.
        let ctx: *mut ffi::ibv_context = unsafe { &*self.cq }.context;
        let ops = &mut unsafe { &mut *ctx }.ops;
        let n = unsafe {
            ops.poll_cq.as_mut().unwrap()(
                self.cq,
                completions.len() as i32,
                completions.as_mut_ptr(),
            )
        };
        if n < 0 {
            Err(io::Error::new(io::ErrorKind::Other, "ibv_poll_cq failed"))
        } else {
            // The driver filled in the first `n` entries.
            Ok(&mut completions[0..n as usize])
        }
    }
}

impl<'a> Drop for CompletionQueue<'a> {
    fn drop(&mut self) {
        // SAFETY: `self.cq` came from `ibv_create_cq` and is destroyed once.
        let errno = unsafe { ffi::ibv_destroy_cq(self.cq) };
        if errno != 0 {
            // Destruction fails only if a QP still references this CQ, which
            // the borrow lifetimes should prevent — treat as fatal.
            let e = io::Error::from_raw_os_error(errno);
            panic!("{}", e);
        }
    }
}
/// A builder for a [`QueuePair`], carrying the attributes needed for the
/// INIT → RTR → RTS state transitions performed by
/// [`PreparedQueuePair::handshake`].
///
/// Attributes that only apply to some QP types are `Option`s, pre-populated
/// (or left `None`) by [`QueuePairBuilder::new`] based on `qp_type`.
pub struct QueuePairBuilder<'res> {
    // Opaque user context value stored in the QP's `qp_context` pointer.
    ctx: isize,
    pd: &'res ProtectionDomain<'res>,
    send: &'res CompletionQueue<'res>,
    max_send_wr: u32,
    recv: &'res CompletionQueue<'res>,
    max_recv_wr: u32,
    // Index into the local GID table; enables global routing when set.
    gid_index: Option<u32>,
    max_send_sge: u32,
    max_recv_sge: u32,
    max_inline_data: u32,
    qp_type: ffi::ibv_qp_type::Type,
    // The remaining options are RC/UC-only (or RC-only); see `new`.
    access: Option<ffi::ibv_access_flags>,
    timeout: Option<u8>,
    retry_count: Option<u8>,
    rnr_retry: Option<u8>,
    min_rnr_timer: Option<u8>,
    max_rd_atomic: Option<u8>,
    max_dest_rd_atomic: Option<u8>,
    path_mtu: Option<u32>,
    rq_psn: Option<u32>,
}
impl<'res> QueuePairBuilder<'res> {
fn new<'scq, 'rcq, 'pd>(
pd: &'pd ProtectionDomain<'_>,
send: &'scq CompletionQueue<'_>,
max_send_wr: u32,
recv: &'rcq CompletionQueue<'_>,
max_recv_wr: u32,
qp_type: ffi::ibv_qp_type::Type,
) -> QueuePairBuilder<'res>
where
'scq: 'res,
'rcq: 'res,
'pd: 'res,
{
QueuePairBuilder {
ctx: 0,
pd,
gid_index: None,
send,
max_send_wr,
recv,
max_recv_wr,
max_send_sge: 1,
max_recv_sge: 1,
max_inline_data: 0,
qp_type,
access: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC
|| qp_type == ffi::ibv_qp_type::IBV_QPT_UC)
.then_some(ffi::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE),
min_rnr_timer: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(16),
retry_count: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(6),
rnr_retry: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(6),
timeout: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(4),
max_rd_atomic: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(1),
max_dest_rd_atomic: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(1),
path_mtu: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC
|| qp_type == ffi::ibv_qp_type::IBV_QPT_UC)
.then_some(pd.ctx.port_attr.active_mtu),
rq_psn: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC
|| qp_type == ffi::ibv_qp_type::IBV_QPT_UC)
.then_some(0),
}
}
pub fn set_access(&mut self, access: ffi::ibv_access_flags) -> &mut Self {
if self.qp_type == ffi::ibv_qp_type::IBV_QPT_RC
|| self.qp_type == ffi::ibv_qp_type::IBV_QPT_UC
{
self.access = Some(access);
}
self
}
pub fn allow_remote_rw(&mut self) -> &mut Self {
if self.qp_type == ffi::ibv_qp_type::IBV_QPT_RC
|| self.qp_type == ffi::ibv_qp_type::IBV_QPT_UC
{
self.access = Some(
self.access.expect("always set to Some in new")
| ffi::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE
| ffi::ibv_access_flags::IBV_ACCESS_REMOTE_READ,
);
}
self
}
pub fn set_gid_index(&mut self, gid_index: u32) -> &mut Self {
assert!(gid_index < self.pd.ctx.gid_table.len() as u32);
self.gid_index = Some(gid_index);
self
}
pub fn set_min_rnr_timer(&mut self, timer: u8) -> &mut Self {
if self.qp_type == ffi::ibv_qp_type::IBV_QPT_RC {
self.min_rnr_timer = Some(timer);
}
self
}
pub fn set_timeout(&mut self, timeout: u8) -> &mut Self {
if self.qp_type == ffi::ibv_qp_type::IBV_QPT_RC {
self.timeout = Some(timeout);
}
self
}
pub fn set_retry_count(&mut self, count: u8) -> &mut Self {
if self.qp_type == ffi::ibv_qp_type::IBV_QPT_RC {
assert!(count <= 7);
self.retry_count = Some(count);
}
self
}
pub fn set_rnr_retry(&mut self, n: u8) -> &mut Self {
if self.qp_type == ffi::ibv_qp_type::IBV_QPT_RC {
assert!(n <= 7);
self.rnr_retry = Some(n);
}
self
}
pub fn set_max_rd_atomic(&mut self, max_rd_atomic: u8) -> &mut Self {
if self.qp_type == ffi::ibv_qp_type::IBV_QPT_RC {
self.max_rd_atomic = Some(max_rd_atomic);
}
self
}
pub fn set_max_dest_rd_atomic(&mut self, max_dest_rd_atomic: u8) -> &mut Self {
if self.qp_type == ffi::ibv_qp_type::IBV_QPT_RC {
self.max_dest_rd_atomic = Some(max_dest_rd_atomic);
}
self
}
pub fn set_path_mtu(&mut self, path_mtu: u32) -> &mut Self {
if self.qp_type == ffi::ibv_qp_type::IBV_QPT_RC
|| self.qp_type == ffi::ibv_qp_type::IBV_QPT_UC
{
assert!((1..=5).contains(&path_mtu));
self.path_mtu = Some(path_mtu);
}
self
}
pub fn set_rq_psn(&mut self, rq_psn: u32) -> &mut Self {
if self.qp_type == ffi::ibv_qp_type::IBV_QPT_RC
|| self.qp_type == ffi::ibv_qp_type::IBV_QPT_UC
{
self.rq_psn = Some(rq_psn);
}
self
}
pub fn set_context(&mut self, ctx: isize) -> &mut Self {
self.ctx = ctx;
self
}
pub fn set_max_send_wr(&mut self, max_send_wr: u32) -> &mut Self {
self.max_send_wr = max_send_wr;
self
}
pub fn set_max_recv_wr(&mut self, max_recv_wr: u32) -> &mut Self {
self.max_recv_wr = max_recv_wr;
self
}
pub fn build(&self) -> io::Result<PreparedQueuePair<'res>> {
let mut attr = ffi::ibv_qp_init_attr {
qp_context: unsafe { ptr::null::<c_void>().offset(self.ctx) } as *mut _,
send_cq: self.send.cq as *const _ as *mut _,
recv_cq: self.recv.cq as *const _ as *mut _,
srq: ptr::null::<ffi::ibv_srq>() as *mut _,
cap: ffi::ibv_qp_cap {
max_send_wr: self.max_send_wr,
max_recv_wr: self.max_recv_wr,
max_send_sge: self.max_send_sge,
max_recv_sge: self.max_recv_sge,
max_inline_data: self.max_inline_data,
},
qp_type: self.qp_type,
sq_sig_all: 0,
};
let qp = unsafe { ffi::ibv_create_qp(self.pd.pd, &mut attr as *mut _) };
if qp.is_null() {
Err(io::Error::last_os_error())
} else {
Ok(PreparedQueuePair {
ctx: self.pd.ctx,
qp: QueuePair {
_phantom: PhantomData,
qp,
},
gid_index: self.gid_index,
access: self.access,
timeout: self.timeout,
retry_count: self.retry_count,
rnr_retry: self.rnr_retry,
min_rnr_timer: self.min_rnr_timer,
max_rd_atomic: self.max_rd_atomic,
max_dest_rd_atomic: self.max_dest_rd_atomic,
path_mtu: self.path_mtu,
rq_psn: self.rq_psn,
})
}
}
}
/// A queue pair that has been created (RESET state) but not yet connected;
/// see [`PreparedQueuePair::handshake`].
pub struct PreparedQueuePair<'res> {
    ctx: &'res Context,
    qp: QueuePair<'res>,
    // Attributes captured from the builder, applied during `handshake`.
    gid_index: Option<u32>,
    access: Option<ffi::ibv_access_flags>,
    min_rnr_timer: Option<u8>,
    timeout: Option<u8>,
    retry_count: Option<u8>,
    rnr_retry: Option<u8>,
    max_rd_atomic: Option<u8>,
    max_dest_rd_atomic: Option<u8>,
    path_mtu: Option<u32>,
    rq_psn: Option<u32>,
}
/// A 128-bit RDMA global identifier (GID), stored in network byte order.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Default, Copy, Clone, Debug, Eq, PartialEq, Hash)]
#[repr(transparent)]
pub struct Gid {
    raw: [u8; 16],
}

impl Gid {
    /// Returns the subnet prefix: the upper 64 bits of the GID.
    pub fn subnet_prefix(&self) -> u64 {
        let (upper, _) = self.raw.split_at(8);
        u64::from_be_bytes(upper.try_into().expect("slice is exactly 8 bytes"))
    }

    /// Returns the interface id: the lower 64 bits of the GID.
    pub fn interface_id(&self) -> u64 {
        let (_, lower) = self.raw.split_at(8);
        u64::from_be_bytes(lower.try_into().expect("slice is exactly 8 bytes"))
    }
}
impl From<ffi::ibv_gid> for Gid {
    fn from(gid: ffi::ibv_gid) -> Self {
        Self {
            // SAFETY: every bit pattern of the `ibv_gid` union is a valid
            // 16-byte array, so reading the `raw` variant is always sound.
            raw: unsafe { gid.raw },
        }
    }
}

impl From<Gid> for ffi::ibv_gid {
    fn from(mut gid: Gid) -> Self {
        // Copy out of the reinterpreted view produced by `as_mut`.
        *gid.as_mut()
    }
}

impl AsRef<ffi::ibv_gid> for Gid {
    /// Views this GID as the ffi union type without copying.
    fn as_ref(&self) -> &ffi::ibv_gid {
        // SAFETY: `Gid` is `repr(transparent)` over `[u8; 16]`, which has the
        // same size as `ibv_gid`.
        // NOTE(review): `[u8; 16]` has 1-byte alignment while `ibv_gid` may
        // require stricter alignment — confirm a misaligned reference cannot
        // be produced here.
        unsafe { &*self.raw.as_ptr().cast::<ffi::ibv_gid>() }
    }
}

impl AsMut<ffi::ibv_gid> for Gid {
    /// Views this GID mutably as the ffi union type without copying.
    fn as_mut(&mut self) -> &mut ffi::ibv_gid {
        // SAFETY / NOTE(review): same layout and alignment caveats as
        // `as_ref` above.
        unsafe { &mut *self.raw.as_mut_ptr().cast::<ffi::ibv_gid>() }
    }
}
/// An entry of a port's GID table.
#[derive(Debug, Clone)]
pub struct GidEntry {
    /// The GID itself.
    pub gid: Gid,
    /// The index of this entry in the GID table.
    pub gid_index: u32,
    /// The port this entry belongs to.
    pub port_num: u32,
    /// The kind of GID (e.g. IB/RoCE v1 vs RoCE v2).
    pub gid_type: ffi::ibv_gid_type,
    /// The interface index of the associated network device.
    pub ndev_ifindex: u32,
}

impl From<ffi::ibv_gid_entry> for GidEntry {
    fn from(gid_entry: ffi::ibv_gid_entry) -> Self {
        Self {
            gid: gid_entry.gid.into(),
            gid_index: gid_entry.gid_index,
            port_num: gid_entry.port_num,
            // NOTE(review): assumes the entry's gid_type field shares the
            // representation of `ibv_gid_type` — confirm in the generated
            // bindings.
            gid_type: gid_entry.gid_type as ffi::ibv_gid_type,
            ndev_ifindex: gid_entry.ndev_ifindex,
        }
    }
}
/// The exchangeable identity of a queue pair. Send this to the remote peer
/// (by any out-of-band means) before calling
/// [`PreparedQueuePair::handshake`].
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct QueuePairEndpoint {
    /// The queue pair number.
    pub num: u32,
    /// The local identifier (LID) of the port.
    pub lid: u16,
    /// The GID to route to, when global routing is used.
    pub gid: Option<Gid>,
}
impl<'res> PreparedQueuePair<'res> {
    /// Returns the local endpoint (QP number, LID, optional GID) to hand to
    /// the remote peer before [`PreparedQueuePair::handshake`].
    ///
    /// # Panics
    ///
    /// Panics if the configured `gid_index` no longer matches the cached GID
    /// table entry (a broken internal invariant).
    pub fn endpoint(&self) -> QueuePairEndpoint {
        // SAFETY: `self.qp.qp` is the valid QP created by the builder.
        let num = unsafe { &*self.qp.qp }.qp_num;
        let gid = self.gid_index.map(|gid_index| {
            let gid_entry = &self.ctx.gid_table[gid_index as usize];
            assert_eq!(gid_entry.gid_index, gid_index);
            gid_entry.gid
        });
        QueuePairEndpoint {
            num,
            lid: self.ctx.port_attr.lid,
            gid,
        }
    }

    /// Connects this QP to `remote` by driving it through the
    /// RESET → INIT → RTR → RTS transitions with `ibv_modify_qp`, applying
    /// each builder attribute at the state where the verbs API expects it
    /// (access flags at INIT; path MTU / rq_psn / receive resources at RTR;
    /// timeouts, retries and send resources at RTS).
    ///
    /// # Errors
    ///
    /// Returns the raw OS error of any failing `ibv_modify_qp` call, or an
    /// error if `remote` carries a GID but no local `gid_index` was set.
    pub fn handshake(self, remote: QueuePairEndpoint) -> io::Result<QueuePair<'res>> {
        // RESET -> INIT.
        let mut attr = ffi::ibv_qp_attr {
            qp_state: ffi::ibv_qp_state::IBV_QPS_INIT,
            pkey_index: 0,
            port_num: PORT_NUM,
            ..Default::default()
        };
        let mut mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE
            | ffi::ibv_qp_attr_mask::IBV_QP_PKEY_INDEX
            | ffi::ibv_qp_attr_mask::IBV_QP_PORT;
        if let Some(access) = self.access {
            attr.qp_access_flags = access.0;
            mask |= ffi::ibv_qp_attr_mask::IBV_QP_ACCESS_FLAGS;
        }
        let errno = unsafe { ffi::ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) };
        if errno != 0 {
            return Err(io::Error::from_raw_os_error(errno));
        }
        // INIT -> RTR (ready to receive).
        let mut attr = ffi::ibv_qp_attr {
            qp_state: ffi::ibv_qp_state::IBV_QPS_RTR,
            dest_qp_num: remote.num,
            ah_attr: ffi::ibv_ah_attr {
                dlid: remote.lid,
                sl: 0,
                src_path_bits: 0,
                port_num: PORT_NUM,
                grh: Default::default(),
                ..Default::default()
            },
            ..Default::default()
        };
        if let Some(gid) = remote.gid {
            // The remote is globally routed: fill in the GRH from its GID.
            attr.ah_attr.is_global = 1;
            attr.ah_attr.grh.dgid = gid.into();
            attr.ah_attr.grh.hop_limit = 0xff;
            attr.ah_attr.grh.sgid_index = self
                .gid_index
                .ok_or_else(|| io::Error::other("gid was set for remote but not local"))?
                as u8;
        }
        let mut mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE
            | ffi::ibv_qp_attr_mask::IBV_QP_AV
            | ffi::ibv_qp_attr_mask::IBV_QP_DEST_QPN;
        if let Some(max_dest_rd_atomic) = self.max_dest_rd_atomic {
            attr.max_dest_rd_atomic = max_dest_rd_atomic;
            mask |= ffi::ibv_qp_attr_mask::IBV_QP_MAX_DEST_RD_ATOMIC;
        }
        if let Some(min_rnr_timer) = self.min_rnr_timer {
            attr.min_rnr_timer = min_rnr_timer;
            mask |= ffi::ibv_qp_attr_mask::IBV_QP_MIN_RNR_TIMER;
        }
        if let Some(path_mtu) = self.path_mtu {
            attr.path_mtu = path_mtu;
            mask |= ffi::ibv_qp_attr_mask::IBV_QP_PATH_MTU;
        }
        if let Some(rq_psn) = self.rq_psn {
            attr.rq_psn = rq_psn;
            mask |= ffi::ibv_qp_attr_mask::IBV_QP_RQ_PSN;
        }
        let errno = unsafe { ffi::ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) };
        if errno != 0 {
            return Err(io::Error::from_raw_os_error(errno));
        }
        // RTR -> RTS (ready to send).
        let mut attr = ffi::ibv_qp_attr {
            qp_state: ffi::ibv_qp_state::IBV_QPS_RTS,
            sq_psn: 0,
            ..Default::default()
        };
        let mut mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE | ffi::ibv_qp_attr_mask::IBV_QP_SQ_PSN;
        if let Some(timeout) = self.timeout {
            attr.timeout = timeout;
            mask |= ffi::ibv_qp_attr_mask::IBV_QP_TIMEOUT;
        }
        if let Some(retry_count) = self.retry_count {
            attr.retry_cnt = retry_count;
            mask |= ffi::ibv_qp_attr_mask::IBV_QP_RETRY_CNT;
        }
        if let Some(rnr_retry) = self.rnr_retry {
            attr.rnr_retry = rnr_retry;
            mask |= ffi::ibv_qp_attr_mask::IBV_QP_RNR_RETRY;
        }
        if let Some(max_rd_atomic) = self.max_rd_atomic {
            attr.max_rd_atomic = max_rd_atomic;
            mask |= ffi::ibv_qp_attr_mask::IBV_QP_MAX_QP_RD_ATOMIC;
        }
        let errno = unsafe { ffi::ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) };
        if errno != 0 {
            return Err(io::Error::from_raw_os_error(errno));
        }
        Ok(self.qp)
    }
}
/// A registered memory region (MR), backed by an owned `Vec<T>`.
///
/// Dereferences to `[T]` for direct access to the underlying buffer.
pub struct MemoryRegion<T> {
    mr: *mut ffi::ibv_mr,
    data: Vec<T>,
}

// SAFETY: the raw mr pointer is never exposed and is deregistered on drop.
// NOTE(review): relies on libibverbs MR handles being usable from any thread —
// confirm against the vendored ffi bindings' documentation.
unsafe impl<T> Send for MemoryRegion<T> {}
unsafe impl<T> Sync for MemoryRegion<T> {}
use std::ops::{Deref, DerefMut};
impl<T> Deref for MemoryRegion<T> {
    type Target = [T];

    /// Borrows the registered buffer as a slice.
    fn deref(&self) -> &Self::Target {
        self.data.as_slice()
    }
}

impl<T> DerefMut for MemoryRegion<T> {
    /// Mutably borrows the registered buffer as a slice.
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.data.as_mut_slice()
    }
}
impl<T> MemoryRegion<T> {
    /// Returns the remote key (rkey) a peer needs to access this region with
    /// RDMA operations.
    pub fn rkey(&self) -> RemoteKey {
        RemoteKey {
            // SAFETY: `self.mr` is the valid registration handle produced by
            // `ibv_reg_mr` in `ProtectionDomain::allocate`.
            key: unsafe { &*self.mr }.rkey,
        }
    }
}

/// A remote key for a [`MemoryRegion`], exchangeable with peers.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct RemoteKey {
    /// The raw rkey value.
    pub key: u32,
}

impl<T> Drop for MemoryRegion<T> {
    fn drop(&mut self) {
        // SAFETY: deregisters the handle from `ibv_reg_mr` exactly once; the
        // backing `data` is still alive at this point and is dropped after.
        let errno = unsafe { ffi::ibv_dereg_mr(self.mr) };
        if errno != 0 {
            // Deregistration failure would leave the region pinned — fatal.
            let e = io::Error::from_raw_os_error(errno);
            panic!("{}", e);
        }
    }
}
/// A protection domain (PD), scoping queue pairs and memory registrations.
pub struct ProtectionDomain<'ctx> {
    ctx: &'ctx Context,
    pd: *mut ffi::ibv_pd,
}

// SAFETY: the raw pd pointer is never exposed.
// NOTE(review): relies on libibverbs PD operations being thread-safe —
// confirm against the vendored ffi bindings' documentation.
unsafe impl<'a> Sync for ProtectionDomain<'a> {}
unsafe impl<'a> Send for ProtectionDomain<'a> {}
impl<'ctx> ProtectionDomain<'ctx> {
    /// Starts building a queue pair on this PD with the given send and
    /// receive CQs; `max_send_wr` and `max_recv_wr` both default to 1.
    pub fn create_qp<'pd, 'scq, 'rcq, 'res>(
        &'pd self,
        send: &'scq CompletionQueue<'_>,
        recv: &'rcq CompletionQueue<'_>,
        qp_type: ffi::ibv_qp_type::Type,
    ) -> QueuePairBuilder<'res>
    where
        'scq: 'res,
        'rcq: 'res,
        'pd: 'res,
    {
        QueuePairBuilder::new(self, send, 1, recv, 1, qp_type)
    }

    /// Allocates a buffer of `n` default-initialised elements and registers
    /// it as a memory region with full local and remote
    /// (read/write/atomic) access.
    ///
    /// # Panics
    ///
    /// Panics if `n` is zero or `T` is zero-sized.
    ///
    /// # Errors
    ///
    /// Returns the last OS error if `ibv_reg_mr` fails.
    pub fn allocate<T: Sized + Copy + Default>(&self, n: usize) -> io::Result<MemoryRegion<T>> {
        assert!(n > 0);
        assert!(mem::size_of::<T>() > 0);
        let mut data = Vec::with_capacity(n);
        data.resize(n, T::default());
        // Register with every access flag so the region works for both local
        // posting and remote RDMA operations.
        let access = ffi::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE
            | ffi::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE
            | ffi::ibv_access_flags::IBV_ACCESS_REMOTE_READ
            | ffi::ibv_access_flags::IBV_ACCESS_REMOTE_ATOMIC;
        let mr = unsafe {
            ffi::ibv_reg_mr(
                self.pd,
                data.as_mut_ptr() as *mut _,
                n * mem::size_of::<T>(),
                access.0 as i32,
            )
        };
        if mr.is_null() {
            Err(io::Error::last_os_error())
        } else {
            Ok(MemoryRegion { mr, data })
        }
    }
}
impl<'a> Drop for ProtectionDomain<'a> {
    fn drop(&mut self) {
        // SAFETY: deallocates the handle from `ibv_alloc_pd` exactly once.
        let errno = unsafe { ffi::ibv_dealloc_pd(self.pd) };
        if errno != 0 {
            // Fails only if QPs/MRs still reference the PD, which lifetimes
            // should prevent — treat as a bug.
            let e = io::Error::from_raw_os_error(errno);
            panic!("{}", e);
        }
    }
}
/// A fully operational queue pair (QP), typically obtained via
/// [`PreparedQueuePair::handshake`], on which work requests can be posted.
pub struct QueuePair<'res> {
    _phantom: PhantomData<&'res ()>,
    qp: *mut ffi::ibv_qp,
}

// SAFETY: the raw qp pointer is never exposed.
// NOTE(review): relies on libibverbs QP operations being thread-safe —
// confirm against the vendored ffi bindings' documentation.
unsafe impl<'a> Send for QueuePair<'a> {}
unsafe impl<'a> Sync for QueuePair<'a> {}
impl<'res> QueuePair<'res> {
    /// Posts a SEND work request for `mr[range]` to the send queue with the
    /// user-chosen identifier `wr_id`, requesting a signaled completion.
    ///
    /// # Safety
    ///
    /// The caller must not access `mr[range]` (nor drop `mr`) until a
    /// completion for `wr_id` has been polled from the send CQ.
    ///
    /// # Errors
    ///
    /// Returns the raw OS error if the driver's `post_send` fails (e.g. the
    /// send queue is full).
    #[inline]
    pub unsafe fn post_send<T, R>(
        &mut self,
        mr: &mut MemoryRegion<T>,
        range: R,
        wr_id: u64,
    ) -> io::Result<()>
    where
        R: sliceindex::SliceIndex<[T], Output = [T]>,
    {
        let range = range.index(mr);
        // A single scatter/gather element covering the requested range.
        let mut sge = ffi::ibv_sge {
            addr: range.as_ptr() as u64,
            length: mem::size_of_val(range) as u32,
            lkey: (*mr.mr).lkey,
        };
        let mut wr = ffi::ibv_send_wr {
            wr_id,
            next: ptr::null::<ffi::ibv_send_wr>() as *mut _,
            sg_list: &mut sge as *mut _,
            num_sge: 1,
            opcode: ffi::ibv_wr_opcode::IBV_WR_SEND,
            send_flags: ffi::ibv_send_flags::IBV_SEND_SIGNALED.0,
            wr: Default::default(),
            qp_type: Default::default(),
            __bindgen_anon_1: Default::default(),
            __bindgen_anon_2: Default::default(),
        };
        let mut bad_wr: *mut ffi::ibv_send_wr = ptr::null::<ffi::ibv_send_wr>() as *mut _;
        // Invoke the driver's post_send through the ops table, mirroring the
        // C `ibv_post_send` inline wrapper.
        let ctx = (*self.qp).context;
        let ops = &mut (*ctx).ops;
        let errno =
            ops.post_send.as_mut().unwrap()(self.qp, &mut wr as *mut _, &mut bad_wr as *mut _);
        if errno != 0 {
            Err(io::Error::from_raw_os_error(errno))
        } else {
            Ok(())
        }
    }

    /// Posts a RECEIVE work request for `mr[range]` to the receive queue with
    /// the user-chosen identifier `wr_id`.
    ///
    /// # Safety
    ///
    /// The caller must not access `mr[range]` (nor drop `mr`) until a
    /// completion for `wr_id` has been polled from the receive CQ.
    ///
    /// # Errors
    ///
    /// Returns the raw OS error if the driver's `post_recv` fails (e.g. the
    /// receive queue is full).
    #[inline]
    pub unsafe fn post_receive<T, R>(
        &mut self,
        mr: &mut MemoryRegion<T>,
        range: R,
        wr_id: u64,
    ) -> io::Result<()>
    where
        R: sliceindex::SliceIndex<[T], Output = [T]>,
    {
        let range = range.index(mr);
        let mut sge = ffi::ibv_sge {
            addr: range.as_ptr() as u64,
            length: mem::size_of_val(range) as u32,
            lkey: (*mr.mr).lkey,
        };
        let mut wr = ffi::ibv_recv_wr {
            wr_id,
            // NOTE(review): null spelled via `ibv_send_wr` then cast —
            // harmless (null is null) but `ibv_recv_wr` would be clearer.
            next: ptr::null::<ffi::ibv_send_wr>() as *mut _,
            sg_list: &mut sge as *mut _,
            num_sge: 1,
        };
        let mut bad_wr: *mut ffi::ibv_recv_wr = ptr::null::<ffi::ibv_recv_wr>() as *mut _;
        // Invoke the driver's post_recv through the ops table, mirroring the
        // C `ibv_post_recv` inline wrapper.
        let ctx = (*self.qp).context;
        let ops = &mut (*ctx).ops;
        let errno =
            ops.post_recv.as_mut().unwrap()(self.qp, &mut wr as *mut _, &mut bad_wr as *mut _);
        if errno != 0 {
            Err(io::Error::from_raw_os_error(errno))
        } else {
            Ok(())
        }
    }
}
impl<'a> Drop for QueuePair<'a> {
    fn drop(&mut self) {
        // SAFETY: destroys the handle from `ibv_create_qp` exactly once.
        let errno = unsafe { ffi::ibv_destroy_qp(self.qp) };
        if errno != 0 {
            // Destruction failure would leave kernel resources behind — fatal.
            let e = io::Error::from_raw_os_error(errno);
            panic!("{}", e);
        }
    }
}
#[cfg(all(test, feature = "serde"))]
mod test_serde {
    use super::*;

    // Round-trip a QueuePairEndpoint through bincode and check that the GID
    // halves survive with the expected big-endian interpretation.
    #[test]
    fn encode_decode() {
        let qpe_default = QueuePairEndpoint {
            num: 72,
            lid: 9,
            gid: Some(Default::default()),
        };
        let mut qpe = qpe_default;
        // Fill the GID with two big-endian u64 halves: subnet prefix 87 and
        // interface id 192.
        qpe.gid.as_mut().unwrap().raw =
            unsafe { std::mem::transmute([87_u64.to_be(), 192_u64.to_be()]) };
        let encoded = bincode::serialize(&qpe).unwrap();
        let decoded: QueuePairEndpoint = bincode::deserialize(&encoded).unwrap();
        assert_eq!(decoded.gid.unwrap().subnet_prefix(), 87);
        assert_eq!(decoded.gid.unwrap().interface_id(), 192);
        assert_eq!(qpe, decoded);
        // The mutated endpoint must differ from the all-zero default one.
        assert_ne!(qpe, qpe_default);
    }

    // Check Guid's u64 conversion, big-endian byte layout, and OUI
    // extraction.
    #[test]
    fn encode_decode_guid() {
        let guid_u64 = 0x12_34_56_78_9a_bc_de_f0_u64;
        let _be: ffi::__be64 = guid_u64.to_be();
        let guid: Guid = guid_u64.into();
        assert_eq!(guid.is_reserved(), false);
        assert_eq!(guid.raw, [0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0]);
        println!("{:#08x}", guid.oui());
        assert_eq!(guid.oui(), 0x123456);
    }
}