solana_accountsdb_plugin_postgres/
accountsdb_plugin_postgres.rs

1/// Main entry for the PostgreSQL plugin
2use {
3    crate::{
4        accounts_selector::AccountsSelector,
5        postgres_client::{ParallelPostgresClient, PostgresClientBuilder},
6        transaction_selector::TransactionSelector,
7    },
8    bs58,
9    log::*,
10    serde_derive::{Deserialize, Serialize},
11    serde_json,
12    solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
13        AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions,
14        ReplicaBlockInfoVersions, ReplicaTransactionInfoVersions, Result, SlotStatus,
15    },
16    solana_measure::measure::Measure,
17    solana_metrics::*,
18    std::{fs::File, io::Read},
19    thiserror::Error,
20};
21
22#[derive(Default)]
23pub struct AccountsDbPluginPostgres {
24    client: Option<ParallelPostgresClient>,
25    accounts_selector: Option<AccountsSelector>,
26    transaction_selector: Option<TransactionSelector>,
27}
28
29impl std::fmt::Debug for AccountsDbPluginPostgres {
30    fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        Ok(())
32    }
33}
34
35#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
36pub struct AccountsDbPluginPostgresConfig {
37    pub host: Option<String>,
38    pub user: Option<String>,
39    pub port: Option<u16>,
40    pub connection_str: Option<String>,
41    pub threads: Option<usize>,
42    pub batch_size: Option<usize>,
43    pub panic_on_db_errors: Option<bool>,
44    /// Indicates if to store historical data for accounts
45    pub store_account_historical_data: Option<bool>,
46    pub use_ssl: Option<bool>,
47    pub server_ca: Option<String>,
48    pub client_cert: Option<String>,
49    pub client_key: Option<String>,
50}
51
52#[derive(Error, Debug)]
53pub enum AccountsDbPluginPostgresError {
54    #[error("Error connecting to the backend data store. Error message: ({msg})")]
55    DataStoreConnectionError { msg: String },
56
57    #[error("Error preparing data store schema. Error message: ({msg})")]
58    DataSchemaError { msg: String },
59
60    #[error("Error preparing data store schema. Error message: ({msg})")]
61    ConfigurationError { msg: String },
62}
63
64impl AccountsDbPlugin for AccountsDbPluginPostgres {
65    fn name(&self) -> &'static str {
66        "AccountsDbPluginPostgres"
67    }
68
69    /// Do initialization for the PostgreSQL plugin.
70    ///
71    /// # Format of the config file:
72    /// * The `accounts_selector` section allows the user to controls accounts selections.
73    /// "accounts_selector" : {
74    ///     "accounts" : \["pubkey-1", "pubkey-2", ..., "pubkey-n"\],
75    /// }
76    /// or:
77    /// "accounts_selector" = {
78    ///     "owners" : \["pubkey-1", "pubkey-2", ..., "pubkey-m"\]
79    /// }
80    /// Accounts either satisyfing the accounts condition or owners condition will be selected.
81    /// When only owners is specified,
82    /// all accounts belonging to the owners will be streamed.
83    /// The accounts field supports wildcard to select all accounts:
84    /// "accounts_selector" : {
85    ///     "accounts" : \["*"\],
86    /// }
87    /// * "host", optional, specifies the PostgreSQL server.
88    /// * "user", optional, specifies the PostgreSQL user.
89    /// * "port", optional, specifies the PostgreSQL server's port.
90    /// * "connection_str", optional, the custom PostgreSQL connection string.
91    /// Please refer to https://docs.rs/postgres/0.19.2/postgres/config/struct.Config.html for the connection configuration.
92    /// When `connection_str` is set, the values in "host", "user" and "port" are ignored. If `connection_str` is not given,
93    /// `host` and `user` must be given.
94    /// "store_account_historical_data", optional, set it to 'true', to store historical account data to account_audit
95    /// table.
96    /// * "threads" optional, specifies the number of worker threads for the plugin. A thread
97    /// maintains a PostgreSQL connection to the server. The default is '10'.
98    /// * "batch_size" optional, specifies the batch size of bulk insert when the AccountsDb is created
99    /// from restoring a snapshot. The default is '10'.
100    /// * "panic_on_db_errors", optional, contols if to panic when there are errors replicating data to the
101    /// PostgreSQL database. The default is 'false'.
102    /// * "transaction_selector", optional, controls if and what transaction to store. If this field is missing
103    /// None of the transction is stored.
104    /// "transaction_selector" : {
105    ///     "mentions" : \["pubkey-1", "pubkey-2", ..., "pubkey-n"\],
106    /// }
107    /// The `mentions` field support wildcard to select all transaction or all 'vote' transactions:
108    /// For example, to select all transactions:
109    /// "transaction_selector" : {
110    ///     "mentions" : \["*"\],
111    /// }
112    /// To select all vote transactions:
113    /// "transaction_selector" : {
114    ///     "mentions" : \["all_votes"\],
115    /// }
116    /// # Examples
117    ///
118    /// {
119    ///    "libpath": "/home/solana/target/release/libsolana_accountsdb_plugin_postgres.so",
120    ///    "host": "host_foo",
121    ///    "user": "solana",
122    ///    "threads": 10,
123    ///    "accounts_selector" : {
124    ///       "owners" : ["9oT9R5ZyRovSVnt37QvVoBttGpNqR3J7unkb567NP8k3"]
125    ///    }
126    /// }
127
128    fn on_load(&mut self, config_file: &str) -> Result<()> {
129        solana_logger::setup_with_default("info");
130        info!(
131            "Loading plugin {:?} from config_file {:?}",
132            self.name(),
133            config_file
134        );
135        let mut file = File::open(config_file)?;
136        let mut contents = String::new();
137        file.read_to_string(&mut contents)?;
138
139        let result: serde_json::Value = serde_json::from_str(&contents).unwrap();
140        self.accounts_selector = Some(Self::create_accounts_selector_from_config(&result));
141        self.transaction_selector = Some(Self::create_transaction_selector_from_config(&result));
142
143        let result: serde_json::Result<AccountsDbPluginPostgresConfig> =
144            serde_json::from_str(&contents);
145        match result {
146            Err(err) => {
147                return Err(AccountsDbPluginError::ConfigFileReadError {
148                    msg: format!(
149                        "The config file is not in the JSON format expected: {:?}",
150                        err
151                    ),
152                })
153            }
154            Ok(config) => {
155                let client = PostgresClientBuilder::build_pararallel_postgres_client(&config)?;
156                self.client = Some(client);
157            }
158        }
159
160        Ok(())
161    }
162
163    fn on_unload(&mut self) {
164        info!("Unloading plugin: {:?}", self.name());
165
166        match &mut self.client {
167            None => {}
168            Some(client) => {
169                client.join().unwrap();
170            }
171        }
172    }
173
174    fn update_account(
175        &mut self,
176        account: ReplicaAccountInfoVersions,
177        slot: u64,
178        is_startup: bool,
179    ) -> Result<()> {
180        let mut measure_all = Measure::start("accountsdb-plugin-postgres-update-account-main");
181        match account {
182            ReplicaAccountInfoVersions::V0_0_1(account) => {
183                let mut measure_select =
184                    Measure::start("accountsdb-plugin-postgres-update-account-select");
185                if let Some(accounts_selector) = &self.accounts_selector {
186                    if !accounts_selector.is_account_selected(account.pubkey, account.owner) {
187                        return Ok(());
188                    }
189                } else {
190                    return Ok(());
191                }
192                measure_select.stop();
193                inc_new_counter_debug!(
194                    "accountsdb-plugin-postgres-update-account-select-us",
195                    measure_select.as_us() as usize,
196                    100000,
197                    100000
198                );
199
200                debug!(
201                    "Updating account {:?} with owner {:?} at slot {:?} using account selector {:?}",
202                    bs58::encode(account.pubkey).into_string(),
203                    bs58::encode(account.owner).into_string(),
204                    slot,
205                    self.accounts_selector.as_ref().unwrap()
206                );
207
208                match &mut self.client {
209                    None => {
210                        return Err(AccountsDbPluginError::Custom(Box::new(
211                            AccountsDbPluginPostgresError::DataStoreConnectionError {
212                                msg: "There is no connection to the PostgreSQL database."
213                                    .to_string(),
214                            },
215                        )));
216                    }
217                    Some(client) => {
218                        let mut measure_update =
219                            Measure::start("accountsdb-plugin-postgres-update-account-client");
220                        let result = { client.update_account(account, slot, is_startup) };
221                        measure_update.stop();
222
223                        inc_new_counter_debug!(
224                            "accountsdb-plugin-postgres-update-account-client-us",
225                            measure_update.as_us() as usize,
226                            100000,
227                            100000
228                        );
229
230                        if let Err(err) = result {
231                            return Err(AccountsDbPluginError::AccountsUpdateError {
232                                msg: format!("Failed to persist the update of account to the PostgreSQL database. Error: {:?}", err)
233                            });
234                        }
235                    }
236                }
237            }
238        }
239
240        measure_all.stop();
241
242        inc_new_counter_debug!(
243            "accountsdb-plugin-postgres-update-account-main-us",
244            measure_all.as_us() as usize,
245            100000,
246            100000
247        );
248
249        Ok(())
250    }
251
252    fn update_slot_status(
253        &mut self,
254        slot: u64,
255        parent: Option<u64>,
256        status: SlotStatus,
257    ) -> Result<()> {
258        info!("Updating slot {:?} at with status {:?}", slot, status);
259
260        match &mut self.client {
261            None => {
262                return Err(AccountsDbPluginError::Custom(Box::new(
263                    AccountsDbPluginPostgresError::DataStoreConnectionError {
264                        msg: "There is no connection to the PostgreSQL database.".to_string(),
265                    },
266                )));
267            }
268            Some(client) => {
269                let result = client.update_slot_status(slot, parent, status);
270
271                if let Err(err) = result {
272                    return Err(AccountsDbPluginError::SlotStatusUpdateError{
273                        msg: format!("Failed to persist the update of slot to the PostgreSQL database. Error: {:?}", err)
274                    });
275                }
276            }
277        }
278
279        Ok(())
280    }
281
282    fn notify_end_of_startup(&mut self) -> Result<()> {
283        info!("Notifying the end of startup for accounts notifications");
284        match &mut self.client {
285            None => {
286                return Err(AccountsDbPluginError::Custom(Box::new(
287                    AccountsDbPluginPostgresError::DataStoreConnectionError {
288                        msg: "There is no connection to the PostgreSQL database.".to_string(),
289                    },
290                )));
291            }
292            Some(client) => {
293                let result = client.notify_end_of_startup();
294
295                if let Err(err) = result {
296                    return Err(AccountsDbPluginError::SlotStatusUpdateError{
297                        msg: format!("Failed to notify the end of startup for accounts notifications. Error: {:?}", err)
298                    });
299                }
300            }
301        }
302        Ok(())
303    }
304
305    fn notify_transaction(
306        &mut self,
307        transaction_info: ReplicaTransactionInfoVersions,
308        slot: u64,
309    ) -> Result<()> {
310        match &mut self.client {
311            None => {
312                return Err(AccountsDbPluginError::Custom(Box::new(
313                    AccountsDbPluginPostgresError::DataStoreConnectionError {
314                        msg: "There is no connection to the PostgreSQL database.".to_string(),
315                    },
316                )));
317            }
318            Some(client) => match transaction_info {
319                ReplicaTransactionInfoVersions::V0_0_1(transaction_info) => {
320                    if let Some(transaction_selector) = &self.transaction_selector {
321                        if !transaction_selector.is_transaction_selected(
322                            transaction_info.is_vote,
323                            transaction_info.transaction.message().account_keys_iter(),
324                        ) {
325                            return Ok(());
326                        }
327                    } else {
328                        return Ok(());
329                    }
330
331                    let result = client.log_transaction_info(transaction_info, slot);
332
333                    if let Err(err) = result {
334                        return Err(AccountsDbPluginError::SlotStatusUpdateError{
335                                msg: format!("Failed to persist the transaction info to the PostgreSQL database. Error: {:?}", err)
336                            });
337                    }
338                }
339            },
340        }
341
342        Ok(())
343    }
344
345    fn notify_block_metadata(&mut self, block_info: ReplicaBlockInfoVersions) -> Result<()> {
346        match &mut self.client {
347            None => {
348                return Err(AccountsDbPluginError::Custom(Box::new(
349                    AccountsDbPluginPostgresError::DataStoreConnectionError {
350                        msg: "There is no connection to the PostgreSQL database.".to_string(),
351                    },
352                )));
353            }
354            Some(client) => match block_info {
355                ReplicaBlockInfoVersions::V0_0_1(block_info) => {
356                    let result = client.update_block_metadata(block_info);
357
358                    if let Err(err) = result {
359                        return Err(AccountsDbPluginError::SlotStatusUpdateError{
360                                msg: format!("Failed to persist the update of block metadata to the PostgreSQL database. Error: {:?}", err)
361                            });
362                    }
363                }
364            },
365        }
366
367        Ok(())
368    }
369
370    /// Check if the plugin is interested in account data
371    /// Default is true -- if the plugin is not interested in
372    /// account data, please return false.
373    fn account_data_notifications_enabled(&self) -> bool {
374        self.accounts_selector
375            .as_ref()
376            .map_or_else(|| false, |selector| selector.is_enabled())
377    }
378
379    /// Check if the plugin is interested in transaction data
380    fn transaction_notifications_enabled(&self) -> bool {
381        self.transaction_selector
382            .as_ref()
383            .map_or_else(|| false, |selector| selector.is_enabled())
384    }
385}
386
387impl AccountsDbPluginPostgres {
388    fn create_accounts_selector_from_config(config: &serde_json::Value) -> AccountsSelector {
389        let accounts_selector = &config["accounts_selector"];
390
391        if accounts_selector.is_null() {
392            AccountsSelector::default()
393        } else {
394            let accounts = &accounts_selector["accounts"];
395            let accounts: Vec<String> = if accounts.is_array() {
396                accounts
397                    .as_array()
398                    .unwrap()
399                    .iter()
400                    .map(|val| val.as_str().unwrap().to_string())
401                    .collect()
402            } else {
403                Vec::default()
404            };
405            let owners = &accounts_selector["owners"];
406            let owners: Vec<String> = if owners.is_array() {
407                owners
408                    .as_array()
409                    .unwrap()
410                    .iter()
411                    .map(|val| val.as_str().unwrap().to_string())
412                    .collect()
413            } else {
414                Vec::default()
415            };
416            AccountsSelector::new(&accounts, &owners)
417        }
418    }
419
420    fn create_transaction_selector_from_config(config: &serde_json::Value) -> TransactionSelector {
421        let transaction_selector = &config["transaction_selector"];
422
423        if transaction_selector.is_null() {
424            TransactionSelector::default()
425        } else {
426            let accounts = &transaction_selector["mentions"];
427            let accounts: Vec<String> = if accounts.is_array() {
428                accounts
429                    .as_array()
430                    .unwrap()
431                    .iter()
432                    .map(|val| val.as_str().unwrap().to_string())
433                    .collect()
434            } else {
435                Vec::default()
436            };
437            TransactionSelector::new(&accounts)
438        }
439    }
440
441    pub fn new() -> Self {
442        Self::default()
443    }
444}
445
446#[no_mangle]
447#[allow(improper_ctypes_definitions)]
448/// # Safety
449///
450/// This function returns the AccountsDbPluginPostgres pointer as trait AccountsDbPlugin.
451pub unsafe extern "C" fn _create_plugin() -> *mut dyn AccountsDbPlugin {
452    let plugin = AccountsDbPluginPostgres::new();
453    let plugin: Box<dyn AccountsDbPlugin> = Box::new(plugin);
454    Box::into_raw(plugin)
455}
456
457#[cfg(test)]
458pub(crate) mod tests {
459    use {super::*, serde_json};
460
461    #[test]
462    fn test_accounts_selector_from_config() {
463        let config = "{\"accounts_selector\" : { \
464           \"owners\" : [\"9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin\"] \
465        }}";
466
467        let config: serde_json::Value = serde_json::from_str(config).unwrap();
468        AccountsDbPluginPostgres::create_accounts_selector_from_config(&config);
469    }
470}