1pub use solana_client::rpc_config::RpcBlockConfig;
2use solana_sdk::hash::Hash;
3use std::str::FromStr;
4use {
5 async_trait::async_trait,
6 carbon_core::{
7 datasource::{Datasource, TransactionUpdate, Update, UpdateType},
8 error::CarbonResult,
9 metrics::MetricsCollection,
10 transformers::transaction_metadata_from_original_meta,
11 },
12 futures::StreamExt,
13 solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction},
14 solana_sdk::commitment_config::CommitmentConfig,
15 solana_transaction_status::UiConfirmedBlock,
16 std::{
17 sync::Arc,
18 time::{Duration, Instant},
19 },
20 tokio::{
21 sync::mpsc::{self, Receiver, Sender},
22 task::JoinHandle,
23 },
24 tokio_util::sync::CancellationToken,
25};
26
/// Default capacity of the internal block channel, used when
/// `RpcBlockCrawler::new` receives `None` for `channel_buffer_size`.
const CHANNEL_BUFFER_SIZE: usize = 1000;
/// Default limit on in-flight block requests, used when
/// `RpcBlockCrawler::new` receives `None` for `max_concurrent_requests`.
const MAX_CONCURRENT_REQUESTS: usize = 10;
/// Default pause between polls for new slots, used when
/// `RpcBlockCrawler::new` receives `None` for `block_interval`.
const BLOCK_INTERVAL: Duration = Duration::from_millis(100);
30
31pub struct RpcBlockCrawler {
34 pub rpc_url: String,
35 pub start_slot: u64,
36 pub end_slot: Option<u64>,
37 pub block_interval: Duration,
38 pub block_config: RpcBlockConfig,
39 pub max_concurrent_requests: usize,
40 pub channel_buffer_size: usize,
41}
42
43impl RpcBlockCrawler {
44 pub fn new(
45 rpc_url: String,
46 start_slot: u64,
47 end_slot: Option<u64>,
48 block_interval: Option<Duration>,
49 block_config: RpcBlockConfig,
50 max_concurrent_requests: Option<usize>,
51 channel_buffer_size: Option<usize>,
52 ) -> Self {
53 Self {
54 rpc_url,
55 start_slot,
56 end_slot,
57 block_config,
58 block_interval: block_interval.unwrap_or(BLOCK_INTERVAL),
59 max_concurrent_requests: max_concurrent_requests.unwrap_or(MAX_CONCURRENT_REQUESTS),
60 channel_buffer_size: channel_buffer_size.unwrap_or(CHANNEL_BUFFER_SIZE),
61 }
62 }
63}
64
65#[async_trait]
66impl Datasource for RpcBlockCrawler {
67 async fn consume(
68 &self,
69 sender: &Sender<Update>,
70 cancellation_token: CancellationToken,
71 metrics: Arc<MetricsCollection>,
72 ) -> CarbonResult<()> {
73 let rpc_client = Arc::new(RpcClient::new_with_commitment(
74 self.rpc_url.clone(),
75 self.block_config
76 .commitment
77 .unwrap_or(CommitmentConfig::confirmed()),
78 ));
79 let sender = sender.clone();
80 let (block_sender, block_receiver) = mpsc::channel(self.channel_buffer_size);
81
82 let block_fetcher = block_fetcher(
83 rpc_client,
84 self.start_slot,
85 self.end_slot,
86 self.block_interval,
87 self.block_config,
88 block_sender,
89 self.max_concurrent_requests,
90 cancellation_token.clone(),
91 metrics.clone(),
92 );
93
94 let task_processor = task_processor(
95 block_receiver,
96 sender,
97 cancellation_token.clone(),
98 metrics.clone(),
99 );
100
101 tokio::spawn(async move {
102 tokio::select! {
103 _ = block_fetcher => {},
104 _ = task_processor => {},
105 }
106 });
107
108 Ok(())
109 }
110
111 fn update_types(&self) -> Vec<UpdateType> {
112 vec![UpdateType::Transaction]
113 }
114}
115
116#[allow(clippy::too_many_arguments)]
117fn block_fetcher(
118 rpc_client: Arc<RpcClient>,
119 start_slot: u64,
120 end_slot: Option<u64>,
121 block_interval: Duration,
122 block_config: RpcBlockConfig,
123 block_sender: Sender<(u64, UiConfirmedBlock)>,
124 max_concurrent_requests: usize,
125 cancellation_token: CancellationToken,
126 metrics: Arc<MetricsCollection>,
127) -> JoinHandle<()> {
128 let rpc_client_clone = rpc_client.clone();
129 tokio::spawn(async move {
130 let fetch_stream_task = async {
131 let fetch_stream = async_stream::stream! {
132 let mut current_slot = start_slot;
133 let mut latest_slot = current_slot;
134 loop {
135 if let Some(end) = end_slot {
136 if current_slot > end {
137 break;
138 }
139 } else {
140 if current_slot >= latest_slot {
141 match rpc_client_clone.get_slot().await {
142 Ok(slot) => {
143 latest_slot = slot;
144 if current_slot > latest_slot {
145 log::debug!(
146 "Waiting for new blocks... Current: {}, Latest: {}",
147 current_slot,
148 latest_slot
149 );
150 tokio::time::sleep(block_interval).await;
151 continue;
152 }
153 }
154 Err(e) => {
155 log::error!("Error fetching latest slot: {:?}", e);
156 tokio::time::sleep(block_interval).await;
157 continue;
158 }
159 }
160 }
161 if latest_slot - current_slot > 100 {
162 log::debug!(
163 "Current slot {} is behind latest slot {} by {}",
164 current_slot,
165 latest_slot,
166 latest_slot - current_slot
167 );
168 }
169 }
170 yield current_slot;
171 current_slot += 1;
172 }
173 };
174
175 fetch_stream
176 .map(|slot| {
177 let rpc_client = Arc::clone(&rpc_client);
178 let metrics = metrics.clone();
179
180 async move {
181 let start = Instant::now();
182 match rpc_client.get_block_with_config(slot, block_config).await {
183 Ok(block) => {
184 let time_taken = start.elapsed().as_millis();
185 metrics
186 .record_histogram(
187 "block_crawler_blocks_fetch_times_milliseconds",
188 time_taken as f64,
189 )
190 .await
191 .unwrap_or_else(|value| {
192 log::error!("Error recording metric: {}", value)
193 });
194
195 metrics
196 .increment_counter("block_crawler_blocks_fetched", 1)
197 .await
198 .unwrap_or_else(|value| {
199 log::error!("Error recording metric: {}", value)
200 });
201
202 Some((slot, block))
203 }
204 Err(e) => {
205 if e.to_string().contains("-32009")
211 || e.to_string().contains("-32004")
212 || e.to_string().contains("-32007")
213 {
214 metrics
215 .increment_counter("block_crawler_blocks_skipped", 1)
216 .await
217 .unwrap_or_else(|value| {
218 log::error!("Error recording metric: {}", value)
219 });
220 } else {
221 log::error!("Error fetching block at slot {}: {:?}", slot, e);
222 }
223 None
224 }
225 }
226 }
227 })
228 .buffer_unordered(max_concurrent_requests)
229 .for_each(|result| async {
230 if let Some((slot, block)) = result {
231 if let Err(e) = block_sender.send((slot, block)).await {
232 log::error!("Failed to send block: {:?}", e);
233 }
234 }
235 })
236 .await;
237 };
238
239 tokio::select! {
240 _ = cancellation_token.cancelled() => {
241 log::info!("Cancelling RPC Crawler block fetcher...");
242 }
243 _ = fetch_stream_task => {}
244 }
245 })
246}
247
248fn task_processor(
250 block_receiver: Receiver<(u64, UiConfirmedBlock)>,
251 sender: Sender<Update>,
252 cancellation_token: CancellationToken,
253 metrics: Arc<MetricsCollection>,
254) -> JoinHandle<()> {
255 let mut block_receiver = block_receiver;
256 let sender = sender.clone();
257
258 tokio::spawn(async move {
259 loop {
260 tokio::select! {
261 _ = cancellation_token.cancelled() => {
262 log::info!("Cancelling RPC Crawler task processor...");
263 break;
264 }
265 Some((slot, block)) = block_receiver.recv() => {
266 metrics
267 .increment_counter("block_crawler_blocks_received", 1)
268 .await
269 .unwrap_or_else(|value| {
270 log::error!("Error recording metric: {}", value)
271 });
272 let block_start_time = Instant::now();
273 let block_hash = Hash::from_str(&block.blockhash).ok();
274 if let Some(transactions) = block.transactions {
275 for encoded_transaction_with_status_meta in transactions {
276 let start_time = std::time::Instant::now();
277
278 let meta_original = if let Some(meta) = encoded_transaction_with_status_meta.clone().meta {
279 meta
280 } else {
281 continue;
282 };
283
284 if meta_original.status.is_err() {
285 continue;
286 }
287
288 let Some(decoded_transaction) = encoded_transaction_with_status_meta.transaction.decode() else {
289 log::error!("Failed to decode transaction: {:?}", encoded_transaction_with_status_meta);
290 continue;
291 };
292
293 let Ok(meta_needed) = transaction_metadata_from_original_meta(meta_original) else {
294 log::error!("Error getting metadata from transaction original meta.");
295 continue;
296 };
297
298 let update = Update::Transaction(Box::new(TransactionUpdate {
299 signature: *decoded_transaction.get_signature(),
300 transaction: decoded_transaction.clone(),
301 meta: meta_needed,
302 is_vote: false,
303 slot,
304 block_time: block.block_time,
305 block_hash,
306 }));
307
308 metrics
309 .record_histogram(
310 "block_crawler_transaction_process_time_nanoseconds",
311 start_time.elapsed().as_nanos() as f64
312 )
313 .await
314 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
315
316 metrics.increment_counter("block_crawler_transactions_processed", 1)
317 .await
318 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
319
320 if let Err(err) = sender.try_send(update) {
321 log::error!("Error sending transaction update: {:?}", err);
322 break;
323 }
324 }
325 }
326 metrics
327 .record_histogram(
328 "block_crawler_block_process_time_nanoseconds",
329 block_start_time.elapsed().as_nanos() as f64
330 ).await
331 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
332
333 metrics
334 .increment_counter("block_crawler_blocks_processed", 1)
335 .await
336 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
337 }
338 }
339 }
340 })
341}
342
343#[cfg(test)]
344mod tests {
345 use super::*;
346
347 #[tokio::test]
348 async fn test_block_fetcher_with_end_slot() {
349 let rpc_client = Arc::new(RpcClient::new_with_commitment(
350 "https://api.mainnet-beta.solana.com/".to_string(),
351 CommitmentConfig::confirmed(),
352 ));
353 let block_interval = Duration::from_millis(100);
354 let cancellation_token = CancellationToken::new();
355 let (block_sender, mut block_receiver) = mpsc::channel(1);
356
357 let block_config = RpcBlockConfig {
358 max_supported_transaction_version: Some(0),
359 ..Default::default()
360 };
361
362 let block_fetcher = block_fetcher(
364 rpc_client,
365 328837890,
366 Some(328837901),
367 block_interval,
368 block_config,
369 block_sender,
370 1,
371 cancellation_token.clone(),
372 Arc::new(MetricsCollection::new(vec![])),
373 );
374
375 let receiver_task = tokio::spawn(async move {
377 let mut received_blocks = Vec::new();
378
379 while let Some((slot, block)) = block_receiver.recv().await {
380 received_blocks.push((slot, block));
381
382 if received_blocks.len() == 2 {
383 break;
384 }
385 }
386 received_blocks
387 });
388
389 tokio::spawn(async move {
390 block_fetcher.await.expect("Block fetcher should not panic");
391 });
392
393 let exit_reason = tokio::select! {
395 result = receiver_task => {
396 let received_blocks = result.expect("Receiver task should not panic");
397 println!("Received {} blocks", received_blocks.len());
398
399 for (slot, block) in received_blocks {
400 println!("Block at slot {}: {} transactions",
401 slot,
402 block.transactions.map(|t| t.len()).unwrap_or(0)
403 );
404 }
405 "receiver_completed"
406 }
407 _ = cancellation_token.cancelled() => {
408 println!("Cancellation token triggered");
409 "cancellation_token"
410 }
411 _ = tokio::time::sleep(Duration::from_secs(30)) => {
412 println!("Timeout");
413 "timeout"
414 }
415 };
416
417 assert_eq!(
418 exit_reason, "receiver_completed",
419 "Test should exit because block fetcher completed"
420 );
421 }
422
423 #[tokio::test]
424 async fn test_block_fetcher_without_end_slot() {
425 let rpc_client = Arc::new(RpcClient::new_with_commitment(
426 "https://api.mainnet-beta.solana.com/".to_string(),
427 CommitmentConfig::confirmed(),
428 ));
429 let latest_slot = rpc_client
430 .get_slot()
431 .await
432 .expect("Failed to get last slot");
433
434 let block_interval = Duration::from_millis(100);
435 let cancellation_token = CancellationToken::new();
436 let (block_sender, mut block_receiver) = mpsc::channel(1);
437
438 let block_config = RpcBlockConfig {
439 max_supported_transaction_version: Some(0),
440 ..Default::default()
441 };
442
443 let block_fetcher = block_fetcher(
445 rpc_client,
446 latest_slot,
447 None,
448 block_interval,
449 block_config,
450 block_sender,
451 2,
452 cancellation_token.clone(),
453 Arc::new(MetricsCollection::new(vec![])),
454 );
455
456 let receiver_task = tokio::spawn(async move {
458 let mut received_blocks = Vec::new();
459
460 while let Some((slot, block)) = block_receiver.recv().await {
461 println!("Received block at slot {}", slot);
462 received_blocks.push((slot, block));
463
464 if received_blocks.len() == 2 {
465 break;
466 }
467 }
468 received_blocks
469 });
470
471 tokio::spawn(async move {
472 block_fetcher.await.expect("Block fetcher should not panic");
473 });
474
475 let exit_reason = tokio::select! {
477 result = receiver_task => {
478 let received_blocks = result.expect("Receiver task should not panic");
479 println!("Received {} blocks", received_blocks.len());
480
481 for (slot, block) in received_blocks {
482 println!("Block at slot {}: {} transactions",
483 slot,
484 block.transactions.map(|t| t.len()).unwrap_or(0)
485 );
486 }
487 "receiver_completed"
488 }
489 _ = cancellation_token.cancelled() => {
490 println!("Cancellation token triggered");
491 "cancellation_token"
492 }
493 _ = tokio::time::sleep(Duration::from_secs(30)) => {
494 println!("Timeout");
495 "timeout"
496 }
497 };
498
499 assert_eq!(
500 exit_reason, "receiver_completed",
501 "Test should exit because block fetcher completed"
502 );
503 }
504}