use super::super::lut_owner::LutOwner;
use super::super::gateway::Gateway;
use super::super::crossbar::CrossbarClient;
use crate::on_demand::accounts::QueueAccountData;
use crate::RpcClient;
use crate::Pubkey;
use anyhow::{anyhow, Context};
use anyhow::Error as AnyhowError;
use futures::future::join_all;
/// Implements [`LutOwner`] for the queue account by exposing its `lut_slot` field.
impl LutOwner for QueueAccountData {
    /// Returns the `lut_slot` value stored in this account's data.
    fn lut_slot(&self) -> u64 {
        self.lut_slot
    }
}
impl QueueAccountData {
    /// Fetches the account at `key` through `client` and decodes it into a
    /// `QueueAccountData`.
    ///
    /// The first 8 bytes of the account data are skipped before decoding
    /// (presumably the account discriminator — confirm against the on-chain
    /// layout).
    ///
    /// # Errors
    ///
    /// Returns an error when the RPC request fails, when the account data is
    /// shorter than the 8-byte prefix, or when the remaining bytes do not have
    /// the exact size of `QueueAccountData`.
    pub async fn load(client: &RpcClient, key: &Pubkey) -> Result<QueueAccountData, AnyhowError> {
        // Round-trip the key through its raw bytes, as the original call did.
        let key_anchor: Pubkey = key.to_bytes().into();
        let account = client
            .get_account_data(&key_anchor)
            .await
            .map_err(|_| anyhow!("QueueAccountData.load: Account not found"))?;

        // `get` instead of indexing: a short (e.g. empty) account must produce
        // an error rather than a panic.
        let payload = account.get(8..).ok_or_else(|| {
            anyhow!(
                "QueueAccountData.load: Account data too short ({} bytes)",
                account.len()
            )
        })?;

        // An unaligned read copies the value out of the byte buffer, so the
        // result does not depend on how the buffer happens to be aligned.
        bytemuck::try_pod_read_unaligned::<QueueAccountData>(payload)
            .map_err(|e| anyhow!("Failed to parse QueueAccountData: {:?}", e))
    }

    /// Collects the gateways advertised by this queue's oracles and returns
    /// only those whose `test_gateway` probe reports success.
    ///
    /// Oracles without a gateway URI are skipped. All probes are awaited
    /// together via `join_all`, and the returned gateways keep the order of
    /// the oracle list.
    ///
    /// # Errors
    ///
    /// Propagates any error from `fetch_oracles`.
    pub async fn fetch_gateways(&self, client: &RpcClient) -> Result<Vec<Gateway>, AnyhowError> {
        let oracles = self.fetch_oracles(client).await?;
        let gateways = oracles
            .into_iter()
            .map(|entry| entry.1)
            .filter_map(|oracle| oracle.gateway_uri())
            .map(Gateway::new)
            .collect::<Vec<_>>();

        let mut probes = Vec::with_capacity(gateways.len());
        for gateway in gateways.iter() {
            probes.push(gateway.test_gateway());
        }
        let results = join_all(probes).await;

        // `join_all` preserves input order, so results line up with gateways;
        // zipping lets the survivors be moved out instead of cloned by index.
        let reachable: Vec<Gateway> = gateways
            .into_iter()
            .zip(results)
            .filter(|(_, is_good)| *is_good)
            .map(|(gateway, _)| gateway)
            .collect();
        Ok(reachable)
    }

