carbon_rpc_block_crawler_datasource/
lib.rs

1use carbon_core::datasource::DatasourceId;
2pub use solana_client::rpc_config::RpcBlockConfig;
3use solana_hash::Hash;
4use std::str::FromStr;
5use {
6    async_trait::async_trait,
7    carbon_core::{
8        datasource::{Datasource, TransactionUpdate, Update, UpdateType},
9        error::CarbonResult,
10        metrics::MetricsCollection,
11        transformers::transaction_metadata_from_original_meta,
12    },
13    futures::StreamExt,
14    solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction},
15    solana_commitment_config::CommitmentConfig,
16    solana_transaction_status::UiConfirmedBlock,
17    std::{
18        sync::Arc,
19        time::{Duration, Instant},
20    },
21    tokio::{
22        sync::mpsc::{self, Receiver, Sender},
23        task::JoinHandle,
24    },
25    tokio_util::sync::CancellationToken,
26};
27
/// Default capacity of the channel between the block fetcher and the transaction processor.
const CHANNEL_BUFFER_SIZE: usize = 1_000;
/// Default upper bound on `getBlock` requests in flight at the same time.
const MAX_CONCURRENT_REQUESTS: usize = 10;
/// Default pause before polling the tip again once the crawler has caught up.
const BLOCK_INTERVAL: Duration = Duration::from_millis(100);
31
32/// RpcBlockCrawler is a datasource that crawls the Solana blockchain for blocks and sends them to the sender.
33/// It uses a channel to send blocks to the task processor.
34pub struct RpcBlockCrawler {
35    pub rpc_url: String,
36    pub start_slot: u64,
37    pub end_slot: Option<u64>,
38    pub block_interval: Duration,
39    pub block_config: RpcBlockConfig,
40    pub max_concurrent_requests: usize,
41    pub channel_buffer_size: usize,
42}
43
44impl RpcBlockCrawler {
45    pub fn new(
46        rpc_url: String,
47        start_slot: u64,
48        end_slot: Option<u64>,
49        block_interval: Option<Duration>,
50        block_config: RpcBlockConfig,
51        max_concurrent_requests: Option<usize>,
52        channel_buffer_size: Option<usize>,
53    ) -> Self {
54        Self {
55            rpc_url,
56            start_slot,
57            end_slot,
58            block_config,
59            block_interval: block_interval.unwrap_or(BLOCK_INTERVAL),
60            max_concurrent_requests: max_concurrent_requests.unwrap_or(MAX_CONCURRENT_REQUESTS),
61            channel_buffer_size: channel_buffer_size.unwrap_or(CHANNEL_BUFFER_SIZE),
62        }
63    }
64}
65
66#[async_trait]
67impl Datasource for RpcBlockCrawler {
68    async fn consume(
69        &self,
70        id: DatasourceId,
71        sender: Sender<(Update, DatasourceId)>,
72        cancellation_token: CancellationToken,
73        metrics: Arc<MetricsCollection>,
74    ) -> CarbonResult<()> {
75        let rpc_client = Arc::new(RpcClient::new_with_commitment(
76            self.rpc_url.clone(),
77            self.block_config
78                .commitment
79                .unwrap_or(CommitmentConfig::confirmed()),
80        ));
81        let (block_sender, block_receiver) = mpsc::channel(self.channel_buffer_size);
82
83        let block_fetcher = block_fetcher(
84            rpc_client,
85            self.start_slot,
86            self.end_slot,
87            self.block_interval,
88            self.block_config,
89            block_sender,
90            self.max_concurrent_requests,
91            cancellation_token.clone(),
92            metrics.clone(),
93        );
94
95        let task_processor = task_processor(
96            block_receiver,
97            sender,
98            id,
99            cancellation_token.clone(),
100            metrics.clone(),
101        );
102
103        tokio::spawn(async move {
104            tokio::select! {
105                _ = block_fetcher => {},
106                _ = task_processor => {},
107            }
108        });
109
110        Ok(())
111    }
112
113    fn update_types(&self) -> Vec<UpdateType> {
114        vec![UpdateType::Transaction]
115    }
116}
117
118#[allow(clippy::too_many_arguments)]
119fn block_fetcher(
120    rpc_client: Arc<RpcClient>,
121    start_slot: u64,
122    end_slot: Option<u64>,
123    block_interval: Duration,
124    block_config: RpcBlockConfig,
125    block_sender: Sender<(u64, UiConfirmedBlock)>,
126    max_concurrent_requests: usize,
127    cancellation_token: CancellationToken,
128    metrics: Arc<MetricsCollection>,
129) -> JoinHandle<()> {
130    let rpc_client_clone = rpc_client.clone();
131    tokio::spawn(async move {
132        let fetch_stream_task = async {
133            let fetch_stream = async_stream::stream! {
134                let mut current_slot = start_slot;
135                let mut latest_slot = current_slot;
136                loop {
137                    if let Some(end) = end_slot {
138                        if current_slot > end {
139                            break;
140                        }
141                    } else {
142                        if current_slot >= latest_slot {
143                            match rpc_client_clone.get_slot().await {
144                                Ok(slot) => {
145                                    latest_slot = slot;
146                                    if current_slot > latest_slot {
147                                        log::debug!(
148                                            "Waiting for new blocks... Current: {}, Latest: {}",
149                                            current_slot,
150                                            latest_slot
151                                        );
152                                        tokio::time::sleep(block_interval).await;
153                                        continue;
154                                    }
155                                }
156                                Err(e) => {
157                                    log::error!("Error fetching latest slot: {:?}", e);
158                                    tokio::time::sleep(block_interval).await;
159                                    continue;
160                                }
161                            }
162                        }
163                        if latest_slot - current_slot > 100 {
164                            log::debug!(
165                                "Current slot {} is behind latest slot {} by {}",
166                                current_slot,
167                                latest_slot,
168                                latest_slot - current_slot
169                            );
170                        }
171                    }
172                    yield current_slot;
173                    current_slot += 1;
174                }
175            };
176
177            fetch_stream
178                .map(|slot| {
179                    let rpc_client = Arc::clone(&rpc_client);
180                    let metrics = metrics.clone();
181
182                    async move {
183                        let start = Instant::now();
184                        match rpc_client.get_block_with_config(slot, block_config).await {
185                            Ok(block) => {
186                                let time_taken = start.elapsed().as_millis();
187                                metrics
188                                    .record_histogram(
189                                        "block_crawler_blocks_fetch_times_milliseconds",
190                                        time_taken as f64,
191                                    )
192                                    .await
193                                    .unwrap_or_else(|value| {
194                                        log::error!("Error recording metric: {}", value)
195                                    });
196
197                                metrics
198                                    .increment_counter("block_crawler_blocks_fetched", 1)
199                                    .await
200                                    .unwrap_or_else(|value| {
201                                        log::error!("Error recording metric: {}", value)
202                                    });
203
204                                Some((slot, block))
205                            }
206                            Err(e) => {
207                                // https://support.quicknode.com/hc/en-us/articles/16459608696721-Solana-RPC-Error-Code-Reference
208                                // solana skippable errors
209                                // -32004, // Block not available for slot x
210                                // -32007, // Slot {} was skipped, or missing due to ledger jump to recent snapshot
211                                // -32009, // Slot {} was skipped, or missing in long-term storage
212                                if e.to_string().contains("-32009")
213                                    || e.to_string().contains("-32004")
214                                    || e.to_string().contains("-32007")
215                                {
216                                    metrics
217                                        .increment_counter("block_crawler_blocks_skipped", 1)
218                                        .await
219                                        .unwrap_or_else(|value| {
220                                            log::error!("Error recording metric: {}", value)
221                                        });
222                                } else {
223                                    log::error!("Error fetching block at slot {}: {:?}", slot, e);
224                                }
225                                None
226                            }
227                        }
228                    }
229                })
230                .buffer_unordered(max_concurrent_requests)
231                .for_each(|result| async {
232                    if let Some((slot, block)) = result {
233                        if let Err(e) = block_sender.send((slot, block)).await {
234                            log::error!("Failed to send block: {:?}", e);
235                        }
236                    }
237                })
238                .await;
239        };
240
241        tokio::select! {
242            _ = cancellation_token.cancelled() => {
243                log::info!("Cancelling RPC Crawler block fetcher...");
244            }
245            _ = fetch_stream_task => {}
246        }
247    })
248}
249
250/// Process the block and send the transactions to the sender
251fn task_processor(
252    block_receiver: Receiver<(u64, UiConfirmedBlock)>,
253    sender: Sender<(Update, DatasourceId)>,
254    id: DatasourceId,
255    cancellation_token: CancellationToken,
256    metrics: Arc<MetricsCollection>,
257) -> JoinHandle<()> {
258    let mut block_receiver = block_receiver;
259    let sender = sender.clone();
260    let id_for_loop = id.clone();
261
262    tokio::spawn(async move {
263        loop {
264            tokio::select! {
265            _ = cancellation_token.cancelled() => {
266                log::info!("Cancelling RPC Crawler task processor...");
267                break;
268            }
269            maybe_block = block_receiver.recv() => {
270                match maybe_block {
271                    Some((slot, block)) => {
272
273                        metrics
274                            .increment_counter("block_crawler_blocks_received", 1)
275                            .await
276                            .unwrap_or_else(|value| {
277                                log::error!("Error recording metric: {}", value)
278                            });
279                        let block_start_time = Instant::now();
280                        let block_hash = Hash::from_str(&block.blockhash).ok();
281                        if let Some(transactions) = block.transactions {
282                            for encoded_transaction_with_status_meta in transactions {
283                                let start_time = std::time::Instant::now();
284
285                                let meta_original = if let Some(meta) = encoded_transaction_with_status_meta.clone().meta {
286                                    meta
287                                } else {
288                                    continue;
289                                };
290
291                                if meta_original.status.is_err() {
292                                    continue;
293                                }
294
295                                let Some(decoded_transaction) = encoded_transaction_with_status_meta.transaction.decode() else {
296                                    log::error!("Failed to decode transaction: {:?}", encoded_transaction_with_status_meta);
297                                    continue;
298                                };
299
300                                let Ok(meta_needed) = transaction_metadata_from_original_meta(meta_original) else {
301                                    log::error!("Error getting metadata from transaction original meta.");
302                                    continue;
303                                };
304
305                                let update = Update::Transaction(Box::new(TransactionUpdate {
306                                    signature: *decoded_transaction.get_signature(),
307                                    transaction: decoded_transaction.clone(),
308                                    meta: meta_needed,
309                                    is_vote: false,
310                                    slot,
311                                    block_time: block.block_time,
312                                    block_hash,
313                                }));
314
315                                metrics
316                                    .record_histogram(
317                                        "block_crawler_transaction_process_time_nanoseconds",
318                                        start_time.elapsed().as_nanos() as f64
319                                    )
320                                    .await
321                                    .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
322
323                                metrics.increment_counter("block_crawler_transactions_processed", 1)
324                                    .await
325                                    .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
326
327                                if let Err(err) = sender.try_send((update, id_for_loop.clone())) {
328                                    log::error!("Error sending transaction update: {:?}", err);
329                                    break;
330                                }
331                            }
332                        }
333                        metrics
334                            .record_histogram(
335                                "block_crawler_block_process_time_nanoseconds",
336                                block_start_time.elapsed().as_nanos() as f64
337                            ).await
338                            .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
339
340                        metrics
341                            .increment_counter("block_crawler_blocks_processed", 1)
342                            .await
343                            .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
344                    }
345                    None => {
346                        break;
347                    }
348                }
349            }}
350        }
351    })
352}
353
#[cfg(test)]
mod tests {
    use super::*;

