1use std::{
4 collections::HashMap,
5 collections::HashSet,
6 net::SocketAddr,
7 panic::{AssertUnwindSafe, catch_unwind},
8 sync::{
9 Arc, Mutex, OnceLock, Weak,
10 atomic::{AtomicBool, Ordering},
11 mpsc::{self as std_mpsc, SyncSender, TrySendError},
12 },
13 thread,
14 time::{Duration, Instant, SystemTime},
15};
16
17use sof_types::SignatureBytes;
18use tokio::{
19 net::TcpStream,
20 sync::mpsc,
21 task::JoinSet,
22 time::{sleep, timeout},
23};
24
25use super::{
26 DirectSubmitConfig, DirectSubmitTransport, JitoSubmitConfig, JitoSubmitTransport,
27 RpcSubmitConfig, RpcSubmitTransport, SignedTx, SubmitError, SubmitMode, SubmitPlan,
28 SubmitReliability, SubmitResult, SubmitRoute, SubmitStrategy, SubmitTransportError,
29 TxFlowSafetyQuality, TxFlowSafetySource, TxSubmitClientBuilder, TxSubmitContext,
30 TxSubmitGuardPolicy, TxSubmitOutcome, TxSubmitOutcomeKind, TxSubmitOutcomeReporter,
31 TxToxicFlowRejectionReason, TxToxicFlowTelemetry, TxToxicFlowTelemetrySnapshot,
32};
33use crate::{
34 providers::{
35 LeaderProvider, LeaderTarget, RecentBlockhashProvider, RpcRecentBlockhashProvider,
36 StaticLeaderProvider,
37 },
38 routing::{RoutingPolicy, SignatureDeduper, select_targets},
39 submit::{JsonRpcTransport, types::TxSuppressionCache},
40};
41
/// High-level client that submits signed transactions over one or more
/// routes (RPC, Jito, direct-to-leader) while enforcing dedupe and
/// toxic-flow guard policies and recording outcome telemetry.
pub struct TxSubmitClient {
    // Source of the most recently cached blockhash.
    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
    // Optional RPC-backed provider used to refresh the blockhash on demand.
    on_demand_blockhash_provider: Option<Arc<RpcRecentBlockhashProvider>>,
    // Supplies current leader targets for direct submission.
    leader_provider: Arc<dyn LeaderProvider>,
    // Static backup targets consulted alongside the leader provider.
    backups: Vec<LeaderTarget>,
    // Target selection policy for direct routing.
    policy: RoutingPolicy,
    // Rejects re-submission of a signature seen within its TTL.
    deduper: SignatureDeduper,
    // Optional transports; a plan referencing a missing transport is rejected.
    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
    jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
    // Per-route configuration knobs.
    rpc_config: RpcSubmitConfig,
    jito_config: JitoSubmitConfig,
    direct_config: DirectSubmitConfig,
    // Optional control-plane safety snapshot source for toxic-flow guards.
    flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
    // Policy thresholds applied before any submission is attempted.
    guard_policy: TxSubmitGuardPolicy,
    // Short-lived suppression keys preventing duplicate opportunity pursuit.
    suppression: TxSuppressionCache,
    // Shared telemetry sink for all submit outcomes.
    telemetry: Arc<TxToxicFlowTelemetry>,
    // Optional external outcome reporter (dispatched on a worker thread).
    outcome_reporter: Option<OutcomeReporterHandle>,
}
79
impl TxSubmitClient {
    /// Returns a builder for assembling a client step by step.
    #[must_use]
    pub fn builder() -> TxSubmitClientBuilder {
        TxSubmitClientBuilder::new()
    }

    /// Creates a client from the two mandatory providers with default
    /// configuration everywhere else (no transports, default policies).
    #[must_use]
    pub fn new(
        blockhash_provider: Arc<dyn RecentBlockhashProvider>,
        leader_provider: Arc<dyn LeaderProvider>,
    ) -> Self {
        Self {
            blockhash_provider,
            on_demand_blockhash_provider: None,
            leader_provider,
            backups: Vec::new(),
            policy: RoutingPolicy::default(),
            // 10s default dedupe window; override via `with_dedupe_ttl`.
            deduper: SignatureDeduper::new(Duration::from_secs(10)),
            rpc_transport: None,
            direct_transport: None,
            jito_transport: None,
            rpc_config: RpcSubmitConfig::default(),
            jito_config: JitoSubmitConfig::default(),
            direct_config: DirectSubmitConfig::default(),
            flow_safety_source: None,
            guard_policy: TxSubmitGuardPolicy::default(),
            suppression: TxSuppressionCache::default(),
            telemetry: TxToxicFlowTelemetry::shared(),
            outcome_reporter: None,
        }
    }

    /// Creates a client that only tracks blockhashes (no leader routing):
    /// the leader provider is a default static one.
    #[must_use]
    pub fn blockhash_only(blockhash_provider: Arc<dyn RecentBlockhashProvider>) -> Self {
        Self::new(
            blockhash_provider,
            Arc::new(StaticLeaderProvider::default()),
        )
    }

    /// Creates a blockhash-only client whose blockhash comes from the given
    /// RPC endpoint; the same provider is also wired for on-demand refresh.
    ///
    /// # Errors
    /// Returns a transport error when the RPC provider cannot be constructed.
    pub fn blockhash_via_rpc(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
        let blockhash_provider = Arc::new(RpcRecentBlockhashProvider::new(rpc_url.into())?);
        Ok(Self::blockhash_only(blockhash_provider.clone())
            .with_rpc_blockhash_provider(blockhash_provider))
    }

    /// Creates a client that submits exclusively over JSON-RPC at `rpc_url`,
    /// using the same endpoint for blockhash retrieval.
    ///
    /// # Errors
    /// Returns a transport error when either the blockhash provider or the
    /// RPC transport cannot be constructed.
    pub fn rpc_only(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
        let rpc_url = rpc_url.into();
        let client = Self::blockhash_via_rpc(rpc_url.clone())?;
        let rpc_transport = Arc::new(JsonRpcTransport::new(rpc_url)?);
        Ok(client.with_rpc_transport(rpc_transport))
    }

