// carbon_rpc_transaction_crawler_datasource/lib.rs

use {
    async_trait::async_trait,
    carbon_core::{
        datasource::{Datasource, DatasourceId, TransactionUpdate, Update, UpdateType},
        error::CarbonResult,
        metrics::MetricsCollection,
        transformers::transaction_metadata_from_original_meta,
    },
    futures::StreamExt,
    solana_client::{
        nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
        rpc_config::RpcTransactionConfig,
    },
    solana_commitment_config::CommitmentConfig,
    solana_pubkey::Pubkey,
    solana_signature::Signature,
    solana_transaction_status::{
        EncodedConfirmedTransactionWithStatusMeta, UiLoadedAddresses, UiTransactionEncoding,
    },
    std::{collections::HashSet, str::FromStr, sync::Arc, time::Duration},
    tokio::{
        sync::mpsc::{self, Receiver, Sender},
        task::JoinHandle,
        time::Instant,
    },
    tokio_util::sync::CancellationToken,
};

29#[derive(Debug, Clone)]
30pub struct Filters {
31    pub accounts: Option<Vec<Pubkey>>,
32    pub before_signature: Option<Signature>,
33    pub until_signature: Option<Signature>,
34}
35
36impl Filters {
37    pub const fn new(
38        accounts: Option<Vec<Pubkey>>,
39        before_signature: Option<Signature>,
40        until_signature: Option<Signature>,
41    ) -> Self {
42        Filters {
43            accounts,
44            before_signature,
45            until_signature,
46        }
47    }
48}
49
/// Exponential-backoff retry policy used for RPC calls.
///
/// After a failed attempt the caller sleeps for the current backoff, then
/// multiplies it by `backoff_multiplier` and caps it at `max_backoff_ms`.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Number of retries after the first failed attempt.
    pub max_retries: u32,
    /// Delay before the first retry, in milliseconds.
    pub initial_backoff_ms: u64,
    /// Upper bound for the delay between retries, in milliseconds.
    pub max_backoff_ms: u64,
    /// Factor applied to the delay after every retry.
    pub backoff_multiplier: f64,
}

impl RetryConfig {
    /// Builds a policy from explicit values.
    pub const fn new(
        max_retries: u32,
        initial_backoff_ms: u64,
        max_backoff_ms: u64,
        backoff_multiplier: f64,
    ) -> Self {
        Self {
            max_retries,
            initial_backoff_ms,
            max_backoff_ms,
            backoff_multiplier,
        }
    }

    /// Three retries starting at 1 s, doubling each time, capped at 10 s.
    pub const fn default() -> Self {
        Self::new(3, 1_000, 10_000, 2.0)
    }

    /// A policy that gives up on the first failure.
    pub const fn no_retry() -> Self {
        Self::new(0, 0, 0, 0.0)
    }
}

