carbon_rpc_block_subscribe_datasource/
lib.rs1use carbon_core::datasource::{BlockDetails, DatasourceId};
2use solana_hash::Hash;
3use std::str::FromStr;
4
5use {
6 async_trait::async_trait,
7 carbon_core::{
8 datasource::{Datasource, TransactionUpdate, Update, UpdateType},
9 error::CarbonResult,
10 metrics::MetricsCollection,
11 transformers::transaction_metadata_from_original_meta,
12 },
13 core::time::Duration,
14 futures::StreamExt,
15 solana_client::{
16 nonblocking::pubsub_client::PubsubClient,
17 rpc_client::SerializableTransaction,
18 rpc_config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter},
19 },
20 std::sync::Arc,
21 tokio::sync::mpsc::Sender,
22 tokio_util::sync::CancellationToken,
23};
24
/// Number of consecutive connection/subscription failures tolerated before
/// `consume` gives up and returns an error.
const MAX_RECONNECTION_ATTEMPTS: u32 = 10;

/// Pause, in milliseconds, between two reconnection attempts.
const RECONNECTION_DELAY_MS: u64 = 3_000;
27
28#[derive(Debug, Clone)]
29pub struct Filters {
30 pub block_filter: RpcBlockSubscribeFilter,
31 pub block_subscribe_config: Option<RpcBlockSubscribeConfig>,
32}
33
34impl Filters {
35 pub const fn new(
36 block_filter: RpcBlockSubscribeFilter,
37 block_subscribe_config: Option<RpcBlockSubscribeConfig>,
38 ) -> Self {
39 Filters {
40 block_filter,
41 block_subscribe_config,
42 }
43 }
44}
45
46pub struct RpcBlockSubscribe {
47 pub rpc_ws_url: String,
48 pub filters: Filters,
49}
50
51impl RpcBlockSubscribe {
52 pub const fn new(rpc_ws_url: String, filters: Filters) -> Self {
53 Self {
54 rpc_ws_url,
55 filters,
56 }
57 }
58}
59
60#[async_trait]
61impl Datasource for RpcBlockSubscribe {
62 async fn consume(
63 &self,
64 id: DatasourceId,
65 sender: Sender<(Update, DatasourceId)>,
66 cancellation_token: CancellationToken,
67 metrics: Arc<MetricsCollection>,
68 ) -> CarbonResult<()> {
69 let mut reconnection_attempts = 0;
70
71 loop {
72 if cancellation_token.is_cancelled() {
73 log::info!("Cancellation requested, stopping reconnection attempts");
74 break;
75 }
76
77 let client = match PubsubClient::new(&self.rpc_ws_url).await {
78 Ok(client) => client,
79 Err(err) => {
80 log::error!("Failed to create RPC subscribe client: {}", err);
81 reconnection_attempts += 1;
82 if reconnection_attempts >= MAX_RECONNECTION_ATTEMPTS {
83 return Err(carbon_core::error::Error::Custom(format!(
84 "Failed to create RPC subscribe client after {} attempts: {}",
85 MAX_RECONNECTION_ATTEMPTS, err
86 )));
87 }
88 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
89 continue;
90 }
91 };
92
93 let filters = self.filters.clone();
94 let sender_clone = sender.clone();
95 let id_for_loop = id.clone();
96
97 let (mut block_stream, _block_unsub) = match client
98 .block_subscribe(filters.block_filter, filters.block_subscribe_config)
99 .await
100 {
101 Ok(subscription) => subscription,
102 Err(err) => {
103 log::error!("Failed to subscribe to block updates: {:?}", err);
104 reconnection_attempts += 1;
105 if reconnection_attempts > MAX_RECONNECTION_ATTEMPTS {
106 return Err(carbon_core::error::Error::Custom(format!(
107 "Failed to subscribe after {} attempts: {}",
108 MAX_RECONNECTION_ATTEMPTS, err
109 )));
110 }
111 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
112 continue;
113 }
114 };
115
116 reconnection_attempts = 0;
117
118 loop {
119 tokio::select! {
120 _ = cancellation_token.cancelled() => {
121 log::info!("Cancellation requested, stopping subscription...");
122 return Ok(());
123 }
124 block_event = block_stream.next() => {
125 match block_event {
126 Some(tx_event) => {
127 let slot = tx_event.context.slot;
128
129 if let Some(block) = tx_event.value.block {
130 let block_start_time = std::time::Instant::now();
131 let block_hash = Hash::from_str(&block.blockhash).ok();
132 let previous_block_hash = Hash::from_str(&block.previous_blockhash).ok();
133
134 let block_deteils = Update::BlockDetails( BlockDetails {
135 slot,
136 block_hash,
137 previous_block_hash,
138 rewards: block.rewards,
139 num_reward_partitions: block.num_reward_partitions,
140 block_time: block.block_time,
141 block_height: block.block_height,
142 });
143
144 if let Err(err) = sender_clone.try_send((block_deteils, id_for_loop.clone())) {
145 log::error!("Error sending block details: {:?}", err);
146 break;
147 }
148
149 if let Some(transactions) = block.transactions {
150 for encoded_transaction_with_status_meta in transactions {
151 let start_time = std::time::Instant::now();
152
153 let meta_original = if let Some(meta) = encoded_transaction_with_status_meta.clone().meta {
154 meta
155 } else {
156 continue;
157 };
158
159 if meta_original.status.is_err() {
160 continue;
161 }
162
163 let Some(decoded_transaction) = encoded_transaction_with_status_meta.transaction.decode() else {
164 log::error!("Failed to decode transaction: {:?}", encoded_transaction_with_status_meta);
165 continue;
166 };
167
168 let Ok(meta_needed) = transaction_metadata_from_original_meta(meta_original) else {
169 log::error!("Error getting metadata from transaction original meta.");
170 continue;
171 };
172
173 let update = Update::Transaction(Box::new(TransactionUpdate {
174 signature: *decoded_transaction.get_signature(),
175 transaction: decoded_transaction.clone(),
176 meta: meta_needed,
177 is_vote: false,
178 slot,
179 block_time: block.block_time,
180 block_hash,
181 }));
182
183 metrics
184 .record_histogram(
185 "block_subscribe_transaction_process_time_nanoseconds",
186 start_time.elapsed().as_nanos() as f64
187 )
188 .await
189 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
190
191 metrics.increment_counter("block_subscribe_transactions_processed", 1)
192 .await
193 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
194
195 if let Err(err) = sender_clone.try_send((update, id_for_loop.clone())) {
196 log::error!("Error sending transaction update: {:?}", err);
197 break;
198 }
199 }
200 }
201
202 metrics
203 .record_histogram(
204 "block_subscribe_block_process_time_nanoseconds",
205 block_start_time.elapsed().as_nanos() as f64
206 )
207 .await
208 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
209
210 metrics.increment_counter("block_subscribe_blocks_received", 1)
211 .await
212 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
213 }
214 }
215 None => {
216 log::warn!("Block stream has been closed, attempting to reconnect...");
217 break;
218 }
219 }
220 }
221 }
222 }
223
224 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
225 }
226
227 Ok(())
228 }
229
230 fn update_types(&self) -> Vec<UpdateType> {
231 vec![UpdateType::Transaction]
232 }
233}