1use std::{
4 collections::HashSet,
5 net::SocketAddr,
6 sync::Arc,
7 time::{Duration, Instant, SystemTime},
8};
9
10use solana_signature::Signature;
11use solana_signer::signers::Signers;
12use solana_transaction::versioned::VersionedTransaction;
13use tokio::{
14 net::TcpStream,
15 task::JoinSet,
16 time::{sleep, timeout},
17};
18
19use super::{
20 DirectSubmitConfig, DirectSubmitTransport, JitoSubmitConfig, JitoSubmitTransport,
21 RpcSubmitConfig, RpcSubmitTransport, SignedTx, SubmitError, SubmitMode, SubmitReliability,
22 SubmitResult, TxFlowSafetyQuality, TxFlowSafetySource, TxSubmitContext, TxSubmitGuardPolicy,
23 TxSubmitOutcome, TxSubmitOutcomeKind, TxSubmitOutcomeReporter, TxToxicFlowRejectionReason,
24 TxToxicFlowTelemetry, TxToxicFlowTelemetrySnapshot,
25};
26use crate::{
27 builder::TxBuilder,
28 providers::{LeaderProvider, LeaderTarget, RecentBlockhashProvider},
29 routing::{RoutingPolicy, SignatureDeduper, select_targets},
30 submit::types::TxSuppressionCache,
31};
32
33pub struct TxSubmitClient {
35 blockhash_provider: Arc<dyn RecentBlockhashProvider>,
37 leader_provider: Arc<dyn LeaderProvider>,
39 backups: Vec<LeaderTarget>,
41 policy: RoutingPolicy,
43 deduper: SignatureDeduper,
45 rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
47 direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
49 jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
51 rpc_config: RpcSubmitConfig,
53 jito_config: JitoSubmitConfig,
55 direct_config: DirectSubmitConfig,
57 flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
59 guard_policy: TxSubmitGuardPolicy,
61 suppression: TxSuppressionCache,
63 telemetry: Arc<TxToxicFlowTelemetry>,
65 outcome_reporter: Option<Arc<dyn TxSubmitOutcomeReporter>>,
67}
68
69impl TxSubmitClient {
70 #[must_use]
72 pub fn new(
73 blockhash_provider: Arc<dyn RecentBlockhashProvider>,
74 leader_provider: Arc<dyn LeaderProvider>,
75 ) -> Self {
76 Self {
77 blockhash_provider,
78 leader_provider,
79 backups: Vec::new(),
80 policy: RoutingPolicy::default(),
81 deduper: SignatureDeduper::new(Duration::from_secs(10)),
82 rpc_transport: None,
83 direct_transport: None,
84 jito_transport: None,
85 rpc_config: RpcSubmitConfig::default(),
86 jito_config: JitoSubmitConfig::default(),
87 direct_config: DirectSubmitConfig::default(),
88 flow_safety_source: None,
89 guard_policy: TxSubmitGuardPolicy::default(),
90 suppression: TxSuppressionCache::default(),
91 telemetry: TxToxicFlowTelemetry::shared(),
92 outcome_reporter: None,
93 }
94 }
95
96 #[must_use]
98 pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
99 self.backups = backups;
100 self
101 }
102
103 #[must_use]
105 pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
106 self.policy = policy.normalized();
107 self
108 }
109
110 #[must_use]
112 pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
113 self.deduper = SignatureDeduper::new(ttl);
114 self
115 }
116
117 #[must_use]
119 pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
120 self.rpc_transport = Some(transport);
121 self
122 }
123
124 #[must_use]
126 pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
127 self.direct_transport = Some(transport);
128 self
129 }
130
131 #[must_use]
133 pub fn with_jito_transport(mut self, transport: Arc<dyn JitoSubmitTransport>) -> Self {
134 self.jito_transport = Some(transport);
135 self
136 }
137
138 #[must_use]
140 pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
141 self.rpc_config = config;
142 self
143 }
144
145 #[must_use]
147 pub const fn with_jito_config(mut self, config: JitoSubmitConfig) -> Self {
148 self.jito_config = config;
149 self
150 }
151
152 #[must_use]
154 pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
155 self.direct_config = config.normalized();
156 self
157 }
158
159 #[must_use]
161 pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
162 self.direct_config = DirectSubmitConfig::from_reliability(reliability);
163 self
164 }
165
166 #[must_use]
168 pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
169 self.flow_safety_source = Some(source);
170 self
171 }
172
173 #[must_use]
175 pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
176 self.guard_policy = policy;
177 self
178 }
179
180 #[must_use]
182 pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
183 self.outcome_reporter = Some(reporter);
184 self
185 }
186
187 #[must_use]
189 pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
190 self.telemetry.snapshot()
191 }
192
193 pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
195 self.telemetry.record(outcome);
196 if let Some(reporter) = &self.outcome_reporter {
197 reporter.record_outcome(outcome);
198 }
199 }
200
201 pub async fn submit_builder<T>(
208 &mut self,
209 builder: TxBuilder,
210 signers: &T,
211 mode: SubmitMode,
212 ) -> Result<SubmitResult, SubmitError>
213 where
214 T: Signers + ?Sized,
215 {
216 let blockhash = self
217 .blockhash_provider
218 .latest_blockhash()
219 .ok_or(SubmitError::MissingRecentBlockhash)?;
220 let tx = builder
221 .build_and_sign(blockhash, signers)
222 .map_err(|source| SubmitError::Build { source })?;
223 self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
224 .await
225 }
226
227 pub async fn submit_builder_with_context<T>(
234 &mut self,
235 builder: TxBuilder,
236 signers: &T,
237 mode: SubmitMode,
238 context: TxSubmitContext,
239 ) -> Result<SubmitResult, SubmitError>
240 where
241 T: Signers + ?Sized,
242 {
243 let blockhash = self
244 .blockhash_provider
245 .latest_blockhash()
246 .ok_or(SubmitError::MissingRecentBlockhash)?;
247 let tx = builder
248 .build_and_sign(blockhash, signers)
249 .map_err(|source| SubmitError::Build { source })?;
250 self.submit_transaction_with_context(tx, mode, context)
251 .await
252 }
253
254 pub async fn submit_transaction(
260 &mut self,
261 tx: VersionedTransaction,
262 mode: SubmitMode,
263 ) -> Result<SubmitResult, SubmitError> {
264 self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
265 .await
266 }
267
268 pub async fn submit_transaction_with_context(
275 &mut self,
276 tx: VersionedTransaction,
277 mode: SubmitMode,
278 context: TxSubmitContext,
279 ) -> Result<SubmitResult, SubmitError> {
280 let signature = tx.signatures.first().copied();
281 let tx_bytes =
282 bincode::serialize(&tx).map_err(|source| SubmitError::DecodeSignedBytes { source })?;
283 self.submit_bytes(tx_bytes, signature, mode, context).await
284 }
285
286 pub async fn submit_signed(
292 &mut self,
293 signed_tx: SignedTx,
294 mode: SubmitMode,
295 ) -> Result<SubmitResult, SubmitError> {
296 self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
297 .await
298 }
299
300 pub async fn submit_signed_with_context(
307 &mut self,
308 signed_tx: SignedTx,
309 mode: SubmitMode,
310 context: TxSubmitContext,
311 ) -> Result<SubmitResult, SubmitError> {
312 let tx_bytes = match signed_tx {
313 SignedTx::VersionedTransactionBytes(bytes) => bytes,
314 SignedTx::WireTransactionBytes(bytes) => bytes,
315 };
316 let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
317 .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
318 let signature = tx.signatures.first().copied();
319 self.submit_bytes(tx_bytes, signature, mode, context).await
320 }
321
322 async fn submit_bytes(
324 &mut self,
325 tx_bytes: Vec<u8>,
326 signature: Option<Signature>,
327 mode: SubmitMode,
328 context: TxSubmitContext,
329 ) -> Result<SubmitResult, SubmitError> {
330 self.enforce_toxic_flow_guards(signature, mode, &context)?;
331 self.enforce_dedupe(signature)?;
332 match mode {
333 SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
334 SubmitMode::JitoOnly => self.submit_jito_only(tx_bytes, signature, mode).await,
335 SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
336 SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
337 }
338 }
339
340 fn enforce_dedupe(&mut self, signature: Option<Signature>) -> Result<(), SubmitError> {
342 if let Some(signature) = signature {
343 let now = Instant::now();
344 if !self.deduper.check_and_insert(signature, now) {
345 return Err(SubmitError::DuplicateSignature);
346 }
347 }
348 Ok(())
349 }
350
351 fn enforce_toxic_flow_guards(
353 &mut self,
354 signature: Option<Signature>,
355 mode: SubmitMode,
356 context: &TxSubmitContext,
357 ) -> Result<(), SubmitError> {
358 let now = SystemTime::now();
359 let opportunity_age_ms = context
360 .opportunity_created_at
361 .and_then(|created_at| now.duration_since(created_at).ok())
362 .map(|duration| duration.as_millis().min(u128::from(u64::MAX)) as u64);
363 if let Some(age_ms) = opportunity_age_ms
364 && let Some(max_age) = self.guard_policy.max_opportunity_age
365 {
366 let max_allowed_ms = max_age.as_millis().min(u128::from(u64::MAX)) as u64;
367 if age_ms > max_allowed_ms {
368 return Err(self.reject_with_outcome(
369 TxToxicFlowRejectionReason::OpportunityStale {
370 age_ms,
371 max_allowed_ms,
372 },
373 TxSubmitOutcomeKind::RejectedDueToStaleness,
374 signature,
375 mode,
376 None,
377 opportunity_age_ms,
378 ));
379 }
380 }
381
382 if self.suppression.is_suppressed(
383 &context.suppression_keys,
384 now,
385 self.guard_policy.suppression_ttl,
386 ) {
387 return Err(self.reject_with_outcome(
388 TxToxicFlowRejectionReason::Suppressed,
389 TxSubmitOutcomeKind::Suppressed,
390 signature,
391 mode,
392 None,
393 opportunity_age_ms,
394 ));
395 }
396
397 if let Some(source) = &self.flow_safety_source {
398 let snapshot = source.toxic_flow_snapshot();
399 if self.guard_policy.reject_on_replay_recovery_pending
400 && snapshot.replay_recovery_pending
401 {
402 return Err(self.reject_with_outcome(
403 TxToxicFlowRejectionReason::ReplayRecoveryPending,
404 TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
405 signature,
406 mode,
407 snapshot.current_state_version,
408 opportunity_age_ms,
409 ));
410 }
411 if self.guard_policy.require_stable_control_plane
412 && !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
413 {
414 let outcome_kind = match snapshot.quality {
415 TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
416 TxSubmitOutcomeKind::RejectedDueToReorgRisk
417 }
418 TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
419 TxFlowSafetyQuality::Degraded
420 | TxFlowSafetyQuality::IncompleteControlPlane
421 | TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
422 };
423 return Err(self.reject_with_outcome(
424 TxToxicFlowRejectionReason::UnsafeControlPlane {
425 quality: snapshot.quality,
426 },
427 outcome_kind,
428 signature,
429 mode,
430 snapshot.current_state_version,
431 opportunity_age_ms,
432 ));
433 }
434 if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
435 context.decision_state_version,
436 snapshot.current_state_version,
437 self.guard_policy.max_state_version_drift,
438 ) {
439 let drift = current_version.saturating_sub(decision_version);
440 if drift > max_allowed {
441 return Err(self.reject_with_outcome(
442 TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
443 TxSubmitOutcomeKind::RejectedDueToStateDrift,
444 signature,
445 mode,
446 Some(current_version),
447 opportunity_age_ms,
448 ));
449 }
450 }
451 }
452
453 self.suppression.insert_all(&context.suppression_keys, now);
454 Ok(())
455 }
456
457 fn reject_with_outcome(
459 &self,
460 reason: TxToxicFlowRejectionReason,
461 outcome_kind: TxSubmitOutcomeKind,
462 signature: Option<Signature>,
463 mode: SubmitMode,
464 state_version: Option<u64>,
465 opportunity_age_ms: Option<u64>,
466 ) -> SubmitError {
467 let outcome = TxSubmitOutcome {
468 kind: outcome_kind,
469 signature,
470 mode,
471 state_version,
472 opportunity_age_ms,
473 };
474 self.record_external_outcome(&outcome);
475 SubmitError::ToxicFlow { reason }
476 }
477
478 async fn submit_rpc_only(
480 &self,
481 tx_bytes: Vec<u8>,
482 signature: Option<Signature>,
483 mode: SubmitMode,
484 ) -> Result<SubmitResult, SubmitError> {
485 let rpc = self
486 .rpc_transport
487 .as_ref()
488 .ok_or(SubmitError::MissingRpcTransport)?;
489 let rpc_signature = rpc
490 .submit_rpc(&tx_bytes, &self.rpc_config)
491 .await
492 .map_err(|source| SubmitError::Rpc { source })?;
493 self.record_external_outcome(&TxSubmitOutcome {
494 kind: TxSubmitOutcomeKind::RpcAccepted,
495 signature,
496 mode,
497 state_version: self
498 .flow_safety_source
499 .as_ref()
500 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
501 opportunity_age_ms: None,
502 });
503 Ok(SubmitResult {
504 signature,
505 mode,
506 direct_target: None,
507 rpc_signature: Some(rpc_signature),
508 jito_signature: None,
509 jito_bundle_id: None,
510 used_rpc_fallback: false,
511 selected_target_count: 0,
512 selected_identity_count: 0,
513 })
514 }
515
516 async fn submit_jito_only(
518 &self,
519 tx_bytes: Vec<u8>,
520 signature: Option<Signature>,
521 mode: SubmitMode,
522 ) -> Result<SubmitResult, SubmitError> {
523 let jito = self
524 .jito_transport
525 .as_ref()
526 .ok_or(SubmitError::MissingJitoTransport)?;
527 let jito_response = jito
528 .submit_jito(&tx_bytes, &self.jito_config)
529 .await
530 .map_err(|source| SubmitError::Jito { source })?;
531 self.record_external_outcome(&TxSubmitOutcome {
532 kind: TxSubmitOutcomeKind::JitoAccepted,
533 signature,
534 mode,
535 state_version: self
536 .flow_safety_source
537 .as_ref()
538 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
539 opportunity_age_ms: None,
540 });
541 Ok(SubmitResult {
542 signature,
543 mode,
544 direct_target: None,
545 rpc_signature: None,
546 jito_signature: jito_response.transaction_signature,
547 jito_bundle_id: jito_response.bundle_id,
548 used_rpc_fallback: false,
549 selected_target_count: 0,
550 selected_identity_count: 0,
551 })
552 }
553
554 async fn submit_direct_only(
556 &self,
557 tx_bytes: Vec<u8>,
558 signature: Option<Signature>,
559 mode: SubmitMode,
560 ) -> Result<SubmitResult, SubmitError> {
561 let direct = self
562 .direct_transport
563 .as_ref()
564 .ok_or(SubmitError::MissingDirectTransport)?;
565 let direct_config = self.direct_config.clone().normalized();
566 let mut last_error = None;
567 let attempt_timeout = direct_attempt_timeout(&direct_config);
568
569 for attempt_idx in 0..direct_config.direct_submit_attempts {
570 let mut targets = self.select_direct_targets(&direct_config).await;
571 rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
572 let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
573 if targets.is_empty() {
574 return Err(SubmitError::NoDirectTargets);
575 }
576 match timeout(
577 attempt_timeout,
578 direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
579 )
580 .await
581 {
582 Ok(Ok(target)) => {
583 self.record_external_outcome(&TxSubmitOutcome {
584 kind: TxSubmitOutcomeKind::DirectAccepted,
585 signature,
586 mode,
587 state_version: self
588 .flow_safety_source
589 .as_ref()
590 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
591 opportunity_age_ms: None,
592 });
593 self.spawn_agave_rebroadcast(Arc::from(tx_bytes), &direct_config);
594 return Ok(SubmitResult {
595 signature,
596 mode,
597 direct_target: Some(target),
598 rpc_signature: None,
599 jito_signature: None,
600 jito_bundle_id: None,
601 used_rpc_fallback: false,
602 selected_target_count,
603 selected_identity_count,
604 });
605 }
606 Ok(Err(source)) => last_error = Some(source),
607 Err(_elapsed) => {
608 last_error = Some(super::SubmitTransportError::Failure {
609 message: format!(
610 "direct submit attempt timed out after {}ms",
611 attempt_timeout.as_millis()
612 ),
613 });
614 }
615 }
616 if attempt_idx < direct_config.direct_submit_attempts.saturating_sub(1) {
617 sleep(direct_config.rebroadcast_interval).await;
618 }
619 }
620
621 Err(SubmitError::Direct {
622 source: last_error.unwrap_or_else(|| super::SubmitTransportError::Failure {
623 message: "direct submit attempts exhausted".to_owned(),
624 }),
625 })
626 }
627
628 async fn submit_hybrid(
630 &self,
631 tx_bytes: Vec<u8>,
632 signature: Option<Signature>,
633 mode: SubmitMode,
634 ) -> Result<SubmitResult, SubmitError> {
635 let direct = self
636 .direct_transport
637 .as_ref()
638 .ok_or(SubmitError::MissingDirectTransport)?;
639 let rpc = self
640 .rpc_transport
641 .as_ref()
642 .ok_or(SubmitError::MissingRpcTransport)?;
643
644 let direct_config = self.direct_config.clone().normalized();
645 let attempt_timeout = direct_attempt_timeout(&direct_config);
646 for attempt_idx in 0..direct_config.hybrid_direct_attempts {
647 let mut targets = self.select_direct_targets(&direct_config).await;
648 rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
649 let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
650 if targets.is_empty() {
651 break;
652 }
653 if let Ok(Ok(target)) = timeout(
654 attempt_timeout,
655 direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
656 )
657 .await
658 {
659 let tx_bytes = Arc::<[u8]>::from(tx_bytes);
660 self.spawn_agave_rebroadcast(Arc::clone(&tx_bytes), &direct_config);
661 if direct_config.hybrid_rpc_broadcast
662 && let Ok(rpc_signature) =
663 rpc.submit_rpc(tx_bytes.as_ref(), &self.rpc_config).await
664 {
665 self.record_external_outcome(&TxSubmitOutcome {
666 kind: TxSubmitOutcomeKind::DirectAccepted,
667 signature,
668 mode,
669 state_version: self
670 .flow_safety_source
671 .as_ref()
672 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
673 opportunity_age_ms: None,
674 });
675 return Ok(SubmitResult {
676 signature,
677 mode,
678 direct_target: Some(target),
679 rpc_signature: Some(rpc_signature),
680 jito_signature: None,
681 jito_bundle_id: None,
682 used_rpc_fallback: false,
683 selected_target_count,
684 selected_identity_count,
685 });
686 }
687 self.record_external_outcome(&TxSubmitOutcome {
688 kind: TxSubmitOutcomeKind::DirectAccepted,
689 signature,
690 mode,
691 state_version: self
692 .flow_safety_source
693 .as_ref()
694 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
695 opportunity_age_ms: None,
696 });
697 return Ok(SubmitResult {
698 signature,
699 mode,
700 direct_target: Some(target),
701 rpc_signature: None,
702 jito_signature: None,
703 jito_bundle_id: None,
704 used_rpc_fallback: false,
705 selected_target_count,
706 selected_identity_count,
707 });
708 }
709 if attempt_idx < direct_config.hybrid_direct_attempts.saturating_sub(1) {
710 sleep(direct_config.rebroadcast_interval).await;
711 }
712 }
713
714 let rpc_signature = rpc
715 .submit_rpc(&tx_bytes, &self.rpc_config)
716 .await
717 .map_err(|source| SubmitError::Rpc { source })?;
718 self.record_external_outcome(&TxSubmitOutcome {
719 kind: TxSubmitOutcomeKind::RpcAccepted,
720 signature,
721 mode,
722 state_version: self
723 .flow_safety_source
724 .as_ref()
725 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
726 opportunity_age_ms: None,
727 });
728 Ok(SubmitResult {
729 signature,
730 mode,
731 direct_target: None,
732 rpc_signature: Some(rpc_signature),
733 jito_signature: None,
734 jito_bundle_id: None,
735 used_rpc_fallback: true,
736 selected_target_count: 0,
737 selected_identity_count: 0,
738 })
739 }
740
741 async fn select_direct_targets(&self, direct_config: &DirectSubmitConfig) -> Vec<LeaderTarget> {
743 select_and_rank_targets(
744 self.leader_provider.as_ref(),
745 &self.backups,
746 self.policy,
747 direct_config,
748 )
749 .await
750 }
751
752 fn spawn_agave_rebroadcast(&self, tx_bytes: Arc<[u8]>, direct_config: &DirectSubmitConfig) {
754 if !direct_config.agave_rebroadcast_enabled
755 || direct_config.agave_rebroadcast_window.is_zero()
756 {
757 return;
758 }
759 let Some(direct_transport) = self.direct_transport.clone() else {
760 return;
761 };
762 spawn_agave_rebroadcast_task(
763 tx_bytes,
764 direct_transport,
765 self.leader_provider.clone(),
766 self.backups.clone(),
767 self.policy,
768 direct_config.clone(),
769 );
770 }
771}
772
773#[cfg(not(test))]
774fn spawn_agave_rebroadcast_task(
776 tx_bytes: Arc<[u8]>,
777 direct_transport: Arc<dyn DirectSubmitTransport>,
778 leader_provider: Arc<dyn LeaderProvider>,
779 backups: Vec<LeaderTarget>,
780 policy: RoutingPolicy,
781 direct_config: DirectSubmitConfig,
782) {
783 tokio::spawn(async move {
784 let deadline = Instant::now()
785 .checked_add(direct_config.agave_rebroadcast_window)
786 .unwrap_or_else(Instant::now);
787 loop {
788 let now = Instant::now();
789 if now >= deadline {
790 break;
791 }
792
793 let sleep_for = deadline
794 .saturating_duration_since(now)
795 .min(direct_config.agave_rebroadcast_interval);
796 if !sleep_for.is_zero() {
797 sleep(sleep_for).await;
798 }
799
800 if Instant::now() >= deadline {
801 break;
802 }
803
804 let targets = select_and_rank_targets(
805 leader_provider.as_ref(),
806 backups.as_slice(),
807 policy,
808 &direct_config,
809 )
810 .await;
811 if targets.is_empty() {
812 continue;
813 }
814
815 drop(
816 timeout(
817 direct_attempt_timeout(&direct_config),
818 direct_transport.submit_direct(
819 tx_bytes.as_ref(),
820 &targets,
821 policy,
822 &direct_config,
823 ),
824 )
825 .await,
826 );
827 }
828 });
829}
830
/// Test-build replacement for the rebroadcast task: accepts the same
/// arguments and does nothing, so no background task is spawned.
#[cfg(test)]
fn spawn_agave_rebroadcast_task(
    _tx_bytes: Arc<[u8]>,
    _direct_transport: Arc<dyn DirectSubmitTransport>,
    _leader_provider: Arc<dyn LeaderProvider>,
    _backups: Vec<LeaderTarget>,
    _policy: RoutingPolicy,
    _direct_config: DirectSubmitConfig,
) {
    // Intentionally empty.
}
842
843async fn select_and_rank_targets(
845 leader_provider: &(impl LeaderProvider + ?Sized),
846 backups: &[LeaderTarget],
847 policy: RoutingPolicy,
848 direct_config: &DirectSubmitConfig,
849) -> Vec<LeaderTarget> {
850 let targets = select_targets(leader_provider, backups, policy);
851 rank_targets_by_latency(targets, direct_config).await
852}
853
854async fn rank_targets_by_latency(
856 targets: Vec<LeaderTarget>,
857 direct_config: &DirectSubmitConfig,
858) -> Vec<LeaderTarget> {
859 if targets.len() <= 1 || !direct_config.latency_aware_targeting {
860 return targets;
861 }
862
863 let probe_timeout = direct_config.latency_probe_timeout;
864 let probe_port = direct_config.latency_probe_port;
865 let probe_count = targets
866 .len()
867 .min(direct_config.latency_probe_max_targets.max(1));
868 let mut latencies = vec![None; probe_count];
869 let mut probes = JoinSet::new();
870 for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
871 probes.spawn(async move {
872 (
873 idx,
874 probe_target_latency(&target, probe_port, probe_timeout).await,
875 )
876 });
877 }
878 while let Some(result) = probes.join_next().await {
879 if let Ok((idx, latency)) = result
880 && idx < latencies.len()
881 && let Some(slot) = latencies.get_mut(idx)
882 {
883 *slot = latency;
884 }
885 }
886
887 let mut ranked = targets
888 .iter()
889 .take(probe_count)
890 .cloned()
891 .enumerate()
892 .collect::<Vec<_>>();
893 ranked.sort_by_key(|(idx, _target)| {
894 (
895 latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
896 *idx,
897 )
898 });
899
900 let mut output = ranked
901 .into_iter()
902 .map(|(_idx, target)| target)
903 .collect::<Vec<_>>();
904 output.extend(targets.iter().skip(probe_count).cloned());
905 output
906}
907
908async fn probe_target_latency(
910 target: &LeaderTarget,
911 probe_port: Option<u16>,
912 probe_timeout: Duration,
913) -> Option<u128> {
914 let mut ports = vec![target.tpu_addr.port()];
915 if let Some(port) = probe_port
916 && port != target.tpu_addr.port()
917 {
918 ports.push(port);
919 }
920
921 let ip = target.tpu_addr.ip();
922 let mut best = None::<u128>;
923 for port in ports {
924 if let Some(latency) = probe_tcp_latency(ip, port, probe_timeout).await {
925 best = Some(best.map_or(latency, |current| current.min(latency)));
926 }
927 }
928 best
929}
930
931async fn probe_tcp_latency(
933 ip: std::net::IpAddr,
934 port: u16,
935 timeout_duration: Duration,
936) -> Option<u128> {
937 let start = Instant::now();
938 let addr = SocketAddr::new(ip, port);
939 let stream = timeout(timeout_duration, TcpStream::connect(addr))
940 .await
941 .ok()?
942 .ok()?;
943 drop(stream);
944 Some(start.elapsed().as_millis())
945}
946
947fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
949 let selected_target_count = targets.len();
950 let selected_identity_count = targets
951 .iter()
952 .filter_map(|target| target.identity)
953 .collect::<HashSet<_>>()
954 .len();
955 (selected_target_count, selected_identity_count)
956}
957
958fn rotate_targets_for_attempt(
960 targets: &mut [LeaderTarget],
961 attempt_idx: usize,
962 policy: RoutingPolicy,
963) {
964 if attempt_idx == 0 || targets.len() <= 1 {
965 return;
966 }
967
968 let normalized = policy.normalized();
969 let stride = normalized.max_parallel_sends.max(1);
970 let rotation = attempt_idx
971 .saturating_mul(stride)
972 .checked_rem(targets.len())
973 .unwrap_or(0);
974 if rotation > 0 {
975 targets.rotate_left(rotation);
976 }
977}
978
979fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
981 direct_config
982 .global_timeout
983 .saturating_add(direct_config.per_target_timeout)
984 .saturating_add(direct_config.rebroadcast_interval)
985 .max(Duration::from_secs(8))
986}