1use std::{
2 collections::{HashMap, HashSet},
3 net::SocketAddr,
4 sync::Arc,
5 thread::{sleep, JoinHandle},
6 time::{Duration, Instant},
7};
8
9use agave_geyser_plugin_interface::geyser_plugin_interface::{
10 GeyserPlugin, ReplicaTransactionInfoV2, ReplicaTransactionInfoVersions,
11};
12use crossbeam::select;
13use crossbeam_channel::{unbounded, Receiver, Sender};
14use ipc_channel::{
15 ipc::{IpcOneShotServer, IpcReceiver},
16 router::RouterProxy,
17};
18use jsonrpc_core::MetaIoHandler;
19use jsonrpc_http_server::{DomainsValidation, ServerBuilder};
20use jsonrpc_pubsub::{PubSubHandler, Session};
21use jsonrpc_ws_server::{RequestContext, ServerBuilder as WsServerBuilder};
22use solana_commitment_config::CommitmentConfig;
23use solana_message::{v0::LoadedAddresses, SimpleAddressLoader};
24use solana_sdk::transaction::MessageHash;
25use solana_transaction::sanitized::SanitizedTransaction;
26use solana_transaction_status::{InnerInstruction, InnerInstructions, TransactionStatusMeta};
27use surfpool_subgraph::SurfpoolSubgraphPlugin;
28use surfpool_types::{
29 BlockProductionMode, ClockCommand, ClockEvent, SchemaDataSourcingEvent, SimnetCommand,
30 SimnetEvent, SubgraphCommand, SubgraphPluginConfig, SurfpoolConfig,
31};
32
33use crate::{
34 rpc::{
35 self, accounts_data::AccountsData, accounts_scan::AccountsScan, admin::AdminRpc,
36 bank_data::BankData, full::Full, minimal::Minimal, surfnet_cheatcodes::SvmTricksRpc,
37 ws::Rpc, RunloopContext, SurfpoolMiddleware, SurfpoolWebsocketMeta,
38 SurfpoolWebsocketMiddleware,
39 },
40 surfnet::{
41 locker::SurfnetSvmLocker,
42 remote::{SomeRemoteCtx, SurfnetRemoteClient},
43 GeyserEvent,
44 },
45 PluginManagerCommand,
46};
47
/// Number of slot intervals that make up the blockhash expiry window: the clock thread emits
/// `ClockEvent::ExpireBlockHash` once more than `BLOCKHASH_SLOT_TTL * slot_time` milliseconds
/// have elapsed since the previous expiry.
const BLOCKHASH_SLOT_TTL: u64 = 75;
49
50pub async fn start_local_surfnet_runloop(
51 svm_locker: SurfnetSvmLocker,
52 config: SurfpoolConfig,
53 subgraph_commands_tx: Sender<SubgraphCommand>,
54 simnet_commands_tx: Sender<SimnetCommand>,
55 simnet_commands_rx: Receiver<SimnetCommand>,
56 geyser_events_rx: Receiver<GeyserEvent>,
57) -> Result<(), Box<dyn std::error::Error>> {
58 let Some(simnet) = config.simnets.first() else {
59 return Ok(());
60 };
61 let block_production_mode = simnet.block_production_mode.clone();
62
63 let remote_rpc_client = Some(SurfnetRemoteClient::new(&simnet.remote_rpc_url));
64
65 let _ = svm_locker.initialize(&remote_rpc_client).await?;
66
67 svm_locker.airdrop_pubkeys(simnet.airdrop_token_amount, &simnet.airdrop_addresses);
68 let simnet_events_tx_cc = svm_locker.simnet_events_tx();
69
70 let (plugin_manager_commands_rx, _rpc_handle, _ws_handle) = start_rpc_servers_runloop(
71 &config,
72 &simnet_commands_tx,
73 svm_locker.clone(),
74 &remote_rpc_client,
75 )
76 .await?;
77
78 let simnet_config = simnet.clone();
79
80 if !config.plugin_config_path.is_empty() {
81 match start_geyser_runloop(
82 plugin_manager_commands_rx,
83 subgraph_commands_tx.clone(),
84 simnet_events_tx_cc.clone(),
85 geyser_events_rx,
86 ) {
87 Ok(_) => {}
88 Err(e) => {
89 let _ = simnet_events_tx_cc
90 .send(SimnetEvent::error(format!("Geyser plugin failed: {e}")));
91 }
92 };
93 }
94
95 let (clock_event_rx, clock_command_tx) = start_clock_runloop(simnet_config.slot_time);
96
97 let _ = simnet_events_tx_cc.send(SimnetEvent::Ready);
98
99 start_block_production_runloop(
100 clock_event_rx,
101 clock_command_tx,
102 simnet_commands_rx,
103 svm_locker,
104 block_production_mode,
105 &remote_rpc_client,
106 )
107 .await
108}
109
110pub async fn start_block_production_runloop(
111 clock_event_rx: Receiver<ClockEvent>,
112 clock_command_tx: Sender<ClockCommand>,
113 simnet_commands_rx: Receiver<SimnetCommand>,
114 svm_locker: SurfnetSvmLocker,
115 mut block_production_mode: BlockProductionMode,
116 remote_rpc_client: &Option<SurfnetRemoteClient>,
117) -> Result<(), Box<dyn std::error::Error>> {
118 loop {
119 let mut do_produce_block = false;
120
121 select! {
122 recv(clock_event_rx) -> msg => if let Ok(event) = msg {
123 match event {
124 ClockEvent::Tick => {
125 if block_production_mode.eq(&BlockProductionMode::Clock) {
126 do_produce_block = true;
127 }
128 }
129 ClockEvent::ExpireBlockHash => {
130 do_produce_block = true;
131 }
132 }
133 },
134 recv(simnet_commands_rx) -> msg => if let Ok(event) = msg {
135 match event {
136 SimnetCommand::SlotForward(_key) => {
137 block_production_mode = BlockProductionMode::Manual;
138 do_produce_block = true;
139 }
140 SimnetCommand::SlotBackward(_key) => {
141
142 }
143 SimnetCommand::UpdateClock(update) => {
144 let _ = clock_command_tx.send(update);
145 continue
146 }
147 SimnetCommand::UpdateBlockProductionMode(update) => {
148 block_production_mode = update;
149 continue
150 }
151 SimnetCommand::TransactionReceived(_key, transaction, status_tx, skip_preflight) => {
152 svm_locker.process_transaction(&remote_rpc_client.get_remote_ctx(CommitmentConfig::confirmed()), transaction, status_tx, skip_preflight).await?;
153 }
154 SimnetCommand::Terminate(_) => {
155 std::process::exit(0)
156 }
157 }
158 },
159 }
160
161 {
162 if do_produce_block {
163 svm_locker.confirm_current_block()?;
164 }
165 }
166 }
167}
168
169pub fn start_clock_runloop(mut slot_time: u64) -> (Receiver<ClockEvent>, Sender<ClockCommand>) {
170 let (clock_event_tx, clock_event_rx) = unbounded::<ClockEvent>();
171 let (clock_command_tx, clock_command_rx) = unbounded::<ClockCommand>();
172
173 let _handle = hiro_system_kit::thread_named("clock").spawn(move || {
174 let mut enabled = true;
175 let mut block_hash_timeout = Instant::now();
176
177 loop {
178 match clock_command_rx.try_recv() {
179 Ok(ClockCommand::Pause) => {
180 enabled = false;
181 }
182 Ok(ClockCommand::Resume) => {
183 enabled = true;
184 }
185 Ok(ClockCommand::Toggle) => {
186 enabled = !enabled;
187 }
188 Ok(ClockCommand::UpdateSlotInterval(updated_slot_time)) => {
189 slot_time = updated_slot_time;
190 }
191 Err(_e) => {}
192 }
193 sleep(Duration::from_millis(slot_time));
194 if enabled {
195 let _ = clock_event_tx.send(ClockEvent::Tick);
196 if block_hash_timeout.elapsed()
198 > Duration::from_millis(BLOCKHASH_SLOT_TTL * slot_time)
199 {
200 let _ = clock_event_tx.send(ClockEvent::ExpireBlockHash);
201 block_hash_timeout = Instant::now();
202 }
203 }
204 }
205 });
206
207 (clock_event_rx, clock_command_tx)
208}
209
210fn start_geyser_runloop(
211 plugin_manager_commands_rx: Receiver<PluginManagerCommand>,
212 subgraph_commands_tx: Sender<SubgraphCommand>,
213 simnet_events_tx: Sender<SimnetEvent>,
214 geyser_events_rx: Receiver<GeyserEvent>,
215) -> Result<JoinHandle<Result<(), String>>, String> {
216 let handle = hiro_system_kit::thread_named("Geyser Plugins Handler").spawn(move || {
217 let mut plugin_manager = vec![];
218
219 let ipc_router = RouterProxy::new();
220 let err = loop {
258 select! {
259 recv(plugin_manager_commands_rx) -> msg => {
260 match msg {
261 Ok(event) => {
262 match event {
263 PluginManagerCommand::LoadConfig(uuid, config, notifier) => {
264 let _ = subgraph_commands_tx.send(SubgraphCommand::CreateSubgraph(uuid, config.data.clone(), notifier));
265 let mut plugin = SurfpoolSubgraphPlugin::default();
266
267 let (server, ipc_token) = IpcOneShotServer::<IpcReceiver<SchemaDataSourcingEvent>>::new().expect("Failed to create IPC one-shot server.");
268 let subgraph_plugin_config = SubgraphPluginConfig {
269 uuid,
270 ipc_token,
271 subgraph_request: config.data.clone()
272 };
273
274 let config_file = match serde_json::to_string(&subgraph_plugin_config) {
275 Ok(c) => c,
276 Err(e) => {
277 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to serialize subgraph plugin config: {:?}", e)));
278 continue;
279 }
280 };
281
282 if let Err(e) = plugin.on_load(&config_file, false) {
283 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to load Geyser plugin: {:?}", e)));
284 };
285 if let Ok((_, rx)) = server.accept() {
286 let subgraph_rx = ipc_router.route_ipc_receiver_to_new_crossbeam_receiver::<SchemaDataSourcingEvent>(rx);
287 let _ = subgraph_commands_tx.send(SubgraphCommand::ObserveSubgraph(subgraph_rx));
288 };
289 let plugin: Box<dyn GeyserPlugin> = Box::new(plugin);
290 plugin_manager.push(plugin);
291 let _ = simnet_events_tx.send(SimnetEvent::PluginLoaded("surfpool-subgraph".into()));
292 }
293 }
294 },
295 Err(e) => {
296 break format!("Failed to read plugin manager command: {:?}", e);
297 },
298 }
299 },
300 recv(geyser_events_rx) -> msg => match msg {
301 Err(e) => {
302 break format!("Failed to read new transaction to send to Geyser plugin: {e}");
303 },
304 Ok(GeyserEvent::NewTransaction(transaction, transaction_metadata, slot)) => {
305 let mut inner_instructions = vec![];
306 for (i,inner) in transaction_metadata.inner_instructions.iter().enumerate() {
307 inner_instructions.push(
308 InnerInstructions {
309 index: i as u8,
310 instructions: inner.iter().map(|i| InnerInstruction {
311 instruction: i.instruction.clone(),
312 stack_height: Some(i.stack_height as u32)
313 }).collect()
314 }
315 )
316 }
317
318 let transaction_status_meta = TransactionStatusMeta {
319 status: Ok(()),
320 fee: 0,
321 pre_balances: vec![],
322 post_balances: vec![],
323 inner_instructions: Some(inner_instructions),
324 log_messages: Some(transaction_metadata.logs.clone()),
325 pre_token_balances: None,
326 post_token_balances: None,
327 rewards: None,
328 loaded_addresses: LoadedAddresses {
329 writable: vec![],
330 readonly: vec![],
331 },
332 return_data: Some(transaction_metadata.return_data.clone()),
333 compute_units_consumed: Some(transaction_metadata.compute_units_consumed),
334 };
335
336 let transaction = match SanitizedTransaction::try_create(transaction, MessageHash::Compute, None, SimpleAddressLoader::Disabled, &HashSet::new()) {
337 Ok(tx) => tx,
338 Err(e) => {
339 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify Geyser plugin of new transaction: failed to serialize transaction: {:?}", e)));
340 continue;
341 }
342 };
343
344 let transaction_replica = ReplicaTransactionInfoV2 {
345 signature: &transaction_metadata.signature,
346 is_vote: false,
347 transaction: &transaction,
348 transaction_status_meta: &transaction_status_meta,
349 index: 0
350 };
351 for plugin in plugin_manager.iter() {
352 if let Err(e) = plugin.notify_transaction(ReplicaTransactionInfoVersions::V0_0_2(&transaction_replica), slot) {
353 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify Geyser plugin of new transaction: {:?}", e)));
354 };
355 }
356 }
357 }
358 }
359 };
360 Err(err)
361 }).map_err(|e| format!("Failed to spawn Geyser Plugins Handler thread: {:?}", e))?;
362 Ok(handle)
363}
364
365async fn start_rpc_servers_runloop(
366 config: &SurfpoolConfig,
367 simnet_commands_tx: &Sender<SimnetCommand>,
368 svm_locker: SurfnetSvmLocker,
369 remote_rpc_client: &Option<SurfnetRemoteClient>,
370) -> Result<
371 (
372 Receiver<PluginManagerCommand>,
373 JoinHandle<()>,
374 JoinHandle<()>,
375 ),
376 String,
377> {
378 let (plugin_manager_commands_tx, plugin_manager_commands_rx) = unbounded();
379 let simnet_events_tx = svm_locker.simnet_events_tx();
380
381 let middleware = SurfpoolMiddleware::new(
382 svm_locker,
383 simnet_commands_tx,
384 &plugin_manager_commands_tx,
385 &config.rpc,
386 remote_rpc_client,
387 );
388
389 let rpc_handle =
390 start_http_rpc_server_runloop(config, middleware.clone(), simnet_events_tx.clone()).await?;
391 let ws_handle = start_ws_rpc_server_runloop(config, middleware, simnet_events_tx).await?;
392 Ok((plugin_manager_commands_rx, rpc_handle, ws_handle))
393}
394
395async fn start_http_rpc_server_runloop(
396 config: &SurfpoolConfig,
397 middleware: SurfpoolMiddleware,
398 simnet_events_tx: Sender<SimnetEvent>,
399) -> Result<JoinHandle<()>, String> {
400 let server_bind: SocketAddr = config
401 .rpc
402 .get_socket_address()
403 .parse::<SocketAddr>()
404 .map_err(|e| e.to_string())?;
405
406 let mut io = MetaIoHandler::with_middleware(middleware.clone());
407 io.extend_with(rpc::minimal::SurfpoolMinimalRpc.to_delegate());
408 io.extend_with(rpc::full::SurfpoolFullRpc.to_delegate());
409 io.extend_with(rpc::accounts_data::SurfpoolAccountsDataRpc.to_delegate());
410 io.extend_with(rpc::accounts_scan::SurfpoolAccountsScanRpc.to_delegate());
411 io.extend_with(rpc::bank_data::SurfpoolBankDataRpc.to_delegate());
412 io.extend_with(rpc::surfnet_cheatcodes::SurfnetCheatcodesRpc.to_delegate());
413 io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate());
414
415 if !config.plugin_config_path.is_empty() {
416 io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate());
417 }
418
419 let _ = std::net::TcpListener::bind(server_bind)
420 .map_err(|e| format!("Failed to start RPC server: {}", e))?;
421
422 let _handle = hiro_system_kit::thread_named("RPC Handler")
423 .spawn(move || {
424 let server = match ServerBuilder::new(io)
425 .cors(DomainsValidation::Disabled)
426 .start_http(&server_bind)
427 {
428 Ok(server) => server,
429 Err(e) => {
430 let _ = simnet_events_tx.send(SimnetEvent::Aborted(format!(
431 "Failed to start RPC server: {:?}",
432 e
433 )));
434 return;
435 }
436 };
437
438 server.wait();
439 let _ = simnet_events_tx.send(SimnetEvent::Shutdown);
440 })
441 .map_err(|e| format!("Failed to spawn RPC Handler thread: {:?}", e))?;
442
443 Ok(_handle)
444}
445async fn start_ws_rpc_server_runloop(
446 config: &SurfpoolConfig,
447 middleware: SurfpoolMiddleware,
448 simnet_events_tx: Sender<SimnetEvent>,
449) -> Result<JoinHandle<()>, String> {
450 let ws_server_bind: SocketAddr = config
451 .rpc
452 .get_ws_address()
453 .parse::<SocketAddr>()
454 .map_err(|e| e.to_string())?;
455
456 let uid = std::sync::atomic::AtomicUsize::new(0);
457 let subscription_map = Arc::new(std::sync::RwLock::new(HashMap::new()));
458 let ws_middleware = SurfpoolWebsocketMiddleware::new(middleware.clone(), None);
459
460 let mut rpc_io = PubSubHandler::new(MetaIoHandler::with_middleware(ws_middleware));
461
462 let _ws_handle = hiro_system_kit::thread_named("WebSocket RPC Handler")
463 .spawn(move || {
464 let runtime = tokio::runtime::Builder::new_multi_thread()
466 .enable_all()
467 .build()
468 .expect("Failed to build Tokio runtime");
469
470 let tokio_handle = runtime.handle();
471 rpc_io.extend_with(
472 rpc::ws::SurfpoolWsRpc {
473 uid,
474 active: subscription_map,
475 tokio_handle: tokio_handle.clone(),
476 }
477 .to_delegate(),
478 );
479 runtime.block_on(async move {
480 let server = match WsServerBuilder::new(rpc_io)
481 .session_meta_extractor(move |ctx: &RequestContext| {
482 let runloop_context = RunloopContext {
484 id: None,
485 svm_locker: middleware.surfnet_svm.clone(),
486 simnet_commands_tx: middleware.simnet_commands_tx.clone(),
487 plugin_manager_commands_tx: middleware
488 .plugin_manager_commands_tx
489 .clone(),
490 remote_rpc_client: middleware.remote_rpc_client.clone(),
491 };
492 Some(SurfpoolWebsocketMeta::new(
493 runloop_context,
494 Some(Arc::new(Session::new(ctx.sender()))),
495 ))
496 })
497 .start(&ws_server_bind)
498 {
499 Ok(server) => server,
500 Err(e) => {
501 let _ = simnet_events_tx.send(SimnetEvent::Aborted(format!(
502 "Failed to start WebSocket RPC server: {:?}",
503 e
504 )));
505 return;
506 }
507 };
508 tokio::task::spawn_blocking(move || {
510 server.wait().unwrap();
511 })
512 .await
513 .ok();
514
515 let _ = simnet_events_tx.send(SimnetEvent::Shutdown);
516 });
517 })
518 .map_err(|e| format!("Failed to spawn WebSocket RPC Handler thread: {:?}", e))?;
519 Ok(_ws_handle)
520}