1use std::sync::Arc;
4
5use rdma_io_sys::ibverbs::*;
6use rdma_io_sys::wrapper::*;
7
8use crate::Result;
9use crate::cq::CompletionQueue;
10use crate::error::from_ret;
11use crate::pd::ProtectionDomain;
12use crate::wr::{QpState, QpType, RecvWr, SendWr};
13
14#[derive(Debug, Clone)]
16pub struct QpInitAttr {
17 pub qp_type: QpType,
19 pub max_send_wr: u32,
21 pub max_recv_wr: u32,
23 pub max_send_sge: u32,
25 pub max_recv_sge: u32,
27 pub max_inline_data: u32,
29 pub sq_sig_all: bool,
31}
32
33impl Default for QpInitAttr {
34 fn default() -> Self {
35 Self {
36 qp_type: QpType::Rc,
37 max_send_wr: 16,
38 max_recv_wr: 16,
39 max_send_sge: 1,
40 max_recv_sge: 1,
41 max_inline_data: 0,
42 sq_sig_all: true,
43 }
44 }
45}
46
47pub struct QueuePair {
49 pub(crate) inner: *mut ibv_qp,
50 pub(crate) _pd: Arc<ProtectionDomain>,
51 pub(crate) _send_cq: Arc<CompletionQueue>,
52 pub(crate) _recv_cq: Arc<CompletionQueue>,
53}
54
55unsafe impl Send for QueuePair {}
57unsafe impl Sync for QueuePair {}
58
59impl Drop for QueuePair {
60 fn drop(&mut self) {
61 let ret = unsafe { ibv_destroy_qp(self.inner) };
62 if ret != 0 {
63 tracing::error!(
64 "ibv_destroy_qp failed: {}",
65 std::io::Error::from_raw_os_error(-ret)
66 );
67 }
68 }
69}
70
71impl QueuePair {
72 pub fn qp_num(&self) -> u32 {
74 unsafe { (*self.inner).qp_num }
75 }
76
77 pub fn state(&self) -> QpState {
79 QpState::from_raw(unsafe { (*self.inner).state })
80 }
81
82 pub fn modify(&self, attr: &mut ibv_qp_attr, attr_mask: u32) -> Result<()> {
87 from_ret(unsafe { ibv_modify_qp(self.inner, attr, attr_mask as i32) })
88 }
89
90 pub fn to_init(&self, port_num: u8, pkey_index: u16, access_flags: u32) -> Result<()> {
92 let mut attr = ibv_qp_attr {
93 qp_state: IBV_QPS_INIT,
94 pkey_index,
95 port_num,
96 qp_access_flags: access_flags,
97 ..Default::default()
98 };
99 let mask = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS;
100 self.modify(&mut attr, mask)
101 }
102
103 pub fn to_rtr(
110 &self,
111 dest_qp_num: u32,
112 rq_psn: u32,
113 dgid: &ibv_gid,
114 port_num: u8,
115 ) -> Result<()> {
116 let mut attr = ibv_qp_attr {
117 qp_state: IBV_QPS_RTR,
118 path_mtu: IBV_MTU_1024,
119 dest_qp_num,
120 rq_psn,
121 max_dest_rd_atomic: 1,
122 min_rnr_timer: 12,
123 ah_attr: ibv_ah_attr {
124 grh: ibv_global_route {
125 dgid: *dgid,
126 sgid_index: 0,
127 hop_limit: 1,
128 ..Default::default()
129 },
130 dlid: 0,
131 sl: 0,
132 src_path_bits: 0,
133 is_global: 1,
134 port_num,
135 ..Default::default()
136 },
137 ..Default::default()
138 };
139 let mask = IBV_QP_STATE
140 | IBV_QP_AV
141 | IBV_QP_PATH_MTU
142 | IBV_QP_DEST_QPN
143 | IBV_QP_RQ_PSN
144 | IBV_QP_MAX_DEST_RD_ATOMIC
145 | IBV_QP_MIN_RNR_TIMER;
146 self.modify(&mut attr, mask)
147 }
148
149 pub fn to_rts(&self, sq_psn: u32) -> Result<()> {
151 let mut attr = ibv_qp_attr {
152 qp_state: IBV_QPS_RTS,
153 timeout: 14,
154 retry_cnt: 7,
155 rnr_retry: 7,
156 sq_psn,
157 max_rd_atomic: 1,
158 ..Default::default()
159 };
160 let mask = IBV_QP_STATE
161 | IBV_QP_TIMEOUT
162 | IBV_QP_RETRY_CNT
163 | IBV_QP_RNR_RETRY
164 | IBV_QP_SQ_PSN
165 | IBV_QP_MAX_QP_RD_ATOMIC;
166 self.modify(&mut attr, mask)
167 }
168
169 pub fn post_send(&self, wr: &mut SendWr) -> Result<()> {
171 let mut raw = wr.build_raw();
172 let mut bad_wr: *mut ibv_send_wr = std::ptr::null_mut();
173 from_ret(unsafe { rdma_wrap_ibv_post_send(self.inner, &mut raw, &mut bad_wr) })
174 }
175
176 pub fn post_recv(&self, wr: &mut RecvWr) -> Result<()> {
178 let mut raw = wr.build_raw();
179 let mut bad_wr: *mut ibv_recv_wr = std::ptr::null_mut();
180 from_ret(unsafe { rdma_wrap_ibv_post_recv(self.inner, &mut raw, &mut bad_wr) })
181 }
182
183 pub fn query(&self, attr_mask: u32) -> Result<(ibv_qp_attr, ibv_qp_init_attr)> {
185 let mut attr = ibv_qp_attr::default();
186 let mut init_attr = ibv_qp_init_attr::default();
187 from_ret(unsafe { ibv_query_qp(self.inner, &mut attr, attr_mask as i32, &mut init_attr) })?;
188 Ok((attr, init_attr))
189 }
190
191 pub fn as_raw(&self) -> *mut ibv_qp {
193 self.inner
194 }
195}