1use std::{
4 sync::{Arc, Mutex},
5 time::{Duration, Instant},
6};
7
8use solana_signature::Signature;
9use solana_signer::signers::Signers;
10use solana_transaction::versioned::VersionedTransaction;
11
12use super::{
13 DirectSubmitConfig, DirectSubmitTransport, RpcSubmitConfig, RpcSubmitTransport, SignedTx,
14 SubmitError, SubmitMode, SubmitReliability, SubmitResult,
15};
16use crate::{
17 builder::TxBuilder,
18 providers::{LeaderProvider, LeaderTarget, RecentBlockhashProvider},
19 routing::{RoutingPolicy, SignatureDeduper, select_targets},
20};
21
22pub struct TxSubmitClient {
24 blockhash_provider: Arc<dyn RecentBlockhashProvider>,
26 leader_provider: Arc<dyn LeaderProvider>,
28 backups: Vec<LeaderTarget>,
30 policy: RoutingPolicy,
32 deduper: Mutex<SignatureDeduper>,
34 rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
36 direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
38 rpc_config: RpcSubmitConfig,
40 direct_config: DirectSubmitConfig,
42}
43
44impl TxSubmitClient {
45 #[must_use]
47 pub fn new(
48 blockhash_provider: Arc<dyn RecentBlockhashProvider>,
49 leader_provider: Arc<dyn LeaderProvider>,
50 ) -> Self {
51 Self {
52 blockhash_provider,
53 leader_provider,
54 backups: Vec::new(),
55 policy: RoutingPolicy::default(),
56 deduper: Mutex::new(SignatureDeduper::new(Duration::from_secs(10))),
57 rpc_transport: None,
58 direct_transport: None,
59 rpc_config: RpcSubmitConfig::default(),
60 direct_config: DirectSubmitConfig::default(),
61 }
62 }
63
64 #[must_use]
66 pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
67 self.backups = backups;
68 self
69 }
70
71 #[must_use]
73 pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
74 self.policy = policy.normalized();
75 self
76 }
77
78 #[must_use]
80 pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
81 self.deduper = Mutex::new(SignatureDeduper::new(ttl));
82 self
83 }
84
85 #[must_use]
87 pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
88 self.rpc_transport = Some(transport);
89 self
90 }
91
92 #[must_use]
94 pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
95 self.direct_transport = Some(transport);
96 self
97 }
98
99 #[must_use]
101 pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
102 self.rpc_config = config;
103 self
104 }
105
106 #[must_use]
108 pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
109 self.direct_config = config.normalized();
110 self
111 }
112
113 #[must_use]
115 pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
116 self.direct_config = DirectSubmitConfig::from_reliability(reliability);
117 self
118 }
119
120 pub async fn submit_builder<T>(
127 &self,
128 builder: TxBuilder,
129 signers: &T,
130 mode: SubmitMode,
131 ) -> Result<SubmitResult, SubmitError>
132 where
133 T: Signers + ?Sized,
134 {
135 let blockhash = self
136 .blockhash_provider
137 .latest_blockhash()
138 .ok_or(SubmitError::MissingRecentBlockhash)?;
139 let tx = builder
140 .build_and_sign(blockhash, signers)
141 .map_err(|source| SubmitError::Build { source })?;
142 self.submit_transaction(tx, mode).await
143 }
144
145 pub async fn submit_transaction(
151 &self,
152 tx: VersionedTransaction,
153 mode: SubmitMode,
154 ) -> Result<SubmitResult, SubmitError> {
155 let signature = tx.signatures.first().copied();
156 let tx_bytes =
157 bincode::serialize(&tx).map_err(|source| SubmitError::DecodeSignedBytes { source })?;
158 self.submit_bytes(tx_bytes, signature, mode).await
159 }
160
161 pub async fn submit_signed(
167 &self,
168 signed_tx: SignedTx,
169 mode: SubmitMode,
170 ) -> Result<SubmitResult, SubmitError> {
171 let tx_bytes = match signed_tx {
172 SignedTx::VersionedTransactionBytes(bytes) => bytes,
173 SignedTx::WireTransactionBytes(bytes) => bytes,
174 };
175 let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
176 .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
177 let signature = tx.signatures.first().copied();
178 self.submit_bytes(tx_bytes, signature, mode).await
179 }
180
181 async fn submit_bytes(
183 &self,
184 tx_bytes: Vec<u8>,
185 signature: Option<Signature>,
186 mode: SubmitMode,
187 ) -> Result<SubmitResult, SubmitError> {
188 self.enforce_dedupe(signature)?;
189 match mode {
190 SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
191 SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
192 SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
193 }
194 }
195
196 fn enforce_dedupe(&self, signature: Option<Signature>) -> Result<(), SubmitError> {
198 if let Some(signature) = signature {
199 let now = Instant::now();
200 let mut deduper =
201 self.deduper
202 .lock()
203 .map_err(|poisoned| SubmitError::InternalSync {
204 message: poisoned.to_string(),
205 })?;
206 if !deduper.check_and_insert(signature, now) {
207 return Err(SubmitError::DuplicateSignature);
208 }
209 }
210 Ok(())
211 }
212
213 async fn submit_rpc_only(
215 &self,
216 tx_bytes: Vec<u8>,
217 signature: Option<Signature>,
218 mode: SubmitMode,
219 ) -> Result<SubmitResult, SubmitError> {
220 let rpc = self
221 .rpc_transport
222 .as_ref()
223 .ok_or(SubmitError::MissingRpcTransport)?;
224 let rpc_signature = rpc
225 .submit_rpc(&tx_bytes, &self.rpc_config)
226 .await
227 .map_err(|source| SubmitError::Rpc { source })?;
228 Ok(SubmitResult {
229 signature,
230 mode,
231 direct_target: None,
232 rpc_signature: Some(rpc_signature),
233 used_rpc_fallback: false,
234 })
235 }
236
237 async fn submit_direct_only(
239 &self,
240 tx_bytes: Vec<u8>,
241 signature: Option<Signature>,
242 mode: SubmitMode,
243 ) -> Result<SubmitResult, SubmitError> {
244 let direct = self
245 .direct_transport
246 .as_ref()
247 .ok_or(SubmitError::MissingDirectTransport)?;
248 let targets = select_targets(self.leader_provider.as_ref(), &self.backups, self.policy);
249 if targets.is_empty() {
250 return Err(SubmitError::NoDirectTargets);
251 }
252 let direct_config = self.direct_config.clone().normalized();
253 let target = direct
254 .submit_direct(&tx_bytes, &targets, self.policy, &direct_config)
255 .await
256 .map_err(|source| SubmitError::Direct { source })?;
257 Ok(SubmitResult {
258 signature,
259 mode,
260 direct_target: Some(target),
261 rpc_signature: None,
262 used_rpc_fallback: false,
263 })
264 }
265
266 async fn submit_hybrid(
268 &self,
269 tx_bytes: Vec<u8>,
270 signature: Option<Signature>,
271 mode: SubmitMode,
272 ) -> Result<SubmitResult, SubmitError> {
273 let direct = self
274 .direct_transport
275 .as_ref()
276 .ok_or(SubmitError::MissingDirectTransport)?;
277 let rpc = self
278 .rpc_transport
279 .as_ref()
280 .ok_or(SubmitError::MissingRpcTransport)?;
281
282 let direct_config = self.direct_config.clone().normalized();
283 let targets = select_targets(self.leader_provider.as_ref(), &self.backups, self.policy);
284 if !targets.is_empty() {
285 for _ in 0..direct_config.hybrid_direct_attempts {
286 if let Ok(target) = direct
287 .submit_direct(&tx_bytes, &targets, self.policy, &direct_config)
288 .await
289 {
290 return Ok(SubmitResult {
291 signature,
292 mode,
293 direct_target: Some(target),
294 rpc_signature: None,
295 used_rpc_fallback: false,
296 });
297 }
298 }
299 }
300
301 let rpc_signature = rpc
302 .submit_rpc(&tx_bytes, &self.rpc_config)
303 .await
304 .map_err(|source| SubmitError::Rpc { source })?;
305 Ok(SubmitResult {
306 signature,
307 mode,
308 direct_target: None,
309 rpc_signature: Some(rpc_signature),
310 used_rpc_fallback: true,
311 })
312 }
313}