1use solana_message::{v0::LoadedAddresses, SimpleAddressLoader};
2use solana_sdk::transaction::MessageHash;
3use solana_transaction::sanitized::SanitizedTransaction;
4use solana_transaction_status::{InnerInstruction, InnerInstructions, TransactionStatusMeta};
5use std::{
6 collections::HashSet,
7 net::SocketAddr,
8 sync::Arc,
9 thread::{sleep, JoinHandle},
10 time::{Duration, Instant},
11};
12use surfpool_subgraph::SurfpoolSubgraphPlugin;
13use tokio::sync::RwLock;
14
15use crate::{
16 rpc::{
17 self, accounts_data::AccountsData, accounts_scan::AccountsScan, admin::AdminRpc,
18 bank_data::BankData, full::Full, minimal::Minimal, svm_tricks::SvmTricksRpc,
19 SurfpoolMiddleware,
20 },
21 surfnet::{GeyserEvent, SurfnetSvm},
22 PluginManagerCommand,
23};
24use agave_geyser_plugin_interface::geyser_plugin_interface::{
25 GeyserPlugin, ReplicaTransactionInfoV2, ReplicaTransactionInfoVersions,
26};
27use crossbeam::select;
28use crossbeam_channel::{unbounded, Receiver, Sender};
29use ipc_channel::{
30 ipc::{IpcOneShotServer, IpcReceiver},
31 router::RouterProxy,
32};
33use jsonrpc_core::MetaIoHandler;
34use jsonrpc_http_server::{DomainsValidation, ServerBuilder};
35use surfpool_types::{
36 BlockProductionMode, ClockCommand, ClockEvent, SchemaDataSourcingEvent, SubgraphPluginConfig,
37};
38use surfpool_types::{SimnetCommand, SimnetEvent, SubgraphCommand, SurfpoolConfig};
39
40const BLOCKHASH_SLOT_TTL: u64 = 75;
41
42pub async fn start_local_surfnet_runloop(
43 mut surfnet_svm: SurfnetSvm,
44 config: SurfpoolConfig,
45 subgraph_commands_tx: Sender<SubgraphCommand>,
46 simnet_commands_tx: Sender<SimnetCommand>,
47 simnet_commands_rx: Receiver<SimnetCommand>,
48 geyser_events_rx: Receiver<GeyserEvent>,
49) -> Result<(), Box<dyn std::error::Error>> {
50 let Some(simnet) = config.simnets.first() else {
51 return Ok(());
52 };
53 let block_production_mode = simnet.block_production_mode.clone();
54 surfnet_svm.airdrop_pubkeys(simnet.airdrop_token_amount, &simnet.airdrop_addresses);
55 let _ = surfnet_svm.connect(&simnet.remote_rpc_url).await?;
56
57 let simnet_events_tx_cc = surfnet_svm.simnet_events_tx.clone();
60 let svm_locker = Arc::new(RwLock::new(surfnet_svm));
61 let (plugin_manager_commands_rx, _rpc_handle) =
62 start_rpc_server_runloop(&config, &simnet_commands_tx, svm_locker.clone()).await?;
63
64 let simnet_config = simnet.clone();
65
66 if !config.plugin_config_path.is_empty() {
67 match start_geyser_runloop(
68 plugin_manager_commands_rx,
69 subgraph_commands_tx.clone(),
70 simnet_events_tx_cc.clone(),
71 geyser_events_rx,
72 ) {
73 Ok(_) => {}
74 Err(e) => {
75 let _ = simnet_events_tx_cc
76 .send(SimnetEvent::error(format!("Geyser plugin failed: {e}")));
77 }
78 };
79 }
80
81 let (clock_event_rx, clock_command_tx) = start_clock_runloop(simnet_config.slot_time);
82
83 start_block_production_runloop(
84 clock_event_rx,
85 clock_command_tx,
86 simnet_commands_rx,
87 svm_locker,
88 block_production_mode,
89 )
90 .await
91}
92
93pub async fn start_block_production_runloop(
94 clock_event_rx: Receiver<ClockEvent>,
95 clock_command_tx: Sender<ClockCommand>,
96 simnet_commands_rx: Receiver<SimnetCommand>,
97 svm_locker: Arc<RwLock<SurfnetSvm>>,
98 mut block_production_mode: BlockProductionMode,
99) -> Result<(), Box<dyn std::error::Error>> {
100 loop {
101 let mut do_produce_block = false;
102
103 select! {
104 recv(clock_event_rx) -> msg => if let Ok(event) = msg {
105 match event {
106 ClockEvent::Tick => {
107 if block_production_mode.eq(&BlockProductionMode::Clock) {
108 do_produce_block = true;
109 }
110 }
111 ClockEvent::ExpireBlockHash => {
112 do_produce_block = true;
113 }
114 }
115 },
116 recv(simnet_commands_rx) -> msg => if let Ok(event) = msg {
117 match event {
118 SimnetCommand::SlotForward(_key) => {
119 block_production_mode = BlockProductionMode::Manual;
120 do_produce_block = true;
121 }
122 SimnetCommand::SlotBackward(_key) => {
123
124 }
125 SimnetCommand::UpdateClock(update) => {
126 let _ = clock_command_tx.send(update);
127 continue
128 }
129 SimnetCommand::UpdateBlockProductionMode(update) => {
130 block_production_mode = update;
131 continue
132 }
133 SimnetCommand::TransactionReceived(_key, transaction, status_tx, skip_preflight) => {
134 let mut svm_writer = svm_locker.write().await;
135 svm_writer.process_transaction(transaction, status_tx ,skip_preflight).await?;
136 }
137 SimnetCommand::Terminate(_) => {
138 std::process::exit(0)
139 }
140 }
141 },
142 }
143
144 {
145 if do_produce_block {
146 let mut svm_writer = svm_locker.write().await;
147 svm_writer.confirm_transactions()?;
148 svm_writer.finalize_transactions()?;
149 svm_writer.new_blockhash();
150 }
151 }
152 }
153}
154
155pub fn start_clock_runloop(mut slot_time: u64) -> (Receiver<ClockEvent>, Sender<ClockCommand>) {
156 let (clock_event_tx, clock_event_rx) = unbounded::<ClockEvent>();
157 let (clock_command_tx, clock_command_rx) = unbounded::<ClockCommand>();
158
159 let _handle = hiro_system_kit::thread_named("clock").spawn(move || {
160 let mut enabled = true;
161 let mut block_hash_timeout = Instant::now();
162
163 loop {
164 match clock_command_rx.try_recv() {
165 Ok(ClockCommand::Pause) => {
166 enabled = false;
167 }
168 Ok(ClockCommand::Resume) => {
169 enabled = true;
170 }
171 Ok(ClockCommand::Toggle) => {
172 enabled = !enabled;
173 }
174 Ok(ClockCommand::UpdateSlotInterval(updated_slot_time)) => {
175 slot_time = updated_slot_time;
176 }
177 Err(_e) => {}
178 }
179 sleep(Duration::from_millis(slot_time));
180 if enabled {
181 let _ = clock_event_tx.send(ClockEvent::Tick);
182 if block_hash_timeout.elapsed()
184 > Duration::from_millis(BLOCKHASH_SLOT_TTL * slot_time)
185 {
186 let _ = clock_event_tx.send(ClockEvent::ExpireBlockHash);
187 block_hash_timeout = Instant::now();
188 }
189 }
190 }
191 });
192
193 (clock_event_rx, clock_command_tx)
194}
195
196fn start_geyser_runloop(
197 plugin_manager_commands_rx: Receiver<PluginManagerCommand>,
198 subgraph_commands_tx: Sender<SubgraphCommand>,
199 simnet_events_tx: Sender<SimnetEvent>,
200 geyser_events_rx: Receiver<GeyserEvent>,
201) -> Result<JoinHandle<Result<(), String>>, String> {
202 let handle = hiro_system_kit::thread_named("Geyser Plugins Handler").spawn(move || {
203 let mut plugin_manager = vec![];
204
205 let ipc_router = RouterProxy::new();
206 let err = loop {
244 select! {
245 recv(plugin_manager_commands_rx) -> msg => {
246 match msg {
247 Ok(event) => {
248 match event {
249 PluginManagerCommand::LoadConfig(uuid, config, notifier) => {
250 let _ = subgraph_commands_tx.send(SubgraphCommand::CreateSubgraph(uuid, config.data.clone(), notifier));
251 let mut plugin = SurfpoolSubgraphPlugin::default();
252
253 let (server, ipc_token) = IpcOneShotServer::<IpcReceiver<SchemaDataSourcingEvent>>::new().expect("Failed to create IPC one-shot server.");
254 let subgraph_plugin_config = SubgraphPluginConfig {
255 uuid,
256 ipc_token,
257 subgraph_request: config.data.clone()
258 };
259
260 let config_file = match serde_json::to_string(&subgraph_plugin_config) {
261 Ok(c) => c,
262 Err(e) => {
263 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to serialize subgraph plugin config: {:?}", e)));
264 continue;
265 }
266 };
267
268 if let Err(e) = plugin.on_load(&config_file, false) {
269 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to load Geyser plugin: {:?}", e)));
270 };
271 if let Ok((_, rx)) = server.accept() {
272 let subgraph_rx = ipc_router.route_ipc_receiver_to_new_crossbeam_receiver::<SchemaDataSourcingEvent>(rx);
273 let _ = subgraph_commands_tx.send(SubgraphCommand::ObserveSubgraph(subgraph_rx));
274 };
275 let plugin: Box<dyn GeyserPlugin> = Box::new(plugin);
276 plugin_manager.push(plugin);
277 let _ = simnet_events_tx.send(SimnetEvent::PluginLoaded("surfpool-subgraph".into()));
278 }
279 }
280 },
281 Err(e) => {
282 break format!("Failed to read plugin manager command: {:?}", e);
283 },
284 }
285 },
286 recv(geyser_events_rx) -> msg => match msg {
287 Err(e) => {
288 break format!("Failed to read new transaction to send to Geyser plugin: {e}");
289 },
290 Ok(GeyserEvent::NewTransaction(transaction, transaction_metadata, slot)) => {
291 let mut inner_instructions = vec![];
292 for (i,inner) in transaction_metadata.inner_instructions.iter().enumerate() {
293 inner_instructions.push(
294 InnerInstructions {
295 index: i as u8,
296 instructions: inner.iter().map(|i| InnerInstruction {
297 instruction: i.instruction.clone(),
298 stack_height: Some(i.stack_height as u32)
299 }).collect()
300 }
301 )
302 }
303
304 let transaction_status_meta = TransactionStatusMeta {
305 status: Ok(()),
306 fee: 0,
307 pre_balances: vec![],
308 post_balances: vec![],
309 inner_instructions: Some(inner_instructions),
310 log_messages: Some(transaction_metadata.logs.clone()),
311 pre_token_balances: None,
312 post_token_balances: None,
313 rewards: None,
314 loaded_addresses: LoadedAddresses {
315 writable: vec![],
316 readonly: vec![],
317 },
318 return_data: Some(transaction_metadata.return_data.clone()),
319 compute_units_consumed: Some(transaction_metadata.compute_units_consumed),
320 };
321
322 let transaction = match SanitizedTransaction::try_create(transaction, MessageHash::Compute, None, SimpleAddressLoader::Disabled, &HashSet::new()) {
323 Ok(tx) => tx,
324 Err(e) => {
325 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify Geyser plugin of new transaction: failed to serialize transaction: {:?}", e)));
326 continue;
327 }
328 };
329
330 let transaction_replica = ReplicaTransactionInfoV2 {
331 signature: &transaction_metadata.signature,
332 is_vote: false,
333 transaction: &transaction,
334 transaction_status_meta: &transaction_status_meta,
335 index: 0
336 };
337 for plugin in plugin_manager.iter() {
338 if let Err(e) = plugin.notify_transaction(ReplicaTransactionInfoVersions::V0_0_2(&transaction_replica), slot) {
339 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify Geyser plugin of new transaction: {:?}", e)));
340 };
341 }
342 }
343 }
344 }
345 };
346 Err(err)
347 }).map_err(|e| format!("Failed to spawn Geyser Plugins Handler thread: {:?}", e))?;
348 Ok(handle)
349}
350
351async fn start_rpc_server_runloop(
352 config: &SurfpoolConfig,
353 simnet_commands_tx: &Sender<SimnetCommand>,
354 svm_locker: Arc<RwLock<SurfnetSvm>>,
355) -> Result<(Receiver<PluginManagerCommand>, JoinHandle<()>), String> {
356 let (plugin_manager_commands_tx, plugin_manager_commands_rx) = unbounded();
357 let simnet_events_tx = svm_locker.read().await.simnet_events_tx.clone();
358
359 let middleware = SurfpoolMiddleware::new(
360 svm_locker,
361 &simnet_commands_tx,
362 &plugin_manager_commands_tx,
363 &config.rpc,
364 );
365 let server_bind: SocketAddr = config
366 .rpc
367 .get_socket_address()
368 .parse::<SocketAddr>()
369 .map_err(|e| e.to_string())?;
370
371 let mut io = MetaIoHandler::with_middleware(middleware);
372 io.extend_with(rpc::minimal::SurfpoolMinimalRpc.to_delegate());
373 io.extend_with(rpc::full::SurfpoolFullRpc.to_delegate());
374 io.extend_with(rpc::accounts_data::SurfpoolAccountsDataRpc.to_delegate());
375 io.extend_with(rpc::accounts_scan::SurfpoolAccountsScanRpc.to_delegate());
376 io.extend_with(rpc::bank_data::SurfpoolBankDataRpc.to_delegate());
377 io.extend_with(rpc::svm_tricks::SurfpoolSvmTricksRpc.to_delegate());
378 io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate());
379
380 if !config.plugin_config_path.is_empty() {
381 io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate());
382 }
383
384 let _ = std::net::TcpListener::bind(server_bind)
385 .map_err(|e| format!("Failed to start RPC server: {}", e))?;
386
387 let _handle = hiro_system_kit::thread_named("RPC Handler")
388 .spawn(move || {
389 let server = match ServerBuilder::new(io)
390 .cors(DomainsValidation::Disabled)
391 .start_http(&server_bind)
392 {
393 Ok(server) => server,
394 Err(e) => {
395 let _ = simnet_events_tx.send(SimnetEvent::Aborted(format!(
396 "Failed to start RPC server: {:?}",
397 e
398 )));
399 return;
400 }
401 };
402
403 server.wait();
404 let _ = simnet_events_tx.send(SimnetEvent::Shutdown);
405 })
406 .map_err(|e| format!("Failed to spawn RPC Handler thread: {:?}", e))?;
407 Ok((plugin_manager_commands_rx, _handle))
408}