1use std::{
4 collections::HashSet,
5 net::SocketAddr,
6 sync::Arc,
7 time::{Duration, Instant, SystemTime},
8};
9
10use solana_signature::Signature;
11use solana_signer::signers::Signers;
12use solana_transaction::versioned::VersionedTransaction;
13use tokio::{
14 net::TcpStream,
15 task::JoinSet,
16 time::{sleep, timeout},
17};
18
19use super::{
20 DirectSubmitConfig, DirectSubmitTransport, RpcSubmitConfig, RpcSubmitTransport, SignedTx,
21 SubmitError, SubmitMode, SubmitReliability, SubmitResult, TxFlowSafetyQuality,
22 TxFlowSafetySource, TxSubmitContext, TxSubmitGuardPolicy, TxSubmitOutcome, TxSubmitOutcomeKind,
23 TxSubmitOutcomeReporter, TxToxicFlowRejectionReason, TxToxicFlowTelemetry,
24 TxToxicFlowTelemetrySnapshot,
25};
26use crate::{
27 builder::TxBuilder,
28 providers::{LeaderProvider, LeaderTarget, RecentBlockhashProvider},
29 routing::{RoutingPolicy, SignatureDeduper, select_targets},
30 submit::types::TxSuppressionCache,
31};
32
33pub struct TxSubmitClient {
35 blockhash_provider: Arc<dyn RecentBlockhashProvider>,
37 leader_provider: Arc<dyn LeaderProvider>,
39 backups: Vec<LeaderTarget>,
41 policy: RoutingPolicy,
43 deduper: SignatureDeduper,
45 rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
47 direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
49 rpc_config: RpcSubmitConfig,
51 direct_config: DirectSubmitConfig,
53 flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
55 guard_policy: TxSubmitGuardPolicy,
57 suppression: TxSuppressionCache,
59 telemetry: Arc<TxToxicFlowTelemetry>,
61 outcome_reporter: Option<Arc<dyn TxSubmitOutcomeReporter>>,
63}
64
65impl TxSubmitClient {
66 #[must_use]
68 pub fn new(
69 blockhash_provider: Arc<dyn RecentBlockhashProvider>,
70 leader_provider: Arc<dyn LeaderProvider>,
71 ) -> Self {
72 Self {
73 blockhash_provider,
74 leader_provider,
75 backups: Vec::new(),
76 policy: RoutingPolicy::default(),
77 deduper: SignatureDeduper::new(Duration::from_secs(10)),
78 rpc_transport: None,
79 direct_transport: None,
80 rpc_config: RpcSubmitConfig::default(),
81 direct_config: DirectSubmitConfig::default(),
82 flow_safety_source: None,
83 guard_policy: TxSubmitGuardPolicy::default(),
84 suppression: TxSuppressionCache::default(),
85 telemetry: TxToxicFlowTelemetry::shared(),
86 outcome_reporter: None,
87 }
88 }
89
90 #[must_use]
92 pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
93 self.backups = backups;
94 self
95 }
96
97 #[must_use]
99 pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
100 self.policy = policy.normalized();
101 self
102 }
103
104 #[must_use]
106 pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
107 self.deduper = SignatureDeduper::new(ttl);
108 self
109 }
110
111 #[must_use]
113 pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
114 self.rpc_transport = Some(transport);
115 self
116 }
117
118 #[must_use]
120 pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
121 self.direct_transport = Some(transport);
122 self
123 }
124
125 #[must_use]
127 pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
128 self.rpc_config = config;
129 self
130 }
131
132 #[must_use]
134 pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
135 self.direct_config = config.normalized();
136 self
137 }
138
139 #[must_use]
141 pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
142 self.direct_config = DirectSubmitConfig::from_reliability(reliability);
143 self
144 }
145
146 #[must_use]
148 pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
149 self.flow_safety_source = Some(source);
150 self
151 }
152
153 #[must_use]
155 pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
156 self.guard_policy = policy;
157 self
158 }
159
160 #[must_use]
162 pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
163 self.outcome_reporter = Some(reporter);
164 self
165 }
166
167 #[must_use]
169 pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
170 self.telemetry.snapshot()
171 }
172
173 pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
175 self.telemetry.record(outcome);
176 if let Some(reporter) = &self.outcome_reporter {
177 reporter.record_outcome(outcome);
178 }
179 }
180
181 pub async fn submit_builder<T>(
188 &mut self,
189 builder: TxBuilder,
190 signers: &T,
191 mode: SubmitMode,
192 ) -> Result<SubmitResult, SubmitError>
193 where
194 T: Signers + ?Sized,
195 {
196 let blockhash = self
197 .blockhash_provider
198 .latest_blockhash()
199 .ok_or(SubmitError::MissingRecentBlockhash)?;
200 let tx = builder
201 .build_and_sign(blockhash, signers)
202 .map_err(|source| SubmitError::Build { source })?;
203 self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
204 .await
205 }
206
207 pub async fn submit_builder_with_context<T>(
214 &mut self,
215 builder: TxBuilder,
216 signers: &T,
217 mode: SubmitMode,
218 context: TxSubmitContext,
219 ) -> Result<SubmitResult, SubmitError>
220 where
221 T: Signers + ?Sized,
222 {
223 let blockhash = self
224 .blockhash_provider
225 .latest_blockhash()
226 .ok_or(SubmitError::MissingRecentBlockhash)?;
227 let tx = builder
228 .build_and_sign(blockhash, signers)
229 .map_err(|source| SubmitError::Build { source })?;
230 self.submit_transaction_with_context(tx, mode, context)
231 .await
232 }
233
234 pub async fn submit_transaction(
240 &mut self,
241 tx: VersionedTransaction,
242 mode: SubmitMode,
243 ) -> Result<SubmitResult, SubmitError> {
244 self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
245 .await
246 }
247
248 pub async fn submit_transaction_with_context(
255 &mut self,
256 tx: VersionedTransaction,
257 mode: SubmitMode,
258 context: TxSubmitContext,
259 ) -> Result<SubmitResult, SubmitError> {
260 let signature = tx.signatures.first().copied();
261 let tx_bytes =
262 bincode::serialize(&tx).map_err(|source| SubmitError::DecodeSignedBytes { source })?;
263 self.submit_bytes(tx_bytes, signature, mode, context).await
264 }
265
266 pub async fn submit_signed(
272 &mut self,
273 signed_tx: SignedTx,
274 mode: SubmitMode,
275 ) -> Result<SubmitResult, SubmitError> {
276 self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
277 .await
278 }
279
280 pub async fn submit_signed_with_context(
287 &mut self,
288 signed_tx: SignedTx,
289 mode: SubmitMode,
290 context: TxSubmitContext,
291 ) -> Result<SubmitResult, SubmitError> {
292 let tx_bytes = match signed_tx {
293 SignedTx::VersionedTransactionBytes(bytes) => bytes,
294 SignedTx::WireTransactionBytes(bytes) => bytes,
295 };
296 let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
297 .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
298 let signature = tx.signatures.first().copied();
299 self.submit_bytes(tx_bytes, signature, mode, context).await
300 }
301
302 async fn submit_bytes(
304 &mut self,
305 tx_bytes: Vec<u8>,
306 signature: Option<Signature>,
307 mode: SubmitMode,
308 context: TxSubmitContext,
309 ) -> Result<SubmitResult, SubmitError> {
310 self.enforce_toxic_flow_guards(signature, mode, &context)?;
311 self.enforce_dedupe(signature)?;
312 match mode {
313 SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
314 SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
315 SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
316 }
317 }
318
319 fn enforce_dedupe(&mut self, signature: Option<Signature>) -> Result<(), SubmitError> {
321 if let Some(signature) = signature {
322 let now = Instant::now();
323 if !self.deduper.check_and_insert(signature, now) {
324 return Err(SubmitError::DuplicateSignature);
325 }
326 }
327 Ok(())
328 }
329
330 fn enforce_toxic_flow_guards(
332 &mut self,
333 signature: Option<Signature>,
334 mode: SubmitMode,
335 context: &TxSubmitContext,
336 ) -> Result<(), SubmitError> {
337 let now = SystemTime::now();
338 let opportunity_age_ms = context
339 .opportunity_created_at
340 .and_then(|created_at| now.duration_since(created_at).ok())
341 .map(|duration| duration.as_millis().min(u128::from(u64::MAX)) as u64);
342 if let Some(age_ms) = opportunity_age_ms
343 && let Some(max_age) = self.guard_policy.max_opportunity_age
344 {
345 let max_allowed_ms = max_age.as_millis().min(u128::from(u64::MAX)) as u64;
346 if age_ms > max_allowed_ms {
347 return Err(self.reject_with_outcome(
348 TxToxicFlowRejectionReason::OpportunityStale {
349 age_ms,
350 max_allowed_ms,
351 },
352 TxSubmitOutcomeKind::RejectedDueToStaleness,
353 signature,
354 mode,
355 None,
356 opportunity_age_ms,
357 ));
358 }
359 }
360
361 if self.suppression.is_suppressed(
362 &context.suppression_keys,
363 now,
364 self.guard_policy.suppression_ttl,
365 ) {
366 return Err(self.reject_with_outcome(
367 TxToxicFlowRejectionReason::Suppressed,
368 TxSubmitOutcomeKind::Suppressed,
369 signature,
370 mode,
371 None,
372 opportunity_age_ms,
373 ));
374 }
375
376 if let Some(source) = &self.flow_safety_source {
377 let snapshot = source.toxic_flow_snapshot();
378 if self.guard_policy.reject_on_replay_recovery_pending
379 && snapshot.replay_recovery_pending
380 {
381 return Err(self.reject_with_outcome(
382 TxToxicFlowRejectionReason::ReplayRecoveryPending,
383 TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
384 signature,
385 mode,
386 snapshot.current_state_version,
387 opportunity_age_ms,
388 ));
389 }
390 if self.guard_policy.require_stable_control_plane
391 && !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
392 {
393 let outcome_kind = match snapshot.quality {
394 TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
395 TxSubmitOutcomeKind::RejectedDueToReorgRisk
396 }
397 TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
398 TxFlowSafetyQuality::Degraded
399 | TxFlowSafetyQuality::IncompleteControlPlane
400 | TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
401 };
402 return Err(self.reject_with_outcome(
403 TxToxicFlowRejectionReason::UnsafeControlPlane {
404 quality: snapshot.quality,
405 },
406 outcome_kind,
407 signature,
408 mode,
409 snapshot.current_state_version,
410 opportunity_age_ms,
411 ));
412 }
413 if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
414 context.decision_state_version,
415 snapshot.current_state_version,
416 self.guard_policy.max_state_version_drift,
417 ) {
418 let drift = current_version.saturating_sub(decision_version);
419 if drift > max_allowed {
420 return Err(self.reject_with_outcome(
421 TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
422 TxSubmitOutcomeKind::RejectedDueToStateDrift,
423 signature,
424 mode,
425 Some(current_version),
426 opportunity_age_ms,
427 ));
428 }
429 }
430 }
431
432 self.suppression.insert_all(&context.suppression_keys, now);
433 Ok(())
434 }
435
436 fn reject_with_outcome(
438 &self,
439 reason: TxToxicFlowRejectionReason,
440 outcome_kind: TxSubmitOutcomeKind,
441 signature: Option<Signature>,
442 mode: SubmitMode,
443 state_version: Option<u64>,
444 opportunity_age_ms: Option<u64>,
445 ) -> SubmitError {
446 let outcome = TxSubmitOutcome {
447 kind: outcome_kind,
448 signature,
449 mode,
450 state_version,
451 opportunity_age_ms,
452 };
453 self.record_external_outcome(&outcome);
454 SubmitError::ToxicFlow { reason }
455 }
456
457 async fn submit_rpc_only(
459 &self,
460 tx_bytes: Vec<u8>,
461 signature: Option<Signature>,
462 mode: SubmitMode,
463 ) -> Result<SubmitResult, SubmitError> {
464 let rpc = self
465 .rpc_transport
466 .as_ref()
467 .ok_or(SubmitError::MissingRpcTransport)?;
468 let rpc_signature = rpc
469 .submit_rpc(&tx_bytes, &self.rpc_config)
470 .await
471 .map_err(|source| SubmitError::Rpc { source })?;
472 self.record_external_outcome(&TxSubmitOutcome {
473 kind: TxSubmitOutcomeKind::RpcAccepted,
474 signature,
475 mode,
476 state_version: self
477 .flow_safety_source
478 .as_ref()
479 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
480 opportunity_age_ms: None,
481 });
482 Ok(SubmitResult {
483 signature,
484 mode,
485 direct_target: None,
486 rpc_signature: Some(rpc_signature),
487 used_rpc_fallback: false,
488 selected_target_count: 0,
489 selected_identity_count: 0,
490 })
491 }
492
493 async fn submit_direct_only(
495 &self,
496 tx_bytes: Vec<u8>,
497 signature: Option<Signature>,
498 mode: SubmitMode,
499 ) -> Result<SubmitResult, SubmitError> {
500 let direct = self
501 .direct_transport
502 .as_ref()
503 .ok_or(SubmitError::MissingDirectTransport)?;
504 let direct_config = self.direct_config.clone().normalized();
505 let mut last_error = None;
506 let attempt_timeout = direct_attempt_timeout(&direct_config);
507
508 for attempt_idx in 0..direct_config.direct_submit_attempts {
509 let mut targets = self.select_direct_targets(&direct_config).await;
510 rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
511 let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
512 if targets.is_empty() {
513 return Err(SubmitError::NoDirectTargets);
514 }
515 match timeout(
516 attempt_timeout,
517 direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
518 )
519 .await
520 {
521 Ok(Ok(target)) => {
522 self.record_external_outcome(&TxSubmitOutcome {
523 kind: TxSubmitOutcomeKind::DirectAccepted,
524 signature,
525 mode,
526 state_version: self
527 .flow_safety_source
528 .as_ref()
529 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
530 opportunity_age_ms: None,
531 });
532 self.spawn_agave_rebroadcast(Arc::from(tx_bytes), &direct_config);
533 return Ok(SubmitResult {
534 signature,
535 mode,
536 direct_target: Some(target),
537 rpc_signature: None,
538 used_rpc_fallback: false,
539 selected_target_count,
540 selected_identity_count,
541 });
542 }
543 Ok(Err(source)) => last_error = Some(source),
544 Err(_elapsed) => {
545 last_error = Some(super::SubmitTransportError::Failure {
546 message: format!(
547 "direct submit attempt timed out after {}ms",
548 attempt_timeout.as_millis()
549 ),
550 });
551 }
552 }
553 if attempt_idx < direct_config.direct_submit_attempts.saturating_sub(1) {
554 sleep(direct_config.rebroadcast_interval).await;
555 }
556 }
557
558 Err(SubmitError::Direct {
559 source: last_error.unwrap_or_else(|| super::SubmitTransportError::Failure {
560 message: "direct submit attempts exhausted".to_owned(),
561 }),
562 })
563 }
564
565 async fn submit_hybrid(
567 &self,
568 tx_bytes: Vec<u8>,
569 signature: Option<Signature>,
570 mode: SubmitMode,
571 ) -> Result<SubmitResult, SubmitError> {
572 let direct = self
573 .direct_transport
574 .as_ref()
575 .ok_or(SubmitError::MissingDirectTransport)?;
576 let rpc = self
577 .rpc_transport
578 .as_ref()
579 .ok_or(SubmitError::MissingRpcTransport)?;
580
581 let direct_config = self.direct_config.clone().normalized();
582 let attempt_timeout = direct_attempt_timeout(&direct_config);
583 for attempt_idx in 0..direct_config.hybrid_direct_attempts {
584 let mut targets = self.select_direct_targets(&direct_config).await;
585 rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
586 let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
587 if targets.is_empty() {
588 break;
589 }
590 if let Ok(Ok(target)) = timeout(
591 attempt_timeout,
592 direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
593 )
594 .await
595 {
596 let tx_bytes = Arc::<[u8]>::from(tx_bytes);
597 self.spawn_agave_rebroadcast(Arc::clone(&tx_bytes), &direct_config);
598 if direct_config.hybrid_rpc_broadcast
599 && let Ok(rpc_signature) =
600 rpc.submit_rpc(tx_bytes.as_ref(), &self.rpc_config).await
601 {
602 self.record_external_outcome(&TxSubmitOutcome {
603 kind: TxSubmitOutcomeKind::DirectAccepted,
604 signature,
605 mode,
606 state_version: self
607 .flow_safety_source
608 .as_ref()
609 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
610 opportunity_age_ms: None,
611 });
612 return Ok(SubmitResult {
613 signature,
614 mode,
615 direct_target: Some(target),
616 rpc_signature: Some(rpc_signature),
617 used_rpc_fallback: false,
618 selected_target_count,
619 selected_identity_count,
620 });
621 }
622 self.record_external_outcome(&TxSubmitOutcome {
623 kind: TxSubmitOutcomeKind::DirectAccepted,
624 signature,
625 mode,
626 state_version: self
627 .flow_safety_source
628 .as_ref()
629 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
630 opportunity_age_ms: None,
631 });
632 return Ok(SubmitResult {
633 signature,
634 mode,
635 direct_target: Some(target),
636 rpc_signature: None,
637 used_rpc_fallback: false,
638 selected_target_count,
639 selected_identity_count,
640 });
641 }
642 if attempt_idx < direct_config.hybrid_direct_attempts.saturating_sub(1) {
643 sleep(direct_config.rebroadcast_interval).await;
644 }
645 }
646
647 let rpc_signature = rpc
648 .submit_rpc(&tx_bytes, &self.rpc_config)
649 .await
650 .map_err(|source| SubmitError::Rpc { source })?;
651 self.record_external_outcome(&TxSubmitOutcome {
652 kind: TxSubmitOutcomeKind::RpcAccepted,
653 signature,
654 mode,
655 state_version: self
656 .flow_safety_source
657 .as_ref()
658 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
659 opportunity_age_ms: None,
660 });
661 Ok(SubmitResult {
662 signature,
663 mode,
664 direct_target: None,
665 rpc_signature: Some(rpc_signature),
666 used_rpc_fallback: true,
667 selected_target_count: 0,
668 selected_identity_count: 0,
669 })
670 }
671
672 async fn select_direct_targets(&self, direct_config: &DirectSubmitConfig) -> Vec<LeaderTarget> {
674 select_and_rank_targets(
675 self.leader_provider.as_ref(),
676 &self.backups,
677 self.policy,
678 direct_config,
679 )
680 .await
681 }
682
683 fn spawn_agave_rebroadcast(&self, tx_bytes: Arc<[u8]>, direct_config: &DirectSubmitConfig) {
685 if !direct_config.agave_rebroadcast_enabled
686 || direct_config.agave_rebroadcast_window.is_zero()
687 {
688 return;
689 }
690 let Some(direct_transport) = self.direct_transport.clone() else {
691 return;
692 };
693 spawn_agave_rebroadcast_task(
694 tx_bytes,
695 direct_transport,
696 self.leader_provider.clone(),
697 self.backups.clone(),
698 self.policy,
699 direct_config.clone(),
700 );
701 }
702}
703
704#[cfg(not(test))]
705fn spawn_agave_rebroadcast_task(
707 tx_bytes: Arc<[u8]>,
708 direct_transport: Arc<dyn DirectSubmitTransport>,
709 leader_provider: Arc<dyn LeaderProvider>,
710 backups: Vec<LeaderTarget>,
711 policy: RoutingPolicy,
712 direct_config: DirectSubmitConfig,
713) {
714 tokio::spawn(async move {
715 let deadline = Instant::now()
716 .checked_add(direct_config.agave_rebroadcast_window)
717 .unwrap_or_else(Instant::now);
718 loop {
719 let now = Instant::now();
720 if now >= deadline {
721 break;
722 }
723
724 let sleep_for = deadline
725 .saturating_duration_since(now)
726 .min(direct_config.agave_rebroadcast_interval);
727 if !sleep_for.is_zero() {
728 sleep(sleep_for).await;
729 }
730
731 if Instant::now() >= deadline {
732 break;
733 }
734
735 let targets = select_and_rank_targets(
736 leader_provider.as_ref(),
737 backups.as_slice(),
738 policy,
739 &direct_config,
740 )
741 .await;
742 if targets.is_empty() {
743 continue;
744 }
745
746 drop(
747 timeout(
748 direct_attempt_timeout(&direct_config),
749 direct_transport.submit_direct(
750 tx_bytes.as_ref(),
751 &targets,
752 policy,
753 &direct_config,
754 ),
755 )
756 .await,
757 );
758 }
759 });
760}
761
/// No-op variant compiled for test builds.
///
/// It takes the same arguments as the non-test variant and drops them without spawning a task.
#[cfg(test)]
fn spawn_agave_rebroadcast_task(
    _tx_bytes: Arc<[u8]>,
    _direct_transport: Arc<dyn DirectSubmitTransport>,
    _leader_provider: Arc<dyn LeaderProvider>,
    _backups: Vec<LeaderTarget>,
    _policy: RoutingPolicy,
    _direct_config: DirectSubmitConfig,
) {
}
773
774async fn select_and_rank_targets(
776 leader_provider: &(impl LeaderProvider + ?Sized),
777 backups: &[LeaderTarget],
778 policy: RoutingPolicy,
779 direct_config: &DirectSubmitConfig,
780) -> Vec<LeaderTarget> {
781 let targets = select_targets(leader_provider, backups, policy);
782 rank_targets_by_latency(targets, direct_config).await
783}
784
785async fn rank_targets_by_latency(
787 targets: Vec<LeaderTarget>,
788 direct_config: &DirectSubmitConfig,
789) -> Vec<LeaderTarget> {
790 if targets.len() <= 1 || !direct_config.latency_aware_targeting {
791 return targets;
792 }
793
794 let probe_timeout = direct_config.latency_probe_timeout;
795 let probe_port = direct_config.latency_probe_port;
796 let probe_count = targets
797 .len()
798 .min(direct_config.latency_probe_max_targets.max(1));
799 let mut latencies = vec![None; probe_count];
800 let mut probes = JoinSet::new();
801 for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
802 probes.spawn(async move {
803 (
804 idx,
805 probe_target_latency(&target, probe_port, probe_timeout).await,
806 )
807 });
808 }
809 while let Some(result) = probes.join_next().await {
810 if let Ok((idx, latency)) = result
811 && idx < latencies.len()
812 && let Some(slot) = latencies.get_mut(idx)
813 {
814 *slot = latency;
815 }
816 }
817
818 let mut ranked = targets
819 .iter()
820 .take(probe_count)
821 .cloned()
822 .enumerate()
823 .collect::<Vec<_>>();
824 ranked.sort_by_key(|(idx, _target)| {
825 (
826 latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
827 *idx,
828 )
829 });
830
831 let mut output = ranked
832 .into_iter()
833 .map(|(_idx, target)| target)
834 .collect::<Vec<_>>();
835 output.extend(targets.iter().skip(probe_count).cloned());
836 output
837}
838
839async fn probe_target_latency(
841 target: &LeaderTarget,
842 probe_port: Option<u16>,
843 probe_timeout: Duration,
844) -> Option<u128> {
845 let mut ports = vec![target.tpu_addr.port()];
846 if let Some(port) = probe_port
847 && port != target.tpu_addr.port()
848 {
849 ports.push(port);
850 }
851
852 let ip = target.tpu_addr.ip();
853 let mut best = None::<u128>;
854 for port in ports {
855 if let Some(latency) = probe_tcp_latency(ip, port, probe_timeout).await {
856 best = Some(best.map_or(latency, |current| current.min(latency)));
857 }
858 }
859 best
860}
861
862async fn probe_tcp_latency(
864 ip: std::net::IpAddr,
865 port: u16,
866 timeout_duration: Duration,
867) -> Option<u128> {
868 let start = Instant::now();
869 let addr = SocketAddr::new(ip, port);
870 let stream = timeout(timeout_duration, TcpStream::connect(addr))
871 .await
872 .ok()?
873 .ok()?;
874 drop(stream);
875 Some(start.elapsed().as_millis())
876}
877
878fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
880 let selected_target_count = targets.len();
881 let selected_identity_count = targets
882 .iter()
883 .filter_map(|target| target.identity)
884 .collect::<HashSet<_>>()
885 .len();
886 (selected_target_count, selected_identity_count)
887}
888
889fn rotate_targets_for_attempt(
891 targets: &mut [LeaderTarget],
892 attempt_idx: usize,
893 policy: RoutingPolicy,
894) {
895 if attempt_idx == 0 || targets.len() <= 1 {
896 return;
897 }
898
899 let normalized = policy.normalized();
900 let stride = normalized.max_parallel_sends.max(1);
901 let rotation = attempt_idx
902 .saturating_mul(stride)
903 .checked_rem(targets.len())
904 .unwrap_or(0);
905 if rotation > 0 {
906 targets.rotate_left(rotation);
907 }
908}
909
910fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
912 direct_config
913 .global_timeout
914 .saturating_add(direct_config.per_target_timeout)
915 .saturating_add(direct_config.rebroadcast_interval)
916 .max(Duration::from_secs(8))
917}