1use std::{
4 collections::HashSet,
5 fmt,
6 net::SocketAddr,
7 sync::Arc,
8 time::{Duration, Instant},
9};
10
11use async_trait::async_trait;
12use sof_types::PubkeyBytes;
13use solana_connection_cache::{
14 connection_cache::NewConnectionConfig, nonblocking::client_connection::ClientConnection,
15};
16use solana_quic_client::{QuicConfig, QuicConnectionCache, QuicConnectionManager};
17use tokio::{
18 net::UdpSocket,
19 task::JoinSet,
20 time::{sleep, timeout},
21};
22
23use super::{DirectSubmitConfig, DirectSubmitTransport, SubmitTransportError};
24use crate::{providers::LeaderTarget, routing::RoutingPolicy};
25
26#[derive(Clone)]
28pub struct UdpDirectTransport {
29 quic_cache: Option<Arc<QuicConnectionCache>>,
31}
32
33impl fmt::Debug for UdpDirectTransport {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 f.debug_struct("UdpDirectTransport")
36 .field("quic_enabled", &self.quic_cache.is_some())
37 .finish()
38 }
39}
40
41impl Default for UdpDirectTransport {
42 fn default() -> Self {
43 Self::new()
44 }
45}
46
47impl UdpDirectTransport {
48 #[must_use]
50 pub fn new() -> Self {
51 let quic_enabled = std::env::var("SOF_TX_ENABLE_QUIC_DIRECT")
52 .map(|value| {
53 let normalized = value.trim().to_ascii_lowercase();
54 matches!(normalized.as_str(), "1" | "true" | "yes" | "on")
55 })
56 .unwrap_or(false);
57 Self {
58 quic_cache: if quic_enabled {
59 quic_connection_cache().ok().map(Arc::new)
60 } else {
61 None
62 },
63 }
64 }
65}
66
67const QUIC_CONNECTION_POOL_SIZE: usize = 1;
69const QUIC_CACHE_NAME: &str = "sof-tx-direct-quic";
71const AGAVE_QUIC_PORT_OFFSET: u16 = 6;
73const MIN_UDP_SUCCESSES_FOR_ACCEPT: u64 = 16;
75const MIN_QUIC_SUCCESSES_FOR_ACCEPT: u64 = 2;
77const MIN_DISTINCT_QUIC_TARGETS_FOR_ACCEPT: usize = 2;
79const QUIC_CANDIDATE_PARALLEL_MULTIPLIER: usize = 8;
81
82#[derive(Debug, Clone)]
84struct TargetSendResult {
85 target: LeaderTarget,
87 udp_success: bool,
89 quic_success: bool,
91}
92
93#[async_trait]
94impl DirectSubmitTransport for UdpDirectTransport {
95 async fn submit_direct(
96 &self,
97 tx_bytes: &[u8],
98 targets: &[LeaderTarget],
99 policy: RoutingPolicy,
100 config: &DirectSubmitConfig,
101 ) -> Result<LeaderTarget, SubmitTransportError> {
102 let config = config.clone().normalized();
103 if targets.is_empty() {
104 return Err(SubmitTransportError::Config {
105 message: "no targets provided".to_owned(),
106 });
107 }
108
109 let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await.map_err(|error| {
110 SubmitTransportError::Failure {
111 message: error.to_string(),
112 }
113 })?);
114 let quic_cache = self.quic_cache.clone();
115 let payload: Arc<[u8]> = Arc::from(tx_bytes.to_vec());
116 let quic_enabled = quic_cache.is_some();
117 let effective_global_timeout = if quic_enabled {
118 config.global_timeout.max(Duration::from_secs(4))
119 } else {
120 config.global_timeout
121 };
122
123 let deadline = Instant::now()
124 .checked_add(effective_global_timeout)
125 .ok_or_else(|| SubmitTransportError::Failure {
126 message: "failed to calculate direct-submit deadline".to_owned(),
127 })?;
128 let normalized_policy = policy.normalized();
129 let quic_timeout = quic_timeout(config.per_target_timeout);
130 let quic_candidate_count = targets.len().min(
131 normalized_policy
132 .max_parallel_sends
133 .saturating_mul(QUIC_CANDIDATE_PARALLEL_MULTIPLIER)
134 .max(32),
135 );
136 let available_distinct_quic_targets = targets
137 .get(..quic_candidate_count)
138 .map_or(0, count_distinct_quic_targets);
139 let required_quic_successes =
140 required_quic_successes(quic_candidate_count, available_distinct_quic_targets);
141 let required_distinct_quic_targets =
142 required_distinct_quic_targets(available_distinct_quic_targets);
143 let mut udp_successes = 0_u64;
144 let mut first_udp_success = None::<LeaderTarget>;
145 let mut quic_successes = 0_u64;
146 let mut first_quic_success = None::<LeaderTarget>;
147 let mut quic_success_identities = HashSet::new();
148 let mut quic_success_addrs = HashSet::new();
149
150 for round in 0..config.direct_target_rounds {
151 if round > 0 {
152 let now = Instant::now();
153 if now >= deadline {
154 break;
155 }
156 let remaining = deadline.saturating_duration_since(now);
157 let sleep_for = remaining.min(config.rebroadcast_interval);
158 if !sleep_for.is_zero() {
159 sleep(sleep_for).await;
160 }
161 }
162 let mut target_index = 0_usize;
163 for chunk in targets.chunks(normalized_policy.max_parallel_sends) {
164 let now = Instant::now();
165 if now >= deadline {
166 if quic_cache.is_none()
167 && let Some(target) = first_udp_success
168 {
169 return Ok(target);
170 }
171 break;
172 }
173 let remaining = deadline.saturating_duration_since(now);
174 let per_target_udp_timeout = remaining.min(config.per_target_timeout);
175 let per_target_quic_timeout = remaining.min(quic_timeout);
176 let mut in_flight = JoinSet::new();
177
178 for target in chunk {
179 let socket = Arc::clone(&socket);
180 let payload = Arc::clone(&payload);
181 let target = target.clone();
182 let quic_cache = quic_cache.clone();
183 let use_quic = quic_cache.is_some() && target_index < quic_candidate_count;
184 target_index = target_index.saturating_add(1);
185 in_flight.spawn(async move {
186 send_target(
187 socket,
188 payload,
189 target,
190 per_target_udp_timeout,
191 per_target_quic_timeout,
192 quic_cache,
193 use_quic,
194 )
195 .await
196 });
197 }
198
199 while let Some(result) = in_flight.join_next().await {
200 if let Ok(send_result) = result {
201 if send_result.udp_success {
202 udp_successes = udp_successes.saturating_add(1);
203 if first_udp_success.is_none() {
204 first_udp_success = Some(send_result.target.clone());
205 }
206 }
207 if send_result.quic_success {
208 quic_successes = quic_successes.saturating_add(1);
209 if first_quic_success.is_none() {
210 first_quic_success = Some(send_result.target.clone());
211 }
212 if let Some(identity) = send_result.target.identity {
213 let _ = quic_success_identities.insert(identity);
214 } else {
215 let _ = quic_success_addrs.insert(send_result.target.tpu_addr);
216 }
217 let distinct_quic_targets = quic_success_identities
218 .len()
219 .saturating_add(quic_success_addrs.len());
220 if quic_successes >= required_quic_successes
221 && distinct_quic_targets >= required_distinct_quic_targets
222 && let Some(target) = first_quic_success.clone()
223 {
224 return Ok(target);
225 }
226 }
227 if quic_cache.is_none()
228 && udp_successes >= MIN_UDP_SUCCESSES_FOR_ACCEPT
229 && let Some(target) = first_udp_success.clone()
230 {
231 return Ok(target);
232 }
233 }
234 }
235 }
236 }
237
238 if quic_cache.is_some() {
239 let distinct_quic_targets = quic_success_identities
240 .len()
241 .saturating_add(quic_success_addrs.len());
242 if quic_successes >= required_quic_successes
243 && distinct_quic_targets >= required_distinct_quic_targets
244 && let Some(target) = first_quic_success
245 {
246 return Ok(target);
247 }
248 if let Some(target) = first_udp_success {
249 return Ok(target);
250 }
251 return Err(SubmitTransportError::Failure {
252 message: format!(
253 "direct propagation threshold not met (quic_successes={quic_successes}, distinct_quic_targets={distinct_quic_targets}, required_quic_successes={required_quic_successes}, required_distinct_quic_targets={required_distinct_quic_targets}, udp_successes={udp_successes}, quic_candidates={quic_candidate_count}, timeout_ms={})",
254 effective_global_timeout.as_millis()
255 ),
256 });
257 }
258
259 if let Some(target) = first_udp_success {
260 return Ok(target);
261 }
262
263 Err(SubmitTransportError::Failure {
264 message: format!(
265 "all direct targets failed (udp_successes={udp_successes}, quic_successes=0, quic_candidates={quic_candidate_count})"
266 ),
267 })
268 }
269}
270
271fn quic_connection_cache() -> Result<QuicConnectionCache, SubmitTransportError> {
273 let config = QuicConfig::new().map_err(|error| SubmitTransportError::Failure {
274 message: format!("failed to create quic config: {error}"),
275 })?;
276 let manager = QuicConnectionManager::new_with_connection_config(config);
277 QuicConnectionCache::new(QUIC_CACHE_NAME, manager, QUIC_CONNECTION_POOL_SIZE).map_err(|error| {
278 SubmitTransportError::Failure {
279 message: format!("failed to create quic connection cache: {error}"),
280 }
281 })
282}
283
284fn quic_timeout(per_target_timeout: Duration) -> Duration {
286 let minimum = Duration::from_millis(1_000);
287 if per_target_timeout < minimum {
288 minimum
289 } else {
290 per_target_timeout
291 }
292}
293
294fn count_distinct_quic_targets(targets: &[LeaderTarget]) -> usize {
296 targets
297 .iter()
298 .map(|target| {
299 target.identity.map_or(
300 DistinctTargetKey::Addr(target.tpu_addr),
301 DistinctTargetKey::Identity,
302 )
303 })
304 .collect::<HashSet<_>>()
305 .len()
306}
307
308fn required_quic_successes(candidate_count: usize, available_distinct_targets: usize) -> u64 {
310 let required_by_candidates = u64::try_from(candidate_count).unwrap_or(u64::MAX);
311 let required_by_distinct = u64::try_from(available_distinct_targets).unwrap_or(u64::MAX);
312 MIN_QUIC_SUCCESSES_FOR_ACCEPT
313 .min(required_by_candidates.max(1))
314 .min(required_by_distinct.max(1))
315}
316
317fn required_distinct_quic_targets(available_distinct_targets: usize) -> usize {
319 MIN_DISTINCT_QUIC_TARGETS_FOR_ACCEPT.min(available_distinct_targets.max(1))
320}
321
322#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
324enum DistinctTargetKey {
325 Identity(PubkeyBytes),
327 Addr(SocketAddr),
329}
330
331async fn send_target(
333 socket: Arc<UdpSocket>,
334 payload: Arc<[u8]>,
335 target: LeaderTarget,
336 udp_timeout: Duration,
337 quic_timeout: Duration,
338 quic_cache: Option<Arc<QuicConnectionCache>>,
339 use_quic: bool,
340) -> TargetSendResult {
341 let udp_success = matches!(
342 timeout(
343 udp_timeout,
344 socket.send_to(payload.as_ref(), target.tpu_addr)
345 )
346 .await,
347 Ok(Ok(_))
348 );
349
350 let quic_success = if use_quic {
351 send_quic(quic_cache, payload.as_ref(), target.tpu_addr, quic_timeout).await
352 } else {
353 false
354 };
355
356 TargetSendResult {
357 target,
358 udp_success,
359 quic_success,
360 }
361}
362
363async fn send_quic(
365 quic_cache: Option<Arc<QuicConnectionCache>>,
366 payload: &[u8],
367 target: SocketAddr,
368 timeout_budget: Duration,
369) -> bool {
370 let Some(quic_cache) = quic_cache else {
371 return false;
372 };
373 let candidate_addrs = quic_candidate_addrs(target);
374 let payload: Arc<[u8]> = Arc::from(payload.to_vec());
375 let mut in_flight = JoinSet::new();
376 for addr in candidate_addrs {
377 let connection = quic_cache.get_nonblocking_connection(&addr);
378 let payload = Arc::clone(&payload);
379 in_flight.spawn(async move {
380 matches!(
381 timeout(timeout_budget, connection.send_data(payload.as_ref())).await,
382 Ok(Ok(()))
383 )
384 });
385 }
386 while let Some(result) = in_flight.join_next().await {
387 if matches!(result, Ok(true)) {
388 in_flight.abort_all();
389 return true;
390 }
391 }
392 false
393}
394
395fn quic_candidate_addrs(target: SocketAddr) -> Vec<SocketAddr> {
397 let mut addrs = Vec::with_capacity(2);
398 addrs.push(target);
399 if let Some(quic_fallback) = with_agave_quic_fallback(target)
400 && quic_fallback != target
401 {
402 addrs.push(quic_fallback);
403 }
404 addrs
405}
406
407fn with_agave_quic_fallback(addr: SocketAddr) -> Option<SocketAddr> {
409 let mut quic_addr = addr;
410 let port = quic_addr.port().checked_add(AGAVE_QUIC_PORT_OFFSET)?;
411 quic_addr.set_port(port);
412 Some(quic_addr)
413}
414
415#[cfg(test)]
416mod tests {
417 use super::*;
418
419 #[test]
420 fn quic_requirements_scale_down_for_single_target() {
421 assert_eq!(required_quic_successes(1, 1), 1);
422 assert_eq!(required_distinct_quic_targets(1), 1);
423 }
424
425 #[test]
426 fn quic_requirements_keep_default_for_multi_target_sets() {
427 assert_eq!(required_quic_successes(4, 4), MIN_QUIC_SUCCESSES_FOR_ACCEPT);
428 assert_eq!(
429 required_distinct_quic_targets(4),
430 MIN_DISTINCT_QUIC_TARGETS_FOR_ACCEPT
431 );
432 }
433
434 #[test]
435 fn quic_distinct_target_count_deduplicates_same_identity() {
436 let identity = PubkeyBytes::from_solana(solana_pubkey::Pubkey::new_unique());
437 let targets = vec![
438 LeaderTarget::new(Some(identity), SocketAddr::from(([127, 0, 0, 1], 9001))),
439 LeaderTarget::new(Some(identity), SocketAddr::from(([127, 0, 0, 1], 9002))),
440 ];
441 assert_eq!(count_distinct_quic_targets(&targets), 1);
442 }
443}