pub mod grpc;
pub mod http;
pub mod health;
pub mod quic;
pub mod selector;
pub mod websocket;
use crate::config::{Config, ProtocolTimeouts};
use crate::error::{Result, SdkError};
use crate::types::{
ConnectionInfo, LatestBlockhash, LatestSlot, LeaderHint, PingResult, PriorityFee, Protocol,
SubmitOptions, TipInstruction, TransactionResult,
};
use async_trait::async_trait;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
pub use async_trait::async_trait as transport_trait;
#[async_trait]
pub trait Transport: Send + Sync {
async fn connect(&mut self, config: &Config) -> Result<ConnectionInfo>;
async fn disconnect(&mut self) -> Result<()>;
fn is_connected(&self) -> bool;
fn protocol(&self) -> Protocol;
async fn submit_transaction(
&self,
transaction: &[u8],
options: &SubmitOptions,
) -> Result<TransactionResult>;
async fn subscribe_leader_hints(&self) -> Result<mpsc::Receiver<LeaderHint>>;
async fn subscribe_tip_instructions(&self) -> Result<mpsc::Receiver<TipInstruction>>;
async fn subscribe_priority_fees(&self) -> Result<mpsc::Receiver<PriorityFee>>;
async fn subscribe_latest_blockhash(&self) -> Result<mpsc::Receiver<LatestBlockhash>>;
async fn subscribe_latest_slot(&self) -> Result<mpsc::Receiver<LatestSlot>>;
async fn ping(&self) -> Result<PingResult> {
Err(SdkError::connection("ping not supported on this transport"))
}
}
pub struct FallbackChain {
timeouts: ProtocolTimeouts,
}
impl FallbackChain {
pub fn new(timeouts: ProtocolTimeouts) -> Self {
Self { timeouts }
}
pub fn timeout_for(&self, protocol: Protocol) -> Duration {
match protocol {
Protocol::Quic => self.timeouts.quic,
Protocol::Grpc => self.timeouts.grpc,
Protocol::WebSocket => self.timeouts.websocket,
Protocol::Http => self.timeouts.http,
}
}
pub async fn connect(&self, config: &Config) -> Result<Box<dyn Transport>> {
let mut resolved_config = config.clone();
if config.endpoint.is_none() && config.selected_worker.is_none() {
debug!("No endpoint configured, performing worker selection");
let selector = selector::WorkerSelectorBuilder::new()
.add_worker_host("local-1", "us-east", "127.0.0.1")
.build();
match selector.select_best().await {
Ok(worker) => {
info!(worker = %worker.id, "Selected best worker");
resolved_config.selected_worker = Some(worker.clone());
}
Err(e) => {
warn!(error = %e, "Worker selection failed, falling back to defaults");
}
}
}
let config = &resolved_config;
if let Some(preferred) = config.preferred_protocol {
return self.try_protocol(config, preferred).await;
}
let mut last_error = None;
for protocol in Protocol::fallback_order() {
if let Some(ref worker) = config.selected_worker {
if worker.get_endpoint(*protocol).is_none() {
debug!(protocol = ?protocol, "No endpoint configured, skipping");
continue;
}
}
match self.try_protocol(config, *protocol).await {
Ok(transport) => {
info!(protocol = ?protocol, "Connected successfully");
return Ok(transport);
}
Err(e) => {
warn!(protocol = ?protocol, error = %e, "Protocol failed, trying next");
last_error = Some(e);
}
}
}
Err(last_error.unwrap_or(SdkError::AllProtocolsFailed))
}
async fn try_protocol(&self, config: &Config, protocol: Protocol) -> Result<Box<dyn Transport>> {
let timeout = self.timeout_for(protocol);
debug!(protocol = ?protocol, timeout_ms = timeout.as_millis(), "Attempting protocol");
match protocol {
Protocol::Http => {
let mut transport = http::HttpTransport::new();
tokio::time::timeout(timeout, transport.connect(config))
.await
.map_err(|_| SdkError::Timeout(timeout))??;
Ok(Box::new(transport))
}
Protocol::WebSocket => {
let mut transport = websocket::WebSocketTransport::new();
tokio::time::timeout(timeout, transport.connect(config))
.await
.map_err(|_| SdkError::Timeout(timeout))??;
Ok(Box::new(transport))
}
Protocol::Grpc => {
let mut transport = grpc::GrpcTransport::new();
tokio::time::timeout(timeout, transport.connect(config))
.await
.map_err(|_| SdkError::Timeout(timeout))??;
Ok(Box::new(transport))
}
Protocol::Quic => {
let mut transport = quic::QuicTransport::new();
tokio::time::timeout(timeout, transport.connect(config))
.await
.map_err(|_| SdkError::Timeout(timeout))??;
Ok(Box::new(transport))
}
}
}
}
impl Default for FallbackChain {
fn default() -> Self {
Self::new(ProtocolTimeouts::default())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_fallback_chain_timeouts() {
let chain = FallbackChain::default();
assert_eq!(chain.timeout_for(Protocol::Quic), Duration::from_millis(2000));
assert_eq!(chain.timeout_for(Protocol::Grpc), Duration::from_millis(3000));
assert_eq!(chain.timeout_for(Protocol::WebSocket), Duration::from_millis(3000));
assert_eq!(chain.timeout_for(Protocol::Http), Duration::from_millis(5000));
}
}