use std::{
net::TcpListener,
thread::sleep,
time::{Duration, Instant},
};
use crossbeam_channel::{Receiver, Sender};
use solana_commitment_config::CommitmentConfig;
use solana_keypair::Keypair;
use solana_pubkey::Pubkey;
use solana_rpc_client::rpc_client::RpcClient;
use solana_signer::Signer;
use surfpool_core::surfnet::{
locker::SurfnetSvmLocker,
svm::{SurfnetSvm, SurfnetSvmConfig},
};
use surfpool_types::{
BlockProductionMode, RpcConfig, SimnetCommand, SimnetConfig, SimnetEvent, SurfpoolConfig,
};
use crate::{
Cheatcodes,
error::{SurfnetError, SurfnetResult},
};
pub struct SurfnetBuilder {
offline_mode: bool,
remote_rpc_url: Option<String>,
block_production_mode: BlockProductionMode,
slot_time_ms: u64,
airdrop_addresses: Vec<Pubkey>,
airdrop_lamports: u64,
payer: Option<Keypair>,
}
impl Default for SurfnetBuilder {
fn default() -> Self {
Self {
offline_mode: true,
remote_rpc_url: None,
block_production_mode: BlockProductionMode::Transaction,
slot_time_ms: 1,
airdrop_addresses: vec![],
airdrop_lamports: 10_000_000_000, payer: None,
}
}
}
impl SurfnetBuilder {
pub fn offline(mut self, offline: bool) -> Self {
self.offline_mode = offline;
self
}
pub fn remote_rpc_url(mut self, url: impl Into<String>) -> Self {
self.remote_rpc_url = Some(url.into());
self.offline_mode = false;
self
}
pub fn block_production_mode(mut self, mode: BlockProductionMode) -> Self {
self.block_production_mode = mode;
self
}
pub fn slot_time_ms(mut self, ms: u64) -> Self {
self.slot_time_ms = ms;
self
}
pub fn airdrop_addresses(mut self, addresses: Vec<Pubkey>) -> Self {
self.airdrop_addresses = addresses;
self
}
pub fn airdrop_sol(mut self, lamports: u64) -> Self {
self.airdrop_lamports = lamports;
self
}
pub fn payer(mut self, keypair: Keypair) -> Self {
self.payer = Some(keypair);
self
}
pub async fn start(self) -> SurfnetResult<Surfnet> {
let payer = self.payer.unwrap_or_else(Keypair::new);
let airdrop_lamports = self.airdrop_lamports;
let bind_port = get_free_port()?;
let ws_port = get_free_port()?;
let bind_host = "127.0.0.1".to_string();
let mut startup_airdrop_addresses = vec![payer.pubkey()];
startup_airdrop_addresses.extend(self.airdrop_addresses);
let surfpool_config = SurfpoolConfig {
simnets: vec![SimnetConfig {
offline_mode: self.offline_mode,
remote_rpc_url: self.remote_rpc_url,
slot_time: self.slot_time_ms,
block_production_mode: self.block_production_mode,
airdrop_addresses: startup_airdrop_addresses.clone(),
airdrop_token_amount: airdrop_lamports,
..Default::default()
}],
rpc: RpcConfig {
bind_host: bind_host.clone(),
bind_port,
ws_port,
..Default::default()
},
..Default::default()
};
let rpc_url = format!("http://{bind_host}:{bind_port}");
let ws_url = format!("ws://{bind_host}:{ws_port}");
let svm_config = SurfnetSvmConfig {
surfnet_id: surfpool_config.simnets[0].surfnet_id.clone(),
slot_time: surfpool_config.simnets[0].slot_time,
instruction_profiling_enabled: surfpool_config.simnets[0].instruction_profiling_enabled,
max_profiles: surfpool_config.simnets[0].max_profiles,
log_bytes_limit: surfpool_config.simnets[0].log_bytes_limit,
..SurfnetSvmConfig::default()
};
let (surfnet_svm, simnet_events_rx, geyser_events_rx) = SurfnetSvm::new(svm_config)
.map_err(|e| SurfnetError::Runtime(format!("failed to initialize Surfnet SVM: {e}")))?;
let (simnet_commands_tx, simnet_commands_rx) = crossbeam_channel::unbounded();
let svm_locker = SurfnetSvmLocker::new(surfnet_svm);
let svm_locker_clone = svm_locker.clone();
let simnet_commands_tx_clone = simnet_commands_tx.clone();
let _handle = std::thread::Builder::new()
.name("surfnet-sdk".into())
.spawn(move || {
let future = surfpool_core::runloops::start_local_surfnet_runloop(
svm_locker_clone,
surfpool_config,
simnet_commands_tx_clone,
simnet_commands_rx,
geyser_events_rx,
);
if let Err(e) = hiro_system_kit::nestable_block_on(future) {
log::error!("Surfnet exited with error: {e}");
}
})
.map_err(|e| SurfnetError::Runtime(e.to_string()))?;
wait_for_ready(&simnet_events_rx)?;
wait_for_startup_airdrops(&rpc_url, &startup_airdrop_addresses, airdrop_lamports)?;
Ok(Surfnet {
rpc_url,
ws_url,
payer,
simnet_commands_tx,
simnet_events_rx,
svm_locker,
instance_id: uuid::Uuid::new_v4().to_string(),
})
}
}
pub struct Surfnet {
rpc_url: String,
ws_url: String,
payer: Keypair,
simnet_commands_tx: Sender<SimnetCommand>,
simnet_events_rx: Receiver<SimnetEvent>,
#[allow(dead_code)] svm_locker: SurfnetSvmLocker,
instance_id: String,
}
impl Surfnet {
pub async fn start() -> SurfnetResult<Self> {
SurfnetBuilder::default().start().await
}
pub fn builder() -> SurfnetBuilder {
SurfnetBuilder::default()
}
pub fn rpc_url(&self) -> &str {
&self.rpc_url
}
pub fn ws_url(&self) -> &str {
&self.ws_url
}
pub fn rpc_client(&self) -> RpcClient {
RpcClient::new(&self.rpc_url)
}
pub fn payer(&self) -> &Keypair {
&self.payer
}
pub fn cheatcodes(&self) -> Cheatcodes<'_> {
Cheatcodes::new(&self.rpc_url)
}
pub fn events(&self) -> &Receiver<SimnetEvent> {
&self.simnet_events_rx
}
pub fn send_command(&self, command: SimnetCommand) -> SurfnetResult<()> {
self.simnet_commands_tx
.send(command)
.map_err(|e| SurfnetError::Runtime(format!("failed to send command: {e}")))
}
pub fn instance_id(&self) -> &str {
&self.instance_id
}
}
impl Drop for Surfnet {
fn drop(&mut self) {
let _ = self.simnet_commands_tx.send(SimnetCommand::Terminate(None));
}
}
fn get_free_port() -> SurfnetResult<u16> {
let listener = TcpListener::bind("127.0.0.1:0")
.map_err(|e| SurfnetError::PortAllocation(e.to_string()))?;
let port = listener
.local_addr()
.map_err(|e| SurfnetError::PortAllocation(e.to_string()))?
.port();
drop(listener);
Ok(port)
}
fn wait_for_ready(events_rx: &Receiver<SimnetEvent>) -> SurfnetResult<()> {
loop {
match events_rx.recv() {
Ok(SimnetEvent::Ready(_)) => return Ok(()),
Ok(SimnetEvent::Aborted(err)) => return Err(SurfnetError::Aborted(err)),
Ok(SimnetEvent::Shutdown) => {
return Err(SurfnetError::Aborted(
"surfnet shut down during startup".into(),
));
}
Ok(_) => continue,
Err(e) => {
return Err(SurfnetError::Startup(format!(
"events channel closed unexpectedly: {e}"
)));
}
}
}
}
fn wait_for_startup_airdrops(
rpc_url: &str,
addresses: &[Pubkey],
expected_lamports: u64,
) -> SurfnetResult<()> {
let rpc_client = RpcClient::new(rpc_url.to_string());
let deadline = Instant::now() + Duration::from_secs(5);
let mut last_error = None;
let mut last_balances = vec![];
while Instant::now() < deadline {
last_balances.clear();
let mut all_match = true;
for address in addresses {
match rpc_client.get_balance_with_commitment(address, CommitmentConfig::processed()) {
Ok(response) => {
last_balances.push((address.to_string(), response.value));
if response.value != expected_lamports {
all_match = false;
}
}
Err(err) => {
last_error = Some(err.to_string());
all_match = false;
break;
}
}
}
if all_match {
return Ok(());
}
sleep(Duration::from_millis(25));
}
let balance_summary = if last_balances.is_empty() {
"no balances observed".to_string()
} else {
last_balances
.iter()
.map(|(address, balance)| format!("{address}={balance}"))
.collect::<Vec<_>>()
.join(", ")
};
Err(SurfnetError::Startup(format!(
"startup balances not visible over RPC within timeout (expected {expected_lamports}); last balances: {balance_summary}; last error: {}",
last_error.unwrap_or_else(|| "none".to_string())
)))
}