solana_geyser_plugin_postgres/
postgres_client.rs

1#![allow(clippy::integer_arithmetic)]
2
3mod postgres_client_account_index;
4mod postgres_client_block_metadata;
5mod postgres_client_transaction;
6
/// A concurrent implementation for writing accounts into PostgreSQL in parallel.
8use {
9    crate::{
10        geyser_plugin_postgres::{GeyserPluginPostgresConfig, GeyserPluginPostgresError},
11        postgres_client::postgres_client_account_index::TokenSecondaryIndexEntry,
12    },
13    chrono::Utc,
14    crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender},
15    log::*,
16    openssl::ssl::{SslConnector, SslFiletype, SslMethod},
17    postgres::{Client, NoTls, Statement},
18    postgres_client_block_metadata::DbBlockInfo,
19    postgres_client_transaction::LogTransactionRequest,
20    postgres_openssl::MakeTlsConnector,
21    solana_geyser_plugin_interface::geyser_plugin_interface::{
22        GeyserPluginError, ReplicaAccountInfo, ReplicaBlockInfo, SlotStatus,
23    },
24    solana_measure::measure::Measure,
25    solana_metrics::*,
26    solana_sdk::timing::AtomicInterval,
27    std::{
28        collections::HashSet,
29        sync::{
30            atomic::{AtomicBool, AtomicUsize, Ordering},
31            Arc, Mutex,
32        },
33        thread::{self, sleep, Builder, JoinHandle},
34        time::Duration,
35    },
36    tokio_postgres::types,
37};
38
/// The maximum asynchronous requests allowed in the channel to avoid excessive
/// memory usage. The downside -- calls after this threshold is reached can get blocked.
const MAX_ASYNC_REQUESTS: usize = 40960;
/// Port used to build the connection string when the configuration has no `port`.
const DEFAULT_POSTGRES_PORT: u16 = 5432;
// NOTE(review): not referenced in the visible part of this file; presumably the
// default number of worker threads -- confirm against the parallel client.
const DEFAULT_THREADS_COUNT: usize = 100;
/// Number of accounts per bulk insert when the configuration has no `batch_size`.
const DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE: usize = 10;
/// Number of columns (bind parameters) written per account row by the bulk insert.
const ACCOUNT_COLUMN_COUNT: usize = 9;
// NOTE(review): not referenced in the visible part of this file; presumably the
// default for a "panic on database error" setting -- confirm.
const DEFAULT_PANIC_ON_DB_ERROR: bool = false;
/// Whether `account_audit` rows are written when the configuration has no
/// `store_account_historical_data`.
const DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA: bool = false;
48
/// A PostgreSQL connection bundled with the statements prepared on it.
///
/// The `Option` statements are only prepared when the matching feature is
/// enabled in the plugin configuration (see `SimplePostgresClient::new`).
struct PostgresSqlClientWrapper {
    /// The underlying database connection.
    client: Client,
    /// Upsert of a single row into the `account` table.
    update_account_stmt: Statement,
    /// Upsert of a whole batch of rows into the `account` table.
    bulk_account_insert_stmt: Statement,
    /// Upsert into the `slot` table that also writes the `parent` column.
    update_slot_with_parent_stmt: Statement,
    /// Upsert into the `slot` table that leaves the `parent` column alone.
    update_slot_without_parent_stmt: Statement,
    /// Statement produced by `build_transaction_info_upsert_statement`.
    update_transaction_log_stmt: Statement,
    /// Statement produced by `build_block_metadata_upsert_statement`.
    update_block_metadata_stmt: Statement,
    /// Insert into `account_audit`; prepared only when
    /// `store_account_historical_data` is enabled.
    insert_account_audit_stmt: Option<Statement>,
    /// Single-row token owner index statement; prepared only when
    /// `index_token_owner` is `Some(true)`.
    insert_token_owner_index_stmt: Option<Statement>,
    /// Single-row token mint index statement; prepared only when
    /// `index_token_mint` is `Some(true)`.
    insert_token_mint_index_stmt: Option<Statement>,
    /// Bulk token owner index statement; prepared only when
    /// `index_token_owner` is `Some(true)`.
    bulk_insert_token_owner_index_stmt: Option<Statement>,
    /// Bulk token mint index statement; prepared only when
    /// `index_token_mint` is `Some(true)`.
    bulk_insert_token_mint_index_stmt: Option<Statement>,
}
63
/// A client that writes account and slot updates over one PostgreSQL connection.
///
/// Account updates received during startup are buffered and written in
/// batches of `batch_size`; all other account updates are written one by one.
pub struct SimplePostgresClient {
    /// Number of buffered accounts that triggers a bulk insert.
    batch_size: usize,
    /// Slots of the account updates received during startup; they are written
    /// to the `slot` table by `flush_buffered_writes`.
    slots_at_startup: HashSet<u64>,
    /// Startup account updates waiting for the next bulk insert.
    pending_account_updates: Vec<DbAccountInfo>,
    /// Copied from the `index_token_owner` setting (`false` when unset).
    index_token_owner: bool,
    /// Copied from the `index_token_mint` setting (`false` when unset).
    index_token_mint: bool,
    /// Buffered token owner index entries (capacity `batch_size`).
    pending_token_owner_index: Vec<TokenSecondaryIndexEntry>,
    /// Buffered token mint index entries (capacity `batch_size`).
    pending_token_mint_index: Vec<TokenSecondaryIndexEntry>,
    /// The connection and its prepared statements.
    client: Mutex<PostgresSqlClientWrapper>,
}
74
/// Pairs a `SimplePostgresClient` with its startup state.
// NOTE(review): presumably one instance runs per worker thread -- the code
// that uses this type is outside the visible part of the file; confirm.
struct PostgresClientWorker {
    /// The database client owned by this worker.
    client: SimplePostgresClient,
    /// Indicating if accounts notification during startup is done.
    is_startup_done: bool,
}
80
/// An account update in the form in which it is written to the database.
///
/// The numeric fields are `i64`; `upsert_slot_status_internal` notes that
/// PostgreSQL only supports `i64`, which is presumably the reason here too.
/// `Eq` is derived alongside `PartialEq` (all fields are themselves `Eq`)
/// rather than implemented by hand away from the type definition.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct DbAccountInfo {
    /// Raw bytes of the account's public key.
    pub pubkey: Vec<u8>,
    /// Account balance in lamports.
    pub lamports: i64,
    /// Raw bytes of the owning program's public key.
    pub owner: Vec<u8>,
    /// Whether the account is flagged executable.
    pub executable: bool,
    /// The account's rent epoch.
    pub rent_epoch: i64,
    /// The account's data bytes.
    pub data: Vec<u8>,
    /// Slot at which this update was observed.
    pub slot: i64,
    /// Write version of this update; used together with `slot` by the
    /// account upsert statements to decide whether a stored row is replaced.
    pub write_version: i64,
}
94
/// Terminates the process with exit code 1 after printing a notice.
///
/// In test builds the exit is replaced by a panic so that a failing test is
/// reported by the test harness instead of killing the test process.
pub(crate) fn abort() -> ! {
    #[cfg(not(test))]
    {
        // Standard error is usually redirected to a log file, so cry for help on
        // standard output as well. `println!` is used deliberately: `eprintln!`
        // would land in that same redirected stream.
        println!("Validator process aborted. The validator log may contain further details");
        std::process::exit(1);
    }

    #[cfg(test)]
    panic!("process::exit(1) is intercepted for friendly test failure...");
}
107
108impl DbAccountInfo {
109    fn new<T: ReadableAccountInfo>(account: &T, slot: u64) -> DbAccountInfo {
110        let data = account.data().to_vec();
111        Self {
112            pubkey: account.pubkey().to_vec(),
113            lamports: account.lamports() as i64,
114            owner: account.owner().to_vec(),
115            executable: account.executable(),
116            rent_epoch: account.rent_epoch() as i64,
117            data,
118            slot: slot as i64,
119            write_version: account.write_version(),
120        }
121    }
122}
123
/// Read-only view of an account update, with the numeric fields already
/// expressed as `i64`.
///
/// Implemented in this file for `DbAccountInfo` and `ReplicaAccountInfo`.
pub trait ReadableAccountInfo: Sized {
    /// Raw bytes of the account's public key.
    fn pubkey(&self) -> &[u8];
    /// Raw bytes of the owner's public key.
    fn owner(&self) -> &[u8];
    /// Account balance in lamports.
    fn lamports(&self) -> i64;
    /// Whether the account is flagged executable.
    fn executable(&self) -> bool;
    /// The account's rent epoch.
    fn rent_epoch(&self) -> i64;
    /// The account's data bytes.
    fn data(&self) -> &[u8];
    /// Write version of this update.
    fn write_version(&self) -> i64;
}
133
134impl ReadableAccountInfo for DbAccountInfo {
135    fn pubkey(&self) -> &[u8] {
136        &self.pubkey
137    }
138
139    fn owner(&self) -> &[u8] {
140        &self.owner
141    }
142
143    fn lamports(&self) -> i64 {
144        self.lamports
145    }
146
147    fn executable(&self) -> bool {
148        self.executable
149    }
150
151    fn rent_epoch(&self) -> i64 {
152        self.rent_epoch
153    }
154
155    fn data(&self) -> &[u8] {
156        &self.data
157    }
158
159    fn write_version(&self) -> i64 {
160        self.write_version
161    }
162}
163
164impl<'a> ReadableAccountInfo for ReplicaAccountInfo<'a> {
165    fn pubkey(&self) -> &[u8] {
166        self.pubkey
167    }
168
169    fn owner(&self) -> &[u8] {
170        self.owner
171    }
172
173    fn lamports(&self) -> i64 {
174        self.lamports as i64
175    }
176
177    fn executable(&self) -> bool {
178        self.executable
179    }
180
181    fn rent_epoch(&self) -> i64 {
182        self.rent_epoch as i64
183    }
184
185    fn data(&self) -> &[u8] {
186        self.data
187    }
188
189    fn write_version(&self) -> i64 {
190        self.write_version as i64
191    }
192}
193
/// Operations a PostgreSQL-backed client offers to the plugin.
pub trait PostgresClient {
    /// Waits for the client to finish; the default implementation does
    /// nothing and returns `Ok(())`.
    fn join(&mut self) -> thread::Result<()> {
        Ok(())
    }

