Skip to main content

sof_tx/submit/
client.rs

1//! Submission client implementation and mode orchestration.
2
3use std::{
4    collections::HashSet,
5    net::SocketAddr,
6    sync::Arc,
7    time::{Duration, Instant},
8};
9
10use solana_signature::Signature;
11use solana_signer::signers::Signers;
12use solana_transaction::versioned::VersionedTransaction;
13use tokio::{
14    net::TcpStream,
15    task::JoinSet,
16    time::{sleep, timeout},
17};
18
19use super::{
20    DirectSubmitConfig, DirectSubmitTransport, RpcSubmitConfig, RpcSubmitTransport, SignedTx,
21    SubmitError, SubmitMode, SubmitReliability, SubmitResult,
22};
23use crate::{
24    builder::TxBuilder,
25    providers::{LeaderProvider, LeaderTarget, RecentBlockhashProvider},
26    routing::{RoutingPolicy, SignatureDeduper, select_targets},
27};
28
29/// Transaction submission client that orchestrates RPC and direct submit modes.
30pub struct TxSubmitClient {
31    /// Blockhash source used by builder submit path.
32    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
33    /// Leader source used by direct/hybrid paths.
34    leader_provider: Arc<dyn LeaderProvider>,
35    /// Optional backup validator targets.
36    backups: Vec<LeaderTarget>,
37    /// Direct routing policy.
38    policy: RoutingPolicy,
39    /// Signature dedupe window.
40    deduper: SignatureDeduper,
41    /// Optional RPC transport.
42    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
43    /// Optional direct transport.
44    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
45    /// RPC tuning.
46    rpc_config: RpcSubmitConfig,
47    /// Direct tuning.
48    direct_config: DirectSubmitConfig,
49}
50
51impl TxSubmitClient {
52    /// Creates a submission client with no transports preconfigured.
53    #[must_use]
54    pub fn new(
55        blockhash_provider: Arc<dyn RecentBlockhashProvider>,
56        leader_provider: Arc<dyn LeaderProvider>,
57    ) -> Self {
58        Self {
59            blockhash_provider,
60            leader_provider,
61            backups: Vec::new(),
62            policy: RoutingPolicy::default(),
63            deduper: SignatureDeduper::new(Duration::from_secs(10)),
64            rpc_transport: None,
65            direct_transport: None,
66            rpc_config: RpcSubmitConfig::default(),
67            direct_config: DirectSubmitConfig::default(),
68        }
69    }
70
71    /// Sets optional backup validators.
72    #[must_use]
73    pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
74        self.backups = backups;
75        self
76    }
77
78    /// Sets routing policy.
79    #[must_use]
80    pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
81        self.policy = policy.normalized();
82        self
83    }
84
85    /// Sets dedupe TTL.
86    #[must_use]
87    pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
88        self.deduper = SignatureDeduper::new(ttl);
89        self
90    }
91
92    /// Sets RPC transport.
93    #[must_use]
94    pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
95        self.rpc_transport = Some(transport);
96        self
97    }
98
99    /// Sets direct transport.
100    #[must_use]
101    pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
102        self.direct_transport = Some(transport);
103        self
104    }
105
106    /// Sets RPC submit tuning.
107    #[must_use]
108    pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
109        self.rpc_config = config;
110        self
111    }
112
113    /// Sets direct submit tuning.
114    #[must_use]
115    pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
116        self.direct_config = config.normalized();
117        self
118    }
119
120    /// Sets direct/hybrid reliability profile.
121    #[must_use]
122    pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
123        self.direct_config = DirectSubmitConfig::from_reliability(reliability);
124        self
125    }
126
127    /// Builds, signs, and submits a transaction in one API call.
128    ///
129    /// # Errors
130    ///
131    /// Returns [`SubmitError`] when blockhash lookup, signing, dedupe, routing, or submission
132    /// fails.
133    pub async fn submit_builder<T>(
134        &mut self,
135        builder: TxBuilder,
136        signers: &T,
137        mode: SubmitMode,
138    ) -> Result<SubmitResult, SubmitError>
139    where
140        T: Signers + ?Sized,
141    {
142        let blockhash = self
143            .blockhash_provider
144            .latest_blockhash()
145            .ok_or(SubmitError::MissingRecentBlockhash)?;
146        let tx = builder
147            .build_and_sign(blockhash, signers)
148            .map_err(|source| SubmitError::Build { source })?;
149        self.submit_transaction(tx, mode).await
150    }
151
152    /// Submits one signed `VersionedTransaction`.
153    ///
154    /// # Errors
155    ///
156    /// Returns [`SubmitError`] when encoding, dedupe, routing, or submission fails.
157    pub async fn submit_transaction(
158        &mut self,
159        tx: VersionedTransaction,
160        mode: SubmitMode,
161    ) -> Result<SubmitResult, SubmitError> {
162        let signature = tx.signatures.first().copied();
163        let tx_bytes =
164            bincode::serialize(&tx).map_err(|source| SubmitError::DecodeSignedBytes { source })?;
165        self.submit_bytes(tx_bytes, signature, mode).await
166    }
167
168    /// Submits externally signed transaction bytes.
169    ///
170    /// # Errors
171    ///
172    /// Returns [`SubmitError`] when decoding, dedupe, routing, or submission fails.
173    pub async fn submit_signed(
174        &mut self,
175        signed_tx: SignedTx,
176        mode: SubmitMode,
177    ) -> Result<SubmitResult, SubmitError> {
178        let tx_bytes = match signed_tx {
179            SignedTx::VersionedTransactionBytes(bytes) => bytes,
180            SignedTx::WireTransactionBytes(bytes) => bytes,
181        };
182        let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
183            .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
184        let signature = tx.signatures.first().copied();
185        self.submit_bytes(tx_bytes, signature, mode).await
186    }
187
188    /// Submits raw tx bytes after dedupe check.
189    async fn submit_bytes(
190        &mut self,
191        tx_bytes: Vec<u8>,
192        signature: Option<Signature>,
193        mode: SubmitMode,
194    ) -> Result<SubmitResult, SubmitError> {
195        self.enforce_dedupe(signature)?;
196        match mode {
197            SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
198            SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
199            SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
200        }
201    }
202
203    /// Applies signature dedupe policy.
204    fn enforce_dedupe(&mut self, signature: Option<Signature>) -> Result<(), SubmitError> {
205        if let Some(signature) = signature {
206            let now = Instant::now();
207            if !self.deduper.check_and_insert(signature, now) {
208                return Err(SubmitError::DuplicateSignature);
209            }
210        }
211        Ok(())
212    }
213
214    /// Submits through RPC path only.
215    async fn submit_rpc_only(
216        &self,
217        tx_bytes: Vec<u8>,
218        signature: Option<Signature>,
219        mode: SubmitMode,
220    ) -> Result<SubmitResult, SubmitError> {
221        let rpc = self
222            .rpc_transport
223            .as_ref()
224            .ok_or(SubmitError::MissingRpcTransport)?;
225        let rpc_signature = rpc
226            .submit_rpc(&tx_bytes, &self.rpc_config)
227            .await
228            .map_err(|source| SubmitError::Rpc { source })?;
229        Ok(SubmitResult {
230            signature,
231            mode,
232            direct_target: None,
233            rpc_signature: Some(rpc_signature),
234            used_rpc_fallback: false,
235            selected_target_count: 0,
236            selected_identity_count: 0,
237        })
238    }
239
240    /// Submits through direct path only.
241    async fn submit_direct_only(
242        &self,
243        tx_bytes: Vec<u8>,
244        signature: Option<Signature>,
245        mode: SubmitMode,
246    ) -> Result<SubmitResult, SubmitError> {
247        let direct = self
248            .direct_transport
249            .as_ref()
250            .ok_or(SubmitError::MissingDirectTransport)?;
251        let direct_config = self.direct_config.clone().normalized();
252        let mut last_error = None;
253        let attempt_timeout = direct_attempt_timeout(&direct_config);
254
255        for attempt_idx in 0..direct_config.direct_submit_attempts {
256            let mut targets = self.select_direct_targets(&direct_config).await;
257            rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
258            let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
259            if targets.is_empty() {
260                return Err(SubmitError::NoDirectTargets);
261            }
262            match timeout(
263                attempt_timeout,
264                direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
265            )
266            .await
267            {
268                Ok(Ok(target)) => {
269                    self.spawn_agave_rebroadcast(tx_bytes.clone(), &direct_config);
270                    return Ok(SubmitResult {
271                        signature,
272                        mode,
273                        direct_target: Some(target),
274                        rpc_signature: None,
275                        used_rpc_fallback: false,
276                        selected_target_count,
277                        selected_identity_count,
278                    });
279                }
280                Ok(Err(source)) => last_error = Some(source),
281                Err(_elapsed) => {
282                    last_error = Some(super::SubmitTransportError::Failure {
283                        message: format!(
284                            "direct submit attempt timed out after {}ms",
285                            attempt_timeout.as_millis()
286                        ),
287                    });
288                }
289            }
290            if attempt_idx < direct_config.direct_submit_attempts.saturating_sub(1) {
291                sleep(direct_config.rebroadcast_interval).await;
292            }
293        }
294
295        Err(SubmitError::Direct {
296            source: last_error.unwrap_or_else(|| super::SubmitTransportError::Failure {
297                message: "direct submit attempts exhausted".to_owned(),
298            }),
299        })
300    }
301
302    /// Submits through hybrid mode (direct first, RPC fallback).
303    async fn submit_hybrid(
304        &self,
305        tx_bytes: Vec<u8>,
306        signature: Option<Signature>,
307        mode: SubmitMode,
308    ) -> Result<SubmitResult, SubmitError> {
309        let direct = self
310            .direct_transport
311            .as_ref()
312            .ok_or(SubmitError::MissingDirectTransport)?;
313        let rpc = self
314            .rpc_transport
315            .as_ref()
316            .ok_or(SubmitError::MissingRpcTransport)?;
317
318        let direct_config = self.direct_config.clone().normalized();
319        let attempt_timeout = direct_attempt_timeout(&direct_config);
320        for attempt_idx in 0..direct_config.hybrid_direct_attempts {
321            let mut targets = self.select_direct_targets(&direct_config).await;
322            rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
323            let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
324            if targets.is_empty() {
325                break;
326            }
327            if let Ok(Ok(target)) = timeout(
328                attempt_timeout,
329                direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
330            )
331            .await
332            {
333                self.spawn_agave_rebroadcast(tx_bytes.clone(), &direct_config);
334                if direct_config.hybrid_rpc_broadcast
335                    && let Ok(rpc_signature) = rpc.submit_rpc(&tx_bytes, &self.rpc_config).await
336                {
337                    return Ok(SubmitResult {
338                        signature,
339                        mode,
340                        direct_target: Some(target),
341                        rpc_signature: Some(rpc_signature),
342                        used_rpc_fallback: false,
343                        selected_target_count,
344                        selected_identity_count,
345                    });
346                }
347                return Ok(SubmitResult {
348                    signature,
349                    mode,
350                    direct_target: Some(target),
351                    rpc_signature: None,
352                    used_rpc_fallback: false,
353                    selected_target_count,
354                    selected_identity_count,
355                });
356            }
357            if attempt_idx < direct_config.hybrid_direct_attempts.saturating_sub(1) {
358                sleep(direct_config.rebroadcast_interval).await;
359            }
360        }
361
362        let rpc_signature = rpc
363            .submit_rpc(&tx_bytes, &self.rpc_config)
364            .await
365            .map_err(|source| SubmitError::Rpc { source })?;
366        Ok(SubmitResult {
367            signature,
368            mode,
369            direct_target: None,
370            rpc_signature: Some(rpc_signature),
371            used_rpc_fallback: true,
372            selected_target_count: 0,
373            selected_identity_count: 0,
374        })
375    }
376
377    /// Resolves and ranks the direct targets for the next submission attempt.
378    async fn select_direct_targets(&self, direct_config: &DirectSubmitConfig) -> Vec<LeaderTarget> {
379        select_and_rank_targets(
380            self.leader_provider.as_ref(),
381            &self.backups,
382            self.policy,
383            direct_config,
384        )
385        .await
386    }
387
388    /// Starts the post-ack rebroadcast worker when that reliability mode is enabled.
389    fn spawn_agave_rebroadcast(&self, tx_bytes: Vec<u8>, direct_config: &DirectSubmitConfig) {
390        if !direct_config.agave_rebroadcast_enabled
391            || direct_config.agave_rebroadcast_window.is_zero()
392        {
393            return;
394        }
395        let Some(direct_transport) = self.direct_transport.clone() else {
396            return;
397        };
398        spawn_agave_rebroadcast_task(
399            tx_bytes,
400            direct_transport,
401            self.leader_provider.clone(),
402            self.backups.clone(),
403            self.policy,
404            direct_config.clone(),
405        );
406    }
407}
408
409#[cfg(not(test))]
410/// Replays successful direct submissions for a bounded Agave-like persistence window.
411fn spawn_agave_rebroadcast_task(
412    tx_bytes: Vec<u8>,
413    direct_transport: Arc<dyn DirectSubmitTransport>,
414    leader_provider: Arc<dyn LeaderProvider>,
415    backups: Vec<LeaderTarget>,
416    policy: RoutingPolicy,
417    direct_config: DirectSubmitConfig,
418) {
419    tokio::spawn(async move {
420        let deadline = Instant::now()
421            .checked_add(direct_config.agave_rebroadcast_window)
422            .unwrap_or_else(Instant::now);
423        loop {
424            let now = Instant::now();
425            if now >= deadline {
426                break;
427            }
428
429            let sleep_for = deadline
430                .saturating_duration_since(now)
431                .min(direct_config.agave_rebroadcast_interval);
432            if !sleep_for.is_zero() {
433                sleep(sleep_for).await;
434            }
435
436            if Instant::now() >= deadline {
437                break;
438            }
439
440            let targets = select_and_rank_targets(
441                leader_provider.as_ref(),
442                backups.as_slice(),
443                policy,
444                &direct_config,
445            )
446            .await;
447            if targets.is_empty() {
448                continue;
449            }
450
451            drop(
452                timeout(
453                    direct_attempt_timeout(&direct_config),
454                    direct_transport.submit_direct(&tx_bytes, &targets, policy, &direct_config),
455                )
456                .await,
457            );
458        }
459    });
460}
461
/// Test-only stand-in for the background rebroadcaster.
///
/// Deliberately does nothing, so unit tests observe only the foreground submit path and can
/// assert deterministically.
#[cfg(test)]
fn spawn_agave_rebroadcast_task(
    _: Vec<u8>,
    _: Arc<dyn DirectSubmitTransport>,
    _: Arc<dyn LeaderProvider>,
    _: Vec<LeaderTarget>,
    _: RoutingPolicy,
    _: DirectSubmitConfig,
) {
}
473
474/// Selects routing targets and applies optional latency-aware ranking.
475async fn select_and_rank_targets(
476    leader_provider: &dyn LeaderProvider,
477    backups: &[LeaderTarget],
478    policy: RoutingPolicy,
479    direct_config: &DirectSubmitConfig,
480) -> Vec<LeaderTarget> {
481    let targets = select_targets(leader_provider, backups, policy);
482    rank_targets_by_latency(targets, direct_config).await
483}
484
485/// Reorders the probe set by observed TCP connect latency while preserving the tail order.
486async fn rank_targets_by_latency(
487    targets: Vec<LeaderTarget>,
488    direct_config: &DirectSubmitConfig,
489) -> Vec<LeaderTarget> {
490    if targets.len() <= 1 || !direct_config.latency_aware_targeting {
491        return targets;
492    }
493
494    let probe_count = targets
495        .len()
496        .min(direct_config.latency_probe_max_targets.max(1));
497    let mut latencies = vec![None; probe_count];
498    let mut probes = JoinSet::new();
499    for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
500        let cfg = direct_config.clone();
501        probes.spawn(async move { (idx, probe_target_latency(&target, &cfg).await) });
502    }
503    while let Some(result) = probes.join_next().await {
504        if let Ok((idx, latency)) = result
505            && idx < latencies.len()
506            && let Some(slot) = latencies.get_mut(idx)
507        {
508            *slot = latency;
509        }
510    }
511
512    let mut ranked = targets
513        .iter()
514        .take(probe_count)
515        .cloned()
516        .enumerate()
517        .collect::<Vec<_>>();
518    ranked.sort_by_key(|(idx, _target)| {
519        (
520            latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
521            *idx,
522        )
523    });
524
525    let mut output = ranked
526        .into_iter()
527        .map(|(_idx, target)| target)
528        .collect::<Vec<_>>();
529    output.extend(targets.iter().skip(probe_count).cloned());
530    output
531}
532
533/// Probes a target's candidate ports and keeps the best observed connect latency.
534async fn probe_target_latency(
535    target: &LeaderTarget,
536    direct_config: &DirectSubmitConfig,
537) -> Option<u128> {
538    let mut ports = vec![target.tpu_addr.port()];
539    if let Some(port) = direct_config.latency_probe_port
540        && port != target.tpu_addr.port()
541    {
542        ports.push(port);
543    }
544
545    let ip = target.tpu_addr.ip();
546    let mut best = None::<u128>;
547    for port in ports {
548        if let Some(latency) =
549            probe_tcp_latency(ip, port, direct_config.latency_probe_timeout).await
550        {
551            best = Some(best.map_or(latency, |current| current.min(latency)));
552        }
553    }
554    best
555}
556
557/// Measures one TCP connect attempt and returns elapsed milliseconds on success.
558async fn probe_tcp_latency(
559    ip: std::net::IpAddr,
560    port: u16,
561    timeout_duration: Duration,
562) -> Option<u128> {
563    let start = Instant::now();
564    let addr = SocketAddr::new(ip, port);
565    let stream = timeout(timeout_duration, TcpStream::connect(addr))
566        .await
567        .ok()?
568        .ok()?;
569    drop(stream);
570    Some(start.elapsed().as_millis())
571}
572
573/// Summarizes the selected target list for observability.
574fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
575    let selected_target_count = targets.len();
576    let selected_identity_count = targets
577        .iter()
578        .filter_map(|target| target.identity)
579        .collect::<HashSet<_>>()
580        .len();
581    (selected_target_count, selected_identity_count)
582}
583
584/// Rotates the target ordering between attempts to spread retries across candidates.
585fn rotate_targets_for_attempt(
586    targets: &mut [LeaderTarget],
587    attempt_idx: usize,
588    policy: RoutingPolicy,
589) {
590    if attempt_idx == 0 || targets.len() <= 1 {
591        return;
592    }
593
594    let normalized = policy.normalized();
595    let stride = normalized.max_parallel_sends.max(1);
596    let rotation = attempt_idx
597        .saturating_mul(stride)
598        .checked_rem(targets.len())
599        .unwrap_or(0);
600    if rotation > 0 {
601        targets.rotate_left(rotation);
602    }
603}
604
605/// Bounds one submit attempt so retry loops cannot hang indefinitely.
606fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
607    direct_config
608        .global_timeout
609        .saturating_add(direct_config.per_target_timeout)
610        .saturating_add(direct_config.rebroadcast_interval)
611        .max(Duration::from_secs(8))
612}