// sof_tx/submit/client.rs

//! Submission client implementation and mode orchestration.

3use std::{
4    collections::HashMap,
5    collections::HashSet,
6    net::SocketAddr,
7    panic::{AssertUnwindSafe, catch_unwind},
8    sync::{
9        Arc, Mutex, OnceLock, Weak,
10        atomic::{AtomicBool, Ordering},
11        mpsc::{self as std_mpsc, SyncSender, TrySendError},
12    },
13    thread,
14    time::{Duration, Instant, SystemTime},
15};
16
17use sof_support::{short_vec::decode_short_u16_len_prefix, time_support::duration_millis_u64};
18use sof_types::SignatureBytes;
19use tokio::{
20    net::TcpStream,
21    sync::mpsc,
22    task::JoinSet,
23    time::{sleep, timeout},
24};
25
26use super::{
27    DirectSubmitConfig, DirectSubmitTransport, JitoSubmitConfig, JitoSubmitTransport,
28    RpcSubmitConfig, RpcSubmitTransport, SignedTx, SubmitError, SubmitMode, SubmitPlan,
29    SubmitReliability, SubmitResult, SubmitRoute, SubmitStrategy, SubmitTransportError,
30    TxFlowSafetyQuality, TxFlowSafetySource, TxSubmitClientBuilder, TxSubmitContext,
31    TxSubmitGuardPolicy, TxSubmitOutcome, TxSubmitOutcomeKind, TxSubmitOutcomeReporter,
32    TxToxicFlowRejectionReason, TxToxicFlowTelemetry, TxToxicFlowTelemetrySnapshot,
33};
34use crate::{
35    providers::{
36        LeaderProvider, LeaderTarget, RecentBlockhashProvider, RpcRecentBlockhashProvider,
37        StaticLeaderProvider,
38    },
39    routing::{RoutingPolicy, SignatureDeduper, select_targets},
40    submit::{JsonRpcTransport, types::TxSuppressionCache},
41};
42
/// Transaction submission client that orchestrates RPC and direct submit modes.
///
/// A client owns the blockhash/leader providers, the per-route transports, their
/// tuning configs, and the toxic-flow guard state (suppression cache + telemetry).
/// Configure it with the `with_*` builder methods, then call one of the
/// `submit_signed*` entry points.
pub struct TxSubmitClient {
    /// Blockhash source used by unsigned submit path.
    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
    /// Optional RPC-backed blockhash source refreshed on demand before unsigned submit.
    on_demand_blockhash_provider: Option<Arc<RpcRecentBlockhashProvider>>,
    /// Leader source used by direct/hybrid paths.
    leader_provider: Arc<dyn LeaderProvider>,
    /// Optional backup validator targets.
    backups: Vec<LeaderTarget>,
    /// Direct routing policy.
    policy: RoutingPolicy,
    /// Signature dedupe window.
    deduper: SignatureDeduper,
    /// Optional RPC transport.
    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
    /// Optional direct transport.
    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
    /// Optional Jito transport.
    jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
    /// RPC tuning.
    rpc_config: RpcSubmitConfig,
    /// Jito tuning.
    jito_config: JitoSubmitConfig,
    /// Direct tuning.
    direct_config: DirectSubmitConfig,
    /// Optional toxic-flow guard source.
    flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
    /// Guard policy applied before submit.
    guard_policy: TxSubmitGuardPolicy,
    /// Built-in suppression keys.
    suppression: TxSuppressionCache,
    /// Built-in toxic-flow telemetry sink.
    telemetry: Arc<TxToxicFlowTelemetry>,
    /// Optional external outcome reporter handle.
    outcome_reporter: Option<OutcomeReporterHandle>,
}
80
81impl TxSubmitClient {
82    /// Creates a high-level builder for common submit configurations.
83    #[must_use]
84    pub fn builder() -> TxSubmitClientBuilder {
85        TxSubmitClientBuilder::new()
86    }
87
88    /// Creates a submission client with no transports preconfigured.
89    #[must_use]
90    pub fn new(
91        blockhash_provider: Arc<dyn RecentBlockhashProvider>,
92        leader_provider: Arc<dyn LeaderProvider>,
93    ) -> Self {
94        Self {
95            blockhash_provider,
96            on_demand_blockhash_provider: None,
97            leader_provider,
98            backups: Vec::new(),
99            policy: RoutingPolicy::default(),
100            deduper: SignatureDeduper::new(Duration::from_secs(10)),
101            rpc_transport: None,
102            direct_transport: None,
103            jito_transport: None,
104            rpc_config: RpcSubmitConfig::default(),
105            jito_config: JitoSubmitConfig::default(),
106            direct_config: DirectSubmitConfig::default(),
107            flow_safety_source: None,
108            guard_policy: TxSubmitGuardPolicy::default(),
109            suppression: TxSuppressionCache::default(),
110            telemetry: TxToxicFlowTelemetry::shared(),
111            outcome_reporter: None,
112        }
113    }
114
115    /// Creates a client with an empty leader source for blockhash-only submit paths.
116    #[must_use]
117    pub fn blockhash_only(blockhash_provider: Arc<dyn RecentBlockhashProvider>) -> Self {
118        Self::new(
119            blockhash_provider,
120            Arc::new(StaticLeaderProvider::default()),
121        )
122    }
123
124    /// Creates a client with RPC-backed on-demand blockhash sourcing and no leader routing.
125    ///
126    /// # Errors
127    ///
128    /// Returns [`SubmitTransportError`] when the RPC-backed blockhash provider cannot be created.
129    pub fn blockhash_via_rpc(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
130        let blockhash_provider = Arc::new(RpcRecentBlockhashProvider::new(rpc_url.into())?);
131        Ok(Self::blockhash_only(blockhash_provider.clone())
132            .with_rpc_blockhash_provider(blockhash_provider))
133    }
134
135    /// Creates an RPC-only client from one RPC URL used for both blockhash and submission.
136    ///
137    /// # Errors
138    ///
139    /// Returns [`SubmitTransportError`] when the RPC transport or blockhash provider
140    /// cannot be initialized.
141    pub fn rpc_only(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
142        let rpc_url = rpc_url.into();
143        let client = Self::blockhash_via_rpc(rpc_url.clone())?;
144        let rpc_transport = Arc::new(JsonRpcTransport::new(rpc_url)?);
145        Ok(client.with_rpc_transport(rpc_transport))
146    }
147
148    /// Sets optional backup validators.
149    #[must_use]
150    pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
151        self.backups = backups;
152        self
153    }
154
155    /// Sets routing policy.
156    #[must_use]
157    pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
158        self.policy = policy.normalized();
159        self
160    }
161
162    /// Sets dedupe TTL.
163    #[must_use]
164    pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
165        self.deduper = SignatureDeduper::new(ttl);
166        self
167    }
168
169    /// Sets RPC transport.
170    #[must_use]
171    pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
172        self.rpc_transport = Some(transport);
173        self
174    }
175
176    /// Sets direct transport.
177    #[must_use]
178    pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
179        self.direct_transport = Some(transport);
180        self
181    }
182
183    /// Sets Jito transport.
184    #[must_use]
185    pub fn with_jito_transport(mut self, transport: Arc<dyn JitoSubmitTransport>) -> Self {
186        self.jito_transport = Some(transport);
187        self
188    }
189
190    /// Sets RPC submit tuning.
191    #[must_use]
192    pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
193        self.rpc_config = config;
194        self
195    }
196
197    /// Sets Jito submit tuning.
198    #[must_use]
199    pub const fn with_jito_config(mut self, config: JitoSubmitConfig) -> Self {
200        self.jito_config = config;
201        self
202    }
203
204    /// Sets direct submit tuning.
205    #[must_use]
206    pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
207        self.direct_config = config.normalized();
208        self
209    }
210
211    /// Sets direct/hybrid reliability profile.
212    #[must_use]
213    pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
214        self.direct_config = DirectSubmitConfig::from_reliability(reliability);
215        self
216    }
217
218    /// Sets the toxic-flow guard source used before submission.
219    #[must_use]
220    pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
221        self.flow_safety_source = Some(source);
222        self
223    }
224
225    /// Sets the toxic-flow guard policy.
226    #[must_use]
227    pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
228        self.guard_policy = policy;
229        self
230    }
231
232    /// Sets an optional external outcome reporter.
233    #[must_use]
234    pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
235        self.outcome_reporter = Some(OutcomeReporterHandle::new(reporter));
236        self
237    }
238
239    /// Registers an RPC-backed blockhash provider to refresh on demand for unsigned submit paths.
240    #[must_use]
241    pub fn with_rpc_blockhash_provider(
242        mut self,
243        provider: Arc<RpcRecentBlockhashProvider>,
244    ) -> Self {
245        self.on_demand_blockhash_provider = Some(provider);
246        self
247    }
248
249    /// Returns the current built-in toxic-flow telemetry snapshot.
250    #[must_use]
251    pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
252        self.telemetry.snapshot()
253    }
254
255    /// Records one external terminal outcome against the built-in telemetry and optional reporter.
256    pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
257        record_external_outcome_shared(&self.telemetry, self.outcome_reporter.as_ref(), outcome);
258    }
259
260    /// Submits externally signed transaction bytes.
261    ///
262    /// # Errors
263    ///
264    /// Returns [`SubmitError`] when decoding, dedupe, routing, or submission fails.
265    pub async fn submit_signed(
266        &mut self,
267        signed_tx: SignedTx,
268        mode: SubmitMode,
269    ) -> Result<SubmitResult, SubmitError> {
270        self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
271            .await
272    }
273
274    /// Submits externally signed transaction bytes through one route plan.
275    ///
276    /// # Errors
277    ///
278    /// Returns [`SubmitError`] when decoding, dedupe, routing, or submission fails.
279    pub async fn submit_signed_via(
280        &mut self,
281        signed_tx: SignedTx,
282        plan: SubmitPlan,
283    ) -> Result<SubmitResult, SubmitError> {
284        self.submit_signed_with_context_via(signed_tx, plan, TxSubmitContext::default())
285            .await
286    }
287
288    /// Submits externally signed transaction bytes with explicit toxic-flow context.
289    ///
290    /// # Errors
291    ///
292    /// Returns [`SubmitError`] when decoding, dedupe, toxic-flow guards, routing, or submission
293    /// fails.
294    pub async fn submit_signed_with_context(
295        &mut self,
296        signed_tx: SignedTx,
297        mode: SubmitMode,
298        context: TxSubmitContext,
299    ) -> Result<SubmitResult, SubmitError> {
300        self.submit_signed_with_context_via(signed_tx, SubmitPlan::from(mode), context)
301            .await
302    }
303
304    /// Submits externally signed transaction bytes with explicit toxic-flow context and route
305    /// plan.
306    ///
307    /// # Errors
308    ///
309    /// Returns [`SubmitError`] when decoding, dedupe, toxic-flow guards, routing, or submission
310    /// fails.
311    pub async fn submit_signed_with_context_via(
312        &mut self,
313        signed_tx: SignedTx,
314        plan: SubmitPlan,
315        context: TxSubmitContext,
316    ) -> Result<SubmitResult, SubmitError> {
317        let tx_bytes = match signed_tx {
318            SignedTx::VersionedTransactionBytes(bytes) => bytes,
319            SignedTx::WireTransactionBytes(bytes) => bytes,
320        };
321        let signature = extract_first_signature(&tx_bytes)?;
322        self.submit_bytes(tx_bytes, signature, plan, context).await
323    }
324
325    /// Refreshes any configured on-demand RPC blockhash source and returns the latest bytes.
326    ///
327    /// This is intended for explicit compatibility layers that build Solana-native transactions
328    /// outside the core byte-oriented `sof-tx` API surface.
329    ///
330    /// # Errors
331    ///
332    /// Returns [`SubmitTransportError`] when the RPC-backed blockhash refresh fails.
333    pub async fn refresh_latest_blockhash_bytes(
334        &self,
335    ) -> Result<Option<[u8; 32]>, SubmitTransportError> {
336        if let Some(provider) = &self.on_demand_blockhash_provider {
337            let _ = provider.refresh().await?;
338        }
339        Ok(self.latest_blockhash_bytes())
340    }
341
342    /// Returns the latest cached recent blockhash bytes when available.
343    #[must_use]
344    pub fn latest_blockhash_bytes(&self) -> Option<[u8; 32]> {
345        self.blockhash_provider.latest_blockhash()
346    }
347
348    /// Submits raw tx bytes after dedupe check.
349    async fn submit_bytes(
350        &mut self,
351        tx_bytes: Vec<u8>,
352        signature: Option<SignatureBytes>,
353        plan: SubmitPlan,
354        context: TxSubmitContext,
355    ) -> Result<SubmitResult, SubmitError> {
356        let plan = plan.into_normalized();
357        self.validate_submit_plan(&plan)?;
358        self.enforce_toxic_flow_guards(signature, &plan, &context)?;
359        self.enforce_dedupe(signature)?;
360        let tx_bytes = Arc::<[u8]>::from(tx_bytes);
361        match plan.strategy {
362            SubmitStrategy::OrderedFallback => {
363                self.submit_routes_in_order(tx_bytes, signature, plan).await
364            }
365            SubmitStrategy::AllAtOnce => {
366                self.submit_routes_all_at_once(tx_bytes, signature, plan)
367                    .await
368            }
369        }
370    }
371
372    /// Applies signature dedupe policy.
373    fn enforce_dedupe(&mut self, signature: Option<SignatureBytes>) -> Result<(), SubmitError> {
374        if let Some(signature) = signature {
375            let now = Instant::now();
376            if !self.deduper.check_and_insert(signature, now) {
377                return Err(SubmitError::DuplicateSignature);
378            }
379        }
380        Ok(())
381    }
382
    /// Applies toxic-flow guard policy before transport.
    ///
    /// Guards run in a fixed order: opportunity staleness, suppression cache, then the
    /// optional flow-safety source (replay recovery, control-plane quality, state-version
    /// drift). Each rejection is recorded as a terminal outcome via
    /// [`Self::reject_with_outcome`]. Only after every guard passes are the context's
    /// suppression keys armed, so a rejected submission does not suppress later retries.
    fn enforce_toxic_flow_guards(
        &mut self,
        signature: Option<SignatureBytes>,
        plan: &SubmitPlan,
        context: &TxSubmitContext,
    ) -> Result<(), SubmitError> {
        let legacy_mode = plan.legacy_mode();
        let now = SystemTime::now();
        // Age of the originating opportunity in milliseconds; `None` when untracked
        // or when the system clock moved backwards (duration_since fails).
        let opportunity_age_ms = context
            .opportunity_created_at
            .and_then(|created_at| now.duration_since(created_at).ok())
            .map(duration_millis_u64);
        if let Some(age_ms) = opportunity_age_ms
            && let Some(max_age) = self.guard_policy.max_opportunity_age
        {
            let max_allowed_ms = duration_millis_u64(max_age);
            if age_ms > max_allowed_ms {
                return Err(self.reject_with_outcome(
                    TxToxicFlowRejectionReason::OpportunityStale {
                        age_ms,
                        max_allowed_ms,
                    },
                    TxSubmitOutcomeKind::RejectedDueToStaleness,
                    RejectionMetadata {
                        signature,
                        plan: plan.clone(),
                        legacy_mode,
                        state_version: None,
                        opportunity_age_ms,
                    },
                ));
            }
        }

        // Reject when any suppression key is still within the configured TTL window.
        if self.suppression.is_suppressed(
            &context.suppression_keys,
            now,
            self.guard_policy.suppression_ttl,
        ) {
            return Err(self.reject_with_outcome(
                TxToxicFlowRejectionReason::Suppressed,
                TxSubmitOutcomeKind::Suppressed,
                RejectionMetadata {
                    signature,
                    plan: plan.clone(),
                    legacy_mode,
                    state_version: None,
                    opportunity_age_ms,
                },
            ));
        }

        if let Some(source) = &self.flow_safety_source {
            let snapshot = source.toxic_flow_snapshot();
            if self.guard_policy.reject_on_replay_recovery_pending
                && snapshot.replay_recovery_pending
            {
                return Err(self.reject_with_outcome(
                    TxToxicFlowRejectionReason::ReplayRecoveryPending,
                    TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
                    RejectionMetadata {
                        signature,
                        plan: plan.clone(),
                        legacy_mode,
                        state_version: snapshot.current_state_version,
                        opportunity_age_ms,
                    },
                ));
            }
            if self.guard_policy.require_stable_control_plane
                && !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
            {
                // Map the unsafe quality onto the closest terminal outcome kind.
                // `Stable` is unreachable in this branch but is listed so the match
                // stays exhaustive without a wildcard arm.
                let outcome_kind = match snapshot.quality {
                    TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
                        TxSubmitOutcomeKind::RejectedDueToReorgRisk
                    }
                    TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
                    TxFlowSafetyQuality::Degraded
                    | TxFlowSafetyQuality::IncompleteControlPlane
                    | TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
                };
                return Err(self.reject_with_outcome(
                    TxToxicFlowRejectionReason::UnsafeControlPlane {
                        quality: snapshot.quality,
                    },
                    outcome_kind,
                    RejectionMetadata {
                        signature,
                        plan: plan.clone(),
                        legacy_mode,
                        state_version: snapshot.current_state_version,
                        opportunity_age_ms,
                    },
                ));
            }
            // Drift guard only applies when all three inputs are present.
            if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
                context.decision_state_version,
                snapshot.current_state_version,
                self.guard_policy.max_state_version_drift,
            ) {
                // saturating_sub: a current version behind the decision version counts as zero drift.
                let drift = current_version.saturating_sub(decision_version);
                if drift > max_allowed {
                    return Err(self.reject_with_outcome(
                        TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
                        TxSubmitOutcomeKind::RejectedDueToStateDrift,
                        RejectionMetadata {
                            signature,
                            plan: plan.clone(),
                            legacy_mode,
                            state_version: Some(current_version),
                            opportunity_age_ms,
                        },
                    ));
                }
            }
        }

        // All guards passed: arm suppression so duplicate flows get rejected within the TTL.
        self.suppression.insert_all(&context.suppression_keys, now);
        Ok(())
    }
