1use std::{
4 collections::HashSet,
5 fmt,
6 net::SocketAddr,
7 sync::Arc,
8 time::{Duration, Instant},
9};
10
11use async_trait::async_trait;
12use solana_connection_cache::{
13 connection_cache::NewConnectionConfig, nonblocking::client_connection::ClientConnection,
14};
15use solana_quic_client::{QuicConfig, QuicConnectionCache, QuicConnectionManager};
16use tokio::{
17 net::UdpSocket,
18 task::JoinSet,
19 time::{sleep, timeout},
20};
21
22use super::{DirectSubmitConfig, DirectSubmitTransport, SubmitTransportError};
23use crate::{providers::LeaderTarget, routing::RoutingPolicy};
24
/// Direct-submit transport that sends transaction bytes to leader targets over UDP and,
/// when a QUIC connection cache is present, over QUIC as well.
#[derive(Clone)]
pub struct UdpDirectTransport {
    /// Shared QUIC connection cache. `None` when QUIC direct sends were not enabled at
    /// construction time or when the cache could not be created.
    quic_cache: Option<Arc<QuicConnectionCache>>,
}
31
32impl fmt::Debug for UdpDirectTransport {
33 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34 f.debug_struct("UdpDirectTransport")
35 .field("quic_enabled", &self.quic_cache.is_some())
36 .finish()
37 }
38}
39
impl Default for UdpDirectTransport {
    /// Builds the transport exactly as [`UdpDirectTransport::new`] does.
    fn default() -> Self {
        Self::new()
    }
}
45
46impl UdpDirectTransport {
47 #[must_use]
49 pub fn new() -> Self {
50 let quic_enabled = std::env::var("SOF_TX_ENABLE_QUIC_DIRECT")
51 .map(|value| {
52 let normalized = value.trim().to_ascii_lowercase();
53 matches!(normalized.as_str(), "1" | "true" | "yes" | "on")
54 })
55 .unwrap_or(false);
56 Self {
57 quic_cache: if quic_enabled {
58 quic_connection_cache().ok().map(Arc::new)
59 } else {
60 None
61 },
62 }
63 }
64}
65
/// Connection pool size handed to the QUIC connection cache when it is created.
const QUIC_CONNECTION_POOL_SIZE: usize = 1;
/// Name under which the QUIC connection cache is created.
const QUIC_CACHE_NAME: &str = "sof-tx-direct-quic";
/// Port offset added to a target address to derive the fallback QUIC address.
/// NOTE(review): presumably mirrors Agave's TPU -> TPU-QUIC port layout — confirm.
const AGAVE_QUIC_PORT_OFFSET: u16 = 6;
/// Number of successful UDP sends checked by the UDP-only aggregate acceptance rule.
const MIN_UDP_SUCCESSES_FOR_ACCEPT: u64 = 16;
/// Upper bound on the number of successful QUIC sends required before accepting.
const MIN_QUIC_SUCCESSES_FOR_ACCEPT: u64 = 2;
/// Upper bound on the number of distinct QUIC targets that must succeed before accepting.
const MIN_DISTINCT_QUIC_TARGETS_FOR_ACCEPT: usize = 2;
/// Multiplier applied to the policy's parallel-send limit when sizing the set of
/// leading targets that are also tried over QUIC.
const QUIC_CANDIDATE_PARALLEL_MULTIPLIER: usize = 8;
80
/// Outcome of one send attempt against a single target.
#[derive(Debug, Clone)]
struct TargetSendResult {
    /// Target the attempt was made against.
    target: LeaderTarget,
    /// Whether the UDP send completed successfully within its timeout.
    udp_success: bool,
    /// Whether a QUIC send succeeded; always `false` when QUIC was not attempted.
    quic_success: bool,
}
91
92#[async_trait]
93impl DirectSubmitTransport for UdpDirectTransport {
94 async fn submit_direct(
95 &self,
96 tx_bytes: &[u8],
97 targets: &[LeaderTarget],
98 policy: RoutingPolicy,
99 config: &DirectSubmitConfig,
100 ) -> Result<LeaderTarget, SubmitTransportError> {
101 let config = config.clone().normalized();
102 if targets.is_empty() {
103 return Err(SubmitTransportError::Config {
104 message: "no targets provided".to_owned(),
105 });
106 }
107
108 let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await.map_err(|error| {
109 SubmitTransportError::Failure {
110 message: error.to_string(),
111 }
112 })?);
113 let quic_cache = self.quic_cache.clone();
114 let payload: Arc<[u8]> = Arc::from(tx_bytes.to_vec());
115 let quic_enabled = quic_cache.is_some();
116 let effective_global_timeout = if quic_enabled {
117 config.global_timeout.max(Duration::from_secs(4))
118 } else {
119 config.global_timeout
120 };
121
122 let deadline = Instant::now()
123 .checked_add(effective_global_timeout)
124 .ok_or_else(|| SubmitTransportError::Failure {
125 message: "failed to calculate direct-submit deadline".to_owned(),
126 })?;
127 let normalized_policy = policy.normalized();
128 let quic_timeout = quic_timeout(config.per_target_timeout);
129 let quic_candidate_count = targets.len().min(
130 normalized_policy
131 .max_parallel_sends
132 .saturating_mul(QUIC_CANDIDATE_PARALLEL_MULTIPLIER)
133 .max(32),
134 );
135 let available_distinct_quic_targets = targets
136 .get(..quic_candidate_count)
137 .map_or(0, count_distinct_quic_targets);
138 let required_quic_successes =
139 required_quic_successes(quic_candidate_count, available_distinct_quic_targets);
140 let required_distinct_quic_targets =
141 required_distinct_quic_targets(available_distinct_quic_targets);
142 let mut udp_successes = 0_u64;
143 let mut first_udp_success = None::<LeaderTarget>;
144 let mut quic_successes = 0_u64;
145 let mut first_quic_success = None::<LeaderTarget>;
146 let mut quic_success_identities = HashSet::new();
147 let mut quic_success_addrs = HashSet::new();
148
149 for round in 0..config.direct_target_rounds {
150 if round > 0 {
151 let now = Instant::now();
152 if now >= deadline {
153 break;
154 }
155 let remaining = deadline.saturating_duration_since(now);
156 let sleep_for = remaining.min(config.rebroadcast_interval);
157 if !sleep_for.is_zero() {
158 sleep(sleep_for).await;
159 }
160 }
161 let mut target_index = 0_usize;
162 for chunk in targets.chunks(normalized_policy.max_parallel_sends) {
163 let now = Instant::now();
164 if now >= deadline {
165 if quic_cache.is_none()
166 && let Some(target) = first_udp_success
167 {
168 return Ok(target);
169 }
170 break;
171 }
172 let remaining = deadline.saturating_duration_since(now);
173 let per_target_udp_timeout = remaining.min(config.per_target_timeout);
174 let per_target_quic_timeout = remaining.min(quic_timeout);
175 let mut in_flight = JoinSet::new();
176
177 for target in chunk {
178 let socket = Arc::clone(&socket);
179 let payload = Arc::clone(&payload);
180 let target = target.clone();
181 let quic_cache = quic_cache.clone();
182 let use_quic = quic_cache.is_some() && target_index < quic_candidate_count;
183 target_index = target_index.saturating_add(1);
184 in_flight.spawn(async move {
185 send_target(
186 socket,
187 payload,
188 target,
189 per_target_udp_timeout,
190 per_target_quic_timeout,
191 quic_cache,
192 use_quic,
193 )
194 .await
195 });
196 }
197
198 while let Some(result) = in_flight.join_next().await {
199 if let Ok(send_result) = result {
200 if send_result.udp_success {
201 udp_successes = udp_successes.saturating_add(1);
202 if first_udp_success.is_none() {
203 first_udp_success = Some(send_result.target.clone());
204 }
205 }
206 if send_result.quic_success {
207 quic_successes = quic_successes.saturating_add(1);
208 if first_quic_success.is_none() {
209 first_quic_success = Some(send_result.target.clone());
210 }
211 if let Some(identity) = send_result.target.identity {
212 let _ = quic_success_identities.insert(identity);
213 } else {
214 let _ = quic_success_addrs.insert(send_result.target.tpu_addr);
215 }
216 let distinct_quic_targets = quic_success_identities
217 .len()
218 .saturating_add(quic_success_addrs.len());
219 if quic_successes >= required_quic_successes
220 && distinct_quic_targets >= required_distinct_quic_targets
221 && let Some(target) = first_quic_success.clone()
222 {
223 return Ok(target);
224 }
225 }
226 if quic_cache.is_none() && send_result.udp_success {
227 return Ok(send_result.target);
228 }
229 if quic_cache.is_none()
230 && udp_successes >= MIN_UDP_SUCCESSES_FOR_ACCEPT
231 && let Some(target) = first_udp_success.clone()
232 {
233 return Ok(target);
234 }
235 }
236 }
237 }
238 }
239
240 if quic_cache.is_some() {
241 let distinct_quic_targets = quic_success_identities
242 .len()
243 .saturating_add(quic_success_addrs.len());
244 if quic_successes >= required_quic_successes
245 && distinct_quic_targets >= required_distinct_quic_targets
246 && let Some(target) = first_quic_success
247 {
248 return Ok(target);
249 }
250 if let Some(target) = first_udp_success {
251 return Ok(target);
252 }
253 return Err(SubmitTransportError::Failure {
254 message: format!(
255 "direct propagation threshold not met (quic_successes={quic_successes}, distinct_quic_targets={distinct_quic_targets}, required_quic_successes={required_quic_successes}, required_distinct_quic_targets={required_distinct_quic_targets}, udp_successes={udp_successes}, quic_candidates={quic_candidate_count}, timeout_ms={})",
256 effective_global_timeout.as_millis()
257 ),
258 });
259 }
260
261 if let Some(target) = first_udp_success {
262 return Ok(target);
263 }
264
265 Err(SubmitTransportError::Failure {
266 message: format!(
267 "all direct targets failed (udp_successes={udp_successes}, quic_successes=0, quic_candidates={quic_candidate_count})"
268 ),
269 })
270 }
271}
272
273fn quic_connection_cache() -> Result<QuicConnectionCache, SubmitTransportError> {
275 let config = QuicConfig::new().map_err(|error| SubmitTransportError::Failure {
276 message: format!("failed to create quic config: {error}"),
277 })?;
278 let manager = QuicConnectionManager::new_with_connection_config(config);
279 QuicConnectionCache::new(QUIC_CACHE_NAME, manager, QUIC_CONNECTION_POOL_SIZE).map_err(|error| {
280 SubmitTransportError::Failure {
281 message: format!("failed to create quic connection cache: {error}"),
282 }
283 })
284}
285
/// Returns the per-target QUIC timeout: `per_target_timeout`, but never less than
/// one second.
fn quic_timeout(per_target_timeout: Duration) -> Duration {
    let floor = Duration::from_millis(1_000);
    per_target_timeout.max(floor)
}
295
296fn count_distinct_quic_targets(targets: &[LeaderTarget]) -> usize {
298 targets
299 .iter()
300 .map(|target| {
301 target.identity.map_or(
302 DistinctTargetKey::Addr(target.tpu_addr),
303 DistinctTargetKey::Identity,
304 )
305 })
306 .collect::<HashSet<_>>()
307 .len()
308}
309
310fn required_quic_successes(candidate_count: usize, available_distinct_targets: usize) -> u64 {
312 let required_by_candidates = u64::try_from(candidate_count).unwrap_or(u64::MAX);
313 let required_by_distinct = u64::try_from(available_distinct_targets).unwrap_or(u64::MAX);
314 MIN_QUIC_SUCCESSES_FOR_ACCEPT
315 .min(required_by_candidates.max(1))
316 .min(required_by_distinct.max(1))
317}
318
319fn required_distinct_quic_targets(available_distinct_targets: usize) -> usize {
321 MIN_DISTINCT_QUIC_TARGETS_FOR_ACCEPT.min(available_distinct_targets.max(1))
322}
323
/// Key used to decide whether two targets count as the same one.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
enum DistinctTargetKey {
    /// Target keyed by its identity, used whenever the target has one.
    Identity(solana_pubkey::Pubkey),
    /// Target keyed by its address, used when no identity is available.
    Addr(SocketAddr),
}
332
333async fn send_target(
335 socket: Arc<UdpSocket>,
336 payload: Arc<[u8]>,
337 target: LeaderTarget,
338 udp_timeout: Duration,
339 quic_timeout: Duration,
340 quic_cache: Option<Arc<QuicConnectionCache>>,
341 use_quic: bool,
342) -> TargetSendResult {
343 let udp_success = matches!(
344 timeout(
345 udp_timeout,
346 socket.send_to(payload.as_ref(), target.tpu_addr)
347 )
348 .await,
349 Ok(Ok(_))
350 );
351
352 let quic_success = if use_quic {
353 send_quic(quic_cache, payload.as_ref(), target.tpu_addr, quic_timeout).await
354 } else {
355 false
356 };
357
358 TargetSendResult {
359 target,
360 udp_success,
361 quic_success,
362 }
363}
364
365async fn send_quic(
367 quic_cache: Option<Arc<QuicConnectionCache>>,
368 payload: &[u8],
369 target: SocketAddr,
370 timeout_budget: Duration,
371) -> bool {
372 let Some(quic_cache) = quic_cache else {
373 return false;
374 };
375 let candidate_addrs = quic_candidate_addrs(target);
376 let payload: Arc<[u8]> = Arc::from(payload.to_vec());
377 let mut in_flight = JoinSet::new();
378 for addr in candidate_addrs {
379 let connection = quic_cache.get_nonblocking_connection(&addr);
380 let payload = Arc::clone(&payload);
381 in_flight.spawn(async move {
382 matches!(
383 timeout(timeout_budget, connection.send_data(payload.as_ref())).await,
384 Ok(Ok(()))
385 )
386 });
387 }
388 while let Some(result) = in_flight.join_next().await {
389 if matches!(result, Ok(true)) {
390 in_flight.abort_all();
391 return true;
392 }
393 }
394 false
395}
396
397fn quic_candidate_addrs(target: SocketAddr) -> Vec<SocketAddr> {
399 let mut addrs = Vec::with_capacity(2);
400 addrs.push(target);
401 if let Some(quic_fallback) = with_agave_quic_fallback(target)
402 && quic_fallback != target
403 {
404 addrs.push(quic_fallback);
405 }
406 addrs
407}
408
409fn with_agave_quic_fallback(addr: SocketAddr) -> Option<SocketAddr> {
411 let mut quic_addr = addr;
412 let port = quic_addr.port().checked_add(AGAVE_QUIC_PORT_OFFSET)?;
413 quic_addr.set_port(port);
414 Some(quic_addr)
415}
416
#[cfg(test)]
mod tests {
    use super::*;

