carbon-rpc-program-subscribe-datasource 0.12.0

RPC Program Subscribe Datasource
Documentation
use {
    async_trait::async_trait,
    carbon_core::{
        datasource::{AccountUpdate, Datasource, DatasourceId, Update, UpdateType},
        error::CarbonResult,
        metrics::MetricsCollection,
    },
    futures::StreamExt,
    solana_account::Account,
    solana_client::{
        nonblocking::pubsub_client::PubsubClient, rpc_config::RpcProgramAccountsConfig,
    },
    solana_pubkey::Pubkey,
    std::{str::FromStr, sync::Arc, time::Duration},
    tokio::sync::mpsc::Sender,
    tokio_util::sync::CancellationToken,
};

const MAX_RECONNECTION_ATTEMPTS: u32 = 10;
const RECONNECTION_DELAY_MS: u64 = 3000;

#[derive(Debug, Clone)]
pub struct Filters {
    pub pubkey: Pubkey,
    pub program_subscribe_config: Option<RpcProgramAccountsConfig>,
}

impl Filters {
    pub const fn new(
        pubkey: Pubkey,
        program_subscribe_config: Option<RpcProgramAccountsConfig>,
    ) -> Self {
        Filters {
            pubkey,
            program_subscribe_config,
        }
    }
}

pub struct RpcProgramSubscribe {
    pub rpc_ws_url: String,
    pub filters: Filters,
}

impl RpcProgramSubscribe {
    pub const fn new(rpc_ws_url: String, filters: Filters) -> Self {
        Self {
            rpc_ws_url,
            filters,
        }
    }
}

#[async_trait]
impl Datasource for RpcProgramSubscribe {
    async fn consume(
        &self,
        id: DatasourceId,
        sender: Sender<(Update, DatasourceId)>,
        cancellation_token: CancellationToken,
        metrics: Arc<MetricsCollection>,
    ) -> CarbonResult<()> {
        let mut reconnection_attempts = 0;

        loop {
            if cancellation_token.is_cancelled() {
                log::info!("Cancellation requested, stopping reconnection attempts");
                break;
            }

            let client = match PubsubClient::new(&self.rpc_ws_url).await {
                Ok(client) => client,
                Err(err) => {
                    log::error!("Failed to create RPC subscribe client: {}", err);
                    reconnection_attempts += 1;
                    if reconnection_attempts >= MAX_RECONNECTION_ATTEMPTS {
                        return Err(carbon_core::error::Error::Custom(format!(
                            "Failed to create RPC subscribe client after {} attempts: {}",
                            MAX_RECONNECTION_ATTEMPTS, err
                        )));
                    }
                    tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
                    continue;
                }
            };

            let filters = self.filters.clone();
            let sender_clone = sender.clone();
            let id_for_loop = id.clone();

            let (mut program_stream, _program_unsub) = match client
                .program_subscribe(&filters.pubkey, filters.program_subscribe_config)
                .await
            {
                Ok(subscription) => subscription,
                Err(err) => {
                    log::error!("Failed to subscribe to program updates: {:?}", err);
                    reconnection_attempts += 1;
                    if reconnection_attempts > MAX_RECONNECTION_ATTEMPTS {
                        return Err(carbon_core::error::Error::Custom(format!(
                            "Failed to subscribe after {} attempts: {}",
                            MAX_RECONNECTION_ATTEMPTS, err
                        )));
                    }
                    tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
                    continue;
                }
            };

            reconnection_attempts = 0;

            loop {
                tokio::select! {
                    _ = cancellation_token.cancelled() => {
                        log::info!("Cancellation requested, stopping subscription...");
                        return Ok(());
                    }
                    event_result = program_stream.next() => {
                        match event_result {
                            Some(acc_event) => {
                                let start_time = std::time::Instant::now();
                                let decoded_account: Account = match acc_event.value.account.decode() {
                                    Some(account_data) => account_data,
                                    None => {
                                        log::error!("Error decoding account event");
                                        continue;
                                    }
                                };

                                let Ok(account_pubkey) = Pubkey::from_str(&acc_event.value.pubkey) else {
                                    log::error!("Error parsing account pubkey. Value: {}", &acc_event.value.pubkey);
                                    continue;
                                };

                                let update = Update::Account(AccountUpdate {
                                    pubkey: account_pubkey,
                                    account: decoded_account,
                                    slot: acc_event.context.slot,
                                    transaction_signature: None,
                                });

                                metrics
                                    .record_histogram(
                                        "program_subscribe_account_process_time_nanoseconds",
                                        start_time.elapsed().as_nanos() as f64
                                    )
                                    .await
                                    .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));

                                metrics.increment_counter("program_subscribe_accounts_processed", 1)
                                    .await
                                    .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));

                                if let Err(err) = sender_clone.try_send((update, id_for_loop.clone())) {
                                    log::error!("Error sending account update: {:?}", err);
                                    break;
                                }
                            }
                            None => {
                                log::warn!("Program accounts stream has been closed, attempting to reconnect...");
                                break;
                            }
                        }
                    }
                }
            }

            tokio::time::sleep(Duration::from_millis(RECONNECTION_DELAY_MS)).await;
        }

        Ok(())
    }

    fn update_types(&self) -> Vec<UpdateType> {
        vec![UpdateType::AccountUpdate]
    }
}