504
505    /// Builds one rejection error while recording telemetry and reporting.
506    fn reject_with_outcome(
507        &self,
508        reason: TxToxicFlowRejectionReason,
509        outcome_kind: TxSubmitOutcomeKind,
510        metadata: RejectionMetadata,
511    ) -> SubmitError {
512        let outcome = TxSubmitOutcome {
513            kind: outcome_kind,
514            signature: metadata.signature,
515            route: None,
516            plan: metadata.plan,
517            legacy_mode: metadata.legacy_mode,
518            rpc_signature: None,
519            jito_signature: None,
520            jito_bundle_id: None,
521            state_version: metadata.state_version,
522            opportunity_age_ms: metadata.opportunity_age_ms,
523        };
524        self.record_external_outcome(&outcome);
525        SubmitError::ToxicFlow { reason }
526    }
527
528    /// Validates that every configured route has the required transport wiring.
529    fn validate_submit_plan(&self, plan: &SubmitPlan) -> Result<(), SubmitError> {
530        if plan.routes.is_empty() {
531            return Err(SubmitError::InternalSync {
532                message: "submit plan must contain at least one route".to_owned(),
533            });
534        }
535        for route in &plan.routes {
536            match route {
537                SubmitRoute::Rpc if self.rpc_transport.is_none() => {
538                    return Err(SubmitError::MissingRpcTransport);
539                }
540                SubmitRoute::Jito if self.jito_transport.is_none() => {
541                    return Err(SubmitError::MissingJitoTransport);
542                }
543                SubmitRoute::Direct if self.direct_transport.is_none() => {
544                    return Err(SubmitError::MissingDirectTransport);
545                }
546                SubmitRoute::Rpc | SubmitRoute::Jito | SubmitRoute::Direct => {}
547            }
548        }
549        Ok(())
550    }
551
552    /// Executes one route plan in order and returns the first successful route.
553    async fn submit_routes_in_order(
554        &self,
555        tx_bytes: Arc<[u8]>,
556        signature: Option<SignatureBytes>,
557        plan: SubmitPlan,
558    ) -> Result<SubmitResult, SubmitError> {
559        let legacy_mode = plan.legacy_mode();
560        let mut last_error = None;
561        let task_context = self.route_task_context();
562        for (route_idx, route) in plan.routes.iter().copied().enumerate() {
563            let next_idx = route_idx.saturating_add(1);
564            let has_later_routes = plan.routes.get(next_idx).is_some();
565            let direct_mode = if has_later_routes {
566                DirectExecutionMode::Fallback
567            } else {
568                DirectExecutionMode::Standalone
569            };
570            match submit_one_route_task(
571                route,
572                Arc::clone(&tx_bytes),
573                task_context.clone(),
574                direct_mode,
575            )
576            .await
577            {
578                Ok(outcome) => {
579                    self.record_route_outcome(signature, &plan, &outcome);
580                    if matches!(outcome.route, SubmitRoute::Direct) {
581                        self.spawn_agave_rebroadcast(
582                            Arc::clone(&tx_bytes),
583                            &self.direct_config.clone().normalized(),
584                        );
585                        if has_later_routes
586                            && self.direct_config.hybrid_rpc_broadcast
587                            && plan
588                                .routes
589                                .iter()
590                                .skip(next_idx)
591                                .any(|next| *next == SubmitRoute::Rpc)
592                        {
593                            self.spawn_background_rpc_broadcast(
594                                Arc::clone(&tx_bytes),
595                                signature,
596                                plan.clone(),
597                            );
598                        }
599                    }
600                    return Ok(SubmitResult {
601                        signature,
602                        plan,
603                        legacy_mode,
604                        first_success_route: Some(outcome.route),
605                        successful_routes: vec![outcome.route],
606                        direct_target: outcome.direct_target,
607                        rpc_signature: outcome.rpc_signature,
608                        jito_signature: outcome.jito_signature,
609                        jito_bundle_id: outcome.jito_bundle_id,
610                        used_fallback_route: route_idx > 0,
611                        selected_target_count: outcome.selected_target_count,
612                        selected_identity_count: outcome.selected_identity_count,
613                    });
614                }
615                Err(error) => last_error = Some(error),
616            }
617        }
618        Err(last_error.unwrap_or_else(|| SubmitError::InternalSync {
619            message: "ordered submit plan completed without a route outcome".to_owned(),
620        }))
621    }
622
    /// Executes every configured route at once and returns on the first successful route.
    ///
    /// One detached task is spawned per route; the first `Ok` received over the channel
    /// wins. Losing tasks are NOT cancelled — they run to completion in the background
    /// and still record their outcomes through the shared telemetry helpers.
    async fn submit_routes_all_at_once(
        &self,
        tx_bytes: Arc<[u8]>,
        signature: Option<SignatureBytes>,
        plan: SubmitPlan,
    ) -> Result<SubmitResult, SubmitError> {
        let legacy_mode = plan.legacy_mode();
        let task_context = self.route_task_context();
        // Clone everything the detached tasks capture up front so they are 'static.
        let direct_transport = self.direct_transport.clone();
        let leader_provider = self.leader_provider.clone();
        let backups = self.backups.clone();
        let policy = self.policy;
        let direct_config = self.direct_config.clone().normalized();
        let (result_tx, mut result_rx) = mpsc::unbounded_channel();
        for (route_idx, route) in plan.routes.iter().copied().enumerate() {
            let task_context = task_context.clone();
            let tx_bytes = Arc::clone(&tx_bytes);
            let result_tx = result_tx.clone();
            let telemetry = Arc::clone(&self.telemetry);
            let reporter = self.outcome_reporter.clone();
            let flow_safety_source = self.flow_safety_source.clone();
            let plan_for_task = plan.clone();
            let direct_transport = direct_transport.clone();
            let leader_provider = leader_provider.clone();
            let backups = backups.clone();
            let direct_config = direct_config.clone();
            tokio::spawn(async move {
                let result = submit_one_route_task(
                    route,
                    Arc::clone(&tx_bytes),
                    task_context,
                    DirectExecutionMode::Standalone,
                )
                .await;
                if let Ok(outcome) = &result {
                    record_route_outcome_shared(
                        &telemetry,
                        reporter.as_ref(),
                        flow_safety_source.as_ref(),
                        signature,
                        &plan_for_task,
                        outcome,
                    );
                    // A successful direct submit also schedules the agave rebroadcast worker,
                    // but only when rebroadcast is enabled with a non-zero window.
                    if matches!(outcome.route, SubmitRoute::Direct)
                        && direct_config.agave_rebroadcast_enabled
                        && !direct_config.agave_rebroadcast_window.is_zero()
                        && let Some(direct_transport) = direct_transport
                    {
                        spawn_agave_rebroadcast_task(
                            Arc::clone(&tx_bytes),
                            direct_transport,
                            leader_provider,
                            backups,
                            policy,
                            direct_config.clone(),
                        );
                    }
                }
                // Receiver may already be gone after a first success; ignore send failure.
                drop(result_tx.send((route_idx, result)));
            });
        }
        // Drop the original sender so the channel closes once all tasks finish.
        drop(result_tx);

        // Errors are indexed by route position so the reported error follows plan order,
        // not arrival order.
        let mut errors_by_route: Vec<Option<SubmitError>> = std::iter::repeat_with(|| None)
            .take(plan.routes.len())
            .collect();
        while let Some((route_idx, result)) = result_rx.recv().await {
            match result {
                Ok(outcome) => {
                    return Ok(SubmitResult {
                        signature,
                        plan,
                        legacy_mode,
                        first_success_route: Some(outcome.route),
                        successful_routes: vec![outcome.route],
                        direct_target: outcome.direct_target,
                        rpc_signature: outcome.rpc_signature,
                        jito_signature: outcome.jito_signature,
                        jito_bundle_id: outcome.jito_bundle_id,
                        used_fallback_route: false,
                        selected_target_count: outcome.selected_target_count,
                        selected_identity_count: outcome.selected_identity_count,
                    });
                }
                Err(error) => {
                    if let Some(slot) = errors_by_route.get_mut(route_idx) {
                        *slot = Some(error);
                    }
                }
            }
        }

        // No route succeeded: report the first error in plan order.
        Err(errors_by_route
            .into_iter()
            .flatten()
            .next()
            .unwrap_or_else(|| SubmitError::InternalSync {
                message: "all-at-once submit plan completed without a route outcome".to_owned(),
            }))
    }
