carbon_rpc_block_subscribe_datasource/
lib.rs

1use {
2    async_trait::async_trait,
3    carbon_core::{
4        datasource::{Datasource, TransactionUpdate, Update, UpdateType},
5        error::CarbonResult,
6        metrics::MetricsCollection,
7        transformers::transaction_metadata_from_original_meta,
8    },
9    core::time::Duration,
10    futures::StreamExt,
11    solana_client::{
12        nonblocking::pubsub_client::PubsubClient,
13        rpc_client::SerializableTransaction,
14        rpc_config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter},
15    },
16    std::sync::Arc,
17    tokio::sync::mpsc::Sender,
18    tokio_util::sync::CancellationToken,
19};
20
/// Upper bound on consecutive failed connection/subscription attempts before
/// `consume` stops retrying and returns an error.
const MAX_RECONNECTION_ATTEMPTS: u32 = 10;
/// Pause between reconnection attempts, in milliseconds.
const RECONNECTION_DELAY_MS: u64 = 3_000;
23
24#[derive(Debug, Clone)]
25pub struct Filters {
26    pub block_filter: RpcBlockSubscribeFilter,
27    pub block_subscribe_config: Option<RpcBlockSubscribeConfig>,
28}
29
30impl Filters {
31    pub const fn new(
32        block_filter: RpcBlockSubscribeFilter,
33        block_subscribe_config: Option<RpcBlockSubscribeConfig>,
34    ) -> Self {
35        Filters {
36            block_filter,
37            block_subscribe_config,
38        }
39    }
40}
41
42pub struct RpcBlockSubscribe {
43    pub rpc_ws_url: String,
44    pub filters: Filters,
45}
46
47impl RpcBlockSubscribe {
48    pub const fn new(rpc_ws_url: String, filters: Filters) -> Self {
49        Self {
50            rpc_ws_url,
51            filters,
52        }
53    }
54}
55
56#[async_trait]
57impl Datasource for RpcBlockSubscribe {
58    async fn consume(
59        &self,
60        sender: &Sender<Update>,
61        cancellation_token: CancellationToken,
62        metrics: Arc<MetricsCollection>,
63    ) -> CarbonResult<()> {
64        let mut reconnection_attempts = 0;
65
66        loop {
67            if cancellation_token.is_cancelled() {
68                log::info!("Cancellation requested, stopping reconnection attempts");
69                break;
70            }
71
72            let client = match PubsubClient::new(&self.rpc_ws_url).await {
73                Ok(client) => client,
74                Err(err) => {
75                    log::error!("Failed to create RPC subscribe client: {}", err);
76                    reconnection_attempts += 1;
77                    if reconnection_attempts >= MAX_RECONNECTION_ATTEMPTS {
78                        return Err(carbon_core::error::Error::Custom(format!(
79                            "Failed to create RPC subscribe client after {} attempts: {}",
80                            MAX_RECONNECTION_ATTEMPTS, err
81                        )));
82                    }
83                    tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
84                    continue;
85                }
86            };
87
88            let filters = self.filters.clone();
89            let sender_clone = sender.clone();
90
91            let (mut block_stream, _block_unsub) = match client
92                .block_subscribe(filters.block_filter, filters.block_subscribe_config)
93                .await
94            {
95                Ok(subscription) => subscription,
96                Err(err) => {
97                    log::error!("Failed to subscribe to block updates: {:?}", err);
98                    reconnection_attempts += 1;
99                    if reconnection_attempts > MAX_RECONNECTION_ATTEMPTS {
100                        return Err(carbon_core::error::Error::Custom(format!(
101                            "Failed to subscribe after {} attempts: {}",
102                            MAX_RECONNECTION_ATTEMPTS, err
103                        )));
104                    }
105                    tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
106                    continue;
107                }
108            };
109
110            reconnection_attempts = 0;
111
112            loop {
113                tokio::select! {
114                    _ = cancellation_token.cancelled() => {
115                        log::info!("Cancellation requested, stopping subscription...");
116                        return Ok(());
117                    }
118                    block_event = block_stream.next() => {
119                        match block_event {
120                            Some(tx_event) => {
121                                let slot = tx_event.context.slot;
122
123                                if let Some(block) = tx_event.value.block {
124                                    let block_start_time = std::time::Instant::now();
125                                    if let Some(transactions) = block.transactions {
126                                        for encoded_transaction_with_status_meta in transactions {
127                                            let start_time = std::time::Instant::now();
128
129                                            let meta_original = if let Some(meta) = encoded_transaction_with_status_meta.clone().meta {
130                                                meta
131                                            } else {
132                                                continue;
133                                            };
134
135                                            if meta_original.status.is_err() {
136                                                continue;
137                                            }
138
139                                            let Some(decoded_transaction) = encoded_transaction_with_status_meta.transaction.decode() else {
140                                                log::error!("Failed to decode transaction: {:?}", encoded_transaction_with_status_meta);
141                                                continue;
142                                            };
143
144                                            let Ok(meta_needed) = transaction_metadata_from_original_meta(meta_original) else {
145                                                log::error!("Error getting metadata from transaction original meta.");
146                                                continue;
147                                            };
148
149                                            let update = Update::Transaction(Box::new(TransactionUpdate {
150                                                signature: *decoded_transaction.get_signature(),
151                                                transaction: decoded_transaction.clone(),
152                                                meta: meta_needed,
153                                                is_vote: false,
154                                                slot,
155                                                block_time: block.block_time,
156                                            }));
157
158                                            metrics
159                                                .record_histogram(
160                                                    "block_subscribe_transaction_process_time_nanoseconds",
161                                                    start_time.elapsed().as_nanos() as f64
162                                                )
163                                                .await
164                                                .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
165
166                                            metrics.increment_counter("block_subscribe_transactions_processed", 1)
167                                                .await
168                                                .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
169
170                                            if let Err(err) = sender_clone.try_send(update) {
171                                                log::error!("Error sending transaction update: {:?}", err);
172                                                break;
173                                            }
174                                        }
175                                    }
176
177                                    metrics
178                                        .record_histogram(
179                                            "block_subscribe_block_process_time_nanoseconds",
180                                            block_start_time.elapsed().as_nanos() as f64
181                                        )
182                                        .await
183                                        .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
184
185                                    metrics.increment_counter("block_subscribe_blocks_received", 1)
186                                        .await
187                                        .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
188                                }
189                            }
190                            None => {
191                                log::warn!("Block stream has been closed, attempting to reconnect...");
192                                break;
193                            }
194                        }
195                    }
196                }
197            }
198
199            tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
200        }
201
202        Ok(())
203    }
204
205    fn update_types(&self) -> Vec<UpdateType> {
206        vec![UpdateType::Transaction]
207    }
208}