solana_accountsdb_plugin_postgres/
postgres_client.rs

1#![allow(clippy::integer_arithmetic)]
2
3mod postgres_client_block_metadata;
4mod postgres_client_transaction;
5
/// A concurrent implementation for writing accounts into PostgreSQL in parallel.
7use {
8    crate::accountsdb_plugin_postgres::{
9        AccountsDbPluginPostgresConfig, AccountsDbPluginPostgresError,
10    },
11    chrono::Utc,
12    crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender},
13    log::*,
14    openssl::ssl::{SslConnector, SslFiletype, SslMethod},
15    postgres::{Client, NoTls, Statement},
16    postgres_client_block_metadata::DbBlockInfo,
17    postgres_client_transaction::LogTransactionRequest,
18    postgres_openssl::MakeTlsConnector,
19    solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
20        AccountsDbPluginError, ReplicaAccountInfo, ReplicaBlockInfo, SlotStatus,
21    },
22    solana_measure::measure::Measure,
23    solana_metrics::*,
24    solana_sdk::timing::AtomicInterval,
25    std::{
26        sync::{
27            atomic::{AtomicBool, AtomicUsize, Ordering},
28            Arc, Mutex,
29        },
30        thread::{self, sleep, Builder, JoinHandle},
31        time::Duration,
32    },
33    tokio_postgres::types,
34};
35
/// The maximum number of asynchronous requests allowed in the channel, to avoid
/// excessive memory usage. The downside: calls made after this threshold is reached
/// can block.
const MAX_ASYNC_REQUESTS: usize = 40960;
/// Port used to build the connection string when the configuration has no `port`.
const DEFAULT_POSTGRES_PORT: u16 = 5432;
// NOTE(review): presumably the default number of worker threads; its use is not
// visible in this part of the file -- confirm.
const DEFAULT_THREADS_COUNT: usize = 100;
/// Number of accounts per bulk insert when the configuration has no `batch_size`.
const DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE: usize = 10;
/// Number of columns, and therefore bind parameters, written for each account row.
const ACCOUNT_COLUMN_COUNT: usize = 9;
// NOTE(review): presumably the default for aborting the process on database errors;
// its use is not visible in this part of the file -- confirm.
const DEFAULT_PANIC_ON_DB_ERROR: bool = false;
/// Whether the `account_audit` insert statement is prepared when the configuration
/// has no `store_account_historical_data`.
const DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA: bool = false;
45
/// A PostgreSQL connection bundled with the statements prepared on it.
///
/// All statements are prepared once in `SimplePostgresClient::new` and reused for
/// every subsequent write.
struct PostgresSqlClientWrapper {
    /// The underlying blocking connection.
    client: Client,
    /// Upsert of a single row in the `account` table.
    update_account_stmt: Statement,
    /// Upsert of a whole batch of rows in the `account` table.
    bulk_account_insert_stmt: Statement,
    /// Upsert of a `slot` row including its parent.
    update_slot_with_parent_stmt: Statement,
    /// Upsert of a `slot` row when no parent is known.
    update_slot_without_parent_stmt: Statement,
    /// Statement used for transaction logging (built in a sibling module).
    update_transaction_log_stmt: Statement,
    /// Statement used for block metadata updates (built in a sibling module).
    update_block_metadata_stmt: Statement,
    /// Insert into `account_audit`; `Some` only when storing historical account data
    /// is enabled in the configuration.
    insert_account_audit_stmt: Option<Statement>,
}
56
/// A single-connection PostgreSQL client that writes accounts, slots, transactions
/// and block metadata.
pub struct SimplePostgresClient {
    /// Number of buffered accounts that triggers a bulk insert.
    batch_size: usize,
    /// Accounts received during startup that have not been written yet.
    pending_account_updates: Vec<DbAccountInfo>,
    /// The connection and its prepared statements.
    client: Mutex<PostgresSqlClientWrapper>,
}
62
/// A worker that owns its own `SimplePostgresClient` and processes queued work items.
struct PostgresClientWorker {
    /// The database client used exclusively by this worker.
    client: SimplePostgresClient,
    /// Indicating if accounts notification during startup is done.
    is_startup_done: bool,
}
68
/// An owned snapshot of one account in the shape it is written to the database.
///
/// Every numeric field is an `i64`, the type handed to the PostgreSQL driver for the
/// corresponding column.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct DbAccountInfo {
    /// Raw bytes of the account's public key.
    pub pubkey: Vec<u8>,
    /// Account balance in lamports.
    pub lamports: i64,
    /// Raw bytes of the owning program's public key.
    pub owner: Vec<u8>,
    /// The account's `executable` flag.
    pub executable: bool,
    /// The account's `rent_epoch` value.
    pub rent_epoch: i64,
    /// The account's data bytes.
    pub data: Vec<u8>,
    /// Slot at which this version of the account was reported.
    pub slot: i64,
    /// Write version of this update; breaks ties between updates within one slot.
    pub write_version: i64,
}
82
/// Terminates the process after an unrecoverable plugin failure.
///
/// Outside of tests a notice is printed to standard output and the process exits with
/// status 1. Under `cfg(test)` the exit is replaced by a panic so that a failing test
/// is reported instead of the whole test process disappearing.
pub(crate) fn abort() -> ! {
    #[cfg(not(test))]
    {
        // Standard error is usually redirected to a log file, so cry for help on
        // standard output as well.
        println!("Validator process aborted. The validator log may contain further details");
        std::process::exit(1);
    }

    #[cfg(test)]
    panic!("process::exit(1) is intercepted for friendly test failure...");
}
95
96impl DbAccountInfo {
97    fn new<T: ReadableAccountInfo>(account: &T, slot: u64) -> DbAccountInfo {
98        let data = account.data().to_vec();
99        Self {
100            pubkey: account.pubkey().to_vec(),
101            lamports: account.lamports() as i64,
102            owner: account.owner().to_vec(),
103            executable: account.executable(),
104            rent_epoch: account.rent_epoch() as i64,
105            data,
106            slot: slot as i64,
107            write_version: account.write_version(),
108        }
109    }
110}
111
/// Read-only access to the account fields that are written to the database.
///
/// Numeric accessors return `i64`, matching the field types of `DbAccountInfo`.
pub trait ReadableAccountInfo: Sized {
    /// Raw bytes of the account's public key.
    fn pubkey(&self) -> &[u8];
    /// Raw bytes of the owning program's public key.
    fn owner(&self) -> &[u8];
    /// Account balance in lamports.
    fn lamports(&self) -> i64;
    /// The account's `executable` flag.
    fn executable(&self) -> bool;
    /// The account's `rent_epoch` value.
    fn rent_epoch(&self) -> i64;
    /// The account's data bytes.
    fn data(&self) -> &[u8];
    /// Write version of this account update.
    fn write_version(&self) -> i64;
}
121
122impl ReadableAccountInfo for DbAccountInfo {
123    fn pubkey(&self) -> &[u8] {
124        &self.pubkey
125    }
126
127    fn owner(&self) -> &[u8] {
128        &self.owner
129    }
130
131    fn lamports(&self) -> i64 {
132        self.lamports
133    }
134
135    fn executable(&self) -> bool {
136        self.executable
137    }
138
139    fn rent_epoch(&self) -> i64 {
140        self.rent_epoch
141    }
142
143    fn data(&self) -> &[u8] {
144        &self.data
145    }
146
147    fn write_version(&self) -> i64 {
148        self.write_version
149    }
150}
151
/// Exposes a borrowed `ReplicaAccountInfo` through `ReadableAccountInfo`.
///
/// Byte slices are returned as-is; numeric fields are converted to `i64` with `as`
/// casts.
// NOTE(review): the source fields are presumably `u64`, in which case values above
// `i64::MAX` would wrap to negative numbers -- confirm against the interface crate.
impl<'a> ReadableAccountInfo for ReplicaAccountInfo<'a> {
    fn pubkey(&self) -> &[u8] {
        self.pubkey
    }

