Skip to main content

sof_tx/submit/
client.rs

1//! Submission client implementation and mode orchestration.
2
3use std::{
4    collections::HashSet,
5    net::SocketAddr,
6    sync::Arc,
7    time::{Duration, Instant, SystemTime},
8};
9
10use solana_signature::Signature;
11use solana_signer::signers::Signers;
12use solana_transaction::versioned::VersionedTransaction;
13use tokio::{
14    net::TcpStream,
15    task::JoinSet,
16    time::{sleep, timeout},
17};
18
19use super::{
20    DirectSubmitConfig, DirectSubmitTransport, JitoSubmitConfig, JitoSubmitTransport,
21    RpcSubmitConfig, RpcSubmitTransport, SignedTx, SubmitError, SubmitMode, SubmitReliability,
22    SubmitResult, TxFlowSafetyQuality, TxFlowSafetySource, TxSubmitContext, TxSubmitGuardPolicy,
23    TxSubmitOutcome, TxSubmitOutcomeKind, TxSubmitOutcomeReporter, TxToxicFlowRejectionReason,
24    TxToxicFlowTelemetry, TxToxicFlowTelemetrySnapshot,
25};
26use crate::{
27    builder::TxBuilder,
28    providers::{LeaderProvider, LeaderTarget, RecentBlockhashProvider},
29    routing::{RoutingPolicy, SignatureDeduper, select_targets},
30    submit::types::TxSuppressionCache,
31};
32
33/// Transaction submission client that orchestrates RPC and direct submit modes.
34pub struct TxSubmitClient {
35    /// Blockhash source used by builder submit path.
36    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
37    /// Leader source used by direct/hybrid paths.
38    leader_provider: Arc<dyn LeaderProvider>,
39    /// Optional backup validator targets.
40    backups: Vec<LeaderTarget>,
41    /// Direct routing policy.
42    policy: RoutingPolicy,
43    /// Signature dedupe window.
44    deduper: SignatureDeduper,
45    /// Optional RPC transport.
46    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
47    /// Optional direct transport.
48    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
49    /// Optional Jito transport.
50    jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
51    /// RPC tuning.
52    rpc_config: RpcSubmitConfig,
53    /// Jito tuning.
54    jito_config: JitoSubmitConfig,
55    /// Direct tuning.
56    direct_config: DirectSubmitConfig,
57    /// Optional toxic-flow guard source.
58    flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
59    /// Guard policy applied before submit.
60    guard_policy: TxSubmitGuardPolicy,
61    /// Built-in suppression keys.
62    suppression: TxSuppressionCache,
63    /// Built-in toxic-flow telemetry sink.
64    telemetry: Arc<TxToxicFlowTelemetry>,
65    /// Optional external outcome reporter.
66    outcome_reporter: Option<Arc<dyn TxSubmitOutcomeReporter>>,
67}
68
69impl TxSubmitClient {
70    /// Creates a submission client with no transports preconfigured.
71    #[must_use]
72    pub fn new(
73        blockhash_provider: Arc<dyn RecentBlockhashProvider>,
74        leader_provider: Arc<dyn LeaderProvider>,
75    ) -> Self {
76        Self {
77            blockhash_provider,
78            leader_provider,
79            backups: Vec::new(),
80            policy: RoutingPolicy::default(),
81            deduper: SignatureDeduper::new(Duration::from_secs(10)),
82            rpc_transport: None,
83            direct_transport: None,
84            jito_transport: None,
85            rpc_config: RpcSubmitConfig::default(),
86            jito_config: JitoSubmitConfig::default(),
87            direct_config: DirectSubmitConfig::default(),
88            flow_safety_source: None,
89            guard_policy: TxSubmitGuardPolicy::default(),
90            suppression: TxSuppressionCache::default(),
91            telemetry: TxToxicFlowTelemetry::shared(),
92            outcome_reporter: None,
93        }
94    }
95
96    /// Sets optional backup validators.
97    #[must_use]
98    pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
99        self.backups = backups;
100        self
101    }
102
103    /// Sets routing policy.
104    #[must_use]
105    pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
106        self.policy = policy.normalized();
107        self
108    }
109
110    /// Sets dedupe TTL.
111    #[must_use]
112    pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
113        self.deduper = SignatureDeduper::new(ttl);
114        self
115    }
116
117    /// Sets RPC transport.
118    #[must_use]
119    pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
120        self.rpc_transport = Some(transport);
121        self
122    }
123
124    /// Sets direct transport.
125    #[must_use]
126    pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
127        self.direct_transport = Some(transport);
128        self
129    }
130
131    /// Sets Jito transport.
132    #[must_use]
133    pub fn with_jito_transport(mut self, transport: Arc<dyn JitoSubmitTransport>) -> Self {
134        self.jito_transport = Some(transport);
135        self
136    }
137
138    /// Sets RPC submit tuning.
139    #[must_use]
140    pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
141        self.rpc_config = config;
142        self
143    }
144
145    /// Sets Jito submit tuning.
146    #[must_use]
147    pub const fn with_jito_config(mut self, config: JitoSubmitConfig) -> Self {
148        self.jito_config = config;
149        self
150    }
151
152    /// Sets direct submit tuning.
153    #[must_use]
154    pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
155        self.direct_config = config.normalized();
156        self
157    }
158
159    /// Sets direct/hybrid reliability profile.
160    #[must_use]
161    pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
162        self.direct_config = DirectSubmitConfig::from_reliability(reliability);
163        self
164    }
165
166    /// Sets the toxic-flow guard source used before submission.
167    #[must_use]
168    pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
169        self.flow_safety_source = Some(source);
170        self
171    }
172
173    /// Sets the toxic-flow guard policy.
174    #[must_use]
175    pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
176        self.guard_policy = policy;
177        self
178    }
179
180    /// Sets an optional external outcome reporter.
181    #[must_use]
182    pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
183        self.outcome_reporter = Some(reporter);
184        self
185    }
186
187    /// Returns the current built-in toxic-flow telemetry snapshot.
188    #[must_use]
189    pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
190        self.telemetry.snapshot()
191    }
192
193    /// Records one external terminal outcome against the built-in telemetry and optional reporter.
194    pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
195        self.telemetry.record(outcome);
196        if let Some(reporter) = &self.outcome_reporter {
197            reporter.record_outcome(outcome);
198        }
199    }
200
201    /// Builds, signs, and submits a transaction in one API call.
202    ///
203    /// # Errors
204    ///
205    /// Returns [`SubmitError`] when blockhash lookup, signing, dedupe, routing, or submission
206    /// fails.
207    pub async fn submit_builder<T>(
208        &mut self,
209        builder: TxBuilder,
210        signers: &T,
211        mode: SubmitMode,
212    ) -> Result<SubmitResult, SubmitError>
213    where
214        T: Signers + ?Sized,
215    {
216        let blockhash = self
217            .blockhash_provider
218            .latest_blockhash()
219            .ok_or(SubmitError::MissingRecentBlockhash)?;
220        let tx = builder
221            .build_and_sign(blockhash, signers)
222            .map_err(|source| SubmitError::Build { source })?;
223        self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
224            .await
225    }
226
227    /// Builds, signs, and submits a transaction with explicit toxic-flow context.
228    ///
229    /// # Errors
230    ///
231    /// Returns [`SubmitError`] when blockhash lookup, signing, dedupe, toxic-flow guards,
232    /// routing, or submission fails.
233    pub async fn submit_builder_with_context<T>(
234        &mut self,
235        builder: TxBuilder,
236        signers: &T,
237        mode: SubmitMode,
238        context: TxSubmitContext,
239    ) -> Result<SubmitResult, SubmitError>
240    where
241        T: Signers + ?Sized,
242    {
243        let blockhash = self
244            .blockhash_provider
245            .latest_blockhash()
246            .ok_or(SubmitError::MissingRecentBlockhash)?;
247        let tx = builder
248            .build_and_sign(blockhash, signers)
249            .map_err(|source| SubmitError::Build { source })?;
250        self.submit_transaction_with_context(tx, mode, context)
251            .await
252    }
253
254    /// Submits one signed `VersionedTransaction`.
255    ///
256    /// # Errors
257    ///
258    /// Returns [`SubmitError`] when encoding, dedupe, routing, or submission fails.
259    pub async fn submit_transaction(
260        &mut self,
261        tx: VersionedTransaction,
262        mode: SubmitMode,
263    ) -> Result<SubmitResult, SubmitError> {
264        self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
265            .await
266    }
267
268    /// Submits one signed `VersionedTransaction` with explicit toxic-flow context.
269    ///
270    /// # Errors
271    ///
272    /// Returns [`SubmitError`] when encoding, dedupe, toxic-flow guards, routing, or submission
273    /// fails.
274    pub async fn submit_transaction_with_context(
275        &mut self,
276        tx: VersionedTransaction,
277        mode: SubmitMode,
278        context: TxSubmitContext,
279    ) -> Result<SubmitResult, SubmitError> {
280        let signature = tx.signatures.first().copied();
281        let tx_bytes =
282            bincode::serialize(&tx).map_err(|source| SubmitError::DecodeSignedBytes { source })?;
283        self.submit_bytes(tx_bytes, signature, mode, context).await
284    }
285
286    /// Submits externally signed transaction bytes.
287    ///
288    /// # Errors
289    ///
290    /// Returns [`SubmitError`] when decoding, dedupe, routing, or submission fails.
291    pub async fn submit_signed(
292        &mut self,
293        signed_tx: SignedTx,
294        mode: SubmitMode,
295    ) -> Result<SubmitResult, SubmitError> {
296        self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
297            .await
298    }
299
300    /// Submits externally signed transaction bytes with explicit toxic-flow context.
301    ///
302    /// # Errors
303    ///
304    /// Returns [`SubmitError`] when decoding, dedupe, toxic-flow guards, routing, or submission
305    /// fails.
306    pub async fn submit_signed_with_context(
307        &mut self,
308        signed_tx: SignedTx,
309        mode: SubmitMode,
310        context: TxSubmitContext,
311    ) -> Result<SubmitResult, SubmitError> {
312        let tx_bytes = match signed_tx {
313            SignedTx::VersionedTransactionBytes(bytes) => bytes,
314            SignedTx::WireTransactionBytes(bytes) => bytes,
315        };
316        let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
317            .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
318        let signature = tx.signatures.first().copied();
319        self.submit_bytes(tx_bytes, signature, mode, context).await
320    }
321
322    /// Submits raw tx bytes after dedupe check.
323    async fn submit_bytes(
324        &mut self,
325        tx_bytes: Vec<u8>,
326        signature: Option<Signature>,
327        mode: SubmitMode,
328        context: TxSubmitContext,
329    ) -> Result<SubmitResult, SubmitError> {
330        self.enforce_toxic_flow_guards(signature, mode, &context)?;
331        self.enforce_dedupe(signature)?;
332        match mode {
333            SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
334            SubmitMode::JitoOnly => self.submit_jito_only(tx_bytes, signature, mode).await,
335            SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
336            SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
337        }
338    }
339
340    /// Applies signature dedupe policy.
341    fn enforce_dedupe(&mut self, signature: Option<Signature>) -> Result<(), SubmitError> {
342        if let Some(signature) = signature {
343            let now = Instant::now();
344            if !self.deduper.check_and_insert(signature, now) {
345                return Err(SubmitError::DuplicateSignature);
346            }
347        }
348        Ok(())
349    }
350
351    /// Applies toxic-flow guard policy before transport.
352    fn enforce_toxic_flow_guards(
353        &mut self,
354        signature: Option<Signature>,
355        mode: SubmitMode,
356        context: &TxSubmitContext,
357    ) -> Result<(), SubmitError> {
358        let now = SystemTime::now();
359        let opportunity_age_ms = context
360            .opportunity_created_at
361            .and_then(|created_at| now.duration_since(created_at).ok())
362            .map(|duration| duration.as_millis().min(u128::from(u64::MAX)) as u64);
363        if let Some(age_ms) = opportunity_age_ms
364            && let Some(max_age) = self.guard_policy.max_opportunity_age
365        {
366            let max_allowed_ms = max_age.as_millis().min(u128::from(u64::MAX)) as u64;
367            if age_ms > max_allowed_ms {
368                return Err(self.reject_with_outcome(
369                    TxToxicFlowRejectionReason::OpportunityStale {
370                        age_ms,
371                        max_allowed_ms,
372                    },
373                    TxSubmitOutcomeKind::RejectedDueToStaleness,
374                    signature,
375                    mode,
376                    None,
377                    opportunity_age_ms,
378                ));
379            }
380        }
381
382        if self.suppression.is_suppressed(
383            &context.suppression_keys,
384            now,
385            self.guard_policy.suppression_ttl,
386        ) {
387            return Err(self.reject_with_outcome(
388                TxToxicFlowRejectionReason::Suppressed,
389                TxSubmitOutcomeKind::Suppressed,
390                signature,
391                mode,
392                None,
393                opportunity_age_ms,
394            ));
395        }
396
397        if let Some(source) = &self.flow_safety_source {
398            let snapshot = source.toxic_flow_snapshot();
399            if self.guard_policy.reject_on_replay_recovery_pending
400                && snapshot.replay_recovery_pending
401            {
402                return Err(self.reject_with_outcome(
403                    TxToxicFlowRejectionReason::ReplayRecoveryPending,
404                    TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
405                    signature,
406                    mode,
407                    snapshot.current_state_version,
408                    opportunity_age_ms,
409                ));
410            }
411            if self.guard_policy.require_stable_control_plane
412                && !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
413            {
414                let outcome_kind = match snapshot.quality {
415                    TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
416                        TxSubmitOutcomeKind::RejectedDueToReorgRisk
417                    }
418                    TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
419                    TxFlowSafetyQuality::Degraded
420                    | TxFlowSafetyQuality::IncompleteControlPlane
421                    | TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
422                };
423                return Err(self.reject_with_outcome(
424                    TxToxicFlowRejectionReason::UnsafeControlPlane {
425                        quality: snapshot.quality,
426                    },
427                    outcome_kind,
428                    signature,
429                    mode,
430                    snapshot.current_state_version,
431                    opportunity_age_ms,
432                ));
433            }
434            if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
435                context.decision_state_version,
436                snapshot.current_state_version,
437                self.guard_policy.max_state_version_drift,
438            ) {
439                let drift = current_version.saturating_sub(decision_version);
440                if drift > max_allowed {
441                    return Err(self.reject_with_outcome(
442                        TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
443                        TxSubmitOutcomeKind::RejectedDueToStateDrift,
444                        signature,
445                        mode,
446                        Some(current_version),
447                        opportunity_age_ms,
448                    ));
449                }
450            }
451        }
452
453        self.suppression.insert_all(&context.suppression_keys, now);
454        Ok(())
455    }
456
457    /// Builds one rejection error while recording telemetry and reporting.
458    fn reject_with_outcome(
459        &self,
460        reason: TxToxicFlowRejectionReason,
461        outcome_kind: TxSubmitOutcomeKind,
462        signature: Option<Signature>,
463        mode: SubmitMode,
464        state_version: Option<u64>,
465        opportunity_age_ms: Option<u64>,
466    ) -> SubmitError {
467        let outcome = TxSubmitOutcome {
468            kind: outcome_kind,
469            signature,
470            mode,
471            state_version,
472            opportunity_age_ms,
473        };
474        self.record_external_outcome(&outcome);
475        SubmitError::ToxicFlow { reason }
476    }
477
478    /// Submits through RPC path only.
479    async fn submit_rpc_only(
480        &self,
481        tx_bytes: Vec<u8>,
482        signature: Option<Signature>,
483        mode: SubmitMode,
484    ) -> Result<SubmitResult, SubmitError> {
485        let rpc = self
486            .rpc_transport
487            .as_ref()
488            .ok_or(SubmitError::MissingRpcTransport)?;
489        let rpc_signature = rpc
490            .submit_rpc(&tx_bytes, &self.rpc_config)
491            .await
492            .map_err(|source| SubmitError::Rpc { source })?;
493        self.record_external_outcome(&TxSubmitOutcome {
494            kind: TxSubmitOutcomeKind::RpcAccepted,
495            signature,
496            mode,
497            state_version: self
498                .flow_safety_source
499                .as_ref()
500                .and_then(|source| source.toxic_flow_snapshot().current_state_version),
501            opportunity_age_ms: None,
502        });
503        Ok(SubmitResult {
504            signature,
505            mode,
506            direct_target: None,
507            rpc_signature: Some(rpc_signature),
508            jito_signature: None,
509            used_rpc_fallback: false,
510            selected_target_count: 0,
511            selected_identity_count: 0,
512        })
513    }
514
515    /// Submits through Jito block-engine path only.
516    async fn submit_jito_only(
517        &self,
518        tx_bytes: Vec<u8>,
519        signature: Option<Signature>,
520        mode: SubmitMode,
521    ) -> Result<SubmitResult, SubmitError> {
522        let jito = self
523            .jito_transport
524            .as_ref()
525            .ok_or(SubmitError::MissingJitoTransport)?;
526        let jito_signature = jito
527            .submit_jito(&tx_bytes, &self.jito_config)
528            .await
529            .map_err(|source| SubmitError::Jito { source })?;
530        self.record_external_outcome(&TxSubmitOutcome {
531            kind: TxSubmitOutcomeKind::JitoAccepted,
532            signature,
533            mode,
534            state_version: self
535                .flow_safety_source
536                .as_ref()
537                .and_then(|source| source.toxic_flow_snapshot().current_state_version),
538            opportunity_age_ms: None,
539        });
540        Ok(SubmitResult {
541            signature,
542            mode,
543            direct_target: None,
544            rpc_signature: None,
545            jito_signature: Some(jito_signature),
546            used_rpc_fallback: false,
547            selected_target_count: 0,
548            selected_identity_count: 0,
549        })
550    }
551
552    /// Submits through direct path only.
553    async fn submit_direct_only(
554        &self,
555        tx_bytes: Vec<u8>,
556        signature: Option<Signature>,
557        mode: SubmitMode,
558    ) -> Result<SubmitResult, SubmitError> {
559        let direct = self
560            .direct_transport
561            .as_ref()
562            .ok_or(SubmitError::MissingDirectTransport)?;
563        let direct_config = self.direct_config.clone().normalized();
564        let mut last_error = None;
565        let attempt_timeout = direct_attempt_timeout(&direct_config);
566
567        for attempt_idx in 0..direct_config.direct_submit_attempts {
568            let mut targets = self.select_direct_targets(&direct_config).await;
569            rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
570            let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
571            if targets.is_empty() {
572                return Err(SubmitError::NoDirectTargets);
573            }
574            match timeout(
575                attempt_timeout,
576                direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
577            )
578            .await
579            {
580                Ok(Ok(target)) => {
581                    self.record_external_outcome(&TxSubmitOutcome {
582                        kind: TxSubmitOutcomeKind::DirectAccepted,
583                        signature,
584                        mode,
585                        state_version: self
586                            .flow_safety_source
587                            .as_ref()
588                            .and_then(|source| source.toxic_flow_snapshot().current_state_version),
589                        opportunity_age_ms: None,
590                    });
591                    self.spawn_agave_rebroadcast(Arc::from(tx_bytes), &direct_config);
592                    return Ok(SubmitResult {
593                        signature,
594                        mode,
595                        direct_target: Some(target),
596                        rpc_signature: None,
597                        jito_signature: None,
598                        used_rpc_fallback: false,
599                        selected_target_count,
600                        selected_identity_count,
601                    });
602                }
603                Ok(Err(source)) => last_error = Some(source),
604                Err(_elapsed) => {
605                    last_error = Some(super::SubmitTransportError::Failure {
606                        message: format!(
607                            "direct submit attempt timed out after {}ms",
608                            attempt_timeout.as_millis()
609                        ),
610                    });
611                }
612            }
613            if attempt_idx < direct_config.direct_submit_attempts.saturating_sub(1) {
614                sleep(direct_config.rebroadcast_interval).await;
615            }
616        }
617
618        Err(SubmitError::Direct {
619            source: last_error.unwrap_or_else(|| super::SubmitTransportError::Failure {
620                message: "direct submit attempts exhausted".to_owned(),
621            }),
622        })
623    }
624
625    /// Submits through hybrid mode (direct first, RPC fallback).
626    async fn submit_hybrid(
627        &self,
628        tx_bytes: Vec<u8>,
629        signature: Option<Signature>,
630        mode: SubmitMode,
631    ) -> Result<SubmitResult, SubmitError> {
632        let direct = self
633            .direct_transport
634            .as_ref()
635            .ok_or(SubmitError::MissingDirectTransport)?;
636        let rpc = self
637            .rpc_transport
638            .as_ref()
639            .ok_or(SubmitError::MissingRpcTransport)?;
640
641        let direct_config = self.direct_config.clone().normalized();
642        let attempt_timeout = direct_attempt_timeout(&direct_config);
643        for attempt_idx in 0..direct_config.hybrid_direct_attempts {
644            let mut targets = self.select_direct_targets(&direct_config).await;
645            rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
646            let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
647            if targets.is_empty() {
648                break;
649            }
650            if let Ok(Ok(target)) = timeout(
651                attempt_timeout,
652                direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
653            )
654            .await
655            {
656                let tx_bytes = Arc::<[u8]>::from(tx_bytes);
657                self.spawn_agave_rebroadcast(Arc::clone(&tx_bytes), &direct_config);
658                if direct_config.hybrid_rpc_broadcast
659                    && let Ok(rpc_signature) =
660                        rpc.submit_rpc(tx_bytes.as_ref(), &self.rpc_config).await
661                {
662                    self.record_external_outcome(&TxSubmitOutcome {
663                        kind: TxSubmitOutcomeKind::DirectAccepted,
664                        signature,
665                        mode,
666                        state_version: self
667                            .flow_safety_source
668                            .as_ref()
669                            .and_then(|source| source.toxic_flow_snapshot().current_state_version),
670                        opportunity_age_ms: None,
671                    });
672                    return Ok(SubmitResult {
673                        signature,
674                        mode,
675                        direct_target: Some(target),
676                        rpc_signature: Some(rpc_signature),
677                        jito_signature: None,
678                        used_rpc_fallback: false,
679                        selected_target_count,
680                        selected_identity_count,
681                    });
682                }
683                self.record_external_outcome(&TxSubmitOutcome {
684                    kind: TxSubmitOutcomeKind::DirectAccepted,
685                    signature,
686                    mode,
687                    state_version: self
688                        .flow_safety_source
689                        .as_ref()
690                        .and_then(|source| source.toxic_flow_snapshot().current_state_version),
691                    opportunity_age_ms: None,
692                });
693                return Ok(SubmitResult {
694                    signature,
695                    mode,
696                    direct_target: Some(target),
697                    rpc_signature: None,
698                    jito_signature: None,
699                    used_rpc_fallback: false,
700                    selected_target_count,
701                    selected_identity_count,
702                });
703            }
704            if attempt_idx < direct_config.hybrid_direct_attempts.saturating_sub(1) {
705                sleep(direct_config.rebroadcast_interval).await;
706            }
707        }
708
709        let rpc_signature = rpc
710            .submit_rpc(&tx_bytes, &self.rpc_config)
711            .await
712            .map_err(|source| SubmitError::Rpc { source })?;
713        self.record_external_outcome(&TxSubmitOutcome {
714            kind: TxSubmitOutcomeKind::RpcAccepted,
715            signature,
716            mode,
717            state_version: self
718                .flow_safety_source
719                .as_ref()
720                .and_then(|source| source.toxic_flow_snapshot().current_state_version),
721            opportunity_age_ms: None,
722        });
723        Ok(SubmitResult {
724            signature,
725            mode,
726            direct_target: None,
727            rpc_signature: Some(rpc_signature),
728            jito_signature: None,
729            used_rpc_fallback: true,
730            selected_target_count: 0,
731            selected_identity_count: 0,
732        })
733    }
734
735    /// Resolves and ranks the direct targets for the next submission attempt.
736    async fn select_direct_targets(&self, direct_config: &DirectSubmitConfig) -> Vec<LeaderTarget> {
737        select_and_rank_targets(
738            self.leader_provider.as_ref(),
739            &self.backups,
740            self.policy,
741            direct_config,
742        )
743        .await
744    }
745
746    /// Starts the post-ack rebroadcast worker when that reliability mode is enabled.
747    fn spawn_agave_rebroadcast(&self, tx_bytes: Arc<[u8]>, direct_config: &DirectSubmitConfig) {
748        if !direct_config.agave_rebroadcast_enabled
749            || direct_config.agave_rebroadcast_window.is_zero()
750        {
751            return;
752        }
753        let Some(direct_transport) = self.direct_transport.clone() else {
754            return;
755        };
756        spawn_agave_rebroadcast_task(
757            tx_bytes,
758            direct_transport,
759            self.leader_provider.clone(),
760            self.backups.clone(),
761            self.policy,
762            direct_config.clone(),
763        );
764    }
765}
766
767#[cfg(not(test))]
768/// Replays successful direct submissions for a bounded Agave-like persistence window.
769fn spawn_agave_rebroadcast_task(
770    tx_bytes: Arc<[u8]>,
771    direct_transport: Arc<dyn DirectSubmitTransport>,
772    leader_provider: Arc<dyn LeaderProvider>,
773    backups: Vec<LeaderTarget>,
774    policy: RoutingPolicy,
775    direct_config: DirectSubmitConfig,
776) {
777    tokio::spawn(async move {
778        let deadline = Instant::now()
779            .checked_add(direct_config.agave_rebroadcast_window)
780            .unwrap_or_else(Instant::now);
781        loop {
782            let now = Instant::now();
783            if now >= deadline {
784                break;
785            }
786
787            let sleep_for = deadline
788                .saturating_duration_since(now)
789                .min(direct_config.agave_rebroadcast_interval);
790            if !sleep_for.is_zero() {
791                sleep(sleep_for).await;
792            }
793
794            if Instant::now() >= deadline {
795                break;
796            }
797
798            let targets = select_and_rank_targets(
799                leader_provider.as_ref(),
800                backups.as_slice(),
801                policy,
802                &direct_config,
803            )
804            .await;
805            if targets.is_empty() {
806                continue;
807            }
808
809            drop(
810                timeout(
811                    direct_attempt_timeout(&direct_config),
812                    direct_transport.submit_direct(
813                        tx_bytes.as_ref(),
814                        &targets,
815                        policy,
816                        &direct_config,
817                    ),
818                )
819                .await,
820            );
821        }
822    });
823}
824
#[cfg(test)]
/// No-op replacement compiled into test builds so unit tests never start background resends.
///
/// Its signature matches the production variant. Every argument is accepted and simply
/// dropped when the function returns.
fn spawn_agave_rebroadcast_task(
    _bytes: Arc<[u8]>,
    _transport: Arc<dyn DirectSubmitTransport>,
    _leaders: Arc<dyn LeaderProvider>,
    _backup_targets: Vec<LeaderTarget>,
    _routing: RoutingPolicy,
    _config: DirectSubmitConfig,
) {
    // Intentionally empty: rebroadcasting is disabled under `cfg(test)` for determinism.
}
836
837/// Selects routing targets and applies optional latency-aware ranking.
838async fn select_and_rank_targets(
839    leader_provider: &(impl LeaderProvider + ?Sized),
840    backups: &[LeaderTarget],
841    policy: RoutingPolicy,
842    direct_config: &DirectSubmitConfig,
843) -> Vec<LeaderTarget> {
844    let targets = select_targets(leader_provider, backups, policy);
845    rank_targets_by_latency(targets, direct_config).await
846}
847
848/// Reorders the probe set by observed TCP connect latency while preserving the tail order.
849async fn rank_targets_by_latency(
850    targets: Vec<LeaderTarget>,
851    direct_config: &DirectSubmitConfig,
852) -> Vec<LeaderTarget> {
853    if targets.len() <= 1 || !direct_config.latency_aware_targeting {
854        return targets;
855    }
856
857    let probe_timeout = direct_config.latency_probe_timeout;
858    let probe_port = direct_config.latency_probe_port;
859    let probe_count = targets
860        .len()
861        .min(direct_config.latency_probe_max_targets.max(1));
862    let mut latencies = vec![None; probe_count];
863    let mut probes = JoinSet::new();
864    for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
865        probes.spawn(async move {
866            (
867                idx,
868                probe_target_latency(&target, probe_port, probe_timeout).await,
869            )
870        });
871    }
872    while let Some(result) = probes.join_next().await {
873        if let Ok((idx, latency)) = result
874            && idx < latencies.len()
875            && let Some(slot) = latencies.get_mut(idx)
876        {
877            *slot = latency;
878        }
879    }
880
881    let mut ranked = targets
882        .iter()
883        .take(probe_count)
884        .cloned()
885        .enumerate()
886        .collect::<Vec<_>>();
887    ranked.sort_by_key(|(idx, _target)| {
888        (
889            latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
890            *idx,
891        )
892    });
893
894    let mut output = ranked
895        .into_iter()
896        .map(|(_idx, target)| target)
897        .collect::<Vec<_>>();
898    output.extend(targets.iter().skip(probe_count).cloned());
899    output
900}
901
902/// Probes a target's candidate ports and keeps the best observed connect latency.
903async fn probe_target_latency(
904    target: &LeaderTarget,
905    probe_port: Option<u16>,
906    probe_timeout: Duration,
907) -> Option<u128> {
908    let mut ports = vec![target.tpu_addr.port()];
909    if let Some(port) = probe_port
910        && port != target.tpu_addr.port()
911    {
912        ports.push(port);
913    }
914
915    let ip = target.tpu_addr.ip();
916    let mut best = None::<u128>;
917    for port in ports {
918        if let Some(latency) = probe_tcp_latency(ip, port, probe_timeout).await {
919            best = Some(best.map_or(latency, |current| current.min(latency)));
920        }
921    }
922    best
923}
924
925/// Measures one TCP connect attempt and returns elapsed milliseconds on success.
926async fn probe_tcp_latency(
927    ip: std::net::IpAddr,
928    port: u16,
929    timeout_duration: Duration,
930) -> Option<u128> {
931    let start = Instant::now();
932    let addr = SocketAddr::new(ip, port);
933    let stream = timeout(timeout_duration, TcpStream::connect(addr))
934        .await
935        .ok()?
936        .ok()?;
937    drop(stream);
938    Some(start.elapsed().as_millis())
939}
940
941/// Summarizes the selected target list for observability.
942fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
943    let selected_target_count = targets.len();
944    let selected_identity_count = targets
945        .iter()
946        .filter_map(|target| target.identity)
947        .collect::<HashSet<_>>()
948        .len();
949    (selected_target_count, selected_identity_count)
950}
951
952/// Rotates the target ordering between attempts to spread retries across candidates.
953fn rotate_targets_for_attempt(
954    targets: &mut [LeaderTarget],
955    attempt_idx: usize,
956    policy: RoutingPolicy,
957) {
958    if attempt_idx == 0 || targets.len() <= 1 {
959        return;
960    }
961
962    let normalized = policy.normalized();
963    let stride = normalized.max_parallel_sends.max(1);
964    let rotation = attempt_idx
965        .saturating_mul(stride)
966        .checked_rem(targets.len())
967        .unwrap_or(0);
968    if rotation > 0 {
969        targets.rotate_left(rotation);
970    }
971}
972
973/// Bounds one submit attempt so retry loops cannot hang indefinitely.
974fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
975    direct_config
976        .global_timeout
977        .saturating_add(direct_config.per_target_timeout)
978        .saturating_add(direct_config.rebroadcast_interval)
979        .max(Duration::from_secs(8))
980}