Skip to main content

sof_tx/submit/
client.rs

1//! Submission client implementation and mode orchestration.
2
3use std::{
4    collections::HashSet,
5    net::SocketAddr,
6    sync::Arc,
7    time::{Duration, Instant, SystemTime},
8};
9
10use solana_signature::Signature;
11use solana_signer::signers::Signers;
12use solana_transaction::versioned::VersionedTransaction;
13use tokio::{
14    net::TcpStream,
15    task::JoinSet,
16    time::{sleep, timeout},
17};
18
19use super::{
20    DirectSubmitConfig, DirectSubmitTransport, RpcSubmitConfig, RpcSubmitTransport, SignedTx,
21    SubmitError, SubmitMode, SubmitReliability, SubmitResult, TxFlowSafetyQuality,
22    TxFlowSafetySource, TxSubmitContext, TxSubmitGuardPolicy, TxSubmitOutcome, TxSubmitOutcomeKind,
23    TxSubmitOutcomeReporter, TxToxicFlowRejectionReason, TxToxicFlowTelemetry,
24    TxToxicFlowTelemetrySnapshot,
25};
26use crate::{
27    builder::TxBuilder,
28    providers::{LeaderProvider, LeaderTarget, RecentBlockhashProvider},
29    routing::{RoutingPolicy, SignatureDeduper, select_targets},
30    submit::types::TxSuppressionCache,
31};
32
33/// Transaction submission client that orchestrates RPC and direct submit modes.
34pub struct TxSubmitClient {
35    /// Blockhash source used by builder submit path.
36    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
37    /// Leader source used by direct/hybrid paths.
38    leader_provider: Arc<dyn LeaderProvider>,
39    /// Optional backup validator targets.
40    backups: Vec<LeaderTarget>,
41    /// Direct routing policy.
42    policy: RoutingPolicy,
43    /// Signature dedupe window.
44    deduper: SignatureDeduper,
45    /// Optional RPC transport.
46    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
47    /// Optional direct transport.
48    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
49    /// RPC tuning.
50    rpc_config: RpcSubmitConfig,
51    /// Direct tuning.
52    direct_config: DirectSubmitConfig,
53    /// Optional toxic-flow guard source.
54    flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
55    /// Guard policy applied before submit.
56    guard_policy: TxSubmitGuardPolicy,
57    /// Built-in suppression keys.
58    suppression: TxSuppressionCache,
59    /// Built-in toxic-flow telemetry sink.
60    telemetry: Arc<TxToxicFlowTelemetry>,
61    /// Optional external outcome reporter.
62    outcome_reporter: Option<Arc<dyn TxSubmitOutcomeReporter>>,
63}
64
65impl TxSubmitClient {
66    /// Creates a submission client with no transports preconfigured.
67    #[must_use]
68    pub fn new(
69        blockhash_provider: Arc<dyn RecentBlockhashProvider>,
70        leader_provider: Arc<dyn LeaderProvider>,
71    ) -> Self {
72        Self {
73            blockhash_provider,
74            leader_provider,
75            backups: Vec::new(),
76            policy: RoutingPolicy::default(),
77            deduper: SignatureDeduper::new(Duration::from_secs(10)),
78            rpc_transport: None,
79            direct_transport: None,
80            rpc_config: RpcSubmitConfig::default(),
81            direct_config: DirectSubmitConfig::default(),
82            flow_safety_source: None,
83            guard_policy: TxSubmitGuardPolicy::default(),
84            suppression: TxSuppressionCache::default(),
85            telemetry: TxToxicFlowTelemetry::shared(),
86            outcome_reporter: None,
87        }
88    }
89
90    /// Sets optional backup validators.
91    #[must_use]
92    pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
93        self.backups = backups;
94        self
95    }
96
97    /// Sets routing policy.
98    #[must_use]
99    pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
100        self.policy = policy.normalized();
101        self
102    }
103
104    /// Sets dedupe TTL.
105    #[must_use]
106    pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
107        self.deduper = SignatureDeduper::new(ttl);
108        self
109    }
110
111    /// Sets RPC transport.
112    #[must_use]
113    pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
114        self.rpc_transport = Some(transport);
115        self
116    }
117
118    /// Sets direct transport.
119    #[must_use]
120    pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
121        self.direct_transport = Some(transport);
122        self
123    }
124
125    /// Sets RPC submit tuning.
126    #[must_use]
127    pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
128        self.rpc_config = config;
129        self
130    }
131
132    /// Sets direct submit tuning.
133    #[must_use]
134    pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
135        self.direct_config = config.normalized();
136        self
137    }
138
139    /// Sets direct/hybrid reliability profile.
140    #[must_use]
141    pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
142        self.direct_config = DirectSubmitConfig::from_reliability(reliability);
143        self
144    }
145
146    /// Sets the toxic-flow guard source used before submission.
147    #[must_use]
148    pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
149        self.flow_safety_source = Some(source);
150        self
151    }
152
153    /// Sets the toxic-flow guard policy.
154    #[must_use]
155    pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
156        self.guard_policy = policy;
157        self
158    }
159
160    /// Sets an optional external outcome reporter.
161    #[must_use]
162    pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
163        self.outcome_reporter = Some(reporter);
164        self
165    }
166
167    /// Returns the current built-in toxic-flow telemetry snapshot.
168    #[must_use]
169    pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
170        self.telemetry.snapshot()
171    }
172
173    /// Records one external terminal outcome against the built-in telemetry and optional reporter.
174    pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
175        self.telemetry.record(outcome);
176        if let Some(reporter) = &self.outcome_reporter {
177            reporter.record_outcome(outcome);
178        }
179    }
180
181    /// Builds, signs, and submits a transaction in one API call.
182    ///
183    /// # Errors
184    ///
185    /// Returns [`SubmitError`] when blockhash lookup, signing, dedupe, routing, or submission
186    /// fails.
187    pub async fn submit_builder<T>(
188        &mut self,
189        builder: TxBuilder,
190        signers: &T,
191        mode: SubmitMode,
192    ) -> Result<SubmitResult, SubmitError>
193    where
194        T: Signers + ?Sized,
195    {
196        let blockhash = self
197            .blockhash_provider
198            .latest_blockhash()
199            .ok_or(SubmitError::MissingRecentBlockhash)?;
200        let tx = builder
201            .build_and_sign(blockhash, signers)
202            .map_err(|source| SubmitError::Build { source })?;
203        self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
204            .await
205    }
206
207    /// Builds, signs, and submits a transaction with explicit toxic-flow context.
208    ///
209    /// # Errors
210    ///
211    /// Returns [`SubmitError`] when blockhash lookup, signing, dedupe, toxic-flow guards,
212    /// routing, or submission fails.
213    pub async fn submit_builder_with_context<T>(
214        &mut self,
215        builder: TxBuilder,
216        signers: &T,
217        mode: SubmitMode,
218        context: TxSubmitContext,
219    ) -> Result<SubmitResult, SubmitError>
220    where
221        T: Signers + ?Sized,
222    {
223        let blockhash = self
224            .blockhash_provider
225            .latest_blockhash()
226            .ok_or(SubmitError::MissingRecentBlockhash)?;
227        let tx = builder
228            .build_and_sign(blockhash, signers)
229            .map_err(|source| SubmitError::Build { source })?;
230        self.submit_transaction_with_context(tx, mode, context)
231            .await
232    }
233
234    /// Submits one signed `VersionedTransaction`.
235    ///
236    /// # Errors
237    ///
238    /// Returns [`SubmitError`] when encoding, dedupe, routing, or submission fails.
239    pub async fn submit_transaction(
240        &mut self,
241        tx: VersionedTransaction,
242        mode: SubmitMode,
243    ) -> Result<SubmitResult, SubmitError> {
244        self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
245            .await
246    }
247
248    /// Submits one signed `VersionedTransaction` with explicit toxic-flow context.
249    ///
250    /// # Errors
251    ///
252    /// Returns [`SubmitError`] when encoding, dedupe, toxic-flow guards, routing, or submission
253    /// fails.
254    pub async fn submit_transaction_with_context(
255        &mut self,
256        tx: VersionedTransaction,
257        mode: SubmitMode,
258        context: TxSubmitContext,
259    ) -> Result<SubmitResult, SubmitError> {
260        let signature = tx.signatures.first().copied();
261        let tx_bytes =
262            bincode::serialize(&tx).map_err(|source| SubmitError::DecodeSignedBytes { source })?;
263        self.submit_bytes(tx_bytes, signature, mode, context).await
264    }
265
266    /// Submits externally signed transaction bytes.
267    ///
268    /// # Errors
269    ///
270    /// Returns [`SubmitError`] when decoding, dedupe, routing, or submission fails.
271    pub async fn submit_signed(
272        &mut self,
273        signed_tx: SignedTx,
274        mode: SubmitMode,
275    ) -> Result<SubmitResult, SubmitError> {
276        self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
277            .await
278    }
279
280    /// Submits externally signed transaction bytes with explicit toxic-flow context.
281    ///
282    /// # Errors
283    ///
284    /// Returns [`SubmitError`] when decoding, dedupe, toxic-flow guards, routing, or submission
285    /// fails.
286    pub async fn submit_signed_with_context(
287        &mut self,
288        signed_tx: SignedTx,
289        mode: SubmitMode,
290        context: TxSubmitContext,
291    ) -> Result<SubmitResult, SubmitError> {
292        let tx_bytes = match signed_tx {
293            SignedTx::VersionedTransactionBytes(bytes) => bytes,
294            SignedTx::WireTransactionBytes(bytes) => bytes,
295        };
296        let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
297            .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
298        let signature = tx.signatures.first().copied();
299        self.submit_bytes(tx_bytes, signature, mode, context).await
300    }
301
302    /// Submits raw tx bytes after dedupe check.
303    async fn submit_bytes(
304        &mut self,
305        tx_bytes: Vec<u8>,
306        signature: Option<Signature>,
307        mode: SubmitMode,
308        context: TxSubmitContext,
309    ) -> Result<SubmitResult, SubmitError> {
310        self.enforce_toxic_flow_guards(signature, mode, &context)?;
311        self.enforce_dedupe(signature)?;
312        match mode {
313            SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
314            SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
315            SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
316        }
317    }
318
319    /// Applies signature dedupe policy.
320    fn enforce_dedupe(&mut self, signature: Option<Signature>) -> Result<(), SubmitError> {
321        if let Some(signature) = signature {
322            let now = Instant::now();
323            if !self.deduper.check_and_insert(signature, now) {
324                return Err(SubmitError::DuplicateSignature);
325            }
326        }
327        Ok(())
328    }
329
330    /// Applies toxic-flow guard policy before transport.
331    fn enforce_toxic_flow_guards(
332        &mut self,
333        signature: Option<Signature>,
334        mode: SubmitMode,
335        context: &TxSubmitContext,
336    ) -> Result<(), SubmitError> {
337        let now = SystemTime::now();
338        let opportunity_age_ms = context
339            .opportunity_created_at
340            .and_then(|created_at| now.duration_since(created_at).ok())
341            .map(|duration| duration.as_millis().min(u128::from(u64::MAX)) as u64);
342        if let Some(age_ms) = opportunity_age_ms
343            && let Some(max_age) = self.guard_policy.max_opportunity_age
344        {
345            let max_allowed_ms = max_age.as_millis().min(u128::from(u64::MAX)) as u64;
346            if age_ms > max_allowed_ms {
347                return Err(self.reject_with_outcome(
348                    TxToxicFlowRejectionReason::OpportunityStale {
349                        age_ms,
350                        max_allowed_ms,
351                    },
352                    TxSubmitOutcomeKind::RejectedDueToStaleness,
353                    signature,
354                    mode,
355                    None,
356                    opportunity_age_ms,
357                ));
358            }
359        }
360
361        if self.suppression.is_suppressed(
362            &context.suppression_keys,
363            now,
364            self.guard_policy.suppression_ttl,
365        ) {
366            return Err(self.reject_with_outcome(
367                TxToxicFlowRejectionReason::Suppressed,
368                TxSubmitOutcomeKind::Suppressed,
369                signature,
370                mode,
371                None,
372                opportunity_age_ms,
373            ));
374        }
375
376        if let Some(source) = &self.flow_safety_source {
377            let snapshot = source.toxic_flow_snapshot();
378            if self.guard_policy.reject_on_replay_recovery_pending
379                && snapshot.replay_recovery_pending
380            {
381                return Err(self.reject_with_outcome(
382                    TxToxicFlowRejectionReason::ReplayRecoveryPending,
383                    TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
384                    signature,
385                    mode,
386                    snapshot.current_state_version,
387                    opportunity_age_ms,
388                ));
389            }
390            if self.guard_policy.require_stable_control_plane
391                && !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
392            {
393                let outcome_kind = match snapshot.quality {
394                    TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
395                        TxSubmitOutcomeKind::RejectedDueToReorgRisk
396                    }
397                    TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
398                    TxFlowSafetyQuality::Degraded
399                    | TxFlowSafetyQuality::IncompleteControlPlane
400                    | TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
401                };
402                return Err(self.reject_with_outcome(
403                    TxToxicFlowRejectionReason::UnsafeControlPlane {
404                        quality: snapshot.quality,
405                    },
406                    outcome_kind,
407                    signature,
408                    mode,
409                    snapshot.current_state_version,
410                    opportunity_age_ms,
411                ));
412            }
413            if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
414                context.decision_state_version,
415                snapshot.current_state_version,
416                self.guard_policy.max_state_version_drift,
417            ) {
418                let drift = current_version.saturating_sub(decision_version);
419                if drift > max_allowed {
420                    return Err(self.reject_with_outcome(
421                        TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
422                        TxSubmitOutcomeKind::RejectedDueToStateDrift,
423                        signature,
424                        mode,
425                        Some(current_version),
426                        opportunity_age_ms,
427                    ));
428                }
429            }
430        }
431
432        self.suppression.insert_all(&context.suppression_keys, now);
433        Ok(())
434    }
435
436    /// Builds one rejection error while recording telemetry and reporting.
437    fn reject_with_outcome(
438        &self,
439        reason: TxToxicFlowRejectionReason,
440        outcome_kind: TxSubmitOutcomeKind,
441        signature: Option<Signature>,
442        mode: SubmitMode,
443        state_version: Option<u64>,
444        opportunity_age_ms: Option<u64>,
445    ) -> SubmitError {
446        let outcome = TxSubmitOutcome {
447            kind: outcome_kind,
448            signature,
449            mode,
450            state_version,
451            opportunity_age_ms,
452        };
453        self.record_external_outcome(&outcome);
454        SubmitError::ToxicFlow { reason }
455    }
456
457    /// Submits through RPC path only.
458    async fn submit_rpc_only(
459        &self,
460        tx_bytes: Vec<u8>,
461        signature: Option<Signature>,
462        mode: SubmitMode,
463    ) -> Result<SubmitResult, SubmitError> {
464        let rpc = self
465            .rpc_transport
466            .as_ref()
467            .ok_or(SubmitError::MissingRpcTransport)?;
468        let rpc_signature = rpc
469            .submit_rpc(&tx_bytes, &self.rpc_config)
470            .await
471            .map_err(|source| SubmitError::Rpc { source })?;
472        self.record_external_outcome(&TxSubmitOutcome {
473            kind: TxSubmitOutcomeKind::RpcAccepted,
474            signature,
475            mode,
476            state_version: self
477                .flow_safety_source
478                .as_ref()
479                .and_then(|source| source.toxic_flow_snapshot().current_state_version),
480            opportunity_age_ms: None,
481        });
482        Ok(SubmitResult {
483            signature,
484            mode,
485            direct_target: None,
486            rpc_signature: Some(rpc_signature),
487            used_rpc_fallback: false,
488            selected_target_count: 0,
489            selected_identity_count: 0,
490        })
491    }
492
493    /// Submits through direct path only.
494    async fn submit_direct_only(
495        &self,
496        tx_bytes: Vec<u8>,
497        signature: Option<Signature>,
498        mode: SubmitMode,
499    ) -> Result<SubmitResult, SubmitError> {
500        let direct = self
501            .direct_transport
502            .as_ref()
503            .ok_or(SubmitError::MissingDirectTransport)?;
504        let direct_config = self.direct_config.clone().normalized();
505        let mut last_error = None;
506        let attempt_timeout = direct_attempt_timeout(&direct_config);
507
508        for attempt_idx in 0..direct_config.direct_submit_attempts {
509            let mut targets = self.select_direct_targets(&direct_config).await;
510            rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
511            let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
512            if targets.is_empty() {
513                return Err(SubmitError::NoDirectTargets);
514            }
515            match timeout(
516                attempt_timeout,
517                direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
518            )
519            .await
520            {
521                Ok(Ok(target)) => {
522                    self.record_external_outcome(&TxSubmitOutcome {
523                        kind: TxSubmitOutcomeKind::DirectAccepted,
524                        signature,
525                        mode,
526                        state_version: self
527                            .flow_safety_source
528                            .as_ref()
529                            .and_then(|source| source.toxic_flow_snapshot().current_state_version),
530                        opportunity_age_ms: None,
531                    });
532                    self.spawn_agave_rebroadcast(Arc::from(tx_bytes), &direct_config);
533                    return Ok(SubmitResult {
534                        signature,
535                        mode,
536                        direct_target: Some(target),
537                        rpc_signature: None,
538                        used_rpc_fallback: false,
539                        selected_target_count,
540                        selected_identity_count,
541                    });
542                }
543                Ok(Err(source)) => last_error = Some(source),
544                Err(_elapsed) => {
545                    last_error = Some(super::SubmitTransportError::Failure {
546                        message: format!(
547                            "direct submit attempt timed out after {}ms",
548                            attempt_timeout.as_millis()
549                        ),
550                    });
551                }
552            }
553            if attempt_idx < direct_config.direct_submit_attempts.saturating_sub(1) {
554                sleep(direct_config.rebroadcast_interval).await;
555            }
556        }
557
558        Err(SubmitError::Direct {
559            source: last_error.unwrap_or_else(|| super::SubmitTransportError::Failure {
560                message: "direct submit attempts exhausted".to_owned(),
561            }),
562        })
563    }
564
565    /// Submits through hybrid mode (direct first, RPC fallback).
566    async fn submit_hybrid(
567        &self,
568        tx_bytes: Vec<u8>,
569        signature: Option<Signature>,
570        mode: SubmitMode,
571    ) -> Result<SubmitResult, SubmitError> {
572        let direct = self
573            .direct_transport
574            .as_ref()
575            .ok_or(SubmitError::MissingDirectTransport)?;
576        let rpc = self
577            .rpc_transport
578            .as_ref()
579            .ok_or(SubmitError::MissingRpcTransport)?;
580
581        let direct_config = self.direct_config.clone().normalized();
582        let attempt_timeout = direct_attempt_timeout(&direct_config);
583        for attempt_idx in 0..direct_config.hybrid_direct_attempts {
584            let mut targets = self.select_direct_targets(&direct_config).await;
585            rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
586            let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
587            if targets.is_empty() {
588                break;
589            }
590            if let Ok(Ok(target)) = timeout(
591                attempt_timeout,
592                direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
593            )
594            .await
595            {
596                let tx_bytes = Arc::<[u8]>::from(tx_bytes);
597                self.spawn_agave_rebroadcast(Arc::clone(&tx_bytes), &direct_config);
598                if direct_config.hybrid_rpc_broadcast
599                    && let Ok(rpc_signature) =
600                        rpc.submit_rpc(tx_bytes.as_ref(), &self.rpc_config).await
601                {
602                    self.record_external_outcome(&TxSubmitOutcome {
603                        kind: TxSubmitOutcomeKind::DirectAccepted,
604                        signature,
605                        mode,
606                        state_version: self
607                            .flow_safety_source
608                            .as_ref()
609                            .and_then(|source| source.toxic_flow_snapshot().current_state_version),
610                        opportunity_age_ms: None,
611                    });
612                    return Ok(SubmitResult {
613                        signature,
614                        mode,
615                        direct_target: Some(target),
616                        rpc_signature: Some(rpc_signature),
617                        used_rpc_fallback: false,
618                        selected_target_count,
619                        selected_identity_count,
620                    });
621                }
622                self.record_external_outcome(&TxSubmitOutcome {
623                    kind: TxSubmitOutcomeKind::DirectAccepted,
624                    signature,
625                    mode,
626                    state_version: self
627                        .flow_safety_source
628                        .as_ref()
629                        .and_then(|source| source.toxic_flow_snapshot().current_state_version),
630                    opportunity_age_ms: None,
631                });
632                return Ok(SubmitResult {
633                    signature,
634                    mode,
635                    direct_target: Some(target),
636                    rpc_signature: None,
637                    used_rpc_fallback: false,
638                    selected_target_count,
639                    selected_identity_count,
640                });
641            }
642            if attempt_idx < direct_config.hybrid_direct_attempts.saturating_sub(1) {
643                sleep(direct_config.rebroadcast_interval).await;
644            }
645        }
646
647        let rpc_signature = rpc
648            .submit_rpc(&tx_bytes, &self.rpc_config)
649            .await
650            .map_err(|source| SubmitError::Rpc { source })?;
651        self.record_external_outcome(&TxSubmitOutcome {
652            kind: TxSubmitOutcomeKind::RpcAccepted,
653            signature,
654            mode,
655            state_version: self
656                .flow_safety_source
657                .as_ref()
658                .and_then(|source| source.toxic_flow_snapshot().current_state_version),
659            opportunity_age_ms: None,
660        });
661        Ok(SubmitResult {
662            signature,
663            mode,
664            direct_target: None,
665            rpc_signature: Some(rpc_signature),
666            used_rpc_fallback: true,
667            selected_target_count: 0,
668            selected_identity_count: 0,
669        })
670    }
671
672    /// Resolves and ranks the direct targets for the next submission attempt.
673    async fn select_direct_targets(&self, direct_config: &DirectSubmitConfig) -> Vec<LeaderTarget> {
674        select_and_rank_targets(
675            self.leader_provider.as_ref(),
676            &self.backups,
677            self.policy,
678            direct_config,
679        )
680        .await
681    }
682
683    /// Starts the post-ack rebroadcast worker when that reliability mode is enabled.
684    fn spawn_agave_rebroadcast(&self, tx_bytes: Arc<[u8]>, direct_config: &DirectSubmitConfig) {
685        if !direct_config.agave_rebroadcast_enabled
686            || direct_config.agave_rebroadcast_window.is_zero()
687        {
688            return;
689        }
690        let Some(direct_transport) = self.direct_transport.clone() else {
691            return;
692        };
693        spawn_agave_rebroadcast_task(
694            tx_bytes,
695            direct_transport,
696            self.leader_provider.clone(),
697            self.backups.clone(),
698            self.policy,
699            direct_config.clone(),
700        );
701    }
702}
703
704#[cfg(not(test))]
705/// Replays successful direct submissions for a bounded Agave-like persistence window.
706fn spawn_agave_rebroadcast_task(
707    tx_bytes: Arc<[u8]>,
708    direct_transport: Arc<dyn DirectSubmitTransport>,
709    leader_provider: Arc<dyn LeaderProvider>,
710    backups: Vec<LeaderTarget>,
711    policy: RoutingPolicy,
712    direct_config: DirectSubmitConfig,
713) {
714    tokio::spawn(async move {
715        let deadline = Instant::now()
716            .checked_add(direct_config.agave_rebroadcast_window)
717            .unwrap_or_else(Instant::now);
718        loop {
719            let now = Instant::now();
720            if now >= deadline {
721                break;
722            }
723
724            let sleep_for = deadline
725                .saturating_duration_since(now)
726                .min(direct_config.agave_rebroadcast_interval);
727            if !sleep_for.is_zero() {
728                sleep(sleep_for).await;
729            }
730
731            if Instant::now() >= deadline {
732                break;
733            }
734
735            let targets = select_and_rank_targets(
736                leader_provider.as_ref(),
737                backups.as_slice(),
738                policy,
739                &direct_config,
740            )
741            .await;
742            if targets.is_empty() {
743                continue;
744            }
745
746            drop(
747                timeout(
748                    direct_attempt_timeout(&direct_config),
749                    direct_transport.submit_direct(
750                        tx_bytes.as_ref(),
751                        &targets,
752                        policy,
753                        &direct_config,
754                    ),
755                )
756                .await,
757            );
758        }
759    });
760}
761
#[cfg(test)]
/// Test-build replacement for the rebroadcast worker that spawns nothing.
///
/// Keeping background sends out of test builds makes assertions on transport calls
/// deterministic.
fn spawn_agave_rebroadcast_task(
    _tx_bytes: Arc<[u8]>,
    _direct_transport: Arc<dyn DirectSubmitTransport>,
    _leader_provider: Arc<dyn LeaderProvider>,
    _backups: Vec<LeaderTarget>,
    _policy: RoutingPolicy,
    _direct_config: DirectSubmitConfig,
) {
    // Intentionally empty.
}
773
774/// Selects routing targets and applies optional latency-aware ranking.
775async fn select_and_rank_targets(
776    leader_provider: &(impl LeaderProvider + ?Sized),
777    backups: &[LeaderTarget],
778    policy: RoutingPolicy,
779    direct_config: &DirectSubmitConfig,
780) -> Vec<LeaderTarget> {
781    let targets = select_targets(leader_provider, backups, policy);
782    rank_targets_by_latency(targets, direct_config).await
783}
784
785/// Reorders the probe set by observed TCP connect latency while preserving the tail order.
786async fn rank_targets_by_latency(
787    targets: Vec<LeaderTarget>,
788    direct_config: &DirectSubmitConfig,
789) -> Vec<LeaderTarget> {
790    if targets.len() <= 1 || !direct_config.latency_aware_targeting {
791        return targets;
792    }
793
794    let probe_timeout = direct_config.latency_probe_timeout;
795    let probe_port = direct_config.latency_probe_port;
796    let probe_count = targets
797        .len()
798        .min(direct_config.latency_probe_max_targets.max(1));
799    let mut latencies = vec![None; probe_count];
800    let mut probes = JoinSet::new();
801    for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
802        probes.spawn(async move {
803            (
804                idx,
805                probe_target_latency(&target, probe_port, probe_timeout).await,
806            )
807        });
808    }
809    while let Some(result) = probes.join_next().await {
810        if let Ok((idx, latency)) = result
811            && idx < latencies.len()
812            && let Some(slot) = latencies.get_mut(idx)
813        {
814            *slot = latency;
815        }
816    }
817
818    let mut ranked = targets
819        .iter()
820        .take(probe_count)
821        .cloned()
822        .enumerate()
823        .collect::<Vec<_>>();
824    ranked.sort_by_key(|(idx, _target)| {
825        (
826            latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
827            *idx,
828        )
829    });
830
831    let mut output = ranked
832        .into_iter()
833        .map(|(_idx, target)| target)
834        .collect::<Vec<_>>();
835    output.extend(targets.iter().skip(probe_count).cloned());
836    output
837}
838
839/// Probes a target's candidate ports and keeps the best observed connect latency.
840async fn probe_target_latency(
841    target: &LeaderTarget,
842    probe_port: Option<u16>,
843    probe_timeout: Duration,
844) -> Option<u128> {
845    let mut ports = vec![target.tpu_addr.port()];
846    if let Some(port) = probe_port
847        && port != target.tpu_addr.port()
848    {
849        ports.push(port);
850    }
851
852    let ip = target.tpu_addr.ip();
853    let mut best = None::<u128>;
854    for port in ports {
855        if let Some(latency) = probe_tcp_latency(ip, port, probe_timeout).await {
856            best = Some(best.map_or(latency, |current| current.min(latency)));
857        }
858    }
859    best
860}
861
862/// Measures one TCP connect attempt and returns elapsed milliseconds on success.
863async fn probe_tcp_latency(
864    ip: std::net::IpAddr,
865    port: u16,
866    timeout_duration: Duration,
867) -> Option<u128> {
868    let start = Instant::now();
869    let addr = SocketAddr::new(ip, port);
870    let stream = timeout(timeout_duration, TcpStream::connect(addr))
871        .await
872        .ok()?
873        .ok()?;
874    drop(stream);
875    Some(start.elapsed().as_millis())
876}
877
878/// Summarizes the selected target list for observability.
879fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
880    let selected_target_count = targets.len();
881    let selected_identity_count = targets
882        .iter()
883        .filter_map(|target| target.identity)
884        .collect::<HashSet<_>>()
885        .len();
886    (selected_target_count, selected_identity_count)
887}
888
889/// Rotates the target ordering between attempts to spread retries across candidates.
890fn rotate_targets_for_attempt(
891    targets: &mut [LeaderTarget],
892    attempt_idx: usize,
893    policy: RoutingPolicy,
894) {
895    if attempt_idx == 0 || targets.len() <= 1 {
896        return;
897    }
898
899    let normalized = policy.normalized();
900    let stride = normalized.max_parallel_sends.max(1);
901    let rotation = attempt_idx
902        .saturating_mul(stride)
903        .checked_rem(targets.len())
904        .unwrap_or(0);
905    if rotation > 0 {
906        targets.rotate_left(rotation);
907    }
908}
909
910/// Bounds one submit attempt so retry loops cannot hang indefinitely.
911fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
912    direct_config
913        .global_timeout
914        .saturating_add(direct_config.per_target_timeout)
915        .saturating_add(direct_config.rebroadcast_interval)
916        .max(Duration::from_secs(8))
917}