// sof_tx/submit/client.rs
1//! Submission client implementation and mode orchestration.
2
3use std::{
4    collections::HashMap,
5    collections::HashSet,
6    net::SocketAddr,
7    panic::{AssertUnwindSafe, catch_unwind},
8    sync::{
9        Arc, Mutex, OnceLock, Weak,
10        atomic::{AtomicBool, Ordering},
11        mpsc::{self as std_mpsc, SyncSender, TrySendError},
12    },
13    thread,
14    time::{Duration, Instant, SystemTime},
15};
16
17use sof_types::SignatureBytes;
18use tokio::{
19    net::TcpStream,
20    sync::mpsc,
21    task::JoinSet,
22    time::{sleep, timeout},
23};
24
25use super::{
26    DirectSubmitConfig, DirectSubmitTransport, JitoSubmitConfig, JitoSubmitTransport,
27    RpcSubmitConfig, RpcSubmitTransport, SignedTx, SubmitError, SubmitMode, SubmitPlan,
28    SubmitReliability, SubmitResult, SubmitRoute, SubmitStrategy, SubmitTransportError,
29    TxFlowSafetyQuality, TxFlowSafetySource, TxSubmitClientBuilder, TxSubmitContext,
30    TxSubmitGuardPolicy, TxSubmitOutcome, TxSubmitOutcomeKind, TxSubmitOutcomeReporter,
31    TxToxicFlowRejectionReason, TxToxicFlowTelemetry, TxToxicFlowTelemetrySnapshot,
32};
33use crate::{
34    providers::{
35        LeaderProvider, LeaderTarget, RecentBlockhashProvider, RpcRecentBlockhashProvider,
36        StaticLeaderProvider,
37    },
38    routing::{RoutingPolicy, SignatureDeduper, select_targets},
39    submit::{JsonRpcTransport, types::TxSuppressionCache},
40};
41
/// Transaction submission client that orchestrates RPC and direct submit modes.
///
/// Holds every transport, routing policy, and toxic-flow guard state shared by
/// the submit paths. Construct via [`TxSubmitClient::builder`] or
/// [`TxSubmitClient::new`] and layer configuration with the `with_*` methods.
pub struct TxSubmitClient {
    /// Blockhash source used by unsigned submit path.
    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
    /// Optional RPC-backed blockhash source refreshed on demand before unsigned submit.
    on_demand_blockhash_provider: Option<Arc<RpcRecentBlockhashProvider>>,
    /// Leader source used by direct/hybrid paths.
    leader_provider: Arc<dyn LeaderProvider>,
    /// Optional backup validator targets appended to leader routing.
    backups: Vec<LeaderTarget>,
    /// Direct routing policy (normalized on assignment in `with_routing_policy`).
    policy: RoutingPolicy,
    /// Signature dedupe window; duplicate signatures inside the TTL are rejected.
    deduper: SignatureDeduper,
    /// Optional RPC transport; `SubmitRoute::Rpc` requires it.
    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
    /// Optional direct transport; `SubmitRoute::Direct` requires it.
    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
    /// Optional Jito transport; `SubmitRoute::Jito` requires it.
    jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
    /// RPC tuning.
    rpc_config: RpcSubmitConfig,
    /// Jito tuning.
    jito_config: JitoSubmitConfig,
    /// Direct tuning (normalized before use by route tasks).
    direct_config: DirectSubmitConfig,
    /// Optional toxic-flow guard source consulted before each submit.
    flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
    /// Guard policy applied before submit.
    guard_policy: TxSubmitGuardPolicy,
    /// Built-in suppression keys checked and populated per submit attempt.
    suppression: TxSuppressionCache,
    /// Built-in toxic-flow telemetry sink.
    telemetry: Arc<TxToxicFlowTelemetry>,
    /// Optional external outcome reporter handle.
    outcome_reporter: Option<OutcomeReporterHandle>,
}
79
impl TxSubmitClient {
    /// Creates a high-level builder for common submit configurations.
    #[must_use]
    pub fn builder() -> TxSubmitClientBuilder {
        TxSubmitClientBuilder::new()
    }

    /// Creates a submission client with no transports preconfigured.
    ///
    /// All transports start unset, tuning configs take their defaults, the
    /// dedupe TTL is 10 seconds, and telemetry uses the shared
    /// [`TxToxicFlowTelemetry`] sink.
    #[must_use]
    pub fn new(
        blockhash_provider: Arc<dyn RecentBlockhashProvider>,
        leader_provider: Arc<dyn LeaderProvider>,
    ) -> Self {
        Self {
            blockhash_provider,
            on_demand_blockhash_provider: None,
            leader_provider,
            backups: Vec::new(),
            policy: RoutingPolicy::default(),
            // Reject duplicate signatures seen within the last 10 seconds.
            deduper: SignatureDeduper::new(Duration::from_secs(10)),
            rpc_transport: None,
            direct_transport: None,
            jito_transport: None,
            rpc_config: RpcSubmitConfig::default(),
            jito_config: JitoSubmitConfig::default(),
            direct_config: DirectSubmitConfig::default(),
            flow_safety_source: None,
            guard_policy: TxSubmitGuardPolicy::default(),
            suppression: TxSuppressionCache::default(),
            telemetry: TxToxicFlowTelemetry::shared(),
            outcome_reporter: None,
        }
    }
113
114    /// Creates a client with an empty leader source for blockhash-only submit paths.
115    #[must_use]
116    pub fn blockhash_only(blockhash_provider: Arc<dyn RecentBlockhashProvider>) -> Self {
117        Self::new(
118            blockhash_provider,
119            Arc::new(StaticLeaderProvider::default()),
120        )
121    }
122
123    /// Creates a client with RPC-backed on-demand blockhash sourcing and no leader routing.
124    ///
125    /// # Errors
126    ///
127    /// Returns [`SubmitTransportError`] when the RPC-backed blockhash provider cannot be created.
128    pub fn blockhash_via_rpc(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
129        let blockhash_provider = Arc::new(RpcRecentBlockhashProvider::new(rpc_url.into())?);
130        Ok(Self::blockhash_only(blockhash_provider.clone())
131            .with_rpc_blockhash_provider(blockhash_provider))
132    }
133
134    /// Creates an RPC-only client from one RPC URL used for both blockhash and submission.
135    ///
136    /// # Errors
137    ///
138    /// Returns [`SubmitTransportError`] when the RPC transport or blockhash provider
139    /// cannot be initialized.
140    pub fn rpc_only(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
141        let rpc_url = rpc_url.into();
142        let client = Self::blockhash_via_rpc(rpc_url.clone())?;
143        let rpc_transport = Arc::new(JsonRpcTransport::new(rpc_url)?);
144        Ok(client.with_rpc_transport(rpc_transport))
145    }
146
    /// Sets optional backup validators appended to leader-derived targets.
    #[must_use]
    pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
        self.backups = backups;
        self
    }

    /// Sets routing policy; the policy is normalized before being stored.
    #[must_use]
    pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
        self.policy = policy.normalized();
        self
    }

    /// Sets dedupe TTL, replacing the deduper (and its window) entirely.
    #[must_use]
    pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
        self.deduper = SignatureDeduper::new(ttl);
        self
    }

    /// Sets RPC transport, enabling `SubmitRoute::Rpc`.
    #[must_use]
    pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
        self.rpc_transport = Some(transport);
        self
    }

    /// Sets direct transport, enabling `SubmitRoute::Direct`.
    #[must_use]
    pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
        self.direct_transport = Some(transport);
        self
    }

    /// Sets Jito transport, enabling `SubmitRoute::Jito`.
    #[must_use]
    pub fn with_jito_transport(mut self, transport: Arc<dyn JitoSubmitTransport>) -> Self {
        self.jito_transport = Some(transport);
        self
    }

    /// Sets RPC submit tuning.
    #[must_use]
    pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
        self.rpc_config = config;
        self
    }

    /// Sets Jito submit tuning.
    #[must_use]
    pub const fn with_jito_config(mut self, config: JitoSubmitConfig) -> Self {
        self.jito_config = config;
        self
    }

    /// Sets direct submit tuning; the config is normalized before being stored.
    #[must_use]
    pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
        self.direct_config = config.normalized();
        self
    }

    /// Sets direct/hybrid reliability profile.
    ///
    /// NOTE: this replaces the whole direct config with the profile-derived
    /// one, discarding any earlier `with_direct_config` customization.
    #[must_use]
    pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
        self.direct_config = DirectSubmitConfig::from_reliability(reliability);
        self
    }

    /// Sets the toxic-flow guard source used before submission.
    #[must_use]
    pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
        self.flow_safety_source = Some(source);
        self
    }

    /// Sets the toxic-flow guard policy.
    #[must_use]
    pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
        self.guard_policy = policy;
        self
    }

    /// Sets an optional external outcome reporter, wrapped in its handle type.
    #[must_use]
    pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
        self.outcome_reporter = Some(OutcomeReporterHandle::new(reporter));
        self
    }

    /// Registers an RPC-backed blockhash provider to refresh on demand for unsigned submit paths.
    #[must_use]
    pub fn with_rpc_blockhash_provider(
        mut self,
        provider: Arc<RpcRecentBlockhashProvider>,
    ) -> Self {
        self.on_demand_blockhash_provider = Some(provider);
        self
    }
