1#![allow(unused_imports, dead_code, unused_mut, unused_variables)]
2
3use std::{
4 collections::{HashMap, HashSet},
5 net::SocketAddr,
6 path::PathBuf,
7 sync::{Arc, RwLock},
8 thread::{JoinHandle, sleep},
9 time::{Duration, Instant},
10};
11
12use agave_geyser_plugin_interface::geyser_plugin_interface::{
13 GeyserPlugin, ReplicaBlockInfoV4, ReplicaBlockInfoVersions, ReplicaEntryInfoV2,
14 ReplicaEntryInfoVersions, ReplicaTransactionInfoV3, ReplicaTransactionInfoVersions, SlotStatus,
15};
16use chrono::{Local, Utc};
17use crossbeam::select;
18use crossbeam_channel::{Receiver, Sender, unbounded};
19use ipc_channel::{
20 ipc::{IpcOneShotServer, IpcReceiver},
21 router::RouterProxy,
22};
23use itertools::Itertools;
24use jsonrpc_core::MetaIoHandler;
25use jsonrpc_http_server::{DomainsValidation, ServerBuilder};
26use jsonrpc_pubsub::{PubSubHandler, Session};
27use jsonrpc_ws_server::{RequestContext, ServerBuilder as WsServerBuilder};
28use libloading::{Library, Symbol};
29use serde::Serialize;
30use solana_commitment_config::CommitmentConfig;
31#[cfg(feature = "geyser_plugin")]
32use solana_geyser_plugin_manager::geyser_plugin_manager::{
33 GeyserPluginManager, LoadedGeyserPlugin,
34};
35use solana_message::SimpleAddressLoader;
36use solana_transaction::sanitized::{MessageHash, SanitizedTransaction};
37use solana_transaction_status::RewardsAndNumPartitions;
38#[cfg(feature = "subgraph")]
39use surfpool_subgraph::SurfpoolSubgraphPlugin;
40use surfpool_types::{
41 BlockProductionMode, ClockCommand, ClockEvent, DEFAULT_MAINNET_RPC_URL, DataIndexingCommand,
42 SimnetCommand, SimnetConfig, SimnetEvent, SubgraphCommand, SubgraphPluginConfig,
43 SurfpoolConfig,
44};
/// Signature of the constructor symbol (`_create_plugin`) looked up in a dynamically
/// loaded Geyser plugin library. The returned raw pointer is turned into an owned
/// `Box<dyn GeyserPlugin>` by the Geyser runloop.
type PluginConstructor = unsafe fn() -> *mut dyn GeyserPlugin;
46use txtx_addon_kit::helpers::fs::FileLocation;
47
48use crate::{
49 PluginManagerCommand,
50 rpc::{
51 self, RunloopContext, SurfpoolMiddleware, SurfpoolWebsocketMeta,
52 SurfpoolWebsocketMiddleware, accounts_data::AccountsData, accounts_scan::AccountsScan,
53 admin::AdminRpc, bank_data::BankData, full::Full, minimal::Minimal,
54 surfnet_cheatcodes::SurfnetCheatcodes, ws::Rpc,
55 },
56 surfnet::{GeyserEvent, locker::SurfnetSvmLocker, remote::SurfnetRemoteClient},
57};
58
/// Multiplier applied to the slot interval by the clock runloop: once more than
/// `BLOCKHASH_SLOT_TTL * slot_time` milliseconds have elapsed since the previous
/// expiry, a `ClockEvent::ExpireBlockHash` is emitted.
const BLOCKHASH_SLOT_TTL: u64 = 75;
60
/// Probes whether `addr` can be bound by a TCP listener.
///
/// The probe listener is dropped immediately, so a successful check does not keep
/// the port reserved. On failure the problem is printed to stderr and returned as
/// the error message; `server_type` is only used to label that message.
fn check_port_availability(addr: SocketAddr, server_type: &str) -> Result<(), String> {
    let Err(bind_error) = std::net::TcpListener::bind(addr) else {
        return Ok(());
    };

    let msg = if bind_error.kind() == std::io::ErrorKind::AddrInUse {
        format!(
            "{} port {} is already in use. Try --port or --ws-port to use a different port.",
            server_type,
            addr.port()
        )
    } else {
        format!(
            "Failed to bind {} server to {}: {}",
            server_type, addr, bind_error
        )
    };

    eprintln!("Error: {}", msg);
    Err(msg)
}
81
82pub async fn start_local_surfnet_runloop(
83 svm_locker: SurfnetSvmLocker,
84 config: SurfpoolConfig,
85 subgraph_commands_tx: Sender<SubgraphCommand>,
86 simnet_commands_tx: Sender<SimnetCommand>,
87 simnet_commands_rx: Receiver<SimnetCommand>,
88 geyser_events_rx: Receiver<GeyserEvent>,
89) -> Result<(), Box<dyn std::error::Error>> {
90 let Some(simnet) = config.simnets.first() else {
91 return Ok(());
92 };
93 let block_production_mode = simnet.block_production_mode.clone();
94
95 let remote_rpc_client = match simnet.offline_mode {
96 true => None,
97 false => SurfnetRemoteClient::new_unsafe(
98 simnet
99 .remote_rpc_url
100 .as_ref()
101 .unwrap_or(&DEFAULT_MAINNET_RPC_URL.to_string()),
102 ),
103 };
104
105 svm_locker
106 .initialize(
107 simnet.slot_time,
108 &remote_rpc_client,
109 simnet.instruction_profiling_enabled,
110 simnet.log_bytes_limit,
111 )
112 .await?;
113
114 svm_locker.airdrop_pubkeys(simnet.airdrop_token_amount, &simnet.airdrop_addresses);
115
116 if !simnet.snapshot.is_empty() {
118 match svm_locker
119 .load_snapshot(
120 &simnet.snapshot,
121 remote_rpc_client.as_ref(),
122 CommitmentConfig::confirmed(),
123 )
124 .await
125 {
126 Ok(loaded_count) => {
127 let _ = svm_locker.with_svm_reader(|svm| {
128 svm.simnet_events_tx.send(SimnetEvent::info(format!(
129 "Preloaded {} accounts from snapshot(s) into SVM",
130 loaded_count
131 )))
132 });
133 }
134 Err(e) => {
135 let _ = svm_locker.with_svm_reader(|svm| {
136 svm.simnet_events_tx.send(SimnetEvent::warn(format!(
137 "Error loading snapshot accounts: {}",
138 e
139 )))
140 });
141 }
142 }
143 }
144
145 let simnet_events_tx_cc = svm_locker.simnet_events_tx();
146
147 let (plugin_manager_commands_rx, _rpc_handle, _ws_handle) = start_rpc_servers_runloop(
148 &config,
149 &simnet_commands_tx,
150 svm_locker.clone(),
151 &remote_rpc_client,
152 )
153 .await?;
154
155 let simnet_config = simnet.clone();
156
157 match start_geyser_runloop(
158 config.plugin_config_path.clone(),
159 plugin_manager_commands_rx,
160 subgraph_commands_tx.clone(),
161 simnet_events_tx_cc.clone(),
162 geyser_events_rx,
163 ) {
164 Ok(_) => {}
165 Err(e) => {
166 let _ =
167 simnet_events_tx_cc.send(SimnetEvent::error(format!("Geyser plugin failed: {e}")));
168 }
169 };
170
171 let (clock_event_rx, clock_command_tx) =
172 start_clock_runloop(simnet_config.slot_time, Some(simnet_events_tx_cc.clone()));
173
174 let initial_transaction_count = svm_locker.with_svm_reader(|svm| {
176 let iter_result = svm.transactions.into_iter();
177 let mut count: u64 = 0;
178
179 if let Ok(iter) = iter_result {
180 let mut events = vec![];
181 for (_, status) in iter {
182 if let Some((tx_meta, _updated_accounts)) = status.as_processed() {
183 let signature = tx_meta.transaction.signatures[0];
184 let err = tx_meta.meta.status.clone().err();
185
186 let meta = surfpool_types::TransactionMetadata {
188 signature,
189 logs: tx_meta.meta.log_messages.clone().unwrap_or_default(),
190 inner_instructions: tx_meta
191 .meta
192 .inner_instructions
193 .clone()
194 .unwrap_or_default()
195 .into_iter()
196 .map(|inner_ixs| {
197 inner_ixs
198 .instructions
199 .into_iter()
200 .map(|ix| solana_message::inner_instruction::InnerInstruction {
201 instruction: ix.instruction,
202 stack_height: ix.stack_height.unwrap_or(1) as u8,
203 })
204 .collect()
205 })
206 .collect(),
207 compute_units_consumed: tx_meta.meta.compute_units_consumed.unwrap_or(0),
208 return_data: tx_meta.meta.return_data.clone().unwrap_or_default(),
209 fee: tx_meta.meta.fee,
210 };
211
212 events.push((
213 tx_meta.slot,
214 SimnetEvent::TransactionProcessed(Local::now(), meta, err.clone()),
215 ));
216
217 count += 1;
218 }
219 }
220 for (_, event) in events
221 .into_iter()
222 .sorted_by(|(a_slot, _), (b_slot, _)| a_slot.cmp(b_slot))
223 {
224 let _ = svm.simnet_events_tx.send(event);
225 }
226 }
227
228 count
229 });
230 let _ = simnet_events_tx_cc.send(SimnetEvent::Ready(initial_transaction_count));
231
232 let _ = svm_locker.with_svm_reader(|svm| svm.geyser_events_tx.send(GeyserEvent::EndOfStartup));
234
235 start_block_production_runloop(
236 clock_event_rx,
237 clock_command_tx,
238 simnet_commands_rx,
239 simnet_commands_tx.clone(),
240 svm_locker,
241 block_production_mode,
242 &remote_rpc_client,
243 simnet_config.expiry.map(|e| e * 1000),
244 &simnet_config,
245 )
246 .await
247}
248
249#[allow(clippy::too_many_arguments)]
250pub async fn start_block_production_runloop(
251 clock_event_rx: Receiver<ClockEvent>,
252 clock_command_tx: Sender<ClockCommand>,
253 simnet_commands_rx: Receiver<SimnetCommand>,
254 simnet_commands_tx: Sender<SimnetCommand>,
255 svm_locker: SurfnetSvmLocker,
256 mut block_production_mode: BlockProductionMode,
257 remote_rpc_client: &Option<SurfnetRemoteClient>,
258 expiry_duration_ms: Option<u64>,
259 simnet_config: &SimnetConfig,
260) -> Result<(), Box<dyn std::error::Error>> {
261 let remote_client_with_commitment = remote_rpc_client.as_ref().map(|c| {
262 (
263 c.clone(),
264 solana_commitment_config::CommitmentConfig::confirmed(),
265 )
266 });
267 let mut next_scheduled_expiry_check: Option<u64> =
268 expiry_duration_ms.map(|expiry_val| Utc::now().timestamp_millis() as u64 + expiry_val);
269 let global_skip_sig_verify = simnet_config.skip_signature_verification;
270 loop {
271 let mut do_produce_block = false;
272
273 select! {
274 recv(clock_event_rx) -> msg => if let Ok(event) = msg {
275 match event {
276 ClockEvent::Tick => {
277 if block_production_mode.eq(&BlockProductionMode::Clock) {
278 do_produce_block = true;
279 }
280
281 if let Some(expiry_ms) = expiry_duration_ms {
282 if let Some(scheduled_time_ref) = &mut next_scheduled_expiry_check {
283 let now_ms = Utc::now().timestamp_millis() as u64;
284 if now_ms >= *scheduled_time_ref {
285 let svm = svm_locker.0.read().await;
286 if svm.updated_at + expiry_ms < now_ms {
287 let _ = simnet_commands_tx.send(SimnetCommand::Terminate(None));
288 } else {
289 *scheduled_time_ref = svm.updated_at + expiry_ms;
290 }
291 }
292 }
293 }
294 }
295 ClockEvent::ExpireBlockHash => {
296 do_produce_block = true;
297 }
298 }
299 },
300 recv(simnet_commands_rx) -> msg => if let Ok(event) = msg {
301 match event {
302 SimnetCommand::SlotForward(_key) => {
303 block_production_mode = BlockProductionMode::Manual;
304 do_produce_block = true;
305 }
306 SimnetCommand::SlotBackward(_key) => {
307
308 }
309 SimnetCommand::CommandClock(_, update) => {
310 if let ClockCommand::UpdateSlotInterval(updated_slot_time) = update {
311 svm_locker.with_svm_writer(|svm_writer| {
312 svm_writer.slot_time = updated_slot_time;
313 });
314 }
315
316 if let ClockCommand::PauseWithConfirmation(response_tx) = update {
318 let (current_slot, slot_time) = svm_locker.with_svm_reader(|svm_reader| {
320 (svm_reader.latest_epoch_info.absolute_slot, svm_reader.slot_time)
321 });
322
323 let _ = clock_command_tx.send(ClockCommand::Pause);
325
326 tokio::time::sleep(tokio::time::Duration::from_millis(slot_time / 2)).await;
328
329 let max_attempts = 10;
331 let mut attempts = 0;
332 loop {
333 tokio::time::sleep(tokio::time::Duration::from_millis(slot_time)).await;
334
335 let new_slot = svm_locker.with_svm_reader(|svm_reader| {
336 svm_reader.latest_epoch_info.absolute_slot
337 });
338
339 if new_slot == current_slot || attempts >= max_attempts {
341 break;
342 }
343
344 attempts += 1;
345 }
346
347 let epoch_info = svm_locker.with_svm_reader(|svm_reader| {
349 svm_reader.latest_epoch_info.clone()
350 });
351 let _ = response_tx.send(epoch_info);
353 } else {
354 let _ = clock_command_tx.send(update);
355 }
356 continue
357 }
358 SimnetCommand::UpdateInternalClock(_, clock) => {
359 if let Err(e) = svm_locker.confirm_current_block(&remote_client_with_commitment).await {
361 let _ = svm_locker.simnet_events_tx().send(SimnetEvent::error(format!(
362 "Failed to confirm block after time travel: {}", e
363 )));
364 }
365
366 svm_locker.with_svm_writer(|svm_writer| {
367 svm_writer.inner.set_sysvar(&clock);
368 svm_writer.updated_at = clock.unix_timestamp as u64 * 1_000;
369 svm_writer.latest_epoch_info.absolute_slot = clock.slot;
370 svm_writer.latest_epoch_info.epoch = clock.epoch;
371 svm_writer.latest_epoch_info.slot_index = clock.slot;
372 svm_writer.latest_epoch_info.epoch = clock.epoch;
373 svm_writer.latest_epoch_info.absolute_slot = clock.slot + clock.epoch * svm_writer.latest_epoch_info.slots_in_epoch;
374 let _ = svm_writer.simnet_events_tx.send(SimnetEvent::SystemClockUpdated(clock));
375 });
376 }
377 SimnetCommand::UpdateInternalClockWithConfirmation(_, clock, response_tx) => {
378 if let Err(e) = svm_locker.confirm_current_block(&remote_client_with_commitment).await {
380 let _ = svm_locker.simnet_events_tx().send(SimnetEvent::error(format!(
381 "Failed to confirm block after time travel: {}", e
382 )));
383 }
384
385 let epoch_info = svm_locker.with_svm_writer(|svm_writer| {
386 svm_writer.inner.set_sysvar(&clock);
387 svm_writer.updated_at = clock.unix_timestamp as u64 * 1_000;
388 svm_writer.latest_epoch_info.absolute_slot = clock.slot;
389 svm_writer.latest_epoch_info.epoch = clock.epoch;
390 svm_writer.latest_epoch_info.slot_index = clock.slot;
391 svm_writer.latest_epoch_info.epoch = clock.epoch;
392 svm_writer.latest_epoch_info.absolute_slot = clock.slot + clock.epoch * svm_writer.latest_epoch_info.slots_in_epoch;
393 let _ = svm_writer.simnet_events_tx.send(SimnetEvent::SystemClockUpdated(clock));
394 svm_writer.latest_epoch_info.clone()
395 });
396
397 let _ = response_tx.send(epoch_info);
399 }
400 SimnetCommand::UpdateBlockProductionMode(update) => {
401 block_production_mode = update;
402 continue
403 }
404 SimnetCommand::ProcessTransaction(_key, transaction, status_tx, skip_preflight, skip_sig_verify_override) => {
405 let skip_sig_verify = skip_sig_verify_override.unwrap_or(global_skip_sig_verify);
406 let sigverify = !skip_sig_verify;
407 if let Err(e) = svm_locker.process_transaction(&remote_client_with_commitment, transaction, status_tx, skip_preflight, sigverify).await {
408 let _ = svm_locker.simnet_events_tx().send(SimnetEvent::error(format!("Failed to process transaction: {}", e)));
409 }
410 if block_production_mode.eq(&BlockProductionMode::Transaction) {
411 do_produce_block = true;
412 }
413 }
414 SimnetCommand::Terminate(_) => {
415 svm_locker.shutdown();
417 break;
418 }
419 SimnetCommand::StartRunbookExecution(runbook_id) => {
420 svm_locker.start_runbook_execution(runbook_id);
421 }
422 SimnetCommand::CompleteRunbookExecution(runbook_id, error) => {
423 svm_locker.complete_runbook_execution(runbook_id, error);
424 }
425 SimnetCommand::FetchRemoteAccounts(pubkeys, remote_url) => {
426 let remote_client = SurfnetRemoteClient::new_unsafe(&remote_url);
427 if let Some(remote_client) = remote_client {
428 match svm_locker.get_multiple_accounts_with_remote_fallback(&remote_client, &pubkeys, CommitmentConfig::confirmed()).await {
429 Ok(account_updates) => {
430 svm_locker.write_multiple_account_updates(&account_updates.inner);
431 }
432 Err(e) => {
433 svm_locker.simnet_events_tx().try_send(SimnetEvent::error(format!("Failed to fetch remote accounts {:?}: {}", pubkeys, e))).ok();
434 }
435 };
436 }
437 }
438 SimnetCommand::AirdropProcessed => {
439 if block_production_mode.eq(&BlockProductionMode::Transaction) {
440 do_produce_block = true;
441 }
442 }
443 }
444 },
445 }
446
447 {
448 if do_produce_block {
449 svm_locker
450 .confirm_current_block(&remote_client_with_commitment)
451 .await?;
452 }
453 }
454 }
455 Ok(())
456}
457
458pub fn start_clock_runloop(
459 mut slot_time: u64,
460 simnet_events_tx: Option<Sender<SimnetEvent>>,
461) -> (Receiver<ClockEvent>, Sender<ClockCommand>) {
462 let (clock_event_tx, clock_event_rx) = unbounded::<ClockEvent>();
463 let (clock_command_tx, clock_command_rx) = unbounded::<ClockCommand>();
464
465 let _handle = hiro_system_kit::thread_named("clock").spawn(move || {
466 let mut enabled = true;
467 let mut block_hash_timeout = Instant::now();
468
469 loop {
470 match clock_command_rx.try_recv() {
471 Ok(ClockCommand::Pause) => {
472 enabled = false;
473 if let Some(ref simnet_events_tx) = simnet_events_tx {
474 let _ =
475 simnet_events_tx.send(SimnetEvent::ClockUpdate(ClockCommand::Pause));
476 }
477 }
478 Ok(ClockCommand::Resume) => {
479 enabled = true;
480 if let Some(ref simnet_events_tx) = simnet_events_tx {
481 let _ =
482 simnet_events_tx.send(SimnetEvent::ClockUpdate(ClockCommand::Resume));
483 }
484 }
485 Ok(ClockCommand::Toggle) => {
486 enabled = !enabled;
487 if let Some(ref simnet_events_tx) = simnet_events_tx {
488 let _ =
489 simnet_events_tx.send(SimnetEvent::ClockUpdate(ClockCommand::Toggle));
490 }
491 }
492 Ok(ClockCommand::UpdateSlotInterval(updated_slot_time)) => {
493 slot_time = updated_slot_time;
494 }
495 Ok(ClockCommand::PauseWithConfirmation(_)) => {
496 enabled = false;
499 if let Some(ref simnet_events_tx) = simnet_events_tx {
500 let _ =
501 simnet_events_tx.send(SimnetEvent::ClockUpdate(ClockCommand::Pause));
502 }
503 }
504 Err(_e) => {}
505 }
506 sleep(Duration::from_millis(slot_time));
507 if enabled {
508 let _ = clock_event_tx.send(ClockEvent::Tick);
509 if block_hash_timeout.elapsed()
511 > Duration::from_millis(BLOCKHASH_SLOT_TTL * slot_time)
512 {
513 let _ = clock_event_tx.send(ClockEvent::ExpireBlockHash);
514 block_hash_timeout = Instant::now();
515 }
516 }
517 }
518 });
519
520 (clock_event_rx, clock_command_tx)
521}
522
523fn start_geyser_runloop(
524 plugin_config_paths: Vec<PathBuf>,
525 plugin_manager_commands_rx: Receiver<PluginManagerCommand>,
526 subgraph_commands_tx: Sender<SubgraphCommand>,
527 simnet_events_tx: Sender<SimnetEvent>,
528 geyser_events_rx: Receiver<GeyserEvent>,
529) -> Result<JoinHandle<Result<(), String>>, String> {
530 let handle: JoinHandle<Result<(), String>> = hiro_system_kit::thread_named("Geyser Plugins Handler").spawn(move || {
531 let mut indexing_enabled = false;
532
533 #[cfg(feature = "geyser_plugin")]
534 let mut plugin_manager = GeyserPluginManager::new();
535 #[cfg(not(feature = "geyser_plugin"))]
536 let mut plugin_manager = ();
537
538 let mut surfpool_plugin_manager: Vec<Box<dyn GeyserPlugin>> = vec![];
539
540 let mut plugin_map: HashMap<crate::Uuid, (usize, String)> = HashMap::new();
542
543 let log_error = |msg:String|{
545 let _ = simnet_events_tx.send(SimnetEvent::error(msg));
546 };
547
548 let log_warn = |msg:String|{
549 let _ = simnet_events_tx.send(SimnetEvent::warn(msg));
550 };
551
552 let log_info = |msg:String|{
553 let _ = simnet_events_tx.send(SimnetEvent::info(msg));
554 };
555
556
557 #[cfg(feature = "geyser_plugin")]
558 for plugin_config_path in plugin_config_paths.into_iter() {
559 let plugin_manifest_location = FileLocation::from_path(plugin_config_path);
560 let config_file = plugin_manifest_location.read_content_as_utf8()?;
561 let result: serde_json::Value = match json5::from_str(&config_file) {
562 Ok(res) => res,
563 Err(e) => {
564 let error = format!("Unable to read manifest: {}", e);
565 let _ = simnet_events_tx.send(SimnetEvent::error(error.clone()));
566 return Err(error)
567 }
568 };
569
570 let plugin_dylib_path = match result.get("libpath").map(|p| p.as_str()) {
571 Some(Some(name)) => name,
572 _ => {
573 let error = format!("Plugin config file should include a 'libpath' field: {}", plugin_manifest_location);
574 let _ = simnet_events_tx.send(SimnetEvent::error(error.clone()));
575 return Err(error)
576 }
577 };
578
579 let mut plugin_dylib_location = plugin_manifest_location.get_parent_location().expect("path invalid");
580 plugin_dylib_location.append_path(&plugin_dylib_path).expect("path invalid");
581
582 let (plugin, lib) = unsafe {
583 let lib = match Library::new(&plugin_dylib_location.to_string()) {
584 Ok(lib) => lib,
585 Err(e) => {
586 log_error(format!("Unable to load plugin {}: {}", plugin_dylib_location.to_string(), e.to_string()));
587 continue;
588 }
589 };
590 let constructor: Symbol<PluginConstructor> = lib
591 .get(b"_create_plugin")
592 .map_err(|e| format!("{}", e.to_string()))?;
593 let plugin_raw = constructor();
594 (Box::from_raw(plugin_raw), lib)
595 };
596 indexing_enabled = true;
597
598 let mut plugin = LoadedGeyserPlugin::new(lib, plugin, None);
599 if let Err(e) = plugin.on_load(&plugin_manifest_location.to_string(), false) {
600 let error = format!("Unable to load plugin:: {}", e.to_string());
601 let _ = simnet_events_tx.send(SimnetEvent::error(error.clone()));
602 return Err(error)
603 }
604
605 plugin_manager.plugins.push(plugin);
606 }
607
608 let ipc_router = RouterProxy::new();
609
610 #[cfg(feature = "subgraph")]
612 let load_subgraph_plugin = |uuid: uuid::Uuid,
613 config: txtx_addon_network_svm_types::subgraph::PluginConfig,
614 notifier: crossbeam_channel::Sender<String>,
615 surfpool_plugin_manager: &mut Vec<Box<dyn GeyserPlugin>>,
616 plugin_map: &mut HashMap<uuid::Uuid, (usize, String)>,
617 indexing_enabled: &mut bool|
618 -> Result<(), String> {
619 if let Err(e) = subgraph_commands_tx.send(SubgraphCommand::CreateCollection(
620 uuid,
621 config.data.clone(),
622 notifier,
623 )){
624 return Err(format!("Failed to send CreateCollection command: {:?}", e));
625 };
626
627 let mut plugin = SurfpoolSubgraphPlugin::default();
628
629 let (server, ipc_token) =
630 IpcOneShotServer::<IpcReceiver<DataIndexingCommand>>::new()
631 .expect("Failed to create IPC one-shot server.");
632 let subgraph_plugin_config = SubgraphPluginConfig {
633 uuid,
634 ipc_token,
635 subgraph_request: config.data.clone(),
636 };
637
638 let config_file = serde_json::to_string(&subgraph_plugin_config)
639 .map_err(|e| format!("Failed to serialize subgraph plugin config: {:?}", e))?;
640
641 plugin
642 .on_load(&config_file, false)
643 .map_err(|e| format!("Failed to load Geyser plugin: {:?}", e))?;
644
645 match server.accept() {
646 Ok((_, rx)) => {
647 let subgraph_rx = ipc_router
648 .route_ipc_receiver_to_new_crossbeam_receiver::<DataIndexingCommand>(rx);
649 if let Err(e) = subgraph_commands_tx.send(SubgraphCommand::ObserveCollection(subgraph_rx)) {
650 return Err(format!("Failed to send ObserveCollection command: {:?}", e));
651 }
652 }
653 Err(e) => {
654 return Err(format!("Failed to accept IPC connection for subgraph {}: {:?}", uuid, e));
655 }
656 };
657
658 *indexing_enabled = true;
659
660 let plugin: Box<dyn GeyserPlugin> = Box::new(plugin);
661 let plugin_index = surfpool_plugin_manager.len();
662 surfpool_plugin_manager.push(plugin);
663 plugin_map.insert(uuid, (plugin_index, config.plugin_name.to_string()));
664
665 Ok(())
666 };
667
668 #[cfg(feature = "subgraph")]
670 let unload_plugin_by_uuid = |uuid: uuid::Uuid,
671 surfpool_plugin_manager: &mut Vec<Box<dyn GeyserPlugin>>,
672 plugin_map: &mut HashMap<uuid::Uuid, (usize, String)>,
673 indexing_enabled: &mut bool|
674 -> Result<(), String> {
675 let plugin_index = plugin_map
676 .get(&uuid)
677 .ok_or_else(|| format!("Plugin {} not found", uuid))?
678 .0;
679
680 if plugin_index >= surfpool_plugin_manager.len() {
681 return Err(format!("Plugin index {} out of bounds", plugin_index));
682 }
683
684 if let Err(e) = subgraph_commands_tx.send(SubgraphCommand::DestroyCollection(uuid)){
686 return Err(format!("Failed to send DestroyCollection command for {}: {:?}", uuid, e));
687 }
688
689 surfpool_plugin_manager[plugin_index].on_unload();
691
692 surfpool_plugin_manager.remove(plugin_index);
694 plugin_map.remove(&uuid);
695
696 for (index, _) in plugin_map.values_mut() {
698 if *index > plugin_index {
699 *index -= 1;
700 }
701 }
702
703 if surfpool_plugin_manager.is_empty() {
705 *indexing_enabled = false;
706 log_info("All plugins unloaded,indexing disabled".to_string())
708 }
709
710 Ok(())
711 };
712
713 let err = loop {
714 use agave_geyser_plugin_interface::geyser_plugin_interface::{ReplicaAccountInfoV3, ReplicaAccountInfoVersions};
715
716 use crate::types::GeyserAccountUpdate;
717
718 select! {
719 recv(plugin_manager_commands_rx) -> msg => {
720 match msg {
721 Ok(event) => {
722 match event {
723 #[cfg(not(feature = "subgraph"))]
724 PluginManagerCommand::LoadConfig(_, _, _) => {
725 continue;
726 }
727 #[cfg(feature = "subgraph")]
728 PluginManagerCommand::LoadConfig(uuid, config, notifier) => {
729 if let Err(e) = load_subgraph_plugin(uuid, config, notifier, &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled) {
730 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to load plugin: {}", e)));
731 }
732 }
733 #[cfg(not(feature = "subgraph"))]
734 PluginManagerCommand::UnloadPlugin(_, _) => {
735 continue;
736 }
737 #[cfg(feature = "subgraph")]
738 PluginManagerCommand::UnloadPlugin(uuid, notifier) => {
739 match unload_plugin_by_uuid(uuid, &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled) {
740 Ok(_)=>{
741 log_info(format!("Successfully unloaded plugin with UUID {}", uuid));
742 let _ = notifier.send(Ok(()));
743 }
744 Err(e)=>{
745 log_error(format!("Failed to unload plugin {}: {}", uuid, e));
746 let _ = notifier.send(Err(e));
747 }
748 }
749 }
750 #[cfg(not(feature = "subgraph"))]
751 PluginManagerCommand::ReloadPlugin(_, _, _) => {
752 continue;
753 }
754 #[cfg(feature = "subgraph")]
755 PluginManagerCommand::ReloadPlugin(uuid, config, notifier) => {
756 match unload_plugin_by_uuid(uuid, &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled) {
758 Ok(_)=>{
759 log_info(format!("Unloaded plugin with UUID {} for reload", uuid));
760
761 match load_subgraph_plugin(uuid, config, notifier.clone(), &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled) {
763 Ok(_)=>{
764 log_info(format!("Successfully reloaded plugin with UUID {}", uuid));
765 let _ = notifier.send(format!("Plugin {} reloaded successfully", uuid));
766 }
767 Err(e)=>{
768 let error_msg = format!("Failed to reload plugin {}: {}", uuid, e);
769 log_error(error_msg.clone());
770 let _ = notifier.send(error_msg);
771 }
772 }
773 }
774 Err(e)=>{
775 let error_msg = format!("Failed to unload plugin {} during reload: {}", uuid, e);
776 log_error(error_msg.clone());
777 let _ = notifier.send(error_msg);
778 }
779 }
780 }
781 PluginManagerCommand::ListPlugins(notifier) => {
782 let plugin_list: Vec<crate::PluginInfo> = plugin_map.iter().map(|(uuid, (_, plugin_name))| {
783 crate::PluginInfo {
784 plugin_name: plugin_name.clone(),
785 uuid: uuid.to_string(),
786 }
787 }).collect();
788 let _ = notifier.send(plugin_list);
789 }
790 }
791 },
792 Err(e) => {
793 break format!("Failed to read plugin manager command: {:?}", e);
794 },
795 }
796 },
797 recv(geyser_events_rx) -> msg => match msg {
798 Err(e) => {
799 break format!("Failed to read new transaction to send to Geyser plugin: {e}");
800 },
801 Ok(GeyserEvent::NotifyTransaction(transaction_with_status_meta, versioned_transaction)) => {
802
803 if !indexing_enabled {
804 continue;
805 }
806
807 let transaction = match versioned_transaction {
808 Some(tx) => tx,
809 None => {
810 log_warn("Unable to index sanitized transaction".to_string());
811 continue;
812 }
813 };
814
815 let transaction_replica = ReplicaTransactionInfoV3 {
816 signature: &transaction.signatures[0],
817 is_vote: false,
818 transaction: &transaction,
819 transaction_status_meta: &transaction_with_status_meta.meta,
820 index: 0,
821 message_hash: &transaction.message.hash(),
822 };
823
824 for plugin in surfpool_plugin_manager.iter() {
825 if let Err(e) = plugin.notify_transaction(ReplicaTransactionInfoVersions::V0_0_3(&transaction_replica), transaction_with_status_meta.slot) {
826 log_error(format!("Failed to notify Geyser plugin of new transaction: {:?}", e))
827 };
828 }
829
830 #[cfg(feature = "geyser_plugin")]
831 for plugin in plugin_manager.plugins.iter() {
832 if let Err(e) = plugin.notify_transaction(ReplicaTransactionInfoVersions::V0_0_3(&transaction_replica), transaction_with_status_meta.slot) {
833 log_error(format!("Failed to notify Geyser plugin of new transaction: {:?}", e))
834 };
835 }
836 }
837 Ok(GeyserEvent::UpdateAccount(account_update)) => {
838 let GeyserAccountUpdate {
839 pubkey,
840 account,
841 slot,
842 sanitized_transaction,
843 write_version,
844 } = account_update;
845
846 let account_replica = ReplicaAccountInfoV3 {
847 pubkey: pubkey.as_ref(),
848 lamports: account.lamports,
849 owner: account.owner.as_ref(),
850 executable: account.executable,
851 rent_epoch: account.rent_epoch,
852 data: account.data.as_ref(),
853 write_version,
854 txn: sanitized_transaction.as_ref(),
855 };
856
857 for plugin in surfpool_plugin_manager.iter() {
858 if let Err(e) = plugin.update_account(ReplicaAccountInfoVersions::V0_0_3(&account_replica), slot, false) {
859 log_error(format!("Failed to update account in Geyser plugin: {:?}", e));
860 }
861 }
862
863 #[cfg(feature = "geyser_plugin")]
864 for plugin in plugin_manager.plugins.iter() {
865 if let Err(e) = plugin.update_account(ReplicaAccountInfoVersions::V0_0_3(&account_replica), slot, false) {
866 log_error(format!("Failed to update account in Geyser plugin: {:?}", e))
867 }
868 }
869 }
870 Ok(GeyserEvent::StartupAccountUpdate(account_update)) => {
871 let GeyserAccountUpdate {
872 pubkey,
873 account,
874 slot,
875 sanitized_transaction,
876 write_version,
877 } = account_update;
878
879 let account_replica = ReplicaAccountInfoV3 {
880 pubkey: pubkey.as_ref(),
881 lamports: account.lamports,
882 owner: account.owner.as_ref(),
883 executable: account.executable,
884 rent_epoch: account.rent_epoch,
885 data: account.data.as_ref(),
886 write_version,
887 txn: sanitized_transaction.as_ref(),
888 };
889
890 for plugin in surfpool_plugin_manager.iter() {
892 if let Err(e) = plugin.update_account(ReplicaAccountInfoVersions::V0_0_3(&account_replica), slot, true) {
893 log_error(format!("Failed to send startup account update to Geyser plugin: {:?}", e));
894 }
895 }
896
897 #[cfg(feature = "geyser_plugin")]
898 for plugin in plugin_manager.plugins.iter() {
899 if let Err(e) = plugin.update_account(ReplicaAccountInfoVersions::V0_0_3(&account_replica), slot, true) {
900 log_error(format!("Failed to send startup account update to Geyser plugin: {:?}", e))
901 }
902 }
903 }
904 Ok(GeyserEvent::EndOfStartup) => {
905 for plugin in surfpool_plugin_manager.iter() {
906 if let Err(e) = plugin.notify_end_of_startup() {
907 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify end of startup to Geyser plugin: {:?}", e)));
908 }
909 }
910
911 #[cfg(feature = "geyser_plugin")]
912 for plugin in plugin_manager.plugins.iter() {
913 if let Err(e) = plugin.notify_end_of_startup() {
914 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify end of startup to Geyser plugin: {:?}", e)));
915 }
916 }
917 }
918 Ok(GeyserEvent::UpdateSlotStatus { slot, parent, status }) => {
919 let slot_status = match status {
920 crate::surfnet::GeyserSlotStatus::Processed => SlotStatus::Processed,
921 crate::surfnet::GeyserSlotStatus::Confirmed => SlotStatus::Confirmed,
922 crate::surfnet::GeyserSlotStatus::Rooted => SlotStatus::Rooted,
923 };
924
925 for plugin in surfpool_plugin_manager.iter() {
926 if let Err(e) = plugin.update_slot_status(slot, parent, &slot_status) {
927 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to update slot status in Geyser plugin: {:?}", e)));
928 }
929 }
930
931 #[cfg(feature = "geyser_plugin")]
932 for plugin in plugin_manager.plugins.iter() {
933 if let Err(e) = plugin.update_slot_status(slot, parent, &slot_status) {
934 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to update slot status in Geyser plugin: {:?}", e)));
935 }
936 }
937 }
938 Ok(GeyserEvent::NotifyBlockMetadata(block_metadata)) => {
939 let rewards = RewardsAndNumPartitions {
940 rewards: vec![],
941 num_partitions: None,
942 };
943
944 let block_info = ReplicaBlockInfoV4 {
945 slot: block_metadata.slot,
946 blockhash: &block_metadata.blockhash,
947 rewards: &rewards,
948 block_time: block_metadata.block_time,
949 block_height: block_metadata.block_height,
950 parent_slot: block_metadata.parent_slot,
951 parent_blockhash: &block_metadata.parent_blockhash,
952 executed_transaction_count: block_metadata.executed_transaction_count,
953 entry_count: block_metadata.entry_count,
954 };
955
956 for plugin in surfpool_plugin_manager.iter() {
957 if let Err(e) = plugin.notify_block_metadata(ReplicaBlockInfoVersions::V0_0_4(&block_info)) {
958 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify block metadata to Geyser plugin: {:?}", e)));
959 }
960 }
961
962 #[cfg(feature = "geyser_plugin")]
963 for plugin in plugin_manager.plugins.iter() {
964 if let Err(e) = plugin.notify_block_metadata(ReplicaBlockInfoVersions::V0_0_4(&block_info)) {
965 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify block metadata to Geyser plugin: {:?}", e)));
966 }
967 }
968 }
969 Ok(GeyserEvent::NotifyEntry(entry_info)) => {
970 let entry_replica = ReplicaEntryInfoV2 {
971 slot: entry_info.slot,
972 index: entry_info.index,
973 num_hashes: entry_info.num_hashes,
974 hash: &entry_info.hash,
975 executed_transaction_count: entry_info.executed_transaction_count,
976 starting_transaction_index: entry_info.starting_transaction_index,
977 };
978
979 for plugin in surfpool_plugin_manager.iter() {
980 if let Err(e) = plugin.notify_entry(ReplicaEntryInfoVersions::V0_0_2(&entry_replica)) {
981 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify entry to Geyser plugin: {:?}", e)));
982 }
983 }
984
985 #[cfg(feature = "geyser_plugin")]
986 for plugin in plugin_manager.plugins.iter() {
987 if let Err(e) = plugin.notify_entry(ReplicaEntryInfoVersions::V0_0_2(&entry_replica)) {
988 let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify entry to Geyser plugin: {:?}", e)));
989 }
990 }
991 }
992 }
993 }
994 };
995 Err(err)
996 }).map_err(|e| format!("Failed to spawn Geyser Plugins Handler thread: {:?}", e))?;
997 Ok(handle)
998}
999
1000async fn start_rpc_servers_runloop(
1001 config: &SurfpoolConfig,
1002 simnet_commands_tx: &Sender<SimnetCommand>,
1003 svm_locker: SurfnetSvmLocker,
1004 remote_rpc_client: &Option<SurfnetRemoteClient>,
1005) -> Result<
1006 (
1007 Receiver<PluginManagerCommand>,
1008 JoinHandle<()>,
1009 JoinHandle<()>,
1010 ),
1011 String,
1012> {
1013 let rpc_addr: SocketAddr = config
1014 .rpc
1015 .get_rpc_base_url()
1016 .parse()
1017 .map_err(|e: std::net::AddrParseError| e.to_string())?;
1018 let ws_addr: SocketAddr = config
1019 .rpc
1020 .get_ws_base_url()
1021 .parse()
1022 .map_err(|e: std::net::AddrParseError| e.to_string())?;
1023
1024 check_port_availability(rpc_addr, "RPC")?;
1025 check_port_availability(ws_addr, "WebSocket")?;
1026
1027 let (plugin_manager_commands_tx, plugin_manager_commands_rx) = unbounded();
1028 let simnet_events_tx = svm_locker.simnet_events_tx();
1029
1030 let middleware = SurfpoolMiddleware::new(
1031 svm_locker,
1032 simnet_commands_tx,
1033 &plugin_manager_commands_tx,
1034 &config.rpc,
1035 remote_rpc_client,
1036 );
1037
1038 let rpc_handle =
1039 start_http_rpc_server_runloop(config, middleware.clone(), simnet_events_tx.clone()).await?;
1040 let ws_handle = start_ws_rpc_server_runloop(config, middleware, simnet_events_tx).await?;
1041 Ok((plugin_manager_commands_rx, rpc_handle, ws_handle))
1042}
1043
1044async fn start_http_rpc_server_runloop(
1045 config: &SurfpoolConfig,
1046 middleware: SurfpoolMiddleware,
1047 simnet_events_tx: Sender<SimnetEvent>,
1048) -> Result<JoinHandle<()>, String> {
1049 let server_bind: SocketAddr = config
1050 .rpc
1051 .get_rpc_base_url()
1052 .parse::<SocketAddr>()
1053 .map_err(|e| e.to_string())?;
1054
1055 let mut io = MetaIoHandler::with_middleware(middleware);
1056 io.extend_with(rpc::minimal::SurfpoolMinimalRpc.to_delegate());
1057 io.extend_with(rpc::full::SurfpoolFullRpc.to_delegate());
1058 io.extend_with(rpc::accounts_data::SurfpoolAccountsDataRpc.to_delegate());
1059 io.extend_with(rpc::accounts_scan::SurfpoolAccountsScanRpc.to_delegate());
1060 io.extend_with(rpc::bank_data::SurfpoolBankDataRpc.to_delegate());
1061 io.extend_with(rpc::surfnet_cheatcodes::SurfnetCheatcodesRpc.to_delegate());
1062 io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate());
1063
1064 if !config.plugin_config_path.is_empty() {
1065 io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate());
1066 }
1067
1068 let _handle = hiro_system_kit::thread_named("RPC Handler")
1069 .spawn(move || {
1070 let server = match ServerBuilder::new(io)
1071 .cors(DomainsValidation::Disabled)
1072 .threads(6)
1073 .max_request_body_size(15 * 1024 * 1024)
1074 .start_http(&server_bind)
1075 {
1076 Ok(server) => server,
1077 Err(e) => {
1078 let _ = simnet_events_tx.send(SimnetEvent::Aborted(format!(
1079 "Failed to start RPC server: {:?}",
1080 e
1081 )));
1082 return;
1083 }
1084 };
1085
1086 server.wait();
1087 let _ = simnet_events_tx.send(SimnetEvent::Shutdown);
1088 })
1089 .map_err(|e| format!("Failed to spawn RPC Handler thread: {:?}", e))?;
1090
1091 Ok(_handle)
1092}
1093async fn start_ws_rpc_server_runloop(
1094 config: &SurfpoolConfig,
1095 middleware: SurfpoolMiddleware,
1096 simnet_events_tx: Sender<SimnetEvent>,
1097) -> Result<JoinHandle<()>, String> {
1098 let ws_server_bind: SocketAddr = config
1099 .rpc
1100 .get_ws_base_url()
1101 .parse::<SocketAddr>()
1102 .map_err(|e| e.to_string())?;
1103
1104 let uid = std::sync::atomic::AtomicUsize::new(0);
1105 let ws_middleware = SurfpoolWebsocketMiddleware::new(middleware.clone(), None);
1106
1107 let mut rpc_io = PubSubHandler::new(MetaIoHandler::with_middleware(ws_middleware));
1108
1109 let _ws_handle = hiro_system_kit::thread_named("WebSocket RPC Handler")
1110 .spawn(move || {
1111 let runtime = tokio::runtime::Builder::new_multi_thread()
1113 .enable_all()
1114 .build()
1115 .expect("Failed to build Tokio runtime");
1116
1117 let tokio_handle = runtime.handle();
1118 rpc_io.extend_with(
1119 rpc::ws::SurfpoolWsRpc {
1120 uid,
1121 signature_subscription_map: Arc::new(RwLock::new(HashMap::new())),
1122 account_subscription_map: Arc::new(RwLock::new(HashMap::new())),
1123 program_subscription_map: Arc::new(RwLock::new(HashMap::new())),
1124 slot_subscription_map: Arc::new(RwLock::new(HashMap::new())),
1125 logs_subscription_map: Arc::new(RwLock::new(HashMap::new())),
1126 snapshot_subscription_map: Arc::new(RwLock::new(HashMap::new())),
1127 tokio_handle: tokio_handle.clone(),
1128 }
1129 .to_delegate(),
1130 );
1131 runtime.block_on(async move {
1132 let server = match WsServerBuilder::new(rpc_io)
1133 .session_meta_extractor(move |ctx: &RequestContext| {
1134 let runloop_context = RunloopContext {
1136 id: None,
1137 svm_locker: middleware.surfnet_svm.clone(),
1138 simnet_commands_tx: middleware.simnet_commands_tx.clone(),
1139 plugin_manager_commands_tx: middleware
1140 .plugin_manager_commands_tx
1141 .clone(),
1142 remote_rpc_client: middleware.remote_rpc_client.clone(),
1143 rpc_config: middleware.config.clone(),
1144 };
1145 Some(SurfpoolWebsocketMeta::new(
1146 runloop_context,
1147 Some(Arc::new(Session::new(ctx.sender()))),
1148 ))
1149 })
1150 .start(&ws_server_bind)
1151 {
1152 Ok(server) => server,
1153 Err(e) => {
1154 let _ = simnet_events_tx.send(SimnetEvent::Aborted(format!(
1155 "Failed to start WebSocket RPC server: {:?}",
1156 e
1157 )));
1158 return;
1159 }
1160 };
1161 tokio::task::spawn_blocking(move || {
1163 server.wait().unwrap();
1164 })
1165 .await
1166 .ok();
1167
1168 let _ = simnet_events_tx.send(SimnetEvent::Shutdown);
1169 });
1170 })
1171 .map_err(|e| format!("Failed to spawn WebSocket RPC Handler thread: {:?}", e))?;
1172 Ok(_ws_handle)
1173}