1use std::{
4 collections::HashSet,
5 net::SocketAddr,
6 sync::Arc,
7 time::{Duration, Instant, SystemTime},
8};
9
10use solana_signature::Signature;
11use solana_signer::signers::Signers;
12use solana_transaction::versioned::VersionedTransaction;
13use tokio::{
14 net::TcpStream,
15 task::JoinSet,
16 time::{sleep, timeout},
17};
18
19use super::{
20 DirectSubmitConfig, DirectSubmitTransport, JitoSubmitConfig, JitoSubmitTransport,
21 RpcSubmitConfig, RpcSubmitTransport, SignedTx, SubmitError, SubmitMode, SubmitReliability,
22 SubmitResult, SubmitTransportError, TxFlowSafetyQuality, TxFlowSafetySource,
23 TxSubmitClientBuilder, TxSubmitContext, TxSubmitGuardPolicy, TxSubmitOutcome,
24 TxSubmitOutcomeKind, TxSubmitOutcomeReporter, TxToxicFlowRejectionReason, TxToxicFlowTelemetry,
25 TxToxicFlowTelemetrySnapshot,
26};
27use crate::{
28 builder::TxBuilder,
29 providers::{
30 LeaderProvider, LeaderTarget, RecentBlockhashProvider, RpcRecentBlockhashProvider,
31 StaticLeaderProvider,
32 },
33 routing::{RoutingPolicy, SignatureDeduper, select_targets},
34 submit::{JsonRpcTransport, types::TxSuppressionCache},
35};
36
/// Client that submits signed transactions over RPC, Jito, direct leader
/// sends, or a hybrid of direct and RPC, after applying toxic-flow guards and
/// signature de-duplication.
pub struct TxSubmitClient {
    /// Queried for the latest blockhash when building unsigned transactions.
    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
    /// When set, refreshed before every unsigned submit.
    on_demand_blockhash_provider: Option<Arc<RpcRecentBlockhashProvider>>,
    /// Leader source handed to target selection for direct sends.
    leader_provider: Arc<dyn LeaderProvider>,
    /// Backup targets handed to target selection alongside the leader provider.
    backups: Vec<LeaderTarget>,
    /// Routing policy used for target selection, rotation, and direct sends.
    policy: RoutingPolicy,
    /// Tracks recently seen signatures so duplicates are rejected.
    deduper: SignatureDeduper,
    /// Transport used by RPC-only and hybrid submits.
    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
    /// Transport used by direct-only and hybrid submits and by rebroadcast.
    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
    /// Transport used by Jito-only submits.
    jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
    /// Configuration passed to the RPC transport on each submit.
    rpc_config: RpcSubmitConfig,
    /// Configuration passed to the Jito transport on each submit.
    jito_config: JitoSubmitConfig,
    /// Configuration for direct sends; re-normalized at the start of each submit.
    direct_config: DirectSubmitConfig,
    /// Optional source of control-plane snapshots consulted by the guards.
    flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
    /// Thresholds and switches applied by the toxic-flow guards.
    guard_policy: TxSubmitGuardPolicy,
    /// Cache of suppression keys recorded by submits that passed the guards.
    suppression: TxSuppressionCache,
    /// Telemetry sink that records every reported outcome.
    telemetry: Arc<TxToxicFlowTelemetry>,
    /// Optional external reporter notified of every recorded outcome.
    outcome_reporter: Option<Arc<dyn TxSubmitOutcomeReporter>>,
}
74
75impl TxSubmitClient {
    /// Returns a fresh [`TxSubmitClientBuilder`].
    #[must_use]
    pub fn builder() -> TxSubmitClientBuilder {
        TxSubmitClientBuilder::new()
    }
81
82 #[must_use]
84 pub fn new(
85 blockhash_provider: Arc<dyn RecentBlockhashProvider>,
86 leader_provider: Arc<dyn LeaderProvider>,
87 ) -> Self {
88 Self {
89 blockhash_provider,
90 on_demand_blockhash_provider: None,
91 leader_provider,
92 backups: Vec::new(),
93 policy: RoutingPolicy::default(),
94 deduper: SignatureDeduper::new(Duration::from_secs(10)),
95 rpc_transport: None,
96 direct_transport: None,
97 jito_transport: None,
98 rpc_config: RpcSubmitConfig::default(),
99 jito_config: JitoSubmitConfig::default(),
100 direct_config: DirectSubmitConfig::default(),
101 flow_safety_source: None,
102 guard_policy: TxSubmitGuardPolicy::default(),
103 suppression: TxSuppressionCache::default(),
104 telemetry: TxToxicFlowTelemetry::shared(),
105 outcome_reporter: None,
106 }
107 }
108
109 #[must_use]
111 pub fn blockhash_only(blockhash_provider: Arc<dyn RecentBlockhashProvider>) -> Self {
112 Self::new(
113 blockhash_provider,
114 Arc::new(StaticLeaderProvider::default()),
115 )
116 }
117
118 pub fn blockhash_via_rpc(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
124 let blockhash_provider = Arc::new(RpcRecentBlockhashProvider::new(rpc_url.into())?);
125 Ok(Self::blockhash_only(blockhash_provider.clone())
126 .with_rpc_blockhash_provider(blockhash_provider))
127 }
128
129 pub fn rpc_only(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
136 let rpc_url = rpc_url.into();
137 let client = Self::blockhash_via_rpc(rpc_url.clone())?;
138 let rpc_transport = Arc::new(JsonRpcTransport::new(rpc_url)?);
139 Ok(client.with_rpc_transport(rpc_transport))
140 }
141
142 #[must_use]
144 pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
145 self.backups = backups;
146 self
147 }
148
149 #[must_use]
151 pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
152 self.policy = policy.normalized();
153 self
154 }
155
156 #[must_use]
158 pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
159 self.deduper = SignatureDeduper::new(ttl);
160 self
161 }
162
163 #[must_use]
165 pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
166 self.rpc_transport = Some(transport);
167 self
168 }
169
170 #[must_use]
172 pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
173 self.direct_transport = Some(transport);
174 self
175 }
176
177 #[must_use]
179 pub fn with_jito_transport(mut self, transport: Arc<dyn JitoSubmitTransport>) -> Self {
180 self.jito_transport = Some(transport);
181 self
182 }
183
184 #[must_use]
186 pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
187 self.rpc_config = config;
188 self
189 }
190
    /// Replaces the Jito submit configuration and returns the updated client.
    #[must_use]
    pub const fn with_jito_config(mut self, config: JitoSubmitConfig) -> Self {
        self.jito_config = config;
        self
    }
