photon_indexer/common/
mod.rs1use core::fmt;
2use std::{env, net::UdpSocket, path::PathBuf, sync::Arc, thread::sleep, time::Duration};
3
4use cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient};
5use cadence_macros::set_global_default;
6use clap::{Parser, ValueEnum};
7use sea_orm::{DatabaseConnection, SqlxPostgresConnector};
8use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig};
9use solana_sdk::commitment_config::CommitmentConfig;
10use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};
11use sqlx::{
12 postgres::{PgConnectOptions, PgPoolOptions},
13 PgPool,
14};
15pub mod typedefs;
16
17pub fn relative_project_path(path: &str) -> PathBuf {
18 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(path)
19}
20
/// Runs the enclosed statements only when a global metrics client has been
/// installed (as reported by `cadence_macros::is_global_default_set`).
///
/// Each statement inside the braces must be terminated by `;`. When no global
/// client is set, none of the statements are executed.
#[macro_export]
macro_rules! metric {
    {$($block:stmt;)*} => {
        // Call through the absolute path instead of emitting a `use` item:
        // items produced by `macro_rules!` are not hygienic, so an expanded
        // `use` can collide with itself (E0252) when the macro is invoked more
        // than once in the same block.
        if ::cadence_macros::is_global_default_set() {
            $(
                $block
            )*
        }
    };
}
32
33pub fn setup_metrics(metrics_endpoint: Option<String>) {
34 if let Some(metrics_endpoint) = metrics_endpoint {
35 let env = env::var("ENV").unwrap_or("dev".to_string());
36 let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
37 socket.set_nonblocking(true).unwrap();
38 let (host, port) = {
39 let mut iter = metrics_endpoint.split(":");
40 (iter.next().unwrap(), iter.next().unwrap())
41 };
42 let port = port.parse::<u16>().unwrap();
43 let udp_sink = BufferedUdpMetricSink::from((host, port), socket).unwrap();
44 let queuing_sink = QueuingMetricSink::from(udp_sink);
45 let builder = StatsdClient::builder("photon", queuing_sink);
46 let client = builder.with_tag("env", env).build();
47 set_global_default(client);
48 }
49}
50
51pub async fn get_genesis_hash_with_infinite_retry(rpc_client: &RpcClient) -> String {
52 loop {
53 match rpc_client.get_genesis_hash().await {
54 Ok(genesis_hash) => return genesis_hash.to_string(),
55 Err(e) => {
56 log::error!("Failed to fetch genesis hash: {}", e);
57 sleep(Duration::from_secs(5));
58 }
59 }
60 }
61}
62
63pub async fn fetch_block_parent_slot(rpc_client: &RpcClient, slot: u64) -> u64 {
64 rpc_client
65 .get_block_with_config(
66 slot,
67 RpcBlockConfig {
68 encoding: Some(UiTransactionEncoding::Base64),
69 transaction_details: Some(TransactionDetails::None),
70 rewards: None,
71 commitment: Some(CommitmentConfig::confirmed()),
72 max_supported_transaction_version: Some(0),
73 },
74 )
75 .await
76 .unwrap()
77 .parent_slot
78}
79
80pub async fn get_network_start_slot(rpc_client: &RpcClient) -> u64 {
81 let genesis_hash = get_genesis_hash_with_infinite_retry(rpc_client).await;
82 match genesis_hash.as_str() {
83 "EtWTRABZaYq6iMfeYKouRu166VU2xqa1wcaWoxPkrZBG" => 319998226 - 1,
85 "5eykt4UsFv8P8NJdTREpY1vzqKqZKvdpKuc147dw2N9d" => 286193746 - 1,
87 _ => 0,
88 }
89}
90
// Log output format, consumed by `setup_logging` to pick the subscriber's
// formatter.
//
// NOTE(review): plain `//` comments are used deliberately. clap's derive
// macros pick up `///` doc comments as help text, so adding doc comments here
// would change the generated CLI output.
#[derive(Parser, Debug, Clone, ValueEnum)]
pub enum LoggingFormat {
    // Displayed as "standard"; `setup_logging` initialises the subscriber
    // without calling `.json()`.
    Standard,
    // Displayed as "json"; `setup_logging` initialises the subscriber with
    // `.json()`.
    Json,
}
96
97impl fmt::Display for LoggingFormat {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 match self {
100 LoggingFormat::Standard => write!(f, "standard"),
101 LoggingFormat::Json => write!(f, "json"),
102 }
103 }
104}
105
106pub fn setup_logging(logging_format: LoggingFormat) {
107 let env_filter = env::var("RUST_LOG")
108 .unwrap_or("info,sqlx=error,sea_orm_migration=error,jsonrpsee_server=warn".to_string());
109 let subscriber = tracing_subscriber::fmt().with_env_filter(env_filter);
110 match logging_format {
111 LoggingFormat::Standard => subscriber.init(),
112 LoggingFormat::Json => subscriber.json().init(),
113 }
114}
115
116pub async fn setup_pg_pool(database_url: &str, max_connections: u32) -> PgPool {
117 let options: PgConnectOptions = database_url.parse().unwrap();
118 PgPoolOptions::new()
119 .max_connections(max_connections)
120 .connect_with(options)
121 .await
122 .unwrap()
123}
124
125pub async fn setup_pg_connection(database_url: &str, max_connections: u32) -> DatabaseConnection {
126 SqlxPostgresConnector::from_sqlx_postgres_pool(
127 setup_pg_pool(database_url, max_connections).await,
128 )
129}
130
131pub async fn fetch_current_slot_with_infinite_retry(client: &RpcClient) -> u64 {
132 loop {
133 match client.get_slot().await {
134 Ok(slot) => {
135 return slot;
136 }
137 Err(e) => {
138 log::error!("Failed to fetch current slot: {}", e);
139 sleep(Duration::from_secs(5));
140 }
141 }
142 }
143}
144
145pub fn get_rpc_client(rpc_url: &str) -> Arc<RpcClient> {
146 Arc::new(RpcClient::new_with_timeout_and_commitment(
147 rpc_url.to_string(),
148 Duration::from_secs(90),
149 CommitmentConfig::confirmed(),
150 ))
151}