    /// With a single candidate and a single distinct target, both thresholds drop to one.
    #[test]
    fn quic_requirements_scale_down_for_single_target() {
        let successes = required_quic_successes(1, 1);
        let distinct_targets = required_distinct_quic_targets(1);
        assert_eq!(successes, 1);
        assert_eq!(distinct_targets, 1);
    }

    /// With plenty of candidates the configured minimums apply unchanged.
    #[test]
    fn quic_requirements_keep_default_for_multi_target_sets() {
        let successes = required_quic_successes(4, 4);
        let distinct_targets = required_distinct_quic_targets(4);
        assert_eq!(successes, MIN_QUIC_SUCCESSES_FOR_ACCEPT);
        assert_eq!(distinct_targets, MIN_DISTINCT_QUIC_TARGETS_FOR_ACCEPT);
    }

    /// Two addresses that share one identity count as a single distinct target.
    #[test]
    fn quic_distinct_target_count_deduplicates_same_identity() {
        let shared_identity = solana_pubkey::Pubkey::new_unique();
        let first_addr = SocketAddr::from(([127, 0, 0, 1], 9001));
        let second_addr = SocketAddr::from(([127, 0, 0, 1], 9002));
        let targets = [
            LeaderTarget::new(Some(shared_identity), first_addr),
            LeaderTarget::new(Some(shared_identity), second_addr),
        ];
        assert_eq!(count_distinct_quic_targets(&targets), 1);
    }
}