1use std::{
4 collections::HashMap,
5 collections::HashSet,
6 net::SocketAddr,
7 panic::{AssertUnwindSafe, catch_unwind},
8 sync::{
9 Arc, Mutex, OnceLock, Weak,
10 atomic::{AtomicBool, Ordering},
11 mpsc::{self as std_mpsc, SyncSender, TrySendError},
12 },
13 thread,
14 time::{Duration, Instant, SystemTime},
15};
16
17use sof_support::{short_vec::decode_short_u16_len_prefix, time_support::duration_millis_u64};
18use sof_types::SignatureBytes;
19use tokio::{
20 net::TcpStream,
21 sync::mpsc,
22 task::JoinSet,
23 time::{sleep, timeout},
24};
25
26use super::{
27 DirectSubmitConfig, DirectSubmitTransport, JitoSubmitConfig, JitoSubmitTransport,
28 RpcSubmitConfig, RpcSubmitTransport, SignedTx, SubmitError, SubmitMode, SubmitPlan,
29 SubmitReliability, SubmitResult, SubmitRoute, SubmitStrategy, SubmitTransportError,
30 TxFlowSafetyQuality, TxFlowSafetySource, TxSubmitClientBuilder, TxSubmitContext,
31 TxSubmitGuardPolicy, TxSubmitOutcome, TxSubmitOutcomeKind, TxSubmitOutcomeReporter,
32 TxToxicFlowRejectionReason, TxToxicFlowTelemetry, TxToxicFlowTelemetrySnapshot,
33};
34use crate::{
35 providers::{
36 LeaderProvider, LeaderTarget, RecentBlockhashProvider, RpcRecentBlockhashProvider,
37 StaticLeaderProvider,
38 },
39 routing::{RoutingPolicy, SignatureDeduper, select_targets},
40 submit::{JsonRpcTransport, types::TxSuppressionCache},
41};
42
/// Submits signed transactions over one or more routes (JSON-RPC, Jito,
/// direct-to-leader) with pre-submit toxic-flow guards, signature dedupe,
/// opportunity suppression, and outcome telemetry/reporting.
pub struct TxSubmitClient {
    /// Source of the current blockhash (see `latest_blockhash_bytes`).
    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
    /// Optional RPC-backed provider used by `refresh_latest_blockhash_bytes`
    /// to force a refresh on demand.
    on_demand_blockhash_provider: Option<Arc<RpcRecentBlockhashProvider>>,
    /// Source of current/upcoming leader targets for direct submission.
    leader_provider: Arc<dyn LeaderProvider>,
    /// Backup targets offered to `select_targets` alongside leaders.
    backups: Vec<LeaderTarget>,
    /// Target-selection policy; normalized when set via `with_routing_policy`.
    policy: RoutingPolicy,
    /// Rejects re-submission of a signature seen within its TTL window.
    deduper: SignatureDeduper,
    /// JSON-RPC transport; `SubmitRoute::Rpc` fails validation without it.
    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
    /// Direct-to-leader transport; required for `SubmitRoute::Direct`.
    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
    /// Jito transport; required for `SubmitRoute::Jito`.
    jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
    /// Per-route configuration for RPC submission.
    rpc_config: RpcSubmitConfig,
    /// Per-route configuration for Jito submission.
    jito_config: JitoSubmitConfig,
    /// Per-route configuration for direct submission (attempts, rebroadcast,
    /// hybrid behavior); kept normalized at the use sites.
    direct_config: DirectSubmitConfig,
    /// Optional control-plane snapshot source consulted by the guards.
    flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
    /// Thresholds/toggles for the pre-submit guards.
    guard_policy: TxSubmitGuardPolicy,
    /// Cache of recently used suppression keys (opportunity replay guard).
    suppression: TxSuppressionCache,
    /// Shared toxic-flow counters; every outcome is recorded here.
    telemetry: Arc<TxToxicFlowTelemetry>,
    /// Optional handle to the external outcome-reporter worker.
    outcome_reporter: Option<OutcomeReporterHandle>,
}
80
impl TxSubmitClient {
    /// Returns a builder for assembling a client step by step.
    #[must_use]
    pub fn builder() -> TxSubmitClientBuilder {
        TxSubmitClientBuilder::new()
    }

    /// Creates a client from the two mandatory providers. All transports
    /// start unset (routes needing them fail plan validation), configs and
    /// guards start at their defaults, and dedupe uses a 10-second TTL.
    #[must_use]
    pub fn new(
        blockhash_provider: Arc<dyn RecentBlockhashProvider>,
        leader_provider: Arc<dyn LeaderProvider>,
    ) -> Self {
        Self {
            blockhash_provider,
            on_demand_blockhash_provider: None,
            leader_provider,
            backups: Vec::new(),
            policy: RoutingPolicy::default(),
            // Default dedupe window: identical signatures rejected for 10s.
            deduper: SignatureDeduper::new(Duration::from_secs(10)),
            rpc_transport: None,
            direct_transport: None,
            jito_transport: None,
            rpc_config: RpcSubmitConfig::default(),
            jito_config: JitoSubmitConfig::default(),
            direct_config: DirectSubmitConfig::default(),
            flow_safety_source: None,
            guard_policy: TxSubmitGuardPolicy::default(),
            suppression: TxSuppressionCache::default(),
            telemetry: TxToxicFlowTelemetry::shared(),
            outcome_reporter: None,
        }
    }

    /// Convenience constructor: blockhash provider only, with a default
    /// (static) leader provider.
    #[must_use]
    pub fn blockhash_only(blockhash_provider: Arc<dyn RecentBlockhashProvider>) -> Self {
        Self::new(
            blockhash_provider,
            Arc::new(StaticLeaderProvider::default()),
        )
    }

    /// Builds a blockhash-only client whose blockhash comes from the given
    /// RPC endpoint; the same provider is also installed for on-demand
    /// refreshes.
    ///
    /// # Errors
    /// Returns a transport error if the RPC provider cannot be constructed.
    pub fn blockhash_via_rpc(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
        let blockhash_provider = Arc::new(RpcRecentBlockhashProvider::new(rpc_url.into())?);
        Ok(Self::blockhash_only(blockhash_provider.clone())
            .with_rpc_blockhash_provider(blockhash_provider))
    }

    /// Builds a client that both fetches blockhashes and submits over the
    /// same RPC endpoint.
    ///
    /// # Errors
    /// Returns a transport error if either RPC component cannot be built.
    pub fn rpc_only(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
        let rpc_url = rpc_url.into();
        let client = Self::blockhash_via_rpc(rpc_url.clone())?;
        let rpc_transport = Arc::new(JsonRpcTransport::new(rpc_url)?);
        Ok(client.with_rpc_transport(rpc_transport))
    }

