1#![allow(clippy::integer_arithmetic)]
2
3mod postgres_client_block_metadata;
4mod postgres_client_transaction;
5
6use {
8 crate::accountsdb_plugin_postgres::{
9 AccountsDbPluginPostgresConfig, AccountsDbPluginPostgresError,
10 },
11 chrono::Utc,
12 crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender},
13 log::*,
14 openssl::ssl::{SslConnector, SslFiletype, SslMethod},
15 postgres::{Client, NoTls, Statement},
16 postgres_client_block_metadata::DbBlockInfo,
17 postgres_client_transaction::LogTransactionRequest,
18 postgres_openssl::MakeTlsConnector,
19 solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
20 AccountsDbPluginError, ReplicaAccountInfo, ReplicaBlockInfo, SlotStatus,
21 },
22 solana_measure::measure::Measure,
23 solana_metrics::*,
24 solana_sdk::timing::AtomicInterval,
25 std::{
26 sync::{
27 atomic::{AtomicBool, AtomicUsize, Ordering},
28 Arc, Mutex,
29 },
30 thread::{self, sleep, Builder, JoinHandle},
31 time::Duration,
32 },
33 tokio_postgres::types,
34};
35
36const MAX_ASYNC_REQUESTS: usize = 40960;
39const DEFAULT_POSTGRES_PORT: u16 = 5432;
40const DEFAULT_THREADS_COUNT: usize = 100;
41const DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE: usize = 10;
42const ACCOUNT_COLUMN_COUNT: usize = 9;
43const DEFAULT_PANIC_ON_DB_ERROR: bool = false;
44const DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA: bool = false;
45
46struct PostgresSqlClientWrapper {
47 client: Client,
48 update_account_stmt: Statement,
49 bulk_account_insert_stmt: Statement,
50 update_slot_with_parent_stmt: Statement,
51 update_slot_without_parent_stmt: Statement,
52 update_transaction_log_stmt: Statement,
53 update_block_metadata_stmt: Statement,
54 insert_account_audit_stmt: Option<Statement>,
55}
56
57pub struct SimplePostgresClient {
58 batch_size: usize,
59 pending_account_updates: Vec<DbAccountInfo>,
60 client: Mutex<PostgresSqlClientWrapper>,
61}
62
63struct PostgresClientWorker {
64 client: SimplePostgresClient,
65 is_startup_done: bool,
67}
68
69impl Eq for DbAccountInfo {}
70
71#[derive(Clone, PartialEq, Debug)]
72pub struct DbAccountInfo {
73 pub pubkey: Vec<u8>,
74 pub lamports: i64,
75 pub owner: Vec<u8>,
76 pub executable: bool,
77 pub rent_epoch: i64,
78 pub data: Vec<u8>,
79 pub slot: i64,
80 pub write_version: i64,
81}
82
83pub(crate) fn abort() -> ! {
84 #[cfg(not(test))]
85 {
86 eprintln!("Validator process aborted. The validator log may contain further details");
89 std::process::exit(1);
90 }
91
92 #[cfg(test)]
93 panic!("process::exit(1) is intercepted for friendly test failure...");
94}
95
96impl DbAccountInfo {
97 fn new<T: ReadableAccountInfo>(account: &T, slot: u64) -> DbAccountInfo {
98 let data = account.data().to_vec();
99 Self {
100 pubkey: account.pubkey().to_vec(),
101 lamports: account.lamports() as i64,
102 owner: account.owner().to_vec(),
103 executable: account.executable(),
104 rent_epoch: account.rent_epoch() as i64,
105 data,
106 slot: slot as i64,
107 write_version: account.write_version(),
108 }
109 }
110}
111
112pub trait ReadableAccountInfo: Sized {
113 fn pubkey(&self) -> &[u8];
114 fn owner(&self) -> &[u8];
115 fn lamports(&self) -> i64;
116 fn executable(&self) -> bool;
117 fn rent_epoch(&self) -> i64;
118 fn data(&self) -> &[u8];
119 fn write_version(&self) -> i64;
120}
121
122impl ReadableAccountInfo for DbAccountInfo {
123 fn pubkey(&self) -> &[u8] {
124 &self.pubkey
125 }
126
127 fn owner(&self) -> &[u8] {
128 &self.owner
129 }
130
131 fn lamports(&self) -> i64 {
132 self.lamports
133 }
134
135 fn executable(&self) -> bool {
136 self.executable
137 }
138
139 fn rent_epoch(&self) -> i64 {
140 self.rent_epoch
141 }
142
143 fn data(&self) -> &[u8] {
144 &self.data
145 }
146
147 fn write_version(&self) -> i64 {
148 self.write_version
149 }
150}
151
152impl<'a> ReadableAccountInfo for ReplicaAccountInfo<'a> {
153 fn pubkey(&self) -> &[u8] {
154 self.pubkey
155 }
156
157 fn owner(&self) -> &[u8] {
158 self.owner
159 }
160
161 fn lamports(&self) -> i64 {
162 self.lamports as i64
163 }
164
165 fn executable(&self) -> bool {
166 self.executable
167 }
168
169 fn rent_epoch(&self) -> i64 {
170 self.rent_epoch as i64
171 }
172
173 fn data(&self) -> &[u8] {
174 self.data
175 }
176
177 fn write_version(&self) -> i64 {
178 self.write_version as i64
179 }
180}
181
182pub trait PostgresClient {
183 fn join(&mut self) -> thread::Result<()> {
184 Ok(())
185 }
186
187 fn update_account(
188 &mut self,
189 account: DbAccountInfo,
190 is_startup: bool,
191 ) -> Result<(), AccountsDbPluginError>;
192
193 fn update_slot_status(
194 &mut self,
195 slot: u64,
196 parent: Option<u64>,
197 status: SlotStatus,
198 ) -> Result<(), AccountsDbPluginError>;
199
200 fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError>;
201
202 fn log_transaction(
203 &mut self,
204 transaction_log_info: LogTransactionRequest,
205 ) -> Result<(), AccountsDbPluginError>;
206
207 fn update_block_metadata(
208 &mut self,
209 block_info: UpdateBlockMetadataRequest,
210 ) -> Result<(), AccountsDbPluginError>;
211}
212
213impl SimplePostgresClient {
214 pub fn connect_to_db(
215 config: &AccountsDbPluginPostgresConfig,
216 ) -> Result<Client, AccountsDbPluginError> {
217 let port = config.port.unwrap_or(DEFAULT_POSTGRES_PORT);
218
219 let connection_str = if let Some(connection_str) = &config.connection_str {
220 connection_str.clone()
221 } else {
222 if config.host.is_none() || config.user.is_none() {
223 let msg = format!(
224 "\"connection_str\": {:?}, or \"host\": {:?} \"user\": {:?} must be specified",
225 config.connection_str, config.host, config.user
226 );
227 return Err(AccountsDbPluginError::Custom(Box::new(
228 AccountsDbPluginPostgresError::ConfigurationError { msg },
229 )));
230 }
231 format!(
232 "host={} user={} port={}",
233 config.host.as_ref().unwrap(),
234 config.user.as_ref().unwrap(),
235 port
236 )
237 };
238
239 let result = if let Some(true) = config.use_ssl {
240 if config.server_ca.is_none() {
241 let msg = "\"server_ca\" must be specified when \"use_ssl\" is set".to_string();
242 return Err(AccountsDbPluginError::Custom(Box::new(
243 AccountsDbPluginPostgresError::ConfigurationError { msg },
244 )));
245 }
246 if config.client_cert.is_none() {
247 let msg = "\"client_cert\" must be specified when \"use_ssl\" is set".to_string();
248 return Err(AccountsDbPluginError::Custom(Box::new(
249 AccountsDbPluginPostgresError::ConfigurationError { msg },
250 )));
251 }
252 if config.client_key.is_none() {
253 let msg = "\"client_key\" must be specified when \"use_ssl\" is set".to_string();
254 return Err(AccountsDbPluginError::Custom(Box::new(
255 AccountsDbPluginPostgresError::ConfigurationError { msg },
256 )));
257 }
258 let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
259 if let Err(err) = builder.set_ca_file(config.server_ca.as_ref().unwrap()) {
260 let msg = format!(
261 "Failed to set the server certificate specified by \"server_ca\": {}. Error: ({})",
262 config.server_ca.as_ref().unwrap(), err);
263 return Err(AccountsDbPluginError::Custom(Box::new(
264 AccountsDbPluginPostgresError::ConfigurationError { msg },
265 )));
266 }
267 if let Err(err) =
268 builder.set_certificate_file(config.client_cert.as_ref().unwrap(), SslFiletype::PEM)
269 {
270 let msg = format!(
271 "Failed to set the client certificate specified by \"client_cert\": {}. Error: ({})",
272 config.client_cert.as_ref().unwrap(), err);
273 return Err(AccountsDbPluginError::Custom(Box::new(
274 AccountsDbPluginPostgresError::ConfigurationError { msg },
275 )));
276 }
277 if let Err(err) =
278 builder.set_private_key_file(config.client_key.as_ref().unwrap(), SslFiletype::PEM)
279 {
280 let msg = format!(
281 "Failed to set the client key specified by \"client_key\": {}. Error: ({})",
282 config.client_key.as_ref().unwrap(),
283 err
284 );
285 return Err(AccountsDbPluginError::Custom(Box::new(
286 AccountsDbPluginPostgresError::ConfigurationError { msg },
287 )));
288 }
289
290 let mut connector = MakeTlsConnector::new(builder.build());
291 connector.set_callback(|connect_config, _domain| {
292 connect_config.set_verify_hostname(false);
293 Ok(())
294 });
295 Client::connect(&connection_str, connector)
296 } else {
297 Client::connect(&connection_str, NoTls)
298 };
299
300 match result {
301 Err(err) => {
302 let msg = format!(
303 "Error in connecting to the PostgreSQL database: {:?} connection_str: {:?}",
304 err, connection_str
305 );
306 error!("{}", msg);
307 Err(AccountsDbPluginError::Custom(Box::new(
308 AccountsDbPluginPostgresError::DataStoreConnectionError { msg },
309 )))
310 }
311 Ok(client) => Ok(client),
312 }
313 }
314
315 fn build_bulk_account_insert_statement(
316 client: &mut Client,
317 config: &AccountsDbPluginPostgresConfig,
318 ) -> Result<Statement, AccountsDbPluginError> {
319 let batch_size = config
320 .batch_size
321 .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
322 let mut stmt = String::from("INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) VALUES");
323 for j in 0..batch_size {
324 let row = j * ACCOUNT_COLUMN_COUNT;
325 let val_str = format!(
326 "(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})",
327 row + 1,
328 row + 2,
329 row + 3,
330 row + 4,
331 row + 5,
332 row + 6,
333 row + 7,
334 row + 8,
335 row + 9,
336 );
337
338 if j == 0 {
339 stmt = format!("{} {}", &stmt, val_str);
340 } else {
341 stmt = format!("{}, {}", &stmt, val_str);
342 }
343 }
344
345 let handle_conflict = "ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \
346 data=excluded.data, write_version=excluded.write_version, updated_on=excluded.updated_on WHERE acct.slot < excluded.slot OR (\
347 acct.slot = excluded.slot AND acct.write_version < excluded.write_version)";
348
349 stmt = format!("{} {}", stmt, handle_conflict);
350
351 info!("{}", stmt);
352 let bulk_stmt = client.prepare(&stmt);
353
354 match bulk_stmt {
355 Err(err) => {
356 return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
357 msg: format!(
358 "Error in preparing for the accounts update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
359 err, config.host, config.user, config
360 ),
361 })));
362 }
363 Ok(update_account_stmt) => Ok(update_account_stmt),
364 }
365 }
366
367 fn build_single_account_upsert_statement(
368 client: &mut Client,
369 config: &AccountsDbPluginPostgresConfig,
370 ) -> Result<Statement, AccountsDbPluginError> {
371 let stmt = "INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \
372 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
373 ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \
374 data=excluded.data, write_version=excluded.write_version, updated_on=excluded.updated_on WHERE acct.slot < excluded.slot OR (\
375 acct.slot = excluded.slot AND acct.write_version < excluded.write_version)";
376
377 let stmt = client.prepare(stmt);
378
379 match stmt {
380 Err(err) => {
381 return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
382 msg: format!(
383 "Error in preparing for the accounts update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
384 err, config.host, config.user, config
385 ),
386 })));
387 }
388 Ok(update_account_stmt) => Ok(update_account_stmt),
389 }
390 }
391
392 fn build_account_audit_insert_statement(
393 client: &mut Client,
394 config: &AccountsDbPluginPostgresConfig,
395 ) -> Result<Statement, AccountsDbPluginError> {
396 let stmt = "INSERT INTO account_audit (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \
397 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)";
398
399 let stmt = client.prepare(stmt);
400
401 match stmt {
402 Err(err) => {
403 return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
404 msg: format!(
405 "Error in preparing for the account_audit update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
406 err, config.host, config.user, config
407 ),
408 })));
409 }
410 Ok(stmt) => Ok(stmt),
411 }
412 }
413
414 fn build_slot_upsert_statement_with_parent(
415 client: &mut Client,
416 config: &AccountsDbPluginPostgresConfig,
417 ) -> Result<Statement, AccountsDbPluginError> {
418 let stmt = "INSERT INTO slot (slot, parent, status, updated_on) \
419 VALUES ($1, $2, $3, $4) \
420 ON CONFLICT (slot) DO UPDATE SET parent=excluded.parent, status=excluded.status, updated_on=excluded.updated_on";
421
422 let stmt = client.prepare(stmt);
423
424 match stmt {
425 Err(err) => {
426 return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
427 msg: format!(
428 "Error in preparing for the slot update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
429 err, config.host, config.user, config
430 ),
431 })));
432 }
433 Ok(stmt) => Ok(stmt),
434 }
435 }
436
437 fn build_slot_upsert_statement_without_parent(
438 client: &mut Client,
439 config: &AccountsDbPluginPostgresConfig,
440 ) -> Result<Statement, AccountsDbPluginError> {
441 let stmt = "INSERT INTO slot (slot, status, updated_on) \
442 VALUES ($1, $2, $3) \
443 ON CONFLICT (slot) DO UPDATE SET status=excluded.status, updated_on=excluded.updated_on";
444
445 let stmt = client.prepare(stmt);
446
447 match stmt {
448 Err(err) => {
449 return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
450 msg: format!(
451 "Error in preparing for the slot update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
452 err, config.host, config.user, config
453 ),
454 })));
455 }
456 Ok(stmt) => Ok(stmt),
457 }
458 }
459
460 fn insert_account_audit(
462 account: &DbAccountInfo,
463 statement: &Statement,
464 client: &mut Client,
465 ) -> Result<(), AccountsDbPluginError> {
466 let lamports = account.lamports() as i64;
467 let rent_epoch = account.rent_epoch() as i64;
468 let updated_on = Utc::now().naive_utc();
469 let result = client.execute(
470 statement,
471 &[
472 &account.pubkey(),
473 &account.slot,
474 &account.owner(),
475 &lamports,
476 &account.executable(),
477 &rent_epoch,
478 &account.data(),
479 &account.write_version(),
480 &updated_on,
481 ],
482 );
483
484 if let Err(err) = result {
485 let msg = format!(
486 "Failed to persist the insert of account_audit to the PostgreSQL database. Error: {:?}",
487 err
488 );
489 error!("{}", msg);
490 return Err(AccountsDbPluginError::AccountsUpdateError { msg });
491 }
492 Ok(())
493 }
494
495 fn upsert_account_internal(
497 account: &DbAccountInfo,
498 statement: &Statement,
499 client: &mut Client,
500 insert_account_audit_stmt: &Option<Statement>,
501 ) -> Result<(), AccountsDbPluginError> {
502 let lamports = account.lamports() as i64;
503 let rent_epoch = account.rent_epoch() as i64;
504 let updated_on = Utc::now().naive_utc();
505 let result = client.execute(
506 statement,
507 &[
508 &account.pubkey(),
509 &account.slot,
510 &account.owner(),
511 &lamports,
512 &account.executable(),
513 &rent_epoch,
514 &account.data(),
515 &account.write_version(),
516 &updated_on,
517 ],
518 );
519
520 if let Err(err) = result {
521 let msg = format!(
522 "Failed to persist the update of account to the PostgreSQL database. Error: {:?}",
523 err
524 );
525 error!("{}", msg);
526 return Err(AccountsDbPluginError::AccountsUpdateError { msg });
527 } else if result.unwrap() == 0 && insert_account_audit_stmt.is_some() {
528 let statement = insert_account_audit_stmt.as_ref().unwrap();
531 Self::insert_account_audit(account, statement, client)?;
532 }
533
534 Ok(())
535 }
536
537 fn upsert_account(&mut self, account: &DbAccountInfo) -> Result<(), AccountsDbPluginError> {
539 let client = self.client.get_mut().unwrap();
540 let insert_account_audit_stmt = &client.insert_account_audit_stmt;
541 let statement = &client.update_account_stmt;
542 let client = &mut client.client;
543 Self::upsert_account_internal(account, statement, client, insert_account_audit_stmt)
544 }
545
546 fn insert_accounts_in_batch(
548 &mut self,
549 account: DbAccountInfo,
550 ) -> Result<(), AccountsDbPluginError> {
551 self.pending_account_updates.push(account);
552
553 if self.pending_account_updates.len() == self.batch_size {
554 let mut measure = Measure::start("accountsdb-plugin-postgres-prepare-values");
555
556 let mut values: Vec<&(dyn types::ToSql + Sync)> =
557 Vec::with_capacity(self.batch_size * ACCOUNT_COLUMN_COUNT);
558 let updated_on = Utc::now().naive_utc();
559 for j in 0..self.batch_size {
560 let account = &self.pending_account_updates[j];
561
562 values.push(&account.pubkey);
563 values.push(&account.slot);
564 values.push(&account.owner);
565 values.push(&account.lamports);
566 values.push(&account.executable);
567 values.push(&account.rent_epoch);
568 values.push(&account.data);
569 values.push(&account.write_version);
570 values.push(&updated_on);
571 }
572 measure.stop();
573 inc_new_counter_debug!(
574 "accountsdb-plugin-postgres-prepare-values-us",
575 measure.as_us() as usize,
576 10000,
577 10000
578 );
579
580 let mut measure = Measure::start("accountsdb-plugin-postgres-update-account");
581 let client = self.client.get_mut().unwrap();
582 let result = client
583 .client
584 .query(&client.bulk_account_insert_stmt, &values);
585
586 self.pending_account_updates.clear();
587 if let Err(err) = result {
588 let msg = format!(
589 "Failed to persist the update of account to the PostgreSQL database. Error: {:?}",
590 err
591 );
592 error!("{}", msg);
593 return Err(AccountsDbPluginError::AccountsUpdateError { msg });
594 }
595 measure.stop();
596 inc_new_counter_debug!(
597 "accountsdb-plugin-postgres-update-account-us",
598 measure.as_us() as usize,
599 10000,
600 10000
601 );
602 inc_new_counter_debug!(
603 "accountsdb-plugin-postgres-update-account-count",
604 self.batch_size,
605 10000,
606 10000
607 );
608 }
609 Ok(())
610 }
611
612 fn flush_buffered_writes(&mut self) -> Result<(), AccountsDbPluginError> {
614 if self.pending_account_updates.is_empty() {
615 return Ok(());
616 }
617
618 let client = self.client.get_mut().unwrap();
619 let insert_account_audit_stmt = &client.insert_account_audit_stmt;
620 let statement = &client.update_account_stmt;
621 let client = &mut client.client;
622
623 for account in self.pending_account_updates.drain(..) {
624 Self::upsert_account_internal(&account, statement, client, insert_account_audit_stmt)?;
625 }
626
627 Ok(())
628 }
629
630 pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result<Self, AccountsDbPluginError> {
631 info!("Creating SimplePostgresClient...");
632 let mut client = Self::connect_to_db(config)?;
633 let bulk_account_insert_stmt =
634 Self::build_bulk_account_insert_statement(&mut client, config)?;
635 let update_account_stmt = Self::build_single_account_upsert_statement(&mut client, config)?;
636
637 let update_slot_with_parent_stmt =
638 Self::build_slot_upsert_statement_with_parent(&mut client, config)?;
639 let update_slot_without_parent_stmt =
640 Self::build_slot_upsert_statement_without_parent(&mut client, config)?;
641 let update_transaction_log_stmt =
642 Self::build_transaction_info_upsert_statement(&mut client, config)?;
643 let update_block_metadata_stmt =
644 Self::build_block_metadata_upsert_statement(&mut client, config)?;
645
646 let batch_size = config
647 .batch_size
648 .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
649
650 let store_account_historical_data = config
651 .store_account_historical_data
652 .unwrap_or(DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA);
653
654 let insert_account_audit_stmt = if store_account_historical_data {
655 let stmt = Self::build_account_audit_insert_statement(&mut client, config)?;
656 Some(stmt)
657 } else {
658 None
659 };
660
661 info!("Created SimplePostgresClient.");
662 Ok(Self {
663 batch_size,
664 pending_account_updates: Vec::with_capacity(batch_size),
665 client: Mutex::new(PostgresSqlClientWrapper {
666 client,
667 update_account_stmt,
668 bulk_account_insert_stmt,
669 update_slot_with_parent_stmt,
670 update_slot_without_parent_stmt,
671 update_transaction_log_stmt,
672 update_block_metadata_stmt,
673 insert_account_audit_stmt,
674 }),
675 })
676 }
677}
678
679impl PostgresClient for SimplePostgresClient {
680 fn update_account(
681 &mut self,
682 account: DbAccountInfo,
683 is_startup: bool,
684 ) -> Result<(), AccountsDbPluginError> {
685 trace!(
686 "Updating account {} with owner {} at slot {}",
687 bs58::encode(account.pubkey()).into_string(),
688 bs58::encode(account.owner()).into_string(),
689 account.slot,
690 );
691 if !is_startup {
692 return self.upsert_account(&account);
693 }
694 self.insert_accounts_in_batch(account)
695 }
696
697 fn update_slot_status(
698 &mut self,
699 slot: u64,
700 parent: Option<u64>,
701 status: SlotStatus,
702 ) -> Result<(), AccountsDbPluginError> {
703 info!("Updating slot {:?} at with status {:?}", slot, status);
704
705 let slot = slot as i64; let parent = parent.map(|parent| parent as i64);
707 let updated_on = Utc::now().naive_utc();
708 let status_str = status.as_str();
709 let client = self.client.get_mut().unwrap();
710
711 let result = match parent {
712 Some(parent) => client.client.execute(
713 &client.update_slot_with_parent_stmt,
714 &[&slot, &parent, &status_str, &updated_on],
715 ),
716 None => client.client.execute(
717 &client.update_slot_without_parent_stmt,
718 &[&slot, &status_str, &updated_on],
719 ),
720 };
721
722 match result {
723 Err(err) => {
724 let msg = format!(
725 "Failed to persist the update of slot to the PostgreSQL database. Error: {:?}",
726 err
727 );
728 error!("{:?}", msg);
729 return Err(AccountsDbPluginError::SlotStatusUpdateError { msg });
730 }
731 Ok(rows) => {
732 assert_eq!(1, rows, "Expected one rows to be updated a time");
733 }
734 }
735
736 Ok(())
737 }
738
739 fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> {
740 self.flush_buffered_writes()
741 }
742
743 fn log_transaction(
744 &mut self,
745 transaction_log_info: LogTransactionRequest,
746 ) -> Result<(), AccountsDbPluginError> {
747 self.log_transaction_impl(transaction_log_info)
748 }
749
750 fn update_block_metadata(
751 &mut self,
752 block_info: UpdateBlockMetadataRequest,
753 ) -> Result<(), AccountsDbPluginError> {
754 self.update_block_metadata_impl(block_info)
755 }
756}
757
758struct UpdateAccountRequest {
759 account: DbAccountInfo,
760 is_startup: bool,
761}
762
763struct UpdateSlotRequest {
764 slot: u64,
765 parent: Option<u64>,
766 slot_status: SlotStatus,
767}
768
769pub struct UpdateBlockMetadataRequest {
770 pub block_info: DbBlockInfo,
771}
772
773#[warn(clippy::large_enum_variant)]
774enum DbWorkItem {
775 UpdateAccount(Box<UpdateAccountRequest>),
776 UpdateSlot(Box<UpdateSlotRequest>),
777 LogTransaction(Box<LogTransactionRequest>),
778 UpdateBlockMetadata(Box<UpdateBlockMetadataRequest>),
779}
780
781impl PostgresClientWorker {
782 fn new(config: AccountsDbPluginPostgresConfig) -> Result<Self, AccountsDbPluginError> {
783 let result = SimplePostgresClient::new(&config);
784 match result {
785 Ok(client) => Ok(PostgresClientWorker {
786 client,
787 is_startup_done: false,
788 }),
789 Err(err) => {
790 error!("Error in creating SimplePostgresClient: {}", err);
791 Err(err)
792 }
793 }
794 }
795
796 fn do_work(
797 &mut self,
798 receiver: Receiver<DbWorkItem>,
799 exit_worker: Arc<AtomicBool>,
800 is_startup_done: Arc<AtomicBool>,
801 startup_done_count: Arc<AtomicUsize>,
802 panic_on_db_errors: bool,
803 ) -> Result<(), AccountsDbPluginError> {
804 while !exit_worker.load(Ordering::Relaxed) {
805 let mut measure = Measure::start("accountsdb-plugin-postgres-worker-recv");
806 let work = receiver.recv_timeout(Duration::from_millis(500));
807 measure.stop();
808 inc_new_counter_debug!(
809 "accountsdb-plugin-postgres-worker-recv-us",
810 measure.as_us() as usize,
811 100000,
812 100000
813 );
814 match work {
815 Ok(work) => match work {
816 DbWorkItem::UpdateAccount(request) => {
817 if let Err(err) = self
818 .client
819 .update_account(request.account, request.is_startup)
820 {
821 error!("Failed to update account: ({})", err);
822 if panic_on_db_errors {
823 abort();
824 }
825 }
826 }
827 DbWorkItem::UpdateSlot(request) => {
828 if let Err(err) = self.client.update_slot_status(
829 request.slot,
830 request.parent,
831 request.slot_status,
832 ) {
833 error!("Failed to update slot: ({})", err);
834 if panic_on_db_errors {
835 abort();
836 }
837 }
838 }
839 DbWorkItem::LogTransaction(transaction_log_info) => {
840 if let Err(err) = self.client.log_transaction(*transaction_log_info) {
841 error!("Failed to update transaction: ({})", err);
842 if panic_on_db_errors {
843 abort();
844 }
845 }
846 }
847 DbWorkItem::UpdateBlockMetadata(block_info) => {
848 if let Err(err) = self.client.update_block_metadata(*block_info) {
849 error!("Failed to update block metadata: ({})", err);
850 if panic_on_db_errors {
851 abort();
852 }
853 }
854 }
855 },
856 Err(err) => match err {
857 RecvTimeoutError::Timeout => {
858 if !self.is_startup_done && is_startup_done.load(Ordering::Relaxed) {
859 if let Err(err) = self.client.notify_end_of_startup() {
860 error!("Error in notifying end of startup: ({})", err);
861 if panic_on_db_errors {
862 abort();
863 }
864 }
865 self.is_startup_done = true;
866 startup_done_count.fetch_add(1, Ordering::Relaxed);
867 }
868
869 continue;
870 }
871 _ => {
872 error!("Error in receiving the item {:?}", err);
873 if panic_on_db_errors {
874 abort();
875 }
876 break;
877 }
878 },
879 }
880 }
881 Ok(())
882 }
883}
884pub struct ParallelPostgresClient {
885 workers: Vec<JoinHandle<Result<(), AccountsDbPluginError>>>,
886 exit_worker: Arc<AtomicBool>,
887 is_startup_done: Arc<AtomicBool>,
888 startup_done_count: Arc<AtomicUsize>,
889 initialized_worker_count: Arc<AtomicUsize>,
890 sender: Sender<DbWorkItem>,
891 last_report: AtomicInterval,
892}
893
894impl ParallelPostgresClient {
895 pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result<Self, AccountsDbPluginError> {
896 info!("Creating ParallelPostgresClient...");
897 let (sender, receiver) = bounded(MAX_ASYNC_REQUESTS);
898 let exit_worker = Arc::new(AtomicBool::new(false));
899 let mut workers = Vec::default();
900 let is_startup_done = Arc::new(AtomicBool::new(false));
901 let startup_done_count = Arc::new(AtomicUsize::new(0));
902 let worker_count = config.threads.unwrap_or(DEFAULT_THREADS_COUNT);
903 let initialized_worker_count = Arc::new(AtomicUsize::new(0));
904 for i in 0..worker_count {
905 let cloned_receiver = receiver.clone();
906 let exit_clone = exit_worker.clone();
907 let is_startup_done_clone = is_startup_done.clone();
908 let startup_done_count_clone = startup_done_count.clone();
909 let initialized_worker_count_clone = initialized_worker_count.clone();
910 let config = config.clone();
911 let worker = Builder::new()
912 .name(format!("worker-{}", i))
913 .spawn(move || -> Result<(), AccountsDbPluginError> {
914 let panic_on_db_errors = *config
915 .panic_on_db_errors
916 .as_ref()
917 .unwrap_or(&DEFAULT_PANIC_ON_DB_ERROR);
918 let result = PostgresClientWorker::new(config);
919
920 match result {
921 Ok(mut worker) => {
922 initialized_worker_count_clone.fetch_add(1, Ordering::Relaxed);
923 worker.do_work(
924 cloned_receiver,
925 exit_clone,
926 is_startup_done_clone,
927 startup_done_count_clone,
928 panic_on_db_errors,
929 )?;
930 Ok(())
931 }
932 Err(err) => {
933 error!("Error when making connection to database: ({})", err);
934 if panic_on_db_errors {
935 abort();
936 }
937 Err(err)
938 }
939 }
940 })
941 .unwrap();
942
943 workers.push(worker);
944 }
945
946 info!("Created ParallelPostgresClient.");
947 Ok(Self {
948 last_report: AtomicInterval::default(),
949 workers,
950 exit_worker,
951 is_startup_done,
952 startup_done_count,
953 initialized_worker_count,
954 sender,
955 })
956 }
957
958 pub fn join(&mut self) -> thread::Result<()> {
959 self.exit_worker.store(true, Ordering::Relaxed);
960 while !self.workers.is_empty() {
961 let worker = self.workers.pop();
962 if worker.is_none() {
963 break;
964 }
965 let worker = worker.unwrap();
966 let result = worker.join().unwrap();
967 if result.is_err() {
968 error!("The worker thread has failed: {:?}", result);
969 }
970 }
971
972 Ok(())
973 }
974
975 pub fn update_account(
976 &mut self,
977 account: &ReplicaAccountInfo,
978 slot: u64,
979 is_startup: bool,
980 ) -> Result<(), AccountsDbPluginError> {
981 if self.last_report.should_update(30000) {
982 datapoint_debug!(
983 "postgres-plugin-stats",
984 ("message-queue-length", self.sender.len() as i64, i64),
985 );
986 }
987 let mut measure = Measure::start("accountsdb-plugin-posgres-create-work-item");
988 let wrk_item = DbWorkItem::UpdateAccount(Box::new(UpdateAccountRequest {
989 account: DbAccountInfo::new(account, slot),
990 is_startup,
991 }));
992
993 measure.stop();
994
995 inc_new_counter_debug!(
996 "accountsdb-plugin-posgres-create-work-item-us",
997 measure.as_us() as usize,
998 100000,
999 100000
1000 );
1001
1002 let mut measure = Measure::start("accountsdb-plugin-posgres-send-msg");
1003
1004 if let Err(err) = self.sender.send(wrk_item) {
1005 return Err(AccountsDbPluginError::AccountsUpdateError {
1006 msg: format!(
1007 "Failed to update the account {:?}, error: {:?}",
1008 bs58::encode(account.pubkey()).into_string(),
1009 err
1010 ),
1011 });
1012 }
1013
1014 measure.stop();
1015 inc_new_counter_debug!(
1016 "accountsdb-plugin-posgres-send-msg-us",
1017 measure.as_us() as usize,
1018 100000,
1019 100000
1020 );
1021
1022 Ok(())
1023 }
1024
1025 pub fn update_slot_status(
1026 &mut self,
1027 slot: u64,
1028 parent: Option<u64>,
1029 status: SlotStatus,
1030 ) -> Result<(), AccountsDbPluginError> {
1031 if let Err(err) = self
1032 .sender
1033 .send(DbWorkItem::UpdateSlot(Box::new(UpdateSlotRequest {
1034 slot,
1035 parent,
1036 slot_status: status,
1037 })))
1038 {
1039 return Err(AccountsDbPluginError::SlotStatusUpdateError {
1040 msg: format!("Failed to update the slot {:?}, error: {:?}", slot, err),
1041 });
1042 }
1043 Ok(())
1044 }
1045
1046 pub fn update_block_metadata(
1047 &mut self,
1048 block_info: &ReplicaBlockInfo,
1049 ) -> Result<(), AccountsDbPluginError> {
1050 if let Err(err) = self.sender.send(DbWorkItem::UpdateBlockMetadata(Box::new(
1051 UpdateBlockMetadataRequest {
1052 block_info: DbBlockInfo::from(block_info),
1053 },
1054 ))) {
1055 return Err(AccountsDbPluginError::SlotStatusUpdateError {
1056 msg: format!(
1057 "Failed to update the block metadata at slot {:?}, error: {:?}",
1058 block_info.slot, err
1059 ),
1060 });
1061 }
1062 Ok(())
1063 }
1064
1065 pub fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> {
1066 info!("Notifying the end of startup");
1067 while !self.sender.is_empty() {
1069 sleep(Duration::from_millis(100));
1070 }
1071 self.is_startup_done.store(true, Ordering::Relaxed);
1072
1073 while self.startup_done_count.load(Ordering::Relaxed)
1075 != self.initialized_worker_count.load(Ordering::Relaxed)
1076 {
1077 info!(
1078 "Startup done count: {}, good worker thread count: {}",
1079 self.startup_done_count.load(Ordering::Relaxed),
1080 self.initialized_worker_count.load(Ordering::Relaxed)
1081 );
1082 sleep(Duration::from_millis(100));
1083 }
1084
1085 info!("Done with notifying the end of startup");
1086 Ok(())
1087 }
1088}
1089
1090pub struct PostgresClientBuilder {}
1091
1092impl PostgresClientBuilder {
1093 pub fn build_pararallel_postgres_client(
1094 config: &AccountsDbPluginPostgresConfig,
1095 ) -> Result<ParallelPostgresClient, AccountsDbPluginError> {
1096 ParallelPostgresClient::new(config)
1097 }
1098
1099 pub fn build_simple_postgres_client(
1100 config: &AccountsDbPluginPostgresConfig,
1101 ) -> Result<SimplePostgresClient, AccountsDbPluginError> {
1102 SimplePostgresClient::new(config)
1103 }
1104}