use super::Transport;
use crate::config::{Config, Protocol};
use crate::error::{Result, SdkError};
use crate::types::{
ConnectionInfo, LatestBlockhash, LatestSlot, LeaderHint, PingResult, PriorityFee, RateLimitInfo,
SubmitOptions, TipInstruction, TransactionResult, TransactionStatus,
};
use async_trait::async_trait;
use quinn::{ClientConfig, Connection, Endpoint};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, info, warn};
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamType {
TransactionSubmit = 0x01,
LeaderHints = 0x02,
TipInstructions = 0x03,
PriorityFees = 0x04,
Metrics = 0x05,
LatestBlockhash = 0x06,
LatestSlot = 0x07,
Ping = 0x08,
BundleSubmit = 0x09,
TransactionStatus = 0x0A,
TpuSubmit = 0x0B,
}
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ResponseStatus {
Accepted = 0x01,
Duplicate = 0x02,
RateLimited = 0x03,
ServerError = 0x04,
}
impl ResponseStatus {
fn from_byte(value: u8) -> Option<Self> {
match value {
0x01 => Some(ResponseStatus::Accepted),
0x02 => Some(ResponseStatus::Duplicate),
0x03 => Some(ResponseStatus::RateLimited),
0x04 => Some(ResponseStatus::ServerError),
_ => None,
}
}
}
pub struct QuicTransport {
endpoint: Option<Endpoint>,
connection: RwLock<Option<Connection>>,
config: Option<Config>,
connected: AtomicBool,
session_id: RwLock<Option<String>>,
request_counter: AtomicU32,
ping_seq: AtomicU32,
}
impl QuicTransport {
pub fn new() -> Self {
Self {
endpoint: None,
connection: RwLock::new(None),
config: None,
connected: AtomicBool::new(false),
session_id: RwLock::new(None),
request_counter: AtomicU32::new(0),
ping_seq: AtomicU32::new(0),
}
}
fn next_request_id(&self) -> u32 {
self.request_counter.fetch_add(1, Ordering::Relaxed)
}
fn build_client_config() -> Result<ClientConfig> {
let mut crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(Arc::new(SkipServerVerification))
.with_no_client_auth();
crypto.alpn_protocols = vec![b"slipstream".to_vec()];
let mut client_config = ClientConfig::new(Arc::new(crypto));
let mut transport_config = quinn::TransportConfig::default();
transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
transport_config.max_idle_timeout(Some(
quinn::IdleTimeout::try_from(Duration::from_secs(30))
.map_err(|e| SdkError::protocol(format!("Invalid timeout: {}", e)))?,
));
client_config.transport_config(Arc::new(transport_config));
Ok(client_config)
}
fn parse_endpoint(url: &str) -> Result<(SocketAddr, String)> {
let url = url.trim_start_matches("quic://");
let parts: Vec<&str> = url.rsplitn(2, ':').collect();
if parts.len() != 2 {
return Err(SdkError::config("Invalid QUIC endpoint format, expected host:port"));
}
let port: u16 = parts[0]
.parse()
.map_err(|_| SdkError::config("Invalid port number"))?;
let host = parts[1];
let addr_str = format!("{}:{}", host, port);
let addr: SocketAddr = addr_str
.parse()
.or_else(|_| {
use std::net::ToSocketAddrs;
addr_str
.to_socket_addrs()
.map_err(|e| SdkError::connection(format!("DNS resolution failed: {}", e)))?
.next()
.ok_or_else(|| SdkError::connection("No addresses found"))
})?;
Ok((addr, host.to_string()))
}
async fn authenticate(
&self,
connection: &Connection,
api_key: &str,
) -> Result<String> {
let (mut send, mut recv) = connection
.open_bi()
.await
.map_err(|e| SdkError::connection(format!("Failed to open auth stream: {}", e)))?;
let key_prefix = &api_key[..16.min(api_key.len())];
let mut auth_frame = [0u8; 64];
let prefix_bytes = key_prefix.as_bytes();
let prefix_len = prefix_bytes.len().min(16);
auth_frame[..prefix_len].copy_from_slice(&prefix_bytes[..prefix_len]);
let version = b"rust-sdk-v0.1";
auth_frame[16..16 + version.len()].copy_from_slice(version);
send.write_all(&auth_frame[..16 + version.len()])
.await
.map_err(|e| SdkError::connection(format!("Failed to send auth: {}", e)))?;
let mut response_buf = [0u8; 256];
let n = recv
.read(&mut response_buf)
.await
.map_err(|e| SdkError::connection(format!("Failed to read auth response: {}", e)))?
.ok_or_else(|| SdkError::auth("Connection closed during authentication"))?;
if n < 1 {
return Err(SdkError::auth("Empty auth response"));
}
let status = response_buf[0];
let message = String::from_utf8_lossy(&response_buf[1..n]).to_string();
if status == 0x01 {
debug!(message = %message, "QUIC authentication successful");
Ok(message)
} else {
Err(SdkError::auth(format!("Authentication failed: {}", message)))
}
}
async fn subscribe_stream<T, F>(
&self,
stream_type: StreamType,
decoder: F,
) -> Result<mpsc::Receiver<T>>
where
T: Send + 'static,
F: Fn(&[u8]) -> Option<T> + Send + Sync + 'static,
{
let connection = {
let guard = self.connection.read().await;
guard
.as_ref()
.ok_or(SdkError::NotConnected)?
.clone()
};
let (tx, rx) = mpsc::channel(32);
let mut send = connection
.open_uni()
.await
.map_err(|e| SdkError::connection(format!("Failed to open subscription stream: {}", e)))?;
send.write_all(&[stream_type as u8])
.await
.map_err(|e| SdkError::connection(format!("Failed to send subscription request: {}", e)))?;
if let Err(e) = send.finish().await {
debug!(
stream_type = ?stream_type,
error = %e,
"Stream finish returned error (harmless — server acknowledged subscription)"
);
}
debug!(
stream_type = ?stream_type,
"Subscription request sent, waiting for server stream"
);
let conn = connection.clone();
tokio::spawn(async move {
let mut recv = match conn.accept_uni().await {
Ok(r) => {
debug!(
stream_type = ?stream_type,
"Accepted server subscription stream"
);
r
}
Err(quinn::ConnectionError::ApplicationClosed(_)) => {
debug!("Connection closed before subscription stream accepted");
return;
}
Err(e) => {
warn!(
stream_type = ?stream_type,
error = %e,
"Failed to accept subscription stream from server"
);
return;
}
};
let mut msg_count: u64 = 0;
loop {
let mut len_buf = [0u8; 2];
match recv.read_exact(&mut len_buf).await {
Ok(()) => {}
Err(e) => {
if msg_count == 0 {
warn!(
stream_type = ?stream_type,
error = %e,
"Subscription stream closed before any data received"
);
} else {
debug!(
stream_type = ?stream_type,
messages_received = msg_count,
"Subscription stream ended"
);
}
break;
}
}
let msg_len = u16::from_be_bytes(len_buf) as usize;
if msg_len == 0 || msg_len > 4096 {
warn!(
stream_type = ?stream_type,
msg_len = msg_len,
"Invalid message length, closing subscription"
);
break;
}
let mut msg_buf = vec![0u8; msg_len];
match recv.read_exact(&mut msg_buf).await {
Ok(()) => {}
Err(e) => {
warn!(
stream_type = ?stream_type,
error = %e,
"Failed to read message body"
);
break;
}
}
msg_count += 1;
if let Some(data) = decoder(&msg_buf) {
if tx.send(data).await.is_err() {
debug!(
stream_type = ?stream_type,
"Receiver dropped, stopping subscription"
);
break;
}
} else if msg_count <= 3 {
debug!(
stream_type = ?stream_type,
msg_len = msg_len,
first_byte = msg_buf.first().copied().unwrap_or(0),
"Failed to decode subscription message"
);
}
}
});
Ok(rx)
}
}
impl Default for QuicTransport {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Transport for QuicTransport {
async fn connect(&mut self, config: &Config) -> Result<ConnectionInfo> {
if self.is_connected() {
let session_id = self.session_id.read().await.clone().unwrap_or_default();
debug!(session_id = %session_id, "QUIC already connected, reusing existing connection");
return Ok(ConnectionInfo {
session_id,
protocol: "quic".to_string(),
region: config.region.clone(),
server_time: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
features: vec!["streaming".to_string(), "bidirectional".to_string()],
rate_limit: crate::types::RateLimitInfo { rps: 1000, burst: 2000 },
});
}
let endpoint_url = config.get_endpoint(Protocol::Quic);
self.config = Some(config.clone());
debug!(endpoint = %endpoint_url, "Connecting via QUIC");
let (server_addr, server_name) = Self::parse_endpoint(&endpoint_url)?;
let client_config = Self::build_client_config()?;
let mut endpoint = Endpoint::client("0.0.0.0:0".parse().unwrap())
.map_err(|e| SdkError::connection(format!("Failed to create endpoint: {}", e)))?;
endpoint.set_default_client_config(client_config);
let connection = endpoint
.connect(server_addr, &server_name)
.map_err(|e| SdkError::connection(format!("Failed to initiate connection: {}", e)))?
.await
.map_err(|e| SdkError::connection(format!("Connection failed: {}", e)))?;
debug!(
remote = %connection.remote_address(),
"QUIC connection established"
);
let auth_result = self.authenticate(&connection, &config.api_key).await?;
self.endpoint = Some(endpoint);
{
let mut conn_guard = self.connection.write().await;
*conn_guard = Some(connection.clone());
}
let (session_id, server_region) = if auth_result.starts_with("ok:") {
let payload = &auth_result[3..];
let parts: Vec<&str> = payload.splitn(2, ':').collect();
let key_prefix = parts[0];
let region = parts.get(1).map(|r| r.to_string());
(format!("quic-{}", key_prefix), region)
} else {
(uuid::Uuid::new_v4().to_string(), None)
};
{
let mut session_guard = self.session_id.write().await;
*session_guard = Some(session_id.clone());
}
self.connected.store(true, Ordering::SeqCst);
let region = server_region.or_else(|| config.region.clone());
info!(
session_id = %session_id,
region = ?region,
remote = %connection.remote_address(),
"QUIC transport connected"
);
Ok(ConnectionInfo {
session_id,
protocol: "quic".to_string(),
region,
server_time: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
features: vec!["streaming".to_string(), "bidirectional".to_string()],
rate_limit: RateLimitInfo { rps: 1000, burst: 2000 },
})
}
async fn disconnect(&mut self) -> Result<()> {
self.connected.store(false, Ordering::SeqCst);
{
let mut conn_guard = self.connection.write().await;
if let Some(conn) = conn_guard.take() {
conn.close(0u32.into(), b"client_disconnect");
}
}
{
let mut session_guard = self.session_id.write().await;
*session_guard = None;
}
if let Some(endpoint) = self.endpoint.take() {
endpoint.wait_idle().await;
}
debug!("QUIC transport disconnected");
Ok(())
}
fn is_connected(&self) -> bool {
self.connected.load(Ordering::SeqCst)
}
fn protocol(&self) -> Protocol {
Protocol::Quic
}
async fn submit_transaction(
&self,
transaction: &[u8],
options: &SubmitOptions,
) -> Result<TransactionResult> {
if !self.is_connected() {
return Err(SdkError::NotConnected);
}
let connection = {
let guard = self.connection.read().await;
guard
.as_ref()
.ok_or(SdkError::NotConnected)?
.clone()
};
let request_id = self.next_request_id();
debug!(
request_id = request_id,
tx_size = transaction.len(),
"Submitting transaction via QUIC"
);
let (mut send, mut recv) = connection
.open_bi()
.await
.map_err(|e| SdkError::connection(format!("Failed to open transaction stream: {}", e)))?;
let mut frame = Vec::with_capacity(1 + transaction.len());
let stream_type = if options.tpu_submission {
StreamType::TpuSubmit
} else {
StreamType::TransactionSubmit
};
frame.push(stream_type as u8);
frame.extend_from_slice(transaction);
send.write_all(&frame)
.await
.map_err(|e| SdkError::connection(format!("Failed to send transaction: {}", e)))?;
send.finish()
.await
.map_err(|e| SdkError::connection(format!("Failed to finish send: {}", e)))?;
let response_buf = recv
.read_to_end(1024)
.await
.map_err(|e| SdkError::connection(format!("Failed to read response: {}", e)))?;
let n = response_buf.len();
if n < 6 {
return Err(SdkError::protocol(format!("Response too short: {} bytes", n)));
}
let _resp_request_id = u32::from_be_bytes([
response_buf[0],
response_buf[1],
response_buf[2],
response_buf[3],
]);
let status_byte = response_buf[4];
let has_signature = response_buf[5] != 0;
let mut offset = 6;
let signature = if has_signature && n >= offset + 64 {
let sig = &response_buf[offset..offset + 64];
offset += 64;
Some(bs58::encode(sig).into_string())
} else {
if has_signature { offset = n.min(offset + 64); }
None
};
let error = if n >= offset + 2 {
let error_len = u16::from_be_bytes([response_buf[offset], response_buf[offset + 1]]) as usize;
offset += 2;
if error_len > 0 && n >= offset + error_len {
let msg = String::from_utf8_lossy(&response_buf[offset..offset + error_len]).to_string();
offset += error_len;
Some(msg)
} else {
offset += error_len.min(n - offset);
None
}
} else {
None
};
let sender_id = if n > offset {
let sender_len = response_buf[offset] as usize;
offset += 1;
if sender_len > 0 && n >= offset + sender_len {
let s = String::from_utf8_lossy(&response_buf[offset..offset + sender_len]).to_string();
offset += sender_len;
Some(s)
} else {
offset += sender_len.min(n.saturating_sub(offset));
None
}
} else {
None
};
let region = if n > offset {
let region_len = response_buf[offset] as usize;
offset += 1;
if region_len > 0 && n >= offset + region_len {
let r = String::from_utf8_lossy(&response_buf[offset..offset + region_len]).to_string();
offset += region_len;
Some(r)
} else {
offset += region_len.min(n.saturating_sub(offset));
None
}
} else {
None
};
let slot_sent = if n > offset && response_buf[offset] == 1 {
offset += 1;
if n >= offset + 8 {
let slot = u64::from_be_bytes([
response_buf[offset], response_buf[offset+1],
response_buf[offset+2], response_buf[offset+3],
response_buf[offset+4], response_buf[offset+5],
response_buf[offset+6], response_buf[offset+7],
]);
offset += 8;
Some(slot)
} else {
None
}
} else {
if n > offset { offset += 1; }
None
};
let slot_accepted = if n > offset && response_buf[offset] == 1 {
offset += 1;
if n >= offset + 8 {
let slot = u64::from_be_bytes([
response_buf[offset], response_buf[offset+1],
response_buf[offset+2], response_buf[offset+3],
response_buf[offset+4], response_buf[offset+5],
response_buf[offset+6], response_buf[offset+7],
]);
offset += 8;
Some(slot)
} else {
None
}
} else {
if n > offset { offset += 1; }
None
};
let (latency_ms, sender_latency_ms) = if n >= offset + 8 {
let total = u32::from_be_bytes([
response_buf[offset], response_buf[offset+1],
response_buf[offset+2], response_buf[offset+3],
]);
let sender = u32::from_be_bytes([
response_buf[offset+4], response_buf[offset+5],
response_buf[offset+6], response_buf[offset+7],
]);
(total, sender)
} else {
(0, 0)
};
let status = match ResponseStatus::from_byte(status_byte) {
Some(ResponseStatus::Accepted) => TransactionStatus::Sent,
Some(ResponseStatus::Duplicate) => TransactionStatus::Duplicate,
Some(ResponseStatus::RateLimited) => TransactionStatus::RateLimited,
Some(ResponseStatus::ServerError) => TransactionStatus::Failed,
None => TransactionStatus::Failed,
};
let transaction_id = uuid::Uuid::new_v4().to_string();
let routing_latency = latency_ms.saturating_sub(sender_latency_ms);
let routing = if sender_id.is_some() || region.is_some() || latency_ms > 0 {
Some(crate::types::RoutingInfo {
region: region.unwrap_or_default(),
sender: sender_id.unwrap_or_default(),
routing_latency_ms: routing_latency,
sender_latency_ms,
total_latency_ms: latency_ms,
})
} else {
None
};
Ok(TransactionResult {
request_id: uuid::Uuid::new_v4().to_string(),
transaction_id,
signature,
status,
slot: None,
slot_sent,
slot_accepted,
slot_landed: None,
slot_delta: None,
commitment_level: None,
confirmations: None,
slot_processed: None,
slot_confirmed: None,
slot_finalized: None,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
routing,
error: error.map(|msg| crate::types::TransactionError {
code: "QUIC_ERROR".to_string(),
message: msg,
details: None,
}),
})
}
async fn subscribe_leader_hints(&self) -> Result<mpsc::Receiver<LeaderHint>> {
if !self.is_connected() {
return Err(SdkError::NotConnected);
}
self.subscribe_stream(StreamType::LeaderHints, |data| {
if data.len() < 2 || data[0] != StreamType::LeaderHints as u8 {
return None;
}
let mut offset = 1;
if offset >= data.len() { return None; }
let region_len = data[offset] as usize;
offset += 1;
if data.len() < offset + region_len + 7 { return None;
}
let preferred_region = String::from_utf8_lossy(&data[offset..offset + region_len]).to_string();
offset += region_len;
let confidence_raw = u16::from_be_bytes([data[offset], data[offset + 1]]);
let confidence = (confidence_raw / 100) as u32;
offset += 2;
let slots_remaining = u32::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
]);
offset += 4;
if offset >= data.len() { return None; }
let pubkey_len = data[offset] as usize;
offset += 1;
let leader_pubkey = if pubkey_len > 0 && offset + pubkey_len <= data.len() {
let pk = String::from_utf8_lossy(&data[offset..offset + pubkey_len]).to_string();
offset += pubkey_len;
pk
} else {
offset += pubkey_len.min(data.len().saturating_sub(offset));
String::new()
};
if data.len() < offset + 8 { return None; }
let timestamp = u64::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
data[offset + 4],
data[offset + 5],
data[offset + 6],
data[offset + 7],
]);
let slot = timestamp / 400;
Some(LeaderHint {
timestamp,
slot,
expires_at_slot: slot + slots_remaining as u64,
preferred_region,
backup_regions: vec![],
confidence,
leader_pubkey,
metadata: crate::types::LeaderHintMetadata {
tpu_rtt_ms: 0,
region_score: confidence as f64 / 100.0,
leader_tpu_address: None,
region_rtt_ms: None,
},
})
})
.await
}
async fn subscribe_tip_instructions(&self) -> Result<mpsc::Receiver<TipInstruction>> {
if !self.is_connected() {
return Err(SdkError::NotConnected);
}
self.subscribe_stream(StreamType::TipInstructions, |data| {
if data.len() < 2 || data[0] != StreamType::TipInstructions as u8 {
return None;
}
let mut offset = 1;
let sender_len = data[offset] as usize;
offset += 1;
if data.len() < offset + sender_len {
return None;
}
let sender = String::from_utf8_lossy(&data[offset..offset + sender_len]).to_string();
offset += sender_len;
if data.len() < offset + 1 {
return None;
}
let wallet_len = data[offset] as usize;
offset += 1;
if data.len() < offset + wallet_len {
return None;
}
let tip_wallet_address = String::from_utf8_lossy(&data[offset..offset + wallet_len]).to_string();
offset += wallet_len;
if data.len() < offset + 8 {
return None;
}
let tip_amount_lamports = u64::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
data[offset + 4],
data[offset + 5],
data[offset + 6],
data[offset + 7],
]);
let tip_amount_sol = tip_amount_lamports as f64 / 1_000_000_000.0;
offset += 8;
if data.len() < offset + 1 {
return None;
}
let tier_len = data[offset] as usize;
offset += 1;
if data.len() < offset + tier_len {
return None;
}
let tip_tier = String::from_utf8_lossy(&data[offset..offset + tier_len]).to_string();
offset += tier_len;
if data.len() < offset + 4 {
return None;
}
let expected_latency_ms = u32::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
]);
offset += 4;
if data.len() < offset + 4 {
return None;
}
let confidence = u32::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
]);
offset += 4;
if data.len() < offset + 8 {
return None;
}
let valid_until_slot = u64::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
data[offset + 4],
data[offset + 5],
data[offset + 6],
data[offset + 7],
]);
offset += 8;
if data.len() < offset + 8 {
return None;
}
let timestamp = u64::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
data[offset + 4],
data[offset + 5],
data[offset + 6],
data[offset + 7],
]);
Some(TipInstruction {
timestamp,
sender: sender.clone(),
sender_name: sender,
tip_wallet_address,
tip_amount_sol,
tip_tier,
expected_latency_ms,
confidence,
valid_until_slot,
alternative_senders: vec![],
})
})
.await
}
async fn subscribe_priority_fees(&self) -> Result<mpsc::Receiver<PriorityFee>> {
if !self.is_connected() {
return Err(SdkError::NotConnected);
}
self.subscribe_stream(StreamType::PriorityFees, |data| {
if data.len() < 22 || data[0] != StreamType::PriorityFees as u8 {
return None;
}
let compute_unit_price = u64::from_be_bytes([
data[1], data[2], data[3], data[4], data[5], data[6], data[7], data[8],
]);
let percentile = data[9];
let sample_count = u32::from_be_bytes([data[10], data[11], data[12], data[13]]);
let timestamp = u64::from_be_bytes([
data[14], data[15], data[16], data[17], data[18], data[19], data[20], data[21],
]);
let speed = match percentile {
0..=50 => "low",
51..=75 => "medium",
_ => "high",
}
.to_string();
let compute_unit_limit = 200_000u32;
let estimated_cost_sol = (compute_unit_price * compute_unit_limit as u64) as f64 / 1e15;
let landing_probability = match percentile {
0..=25 => 50,
26..=50 => 70,
51..=75 => 85,
76..=90 => 95,
_ => 99,
};
Some(PriorityFee {
timestamp,
speed,
compute_unit_price,
compute_unit_limit,
estimated_cost_sol,
landing_probability,
network_congestion: if compute_unit_price > 100_000 { "high" } else if compute_unit_price > 10_000 { "medium" } else { "low" }.to_string(),
recent_success_rate: sample_count as f64 / 100.0, })
})
.await
}
async fn subscribe_latest_blockhash(&self) -> Result<mpsc::Receiver<LatestBlockhash>> {
if !self.is_connected() {
return Err(SdkError::NotConnected);
}
self.subscribe_stream(StreamType::LatestBlockhash, |data| {
if data.len() < 2 || data[0] != StreamType::LatestBlockhash as u8 {
return None;
}
let mut offset = 1;
let hash_len = data[offset] as usize;
offset += 1;
if data.len() < offset + hash_len + 16 {
return None;
}
let blockhash = String::from_utf8_lossy(&data[offset..offset + hash_len]).to_string();
offset += hash_len;
let last_valid_block_height = u64::from_be_bytes([
data[offset], data[offset + 1], data[offset + 2], data[offset + 3],
data[offset + 4], data[offset + 5], data[offset + 6], data[offset + 7],
]);
offset += 8;
let timestamp = u64::from_be_bytes([
data[offset], data[offset + 1], data[offset + 2], data[offset + 3],
data[offset + 4], data[offset + 5], data[offset + 6], data[offset + 7],
]);
Some(LatestBlockhash {
blockhash,
last_valid_block_height,
timestamp,
})
})
.await
}
async fn subscribe_latest_slot(&self) -> Result<mpsc::Receiver<LatestSlot>> {
if !self.is_connected() {
return Err(SdkError::NotConnected);
}
self.subscribe_stream(StreamType::LatestSlot, |data| {
if data.len() < 17 || data[0] != StreamType::LatestSlot as u8 {
return None;
}
let slot = u64::from_be_bytes([
data[1], data[2], data[3], data[4],
data[5], data[6], data[7], data[8],
]);
let timestamp = u64::from_be_bytes([
data[9], data[10], data[11], data[12],
data[13], data[14], data[15], data[16],
]);
Some(LatestSlot {
slot,
timestamp,
})
})
.await
}
async fn ping(&self) -> Result<PingResult> {
if !self.is_connected() {
return Err(SdkError::NotConnected);
}
let connection = {
let guard = self.connection.read().await;
guard
.as_ref()
.ok_or(SdkError::NotConnected)?
.clone()
};
let seq = self.ping_seq.fetch_add(1, Ordering::Relaxed);
let client_send_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let (mut send, mut recv) = connection
.open_bi()
.await
.map_err(|e| SdkError::connection(format!("Failed to open ping stream: {}", e)))?;
let mut frame = [0u8; 13];
frame[0] = StreamType::Ping as u8;
frame[1..5].copy_from_slice(&seq.to_be_bytes());
frame[5..13].copy_from_slice(&client_send_time.to_be_bytes());
send.write_all(&frame)
.await
.map_err(|e| SdkError::connection(format!("Failed to send ping: {}", e)))?;
let _ = send.finish();
let mut response = [0u8; 21];
recv.read_exact(&mut response)
.await
.map_err(|e| SdkError::connection(format!("Failed to read pong: {}", e)))?;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let _resp_type = response[0]; let _resp_seq = u32::from_be_bytes([response[1], response[2], response[3], response[4]]);
let _resp_client_time = u64::from_be_bytes(response[5..13].try_into().unwrap());
let server_time = u64::from_be_bytes(response[13..21].try_into().unwrap());
let rtt_ms = now.saturating_sub(client_send_time);
let clock_offset_ms = server_time as i64 - (client_send_time as i64 + rtt_ms as i64 / 2);
Ok(PingResult {
seq,
rtt_ms,
clock_offset_ms,
server_time,
})
}
}
#[derive(Debug)]
struct SkipServerVerification;
impl rustls::client::ServerCertVerifier for SkipServerVerification {
fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: std::time::SystemTime,
) -> std::result::Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_quic_transport_new() {
let transport = QuicTransport::new();
assert!(!transport.is_connected());
assert_eq!(transport.protocol(), Protocol::Quic);
}
#[test]
fn test_parse_endpoint() {
let result = QuicTransport::parse_endpoint("quic://127.0.0.1:4433");
assert!(result.is_ok());
let (addr, host) = result.unwrap();
assert_eq!(addr.port(), 4433);
assert_eq!(host, "127.0.0.1");
let result = QuicTransport::parse_endpoint("127.0.0.1:4433");
assert!(result.is_ok());
let result = QuicTransport::parse_endpoint("invalid");
assert!(result.is_err());
}
#[test]
fn test_request_id_generation() {
let transport = QuicTransport::new();
let id1 = transport.next_request_id();
let id2 = transport.next_request_id();
assert_eq!(id1, 0);
assert_eq!(id2, 1);
}
#[test]
fn test_stream_type_values() {
assert_eq!(StreamType::TransactionSubmit as u8, 0x01);
assert_eq!(StreamType::LeaderHints as u8, 0x02);
assert_eq!(StreamType::TipInstructions as u8, 0x03);
assert_eq!(StreamType::PriorityFees as u8, 0x04);
assert_eq!(StreamType::Metrics as u8, 0x05);
assert_eq!(StreamType::LatestBlockhash as u8, 0x06);
assert_eq!(StreamType::LatestSlot as u8, 0x07);
}
#[test]
fn test_response_status_parsing() {
assert_eq!(ResponseStatus::from_byte(0x01), Some(ResponseStatus::Accepted));
assert_eq!(ResponseStatus::from_byte(0x02), Some(ResponseStatus::Duplicate));
assert_eq!(ResponseStatus::from_byte(0x03), Some(ResponseStatus::RateLimited));
assert_eq!(ResponseStatus::from_byte(0x04), Some(ResponseStatus::ServerError));
assert_eq!(ResponseStatus::from_byte(0x00), None);
assert_eq!(ResponseStatus::from_byte(0x05), None);
}
}