1use carbon_core::datasource::DatasourceId;
2pub use solana_client::rpc_config::RpcBlockConfig;
3use solana_hash::Hash;
4use std::str::FromStr;
5use {
6 async_trait::async_trait,
7 carbon_core::{
8 datasource::{Datasource, TransactionUpdate, Update, UpdateType},
9 error::CarbonResult,
10 metrics::MetricsCollection,
11 transformers::transaction_metadata_from_original_meta,
12 },
13 futures::StreamExt,
14 solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction},
15 solana_commitment_config::CommitmentConfig,
16 solana_transaction_status::UiConfirmedBlock,
17 std::{
18 sync::Arc,
19 time::{Duration, Instant},
20 },
21 tokio::{
22 sync::mpsc::{self, Receiver, Sender},
23 task::JoinHandle,
24 },
25 tokio_util::sync::CancellationToken,
26};
27
/// Default capacity of the internal channel carrying fetched blocks from the
/// fetcher task to the processor task; used when `RpcBlockCrawler::new` is
/// given `None` for `channel_buffer_size`.
const CHANNEL_BUFFER_SIZE: usize = 1000;
/// Default upper bound on the number of block requests in flight at once;
/// used when `RpcBlockCrawler::new` is given `None` for
/// `max_concurrent_requests`.
const MAX_CONCURRENT_REQUESTS: usize = 10;
/// Default pause before re-polling the latest slot (after catching up with
/// the chain tip or after a failed slot query); used when
/// `RpcBlockCrawler::new` is given `None` for `block_interval`.
const BLOCK_INTERVAL: Duration = Duration::from_millis(100);
31
/// Datasource that walks slots over an RPC endpoint, fetches each block and
/// emits one transaction update per successful transaction found in it.
pub struct RpcBlockCrawler {
    /// URL of the RPC endpoint the client connects to.
    pub rpc_url: String,
    /// First slot to fetch.
    pub start_slot: u64,
    /// Last slot to fetch (inclusive). When `None`, the crawler keeps
    /// following the latest slot reported by the RPC node.
    pub end_slot: Option<u64>,
    /// How long to sleep before re-querying the latest slot when the crawler
    /// is ahead of the chain or the slot query failed.
    pub block_interval: Duration,
    /// Configuration passed to every block request. Its `commitment`, when
    /// set, is also used for the RPC client (otherwise `confirmed` is used).
    pub block_config: RpcBlockConfig,
    /// Maximum number of block requests kept in flight concurrently.
    pub max_concurrent_requests: usize,
    /// Capacity of the internal channel between the block fetcher and the
    /// block processor.
    pub channel_buffer_size: usize,
}
43
44impl RpcBlockCrawler {
45 pub fn new(
46 rpc_url: String,
47 start_slot: u64,
48 end_slot: Option<u64>,
49 block_interval: Option<Duration>,
50 block_config: RpcBlockConfig,
51 max_concurrent_requests: Option<usize>,
52 channel_buffer_size: Option<usize>,
53 ) -> Self {
54 Self {
55 rpc_url,
56 start_slot,
57 end_slot,
58 block_config,
59 block_interval: block_interval.unwrap_or(BLOCK_INTERVAL),
60 max_concurrent_requests: max_concurrent_requests.unwrap_or(MAX_CONCURRENT_REQUESTS),
61 channel_buffer_size: channel_buffer_size.unwrap_or(CHANNEL_BUFFER_SIZE),
62 }
63 }
64}
65
66#[async_trait]
67impl Datasource for RpcBlockCrawler {
68 async fn consume(
69 &self,
70 id: DatasourceId,
71 sender: Sender<(Update, DatasourceId)>,
72 cancellation_token: CancellationToken,
73 metrics: Arc<MetricsCollection>,
74 ) -> CarbonResult<()> {
75 let rpc_client = Arc::new(RpcClient::new_with_commitment(
76 self.rpc_url.clone(),
77 self.block_config
78 .commitment
79 .unwrap_or(CommitmentConfig::confirmed()),
80 ));
81 let (block_sender, block_receiver) = mpsc::channel(self.channel_buffer_size);
82
83 let block_fetcher = block_fetcher(
84 rpc_client,
85 self.start_slot,
86 self.end_slot,
87 self.block_interval,
88 self.block_config,
89 block_sender,
90 self.max_concurrent_requests,
91 cancellation_token.clone(),
92 metrics.clone(),
93 );
94
95 let task_processor = task_processor(
96 block_receiver,
97 sender,
98 id,
99 cancellation_token.clone(),
100 metrics.clone(),
101 );
102
103 tokio::spawn(async move {
104 tokio::select! {
105 _ = block_fetcher => {},
106 _ = task_processor => {},
107 }
108 });
109
110 Ok(())
111 }
112
113 fn update_types(&self) -> Vec<UpdateType> {
114 vec![UpdateType::Transaction]
115 }
116}
117
118#[allow(clippy::too_many_arguments)]
119fn block_fetcher(
120 rpc_client: Arc<RpcClient>,
121 start_slot: u64,
122 end_slot: Option<u64>,
123 block_interval: Duration,
124 block_config: RpcBlockConfig,
125 block_sender: Sender<(u64, UiConfirmedBlock)>,
126 max_concurrent_requests: usize,
127 cancellation_token: CancellationToken,
128 metrics: Arc<MetricsCollection>,
129) -> JoinHandle<()> {
130 let rpc_client_clone = rpc_client.clone();
131 tokio::spawn(async move {
132 let fetch_stream_task = async {
133 let fetch_stream = async_stream::stream! {
134 let mut current_slot = start_slot;
135 let mut latest_slot = current_slot;
136 loop {
137 if let Some(end) = end_slot {
138 if current_slot > end {
139 break;
140 }
141 } else {
142 if current_slot >= latest_slot {
143 match rpc_client_clone.get_slot().await {
144 Ok(slot) => {
145 latest_slot = slot;
146 if current_slot > latest_slot {
147 log::debug!(
148 "Waiting for new blocks... Current: {}, Latest: {}",
149 current_slot,
150 latest_slot
151 );
152 tokio::time::sleep(block_interval).await;
153 continue;
154 }
155 }
156 Err(e) => {
157 log::error!("Error fetching latest slot: {:?}", e);
158 tokio::time::sleep(block_interval).await;
159 continue;
160 }
161 }
162 }
163 if latest_slot - current_slot > 100 {
164 log::debug!(
165 "Current slot {} is behind latest slot {} by {}",
166 current_slot,
167 latest_slot,
168 latest_slot - current_slot
169 );
170 }
171 }
172 yield current_slot;
173 current_slot += 1;
174 }
175 };
176
177 fetch_stream
178 .map(|slot| {
179 let rpc_client = Arc::clone(&rpc_client);
180 let metrics = metrics.clone();
181
182 async move {
183 let start = Instant::now();
184 match rpc_client.get_block_with_config(slot, block_config).await {
185 Ok(block) => {
186 let time_taken = start.elapsed().as_millis();
187 metrics
188 .record_histogram(
189 "block_crawler_blocks_fetch_times_milliseconds",
190 time_taken as f64,
191 )
192 .await
193 .unwrap_or_else(|value| {
194 log::error!("Error recording metric: {}", value)
195 });
196
197 metrics
198 .increment_counter("block_crawler_blocks_fetched", 1)
199 .await
200 .unwrap_or_else(|value| {
201 log::error!("Error recording metric: {}", value)
202 });
203
204 Some((slot, block))
205 }
206 Err(e) => {
207 if e.to_string().contains("-32009")
213 || e.to_string().contains("-32004")
214 || e.to_string().contains("-32007")
215 {
216 metrics
217 .increment_counter("block_crawler_blocks_skipped", 1)
218 .await
219 .unwrap_or_else(|value| {
220 log::error!("Error recording metric: {}", value)
221 });
222 } else {
223 log::error!("Error fetching block at slot {}: {:?}", slot, e);
224 }
225 None
226 }
227 }
228 }
229 })
230 .buffer_unordered(max_concurrent_requests)
231 .for_each(|result| async {
232 if let Some((slot, block)) = result {
233 if let Err(e) = block_sender.send((slot, block)).await {
234 log::error!("Failed to send block: {:?}", e);
235 }
236 }
237 })
238 .await;
239 };
240
241 tokio::select! {
242 _ = cancellation_token.cancelled() => {
243 log::info!("Cancelling RPC Crawler block fetcher...");
244 }
245 _ = fetch_stream_task => {}
246 }
247 })
248}
249
250fn task_processor(
252 block_receiver: Receiver<(u64, UiConfirmedBlock)>,
253 sender: Sender<(Update, DatasourceId)>,
254 id: DatasourceId,
255 cancellation_token: CancellationToken,
256 metrics: Arc<MetricsCollection>,
257) -> JoinHandle<()> {
258 let mut block_receiver = block_receiver;
259 let sender = sender.clone();
260 let id_for_loop = id.clone();
261
262 tokio::spawn(async move {
263 loop {
264 tokio::select! {
265 _ = cancellation_token.cancelled() => {
266 log::info!("Cancelling RPC Crawler task processor...");
267 break;
268 }
269 maybe_block = block_receiver.recv() => {
270 match maybe_block {
271 Some((slot, block)) => {
272
273 metrics
274 .increment_counter("block_crawler_blocks_received", 1)
275 .await
276 .unwrap_or_else(|value| {
277 log::error!("Error recording metric: {}", value)
278 });
279 let block_start_time = Instant::now();
280 let block_hash = Hash::from_str(&block.blockhash).ok();
281 if let Some(transactions) = block.transactions {
282 for encoded_transaction_with_status_meta in transactions {
283 let start_time = std::time::Instant::now();
284
285 let meta_original = if let Some(meta) = encoded_transaction_with_status_meta.clone().meta {
286 meta
287 } else {
288 continue;
289 };
290
291 if meta_original.status.is_err() {
292 continue;
293 }
294
295 let Some(decoded_transaction) = encoded_transaction_with_status_meta.transaction.decode() else {
296 log::error!("Failed to decode transaction: {:?}", encoded_transaction_with_status_meta);
297 continue;
298 };
299
300 let Ok(meta_needed) = transaction_metadata_from_original_meta(meta_original) else {
301 log::error!("Error getting metadata from transaction original meta.");
302 continue;
303 };
304
305 let update = Update::Transaction(Box::new(TransactionUpdate {
306 signature: *decoded_transaction.get_signature(),
307 transaction: decoded_transaction.clone(),
308 meta: meta_needed,
309 is_vote: false,
310 slot,
311 block_time: block.block_time,
312 block_hash,
313 }));
314
315 metrics
316 .record_histogram(
317 "block_crawler_transaction_process_time_nanoseconds",
318 start_time.elapsed().as_nanos() as f64
319 )
320 .await
321 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
322
323 metrics.increment_counter("block_crawler_transactions_processed", 1)
324 .await
325 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
326
327 if let Err(err) = sender.try_send((update, id_for_loop.clone())) {
328 log::error!("Error sending transaction update: {:?}", err);
329 break;
330 }
331 }
332 }
333 metrics
334 .record_histogram(
335 "block_crawler_block_process_time_nanoseconds",
336 block_start_time.elapsed().as_nanos() as f64
337 ).await
338 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
339
340 metrics
341 .increment_counter("block_crawler_blocks_processed", 1)
342 .await
343 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
344 }
345 None => {
346 break;
347 }
348 }
349 }}
350 }
351 })
352}
353
#[cfg(test)]
mod tests {
    use super::*;

