Skip to main content

sof_tx/submit/
direct.rs

1//! Direct-submit transport implementation.
2
3use std::{
4    collections::HashSet,
5    fmt,
6    net::SocketAddr,
7    sync::Arc,
8    time::{Duration, Instant},
9};
10
11use async_trait::async_trait;
12use solana_connection_cache::{
13    connection_cache::NewConnectionConfig, nonblocking::client_connection::ClientConnection,
14};
15use solana_quic_client::{QuicConfig, QuicConnectionCache, QuicConnectionManager};
16use tokio::{
17    net::UdpSocket,
18    task::JoinSet,
19    time::{sleep, timeout},
20};
21
22use super::{DirectSubmitConfig, DirectSubmitTransport, SubmitTransportError};
23use crate::{providers::LeaderTarget, routing::RoutingPolicy};
24
25/// UDP-based direct transport that sends transaction bytes to TPU targets.
26#[derive(Clone)]
27pub struct UdpDirectTransport {
28    /// Optional shared QUIC connection cache enabled by environment.
29    quic_cache: Option<Arc<QuicConnectionCache>>,
30}
31
32impl fmt::Debug for UdpDirectTransport {
33    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34        f.debug_struct("UdpDirectTransport")
35            .field("quic_enabled", &self.quic_cache.is_some())
36            .finish()
37    }
38}
39
40impl Default for UdpDirectTransport {
41    fn default() -> Self {
42        Self::new()
43    }
44}
45
46impl UdpDirectTransport {
47    /// Creates a direct transport with optional shared QUIC cache.
48    #[must_use]
49    pub fn new() -> Self {
50        let quic_enabled = std::env::var("SOF_TX_ENABLE_QUIC_DIRECT")
51            .map(|value| {
52                let normalized = value.trim().to_ascii_lowercase();
53                matches!(normalized.as_str(), "1" | "true" | "yes" | "on")
54            })
55            .unwrap_or(false);
56        Self {
57            quic_cache: if quic_enabled {
58                quic_connection_cache().ok().map(Arc::new)
59            } else {
60                None
61            },
62        }
63    }
64}
65
/// Pooled QUIC connections kept per destination address in the shared cache.
const QUIC_CONNECTION_POOL_SIZE: usize = 1;
/// Name tag reported by the connection cache's metrics for this transport.
const QUIC_CACHE_NAME: &str = "sof-tx-direct-quic";
/// Agave TPU-to-QUIC port offset; `tpu_addr + 6` is tried as a second QUIC candidate for
/// clients that do not know the target's `tpu_quic` address.
const AGAVE_QUIC_PORT_OFFSET: u16 = 6;

