use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use phoenix_eternal_types::discriminant::accounts::TRADER as TRADER_DISCRIMINANT;
use phoenix_eternal_types::{program_ids, GlobalTraderIndexTree};
use solana_account_decoder_client_types::{UiAccountEncoding, UiDataSliceConfig};
use solana_commitment_config::CommitmentConfig;
use solana_pubkey::Pubkey as PhoenixPubkey;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_types::config::RpcAccountInfoConfig;
use tokio::sync::{Notify, RwLock};
use tracing::warn;
const REFRESH_MIN_INTERVAL: Duration = Duration::from_secs(10);
const GTI_HEADER_SIZE: usize = 48;
const NUM_ARENAS_OFFSET: usize = GTI_HEADER_SIZE + 4;
const TRADER_AUTHORITY_OFFSET: usize = 56;
const TRADER_AUTHORITY_END: usize = TRADER_AUTHORITY_OFFSET + 32;
const RPC_BATCH_SIZE: usize = 100;
pub struct GtiCache {
authorities: HashMap<u32, PhoenixPubkey>,
pda_to_authority: HashMap<PhoenixPubkey, PhoenixPubkey>,
loaded_at: Instant,
}
impl GtiCache {
pub fn resolve(&self, addr: u32) -> Option<PhoenixPubkey> {
if addr == 0 {
return None;
}
self.authorities.get(&addr).copied()
}
pub fn resolve_pda(&self, pda: &PhoenixPubkey) -> Option<PhoenixPubkey> {
self.pda_to_authority.get(pda).copied()
}
fn is_fresh_enough(&self) -> bool {
self.loaded_at.elapsed() < REFRESH_MIN_INTERVAL
}
}
async fn fetch_gti_buffers(client: &RpcClient) -> Result<Vec<Vec<u8>>, String> {
let (header_key, _) = program_ids::get_global_trader_index_address_default(0);
let header_account = client
.get_account(&header_key)
.await
.map_err(|e| format!("fetch GTI header: {e}"))?;
if header_account.data.len() < NUM_ARENAS_OFFSET + 2 {
return Err("GTI header account too small for superblock".to_string());
}
let num_arenas = u16::from_le_bytes([
header_account.data[NUM_ARENAS_OFFSET],
header_account.data[NUM_ARENAS_OFFSET + 1],
]);
let mut buffers: Vec<Vec<u8>> = Vec::with_capacity(num_arenas.max(1) as usize);
buffers.push(header_account.data);
for i in 1..num_arenas {
let (arena_key, _) = program_ids::get_global_trader_index_address_default(i);
match client.get_account(&arena_key).await {
Ok(acc) => buffers.push(acc.data),
Err(e) => {
warn!(arena = i, error = %e, "GTI arena fetch failed; truncating cache");
break;
}
}
}
Ok(buffers)
}
fn collect_tree_pairs(buffers: &[Vec<u8>]) -> Vec<(u32, PhoenixPubkey)> {
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let tree = GlobalTraderIndexTree::load_from_buffers(buffers.iter().map(|b| b.as_slice()));
let mut pairs: Vec<(u32, PhoenixPubkey)> = Vec::with_capacity(tree.tree.len());
for (pda, _state) in tree.tree.iter() {
let addr = tree.tree.get_addr(pda);
if addr != 0 {
pairs.push((addr, *pda));
}
}
pairs
}))
.unwrap_or_default()
}
async fn fetch_authorities(
client: &RpcClient,
pairs: &[(u32, PhoenixPubkey)],
) -> (
HashMap<u32, PhoenixPubkey>,
HashMap<PhoenixPubkey, PhoenixPubkey>,
) {
let mut seen_addrs: HashSet<u32> = HashSet::with_capacity(pairs.len());
let mut seen_pdas: HashSet<[u8; 32]> = HashSet::with_capacity(pairs.len());
let deduped: Vec<(u32, PhoenixPubkey)> = pairs
.iter()
.filter(|(addr, pda)| seen_addrs.insert(*addr) && seen_pdas.insert(pda.to_bytes()))
.copied()
.collect();
if deduped.len() != pairs.len() {
warn!(
dropped = pairs.len() - deduped.len(),
"GTI tree iter yielded duplicate entries; deduped before RPC"
);
}
let mut out: HashMap<u32, PhoenixPubkey> = HashMap::with_capacity(deduped.len());
let mut pda_out: HashMap<PhoenixPubkey, PhoenixPubkey> = HashMap::with_capacity(deduped.len());
for chunk in deduped.chunks(RPC_BATCH_SIZE) {
let pairs_aligned: Vec<(u32, PhoenixPubkey)> = chunk.to_vec();
let pks: Vec<PhoenixPubkey> = pairs_aligned.iter().map(|(_, p)| *p).collect();
let cfg = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::processed()),
data_slice: Some(UiDataSliceConfig {
offset: 0,
length: TRADER_AUTHORITY_END,
}),
min_context_slot: None,
};
let resp = match client.get_multiple_accounts_with_config(&pks, cfg).await {
Ok(r) => r,
Err(e) => {
warn!(error = %e, "getMultipleAccounts failed for trader batch; skipping");
continue;
}
};
let accounts = resp.value;
for ((addr, pda), acc) in pairs_aligned.iter().zip(accounts) {
let Some(acc) = acc else { continue };
let data = acc.data.as_slice();
if data.len() < TRADER_AUTHORITY_END {
continue;
}
let disc = u64::from_le_bytes(data[..8].try_into().unwrap_or([0u8; 8]));
if disc != *TRADER_DISCRIMINANT {
warn!(
addr = *addr,
disc = disc,
"trader account discriminant mismatch; skipping"
);
continue;
}
let mut bytes = [0u8; 32];
bytes.copy_from_slice(&data[TRADER_AUTHORITY_OFFSET..TRADER_AUTHORITY_END]);
let authority = PhoenixPubkey::from(bytes);
out.insert(*addr, authority);
pda_out.insert(*pda, authority);
}
}
(out, pda_out)
}
async fn fetch_cache(rpc_url: String) -> Result<GtiCache, String> {
let client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::processed());
let buffers = fetch_gti_buffers(&client).await?;
let pairs = collect_tree_pairs(&buffers);
let (authorities, pda_to_authority) = fetch_authorities(&client, &pairs).await;
Ok(GtiCache {
authorities,
pda_to_authority,
loaded_at: Instant::now(),
})
}
pub type GtiHandle = Arc<RwLock<Option<GtiCache>>>;
pub fn spawn_gti_loader<F>(
cache: GtiHandle,
refresh: Arc<Notify>,
rpc_url_fn: F,
) -> tokio::task::JoinHandle<()>
where
F: Fn() -> String + Send + 'static,
{
tokio::spawn(async move {
refresh.notify_one();
loop {
refresh.notified().await;
if let Some(existing) = cache.read().await.as_ref() {
if existing.is_fresh_enough() {
continue;
}
}
let url = rpc_url_fn();
match fetch_cache(url).await {
Ok(new_cache) => {
*cache.write().await = Some(new_cache);
}
Err(e) => {
warn!(error = %e, "GTI refresh failed; will retry on next notify");
}
}
}
})
}
#[cfg(test)]
mod tests {
use super::*;
fn make_cache(entries: &[(u32, [u8; 32])], age: Duration) -> GtiCache {
let mut authorities = HashMap::new();
for (addr, bytes) in entries {
authorities.insert(*addr, PhoenixPubkey::from(*bytes));
}
GtiCache {
authorities,
pda_to_authority: HashMap::new(),
loaded_at: Instant::now()
.checked_sub(age)
.expect("test age fits in Instant arithmetic"),
}
}
#[test]
fn resolve_returns_cached_authority() {
let key = [7u8; 32];
let cache = make_cache(&[(42, key)], Duration::from_secs(0));
assert_eq!(cache.resolve(42), Some(PhoenixPubkey::from(key)));
}
#[test]
fn resolve_returns_none_for_missing_addr() {
let cache = make_cache(&[(1, [0u8; 32])], Duration::from_secs(0));
assert_eq!(cache.resolve(999), None);
}
#[test]
fn resolve_treats_zero_as_sentinel_null() {
let cache = make_cache(&[(0, [0u8; 32])], Duration::from_secs(0));
assert_eq!(cache.resolve(0), None);
}
#[test]
fn is_fresh_enough_is_true_for_recent_load() {
let cache = make_cache(&[], Duration::from_secs(0));
assert!(cache.is_fresh_enough());
}
#[test]
fn is_fresh_enough_is_false_after_refresh_interval() {
let cache = make_cache(&[], REFRESH_MIN_INTERVAL + Duration::from_secs(1));
assert!(!cache.is_fresh_enough());
}
}