1use {
2 async_trait::async_trait,
3 carbon_core::{
4 datasource::{Datasource, DatasourceId, TransactionUpdate, Update, UpdateType},
5 error::CarbonResult,
6 metrics::MetricsCollection,
7 transformers::transaction_metadata_from_original_meta,
8 },
9 futures::StreamExt,
10 solana_client::{
11 nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
12 rpc_config::RpcTransactionConfig,
13 },
14 solana_commitment_config::CommitmentConfig,
15 solana_pubkey::Pubkey,
16 solana_signature::Signature,
17 solana_transaction_status::{
18 EncodedConfirmedTransactionWithStatusMeta, UiLoadedAddresses, UiTransactionEncoding,
19 },
20 std::{collections::HashSet, str::FromStr, sync::Arc, time::Duration},
21 tokio::{
22 sync::mpsc::{self, Receiver, Sender},
23 task::JoinHandle,
24 time::Instant,
25 },
26 tokio_util::sync::CancellationToken,
27};
28
29#[derive(Debug, Clone)]
30pub struct Filters {
31 pub accounts: Option<Vec<Pubkey>>,
32 pub before_signature: Option<Signature>,
33 pub until_signature: Option<Signature>,
34}
35
36impl Filters {
37 pub const fn new(
38 accounts: Option<Vec<Pubkey>>,
39 before_signature: Option<Signature>,
40 until_signature: Option<Signature>,
41 ) -> Self {
42 Filters {
43 accounts,
44 before_signature,
45 until_signature,
46 }
47 }
48}
49
/// Exponential-backoff policy applied to failed RPC requests.
///
/// A request is attempted once and then retried up to `max_retries` times.
/// The first retry waits `initial_backoff_ms`; after every retry the delay is
/// multiplied by `backoff_multiplier` and capped at `max_backoff_ms`.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Retries allowed after the first failed attempt (`0` = single attempt).
    pub max_retries: u32,
    /// Delay before the first retry, in milliseconds.
    pub initial_backoff_ms: u64,
    /// Upper bound for the delay between retries, in milliseconds.
    pub max_backoff_ms: u64,
    /// Factor applied to the delay after each retry.
    pub backoff_multiplier: f64,
}

impl RetryConfig {
    /// Shared default policy: 3 retries, 1s initial delay doubling up to 10s.
    const DEFAULT: Self = Self {
        max_retries: 3,
        initial_backoff_ms: 1000,
        max_backoff_ms: 10000,
        backoff_multiplier: 2.0,
    };

    /// Builds a policy from explicit values.
    pub const fn new(
        max_retries: u32,
        initial_backoff_ms: u64,
        max_backoff_ms: u64,
        backoff_multiplier: f64,
    ) -> Self {
        Self {
            max_retries,
            initial_backoff_ms,
            max_backoff_ms,
            backoff_multiplier,
        }
    }

    /// Returns the default policy (3 retries, 1s doubling to at most 10s).
    ///
    /// Kept as an inherent `const fn` so it stays usable in const contexts and
    /// for existing callers; the [`Default`] impl returns the same value.
    #[allow(clippy::should_implement_trait)]
    pub const fn default() -> Self {
        Self::DEFAULT
    }

