1use std::{
4 collections::HashSet,
5 net::SocketAddr,
6 sync::Arc,
7 time::{Duration, Instant, SystemTime},
8};
9
10use solana_signature::Signature;
11use solana_signer::signers::Signers;
12use solana_transaction::versioned::VersionedTransaction;
13use tokio::{
14 net::TcpStream,
15 task::JoinSet,
16 time::{sleep, timeout},
17};
18
19use super::{
20 DirectSubmitConfig, DirectSubmitTransport, JitoSubmitConfig, JitoSubmitTransport,
21 RpcSubmitConfig, RpcSubmitTransport, SignedTx, SubmitError, SubmitMode, SubmitReliability,
22 SubmitResult, TxFlowSafetyQuality, TxFlowSafetySource, TxSubmitContext, TxSubmitGuardPolicy,
23 TxSubmitOutcome, TxSubmitOutcomeKind, TxSubmitOutcomeReporter, TxToxicFlowRejectionReason,
24 TxToxicFlowTelemetry, TxToxicFlowTelemetrySnapshot,
25};
26use crate::{
27 builder::TxBuilder,
28 providers::{LeaderProvider, LeaderTarget, RecentBlockhashProvider},
29 routing::{RoutingPolicy, SignatureDeduper, select_targets},
30 submit::types::TxSuppressionCache,
31};
32
/// Client that submits signed transactions over RPC, Jito, direct, or hybrid routes.
///
/// Holds the providers, transports, per-route configuration, and guard state
/// consulted by the `submit_*` methods.
pub struct TxSubmitClient {
    /// Supplies the recent blockhash used when signing a `TxBuilder`.
    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
    /// Leader source handed to target selection for direct submission.
    leader_provider: Arc<dyn LeaderProvider>,
    /// Backup targets handed to target selection alongside the leader provider.
    backups: Vec<LeaderTarget>,
    /// Routing policy used for target selection and passed to the direct transport.
    policy: RoutingPolicy,
    /// Tracks recently seen signatures so duplicates are rejected before submission.
    deduper: SignatureDeduper,
    /// RPC transport; `RpcOnly` and `Hybrid` submissions fail without it.
    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
    /// Direct transport; `DirectOnly` and `Hybrid` submissions fail without it.
    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
    /// Jito transport; `JitoOnly` submissions fail without it.
    jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
    /// Configuration passed to every RPC submission.
    rpc_config: RpcSubmitConfig,
    /// Configuration passed to every Jito submission.
    jito_config: JitoSubmitConfig,
    /// Configuration for direct submission (attempt counts, timeouts, rebroadcast, probing).
    direct_config: DirectSubmitConfig,
    /// Optional source of toxic-flow snapshots consulted by the submit guards.
    flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
    /// Thresholds and switches applied by the submit guards.
    guard_policy: TxSubmitGuardPolicy,
    /// Cache of suppression keys checked and updated by the submit guards.
    suppression: TxSuppressionCache,
    /// Telemetry sink that receives every recorded outcome.
    telemetry: Arc<TxToxicFlowTelemetry>,
    /// Optional external reporter that also receives every recorded outcome.
    outcome_reporter: Option<Arc<dyn TxSubmitOutcomeReporter>>,
}
68
69impl TxSubmitClient {
70 #[must_use]
72 pub fn new(
73 blockhash_provider: Arc<dyn RecentBlockhashProvider>,
74 leader_provider: Arc<dyn LeaderProvider>,
75 ) -> Self {
76 Self {
77 blockhash_provider,
78 leader_provider,
79 backups: Vec::new(),
80 policy: RoutingPolicy::default(),
81 deduper: SignatureDeduper::new(Duration::from_secs(10)),
82 rpc_transport: None,
83 direct_transport: None,
84 jito_transport: None,
85 rpc_config: RpcSubmitConfig::default(),
86 jito_config: JitoSubmitConfig::default(),
87 direct_config: DirectSubmitConfig::default(),
88 flow_safety_source: None,
89 guard_policy: TxSubmitGuardPolicy::default(),
90 suppression: TxSuppressionCache::default(),
91 telemetry: TxToxicFlowTelemetry::shared(),
92 outcome_reporter: None,
93 }
94 }
95
96 #[must_use]
98 pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
99 self.backups = backups;
100 self
101 }
102
103 #[must_use]
105 pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
106 self.policy = policy.normalized();
107 self
108 }
109
110 #[must_use]
112 pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
113 self.deduper = SignatureDeduper::new(ttl);
114 self
115 }
116
117 #[must_use]
119 pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
120 self.rpc_transport = Some(transport);
121 self
122 }
123
124 #[must_use]
126 pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
127 self.direct_transport = Some(transport);
128 self
129 }
130
131 #[must_use]
133 pub fn with_jito_transport(mut self, transport: Arc<dyn JitoSubmitTransport>) -> Self {
134 self.jito_transport = Some(transport);
135 self
136 }
137
138 #[must_use]
140 pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
141 self.rpc_config = config;
142 self
143 }
144
/// Sets the configuration passed to every Jito submission.
#[must_use]
pub const fn with_jito_config(mut self, config: JitoSubmitConfig) -> Self {
    self.jito_config = config;
    self
}
151
/// Sets the direct-submit configuration, storing its `normalized()` form.
#[must_use]
pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
    self.direct_config = config.normalized();
    self
}
158
/// Replaces the direct-submit configuration with the one derived from a
/// reliability preset via `DirectSubmitConfig::from_reliability`.
///
/// Any configuration set earlier through `with_direct_config` is overwritten.
#[must_use]
pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
    // Stored as returned; the submit paths call `normalized()` on a clone
    // of this config before using it.
    self.direct_config = DirectSubmitConfig::from_reliability(reliability);
    self
}
165
166 #[must_use]
168 pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
169 self.flow_safety_source = Some(source);
170 self
171 }
172
/// Sets the guard policy applied before every submission.
#[must_use]
pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
    self.guard_policy = policy;
    self
}
179
180 #[must_use]
182 pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
183 self.outcome_reporter = Some(reporter);
184 self
185 }
186
/// Returns a snapshot of the telemetry sink that receives this client's outcomes.
#[must_use]
pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
    self.telemetry.snapshot()
}
192
193 pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
195 self.telemetry.record(outcome);
196 if let Some(reporter) = &self.outcome_reporter {
197 reporter.record_outcome(outcome);
198 }
199 }
200
201 pub async fn submit_builder<T>(
208 &mut self,
209 builder: TxBuilder,
210 signers: &T,
211 mode: SubmitMode,
212 ) -> Result<SubmitResult, SubmitError>
213 where
214 T: Signers + ?Sized,
215 {
216 let blockhash = self
217 .blockhash_provider
218 .latest_blockhash()
219 .ok_or(SubmitError::MissingRecentBlockhash)?;
220 let tx = builder
221 .build_and_sign(blockhash, signers)
222 .map_err(|source| SubmitError::Build { source })?;
223 self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
224 .await
225 }
226
227 pub async fn submit_builder_with_context<T>(
234 &mut self,
235 builder: TxBuilder,
236 signers: &T,
237 mode: SubmitMode,
238 context: TxSubmitContext,
239 ) -> Result<SubmitResult, SubmitError>
240 where
241 T: Signers + ?Sized,
242 {
243 let blockhash = self
244 .blockhash_provider
245 .latest_blockhash()
246 .ok_or(SubmitError::MissingRecentBlockhash)?;
247 let tx = builder
248 .build_and_sign(blockhash, signers)
249 .map_err(|source| SubmitError::Build { source })?;
250 self.submit_transaction_with_context(tx, mode, context)
251 .await
252 }
253
254 pub async fn submit_transaction(
260 &mut self,
261 tx: VersionedTransaction,
262 mode: SubmitMode,
263 ) -> Result<SubmitResult, SubmitError> {
264 self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
265 .await
266 }
267
268 pub async fn submit_transaction_with_context(
275 &mut self,
276 tx: VersionedTransaction,
277 mode: SubmitMode,
278 context: TxSubmitContext,
279 ) -> Result<SubmitResult, SubmitError> {
280 let signature = tx.signatures.first().copied();
281 let tx_bytes =
282 bincode::serialize(&tx).map_err(|source| SubmitError::DecodeSignedBytes { source })?;
283 self.submit_bytes(tx_bytes, signature, mode, context).await
284 }
285
286 pub async fn submit_signed(
292 &mut self,
293 signed_tx: SignedTx,
294 mode: SubmitMode,
295 ) -> Result<SubmitResult, SubmitError> {
296 self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
297 .await
298 }
299
300 pub async fn submit_signed_with_context(
307 &mut self,
308 signed_tx: SignedTx,
309 mode: SubmitMode,
310 context: TxSubmitContext,
311 ) -> Result<SubmitResult, SubmitError> {
312 let tx_bytes = match signed_tx {
313 SignedTx::VersionedTransactionBytes(bytes) => bytes,
314 SignedTx::WireTransactionBytes(bytes) => bytes,
315 };
316 let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
317 .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
318 let signature = tx.signatures.first().copied();
319 self.submit_bytes(tx_bytes, signature, mode, context).await
320 }
321
/// Runs the pre-submit checks and dispatches the bytes to the route chosen by `mode`.
///
/// # Errors
///
/// Returns the first failure among the toxic-flow guards, the duplicate
/// signature check, and the selected route's submission.
async fn submit_bytes(
    &mut self,
    tx_bytes: Vec<u8>,
    signature: Option<Signature>,
    mode: SubmitMode,
    context: TxSubmitContext,
) -> Result<SubmitResult, SubmitError> {
    // Guards run before dedupe, so a guard rejection does not record the
    // signature in the deduper.
    // NOTE(review): on success `enforce_toxic_flow_guards` inserts the
    // context's suppression keys, which therefore stay inserted even when
    // dedupe or the transport fails afterwards — confirm this is intended.
    self.enforce_toxic_flow_guards(signature, mode, &context)?;
    self.enforce_dedupe(signature)?;
    match mode {
        SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
        SubmitMode::JitoOnly => self.submit_jito_only(tx_bytes, signature, mode).await,
        SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
        SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
    }
}
339
340 fn enforce_dedupe(&mut self, signature: Option<Signature>) -> Result<(), SubmitError> {
342 if let Some(signature) = signature {
343 let now = Instant::now();
344 if !self.deduper.check_and_insert(signature, now) {
345 return Err(SubmitError::DuplicateSignature);
346 }
347 }
348 Ok(())
349 }
350
/// Runs the pre-submit toxic-flow guards, recording a rejection outcome on failure.
///
/// Checks, in order: opportunity age against `max_opportunity_age`, the
/// suppression cache, and — when a flow-safety source is configured —
/// replay recovery, control-plane quality, and state-version drift. When
/// every check passes, the context's suppression keys are inserted into
/// the suppression cache.
///
/// # Errors
///
/// Returns `SubmitError::ToxicFlow` carrying the reason of the first guard
/// that fails.
fn enforce_toxic_flow_guards(
    &mut self,
    signature: Option<Signature>,
    mode: SubmitMode,
    context: &TxSubmitContext,
) -> Result<(), SubmitError> {
    let now = SystemTime::now();
    // Age is `None` when the context has no creation time or when that
    // time lies after `now`; the millisecond count is clamped to `u64::MAX`
    // before the cast.
    let opportunity_age_ms = context
        .opportunity_created_at
        .and_then(|created_at| now.duration_since(created_at).ok())
        .map(|duration| duration.as_millis().min(u128::from(u64::MAX)) as u64);
    // Guard 1: staleness — only evaluated when both an age and a limit exist.
    if let Some(age_ms) = opportunity_age_ms
        && let Some(max_age) = self.guard_policy.max_opportunity_age
    {
        let max_allowed_ms = max_age.as_millis().min(u128::from(u64::MAX)) as u64;
        if age_ms > max_allowed_ms {
            return Err(self.reject_with_outcome(
                TxToxicFlowRejectionReason::OpportunityStale {
                    age_ms,
                    max_allowed_ms,
                },
                TxSubmitOutcomeKind::RejectedDueToStaleness,
                signature,
                mode,
                None,
                opportunity_age_ms,
            ));
        }
    }

    // Guard 2: suppression cache lookup for the context's keys.
    if self.suppression.is_suppressed(
        &context.suppression_keys,
        now,
        self.guard_policy.suppression_ttl,
    ) {
        return Err(self.reject_with_outcome(
            TxToxicFlowRejectionReason::Suppressed,
            TxSubmitOutcomeKind::Suppressed,
            signature,
            mode,
            None,
            opportunity_age_ms,
        ));
    }

    // Guards 3-5 need a snapshot and are skipped when no source is configured.
    if let Some(source) = &self.flow_safety_source {
        let snapshot = source.toxic_flow_snapshot();
        // Guard 3: replay recovery pending.
        if self.guard_policy.reject_on_replay_recovery_pending
            && snapshot.replay_recovery_pending
        {
            return Err(self.reject_with_outcome(
                TxToxicFlowRejectionReason::ReplayRecoveryPending,
                TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
                signature,
                mode,
                snapshot.current_state_version,
                opportunity_age_ms,
            ));
        }
        // Guard 4: control-plane quality must be `Stable` when required.
        if self.guard_policy.require_stable_control_plane
            && !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
        {
            // `Stable` cannot reach this match because of the condition
            // above; it is listed only to keep the match exhaustive.
            let outcome_kind = match snapshot.quality {
                TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
                    TxSubmitOutcomeKind::RejectedDueToReorgRisk
                }
                TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
                TxFlowSafetyQuality::Degraded
                | TxFlowSafetyQuality::IncompleteControlPlane
                | TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
            };
            return Err(self.reject_with_outcome(
                TxToxicFlowRejectionReason::UnsafeControlPlane {
                    quality: snapshot.quality,
                },
                outcome_kind,
                signature,
                mode,
                snapshot.current_state_version,
                opportunity_age_ms,
            ));
        }
        // Guard 5: state-version drift — only evaluated when the decision
        // version, the current version, and the limit are all present.
        if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
            context.decision_state_version,
            snapshot.current_state_version,
            self.guard_policy.max_state_version_drift,
        ) {
            // Saturating: a decision version ahead of the current one yields zero drift.
            let drift = current_version.saturating_sub(decision_version);
            if drift > max_allowed {
                return Err(self.reject_with_outcome(
                    TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
                    TxSubmitOutcomeKind::RejectedDueToStateDrift,
                    signature,
                    mode,
                    Some(current_version),
                    opportunity_age_ms,
                ));
            }
        }
    }

    // All guards passed: register the keys so later submissions carrying
    // them hit the suppression check above.
    self.suppression.insert_all(&context.suppression_keys, now);
    Ok(())
}
456
457 fn reject_with_outcome(
459 &self,
460 reason: TxToxicFlowRejectionReason,
461 outcome_kind: TxSubmitOutcomeKind,
462 signature: Option<Signature>,
463 mode: SubmitMode,
464 state_version: Option<u64>,
465 opportunity_age_ms: Option<u64>,
466 ) -> SubmitError {
467 let outcome = TxSubmitOutcome {
468 kind: outcome_kind,
469 signature,
470 mode,
471 state_version,
472 opportunity_age_ms,
473 };
474 self.record_external_outcome(&outcome);
475 SubmitError::ToxicFlow { reason }
476 }
477
478 async fn submit_rpc_only(
480 &self,
481 tx_bytes: Vec<u8>,
482 signature: Option<Signature>,
483 mode: SubmitMode,
484 ) -> Result<SubmitResult, SubmitError> {
485 let rpc = self
486 .rpc_transport
487 .as_ref()
488 .ok_or(SubmitError::MissingRpcTransport)?;
489 let rpc_signature = rpc
490 .submit_rpc(&tx_bytes, &self.rpc_config)
491 .await
492 .map_err(|source| SubmitError::Rpc { source })?;
493 self.record_external_outcome(&TxSubmitOutcome {
494 kind: TxSubmitOutcomeKind::RpcAccepted,
495 signature,
496 mode,
497 state_version: self
498 .flow_safety_source
499 .as_ref()
500 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
501 opportunity_age_ms: None,
502 });
503 Ok(SubmitResult {
504 signature,
505 mode,
506 direct_target: None,
507 rpc_signature: Some(rpc_signature),
508 jito_signature: None,
509 used_rpc_fallback: false,
510 selected_target_count: 0,
511 selected_identity_count: 0,
512 })
513 }
514
515 async fn submit_jito_only(
517 &self,
518 tx_bytes: Vec<u8>,
519 signature: Option<Signature>,
520 mode: SubmitMode,
521 ) -> Result<SubmitResult, SubmitError> {
522 let jito = self
523 .jito_transport
524 .as_ref()
525 .ok_or(SubmitError::MissingJitoTransport)?;
526 let jito_signature = jito
527 .submit_jito(&tx_bytes, &self.jito_config)
528 .await
529 .map_err(|source| SubmitError::Jito { source })?;
530 self.record_external_outcome(&TxSubmitOutcome {
531 kind: TxSubmitOutcomeKind::JitoAccepted,
532 signature,
533 mode,
534 state_version: self
535 .flow_safety_source
536 .as_ref()
537 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
538 opportunity_age_ms: None,
539 });
540 Ok(SubmitResult {
541 signature,
542 mode,
543 direct_target: None,
544 rpc_signature: None,
545 jito_signature: Some(jito_signature),
546 used_rpc_fallback: false,
547 selected_target_count: 0,
548 selected_identity_count: 0,
549 })
550 }
551
/// Submits the bytes through the direct transport, retrying up to
/// `direct_submit_attempts` times.
///
/// Targets are re-selected and rotated on every attempt. The first
/// accepted attempt records a `DirectAccepted` outcome, starts the
/// background rebroadcast (when enabled), and returns.
///
/// # Errors
///
/// - `SubmitError::MissingDirectTransport` when no direct transport is installed.
/// - `SubmitError::NoDirectTargets` as soon as an attempt selects no targets.
/// - `SubmitError::Direct` with the last transport or timeout error once
///   all attempts are used up.
async fn submit_direct_only(
    &self,
    tx_bytes: Vec<u8>,
    signature: Option<Signature>,
    mode: SubmitMode,
) -> Result<SubmitResult, SubmitError> {
    let direct = self
        .direct_transport
        .as_ref()
        .ok_or(SubmitError::MissingDirectTransport)?;
    let direct_config = self.direct_config.clone().normalized();
    let mut last_error = None;
    let attempt_timeout = direct_attempt_timeout(&direct_config);

    for attempt_idx in 0..direct_config.direct_submit_attempts {
        // Fresh selection each attempt, then rotation so retries start
        // from a different position in the target list.
        let mut targets = self.select_direct_targets(&direct_config).await;
        rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
        let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
        if targets.is_empty() {
            // Returns immediately; no further attempts are made.
            return Err(SubmitError::NoDirectTargets);
        }
        match timeout(
            attempt_timeout,
            direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
        )
        .await
        {
            Ok(Ok(target)) => {
                self.record_external_outcome(&TxSubmitOutcome {
                    kind: TxSubmitOutcomeKind::DirectAccepted,
                    signature,
                    mode,
                    state_version: self
                        .flow_safety_source
                        .as_ref()
                        .and_then(|source| source.toxic_flow_snapshot().current_state_version),
                    opportunity_age_ms: None,
                });
                // `tx_bytes` is moved here; that is fine because this arm returns.
                self.spawn_agave_rebroadcast(Arc::from(tx_bytes), &direct_config);
                return Ok(SubmitResult {
                    signature,
                    mode,
                    direct_target: Some(target),
                    rpc_signature: None,
                    jito_signature: None,
                    used_rpc_fallback: false,
                    selected_target_count,
                    selected_identity_count,
                });
            }
            // Transport error: remember it and retry.
            Ok(Err(source)) => last_error = Some(source),
            // Outer timeout elapsed: report it as a transport failure and retry.
            Err(_elapsed) => {
                last_error = Some(super::SubmitTransportError::Failure {
                    message: format!(
                        "direct submit attempt timed out after {}ms",
                        attempt_timeout.as_millis()
                    ),
                });
            }
        }
        // Pause between attempts, but not after the final one.
        if attempt_idx < direct_config.direct_submit_attempts.saturating_sub(1) {
            sleep(direct_config.rebroadcast_interval).await;
        }
    }

    // `last_error` is `None` only when the loop body never ran, i.e. the
    // attempt count was zero.
    Err(SubmitError::Direct {
        source: last_error.unwrap_or_else(|| super::SubmitTransportError::Failure {
            message: "direct submit attempts exhausted".to_owned(),
        }),
    })
}
624
625 async fn submit_hybrid(
627 &self,
628 tx_bytes: Vec<u8>,
629 signature: Option<Signature>,
630 mode: SubmitMode,
631 ) -> Result<SubmitResult, SubmitError> {
632 let direct = self
633 .direct_transport
634 .as_ref()
635 .ok_or(SubmitError::MissingDirectTransport)?;
636 let rpc = self
637 .rpc_transport
638 .as_ref()
639 .ok_or(SubmitError::MissingRpcTransport)?;
640
641 let direct_config = self.direct_config.clone().normalized();
642 let attempt_timeout = direct_attempt_timeout(&direct_config);
643 for attempt_idx in 0..direct_config.hybrid_direct_attempts {
644 let mut targets = self.select_direct_targets(&direct_config).await;
645 rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
646 let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
647 if targets.is_empty() {
648 break;
649 }
650 if let Ok(Ok(target)) = timeout(
651 attempt_timeout,
652 direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
653 )
654 .await
655 {
656 let tx_bytes = Arc::<[u8]>::from(tx_bytes);
657 self.spawn_agave_rebroadcast(Arc::clone(&tx_bytes), &direct_config);
658 if direct_config.hybrid_rpc_broadcast
659 && let Ok(rpc_signature) =
660 rpc.submit_rpc(tx_bytes.as_ref(), &self.rpc_config).await
661 {
662 self.record_external_outcome(&TxSubmitOutcome {
663 kind: TxSubmitOutcomeKind::DirectAccepted,
664 signature,
665 mode,
666 state_version: self
667 .flow_safety_source
668 .as_ref()
669 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
670 opportunity_age_ms: None,
671 });
672 return Ok(SubmitResult {
673 signature,
674 mode,
675 direct_target: Some(target),
676 rpc_signature: Some(rpc_signature),
677 jito_signature: None,
678 used_rpc_fallback: false,
679 selected_target_count,
680 selected_identity_count,
681 });
682 }
683 self.record_external_outcome(&TxSubmitOutcome {
684 kind: TxSubmitOutcomeKind::DirectAccepted,
685 signature,
686 mode,
687 state_version: self
688 .flow_safety_source
689 .as_ref()
690 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
691 opportunity_age_ms: None,
692 });
693 return Ok(SubmitResult {
694 signature,
695 mode,
696 direct_target: Some(target),
697 rpc_signature: None,
698 jito_signature: None,
699 used_rpc_fallback: false,
700 selected_target_count,
701 selected_identity_count,
702 });
703 }
704 if attempt_idx < direct_config.hybrid_direct_attempts.saturating_sub(1) {
705 sleep(direct_config.rebroadcast_interval).await;
706 }
707 }
708
709 let rpc_signature = rpc
710 .submit_rpc(&tx_bytes, &self.rpc_config)
711 .await
712 .map_err(|source| SubmitError::Rpc { source })?;
713 self.record_external_outcome(&TxSubmitOutcome {
714 kind: TxSubmitOutcomeKind::RpcAccepted,
715 signature,
716 mode,
717 state_version: self
718 .flow_safety_source
719 .as_ref()
720 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
721 opportunity_age_ms: None,
722 });
723 Ok(SubmitResult {
724 signature,
725 mode,
726 direct_target: None,
727 rpc_signature: Some(rpc_signature),
728 jito_signature: None,
729 used_rpc_fallback: true,
730 selected_target_count: 0,
731 selected_identity_count: 0,
732 })
733 }
734
735 async fn select_direct_targets(&self, direct_config: &DirectSubmitConfig) -> Vec<LeaderTarget> {
737 select_and_rank_targets(
738 self.leader_provider.as_ref(),
739 &self.backups,
740 self.policy,
741 direct_config,
742 )
743 .await
744 }
745
746 fn spawn_agave_rebroadcast(&self, tx_bytes: Arc<[u8]>, direct_config: &DirectSubmitConfig) {
748 if !direct_config.agave_rebroadcast_enabled
749 || direct_config.agave_rebroadcast_window.is_zero()
750 {
751 return;
752 }
753 let Some(direct_transport) = self.direct_transport.clone() else {
754 return;
755 };
756 spawn_agave_rebroadcast_task(
757 tx_bytes,
758 direct_transport,
759 self.leader_provider.clone(),
760 self.backups.clone(),
761 self.policy,
762 direct_config.clone(),
763 );
764 }
765}
766
/// Spawns a detached tokio task that keeps re-sending the transaction over
/// the direct transport until the rebroadcast window has passed.
///
/// Each round waits up to `agave_rebroadcast_interval`, re-selects targets,
/// and submits once; the result of each submission is discarded.
#[cfg(not(test))]
fn spawn_agave_rebroadcast_task(
    tx_bytes: Arc<[u8]>,
    direct_transport: Arc<dyn DirectSubmitTransport>,
    leader_provider: Arc<dyn LeaderProvider>,
    backups: Vec<LeaderTarget>,
    policy: RoutingPolicy,
    direct_config: DirectSubmitConfig,
) {
    tokio::spawn(async move {
        // If adding the window overflows `Instant`, the deadline falls back
        // to "now" and the loop exits on its first check.
        let deadline = Instant::now()
            .checked_add(direct_config.agave_rebroadcast_window)
            .unwrap_or_else(Instant::now);
        loop {
            let now = Instant::now();
            if now >= deadline {
                break;
            }

            // Sleep for one interval, capped at the time left in the window.
            // NOTE(review): if `agave_rebroadcast_interval` is zero the sleep
            // is skipped and the loop re-submits back-to-back until the
            // deadline — confirm `normalized()` guarantees a non-zero interval.
            let sleep_for = deadline
                .saturating_duration_since(now)
                .min(direct_config.agave_rebroadcast_interval);
            if !sleep_for.is_zero() {
                sleep(sleep_for).await;
            }

            // Re-check after sleeping so nothing is sent once the window has passed.
            if Instant::now() >= deadline {
                break;
            }

            let targets = select_and_rank_targets(
                leader_provider.as_ref(),
                backups.as_slice(),
                policy,
                &direct_config,
            )
            .await;
            if targets.is_empty() {
                continue;
            }

            // Best effort: both a transport error and a timeout are ignored.
            drop(
                timeout(
                    direct_attempt_timeout(&direct_config),
                    direct_transport.submit_direct(
                        tx_bytes.as_ref(),
                        &targets,
                        policy,
                        &direct_config,
                    ),
                )
                .await,
            );
        }
    });
}
824
/// Test-build replacement for the rebroadcast task: accepts the same
/// arguments and does nothing, so no background task is spawned under
/// `cfg(test)`.
#[cfg(test)]
fn spawn_agave_rebroadcast_task(
    _tx_bytes: Arc<[u8]>,
    _direct_transport: Arc<dyn DirectSubmitTransport>,
    _leader_provider: Arc<dyn LeaderProvider>,
    _backups: Vec<LeaderTarget>,
    _policy: RoutingPolicy,
    _direct_config: DirectSubmitConfig,
) {
}
836
837async fn select_and_rank_targets(
839 leader_provider: &(impl LeaderProvider + ?Sized),
840 backups: &[LeaderTarget],
841 policy: RoutingPolicy,
842 direct_config: &DirectSubmitConfig,
843) -> Vec<LeaderTarget> {
844 let targets = select_targets(leader_provider, backups, policy);
845 rank_targets_by_latency(targets, direct_config).await
846}
847
848async fn rank_targets_by_latency(
850 targets: Vec<LeaderTarget>,
851 direct_config: &DirectSubmitConfig,
852) -> Vec<LeaderTarget> {
853 if targets.len() <= 1 || !direct_config.latency_aware_targeting {
854 return targets;
855 }
856
857 let probe_timeout = direct_config.latency_probe_timeout;
858 let probe_port = direct_config.latency_probe_port;
859 let probe_count = targets
860 .len()
861 .min(direct_config.latency_probe_max_targets.max(1));
862 let mut latencies = vec![None; probe_count];
863 let mut probes = JoinSet::new();
864 for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
865 probes.spawn(async move {
866 (
867 idx,
868 probe_target_latency(&target, probe_port, probe_timeout).await,
869 )
870 });
871 }
872 while let Some(result) = probes.join_next().await {
873 if let Ok((idx, latency)) = result
874 && idx < latencies.len()
875 && let Some(slot) = latencies.get_mut(idx)
876 {
877 *slot = latency;
878 }
879 }
880
881 let mut ranked = targets
882 .iter()
883 .take(probe_count)
884 .cloned()
885 .enumerate()
886 .collect::<Vec<_>>();
887 ranked.sort_by_key(|(idx, _target)| {
888 (
889 latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
890 *idx,
891 )
892 });
893
894 let mut output = ranked
895 .into_iter()
896 .map(|(_idx, target)| target)
897 .collect::<Vec<_>>();
898 output.extend(targets.iter().skip(probe_count).cloned());
899 output
900}
901
902async fn probe_target_latency(
904 target: &LeaderTarget,
905 probe_port: Option<u16>,
906 probe_timeout: Duration,
907) -> Option<u128> {
908 let mut ports = vec![target.tpu_addr.port()];
909 if let Some(port) = probe_port
910 && port != target.tpu_addr.port()
911 {
912 ports.push(port);
913 }
914
915 let ip = target.tpu_addr.ip();
916 let mut best = None::<u128>;
917 for port in ports {
918 if let Some(latency) = probe_tcp_latency(ip, port, probe_timeout).await {
919 best = Some(best.map_or(latency, |current| current.min(latency)));
920 }
921 }
922 best
923}
924
925async fn probe_tcp_latency(
927 ip: std::net::IpAddr,
928 port: u16,
929 timeout_duration: Duration,
930) -> Option<u128> {
931 let start = Instant::now();
932 let addr = SocketAddr::new(ip, port);
933 let stream = timeout(timeout_duration, TcpStream::connect(addr))
934 .await
935 .ok()?
936 .ok()?;
937 drop(stream);
938 Some(start.elapsed().as_millis())
939}
940
941fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
943 let selected_target_count = targets.len();
944 let selected_identity_count = targets
945 .iter()
946 .filter_map(|target| target.identity)
947 .collect::<HashSet<_>>()
948 .len();
949 (selected_target_count, selected_identity_count)
950}
951
952fn rotate_targets_for_attempt(
954 targets: &mut [LeaderTarget],
955 attempt_idx: usize,
956 policy: RoutingPolicy,
957) {
958 if attempt_idx == 0 || targets.len() <= 1 {
959 return;
960 }
961
962 let normalized = policy.normalized();
963 let stride = normalized.max_parallel_sends.max(1);
964 let rotation = attempt_idx
965 .saturating_mul(stride)
966 .checked_rem(targets.len())
967 .unwrap_or(0);
968 if rotation > 0 {
969 targets.rotate_left(rotation);
970 }
971}
972
973fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
975 direct_config
976 .global_timeout
977 .saturating_add(direct_config.per_target_timeout)
978 .saturating_add(direct_config.rebroadcast_interval)
979 .max(Duration::from_secs(8))
980}