    /// Replaces the backup direct-submit targets.
    #[must_use]
    pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
        self.backups = backups;
        self
    }

    /// Sets the routing policy (stored in normalized form).
    #[must_use]
    pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
        self.policy = policy.normalized();
        self
    }

    /// Replaces the signature deduper with a fresh one using `ttl`.
    /// Note: this discards any previously remembered signatures.
    #[must_use]
    pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
        self.deduper = SignatureDeduper::new(ttl);
        self
    }

    /// Installs the RPC submit transport.
    #[must_use]
    pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
        self.rpc_transport = Some(transport);
        self
    }

    /// Installs the direct-to-leader submit transport.
    #[must_use]
    pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
        self.direct_transport = Some(transport);
        self
    }

    /// Installs the Jito submit transport.
    #[must_use]
    pub fn with_jito_transport(mut self, transport: Arc<dyn JitoSubmitTransport>) -> Self {
        self.jito_transport = Some(transport);
        self
    }

    /// Sets the RPC route configuration.
    #[must_use]
    pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
        self.rpc_config = config;
        self
    }

    /// Sets the Jito route configuration.
    #[must_use]
    pub const fn with_jito_config(mut self, config: JitoSubmitConfig) -> Self {
        self.jito_config = config;
        self
    }

    /// Sets the direct route configuration (stored normalized).
    #[must_use]
    pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
        self.direct_config = config.normalized();
        self
    }

    /// Derives and installs a whole direct config from a reliability preset,
    /// replacing any previously set direct config.
    #[must_use]
    pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
        self.direct_config = DirectSubmitConfig::from_reliability(reliability);
        self
    }

    /// Attaches the control-plane safety source consulted by the guards.
    #[must_use]
    pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
        self.flow_safety_source = Some(source);
        self
    }

    /// Sets the guard thresholds/toggles.
    #[must_use]
    pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
        self.guard_policy = policy;
        self
    }

    /// Attaches an external outcome reporter; a shared background worker is
    /// started (or reused) to invoke it off the submit path.
    #[must_use]
    pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
        self.outcome_reporter = Some(OutcomeReporterHandle::new(reporter));
        self
    }

    /// Installs the RPC provider used for on-demand blockhash refreshes.
    #[must_use]
    pub fn with_rpc_blockhash_provider(
        mut self,
        provider: Arc<RpcRecentBlockhashProvider>,
    ) -> Self {
        self.on_demand_blockhash_provider = Some(provider);
        self
    }

    /// Snapshot of the toxic-flow telemetry counters.
    #[must_use]
    pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
        self.telemetry.snapshot()
    }

    /// Records an outcome produced outside this client (telemetry plus the
    /// external reporter, when one is attached).
    pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
        record_external_outcome_shared(&self.telemetry, self.outcome_reporter.as_ref(), outcome);
    }

    /// Submits a signed transaction using `mode` with a default context.
    ///
    /// # Errors
    /// See [`Self::submit_signed_with_context_via`].
    pub async fn submit_signed(
        &mut self,
        signed_tx: SignedTx,
        mode: SubmitMode,
    ) -> Result<SubmitResult, SubmitError> {
        self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
            .await
    }

    /// Submits a signed transaction via an explicit plan with a default
    /// context.
    ///
    /// # Errors
    /// See [`Self::submit_signed_with_context_via`].
    pub async fn submit_signed_via(
        &mut self,
        signed_tx: SignedTx,
        plan: SubmitPlan,
    ) -> Result<SubmitResult, SubmitError> {
        self.submit_signed_with_context_via(signed_tx, plan, TxSubmitContext::default())
            .await
    }

    /// Submits a signed transaction using `mode` and the given guard context.
    ///
    /// # Errors
    /// See [`Self::submit_signed_with_context_via`].
    pub async fn submit_signed_with_context(
        &mut self,
        signed_tx: SignedTx,
        mode: SubmitMode,
        context: TxSubmitContext,
    ) -> Result<SubmitResult, SubmitError> {
        self.submit_signed_with_context_via(signed_tx, SubmitPlan::from(mode), context)
            .await
    }

    /// Full entry point: decodes the first signature from the transaction
    /// bytes and runs the submit pipeline for `plan` under `context`.
    ///
    /// # Errors
    /// `DecodeSignedBytes` when the signature prefix is malformed, plus any
    /// guard, dedupe, validation, or transport error from the pipeline.
    pub async fn submit_signed_with_context_via(
        &mut self,
        signed_tx: SignedTx,
        plan: SubmitPlan,
        context: TxSubmitContext,
    ) -> Result<SubmitResult, SubmitError> {
        // Both variants carry the serialized transaction; route handling is
        // byte-oriented from here on.
        let tx_bytes = match signed_tx {
            SignedTx::VersionedTransactionBytes(bytes) => bytes,
            SignedTx::WireTransactionBytes(bytes) => bytes,
        };
        let signature = extract_first_signature(&tx_bytes)?;
        self.submit_bytes(tx_bytes, signature, plan, context).await
    }

    /// Forces a blockhash refresh through the on-demand RPC provider (when
    /// configured), then returns the latest blockhash bytes.
    ///
    /// # Errors
    /// Propagates transport failures from the refresh call.
    pub async fn refresh_latest_blockhash_bytes(
        &self,
    ) -> Result<Option<[u8; 32]>, SubmitTransportError> {
        if let Some(provider) = &self.on_demand_blockhash_provider {
            let _ = provider.refresh().await?;
        }
        Ok(self.latest_blockhash_bytes())
    }

    /// Returns the provider's current blockhash, when one is available.
    #[must_use]
    pub fn latest_blockhash_bytes(&self) -> Option<[u8; 32]> {
        self.blockhash_provider.latest_blockhash()
    }

    /// Shared submit pipeline: normalizes the plan, validates transports,
    /// runs the toxic-flow guards and dedupe, then dispatches according to
    /// the plan's strategy.
    async fn submit_bytes(
        &mut self,
        tx_bytes: Vec<u8>,
        signature: Option<SignatureBytes>,
        plan: SubmitPlan,
        context: TxSubmitContext,
    ) -> Result<SubmitResult, SubmitError> {
        let plan = plan.into_normalized();
        self.validate_submit_plan(&plan)?;
        self.enforce_toxic_flow_guards(signature, &plan, &context)?;
        self.enforce_dedupe(signature)?;
        // Arc<[u8]> so concurrent route tasks share the bytes without copies.
        let tx_bytes = Arc::<[u8]>::from(tx_bytes);
        match plan.strategy {
            SubmitStrategy::OrderedFallback => {
                self.submit_routes_in_order(tx_bytes, signature, plan).await
            }
            SubmitStrategy::AllAtOnce => {
                self.submit_routes_all_at_once(tx_bytes, signature, plan)
                    .await
            }
        }
    }

    /// Rejects a signature already seen within the dedupe TTL; transactions
    /// without a decodable signature are never deduped.
    fn enforce_dedupe(&mut self, signature: Option<SignatureBytes>) -> Result<(), SubmitError> {
        if let Some(signature) = signature {
            let now = Instant::now();
            if !self.deduper.check_and_insert(signature, now) {
                return Err(SubmitError::DuplicateSignature);
            }
        }
        Ok(())
    }

    /// Runs every pre-submit guard, in order: opportunity staleness,
    /// suppression-key replay, and — when a flow-safety source is attached —
    /// replay-recovery pending, control-plane quality, and state-version
    /// drift. On success the context's suppression keys are inserted so the
    /// same opportunity cannot be resubmitted within the suppression TTL.
    ///
    /// # Errors
    /// `SubmitError::ToxicFlow` (after recording a rejection outcome) when
    /// any guard trips.
    fn enforce_toxic_flow_guards(
        &mut self,
        signature: Option<SignatureBytes>,
        plan: &SubmitPlan,
        context: &TxSubmitContext,
    ) -> Result<(), SubmitError> {
        let legacy_mode = plan.legacy_mode();
        let now = SystemTime::now();
        // Age is None when the context has no creation time or when the
        // clock went backwards (duration_since failed).
        let opportunity_age_ms = context
            .opportunity_created_at
            .and_then(|created_at| now.duration_since(created_at).ok())
            .map(duration_millis_u64);
        if let Some(age_ms) = opportunity_age_ms
            && let Some(max_age) = self.guard_policy.max_opportunity_age
        {
            let max_allowed_ms = duration_millis_u64(max_age);
            if age_ms > max_allowed_ms {
                return Err(self.reject_with_outcome(
                    TxToxicFlowRejectionReason::OpportunityStale {
                        age_ms,
                        max_allowed_ms,
                    },
                    TxSubmitOutcomeKind::RejectedDueToStaleness,
                    RejectionMetadata {
                        signature,
                        plan: plan.clone(),
                        legacy_mode,
                        state_version: None,
                        opportunity_age_ms,
                    },
                ));
            }
        }

        // Replay guard: any suppression key seen within the TTL blocks the
        // whole submission.
        if self.suppression.is_suppressed(
            &context.suppression_keys,
            now,
            self.guard_policy.suppression_ttl,
        ) {
            return Err(self.reject_with_outcome(
                TxToxicFlowRejectionReason::Suppressed,
                TxSubmitOutcomeKind::Suppressed,
                RejectionMetadata {
                    signature,
                    plan: plan.clone(),
                    legacy_mode,
                    state_version: None,
                    opportunity_age_ms,
                },
            ));
        }

        if let Some(source) = &self.flow_safety_source {
            let snapshot = source.toxic_flow_snapshot();
            if self.guard_policy.reject_on_replay_recovery_pending
                && snapshot.replay_recovery_pending
            {
                return Err(self.reject_with_outcome(
                    TxToxicFlowRejectionReason::ReplayRecoveryPending,
                    TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
                    RejectionMetadata {
                        signature,
                        plan: plan.clone(),
                        legacy_mode,
                        state_version: snapshot.current_state_version,
                        opportunity_age_ms,
                    },
                ));
            }
            if self.guard_policy.require_stable_control_plane
                && !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
            {
                // Map each non-stable quality to its closest outcome kind;
                // the Stable arm here is unreachable (excluded above) but
                // keeps the match exhaustive without a wildcard.
                let outcome_kind = match snapshot.quality {
                    TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
                        TxSubmitOutcomeKind::RejectedDueToReorgRisk
                    }
                    TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
                    TxFlowSafetyQuality::Degraded
                    | TxFlowSafetyQuality::IncompleteControlPlane
                    | TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
                };
                return Err(self.reject_with_outcome(
                    TxToxicFlowRejectionReason::UnsafeControlPlane {
                        quality: snapshot.quality,
                    },
                    outcome_kind,
                    RejectionMetadata {
                        signature,
                        plan: plan.clone(),
                        legacy_mode,
                        state_version: snapshot.current_state_version,
                        opportunity_age_ms,
                    },
                ));
            }
            // Drift guard only applies when all three inputs are known.
            if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
                context.decision_state_version,
                snapshot.current_state_version,
                self.guard_policy.max_state_version_drift,
            ) {
                let drift = current_version.saturating_sub(decision_version);
                if drift > max_allowed {
                    return Err(self.reject_with_outcome(
                        TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
                        TxSubmitOutcomeKind::RejectedDueToStateDrift,
                        RejectionMetadata {
                            signature,
                            plan: plan.clone(),
                            legacy_mode,
                            state_version: Some(current_version),
                            opportunity_age_ms,
                        },
                    ));
                }
            }
        }

        // All guards passed: arm the suppression keys for this opportunity.
        self.suppression.insert_all(&context.suppression_keys, now);
        Ok(())
    }

    /// Records a rejection outcome (telemetry + external reporter) and
    /// returns the matching `ToxicFlow` error for the caller to propagate.
    fn reject_with_outcome(
        &self,
        reason: TxToxicFlowRejectionReason,
        outcome_kind: TxSubmitOutcomeKind,
        metadata: RejectionMetadata,
    ) -> SubmitError {
        let outcome = TxSubmitOutcome {
            kind: outcome_kind,
            signature: metadata.signature,
            route: None,
            plan: metadata.plan,
            legacy_mode: metadata.legacy_mode,
            rpc_signature: None,
            jito_signature: None,
            jito_bundle_id: None,
            state_version: metadata.state_version,
            opportunity_age_ms: metadata.opportunity_age_ms,
        };
        self.record_external_outcome(&outcome);
        SubmitError::ToxicFlow { reason }
    }

    /// Ensures the plan names at least one route and that every named route
    /// has a configured transport.
    fn validate_submit_plan(&self, plan: &SubmitPlan) -> Result<(), SubmitError> {
        if plan.routes.is_empty() {
            return Err(SubmitError::InternalSync {
                message: "submit plan must contain at least one route".to_owned(),
            });
        }
        for route in &plan.routes {
            match route {
                SubmitRoute::Rpc if self.rpc_transport.is_none() => {
                    return Err(SubmitError::MissingRpcTransport);
                }
                SubmitRoute::Jito if self.jito_transport.is_none() => {
                    return Err(SubmitError::MissingJitoTransport);
                }
                SubmitRoute::Direct if self.direct_transport.is_none() => {
                    return Err(SubmitError::MissingDirectTransport);
                }
                SubmitRoute::Rpc | SubmitRoute::Jito | SubmitRoute::Direct => {}
            }
        }
        Ok(())
    }

    /// Tries each route sequentially and returns on the first success; when
    /// every route fails, the last route's error is surfaced. A successful
    /// direct submit may additionally spawn an agave rebroadcast loop and —
    /// when hybrid RPC broadcast is enabled and an RPC route follows — a
    /// best-effort background RPC broadcast.
    async fn submit_routes_in_order(
        &self,
        tx_bytes: Arc<[u8]>,
        signature: Option<SignatureBytes>,
        plan: SubmitPlan,
    ) -> Result<SubmitResult, SubmitError> {
        let legacy_mode = plan.legacy_mode();
        let mut last_error = None;
        let task_context = self.route_task_context();
        for (route_idx, route) in plan.routes.iter().copied().enumerate() {
            let next_idx = route_idx.saturating_add(1);
            let has_later_routes = plan.routes.get(next_idx).is_some();
            // A direct route with fallbacks behind it runs in Fallback mode
            // (different attempt budget, gives up quietly on no targets).
            let direct_mode = if has_later_routes {
                DirectExecutionMode::Fallback
            } else {
                DirectExecutionMode::Standalone
            };
            match submit_one_route_task(
                route,
                Arc::clone(&tx_bytes),
                task_context.clone(),
                direct_mode,
            )
            .await
            {
                Ok(outcome) => {
                    self.record_route_outcome(signature, &plan, &outcome);
                    if matches!(outcome.route, SubmitRoute::Direct) {
                        // NOTE(review): clones + re-normalizes the config on
                        // every direct success; `route_task_context` already
                        // holds a normalized copy that could be reused.
                        self.spawn_agave_rebroadcast(
                            Arc::clone(&tx_bytes),
                            &self.direct_config.clone().normalized(),
                        );
                        if has_later_routes
                            && self.direct_config.hybrid_rpc_broadcast
                            && plan
                                .routes
                                .iter()
                                .skip(next_idx)
                                .any(|next| *next == SubmitRoute::Rpc)
                        {
                            self.spawn_background_rpc_broadcast(
                                Arc::clone(&tx_bytes),
                                signature,
                                plan.clone(),
                            );
                        }
                    }
                    return Ok(SubmitResult {
                        signature,
                        plan,
                        legacy_mode,
                        first_success_route: Some(outcome.route),
                        successful_routes: vec![outcome.route],
                        direct_target: outcome.direct_target,
                        rpc_signature: outcome.rpc_signature,
                        jito_signature: outcome.jito_signature,
                        jito_bundle_id: outcome.jito_bundle_id,
                        // Any success past the first route is a fallback hit.
                        used_fallback_route: route_idx > 0,
                        selected_target_count: outcome.selected_target_count,
                        selected_identity_count: outcome.selected_identity_count,
                    });
                }
                Err(error) => last_error = Some(error),
            }
        }
        Err(last_error.unwrap_or_else(|| SubmitError::InternalSync {
            message: "ordered submit plan completed without a route outcome".to_owned(),
        }))
    }

    /// Launches every route concurrently and resolves with the first success
    /// received; when all fail, the error of the earliest route in plan order
    /// is returned. Losing tasks keep running detached and still record
    /// their outcomes (and direct rebroadcasts) via the shared helpers.
    async fn submit_routes_all_at_once(
        &self,
        tx_bytes: Arc<[u8]>,
        signature: Option<SignatureBytes>,
        plan: SubmitPlan,
    ) -> Result<SubmitResult, SubmitError> {
        let legacy_mode = plan.legacy_mode();
        let task_context = self.route_task_context();
        // Cloned up front: tasks must not borrow `self`.
        let direct_transport = self.direct_transport.clone();
        let leader_provider = self.leader_provider.clone();
        let backups = self.backups.clone();
        let policy = self.policy;
        let direct_config = self.direct_config.clone().normalized();
        let (result_tx, mut result_rx) = mpsc::unbounded_channel();
        for (route_idx, route) in plan.routes.iter().copied().enumerate() {
            let task_context = task_context.clone();
            let tx_bytes = Arc::clone(&tx_bytes);
            let result_tx = result_tx.clone();
            let telemetry = Arc::clone(&self.telemetry);
            let reporter = self.outcome_reporter.clone();
            let flow_safety_source = self.flow_safety_source.clone();
            let plan_for_task = plan.clone();
            let direct_transport = direct_transport.clone();
            let leader_provider = leader_provider.clone();
            let backups = backups.clone();
            let direct_config = direct_config.clone();
            tokio::spawn(async move {
                let result = submit_one_route_task(
                    route,
                    Arc::clone(&tx_bytes),
                    task_context,
                    // Concurrent routes are independent, so each direct task
                    // runs with its standalone attempt budget.
                    DirectExecutionMode::Standalone,
                )
                .await;
                if let Ok(outcome) = &result {
                    record_route_outcome_shared(
                        &telemetry,
                        reporter.as_ref(),
                        flow_safety_source.as_ref(),
                        signature,
                        &plan_for_task,
                        outcome,
                    );
                    if matches!(outcome.route, SubmitRoute::Direct)
                        && direct_config.agave_rebroadcast_enabled
                        && !direct_config.agave_rebroadcast_window.is_zero()
                        && let Some(direct_transport) = direct_transport
                    {
                        spawn_agave_rebroadcast_task(
                            Arc::clone(&tx_bytes),
                            direct_transport,
                            leader_provider,
                            backups,
                            policy,
                            direct_config.clone(),
                        );
                    }
                }
                // Receiver may already be gone (first success returned).
                drop(result_tx.send((route_idx, result)));
            });
        }
        // Drop our sender so recv() ends once every task has reported.
        drop(result_tx);

        // One error slot per route, indexed by plan order, so the error we
        // surface is deterministic regardless of completion order.
        let mut errors_by_route: Vec<Option<SubmitError>> = std::iter::repeat_with(|| None)
            .take(plan.routes.len())
            .collect();
        while let Some((route_idx, result)) = result_rx.recv().await {
            match result {
                Ok(outcome) => {
                    return Ok(SubmitResult {
                        signature,
                        plan,
                        legacy_mode,
                        first_success_route: Some(outcome.route),
                        successful_routes: vec![outcome.route],
                        direct_target: outcome.direct_target,
                        rpc_signature: outcome.rpc_signature,
                        jito_signature: outcome.jito_signature,
                        jito_bundle_id: outcome.jito_bundle_id,
                        used_fallback_route: false,
                        selected_target_count: outcome.selected_target_count,
                        selected_identity_count: outcome.selected_identity_count,
                    });
                }
                Err(error) => {
                    if let Some(slot) = errors_by_route.get_mut(route_idx) {
                        *slot = Some(error);
                    }
                }
            }
        }

        Err(errors_by_route
            .into_iter()
            .flatten()
            .next()
            .unwrap_or_else(|| SubmitError::InternalSync {
                message: "all-at-once submit plan completed without a route outcome".to_owned(),
            }))
    }

    /// Records a successful route outcome via the shared helper.
    fn record_route_outcome(
        &self,
        signature: Option<SignatureBytes>,
        plan: &SubmitPlan,
        outcome: &RouteSubmitOutcome,
    ) {
        record_route_outcome_shared(
            &self.telemetry,
            self.outcome_reporter.as_ref(),
            self.flow_safety_source.as_ref(),
            signature,
            plan,
            outcome,
        );
    }

    /// Snapshots the client state a route task needs so tasks can run
    /// without borrowing `self`. The direct config is normalized here.
    fn route_task_context(&self) -> RouteTaskContext {
        RouteTaskContext {
            rpc_transport: self.rpc_transport.clone(),
            jito_transport: self.jito_transport.clone(),
            direct_transport: self.direct_transport.clone(),
            leader_provider: self.leader_provider.clone(),
            backups: Arc::from(self.backups.clone()),
            policy: self.policy,
            rpc_config: self.rpc_config.clone(),
            jito_config: self.jito_config.clone(),
            direct_config: self.direct_config.clone().normalized(),
        }
    }

    /// Starts the agave rebroadcast loop after a direct success, unless the
    /// feature is disabled, the window is zero, or no direct transport is
    /// configured.
    fn spawn_agave_rebroadcast(&self, tx_bytes: Arc<[u8]>, direct_config: &DirectSubmitConfig) {
        if !direct_config.agave_rebroadcast_enabled
            || direct_config.agave_rebroadcast_window.is_zero()
        {
            return;
        }
        let Some(direct_transport) = self.direct_transport.clone() else {
            return;
        };
        spawn_agave_rebroadcast_task(
            tx_bytes,
            direct_transport,
            self.leader_provider.clone(),
            self.backups.clone(),
            self.policy,
            direct_config.clone(),
        );
    }

    /// Best-effort background RPC broadcast after a hybrid direct success.
    /// Records an outcome only when the RPC submit succeeds; failures are
    /// silently dropped (the primary submit already succeeded).
    fn spawn_background_rpc_broadcast(
        &self,
        tx_bytes: Arc<[u8]>,
        signature: Option<SignatureBytes>,
        plan: SubmitPlan,
    ) {
        let Some(rpc) = self.rpc_transport.clone() else {
            return;
        };
        let rpc_config = self.rpc_config.clone();
        let telemetry = Arc::clone(&self.telemetry);
        let reporter = self.outcome_reporter.clone();
        let flow_safety_source = self.flow_safety_source.clone();
        tokio::spawn(async move {
            if let Ok(rpc_signature) = rpc.submit_rpc(tx_bytes.as_ref(), &rpc_config).await {
                let outcome = RouteSubmitOutcome {
                    route: SubmitRoute::Rpc,
                    direct_target: None,
                    rpc_signature: Some(rpc_signature),
                    jito_signature: None,
                    jito_bundle_id: None,
                    selected_target_count: 0,
                    selected_identity_count: 0,
                };
                record_route_outcome_shared(
                    &telemetry,
                    reporter.as_ref(),
                    flow_safety_source.as_ref(),
                    signature,
                    &plan,
                    &outcome,
                );
            }
        });
    }
}
814
/// Context captured when a pre-submit guard rejects a transaction; used by
/// `reject_with_outcome` to build the recorded `TxSubmitOutcome`.
#[derive(Debug, Clone)]
struct RejectionMetadata {
    /// First signature of the rejected transaction, when one was decodable.
    signature: Option<SignatureBytes>,
    /// The (normalized) plan the caller requested.
    plan: SubmitPlan,
    /// Legacy `SubmitMode` equivalent of the plan, when one exists.
    legacy_mode: Option<SubmitMode>,
    /// Control-plane state version at rejection time, when known.
    state_version: Option<u64>,
    /// Age of the originating opportunity in milliseconds, when known.
    opportunity_age_ms: Option<u64>,
}
829
/// Cloned snapshot of everything a route-submission task needs, so tasks
/// can be spawned without borrowing `TxSubmitClient`.
#[derive(Clone)]
struct RouteTaskContext {
    /// RPC transport, if configured.
    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
    /// Jito transport, if configured.
    jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
    /// Direct transport, if configured.
    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
    /// Source of leader targets for direct submission.
    leader_provider: Arc<dyn LeaderProvider>,
    /// Backup targets; `Arc<[_]>` so clones are cheap across tasks.
    backups: Arc<[LeaderTarget]>,
    /// Target-selection policy (Copy).
    policy: RoutingPolicy,
    /// RPC route configuration.
    rpc_config: RpcSubmitConfig,
    /// Jito route configuration.
    jito_config: JitoSubmitConfig,
    /// Direct route configuration, normalized by `route_task_context`.
    direct_config: DirectSubmitConfig,
}
852
/// How a direct-route attempt participates in its plan; controls the
/// attempt budget and the reaction to an empty target set in
/// `submit_one_route_task`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum DirectExecutionMode {
    /// Direct stands alone (or runs concurrently): uses
    /// `direct_submit_attempts` and treats "no targets" as a hard error.
    Standalone,
    /// Later routes remain as fallback: uses `hybrid_direct_attempts` and
    /// gives up quietly when no targets are available.
    Fallback,
}
861
/// Result of one successful route submission, as produced by
/// `submit_one_route_task`.
#[derive(Debug)]
struct RouteSubmitOutcome {
    /// The route that succeeded.
    route: SubmitRoute,
    /// Leader target that accepted a direct submit (direct route only).
    direct_target: Option<LeaderTarget>,
    /// Signature string returned by the RPC transport (RPC route only).
    rpc_signature: Option<String>,
    /// Transaction signature reported by Jito (Jito route only).
    jito_signature: Option<String>,
    /// Bundle id reported by Jito (Jito route only).
    jito_bundle_id: Option<String>,
    /// Number of targets selected for the winning direct attempt (0 for
    /// RPC/Jito routes).
    selected_target_count: usize,
    /// Number of distinct identities among those targets (0 for RPC/Jito).
    selected_identity_count: usize,
}
880
881fn record_external_outcome_shared(
883 telemetry: &Arc<TxToxicFlowTelemetry>,
884 reporter: Option<&OutcomeReporterHandle>,
885 outcome: &TxSubmitOutcome,
886) {
887 telemetry.record(outcome);
888 if let Some(reporter) = reporter {
889 reporter.dispatch(telemetry, outcome.clone());
890 }
891}
892
893fn record_route_outcome_shared(
895 telemetry: &Arc<TxToxicFlowTelemetry>,
896 reporter: Option<&OutcomeReporterHandle>,
897 flow_safety_source: Option<&Arc<dyn TxFlowSafetySource>>,
898 signature: Option<SignatureBytes>,
899 plan: &SubmitPlan,
900 outcome: &RouteSubmitOutcome,
901) {
902 let kind = match outcome.route {
903 SubmitRoute::Rpc => TxSubmitOutcomeKind::RpcAccepted,
904 SubmitRoute::Jito => TxSubmitOutcomeKind::JitoAccepted,
905 SubmitRoute::Direct => TxSubmitOutcomeKind::DirectAccepted,
906 };
907 let outcome = TxSubmitOutcome {
908 kind,
909 signature,
910 route: Some(outcome.route),
911 plan: plan.clone(),
912 legacy_mode: plan.legacy_mode(),
913 rpc_signature: outcome.rpc_signature.clone(),
914 jito_signature: outcome.jito_signature.clone(),
915 jito_bundle_id: outcome.jito_bundle_id.clone(),
916 state_version: flow_safety_source
917 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
918 opportunity_age_ms: None,
919 };
920 record_external_outcome_shared(telemetry, reporter, &outcome);
921}
922
/// Handle to the shared outcome-reporter dispatcher. `Unavailable` is the
/// degraded state used when the worker thread could not be started; in that
/// state every dispatch is counted as a reporter-unavailable drop.
#[derive(Clone)]
enum OutcomeReporterHandle {
    /// Dispatcher (and its worker thread) started successfully.
    Ready(Arc<OutcomeReporterDispatcher>),
    /// Worker could not be spawned; outcomes are dropped but still counted.
    Unavailable,
}
931
932impl OutcomeReporterHandle {
933 fn new(reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
935 match OutcomeReporterDispatcher::shared(reporter) {
936 Ok(dispatcher) => Self::Ready(dispatcher),
937 Err(error) => {
938 eprintln!("sof-tx: failed to start external outcome reporter worker: {error}");
939 Self::Unavailable
940 }
941 }
942 }
943
944 fn dispatch(&self, telemetry: &Arc<TxToxicFlowTelemetry>, outcome: TxSubmitOutcome) {
946 match self {
947 Self::Ready(dispatcher) => match dispatcher.dispatch(outcome) {
948 ReporterDispatchStatus::Enqueued => {}
949 ReporterDispatchStatus::DroppedFull => telemetry.record_reporter_drop(),
950 ReporterDispatchStatus::Unavailable => telemetry.record_reporter_unavailable(),
951 },
952 Self::Unavailable => telemetry.record_reporter_unavailable(),
953 }
954 }
955}
956
/// Hands outcomes to a dedicated worker thread that invokes the external
/// reporter, so reporting can never block or panic a submit path.
struct OutcomeReporterDispatcher {
    /// Bounded channel feeding the worker thread.
    tx: SyncSender<TxSubmitOutcome>,
    /// Set after warning that the queue is full; cleared by a successful send
    /// so the warning can fire again on the next overflow episode.
    queue_full_warned: AtomicBool,
    /// Set after warning that the worker thread has stopped (warn once).
    unavailable_warned: AtomicBool,
}
966
impl OutcomeReporterDispatcher {
    // Bounded queue between submit paths and the worker thread; kept tiny
    // under test so the overflow path is exercisable.
    #[cfg(not(test))]
    const QUEUE_CAPACITY: usize = 1024;
    #[cfg(test)]
    const QUEUE_CAPACITY: usize = 8;

