carbon_rpc_block_subscribe_datasource/
lib.rs1use {
2 async_trait::async_trait,
3 carbon_core::{
4 datasource::{Datasource, TransactionUpdate, Update, UpdateType},
5 error::CarbonResult,
6 metrics::MetricsCollection,
7 transformers::transaction_metadata_from_original_meta,
8 },
9 core::time::Duration,
10 futures::StreamExt,
11 solana_client::{
12 nonblocking::pubsub_client::PubsubClient,
13 rpc_client::SerializableTransaction,
14 rpc_config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter},
15 },
16 std::sync::Arc,
17 tokio::sync::mpsc::Sender,
18 tokio_util::sync::CancellationToken,
19};
20
21const MAX_RECONNECTION_ATTEMPTS: u32 = 10;
22const RECONNECTION_DELAY_MS: u64 = 3000;
23
24#[derive(Debug, Clone)]
25pub struct Filters {
26 pub block_filter: RpcBlockSubscribeFilter,
27 pub block_subscribe_config: Option<RpcBlockSubscribeConfig>,
28}
29
30impl Filters {
31 pub const fn new(
32 block_filter: RpcBlockSubscribeFilter,
33 block_subscribe_config: Option<RpcBlockSubscribeConfig>,
34 ) -> Self {
35 Filters {
36 block_filter,
37 block_subscribe_config,
38 }
39 }
40}
41
42pub struct RpcBlockSubscribe {
43 pub rpc_ws_url: String,
44 pub filters: Filters,
45}
46
47impl RpcBlockSubscribe {
48 pub const fn new(rpc_ws_url: String, filters: Filters) -> Self {
49 Self {
50 rpc_ws_url,
51 filters,
52 }
53 }
54}
55
56#[async_trait]
57impl Datasource for RpcBlockSubscribe {
58 async fn consume(
59 &self,
60 sender: &Sender<Update>,
61 cancellation_token: CancellationToken,
62 metrics: Arc<MetricsCollection>,
63 ) -> CarbonResult<()> {
64 let mut reconnection_attempts = 0;
65
66 loop {
67 if cancellation_token.is_cancelled() {
68 log::info!("Cancellation requested, stopping reconnection attempts");
69 break;
70 }
71
72 let client = match PubsubClient::new(&self.rpc_ws_url).await {
73 Ok(client) => client,
74 Err(err) => {
75 log::error!("Failed to create RPC subscribe client: {}", err);
76 reconnection_attempts += 1;
77 if reconnection_attempts >= MAX_RECONNECTION_ATTEMPTS {
78 return Err(carbon_core::error::Error::Custom(format!(
79 "Failed to create RPC subscribe client after {} attempts: {}",
80 MAX_RECONNECTION_ATTEMPTS, err
81 )));
82 }
83 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
84 continue;
85 }
86 };
87
88 let filters = self.filters.clone();
89 let sender_clone = sender.clone();
90
91 let (mut block_stream, _block_unsub) = match client
92 .block_subscribe(filters.block_filter, filters.block_subscribe_config)
93 .await
94 {
95 Ok(subscription) => subscription,
96 Err(err) => {
97 log::error!("Failed to subscribe to block updates: {:?}", err);
98 reconnection_attempts += 1;
99 if reconnection_attempts > MAX_RECONNECTION_ATTEMPTS {
100 return Err(carbon_core::error::Error::Custom(format!(
101 "Failed to subscribe after {} attempts: {}",
102 MAX_RECONNECTION_ATTEMPTS, err
103 )));
104 }
105 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
106 continue;
107 }
108 };
109
110 reconnection_attempts = 0;
111
112 loop {
113 tokio::select! {
114 _ = cancellation_token.cancelled() => {
115 log::info!("Cancellation requested, stopping subscription...");
116 return Ok(());
117 }
118 block_event = block_stream.next() => {
119 match block_event {
120 Some(tx_event) => {
121 let slot = tx_event.context.slot;
122
123 if let Some(block) = tx_event.value.block {
124 let block_start_time = std::time::Instant::now();
125 if let Some(transactions) = block.transactions {
126 for encoded_transaction_with_status_meta in transactions {
127 let start_time = std::time::Instant::now();
128
129 let meta_original = if let Some(meta) = encoded_transaction_with_status_meta.clone().meta {
130 meta
131 } else {
132 continue;
133 };
134
135 if meta_original.status.is_err() {
136 continue;
137 }
138
139 let Some(decoded_transaction) = encoded_transaction_with_status_meta.transaction.decode() else {
140 log::error!("Failed to decode transaction: {:?}", encoded_transaction_with_status_meta);
141 continue;
142 };
143
144 let Ok(meta_needed) = transaction_metadata_from_original_meta(meta_original) else {
145 log::error!("Error getting metadata from transaction original meta.");
146 continue;
147 };
148
149 let update = Update::Transaction(Box::new(TransactionUpdate {
150 signature: *decoded_transaction.get_signature(),
151 transaction: decoded_transaction.clone(),
152 meta: meta_needed,
153 is_vote: false,
154 slot,
155 block_time: block.block_time,
156 }));
157
158 metrics
159 .record_histogram(
160 "block_subscribe_transaction_process_time_nanoseconds",
161 start_time.elapsed().as_nanos() as f64
162 )
163 .await
164 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
165
166 metrics.increment_counter("block_subscribe_transactions_processed", 1)
167 .await
168 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
169
170 if let Err(err) = sender_clone.try_send(update) {
171 log::error!("Error sending transaction update: {:?}", err);
172 break;
173 }
174 }
175 }
176
177 metrics
178 .record_histogram(
179 "block_subscribe_block_process_time_nanoseconds",
180 block_start_time.elapsed().as_nanos() as f64
181 )
182 .await
183 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
184
185 metrics.increment_counter("block_subscribe_blocks_received", 1)
186 .await
187 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
188 }
189 }
190 None => {
191 log::warn!("Block stream has been closed, attempting to reconnect...");
192 break;
193 }
194 }
195 }
196 }
197 }
198
199 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
200 }
201
202 Ok(())
203 }
204
205 fn update_types(&self) -> Vec<UpdateType> {
206 vec![UpdateType::Transaction]
207 }
208}