247
    /// Returns the current built-in toxic-flow telemetry snapshot.
    #[must_use]
    pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
        self.telemetry.snapshot()
    }

    /// Records one external terminal outcome against the built-in telemetry and optional reporter.
    ///
    /// Delegates to the shared helper so in-process and external outcomes flow
    /// through the same recording path.
    pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
        record_external_outcome_shared(&self.telemetry, self.outcome_reporter.as_ref(), outcome);
    }
258
    /// Submits externally signed transaction bytes.
    ///
    /// Convenience wrapper: uses a default (empty) [`TxSubmitContext`].
    ///
    /// # Errors
    ///
    /// Returns [`SubmitError`] when decoding, dedupe, routing, or submission fails.
    pub async fn submit_signed(
        &mut self,
        signed_tx: SignedTx,
        mode: SubmitMode,
    ) -> Result<SubmitResult, SubmitError> {
        self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
            .await
    }

    /// Submits externally signed transaction bytes through one route plan.
    ///
    /// Convenience wrapper: uses a default (empty) [`TxSubmitContext`].
    ///
    /// # Errors
    ///
    /// Returns [`SubmitError`] when decoding, dedupe, routing, or submission fails.
    pub async fn submit_signed_via(
        &mut self,
        signed_tx: SignedTx,
        plan: SubmitPlan,
    ) -> Result<SubmitResult, SubmitError> {
        self.submit_signed_with_context_via(signed_tx, plan, TxSubmitContext::default())
            .await
    }

    /// Submits externally signed transaction bytes with explicit toxic-flow context.
    ///
    /// The legacy [`SubmitMode`] is converted to its equivalent [`SubmitPlan`].
    ///
    /// # Errors
    ///
    /// Returns [`SubmitError`] when decoding, dedupe, toxic-flow guards, routing, or submission
    /// fails.
    pub async fn submit_signed_with_context(
        &mut self,
        signed_tx: SignedTx,
        mode: SubmitMode,
        context: TxSubmitContext,
    ) -> Result<SubmitResult, SubmitError> {
        self.submit_signed_with_context_via(signed_tx, SubmitPlan::from(mode), context)
            .await
    }