197
    /// Stores the normalized form of `config` as the direct submit
    /// configuration and returns the updated client.
    #[must_use]
    pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
        self.direct_config = config.normalized();
        self
    }
204
    /// Replaces the direct submit configuration with the one derived from
    /// `reliability` and returns the updated client.
    ///
    /// Any direct configuration set earlier is overwritten.
    #[must_use]
    pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
        self.direct_config = DirectSubmitConfig::from_reliability(reliability);
        self
    }
211
212 #[must_use]
214 pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
215 self.flow_safety_source = Some(source);
216 self
217 }
218
    /// Replaces the toxic-flow guard policy and returns the updated client.
    #[must_use]
    pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
        self.guard_policy = policy;
        self
    }
225
226 #[must_use]
228 pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
229 self.outcome_reporter = Some(reporter);
230 self
231 }
232
233 #[must_use]
235 pub fn with_rpc_blockhash_provider(
236 mut self,
237 provider: Arc<RpcRecentBlockhashProvider>,
238 ) -> Self {
239 self.on_demand_blockhash_provider = Some(provider);
240 self
241 }
242
    /// Returns a snapshot of the client's toxic-flow telemetry.
    #[must_use]
    pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
        self.telemetry.snapshot()
    }
248
249 pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
251 self.telemetry.record(outcome);
252 if let Some(reporter) = &self.outcome_reporter {
253 reporter.record_outcome(outcome);
254 }
255 }
256
257 pub async fn submit_unsigned<T>(
264 &mut self,
265 builder: TxBuilder,
266 signers: &T,
267 mode: SubmitMode,
268 ) -> Result<SubmitResult, SubmitError>
269 where
270 T: Signers + ?Sized,
271 {
272 if let Some(provider) = &self.on_demand_blockhash_provider {
273 provider
274 .refresh()
275 .await
276 .map_err(|source| SubmitError::Rpc { source })?;
277 }
278 let blockhash = self
279 .blockhash_provider
280 .latest_blockhash()
281 .ok_or(SubmitError::MissingRecentBlockhash)?;
282 let tx = builder
283 .build_and_sign(blockhash, signers)
284 .map_err(|source| SubmitError::Build { source })?;
285 self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
286 .await
287 }
288
289 pub async fn submit_unsigned_with_context<T>(
296 &mut self,
297 builder: TxBuilder,
298 signers: &T,
299 mode: SubmitMode,
300 context: TxSubmitContext,
301 ) -> Result<SubmitResult, SubmitError>
302 where
303 T: Signers + ?Sized,
304 {
305 if let Some(provider) = &self.on_demand_blockhash_provider {
306 provider
307 .refresh()
308 .await
309 .map_err(|source| SubmitError::Rpc { source })?;
310 }
311 let blockhash = self
312 .blockhash_provider
313 .latest_blockhash()
314 .ok_or(SubmitError::MissingRecentBlockhash)?;
315 let tx = builder
316 .build_and_sign(blockhash, signers)
317 .map_err(|source| SubmitError::Build { source })?;
318 self.submit_transaction_with_context(tx, mode, context)
319 .await
320 }
321
322 pub async fn submit_transaction(
328 &mut self,
329 tx: VersionedTransaction,
330 mode: SubmitMode,
331 ) -> Result<SubmitResult, SubmitError> {
332 self.submit_transaction_with_context(tx, mode, TxSubmitContext::default())
333 .await
334 }
335
336 pub async fn submit_transaction_with_context(
343 &mut self,
344 tx: VersionedTransaction,
345 mode: SubmitMode,
346 context: TxSubmitContext,
347 ) -> Result<SubmitResult, SubmitError> {
348 let signature = tx.signatures.first().copied();
349 let tx_bytes =
350 bincode::serialize(&tx).map_err(|source| SubmitError::DecodeSignedBytes { source })?;
351 self.submit_bytes(tx_bytes, signature, mode, context).await
352 }
353
354 pub async fn submit_signed(
360 &mut self,
361 signed_tx: SignedTx,
362 mode: SubmitMode,
363 ) -> Result<SubmitResult, SubmitError> {
364 self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
365 .await
366 }
367
368 pub async fn submit_signed_with_context(
375 &mut self,
376 signed_tx: SignedTx,
377 mode: SubmitMode,
378 context: TxSubmitContext,
379 ) -> Result<SubmitResult, SubmitError> {
380 let tx_bytes = match signed_tx {
381 SignedTx::VersionedTransactionBytes(bytes) => bytes,
382 SignedTx::WireTransactionBytes(bytes) => bytes,
383 };
384 let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
385 .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
386 let signature = tx.signatures.first().copied();
387 self.submit_bytes(tx_bytes, signature, mode, context).await
388 }
389
    /// Runs the pre-submit checks and dispatches the serialized transaction to
    /// the submit path selected by `mode`.
    ///
    /// # Errors
    ///
    /// Returns the guard or de-duplication error if either check rejects the
    /// transaction, and otherwise any error from the selected submit path.
    async fn submit_bytes(
        &mut self,
        tx_bytes: Vec<u8>,
        signature: Option<Signature>,
        mode: SubmitMode,
        context: TxSubmitContext,
    ) -> Result<SubmitResult, SubmitError> {
        // Guards run first; on success they also record the context's
        // suppression keys, so that happens before the duplicate check.
        self.enforce_toxic_flow_guards(signature, mode, &context)?;
        // The duplicate check registers the signature as seen when it passes.
        self.enforce_dedupe(signature)?;
        match mode {
            SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
            SubmitMode::JitoOnly => self.submit_jito_only(tx_bytes, signature, mode).await,
            SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
            SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
        }
    }
