1use std::{
4 collections::HashSet,
5 net::SocketAddr,
6 sync::Arc,
7 time::{Duration, Instant},
8};
9
10use solana_signature::Signature;
11use solana_signer::signers::Signers;
12use solana_transaction::versioned::VersionedTransaction;
13use tokio::{
14 net::TcpStream,
15 task::JoinSet,
16 time::{sleep, timeout},
17};
18
19use super::{
20 DirectSubmitConfig, DirectSubmitTransport, RpcSubmitConfig, RpcSubmitTransport, SignedTx,
21 SubmitError, SubmitMode, SubmitReliability, SubmitResult,
22};
23use crate::{
24 builder::TxBuilder,
25 providers::{LeaderProvider, LeaderTarget, RecentBlockhashProvider},
26 routing::{RoutingPolicy, SignatureDeduper, select_targets},
27};
28
/// Client that submits signed transactions over RPC, a direct transport, or both.
///
/// Holds the providers, routing settings, signature-dedupe state, optional
/// transports, and per-transport configuration consumed by the `submit_*`
/// methods.
pub struct TxSubmitClient {
    /// Supplies the blockhash used by `submit_builder` when building a transaction.
    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
    /// Leader source handed to target selection for direct submission.
    leader_provider: Arc<dyn LeaderProvider>,
    /// Extra targets handed to target selection alongside the leader provider.
    backups: Vec<LeaderTarget>,
    /// Routing policy passed to target selection and to the direct transport.
    policy: RoutingPolicy,
    /// Signature tracker consulted before every submission to reject duplicates.
    deduper: SignatureDeduper,
    /// RPC transport; `None` until set through `with_rpc_transport`.
    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
    /// Direct transport; `None` until set through `with_direct_transport`.
    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
    /// Configuration passed to every RPC submission.
    rpc_config: RpcSubmitConfig,
    /// Configuration for direct submission (attempt counts, timeouts,
    /// rebroadcast and latency-probe settings).
    direct_config: DirectSubmitConfig,
}
50
51impl TxSubmitClient {
52 #[must_use]
54 pub fn new(
55 blockhash_provider: Arc<dyn RecentBlockhashProvider>,
56 leader_provider: Arc<dyn LeaderProvider>,
57 ) -> Self {
58 Self {
59 blockhash_provider,
60 leader_provider,
61 backups: Vec::new(),
62 policy: RoutingPolicy::default(),
63 deduper: SignatureDeduper::new(Duration::from_secs(10)),
64 rpc_transport: None,
65 direct_transport: None,
66 rpc_config: RpcSubmitConfig::default(),
67 direct_config: DirectSubmitConfig::default(),
68 }
69 }
70
71 #[must_use]
73 pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
74 self.backups = backups;
75 self
76 }
77
78 #[must_use]
80 pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
81 self.policy = policy.normalized();
82 self
83 }
84
85 #[must_use]
87 pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
88 self.deduper = SignatureDeduper::new(ttl);
89 self
90 }
91
92 #[must_use]
94 pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
95 self.rpc_transport = Some(transport);
96 self
97 }
98
99 #[must_use]
101 pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
102 self.direct_transport = Some(transport);
103 self
104 }
105
106 #[must_use]
108 pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
109 self.rpc_config = config;
110 self
111 }
112
113 #[must_use]
115 pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
116 self.direct_config = config.normalized();
117 self
118 }
119
120 #[must_use]
122 pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
123 self.direct_config = DirectSubmitConfig::from_reliability(reliability);
124 self
125 }
126
127 pub async fn submit_builder<T>(
134 &mut self,
135 builder: TxBuilder,
136 signers: &T,
137 mode: SubmitMode,
138 ) -> Result<SubmitResult, SubmitError>
139 where
140 T: Signers + ?Sized,
141 {
142 let blockhash = self
143 .blockhash_provider
144 .latest_blockhash()
145 .ok_or(SubmitError::MissingRecentBlockhash)?;
146 let tx = builder
147 .build_and_sign(blockhash, signers)
148 .map_err(|source| SubmitError::Build { source })?;
149 self.submit_transaction(tx, mode).await
150 }
151
152 pub async fn submit_transaction(
158 &mut self,
159 tx: VersionedTransaction,
160 mode: SubmitMode,
161 ) -> Result<SubmitResult, SubmitError> {
162 let signature = tx.signatures.first().copied();
163 let tx_bytes =
164 bincode::serialize(&tx).map_err(|source| SubmitError::DecodeSignedBytes { source })?;
165 self.submit_bytes(tx_bytes, signature, mode).await
166 }
167
168 pub async fn submit_signed(
174 &mut self,
175 signed_tx: SignedTx,
176 mode: SubmitMode,
177 ) -> Result<SubmitResult, SubmitError> {
178 let tx_bytes = match signed_tx {
179 SignedTx::VersionedTransactionBytes(bytes) => bytes,
180 SignedTx::WireTransactionBytes(bytes) => bytes,
181 };
182 let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
183 .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
184 let signature = tx.signatures.first().copied();
185 self.submit_bytes(tx_bytes, signature, mode).await
186 }
187
188 async fn submit_bytes(
190 &mut self,
191 tx_bytes: Vec<u8>,
192 signature: Option<Signature>,
193 mode: SubmitMode,
194 ) -> Result<SubmitResult, SubmitError> {
195 self.enforce_dedupe(signature)?;
196 match mode {
197 SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
198 SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
199 SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
200 }
201 }
202
203 fn enforce_dedupe(&mut self, signature: Option<Signature>) -> Result<(), SubmitError> {
205 if let Some(signature) = signature {
206 let now = Instant::now();
207 if !self.deduper.check_and_insert(signature, now) {
208 return Err(SubmitError::DuplicateSignature);
209 }
210 }
211 Ok(())
212 }
213
214 async fn submit_rpc_only(
216 &self,
217 tx_bytes: Vec<u8>,
218 signature: Option<Signature>,
219 mode: SubmitMode,
220 ) -> Result<SubmitResult, SubmitError> {
221 let rpc = self
222 .rpc_transport
223 .as_ref()
224 .ok_or(SubmitError::MissingRpcTransport)?;
225 let rpc_signature = rpc
226 .submit_rpc(&tx_bytes, &self.rpc_config)
227 .await
228 .map_err(|source| SubmitError::Rpc { source })?;
229 Ok(SubmitResult {
230 signature,
231 mode,
232 direct_target: None,
233 rpc_signature: Some(rpc_signature),
234 used_rpc_fallback: false,
235 selected_target_count: 0,
236 selected_identity_count: 0,
237 })
238 }
239
240 async fn submit_direct_only(
242 &self,
243 tx_bytes: Vec<u8>,
244 signature: Option<Signature>,
245 mode: SubmitMode,
246 ) -> Result<SubmitResult, SubmitError> {
247 let direct = self
248 .direct_transport
249 .as_ref()
250 .ok_or(SubmitError::MissingDirectTransport)?;
251 let direct_config = self.direct_config.clone().normalized();
252 let mut last_error = None;
253 let attempt_timeout = direct_attempt_timeout(&direct_config);
254
255 for attempt_idx in 0..direct_config.direct_submit_attempts {
256 let mut targets = self.select_direct_targets(&direct_config).await;
257 rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
258 let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
259 if targets.is_empty() {
260 return Err(SubmitError::NoDirectTargets);
261 }
262 match timeout(
263 attempt_timeout,
264 direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
265 )
266 .await
267 {
268 Ok(Ok(target)) => {
269 self.spawn_agave_rebroadcast(tx_bytes.clone(), &direct_config);
270 return Ok(SubmitResult {
271 signature,
272 mode,
273 direct_target: Some(target),
274 rpc_signature: None,
275 used_rpc_fallback: false,
276 selected_target_count,
277 selected_identity_count,
278 });
279 }
280 Ok(Err(source)) => last_error = Some(source),
281 Err(_elapsed) => {
282 last_error = Some(super::SubmitTransportError::Failure {
283 message: format!(
284 "direct submit attempt timed out after {}ms",
285 attempt_timeout.as_millis()
286 ),
287 });
288 }
289 }
290 if attempt_idx < direct_config.direct_submit_attempts.saturating_sub(1) {
291 sleep(direct_config.rebroadcast_interval).await;
292 }
293 }
294
295 Err(SubmitError::Direct {
296 source: last_error.unwrap_or_else(|| super::SubmitTransportError::Failure {
297 message: "direct submit attempts exhausted".to_owned(),
298 }),
299 })
300 }
301
302 async fn submit_hybrid(
304 &self,
305 tx_bytes: Vec<u8>,
306 signature: Option<Signature>,
307 mode: SubmitMode,
308 ) -> Result<SubmitResult, SubmitError> {
309 let direct = self
310 .direct_transport
311 .as_ref()
312 .ok_or(SubmitError::MissingDirectTransport)?;
313 let rpc = self
314 .rpc_transport
315 .as_ref()
316 .ok_or(SubmitError::MissingRpcTransport)?;
317
318 let direct_config = self.direct_config.clone().normalized();
319 let attempt_timeout = direct_attempt_timeout(&direct_config);
320 for attempt_idx in 0..direct_config.hybrid_direct_attempts {
321 let mut targets = self.select_direct_targets(&direct_config).await;
322 rotate_targets_for_attempt(&mut targets, attempt_idx, self.policy);
323 let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
324 if targets.is_empty() {
325 break;
326 }
327 if let Ok(Ok(target)) = timeout(
328 attempt_timeout,
329 direct.submit_direct(&tx_bytes, &targets, self.policy, &direct_config),
330 )
331 .await
332 {
333 self.spawn_agave_rebroadcast(tx_bytes.clone(), &direct_config);
334 if direct_config.hybrid_rpc_broadcast
335 && let Ok(rpc_signature) = rpc.submit_rpc(&tx_bytes, &self.rpc_config).await
336 {
337 return Ok(SubmitResult {
338 signature,
339 mode,
340 direct_target: Some(target),
341 rpc_signature: Some(rpc_signature),
342 used_rpc_fallback: false,
343 selected_target_count,
344 selected_identity_count,
345 });
346 }
347 return Ok(SubmitResult {
348 signature,
349 mode,
350 direct_target: Some(target),
351 rpc_signature: None,
352 used_rpc_fallback: false,
353 selected_target_count,
354 selected_identity_count,
355 });
356 }
357 if attempt_idx < direct_config.hybrid_direct_attempts.saturating_sub(1) {
358 sleep(direct_config.rebroadcast_interval).await;
359 }
360 }
361
362 let rpc_signature = rpc
363 .submit_rpc(&tx_bytes, &self.rpc_config)
364 .await
365 .map_err(|source| SubmitError::Rpc { source })?;
366 Ok(SubmitResult {
367 signature,
368 mode,
369 direct_target: None,
370 rpc_signature: Some(rpc_signature),
371 used_rpc_fallback: true,
372 selected_target_count: 0,
373 selected_identity_count: 0,
374 })
375 }
376
377 async fn select_direct_targets(&self, direct_config: &DirectSubmitConfig) -> Vec<LeaderTarget> {
379 select_and_rank_targets(
380 self.leader_provider.as_ref(),
381 &self.backups,
382 self.policy,
383 direct_config,
384 )
385 .await
386 }
387
388 fn spawn_agave_rebroadcast(&self, tx_bytes: Vec<u8>, direct_config: &DirectSubmitConfig) {
390 if !direct_config.agave_rebroadcast_enabled
391 || direct_config.agave_rebroadcast_window.is_zero()
392 {
393 return;
394 }
395 let Some(direct_transport) = self.direct_transport.clone() else {
396 return;
397 };
398 spawn_agave_rebroadcast_task(
399 tx_bytes,
400 direct_transport,
401 self.leader_provider.clone(),
402 self.backups.clone(),
403 self.policy,
404 direct_config.clone(),
405 );
406 }
407}
408
409#[cfg(not(test))]
410fn spawn_agave_rebroadcast_task(
412 tx_bytes: Vec<u8>,
413 direct_transport: Arc<dyn DirectSubmitTransport>,
414 leader_provider: Arc<dyn LeaderProvider>,
415 backups: Vec<LeaderTarget>,
416 policy: RoutingPolicy,
417 direct_config: DirectSubmitConfig,
418) {
419 tokio::spawn(async move {
420 let deadline = Instant::now()
421 .checked_add(direct_config.agave_rebroadcast_window)
422 .unwrap_or_else(Instant::now);
423 loop {
424 let now = Instant::now();
425 if now >= deadline {
426 break;
427 }
428
429 let sleep_for = deadline
430 .saturating_duration_since(now)
431 .min(direct_config.agave_rebroadcast_interval);
432 if !sleep_for.is_zero() {
433 sleep(sleep_for).await;
434 }
435
436 if Instant::now() >= deadline {
437 break;
438 }
439
440 let targets = select_and_rank_targets(
441 leader_provider.as_ref(),
442 backups.as_slice(),
443 policy,
444 &direct_config,
445 )
446 .await;
447 if targets.is_empty() {
448 continue;
449 }
450
451 drop(
452 timeout(
453 direct_attempt_timeout(&direct_config),
454 direct_transport.submit_direct(&tx_bytes, &targets, policy, &direct_config),
455 )
456 .await,
457 );
458 }
459 });
460}
461
/// Test-build stand-in for the rebroadcast spawner.
///
/// Accepts the same arguments as the non-test version and does nothing, so no
/// background task is started when compiled under `cfg(test)`.
#[cfg(test)]
fn spawn_agave_rebroadcast_task(
    _tx_bytes: Vec<u8>,
    _direct_transport: Arc<dyn DirectSubmitTransport>,
    _leader_provider: Arc<dyn LeaderProvider>,
    _backups: Vec<LeaderTarget>,
    _policy: RoutingPolicy,
    _direct_config: DirectSubmitConfig,
) {
}
473
474async fn select_and_rank_targets(
476 leader_provider: &dyn LeaderProvider,
477 backups: &[LeaderTarget],
478 policy: RoutingPolicy,
479 direct_config: &DirectSubmitConfig,
480) -> Vec<LeaderTarget> {
481 let targets = select_targets(leader_provider, backups, policy);
482 rank_targets_by_latency(targets, direct_config).await
483}
484
485async fn rank_targets_by_latency(
487 targets: Vec<LeaderTarget>,
488 direct_config: &DirectSubmitConfig,
489) -> Vec<LeaderTarget> {
490 if targets.len() <= 1 || !direct_config.latency_aware_targeting {
491 return targets;
492 }
493
494 let probe_count = targets
495 .len()
496 .min(direct_config.latency_probe_max_targets.max(1));
497 let mut latencies = vec![None; probe_count];
498 let mut probes = JoinSet::new();
499 for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
500 let cfg = direct_config.clone();
501 probes.spawn(async move { (idx, probe_target_latency(&target, &cfg).await) });
502 }
503 while let Some(result) = probes.join_next().await {
504 if let Ok((idx, latency)) = result
505 && idx < latencies.len()
506 && let Some(slot) = latencies.get_mut(idx)
507 {
508 *slot = latency;
509 }
510 }
511
512 let mut ranked = targets
513 .iter()
514 .take(probe_count)
515 .cloned()
516 .enumerate()
517 .collect::<Vec<_>>();
518 ranked.sort_by_key(|(idx, _target)| {
519 (
520 latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
521 *idx,
522 )
523 });
524
525 let mut output = ranked
526 .into_iter()
527 .map(|(_idx, target)| target)
528 .collect::<Vec<_>>();
529 output.extend(targets.iter().skip(probe_count).cloned());
530 output
531}
532
533async fn probe_target_latency(
535 target: &LeaderTarget,
536 direct_config: &DirectSubmitConfig,
537) -> Option<u128> {
538 let mut ports = vec![target.tpu_addr.port()];
539 if let Some(port) = direct_config.latency_probe_port
540 && port != target.tpu_addr.port()
541 {
542 ports.push(port);
543 }
544
545 let ip = target.tpu_addr.ip();
546 let mut best = None::<u128>;
547 for port in ports {
548 if let Some(latency) =
549 probe_tcp_latency(ip, port, direct_config.latency_probe_timeout).await
550 {
551 best = Some(best.map_or(latency, |current| current.min(latency)));
552 }
553 }
554 best
555}
556
557async fn probe_tcp_latency(
559 ip: std::net::IpAddr,
560 port: u16,
561 timeout_duration: Duration,
562) -> Option<u128> {
563 let start = Instant::now();
564 let addr = SocketAddr::new(ip, port);
565 let stream = timeout(timeout_duration, TcpStream::connect(addr))
566 .await
567 .ok()?
568 .ok()?;
569 drop(stream);
570 Some(start.elapsed().as_millis())
571}
572
573fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
575 let selected_target_count = targets.len();
576 let selected_identity_count = targets
577 .iter()
578 .filter_map(|target| target.identity)
579 .collect::<HashSet<_>>()
580 .len();
581 (selected_target_count, selected_identity_count)
582}
583
584fn rotate_targets_for_attempt(
586 targets: &mut [LeaderTarget],
587 attempt_idx: usize,
588 policy: RoutingPolicy,
589) {
590 if attempt_idx == 0 || targets.len() <= 1 {
591 return;
592 }
593
594 let normalized = policy.normalized();
595 let stride = normalized.max_parallel_sends.max(1);
596 let rotation = attempt_idx
597 .saturating_mul(stride)
598 .checked_rem(targets.len())
599 .unwrap_or(0);
600 if rotation > 0 {
601 targets.rotate_left(rotation);
602 }
603}
604
605fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
607 direct_config
608 .global_timeout
609 .saturating_add(direct_config.per_target_timeout)
610 .saturating_add(direct_config.rebroadcast_interval)
611 .max(Duration::from_secs(8))
612}