carbon_rpc_program_subscribe_datasource/
lib.rs

1use {
2    async_trait::async_trait,
3    carbon_core::{
4        datasource::{AccountUpdate, Datasource, DatasourceId, Update, UpdateType},
5        error::CarbonResult,
6        metrics::MetricsCollection,
7    },
8    futures::StreamExt,
9    solana_account::Account,
10    solana_client::{
11        nonblocking::pubsub_client::PubsubClient, rpc_config::RpcProgramAccountsConfig,
12    },
13    solana_pubkey::Pubkey,
14    std::{str::FromStr, sync::Arc, time::Duration},
15    tokio::sync::mpsc::Sender,
16    tokio_util::sync::CancellationToken,
17};
18
19const MAX_RECONNECTION_ATTEMPTS: u32 = 10;
20const RECONNECTION_DELAY_MS: u64 = 3000;
21
22#[derive(Debug, Clone)]
23pub struct Filters {
24    pub pubkey: Pubkey,
25    pub program_subscribe_config: Option<RpcProgramAccountsConfig>,
26}
27
28impl Filters {
29    pub const fn new(
30        pubkey: Pubkey,
31        program_subscribe_config: Option<RpcProgramAccountsConfig>,
32    ) -> Self {
33        Filters {
34            pubkey,
35            program_subscribe_config,
36        }
37    }
38}
39
40pub struct RpcProgramSubscribe {
41    pub rpc_ws_url: String,
42    pub filters: Filters,
43}
44
45impl RpcProgramSubscribe {
46    pub const fn new(rpc_ws_url: String, filters: Filters) -> Self {
47        Self {
48            rpc_ws_url,
49            filters,
50        }
51    }
52}
53
54#[async_trait]
55impl Datasource for RpcProgramSubscribe {
56    async fn consume(
57        &self,
58        id: DatasourceId,
59        sender: Sender<(Update, DatasourceId)>,
60        cancellation_token: CancellationToken,
61        metrics: Arc<MetricsCollection>,
62    ) -> CarbonResult<()> {
63        let mut reconnection_attempts = 0;
64
65        loop {
66            if cancellation_token.is_cancelled() {
67                log::info!("Cancellation requested, stopping reconnection attempts");
68                break;
69            }
70
71            let client = match PubsubClient::new(&self.rpc_ws_url).await {
72                Ok(client) => client,
73                Err(err) => {
74                    log::error!("Failed to create RPC subscribe client: {}", err);
75                    reconnection_attempts += 1;
76                    if reconnection_attempts >= MAX_RECONNECTION_ATTEMPTS {
77                        return Err(carbon_core::error::Error::Custom(format!(
78                            "Failed to create RPC subscribe client after {} attempts: {}",
79                            MAX_RECONNECTION_ATTEMPTS, err
80                        )));
81                    }
82                    tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
83                    continue;
84                }
85            };
86
87            let filters = self.filters.clone();
88            let sender_clone = sender.clone();
89            let id_for_loop = id.clone();
90
91            let (mut program_stream, _program_unsub) = match client
92                .program_subscribe(&filters.pubkey, filters.program_subscribe_config)
93                .await
94            {
95                Ok(subscription) => subscription,
96                Err(err) => {
97                    log::error!("Failed to subscribe to program updates: {:?}", err);
98                    reconnection_attempts += 1;
99                    if reconnection_attempts > MAX_RECONNECTION_ATTEMPTS {
100                        return Err(carbon_core::error::Error::Custom(format!(
101                            "Failed to subscribe after {} attempts: {}",
102                            MAX_RECONNECTION_ATTEMPTS, err
103                        )));
104                    }
105                    tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
106                    continue;
107                }
108            };
109
110            reconnection_attempts = 0;
111
112            loop {
113                tokio::select! {
114                    _ = cancellation_token.cancelled() => {
115                        log::info!("Cancellation requested, stopping subscription...");
116                        return Ok(());
117                    }
118                    event_result = program_stream.next() => {
119                        match event_result {
120                            Some(acc_event) => {
121                                let start_time = std::time::Instant::now();
122                                let decoded_account: Account = match acc_event.value.account.decode() {
123                                    Some(account_data) => account_data,
124                                    None => {
125                                        log::error!("Error decoding account event");
126                                        continue;
127                                    }
128                                };
129
130                                let Ok(account_pubkey) = Pubkey::from_str(&acc_event.value.pubkey) else {
131                                    log::error!("Error parsing account pubkey. Value: {}", &acc_event.value.pubkey);
132                                    continue;
133                                };
134
135                                let update = Update::Account(AccountUpdate {
136                                    pubkey: account_pubkey,
137                                    account: decoded_account,
138                                    slot: acc_event.context.slot,
139                                    transaction_signature: None,
140                                });
141
142                                metrics
143                                    .record_histogram(
144                                        "program_subscribe_account_process_time_nanoseconds",
145                                        start_time.elapsed().as_nanos() as f64
146                                    )
147                                    .await
148                                    .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
149
150                                metrics.increment_counter("program_subscribe_accounts_processed", 1)
151                                    .await
152                                    .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
153
154                                if let Err(err) = sender_clone.try_send((update, id_for_loop.clone())) {
155                                    log::error!("Error sending account update: {:?}", err);
156                                    break;
157                                }
158                            }
159                            None => {
160                                log::warn!("Program accounts stream has been closed, attempting to reconnect...");
161                                break;
162                            }
163                        }
164                    }
165                }
166            }
167
168            tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
169        }
170
171        Ok(())
172    }
173
174    fn update_types(&self) -> Vec<UpdateType> {
175        vec![UpdateType::AccountUpdate]
176    }
177}