carbon_rpc_program_subscribe_datasource/
lib.rs1use {
2 async_trait::async_trait,
3 carbon_core::{
4 datasource::{AccountUpdate, Datasource, Update, UpdateType},
5 error::CarbonResult,
6 metrics::MetricsCollection,
7 },
8 futures::StreamExt,
9 solana_client::{
10 nonblocking::pubsub_client::PubsubClient, rpc_config::RpcProgramAccountsConfig,
11 },
12 solana_sdk::{account::Account, pubkey::Pubkey},
13 std::{str::FromStr, sync::Arc, time::Duration},
14 tokio::sync::mpsc::UnboundedSender,
15 tokio_util::sync::CancellationToken,
16};
17
/// Number of consecutive failed connection/subscription attempts after which
/// the datasource stops retrying and returns an error.
const MAX_RECONNECTION_ATTEMPTS: u32 = 10;

/// Pause, in milliseconds, inserted before each reconnection attempt.
const RECONNECTION_DELAY_MS: u64 = 3_000;
20
21#[derive(Debug, Clone)]
22pub struct Filters {
23 pub pubkey: Pubkey,
24 pub program_subscribe_config: Option<RpcProgramAccountsConfig>,
25}
26
27impl Filters {
28 pub fn new(pubkey: Pubkey, program_subscribe_config: Option<RpcProgramAccountsConfig>) -> Self {
29 Filters {
30 pubkey,
31 program_subscribe_config,
32 }
33 }
34}
35
36pub struct RpcProgramSubscribe {
37 pub rpc_ws_url: String,
38 pub filters: Filters,
39}
40
41impl RpcProgramSubscribe {
42 pub fn new(rpc_ws_url: String, filters: Filters) -> Self {
43 Self {
44 rpc_ws_url,
45 filters,
46 }
47 }
48}
49
50#[async_trait]
51impl Datasource for RpcProgramSubscribe {
52 async fn consume(
53 &self,
54 sender: &UnboundedSender<Update>,
55 cancellation_token: CancellationToken,
56 metrics: Arc<MetricsCollection>,
57 ) -> CarbonResult<()> {
58 let mut reconnection_attempts = 0;
59
60 loop {
61 if cancellation_token.is_cancelled() {
62 log::info!("Cancellation requested, stopping reconnection attempts");
63 break;
64 }
65
66 let client = match PubsubClient::new(&self.rpc_ws_url).await {
67 Ok(client) => client,
68 Err(err) => {
69 log::error!("Failed to create RPC subscribe client: {}", err);
70 reconnection_attempts += 1;
71 if reconnection_attempts >= MAX_RECONNECTION_ATTEMPTS {
72 return Err(carbon_core::error::Error::Custom(format!(
73 "Failed to create RPC subscribe client after {} attempts: {}",
74 MAX_RECONNECTION_ATTEMPTS, err
75 )));
76 }
77 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
78 continue;
79 }
80 };
81
82 let filters = self.filters.clone();
83 let sender_clone = sender.clone();
84
85 let (mut program_stream, _program_unsub) = match client
86 .program_subscribe(&filters.pubkey, filters.program_subscribe_config)
87 .await
88 {
89 Ok(subscription) => subscription,
90 Err(err) => {
91 log::error!("Failed to subscribe to program updates: {:?}", err);
92 reconnection_attempts += 1;
93 if reconnection_attempts > MAX_RECONNECTION_ATTEMPTS {
94 return Err(carbon_core::error::Error::Custom(format!(
95 "Failed to subscribe after {} attempts: {}",
96 MAX_RECONNECTION_ATTEMPTS, err
97 )));
98 }
99 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
100 continue;
101 }
102 };
103
104 reconnection_attempts = 0;
105
106 loop {
107 tokio::select! {
108 _ = cancellation_token.cancelled() => {
109 log::info!("Cancellation requested, stopping subscription...");
110 return Ok(());
111 }
112 event_result = program_stream.next() => {
113 match event_result {
114 Some(acc_event) => {
115 let start_time = std::time::Instant::now();
116 let decoded_account: Account = match acc_event.value.account.decode() {
117 Some(account_data) => account_data,
118 None => {
119 log::error!("Error decoding account event");
120 continue;
121 }
122 };
123
124 let Ok(account_pubkey) = Pubkey::from_str(&acc_event.value.pubkey) else {
125 log::error!("Error parsing account pubkey. Value: {}", &acc_event.value.pubkey);
126 continue;
127 };
128
129 let update = Update::Account(AccountUpdate {
130 pubkey: account_pubkey,
131 account: decoded_account,
132 slot: acc_event.context.slot,
133 });
134
135 metrics
136 .record_histogram(
137 "program_subscribe_account_process_time_nanoseconds",
138 start_time.elapsed().as_nanos() as f64
139 )
140 .await
141 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
142
143 metrics.increment_counter("program_subscribe_accounts_processed", 1)
144 .await
145 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
146
147 if let Err(err) = sender_clone.send(update) {
148 log::error!("Error sending account update: {:?}", err);
149 break;
150 }
151 }
152 None => {
153 log::warn!("Program accounts stream has been closed, attempting to reconnect...");
154 break;
155 }
156 }
157 }
158 }
159 }
160
161 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
162 }
163
164 Ok(())
165 }
166
167 fn update_types(&self) -> Vec<UpdateType> {
168 vec![UpdateType::AccountUpdate]
169 }
170}