92#[derive(Debug, Clone)]
93pub struct ConnectionConfig {
94    pub batch_limit: usize,
95    pub polling_interval: Duration,
96    pub max_concurrent_requests: usize,
97    pub max_signature_channel_size: Option<usize>,
98    pub max_transaction_channel_size: Option<usize>,
99    pub retry_config: RetryConfig,
100    pub blocking_send: bool,
101}
102
103impl ConnectionConfig {
104    pub const fn new(
105        batch_limit: usize,
106        polling_interval: Duration,
107        max_concurrent_requests: usize,
108        retry_config: RetryConfig,
109        max_signature_channel_size: Option<usize>, // None will default to 1000
110        max_transaction_channel_size: Option<usize>, // None will default to 1000
111        blocking_send: bool,
112    ) -> Self {
113        ConnectionConfig {
114            batch_limit,
115            polling_interval,
116            max_concurrent_requests,
117            retry_config,
118            max_signature_channel_size,
119            max_transaction_channel_size,
120            blocking_send,
121        }
122    }
123
124    pub const fn default() -> Self {
125        ConnectionConfig {
126            batch_limit: 100,
127            polling_interval: Duration::from_secs(5),
128            max_concurrent_requests: 5,
129            retry_config: RetryConfig::default(),
130            max_signature_channel_size: None,
131            max_transaction_channel_size: None,
132            blocking_send: false,
133        }
134    }
135}
136
137pub struct RpcTransactionCrawler {
138    pub rpc_url: String,
139    pub account: Pubkey,
140    pub connection_config: ConnectionConfig,
141    pub filters: Filters,
142    pub commitment: Option<CommitmentConfig>,
143}
144
145impl RpcTransactionCrawler {
146    pub const fn new(
147        rpc_url: String,
148        account: Pubkey,
149        connection_config: ConnectionConfig,
150        filters: Filters,
151        commitment: Option<CommitmentConfig>,
152    ) -> Self {
153        RpcTransactionCrawler {
154            rpc_url,
155            account,
156            connection_config,
157            filters,
158            commitment,
159        }
160    }
161}
162
163#[async_trait]
164impl Datasource for RpcTransactionCrawler {
165    async fn consume(
166        &self,
167        id: DatasourceId,
168        sender: Sender<(Update, DatasourceId)>,
169        cancellation_token: CancellationToken,
170        metrics: Arc<MetricsCollection>,
171    ) -> CarbonResult<()> {
172        let rpc_client = Arc::new(RpcClient::new_with_commitment(
173            self.rpc_url.clone(),
174            self.commitment.unwrap_or(CommitmentConfig::confirmed()),
175        ));
176        let account = self.account;
177        let filters = self.filters.clone();
178        let sender = sender.clone();
179        let commitment = self.commitment;
180
181        let (signature_sender, signature_receiver) = mpsc::channel(
182            self.connection_config
183                .max_signature_channel_size
184                .unwrap_or(1000),
185        );
186        let (transaction_sender, transaction_receiver) = mpsc::channel(
187            self.connection_config
188                .max_transaction_channel_size
189                .unwrap_or(1000),
190        );
191
192        let signature_fetcher = signature_fetcher(
193            rpc_client.clone(),
194            account,
195            self.connection_config.clone(),
196            signature_sender,
197            filters.clone(),
198            commitment,
199            cancellation_token.clone(),
200            metrics.clone(),
201        );
202
203        let transaction_fetcher = transaction_fetcher(
204            rpc_client,
205            signature_receiver,
206            transaction_sender,
207            self.connection_config.clone(),
208            commitment,
209            cancellation_token.clone(),
210            metrics.clone(),
211        );
212
213        let task_processor = task_processor(
214            transaction_receiver,
215            sender,
216            id,
217            filters,
218            cancellation_token.clone(),
219            metrics.clone(),
220            self.connection_config.clone(),
221        );
222
223        tokio::spawn(async move {
224            tokio::select! {
225                _ = signature_fetcher => {},
226                _ = transaction_fetcher => {},
227                _ = task_processor => {},
228            }
229        });
230
231        Ok(())
232    }
233
234    fn update_types(&self) -> Vec<UpdateType> {
235        vec![UpdateType::Transaction]
236    }
237}
238
239#[allow(clippy::too_many_arguments)]
240fn signature_fetcher(
241    rpc_client: Arc<RpcClient>,
242    account: Pubkey,
243    connection_config: ConnectionConfig,
244    signature_sender: Sender<Signature>,
245    filters: Filters,
246    commitment: Option<CommitmentConfig>,
247    cancellation_token: CancellationToken,
248    metrics: Arc<MetricsCollection>,
249) -> JoinHandle<()> {
250    let rpc_client = Arc::clone(&rpc_client);
251    let filters = filters.clone();
252    let signature_sender = signature_sender.clone();
253
254    tokio::spawn(async move {
255        let mut last_fetched_signature = filters.before_signature;
256        let mut until_signature = filters.until_signature;
257        let mut most_recent_signature: Option<Signature> = None;
258        loop {
259            tokio::select! {
260                _ = cancellation_token.cancelled() => {
261                    log::info!("Cancelling RPC Crawler signature fetcher...");
262                    break;
263                }
264                _ = async {
265                    let mut retries = 0;
266                    let mut backoff = connection_config.retry_config.initial_backoff_ms;
267
268                    loop {
269                        match rpc_client.get_signatures_for_address_with_config(
270                            &account,
271                            GetConfirmedSignaturesForAddress2Config {
272                                before: last_fetched_signature,
273                                until: until_signature,
274                                limit: Some(connection_config.batch_limit),
275                                commitment: Some(commitment.unwrap_or(CommitmentConfig::confirmed())),
276                            }
277                        ).await {
278                            Ok(signatures) => {
279                                let start = Instant::now();
280
281                                if signatures.is_empty() {
282                                    // no more signatures to fetch, so we've gone through
283                                    // all transactions that have been sent up until we started polling for signatures
284                                    // update `last_fetched_signature` to None so we can detect newly sent transactions
285                                    last_fetched_signature = None;
286                                    if most_recent_signature.is_some() {
287                                            // set the `until` signature to the most recent signature
288                                            // this will prevent reindexing old transactions
289                                            until_signature = most_recent_signature;
290                                            // set the most recent signature to None
291                                            // this will prevent reindexing old transactions
292                                            // after we run out of new
293                                            most_recent_signature = None;
294                                    }
295                                     tokio::time::sleep(connection_config.polling_interval).await;
296                                    break;
297                                }
298
299                                // if we have not seen a signature, then update the most recent signature
300                                // on subsequent loop's, this will prevent us from reindexing already seen transactions
301                                if most_recent_signature.is_none() {
302                                    match Signature::from_str(&signatures[0].signature) {
303                                        Ok(sig) => most_recent_signature = Some(sig),
304                                        Err(e) => {
305                                            log::error!("Invalid signature: {:?}", e);
306                                        }
307                                    }
308                                }
309
310                                for sig_info in signatures.iter() {
311                                    let signature = match Signature::from_str(&sig_info.signature) {
312                                        Ok(sig) => sig,
313                                        Err(e) => {
314                                            log::error!("Invalid signature: {:?}", e);
315                                            continue;
316                                        }
317                                    };
318
319                                    if let Err(e) = signature_sender.send(signature).await {
320                                        log::error!("Failed to send signature: {:?}", e);
321                                        break;
322                                    }
323                                }
324
325                                last_fetched_signature = signatures
326                                    .last()
327                                    .and_then(|s| Signature::from_str(&s.signature).ok());
328
329                                let time_taken = start.elapsed().as_millis();
330
331                                metrics.record_histogram("transaction_crawler_signatures_fetch_times_milliseconds", time_taken as f64)
332                                    .await.unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
333
334                                metrics.increment_counter("transaction_crawler_signatures_fetched", signatures.len() as u64)
335                                    .await.unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
336
337                                break;
338                            }
339                            Err(e) => {
340                                if retries >= connection_config.retry_config.max_retries {
341                                    log::error!("Failed to fetch signatures after {} retries: {:?}", retries, e);
342                                    break;
343                                }
344
345                                log::warn!(
346                                    "Failed to fetch signatures (attempt {}/{}), retrying in {}ms: {:?}",
347                                    retries + 1,
348                                    connection_config.retry_config.max_retries,
349                                    backoff,
350                                    e
351                                );
352
353                                tokio::time::sleep(Duration::from_millis(backoff)).await;
354                                retries += 1;
355                                backoff = (backoff as f64 * connection_config.retry_config.backoff_multiplier) as u64;
356                                backoff = backoff.min(connection_config.retry_config.max_backoff_ms);
357                            }
358                        }
359                    }
360                } => {}
361            }
362        }
363    })
364}
365
366fn transaction_fetcher(
367    rpc_client: Arc<RpcClient>,
368    signature_receiver: Receiver<Signature>,
369    transaction_sender: Sender<(Signature, EncodedConfirmedTransactionWithStatusMeta)>,
370    connection_config: ConnectionConfig,
371    commitment: Option<CommitmentConfig>,
372    cancellation_token: CancellationToken,
373    metrics: Arc<MetricsCollection>,
374) -> JoinHandle<()> {
375    let rpc_client = Arc::clone(&rpc_client);
376    let transaction_sender = transaction_sender.clone();
377    let mut signature_receiver = signature_receiver;
378
379    tokio::spawn(async move {
380        let fetch_stream_task = async {
381            let fetch_stream = async_stream::stream! {
382                while let Some(signature) = signature_receiver.recv().await {
383                    yield signature;
384                }
385            };
386
387            fetch_stream
388                .map(|signature| {
389                    let metrics = metrics.clone();
390                    let connection_config = connection_config.clone();
391                    let rpc_client = Arc::clone(&rpc_client);
392                    async move {
393                        let start = Instant::now();
394                        let mut retries = 0;
395                        let mut backoff = connection_config.retry_config.initial_backoff_ms;
396
397                        loop {
398                            match rpc_client.get_transaction_with_config(
399                                &signature,
400                                RpcTransactionConfig {
401                                    encoding: Some(UiTransactionEncoding::Base64),
402                                    commitment: Some(
403                                        commitment.unwrap_or(CommitmentConfig::confirmed()),
404                                    ),
405                                    max_supported_transaction_version: Some(0),
406                                },
407                            ).await {
408                                Ok(tx) => {
409                                    let time_taken = start.elapsed().as_millis();
410
411                                    metrics
412                                        .record_histogram(
413                                            "transaction_crawler_transaction_fetch_times_milliseconds",
414                                            time_taken as f64,
415                                        )
416                                        .await
417                                        .expect("Error recording metric");
418
419                                    return Some((signature, tx));
420                                }
421                                Err(e) => {
422                                    if retries >= connection_config.retry_config.max_retries {
423                                        log::error!("Failed to fetch transaction {} after {} retries: {:?}", signature, retries, e);
424                                        return None;
425                                    }
426
427                                    log::warn!(
428                                        "Failed to fetch transaction {} (attempt {}/{}), retrying in {}ms: {:?}",
429                                        signature,
430                                        retries + 1,
431                                        connection_config.retry_config.max_retries,
432                                        backoff,
433                                        e
434                                    );
435
436                                    tokio::time::sleep(Duration::from_millis(backoff)).await;
437                                    retries += 1;
438                                    backoff = (backoff as f64 * connection_config.retry_config.backoff_multiplier) as u64;
439                                    backoff = backoff.min(connection_config.retry_config.max_backoff_ms);
440                                }
441                            }
442                        }
443                    }
444                })
445                .buffer_unordered(connection_config.max_concurrent_requests)
446                .for_each(|result| async {
447                    metrics
448                        .increment_counter("transaction_crawler_transactions_fetched", 1)
449                        .await
450                        .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
451
452                    if let Some((signature, fetched_transaction)) = result {
453                        if let Err(e) = transaction_sender
454                            .send((signature, fetched_transaction))
455                            .await
456                        {
457                            log::error!("Failed to send transaction: {:?}", e);
458                        }
459                    }
460                })
461                .await;
462        };
463
464        tokio::select! {
465            _ = cancellation_token.cancelled() => {
466                log::info!("Cancelling RPC Crawler transaction fetcher...");
467            }
468            _ = fetch_stream_task => {}
469        }
470    })
471}
472
473fn task_processor(
474    transaction_receiver: Receiver<(Signature, EncodedConfirmedTransactionWithStatusMeta)>,
475    sender: Sender<(Update, DatasourceId)>,
476    id: DatasourceId,
477    filters: Filters,
478    cancellation_token: CancellationToken,
479    metrics: Arc<MetricsCollection>,
480    connection_config: ConnectionConfig,
481) -> JoinHandle<()> {
482    let mut transaction_receiver = transaction_receiver;
483    let sender = sender.clone();
484    let id_for_loop = id.clone();
485
486    tokio::spawn(async move {
487        loop {
488            tokio::select! {
489                _ = cancellation_token.cancelled() => {
490                    log::info!("Cancelling RPC Crawler task processor...");
491                    break;
492                }
493                Some((signature, fetched_transaction)) = transaction_receiver.recv() => {
494                    let start = Instant::now();
495                    let transaction = fetched_transaction.transaction;
496
497                    let meta_original = if let Some(meta) = transaction.clone().meta {
498                        meta
499                    } else {
500                        log::warn!("Meta is malformed for transaction: {:?}", signature);
501                        continue;
502                    };
503
504                    if meta_original.status.is_err() {
505                        continue;
506                    }
507
508                    let Some(decoded_transaction) = transaction.transaction.decode() else {
509                        log::error!("Failed to decode transaction: {:?}", transaction);
510                        continue;
511                    };
512
513                    if let Some(accounts) = &filters.accounts {
514                        let account_set: HashSet<Pubkey> = accounts.iter().cloned().collect();
515
516                        let static_accounts = decoded_transaction.message.static_account_keys();
517
518                        let loaded_addresses =
519                            meta_original
520                                .loaded_addresses
521                                .clone()
522                                .unwrap_or_else(|| UiLoadedAddresses {
523                                    writable: vec![],
524                                    readonly: vec![],
525                                });
526
527                        let all_accounts: HashSet<Pubkey> = static_accounts
528                            .iter()
529                            .cloned()
530                            .chain(
531                                loaded_addresses
532                                    .writable
533                                    .iter()
534                                    .filter_map(|s| Pubkey::from_str(s).ok()),
535                            )
536                            .chain(
537                                loaded_addresses
538                                    .readonly
539                                    .iter()
540                                    .filter_map(|s| Pubkey::from_str(s).ok()),
541                            )
542                            .collect();
543
544                        if !all_accounts
545                            .iter()
546                            .any(|account| account_set.contains(account))
547                        {
548                            continue;
549                        }
550                    }
551
552                    let Ok(meta_needed) = transaction_metadata_from_original_meta(meta_original) else {
553                        log::error!("Error getting metadata from transaction original meta.");
554                        continue;
555                    };
556
557                    let update = Update::Transaction(Box::new(TransactionUpdate {
558                        signature,
559                        transaction: decoded_transaction.clone(),
560                        meta: meta_needed,
561                        is_vote: false,
562                        slot: fetched_transaction.slot,
563                        block_time: fetched_transaction.block_time,
564                        block_hash: None,
565                    }));
566
567
568                    metrics
569                            .record_histogram(
570                                "transaction_crawler_transaction_process_time_milliseconds",
571                                start.elapsed().as_millis() as f64
572                            )
573                            .await
574                            .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
575
576
577                    if connection_config.blocking_send {
578                        if let Err(e) = sender.send((update.clone(), id_for_loop.clone())).await {
579                            log::warn!("Failed to send update: {:?}", e);
580                            continue;
581                        }
582                    }
583                    if !connection_config.blocking_send {
584                        if let Err(e) = sender.try_send((update.clone(), id_for_loop.clone())) {
585                            log::warn!("Failed to send update: {:?}", e);
586                            continue;
587                        }
588                    }
589                }
590            }
591        }
592    })
593}