solana_geyser_plugin_postgres/
geyser_plugin_postgres.rs1use {
3 crate::{
4 accounts_selector::AccountsSelector,
5 postgres_client::{ParallelPostgresClient, PostgresClientBuilder},
6 transaction_selector::TransactionSelector,
7 },
8 bs58,
9 log::*,
10 serde_derive::{Deserialize, Serialize},
11 serde_json,
12 solana_geyser_plugin_interface::geyser_plugin_interface::{
13 GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
14 ReplicaTransactionInfoVersions, Result, SlotStatus,
15 },
16 solana_measure::measure::Measure,
17 solana_metrics::*,
18 std::{fs::File, io::Read},
19 thiserror::Error,
20};
21
/// State held by the PostgreSQL Geyser plugin.
///
/// The derived `Default` leaves every field `None`; `on_load` populates them from the
/// plugin's JSON config file.
#[derive(Default)]
pub struct GeyserPluginPostgres {
    /// Database client built in `on_load`. Callbacks that need it return a
    /// `DataStoreConnectionError` while this is still `None`.
    client: Option<ParallelPostgresClient>,
    /// Account filter built from the `accounts_selector` section of the config.
    accounts_selector: Option<AccountsSelector>,
    /// Transaction filter built from the `transaction_selector` section of the config.
    transaction_selector: Option<TransactionSelector>,
}
28
29impl std::fmt::Debug for GeyserPluginPostgres {
30 fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 Ok(())
32 }
33}
34
/// Plugin settings deserialized from the JSON config file in `on_load`.
///
/// Every field is optional. How each value is interpreted (and which defaults apply when
/// it is absent) is decided by `PostgresClientBuilder`, which is not visible in this file;
/// the per-field notes below are inferred from the field names and should be confirmed
/// against that code.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GeyserPluginPostgresConfig {
    /// Presumably the PostgreSQL server host -- TODO confirm.
    pub host: Option<String>,

    /// Presumably the PostgreSQL user name -- TODO confirm.
    pub user: Option<String>,

    /// Presumably the PostgreSQL server port -- TODO confirm.
    pub port: Option<u16>,

    /// Presumably a full connection string used instead of `host`/`user`/`port`
    /// -- TODO confirm precedence.
    pub connection_str: Option<String>,

    /// Presumably the number of worker threads used by the parallel client -- TODO confirm.
    pub threads: Option<usize>,

    /// Presumably the batch size for bulk writes -- TODO confirm when batching applies.
    pub batch_size: Option<usize>,

    /// Presumably controls whether database errors abort the process -- TODO confirm.
    pub panic_on_db_errors: Option<bool>,

    /// Presumably controls whether historical account data is stored -- TODO confirm.
    pub store_account_historical_data: Option<bool>,

    /// Presumably enables SSL/TLS for the database connection -- TODO confirm.
    pub use_ssl: Option<bool>,

    /// Presumably the server CA certificate (likely a file path) used with SSL -- TODO confirm.
    pub server_ca: Option<String>,

    /// Presumably the client certificate (likely a file path) used with SSL -- TODO confirm.
    pub client_cert: Option<String>,

    /// Presumably the client private key (likely a file path) used with SSL -- TODO confirm.
    pub client_key: Option<String>,

    /// Presumably controls whether token accounts are indexed by owner -- TODO confirm.
    pub index_token_owner: Option<bool>,

    /// Presumably controls whether token accounts are indexed by mint -- TODO confirm.
    pub index_token_mint: Option<bool>,
}
85
86#[derive(Error, Debug)]
87pub enum GeyserPluginPostgresError {
88 #[error("Error connecting to the backend data store. Error message: ({msg})")]
89 DataStoreConnectionError { msg: String },
90
91 #[error("Error preparing data store schema. Error message: ({msg})")]
92 DataSchemaError { msg: String },
93
94 #[error("Error preparing data store schema. Error message: ({msg})")]
95 ConfigurationError { msg: String },
96}
97
98impl GeyserPlugin for GeyserPluginPostgres {
99 fn name(&self) -> &'static str {
100 "GeyserPluginPostgres"
101 }
102
103 fn on_load(&mut self, config_file: &str) -> Result<()> {
163 solana_logger::setup_with_default("info");
164 info!(
165 "Loading plugin {:?} from config_file {:?}",
166 self.name(),
167 config_file
168 );
169 let mut file = File::open(config_file)?;
170 let mut contents = String::new();
171 file.read_to_string(&mut contents)?;
172
173 let result: serde_json::Value = serde_json::from_str(&contents).unwrap();
174 self.accounts_selector = Some(Self::create_accounts_selector_from_config(&result));
175 self.transaction_selector = Some(Self::create_transaction_selector_from_config(&result));
176
177 let result: serde_json::Result<GeyserPluginPostgresConfig> =
178 serde_json::from_str(&contents);
179 match result {
180 Err(err) => {
181 return Err(GeyserPluginError::ConfigFileReadError {
182 msg: format!(
183 "The config file is not in the JSON format expected: {:?}",
184 err
185 ),
186 })
187 }
188 Ok(config) => {
189 let client = PostgresClientBuilder::build_pararallel_postgres_client(&config)?;
190 self.client = Some(client);
191 }
192 }
193
194 Ok(())
195 }
196
197 fn on_unload(&mut self) {
198 info!("Unloading plugin: {:?}", self.name());
199
200 match &mut self.client {
201 None => {}
202 Some(client) => {
203 client.join().unwrap();
204 }
205 }
206 }
207
208 fn update_account(
209 &mut self,
210 account: ReplicaAccountInfoVersions,
211 slot: u64,
212 is_startup: bool,
213 ) -> Result<()> {
214 let mut measure_all = Measure::start("geyser-plugin-postgres-update-account-main");
215 match account {
216 ReplicaAccountInfoVersions::V0_0_1(account) => {
217 let mut measure_select =
218 Measure::start("geyser-plugin-postgres-update-account-select");
219 if let Some(accounts_selector) = &self.accounts_selector {
220 if !accounts_selector.is_account_selected(account.pubkey, account.owner) {
221 return Ok(());
222 }
223 } else {
224 return Ok(());
225 }
226 measure_select.stop();
227 inc_new_counter_debug!(
228 "geyser-plugin-postgres-update-account-select-us",
229 measure_select.as_us() as usize,
230 100000,
231 100000
232 );
233
234 debug!(
235 "Updating account {:?} with owner {:?} at slot {:?} using account selector {:?}",
236 bs58::encode(account.pubkey).into_string(),
237 bs58::encode(account.owner).into_string(),
238 slot,
239 self.accounts_selector.as_ref().unwrap()
240 );
241
242 match &mut self.client {
243 None => {
244 return Err(GeyserPluginError::Custom(Box::new(
245 GeyserPluginPostgresError::DataStoreConnectionError {
246 msg: "There is no connection to the PostgreSQL database."
247 .to_string(),
248 },
249 )));
250 }
251 Some(client) => {
252 let mut measure_update =
253 Measure::start("geyser-plugin-postgres-update-account-client");
254 let result = { client.update_account(account, slot, is_startup) };
255 measure_update.stop();
256
257 inc_new_counter_debug!(
258 "geyser-plugin-postgres-update-account-client-us",
259 measure_update.as_us() as usize,
260 100000,
261 100000
262 );
263
264 if let Err(err) = result {
265 return Err(GeyserPluginError::AccountsUpdateError {
266 msg: format!("Failed to persist the update of account to the PostgreSQL database. Error: {:?}", err)
267 });
268 }
269 }
270 }
271 }
272 }
273
274 measure_all.stop();
275
276 inc_new_counter_debug!(
277 "geyser-plugin-postgres-update-account-main-us",
278 measure_all.as_us() as usize,
279 100000,
280 100000
281 );
282
283 Ok(())
284 }
285
286 fn update_slot_status(
287 &mut self,
288 slot: u64,
289 parent: Option<u64>,
290 status: SlotStatus,
291 ) -> Result<()> {
292 info!("Updating slot {:?} at with status {:?}", slot, status);
293
294 match &mut self.client {
295 None => {
296 return Err(GeyserPluginError::Custom(Box::new(
297 GeyserPluginPostgresError::DataStoreConnectionError {
298 msg: "There is no connection to the PostgreSQL database.".to_string(),
299 },
300 )));
301 }
302 Some(client) => {
303 let result = client.update_slot_status(slot, parent, status);
304
305 if let Err(err) = result {
306 return Err(GeyserPluginError::SlotStatusUpdateError{
307 msg: format!("Failed to persist the update of slot to the PostgreSQL database. Error: {:?}", err)
308 });
309 }
310 }
311 }
312
313 Ok(())
314 }
315
316 fn notify_end_of_startup(&mut self) -> Result<()> {
317 info!("Notifying the end of startup for accounts notifications");
318 match &mut self.client {
319 None => {
320 return Err(GeyserPluginError::Custom(Box::new(
321 GeyserPluginPostgresError::DataStoreConnectionError {
322 msg: "There is no connection to the PostgreSQL database.".to_string(),
323 },
324 )));
325 }
326 Some(client) => {
327 let result = client.notify_end_of_startup();
328
329 if let Err(err) = result {
330 return Err(GeyserPluginError::SlotStatusUpdateError{
331 msg: format!("Failed to notify the end of startup for accounts notifications. Error: {:?}", err)
332 });
333 }
334 }
335 }
336 Ok(())
337 }
338
339 fn notify_transaction(
340 &mut self,
341 transaction_info: ReplicaTransactionInfoVersions,
342 slot: u64,
343 ) -> Result<()> {
344 match &mut self.client {
345 None => {
346 return Err(GeyserPluginError::Custom(Box::new(
347 GeyserPluginPostgresError::DataStoreConnectionError {
348 msg: "There is no connection to the PostgreSQL database.".to_string(),
349 },
350 )));
351 }
352 Some(client) => match transaction_info {
353 ReplicaTransactionInfoVersions::V0_0_1(transaction_info) => {
354 if let Some(transaction_selector) = &self.transaction_selector {
355 if !transaction_selector.is_transaction_selected(
356 transaction_info.is_vote,
357 Box::new(transaction_info.transaction.message().account_keys().iter()),
358 ) {
359 return Ok(());
360 }
361 } else {
362 return Ok(());
363 }
364
365 let result = client.log_transaction_info(transaction_info, slot);
366
367 if let Err(err) = result {
368 return Err(GeyserPluginError::SlotStatusUpdateError{
369 msg: format!("Failed to persist the transaction info to the PostgreSQL database. Error: {:?}", err)
370 });
371 }
372 }
373 },
374 }
375
376 Ok(())
377 }
378
379 fn notify_block_metadata(&mut self, block_info: ReplicaBlockInfoVersions) -> Result<()> {
380 match &mut self.client {
381 None => {
382 return Err(GeyserPluginError::Custom(Box::new(
383 GeyserPluginPostgresError::DataStoreConnectionError {
384 msg: "There is no connection to the PostgreSQL database.".to_string(),
385 },
386 )));
387 }
388 Some(client) => match block_info {
389 ReplicaBlockInfoVersions::V0_0_1(block_info) => {
390 let result = client.update_block_metadata(block_info);
391
392 if let Err(err) = result {
393 return Err(GeyserPluginError::SlotStatusUpdateError{
394 msg: format!("Failed to persist the update of block metadata to the PostgreSQL database. Error: {:?}", err)
395 });
396 }
397 }
398 },
399 }
400
401 Ok(())
402 }
403
404 fn account_data_notifications_enabled(&self) -> bool {
408 self.accounts_selector
409 .as_ref()
410 .map_or_else(|| false, |selector| selector.is_enabled())
411 }
412
413 fn transaction_notifications_enabled(&self) -> bool {
415 self.transaction_selector
416 .as_ref()
417 .map_or_else(|| false, |selector| selector.is_enabled())
418 }
419}
420
421impl GeyserPluginPostgres {
422 fn create_accounts_selector_from_config(config: &serde_json::Value) -> AccountsSelector {
423 let accounts_selector = &config["accounts_selector"];
424
425 if accounts_selector.is_null() {
426 AccountsSelector::default()
427 } else {
428 let accounts = &accounts_selector["accounts"];
429 let accounts: Vec<String> = if accounts.is_array() {
430 accounts
431 .as_array()
432 .unwrap()
433 .iter()
434 .map(|val| val.as_str().unwrap().to_string())
435 .collect()
436 } else {
437 Vec::default()
438 };
439 let owners = &accounts_selector["owners"];
440 let owners: Vec<String> = if owners.is_array() {
441 owners
442 .as_array()
443 .unwrap()
444 .iter()
445 .map(|val| val.as_str().unwrap().to_string())
446 .collect()
447 } else {
448 Vec::default()
449 };
450 AccountsSelector::new(&accounts, &owners)
451 }
452 }
453
454 fn create_transaction_selector_from_config(config: &serde_json::Value) -> TransactionSelector {
455 let transaction_selector = &config["transaction_selector"];
456
457 if transaction_selector.is_null() {
458 TransactionSelector::default()
459 } else {
460 let accounts = &transaction_selector["mentions"];
461 let accounts: Vec<String> = if accounts.is_array() {
462 accounts
463 .as_array()
464 .unwrap()
465 .iter()
466 .map(|val| val.as_str().unwrap().to_string())
467 .collect()
468 } else {
469 Vec::default()
470 };
471 TransactionSelector::new(&accounts)
472 }
473 }
474
475 pub fn new() -> Self {
476 Self::default()
477 }
478}
479
480#[no_mangle]
481#[allow(improper_ctypes_definitions)]
482pub unsafe extern "C" fn _create_plugin() -> *mut dyn GeyserPlugin {
486 let plugin = GeyserPluginPostgres::new();
487 let plugin: Box<dyn GeyserPlugin> = Box::new(plugin);
488 Box::into_raw(plugin)
489}
490
#[cfg(test)]
pub(crate) mod tests {
    use {super::*, serde_json};

    /// Smoke test: a config that only lists `owners` must build a selector without
    /// panicking. The resulting selector is not inspected.
    #[test]
    fn test_accounts_selector_from_config() {
        let raw_config = r#"{"accounts_selector" : { "owners" : ["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin"] }}"#;

        let parsed: serde_json::Value = serde_json::from_str(raw_config).unwrap();
        GeyserPluginPostgres::create_accounts_selector_from_config(&parsed);
    }
}
504}