/// UDP successes after which a UDP-only submit returns early. A single UDP success is still
/// accepted once every round has run.
const MIN_UDP_SUCCESSES_FOR_ACCEPT: u64 = 16;
/// Default number of QUIC send successes required to accept (scaled down for tiny candidate
/// sets).
const MIN_QUIC_SUCCESSES_FOR_ACCEPT: u64 = 2;
/// Default number of distinct QUIC targets (by identity, else by address) required to accept
/// (scaled down for sparse target sets).
const MIN_DISTINCT_QUIC_TARGETS_FOR_ACCEPT: usize = 2;
/// Cap on QUIC-candidate targets per round, as a multiple of the parallel send width; the
/// submit path never lets this cap fall below 32 targets.
const QUIC_CANDIDATE_PARALLEL_MULTIPLIER: usize = 8;
80
81/// Per-target send outcome collected from concurrent UDP/QUIC attempts.
82#[derive(Debug, Clone)]
83struct TargetSendResult {
84    /// The target that was attempted.
85    target: LeaderTarget,
86    /// Whether the UDP send call completed successfully.
87    udp_success: bool,
88    /// Whether at least one QUIC candidate send completed successfully.
89    quic_success: bool,
90}
91
92#[async_trait]
93impl DirectSubmitTransport for UdpDirectTransport {
94    async fn submit_direct(
95        &self,
96        tx_bytes: &[u8],
97        targets: &[LeaderTarget],
98        policy: RoutingPolicy,
99        config: &DirectSubmitConfig,
100    ) -> Result<LeaderTarget, SubmitTransportError> {
101        let config = config.clone().normalized();
102        if targets.is_empty() {
103            return Err(SubmitTransportError::Config {
104                message: "no targets provided".to_owned(),
105            });
106        }
107
108        let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await.map_err(|error| {
109            SubmitTransportError::Failure {
110                message: error.to_string(),
111            }
112        })?);
113        let quic_cache = self.quic_cache.clone();
114        let payload: Arc<[u8]> = Arc::from(tx_bytes.to_vec());
115        let quic_enabled = quic_cache.is_some();
116        let effective_global_timeout = if quic_enabled {
117            config.global_timeout.max(Duration::from_secs(4))
118        } else {
119            config.global_timeout
120        };
121
122        let deadline = Instant::now()
123            .checked_add(effective_global_timeout)
124            .ok_or_else(|| SubmitTransportError::Failure {
125                message: "failed to calculate direct-submit deadline".to_owned(),
126            })?;
127        let normalized_policy = policy.normalized();
128        let quic_timeout = quic_timeout(config.per_target_timeout);
129        let quic_candidate_count = targets.len().min(
130            normalized_policy
131                .max_parallel_sends
132                .saturating_mul(QUIC_CANDIDATE_PARALLEL_MULTIPLIER)
133                .max(32),
134        );
135        let available_distinct_quic_targets = targets
136            .get(..quic_candidate_count)
137            .map_or(0, count_distinct_quic_targets);
138        let required_quic_successes =
139            required_quic_successes(quic_candidate_count, available_distinct_quic_targets);
140        let required_distinct_quic_targets =
141            required_distinct_quic_targets(available_distinct_quic_targets);
142        let mut udp_successes = 0_u64;
143        let mut first_udp_success = None::<LeaderTarget>;
144        let mut quic_successes = 0_u64;
145        let mut first_quic_success = None::<LeaderTarget>;
146        let mut quic_success_identities = HashSet::new();
147        let mut quic_success_addrs = HashSet::new();
148
149        for round in 0..config.direct_target_rounds {
150            if round > 0 {
151                let now = Instant::now();
152                if now >= deadline {
153                    break;
154                }
155                let remaining = deadline.saturating_duration_since(now);
156                let sleep_for = remaining.min(config.rebroadcast_interval);
157                if !sleep_for.is_zero() {
158                    sleep(sleep_for).await;
159                }
160            }
161            let mut target_index = 0_usize;
162            for chunk in targets.chunks(normalized_policy.max_parallel_sends) {
163                let now = Instant::now();
164                if now >= deadline {
165                    if quic_cache.is_none()
166                        && let Some(target) = first_udp_success
167                    {
168                        return Ok(target);
169                    }
170                    break;
171                }
172                let remaining = deadline.saturating_duration_since(now);
173                let per_target_udp_timeout = remaining.min(config.per_target_timeout);
174                let per_target_quic_timeout = remaining.min(quic_timeout);
175                let mut in_flight = JoinSet::new();
176
177                for target in chunk {
178                    let socket = Arc::clone(&socket);
179                    let payload = Arc::clone(&payload);
180                    let target = target.clone();
181                    let quic_cache = quic_cache.clone();
182                    let use_quic = quic_cache.is_some() && target_index < quic_candidate_count;
183                    target_index = target_index.saturating_add(1);
184                    in_flight.spawn(async move {
185                        send_target(
186                            socket,
187                            payload,
188                            target,
189                            per_target_udp_timeout,
190                            per_target_quic_timeout,
191                            quic_cache,
192                            use_quic,
193                        )
194                        .await
195                    });
196                }
197
198                while let Some(result) = in_flight.join_next().await {
199                    if let Ok(send_result) = result {
200                        if send_result.udp_success {
201                            udp_successes = udp_successes.saturating_add(1);
202                            if first_udp_success.is_none() {
203                                first_udp_success = Some(send_result.target.clone());
204                            }
205                        }
206                        if send_result.quic_success {
207                            quic_successes = quic_successes.saturating_add(1);
208                            if first_quic_success.is_none() {
209                                first_quic_success = Some(send_result.target.clone());
210                            }
211                            if let Some(identity) = send_result.target.identity {
212                                let _ = quic_success_identities.insert(identity);
213                            } else {
214                                let _ = quic_success_addrs.insert(send_result.target.tpu_addr);
215                            }
216                            let distinct_quic_targets = quic_success_identities
217                                .len()
218                                .saturating_add(quic_success_addrs.len());
219                            if quic_successes >= required_quic_successes
220                                && distinct_quic_targets >= required_distinct_quic_targets
221                                && let Some(target) = first_quic_success.clone()
222                            {
223                                return Ok(target);
224                            }
225                        }
226                        if quic_cache.is_none()
227                            && udp_successes >= MIN_UDP_SUCCESSES_FOR_ACCEPT
228                            && let Some(target) = first_udp_success.clone()
229                        {
230                            return Ok(target);
231                        }
232                    }
233                }
234            }
235        }
236
237        if quic_cache.is_some() {
238            let distinct_quic_targets = quic_success_identities
239                .len()
240                .saturating_add(quic_success_addrs.len());
241            if quic_successes >= required_quic_successes
242                && distinct_quic_targets >= required_distinct_quic_targets
243                && let Some(target) = first_quic_success
244            {
245                return Ok(target);
246            }
247            if let Some(target) = first_udp_success {
248                return Ok(target);
249            }
250            return Err(SubmitTransportError::Failure {
251                message: format!(
252                    "direct propagation threshold not met (quic_successes={quic_successes}, distinct_quic_targets={distinct_quic_targets}, required_quic_successes={required_quic_successes}, required_distinct_quic_targets={required_distinct_quic_targets}, udp_successes={udp_successes}, quic_candidates={quic_candidate_count}, timeout_ms={})",
253                    effective_global_timeout.as_millis()
254                ),
255            });
256        }
257
258        if let Some(target) = first_udp_success {
259            return Ok(target);
260        }
261
262        Err(SubmitTransportError::Failure {
263            message: format!(
264                "all direct targets failed (udp_successes={udp_successes}, quic_successes=0, quic_candidates={quic_candidate_count})"
265            ),
266        })
267    }
268}
269
270/// Builds the shared QUIC connection cache used for optional direct sends.
271fn quic_connection_cache() -> Result<QuicConnectionCache, SubmitTransportError> {
272    let config = QuicConfig::new().map_err(|error| SubmitTransportError::Failure {
273        message: format!("failed to create quic config: {error}"),
274    })?;
275    let manager = QuicConnectionManager::new_with_connection_config(config);
276    QuicConnectionCache::new(QUIC_CACHE_NAME, manager, QUIC_CONNECTION_POOL_SIZE).map_err(|error| {
277        SubmitTransportError::Failure {
278            message: format!("failed to create quic connection cache: {error}"),
279        }
280    })
281}
282
/// Returns `per_target_timeout`, raised to at least one second so QUIC handshakes can finish.
fn quic_timeout(per_target_timeout: Duration) -> Duration {
    const QUIC_TIMEOUT_FLOOR: Duration = Duration::from_secs(1);
    per_target_timeout.max(QUIC_TIMEOUT_FLOOR)
}
292
293/// Counts the distinct identities or addresses in the QUIC candidate set.
294fn count_distinct_quic_targets(targets: &[LeaderTarget]) -> usize {
295    targets
296        .iter()
297        .map(|target| {
298            target.identity.map_or(
299                DistinctTargetKey::Addr(target.tpu_addr),
300                DistinctTargetKey::Identity,
301            )
302        })
303        .collect::<HashSet<_>>()
304        .len()
305}
306
307/// Scales the QUIC success requirement down for small candidate sets.
308fn required_quic_successes(candidate_count: usize, available_distinct_targets: usize) -> u64 {
309    let required_by_candidates = u64::try_from(candidate_count).unwrap_or(u64::MAX);
310    let required_by_distinct = u64::try_from(available_distinct_targets).unwrap_or(u64::MAX);
311    MIN_QUIC_SUCCESSES_FOR_ACCEPT
312        .min(required_by_candidates.max(1))
313        .min(required_by_distinct.max(1))
314}
315
316/// Scales the distinct-target threshold down for sparse target sets.
317fn required_distinct_quic_targets(available_distinct_targets: usize) -> usize {
318    MIN_DISTINCT_QUIC_TARGETS_FOR_ACCEPT.min(available_distinct_targets.max(1))
319}
320
321/// Key used to count unique QUIC targets using the same identity/address semantics as submits.
322#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
323enum DistinctTargetKey {
324    /// Deduplicate targets that share the same validator identity.
325    Identity(solana_pubkey::Pubkey),
326    /// Fallback key for targets without a known validator identity.
327    Addr(SocketAddr),
328}
329
330/// Sends one payload to a target over UDP and optionally QUIC in parallel.
331async fn send_target(
332    socket: Arc<UdpSocket>,
333    payload: Arc<[u8]>,
334    target: LeaderTarget,
335    udp_timeout: Duration,
336    quic_timeout: Duration,
337    quic_cache: Option<Arc<QuicConnectionCache>>,
338    use_quic: bool,
339) -> TargetSendResult {
340    let udp_success = matches!(
341        timeout(
342            udp_timeout,
343            socket.send_to(payload.as_ref(), target.tpu_addr)
344        )
345        .await,
346        Ok(Ok(_))
347    );
348
349    let quic_success = if use_quic {
350        send_quic(quic_cache, payload.as_ref(), target.tpu_addr, quic_timeout).await
351    } else {
352        false
353    };
354
355    TargetSendResult {
356        target,
357        udp_success,
358        quic_success,
359    }
360}
361
362/// Attempts QUIC sends against the target's candidate QUIC addresses.
363async fn send_quic(
364    quic_cache: Option<Arc<QuicConnectionCache>>,
365    payload: &[u8],
366    target: SocketAddr,
367    timeout_budget: Duration,
368) -> bool {
369    let Some(quic_cache) = quic_cache else {
370        return false;
371    };
372    let candidate_addrs = quic_candidate_addrs(target);
373    let payload: Arc<[u8]> = Arc::from(payload.to_vec());
374    let mut in_flight = JoinSet::new();
375    for addr in candidate_addrs {
376        let connection = quic_cache.get_nonblocking_connection(&addr);
377        let payload = Arc::clone(&payload);
378        in_flight.spawn(async move {
379            matches!(
380                timeout(timeout_budget, connection.send_data(payload.as_ref())).await,
381                Ok(Ok(()))
382            )
383        });
384    }
385    while let Some(result) = in_flight.join_next().await {
386        if matches!(result, Ok(true)) {
387            in_flight.abort_all();
388            return true;
389        }
390    }
391    false
392}
393
394/// Expands one TPU address into the candidate QUIC addresses to probe.
395fn quic_candidate_addrs(target: SocketAddr) -> Vec<SocketAddr> {
396    let mut addrs = Vec::with_capacity(2);
397    addrs.push(target);
398    if let Some(quic_fallback) = with_agave_quic_fallback(target)
399        && quic_fallback != target
400    {
401        addrs.push(quic_fallback);
402    }
403    addrs
404}
405
406/// Applies the standard Agave TPU-to-QUIC port offset when it fits in `u16`.
407fn with_agave_quic_fallback(addr: SocketAddr) -> Option<SocketAddr> {
408    let mut quic_addr = addr;
409    let port = quic_addr.port().checked_add(AGAVE_QUIC_PORT_OFFSET)?;
410    quic_addr.set_port(port);
411    Some(quic_addr)
412}
413
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn quic_requirements_scale_down_for_single_target() {
        assert_eq!(required_quic_successes(1, 1), 1);
        assert_eq!(required_distinct_quic_targets(1), 1);
    }

    #[test]
    fn quic_requirements_keep_default_for_multi_target_sets() {
        assert_eq!(required_quic_successes(4, 4), MIN_QUIC_SUCCESSES_FOR_ACCEPT);
        assert_eq!(
            required_distinct_quic_targets(4),
            MIN_DISTINCT_QUIC_TARGETS_FOR_ACCEPT
        );
    }

    #[test]
    fn quic_requirements_never_drop_below_one() {
        // An empty candidate set must still demand at least one success.
        assert_eq!(required_quic_successes(0, 0), 1);
        assert_eq!(required_distinct_quic_targets(0), 1);
    }

    #[test]
    fn quic_distinct_target_count_deduplicates_same_identity() {
        let identity = solana_pubkey::Pubkey::new_unique();
        let targets = vec![
            LeaderTarget::new(Some(identity), SocketAddr::from(([127, 0, 0, 1], 9001))),
            LeaderTarget::new(Some(identity), SocketAddr::from(([127, 0, 0, 1], 9002))),
        ];
        assert_eq!(count_distinct_quic_targets(&targets), 1);
    }

    #[test]
    fn quic_distinct_target_count_falls_back_to_address() {
        let targets = vec![
            LeaderTarget::new(None, SocketAddr::from(([127, 0, 0, 1], 9001))),
            LeaderTarget::new(None, SocketAddr::from(([127, 0, 0, 1], 9002))),
            LeaderTarget::new(None, SocketAddr::from(([127, 0, 0, 1], 9001))),
        ];
        assert_eq!(count_distinct_quic_targets(&targets), 2);
    }

    #[test]
    fn quic_timeout_applies_one_second_floor() {
        assert_eq!(quic_timeout(Duration::from_millis(250)), Duration::from_secs(1));
        assert_eq!(quic_timeout(Duration::from_secs(3)), Duration::from_secs(3));
    }

    #[test]
    fn quic_candidates_include_agave_offset_port() {
        let target = SocketAddr::from(([127, 0, 0, 1], 8001));
        assert_eq!(
            quic_candidate_addrs(target),
            vec![target, SocketAddr::from(([127, 0, 0, 1], 8007))]
        );
    }

    #[test]
    fn quic_fallback_skips_overflowing_port() {
        let target = SocketAddr::from(([127, 0, 0, 1], u16::MAX));
        assert_eq!(with_agave_quic_fallback(target), None);
        assert_eq!(quic_candidate_addrs(target), vec![target]);
    }
}