noierrdev_antoine_tx_engine/
lib.rs1use std::{
2 str::FromStr,
3 sync::{
4 Arc,
5 },
6 time::Instant
7};
8use reqwest::{
9 Client as HttpClient,
10};
11use serde_json::{
12 json,
13 Value
14};
15use solana_client::{
16 rpc_client::RpcClient,
17};
18use solana_sdk::{
19 signature::{Keypair,Signature,Signer},
20 transaction::{ VersionedTransaction},
21 instruction::{Instruction},
22 hash::Hash,
23 pubkey::Pubkey
24};
25use tokio::{
26 time::{sleep, Duration},
27 sync::{
28 RwLock,
29 }
30};
31use dashmap::{DashMap,DashSet};
32use rand::{
33 SeedableRng,
34 Rng,
35 rngs::StdRng
36};
37pub mod tx;
38
39pub mod jito;
40pub mod zero_slot;
41pub mod astralane;
42pub mod helius;
43pub mod temperal;
44pub mod node1;
45pub mod blockrazor;
46pub mod flashblock;
47pub mod stellium;
48
49pub mod tip;
50pub mod general;
51pub mod bundle;
52pub mod nonce;
53pub mod send;
54
55pub struct TxEngine {
56 pub rpc_client : Arc<RpcClient>,
57 pub http_client : Arc<HttpClient>,
58 pub recent_blockhash : Arc<RwLock<Hash>>,
59 pub tx_meta_pool : Arc<DashMap<[u8;64], Value>>
60}
61
62
63impl TxEngine {
64 pub fn new(
65 rpc_client : &Arc<RpcClient>,
66 http_client : &Arc<HttpClient>,
67 tx_meta_pool : Arc<DashMap<[u8;64], Value>>
68 )
69 ->Self
70 {
71 let recent_blockhash : Hash = rpc_client.get_latest_blockhash().unwrap();
72 let recent_blockhash_arc=Arc::new(RwLock::new(recent_blockhash));
73 tokio::spawn({
74 let rpc_client = rpc_client.clone();
75 let recent_blockhash_arc = recent_blockhash_arc.clone();
76 async move {
77 loop {
78 match rpc_client.get_latest_blockhash() {
79 Ok(new_hash) => {
80 *recent_blockhash_arc.write().await = new_hash;
81 }
82 Err(err) => {
83 println!("Failed to refresh blockhash: {:?}", err);
84 }
85 }
86 sleep(Duration::from_secs(1)).await;
87 }
88 }
89 });
90 Self {
91 rpc_client : rpc_client.clone(),
92 http_client : http_client.clone(),
93 recent_blockhash : recent_blockhash_arc,
94 tx_meta_pool : tx_meta_pool
95 }
96 }
97
98 pub async fn get_blockhash(&self)
99 ->Hash
100 {
101 let recent_blockhash = *self.recent_blockhash.read().await;
102 recent_blockhash
103 }
104
105 pub async fn send_sequential(&self, sender_url : &str, txs : &Vec<VersionedTransaction>, retry : u64){
106 for one_tx in txs {
107 let hash = one_tx.signatures[0];
108 let mut hash_bytes = [0u8; 64];
109 hash_bytes.copy_from_slice(hash.as_ref());
110 let mut tx_meta : Value=Value::Null;
111 self.tx_meta_pool.insert(hash_bytes, Value::Null);
112 let mut success = false;
113 for i in 0..retry {
114 send::send_url_versioned(&self.http_client, &sender_url, &one_tx).await;
115 let mut await_counter = 0;
116 loop {
117 let value_opt = self.tx_meta_pool.get(&hash_bytes)
118 .map(|v| v.clone());
119 match value_opt {
120 Some(val) => {
121 if val == Value::Null {
122 await_counter += 1;
123 } else {
124 success=true;
125 tx_meta = val;
126 self.tx_meta_pool.remove(&hash_bytes);
127 break;
128 }
129 }
130 None => {
131 await_counter += 1;
132 }
133 }
134 if await_counter > 150 {
135 break;
136 }
137 tokio::task::yield_now().await;
138 sleep(Duration::from_millis(5)).await;
139 }
140 if success == true{
141 break;
142 }
143 }
144 if !success {
145 println!("Failed to land sequently!");
146 break;
147 }
148 }
149 }
150
151 pub fn send_rpc_versioned(&self, tx : VersionedTransaction)
152 ->String
153 {
154 let tx_hash = send::send_rpc_versioned(&self.rpc_client, &tx);
155 tx_hash
156 }
157
158 pub async fn send_url_versioned(&self, sender_url : &str, tx : VersionedTransaction)
159 ->String
160 {
161 let tx_hash = send::send_url_versioned(&self.http_client, &sender_url, &tx).await;
162 tx_hash
163 }
164
165 pub async fn send_url_text(&self, sender_url : &str, tx : VersionedTransaction){
166 send::send_url_text(&self.http_client, &sender_url, &tx).await;
167 }
168
169 pub fn get_nonce(&self, nonce_account : &Pubkey)
170 ->Hash
171 {
172 let hash :Hash = nonce::get_nonce(&self.rpc_client, &nonce_account);
173 hash
174 }
175
176 pub fn get_nonce_value(&self, nonce_account : &Pubkey)
177 ->Hash
178 {
179 let nonce_hash = nonce::get_nonce(&self.rpc_client, &nonce_account);
180 nonce_hash
181 }
182
183 pub async fn send_with_durable_nonce(
184 &self,
185 wallets : Vec<Arc<Keypair>>,
186 payer : &Pubkey,
187 mut ixs : Vec<Instruction>,
188 nonce_account : &Pubkey,
189 jito_sender_url : &str,
190 astralane_sender_url : &str,
191 zero_slot_sender_url : &str,
192 tip_amount : u64,
193 nonce_hash : Hash
194 )
195 {
196 ixs.insert(0,nonce::build_advance_nonce_instruction(&payer, &nonce_account));
197
198 let ixs = Arc::new(ixs);
199 let wallets = Arc::new(wallets);
200
201 let payer = *payer;
202 let http = self.http_client.clone();
203 let mut tasks = Vec::with_capacity(4);
204
205 macro_rules! spawn_sender {
206 ($builder:path, $sender:path, $url:expr) => {{
207 let http = http.clone();
208 let wallets = wallets.clone();
209 let ixs = ixs.clone();
210 if $url == "" {
211 return;
212 }
213 let url = $url.to_string();
214 let nonce_hash = nonce_hash.clone();
215 tokio::spawn(async move {
216 let tx = $builder(
217 (*wallets).clone(),
218 &payer,
219 (*ixs).clone(),
220 tip_amount,
221 nonce_hash,
222 );
223 $sender(&http, &url, &tx).await;
224 })
225 }};
226 }
227 if jito_sender_url != "" {
228 tasks.push(spawn_sender!(jito::build_tx_with_jito_tip, send::send_url_versioned, jito_sender_url));
229 }
230 if astralane_sender_url != ""{
231 tasks.push(spawn_sender!(astralane::build_tx_with_astralane_tip, send::send_binary_versioned, astralane_sender_url));
232 }
233 if zero_slot_sender_url != "" {
234 tasks.push(spawn_sender!(zero_slot::build_tx_with_zero_slot, send::send_binary_versioned, zero_slot_sender_url));
235 }
236 futures::future::join_all(tasks).await;
237 }
238
239 pub async fn send_json_for_tx_meta(&self, sender_url : &str, tx : &VersionedTransaction)
240 ->Value
241 {
242 let mut meta : Value=Value::Null;
243 let hash = tx.signatures[0];
244 let mut hash_bytes = [0u8; 64];
245 hash_bytes.copy_from_slice(hash.as_ref());
246 let start = Instant::now();
247 self.tx_meta_pool.insert(hash_bytes, Value::Null);
248 send::send_url_versioned(
249 &self.http_client,
250 &sender_url,
251 &tx
252 ).await;
253 let mut await_counter = 0;
254 let mut success:bool=false;
255 loop {
256 let value_opt = self.tx_meta_pool.get(&hash_bytes)
257 .map(|v| v.clone());
258 match value_opt {
259 Some(val) => {
260 if val == Value::Null {
261 await_counter += 1;
262 } else {
263 success=true;
264 meta = val;
265 self.tx_meta_pool.remove(&hash_bytes);
266 break;
267 }
268 }
269 None => {
270 await_counter += 1;
271 }
272 }
273 if await_counter > 500 {
274 break;
275 }
276 tokio::task::yield_now().await;
277 sleep(Duration::from_millis(1)).await;
278 }
279 let elapsed = start.elapsed();
280 println!("Elapsed: {:?}", elapsed);
281 meta
282 }
283
284
285 pub async fn send_binary_for_tx_meta(&self, sender_url : &str, tx : &VersionedTransaction)
286 ->Value
287 {
288 let mut meta : Value=Value::Null;
289 let hash = tx.signatures[0];
290 let mut hash_bytes = [0u8; 64];
291 hash_bytes.copy_from_slice(hash.as_ref());
292 let start = Instant::now();
293 self.tx_meta_pool.insert(hash_bytes, Value::Null);
294 send::send_binary_versioned(
295 &self.http_client,
296 &sender_url,
297 &tx
298 ).await;
299 let mut await_counter = 0;
300 let mut success:bool=false;
301 loop {
302 let value_opt = self.tx_meta_pool.get(&hash_bytes)
303 .map(|v| v.clone());
304 match value_opt {
305 Some(val) => {
306 if val == Value::Null {
307 await_counter += 1;
308 } else {
309 success=true;
310 meta = val;
311 self.tx_meta_pool.remove(&hash_bytes);
312 break;
313 }
314 }
315 None => {
316 await_counter += 1;
317 }
318 }
319 if await_counter > 500 {
320 break;
321 }
322 tokio::task::yield_now().await;
323 sleep(Duration::from_millis(1)).await;
324 }
325 let elapsed = start.elapsed();
326 println!("Elapsed: {:?}", elapsed);
327 meta
328 }
329
330 pub async fn send_tx_multiple_urls(&self, urls : Vec<&str>, tx : &VersionedTransaction){
331 let http_client = &self.http_client;
332
333 let mut tasks = vec![];
334 for one_url in urls {
335 let http_client = http_client.clone();
336 let tx=tx.clone();
337 let one_url=one_url.to_string();
338 let one_task = tokio::spawn(async move {
339 send::send_url_versioned(&http_client, &one_url, &tx).await;
340 });
341 tasks.push(one_task);
342 }
343 futures::future::join_all(tasks).await;
344 }
345
346 pub async fn send_jito_bundle(&self, jito_bundle_url :&str, txs : &Vec<VersionedTransaction>){
347 jito::send_jito_bundle_versioned(&self.http_client, &jito_bundle_url, &txs).await;
348 }
349
350 pub async fn send_astralane_bundle(&self, astralane_bundle_url : &str, txs : &Vec<VersionedTransaction>){
351 astralane::send_astralane_bundle(&self.http_client, &astralane_bundle_url, &txs).await;
352 }
353
354 pub async fn send_json_with_retry(&self, sender_url : &str, tx : &VersionedTransaction, retry : u64)
355 ->Value
356 {
357 let mut meta : Value=Value::Null;
358 let hash = tx.signatures[0];
359 let mut hash_bytes = [0u8; 64];
360 hash_bytes.copy_from_slice(hash.as_ref());
361 self.tx_meta_pool.insert(hash_bytes, Value::Null);
362 let mut success = false;
363 for i in 0..retry+1 {
364 send::send_url_versioned(
365 &self.http_client,
366 &sender_url,
367 &tx
368 ).await;
369 let mut await_counter = 0;
370 let mut success:bool=false;
371 loop {
372 let value_opt = self.tx_meta_pool.get(&hash_bytes)
373 .map(|v| v.clone());
374 match value_opt {
375 Some(val) => {
376 if val == Value::Null {
377 await_counter += 1;
378 } else {
379 success=true;
380 meta = val;
381 self.tx_meta_pool.remove(&hash_bytes);
382 break;
383 }
384 }
385 None => {
386 await_counter += 1;
387 }
388 }
389 if await_counter > 500 {
390 break;
391 }
392 tokio::task::yield_now().await;
393 sleep(Duration::from_millis(1)).await;
394 }
395 if success {
396 break;
397 }
398 }
399 meta
400 }
401
402 pub async fn send_txs_as_jito_bundles(
403 &self,
404 wallet : &Keypair,
405 jito_tip_amount : u64,
406 jito_bundle_url : &str,
407 txs : &Vec<VersionedTransaction>,
408 ){
409 let recent_blockhash = self.recent_blockhash.read().await;
410 let mut rng = StdRng::from_entropy();
411 let random_number = rng.gen_range(0..jito::JITO_TIP_ACCOUNTS.len());
412 let mut first_chunk_len = txs.len()%4;
413 if first_chunk_len == 0 {
414 first_chunk_len =4;
415 }
416 let mut first_chunk = txs[..first_chunk_len].to_vec();
417 let first_jito_tip_tx = jito::build_jito_tip_tx_with_fixed_account(&wallet, random_number, jito_tip_amount, *recent_blockhash);
418 first_chunk.insert(0, first_jito_tip_tx);
419 self.send_jito_bundle(&jito_bundle_url, &first_chunk).await;
420 for (chunk_index, chunk_txs) in txs[first_chunk_len..].chunks(4).enumerate(){
421 let mut txs_in_chunk = chunk_txs.to_vec();
422 let jito_tip_tx = jito::build_jito_tip_tx_with_fixed_account(&wallet, random_number, jito_tip_amount - ((chunk_index as u64)+1)*2000, *recent_blockhash);
423 txs_in_chunk.insert(0, jito_tip_tx);
424 self.send_jito_bundle(&jito_bundle_url, &txs_in_chunk).await;
425 }
426 }
427}