carbon_rpc_block_crawler_datasource/
lib.rs

1pub use solana_client::rpc_config::RpcBlockConfig;
2use solana_sdk::hash::Hash;
3use std::str::FromStr;
4use {
5    async_trait::async_trait,
6    carbon_core::{
7        datasource::{Datasource, TransactionUpdate, Update, UpdateType},
8        error::CarbonResult,
9        metrics::MetricsCollection,
10        transformers::transaction_metadata_from_original_meta,
11    },
12    futures::StreamExt,
13    solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction},
14    solana_sdk::commitment_config::CommitmentConfig,
15    solana_transaction_status::UiConfirmedBlock,
16    std::{
17        sync::Arc,
18        time::{Duration, Instant},
19    },
20    tokio::{
21        sync::mpsc::{self, Receiver, Sender},
22        task::JoinHandle,
23    },
24    tokio_util::sync::CancellationToken,
25};
26
/// Default capacity of the internal channel that carries fetched blocks from
/// the fetcher task to the processor task.
const CHANNEL_BUFFER_SIZE: usize = 1_000;
/// Default upper bound on the number of block requests kept in flight at once.
const MAX_CONCURRENT_REQUESTS: usize = 10;
/// Default pause before asking the RPC node for the latest slot again, used
/// when the crawler is ahead of the chain tip or the slot lookup failed.
const BLOCK_INTERVAL: Duration = Duration::from_millis(100);
30
31/// RpcBlockCrawler is a datasource that crawls the Solana blockchain for blocks and sends them to the sender.
32/// It uses a channel to send blocks to the task processor.
33pub struct RpcBlockCrawler {
34    pub rpc_url: String,
35    pub start_slot: u64,
36    pub end_slot: Option<u64>,
37    pub block_interval: Duration,
38    pub block_config: RpcBlockConfig,
39    pub max_concurrent_requests: usize,
40    pub channel_buffer_size: usize,
41}
42
43impl RpcBlockCrawler {
44    pub fn new(
45        rpc_url: String,
46        start_slot: u64,
47        end_slot: Option<u64>,
48        block_interval: Option<Duration>,
49        block_config: RpcBlockConfig,
50        max_concurrent_requests: Option<usize>,
51        channel_buffer_size: Option<usize>,
52    ) -> Self {
53        Self {
54            rpc_url,
55            start_slot,
56            end_slot,
57            block_config,
58            block_interval: block_interval.unwrap_or(BLOCK_INTERVAL),
59            max_concurrent_requests: max_concurrent_requests.unwrap_or(MAX_CONCURRENT_REQUESTS),
60            channel_buffer_size: channel_buffer_size.unwrap_or(CHANNEL_BUFFER_SIZE),
61        }
62    }
63}
64
65#[async_trait]
66impl Datasource for RpcBlockCrawler {
67    async fn consume(
68        &self,
69        sender: &Sender<Update>,
70        cancellation_token: CancellationToken,
71        metrics: Arc<MetricsCollection>,
72    ) -> CarbonResult<()> {
73        let rpc_client = Arc::new(RpcClient::new_with_commitment(
74            self.rpc_url.clone(),
75            self.block_config
76                .commitment
77                .unwrap_or(CommitmentConfig::confirmed()),
78        ));
79        let sender = sender.clone();
80        let (block_sender, block_receiver) = mpsc::channel(self.channel_buffer_size);
81
82        let block_fetcher = block_fetcher(
83            rpc_client,
84            self.start_slot,
85            self.end_slot,
86            self.block_interval,
87            self.block_config,
88            block_sender,
89            self.max_concurrent_requests,
90            cancellation_token.clone(),
91            metrics.clone(),
92        );
93
94        let task_processor = task_processor(
95            block_receiver,
96            sender,
97            cancellation_token.clone(),
98            metrics.clone(),
99        );
100
101        tokio::spawn(async move {
102            tokio::select! {
103                _ = block_fetcher => {},
104                _ = task_processor => {},
105            }
106        });
107
108        Ok(())
109    }
110
111    fn update_types(&self) -> Vec<UpdateType> {
112        vec![UpdateType::Transaction]
113    }
114}
115
116#[allow(clippy::too_many_arguments)]
117fn block_fetcher(
118    rpc_client: Arc<RpcClient>,
119    start_slot: u64,
120    end_slot: Option<u64>,
121    block_interval: Duration,
122    block_config: RpcBlockConfig,
123    block_sender: Sender<(u64, UiConfirmedBlock)>,
124    max_concurrent_requests: usize,
125    cancellation_token: CancellationToken,
126    metrics: Arc<MetricsCollection>,
127) -> JoinHandle<()> {
128    let rpc_client_clone = rpc_client.clone();
129    tokio::spawn(async move {
130        let fetch_stream_task = async {
131            let fetch_stream = async_stream::stream! {
132                let mut current_slot = start_slot;
133                let mut latest_slot = current_slot;
134                loop {
135                    if let Some(end) = end_slot {
136                        if current_slot > end {
137                            break;
138                        }
139                    } else {
140                        if current_slot >= latest_slot {
141                            match rpc_client_clone.get_slot().await {
142                                Ok(slot) => {
143                                    latest_slot = slot;
144                                    if current_slot > latest_slot {
145                                        log::debug!(
146                                            "Waiting for new blocks... Current: {}, Latest: {}",
147                                            current_slot,
148                                            latest_slot
149                                        );
150                                        tokio::time::sleep(block_interval).await;
151                                        continue;
152                                    }
153                                }
154                                Err(e) => {
155                                    log::error!("Error fetching latest slot: {:?}", e);
156                                    tokio::time::sleep(block_interval).await;
157                                    continue;
158                                }
159                            }
160                        }
161                        if latest_slot - current_slot > 100 {
162                            log::debug!(
163                                "Current slot {} is behind latest slot {} by {}",
164                                current_slot,
165                                latest_slot,
166                                latest_slot - current_slot
167                            );
168                        }
169                    }
170                    yield current_slot;
171                    current_slot += 1;
172                }
173            };
174
175            fetch_stream
176                .map(|slot| {
177                    let rpc_client = Arc::clone(&rpc_client);
178                    let metrics = metrics.clone();
179
180                    async move {
181                        let start = Instant::now();
182                        match rpc_client.get_block_with_config(slot, block_config).await {
183                            Ok(block) => {
184                                let time_taken = start.elapsed().as_millis();
185                                metrics
186                                    .record_histogram(
187                                        "block_crawler_blocks_fetch_times_milliseconds",
188                                        time_taken as f64,
189                                    )
190                                    .await
191                                    .unwrap_or_else(|value| {
192                                        log::error!("Error recording metric: {}", value)
193                                    });
194
195                                metrics
196                                    .increment_counter("block_crawler_blocks_fetched", 1)
197                                    .await
198                                    .unwrap_or_else(|value| {
199                                        log::error!("Error recording metric: {}", value)
200                                    });
201
202                                Some((slot, block))
203                            }
204                            Err(e) => {
205                                // https://support.quicknode.com/hc/en-us/articles/16459608696721-Solana-RPC-Error-Code-Reference
206                                // solana skippable errors
207                                // -32004, // Block not available for slot x
208                                // -32007, // Slot {} was skipped, or missing due to ledger jump to recent snapshot
209                                // -32009, // Slot {} was skipped, or missing in long-term storage
210                                if e.to_string().contains("-32009")
211                                    || e.to_string().contains("-32004")
212                                    || e.to_string().contains("-32007")
213                                {
214                                    metrics
215                                        .increment_counter("block_crawler_blocks_skipped", 1)
216                                        .await
217                                        .unwrap_or_else(|value| {
218                                            log::error!("Error recording metric: {}", value)
219                                        });
220                                } else {
221                                    log::error!("Error fetching block at slot {}: {:?}", slot, e);
222                                }
223                                None
224                            }
225                        }
226                    }
227                })
228                .buffer_unordered(max_concurrent_requests)
229                .for_each(|result| async {
230                    if let Some((slot, block)) = result {
231                        if let Err(e) = block_sender.send((slot, block)).await {
232                            log::error!("Failed to send block: {:?}", e);
233                        }
234                    }
235                })
236                .await;
237        };
238
239        tokio::select! {
240            _ = cancellation_token.cancelled() => {
241                log::info!("Cancelling RPC Crawler block fetcher...");
242            }
243            _ = fetch_stream_task => {}
244        }
245    })
246}
247
248/// Process the block and send the transactions to the sender
249fn task_processor(
250    block_receiver: Receiver<(u64, UiConfirmedBlock)>,
251    sender: Sender<Update>,
252    cancellation_token: CancellationToken,
253    metrics: Arc<MetricsCollection>,
254) -> JoinHandle<()> {
255    let mut block_receiver = block_receiver;
256    let sender = sender.clone();
257
258    tokio::spawn(async move {
259        loop {
260            tokio::select! {
261                _ = cancellation_token.cancelled() => {
262                    log::info!("Cancelling RPC Crawler task processor...");
263                    break;
264                }
265                Some((slot, block)) = block_receiver.recv() => {
266                    metrics
267                        .increment_counter("block_crawler_blocks_received", 1)
268                        .await
269                        .unwrap_or_else(|value| {
270                            log::error!("Error recording metric: {}", value)
271                        });
272                    let block_start_time = Instant::now();
273                    let block_hash = Hash::from_str(&block.blockhash).ok();
274                    if let Some(transactions) = block.transactions {
275                        for encoded_transaction_with_status_meta in transactions {
276                            let start_time = std::time::Instant::now();
277
278                            let meta_original = if let Some(meta) = encoded_transaction_with_status_meta.clone().meta {
279                                meta
280                            } else {
281                                continue;
282                            };
283
284                            if meta_original.status.is_err() {
285                                continue;
286                            }
287
288                            let Some(decoded_transaction) = encoded_transaction_with_status_meta.transaction.decode() else {
289                                log::error!("Failed to decode transaction: {:?}", encoded_transaction_with_status_meta);
290                                continue;
291                            };
292
293                            let Ok(meta_needed) = transaction_metadata_from_original_meta(meta_original) else {
294                                log::error!("Error getting metadata from transaction original meta.");
295                                continue;
296                            };
297
298                            let update = Update::Transaction(Box::new(TransactionUpdate {
299                                signature: *decoded_transaction.get_signature(),
300                                transaction: decoded_transaction.clone(),
301                                meta: meta_needed,
302                                is_vote: false,
303                                slot,
304                                block_time: block.block_time,
305                                block_hash,
306                            }));
307
308                            metrics
309                                .record_histogram(
310                                    "block_crawler_transaction_process_time_nanoseconds",
311                                    start_time.elapsed().as_nanos() as f64
312                                )
313                                .await
314                                .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
315
316                            metrics.increment_counter("block_crawler_transactions_processed", 1)
317                                .await
318                                .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
319
320                            if let Err(err) = sender.try_send(update) {
321                                log::error!("Error sending transaction update: {:?}", err);
322                                break;
323                            }
324                        }
325                    }
326                    metrics
327                        .record_histogram(
328                            "block_crawler_block_process_time_nanoseconds",
329                            block_start_time.elapsed().as_nanos() as f64
330                        ).await
331                        .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
332
333                    metrics
334                        .increment_counter("block_crawler_blocks_processed", 1)
335                        .await
336                        .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
337                }
338            }
339        }
340    })
341}
342
#[cfg(test)]
mod tests {
    use super::*;