    /// Returns a policy that never retries: each request is attempted once.
    pub const fn no_retry() -> Self {
        Self {
            max_retries: 0,
            initial_backoff_ms: 0,
            max_backoff_ms: 0,
            backoff_multiplier: 0.0,
        }
    }
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self::DEFAULT
    }
}
91
92#[derive(Debug, Clone)]
93pub struct ConnectionConfig {
94 pub batch_limit: usize,
95 pub polling_interval: Duration,
96 pub max_concurrent_requests: usize,
97 pub max_signature_channel_size: Option<usize>,
98 pub max_transaction_channel_size: Option<usize>,
99 pub retry_config: RetryConfig,
100 pub blocking_send: bool,
101}
102
103impl ConnectionConfig {
104 pub const fn new(
105 batch_limit: usize,
106 polling_interval: Duration,
107 max_concurrent_requests: usize,
108 retry_config: RetryConfig,
109 max_signature_channel_size: Option<usize>, max_transaction_channel_size: Option<usize>, blocking_send: bool,
112 ) -> Self {
113 ConnectionConfig {
114 batch_limit,
115 polling_interval,
116 max_concurrent_requests,
117 retry_config,
118 max_signature_channel_size,
119 max_transaction_channel_size,
120 blocking_send,
121 }
122 }
123
124 pub const fn default() -> Self {
125 ConnectionConfig {
126 batch_limit: 100,
127 polling_interval: Duration::from_secs(5),
128 max_concurrent_requests: 5,
129 retry_config: RetryConfig::default(),
130 max_signature_channel_size: None,
131 max_transaction_channel_size: None,
132 blocking_send: false,
133 }
134 }
135}
136
137pub struct RpcTransactionCrawler {
138 pub rpc_url: String,
139 pub account: Pubkey,
140 pub connection_config: ConnectionConfig,
141 pub filters: Filters,
142 pub commitment: Option<CommitmentConfig>,
143}
144
145impl RpcTransactionCrawler {
146 pub const fn new(
147 rpc_url: String,
148 account: Pubkey,
149 connection_config: ConnectionConfig,
150 filters: Filters,
151 commitment: Option<CommitmentConfig>,
152 ) -> Self {
153 RpcTransactionCrawler {
154 rpc_url,
155 account,
156 connection_config,
157 filters,
158 commitment,
159 }
160 }
161}
162
163#[async_trait]
164impl Datasource for RpcTransactionCrawler {
165 async fn consume(
166 &self,
167 id: DatasourceId,
168 sender: Sender<(Update, DatasourceId)>,
169 cancellation_token: CancellationToken,
170 metrics: Arc<MetricsCollection>,
171 ) -> CarbonResult<()> {
172 let rpc_client = Arc::new(RpcClient::new_with_commitment(
173 self.rpc_url.clone(),
174 self.commitment.unwrap_or(CommitmentConfig::confirmed()),
175 ));
176 let account = self.account;
177 let filters = self.filters.clone();
178 let sender = sender.clone();
179 let commitment = self.commitment;
180
181 let (signature_sender, signature_receiver) = mpsc::channel(
182 self.connection_config
183 .max_signature_channel_size
184 .unwrap_or(1000),
185 );
186 let (transaction_sender, transaction_receiver) = mpsc::channel(
187 self.connection_config
188 .max_transaction_channel_size
189 .unwrap_or(1000),
190 );
191
192 let signature_fetcher = signature_fetcher(
193 rpc_client.clone(),
194 account,
195 self.connection_config.clone(),
196 signature_sender,
197 filters.clone(),
198 commitment,
199 cancellation_token.clone(),
200 metrics.clone(),
201 );
202
203 let transaction_fetcher = transaction_fetcher(
204 rpc_client,
205 signature_receiver,
206 transaction_sender,
207 self.connection_config.clone(),
208 commitment,
209 cancellation_token.clone(),
210 metrics.clone(),
211 );
212
213 let task_processor = task_processor(
214 transaction_receiver,
215 sender,
216 id,
217 filters,
218 cancellation_token.clone(),
219 metrics.clone(),
220 self.connection_config.clone(),
221 );
222
223 tokio::spawn(async move {
224 tokio::select! {
225 _ = signature_fetcher => {},
226 _ = transaction_fetcher => {},
227 _ = task_processor => {},
228 }
229 });
230
231 Ok(())
232 }
233
234 fn update_types(&self) -> Vec<UpdateType> {
235 vec![UpdateType::Transaction]
236 }
237}
238
239#[allow(clippy::too_many_arguments)]
240fn signature_fetcher(
241 rpc_client: Arc<RpcClient>,
242 account: Pubkey,
243 connection_config: ConnectionConfig,
244 signature_sender: Sender<Signature>,
245 filters: Filters,
246 commitment: Option<CommitmentConfig>,
247 cancellation_token: CancellationToken,
248 metrics: Arc<MetricsCollection>,
249) -> JoinHandle<()> {
250 let rpc_client = Arc::clone(&rpc_client);
251 let filters = filters.clone();
252 let signature_sender = signature_sender.clone();
253
254 tokio::spawn(async move {
255 let mut last_fetched_signature = filters.before_signature;
256 let mut until_signature = filters.until_signature;
257 let mut most_recent_signature: Option<Signature> = None;
258 loop {
259 tokio::select! {
260 _ = cancellation_token.cancelled() => {
261 log::info!("Cancelling RPC Crawler signature fetcher...");
262 break;
263 }
264 _ = async {
265 let mut retries = 0;
266 let mut backoff = connection_config.retry_config.initial_backoff_ms;
267
268 loop {
269 match rpc_client.get_signatures_for_address_with_config(
270 &account,
271 GetConfirmedSignaturesForAddress2Config {
272 before: last_fetched_signature,
273 until: until_signature,
274 limit: Some(connection_config.batch_limit),
275 commitment: Some(commitment.unwrap_or(CommitmentConfig::confirmed())),
276 }
277 ).await {
278 Ok(signatures) => {
279 let start = Instant::now();
280
281 if signatures.is_empty() {
282 last_fetched_signature = None;
286 if most_recent_signature.is_some() {
287 until_signature = most_recent_signature;
290 most_recent_signature = None;
294 }
295 tokio::time::sleep(connection_config.polling_interval).await;
296 break;
297 }
298
299 if most_recent_signature.is_none() {
302 match Signature::from_str(&signatures[0].signature) {
303 Ok(sig) => most_recent_signature = Some(sig),
304 Err(e) => {
305 log::error!("Invalid signature: {:?}", e);
306 }
307 }
308 }
309
310 for sig_info in signatures.iter() {
311 let signature = match Signature::from_str(&sig_info.signature) {
312 Ok(sig) => sig,
313 Err(e) => {
314 log::error!("Invalid signature: {:?}", e);
315 continue;
316 }
317 };
318
319 if let Err(e) = signature_sender.send(signature).await {
320 log::error!("Failed to send signature: {:?}", e);
321 break;
322 }
323 }
324
325 last_fetched_signature = signatures
326 .last()
327 .and_then(|s| Signature::from_str(&s.signature).ok());
328
329 let time_taken = start.elapsed().as_millis();
330
331 metrics.record_histogram("transaction_crawler_signatures_fetch_times_milliseconds", time_taken as f64)
332 .await.unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
333
334 metrics.increment_counter("transaction_crawler_signatures_fetched", signatures.len() as u64)
335 .await.unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
336
337 break;
338 }
339 Err(e) => {
340 if retries >= connection_config.retry_config.max_retries {
341 log::error!("Failed to fetch signatures after {} retries: {:?}", retries, e);
342 break;
343 }
344
345 log::warn!(
346 "Failed to fetch signatures (attempt {}/{}), retrying in {}ms: {:?}",
347 retries + 1,
348 connection_config.retry_config.max_retries,
349 backoff,
350 e
351 );
352
353 tokio::time::sleep(Duration::from_millis(backoff)).await;
354 retries += 1;
355 backoff = (backoff as f64 * connection_config.retry_config.backoff_multiplier) as u64;
356 backoff = backoff.min(connection_config.retry_config.max_backoff_ms);
357 }
358 }
359 }
360 } => {}
361 }
362 }
363 })
364}
365
366fn transaction_fetcher(
367 rpc_client: Arc<RpcClient>,
368 signature_receiver: Receiver<Signature>,
369 transaction_sender: Sender<(Signature, EncodedConfirmedTransactionWithStatusMeta)>,
370 connection_config: ConnectionConfig,
371 commitment: Option<CommitmentConfig>,
372 cancellation_token: CancellationToken,
373 metrics: Arc<MetricsCollection>,
374) -> JoinHandle<()> {
375 let rpc_client = Arc::clone(&rpc_client);
376 let transaction_sender = transaction_sender.clone();
377 let mut signature_receiver = signature_receiver;
378
379 tokio::spawn(async move {
380 let fetch_stream_task = async {
381 let fetch_stream = async_stream::stream! {
382 while let Some(signature) = signature_receiver.recv().await {
383 yield signature;
384 }
385 };
386
387 fetch_stream
388 .map(|signature| {
389 let metrics = metrics.clone();
390 let connection_config = connection_config.clone();
391 let rpc_client = Arc::clone(&rpc_client);
392 async move {
393 let start = Instant::now();
394 let mut retries = 0;
395 let mut backoff = connection_config.retry_config.initial_backoff_ms;
396
397 loop {
398 match rpc_client.get_transaction_with_config(
399 &signature,
400 RpcTransactionConfig {
401 encoding: Some(UiTransactionEncoding::Base64),
402 commitment: Some(
403 commitment.unwrap_or(CommitmentConfig::confirmed()),
404 ),
405 max_supported_transaction_version: Some(0),
406 },
407 ).await {
408 Ok(tx) => {
409 let time_taken = start.elapsed().as_millis();
410
411 metrics
412 .record_histogram(
413 "transaction_crawler_transaction_fetch_times_milliseconds",
414 time_taken as f64,
415 )
416 .await
417 .expect("Error recording metric");
418
419 return Some((signature, tx));
420 }
421 Err(e) => {
422 if retries >= connection_config.retry_config.max_retries {
423 log::error!("Failed to fetch transaction {} after {} retries: {:?}", signature, retries, e);
424 return None;
425 }
426
427 log::warn!(
428 "Failed to fetch transaction {} (attempt {}/{}), retrying in {}ms: {:?}",
429 signature,
430 retries + 1,
431 connection_config.retry_config.max_retries,
432 backoff,
433 e
434 );
435
436 tokio::time::sleep(Duration::from_millis(backoff)).await;
437 retries += 1;
438 backoff = (backoff as f64 * connection_config.retry_config.backoff_multiplier) as u64;
439 backoff = backoff.min(connection_config.retry_config.max_backoff_ms);
440 }
441 }
442 }
443 }
444 })
445 .buffer_unordered(connection_config.max_concurrent_requests)
446 .for_each(|result| async {
447 metrics
448 .increment_counter("transaction_crawler_transactions_fetched", 1)
449 .await
450 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
451
452 if let Some((signature, fetched_transaction)) = result {
453 if let Err(e) = transaction_sender
454 .send((signature, fetched_transaction))
455 .await
456 {
457 log::error!("Failed to send transaction: {:?}", e);
458 }
459 }
460 })
461 .await;
462 };
463
464 tokio::select! {
465 _ = cancellation_token.cancelled() => {
466 log::info!("Cancelling RPC Crawler transaction fetcher...");
467 }
468 _ = fetch_stream_task => {}
469 }
470 })
471}
472
473fn task_processor(
474 transaction_receiver: Receiver<(Signature, EncodedConfirmedTransactionWithStatusMeta)>,
475 sender: Sender<(Update, DatasourceId)>,
476 id: DatasourceId,
477 filters: Filters,
478 cancellation_token: CancellationToken,
479 metrics: Arc<MetricsCollection>,
480 connection_config: ConnectionConfig,
481) -> JoinHandle<()> {
482 let mut transaction_receiver = transaction_receiver;
483 let sender = sender.clone();
484 let id_for_loop = id.clone();
485
486 tokio::spawn(async move {
487 loop {
488 tokio::select! {
489 _ = cancellation_token.cancelled() => {
490 log::info!("Cancelling RPC Crawler task processor...");
491 break;
492 }
493 Some((signature, fetched_transaction)) = transaction_receiver.recv() => {
494 let start = Instant::now();
495 let transaction = fetched_transaction.transaction;
496
497 let meta_original = if let Some(meta) = transaction.clone().meta {
498 meta
499 } else {
500 log::warn!("Meta is malformed for transaction: {:?}", signature);
501 continue;
502 };
503
504 if meta_original.status.is_err() {
505 continue;
506 }
507
508 let Some(decoded_transaction) = transaction.transaction.decode() else {
509 log::error!("Failed to decode transaction: {:?}", transaction);
510 continue;
511 };
512
513 if let Some(accounts) = &filters.accounts {
514 let account_set: HashSet<Pubkey> = accounts.iter().cloned().collect();
515
516 let static_accounts = decoded_transaction.message.static_account_keys();
517
518 let loaded_addresses =
519 meta_original
520 .loaded_addresses
521 .clone()
522 .unwrap_or_else(|| UiLoadedAddresses {
523 writable: vec![],
524 readonly: vec![],
525 });
526
527 let all_accounts: HashSet<Pubkey> = static_accounts
528 .iter()
529 .cloned()
530 .chain(
531 loaded_addresses
532 .writable
533 .iter()
534 .filter_map(|s| Pubkey::from_str(s).ok()),
535 )
536 .chain(
537 loaded_addresses
538 .readonly
539 .iter()
540 .filter_map(|s| Pubkey::from_str(s).ok()),
541 )
542 .collect();
543
544 if !all_accounts
545 .iter()
546 .any(|account| account_set.contains(account))
547 {
548 continue;
549 }
550 }
551
552 let Ok(meta_needed) = transaction_metadata_from_original_meta(meta_original) else {
553 log::error!("Error getting metadata from transaction original meta.");
554 continue;
555 };
556
557 let update = Update::Transaction(Box::new(TransactionUpdate {
558 signature,
559 transaction: decoded_transaction.clone(),
560 meta: meta_needed,
561 is_vote: false,
562 slot: fetched_transaction.slot,
563 block_time: fetched_transaction.block_time,
564 block_hash: None,
565 }));
566
567
568 metrics
569 .record_histogram(
570 "transaction_crawler_transaction_process_time_milliseconds",
571 start.elapsed().as_millis() as f64
572 )
573 .await
574 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
575
576
577 if connection_config.blocking_send {
578 if let Err(e) = sender.send((update.clone(), id_for_loop.clone())).await {
579 log::warn!("Failed to send update: {:?}", e);
580 continue;
581 }
582 }
583 if !connection_config.blocking_send {
584 if let Err(e) = sender.try_send((update.clone(), id_for_loop.clone())) {
585 log::warn!("Failed to send update: {:?}", e);
586 continue;
587 }
588 }
589 }
590 }
591 }
592 })
593}