407
408 fn enforce_dedupe(&mut self, signature: Option<Signature>) -> Result<(), SubmitError> {
410 if let Some(signature) = signature {
411 let now = Instant::now();
412 if !self.deduper.check_and_insert(signature, now) {
413 return Err(SubmitError::DuplicateSignature);
414 }
415 }
416 Ok(())
417 }
418
    /// Applies the toxic-flow guards to a pending submit.
    ///
    /// The checks run in this order and the first failure wins: opportunity
    /// staleness, suppression, replay recovery, control-plane quality, and
    /// state-version drift. The last three only run when a flow-safety source
    /// is configured. Every rejection is recorded as an outcome before the
    /// error is returned. When all checks pass, the context's suppression keys
    /// are inserted into the suppression cache.
    ///
    /// # Errors
    ///
    /// Returns [`SubmitError::ToxicFlow`] carrying the rejection reason.
    fn enforce_toxic_flow_guards(
        &mut self,
        signature: Option<Signature>,
        mode: SubmitMode,
        context: &TxSubmitContext,
    ) -> Result<(), SubmitError> {
        let now = SystemTime::now();
        // Age is `None` when no creation time was given or when the creation
        // time lies in the future; milliseconds saturate at `u64::MAX`.
        let opportunity_age_ms = context
            .opportunity_created_at
            .and_then(|created_at| now.duration_since(created_at).ok())
            .map(|duration| duration.as_millis().min(u128::from(u64::MAX)) as u64);
        // Staleness is only enforced when both an age and a limit are known.
        if let Some(age_ms) = opportunity_age_ms
            && let Some(max_age) = self.guard_policy.max_opportunity_age
        {
            let max_allowed_ms = max_age.as_millis().min(u128::from(u64::MAX)) as u64;
            if age_ms > max_allowed_ms {
                return Err(self.reject_with_outcome(
                    TxToxicFlowRejectionReason::OpportunityStale {
                        age_ms,
                        max_allowed_ms,
                    },
                    TxSubmitOutcomeKind::RejectedDueToStaleness,
                    signature,
                    mode,
                    None,
                    opportunity_age_ms,
                ));
            }
        }

        if self.suppression.is_suppressed(
            &context.suppression_keys,
            now,
            self.guard_policy.suppression_ttl,
        ) {
            return Err(self.reject_with_outcome(
                TxToxicFlowRejectionReason::Suppressed,
                TxSubmitOutcomeKind::Suppressed,
                signature,
                mode,
                None,
                opportunity_age_ms,
            ));
        }

        if let Some(source) = &self.flow_safety_source {
            // One snapshot is taken and shared by all three checks below.
            let snapshot = source.toxic_flow_snapshot();
            if self.guard_policy.reject_on_replay_recovery_pending
                && snapshot.replay_recovery_pending
            {
                return Err(self.reject_with_outcome(
                    TxToxicFlowRejectionReason::ReplayRecoveryPending,
                    TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
                    signature,
                    mode,
                    snapshot.current_state_version,
                    opportunity_age_ms,
                ));
            }
            if self.guard_policy.require_stable_control_plane
                && !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
            {
                // `Stable` cannot reach this match because of the condition
                // above; it is listed only to keep the match exhaustive.
                let outcome_kind = match snapshot.quality {
                    TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
                        TxSubmitOutcomeKind::RejectedDueToReorgRisk
                    }
                    TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
                    TxFlowSafetyQuality::Degraded
                    | TxFlowSafetyQuality::IncompleteControlPlane
                    | TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
                };
                return Err(self.reject_with_outcome(
                    TxToxicFlowRejectionReason::UnsafeControlPlane {
                        quality: snapshot.quality,
                    },
                    outcome_kind,
                    signature,
                    mode,
                    snapshot.current_state_version,
                    opportunity_age_ms,
                ));
            }
            // Drift is only checked when the decision version, the current
            // version, and a limit are all present.
            if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
                context.decision_state_version,
                snapshot.current_state_version,
                self.guard_policy.max_state_version_drift,
            ) {
                // Saturating: a decision version ahead of the current one
                // counts as zero drift.
                let drift = current_version.saturating_sub(decision_version);
                if drift > max_allowed {
                    return Err(self.reject_with_outcome(
                        TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
                        TxSubmitOutcomeKind::RejectedDueToStateDrift,
                        signature,
                        mode,
                        Some(current_version),
                        opportunity_age_ms,
                    ));
                }
            }
        }

        // Only submits that passed every guard record their suppression keys.
        self.suppression.insert_all(&context.suppression_keys, now);
        Ok(())
    }