    /// Public endpoint the tests talk to; they require network access.
    const MAINNET_RPC_URL: &str = "https://api.mainnet-beta.solana.com/";
    /// Number of blocks after which a test stops listening.
    const BLOCKS_TO_COLLECT: usize = 2;
    /// Upper bound on how long a test waits for blocks to arrive.
    const COLLECT_TIMEOUT: Duration = Duration::from_secs(30);

    /// Creates a client for the public mainnet endpoint at `confirmed` commitment.
    fn mainnet_client() -> Arc<RpcClient> {
        Arc::new(RpcClient::new_with_commitment(
            MAINNET_RPC_URL.to_string(),
            CommitmentConfig::confirmed(),
        ))
    }

    /// Runs `block_fetcher` over the given slot range and listens until
    /// `BLOCKS_TO_COLLECT` blocks arrived or the fetcher closed the channel.
    ///
    /// Panics if listening does not finish within `COLLECT_TIMEOUT` or if the
    /// fetcher task panicked. The fetcher is cancelled and joined before
    /// returning so no request outlives the test.
    async fn collect_blocks_from_fetcher(
        rpc_client: Arc<RpcClient>,
        start_slot: u64,
        end_slot: Option<u64>,
        max_concurrent_requests: usize,
    ) {
        let cancellation_token = CancellationToken::new();
        let (block_sender, mut block_receiver) = mpsc::channel(1);

        let block_config = RpcBlockConfig {
            max_supported_transaction_version: Some(0),
            ..Default::default()
        };

        let fetcher_handle = block_fetcher(
            rpc_client,
            start_slot,
            end_slot,
            Duration::from_millis(100),
            block_config,
            block_sender,
            max_concurrent_requests,
            cancellation_token.clone(),
            Arc::new(MetricsCollection::new(vec![])),
        );

        let collect = async {
            let mut received_blocks = Vec::new();
            while let Some((slot, block)) = block_receiver.recv().await {
                println!("Received block at slot {}", slot);
                received_blocks.push((slot, block));

                if received_blocks.len() == BLOCKS_TO_COLLECT {
                    break;
                }
            }
            received_blocks
        };

        let outcome = tokio::time::timeout(COLLECT_TIMEOUT, collect).await;

        // Stop the fetcher whatever the outcome, then surface any panic it hit.
        cancellation_token.cancel();
        fetcher_handle
            .await
            .expect("Block fetcher should not panic");

        let received_blocks =
            outcome.expect("Test should exit because the receiver finished before the timeout");
        println!("Received {} blocks", received_blocks.len());

        for (slot, block) in received_blocks {
            println!(
                "Block at slot {}: {} transactions",
                slot,
                block.transactions.map(|t| t.len()).unwrap_or(0)
            );
        }
    }

    #[tokio::test]
    async fn test_block_fetcher_with_end_slot() {
        collect_blocks_from_fetcher(mainnet_client(), 328837890, Some(328837901), 1).await;
    }

    #[tokio::test]
    async fn test_block_fetcher_without_end_slot() {
        let rpc_client = mainnet_client();
        let latest_slot = rpc_client
            .get_slot()
            .await
            .expect("Failed to get last slot");

        collect_blocks_from_fetcher(rpc_client, latest_slot, None, 2).await;
    }
}