    /// Replaces the static backup targets used for direct submission.
    #[must_use]
    pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
        self.backups = backups;
        self
    }

    /// Sets the routing policy (normalized before storage).
    #[must_use]
    pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
        self.policy = policy.normalized();
        self
    }

    /// Replaces the signature deduper with one using the given TTL.
    #[must_use]
    pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
        self.deduper = SignatureDeduper::new(ttl);
        self
    }

    /// Installs the RPC submit transport.
    #[must_use]
    pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
        self.rpc_transport = Some(transport);
        self
    }

    /// Installs the direct-to-leader submit transport.
    #[must_use]
    pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
        self.direct_transport = Some(transport);
        self
    }

    /// Installs the Jito submit transport.
    #[must_use]
    pub fn with_jito_transport(mut self, transport: Arc<dyn JitoSubmitTransport>) -> Self {
        self.jito_transport = Some(transport);
        self
    }

    /// Replaces the RPC submit configuration.
    #[must_use]
    pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
        self.rpc_config = config;
        self
    }

    /// Replaces the Jito submit configuration.
    #[must_use]
    pub const fn with_jito_config(mut self, config: JitoSubmitConfig) -> Self {
        self.jito_config = config;
        self
    }

    /// Replaces the direct submit configuration (normalized before storage).
    #[must_use]
    pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
        self.direct_config = config.normalized();
        self
    }

    /// Derives the direct submit configuration from a reliability preset,
    /// discarding any previously set direct config.
    #[must_use]
    pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
        self.direct_config = DirectSubmitConfig::from_reliability(reliability);
        self
    }

    /// Installs the control-plane safety snapshot source used by the
    /// toxic-flow guards.
    #[must_use]
    pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
        self.flow_safety_source = Some(source);
        self
    }

    /// Replaces the guard policy applied before every submission.
    #[must_use]
    pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
        self.guard_policy = policy;
        self
    }

    /// Installs an external outcome reporter; dispatch happens on a shared
    /// background worker thread (see `OutcomeReporterHandle`).
    #[must_use]
    pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
        self.outcome_reporter = Some(OutcomeReporterHandle::new(reporter));
        self
    }

    /// Installs the RPC-backed provider used by
    /// [`Self::refresh_latest_blockhash_bytes`] for on-demand refresh.
    #[must_use]
    pub fn with_rpc_blockhash_provider(
        mut self,
        provider: Arc<RpcRecentBlockhashProvider>,
    ) -> Self {
        self.on_demand_blockhash_provider = Some(provider);
        self
    }

    /// Returns a snapshot of the shared toxic-flow telemetry counters.
    #[must_use]
    pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
        self.telemetry.snapshot()
    }

    /// Records an outcome produced outside this client (e.g. by a separate
    /// confirmation pipeline) into telemetry and the external reporter.
    pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
        record_external_outcome_shared(&self.telemetry, self.outcome_reporter.as_ref(), outcome);
    }

    /// Submits a signed transaction using the plan derived from `mode`, with
    /// a default (empty) submit context.
    ///
    /// # Errors
    /// Propagates decode, guard, dedupe, and transport errors.
    pub async fn submit_signed(
        &mut self,
        signed_tx: SignedTx,
        mode: SubmitMode,
    ) -> Result<SubmitResult, SubmitError> {
        self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
            .await
    }

    /// Submits a signed transaction using an explicit plan, with a default
    /// (empty) submit context.
    ///
    /// # Errors
    /// Propagates decode, guard, dedupe, and transport errors.
    pub async fn submit_signed_via(
        &mut self,
        signed_tx: SignedTx,
        plan: SubmitPlan,
    ) -> Result<SubmitResult, SubmitError> {
        self.submit_signed_with_context_via(signed_tx, plan, TxSubmitContext::default())
            .await
    }

    /// Submits a signed transaction using the plan derived from `mode` and
    /// the caller-provided guard context.
    ///
    /// # Errors
    /// Propagates decode, guard, dedupe, and transport errors.
    pub async fn submit_signed_with_context(
        &mut self,
        signed_tx: SignedTx,
        mode: SubmitMode,
        context: TxSubmitContext,
    ) -> Result<SubmitResult, SubmitError> {
        self.submit_signed_with_context_via(signed_tx, SubmitPlan::from(mode), context)
            .await
    }

    /// Fully-general entry point: extracts the leading signature from the
    /// serialized transaction and runs the plan with the given context.
    ///
    /// # Errors
    /// Returns a decode error when the signature prefix is malformed, plus
    /// any guard, dedupe, or transport error from the submission itself.
    pub async fn submit_signed_with_context_via(
        &mut self,
        signed_tx: SignedTx,
        plan: SubmitPlan,
        context: TxSubmitContext,
    ) -> Result<SubmitResult, SubmitError> {
        // Both variants carry the same serialized-bytes payload.
        let tx_bytes = match signed_tx {
            SignedTx::VersionedTransactionBytes(bytes) => bytes,
            SignedTx::WireTransactionBytes(bytes) => bytes,
        };
        let signature = extract_first_signature(&tx_bytes)?;
        self.submit_bytes(tx_bytes, signature, plan, context).await
    }

    /// Forces a blockhash refresh through the on-demand RPC provider (if
    /// configured) and returns the latest cached blockhash bytes.
    ///
    /// # Errors
    /// Propagates the RPC refresh failure.
    pub async fn refresh_latest_blockhash_bytes(
        &self,
    ) -> Result<Option<[u8; 32]>, SubmitTransportError> {
        if let Some(provider) = &self.on_demand_blockhash_provider {
            let _ = provider.refresh().await?;
        }
        Ok(self.latest_blockhash_bytes())
    }

    /// Returns the most recently cached blockhash bytes, if any.
    #[must_use]
    pub fn latest_blockhash_bytes(&self) -> Option<[u8; 32]> {
        self.blockhash_provider.latest_blockhash()
    }

    /// Core submission pipeline: normalize/validate the plan, run the
    /// toxic-flow guards and dedupe check, then execute the routes per the
    /// plan's strategy.
    async fn submit_bytes(
        &mut self,
        tx_bytes: Vec<u8>,
        signature: Option<SignatureBytes>,
        plan: SubmitPlan,
        context: TxSubmitContext,
    ) -> Result<SubmitResult, SubmitError> {
        let plan = plan.into_normalized();
        self.validate_submit_plan(&plan)?;
        self.enforce_toxic_flow_guards(signature, &plan, &context)?;
        self.enforce_dedupe(signature)?;
        // Share the payload cheaply across route tasks (O(1) clones).
        let tx_bytes = Arc::<[u8]>::from(tx_bytes);
        match plan.strategy {
            SubmitStrategy::OrderedFallback => {
                self.submit_routes_in_order(tx_bytes, signature, plan).await
            }
            SubmitStrategy::AllAtOnce => {
                self.submit_routes_all_at_once(tx_bytes, signature, plan)
                    .await
            }
        }
    }

    /// Rejects a signature already seen within the dedupe TTL; transactions
    /// without a signature are never deduped.
    fn enforce_dedupe(&mut self, signature: Option<SignatureBytes>) -> Result<(), SubmitError> {
        if let Some(signature) = signature {
            let now = Instant::now();
            if !self.deduper.check_and_insert(signature, now) {
                return Err(SubmitError::DuplicateSignature);
            }
        }
        Ok(())
    }

    /// Applies every pre-submission guard in order: opportunity staleness,
    /// suppression-key hits, replay-recovery state, control-plane quality,
    /// and state-version drift. On success the suppression keys are armed so
    /// an immediate re-attempt of the same opportunity is blocked.
    fn enforce_toxic_flow_guards(
        &mut self,
        signature: Option<SignatureBytes>,
        plan: &SubmitPlan,
        context: &TxSubmitContext,
    ) -> Result<(), SubmitError> {
        let legacy_mode = plan.legacy_mode();
        let now = SystemTime::now();
        // Age is None when no creation time was supplied or the clock went
        // backwards (duration_since fails); both disable the staleness guard.
        let opportunity_age_ms = context
            .opportunity_created_at
            .and_then(|created_at| now.duration_since(created_at).ok())
            .map(|duration| duration.as_millis().min(u128::from(u64::MAX)) as u64);
        if let Some(age_ms) = opportunity_age_ms
            && let Some(max_age) = self.guard_policy.max_opportunity_age
        {
            // Clamp to u64 range before narrowing from u128 millis.
            let max_allowed_ms = max_age.as_millis().min(u128::from(u64::MAX)) as u64;
            if age_ms > max_allowed_ms {
                return Err(self.reject_with_outcome(
                    TxToxicFlowRejectionReason::OpportunityStale {
                        age_ms,
                        max_allowed_ms,
                    },
                    TxSubmitOutcomeKind::RejectedDueToStaleness,
                    RejectionMetadata {
                        signature,
                        plan: plan.clone(),
                        legacy_mode,
                        state_version: None,
                        opportunity_age_ms,
                    },
                ));
            }
        }

        if self.suppression.is_suppressed(
            &context.suppression_keys,
            now,
            self.guard_policy.suppression_ttl,
        ) {
            return Err(self.reject_with_outcome(
                TxToxicFlowRejectionReason::Suppressed,
                TxSubmitOutcomeKind::Suppressed,
                RejectionMetadata {
                    signature,
                    plan: plan.clone(),
                    legacy_mode,
                    state_version: None,
                    opportunity_age_ms,
                },
            ));
        }

        if let Some(source) = &self.flow_safety_source {
            let snapshot = source.toxic_flow_snapshot();
            if self.guard_policy.reject_on_replay_recovery_pending
                && snapshot.replay_recovery_pending
            {
                return Err(self.reject_with_outcome(
                    TxToxicFlowRejectionReason::ReplayRecoveryPending,
                    TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
                    RejectionMetadata {
                        signature,
                        plan: plan.clone(),
                        legacy_mode,
                        state_version: snapshot.current_state_version,
                        opportunity_age_ms,
                    },
                ));
            }
            if self.guard_policy.require_stable_control_plane
                && !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
            {
                // Map the quality to the closest outcome kind; `Stable` is
                // unreachable here but kept for exhaustiveness.
                let outcome_kind = match snapshot.quality {
                    TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
                        TxSubmitOutcomeKind::RejectedDueToReorgRisk
                    }
                    TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
                    TxFlowSafetyQuality::Degraded
                    | TxFlowSafetyQuality::IncompleteControlPlane
                    | TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
                };
                return Err(self.reject_with_outcome(
                    TxToxicFlowRejectionReason::UnsafeControlPlane {
                        quality: snapshot.quality,
                    },
                    outcome_kind,
                    RejectionMetadata {
                        signature,
                        plan: plan.clone(),
                        legacy_mode,
                        state_version: snapshot.current_state_version,
                        opportunity_age_ms,
                    },
                ));
            }
            if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
                context.decision_state_version,
                snapshot.current_state_version,
                self.guard_policy.max_state_version_drift,
            ) {
                let drift = current_version.saturating_sub(decision_version);
                if drift > max_allowed {
                    return Err(self.reject_with_outcome(
                        TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
                        TxSubmitOutcomeKind::RejectedDueToStateDrift,
                        RejectionMetadata {
                            signature,
                            plan: plan.clone(),
                            legacy_mode,
                            state_version: Some(current_version),
                            opportunity_age_ms,
                        },
                    ));
                }
            }
        }

        // Guards passed: arm the suppression keys for this opportunity.
        self.suppression.insert_all(&context.suppression_keys, now);
        Ok(())
    }

    /// Builds and records a rejection outcome, then returns the matching
    /// `SubmitError::ToxicFlow` for the caller to propagate.
    fn reject_with_outcome(
        &self,
        reason: TxToxicFlowRejectionReason,
        outcome_kind: TxSubmitOutcomeKind,
        metadata: RejectionMetadata,
    ) -> SubmitError {
        let outcome = TxSubmitOutcome {
            kind: outcome_kind,
            signature: metadata.signature,
            route: None,
            plan: metadata.plan,
            legacy_mode: metadata.legacy_mode,
            rpc_signature: None,
            jito_signature: None,
            jito_bundle_id: None,
            state_version: metadata.state_version,
            opportunity_age_ms: metadata.opportunity_age_ms,
        };
        self.record_external_outcome(&outcome);
        SubmitError::ToxicFlow { reason }
    }

    /// Rejects empty plans and plans referencing a transport that was never
    /// installed on this client.
    fn validate_submit_plan(&self, plan: &SubmitPlan) -> Result<(), SubmitError> {
        if plan.routes.is_empty() {
            return Err(SubmitError::InternalSync {
                message: "submit plan must contain at least one route".to_owned(),
            });
        }
        for route in &plan.routes {
            match route {
                SubmitRoute::Rpc if self.rpc_transport.is_none() => {
                    return Err(SubmitError::MissingRpcTransport);
                }
                SubmitRoute::Jito if self.jito_transport.is_none() => {
                    return Err(SubmitError::MissingJitoTransport);
                }
                SubmitRoute::Direct if self.direct_transport.is_none() => {
                    return Err(SubmitError::MissingDirectTransport);
                }
                // Transport present: route is valid.
                SubmitRoute::Rpc | SubmitRoute::Jito | SubmitRoute::Direct => {}
            }
        }
        Ok(())
    }

    /// Runs the plan's routes sequentially, returning on the first success.
    /// A successful direct submit additionally spawns the Agave rebroadcast
    /// loop and, when configured, a background RPC broadcast for later RPC
    /// routes in the plan.
    async fn submit_routes_in_order(
        &self,
        tx_bytes: Arc<[u8]>,
        signature: Option<SignatureBytes>,
        plan: SubmitPlan,
    ) -> Result<SubmitResult, SubmitError> {
        let legacy_mode = plan.legacy_mode();
        let mut last_error = None;
        let task_context = self.route_task_context();
        for (route_idx, route) in plan.routes.iter().copied().enumerate() {
            let next_idx = route_idx.saturating_add(1);
            let has_later_routes = plan.routes.get(next_idx).is_some();
            // A direct route with fallbacks behind it uses the (shorter)
            // fallback attempt budget.
            let direct_mode = if has_later_routes {
                DirectExecutionMode::Fallback
            } else {
                DirectExecutionMode::Standalone
            };
            match submit_one_route_task(
                route,
                Arc::clone(&tx_bytes),
                task_context.clone(),
                direct_mode,
            )
            .await
            {
                Ok(outcome) => {
                    self.record_route_outcome(signature, &plan, &outcome);
                    if matches!(outcome.route, SubmitRoute::Direct) {
                        self.spawn_agave_rebroadcast(
                            Arc::clone(&tx_bytes),
                            &self.direct_config.clone().normalized(),
                        );
                        // Hybrid mode: also push the tx over a later RPC
                        // route in the background for durability.
                        if has_later_routes
                            && self.direct_config.hybrid_rpc_broadcast
                            && plan
                                .routes
                                .iter()
                                .skip(next_idx)
                                .any(|next| *next == SubmitRoute::Rpc)
                        {
                            self.spawn_background_rpc_broadcast(
                                Arc::clone(&tx_bytes),
                                signature,
                                plan.clone(),
                            );
                        }
                    }
                    return Ok(SubmitResult {
                        signature,
                        plan,
                        legacy_mode,
                        first_success_route: Some(outcome.route),
                        successful_routes: vec![outcome.route],
                        direct_target: outcome.direct_target,
                        rpc_signature: outcome.rpc_signature,
                        jito_signature: outcome.jito_signature,
                        jito_bundle_id: outcome.jito_bundle_id,
                        // Any route after the first counts as a fallback.
                        used_fallback_route: route_idx > 0,
                        selected_target_count: outcome.selected_target_count,
                        selected_identity_count: outcome.selected_identity_count,
                    });
                }
                Err(error) => last_error = Some(error),
            }
        }
        Err(last_error.unwrap_or_else(|| SubmitError::InternalSync {
            message: "ordered submit plan completed without a route outcome".to_owned(),
        }))
    }

    /// Runs all routes concurrently as spawned tasks and returns the first
    /// success received; losing tasks keep running in the background. If
    /// every route fails, the error of the earliest route in plan order is
    /// returned.
    async fn submit_routes_all_at_once(
        &self,
        tx_bytes: Arc<[u8]>,
        signature: Option<SignatureBytes>,
        plan: SubmitPlan,
    ) -> Result<SubmitResult, SubmitError> {
        let legacy_mode = plan.legacy_mode();
        let (result_tx, mut result_rx) = mpsc::unbounded_channel();
        for (route_idx, route) in plan.routes.iter().copied().enumerate() {
            // Clone everything the detached task needs; tasks may outlive
            // this call.
            let task_context = self.route_task_context();
            let tx_bytes = Arc::clone(&tx_bytes);
            let result_tx = result_tx.clone();
            let telemetry = Arc::clone(&self.telemetry);
            let reporter = self.outcome_reporter.clone();
            let flow_safety_source = self.flow_safety_source.clone();
            let plan_for_task = plan.clone();
            let direct_transport = self.direct_transport.clone();
            let leader_provider = self.leader_provider.clone();
            let backups = self.backups.clone();
            let policy = self.policy;
            let direct_config = self.direct_config.clone().normalized();
            tokio::spawn(async move {
                let result = submit_one_route_task(
                    route,
                    Arc::clone(&tx_bytes),
                    task_context,
                    // Concurrent routes are independent, so direct always
                    // runs with its standalone attempt budget.
                    DirectExecutionMode::Standalone,
                )
                .await;
                if let Ok(outcome) = &result {
                    record_route_outcome_shared(
                        &telemetry,
                        reporter.as_ref(),
                        flow_safety_source.as_ref(),
                        signature,
                        &plan_for_task,
                        outcome,
                    );
                    if matches!(outcome.route, SubmitRoute::Direct)
                        && direct_config.agave_rebroadcast_enabled
                        && !direct_config.agave_rebroadcast_window.is_zero()
                        && let Some(direct_transport) = direct_transport
                    {
                        spawn_agave_rebroadcast_task(
                            Arc::clone(&tx_bytes),
                            direct_transport,
                            leader_provider,
                            backups,
                            policy,
                            direct_config.clone(),
                        );
                    }
                }
                // Receiver may already be gone after a first success.
                drop(result_tx.send((route_idx, result)));
            });
        }
        // Drop the local sender so the channel closes once all tasks finish.
        drop(result_tx);

        let mut errors_by_route: Vec<Option<SubmitError>> = std::iter::repeat_with(|| None)
            .take(plan.routes.len())
            .collect();
        while let Some((route_idx, result)) = result_rx.recv().await {
            match result {
                Ok(outcome) => {
                    return Ok(SubmitResult {
                        signature,
                        plan,
                        legacy_mode,
                        first_success_route: Some(outcome.route),
                        successful_routes: vec![outcome.route],
                        direct_target: outcome.direct_target,
                        rpc_signature: outcome.rpc_signature,
                        jito_signature: outcome.jito_signature,
                        jito_bundle_id: outcome.jito_bundle_id,
                        used_fallback_route: false,
                        selected_target_count: outcome.selected_target_count,
                        selected_identity_count: outcome.selected_identity_count,
                    });
                }
                Err(error) => {
                    // Store by route index so the reported error follows
                    // plan order, not completion order.
                    if let Some(slot) = errors_by_route.get_mut(route_idx) {
                        *slot = Some(error);
                    }
                }
            }
        }

        Err(errors_by_route
            .into_iter()
            .flatten()
            .next()
            .unwrap_or_else(|| SubmitError::InternalSync {
                message: "all-at-once submit plan completed without a route outcome".to_owned(),
            }))
    }

    /// Convenience wrapper over the shared route-outcome recorder using this
    /// client's telemetry, reporter, and safety source.
    fn record_route_outcome(
        &self,
        signature: Option<SignatureBytes>,
        plan: &SubmitPlan,
        outcome: &RouteSubmitOutcome,
    ) {
        record_route_outcome_shared(
            &self.telemetry,
            self.outcome_reporter.as_ref(),
            self.flow_safety_source.as_ref(),
            signature,
            plan,
            outcome,
        );
    }

    /// Snapshots everything a route task needs so it can run detached from
    /// `self`. The direct config is normalized here once per snapshot.
    fn route_task_context(&self) -> RouteTaskContext {
        RouteTaskContext {
            rpc_transport: self.rpc_transport.clone(),
            jito_transport: self.jito_transport.clone(),
            direct_transport: self.direct_transport.clone(),
            leader_provider: self.leader_provider.clone(),
            backups: Arc::from(self.backups.clone()),
            policy: self.policy,
            rpc_config: self.rpc_config.clone(),
            jito_config: self.jito_config.clone(),
            direct_config: self.direct_config.clone().normalized(),
        }
    }

    /// Starts the Agave-style rebroadcast loop for an already-submitted
    /// transaction, unless rebroadcast is disabled, its window is zero, or
    /// no direct transport is installed.
    fn spawn_agave_rebroadcast(&self, tx_bytes: Arc<[u8]>, direct_config: &DirectSubmitConfig) {
        if !direct_config.agave_rebroadcast_enabled
            || direct_config.agave_rebroadcast_window.is_zero()
        {
            return;
        }
        let Some(direct_transport) = self.direct_transport.clone() else {
            return;
        };
        spawn_agave_rebroadcast_task(
            tx_bytes,
            direct_transport,
            self.leader_provider.clone(),
            self.backups.clone(),
            self.policy,
            direct_config.clone(),
        );
    }

    /// Fire-and-forget RPC broadcast used by hybrid direct submission; a
    /// success is recorded in telemetry, a failure is silently dropped.
    fn spawn_background_rpc_broadcast(
        &self,
        tx_bytes: Arc<[u8]>,
        signature: Option<SignatureBytes>,
        plan: SubmitPlan,
    ) {
        let Some(rpc) = self.rpc_transport.clone() else {
            return;
        };
        let rpc_config = self.rpc_config.clone();
        let telemetry = Arc::clone(&self.telemetry);
        let reporter = self.outcome_reporter.clone();
        let flow_safety_source = self.flow_safety_source.clone();
        tokio::spawn(async move {
            if let Ok(rpc_signature) = rpc.submit_rpc(tx_bytes.as_ref(), &rpc_config).await {
                let outcome = RouteSubmitOutcome {
                    route: SubmitRoute::Rpc,
                    direct_target: None,
                    rpc_signature: Some(rpc_signature),
                    jito_signature: None,
                    jito_bundle_id: None,
                    selected_target_count: 0,
                    selected_identity_count: 0,
                };
                record_route_outcome_shared(
                    &telemetry,
                    reporter.as_ref(),
                    flow_safety_source.as_ref(),
                    signature,
                    &plan,
                    &outcome,
                );
            }
        });
    }
}
808
/// Context captured at guard-rejection time, used to populate the
/// `TxSubmitOutcome` recorded for a rejected submission.
#[derive(Debug, Clone)]
struct RejectionMetadata {
    // Leading signature of the rejected transaction, if it had one.
    signature: Option<SignatureBytes>,
    // The (normalized) plan that was about to run.
    plan: SubmitPlan,
    // Legacy submit mode equivalent of the plan, if any.
    legacy_mode: Option<SubmitMode>,
    // Control-plane state version observed at rejection time, if known.
    state_version: Option<u64>,
    // Age of the originating opportunity in milliseconds, if known.
    opportunity_age_ms: Option<u64>,
}
823
/// Self-contained snapshot of client state handed to a route task so it can
/// run detached from (and potentially outlive) the `TxSubmitClient` call.
#[derive(Clone)]
struct RouteTaskContext {
    // Transports are optional; a task errors if its route's transport is absent.
    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
    jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
    // Leader discovery plus static backups for direct target selection.
    leader_provider: Arc<dyn LeaderProvider>,
    backups: Arc<[LeaderTarget]>,
    policy: RoutingPolicy,
    // Per-route configuration; `direct_config` is already normalized.
    rpc_config: RpcSubmitConfig,
    jito_config: JitoSubmitConfig,
    direct_config: DirectSubmitConfig,
}
846
/// Selects the direct-route attempt budget: `Standalone` when direct is the
/// final (or only) route, `Fallback` when later routes exist in the plan.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum DirectExecutionMode {
    // Uses `direct_submit_attempts`; no-targets is a hard error.
    Standalone,
    // Uses `hybrid_direct_attempts`; no-targets falls through to later routes.
    Fallback,
}
855
/// Result of one successful route submission, with whichever route-specific
/// identifiers the transport returned.
#[derive(Debug)]
struct RouteSubmitOutcome {
    // Which route produced this outcome.
    route: SubmitRoute,
    // Leader that accepted a direct submission, if the route was direct.
    direct_target: Option<LeaderTarget>,
    // Signature string returned by the RPC transport, if the route was RPC.
    rpc_signature: Option<String>,
    // Signature / bundle id returned by Jito, if the route was Jito.
    jito_signature: Option<String>,
    jito_bundle_id: Option<String>,
    // Direct-route target/identity counts; zero for RPC and Jito routes.
    selected_target_count: usize,
    selected_identity_count: usize,
}
874
875fn record_external_outcome_shared(
877 telemetry: &Arc<TxToxicFlowTelemetry>,
878 reporter: Option<&OutcomeReporterHandle>,
879 outcome: &TxSubmitOutcome,
880) {
881 telemetry.record(outcome);
882 if let Some(reporter) = reporter {
883 reporter.dispatch(telemetry, outcome.clone());
884 }
885}
886
887fn record_route_outcome_shared(
889 telemetry: &Arc<TxToxicFlowTelemetry>,
890 reporter: Option<&OutcomeReporterHandle>,
891 flow_safety_source: Option<&Arc<dyn TxFlowSafetySource>>,
892 signature: Option<SignatureBytes>,
893 plan: &SubmitPlan,
894 outcome: &RouteSubmitOutcome,
895) {
896 let kind = match outcome.route {
897 SubmitRoute::Rpc => TxSubmitOutcomeKind::RpcAccepted,
898 SubmitRoute::Jito => TxSubmitOutcomeKind::JitoAccepted,
899 SubmitRoute::Direct => TxSubmitOutcomeKind::DirectAccepted,
900 };
901 let outcome = TxSubmitOutcome {
902 kind,
903 signature,
904 route: Some(outcome.route),
905 plan: plan.clone(),
906 legacy_mode: plan.legacy_mode(),
907 rpc_signature: outcome.rpc_signature.clone(),
908 jito_signature: outcome.jito_signature.clone(),
909 jito_bundle_id: outcome.jito_bundle_id.clone(),
910 state_version: flow_safety_source
911 .and_then(|source| source.toxic_flow_snapshot().current_state_version),
912 opportunity_age_ms: None,
913 };
914 record_external_outcome_shared(telemetry, reporter, &outcome);
915}
916
/// Cheap, cloneable handle to the shared outcome-reporter worker. Falls to
/// `Unavailable` when the worker thread could not be spawned, in which case
/// dispatch only bumps the telemetry "unavailable" counter.
#[derive(Clone)]
enum OutcomeReporterHandle {
    // Worker thread is running; outcomes are forwarded to its queue.
    Ready(Arc<OutcomeReporterDispatcher>),
    // Worker could not be started; outcomes are counted as dropped.
    Unavailable,
}
925
926impl OutcomeReporterHandle {
927 fn new(reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
929 match OutcomeReporterDispatcher::shared(reporter) {
930 Ok(dispatcher) => Self::Ready(dispatcher),
931 Err(error) => {
932 eprintln!("sof-tx: failed to start external outcome reporter worker: {error}");
933 Self::Unavailable
934 }
935 }
936 }
937
938 fn dispatch(&self, telemetry: &Arc<TxToxicFlowTelemetry>, outcome: TxSubmitOutcome) {
940 match self {
941 Self::Ready(dispatcher) => match dispatcher.dispatch(outcome) {
942 ReporterDispatchStatus::Enqueued => {}
943 ReporterDispatchStatus::DroppedFull => telemetry.record_reporter_drop(),
944 ReporterDispatchStatus::Unavailable => telemetry.record_reporter_unavailable(),
945 },
946 Self::Unavailable => telemetry.record_reporter_unavailable(),
947 }
948 }
949}
950
/// Sends outcomes to the dedicated reporter worker thread over a bounded
/// channel, with latched warn-once flags for the full and disconnected cases.
struct OutcomeReporterDispatcher {
    // Bounded producer side; `try_send` keeps the submit path non-blocking.
    tx: SyncSender<TxSubmitOutcome>,
    // Latched so the queue-full warning prints once per overflow episode.
    queue_full_warned: AtomicBool,
    // Latched so the worker-gone warning prints at most once.
    unavailable_warned: AtomicBool,
}
960
impl OutcomeReporterDispatcher {
    // Bounded queue so a slow reporter cannot grow memory without limit;
    // kept tiny in tests so the overflow path is easy to exercise.
    #[cfg(not(test))]
    const QUEUE_CAPACITY: usize = 1024;
    #[cfg(test)]
    const QUEUE_CAPACITY: usize = 8;

