Skip to main content

sof_tx/submit/
client.rs

1//! Submission client implementation and mode orchestration.
2
3use std::{
4    collections::HashSet,
5    net::SocketAddr,
6    sync::Arc,
7    time::{Duration, Instant, SystemTime},
8};
9
10use solana_signature::Signature;
11use solana_signer::signers::Signers;
12use solana_transaction::versioned::VersionedTransaction;
13use tokio::{
14    net::TcpStream,
15    task::JoinSet,
16    time::{sleep, timeout},
17};
18
19use super::{
20    DirectSubmitConfig, DirectSubmitTransport, JitoSubmitConfig, JitoSubmitTransport,
21    RpcSubmitConfig, RpcSubmitTransport, SignedTx, SubmitError, SubmitMode, SubmitReliability,
22    SubmitResult, TxFlowSafetyQuality, TxFlowSafetySource, TxSubmitContext, TxSubmitGuardPolicy,
23    TxSubmitOutcome, TxSubmitOutcomeKind, TxSubmitOutcomeReporter, TxToxicFlowRejectionReason,
24    TxToxicFlowTelemetry, TxToxicFlowTelemetrySnapshot,
25};
26use crate::{
27    builder::TxBuilder,
28    providers::{LeaderProvider, LeaderTarget, RecentBlockhashProvider},
29    routing::{RoutingPolicy, SignatureDeduper, select_targets},
30    submit::types::TxSuppressionCache,
31};
32
33/// Transaction submission client that orchestrates RPC and direct submit modes.
34pub struct TxSubmitClient {
35    /// Blockhash source used by builder submit path.
36    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
37    /// Leader source used by direct/hybrid paths.
38    leader_provider: Arc<dyn LeaderProvider>,
39    /// Optional backup validator targets.
40    backups: Vec<LeaderTarget>,
41    /// Direct routing policy.
42    policy: RoutingPolicy,
43    /// Signature dedupe window.
44    deduper: SignatureDeduper,
45    /// Optional RPC transport.
46    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
47    /// Optional direct transport.
48    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
49    /// Optional Jito transport.
50    jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
51    /// RPC tuning.
52    rpc_config: RpcSubmitConfig,
53    /// Jito tuning.
54    jito_config: JitoSubmitConfig,
55    /// Direct tuning.
56    direct_config: DirectSubmitConfig,
57    /// Optional toxic-flow guard source.
58    flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
59    /// Guard policy applied before submit.
60    guard_policy: TxSubmitGuardPolicy,
61    /// Built-in suppression keys.
62    suppression: TxSuppressionCache,
63    /// Built-in toxic-flow telemetry sink.
64    telemetry: Arc<TxToxicFlowTelemetry>,
65    /// Optional external outcome reporter.
66    outcome_reporter: Option<Arc<dyn TxSubmitOutcomeReporter>>,
67}
68
69impl TxSubmitClient {
70    /// Creates a submission client with no transports preconfigured.
71    #[must_use]
72    pub fn new(
73        blockhash_provider: Arc<dyn RecentBlockhashProvider>,
74        leader_provider: Arc<dyn LeaderProvider>,
75    ) -> Self {
76        Self {
77            blockhash_provider,
78            leader_provider,
79            backups: Vec::new(),
80            policy: RoutingPolicy::default(),
81            deduper: SignatureDeduper::new(Duration::from_secs(10)),
82            rpc_transport: None,
83            direct_transport: None,
84            jito_transport: None,
85            rpc_config: RpcSubmitConfig::default(),
86            jito_config: JitoSubmitConfig::default(),
87            direct_config: DirectSubmitConfig::default(),
88            flow_safety_source: None,
89            guard_policy: TxSubmitGuardPolicy::default(),
90            suppression: TxSuppressionCache::default(),
91            telemetry: TxToxicFlowTelemetry::shared(),
92            outcome_reporter: None,
93        }
94    }
95
96    /// Sets optional backup validators.
97    #[must_use]
98    pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
99        self.backups = backups;
100        self
101    }
102
103    /// Sets routing policy.
104    #[must_use]
105    pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
106        self.policy = policy.normalized();
107        self
108    }
109
110    /// Sets dedupe TTL.
111    #[must_use]
112    pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
113        self.deduper = SignatureDeduper::new(ttl);
114        self
115    }
116
117    /// Sets RPC transport.
118    #[must_use]
119    pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
120        self.rpc_transport = Some(transport);
121        self
122    }
123
124    /// Sets direct transport.
125    #[must_use]
126    pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
127        self.direct_transport = Some(transport);
128        self
129    }
130
131    /// Sets Jito transport.
132    #[must_use]
133    pub fn with_jito_transport(mut self, transport: Arc<dyn JitoSubmitTransport>) -> Self {
134        self.jito_transport = Some(transport);
135        self
136    }
137
138    /// Sets RPC submit tuning.
139    #[must_use]
140    pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
141        self.rpc_config = config;
142        self
143    }
144
145    /// Sets Jito submit tuning.
146    #[must_use]
147    pub const fn with_jito_config(mut self, config: JitoSubmitConfig) -> Self {
148        self.jito_config = config;
149        self
150    }
151
152    /// Sets direct submit tuning.
153    #[must_use]
154    pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
155        self.direct_config = config.normalized();
156        self
157    }
158
159    /// Sets direct/hybrid reliability profile.
160    #[must_use]
161    pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
162        self.direct_config = DirectSubmitConfig::from_reliability(reliability);
163        self
164    }
165
166    /// Sets the toxic-flow guard source used before submission.
167    #[must_use]
168    pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
169        self.flow_safety_source = Some(source);
170        self
171    }
172
173    /// Sets the toxic-flow guard policy.
174    #[must_use]
175    pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
176        self.guard_policy = policy;
177        self
178    }
179
180    /// Sets an optional external outcome reporter.
181    #[must_use]
182    pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
183        self.outcome_reporter = Some(reporter);
184        self
185    }
186
187    /// Returns the current built-in toxic-flow telemetry snapshot.
188    #[must_use]
189    pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
190        self.telemetry.snapshot()
191    }
192
193    /// Records one external terminal outcome against the built-in telemetry and optional reporter.
194    pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
195        self.telemetry.record(outcome);
196        if let Some(reporter) = &self.outcome_reporter {
197            reporter.record_outcome(outcome);
198        }
199    }
200
201    /// Builds, signs, and submits a transaction in one API call.
202    ///
203    /// # Errors
204    ///
205    /// Returns [`SubmitError`] when blockhash lookup, signing, dedupe, routing, or submission
206    /// fails.
207    pub async fn submit_builder<T>(
208        &mut self,
209        builder: TxBuilder,
210        signers: &T,
211        mode: SubmitMode,
212    ) -> Result<SubmitResult, SubmitError>
213    where
214        T: Signers + ?Sized,
215    {
216        let blockhash = self
217            .blockhash_provider
218            .latest_blockhash()
219            .ok_or(SubmitError::MissingRecentBlockhash)?;
220        let tx = builder
221            .build_and_sign(blockhash, signers)
222            .map_err(|source| SubmitError::Build { source })?;
223        self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
224            .await
225    }
226
227    /// Builds, signs, and submits a transaction with explicit toxic-flow context.
228    ///
229    /// # Errors
230    ///
231    /// Returns [`SubmitError`] when blockhash lookup, signing, dedupe, toxic-flow guards,
232    /// routing, or submission fails.
233    pub async fn submit_builder_with_context<T>(
234        &mut self,
235        builder: TxBuilder,
236        signers: &T,
237        mode: SubmitMode,
238        context: TxSubmitContext,
239    ) -> Result<SubmitResult, SubmitError>
240    where
241        T: Signers + ?Sized,
242    {
243        let blockhash = self
244            .blockhash_provider
245            .latest_blockhash()
246            .ok_or(SubmitError::MissingRecentBlockhash)?;
247        let tx = builder
248            .build_and_sign(blockhash, signers)
249            .map_err(|source| SubmitError::Build { source })?;
250        self.submit_transaction_with_context(tx, mode, context)
251            .await
252    }
253
254    /// Submits one signed `VersionedTransaction`.
255    ///
256    /// # Errors
257    ///
258    /// Returns [`SubmitError`] when encoding, dedupe, routing, or submission fails.
259    pub async fn submit_transaction(
260        &mut self,
261        tx: VersionedTransaction,
262        mode: SubmitMode,
263    ) -> Result<SubmitResult, SubmitError> {
264        self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
265            .await
266    }
267
268    /// Submits one signed `VersionedTransaction` with explicit toxic-flow context.
269    ///
270    /// # Errors
271    ///
272    /// Returns [`SubmitError`] when encoding, dedupe, toxic-flow guards, routing, or submission
273    /// fails.
274    pub async fn submit_transaction_with_context(
275        &mut self,
276        tx: VersionedTransaction,
277        mode: SubmitMode,
278        context: TxSubmitContext,
279    ) -> Result<SubmitResult, SubmitError> {
280        let signature = tx.signatures.first().copied();
281        let tx_bytes =
282            bincode::serialize(&tx).map_err(|source| SubmitError::DecodeSignedBytes { source })?;
283        self.submit_bytes(tx_bytes, signature, mode, context).await
284    }
285
286    /// Submits externally signed transaction bytes.
287    ///
288    /// # Errors
289    ///
290    /// Returns [`SubmitError`] when decoding, dedupe, routing, or submission fails.
291    pub async fn submit_signed(
292        &mut self,
293        signed_tx: SignedTx,
294        mode: SubmitMode,
295    ) -> Result<SubmitResult, SubmitError> {
296        self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
297            .await
298    }
299
300    /// Submits externally signed transaction bytes with explicit toxic-flow context.
301    ///
302    /// # Errors
303    ///
304    /// Returns [`SubmitError`] when decoding, dedupe, toxic-flow guards, routing, or submission
305    /// fails.
306    pub async fn submit_signed_with_context(
307        &mut self,
308        signed_tx: SignedTx,
309        mode: SubmitMode,
310        context: TxSubmitContext,
311    ) -> Result<SubmitResult, SubmitError> {
312        let tx_bytes = match signed_tx {
313            SignedTx::VersionedTransactionBytes(bytes) => bytes,
314            SignedTx::WireTransactionBytes(bytes) => bytes,
315        };
316        let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
317            .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
318        let signature = tx.signatures.first().copied();
319        self.submit_bytes(tx_bytes, signature, mode, context).await
320    }
321
322    /// Submits raw tx bytes after dedupe check.
323    async fn submit_bytes(
324        &mut self,
325        tx_bytes: Vec<u8>,
326        signature: Option<Signature>,
327        mode: SubmitMode,
328        context: TxSubmitContext,
329    ) -> Result<SubmitResult, SubmitError> {
330        self.enforce_toxic_flow_guards(signature, mode, &context)?;
331        self.enforce_dedupe(signature)?;
332        match mode {
333            SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
334            SubmitMode::JitoOnly => self.submit_jito_only(tx_bytes, signature, mode).await,
335            SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
336            SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
337        }
338    }
339
340    /// Applies signature dedupe policy.
341    fn enforce_dedupe(&mut self, signature: Option<Signature>) -> Result<(), SubmitError> {
342        if let Some(signature) = signature {
343            let now = Instant::now();
344            if !self.deduper.check_and_insert(signature, now) {
345                return Err(SubmitError::DuplicateSignature);
346            }
347        }
348        Ok(())
349    }
350
351    /// Applies toxic-flow guard policy before transport.
352    fn enforce_toxic_flow_guards(
353        &mut self,
354        signature: Option<Signature>,
355        mode: SubmitMode,
356        context: &TxSubmitContext,
357    ) -> Result<(), SubmitError> {
358        let now = SystemTime::now();
359        let opportunity_age_ms = context
360            .opportunity_created_at
361            .and_then(|created_at| now.duration_since(created_at).ok())
362            .map(|duration| duration.as_millis().min(u128::from(u64::MAX)) as u64);
363        if let Some(age_ms) = opportunity_age_ms
364            && let Some(max_age) = self.guard_policy.max_opportunity_age
365        {
366            let max_allowed_ms = max_age.as_millis().min(u128::from(u64::MAX)) as u64;
367            if age_ms > max_allowed_ms {
368                return Err(self.reject_with_outcome(
369                    TxToxicFlowRejectionReason::OpportunityStale {
370                        age_ms,
371                        max_allowed_ms,
372                    },
373                    TxSubmitOutcomeKind::RejectedDueToStaleness,
374                    signature,
375                    mode,
376                    None,
377                    opportunity_age_ms,
378                ));
379            }
380        }
381
382        if self.suppression.is_suppressed(
383            &context.suppression_keys,
384            now,
385            self.guard_policy.suppression_ttl,
386        ) {
387            return Err(self.reject_with_outcome(
388                TxToxicFlowRejectionReason::Suppressed,
389                TxSubmitOutcomeKind::Suppressed,
390                signature,
391                mode,
392                None,
393                opportunity_age_ms,
394            ));
395        }
396
397        if let Some(source) = &self.flow_safety_source {
398            let snapshot = source.toxic_flow_snapshot();
399            if self.guard_policy.reject_on_replay_recovery_pending
400                && snapshot.replay_recovery_pending
401            {
402                return Err(self.reject_with_outcome(
403                    TxToxicFlowRejectionReason::ReplayRecoveryPending,
404                    TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
405                    signature,
406                    mode,
407                    snapshot.current_state_version,
408                    opportunity_age_ms,
409                ));
410            }
411            if self.guard_policy.require_stable_control_plane
412                && !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
413            {
414                let outcome_kind = match snapshot.quality {
415                    TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
416                        TxSubmitOutcomeKind::RejectedDueToReorgRisk
417                    }
418                    TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
419                    TxFlowSafetyQuality::Degraded
420                    | TxFlowSafetyQuality::IncompleteControlPlane
421                    | TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
422                };
423                return Err(self.reject_with_outcome(
424                    TxToxicFlowRejectionReason::UnsafeControlPlane {
425                        quality: snapshot.quality,
426                    },
427                    outcome_kind,
428                    signature,
429                    mode,
430                    snapshot.current_state_version,
431                    opportunity_age_ms,
432                ));
433            }
434            if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
435                context.decision_state_version,
436                snapshot.current_state_version,
437                self.guard_policy.max_state_version_drift,
438            ) {
439                let drift = current_version.saturating_sub(decision_version);
440                if drift > max_allowed {
441                    return Err(self.reject_with_outcome(
442                        TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
443                        TxSubmitOutcomeKind::RejectedDueToStateDrift,
444                        signature,
445                        mode,
446                        Some(current_version),
447                        opportunity_age_ms,
448                    ));
449                }
450            }
451        }
452
453        self.suppression.insert_all(&context.suppression_keys, now);
454        Ok(())
455    }
456
457    /// Builds one rejection error while recording telemetry and reporting.
458    fn reject_with_outcome(
459        &self,
460        reason: TxToxicFlowRejectionReason,
461        outcome_kind: TxSubmitOutcomeKind,
462        signature: Option<Signature>,
463        mode: SubmitMode,
464        state_version: Option<u64>,
465        opportunity_age_ms: Option<u64>,
466    ) -> SubmitError {
467        let outcome = TxSubmitOutcome {
468            kind: outcome_kind,
469            signature,
470            mode,
471            state_version,
472            opportunity_age_ms,
473        };
474        self.record_external_outcome(&outcome);
475        SubmitError::ToxicFlow { reason }
476    }
477
478    /// Submits through RPC path only.
479    async fn submit_rpc_only(
480        &self,
481        tx_bytes: Vec<u8>,
482        signature: Option<Signature>,
483        mode: SubmitMode,
484    ) -> Result<SubmitResult, SubmitError> {
485        let rpc = self
486            .rpc_transport
487            .as_ref()
488            .ok_or(SubmitError::MissingRpcTransport)?;
489        let rpc_signature = rpc
490            .submit_rpc(&tx_bytes, &self.rpc_config)
491            .await
492            .map_err(|source| SubmitError::Rpc { source })?;
493        self.record_external_outcome(&TxSubmitOutcome {
494            kind: TxSubmitOutcomeKind::RpcAccepted,
495            signature,
496            mode,
497            state_version: self
498                .flow_safety_source
499                .as_ref()
500                .and_then(|source| source.toxic_flow_snapshot().current_state_version),
501            opportunity_age_ms: None,
502        });
503        Ok(SubmitResult {
504            signature,
505            mode,
506            direct_target: None,
507            rpc_signature: Some(rpc_signature),
508            jito_signature: None,
509            jito_bundle_id: None,
510            used_rpc_fallback: false,
511            selected_target_count: 0,
512            selected_identity_count: 0,
513        })
514    }
515
516    /// Submits through Jito block-engine path only.
517    async fn submit_jito_only(
518        &self,
519        tx_bytes: Vec<u8>,
520        signature: Option<Signature>,
521        mode: SubmitMode,
522    ) -> Result<SubmitResult, SubmitError> {
523        let jito = self
524            .jito_transport
525            .as_ref()
526            .ok_or(SubmitError::MissingJitoTransport)?;
527        let jito_response = jito
528            .submit_jito(&tx_bytes, &self.jito_config)
529            .await
530            .map_err(|source| SubmitError::Jito { source })?;
531        self.record_external_outcome(&TxSubmitOutcome {
532            kind: TxSubmitOutcomeKind::JitoAccepted,
533            signature,
534            mode,
535            state_version: self
536                .flow_safety_source
537                .as_ref()
538                .and_then(|source| source.toxic_flow_snapshot().current_state_version),
539            opportunity_age_ms: None,
540        });
541        Ok(SubmitResult {
542            signature,
543            mode,
544            direct_target: None,
545            rpc_signature: None,
546            jito_signature: jito_response.transaction_signature,
547            jito_bundle_id: jito_response.bundle_id,
548            used_rpc_fallback: false,
549            selected_target_count: 0,
550            selected_identity_count: 0,
551        })
552    }
553
554    /// Submits through direct path only.
555    async fn submit_direct_only(
556        &self,
557        tx_bytes: Vec<u8>,
558        signature: Option<Signature>,
559        mode: SubmitMode,
560    ) -> Result<SubmitResult, SubmitError> {
561        let direct = self
562            .direct_transport
563            .as_ref()
564            .ok_or(SubmitError::MissingDirectTransport)?;
565        let direct_config = self.direct_config.clone().normalized();
566        let mut last_error = None;
567        let attempt_timeout = direct_attempt_timeout(&direct_config);
568
569        for attempt_idx in 0..direct_config.direct_submit_attempts {
570            let mut targets = self.select_direct_targets(&direct_config).await;
571            rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
572            let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
573            if targets.is_empty() {
574                return Err(SubmitError::NoDirectTargets);
575            }
576            match timeout(
577                attempt_timeout,
578                direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
579            )
580            .await
581            {
582                Ok(Ok(target)) => {
583                    self.record_external_outcome(&TxSubmitOutcome {
584                        kind: TxSubmitOutcomeKind::DirectAccepted,
585                        signature,
586                        mode,
587                        state_version: self
588                            .flow_safety_source
589                            .as_ref()
590                            .and_then(|source| source.toxic_flow_snapshot().current_state_version),
591                        opportunity_age_ms: None,
592                    });
593                    self.spawn_agave_rebroadcast(Arc::from(tx_bytes), &direct_config);
594                    return Ok(SubmitResult {
595                        signature,
596                        mode,
597                        direct_target: Some(target),
598                        rpc_signature: None,
599                        jito_signature: None,
600                        jito_bundle_id: None,
601                        used_rpc_fallback: false,
602                        selected_target_count,
603                        selected_identity_count,
604                    });
605                }
606                Ok(Err(source)) => last_error = Some(source),
607                Err(_elapsed) => {
608                    last_error = Some(super::SubmitTransportError::Failure {
609                        message: format!(
610                            "direct submit attempt timed out after {}ms",
611                            attempt_timeout.as_millis()
612                        ),
613                    });
614                }
615            }
616            if attempt_idx < direct_config.direct_submit_attempts.saturating_sub(1) {
617                sleep(direct_config.rebroadcast_interval).await;
618            }
619        }
620
621        Err(SubmitError::Direct {
622            source: last_error.unwrap_or_else(|| super::SubmitTransportError::Failure {
623                message: "direct submit attempts exhausted".to_owned(),
624            }),
625        })
626    }
627
628    /// Submits through hybrid mode (direct first, RPC fallback).
629    async fn submit_hybrid(
630        &self,
631        tx_bytes: Vec<u8>,
632        signature: Option<Signature>,
633        mode: SubmitMode,
634    ) -> Result<SubmitResult, SubmitError> {
635        let direct = self
636            .direct_transport
637            .as_ref()
638            .ok_or(SubmitError::MissingDirectTransport)?;
639        let rpc = self
640            .rpc_transport
641            .as_ref()
642            .ok_or(SubmitError::MissingRpcTransport)?;
643
644        let direct_config = self.direct_config.clone().normalized();
645        let attempt_timeout = direct_attempt_timeout(&direct_config);
646        for attempt_idx in 0..direct_config.hybrid_direct_attempts {
647            let mut targets = self.select_direct_targets(&direct_config).await;
648            rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
649            let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
650            if targets.is_empty() {
651                break;
652            }
653            if let Ok(Ok(target)) = timeout(
654                attempt_timeout,
655                direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
656            )
657            .await
658            {
659                let tx_bytes = Arc::<[u8]>::from(tx_bytes);
660                self.spawn_agave_rebroadcast(Arc::clone(&tx_bytes), &direct_config);
661                if direct_config.hybrid_rpc_broadcast
662                    && let Ok(rpc_signature) =
663                        rpc.submit_rpc(tx_bytes.as_ref(), &self.rpc_config).await
664                {
665                    self.record_external_outcome(&TxSubmitOutcome {
666                        kind: TxSubmitOutcomeKind::DirectAccepted,
667                        signature,
668                        mode,
669                        state_version: self
670                            .flow_safety_source
671                            .as_ref()
672                            .and_then(|source| source.toxic_flow_snapshot().current_state_version),
673                        opportunity_age_ms: None,
674                    });
675                    return Ok(SubmitResult {
676                        signature,
677                        mode,
678                        direct_target: Some(target),
679                        rpc_signature: Some(rpc_signature),
680                        jito_signature: None,
681                        jito_bundle_id: None,
682                        used_rpc_fallback: false,
683                        selected_target_count,
684                        selected_identity_count,
685                    });
686                }
687                self.record_external_outcome(&TxSubmitOutcome {
688                    kind: TxSubmitOutcomeKind::DirectAccepted,
689                    signature,
690                    mode,
691                    state_version: self
692                        .flow_safety_source
693                        .as_ref()
694                        .and_then(|source| source.toxic_flow_snapshot().current_state_version),
695                    opportunity_age_ms: None,
696                });
697                return Ok(SubmitResult {
698                    signature,
699                    mode,
700                    direct_target: Some(target),
701                    rpc_signature: None,
702                    jito_signature: None,
703                    jito_bundle_id: None,
704                    used_rpc_fallback: false,
705                    selected_target_count,
706                    selected_identity_count,
707                });
708            }
709            if attempt_idx < direct_config.hybrid_direct_attempts.saturating_sub(1) {
710                sleep(direct_config.rebroadcast_interval).await;
711            }
712        }
713
714        let rpc_signature = rpc
715            .submit_rpc(&tx_bytes, &self.rpc_config)
716            .await
717            .map_err(|source| SubmitError::Rpc { source })?;
718        self.record_external_outcome(&TxSubmitOutcome {
719            kind: TxSubmitOutcomeKind::RpcAccepted,
720            signature,
721            mode,
722            state_version: self
723                .flow_safety_source
724                .as_ref()
725                .and_then(|source| source.toxic_flow_snapshot().current_state_version),
726            opportunity_age_ms: None,
727        });
728        Ok(SubmitResult {
729            signature,
730            mode,
731            direct_target: None,
732            rpc_signature: Some(rpc_signature),
733            jito_signature: None,
734            jito_bundle_id: None,
735            used_rpc_fallback: true,
736            selected_target_count: 0,
737            selected_identity_count: 0,
738        })
739    }
740
741    /// Resolves and ranks the direct targets for the next submission attempt.
742    async fn select_direct_targets(&self, direct_config: &DirectSubmitConfig) -> Vec<LeaderTarget> {
743        select_and_rank_targets(
744            self.leader_provider.as_ref(),
745            &self.backups,
746            self.policy,
747            direct_config,
748        )
749        .await
750    }
751
752    /// Starts the post-ack rebroadcast worker when that reliability mode is enabled.
753    fn spawn_agave_rebroadcast(&self, tx_bytes: Arc<[u8]>, direct_config: &DirectSubmitConfig) {
754        if !direct_config.agave_rebroadcast_enabled
755            || direct_config.agave_rebroadcast_window.is_zero()
756        {
757            return;
758        }
759        let Some(direct_transport) = self.direct_transport.clone() else {
760            return;
761        };
762        spawn_agave_rebroadcast_task(
763            tx_bytes,
764            direct_transport,
765            self.leader_provider.clone(),
766            self.backups.clone(),
767            self.policy,
768            direct_config.clone(),
769        );
770    }
771}
772
773#[cfg(not(test))]
774/// Replays successful direct submissions for a bounded Agave-like persistence window.
775fn spawn_agave_rebroadcast_task(
776    tx_bytes: Arc<[u8]>,
777    direct_transport: Arc<dyn DirectSubmitTransport>,
778    leader_provider: Arc<dyn LeaderProvider>,
779    backups: Vec<LeaderTarget>,
780    policy: RoutingPolicy,
781    direct_config: DirectSubmitConfig,
782) {
783    tokio::spawn(async move {
784        let deadline = Instant::now()
785            .checked_add(direct_config.agave_rebroadcast_window)
786            .unwrap_or_else(Instant::now);
787        loop {
788            let now = Instant::now();
789            if now >= deadline {
790                break;
791            }
792
793            let sleep_for = deadline
794                .saturating_duration_since(now)
795                .min(direct_config.agave_rebroadcast_interval);
796            if !sleep_for.is_zero() {
797                sleep(sleep_for).await;
798            }
799
800            if Instant::now() >= deadline {
801                break;
802            }
803
804            let targets = select_and_rank_targets(
805                leader_provider.as_ref(),
806                backups.as_slice(),
807                policy,
808                &direct_config,
809            )
810            .await;
811            if targets.is_empty() {
812                continue;
813            }
814
815            drop(
816                timeout(
817                    direct_attempt_timeout(&direct_config),
818                    direct_transport.submit_direct(
819                        tx_bytes.as_ref(),
820                        &targets,
821                        policy,
822                        &direct_config,
823                    ),
824                )
825                .await,
826            );
827        }
828    });
829}
830
#[cfg(test)]
/// Test-build replacement that never spawns a background rebroadcast loop, keeping test
/// assertions deterministic. Every argument is simply dropped.
fn spawn_agave_rebroadcast_task(
    _: Arc<[u8]>,
    _: Arc<dyn DirectSubmitTransport>,
    _: Arc<dyn LeaderProvider>,
    _: Vec<LeaderTarget>,
    _: RoutingPolicy,
    _: DirectSubmitConfig,
) {
}
842
843/// Selects routing targets and applies optional latency-aware ranking.
844async fn select_and_rank_targets(
845    leader_provider: &(impl LeaderProvider + ?Sized),
846    backups: &[LeaderTarget],
847    policy: RoutingPolicy,
848    direct_config: &DirectSubmitConfig,
849) -> Vec<LeaderTarget> {
850    let targets = select_targets(leader_provider, backups, policy);
851    rank_targets_by_latency(targets, direct_config).await
852}
853
854/// Reorders the probe set by observed TCP connect latency while preserving the tail order.
855async fn rank_targets_by_latency(
856    targets: Vec<LeaderTarget>,
857    direct_config: &DirectSubmitConfig,
858) -> Vec<LeaderTarget> {
859    if targets.len() <= 1 || !direct_config.latency_aware_targeting {
860        return targets;
861    }
862
863    let probe_timeout = direct_config.latency_probe_timeout;
864    let probe_port = direct_config.latency_probe_port;
865    let probe_count = targets
866        .len()
867        .min(direct_config.latency_probe_max_targets.max(1));
868    let mut latencies = vec![None; probe_count];
869    let mut probes = JoinSet::new();
870    for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
871        probes.spawn(async move {
872            (
873                idx,
874                probe_target_latency(&target, probe_port, probe_timeout).await,
875            )
876        });
877    }
878    while let Some(result) = probes.join_next().await {
879        if let Ok((idx, latency)) = result
880            && idx < latencies.len()
881            && let Some(slot) = latencies.get_mut(idx)
882        {
883            *slot = latency;
884        }
885    }
886
887    let mut ranked = targets
888        .iter()
889        .take(probe_count)
890        .cloned()
891        .enumerate()
892        .collect::<Vec<_>>();
893    ranked.sort_by_key(|(idx, _target)| {
894        (
895            latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
896            *idx,
897        )
898    });
899
900    let mut output = ranked
901        .into_iter()
902        .map(|(_idx, target)| target)
903        .collect::<Vec<_>>();
904    output.extend(targets.iter().skip(probe_count).cloned());
905    output
906}
907
908/// Probes a target's candidate ports and keeps the best observed connect latency.
909async fn probe_target_latency(
910    target: &LeaderTarget,
911    probe_port: Option<u16>,
912    probe_timeout: Duration,
913) -> Option<u128> {
914    let mut ports = vec![target.tpu_addr.port()];
915    if let Some(port) = probe_port
916        && port != target.tpu_addr.port()
917    {
918        ports.push(port);
919    }
920
921    let ip = target.tpu_addr.ip();
922    let mut best = None::<u128>;
923    for port in ports {
924        if let Some(latency) = probe_tcp_latency(ip, port, probe_timeout).await {
925            best = Some(best.map_or(latency, |current| current.min(latency)));
926        }
927    }
928    best
929}
930
931/// Measures one TCP connect attempt and returns elapsed milliseconds on success.
932async fn probe_tcp_latency(
933    ip: std::net::IpAddr,
934    port: u16,
935    timeout_duration: Duration,
936) -> Option<u128> {
937    let start = Instant::now();
938    let addr = SocketAddr::new(ip, port);
939    let stream = timeout(timeout_duration, TcpStream::connect(addr))
940        .await
941        .ok()?
942        .ok()?;
943    drop(stream);
944    Some(start.elapsed().as_millis())
945}
946
947/// Summarizes the selected target list for observability.
948fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
949    let selected_target_count = targets.len();
950    let selected_identity_count = targets
951        .iter()
952        .filter_map(|target| target.identity)
953        .collect::<HashSet<_>>()
954        .len();
955    (selected_target_count, selected_identity_count)
956}
957
958/// Rotates the target ordering between attempts to spread retries across candidates.
959fn rotate_targets_for_attempt(
960    targets: &mut [LeaderTarget],
961    attempt_idx: usize,
962    policy: RoutingPolicy,
963) {
964    if attempt_idx == 0 || targets.len() <= 1 {
965        return;
966    }
967
968    let normalized = policy.normalized();
969    let stride = normalized.max_parallel_sends.max(1);
970    let rotation = attempt_idx
971        .saturating_mul(stride)
972        .checked_rem(targets.len())
973        .unwrap_or(0);
974    if rotation > 0 {
975        targets.rotate_left(rotation);
976    }
977}
978
979/// Bounds one submit attempt so retry loops cannot hang indefinitely.
980fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
981    direct_config
982        .global_timeout
983        .saturating_add(direct_config.per_target_timeout)
984        .saturating_add(direct_config.rebroadcast_interval)
985        .max(Duration::from_secs(8))
986}