carbon_rpc_block_subscribe_datasource/
lib.rs

1use carbon_core::datasource::{BlockDetails, DatasourceId};
2use solana_hash::Hash;
3use std::str::FromStr;
4
5use {
6    async_trait::async_trait,
7    carbon_core::{
8        datasource::{Datasource, TransactionUpdate, Update, UpdateType},
9        error::CarbonResult,
10        metrics::MetricsCollection,
11        transformers::transaction_metadata_from_original_meta,
12    },
13    core::time::Duration,
14    futures::StreamExt,
15    solana_client::{
16        nonblocking::pubsub_client::PubsubClient,
17        rpc_client::SerializableTransaction,
18        rpc_config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter},
19    },
20    std::sync::Arc,
21    tokio::sync::mpsc::Sender,
22    tokio_util::sync::CancellationToken,
23};
24
25const MAX_RECONNECTION_ATTEMPTS: u32 = 10;
26const RECONNECTION_DELAY_MS: u64 = 3000;
27
28#[derive(Debug, Clone)]
29pub struct Filters {
30    pub block_filter: RpcBlockSubscribeFilter,
31    pub block_subscribe_config: Option<RpcBlockSubscribeConfig>,
32}
33
34impl Filters {
35    pub const fn new(
36        block_filter: RpcBlockSubscribeFilter,
37        block_subscribe_config: Option<RpcBlockSubscribeConfig>,
38    ) -> Self {
39        Filters {
40            block_filter,
41            block_subscribe_config,
42        }
43    }
44}
45
46pub struct RpcBlockSubscribe {
47    pub rpc_ws_url: String,
48    pub filters: Filters,
49}
50
51impl RpcBlockSubscribe {
52    pub const fn new(rpc_ws_url: String, filters: Filters) -> Self {
53        Self {
54            rpc_ws_url,
55            filters,
56        }
57    }
58}
59
60#[async_trait]
61impl Datasource for RpcBlockSubscribe {
62    async fn consume(
63        &self,
64        id: DatasourceId,
65        sender: Sender<(Update, DatasourceId)>,
66        cancellation_token: CancellationToken,
67        metrics: Arc<MetricsCollection>,
68    ) -> CarbonResult<()> {
69        let mut reconnection_attempts = 0;
70
71        loop {
72            if cancellation_token.is_cancelled() {
73                log::info!("Cancellation requested, stopping reconnection attempts");
74                break;
75            }
76
77            let client = match PubsubClient::new(&self.rpc_ws_url).await {
78                Ok(client) => client,
79                Err(err) => {
80                    log::error!("Failed to create RPC subscribe client: {}", err);
81                    reconnection_attempts += 1;
82                    if reconnection_attempts >= MAX_RECONNECTION_ATTEMPTS {
83                        return Err(carbon_core::error::Error::Custom(format!(
84                            "Failed to create RPC subscribe client after {} attempts: {}",
85                            MAX_RECONNECTION_ATTEMPTS, err
86                        )));
87                    }
88                    tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
89                    continue;
90                }
91            };
92
93            let filters = self.filters.clone();
94            let sender_clone = sender.clone();
95            let id_for_loop = id.clone();
96
97            let (mut block_stream, _block_unsub) = match client
98                .block_subscribe(filters.block_filter, filters.block_subscribe_config)
99                .await
100            {
101                Ok(subscription) => subscription,
102                Err(err) => {
103                    log::error!("Failed to subscribe to block updates: {:?}", err);
104                    reconnection_attempts += 1;
105                    if reconnection_attempts > MAX_RECONNECTION_ATTEMPTS {
106                        return Err(carbon_core::error::Error::Custom(format!(
107                            "Failed to subscribe after {} attempts: {}",
108                            MAX_RECONNECTION_ATTEMPTS, err
109                        )));
110                    }
111                    tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
112                    continue;
113                }
114            };
115
116            reconnection_attempts = 0;
117
118            loop {
119                tokio::select! {
120                    _ = cancellation_token.cancelled() => {
121                        log::info!("Cancellation requested, stopping subscription...");
122                        return Ok(());
123                    }
124                    block_event = block_stream.next() => {
125                        match block_event {
126                            Some(tx_event) => {
127                                let slot = tx_event.context.slot;
128
129                                if let Some(block) = tx_event.value.block {
130                                    let block_start_time = std::time::Instant::now();
131                                    let block_hash = Hash::from_str(&block.blockhash).ok();
132                                    let previous_block_hash = Hash::from_str(&block.previous_blockhash).ok();
133
134                                    let block_deteils = Update::BlockDetails( BlockDetails {
135                                                slot,
136                                                block_hash,
137                                                previous_block_hash,
138                                                rewards: block.rewards,
139                                                num_reward_partitions: block.num_reward_partitions,
140                                                block_time: block.block_time,
141                                                block_height: block.block_height,
142                                    });
143
144                                    if let Err(err) = sender_clone.try_send((block_deteils, id_for_loop.clone())) {
145                                        log::error!("Error sending block details: {:?}", err);
146                                        break;
147                                    }
148
149                                    if let Some(transactions) = block.transactions {
150                                        for encoded_transaction_with_status_meta in transactions {
151                                            let start_time = std::time::Instant::now();
152
153                                            let meta_original = if let Some(meta) = encoded_transaction_with_status_meta.clone().meta {
154                                                meta
155                                            } else {
156                                                continue;
157                                            };
158
159                                            if meta_original.status.is_err() {
160                                                continue;
161                                            }
162
163                                            let Some(decoded_transaction) = encoded_transaction_with_status_meta.transaction.decode() else {
164                                                log::error!("Failed to decode transaction: {:?}", encoded_transaction_with_status_meta);
165                                                continue;
166                                            };
167
168                                            let Ok(meta_needed) = transaction_metadata_from_original_meta(meta_original) else {
169                                                log::error!("Error getting metadata from transaction original meta.");
170                                                continue;
171                                            };
172
173                                            let update = Update::Transaction(Box::new(TransactionUpdate {
174                                                signature: *decoded_transaction.get_signature(),
175                                                transaction: decoded_transaction.clone(),
176                                                meta: meta_needed,
177                                                is_vote: false,
178                                                slot,
179                                                block_time: block.block_time,
180                                                block_hash,
181                                            }));
182
183                                            metrics
184                                                .record_histogram(
185                                                    "block_subscribe_transaction_process_time_nanoseconds",
186                                                    start_time.elapsed().as_nanos() as f64
187                                                )
188                                                .await
189                                                .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
190
191                                            metrics.increment_counter("block_subscribe_transactions_processed", 1)
192                                                .await
193                                                .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
194
195                                            if let Err(err) = sender_clone.try_send((update, id_for_loop.clone())) {
196                                                log::error!("Error sending transaction update: {:?}", err);
197                                                break;
198                                            }
199                                        }
200                                    }
201
202                                    metrics
203                                        .record_histogram(
204                                            "block_subscribe_block_process_time_nanoseconds",
205                                            block_start_time.elapsed().as_nanos() as f64
206                                        )
207                                        .await
208                                        .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
209
210                                    metrics.increment_counter("block_subscribe_blocks_received", 1)
211                                        .await
212                                        .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
213                                }
214                            }
215                            None => {
216                                log::warn!("Block stream has been closed, attempting to reconnect...");
217                                break;
218                            }
219                        }
220                    }
221                }
222            }
223
224            tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
225        }
226
227        Ok(())
228    }
229
230    fn update_types(&self) -> Vec<UpdateType> {
231        vec![UpdateType::Transaction]
232    }
233}