// sof_tx/submit/direct.rs

1//! Direct-submit transport implementation.
2
3use std::{
4    collections::HashSet,
5    fmt,
6    net::SocketAddr,
7    sync::Arc,
8    time::{Duration, Instant},
9};
10
11use async_trait::async_trait;
12use sof_types::PubkeyBytes;
13use solana_connection_cache::{
14    connection_cache::NewConnectionConfig, nonblocking::client_connection::ClientConnection,
15};
16use solana_quic_client::{QuicConfig, QuicConnectionCache, QuicConnectionManager};
17use tokio::{
18    net::UdpSocket,
19    task::JoinSet,
20    time::{sleep, timeout},
21};
22
23use super::{DirectSubmitConfig, DirectSubmitTransport, SubmitTransportError};
24use crate::{providers::LeaderTarget, routing::RoutingPolicy};
25
26/// UDP-based direct transport that sends transaction bytes to TPU targets.
27#[derive(Clone)]
28pub struct UdpDirectTransport {
29    /// Optional shared QUIC connection cache enabled by environment.
30    quic_cache: Option<Arc<QuicConnectionCache>>,
31}
32
33impl fmt::Debug for UdpDirectTransport {
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        f.debug_struct("UdpDirectTransport")
36            .field("quic_enabled", &self.quic_cache.is_some())
37            .finish()
38    }
39}
40
41impl Default for UdpDirectTransport {
42    fn default() -> Self {
43        Self::new()
44    }
45}
46
47impl UdpDirectTransport {
48    /// Creates a direct transport with optional shared QUIC cache.
49    #[must_use]
50    pub fn new() -> Self {
51        let quic_enabled = std::env::var("SOF_TX_ENABLE_QUIC_DIRECT")
52            .map(|value| {
53                let normalized = value.trim().to_ascii_lowercase();
54                matches!(normalized.as_str(), "1" | "true" | "yes" | "on")
55            })
56            .unwrap_or(false);
57        Self {
58            quic_cache: if quic_enabled {
59                quic_connection_cache().ok().map(Arc::new)
60            } else {
61                None
62            },
63        }
64    }
65}
66
/// How many QUIC connections the cache pools for each target address.
const QUIC_CONNECTION_POOL_SIZE: usize = 1;
/// Label passed to the connection cache when it is created.
const QUIC_CACHE_NAME: &str = "sof-tx-direct-quic";
/// Port offset added to a TPU address to derive the fallback QUIC address to probe.
const AGAVE_QUIC_PORT_OFFSET: u16 = 6;
/// UDP send successes needed for early acceptance when QUIC is disabled.
const MIN_UDP_SUCCESSES_FOR_ACCEPT: u64 = 16;
/// Upper bound on the QUIC send successes demanded before accepting propagation.
const MIN_QUIC_SUCCESSES_FOR_ACCEPT: u64 = 2;
/// Upper bound on the distinct QUIC targets (by identity, else address) demanded before
/// accepting propagation.
const MIN_DISTINCT_QUIC_TARGETS_FOR_ACCEPT: usize = 2;
/// Multiplier applied to the parallel send width when sizing the QUIC candidate set.
const QUIC_CANDIDATE_PARALLEL_MULTIPLIER: usize = 8;
81
82/// Per-target send outcome collected from concurrent UDP/QUIC attempts.
83#[derive(Debug, Clone)]
84struct TargetSendResult {
85    /// The target that was attempted.
86    target: LeaderTarget,
87    /// Whether the UDP send call completed successfully.
88    udp_success: bool,
89    /// Whether at least one QUIC candidate send completed successfully.
90    quic_success: bool,
91}
92
93#[async_trait]
94impl DirectSubmitTransport for UdpDirectTransport {
95    async fn submit_direct(
96        &self,
97        tx_bytes: &[u8],
98        targets: &[LeaderTarget],
99        policy: RoutingPolicy,
100        config: &DirectSubmitConfig,
101    ) -> Result<LeaderTarget, SubmitTransportError> {
102        let config = config.clone().normalized();
103        if targets.is_empty() {
104            return Err(SubmitTransportError::Config {
105                message: "no targets provided".to_owned(),
106            });
107        }
108
109        let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await.map_err(|error| {
110            SubmitTransportError::Failure {
111                message: error.to_string(),
112            }
113        })?);
114        let quic_cache = self.quic_cache.clone();
115        let payload: Arc<[u8]> = Arc::from(tx_bytes.to_vec());
116        let quic_enabled = quic_cache.is_some();
117        let effective_global_timeout = if quic_enabled {
118            config.global_timeout.max(Duration::from_secs(4))
119        } else {
120            config.global_timeout
121        };
122
123        let deadline = Instant::now()
124            .checked_add(effective_global_timeout)
125            .ok_or_else(|| SubmitTransportError::Failure {
126                message: "failed to calculate direct-submit deadline".to_owned(),
127            })?;
128        let normalized_policy = policy.normalized();
129        let quic_timeout = quic_timeout(config.per_target_timeout);
130        let quic_candidate_count = targets.len().min(
131            normalized_policy
132                .max_parallel_sends
133                .saturating_mul(QUIC_CANDIDATE_PARALLEL_MULTIPLIER)
134                .max(32),
135        );
136        let available_distinct_quic_targets = targets
137            .get(..quic_candidate_count)
138            .map_or(0, count_distinct_quic_targets);
139        let required_quic_successes =
140            required_quic_successes(quic_candidate_count, available_distinct_quic_targets);
141        let required_distinct_quic_targets =
142            required_distinct_quic_targets(available_distinct_quic_targets);
143        let mut udp_successes = 0_u64;
144        let mut first_udp_success = None::<LeaderTarget>;
145        let mut quic_successes = 0_u64;
146        let mut first_quic_success = None::<LeaderTarget>;
147        let mut quic_success_identities = HashSet::new();
148        let mut quic_success_addrs = HashSet::new();
149
150        for round in 0..config.direct_target_rounds {
151            if round > 0 {
152                let now = Instant::now();
153                if now >= deadline {
154                    break;
155                }
156                let remaining = deadline.saturating_duration_since(now);
157                let sleep_for = remaining.min(config.rebroadcast_interval);
158                if !sleep_for.is_zero() {
159                    sleep(sleep_for).await;
160                }
161            }
162            let mut target_index = 0_usize;
163            for chunk in targets.chunks(normalized_policy.max_parallel_sends) {
164                let now = Instant::now();
165                if now >= deadline {
166                    if quic_cache.is_none()
167                        && let Some(target) = first_udp_success
168                    {
169                        return Ok(target);
170                    }
171                    break;
172                }
173                let remaining = deadline.saturating_duration_since(now);
174                let per_target_udp_timeout = remaining.min(config.per_target_timeout);
175                let per_target_quic_timeout = remaining.min(quic_timeout);
176                let mut in_flight = JoinSet::new();
177
178                for target in chunk {
179                    let socket = Arc::clone(&socket);
180                    let payload = Arc::clone(&payload);
181                    let target = target.clone();
182                    let quic_cache = quic_cache.clone();
183                    let use_quic = quic_cache.is_some() && target_index < quic_candidate_count;
184                    target_index = target_index.saturating_add(1);
185                    in_flight.spawn(async move {
186                        send_target(
187                            socket,
188                            payload,
189                            target,
190                            per_target_udp_timeout,
191                            per_target_quic_timeout,
192                            quic_cache,
193                            use_quic,
194                        )
195                        .await
196                    });
197                }
198
199                while let Some(result) = in_flight.join_next().await {
200                    if let Ok(send_result) = result {
201                        if send_result.udp_success {
202                            udp_successes = udp_successes.saturating_add(1);
203                            if first_udp_success.is_none() {
204                                first_udp_success = Some(send_result.target.clone());
205                            }
206                        }
207                        if send_result.quic_success {
208                            quic_successes = quic_successes.saturating_add(1);
209                            if first_quic_success.is_none() {
210                                first_quic_success = Some(send_result.target.clone());
211                            }
212                            if let Some(identity) = send_result.target.identity {
213                                let _ = quic_success_identities.insert(identity);
214                            } else {
215                                let _ = quic_success_addrs.insert(send_result.target.tpu_addr);
216                            }
217                            let distinct_quic_targets = quic_success_identities
218                                .len()
219                                .saturating_add(quic_success_addrs.len());
220                            if quic_successes >= required_quic_successes
221                                && distinct_quic_targets >= required_distinct_quic_targets
222                                && let Some(target) = first_quic_success.clone()
223                            {
224                                return Ok(target);
225                            }
226                        }
227                        if quic_cache.is_none()
228                            && udp_successes >= MIN_UDP_SUCCESSES_FOR_ACCEPT
229                            && let Some(target) = first_udp_success.clone()
230                        {
231                            return Ok(target);
232                        }
233                    }
234                }
235            }
236        }
237
238        if quic_cache.is_some() {
239            let distinct_quic_targets = quic_success_identities
240                .len()
241                .saturating_add(quic_success_addrs.len());
242            if quic_successes >= required_quic_successes
243                && distinct_quic_targets >= required_distinct_quic_targets
244                && let Some(target) = first_quic_success
245            {
246                return Ok(target);
247            }
248            if let Some(target) = first_udp_success {
249                return Ok(target);
250            }
251            return Err(SubmitTransportError::Failure {
252                message: format!(
253                    "direct propagation threshold not met (quic_successes={quic_successes}, distinct_quic_targets={distinct_quic_targets}, required_quic_successes={required_quic_successes}, required_distinct_quic_targets={required_distinct_quic_targets}, udp_successes={udp_successes}, quic_candidates={quic_candidate_count}, timeout_ms={})",
254                    effective_global_timeout.as_millis()
255                ),
256            });
257        }
258
259        if let Some(target) = first_udp_success {
260            return Ok(target);
261        }
262
263        Err(SubmitTransportError::Failure {
264            message: format!(
265                "all direct targets failed (udp_successes={udp_successes}, quic_successes=0, quic_candidates={quic_candidate_count})"
266            ),
267        })
268    }
269}
270
271/// Builds the shared QUIC connection cache used for optional direct sends.
272fn quic_connection_cache() -> Result<QuicConnectionCache, SubmitTransportError> {
273    let config = QuicConfig::new().map_err(|error| SubmitTransportError::Failure {
274        message: format!("failed to create quic config: {error}"),
275    })?;
276    let manager = QuicConnectionManager::new_with_connection_config(config);
277    QuicConnectionCache::new(QUIC_CACHE_NAME, manager, QUIC_CONNECTION_POOL_SIZE).map_err(|error| {
278        SubmitTransportError::Failure {
279            message: format!("failed to create quic connection cache: {error}"),
280        }
281    })
282}
283
/// Returns the per-target QUIC timeout: the caller's timeout, raised to one second if shorter.
fn quic_timeout(per_target_timeout: Duration) -> Duration {
    /// Smallest budget a QUIC send is ever given.
    const QUIC_TIMEOUT_FLOOR: Duration = Duration::from_millis(1_000);
    per_target_timeout.max(QUIC_TIMEOUT_FLOOR)
}
293
294/// Counts the distinct identities or addresses in the QUIC candidate set.
295fn count_distinct_quic_targets(targets: &[LeaderTarget]) -> usize {
296    targets
297        .iter()
298        .map(|target| {
299            target.identity.map_or(
300                DistinctTargetKey::Addr(target.tpu_addr),
301                DistinctTargetKey::Identity,
302            )
303        })
304        .collect::<HashSet<_>>()
305        .len()
306}
307
308/// Scales the QUIC success requirement down for small candidate sets.
309fn required_quic_successes(candidate_count: usize, available_distinct_targets: usize) -> u64 {
310    let required_by_candidates = u64::try_from(candidate_count).unwrap_or(u64::MAX);
311    let required_by_distinct = u64::try_from(available_distinct_targets).unwrap_or(u64::MAX);
312    MIN_QUIC_SUCCESSES_FOR_ACCEPT
313        .min(required_by_candidates.max(1))
314        .min(required_by_distinct.max(1))
315}
316
317/// Scales the distinct-target threshold down for sparse target sets.
318fn required_distinct_quic_targets(available_distinct_targets: usize) -> usize {
319    MIN_DISTINCT_QUIC_TARGETS_FOR_ACCEPT.min(available_distinct_targets.max(1))
320}
321
322/// Key used to count unique QUIC targets using the same identity/address semantics as submits.
323#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
324enum DistinctTargetKey {
325    /// Deduplicate targets that share the same validator identity.
326    Identity(PubkeyBytes),
327    /// Fallback key for targets without a known validator identity.
328    Addr(SocketAddr),
329}
330
331/// Sends one payload to a target over UDP and optionally QUIC in parallel.
332async fn send_target(
333    socket: Arc<UdpSocket>,
334    payload: Arc<[u8]>,
335    target: LeaderTarget,
336    udp_timeout: Duration,
337    quic_timeout: Duration,
338    quic_cache: Option<Arc<QuicConnectionCache>>,
339    use_quic: bool,
340) -> TargetSendResult {
341    let udp_success = matches!(
342        timeout(
343            udp_timeout,
344            socket.send_to(payload.as_ref(), target.tpu_addr)
345        )
346        .await,
347        Ok(Ok(_))
348    );
349
350    let quic_success = if use_quic {
351        send_quic(quic_cache, payload.as_ref(), target.tpu_addr, quic_timeout).await
352    } else {
353        false
354    };
355
356    TargetSendResult {
357        target,
358        udp_success,
359        quic_success,
360    }
361}
362
363/// Attempts QUIC sends against the target's candidate QUIC addresses.
364async fn send_quic(
365    quic_cache: Option<Arc<QuicConnectionCache>>,
366    payload: &[u8],
367    target: SocketAddr,
368    timeout_budget: Duration,
369) -> bool {
370    let Some(quic_cache) = quic_cache else {
371        return false;
372    };
373    let candidate_addrs = quic_candidate_addrs(target);
374    let payload: Arc<[u8]> = Arc::from(payload.to_vec());
375    let mut in_flight = JoinSet::new();
376    for addr in candidate_addrs {
377        let connection = quic_cache.get_nonblocking_connection(&addr);
378        let payload = Arc::clone(&payload);
379        in_flight.spawn(async move {
380            matches!(
381                timeout(timeout_budget, connection.send_data(payload.as_ref())).await,
382                Ok(Ok(()))
383            )
384        });
385    }
386    while let Some(result) = in_flight.join_next().await {
387        if matches!(result, Ok(true)) {
388            in_flight.abort_all();
389            return true;
390        }
391    }
392    false
393}
394
395/// Expands one TPU address into the candidate QUIC addresses to probe.
396fn quic_candidate_addrs(target: SocketAddr) -> Vec<SocketAddr> {
397    let mut addrs = Vec::with_capacity(2);
398    addrs.push(target);
399    if let Some(quic_fallback) = with_agave_quic_fallback(target)
400        && quic_fallback != target
401    {
402        addrs.push(quic_fallback);
403    }
404    addrs
405}
406
407/// Applies the standard Agave TPU-to-QUIC port offset when it fits in `u16`.
408fn with_agave_quic_fallback(addr: SocketAddr) -> Option<SocketAddr> {
409    let mut quic_addr = addr;
410    let port = quic_addr.port().checked_add(AGAVE_QUIC_PORT_OFFSET)?;
411    quic_addr.set_port(port);
412    Some(quic_addr)
413}
414
#[cfg(test)]
mod tests {
    use super::*;