302
303    /// Submits externally signed transaction bytes with explicit toxic-flow context and route
304    /// plan.
305    ///
306    /// # Errors
307    ///
308    /// Returns [`SubmitError`] when decoding, dedupe, toxic-flow guards, routing, or submission
309    /// fails.
310    pub async fn submit_signed_with_context_via(
311        &mut self,
312        signed_tx: SignedTx,
313        plan: SubmitPlan,
314        context: TxSubmitContext,
315    ) -> Result<SubmitResult, SubmitError> {
316        let tx_bytes = match signed_tx {
317            SignedTx::VersionedTransactionBytes(bytes) => bytes,
318            SignedTx::WireTransactionBytes(bytes) => bytes,
319        };
320        let signature = extract_first_signature(&tx_bytes)?;
321        self.submit_bytes(tx_bytes, signature, plan, context).await
322    }
323
324    /// Refreshes any configured on-demand RPC blockhash source and returns the latest bytes.
325    ///
326    /// This is intended for explicit compatibility layers that build Solana-native transactions
327    /// outside the core byte-oriented `sof-tx` API surface.
328    ///
329    /// # Errors
330    ///
331    /// Returns [`SubmitTransportError`] when the RPC-backed blockhash refresh fails.
332    pub async fn refresh_latest_blockhash_bytes(
333        &self,
334    ) -> Result<Option<[u8; 32]>, SubmitTransportError> {
335        if let Some(provider) = &self.on_demand_blockhash_provider {
336            let _ = provider.refresh().await?;
337        }
338        Ok(self.latest_blockhash_bytes())
339    }
340
341    /// Returns the latest cached recent blockhash bytes when available.
342    #[must_use]
343    pub fn latest_blockhash_bytes(&self) -> Option<[u8; 32]> {
344        self.blockhash_provider.latest_blockhash()
345    }
346
347    /// Submits raw tx bytes after dedupe check.
348    async fn submit_bytes(
349        &mut self,
350        tx_bytes: Vec<u8>,
351        signature: Option<SignatureBytes>,
352        plan: SubmitPlan,
353        context: TxSubmitContext,
354    ) -> Result<SubmitResult, SubmitError> {
355        let plan = plan.into_normalized();
356        self.validate_submit_plan(&plan)?;
357        self.enforce_toxic_flow_guards(signature, &plan, &context)?;
358        self.enforce_dedupe(signature)?;
359        let tx_bytes = Arc::<[u8]>::from(tx_bytes);
360        match plan.strategy {
361            SubmitStrategy::OrderedFallback => {
362                self.submit_routes_in_order(tx_bytes, signature, plan).await
363            }
364            SubmitStrategy::AllAtOnce => {
365                self.submit_routes_all_at_once(tx_bytes, signature, plan)
366                    .await
367            }
368        }
369    }
370
371    /// Applies signature dedupe policy.
372    fn enforce_dedupe(&mut self, signature: Option<SignatureBytes>) -> Result<(), SubmitError> {
373        if let Some(signature) = signature {
374            let now = Instant::now();
375            if !self.deduper.check_and_insert(signature, now) {
376                return Err(SubmitError::DuplicateSignature);
377            }
378        }
379        Ok(())
380    }
381
    /// Applies toxic-flow guard policy before transport.
    ///
    /// Checks run in a fixed order: opportunity staleness, suppression-cache
    /// hit, then — only when a flow-safety source is configured —
    /// replay-recovery pending, control-plane quality, and state-version
    /// drift. The first failing check returns a [`SubmitError::ToxicFlow`]
    /// built through [`Self::reject_with_outcome`] (which also records
    /// telemetry). Only after every check passes are the context's
    /// suppression keys inserted, so one accepted submission suppresses
    /// identical follow-ups within the configured TTL.
    fn enforce_toxic_flow_guards(
        &mut self,
        signature: Option<SignatureBytes>,
        plan: &SubmitPlan,
        context: &TxSubmitContext,
    ) -> Result<(), SubmitError> {
        let legacy_mode = plan.legacy_mode();
        let now = SystemTime::now();
        // Age of the triggering opportunity in ms; `None` when untracked or
        // when the wall clock moved backwards (duration_since errs).
        let opportunity_age_ms = context
            .opportunity_created_at
            .and_then(|created_at| now.duration_since(created_at).ok())
            .map(|duration| duration.as_millis().min(u128::from(u64::MAX)) as u64);
        // Guard 1: opportunity staleness (only when both age and limit exist).
        if let Some(age_ms) = opportunity_age_ms
            && let Some(max_age) = self.guard_policy.max_opportunity_age
        {
            let max_allowed_ms = max_age.as_millis().min(u128::from(u64::MAX)) as u64;
            if age_ms > max_allowed_ms {
                return Err(self.reject_with_outcome(
                    TxToxicFlowRejectionReason::OpportunityStale {
                        age_ms,
                        max_allowed_ms,
                    },
                    TxSubmitOutcomeKind::RejectedDueToStaleness,
                    RejectionMetadata {
                        signature,
                        plan: plan.clone(),
                        legacy_mode,
                        state_version: None,
                        opportunity_age_ms,
                    },
                ));
            }
        }

        // Guard 2: suppression-cache hit within the policy TTL.
        if self.suppression.is_suppressed(
            &context.suppression_keys,
            now,
            self.guard_policy.suppression_ttl,
        ) {
            return Err(self.reject_with_outcome(
                TxToxicFlowRejectionReason::Suppressed,
                TxSubmitOutcomeKind::Suppressed,
                RejectionMetadata {
                    signature,
                    plan: plan.clone(),
                    legacy_mode,
                    state_version: None,
                    opportunity_age_ms,
                },
            ));
        }

        // Guards 3-5 require a configured flow-safety source.
        if let Some(source) = &self.flow_safety_source {
            let snapshot = source.toxic_flow_snapshot();
            // Guard 3: replay recovery in progress.
            if self.guard_policy.reject_on_replay_recovery_pending
                && snapshot.replay_recovery_pending
            {
                return Err(self.reject_with_outcome(
                    TxToxicFlowRejectionReason::ReplayRecoveryPending,
                    TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
                    RejectionMetadata {
                        signature,
                        plan: plan.clone(),
                        legacy_mode,
                        state_version: snapshot.current_state_version,
                        opportunity_age_ms,
                    },
                ));
            }
            // Guard 4: control-plane quality must be Stable when required.
            if self.guard_policy.require_stable_control_plane
                && !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
            {
                // Map quality to the closest outcome kind. The `Stable` arm is
                // unreachable here (excluded above) but kept for exhaustiveness.
                let outcome_kind = match snapshot.quality {
                    TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
                        TxSubmitOutcomeKind::RejectedDueToReorgRisk
                    }
                    TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
                    TxFlowSafetyQuality::Degraded
                    | TxFlowSafetyQuality::IncompleteControlPlane
                    | TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
                };
                return Err(self.reject_with_outcome(
                    TxToxicFlowRejectionReason::UnsafeControlPlane {
                        quality: snapshot.quality,
                    },
                    outcome_kind,
                    RejectionMetadata {
                        signature,
                        plan: plan.clone(),
                        legacy_mode,
                        state_version: snapshot.current_state_version,
                        opportunity_age_ms,
                    },
                ));
            }
            // Guard 5: state-version drift between decision time and now.
            if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
                context.decision_state_version,
                snapshot.current_state_version,
                self.guard_policy.max_state_version_drift,
            ) {
                // saturating_sub: a decision version ahead of current counts as zero drift.
                let drift = current_version.saturating_sub(decision_version);
                if drift > max_allowed {
                    return Err(self.reject_with_outcome(
                        TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
                        TxSubmitOutcomeKind::RejectedDueToStateDrift,
                        RejectionMetadata {
                            signature,
                            plan: plan.clone(),
                            legacy_mode,
                            state_version: Some(current_version),
                            opportunity_age_ms,
                        },
                    ));
                }
            }
        }

        // All guards passed: arm suppression for identical follow-up submits.
        self.suppression.insert_all(&context.suppression_keys, now);
        Ok(())
    }