    fn owner(&self) -> &[u8] {
        self.owner
    }

    fn lamports(&self) -> i64 {
        self.lamports as i64
    }

    fn executable(&self) -> bool {
        self.executable
    }

    fn rent_epoch(&self) -> i64 {
        self.rent_epoch as i64
    }

    fn data(&self) -> &[u8] {
        self.data
    }

    fn write_version(&self) -> i64 {
        self.write_version as i64
    }
}
181
/// Operations a PostgreSQL-backed client must provide to the plugin.
pub trait PostgresClient {
    /// Waits for any background work to finish.
    ///
    /// The default implementation has nothing to wait for and returns `Ok(())`.
    fn join(&mut self) -> thread::Result<()> {
        Ok(())
    }

    /// Persists one account update.
    ///
    /// `is_startup` tells the implementation whether the update was reported during
    /// startup.
    fn update_account(
        &mut self,
        account: DbAccountInfo,
        is_startup: bool,
    ) -> Result<(), AccountsDbPluginError>;

    /// Persists the status of `slot`, together with its `parent` when one is given.
    fn update_slot_status(
        &mut self,
        slot: u64,
        parent: Option<u64>,
        status: SlotStatus,
    ) -> Result<(), AccountsDbPluginError>;

    /// Signals that all startup account notifications have been delivered.
    fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError>;

    /// Persists one transaction log entry.
    fn log_transaction(
        &mut self,
        transaction_log_info: LogTransactionRequest,
    ) -> Result<(), AccountsDbPluginError>;