    /// Public mainnet endpoint the tests talk to; they need network access.
    const MAINNET_RPC_URL: &str = "https://api.mainnet-beta.solana.com/";
    /// Number of blocks the receiver collects before it stops listening.
    const BLOCKS_TO_COLLECT: usize = 2;

    /// Builds an RPC client for the public mainnet endpoint.
    fn mainnet_rpc_client() -> Arc<RpcClient> {
        Arc::new(RpcClient::new_with_commitment(
            MAINNET_RPC_URL.to_string(),
            CommitmentConfig::confirmed(),
        ))
    }

    /// Runs `block_fetcher` over the given slot range and reports why the run
    /// ended: `"receiver_completed"` when the receiver task finished (it
    /// collected `BLOCKS_TO_COLLECT` blocks or the channel closed),
    /// `"cancellation_token"` when the token fired, or `"timeout"` after
    /// 30 seconds.
    async fn run_block_fetcher(
        rpc_client: Arc<RpcClient>,
        start_slot: u64,
        end_slot: Option<u64>,
        max_concurrent_requests: usize,
    ) -> &'static str {
        let block_interval = Duration::from_millis(100);
        let cancellation_token = CancellationToken::new();
        let (block_sender, mut block_receiver) = mpsc::channel(1);

        let block_config = RpcBlockConfig {
            max_supported_transaction_version: Some(0),
            ..Default::default()
        };