503
504    /// Builds one rejection error while recording telemetry and reporting.
505    fn reject_with_outcome(
506        &self,
507        reason: TxToxicFlowRejectionReason,
508        outcome_kind: TxSubmitOutcomeKind,
509        metadata: RejectionMetadata,
510    ) -> SubmitError {
511        let outcome = TxSubmitOutcome {
512            kind: outcome_kind,
513            signature: metadata.signature,
514            route: None,
515            plan: metadata.plan,
516            legacy_mode: metadata.legacy_mode,
517            rpc_signature: None,
518            jito_signature: None,
519            jito_bundle_id: None,
520            state_version: metadata.state_version,
521            opportunity_age_ms: metadata.opportunity_age_ms,
522        };
523        self.record_external_outcome(&outcome);
524        SubmitError::ToxicFlow { reason }
525    }
526
527    /// Validates that every configured route has the required transport wiring.
528    fn validate_submit_plan(&self, plan: &SubmitPlan) -> Result<(), SubmitError> {
529        if plan.routes.is_empty() {
530            return Err(SubmitError::InternalSync {
531                message: "submit plan must contain at least one route".to_owned(),
532            });
533        }
534        for route in &plan.routes {
535            match route {
536                SubmitRoute::Rpc if self.rpc_transport.is_none() => {
537                    return Err(SubmitError::MissingRpcTransport);
538                }
539                SubmitRoute::Jito if self.jito_transport.is_none() => {
540                    return Err(SubmitError::MissingJitoTransport);
541                }
542                SubmitRoute::Direct if self.direct_transport.is_none() => {
543                    return Err(SubmitError::MissingDirectTransport);
544                }
545                SubmitRoute::Rpc | SubmitRoute::Jito | SubmitRoute::Direct => {}
546            }
547        }
548        Ok(())
549    }
550
551    /// Executes one route plan in order and returns the first successful route.
552    async fn submit_routes_in_order(
553        &self,
554        tx_bytes: Arc<[u8]>,
555        signature: Option<SignatureBytes>,
556        plan: SubmitPlan,
557    ) -> Result<SubmitResult, SubmitError> {
558        let legacy_mode = plan.legacy_mode();
559        let mut last_error = None;
560        let task_context = self.route_task_context();
561        for (route_idx, route) in plan.routes.iter().copied().enumerate() {
562            let next_idx = route_idx.saturating_add(1);
563            let has_later_routes = plan.routes.get(next_idx).is_some();
564            let direct_mode = if has_later_routes {
565                DirectExecutionMode::Fallback
566            } else {
567                DirectExecutionMode::Standalone
568            };
569            match submit_one_route_task(
570                route,
571                Arc::clone(&tx_bytes),
572                task_context.clone(),
573                direct_mode,
574            )
575            .await
576            {
577                Ok(outcome) => {
578                    self.record_route_outcome(signature, &plan, &outcome);
579                    if matches!(outcome.route, SubmitRoute::Direct) {
580                        self.spawn_agave_rebroadcast(
581                            Arc::clone(&tx_bytes),
582                            &self.direct_config.clone().normalized(),
583                        );
584                        if has_later_routes
585                            && self.direct_config.hybrid_rpc_broadcast
586                            && plan
587                                .routes
588                                .iter()
589                                .skip(next_idx)
590                                .any(|next| *next == SubmitRoute::Rpc)
591                        {
592                            self.spawn_background_rpc_broadcast(
593                                Arc::clone(&tx_bytes),
594                                signature,
595                                plan.clone(),
596                            );
597                        }
598                    }
599                    return Ok(SubmitResult {
600                        signature,
601                        plan,
602                        legacy_mode,
603                        first_success_route: Some(outcome.route),
604                        successful_routes: vec![outcome.route],
605                        direct_target: outcome.direct_target,
606                        rpc_signature: outcome.rpc_signature,
607                        jito_signature: outcome.jito_signature,
608                        jito_bundle_id: outcome.jito_bundle_id,
609                        used_fallback_route: route_idx > 0,
610                        selected_target_count: outcome.selected_target_count,
611                        selected_identity_count: outcome.selected_identity_count,
612                    });
613                }
614                Err(error) => last_error = Some(error),
615            }
616        }
617        Err(last_error.unwrap_or_else(|| SubmitError::InternalSync {
618            message: "ordered submit plan completed without a route outcome".to_owned(),
619        }))
620    }
621
    /// Executes every configured route at once and returns on the first successful route.
    ///
    /// One detached task is spawned per route; results funnel through an
    /// unbounded channel. The first `Ok` wins and the method returns
    /// immediately — the remaining tasks keep running detached (their sends
    /// simply fail once the receiver is dropped). If every route fails, the
    /// error of the earliest route in plan order is returned.
    async fn submit_routes_all_at_once(
        &self,
        tx_bytes: Arc<[u8]>,
        signature: Option<SignatureBytes>,
        plan: SubmitPlan,
    ) -> Result<SubmitResult, SubmitError> {
        let legacy_mode = plan.legacy_mode();
        let (result_tx, mut result_rx) = mpsc::unbounded_channel();
        for (route_idx, route) in plan.routes.iter().copied().enumerate() {
            // Clone everything the detached task needs; tasks must not borrow self.
            let task_context = self.route_task_context();
            let tx_bytes = Arc::clone(&tx_bytes);
            let result_tx = result_tx.clone();
            let telemetry = Arc::clone(&self.telemetry);
            let reporter = self.outcome_reporter.clone();
            let flow_safety_source = self.flow_safety_source.clone();
            let plan_for_task = plan.clone();
            let direct_transport = self.direct_transport.clone();
            let leader_provider = self.leader_provider.clone();
            let backups = self.backups.clone();
            let policy = self.policy;
            let direct_config = self.direct_config.clone().normalized();
            tokio::spawn(async move {
                let result = submit_one_route_task(
                    route,
                    Arc::clone(&tx_bytes),
                    task_context,
                    DirectExecutionMode::Standalone,
                )
                .await;
                if let Ok(outcome) = &result {
                    // Record success from inside the task: the parent may have
                    // already returned on another route's success.
                    record_route_outcome_shared(
                        &telemetry,
                        reporter.as_ref(),
                        flow_safety_source.as_ref(),
                        signature,
                        &plan_for_task,
                        outcome,
                    );
                    // Direct success optionally starts the post-ack rebroadcast worker.
                    if matches!(outcome.route, SubmitRoute::Direct)
                        && direct_config.agave_rebroadcast_enabled
                        && !direct_config.agave_rebroadcast_window.is_zero()
                        && let Some(direct_transport) = direct_transport
                    {
                        spawn_agave_rebroadcast_task(
                            Arc::clone(&tx_bytes),
                            direct_transport,
                            leader_provider,
                            backups,
                            policy,
                            direct_config.clone(),
                        );
                    }
                }
                // Send failure just means the parent already returned; ignore it.
                drop(result_tx.send((route_idx, result)));
            });
        }
        // Drop the original sender so recv() ends once all tasks finish.
        drop(result_tx);

        // `SubmitError` is not Clone, hence repeat_with rather than vec![None; n].
        let mut errors_by_route: Vec<Option<SubmitError>> = std::iter::repeat_with(|| None)
            .take(plan.routes.len())
            .collect();
        while let Some((route_idx, result)) = result_rx.recv().await {
            match result {
                Ok(outcome) => {
                    return Ok(SubmitResult {
                        signature,
                        plan,
                        legacy_mode,
                        first_success_route: Some(outcome.route),
                        successful_routes: vec![outcome.route],
                        direct_target: outcome.direct_target,
                        rpc_signature: outcome.rpc_signature,
                        jito_signature: outcome.jito_signature,
                        jito_bundle_id: outcome.jito_bundle_id,
                        used_fallback_route: false,
                        selected_target_count: outcome.selected_target_count,
                        selected_identity_count: outcome.selected_identity_count,
                    });
                }
                Err(error) => {
                    // Store by plan position so the reported error is deterministic.
                    if let Some(slot) = errors_by_route.get_mut(route_idx) {
                        *slot = Some(error);
                    }
                }
            }
        }

        // All routes failed: surface the earliest route's error in plan order.
        Err(errors_by_route
            .into_iter()
            .flatten()
            .next()
            .unwrap_or_else(|| SubmitError::InternalSync {
                message: "all-at-once submit plan completed without a route outcome".to_owned(),
            }))
    }
