1use std::{
2 collections::{HashMap, HashSet},
3 net::SocketAddr,
4 sync::{Arc, RwLock},
5 thread::{sleep, JoinHandle},
6 time::{Duration, Instant},
7};
8
9use agave_geyser_plugin_interface::geyser_plugin_interface::{
10 GeyserPlugin, ReplicaTransactionInfoV2, ReplicaTransactionInfoVersions,
11};
12use chrono::Utc;
13use crossbeam::select;
14use crossbeam_channel::{unbounded, Receiver, Sender};
15use ipc_channel::{
16 ipc::{IpcOneShotServer, IpcReceiver},
17 router::RouterProxy,
18};
19use jsonrpc_core::MetaIoHandler;
20use jsonrpc_http_server::{DomainsValidation, ServerBuilder};
21use jsonrpc_pubsub::{PubSubHandler, Session};
22use jsonrpc_ws_server::{RequestContext, ServerBuilder as WsServerBuilder};
23use solana_message::{v0::LoadedAddresses, SimpleAddressLoader};
24use solana_sdk::transaction::MessageHash;
25use solana_transaction::sanitized::SanitizedTransaction;
26use solana_transaction_status::{InnerInstruction, InnerInstructions, TransactionStatusMeta};
27use surfpool_subgraph::SurfpoolSubgraphPlugin;
28use surfpool_types::{
29 BlockProductionMode, ClockCommand, ClockEvent, SchemaDataSourcingEvent, SimnetCommand,
30 SimnetEvent, SubgraphCommand, SubgraphPluginConfig, SurfpoolConfig,
31};
32
33use crate::{
34 rpc::{
35 self, accounts_data::AccountsData, accounts_scan::AccountsScan, admin::AdminRpc,
36 bank_data::BankData, full::Full, minimal::Minimal, surfnet_cheatcodes::SvmTricksRpc,
37 ws::Rpc, RunloopContext, SurfpoolMiddleware, SurfpoolWebsocketMeta,
38 SurfpoolWebsocketMiddleware,
39 },
40 surfnet::{locker::SurfnetSvmLocker, remote::SurfnetRemoteClient, GeyserEvent},
41 PluginManagerCommand,
42};
43
/// Number of slot intervals that must elapse before the clock thread emits a
/// `ClockEvent::ExpireBlockHash` (the elapsed-time threshold is this value
/// multiplied by the current slot time, in milliseconds).
const BLOCKHASH_SLOT_TTL: u64 = 75;
45
46pub async fn start_local_surfnet_runloop(
47 svm_locker: SurfnetSvmLocker,
48 config: SurfpoolConfig,
49 subgraph_commands_tx: Sender<SubgraphCommand>,
50 simnet_commands_tx: Sender<SimnetCommand>,
51 simnet_commands_rx: Receiver<SimnetCommand>,
52 geyser_events_rx: Receiver<GeyserEvent>,
53) -> Result<(), Box<dyn std::error::Error>> {
54 let Some(simnet) = config.simnets.first() else {
55 return Ok(());
56 };
57 let block_production_mode = simnet.block_production_mode.clone();
58
59 let remote_rpc_client = Some(SurfnetRemoteClient::new(&simnet.remote_rpc_url));
60
61 let _ = svm_locker.initialize(&remote_rpc_client).await?;
62
63 svm_locker.airdrop_pubkeys(simnet.airdrop_token_amount, &simnet.airdrop_addresses);
64 let simnet_events_tx_cc = svm_locker.simnet_events_tx();
65
66 let (plugin_manager_commands_rx, _rpc_handle, _ws_handle) = start_rpc_servers_runloop(
67 &config,
68 &simnet_commands_tx,
69 svm_locker.clone(),
70 &remote_rpc_client,
71 )
72 .await?;
73
74 let simnet_config = simnet.clone();
75
76 if !config.plugin_config_path.is_empty() {
77 match start_geyser_runloop(
78 plugin_manager_commands_rx,
79 subgraph_commands_tx.clone(),
80 simnet_events_tx_cc.clone(),
81 geyser_events_rx,
82 ) {
83 Ok(_) => {}
84 Err(e) => {
85 let _ = simnet_events_tx_cc
86 .send(SimnetEvent::error(format!("Geyser plugin failed: {e}")));
87 }
88 };
89 }
90
91 let (clock_event_rx, clock_command_tx) = start_clock_runloop(simnet_config.slot_time);
92
93 let _ = simnet_events_tx_cc.send(SimnetEvent::Ready);
94
95 start_block_production_runloop(
96 clock_event_rx,
97 clock_command_tx,
98 simnet_commands_rx,
99 simnet_commands_tx.clone(),
100 svm_locker,
101 block_production_mode,
102 &remote_rpc_client,
103 simnet_config.expiry.map(|e| e * 1000),
104 )
105 .await
106}
107
108pub async fn start_block_production_runloop(
109 clock_event_rx: Receiver<ClockEvent>,
110 clock_command_tx: Sender<ClockCommand>,
111 simnet_commands_rx: Receiver<SimnetCommand>,
112 simnet_commands_tx: Sender<SimnetCommand>,
113 svm_locker: SurfnetSvmLocker,
114 mut block_production_mode: BlockProductionMode,
115 remote_rpc_client: &Option<SurfnetRemoteClient>,
116 expiry_duration_ms: Option<u64>,
117) -> Result<(), Box<dyn std::error::Error>> {
118 let mut next_scheduled_expiry_check: Option<u64> =
119 expiry_duration_ms.map(|expiry_val| Utc::now().timestamp_millis() as u64 + expiry_val);
120 loop {
121 let mut do_produce_block = false;
122
123 select! {
124 recv(clock_event_rx) -> msg => if let Ok(event) = msg {
125 match event {
126 ClockEvent::Tick => {
127 if block_production_mode.eq(&BlockProductionMode::Clock) {
128 do_produce_block = true;
129 }
130
131 if let Some(expiry_ms) = expiry_duration_ms {
132 if let Some(scheduled_time_ref) = &mut next_scheduled_expiry_check {
133 let now_ms = Utc::now().timestamp_millis() as u64;
134 if now_ms >= *scheduled_time_ref {
135 let svm = svm_locker.0.read().await;
136 if svm.updated_at + expiry_ms < now_ms {
137 let _ = simnet_commands_tx.send(SimnetCommand::Terminate(None));
138 } else {
139 *scheduled_time_ref = svm.updated_at + expiry_ms;
140 }
141 }
142 }
143 }
144 }
145 ClockEvent::ExpireBlockHash => {
146 do_produce_block = true;
147 }
148 }
149 },
150 recv(simnet_commands_rx) -> msg => if let Ok(event) = msg {
151 match event {
152 SimnetCommand::SlotForward(_key) => {
153 block_production_mode = BlockProductionMode::Manual;
154 do_produce_block = true;
155 }
156 SimnetCommand::SlotBackward(_key) => {
157
158 }
159 SimnetCommand::UpdateClock(update) => {
160 let _ = clock_command_tx.send(update);
161 continue
162 }
163 SimnetCommand::UpdateBlockProductionMode(update) => {
164 block_production_mode = update;
165 continue
166 }
167 SimnetCommand::TransactionReceived(_key, transaction, status_tx, skip_preflight) => {
168 svm_locker.process_transaction(remote_rpc_client, transaction, status_tx, skip_preflight).await?;
169 }
170 SimnetCommand::Terminate(_) => {
171 let _ = svm_locker.simnet_events_tx().send(SimnetEvent::Aborted("Terminated due to inactivity.".to_string()));
172 break;
173 }
174 }
175 },
176 }
177
178 {
179 if do_produce_block {
180 svm_locker.confirm_current_block()?;
181 }
182 }
183 }
184 Ok(())
185}
186
187pub fn start_clock_runloop(mut slot_time: u64) -> (Receiver<ClockEvent>, Sender<ClockCommand>) {
188 let (clock_event_tx, clock_event_rx) = unbounded::<ClockEvent>();
189 let (clock_command_tx, clock_command_rx) = unbounded::<ClockCommand>();
190
191 let _handle = hiro_system_kit::thread_named("clock").spawn(move || {
192 let mut enabled = true;
193 let mut block_hash_timeout = Instant::now();
194
195 loop {
196 match clock_command_rx.try_recv() {
197 Ok(ClockCommand::Pause) => {
198 enabled = false;
199 }
200 Ok(ClockCommand::Resume) => {
201 enabled = true;
202 }
203 Ok(ClockCommand::Toggle) => {
204 enabled = !enabled;
205 }
206 Ok(ClockCommand::UpdateSlotInterval(updated_slot_time)) => {
207 slot_time = updated_slot_time;
208 }
209 Err(_e) => {}
210 }
211 sleep(Duration::from_millis(slot_time));
212 if enabled {
213 let _ = clock_event_tx.send(ClockEvent::Tick);
214 if block_hash_timeout.elapsed()
216 > Duration::from_millis(BLOCKHASH_SLOT_TTL * slot_time)
217 {
218 let _ = clock_event_tx.send(ClockEvent::ExpireBlockHash);
219 block_hash_timeout = Instant::now();
220 }
221 }
222 }
223 });
224
225 (clock_event_rx, clock_command_tx)
226}
227
228fn start_geyser_runloop(
229 plugin_manager_commands_rx: Receiver<PluginManagerCommand>,
230 subgraph_commands_tx: Sender<SubgraphCommand>,
231 simnet_events_tx: Sender<SimnetEvent>,
232 geyser_events_rx: Receiver<GeyserEvent>,
233) -> Result<JoinHandle<Result<(), String>>, String> {
234 let handle = hiro_system_kit::thread_named("Geyser Plugins Handler").spawn(move || {
235 let mut plugin_manager = vec![];
236
237 let ipc_router = RouterProxy::new();
238 let err = loop {
276 select! {
277 recv(plugin_manager_commands_rx) -> msg => {
278 match msg {
279 Ok(event) => {
280 match event {
281 PluginManagerCommand::LoadConfig(uuid, config, notifier) => {
282 let _ = subgraph_commands_tx.send(SubgraphCommand::CreateSubgraph(uuid, config.data.clone(), notifier));
283 let mut plugin = SurfpoolSubgraphPlugin::default();
284
285 let (server, ipc_token) = IpcOneShotServer::<IpcReceiver<SchemaDataSourcingEvent>>::new().expect("Failed to create IPC one-shot server.");
286 let subgraph_plugin_config = SubgraphPluginConfig {
287 uuid,
288 ipc_token,
289 subgraph_request: config.data.clone()
290 };
291
292 let config_file = match serde_json::to_string(&subgraph_plugin_config) {
293 Ok(c) => c,
294 Err(e) => {
295 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to serialize subgraph plugin config: {:?}", e)));
296 continue;
297 }
298 };
299
300 if let Err(e) = plugin.on_load(&config_file, false) {
301 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to load Geyser plugin: {:?}", e)));
302 };
303 if let Ok((_, rx)) = server.accept() {
304 let subgraph_rx = ipc_router.route_ipc_receiver_to_new_crossbeam_receiver::<SchemaDataSourcingEvent>(rx);
305 let _ = subgraph_commands_tx.send(SubgraphCommand::ObserveSubgraph(subgraph_rx));
306 };
307 let plugin: Box<dyn GeyserPlugin> = Box::new(plugin);
308 plugin_manager.push(plugin);
309 let _ = simnet_events_tx.send(SimnetEvent::PluginLoaded("surfpool-subgraph".into()));
310 }
311 }
312 },
313 Err(e) => {
314 break format!("Failed to read plugin manager command: {:?}", e);
315 },
316 }
317 },
318 recv(geyser_events_rx) -> msg => match msg {
319 Err(e) => {
320 break format!("Failed to read new transaction to send to Geyser plugin: {e}");
321 },
322 Ok(GeyserEvent::NewTransaction(transaction, transaction_metadata, slot)) => {
323 let mut inner_instructions = vec![];
324 for (i,inner) in transaction_metadata.inner_instructions.iter().enumerate() {
325 inner_instructions.push(
326 InnerInstructions {
327 index: i as u8,
328 instructions: inner.iter().map(|i| InnerInstruction {
329 instruction: i.instruction.clone(),
330 stack_height: Some(i.stack_height as u32)
331 }).collect()
332 }
333 )
334 }
335
336 let transaction_status_meta = TransactionStatusMeta {
337 status: Ok(()),
338 fee: 0,
339 pre_balances: vec![],
340 post_balances: vec![],
341 inner_instructions: Some(inner_instructions),
342 log_messages: Some(transaction_metadata.logs.clone()),
343 pre_token_balances: None,
344 post_token_balances: None,
345 rewards: None,
346 loaded_addresses: LoadedAddresses {
347 writable: vec![],
348 readonly: vec![],
349 },
350 return_data: Some(transaction_metadata.return_data.clone()),
351 compute_units_consumed: Some(transaction_metadata.compute_units_consumed),
352 };
353
354 let transaction = match SanitizedTransaction::try_create(transaction, MessageHash::Compute, None, SimpleAddressLoader::Disabled, &HashSet::new()) {
355 Ok(tx) => tx,
356 Err(e) => {
357 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify Geyser plugin of new transaction: failed to serialize transaction: {:?}", e)));
358 continue;
359 }
360 };
361
362 let transaction_replica = ReplicaTransactionInfoV2 {
363 signature: &transaction_metadata.signature,
364 is_vote: false,
365 transaction: &transaction,
366 transaction_status_meta: &transaction_status_meta,
367 index: 0
368 };
369 for plugin in plugin_manager.iter() {
370 if let Err(e) = plugin.notify_transaction(ReplicaTransactionInfoVersions::V0_0_2(&transaction_replica), slot) {
371 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify Geyser plugin of new transaction: {:?}", e)));
372 };
373 }
374 }
375 }
376 }
377 };
378 Err(err)
379 }).map_err(|e| format!("Failed to spawn Geyser Plugins Handler thread: {:?}", e))?;
380 Ok(handle)
381}
382
383async fn start_rpc_servers_runloop(
384 config: &SurfpoolConfig,
385 simnet_commands_tx: &Sender<SimnetCommand>,
386 svm_locker: SurfnetSvmLocker,
387 remote_rpc_client: &Option<SurfnetRemoteClient>,
388) -> Result<
389 (
390 Receiver<PluginManagerCommand>,
391 JoinHandle<()>,
392 JoinHandle<()>,
393 ),
394 String,
395> {
396 let (plugin_manager_commands_tx, plugin_manager_commands_rx) = unbounded();
397 let simnet_events_tx = svm_locker.simnet_events_tx();
398
399 let middleware = SurfpoolMiddleware::new(
400 svm_locker,
401 simnet_commands_tx,
402 &plugin_manager_commands_tx,
403 &config.rpc,
404 remote_rpc_client,
405 );
406
407 let rpc_handle =
408 start_http_rpc_server_runloop(config, middleware.clone(), simnet_events_tx.clone()).await?;
409 let ws_handle = start_ws_rpc_server_runloop(config, middleware, simnet_events_tx).await?;
410 Ok((plugin_manager_commands_rx, rpc_handle, ws_handle))
411}
412
413async fn start_http_rpc_server_runloop(
414 config: &SurfpoolConfig,
415 middleware: SurfpoolMiddleware,
416 simnet_events_tx: Sender<SimnetEvent>,
417) -> Result<JoinHandle<()>, String> {
418 let server_bind: SocketAddr = config
419 .rpc
420 .get_socket_address()
421 .parse::<SocketAddr>()
422 .map_err(|e| e.to_string())?;
423
424 let mut io = MetaIoHandler::with_middleware(middleware.clone());
425 io.extend_with(rpc::minimal::SurfpoolMinimalRpc.to_delegate());
426 io.extend_with(rpc::full::SurfpoolFullRpc.to_delegate());
427 io.extend_with(rpc::accounts_data::SurfpoolAccountsDataRpc.to_delegate());
428 io.extend_with(rpc::accounts_scan::SurfpoolAccountsScanRpc.to_delegate());
429 io.extend_with(rpc::bank_data::SurfpoolBankDataRpc.to_delegate());
430 io.extend_with(rpc::surfnet_cheatcodes::SurfnetCheatcodesRpc.to_delegate());
431 io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate());
432
433 if !config.plugin_config_path.is_empty() {
434 io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate());
435 }
436
437 let _ = std::net::TcpListener::bind(server_bind)
438 .map_err(|e| format!("Failed to start RPC server: {}", e))?;
439
440 let _handle = hiro_system_kit::thread_named("RPC Handler")
441 .spawn(move || {
442 let server = match ServerBuilder::new(io)
443 .cors(DomainsValidation::Disabled)
444 .start_http(&server_bind)
445 {
446 Ok(server) => server,
447 Err(e) => {
448 let _ = simnet_events_tx.send(SimnetEvent::Aborted(format!(
449 "Failed to start RPC server: {:?}",
450 e
451 )));
452 return;
453 }
454 };
455
456 server.wait();
457 let _ = simnet_events_tx.send(SimnetEvent::Shutdown);
458 })
459 .map_err(|e| format!("Failed to spawn RPC Handler thread: {:?}", e))?;
460
461 Ok(_handle)
462}
463async fn start_ws_rpc_server_runloop(
464 config: &SurfpoolConfig,
465 middleware: SurfpoolMiddleware,
466 simnet_events_tx: Sender<SimnetEvent>,
467) -> Result<JoinHandle<()>, String> {
468 let ws_server_bind: SocketAddr = config
469 .rpc
470 .get_ws_address()
471 .parse::<SocketAddr>()
472 .map_err(|e| e.to_string())?;
473
474 let uid = std::sync::atomic::AtomicUsize::new(0);
475 let ws_middleware = SurfpoolWebsocketMiddleware::new(middleware.clone(), None);
476
477 let mut rpc_io = PubSubHandler::new(MetaIoHandler::with_middleware(ws_middleware));
478
479 let _ws_handle = hiro_system_kit::thread_named("WebSocket RPC Handler")
480 .spawn(move || {
481 let runtime = tokio::runtime::Builder::new_multi_thread()
483 .enable_all()
484 .build()
485 .expect("Failed to build Tokio runtime");
486
487 let tokio_handle = runtime.handle();
488 rpc_io.extend_with(
489 rpc::ws::SurfpoolWsRpc {
490 uid,
491 signature_subscription_map: Arc::new(RwLock::new(HashMap::new())),
492 account_subscription_map: Arc::new(RwLock::new(HashMap::new())),
493 slot_subscription_map: Arc::new(RwLock::new(HashMap::new())),
494 tokio_handle: tokio_handle.clone(),
495 }
496 .to_delegate(),
497 );
498 runtime.block_on(async move {
499 let server = match WsServerBuilder::new(rpc_io)
500 .session_meta_extractor(move |ctx: &RequestContext| {
501 let runloop_context = RunloopContext {
503 id: None,
504 svm_locker: middleware.surfnet_svm.clone(),
505 simnet_commands_tx: middleware.simnet_commands_tx.clone(),
506 plugin_manager_commands_tx: middleware
507 .plugin_manager_commands_tx
508 .clone(),
509 remote_rpc_client: middleware.remote_rpc_client.clone(),
510 };
511 Some(SurfpoolWebsocketMeta::new(
512 runloop_context,
513 Some(Arc::new(Session::new(ctx.sender()))),
514 ))
515 })
516 .start(&ws_server_bind)
517 {
518 Ok(server) => server,
519 Err(e) => {
520 let _ = simnet_events_tx.send(SimnetEvent::Aborted(format!(
521 "Failed to start WebSocket RPC server: {:?}",
522 e
523 )));
524 return;
525 }
526 };
527 tokio::task::spawn_blocking(move || {
529 server.wait().unwrap();
530 })
531 .await
532 .ok();
533
534 let _ = simnet_events_tx.send(SimnetEvent::Shutdown);
535 });
536 })
537 .map_err(|e| format!("Failed to spawn WebSocket RPC Handler thread: {:?}", e))?;
538 Ok(_ws_handle)
539}