1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
use crate::error::{NexarError, Result};
use crate::transport::buffer_pool::PooledBuf;
use crate::types::Rank;
use std::sync::Arc;
use tokio::sync::Mutex;
use super::NexarClient;
use super::async_client::RawRecvSource;
impl NexarClient {
    /// Send raw bytes to a peer.
    ///
    /// Uses comm-aware send for split communicators. Always uses QUIC transport.
    /// For bulk data in collectives, prefer `send_bytes_best_effort` which
    /// auto-selects RDMA when available.
    pub async fn send_bytes(&self, dest: Rank, data: &[u8]) -> Result<()> {
        let peer = self.peer(dest)?;
        if self.comm_id == 0 {
            peer.send_raw(data).await
        } else {
            // Split communicators tag every frame with the comm id so the
            // receiving side can demultiplex it to the right communicator.
            peer.send_raw_comm(self.comm_id, data).await
        }
    }

    /// Send raw bytes using the best available transport (RDMA if available, QUIC fallback).
    /// For split communicators, always uses QUIC (comm-id routing required).
    pub(crate) async fn send_bytes_best_effort(&self, dest: Rank, data: &[u8]) -> Result<()> {
        let peer = self.peer(dest)?;
        if self.comm_id == 0 {
            peer.send_raw_best_effort(data).await
        } else {
            // Split communicators need comm_id tagging, QUIC only.
            peer.send_raw_comm(self.comm_id, data).await
        }
    }

    /// Send raw bytes to a peer with a u64 tag.
    ///
    /// Tagged sends are always via QUIC (tags are part of the wire format).
    pub async fn send_bytes_tagged(&self, dest: Rank, tag: u64, data: &[u8]) -> Result<()> {
        let peer = self.peer(dest)?;
        peer.send_raw_tagged(tag, data).await
    }

    /// Receive tagged bytes using the best available transport.
    ///
    /// Tries `TaggedBulkTransport` (TCP sidecar) first, falling back to QUIC.
    /// A bulk-transport failure is logged (consistent with
    /// `recv_bytes_best_effort`) and then masked by the QUIC fallback.
    pub(crate) async fn recv_bytes_tagged_best_effort(
        &self,
        src: Rank,
        tag: u64,
        expected_size: usize,
    ) -> Result<PooledBuf> {
        let peer = self.peer(src)?;
        let tagged_bulk: Option<std::sync::Arc<dyn crate::transport::TaggedBulkTransport>> = peer
            .extension::<std::sync::Arc<dyn crate::transport::TaggedBulkTransport>>()?
            .map(|b| std::sync::Arc::clone(&*b));
        if let Some(bulk) = tagged_bulk {
            match bulk.recv_bulk_tagged(tag, expected_size).await {
                Ok(data) => {
                    return Ok(PooledBuf::from_vec(
                        data,
                        std::sync::Arc::clone(&self._pool),
                    ));
                }
                Err(e) => {
                    // Previously swallowed silently; log before falling back
                    // so transport degradation is visible to operators, same
                    // as the untagged best-effort path below.
                    tracing::warn!(
                        src,
                        tag,
                        expected_size,
                        error = %e,
                        "tagged bulk transport recv failed, falling back to QUIC"
                    );
                }
            }
        }
        self.recv_bytes_tagged(src, tag).await
    }

    /// Send tagged bytes using the best available transport.
    pub(crate) async fn send_bytes_tagged_best_effort(
        &self,
        dest: Rank,
        tag: u64,
        data: &[u8],
    ) -> Result<()> {
        let peer = self.peer(dest)?;
        peer.send_raw_tagged_best_effort(tag, data).await
    }

    /// Receive tagged raw bytes from a peer.
    ///
    /// The tag channel is lazily created and cached for the lifetime of this
    /// (rank, tag) pair. This allows multi-round algorithms (like ring
    /// allreduce) to use the same channel across rounds without losing
    /// messages that arrive between rounds.
    pub async fn recv_bytes_tagged(&self, src: Rank, tag: u64) -> Result<PooledBuf> {
        let original_src = self.resolve_rank(src);
        let key = (original_src, tag);
        // Get or create the receiver for this (rank, tag) pair. The map lock
        // is a tokio Mutex because `register_tag` is awaited while it is held;
        // the lock is dropped before awaiting the receive itself.
        let rx_arc = {
            let mut map = self.tagged_receivers.lock().await;
            if let Some(rx) = map.get(&key) {
                Arc::clone(rx)
            } else {
                let router = self
                    .routers
                    .get(&original_src)
                    // Report the caller-visible rank, not the resolved one.
                    .ok_or(NexarError::UnknownPeer { rank: src })?;
                let rx = router.register_tag(tag).await;
                let rx_arc = Arc::new(Mutex::new(rx));
                map.insert(key, Arc::clone(&rx_arc));
                rx_arc
            }
        };
        let mut rx = rx_arc.lock().await;
        rx.recv()
            .await
            // Channel closed means the peer's connection went away.
            .ok_or(NexarError::PeerDisconnected { rank: src })
    }

    /// Receive raw bytes using the best available transport.
    ///
    /// Tries `BulkTransport::recv_bulk` first (e.g., RDMA), falling back to QUIC.
    /// Only works for the default communicator (comm_id 0) and requires knowing
    /// the expected size.
    pub(crate) async fn recv_bytes_best_effort(
        &self,
        src: Rank,
        expected_size: usize,
    ) -> Result<PooledBuf> {
        if self.comm_id == 0 {
            let peer = self.peer(src)?;
            // Try BulkTransport recv_bulk if available.
            let bulk: Option<std::sync::Arc<dyn crate::transport::BulkTransport>> = peer
                .extension::<std::sync::Arc<dyn crate::transport::BulkTransport>>()?
                .map(|b| std::sync::Arc::clone(&*b));
            if let Some(bulk) = bulk {
                match bulk.recv_bulk(expected_size).await {
                    Ok(data) => {
                        return Ok(PooledBuf::from_vec(
                            data,
                            std::sync::Arc::clone(&self._pool),
                        ));
                    }
                    Err(e) => {
                        tracing::warn!(
                            src,
                            expected_size,
                            error = %e,
                            "bulk transport recv failed, falling back to QUIC"
                        );
                    }
                }
            }
        }
        // Fallback to QUIC recv.
        self.recv_bytes(src).await
    }

    /// Receive raw bytes from a peer.
    ///
    /// Uses comm-aware recv for split communicators.
    pub async fn recv_bytes(&self, src: Rank) -> Result<PooledBuf> {
        match &self.raw_recv {
            RawRecvSource::Router => {
                let original_src = self.resolve_rank(src);
                let router = self
                    .routers
                    .get(&original_src)
                    .ok_or(NexarError::UnknownPeer { rank: src })?;
                router.recv_raw(original_src).await
            }
            RawRecvSource::Comm(channels) => {
                // Split communicators receive through per-peer channels keyed
                // by the communicator-local rank.
                let rx = channels
                    .get(&src)
                    .ok_or(NexarError::UnknownPeer { rank: src })?;
                rx.lock()
                    .await
                    .recv()
                    .await
                    .ok_or(NexarError::PeerDisconnected { rank: src })
            }
        }
    }
}