solana_geyser_plugin_postgres/
geyser_plugin_postgres.rs

1/// Main entry for the PostgreSQL plugin
2use {
3    crate::{
4        accounts_selector::AccountsSelector,
5        postgres_client::{ParallelPostgresClient, PostgresClientBuilder},
6        transaction_selector::TransactionSelector,
7    },
8    bs58,
9    log::*,
10    serde_derive::{Deserialize, Serialize},
11    serde_json,
12    solana_geyser_plugin_interface::geyser_plugin_interface::{
13        GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
14        ReplicaTransactionInfoVersions, Result, SlotStatus,
15    },
16    solana_measure::measure::Measure,
17    solana_metrics::*,
18    std::{fs::File, io::Read},
19    thiserror::Error,
20};
21
/// State held by the PostgreSQL Geyser plugin.
///
/// Every field is `None` after `Default::default()`; `on_load` fills them in
/// from the plugin's JSON configuration file.
#[derive(Default)]
pub struct GeyserPluginPostgres {
    /// Client used to send account, slot, transaction and block updates to
    /// PostgreSQL. Notification handlers that need it return an error while
    /// it is `None`.
    client: Option<ParallelPostgresClient>,
    /// Filter built from the `accounts_selector` section of the config file.
    accounts_selector: Option<AccountsSelector>,
    /// Filter built from the `transaction_selector` section of the config file.
    transaction_selector: Option<TransactionSelector>,
}
28
29impl std::fmt::Debug for GeyserPluginPostgres {
30    fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        Ok(())
32    }
33}
34
/// The Configuration for the PostgreSQL plugin
///
/// Deserialized from the plugin's JSON config file. Every field is optional;
/// the defaults mentioned below are the documented ones and are not applied
/// by this struct itself.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GeyserPluginPostgresConfig {
    /// The host name or IP of the PostgreSQL server
    pub host: Option<String>,

    /// The user name of the PostgreSQL server.
    pub user: Option<String>,

    /// The port number of the PostgreSQL database, the default is 5432
    pub port: Option<u16>,

    /// The connection string of PostgreSQL database, if this is set
    /// `host`, `user` and `port` will be ignored.
    pub connection_str: Option<String>,

    /// Controls the number of threads establishing connections to
    /// the PostgreSQL server. The default is 10.
    pub threads: Option<usize>,

    /// Controls the batch size when bulk loading accounts.
    /// The default is 10.
    pub batch_size: Option<usize>,

    /// Controls whether to panic the validator in case of errors
    /// writing to PostgreSQL server. The default is false
    pub panic_on_db_errors: Option<bool>,

    /// Indicates whether to store historical data for accounts
    pub store_account_historical_data: Option<bool>,

    /// Controls whether to use SSL based connection to the database server.
    /// The default is false
    pub use_ssl: Option<bool>,

    /// Specify the path to PostgreSQL server's certificate file
    pub server_ca: Option<String>,

    /// Specify the path to the local client's certificate file
    pub client_cert: Option<String>,

    /// Specify the path to the local client's private PEM key file.
    pub client_key: Option<String>,

    /// Controls whether to index the token owners. The default is false
    pub index_token_owner: Option<bool>,

    /// Controls whether to index the token mints. The default is false
    pub index_token_mint: Option<bool>,
}
85
86#[derive(Error, Debug)]
87pub enum GeyserPluginPostgresError {
88    #[error("Error connecting to the backend data store. Error message: ({msg})")]
89    DataStoreConnectionError { msg: String },
90
91    #[error("Error preparing data store schema. Error message: ({msg})")]
92    DataSchemaError { msg: String },
93
94    #[error("Error preparing data store schema. Error message: ({msg})")]
95    ConfigurationError { msg: String },
96}
97
98impl GeyserPlugin for GeyserPluginPostgres {
99    fn name(&self) -> &'static str {
100        "GeyserPluginPostgres"
101    }
102
103    /// Do initialization for the PostgreSQL plugin.
104    ///
105    /// # Format of the config file:
106    /// * The `accounts_selector` section allows the user to controls accounts selections.
107    /// "accounts_selector" : {
108    ///     "accounts" : \["pubkey-1", "pubkey-2", ..., "pubkey-n"\],
109    /// }
110    /// or:
111    /// "accounts_selector" = {
112    ///     "owners" : \["pubkey-1", "pubkey-2", ..., "pubkey-m"\]
113    /// }
114    /// Accounts either satisyfing the accounts condition or owners condition will be selected.
115    /// When only owners is specified,
116    /// all accounts belonging to the owners will be streamed.
117    /// The accounts field supports wildcard to select all accounts:
118    /// "accounts_selector" : {
119    ///     "accounts" : \["*"\],
120    /// }
121    /// * "host", optional, specifies the PostgreSQL server.
122    /// * "user", optional, specifies the PostgreSQL user.
123    /// * "port", optional, specifies the PostgreSQL server's port.
124    /// * "connection_str", optional, the custom PostgreSQL connection string.
125    /// Please refer to https://docs.rs/postgres/0.19.2/postgres/config/struct.Config.html for the connection configuration.
126    /// When `connection_str` is set, the values in "host", "user" and "port" are ignored. If `connection_str` is not given,
127    /// `host` and `user` must be given.
128    /// "store_account_historical_data", optional, set it to 'true', to store historical account data to account_audit
129    /// table.
130    /// * "threads" optional, specifies the number of worker threads for the plugin. A thread
131    /// maintains a PostgreSQL connection to the server. The default is '10'.
132    /// * "batch_size" optional, specifies the batch size of bulk insert when the AccountsDb is created
133    /// from restoring a snapshot. The default is '10'.
134    /// * "panic_on_db_errors", optional, contols if to panic when there are errors replicating data to the
135    /// PostgreSQL database. The default is 'false'.
136    /// * "transaction_selector", optional, controls if and what transaction to store. If this field is missing
137    /// None of the transction is stored.
138    /// "transaction_selector" : {
139    ///     "mentions" : \["pubkey-1", "pubkey-2", ..., "pubkey-n"\],
140    /// }
141    /// The `mentions` field support wildcard to select all transaction or all 'vote' transactions:
142    /// For example, to select all transactions:
143    /// "transaction_selector" : {
144    ///     "mentions" : \["*"\],
145    /// }
146    /// To select all vote transactions:
147    /// "transaction_selector" : {
148    ///     "mentions" : \["all_votes"\],
149    /// }
150    /// # Examples
151    ///
152    /// {
153    ///    "libpath": "/home/solana/target/release/libsolana_geyser_plugin_postgres.so",
154    ///    "host": "host_foo",
155    ///    "user": "solana",
156    ///    "threads": 10,
157    ///    "accounts_selector" : {
158    ///       "owners" : ["9oT9R5ZyRovSVnt37QvVoBttGpNqR3J7unkb567NP8k3"]
159    ///    }
160    /// }
161
162    fn on_load(&mut self, config_file: &str) -> Result<()> {
163        solana_logger::setup_with_default("info");
164        info!(
165            "Loading plugin {:?} from config_file {:?}",
166            self.name(),
167            config_file
168        );
169        let mut file = File::open(config_file)?;
170        let mut contents = String::new();
171        file.read_to_string(&mut contents)?;
172
173        let result: serde_json::Value = serde_json::from_str(&contents).unwrap();
174        self.accounts_selector = Some(Self::create_accounts_selector_from_config(&result));
175        self.transaction_selector = Some(Self::create_transaction_selector_from_config(&result));
176
177        let result: serde_json::Result<GeyserPluginPostgresConfig> =
178            serde_json::from_str(&contents);
179        match result {
180            Err(err) => {
181                return Err(GeyserPluginError::ConfigFileReadError {
182                    msg: format!(
183                        "The config file is not in the JSON format expected: {:?}",
184                        err
185                    ),
186                })
187            }
188            Ok(config) => {
189                let client = PostgresClientBuilder::build_pararallel_postgres_client(&config)?;
190                self.client = Some(client);
191            }
192        }
193
194        Ok(())
195    }
196
197    fn on_unload(&mut self) {
198        info!("Unloading plugin: {:?}", self.name());
199
200        match &mut self.client {
201            None => {}
202            Some(client) => {
203                client.join().unwrap();
204            }
205        }
206    }
207
208    fn update_account(
209        &mut self,
210        account: ReplicaAccountInfoVersions,
211        slot: u64,
212        is_startup: bool,
213    ) -> Result<()> {
214        let mut measure_all = Measure::start("geyser-plugin-postgres-update-account-main");
215        match account {
216            ReplicaAccountInfoVersions::V0_0_1(account) => {
217                let mut measure_select =
218                    Measure::start("geyser-plugin-postgres-update-account-select");
219                if let Some(accounts_selector) = &self.accounts_selector {
220                    if !accounts_selector.is_account_selected(account.pubkey, account.owner) {
221                        return Ok(());
222                    }
223                } else {
224                    return Ok(());
225                }
226                measure_select.stop();
227                inc_new_counter_debug!(
228                    "geyser-plugin-postgres-update-account-select-us",
229                    measure_select.as_us() as usize,
230                    100000,
231                    100000
232                );
233
234                debug!(
235                    "Updating account {:?} with owner {:?} at slot {:?} using account selector {:?}",
236                    bs58::encode(account.pubkey).into_string(),
237                    bs58::encode(account.owner).into_string(),
238                    slot,
239                    self.accounts_selector.as_ref().unwrap()
240                );
241
242                match &mut self.client {
243                    None => {
244                        return Err(GeyserPluginError::Custom(Box::new(
245                            GeyserPluginPostgresError::DataStoreConnectionError {
246                                msg: "There is no connection to the PostgreSQL database."
247                                    .to_string(),
248                            },
249                        )));
250                    }
251                    Some(client) => {
252                        let mut measure_update =
253                            Measure::start("geyser-plugin-postgres-update-account-client");
254                        let result = { client.update_account(account, slot, is_startup) };
255                        measure_update.stop();
256
257                        inc_new_counter_debug!(
258                            "geyser-plugin-postgres-update-account-client-us",
259                            measure_update.as_us() as usize,
260                            100000,
261                            100000
262                        );
263
264                        if let Err(err) = result {
265                            return Err(GeyserPluginError::AccountsUpdateError {
266                                msg: format!("Failed to persist the update of account to the PostgreSQL database. Error: {:?}", err)
267                            });
268                        }
269                    }
270                }
271            }
272        }
273
274        measure_all.stop();
275
276        inc_new_counter_debug!(
277            "geyser-plugin-postgres-update-account-main-us",
278            measure_all.as_us() as usize,
279            100000,
280            100000
281        );
282
283        Ok(())
284    }
285
286    fn update_slot_status(
287        &mut self,
288        slot: u64,
289        parent: Option<u64>,
290        status: SlotStatus,
291    ) -> Result<()> {
292        info!("Updating slot {:?} at with status {:?}", slot, status);
293
294        match &mut self.client {
295            None => {
296                return Err(GeyserPluginError::Custom(Box::new(
297                    GeyserPluginPostgresError::DataStoreConnectionError {
298                        msg: "There is no connection to the PostgreSQL database.".to_string(),
299                    },
300                )));
301            }
302            Some(client) => {
303                let result = client.update_slot_status(slot, parent, status);
304
305                if let Err(err) = result {
306                    return Err(GeyserPluginError::SlotStatusUpdateError{
307                        msg: format!("Failed to persist the update of slot to the PostgreSQL database. Error: {:?}", err)
308                    });
309                }
310            }
311        }
312
313        Ok(())
314    }
315
316    fn notify_end_of_startup(&mut self) -> Result<()> {
317        info!("Notifying the end of startup for accounts notifications");
318        match &mut self.client {
319            None => {
320                return Err(GeyserPluginError::Custom(Box::new(
321                    GeyserPluginPostgresError::DataStoreConnectionError {
322                        msg: "There is no connection to the PostgreSQL database.".to_string(),
323                    },
324                )));
325            }
326            Some(client) => {
327                let result = client.notify_end_of_startup();
328
329                if let Err(err) = result {
330                    return Err(GeyserPluginError::SlotStatusUpdateError{
331                        msg: format!("Failed to notify the end of startup for accounts notifications. Error: {:?}", err)
332                    });
333                }
334            }
335        }
336        Ok(())
337    }
338
339    fn notify_transaction(
340        &mut self,
341        transaction_info: ReplicaTransactionInfoVersions,
342        slot: u64,
343    ) -> Result<()> {
344        match &mut self.client {
345            None => {
346                return Err(GeyserPluginError::Custom(Box::new(
347                    GeyserPluginPostgresError::DataStoreConnectionError {
348                        msg: "There is no connection to the PostgreSQL database.".to_string(),
349                    },
350                )));
351            }
352            Some(client) => match transaction_info {
353                ReplicaTransactionInfoVersions::V0_0_1(transaction_info) => {
354                    if let Some(transaction_selector) = &self.transaction_selector {
355                        if !transaction_selector.is_transaction_selected(
356                            transaction_info.is_vote,
357                            Box::new(transaction_info.transaction.message().account_keys().iter()),
358                        ) {
359                            return Ok(());
360                        }
361                    } else {
362                        return Ok(());
363                    }
364
365                    let result = client.log_transaction_info(transaction_info, slot);
366
367                    if let Err(err) = result {
368                        return Err(GeyserPluginError::SlotStatusUpdateError{
369                                msg: format!("Failed to persist the transaction info to the PostgreSQL database. Error: {:?}", err)
370                            });
371                    }
372                }
373            },
374        }
375
376        Ok(())
377    }
378
379    fn notify_block_metadata(&mut self, block_info: ReplicaBlockInfoVersions) -> Result<()> {
380        match &mut self.client {
381            None => {
382                return Err(GeyserPluginError::Custom(Box::new(
383                    GeyserPluginPostgresError::DataStoreConnectionError {
384                        msg: "There is no connection to the PostgreSQL database.".to_string(),
385                    },
386                )));
387            }
388            Some(client) => match block_info {
389                ReplicaBlockInfoVersions::V0_0_1(block_info) => {
390                    let result = client.update_block_metadata(block_info);
391
392                    if let Err(err) = result {
393                        return Err(GeyserPluginError::SlotStatusUpdateError{
394                                msg: format!("Failed to persist the update of block metadata to the PostgreSQL database. Error: {:?}", err)
395                            });
396                    }
397                }
398            },
399        }
400
401        Ok(())
402    }
403
404    /// Check if the plugin is interested in account data
405    /// Default is true -- if the plugin is not interested in
406    /// account data, please return false.
407    fn account_data_notifications_enabled(&self) -> bool {
408        self.accounts_selector
409            .as_ref()
410            .map_or_else(|| false, |selector| selector.is_enabled())
411    }
412
413    /// Check if the plugin is interested in transaction data
414    fn transaction_notifications_enabled(&self) -> bool {
415        self.transaction_selector
416            .as_ref()
417            .map_or_else(|| false, |selector| selector.is_enabled())
418    }
419}
420
421impl GeyserPluginPostgres {
422    fn create_accounts_selector_from_config(config: &serde_json::Value) -> AccountsSelector {
423        let accounts_selector = &config["accounts_selector"];
424
425        if accounts_selector.is_null() {
426            AccountsSelector::default()
427        } else {
428            let accounts = &accounts_selector["accounts"];
429            let accounts: Vec<String> = if accounts.is_array() {
430                accounts
431                    .as_array()
432                    .unwrap()
433                    .iter()
434                    .map(|val| val.as_str().unwrap().to_string())
435                    .collect()
436            } else {
437                Vec::default()
438            };
439            let owners = &accounts_selector["owners"];
440            let owners: Vec<String> = if owners.is_array() {
441                owners
442                    .as_array()
443                    .unwrap()
444                    .iter()
445                    .map(|val| val.as_str().unwrap().to_string())
446                    .collect()
447            } else {
448                Vec::default()
449            };
450            AccountsSelector::new(&accounts, &owners)
451        }
452    }
453
454    fn create_transaction_selector_from_config(config: &serde_json::Value) -> TransactionSelector {
455        let transaction_selector = &config["transaction_selector"];
456
457        if transaction_selector.is_null() {
458            TransactionSelector::default()
459        } else {
460            let accounts = &transaction_selector["mentions"];
461            let accounts: Vec<String> = if accounts.is_array() {
462                accounts
463                    .as_array()
464                    .unwrap()
465                    .iter()
466                    .map(|val| val.as_str().unwrap().to_string())
467                    .collect()
468            } else {
469                Vec::default()
470            };
471            TransactionSelector::new(&accounts)
472        }
473    }
474
475    pub fn new() -> Self {
476        Self::default()
477    }
478}
479
480#[no_mangle]
481#[allow(improper_ctypes_definitions)]
482/// # Safety
483///
484/// This function returns the GeyserPluginPostgres pointer as trait GeyserPlugin.
485pub unsafe extern "C" fn _create_plugin() -> *mut dyn GeyserPlugin {
486    let plugin = GeyserPluginPostgres::new();
487    let plugin: Box<dyn GeyserPlugin> = Box::new(plugin);
488    Box::into_raw(plugin)
489}
490
#[cfg(test)]
pub(crate) mod tests {
    use {super::*, serde_json};

    /// Smoke test: an `accounts_selector` section holding only `owners` can be
    /// turned into a selector without panicking.
    #[test]
    fn test_accounts_selector_from_config() {
        let raw_config = r#"{"accounts_selector" : { "owners" : ["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin"] }}"#;

        let parsed: serde_json::Value = serde_json::from_str(raw_config).unwrap();
        GeyserPluginPostgres::create_accounts_selector_from_config(&parsed);
    }
}