    // These tests talk to the public mainnet RPC endpoint and therefore need network access.
    const MAINNET_RPC_URL: &str = "https://api.mainnet-beta.solana.com/";
    // Number of blocks each test waits for before declaring success.
    const BLOCKS_TO_RECEIVE: usize = 2;
    // Upper bound on how long a test waits for `BLOCKS_TO_RECEIVE` blocks.
    const TEST_TIMEOUT: Duration = Duration::from_secs(30);

    fn mainnet_client() -> Arc<RpcClient> {
        Arc::new(RpcClient::new_with_commitment(
            MAINNET_RPC_URL.to_string(),
            CommitmentConfig::confirmed(),
        ))
    }

    /// Runs `block_fetcher` from `start_slot` and waits for up to `BLOCKS_TO_RECEIVE` blocks.
    ///
    /// Returns `"receiver_completed"` if the receiver stopped first, either because enough
    /// blocks arrived or because the fetcher closed the channel. Returns `"timeout"` if
    /// `TEST_TIMEOUT` elapsed first. In both cases the fetcher is cancelled and joined before
    /// returning, so a panic inside it fails the test.
    async fn run_block_fetcher(
        rpc_client: Arc<RpcClient>,
        start_slot: u64,
        end_slot: Option<u64>,
        max_concurrent_requests: usize,
    ) -> &'static str {
        let cancellation_token = CancellationToken::new();
        let (block_sender, mut block_receiver) = mpsc::channel(1);