    /// Persists one account update. `is_startup` tells the client whether the
    /// update is part of the startup notifications.
    fn update_account(
        &mut self,
        account: DbAccountInfo,
        is_startup: bool,
    ) -> Result<(), GeyserPluginError>;

    /// Persists the status of `slot`, optionally together with its parent slot.
    fn update_slot_status(
        &mut self,
        slot: u64,
        parent: Option<u64>,
        status: SlotStatus,
    ) -> Result<(), GeyserPluginError>;

    /// Signals that the startup account notifications are complete.
    fn notify_end_of_startup(&mut self) -> Result<(), GeyserPluginError>;

    /// Persists the given transaction log request.
    fn log_transaction(
        &mut self,
        transaction_log_info: LogTransactionRequest,
    ) -> Result<(), GeyserPluginError>;

    /// Persists the given block metadata request.
    fn update_block_metadata(
        &mut self,
        block_info: UpdateBlockMetadataRequest,
    ) -> Result<(), GeyserPluginError>;
}
224
225impl SimplePostgresClient {
226    pub fn connect_to_db(config: &GeyserPluginPostgresConfig) -> Result<Client, GeyserPluginError> {
227        let port = config.port.unwrap_or(DEFAULT_POSTGRES_PORT);
228
229        let connection_str = if let Some(connection_str) = &config.connection_str {
230            connection_str.clone()
231        } else {
232            if config.host.is_none() || config.user.is_none() {
233                let msg = format!(
234                    "\"connection_str\": {:?}, or \"host\": {:?} \"user\": {:?} must be specified",
235                    config.connection_str, config.host, config.user
236                );
237                return Err(GeyserPluginError::Custom(Box::new(
238                    GeyserPluginPostgresError::ConfigurationError { msg },
239                )));
240            }
241            format!(
242                "host={} user={} port={}",
243                config.host.as_ref().unwrap(),
244                config.user.as_ref().unwrap(),
245                port
246            )
247        };
248
249        let result = if let Some(true) = config.use_ssl {
250            if config.server_ca.is_none() {
251                let msg = "\"server_ca\" must be specified when \"use_ssl\" is set".to_string();
252                return Err(GeyserPluginError::Custom(Box::new(
253                    GeyserPluginPostgresError::ConfigurationError { msg },
254                )));
255            }
256            if config.client_cert.is_none() {
257                let msg = "\"client_cert\" must be specified when \"use_ssl\" is set".to_string();
258                return Err(GeyserPluginError::Custom(Box::new(
259                    GeyserPluginPostgresError::ConfigurationError { msg },
260                )));
261            }
262            if config.client_key.is_none() {
263                let msg = "\"client_key\" must be specified when \"use_ssl\" is set".to_string();
264                return Err(GeyserPluginError::Custom(Box::new(
265                    GeyserPluginPostgresError::ConfigurationError { msg },
266                )));
267            }
268            let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
269            if let Err(err) = builder.set_ca_file(config.server_ca.as_ref().unwrap()) {
270                let msg = format!(
271                    "Failed to set the server certificate specified by \"server_ca\": {}. Error: ({})",
272                    config.server_ca.as_ref().unwrap(), err);
273                return Err(GeyserPluginError::Custom(Box::new(
274                    GeyserPluginPostgresError::ConfigurationError { msg },
275                )));
276            }
277            if let Err(err) =
278                builder.set_certificate_file(config.client_cert.as_ref().unwrap(), SslFiletype::PEM)
279            {
280                let msg = format!(
281                    "Failed to set the client certificate specified by \"client_cert\": {}. Error: ({})",
282                    config.client_cert.as_ref().unwrap(), err);
283                return Err(GeyserPluginError::Custom(Box::new(
284                    GeyserPluginPostgresError::ConfigurationError { msg },
285                )));
286            }
287            if let Err(err) =
288                builder.set_private_key_file(config.client_key.as_ref().unwrap(), SslFiletype::PEM)
289            {
290                let msg = format!(
291                    "Failed to set the client key specified by \"client_key\": {}. Error: ({})",
292                    config.client_key.as_ref().unwrap(),
293                    err
294                );
295                return Err(GeyserPluginError::Custom(Box::new(
296                    GeyserPluginPostgresError::ConfigurationError { msg },
297                )));
298            }
299
300            let mut connector = MakeTlsConnector::new(builder.build());
301            connector.set_callback(|connect_config, _domain| {
302                connect_config.set_verify_hostname(false);
303                Ok(())
304            });
305            Client::connect(&connection_str, connector)
306        } else {
307            Client::connect(&connection_str, NoTls)
308        };
309
310        match result {
311            Err(err) => {
312                let msg = format!(
313                    "Error in connecting to the PostgreSQL database: {:?} connection_str: {:?}",
314                    err, connection_str
315                );
316                error!("{}", msg);
317                Err(GeyserPluginError::Custom(Box::new(
318                    GeyserPluginPostgresError::DataStoreConnectionError { msg },
319                )))
320            }
321            Ok(client) => Ok(client),
322        }
323    }
324
325    fn build_bulk_account_insert_statement(
326        client: &mut Client,
327        config: &GeyserPluginPostgresConfig,
328    ) -> Result<Statement, GeyserPluginError> {
329        let batch_size = config
330            .batch_size
331            .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
332        let mut stmt = String::from("INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) VALUES");
333        for j in 0..batch_size {
334            let row = j * ACCOUNT_COLUMN_COUNT;
335            let val_str = format!(
336                "(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})",
337                row + 1,
338                row + 2,
339                row + 3,
340                row + 4,
341                row + 5,
342                row + 6,
343                row + 7,
344                row + 8,
345                row + 9,
346            );
347
348            if j == 0 {
349                stmt = format!("{} {}", &stmt, val_str);
350            } else {
351                stmt = format!("{}, {}", &stmt, val_str);
352            }
353        }
354
355        let handle_conflict = "ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \
356            data=excluded.data, write_version=excluded.write_version, updated_on=excluded.updated_on WHERE acct.slot < excluded.slot OR (\
357            acct.slot = excluded.slot AND acct.write_version < excluded.write_version)";
358
359        stmt = format!("{} {}", stmt, handle_conflict);
360
361        info!("{}", stmt);
362        let bulk_stmt = client.prepare(&stmt);
363
364        match bulk_stmt {
365            Err(err) => {
366                return Err(GeyserPluginError::Custom(Box::new(GeyserPluginPostgresError::DataSchemaError {
367                    msg: format!(
368                        "Error in preparing for the accounts update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
369                        err, config.host, config.user, config
370                    ),
371                })));
372            }
373            Ok(update_account_stmt) => Ok(update_account_stmt),
374        }
375    }
376
377    fn build_single_account_upsert_statement(
378        client: &mut Client,
379        config: &GeyserPluginPostgresConfig,
380    ) -> Result<Statement, GeyserPluginError> {
381        let stmt = "INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \
382        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
383        ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \
384        data=excluded.data, write_version=excluded.write_version, updated_on=excluded.updated_on  WHERE acct.slot < excluded.slot OR (\
385        acct.slot = excluded.slot AND acct.write_version < excluded.write_version)";
386
387        let stmt = client.prepare(stmt);
388
389        match stmt {
390            Err(err) => {
391                return Err(GeyserPluginError::Custom(Box::new(GeyserPluginPostgresError::DataSchemaError {
392                    msg: format!(
393                        "Error in preparing for the accounts update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
394                        err, config.host, config.user, config
395                    ),
396                })));
397            }
398            Ok(update_account_stmt) => Ok(update_account_stmt),
399        }
400    }
401
402    fn prepare_query_statement(
403        client: &mut Client,
404        config: &GeyserPluginPostgresConfig,
405        stmt: &str,
406    ) -> Result<Statement, GeyserPluginError> {
407        let statement = client.prepare(stmt);
408
409        match statement {
410            Err(err) => {
411                return Err(GeyserPluginError::Custom(Box::new(GeyserPluginPostgresError::DataSchemaError {
412                    msg: format!(
413                        "Error in preparing for the statement {} for PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
414                        stmt, err, config.host, config.user, config
415                    ),
416                })));
417            }
418            Ok(statement) => Ok(statement),
419        }
420    }
421
422    fn build_account_audit_insert_statement(
423        client: &mut Client,
424        config: &GeyserPluginPostgresConfig,
425    ) -> Result<Statement, GeyserPluginError> {
426        let stmt = "INSERT INTO account_audit (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \
427        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)";
428
429        let stmt = client.prepare(stmt);
430
431        match stmt {
432            Err(err) => {
433                return Err(GeyserPluginError::Custom(Box::new(GeyserPluginPostgresError::DataSchemaError {
434                    msg: format!(
435                        "Error in preparing for the account_audit update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
436                        err, config.host, config.user, config
437                    ),
438                })));
439            }
440            Ok(stmt) => Ok(stmt),
441        }
442    }
443
444    fn build_slot_upsert_statement_with_parent(
445        client: &mut Client,
446        config: &GeyserPluginPostgresConfig,
447    ) -> Result<Statement, GeyserPluginError> {
448        let stmt = "INSERT INTO slot (slot, parent, status, updated_on) \
449        VALUES ($1, $2, $3, $4) \
450        ON CONFLICT (slot) DO UPDATE SET parent=excluded.parent, status=excluded.status, updated_on=excluded.updated_on";
451
452        let stmt = client.prepare(stmt);
453
454        match stmt {
455            Err(err) => {
456                return Err(GeyserPluginError::Custom(Box::new(GeyserPluginPostgresError::DataSchemaError {
457                    msg: format!(
458                        "Error in preparing for the slot update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
459                        err, config.host, config.user, config
460                    ),
461                })));
462            }
463            Ok(stmt) => Ok(stmt),
464        }
465    }
466
467    fn build_slot_upsert_statement_without_parent(
468        client: &mut Client,
469        config: &GeyserPluginPostgresConfig,
470    ) -> Result<Statement, GeyserPluginError> {
471        let stmt = "INSERT INTO slot (slot, status, updated_on) \
472        VALUES ($1, $2, $3) \
473        ON CONFLICT (slot) DO UPDATE SET status=excluded.status, updated_on=excluded.updated_on";
474
475        let stmt = client.prepare(stmt);
476
477        match stmt {
478            Err(err) => {
479                return Err(GeyserPluginError::Custom(Box::new(GeyserPluginPostgresError::DataSchemaError {
480                    msg: format!(
481                        "Error in preparing for the slot update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
482                        err, config.host, config.user, config
483                    ),
484                })));
485            }
486            Ok(stmt) => Ok(stmt),
487        }
488    }
489
490    /// Internal function for inserting an account into account_audit table.
491    fn insert_account_audit(
492        account: &DbAccountInfo,
493        statement: &Statement,
494        client: &mut Client,
495    ) -> Result<(), GeyserPluginError> {
496        let lamports = account.lamports() as i64;
497        let rent_epoch = account.rent_epoch() as i64;
498        let updated_on = Utc::now().naive_utc();
499        let result = client.execute(
500            statement,
501            &[
502                &account.pubkey(),
503                &account.slot,
504                &account.owner(),
505                &lamports,
506                &account.executable(),
507                &rent_epoch,
508                &account.data(),
509                &account.write_version(),
510                &updated_on,
511            ],
512        );
513
514        if let Err(err) = result {
515            let msg = format!(
516                "Failed to persist the insert of account_audit to the PostgreSQL database. Error: {:?}",
517                err
518            );
519            error!("{}", msg);
520            return Err(GeyserPluginError::AccountsUpdateError { msg });
521        }
522        Ok(())
523    }
524
525    /// Internal function for updating or inserting a single account
526    fn upsert_account_internal(
527        account: &DbAccountInfo,
528        statement: &Statement,
529        client: &mut Client,
530        insert_account_audit_stmt: &Option<Statement>,
531        insert_token_owner_index_stmt: &Option<Statement>,
532        insert_token_mint_index_stmt: &Option<Statement>,
533    ) -> Result<(), GeyserPluginError> {
534        let lamports = account.lamports() as i64;
535        let rent_epoch = account.rent_epoch() as i64;
536        let updated_on = Utc::now().naive_utc();
537        let result = client.execute(
538            statement,
539            &[
540                &account.pubkey(),
541                &account.slot,
542                &account.owner(),
543                &lamports,
544                &account.executable(),
545                &rent_epoch,
546                &account.data(),
547                &account.write_version(),
548                &updated_on,
549            ],
550        );
551
552        if let Err(err) = result {
553            let msg = format!(
554                "Failed to persist the update of account to the PostgreSQL database. Error: {:?}",
555                err
556            );
557            error!("{}", msg);
558            return Err(GeyserPluginError::AccountsUpdateError { msg });
559        } else if result.unwrap() == 0 && insert_account_audit_stmt.is_some() {
560            // If no records modified (inserted or updated), it is because the account is updated
561            // at an older slot, insert the record directly into the account_audit table.
562            let statement = insert_account_audit_stmt.as_ref().unwrap();
563            Self::insert_account_audit(account, statement, client)?;
564        }
565
566        if let Some(insert_token_owner_index_stmt) = insert_token_owner_index_stmt {
567            Self::update_token_owner_index(client, insert_token_owner_index_stmt, account)?;
568        }
569
570        if let Some(insert_token_mint_index_stmt) = insert_token_mint_index_stmt {
571            Self::update_token_mint_index(client, insert_token_mint_index_stmt, account)?;
572        }
573
574        Ok(())
575    }
576
577    /// Update or insert a single account
578    fn upsert_account(&mut self, account: &DbAccountInfo) -> Result<(), GeyserPluginError> {
579        let client = self.client.get_mut().unwrap();
580        let insert_account_audit_stmt = &client.insert_account_audit_stmt;
581        let statement = &client.update_account_stmt;
582        let insert_token_owner_index_stmt = &client.insert_token_owner_index_stmt;
583        let insert_token_mint_index_stmt = &client.insert_token_mint_index_stmt;
584        let client = &mut client.client;
585        Self::upsert_account_internal(
586            account,
587            statement,
588            client,
589            insert_account_audit_stmt,
590            insert_token_owner_index_stmt,
591            insert_token_mint_index_stmt,
592        )?;
593
594        Ok(())
595    }
596
597    /// Insert accounts in batch to reduce network overhead
598    fn insert_accounts_in_batch(
599        &mut self,
600        account: DbAccountInfo,
601    ) -> Result<(), GeyserPluginError> {
602        self.queue_secondary_indexes(&account);
603        self.pending_account_updates.push(account);
604
605        self.bulk_insert_accounts()?;
606        self.bulk_insert_token_owner_index()?;
607        self.bulk_insert_token_mint_index()
608    }
609
610    fn bulk_insert_accounts(&mut self) -> Result<(), GeyserPluginError> {
611        if self.pending_account_updates.len() == self.batch_size {
612            let mut measure = Measure::start("geyser-plugin-postgres-prepare-values");
613
614            let mut values: Vec<&(dyn types::ToSql + Sync)> =
615                Vec::with_capacity(self.batch_size * ACCOUNT_COLUMN_COUNT);
616            let updated_on = Utc::now().naive_utc();
617            for j in 0..self.batch_size {
618                let account = &self.pending_account_updates[j];
619
620                values.push(&account.pubkey);
621                values.push(&account.slot);
622                values.push(&account.owner);
623                values.push(&account.lamports);
624                values.push(&account.executable);
625                values.push(&account.rent_epoch);
626                values.push(&account.data);
627                values.push(&account.write_version);
628                values.push(&updated_on);
629            }
630            measure.stop();
631            inc_new_counter_debug!(
632                "geyser-plugin-postgres-prepare-values-us",
633                measure.as_us() as usize,
634                10000,
635                10000
636            );
637
638            let mut measure = Measure::start("geyser-plugin-postgres-update-account");
639            let client = self.client.get_mut().unwrap();
640            let result = client
641                .client
642                .query(&client.bulk_account_insert_stmt, &values);
643
644            self.pending_account_updates.clear();
645
646            if let Err(err) = result {
647                let msg = format!(
648                    "Failed to persist the update of account to the PostgreSQL database. Error: {:?}",
649                    err
650                );
651                error!("{}", msg);
652                return Err(GeyserPluginError::AccountsUpdateError { msg });
653            }
654
655            measure.stop();
656            inc_new_counter_debug!(
657                "geyser-plugin-postgres-update-account-us",
658                measure.as_us() as usize,
659                10000,
660                10000
661            );
662            inc_new_counter_debug!(
663                "geyser-plugin-postgres-update-account-count",
664                self.batch_size,
665                10000,
666                10000
667            );
668        }
669        Ok(())
670    }
671
672    /// Flush any left over accounts in batch which are not processed in the last batch
673    fn flush_buffered_writes(&mut self) -> Result<(), GeyserPluginError> {
674        if self.pending_account_updates.is_empty() {
675            return Ok(());
676        }
677
678        let client = self.client.get_mut().unwrap();
679        let insert_account_audit_stmt = &client.insert_account_audit_stmt;
680        let statement = &client.update_account_stmt;
681        let insert_token_owner_index_stmt = &client.insert_token_owner_index_stmt;
682        let insert_token_mint_index_stmt = &client.insert_token_mint_index_stmt;
683        let insert_slot_stmt = &client.update_slot_without_parent_stmt;
684        let client = &mut client.client;
685
686        for account in self.pending_account_updates.drain(..) {
687            Self::upsert_account_internal(
688                &account,
689                statement,
690                client,
691                insert_account_audit_stmt,
692                insert_token_owner_index_stmt,
693                insert_token_mint_index_stmt,
694            )?;
695        }
696
697        let mut measure = Measure::start("geyser-plugin-postgres-flush-slots-us");
698
699        for slot in &self.slots_at_startup {
700            Self::upsert_slot_status_internal(
701                *slot,
702                None,
703                SlotStatus::Rooted,
704                client,
705                insert_slot_stmt,
706            )?;
707        }
708        measure.stop();
709
710        datapoint_info!(
711            "geyser_plugin_notify_account_restore_from_snapshot_summary",
712            ("flush_slots-us", measure.as_us(), i64),
713            ("flush-slots-counts", self.slots_at_startup.len(), i64),
714        );
715
716        self.slots_at_startup.clear();
717        self.clear_buffered_indexes();
718        Ok(())
719    }
720
721    fn upsert_slot_status_internal(
722        slot: u64,
723        parent: Option<u64>,
724        status: SlotStatus,
725        client: &mut Client,
726        statement: &Statement,
727    ) -> Result<(), GeyserPluginError> {
728        let slot = slot as i64; // postgres only supports i64
729        let parent = parent.map(|parent| parent as i64);
730        let updated_on = Utc::now().naive_utc();
731        let status_str = status.as_str();
732
733        let result = match parent {
734            Some(parent) => client.execute(statement, &[&slot, &parent, &status_str, &updated_on]),
735            None => client.execute(statement, &[&slot, &status_str, &updated_on]),
736        };
737
738        match result {
739            Err(err) => {
740                let msg = format!(
741                    "Failed to persist the update of slot to the PostgreSQL database. Error: {:?}",
742                    err
743                );
744                error!("{:?}", msg);
745                return Err(GeyserPluginError::SlotStatusUpdateError { msg });
746            }
747            Ok(rows) => {
748                assert_eq!(1, rows, "Expected one rows to be updated a time");
749            }
750        }
751
752        Ok(())
753    }
754
755    pub fn new(config: &GeyserPluginPostgresConfig) -> Result<Self, GeyserPluginError> {
756        info!("Creating SimplePostgresClient...");
757        let mut client = Self::connect_to_db(config)?;
758        let bulk_account_insert_stmt =
759            Self::build_bulk_account_insert_statement(&mut client, config)?;
760        let update_account_stmt = Self::build_single_account_upsert_statement(&mut client, config)?;
761
762        let update_slot_with_parent_stmt =
763            Self::build_slot_upsert_statement_with_parent(&mut client, config)?;
764        let update_slot_without_parent_stmt =
765            Self::build_slot_upsert_statement_without_parent(&mut client, config)?;
766        let update_transaction_log_stmt =
767            Self::build_transaction_info_upsert_statement(&mut client, config)?;
768        let update_block_metadata_stmt =
769            Self::build_block_metadata_upsert_statement(&mut client, config)?;
770
771        let batch_size = config
772            .batch_size
773            .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
774
775        let store_account_historical_data = config
776            .store_account_historical_data
777            .unwrap_or(DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA);
778
779        let insert_account_audit_stmt = if store_account_historical_data {
780            let stmt = Self::build_account_audit_insert_statement(&mut client, config)?;
781            Some(stmt)
782        } else {
783            None
784        };
785
786        let bulk_insert_token_owner_index_stmt = if let Some(true) = config.index_token_owner {
787            let stmt = Self::build_bulk_token_owner_index_insert_statement(&mut client, config)?;
788            Some(stmt)
789        } else {
790            None
791        };
792
793        let bulk_insert_token_mint_index_stmt = if let Some(true) = config.index_token_mint {
794            let stmt = Self::build_bulk_token_mint_index_insert_statement(&mut client, config)?;
795            Some(stmt)
796        } else {
797            None
798        };
799
800        let insert_token_owner_index_stmt = if let Some(true) = config.index_token_owner {
801            Some(Self::build_single_token_owner_index_upsert_statement(
802                &mut client,
803                config,
804            )?)
805        } else {
806            None
807        };
808
809        let insert_token_mint_index_stmt = if let Some(true) = config.index_token_mint {
810            Some(Self::build_single_token_mint_index_upsert_statement(
811                &mut client,
812                config,
813            )?)
814        } else {
815            None
816        };
817
818        info!("Created SimplePostgresClient.");
819        Ok(Self {
820            batch_size,
821            pending_account_updates: Vec::with_capacity(batch_size),
822            client: Mutex::new(PostgresSqlClientWrapper {
823                client,
824                update_account_stmt,
825                bulk_account_insert_stmt,
826                update_slot_with_parent_stmt,
827                update_slot_without_parent_stmt,
828                update_transaction_log_stmt,
829                update_block_metadata_stmt,
830                insert_account_audit_stmt,
831                insert_token_owner_index_stmt,
832                insert_token_mint_index_stmt,
833                bulk_insert_token_owner_index_stmt,
834                bulk_insert_token_mint_index_stmt,
835            }),
836            index_token_owner: config.index_token_owner.unwrap_or_default(),
837            index_token_mint: config.index_token_mint.unwrap_or(false),
838            pending_token_owner_index: Vec::with_capacity(batch_size),
839            pending_token_mint_index: Vec::with_capacity(batch_size),
840            slots_at_startup: HashSet::default(),
841        })
842    }
843}
844
845impl PostgresClient for SimplePostgresClient {
846    fn update_account(
847        &mut self,
848        account: DbAccountInfo,
849        is_startup: bool,
850    ) -> Result<(), GeyserPluginError> {
851        trace!(
852            "Updating account {} with owner {} at slot {}",
853            bs58::encode(account.pubkey()).into_string(),
854            bs58::encode(account.owner()).into_string(),
855            account.slot,
856        );
857        if !is_startup {
858            return self.upsert_account(&account);
859        }
860        self.slots_at_startup.insert(account.slot as u64);
861        self.insert_accounts_in_batch(account)
862    }
863
864    fn update_slot_status(
865        &mut self,
866        slot: u64,
867        parent: Option<u64>,
868        status: SlotStatus,
869    ) -> Result<(), GeyserPluginError> {
870        info!("Updating slot {:?} at with status {:?}", slot, status);
871
872        let client = self.client.get_mut().unwrap();
873
874        let statement = match parent {
875            Some(_) => &client.update_slot_with_parent_stmt,
876            None => &client.update_slot_without_parent_stmt,
877        };
878
879        Self::upsert_slot_status_internal(slot, parent, status, &mut client.client, statement)
880    }
881
882    fn notify_end_of_startup(&mut self) -> Result<(), GeyserPluginError> {
883        self.flush_buffered_writes()
884    }
885
886    fn log_transaction(
887        &mut self,
888        transaction_log_info: LogTransactionRequest,
889    ) -> Result<(), GeyserPluginError> {
890        self.log_transaction_impl(transaction_log_info)
891    }
892
893    fn update_block_metadata(
894        &mut self,
895        block_info: UpdateBlockMetadataRequest,
896    ) -> Result<(), GeyserPluginError> {
897        self.update_block_metadata_impl(block_info)
898    }
899}
900
/// Payload of a `DbWorkItem::UpdateAccount` work item: one account update queued
/// for a worker thread.
struct UpdateAccountRequest {
    /// The account data to be written.
    account: DbAccountInfo,
    /// Whether the update was produced during startup; passed through to
    /// `PostgresClient::update_account`.
    is_startup: bool,
}
905
/// Payload of a `DbWorkItem::UpdateSlot` work item: a slot status change queued
/// for a worker thread.
struct UpdateSlotRequest {
    /// The slot whose status is being updated.
    slot: u64,
    /// The parent slot, when one was supplied by the caller.
    parent: Option<u64>,
    /// The new status of `slot`.
    slot_status: SlotStatus,
}
911
/// Payload of a `DbWorkItem::UpdateBlockMetadata` work item: block metadata queued
/// for a worker thread.
pub struct UpdateBlockMetadataRequest {
    /// The block metadata converted to its database representation.
    pub block_info: DbBlockInfo,
}
915
/// A unit of work sent over the channel from `ParallelPostgresClient` to its
/// worker threads. Every payload is boxed, so each variant holds a single pointer.
#[warn(clippy::large_enum_variant)]
enum DbWorkItem {
    /// Persist an account update.
    UpdateAccount(Box<UpdateAccountRequest>),
    /// Persist a slot status change.
    UpdateSlot(Box<UpdateSlotRequest>),
    /// Persist a transaction log entry.
    LogTransaction(Box<LogTransactionRequest>),
    /// Persist block metadata.
    UpdateBlockMetadata(Box<UpdateBlockMetadataRequest>),
}
923
924impl PostgresClientWorker {
925    fn new(config: GeyserPluginPostgresConfig) -> Result<Self, GeyserPluginError> {
926        let result = SimplePostgresClient::new(&config);
927        match result {
928            Ok(client) => Ok(PostgresClientWorker {
929                client,
930                is_startup_done: false,
931            }),
932            Err(err) => {
933                error!("Error in creating SimplePostgresClient: {}", err);
934                Err(err)
935            }
936        }
937    }
938
939    fn do_work(
940        &mut self,
941        receiver: Receiver<DbWorkItem>,
942        exit_worker: Arc<AtomicBool>,
943        is_startup_done: Arc<AtomicBool>,
944        startup_done_count: Arc<AtomicUsize>,
945        panic_on_db_errors: bool,
946    ) -> Result<(), GeyserPluginError> {
947        while !exit_worker.load(Ordering::Relaxed) {
948            let mut measure = Measure::start("geyser-plugin-postgres-worker-recv");
949            let work = receiver.recv_timeout(Duration::from_millis(500));
950            measure.stop();
951            inc_new_counter_debug!(
952                "geyser-plugin-postgres-worker-recv-us",
953                measure.as_us() as usize,
954                100000,
955                100000
956            );
957            match work {
958                Ok(work) => match work {
959                    DbWorkItem::UpdateAccount(request) => {
960                        if let Err(err) = self
961                            .client
962                            .update_account(request.account, request.is_startup)
963                        {
964                            error!("Failed to update account: ({})", err);
965                            if panic_on_db_errors {
966                                abort();
967                            }
968                        }
969                    }
970                    DbWorkItem::UpdateSlot(request) => {
971                        if let Err(err) = self.client.update_slot_status(
972                            request.slot,
973                            request.parent,
974                            request.slot_status,
975                        ) {
976                            error!("Failed to update slot: ({})", err);
977                            if panic_on_db_errors {
978                                abort();
979                            }
980                        }
981                    }
982                    DbWorkItem::LogTransaction(transaction_log_info) => {
983                        if let Err(err) = self.client.log_transaction(*transaction_log_info) {
984                            error!("Failed to update transaction: ({})", err);
985                            if panic_on_db_errors {
986                                abort();
987                            }
988                        }
989                    }
990                    DbWorkItem::UpdateBlockMetadata(block_info) => {
991                        if let Err(err) = self.client.update_block_metadata(*block_info) {
992                            error!("Failed to update block metadata: ({})", err);
993                            if panic_on_db_errors {
994                                abort();
995                            }
996                        }
997                    }
998                },
999                Err(err) => match err {
1000                    RecvTimeoutError::Timeout => {
1001                        if !self.is_startup_done && is_startup_done.load(Ordering::Relaxed) {
1002                            if let Err(err) = self.client.notify_end_of_startup() {
1003                                error!("Error in notifying end of startup: ({})", err);
1004                                if panic_on_db_errors {
1005                                    abort();
1006                                }
1007                            }
1008                            self.is_startup_done = true;
1009                            startup_done_count.fetch_add(1, Ordering::Relaxed);
1010                        }
1011
1012                        continue;
1013                    }
1014                    _ => {
1015                        error!("Error in receiving the item {:?}", err);
1016                        if panic_on_db_errors {
1017                            abort();
1018                        }
1019                        break;
1020                    }
1021                },
1022            }
1023        }
1024        Ok(())
1025    }
1026}
/// A PostgreSQL client that queues work items on a bounded channel and lets a pool
/// of worker threads, each with its own database client, process them.
pub struct ParallelPostgresClient {
    /// Join handles of the spawned worker threads.
    workers: Vec<JoinHandle<Result<(), GeyserPluginError>>>,
    /// Set to `true` by `join` to ask the workers to leave their loops.
    exit_worker: Arc<AtomicBool>,
    /// Set to `true` by `notify_end_of_startup` once the queue has drained.
    is_startup_done: Arc<AtomicBool>,
    /// Number of workers that have completed their end-of-startup flush.
    startup_done_count: Arc<AtomicUsize>,
    /// Number of workers whose database client was created successfully.
    initialized_worker_count: Arc<AtomicUsize>,
    /// Sending half of the work-item channel shared with the workers.
    sender: Sender<DbWorkItem>,
    /// Throttles how often the queue-length datapoint is reported.
    last_report: AtomicInterval,
}
1036
1037impl ParallelPostgresClient {
1038    pub fn new(config: &GeyserPluginPostgresConfig) -> Result<Self, GeyserPluginError> {
1039        info!("Creating ParallelPostgresClient...");
1040        let (sender, receiver) = bounded(MAX_ASYNC_REQUESTS);
1041        let exit_worker = Arc::new(AtomicBool::new(false));
1042        let mut workers = Vec::default();
1043        let is_startup_done = Arc::new(AtomicBool::new(false));
1044        let startup_done_count = Arc::new(AtomicUsize::new(0));
1045        let worker_count = config.threads.unwrap_or(DEFAULT_THREADS_COUNT);
1046        let initialized_worker_count = Arc::new(AtomicUsize::new(0));
1047        for i in 0..worker_count {
1048            let cloned_receiver = receiver.clone();
1049            let exit_clone = exit_worker.clone();
1050            let is_startup_done_clone = is_startup_done.clone();
1051            let startup_done_count_clone = startup_done_count.clone();
1052            let initialized_worker_count_clone = initialized_worker_count.clone();
1053            let config = config.clone();
1054            let worker = Builder::new()
1055                .name(format!("worker-{}", i))
1056                .spawn(move || -> Result<(), GeyserPluginError> {
1057                    let panic_on_db_errors = *config
1058                        .panic_on_db_errors
1059                        .as_ref()
1060                        .unwrap_or(&DEFAULT_PANIC_ON_DB_ERROR);
1061                    let result = PostgresClientWorker::new(config);
1062
1063                    match result {
1064                        Ok(mut worker) => {
1065                            initialized_worker_count_clone.fetch_add(1, Ordering::Relaxed);
1066                            worker.do_work(
1067                                cloned_receiver,
1068                                exit_clone,
1069                                is_startup_done_clone,
1070                                startup_done_count_clone,
1071                                panic_on_db_errors,
1072                            )?;
1073                            Ok(())
1074                        }
1075                        Err(err) => {
1076                            error!("Error when making connection to database: ({})", err);
1077                            if panic_on_db_errors {
1078                                abort();
1079                            }
1080                            Err(err)
1081                        }
1082                    }
1083                })
1084                .unwrap();
1085
1086            workers.push(worker);
1087        }
1088
1089        info!("Created ParallelPostgresClient.");
1090        Ok(Self {
1091            last_report: AtomicInterval::default(),
1092            workers,
1093            exit_worker,
1094            is_startup_done,
1095            startup_done_count,
1096            initialized_worker_count,
1097            sender,
1098        })
1099    }
1100
1101    pub fn join(&mut self) -> thread::Result<()> {
1102        self.exit_worker.store(true, Ordering::Relaxed);
1103        while !self.workers.is_empty() {
1104            let worker = self.workers.pop();
1105            if worker.is_none() {
1106                break;
1107            }
1108            let worker = worker.unwrap();
1109            let result = worker.join().unwrap();
1110            if result.is_err() {
1111                error!("The worker thread has failed: {:?}", result);
1112            }
1113        }
1114
1115        Ok(())
1116    }
1117
1118    pub fn update_account(
1119        &mut self,
1120        account: &ReplicaAccountInfo,
1121        slot: u64,
1122        is_startup: bool,
1123    ) -> Result<(), GeyserPluginError> {
1124        if self.last_report.should_update(30000) {
1125            datapoint_debug!(
1126                "postgres-plugin-stats",
1127                ("message-queue-length", self.sender.len() as i64, i64),
1128            );
1129        }
1130        let mut measure = Measure::start("geyser-plugin-posgres-create-work-item");
1131        let wrk_item = DbWorkItem::UpdateAccount(Box::new(UpdateAccountRequest {
1132            account: DbAccountInfo::new(account, slot),
1133            is_startup,
1134        }));
1135
1136        measure.stop();
1137
1138        inc_new_counter_debug!(
1139            "geyser-plugin-posgres-create-work-item-us",
1140            measure.as_us() as usize,
1141            100000,
1142            100000
1143        );
1144
1145        let mut measure = Measure::start("geyser-plugin-posgres-send-msg");
1146
1147        if let Err(err) = self.sender.send(wrk_item) {
1148            return Err(GeyserPluginError::AccountsUpdateError {
1149                msg: format!(
1150                    "Failed to update the account {:?}, error: {:?}",
1151                    bs58::encode(account.pubkey()).into_string(),
1152                    err
1153                ),
1154            });
1155        }
1156
1157        measure.stop();
1158        inc_new_counter_debug!(
1159            "geyser-plugin-posgres-send-msg-us",
1160            measure.as_us() as usize,
1161            100000,
1162            100000
1163        );
1164
1165        Ok(())
1166    }
1167
1168    pub fn update_slot_status(
1169        &mut self,
1170        slot: u64,
1171        parent: Option<u64>,
1172        status: SlotStatus,
1173    ) -> Result<(), GeyserPluginError> {
1174        if let Err(err) = self
1175            .sender
1176            .send(DbWorkItem::UpdateSlot(Box::new(UpdateSlotRequest {
1177                slot,
1178                parent,
1179                slot_status: status,
1180            })))
1181        {
1182            return Err(GeyserPluginError::SlotStatusUpdateError {
1183                msg: format!("Failed to update the slot {:?}, error: {:?}", slot, err),
1184            });
1185        }
1186        Ok(())
1187    }
1188
1189    pub fn update_block_metadata(
1190        &mut self,
1191        block_info: &ReplicaBlockInfo,
1192    ) -> Result<(), GeyserPluginError> {
1193        if let Err(err) = self.sender.send(DbWorkItem::UpdateBlockMetadata(Box::new(
1194            UpdateBlockMetadataRequest {
1195                block_info: DbBlockInfo::from(block_info),
1196            },
1197        ))) {
1198            return Err(GeyserPluginError::SlotStatusUpdateError {
1199                msg: format!(
1200                    "Failed to update the block metadata at slot {:?}, error: {:?}",
1201                    block_info.slot, err
1202                ),
1203            });
1204        }
1205        Ok(())
1206    }
1207
1208    pub fn notify_end_of_startup(&mut self) -> Result<(), GeyserPluginError> {
1209        info!("Notifying the end of startup");
1210        // Ensure all items in the queue has been received by the workers
1211        while !self.sender.is_empty() {
1212            sleep(Duration::from_millis(100));
1213        }
1214        self.is_startup_done.store(true, Ordering::Relaxed);
1215
1216        // Wait for all worker threads to be done with flushing
1217        while self.startup_done_count.load(Ordering::Relaxed)
1218            != self.initialized_worker_count.load(Ordering::Relaxed)
1219        {
1220            info!(
1221                "Startup done count: {}, good worker thread count: {}",
1222                self.startup_done_count.load(Ordering::Relaxed),
1223                self.initialized_worker_count.load(Ordering::Relaxed)
1224            );
1225            sleep(Duration::from_millis(100));
1226        }
1227
1228        info!("Done with notifying the end of startup");
1229        Ok(())
1230    }
1231}
1232
/// Namespace for the functions that construct the PostgreSQL clients of this module.
pub struct PostgresClientBuilder {}

impl PostgresClientBuilder {
    /// Builds a `ParallelPostgresClient` from `config` by delegating to
    /// `ParallelPostgresClient::new`.
    ///
    /// NOTE(review): the name misspells "parallel"; it is part of the public
    /// interface, so it is left unchanged here.
    pub fn build_pararallel_postgres_client(
        config: &GeyserPluginPostgresConfig,
    ) -> Result<ParallelPostgresClient, GeyserPluginError> {
        ParallelPostgresClient::new(config)
    }

    /// Builds a `SimplePostgresClient` from `config` by delegating to
    /// `SimplePostgresClient::new`.
    pub fn build_simple_postgres_client(
        config: &GeyserPluginPostgresConfig,
    ) -> Result<SimplePostgresClient, GeyserPluginError> {
        SimplePostgresClient::new(config)
    }
}