carbon_rpc_program_subscribe_datasource/
lib.rs1use async_trait::async_trait;
2use carbon_core::{
3 datasource::{AccountUpdate, Datasource, Update, UpdateType},
4 error::CarbonResult,
5 metrics::MetricsCollection,
6};
7use futures::StreamExt;
8use solana_client::{
9 nonblocking::pubsub_client::PubsubClient, rpc_config::RpcProgramAccountsConfig,
10};
11use solana_sdk::{account::Account, pubkey::Pubkey};
12use std::{str::FromStr, sync::Arc, time::Duration};
13use tokio::sync::mpsc::UnboundedSender;
14use tokio_util::sync::CancellationToken;
15
/// Number of consecutive failed connection/subscription attempts after which
/// the datasource gives up and returns an error.
const MAX_RECONNECTION_ATTEMPTS: u32 = 10;

/// Pause, in milliseconds, between two reconnection attempts.
const RECONNECTION_DELAY_MS: u64 = 3_000;
18
19#[derive(Debug, Clone)]
20pub struct Filters {
21 pub pubkey: Pubkey,
22 pub program_subscribe_config: Option<RpcProgramAccountsConfig>,
23}
24
25impl Filters {
26 pub fn new(pubkey: Pubkey, program_subscribe_config: Option<RpcProgramAccountsConfig>) -> Self {
27 Filters {
28 pubkey,
29 program_subscribe_config,
30 }
31 }
32}
33
34pub struct RpcProgramSubscribe {
35 pub rpc_ws_url: String,
36 pub filters: Filters,
37}
38
39impl RpcProgramSubscribe {
40 pub fn new(rpc_ws_url: String, filters: Filters) -> Self {
41 Self {
42 rpc_ws_url,
43 filters,
44 }
45 }
46}
47
48#[async_trait]
49impl Datasource for RpcProgramSubscribe {
50 async fn consume(
51 &self,
52 sender: &UnboundedSender<Update>,
53 cancellation_token: CancellationToken,
54 metrics: Arc<MetricsCollection>,
55 ) -> CarbonResult<()> {
56 let mut reconnection_attempts = 0;
57
58 loop {
59 if cancellation_token.is_cancelled() {
60 log::info!("Cancellation requested, stopping reconnection attempts");
61 break;
62 }
63
64 let client = match PubsubClient::new(&self.rpc_ws_url).await {
65 Ok(client) => client,
66 Err(err) => {
67 log::error!("Failed to create RPC subscribe client: {}", err);
68 reconnection_attempts += 1;
69 if reconnection_attempts >= MAX_RECONNECTION_ATTEMPTS {
70 return Err(carbon_core::error::Error::Custom(format!(
71 "Failed to create RPC subscribe client after {} attempts: {}",
72 MAX_RECONNECTION_ATTEMPTS, err
73 )));
74 }
75 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
76 continue;
77 }
78 };
79
80 let filters = self.filters.clone();
81 let sender_clone = sender.clone();
82
83 let (mut program_stream, _program_unsub) = match client
84 .program_subscribe(&filters.pubkey, filters.program_subscribe_config)
85 .await
86 {
87 Ok(subscription) => subscription,
88 Err(err) => {
89 log::error!("Failed to subscribe to program updates: {:?}", err);
90 reconnection_attempts += 1;
91 if reconnection_attempts > MAX_RECONNECTION_ATTEMPTS {
92 return Err(carbon_core::error::Error::Custom(format!(
93 "Failed to subscribe after {} attempts: {}",
94 MAX_RECONNECTION_ATTEMPTS, err
95 )));
96 }
97 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
98 continue;
99 }
100 };
101
102 reconnection_attempts = 0;
103
104 loop {
105 tokio::select! {
106 _ = cancellation_token.cancelled() => {
107 log::info!("Cancellation requested, stopping subscription...");
108 return Ok(());
109 }
110 event_result = program_stream.next() => {
111 match event_result {
112 Some(acc_event) => {
113 let start_time = std::time::Instant::now();
114 let decoded_account: Account = match acc_event.value.account.decode() {
115 Some(account_data) => account_data,
116 None => {
117 log::error!("Error decoding account event");
118 continue;
119 }
120 };
121
122 let Ok(account_pubkey) = Pubkey::from_str(&acc_event.value.pubkey) else {
123 log::error!("Error parsing account pubkey. Value: {}", &acc_event.value.pubkey);
124 continue;
125 };
126
127 let update = Update::Account(AccountUpdate {
128 pubkey: account_pubkey,
129 account: decoded_account,
130 slot: acc_event.context.slot,
131 });
132
133 metrics
134 .record_histogram(
135 "program_subscribe_account_process_time_nanoseconds",
136 start_time.elapsed().as_nanos() as f64
137 )
138 .await
139 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
140
141 metrics.increment_counter("program_subscribe_accounts_processed", 1)
142 .await
143 .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
144
145 if let Err(err) = sender_clone.send(update) {
146 log::error!("Error sending account update: {:?}", err);
147 break;
148 }
149 }
150 None => {
151 log::warn!("Program accounts stream has been closed, attempting to reconnect...");
152 break;
153 }
154 }
155 }
156 }
157 }
158
159 tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
160 }
161
162 Ok(())
163 }
164
165 fn update_types(&self) -> Vec<UpdateType> {
166 vec![UpdateType::AccountUpdate]
167 }
168}