carbon_rpc_program_subscribe_datasource/
lib.rs1use {
2 async_trait::async_trait,
3 carbon_core::{
4 datasource::{AccountUpdate, Datasource, Update, UpdateType},
5 error::CarbonResult,
6 metrics::MetricsCollection,
7 },
8 futures::StreamExt,
9 solana_account::Account,
10 solana_client::{
11 nonblocking::pubsub_client::PubsubClient, rpc_config::RpcProgramAccountsConfig,
12 },
13 solana_pubkey::Pubkey,
14 std::{str::FromStr, sync::Arc, time::Duration},
15 tokio::sync::mpsc::Sender,
16 tokio_util::sync::CancellationToken,
17};
18
/// Number of consecutive failed connection/subscription attempts tolerated
/// before `consume` gives up and returns an error.
const MAX_RECONNECTION_ATTEMPTS: u32 = 10;

/// Pause, in milliseconds, inserted before each reconnection attempt.
const RECONNECTION_DELAY_MS: u64 = 3_000;
21
22#[derive(Debug, Clone)]
23pub struct Filters {
24 pub pubkey: Pubkey,
25 pub program_subscribe_config: Option<RpcProgramAccountsConfig>,
26}
27
28impl Filters {
29 pub const fn new(
30 pubkey: Pubkey,
31 program_subscribe_config: Option<RpcProgramAccountsConfig>,
32 ) -> Self {
33 Filters {
34 pubkey,
35 program_subscribe_config,
36 }
37 }
38}
39
40pub struct RpcProgramSubscribe {
41 pub rpc_ws_url: String,
42 pub filters: Filters,
43}
44
45impl RpcProgramSubscribe {
46 pub const fn new(rpc_ws_url: String, filters: Filters) -> Self {
47 Self {
48 rpc_ws_url,
49 filters,
50 }
51 }
52}
53
54#[async_trait]
55impl Datasource for RpcProgramSubscribe {
56 async fn consume(
57 &self,
58 sender: &Sender<Update>,
59 cancellation_token: CancellationToken,
60 metrics: Arc<MetricsCollection>,
61 ) -> CarbonResult<()> {
62 let mut reconnection_attempts = 0;
63
64 loop {
65 if cancellation_token.is_cancelled() {
66 log::info!("Cancellation requested, stopping reconnection attempts");
67 break;
68 }
69
70 let client = match PubsubClient::new(&self.rpc_ws_url).await {
71 Ok(client) => client,
72 Err(err) => {
73 log::error!("Failed to create RPC subscribe client: {}", err);
74 reconnection_attempts += 1;
75 if reconnection_attempts >= MAX_RECONNECTION_ATTEMPTS {
76 return Err(carbon_core::error::Error::Custom(format!(
77 "Failed to create RPC subscribe client after {} attempts: {}",
78 MAX_RECONNECTION_ATTEMPTS, err
79 )));
80 }
81 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
82 continue;
83 }
84 };
85
86 let filters = self.filters.clone();
87 let sender_clone = sender.clone();
88
89 let (mut program_stream, _program_unsub) = match client
90 .program_subscribe(&filters.pubkey, filters.program_subscribe_config)
91 .await
92 {
93 Ok(subscription) => subscription,
94 Err(err) => {
95 log::error!("Failed to subscribe to program updates: {:?}", err);
96 reconnection_attempts += 1;
97 if reconnection_attempts > MAX_RECONNECTION_ATTEMPTS {
98 return Err(carbon_core::error::Error::Custom(format!(
99 "Failed to subscribe after {} attempts: {}",
100 MAX_RECONNECTION_ATTEMPTS, err
101 )));
102 }
103 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
104 continue;
105 }
106 };
107
108 reconnection_attempts = 0;
109
110 loop {
111 tokio::select! {
112 _ = cancellation_token.cancelled() => {
113 log::info!("Cancellation requested, stopping subscription...");
114 return Ok(());
115 }
116 event_result = program_stream.next() => {
117 match event_result {
118 Some(acc_event) => {
119 let start_time = std::time::Instant::now();
120 let decoded_account: Account = match acc_event.value.account.decode() {
121 Some(account_data) => account_data,
122 None => {
123 log::error!("Error decoding account event");
124 continue;
125 }
126 };
127
128 let Ok(account_pubkey) = Pubkey::from_str(&acc_event.value.pubkey) else {
129 log::error!("Error parsing account pubkey. Value: {}", &acc_event.value.pubkey);
130 continue;
131 };
132
133 let update = Update::Account(AccountUpdate {
134 pubkey: account_pubkey,
135 account: decoded_account,
136 slot: acc_event.context.slot,
137 });
138
139 metrics
140 .record_histogram(
141 "program_subscribe_account_process_time_nanoseconds",
142 start_time.elapsed().as_nanos() as f64
143 )
144 .await
145 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
146
147 metrics.increment_counter("program_subscribe_accounts_processed", 1)
148 .await
149 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
150
151 if let Err(err) = sender_clone.try_send(update) {
152 log::error!("Error sending account update: {:?}", err);
153 break;
154 }
155 }
156 None => {
157 log::warn!("Program accounts stream has been closed, attempting to reconnect...");
158 break;
159 }
160 }
161 }
162 }
163 }
164
165 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
166 }
167
168 Ok(())
169 }
170
171 fn update_types(&self) -> Vec<UpdateType> {
172 vec![UpdateType::AccountUpdate]
173 }
174}