1use std::{
4 collections::HashSet,
5 net::SocketAddr,
6 sync::Arc,
7 time::{Duration, Instant, SystemTime},
8};
9
10use sof_types::SignatureBytes;
11use solana_transaction::versioned::VersionedTransaction;
12use tokio::{
13 net::TcpStream,
14 task::JoinSet,
15 time::{sleep, timeout},
16};
17
18use super::{
19 DirectSubmitConfig, DirectSubmitTransport, JitoSubmitConfig, JitoSubmitTransport,
20 RpcSubmitConfig, RpcSubmitTransport, SignedTx, SubmitError, SubmitMode, SubmitReliability,
21 SubmitResult, SubmitTransportError, TxFlowSafetyQuality, TxFlowSafetySource,
22 TxSubmitClientBuilder, TxSubmitContext, TxSubmitGuardPolicy, TxSubmitOutcome,
23 TxSubmitOutcomeKind, TxSubmitOutcomeReporter, TxToxicFlowRejectionReason, TxToxicFlowTelemetry,
24 TxToxicFlowTelemetrySnapshot,
25};
26use crate::{
27 providers::{
28 LeaderProvider, LeaderTarget, RecentBlockhashProvider, RpcRecentBlockhashProvider,
29 StaticLeaderProvider,
30 },
31 routing::{RoutingPolicy, SignatureDeduper, select_targets},
32 submit::{JsonRpcTransport, types::TxSuppressionCache},
33};
34
/// Client that submits signed transactions over RPC, Jito, direct, or hybrid paths.
///
/// Bundles the providers, transports, per-path configuration, toxic-flow guard
/// state, and telemetry sinks that the submit methods read.
pub struct TxSubmitClient {
    /// Provider read by `latest_blockhash_bytes`.
    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
    /// Optional RPC-backed provider that `refresh_latest_blockhash_bytes` refreshes first.
    on_demand_blockhash_provider: Option<Arc<RpcRecentBlockhashProvider>>,
    /// Leader source handed to target selection for direct sends.
    leader_provider: Arc<dyn LeaderProvider>,
    /// Backup targets handed to target selection alongside the leader provider.
    backups: Vec<LeaderTarget>,
    /// Routing policy passed to target selection, rotation, and the direct transport.
    policy: RoutingPolicy,
    /// Signature deduper consulted before every submission.
    deduper: SignatureDeduper,
    /// Transport used by the RPC path; `None` until configured.
    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
    /// Transport used by the direct path; `None` until configured.
    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
    /// Transport used by the Jito path; `None` until configured.
    jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
    /// Configuration passed to the RPC transport on each submit.
    rpc_config: RpcSubmitConfig,
    /// Configuration passed to the Jito transport on each submit.
    jito_config: JitoSubmitConfig,
    /// Configuration for direct submission (attempt counts, timeouts, rebroadcast).
    direct_config: DirectSubmitConfig,
    /// Optional source of flow-safety snapshots used by the toxic-flow guards.
    flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
    /// Thresholds and switches for the toxic-flow guards.
    guard_policy: TxSubmitGuardPolicy,
    /// Cache of suppression keys checked and updated by the toxic-flow guards.
    suppression: TxSuppressionCache,
    /// Telemetry sink that records every reported outcome.
    telemetry: Arc<TxToxicFlowTelemetry>,
    /// Optional external reporter that also receives every recorded outcome.
    outcome_reporter: Option<Arc<dyn TxSubmitOutcomeReporter>>,
}
72
73impl TxSubmitClient {
/// Returns a fresh [`TxSubmitClientBuilder`].
#[must_use]
pub fn builder() -> TxSubmitClientBuilder {
    TxSubmitClientBuilder::new()
}
79
80 #[must_use]
82 pub fn new(
83 blockhash_provider: Arc<dyn RecentBlockhashProvider>,
84 leader_provider: Arc<dyn LeaderProvider>,
85 ) -> Self {
86 Self {
87 blockhash_provider,
88 on_demand_blockhash_provider: None,
89 leader_provider,
90 backups: Vec::new(),
91 policy: RoutingPolicy::default(),
92 deduper: SignatureDeduper::new(Duration::from_secs(10)),
93 rpc_transport: None,
94 direct_transport: None,
95 jito_transport: None,
96 rpc_config: RpcSubmitConfig::default(),
97 jito_config: JitoSubmitConfig::default(),
98 direct_config: DirectSubmitConfig::default(),
99 flow_safety_source: None,
100 guard_policy: TxSubmitGuardPolicy::default(),
101 suppression: TxSuppressionCache::default(),
102 telemetry: TxToxicFlowTelemetry::shared(),
103 outcome_reporter: None,
104 }
105 }
106
107 #[must_use]
109 pub fn blockhash_only(blockhash_provider: Arc<dyn RecentBlockhashProvider>) -> Self {
110 Self::new(
111 blockhash_provider,
112 Arc::new(StaticLeaderProvider::default()),
113 )
114 }
115
116 pub fn blockhash_via_rpc(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
122 let blockhash_provider = Arc::new(RpcRecentBlockhashProvider::new(rpc_url.into())?);
123 Ok(Self::blockhash_only(blockhash_provider.clone())
124 .with_rpc_blockhash_provider(blockhash_provider))
125 }
126
127 pub fn rpc_only(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
134 let rpc_url = rpc_url.into();
135 let client = Self::blockhash_via_rpc(rpc_url.clone())?;
136 let rpc_transport = Arc::new(JsonRpcTransport::new(rpc_url)?);
137 Ok(client.with_rpc_transport(rpc_transport))
138 }
139
140 #[must_use]
142 pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
143 self.backups = backups;
144 self
145 }
146
147 #[must_use]
149 pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
150 self.policy = policy.normalized();
151 self
152 }
153
154 #[must_use]
156 pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
157 self.deduper = SignatureDeduper::new(ttl);
158 self
159 }
160
161 #[must_use]
163 pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
164 self.rpc_transport = Some(transport);
165 self
166 }
167
168 #[must_use]
170 pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
171 self.direct_transport = Some(transport);
172 self
173 }
174
175 #[must_use]
177 pub fn with_jito_transport(mut self, transport: Arc<dyn JitoSubmitTransport>) -> Self {
178 self.jito_transport = Some(transport);
179 self
180 }
181
182 #[must_use]
184 pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
185 self.rpc_config = config;
186 self
187 }
188
/// Replaces the configuration passed to the Jito transport.
#[must_use]
pub const fn with_jito_config(mut self, config: JitoSubmitConfig) -> Self {
    self.jito_config = config;
    self
}
195
/// Replaces the direct-submit configuration, storing its `normalized()` form.
#[must_use]
pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
    self.direct_config = config.normalized();
    self
}
202
/// Replaces the whole direct-submit configuration with the one produced by
/// `DirectSubmitConfig::from_reliability(reliability)`.
///
/// Any direct configuration set earlier on this client is overwritten.
#[must_use]
pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
    self.direct_config = DirectSubmitConfig::from_reliability(reliability);
    self
}
209
210 #[must_use]
212 pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
213 self.flow_safety_source = Some(source);
214 self
215 }
216
/// Replaces the policy that configures the toxic-flow guards.
#[must_use]
pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
    self.guard_policy = policy;
    self
}
223
224 #[must_use]
226 pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
227 self.outcome_reporter = Some(reporter);
228 self
229 }
230
231 #[must_use]
233 pub fn with_rpc_blockhash_provider(
234 mut self,
235 provider: Arc<RpcRecentBlockhashProvider>,
236 ) -> Self {
237 self.on_demand_blockhash_provider = Some(provider);
238 self
239 }
240
/// Returns a snapshot of the client's toxic-flow telemetry.
#[must_use]
pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
    self.telemetry.snapshot()
}
246
247 pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
249 self.telemetry.record(outcome);
250 if let Some(reporter) = &self.outcome_reporter {
251 reporter.record_outcome(outcome);
252 }
253 }
254
255 pub async fn submit_signed(
261 &mut self,
262 signed_tx: SignedTx,
263 mode: SubmitMode,
264 ) -> Result<SubmitResult, SubmitError> {
265 self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
266 .await
267 }
268
269 pub async fn submit_signed_with_context(
276 &mut self,
277 signed_tx: SignedTx,
278 mode: SubmitMode,
279 context: TxSubmitContext,
280 ) -> Result<SubmitResult, SubmitError> {
281 let tx_bytes = match signed_tx {
282 SignedTx::VersionedTransactionBytes(bytes) => bytes,
283 SignedTx::WireTransactionBytes(bytes) => bytes,
284 };
285 let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
286 .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
287 let signature = tx
288 .signatures
289 .first()
290 .copied()
291 .map(SignatureBytes::from_solana);
292 self.submit_bytes(tx_bytes, signature, mode, context).await
293 }
294
295 pub async fn refresh_latest_blockhash_bytes(
304 &self,
305 ) -> Result<Option<[u8; 32]>, SubmitTransportError> {
306 if let Some(provider) = &self.on_demand_blockhash_provider {
307 let _ = provider.refresh().await?;
308 }
309 Ok(self.latest_blockhash_bytes())
310 }
311
/// Returns the latest blockhash known to the blockhash provider, if any.
#[must_use]
pub fn latest_blockhash_bytes(&self) -> Option<[u8; 32]> {
    self.blockhash_provider.latest_blockhash()
}
317
/// Runs the pre-submit guards and dispatches to the transport path for `mode`.
///
/// # Errors
///
/// Returns the guard rejection, `SubmitError::DuplicateSignature`, or the
/// error of the selected submit path.
async fn submit_bytes(
    &mut self,
    tx_bytes: Vec<u8>,
    signature: Option<SignatureBytes>,
    mode: SubmitMode,
    context: TxSubmitContext,
) -> Result<SubmitResult, SubmitError> {
    // Toxic-flow guards run first; when they reject, `enforce_dedupe` is never
    // reached and the signature is not recorded in the deduper.
    self.enforce_toxic_flow_guards(signature, mode, &context)?;
    self.enforce_dedupe(signature)?;
    match mode {
        SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
        SubmitMode::JitoOnly => self.submit_jito_only(tx_bytes, signature, mode).await,
        SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
        SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
    }
}
335
336 fn enforce_dedupe(&mut self, signature: Option<SignatureBytes>) -> Result<(), SubmitError> {
338 if let Some(signature) = signature {
339 let now = Instant::now();
340 if !self.deduper.check_and_insert(signature, now) {
341 return Err(SubmitError::DuplicateSignature);
342 }
343 }
344 Ok(())
345 }
346
347 fn enforce_toxic_flow_guards(
349 &mut self,
350 signature: Option<SignatureBytes>,
351 mode: SubmitMode,
352 context: &TxSubmitContext,
353 ) -> Result<(), SubmitError> {
354 let now = SystemTime::now();
355 let opportunity_age_ms = context
356 .opportunity_created_at
357 .and_then(|created_at| now.duration_since(created_at).ok())
358 .map(|duration| duration.as_millis().min(u128::from(u64::MAX)) as u64);
359 if let Some(age_ms) = opportunity_age_ms
360 && let Some(max_age) = self.guard_policy.max_opportunity_age
361 {
362 let max_allowed_ms = max_age.as_millis().min(u128::from(u64::MAX)) as u64;
363 if age_ms > max_allowed_ms {
364 return Err(self.reject_with_outcome(
365 TxToxicFlowRejectionReason::OpportunityStale {
366 age_ms,
367 max_allowed_ms,
368 },
369 TxSubmitOutcomeKind::RejectedDueToStaleness,
370 signature,
371 mode,
372 None,
373 opportunity_age_ms,
374 ));
375 }
376 }
377
378 if self.suppression.is_suppressed(
379 &context.suppression_keys,
380 now,
381 self.guard_policy.suppression_ttl,
382 ) {
383 return Err(self.reject_with_outcome(
384 TxToxicFlowRejectionReason::Suppressed,
385 TxSubmitOutcomeKind::Suppressed,
386 signature,
387 mode,
388 None,
389 opportunity_age_ms,
390 ));
391 }
392
393 if let Some(source) = &self.flow_safety_source {
394 let snapshot = source.toxic_flow_snapshot();
395 if self.guard_policy.reject_on_replay_recovery_pending
396 && snapshot.replay_recovery_pending
397 {
398 return Err(self.reject_with_outcome(
399 TxToxicFlowRejectionReason::ReplayRecoveryPending,
400 TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
401 signature,
402 mode,
403 snapshot.current_state_version,
404 opportunity_age_ms,
405 ));
406 }
407 if self.guard_policy.require_stable_control_plane
408 && !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
409 {
410 let outcome_kind = match snapshot.quality {
411 TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
412 TxSubmitOutcomeKind::RejectedDueToReorgRisk
413 }
414 TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
415 TxFlowSafetyQuality::Degraded
416 | TxFlowSafetyQuality::IncompleteControlPlane
417 | TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
418 };
419 return Err(self.reject_with_outcome(
420 TxToxicFlowRejectionReason::UnsafeControlPlane {
421 quality: snapshot.quality,
422 },
423 outcome_kind,
424 signature,
425 mode,
426 snapshot.current_state_version,
427 opportunity_age_ms,
428 ));
429 }
430 if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
431 context.decision_state_version,
432 snapshot.current_state_version,
433 self.guard_policy.max_state_version_drift,
434 ) {
435 let drift = current_version.saturating_sub(decision_version);
436 if drift > max_allowed {
437 return Err(self.reject_with_outcome(
438 TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
439 TxSubmitOutcomeKind::RejectedDueToStateDrift,
440 signature,
441 mode,
442 Some(current_version),
443 opportunity_age_ms,
444 ));
445 }
446 }
447 }
448
449 self.suppression.insert_all(&context.suppression_keys, now);
450 Ok(())
451 }
452
453 fn reject_with_outcome(
455 &self,
456 reason: TxToxicFlowRejectionReason,
457 outcome_kind: TxSubmitOutcomeKind,
458 signature: Option<SignatureBytes>,
459 mode: SubmitMode,
460 state_version: Option<u64>,
461 opportunity_age_ms: Option<u64>,
462 ) -> SubmitError {
463 let outcome = TxSubmitOutcome {
464 kind: outcome_kind,
465 signature,
466 mode,
467 state_version,
468 opportunity_age_ms,
469 };
470 self.record_external_outcome(&outcome);
471 SubmitError::ToxicFlow { reason }
472 }
473
474 async fn submit_rpc_only(
476 &self,
477 tx_bytes: Vec<u8>,
478 signature: Option<SignatureBytes>,
479 mode: SubmitMode,
480 ) -> Result<SubmitResult, SubmitError> {
481 let rpc = self
482 .rpc_transport
483 .as_ref()
484 .ok_or(SubmitError::MissingRpcTransport)?;
485 let rpc_signature = rpc
486 .submit_rpc(&tx_bytes, &self.rpc_config)
487 .await
488 .map_err(|source| SubmitError::Rpc { source })?;
489 self.record_external_outcome(&TxSubmitOutcome {
490 kind: TxSubmitOutcomeKind::RpcAccepted,
491 signature,
492 mode,
493 state_version: self
494 .flow_safety_source
495 .as_ref()
496 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
497 opportunity_age_ms: None,
498 });
499 Ok(SubmitResult {
500 signature,
501 mode,
502 direct_target: None,
503 rpc_signature: Some(rpc_signature),
504 jito_signature: None,
505 jito_bundle_id: None,
506 used_rpc_fallback: false,
507 selected_target_count: 0,
508 selected_identity_count: 0,
509 })
510 }
511
512 async fn submit_jito_only(
514 &self,
515 tx_bytes: Vec<u8>,
516 signature: Option<SignatureBytes>,
517 mode: SubmitMode,
518 ) -> Result<SubmitResult, SubmitError> {
519 let jito = self
520 .jito_transport
521 .as_ref()
522 .ok_or(SubmitError::MissingJitoTransport)?;
523 let jito_response = jito
524 .submit_jito(&tx_bytes, &self.jito_config)
525 .await
526 .map_err(|source| SubmitError::Jito { source })?;
527 self.record_external_outcome(&TxSubmitOutcome {
528 kind: TxSubmitOutcomeKind::JitoAccepted,
529 signature,
530 mode,
531 state_version: self
532 .flow_safety_source
533 .as_ref()
534 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
535 opportunity_age_ms: None,
536 });
537 Ok(SubmitResult {
538 signature,
539 mode,
540 direct_target: None,
541 rpc_signature: None,
542 jito_signature: jito_response.transaction_signature,
543 jito_bundle_id: jito_response.bundle_id,
544 used_rpc_fallback: false,
545 selected_target_count: 0,
546 selected_identity_count: 0,
547 })
548 }
549
550 async fn submit_direct_only(
552 &self,
553 tx_bytes: Vec<u8>,
554 signature: Option<SignatureBytes>,
555 mode: SubmitMode,
556 ) -> Result<SubmitResult, SubmitError> {
557 let direct = self
558 .direct_transport
559 .as_ref()
560 .ok_or(SubmitError::MissingDirectTransport)?;
561 let direct_config = self.direct_config.clone().normalized();
562 let mut last_error = None;
563 let attempt_timeout = direct_attempt_timeout(&direct_config);
564
565 for attempt_idx in 0..direct_config.direct_submit_attempts {
566 let mut targets = self.select_direct_targets(&direct_config).await;
567 rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
568 let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
569 if targets.is_empty() {
570 return Err(SubmitError::NoDirectTargets);
571 }
572 match timeout(
573 attempt_timeout,
574 direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
575 )
576 .await
577 {
578 Ok(Ok(target)) => {
579 self.record_external_outcome(&TxSubmitOutcome {
580 kind: TxSubmitOutcomeKind::DirectAccepted,
581 signature,
582 mode,
583 state_version: self
584 .flow_safety_source
585 .as_ref()
586 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
587 opportunity_age_ms: None,
588 });
589 self.spawn_agave_rebroadcast(Arc::from(tx_bytes), &direct_config);
590 return Ok(SubmitResult {
591 signature,
592 mode,
593 direct_target: Some(target),
594 rpc_signature: None,
595 jito_signature: None,
596 jito_bundle_id: None,
597 used_rpc_fallback: false,
598 selected_target_count,
599 selected_identity_count,
600 });
601 }
602 Ok(Err(source)) => last_error = Some(source),
603 Err(_elapsed) => {
604 last_error = Some(super::SubmitTransportError::Failure {
605 message: format!(
606 "direct submit attempt timed out after {}ms",
607 attempt_timeout.as_millis()
608 ),
609 });
610 }
611 }
612 if attempt_idx < direct_config.direct_submit_attempts.saturating_sub(1) {
613 sleep(direct_config.rebroadcast_interval).await;
614 }
615 }
616
617 Err(SubmitError::Direct {
618 source: last_error.unwrap_or_else(|| super::SubmitTransportError::Failure {
619 message: "direct submit attempts exhausted".to_owned(),
620 }),
621 })
622 }
623
624 async fn submit_hybrid(
626 &self,
627 tx_bytes: Vec<u8>,
628 signature: Option<SignatureBytes>,
629 mode: SubmitMode,
630 ) -> Result<SubmitResult, SubmitError> {
631 let direct = self
632 .direct_transport
633 .as_ref()
634 .ok_or(SubmitError::MissingDirectTransport)?;
635 let rpc = self
636 .rpc_transport
637 .as_ref()
638 .ok_or(SubmitError::MissingRpcTransport)?;
639
640 let direct_config = self.direct_config.clone().normalized();
641 let attempt_timeout = direct_attempt_timeout(&direct_config);
642 for attempt_idx in 0..direct_config.hybrid_direct_attempts {
643 let mut targets = self.select_direct_targets(&direct_config).await;
644 rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
645 let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
646 if targets.is_empty() {
647 break;
648 }
649 if let Ok(Ok(target)) = timeout(
650 attempt_timeout,
651 direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
652 )
653 .await
654 {
655 let tx_bytes = Arc::<[u8]>::from(tx_bytes);
656 self.spawn_agave_rebroadcast(Arc::clone(&tx_bytes), &direct_config);
657 if direct_config.hybrid_rpc_broadcast
658 && let Ok(rpc_signature) =
659 rpc.submit_rpc(tx_bytes.as_ref(), &self.rpc_config).await
660 {
661 self.record_external_outcome(&TxSubmitOutcome {
662 kind: TxSubmitOutcomeKind::DirectAccepted,
663 signature,
664 mode,
665 state_version: self
666 .flow_safety_source
667 .as_ref()
668 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
669 opportunity_age_ms: None,
670 });
671 return Ok(SubmitResult {
672 signature,
673 mode,
674 direct_target: Some(target),
675 rpc_signature: Some(rpc_signature),
676 jito_signature: None,
677 jito_bundle_id: None,
678 used_rpc_fallback: false,
679 selected_target_count,
680 selected_identity_count,
681 });
682 }
683 self.record_external_outcome(&TxSubmitOutcome {
684 kind: TxSubmitOutcomeKind::DirectAccepted,
685 signature,
686 mode,
687 state_version: self
688 .flow_safety_source
689 .as_ref()
690 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
691 opportunity_age_ms: None,
692 });
693 return Ok(SubmitResult {
694 signature,
695 mode,
696 direct_target: Some(target),
697 rpc_signature: None,
698 jito_signature: None,
699 jito_bundle_id: None,
700 used_rpc_fallback: false,
701 selected_target_count,
702 selected_identity_count,
703 });
704 }
705 if attempt_idx < direct_config.hybrid_direct_attempts.saturating_sub(1) {
706 sleep(direct_config.rebroadcast_interval).await;
707 }
708 }
709
710 let rpc_signature = rpc
711 .submit_rpc(&tx_bytes, &self.rpc_config)
712 .await
713 .map_err(|source| SubmitError::Rpc { source })?;
714 self.record_external_outcome(&TxSubmitOutcome {
715 kind: TxSubmitOutcomeKind::RpcAccepted,
716 signature,
717 mode,
718 state_version: self
719 .flow_safety_source
720 .as_ref()
721 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
722 opportunity_age_ms: None,
723 });
724 Ok(SubmitResult {
725 signature,
726 mode,
727 direct_target: None,
728 rpc_signature: Some(rpc_signature),
729 jito_signature: None,
730 jito_bundle_id: None,
731 used_rpc_fallback: true,
732 selected_target_count: 0,
733 selected_identity_count: 0,
734 })
735 }
736
737 async fn select_direct_targets(&self, direct_config: &DirectSubmitConfig) -> Vec<LeaderTarget> {
739 select_and_rank_targets(
740 self.leader_provider.as_ref(),
741 &self.backups,
742 self.policy,
743 direct_config,
744 )
745 .await
746 }
747
748 fn spawn_agave_rebroadcast(&self, tx_bytes: Arc<[u8]>, direct_config: &DirectSubmitConfig) {
750 if !direct_config.agave_rebroadcast_enabled
751 || direct_config.agave_rebroadcast_window.is_zero()
752 {
753 return;
754 }
755 let Some(direct_transport) = self.direct_transport.clone() else {
756 return;
757 };
758 spawn_agave_rebroadcast_task(
759 tx_bytes,
760 direct_transport,
761 self.leader_provider.clone(),
762 self.backups.clone(),
763 self.policy,
764 direct_config.clone(),
765 );
766 }
767}
768
769#[cfg(not(test))]
770fn spawn_agave_rebroadcast_task(
772 tx_bytes: Arc<[u8]>,
773 direct_transport: Arc<dyn DirectSubmitTransport>,
774 leader_provider: Arc<dyn LeaderProvider>,
775 backups: Vec<LeaderTarget>,
776 policy: RoutingPolicy,
777 direct_config: DirectSubmitConfig,
778) {
779 tokio::spawn(async move {
780 let deadline = Instant::now()
781 .checked_add(direct_config.agave_rebroadcast_window)
782 .unwrap_or_else(Instant::now);
783 loop {
784 let now = Instant::now();
785 if now >= deadline {
786 break;
787 }
788
789 let sleep_for = deadline
790 .saturating_duration_since(now)
791 .min(direct_config.agave_rebroadcast_interval);
792 if !sleep_for.is_zero() {
793 sleep(sleep_for).await;
794 }
795
796 if Instant::now() >= deadline {
797 break;
798 }
799
800 let targets = select_and_rank_targets(
801 leader_provider.as_ref(),
802 backups.as_slice(),
803 policy,
804 &direct_config,
805 )
806 .await;
807 if targets.is_empty() {
808 continue;
809 }
810
811 drop(
812 timeout(
813 direct_attempt_timeout(&direct_config),
814 direct_transport.submit_direct(
815 tx_bytes.as_ref(),
816 &targets,
817 policy,
818 &direct_config,
819 ),
820 )
821 .await,
822 );
823 }
824 });
825}
826
/// Test-build stand-in for the rebroadcast task: accepts the same arguments
/// and does nothing, so no background task is spawned under `cfg(test)`.
#[cfg(test)]
fn spawn_agave_rebroadcast_task(
    _tx_bytes: Arc<[u8]>,
    _direct_transport: Arc<dyn DirectSubmitTransport>,
    _leader_provider: Arc<dyn LeaderProvider>,
    _backups: Vec<LeaderTarget>,
    _policy: RoutingPolicy,
    _direct_config: DirectSubmitConfig,
) {
}
838
839async fn select_and_rank_targets(
841 leader_provider: &(impl LeaderProvider + ?Sized),
842 backups: &[LeaderTarget],
843 policy: RoutingPolicy,
844 direct_config: &DirectSubmitConfig,
845) -> Vec<LeaderTarget> {
846 let targets = select_targets(leader_provider, backups, policy);
847 rank_targets_by_latency(targets, direct_config).await
848}
849
850async fn rank_targets_by_latency(
852 targets: Vec<LeaderTarget>,
853 direct_config: &DirectSubmitConfig,
854) -> Vec<LeaderTarget> {
855 if targets.len() <= 1 || !direct_config.latency_aware_targeting {
856 return targets;
857 }
858
859 let probe_timeout = direct_config.latency_probe_timeout;
860 let probe_port = direct_config.latency_probe_port;
861 let probe_count = targets
862 .len()
863 .min(direct_config.latency_probe_max_targets.max(1));
864 let mut latencies = vec![None; probe_count];
865 let mut probes = JoinSet::new();
866 for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
867 probes.spawn(async move {
868 (
869 idx,
870 probe_target_latency(&target, probe_port, probe_timeout).await,
871 )
872 });
873 }
874 while let Some(result) = probes.join_next().await {
875 if let Ok((idx, latency)) = result
876 && idx < latencies.len()
877 && let Some(slot) = latencies.get_mut(idx)
878 {
879 *slot = latency;
880 }
881 }
882
883 let mut ranked = targets
884 .iter()
885 .take(probe_count)
886 .cloned()
887 .enumerate()
888 .collect::<Vec<_>>();
889 ranked.sort_by_key(|(idx, _target)| {
890 (
891 latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
892 *idx,
893 )
894 });
895
896 let mut output = ranked
897 .into_iter()
898 .map(|(_idx, target)| target)
899 .collect::<Vec<_>>();
900 output.extend(targets.iter().skip(probe_count).cloned());
901 output
902}
903
904async fn probe_target_latency(
906 target: &LeaderTarget,
907 probe_port: Option<u16>,
908 probe_timeout: Duration,
909) -> Option<u128> {
910 let mut ports = vec![target.tpu_addr.port()];
911 if let Some(port) = probe_port
912 && port != target.tpu_addr.port()
913 {
914 ports.push(port);
915 }
916
917 let ip = target.tpu_addr.ip();
918 let mut best = None::<u128>;
919 for port in ports {
920 if let Some(latency) = probe_tcp_latency(ip, port, probe_timeout).await {
921 best = Some(best.map_or(latency, |current| current.min(latency)));
922 }
923 }
924 best
925}
926
927async fn probe_tcp_latency(
929 ip: std::net::IpAddr,
930 port: u16,
931 timeout_duration: Duration,
932) -> Option<u128> {
933 let start = Instant::now();
934 let addr = SocketAddr::new(ip, port);
935 let stream = timeout(timeout_duration, TcpStream::connect(addr))
936 .await
937 .ok()?
938 .ok()?;
939 drop(stream);
940 Some(start.elapsed().as_millis())
941}
942
943fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
945 let selected_target_count = targets.len();
946 let selected_identity_count = targets
947 .iter()
948 .filter_map(|target| target.identity)
949 .collect::<HashSet<_>>()
950 .len();
951 (selected_target_count, selected_identity_count)
952}
953
954fn rotate_targets_for_attempt(
956 targets: &mut [LeaderTarget],
957 attempt_idx: usize,
958 policy: RoutingPolicy,
959) {
960 if attempt_idx == 0 || targets.len() <= 1 {
961 return;
962 }
963
964 let normalized = policy.normalized();
965 let stride = normalized.max_parallel_sends.max(1);
966 let rotation = attempt_idx
967 .saturating_mul(stride)
968 .checked_rem(targets.len())
969 .unwrap_or(0);
970 if rotation > 0 {
971 targets.rotate_left(rotation);
972 }
973}
974
975fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
977 direct_config
978 .global_timeout
979 .saturating_add(direct_config.per_target_timeout)
980 .saturating_add(direct_config.rebroadcast_interval)
981 .max(Duration::from_secs(8))
982}