Skip to main content

sof_tx/submit/
client.rs

1//! Submission client implementation and mode orchestration.
2
3use std::{
4    collections::HashSet,
5    net::SocketAddr,
6    sync::Arc,
7    time::{Duration, Instant, SystemTime},
8};
9
10use sof_types::SignatureBytes;
11use solana_transaction::versioned::VersionedTransaction;
12use tokio::{
13    net::TcpStream,
14    task::JoinSet,
15    time::{sleep, timeout},
16};
17
18use super::{
19    DirectSubmitConfig, DirectSubmitTransport, JitoSubmitConfig, JitoSubmitTransport,
20    RpcSubmitConfig, RpcSubmitTransport, SignedTx, SubmitError, SubmitMode, SubmitReliability,
21    SubmitResult, SubmitTransportError, TxFlowSafetyQuality, TxFlowSafetySource,
22    TxSubmitClientBuilder, TxSubmitContext, TxSubmitGuardPolicy, TxSubmitOutcome,
23    TxSubmitOutcomeKind, TxSubmitOutcomeReporter, TxToxicFlowRejectionReason, TxToxicFlowTelemetry,
24    TxToxicFlowTelemetrySnapshot,
25};
26use crate::{
27    providers::{
28        LeaderProvider, LeaderTarget, RecentBlockhashProvider, RpcRecentBlockhashProvider,
29        StaticLeaderProvider,
30    },
31    routing::{RoutingPolicy, SignatureDeduper, select_targets},
32    submit::{JsonRpcTransport, types::TxSuppressionCache},
33};
34
/// Transaction submission client that orchestrates RPC, Jito, direct, and hybrid submit modes.
///
/// Every submit runs the toxic-flow guards and signature dedupe before dispatching to the
/// transport selected by [`SubmitMode`]. Transports are optional; a mode whose required
/// transport is missing fails with the corresponding `Missing*Transport` error.
pub struct TxSubmitClient {
    /// Primary blockhash source; `latest_blockhash_bytes` reads from it.
    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
    /// Optional RPC-backed blockhash source that `refresh_latest_blockhash_bytes` refreshes
    /// before reading the primary source.
    on_demand_blockhash_provider: Option<Arc<RpcRecentBlockhashProvider>>,
    /// Leader source used for direct/hybrid target selection and post-ack rebroadcast.
    leader_provider: Arc<dyn LeaderProvider>,
    /// Backup validator targets passed to target selection alongside the leader provider.
    backups: Vec<LeaderTarget>,
    /// Direct routing policy (stored normalized when set through `with_routing_policy`).
    policy: RoutingPolicy,
    /// Signature dedupe window (10 s from `new`, replaceable via `with_dedupe_ttl`).
    deduper: SignatureDeduper,
    /// Optional RPC transport; required by `RpcOnly` and `Hybrid` modes.
    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
    /// Optional direct transport; required by `DirectOnly` and `Hybrid` modes.
    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
    /// Optional Jito transport; required by `JitoOnly` mode.
    jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
    /// Tuning passed to the RPC transport on every RPC submit.
    rpc_config: RpcSubmitConfig,
    /// Tuning passed to the Jito transport on every Jito submit.
    jito_config: JitoSubmitConfig,
    /// Direct/hybrid tuning (attempt counts, timeouts, rebroadcast, latency-aware targeting);
    /// a normalized copy is taken at the start of each direct or hybrid submit.
    direct_config: DirectSubmitConfig,
    /// Optional control-plane safety source consulted by the toxic-flow guards and used to
    /// tag recorded outcomes with a state version.
    flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
    /// Guard policy applied before submit.
    guard_policy: TxSubmitGuardPolicy,
    /// Built-in suppression-key cache; keys are inserted once a submit passes the guards.
    suppression: TxSuppressionCache,
    /// Built-in toxic-flow telemetry sink; every recorded outcome lands here.
    telemetry: Arc<TxToxicFlowTelemetry>,
    /// Optional external outcome reporter, fed after the built-in telemetry.
    outcome_reporter: Option<Arc<dyn TxSubmitOutcomeReporter>>,
}
72
73impl TxSubmitClient {
74    /// Creates a high-level builder for common submit configurations.
75    #[must_use]
76    pub fn builder() -> TxSubmitClientBuilder {
77        TxSubmitClientBuilder::new()
78    }
79
80    /// Creates a submission client with no transports preconfigured.
81    #[must_use]
82    pub fn new(
83        blockhash_provider: Arc<dyn RecentBlockhashProvider>,
84        leader_provider: Arc<dyn LeaderProvider>,
85    ) -> Self {
86        Self {
87            blockhash_provider,
88            on_demand_blockhash_provider: None,
89            leader_provider,
90            backups: Vec::new(),
91            policy: RoutingPolicy::default(),
92            deduper: SignatureDeduper::new(Duration::from_secs(10)),
93            rpc_transport: None,
94            direct_transport: None,
95            jito_transport: None,
96            rpc_config: RpcSubmitConfig::default(),
97            jito_config: JitoSubmitConfig::default(),
98            direct_config: DirectSubmitConfig::default(),
99            flow_safety_source: None,
100            guard_policy: TxSubmitGuardPolicy::default(),
101            suppression: TxSuppressionCache::default(),
102            telemetry: TxToxicFlowTelemetry::shared(),
103            outcome_reporter: None,
104        }
105    }
106
107    /// Creates a client with an empty leader source for blockhash-only submit paths.
108    #[must_use]
109    pub fn blockhash_only(blockhash_provider: Arc<dyn RecentBlockhashProvider>) -> Self {
110        Self::new(
111            blockhash_provider,
112            Arc::new(StaticLeaderProvider::default()),
113        )
114    }
115
116    /// Creates a client with RPC-backed on-demand blockhash sourcing and no leader routing.
117    ///
118    /// # Errors
119    ///
120    /// Returns [`SubmitTransportError`] when the RPC-backed blockhash provider cannot be created.
121    pub fn blockhash_via_rpc(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
122        let blockhash_provider = Arc::new(RpcRecentBlockhashProvider::new(rpc_url.into())?);
123        Ok(Self::blockhash_only(blockhash_provider.clone())
124            .with_rpc_blockhash_provider(blockhash_provider))
125    }
126
127    /// Creates an RPC-only client from one RPC URL used for both blockhash and submission.
128    ///
129    /// # Errors
130    ///
131    /// Returns [`SubmitTransportError`] when the RPC transport or blockhash provider
132    /// cannot be initialized.
133    pub fn rpc_only(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
134        let rpc_url = rpc_url.into();
135        let client = Self::blockhash_via_rpc(rpc_url.clone())?;
136        let rpc_transport = Arc::new(JsonRpcTransport::new(rpc_url)?);
137        Ok(client.with_rpc_transport(rpc_transport))
138    }
139
140    /// Sets optional backup validators.
141    #[must_use]
142    pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
143        self.backups = backups;
144        self
145    }
146
147    /// Sets routing policy.
148    #[must_use]
149    pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
150        self.policy = policy.normalized();
151        self
152    }
153
154    /// Sets dedupe TTL.
155    #[must_use]
156    pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
157        self.deduper = SignatureDeduper::new(ttl);
158        self
159    }
160
161    /// Sets RPC transport.
162    #[must_use]
163    pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
164        self.rpc_transport = Some(transport);
165        self
166    }
167
168    /// Sets direct transport.
169    #[must_use]
170    pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
171        self.direct_transport = Some(transport);
172        self
173    }
174
175    /// Sets Jito transport.
176    #[must_use]
177    pub fn with_jito_transport(mut self, transport: Arc<dyn JitoSubmitTransport>) -> Self {
178        self.jito_transport = Some(transport);
179        self
180    }
181
182    /// Sets RPC submit tuning.
183    #[must_use]
184    pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
185        self.rpc_config = config;
186        self
187    }
188
189    /// Sets Jito submit tuning.
190    #[must_use]
191    pub const fn with_jito_config(mut self, config: JitoSubmitConfig) -> Self {
192        self.jito_config = config;
193        self
194    }
195
196    /// Sets direct submit tuning.
197    #[must_use]
198    pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
199        self.direct_config = config.normalized();
200        self
201    }
202
203    /// Sets direct/hybrid reliability profile.
204    #[must_use]
205    pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
206        self.direct_config = DirectSubmitConfig::from_reliability(reliability);
207        self
208    }
209
210    /// Sets the toxic-flow guard source used before submission.
211    #[must_use]
212    pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
213        self.flow_safety_source = Some(source);
214        self
215    }
216
217    /// Sets the toxic-flow guard policy.
218    #[must_use]
219    pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
220        self.guard_policy = policy;
221        self
222    }
223
224    /// Sets an optional external outcome reporter.
225    #[must_use]
226    pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
227        self.outcome_reporter = Some(reporter);
228        self
229    }
230
231    /// Registers an RPC-backed blockhash provider to refresh on demand for unsigned submit paths.
232    #[must_use]
233    pub fn with_rpc_blockhash_provider(
234        mut self,
235        provider: Arc<RpcRecentBlockhashProvider>,
236    ) -> Self {
237        self.on_demand_blockhash_provider = Some(provider);
238        self
239    }
240
241    /// Returns the current built-in toxic-flow telemetry snapshot.
242    #[must_use]
243    pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
244        self.telemetry.snapshot()
245    }
246
247    /// Records one external terminal outcome against the built-in telemetry and optional reporter.
248    pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
249        self.telemetry.record(outcome);
250        if let Some(reporter) = &self.outcome_reporter {
251            reporter.record_outcome(outcome);
252        }
253    }
254
255    /// Submits externally signed transaction bytes.
256    ///
257    /// # Errors
258    ///
259    /// Returns [`SubmitError`] when decoding, dedupe, routing, or submission fails.
260    pub async fn submit_signed(
261        &mut self,
262        signed_tx: SignedTx,
263        mode: SubmitMode,
264    ) -> Result<SubmitResult, SubmitError> {
265        self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
266            .await
267    }
268
269    /// Submits externally signed transaction bytes with explicit toxic-flow context.
270    ///
271    /// # Errors
272    ///
273    /// Returns [`SubmitError`] when decoding, dedupe, toxic-flow guards, routing, or submission
274    /// fails.
275    pub async fn submit_signed_with_context(
276        &mut self,
277        signed_tx: SignedTx,
278        mode: SubmitMode,
279        context: TxSubmitContext,
280    ) -> Result<SubmitResult, SubmitError> {
281        let tx_bytes = match signed_tx {
282            SignedTx::VersionedTransactionBytes(bytes) => bytes,
283            SignedTx::WireTransactionBytes(bytes) => bytes,
284        };
285        let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
286            .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
287        let signature = tx
288            .signatures
289            .first()
290            .copied()
291            .map(SignatureBytes::from_solana);
292        self.submit_bytes(tx_bytes, signature, mode, context).await
293    }
294
295    /// Refreshes any configured on-demand RPC blockhash source and returns the latest bytes.
296    ///
297    /// This is intended for explicit compatibility layers that build Solana-native transactions
298    /// outside the core byte-oriented `sof-tx` API surface.
299    ///
300    /// # Errors
301    ///
302    /// Returns [`SubmitTransportError`] when the RPC-backed blockhash refresh fails.
303    pub async fn refresh_latest_blockhash_bytes(
304        &self,
305    ) -> Result<Option<[u8; 32]>, SubmitTransportError> {
306        if let Some(provider) = &self.on_demand_blockhash_provider {
307            let _ = provider.refresh().await?;
308        }
309        Ok(self.latest_blockhash_bytes())
310    }
311
312    /// Returns the latest cached recent blockhash bytes when available.
313    #[must_use]
314    pub fn latest_blockhash_bytes(&self) -> Option<[u8; 32]> {
315        self.blockhash_provider.latest_blockhash()
316    }
317
318    /// Submits raw tx bytes after dedupe check.
319    async fn submit_bytes(
320        &mut self,
321        tx_bytes: Vec<u8>,
322        signature: Option<SignatureBytes>,
323        mode: SubmitMode,
324        context: TxSubmitContext,
325    ) -> Result<SubmitResult, SubmitError> {
326        self.enforce_toxic_flow_guards(signature, mode, &context)?;
327        self.enforce_dedupe(signature)?;
328        match mode {
329            SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
330            SubmitMode::JitoOnly => self.submit_jito_only(tx_bytes, signature, mode).await,
331            SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
332            SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
333        }
334    }
335
336    /// Applies signature dedupe policy.
337    fn enforce_dedupe(&mut self, signature: Option<SignatureBytes>) -> Result<(), SubmitError> {
338        if let Some(signature) = signature {
339            let now = Instant::now();
340            if !self.deduper.check_and_insert(signature, now) {
341                return Err(SubmitError::DuplicateSignature);
342            }
343        }
344        Ok(())
345    }
346
347    /// Applies toxic-flow guard policy before transport.
348    fn enforce_toxic_flow_guards(
349        &mut self,
350        signature: Option<SignatureBytes>,
351        mode: SubmitMode,
352        context: &TxSubmitContext,
353    ) -> Result<(), SubmitError> {
354        let now = SystemTime::now();
355        let opportunity_age_ms = context
356            .opportunity_created_at
357            .and_then(|created_at| now.duration_since(created_at).ok())
358            .map(|duration| duration.as_millis().min(u128::from(u64::MAX)) as u64);
359        if let Some(age_ms) = opportunity_age_ms
360            && let Some(max_age) = self.guard_policy.max_opportunity_age
361        {
362            let max_allowed_ms = max_age.as_millis().min(u128::from(u64::MAX)) as u64;
363            if age_ms > max_allowed_ms {
364                return Err(self.reject_with_outcome(
365                    TxToxicFlowRejectionReason::OpportunityStale {
366                        age_ms,
367                        max_allowed_ms,
368                    },
369                    TxSubmitOutcomeKind::RejectedDueToStaleness,
370                    signature,
371                    mode,
372                    None,
373                    opportunity_age_ms,
374                ));
375            }
376        }
377
378        if self.suppression.is_suppressed(
379            &context.suppression_keys,
380            now,
381            self.guard_policy.suppression_ttl,
382        ) {
383            return Err(self.reject_with_outcome(
384                TxToxicFlowRejectionReason::Suppressed,
385                TxSubmitOutcomeKind::Suppressed,
386                signature,
387                mode,
388                None,
389                opportunity_age_ms,
390            ));
391        }
392
393        if let Some(source) = &self.flow_safety_source {
394            let snapshot = source.toxic_flow_snapshot();
395            if self.guard_policy.reject_on_replay_recovery_pending
396                && snapshot.replay_recovery_pending
397            {
398                return Err(self.reject_with_outcome(
399                    TxToxicFlowRejectionReason::ReplayRecoveryPending,
400                    TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
401                    signature,
402                    mode,
403                    snapshot.current_state_version,
404                    opportunity_age_ms,
405                ));
406            }
407            if self.guard_policy.require_stable_control_plane
408                && !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
409            {
410                let outcome_kind = match snapshot.quality {
411                    TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
412                        TxSubmitOutcomeKind::RejectedDueToReorgRisk
413                    }
414                    TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
415                    TxFlowSafetyQuality::Degraded
416                    | TxFlowSafetyQuality::IncompleteControlPlane
417                    | TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
418                };
419                return Err(self.reject_with_outcome(
420                    TxToxicFlowRejectionReason::UnsafeControlPlane {
421                        quality: snapshot.quality,
422                    },
423                    outcome_kind,
424                    signature,
425                    mode,
426                    snapshot.current_state_version,
427                    opportunity_age_ms,
428                ));
429            }
430            if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
431                context.decision_state_version,
432                snapshot.current_state_version,
433                self.guard_policy.max_state_version_drift,
434            ) {
435                let drift = current_version.saturating_sub(decision_version);
436                if drift > max_allowed {
437                    return Err(self.reject_with_outcome(
438                        TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
439                        TxSubmitOutcomeKind::RejectedDueToStateDrift,
440                        signature,
441                        mode,
442                        Some(current_version),
443                        opportunity_age_ms,
444                    ));
445                }
446            }
447        }
448
449        self.suppression.insert_all(&context.suppression_keys, now);
450        Ok(())
451    }
452
453    /// Builds one rejection error while recording telemetry and reporting.
454    fn reject_with_outcome(
455        &self,
456        reason: TxToxicFlowRejectionReason,
457        outcome_kind: TxSubmitOutcomeKind,
458        signature: Option<SignatureBytes>,
459        mode: SubmitMode,
460        state_version: Option<u64>,
461        opportunity_age_ms: Option<u64>,
462    ) -> SubmitError {
463        let outcome = TxSubmitOutcome {
464            kind: outcome_kind,
465            signature,
466            mode,
467            state_version,
468            opportunity_age_ms,
469        };
470        self.record_external_outcome(&outcome);
471        SubmitError::ToxicFlow { reason }
472    }
473
474    /// Submits through RPC path only.
475    async fn submit_rpc_only(
476        &self,
477        tx_bytes: Vec<u8>,
478        signature: Option<SignatureBytes>,
479        mode: SubmitMode,
480    ) -> Result<SubmitResult, SubmitError> {
481        let rpc = self
482            .rpc_transport
483            .as_ref()
484            .ok_or(SubmitError::MissingRpcTransport)?;
485        let rpc_signature = rpc
486            .submit_rpc(&tx_bytes, &self.rpc_config)
487            .await
488            .map_err(|source| SubmitError::Rpc { source })?;
489        self.record_external_outcome(&TxSubmitOutcome {
490            kind: TxSubmitOutcomeKind::RpcAccepted,
491            signature,
492            mode,
493            state_version: self
494                .flow_safety_source
495                .as_ref()
496                .and_then(|source| source.toxic_flow_snapshot().current_state_version),
497            opportunity_age_ms: None,
498        });
499        Ok(SubmitResult {
500            signature,
501            mode,
502            direct_target: None,
503            rpc_signature: Some(rpc_signature),
504            jito_signature: None,
505            jito_bundle_id: None,
506            used_rpc_fallback: false,
507            selected_target_count: 0,
508            selected_identity_count: 0,
509        })
510    }
511
512    /// Submits through Jito block-engine path only.
513    async fn submit_jito_only(
514        &self,
515        tx_bytes: Vec<u8>,
516        signature: Option<SignatureBytes>,
517        mode: SubmitMode,
518    ) -> Result<SubmitResult, SubmitError> {
519        let jito = self
520            .jito_transport
521            .as_ref()
522            .ok_or(SubmitError::MissingJitoTransport)?;
523        let jito_response = jito
524            .submit_jito(&tx_bytes, &self.jito_config)
525            .await
526            .map_err(|source| SubmitError::Jito { source })?;
527        self.record_external_outcome(&TxSubmitOutcome {
528            kind: TxSubmitOutcomeKind::JitoAccepted,
529            signature,
530            mode,
531            state_version: self
532                .flow_safety_source
533                .as_ref()
534                .and_then(|source| source.toxic_flow_snapshot().current_state_version),
535            opportunity_age_ms: None,
536        });
537        Ok(SubmitResult {
538            signature,
539            mode,
540            direct_target: None,
541            rpc_signature: None,
542            jito_signature: jito_response.transaction_signature,
543            jito_bundle_id: jito_response.bundle_id,
544            used_rpc_fallback: false,
545            selected_target_count: 0,
546            selected_identity_count: 0,
547        })
548    }
549
550    /// Submits through direct path only.
551    async fn submit_direct_only(
552        &self,
553        tx_bytes: Vec<u8>,
554        signature: Option<SignatureBytes>,
555        mode: SubmitMode,
556    ) -> Result<SubmitResult, SubmitError> {
557        let direct = self
558            .direct_transport
559            .as_ref()
560            .ok_or(SubmitError::MissingDirectTransport)?;
561        let direct_config = self.direct_config.clone().normalized();
562        let mut last_error = None;
563        let attempt_timeout = direct_attempt_timeout(&direct_config);
564
565        for attempt_idx in 0..direct_config.direct_submit_attempts {
566            let mut targets = self.select_direct_targets(&direct_config).await;
567            rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
568            let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
569            if targets.is_empty() {
570                return Err(SubmitError::NoDirectTargets);
571            }
572            match timeout(
573                attempt_timeout,
574                direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
575            )
576            .await
577            {
578                Ok(Ok(target)) => {
579                    self.record_external_outcome(&TxSubmitOutcome {
580                        kind: TxSubmitOutcomeKind::DirectAccepted,
581                        signature,
582                        mode,
583                        state_version: self
584                            .flow_safety_source
585                            .as_ref()
586                            .and_then(|source| source.toxic_flow_snapshot().current_state_version),
587                        opportunity_age_ms: None,
588                    });
589                    self.spawn_agave_rebroadcast(Arc::from(tx_bytes), &direct_config);
590                    return Ok(SubmitResult {
591                        signature,
592                        mode,
593                        direct_target: Some(target),
594                        rpc_signature: None,
595                        jito_signature: None,
596                        jito_bundle_id: None,
597                        used_rpc_fallback: false,
598                        selected_target_count,
599                        selected_identity_count,
600                    });
601                }
602                Ok(Err(source)) => last_error = Some(source),
603                Err(_elapsed) => {
604                    last_error = Some(super::SubmitTransportError::Failure {
605                        message: format!(
606                            "direct submit attempt timed out after {}ms",
607                            attempt_timeout.as_millis()
608                        ),
609                    });
610                }
611            }
612            if attempt_idx < direct_config.direct_submit_attempts.saturating_sub(1) {
613                sleep(direct_config.rebroadcast_interval).await;
614            }
615        }
616
617        Err(SubmitError::Direct {
618            source: last_error.unwrap_or_else(|| super::SubmitTransportError::Failure {
619                message: "direct submit attempts exhausted".to_owned(),
620            }),
621        })
622    }
623
624    /// Submits through hybrid mode (direct first, RPC fallback).
625    async fn submit_hybrid(
626        &self,
627        tx_bytes: Vec<u8>,
628        signature: Option<SignatureBytes>,
629        mode: SubmitMode,
630    ) -> Result<SubmitResult, SubmitError> {
631        let direct = self
632            .direct_transport
633            .as_ref()
634            .ok_or(SubmitError::MissingDirectTransport)?;
635        let rpc = self
636            .rpc_transport
637            .as_ref()
638            .ok_or(SubmitError::MissingRpcTransport)?;
639
640        let direct_config = self.direct_config.clone().normalized();
641        let attempt_timeout = direct_attempt_timeout(&direct_config);
642        for attempt_idx in 0..direct_config.hybrid_direct_attempts {
643            let mut targets = self.select_direct_targets(&direct_config).await;
644            rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
645            let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
646            if targets.is_empty() {
647                break;
648            }
649            if let Ok(Ok(target)) = timeout(
650                attempt_timeout,
651                direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
652            )
653            .await
654            {
655                let tx_bytes = Arc::<[u8]>::from(tx_bytes);
656                self.spawn_agave_rebroadcast(Arc::clone(&tx_bytes), &direct_config);
657                if direct_config.hybrid_rpc_broadcast
658                    && let Ok(rpc_signature) =
659                        rpc.submit_rpc(tx_bytes.as_ref(), &self.rpc_config).await
660                {
661                    self.record_external_outcome(&TxSubmitOutcome {
662                        kind: TxSubmitOutcomeKind::DirectAccepted,
663                        signature,
664                        mode,
665                        state_version: self
666                            .flow_safety_source
667                            .as_ref()
668                            .and_then(|source| source.toxic_flow_snapshot().current_state_version),
669                        opportunity_age_ms: None,
670                    });
671                    return Ok(SubmitResult {
672                        signature,
673                        mode,
674                        direct_target: Some(target),
675                        rpc_signature: Some(rpc_signature),
676                        jito_signature: None,
677                        jito_bundle_id: None,
678                        used_rpc_fallback: false,
679                        selected_target_count,
680                        selected_identity_count,
681                    });
682                }
683                self.record_external_outcome(&TxSubmitOutcome {
684                    kind: TxSubmitOutcomeKind::DirectAccepted,
685                    signature,
686                    mode,
687                    state_version: self
688                        .flow_safety_source
689                        .as_ref()
690                        .and_then(|source| source.toxic_flow_snapshot().current_state_version),
691                    opportunity_age_ms: None,
692                });
693                return Ok(SubmitResult {
694                    signature,
695                    mode,
696                    direct_target: Some(target),
697                    rpc_signature: None,
698                    jito_signature: None,
699                    jito_bundle_id: None,
700                    used_rpc_fallback: false,
701                    selected_target_count,
702                    selected_identity_count,
703                });
704            }
705            if attempt_idx < direct_config.hybrid_direct_attempts.saturating_sub(1) {
706                sleep(direct_config.rebroadcast_interval).await;
707            }
708        }
709
710        let rpc_signature = rpc
711            .submit_rpc(&tx_bytes, &self.rpc_config)
712            .await
713            .map_err(|source| SubmitError::Rpc { source })?;
714        self.record_external_outcome(&TxSubmitOutcome {
715            kind: TxSubmitOutcomeKind::RpcAccepted,
716            signature,
717            mode,
718            state_version: self
719                .flow_safety_source
720                .as_ref()
721                .and_then(|source| source.toxic_flow_snapshot().current_state_version),
722            opportunity_age_ms: None,
723        });
724        Ok(SubmitResult {
725            signature,
726            mode,
727            direct_target: None,
728            rpc_signature: Some(rpc_signature),
729            jito_signature: None,
730            jito_bundle_id: None,
731            used_rpc_fallback: true,
732            selected_target_count: 0,
733            selected_identity_count: 0,
734        })
735    }
736
737    /// Resolves and ranks the direct targets for the next submission attempt.
738    async fn select_direct_targets(&self, direct_config: &DirectSubmitConfig) -> Vec<LeaderTarget> {
739        select_and_rank_targets(
740            self.leader_provider.as_ref(),
741            &self.backups,
742            self.policy,
743            direct_config,
744        )
745        .await
746    }
747
748    /// Starts the post-ack rebroadcast worker when that reliability mode is enabled.
749    fn spawn_agave_rebroadcast(&self, tx_bytes: Arc<[u8]>, direct_config: &DirectSubmitConfig) {
750        if !direct_config.agave_rebroadcast_enabled
751            || direct_config.agave_rebroadcast_window.is_zero()
752        {
753            return;
754        }
755        let Some(direct_transport) = self.direct_transport.clone() else {
756            return;
757        };
758        spawn_agave_rebroadcast_task(
759            tx_bytes,
760            direct_transport,
761            self.leader_provider.clone(),
762            self.backups.clone(),
763            self.policy,
764            direct_config.clone(),
765        );
766    }
767}
768
#[cfg(not(test))]
/// Replays a successful direct submission for a bounded, Agave-like persistence window.
///
/// Spawns a detached Tokio task. Its `JoinHandle` is dropped, and `tokio::spawn` requires a
/// running Tokio runtime. Rounds continue until `agave_rebroadcast_window` has elapsed,
/// measured from when the task first runs. Each round:
/// - sleeps for `agave_rebroadcast_interval`, clipped to the time remaining;
/// - re-selects and ranks targets;
/// - re-sends `tx_bytes` through the direct transport under the per-attempt timeout.
///
/// Send results and timeouts are discarded, and a round with no targets is skipped.
///
/// NOTE(review): a zero `agave_rebroadcast_interval` would make rounds run back-to-back
/// with no pause. Confirm that `DirectSubmitConfig::normalized` rules that out.
fn spawn_agave_rebroadcast_task(
    tx_bytes: Arc<[u8]>,
    direct_transport: Arc<dyn DirectSubmitTransport>,
    leader_provider: Arc<dyn LeaderProvider>,
    backups: Vec<LeaderTarget>,
    policy: RoutingPolicy,
    direct_config: DirectSubmitConfig,
) {
    tokio::spawn(async move {
        // If adding the window overflows `Instant`, fall back to "now" so the loop exits at once.
        let deadline = Instant::now()
            .checked_add(direct_config.agave_rebroadcast_window)
            .unwrap_or_else(Instant::now);
        loop {
            let now = Instant::now();
            if now >= deadline {
                break;
            }

            // Wait one interval, but never past the deadline.
            let sleep_for = deadline
                .saturating_duration_since(now)
                .min(direct_config.agave_rebroadcast_interval);
            if !sleep_for.is_zero() {
                sleep(sleep_for).await;
            }

            // Re-check after sleeping so nothing is sent once the window has closed.
            if Instant::now() >= deadline {
                break;
            }

            // Targets are re-resolved every round rather than reused from the initial submit.
            let targets = select_and_rank_targets(
                leader_provider.as_ref(),
                backups.as_slice(),
                policy,
                &direct_config,
            )
            .await;
            if targets.is_empty() {
                continue;
            }

            // Best-effort: success, transport error, and timeout are all ignored.
            drop(
                timeout(
                    direct_attempt_timeout(&direct_config),
                    direct_transport.submit_direct(
                        tx_bytes.as_ref(),
                        &targets,
                        policy,
                        &direct_config,
                    ),
                )
                .await,
            );
        }
    });
}
826
/// Test-build replacement for the rebroadcast spawner that intentionally does nothing.
///
/// Background rebroadcasting is disabled under `cfg(test)` so submit assertions stay
/// deterministic. All arguments are simply dropped on return.
#[cfg(test)]
fn spawn_agave_rebroadcast_task(
    _tx_bytes: Arc<[u8]>,
    _direct_transport: Arc<dyn DirectSubmitTransport>,
    _leader_provider: Arc<dyn LeaderProvider>,
    _backups: Vec<LeaderTarget>,
    _policy: RoutingPolicy,
    _direct_config: DirectSubmitConfig,
) {}
838
839/// Selects routing targets and applies optional latency-aware ranking.
840async fn select_and_rank_targets(
841    leader_provider: &(impl LeaderProvider + ?Sized),
842    backups: &[LeaderTarget],
843    policy: RoutingPolicy,
844    direct_config: &DirectSubmitConfig,
845) -> Vec<LeaderTarget> {
846    let targets = select_targets(leader_provider, backups, policy);
847    rank_targets_by_latency(targets, direct_config).await
848}
849
850/// Reorders the probe set by observed TCP connect latency while preserving the tail order.
851async fn rank_targets_by_latency(
852    targets: Vec<LeaderTarget>,
853    direct_config: &DirectSubmitConfig,
854) -> Vec<LeaderTarget> {
855    if targets.len() <= 1 || !direct_config.latency_aware_targeting {
856        return targets;
857    }
858
859    let probe_timeout = direct_config.latency_probe_timeout;
860    let probe_port = direct_config.latency_probe_port;
861    let probe_count = targets
862        .len()
863        .min(direct_config.latency_probe_max_targets.max(1));
864    let mut latencies = vec![None; probe_count];
865    let mut probes = JoinSet::new();
866    for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
867        probes.spawn(async move {
868            (
869                idx,
870                probe_target_latency(&target, probe_port, probe_timeout).await,
871            )
872        });
873    }
874    while let Some(result) = probes.join_next().await {
875        if let Ok((idx, latency)) = result
876            && idx < latencies.len()
877            && let Some(slot) = latencies.get_mut(idx)
878        {
879            *slot = latency;
880        }
881    }
882
883    let mut ranked = targets
884        .iter()
885        .take(probe_count)
886        .cloned()
887        .enumerate()
888        .collect::<Vec<_>>();
889    ranked.sort_by_key(|(idx, _target)| {
890        (
891            latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
892            *idx,
893        )
894    });
895
896    let mut output = ranked
897        .into_iter()
898        .map(|(_idx, target)| target)
899        .collect::<Vec<_>>();
900    output.extend(targets.iter().skip(probe_count).cloned());
901    output
902}
903
904/// Probes a target's candidate ports and keeps the best observed connect latency.
905async fn probe_target_latency(
906    target: &LeaderTarget,
907    probe_port: Option<u16>,
908    probe_timeout: Duration,
909) -> Option<u128> {
910    let mut ports = vec![target.tpu_addr.port()];
911    if let Some(port) = probe_port
912        && port != target.tpu_addr.port()
913    {
914        ports.push(port);
915    }
916
917    let ip = target.tpu_addr.ip();
918    let mut best = None::<u128>;
919    for port in ports {
920        if let Some(latency) = probe_tcp_latency(ip, port, probe_timeout).await {
921            best = Some(best.map_or(latency, |current| current.min(latency)));
922        }
923    }
924    best
925}
926
927/// Measures one TCP connect attempt and returns elapsed milliseconds on success.
928async fn probe_tcp_latency(
929    ip: std::net::IpAddr,
930    port: u16,
931    timeout_duration: Duration,
932) -> Option<u128> {
933    let start = Instant::now();
934    let addr = SocketAddr::new(ip, port);
935    let stream = timeout(timeout_duration, TcpStream::connect(addr))
936        .await
937        .ok()?
938        .ok()?;
939    drop(stream);
940    Some(start.elapsed().as_millis())
941}
942
943/// Summarizes the selected target list for observability.
944fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
945    let selected_target_count = targets.len();
946    let selected_identity_count = targets
947        .iter()
948        .filter_map(|target| target.identity)
949        .collect::<HashSet<_>>()
950        .len();
951    (selected_target_count, selected_identity_count)
952}
953
954/// Rotates the target ordering between attempts to spread retries across candidates.
955fn rotate_targets_for_attempt(
956    targets: &mut [LeaderTarget],
957    attempt_idx: usize,
958    policy: RoutingPolicy,
959) {
960    if attempt_idx == 0 || targets.len() <= 1 {
961        return;
962    }
963
964    let normalized = policy.normalized();
965    let stride = normalized.max_parallel_sends.max(1);
966    let rotation = attempt_idx
967        .saturating_mul(stride)
968        .checked_rem(targets.len())
969        .unwrap_or(0);
970    if rotation > 0 {
971        targets.rotate_left(rotation);
972    }
973}
974
975/// Bounds one submit attempt so retry loops cannot hang indefinitely.
976fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
977    direct_config
978        .global_timeout
979        .saturating_add(direct_config.per_target_timeout)
980        .saturating_add(direct_config.rebroadcast_interval)
981        .max(Duration::from_secs(8))
982}