use crate::swqos::common::{poll_transaction_confirmation, serialize_transaction_and_encode};
use rand::seq::IndexedRandom;
use reqwest::Client;
use serde_json::json;
use std::{sync::Arc, time::Instant};
use std::time::Duration;
use solana_transaction_status::UiTransactionEncoding;
use anyhow::Result;
use solana_sdk::transaction::VersionedTransaction;
use crate::swqos::{SwqosType, TradeType};
use crate::swqos::SwqosClientTrait;
use crate::{common::SolanaRpcClient, constants::swqos::ASTRALANE_TIP_ACCOUNTS};
use tokio::task::JoinHandle;
use std::sync::atomic::{AtomicBool, Ordering};
#[derive(Clone)]
pub struct AstralaneClient {
pub endpoint: String,
pub auth_token: String,
pub rpc_client: Arc<SolanaRpcClient>,
pub http_client: Client,
pub ping_handle: Arc<tokio::sync::Mutex<Option<JoinHandle<()>>>>,
pub stop_ping: Arc<AtomicBool>,
}
#[async_trait::async_trait]
impl SwqosClientTrait for AstralaneClient {
async fn send_transaction(&self, trade_type: TradeType, transaction: &VersionedTransaction, wait_confirmation: bool) -> Result<()> {
self.send_transaction(trade_type, transaction, wait_confirmation).await
}
async fn send_transactions(&self, trade_type: TradeType, transactions: &Vec<VersionedTransaction>, wait_confirmation: bool) -> Result<()> {
self.send_transactions(trade_type, transactions, wait_confirmation).await
}
fn get_tip_account(&self) -> Result<String> {
let tip_account = *ASTRALANE_TIP_ACCOUNTS.choose(&mut rand::rng()).or_else(|| ASTRALANE_TIP_ACCOUNTS.first()).unwrap();
Ok(tip_account.to_string())
}
fn get_swqos_type(&self) -> SwqosType {
SwqosType::Astralane
}
}
impl AstralaneClient {
pub fn new(rpc_url: String, endpoint: String, auth_token: String) -> Self {
let rpc_client = SolanaRpcClient::new(rpc_url);
let http_client = Client::builder()
.pool_idle_timeout(Duration::from_secs(300))
.pool_max_idle_per_host(4)
.tcp_keepalive(Some(Duration::from_secs(60))) .tcp_nodelay(true) .http2_keep_alive_interval(Duration::from_secs(10))
.http2_keep_alive_timeout(Duration::from_secs(5))
.http2_adaptive_window(true) .timeout(Duration::from_millis(3000)) .connect_timeout(Duration::from_millis(2000)) .build()
.unwrap();
let client = Self {
rpc_client: Arc::new(rpc_client),
endpoint,
auth_token,
http_client,
ping_handle: Arc::new(tokio::sync::Mutex::new(None)),
stop_ping: Arc::new(AtomicBool::new(false)),
};
let client_clone = client.clone();
tokio::spawn(async move {
client_clone.start_ping_task().await;
});
client
}
async fn start_ping_task(&self) {
let endpoint = self.endpoint.clone();
let auth_token = self.auth_token.clone();
let http_client = self.http_client.clone();
let stop_ping = self.stop_ping.clone();
let handle = tokio::spawn(async move {
if let Err(e) = Self::send_ping_request(&http_client, &endpoint, &auth_token).await {
eprintln!("Astralane ping request failed: {}", e);
}
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
if stop_ping.load(Ordering::Relaxed) {
break;
}
if let Err(e) = Self::send_ping_request(&http_client, &endpoint, &auth_token).await {
eprintln!("Astralane ping request failed: {}", e);
}
}
});
{
let mut ping_guard = self.ping_handle.lock().await;
if let Some(old_handle) = ping_guard.as_ref() {
old_handle.abort();
}
*ping_guard = Some(handle);
}
}
async fn send_ping_request(http_client: &Client, endpoint: &str, auth_token: &str) -> Result<()> {
let ping_url = if endpoint.ends_with("/iris") {
endpoint.replace("/iris", "/gethealth")
} else if endpoint.ends_with("/iris/") {
endpoint.replace("/iris/", "/gethealth")
} else if endpoint.ends_with('/') {
format!("{}gethealth", endpoint)
} else {
format!("{}/gethealth", endpoint)
};
let response = http_client.get(&ping_url)
.header("api_key", auth_token)
.timeout(Duration::from_millis(1500))
.send()
.await?;
let status = response.status();
let _ = response.bytes().await;
if !status.is_success() {
eprintln!("Astralane ping request returned non-success status: {}", status);
}
Ok(())
}
pub async fn send_transaction(&self, trade_type: TradeType, transaction: &VersionedTransaction, wait_confirmation: bool) -> Result<()> {
let start_time = Instant::now();
let (content, signature) = serialize_transaction_and_encode(transaction, UiTransactionEncoding::Base64)?;
let request_body = serde_json::to_string(&json!({
"jsonrpc": "2.0",
"id": 1,
"method": "sendTransaction",
"params": [
content,
{ "encoding": "base64", "skipPreflight": true },
{ "mevProtect": false }
]
}))?;
let response_text = self.http_client.post(&self.endpoint)
.body(request_body)
.header("Content-Type", "application/json")
.header("api_key", &self.auth_token)
.send()
.await?
.text()
.await?;
if let Ok(response_json) = serde_json::from_str::<serde_json::Value>(&response_text) {
if response_json.get("result").is_some() {
println!(" [astralane] {} submitted: {:?}", trade_type, start_time.elapsed());
} else if let Some(_error) = response_json.get("error") {
eprintln!(" [astralane] {} submission failed: {:?}", trade_type, _error);
}
} else {
eprintln!(" [astralane] {} submission failed: {:?}", trade_type, response_text);
}
let start_time: Instant = Instant::now();
match poll_transaction_confirmation(&self.rpc_client, signature, wait_confirmation).await {
Ok(_) => (),
Err(e) => {
println!(" signature: {:?}", signature);
println!(" [astralane] {} confirmation failed: {:?}", trade_type, start_time.elapsed());
return Err(e);
},
}
if wait_confirmation {
println!(" signature: {:?}", signature);
println!(" [astralane] {} confirmed: {:?}", trade_type, start_time.elapsed());
}
Ok(())
}
pub async fn send_transactions(&self, trade_type: TradeType, transactions: &Vec<VersionedTransaction>, wait_confirmation: bool) -> Result<()> {
for transaction in transactions {
self.send_transaction(trade_type, transaction, wait_confirmation).await?;
}
Ok(())
}
}
impl Drop for AstralaneClient {
fn drop(&mut self) {
self.stop_ping.store(true, Ordering::Relaxed);
let ping_handle = self.ping_handle.clone();
tokio::spawn(async move {
let mut ping_guard = ping_handle.lock().await;
if let Some(handle) = ping_guard.as_ref() {
handle.abort();
}
*ping_guard = None;
});
}
}