        // Start block_fetcher
        let fetcher_handle = block_fetcher(
            rpc_client,
            start_slot,
            end_slot,
            block_interval,
            block_config,
            block_sender,
            max_concurrent_requests,
            cancellation_token.clone(),
            Arc::new(MetricsCollection::new(vec![])),
        );

        // Create a task to receive blocks
        let receiver_task = tokio::spawn(async move {
            let mut received_blocks = Vec::new();

            while let Some((slot, block)) = block_receiver.recv().await {
                println!("Received block at slot {}", slot);
                received_blocks.push((slot, block));

                if received_blocks.len() == BLOCKS_TO_COLLECT {
                    break;
                }
            }
            received_blocks
        });

        tokio::spawn(async move {
            fetcher_handle.await.expect("Block fetcher should not panic");
        });

        // Wait for the receiver task to finish, bounded by a timeout
        tokio::select! {
            result = receiver_task => {
                let received_blocks = result.expect("Receiver task should not panic");
                println!("Received {} blocks", received_blocks.len());

                for (slot, block) in received_blocks {
                    println!("Block at slot {}: {} transactions",
                        slot,
                        block.transactions.map(|t| t.len()).unwrap_or(0)
                    );
                }
                "receiver_completed"
            }
            _ = cancellation_token.cancelled() => {
                println!("Cancellation token triggered");
                "cancellation_token"
            }
            _ = tokio::time::sleep(Duration::from_secs(30)) => {
                println!("Timeout");
                "timeout"
            }
        }
    }

    #[tokio::test]
    async fn test_block_fetcher_with_end_slot() {
        let exit_reason =
            run_block_fetcher(mainnet_rpc_client(), 328837890, Some(328837901), 1).await;

        assert_eq!(
            exit_reason, "receiver_completed",
            "Test should exit because the receiver task completed"
        );
    }

    #[tokio::test]
    async fn test_block_fetcher_without_end_slot() {
        let rpc_client = mainnet_rpc_client();
        let latest_slot = rpc_client
            .get_slot()
            .await
            .expect("Failed to get last slot");

        let exit_reason = run_block_fetcher(rpc_client, latest_slot, None, 2).await;

        assert_eq!(
            exit_reason, "receiver_completed",
            "Test should exit because the receiver task completed"
        );
    }
}