use crate::config::Config;
use crate::connection::selector::WorkerSelector;
use crate::connection::{FallbackChain, Transport};
use crate::discovery::DiscoveryClient;
use crate::error::{Result, SdkError};
use crate::types::{
Balance, BundleResult, ConnectionInfo, ConnectionState, ConnectionStatus, FallbackStrategy,
LandingRateOptions, LandingRateStats, LatestBlockhash, LatestSlot, LeaderHint,
PerformanceMetrics, PingResult, PriorityFee, RegionInfo, RegisterWebhookRequest, RpcResponse, SenderInfo,
RoutingRecommendation, SimulationResult, SubmitOptions, TipInstruction, TopUpInfo,
TransactionResult, UsageEntry, UsageHistoryOptions, WebhookConfig,
};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, info, warn};
/// Rolling window of recent ping results used to estimate round-trip time
/// and clock offset.
struct TimeSyncState {
    /// Retained samples, oldest at the front.
    samples: std::sync::RwLock<std::collections::VecDeque<PingResult>>,
    /// Maximum number of samples kept in the window.
    max_samples: usize,
}

impl TimeSyncState {
    /// Creates an empty window that retains at most `max_samples` results.
    fn new(max_samples: usize) -> Self {
        Self {
            samples: std::sync::RwLock::new(std::collections::VecDeque::with_capacity(
                max_samples,
            )),
            max_samples,
        }
    }

    /// Appends `result`, evicting the oldest sample(s) once the window is full.
    ///
    /// A window created with `max_samples == 0` retains nothing; the call is a
    /// no-op instead of panicking on an empty buffer.
    fn record(&self, result: PingResult) {
        if self.max_samples == 0 {
            return;
        }
        let mut samples = self.samples.write().unwrap();
        // A deque makes dropping the oldest sample O(1).
        while samples.len() >= self.max_samples {
            samples.pop_front();
        }
        samples.push_back(result);
    }

    /// Median round-trip time over the window, or `None` when it is empty.
    fn median_rtt_ms(&self) -> Option<u64> {
        self.median_by(|sample| sample.rtt_ms)
    }

    /// Median clock offset over the window, or `None` when it is empty.
    fn median_clock_offset_ms(&self) -> Option<i64> {
        self.median_by(|sample| sample.clock_offset_ms)
    }