    /// Asks `crossbar` for the gateway list and wraps the first entry in a
    /// [`Gateway`].
    ///
    /// The network is hard-coded to `"mainnet"`; no field of `self` is
    /// consulted.
    ///
    /// # Errors
    ///
    /// Returns an error when the crossbar request fails or when the returned
    /// list is empty.
    pub async fn fetch_gateway_from_crossbar(&self, crossbar: &CrossbarClient) -> Result<Gateway, AnyhowError> {
        let network = "mainnet";
        let gateways = crossbar
            .fetch_gateways(network)
            .await
            .context("Failed to fetch gateways from crossbar")?;
        let gateway_url = gateways
            .first()
            .ok_or_else(|| anyhow!("No gateways available for network: {}", network))?;
        Ok(Gateway::new(gateway_url.clone()))
    }
}
/// A queue account address paired with the RPC client used to read it.
pub struct Queue {
    /// Address of the queue account.
    pub pubkey: Pubkey,
    /// RPC client used to load the account data.
    pub client: RpcClient,
}
impl Queue {
    /// Base58 address used by [`Queue::default_devnet`].
    pub const DEFAULT_DEVNET_KEY: &'static str = "EYiAmGSdsQTuCw413V5BzaruWuCCSDgTPtBGvLkXHbe7";
    /// Base58 address used by [`Queue::default_mainnet`].
    pub const DEFAULT_MAINNET_KEY: &'static str = "A43DyUGA7s8eXPxqEjJY6EBu1KKbNgfxF8h17VAHn13w";

    /// Creates a queue handle from an RPC client and the queue account address.
    pub fn new(client: RpcClient, pubkey: Pubkey) -> Self {
        Self { pubkey, client }
    }

    /// Decodes a base58 queue address into a `Pubkey`.
    ///
    /// `network` is only used to label the error messages.
    fn decode_queue_key(encoded: &str, network: &str) -> Result<Pubkey, AnyhowError> {
        let bytes: [u8; 32] = bs58::decode(encoded)
            .into_vec()
            .map_err(|e| anyhow!("Failed to decode {} queue key: {}", network, e))?
            .try_into()
            .map_err(|_| anyhow!("Invalid {} queue key length", network))?;
        Ok(Pubkey::new_from_array(bytes))
    }

    /// Creates a handle for the queue at [`Queue::DEFAULT_MAINNET_KEY`].
    ///
    /// # Errors
    ///
    /// Returns an error if the constant is not valid base58 or does not decode
    /// to exactly 32 bytes.
    pub fn default_mainnet(client: RpcClient) -> Result<Self, AnyhowError> {
        let pubkey = Self::decode_queue_key(Self::DEFAULT_MAINNET_KEY, "mainnet")?;
        Ok(Self::new(client, pubkey))
    }

    /// Creates a handle for the queue at [`Queue::DEFAULT_DEVNET_KEY`].
    ///
    /// # Errors
    ///
    /// Returns an error if the constant is not valid base58 or does not decode
    /// to exactly 32 bytes.
    pub fn default_devnet(client: RpcClient) -> Result<Self, AnyhowError> {
        let pubkey = Self::decode_queue_key(Self::DEFAULT_DEVNET_KEY, "devnet")?;
        Ok(Self::new(client, pubkey))
    }

    /// Loads and decodes this queue's account data via [`QueueAccountData::load`].
    pub async fn load_data(&self) -> Result<QueueAccountData, AnyhowError> {
        QueueAccountData::load(&self.client, &self.pubkey).await
    }

    /// Picks a network, asks `crossbar` for that network's gateway list and
    /// wraps the first entry in a [`Gateway`].
    ///
    /// The network is chosen by probing the default mainnet queue through this
    /// handle's RPC client: if it loads, `"mainnet"` is used, otherwise
    /// `"devnet"`. Note that *any* load failure (including a transient RPC
    /// error) selects `"devnet"`.
    ///
    /// # Errors
    ///
    /// Returns an error when the mainnet key constant cannot be decoded, when
    /// the crossbar request fails, or when the returned list is empty.
    pub async fn fetch_gateway_from_crossbar(&self, crossbar: &CrossbarClient) -> Result<Gateway, AnyhowError> {
        // Probe with the existing client rather than building a second one
        // pointed at the same URL.
        let mainnet_key = Self::decode_queue_key(Self::DEFAULT_MAINNET_KEY, "mainnet")?;
        let mainnet_missing = QueueAccountData::load(&self.client, &mainnet_key)
            .await
            .is_err();
        let network = if mainnet_missing { "devnet" } else { "mainnet" };

        let gateways = crossbar
            .fetch_gateways(network)
            .await
            .context("Failed to fetch gateways from crossbar")?;
        let gateway_url = gateways
            .first()
            .ok_or_else(|| anyhow!("No gateways available for network: {}", network))?;
        Ok(Gateway::new(gateway_url.clone()))
    }
}