carbon_rpc_program_subscribe_datasource/
lib.rs1use {
2 async_trait::async_trait,
3 carbon_core::{
4 datasource::{AccountUpdate, Datasource, DatasourceId, Update, UpdateType},
5 error::CarbonResult,
6 metrics::MetricsCollection,
7 },
8 futures::StreamExt,
9 solana_account::Account,
10 solana_client::{
11 nonblocking::pubsub_client::PubsubClient, rpc_config::RpcProgramAccountsConfig,
12 },
13 solana_pubkey::Pubkey,
14 std::{str::FromStr, sync::Arc, time::Duration},
15 tokio::sync::mpsc::Sender,
16 tokio_util::sync::CancellationToken,
17};
18
/// Retry budget used by `consume` when creating the pubsub client or the
/// program subscription keeps failing.
const MAX_RECONNECTION_ATTEMPTS: u32 = 10;
/// Pause, in milliseconds, inserted before each reconnection attempt.
const RECONNECTION_DELAY_MS: u64 = 3_000;
21
22#[derive(Debug, Clone)]
23pub struct Filters {
24 pub pubkey: Pubkey,
25 pub program_subscribe_config: Option<RpcProgramAccountsConfig>,
26}
27
28impl Filters {
29 pub const fn new(
30 pubkey: Pubkey,
31 program_subscribe_config: Option<RpcProgramAccountsConfig>,
32 ) -> Self {
33 Filters {
34 pubkey,
35 program_subscribe_config,
36 }
37 }
38}
39
40pub struct RpcProgramSubscribe {
41 pub rpc_ws_url: String,
42 pub filters: Filters,
43}
44
45impl RpcProgramSubscribe {
46 pub const fn new(rpc_ws_url: String, filters: Filters) -> Self {
47 Self {
48 rpc_ws_url,
49 filters,
50 }
51 }
52}
53
54#[async_trait]
55impl Datasource for RpcProgramSubscribe {
56 async fn consume(
57 &self,
58 id: DatasourceId,
59 sender: Sender<(Update, DatasourceId)>,
60 cancellation_token: CancellationToken,
61 metrics: Arc<MetricsCollection>,
62 ) -> CarbonResult<()> {
63 let mut reconnection_attempts = 0;
64
65 loop {
66 if cancellation_token.is_cancelled() {
67 log::info!("Cancellation requested, stopping reconnection attempts");
68 break;
69 }
70
71 let client = match PubsubClient::new(&self.rpc_ws_url).await {
72 Ok(client) => client,
73 Err(err) => {
74 log::error!("Failed to create RPC subscribe client: {}", err);
75 reconnection_attempts += 1;
76 if reconnection_attempts >= MAX_RECONNECTION_ATTEMPTS {
77 return Err(carbon_core::error::Error::Custom(format!(
78 "Failed to create RPC subscribe client after {} attempts: {}",
79 MAX_RECONNECTION_ATTEMPTS, err
80 )));
81 }
82 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
83 continue;
84 }
85 };
86
87 let filters = self.filters.clone();
88 let sender_clone = sender.clone();
89 let id_for_loop = id.clone();
90
91 let (mut program_stream, _program_unsub) = match client
92 .program_subscribe(&filters.pubkey, filters.program_subscribe_config)
93 .await
94 {
95 Ok(subscription) => subscription,
96 Err(err) => {
97 log::error!("Failed to subscribe to program updates: {:?}", err);
98 reconnection_attempts += 1;
99 if reconnection_attempts > MAX_RECONNECTION_ATTEMPTS {
100 return Err(carbon_core::error::Error::Custom(format!(
101 "Failed to subscribe after {} attempts: {}",
102 MAX_RECONNECTION_ATTEMPTS, err
103 )));
104 }
105 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
106 continue;
107 }
108 };
109
110 reconnection_attempts = 0;
111
112 loop {
113 tokio::select! {
114 _ = cancellation_token.cancelled() => {
115 log::info!("Cancellation requested, stopping subscription...");
116 return Ok(());
117 }
118 event_result = program_stream.next() => {
119 match event_result {
120 Some(acc_event) => {
121 let start_time = std::time::Instant::now();
122 let decoded_account: Account = match acc_event.value.account.decode() {
123 Some(account_data) => account_data,
124 None => {
125 log::error!("Error decoding account event");
126 continue;
127 }
128 };
129
130 let Ok(account_pubkey) = Pubkey::from_str(&acc_event.value.pubkey) else {
131 log::error!("Error parsing account pubkey. Value: {}", &acc_event.value.pubkey);
132 continue;
133 };
134
135 let update = Update::Account(AccountUpdate {
136 pubkey: account_pubkey,
137 account: decoded_account,
138 slot: acc_event.context.slot,
139 transaction_signature: None,
140 });
141
142 metrics
143 .record_histogram(
144 "program_subscribe_account_process_time_nanoseconds",
145 start_time.elapsed().as_nanos() as f64
146 )
147 .await
148 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
149
150 metrics.increment_counter("program_subscribe_accounts_processed", 1)
151 .await
152 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
153
154 if let Err(err) = sender_clone.try_send((update, id_for_loop.clone())) {
155 log::error!("Error sending account update: {:?}", err);
156 break;
157 }
158 }
159 None => {
160 log::warn!("Program accounts stream has been closed, attempting to reconnect...");
161 break;
162 }
163 }
164 }
165 }
166 }
167
168 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
169 }
170
171 Ok(())
172 }
173
174 fn update_types(&self) -> Vec<UpdateType> {
175 vec![UpdateType::AccountUpdate]
176 }
177}