Skip to main content

sof_tx/submit/
client.rs

1//! Submission client implementation and mode orchestration.
2
3use std::{
4    collections::HashSet,
5    net::SocketAddr,
6    sync::Arc,
7    time::{Duration, Instant, SystemTime},
8};
9
10use solana_signature::Signature;
11use solana_signer::signers::Signers;
12use solana_transaction::versioned::VersionedTransaction;
13use tokio::{
14    net::TcpStream,
15    task::JoinSet,
16    time::{sleep, timeout},
17};
18
19use super::{
20    DirectSubmitConfig, DirectSubmitTransport, RpcSubmitConfig, RpcSubmitTransport, SignedTx,
21    SubmitError, SubmitMode, SubmitReliability, SubmitResult, TxFlowSafetyQuality,
22    TxFlowSafetySource, TxSubmitContext, TxSubmitGuardPolicy, TxSubmitOutcome, TxSubmitOutcomeKind,
23    TxSubmitOutcomeReporter, TxToxicFlowRejectionReason, TxToxicFlowTelemetry,
24    TxToxicFlowTelemetrySnapshot,
25};
26use crate::{
27    builder::TxBuilder,
28    providers::{LeaderProvider, LeaderTarget, RecentBlockhashProvider},
29    routing::{RoutingPolicy, SignatureDeduper, select_targets},
30    submit::types::TxSuppressionCache,
31};
32
/// Transaction submission client that orchestrates RPC and direct submit modes.
///
/// A client is created with [`TxSubmitClient::new`] and then configured through the
/// `with_*` builder methods. Before any transport is touched, each submission passes
/// the toxic-flow guards and the signature dedupe check.
pub struct TxSubmitClient {
    /// Blockhash source used by builder submit path.
    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
    /// Leader source used by direct/hybrid paths.
    leader_provider: Arc<dyn LeaderProvider>,
    /// Optional backup validator targets (empty until `with_backups` is called).
    backups: Vec<LeaderTarget>,
    /// Direct routing policy.
    policy: RoutingPolicy,
    /// Signature dedupe window.
    deduper: SignatureDeduper,
    /// Optional RPC transport; RPC-only and hybrid submits fail without it.
    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
    /// Optional direct transport; direct-only and hybrid submits fail without it.
    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
    /// RPC tuning.
    rpc_config: RpcSubmitConfig,
    /// Direct tuning.
    direct_config: DirectSubmitConfig,
    /// Optional toxic-flow guard source; when absent, the snapshot-based guards are skipped.
    flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
    /// Guard policy applied before submit.
    guard_policy: TxSubmitGuardPolicy,
    /// Built-in suppression keys.
    suppression: TxSuppressionCache,
    /// Built-in toxic-flow telemetry sink.
    telemetry: Arc<TxToxicFlowTelemetry>,
    /// Optional external outcome reporter, notified alongside the built-in telemetry.
    outcome_reporter: Option<Arc<dyn TxSubmitOutcomeReporter>>,
}
64
65impl TxSubmitClient {
66    /// Creates a submission client with no transports preconfigured.
67    #[must_use]
68    pub fn new(
69        blockhash_provider: Arc<dyn RecentBlockhashProvider>,
70        leader_provider: Arc<dyn LeaderProvider>,
71    ) -> Self {
72        Self {
73            blockhash_provider,
74            leader_provider,
75            backups: Vec::new(),
76            policy: RoutingPolicy::default(),
77            deduper: SignatureDeduper::new(Duration::from_secs(10)),
78            rpc_transport: None,
79            direct_transport: None,
80            rpc_config: RpcSubmitConfig::default(),
81            direct_config: DirectSubmitConfig::default(),
82            flow_safety_source: None,
83            guard_policy: TxSubmitGuardPolicy::default(),
84            suppression: TxSuppressionCache::default(),
85            telemetry: TxToxicFlowTelemetry::shared(),
86            outcome_reporter: None,
87        }
88    }
89
90    /// Sets optional backup validators.
91    #[must_use]
92    pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
93        self.backups = backups;
94        self
95    }
96
97    /// Sets routing policy.
98    #[must_use]
99    pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
100        self.policy = policy.normalized();
101        self
102    }
103
104    /// Sets dedupe TTL.
105    #[must_use]
106    pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
107        self.deduper = SignatureDeduper::new(ttl);
108        self
109    }
110
111    /// Sets RPC transport.
112    #[must_use]
113    pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
114        self.rpc_transport = Some(transport);
115        self
116    }
117
118    /// Sets direct transport.
119    #[must_use]
120    pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
121        self.direct_transport = Some(transport);
122        self
123    }
124
125    /// Sets RPC submit tuning.
126    #[must_use]
127    pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
128        self.rpc_config = config;
129        self
130    }
131
132    /// Sets direct submit tuning.
133    #[must_use]
134    pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
135        self.direct_config = config.normalized();
136        self
137    }
138
139    /// Sets direct/hybrid reliability profile.
140    #[must_use]
141    pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
142        self.direct_config = DirectSubmitConfig::from_reliability(reliability);
143        self
144    }
145
146    /// Sets the toxic-flow guard source used before submission.
147    #[must_use]
148    pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
149        self.flow_safety_source = Some(source);
150        self
151    }
152
153    /// Sets the toxic-flow guard policy.
154    #[must_use]
155    pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
156        self.guard_policy = policy;
157        self
158    }
159
160    /// Sets an optional external outcome reporter.
161    #[must_use]
162    pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
163        self.outcome_reporter = Some(reporter);
164        self
165    }
166
167    /// Returns the current built-in toxic-flow telemetry snapshot.
168    #[must_use]
169    pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
170        self.telemetry.snapshot()
171    }
172
173    /// Records one external terminal outcome against the built-in telemetry and optional reporter.
174    pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
175        self.telemetry.record(outcome);
176        if let Some(reporter) = &self.outcome_reporter {
177            reporter.record_outcome(outcome);
178        }
179    }
180
181    /// Builds, signs, and submits a transaction in one API call.
182    ///
183    /// # Errors
184    ///
185    /// Returns [`SubmitError`] when blockhash lookup, signing, dedupe, routing, or submission
186    /// fails.
187    pub async fn submit_builder<T>(
188        &mut self,
189        builder: TxBuilder,
190        signers: &T,
191        mode: SubmitMode,
192    ) -> Result<SubmitResult, SubmitError>
193    where
194        T: Signers + ?Sized,
195    {
196        let blockhash = self
197            .blockhash_provider
198            .latest_blockhash()
199            .ok_or(SubmitError::MissingRecentBlockhash)?;
200        let tx = builder
201            .build_and_sign(blockhash, signers)
202            .map_err(|source| SubmitError::Build { source })?;
203        self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
204            .await
205    }
206
207    /// Builds, signs, and submits a transaction with explicit toxic-flow context.
208    ///
209    /// # Errors
210    ///
211    /// Returns [`SubmitError`] when blockhash lookup, signing, dedupe, toxic-flow guards,
212    /// routing, or submission fails.
213    pub async fn submit_builder_with_context<T>(
214        &mut self,
215        builder: TxBuilder,
216        signers: &T,
217        mode: SubmitMode,
218        context: TxSubmitContext,
219    ) -> Result<SubmitResult, SubmitError>
220    where
221        T: Signers + ?Sized,
222    {
223        let blockhash = self
224            .blockhash_provider
225            .latest_blockhash()
226            .ok_or(SubmitError::MissingRecentBlockhash)?;
227        let tx = builder
228            .build_and_sign(blockhash, signers)
229            .map_err(|source| SubmitError::Build { source })?;
230        self.submit_transaction_with_context(tx, mode, context)
231            .await
232    }
233
234    /// Submits one signed `VersionedTransaction`.
235    ///
236    /// # Errors
237    ///
238    /// Returns [`SubmitError`] when encoding, dedupe, routing, or submission fails.
239    pub async fn submit_transaction(
240        &mut self,
241        tx: VersionedTransaction,
242        mode: SubmitMode,
243    ) -> Result<SubmitResult, SubmitError> {
244        self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
245            .await
246    }
247
248    /// Submits one signed `VersionedTransaction` with explicit toxic-flow context.
249    ///
250    /// # Errors
251    ///
252    /// Returns [`SubmitError`] when encoding, dedupe, toxic-flow guards, routing, or submission
253    /// fails.
254    pub async fn submit_transaction_with_context(
255        &mut self,
256        tx: VersionedTransaction,
257        mode: SubmitMode,
258        context: TxSubmitContext,
259    ) -> Result<SubmitResult, SubmitError> {
260        let signature = tx.signatures.first().copied();
261        let tx_bytes =
262            bincode::serialize(&tx).map_err(|source| SubmitError::DecodeSignedBytes { source })?;
263        self.submit_bytes(tx_bytes, signature, mode, context).await
264    }
265
266    /// Submits externally signed transaction bytes.
267    ///
268    /// # Errors
269    ///
270    /// Returns [`SubmitError`] when decoding, dedupe, routing, or submission fails.
271    pub async fn submit_signed(
272        &mut self,
273        signed_tx: SignedTx,
274        mode: SubmitMode,
275    ) -> Result<SubmitResult, SubmitError> {
276        self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
277            .await
278    }
279
280    /// Submits externally signed transaction bytes with explicit toxic-flow context.
281    ///
282    /// # Errors
283    ///
284    /// Returns [`SubmitError`] when decoding, dedupe, toxic-flow guards, routing, or submission
285    /// fails.
286    pub async fn submit_signed_with_context(
287        &mut self,
288        signed_tx: SignedTx,
289        mode: SubmitMode,
290        context: TxSubmitContext,
291    ) -> Result<SubmitResult, SubmitError> {
292        let tx_bytes = match signed_tx {
293            SignedTx::VersionedTransactionBytes(bytes) => bytes,
294            SignedTx::WireTransactionBytes(bytes) => bytes,
295        };
296        let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
297            .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
298        let signature = tx.signatures.first().copied();
299        self.submit_bytes(tx_bytes, signature, mode, context).await
300    }
301
302    /// Submits raw tx bytes after dedupe check.
303    async fn submit_bytes(
304        &mut self,
305        tx_bytes: Vec<u8>,
306        signature: Option<Signature>,
307        mode: SubmitMode,
308        context: TxSubmitContext,
309    ) -> Result<SubmitResult, SubmitError> {
310        self.enforce_toxic_flow_guards(signature, mode, &context)?;
311        self.enforce_dedupe(signature)?;
312        match mode {
313            SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
314            SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
315            SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
316        }
317    }
318
319    /// Applies signature dedupe policy.
320    fn enforce_dedupe(&mut self, signature: Option<Signature>) -> Result<(), SubmitError> {
321        if let Some(signature) = signature {
322            let now = Instant::now();
323            if !self.deduper.check_and_insert(signature, now) {
324                return Err(SubmitError::DuplicateSignature);
325            }
326        }
327        Ok(())
328    }
329
330    /// Applies toxic-flow guard policy before transport.
331    fn enforce_toxic_flow_guards(
332        &mut self,
333        signature: Option<Signature>,
334        mode: SubmitMode,
335        context: &TxSubmitContext,
336    ) -> Result<(), SubmitError> {
337        let now = SystemTime::now();
338        let opportunity_age_ms = context
339            .opportunity_created_at
340            .and_then(|created_at| now.duration_since(created_at).ok())
341            .map(|duration| duration.as_millis().min(u128::from(u64::MAX)) as u64);
342        if let Some(age_ms) = opportunity_age_ms
343            && let Some(max_age) = self.guard_policy.max_opportunity_age
344        {
345            let max_allowed_ms = max_age.as_millis().min(u128::from(u64::MAX)) as u64;
346            if age_ms > max_allowed_ms {
347                return Err(self.reject_with_outcome(
348                    TxToxicFlowRejectionReason::OpportunityStale {
349                        age_ms,
350                        max_allowed_ms,
351                    },
352                    TxSubmitOutcomeKind::RejectedDueToStaleness,
353                    signature,
354                    mode,
355                    None,
356                    opportunity_age_ms,
357                ));
358            }
359        }
360
361        if self.suppression.is_suppressed(
362            &context.suppression_keys,
363            now,
364            self.guard_policy.suppression_ttl,
365        ) {
366            return Err(self.reject_with_outcome(
367                TxToxicFlowRejectionReason::Suppressed,
368                TxSubmitOutcomeKind::Suppressed,
369                signature,
370                mode,
371                None,
372                opportunity_age_ms,
373            ));
374        }
375
376        if let Some(source) = &self.flow_safety_source {
377            let snapshot = source.toxic_flow_snapshot();
378            if self.guard_policy.reject_on_replay_recovery_pending
379                && snapshot.replay_recovery_pending
380            {
381                return Err(self.reject_with_outcome(
382                    TxToxicFlowRejectionReason::ReplayRecoveryPending,
383                    TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
384                    signature,
385                    mode,
386                    snapshot.current_state_version,
387                    opportunity_age_ms,
388                ));
389            }
390            if self.guard_policy.require_stable_control_plane
391                && !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
392            {
393                let outcome_kind = match snapshot.quality {
394                    TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
395                        TxSubmitOutcomeKind::RejectedDueToReorgRisk
396                    }
397                    TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
398                    TxFlowSafetyQuality::Degraded
399                    | TxFlowSafetyQuality::IncompleteControlPlane
400                    | TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
401                };
402                return Err(self.reject_with_outcome(
403                    TxToxicFlowRejectionReason::UnsafeControlPlane {
404                        quality: snapshot.quality,
405                    },
406                    outcome_kind,
407                    signature,
408                    mode,
409                    snapshot.current_state_version,
410                    opportunity_age_ms,
411                ));
412            }
413            if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
414                context.decision_state_version,
415                snapshot.current_state_version,
416                self.guard_policy.max_state_version_drift,
417            ) {
418                let drift = current_version.saturating_sub(decision_version);
419                if drift > max_allowed {
420                    return Err(self.reject_with_outcome(
421                        TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
422                        TxSubmitOutcomeKind::RejectedDueToStateDrift,
423                        signature,
424                        mode,
425                        Some(current_version),
426                        opportunity_age_ms,
427                    ));
428                }
429            }
430        }
431
432        self.suppression.insert_all(&context.suppression_keys, now);
433        Ok(())
434    }
435
436    /// Builds one rejection error while recording telemetry and reporting.
437    fn reject_with_outcome(
438        &self,
439        reason: TxToxicFlowRejectionReason,
440        outcome_kind: TxSubmitOutcomeKind,
441        signature: Option<Signature>,
442        mode: SubmitMode,
443        state_version: Option<u64>,
444        opportunity_age_ms: Option<u64>,
445    ) -> SubmitError {
446        let outcome = TxSubmitOutcome {
447            kind: outcome_kind,
448            signature,
449            mode,
450            state_version,
451            opportunity_age_ms,
452        };
453        self.record_external_outcome(&outcome);
454        SubmitError::ToxicFlow { reason }
455    }
456
457    /// Submits through RPC path only.
458    async fn submit_rpc_only(
459        &self,
460        tx_bytes: Vec<u8>,
461        signature: Option<Signature>,
462        mode: SubmitMode,
463    ) -> Result<SubmitResult, SubmitError> {
464        let rpc = self
465            .rpc_transport
466            .as_ref()
467            .ok_or(SubmitError::MissingRpcTransport)?;
468        let rpc_signature = rpc
469            .submit_rpc(&tx_bytes, &self.rpc_config)
470            .await
471            .map_err(|source| SubmitError::Rpc { source })?;
472        self.record_external_outcome(&TxSubmitOutcome {
473            kind: TxSubmitOutcomeKind::RpcAccepted,
474            signature,
475            mode,
476            state_version: self
477                .flow_safety_source
478                .as_ref()
479                .and_then(|source| source.toxic_flow_snapshot().current_state_version),
480            opportunity_age_ms: None,
481        });
482        Ok(SubmitResult {
483            signature,
484            mode,
485            direct_target: None,
486            rpc_signature: Some(rpc_signature),
487            used_rpc_fallback: false,
488            selected_target_count: 0,
489            selected_identity_count: 0,
490        })
491    }
492
493    /// Submits through direct path only.
494    async fn submit_direct_only(
495        &self,
496        tx_bytes: Vec<u8>,
497        signature: Option<Signature>,
498        mode: SubmitMode,
499    ) -> Result<SubmitResult, SubmitError> {
500        let direct = self
501            .direct_transport
502            .as_ref()
503            .ok_or(SubmitError::MissingDirectTransport)?;
504        let direct_config = self.direct_config.clone().normalized();
505        let mut last_error = None;
506        let attempt_timeout = direct_attempt_timeout(&direct_config);
507
508        for attempt_idx in 0..direct_config.direct_submit_attempts {
509            let mut targets = self.select_direct_targets(&direct_config).await;
510            rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
511            let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
512            if targets.is_empty() {
513                return Err(SubmitError::NoDirectTargets);
514            }
515            match timeout(
516                attempt_timeout,
517                direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
518            )
519            .await
520            {
521                Ok(Ok(target)) => {
522                    self.record_external_outcome(&TxSubmitOutcome {
523                        kind: TxSubmitOutcomeKind::DirectAccepted,
524                        signature,
525                        mode,
526                        state_version: self
527                            .flow_safety_source
528                            .as_ref()
529                            .and_then(|source| source.toxic_flow_snapshot().current_state_version),
530                        opportunity_age_ms: None,
531                    });
532                    self.spawn_agave_rebroadcast(tx_bytes.clone(), &direct_config);
533                    return Ok(SubmitResult {
534                        signature,
535                        mode,
536                        direct_target: Some(target),
537                        rpc_signature: None,
538                        used_rpc_fallback: false,
539                        selected_target_count,
540                        selected_identity_count,
541                    });
542                }
543                Ok(Err(source)) => last_error = Some(source),
544                Err(_elapsed) => {
545                    last_error = Some(super::SubmitTransportError::Failure {
546                        message: format!(
547                            "direct submit attempt timed out after {}ms",
548                            attempt_timeout.as_millis()
549                        ),
550                    });
551                }
552            }
553            if attempt_idx < direct_config.direct_submit_attempts.saturating_sub(1) {
554                sleep(direct_config.rebroadcast_interval).await;
555            }
556        }
557
558        Err(SubmitError::Direct {
559            source: last_error.unwrap_or_else(|| super::SubmitTransportError::Failure {
560                message: "direct submit attempts exhausted".to_owned(),
561            }),
562        })
563    }
564
565    /// Submits through hybrid mode (direct first, RPC fallback).
566    async fn submit_hybrid(
567        &self,
568        tx_bytes: Vec<u8>,
569        signature: Option<Signature>,
570        mode: SubmitMode,
571    ) -> Result<SubmitResult, SubmitError> {
572        let direct = self
573            .direct_transport
574            .as_ref()
575            .ok_or(SubmitError::MissingDirectTransport)?;
576        let rpc = self
577            .rpc_transport
578            .as_ref()
579            .ok_or(SubmitError::MissingRpcTransport)?;
580
581        let direct_config = self.direct_config.clone().normalized();
582        let attempt_timeout = direct_attempt_timeout(&direct_config);
583        for attempt_idx in 0..direct_config.hybrid_direct_attempts {
584            let mut targets = self.select_direct_targets(&direct_config).await;
585            rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
586            let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
587            if targets.is_empty() {
588                break;
589            }
590            if let Ok(Ok(target)) = timeout(
591                attempt_timeout,
592                direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
593            )
594            .await
595            {
596                self.spawn_agave_rebroadcast(tx_bytes.clone(), &direct_config);
597                if direct_config.hybrid_rpc_broadcast
598                    && let Ok(rpc_signature) = rpc.submit_rpc(&tx_bytes, &self.rpc_config).await
599                {
600                    self.record_external_outcome(&TxSubmitOutcome {
601                        kind: TxSubmitOutcomeKind::DirectAccepted,
602                        signature,
603                        mode,
604                        state_version: self
605                            .flow_safety_source
606                            .as_ref()
607                            .and_then(|source| source.toxic_flow_snapshot().current_state_version),
608                        opportunity_age_ms: None,
609                    });
610                    return Ok(SubmitResult {
611                        signature,
612                        mode,
613                        direct_target: Some(target),
614                        rpc_signature: Some(rpc_signature),
615                        used_rpc_fallback: false,
616                        selected_target_count,
617                        selected_identity_count,
618                    });
619                }
620                self.record_external_outcome(&TxSubmitOutcome {
621                    kind: TxSubmitOutcomeKind::DirectAccepted,
622                    signature,
623                    mode,
624                    state_version: self
625                        .flow_safety_source
626                        .as_ref()
627                        .and_then(|source| source.toxic_flow_snapshot().current_state_version),
628                    opportunity_age_ms: None,
629                });
630                return Ok(SubmitResult {
631                    signature,
632                    mode,
633                    direct_target: Some(target),
634                    rpc_signature: None,
635                    used_rpc_fallback: false,
636                    selected_target_count,
637                    selected_identity_count,
638                });
639            }
640            if attempt_idx < direct_config.hybrid_direct_attempts.saturating_sub(1) {
641                sleep(direct_config.rebroadcast_interval).await;
642            }
643        }
644
645        let rpc_signature = rpc
646            .submit_rpc(&tx_bytes, &self.rpc_config)
647            .await
648            .map_err(|source| SubmitError::Rpc { source })?;
649        self.record_external_outcome(&TxSubmitOutcome {
650            kind: TxSubmitOutcomeKind::RpcAccepted,
651            signature,
652            mode,
653            state_version: self
654                .flow_safety_source
655                .as_ref()
656                .and_then(|source| source.toxic_flow_snapshot().current_state_version),
657            opportunity_age_ms: None,
658        });
659        Ok(SubmitResult {
660            signature,
661            mode,
662            direct_target: None,
663            rpc_signature: Some(rpc_signature),
664            used_rpc_fallback: true,
665            selected_target_count: 0,
666            selected_identity_count: 0,
667        })
668    }
669
670    /// Resolves and ranks the direct targets for the next submission attempt.
671    async fn select_direct_targets(&self, direct_config: &DirectSubmitConfig) -> Vec<LeaderTarget> {
672        select_and_rank_targets(
673            self.leader_provider.as_ref(),
674            &self.backups,
675            self.policy,
676            direct_config,
677        )
678        .await
679    }
680
681    /// Starts the post-ack rebroadcast worker when that reliability mode is enabled.
682    fn spawn_agave_rebroadcast(&self, tx_bytes: Vec<u8>, direct_config: &DirectSubmitConfig) {
683        if !direct_config.agave_rebroadcast_enabled
684            || direct_config.agave_rebroadcast_window.is_zero()
685        {
686            return;
687        }
688        let Some(direct_transport) = self.direct_transport.clone() else {
689            return;
690        };
691        spawn_agave_rebroadcast_task(
692            tx_bytes,
693            direct_transport,
694            self.leader_provider.clone(),
695            self.backups.clone(),
696            self.policy,
697            direct_config.clone(),
698        );
699    }
700}
701
702#[cfg(not(test))]
703/// Replays successful direct submissions for a bounded Agave-like persistence window.
704fn spawn_agave_rebroadcast_task(
705    tx_bytes: Vec<u8>,
706    direct_transport: Arc<dyn DirectSubmitTransport>,
707    leader_provider: Arc<dyn LeaderProvider>,
708    backups: Vec<LeaderTarget>,
709    policy: RoutingPolicy,
710    direct_config: DirectSubmitConfig,
711) {
712    tokio::spawn(async move {
713        let deadline = Instant::now()
714            .checked_add(direct_config.agave_rebroadcast_window)
715            .unwrap_or_else(Instant::now);
716        loop {
717            let now = Instant::now();
718            if now >= deadline {
719                break;
720            }
721
722            let sleep_for = deadline
723                .saturating_duration_since(now)
724                .min(direct_config.agave_rebroadcast_interval);
725            if !sleep_for.is_zero() {
726                sleep(sleep_for).await;
727            }
728
729            if Instant::now() >= deadline {
730                break;
731            }
732
733            let targets = select_and_rank_targets(
734                leader_provider.as_ref(),
735                backups.as_slice(),
736                policy,
737                &direct_config,
738            )
739            .await;
740            if targets.is_empty() {
741                continue;
742            }
743
744            drop(
745                timeout(
746                    direct_attempt_timeout(&direct_config),
747                    direct_transport.submit_direct(&tx_bytes, &targets, policy, &direct_config),
748                )
749                .await,
750            );
751        }
752    });
753}
754
#[cfg(test)]
/// Test-only stub that disables background rebroadcasting for deterministic assertions.
///
/// Mirrors the signature of the non-test implementation; every argument is dropped
/// and no task is spawned.
fn spawn_agave_rebroadcast_task(
    _tx_bytes: Vec<u8>,
    _direct_transport: Arc<dyn DirectSubmitTransport>,
    _leader_provider: Arc<dyn LeaderProvider>,
    _backups: Vec<LeaderTarget>,
    _policy: RoutingPolicy,
    _direct_config: DirectSubmitConfig,
) {
}
766
767/// Selects routing targets and applies optional latency-aware ranking.
768async fn select_and_rank_targets(
769    leader_provider: &dyn LeaderProvider,
770    backups: &[LeaderTarget],
771    policy: RoutingPolicy,
772    direct_config: &DirectSubmitConfig,
773) -> Vec<LeaderTarget> {
774    let targets = select_targets(leader_provider, backups, policy);
775    rank_targets_by_latency(targets, direct_config).await
776}
777
778/// Reorders the probe set by observed TCP connect latency while preserving the tail order.
779async fn rank_targets_by_latency(
780    targets: Vec<LeaderTarget>,
781    direct_config: &DirectSubmitConfig,
782) -> Vec<LeaderTarget> {
783    if targets.len() <= 1 || !direct_config.latency_aware_targeting {
784        return targets;
785    }
786
787    let probe_count = targets
788        .len()
789        .min(direct_config.latency_probe_max_targets.max(1));
790    let mut latencies = vec![None; probe_count];
791    let mut probes = JoinSet::new();
792    for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
793        let cfg = direct_config.clone();
794        probes.spawn(async move { (idx, probe_target_latency(&target, &cfg).await) });
795    }
796    while let Some(result) = probes.join_next().await {
797        if let Ok((idx, latency)) = result
798            && idx < latencies.len()
799            && let Some(slot) = latencies.get_mut(idx)
800        {
801            *slot = latency;
802        }
803    }
804
805    let mut ranked = targets
806        .iter()
807        .take(probe_count)
808        .cloned()
809        .enumerate()
810        .collect::<Vec<_>>();
811    ranked.sort_by_key(|(idx, _target)| {
812        (
813            latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
814            *idx,
815        )
816    });
817
818    let mut output = ranked
819        .into_iter()
820        .map(|(_idx, target)| target)
821        .collect::<Vec<_>>();
822    output.extend(targets.iter().skip(probe_count).cloned());
823    output
824}
825
826/// Probes a target's candidate ports and keeps the best observed connect latency.
827async fn probe_target_latency(
828    target: &LeaderTarget,
829    direct_config: &DirectSubmitConfig,
830) -> Option<u128> {
831    let mut ports = vec![target.tpu_addr.port()];
832    if let Some(port) = direct_config.latency_probe_port
833        && port != target.tpu_addr.port()
834    {
835        ports.push(port);
836    }
837
838    let ip = target.tpu_addr.ip();
839    let mut best = None::<u128>;
840    for port in ports {
841        if let Some(latency) =
842            probe_tcp_latency(ip, port, direct_config.latency_probe_timeout).await
843        {
844            best = Some(best.map_or(latency, |current| current.min(latency)));
845        }
846    }
847    best
848}
849
850/// Measures one TCP connect attempt and returns elapsed milliseconds on success.
851async fn probe_tcp_latency(
852    ip: std::net::IpAddr,
853    port: u16,
854    timeout_duration: Duration,
855) -> Option<u128> {
856    let start = Instant::now();
857    let addr = SocketAddr::new(ip, port);
858    let stream = timeout(timeout_duration, TcpStream::connect(addr))
859        .await
860        .ok()?
861        .ok()?;
862    drop(stream);
863    Some(start.elapsed().as_millis())
864}
865
866/// Summarizes the selected target list for observability.
867fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
868    let selected_target_count = targets.len();
869    let selected_identity_count = targets
870        .iter()
871        .filter_map(|target| target.identity)
872        .collect::<HashSet<_>>()
873        .len();
874    (selected_target_count, selected_identity_count)
875}
876
877/// Rotates the target ordering between attempts to spread retries across candidates.
878fn rotate_targets_for_attempt(
879    targets: &mut [LeaderTarget],
880    attempt_idx: usize,
881    policy: RoutingPolicy,
882) {
883    if attempt_idx == 0 || targets.len() <= 1 {
884        return;
885    }
886
887    let normalized = policy.normalized();
888    let stride = normalized.max_parallel_sends.max(1);
889    let rotation = attempt_idx
890        .saturating_mul(stride)
891        .checked_rem(targets.len())
892        .unwrap_or(0);
893    if rotation > 0 {
894        targets.rotate_left(rotation);
895    }
896}
897
898/// Bounds one submit attempt so retry loops cannot hang indefinitely.
899fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
900    direct_config
901        .global_timeout
902        .saturating_add(direct_config.per_target_timeout)
903        .saturating_add(direct_config.rebroadcast_interval)
904        .max(Duration::from_secs(8))
905}