524
525 fn reject_with_outcome(
527 &self,
528 reason: TxToxicFlowRejectionReason,
529 outcome_kind: TxSubmitOutcomeKind,
530 signature: Option<Signature>,
531 mode: SubmitMode,
532 state_version: Option<u64>,
533 opportunity_age_ms: Option<u64>,
534 ) -> SubmitError {
535 let outcome = TxSubmitOutcome {
536 kind: outcome_kind,
537 signature,
538 mode,
539 state_version,
540 opportunity_age_ms,
541 };
542 self.record_external_outcome(&outcome);
543 SubmitError::ToxicFlow { reason }
544 }
545
546 async fn submit_rpc_only(
548 &self,
549 tx_bytes: Vec<u8>,
550 signature: Option<Signature>,
551 mode: SubmitMode,
552 ) -> Result<SubmitResult, SubmitError> {
553 let rpc = self
554 .rpc_transport
555 .as_ref()
556 .ok_or(SubmitError::MissingRpcTransport)?;
557 let rpc_signature = rpc
558 .submit_rpc(&tx_bytes, &self.rpc_config)
559 .await
560 .map_err(|source| SubmitError::Rpc { source })?;
561 self.record_external_outcome(&TxSubmitOutcome {
562 kind: TxSubmitOutcomeKind::RpcAccepted,
563 signature,
564 mode,
565 state_version: self
566 .flow_safety_source
567 .as_ref()
568 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
569 opportunity_age_ms: None,
570 });
571 Ok(SubmitResult {
572 signature,
573 mode,
574 direct_target: None,
575 rpc_signature: Some(rpc_signature),
576 jito_signature: None,
577 jito_bundle_id: None,
578 used_rpc_fallback: false,
579 selected_target_count: 0,
580 selected_identity_count: 0,
581 })
582 }
583
584 async fn submit_jito_only(
586 &self,
587 tx_bytes: Vec<u8>,
588 signature: Option<Signature>,
589 mode: SubmitMode,
590 ) -> Result<SubmitResult, SubmitError> {
591 let jito = self
592 .jito_transport
593 .as_ref()
594 .ok_or(SubmitError::MissingJitoTransport)?;
595 let jito_response = jito
596 .submit_jito(&tx_bytes, &self.jito_config)
597 .await
598 .map_err(|source| SubmitError::Jito { source })?;
599 self.record_external_outcome(&TxSubmitOutcome {
600 kind: TxSubmitOutcomeKind::JitoAccepted,
601 signature,
602 mode,
603 state_version: self
604 .flow_safety_source
605 .as_ref()
606 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
607 opportunity_age_ms: None,
608 });
609 Ok(SubmitResult {
610 signature,
611 mode,
612 direct_target: None,
613 rpc_signature: None,
614 jito_signature: jito_response.transaction_signature,
615 jito_bundle_id: jito_response.bundle_id,
616 used_rpc_fallback: false,
617 selected_target_count: 0,
618 selected_identity_count: 0,
619 })
620 }
621
622 async fn submit_direct_only(
624 &self,
625 tx_bytes: Vec<u8>,
626 signature: Option<Signature>,
627 mode: SubmitMode,
628 ) -> Result<SubmitResult, SubmitError> {
629 let direct = self
630 .direct_transport
631 .as_ref()
632 .ok_or(SubmitError::MissingDirectTransport)?;
633 let direct_config = self.direct_config.clone().normalized();
634 let mut last_error = None;
635 let attempt_timeout = direct_attempt_timeout(&direct_config);
636
637 for attempt_idx in 0..direct_config.direct_submit_attempts {
638 let mut targets = self.select_direct_targets(&direct_config).await;
639 rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
640 let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
641 if targets.is_empty() {
642 return Err(SubmitError::NoDirectTargets);
643 }
644 match timeout(
645 attempt_timeout,
646 direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
647 )
648 .await
649 {
650 Ok(Ok(target)) => {
651 self.record_external_outcome(&TxSubmitOutcome {
652 kind: TxSubmitOutcomeKind::DirectAccepted,
653 signature,
654 mode,
655 state_version: self
656 .flow_safety_source
657 .as_ref()
658 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
659 opportunity_age_ms: None,
660 });
661 self.spawn_agave_rebroadcast(Arc::from(tx_bytes), &direct_config);
662 return Ok(SubmitResult {
663 signature,
664 mode,
665 direct_target: Some(target),
666 rpc_signature: None,
667 jito_signature: None,
668 jito_bundle_id: None,
669 used_rpc_fallback: false,
670 selected_target_count,
671 selected_identity_count,
672 });
673 }
674 Ok(Err(source)) => last_error = Some(source),
675 Err(_elapsed) => {
676 last_error = Some(super::SubmitTransportError::Failure {
677 message: format!(
678 "direct submit attempt timed out after {}ms",
679 attempt_timeout.as_millis()
680 ),
681 });
682 }
683 }
684 if attempt_idx < direct_config.direct_submit_attempts.saturating_sub(1) {
685 sleep(direct_config.rebroadcast_interval).await;
686 }
687 }
688
689 Err(SubmitError::Direct {
690 source: last_error.unwrap_or_else(|| super::SubmitTransportError::Failure {
691 message: "direct submit attempts exhausted".to_owned(),
692 }),
693 })
694 }
695
696 async fn submit_hybrid(
698 &self,
699 tx_bytes: Vec<u8>,
700 signature: Option<Signature>,
701 mode: SubmitMode,
702 ) -> Result<SubmitResult, SubmitError> {
703 let direct = self
704 .direct_transport
705 .as_ref()
706 .ok_or(SubmitError::MissingDirectTransport)?;
707 let rpc = self
708 .rpc_transport
709 .as_ref()
710 .ok_or(SubmitError::MissingRpcTransport)?;
711
712 let direct_config = self.direct_config.clone().normalized();
713 let attempt_timeout = direct_attempt_timeout(&direct_config);
714 for attempt_idx in 0..direct_config.hybrid_direct_attempts {
715 let mut targets = self.select_direct_targets(&direct_config).await;
716 rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
717 let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
718 if targets.is_empty() {
719 break;
720 }
721 if let Ok(Ok(target)) = timeout(
722 attempt_timeout,
723 direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
724 )
725 .await
726 {
727 let tx_bytes = Arc::<[u8]>::from(tx_bytes);
728 self.spawn_agave_rebroadcast(Arc::clone(&tx_bytes), &direct_config);
729 if direct_config.hybrid_rpc_broadcast
730 && let Ok(rpc_signature) =
731 rpc.submit_rpc(tx_bytes.as_ref(), &self.rpc_config).await
732 {
733 self.record_external_outcome(&TxSubmitOutcome {
734 kind: TxSubmitOutcomeKind::DirectAccepted,
735 signature,
736 mode,
737 state_version: self
738 .flow_safety_source
739 .as_ref()
740 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
741 opportunity_age_ms: None,
742 });
743 return Ok(SubmitResult {
744 signature,
745 mode,
746 direct_target: Some(target),
747 rpc_signature: Some(rpc_signature),
748 jito_signature: None,
749 jito_bundle_id: None,
750 used_rpc_fallback: false,
751 selected_target_count,
752 selected_identity_count,
753 });
754 }
755 self.record_external_outcome(&TxSubmitOutcome {
756 kind: TxSubmitOutcomeKind::DirectAccepted,
757 signature,
758 mode,
759 state_version: self
760 .flow_safety_source
761 .as_ref()
762 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
763 opportunity_age_ms: None,
764 });
765 return Ok(SubmitResult {
766 signature,
767 mode,
768 direct_target: Some(target),
769 rpc_signature: None,
770 jito_signature: None,
771 jito_bundle_id: None,
772 used_rpc_fallback: false,
773 selected_target_count,
774 selected_identity_count,
775 });
776 }
777 if attempt_idx < direct_config.hybrid_direct_attempts.saturating_sub(1) {
778 sleep(direct_config.rebroadcast_interval).await;
779 }
780 }
781
782 let rpc_signature = rpc
783 .submit_rpc(&tx_bytes, &self.rpc_config)
784 .await
785 .map_err(|source| SubmitError::Rpc { source })?;
786 self.record_external_outcome(&TxSubmitOutcome {
787 kind: TxSubmitOutcomeKind::RpcAccepted,
788 signature,
789 mode,
790 state_version: self
791 .flow_safety_source
792 .as_ref()
793 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
794 opportunity_age_ms: None,
795 });
796 Ok(SubmitResult {
797 signature,
798 mode,
799 direct_target: None,
800 rpc_signature: Some(rpc_signature),
801 jito_signature: None,
802 jito_bundle_id: None,
803 used_rpc_fallback: true,
804 selected_target_count: 0,
805 selected_identity_count: 0,
806 })
807 }
808
809 async fn select_direct_targets(&self, direct_config: &DirectSubmitConfig) -> Vec<LeaderTarget> {
811 select_and_rank_targets(
812 self.leader_provider.as_ref(),
813 &self.backups,
814 self.policy,
815 direct_config,
816 )
817 .await
818 }
819
820 fn spawn_agave_rebroadcast(&self, tx_bytes: Arc<[u8]>, direct_config: &DirectSubmitConfig) {
822 if !direct_config.agave_rebroadcast_enabled
823 || direct_config.agave_rebroadcast_window.is_zero()
824 {
825 return;
826 }
827 let Some(direct_transport) = self.direct_transport.clone() else {
828 return;
829 };
830 spawn_agave_rebroadcast_task(
831 tx_bytes,
832 direct_transport,
833 self.leader_provider.clone(),
834 self.backups.clone(),
835 self.policy,
836 direct_config.clone(),
837 );
838 }
839}
840
841#[cfg(not(test))]
842fn spawn_agave_rebroadcast_task(
844 tx_bytes: Arc<[u8]>,
845 direct_transport: Arc<dyn DirectSubmitTransport>,
846 leader_provider: Arc<dyn LeaderProvider>,
847 backups: Vec<LeaderTarget>,
848 policy: RoutingPolicy,
849 direct_config: DirectSubmitConfig,
850) {
851 tokio::spawn(async move {
852 let deadline = Instant::now()
853 .checked_add(direct_config.agave_rebroadcast_window)
854 .unwrap_or_else(Instant::now);
855 loop {
856 let now = Instant::now();
857 if now >= deadline {
858 break;
859 }
860
861 let sleep_for = deadline
862 .saturating_duration_since(now)
863 .min(direct_config.agave_rebroadcast_interval);
864 if !sleep_for.is_zero() {
865 sleep(sleep_for).await;
866 }
867
868 if Instant::now() >= deadline {
869 break;
870 }
871
872 let targets = select_and_rank_targets(
873 leader_provider.as_ref(),
874 backups.as_slice(),
875 policy,
876 &direct_config,
877 )
878 .await;
879 if targets.is_empty() {
880 continue;
881 }
882
883 drop(
884 timeout(
885 direct_attempt_timeout(&direct_config),
886 direct_transport.submit_direct(
887 tx_bytes.as_ref(),
888 &targets,
889 policy,
890 &direct_config,
891 ),
892 )
893 .await,
894 );
895 }
896 });
897}
898
/// Test-build stand-in for the rebroadcast task: accepts the same arguments
/// and does nothing, so no background task is spawned under `cfg(test)`.
#[cfg(test)]
fn spawn_agave_rebroadcast_task(
    _tx_bytes: Arc<[u8]>,
    _direct_transport: Arc<dyn DirectSubmitTransport>,
    _leader_provider: Arc<dyn LeaderProvider>,
    _backups: Vec<LeaderTarget>,
    _policy: RoutingPolicy,
    _direct_config: DirectSubmitConfig,
) {
}
910
911async fn select_and_rank_targets(
913 leader_provider: &(impl LeaderProvider + ?Sized),
914 backups: &[LeaderTarget],
915 policy: RoutingPolicy,
916 direct_config: &DirectSubmitConfig,
917) -> Vec<LeaderTarget> {
918 let targets = select_targets(leader_provider, backups, policy);
919 rank_targets_by_latency(targets, direct_config).await
920}
921
922async fn rank_targets_by_latency(
924 targets: Vec<LeaderTarget>,
925 direct_config: &DirectSubmitConfig,
926) -> Vec<LeaderTarget> {
927 if targets.len() <= 1 || !direct_config.latency_aware_targeting {
928 return targets;
929 }
930
931 let probe_timeout = direct_config.latency_probe_timeout;
932 let probe_port = direct_config.latency_probe_port;
933 let probe_count = targets
934 .len()
935 .min(direct_config.latency_probe_max_targets.max(1));
936 let mut latencies = vec![None; probe_count];
937 let mut probes = JoinSet::new();
938 for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
939 probes.spawn(async move {
940 (
941 idx,
942 probe_target_latency(&target, probe_port, probe_timeout).await,
943 )
944 });
945 }
946 while let Some(result) = probes.join_next().await {
947 if let Ok((idx, latency)) = result
948 && idx < latencies.len()
949 && let Some(slot) = latencies.get_mut(idx)
950 {
951 *slot = latency;
952 }
953 }
954
955 let mut ranked = targets
956 .iter()
957 .take(probe_count)
958 .cloned()
959 .enumerate()
960 .collect::<Vec<_>>();
961 ranked.sort_by_key(|(idx, _target)| {
962 (
963 latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
964 *idx,
965 )
966 });
967
968 let mut output = ranked
969 .into_iter()
970 .map(|(_idx, target)| target)
971 .collect::<Vec<_>>();
972 output.extend(targets.iter().skip(probe_count).cloned());
973 output
974}
975
976async fn probe_target_latency(
978 target: &LeaderTarget,
979 probe_port: Option<u16>,
980 probe_timeout: Duration,
981) -> Option<u128> {
982 let mut ports = vec![target.tpu_addr.port()];
983 if let Some(port) = probe_port
984 && port != target.tpu_addr.port()
985 {
986 ports.push(port);
987 }
988
989 let ip = target.tpu_addr.ip();
990 let mut best = None::<u128>;
991 for port in ports {
992 if let Some(latency) = probe_tcp_latency(ip, port, probe_timeout).await {
993 best = Some(best.map_or(latency, |current| current.min(latency)));
994 }
995 }
996 best
997}
998
999async fn probe_tcp_latency(
1001 ip: std::net::IpAddr,
1002 port: u16,
1003 timeout_duration: Duration,
1004) -> Option<u128> {
1005 let start = Instant::now();
1006 let addr = SocketAddr::new(ip, port);
1007 let stream = timeout(timeout_duration, TcpStream::connect(addr))
1008 .await
1009 .ok()?
1010 .ok()?;
1011 drop(stream);
1012 Some(start.elapsed().as_millis())
1013}
1014
1015fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
1017 let selected_target_count = targets.len();
1018 let selected_identity_count = targets
1019 .iter()
1020 .filter_map(|target| target.identity)
1021 .collect::<HashSet<_>>()
1022 .len();
1023 (selected_target_count, selected_identity_count)
1024}
1025
1026fn rotate_targets_for_attempt(
1028 targets: &mut [LeaderTarget],
1029 attempt_idx: usize,
1030 policy: RoutingPolicy,
1031) {
1032 if attempt_idx == 0 || targets.len() <= 1 {
1033 return;
1034 }
1035
1036 let normalized = policy.normalized();
1037 let stride = normalized.max_parallel_sends.max(1);
1038 let rotation = attempt_idx
1039 .saturating_mul(stride)
1040 .checked_rem(targets.len())
1041 .unwrap_or(0);
1042 if rotation > 0 {
1043 targets.rotate_left(rotation);
1044 }
1045}
1046
1047fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
1049 direct_config
1050 .global_timeout
1051 .saturating_add(direct_config.per_target_timeout)
1052 .saturating_add(direct_config.rebroadcast_interval)
1053 .max(Duration::from_secs(8))
1054}