carbon_rpc_program_subscribe_datasource/
lib.rs

1use async_trait::async_trait;
2use carbon_core::{
3    datasource::{AccountUpdate, Datasource, Update, UpdateType},
4    error::CarbonResult,
5    metrics::MetricsCollection,
6};
7use futures::StreamExt;
8use solana_client::{
9    nonblocking::pubsub_client::PubsubClient, rpc_config::RpcProgramAccountsConfig,
10};
11use solana_sdk::{account::Account, pubkey::Pubkey};
12use std::{str::FromStr, sync::Arc, time::Duration};
13use tokio::sync::mpsc::UnboundedSender;
14use tokio_util::sync::CancellationToken;
15
16const MAX_RECONNECTION_ATTEMPTS: u32 = 10;
17const RECONNECTION_DELAY_MS: u64 = 3000;
18
19#[derive(Debug, Clone)]
20pub struct Filters {
21    pub pubkey: Pubkey,
22    pub program_subscribe_config: Option<RpcProgramAccountsConfig>,
23}
24
25impl Filters {
26    pub fn new(pubkey: Pubkey, program_subscribe_config: Option<RpcProgramAccountsConfig>) -> Self {
27        Filters {
28            pubkey,
29            program_subscribe_config,
30        }
31    }
32}
33
34pub struct RpcProgramSubscribe {
35    pub rpc_ws_url: String,
36    pub filters: Filters,
37}
38
39impl RpcProgramSubscribe {
40    pub fn new(rpc_ws_url: String, filters: Filters) -> Self {
41        Self {
42            rpc_ws_url,
43            filters,
44        }
45    }
46}
47
48#[async_trait]
49impl Datasource for RpcProgramSubscribe {
50    async fn consume(
51        &self,
52        sender: &UnboundedSender<Update>,
53        cancellation_token: CancellationToken,
54        metrics: Arc<MetricsCollection>,
55    ) -> CarbonResult<()> {
56        let mut reconnection_attempts = 0;
57
58        loop {
59            if cancellation_token.is_cancelled() {
60                log::info!("Cancellation requested, stopping reconnection attempts");
61                break;
62            }
63
64            let client = match PubsubClient::new(&self.rpc_ws_url).await {
65                Ok(client) => client,
66                Err(err) => {
67                    log::error!("Failed to create RPC subscribe client: {}", err);
68                    reconnection_attempts += 1;
69                    if reconnection_attempts >= MAX_RECONNECTION_ATTEMPTS {
70                        return Err(carbon_core::error::Error::Custom(format!(
71                            "Failed to create RPC subscribe client after {} attempts: {}",
72                            MAX_RECONNECTION_ATTEMPTS, err
73                        )));
74                    }
75                    tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
76                    continue;
77                }
78            };
79
80            let filters = self.filters.clone();
81            let sender_clone = sender.clone();
82
83            let (mut program_stream, _program_unsub) = match client
84                .program_subscribe(&filters.pubkey, filters.program_subscribe_config)
85                .await
86            {
87                Ok(subscription) => subscription,
88                Err(err) => {
89                    log::error!("Failed to subscribe to program updates: {:?}", err);
90                    reconnection_attempts += 1;
91                    if reconnection_attempts > MAX_RECONNECTION_ATTEMPTS {
92                        return Err(carbon_core::error::Error::Custom(format!(
93                            "Failed to subscribe after {} attempts: {}",
94                            MAX_RECONNECTION_ATTEMPTS, err
95                        )));
96                    }
97                    tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
98                    continue;
99                }
100            };
101
102            reconnection_attempts = 0;
103
104            loop {
105                tokio::select! {
106                    _ = cancellation_token.cancelled() => {
107                        log::info!("Cancellation requested, stopping subscription...");
108                        return Ok(());
109                    }
110                    event_result = program_stream.next() => {
111                        match event_result {
112                            Some(acc_event) => {
113                                let start_time = std::time::Instant::now();
114                                let decoded_account: Account = match acc_event.value.account.decode() {
115                                    Some(account_data) => account_data,
116                                    None => {
117                                        log::error!("Error decoding account event");
118                                        continue;
119                                    }
120                                };
121
122                                let Ok(account_pubkey) = Pubkey::from_str(&acc_event.value.pubkey) else {
123                                    log::error!("Error parsing account pubkey. Value: {}", &acc_event.value.pubkey);
124                                    continue;
125                                };
126
127                                let update = Update::Account(AccountUpdate {
128                                    pubkey: account_pubkey,
129                                    account: decoded_account,
130                                    slot: acc_event.context.slot,
131                                });
132
133                                metrics
134                                    .record_histogram(
135                                        "program_subscribe_account_process_time_nanoseconds",
136                                        start_time.elapsed().as_nanos() as f64
137                                    )
138                                    .await
139                                    .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
140
141                                metrics.increment_counter("program_subscribe_accounts_processed", 1)
142                                    .await
143                                    .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));
144
145                                if let Err(err) = sender_clone.send(update) {
146                                    log::error!("Error sending account update: {:?}", err);
147                                    break;
148                                }
149                            }
150                            None => {
151                                log::warn!("Program accounts stream has been closed, attempting to reconnect...");
152                                break;
153                            }
154                        }
155                    }
156                }
157            }
158
159            tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
160        }
161
162        Ok(())
163    }
164
165    fn update_types(&self) -> Vec<UpdateType> {
166        vec![UpdateType::AccountUpdate]
167    }
168}