use crate::bindings as C;
use crate::cq::{self, CompletionQueue};
use crate::ctx::Context;
use crate::error::{create_resource, custom_error};
use crate::weakset::WeakSet;
use std::os::raw::c_void;
use std::os::unix::prelude::{AsRawFd, RawFd};
use std::ptr::NonNull;
use std::sync::{Arc, Weak};
use std::{io, ptr};
use parking_lot::Mutex;
/// Handle to a completion event channel (`ibv_comp_channel`).
///
/// The handle is a thin wrapper around an `Arc<Owner>`, so the derived
/// `Clone` only bumps a reference count; all clones refer to the same
/// underlying channel.
#[derive(Clone)]
pub struct CompChannel(Arc<Owner>);
impl CompChannel {
pub(crate) fn ffi_ptr(&self) -> *mut C::ibv_comp_channel {
self.0.ffi_ptr()
}
#[inline]
pub fn create(ctx: &Context) -> io::Result<Self> {
let owner = unsafe {
let cc = create_resource(
|| C::ibv_create_comp_channel(ctx.ffi_ptr()),
|| "failed to create completion channel",
)?;
Arc::new(Owner {
cc,
cq_ref: Mutex::new(WeakSet::new()),
_ctx: ctx.clone(),
})
};
Ok(Self(owner))
}
#[inline]
pub fn wait_cq_event(&self) -> io::Result<CompletionQueue> {
let cc = self.ffi_ptr();
let mut cq: *mut C::ibv_cq = ptr::null_mut();
let mut cq_context: *mut c_void = ptr::null_mut();
unsafe {
let ret = C::ibv_get_cq_event(cc, &mut cq, &mut cq_context);
if ret != 0 {
return Err(custom_error("failed to get completion event"));
}
debug_assert_eq!((*cq).cq_context, cq_context);
}
unsafe { Ok(CompletionQueue::from_cq_context(cq_context)) }
}
pub(crate) fn add_cq_ref(&self, cq: Weak<cq::Owner>) {
self.0.cq_ref.lock().insert(cq);
}
pub(crate) fn del_cq_ref(&self, cq: &cq::Owner) -> bool {
self.0.cq_ref.lock().remove(cq)
}
}
impl AsRawFd for CompChannel {
    /// Returns the file descriptor stored in the underlying
    /// `ibv_comp_channel` structure.
    #[inline]
    fn as_raw_fd(&self) -> RawFd {
        // SAFETY: the pointer comes from the owner's `NonNull` and the owner
        // is kept alive by `self` for the duration of the read.
        unsafe { (*self.ffi_ptr()).fd }
    }
}
/// Shared state behind every `CompChannel` clone.
///
/// Dropping the last reference runs the `Drop` impl, which destroys the
/// underlying channel.
struct Owner {
/// Non-null pointer to the underlying `ibv_comp_channel`.
cc: NonNull<C::ibv_comp_channel>,
/// Weak references to completion queues, maintained through
/// `CompChannel::add_cq_ref` and `CompChannel::del_cq_ref`.
cq_ref: Mutex<WeakSet<cq::Owner>>,
/// Clone of the context the channel was created on. Never read here;
/// presumably held only to keep the context alive while the channel
/// exists — confirm.
_ctx: Context,
}
// `Owner` contains a raw `NonNull` pointer, so `Send` and `Sync` are not
// implemented automatically; these impls assert that moving and sharing it
// across threads is sound.
// NOTE(review): the justification presumably rests on libibverbs treating the
// channel as thread-safe plus the `Mutex` guarding `cq_ref` — confirm.
unsafe impl Send for Owner {}
unsafe impl Sync for Owner {}
impl Owner {
pub(crate) fn ffi_ptr(&self) -> *mut C::ibv_comp_channel {
self.cc.as_ptr()
}
}
impl Drop for Owner {
    /// Destroys the underlying completion channel.
    ///
    /// # Panics
    ///
    /// Panics if `ibv_destroy_comp_channel` returns a non-zero status.
    fn drop(&mut self) {
        // SAFETY: FFI call; the pointer is the one stored at creation time
        // and this is the only place that destroys it.
        let status = unsafe { C::ibv_destroy_comp_channel(self.ffi_ptr()) };
        assert_eq!(status, 0);
    }
}