718
    /// Records one accepted route as a terminal telemetry outcome.
    ///
    /// Thin forwarder to the shared helper used by both in-line and
    /// spawned-task recording paths.
    fn record_route_outcome(
        &self,
        signature: Option<SignatureBytes>,
        plan: &SubmitPlan,
        outcome: &RouteSubmitOutcome,
    ) {
        record_route_outcome_shared(
            &self.telemetry,
            self.outcome_reporter.as_ref(),
            self.flow_safety_source.as_ref(),
            signature,
            plan,
            outcome,
        );
    }

    /// Clones the current route transports and config into one per-attempt task context.
    ///
    /// The embedded `direct_config` is normalized here so downstream route
    /// tasks can use it without further processing.
    fn route_task_context(&self) -> RouteTaskContext {
        RouteTaskContext {
            rpc_transport: self.rpc_transport.clone(),
            jito_transport: self.jito_transport.clone(),
            direct_transport: self.direct_transport.clone(),
            leader_provider: self.leader_provider.clone(),
            backups: Arc::from(self.backups.clone()),
            policy: self.policy,
            rpc_config: self.rpc_config.clone(),
            jito_config: self.jito_config.clone(),
            direct_config: self.direct_config.clone().normalized(),
        }
    }
750
751    /// Starts the post-ack rebroadcast worker when that reliability mode is enabled.
752    fn spawn_agave_rebroadcast(&self, tx_bytes: Arc<[u8]>, direct_config: &DirectSubmitConfig) {
753        if !direct_config.agave_rebroadcast_enabled
754            || direct_config.agave_rebroadcast_window.is_zero()
755        {
756            return;
757        }
758        let Some(direct_transport) = self.direct_transport.clone() else {
759            return;
760        };
761        spawn_agave_rebroadcast_task(
762            tx_bytes,
763            direct_transport,
764            self.leader_provider.clone(),
765            self.backups.clone(),
766            self.policy,
767            direct_config.clone(),
768        );
769    }
770
771    /// Starts one best-effort background RPC rebroadcast without delaying direct success.
772    fn spawn_background_rpc_broadcast(
773        &self,
774        tx_bytes: Arc<[u8]>,
775        signature: Option<SignatureBytes>,
776        plan: SubmitPlan,
777    ) {
778        let Some(rpc) = self.rpc_transport.clone() else {
779            return;
780        };
781        let rpc_config = self.rpc_config.clone();
782        let telemetry = Arc::clone(&self.telemetry);
783        let reporter = self.outcome_reporter.clone();
784        let flow_safety_source = self.flow_safety_source.clone();
785        tokio::spawn(async move {
786            if let Ok(rpc_signature) = rpc.submit_rpc(tx_bytes.as_ref(), &rpc_config).await {
787                let outcome = RouteSubmitOutcome {
788                    route: SubmitRoute::Rpc,
789                    direct_target: None,
790                    rpc_signature: Some(rpc_signature),
791                    jito_signature: None,
792                    jito_bundle_id: None,
793                    selected_target_count: 0,
794                    selected_identity_count: 0,
795                };
796                record_route_outcome_shared(
797                    &telemetry,
798                    reporter.as_ref(),
799                    flow_safety_source.as_ref(),
800                    signature,
801                    &plan,
802                    &outcome,
803                );
804            }
805        });
806    }
807}
808
/// Carries toxic-flow rejection metadata into the shared rejection helper.
///
/// Bundles the fields a rejection outcome needs so call sites pass one value
/// instead of several loose arguments.
#[derive(Debug, Clone)]
struct RejectionMetadata {
    /// Signature being evaluated, when already known at rejection time.
    signature: Option<SignatureBytes>,
    /// Route plan that was being attempted when the rejection fired.
    plan: SubmitPlan,
    /// Matching legacy preset when the plan is one exact compatibility shape.
    legacy_mode: Option<SubmitMode>,
    /// Flow-safety state version attached to the rejection, when any.
    state_version: Option<u64>,
    /// Age of the triggering opportunity in milliseconds, when tracked.
    opportunity_age_ms: Option<u64>,
}
823
/// Cloned submit transports and route config for one route execution task.
///
/// Holding owned clones (rather than borrows of the client) lets each
/// per-attempt task be moved into a spawned future independently of
/// `TxSubmitClient`'s lifetime.
#[derive(Clone)]
struct RouteTaskContext {
    /// RPC submit transport, when configured.
    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
    /// Jito submit transport, when configured.
    jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
    /// Direct submit transport, when configured.
    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
    /// Source of leader targets.
    leader_provider: Arc<dyn LeaderProvider>,
    /// Backup targets configured alongside the leader provider.
    backups: Arc<[LeaderTarget]>,
    /// Route selection policy for direct submission.
    policy: RoutingPolicy,
    /// RPC submit tuning.
    rpc_config: RpcSubmitConfig,
    /// Jito submit tuning.
    jito_config: JitoSubmitConfig,
    /// Direct submit tuning (stored in normalized form).
    direct_config: DirectSubmitConfig,
}
846
/// Selects whether direct route execution is standalone or part of a fallback chain.
///
/// The variant picks which retry budget the direct route reads:
/// `direct_submit_attempts` for `Standalone`, `hybrid_direct_attempts` for
/// `Fallback`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum DirectExecutionMode {
    /// Execute direct submission using the direct-only retry budget.
    Standalone,
    /// Execute direct submission using the hybrid fallback retry budget.
    Fallback,
}
855
/// One successful route execution before it is folded into the public submit result.
///
/// The `selected_*` counts are only meaningful for the direct route; RPC and
/// Jito outcomes report zero for both.
#[derive(Debug)]
struct RouteSubmitOutcome {
    /// Route that accepted the submission.
    route: SubmitRoute,
    /// Direct target that accepted the submission, when any.
    direct_target: Option<LeaderTarget>,
    /// RPC signature metadata for accepted RPC submission.
    rpc_signature: Option<String>,
    /// Jito transaction signature metadata when available.
    jito_signature: Option<String>,
    /// Jito bundle id metadata when available.
    jito_bundle_id: Option<String>,
    /// Number of direct targets considered for this attempt.
    selected_target_count: usize,
    /// Number of unique identities represented by the direct targets.
    selected_identity_count: usize,
}
874
875/// Records one structured outcome through built-in telemetry and any external reporter.
876fn record_external_outcome_shared(
877    telemetry: &Arc<TxToxicFlowTelemetry>,
878    reporter: Option<&OutcomeReporterHandle>,
879    outcome: &TxSubmitOutcome,
880) {
881    telemetry.record(outcome);
882    if let Some(reporter) = reporter {
883        reporter.dispatch(telemetry, outcome.clone());
884    }
885}
886
887/// Records one accepted route as a terminal telemetry outcome using shared sinks.
888fn record_route_outcome_shared(
889    telemetry: &Arc<TxToxicFlowTelemetry>,
890    reporter: Option<&OutcomeReporterHandle>,
891    flow_safety_source: Option<&Arc<dyn TxFlowSafetySource>>,
892    signature: Option<SignatureBytes>,
893    plan: &SubmitPlan,
894    outcome: &RouteSubmitOutcome,
895) {
896    let kind = match outcome.route {
897        SubmitRoute::Rpc => TxSubmitOutcomeKind::RpcAccepted,
898        SubmitRoute::Jito => TxSubmitOutcomeKind::JitoAccepted,
899        SubmitRoute::Direct => TxSubmitOutcomeKind::DirectAccepted,
900    };
901    let outcome = TxSubmitOutcome {
902        kind,
903        signature,
904        route: Some(outcome.route),
905        plan: plan.clone(),
906        legacy_mode: plan.legacy_mode(),
907        rpc_signature: outcome.rpc_signature.clone(),
908        jito_signature: outcome.jito_signature.clone(),
909        jito_bundle_id: outcome.jito_bundle_id.clone(),
910        state_version: flow_safety_source
911            .and_then(|source| source.toxic_flow_snapshot().current_state_version),
912        opportunity_age_ms: None,
913    };
914    record_external_outcome_shared(telemetry, reporter, &outcome);
915}
916
/// Outcome reporter lifecycle state stored by one client.
///
/// `Unavailable` is the permanent fallback taken when the worker thread could
/// not be created; dispatch then only bumps the unavailable counter.
#[derive(Clone)]
enum OutcomeReporterHandle {
    /// Ready shared dispatcher for this reporter instance.
    Ready(Arc<OutcomeReporterDispatcher>),
    /// Reporter worker could not be created.
    Unavailable,
}
925
926impl OutcomeReporterHandle {
927    /// Creates one handle for the provided reporter.
928    fn new(reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
929        match OutcomeReporterDispatcher::shared(reporter) {
930            Ok(dispatcher) => Self::Ready(dispatcher),
931            Err(error) => {
932                eprintln!("sof-tx: failed to start external outcome reporter worker: {error}");
933                Self::Unavailable
934            }
935        }
936    }
937
938    /// Dispatches one outcome without extending the submit hot path.
939    fn dispatch(&self, telemetry: &Arc<TxToxicFlowTelemetry>, outcome: TxSubmitOutcome) {
940        match self {
941            Self::Ready(dispatcher) => match dispatcher.dispatch(outcome) {
942                ReporterDispatchStatus::Enqueued => {}
943                ReporterDispatchStatus::DroppedFull => telemetry.record_reporter_drop(),
944                ReporterDispatchStatus::Unavailable => telemetry.record_reporter_unavailable(),
945            },
946            Self::Unavailable => telemetry.record_reporter_unavailable(),
947        }
948    }
949}
950
/// Shared worker state for one concrete reporter instance.
///
/// Held behind an `Arc` and deduplicated through the global registry so
/// clients sharing one reporter share one worker thread.
struct OutcomeReporterDispatcher {
    /// Bounded FIFO channel to the reporter worker thread.
    tx: SyncSender<TxSubmitOutcome>,
    /// Ensures queue saturation is surfaced without spamming stderr.
    queue_full_warned: AtomicBool,
    /// Ensures worker disconnect is surfaced without spamming stderr.
    unavailable_warned: AtomicBool,
}
960
961impl OutcomeReporterDispatcher {
962    /// Maximum number of pending outcomes kept for the external reporter.
963    #[cfg(not(test))]
964    const QUEUE_CAPACITY: usize = 1024;
965    #[cfg(test)]
966    const QUEUE_CAPACITY: usize = 8;
967
968    /// Creates one shared dispatcher and worker thread.
969    fn shared(reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Result<Arc<Self>, std::io::Error> {
970        let key = reporter_identity(&reporter);
971        let registry = outcome_reporter_registry();
972        {
973            let registry = registry
974                .lock()
975                .unwrap_or_else(|poisoned| poisoned.into_inner());
976            if let Some(existing) = registry.get(&key).and_then(Weak::upgrade) {
977                return Ok(existing);
978            }
979        }
980
981        let (tx, rx) = std_mpsc::sync_channel::<TxSubmitOutcome>(Self::QUEUE_CAPACITY);
982        thread::Builder::new()
983            .name("sof-tx-outcome-reporter".to_owned())
984            .spawn(move || {
985                while let Ok(outcome) = rx.recv() {
986                    drop(catch_unwind(AssertUnwindSafe(|| {
987                        reporter.record_outcome(&outcome);
988                    })));
989                }
990            })?;
991
992        let dispatcher = Arc::new(Self {
993            tx,
994            queue_full_warned: AtomicBool::new(false),
995            unavailable_warned: AtomicBool::new(false),
996        });
997        let mut registry = registry
998            .lock()
999            .unwrap_or_else(|poisoned| poisoned.into_inner());
1000        let _ = registry.insert(key, Arc::downgrade(&dispatcher));
1001        Ok(dispatcher)
1002    }
1003
1004    /// Enqueues one outcome without blocking the submit caller.
1005    fn dispatch(&self, outcome: TxSubmitOutcome) -> ReporterDispatchStatus {
1006        match self.tx.try_send(outcome) {
1007            Ok(()) => {
1008                self.queue_full_warned.store(false, Ordering::Relaxed);
1009                ReporterDispatchStatus::Enqueued
1010            }
1011            Err(TrySendError::Disconnected(_)) => {
1012                if !self.unavailable_warned.swap(true, Ordering::Relaxed) {
1013                    eprintln!(
1014                        "sof-tx: external outcome reporter worker stopped; dropping reporter outcomes"
1015                    );
1016                }
1017                ReporterDispatchStatus::Unavailable
1018            }
1019            Err(TrySendError::Full(_)) => {
1020                if !self.queue_full_warned.swap(true, Ordering::Relaxed) {
1021                    eprintln!(
1022                        "sof-tx: external outcome reporter queue is full; dropping reporter outcomes until it drains"
1023                    );
1024                }
1025                ReporterDispatchStatus::DroppedFull
1026            }
1027        }
1028    }
1029}
1030
/// Result of one best-effort reporter dispatch attempt.
///
/// Consumed by `OutcomeReporterHandle::dispatch` to decide which telemetry
/// counter, if any, to bump.
enum ReporterDispatchStatus {
    /// Outcome was queued successfully.
    Enqueued,
    /// Outcome was dropped because the bounded queue was full.
    DroppedFull,
    /// Outcome could not be queued because the reporter worker was unavailable.
    Unavailable,
}
1040
1041/// Shared registry of per-reporter dispatchers.
1042static OUTCOME_REPORTER_REGISTRY: OnceLock<Mutex<HashMap<usize, Weak<OutcomeReporterDispatcher>>>> =
1043    OnceLock::new();
1044
1045/// Returns the shared dispatcher registry.
1046fn outcome_reporter_registry() -> &'static Mutex<HashMap<usize, Weak<OutcomeReporterDispatcher>>> {
1047    OUTCOME_REPORTER_REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
1048}
1049
1050/// Stable identity key for one reporter instance.
1051fn reporter_identity(reporter: &Arc<dyn TxSubmitOutcomeReporter>) -> usize {
1052    Arc::as_ptr(reporter) as *const () as usize
1053}
1054
1055/// Extracts the first transaction signature from serialized transaction bytes.
1056fn extract_first_signature(tx_bytes: &[u8]) -> Result<Option<SignatureBytes>, SubmitError> {
1057    let Some((signature_count, offset)) = decode_short_vec_len(tx_bytes) else {
1058        return Err(decode_signed_bytes_error(
1059            "transaction bytes did not contain a valid signature vector prefix",
1060        ));
1061    };
1062    if signature_count == 0 {
1063        return Ok(None);
1064    }
1065    let signature_end = offset.saturating_add(64);
1066    let Some(signature_bytes) = tx_bytes.get(offset..signature_end) else {
1067        return Err(decode_signed_bytes_error(
1068            "transaction bytes ended before the first signature completed",
1069        ));
1070    };
1071    let mut signature = [0_u8; 64];
1072    signature.copy_from_slice(signature_bytes);
1073    Ok(Some(SignatureBytes::new(signature)))
1074}
1075
/// Decodes Solana's short-vec length prefix and returns the decoded length plus payload offset.
///
/// Reads up to three little-endian base-128 groups; returns `None` when the
/// input is empty or the third byte still has its continuation bit set.
fn decode_short_vec_len(bytes: &[u8]) -> Option<(usize, usize)> {
    let mut decoded = 0_usize;
    for (pos, &raw) in bytes.iter().enumerate().take(3) {
        // Low 7 bits carry payload; the high bit marks continuation.
        decoded |= usize::from(raw & 0x7f) << (7 * pos as u32);
        if raw & 0x80 == 0 {
            return Some((decoded, pos + 1));
        }
    }
    None
}
1089
1090/// Builds one signed-byte decode error from a static message.
1091fn decode_signed_bytes_error(message: &'static str) -> SubmitError {
1092    SubmitError::DecodeSignedBytes {
1093        source: Box::new(bincode::ErrorKind::Custom(message.to_owned())),
1094    }
1095}
1096
/// Executes one concrete submit route with the cloned task context for that attempt.
///
/// RPC and Jito are single-shot submissions. The direct route runs a bounded
/// retry loop whose attempt budget depends on `direct_mode`, re-selecting and
/// rotating targets between attempts.
///
/// # Errors
/// Returns a `Missing*Transport` error when the requested route has no
/// configured transport, the transport's own error on failure, and for the
/// direct route either `NoDirectTargets` (standalone, empty selection) or the
/// last attempt's error once the budget is exhausted.
async fn submit_one_route_task(
    route: SubmitRoute,
    tx_bytes: Arc<[u8]>,
    task_context: RouteTaskContext,
    direct_mode: DirectExecutionMode,
) -> Result<RouteSubmitOutcome, SubmitError> {
    match route {
        SubmitRoute::Rpc => {
            let rpc = task_context
                .rpc_transport
                .ok_or(SubmitError::MissingRpcTransport)?;
            let rpc_signature = rpc
                .submit_rpc(tx_bytes.as_ref(), &task_context.rpc_config)
                .await
                .map_err(|source| SubmitError::Rpc { source })?;
            Ok(RouteSubmitOutcome {
                route,
                direct_target: None,
                rpc_signature: Some(rpc_signature),
                jito_signature: None,
                jito_bundle_id: None,
                selected_target_count: 0,
                selected_identity_count: 0,
            })
        }
        SubmitRoute::Jito => {
            let jito = task_context
                .jito_transport
                .ok_or(SubmitError::MissingJitoTransport)?;
            let response = jito
                .submit_jito(tx_bytes.as_ref(), &task_context.jito_config)
                .await
                .map_err(|source| SubmitError::Jito { source })?;
            Ok(RouteSubmitOutcome {
                route,
                direct_target: None,
                rpc_signature: None,
                jito_signature: response.transaction_signature,
                jito_bundle_id: response.bundle_id,
                selected_target_count: 0,
                selected_identity_count: 0,
            })
        }
        SubmitRoute::Direct => {
            let direct = task_context
                .direct_transport
                .ok_or(SubmitError::MissingDirectTransport)?;
            let attempt_timeout = direct_attempt_timeout(&task_context.direct_config);
            // Standalone and fallback execution draw from different attempt budgets.
            let attempt_count = match direct_mode {
                DirectExecutionMode::Standalone => {
                    task_context.direct_config.direct_submit_attempts
                }
                DirectExecutionMode::Fallback => task_context.direct_config.hybrid_direct_attempts,
            };
            let mut last_error = None;
            for attempt_idx in 0..attempt_count {
                // Re-select targets every attempt so leader changes are picked up.
                let mut targets = select_and_rank_targets(
                    task_context.leader_provider.as_ref(),
                    task_context.backups.as_ref(),
                    task_context.policy,
                    &task_context.direct_config,
                )
                .await;
                rotate_targets_for_attempt(&mut targets, attempt_idx, task_context.policy);
                let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
                if targets.is_empty() {
                    // An empty selection ends the fallback leg quietly;
                    // standalone direct mode reports it as a hard error.
                    if matches!(direct_mode, DirectExecutionMode::Fallback) {
                        break;
                    }
                    return Err(SubmitError::NoDirectTargets);
                }
                match timeout(
                    attempt_timeout,
                    direct.submit_direct(
                        tx_bytes.as_ref(),
                        &targets,
                        task_context.policy,
                        &task_context.direct_config,
                    ),
                )
                .await
                {
                    Ok(Ok(target)) => {
                        return Ok(RouteSubmitOutcome {
                            route,
                            direct_target: Some(target),
                            rpc_signature: None,
                            jito_signature: None,
                            jito_bundle_id: None,
                            selected_target_count,
                            selected_identity_count,
                        });
                    }
                    Ok(Err(source)) => last_error = Some(source),
                    Err(_elapsed) => {
                        // Convert the attempt-level timeout into a transport
                        // failure so the loop keeps the most recent error.
                        last_error = Some(SubmitTransportError::Failure {
                            message: format!(
                                "direct submit attempt timed out after {}ms",
                                attempt_timeout.as_millis()
                            ),
                        });
                    }
                }
                // Back off between attempts, but not after the final one.
                if attempt_idx < attempt_count.saturating_sub(1) {
                    sleep(task_context.direct_config.rebroadcast_interval).await;
                }
            }
            Err(SubmitError::Direct {
                source: last_error.unwrap_or_else(|| SubmitTransportError::Failure {
                    message: "direct submit attempts exhausted".to_owned(),
                }),
            })
        }
    }
}
1213
#[cfg(not(test))]
/// Replays successful direct submissions for a bounded Agave-like persistence window.
///
/// Spawns a detached task that is never awaited, so nothing here can delay or
/// fail the original submit result; every rebroadcast is best effort.
fn spawn_agave_rebroadcast_task(
    tx_bytes: Arc<[u8]>,
    direct_transport: Arc<dyn DirectSubmitTransport>,
    leader_provider: Arc<dyn LeaderProvider>,
    backups: Vec<LeaderTarget>,
    policy: RoutingPolicy,
    direct_config: DirectSubmitConfig,
) {
    tokio::spawn(async move {
        // If adding the window overflows `Instant`, saturate to "now" so the
        // loop exits on its first deadline check.
        let deadline = Instant::now()
            .checked_add(direct_config.agave_rebroadcast_window)
            .unwrap_or_else(Instant::now);
        loop {
            let now = Instant::now();
            if now >= deadline {
                break;
            }

            // Sleep the shorter of the rebroadcast interval and the remaining window.
            let sleep_for = deadline
                .saturating_duration_since(now)
                .min(direct_config.agave_rebroadcast_interval);
            if !sleep_for.is_zero() {
                sleep(sleep_for).await;
            }

            // Re-check after sleeping so we never rebroadcast past the deadline.
            if Instant::now() >= deadline {
                break;
            }

            // Re-select targets each pass so leadership changes are picked up.
            let targets = select_and_rank_targets(
                leader_provider.as_ref(),
                backups.as_slice(),
                policy,
                &direct_config,
            )
            .await;
            if targets.is_empty() {
                continue;
            }

            // Best effort: the result (success, failure, or timeout) is discarded.
            drop(
                timeout(
                    direct_attempt_timeout(&direct_config),
                    direct_transport.submit_direct(
                        tx_bytes.as_ref(),
                        &targets,
                        policy,
                        &direct_config,
                    ),
                )
                .await,
            );
        }
    });
}
1271
#[cfg(test)]
/// Test-only stub that disables background rebroadcasting for deterministic assertions.
///
/// Mirrors the production signature exactly so call sites compile unchanged.
fn spawn_agave_rebroadcast_task(
    _tx_bytes: Arc<[u8]>,
    _direct_transport: Arc<dyn DirectSubmitTransport>,
    _leader_provider: Arc<dyn LeaderProvider>,
    _backups: Vec<LeaderTarget>,
    _policy: RoutingPolicy,
    _direct_config: DirectSubmitConfig,
) {
}
1283
1284/// Selects routing targets and applies optional latency-aware ranking.
1285async fn select_and_rank_targets(
1286    leader_provider: &(impl LeaderProvider + ?Sized),
1287    backups: &[LeaderTarget],
1288    policy: RoutingPolicy,
1289    direct_config: &DirectSubmitConfig,
1290) -> Vec<LeaderTarget> {
1291    let targets = select_targets(leader_provider, backups, policy);
1292    rank_targets_by_latency(targets, direct_config).await
1293}
1294
/// Reorders the probe set by observed TCP connect latency while preserving the tail order.
///
/// Only the first `latency_probe_max_targets` entries are probed, concurrently.
/// Within that probed prefix, entries sort by best observed latency (missing
/// samples rank last) with ties broken by original position; everything past
/// the prefix keeps its original order.
async fn rank_targets_by_latency(
    targets: Vec<LeaderTarget>,
    direct_config: &DirectSubmitConfig,
) -> Vec<LeaderTarget> {
    // Nothing to rank for zero/one targets or when the feature is off.
    if targets.len() <= 1 || !direct_config.latency_aware_targeting {
        return targets;
    }

    let probe_timeout = direct_config.latency_probe_timeout;
    let probe_port = direct_config.latency_probe_port;
    // Probe at most the configured prefix, but always at least one target.
    let probe_count = targets
        .len()
        .min(direct_config.latency_probe_max_targets.max(1));
    let mut latencies = vec![None; probe_count];
    let mut probes = JoinSet::new();
    for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
        probes.spawn(async move {
            (
                idx,
                probe_target_latency(&target, probe_port, probe_timeout).await,
            )
        });
    }
    // Gather results as probes finish; failed/panicked probes leave `None`.
    while let Some(result) = probes.join_next().await {
        if let Ok((idx, latency)) = result
            && idx < latencies.len()
            && let Some(slot) = latencies.get_mut(idx)
        {
            *slot = latency;
        }
    }

    let mut ranked = targets
        .iter()
        .take(probe_count)
        .cloned()
        .enumerate()
        .collect::<Vec<_>>();
    // Missing samples sort as u128::MAX; the index keeps the sort deterministic.
    ranked.sort_by_key(|(idx, _target)| {
        (
            latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
            *idx,
        )
    });

    let mut output = ranked
        .into_iter()
        .map(|(_idx, target)| target)
        .collect::<Vec<_>>();
    // Re-append the unprobed tail in its original order.
    output.extend(targets.iter().skip(probe_count).cloned());
    output
}
1348
1349/// Probes a target's candidate ports and keeps the best observed connect latency.
1350async fn probe_target_latency(
1351    target: &LeaderTarget,
1352    probe_port: Option<u16>,
1353    probe_timeout: Duration,
1354) -> Option<u128> {
1355    let mut ports = vec![target.tpu_addr.port()];
1356    if let Some(port) = probe_port
1357        && port != target.tpu_addr.port()
1358    {
1359        ports.push(port);
1360    }
1361
1362    let ip = target.tpu_addr.ip();
1363    let mut best = None::<u128>;
1364    for port in ports {
1365        if let Some(latency) = probe_tcp_latency(ip, port, probe_timeout).await {
1366            best = Some(best.map_or(latency, |current| current.min(latency)));
1367        }
1368    }
1369    best
1370}
1371
1372/// Measures one TCP connect attempt and returns elapsed milliseconds on success.
1373async fn probe_tcp_latency(
1374    ip: std::net::IpAddr,
1375    port: u16,
1376    timeout_duration: Duration,
1377) -> Option<u128> {
1378    let start = Instant::now();
1379    let addr = SocketAddr::new(ip, port);
1380    let stream = timeout(timeout_duration, TcpStream::connect(addr))
1381        .await
1382        .ok()?
1383        .ok()?;
1384    drop(stream);
1385    Some(start.elapsed().as_millis())
1386}
1387
1388/// Summarizes the selected target list for observability.
1389fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
1390    let selected_target_count = targets.len();
1391    let selected_identity_count = targets
1392        .iter()
1393        .filter_map(|target| target.identity)
1394        .collect::<HashSet<_>>()
1395        .len();
1396    (selected_target_count, selected_identity_count)
1397}
1398
1399/// Rotates the target ordering between attempts to spread retries across candidates.
1400fn rotate_targets_for_attempt(
1401    targets: &mut [LeaderTarget],
1402    attempt_idx: usize,
1403    policy: RoutingPolicy,
1404) {
1405    if attempt_idx == 0 || targets.len() <= 1 {
1406        return;
1407    }
1408
1409    let normalized = policy.normalized();
1410    let stride = normalized.max_parallel_sends.max(1);
1411    let rotation = attempt_idx
1412        .saturating_mul(stride)
1413        .checked_rem(targets.len())
1414        .unwrap_or(0);
1415    if rotation > 0 {
1416        targets.rotate_left(rotation);
1417    }
1418}
1419
1420/// Bounds one submit attempt so retry loops cannot hang indefinitely.
1421fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
1422    direct_config
1423        .global_timeout
1424        .saturating_add(direct_config.per_target_timeout)
1425        .saturating_add(direct_config.rebroadcast_interval)
1426        .max(Duration::from_secs(8))
1427}