        // Match the client's commitment so `getBlock` can serve the slots `getSlot` reports.
        let block_config = RpcBlockConfig {
            max_supported_transaction_version: Some(0),
            commitment: Some(CommitmentConfig::confirmed()),
            ..Default::default()
        };

        let fetcher = block_fetcher(
            rpc_client,
            start_slot,
            end_slot,
            Duration::from_millis(100),
            block_config,
            block_sender,
            max_concurrent_requests,
            cancellation_token.clone(),
            Arc::new(MetricsCollection::new(vec![])),
        );

        let receive_blocks = async move {
            let mut received_blocks = Vec::new();
            while let Some((slot, block)) = block_receiver.recv().await {
                println!("Received block at slot {}", slot);
                received_blocks.push((slot, block));
                if received_blocks.len() == BLOCKS_TO_RECEIVE {
                    break;
                }
            }
            received_blocks
        };

        let exit_reason = tokio::select! {
            received_blocks = receive_blocks => {
                println!("Received {} blocks", received_blocks.len());
                for (slot, block) in received_blocks {
                    println!(
                        "Block at slot {}: {} transactions",
                        slot,
                        block.transactions.as_ref().map_or(0, Vec::len)
                    );
                }
                "receiver_completed"
            }
            _ = tokio::time::sleep(TEST_TIMEOUT) => {
                println!("Timeout");
                "timeout"
            }
        };

        // Stop the fetcher so it does not keep polling the RPC after the test is done.
        cancellation_token.cancel();
        fetcher.await.expect("Block fetcher should not panic");

        exit_reason
    }

    #[tokio::test]
    async fn test_block_fetcher_with_end_slot() {
        let exit_reason =
            run_block_fetcher(mainnet_client(), 328837890, Some(328837901), 1).await;

        assert_eq!(
            exit_reason, "receiver_completed",
            "Receiver should finish before the timeout"
        );
    }

    #[tokio::test]
    async fn test_block_fetcher_without_end_slot() {
        let rpc_client = mainnet_client();
        let latest_slot = rpc_client
            .get_slot()
            .await
            .expect("Failed to get last slot");

        let exit_reason = run_block_fetcher(rpc_client, latest_slot, None, 2).await;

        assert_eq!(
            exit_reason, "receiver_completed",
            "Receiver should finish before the timeout"
        );
    }
}