noierrdev_antoine_tx_engine/
lib.rs

1use std::{
2    str::FromStr,
3    sync::{
4        Arc,
5    },
6    time::Instant
7};
8use reqwest::{
9    Client as HttpClient,
10};
11use serde_json::{
12    json,
13    Value
14};
15use solana_client::{
16    rpc_client::RpcClient,
17};
18use solana_sdk::{
19    signature::{Keypair,Signature,Signer},
20    transaction::{ VersionedTransaction},
21    instruction::{Instruction},
22    hash::Hash,
23    pubkey::Pubkey
24};
25use tokio::{
26    time::{sleep, Duration},
27    sync::{
28        RwLock,
29    }
30};
31use dashmap::{DashMap,DashSet};
32use rand::{
33    SeedableRng,
34    Rng,
35    rngs::StdRng
36};
37pub mod tx;
38
39pub mod jito;
40pub mod zero_slot;
41pub mod astralane;
42pub mod helius;
43pub mod temperal;
44pub mod node1;
45pub mod blockrazor;
46pub mod flashblock;
47pub mod stellium;
48
49pub mod tip;
50pub mod general;
51pub mod bundle;
52pub mod nonce;
53pub mod send;
54
55pub struct TxEngine {
56    pub rpc_client : Arc<RpcClient>,
57    pub http_client : Arc<HttpClient>,
58    pub recent_blockhash : Arc<RwLock<Hash>>,
59    pub tx_meta_pool : Arc<DashMap<[u8;64], Value>>
60}
61
62
63impl TxEngine {
64    pub fn new(
65        rpc_client : &Arc<RpcClient>,
66        http_client : &Arc<HttpClient>,
67        tx_meta_pool : Arc<DashMap<[u8;64], Value>>
68    ) 
69    ->Self
70    {
71        let recent_blockhash : Hash = rpc_client.get_latest_blockhash().unwrap();
72        let recent_blockhash_arc=Arc::new(RwLock::new(recent_blockhash));
73        tokio::spawn({
74            let rpc_client = rpc_client.clone();
75            let recent_blockhash_arc = recent_blockhash_arc.clone();
76            async move {
77                loop {
78                    match rpc_client.get_latest_blockhash() {
79                        Ok(new_hash) => {
80                            *recent_blockhash_arc.write().await = new_hash;
81                        }
82                        Err(err) => {
83                            println!("Failed to refresh blockhash: {:?}", err);
84                        }
85                    }
86                    sleep(Duration::from_secs(1)).await;
87                }
88            }
89        });
90        Self { 
91            rpc_client : rpc_client.clone(), 
92            http_client : http_client.clone(),
93            recent_blockhash : recent_blockhash_arc,
94            tx_meta_pool : tx_meta_pool
95        }
96    }
97
98    pub async fn get_blockhash(&self)
99    ->Hash
100    {
101        let recent_blockhash = *self.recent_blockhash.read().await;
102        recent_blockhash
103    }
104
105    pub async fn send_sequential(&self, sender_url : &str, txs : &Vec<VersionedTransaction>, retry : u64){
106        for one_tx in txs {
107            let hash = one_tx.signatures[0];
108            let mut hash_bytes = [0u8; 64];
109            hash_bytes.copy_from_slice(hash.as_ref());
110            let mut tx_meta : Value=Value::Null;
111            self.tx_meta_pool.insert(hash_bytes, Value::Null);
112            let mut success = false;
113            for i in 0..retry {
114                send::send_url_versioned(&self.http_client, &sender_url, &one_tx).await;
115                let mut await_counter = 0;
116                loop {
117                    let value_opt = self.tx_meta_pool.get(&hash_bytes)
118                        .map(|v| v.clone());
119                    match value_opt {
120                        Some(val) => {
121                            if val == Value::Null {
122                                await_counter += 1;
123                            } else {
124                                success=true;
125                                tx_meta = val;
126                                self.tx_meta_pool.remove(&hash_bytes);
127                                break;
128                            }
129                        }
130                        None => {
131                            await_counter += 1;
132                        }
133                    }
134                    if await_counter > 150 {
135                        break;
136                    }
137                    tokio::task::yield_now().await;
138                    sleep(Duration::from_millis(5)).await;
139                }
140                if success == true{
141                    break;
142                }
143            }
144            if !success {
145                println!("Failed to land sequently!");
146                break;
147            }
148        }
149    }
150
151    pub fn send_rpc_versioned(&self, tx : VersionedTransaction)
152    ->String
153    {
154        let tx_hash = send::send_rpc_versioned(&self.rpc_client, &tx);
155        tx_hash
156    }
157
158    pub async fn send_url_versioned(&self, sender_url : &str, tx : VersionedTransaction)
159    ->String
160    {
161        let tx_hash = send::send_url_versioned(&self.http_client, &sender_url, &tx).await;
162        tx_hash
163    }
164
165    pub async fn send_url_text(&self, sender_url : &str, tx : VersionedTransaction){
166        send::send_url_text(&self.http_client, &sender_url, &tx).await;
167    }
168
169    pub fn get_nonce(&self, nonce_account : &Pubkey)
170    ->Hash
171    {
172        let hash  :Hash = nonce::get_nonce(&self.rpc_client, &nonce_account);
173        hash
174    }
175
176    pub fn get_nonce_value(&self, nonce_account : &Pubkey)
177    ->Hash
178    {
179        let nonce_hash = nonce::get_nonce(&self.rpc_client, &nonce_account);
180        nonce_hash
181    }
182
183    pub async fn send_with_durable_nonce(
184        &self, 
185        wallets : Vec<Arc<Keypair>>, 
186        payer : &Pubkey, 
187        mut ixs : Vec<Instruction>, 
188        nonce_account : &Pubkey,
189        jito_sender_url : &str,
190        astralane_sender_url : &str,
191        zero_slot_sender_url : &str,
192        tip_amount : u64,
193        nonce_hash : Hash
194    )
195    {
196        ixs.insert(0,nonce::build_advance_nonce_instruction(&payer, &nonce_account));
197
198        let ixs = Arc::new(ixs);
199        let wallets = Arc::new(wallets);
200
201        let payer = *payer;
202        let http = self.http_client.clone();
203        let mut tasks = Vec::with_capacity(4);
204
205        macro_rules! spawn_sender {
206            ($builder:path, $sender:path, $url:expr) => {{
207                let http = http.clone();
208                let wallets = wallets.clone();
209                let ixs = ixs.clone();
210                if $url == "" {
211                    return;
212                }
213                let url = $url.to_string();
214                let nonce_hash = nonce_hash.clone();
215                tokio::spawn(async move {
216                    let tx = $builder(
217                        (*wallets).clone(),
218                        &payer,
219                        (*ixs).clone(),
220                        tip_amount,
221                        nonce_hash,
222                    );
223                    $sender(&http, &url, &tx).await;
224                })
225            }};
226        }
227        if jito_sender_url != "" {
228            tasks.push(spawn_sender!(jito::build_tx_with_jito_tip, send::send_url_versioned, jito_sender_url));
229        }
230        if astralane_sender_url != ""{
231            tasks.push(spawn_sender!(astralane::build_tx_with_astralane_tip, send::send_binary_versioned, astralane_sender_url));
232        }
233        if zero_slot_sender_url != "" {
234            tasks.push(spawn_sender!(zero_slot::build_tx_with_zero_slot, send::send_binary_versioned, zero_slot_sender_url));
235        }
236        futures::future::join_all(tasks).await;
237    }
238
239    pub async fn send_json_for_tx_meta(&self, sender_url : &str, tx : &VersionedTransaction)
240    ->Value
241    {
242        let mut meta : Value=Value::Null;
243        let hash = tx.signatures[0];
244        let mut hash_bytes = [0u8; 64];
245        hash_bytes.copy_from_slice(hash.as_ref());
246        let start = Instant::now();
247        self.tx_meta_pool.insert(hash_bytes, Value::Null);
248        send::send_url_versioned(
249            &self.http_client,
250            &sender_url,
251            &tx
252        ).await;
253        let mut await_counter = 0;
254        let mut success:bool=false;
255        loop {
256            let value_opt = self.tx_meta_pool.get(&hash_bytes)
257                .map(|v| v.clone());
258            match value_opt {
259                Some(val) => {
260                    if val == Value::Null {
261                        await_counter += 1;
262                    } else {
263                        success=true;
264                        meta = val;
265                        self.tx_meta_pool.remove(&hash_bytes);
266                        break;
267                    }
268                }
269                None => {
270                    await_counter += 1;
271                }
272            }
273            if await_counter > 500 {
274                break;
275            }
276            tokio::task::yield_now().await;
277            sleep(Duration::from_millis(1)).await;
278        }
279        let elapsed = start.elapsed();
280        println!("Elapsed: {:?}", elapsed);
281        meta
282    }
283
284
285    pub async fn send_binary_for_tx_meta(&self, sender_url : &str, tx : &VersionedTransaction)
286    ->Value
287    {
288        let mut meta : Value=Value::Null;
289        let hash = tx.signatures[0];
290        let mut hash_bytes = [0u8; 64];
291        hash_bytes.copy_from_slice(hash.as_ref());
292        let start = Instant::now();
293        self.tx_meta_pool.insert(hash_bytes, Value::Null);
294        send::send_binary_versioned(
295            &self.http_client,
296            &sender_url,
297            &tx
298        ).await;
299        let mut await_counter = 0;
300        let mut success:bool=false;
301        loop {
302            let value_opt = self.tx_meta_pool.get(&hash_bytes)
303                .map(|v| v.clone());
304            match value_opt {
305                Some(val) => {
306                    if val == Value::Null {
307                        await_counter += 1;
308                    } else {
309                        success=true;
310                        meta = val;
311                        self.tx_meta_pool.remove(&hash_bytes);
312                        break;
313                    }
314                }
315                None => {
316                    await_counter += 1;
317                }
318            }
319            if await_counter > 500 {
320                break;
321            }
322            tokio::task::yield_now().await;
323            sleep(Duration::from_millis(1)).await;
324        }
325        let elapsed = start.elapsed();
326        println!("Elapsed: {:?}", elapsed);
327        meta
328    }
329
330    pub async fn send_tx_multiple_urls(&self, urls : Vec<&str>, tx : &VersionedTransaction){
331        let http_client = &self.http_client;
332
333        let mut tasks = vec![];
334        for one_url in urls {
335            let http_client = http_client.clone();
336            let tx=tx.clone();
337            let one_url=one_url.to_string();
338            let one_task = tokio::spawn(async move {
339                send::send_url_versioned(&http_client, &one_url, &tx).await;
340            });
341            tasks.push(one_task);
342        }
343        futures::future::join_all(tasks).await;
344    }
345
346    pub async fn send_jito_bundle(&self, jito_bundle_url :&str, txs : &Vec<VersionedTransaction>){
347        jito::send_jito_bundle_versioned(&self.http_client, &jito_bundle_url, &txs).await;
348    }
349
350    pub async fn send_astralane_bundle(&self, astralane_bundle_url : &str, txs : &Vec<VersionedTransaction>){
351        astralane::send_astralane_bundle(&self.http_client, &astralane_bundle_url, &txs).await;
352    }
353
354    pub async fn send_json_with_retry(&self, sender_url : &str, tx : &VersionedTransaction, retry : u64)
355    ->Value
356    {
357        let mut meta : Value=Value::Null;
358        let hash = tx.signatures[0];
359        let mut hash_bytes = [0u8; 64];
360        hash_bytes.copy_from_slice(hash.as_ref());
361        self.tx_meta_pool.insert(hash_bytes, Value::Null);
362        let mut success = false;
363        for i in 0..retry+1 {
364            send::send_url_versioned(
365                &self.http_client,
366                &sender_url,
367                &tx
368            ).await;
369            let mut await_counter = 0;
370            let mut success:bool=false;
371            loop {
372                let value_opt = self.tx_meta_pool.get(&hash_bytes)
373                    .map(|v| v.clone());
374                match value_opt {
375                    Some(val) => {
376                        if val == Value::Null {
377                            await_counter += 1;
378                        } else {
379                            success=true;
380                            meta = val;
381                            self.tx_meta_pool.remove(&hash_bytes);
382                            break;
383                        }
384                    }
385                    None => {
386                        await_counter += 1;
387                    }
388                }
389                if await_counter > 500 {
390                    break;
391                }
392                tokio::task::yield_now().await;
393                sleep(Duration::from_millis(1)).await;
394            }
395            if success {
396                break;
397            }
398        }
399        meta
400    }
401
402    pub async fn send_txs_as_jito_bundles(
403        &self,
404        wallet : &Keypair,
405        jito_tip_amount : u64,
406        jito_bundle_url : &str,
407        txs : &Vec<VersionedTransaction>,
408    ){
409        let recent_blockhash = self.recent_blockhash.read().await;
410        let mut rng = StdRng::from_entropy();
411        let random_number = rng.gen_range(0..jito::JITO_TIP_ACCOUNTS.len());
412        let mut first_chunk_len = txs.len()%4;
413        if first_chunk_len == 0 {
414            first_chunk_len =4;
415        }
416        let mut first_chunk = txs[..first_chunk_len].to_vec();
417        let first_jito_tip_tx = jito::build_jito_tip_tx_with_fixed_account(&wallet, random_number, jito_tip_amount, *recent_blockhash);
418        first_chunk.insert(0, first_jito_tip_tx);
419        self.send_jito_bundle(&jito_bundle_url, &first_chunk).await;
420        for (chunk_index, chunk_txs) in txs[first_chunk_len..].chunks(4).enumerate(){
421            let mut txs_in_chunk = chunk_txs.to_vec();
422            let jito_tip_tx = jito::build_jito_tip_tx_with_fixed_account(&wallet, random_number, jito_tip_amount - ((chunk_index as u64)+1)*2000, *recent_blockhash);
423            txs_in_chunk.insert(0, jito_tip_tx);
424            self.send_jito_bundle(&jito_bundle_url, &txs_in_chunk).await;
425        }
426    }
427}