photon_indexer/common/
mod.rs

1use core::fmt;
2use std::{env, net::UdpSocket, path::PathBuf, sync::Arc, thread::sleep, time::Duration};
3
4use cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient};
5use cadence_macros::set_global_default;
6use clap::{Parser, ValueEnum};
7use sea_orm::{DatabaseConnection, SqlxPostgresConnector};
8use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig};
9use solana_sdk::commitment_config::CommitmentConfig;
10use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};
11use sqlx::{
12    postgres::{PgConnectOptions, PgPoolOptions},
13    PgPool,
14};
15pub mod typedefs;
16
17pub fn relative_project_path(path: &str) -> PathBuf {
18    PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(path)
19}
20
/// Runs the given statements only when a global cadence metrics client is set.
///
/// Each statement passed to the macro must be terminated by `;`. When
/// `cadence_macros::is_global_default_set()` returns `false`, none of the
/// statements are executed.
///
/// The expansion is a single `if` expression. It deliberately does not add a
/// `use` item to the caller's scope: items emitted by `macro_rules!` are not
/// hygienic, so an injected `use` would clash when the macro is invoked more
/// than once in the same block, and would leak the imported name to the caller.
#[macro_export]
macro_rules! metric {
    {$($block:stmt;)*} => {
        // Fully-qualified call keeps the caller's namespace untouched.
        if cadence_macros::is_global_default_set() {
            $(
                $block
            )*
        }
    };
}
32
33pub fn setup_metrics(metrics_endpoint: Option<String>) {
34    if let Some(metrics_endpoint) = metrics_endpoint {
35        let env = env::var("ENV").unwrap_or("dev".to_string());
36        let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
37        socket.set_nonblocking(true).unwrap();
38        let (host, port) = {
39            let mut iter = metrics_endpoint.split(":");
40            (iter.next().unwrap(), iter.next().unwrap())
41        };
42        let port = port.parse::<u16>().unwrap();
43        let udp_sink = BufferedUdpMetricSink::from((host, port), socket).unwrap();
44        let queuing_sink = QueuingMetricSink::from(udp_sink);
45        let builder = StatsdClient::builder("photon", queuing_sink);
46        let client = builder.with_tag("env", env).build();
47        set_global_default(client);
48    }
49}
50
51pub async fn get_genesis_hash_with_infinite_retry(rpc_client: &RpcClient) -> String {
52    loop {
53        match rpc_client.get_genesis_hash().await {
54            Ok(genesis_hash) => return genesis_hash.to_string(),
55            Err(e) => {
56                log::error!("Failed to fetch genesis hash: {}", e);
57                sleep(Duration::from_secs(5));
58            }
59        }
60    }
61}
62
63pub async fn fetch_block_parent_slot(rpc_client: &RpcClient, slot: u64) -> u64 {
64    rpc_client
65        .get_block_with_config(
66            slot,
67            RpcBlockConfig {
68                encoding: Some(UiTransactionEncoding::Base64),
69                transaction_details: Some(TransactionDetails::None),
70                rewards: None,
71                commitment: Some(CommitmentConfig::confirmed()),
72                max_supported_transaction_version: Some(0),
73            },
74        )
75        .await
76        .unwrap()
77        .parent_slot
78}
79
80pub async fn get_network_start_slot(rpc_client: &RpcClient) -> u64 {
81    let genesis_hash = get_genesis_hash_with_infinite_retry(rpc_client).await;
82    match genesis_hash.as_str() {
83        // Devnet
84        "EtWTRABZaYq6iMfeYKouRu166VU2xqa1wcaWoxPkrZBG" => 319998226 - 1,
85        // Mainnet
86        "5eykt4UsFv8P8NJdTREpY1vzqKqZKvdpKuc147dw2N9d" => 286193746 - 1,
87        _ => 0,
88    }
89}
90
91#[derive(Parser, Debug, Clone, ValueEnum)]
92pub enum LoggingFormat {
93    Standard,
94    Json,
95}
96
97impl fmt::Display for LoggingFormat {
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        match self {
100            LoggingFormat::Standard => write!(f, "standard"),
101            LoggingFormat::Json => write!(f, "json"),
102        }
103    }
104}
105
106pub fn setup_logging(logging_format: LoggingFormat) {
107    let env_filter = env::var("RUST_LOG")
108        .unwrap_or("info,sqlx=error,sea_orm_migration=error,jsonrpsee_server=warn".to_string());
109    let subscriber = tracing_subscriber::fmt().with_env_filter(env_filter);
110    match logging_format {
111        LoggingFormat::Standard => subscriber.init(),
112        LoggingFormat::Json => subscriber.json().init(),
113    }
114}
115
116pub async fn setup_pg_pool(database_url: &str, max_connections: u32) -> PgPool {
117    let options: PgConnectOptions = database_url.parse().unwrap();
118    PgPoolOptions::new()
119        .max_connections(max_connections)
120        .connect_with(options)
121        .await
122        .unwrap()
123}
124
125pub async fn setup_pg_connection(database_url: &str, max_connections: u32) -> DatabaseConnection {
126    SqlxPostgresConnector::from_sqlx_postgres_pool(
127        setup_pg_pool(database_url, max_connections).await,
128    )
129}
130
131pub async fn fetch_current_slot_with_infinite_retry(client: &RpcClient) -> u64 {
132    loop {
133        match client.get_slot().await {
134            Ok(slot) => {
135                return slot;
136            }
137            Err(e) => {
138                log::error!("Failed to fetch current slot: {}", e);
139                sleep(Duration::from_secs(5));
140            }
141        }
142    }
143}
144
145pub fn get_rpc_client(rpc_url: &str) -> Arc<RpcClient> {
146    Arc::new(RpcClient::new_with_timeout_and_commitment(
147        rpc_url.to_string(),
148        Duration::from_secs(90),
149        CommitmentConfig::confirmed(),
150    ))
151}