ibverbs_rs/ibverbs/queue_pair/
builder.rs1use crate::ibverbs::access_config::AccessFlags;
14use crate::ibverbs::completion_queue::CompletionQueue;
15use crate::ibverbs::device::IB_PORT;
16use crate::ibverbs::error::{IbvError, IbvResult};
17use crate::ibverbs::protection_domain::ProtectionDomain;
18use crate::ibverbs::queue_pair::QueuePair;
19use crate::ibverbs::queue_pair::config::*;
20use bon::bon;
21use ibverbs_sys::*;
22use serde::{Deserialize, Serialize};
23use std::ffi::c_void;
24use std::{io, ptr};
25
26#[bon]
27impl QueuePair {
28 #[builder(state_mod(vis = "pub(crate)"))]
48 pub fn builder(
49 pd: &ProtectionDomain,
50 send_cq: &CompletionQueue,
51 recv_cq: &CompletionQueue,
52 access: AccessFlags,
53 #[builder(default = 16)] max_send_wr: u32,
54 #[builder(default = 16)] max_recv_wr: u32,
55 #[builder(default = 16)] max_send_sge: u32,
56 #[builder(default = 16)] max_recv_sge: u32,
57 #[builder(default)] max_rnr_retries: MaxRnrRetries,
58 #[builder(default)] max_ack_retries: MaxAckRetries,
59 #[builder(default)] min_rnr_timer: MinRnrTimer,
60 #[builder(default)] ack_timeout: AckTimeout,
61 #[builder(default)] mtu: MaximumTransferUnit,
62 #[builder(default)] send_psn: PacketSequenceNumber,
63 #[builder(default)] recv_psn: PacketSequenceNumber,
64 ) -> IbvResult<PreparedQueuePair> {
65 let mut attr = ibv_qp_init_attr {
66 qp_context: ptr::null::<c_void>() as *mut _,
67 send_cq: send_cq.inner.cq as *const _ as *mut _,
68 recv_cq: recv_cq.inner.cq as *const _ as *mut _,
69 srq: ptr::null::<ibv_srq>() as *mut _,
70 cap: ibv_qp_cap {
71 max_send_wr,
72 max_recv_wr,
73 max_send_sge,
74 max_recv_sge,
75 max_inline_data: 0,
76 },
77 qp_type: ibv_qp_type::IBV_QPT_RC,
78 sq_sig_all: 0,
79 };
80
81 let qp = unsafe { ibv_create_qp(pd.inner.pd, &mut attr as *mut _) };
82 if qp.is_null() {
83 return Err(IbvError::from_errno_with_msg(
84 io::Error::last_os_error()
85 .raw_os_error()
86 .expect("ibv_create_qp should set errno on error"),
87 "Failed to create queue pair",
88 ));
89 }
90 let endpoint = QueuePairEndpoint {
91 num: unsafe { *qp }.qp_num,
92 lid: pd.inner.context.inner.query_port()?.lid,
93 };
94 log::debug!("QueuePair created");
95 Ok(PreparedQueuePair {
96 qp: QueuePair {
97 pd: pd.clone(),
98 _send_cq: send_cq.clone(),
99 _recv_cq: recv_cq.clone(),
100 qp,
101 },
102 endpoint,
103 access,
104 max_rnr_retries,
105 max_ack_retries,
106 min_rnr_timer,
107 ack_timeout,
108 mtu,
109 send_psn,
110 recv_psn,
111 })
112 }
113}
114
115#[derive(Debug)]
127pub struct PreparedQueuePair {
128 qp: QueuePair,
129 endpoint: QueuePairEndpoint,
130
131 access: AccessFlags,
132 max_rnr_retries: MaxRnrRetries,
133 max_ack_retries: MaxAckRetries,
134 min_rnr_timer: MinRnrTimer,
135 ack_timeout: AckTimeout,
136 mtu: MaximumTransferUnit,
137 send_psn: PacketSequenceNumber,
138 recv_psn: PacketSequenceNumber,
139}
140
141#[derive(Copy, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
146pub struct QueuePairEndpoint {
147 pub num: u32,
149 pub lid: u16,
151}
152
153impl PreparedQueuePair {
154 pub fn endpoint(&self) -> QueuePairEndpoint {
158 self.endpoint
159 }
160
161 #[allow(clippy::cast_possible_wrap)]
175 pub fn handshake(self, remote: QueuePairEndpoint) -> IbvResult<QueuePair> {
176 let mut attr = ibv_qp_attr {
178 qp_state: ibv_qp_state::IBV_QPS_INIT,
179 pkey_index: 0,
180 port_num: IB_PORT,
181 qp_access_flags: self.access.code(),
182 ..Default::default()
183 };
184 let mask = ibv_qp_attr_mask::IBV_QP_STATE
185 | ibv_qp_attr_mask::IBV_QP_PKEY_INDEX
186 | ibv_qp_attr_mask::IBV_QP_PORT
187 | ibv_qp_attr_mask::IBV_QP_ACCESS_FLAGS;
188 let errno = unsafe { ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) };
189 if errno != 0 {
190 return Err(IbvError::from_errno_with_msg(
191 errno,
192 "Failed to set queue pair to Init state",
193 ));
194 }
195
196 let mut attr = ibv_qp_attr {
198 qp_state: ibv_qp_state::IBV_QPS_RTR,
199 path_mtu: self.mtu.code() as ibv_mtu,
200 dest_qp_num: remote.num,
201 rq_psn: self.recv_psn.code(),
202 max_dest_rd_atomic: 1,
203 min_rnr_timer: self.min_rnr_timer.code(), ah_attr: ibv_ah_attr {
205 dlid: remote.lid,
206 is_global: 0,
207 sl: 0,
208 src_path_bits: 0,
209 port_num: IB_PORT,
210 ..Default::default()
211 },
212 ..Default::default()
213 };
214 let mask = ibv_qp_attr_mask::IBV_QP_STATE
215 | ibv_qp_attr_mask::IBV_QP_AV
216 | ibv_qp_attr_mask::IBV_QP_PATH_MTU
217 | ibv_qp_attr_mask::IBV_QP_DEST_QPN
218 | ibv_qp_attr_mask::IBV_QP_RQ_PSN
219 | ibv_qp_attr_mask::IBV_QP_MAX_DEST_RD_ATOMIC
220 | ibv_qp_attr_mask::IBV_QP_MIN_RNR_TIMER;
221
222 let errno = unsafe { ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) };
223 if errno != 0 {
224 return Err(IbvError::from_errno_with_msg(
225 errno,
226 "Failed to set queue pair to Ready to Receive state",
227 ));
228 }
229
230 let mut attr = ibv_qp_attr {
232 qp_state: ibv_qp_state::IBV_QPS_RTS,
233 timeout: self.ack_timeout.code(),
234 retry_cnt: self.max_ack_retries.retries(),
235 rnr_retry: self.max_rnr_retries.code(),
236 max_rd_atomic: 1,
237 sq_psn: self.send_psn.code(),
238 ..Default::default()
239 };
240 let mask = ibv_qp_attr_mask::IBV_QP_STATE
241 | ibv_qp_attr_mask::IBV_QP_TIMEOUT
242 | ibv_qp_attr_mask::IBV_QP_RETRY_CNT
243 | ibv_qp_attr_mask::IBV_QP_RNR_RETRY
244 | ibv_qp_attr_mask::IBV_QP_MAX_QP_RD_ATOMIC
245 | ibv_qp_attr_mask::IBV_QP_SQ_PSN;
246 let errno = unsafe { ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) };
247 if errno != 0 {
248 return Err(IbvError::from_errno_with_msg(
249 errno,
250 "Failed to set queue pair to Ready to Send state",
251 ));
252 }
253
254 Ok(self.qp)
255 }
256}