    /// Returns the dispatcher for `reporter`, creating one (with its worker
    /// thread) on first use. Dispatchers are deduplicated per reporter
    /// identity through a global weak-reference registry, so multiple
    /// clients sharing one reporter share one worker.
    ///
    /// # Errors
    /// Propagates the I/O error from spawning the worker thread.
    fn shared(reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Result<Arc<Self>, std::io::Error> {
        let key = reporter_identity(&reporter);
        let registry = outcome_reporter_registry();
        let mut registry = registry
            .lock()
            // A poisoned lock only means another thread panicked mid-update;
            // the map itself remains usable.
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        // Prune entries whose dispatcher has already been dropped, so a
        // freed-and-reused reporter address cannot resolve to a dead worker.
        registry.retain(|_key, dispatcher| dispatcher.strong_count() > 0);
        if let Some(existing) = registry.get(&key).and_then(Weak::upgrade) {
            return Ok(existing);
        }

        let (tx, rx) = std_mpsc::sync_channel::<TxSubmitOutcome>(Self::QUEUE_CAPACITY);
        thread::Builder::new()
            .name("sof-tx-outcome-reporter".to_owned())
            .spawn(move || {
                // Drain until every sender is gone. Reporter panics are
                // contained so one bad outcome cannot kill the worker.
                while let Ok(outcome) = rx.recv() {
                    drop(catch_unwind(AssertUnwindSafe(|| {
                        reporter.record_outcome(&outcome);
                    })));
                }
            })?;

        let dispatcher = Arc::new(Self {
            tx,
            queue_full_warned: AtomicBool::new(false),
            unavailable_warned: AtomicBool::new(false),
        });
        // Store only a weak reference: the registry must not keep the
        // dispatcher (and its worker) alive after all clients drop it.
        let _ = registry.insert(key, Arc::downgrade(&dispatcher));
        Ok(dispatcher)
    }

    /// Non-blocking enqueue of an outcome for the worker, warning on stderr
    /// at most once per overflow/shutdown episode.
    fn dispatch(&self, outcome: TxSubmitOutcome) -> ReporterDispatchStatus {
        match self.tx.try_send(outcome) {
            Ok(()) => {
                // A successful send re-arms the queue-full warning.
                self.queue_full_warned.store(false, Ordering::Relaxed);
                ReporterDispatchStatus::Enqueued
            }
            Err(TrySendError::Disconnected(_)) => {
                if !self.unavailable_warned.swap(true, Ordering::Relaxed) {
                    eprintln!(
                        "sof-tx: external outcome reporter worker stopped; dropping reporter outcomes"
                    );
                }
                ReporterDispatchStatus::Unavailable
            }
            Err(TrySendError::Full(_)) => {
                if !self.queue_full_warned.swap(true, Ordering::Relaxed) {
                    eprintln!(
                        "sof-tx: external outcome reporter queue is full; dropping reporter outcomes until it drains"
                    );
                }
                ReporterDispatchStatus::DroppedFull
            }
        }
    }
}
1032
/// Result of attempting to hand an outcome to the reporter worker.
enum ReporterDispatchStatus {
    /// Outcome queued for the worker thread.
    Enqueued,
    /// Queue was full; the outcome was dropped.
    DroppedFull,
    /// Worker thread is gone; the outcome was dropped.
    Unavailable,
}
1042
/// Process-wide registry mapping a reporter's identity (data-pointer
/// address, see `reporter_identity`) to the weakly held dispatcher serving
/// it. Weak entries let dispatchers die when their last client drops.
static OUTCOME_REPORTER_REGISTRY: OnceLock<Mutex<HashMap<usize, Weak<OutcomeReporterDispatcher>>>> =
    OnceLock::new();

/// Lazily initializes and returns the global dispatcher registry.
fn outcome_reporter_registry() -> &'static Mutex<HashMap<usize, Weak<OutcomeReporterDispatcher>>> {
    OUTCOME_REPORTER_REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
}
1051
1052fn reporter_identity(reporter: &Arc<dyn TxSubmitOutcomeReporter>) -> usize {
1054 Arc::as_ptr(reporter) as *const () as usize
1055}
1056
1057fn extract_first_signature(tx_bytes: &[u8]) -> Result<Option<SignatureBytes>, SubmitError> {
1059 let Some((signature_count, offset)) = decode_short_u16_len_prefix(tx_bytes) else {
1060 return Err(decode_signed_bytes_error(
1061 "transaction bytes did not contain a valid signature vector prefix",
1062 ));
1063 };
1064 if signature_count == 0 {
1065 return Ok(None);
1066 }
1067 let signature_end = offset.saturating_add(64);
1068 let Some(signature_bytes) = tx_bytes.get(offset..signature_end) else {
1069 return Err(decode_signed_bytes_error(
1070 "transaction bytes ended before the first signature completed",
1071 ));
1072 };
1073 let mut signature = [0_u8; 64];
1074 signature.copy_from_slice(signature_bytes);
1075 Ok(Some(SignatureBytes::new(signature)))
1076}
1077
1078fn decode_signed_bytes_error(message: &'static str) -> SubmitError {
1080 SubmitError::DecodeSignedBytes {
1081 source: Box::new(bincode::ErrorKind::Custom(message.to_owned())),
1082 }
1083}
1084
/// Executes a single route of a submit plan and returns its outcome.
///
/// RPC and Jito are one-shot transport calls. The direct route loops up to
/// its configured attempt budget: each attempt re-selects and ranks leader
/// targets, rotates them per attempt, and bounds the transport call with
/// the direct attempt timeout; attempts are separated by the configured
/// rebroadcast interval.
///
/// # Errors
/// Missing-transport errors when the route has no transport in the context,
/// wrapped transport errors per route, and `NoDirectTargets` when a
/// standalone direct attempt finds no targets.
async fn submit_one_route_task(
    route: SubmitRoute,
    tx_bytes: Arc<[u8]>,
    task_context: RouteTaskContext,
    direct_mode: DirectExecutionMode,
) -> Result<RouteSubmitOutcome, SubmitError> {
    match route {
        SubmitRoute::Rpc => {
            let rpc = task_context
                .rpc_transport
                .ok_or(SubmitError::MissingRpcTransport)?;
            let rpc_signature = rpc
                .submit_rpc(tx_bytes.as_ref(), &task_context.rpc_config)
                .await
                .map_err(|source| SubmitError::Rpc { source })?;
            Ok(RouteSubmitOutcome {
                route,
                direct_target: None,
                rpc_signature: Some(rpc_signature),
                jito_signature: None,
                jito_bundle_id: None,
                selected_target_count: 0,
                selected_identity_count: 0,
            })
        }
        SubmitRoute::Jito => {
            let jito = task_context
                .jito_transport
                .ok_or(SubmitError::MissingJitoTransport)?;
            let response = jito
                .submit_jito(tx_bytes.as_ref(), &task_context.jito_config)
                .await
                .map_err(|source| SubmitError::Jito { source })?;
            Ok(RouteSubmitOutcome {
                route,
                direct_target: None,
                rpc_signature: None,
                jito_signature: response.transaction_signature,
                jito_bundle_id: response.bundle_id,
                selected_target_count: 0,
                selected_identity_count: 0,
            })
        }
        SubmitRoute::Direct => {
            let direct = task_context
                .direct_transport
                .ok_or(SubmitError::MissingDirectTransport)?;
            let attempt_timeout = direct_attempt_timeout(&task_context.direct_config);
            // Fallback mode (more routes remain after this one) runs with
            // its own, separately configured attempt budget.
            let attempt_count = match direct_mode {
                DirectExecutionMode::Standalone => {
                    task_context.direct_config.direct_submit_attempts
                }
                DirectExecutionMode::Fallback => task_context.direct_config.hybrid_direct_attempts,
            };
            let mut last_error = None;
            for attempt_idx in 0..attempt_count {
                // Re-select every attempt so leader rotation is picked up,
                // then rotate the list per attempt to vary the first target.
                let mut targets = select_and_rank_targets(
                    task_context.leader_provider.as_ref(),
                    task_context.backups.as_ref(),
                    task_context.policy,
                    &task_context.direct_config,
                )
                .await;
                rotate_targets_for_attempt(&mut targets, attempt_idx, task_context.policy);
                let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
                if targets.is_empty() {
                    // With fallback routes available, bail quietly and let
                    // the next route try; standalone has nothing to fall
                    // back to, so it fails hard.
                    if matches!(direct_mode, DirectExecutionMode::Fallback) {
                        break;
                    }
                    return Err(SubmitError::NoDirectTargets);
                }
                match timeout(
                    attempt_timeout,
                    direct.submit_direct(
                        tx_bytes.as_ref(),
                        &targets,
                        task_context.policy,
                        &task_context.direct_config,
                    ),
                )
                .await
                {
                    Ok(Ok(target)) => {
                        return Ok(RouteSubmitOutcome {
                            route,
                            direct_target: Some(target),
                            rpc_signature: None,
                            jito_signature: None,
                            jito_bundle_id: None,
                            selected_target_count,
                            selected_identity_count,
                        });
                    }
                    Ok(Err(source)) => last_error = Some(source),
                    Err(_elapsed) => {
                        // Convert the tokio timeout into a transport failure
                        // so it reports like any other direct error.
                        last_error = Some(SubmitTransportError::Failure {
                            message: format!(
                                "direct submit attempt timed out after {}ms",
                                attempt_timeout.as_millis()
                            ),
                        });
                    }
                }
                // Pause between attempts, but not after the final one.
                if attempt_idx < attempt_count.saturating_sub(1) {
                    sleep(task_context.direct_config.rebroadcast_interval).await;
                }
            }
            Err(SubmitError::Direct {
                source: last_error.unwrap_or_else(|| SubmitTransportError::Failure {
                    message: "direct submit attempts exhausted".to_owned(),
                }),
            })
        }
    }
}
1201
1202#[cfg(not(test))]
1203fn spawn_agave_rebroadcast_task(
1205 tx_bytes: Arc<[u8]>,
1206 direct_transport: Arc<dyn DirectSubmitTransport>,
1207 leader_provider: Arc<dyn LeaderProvider>,
1208 backups: Vec<LeaderTarget>,
1209 policy: RoutingPolicy,
1210 direct_config: DirectSubmitConfig,
1211) {
1212 tokio::spawn(async move {
1213 let deadline = Instant::now()
1214 .checked_add(direct_config.agave_rebroadcast_window)
1215 .unwrap_or_else(Instant::now);
1216 loop {
1217 let now = Instant::now();
1218 if now >= deadline {
1219 break;
1220 }
1221
1222 let sleep_for = deadline
1223 .saturating_duration_since(now)
1224 .min(direct_config.agave_rebroadcast_interval);
1225 if !sleep_for.is_zero() {
1226 sleep(sleep_for).await;
1227 }
1228
1229 if Instant::now() >= deadline {
1230 break;
1231 }
1232
1233 let targets = select_and_rank_targets(
1234 leader_provider.as_ref(),
1235 backups.as_slice(),
1236 policy,
1237 &direct_config,
1238 )
1239 .await;
1240 if targets.is_empty() {
1241 continue;
1242 }
1243
1244 drop(
1245 timeout(
1246 direct_attempt_timeout(&direct_config),
1247 direct_transport.submit_direct(
1248 tx_bytes.as_ref(),
1249 &targets,
1250 policy,
1251 &direct_config,
1252 ),
1253 )
1254 .await,
1255 );
1256 }
1257 });
1258}
1259
#[cfg(test)]
/// Test-build replacement for the rebroadcast task: a deliberate no-op so unit
/// tests never spawn background network activity or depend on a Tokio runtime
/// outliving the test body.
fn spawn_agave_rebroadcast_task(
    _tx_bytes: Arc<[u8]>,
    _direct_transport: Arc<dyn DirectSubmitTransport>,
    _leader_provider: Arc<dyn LeaderProvider>,
    _backups: Vec<LeaderTarget>,
    _policy: RoutingPolicy,
    _direct_config: DirectSubmitConfig,
) {
}
1271
1272async fn select_and_rank_targets(
1274 leader_provider: &(impl LeaderProvider + ?Sized),
1275 backups: &[LeaderTarget],
1276 policy: RoutingPolicy,
1277 direct_config: &DirectSubmitConfig,
1278) -> Vec<LeaderTarget> {
1279 let targets = select_targets(leader_provider, backups, policy);
1280 rank_targets_by_latency(targets, direct_config).await
1281}
1282
1283async fn rank_targets_by_latency(
1285 targets: Vec<LeaderTarget>,
1286 direct_config: &DirectSubmitConfig,
1287) -> Vec<LeaderTarget> {
1288 if targets.len() <= 1 || !direct_config.latency_aware_targeting {
1289 return targets;
1290 }
1291
1292 let probe_timeout = direct_config.latency_probe_timeout;
1293 let probe_port = direct_config.latency_probe_port;
1294 let probe_count = targets
1295 .len()
1296 .min(direct_config.latency_probe_max_targets.max(1));
1297 let mut latencies = vec![None; probe_count];
1298 let mut probes = JoinSet::new();
1299 for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
1300 probes.spawn(async move {
1301 (
1302 idx,
1303 probe_target_latency(&target, probe_port, probe_timeout).await,
1304 )
1305 });
1306 }
1307 while let Some(result) = probes.join_next().await {
1308 if let Ok((idx, latency)) = result
1309 && idx < latencies.len()
1310 && let Some(slot) = latencies.get_mut(idx)
1311 {
1312 *slot = latency;
1313 }
1314 }
1315
1316 let mut ranked = targets
1317 .iter()
1318 .take(probe_count)
1319 .cloned()
1320 .enumerate()
1321 .collect::<Vec<_>>();
1322 ranked.sort_by_key(|(idx, _target)| {
1323 (
1324 latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
1325 *idx,
1326 )
1327 });
1328
1329 let mut output = ranked
1330 .into_iter()
1331 .map(|(_idx, target)| target)
1332 .collect::<Vec<_>>();
1333 output.extend(targets.iter().skip(probe_count).cloned());
1334 output
1335}
1336
1337async fn probe_target_latency(
1339 target: &LeaderTarget,
1340 probe_port: Option<u16>,
1341 probe_timeout: Duration,
1342) -> Option<u128> {
1343 let mut ports = vec![target.tpu_addr.port()];
1344 if let Some(port) = probe_port
1345 && port != target.tpu_addr.port()
1346 {
1347 ports.push(port);
1348 }
1349
1350 let ip = target.tpu_addr.ip();
1351 let mut best = None::<u128>;
1352 for port in ports {
1353 if let Some(latency) = probe_tcp_latency(ip, port, probe_timeout).await {
1354 best = Some(best.map_or(latency, |current| current.min(latency)));
1355 }
1356 }
1357 best
1358}
1359
1360async fn probe_tcp_latency(
1362 ip: std::net::IpAddr,
1363 port: u16,
1364 timeout_duration: Duration,
1365) -> Option<u128> {
1366 let start = Instant::now();
1367 let addr = SocketAddr::new(ip, port);
1368 let stream = timeout(timeout_duration, TcpStream::connect(addr))
1369 .await
1370 .ok()?
1371 .ok()?;
1372 drop(stream);
1373 Some(start.elapsed().as_millis())
1374}
1375
1376fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
1378 let selected_target_count = targets.len();
1379 let selected_identity_count = targets
1380 .iter()
1381 .filter_map(|target| target.identity)
1382 .collect::<HashSet<_>>()
1383 .len();
1384 (selected_target_count, selected_identity_count)
1385}
1386
1387fn rotate_targets_for_attempt(
1389 targets: &mut [LeaderTarget],
1390 attempt_idx: usize,
1391 policy: RoutingPolicy,
1392) {
1393 if attempt_idx == 0 || targets.len() <= 1 {
1394 return;
1395 }
1396
1397 let normalized = policy.normalized();
1398 let stride = normalized.max_parallel_sends.max(1);
1399 let rotation = attempt_idx
1400 .saturating_mul(stride)
1401 .checked_rem(targets.len())
1402 .unwrap_or(0);
1403 if rotation > 0 {
1404 targets.rotate_left(rotation);
1405 }
1406}
1407
1408fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
1410 direct_config
1411 .global_timeout
1412 .saturating_add(direct_config.per_target_timeout)
1413 .saturating_add(direct_config.rebroadcast_interval)
1414 .max(Duration::from_secs(8))
1415}
1416
#[cfg(test)]
#[allow(clippy::panic)]
mod tests {
    use super::*;

    /// Reporter that discards every outcome; only exists to exercise
    /// dispatcher wiring.
    #[derive(Debug)]
    struct NoopOutcomeReporter;

    impl TxSubmitOutcomeReporter for NoopOutcomeReporter {
        fn record_outcome(&self, _outcome: &TxSubmitOutcome) {}
    }

    #[test]
    fn reporter_dispatcher_reuses_existing_instance() {
        let reporter: Arc<dyn TxSubmitOutcomeReporter> = Arc::new(NoopOutcomeReporter);

        let first = OutcomeReporterDispatcher::shared(Arc::clone(&reporter))
            .unwrap_or_else(|error| panic!("first dispatcher failed: {error}"));
        let second = OutcomeReporterDispatcher::shared(reporter)
            .unwrap_or_else(|error| panic!("second dispatcher failed: {error}"));

        // Both calls must hand back the same shared dispatcher allocation.
        assert!(Arc::ptr_eq(&first, &second));
    }
}
1443}