    /// Returns the process-wide dispatcher for this reporter instance
    /// (keyed by pointer identity), spawning the worker thread on first use.
    ///
    /// # Errors
    /// Returns the I/O error from `thread::Builder::spawn` when the worker
    /// thread cannot be created.
    fn shared(reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Result<Arc<Self>, std::io::Error> {
        let key = reporter_identity(&reporter);
        let registry = outcome_reporter_registry();
        {
            // Fast path: reuse a still-live dispatcher for the same reporter.
            // A poisoned lock is recovered; the map stays usable.
            let registry = registry
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            if let Some(existing) = registry.get(&key).and_then(Weak::upgrade) {
                return Ok(existing);
            }
        }
        // NOTE(review): the lock is released between the lookup above and the
        // insert below, so two racing callers can each spawn a worker for the
        // same reporter; the later insert wins the registry slot while the
        // earlier dispatcher keeps serving its own handles. Confirm this
        // duplication is acceptable.

        let (tx, rx) = std_mpsc::sync_channel::<TxSubmitOutcome>(Self::QUEUE_CAPACITY);
        thread::Builder::new()
            .name("sof-tx-outcome-reporter".to_owned())
            .spawn(move || {
                // Drain until every sender is dropped. A panicking reporter
                // is contained by catch_unwind so the worker keeps serving
                // subsequent outcomes.
                while let Ok(outcome) = rx.recv() {
                    drop(catch_unwind(AssertUnwindSafe(|| {
                        reporter.record_outcome(&outcome);
                    })));
                }
            })?;

        let dispatcher = Arc::new(Self {
            tx,
            queue_full_warned: AtomicBool::new(false),
            unavailable_warned: AtomicBool::new(false),
        });
        // Only a Weak is registered: dropping the last handle lets the
        // dispatcher (and eventually the worker, via channel close) go away.
        let mut registry = registry
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let _ = registry.insert(key, Arc::downgrade(&dispatcher));
        Ok(dispatcher)
    }

    /// Enqueues `outcome` without blocking; drops it (with a warn-once log)
    /// when the queue is full or the worker thread has stopped.
    fn dispatch(&self, outcome: TxSubmitOutcome) -> ReporterDispatchStatus {
        match self.tx.try_send(outcome) {
            Ok(()) => {
                // A successful send re-arms the queue-full warning.
                self.queue_full_warned.store(false, Ordering::Relaxed);
                ReporterDispatchStatus::Enqueued
            }
            Err(TrySendError::Disconnected(_)) => {
                if !self.unavailable_warned.swap(true, Ordering::Relaxed) {
                    eprintln!(
                        "sof-tx: external outcome reporter worker stopped; dropping reporter outcomes"
                    );
                }
                ReporterDispatchStatus::Unavailable
            }
            Err(TrySendError::Full(_)) => {
                if !self.queue_full_warned.swap(true, Ordering::Relaxed) {
                    eprintln!(
                        "sof-tx: external outcome reporter queue is full; dropping reporter outcomes until it drains"
                    );
                }
                ReporterDispatchStatus::DroppedFull
            }
        }
    }
}
1030
/// Result of a non-blocking dispatch attempt to the reporter worker queue.
enum ReporterDispatchStatus {
    // Outcome was queued for delivery.
    Enqueued,
    // Queue was full; the outcome was dropped.
    DroppedFull,
    // Worker thread has stopped; the outcome was dropped.
    Unavailable,
}
1040
// Process-wide registry mapping reporter identity (data-pointer address) to
// its dispatcher, so repeated handles to the same reporter share one worker
// thread. Entries are Weak: dead dispatchers are skipped on lookup (though
// their stale keys are never pruned).
static OUTCOME_REPORTER_REGISTRY: OnceLock<Mutex<HashMap<usize, Weak<OutcomeReporterDispatcher>>>> =
    OnceLock::new();

/// Returns the lazily-initialized global dispatcher registry.
fn outcome_reporter_registry() -> &'static Mutex<HashMap<usize, Weak<OutcomeReporterDispatcher>>> {
    OUTCOME_REPORTER_REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
}
1049
1050fn reporter_identity(reporter: &Arc<dyn TxSubmitOutcomeReporter>) -> usize {
1052 Arc::as_ptr(reporter) as *const () as usize
1053}
1054
1055fn extract_first_signature(tx_bytes: &[u8]) -> Result<Option<SignatureBytes>, SubmitError> {
1057 let Some((signature_count, offset)) = decode_short_vec_len(tx_bytes) else {
1058 return Err(decode_signed_bytes_error(
1059 "transaction bytes did not contain a valid signature vector prefix",
1060 ));
1061 };
1062 if signature_count == 0 {
1063 return Ok(None);
1064 }
1065 let signature_end = offset.saturating_add(64);
1066 let Some(signature_bytes) = tx_bytes.get(offset..signature_end) else {
1067 return Err(decode_signed_bytes_error(
1068 "transaction bytes ended before the first signature completed",
1069 ));
1070 };
1071 let mut signature = [0_u8; 64];
1072 signature.copy_from_slice(signature_bytes);
1073 Ok(Some(SignatureBytes::new(signature)))
1074}
1075
/// Decodes a Solana short-vec (compact-u16 style) length prefix.
///
/// Each byte contributes its low 7 bits, little-endian; the high bit marks a
/// continuation. At most three bytes are consumed. Returns
/// `Some((value, bytes_consumed))` on success, or `None` when the prefix is
/// unterminated within three bytes (including empty input).
fn decode_short_vec_len(bytes: &[u8]) -> Option<(usize, usize)> {
    let mut decoded = 0_usize;
    let mut shift = 0_u32;
    let mut consumed = 0_usize;
    for &byte in bytes.iter().take(3) {
        consumed += 1;
        decoded |= usize::from(byte & 0x7f) << shift;
        if byte & 0x80 == 0 {
            return Some((decoded, consumed));
        }
        shift = shift.saturating_add(7);
    }
    None
}
1089
1090fn decode_signed_bytes_error(message: &'static str) -> SubmitError {
1092 SubmitError::DecodeSignedBytes {
1093 source: Box::new(bincode::ErrorKind::Custom(message.to_owned())),
1094 }
1095}
1096
/// Executes a single route from a submit plan against the snapshot in
/// `task_context`, returning the route's outcome on success.
///
/// RPC and Jito routes are single-shot calls. The direct route retries up to
/// its attempt budget (chosen by `direct_mode`), re-selecting and rotating
/// leader targets per attempt, with a per-attempt timeout and an inter-attempt
/// rebroadcast delay.
async fn submit_one_route_task(
    route: SubmitRoute,
    tx_bytes: Arc<[u8]>,
    task_context: RouteTaskContext,
    direct_mode: DirectExecutionMode,
) -> Result<RouteSubmitOutcome, SubmitError> {
    match route {
        SubmitRoute::Rpc => {
            let rpc = task_context
                .rpc_transport
                .ok_or(SubmitError::MissingRpcTransport)?;
            let rpc_signature = rpc
                .submit_rpc(tx_bytes.as_ref(), &task_context.rpc_config)
                .await
                .map_err(|source| SubmitError::Rpc { source })?;
            Ok(RouteSubmitOutcome {
                route,
                direct_target: None,
                rpc_signature: Some(rpc_signature),
                jito_signature: None,
                jito_bundle_id: None,
                selected_target_count: 0,
                selected_identity_count: 0,
            })
        }
        SubmitRoute::Jito => {
            let jito = task_context
                .jito_transport
                .ok_or(SubmitError::MissingJitoTransport)?;
            let response = jito
                .submit_jito(tx_bytes.as_ref(), &task_context.jito_config)
                .await
                .map_err(|source| SubmitError::Jito { source })?;
            Ok(RouteSubmitOutcome {
                route,
                direct_target: None,
                rpc_signature: None,
                jito_signature: response.transaction_signature,
                jito_bundle_id: response.bundle_id,
                selected_target_count: 0,
                selected_identity_count: 0,
            })
        }
        SubmitRoute::Direct => {
            let direct = task_context
                .direct_transport
                .ok_or(SubmitError::MissingDirectTransport)?;
            let attempt_timeout = direct_attempt_timeout(&task_context.direct_config);
            // Standalone direct routes get the full budget; fallback mode
            // (later routes pending) uses the shorter hybrid budget.
            let attempt_count = match direct_mode {
                DirectExecutionMode::Standalone => {
                    task_context.direct_config.direct_submit_attempts
                }
                DirectExecutionMode::Fallback => task_context.direct_config.hybrid_direct_attempts,
            };
            let mut last_error = None;
            for attempt_idx in 0..attempt_count {
                // Re-select targets each attempt so leadership changes are
                // picked up, then rotate by attempt index to vary ordering.
                let mut targets = select_and_rank_targets(
                    task_context.leader_provider.as_ref(),
                    task_context.backups.as_ref(),
                    task_context.policy,
                    &task_context.direct_config,
                )
                .await;
                rotate_targets_for_attempt(&mut targets, attempt_idx, task_context.policy);
                let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
                if targets.is_empty() {
                    // Fallback mode defers to later routes instead of hard-
                    // failing the whole plan on a temporary target drought.
                    if matches!(direct_mode, DirectExecutionMode::Fallback) {
                        break;
                    }
                    return Err(SubmitError::NoDirectTargets);
                }
                match timeout(
                    attempt_timeout,
                    direct.submit_direct(
                        tx_bytes.as_ref(),
                        &targets,
                        task_context.policy,
                        &task_context.direct_config,
                    ),
                )
                .await
                {
                    Ok(Ok(target)) => {
                        return Ok(RouteSubmitOutcome {
                            route,
                            direct_target: Some(target),
                            rpc_signature: None,
                            jito_signature: None,
                            jito_bundle_id: None,
                            selected_target_count,
                            selected_identity_count,
                        });
                    }
                    Ok(Err(source)) => last_error = Some(source),
                    Err(_elapsed) => {
                        last_error = Some(SubmitTransportError::Failure {
                            message: format!(
                                "direct submit attempt timed out after {}ms",
                                attempt_timeout.as_millis()
                            ),
                        });
                    }
                }
                // Pause between attempts, but not after the final one.
                if attempt_idx < attempt_count.saturating_sub(1) {
                    sleep(task_context.direct_config.rebroadcast_interval).await;
                }
            }
            Err(SubmitError::Direct {
                source: last_error.unwrap_or_else(|| SubmitTransportError::Failure {
                    message: "direct submit attempts exhausted".to_owned(),
                }),
            })
        }
    }
}
1213
1214#[cfg(not(test))]
1215fn spawn_agave_rebroadcast_task(
1217 tx_bytes: Arc<[u8]>,
1218 direct_transport: Arc<dyn DirectSubmitTransport>,
1219 leader_provider: Arc<dyn LeaderProvider>,
1220 backups: Vec<LeaderTarget>,
1221 policy: RoutingPolicy,
1222 direct_config: DirectSubmitConfig,
1223) {
1224 tokio::spawn(async move {
1225 let deadline = Instant::now()
1226 .checked_add(direct_config.agave_rebroadcast_window)
1227 .unwrap_or_else(Instant::now);
1228 loop {
1229 let now = Instant::now();
1230 if now >= deadline {
1231 break;
1232 }
1233
1234 let sleep_for = deadline
1235 .saturating_duration_since(now)
1236 .min(direct_config.agave_rebroadcast_interval);
1237 if !sleep_for.is_zero() {
1238 sleep(sleep_for).await;
1239 }
1240
1241 if Instant::now() >= deadline {
1242 break;
1243 }
1244
1245 let targets = select_and_rank_targets(
1246 leader_provider.as_ref(),
1247 backups.as_slice(),
1248 policy,
1249 &direct_config,
1250 )
1251 .await;
1252 if targets.is_empty() {
1253 continue;
1254 }
1255
1256 drop(
1257 timeout(
1258 direct_attempt_timeout(&direct_config),
1259 direct_transport.submit_direct(
1260 tx_bytes.as_ref(),
1261 &targets,
1262 policy,
1263 &direct_config,
1264 ),
1265 )
1266 .await,
1267 );
1268 }
1269 });
1270}
1271
#[cfg(test)]
/// Test-build stand-in for the rebroadcast spawner: a deliberate no-op so
/// unit tests never launch detached background tasks that would keep
/// re-sending transactions outside the test's control.
fn spawn_agave_rebroadcast_task(
    _tx_bytes: Arc<[u8]>,
    _direct_transport: Arc<dyn DirectSubmitTransport>,
    _leader_provider: Arc<dyn LeaderProvider>,
    _backups: Vec<LeaderTarget>,
    _policy: RoutingPolicy,
    _direct_config: DirectSubmitConfig,
) {
}
1283
1284async fn select_and_rank_targets(
1286 leader_provider: &(impl LeaderProvider + ?Sized),
1287 backups: &[LeaderTarget],
1288 policy: RoutingPolicy,
1289 direct_config: &DirectSubmitConfig,
1290) -> Vec<LeaderTarget> {
1291 let targets = select_targets(leader_provider, backups, policy);
1292 rank_targets_by_latency(targets, direct_config).await
1293}
1294
1295async fn rank_targets_by_latency(
1297 targets: Vec<LeaderTarget>,
1298 direct_config: &DirectSubmitConfig,
1299) -> Vec<LeaderTarget> {
1300 if targets.len() <= 1 || !direct_config.latency_aware_targeting {
1301 return targets;
1302 }
1303
1304 let probe_timeout = direct_config.latency_probe_timeout;
1305 let probe_port = direct_config.latency_probe_port;
1306 let probe_count = targets
1307 .len()
1308 .min(direct_config.latency_probe_max_targets.max(1));
1309 let mut latencies = vec![None; probe_count];
1310 let mut probes = JoinSet::new();
1311 for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
1312 probes.spawn(async move {
1313 (
1314 idx,
1315 probe_target_latency(&target, probe_port, probe_timeout).await,
1316 )
1317 });
1318 }
1319 while let Some(result) = probes.join_next().await {
1320 if let Ok((idx, latency)) = result
1321 && idx < latencies.len()
1322 && let Some(slot) = latencies.get_mut(idx)
1323 {
1324 *slot = latency;
1325 }
1326 }
1327
1328 let mut ranked = targets
1329 .iter()
1330 .take(probe_count)
1331 .cloned()
1332 .enumerate()
1333 .collect::<Vec<_>>();
1334 ranked.sort_by_key(|(idx, _target)| {
1335 (
1336 latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
1337 *idx,
1338 )
1339 });
1340
1341 let mut output = ranked
1342 .into_iter()
1343 .map(|(_idx, target)| target)
1344 .collect::<Vec<_>>();
1345 output.extend(targets.iter().skip(probe_count).cloned());
1346 output
1347}
1348
1349async fn probe_target_latency(
1351 target: &LeaderTarget,
1352 probe_port: Option<u16>,
1353 probe_timeout: Duration,
1354) -> Option<u128> {
1355 let mut ports = vec![target.tpu_addr.port()];
1356 if let Some(port) = probe_port
1357 && port != target.tpu_addr.port()
1358 {
1359 ports.push(port);
1360 }
1361
1362 let ip = target.tpu_addr.ip();
1363 let mut best = None::<u128>;
1364 for port in ports {
1365 if let Some(latency) = probe_tcp_latency(ip, port, probe_timeout).await {
1366 best = Some(best.map_or(latency, |current| current.min(latency)));
1367 }
1368 }
1369 best
1370}
1371
1372async fn probe_tcp_latency(
1374 ip: std::net::IpAddr,
1375 port: u16,
1376 timeout_duration: Duration,
1377) -> Option<u128> {
1378 let start = Instant::now();
1379 let addr = SocketAddr::new(ip, port);
1380 let stream = timeout(timeout_duration, TcpStream::connect(addr))
1381 .await
1382 .ok()?
1383 .ok()?;
1384 drop(stream);
1385 Some(start.elapsed().as_millis())
1386}
1387
1388fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
1390 let selected_target_count = targets.len();
1391 let selected_identity_count = targets
1392 .iter()
1393 .filter_map(|target| target.identity)
1394 .collect::<HashSet<_>>()
1395 .len();
1396 (selected_target_count, selected_identity_count)
1397}
1398
1399fn rotate_targets_for_attempt(
1401 targets: &mut [LeaderTarget],
1402 attempt_idx: usize,
1403 policy: RoutingPolicy,
1404) {
1405 if attempt_idx == 0 || targets.len() <= 1 {
1406 return;
1407 }
1408
1409 let normalized = policy.normalized();
1410 let stride = normalized.max_parallel_sends.max(1);
1411 let rotation = attempt_idx
1412 .saturating_mul(stride)
1413 .checked_rem(targets.len())
1414 .unwrap_or(0);
1415 if rotation > 0 {
1416 targets.rotate_left(rotation);
1417 }
1418}
1419
1420fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
1422 direct_config
1423 .global_timeout
1424 .saturating_add(direct_config.per_target_timeout)
1425 .saturating_add(direct_config.rebroadcast_interval)
1426 .max(Duration::from_secs(8))
1427}