Skip to main content

sof_tx/submit/
direct.rs

1//! Direct-submit transport implementation.
2
3use std::{
4    collections::HashSet,
5    fmt,
6    net::SocketAddr,
7    sync::Arc,
8    time::{Duration, Instant},
9};
10
11use async_trait::async_trait;
12use solana_connection_cache::{
13    connection_cache::NewConnectionConfig, nonblocking::client_connection::ClientConnection,
14};
15use solana_quic_client::{QuicConfig, QuicConnectionCache, QuicConnectionManager};
16use tokio::{
17    net::UdpSocket,
18    task::JoinSet,
19    time::{sleep, timeout},
20};
21
22use super::{DirectSubmitConfig, DirectSubmitTransport, SubmitTransportError};
23use crate::{providers::LeaderTarget, routing::RoutingPolicy};
24
25/// UDP-based direct transport that sends transaction bytes to TPU targets.
26#[derive(Clone)]
27pub struct UdpDirectTransport {
28    /// Optional shared QUIC connection cache enabled by environment.
29    quic_cache: Option<Arc<QuicConnectionCache>>,
30}
31
32impl fmt::Debug for UdpDirectTransport {
33    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34        f.debug_struct("UdpDirectTransport")
35            .field("quic_enabled", &self.quic_cache.is_some())
36            .finish()
37    }
38}
39
40impl Default for UdpDirectTransport {
41    fn default() -> Self {
42        Self::new()
43    }
44}
45
46impl UdpDirectTransport {
47    /// Creates a direct transport with optional shared QUIC cache.
48    #[must_use]
49    pub fn new() -> Self {
50        let quic_enabled = std::env::var("SOF_TX_ENABLE_QUIC_DIRECT")
51            .map(|value| {
52                let normalized = value.trim().to_ascii_lowercase();
53                matches!(normalized.as_str(), "1" | "true" | "yes" | "on")
54            })
55            .unwrap_or(false);
56        Self {
57            quic_cache: if quic_enabled {
58                quic_connection_cache().ok().map(Arc::new)
59            } else {
60                None
61            },
62        }
63    }
64}
65
/// QUIC connections kept per target in the shared connection pool.
const QUIC_CONNECTION_POOL_SIZE: usize = 1;
/// Cache name reported by connection-cache metrics.
const QUIC_CACHE_NAME: &str = "sof-tx-direct-quic";
/// Port offset Agave adds to a TPU port to reach its QUIC port when no
/// explicit `tpu_quic` address is available.
const AGAVE_QUIC_PORT_OFFSET: u16 = 6;
/// UDP success count at which UDP-only (QUIC disabled) propagation is accepted.
const MIN_UDP_SUCCESSES_FOR_ACCEPT: u64 = 16;
/// QUIC success count needed before direct propagation is accepted; it is
/// scaled down for small candidate sets.
const MIN_QUIC_SUCCESSES_FOR_ACCEPT: u64 = 2;
/// Distinct QUIC targets (by identity, else by address) needed before
/// propagation is accepted; it is scaled down for sparse target sets.
const MIN_DISTINCT_QUIC_TARGETS_FOR_ACCEPT: usize = 2;
/// Size of the per-attempt QUIC candidate window, expressed as a multiple of
/// the parallel send width.
const QUIC_CANDIDATE_PARALLEL_MULTIPLIER: usize = 8;
80
81/// Per-target send outcome collected from concurrent UDP/QUIC attempts.
82#[derive(Debug, Clone)]
83struct TargetSendResult {
84    /// The target that was attempted.
85    target: LeaderTarget,
86    /// Whether the UDP send call completed successfully.
87    udp_success: bool,
88    /// Whether at least one QUIC candidate send completed successfully.
89    quic_success: bool,
90}
91
92#[async_trait]
93impl DirectSubmitTransport for UdpDirectTransport {
94    async fn submit_direct(
95        &self,
96        tx_bytes: &[u8],
97        targets: &[LeaderTarget],
98        policy: RoutingPolicy,
99        config: &DirectSubmitConfig,
100    ) -> Result<LeaderTarget, SubmitTransportError> {
101        let config = config.clone().normalized();
102        if targets.is_empty() {
103            return Err(SubmitTransportError::Config {
104                message: "no targets provided".to_owned(),
105            });
106        }
107
108        let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await.map_err(|error| {
109            SubmitTransportError::Failure {
110                message: error.to_string(),
111            }
112        })?);
113        let quic_cache = self.quic_cache.clone();
114        let payload: Arc<[u8]> = Arc::from(tx_bytes.to_vec());
115        let quic_enabled = quic_cache.is_some();
116        let effective_global_timeout = if quic_enabled {
117            config.global_timeout.max(Duration::from_secs(4))
118        } else {
119            config.global_timeout
120        };
121
122        let deadline = Instant::now()
123            .checked_add(effective_global_timeout)
124            .ok_or_else(|| SubmitTransportError::Failure {
125                message: "failed to calculate direct-submit deadline".to_owned(),
126            })?;
127        let normalized_policy = policy.normalized();
128        let quic_timeout = quic_timeout(config.per_target_timeout);
129        let quic_candidate_count = targets.len().min(
130            normalized_policy
131                .max_parallel_sends
132                .saturating_mul(QUIC_CANDIDATE_PARALLEL_MULTIPLIER)
133                .max(32),
134        );
135        let available_distinct_quic_targets = targets
136            .get(..quic_candidate_count)
137            .map_or(0, count_distinct_quic_targets);
138        let required_quic_successes =
139            required_quic_successes(quic_candidate_count, available_distinct_quic_targets);
140        let required_distinct_quic_targets =
141            required_distinct_quic_targets(available_distinct_quic_targets);
142        let mut udp_successes = 0_u64;
143        let mut first_udp_success = None::<LeaderTarget>;
144        let mut quic_successes = 0_u64;
145        let mut first_quic_success = None::<LeaderTarget>;
146        let mut quic_success_identities = HashSet::new();
147        let mut quic_success_addrs = HashSet::new();
148
149        for round in 0..config.direct_target_rounds {
150            if round > 0 {
151                let now = Instant::now();
152                if now >= deadline {
153                    break;
154                }
155                let remaining = deadline.saturating_duration_since(now);
156                let sleep_for = remaining.min(config.rebroadcast_interval);
157                if !sleep_for.is_zero() {
158                    sleep(sleep_for).await;
159                }
160            }
161            let mut target_index = 0_usize;
162            for chunk in targets.chunks(normalized_policy.max_parallel_sends) {
163                let now = Instant::now();
164                if now >= deadline {
165                    if quic_cache.is_none()
166                        && let Some(target) = first_udp_success
167                    {
168                        return Ok(target);
169                    }
170                    break;
171                }
172                let remaining = deadline.saturating_duration_since(now);
173                let per_target_udp_timeout = remaining.min(config.per_target_timeout);
174                let per_target_quic_timeout = remaining.min(quic_timeout);
175                let mut in_flight = JoinSet::new();
176
177                for target in chunk {
178                    let socket = Arc::clone(&socket);
179                    let payload = Arc::clone(&payload);
180                    let target = target.clone();
181                    let quic_cache = quic_cache.clone();
182                    let use_quic = quic_cache.is_some() && target_index < quic_candidate_count;
183                    target_index = target_index.saturating_add(1);
184                    in_flight.spawn(async move {
185                        send_target(
186                            socket,
187                            payload,
188                            target,
189                            per_target_udp_timeout,
190                            per_target_quic_timeout,
191                            quic_cache,
192                            use_quic,
193                        )
194                        .await
195                    });
196                }
197
198                while let Some(result) = in_flight.join_next().await {
199                    if let Ok(send_result) = result {
200                        if send_result.udp_success {
201                            udp_successes = udp_successes.saturating_add(1);
202                            if first_udp_success.is_none() {
203                                first_udp_success = Some(send_result.target.clone());
204                            }
205                        }
206                        if send_result.quic_success {
207                            quic_successes = quic_successes.saturating_add(1);
208                            if first_quic_success.is_none() {
209                                first_quic_success = Some(send_result.target.clone());
210                            }
211                            if let Some(identity) = send_result.target.identity {
212                                let _ = quic_success_identities.insert(identity);
213                            } else {
214                                let _ = quic_success_addrs.insert(send_result.target.tpu_addr);
215                            }
216                            let distinct_quic_targets = quic_success_identities
217                                .len()
218                                .saturating_add(quic_success_addrs.len());
219                            if quic_successes >= required_quic_successes
220                                && distinct_quic_targets >= required_distinct_quic_targets
221                                && let Some(target) = first_quic_success.clone()
222                            {
223                                return Ok(target);
224                            }
225                        }
226                        if quic_cache.is_none() && send_result.udp_success {
227                            return Ok(send_result.target);
228                        }
229                        if quic_cache.is_none()
230                            && udp_successes >= MIN_UDP_SUCCESSES_FOR_ACCEPT
231                            && let Some(target) = first_udp_success.clone()
232                        {
233                            return Ok(target);
234                        }
235                    }
236                }
237            }
238        }
239
240        if quic_cache.is_some() {
241            let distinct_quic_targets = quic_success_identities
242                .len()
243                .saturating_add(quic_success_addrs.len());
244            if quic_successes >= required_quic_successes
245                && distinct_quic_targets >= required_distinct_quic_targets
246                && let Some(target) = first_quic_success
247            {
248                return Ok(target);
249            }
250            if let Some(target) = first_udp_success {
251                return Ok(target);
252            }
253            return Err(SubmitTransportError::Failure {
254                message: format!(
255                    "direct propagation threshold not met (quic_successes={quic_successes}, distinct_quic_targets={distinct_quic_targets}, required_quic_successes={required_quic_successes}, required_distinct_quic_targets={required_distinct_quic_targets}, udp_successes={udp_successes}, quic_candidates={quic_candidate_count}, timeout_ms={})",
256                    effective_global_timeout.as_millis()
257                ),
258            });
259        }
260
261        if let Some(target) = first_udp_success {
262            return Ok(target);
263        }
264
265        Err(SubmitTransportError::Failure {
266            message: format!(
267                "all direct targets failed (udp_successes={udp_successes}, quic_successes=0, quic_candidates={quic_candidate_count})"
268            ),
269        })
270    }
271}
272
273/// Builds the shared QUIC connection cache used for optional direct sends.
274fn quic_connection_cache() -> Result<QuicConnectionCache, SubmitTransportError> {
275    let config = QuicConfig::new().map_err(|error| SubmitTransportError::Failure {
276        message: format!("failed to create quic config: {error}"),
277    })?;
278    let manager = QuicConnectionManager::new_with_connection_config(config);
279    QuicConnectionCache::new(QUIC_CACHE_NAME, manager, QUIC_CONNECTION_POOL_SIZE).map_err(|error| {
280        SubmitTransportError::Failure {
281            message: format!("failed to create quic connection cache: {error}"),
282        }
283    })
284}
285
/// Returns the per-target QUIC timeout, raising anything shorter than one
/// second to one second so handshakes have time to complete.
fn quic_timeout(per_target_timeout: Duration) -> Duration {
    const HANDSHAKE_FLOOR: Duration = Duration::from_millis(1_000);
    per_target_timeout.max(HANDSHAKE_FLOOR)
}
295
296/// Counts the distinct identities or addresses in the QUIC candidate set.
297fn count_distinct_quic_targets(targets: &[LeaderTarget]) -> usize {
298    targets
299        .iter()
300        .map(|target| {
301            target.identity.map_or(
302                DistinctTargetKey::Addr(target.tpu_addr),
303                DistinctTargetKey::Identity,
304            )
305        })
306        .collect::<HashSet<_>>()
307        .len()
308}
309
310/// Scales the QUIC success requirement down for small candidate sets.
311fn required_quic_successes(candidate_count: usize, available_distinct_targets: usize) -> u64 {
312    let required_by_candidates = u64::try_from(candidate_count).unwrap_or(u64::MAX);
313    let required_by_distinct = u64::try_from(available_distinct_targets).unwrap_or(u64::MAX);
314    MIN_QUIC_SUCCESSES_FOR_ACCEPT
315        .min(required_by_candidates.max(1))
316        .min(required_by_distinct.max(1))
317}
318
319/// Scales the distinct-target threshold down for sparse target sets.
320fn required_distinct_quic_targets(available_distinct_targets: usize) -> usize {
321    MIN_DISTINCT_QUIC_TARGETS_FOR_ACCEPT.min(available_distinct_targets.max(1))
322}
323
324/// Key used to count unique QUIC targets using the same identity/address semantics as submits.
325#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
326enum DistinctTargetKey {
327    /// Deduplicate targets that share the same validator identity.
328    Identity(solana_pubkey::Pubkey),
329    /// Fallback key for targets without a known validator identity.
330    Addr(SocketAddr),
331}
332
333/// Sends one payload to a target over UDP and optionally QUIC in parallel.
334async fn send_target(
335    socket: Arc<UdpSocket>,
336    payload: Arc<[u8]>,
337    target: LeaderTarget,
338    udp_timeout: Duration,
339    quic_timeout: Duration,
340    quic_cache: Option<Arc<QuicConnectionCache>>,
341    use_quic: bool,
342) -> TargetSendResult {
343    let udp_success = matches!(
344        timeout(
345            udp_timeout,
346            socket.send_to(payload.as_ref(), target.tpu_addr)
347        )
348        .await,
349        Ok(Ok(_))
350    );
351
352    let quic_success = if use_quic {
353        send_quic(quic_cache, payload.as_ref(), target.tpu_addr, quic_timeout).await
354    } else {
355        false
356    };
357
358    TargetSendResult {
359        target,
360        udp_success,
361        quic_success,
362    }
363}
364
365/// Attempts QUIC sends against the target's candidate QUIC addresses.
366async fn send_quic(
367    quic_cache: Option<Arc<QuicConnectionCache>>,
368    payload: &[u8],
369    target: SocketAddr,
370    timeout_budget: Duration,
371) -> bool {
372    let Some(quic_cache) = quic_cache else {
373        return false;
374    };
375    let candidate_addrs = quic_candidate_addrs(target);
376    let payload: Arc<[u8]> = Arc::from(payload.to_vec());
377    let mut in_flight = JoinSet::new();
378    for addr in candidate_addrs {
379        let connection = quic_cache.get_nonblocking_connection(&addr);
380        let payload = Arc::clone(&payload);
381        in_flight.spawn(async move {
382            matches!(
383                timeout(timeout_budget, connection.send_data(payload.as_ref())).await,
384                Ok(Ok(()))
385            )
386        });
387    }
388    while let Some(result) = in_flight.join_next().await {
389        if matches!(result, Ok(true)) {
390            in_flight.abort_all();
391            return true;
392        }
393    }
394    false
395}
396
397/// Expands one TPU address into the candidate QUIC addresses to probe.
398fn quic_candidate_addrs(target: SocketAddr) -> Vec<SocketAddr> {
399    let mut addrs = Vec::with_capacity(2);
400    addrs.push(target);
401    if let Some(quic_fallback) = with_agave_quic_fallback(target)
402        && quic_fallback != target
403    {
404        addrs.push(quic_fallback);
405    }
406    addrs
407}
408
409/// Applies the standard Agave TPU-to-QUIC port offset when it fits in `u16`.
410fn with_agave_quic_fallback(addr: SocketAddr) -> Option<SocketAddr> {
411    let mut quic_addr = addr;
412    let port = quic_addr.port().checked_add(AGAVE_QUIC_PORT_OFFSET)?;
413    quic_addr.set_port(port);
414    Some(quic_addr)
415}
416
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn quic_requirements_scale_down_for_single_target() {
        assert_eq!(required_quic_successes(1, 1), 1);
        assert_eq!(required_distinct_quic_targets(1), 1);
    }

    #[test]
    fn quic_requirements_keep_default_for_multi_target_sets() {
        assert_eq!(required_quic_successes(4, 4), MIN_QUIC_SUCCESSES_FOR_ACCEPT);
        assert_eq!(
            required_distinct_quic_targets(4),
            MIN_DISTINCT_QUIC_TARGETS_FOR_ACCEPT
        );
    }

    #[test]
    fn quic_requirements_cap_at_available_distinct_targets() {
        // Many candidates that all belong to one validator: one success suffices.
        assert_eq!(required_quic_successes(8, 1), 1);
        assert_eq!(required_distinct_quic_targets(1), 1);
    }

    #[test]
    fn quic_distinct_target_count_deduplicates_same_identity() {
        let identity = solana_pubkey::Pubkey::new_unique();
        let targets = vec![
            LeaderTarget::new(Some(identity), SocketAddr::from(([127, 0, 0, 1], 9001))),
            LeaderTarget::new(Some(identity), SocketAddr::from(([127, 0, 0, 1], 9002))),
        ];
        assert_eq!(count_distinct_quic_targets(&targets), 1);
    }

    #[test]
    fn quic_distinct_target_count_keys_unidentified_targets_by_addr() {
        let shared = SocketAddr::from(([127, 0, 0, 1], 9001));
        let other = SocketAddr::from(([127, 0, 0, 1], 9002));
        let targets = [
            LeaderTarget::new(None, shared),
            LeaderTarget::new(None, shared),
            LeaderTarget::new(None, other),
        ];
        assert_eq!(count_distinct_quic_targets(&targets), 2);
    }

    #[test]
    fn quic_timeout_applies_one_second_floor() {
        assert_eq!(quic_timeout(Duration::from_millis(250)), Duration::from_secs(1));
        assert_eq!(quic_timeout(Duration::from_secs(3)), Duration::from_secs(3));
    }

    #[test]
    fn quic_candidates_include_agave_offset_port() {
        let tpu = SocketAddr::from(([127, 0, 0, 1], 8003));
        assert_eq!(
            quic_candidate_addrs(tpu),
            vec![tpu, SocketAddr::from(([127, 0, 0, 1], 8009))]
        );
    }

    #[test]
    fn quic_candidates_skip_fallback_when_port_would_overflow() {
        let tpu = SocketAddr::from(([127, 0, 0, 1], u16::MAX));
        assert_eq!(quic_candidate_addrs(tpu), vec![tpu]);
    }
}