1use {
2 super::{
3 DbAccountInfo, ReadableAccountInfo, SimplePostgresClient,
4 DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE,
5 },
6 crate::{
7 geyser_plugin_postgres::{GeyserPluginPostgresConfig, GeyserPluginPostgresError},
8 inline_spl_token::{self, GenericTokenAccount},
9 inline_spl_token_2022,
10 },
11 log::*,
12 postgres::{Client, Statement},
13 solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPluginError,
14 solana_measure::measure::Measure,
15 solana_metrics::*,
16 solana_sdk::pubkey::Pubkey,
17 tokio_postgres::types,
18};
19
20const TOKEN_INDEX_COLUMN_COUNT: usize = 3;
21pub struct TokenSecondaryIndexEntry {
23 secondary_key: Vec<u8>,
26
27 account_key: Vec<u8>,
29
30 slot: i64,
32}
33
34impl SimplePostgresClient {
35 pub fn build_single_token_owner_index_upsert_statement(
36 client: &mut Client,
37 config: &GeyserPluginPostgresConfig,
38 ) -> Result<Statement, GeyserPluginError> {
39 const BULK_OWNER_INDEX_INSERT_STATEMENT: &str =
40 "INSERT INTO spl_token_owner_index AS owner_index (owner_key, account_key, slot) \
41 VALUES ($1, $2, $3) \
42 ON CONFLICT (owner_key, account_key) \
43 DO UPDATE SET slot=excluded.slot \
44 WHERE owner_index.slot < excluded.slot";
45
46 Self::prepare_query_statement(client, config, BULK_OWNER_INDEX_INSERT_STATEMENT)
47 }
48
49 pub fn build_single_token_mint_index_upsert_statement(
50 client: &mut Client,
51 config: &GeyserPluginPostgresConfig,
52 ) -> Result<Statement, GeyserPluginError> {
53 const BULK_MINT_INDEX_INSERT_STATEMENT: &str =
54 "INSERT INTO spl_token_mint_index AS mint_index (mint_key, account_key, slot) \
55 VALUES ($1, $2, $3) \
56 ON CONFLICT (mint_key, account_key) \
57 DO UPDATE SET slot=excluded.slot \
58 WHERE mint_index.slot < excluded.slot";
59
60 Self::prepare_query_statement(client, config, BULK_MINT_INDEX_INSERT_STATEMENT)
61 }
62
63 pub fn build_bulk_token_index_insert_statement_common(
65 client: &mut Client,
66 table: &str,
67 source_key_name: &str,
68 config: &GeyserPluginPostgresConfig,
69 ) -> Result<Statement, GeyserPluginError> {
70 let batch_size = config
71 .batch_size
72 .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
73 let mut stmt = format!(
74 "INSERT INTO {} AS index ({}, account_key, slot) VALUES",
75 table, source_key_name
76 );
77 for j in 0..batch_size {
78 let row = j * TOKEN_INDEX_COLUMN_COUNT;
79 let val_str = format!("(${}, ${}, ${})", row + 1, row + 2, row + 3);
80
81 if j == 0 {
82 stmt = format!("{} {}", &stmt, val_str);
83 } else {
84 stmt = format!("{}, {}", &stmt, val_str);
85 }
86 }
87
88 let handle_conflict = format!(
89 "ON CONFLICT ({}, account_key) DO UPDATE SET slot=excluded.slot where index.slot < excluded.slot",
90 source_key_name);
91
92 stmt = format!("{} {}", stmt, handle_conflict);
93
94 info!("{}", stmt);
95 let bulk_stmt = client.prepare(&stmt);
96
97 match bulk_stmt {
98 Err(err) => {
99 return Err(GeyserPluginError::Custom(Box::new(GeyserPluginPostgresError::DataSchemaError {
100 msg: format!(
101 "Error in preparing for the {} index update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
102 table, err, config.host, config.user, config
103 ),
104 })));
105 }
106 Ok(statement) => Ok(statement),
107 }
108 }
109
110 pub fn build_bulk_token_owner_index_insert_statement(
112 client: &mut Client,
113 config: &GeyserPluginPostgresConfig,
114 ) -> Result<Statement, GeyserPluginError> {
115 Self::build_bulk_token_index_insert_statement_common(
116 client,
117 "spl_token_owner_index",
118 "owner_key",
119 config,
120 )
121 }
122
123 pub fn build_bulk_token_mint_index_insert_statement(
125 client: &mut Client,
126 config: &GeyserPluginPostgresConfig,
127 ) -> Result<Statement, GeyserPluginError> {
128 Self::build_bulk_token_index_insert_statement_common(
129 client,
130 "spl_token_mint_index",
131 "mint_key",
132 config,
133 )
134 }
135
136 fn bulk_insert_token_index_common(
138 batch_size: usize,
139 client: &mut Client,
140 index_entries: &mut Vec<TokenSecondaryIndexEntry>,
141 query: &Statement,
142 ) -> Result<(), GeyserPluginError> {
143 if index_entries.len() == batch_size {
144 let mut measure = Measure::start("geyser-plugin-postgres-prepare-index-values");
145
146 let mut values: Vec<&(dyn types::ToSql + Sync)> =
147 Vec::with_capacity(batch_size * TOKEN_INDEX_COLUMN_COUNT);
148 for index in index_entries.iter().take(batch_size) {
149 values.push(&index.secondary_key);
150 values.push(&index.account_key);
151 values.push(&index.slot);
152 }
153 measure.stop();
154 inc_new_counter_debug!(
155 "geyser-plugin-postgres-prepare-index-values-us",
156 measure.as_us() as usize,
157 10000,
158 10000
159 );
160
161 let mut measure = Measure::start("geyser-plugin-postgres-update-index-account");
162 let result = client.query(query, &values);
163
164 index_entries.clear();
165
166 if let Err(err) = result {
167 let msg = format!(
168 "Failed to persist the update of account to the PostgreSQL database. Error: {:?}",
169 err
170 );
171 error!("{}", msg);
172 return Err(GeyserPluginError::AccountsUpdateError { msg });
173 }
174
175 measure.stop();
176 inc_new_counter_debug!(
177 "geyser-plugin-postgres-update-index-us",
178 measure.as_us() as usize,
179 10000,
180 10000
181 );
182 inc_new_counter_debug!(
183 "geyser-plugin-postgres-update-index-count",
184 batch_size,
185 10000,
186 10000
187 );
188 }
189 Ok(())
190 }
191
192 pub fn bulk_insert_token_owner_index(&mut self) -> Result<(), GeyserPluginError> {
194 let client = self.client.get_mut().unwrap();
195 if client.bulk_insert_token_owner_index_stmt.is_none() {
196 return Ok(());
197 }
198 let query = client.bulk_insert_token_owner_index_stmt.as_ref().unwrap();
199 Self::bulk_insert_token_index_common(
200 self.batch_size,
201 &mut client.client,
202 &mut self.pending_token_owner_index,
203 query,
204 )
205 }
206
207 pub fn bulk_insert_token_mint_index(&mut self) -> Result<(), GeyserPluginError> {
209 let client = self.client.get_mut().unwrap();
210 if client.bulk_insert_token_mint_index_stmt.is_none() {
211 return Ok(());
212 }
213 let query = client.bulk_insert_token_mint_index_stmt.as_ref().unwrap();
214 Self::bulk_insert_token_index_common(
215 self.batch_size,
216 &mut client.client,
217 &mut self.pending_token_mint_index,
218 query,
219 )
220 }
221
222 fn queue_token_owner_index_generic<G: GenericTokenAccount>(
224 &mut self,
225 token_id: &Pubkey,
226 account: &DbAccountInfo,
227 ) {
228 if account.owner() == token_id.as_ref() {
229 if let Some(owner_key) = G::unpack_account_owner(account.data()) {
230 let owner_key = owner_key.as_ref().to_vec();
231 let pubkey = account.pubkey();
232 self.pending_token_owner_index
233 .push(TokenSecondaryIndexEntry {
234 secondary_key: owner_key,
235 account_key: pubkey.to_vec(),
236 slot: account.slot,
237 });
238 }
239 }
240 }
241
242 fn queue_token_mint_index_generic<G: GenericTokenAccount>(
244 &mut self,
245 token_id: &Pubkey,
246 account: &DbAccountInfo,
247 ) {
248 if account.owner() == token_id.as_ref() {
249 if let Some(mint_key) = G::unpack_account_mint(account.data()) {
250 let mint_key = mint_key.as_ref().to_vec();
251 let pubkey = account.pubkey();
252 self.pending_token_mint_index
253 .push(TokenSecondaryIndexEntry {
254 secondary_key: mint_key,
255 account_key: pubkey.to_vec(),
256 slot: account.slot,
257 })
258 }
259 }
260 }
261
262 pub fn queue_secondary_indexes(&mut self, account: &DbAccountInfo) {
264 if self.index_token_owner {
265 self.queue_token_owner_index_generic::<inline_spl_token::Account>(
266 &inline_spl_token::id(),
267 account,
268 );
269 self.queue_token_owner_index_generic::<inline_spl_token_2022::Account>(
270 &inline_spl_token_2022::id(),
271 account,
272 );
273 }
274
275 if self.index_token_mint {
276 self.queue_token_mint_index_generic::<inline_spl_token::Account>(
277 &inline_spl_token::id(),
278 account,
279 );
280 self.queue_token_mint_index_generic::<inline_spl_token_2022::Account>(
281 &inline_spl_token_2022::id(),
282 account,
283 );
284 }
285 }
286
287 fn update_token_owner_index_generic<G: GenericTokenAccount>(
289 client: &mut Client,
290 statement: &Statement,
291 token_id: &Pubkey,
292 account: &DbAccountInfo,
293 ) -> Result<(), GeyserPluginError> {
294 if account.owner() == token_id.as_ref() {
295 if let Some(owner_key) = G::unpack_account_owner(account.data()) {
296 let owner_key = owner_key.as_ref().to_vec();
297 let pubkey = account.pubkey();
298 let slot = account.slot;
299 let result = client.execute(statement, &[&owner_key, &pubkey, &slot]);
300 if let Err(err) = result {
301 let msg = format!(
302 "Failed to update the token owner index to the PostgreSQL database. Error: {:?}",
303 err
304 );
305 error!("{}", msg);
306 return Err(GeyserPluginError::AccountsUpdateError { msg });
307 }
308 }
309 }
310
311 Ok(())
312 }
313
314 fn update_token_mint_index_generic<G: GenericTokenAccount>(
316 client: &mut Client,
317 statement: &Statement,
318 token_id: &Pubkey,
319 account: &DbAccountInfo,
320 ) -> Result<(), GeyserPluginError> {
321 if account.owner() == token_id.as_ref() {
322 if let Some(mint_key) = G::unpack_account_mint(account.data()) {
323 let mint_key = mint_key.as_ref().to_vec();
324 let pubkey = account.pubkey();
325 let slot = account.slot;
326 let result = client.execute(statement, &[&mint_key, &pubkey, &slot]);
327 if let Err(err) = result {
328 let msg = format!(
329 "Failed to update the token mint index to the PostgreSQL database. Error: {:?}",
330 err
331 );
332 error!("{}", msg);
333 return Err(GeyserPluginError::AccountsUpdateError { msg });
334 }
335 }
336 }
337
338 Ok(())
339 }
340
341 pub fn update_token_owner_index(
343 client: &mut Client,
344 statement: &Statement,
345 account: &DbAccountInfo,
346 ) -> Result<(), GeyserPluginError> {
347 Self::update_token_owner_index_generic::<inline_spl_token::Account>(
348 client,
349 statement,
350 &inline_spl_token::id(),
351 account,
352 )?;
353
354 Self::update_token_owner_index_generic::<inline_spl_token_2022::Account>(
355 client,
356 statement,
357 &inline_spl_token_2022::id(),
358 account,
359 )
360 }
361
362 pub fn update_token_mint_index(
364 client: &mut Client,
365 statement: &Statement,
366 account: &DbAccountInfo,
367 ) -> Result<(), GeyserPluginError> {
368 Self::update_token_mint_index_generic::<inline_spl_token::Account>(
369 client,
370 statement,
371 &inline_spl_token::id(),
372 account,
373 )?;
374
375 Self::update_token_mint_index_generic::<inline_spl_token_2022::Account>(
376 client,
377 statement,
378 &inline_spl_token_2022::id(),
379 account,
380 )
381 }
382
383 pub fn clear_buffered_indexes(&mut self) {
387 self.pending_token_owner_index.clear();
388 self.pending_token_mint_index.clear();
389 }
390}