solana_accountsdb_plugin_postgres/
accountsdb_plugin_postgres.rs1use {
3 crate::{
4 accounts_selector::AccountsSelector,
5 postgres_client::{ParallelPostgresClient, PostgresClientBuilder},
6 transaction_selector::TransactionSelector,
7 },
8 bs58,
9 log::*,
10 serde_derive::{Deserialize, Serialize},
11 serde_json,
12 solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
13 AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions,
14 ReplicaBlockInfoVersions, ReplicaTransactionInfoVersions, Result, SlotStatus,
15 },
16 solana_measure::measure::Measure,
17 solana_metrics::*,
18 std::{fs::File, io::Read},
19 thiserror::Error,
20};
21
22#[derive(Default)]
23pub struct AccountsDbPluginPostgres {
24 client: Option<ParallelPostgresClient>,
25 accounts_selector: Option<AccountsSelector>,
26 transaction_selector: Option<TransactionSelector>,
27}
28
29impl std::fmt::Debug for AccountsDbPluginPostgres {
30 fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 Ok(())
32 }
33}
34
35#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
36pub struct AccountsDbPluginPostgresConfig {
37 pub host: Option<String>,
38 pub user: Option<String>,
39 pub port: Option<u16>,
40 pub connection_str: Option<String>,
41 pub threads: Option<usize>,
42 pub batch_size: Option<usize>,
43 pub panic_on_db_errors: Option<bool>,
44 pub store_account_historical_data: Option<bool>,
46 pub use_ssl: Option<bool>,
47 pub server_ca: Option<String>,
48 pub client_cert: Option<String>,
49 pub client_key: Option<String>,
50}
51
52#[derive(Error, Debug)]
53pub enum AccountsDbPluginPostgresError {
54 #[error("Error connecting to the backend data store. Error message: ({msg})")]
55 DataStoreConnectionError { msg: String },
56
57 #[error("Error preparing data store schema. Error message: ({msg})")]
58 DataSchemaError { msg: String },
59
60 #[error("Error preparing data store schema. Error message: ({msg})")]
61 ConfigurationError { msg: String },
62}
63
64impl AccountsDbPlugin for AccountsDbPluginPostgres {
65 fn name(&self) -> &'static str {
66 "AccountsDbPluginPostgres"
67 }
68
69 fn on_load(&mut self, config_file: &str) -> Result<()> {
129 solana_logger::setup_with_default("info");
130 info!(
131 "Loading plugin {:?} from config_file {:?}",
132 self.name(),
133 config_file
134 );
135 let mut file = File::open(config_file)?;
136 let mut contents = String::new();
137 file.read_to_string(&mut contents)?;
138
139 let result: serde_json::Value = serde_json::from_str(&contents).unwrap();
140 self.accounts_selector = Some(Self::create_accounts_selector_from_config(&result));
141 self.transaction_selector = Some(Self::create_transaction_selector_from_config(&result));
142
143 let result: serde_json::Result<AccountsDbPluginPostgresConfig> =
144 serde_json::from_str(&contents);
145 match result {
146 Err(err) => {
147 return Err(AccountsDbPluginError::ConfigFileReadError {
148 msg: format!(
149 "The config file is not in the JSON format expected: {:?}",
150 err
151 ),
152 })
153 }
154 Ok(config) => {
155 let client = PostgresClientBuilder::build_pararallel_postgres_client(&config)?;
156 self.client = Some(client);
157 }
158 }
159
160 Ok(())
161 }
162
163 fn on_unload(&mut self) {
164 info!("Unloading plugin: {:?}", self.name());
165
166 match &mut self.client {
167 None => {}
168 Some(client) => {
169 client.join().unwrap();
170 }
171 }
172 }
173
174 fn update_account(
175 &mut self,
176 account: ReplicaAccountInfoVersions,
177 slot: u64,
178 is_startup: bool,
179 ) -> Result<()> {
180 let mut measure_all = Measure::start("accountsdb-plugin-postgres-update-account-main");
181 match account {
182 ReplicaAccountInfoVersions::V0_0_1(account) => {
183 let mut measure_select =
184 Measure::start("accountsdb-plugin-postgres-update-account-select");
185 if let Some(accounts_selector) = &self.accounts_selector {
186 if !accounts_selector.is_account_selected(account.pubkey, account.owner) {
187 return Ok(());
188 }
189 } else {
190 return Ok(());
191 }
192 measure_select.stop();
193 inc_new_counter_debug!(
194 "accountsdb-plugin-postgres-update-account-select-us",
195 measure_select.as_us() as usize,
196 100000,
197 100000
198 );
199
200 debug!(
201 "Updating account {:?} with owner {:?} at slot {:?} using account selector {:?}",
202 bs58::encode(account.pubkey).into_string(),
203 bs58::encode(account.owner).into_string(),
204 slot,
205 self.accounts_selector.as_ref().unwrap()
206 );
207
208 match &mut self.client {
209 None => {
210 return Err(AccountsDbPluginError::Custom(Box::new(
211 AccountsDbPluginPostgresError::DataStoreConnectionError {
212 msg: "There is no connection to the PostgreSQL database."
213 .to_string(),
214 },
215 )));
216 }
217 Some(client) => {
218 let mut measure_update =
219 Measure::start("accountsdb-plugin-postgres-update-account-client");
220 let result = { client.update_account(account, slot, is_startup) };
221 measure_update.stop();
222
223 inc_new_counter_debug!(
224 "accountsdb-plugin-postgres-update-account-client-us",
225 measure_update.as_us() as usize,
226 100000,
227 100000
228 );
229
230 if let Err(err) = result {
231 return Err(AccountsDbPluginError::AccountsUpdateError {
232 msg: format!("Failed to persist the update of account to the PostgreSQL database. Error: {:?}", err)
233 });
234 }
235 }
236 }
237 }
238 }
239
240 measure_all.stop();
241
242 inc_new_counter_debug!(
243 "accountsdb-plugin-postgres-update-account-main-us",
244 measure_all.as_us() as usize,
245 100000,
246 100000
247 );
248
249 Ok(())
250 }
251
252 fn update_slot_status(
253 &mut self,
254 slot: u64,
255 parent: Option<u64>,
256 status: SlotStatus,
257 ) -> Result<()> {
258 info!("Updating slot {:?} at with status {:?}", slot, status);
259
260 match &mut self.client {
261 None => {
262 return Err(AccountsDbPluginError::Custom(Box::new(
263 AccountsDbPluginPostgresError::DataStoreConnectionError {
264 msg: "There is no connection to the PostgreSQL database.".to_string(),
265 },
266 )));
267 }
268 Some(client) => {
269 let result = client.update_slot_status(slot, parent, status);
270
271 if let Err(err) = result {
272 return Err(AccountsDbPluginError::SlotStatusUpdateError{
273 msg: format!("Failed to persist the update of slot to the PostgreSQL database. Error: {:?}", err)
274 });
275 }
276 }
277 }
278
279 Ok(())
280 }
281
282 fn notify_end_of_startup(&mut self) -> Result<()> {
283 info!("Notifying the end of startup for accounts notifications");
284 match &mut self.client {
285 None => {
286 return Err(AccountsDbPluginError::Custom(Box::new(
287 AccountsDbPluginPostgresError::DataStoreConnectionError {
288 msg: "There is no connection to the PostgreSQL database.".to_string(),
289 },
290 )));
291 }
292 Some(client) => {
293 let result = client.notify_end_of_startup();
294
295 if let Err(err) = result {
296 return Err(AccountsDbPluginError::SlotStatusUpdateError{
297 msg: format!("Failed to notify the end of startup for accounts notifications. Error: {:?}", err)
298 });
299 }
300 }
301 }
302 Ok(())
303 }
304
305 fn notify_transaction(
306 &mut self,
307 transaction_info: ReplicaTransactionInfoVersions,
308 slot: u64,
309 ) -> Result<()> {
310 match &mut self.client {
311 None => {
312 return Err(AccountsDbPluginError::Custom(Box::new(
313 AccountsDbPluginPostgresError::DataStoreConnectionError {
314 msg: "There is no connection to the PostgreSQL database.".to_string(),
315 },
316 )));
317 }
318 Some(client) => match transaction_info {
319 ReplicaTransactionInfoVersions::V0_0_1(transaction_info) => {
320 if let Some(transaction_selector) = &self.transaction_selector {
321 if !transaction_selector.is_transaction_selected(
322 transaction_info.is_vote,
323 transaction_info.transaction.message().account_keys_iter(),
324 ) {
325 return Ok(());
326 }
327 } else {
328 return Ok(());
329 }
330
331 let result = client.log_transaction_info(transaction_info, slot);
332
333 if let Err(err) = result {
334 return Err(AccountsDbPluginError::SlotStatusUpdateError{
335 msg: format!("Failed to persist the transaction info to the PostgreSQL database. Error: {:?}", err)
336 });
337 }
338 }
339 },
340 }
341
342 Ok(())
343 }
344
345 fn notify_block_metadata(&mut self, block_info: ReplicaBlockInfoVersions) -> Result<()> {
346 match &mut self.client {
347 None => {
348 return Err(AccountsDbPluginError::Custom(Box::new(
349 AccountsDbPluginPostgresError::DataStoreConnectionError {
350 msg: "There is no connection to the PostgreSQL database.".to_string(),
351 },
352 )));
353 }
354 Some(client) => match block_info {
355 ReplicaBlockInfoVersions::V0_0_1(block_info) => {
356 let result = client.update_block_metadata(block_info);
357
358 if let Err(err) = result {
359 return Err(AccountsDbPluginError::SlotStatusUpdateError{
360 msg: format!("Failed to persist the update of block metadata to the PostgreSQL database. Error: {:?}", err)
361 });
362 }
363 }
364 },
365 }
366
367 Ok(())
368 }
369
370 fn account_data_notifications_enabled(&self) -> bool {
374 self.accounts_selector
375 .as_ref()
376 .map_or_else(|| false, |selector| selector.is_enabled())
377 }
378
379 fn transaction_notifications_enabled(&self) -> bool {
381 self.transaction_selector
382 .as_ref()
383 .map_or_else(|| false, |selector| selector.is_enabled())
384 }
385}
386
387impl AccountsDbPluginPostgres {
388 fn create_accounts_selector_from_config(config: &serde_json::Value) -> AccountsSelector {
389 let accounts_selector = &config["accounts_selector"];
390
391 if accounts_selector.is_null() {
392 AccountsSelector::default()
393 } else {
394 let accounts = &accounts_selector["accounts"];
395 let accounts: Vec<String> = if accounts.is_array() {
396 accounts
397 .as_array()
398 .unwrap()
399 .iter()
400 .map(|val| val.as_str().unwrap().to_string())
401 .collect()
402 } else {
403 Vec::default()
404 };
405 let owners = &accounts_selector["owners"];
406 let owners: Vec<String> = if owners.is_array() {
407 owners
408 .as_array()
409 .unwrap()
410 .iter()
411 .map(|val| val.as_str().unwrap().to_string())
412 .collect()
413 } else {
414 Vec::default()
415 };
416 AccountsSelector::new(&accounts, &owners)
417 }
418 }
419
420 fn create_transaction_selector_from_config(config: &serde_json::Value) -> TransactionSelector {
421 let transaction_selector = &config["transaction_selector"];
422
423 if transaction_selector.is_null() {
424 TransactionSelector::default()
425 } else {
426 let accounts = &transaction_selector["mentions"];
427 let accounts: Vec<String> = if accounts.is_array() {
428 accounts
429 .as_array()
430 .unwrap()
431 .iter()
432 .map(|val| val.as_str().unwrap().to_string())
433 .collect()
434 } else {
435 Vec::default()
436 };
437 TransactionSelector::new(&accounts)
438 }
439 }
440
441 pub fn new() -> Self {
442 Self::default()
443 }
444}
445
446#[no_mangle]
447#[allow(improper_ctypes_definitions)]
448pub unsafe extern "C" fn _create_plugin() -> *mut dyn AccountsDbPlugin {
452 let plugin = AccountsDbPluginPostgres::new();
453 let plugin: Box<dyn AccountsDbPlugin> = Box::new(plugin);
454 Box::into_raw(plugin)
455}
456
#[cfg(test)]
pub(crate) mod tests {
    use {super::*, serde_json};

    /// Smoke test: a config that lists only `owners` must parse and yield a
    /// selector without panicking.
    #[test]
    fn test_accounts_selector_from_config() {
        let raw_config = "{\"accounts_selector\" : { \
           \"owners\" : [\"9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin\"] \
        }}";

        let parsed: serde_json::Value = serde_json::from_str(raw_config).unwrap();
        AccountsDbPluginPostgres::create_accounts_selector_from_config(&parsed);
    }
}