    /// Persists the metadata of one block.
    fn update_block_metadata(
        &mut self,
        block_info: UpdateBlockMetadataRequest,
    ) -> Result<(), AccountsDbPluginError>;
}
212
213impl SimplePostgresClient {
214    pub fn connect_to_db(
215        config: &AccountsDbPluginPostgresConfig,
216    ) -> Result<Client, AccountsDbPluginError> {
217        let port = config.port.unwrap_or(DEFAULT_POSTGRES_PORT);
218
219        let connection_str = if let Some(connection_str) = &config.connection_str {
220            connection_str.clone()
221        } else {
222            if config.host.is_none() || config.user.is_none() {
223                let msg = format!(
224                    "\"connection_str\": {:?}, or \"host\": {:?} \"user\": {:?} must be specified",
225                    config.connection_str, config.host, config.user
226                );
227                return Err(AccountsDbPluginError::Custom(Box::new(
228                    AccountsDbPluginPostgresError::ConfigurationError { msg },
229                )));
230            }
231            format!(
232                "host={} user={} port={}",
233                config.host.as_ref().unwrap(),
234                config.user.as_ref().unwrap(),
235                port
236            )
237        };
238
239        let result = if let Some(true) = config.use_ssl {
240            if config.server_ca.is_none() {
241                let msg = "\"server_ca\" must be specified when \"use_ssl\" is set".to_string();
242                return Err(AccountsDbPluginError::Custom(Box::new(
243                    AccountsDbPluginPostgresError::ConfigurationError { msg },
244                )));
245            }
246            if config.client_cert.is_none() {
247                let msg = "\"client_cert\" must be specified when \"use_ssl\" is set".to_string();
248                return Err(AccountsDbPluginError::Custom(Box::new(
249                    AccountsDbPluginPostgresError::ConfigurationError { msg },
250                )));
251            }
252            if config.client_key.is_none() {
253                let msg = "\"client_key\" must be specified when \"use_ssl\" is set".to_string();
254                return Err(AccountsDbPluginError::Custom(Box::new(
255                    AccountsDbPluginPostgresError::ConfigurationError { msg },
256                )));
257            }
258            let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
259            if let Err(err) = builder.set_ca_file(config.server_ca.as_ref().unwrap()) {
260                let msg = format!(
261                    "Failed to set the server certificate specified by \"server_ca\": {}. Error: ({})",
262                    config.server_ca.as_ref().unwrap(), err);
263                return Err(AccountsDbPluginError::Custom(Box::new(
264                    AccountsDbPluginPostgresError::ConfigurationError { msg },
265                )));
266            }
267            if let Err(err) =
268                builder.set_certificate_file(config.client_cert.as_ref().unwrap(), SslFiletype::PEM)
269            {
270                let msg = format!(
271                    "Failed to set the client certificate specified by \"client_cert\": {}. Error: ({})",
272                    config.client_cert.as_ref().unwrap(), err);
273                return Err(AccountsDbPluginError::Custom(Box::new(
274                    AccountsDbPluginPostgresError::ConfigurationError { msg },
275                )));
276            }
277            if let Err(err) =
278                builder.set_private_key_file(config.client_key.as_ref().unwrap(), SslFiletype::PEM)
279            {
280                let msg = format!(
281                    "Failed to set the client key specified by \"client_key\": {}. Error: ({})",
282                    config.client_key.as_ref().unwrap(),
283                    err
284                );
285                return Err(AccountsDbPluginError::Custom(Box::new(
286                    AccountsDbPluginPostgresError::ConfigurationError { msg },
287                )));
288            }
289
290            let mut connector = MakeTlsConnector::new(builder.build());
291            connector.set_callback(|connect_config, _domain| {
292                connect_config.set_verify_hostname(false);
293                Ok(())
294            });
295            Client::connect(&connection_str, connector)
296        } else {
297            Client::connect(&connection_str, NoTls)
298        };
299
300        match result {
301            Err(err) => {
302                let msg = format!(
303                    "Error in connecting to the PostgreSQL database: {:?} connection_str: {:?}",
304                    err, connection_str
305                );
306                error!("{}", msg);
307                Err(AccountsDbPluginError::Custom(Box::new(
308                    AccountsDbPluginPostgresError::DataStoreConnectionError { msg },
309                )))
310            }
311            Ok(client) => Ok(client),
312        }
313    }
314
315    fn build_bulk_account_insert_statement(
316        client: &mut Client,
317        config: &AccountsDbPluginPostgresConfig,
318    ) -> Result<Statement, AccountsDbPluginError> {
319        let batch_size = config
320            .batch_size
321            .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
322        let mut stmt = String::from("INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) VALUES");
323        for j in 0..batch_size {
324            let row = j * ACCOUNT_COLUMN_COUNT;
325            let val_str = format!(
326                "(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})",
327                row + 1,
328                row + 2,
329                row + 3,
330                row + 4,
331                row + 5,
332                row + 6,
333                row + 7,
334                row + 8,
335                row + 9,
336            );
337
338            if j == 0 {
339                stmt = format!("{} {}", &stmt, val_str);
340            } else {
341                stmt = format!("{}, {}", &stmt, val_str);
342            }
343        }
344
345        let handle_conflict = "ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \
346            data=excluded.data, write_version=excluded.write_version, updated_on=excluded.updated_on WHERE acct.slot < excluded.slot OR (\
347            acct.slot = excluded.slot AND acct.write_version < excluded.write_version)";
348
349        stmt = format!("{} {}", stmt, handle_conflict);
350
351        info!("{}", stmt);
352        let bulk_stmt = client.prepare(&stmt);
353
354        match bulk_stmt {
355            Err(err) => {
356                return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
357                    msg: format!(
358                        "Error in preparing for the accounts update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
359                        err, config.host, config.user, config
360                    ),
361                })));
362            }
363            Ok(update_account_stmt) => Ok(update_account_stmt),
364        }
365    }
366
367    fn build_single_account_upsert_statement(
368        client: &mut Client,
369        config: &AccountsDbPluginPostgresConfig,
370    ) -> Result<Statement, AccountsDbPluginError> {
371        let stmt = "INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \
372        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
373        ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \
374        data=excluded.data, write_version=excluded.write_version, updated_on=excluded.updated_on  WHERE acct.slot < excluded.slot OR (\
375        acct.slot = excluded.slot AND acct.write_version < excluded.write_version)";
376
377        let stmt = client.prepare(stmt);
378
379        match stmt {
380            Err(err) => {
381                return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
382                    msg: format!(
383                        "Error in preparing for the accounts update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
384                        err, config.host, config.user, config
385                    ),
386                })));
387            }
388            Ok(update_account_stmt) => Ok(update_account_stmt),
389        }
390    }
391
392    fn build_account_audit_insert_statement(
393        client: &mut Client,
394        config: &AccountsDbPluginPostgresConfig,
395    ) -> Result<Statement, AccountsDbPluginError> {
396        let stmt = "INSERT INTO account_audit (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \
397        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)";
398
399        let stmt = client.prepare(stmt);
400
401        match stmt {
402            Err(err) => {
403                return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
404                    msg: format!(
405                        "Error in preparing for the account_audit update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
406                        err, config.host, config.user, config
407                    ),
408                })));
409            }
410            Ok(stmt) => Ok(stmt),
411        }
412    }
413
414    fn build_slot_upsert_statement_with_parent(
415        client: &mut Client,
416        config: &AccountsDbPluginPostgresConfig,
417    ) -> Result<Statement, AccountsDbPluginError> {
418        let stmt = "INSERT INTO slot (slot, parent, status, updated_on) \
419        VALUES ($1, $2, $3, $4) \
420        ON CONFLICT (slot) DO UPDATE SET parent=excluded.parent, status=excluded.status, updated_on=excluded.updated_on";
421
422        let stmt = client.prepare(stmt);
423
424        match stmt {
425            Err(err) => {
426                return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
427                    msg: format!(
428                        "Error in preparing for the slot update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
429                        err, config.host, config.user, config
430                    ),
431                })));
432            }
433            Ok(stmt) => Ok(stmt),
434        }
435    }
436
437    fn build_slot_upsert_statement_without_parent(
438        client: &mut Client,
439        config: &AccountsDbPluginPostgresConfig,
440    ) -> Result<Statement, AccountsDbPluginError> {
441        let stmt = "INSERT INTO slot (slot, status, updated_on) \
442        VALUES ($1, $2, $3) \
443        ON CONFLICT (slot) DO UPDATE SET status=excluded.status, updated_on=excluded.updated_on";
444
445        let stmt = client.prepare(stmt);
446
447        match stmt {
448            Err(err) => {
449                return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
450                    msg: format!(
451                        "Error in preparing for the slot update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
452                        err, config.host, config.user, config
453                    ),
454                })));
455            }
456            Ok(stmt) => Ok(stmt),
457        }
458    }
459
460    /// Internal function for inserting an account into account_audit table.
461    fn insert_account_audit(
462        account: &DbAccountInfo,
463        statement: &Statement,
464        client: &mut Client,
465    ) -> Result<(), AccountsDbPluginError> {
466        let lamports = account.lamports() as i64;
467        let rent_epoch = account.rent_epoch() as i64;
468        let updated_on = Utc::now().naive_utc();
469        let result = client.execute(
470            statement,
471            &[
472                &account.pubkey(),
473                &account.slot,
474                &account.owner(),
475                &lamports,
476                &account.executable(),
477                &rent_epoch,
478                &account.data(),
479                &account.write_version(),
480                &updated_on,
481            ],
482        );
483
484        if let Err(err) = result {
485            let msg = format!(
486                "Failed to persist the insert of account_audit to the PostgreSQL database. Error: {:?}",
487                err
488            );
489            error!("{}", msg);
490            return Err(AccountsDbPluginError::AccountsUpdateError { msg });
491        }
492        Ok(())
493    }
494
495    /// Internal function for updating or inserting a single account
496    fn upsert_account_internal(
497        account: &DbAccountInfo,
498        statement: &Statement,
499        client: &mut Client,
500        insert_account_audit_stmt: &Option<Statement>,
501    ) -> Result<(), AccountsDbPluginError> {
502        let lamports = account.lamports() as i64;
503        let rent_epoch = account.rent_epoch() as i64;
504        let updated_on = Utc::now().naive_utc();
505        let result = client.execute(
506            statement,
507            &[
508                &account.pubkey(),
509                &account.slot,
510                &account.owner(),
511                &lamports,
512                &account.executable(),
513                &rent_epoch,
514                &account.data(),
515                &account.write_version(),
516                &updated_on,
517            ],
518        );
519
520        if let Err(err) = result {
521            let msg = format!(
522                "Failed to persist the update of account to the PostgreSQL database. Error: {:?}",
523                err
524            );
525            error!("{}", msg);
526            return Err(AccountsDbPluginError::AccountsUpdateError { msg });
527        } else if result.unwrap() == 0 && insert_account_audit_stmt.is_some() {
528            // If no records modified (inserted or updated), it is because the account is updated
529            // at an older slot, insert the record directly into the account_audit table.
530            let statement = insert_account_audit_stmt.as_ref().unwrap();
531            Self::insert_account_audit(account, statement, client)?;
532        }
533
534        Ok(())
535    }
536
537    /// Update or insert a single account
538    fn upsert_account(&mut self, account: &DbAccountInfo) -> Result<(), AccountsDbPluginError> {
539        let client = self.client.get_mut().unwrap();
540        let insert_account_audit_stmt = &client.insert_account_audit_stmt;
541        let statement = &client.update_account_stmt;
542        let client = &mut client.client;
543        Self::upsert_account_internal(account, statement, client, insert_account_audit_stmt)
544    }
545
546    /// Insert accounts in batch to reduce network overhead
547    fn insert_accounts_in_batch(
548        &mut self,
549        account: DbAccountInfo,
550    ) -> Result<(), AccountsDbPluginError> {
551        self.pending_account_updates.push(account);
552
553        if self.pending_account_updates.len() == self.batch_size {
554            let mut measure = Measure::start("accountsdb-plugin-postgres-prepare-values");
555
556            let mut values: Vec<&(dyn types::ToSql + Sync)> =
557                Vec::with_capacity(self.batch_size * ACCOUNT_COLUMN_COUNT);
558            let updated_on = Utc::now().naive_utc();
559            for j in 0..self.batch_size {
560                let account = &self.pending_account_updates[j];
561
562                values.push(&account.pubkey);
563                values.push(&account.slot);
564                values.push(&account.owner);
565                values.push(&account.lamports);
566                values.push(&account.executable);
567                values.push(&account.rent_epoch);
568                values.push(&account.data);
569                values.push(&account.write_version);
570                values.push(&updated_on);
571            }
572            measure.stop();
573            inc_new_counter_debug!(
574                "accountsdb-plugin-postgres-prepare-values-us",
575                measure.as_us() as usize,
576                10000,
577                10000
578            );
579
580            let mut measure = Measure::start("accountsdb-plugin-postgres-update-account");
581            let client = self.client.get_mut().unwrap();
582            let result = client
583                .client
584                .query(&client.bulk_account_insert_stmt, &values);
585
586            self.pending_account_updates.clear();
587            if let Err(err) = result {
588                let msg = format!(
589                    "Failed to persist the update of account to the PostgreSQL database. Error: {:?}",
590                    err
591                );
592                error!("{}", msg);
593                return Err(AccountsDbPluginError::AccountsUpdateError { msg });
594            }
595            measure.stop();
596            inc_new_counter_debug!(
597                "accountsdb-plugin-postgres-update-account-us",
598                measure.as_us() as usize,
599                10000,
600                10000
601            );
602            inc_new_counter_debug!(
603                "accountsdb-plugin-postgres-update-account-count",
604                self.batch_size,
605                10000,
606                10000
607            );
608        }
609        Ok(())
610    }
611
612    /// Flush any left over accounts in batch which are not processed in the last batch
613    fn flush_buffered_writes(&mut self) -> Result<(), AccountsDbPluginError> {
614        if self.pending_account_updates.is_empty() {
615            return Ok(());
616        }
617
618        let client = self.client.get_mut().unwrap();
619        let insert_account_audit_stmt = &client.insert_account_audit_stmt;
620        let statement = &client.update_account_stmt;
621        let client = &mut client.client;
622
623        for account in self.pending_account_updates.drain(..) {
624            Self::upsert_account_internal(&account, statement, client, insert_account_audit_stmt)?;
625        }
626
627        Ok(())
628    }
629
630    pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result<Self, AccountsDbPluginError> {
631        info!("Creating SimplePostgresClient...");
632        let mut client = Self::connect_to_db(config)?;
633        let bulk_account_insert_stmt =
634            Self::build_bulk_account_insert_statement(&mut client, config)?;
635        let update_account_stmt = Self::build_single_account_upsert_statement(&mut client, config)?;
636
637        let update_slot_with_parent_stmt =
638            Self::build_slot_upsert_statement_with_parent(&mut client, config)?;
639        let update_slot_without_parent_stmt =
640            Self::build_slot_upsert_statement_without_parent(&mut client, config)?;
641        let update_transaction_log_stmt =
642            Self::build_transaction_info_upsert_statement(&mut client, config)?;
643        let update_block_metadata_stmt =
644            Self::build_block_metadata_upsert_statement(&mut client, config)?;
645
646        let batch_size = config
647            .batch_size
648            .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
649
650        let store_account_historical_data = config
651            .store_account_historical_data
652            .unwrap_or(DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA);
653
654        let insert_account_audit_stmt = if store_account_historical_data {
655            let stmt = Self::build_account_audit_insert_statement(&mut client, config)?;
656            Some(stmt)
657        } else {
658            None
659        };
660
661        info!("Created SimplePostgresClient.");
662        Ok(Self {
663            batch_size,
664            pending_account_updates: Vec::with_capacity(batch_size),
665            client: Mutex::new(PostgresSqlClientWrapper {
666                client,
667                update_account_stmt,
668                bulk_account_insert_stmt,
669                update_slot_with_parent_stmt,
670                update_slot_without_parent_stmt,
671                update_transaction_log_stmt,
672                update_block_metadata_stmt,
673                insert_account_audit_stmt,
674            }),
675        })
676    }
677}
678
679impl PostgresClient for SimplePostgresClient {
680    fn update_account(
681        &mut self,
682        account: DbAccountInfo,
683        is_startup: bool,
684    ) -> Result<(), AccountsDbPluginError> {
685        trace!(
686            "Updating account {} with owner {} at slot {}",
687            bs58::encode(account.pubkey()).into_string(),
688            bs58::encode(account.owner()).into_string(),
689            account.slot,
690        );
691        if !is_startup {
692            return self.upsert_account(&account);
693        }
694        self.insert_accounts_in_batch(account)
695    }
696
697    fn update_slot_status(
698        &mut self,
699        slot: u64,
700        parent: Option<u64>,
701        status: SlotStatus,
702    ) -> Result<(), AccountsDbPluginError> {
703        info!("Updating slot {:?} at with status {:?}", slot, status);
704
705        let slot = slot as i64; // postgres only supports i64
706        let parent = parent.map(|parent| parent as i64);
707        let updated_on = Utc::now().naive_utc();
708        let status_str = status.as_str();
709        let client = self.client.get_mut().unwrap();
710
711        let result = match parent {
712            Some(parent) => client.client.execute(
713                &client.update_slot_with_parent_stmt,
714                &[&slot, &parent, &status_str, &updated_on],
715            ),
716            None => client.client.execute(
717                &client.update_slot_without_parent_stmt,
718                &[&slot, &status_str, &updated_on],
719            ),
720        };
721
722        match result {
723            Err(err) => {
724                let msg = format!(
725                    "Failed to persist the update of slot to the PostgreSQL database. Error: {:?}",
726                    err
727                );
728                error!("{:?}", msg);
729                return Err(AccountsDbPluginError::SlotStatusUpdateError { msg });
730            }
731            Ok(rows) => {
732                assert_eq!(1, rows, "Expected one rows to be updated a time");
733            }
734        }
735
736        Ok(())
737    }
738
739    fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> {
740        self.flush_buffered_writes()
741    }
742
743    fn log_transaction(
744        &mut self,
745        transaction_log_info: LogTransactionRequest,
746    ) -> Result<(), AccountsDbPluginError> {
747        self.log_transaction_impl(transaction_log_info)
748    }
749
750    fn update_block_metadata(
751        &mut self,
752        block_info: UpdateBlockMetadataRequest,
753    ) -> Result<(), AccountsDbPluginError> {
754        self.update_block_metadata_impl(block_info)
755    }
756}
757
/// A queued request to persist one account update.
struct UpdateAccountRequest {
    /// The account to write.
    account: DbAccountInfo,
    /// Passed through as the `is_startup` argument of `PostgresClient::update_account`.
    is_startup: bool,
}
762
/// A queued request to persist the status of one slot.
struct UpdateSlotRequest {
    /// The slot whose status changed.
    slot: u64,
    /// The parent slot, when known.
    parent: Option<u64>,
    /// The new status of the slot.
    slot_status: SlotStatus,
}
768
/// A request to persist the metadata of one block.
pub struct UpdateBlockMetadataRequest {
    /// The block metadata in database form.
    pub block_info: DbBlockInfo,
}
772
/// A unit of work delivered to a `PostgresClientWorker` over its channel.
///
/// Every payload is boxed, so each variant holds only a pointer.
#[warn(clippy::large_enum_variant)]
enum DbWorkItem {
    /// Persist one account update.
    UpdateAccount(Box<UpdateAccountRequest>),
    /// Persist the status of one slot.
    UpdateSlot(Box<UpdateSlotRequest>),
    /// Persist one transaction log entry.
    LogTransaction(Box<LogTransactionRequest>),
    /// Persist the metadata of one block.
    UpdateBlockMetadata(Box<UpdateBlockMetadataRequest>),
}
780
781impl PostgresClientWorker {
782    fn new(config: AccountsDbPluginPostgresConfig) -> Result<Self, AccountsDbPluginError> {
783        let result = SimplePostgresClient::new(&config);
784        match result {
785            Ok(client) => Ok(PostgresClientWorker {
786                client,
787                is_startup_done: false,
788            }),
789            Err(err) => {
790                error!("Error in creating SimplePostgresClient: {}", err);
791                Err(err)
792            }
793        }
794    }
795
796    fn do_work(
797        &mut self,
798        receiver: Receiver<DbWorkItem>,
799        exit_worker: Arc<AtomicBool>,
800        is_startup_done: Arc<AtomicBool>,
801        startup_done_count: Arc<AtomicUsize>,
802        panic_on_db_errors: bool,
803    ) -> Result<(), AccountsDbPluginError> {
804        while !exit_worker.load(Ordering::Relaxed) {
805            let mut measure = Measure::start("accountsdb-plugin-postgres-worker-recv");
806            let work = receiver.recv_timeout(Duration::from_millis(500));
807            measure.stop();
808            inc_new_counter_debug!(
809                "accountsdb-plugin-postgres-worker-recv-us",
810                measure.as_us() as usize,
811                100000,
812                100000
813            );
814            match work {
815                Ok(work) => match work {
816                    DbWorkItem::UpdateAccount(request) => {
817                        if let Err(err) = self
818                            .client
819                            .update_account(request.account, request.is_startup)
820                        {
821                            error!("Failed to update account: ({})", err);
822                            if panic_on_db_errors {
823                                abort();
824                            }
825                        }
826                    }
827                    DbWorkItem::UpdateSlot(request) => {
828                        if let Err(err) = self.client.update_slot_status(
829                            request.slot,
830                            request.parent,
831                            request.slot_status,
832                        ) {
833                            error!("Failed to update slot: ({})", err);
834                            if panic_on_db_errors {
835                                abort();
836                            }
837                        }
838                    }
839                    DbWorkItem::LogTransaction(transaction_log_info) => {
840                        if let Err(err) = self.client.log_transaction(*transaction_log_info) {
841                            error!("Failed to update transaction: ({})", err);
842                            if panic_on_db_errors {
843                                abort();
844                            }
845                        }
846                    }
847                    DbWorkItem::UpdateBlockMetadata(block_info) => {
848                        if let Err(err) = self.client.update_block_metadata(*block_info) {
849                            error!("Failed to update block metadata: ({})", err);
850                            if panic_on_db_errors {
851                                abort();
852                            }
853                        }
854                    }
855                },
856                Err(err) => match err {
857                    RecvTimeoutError::Timeout => {
858                        if !self.is_startup_done && is_startup_done.load(Ordering::Relaxed) {
859                            if let Err(err) = self.client.notify_end_of_startup() {
860                                error!("Error in notifying end of startup: ({})", err);
861                                if panic_on_db_errors {
862                                    abort();
863                                }
864                            }
865                            self.is_startup_done = true;
866                            startup_done_count.fetch_add(1, Ordering::Relaxed);
867                        }
868
869                        continue;
870                    }
871                    _ => {
872                        error!("Error in receiving the item {:?}", err);
873                        if panic_on_db_errors {
874                            abort();
875                        }
876                        break;
877                    }
878                },
879            }
880        }
881        Ok(())
882    }
883}
884pub struct ParallelPostgresClient {
885    workers: Vec<JoinHandle<Result<(), AccountsDbPluginError>>>,
886    exit_worker: Arc<AtomicBool>,
887    is_startup_done: Arc<AtomicBool>,
888    startup_done_count: Arc<AtomicUsize>,
889    initialized_worker_count: Arc<AtomicUsize>,
890    sender: Sender<DbWorkItem>,
891    last_report: AtomicInterval,
892}
893
894impl ParallelPostgresClient {
895    pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result<Self, AccountsDbPluginError> {
896        info!("Creating ParallelPostgresClient...");
897        let (sender, receiver) = bounded(MAX_ASYNC_REQUESTS);
898        let exit_worker = Arc::new(AtomicBool::new(false));
899        let mut workers = Vec::default();
900        let is_startup_done = Arc::new(AtomicBool::new(false));
901        let startup_done_count = Arc::new(AtomicUsize::new(0));
902        let worker_count = config.threads.unwrap_or(DEFAULT_THREADS_COUNT);
903        let initialized_worker_count = Arc::new(AtomicUsize::new(0));
904        for i in 0..worker_count {
905            let cloned_receiver = receiver.clone();
906            let exit_clone = exit_worker.clone();
907            let is_startup_done_clone = is_startup_done.clone();
908            let startup_done_count_clone = startup_done_count.clone();
909            let initialized_worker_count_clone = initialized_worker_count.clone();
910            let config = config.clone();
911            let worker = Builder::new()
912                .name(format!("worker-{}", i))
913                .spawn(move || -> Result<(), AccountsDbPluginError> {
914                    let panic_on_db_errors = *config
915                        .panic_on_db_errors
916                        .as_ref()
917                        .unwrap_or(&DEFAULT_PANIC_ON_DB_ERROR);
918                    let result = PostgresClientWorker::new(config);
919
920                    match result {
921                        Ok(mut worker) => {
922                            initialized_worker_count_clone.fetch_add(1, Ordering::Relaxed);
923                            worker.do_work(
924                                cloned_receiver,
925                                exit_clone,
926                                is_startup_done_clone,
927                                startup_done_count_clone,
928                                panic_on_db_errors,
929                            )?;
930                            Ok(())
931                        }
932                        Err(err) => {
933                            error!("Error when making connection to database: ({})", err);
934                            if panic_on_db_errors {
935                                abort();
936                            }
937                            Err(err)
938                        }
939                    }
940                })
941                .unwrap();
942
943            workers.push(worker);
944        }
945
946        info!("Created ParallelPostgresClient.");
947        Ok(Self {
948            last_report: AtomicInterval::default(),
949            workers,
950            exit_worker,
951            is_startup_done,
952            startup_done_count,
953            initialized_worker_count,
954            sender,
955        })
956    }
957
958    pub fn join(&mut self) -> thread::Result<()> {
959        self.exit_worker.store(true, Ordering::Relaxed);
960        while !self.workers.is_empty() {
961            let worker = self.workers.pop();
962            if worker.is_none() {
963                break;
964            }
965            let worker = worker.unwrap();
966            let result = worker.join().unwrap();
967            if result.is_err() {
968                error!("The worker thread has failed: {:?}", result);
969            }
970        }
971
972        Ok(())
973    }
974
975    pub fn update_account(
976        &mut self,
977        account: &ReplicaAccountInfo,
978        slot: u64,
979        is_startup: bool,
980    ) -> Result<(), AccountsDbPluginError> {
981        if self.last_report.should_update(30000) {
982            datapoint_debug!(
983                "postgres-plugin-stats",
984                ("message-queue-length", self.sender.len() as i64, i64),
985            );
986        }
987        let mut measure = Measure::start("accountsdb-plugin-posgres-create-work-item");
988        let wrk_item = DbWorkItem::UpdateAccount(Box::new(UpdateAccountRequest {
989            account: DbAccountInfo::new(account, slot),
990            is_startup,
991        }));
992
993        measure.stop();
994
995        inc_new_counter_debug!(
996            "accountsdb-plugin-posgres-create-work-item-us",
997            measure.as_us() as usize,
998            100000,
999            100000
1000        );
1001
1002        let mut measure = Measure::start("accountsdb-plugin-posgres-send-msg");
1003
1004        if let Err(err) = self.sender.send(wrk_item) {
1005            return Err(AccountsDbPluginError::AccountsUpdateError {
1006                msg: format!(
1007                    "Failed to update the account {:?}, error: {:?}",
1008                    bs58::encode(account.pubkey()).into_string(),
1009                    err
1010                ),
1011            });
1012        }
1013
1014        measure.stop();
1015        inc_new_counter_debug!(
1016            "accountsdb-plugin-posgres-send-msg-us",
1017            measure.as_us() as usize,
1018            100000,
1019            100000
1020        );
1021
1022        Ok(())
1023    }
1024
1025    pub fn update_slot_status(
1026        &mut self,
1027        slot: u64,
1028        parent: Option<u64>,
1029        status: SlotStatus,
1030    ) -> Result<(), AccountsDbPluginError> {
1031        if let Err(err) = self
1032            .sender
1033            .send(DbWorkItem::UpdateSlot(Box::new(UpdateSlotRequest {
1034                slot,
1035                parent,
1036                slot_status: status,
1037            })))
1038        {
1039            return Err(AccountsDbPluginError::SlotStatusUpdateError {
1040                msg: format!("Failed to update the slot {:?}, error: {:?}", slot, err),
1041            });
1042        }
1043        Ok(())
1044    }
1045
1046    pub fn update_block_metadata(
1047        &mut self,
1048        block_info: &ReplicaBlockInfo,
1049    ) -> Result<(), AccountsDbPluginError> {
1050        if let Err(err) = self.sender.send(DbWorkItem::UpdateBlockMetadata(Box::new(
1051            UpdateBlockMetadataRequest {
1052                block_info: DbBlockInfo::from(block_info),
1053            },
1054        ))) {
1055            return Err(AccountsDbPluginError::SlotStatusUpdateError {
1056                msg: format!(
1057                    "Failed to update the block metadata at slot {:?}, error: {:?}",
1058                    block_info.slot, err
1059                ),
1060            });
1061        }
1062        Ok(())
1063    }
1064
1065    pub fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> {
1066        info!("Notifying the end of startup");
1067        // Ensure all items in the queue has been received by the workers
1068        while !self.sender.is_empty() {
1069            sleep(Duration::from_millis(100));
1070        }
1071        self.is_startup_done.store(true, Ordering::Relaxed);
1072
1073        // Wait for all worker threads to be done with flushing
1074        while self.startup_done_count.load(Ordering::Relaxed)
1075            != self.initialized_worker_count.load(Ordering::Relaxed)
1076        {
1077            info!(
1078                "Startup done count: {}, good worker thread count: {}",
1079                self.startup_done_count.load(Ordering::Relaxed),
1080                self.initialized_worker_count.load(Ordering::Relaxed)
1081            );
1082            sleep(Duration::from_millis(100));
1083        }
1084
1085        info!("Done with notifying the end of startup");
1086        Ok(())
1087    }
1088}
1089
1090pub struct PostgresClientBuilder {}
1091
1092impl PostgresClientBuilder {
1093    pub fn build_pararallel_postgres_client(
1094        config: &AccountsDbPluginPostgresConfig,
1095    ) -> Result<ParallelPostgresClient, AccountsDbPluginError> {
1096        ParallelPostgresClient::new(config)
1097    }
1098
1099    pub fn build_simple_postgres_client(
1100        config: &AccountsDbPluginPostgresConfig,
1101    ) -> Result<SimplePostgresClient, AccountsDbPluginError> {
1102        SimplePostgresClient::new(config)
1103    }
1104}