carbon_rpc_program_subscribe_datasource/
lib.rs

1use {
2    async_trait::async_trait,
3    carbon_core::{
4        datasource::{AccountUpdate, Datasource, Update, UpdateType},
5        error::CarbonResult,
6        metrics::MetricsCollection,
7    },
8    futures::StreamExt,
9    solana_client::{
10        nonblocking::pubsub_client::PubsubClient, rpc_config::RpcProgramAccountsConfig,
11    },
12    solana_sdk::{account::Account, pubkey::Pubkey},
13    std::{str::FromStr, sync::Arc, time::Duration},
14    tokio::sync::mpsc::UnboundedSender,
15    tokio_util::sync::CancellationToken,
16};
17
/// Number of consecutive failed connection or subscription attempts after
/// which `consume` gives up and returns an error.
const MAX_RECONNECTION_ATTEMPTS: u32 = 10;
/// Pause between two consecutive (re)connection attempts, in milliseconds.
const RECONNECTION_DELAY_MS: u64 = 3_000;
20
21#[derive(Debug, Clone)]
22pub struct Filters {
23    pub pubkey: Pubkey,
24    pub program_subscribe_config: Option<RpcProgramAccountsConfig>,
25}
26
27impl Filters {
28    pub fn new(pubkey: Pubkey, program_subscribe_config: Option<RpcProgramAccountsConfig>) -> Self {
29        Filters {
30            pubkey,
31            program_subscribe_config,
32        }
33    }
34}
35
36pub struct RpcProgramSubscribe {
37    pub rpc_ws_url: String,
38    pub filters: Filters,
39}
40
41impl RpcProgramSubscribe {
42    pub fn new(rpc_ws_url: String, filters: Filters) -> Self {
43        Self {
44            rpc_ws_url,
45            filters,
46        }
47    }
48}
49
50#[async_trait]
51impl Datasource for RpcProgramSubscribe {
52    async fn consume(
53        &self,
54        sender: &UnboundedSender<Update>,
55        cancellation_token: CancellationToken,
56        metrics: Arc<MetricsCollection>,
57    ) -> CarbonResult<()> {
58        let mut reconnection_attempts = 0;
59
60        loop {
61            if cancellation_token.is_cancelled() {
62                log::info!("Cancellation requested, stopping reconnection attempts");
63                break;
64            }
65
66            let client = match PubsubClient::new(&self.rpc_ws_url).await {
67                Ok(client) => client,
68                Err(err) => {
69                    log::error!("Failed to create RPC subscribe client: {}", err);
70                    reconnection_attempts += 1;
71                    if reconnection_attempts >= MAX_RECONNECTION_ATTEMPTS {
72                        return Err(carbon_core::error::Error::Custom(format!(
73                            "Failed to create RPC subscribe client after {} attempts: {}",
74                            MAX_RECONNECTION_ATTEMPTS, err
75                        )));
76                    }
77                    tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
78                    continue;
79                }
80            };
81
82            let filters = self.filters.clone();
83            let sender_clone = sender.clone();
84
85            let (mut program_stream, _program_unsub) = match client
86                .program_subscribe(&filters.pubkey, filters.program_subscribe_config)
87                .await
88            {
89                Ok(subscription) => subscription,
90                Err(err) => {
91                    log::error!("Failed to subscribe to program updates: {:?}", err);
92                    reconnection_attempts += 1;
93                    if reconnection_attempts > MAX_RECONNECTION_ATTEMPTS {
94                        return Err(carbon_core::error::Error::Custom(format!(
95                            "Failed to subscribe after {} attempts: {}",
96                            MAX_RECONNECTION_ATTEMPTS, err
97                        )));
98                    }
99                    tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
100                    continue;
101                }
102            };
103
104            reconnection_attempts = 0;
105
106            loop {
107                tokio::select! {
108                    _ = cancellation_token.cancelled() => {
109                        log::info!("Cancellation requested, stopping subscription...");
110                        return Ok(());
111                    }
112                    event_result = program_stream.next() => {
113                        match event_result {
114                            Some(acc_event) => {
115                                let start_time = std::time::Instant::now();
116                                let decoded_account: Account = match acc_event.value.account.decode() {
117                                    Some(account_data) => account_data,
118                                    None => {
119                                        log::error!("Error decoding account event");
120                                        continue;
121                                    }
122                                };
123
124                                let Ok(account_pubkey) = Pubkey::from_str(&acc_event.value.pubkey) else {
125                                    log::error!("Error parsing account pubkey. Value: {}", &acc_event.value.pubkey);
126                                    continue;
127                                };
128
129                                let update = Update::Account(AccountUpdate {
130                                    pubkey: account_pubkey,
131                                    account: decoded_account,
132                                    slot: acc_event.context.slot,
133                                });
134
135                                metrics
136                                    .record_histogram(
137                                        "program_subscribe_account_process_time_nanoseconds",
138                                        start_time.elapsed().as_nanos() as f64
139                                    )
140                                    .await
141                                    .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
142
143                                metrics.increment_counter("program_subscribe_accounts_processed", 1)
144                                    .await
145                                    .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
146
147                                if let Err(err) = sender_clone.send(update) {
148                                    log::error!("Error sending account update: {:?}", err);
149                                    break;
150                                }
151                            }
152                            None => {
153                                log::warn!("Program accounts stream has been closed, attempting to reconnect...");
154                                break;
155                            }
156                        }
157                    }
158                }
159            }
160
161            tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
162        }
163
164        Ok(())
165    }
166
167    fn update_types(&self) -> Vec<UpdateType> {
168        vec![UpdateType::AccountUpdate]
169    }
170}