solana_geyser_plugin_postgres/postgres_client/
postgres_client_account_index.rs

1use {
2    super::{
3        DbAccountInfo, ReadableAccountInfo, SimplePostgresClient,
4        DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE,
5    },
6    crate::{
7        geyser_plugin_postgres::{GeyserPluginPostgresConfig, GeyserPluginPostgresError},
8        inline_spl_token::{self, GenericTokenAccount},
9        inline_spl_token_2022,
10    },
11    log::*,
12    postgres::{Client, Statement},
13    solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPluginError,
14    solana_measure::measure::Measure,
15    solana_metrics::*,
16    solana_sdk::pubkey::Pubkey,
17    tokio_postgres::types,
18};
19
/// Number of columns bound per row of a token secondary index insert:
/// the secondary key, the account key and the slot.
const TOKEN_INDEX_COLUMN_COUNT: usize = 3;

/// One pending row of a token secondary index. The same shape is used for
/// both the token owner index and the token mint index.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TokenSecondaryIndexEntry {
    /// For the token owner index this is the Pubkey of the owner; for the
    /// token mint index it is the Pubkey of the mint.
    secondary_key: Vec<u8>,

    /// The Pubkey of the token account being indexed.
    account_key: Vec<u8>,

    /// The slot recorded for this index entry.
    slot: i64,
}
33
34impl SimplePostgresClient {
35    pub fn build_single_token_owner_index_upsert_statement(
36        client: &mut Client,
37        config: &GeyserPluginPostgresConfig,
38    ) -> Result<Statement, GeyserPluginError> {
39        const BULK_OWNER_INDEX_INSERT_STATEMENT: &str =
40            "INSERT INTO spl_token_owner_index AS owner_index (owner_key, account_key, slot) \
41        VALUES ($1, $2, $3) \
42        ON CONFLICT (owner_key, account_key) \
43        DO UPDATE SET slot=excluded.slot \
44        WHERE owner_index.slot < excluded.slot";
45
46        Self::prepare_query_statement(client, config, BULK_OWNER_INDEX_INSERT_STATEMENT)
47    }
48
49    pub fn build_single_token_mint_index_upsert_statement(
50        client: &mut Client,
51        config: &GeyserPluginPostgresConfig,
52    ) -> Result<Statement, GeyserPluginError> {
53        const BULK_MINT_INDEX_INSERT_STATEMENT: &str =
54            "INSERT INTO spl_token_mint_index AS mint_index (mint_key, account_key, slot) \
55        VALUES ($1, $2, $3) \
56        ON CONFLICT (mint_key, account_key) \
57        DO UPDATE SET slot=excluded.slot \
58        WHERE mint_index.slot < excluded.slot";
59
60        Self::prepare_query_statement(client, config, BULK_MINT_INDEX_INSERT_STATEMENT)
61    }
62
63    /// Common build the token mint index bulk insert statement.
64    pub fn build_bulk_token_index_insert_statement_common(
65        client: &mut Client,
66        table: &str,
67        source_key_name: &str,
68        config: &GeyserPluginPostgresConfig,
69    ) -> Result<Statement, GeyserPluginError> {
70        let batch_size = config
71            .batch_size
72            .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
73        let mut stmt = format!(
74            "INSERT INTO {} AS index ({}, account_key, slot) VALUES",
75            table, source_key_name
76        );
77        for j in 0..batch_size {
78            let row = j * TOKEN_INDEX_COLUMN_COUNT;
79            let val_str = format!("(${}, ${}, ${})", row + 1, row + 2, row + 3);
80
81            if j == 0 {
82                stmt = format!("{} {}", &stmt, val_str);
83            } else {
84                stmt = format!("{}, {}", &stmt, val_str);
85            }
86        }
87
88        let handle_conflict = format!(
89            "ON CONFLICT ({}, account_key) DO UPDATE SET slot=excluded.slot where index.slot < excluded.slot",
90            source_key_name);
91
92        stmt = format!("{} {}", stmt, handle_conflict);
93
94        info!("{}", stmt);
95        let bulk_stmt = client.prepare(&stmt);
96
97        match bulk_stmt {
98            Err(err) => {
99                return Err(GeyserPluginError::Custom(Box::new(GeyserPluginPostgresError::DataSchemaError {
100                    msg: format!(
101                        "Error in preparing for the {} index update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
102                        table, err, config.host, config.user, config
103                    ),
104                })));
105            }
106            Ok(statement) => Ok(statement),
107        }
108    }
109
110    /// Build the token owner index bulk insert statement
111    pub fn build_bulk_token_owner_index_insert_statement(
112        client: &mut Client,
113        config: &GeyserPluginPostgresConfig,
114    ) -> Result<Statement, GeyserPluginError> {
115        Self::build_bulk_token_index_insert_statement_common(
116            client,
117            "spl_token_owner_index",
118            "owner_key",
119            config,
120        )
121    }
122
123    /// Build the token mint index bulk insert statement.
124    pub fn build_bulk_token_mint_index_insert_statement(
125        client: &mut Client,
126        config: &GeyserPluginPostgresConfig,
127    ) -> Result<Statement, GeyserPluginError> {
128        Self::build_bulk_token_index_insert_statement_common(
129            client,
130            "spl_token_mint_index",
131            "mint_key",
132            config,
133        )
134    }
135
136    /// Execute the common token bulk insert query.
137    fn bulk_insert_token_index_common(
138        batch_size: usize,
139        client: &mut Client,
140        index_entries: &mut Vec<TokenSecondaryIndexEntry>,
141        query: &Statement,
142    ) -> Result<(), GeyserPluginError> {
143        if index_entries.len() == batch_size {
144            let mut measure = Measure::start("geyser-plugin-postgres-prepare-index-values");
145
146            let mut values: Vec<&(dyn types::ToSql + Sync)> =
147                Vec::with_capacity(batch_size * TOKEN_INDEX_COLUMN_COUNT);
148            for index in index_entries.iter().take(batch_size) {
149                values.push(&index.secondary_key);
150                values.push(&index.account_key);
151                values.push(&index.slot);
152            }
153            measure.stop();
154            inc_new_counter_debug!(
155                "geyser-plugin-postgres-prepare-index-values-us",
156                measure.as_us() as usize,
157                10000,
158                10000
159            );
160
161            let mut measure = Measure::start("geyser-plugin-postgres-update-index-account");
162            let result = client.query(query, &values);
163
164            index_entries.clear();
165
166            if let Err(err) = result {
167                let msg = format!(
168                    "Failed to persist the update of account to the PostgreSQL database. Error: {:?}",
169                    err
170                );
171                error!("{}", msg);
172                return Err(GeyserPluginError::AccountsUpdateError { msg });
173            }
174
175            measure.stop();
176            inc_new_counter_debug!(
177                "geyser-plugin-postgres-update-index-us",
178                measure.as_us() as usize,
179                10000,
180                10000
181            );
182            inc_new_counter_debug!(
183                "geyser-plugin-postgres-update-index-count",
184                batch_size,
185                10000,
186                10000
187            );
188        }
189        Ok(())
190    }
191
192    /// Execute the token owner bulk insert query.
193    pub fn bulk_insert_token_owner_index(&mut self) -> Result<(), GeyserPluginError> {
194        let client = self.client.get_mut().unwrap();
195        if client.bulk_insert_token_owner_index_stmt.is_none() {
196            return Ok(());
197        }
198        let query = client.bulk_insert_token_owner_index_stmt.as_ref().unwrap();
199        Self::bulk_insert_token_index_common(
200            self.batch_size,
201            &mut client.client,
202            &mut self.pending_token_owner_index,
203            query,
204        )
205    }
206
207    /// Execute the token mint index bulk insert query.
208    pub fn bulk_insert_token_mint_index(&mut self) -> Result<(), GeyserPluginError> {
209        let client = self.client.get_mut().unwrap();
210        if client.bulk_insert_token_mint_index_stmt.is_none() {
211            return Ok(());
212        }
213        let query = client.bulk_insert_token_mint_index_stmt.as_ref().unwrap();
214        Self::bulk_insert_token_index_common(
215            self.batch_size,
216            &mut client.client,
217            &mut self.pending_token_mint_index,
218            query,
219        )
220    }
221
222    /// Generic function to queue the token owner index for bulk insert.
223    fn queue_token_owner_index_generic<G: GenericTokenAccount>(
224        &mut self,
225        token_id: &Pubkey,
226        account: &DbAccountInfo,
227    ) {
228        if account.owner() == token_id.as_ref() {
229            if let Some(owner_key) = G::unpack_account_owner(account.data()) {
230                let owner_key = owner_key.as_ref().to_vec();
231                let pubkey = account.pubkey();
232                self.pending_token_owner_index
233                    .push(TokenSecondaryIndexEntry {
234                        secondary_key: owner_key,
235                        account_key: pubkey.to_vec(),
236                        slot: account.slot,
237                    });
238            }
239        }
240    }
241
242    /// Generic function to queue the token mint index for bulk insert.
243    fn queue_token_mint_index_generic<G: GenericTokenAccount>(
244        &mut self,
245        token_id: &Pubkey,
246        account: &DbAccountInfo,
247    ) {
248        if account.owner() == token_id.as_ref() {
249            if let Some(mint_key) = G::unpack_account_mint(account.data()) {
250                let mint_key = mint_key.as_ref().to_vec();
251                let pubkey = account.pubkey();
252                self.pending_token_mint_index
253                    .push(TokenSecondaryIndexEntry {
254                        secondary_key: mint_key,
255                        account_key: pubkey.to_vec(),
256                        slot: account.slot,
257                    })
258            }
259        }
260    }
261
262    /// Queue bulk insert secondary indexes: token owner and token mint indexes.
263    pub fn queue_secondary_indexes(&mut self, account: &DbAccountInfo) {
264        if self.index_token_owner {
265            self.queue_token_owner_index_generic::<inline_spl_token::Account>(
266                &inline_spl_token::id(),
267                account,
268            );
269            self.queue_token_owner_index_generic::<inline_spl_token_2022::Account>(
270                &inline_spl_token_2022::id(),
271                account,
272            );
273        }
274
275        if self.index_token_mint {
276            self.queue_token_mint_index_generic::<inline_spl_token::Account>(
277                &inline_spl_token::id(),
278                account,
279            );
280            self.queue_token_mint_index_generic::<inline_spl_token_2022::Account>(
281                &inline_spl_token_2022::id(),
282                account,
283            );
284        }
285    }
286
287    /// Generic function to update a single token owner index.
288    fn update_token_owner_index_generic<G: GenericTokenAccount>(
289        client: &mut Client,
290        statement: &Statement,
291        token_id: &Pubkey,
292        account: &DbAccountInfo,
293    ) -> Result<(), GeyserPluginError> {
294        if account.owner() == token_id.as_ref() {
295            if let Some(owner_key) = G::unpack_account_owner(account.data()) {
296                let owner_key = owner_key.as_ref().to_vec();
297                let pubkey = account.pubkey();
298                let slot = account.slot;
299                let result = client.execute(statement, &[&owner_key, &pubkey, &slot]);
300                if let Err(err) = result {
301                    let msg = format!(
302                        "Failed to update the token owner index to the PostgreSQL database. Error: {:?}",
303                        err
304                    );
305                    error!("{}", msg);
306                    return Err(GeyserPluginError::AccountsUpdateError { msg });
307                }
308            }
309        }
310
311        Ok(())
312    }
313
314    /// Generic function to update a single token mint index.
315    fn update_token_mint_index_generic<G: GenericTokenAccount>(
316        client: &mut Client,
317        statement: &Statement,
318        token_id: &Pubkey,
319        account: &DbAccountInfo,
320    ) -> Result<(), GeyserPluginError> {
321        if account.owner() == token_id.as_ref() {
322            if let Some(mint_key) = G::unpack_account_mint(account.data()) {
323                let mint_key = mint_key.as_ref().to_vec();
324                let pubkey = account.pubkey();
325                let slot = account.slot;
326                let result = client.execute(statement, &[&mint_key, &pubkey, &slot]);
327                if let Err(err) = result {
328                    let msg = format!(
329                        "Failed to update the token mint index to the PostgreSQL database. Error: {:?}",
330                        err
331                    );
332                    error!("{}", msg);
333                    return Err(GeyserPluginError::AccountsUpdateError { msg });
334                }
335            }
336        }
337
338        Ok(())
339    }
340
341    /// Function for updating a single token owner index.
342    pub fn update_token_owner_index(
343        client: &mut Client,
344        statement: &Statement,
345        account: &DbAccountInfo,
346    ) -> Result<(), GeyserPluginError> {
347        Self::update_token_owner_index_generic::<inline_spl_token::Account>(
348            client,
349            statement,
350            &inline_spl_token::id(),
351            account,
352        )?;
353
354        Self::update_token_owner_index_generic::<inline_spl_token_2022::Account>(
355            client,
356            statement,
357            &inline_spl_token_2022::id(),
358            account,
359        )
360    }
361
362    /// Function for updating a single token mint index.
363    pub fn update_token_mint_index(
364        client: &mut Client,
365        statement: &Statement,
366        account: &DbAccountInfo,
367    ) -> Result<(), GeyserPluginError> {
368        Self::update_token_mint_index_generic::<inline_spl_token::Account>(
369            client,
370            statement,
371            &inline_spl_token::id(),
372            account,
373        )?;
374
375        Self::update_token_mint_index_generic::<inline_spl_token_2022::Account>(
376            client,
377            statement,
378            &inline_spl_token_2022::id(),
379            account,
380        )
381    }
382
383    /// Clean up the buffered indexes -- we do not need to
384    /// write them to disk individually as they have already been handled
385    /// when the accounts were flushed out individually in `upsert_account_internal`.
386    pub fn clear_buffered_indexes(&mut self) {
387        self.pending_token_owner_index.clear();
388        self.pending_token_mint_index.clear();
389    }
390}