    /// A lone candidate must not be held to the multi-target thresholds.
    #[test]
    fn quic_requirements_scale_down_for_single_target() {
        let successes = required_quic_successes(1, 1);
        let distinct = required_distinct_quic_targets(1);
        assert_eq!(successes, 1);
        assert_eq!(distinct, 1);
    }

    /// With enough candidates both thresholds stay at their configured defaults.
    #[test]
    fn quic_requirements_keep_default_for_multi_target_sets() {
        let successes = required_quic_successes(4, 4);
        let distinct = required_distinct_quic_targets(4);
        assert_eq!(successes, MIN_QUIC_SUCCESSES_FOR_ACCEPT);
        assert_eq!(distinct, MIN_DISTINCT_QUIC_TARGETS_FOR_ACCEPT);
    }

    /// Two addresses behind the same identity count as a single distinct target.
    #[test]
    fn quic_distinct_target_count_deduplicates_same_identity() {
        let identity = PubkeyBytes::from_solana(solana_pubkey::Pubkey::new_unique());
        let first = LeaderTarget::new(Some(identity), SocketAddr::from(([127, 0, 0, 1], 9001)));
        let second = LeaderTarget::new(Some(identity), SocketAddr::from(([127, 0, 0, 1], 9002)));
        let targets = vec![first, second];
        assert_eq!(count_distinct_quic_targets(&targets), 1);
    }
}