use crate::swqos::common::{
default_http_client_builder, poll_transaction_confirmation,
};
use rand::seq::IndexedRandom;
use reqwest::Client;
use std::{sync::Arc, time::Instant, time::Duration};
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::task::JoinHandle;
use bincode;
use crate::swqos::SwqosClientTrait;
use crate::swqos::{SwqosType, TradeType};
use anyhow::Result;
use solana_sdk::transaction::VersionedTransaction;
use crate::{common::SolanaRpcClient, constants::swqos::ZEROSLOT_TIP_ACCOUNTS};
/// Client that submits transactions to a 0slot endpoint and keeps a background
/// ping task running against it.
///
/// The type is `Clone`; the `rpc_client`, `ping_handle` and `stop_ping` fields are
/// behind `Arc`, so every clone refers to the same RPC client, the same ping-task
/// slot and the same stop flag.
#[derive(Clone)]
pub struct ZeroSlotClient {
/// Base URL of the 0slot endpoint; request paths and the `api-key` query
/// parameter are appended to it.
pub endpoint: String,
/// Value sent as the `api-key` query parameter on every request.
pub auth_token: String,
/// RPC client handed to `poll_transaction_confirmation` after a submission.
pub rpc_client: Arc<SolanaRpcClient>,
/// HTTP client used for both ping requests and transaction submission.
pub http_client: Client,
/// Slot holding the `JoinHandle` of the background ping task once it has been
/// spawned (`None` until then).
pub ping_handle: Arc<tokio::sync::Mutex<Option<JoinHandle<()>>>>,
/// Flag read by the ping loop after each interval tick; the loop exits once it
/// observes `true`. Set to `true` by the `Drop` impl.
pub stop_ping: Arc<AtomicBool>,
}
#[async_trait::async_trait]
impl SwqosClientTrait for ZeroSlotClient {
async fn send_transaction(
&self,
trade_type: TradeType,
transaction: &VersionedTransaction,
wait_confirmation: bool,
) -> Result<()> {
self.send_transaction(trade_type, transaction, wait_confirmation).await
}
async fn send_transactions(
&self,
trade_type: TradeType,
transactions: &Vec<VersionedTransaction>,
wait_confirmation: bool,
) -> Result<()> {
self.send_transactions(trade_type, transactions, wait_confirmation).await
}
fn get_tip_account(&self) -> Result<String> {
let tip_account = *ZEROSLOT_TIP_ACCOUNTS
.choose(&mut rand::rng())
.or_else(|| ZEROSLOT_TIP_ACCOUNTS.first())
.unwrap();
Ok(tip_account.to_string())
}
fn get_swqos_type(&self) -> SwqosType {
SwqosType::ZeroSlot
}
}
impl ZeroSlotClient {
/// Creates a client for `endpoint` authenticated with `auth_token`, and spawns a
/// Tokio task that starts the background ping task.
///
/// `rpc_url` is used to build the RPC client that is later passed to
/// `poll_transaction_confirmation`.
///
/// # Panics
/// Panics if the HTTP client cannot be built, or if called outside a Tokio runtime
/// (because of `tokio::spawn`).
pub fn new(rpc_url: String, endpoint: String, auth_token: String) -> Self {
    let this = Self {
        endpoint,
        auth_token,
        rpc_client: Arc::new(SolanaRpcClient::new(rpc_url)),
        http_client: default_http_client_builder().build().unwrap(),
        // No ping task yet and no stop request.
        ping_handle: Arc::new(tokio::sync::Mutex::new(None)),
        stop_ping: Arc::new(AtomicBool::new(false)),
    };

    // The starter task owns its own clone of the client. That clone is dropped when
    // the task ends, which runs `Drop for ZeroSlotClient` on it.
    let pinger = this.clone();
    tokio::spawn(async move { pinger.start_ping_task().await });

    this
}
/// Spawns the background ping task and stores its handle in `ping_handle`.
///
/// The task sends one ping immediately and then one ping per 30-second period. After
/// each period it checks `stop_ping` and exits once the flag is `true`. Ping failures
/// do not stop the task; they are printed to stderr when SDK logging is enabled.
///
/// If a task handle was already stored, that task is aborted and replaced.
async fn start_ping_task(&self) {
    const PING_INTERVAL: Duration = Duration::from_secs(30);

    let endpoint = self.endpoint.clone();
    let auth_token = self.auth_token.clone();
    let http_client = self.http_client.clone();
    let stop_ping = self.stop_ping.clone();

    let handle = tokio::spawn(async move {
        // `tokio::time::interval` completes its first tick immediately, which would
        // fire a second ping right behind the initial one. Starting the interval one
        // period from now keeps the cadence at exactly one ping per period.
        let mut interval =
            tokio::time::interval_at(tokio::time::Instant::now() + PING_INTERVAL, PING_INTERVAL);
        loop {
            if let Err(e) = Self::send_ping_request(&http_client, &endpoint, &auth_token).await {
                if crate::common::sdk_log::sdk_log_enabled() {
                    eprintln!("0slot ping request failed: {}", e);
                }
            }
            interval.tick().await;
            if stop_ping.load(Ordering::Relaxed) {
                break;
            }
        }
    });

    // Install the new handle and abort whichever task it replaces.
    let mut ping_guard = self.ping_handle.lock().await;
    if let Some(old_handle) = ping_guard.replace(handle) {
        old_handle.abort();
    }
}
async fn send_ping_request(
http_client: &Client,
endpoint: &str,
auth_token: &str,
) -> Result<()> {
let url = format!("{}/?api-key={}", endpoint, auth_token);
let response = http_client
.post(&url)
.header("Content-Type", "application/json")
.timeout(Duration::from_millis(1500))
.body(r#"{"jsonrpc":"2.0","id":1,"method":"getHealth"}"#)
.send()
.await?;
let _ = response.bytes().await;
Ok(())
}
pub async fn send_transaction(
&self,
trade_type: TradeType,
transaction: &VersionedTransaction,
wait_confirmation: bool,
) -> Result<()> {
let start_time = Instant::now();
let tx_bytes = bincode::serialize(transaction)?;
let mut url = String::with_capacity(self.endpoint.len() + self.auth_token.len() + 20);
url.push_str(&self.endpoint);
url.push_str("/txb?api-key=");
url.push_str(&self.auth_token);
let response = self
.http_client
.post(&url)
.header("User-Agent", "") .body(tx_bytes)
.send()
.await?;
let status = response.status();
let response_text = response.text().await?;
match status.as_u16() {
200 => {
if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(&response_text) {
if json_value.get("result").is_some() {
crate::common::sdk_log::log_swqos_submitted("0slot", trade_type, start_time.elapsed());
} else if let Some(error) = json_value.get("error") {
let code = error.get("code")
.and_then(|c| c.as_i64())
.map(|c| c.to_string())
.unwrap_or_else(|| "unknown".to_string());
let message = error.get("message")
.and_then(|m| m.as_str())
.unwrap_or("unknown error");
crate::common::sdk_log::log_swqos_submission_failed("0slot", trade_type, start_time.elapsed(), format!("code {}: {}", code, message));
return Err(anyhow::anyhow!("0slot Binary-Tx error: {}", message));
} else {
crate::common::sdk_log::log_swqos_submission_failed("0slot", trade_type, start_time.elapsed(), format!("unexpected JSON: {}", response_text));
return Err(anyhow::anyhow!("0slot Binary-Tx unexpected JSON: {}", response_text));
}
} else {
crate::common::sdk_log::log_swqos_submission_failed("0slot", trade_type, start_time.elapsed(), format!("invalid JSON: {}", response_text));
return Err(anyhow::anyhow!("0slot Binary-Tx invalid JSON: {}", response_text));
}
}
403 => {
crate::common::sdk_log::log_swqos_submission_failed("0slot", trade_type, start_time.elapsed(), response_text.clone());
return Err(anyhow::anyhow!("0slot API key error: {}", response_text));
}
419 => {
crate::common::sdk_log::log_swqos_submission_failed("0slot", trade_type, start_time.elapsed(), response_text.clone());
return Err(anyhow::anyhow!("0slot rate limit exceeded"));
}
500 => {
crate::common::sdk_log::log_swqos_submission_failed("0slot", trade_type, start_time.elapsed(), "submission failed".to_string());
return Err(anyhow::anyhow!("0slot transaction submission failed"));
}
_ => {
crate::common::sdk_log::log_swqos_submission_failed("0slot", trade_type, start_time.elapsed(), format!("status {} body: {}", status, response_text));
return Err(anyhow::anyhow!("0slot Binary-Tx failed with status {}: {}", status, response_text));
}
}
let signature = transaction.signatures[0];
let start_time = Instant::now();
match poll_transaction_confirmation(&self.rpc_client, signature, wait_confirmation).await {
Ok(_) => (),
Err(e) => {
println!(" signature: {:?}", signature);
println!(" [{:width$}] {} confirmation failed: {:?}", "0slot", trade_type, start_time.elapsed(), width = crate::common::sdk_log::SWQOS_LABEL_WIDTH);
return Err(e);
}
}
if wait_confirmation {
println!(" signature: {:?}", signature);
println!(" [{:width$}] {} confirmed: {:?}", "0slot", trade_type, start_time.elapsed(), width = crate::common::sdk_log::SWQOS_LABEL_WIDTH);
}
Ok(())
}
/// Submits `transactions` one after another via [`Self::send_transaction`].
///
/// Each call is awaited before the next transaction is sent, and `wait_confirmation`
/// is forwarded to every call. Accepts any slice of transactions; a `&Vec<_>` coerces
/// to it, so existing call sites keep working.
///
/// # Errors
/// Stops at the first transaction that fails and returns its error; the remaining
/// transactions are not sent.
pub async fn send_transactions(
    &self,
    trade_type: TradeType,
    transactions: &[VersionedTransaction],
    wait_confirmation: bool,
) -> Result<()> {
    for transaction in transactions {
        self.send_transaction(trade_type, transaction, wait_confirmation).await?;
    }
    Ok(())
}
}
impl Drop for ZeroSlotClient {
fn drop(&mut self) {
self.stop_ping.store(true, Ordering::Relaxed);
let ping_handle = self.ping_handle.clone();
tokio::spawn(async move {
let mut ping_guard = ping_handle.lock().await;
if let Some(handle) = ping_guard.as_ref() {
handle.abort();
}
*ping_guard = None;
});
}
}