#![allow(clippy::needless_doctest_main)]
use std::any::Any;
use std::collections::HashMap;
use std::os::fd::{AsRawFd, RawFd};
use std::ptr::{null, null_mut};
use std::sync::{LazyLock, Mutex, Weak};
use std::time::Duration;
use std::{io, mem::MaybeUninit, net::SocketAddr, ptr::NonNull, sync::Arc};
use os_socketaddr::OsSocketAddr;
use rdma_mummy_sys::{
ibv_qp_attr, rdma_accept, rdma_ack_cm_event, rdma_bind_addr, rdma_cm_event, rdma_cm_event_type, rdma_cm_id,
rdma_conn_param, rdma_connect, rdma_create_event_channel, rdma_create_id, rdma_destroy_event_channel,
rdma_destroy_id, rdma_disconnect, rdma_establish, rdma_event_channel, rdma_get_cm_event, rdma_init_qp_attr,
rdma_listen, rdma_port_space, rdma_resolve_addr, rdma_resolve_route,
};
use crate::ibverbs::device_context::DeviceContext;
use crate::ibverbs::queue_pair::{QueuePairAttribute, QueuePairState};
#[repr(u32)]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum EventType {
AddressResolved = rdma_cm_event_type::RDMA_CM_EVENT_ADDR_RESOLVED,
AddressError = rdma_cm_event_type::RDMA_CM_EVENT_ADDR_ERROR,
RouteResolved = rdma_cm_event_type::RDMA_CM_EVENT_ROUTE_RESOLVED,
RouteError = rdma_cm_event_type::RDMA_CM_EVENT_ROUTE_ERROR,
ConnectRequest = rdma_cm_event_type::RDMA_CM_EVENT_CONNECT_REQUEST,
ConnectResponse = rdma_cm_event_type::RDMA_CM_EVENT_CONNECT_RESPONSE,
ConnectError = rdma_cm_event_type::RDMA_CM_EVENT_CONNECT_ERROR,
Unreachable = rdma_cm_event_type::RDMA_CM_EVENT_UNREACHABLE,
Rejected = rdma_cm_event_type::RDMA_CM_EVENT_REJECTED,
Established = rdma_cm_event_type::RDMA_CM_EVENT_ESTABLISHED,
Disconnected = rdma_cm_event_type::RDMA_CM_EVENT_DISCONNECTED,
DeviceRemoval = rdma_cm_event_type::RDMA_CM_EVENT_DEVICE_REMOVAL,
MulticastJoin = rdma_cm_event_type::RDMA_CM_EVENT_MULTICAST_JOIN,
MulticastError = rdma_cm_event_type::RDMA_CM_EVENT_MULTICAST_ERROR,
AddressChange = rdma_cm_event_type::RDMA_CM_EVENT_ADDR_CHANGE,
TimewaitExit = rdma_cm_event_type::RDMA_CM_EVENT_TIMEWAIT_EXIT,
}
impl From<u32> for EventType {
fn from(event: u32) -> Self {
match event {
rdma_cm_event_type::RDMA_CM_EVENT_ADDR_RESOLVED => EventType::AddressResolved,
rdma_cm_event_type::RDMA_CM_EVENT_ADDR_ERROR => EventType::AddressError,
rdma_cm_event_type::RDMA_CM_EVENT_ROUTE_RESOLVED => EventType::RouteResolved,
rdma_cm_event_type::RDMA_CM_EVENT_ROUTE_ERROR => EventType::RouteError,
rdma_cm_event_type::RDMA_CM_EVENT_CONNECT_REQUEST => EventType::ConnectRequest,
rdma_cm_event_type::RDMA_CM_EVENT_CONNECT_RESPONSE => EventType::ConnectResponse,
rdma_cm_event_type::RDMA_CM_EVENT_CONNECT_ERROR => EventType::ConnectError,
rdma_cm_event_type::RDMA_CM_EVENT_UNREACHABLE => EventType::Unreachable,
rdma_cm_event_type::RDMA_CM_EVENT_REJECTED => EventType::Rejected,
rdma_cm_event_type::RDMA_CM_EVENT_ESTABLISHED => EventType::Established,
rdma_cm_event_type::RDMA_CM_EVENT_DISCONNECTED => EventType::Disconnected,
rdma_cm_event_type::RDMA_CM_EVENT_DEVICE_REMOVAL => EventType::DeviceRemoval,
rdma_cm_event_type::RDMA_CM_EVENT_MULTICAST_JOIN => EventType::MulticastJoin,
rdma_cm_event_type::RDMA_CM_EVENT_MULTICAST_ERROR => EventType::MulticastError,
rdma_cm_event_type::RDMA_CM_EVENT_ADDR_CHANGE => EventType::AddressChange,
rdma_cm_event_type::RDMA_CM_EVENT_TIMEWAIT_EXIT => EventType::TimewaitExit,
_ => panic!("Unknown RDMA CM event type: {event}"),
}
}
}
static DEVICE_LISTS: LazyLock<Mutex<HashMap<usize, Arc<DeviceContext>>>> = LazyLock::new(|| Mutex::new(HashMap::new()));
pub struct Event {
event: NonNull<rdma_cm_event>,
cm_id: Option<Arc<Identifier>>,
listener_id: Option<Arc<Identifier>>,
}
pub struct EventChannel {
channel: NonNull<rdma_event_channel>,
}
pub struct Identifier {
_event_channel: Arc<EventChannel>,
cm_id: NonNull<rdma_cm_id>,
user_context: Mutex<Option<Arc<dyn Any + Send + Sync>>>,
}
pub struct ConnectionParameter(rdma_conn_param);
pub enum PortSpace {
InfiniBand = rdma_port_space::RDMA_PS_IB as isize,
IpOverInfiniBand = rdma_port_space::RDMA_PS_IPOIB as isize,
Tcp = rdma_port_space::RDMA_PS_TCP as isize,
Udp = rdma_port_space::RDMA_PS_UDP as isize,
}
#[derive(Debug, thiserror::Error)]
#[error("failed to create rdma cm event channel")]
#[non_exhaustive]
pub struct CreateEventChannelError(#[from] pub CreateEventChannelErrorKind);
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
#[non_exhaustive]
pub enum CreateEventChannelErrorKind {
Rdmacm(#[from] io::Error),
}
#[derive(Debug, thiserror::Error)]
#[error("failed to create rdma cm identifier")]
#[non_exhaustive]
pub struct CreateIdentifierError(#[from] pub CreateIdentifierErrorKind);
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
#[non_exhaustive]
pub enum CreateIdentifierErrorKind {
Rdmacm(#[from] io::Error),
}
#[derive(Debug, thiserror::Error)]
#[error("failed to get rdma cm event")]
#[non_exhaustive]
pub struct GetEventError(#[from] pub GetEventErrorKind);
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
#[non_exhaustive]
pub enum GetEventErrorKind {
Rdmacm(#[from] io::Error),
#[error("no event in event channel")]
NoEvent,
}
#[derive(Debug, thiserror::Error)]
#[error("failed to acknowledge rdma cm event")]
#[non_exhaustive]
pub struct AcknowledgeEventError(#[from] pub AcknowledgeEventErrorKind);
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
#[non_exhaustive]
pub enum AcknowledgeEventErrorKind {
Rdmacm(#[from] io::Error),
}
#[derive(Debug, thiserror::Error)]
#[error("failed to bind address (addr={addr})")]
#[non_exhaustive]
pub struct BindAddressError {
pub addr: SocketAddr,
pub source: BindAddressErrorKind,
}
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
#[non_exhaustive]
pub enum BindAddressErrorKind {
Rdmacm(#[from] io::Error),
}
#[derive(Debug, thiserror::Error)]
#[error("failed to resolve address (src_addr={:?}, dst_addr={dst_addr})", src_addr)]
#[non_exhaustive]
pub struct ResolveAddressError {
pub src_addr: Option<SocketAddr>,
pub dst_addr: SocketAddr,
pub source: ResolveAddressErrorKind,
}
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
#[non_exhaustive]
pub enum ResolveAddressErrorKind {
Rdmacm(#[from] io::Error),
}
#[derive(Debug, thiserror::Error)]
#[error("failed to resolve route")]
#[non_exhaustive]
pub struct ResolveRouteError(#[from] pub ResolveRouteErrorKind);
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
#[non_exhaustive]
pub enum ResolveRouteErrorKind {
Rdmacm(#[from] io::Error),
}
#[derive(Debug, thiserror::Error)]
#[error("failed to listen")]
#[non_exhaustive]
pub struct ListenError(#[from] pub ListenErrorKind);
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
#[non_exhaustive]
pub enum ListenErrorKind {
Rdmacm(#[from] io::Error),
}
#[derive(Debug, thiserror::Error)]
#[error("failed to connect")]
#[non_exhaustive]
pub struct ConnectError(#[from] pub ConnectErrorKind);
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
#[non_exhaustive]
pub enum ConnectErrorKind {
Rdmacm(#[from] io::Error),
}
#[derive(Debug, thiserror::Error)]
#[error("failed to accept")]
#[non_exhaustive]
pub struct AcceptError(#[from] pub AcceptErrorKind);
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
#[non_exhaustive]
pub enum AcceptErrorKind {
Rdmacm(#[from] io::Error),
}
#[derive(Debug, thiserror::Error)]
#[error("failed to establish")]
#[non_exhaustive]
pub struct EstablishError(#[from] pub EstablishErrorKind);
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
#[non_exhaustive]
pub enum EstablishErrorKind {
Rdmacm(#[from] io::Error),
}
#[derive(Debug, thiserror::Error)]
#[error("failed to disconnect")]
#[non_exhaustive]
pub struct DisconnectError(#[from] pub DisconnectErrorKind);
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
#[non_exhaustive]
pub enum DisconnectErrorKind {
Rdmacm(#[from] io::Error),
}
#[derive(Debug, thiserror::Error)]
#[error("failed to get qp attribute")]
#[non_exhaustive]
pub struct GetQueuePairAttributeError(#[from] pub GetQueuePairAttributeErrorKind);
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
#[non_exhaustive]
pub enum GetQueuePairAttributeErrorKind {
Rdmacm(#[from] io::Error),
}
impl Drop for EventChannel {
fn drop(&mut self) {
unsafe {
rdma_destroy_event_channel(self.channel.as_mut());
}
}
}
impl Event {
pub fn cm_id(&self) -> Option<Arc<Identifier>> {
self.cm_id.clone()
}
pub fn listener_id(&self) -> Option<Arc<Identifier>> {
self.listener_id.clone()
}
pub fn event_type(&self) -> EventType {
unsafe { self.event.as_ref().event.into() }
}
pub fn status(&self) -> i32 {
unsafe { self.event.as_ref().status }
}
pub fn ack(mut self) -> Result<(), AcknowledgeEventError> {
let ret = unsafe { rdma_ack_cm_event(self.event.as_mut()) };
if ret < 0 {
return Err(AcknowledgeEventErrorKind::Rdmacm(io::Error::last_os_error()).into());
}
self.cm_id.take();
self.listener_id.take();
std::mem::forget(self);
Ok(())
}
}
impl Drop for Event {
fn drop(&mut self) {
unsafe {
rdma_ack_cm_event(self.event.as_mut());
}
}
}
fn new_cm_id_for_raw(event_channel: Arc<EventChannel>, raw: *mut rdma_cm_id) -> Arc<Identifier> {
let cm = unsafe {
Arc::new(Identifier {
_event_channel: event_channel,
cm_id: NonNull::new(raw).unwrap_unchecked(),
user_context: Mutex::new(None),
})
};
let weak_cm = Arc::downgrade(&cm.clone());
let boxed = Box::new(weak_cm);
let raw_box = Box::into_raw(boxed);
unsafe {
(*raw).context = raw_box as *mut std::ffi::c_void;
}
cm
}
impl EventChannel {
pub fn new() -> Result<Arc<EventChannel>, CreateEventChannelError> {
let channel = unsafe { rdma_create_event_channel() };
if channel.is_null() {
return Err(CreateEventChannelErrorKind::Rdmacm(io::Error::last_os_error()).into());
}
Ok(Arc::new(EventChannel {
channel: unsafe { NonNull::new(channel).unwrap_unchecked() },
}))
}
pub fn create_id(self: &Arc<Self>, port_space: PortSpace) -> Result<Arc<Identifier>, CreateIdentifierError> {
let mut cm_id_ptr: *mut rdma_cm_id = null_mut();
let ret = unsafe { rdma_create_id(self.channel.as_ptr(), &mut cm_id_ptr, null_mut(), port_space as u32) };
if ret < 0 {
return Err(CreateIdentifierErrorKind::Rdmacm(io::Error::last_os_error()).into());
}
Ok(new_cm_id_for_raw(self.clone(), cm_id_ptr))
}
pub fn get_cm_event(self: &Arc<Self>) -> Result<Event, GetEventError> {
let mut event_ptr = MaybeUninit::<*mut rdma_cm_event>::uninit();
let ret = unsafe { rdma_get_cm_event(self.channel.as_ptr(), event_ptr.as_mut_ptr()) };
if ret < 0 {
match io::Error::last_os_error().kind() {
io::ErrorKind::WouldBlock => return Err(GetEventErrorKind::NoEvent.into()),
err => return Err(GetEventErrorKind::Rdmacm(err.into()).into()),
}
}
let event = unsafe { NonNull::new(event_ptr.assume_init()).unwrap() };
let cm_id = unsafe {
let raw_cm_id = event.as_ref().id;
assert_ne!(raw_cm_id, null_mut());
if event.as_ref().event == EventType::ConnectRequest as u32 {
Some(new_cm_id_for_raw(self.clone(), raw_cm_id))
} else {
let context_ptr = (*raw_cm_id).context as *mut Weak<Identifier>;
assert_ne!(context_ptr, null_mut());
(*context_ptr).clone().upgrade()
}
};
let listener_id = unsafe {
let raw_listen_id = event.as_ref().listen_id;
if !raw_listen_id.is_null() {
let context_ptr = (*raw_listen_id).context as *mut Weak<Identifier>;
assert_ne!(context_ptr, null_mut());
(*context_ptr).clone().upgrade()
} else {
None
}
};
Ok(Event {
event,
cm_id,
listener_id,
})
}
pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
let fd = self.as_raw_fd();
unsafe {
let previous = libc::fcntl(fd, libc::F_GETFL);
if previous < 0 {
return Err(io::Error::last_os_error());
}
let new = if nonblocking {
previous | libc::O_NONBLOCK
} else {
previous & !libc::O_NONBLOCK
};
if libc::fcntl(fd, libc::F_SETFL, new) < 0 {
return Err(io::Error::last_os_error());
}
Ok(())
}
}
}
impl AsRawFd for EventChannel {
fn as_raw_fd(&self) -> RawFd {
unsafe { self.channel.as_ref().fd }
}
}
unsafe impl Send for EventChannel {}
unsafe impl Sync for EventChannel {}
impl Drop for Identifier {
fn drop(&mut self) {
let cm_id = self.cm_id;
unsafe {
let ctx = cm_id.as_ref().context as *mut Weak<Identifier>;
rdma_destroy_id(cm_id.as_ptr());
let _ = Box::from_raw(ctx);
}
}
}
unsafe impl Sync for Identifier {}
unsafe impl Send for Identifier {}
impl Identifier {
pub fn setup_context<C: Any + Send + Sync>(&self, ctx: C) {
let mut user_data = self.user_context.lock().unwrap();
*user_data = Some(Arc::new(ctx));
}
pub fn get_context<C: Any + Send + Sync>(&self) -> Option<Arc<C>> {
let user_data = self.user_context.lock().unwrap();
let arc_any = user_data.as_ref()?.clone();
arc_any.downcast::<C>().ok()
}
pub fn port(&self) -> u8 {
let cm_id = self.cm_id;
unsafe { cm_id.as_ref().port_num }
}
pub fn bind_addr(&self, addr: SocketAddr) -> Result<(), BindAddressError> {
let cm_id = self.cm_id;
let ret = unsafe { rdma_bind_addr(cm_id.as_ptr(), OsSocketAddr::from(addr).as_mut_ptr()) };
if ret < 0 {
return Err(BindAddressError {
addr,
source: BindAddressErrorKind::Rdmacm(io::Error::last_os_error()),
});
}
Ok(())
}
pub fn resolve_addr(
&self, src_addr: Option<SocketAddr>, dst_addr: SocketAddr, timeout: Duration,
) -> Result<(), ResolveAddressError> {
let cm_id = self.cm_id;
let timeout_ms: i32 = timeout.as_millis().try_into().unwrap();
let ret = unsafe {
rdma_resolve_addr(
cm_id.as_ptr(),
match src_addr {
Some(addr) => OsSocketAddr::from(addr).as_mut_ptr(),
None => null_mut(),
},
OsSocketAddr::from(dst_addr).as_mut_ptr(),
timeout_ms,
)
};
if ret < 0 {
return Err(ResolveAddressError {
src_addr,
dst_addr,
source: ResolveAddressErrorKind::Rdmacm(io::Error::last_os_error()),
});
}
Ok(())
}
pub fn resolve_route(&self, timeout: Duration) -> Result<(), ResolveRouteError> {
let cm_id = self.cm_id;
let timeout_ms: i32 = timeout.as_millis().try_into().unwrap();
let ret = unsafe { rdma_resolve_route(cm_id.as_ptr(), timeout_ms) };
if ret < 0 {
return Err(ResolveRouteErrorKind::Rdmacm(io::Error::last_os_error()).into());
}
Ok(())
}
pub fn listen(&self, backlog: i32) -> Result<(), ListenError> {
let cm_id = self.cm_id;
let ret = unsafe { rdma_listen(cm_id.as_ptr(), backlog) };
if ret < 0 {
return Err(ListenErrorKind::Rdmacm(io::Error::last_os_error()).into());
}
Ok(())
}
pub fn get_device_context(&self) -> Option<Arc<DeviceContext>> {
let cm_id = self.cm_id;
unsafe {
if (*cm_id.as_ptr()).verbs.is_null() {
return None;
}
let mut guard = DEVICE_LISTS.lock().unwrap();
let device_ctx = guard.entry((*cm_id.as_ptr()).verbs as usize).or_insert_with(|| {
Arc::new(DeviceContext {
context: NonNull::new((*cm_id.as_ptr()).verbs).unwrap(),
})
});
Some(device_ctx.clone())
}
}
pub fn connect(&self, mut conn_param: ConnectionParameter) -> Result<(), ConnectError> {
let cm_id = self.cm_id;
let ret = unsafe { rdma_connect(cm_id.as_ptr(), &mut conn_param.0) };
if ret < 0 {
return Err(ConnectErrorKind::Rdmacm(io::Error::last_os_error()).into());
}
Ok(())
}
pub fn disconnect(&self) -> Result<(), DisconnectError> {
let cm_id = self.cm_id;
let ret = unsafe { rdma_disconnect(cm_id.as_ptr()) };
if ret < 0 {
return Err(DisconnectErrorKind::Rdmacm(io::Error::last_os_error()).into());
}
Ok(())
}
pub fn accept(&self, mut conn_param: ConnectionParameter) -> Result<(), AcceptError> {
let cm_id = self.cm_id;
let ret = unsafe { rdma_accept(cm_id.as_ptr(), &mut conn_param.0) };
if ret < 0 {
return Err(AcceptErrorKind::Rdmacm(io::Error::last_os_error()).into());
}
Ok(())
}
pub fn establish(&self) -> Result<(), EstablishError> {
let cm_id = self.cm_id;
let ret = unsafe { rdma_establish(cm_id.as_ptr()) };
if ret < 0 {
return Err(EstablishErrorKind::Rdmacm(io::Error::last_os_error()).into());
}
Ok(())
}
pub fn get_qp_attr(&self, state: QueuePairState) -> Result<QueuePairAttribute, GetQueuePairAttributeError> {
let cm_id = self.cm_id;
let mut attr = MaybeUninit::<ibv_qp_attr>::uninit();
let mut mask = 0;
unsafe { (*attr.as_mut_ptr()).qp_state = state as _ };
let ret = unsafe { rdma_init_qp_attr(cm_id.as_ptr(), attr.as_mut_ptr(), &mut mask) };
if ret < 0 {
return Err(GetQueuePairAttributeErrorKind::Rdmacm(io::Error::last_os_error()).into());
}
Ok(QueuePairAttribute::from(unsafe { attr.assume_init_ref() }, mask))
}
}
impl Default for ConnectionParameter {
fn default() -> Self {
Self(rdma_conn_param {
private_data: null(),
private_data_len: 0,
responder_resources: 1,
initiator_depth: 1,
flow_control: 0,
retry_count: 7,
rnr_retry_count: 7,
srq: 0,
qp_num: 0,
})
}
}
impl ConnectionParameter {
pub fn new() -> Self {
Self(rdma_conn_param {
private_data: null(),
private_data_len: 0,
responder_resources: 0,
initiator_depth: 0,
flow_control: 0,
retry_count: 0,
rnr_retry_count: 0,
srq: 0,
qp_num: 0,
})
}
pub fn setup_qp_number(&mut self, qp_number: u32) -> &mut Self {
self.0.qp_num = qp_number;
self
}
}
#[cfg(test)]
mod tests {
use super::*;
use polling::{Event, Events, Poller};
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
use std::thread;
#[test]
fn test_cm_id_reference_count() -> Result<(), Box<dyn std::error::Error>> {
match EventChannel::new() {
Ok(channel) => {
let id = channel.create_id(PortSpace::Tcp).unwrap();
assert_eq!(Arc::strong_count(&channel), 2);
assert_eq!(Arc::strong_count(&id), 1);
let _ = id.resolve_addr(
None,
SocketAddr::from((IpAddr::from_str("127.0.0.1").expect("Invalid IP address"), 0)),
Duration::new(0, 200000000),
);
assert_eq!(Arc::strong_count(&id), 1);
let event = channel.get_cm_event().unwrap();
assert_eq!(Arc::strong_count(&id), 2);
let cm_id = event.cm_id().unwrap();
assert_eq!(Arc::strong_count(&id), 3);
assert_eq!(Arc::strong_count(&cm_id), 3);
event.ack().unwrap();
assert_eq!(Arc::strong_count(&id), 2);
assert_eq!(Arc::strong_count(&cm_id), 2);
Ok(())
},
Err(_) => Ok(()),
}
}
#[test]
fn test_channel_event_fd() -> Result<(), Box<dyn std::error::Error>> {
match EventChannel::new() {
Ok(channel) => {
let id = channel.create_id(PortSpace::Tcp).unwrap();
assert_eq!(Arc::strong_count(&id), 1);
channel.set_nonblocking(true).unwrap();
let dispatcher = thread::spawn(move || {
let poller = Poller::new().expect("Failed to create poller");
let key = 233;
assert_eq!(Arc::strong_count(&channel), 2);
unsafe { poller.add(&channel, Event::readable(key)).unwrap() };
let mut events = Events::new();
events.clear();
poller.wait(&mut events, None).unwrap();
assert_eq!(events.len(), 1);
for ev in events.iter() {
assert_eq!(ev.key, key);
let event = channel.get_cm_event().unwrap();
assert_eq!(event.event_type(), EventType::AddressResolved);
assert_eq!(Arc::strong_count(&channel), 2);
event.ack().unwrap();
assert_eq!(Arc::strong_count(&channel), 2);
}
});
let _ = id.resolve_addr(
None,
SocketAddr::from((IpAddr::from_str("127.0.0.1").expect("Invalid IP address"), 0)),
Duration::new(0, 200000000),
);
dispatcher.join().unwrap();
assert_eq!(Arc::strong_count(&id), 1);
Ok(())
},
Err(_) => Ok(()),
}
}
#[test]
fn test_bind_on_the_same_port() -> Result<(), Box<dyn std::error::Error>> {
match EventChannel::new() {
Ok(channel) => {
let id = channel.create_id(PortSpace::Tcp).unwrap();
let address = SocketAddr::from((IpAddr::from_str("0.0.0.0").expect("Invalid IP address"), 8080));
let res = id.bind_addr(address);
assert!(res.is_ok());
let new_id = channel.create_id(PortSpace::Tcp).unwrap();
let err = new_id.bind_addr(address).err().unwrap();
assert_eq!(err.addr, address);
match err.source {
BindAddressErrorKind::Rdmacm(err) => assert_eq!(err.kind(), io::ErrorKind::AddrNotAvailable),
};
Ok(())
},
Err(_) => Ok(()),
}
}
#[test]
fn test_conn_param() -> Result<(), Box<dyn std::error::Error>> {
match EventChannel::new() {
Ok(channel) => {
let _id = channel.create_id(PortSpace::Tcp).unwrap();
let _param = ConnectionParameter::new();
Ok(())
},
Err(_) => Ok(()),
}
}
#[test]
fn test_event_channel_outlives_identifier_arc_counts() -> Result<(), Box<dyn std::error::Error>> {
match EventChannel::new() {
Ok(channel) => {
let id = channel.create_id(PortSpace::Tcp).unwrap();
assert_eq!(Arc::strong_count(&channel), 2);
drop(id);
assert_eq!(Arc::strong_count(&channel), 1);
Ok(())
},
Err(_) => Ok(()),
}
}
#[test]
fn test_get_device_context_caches_correctly() -> Result<(), Box<dyn std::error::Error>> {
match EventChannel::new() {
Ok(channel) => {
let id = channel.create_id(PortSpace::Tcp)?;
let _ = id.resolve_addr(
None,
SocketAddr::from((IpAddr::from_str("127.0.0.1")?, 0)),
Duration::new(0, 200000000),
);
let event = channel.get_cm_event()?;
assert_eq!(event.event_type(), EventType::AddressResolved);
let ctx1 = id.get_device_context();
let ctx2 = id.get_device_context();
let ctx3 = id.get_device_context();
assert!(ctx1.is_some(), "First get_device_context should return Some");
assert!(ctx2.is_some(), "Second get_device_context should return Some");
assert!(ctx3.is_some(), "Third get_device_context should return Some");
assert!(
Arc::ptr_eq(&ctx1.clone().unwrap(), &ctx2.clone().unwrap()),
"ctx1 and ctx2 should point to the same DeviceContext"
);
assert!(
Arc::ptr_eq(&ctx2.clone().unwrap(), &ctx3.clone().unwrap()),
"ctx2 and ctx3 should point to the same DeviceContext"
);
let ctx = ctx1.unwrap();
let _pd = ctx.alloc_pd()?;
Ok(())
},
Err(_) => Ok(()),
}
}
}