1use std::{
4 collections::HashSet,
5 net::SocketAddr,
6 sync::Arc,
7 time::{Duration, Instant, SystemTime},
8};
9
10use solana_signature::Signature;
11use solana_signer::signers::Signers;
12use solana_transaction::versioned::VersionedTransaction;
13use tokio::{
14 net::TcpStream,
15 task::JoinSet,
16 time::{sleep, timeout},
17};
18
19use super::{
20 DirectSubmitConfig, DirectSubmitTransport, RpcSubmitConfig, RpcSubmitTransport, SignedTx,
21 SubmitError, SubmitMode, SubmitReliability, SubmitResult, TxFlowSafetyQuality,
22 TxFlowSafetySource, TxSubmitContext, TxSubmitGuardPolicy, TxSubmitOutcome, TxSubmitOutcomeKind,
23 TxSubmitOutcomeReporter, TxToxicFlowRejectionReason, TxToxicFlowTelemetry,
24 TxToxicFlowTelemetrySnapshot,
25};
26use crate::{
27 builder::TxBuilder,
28 providers::{LeaderProvider, LeaderTarget, RecentBlockhashProvider},
29 routing::{RoutingPolicy, SignatureDeduper, select_targets},
30 submit::types::TxSuppressionCache,
31};
32
/// Client that submits signed transactions over RPC, directly to leader
/// targets, or both, depending on the requested [`SubmitMode`].
///
/// Every submission first passes the toxic-flow guards and signature
/// deduplication before any transport is used.
pub struct TxSubmitClient {
    /// Supplies the blockhash used by the `submit_builder*` entry points.
    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
    /// Leader source handed to target selection for direct submission.
    leader_provider: Arc<dyn LeaderProvider>,
    /// Extra targets handed to target selection alongside the leader provider.
    backups: Vec<LeaderTarget>,
    /// Routing policy used for target selection and direct sends.
    policy: RoutingPolicy,
    /// Tracks recently submitted signatures so duplicates are rejected.
    deduper: SignatureDeduper,
    /// RPC transport; required by the RPC-only and hybrid modes.
    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
    /// Direct transport; required by the direct-only and hybrid modes.
    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
    /// Configuration forwarded to every RPC submission.
    rpc_config: RpcSubmitConfig,
    /// Configuration for direct submission (attempts, timeouts, rebroadcast).
    direct_config: DirectSubmitConfig,
    /// Optional source of flow-safety snapshots consulted by the guards.
    flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
    /// Thresholds and switches applied by the toxic-flow guards.
    guard_policy: TxSubmitGuardPolicy,
    /// Cache of suppression keys from submissions that passed the guards.
    suppression: TxSuppressionCache,
    /// Telemetry sink that receives every recorded outcome.
    telemetry: Arc<TxToxicFlowTelemetry>,
    /// Optional external reporter that is also notified of every outcome.
    outcome_reporter: Option<Arc<dyn TxSubmitOutcomeReporter>>,
}
64
65impl TxSubmitClient {
66 #[must_use]
68 pub fn new(
69 blockhash_provider: Arc<dyn RecentBlockhashProvider>,
70 leader_provider: Arc<dyn LeaderProvider>,
71 ) -> Self {
72 Self {
73 blockhash_provider,
74 leader_provider,
75 backups: Vec::new(),
76 policy: RoutingPolicy::default(),
77 deduper: SignatureDeduper::new(Duration::from_secs(10)),
78 rpc_transport: None,
79 direct_transport: None,
80 rpc_config: RpcSubmitConfig::default(),
81 direct_config: DirectSubmitConfig::default(),
82 flow_safety_source: None,
83 guard_policy: TxSubmitGuardPolicy::default(),
84 suppression: TxSuppressionCache::default(),
85 telemetry: TxToxicFlowTelemetry::shared(),
86 outcome_reporter: None,
87 }
88 }
89
90 #[must_use]
92 pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
93 self.backups = backups;
94 self
95 }
96
97 #[must_use]
99 pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
100 self.policy = policy.normalized();
101 self
102 }
103
104 #[must_use]
106 pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
107 self.deduper = SignatureDeduper::new(ttl);
108 self
109 }
110
111 #[must_use]
113 pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
114 self.rpc_transport = Some(transport);
115 self
116 }
117
118 #[must_use]
120 pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
121 self.direct_transport = Some(transport);
122 self
123 }
124
125 #[must_use]
127 pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
128 self.rpc_config = config;
129 self
130 }
131
/// Sets the direct-submit configuration, storing its normalized form.
#[must_use]
pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
    self.direct_config = config.normalized();
    self
}
138
/// Replaces the whole direct-submit configuration with the one derived from
/// `reliability`.
///
/// Any configuration set earlier through `with_direct_config` is overwritten.
#[must_use]
pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
    self.direct_config = DirectSubmitConfig::from_reliability(reliability);
    self
}
145
146 #[must_use]
148 pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
149 self.flow_safety_source = Some(source);
150 self
151 }
152
/// Sets the policy applied by the toxic-flow guards before each submission.
#[must_use]
pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
    self.guard_policy = policy;
    self
}
159
160 #[must_use]
162 pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
163 self.outcome_reporter = Some(reporter);
164 self
165 }
166
/// Returns a snapshot of the telemetry this client records outcomes into.
#[must_use]
pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
    self.telemetry.snapshot()
}
172
173 pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
175 self.telemetry.record(outcome);
176 if let Some(reporter) = &self.outcome_reporter {
177 reporter.record_outcome(outcome);
178 }
179 }
180
181 pub async fn submit_builder<T>(
188 &mut self,
189 builder: TxBuilder,
190 signers: &T,
191 mode: SubmitMode,
192 ) -> Result<SubmitResult, SubmitError>
193 where
194 T: Signers + ?Sized,
195 {
196 let blockhash = self
197 .blockhash_provider
198 .latest_blockhash()
199 .ok_or(SubmitError::MissingRecentBlockhash)?;
200 let tx = builder
201 .build_and_sign(blockhash, signers)
202 .map_err(|source| SubmitError::Build { source })?;
203 self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
204 .await
205 }
206
207 pub async fn submit_builder_with_context<T>(
214 &mut self,
215 builder: TxBuilder,
216 signers: &T,
217 mode: SubmitMode,
218 context: TxSubmitContext,
219 ) -> Result<SubmitResult, SubmitError>
220 where
221 T: Signers + ?Sized,
222 {
223 let blockhash = self
224 .blockhash_provider
225 .latest_blockhash()
226 .ok_or(SubmitError::MissingRecentBlockhash)?;
227 let tx = builder
228 .build_and_sign(blockhash, signers)
229 .map_err(|source| SubmitError::Build { source })?;
230 self.submit_transaction_with_context(tx, mode, context)
231 .await
232 }
233
234 pub async fn submit_transaction(
240 &mut self,
241 tx: VersionedTransaction,
242 mode: SubmitMode,
243 ) -> Result<SubmitResult, SubmitError> {
244 self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
245 .await
246 }
247
248 pub async fn submit_transaction_with_context(
255 &mut self,
256 tx: VersionedTransaction,
257 mode: SubmitMode,
258 context: TxSubmitContext,
259 ) -> Result<SubmitResult, SubmitError> {
260 let signature = tx.signatures.first().copied();
261 let tx_bytes =
262 bincode::serialize(&tx).map_err(|source| SubmitError::DecodeSignedBytes { source })?;
263 self.submit_bytes(tx_bytes, signature, mode, context).await
264 }
265
266 pub async fn submit_signed(
272 &mut self,
273 signed_tx: SignedTx,
274 mode: SubmitMode,
275 ) -> Result<SubmitResult, SubmitError> {
276 self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
277 .await
278 }
279
280 pub async fn submit_signed_with_context(
287 &mut self,
288 signed_tx: SignedTx,
289 mode: SubmitMode,
290 context: TxSubmitContext,
291 ) -> Result<SubmitResult, SubmitError> {
292 let tx_bytes = match signed_tx {
293 SignedTx::VersionedTransactionBytes(bytes) => bytes,
294 SignedTx::WireTransactionBytes(bytes) => bytes,
295 };
296 let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
297 .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
298 let signature = tx.signatures.first().copied();
299 self.submit_bytes(tx_bytes, signature, mode, context).await
300 }
301
/// Common submission path shared by every public entry point.
///
/// Runs the toxic-flow guards, then signature deduplication, and only then
/// dispatches to the transport path selected by `mode`.
///
/// # Errors
///
/// Returns the first guard or dedupe rejection, or the error of the selected
/// transport path.
async fn submit_bytes(
    &mut self,
    tx_bytes: Vec<u8>,
    signature: Option<Signature>,
    mode: SubmitMode,
    context: TxSubmitContext,
) -> Result<SubmitResult, SubmitError> {
    // Guards run before dedupe, so a guard rejection never records the
    // signature in the deduper.
    self.enforce_toxic_flow_guards(signature, mode, &context)?;
    self.enforce_dedupe(signature)?;
    match mode {
        SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
        SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
        SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
    }
}
318
319 fn enforce_dedupe(&mut self, signature: Option<Signature>) -> Result<(), SubmitError> {
321 if let Some(signature) = signature {
322 let now = Instant::now();
323 if !self.deduper.check_and_insert(signature, now) {
324 return Err(SubmitError::DuplicateSignature);
325 }
326 }
327 Ok(())
328 }
329
330 fn enforce_toxic_flow_guards(
332 &mut self,
333 signature: Option<Signature>,
334 mode: SubmitMode,
335 context: &TxSubmitContext,
336 ) -> Result<(), SubmitError> {
337 let now = SystemTime::now();
338 let opportunity_age_ms = context
339 .opportunity_created_at
340 .and_then(|created_at| now.duration_since(created_at).ok())
341 .map(|duration| duration.as_millis().min(u128::from(u64::MAX)) as u64);
342 if let Some(age_ms) = opportunity_age_ms
343 && let Some(max_age) = self.guard_policy.max_opportunity_age
344 {
345 let max_allowed_ms = max_age.as_millis().min(u128::from(u64::MAX)) as u64;
346 if age_ms > max_allowed_ms {
347 return Err(self.reject_with_outcome(
348 TxToxicFlowRejectionReason::OpportunityStale {
349 age_ms,
350 max_allowed_ms,
351 },
352 TxSubmitOutcomeKind::RejectedDueToStaleness,
353 signature,
354 mode,
355 None,
356 opportunity_age_ms,
357 ));
358 }
359 }
360
361 if self.suppression.is_suppressed(
362 &context.suppression_keys,
363 now,
364 self.guard_policy.suppression_ttl,
365 ) {
366 return Err(self.reject_with_outcome(
367 TxToxicFlowRejectionReason::Suppressed,
368 TxSubmitOutcomeKind::Suppressed,
369 signature,
370 mode,
371 None,
372 opportunity_age_ms,
373 ));
374 }
375
376 if let Some(source) = &self.flow_safety_source {
377 let snapshot = source.toxic_flow_snapshot();
378 if self.guard_policy.reject_on_replay_recovery_pending
379 && snapshot.replay_recovery_pending
380 {
381 return Err(self.reject_with_outcome(
382 TxToxicFlowRejectionReason::ReplayRecoveryPending,
383 TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
384 signature,
385 mode,
386 snapshot.current_state_version,
387 opportunity_age_ms,
388 ));
389 }
390 if self.guard_policy.require_stable_control_plane
391 && !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
392 {
393 let outcome_kind = match snapshot.quality {
394 TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
395 TxSubmitOutcomeKind::RejectedDueToReorgRisk
396 }
397 TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
398 TxFlowSafetyQuality::Degraded
399 | TxFlowSafetyQuality::IncompleteControlPlane
400 | TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
401 };
402 return Err(self.reject_with_outcome(
403 TxToxicFlowRejectionReason::UnsafeControlPlane {
404 quality: snapshot.quality,
405 },
406 outcome_kind,
407 signature,
408 mode,
409 snapshot.current_state_version,
410 opportunity_age_ms,
411 ));
412 }
413 if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
414 context.decision_state_version,
415 snapshot.current_state_version,
416 self.guard_policy.max_state_version_drift,
417 ) {
418 let drift = current_version.saturating_sub(decision_version);
419 if drift > max_allowed {
420 return Err(self.reject_with_outcome(
421 TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
422 TxSubmitOutcomeKind::RejectedDueToStateDrift,
423 signature,
424 mode,
425 Some(current_version),
426 opportunity_age_ms,
427 ));
428 }
429 }
430 }
431
432 self.suppression.insert_all(&context.suppression_keys, now);
433 Ok(())
434 }
435
436 fn reject_with_outcome(
438 &self,
439 reason: TxToxicFlowRejectionReason,
440 outcome_kind: TxSubmitOutcomeKind,
441 signature: Option<Signature>,
442 mode: SubmitMode,
443 state_version: Option<u64>,
444 opportunity_age_ms: Option<u64>,
445 ) -> SubmitError {
446 let outcome = TxSubmitOutcome {
447 kind: outcome_kind,
448 signature,
449 mode,
450 state_version,
451 opportunity_age_ms,
452 };
453 self.record_external_outcome(&outcome);
454 SubmitError::ToxicFlow { reason }
455 }
456
457 async fn submit_rpc_only(
459 &self,
460 tx_bytes: Vec<u8>,
461 signature: Option<Signature>,
462 mode: SubmitMode,
463 ) -> Result<SubmitResult, SubmitError> {
464 let rpc = self
465 .rpc_transport
466 .as_ref()
467 .ok_or(SubmitError::MissingRpcTransport)?;
468 let rpc_signature = rpc
469 .submit_rpc(&tx_bytes, &self.rpc_config)
470 .await
471 .map_err(|source| SubmitError::Rpc { source })?;
472 self.record_external_outcome(&TxSubmitOutcome {
473 kind: TxSubmitOutcomeKind::RpcAccepted,
474 signature,
475 mode,
476 state_version: self
477 .flow_safety_source
478 .as_ref()
479 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
480 opportunity_age_ms: None,
481 });
482 Ok(SubmitResult {
483 signature,
484 mode,
485 direct_target: None,
486 rpc_signature: Some(rpc_signature),
487 used_rpc_fallback: false,
488 selected_target_count: 0,
489 selected_identity_count: 0,
490 })
491 }
492
493 async fn submit_direct_only(
495 &self,
496 tx_bytes: Vec<u8>,
497 signature: Option<Signature>,
498 mode: SubmitMode,
499 ) -> Result<SubmitResult, SubmitError> {
500 let direct = self
501 .direct_transport
502 .as_ref()
503 .ok_or(SubmitError::MissingDirectTransport)?;
504 let direct_config = self.direct_config.clone().normalized();
505 let mut last_error = None;
506 let attempt_timeout = direct_attempt_timeout(&direct_config);
507
508 for attempt_idx in 0..direct_config.direct_submit_attempts {
509 let mut targets = self.select_direct_targets(&direct_config).await;
510 rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
511 let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
512 if targets.is_empty() {
513 return Err(SubmitError::NoDirectTargets);
514 }
515 match timeout(
516 attempt_timeout,
517 direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
518 )
519 .await
520 {
521 Ok(Ok(target)) => {
522 self.record_external_outcome(&TxSubmitOutcome {
523 kind: TxSubmitOutcomeKind::DirectAccepted,
524 signature,
525 mode,
526 state_version: self
527 .flow_safety_source
528 .as_ref()
529 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
530 opportunity_age_ms: None,
531 });
532 self.spawn_agave_rebroadcast(tx_bytes.clone(), &direct_config);
533 return Ok(SubmitResult {
534 signature,
535 mode,
536 direct_target: Some(target),
537 rpc_signature: None,
538 used_rpc_fallback: false,
539 selected_target_count,
540 selected_identity_count,
541 });
542 }
543 Ok(Err(source)) => last_error = Some(source),
544 Err(_elapsed) => {
545 last_error = Some(super::SubmitTransportError::Failure {
546 message: format!(
547 "direct submit attempt timed out after {}ms",
548 attempt_timeout.as_millis()
549 ),
550 });
551 }
552 }
553 if attempt_idx < direct_config.direct_submit_attempts.saturating_sub(1) {
554 sleep(direct_config.rebroadcast_interval).await;
555 }
556 }
557
558 Err(SubmitError::Direct {
559 source: last_error.unwrap_or_else(|| super::SubmitTransportError::Failure {
560 message: "direct submit attempts exhausted".to_owned(),
561 }),
562 })
563 }
564
565 async fn submit_hybrid(
567 &self,
568 tx_bytes: Vec<u8>,
569 signature: Option<Signature>,
570 mode: SubmitMode,
571 ) -> Result<SubmitResult, SubmitError> {
572 let direct = self
573 .direct_transport
574 .as_ref()
575 .ok_or(SubmitError::MissingDirectTransport)?;
576 let rpc = self
577 .rpc_transport
578 .as_ref()
579 .ok_or(SubmitError::MissingRpcTransport)?;
580
581 let direct_config = self.direct_config.clone().normalized();
582 let attempt_timeout = direct_attempt_timeout(&direct_config);
583 for attempt_idx in 0..direct_config.hybrid_direct_attempts {
584 let mut targets = self.select_direct_targets(&direct_config).await;
585 rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
586 let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
587 if targets.is_empty() {
588 break;
589 }
590 if let Ok(Ok(target)) = timeout(
591 attempt_timeout,
592 direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
593 )
594 .await
595 {
596 self.spawn_agave_rebroadcast(tx_bytes.clone(), &direct_config);
597 if direct_config.hybrid_rpc_broadcast
598 && let Ok(rpc_signature) = rpc.submit_rpc(&tx_bytes, &self.rpc_config).await
599 {
600 self.record_external_outcome(&TxSubmitOutcome {
601 kind: TxSubmitOutcomeKind::DirectAccepted,
602 signature,
603 mode,
604 state_version: self
605 .flow_safety_source
606 .as_ref()
607 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
608 opportunity_age_ms: None,
609 });
610 return Ok(SubmitResult {
611 signature,
612 mode,
613 direct_target: Some(target),
614 rpc_signature: Some(rpc_signature),
615 used_rpc_fallback: false,
616 selected_target_count,
617 selected_identity_count,
618 });
619 }
620 self.record_external_outcome(&TxSubmitOutcome {
621 kind: TxSubmitOutcomeKind::DirectAccepted,
622 signature,
623 mode,
624 state_version: self
625 .flow_safety_source
626 .as_ref()
627 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
628 opportunity_age_ms: None,
629 });
630 return Ok(SubmitResult {
631 signature,
632 mode,
633 direct_target: Some(target),
634 rpc_signature: None,
635 used_rpc_fallback: false,
636 selected_target_count,
637 selected_identity_count,
638 });
639 }
640 if attempt_idx < direct_config.hybrid_direct_attempts.saturating_sub(1) {
641 sleep(direct_config.rebroadcast_interval).await;
642 }
643 }
644
645 let rpc_signature = rpc
646 .submit_rpc(&tx_bytes, &self.rpc_config)
647 .await
648 .map_err(|source| SubmitError::Rpc { source })?;
649 self.record_external_outcome(&TxSubmitOutcome {
650 kind: TxSubmitOutcomeKind::RpcAccepted,
651 signature,
652 mode,
653 state_version: self
654 .flow_safety_source
655 .as_ref()
656 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
657 opportunity_age_ms: None,
658 });
659 Ok(SubmitResult {
660 signature,
661 mode,
662 direct_target: None,
663 rpc_signature: Some(rpc_signature),
664 used_rpc_fallback: true,
665 selected_target_count: 0,
666 selected_identity_count: 0,
667 })
668 }
669
670 async fn select_direct_targets(&self, direct_config: &DirectSubmitConfig) -> Vec<LeaderTarget> {
672 select_and_rank_targets(
673 self.leader_provider.as_ref(),
674 &self.backups,
675 self.policy,
676 direct_config,
677 )
678 .await
679 }
680
681 fn spawn_agave_rebroadcast(&self, tx_bytes: Vec<u8>, direct_config: &DirectSubmitConfig) {
683 if !direct_config.agave_rebroadcast_enabled
684 || direct_config.agave_rebroadcast_window.is_zero()
685 {
686 return;
687 }
688 let Some(direct_transport) = self.direct_transport.clone() else {
689 return;
690 };
691 spawn_agave_rebroadcast_task(
692 tx_bytes,
693 direct_transport,
694 self.leader_provider.clone(),
695 self.backups.clone(),
696 self.policy,
697 direct_config.clone(),
698 );
699 }
700}
701
702#[cfg(not(test))]
703fn spawn_agave_rebroadcast_task(
705 tx_bytes: Vec<u8>,
706 direct_transport: Arc<dyn DirectSubmitTransport>,
707 leader_provider: Arc<dyn LeaderProvider>,
708 backups: Vec<LeaderTarget>,
709 policy: RoutingPolicy,
710 direct_config: DirectSubmitConfig,
711) {
712 tokio::spawn(async move {
713 let deadline = Instant::now()
714 .checked_add(direct_config.agave_rebroadcast_window)
715 .unwrap_or_else(Instant::now);
716 loop {
717 let now = Instant::now();
718 if now >= deadline {
719 break;
720 }
721
722 let sleep_for = deadline
723 .saturating_duration_since(now)
724 .min(direct_config.agave_rebroadcast_interval);
725 if !sleep_for.is_zero() {
726 sleep(sleep_for).await;
727 }
728
729 if Instant::now() >= deadline {
730 break;
731 }
732
733 let targets = select_and_rank_targets(
734 leader_provider.as_ref(),
735 backups.as_slice(),
736 policy,
737 &direct_config,
738 )
739 .await;
740 if targets.is_empty() {
741 continue;
742 }
743
744 drop(
745 timeout(
746 direct_attempt_timeout(&direct_config),
747 direct_transport.submit_direct(&tx_bytes, &targets, policy, &direct_config),
748 )
749 .await,
750 );
751 }
752 });
753}
754
/// Test-build stand-in for the rebroadcast task: accepts the same arguments
/// and does nothing, so no background task is spawned under `cfg(test)`.
#[cfg(test)]
fn spawn_agave_rebroadcast_task(
    _tx_bytes: Vec<u8>,
    _direct_transport: Arc<dyn DirectSubmitTransport>,
    _leader_provider: Arc<dyn LeaderProvider>,
    _backups: Vec<LeaderTarget>,
    _policy: RoutingPolicy,
    _direct_config: DirectSubmitConfig,
) {
}
766
767async fn select_and_rank_targets(
769 leader_provider: &dyn LeaderProvider,
770 backups: &[LeaderTarget],
771 policy: RoutingPolicy,
772 direct_config: &DirectSubmitConfig,
773) -> Vec<LeaderTarget> {
774 let targets = select_targets(leader_provider, backups, policy);
775 rank_targets_by_latency(targets, direct_config).await
776}
777
778async fn rank_targets_by_latency(
780 targets: Vec<LeaderTarget>,
781 direct_config: &DirectSubmitConfig,
782) -> Vec<LeaderTarget> {
783 if targets.len() <= 1 || !direct_config.latency_aware_targeting {
784 return targets;
785 }
786
787 let probe_count = targets
788 .len()
789 .min(direct_config.latency_probe_max_targets.max(1));
790 let mut latencies = vec![None; probe_count];
791 let mut probes = JoinSet::new();
792 for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
793 let cfg = direct_config.clone();
794 probes.spawn(async move { (idx, probe_target_latency(&target, &cfg).await) });
795 }
796 while let Some(result) = probes.join_next().await {
797 if let Ok((idx, latency)) = result
798 && idx < latencies.len()
799 && let Some(slot) = latencies.get_mut(idx)
800 {
801 *slot = latency;
802 }
803 }
804
805 let mut ranked = targets
806 .iter()
807 .take(probe_count)
808 .cloned()
809 .enumerate()
810 .collect::<Vec<_>>();
811 ranked.sort_by_key(|(idx, _target)| {
812 (
813 latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
814 *idx,
815 )
816 });
817
818 let mut output = ranked
819 .into_iter()
820 .map(|(_idx, target)| target)
821 .collect::<Vec<_>>();
822 output.extend(targets.iter().skip(probe_count).cloned());
823 output
824}
825
826async fn probe_target_latency(
828 target: &LeaderTarget,
829 direct_config: &DirectSubmitConfig,
830) -> Option<u128> {
831 let mut ports = vec![target.tpu_addr.port()];
832 if let Some(port) = direct_config.latency_probe_port
833 && port != target.tpu_addr.port()
834 {
835 ports.push(port);
836 }
837
838 let ip = target.tpu_addr.ip();
839 let mut best = None::<u128>;
840 for port in ports {
841 if let Some(latency) =
842 probe_tcp_latency(ip, port, direct_config.latency_probe_timeout).await
843 {
844 best = Some(best.map_or(latency, |current| current.min(latency)));
845 }
846 }
847 best
848}
849
850async fn probe_tcp_latency(
852 ip: std::net::IpAddr,
853 port: u16,
854 timeout_duration: Duration,
855) -> Option<u128> {
856 let start = Instant::now();
857 let addr = SocketAddr::new(ip, port);
858 let stream = timeout(timeout_duration, TcpStream::connect(addr))
859 .await
860 .ok()?
861 .ok()?;
862 drop(stream);
863 Some(start.elapsed().as_millis())
864}
865
866fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
868 let selected_target_count = targets.len();
869 let selected_identity_count = targets
870 .iter()
871 .filter_map(|target| target.identity)
872 .collect::<HashSet<_>>()
873 .len();
874 (selected_target_count, selected_identity_count)
875}
876
877fn rotate_targets_for_attempt(
879 targets: &mut [LeaderTarget],
880 attempt_idx: usize,
881 policy: RoutingPolicy,
882) {
883 if attempt_idx == 0 || targets.len() <= 1 {
884 return;
885 }
886
887 let normalized = policy.normalized();
888 let stride = normalized.max_parallel_sends.max(1);
889 let rotation = attempt_idx
890 .saturating_mul(stride)
891 .checked_rem(targets.len())
892 .unwrap_or(0);
893 if rotation > 0 {
894 targets.rotate_left(rotation);
895 }
896}
897
898fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
900 direct_config
901 .global_timeout
902 .saturating_add(direct_config.per_target_timeout)
903 .saturating_add(direct_config.rebroadcast_interval)
904 .max(Duration::from_secs(8))
905}