carbon_rpc_program_subscribe_datasource/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
use async_trait::async_trait;
use carbon_core::{
    datasource::{AccountUpdate, Datasource, Update, UpdateType},
    error::CarbonResult,
    metrics::MetricsCollection,
};
use futures::StreamExt;
use solana_client::{
    nonblocking::pubsub_client::PubsubClient, rpc_config::RpcProgramAccountsConfig,
};
use solana_sdk::{account::Account, pubkey::Pubkey};
use std::{str::FromStr, sync::Arc};
use tokio::sync::mpsc::UnboundedSender;
use tokio_util::sync::CancellationToken;

#[derive(Debug, Clone)]
pub struct Filters {
    pub pubkey: Pubkey,
    pub program_subscribe_config: Option<RpcProgramAccountsConfig>,
}

impl Filters {
    pub fn new(pubkey: Pubkey, program_subscribe_config: Option<RpcProgramAccountsConfig>) -> Self {
        Filters {
            pubkey,
            program_subscribe_config,
        }
    }
}

pub struct RpcProgramSubscribe {
    pub rpc_ws_url: String,
    pub filters: Filters,
}

impl RpcProgramSubscribe {
    pub fn new(rpc_ws_url: String, filters: Filters) -> Self {
        Self {
            rpc_ws_url,
            filters,
        }
    }
}

#[async_trait]
impl Datasource for RpcProgramSubscribe {
    async fn consume(
        &self,
        sender: &UnboundedSender<Update>,
        cancellation_token: CancellationToken,
        metrics: Arc<MetricsCollection>,
    ) -> CarbonResult<()> {
        let client = PubsubClient::new(&self.rpc_ws_url).await.map_err(|err| {
            carbon_core::error::Error::Custom(format!(
                "Failed to create an RPC subscribe client: {err}"
            ))
        })?;

        let filters = self.filters.clone();
        let sender = sender.clone();

        tokio::spawn(async move {
            let sender_clone = sender.clone();
            let (mut stream, _unsub) = match client
                .program_subscribe(&filters.pubkey, filters.program_subscribe_config)
                .await
            {
                Ok(subscription) => subscription,
                Err(err) => {
                    log::error!("Failed to subscribe to blocks updates: {:?}", err);
                    return;
                }
            };

            loop {
                tokio::select! {
                    _ = cancellation_token.cancelled() => {
                        log::info!("Cancelling RPC program subscription...");
                        break;
                    }
                    event_result = stream.next() => {
                        match event_result {
                            Some(acc_event) => {
                                    let start_time = std::time::Instant::now();
                                    let decoded_account: Account = match acc_event.value.account.decode() {
                                        Some(account_data) => account_data,
                                        None => {
                                            log::error!("Error decoding Helius WS Account event");
                                            continue;
                                        }
                                    };

                                    let Ok(account_pubkey) = Pubkey::from_str(&acc_event.value.pubkey) else {
                                        log::error!("Error parsing account pubkey. Value: {}", &acc_event.value.pubkey);
                                        continue;
                                    };

                                    let update = Update::Account(AccountUpdate {
                                        pubkey: account_pubkey,
                                        account: decoded_account,
                                        slot: acc_event.context.slot,
                                    });

                                    metrics
                                            .record_histogram(
                                                "program_subscribe_account_process_time_nanoseconds",
                                                start_time.elapsed().as_nanos() as f64
                                            )
                                            .await
                                            .unwrap_or_else(|value| log::error!("Error recording metric: {}", value));

                                    metrics.increment_counter("program_subscribe_accounts_processed", 1).await.unwrap_or_else(|value| log::error!("Error recording metric: {}", value));


                                    if let Err(err) = sender_clone.send(update) {
                                        log::error!("Error sending account update: {:?}", err);
                                        break;
                                    }
                            }
                            None => {
                                log::info!("Program accounts stream has been closed");
                                break;
                            }
                        }
                    }
                }
            }
        });

        Ok(())
    }

    fn update_types(&self) -> Vec<UpdateType> {
        vec![UpdateType::AccountUpdate]
    }
}