724
725    /// Records one accepted route as a terminal telemetry outcome.
726    fn record_route_outcome(
727        &self,
728        signature: Option<SignatureBytes>,
729        plan: &SubmitPlan,
730        outcome: &RouteSubmitOutcome,
731    ) {
732        record_route_outcome_shared(
733            &self.telemetry,
734            self.outcome_reporter.as_ref(),
735            self.flow_safety_source.as_ref(),
736            signature,
737            plan,
738            outcome,
739        );
740    }
741
742    /// Clones the current route transports and config into one per-attempt task context.
743    fn route_task_context(&self) -> RouteTaskContext {
744        RouteTaskContext {
745            rpc_transport: self.rpc_transport.clone(),
746            jito_transport: self.jito_transport.clone(),
747            direct_transport: self.direct_transport.clone(),
748            leader_provider: self.leader_provider.clone(),
749            backups: Arc::from(self.backups.clone()),
750            policy: self.policy,
751            rpc_config: self.rpc_config.clone(),
752            jito_config: self.jito_config.clone(),
753            direct_config: self.direct_config.clone().normalized(),
754        }
755    }
756
757    /// Starts the post-ack rebroadcast worker when that reliability mode is enabled.
758    fn spawn_agave_rebroadcast(&self, tx_bytes: Arc<[u8]>, direct_config: &DirectSubmitConfig) {
759        if !direct_config.agave_rebroadcast_enabled
760            || direct_config.agave_rebroadcast_window.is_zero()
761        {
762            return;
763        }
764        let Some(direct_transport) = self.direct_transport.clone() else {
765            return;
766        };
767        spawn_agave_rebroadcast_task(
768            tx_bytes,
769            direct_transport,
770            self.leader_provider.clone(),
771            self.backups.clone(),
772            self.policy,
773            direct_config.clone(),
774        );
775    }
776
777    /// Starts one best-effort background RPC rebroadcast without delaying direct success.
778    fn spawn_background_rpc_broadcast(
779        &self,
780        tx_bytes: Arc<[u8]>,
781        signature: Option<SignatureBytes>,
782        plan: SubmitPlan,
783    ) {
784        let Some(rpc) = self.rpc_transport.clone() else {
785            return;
786        };
787        let rpc_config = self.rpc_config.clone();
788        let telemetry = Arc::clone(&self.telemetry);
789        let reporter = self.outcome_reporter.clone();
790        let flow_safety_source = self.flow_safety_source.clone();
791        tokio::spawn(async move {
792            if let Ok(rpc_signature) = rpc.submit_rpc(tx_bytes.as_ref(), &rpc_config).await {
793                let outcome = RouteSubmitOutcome {
794                    route: SubmitRoute::Rpc,
795                    direct_target: None,
796                    rpc_signature: Some(rpc_signature),
797                    jito_signature: None,
798                    jito_bundle_id: None,
799                    selected_target_count: 0,
800                    selected_identity_count: 0,
801                };
802                record_route_outcome_shared(
803                    &telemetry,
804                    reporter.as_ref(),
805                    flow_safety_source.as_ref(),
806                    signature,
807                    &plan,
808                    &outcome,
809                );
810            }
811        });
812    }
813}
814
/// Carries toxic-flow rejection metadata into the shared rejection helper.
#[derive(Debug, Clone)]
struct RejectionMetadata {
    /// Signature being evaluated, when already known at rejection time.
    signature: Option<SignatureBytes>,
    /// Route plan that was being attempted when the rejection fired.
    plan: SubmitPlan,
    /// Matching legacy preset when the plan is one exact compatibility shape.
    legacy_mode: Option<SubmitMode>,
    /// Flow-safety state version attached to the rejection, when any.
    state_version: Option<u64>,
    /// Age of the triggering opportunity in milliseconds, when tracked.
    opportunity_age_ms: Option<u64>,
}
829
/// Cloned submit transports and route config for one route execution task.
///
/// Built by `route_task_context` so each spawned attempt owns an immutable
/// snapshot and never races the client's live configuration.
#[derive(Clone)]
struct RouteTaskContext {
    /// RPC submit transport, when configured.
    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
    /// Jito submit transport, when configured.
    jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
    /// Direct submit transport, when configured.
    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
    /// Source of leader targets for the direct route.
    leader_provider: Arc<dyn LeaderProvider>,
    /// Backup targets configured alongside the leader provider.
    backups: Arc<[LeaderTarget]>,
    /// Route selection policy for direct submission.
    policy: RoutingPolicy,
    /// RPC submit tuning.
    rpc_config: RpcSubmitConfig,
    /// Jito submit tuning.
    jito_config: JitoSubmitConfig,
    /// Direct submit tuning (normalized at snapshot time).
    direct_config: DirectSubmitConfig,
}
852
/// Selects whether direct route execution is standalone or part of a fallback chain.
///
/// The mode picks which retry budget applies and how an empty target set is
/// handled (hard error when standalone, quiet exit when fallback).
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum DirectExecutionMode {
    /// Execute direct submission using the direct-only retry budget.
    Standalone,
    /// Execute direct submission using the hybrid fallback retry budget.
    Fallback,
}
861
/// One successful route execution before it is folded into the public submit result.
#[derive(Debug)]
struct RouteSubmitOutcome {
    /// Route that accepted the submission.
    route: SubmitRoute,
    /// Direct target that accepted the submission, when any.
    direct_target: Option<LeaderTarget>,
    /// RPC signature metadata for accepted RPC submission.
    rpc_signature: Option<String>,
    /// Jito transaction signature metadata when available.
    jito_signature: Option<String>,
    /// Jito bundle id metadata when available.
    jito_bundle_id: Option<String>,
    /// Number of direct targets considered for this attempt (0 for RPC/Jito).
    selected_target_count: usize,
    /// Number of unique identities represented by the direct targets (0 for RPC/Jito).
    selected_identity_count: usize,
}
880
881/// Records one structured outcome through built-in telemetry and any external reporter.
882fn record_external_outcome_shared(
883    telemetry: &Arc<TxToxicFlowTelemetry>,
884    reporter: Option<&OutcomeReporterHandle>,
885    outcome: &TxSubmitOutcome,
886) {
887    telemetry.record(outcome);
888    if let Some(reporter) = reporter {
889        reporter.dispatch(telemetry, outcome.clone());
890    }
891}
892
893/// Records one accepted route as a terminal telemetry outcome using shared sinks.
894fn record_route_outcome_shared(
895    telemetry: &Arc<TxToxicFlowTelemetry>,
896    reporter: Option<&OutcomeReporterHandle>,
897    flow_safety_source: Option<&Arc<dyn TxFlowSafetySource>>,
898    signature: Option<SignatureBytes>,
899    plan: &SubmitPlan,
900    outcome: &RouteSubmitOutcome,
901) {
902    let kind = match outcome.route {
903        SubmitRoute::Rpc => TxSubmitOutcomeKind::RpcAccepted,
904        SubmitRoute::Jito => TxSubmitOutcomeKind::JitoAccepted,
905        SubmitRoute::Direct => TxSubmitOutcomeKind::DirectAccepted,
906    };
907    let outcome = TxSubmitOutcome {
908        kind,
909        signature,
910        route: Some(outcome.route),
911        plan: plan.clone(),
912        legacy_mode: plan.legacy_mode(),
913        rpc_signature: outcome.rpc_signature.clone(),
914        jito_signature: outcome.jito_signature.clone(),
915        jito_bundle_id: outcome.jito_bundle_id.clone(),
916        state_version: flow_safety_source
917            .and_then(|source| source.toxic_flow_snapshot().current_state_version),
918        opportunity_age_ms: None,
919    };
920    record_external_outcome_shared(telemetry, reporter, &outcome);
921}
922
/// Outcome reporter lifecycle state stored by one client.
#[derive(Clone)]
enum OutcomeReporterHandle {
    /// Ready shared dispatcher for this reporter instance.
    Ready(Arc<OutcomeReporterDispatcher>),
    /// Reporter worker could not be created; outcomes are counted as dropped.
    Unavailable,
}
931
932impl OutcomeReporterHandle {
933    /// Creates one handle for the provided reporter.
934    fn new(reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
935        match OutcomeReporterDispatcher::shared(reporter) {
936            Ok(dispatcher) => Self::Ready(dispatcher),
937            Err(error) => {
938                eprintln!("sof-tx: failed to start external outcome reporter worker: {error}");
939                Self::Unavailable
940            }
941        }
942    }
943
944    /// Dispatches one outcome without extending the submit hot path.
945    fn dispatch(&self, telemetry: &Arc<TxToxicFlowTelemetry>, outcome: TxSubmitOutcome) {
946        match self {
947            Self::Ready(dispatcher) => match dispatcher.dispatch(outcome) {
948                ReporterDispatchStatus::Enqueued => {}
949                ReporterDispatchStatus::DroppedFull => telemetry.record_reporter_drop(),
950                ReporterDispatchStatus::Unavailable => telemetry.record_reporter_unavailable(),
951            },
952            Self::Unavailable => telemetry.record_reporter_unavailable(),
953        }
954    }
955}
956
/// Shared worker state for one concrete reporter instance.
struct OutcomeReporterDispatcher {
    /// Bounded FIFO channel to the reporter worker; `try_send` keeps the
    /// submit path non-blocking.
    tx: SyncSender<TxSubmitOutcome>,
    /// Ensures queue saturation is surfaced without spamming stderr.
    queue_full_warned: AtomicBool,
    /// Ensures worker disconnect is surfaced without spamming stderr.
    unavailable_warned: AtomicBool,
}
966
impl OutcomeReporterDispatcher {
    /// Maximum number of pending outcomes kept for the external reporter.
    #[cfg(not(test))]
    const QUEUE_CAPACITY: usize = 1024;
    /// Small test-only capacity so queue-full behavior is easy to exercise.
    #[cfg(test)]
    const QUEUE_CAPACITY: usize = 8;

