Skip to main content

sof_tx/submit/
client.rs

1//! Submission client implementation and mode orchestration.
2
3use std::{
4    collections::HashSet,
5    net::SocketAddr,
6    sync::Arc,
7    time::{Duration, Instant, SystemTime},
8};
9
10use solana_signature::Signature;
11use solana_signer::signers::Signers;
12use solana_transaction::versioned::VersionedTransaction;
13use tokio::{
14    net::TcpStream,
15    task::JoinSet,
16    time::{sleep, timeout},
17};
18
19use super::{
20    DirectSubmitConfig, DirectSubmitTransport, JitoSubmitConfig, JitoSubmitTransport,
21    RpcSubmitConfig, RpcSubmitTransport, SignedTx, SubmitError, SubmitMode, SubmitReliability,
22    SubmitResult, SubmitTransportError, TxFlowSafetyQuality, TxFlowSafetySource,
23    TxSubmitClientBuilder, TxSubmitContext, TxSubmitGuardPolicy, TxSubmitOutcome,
24    TxSubmitOutcomeKind, TxSubmitOutcomeReporter, TxToxicFlowRejectionReason, TxToxicFlowTelemetry,
25    TxToxicFlowTelemetrySnapshot,
26};
27use crate::{
28    builder::TxBuilder,
29    providers::{
30        LeaderProvider, LeaderTarget, RecentBlockhashProvider, RpcRecentBlockhashProvider,
31        StaticLeaderProvider,
32    },
33    routing::{RoutingPolicy, SignatureDeduper, select_targets},
34    submit::{JsonRpcTransport, types::TxSuppressionCache},
35};
36
37/// Transaction submission client that orchestrates RPC and direct submit modes.
38pub struct TxSubmitClient {
39    /// Blockhash source used by unsigned submit path.
40    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
41    /// Optional RPC-backed blockhash source refreshed on demand before unsigned submit.
42    on_demand_blockhash_provider: Option<Arc<RpcRecentBlockhashProvider>>,
43    /// Leader source used by direct/hybrid paths.
44    leader_provider: Arc<dyn LeaderProvider>,
45    /// Optional backup validator targets.
46    backups: Vec<LeaderTarget>,
47    /// Direct routing policy.
48    policy: RoutingPolicy,
49    /// Signature dedupe window.
50    deduper: SignatureDeduper,
51    /// Optional RPC transport.
52    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
53    /// Optional direct transport.
54    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
55    /// Optional Jito transport.
56    jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
57    /// RPC tuning.
58    rpc_config: RpcSubmitConfig,
59    /// Jito tuning.
60    jito_config: JitoSubmitConfig,
61    /// Direct tuning.
62    direct_config: DirectSubmitConfig,
63    /// Optional toxic-flow guard source.
64    flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
65    /// Guard policy applied before submit.
66    guard_policy: TxSubmitGuardPolicy,
67    /// Built-in suppression keys.
68    suppression: TxSuppressionCache,
69    /// Built-in toxic-flow telemetry sink.
70    telemetry: Arc<TxToxicFlowTelemetry>,
71    /// Optional external outcome reporter.
72    outcome_reporter: Option<Arc<dyn TxSubmitOutcomeReporter>>,
73}
74
75impl TxSubmitClient {
76    /// Creates a high-level builder for common submit configurations.
77    #[must_use]
78    pub fn builder() -> TxSubmitClientBuilder {
79        TxSubmitClientBuilder::new()
80    }
81
82    /// Creates a submission client with no transports preconfigured.
83    #[must_use]
84    pub fn new(
85        blockhash_provider: Arc<dyn RecentBlockhashProvider>,
86        leader_provider: Arc<dyn LeaderProvider>,
87    ) -> Self {
88        Self {
89            blockhash_provider,
90            on_demand_blockhash_provider: None,
91            leader_provider,
92            backups: Vec::new(),
93            policy: RoutingPolicy::default(),
94            deduper: SignatureDeduper::new(Duration::from_secs(10)),
95            rpc_transport: None,
96            direct_transport: None,
97            jito_transport: None,
98            rpc_config: RpcSubmitConfig::default(),
99            jito_config: JitoSubmitConfig::default(),
100            direct_config: DirectSubmitConfig::default(),
101            flow_safety_source: None,
102            guard_policy: TxSubmitGuardPolicy::default(),
103            suppression: TxSuppressionCache::default(),
104            telemetry: TxToxicFlowTelemetry::shared(),
105            outcome_reporter: None,
106        }
107    }
108
109    /// Creates a client with an empty leader source for blockhash-only submit paths.
110    #[must_use]
111    pub fn blockhash_only(blockhash_provider: Arc<dyn RecentBlockhashProvider>) -> Self {
112        Self::new(
113            blockhash_provider,
114            Arc::new(StaticLeaderProvider::default()),
115        )
116    }
117
118    /// Creates a client with RPC-backed on-demand blockhash sourcing and no leader routing.
119    ///
120    /// # Errors
121    ///
122    /// Returns [`SubmitTransportError`] when the RPC-backed blockhash provider cannot be created.
123    pub fn blockhash_via_rpc(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
124        let blockhash_provider = Arc::new(RpcRecentBlockhashProvider::new(rpc_url.into())?);
125        Ok(Self::blockhash_only(blockhash_provider.clone())
126            .with_rpc_blockhash_provider(blockhash_provider))
127    }
128
129    /// Creates an RPC-only client from one RPC URL used for both blockhash and submission.
130    ///
131    /// # Errors
132    ///
133    /// Returns [`SubmitTransportError`] when the RPC transport or blockhash provider
134    /// cannot be initialized.
135    pub fn rpc_only(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
136        let rpc_url = rpc_url.into();
137        let client = Self::blockhash_via_rpc(rpc_url.clone())?;
138        let rpc_transport = Arc::new(JsonRpcTransport::new(rpc_url)?);
139        Ok(client.with_rpc_transport(rpc_transport))
140    }
141
142    /// Sets optional backup validators.
143    #[must_use]
144    pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
145        self.backups = backups;
146        self
147    }
148
149    /// Sets routing policy.
150    #[must_use]
151    pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
152        self.policy = policy.normalized();
153        self
154    }
155
156    /// Sets dedupe TTL.
157    #[must_use]
158    pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
159        self.deduper = SignatureDeduper::new(ttl);
160        self
161    }
162
163    /// Sets RPC transport.
164    #[must_use]
165    pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
166        self.rpc_transport = Some(transport);
167        self
168    }
169
170    /// Sets direct transport.
171    #[must_use]
172    pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
173        self.direct_transport = Some(transport);
174        self
175    }
176
177    /// Sets Jito transport.
178    #[must_use]
179    pub fn with_jito_transport(mut self, transport: Arc<dyn JitoSubmitTransport>) -> Self {
180        self.jito_transport = Some(transport);
181        self
182    }
183
184    /// Sets RPC submit tuning.
185    #[must_use]
186    pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
187        self.rpc_config = config;
188        self
189    }
190
191    /// Sets Jito submit tuning.
192    #[must_use]
193    pub const fn with_jito_config(mut self, config: JitoSubmitConfig) -> Self {
194        self.jito_config = config;
195        self
196    }
197
198    /// Sets direct submit tuning.
199    #[must_use]
200    pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
201        self.direct_config = config.normalized();
202        self
203    }
204
205    /// Sets direct/hybrid reliability profile.
206    #[must_use]
207    pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
208        self.direct_config = DirectSubmitConfig::from_reliability(reliability);
209        self
210    }
211
212    /// Sets the toxic-flow guard source used before submission.
213    #[must_use]
214    pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
215        self.flow_safety_source = Some(source);
216        self
217    }
218
219    /// Sets the toxic-flow guard policy.
220    #[must_use]
221    pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
222        self.guard_policy = policy;
223        self
224    }
225
226    /// Sets an optional external outcome reporter.
227    #[must_use]
228    pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
229        self.outcome_reporter = Some(reporter);
230        self
231    }
232
233    /// Registers an RPC-backed blockhash provider to refresh on demand for unsigned submit paths.
234    #[must_use]
235    pub fn with_rpc_blockhash_provider(
236        mut self,
237        provider: Arc<RpcRecentBlockhashProvider>,
238    ) -> Self {
239        self.on_demand_blockhash_provider = Some(provider);
240        self
241    }
242
243    /// Returns the current built-in toxic-flow telemetry snapshot.
244    #[must_use]
245    pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
246        self.telemetry.snapshot()
247    }
248
249    /// Records one external terminal outcome against the built-in telemetry and optional reporter.
250    pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
251        self.telemetry.record(outcome);
252        if let Some(reporter) = &self.outcome_reporter {
253            reporter.record_outcome(outcome);
254        }
255    }
256
257    /// Builds, signs, and submits a transaction in one API call.
258    ///
259    /// # Errors
260    ///
261    /// Returns [`SubmitError`] when blockhash lookup, signing, dedupe, routing, or submission
262    /// fails.
263    pub async fn submit_unsigned<T>(
264        &mut self,
265        builder: TxBuilder,
266        signers: &T,
267        mode: SubmitMode,
268    ) -> Result<SubmitResult, SubmitError>
269    where
270        T: Signers + ?Sized,
271    {
272        if let Some(provider) = &self.on_demand_blockhash_provider {
273            provider
274                .refresh()
275                .await
276                .map_err(|source| SubmitError::Rpc { source })?;
277        }
278        let blockhash = self
279            .blockhash_provider
280            .latest_blockhash()
281            .ok_or(SubmitError::MissingRecentBlockhash)?;
282        let tx = builder
283            .build_and_sign(blockhash, signers)
284            .map_err(|source| SubmitError::Build { source })?;
285        self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
286            .await
287    }
288
289    /// Builds, signs, and submits a transaction with explicit toxic-flow context.
290    ///
291    /// # Errors
292    ///
293    /// Returns [`SubmitError`] when blockhash lookup, signing, dedupe, toxic-flow guards,
294    /// routing, or submission fails.
295    pub async fn submit_unsigned_with_context<T>(
296        &mut self,
297        builder: TxBuilder,
298        signers: &T,
299        mode: SubmitMode,
300        context: TxSubmitContext,
301    ) -> Result<SubmitResult, SubmitError>
302    where
303        T: Signers + ?Sized,
304    {
305        if let Some(provider) = &self.on_demand_blockhash_provider {
306            provider
307                .refresh()
308                .await
309                .map_err(|source| SubmitError::Rpc { source })?;
310        }
311        let blockhash = self
312            .blockhash_provider
313            .latest_blockhash()
314            .ok_or(SubmitError::MissingRecentBlockhash)?;
315        let tx = builder
316            .build_and_sign(blockhash, signers)
317            .map_err(|source| SubmitError::Build { source })?;
318        self.submit_transaction_with_context(tx, mode, context)
319            .await
320    }
321
322    /// Submits one signed `VersionedTransaction`.
323    ///
324    /// # Errors
325    ///
326    /// Returns [`SubmitError`] when encoding, dedupe, routing, or submission fails.
327    pub async fn submit_transaction(
328        &mut self,
329        tx: VersionedTransaction,
330        mode: SubmitMode,
331    ) -> Result<SubmitResult, SubmitError> {
332        self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
333            .await
334    }
335
336    /// Submits one signed `VersionedTransaction` with explicit toxic-flow context.
337    ///
338    /// # Errors
339    ///
340    /// Returns [`SubmitError`] when encoding, dedupe, toxic-flow guards, routing, or submission
341    /// fails.
342    pub async fn submit_transaction_with_context(
343        &mut self,
344        tx: VersionedTransaction,
345        mode: SubmitMode,
346        context: TxSubmitContext,
347    ) -> Result<SubmitResult, SubmitError> {
348        let signature = tx.signatures.first().copied();
349        let tx_bytes =
350            bincode::serialize(&tx).map_err(|source| SubmitError::DecodeSignedBytes { source })?;
351        self.submit_bytes(tx_bytes, signature, mode, context).await
352    }
353
354    /// Submits externally signed transaction bytes.
355    ///
356    /// # Errors
357    ///
358    /// Returns [`SubmitError`] when decoding, dedupe, routing, or submission fails.
359    pub async fn submit_signed(
360        &mut self,
361        signed_tx: SignedTx,
362        mode: SubmitMode,
363    ) -> Result<SubmitResult, SubmitError> {
364        self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
365            .await
366    }
367
368    /// Submits externally signed transaction bytes with explicit toxic-flow context.
369    ///
370    /// # Errors
371    ///
372    /// Returns [`SubmitError`] when decoding, dedupe, toxic-flow guards, routing, or submission
373    /// fails.
374    pub async fn submit_signed_with_context(
375        &mut self,
376        signed_tx: SignedTx,
377        mode: SubmitMode,
378        context: TxSubmitContext,
379    ) -> Result<SubmitResult, SubmitError> {
380        let tx_bytes = match signed_tx {
381            SignedTx::VersionedTransactionBytes(bytes) => bytes,
382            SignedTx::WireTransactionBytes(bytes) => bytes,
383        };
384        let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
385            .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
386        let signature = tx.signatures.first().copied();
387        self.submit_bytes(tx_bytes, signature, mode, context).await
388    }
389
390    /// Submits raw tx bytes after dedupe check.
391    async fn submit_bytes(
392        &mut self,
393        tx_bytes: Vec<u8>,
394        signature: Option<Signature>,
395        mode: SubmitMode,
396        context: TxSubmitContext,
397    ) -> Result<SubmitResult, SubmitError> {
398        self.enforce_toxic_flow_guards(signature, mode, &context)?;
399        self.enforce_dedupe(signature)?;
400        match mode {
401            SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
402            SubmitMode::JitoOnly => self.submit_jito_only(tx_bytes, signature, mode).await,
403            SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
404            SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
405        }
406    }
407
408    /// Applies signature dedupe policy.
409    fn enforce_dedupe(&mut self, signature: Option<Signature>) -> Result<(), SubmitError> {
410        if let Some(signature) = signature {
411            let now = Instant::now();
412            if !self.deduper.check_and_insert(signature, now) {
413                return Err(SubmitError::DuplicateSignature);
414            }
415        }
416        Ok(())
417    }
418
419    /// Applies toxic-flow guard policy before transport.
420    fn enforce_toxic_flow_guards(
421        &mut self,
422        signature: Option<Signature>,
423        mode: SubmitMode,
424        context: &TxSubmitContext,
425    ) -> Result<(), SubmitError> {
426        let now = SystemTime::now();
427        let opportunity_age_ms = context
428            .opportunity_created_at
429            .and_then(|created_at| now.duration_since(created_at).ok())
430            .map(|duration| duration.as_millis().min(u128::from(u64::MAX)) as u64);
431        if let Some(age_ms) = opportunity_age_ms
432            && let Some(max_age) = self.guard_policy.max_opportunity_age
433        {
434            let max_allowed_ms = max_age.as_millis().min(u128::from(u64::MAX)) as u64;
435            if age_ms > max_allowed_ms {
436                return Err(self.reject_with_outcome(
437                    TxToxicFlowRejectionReason::OpportunityStale {
438                        age_ms,
439                        max_allowed_ms,
440                    },
441                    TxSubmitOutcomeKind::RejectedDueToStaleness,
442                    signature,
443                    mode,
444                    None,
445                    opportunity_age_ms,
446                ));
447            }
448        }
449
450        if self.suppression.is_suppressed(
451            &context.suppression_keys,
452            now,
453            self.guard_policy.suppression_ttl,
454        ) {
455            return Err(self.reject_with_outcome(
456                TxToxicFlowRejectionReason::Suppressed,
457                TxSubmitOutcomeKind::Suppressed,
458                signature,
459                mode,
460                None,
461                opportunity_age_ms,
462            ));
463        }
464
465        if let Some(source) = &self.flow_safety_source {
466            let snapshot = source.toxic_flow_snapshot();
467            if self.guard_policy.reject_on_replay_recovery_pending
468                && snapshot.replay_recovery_pending
469            {
470                return Err(self.reject_with_outcome(
471                    TxToxicFlowRejectionReason::ReplayRecoveryPending,
472                    TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
473                    signature,
474                    mode,
475                    snapshot.current_state_version,
476                    opportunity_age_ms,
477                ));
478            }
479            if self.guard_policy.require_stable_control_plane
480                && !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
481            {
482                let outcome_kind = match snapshot.quality {
483                    TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
484                        TxSubmitOutcomeKind::RejectedDueToReorgRisk
485                    }
486                    TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
487                    TxFlowSafetyQuality::Degraded
488                    | TxFlowSafetyQuality::IncompleteControlPlane
489                    | TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
490                };
491                return Err(self.reject_with_outcome(
492                    TxToxicFlowRejectionReason::UnsafeControlPlane {
493                        quality: snapshot.quality,
494                    },
495                    outcome_kind,
496                    signature,
497                    mode,
498                    snapshot.current_state_version,
499                    opportunity_age_ms,
500                ));
501            }
502            if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
503                context.decision_state_version,
504                snapshot.current_state_version,
505                self.guard_policy.max_state_version_drift,
506            ) {
507                let drift = current_version.saturating_sub(decision_version);
508                if drift > max_allowed {
509                    return Err(self.reject_with_outcome(
510                        TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
511                        TxSubmitOutcomeKind::RejectedDueToStateDrift,
512                        signature,
513                        mode,
514                        Some(current_version),
515                        opportunity_age_ms,
516                    ));
517                }
518            }
519        }
520
521        self.suppression.insert_all(&context.suppression_keys, now);
522        Ok(())
523    }
524
525    /// Builds one rejection error while recording telemetry and reporting.
526    fn reject_with_outcome(
527        &self,
528        reason: TxToxicFlowRejectionReason,
529        outcome_kind: TxSubmitOutcomeKind,
530        signature: Option<Signature>,
531        mode: SubmitMode,
532        state_version: Option<u64>,
533        opportunity_age_ms: Option<u64>,
534    ) -> SubmitError {
535        let outcome = TxSubmitOutcome {
536            kind: outcome_kind,
537            signature,
538            mode,
539            state_version,
540            opportunity_age_ms,
541        };
542        self.record_external_outcome(&outcome);
543        SubmitError::ToxicFlow { reason }
544    }
545
546    /// Submits through RPC path only.
547    async fn submit_rpc_only(
548        &self,
549        tx_bytes: Vec<u8>,
550        signature: Option<Signature>,
551        mode: SubmitMode,
552    ) -> Result<SubmitResult, SubmitError> {
553        let rpc = self
554            .rpc_transport
555            .as_ref()
556            .ok_or(SubmitError::MissingRpcTransport)?;
557        let rpc_signature = rpc
558            .submit_rpc(&tx_bytes, &self.rpc_config)
559            .await
560            .map_err(|source| SubmitError::Rpc { source })?;
561        self.record_external_outcome(&TxSubmitOutcome {
562            kind: TxSubmitOutcomeKind::RpcAccepted,
563            signature,
564            mode,
565            state_version: self
566                .flow_safety_source
567                .as_ref()
568                .and_then(|source| source.toxic_flow_snapshot().current_state_version),
569            opportunity_age_ms: None,
570        });
571        Ok(SubmitResult {
572            signature,
573            mode,
574            direct_target: None,
575            rpc_signature: Some(rpc_signature),
576            jito_signature: None,
577            jito_bundle_id: None,
578            used_rpc_fallback: false,
579            selected_target_count: 0,
580            selected_identity_count: 0,
581        })
582    }
583
584    /// Submits through Jito block-engine path only.
585    async fn submit_jito_only(
586        &self,
587        tx_bytes: Vec<u8>,
588        signature: Option<Signature>,
589        mode: SubmitMode,
590    ) -> Result<SubmitResult, SubmitError> {
591        let jito = self
592            .jito_transport
593            .as_ref()
594            .ok_or(SubmitError::MissingJitoTransport)?;
595        let jito_response = jito
596            .submit_jito(&tx_bytes, &self.jito_config)
597            .await
598            .map_err(|source| SubmitError::Jito { source })?;
599        self.record_external_outcome(&TxSubmitOutcome {
600            kind: TxSubmitOutcomeKind::JitoAccepted,
601            signature,
602            mode,
603            state_version: self
604                .flow_safety_source
605                .as_ref()
606                .and_then(|source| source.toxic_flow_snapshot().current_state_version),
607            opportunity_age_ms: None,
608        });
609        Ok(SubmitResult {
610            signature,
611            mode,
612            direct_target: None,
613            rpc_signature: None,
614            jito_signature: jito_response.transaction_signature,
615            jito_bundle_id: jito_response.bundle_id,
616            used_rpc_fallback: false,
617            selected_target_count: 0,
618            selected_identity_count: 0,
619        })
620    }
621
622    /// Submits through direct path only.
623    async fn submit_direct_only(
624        &self,
625        tx_bytes: Vec<u8>,
626        signature: Option<Signature>,
627        mode: SubmitMode,
628    ) -> Result<SubmitResult, SubmitError> {
629        let direct = self
630            .direct_transport
631            .as_ref()
632            .ok_or(SubmitError::MissingDirectTransport)?;
633        let direct_config = self.direct_config.clone().normalized();
634        let mut last_error = None;
635        let attempt_timeout = direct_attempt_timeout(&direct_config);
636
637        for attempt_idx in 0..direct_config.direct_submit_attempts {
638            let mut targets = self.select_direct_targets(&direct_config).await;
639            rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
640            let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
641            if targets.is_empty() {
642                return Err(SubmitError::NoDirectTargets);
643            }
644            match timeout(
645                attempt_timeout,
646                direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
647            )
648            .await
649            {
650                Ok(Ok(target)) => {
651                    self.record_external_outcome(&TxSubmitOutcome {
652                        kind: TxSubmitOutcomeKind::DirectAccepted,
653                        signature,
654                        mode,
655                        state_version: self
656                            .flow_safety_source
657                            .as_ref()
658                            .and_then(|source| source.toxic_flow_snapshot().current_state_version),
659                        opportunity_age_ms: None,
660                    });
661                    self.spawn_agave_rebroadcast(Arc::from(tx_bytes), &direct_config);
662                    return Ok(SubmitResult {
663                        signature,
664                        mode,
665                        direct_target: Some(target),
666                        rpc_signature: None,
667                        jito_signature: None,
668                        jito_bundle_id: None,
669                        used_rpc_fallback: false,
670                        selected_target_count,
671                        selected_identity_count,
672                    });
673                }
674                Ok(Err(source)) => last_error = Some(source),
675                Err(_elapsed) => {
676                    last_error = Some(super::SubmitTransportError::Failure {
677                        message: format!(
678                            "direct submit attempt timed out after {}ms",
679                            attempt_timeout.as_millis()
680                        ),
681                    });
682                }
683            }
684            if attempt_idx < direct_config.direct_submit_attempts.saturating_sub(1) {
685                sleep(direct_config.rebroadcast_interval).await;
686            }
687        }
688
689        Err(SubmitError::Direct {
690            source: last_error.unwrap_or_else(|| super::SubmitTransportError::Failure {
691                message: "direct submit attempts exhausted".to_owned(),
692            }),
693        })
694    }
695
696    /// Submits through hybrid mode (direct first, RPC fallback).
697    async fn submit_hybrid(
698        &self,
699        tx_bytes: Vec<u8>,
700        signature: Option<Signature>,
701        mode: SubmitMode,
702    ) -> Result<SubmitResult, SubmitError> {
703        let direct = self
704            .direct_transport
705            .as_ref()
706            .ok_or(SubmitError::MissingDirectTransport)?;
707        let rpc = self
708            .rpc_transport
709            .as_ref()
710            .ok_or(SubmitError::MissingRpcTransport)?;
711
712        let direct_config = self.direct_config.clone().normalized();
713        let attempt_timeout = direct_attempt_timeout(&direct_config);
714        for attempt_idx in 0..direct_config.hybrid_direct_attempts {
715            let mut targets = self.select_direct_targets(&direct_config).await;
716            rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
717            let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
718            if targets.is_empty() {
719                break;
720            }
721            if let Ok(Ok(target)) = timeout(
722                attempt_timeout,
723                direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
724            )
725            .await
726            {
727                let tx_bytes = Arc::<[u8]>::from(tx_bytes);
728                self.spawn_agave_rebroadcast(Arc::clone(&tx_bytes), &direct_config);
729                if direct_config.hybrid_rpc_broadcast
730                    && let Ok(rpc_signature) =
731                        rpc.submit_rpc(tx_bytes.as_ref(), &self.rpc_config).await
732                {
733                    self.record_external_outcome(&TxSubmitOutcome {
734                        kind: TxSubmitOutcomeKind::DirectAccepted,
735                        signature,
736                        mode,
737                        state_version: self
738                            .flow_safety_source
739                            .as_ref()
740                            .and_then(|source| source.toxic_flow_snapshot().current_state_version),
741                        opportunity_age_ms: None,
742                    });
743                    return Ok(SubmitResult {
744                        signature,
745                        mode,
746                        direct_target: Some(target),
747                        rpc_signature: Some(rpc_signature),
748                        jito_signature: None,
749                        jito_bundle_id: None,
750                        used_rpc_fallback: false,
751                        selected_target_count,
752                        selected_identity_count,
753                    });
754                }
755                self.record_external_outcome(&TxSubmitOutcome {
756                    kind: TxSubmitOutcomeKind::DirectAccepted,
757                    signature,
758                    mode,
759                    state_version: self
760                        .flow_safety_source
761                        .as_ref()
762                        .and_then(|source| source.toxic_flow_snapshot().current_state_version),
763                    opportunity_age_ms: None,
764                });
765                return Ok(SubmitResult {
766                    signature,
767                    mode,
768                    direct_target: Some(target),
769                    rpc_signature: None,
770                    jito_signature: None,
771                    jito_bundle_id: None,
772                    used_rpc_fallback: false,
773                    selected_target_count,
774                    selected_identity_count,
775                });
776            }
777            if attempt_idx < direct_config.hybrid_direct_attempts.saturating_sub(1) {
778                sleep(direct_config.rebroadcast_interval).await;
779            }
780        }
781
782        let rpc_signature = rpc
783            .submit_rpc(&tx_bytes, &self.rpc_config)
784            .await
785            .map_err(|source| SubmitError::Rpc { source })?;
786        self.record_external_outcome(&TxSubmitOutcome {
787            kind: TxSubmitOutcomeKind::RpcAccepted,
788            signature,
789            mode,
790            state_version: self
791                .flow_safety_source
792                .as_ref()
793                .and_then(|source| source.toxic_flow_snapshot().current_state_version),
794            opportunity_age_ms: None,
795        });
796        Ok(SubmitResult {
797            signature,
798            mode,
799            direct_target: None,
800            rpc_signature: Some(rpc_signature),
801            jito_signature: None,
802            jito_bundle_id: None,
803            used_rpc_fallback: true,
804            selected_target_count: 0,
805            selected_identity_count: 0,
806        })
807    }
808
809    /// Resolves and ranks the direct targets for the next submission attempt.
810    async fn select_direct_targets(&self, direct_config: &DirectSubmitConfig) -> Vec<LeaderTarget> {
811        select_and_rank_targets(
812            self.leader_provider.as_ref(),
813            &self.backups,
814            self.policy,
815            direct_config,
816        )
817        .await
818    }
819
820    /// Starts the post-ack rebroadcast worker when that reliability mode is enabled.
821    fn spawn_agave_rebroadcast(&self, tx_bytes: Arc<[u8]>, direct_config: &DirectSubmitConfig) {
822        if !direct_config.agave_rebroadcast_enabled
823            || direct_config.agave_rebroadcast_window.is_zero()
824        {
825            return;
826        }
827        let Some(direct_transport) = self.direct_transport.clone() else {
828            return;
829        };
830        spawn_agave_rebroadcast_task(
831            tx_bytes,
832            direct_transport,
833            self.leader_provider.clone(),
834            self.backups.clone(),
835            self.policy,
836            direct_config.clone(),
837        );
838    }
839}
840
841#[cfg(not(test))]
842/// Replays successful direct submissions for a bounded Agave-like persistence window.
843fn spawn_agave_rebroadcast_task(
844    tx_bytes: Arc<[u8]>,
845    direct_transport: Arc<dyn DirectSubmitTransport>,
846    leader_provider: Arc<dyn LeaderProvider>,
847    backups: Vec<LeaderTarget>,
848    policy: RoutingPolicy,
849    direct_config: DirectSubmitConfig,
850) {
851    tokio::spawn(async move {
852        let deadline = Instant::now()
853            .checked_add(direct_config.agave_rebroadcast_window)
854            .unwrap_or_else(Instant::now);
855        loop {
856            let now = Instant::now();
857            if now >= deadline {
858                break;
859            }
860
861            let sleep_for = deadline
862                .saturating_duration_since(now)
863                .min(direct_config.agave_rebroadcast_interval);
864            if !sleep_for.is_zero() {
865                sleep(sleep_for).await;
866            }
867
868            if Instant::now() >= deadline {
869                break;
870            }
871
872            let targets = select_and_rank_targets(
873                leader_provider.as_ref(),
874                backups.as_slice(),
875                policy,
876                &direct_config,
877            )
878            .await;
879            if targets.is_empty() {
880                continue;
881            }
882
883            drop(
884                timeout(
885                    direct_attempt_timeout(&direct_config),
886                    direct_transport.submit_direct(
887                        tx_bytes.as_ref(),
888                        &targets,
889                        policy,
890                        &direct_config,
891                    ),
892                )
893                .await,
894            );
895        }
896    });
897}
898
/// No-op stand-in compiled only under `cfg(test)`.
///
/// Background rebroadcasting is suppressed so that tests asserting on transport calls stay
/// deterministic. Every argument is accepted and dropped unused.
#[cfg(test)]
fn spawn_agave_rebroadcast_task(
    _tx_bytes: Arc<[u8]>,
    _direct_transport: Arc<dyn DirectSubmitTransport>,
    _leader_provider: Arc<dyn LeaderProvider>,
    _backups: Vec<LeaderTarget>,
    _policy: RoutingPolicy,
    _direct_config: DirectSubmitConfig,
) {
    // Intentionally empty: no task is spawned in test builds.
}
910
911/// Selects routing targets and applies optional latency-aware ranking.
912async fn select_and_rank_targets(
913    leader_provider: &(impl LeaderProvider + ?Sized),
914    backups: &[LeaderTarget],
915    policy: RoutingPolicy,
916    direct_config: &DirectSubmitConfig,
917) -> Vec<LeaderTarget> {
918    let targets = select_targets(leader_provider, backups, policy);
919    rank_targets_by_latency(targets, direct_config).await
920}
921
922/// Reorders the probe set by observed TCP connect latency while preserving the tail order.
923async fn rank_targets_by_latency(
924    targets: Vec<LeaderTarget>,
925    direct_config: &DirectSubmitConfig,
926) -> Vec<LeaderTarget> {
927    if targets.len() <= 1 || !direct_config.latency_aware_targeting {
928        return targets;
929    }
930
931    let probe_timeout = direct_config.latency_probe_timeout;
932    let probe_port = direct_config.latency_probe_port;
933    let probe_count = targets
934        .len()
935        .min(direct_config.latency_probe_max_targets.max(1));
936    let mut latencies = vec![None; probe_count];
937    let mut probes = JoinSet::new();
938    for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
939        probes.spawn(async move {
940            (
941                idx,
942                probe_target_latency(&target, probe_port, probe_timeout).await,
943            )
944        });
945    }
946    while let Some(result) = probes.join_next().await {
947        if let Ok((idx, latency)) = result
948            && idx < latencies.len()
949            && let Some(slot) = latencies.get_mut(idx)
950        {
951            *slot = latency;
952        }
953    }
954
955    let mut ranked = targets
956        .iter()
957        .take(probe_count)
958        .cloned()
959        .enumerate()
960        .collect::<Vec<_>>();
961    ranked.sort_by_key(|(idx, _target)| {
962        (
963            latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
964            *idx,
965        )
966    });
967
968    let mut output = ranked
969        .into_iter()
970        .map(|(_idx, target)| target)
971        .collect::<Vec<_>>();
972    output.extend(targets.iter().skip(probe_count).cloned());
973    output
974}
975
976/// Probes a target's candidate ports and keeps the best observed connect latency.
977async fn probe_target_latency(
978    target: &LeaderTarget,
979    probe_port: Option<u16>,
980    probe_timeout: Duration,
981) -> Option<u128> {
982    let mut ports = vec![target.tpu_addr.port()];
983    if let Some(port) = probe_port
984        && port != target.tpu_addr.port()
985    {
986        ports.push(port);
987    }
988
989    let ip = target.tpu_addr.ip();
990    let mut best = None::<u128>;
991    for port in ports {
992        if let Some(latency) = probe_tcp_latency(ip, port, probe_timeout).await {
993            best = Some(best.map_or(latency, |current| current.min(latency)));
994        }
995    }
996    best
997}
998
999/// Measures one TCP connect attempt and returns elapsed milliseconds on success.
1000async fn probe_tcp_latency(
1001    ip: std::net::IpAddr,
1002    port: u16,
1003    timeout_duration: Duration,
1004) -> Option<u128> {
1005    let start = Instant::now();
1006    let addr = SocketAddr::new(ip, port);
1007    let stream = timeout(timeout_duration, TcpStream::connect(addr))
1008        .await
1009        .ok()?
1010        .ok()?;
1011    drop(stream);
1012    Some(start.elapsed().as_millis())
1013}
1014
1015/// Summarizes the selected target list for observability.
1016fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
1017    let selected_target_count = targets.len();
1018    let selected_identity_count = targets
1019        .iter()
1020        .filter_map(|target| target.identity)
1021        .collect::<HashSet<_>>()
1022        .len();
1023    (selected_target_count, selected_identity_count)
1024}
1025
1026/// Rotates the target ordering between attempts to spread retries across candidates.
1027fn rotate_targets_for_attempt(
1028    targets: &mut [LeaderTarget],
1029    attempt_idx: usize,
1030    policy: RoutingPolicy,
1031) {
1032    if attempt_idx == 0 || targets.len() <= 1 {
1033        return;
1034    }
1035
1036    let normalized = policy.normalized();
1037    let stride = normalized.max_parallel_sends.max(1);
1038    let rotation = attempt_idx
1039        .saturating_mul(stride)
1040        .checked_rem(targets.len())
1041        .unwrap_or(0);
1042    if rotation > 0 {
1043        targets.rotate_left(rotation);
1044    }
1045}
1046
1047/// Bounds one submit attempt so retry loops cannot hang indefinitely.
1048fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
1049    direct_config
1050        .global_timeout
1051        .saturating_add(direct_config.per_target_timeout)
1052        .saturating_add(direct_config.rebroadcast_interval)
1053        .max(Duration::from_secs(8))
1054}