Skip to main content

// ibverbs_rs/network/ops/multi_channel_ops.rs

1use crate::channel::TransportResult;
2use crate::channel::{PendingWork, PollingScope, ScopedPendingWork};
3use crate::ibverbs::error::IbvResult;
4use crate::ibverbs::work::{SendWorkRequest, WorkSuccess};
5use crate::multi_channel::{
6    PeerReadWorkRequest, PeerReceiveWorkRequest, PeerSendWorkRequest, PeerWriteWorkRequest,
7};
8use crate::network::Node;
9
10impl<'scope, 'env> PollingScope<'scope, 'env, Node> {
11    /// Posts sends to multiple peers, returning handles for manual polling.
12    pub fn post_scatter_send<'wr, I>(
13        &mut self,
14        wrs: I,
15    ) -> TransportResult<Vec<ScopedPendingWork<'scope>>>
16    where
17        I: IntoIterator<Item = PeerSendWorkRequest<'wr, 'env>>,
18        'env: 'wr,
19    {
20        wrs.into_iter().map(|wr| self.post_send(wr)).collect()
21    }
22
23    /// Posts RDMA writes to multiple peers, returning handles for manual polling.
24    pub fn post_scatter_write<'wr, I>(
25        &mut self,
26        wrs: I,
27    ) -> TransportResult<Vec<ScopedPendingWork<'scope>>>
28    where
29        I: IntoIterator<Item = PeerWriteWorkRequest<'wr, 'env>>,
30        'env: 'wr,
31    {
32        wrs.into_iter().map(|wr| self.post_write(wr)).collect()
33    }
34
35    /// Posts receives from multiple peers, returning handles for manual polling.
36    pub fn post_gather_receive<'wr, I>(
37        &mut self,
38        wrs: I,
39    ) -> TransportResult<Vec<ScopedPendingWork<'scope>>>
40    where
41        I: IntoIterator<Item = PeerReceiveWorkRequest<'wr, 'env>>,
42        'env: 'wr,
43    {
44        wrs.into_iter().map(|wr| self.post_receive(wr)).collect()
45    }
46
47    /// Posts RDMA reads from multiple peers, returning handles for manual polling.
48    pub fn post_gather_read<'wr, I>(
49        &mut self,
50        wrs: I,
51    ) -> TransportResult<Vec<ScopedPendingWork<'scope>>>
52    where
53        I: IntoIterator<Item = PeerReadWorkRequest<'wr, 'env>>,
54        'env: 'wr,
55    {
56        wrs.into_iter().map(|wr| self.post_read(wr)).collect()
57    }
58
59    /// Posts the same send to multiple peers, returning handles for manual polling.
60    pub fn post_multicast_send<'wr, I>(
61        &mut self,
62        peers: I,
63        wr: SendWorkRequest<'wr, 'env>,
64    ) -> TransportResult<Vec<ScopedPendingWork<'scope>>>
65    where
66        I: IntoIterator<Item = usize>,
67        'env: 'wr,
68    {
69        peers
70            .into_iter()
71            .map(|peer| self.post_send(PeerSendWorkRequest::from_wr(peer, wr.clone())))
72            .collect()
73    }
74}
75
76impl Node {
77    /// Posts sends to multiple peers and blocks until all complete.
78    pub fn scatter_send<'op, I>(&'op mut self, wrs: I) -> TransportResult<Vec<WorkSuccess>>
79    where
80        I: IntoIterator<Item = PeerSendWorkRequest<'op, 'op>>,
81    {
82        self.multi_channel.scatter_send(wrs)
83    }
84
85    /// Posts RDMA writes to multiple peers and blocks until all complete.
86    pub fn scatter_write<'op, I>(&'op mut self, wrs: I) -> TransportResult<Vec<WorkSuccess>>
87    where
88        I: IntoIterator<Item = PeerWriteWorkRequest<'op, 'op>>,
89    {
90        self.multi_channel.scatter_write(wrs)
91    }
92
93    /// Posts receives from multiple peers and blocks until all complete.
94    pub fn gather_receive<'op, I>(&'op mut self, wrs: I) -> TransportResult<Vec<WorkSuccess>>
95    where
96        I: IntoIterator<Item = PeerReceiveWorkRequest<'op, 'op>>,
97    {
98        self.multi_channel.gather_receive(wrs)
99    }
100
101    /// Posts RDMA reads from multiple peers and blocks until all complete.
102    pub fn gather_read<'op, I>(&'op mut self, wrs: I) -> TransportResult<Vec<WorkSuccess>>
103    where
104        I: IntoIterator<Item = PeerReadWorkRequest<'op, 'op>>,
105    {
106        self.multi_channel.gather_read(wrs)
107    }
108
109    /// Posts the same send to multiple peers and blocks until all complete.
110    pub fn multicast_send<'op, I>(
111        &'op mut self,
112        peers: I,
113        wr: SendWorkRequest<'op, 'op>,
114    ) -> TransportResult<Vec<WorkSuccess>>
115    where
116        I: IntoIterator<Item = usize>,
117    {
118        self.multi_channel.multicast_send(peers, wr)
119    }
120}
121
122impl Node {
123    /// Posts sends to multiple peers without polling for completion.
124    ///
125    /// # Safety
126    /// See [`Channel::send_unpolled`](crate::channel::Channel::send_unpolled).
127    pub unsafe fn scatter_send_unpolled<'wr, 'data, I>(
128        &mut self,
129        wrs: I,
130    ) -> IbvResult<Vec<PendingWork<'data>>>
131    where
132        I: IntoIterator<Item = PeerSendWorkRequest<'wr, 'data>>,
133        'data: 'wr,
134    {
135        unsafe { self.multi_channel.scatter_send_unpolled(wrs) }
136    }
137
138    /// Posts RDMA writes to multiple peers without polling for completion.
139    ///
140    /// # Safety
141    /// See [`Channel::write_unpolled`](crate::channel::Channel::write_unpolled).
142    pub unsafe fn scatter_write_unpolled<'wr, 'data, I>(
143        &mut self,
144        wrs: I,
145    ) -> IbvResult<Vec<PendingWork<'data>>>
146    where
147        I: IntoIterator<Item = PeerWriteWorkRequest<'wr, 'data>>,
148        'data: 'wr,
149    {
150        unsafe { self.multi_channel.scatter_write_unpolled(wrs) }
151    }
152
153    /// Posts receives from multiple peers without polling for completion.
154    ///
155    /// # Safety
156    /// See [`Channel::receive_unpolled`](crate::channel::Channel::receive_unpolled).
157    pub unsafe fn gather_receive_unpolled<'wr, 'data, I>(
158        &mut self,
159        wrs: I,
160    ) -> IbvResult<Vec<PendingWork<'data>>>
161    where
162        I: IntoIterator<Item = PeerReceiveWorkRequest<'wr, 'data>>,
163        'data: 'wr,
164    {
165        unsafe { self.multi_channel.gather_receive_unpolled(wrs) }
166    }
167
168    /// Posts RDMA reads from multiple peers without polling for completion.
169    ///
170    /// # Safety
171    /// See [`Channel::read_unpolled`](crate::channel::Channel::read_unpolled).
172    pub unsafe fn gather_read_unpolled<'wr, 'data, I>(
173        &mut self,
174        wrs: I,
175    ) -> IbvResult<Vec<PendingWork<'data>>>
176    where
177        I: IntoIterator<Item = PeerReadWorkRequest<'wr, 'data>>,
178        'data: 'wr,
179    {
180        unsafe { self.multi_channel.gather_read_unpolled(wrs) }
181    }
182
183    /// Posts the same send to multiple peers without polling for completion.
184    ///
185    /// # Safety
186    /// See [`Channel::send_unpolled`](crate::channel::Channel::send_unpolled).
187    pub unsafe fn multicast_send_unpolled<'wr, 'data, I>(
188        &mut self,
189        peers: I,
190        wr: SendWorkRequest<'wr, 'data>,
191    ) -> IbvResult<Vec<PendingWork<'data>>>
192    where
193        I: IntoIterator<Item = usize>,
194    {
195        unsafe { self.multi_channel.multicast_send_unpolled(peers, wr) }
196    }
197}