    /// Creates one shared dispatcher and worker thread.
    ///
    /// Dispatchers are deduplicated per reporter instance through a global
    /// registry of weak references, so repeated clients built around the same
    /// reporter `Arc` reuse one worker thread.
    ///
    /// # Errors
    /// Returns the I/O error from `thread::Builder::spawn` when the worker
    /// thread cannot be created.
    fn shared(reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Result<Arc<Self>, std::io::Error> {
        let key = reporter_identity(&reporter);
        let registry = outcome_reporter_registry();
        // A poisoned lock still holds usable data here, so recover the guard
        // instead of propagating the poison.
        let mut registry = registry
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        // Evict entries whose dispatchers have already been dropped.
        registry.retain(|_key, dispatcher| dispatcher.strong_count() > 0);
        if let Some(existing) = registry.get(&key).and_then(Weak::upgrade) {
            return Ok(existing);
        }

        let (tx, rx) = std_mpsc::sync_channel::<TxSubmitOutcome>(Self::QUEUE_CAPACITY);
        thread::Builder::new()
            .name("sof-tx-outcome-reporter".to_owned())
            .spawn(move || {
                // The worker exits once every sender (dispatcher) is dropped.
                while let Ok(outcome) = rx.recv() {
                    // A panicking reporter must not kill the worker thread.
                    drop(catch_unwind(AssertUnwindSafe(|| {
                        reporter.record_outcome(&outcome);
                    })));
                }
            })?;

        let dispatcher = Arc::new(Self {
            tx,
            queue_full_warned: AtomicBool::new(false),
            unavailable_warned: AtomicBool::new(false),
        });
        // Store a weak handle so the registry never keeps a dispatcher alive.
        let _ = registry.insert(key, Arc::downgrade(&dispatcher));
        Ok(dispatcher)
    }

    /// Enqueues one outcome without blocking the submit caller.
    fn dispatch(&self, outcome: TxSubmitOutcome) -> ReporterDispatchStatus {
        match self.tx.try_send(outcome) {
            Ok(()) => {
                // Re-arm the queue-full warning once the queue accepts again.
                self.queue_full_warned.store(false, Ordering::Relaxed);
                ReporterDispatchStatus::Enqueued
            }
            Err(TrySendError::Disconnected(_)) => {
                // Warn only on the first observation to avoid stderr spam.
                if !self.unavailable_warned.swap(true, Ordering::Relaxed) {
                    eprintln!(
                        "sof-tx: external outcome reporter worker stopped; dropping reporter outcomes"
                    );
                }
                ReporterDispatchStatus::Unavailable
            }
            Err(TrySendError::Full(_)) => {
                // Warn only on the first observation to avoid stderr spam.
                if !self.queue_full_warned.swap(true, Ordering::Relaxed) {
                    eprintln!(
                        "sof-tx: external outcome reporter queue is full; dropping reporter outcomes until it drains"
                    );
                }
                ReporterDispatchStatus::DroppedFull
            }
        }
    }
}
1032
/// Result of one best-effort reporter dispatch attempt.
enum ReporterDispatchStatus {
    /// Outcome was queued successfully.
    Enqueued,
    /// Outcome was dropped because the bounded queue was full.
    DroppedFull,
    /// Outcome could not be queued because the reporter worker was unavailable.
    Unavailable,
}
1042
/// Shared registry of per-reporter dispatchers, keyed by the reporter's
/// `Arc` data-pointer address (see `reporter_identity`).
static OUTCOME_REPORTER_REGISTRY: OnceLock<Mutex<HashMap<usize, Weak<OutcomeReporterDispatcher>>>> =
    OnceLock::new();

/// Returns the shared dispatcher registry, initializing it on first use.
fn outcome_reporter_registry() -> &'static Mutex<HashMap<usize, Weak<OutcomeReporterDispatcher>>> {
    OUTCOME_REPORTER_REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
}
1051
1052/// Stable identity key for one reporter instance.
1053fn reporter_identity(reporter: &Arc<dyn TxSubmitOutcomeReporter>) -> usize {
1054    Arc::as_ptr(reporter) as *const () as usize
1055}
1056
1057/// Extracts the first transaction signature from serialized transaction bytes.
1058fn extract_first_signature(tx_bytes: &[u8]) -> Result<Option<SignatureBytes>, SubmitError> {
1059    let Some((signature_count, offset)) = decode_short_u16_len_prefix(tx_bytes) else {
1060        return Err(decode_signed_bytes_error(
1061            "transaction bytes did not contain a valid signature vector prefix",
1062        ));
1063    };
1064    if signature_count == 0 {
1065        return Ok(None);
1066    }
1067    let signature_end = offset.saturating_add(64);
1068    let Some(signature_bytes) = tx_bytes.get(offset..signature_end) else {
1069        return Err(decode_signed_bytes_error(
1070            "transaction bytes ended before the first signature completed",
1071        ));
1072    };
1073    let mut signature = [0_u8; 64];
1074    signature.copy_from_slice(signature_bytes);
1075    Ok(Some(SignatureBytes::new(signature)))
1076}
1077
1078/// Builds one signed-byte decode error from a static message.
1079fn decode_signed_bytes_error(message: &'static str) -> SubmitError {
1080    SubmitError::DecodeSignedBytes {
1081        source: Box::new(bincode::ErrorKind::Custom(message.to_owned())),
1082    }
1083}
1084
/// Executes one concrete submit route with the cloned task context for that attempt.
///
/// RPC and Jito are single-shot submits; the direct route runs a bounded retry
/// loop whose budget and empty-target handling depend on `direct_mode`.
async fn submit_one_route_task(
    route: SubmitRoute,
    tx_bytes: Arc<[u8]>,
    task_context: RouteTaskContext,
    direct_mode: DirectExecutionMode,
) -> Result<RouteSubmitOutcome, SubmitError> {
    match route {
        SubmitRoute::Rpc => {
            let rpc = task_context
                .rpc_transport
                .ok_or(SubmitError::MissingRpcTransport)?;
            let rpc_signature = rpc
                .submit_rpc(tx_bytes.as_ref(), &task_context.rpc_config)
                .await
                .map_err(|source| SubmitError::Rpc { source })?;
            Ok(RouteSubmitOutcome {
                route,
                direct_target: None,
                rpc_signature: Some(rpc_signature),
                jito_signature: None,
                jito_bundle_id: None,
                // Target counts only apply to the direct route.
                selected_target_count: 0,
                selected_identity_count: 0,
            })
        }
        SubmitRoute::Jito => {
            let jito = task_context
                .jito_transport
                .ok_or(SubmitError::MissingJitoTransport)?;
            let response = jito
                .submit_jito(tx_bytes.as_ref(), &task_context.jito_config)
                .await
                .map_err(|source| SubmitError::Jito { source })?;
            Ok(RouteSubmitOutcome {
                route,
                direct_target: None,
                rpc_signature: None,
                jito_signature: response.transaction_signature,
                jito_bundle_id: response.bundle_id,
                selected_target_count: 0,
                selected_identity_count: 0,
            })
        }
        SubmitRoute::Direct => {
            let direct = task_context
                .direct_transport
                .ok_or(SubmitError::MissingDirectTransport)?;
            let attempt_timeout = direct_attempt_timeout(&task_context.direct_config);
            // The retry budget depends on whether direct runs standalone or
            // as the fallback leg of a hybrid plan.
            let attempt_count = match direct_mode {
                DirectExecutionMode::Standalone => {
                    task_context.direct_config.direct_submit_attempts
                }
                DirectExecutionMode::Fallback => task_context.direct_config.hybrid_direct_attempts,
            };
            let mut last_error = None;
            for attempt_idx in 0..attempt_count {
                // Re-select targets each attempt so leader rotation is tracked.
                let mut targets = select_and_rank_targets(
                    task_context.leader_provider.as_ref(),
                    task_context.backups.as_ref(),
                    task_context.policy,
                    &task_context.direct_config,
                )
                .await;
                rotate_targets_for_attempt(&mut targets, attempt_idx, task_context.policy);
                let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
                if targets.is_empty() {
                    // Fallback mode exits quietly on an empty target set;
                    // standalone mode surfaces it as a hard error.
                    if matches!(direct_mode, DirectExecutionMode::Fallback) {
                        break;
                    }
                    return Err(SubmitError::NoDirectTargets);
                }
                match timeout(
                    attempt_timeout,
                    direct.submit_direct(
                        tx_bytes.as_ref(),
                        &targets,
                        task_context.policy,
                        &task_context.direct_config,
                    ),
                )
                .await
                {
                    Ok(Ok(target)) => {
                        return Ok(RouteSubmitOutcome {
                            route,
                            direct_target: Some(target),
                            rpc_signature: None,
                            jito_signature: None,
                            jito_bundle_id: None,
                            selected_target_count,
                            selected_identity_count,
                        });
                    }
                    Ok(Err(source)) => last_error = Some(source),
                    // Fold a timeout into the same failure channel so the next
                    // attempt can proceed.
                    Err(_elapsed) => {
                        last_error = Some(SubmitTransportError::Failure {
                            message: format!(
                                "direct submit attempt timed out after {}ms",
                                attempt_timeout.as_millis()
                            ),
                        });
                    }
                }
                // Pause between attempts, but not after the final one.
                if attempt_idx < attempt_count.saturating_sub(1) {
                    sleep(task_context.direct_config.rebroadcast_interval).await;
                }
            }
            Err(SubmitError::Direct {
                source: last_error.unwrap_or_else(|| SubmitTransportError::Failure {
                    message: "direct submit attempts exhausted".to_owned(),
                }),
            })
        }
    }
}
1201
#[cfg(not(test))]
/// Replays successful direct submissions for a bounded Agave-like persistence window.
fn spawn_agave_rebroadcast_task(
    tx_bytes: Arc<[u8]>,
    direct_transport: Arc<dyn DirectSubmitTransport>,
    leader_provider: Arc<dyn LeaderProvider>,
    backups: Vec<LeaderTarget>,
    policy: RoutingPolicy,
    direct_config: DirectSubmitConfig,
) {
    tokio::spawn(async move {
        // An overflowing window degenerates to "now", ending immediately.
        let deadline = Instant::now()
            .checked_add(direct_config.agave_rebroadcast_window)
            .unwrap_or_else(Instant::now);
        loop {
            let now = Instant::now();
            if now >= deadline {
                break;
            }

            // Sleep the rebroadcast interval, clamped to the time remaining.
            let sleep_for = deadline
                .saturating_duration_since(now)
                .min(direct_config.agave_rebroadcast_interval);
            if !sleep_for.is_zero() {
                sleep(sleep_for).await;
            }

            // Re-check after sleeping; the window may have closed meanwhile.
            if Instant::now() >= deadline {
                break;
            }

            // Re-select targets each round so leader rotation is tracked.
            let targets = select_and_rank_targets(
                leader_provider.as_ref(),
                backups.as_slice(),
                policy,
                &direct_config,
            )
            .await;
            if targets.is_empty() {
                continue;
            }

            // Best effort: results and timeouts are intentionally ignored.
            drop(
                timeout(
                    direct_attempt_timeout(&direct_config),
                    direct_transport.submit_direct(
                        tx_bytes.as_ref(),
                        &targets,
                        policy,
                        &direct_config,
                    ),
                )
                .await,
            );
        }
    });
}
1259
#[cfg(test)]
/// Test-only stub that disables background rebroadcasting for deterministic assertions.
fn spawn_agave_rebroadcast_task(
    _tx_bytes: Arc<[u8]>,
    _direct_transport: Arc<dyn DirectSubmitTransport>,
    _leader_provider: Arc<dyn LeaderProvider>,
    _backups: Vec<LeaderTarget>,
    _policy: RoutingPolicy,
    _direct_config: DirectSubmitConfig,
) {
    // Intentionally empty: tests must not spawn background submit traffic.
}
1271
1272/// Selects routing targets and applies optional latency-aware ranking.
1273async fn select_and_rank_targets(
1274    leader_provider: &(impl LeaderProvider + ?Sized),
1275    backups: &[LeaderTarget],
1276    policy: RoutingPolicy,
1277    direct_config: &DirectSubmitConfig,
1278) -> Vec<LeaderTarget> {
1279    let targets = select_targets(leader_provider, backups, policy);
1280    rank_targets_by_latency(targets, direct_config).await
1281}
1282
1283/// Reorders the probe set by observed TCP connect latency while preserving the tail order.
1284async fn rank_targets_by_latency(
1285    targets: Vec<LeaderTarget>,
1286    direct_config: &DirectSubmitConfig,
1287) -> Vec<LeaderTarget> {
1288    if targets.len() <= 1 || !direct_config.latency_aware_targeting {
1289        return targets;
1290    }
1291
1292    let probe_timeout = direct_config.latency_probe_timeout;
1293    let probe_port = direct_config.latency_probe_port;
1294    let probe_count = targets
1295        .len()
1296        .min(direct_config.latency_probe_max_targets.max(1));
1297    let mut latencies = vec![None; probe_count];
1298    let mut probes = JoinSet::new();
1299    for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
1300        probes.spawn(async move {
1301            (
1302                idx,
1303                probe_target_latency(&target, probe_port, probe_timeout).await,
1304            )
1305        });
1306    }
1307    while let Some(result) = probes.join_next().await {
1308        if let Ok((idx, latency)) = result
1309            && idx < latencies.len()
1310            && let Some(slot) = latencies.get_mut(idx)
1311        {
1312            *slot = latency;
1313        }
1314    }
1315
1316    let mut ranked = targets
1317        .iter()
1318        .take(probe_count)
1319        .cloned()
1320        .enumerate()
1321        .collect::<Vec<_>>();
1322    ranked.sort_by_key(|(idx, _target)| {
1323        (
1324            latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
1325            *idx,
1326        )
1327    });
1328
1329    let mut output = ranked
1330        .into_iter()
1331        .map(|(_idx, target)| target)
1332        .collect::<Vec<_>>();
1333    output.extend(targets.iter().skip(probe_count).cloned());
1334    output
1335}
1336
1337/// Probes a target's candidate ports and keeps the best observed connect latency.
1338async fn probe_target_latency(
1339    target: &LeaderTarget,
1340    probe_port: Option<u16>,
1341    probe_timeout: Duration,
1342) -> Option<u128> {
1343    let mut ports = vec![target.tpu_addr.port()];
1344    if let Some(port) = probe_port
1345        && port != target.tpu_addr.port()
1346    {
1347        ports.push(port);
1348    }
1349
1350    let ip = target.tpu_addr.ip();
1351    let mut best = None::<u128>;
1352    for port in ports {
1353        if let Some(latency) = probe_tcp_latency(ip, port, probe_timeout).await {
1354            best = Some(best.map_or(latency, |current| current.min(latency)));
1355        }
1356    }
1357    best
1358}
1359
1360/// Measures one TCP connect attempt and returns elapsed milliseconds on success.
1361async fn probe_tcp_latency(
1362    ip: std::net::IpAddr,
1363    port: u16,
1364    timeout_duration: Duration,
1365) -> Option<u128> {
1366    let start = Instant::now();
1367    let addr = SocketAddr::new(ip, port);
1368    let stream = timeout(timeout_duration, TcpStream::connect(addr))
1369        .await
1370        .ok()?
1371        .ok()?;
1372    drop(stream);
1373    Some(start.elapsed().as_millis())
1374}
1375
1376/// Summarizes the selected target list for observability.
1377fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
1378    let selected_target_count = targets.len();
1379    let selected_identity_count = targets
1380        .iter()
1381        .filter_map(|target| target.identity)
1382        .collect::<HashSet<_>>()
1383        .len();
1384    (selected_target_count, selected_identity_count)
1385}
1386
1387/// Rotates the target ordering between attempts to spread retries across candidates.
1388fn rotate_targets_for_attempt(
1389    targets: &mut [LeaderTarget],
1390    attempt_idx: usize,
1391    policy: RoutingPolicy,
1392) {
1393    if attempt_idx == 0 || targets.len() <= 1 {
1394        return;
1395    }
1396
1397    let normalized = policy.normalized();
1398    let stride = normalized.max_parallel_sends.max(1);
1399    let rotation = attempt_idx
1400        .saturating_mul(stride)
1401        .checked_rem(targets.len())
1402        .unwrap_or(0);
1403    if rotation > 0 {
1404        targets.rotate_left(rotation);
1405    }
1406}
1407
1408/// Bounds one submit attempt so retry loops cannot hang indefinitely.
1409fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
1410    direct_config
1411        .global_timeout
1412        .saturating_add(direct_config.per_target_timeout)
1413        .saturating_add(direct_config.rebroadcast_interval)
1414        .max(Duration::from_secs(8))
1415}
1416
#[cfg(test)]
#[allow(clippy::panic)]
mod tests {
    use super::*;

    /// Reporter that ignores every outcome; only dispatcher reuse is under test.
    #[derive(Debug)]
    struct NoopOutcomeReporter;

    impl TxSubmitOutcomeReporter for NoopOutcomeReporter {
        fn record_outcome(&self, _outcome: &TxSubmitOutcome) {}
    }

    #[test]
    fn reporter_dispatcher_reuses_existing_instance() {
        // Two `shared` calls with clones of the same reporter `Arc` must yield
        // the same dispatcher via the global registry.
        let reporter: Arc<dyn TxSubmitOutcomeReporter> = Arc::new(NoopOutcomeReporter);
        let first = match OutcomeReporterDispatcher::shared(Arc::clone(&reporter)) {
            Ok(dispatcher) => dispatcher,
            Err(error) => panic!("first dispatcher failed: {error}"),
        };
        let second = match OutcomeReporterDispatcher::shared(reporter) {
            Ok(dispatcher) => dispatcher,
            Err(error) => panic!("second dispatcher failed: {error}"),
        };

        assert!(Arc::ptr_eq(&first, &second));
    }
}