    /// Extracts one value per sample and returns the element at index
    /// `len / 2` of the sorted values (the upper middle for even counts).
    fn median_by<T: Ord + Copy>(&self, field: impl Fn(&PingResult) -> T) -> Option<T> {
        let samples = self.samples.read().unwrap();
        let mut values: Vec<T> = samples.iter().map(field).collect();
        if values.is_empty() {
            return None;
        }
        values.sort_unstable();
        Some(values[values.len() / 2])
    }
}
struct DedupCache {
entries: std::sync::RwLock<HashMap<Vec<u8>, Instant>>,
ttl: std::time::Duration,
}
impl DedupCache {
fn new() -> Self {
Self {
entries: std::sync::RwLock::new(HashMap::new()),
ttl: std::time::Duration::from_secs(60),
}
}
fn is_duplicate(&self, tx_bytes: &[u8]) -> bool {
let entries = self.entries.read().unwrap();
if let Some(submitted_at) = entries.get(tx_bytes) {
submitted_at.elapsed() < self.ttl
} else {
false
}
}
fn record(&self, tx_bytes: &[u8]) {
let mut entries = self.entries.write().unwrap();
entries.insert(tx_bytes.to_vec(), Instant::now());
if entries.len() > 1000 {
let ttl = self.ttl;
entries.retain(|_, submitted_at| submitted_at.elapsed() < ttl);
}
}
}
/// Client handle for an established Slipstream session.
///
/// Instances are produced by `SlipstreamClient::connect`.
pub struct SlipstreamClient {
/// Configuration the client was connected with.
config: Config,
/// Active transport, shared behind an async read/write lock.
transport: Arc<RwLock<Box<dyn Transport>>>,
/// Connection details returned by the transport when it connected.
connection_info: ConnectionInfo,
/// HTTP client used for the REST-style `/v1/...` endpoints.
http_client: reqwest::Client,
/// Most recently stored tip instruction, if any.
latest_tip: Arc<RwLock<Option<TipInstruction>>>,
/// Submission counters and latency accumulators.
metrics: Arc<ClientMetrics>,
/// Recent ping samples used for latency and clock-offset estimates.
time_sync: Arc<TimeSyncState>,
/// Handle of the background keepalive task; taken and aborted on disconnect.
keepalive_handle: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
/// Cache of recently submitted transactions for duplicate detection.
dedup_cache: Arc<DedupCache>,
/// Records which streams have been subscribed to; shared with the health monitor.
subscription_tracker: Arc<crate::connection::health::SubscriptionTracker>,
}
/// Lock-free counters updated on every transaction submission.
struct ClientMetrics {
/// Number of submissions handed to the transport (successful or not).
transactions_submitted: AtomicU64,
/// Number of submissions for which the transport returned `Ok`.
transactions_confirmed: AtomicU64,
/// Sum of the measured submission latencies, in milliseconds.
total_latency_ms: AtomicU64,
/// Latency of the most recent submission, in milliseconds.
last_latency_ms: AtomicU64,
}
impl SlipstreamClient {
/// Validates `config`, establishes a transport and returns a ready client.
///
/// When either `config.endpoint` or `config.selected_worker` is set, the
/// connection goes straight through the protocol fallback chain. Otherwise
/// workers are fetched from the discovery service, ranked by measured
/// round-trip time (unreachable or unmeasured workers sort last) and tried in
/// order until one accepts the connection.
///
/// # Errors
///
/// Fails when validation or discovery fails, when discovery yields no healthy
/// worker, or when every connection attempt fails; in the latter case the
/// last recorded failure is returned.
pub async fn connect(mut config: Config) -> Result<Self> {
    config.validate()?;

    let needs_discovery = config.endpoint.is_none() && config.selected_worker.is_none();
    if !needs_discovery {
        // Direct path: connect with the configuration exactly as supplied.
        info!(
            region = ?config.region,
            endpoint = ?config.endpoint,
            "Connecting to Slipstream"
        );
        let fallback_chain = FallbackChain::new(config.protocol_timeouts.clone());
        let mut transport = fallback_chain.connect(&config).await?;
        let connection_info = transport.connect(&config).await?;
        info!(
            session_id = %connection_info.session_id,
            protocol = %connection_info.protocol,
            "Connected to Slipstream"
        );
        return Self::finish_connect(config, transport, connection_info).await;
    }

    info!(
        discovery_url = %config.discovery_url,
        region = ?config.region,
        "Discovering workers"
    );
    let discovery = DiscoveryClient::new(&config.discovery_url);
    let response = discovery.discover().await?;
    let region = discovery
        .best_region(&response, config.region.as_deref())
        .ok_or_else(|| SdkError::connection("No healthy workers found via discovery"))?;

    let region_workers = discovery.workers_for_region(&response, &region);
    if region_workers.is_empty() {
        return Err(SdkError::connection(format!(
            "No healthy workers in region '{}'",
            region
        )));
    }

    let endpoints: Vec<_> = region_workers
        .iter()
        .map(|w| DiscoveryClient::worker_to_endpoint(w))
        .collect();

    // Probe every candidate, then order them fastest-first.
    let selector = WorkerSelector::new(endpoints.clone());
    selector.measure_all().await;
    let latencies = selector.get_all_latencies().await;
    let mut ranked = endpoints;
    ranked.sort_by_key(|candidate| {
        latencies
            .get(&candidate.id)
            .filter(|measurement| measurement.reachable)
            .map_or(u64::MAX, |measurement| measurement.rtt_ms)
    });
    info!(
        region = %region,
        workers = ranked.len(),
        best_worker = %ranked[0].id,
        "Ranked workers by latency"
    );

    let mut last_error: Option<SdkError> = None;
    for (index, worker) in ranked.iter().enumerate() {
        info!(
            worker_id = %worker.id,
            region = %region,
            attempt = index + 1,
            total = ranked.len(),
            "Trying worker"
        );
        config.selected_worker = Some(worker.clone());
        config.region = Some(region.clone());

        let fallback_chain = FallbackChain::new(config.protocol_timeouts.clone());
        let mut transport = match fallback_chain.connect(&config).await {
            Ok(transport) => transport,
            Err(e) => {
                warn!(worker_id = %worker.id, error = %e, "All protocols failed for worker, trying next");
                last_error = Some(e);
                continue;
            }
        };

        match transport.connect(&config).await {
            Ok(connection_info) => {
                info!(
                    session_id = %connection_info.session_id,
                    protocol = %connection_info.protocol,
                    worker_id = %worker.id,
                    "Connected to Slipstream"
                );
                return Self::finish_connect(config, transport, connection_info).await;
            }
            Err(e) => {
                warn!(worker_id = %worker.id, error = %e, "Worker connection failed, trying next");
                last_error = Some(e);
            }
        }
    }

    Err(last_error.unwrap_or_else(|| {
        SdkError::connection(format!(
            "All workers in region '{}' rejected connection",
            region
        ))
    }))
}
async fn finish_connect(
config: Config,
transport: Box<dyn Transport>,
connection_info: ConnectionInfo,
) -> Result<Self> {
let transport = Arc::new(RwLock::new(transport));
let subscription_tracker = Arc::new(crate::connection::health::SubscriptionTracker::new());
let monitor = crate::connection::health::HealthMonitor::new(
config.clone(),
transport.clone(),
Arc::clone(&subscription_tracker),
);
monitor.start();
let http_client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
.map_err(|e| SdkError::connection(format!("Failed to create HTTP client: {}", e)))?;
let time_sync = Arc::new(TimeSyncState::new(10));
let keepalive_handle = if config.keepalive {
let transport_clone = Arc::clone(&transport);
let sync_clone = Arc::clone(&time_sync);
let interval = config.keepalive_interval;
Some(tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
loop {
ticker.tick().await;
let transport = transport_clone.read().await;
if !transport.is_connected() {
break;
}
match transport.ping().await {
Ok(result) => {
debug!(rtt_ms = result.rtt_ms, offset_ms = result.clock_offset_ms, "Keepalive ping");
sync_clone.record(result);
}
Err(_) => {
break;
}
}
}
}))
} else {
None
};
let client = Self {
config,
transport,
connection_info,
http_client,
latest_tip: Arc::new(RwLock::new(None)),
metrics: Arc::new(ClientMetrics {
transactions_submitted: AtomicU64::new(0),
transactions_confirmed: AtomicU64::new(0),
total_latency_ms: AtomicU64::new(0),
last_latency_ms: AtomicU64::new(0),
}),
time_sync,
keepalive_handle: tokio::sync::Mutex::new(keepalive_handle),
dedup_cache: Arc::new(DedupCache::new()),
subscription_tracker,
};
let webhook_fut = async {
if client.config.webhook_url.is_some() {
let url = client.config.webhook_url.clone().unwrap();
let events = client.config.webhook_events.clone();
let level = client.config.webhook_notification_level.clone();
match client
.register_webhook(&url, Some(events), Some(level))
.await
{
Ok(_) => info!("Webhook auto-registered at {}", url),
Err(e) => debug!("Failed to auto-register webhook: {}", e),
}
}
};
let tip_fut = client.fetch_initial_tip();
tokio::join!(webhook_fut, tip_fut);
Ok(client)
}
pub fn connection_info(&self) -> &ConnectionInfo {
&self.connection_info
}
pub fn config(&self) -> &Config {
&self.config
}
/// Reports whether the underlying transport currently considers itself connected.
pub async fn is_connected(&self) -> bool {
    self.transport.read().await.is_connected()
}
/// Aborts the keepalive task (if one is running) and disconnects the transport.
///
/// # Errors
///
/// Propagates any error returned by the transport's `disconnect`.
pub async fn disconnect(&self) -> Result<()> {
    let keepalive_task = self.keepalive_handle.lock().await.take();
    if let Some(task) = keepalive_task {
        task.abort();
    }

    let mut guard = self.transport.write().await;
    guard.disconnect().await
}
/// Submits `transaction` using `SubmitOptions::default()`.
///
/// See `submit_transaction_with_options` for the full behavior.
pub async fn submit_transaction(&self, transaction: &[u8]) -> Result<TransactionResult> {
    let defaults = SubmitOptions::default();
    self.submit_transaction_with_options(transaction, &defaults)
        .await
}
/// Submits `transaction` through the transport with the given `options`.
///
/// The transaction bytes are first checked against the client-side
/// duplicate cache; a hit is rejected without contacting the transport.
/// Submission latency and counters are recorded for every attempt that
/// reaches the transport, and successful submissions are added to the
/// duplicate cache.
///
/// # Errors
///
/// Returns `SdkError::Transaction` for a cached duplicate, or whatever error
/// the transport reports.
pub async fn submit_transaction_with_options(
    &self,
    transaction: &[u8],
    options: &SubmitOptions,
) -> Result<TransactionResult> {
    let tx_size = transaction.len();
    if self.dedup_cache.is_duplicate(transaction) {
        warn!(tx_size = tx_size, "Duplicate transaction detected by client-side cache");
        return Err(SdkError::Transaction(
            "Duplicate transaction: already submitted within the last 60 seconds".to_string(),
        ));
    }
    debug!(
        tx_size = tx_size,
        broadcast_mode = options.broadcast_mode,
        preferred_sender = ?options.preferred_sender,
        "Submitting transaction"
    );

    let started = Instant::now();
    let transport = self.transport.read().await;
    let outcome = transport.submit_transaction(transaction, options).await;
    let latency_ms = started.elapsed().as_millis() as u64;

    let counters = &self.metrics;
    counters.last_latency_ms.store(latency_ms, Ordering::Relaxed);
    counters.transactions_submitted.fetch_add(1, Ordering::Relaxed);
    counters.total_latency_ms.fetch_add(latency_ms, Ordering::Relaxed);
    if outcome.is_ok() {
        counters.transactions_confirmed.fetch_add(1, Ordering::Relaxed);
        self.dedup_cache.record(transaction);
    }
    outcome
}
/// Marks the leader-hint stream as tracked, then requests a receiver for it
/// from the transport.
pub async fn subscribe_leader_hints(&self) -> Result<mpsc::Receiver<LeaderHint>> {
    use crate::connection::health::StreamType;

    debug!("Subscribing to leader hints");
    self.subscription_tracker.track(StreamType::LeaderHints);
    self.transport.read().await.subscribe_leader_hints().await
}
/// Marks the tip-instruction stream as tracked, then requests a receiver for
/// it from the transport.
pub async fn subscribe_tip_instructions(&self) -> Result<mpsc::Receiver<TipInstruction>> {
    use crate::connection::health::StreamType;

    debug!("Subscribing to tip instructions");
    self.subscription_tracker.track(StreamType::TipInstructions);
    self.transport.read().await.subscribe_tip_instructions().await
}
/// Marks the priority-fee stream as tracked, then requests a receiver for it
/// from the transport.
pub async fn subscribe_priority_fees(&self) -> Result<mpsc::Receiver<PriorityFee>> {
    use crate::connection::health::StreamType;

    debug!("Subscribing to priority fees");
    self.subscription_tracker.track(StreamType::PriorityFees);
    self.transport.read().await.subscribe_priority_fees().await
}
/// Marks the latest-blockhash stream as tracked, then requests a receiver
/// for it from the transport.
pub async fn subscribe_latest_blockhash(&self) -> Result<mpsc::Receiver<LatestBlockhash>> {
    use crate::connection::health::StreamType;

    debug!("Subscribing to latest blockhash");
    self.subscription_tracker.track(StreamType::LatestBlockhash);
    self.transport.read().await.subscribe_latest_blockhash().await
}
/// Marks the latest-slot stream as tracked, then requests a receiver for it
/// from the transport.
pub async fn subscribe_latest_slot(&self) -> Result<mpsc::Receiver<LatestSlot>> {
    use crate::connection::health::StreamType;

    debug!("Subscribing to latest slot");
    self.subscription_tracker.track(StreamType::LatestSlot);
    self.transport.read().await.subscribe_latest_slot().await
}
/// Returns a copy of the most recently stored tip instruction, if any.
pub async fn get_latest_tip(&self) -> Option<TipInstruction> {
    let stored = self.latest_tip.read().await;
    stored.as_ref().cloned()
}
/// Replaces the stored tip instruction with `tip`.
pub(crate) async fn set_latest_tip(&self, tip: TipInstruction) {
    *self.latest_tip.write().await = Some(tip);
}
/// Best-effort fetch of the current tip instructions over HTTP.
///
/// Requests `/v1/tip-instructions`, takes the last element of the returned
/// array and stores it as the latest tip. Any failure (request error,
/// non-success status, unparsable body, empty list) leaves the stored tip
/// untouched and is not reported.
async fn fetch_initial_tip(&self) {
    /// Wire shape of one tip entry; absent numeric fields default to zero.
    #[derive(serde::Deserialize)]
    struct RawTip {
        sender_id: Option<String>,
        tip_wallet: Option<String>,
        #[serde(default)]
        tip_amount_lamports: u64,
        tier: Option<String>,
        #[serde(default)]
        expected_latency_ms: u32,
        #[serde(default)]
        confidence: u32,
        #[serde(default)]
        valid_until_slot: u64,
        #[serde(default)]
        timestamp: u64,
    }

    let url = format!(
        "{}/v1/tip-instructions",
        self.config.get_endpoint(crate::types::Protocol::Http)
    );
    let sent = self
        .http_client
        .get(&url)
        .header("Authorization", format!("Bearer {}", self.config.api_key))
        .send()
        .await;
    let response = match sent {
        Ok(resp) if resp.status().is_success() => resp,
        _ => return,
    };

    let mut tips = match response.json::<Vec<RawTip>>().await {
        Ok(parsed) => parsed,
        Err(_) => return,
    };
    let raw = match tips.pop() {
        Some(last) => last,
        None => return,
    };

    // Fall back to the local wall clock when no timestamp was supplied.
    let timestamp = if raw.timestamp > 0 {
        raw.timestamp
    } else {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64
    };
    let sender_id = raw.sender_id.unwrap_or_default();

    let tip = TipInstruction {
        timestamp,
        sender: sender_id.clone(),
        sender_name: sender_id,
        tip_wallet_address: raw.tip_wallet.unwrap_or_default(),
        tip_amount_sol: raw.tip_amount_lamports as f64 / 1_000_000_000.0,
        tip_tier: raw.tier.unwrap_or_else(|| "standard".to_string()),
        expected_latency_ms: raw.expected_latency_ms,
        confidence: raw.confidence,
        valid_until_slot: raw.valid_until_slot,
        alternative_senders: vec![],
    };
    self.set_latest_tip(tip).await;
}
/// Builds a snapshot of the connection: state and protocol from the
/// transport, the latency of the last submission, and the stored region.
pub async fn connection_status(&self) -> ConnectionStatus {
    let transport = self.transport.read().await;
    let state = match transport.is_connected() {
        true => ConnectionState::Connected,
        false => ConnectionState::Disconnected,
    };
    let protocol = transport.protocol();
    let latency_ms = self.metrics.last_latency_ms.load(Ordering::Relaxed);
    let region = self.connection_info.region.clone();

    ConnectionStatus {
        state,
        protocol,
        latency_ms,
        region,
    }
}
/// Fetches a routing recommendation from `/v1/routing/recommendation`.
///
/// A `404 Not Found` response is treated as "endpoint unavailable" and
/// answered with a locally built fallback recommendation.
///
/// # Errors
///
/// Returns an auth error on `401`, `SdkError::Internal` on any other
/// non-success status, and propagates request/decoding failures.
pub async fn get_routing_recommendation(&self) -> Result<RoutingRecommendation> {
    let url = format!(
        "{}/v1/routing/recommendation",
        self.config.get_endpoint(crate::types::Protocol::Http)
    );
    debug!(url = %url, "Fetching routing recommendation");

    let bearer = format!("Bearer {}", self.config.api_key);
    let reply = self
        .http_client
        .get(&url)
        .header("Authorization", bearer)
        .send()
        .await?;

    let status = reply.status();
    if status == reqwest::StatusCode::UNAUTHORIZED {
        return Err(SdkError::auth("Invalid API key"));
    }
    if status == reqwest::StatusCode::NOT_FOUND {
        debug!("Routing endpoint not available, using local fallback");
        return Ok(self.create_local_routing_recommendation().await);
    }
    if !status.is_success() {
        let detail = reply.text().await.unwrap_or_default();
        return Err(SdkError::Internal(format!(
            "Failed to fetch routing recommendation: {}",
            detail
        )));
    }

    Ok(reply.json::<RoutingRecommendation>().await?)
}
/// Builds a placeholder recommendation from local state only.
///
/// Uses the connected region (or `"unknown"` when none is stored), an empty
/// leader key, slot 0, confidence 50, a retry fallback strategy and a
/// validity of 1000 ms.
async fn create_local_routing_recommendation(&self) -> RoutingRecommendation {
    let best_region = match self.connection_info.region.clone() {
        Some(region) => region,
        None => "unknown".to_string(),
    };

    RoutingRecommendation {
        best_region,
        leader_pubkey: String::new(),
        slot: 0,
        confidence: 50,
        expected_rtt_ms: None,
        fallback_regions: vec![],
        fallback_strategy: FallbackStrategy::Retry,
        valid_for_ms: 1000,
    }
}
/// Fetches the configured regions from `/v1/config/regions`.
///
/// The request is sent without an `Authorization` header. Entries of the
/// `regions` array that fail to deserialize are skipped; a missing or
/// non-array `regions` field yields an empty list.
///
/// # Errors
///
/// Returns `SdkError::Internal` on a non-success status and propagates
/// request/decoding failures.
pub async fn get_regions(&self) -> Result<Vec<RegionInfo>> {
    let url = format!(
        "{}/v1/config/regions",
        self.config.get_endpoint(crate::types::Protocol::Http)
    );
    debug!(url = %url, "Fetching configured regions");

    let reply = self.http_client.get(&url).send().await?;
    if !reply.status().is_success() {
        let detail = reply.text().await.unwrap_or_default();
        return Err(SdkError::Internal(format!("Failed to fetch regions: {}", detail)));
    }

    let payload: serde_json::Value = reply.json().await?;
    let regions: Vec<RegionInfo> = match payload["regions"].as_array() {
        Some(items) => items
            .iter()
            .filter_map(|item| serde_json::from_value::<RegionInfo>(item.clone()).ok())
            .collect(),
        None => Vec::new(),
    };
    Ok(regions)
}
/// Fetches the configured senders from `/v1/config/senders`.
///
/// Entries of the `senders` array that fail to deserialize are skipped; a
/// missing or non-array `senders` field yields an empty list.
///
/// # Errors
///
/// Returns an auth error on `401`, `SdkError::Internal` on any other
/// non-success status, and propagates request/decoding failures.
pub async fn get_senders(&self) -> Result<Vec<SenderInfo>> {
    let url = format!(
        "{}/v1/config/senders",
        self.config.get_endpoint(crate::types::Protocol::Http)
    );
    debug!(url = %url, "Fetching configured senders");

    let bearer = format!("Bearer {}", self.config.api_key);
    let reply = self
        .http_client
        .get(&url)
        .header("Authorization", bearer)
        .send()
        .await?;

    let status = reply.status();
    if status == reqwest::StatusCode::UNAUTHORIZED {
        return Err(SdkError::auth("Invalid API key"));
    }
    if !status.is_success() {
        let detail = reply.text().await.unwrap_or_default();
        return Err(SdkError::Internal(format!("Failed to fetch senders: {}", detail)));
    }

    let payload: serde_json::Value = reply.json().await?;
    let senders: Vec<SenderInfo> = match payload["senders"].as_array() {
        Some(items) => items
            .iter()
            .filter_map(|item| serde_json::from_value::<SenderInfo>(item.clone()).ok())
            .collect(),
        None => Vec::new(),
    };
    Ok(senders)
}
/// Fetches the account balance from `/v1/balance`.
///
/// `balance_lamports` defaults to 0 when absent from the response. Token
/// counts are derived locally by dividing by a fixed per-query cost of
/// 50,000 lamports; the grace figure adds a fixed 1,000,000-lamport limit
/// before dividing.
///
/// # Errors
///
/// Returns an auth error on `401`, `SdkError::Internal` on any other
/// non-success status, and propagates request/decoding failures.
pub async fn get_balance(&self) -> Result<Balance> {
    const COST_PER_QUERY: i64 = 50_000;
    const GRACE_LIMIT: i64 = 1_000_000;

    let url = format!(
        "{}/v1/balance",
        self.config.get_endpoint(crate::types::Protocol::Http)
    );
    debug!(url = %url, "Fetching token balance");

    let bearer = format!("Bearer {}", self.config.api_key);
    let reply = self
        .http_client
        .get(&url)
        .header("Authorization", bearer)
        .send()
        .await?;

    let status = reply.status();
    if status == reqwest::StatusCode::UNAUTHORIZED {
        return Err(SdkError::auth("Invalid API key"));
    }
    if !status.is_success() {
        let detail = reply.text().await.unwrap_or_default();
        return Err(SdkError::Internal(format!("Failed to fetch balance: {}", detail)));
    }

    let payload: serde_json::Value = reply.json().await?;
    let balance_lamports = payload["balance_lamports"].as_i64().unwrap_or(0);
    let tier = payload["tier"].as_str().map(str::to_owned);
    let free_tier_usage = serde_json::from_value(payload["free_tier_usage"].clone()).ok();

    Ok(Balance {
        balance_sol: balance_lamports as f64 / 1_000_000_000.0,
        balance_tokens: balance_lamports / COST_PER_QUERY,
        balance_lamports,
        grace_remaining_tokens: (balance_lamports + GRACE_LIMIT) / COST_PER_QUERY,
        tier,
        free_tier_usage,
    })
}
/// Fetches top-up information from `/v1/deposit-address`.
///
/// # Errors
///
/// Returns an auth error on `401`, `SdkError::Internal` on any other
/// non-success status, and propagates request/decoding failures.
pub async fn get_deposit_address(&self) -> Result<TopUpInfo> {
    let url = format!(
        "{}/v1/deposit-address",
        self.config.get_endpoint(crate::types::Protocol::Http)
    );
    debug!(url = %url, "Fetching deposit address");

    let bearer = format!("Bearer {}", self.config.api_key);
    let reply = self
        .http_client
        .get(&url)
        .header("Authorization", bearer)
        .send()
        .await?;

    let status = reply.status();
    if status == reqwest::StatusCode::UNAUTHORIZED {
        return Err(SdkError::auth("Invalid API key"));
    }
    if !status.is_success() {
        let detail = reply.text().await.unwrap_or_default();
        return Err(SdkError::Internal(format!("Failed to fetch deposit address: {}", detail)));
    }

    Ok(reply.json::<TopUpInfo>().await?)
}
/// Fetches usage entries from `/v1/usage-history`.
///
/// `options.limit` and `options.offset`, when present, are sent as query
/// parameters. An `entries` field that is missing or cannot be deserialized
/// results in an empty list.
///
/// # Errors
///
/// Returns an auth error on `401`, `SdkError::Internal` on any other
/// non-success status, and propagates request/decoding failures.
pub async fn get_usage_history(&self, options: UsageHistoryOptions) -> Result<Vec<UsageEntry>> {
    let mut url = format!(
        "{}/v1/usage-history",
        self.config.get_endpoint(crate::types::Protocol::Http)
    );

    let query: Vec<String> = vec![
        options.limit.map(|limit| format!("limit={}", limit)),
        options.offset.map(|offset| format!("offset={}", offset)),
    ]
    .into_iter()
    .flatten()
    .collect();
    if !query.is_empty() {
        url.push('?');
        url.push_str(&query.join("&"));
    }
    debug!(url = %url, "Fetching usage history");

    let bearer = format!("Bearer {}", self.config.api_key);
    let reply = self
        .http_client
        .get(&url)
        .header("Authorization", bearer)
        .send()
        .await?;

    let status = reply.status();
    if status == reqwest::StatusCode::UNAUTHORIZED {
        return Err(SdkError::auth("Invalid API key"));
    }
    if !status.is_success() {
        let detail = reply.text().await.unwrap_or_default();
        return Err(SdkError::Internal(format!("Failed to fetch usage history: {}", detail)));
    }

    let payload: serde_json::Value = reply.json().await?;
    let entries: Vec<UsageEntry> =
        serde_json::from_value(payload["entries"].clone()).unwrap_or_default();
    Ok(entries)
}
/// Fetches deposit entries from `/v1/deposit-history`.
///
/// `options.limit` and `options.offset`, when present, are sent as query
/// parameters. A `deposits` field that is missing or cannot be deserialized
/// results in an empty list.
///
/// # Errors
///
/// Returns an auth error on `401`, `SdkError::Internal` on any other
/// non-success status, and propagates request/decoding failures.
pub async fn get_deposit_history(
    &self,
    options: crate::types::DepositHistoryOptions,
) -> Result<Vec<crate::types::DepositEntry>> {
    let mut url = format!(
        "{}/v1/deposit-history",
        self.config.get_endpoint(crate::types::Protocol::Http)
    );

    let query: Vec<String> = vec![
        options.limit.map(|limit| format!("limit={}", limit)),
        options.offset.map(|offset| format!("offset={}", offset)),
    ]
    .into_iter()
    .flatten()
    .collect();
    if !query.is_empty() {
        url.push('?');
        url.push_str(&query.join("&"));
    }
    debug!(url = %url, "Fetching deposit history");

    let bearer = format!("Bearer {}", self.config.api_key);
    let reply = self
        .http_client
        .get(&url)
        .header("Authorization", bearer)
        .send()
        .await?;

    let status = reply.status();
    if status == reqwest::StatusCode::UNAUTHORIZED {
        return Err(SdkError::auth("Invalid API key"));
    }
    if !status.is_success() {
        let detail = reply.text().await.unwrap_or_default();
        return Err(SdkError::Internal(format!(
            "Failed to fetch deposit history: {}",
            detail
        )));
    }

    let payload: serde_json::Value = reply.json().await?;
    let deposits: Vec<crate::types::DepositEntry> =
        serde_json::from_value(payload["deposits"].clone()).unwrap_or_default();
    Ok(deposits)
}
/// Fetches pending-deposit information from `/v1/deposit-pending`.
///
/// # Errors
///
/// Returns an auth error on `401`, `SdkError::Internal` on any other
/// non-success status, and propagates request/decoding failures.
pub async fn get_pending_deposit(&self) -> Result<crate::types::PendingDeposit> {
    let url = format!(
        "{}/v1/deposit-pending",
        self.config.get_endpoint(crate::types::Protocol::Http)
    );
    debug!(url = %url, "Fetching pending deposit info");

    let bearer = format!("Bearer {}", self.config.api_key);
    let reply = self
        .http_client
        .get(&url)
        .header("Authorization", bearer)
        .send()
        .await?;

    let status = reply.status();
    if status == reqwest::StatusCode::UNAUTHORIZED {
        return Err(SdkError::auth("Invalid API key"));
    }
    if !status.is_success() {
        let detail = reply.text().await.unwrap_or_default();
        return Err(SdkError::Internal(format!(
            "Failed to fetch pending deposits: {}",
            detail
        )));
    }

    Ok(reply.json::<crate::types::PendingDeposit>().await?)
}
/// Fetches free-tier usage from `/v1/free-tier-usage`.
///
/// # Errors
///
/// Returns an auth error on `401`, `SdkError::Internal` on any other
/// non-success status, and propagates request/decoding failures.
pub async fn get_free_tier_usage(&self) -> Result<crate::types::FreeTierUsage> {
    let url = format!(
        "{}/v1/free-tier-usage",
        self.config.get_endpoint(crate::types::Protocol::Http)
    );
    debug!(url = %url, "Fetching free tier usage");

    let bearer = format!("Bearer {}", self.config.api_key);
    let reply = self
        .http_client
        .get(&url)
        .header("Authorization", bearer)
        .send()
        .await?;

    let status = reply.status();
    if status == reqwest::StatusCode::UNAUTHORIZED {
        return Err(SdkError::auth("Invalid API key"));
    }
    if !status.is_success() {
        let detail = reply.text().await.unwrap_or_default();
        return Err(SdkError::Internal(format!(
            "Failed to fetch free tier usage: {}",
            detail
        )));
    }

    Ok(reply.json::<crate::types::FreeTierUsage>().await?)
}
/// Returns the minimum deposit amount in USD (a fixed value of 10.0).
pub fn get_minimum_deposit_usd(&self) -> f64 {
    const MINIMUM_DEPOSIT_USD: f64 = 10.0;
    MINIMUM_DEPOSIT_USD
}
/// Returns a snapshot of the submission counters.
///
/// Average latency and success rate are both computed per submitted
/// transaction and are `0.0` when nothing has been submitted yet.
pub fn metrics(&self) -> PerformanceMetrics {
    let counters = &self.metrics;
    let submitted = counters.transactions_submitted.load(Ordering::Relaxed);
    let confirmed = counters.transactions_confirmed.load(Ordering::Relaxed);
    let total_latency = counters.total_latency_ms.load(Ordering::Relaxed);

    // Divides by the submission count, guarding against the empty case.
    let per_submission = |value: u64| -> f64 {
        if submitted > 0 {
            value as f64 / submitted as f64
        } else {
            0.0
        }
    };

    PerformanceMetrics {
        transactions_submitted: submitted,
        transactions_confirmed: confirmed,
        average_latency_ms: per_submission(total_latency),
        success_rate: per_submission(confirmed),
    }
}
/// Submits a bundle without specifying a tip.
///
/// Equivalent to `submit_bundle_with_tip(transactions, None)`.
pub async fn submit_bundle(&self, transactions: &[Vec<u8>]) -> Result<BundleResult> {
    let no_tip: Option<u64> = None;
    self.submit_bundle_with_tip(transactions, no_tip).await
}
pub async fn submit_bundle_with_tip(
&self,
transactions: &[Vec<u8>],
tip_lamports: Option<u64>,
) -> Result<BundleResult> {
if transactions.len() < 2 || transactions.len() > 5 {
return Err(SdkError::config(
"Bundle must contain 2-5 transactions",
));
}
debug!(
tx_count = transactions.len(),
tip = ?tip_lamports,
"Submitting bundle"
);
let base_url = self.config.get_endpoint(crate::types::Protocol::Http);
let url = format!("{}/v1/bundles/submit", base_url);
let txs_base64: Vec<String> = transactions
.iter()
.map(|tx| base64::Engine::encode(&base64::engine::general_purpose::STANDARD, tx))
.collect();
let body = serde_json::json!({
"transactions": txs_base64,
"tip_lamports": tip_lamports,
});
let response = self
.http_client
.post(&url)
.header("Authorization", format!("Bearer {}", self.config.api_key))
.json(&body)
.send()
.await?;
if response.status() == reqwest::StatusCode::UNAUTHORIZED {
return Err(SdkError::auth("Invalid API key"));
}
if !response.status().is_success() {
let error_text = response.text().await.unwrap_or_default();
return Err(SdkError::Internal(format!(
"Failed to submit bundle: {}",
error_text
)));
}
let result: BundleResult = response.json().await?;
Ok(result)
}
/// Returns half of the median recorded round-trip time, or `None` when no
/// ping samples have been recorded.
pub fn latency_ms(&self) -> Option<u64> {
    let round_trip_ms = self.time_sync.median_rtt_ms()?;
    Some(round_trip_ms / 2)
}
pub fn clock_offset_ms(&self) -> Option<i64> {
self.time_sync.median_clock_offset_ms()
}
/// Estimates the server's current time in milliseconds since the Unix epoch.
///
/// The local wall clock is adjusted by the median recorded clock offset (0
/// when no ping samples exist). A system clock set before the epoch is
/// treated as 0 rather than panicking, and the adjusted value is clamped at
/// 0 so a large negative offset cannot wrap around to a huge `u64`.
pub fn server_time(&self) -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    let now_ms = i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX);
    let offset_ms = self.time_sync.median_clock_offset_ms().unwrap_or(0);

    // The sum is clamped to be non-negative, so the cast below is lossless.
    now_ms.saturating_add(offset_ms).max(0) as u64
}
/// Pings through the transport, records the result in the time-sync window
/// and returns it.
///
/// # Errors
///
/// Propagates the transport's ping error; nothing is recorded in that case.
pub async fn ping(&self) -> Result<PingResult> {
    let measurement = self.transport.read().await.ping().await?;
    self.time_sync.record(measurement.clone());
    Ok(measurement)
}
/// Registers a webhook by POSTing to `/v1/webhooks`.
///
/// `url` is the callback address; `events` and `notification_level` are
/// forwarded unchanged in the request body.
///
/// # Errors
///
/// Returns an auth error on `401`, `SdkError::Internal` on any other
/// non-success status, and propagates request/decoding failures.
pub async fn register_webhook(
    &self,
    url: &str,
    events: Option<Vec<String>>,
    notification_level: Option<String>,
) -> Result<WebhookConfig> {
    let api_url = format!(
        "{}/v1/webhooks",
        self.config.get_endpoint(crate::types::Protocol::Http)
    );
    let payload = RegisterWebhookRequest {
        url: url.to_string(),
        events,
        notification_level,
    };

    let bearer = format!("Bearer {}", self.config.api_key);
    let reply = self
        .http_client
        .post(&api_url)
        .header("Authorization", bearer)
        .json(&payload)
        .send()
        .await?;

    let status = reply.status();
    if status == reqwest::StatusCode::UNAUTHORIZED {
        return Err(SdkError::auth("Invalid API key"));
    }
    if !status.is_success() {
        let detail = reply.text().await.unwrap_or_default();
        return Err(SdkError::Internal(format!(
            "Failed to register webhook: {}",
            detail
        )));
    }

    Ok(reply.json::<WebhookConfig>().await?)
}
/// Fetches the registered webhook from `/v1/webhooks`.
///
/// Returns `Ok(None)` when the server answers `404 Not Found`.
///
/// # Errors
///
/// Returns an auth error on `401`, `SdkError::Internal` on any other
/// non-success status, and propagates request/decoding failures.
pub async fn get_webhook(&self) -> Result<Option<WebhookConfig>> {
    let url = format!(
        "{}/v1/webhooks",
        self.config.get_endpoint(crate::types::Protocol::Http)
    );

    let bearer = format!("Bearer {}", self.config.api_key);
    let reply = self
        .http_client
        .get(&url)
        .header("Authorization", bearer)
        .send()
        .await?;

    let status = reply.status();
    if status == reqwest::StatusCode::UNAUTHORIZED {
        return Err(SdkError::auth("Invalid API key"));
    }
    if status == reqwest::StatusCode::NOT_FOUND {
        return Ok(None);
    }
    if !status.is_success() {
        let detail = reply.text().await.unwrap_or_default();
        return Err(SdkError::Internal(format!(
            "Failed to get webhook: {}",
            detail
        )));
    }

    let webhook = reply.json::<WebhookConfig>().await?;
    Ok(Some(webhook))
}
/// Deletes the registered webhook via `DELETE /v1/webhooks`.
///
/// # Errors
///
/// Returns an auth error on `401`, `SdkError::Internal` on any other
/// non-success status, and propagates request failures.
pub async fn delete_webhook(&self) -> Result<()> {
    let url = format!(
        "{}/v1/webhooks",
        self.config.get_endpoint(crate::types::Protocol::Http)
    );

    let bearer = format!("Bearer {}", self.config.api_key);
    let reply = self
        .http_client
        .delete(&url)
        .header("Authorization", bearer)
        .send()
        .await?;

    let status = reply.status();
    if status == reqwest::StatusCode::UNAUTHORIZED {
        return Err(SdkError::auth("Invalid API key"));
    }
    if status.is_success() {
        return Ok(());
    }

    let detail = reply.text().await.unwrap_or_default();
    Err(SdkError::Internal(format!(
        "Failed to delete webhook: {}",
        detail
    )))
}
/// Fetches landing-rate statistics from `/v1/metrics/landing-rates`.
///
/// When `options` carries `start` and/or `end`, they are appended as query
/// parameters exactly as formatted by `Display`.
/// NOTE(review): the values are not percent-encoded; confirm they can never
/// contain characters that need escaping in a query string.
///
/// # Errors
///
/// Returns an auth error on `401`, `SdkError::Internal` on any other
/// non-success status, and propagates request/decoding failures.
pub async fn get_landing_rates(
    &self,
    options: Option<LandingRateOptions>,
) -> Result<LandingRateStats> {
    let mut url = format!(
        "{}/v1/metrics/landing-rates",
        self.config.get_endpoint(crate::types::Protocol::Http)
    );

    let mut params: Vec<String> = Vec::new();
    if let Some(opts) = options.as_ref() {
        if let Some(start) = opts.start.as_ref() {
            params.push(format!("start={}", start));
        }
        if let Some(end) = opts.end.as_ref() {
            params.push(format!("end={}", end));
        }
    }
    if !params.is_empty() {
        url = format!("{}?{}", url, params.join("&"));
    }

    let bearer = format!("Bearer {}", self.config.api_key);
    let reply = self
        .http_client
        .get(&url)
        .header("Authorization", bearer)
        .send()
        .await?;

    let status = reply.status();
    if status == reqwest::StatusCode::UNAUTHORIZED {
        return Err(SdkError::auth("Invalid API key"));
    }
    if !status.is_success() {
        let detail = reply.text().await.unwrap_or_default();
        return Err(SdkError::Internal(format!(
            "Failed to get landing rates: {}",
            detail
        )));
    }

    Ok(reply.json::<LandingRateStats>().await?)
}
/// Fetches the status of a transaction from
/// `/v1/transactions/{signature}/status`.
///
/// `signature` is inserted into the URL path as given.
///
/// # Errors
///
/// Returns an auth error on `401`, `SdkError::Internal` on `404` (transaction
/// not found) or any other non-success status, and propagates
/// request/decoding failures.
pub async fn get_transaction_status(&self, signature: &str) -> Result<TransactionResult> {
    let url = format!(
        "{}/v1/transactions/{}/status",
        self.config.get_endpoint(crate::types::Protocol::Http),
        signature
    );
    debug!(url = %url, signature = %signature, "Fetching transaction status");

    let bearer = format!("Bearer {}", self.config.api_key);
    let reply = self
        .http_client
        .get(&url)
        .header("Authorization", bearer)
        .send()
        .await?;

    let status = reply.status();
    if status == reqwest::StatusCode::UNAUTHORIZED {
        return Err(SdkError::auth("Invalid API key"));
    }
    if status == reqwest::StatusCode::NOT_FOUND {
        return Err(SdkError::Internal(format!(
            "Transaction not found for signature: {}",
            signature
        )));
    }
    if !status.is_success() {
        let detail = reply.text().await.unwrap_or_default();
        return Err(SdkError::Internal(format!(
            "Failed to get transaction status: {}",
            detail
        )));
    }

    Ok(reply.json::<TransactionResult>().await?)
}
pub async fn rpc(
&self,
method: &str,
params: serde_json::Value,
) -> Result<RpcResponse> {
let base_url = self.config.get_endpoint(crate::types::Protocol::Http);
let url = format!("{}/v1/rpc", base_url);
let body = serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"method": method,
"params": params,
});
let response = self
.http_client
.post(&url)
.header("Authorization", format!("Bearer {}", self.config.api_key))
.json(&body)
.send()
.await?;
if response.status() == reqwest::StatusCode::UNAUTHORIZED {
return Err(SdkError::auth("Invalid API key"));
}
if response.status() == reqwest::StatusCode::PAYMENT_REQUIRED {
return Err(SdkError::Internal(
"Insufficient token balance for RPC query".to_string(),
));
}
if !response.status().is_success() {
let error_text = response.text().await.unwrap_or_default();
return Err(SdkError::Internal(format!("RPC proxy error: {}", error_text)));
}
let rpc_response: RpcResponse = response.json().await?;
Ok(rpc_response)
}
/// Simulates `transaction` through the RPC proxy's `simulateTransaction`.
///
/// The transaction is base64-encoded and sent with `confirmed` commitment
/// and `replaceRecentBlockhash` enabled. The simulation result is read from
/// the `value` field of the RPC result when present, otherwise from the
/// result itself.
///
/// # Errors
///
/// Returns `SdkError::Internal` when the RPC response carries an error, has
/// no result, or the result cannot be parsed; transport-level failures from
/// `rpc` are propagated.
pub async fn simulate_transaction(&self, transaction: &[u8]) -> Result<SimulationResult> {
    use base64::{engine::general_purpose::STANDARD, Engine};

    let encoded = STANDARD.encode(transaction);
    let params = serde_json::json!([encoded, {
        "encoding": "base64",
        "commitment": "confirmed",
        "replaceRecentBlockhash": true,
    }]);
    let reply = self.rpc("simulateTransaction", params).await?;

    if let Some(rpc_error) = reply.error {
        return Err(SdkError::Internal(format!(
            "RPC error {}: {}",
            rpc_error.code, rpc_error.message
        )));
    }
    let result = match reply.result {
        Some(result) => result,
        None => return Err(SdkError::Internal("No result in RPC response".to_string())),
    };

    // Unwrap the `value` envelope if the response uses one.
    let simulation = result.get("value").cloned().unwrap_or(result);
    serde_json::from_value::<SimulationResult>(simulation)
        .map_err(|e| SdkError::Internal(format!("Failed to parse simulation result: {}", e)))
}
pub async fn simulate_bundle(
&self,
transactions: &[Vec<u8>],
) -> Result<Vec<SimulationResult>> {
let mut results = Vec::with_capacity(transactions.len());
for tx in transactions {
let sim = self.simulate_transaction(tx).await?;
let failed = sim.err.is_some();
results.push(sim);
if failed {
break;
}
}
Ok(results)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Default submit options disable broadcast mode and allow two retries.
    #[test]
    fn test_submit_options_default() {
        let defaults = SubmitOptions::default();
        let retries = defaults.max_retries;
        assert!(!defaults.broadcast_mode);
        assert_eq!(retries, 2);
    }
}