carbon_rpc_program_subscribe_datasource/
lib.rs

1use {
2    async_trait::async_trait,
3    carbon_core::{
4        datasource::{AccountUpdate, Datasource, Update, UpdateType},
5        error::CarbonResult,
6        metrics::MetricsCollection,
7    },
8    futures::StreamExt,
9    solana_account::Account,
10    solana_client::{
11        nonblocking::pubsub_client::PubsubClient, rpc_config::RpcProgramAccountsConfig,
12    },
13    solana_pubkey::Pubkey,
14    std::{str::FromStr, sync::Arc, time::Duration},
15    tokio::sync::mpsc::Sender,
16    tokio_util::sync::CancellationToken,
17};
18
/// Limit on consecutive failed connection/subscription attempts before the
/// datasource stops retrying and returns an error.
const MAX_RECONNECTION_ATTEMPTS: u32 = 10;

/// Pause, in milliseconds, between two connection attempts.
const RECONNECTION_DELAY_MS: u64 = 3_000;
21
22#[derive(Debug, Clone)]
23pub struct Filters {
24    pub pubkey: Pubkey,
25    pub program_subscribe_config: Option<RpcProgramAccountsConfig>,
26}
27
28impl Filters {
29    pub const fn new(
30        pubkey: Pubkey,
31        program_subscribe_config: Option<RpcProgramAccountsConfig>,
32    ) -> Self {
33        Filters {
34            pubkey,
35            program_subscribe_config,
36        }
37    }
38}
39
40pub struct RpcProgramSubscribe {
41    pub rpc_ws_url: String,
42    pub filters: Filters,
43}
44
45impl RpcProgramSubscribe {
46    pub const fn new(rpc_ws_url: String, filters: Filters) -> Self {
47        Self {
48            rpc_ws_url,
49            filters,
50        }
51    }
52}
53
54#[async_trait]
55impl Datasource for RpcProgramSubscribe {
56    async fn consume(
57        &self,
58        sender: &Sender<Update>,
59        cancellation_token: CancellationToken,
60        metrics: Arc<MetricsCollection>,
61    ) -> CarbonResult<()> {
62        let mut reconnection_attempts = 0;
63
64        loop {
65            if cancellation_token.is_cancelled() {
66                log::info!("Cancellation requested, stopping reconnection attempts");
67                break;
68            }
69
70            let client = match PubsubClient::new(&self.rpc_ws_url).await {
71                Ok(client) => client,
72                Err(err) => {
73                    log::error!("Failed to create RPC subscribe client: {}", err);
74                    reconnection_attempts += 1;
75                    if reconnection_attempts >= MAX_RECONNECTION_ATTEMPTS {
76                        return Err(carbon_core::error::Error::Custom(format!(
77                            "Failed to create RPC subscribe client after {} attempts: {}",
78                            MAX_RECONNECTION_ATTEMPTS, err
79                        )));
80                    }
81                    tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
82                    continue;
83                }
84            };
85
86            let filters = self.filters.clone();
87            let sender_clone = sender.clone();
88
89            let (mut program_stream, _program_unsub) = match client
90                .program_subscribe(&filters.pubkey, filters.program_subscribe_config)
91                .await
92            {
93                Ok(subscription) => subscription,
94                Err(err) => {
95                    log::error!("Failed to subscribe to program updates: {:?}", err);
96                    reconnection_attempts += 1;
97                    if reconnection_attempts > MAX_RECONNECTION_ATTEMPTS {
98                        return Err(carbon_core::error::Error::Custom(format!(
99                            "Failed to subscribe after {} attempts: {}",
100                            MAX_RECONNECTION_ATTEMPTS, err
101                        )));
102                    }
103                    tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
104                    continue;
105                }
106            };
107
108            reconnection_attempts = 0;
109
110            loop {
111                tokio::select! {
112                    _ = cancellation_token.cancelled() => {
113                        log::info!("Cancellation requested, stopping subscription...");
114                        return Ok(());
115                    }
116                    event_result = program_stream.next() => {
117                        match event_result {
118                            Some(acc_event) => {
119                                let start_time = std::time::Instant::now();
120                                let decoded_account: Account = match acc_event.value.account.decode() {
121                                    Some(account_data) => account_data,
122                                    None => {
123                                        log::error!("Error decoding account event");
124                                        continue;
125                                    }
126                                };
127
128                                let Ok(account_pubkey) = Pubkey::from_str(&acc_event.value.pubkey) else {
129                                    log::error!("Error parsing account pubkey. Value: {}", &acc_event.value.pubkey);
130                                    continue;
131                                };
132
133                                let update = Update::Account(AccountUpdate {
134                                    pubkey: account_pubkey,
135                                    account: decoded_account,
136                                    slot: acc_event.context.slot,
137                                });
138
139                                metrics
140                                    .record_histogram(
141                                        "program_subscribe_account_process_time_nanoseconds",
142                                        start_time.elapsed().as_nanos() as f64
143                                    )
144                                    .await
145                                    .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
146
147                                metrics.increment_counter("program_subscribe_accounts_processed", 1)
148                                    .await
149                                    .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
150
151                                if let Err(err) = sender_clone.try_send(update) {
152                                    log::error!("Error sending account update: {:?}", err);
153                                    break;
154                                }
155                            }
156                            None => {
157                                log::warn!("Program accounts stream has been closed, attempting to reconnect...");
158                                break;
159                            }
160                        }
161                    }
162                }
163            }
164
165            tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
166        }
167
168        Ok(())
169    }
170
171    fn update_types(&self) -> Vec<UpdateType> {
172        vec![UpdateType::AccountUpdate]
173    }
174}