1#![allow(clippy::integer_arithmetic)]
2
3mod postgres_client_account_index;
4mod postgres_client_block_metadata;
5mod postgres_client_transaction;
6
7use {
9 crate::{
10 geyser_plugin_postgres::{GeyserPluginPostgresConfig, GeyserPluginPostgresError},
11 postgres_client::postgres_client_account_index::TokenSecondaryIndexEntry,
12 },
13 chrono::Utc,
14 crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender},
15 log::*,
16 openssl::ssl::{SslConnector, SslFiletype, SslMethod},
17 postgres::{Client, NoTls, Statement},
18 postgres_client_block_metadata::DbBlockInfo,
19 postgres_client_transaction::LogTransactionRequest,
20 postgres_openssl::MakeTlsConnector,
21 solana_geyser_plugin_interface::geyser_plugin_interface::{
22 GeyserPluginError, ReplicaAccountInfo, ReplicaBlockInfo, SlotStatus,
23 },
24 solana_measure::measure::Measure,
25 solana_metrics::*,
26 solana_sdk::timing::AtomicInterval,
27 std::{
28 collections::HashSet,
29 sync::{
30 atomic::{AtomicBool, AtomicUsize, Ordering},
31 Arc, Mutex,
32 },
33 thread::{self, sleep, Builder, JoinHandle},
34 time::Duration,
35 },
36 tokio_postgres::types,
37};
38
39const MAX_ASYNC_REQUESTS: usize = 40960;
42const DEFAULT_POSTGRES_PORT: u16 = 5432;
43const DEFAULT_THREADS_COUNT: usize = 100;
44const DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE: usize = 10;
45const ACCOUNT_COLUMN_COUNT: usize = 9;
46const DEFAULT_PANIC_ON_DB_ERROR: bool = false;
47const DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA: bool = false;
48
49struct PostgresSqlClientWrapper {
50 client: Client,
51 update_account_stmt: Statement,
52 bulk_account_insert_stmt: Statement,
53 update_slot_with_parent_stmt: Statement,
54 update_slot_without_parent_stmt: Statement,
55 update_transaction_log_stmt: Statement,
56 update_block_metadata_stmt: Statement,
57 insert_account_audit_stmt: Option<Statement>,
58 insert_token_owner_index_stmt: Option<Statement>,
59 insert_token_mint_index_stmt: Option<Statement>,
60 bulk_insert_token_owner_index_stmt: Option<Statement>,
61 bulk_insert_token_mint_index_stmt: Option<Statement>,
62}
63
64pub struct SimplePostgresClient {
65 batch_size: usize,
66 slots_at_startup: HashSet<u64>,
67 pending_account_updates: Vec<DbAccountInfo>,
68 index_token_owner: bool,
69 index_token_mint: bool,
70 pending_token_owner_index: Vec<TokenSecondaryIndexEntry>,
71 pending_token_mint_index: Vec<TokenSecondaryIndexEntry>,
72 client: Mutex<PostgresSqlClientWrapper>,
73}
74
75struct PostgresClientWorker {
76 client: SimplePostgresClient,
77 is_startup_done: bool,
79}
80
81impl Eq for DbAccountInfo {}
82
83#[derive(Clone, PartialEq, Debug)]
84pub struct DbAccountInfo {
85 pub pubkey: Vec<u8>,
86 pub lamports: i64,
87 pub owner: Vec<u8>,
88 pub executable: bool,
89 pub rent_epoch: i64,
90 pub data: Vec<u8>,
91 pub slot: i64,
92 pub write_version: i64,
93}
94
95pub(crate) fn abort() -> ! {
96 #[cfg(not(test))]
97 {
98 eprintln!("Validator process aborted. The validator log may contain further details");
101 std::process::exit(1);
102 }
103
104 #[cfg(test)]
105 panic!("process::exit(1) is intercepted for friendly test failure...");
106}
107
108impl DbAccountInfo {
109 fn new<T: ReadableAccountInfo>(account: &T, slot: u64) -> DbAccountInfo {
110 let data = account.data().to_vec();
111 Self {
112 pubkey: account.pubkey().to_vec(),
113 lamports: account.lamports() as i64,
114 owner: account.owner().to_vec(),
115 executable: account.executable(),
116 rent_epoch: account.rent_epoch() as i64,
117 data,
118 slot: slot as i64,
119 write_version: account.write_version(),
120 }
121 }
122}
123
124pub trait ReadableAccountInfo: Sized {
125 fn pubkey(&self) -> &[u8];
126 fn owner(&self) -> &[u8];
127 fn lamports(&self) -> i64;
128 fn executable(&self) -> bool;
129 fn rent_epoch(&self) -> i64;
130 fn data(&self) -> &[u8];
131 fn write_version(&self) -> i64;
132}
133
134impl ReadableAccountInfo for DbAccountInfo {
135 fn pubkey(&self) -> &[u8] {
136 &self.pubkey
137 }
138
139 fn owner(&self) -> &[u8] {
140 &self.owner
141 }
142
143 fn lamports(&self) -> i64 {
144 self.lamports
145 }
146
147 fn executable(&self) -> bool {
148 self.executable
149 }
150
151 fn rent_epoch(&self) -> i64 {
152 self.rent_epoch
153 }
154
155 fn data(&self) -> &[u8] {
156 &self.data
157 }
158
159 fn write_version(&self) -> i64 {
160 self.write_version
161 }
162}
163
164impl<'a> ReadableAccountInfo for ReplicaAccountInfo<'a> {
165 fn pubkey(&self) -> &[u8] {
166 self.pubkey
167 }
168
169 fn owner(&self) -> &[u8] {
170 self.owner
171 }
172
173 fn lamports(&self) -> i64 {
174 self.lamports as i64
175 }
176
177 fn executable(&self) -> bool {
178 self.executable
179 }
180
181 fn rent_epoch(&self) -> i64 {
182 self.rent_epoch as i64
183 }
184
185 fn data(&self) -> &[u8] {
186 self.data
187 }
188
189 fn write_version(&self) -> i64 {
190 self.write_version as i64
191 }
192}
193
194pub trait PostgresClient {
195 fn join(&mut self) -> thread::Result<()> {
196 Ok(())
197 }
198
199 fn update_account(
200 &mut self,
201 account: DbAccountInfo,
202 is_startup: bool,
203 ) -> Result<(), GeyserPluginError>;
204
205 fn update_slot_status(
206 &mut self,
207 slot: u64,
208 parent: Option<u64>,
209 status: SlotStatus,
210 ) -> Result<(), GeyserPluginError>;
211
212 fn notify_end_of_startup(&mut self) -> Result<(), GeyserPluginError>;
213
214 fn log_transaction(
215 &mut self,
216 transaction_log_info: LogTransactionRequest,
217 ) -> Result<(), GeyserPluginError>;
218
219 fn update_block_metadata(
220 &mut self,
221 block_info: UpdateBlockMetadataRequest,
222 ) -> Result<(), GeyserPluginError>;
223}
224
225impl SimplePostgresClient {
226 pub fn connect_to_db(config: &GeyserPluginPostgresConfig) -> Result<Client, GeyserPluginError> {
227 let port = config.port.unwrap_or(DEFAULT_POSTGRES_PORT);
228
229 let connection_str = if let Some(connection_str) = &config.connection_str {
230 connection_str.clone()
231 } else {
232 if config.host.is_none() || config.user.is_none() {
233 let msg = format!(
234 "\"connection_str\": {:?}, or \"host\": {:?} \"user\": {:?} must be specified",
235 config.connection_str, config.host, config.user
236 );
237 return Err(GeyserPluginError::Custom(Box::new(
238 GeyserPluginPostgresError::ConfigurationError { msg },
239 )));
240 }
241 format!(
242 "host={} user={} port={}",
243 config.host.as_ref().unwrap(),
244 config.user.as_ref().unwrap(),
245 port
246 )
247 };
248
249 let result = if let Some(true) = config.use_ssl {
250 if config.server_ca.is_none() {
251 let msg = "\"server_ca\" must be specified when \"use_ssl\" is set".to_string();
252 return Err(GeyserPluginError::Custom(Box::new(
253 GeyserPluginPostgresError::ConfigurationError { msg },
254 )));
255 }
256 if config.client_cert.is_none() {
257 let msg = "\"client_cert\" must be specified when \"use_ssl\" is set".to_string();
258 return Err(GeyserPluginError::Custom(Box::new(
259 GeyserPluginPostgresError::ConfigurationError { msg },
260 )));
261 }
262 if config.client_key.is_none() {
263 let msg = "\"client_key\" must be specified when \"use_ssl\" is set".to_string();
264 return Err(GeyserPluginError::Custom(Box::new(
265 GeyserPluginPostgresError::ConfigurationError { msg },
266 )));
267 }
268 let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
269 if let Err(err) = builder.set_ca_file(config.server_ca.as_ref().unwrap()) {
270 let msg = format!(
271 "Failed to set the server certificate specified by \"server_ca\": {}. Error: ({})",
272 config.server_ca.as_ref().unwrap(), err);
273 return Err(GeyserPluginError::Custom(Box::new(
274 GeyserPluginPostgresError::ConfigurationError { msg },
275 )));
276 }
277 if let Err(err) =
278 builder.set_certificate_file(config.client_cert.as_ref().unwrap(), SslFiletype::PEM)
279 {
280 let msg = format!(
281 "Failed to set the client certificate specified by \"client_cert\": {}. Error: ({})",
282 config.client_cert.as_ref().unwrap(), err);
283 return Err(GeyserPluginError::Custom(Box::new(
284 GeyserPluginPostgresError::ConfigurationError { msg },
285 )));
286 }
287 if let Err(err) =
288 builder.set_private_key_file(config.client_key.as_ref().unwrap(), SslFiletype::PEM)
289 {
290 let msg = format!(
291 "Failed to set the client key specified by \"client_key\": {}. Error: ({})",
292 config.client_key.as_ref().unwrap(),
293 err
294 );
295 return Err(GeyserPluginError::Custom(Box::new(
296 GeyserPluginPostgresError::ConfigurationError { msg },
297 )));
298 }
299
300 let mut connector = MakeTlsConnector::new(builder.build());
301 connector.set_callback(|connect_config, _domain| {
302 connect_config.set_verify_hostname(false);
303 Ok(())
304 });
305 Client::connect(&connection_str, connector)
306 } else {
307 Client::connect(&connection_str, NoTls)
308 };
309
310 match result {
311 Err(err) => {
312 let msg = format!(
313 "Error in connecting to the PostgreSQL database: {:?} connection_str: {:?}",
314 err, connection_str
315 );
316 error!("{}", msg);
317 Err(GeyserPluginError::Custom(Box::new(
318 GeyserPluginPostgresError::DataStoreConnectionError { msg },
319 )))
320 }
321 Ok(client) => Ok(client),
322 }
323 }
324
325 fn build_bulk_account_insert_statement(
326 client: &mut Client,
327 config: &GeyserPluginPostgresConfig,
328 ) -> Result<Statement, GeyserPluginError> {
329 let batch_size = config
330 .batch_size
331 .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
332 let mut stmt = String::from("INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) VALUES");
333 for j in 0..batch_size {
334 let row = j * ACCOUNT_COLUMN_COUNT;
335 let val_str = format!(
336 "(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})",
337 row + 1,
338 row + 2,
339 row + 3,
340 row + 4,
341 row + 5,
342 row + 6,
343 row + 7,
344 row + 8,
345 row + 9,
346 );
347
348 if j == 0 {
349 stmt = format!("{} {}", &stmt, val_str);
350 } else {
351 stmt = format!("{}, {}", &stmt, val_str);
352 }
353 }
354
355 let handle_conflict = "ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \
356 data=excluded.data, write_version=excluded.write_version, updated_on=excluded.updated_on WHERE acct.slot < excluded.slot OR (\
357 acct.slot = excluded.slot AND acct.write_version < excluded.write_version)";
358
359 stmt = format!("{} {}", stmt, handle_conflict);
360
361 info!("{}", stmt);
362 let bulk_stmt = client.prepare(&stmt);
363
364 match bulk_stmt {
365 Err(err) => {
366 return Err(GeyserPluginError::Custom(Box::new(GeyserPluginPostgresError::DataSchemaError {
367 msg: format!(
368 "Error in preparing for the accounts update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
369 err, config.host, config.user, config
370 ),
371 })));
372 }
373 Ok(update_account_stmt) => Ok(update_account_stmt),
374 }
375 }
376
377 fn build_single_account_upsert_statement(
378 client: &mut Client,
379 config: &GeyserPluginPostgresConfig,
380 ) -> Result<Statement, GeyserPluginError> {
381 let stmt = "INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \
382 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
383 ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \
384 data=excluded.data, write_version=excluded.write_version, updated_on=excluded.updated_on WHERE acct.slot < excluded.slot OR (\
385 acct.slot = excluded.slot AND acct.write_version < excluded.write_version)";
386
387 let stmt = client.prepare(stmt);
388
389 match stmt {
390 Err(err) => {
391 return Err(GeyserPluginError::Custom(Box::new(GeyserPluginPostgresError::DataSchemaError {
392 msg: format!(
393 "Error in preparing for the accounts update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
394 err, config.host, config.user, config
395 ),
396 })));
397 }
398 Ok(update_account_stmt) => Ok(update_account_stmt),
399 }
400 }
401
402 fn prepare_query_statement(
403 client: &mut Client,
404 config: &GeyserPluginPostgresConfig,
405 stmt: &str,
406 ) -> Result<Statement, GeyserPluginError> {
407 let statement = client.prepare(stmt);
408
409 match statement {
410 Err(err) => {
411 return Err(GeyserPluginError::Custom(Box::new(GeyserPluginPostgresError::DataSchemaError {
412 msg: format!(
413 "Error in preparing for the statement {} for PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
414 stmt, err, config.host, config.user, config
415 ),
416 })));
417 }
418 Ok(statement) => Ok(statement),
419 }
420 }
421
422 fn build_account_audit_insert_statement(
423 client: &mut Client,
424 config: &GeyserPluginPostgresConfig,
425 ) -> Result<Statement, GeyserPluginError> {
426 let stmt = "INSERT INTO account_audit (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \
427 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)";
428
429 let stmt = client.prepare(stmt);
430
431 match stmt {
432 Err(err) => {
433 return Err(GeyserPluginError::Custom(Box::new(GeyserPluginPostgresError::DataSchemaError {
434 msg: format!(
435 "Error in preparing for the account_audit update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
436 err, config.host, config.user, config
437 ),
438 })));
439 }
440 Ok(stmt) => Ok(stmt),
441 }
442 }
443
444 fn build_slot_upsert_statement_with_parent(
445 client: &mut Client,
446 config: &GeyserPluginPostgresConfig,
447 ) -> Result<Statement, GeyserPluginError> {
448 let stmt = "INSERT INTO slot (slot, parent, status, updated_on) \
449 VALUES ($1, $2, $3, $4) \
450 ON CONFLICT (slot) DO UPDATE SET parent=excluded.parent, status=excluded.status, updated_on=excluded.updated_on";
451
452 let stmt = client.prepare(stmt);
453
454 match stmt {
455 Err(err) => {
456 return Err(GeyserPluginError::Custom(Box::new(GeyserPluginPostgresError::DataSchemaError {
457 msg: format!(
458 "Error in preparing for the slot update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
459 err, config.host, config.user, config
460 ),
461 })));
462 }
463 Ok(stmt) => Ok(stmt),
464 }
465 }
466
467 fn build_slot_upsert_statement_without_parent(
468 client: &mut Client,
469 config: &GeyserPluginPostgresConfig,
470 ) -> Result<Statement, GeyserPluginError> {
471 let stmt = "INSERT INTO slot (slot, status, updated_on) \
472 VALUES ($1, $2, $3) \
473 ON CONFLICT (slot) DO UPDATE SET status=excluded.status, updated_on=excluded.updated_on";
474
475 let stmt = client.prepare(stmt);
476
477 match stmt {
478 Err(err) => {
479 return Err(GeyserPluginError::Custom(Box::new(GeyserPluginPostgresError::DataSchemaError {
480 msg: format!(
481 "Error in preparing for the slot update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
482 err, config.host, config.user, config
483 ),
484 })));
485 }
486 Ok(stmt) => Ok(stmt),
487 }
488 }
489
490 fn insert_account_audit(
492 account: &DbAccountInfo,
493 statement: &Statement,
494 client: &mut Client,
495 ) -> Result<(), GeyserPluginError> {
496 let lamports = account.lamports() as i64;
497 let rent_epoch = account.rent_epoch() as i64;
498 let updated_on = Utc::now().naive_utc();
499 let result = client.execute(
500 statement,
501 &[
502 &account.pubkey(),
503 &account.slot,
504 &account.owner(),
505 &lamports,
506 &account.executable(),
507 &rent_epoch,
508 &account.data(),
509 &account.write_version(),
510 &updated_on,
511 ],
512 );
513
514 if let Err(err) = result {
515 let msg = format!(
516 "Failed to persist the insert of account_audit to the PostgreSQL database. Error: {:?}",
517 err
518 );
519 error!("{}", msg);
520 return Err(GeyserPluginError::AccountsUpdateError { msg });
521 }
522 Ok(())
523 }
524
525 fn upsert_account_internal(
527 account: &DbAccountInfo,
528 statement: &Statement,
529 client: &mut Client,
530 insert_account_audit_stmt: &Option<Statement>,
531 insert_token_owner_index_stmt: &Option<Statement>,
532 insert_token_mint_index_stmt: &Option<Statement>,
533 ) -> Result<(), GeyserPluginError> {
534 let lamports = account.lamports() as i64;
535 let rent_epoch = account.rent_epoch() as i64;
536 let updated_on = Utc::now().naive_utc();
537 let result = client.execute(
538 statement,
539 &[
540 &account.pubkey(),
541 &account.slot,
542 &account.owner(),
543 &lamports,
544 &account.executable(),
545 &rent_epoch,
546 &account.data(),
547 &account.write_version(),
548 &updated_on,
549 ],
550 );
551
552 if let Err(err) = result {
553 let msg = format!(
554 "Failed to persist the update of account to the PostgreSQL database. Error: {:?}",
555 err
556 );
557 error!("{}", msg);
558 return Err(GeyserPluginError::AccountsUpdateError { msg });
559 } else if result.unwrap() == 0 && insert_account_audit_stmt.is_some() {
560 let statement = insert_account_audit_stmt.as_ref().unwrap();
563 Self::insert_account_audit(account, statement, client)?;
564 }
565
566 if let Some(insert_token_owner_index_stmt) = insert_token_owner_index_stmt {
567 Self::update_token_owner_index(client, insert_token_owner_index_stmt, account)?;
568 }
569
570 if let Some(insert_token_mint_index_stmt) = insert_token_mint_index_stmt {
571 Self::update_token_mint_index(client, insert_token_mint_index_stmt, account)?;
572 }
573
574 Ok(())
575 }
576
577 fn upsert_account(&mut self, account: &DbAccountInfo) -> Result<(), GeyserPluginError> {
579 let client = self.client.get_mut().unwrap();
580 let insert_account_audit_stmt = &client.insert_account_audit_stmt;
581 let statement = &client.update_account_stmt;
582 let insert_token_owner_index_stmt = &client.insert_token_owner_index_stmt;
583 let insert_token_mint_index_stmt = &client.insert_token_mint_index_stmt;
584 let client = &mut client.client;
585 Self::upsert_account_internal(
586 account,
587 statement,
588 client,
589 insert_account_audit_stmt,
590 insert_token_owner_index_stmt,
591 insert_token_mint_index_stmt,
592 )?;
593
594 Ok(())
595 }
596
597 fn insert_accounts_in_batch(
599 &mut self,
600 account: DbAccountInfo,
601 ) -> Result<(), GeyserPluginError> {
602 self.queue_secondary_indexes(&account);
603 self.pending_account_updates.push(account);
604
605 self.bulk_insert_accounts()?;
606 self.bulk_insert_token_owner_index()?;
607 self.bulk_insert_token_mint_index()
608 }
609
610 fn bulk_insert_accounts(&mut self) -> Result<(), GeyserPluginError> {
611 if self.pending_account_updates.len() == self.batch_size {
612 let mut measure = Measure::start("geyser-plugin-postgres-prepare-values");
613
614 let mut values: Vec<&(dyn types::ToSql + Sync)> =
615 Vec::with_capacity(self.batch_size * ACCOUNT_COLUMN_COUNT);
616 let updated_on = Utc::now().naive_utc();
617 for j in 0..self.batch_size {
618 let account = &self.pending_account_updates[j];
619
620 values.push(&account.pubkey);
621 values.push(&account.slot);
622 values.push(&account.owner);
623 values.push(&account.lamports);
624 values.push(&account.executable);
625 values.push(&account.rent_epoch);
626 values.push(&account.data);
627 values.push(&account.write_version);
628 values.push(&updated_on);
629 }
630 measure.stop();
631 inc_new_counter_debug!(
632 "geyser-plugin-postgres-prepare-values-us",
633 measure.as_us() as usize,
634 10000,
635 10000
636 );
637
638 let mut measure = Measure::start("geyser-plugin-postgres-update-account");
639 let client = self.client.get_mut().unwrap();
640 let result = client
641 .client
642 .query(&client.bulk_account_insert_stmt, &values);
643
644 self.pending_account_updates.clear();
645
646 if let Err(err) = result {
647 let msg = format!(
648 "Failed to persist the update of account to the PostgreSQL database. Error: {:?}",
649 err
650 );
651 error!("{}", msg);
652 return Err(GeyserPluginError::AccountsUpdateError { msg });
653 }
654
655 measure.stop();
656 inc_new_counter_debug!(
657 "geyser-plugin-postgres-update-account-us",
658 measure.as_us() as usize,
659 10000,
660 10000
661 );
662 inc_new_counter_debug!(
663 "geyser-plugin-postgres-update-account-count",
664 self.batch_size,
665 10000,
666 10000
667 );
668 }
669 Ok(())
670 }
671
672 fn flush_buffered_writes(&mut self) -> Result<(), GeyserPluginError> {
674 if self.pending_account_updates.is_empty() {
675 return Ok(());
676 }
677
678 let client = self.client.get_mut().unwrap();
679 let insert_account_audit_stmt = &client.insert_account_audit_stmt;
680 let statement = &client.update_account_stmt;
681 let insert_token_owner_index_stmt = &client.insert_token_owner_index_stmt;
682 let insert_token_mint_index_stmt = &client.insert_token_mint_index_stmt;
683 let insert_slot_stmt = &client.update_slot_without_parent_stmt;
684 let client = &mut client.client;
685
686 for account in self.pending_account_updates.drain(..) {
687 Self::upsert_account_internal(
688 &account,
689 statement,
690 client,
691 insert_account_audit_stmt,
692 insert_token_owner_index_stmt,
693 insert_token_mint_index_stmt,
694 )?;
695 }
696
697 let mut measure = Measure::start("geyser-plugin-postgres-flush-slots-us");
698
699 for slot in &self.slots_at_startup {
700 Self::upsert_slot_status_internal(
701 *slot,
702 None,
703 SlotStatus::Rooted,
704 client,
705 insert_slot_stmt,
706 )?;
707 }
708 measure.stop();
709
710 datapoint_info!(
711 "geyser_plugin_notify_account_restore_from_snapshot_summary",
712 ("flush_slots-us", measure.as_us(), i64),
713 ("flush-slots-counts", self.slots_at_startup.len(), i64),
714 );
715
716 self.slots_at_startup.clear();
717 self.clear_buffered_indexes();
718 Ok(())
719 }
720
721 fn upsert_slot_status_internal(
722 slot: u64,
723 parent: Option<u64>,
724 status: SlotStatus,
725 client: &mut Client,
726 statement: &Statement,
727 ) -> Result<(), GeyserPluginError> {
728 let slot = slot as i64; let parent = parent.map(|parent| parent as i64);
730 let updated_on = Utc::now().naive_utc();
731 let status_str = status.as_str();
732
733 let result = match parent {
734 Some(parent) => client.execute(statement, &[&slot, &parent, &status_str, &updated_on]),
735 None => client.execute(statement, &[&slot, &status_str, &updated_on]),
736 };
737
738 match result {
739 Err(err) => {
740 let msg = format!(
741 "Failed to persist the update of slot to the PostgreSQL database. Error: {:?}",
742 err
743 );
744 error!("{:?}", msg);
745 return Err(GeyserPluginError::SlotStatusUpdateError { msg });
746 }
747 Ok(rows) => {
748 assert_eq!(1, rows, "Expected one rows to be updated a time");
749 }
750 }
751
752 Ok(())
753 }
754
755 pub fn new(config: &GeyserPluginPostgresConfig) -> Result<Self, GeyserPluginError> {
756 info!("Creating SimplePostgresClient...");
757 let mut client = Self::connect_to_db(config)?;
758 let bulk_account_insert_stmt =
759 Self::build_bulk_account_insert_statement(&mut client, config)?;
760 let update_account_stmt = Self::build_single_account_upsert_statement(&mut client, config)?;
761
762 let update_slot_with_parent_stmt =
763 Self::build_slot_upsert_statement_with_parent(&mut client, config)?;
764 let update_slot_without_parent_stmt =
765 Self::build_slot_upsert_statement_without_parent(&mut client, config)?;
766 let update_transaction_log_stmt =
767 Self::build_transaction_info_upsert_statement(&mut client, config)?;
768 let update_block_metadata_stmt =
769 Self::build_block_metadata_upsert_statement(&mut client, config)?;
770
771 let batch_size = config
772 .batch_size
773 .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
774
775 let store_account_historical_data = config
776 .store_account_historical_data
777 .unwrap_or(DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA);
778
779 let insert_account_audit_stmt = if store_account_historical_data {
780 let stmt = Self::build_account_audit_insert_statement(&mut client, config)?;
781 Some(stmt)
782 } else {
783 None
784 };
785
786 let bulk_insert_token_owner_index_stmt = if let Some(true) = config.index_token_owner {
787 let stmt = Self::build_bulk_token_owner_index_insert_statement(&mut client, config)?;
788 Some(stmt)
789 } else {
790 None
791 };
792
793 let bulk_insert_token_mint_index_stmt = if let Some(true) = config.index_token_mint {
794 let stmt = Self::build_bulk_token_mint_index_insert_statement(&mut client, config)?;
795 Some(stmt)
796 } else {
797 None
798 };
799
800 let insert_token_owner_index_stmt = if let Some(true) = config.index_token_owner {
801 Some(Self::build_single_token_owner_index_upsert_statement(
802 &mut client,
803 config,
804 )?)
805 } else {
806 None
807 };
808
809 let insert_token_mint_index_stmt = if let Some(true) = config.index_token_mint {
810 Some(Self::build_single_token_mint_index_upsert_statement(
811 &mut client,
812 config,
813 )?)
814 } else {
815 None
816 };
817
818 info!("Created SimplePostgresClient.");
819 Ok(Self {
820 batch_size,
821 pending_account_updates: Vec::with_capacity(batch_size),
822 client: Mutex::new(PostgresSqlClientWrapper {
823 client,
824 update_account_stmt,
825 bulk_account_insert_stmt,
826 update_slot_with_parent_stmt,
827 update_slot_without_parent_stmt,
828 update_transaction_log_stmt,
829 update_block_metadata_stmt,
830 insert_account_audit_stmt,
831 insert_token_owner_index_stmt,
832 insert_token_mint_index_stmt,
833 bulk_insert_token_owner_index_stmt,
834 bulk_insert_token_mint_index_stmt,
835 }),
836 index_token_owner: config.index_token_owner.unwrap_or_default(),
837 index_token_mint: config.index_token_mint.unwrap_or(false),
838 pending_token_owner_index: Vec::with_capacity(batch_size),
839 pending_token_mint_index: Vec::with_capacity(batch_size),
840 slots_at_startup: HashSet::default(),
841 })
842 }
843}
844
845impl PostgresClient for SimplePostgresClient {
846 fn update_account(
847 &mut self,
848 account: DbAccountInfo,
849 is_startup: bool,
850 ) -> Result<(), GeyserPluginError> {
851 trace!(
852 "Updating account {} with owner {} at slot {}",
853 bs58::encode(account.pubkey()).into_string(),
854 bs58::encode(account.owner()).into_string(),
855 account.slot,
856 );
857 if !is_startup {
858 return self.upsert_account(&account);
859 }
860 self.slots_at_startup.insert(account.slot as u64);
861 self.insert_accounts_in_batch(account)
862 }
863
864 fn update_slot_status(
865 &mut self,
866 slot: u64,
867 parent: Option<u64>,
868 status: SlotStatus,
869 ) -> Result<(), GeyserPluginError> {
870 info!("Updating slot {:?} at with status {:?}", slot, status);
871
872 let client = self.client.get_mut().unwrap();
873
874 let statement = match parent {
875 Some(_) => &client.update_slot_with_parent_stmt,
876 None => &client.update_slot_without_parent_stmt,
877 };
878
879 Self::upsert_slot_status_internal(slot, parent, status, &mut client.client, statement)
880 }
881
882 fn notify_end_of_startup(&mut self) -> Result<(), GeyserPluginError> {
883 self.flush_buffered_writes()
884 }
885
886 fn log_transaction(
887 &mut self,
888 transaction_log_info: LogTransactionRequest,
889 ) -> Result<(), GeyserPluginError> {
890 self.log_transaction_impl(transaction_log_info)
891 }
892
893 fn update_block_metadata(
894 &mut self,
895 block_info: UpdateBlockMetadataRequest,
896 ) -> Result<(), GeyserPluginError> {
897 self.update_block_metadata_impl(block_info)
898 }
899}
900
901struct UpdateAccountRequest {
902 account: DbAccountInfo,
903 is_startup: bool,
904}
905
906struct UpdateSlotRequest {
907 slot: u64,
908 parent: Option<u64>,
909 slot_status: SlotStatus,
910}
911
912pub struct UpdateBlockMetadataRequest {
913 pub block_info: DbBlockInfo,
914}
915
916#[warn(clippy::large_enum_variant)]
917enum DbWorkItem {
918 UpdateAccount(Box<UpdateAccountRequest>),
919 UpdateSlot(Box<UpdateSlotRequest>),
920 LogTransaction(Box<LogTransactionRequest>),
921 UpdateBlockMetadata(Box<UpdateBlockMetadataRequest>),
922}
923
924impl PostgresClientWorker {
925 fn new(config: GeyserPluginPostgresConfig) -> Result<Self, GeyserPluginError> {
926 let result = SimplePostgresClient::new(&config);
927 match result {
928 Ok(client) => Ok(PostgresClientWorker {
929 client,
930 is_startup_done: false,
931 }),
932 Err(err) => {
933 error!("Error in creating SimplePostgresClient: {}", err);
934 Err(err)
935 }
936 }
937 }
938
939 fn do_work(
940 &mut self,
941 receiver: Receiver<DbWorkItem>,
942 exit_worker: Arc<AtomicBool>,
943 is_startup_done: Arc<AtomicBool>,
944 startup_done_count: Arc<AtomicUsize>,
945 panic_on_db_errors: bool,
946 ) -> Result<(), GeyserPluginError> {
947 while !exit_worker.load(Ordering::Relaxed) {
948 let mut measure = Measure::start("geyser-plugin-postgres-worker-recv");
949 let work = receiver.recv_timeout(Duration::from_millis(500));
950 measure.stop();
951 inc_new_counter_debug!(
952 "geyser-plugin-postgres-worker-recv-us",
953 measure.as_us() as usize,
954 100000,
955 100000
956 );
957 match work {
958 Ok(work) => match work {
959 DbWorkItem::UpdateAccount(request) => {
960 if let Err(err) = self
961 .client
962 .update_account(request.account, request.is_startup)
963 {
964 error!("Failed to update account: ({})", err);
965 if panic_on_db_errors {
966 abort();
967 }
968 }
969 }
970 DbWorkItem::UpdateSlot(request) => {
971 if let Err(err) = self.client.update_slot_status(
972 request.slot,
973 request.parent,
974 request.slot_status,
975 ) {
976 error!("Failed to update slot: ({})", err);
977 if panic_on_db_errors {
978 abort();
979 }
980 }
981 }
982 DbWorkItem::LogTransaction(transaction_log_info) => {
983 if let Err(err) = self.client.log_transaction(*transaction_log_info) {
984 error!("Failed to update transaction: ({})", err);
985 if panic_on_db_errors {
986 abort();
987 }
988 }
989 }
990 DbWorkItem::UpdateBlockMetadata(block_info) => {
991 if let Err(err) = self.client.update_block_metadata(*block_info) {
992 error!("Failed to update block metadata: ({})", err);
993 if panic_on_db_errors {
994 abort();
995 }
996 }
997 }
998 },
999 Err(err) => match err {
1000 RecvTimeoutError::Timeout => {
1001 if !self.is_startup_done && is_startup_done.load(Ordering::Relaxed) {
1002 if let Err(err) = self.client.notify_end_of_startup() {
1003 error!("Error in notifying end of startup: ({})", err);
1004 if panic_on_db_errors {
1005 abort();
1006 }
1007 }
1008 self.is_startup_done = true;
1009 startup_done_count.fetch_add(1, Ordering::Relaxed);
1010 }
1011
1012 continue;
1013 }
1014 _ => {
1015 error!("Error in receiving the item {:?}", err);
1016 if panic_on_db_errors {
1017 abort();
1018 }
1019 break;
1020 }
1021 },
1022 }
1023 }
1024 Ok(())
1025 }
1026}
1027pub struct ParallelPostgresClient {
1028 workers: Vec<JoinHandle<Result<(), GeyserPluginError>>>,
1029 exit_worker: Arc<AtomicBool>,
1030 is_startup_done: Arc<AtomicBool>,
1031 startup_done_count: Arc<AtomicUsize>,
1032 initialized_worker_count: Arc<AtomicUsize>,
1033 sender: Sender<DbWorkItem>,
1034 last_report: AtomicInterval,
1035}
1036
1037impl ParallelPostgresClient {
1038 pub fn new(config: &GeyserPluginPostgresConfig) -> Result<Self, GeyserPluginError> {
1039 info!("Creating ParallelPostgresClient...");
1040 let (sender, receiver) = bounded(MAX_ASYNC_REQUESTS);
1041 let exit_worker = Arc::new(AtomicBool::new(false));
1042 let mut workers = Vec::default();
1043 let is_startup_done = Arc::new(AtomicBool::new(false));
1044 let startup_done_count = Arc::new(AtomicUsize::new(0));
1045 let worker_count = config.threads.unwrap_or(DEFAULT_THREADS_COUNT);
1046 let initialized_worker_count = Arc::new(AtomicUsize::new(0));
1047 for i in 0..worker_count {
1048 let cloned_receiver = receiver.clone();
1049 let exit_clone = exit_worker.clone();
1050 let is_startup_done_clone = is_startup_done.clone();
1051 let startup_done_count_clone = startup_done_count.clone();
1052 let initialized_worker_count_clone = initialized_worker_count.clone();
1053 let config = config.clone();
1054 let worker = Builder::new()
1055 .name(format!("worker-{}", i))
1056 .spawn(move || -> Result<(), GeyserPluginError> {
1057 let panic_on_db_errors = *config
1058 .panic_on_db_errors
1059 .as_ref()
1060 .unwrap_or(&DEFAULT_PANIC_ON_DB_ERROR);
1061 let result = PostgresClientWorker::new(config);
1062
1063 match result {
1064 Ok(mut worker) => {
1065 initialized_worker_count_clone.fetch_add(1, Ordering::Relaxed);
1066 worker.do_work(
1067 cloned_receiver,
1068 exit_clone,
1069 is_startup_done_clone,
1070 startup_done_count_clone,
1071 panic_on_db_errors,
1072 )?;
1073 Ok(())
1074 }
1075 Err(err) => {
1076 error!("Error when making connection to database: ({})", err);
1077 if panic_on_db_errors {
1078 abort();
1079 }
1080 Err(err)
1081 }
1082 }
1083 })
1084 .unwrap();
1085
1086 workers.push(worker);
1087 }
1088
1089 info!("Created ParallelPostgresClient.");
1090 Ok(Self {
1091 last_report: AtomicInterval::default(),
1092 workers,
1093 exit_worker,
1094 is_startup_done,
1095 startup_done_count,
1096 initialized_worker_count,
1097 sender,
1098 })
1099 }
1100
1101 pub fn join(&mut self) -> thread::Result<()> {
1102 self.exit_worker.store(true, Ordering::Relaxed);
1103 while !self.workers.is_empty() {
1104 let worker = self.workers.pop();
1105 if worker.is_none() {
1106 break;
1107 }
1108 let worker = worker.unwrap();
1109 let result = worker.join().unwrap();
1110 if result.is_err() {
1111 error!("The worker thread has failed: {:?}", result);
1112 }
1113 }
1114
1115 Ok(())
1116 }
1117
1118 pub fn update_account(
1119 &mut self,
1120 account: &ReplicaAccountInfo,
1121 slot: u64,
1122 is_startup: bool,
1123 ) -> Result<(), GeyserPluginError> {
1124 if self.last_report.should_update(30000) {
1125 datapoint_debug!(
1126 "postgres-plugin-stats",
1127 ("message-queue-length", self.sender.len() as i64, i64),
1128 );
1129 }
1130 let mut measure = Measure::start("geyser-plugin-posgres-create-work-item");
1131 let wrk_item = DbWorkItem::UpdateAccount(Box::new(UpdateAccountRequest {
1132 account: DbAccountInfo::new(account, slot),
1133 is_startup,
1134 }));
1135
1136 measure.stop();
1137
1138 inc_new_counter_debug!(
1139 "geyser-plugin-posgres-create-work-item-us",
1140 measure.as_us() as usize,
1141 100000,
1142 100000
1143 );
1144
1145 let mut measure = Measure::start("geyser-plugin-posgres-send-msg");
1146
1147 if let Err(err) = self.sender.send(wrk_item) {
1148 return Err(GeyserPluginError::AccountsUpdateError {
1149 msg: format!(
1150 "Failed to update the account {:?}, error: {:?}",
1151 bs58::encode(account.pubkey()).into_string(),
1152 err
1153 ),
1154 });
1155 }
1156
1157 measure.stop();
1158 inc_new_counter_debug!(
1159 "geyser-plugin-posgres-send-msg-us",
1160 measure.as_us() as usize,
1161 100000,
1162 100000
1163 );
1164
1165 Ok(())
1166 }
1167
1168 pub fn update_slot_status(
1169 &mut self,
1170 slot: u64,
1171 parent: Option<u64>,
1172 status: SlotStatus,
1173 ) -> Result<(), GeyserPluginError> {
1174 if let Err(err) = self
1175 .sender
1176 .send(DbWorkItem::UpdateSlot(Box::new(UpdateSlotRequest {
1177 slot,
1178 parent,
1179 slot_status: status,
1180 })))
1181 {
1182 return Err(GeyserPluginError::SlotStatusUpdateError {
1183 msg: format!("Failed to update the slot {:?}, error: {:?}", slot, err),
1184 });
1185 }
1186 Ok(())
1187 }
1188
1189 pub fn update_block_metadata(
1190 &mut self,
1191 block_info: &ReplicaBlockInfo,
1192 ) -> Result<(), GeyserPluginError> {
1193 if let Err(err) = self.sender.send(DbWorkItem::UpdateBlockMetadata(Box::new(
1194 UpdateBlockMetadataRequest {
1195 block_info: DbBlockInfo::from(block_info),
1196 },
1197 ))) {
1198 return Err(GeyserPluginError::SlotStatusUpdateError {
1199 msg: format!(
1200 "Failed to update the block metadata at slot {:?}, error: {:?}",
1201 block_info.slot, err
1202 ),
1203 });
1204 }
1205 Ok(())
1206 }
1207
1208 pub fn notify_end_of_startup(&mut self) -> Result<(), GeyserPluginError> {
1209 info!("Notifying the end of startup");
1210 while !self.sender.is_empty() {
1212 sleep(Duration::from_millis(100));
1213 }
1214 self.is_startup_done.store(true, Ordering::Relaxed);
1215
1216 while self.startup_done_count.load(Ordering::Relaxed)
1218 != self.initialized_worker_count.load(Ordering::Relaxed)
1219 {
1220 info!(
1221 "Startup done count: {}, good worker thread count: {}",
1222 self.startup_done_count.load(Ordering::Relaxed),
1223 self.initialized_worker_count.load(Ordering::Relaxed)
1224 );
1225 sleep(Duration::from_millis(100));
1226 }
1227
1228 info!("Done with notifying the end of startup");
1229 Ok(())
1230 }
1231}
1232
1233pub struct PostgresClientBuilder {}
1234
1235impl PostgresClientBuilder {
1236 pub fn build_pararallel_postgres_client(
1237 config: &GeyserPluginPostgresConfig,
1238 ) -> Result<ParallelPostgresClient, GeyserPluginError> {
1239 ParallelPostgresClient::new(config)
1240 }
1241
1242 pub fn build_simple_postgres_client(
1243 config: &GeyserPluginPostgresConfig,
1244 ) -> Result<SimplePostgresClient, GeyserPluginError> {
1245 SimplePostgresClient::new(config)
1246 }
1247}